# CNN

In [None]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init

In [None]:
def get_melspectrogram(data, sr=None, n_fft=512, hop_length=256, n_mels=40):
    """Compute a log-scaled (dB) mel spectrogram of an audio signal.

    Args:
        data: Audio samples, forwarded to ``librosa.feature.melspectrogram``
            as ``y``.
        sr: Sampling rate of ``data``. When ``None`` (the default) the
            notebook-level ``sample_rate`` variable is used, which is what
            this function always did before the parameter existed.
        n_fft: FFT window size.
        hop_length: Number of samples between successive frames.
        n_mels: Number of mel bands.

    Returns:
        The transposed mel power spectrogram converted to decibels with
        ``librosa.power_to_db``.
        NOTE(review): for mono input librosa is expected to return
        ``(n_mels, frames)``, so after the transpose the result should be
        ``(frames, n_mels)`` - confirm against the librosa version in use.
    """
    if sr is None:
        # Fall back to the notebook-wide sampling rate.
        sr = sample_rate

    melspec = librosa.feature.melspectrogram(
        y=data,
        sr=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
    ).T

    return librosa.power_to_db(melspec)

In [None]:
# Notebook display cell: show the extracted training data.
# NOTE(review): df_train_extracted is presumably created in an earlier cell;
# the split cell below reads its 'data' and 'label' columns.
df_train_extracted

In [None]:
def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2):
    """Apply SpecAugment-style masking to a spectrogram tensor.

    Bands along the second-to-last axis (``FrequencyMasking``) and along the
    last axis (``TimeMasking``) are replaced by the mean of ``spec``.

    Args:
        spec: Spectrogram tensor with at least two dimensions. Only the last
            two axes are inspected, so leading channel/batch axes are allowed.
        max_mask_pct: Maximum width of a single mask, as a fraction of the
            size of the axis being masked.
        n_freq_masks: Number of masks applied with ``FrequencyMasking``.
        n_time_masks: Number of masks applied with ``TimeMasking``.

    Returns:
        The masked spectrogram. When both mask counts are 0 the input tensor
        itself is returned.

    NOTE(review): ``process_data`` feeds this function the transposed output
    of ``get_melspectrogram``, so the axis called ``n_mels`` here may really
    be the frame axis. With equal mask counts and one shared percentage the
    outcome is symmetric - confirm if the counts are ever set differently.
    """
    # Read only the trailing two axes so (channel, freq, time) input works too.
    n_mels, n_steps = spec.shape[-2:]
    # torchaudio documents mask_value as a float, so unwrap the 0-dim tensor.
    mask_value = float(spec.mean())
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
        aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
        aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

In [None]:
# Split samples and labels together: 80% for training, the rest for validation
mel_train, mel_valid, mel_train_label, mel_valid_label = train_test_split(
    df_train_extracted['data'],
    df_train_extracted['label'],
    train_size=0.80,
)

# Function to process data and apply augmentation
def process_data(data, label):
    """Turn one audio sample into a clean and an augmented spectrogram.

    Args:
        data: Audio samples accepted by ``get_melspectrogram``.
        label: Label of the sample; returned unchanged.

    Returns:
        A ``(melspec, augmented_spec, label)`` tuple where ``melspec`` is the
        spectrogram as a tensor and ``augmented_spec`` is a masked copy of it
        produced by ``spectro_augment``.
    """
    melspec = torch.tensor(get_melspectrogram(data))
    # spectro_augment already returns a tensor, so re-wrapping it in
    # torch.tensor() only added a copy and a copy-construct UserWarning.
    # Augment a clone instead so the clean spectrogram can never be
    # touched by the masking.
    augmented_spec = spectro_augment(melspec.clone())
    return melspec, augmented_spec, label

# Training set: every sample contributes its clean spectrogram plus one
# augmented copy, so its label is recorded twice.
mel_train_spec, mel_train_spec_label = [], []

for data, label in zip(mel_train, mel_train_label):
    melspec, augmented_spec, label = process_data(data, label)
    mel_train_spec += [melspec, augmented_spec]
    mel_train_spec_label += [label] * 2

# Validation set: clean spectrograms only; the augmented copy is discarded.
mel_valid_spec, mel_valid_spec_label = [], []

for data, label in zip(mel_valid, mel_valid_label):
    melspec, _, label = process_data(data, label)
    mel_valid_spec += [melspec]
    mel_valid_spec_label += [label]

In [None]:
class MelSpecDataset(Dataset):
    """Map-style dataset over a DataFrame with 'data' and 'label' columns."""

    def __init__(self, dataframe):
        """Store the DataFrame that holds one sample per row."""
        self.dataframe = dataframe

    def __len__(self):
        """Return the number of rows, i.e. the number of samples."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the ``(data, label)`` pair stored at row position ``idx``.

        Lookup is positional (``iloc``): DataLoader samplers produce
        positions ``0..len-1``, which label-based ``loc`` only resolves when
        the frame happens to carry a default ``RangeIndex``.
        """
        spec = self.dataframe['data'].iloc[idx]
        label = self.dataframe['label'].iloc[idx]
        return spec, label

In [None]:
# Column-wise containers for each split: spectrogram tensors and their labels
train_data = dict(data=mel_train_spec, label=mel_train_spec_label)
valid_data = dict(data=mel_valid_spec, label=mel_valid_spec_label)

df_mel_train = pd.DataFrame(train_data)
df_mel_valid = pd.DataFrame(valid_data)

# Peek at the first rows of both splits
print(df_mel_train.head())
print()
print(df_mel_valid.head())

# Wrap the frames as datasets and batch them (16 samples per batch, shuffled)
train_ds, valid_ds = MelSpecDataset(df_mel_train), MelSpecDataset(df_mel_valid)

train_dl = DataLoader(train_ds, batch_size=16, shuffle=True)
valid_dl = DataLoader(valid_ds, batch_size=16, shuffle=True)

In [None]:
# Audio Classification Model

class AudioClassifier(nn.Module):
    """Four-block CNN that classifies spectrogram batches.

    Each block is Conv2d (stride 2) -> ReLU -> BatchNorm2d, widening the
    channels 8 -> 16 -> 32 -> 64. The result is average-pooled to one value
    per channel and fed to a single linear layer.

    Args:
        in_channels: Number of channels of the input. Defaults to 1 because
            the training loop adds exactly one channel axis with
            ``inputs.unsqueeze(1)``; the previous hard-coded value of 16 made
            the first convolution reject those batches.
        num_classes: Size of the output layer. Defaults to 10 as before; the
            training loop only maps two labels ('cough', 'other'), so
            ``num_classes=2`` is sufficient for that setup.
    """

    def __init__(self, in_channels=1, num_classes=10):
        super().__init__()

        # First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv1, self.relu1, self.bn1 = self._conv_block(
            in_channels, 8, kernel_size=(5, 5), padding=(2, 2))

        # Second Convolution Block
        self.conv2, self.relu2, self.bn2 = self._conv_block(
            8, 16, kernel_size=(3, 3), padding=(1, 1))

        # Third Convolution Block
        self.conv3, self.relu3, self.bn3 = self._conv_block(
            16, 32, kernel_size=(3, 3), padding=(1, 1))

        # Fourth Convolution Block
        self.conv4, self.relu4, self.bn4 = self._conv_block(
            32, 64, kernel_size=(3, 3), padding=(1, 1))

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=num_classes)

        # Wrap the Convolutional Blocks (the same module objects, in order)
        self.conv = nn.Sequential(
            self.conv1, self.relu1, self.bn1,
            self.conv2, self.relu2, self.bn2,
            self.conv3, self.relu3, self.bn3,
            self.conv4, self.relu4, self.bn4,
        )

    @staticmethod
    def _conv_block(c_in, c_out, kernel_size, padding):
        """Build one stride-2 conv block: Kaiming-initialised Conv2d, ReLU, BatchNorm2d."""
        conv = nn.Conv2d(c_in, c_out, kernel_size=kernel_size,
                         stride=(2, 2), padding=padding)
        init.kaiming_normal_(conv.weight, a=0.1)
        init.zeros_(conv.bias)
        return conv, nn.ReLU(), nn.BatchNorm2d(c_out)

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``.

        Args:
            x: Input of shape ``(batch, in_channels, height, width)``.
        """
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer produces the final output
        return self.lin(x)

# Use the first CUDA device when one is available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Build the classifier and move its parameters to that device
myModel = AudioClassifier().to(device)
# Display the device the parameters ended up on
next(myModel.parameters()).device

In [None]:
# Training Loop
def training(model, train_dl, num_epochs):
    """Train ``model`` on ``train_dl`` and print loss/accuracy per epoch.

    Uses cross-entropy loss, Adam (lr 0.001) and a linear one-cycle learning
    rate schedule that is stepped once per batch.

    Args:
        model: The ``nn.Module`` to train. Batches are moved to the device
            its parameters live on.
        train_dl: Iterable of batches with ``len()``; ``batch[0]`` holds the
            inputs (a channel axis is inserted at dim 1) and ``batch[1]`` the
            labels, either the strings 'cough'/'other' or an integer tensor
            of class indices.
        num_epochs: Number of passes over ``train_dl``.

    Raises:
        KeyError: If a string label is neither 'cough' nor 'other'.
    """
    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Built once; it never changes between batches.
    label_mapping = {'cough': 1, 'other': 0}

    # Loss Function, Optimizer and Scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=0.001,
        steps_per_epoch=len(train_dl),
        epochs=num_epochs,
        anneal_strategy='linear',
    )

    # Make sure BatchNorm/Dropout layers are in training mode.
    model.train()

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for batch in train_dl:
            # Add the channel axis expected by Conv2d: (B, H, W) -> (B, 1, H, W)
            inputs = batch[0].to(device).float().unsqueeze(1)
            labels = batch[1]

            # Encode the labels as class indices on the model's device
            if torch.is_tensor(labels):
                label_tensor = labels.to(device).long()
            else:
                label_tensor = torch.tensor(
                    [label_mapping[label] for label in labels],
                    dtype=torch.long,
                    device=device,
                )

            # Normalize the inputs; the clamp avoids NaNs on a constant batch
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s.clamp_min(1e-8)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize; the loss needs the encoded
            # targets, not the raw label strings
            outputs = model(inputs)
            loss = criterion(outputs, label_tensor)
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == label_tensor).sum().item()
            total_prediction += prediction.shape[0]

        # Print stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction / total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    print('Finished Training')
  
# Just for demo, adjust this higher.
num_epochs = 2

training(model=myModel, train_dl=train_dl, num_epochs=num_epochs)