In [1]:
import os
import pickle

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import librosa.display
from sklearn.metrics import classification_report
import torch
from torch import nn
from torch import optim
from torch.nn.utils.rnn import pad_sequence, pad_packed_sequence, pack_padded_sequence, PackedSequence
from torch.utils.data import Dataset, Subset, DataLoader, random_split

RANDOM_STATE = 42
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [2]:
# Maps each genre name found in the labels files to the class name used for
# training. Several source genres are merged into one target class, and a
# value of None marks a genre whose samples are dropped entirely
# (see SpectrogramDataset.get_file_labels, which skips None labels).
class_mapping = {
    'Rock': 'Rock',
    'Psych-Rock': 'Rock',
    'Indie-Rock': None,
    'Post-Rock': 'Rock',
    'Psych-Folk': 'Folk',
    'Folk': 'Folk',
    'Metal': 'Metal',
    'Punk': 'Metal',
    'Post-Punk': None,
    'Trip-Hop': 'Trip-Hop',
    'Pop': 'Pop',
    'Electronic': 'Electronic',
    'Hip-Hop': 'Hip-Hop',
    'Classical': 'Classical',
    'Blues': 'Blues',
    'Chiptune': 'Electronic',
    'Jazz': 'Jazz',
    'Soundtrack': None,
    'International': None,
    'Old-Time': None
}


def read_fused_spectrogram(spectrogram_file):
    """Load a saved ``.npy`` spectrogram and return the whole array transposed.

    Args:
        spectrogram_file: Path (or file object) accepted by ``np.load``.

    Returns:
        The stored array with its axes reversed.
    """
    return np.transpose(np.load(spectrogram_file))

def read_mel_spectrogram(spectrogram_file):
    """Load a saved ``.npy`` array and return its first 128 rows, transposed.

    The first 128 rows are presumably the mel bands of the fused
    representation (the function name suggests so; not verifiable from here).

    Args:
        spectrogram_file: Path (or file object) accepted by ``np.load``.

    Returns:
        ``stored[:128]`` with its axes reversed.
    """
    fused = np.load(spectrogram_file)
    mel_rows = fused[:128]
    return np.transpose(mel_rows)

def read_chromagram(spectrogram_file):
    """Load a saved ``.npy`` array and return the rows from index 128 on, transposed.

    Those rows are presumably the chroma bins of the fused representation
    (the function name suggests so; not verifiable from here).

    Args:
        spectrogram_file: Path (or file object) accepted by ``np.load``.

    Returns:
        ``stored[128:]`` with its axes reversed.
    """
    fused = np.load(spectrogram_file)
    chroma_rows = fused[128:]
    return np.transpose(chroma_rows)

# I will implement the DataSets differently, so that they load the data on demand,
# instead of preloading everything and filling precious memory.
# Also, perform padding on batch creation and split our datasets using torch's random_split.

class SpectrogramDataset(Dataset):
    """Lazily loaded spectrogram dataset: files are read in ``__getitem__``.

    Expected layout under ``path``:
      * ``train/`` or ``test/``: directory holding the ``.npy`` files.
      * ``train_labels.txt`` or ``test_labels.txt``: tab-separated file with
        one header line followed by ``<file name>\\t<label>`` rows.

    Attributes:
        data_files: numpy array with the ``.npy`` file name of every kept sample.
        labels_str: numpy array of class names; position ``i`` names class ``i``.
        labels: numpy integer array, the class index of every kept sample.
    """

    def __init__(self, path, read_spec_fn, class_mapping, train=True, classes=None):
        """Index the label file; no spectrogram is read here.

        Args:
            path: Root directory of the dataset (see the class docstring).
            read_spec_fn: Callable mapping a file path to a numpy array whose
                first axis is the sequence (time) axis.
            class_mapping: Dict from raw label to target label, or ``None`` to
                keep the raw labels. Samples mapped to ``None`` are dropped.
            train: Selects the ``train`` split when true, else ``test``.
            classes: Optional ordered collection of class names that fixes the
                label encoding. Without it the encoding comes from the labels
                present in *this* split, so two splits that do not contain
                exactly the same classes get different indices. Pass e.g.
                ``train_dataset.labels_str`` when building the test set to
                share one encoding.

        Raises:
            ValueError: If ``classes`` is given and a sample has a label that
                is not listed in it.
        """
        self.class_mapping = class_mapping
        self.read_spec_fn = read_spec_fn
        t = 'train' if train else 'test'
        self.data_dir = os.path.join(path, t)
        self.labels_file = os.path.join(path, f'{t}_labels.txt')
        data_files, labels_str = self.get_file_labels()
        self.data_files = np.array(data_files)
        if classes is None:
            # Sorted unique labels of this split; inverse gives per-sample indices.
            self.labels_str, self.labels = np.unique(labels_str, return_inverse=True)
        else:
            self.labels_str = np.asarray(classes)
            index_of = {name: i for i, name in enumerate(self.labels_str.tolist())}
            unknown = sorted({name for name in labels_str if name not in index_of})
            if unknown:
                raise ValueError(f'labels not present in `classes`: {unknown}')
            self.labels = np.array([index_of[name] for name in labels_str], dtype=np.int64)

    def get_file_labels(self):
        """Parse the labels file.

        Returns:
            ``(data_files, labels)``: two parallel lists with the ``.npy`` file
            name and the (mapped) string label of every kept sample.
        """
        data_files = []
        labels = []
        with open(self.labels_file) as f:
            next(f)  # Skip the headers
            for line in f:
                line = line.rstrip()
                if not line:
                    # Tolerate blank lines (e.g. a trailing newline at EOF).
                    continue
                t, label = line.split('\t')
                if self.class_mapping is not None:
                    label = self.class_mapping[label]
                if label is None:
                    continue
                # Keep only the part before the first dot; works for names
                # without any dot as well.
                stem = t.split('.', 1)[0]
                data_files.append(f'{stem}.fused.full.npy')
                labels.append(label)
        return data_files, labels

    def __len__(self):
        """Number of samples kept after applying the class mapping."""
        return len(self.labels)

    def __getitem__(self, index):
        """Read one sample from disk.

        Returns:
            ``(x, y, length)``: ``x`` is the float32 tensor produced from
            ``read_spec_fn``, ``y`` a one-element long tensor with the class
            index and ``length`` a one-element long tensor with ``len(x)``.
        """
        x = self.read_spec_fn(os.path.join(self.data_dir, self.data_files[index]))
        y = int(self.labels[index])
        return (torch.as_tensor(x, dtype=torch.float32),
                torch.tensor([y], dtype=torch.long),
                torch.tensor([len(x)], dtype=torch.long))


def split_dataset(dataset, train_size, seed=RANDOM_STATE):
    """Randomly split ``dataset`` into a training part and a validation part.

    Args:
        dataset: Any sized dataset accepted by ``torch.utils.data.random_split``.
        train_size: Fraction of the samples assigned to the training part;
            the count is ``int(train_size * len(dataset))``.
        seed: Seed for the generator driving the split. With ``None`` the
            generator is left unseeded.

    Returns:
        ``(train_subset, val_subset)``.
    """
    total = len(dataset)
    train_count = int(train_size * total)
    val_count = total - train_count

    rng = torch.Generator()
    if seed is not None:
        rng.manual_seed(seed)

    train_part, val_part = random_split(dataset, [train_count, val_count], rng)
    return train_part, val_part


def collate_fn(batch):
    """Turn a list of ``(sequence, label, length)`` samples into one batch.

    Sequences are zero-padded along their first axis to the longest one in
    the batch (batch-first layout); labels and lengths are gathered into
    long tensors.

    Returns:
        ``(padded_sequences, labels, lengths)``.
    """
    seqs, labels, lengths = [], [], []
    for seq, label, length in batch:
        seqs.append(seq)
        labels.append(label)
        lengths.append(length)

    padded = pad_sequence(seqs, batch_first=True)
    return padded, torch.LongTensor(labels), torch.LongTensor(lengths)


def plot_spectograms(spec1, spec2, title1=None, title2=None, suptitle=None, cmap='viridis'):
    """Draw two spectrograms stacked vertically in one figure.

    Args:
        spec1: Array shown in the upper panel.
        spec2: Array shown in the lower panel.
        title1: Title of the upper panel.
        title2: Title of the lower panel.
        suptitle: Title of the whole figure.
        cmap: Colormap used for both panels.

    Note:
        The single colorbar is built from the upper panel's image only.
    """
    fig, axs = plt.subplots(2, figsize=(9, 12))
    upper_ax, lower_ax = axs

    upper_img = librosa.display.specshow(spec1, ax=upper_ax, cmap=cmap)
    upper_ax.set_title(title1)

    librosa.display.specshow(spec2, ax=lower_ax, cmap=cmap)
    lower_ax.set_title(title2)

    fig.colorbar(upper_img, ax=axs)
    fig.suptitle(suptitle)

In [3]:
# Prepare all datasets and loaders
raw_path = '/kaggle/input/patreco3-multitask-affective-music/data/fma_genre_spectrograms'
beat_path = '/kaggle/input/patreco3-multitask-affective-music/data/fma_genre_spectrograms_beat'


def _build_splits(path, read_spec_fn, train_size=0.8):
    """Build the four datasets used for one (data source, feature type) pair.

    Returns:
        ``(train_full, train, val, test)``: the complete training dataset,
        its train/validation split and the test dataset.
    """
    train_full = SpectrogramDataset(path, read_spec_fn=read_spec_fn, train=True,
                                    class_mapping=class_mapping)
    train, val = split_dataset(train_full, train_size=train_size)
    test = SpectrogramDataset(path, read_spec_fn=read_spec_fn, train=False,
                              class_mapping=class_mapping)
    return train_full, train, val, test


# Raw (not beat-synced) spectrograms
(fused_raw_train_full, fused_raw_train,
 fused_raw_val, fused_raw_test) = _build_splits(raw_path, read_fused_spectrogram)
(mel_raw_train_full, mel_raw_train,
 mel_raw_val, mel_raw_test) = _build_splits(raw_path, read_mel_spectrogram)
(chroma_raw_train_full, chroma_raw_train,
 chroma_raw_val, chroma_raw_test) = _build_splits(raw_path, read_chromagram)

# Beat-synced spectrograms
(fused_beat_train_full, fused_beat_train,
 fused_beat_val, fused_beat_test) = _build_splits(beat_path, read_fused_spectrogram)
(mel_beat_train_full, mel_beat_train,
 mel_beat_val, mel_beat_test) = _build_splits(beat_path, read_mel_spectrogram)
(chroma_beat_train_full, chroma_beat_train,
 chroma_beat_val, chroma_beat_test) = _build_splits(beat_path, read_chromagram)

# Label encoding of the full training set; used by the cells below.
labels = mel_raw_train_full.labels
labels_str = mel_raw_train_full.labels_str

In our example we chose Electronic music vs. Classical music.
We see that the Electronic sample is more tightly structured in a discrete manner, while the Classical sample is more fluid and continuous,
and this holds both for the mel spectrogram and the chromagram.
Also, from the mel spectrograms we see that the Electronic sample has harmonics over the entire frequency range,
while the Classical sample does not. Finally, notice the regular vertical lines in the Electronic sample,
which are the result of a regular rhythm.

As we see, size of each raw sample is above 150,000 which is almost impossible to use for training on normal machines.
On the other hand beat-synced samples have size of roughly 750, which is definitely something we can work with.

In [4]:
def step_0_1_2_3():
    """Plot one Electronic and one Classical sample for every feature type.

    For each of (mel, chroma) x (raw, beat-synced) the shape of the first
    sample is printed and both samples are drawn with ``plot_spectograms``.
    """
    label1_str = 'Electronic'
    label2_str = 'Classical'
    label1 = labels_str.tolist().index(label1_str)
    label2 = labels_str.tolist().index(label2_str)
    # Position of the first sample of each class in the FULL training set.
    index1 = labels.tolist().index(label1)
    index2 = labels.tolist().index(label2)

    # index1/index2 are positions in the full training datasets, so every
    # dataset below must be a *_train_full one. Indexing a random_split
    # Subset with them would return unrelated (shuffled) samples.
    for dataset, spec_type, transform in zip(
            (mel_raw_train_full, chroma_raw_train_full, mel_beat_train_full, chroma_beat_train_full),
            ('Mel frequencies', 'Chromagrams')*2,
            ('Raw',)*2 + ('Beat-Synced',)*2
        ):
        spec1 = dataset[index1][0].numpy()
        spec2 = dataset[index2][0].numpy()
        print(f'{spec_type} ({transform}) shape: {spec1.shape}')
        plot_spectograms(spec1.T, spec2.T, label1_str, label2_str, f'{spec_type} ({transform})')


step_0_1_2_3()

As noted earlier I implemented the Datasets differently.
Below I answer the questions asked in the original implementation.

* QUESTION: Comment on how the train and validation splits are created.

  ANSWER: We read the data in arrays, create an array of the indices,
  we shuffle the indices, and then we split them.

* QUESTION: It's useful to set the seed when debugging but when experimenting ALWAYS set seed=None. Why?

  ANSWER: Because we would always be training and validating on the same data,
  which could make the model learn properties specific to that split
  and which aren't properties of the entire set.

* QUESTION: Comment on why padding is needed

  ANSWER: Because PyTorch doesn't support ragged tensors.



In [5]:
def step_4():
    """Show label histograms before and after applying ``class_mapping``."""
    # A dataset built without the class mapping exposes the original labels.
    # Building it is cheap because the dataset only indexes the label file.
    unmapped = SpectrogramDataset(raw_path, read_spec_fn=read_mel_spectrogram,
                                  train=True, class_mapping=None)

    panels = (
        (unmapped.labels_str, unmapped.labels, 'Original Labels'),
        (labels_str, labels, 'Transformed Labels'),
    )

    fig, axs = plt.subplots(ncols=2, figsize=(12, 8))
    for ax, (names, indices, title) in zip(axs, panels):
        sns.histplot(names[indices], bins=len(names), ax=ax)
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        ax.set_title(title)


step_4()

In [6]:
class CustomLSTM(nn.Module):
    """Single-layer (optionally bidirectional) LSTM followed by dropout and a
    linear classification layer.

    The sequence is summarised by the final hidden state of each direction:
    the forward state after the last valid time step and, when bidirectional,
    the backward state after it has read the whole sequence (i.e. at time
    step 0). Both are concatenated before dropout and the linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size,
                 bidirectional=False, dropout=0.
                 ):
        """
        Args:
            input_size: Number of features per time step.
            hidden_size: Hidden size of the LSTM (per direction).
            output_size: Number of output logits.
            bidirectional: Whether the LSTM runs in both directions.
            dropout: Dropout probability applied to the sequence summary.
        """
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size,
                            bidirectional=bidirectional, batch_first=True)
        self.linear = nn.Linear(hidden_size * (bidirectional + 1), output_size)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, lengths):
        """Compute the logits for a batch of sequences.

        Args:
            x: Either a ``PackedSequence`` or a zero-padded batch-first tensor
                of shape ``(batch, time, input_size)``.
            lengths: Valid length of every sequence. Only used to pack ``x``
                when it is passed as a padded tensor.

        Returns:
            Tensor of shape ``(batch, output_size)``; the batch dimension is
            kept even for a batch of one.
        """
        if not isinstance(x, PackedSequence):
            # Pack so that padding never reaches the LSTM; otherwise the
            # backward direction would start from the padded frames.
            # pack_padded_sequence requires the lengths on the CPU.
            cpu_lengths = torch.as_tensor(lengths).reshape(-1).cpu()
            x = pack_padded_sequence(x, cpu_lengths, batch_first=True,
                                     enforce_sorted=False)

        # h_n has shape (num_directions, batch, hidden_size) and holds, per
        # direction, the hidden state after the last step that direction
        # processed, already restored to the original batch order.
        _, (h_n, _) = self.lstm(x)
        batch_size = h_n.shape[1]
        # -> (batch, num_directions * hidden_size), laid out [forward, backward]
        summary = h_n.transpose(0, 1).reshape(batch_size, -1)

        dropout_out = self.dropout(summary)
        linear_out = self.linear(dropout_out)
        return linear_out


def train_loop(dataloader, model, loss_fn, optimizer, device=DEVICE):
    """Run one training epoch.

    Args:
        dataloader: Yields ``(padded_inputs, targets, lengths)`` batches.
        model: Module called as ``model(packed_inputs, lengths)``.
        loss_fn: Callable computing the loss from predictions and targets.
        optimizer: Optimizer updating the model parameters.
        device: Device the inputs and targets are moved to.

    Returns:
        The mean of the per-batch losses over the epoch.
    """
    model.train()
    n_batches = len(dataloader)
    batch_losses = []

    for inputs, targets, lengths in dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        packed = pack_padded_sequence(inputs, lengths, enforce_sorted=False, batch_first=True)

        # Forward pass
        loss = loss_fn(model(packed, lengths), targets)
        batch_losses.append(loss.item())

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return sum(batch_losses) / n_batches


def test_loop(dataloader, model, loss_fn, device=DEVICE):
    """Evaluate ``model`` on every batch of ``dataloader``.

    Loss and accuracy are averaged over samples, not over batches, so a
    smaller final batch does not get a disproportionate weight.

    Args:
        dataloader: Yields ``(padded_inputs, targets, lengths)`` batches.
        model: Module called as ``model(packed_inputs, lengths)``; it is put
            in evaluation mode.
        loss_fn: Loss callable. The per-sample weighting assumes it returns
            the batch mean (the default reduction of ``nn.CrossEntropyLoss``).
        device: Device the inputs and targets are moved to.

    Returns:
        ``(mean_loss, accuracy)`` over all evaluated samples.
    """
    model.eval()
    total_loss = 0.
    n_correct = 0
    n_samples = 0

    with torch.inference_mode():
        for x, y, lengths in dataloader:
            x, y = x.to(device), y.to(device)
            x = pack_padded_sequence(x, lengths, enforce_sorted=False, batch_first=True)
            logits = model(x, lengths)
            batch_size = y.shape[0]
            # Undo the batch mean so the final division is per sample.
            total_loss += loss_fn(logits, y).item() * batch_size
            preds = torch.argmax(logits, 1)
            n_correct += (preds == y).sum().item()
            n_samples += batch_size

    test_loss = total_loss / n_samples
    test_accuracy = n_correct / n_samples
    return test_loss, test_accuracy


def train_eval(model, train_dataset, val_dataset, batch_size,epochs,
               lr=1e-3, l2=1e-2, patience=5, tolerance=1e-3,
               save_path='best-model.pth', overfit_batch=False,
               ):
    """Train ``model`` with Adam, validating and checkpointing every epoch.

    Args:
        model: Module to train (already on the target device).
        train_dataset: Dataset yielding ``(sequence, label, length)`` samples.
        val_dataset: Dataset used for validation.
        batch_size: Batch size for both loaders.
        epochs: Maximum number of epochs.
        lr: Learning rate of the Adam optimizer.
        l2: Weight decay of the Adam optimizer.
        patience: Number of tolerated epochs without sufficient improvement;
            training stops on the next such epoch.
        tolerance: Minimum decrease of the validation loss, relative to the
            previous epoch, that counts as an improvement.
        save_path: File the whole model is saved to (``torch.save(model, ...)``)
            whenever the validation loss reaches a new minimum.
        overfit_batch: Debug mode: train on a single fixed batch for more
            epochs (at most 200), skipping validation, checkpointing, early
            stopping and the learning-rate scheduler.

    Returns:
        ``(train_losses, val_losses, val_accuracies)``: one entry per epoch
        that was run, including the epoch on which early stopping triggered.
        The two validation lists are empty in overfit-batch mode.
    """
    if overfit_batch:
        k = 1
        # Create a subset of the dataset of size k*batch_size and use this instead.
        # Never ask for more samples than exist: sampling without replacement
        # would fail on a dataset smaller than k*batch_size.
        rng = np.random.default_rng(seed=RANDOM_STATE)
        subset_size = min(k*batch_size, len(train_dataset))
        indices = rng.choice(np.arange(len(train_dataset)), size=subset_size, replace=False)
        train_dataset = Subset(train_dataset, indices)
        # Increase the number of epochs appropriately
        # total = epochs * len(dataset)
        #       = epochs * n_batches * batch_size
        #       = epochs * n_batches * k * (batch_size/k)
        # Thus, to keep roughly same total we do:
        epochs *= (batch_size // k) + 1
        # But we will use at most 200 epochs
        epochs = min(epochs, 200)
        print(f'Overfit Batch mode. The dataset now comprises of only {k} Batches. '
              f'Epochs increased to {epochs}.')

    train_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn,
                              pin_memory=True, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, collate_fn=collate_fn,
                            pin_memory=True)

    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=l2)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
    loss_fn = nn.CrossEntropyLoss()

    train_losses = []
    val_losses = []
    val_accuracies = []

    best_val_loss = float('+infinity')
    waiting = 0

    for t in range(epochs):
        # Train
        print(f'----EPOCH {t}----')
        train_loss = train_loop(train_loader, model, loss_fn, optimizer)
        print(f'Train Loss: {train_loss}')
        train_losses.append(train_loss)

        # Validating is not useful in overfit_batch mode.
        # We also won't use the scheduler in overfit_batch mode
        # because the epoch numbers become too large.
        if overfit_batch:
            print()
            continue

        val_loss, val_accuracy = test_loop(val_loader, model, loss_fn)
        print(f'Val Loss: {val_loss}')
        print(f'Val Accuracy: {val_accuracy}')

        # Record the metrics before any early exit so that the returned
        # histories also cover the epoch that triggers early stopping.
        previous_val_loss = val_losses[-1] if val_losses else None
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        # Save the best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model, save_path)
            print('Saving')

        # Early Stopping: count epochs whose loss did not drop by at least
        # `tolerance` compared with the previous epoch.
        if previous_val_loss is not None and previous_val_loss - val_loss < tolerance:
            if waiting == patience:
                print('Early Stopping')
                break
            waiting += 1
            print(f'waiting = {waiting}')
        else:
            waiting = 0

        scheduler.step()
        print()

    return train_losses, val_losses, val_accuracies

In [7]:
# The parameters in the following were chosen so that they work well with overfit_batch=True

def _train_and_save(name, train_dataset, val_dataset, hidden_size, dropout,
                    batch_size, epochs, overfit_batch):
    """Train a bidirectional ``CustomLSTM`` and persist its results.

    The best model is written to ``best-{name}.pth`` by ``train_eval``. Unless
    ``overfit_batch`` is set, the tuple returned by ``train_eval`` is pickled
    to ``losses-{name}.pkl``.
    """
    # Feature dimension is read from the first sample: (time, features).
    input_dim = train_dataset[0][0].shape[1]
    output_dim = len(labels_str)
    model = CustomLSTM(input_dim, hidden_size, output_dim,
                       bidirectional=True, dropout=dropout).to(DEVICE)

    losses = train_eval(model, train_dataset, val_dataset,
                        batch_size=batch_size, epochs=epochs, lr=1e-3,
                        overfit_batch=overfit_batch, save_path=f'best-{name}.pth')
    if not overfit_batch:
        with open(f'losses-{name}.pkl', 'wb') as f:
            pickle.dump(losses, f)


def train_mel_raw(overfit_batch=False):
    """Train on the raw mel spectrograms (hidden 512, dropout 0.2, batch 128, 50 epochs)."""
    _train_and_save('mel-raw', mel_raw_train, mel_raw_val,
                    hidden_size=512, dropout=0.2, batch_size=128, epochs=50,
                    overfit_batch=overfit_batch)


def train_mel_beat(overfit_batch=False):
    """Train on the beat-synced mel spectrograms (hidden 256, dropout 0.1, batch 512, 200 epochs)."""
    _train_and_save('mel-beat', mel_beat_train, mel_beat_val,
                    hidden_size=256, dropout=0.1, batch_size=512, epochs=200,
                    overfit_batch=overfit_batch)


def train_chroma_raw(overfit_batch=False):
    """Train on the raw chromagrams (hidden 128, dropout 0.1, batch 256, 50 epochs)."""
    _train_and_save('chroma-raw', chroma_raw_train, chroma_raw_val,
                    hidden_size=128, dropout=0.1, batch_size=256, epochs=50,
                    overfit_batch=overfit_batch)


def train_fused_raw(overfit_batch=False):
    """Train on the raw fused spectrograms (hidden 512, dropout 0.2, batch 128, 50 epochs)."""
    _train_and_save('fused-raw', fused_raw_train, fused_raw_val,
                    hidden_size=512, dropout=0.2, batch_size=128, epochs=50,
                    overfit_batch=overfit_batch)


def train_fused_beat(overfit_batch=False):
    """Train on the beat-synced fused spectrograms (hidden 256, dropout 0.1, batch 512, 200 epochs)."""
    _train_and_save('fused-beat', fused_beat_train, fused_beat_val,
                    hidden_size=256, dropout=0.1, batch_size=512, epochs=200,
                    overfit_batch=overfit_batch)

In [8]:
def predict(test_dataset, model, batch_size=128, device=DEVICE):
    """Predict the class index of every sample in ``test_dataset``.

    Args:
        test_dataset: Dataset yielding ``(sequence, label, length)`` samples.
        model: Module called as ``model(packed_inputs, lengths)``.
        batch_size: Batch size used for inference.
        device: Device the inputs are moved to.

    Returns:
        1-D CPU long tensor of predicted class indices, in dataset order.
    """
    test_loader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=collate_fn,
                             pin_memory=True)
    batch_preds = []
    # Inference must run in eval mode (dropout disabled); the caller's mode
    # is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.inference_mode():
            for x, _, lengths in test_loader:
                # Pack exactly as in train_loop/test_loop so padding never
                # reaches the LSTM.
                x = pack_padded_sequence(x.to(device), lengths,
                                         enforce_sorted=False, batch_first=True)
                logits = model(x, lengths)
                batch_preds.append(torch.argmax(logits, 1).cpu())
    finally:
        model.train(was_training)

    if not batch_preds:
        # torch.cat rejects an empty list; an empty dataset has no predictions.
        return torch.empty(0, dtype=torch.long)
    return torch.cat(batch_preds, 0)


def report(model, test_dataset):
    """Print a classification report of ``model`` on ``test_dataset``.

    The ground truth is ``test_dataset.labels`` and the predictions come from
    ``predict``. Metrics of classes without predicted samples are reported as 0.
    """
    predictions = predict(test_dataset, model)
    summary = classification_report(test_dataset.labels, predictions, zero_division=0)
    print(summary)
    

def plot_learning_curves(path):
    """Plot the training and validation loss curves stored in a pickle file.

    Args:
        path: Pickle file holding a ``(train_losses, val_losses, <third item>)``
            tuple; the third item is ignored. The text before the first dot of
            ``path`` is used in the figure title.

    Note:
        ``pickle.load`` can execute arbitrary code; only open files you created.
    """
    with open(path, 'rb') as handle:
        history = pickle.load(handle)
    train_losses, val_losses, _ = history

    name = path.partition('.')[0]
    curves = (
        (train_losses, 'Training Loss'),
        (val_losses, 'Validation Loss'),
    )

    fig, ax = plt.subplots(figsize=(9, 9))
    for values, legend_label in curves:
        ax.plot(values, label=legend_label)
    ax.set_title(f'Learning Curves for {name}')
    ax.legend()

In [9]:
def step_5_6():
    """Train all five models, reload the best checkpoints and report on the test sets."""
    trainers = (train_mel_raw, train_mel_beat, train_chroma_raw,
                train_fused_raw, train_fused_beat)
    for trainer in trainers:
        trainer(overfit_batch=False)

    # (heading, checkpoint written by the matching trainer, test dataset)
    evaluations = (
        ('Mel raw', 'best-mel-raw.pth', mel_raw_test),
        ('Mel beat-sync', 'best-mel-beat.pth', mel_beat_test),
        ('Chroma raw', 'best-chroma-raw.pth', chroma_raw_test),
        ('Fused raw', 'best-fused-raw.pth', fused_raw_test),
        ('Fused beat', 'best-fused-beat.pth', fused_beat_test),
    )

    # Load every checkpoint before printing the first report.
    models = [torch.load(checkpoint) for _, checkpoint, _ in evaluations]

    for position, ((heading, _, test_set), model) in enumerate(zip(evaluations, models)):
        if position > 0:
            # Blank separator between consecutive reports.
            print('\n\n')
        print(heading)
        report(model, test_set)


step_5_6()

In [13]:
# Learning curves of the beat-synced fused model; the pickle read here is
# written by train_fused_beat when it runs with overfit_batch=False.
plot_learning_curves('losses-fused-beat.pkl')

To calculate the precision, recall and F1 of a class C, we view our problem as a binary classification problem, with C being the positive class and all the other classes comprising the negative class.
Then we have,
* **Precision**: True Positives / (True Positives + False Positives). 

  Precision is the ability of the classifier not to label as positive a sample that is negative.
* **Recall**: True Positives / (True Positives + False Negatives). 

  Recall is the ability of the classifier to find all the positive samples.
* **F1**: True Positives / (True Positives + (False Positives + False Negatives)/2). 

  F1 is a mix of recall and precision. Instead of picking either False Positives or False Negatives, we just pick their average.

When we consider all the classes we have
* **Accuracy**: correct_predictions / predictions

and then we can generalize the inherently binary metrics Precision, Recall and F1, by averaging them in one of the following ways:

* **Macro**: Simply taking the mean of the results
* **Weighted**: Take the *weighted* mean of the results using class frequencies as weights.
* **Micro**: We consider all the True Positives, all the False Positives and all the False Negatives of all classes, and we use their sums to calculate Precision, Recall and F1. Note that micro F1 is the same as Accuracy (if each sample belongs to exactly one class), since (False Positives + False Negatives) / 2 = number of misclassified samples (a false negative for one class is a false positive for another, meaning we double-count).


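In our case, all classes are equally important, so the macro average, which gives every class the same weight regardless of its frequency, is the appropriate summary; the weighted average is preferable only when a class should count in proportion to its frequency. The micro average is also a good global metric in our case.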

The F1 score is low when either the precision or the recall is low. F1 differs from Accuracy mainly when the classes are imbalanced. For example, consider that we have 90 negative samples and 10 positive ones, and a classifier which predicts everything correctly except for 10 negative samples, which it labels as positive. In that case TP=10, FP=10, FN=0 and we have Accuracy = 9/10 and F1 = 2/3.

Micro F1 vs Macro F1 is essentially a generalization of Accuracy vs F1 to the multiclass setting. Micro F1 is equal to Accuracy (if each sample belongs to exactly one class), while Macro F1 averages the F1 scores of the individual classes, giving every class equal weight. This means that a heavily underrepresented class influences the macro F1 as much as any other class, but has very little influence on the micro F1.

Different problems require different metrics.
* We need high recall for problems where catching every positive is important, for example, identifying a severe disease, even at the cost of producing many false positives.
* We need high precision for problems where false positives are costly. For example, we don't want to classify an important email as spam, even at the cost of letting some spam reach the inbox.