# Import packages

In [15]:
import librosa
import numpy as np
import pandas as pd
import math, random
from tqdm import tqdm

# Read audio file
import torchaudio
from torchaudio import transforms
from IPython.display import Audio

# Dataloader
from torch.utils.data import DataLoader, Dataset, random_split

# CNN
import torch
from torchsummary import summary
import torch.nn as nn
import torchvision.models as models


### Sources :
https://towardsdatascience.com/audio-deep-learning-made-simple-sound-classification-step-by-step-cebc936bbe5 <br>
https://towardsdatascience.com/audio-deep-learning-made-simple-part-2-why-mel-spectrograms-perform-better-aad889a93505 <br>
https://towardsdatascience.com/audio-deep-learning-made-simple-part-3-data-preparation-and-augmentation-24c6e1f6b52 <br>
https://www.kaggle.com/code/frlemarchand/bird-song-classification-using-an-efficientnet#Model-creation <br>
https://github.com/sbs80/cnn-audio-classification
 

# Template functions

In [16]:
def get_audio_time_series(filename:str):
    '''
    Read an audio file (e.g. an mp3) with librosa.

    Parameters
    ----------
    filename : str
        Path of the audio file to load.

    Returns
    -------
    tuple or None
        ``(y, sr)`` where ``y`` is the audio time series and ``sr`` is the
        sampling rate of ``y``. ``None`` is returned when the file cannot be
        loaded, so callers should check the result before unpacking it.
    '''
    import warnings  # local import so this cell does not depend on another one

    try:
        y, sr = librosa.load(filename)
        return y, sr
    except Exception as exc:
        # Loading stays best effort (None on failure), but the failure is now
        # reported instead of being silently discarded. Catching Exception
        # rather than using a bare except lets KeyboardInterrupt and
        # SystemExit propagate, so a long run can still be interrupted.
        warnings.warn(f"Could not load audio file {filename!r}: {exc}")
        return None

In [17]:
# Decode one sample track to check that audio loading works end to end.
filename_template = 'mp3_files/00pcolwO8c6vOxOUwpZ0QM.mp3'
y, sr = get_audio_time_series(filename_template)

In [18]:
# Metadata tables consumed by the datasets below (train first, then test).
df_train, df_test = (
    pd.read_csv(csv_path)
    for csv_path in ('train_dataset.csv', 'test_dataset.csv')
)

# Utils

In [39]:
class AudioUtil():
    """Stateless helpers to load, standardise and augment audio clips.

    Every method that takes ``aud`` expects a ``(sig, sr)`` tuple where
    ``sig`` is a tensor indexed as ``[channel, sample]`` and ``sr`` is the
    sample rate.
    """

    # -----------------
    # Load an audio file. Return the signal as a tensor and the sample rate
    # -----------------
    @staticmethod
    def open(audio_file):
        """Load ``audio_file`` with torchaudio and return ``(sig, sr)``."""
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    # -----------------
    # Convert the given audio to the desired number of channels
    # -----------------
    @staticmethod
    def rechannel(aud, new_channel):
        """Return ``aud`` converted to ``new_channel`` channels.

        Extra channels are dropped (keeping the first ones); a signal with
        fewer channels than requested is concatenated with itself, which turns
        mono into stereo.
        """
        sig, sr = aud
        num_channel = sig.shape[0]

        if (num_channel == new_channel):
            # Nothing to do
            return aud

        if (num_channel > new_channel):
            # Down-mix by keeping only the first `new_channel` channels
            # (stereo -> mono keeps the first channel). Previously a signal
            # with more channels than requested had its channels doubled.
            resig = sig[:new_channel, :]
        else:
            # Convert from mono to stereo by duplicating the signal
            resig = torch.cat([sig, sig])

        return ((resig, sr))

    # -----------------
    # Standardize sampling rate
    # -----------------
    @staticmethod
    def resample(aud, newsr):
        """Return ``aud`` resampled to ``newsr``; a no-op if already there."""
        sig, sr = aud

        if (sr == newsr):
            # Nothing to do
            return aud

        # Resample operates along the last (time) dimension, so every channel
        # is resampled in a single call. The previous code resampled channel 0
        # twice, which made the second output channel a copy of the first.
        resig = torchaudio.transforms.Resample(sr, newsr)(sig)
        return ((resig, newsr))

    # -----------------
    # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
    # -----------------
    @staticmethod
    def pad_trunc(aud, max_ms):
        """Force the signal to exactly ``sr//1000 * max_ms`` samples.

        Longer signals are cut at the end; shorter ones are zero-padded with a
        random split of the padding between the beginning and the end.
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        # NOTE: sr//1000 is an integer division, so 44100 Hz counts as 44
        # samples per millisecond (10 s -> 440000 samples, not 441000).
        max_len = sr//1000 * max_ms

        if (sig_len > max_len):
            # Truncate the signal to the given length
            sig = sig[:,:max_len]

        elif (sig_len < max_len):
            # Length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Pad with 0s
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        return (sig, sr)

    # ----------------------------
    # Data Augmentation: Shifts the signal to the right by some percent. Values at the end
    # are 'wrapped around' to the start of the transformed signal.
    # ----------------------------
    @staticmethod
    def time_shift(aud, shift_limit):
        """Roll the signal by a random amount of at most ``shift_limit * length``."""
        sig,sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the time axis only. Without `dims`, roll flattens the
        # tensor first, so samples leaving one channel would wrap into the
        # other channel instead of wrapping within their own.
        return (sig.roll(shift_amt, dims=1), sr)

    # ----------------------------
    # Mel Spectrogram: Generate a Spectrogram
    # ----------------------------
    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """Return the Mel spectrogram of ``aud`` in decibels.

        ``hop_len=None`` leaves the hop length to torchaudio's default.
        """
        sig,sr = aud
        top_db = 80

        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return (spec)

    # ----------------------------
    # Augment the Spectrogram by masking out some sections of it in both the frequency
    # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
    # overfitting and to help the model generalise better. The masked sections are
    # replaced with the mean value.
    # ----------------------------
    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """Apply ``n_freq_masks`` frequency masks and ``n_time_masks`` time masks.

        Each mask covers at most ``max_mask_pct`` of its axis and is filled
        with the mean of the input spectrogram.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec

# Dataloader


In [20]:
# ----------------------------
# Sound Dataset
# ----------------------------
class SoundDS(Dataset):
  """Dataset yielding ``(augmented Mel spectrogram, year)`` pairs.

  Parameters
  ----------
  df : DataFrame with a 'mp3_filepath' column (path of the audio file) and a
      'year' column (regression target).
  duration : clip length in milliseconds that every signal is padded or
      truncated to.
  sr : sample rate every signal is resampled to.
  channel : number of channels every signal is converted to.
  shift_pct : maximum fraction of the clip used by the random time shift.

  The defaults are the values that were previously hard-coded.
  """

  def __init__(self, df, duration=10000, sr=44100, channel=2, shift_pct=0.4):

    self.df = df
    self.duration = duration # Hyper-parametrisation
    self.sr = sr # Hyper-parametrisation
    self.channel = channel
    self.shift_pct = shift_pct

  # ----------------------------
  # Number of items in dataset
  # ----------------------------
  def __len__(self):
    return len(self.df)

  # ----------------------------
  # Get i'th item in dataset
  # ----------------------------
  def __getitem__(self, idx):
    """Return ``(aug_sgram, year_id)`` for row ``idx``, or ``None`` on failure.

    A failure is printed and ``None`` is returned; PyTorch's default batch
    collation does not accept ``None`` items, so a DataLoader using it will
    stop on such a sample.
    """
    try:
      # Positional lookup: idx is a position in [0, len(self)), which matches
      # the row label only when the DataFrame has its default index.
      audio_file = self.df['mp3_filepath'].iloc[idx]

      # Get the release date
      year_id = self.df['year'].iloc[idx]

      aud = AudioUtil.open(audio_file)

      # Some sounds have a higher sample rate, or fewer channels compared to the
      # majority. So make all sounds have the same number of channels and same
      # sample rate. Unless the sample rate is the same, the pad_trunc will still
      # result in arrays of different lengths, even though the sound duration is
      # the same.

      # Resample
      reaud = AudioUtil.resample(aud, self.sr)

      # Re-channel
      rechan = AudioUtil.rechannel(reaud, self.channel)

      # Size uniformisation
      dur_aud = AudioUtil.pad_trunc(rechan, self.duration)

      # Data augmentation
      shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)

      # Convert to a Mel-Spectrogram
      sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)

      # Data augmentation
      aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

      return aug_sgram, year_id

    except Exception as e:
      print(f"Error processing data for index {idx}: {e} \n")
      return None

      

In [40]:
# Wrap both metadata tables in datasets.
ds_train, ds_test = SoundDS(df_train), SoundDS(df_test)

# Batch them: only the training loader shuffles its samples.
train_dl = DataLoader(ds_train, batch_size=16, shuffle=True)
test_dl = DataLoader(ds_test, batch_size=16, shuffle=False)

In [43]:
# Peek at the first training batch to sanity-check labels and tensor shape.
print("Training Data Samples:")
for features, labels in train_dl:  # each batch is a (features, labels) pair
    print("Labels:", labels)
    print("Input shape :", features.shape)
    break  # one batch is enough for the check

Training Data Samples:
Labels: tensor([2020, 2001, 2019, 2005, 2014, 2022, 2023, 2022, 1998, 1976, 2022, 2022,
        1985, 2019, 2020, 2007])
Input shape : torch.Size([16, 2, 64, 860])


# EfficientNet_Audio

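The final linear layer outputs a single value, so the network performs regression (it predicts the release year) instead of classification.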

In [33]:
# ----------------------------
# Audio Classification Model
# ----------------------------
class AudioReg (nn.Module):
    """Small CNN regressor over 2-channel spectrograms.

    Four stride-2 convolution blocks (Conv2d -> ReLU -> BatchNorm2d) are
    followed by adaptive average pooling and a linear layer with a single
    output, so ``forward`` maps a batch ``[batch, 2, H, W]`` to ``[batch, 1]``.
    """

    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self):

        super().__init__()

        # The blocks are kept as individual attributes (conv1, relu1, bn1, ...)
        # and also wrapped in `self.conv`, as before, so parameter names are
        # unchanged.

        # First Convolution Block
        self.conv1, self.relu1, self.bn1 = self._conv_block(2, 8, kernel_size=(5, 5), padding=(2, 2))

        # Second Convolution Block
        self.conv2, self.relu2, self.bn2 = self._conv_block(8, 16, kernel_size=(3, 3), padding=(1, 1))

        # Third Convolution Block
        self.conv3, self.relu3, self.bn3 = self._conv_block(16, 32, kernel_size=(3, 3), padding=(1, 1))

        # Fourth Convolution Block
        self.conv4, self.relu4, self.bn4 = self._conv_block(32, 64, kernel_size=(3, 3), padding=(1, 1))

        # Linear regressor head: one output value per sample
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=1)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(
            self.conv1, self.relu1, self.bn1,
            self.conv2, self.relu2, self.bn2,
            self.conv3, self.relu3, self.bn3,
            self.conv4, self.relu4, self.bn4,
        )

    @staticmethod
    def _conv_block(in_channels, out_channels, kernel_size, padding):
        """Build one stride-2 Conv2d / ReLU / BatchNorm2d block.

        The convolution uses Kaiming-normal weights (a=0.1) and a zero bias.
        """
        conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=(2, 2), padding=padding)
        relu = nn.ReLU()
        bn = nn.BatchNorm2d(out_channels)
        nn.init.kaiming_normal_(conv.weight, a=0.1)
        conv.bias.data.zero_()
        return conv, relu, bn

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        """Return the prediction of shape ``[batch, 1]`` for input ``x``."""
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x



In [24]:
# Use the first GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Build the network and move it to that device in one step.
model = AudioReg().to(device)
# Display the device the weights actually live on.
next(model.parameters()).device

device(type='cpu')

### Test shape

In [25]:
# Smoke-test the forward pass on a dummy batch.
# The batch is drawn from a normal distribution (torch.empty would hand the
# network uninitialised memory) and is created on the same device as the model.
input_tensor = torch.randn(16, 2, 64, 344, device=device)
input_shape = input_tensor.shape

# Evaluate in eval mode so this dummy batch does not update the BatchNorm
# running statistics, then restore whichever mode the model was in.
was_training = model.training
model.eval()
with torch.no_grad():
    output_tensor = model(input_tensor)
model.train(was_training)
output_shape = output_tensor.shape

print("Input Shape:", input_shape)
print("Output Shape:", output_shape)

Input Shape: torch.Size([16, 2, 64, 344])
Output Shape: torch.Size([16, 1])


In [30]:
# Layer-by-layer summary of the model, traced with the dummy batch above.
summary(
    model,
    input_data=input_tensor,
    batch_dim=0,
)

Layer (type:depth-idx)                   Output Shape              Param #
├─Sequential: 1-1                        [-1, 64, 4, 22]           --
├─Conv2d: 1-2                            [-1, 8, 32, 172]          408
├─Sequential: 1                          []                        --
|    └─Conv2d: 2-1                       [-1, 8, 32, 172]          (recursive)
├─ReLU: 1-3                              [-1, 8, 32, 172]          --
├─Sequential: 1                          []                        --
|    └─ReLU: 2-2                         [-1, 8, 32, 172]          --
├─BatchNorm2d: 1-4                       [-1, 8, 32, 172]          16
├─Sequential: 1                          []                        --
|    └─BatchNorm2d: 2-3                  [-1, 8, 32, 172]          (recursive)
├─Conv2d: 1-5                            [-1, 16, 16, 86]          1,168
├─Sequential: 1                          []                        --
|    └─Conv2d: 2-4                       [-1, 16, 16, 86]      

Layer (type:depth-idx)                   Output Shape              Param #
├─Sequential: 1-1                        [-1, 64, 4, 22]           --
├─Conv2d: 1-2                            [-1, 8, 32, 172]          408
├─Sequential: 1                          []                        --
|    └─Conv2d: 2-1                       [-1, 8, 32, 172]          (recursive)
├─ReLU: 1-3                              [-1, 8, 32, 172]          --
├─Sequential: 1                          []                        --
|    └─ReLU: 2-2                         [-1, 8, 32, 172]          --
├─BatchNorm2d: 1-4                       [-1, 8, 32, 172]          16
├─Sequential: 1                          []                        --
|    └─BatchNorm2d: 2-3                  [-1, 8, 32, 172]          (recursive)
├─Conv2d: 1-5                            [-1, 16, 16, 86]          1,168
├─Sequential: 1                          []                        --
|    └─Conv2d: 2-4                       [-1, 16, 16, 86]      

# Training

In [47]:
# -----------------------------
# Training Loop
# ----------------------------
def training(model, train_dl, num_epochs):
    """Train ``model`` as a regressor with an RMSE loss.

    Parameters
    ----------
    model : the network to optimise; batches are moved to the device its
        parameters live on.
    train_dl : sized iterable of ``(inputs, labels)`` batches.
    num_epochs : number of passes over ``train_dl``.

    Returns
    -------
    list of float
        Average RMSE loss of each epoch, in order.

    "Accuracy" is the fraction of samples whose prediction, rounded to the
    nearest integer, equals the label.
    """
    # Use the model's own device instead of relying on a notebook global.
    device = next(model.parameters()).device

    # Loss Function, Optimizer and Scheduler
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    # Make sure layers such as BatchNorm are in training mode.
    model.train()
    epoch_losses = []

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        with tqdm(train_dl, unit="batch") as tepoch:

            # Repeat for each batch in the training set
            for data in tepoch:
                # Refresh tqdm
                tepoch.set_description(f"Epoch {epoch}")

                # Get the input features and target labels, and put them on the
                # model's device. Labels become a flat float vector.
                inputs = data[0].to(device)
                labels = data[1].to(device).float().reshape(-1)

                # Normalize the inputs
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                # Flatten [batch, 1] predictions to [batch] so they line up with
                # the labels; otherwise MSELoss broadcasts the two into a
                # [batch, batch] grid and compares every prediction with every
                # label.
                outputs = model(inputs).reshape(-1)

                loss = torch.sqrt(criterion(outputs, labels))
                loss.backward()
                optimizer.step()
                scheduler.step()

                # Keep stats for Loss and Accuracy
                running_loss += loss.item()

                # A prediction counts as correct when it rounds to the label.
                # (Taking the arg-max of a single-output regressor always
                # yields index 0, which never matches a year.)
                prediction = outputs.detach().round()
                batch_correct = (prediction == labels).sum().item()
                current_acc = batch_correct / prediction.shape[0]
                correct_prediction += batch_correct
                total_prediction += prediction.shape[0]

                tepoch.set_postfix(loss=loss.item(), accuracy=100. * current_acc)

            # Print stats at the end of the epoch
            num_batches = len(train_dl)
            avg_loss = running_loss / num_batches
            acc = correct_prediction/total_prediction
            epoch_losses.append(avg_loss)

            print(f'Epoch: {epoch}, Average Loss: {avg_loss:.2f}, Average accuracy: {acc:.2f}')

    print('Finished Training')
    return epoch_losses
  


In [48]:
# A single epoch keeps the demo short; raise this value for a real run.
num_epochs = 1
training(model, train_dl, num_epochs=num_epochs)

Epoch 0:  21%|██        | 20/97 [00:17<01:07,  1.14batch/s, accuracy=0, loss=2.01e+3]


KeyboardInterrupt: 