<h2>Audio Classification using Pytorch</h2>
<p> Author: Shubham Dash </p>
<p>CNN based audio classification, using Mel-Spectrogram for feature map extraction</p>

<h3>Header imports</h3>

In [32]:
from __future__ import print_function
import argparse
import torch
import pickle
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
import os
from os.path import join as pjoin
import time
import pandas as pd
from scipy.io import wavfile
import glob
import librosa
import librosa.display

In [5]:
def set_seed(SEED):
    """Seed every random number generator this notebook relies on.

    Args:
        SEED (int): Seed applied to Python's ``random`` module, NumPy and
            PyTorch (CPU and all CUDA devices).

    Besides seeding, cuDNN is switched to deterministic kernels and its
    auto-tuner is disabled, because the auto-tuner may select different
    convolution algorithms from one run to the next.
    """
    # Local import: ``random`` is not among the notebook's top-level imports.
    import random

    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    # Covers every visible GPU; silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(SEED)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    

<h3>Data Loader</h3>
<p>Read the audio files, compute their Mel spectrograms with librosa, and save each spectrogram as an image.</p>

In [16]:
class AudioRead:
    '''
    Locates the .wav files of the train / val / test splits and renders
    their Mel spectrograms to .jpg images.

    Every method takes a ``mode`` string: 'train' and 'val' select the
    corresponding split, any other value selects the test split.
    '''
    def __init__(self, train_path, val_path, test_path):
        '''
        Args:
            train_path: directory containing the training .wav files
            val_path: directory containing the validation .wav files
            test_path: directory containing the test .wav files
        '''
        self.train_path = train_path
        self.val_path = val_path
        self.test_path = test_path
        self.train_names = []
        self.test_names = []
        self.val_names = []

    def _base_path(self, mode):
        '''Return the directory configured for ``mode``.'''
        if mode == 'train':
            return self.train_path
        if mode == 'val':
            return self.val_path
        return self.test_path

    def _names_for(self, mode):
        '''Return the file list previously collected for ``mode``.'''
        if mode == 'train':
            return self.train_names
        if mode == 'val':
            return self.val_names
        return self.test_names

    def load_data(self, mode='train'):
        '''
        Collect the paths of all .wav files of one split.

        The result is stored on the instance (``train_names``,
        ``val_names`` or ``test_names``) and also returned.

        Returns:
            Sorted list of file paths found in the split's directory.
        '''
        # glob already returns paths that include the directory, so they
        # must not be joined onto a base path a second time. Sorting keeps
        # the spectrogram indices stable between runs.
        file_names = sorted(glob.glob(pjoin(self._base_path(mode), "*.wav")))

        if mode == 'train':
            self.train_names = file_names
        elif mode == 'val':
            self.val_names = file_names
        else:
            self.test_names = file_names
        return file_names

    def plot_spectrogram(self, mode, S, idx):
        '''
        Plot one Mel spectrogram and save it as
        ``spectrogram_<mode>_<idx>.jpg`` in the directory of ``mode``.

        Args:
            mode: split name, used for the target directory and file name
            S: power Mel spectrogram as returned by
                librosa.feature.melspectrogram
            idx: index of the audio file, used in the file name
        '''
        file_name = 'spectrogram_' + str(mode) + '_' + str(idx) + '.jpg'
        save_path = pjoin(self._base_path(mode), file_name)
        fig = plt.figure(figsize=(10, 4))
        try:
            librosa.display.specshow(librosa.power_to_db(S,
                                                         ref=np.max),
                                     y_axis='mel', fmax=8000,
                                     x_axis='time')
            plt.colorbar(format='%+2.0f dB')
            plt.title('Mel spectrogram')
            plt.tight_layout()
            plt.savefig(save_path)
        finally:
            # One figure is created per audio file; without closing it the
            # figures accumulate in memory for the whole data set.
            plt.close(fig)

    def process_data(self, mode='train'):
        '''
        Convert every audio file of one split to a Mel spectrogram
        (128 Mel bands, fmax 8000) and save each one as a .jpg image.

        ``load_data`` must have been called for the same ``mode`` first;
        otherwise the file list is empty and nothing is processed.

        Returns:
            List with one spectrogram array per audio file, in the order
            of the stored file names.
        '''
        spectrograms = []
        # Files are handled one at a time so only a single waveform is held
        # in memory, and an empty file list simply yields an empty result.
        for idx, audio_path in enumerate(self._names_for(mode)):
            y, sr = librosa.load(audio_path)
            s = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128,
                                               fmax=8000)
            spectrograms.append(s)
            self.plot_spectrogram(mode, s, idx)  # plot and save Mel-spectrogram
        return spectrograms
        

<h3>PyTorch Dataset</h3>

In [17]:
class FATDataset(Dataset):
    """Dataset of images paired with numeric annotations from a CSV file.

    Each CSV row describes one sample: column 0 holds the image file name
    (relative to ``root_dir``) and the remaining columns hold numeric
    values that are returned as an array of shape (-1, 2).
    """

    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.landmarks_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the annotation file."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            dict with keys 'image' (the decoded image array) and
            'landmarks' (float array of shape (-1, 2)), passed through
            ``transform`` when one was given.
        """
        row = self.landmarks_frame.iloc[idx]
        img_name = os.path.join(self.root_dir, row.iloc[0])
        # The original called ``io.imread`` although no ``io`` module is
        # imported in this notebook; matplotlib's reader is already in scope.
        image = plt.imread(img_name)
        # ``as_matrix`` was removed in pandas 1.0; ``to_numpy`` replaces it.
        landmarks = row.iloc[1:].to_numpy()
        landmarks = landmarks.astype('float').reshape(-1, 2)
        sample = {'image': image, 'landmarks': landmarks}

        if self.transform:
            sample = self.transform(sample)

        return sample

<h3>Metrics</h3>
<p> Metrics to be used for evaluating model performance</p>

In [18]:
def _one_sample_positive_class_precisions(scores, truth):
    """Calculate precisions for each true class for a single sample.

    Args:
      scores: np.array of (num_classes,) giving the individual classifier scores.
      truth: np.array of (num_classes,) bools indicating which classes are true.

    Returns:
      pos_class_indices: np.array of indices of the true classes for this sample.
      pos_class_precisions: np.array of precisions corresponding to each of those
        classes.
    """
    num_classes = scores.shape[0]
    pos_class_indices = np.flatnonzero(truth > 0)
    # Only calculate precisions if there are some true classes.
    if not len(pos_class_indices):
        return pos_class_indices, np.zeros(0)
    # Retrieval list of classes for this sample.
    retrieved_classes = np.argsort(scores)[::-1]
    # class_rankings[top_scoring_class_index] == 0 etc.
    class_rankings = np.zeros(num_classes, dtype=np.int)
    class_rankings[retrieved_classes] = range(num_classes)
    # Which of these is a true label?
    retrieved_class_true = np.zeros(num_classes, dtype=np.bool)
    retrieved_class_true[class_rankings[pos_class_indices]] = True
    # Num hits for every truncated retrieval list.
    retrieved_cumulative_hits = np.cumsum(retrieved_class_true)
    # Precision of retrieval list truncated at each hit, in order of pos_labels.
    precision_at_hits = (
            retrieved_cumulative_hits[class_rankings[pos_class_indices]] /
            (1 + class_rankings[pos_class_indices].astype(np.float)))
    return pos_class_indices, precision_at_hits


def calculate_per_class_lwlrap(truth, scores):
    """Compute the label-weighted label-ranking average precision per class.

    Arguments:
      truth: np.array of (num_samples, num_classes) marking, per sample, which
        classes are present (values > 0 count as present).
      scores: np.array of (num_samples, num_classes) with the real-valued
        score the classifier assigned to each class for each sample.

    Returns:
      per_class_lwlrap: np.array of (num_classes,) with the lwlrap of each
        class.
      weight_per_class: np.array of (num_classes,) with each class's share of
        all positive labels. The overall lwlrap is
        np.sum(per_class_lwlrap * weight_per_class).
    """
    assert truth.shape == scores.shape
    n_rows, n_cols = scores.shape

    # One precision slot per (sample, class); only the slots of classes that
    # are true for a sample are ever written, the rest stay zero.
    precision_table = np.zeros((n_rows, n_cols))
    for row in range(n_rows):
        hit_classes, hit_precisions = _one_sample_positive_class_precisions(
            scores[row, :], truth[row, :])
        precision_table[row, hit_classes] = hit_precisions

    positives_per_class = np.sum(truth > 0, axis=0)
    total_positives = float(np.sum(positives_per_class))
    weight_per_class = positives_per_class / total_positives

    # Column mean over the labelled entries only; the max(1, .) guard keeps
    # classes without any positive label from dividing by zero.
    column_totals = np.sum(precision_table, axis=0)
    per_class_lwlrap = column_totals / np.maximum(1, positives_per_class)
    return per_class_lwlrap, weight_per_class

In [19]:
# Central place for the model and training configuration values.
class Config:
    """Plain namespace of class-level constants; an instance or the class
    itself is handed to ``Model`` as its ``config`` argument."""
    # --- Model architecture (read by Model) ---
    # Channel count of the input tensor; fed to Model's first Conv2d.
    in_channels = 3
    # Output channel counts of the successive Conv2d layers in Model.
    filter_dims = [512, 256, 128, 64]
    # Flattened feature size intended for Model's final Linear layer.
    # FIXME (from the original author): the value below is a placeholder.
    out_features = 24566 # totally wrong, correct this
    # Number of outputs produced by Model's final Linear layer.
    num_classes = 200
    # --- Training hyper-parameters ---
    # The names and values below match the defaults used by train().
    num_epochs = 80
    batch_size = 64
    # presumably the batch size for validation/inference - TODO confirm
    test_batch_size = 256
    # presumably the optimizer's initial learning rate - TODO confirm
    lr = 3e-3
    # eta_min / t_max carry the names of CosineAnnealingLR's arguments,
    # which is how train() uses its own copies of these values.
    eta_min = 1e-5
    t_max = 10

<h3>Model Description</h3>
<p> Experimenting with a CNN based model for multi-class audio-tagging</p>

In [49]:
class Model(nn.Module):
    '''
    CNN classifier: a stack of 3x3 convolutions, global average pooling and
    one linear layer.

    config is the Config object. The fields read are ``in_channels``,
    ``filter_dims`` (one convolution per entry) and ``num_classes``.
    ``out_features`` is not used: the input width of the linear layer is
    derived from ``filter_dims[-1]``, which stays correct for any input
    image size thanks to the global pooling.
    '''
    def __init__(self, config):
        super(Model, self).__init__()
        self.cfg = config
        channels = [self.cfg.in_channels] + list(self.cfg.filter_dims)
        layers = []
        for c_in, c_out in zip(channels[:-1], channels[1:]):
            # nn.Conv2d requires a kernel size; 3x3 with padding 1 keeps the
            # spatial size unchanged.
            layers.append(nn.Conv2d(c_in, c_out, kernel_size=3, padding=1))
            # Without a non-linearity the stacked convolutions would collapse
            # into a single linear map.
            layers.append(nn.ReLU(inplace=True))
        self.ConvBlock = nn.Sequential(*layers)
        # Collapses each feature map to one value -> (batch, channels, 1, 1).
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.linear = nn.Linear(channels[-1], self.cfg.num_classes)

    def forward(self, x):
        '''
        Args:
            x: tensor of shape (batch, in_channels, height, width)

        Returns:
            Tensor of shape (batch, num_classes) with class probabilities
            (each row sums to 1).
        '''
        out = self.ConvBlock(x)
        # nn.Linear needs a 2-D (batch, features) input.
        out = torch.flatten(self.pool(out), 1)
        # NOTE(review): softmax output suits single-label training; a loss
        # such as BCEWithLogitsLoss would need the raw logits instead.
        out = F.softmax(self.linear(out), dim=1)
        return out
        

In [None]:
# training loop
def train(x_train, y_train, train_transforms, cfg):
    """Train a classifier, validating with lwlrap after every epoch.

    The data is split 80/20 into training and validation parts. After each
    epoch the label-weighted label-ranking average precision (lwlrap) is
    computed on the validation part, and the model weights are written to
    'weight_best.pt' whenever that score improves.

    Args:
        x_train: training inputs, passed on to FATTrainDataset.
        y_train: label array of shape (num_samples, num_classes).
        train_transforms: transforms handed to both datasets.
        cfg: configuration object. ``num_epochs``, ``batch_size``,
            ``test_batch_size``, ``lr``, ``eta_min`` and ``t_max`` are read
            from it when present; missing attributes (or ``cfg=None``) fall
            back to the values this function used before.

    Returns:
        dict with 'best_epoch' (1-based, -1 if no epoch improved on 0) and
        'best_lwlrap'.

    NOTE(review): ``train_test_split``, ``SEED``, ``FATTrainDataset`` and
    ``Classifier`` are neither defined nor imported in the visible part of
    this notebook; they must be provided before this function can run.
    """
    num_epochs = getattr(cfg, 'num_epochs', 80)
    batch_size = getattr(cfg, 'batch_size', 64)
    test_batch_size = getattr(cfg, 'test_batch_size', 256)
    lr = getattr(cfg, 'lr', 3e-3)
    eta_min = getattr(cfg, 'eta_min', 1e-5)
    t_max = getattr(cfg, 't_max', 10)

    # Use the GPU when there is one instead of failing on CPU-only machines.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    num_classes = y_train.shape[1]

    x_trn, x_val, y_trn, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=SEED)

    train_dataset = FATTrainDataset(x_trn, y_trn, train_transforms)
    valid_dataset = FATTrainDataset(x_val, y_val, train_transforms)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=test_batch_size, shuffle=False)

    model = Classifier(num_classes=num_classes).to(device)
    criterion = nn.BCEWithLogitsLoss().to(device)
    # Only ``torch.optim as optim`` is imported, so both names are qualified.
    optimizer = optim.Adam(params=model.parameters(), lr=lr, amsgrad=False)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=t_max, eta_min=eta_min)

    best_epoch = -1
    best_lwlrap = 0.

    for epoch in range(num_epochs):
        start_time = time.time()
        model.train()
        avg_loss = 0.

        for x_batch, y_batch in train_loader:
            preds = model(x_batch.to(device))
            loss = criterion(preds, y_batch.to(device))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            avg_loss += loss.item() / len(train_loader)

        model.eval()
        valid_preds = np.zeros((len(x_val), num_classes))
        avg_val_loss = 0.

        # No gradients are needed for validation; skipping the autograd
        # graph saves memory and time.
        with torch.no_grad():
            for i, (x_batch, y_batch) in enumerate(valid_loader):
                preds = model(x_batch.to(device))
                loss = criterion(preds, y_batch.to(device))

                # The model outputs logits; convert to probabilities for scoring.
                preds = torch.sigmoid(preds)
                valid_preds[i * test_batch_size: (i+1) * test_batch_size] = preds.cpu().numpy()

                avg_val_loss += loss.item() / len(valid_loader)

        score, weight = calculate_per_class_lwlrap(y_val, valid_preds)
        lwlrap = (score * weight).sum()

        scheduler.step()

        if (epoch + 1) % 5 == 0:
            elapsed = time.time() - start_time
            print(f'Epoch {epoch+1} - avg_train_loss: {avg_loss:.4f}  avg_val_loss: {avg_val_loss:.4f}  val_lwlrap: {lwlrap:.6f}  time: {elapsed:.0f}s')

        if lwlrap > best_lwlrap:
            best_epoch = epoch + 1
            best_lwlrap = lwlrap
            torch.save(model.state_dict(), 'weight_best.pt')

    return {
        'best_epoch': best_epoch,
        'best_lwlrap': best_lwlrap,
    }


<h3>Driver</h3>

In [21]:
if __name__ == "__main__":
    # Directory with the .wav files. Set the AUDIO_TRAIN_DIR environment
    # variable to point elsewhere; the fallback is the original local path.
    train_file = os.environ.get(
        'AUDIO_TRAIN_DIR',
        r'C:\Users\Shubham\Desktop\Lottery Ticket Hypothesis\train_data')
    # The same directory is used for all three splits here.
    d = AudioRead(train_file, train_file, train_file)
    # load_data() stores the discovered paths on the instance, so the list
    # is read from d.train_names rather than unpacked from the return value.
    d.load_data()
    names = d.train_names
    # NOTE(review): the original intent of `l` is not visible; assumed to be
    # the number of files found - confirm.
    l = len(names)