# Pytorch Approach

## Dataset Creation

In [110]:
import torch
from torchvision.datasets import DatasetFolder
from torchvision.transforms import Compose
import torchaudio
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
from torchvision import  models



In [111]:
if torch.backends.mps.is_available():
        device = "mps"

else:
    device = "cpu"
print(f"Using device {device}")

Using device mps


In [112]:
import os

def list_subdirectories(directory_path):

    subdirectories = []
    for item in os.listdir(directory_path):
        if os.path.isdir(os.path.join(directory_path, item)):
            subdirectories.append(item)

    return subdirectories

directory_path = "./songs"
genres = list_subdirectories(directory_path)
print(genres)

['psytrance', 'house', 'dupstep', 'hardcore_breaks', 'techno']


In [113]:
label2id, id2label = dict(), dict()

for i, label in enumerate(genres):
    label2id[label] = i
    id2label[i] = label

In [114]:
label2id,id2label

({'psytrance': 0, 'house': 1, 'dupstep': 2, 'hardcore_breaks': 3, 'techno': 4},
 {0: 'psytrance', 1: 'house', 2: 'dupstep', 3: 'hardcore_breaks', 4: 'techno'})

In [115]:


class GenreDataset(torch.utils.data.Dataset):
    def __init__(self, root_dir, transformation, num_samples,
                target_sample_rate, device):
        self.root_dir = root_dir
        self.genres = os.listdir(root_dir)
        self.files = {}
        for genre in self.genres:
            self.files[genre] = os.listdir(os.path.join(root_dir, genre))

        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples

    def __len__(self):
        return sum(len(files) for files in self.files.values())

    def __getitem__(self, idx):
        genre = None
        file_idx = 0
        for g, files in self.files.items():
            if idx < len(files):
                genre = g
                file_idx = idx
                break
            else:
                idx -= len(files)
        audio_path = os.path.join(self.root_dir, genre, self.files[genre][file_idx])
        genre = label2id[genre]
        # Load audio file
        signal, sr = torchaudio.load(audio_path, format="mp3")
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transformation(signal)
        signal = signal.repeat(3, 1, 1)
        # signal = signal.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        return signal, genre

    def _cut_if_necessary(self, signal):
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate).to(self.device)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal








In [116]:

SAMPLE_RATE = 16000
NUM_SAMPLES = 16000*3
IMAGE_SIZE = 224

mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=1024,
        hop_length=512,
        n_mels=64,
        normalized = True
    )




In [117]:
torch_dataset = GenreDataset("./songs/",
                            # transformations,
                            mel_spectrogram,
                            SAMPLE_RATE,
                            NUM_SAMPLES,
                            device)

In [118]:
print(f"There are {len(torch_dataset)} samples in the dataset.")



There are 375 samples in the dataset.


In [119]:
torch_dataset[0]

(tensor([[[4.4472e-03, 1.1521e+00, 2.6496e+01,  ..., 1.4050e+01,
           1.6485e+01, 6.1929e+01],
          [2.3156e-02, 9.2341e+00, 4.8718e+01,  ..., 6.4142e+00,
           4.3728e+00, 1.0904e+01],
          [2.5428e-01, 5.8940e+00, 4.9396e+00,  ..., 1.7173e+00,
           5.6762e+00, 1.7210e+00],
          ...,
          [6.6990e-07, 3.8549e-11, 2.6087e-10,  ..., 8.3255e-11,
           1.1596e-10, 1.9073e-06],
          [5.5068e-07, 1.7485e-11, 2.8538e-11,  ..., 1.0314e-10,
           2.1542e-10, 2.0002e-06],
          [4.9015e-07, 1.4572e-11, 8.7152e-11,  ..., 2.9891e-11,
           6.0339e-11, 2.0758e-06]],
 
         [[4.4472e-03, 1.1521e+00, 2.6496e+01,  ..., 1.4050e+01,
           1.6485e+01, 6.1929e+01],
          [2.3156e-02, 9.2341e+00, 4.8718e+01,  ..., 6.4142e+00,
           4.3728e+00, 1.0904e+01],
          [2.5428e-01, 5.8940e+00, 4.9396e+00,  ..., 1.7173e+00,
           5.6762e+00, 1.7210e+00],
          ...,
          [6.6990e-07, 3.8549e-11, 2.6087e-10,  ..., 8.325

In [120]:
torch_dataset[0][0].shape


torch.Size([3, 64, 32])

## Model

In [121]:
num_classes = len(genres)
num_classes

5

In [122]:
model = models.resnet18(pretrained=True)
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)



In [123]:
BATCH_SIZE = 8

In [124]:
# Calculate the size of the validation set as 10% of the training data.
val_size = int(len(torch_dataset)*0.1)

# The rest of the data will be the training data.
train_size = len(torch_dataset) - val_size

# Split the training data into training and validation sets.
train_ds, val_ds = random_split(torch_dataset, [train_size,val_size])

# Create data loaders for the training and validation sets.
# This will allow us to load data in batches.
train_dl = DataLoader(train_ds, BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=True)
val_dl = DataLoader(val_ds, BATCH_SIZE*2, num_workers=0, pin_memory=True)

In [125]:
dataloaders = {"train" : train_dl, "val":val_dl}

In [126]:
dataset_sizes = {"train" : len(train_ds), "val":len(val_ds)}

In [127]:
dataset_sizes

{'train': 338, 'val': 37}

In [128]:
import torch.optim as optim
from torch.optim import lr_scheduler
import time
import copy

model = model.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
# optimizer = optim.SGD(model.parameters(), lr=0.001)
optimizer = optim.Adam(model.parameters(), lr = 0.001, betas=(0.9, 0.999))
# StepLR Decays the learning rate of each parameter group by gamma every step_size epochs
step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)



In [129]:
#lists for graph generation
epoch_counter_train = []
epoch_counter_val = []
train_loss = []
val_loss = []
train_acc = []
val_acc = []

In [130]:
dataloaders


{'train': <torch.utils.data.dataloader.DataLoader at 0x362dbd8d0>,
 'val': <torch.utils.data.dataloader.DataLoader at 0x362dcd750>}

In [131]:
def train_model(model, criterion, optimizer, scheduler, num_epochs):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch +1, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                scheduler.step()
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            #For graph generation
            if phase == "train":
                train_loss.append(running_loss/dataset_sizes[phase])
                train_acc.append(running_corrects / dataset_sizes[phase])
                epoch_counter_train.append(epoch)
            if phase == "val":
                val_loss.append(running_loss/ dataset_sizes[phase])
                val_acc.append(running_corrects / dataset_sizes[phase])
                epoch_counter_val.append(epoch)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            #for printing        
            if phase == "train":    
                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]
            if phase == "val":    
                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]
            
            
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the best model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model


In [132]:
model = train_model(model, criterion, optimizer, step_lr_scheduler, num_epochs=20)


Epoch 1/20
----------
train Loss: 1.9078 Acc: 0.3136
val Loss: 1.4083 Acc: 0.3784

Epoch 2/20
----------
train Loss: 1.4217 Acc: 0.3994
val Loss: 1.6478 Acc: 0.3784

Epoch 3/20
----------
train Loss: 1.4439 Acc: 0.4408
val Loss: 1.7679 Acc: 0.5676

Epoch 4/20
----------
train Loss: 1.4096 Acc: 0.4852
val Loss: 29.3191 Acc: 0.2162

Epoch 5/20
----------
train Loss: 1.4181 Acc: 0.4172
val Loss: 1.5019 Acc: 0.4865

Epoch 6/20
----------
train Loss: 1.2882 Acc: 0.4734
val Loss: 1.4903 Acc: 0.3514

Epoch 7/20
----------
train Loss: 1.2372 Acc: 0.4645
val Loss: 1.2320 Acc: 0.5135

Epoch 8/20
----------
train Loss: 1.0583 Acc: 0.5888
val Loss: 1.1834 Acc: 0.5676

Epoch 9/20
----------
train Loss: 0.9927 Acc: 0.6036
val Loss: 1.0217 Acc: 0.5405

Epoch 10/20
----------
train Loss: 0.9638 Acc: 0.6213
val Loss: 0.9422 Acc: 0.5676

Epoch 11/20
----------
train Loss: 0.9259 Acc: 0.6805
val Loss: 0.9830 Acc: 0.5135

Epoch 12/20
----------
train Loss: 0.8313 Acc: 0.6953
val Loss: 0.8650 Acc: 0.6486

