# Chord classifier


## Packages installation

In [None]:
!wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
!chmod +x Miniconda3-latest-Linux-x86_64.sh
!bash ./Miniconda3-latest-Linux-x86_64.sh -b -f -p /usr/local

In [None]:
import sys
sys.path.append('/usr/local/lib/python3.7/site-packages/')

In [None]:
!conda install -c conda-forge ffmpeg libsndfile pandas numpy librosa matplotlib pytorch torchvision seaborn

In [None]:
%pip install spleeter

In [None]:
%pip install pandas numpy librosa matplotlib torch torchvision seaborn

In [None]:
%pip install ray

## Data preprocessing

The purpose of the preprocessing step, which is crucial for training the model, is to convert each chord recording (a `wav` file) into spectrograms.


In [None]:
import pandas as pd
import os

### Google Colab file management (no need to run outside Colab)

For usage of data in Google Colab, I mount my Google Drive and then copy files from it to the local instance to make processing faster.

In [None]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive

In [None]:
os.chdir("/content")

In [None]:
!cp -R "/gdrive/My Drive/chord_classifier/data" "data"

### Data exploration


In [None]:
from pathlib import Path

# Root of the dataset; the raw recordings live in "Training" and "Test" below it.
data_dir = "data"
train_data_dir = data_dir + "/Training"
test_data_dir = data_dir + "/Test"

# Output locations for the generated spectrograms and the saved model weights.
preprocessed_train_data_dir = Path(data_dir, "preprocessed", "train")
preprocessed_test_data_dir = Path(data_dir, "preprocessed", "test")
pretrained_models_dir = Path("pretrained_models")

# Create the output directories up front so later cells can write into them.
for _required_dir in (preprocessed_train_data_dir, preprocessed_test_data_dir, pretrained_models_dir):
    _required_dir.mkdir(parents=True, exist_ok=True)

Show the number of chord recordings available for training and testing

In [None]:
def get_chord_counts(data_dir):
    """Count the entries inside each sub-directory of ``data_dir``.

    Args:
        data_dir: Directory whose immediate sub-directories are named after chords.

    Returns:
        dict mapping each sub-directory name to the number of entries directly
        inside it. Entries of ``data_dir`` that are not directories are ignored.
    """
    return {
        entry: len(os.listdir(os.path.join(data_dir, entry)))
        for entry in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, entry))
    }

In [None]:
print(f"Training data chord count {get_chord_counts(train_data_dir)}")
print(f"Testing data chord count {get_chord_counts(test_data_dir)}")

Show the duration of sounds of chords. It ranges from 1.12 to 16.34 seconds with median about 5 seconds.

In [None]:
import librosa

# Duration (in seconds) of every recording in the training set.
durations = []

for chord in os.listdir(train_data_dir):
    chord_path = os.path.join(train_data_dir, chord)
    if not os.path.isdir(chord_path):
        continue  # only chord folders contain recordings

    for file in os.listdir(chord_path):
        file_path = os.path.join(chord_path, file)
        # sr=None keeps the file's native sampling rate
        y, sr = librosa.load(file_path, sr=None)
        durations.append(librosa.get_duration(y=y, sr=sr))

# Summary statistics (min / max / quartiles) of the recording lengths
durations_df = pd.DataFrame(durations, columns=['duration'])
durations_df.describe()

A little trick to create an image from a spectrogram with matplotlib and then save it.

In [None]:
import os
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

def create_spectrogram(audio_input, sr=None, save_path=None):
    """Compute a dB-scaled Mel spectrogram and optionally save it as a JPEG image.

    Args:
        audio_input: Either a path (``str`` or ``os.PathLike``) to an audio file,
            or already-loaded audio samples.
        sr: Sampling rate of ``audio_input`` when samples are passed. Ignored when
            a path is passed (the file is loaded at its native rate).
        save_path: Destination of the rendered image. When falsy, nothing is
            written to disk.

    Returns:
        The dB-scaled Mel spectrogram array, or ``None`` when ``save_path``
        already exists and the work is skipped.

    Raises:
        ValueError: If audio samples are passed without a sampling rate.
    """
    # Skip an already existing spectrogram. The check only makes sense when a
    # target path was given; os.path.exists(None) would raise a TypeError.
    if save_path and os.path.exists(save_path):
        print(f"Spectrogram  {save_path} already exists. Skipping.")
        return None

    # Load the audio file if a path is provided, else use the provided audio data
    if isinstance(audio_input, (str, os.PathLike)):
        y, sr = librosa.load(audio_input, sr=None)
    else:
        y = audio_input
        if sr is None:
            raise ValueError("Sampling rate must be provided with audio data")

    print(f"Creating spectrogram {save_path}")

    # Generate the Mel spectrogram (power converted to dB relative to the peak)
    S = librosa.feature.melspectrogram(y=y, sr=sr)
    S_dB = librosa.power_to_db(S, ref=np.max)

    # Plot without axes or margins so the image contains only the spectrogram
    fig = plt.figure(figsize=(10, 4))
    try:
        librosa.display.specshow(S_dB, sr=sr, x_axis='time', y_axis='mel')
        plt.axis('off')
        plt.xticks([])
        plt.yticks([])
        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, bbox_inches='tight', pad_inches=0, format="jpg")
    finally:
        # Always release the figure, even if rendering or saving failed
        plt.close(fig)

    return S_dB


In [None]:
def convert_chords_to_spectrograms(source_dir, destination_dir, durations=(0.4, 0.5, 0.6)):
    """Cut every chord recording into fixed-length segments and save their spectrograms.

    For each chord sub-directory of ``source_dir`` a matching sub-directory is
    created in ``destination_dir``. Every recording is split once per value in
    ``durations`` and each segment is rendered with ``create_spectrogram`` to
    ``<recording>_<segment index>_<duration in ms>ms.jpg``.

    Args:
        source_dir: Directory containing one sub-directory per chord.
        destination_dir: Directory receiving the spectrogram images.
        durations: Segment lengths in seconds.

    Raises:
        ValueError: If a duration is too small to cover at least one sample.
    """
    for chord in os.listdir(source_dir):
        chord_path = os.path.join(source_dir, chord)
        # Ignore stray files next to the chord folders; creating an output
        # folder for them would later look like an extra (empty) class.
        if not os.path.isdir(chord_path):
            continue

        preprocessed_chord_path = os.path.join(destination_dir, chord)
        os.makedirs(preprocessed_chord_path, exist_ok=True)

        for file in os.listdir(chord_path):
            file_path = os.path.join(chord_path, file)
            y, sr = librosa.load(file_path, sr=None)
            recording_name = os.path.splitext(file)[0]

            for beat_duration in durations:
                samples_per_beat = int(beat_duration * sr)
                if samples_per_beat <= 0:
                    raise ValueError(f"Segment duration {beat_duration} is too short for sampling rate {sr}")
                duration_ms = int(beat_duration * 1000)  # Convert to milliseconds for filename

                for i in range(0, len(y), samples_per_beat):
                    # The last segment may be shorter than the others
                    segment = y[i:min(i + samples_per_beat, len(y))]
                    save_path = os.path.join(
                        preprocessed_chord_path,
                        f"{recording_name}_{i // samples_per_beat}_{duration_ms}ms.jpg",
                    )
                    # Create spectrogram from a segment and save it to disk
                    create_spectrogram(segment, sr, save_path)



Convert train and test data. This is going to take a while.


In [None]:
convert_chords_to_spectrograms(train_data_dir, preprocessed_train_data_dir)
convert_chords_to_spectrograms(test_data_dir, preprocessed_test_data_dir)

Let's check the number of generated files.

In [None]:
print(f"Training data chord count {get_chord_counts(preprocessed_train_data_dir)}")
print(f"Testing data chord count {get_chord_counts(preprocessed_test_data_dir)}")

An example of a spectrogram

In [None]:
import matplotlib.image as mpimg

# Generated files are named "<recording>_<segment index>_<segment length in ms>ms.jpg",
# so the first 0.4 s segment of this recording carries the "_0_400ms" suffix.
image_path = os.path.join(preprocessed_train_data_dir, 'Am', 'Am_acousticguitar_Mari_1_0_400ms.jpg')

img = mpimg.imread(image_path)

# Display the image
plt.imshow(img)
plt.axis('off')
plt.show()

## Track splitting


In [None]:
songs_dir = "songs"
separated_songs_dir = os.path.join(songs_dir, "separated")
song_file_name = "Pet-Sematary"
song_path = os.path.join(songs_dir, song_file_name + ".mp3")

Extracting the guitar track from the uploaded song. Let's check this feature on the song configured above (`song_file_name`); a song with a distinctive guitar track, such as Have You Ever Seen The Rain by Creedence Clearwater Revival, works well.


In [None]:
from IPython.display import Audio

Audio(song_path)


Spleeter will divide the song into 4 separate tracks: bass, drums, vocals and other. The guitar will be contained in "other".

In [None]:
import subprocess

# Arguments are passed as a list (no shell), so paths containing spaces or
# shell metacharacters are handled safely.
# "-c mp3" is required because Spleeter writes WAV files by default, while the
# next cell expects "other.mp3".
command = [
    "spleeter", "separate",
    "-p", "spleeter:4stems",
    "-c", "mp3",
    "-o", separated_songs_dir,
    song_path,
]
# check=True turns a failed separation into an error here instead of a
# confusing "file not found" further down.
subprocess.run(command, check=True)

In [None]:
guitar_track_path = os.path.join(separated_songs_dir, song_file_name, "other.mp3")

Audio(guitar_track_path)


## Generating spectrograms from the extracted track


In [None]:
output_dir = Path(os.path.join("data", "extracted", song_file_name))

The following function will process each segment of a track and convert it to a spectrogram. The spectrogram will be returned to be used as input to CNN and also will be saved to disk so that if we want to process the same track again, we could use existing data.

The following function detects beat length and generates a spectrogram for each beat. Currently, it returns an array of spectrograms. Later on, predictions of a chord with CNN will be plugged in and it will return a list of predictions.

In [None]:
import librosa

def process_track(audio_path, output_dir):
    """Split a track into beat-long segments and create a spectrogram for each.

    The tempo is estimated with ``librosa.beat.beat_track``; the track is then
    cut into consecutive segments of one beat each and every segment is passed
    to ``create_spectrogram``, which saves ``spectrogram_<index>.jpg`` into
    ``output_dir``.

    Args:
        audio_path: Path of the audio file to process.
        output_dir: Existing directory receiving the spectrogram images.

    Returns:
        list with one entry per segment: whatever ``create_spectrogram`` returned
        for it (``None`` for segments whose image already existed).

    Raises:
        ValueError: If no positive tempo could be detected.
    """
    y, sr = librosa.load(audio_path, sr=None)
    tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
    # Depending on the librosa version the tempo is a scalar or a 1-element
    # array; normalise it to a plain float.
    tempo = float(np.atleast_1d(tempo)[0])
    if tempo <= 0:
        raise ValueError(f"Could not detect a tempo for {audio_path}")

    print(f"Track tempo is {tempo}")
    beat_duration = 60.0 / tempo  # Duration of a beat in seconds

    print(f"Beat duration is {beat_duration} seconds")

    # Calculate the number of samples per beat
    samples_per_beat = int(beat_duration * sr)

    print(f"Samples per beat: {samples_per_beat}")

    spectrograms = []
    for i in range(0, len(y), samples_per_beat):
        # The last segment may be shorter than a full beat
        segment = y[i:min(i + samples_per_beat, len(y))]
        filename = f'spectrogram_{i // samples_per_beat}.jpg'
        print(f"Create spectrogram {filename}")
        spectrogram = create_spectrogram(segment, sr, os.path.join(output_dir, filename))
        spectrograms.append(spectrogram)

    return spectrograms


To keep things simple, we consider the track already processed if a directory with the song name already exists. More sophisticated and robust checks, such as a file hash, can be implemented later.

In [None]:
import shutil

if os.path.exists(output_dir):
    print("The track is already processed", output_dir)
else:
    os.makedirs(output_dir)
    print("Processing track", output_dir)
    try:
        spectrograms = process_track(guitar_track_path, output_dir)
    except Exception:
        # Remove the partially filled directory so that the next run does not
        # mistake it for a fully processed track. rmtree (unlike os.removedirs)
        # also works when some spectrograms were already written.
        shutil.rmtree(output_dir, ignore_errors=True)
        raise

    print(spectrograms[:10])


Let's visualize an example of generated spectrogram

In [None]:
import matplotlib.image as mpimg

# Sorted so that the same file is picked on every run (os.listdir order is arbitrary)
files = sorted(os.listdir(output_dir))
if not files:
    raise FileNotFoundError(f"No spectrograms found in {output_dir}")

# Show the 51st file when the track is long enough, otherwise the last one
image_path = os.path.join(output_dir, files[min(50, len(files) - 1)])

print(f"Displaying the image at {image_path}")

img = mpimg.imread(image_path)

# Display the image
plt.imshow(img)
plt.axis('off')
plt.show()


## Fine tuning ResNet50


In [None]:
import torchvision.models as models
from torchvision.models import ResNet50_Weights
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


Load the model and enable the last layer for fine tuning. If we have a saved version of the model, we can load its state.

In [None]:
import torch

# Use a hardware accelerator where available.
# torch.backends.mps.is_available() reports whether MPS can actually be used;
# the deprecated torch.has_mps flag only says that the build supports it.
_mps_backend = getattr(torch.backends, "mps", None)  # absent in old torch versions
if _mps_backend is not None and _mps_backend.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
def load_model(saved_model_path=None):
    """Build a ResNet50 whose final layer outputs the 8 chord classes.

    Args:
        saved_model_path: Optional path to a state dict saved from a model built
            by this function. When given, it replaces all weights.

    Returns:
        The model, moved to the notebook's ``device``.
    """
    num_classes = 8  # we have 8 chords in our dataset

    # The ImageNet weights are only needed when starting from scratch; a saved
    # state dict overwrites every parameter anyway.
    weights = None if saved_model_path else ResNet50_Weights.DEFAULT
    model = models.resnet50(weights=weights)

    # replacing the last layer for fine tuning
    # (torch.nn is referenced through `torch` because `nn` is not imported yet
    # at this point of the notebook)
    num_features = model.fc.in_features
    model.fc = torch.nn.Linear(num_features, num_classes)

    if saved_model_path:
        state_dict = torch.load(saved_model_path, map_location=torch.device(device))
        model.load_state_dict(state_dict)

    return model.to(device)


In [None]:
model = load_model()

Categorical mapping of chords so that we could use them as numerical indexes in the neural network but also could decipher it back to actual labels for representation.

In [None]:
# Chord classes in index order: a chord's position in this list is its class index.
chord_labels = ["Am", "Bb", "Bdim", "C", "Dm", "Em", "F", "G"]
# Reverse lookup (label -> class index), derived from the list above.
label_to_idx = {label: index for index, label in enumerate(chord_labels)}

Define our dataset


In [None]:
from PIL import Image

class SpectrogramDataset(Dataset):
    """Dataset of spectrogram images stored as ``root_dir/<chord label>/<image>``."""

    def __init__(self, root_dir, transform=None, include_labels=True, image_mode="RGB"):
        """
        Args:
            root_dir (string): Directory with all the subdirectories for each label.
            transform (callable, optional): Optional transform to be applied on a sample.
            include_labels (bool): When True, items are ``(image, label index)``
                pairs; when False, only the image is returned.
            image_mode (string): Mode every image is converted to after loading.
        """
        self.transform = transform
        self.samples = []
        self.label_to_idx = label_to_idx
        self.include_labels = include_labels
        self.image_mode = image_mode

        # Sorted listings make the sample order identical on every run and
        # machine (os.listdir order is arbitrary).
        for label_dir in sorted(os.listdir(root_dir)):
            label_dir_full_path = os.path.join(root_dir, label_dir)
            if not os.path.isdir(label_dir_full_path):
                continue

            # Folders that are not chord labels (e.g. ".ipynb_checkpoints")
            # would otherwise abort the scan with a KeyError.
            if label_dir not in self.label_to_idx:
                print(f"Skipping directory {label_dir_full_path}: not a known chord label")
                continue
            label_idx = self.label_to_idx[label_dir]

            # Append the file path and its label to the samples list
            for file in sorted(os.listdir(label_dir_full_path)):
                file_full_path = os.path.join(label_dir_full_path, file)
                if os.path.isfile(file_full_path):
                    self.samples.append((file_full_path, label_idx))

    def __len__(self):
        """Return the number of image files found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load, convert and transform the image at position ``idx``."""
        img_path, label_idx = self.samples[idx]
        # The context manager closes the underlying file once the converted
        # copy has been created.
        with Image.open(img_path) as source_image:
            img = source_image.convert(self.image_mode)

        if self.transform:
            img = self.transform(img)

        if self.include_labels:
            return img, label_idx

        return img


The function to resize the images to the size expected by ResNet (224x224). As the original images are not square, it also pads them with blank space to keep the proportions as they are crucial for a time representation like the spectrogram.

In [None]:
from PIL import Image, ImageOps

def resize_and_pad(spectrogram, target_size=(224, 224)):
    """Fit an image into ``target_size`` without distorting its aspect ratio.

    The image is scaled so that it fits inside the target, then a border is
    added around it to reach the target size exactly.

    Args:
        spectrogram: Image to resize (uses ``width``, ``height`` and ``resize``).
        target_size: ``(width, height)`` of the result.

    Returns:
        The resized image with a border added by ``ImageOps.expand``.
    """
    target_width, target_height = target_size[0], target_size[1]

    # Largest scale factor at which both dimensions still fit
    scale = min(target_width / spectrogram.width, target_height / spectrogram.height)
    scaled_size = (int(spectrogram.width * scale), int(spectrogram.height * scale))
    resized = spectrogram.resize(scaled_size, Image.LANCZOS)

    # Distribute the missing pixels on both sides; an odd remainder goes to
    # the right / bottom edge
    missing_width = target_width - scaled_size[0]
    missing_height = target_height - scaled_size[1]
    left, top = missing_width // 2, missing_height // 2
    border = (left, top, missing_width - left, missing_height - top)

    return ImageOps.expand(resized, border)


A function to calculate mean and standard deviation for normalization.


In [None]:
from torch.utils.data import DataLoader
import torch

def calculate_mean_std(loader):
    """Compute per-channel mean and standard deviation for normalization.

    Each image contributes its own per-channel mean and standard deviation;
    both are averaged over all images yielded by ``loader``.

    Args:
        loader: Iterable of ``(images, labels)`` batches where ``images`` has
            the batch in dimension 0 and the channels in dimension 1.

    Returns:
        Tuple ``(mean, std)`` with one value per channel.
    """
    channel_mean_sum = 0.
    channel_std_sum = 0.
    images_seen = 0

    for batch, _ in loader:
        n_images, n_channels = batch.size(0), batch.size(1)
        # Flatten the spatial dimensions: one row of pixels per image and channel
        pixels = batch.view(n_images, n_channels, -1)
        channel_mean_sum += pixels.mean(2).sum(0)
        channel_std_sum += pixels.std(2).sum(0)
        images_seen += n_images

    return channel_mean_sum / images_seen, channel_std_sum / images_seen


Transform the data to the input format expected by the network. This step omits the normalization, as we first need to calculate the mean and standard deviation.


In [None]:
from torchvision import transforms

pre_transform = transforms.Compose([
    transforms.Lambda(resize_and_pad),
    transforms.ToTensor(),
])


The following operation is to calculate mean and std across the training dataset for normalization. It is an expensive operation so this cell is disabled and the following cell contains those values received from a previous calculation.

In [None]:
%%script false --no-raise-error
#| code: true
#| output: false
#| eval: false

pre_dataset = SpectrogramDataset(root_dir=preprocessed_train_data_dir, transform=pre_transform)

pre_loader = DataLoader(pre_dataset, batch_size=64, shuffle=True, num_workers=0)

mean, std = calculate_mean_std(pre_loader)
print(mean, std)


In [None]:
mean = [0.1923, 0.0816, 0.1362]
std = [0.3121, 0.1567, 0.1977]

Now that we have all necessary information, we can fully transform the dataset.

In [None]:
from torchvision import transforms

transform = transforms.Compose([
    transforms.Lambda(resize_and_pad),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])

In [None]:
dataset = SpectrogramDataset(root_dir=preprocessed_train_data_dir, transform=transform)
len(dataset)

Create validation dataset

In [None]:
from torch.utils.data import random_split

# 80 % of the samples are used for training, the rest for validation
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
# A seeded generator makes the split identical on every run, so results of
# different runs and hyperparameter trials stay comparable.
train_dataset, validation_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)
len(validation_dataset)

The following function is used for testing the model and returns current loss and accuracy. It is used both for hyperparameter tuning feedback and for testing the model on the test dataset.

In [None]:
import torch.nn as nn

criterion = nn.CrossEntropyLoss()

In [None]:
def evaluate_model(model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` without updating any weights.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        data_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss tensor.

    Returns:
        Tuple ``(avg_loss, accuracy)``: the loss averaged over batches and the
        fraction of correctly classified samples.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.eval()

    # Run on the device the model lives on, so the function also works for
    # models that were not moved to the notebook-wide `device`.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device(device)

    total_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(target_device), labels.to(target_device)
            outputs = model(inputs)
            total_loss += criterion(outputs, labels).item()

            # The predicted class is the one with the highest score
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("data_loader yielded no samples to evaluate")

    avg_loss = total_loss / len(data_loader)
    accuracy = correct / total
    return avg_loss, accuracy


In [None]:
max_epochs = 18

The model training loop. It uses checkpoints because training is a long operation that might be interrupted, and in that case we should be able to continue where we left off.

In [None]:
from ray import train
import tempfile
from ray.train import Checkpoint

def train_model(config):
    """Fine-tune the global ``model`` and report validation metrics after every epoch.

    Args:
        config: dict with
            ``lr`` (float): learning rate,
            ``batch_size`` (int): batch size of both loaders,
            ``epochs`` (int, optional): total number of epochs, default 7,
            ``optimizer`` (str, optional): name of a ``torch.optim`` class, default "Adam",
            ``momentum`` (float, optional): used only with the "SGD" optimizer.

    After each epoch a checkpoint (epoch number and model weights) is written
    and handed to ``train.report`` together with the validation loss and
    accuracy. When a checkpoint is available at start, training continues with
    the epoch following the stored one.
    """
    # NOTE(review): all calls share the module-level `model`, so consecutive
    # tuning trials in the same process start from the previous trial's
    # weights — confirm this is intended.
    start = 0  # epochs are counted from 0 internally
    checkpoint = train.get_checkpoint()
    if checkpoint:
        print("Load from checkpoint", checkpoint)
        with checkpoint.as_directory() as checkpoint_dir:
            checkpoint_dict = torch.load(
                os.path.join(checkpoint_dir, "checkpoint.pt"),
                map_location=torch.device(device),
            )
            start = checkpoint_dict["epoch"] + 1
            model.load_state_dict(checkpoint_dict["model_state"])

    # Fall back to Adam so that configs without an "optimizer" entry keep working
    optimizer_name = config.get("optimizer", "Adam")
    optimizer_kwargs = {"lr": config["lr"]}
    if optimizer_name == "SGD" and config.get("momentum") is not None:
        optimizer_kwargs["momentum"] = config["momentum"]
    optimizer = getattr(torch.optim, optimizer_name)(model.parameters(), **optimizer_kwargs)

    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(validation_dataset, batch_size=config["batch_size"], shuffle=False)

    num_epochs = config.get("epochs", 7)

    # range(0, num_epochs) runs exactly `num_epochs` epochs
    for epoch in range(start, num_epochs):
        print(f"Start epoch {epoch + 1}")
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # validation
        val_loss, val_accuracy = evaluate_model(model, val_loader, criterion)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

        with tempfile.TemporaryDirectory() as tempdir:
            print("Save to checkpoint", os.path.join(tempdir, "checkpoint.pt"))
            torch.save(
                {"epoch": epoch, "model_state": model.state_dict()},
                os.path.join(tempdir, "checkpoint.pt"),
            )

            checkpoint = Checkpoint.from_directory(tempdir)
            # send data to Ray Tune at each epoch to allow the scheduler to cancel inefficient experiments early;
            # the checkpoint must be passed along, otherwise it is never stored
            # (the report has to happen while the temporary directory still exists)
            train.report({"val_loss": val_loss, "accuracy": val_accuracy}, checkpoint=checkpoint)


## Hyperparameter tuning

Here I use a Ray Tune scheduler to be able to cut off inefficient experiments early and improve the efficiency of hyperparameter tuning

In [None]:
from ray.tune.schedulers import ASHAScheduler

scheduler = ASHAScheduler(
    metric="val_loss",
    mode="min",
    max_t = max_epochs,
    grace_period=1,
    reduction_factor=2,
)


These are hyperparameters that we are going to choose.

In [None]:
from ray import tune

# TODO: run a short experiment for optimizer choice?
config = {
    "lr": tune.loguniform(1e-4, 1e-1),
    "batch_size": tune.choice([16, 32, 64]),
    "epochs": tune.choice([7, 10, 12, 15, 18]),
    "optimizer": tune.choice(["Adam", "SGD"]),
    # Momentum only applies to SGD. Ray Tune expresses such conditional
    # parameters with tune.sample_from, whose function can read the values
    # already sampled for the trial through `spec.config`.
    "momentum": tune.sample_from(
        lambda spec: np.random.uniform(0.8, 0.99) if spec.config.optimizer == "SGD" else None
    ),
}

In [None]:
import ray
ray.shutdown()
ray.init(ignore_reinit_error=True, local_mode=True)

In [None]:
def get_best_params(config, train_model, scheduler, num_samples=10, cpus_per_trial=2, gpus_per_trial=None):
    """Run a Ray Tune search and return the configuration with the best accuracy.

    Args:
        config: Search space passed to ``tune.run``.
        train_model: Trainable function called with a sampled configuration.
        scheduler: Trial scheduler used to stop unpromising trials early.
        num_samples: Number of configurations sampled from the search space.
        cpus_per_trial: CPUs reserved for each trial.
        gpus_per_trial: GPUs reserved for each trial. ``None`` reserves one GPU
            when CUDA is available and none otherwise.

    Returns:
        The configuration of the trial with the highest reported "accuracy".
    """
    if gpus_per_trial is None:
        # Requesting more GPUs than the machine has would leave every trial
        # waiting for resources forever.
        gpus_per_trial = 1 if torch.cuda.is_available() else 0

    analysis = tune.run(
        train_model,
        config=config,
        num_samples=num_samples,  # Number of times to sample from the hyperparameter space
        resources_per_trial={"cpu": cpus_per_trial, "gpu": gpus_per_trial},  # Resources per trial
        scheduler=scheduler,
        resume="AUTO"
    )

    return analysis.get_best_config(metric="accuracy", mode="max")

In [None]:
# this is the most successful config received by running hyperparameter tuning on Google Colab
best_config = {"lr": 0.002610838003776132, "batch_size": 32, "epochs": 12}

Run this cell to get current best config result

In [None]:
best_config = get_best_params(config, train_model, scheduler, 30)
print("Best config: ", best_config)

### Training the model with optimal hyperparameters


In [None]:
train_model(best_config)

In [None]:
torch.save(model.state_dict(), os.path.join(pretrained_models_dir, "chord_classifier.pth"))

## Transfer Learning

Create a custom neural network which will accept features from pre-trained and fine tuned ResNet50 to make final classifications. This will allow for more flexibility in tuning the architecture for this particular task.

In [None]:
# Load the fine-tuned ResNet50 weights.
# NOTE(review): the training section saves "chord_classifier.pth", while this
# cell loads "chord_classifier_actual.pth" — presumably a checkpoint stored
# separately; confirm the file exists before running.
model = load_model(os.path.join(pretrained_models_dir, "chord_classifier_actual.pth"))
# load_model already returns the model on `device`, so this call does not move it again.
model.to(device)

In [None]:
import torch.nn as nn

class ChordClassifier(nn.Module):
    """Small fully connected head that turns extracted features into chord probabilities.

    Layout: Linear(input_size -> 512) -> ReLU -> Linear(512 -> num_classes) -> Softmax.
    """

    def __init__(self, input_size, num_classes):
        """
        Args:
            input_size: Length of the incoming feature vectors (2048 for ResNet50 features).
            num_classes: Number of chord classes to predict.
        """
        super().__init__()
        hidden_size = 512
        # hidden layer on top of the extracted features
        self.fc1 = nn.Linear(input_size, hidden_size)
        # non-linearity between the two linear layers
        self.relu = nn.ReLU()
        # output layer: one score per class
        self.fc2 = nn.Linear(hidden_size, num_classes)
        # normalises the scores of each sample to probabilities
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class probabilities of shape (batch, num_classes) for a (batch, input_size) input."""
        hidden = self.relu(self.fc1(x))
        return self.softmax(self.fc2(hidden))

Replace the last ResNet50 layer with an identity function that will just return features without changes.

In [None]:
model.fc = nn.Identity()

Extract features using pre-trained and fine-tuned ResNet50 and save them into a HDF5 file on disk for training the classifier.

In [None]:
import h5py
import numpy as np
import torch


dataset = SpectrogramDataset(root_dir=preprocessed_train_data_dir, transform=transform)
# The order does not matter for feature extraction, so no shuffling is needed
train_loader = DataLoader(dataset, batch_size=32, shuffle=False)

# Features must be extracted in evaluation mode: in training mode BatchNorm
# normalises with the statistics of the current batch and keeps updating its
# running statistics, so the features would depend on the batch composition.
model.eval()

with h5py.File('features_labels.h5', 'w') as h5f:
    features_ds = None
    labels_ds = None

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        with torch.no_grad():
            output = model(inputs)

        # Convert to numpy and write to disk
        features_batch = output.cpu().detach().numpy()
        labels_batch = labels.numpy()
        batch_len = features_batch.shape[0]

        if features_ds is None:
            # Create resizable datasets from the first batch ...
            features_ds = h5f.create_dataset(
                'features', data=features_batch,
                maxshape=(None, features_batch.shape[1]), chunks=True,
            )
            labels_ds = h5f.create_dataset('labels', data=labels_batch, maxshape=(None,), chunks=True)
        else:
            # ... and grow them by one batch for every following batch
            features_ds.resize(features_ds.shape[0] + batch_len, axis=0)
            features_ds[-batch_len:] = features_batch
            labels_ds.resize(labels_ds.shape[0] + batch_len, axis=0)
            labels_ds[-batch_len:] = labels_batch

In [None]:
import h5py
import torch
from torch.utils.data import TensorDataset, DataLoader

# Load features and labels
with h5py.File('features_labels.h5', 'r') as hf:
    features = hf['features'][:]
    labels = hf['labels'][:]

features = torch.tensor(features, dtype=torch.float)
labels = torch.tensor(labels, dtype=torch.long)

Split the data into training and validation sets

In [None]:
from sklearn.model_selection import train_test_split

features_train, features_val, labels_train, labels_val = train_test_split(
    features, labels, test_size=0.2, random_state=42)

Create datasets

In [None]:
train_dataset = TensorDataset(features_train, labels_train)
val_dataset = TensorDataset(features_val, labels_val)

Train and validate

In [None]:
classifier_model = ChordClassifier(input_size=features.shape[1], num_classes=len(torch.unique(labels)))


In [None]:
from ray import train

def train_classifier_model(config):
    """Train the global ``classifier_model`` on the extracted features.

    Args:
        config: dict with ``epochs`` (int), ``lr`` (float) and ``batch_size`` (int).

    After every epoch the validation loss and accuracy are computed with
    ``evaluate_model`` and reported through ``train.report``.
    """
    num_epochs = config['epochs']

    # Keep the classifier on the same device as the batches
    classifier_model.to(device)

    # Optimise the classifier's own parameters
    optimizer = optim.Adam(classifier_model.parameters(), lr=config["lr"])

    # ChordClassifier.forward already applies softmax. CrossEntropyLoss expects
    # raw scores and would apply a second softmax, so the negative
    # log-likelihood of the returned probabilities is used instead.
    nll_loss = nn.NLLLoss()

    def criterion(probabilities, target):
        # clamp avoids log(0) for probabilities that underflow to zero
        return nll_loss(torch.log(probabilities.clamp_min(1e-12)), target)

    # Both loaders iterate over the extracted-feature datasets
    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config["batch_size"], shuffle=False)

    for epoch in range(num_epochs):
        print(f"Start epoch {epoch}")
        classifier_model.train()
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = classifier_model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        # Validation
        val_loss, val_accuracy = evaluate_model(classifier_model, val_loader, criterion)
        train.report({"val_loss": val_loss, "accuracy": val_accuracy})
        print(f'Epoch {epoch+1}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

Hyperparameters tuning. The classifier is a simpler network than ResNet50 and trains much faster so we can afford more relaxed scheduler settings and deeper exploration of the hyperparameters space.

In [None]:
from ray.tune.schedulers import ASHAScheduler

scheduler = ASHAScheduler(
    metric="val_loss",
    mode="min",
    max_t = 200,
    grace_period=3,
    reduction_factor=2,
)

In [None]:
from ray import tune

config = {
    "lr": tune.loguniform(1e-4, 1e-1),
    "batch_size": tune.choice([8, 16, 32, 64]),
    "epochs": tune.choice([50, 100, 150, 200, 250, 300])
}

In [None]:
best_config = get_best_params(config, train_classifier_model, scheduler, 100)
print("Best config: ", best_config)

In [None]:
# Train the classifier with the best configuration found by the search above
train_classifier_model(best_config)

## Testing the model


In [None]:
# Labels are kept (include_labels defaults to True): the evaluation below
# unpacks every batch into `inputs, targets` to compute the metrics.
test_dataset = SpectrogramDataset(root_dir=preprocessed_test_data_dir, transform=transform)
len(test_dataset)

In [None]:
# Evaluate on the held-out test set; shuffling is unnecessary for evaluation
test_loader = DataLoader(test_dataset, batch_size=best_config["batch_size"], shuffle=False)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix

# `model` had its last layer replaced by nn.Identity and therefore returns
# feature vectors; the chord prediction comes from `classifier_model` applied
# to those features.
model.eval()
classifier_model.to(device).eval()

all_predictions = []
all_targets = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        outputs = classifier_model(model(inputs))
        # The predicted class is the one with the highest score
        predicted = outputs.argmax(dim=1)

        all_predictions.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())

# Calculate metrics (macro average: every chord class has the same weight)
precision = precision_score(all_targets, all_predictions, average='macro')
recall = recall_score(all_targets, all_predictions, average='macro')
f1 = f1_score(all_targets, all_predictions, average='macro')

conf_matrix = confusion_matrix(all_targets, all_predictions, labels=range(len(chord_labels)))


print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")


Metrics received on test dataset:

Precision: 0.9044754119873304

Recall: 0.8930117832821793

F1 Score: 0.8953096711745837

In [None]:
conf_matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='g', xticklabels=chord_labels, yticklabels=chord_labels)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.show()


## Inference


In [None]:
from PIL import Image

def load_spectrogram(file_path):
    """Open the image at ``file_path`` as RGB and return it after applying ``transform``."""
    rgb_image = Image.open(file_path).convert('RGB')
    prepared = transform(rgb_image)
    return prepared

Switch to evaluation mode and convert logits to probabilities using softmax


In [None]:
import torch

# Ensure the model is in evaluation mode
model.eval()

# Define the softmax function
softmax = torch.nn.Softmax(dim=1)

Sort spectrograms by file name as it contains its order number. We need to have it ordered to be able to output it aligned with track timeline.

In [None]:
output_dir

In [None]:
import os

# List all jpg files in the output directory
file_names = [file for file in os.listdir(output_dir) if file.endswith('.jpg')]

# Sort the file names (natural sort or simple alphanumeric sort)
file_names.sort(key=lambda f: int(f.split('_')[-1].split('.')[0]))

spectrograms = [load_spectrogram(os.path.join(output_dir, file)) for file in file_names]

Make predictions for each spectrogram. For each prediction we get top N results. We also map them to labels for output.

In [None]:
index_to_label = {v: k for k,v in label_to_idx.items()}

N = 3  # Number of top predictions

top_labels_all = []
top_confidences_all = []

# `model` had its last layer replaced by nn.Identity and therefore returns
# feature vectors; `classifier_model` turns them into chord probabilities
# (its forward pass already ends with a softmax).
model.eval()
classifier_model.to(device).eval()

# No gradients are needed for inference; this saves memory and time
with torch.no_grad():
    for spectrogram in spectrograms:
        # make sure spectrograms are on the same device as the model, otherwise there will be input type mismatch
        spectrogram = spectrogram.to(device)
        # Extract features (a batch dimension is added) and classify them
        features = model(spectrogram.unsqueeze(0))
        probabilities = classifier_model(features)

        # Get the top N predictions (never more than there are classes)
        top_probs, top_preds = torch.topk(probabilities, min(N, probabilities.size(1)), dim=1)

        # Convert to labels and confidences
        top_labels = [index_to_label[pred.item()] for pred in top_preds[0]]
        top_confidences = [prob.item() for prob in top_probs[0]]

        top_labels_all.append(top_labels)
        top_confidences_all.append(top_confidences)

## Output

* For each beat, we get top N predictions
* Low-confidence predictions (confidence lower than 0.3) will be discarded
* We will consider two adjacent beats at once. First, chords that appear in the predictions of both beats will be chosen. If there are several such chords, then the chord with the highest average confidence will be chosen.

In [None]:
for file, labels, confidences in zip(file_names[:20], top_labels_all[:20], top_confidences_all[:20]):
    print(f"{file}:")
    for label, confidence in zip(labels, confidences):
        print(f"  {label}: {confidence:.4f}")

In [None]:
def map_label_to_confidence(labels, confidences):
    """Pair labels with confidences position by position and return them as a dict.

    A label that occurs more than once keeps its last confidence; surplus
    items of the longer sequence are ignored.
    """
    return dict(zip(labels, confidences))

In [None]:
def get_with_max_confidence(common_labels, prev_labels_to_confidence, labels_to_confidence):
    """Pick the label whose confidence, averaged over two beats, is highest.

    Args:
        common_labels: Candidate labels.
        prev_labels_to_confidence: label -> confidence for the first beat.
        labels_to_confidence: label -> confidence for the second beat.
            A label missing from either mapping counts as 0 there.

    Returns:
        Tuple ``(label, average confidence)``; ``(None, 0)`` when no candidate
        has an average above 0. On ties the first candidate encountered wins.
    """
    best = (None, 0)

    for candidate in common_labels:
        mean_confidence = (prev_labels_to_confidence.get(candidate, 0) + labels_to_confidence.get(candidate, 0)) / 2
        # strictly greater: an equal average does not replace the current best
        if mean_confidence > best[1]:
            best = (candidate, mean_confidence)

    return best

In [None]:
def process_chord_predictions(top_labels_all, top_confidences_all, confidence_threshold=0.3):
    """Reduce per-beat top-N predictions to one chord per pair of adjacent beats.

    Beats are processed in non-overlapping pairs (0 and 1, 2 and 3, ...):

    * Labels whose confidence reaches ``confidence_threshold`` in both beats are
      candidates; the one with the highest average confidence is chosen.
    * Without such a common label, the single most confident prediction of
      either beat is chosen, regardless of the threshold.
    * If the number of beats is odd, the last beat has no partner and
      contributes its own most confident prediction.

    Args:
        top_labels_all: One list of predicted labels per beat.
        top_confidences_all: One list of confidences per beat, aligned with
            ``top_labels_all``.
        confidence_threshold: Minimum confidence for a label to count as a
            common candidate.

    Returns:
        list of ``(label, confidence)`` tuples.
    """
    chord_predictions = []

    for i in range(1, len(top_labels_all), 2):
        prev_labels_to_confidence = map_label_to_confidence(top_labels_all[i - 1], top_confidences_all[i - 1])
        labels_to_confidence = map_label_to_confidence(top_labels_all[i], top_confidences_all[i])

        # Filter labels by confidence threshold
        prev_filtered = {label: conf for label, conf in prev_labels_to_confidence.items() if conf >= confidence_threshold}
        filtered = {label: conf for label, conf in labels_to_confidence.items() if conf >= confidence_threshold}

        common_labels = set(prev_filtered.keys()).intersection(filtered.keys())

        if common_labels:
            chosen_label, chosen_confidence = get_with_max_confidence(common_labels, prev_labels_to_confidence, labels_to_confidence)
        else:
            # Combine and sort all labels by confidence, regardless of threshold, if no common labels meet the threshold
            all_labels_confidences = list(prev_labels_to_confidence.items()) + list(labels_to_confidence.items())
            chosen_label, chosen_confidence = max(all_labels_confidences, key=lambda x: x[1])

        chord_predictions.append((chosen_label, chosen_confidence))

    # With an odd number of beats the last one would otherwise be dropped
    # (and a single-beat track would produce no prediction at all).
    if len(top_labels_all) % 2 == 1:
        last_labels_to_confidence = map_label_to_confidence(top_labels_all[-1], top_confidences_all[-1])
        if last_labels_to_confidence:
            chord_predictions.append(max(last_labels_to_confidence.items(), key=lambda x: x[1]))

    return chord_predictions

In [None]:
chord_predictions = process_chord_predictions(top_labels_all, top_confidences_all)
print(chord_predictions)

Combine predictions into a chord sequence.

In [None]:
# Collapse runs of identical consecutive chords into a single entry.
combined_predictions = []

# Last chord appended so far (None before the first one)
prev_chord = None

for chord, confidence in chord_predictions:
    if chord == prev_chord:
        continue  # same chord as on the previous step: still part of the same run

    combined_predictions.append(chord)
    prev_chord = chord

In [None]:
from IPython.display import display, HTML

def display_chords(chord_predictions):
    """Show a chord sequence as coloured tiles, four per row, in the notebook output.

    Args:
        chord_predictions: Sequence of chord labels in playing order.
    """
    # Background colour of each chord tile
    chord_colors = {
        "Am": "#FFD700",  # Gold
        "Bb": "#FF4500",  # OrangeRed
        "Bdim": "#1E90FF",  # DodgerBlue
        "C": "#32CD32",  # LimeGreen
        "Dm": "#BA55D3",  # MediumOrchid
        "Em": "#FF69B4",  # HotPink
        "F": "#00CED1",  # DarkTurquoise
        "G": "#FFA500",  # Orange
    }
    tiles_per_row = 4
    # A full-width, zero-height element forces the next tile onto a new flex row
    row_break = "<div style='flex-basis: 100%; height: 0;'></div>"
    total = len(chord_predictions)

    html_parts = ["<div style='display: flex; flex-wrap: wrap;'>"]

    for position, chord in enumerate(chord_predictions, start=1):
        # Chords without a colour of their own are shown in grey
        background = chord_colors.get(chord, "grey")
        html_parts.append(
            f"<div style='color: white; margin: 5px; padding: 10px; background-color: {background}; width: 100px; text-align: center;'>{chord}</div>"
        )

        # End the row after every fourth tile and after the very last tile
        if position % tiles_per_row == 0 or position == total:
            html_parts.append(row_break)

    html_parts.append("</div>")

    # Display the HTML in the Jupyter Notebook
    display(HTML("".join(html_parts)))


In [None]:
display_chords(combined_predictions)