In [None]:
from torchvision.datasets import DatasetFolder
import torch
import numpy as np
from torch.utils.data import DataLoader, random_split

from torchvision import transforms
import torch
import numpy as np
from torchvision.datasets import DatasetFolder


# If you get a FileNotFoundError, it might be caused by a directory called .ipynb_checkpoints that gets created inside the dataset folder. Torch treats it as a class and raises an exception:
# FileNotFoundError: Found no valid file for the classes .ipynb_checkpoints. Supported extensions are: .npy
# Remove that directory to fix the error.
# Prefer the first CUDA device, but fall back to the CPU so the notebook still
# runs on machines without a usable GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Root folder of the STFT spectrogram dataset that DatasetFolder scans below.
path = "spectrograms"

# loader function for .npy files
def npy_loader(path):
    """Read one ``.npy`` file and return its contents as a float32 tensor.

    A new leading axis of size 1 is inserted in front of the stored array's
    own dimensions before the conversion.

    Args:
        path: Location of the ``.npy`` file; passed straight to ``np.load``.

    Returns:
        ``torch.Tensor`` of dtype ``torch.float32`` shaped ``(1, *stored_shape)``.
    """
    spectrogram = np.load(path)[np.newaxis, ...]
    return torch.tensor(spectrogram, dtype=torch.float32)
    
# Training-time augmentations; spectrograms are treated like grayscale images.
# The stages run in the order in which they are listed here.
_augmentation_stages = [
    # Small random rotation / shift / zoom, applied with probability 0.7.
    transforms.RandomApply(
        [transforms.RandomAffine(degrees=10, translate=(0.05, 0.05), scale=(0.95, 1.05))],
        p=0.7,
    ),
    # RandomErasing carries its own p=0.5 and is additionally wrapped in
    # RandomApply(p=0.5), so both random draws must succeed for a patch to be erased.
    transforms.RandomApply(
        [transforms.RandomErasing(p=0.5, scale=(0.01, 0.1), ratio=(0.3, 3.3))],
        p=0.5,
    ),
    transforms.RandomHorizontalFlip(p=0.5),  # flips time axis
    transforms.RandomVerticalFlip(p=0.1),    # flips frequency axis
    transforms.Lambda(lambda x: x + 0.01 * torch.randn_like(x)),  # Gaussian noise
]
train_transforms = transforms.Compose(_augmentation_stages)

# Empty pipeline: validation samples are meant to pass through unchanged.
val_transforms = transforms.Compose([])  # no augmentations for validation

# Two views over the same folder: `dataset` applies the training augmentations,
# `val_source` applies `val_transforms`. Both are built from the same root with
# the same arguments, so a sample index is expected to refer to the same file
# in either view (DatasetFolder enumerates classes and files in sorted order).
dataset = DatasetFolder(root=path, loader=npy_loader, extensions=(".npy",), transform=train_transforms)
val_source = DatasetFolder(root=path, loader=npy_loader, extensions=(".npy",), transform=val_transforms)

train_ratio = 0.8
val_ratio = 0.2
dataset_size = len(dataset)
train_size = int(train_ratio * dataset_size)
# The validation split takes whatever is left so the two sizes always add up.
val_size = dataset_size - train_size

# Split the dataset. A seeded generator keeps the split identical across kernel
# restarts, so validation numbers from different runs are comparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_split = random_split(dataset, [train_size, val_size], generator=split_generator)

# Re-point the validation indices at the un-augmented view; previously the
# validation subset shared `dataset` and therefore got the training augmentations.
val_dataset = torch.utils.data.Subset(val_source, val_split.indices)

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Collate ``(tensor, label)`` pairs whose tensors differ in their last axis.

    Every tensor is zero-padded at the end of its last dimension up to the
    longest one in the batch, then all tensors are stacked along a new
    leading batch axis.

    Args:
        batch: Sequence of ``(tensor, label)`` pairs. All tensors must agree
            on every dimension except the last.

    Returns:
        Tuple ``(inputs, labels)``: the stacked, padded tensors and a tensor
        built from the labels.
    """
    inputs, labels = zip(*batch)
    # inputs: tensors shaped (1, freq, time); only the time axis is padded.
    target_len = max(sample.shape[-1] for sample in inputs)
    padded = [
        torch.nn.functional.pad(sample, (0, target_len - sample.shape[-1]))
        for sample in inputs
    ]
    return torch.stack(padded), torch.tensor(labels)
    
# Settings shared by both loaders; only the shuffling differs between them.
_loader_kwargs = dict(batch_size=32, num_workers=0, collate_fn=collate_fn)

# Training batches are reshuffled every epoch, validation batches keep their order.
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)
print("Finished loading the spectrograms")

In [None]:
# Number of batches per epoch: training loader first, then validation loader.
for _loader in (train_loader, val_loader):
    print(len(_loader))
# Bare expression: the notebook renders the dataset's repr as the cell output.
dataset

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Get labels for train and val.
# `dataset.targets` stores the class index of every sample, so the labels can be
# read directly instead of loading (and augmenting) each spectrogram from disk.
train_labels = [dataset.targets[i] for i in train_dataset.indices]
val_labels = [dataset.targets[i] for i in val_dataset.indices]

n_classes = len(dataset.classes)
# Bin edges at -0.5, 0.5, 1.5, ... so every integer class index gets its own bar.
class_bins = np.arange(n_classes + 1) - 0.5

# Plot side by side
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Both panels share the same formatting; only the data and the title differ.
for ax, split_labels, split_name in (
    (ax1, train_labels, 'Training'),
    (ax2, val_labels, 'Validation'),
):
    ax.hist(split_labels, bins=class_bins, edgecolor='black')
    ax.set_xlabel('Class')
    ax.set_ylabel('Number of Samples')
    ax.set_title(f'{split_name} Set Distribution (n={len(split_labels)})')
    ax.set_xticks(range(n_classes))
    ax.set_xticklabels(dataset.classes, rotation=45)
    ax.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
import torch
import torch.nn as nn

class EmotionDetector(nn.Module):
    """Stack of Conv-BN-ReLU-MaxPool stages followed by one linear classifier.

    Args:
        layers: Number of convolutional stages.
        channels_in: Channel count of the input tensor.
        channels_out: Output channels of the first stage. The same value is
            used as the factor by which the channel count grows from one
            stage to the next.
        kernel_size: Kernel size of every convolution (``padding="same"``).
        num_classes: Number of output logits.
        n_fft: Accepted for interface compatibility; not used by the model.
        pooled_time: Size to which the last axis is adaptively average-pooled
            before flattening. Defaults to the previously hard-coded 256.
        fc_in_features: Input width of the classifier, i.e.
            ``channels * height * pooled_time`` of the pooled feature map.
            Defaults to the previously hard-coded 262144; pass a different
            value when the input height or the stage configuration changes.

    Note:
        After construction ``self.channels_in`` holds the channel count
        produced by the last stage and ``self.channels_out`` the count a
        further stage would have produced, not the constructor arguments.
    """

    def __init__(self, layers, channels_in, channels_out, kernel_size, num_classes=6, n_fft=2048,
                 pooled_time=256, fc_in_features=262144):
        super().__init__()

        self.convs = nn.ModuleList()

        self.layers = layers
        self.channels_out = channels_out
        self.channels_in = channels_in
        self.kernel_size = kernel_size
        self.channels_mult = channels_out

        for i in range(layers):
            self.convs.append(
                nn.Sequential(
                    nn.Conv2d(self.channels_in, self.channels_out, self.kernel_size, padding="same"),
                    nn.BatchNorm2d(self.channels_out),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2),  # halves both spatial axes
                )
            )
            # The next stage consumes this stage's output and widens it again.
            self.channels_in = self.channels_out
            self.channels_out = self.channels_mult * self.channels_in

        # `None` keeps the height as-is; only the last axis is resized.
        self.freq_pool = nn.AdaptiveAvgPool2d((None, pooled_time))
        # self.lstm_input_size = 64 * 8
        # self.rnn = nn.GRU(self.lstm_input_size, 16, batch_first=True, bidirectional=True)
        # self.fc = nn.Linear(16*2, num_classes)
        self.fc1 = nn.Linear(fc_in_features, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for input ``x``.

        The flattened feature map must have exactly ``fc_in_features``
        elements per sample, otherwise the linear layer raises.
        """
        for conv in self.convs:
            x = conv(x)
        x = self.freq_pool(x)

        x = x.flatten(start_dim=1)
        # x = x.permute(0, 3, 1, 2).contiguous().view(batch_size, x.size(3), -1)
        # x, _ = self.rnn(x)
        # x = x.mean(dim=1)
        x = self.fc1(x)
        return x

print("Memory-optimized model ready!")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DepthwiseSeparableConv(nn.Module):
    """3x3 depthwise convolution, 1x1 pointwise convolution, batch norm, ReLU6.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the pointwise convolution.
        stride: Stride of the depthwise convolution (default 1).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # groups=in_channels gives every input channel its own 3x3 filter.
        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=in_channels,
            bias=False,
        )
        # The 1x1 convolution mixes channels and sets the output width.
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.ReLU6(inplace=True)

    def forward(self, x):
        """Apply depthwise -> pointwise -> batch norm -> ReLU6 to ``x``."""
        mixed = self.pointwise(self.depthwise(x))
        return self.act(self.bn(mixed))

class ResidualBlockLite(nn.Module):
    """Two depthwise-separable convolutions with an additive shortcut.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels of the block's output.
        stride: Stride of the first convolution (default 1).

    When ``stride != 1`` or the channel count changes, the shortcut is a
    1x1 convolution plus batch norm so that it matches the main path's shape;
    otherwise the input is added unchanged.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = DepthwiseSeparableConv(in_channels, out_channels, stride)
        self.conv2 = DepthwiseSeparableConv(out_channels, out_channels, 1)
        self.downsample = None
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        self.act = nn.ReLU6(inplace=True)

    def forward(self, x):
        """Return ``ReLU6(conv2(conv1(x)) + shortcut(x))``."""
        identity = x
        out = self.conv1(x)
        out = self.conv2(out)
        # Explicit None check: truthiness of an nn.Sequential depends on its
        # length, which is not what decides whether a projection exists.
        if self.downsample is not None:
            identity = self.downsample(x)
        return self.act(out + identity)

class EmotionResNetLite(nn.Module):
    """Small residual classifier: strided stem, four residual stages, linear head.

    Args:
        layers: Number of ``ResidualBlockLite`` blocks in each of the four stages.
        channels_in: Channel count of the input tensor.
        base_channels: Width of the stem and of the first stage; the later
            stages use 2x, 4x and 8x this width.
        num_classes: Number of output logits.
    """

    def __init__(self, layers=(1, 1, 1, 1), channels_in=1, base_channels=32, num_classes=6):
        super().__init__()
        # Running channel count read and updated by `_make_layer`; it starts
        # at the width produced by the stem.
        self.in_channels = base_channels

        stem_conv = nn.Conv2d(channels_in, base_channels, kernel_size=3, stride=2, padding=1, bias=False)
        self.stem = nn.Sequential(
            stem_conv,
            nn.BatchNorm2d(base_channels),
            nn.ReLU6(inplace=True),
        )

        widest = base_channels * 8
        # Stage 1 keeps the resolution; stages 2-4 each use stride 2.
        self.layer1 = self._make_layer(base_channels, layers[0])
        self.layer2 = self._make_layer(base_channels * 2, layers[1], stride=2)
        self.layer3 = self._make_layer(base_channels * 4, layers[2], stride=2)
        self.layer4 = self._make_layer(widest, layers[3], stride=2)

        # Global average pooling collapses each channel to a single value.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(widest, num_classes)

    def _make_layer(self, out_channels, blocks, stride=1):
        """Build one stage; only its first block applies ``stride``.

        The first block maps ``self.in_channels`` to ``out_channels``, after
        which ``self.in_channels`` is updated for the next stage.
        """
        stage = [ResidualBlockLite(self.in_channels, out_channels, stride)]
        self.in_channels = out_channels
        stage.extend(ResidualBlockLite(out_channels, out_channels) for _ in range(blocks - 1))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        for stage in (self.stem, self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = self.avgpool(x)
        return self.fc(pooled.flatten(1))

print("Lightweight EmotionResNet ready.")


In [None]:
import torch
import torch.nn as nn

class EmotionDetector2(nn.Module):
    """CNN feature extractor, attention pooling over frequency, BiLSTM, linear head.

    Args:
        layers: Number of Conv-BN-ReLU-MaxPool stages.
        channels_in: Channel count of the input tensor.
        channels_out: Output channels of the first stage. The same value is
            used as the factor by which the channel count grows per stage.
        kernel_size: Kernel size of every convolution (``padding="same"``).
        num_classes: Number of output logits.
        n_fft: Accepted for interface compatibility; not used by the model.
        lstm_hidden: Hidden size of the LSTM, per direction.
        lstm_layers: Number of stacked LSTM layers.

    Note:
        After construction ``self.channels_in`` holds the channel count
        produced by the last convolutional stage.
    """

    def __init__(self, layers, channels_in, channels_out, kernel_size, num_classes=6, n_fft=2048, lstm_hidden=128, lstm_layers=2):
        super().__init__()
        self.convs = nn.ModuleList()
        self.layers = layers
        self.channels_out = channels_out
        self.channels_in = channels_in
        self.kernel_size = kernel_size
        self.channels_mult = channels_out

        # Build convolutional layers
        for i in range(layers):
            self.convs.append(
                nn.Sequential(
                    nn.Conv2d(self.channels_in, self.channels_out, self.kernel_size, padding="same"),
                    nn.BatchNorm2d(self.channels_out),
                    nn.ReLU(inplace=True),
                    nn.MaxPool2d(2),
                )
            )
            self.channels_in = self.channels_out
            self.channels_out = self.channels_mult * self.channels_in

        # Frequency attention mechanism
        # Takes channel features and outputs attention scores for each frequency bin.
        # The hidden width is clamped to at least 1: with fewer than 4 channels
        # `channels // 4` would be 0 and the scorer would collapse to a bias-only,
        # input-independent layer.
        attn_hidden = max(1, self.channels_in // 4)
        self.freq_attention = nn.Sequential(
            nn.Linear(self.channels_in, attn_hidden),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(attn_hidden, 1)
        )

        # LSTM layer processes temporal sequence with attended frequency features
        self.lstm_input_size = self.channels_in
        self.lstm_hidden = lstm_hidden
        self.lstm = nn.LSTM(
            input_size=self.lstm_input_size,
            hidden_size=lstm_hidden,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=True,
            # Dropout is only passed for stacked LSTMs (more than one layer).
            dropout=0.3 if lstm_layers > 1 else 0
        )

        # Final classification layer
        # *2 because bidirectional LSTM
        self.fc = nn.Linear(lstm_hidden * 2, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        ``x`` is expected as ``(batch, channels_in, freq, time)``.
        """
        # Convolutional feature extraction
        # Input: (batch, 1, freq, time)
        for conv in self.convs:
            x = conv(x)
        # After convs: (batch, channels, freq, time)

        # Reshape for attention: (batch, time, freq, channels)
        x = x.permute(0, 3, 2, 1).contiguous()

        # Compute attention scores for each frequency bin at each time step
        # Input to attention: (batch, time, freq, channels)
        attn_scores = self.freq_attention(x)  # (batch, time, freq, 1)

        # Apply softmax over frequency dimension to get attention weights
        attn_weights = torch.softmax(attn_scores, dim=2)  # (batch, time, freq, 1)

        # Apply attention weights: weighted sum over frequency dimension
        x_attended = (x * attn_weights).sum(dim=2)  # (batch, time, channels)

        # LSTM processing on the (batch, time, channels) sequence
        lstm_out, (h_n, c_n) = self.lstm(x_attended)
        # lstm_out: (batch, time, lstm_hidden*2)
        # h_n: (num_layers*2, batch, lstm_hidden)

        # Use the last hidden state from both directions
        h_forward = h_n[-2, :, :]   # Forward direction, last layer
        h_backward = h_n[-1, :, :]  # Backward direction, last layer
        x = torch.cat([h_forward, h_backward], dim=1)  # (batch, lstm_hidden*2)

        # Alternative: Mean pooling over time (uncomment to use)
        # x = lstm_out.mean(dim=1)  # (batch, lstm_hidden*2)

        # Classification
        x = self.fc(x)

        return x

print("Attention-enhanced LSTM model ready!")

"""
hyperparameters to Tune

lstm_hidden: Number of LSTM hidden units (default 128)
lstm_layers: Number of stacked LSTM layers (default 2)
"""

In [None]:
import torch
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import matplotlib.pyplot as plt
import gc


def _run_epoch(model, loader, criterion, device, desc, optimizer=None, metric_prefix=''):
    """Run one pass over ``loader``: train if ``optimizer`` is given, else evaluate.

    Returns ``(mean_loss, accuracy)`` weighted by sample count; both are NaN
    when the loader yields no samples.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum, correct, seen = 0.0, 0, 0

    progress = tqdm(loader, desc=desc, leave=False)
    # Gradients are only tracked while training.
    with torch.set_grad_enabled(training):
        for inputs, labels in progress:
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # The criterion returns a batch mean; weight it by the batch size
            # so the running average stays correct for a smaller last batch.
            loss_sum += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            correct += (predicted == labels).sum().item()
            seen += labels.size(0)

            if seen:
                progress.set_postfix({
                    f'{metric_prefix}loss': loss_sum / seen,
                    f'{metric_prefix}acc': correct / seen,
                })

    if seen == 0:
        return float('nan'), float('nan')
    return loss_sum / seen, correct / seen


def train_model(model, train_loader, val_loader, num_epochs, lr, device='cpu'):
    """
    Train a classification model with Adam and cross-entropy, with progress bars.

    Each epoch runs one training pass and one validation pass, prints the
    resulting loss/accuracy, and after the last epoch the loss and accuracy
    curves are plotted.

    Args:
        model: PyTorch nn.Module producing class logits
        train_loader: DataLoader for training set, yielding (inputs, labels)
        val_loader: DataLoader for validation set, yielding (inputs, labels)
        num_epochs: number of epochs
        lr: learning rate
        device: 'cuda' or 'cpu'; the model and every batch are moved there

    Returns:
        The trained model (the same module object that was passed in).
    """

    train_losses, val_losses = [], []
    train_accs, val_accs = [], []

    # Make sure the parameters live on the same device as the batches before
    # the optimizer captures them.
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        tag = f"Epoch {epoch + 1}/{num_epochs}"

        epoch_loss, epoch_acc = _run_epoch(
            model, train_loader, criterion, device, f"{tag} [Train]", optimizer=optimizer
        )
        train_losses.append(epoch_loss)
        train_accs.append(epoch_acc)

        val_loss, val_acc = _run_epoch(
            model, val_loader, criterion, device, f"{tag} [Val]", metric_prefix='val_'
        )
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        print(f"Epoch [{epoch + 1}/{num_epochs}] "
              f"Train Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f} "
              f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")

    # Plot loss and accuracy curves
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss Curve')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Training Accuracy')
    plt.plot(val_accs, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy Curve')
    plt.legend()
    plt.tight_layout()
    plt.show()

    return model

# Training is pinned to the CPU here; the commented expression selects a GPU
# automatically when one is available.
device = 'cpu'  # torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device ", device)

# Search space for the (currently disabled) grid search kept in the string at
# the end of this cell.
param_grid = {
    'lr': [1e-2, 1e-3, 1e-4],
    'hidden_size': [64, 128, 256],
    'num_layers': [2, 3, 4],
    'in_channels': [1],
    'out_channels': [2],
    'kernel_size': [3, 5],
    'num_epochs': [5]
}

# Drop references left over from a previous run of this cell *before*
# collecting garbage and emptying the CUDA cache; in the opposite order the old
# model is still referenced and nothing can be reclaimed. `checkpoint` is the
# same object as `m` (train_model returns its argument), so both names go.
for _stale_name in ('m', 'checkpoint'):
    globals().pop(_stale_name, None)
gc.collect()
torch.cuda.empty_cache()

m = EmotionResNetLite(layers=(1, 1, 1, 1), base_channels=16, num_classes=6).to(device)
checkpoint = train_model(m, train_loader, val_loader, 20, 0.001, device=device)
# NOTE: this pickles the whole module object, so loading the file later needs
# the model class definitions to be available under the same names.
torch.save(checkpoint, "dataset_all_model.pt")

"""
for lr in param_grid['lr']:
    for hidden_layer_size in param_grid['hidden_size']:
        for num_conv_layers in param_grid['num_layers']:
            for in_channels in param_grid['in_channels']:
                for out_channels in param_grid['out_channels']:
                    for kernel_size in param_grid['kernel_size']:
                        for num_epochs in param_grid['num_epochs']:
                            print(f"lr: {lr} hidden size: {hidden_layer_size} in_channels: {in_channels}"
                                  f" out_channels: {out_channels} kernel_size: {kernel_size} num epochs: {num_epochs}")
                            m = EmotionDetector(num_conv_layers, in_channels, out_channels, kernel_size).to(device)
                            train_model(m, train_loader, val_loader, num_epochs, lr, device=device)
                            torch.cuda.empty_cache()
                            gc.collect()
                            del m 
"""

In [None]:
# Pull a single training batch and report the shape of its stacked inputs.
load = iter(train_loader)
first_batch = next(load)
feature, label = first_batch
print(feature.shape)

In [None]:
# Parameter-size statistics for the model `m`:
#   l - element count of every parameter tensor, ascending
#   c - total number of parameter elements
l = sorted(param.numel() for param in m.parameters())
c = sum(l)
print(l[len(l)//2])  # middle entry of the sorted sizes
print(l)
print(c)

In [None]:
import torch
import gc
# Release the model reference first so that the garbage collector and the CUDA
# cache can actually reclaim its memory. Any other name still bound to the same
# object (e.g. `checkpoint`) keeps it alive.
if 'm' in locals():
    del m
gc.collect()
torch.cuda.empty_cache()

# The memory summary is only requested when CUDA is usable, so a CPU-only run
# of this notebook does not fail here.
if torch.cuda.is_available():
    print(torch.cuda.memory_summary())
else:
    print("CUDA is not available; no GPU memory summary to show.")

In [None]:
# Print the median and standard deviation of every parameter tensor.
# The model is read through `checkpoint` (the object returned by train_model)
# because the name `m` is deleted by the memory-cleanup cell that runs before
# this one, which made this cell fail on a top-to-bottom run.
for name, param in checkpoint.named_parameters():
    # detach() keeps autograd bookkeeping out of the printed statistics.
    values = param.detach().flatten().cpu()
    print()
    print(name)
    print(values.median())
    print(values.std())

In [None]:
# Bare expression: the notebook renders the repr of `checkpoint`, the object
# returned by train_model in the training cell.
checkpoint