# Imports

In [12]:
import glob, os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, random_split, Dataset

# Mock Data

# Model Creation

In [2]:
class CNNUnit(nn.Module):
    """Convolutional stem made of three 3x3 Conv -> BatchNorm -> GELU stages.

    The first two stages use stride 1 with padding 1 and therefore keep the
    spatial size; the third stage uses stride 2 and downsamples.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by every stage.
    """

    def __init__(self, in_channels, out_channels=16):
        super().__init__()

        # Stage 1: in_channels -> out_channels, resolution preserved.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.gelu1 = nn.GELU()

        # Stage 2: out_channels -> out_channels, resolution preserved.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.gelu2 = nn.GELU()

        # Stage 3: out_channels -> out_channels, stride 2 (downsampling).
        self.conv3 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=2, padding=1)
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.gelu3 = nn.GELU()

    def forward(self, x):
        """Run the three stages in order and return the downsampled features."""
        stages = (
            (self.conv1, self.bn1, self.gelu1),
            (self.conv2, self.bn2, self.gelu2),
            (self.conv3, self.bn3, self.gelu3),
        )
        # Each stage is convolution, then batch norm, then the GELU activation.
        for conv, norm, activation in stages:
            x = activation(norm(conv(x)))
        return x

In [3]:
class CNNLayer(nn.Module):
    """Single downsampling stage: 3x3 stride-2 Conv -> BatchNorm -> GELU.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the convolution.
    """

    def __init__(self, in_channels, out_channels=24):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.gelu1 = nn.GELU()

    def forward(self, x):
        """Return the activated, normalised output of the strided convolution."""
        features = self.conv1(x)
        normalised = self.bn1(features)
        return self.gelu1(normalised)

In [4]:
class LocalInformationLayer(nn.Module):
    """Bias-free 3x3 convolution wrapped in a residual (skip) connection.

    The convolution keeps both the channel count and the spatial size, so its
    output can be added element-wise to the input.

    Args:
        in_channels: Number of channels of the input (and of the output).
    """

    def __init__(self, in_channels=24):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        """Return ``conv(x) + x``."""
        local_features = self.conv(x)
        return local_features + x

In [5]:
class SMHSA(nn.Module):
    """Multi-head self-attention over a 2-D feature map with spatial reduction.

    Queries and keys are passed through a shared depth-wise convolution
    (``spatial_reduction``) before attention. Because the queries are reduced,
    the output lives on the reduced grid ``(H_reduced, W_reduced)``.

    Args:
        dim: Number of input/output channels; must be divisible by ``heads``.
        heads: Number of attention heads.
        kernel_size: Kernel size of the depth-wise reduction convolution.
        stride: Stride of the depth-wise reduction convolution.
        bias: If true, create a learnable per-head scalar added to the
            attention logits.

    Raises:
        AssertionError: If ``dim`` is not divisible by ``heads``.
    """

    def __init__(self, dim, heads=8, kernel_size=3, stride=2, bias=True):
        super().__init__()

        assert dim % heads == 0, "dim must be divisible by heads"

        self.dim = dim
        self.heads = heads
        self.scale = (dim // heads) ** -0.5  # 1 / sqrt(head_dim)

        # One projection producing Q, K and V concatenated on the channel axis.
        self.to_qkv = nn.Linear(dim, dim * 3, bias=False)

        # Depth-wise convolution (groups=dim) applied to Q and K.
        self.spatial_reduction = nn.Conv2d(
            dim, dim, kernel_size=kernel_size, stride=stride, padding=kernel_size // 2,
            groups=dim, bias=False
        )

        # Per-head scalar added to the logits. The same value is added to every
        # entry of a softmax row, so it does not change the attention weights;
        # the parameter is kept so existing checkpoints keep loading.
        self.bias = nn.Parameter(torch.zeros(1, heads, 1, 1)) if bias else None

        # Output projection applied after the heads are merged.
        self.to_out = nn.Linear(dim, dim)

    def forward(self, x):
        """Attend over ``x`` of shape (B, C, H, W).

        Returns:
            Tensor of shape (B, C, H_reduced, W_reduced), where the reduced
            size is whatever ``spatial_reduction`` produces for (H, W).
        """
        B, C, H, W = x.shape
        head_dim = C // self.heads

        # (B, C, H, W) -> (B, H*W, C) token sequence.
        tokens = x.permute(0, 2, 3, 1).reshape(B, H * W, C)

        q, k, v = self.to_qkv(tokens).chunk(3, dim=-1)

        # (B, N, C) -> (B, heads, N, head_dim)
        q = q.view(B, -1, self.heads, head_dim).permute(0, 2, 1, 3)
        k = k.view(B, -1, self.heads, head_dim).permute(0, 2, 1, 3)
        v = v.view(B, -1, self.heads, head_dim).permute(0, 2, 1, 3)

        # Back to image layout (B, C, H, W) so the depth-wise conv can run.
        q = q.permute(0, 1, 3, 2).reshape(B, C, H, W)
        k = k.permute(0, 1, 3, 2).reshape(B, C, H, W)

        q = self.spatial_reduction(q)
        k = self.spatial_reduction(k)

        H_reduced, W_reduced = q.shape[2], q.shape[3]
        n_reduced = H_reduced * W_reduced

        # Fix: V must have as many tokens as K, otherwise `attn @ v` fails with
        # a shape mismatch whenever the reduction changes the grid (e.g. the
        # default stride=2). Pool V onto the reduced grid only in that case so
        # the stride=1 path stays numerically identical to before.
        if (H_reduced, W_reduced) != (H, W):
            v = v.permute(0, 1, 3, 2).reshape(B, C, H, W)
            v = F.adaptive_avg_pool2d(v, (H_reduced, W_reduced))
            v = v.reshape(B, self.heads, head_dim, n_reduced).permute(0, 1, 3, 2)

        # (B, C, H_r, W_r) -> (B, heads, N_r, head_dim)
        q = q.reshape(B, self.heads, head_dim, n_reduced).permute(0, 1, 3, 2)
        k = k.reshape(B, self.heads, head_dim, n_reduced).permute(0, 1, 3, 2)

        # Scaled dot-product logits: (B, heads, N_r, N_r)
        attn = torch.matmul(q, k.transpose(-2, -1)) * self.scale

        if self.bias is not None:
            # (1, heads, 1, 1) broadcasts over batch and both token axes.
            attn = attn + self.bias

        attn = F.softmax(attn, dim=-1)

        out = torch.matmul(attn, v)

        # Merge heads, project, and restore the (B, C, H_r, W_r) layout.
        out = out.permute(0, 2, 1, 3).reshape(B, n_reduced, C)
        out = self.to_out(out)
        out = out.reshape(B, H_reduced, W_reduced, C).permute(0, 3, 1, 2)

        return out


In [6]:
class RFFN(nn.Module):
    """Convolutional feed-forward block with a residual connection.

    Channels are expanded by a 1x1 convolution, mixed spatially by a 3x3
    convolution, and projected back by a second 1x1 convolution. Every
    convolution is bias-free and followed by BatchNorm; GELU follows the first
    two. The input is added to the result.

    Args:
        dim: Number of input and output channels.
        expansion: Multiplier giving the hidden channel count.
    """

    def __init__(self, dim, expansion=4):
        super().__init__()

        hidden_dim = dim * expansion

        # 1x1 expansion: dim -> hidden_dim
        self.conv1 = nn.Conv2d(dim, hidden_dim, kernel_size=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(hidden_dim)
        self.gelu1 = nn.GELU()

        # 3x3 spatial mixing at the hidden width
        self.conv2 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(hidden_dim)
        self.gelu2 = nn.GELU()

        # 1x1 projection: hidden_dim -> dim (no activation before the skip)
        self.conv3 = nn.Conv2d(hidden_dim, dim, kernel_size=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(dim)

    def forward(self, x):
        """Return ``block(x) + x`` with the same shape as ``x``."""
        expanded = self.gelu1(self.bn1(self.conv1(x)))
        mixed = self.gelu2(self.bn2(self.conv2(expanded)))
        projected = self.bn3(self.conv3(mixed))
        return projected + x
        

In [7]:
import torch
import torch.nn as nn

class FC(nn.Module):
    """Classification head: global average pooling followed by a two-layer MLP.

    Args:
        in_features: Number of channels of the incoming feature map, which is
            also the feature count after global average pooling.
    """

    def __init__(self, in_features):
        super().__init__()

        # Collapse every channel's spatial grid to a single value.
        self.pool = nn.AdaptiveAvgPool2d((1, 1))

        # Hidden layer, regularised with dropout, then the two-way output layer.
        self.fc1 = nn.Linear(in_features, 512)
        self.relu1 = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, x):
        """Map a (B, C, H, W) feature map to (B, 2) class logits."""
        pooled = self.pool(x).flatten(start_dim=1)  # (B, C, 1, 1) -> (B, C)
        hidden = self.relu1(self.fc1(pooled))
        hidden = self.dropout(hidden)
        return self.fc2(hidden)


In [15]:
class TGCNN(nn.Module):
    """CNN + attention classifier producing two class logits per example.

    Pipeline: ``CNNUnit`` stem -> ``CNNLayer`` (24 ch) -> local conv, attention
    and feed-forward block -> ``CNNLayer`` (32 ch) -> local conv, attention and
    feed-forward block -> global-average-pool classification head.

    For a ``(B, 22, 1280)`` input the intermediate feature maps are
    ``(B, 16, 1, 640)``, ``(B, 24, 1, 320)`` and ``(B, 32, 1, 160)``, and the
    output is ``(B, 2)``.

    Args:
        in_channels: Number of input channels expected by the stem.
    """

    def __init__(self, in_channels=22):
        super().__init__()

        # CNN layers
        self.cnn_unit = CNNUnit(in_channels=in_channels, out_channels=16)
        self.cnn_layer1 = CNNLayer(in_channels=16, out_channels=24)
        self.cnn_layer2 = CNNLayer(in_channels=24, out_channels=32)

        # Transformer block 1 (24 channels)
        self.LIL1 = LocalInformationLayer(in_channels=24)
        # NOTE: `norm1a` and `norm1b` are never called in `forward`. They are
        # kept, with the shapes and registration order the original code ended
        # up with, only so that state_dict keys stay compatible with saved
        # checkpoints. The original assigned `norm1b` three times; only the
        # last value ([32, 1, 160]) survived, so the discarded constructions
        # were removed.
        self.norm1a = nn.LayerNorm([24, 2, 320])
        self.multiheaded_att1 = SMHSA(dim=24, stride=1)  # stride 1 keeps the grid size
        self.norm1b = nn.LayerNorm([32, 1, 160])
        self.recurrent1 = RFFN(dim=24)

        # Transformer block 2 (32 channels)
        self.LIL2 = LocalInformationLayer(in_channels=32)
        self.multiheaded_att2 = SMHSA(dim=32, stride=1)  # stride 1 keeps the grid size
        self.recurrent2 = RFFN(dim=32)

        # Global average pooling + fully connected head
        self.fc = FC(in_features=32)  # matches the last CNN layer's channel count

    def forward(self, x):
        """Classify a batch.

        Args:
            x: Tensor of shape (B, C, L), which gets a singleton height axis
                inserted, or an already image-shaped tensor (B, C, H, W).

        Returns:
            Logits of shape (B, 2).
        """
        # Insert the height axis unless the input is already 4-D. Previously a
        # 4-D input was unsqueezed to 5-D and rejected by the first Conv2d.
        if x.dim() != 4:
            x = x.unsqueeze(2)

        x = self.cnn_unit(x)
        x = self.cnn_layer1(x)

        # Block 1
        x = self.LIL1(x)
        x = self.multiheaded_att1(x)
        x = self.recurrent1(x)

        x = self.cnn_layer2(x)

        # Block 2
        x = self.LIL2(x)
        x = self.multiheaded_att2(x)
        x = self.recurrent2(x)

        return self.fc(x)


In [17]:
# Smoke test: feed a random (100, 22, 1280) batch through a freshly built
# network and print the shape of the logits it returns.
data = torch.rand((100, 22, 1280))
model = TGCNN()
output = model(data)
print(output.size())

torch.Size([100, 2])


# Load in Data

In [23]:
import pickle
# Load the preprocessed dataset: a pickled mapping with arrays under "X" and "y".
# NOTE(review): pickle.load can execute arbitrary code embedded in the file.
# Only load files produced by this project's own preprocessing step.
pkl_file = os.path.join("processed_data", "final_data.pkl")
with open(pkl_file, "rb") as f:
    data = pickle.load(f)

X = data["X"]          # shape (N, 22, 1471)
y = data["y"]          # shape (N,)

print("Loaded:", X.shape, y.shape)

# CrossEntropyLoss needs float inputs and integer (long) class labels.
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)

dataset = TensorDataset(X, y)

# 80/20 train/test split. The generator is seeded so the same examples land in
# the test set on every run, which keeps reported accuracies comparable
# (mirrors the random_state=42 used for the mock-data split).
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
train_data, test_data = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

Loaded: (1601, 22, 1471) (1601,)


# Model Training and Evaluation

In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Mini-batch loaders over the train/test split of the real dataset.
# Only the training loader is shuffled.
batch_size = 32
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Network, objective and optimiser.
model = TGCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training schedule.
num_epochs = 20     # full passes over the training set
print_interval = 5  # log the batch loss every 5 batches

for epoch in range(num_epochs):
    model.train()

    # Running statistics for the epoch summary.
    total_loss = 0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Standard step: clear gradients, forward, loss, backward, clip, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        optimizer.step()

        batch_loss = loss.item()
        total_loss += batch_loss

        predicted = outputs.argmax(dim=1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)

        if batch_idx % print_interval == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {batch_loss:.4f}")

    # Epoch summary: mean of the per-batch losses and training accuracy in percent.
    epoch_loss = total_loss / len(train_loader)
    epoch_accuracy = 100 * correct / total
    print(f"🔥 Epoch [{epoch+1}/{num_epochs}] -> Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

# Persist the learned weights (state_dict only, not the whole module).
torch.save(model.state_dict(), "TGCNN_model.pth")
print("✅ Model saved successfully!")

# 🚀 Evaluation Function
def evaluate_model(model, test_loader):
    """Print and return the classification accuracy of ``model`` in percent.

    The model is switched to evaluation mode and left in that mode.

    Args:
        model: Module mapping a batch of inputs to per-class logits.
        test_loader: Iterable yielding ``(inputs, targets)`` batches.

    Returns:
        Accuracy as a percentage in [0, 100]; ``0.0`` if the loader yields no
        examples (previously this raised ZeroDivisionError).
    """
    model.eval()

    # Evaluate on the device that holds the model's weights instead of relying
    # on a module-level `device` global; fall back to CPU for parameter-free
    # models.
    first_param = next(model.parameters(), None)
    eval_device = first_param.device if first_param is not None else torch.device("cpu")

    correct = 0
    total = 0

    with torch.no_grad():  # no gradients are needed for evaluation
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(eval_device), targets.to(eval_device)

            predicted = model(inputs).argmax(dim=1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)

    accuracy = 100 * correct / total if total else 0.0
    print(f"🎯 Test Accuracy: {accuracy:.2f}%")
    return accuracy

# Evaluate the model trained above on the test loader; prints the accuracy.
evaluate_model(model, test_loader)


Using device: cuda
Epoch [1/20], Step [1/40], Loss: 0.8719
Epoch [1/20], Step [6/40], Loss: 0.2999
Epoch [1/20], Step [11/40], Loss: 0.2387
Epoch [1/20], Step [16/40], Loss: 0.1502
Epoch [1/20], Step [21/40], Loss: 0.1054
Epoch [1/20], Step [26/40], Loss: 0.2723
Epoch [1/20], Step [31/40], Loss: 0.4487
Epoch [1/20], Step [36/40], Loss: 0.6301
🔥 Epoch [1/20] -> Loss: 0.2839, Accuracy: 90.94%
Epoch [2/20], Step [1/40], Loss: 0.1918
Epoch [2/20], Step [6/40], Loss: 0.4778
Epoch [2/20], Step [11/40], Loss: 0.1846
Epoch [2/20], Step [16/40], Loss: 0.6642
Epoch [2/20], Step [21/40], Loss: 0.0866
Epoch [2/20], Step [26/40], Loss: 0.1190
Epoch [2/20], Step [31/40], Loss: 0.5692
Epoch [2/20], Step [36/40], Loss: 0.0973
🔥 Epoch [2/20] -> Loss: 0.2393, Accuracy: 93.28%
Epoch [3/20], Step [1/40], Loss: 0.3889
Epoch [3/20], Step [6/40], Loss: 0.2395
Epoch [3/20], Step [11/40], Loss: 0.1369
Epoch [3/20], Step [16/40], Loss: 0.4024
Epoch [3/20], Step [21/40], Loss: 0.2107
Epoch [3/20], Step [26/40], 

92.5233644859813

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Simulated data for a dry run of the training pipeline.
# The shape is (N, 22, 1280): TGCNN adds the height axis itself and its stem
# expects 22 input channels. The previous (1000, 28, 5, 1280) array could not
# pass through the network.
spectrogram_data = np.random.rand(1000, 22, 1280)  # Simulated input
y = np.random.randint(0, 2, (1000,))  # Simulated binary classification labels

# Train-test split (seeded so the dry run is repeatable)
X_train, X_test, y_train, y_test = train_test_split(spectrogram_data, y, test_size=0.2, random_state=42)

# Convert to PyTorch tensors: float32 inputs, long class labels
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.long).to(device)
y_test = torch.tensor(y_test, dtype=torch.long).to(device)

# Create DataLoader objects (only the training loader is shuffled)
batch_size = 32
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Model, loss function, and optimizer
model = TGCNN().to(device)  # Move model to GPU if available
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training parameters
num_epochs = 20  # Adjust based on dataset size
print_interval = 5  # Print every 5 batches

# 🚀 Training Loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    total_loss = 0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        # The tensors already live on `device`; .to() is then a no-op.
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()  # Reset gradients
        outputs = model(inputs)  # Forward pass

        loss = criterion(outputs, targets)  # Compute loss
        loss.backward()  # Backpropagation

        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)  # Gradient clipping
        optimizer.step()  # Update weights

        total_loss += loss.item()
        predicted = torch.argmax(outputs, dim=1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)

        if batch_idx % print_interval == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    # Epoch summary: mean of the per-batch losses and training accuracy in percent
    print(f"🔥 Epoch [{epoch+1}/{num_epochs}] -> Loss: {total_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")

# Save the trained model
# NOTE(review): this is the same path the real-data training cell writes to, so
# running this mock cell overwrites that checkpoint with weights fitted to
# random data. Confirm whether a separate file name is wanted.
torch.save(model.state_dict(), "TGCNN_model.pth")
print("✅ Model saved successfully!")

# 🚀 Evaluation Function
def evaluate_model(model, test_loader):
    """Print and return the classification accuracy of ``model`` in percent.

    The model is switched to evaluation mode and left in that mode.

    Args:
        model: Module mapping a batch of inputs to per-class logits.
        test_loader: Iterable yielding ``(inputs, targets)`` batches.

    Returns:
        Accuracy as a percentage in [0, 100]; ``0.0`` if the loader yields no
        examples (previously this raised ZeroDivisionError).
    """
    model.eval()

    # Evaluate on the device that holds the model's weights instead of relying
    # on a module-level `device` global; fall back to CPU for parameter-free
    # models.
    first_param = next(model.parameters(), None)
    eval_device = first_param.device if first_param is not None else torch.device("cpu")

    correct = 0
    total = 0

    with torch.no_grad():  # no gradients are needed for evaluation
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(eval_device), targets.to(eval_device)

            predicted = model(inputs).argmax(dim=1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)

    accuracy = 100 * correct / total if total else 0.0
    print(f"🎯 Test Accuracy: {accuracy:.2f}%")
    return accuracy

# Evaluate the model trained above on the test loader; prints the accuracy.
evaluate_model(model, test_loader)


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split

# Select the compute device: CUDA when a GPU is visible, otherwise the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")

Using device: cpu
