# PyTorch Tutorial

## CLMR

In the following examples, we will be taking a look at how Contrastive Learning of Musical Representations (Spijkervet & Burgoyne, 2021) uses self-supervised learning to learn powerful representations for the downstream task of music tagging. 

<div align="center">
<img width="700" src="../images/janne/clmr_model.png"/>
</div>

In the above figure, we transform a single audio example into two, distinct augmented views by processing it through a set of stochastic audio augmentations.

In [3]:
!git clone https://github.com/spijkervet/clmr.git
!pip3 install clmr/

import sys
sys.path.append("clmr")

fatal: destination path 'clmr' already exists and is not an empty directory.
Processing ./clmr
Building wheels for collected packages: clmr
  Building wheel for clmr (setup.py) ... [?25ldone
[?25h  Created wheel for clmr: filename=clmr-0.1.0-py3-none-any.whl size=7258 sha256=11bd135a2d8a72ae95dca3dd62c0b59b639a64bbd879a4250d06dd3a01951213
  Stored in directory: /private/var/folders/5n/msbkkqhj2y9bhqwj2gvr70n40000gp/T/pip-ephem-wheel-cache-qq2_mgjk/wheels/47/96/99/4adc73d74f8b28040b7b1a2bbed5172c9ce04a9ba62645fc05
Successfully built clmr
Installing collected packages: clmr
  Attempting uninstall: clmr
    Found existing installation: clmr 0.1.0
    Uninstalling clmr-0.1.0:
      Successfully uninstalled clmr-0.1.0
Successfully installed clmr-0.1.0


In [None]:
import os
import random
import torch
import numpy as np
import soundfile as sf
from torch.utils import data
from torchaudio_augmentations import (
    RandomResizedCrop,
    RandomApply,
    PolarityInversion,
    Noise,
    Gain,
    HighLowPass,
    Delay,
    PitchShift,
    Reverb,
    Compose,
)


GTZAN_GENRES = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']


class GTZANDataset(data.Dataset):
    def __init__(self, data_path, split, num_samples, num_chunks, is_augmentation):
        self.data_path =  data_path if data_path else ''
        self.split = split
        self.num_samples = num_samples
        self.num_chunks = num_chunks
        self.is_augmentation = is_augmentation
        self.genres = GTZAN_GENRES
        self._get_song_list()
        if is_augmentation:
            self._get_augmentations()

    def _get_song_list(self):
        list_filename = os.path.join(self.data_path, '%s_filtered.txt' % self.split)
        with open(list_filename) as f:
            lines = f.readlines()
        self.song_list = [line.strip() for line in lines]

    def _get_augmentations(self):
        transforms = [
            RandomResizedCrop(n_samples=self.num_samples),
            RandomApply([PolarityInversion()], p=0.8),
            RandomApply([Noise(min_snr=0.3, max_snr=0.5)], p=0.3),
            RandomApply([Gain()], p=0.2),
            RandomApply([HighLowPass(sample_rate=22050)], p=0.8),
            RandomApply([Delay(sample_rate=22050)], p=0.5),
            RandomApply([PitchShift(n_samples=self.num_samples, sample_rate=22050)], p=0.4),
            RandomApply([Reverb(sample_rate=22050)], p=0.3),
        ]
        self.augmentation = Compose(transforms=transforms)

    def _adjust_audio_length(self, wav):
        if self.split == 'train':
            random_index = random.randint(0, len(wav) - self.num_samples - 1)
            wav = wav[random_index : random_index + self.num_samples]
        else:
            hop = (len(wav) - self.num_samples) // self.num_chunks
            wav = np.array([wav[i * hop : i * hop + self.num_samples] for i in range(self.num_chunks)])
        return wav

    def __getitem__(self, index):
        line = self.song_list[index]

        # get genre
        genre_name = line.split('/')[0]
        genre_index = self.genres.index(genre_name)

        # get audio
        audio_filename = os.path.join(self.data_path, 'genres', line)
        wav, fs = sf.read(audio_filename)

        # adjust audio length
        wav = self._adjust_audio_length(wav).astype('float32')

        # data augmentation
        if self.is_augmentation:
            wav = self.augmentation(torch.from_numpy(wav).unsqueeze(0)).squeeze(0).numpy()

        return wav, genre_index

    def __len__(self):
        return len(self.song_list)

def get_dataloader(data_path=None, 
                   split='train', 
                   num_samples=22050 * 29, 
                   num_chunks=1, 
                   batch_size=16, 
                   num_workers=0, 
                   is_augmentation=False):
    is_shuffle = True if (split == 'train') else False
    batch_size = batch_size if (split == 'train') else (batch_size // num_chunks)
    data_loader = data.DataLoader(dataset=GTZANDataset(data_path, 
                                                       split, 
                                                       num_samples, 
                                                       num_chunks, 
                                                       is_augmentation),
                                  batch_size=batch_size,
                                  shuffle=is_shuffle,
                                  drop_last=False,
                                  num_workers=num_workers)
    return data_loader



### Audio Data Augmentations
- Crop
- Filter
- Reverb
- Polarity
- Noise
- Pitch
- Gain
- Delay


In [7]:
import torchaudio
from torchaudio_augmentations import (
    RandomApply,
    ComposeMany,
    RandomResizedCrop,
    PolarityInversion,
    Noise,
    Gain,
    HighLowPass,
    Delay,
    PitchShift,
    Reverb,
)

Now, let's apply a series of transformations, each applied with an independent probability:

In [2]:
train_transform = [
    RandomResizedCrop(n_samples=args.audio_length),
    RandomApply([PolarityInversion()], p=args.transforms_polarity),
    RandomApply([Noise()], p=args.transforms_noise),
    RandomApply([Gain()], p=args.transforms_gain),
    RandomApply([HighLowPass(sample_rate=args.sample_rate)], p=args.transforms_filters),
    RandomApply([Delay(sample_rate=args.sample_rate)], p=args.transforms_delay),
    RandomApply([PitchShift(n_samples=args.audio_length, sample_rate=args.sample_rate)], p=args.transforms_pitch),
    RandomApply([Reverb(sample_rate=args.sample_rate)], p=args.transforms_reverb),
]
num_augmented_samples = 2

## SampleCNN Encoder

In [18]:
import torch.nn as nn

class SampleCNN(nn.Module):
    def __init__(self, strides, supervised, out_dim):
        super(SampleCNN, self).__init__()

        self.strides = strides
        self.supervised = supervised
        self.sequential = [
            nn.Sequential(
                nn.Conv1d(1, 128, kernel_size=3, stride=3, padding=0),
                nn.BatchNorm1d(128),
                nn.ReLU(),
            )
        ]

        self.hidden = [
            [128, 128],
            [128, 128],
            [128, 256],
            [256, 256],
            [256, 256],
            [256, 256],
            [256, 256],
            [256, 256],
            [256, 512],
        ]

        assert len(self.hidden) == len(
            self.strides
        ), "Number of hidden layers and strides are not equal"
        for stride, (h_in, h_out) in zip(self.strides, self.hidden):
            self.sequential.append(
                nn.Sequential(
                    nn.Conv1d(h_in, h_out, kernel_size=stride, stride=1, padding=1),
                    nn.BatchNorm1d(h_out),
                    nn.ReLU(),
                    nn.MaxPool1d(stride, stride=stride),
                )
            )

        # 1 x 512
        self.sequential.append(
            nn.Sequential(
                nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm1d(512),
                nn.ReLU(),
            )
        )

        self.sequential = nn.Sequential(*self.sequential)

        if self.supervised:
            self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(512, out_dim)
    
    def initialize(self, m):
        if isinstance(m, (nn.Conv1d)):
            # nn.init.xavier_uniform_(m.weight)
            # if m.bias is not None:
            #     nn.init.xavier_uniform_(m.bias)

            nn.init.kaiming_uniform_(m.weight, mode="fan_in", nonlinearity="relu")
            
    def forward(self, x):
        out = self.sequential(x)
        if self.supervised:
            out = self.dropout(out)

        out = out.reshape(x.shape[0], out.size(1) * out.size(2))
        logit = self.fc(out)
        return logit

## SimCLR

In [37]:
class SimCLR(nn.Module):

    def __init__(self, encoder, projection_dim, n_features):
        super(SimCLR, self).__init__()

        self.encoder = encoder
        self.n_features = n_features

        # Replace the fc layer with an Identity function
        self.encoder.fc = Identity()

        # We use a MLP with one hidden layer to obtain z_i = g(h_i) = W(2)σ(W(1)h_i) where σ is a ReLU non-linearity.
        self.projector = nn.Sequential(
            nn.Linear(self.n_features, self.n_features, bias=False),
            nn.ReLU(),
            nn.Linear(self.n_features, projection_dim, bias=False),
        )

    def forward(self, x_i, x_j):
        h_i = self.encoder(x_i)
        h_j = self.encoder(x_j)

        z_i = self.projector(h_i)
        z_j = self.projector(h_j)
        return h_i, h_j, z_i, z_j
    

class Identity(nn.Module):
    def __init__(self):
        super(Identity, self).__init__()

    def forward(self, x):
        return x

## Loss

Here, we apply an InfoNCE loss, as proposed by van den Oord et al. (2018) for contrastive learning. InfoNCE loss compares the similarity of our representations $z_i$ and $z_j$, to the similarity of $z_i$ to any other representation in our batch, and applies a softmax over the obtained similarity values. We can write this loss more formally as follows:

$$\ell_{i, j}=-\log \frac{\exp \left(\operatorname{sim}\left(z_{i}, z_{j}\right) / \tau\right)}{\sum_{k=1}^{2 N} \mathbb{1}_{[k \neq i]} \exp \left(\operatorname{sim}\left(z_{i}, z_{k}\right) / \tau\right)}
=-\operatorname{sim}\left(z_{i}, z_{j}\right) / \tau+\log \left[\sum_{k=1}^{2 N} \mathbb{1}_{[k \neq i]} \exp \left(\operatorname{sim}\left(z_{i}, z_{k}\right) / \tau\right)\right]$$


The similarity metric is the cosine similarity between our representations:

$$\operatorname{sim}\left(z_{i}, z_{j}\right)=\frac{z_{i}^{\top} \cdot z_{j}}{\left\|z_{i}\right\| \cdot\left\|z_{j}\right\|}$$


In [38]:
import torch
import torch.nn as nn
import torch.distributed as dist


class NT_Xent(nn.Module):
    def __init__(self, batch_size, temperature, world_size):
        super(NT_Xent, self).__init__()
        self.batch_size = batch_size
        self.temperature = temperature
        self.world_size = world_size

        self.mask = self.mask_correlated_samples(batch_size, world_size)
        self.criterion = nn.CrossEntropyLoss(reduction="sum")
        self.similarity_f = nn.CosineSimilarity(dim=2)

    def mask_correlated_samples(self, batch_size, world_size):
        N = 2 * batch_size * world_size
        mask = torch.ones((N, N), dtype=bool)
        mask = mask.fill_diagonal_(0)
        for i in range(batch_size * world_size):
            mask[i, batch_size * world_size + i] = 0
            mask[batch_size * world_size + i, i] = 0
        return mask

    def forward(self, z_i, z_j):
        """
        We do not sample negative examples explicitly.
        Instead, given a positive pair, similar to (Chen et al., 2017), we treat the other 2(N − 1) augmented examples within a minibatch as negative examples.
        """
        N = 2 * self.batch_size * self.world_size

        z = torch.cat((z_i, z_j), dim=0)

        sim = self.similarity_f(z.unsqueeze(1), z.unsqueeze(0)) / self.temperature

        sim_i_j = torch.diag(sim, self.batch_size * self.world_size)
        sim_j_i = torch.diag(sim, -self.batch_size * self.world_size)

        # We have 2N samples, but with Distributed training every GPU gets N examples too, resulting in: 2xNxN
        positive_samples = torch.cat((sim_i_j, sim_j_i), dim=0).reshape(N, 1)
        negative_samples = sim[self.mask].reshape(N, -1)

        labels = torch.zeros(N).to(positive_samples.device).long()
        logits = torch.cat((positive_samples, negative_samples), dim=1)
        loss = self.criterion(logits, labels)
        loss /= N
        return loss

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix
from clmr.models import SampleCNN
from clmr.utils import (
    load_encoder_checkpoint,
    load_finetuner_checkpoint,
)

batch_size = 48
n_classes = 50
encoder = SampleCNN(
    strides=[3, 3, 3, 3, 3, 3, 3, 3, 3],
    supervised=False,
    out_dim=n_classes,
)

n_features = (
    encoder.fc.in_features
)  # get dimensions of last fully-connected layer

model = SimCLR(encoder, projection_dim=64, n_features=n_features)

temperature = 0.5
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
criterion = NT_Xent(batch_size, temperature, world_size=1)

epochs = 100
for e in range(epochs):
    optimizer.zero_grad()
    
    # for x, _ in train_dataloader:
    x_i = torch.randn(batch_size, 1, 59049)
    x_j = torch.randn(batch_size, 1, 59049)
    
    h_i, h_j, z_i, z_j = model(x_i, x_j)
    loss = criterion(z_i, z_j)
    
    loss.backward()
    optimizer.step()
    
    print(f"Loss: {loss}")

## Linear Evaluation

In [55]:
class LinearModel(nn.Module):

    def __init__(self, hidden_dim, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.model = nn.Linear(self.hidden_dim, self.output_dim)

    def forward(self, x):
        return self.model(x)

In [67]:


linear_model = LinearModel(n_features, n_classes)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    linear_model.parameters(),
    lr=3e-4,
)

# freeze SampleCNN encoder weights
for param in encoder.parameters():
    param.requires_grad = False


linear_epochs = 10
for e in range(linear_epochs):
    optimizer.zero_grad()
    
    x = torch.randn(batch_size, 1, 59049)
    y = torch.randn(batch_size).float()
    h = encoder(x)
    
    p = linear_model(h)
    
    loss = criterion(p, y)
        
    loss.backward()
    optimizer.step()
    break

RuntimeError: expected scalar type Long but found Float

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix


accuracy = accuracy_score(y_true, y_pred)
cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, xticklabels=GTZAN_GENRES, yticklabels=GTZAN_GENRES, cmap='YlGnBu')
print('Accuracy: %.4f' % accuracy)

## Load CLMR pre-training weights to encoder

Pre-training the encoder takes a while to complete, so let's load the pre-trained weights into our encoder now to speed this up:

In [21]:
from clmr.models import SampleCNN
from clmr.utils import (
    load_encoder_checkpoint,
    load_finetuner_checkpoint,
)


n_classes = 50
ENCODER_CHECKPOINT_PATH = "checkpoint.pt"

encoder = SampleCNN(
    strides=[3, 3, 3, 3, 3, 3, 3, 3, 3],
    supervised=False,
    out_dim=n_classes,
)

# get dimensions of last fully-connected layer
n_features = encoder.fc.in_features

# load the enoder weights from a CLMR checkpoint
# set the last fc layer to the Identity function, since we attach the
# fine-tune head seperately

# state_dict = load_encoder_checkpoint(ENCODER_CHECKPOINT_PATH)
# encoder.load_state_dict(state_dict)
# encoder.fc = Identity()

## Get ROC-AUC and PR-AUC scores on test set

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix


accuracy = accuracy_score(y_true, y_pred)
cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, xticklabels=GTZAN_GENRES, yticklabels=GTZAN_GENRES, cmap='YlGnBu')
print('Accuracy: %.4f' % accuracy)