TASK 1: For this task you are required to process a midi file and predict which composer wrote the piece of music. This task is evaluated
based on accuracy (percentage of correct predictions).

Try: CNN based Model

In [6]:
# Probably more imports than are really necessary...
import os
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import torch.nn.functional as F
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
from tqdm import tqdm
import librosa
import numpy as np
import miditoolkit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, average_precision_score, accuracy_score
import random

In [2]:
def accuracy1(groundtruth, predictions):
    correct = 0
    for k in groundtruth:
        if not (k in predictions):
            print("Missing " + str(k) + " from predictions")
            return 0
        if predictions[k] == groundtruth[k]:
            correct += 1
    return correct / len(groundtruth)

In [1]:
dataroot1 = "data/student_files/task1_composer_classification/"

In [4]:
class model1():
    def __init__(self):
        pass

    def features(self, path):
        midi_obj = miditoolkit.midi.parser.MidiFile(dataroot1 + '/' + path)
        notes = midi_obj.instruments[0].notes
        num_notes = len(notes)
        average_pitch = sum([note.pitch for note in notes]) / num_notes
        average_duration = sum([note.end - note.start for note in notes]) / num_notes
        features = [average_pitch, average_duration]
        return features
    
    def predict(self, path, outpath=None):
        d = eval(open(path, 'r').read())
        predictions = {}
        for k in d:
            x = self.features(k)
            pred = self.model.predict([x])
            predictions[k] = str(pred[0])
        if outpath:
            with open(outpath, "w") as z:
                z.write(str(predictions) + '\n')
        return predictions

    # Train your model. Note that this function will not be called from the autograder:
    # instead you should upload your saved model using save()
    def train(self, path):
        with open(path, 'r') as f:
            train_json = eval(f.read())
        X_train = [self.features(k) for k in train_json]
        y_train = [train_json[k] for k in train_json]
        
        model = LogisticRegression(max_iter=1000)
        model.fit(X_train, y_train)
        self.model = model

In [5]:
def run1():
    model = model1()
    model.train(dataroot1 + "/train.json")
    train_preds = model.predict(dataroot1 + "/train.json")
    test_preds = model.predict(dataroot1 + "/test.json", "predictions1.json")
    
    train_labels = eval(open(dataroot1 + "/train.json").read())
    acc1 = accuracy1(train_labels, train_preds)
    print("Task 1 training accuracy = " + str(acc1))

Custom Model 

In [2]:
import ast

def create_artist_mapping(json_path):
    with open(json_path, 'r') as f:
        midi_to_artist = ast.literal_eval(f.read())
        

    unique_artists = sorted(set(midi_to_artist.values()))
    id_to_artist = {i: artist for i, artist in enumerate(unique_artists)}

    
    return id_to_artist
idToArtist = create_artist_mapping("data/student_files/task1_composer_classification/train.json")

In [3]:
print(len(idToArtist))
print(idToArtist)

artistToId={}
for key,value in idToArtist.items():
    artistToId[value] = key
print(artistToId)

8
{0: 'Bach', 1: 'Beethoven', 2: 'Chopin', 3: 'Haydn', 4: 'Liszt', 5: 'Mozart', 6: 'Schubert', 7: 'Schumann'}
{'Bach': 0, 'Beethoven': 1, 'Chopin': 2, 'Haydn': 3, 'Liszt': 4, 'Mozart': 5, 'Schubert': 6, 'Schumann': 7}


In [4]:
def save_model(model, filepath='sol_1.pt'):
    """Save a PyTorch model to a file"""
    torch.save(model.state_dict(), filepath)
    print(f"Model saved to {filepath}")

def load_model(model_class, filepath='sol_1.pt', *args, **kwargs):
    """Load a PyTorch model from a file"""
    model = model_class(*args, **kwargs)  # instantiate the model
    model.load_state_dict(torch.load(filepath))
    model.eval()  # optional: sets dropout/batchnorm to eval mode
    print(f"Model loaded from {filepath}")
    return model


In [57]:
from mido import MidiFile
from sklearn.model_selection import train_test_split
from itertools import islice
import fluidsynth

SAMPLE_RATE = 25000

# create train loader 

def extract_waveform(path):
    # Your code here
    wave, sr = librosa.load(path, sr=SAMPLE_RATE)
    return wave 

def extract_spec(w):
    # Your code here
    # load
    stft = librosa.stft(y=w)
    # take squared absolute values
    spec = np.abs(stft) ** 2
    
    return torch.FloatTensor(spec)

def extract_q(w):
    # Your code here
    result = librosa.cqt(y=w, sr=SAMPLE_RATE)
    result = librosa.amplitude_to_db(np.abs(result))
    
    return torch.FloatTensor(result)

def pad_or_truncate(spec, max_time=2048):
    freq_bins, time_bins = spec.shape
    if time_bins > max_time:
        return spec[:, :max_time]
    elif time_bins < max_time:
        pad_width = max_time - time_bins
        return F.pad(spec, (0, pad_width), mode='constant', value=0)
    return spec


import pretty_midi

def features(path):
    # https://medium.com/composer-style-classification-using-deep-learning/composer-style-classification-using-deep-learning-6bab64490995
    #Spectral Centroid, Zero Crossing Rate, Chroma Frequencies, and Spectral Roll-off
    
    """Extract robust features from all MIDI tracks for composer classification."""
    full_path = dataroot1 + '/' + path
    
    midi_obj = pretty_midi.PrettyMIDI(full_path)
    
    # Synthesize to waveform
    w = midi_obj.fluidsynth()  # returns np array of audio
    
    # spec = extract_spec(w) # -> gets .320 and very slow 
    q = extract_q(w) #-> gets .850
    
    feature = pad_or_truncate(q, max_time=1000)

    return feature


def create_train_features(size=None, val_split=0.2):
    # Load data
    with open(dataroot1 + "/train.json", 'r') as f:
        train_json = eval(f.read())
    
    # Limit size if specified
    if size is not None:
        train_json = dict(list(train_json.items())[:size])
    
    # Extract features and labels
    X = [torch.tensor(features(key), dtype=torch.float32) for key, value in train_json.items()]
    Y = [artistToId[value] for key, value in train_json.items()]
    
    # Convert lists to tensors
    X = torch.stack(X)
    Y = torch.tensor(Y, dtype=torch.int64)
    
    # Return all data if no validation split needed
    if val_split <= 0:
        return X, Y
    
    # Split into training and validation sets
    X_train, X_val, Y_train, Y_val = train_test_split(
        X, Y, test_size=val_split, random_state=42, shuffle=True
    )
    
    return X_train, Y_train, X_val, Y_val
    
    
X_train, y_train, X_val, y_val = create_train_features()

print(X_train[0])
print(y_train[0])


  X = [torch.tensor(features(key), dtype=torch.float32) for key, value in train_json.items()]


tensor([[-4.8038e+01, -5.1450e+01, -5.8416e+01,  ..., -4.3110e-01,
          1.1619e+00,  2.4942e+00],
        [-4.1819e+01, -4.0434e+01, -3.8540e+01,  ..., -1.4225e+00,
          3.4195e-02,  1.0889e+00],
        [-3.9575e+01, -3.8109e+01, -3.7030e+01,  ..., -2.7528e+00,
         -1.8523e+00, -1.3869e+00],
        ...,
        [-6.1445e+01, -6.1445e+01, -6.1445e+01,  ..., -6.1445e+01,
         -6.1445e+01, -3.8426e+01],
        [-6.1445e+01, -6.1445e+01, -6.1445e+01,  ..., -6.1445e+01,
         -6.1445e+01, -4.2374e+01],
        [-6.1445e+01, -6.1445e+01, -6.1445e+01,  ..., -6.1445e+01,
         -6.1445e+01, -4.4460e+01]])
tensor(2)


In [58]:
print(len(X_train[0]))
feature_size = (len(X_train[0]))


84


In [59]:
from torch.utils.data import TensorDataset, DataLoader

train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=True)

In [65]:
import torch.nn as nn
import torch.nn.functional as nnF

CLASSES = 8

class CNNClassifier(nn.Module):
    def __init__(self):
        super(CNNClassifier, self).__init__()
        
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2, 2)
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2, 2)
        
        self.conv3 = nn.Conv2d(64, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(2, 2)

        self.conv4 = nn.Conv2d(256, 128, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool4 = nn.AdaptiveAvgPool2d((1, 1))  # Global avg pool to flatten

        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(128, 128)
        self.fc2 = nn.Linear(128, CLASSES)

    def forward(self, x):
        x = x.unsqueeze(1)  # [B, 1, H, W]
        x = self.pool1(nnF.relu(self.bn1(self.conv1(x))))
        x = self.pool2(nnF.relu(self.bn2(self.conv2(x))))
        x = self.pool3(nnF.relu(self.bn3(self.conv3(x))))
        x = self.pool4(nnF.relu(self.bn4(self.conv4(x))))
        x = x.view(x.size(0), -1)
        # x = nnF.relu(self.fc1(x))
        x = self.dropout(nnF.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

In [66]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch import optim


class model2():
    def __init__(self):
        self.model = CNNClassifier()
        self.train_acc = []
        self.val_acc = []
        return
    
    def predict(self, path, outpath=None):
        d = eval(open(path, 'r').read())
        predictions = {}
        for k in d:
            x = (torch.tensor(features(k), dtype=torch.float32))
            pred = self.model.predict([x])
            predictions[k] = str(idToArtist[pred[0]])
        if outpath:
            with open(outpath, "w") as z:
                z.write(str(predictions) + '\n')
        return predictions

    # Train your model. Note that this function will not be called from the autograder:
    # instead you should upload your saved model using save()
    import torch

    def train(self, train_loader, val_loader, epochs=5):
        torch.mps.empty_cache()

        device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
        # device = "cpu"

        model = self.model
        model.to(device)

        criterion = nn.CrossEntropyLoss()  
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=.9, patience=3)
        
    
        train_acc = []
        val_acc = []
    
        best_val_acc = 0.0
        patience_counter = 0
        patience=15
    
        for epoch in range(epochs):
            model.train()
            correct_train = 0
            total_train = 0
            for batch_x, batch_y in train_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)

                optimizer.zero_grad()
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                _, predicted = torch.max(outputs.data, 1)
                total_train += batch_y.size(0)
                correct_train += (predicted == batch_y).sum().item()
    
            train_accuracy = correct_train / total_train
            train_acc.append(train_accuracy)
    
            # Validation
            model.eval()
            correct_val = 0
            total_val = 0
            with torch.no_grad():
                for batch_x, batch_y in val_loader:
                    batch_x = batch_x.to(device)
                    batch_y = batch_y.to(device)

                    outputs = model(batch_x)
                    _, predicted = torch.max(outputs.data, 1)
                    total_val += batch_y.size(0)
                    correct_val += (predicted == batch_y).sum().item()
    
            val_accuracy = correct_val / total_val
            val_acc.append(val_accuracy)
            
            print(f'Epoch [{epoch+1}/{epochs}], Train Accuracy: {train_accuracy:.4f}, Validation Accuracy: {val_accuracy:.4f}, LR: {optimizer.param_groups[0]["lr"]}')
    
            # Step the LR scheduler
            scheduler.step(val_accuracy)
    
            # Early Stopping
            if val_accuracy > best_val_acc:
                best_val_acc = val_accuracy
                patience_counter = 0
                best_model_state = model.state_dict()  # Save best model
                save_model(model, 'sol_1_CNN.pt')
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print("Early stopping triggered.")
                    break
    
        # Load best model state
        model.load_state_dict(best_model_state)
    
        self.train_acc = train_acc
        self.val_acc = val_acc
        self.model = model
                
    def get_train_acc(self):
        return self.train_acc, self.val_acc
    
    def _get_model_copy(self, model):
        """Create a deep copy of the model."""
        model_copy = type(model)(*model.__init_args__, **model.__init_kwargs__)
        model_copy.load_state_dict(model.state_dict())
        return model_copy

In [67]:
EPOCHS = 100
model = model2()
model.train(train_loader, val_loader, EPOCHS)


# train_preds = model.predict(dataroot1 + "/train.json")
# test_preds = model.predict(dataroot1 + "/test.json", "predictions1.json")
# 
# train_labels = eval(open(dataroot1 + "/train.json").read())
# acc1 = accuracy1(train_labels, train_preds)
# print("Task 1 training accuracy = " + str(acc1))

Epoch [1/100], Train Accuracy: 0.3781, Validation Accuracy: 0.4504, LR: 0.001
Model saved to sol_1_CNN.pt
Epoch [2/100], Train Accuracy: 0.4349, Validation Accuracy: 0.2025, LR: 0.001
Epoch [3/100], Train Accuracy: 0.4514, Validation Accuracy: 0.4174, LR: 0.001
Epoch [4/100], Train Accuracy: 0.4752, Validation Accuracy: 0.4711, LR: 0.001
Model saved to sol_1_CNN.pt
Epoch [5/100], Train Accuracy: 0.4855, Validation Accuracy: 0.5041, LR: 0.001
Model saved to sol_1_CNN.pt
Epoch [6/100], Train Accuracy: 0.5021, Validation Accuracy: 0.4587, LR: 0.001
Epoch [7/100], Train Accuracy: 0.5207, Validation Accuracy: 0.4835, LR: 0.001
Epoch [8/100], Train Accuracy: 0.5207, Validation Accuracy: 0.3347, LR: 0.001
Epoch [9/100], Train Accuracy: 0.5382, Validation Accuracy: 0.3636, LR: 0.001
Epoch [10/100], Train Accuracy: 0.5300, Validation Accuracy: 0.5372, LR: 0.0009000000000000001
Model saved to sol_1_CNN.pt
Epoch [11/100], Train Accuracy: 0.5465, Validation Accuracy: 0.4669, LR: 0.0009000000000000

In [None]:
import matplotlib.pyplot as plt

train_acc, val_acc = model.get_train_acc()

x = range(1, EPOCHS + 1)

# Plot training accuracy
plt.plot(x, train_acc, label='Training Accuracy', color='blue', linestyle='-', marker='o')

# Plot validation accuracy
plt.plot(x, val_acc, label='Validation Accuracy', color='red', linestyle='--', marker='x')

# Adding labels and title
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy per Epoch')

# Add a legend
plt.legend()

# Show the plot
plt.grid(True)
plt.show()

In [65]:
import pickle
with open('fullTrainData.pkl', 'wb') as f:
    data = {'train_loader': train_loader, 'val_loader':val_loader, 'train_dataset': train_dataset, 'val_dataset':val_dataset}
    pickle.dump(data, f)