In [None]:
import torch
import torch.nn as nn
import librosa
import dagshub
import mlflow
import subprocess
import json
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from math import ceil
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard.writer import SummaryWriter
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

In [None]:
# Report which device PyTorch will be able to use in this session.
if not torch.cuda.is_available():
    # No CUDA device detected
    print("GPU is not available. PyTorch will use the CPU.")
else:
    # CUDA device detected: also show the name of device 0
    print("GPU is available.")
    print(f"Current GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# Initialise the DagsHub client for this repository. mlflow=True is passed so
# that the MLflow calls later in the notebook are presumably tracked on
# DagsHub — TODO confirm against the dagshub client documentation.
dagshub.init(
    repo_owner="stephenjera",
    repo_name="Genre-Classification",
    mlflow=True,
)

In [None]:
SAMPLE_RATE = 22050  # assumed sample rate of the audio, in samples per second
DURATION = 30  # length of audio files measured in seconds
NUM_SEGMENTS = 1  # intended number of segments per track (logged to MLflow below)
SAMPLES_PER_TRACK = SAMPLE_RATE * DURATION  # samples expected in one full-length track

In [None]:
# Dataset location, resolved relative to the current working directory:
# <parent of cwd>/data/genres
data_directory = Path.cwd().parent / "data"
genres_dir = data_directory / "genres"

In [None]:
def save_mfcc(
    dataset_path: str | Path, #Can input the path or of string format
    json_path: str,
    samples_per_track: int,
    n_mfcc=13,
    n_fft=2048,
    hop_length=512,
    num_segments=5,
):
    """Creates a JSON file of the MFCCs for the dataset
    :param
    ----------
     dataset_path: path to the dataset folder
     json_path: name of JSON file to be created
     n_mfcc: number of MFCC coefficients to create
     n_fft:
     hop_length:
     num_segments:

    """
    # Create a dictionary to map semantic labels to numerical labels
    semantic_to_numeric = {}
    # dictionary to store data
    data = {
        "mappings": {},
        "mfcc": [],
        "labels": [],
    }
    num_samples_per_segment = int(samples_per_track / num_segments)
    expected_num_mfcc_vectors_per_segment = ceil(
        num_samples_per_segment / hop_length
    )  # round up always

    # Loop through all the data
    for i, (dirpath, _, filenames) in enumerate(os.walk(dataset_path)):
        # dirpath = current folder path
        # dirnames = subfolders in dirpath
        # filenames = all files in dirpath

        # ensure that we're not at the root level (Audio folder)
        if dirpath != str(dataset_path):
            # save the semantic label
            dirpath_components = dirpath.split(os.sep)
            semantic_label = dirpath_components[-1]
            # Subtract 1 to skip the root folder
            semantic_to_numeric[semantic_label] = i - 1
            print(f"\nProcessing {semantic_label}")

            # process files
            for filename in filenames:
                # load audio file
                file_path = Path(dirpath, filename)  # os.path.join(dirpath, filename)
                try:
                    signal, sr = librosa.load(
                        file_path  # , sr=SAMPLE_RATE, duration=DURATION
                    )
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")
                    continue
                # process segments extracting mfcc and storing data
                for s in range(num_segments):
                    start_sample = num_samples_per_segment * s
                    finish_sample = start_sample + num_samples_per_segment

                    mfcc = librosa.feature.mfcc(
                        y=signal[start_sample:finish_sample],
                        sr=sr,
                        n_fft=n_fft,
                        n_mfcc=n_mfcc,
                        hop_length=hop_length,
                    )
                    mfcc = mfcc.T

                    # store mfcc for segment if it has expected length
                    if len(mfcc) == expected_num_mfcc_vectors_per_segment:
                        # can't save numpy arrays as json files
                        data["mfcc"].append(mfcc.tolist())
                        data["labels"].append(i - 1)
                        print(f"{file_path}, segment:{s+1}")
    data["mappings"] = semantic_to_numeric
    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)

In [None]:
# Pass NUM_SEGMENTS explicitly so the extracted data matches the value that is
# logged to MLflow; without it save_mfcc falls back to its own default of 5.
save_mfcc(genres_dir, "data.json", SAMPLES_PER_TRACK, num_segments=NUM_SEGMENTS)

In [None]:
def load_data(dataset_path):
    """
    Read the dataset back from a JSON file.
        :param dataset_path (str): Path to the JSON file; it must contain the
            keys "mfcc", "labels" and "mappings"
        :return X (ndarray): Inputs, built from the "mfcc" entry
        :return y (ndarray): Targets, built from the "labels" entry
        :return mappings: The "mappings" entry, returned unchanged
    """
    with open(dataset_path, "r") as json_file:
        payload = json.load(json_file)

    # JSON only holds plain lists, so turn features and targets back into arrays
    features = np.array(payload["mfcc"])
    targets = np.array(payload["labels"])
    return features, targets, payload["mappings"]

In [None]:
# LSTM model
class LSTMGenreModel(nn.Module):
    """LSTM encoder followed by a linear layer that scores every class."""

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()  # set up nn.Module internals before registering layers
        # batch_first=True: inputs are laid out as (batch, seq_len, input_size)
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
        )
        # maps the LSTM's hidden vector to one score per class
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class scores of shape (batch, num_classes) for ``x``.

        ``x`` has shape (batch, seq_len, input_size).
        """
        # sequence_output: (batch, seq_len, hidden_size); final states are not needed
        sequence_output, _ = self.lstm(x)

        # classify from the output at the last time step only
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)


# Dataset
class MFCCDataset(Dataset):
    """Dataset that pairs each MFCC sample with its label by position."""

    def __init__(self, X, y):
        # keep references to the inputs and targets; nothing is copied
        self.X, self.y = X, y

    def __len__(self):
        # the number of samples is taken from the targets
        return len(self.y)

    def __getitem__(self, idx):
        # (inputs, target) for the requested position
        return self.X[idx], self.y[idx]

In [None]:
# Load the extracted features, then convert them to tensors:
# float32 inputs and long (int64) labels.
X, y, mappings = load_data("data.json")
X = torch.tensor(X.astype(np.float32))
y = torch.tensor(y, dtype=torch.long)

In [None]:
def prepare_datasets(X, y, test_size, validation_size, shuffle=True, random_state=42):
    """Split the data into train, validation and test subsets.

    The test subset is split off first; the validation subset is then taken
    from what remains, so ``validation_size`` is relative to the remaining
    training portion, not to the full dataset.

    :param X: inputs
    :param y: targets
    :param test_size: size of the test split, forwarded to ``train_test_split``
    :param validation_size: size of the validation split, forwarded to
        ``train_test_split`` on the remaining training data
    :param shuffle: whether to shuffle before splitting (applied to both splits)
    :param random_state: seed used for both splits so they are reproducible
    :return: ``X_train, y_train, X_test, y_test, X_val, y_val`` (in this order)
    """
    # create train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, shuffle=shuffle, random_state=random_state
    )

    # create train/validation split; shuffle and random_state are forwarded
    # here too, otherwise the validation subset changes on every call
    X_train, X_val, y_train, y_val = train_test_split(
        X_train,
        y_train,
        test_size=validation_size,
        shuffle=shuffle,
        random_state=random_state,
    )

    return X_train, y_train, X_test, y_test, X_val, y_val

In [None]:
# Hyperparameters
# num_classes and input_size are read from the loaded data instead of being
# hard-coded, so they cannot drift from what data.json actually contains.
num_classes = len(mappings)  # number of genres
input_size = X.shape[-1]  # number of MFCC coefficients
hidden_size = 128
num_layers = 2
batch_size = 64
num_epochs = 50
learning_rate = 1e-3

In [None]:
# 25% of the data is held out for testing; 20% of the remainder for validation
X_train, y_train, X_test, y_test, X_val, y_val = prepare_datasets(X, y, 0.25, 0.2)

# Create datasets and dataloaders
train_dataset = MFCCDataset(X_train, y_train)
test_dataset = MFCCDataset(X_test, y_test)
val_dataset = MFCCDataset(X_val, y_val)

# Training batches are reshuffled every epoch; the seeded generator keeps the
# shuffling order reproducible across runs.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    generator=torch.Generator().manual_seed(42),
)
# Evaluation loaders keep a fixed order
test_loader = DataLoader(test_dataset, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train model
model = LSTMGenreModel(input_size, hidden_size, num_layers, num_classes).to(device)

# Dummy batch used only by writer.add_graph; its feature dimension follows
# input_size so it always matches what the model expects.
sample_input = torch.zeros(1, 128, input_size, device=device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()  # loss function for the class scores

# TensorBoard writer; it is reused by later cells, so it is not closed here
writer = SummaryWriter()

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss = 0.0
    train_correct = 0
    for mfccs_batch, labels_batch in train_loader:
        # Move tensors to the training device
        mfccs_batch = mfccs_batch.to(device)
        labels_batch = labels_batch.to(device)

        # Gradients accumulate by default, so clear them before each step
        optimizer.zero_grad()
        # Forward pass
        outputs = model(mfccs_batch)

        loss = criterion(outputs, labels_batch)
        loss.backward()  # back-propagation
        optimizer.step()  # update the weights

        # Track training metrics
        train_loss += loss.item()
        train_correct += (outputs.argmax(1) == labels_batch).sum().item()

    # Average training metrics: loss per batch, accuracy per sample
    train_loss /= len(train_loader)
    train_accuracy = train_correct / len(train_loader.dataset)  # type: ignore

    # ---- Validation pass ----
    model.eval()
    val_loss = 0.0
    val_correct = 0

    with torch.no_grad():  # no gradients are needed for evaluation
        for mfccs_val, labels_val in val_loader:
            # Move tensors to the training device
            mfccs_val = mfccs_val.to(device)
            labels_val = labels_val.to(device)
            outputs = model(mfccs_val)
            val_loss += criterion(outputs, labels_val).item()
            val_correct += (outputs.argmax(1) == labels_val).sum().item()

    # Normalise by the validation loader itself (not the test loader), so the
    # reported numbers really are per-batch loss and per-sample accuracy.
    val_loss /= len(val_loader)
    val_accuracy = val_correct / len(val_loader.dataset)  # type: ignore

    # Log metrics to TensorBoard
    writer.add_scalar("Loss/Train", train_loss, epoch)
    writer.add_scalar("Loss/Valid", val_loss, epoch)

    writer.add_scalar("Accuracy/Train", train_accuracy, epoch)
    writer.add_scalar("Accuracy/Valid", val_accuracy, epoch)

    # Log layer weights with the epoch as step so each epoch gets its own histogram
    writer.add_histogram("FC Weights", model.fc.weight, epoch)
    writer.add_histogram("RNN Weights", model.lstm.weight_ih_l0, epoch)

# The model structure does not change between epochs, so trace it once
writer.add_graph(model, sample_input)

# Evaluation
# The model is moved to the CPU; the batches from test_loader are used as-is.
model.to("cpu")

total = 0
correct = 0
with torch.no_grad():
    for mfccs_test, labels_test in test_loader:
        outputs = model(mfccs_test)
        # index of the highest score per sample is the predicted class
        predicted = torch.max(outputs.data, dim=1).indices
        hits = predicted == labels_test
        correct += hits.sum().item()
        total += labels_test.size(0)

# accuracy as a percentage of correctly classified test samples
accuracy = 100 * correct / total
print(f"Accuracy: {accuracy}")

In [None]:
# Log parameters
# Resolve the commit hash first, so a git failure cannot leave a half-logged run.
git_commit = subprocess.check_output(
    ["git", "rev-parse", "HEAD"], universal_newlines=True
).strip()

# Using the run as a context manager ends it even if a logging call raises,
# so re-running this cell never trips over a run that is still active.
with mlflow.start_run():
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_param("SAMPLE_RATE", SAMPLE_RATE)
    mlflow.log_param("DURATION", DURATION)
    mlflow.log_param("NUM_SEGMENTS", NUM_SEGMENTS)
    mlflow.log_param("SAMPLES_PER_TRACK", SAMPLES_PER_TRACK)
    mlflow.log_param("num_classes", num_classes)
    mlflow.log_param("input_size", input_size)
    mlflow.log_param("hidden_size", hidden_size)
    mlflow.log_param("num_layers", num_layers)
    mlflow.log_param("batch_size", batch_size)
    mlflow.log_param("num_epochs", num_epochs)
    mlflow.log_param("learning_rate", learning_rate)
    # the current git commit is what gets recorded under "data_version"
    mlflow.log_param("data_version", git_commit)

In [None]:
# Save the model: only the parameters (state dict) are written, not the module
state_dict = model.state_dict()
torch.save(state_dict, "model.pth")

In [None]:
# Load the model: rebuild the architecture, then restore the saved parameters
pred_model = LSTMGenreModel(input_size, hidden_size, num_layers, num_classes)
# map_location keeps loading working on CPU-only machines; weights_only limits
# unpickling to tensors and plain containers, which is all a state dict needs.
pred_model.load_state_dict(
    torch.load("model.pth", map_location="cpu", weights_only=True)
)
pred_model.eval()

In [None]:
# Reverse lookup: numeric label -> genre name
index_to_genre = {index: genre for genre, index in mappings.items()}

# as_tensor avoids re-wrapping (and copying) a sample that is already a tensor
single_data = torch.as_tensor(test_loader.dataset.X[0])
actual_label = int(test_loader.dataset.y[0])
if single_data.dim() < 3:
    # Add a batch dimension when the sample is 2D (sequence length, features)
    single_data = single_data.unsqueeze(0)

# Predict with the model restored from disk, which also checks the save/load round trip
with torch.no_grad():
    outputs = pred_model(single_data)
    _, predicted = torch.max(outputs, 1)

predicted_label = predicted.item()
predicted_genre_name = index_to_genre[predicted_label]
actual_genre_name = index_to_genre[actual_label]
print(f"Predicted class: {predicted_genre_name} {predicted_label}, Actual: {actual_genre_name} {actual_label}")

In [None]:
# Collect the true and predicted labels for every sample in the test set
true_labels = []
pred_labels = []

with torch.no_grad():
    for mfcc, label in test_loader:
        outputs = model(mfcc)
        _, predicted = torch.max(outputs.data, 1)
        true_labels.extend(label.tolist())
        pred_labels.extend(predicted.tolist())

# Genre names and label ids, both ordered by label id
sorted_mappings = sorted(mappings.items(), key=lambda item: item[1])
genre_names = [name for name, _ in sorted_mappings]
label_ids = [index for _, index in sorted_mappings]

# Create confusion matrix. Passing labels explicitly keeps one row/column per
# genre even when a genre never occurs in the test labels or predictions, so
# the matrix always lines up with genre_names.
cm = confusion_matrix(true_labels, pred_labels, labels=label_ids)

# Plot confusion matrix with genre names as labels
fig, ax = plt.subplots(figsize=(10, 10))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=genre_names,
    yticklabels=genre_names,
    ax=ax,
)
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
ax.set_title("Confusion Matrix")

# Rotate y-axis labels
ax.set_yticklabels(ax.get_yticklabels(), rotation=0)

plt.show()

# Log the confusion matrix as an image to TensorBoard, then flush so it is
# written to disk right away rather than at the writer's next periodic flush
writer.add_figure('Confusion Matrix', fig)
writer.flush()

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

# Start TensorBoard inside the notebook via the line magic, reading event
# files from ./runs. SummaryWriter() was created without an explicit log_dir,
# so this is presumably where its logs land — update this if log_dir changes.
%tensorboard --logdir runs
