In [None]:
import os
import torch
import numpy as np
import torchvision as tv
from torch.utils.data import DataLoader,random_split,Dataset,TensorDataset
import torch.nn as nn
from torchvision import transforms,datasets
import torch.optim as optim
from torchmetrics import ConfusionMatrix
import matplotlib.pyplot as plt
import seaborn as sb
from torchmetrics.classification import MulticlassAccuracy
from torchvision.transforms import Resize,ToTensor,Compose
import librosa, IPython
import librosa.display as lplt
import random
from tqdm import tqdm
import torchaudio
from torchaudio.transforms import Resample
import json
import math
from sklearn.metrics import confusion_matrix



## Audio Classification using LSTM

In [None]:
# Notebook configuration: dataset location, split sizes, training length,
# RNG seeding, compute device, and one reference clip used to read off the
# sample rate / clip length shared by the rest of the notebook.
data_path = '/kaggle/input/gtzan-dataset-music-genre-classification/Data/genres_original'
genres = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']

# define dataset split
training_size = 0.7
validation_size = 0.2
testing_size = 0.1
batch_size = 64

# define epochs
epochs = 50

# define seed for reproducibility (seed every RNG imported by this notebook)
seed=7
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# use the GPU when one is available, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# get sample rate and audio length from a reference clip
# (librosa.load is called without `sr`, so librosa's default target rate applies)
audio_example = os.path.join(data_path, 'country', 'country.00000.wav')
audio_data, sr = librosa.load(audio_example)
sample_num = len(audio_data)

aud_length = sample_num / sr
print("Sample rate:",sr)
print("Audio length:",aud_length)
print("Sequence length:",sample_num)

# extract MFCCs with 13 coefficients; the result is indexed (coefficient, frame)
mfccs = librosa.feature.mfcc(y=audio_data, sr=sr, n_mfcc=13)


In [None]:

# Collect every audio file under data_path together with its genre label.
# NOTE: this rebinds `genres` (previously the list of genre names) to a
# per-file list of labels that runs parallel to `music_array`.
corrupt_file = 'jazz.00054.wav'
music_array = []
genres = []
for root, dirs, files in os.walk(data_path):
    for name in files:
        # skip corrupt file for processing; match on the file name so the
        # check works regardless of where the dataset is mounted
        if name == corrupt_file:
            continue
        music_array.append(os.path.join(root, name))
        # the genre is the name of the folder that directly contains the file
        genres.append(os.path.basename(root))
            
        

### Feature extraction

In [None]:
# define file path for extracted feature dictionary
fe_file="/kaggle/working/fe.json"
# number of samples in the reference clip; taken directly from the loaded
# signal so no float round-trip (length / sr * sr) can shave off a sample
samples = len(audio_data)

# function for feature extraction
def feature_extraction(data_path, fe_file, num_mfcc=13, n_fft=2048, hop_length=512, num_segments=5,
                       total_samples=None, target_sr=None, skip_files=("jazz.00054.wav",)):
    """Extract per-segment MFCCs for every genre sub-folder and save them as JSON.

    Each audio file is cut into ``num_segments`` equal slices; every slice whose
    MFCC matrix has the expected number of frames is stored (transposed, so
    rows are frames) together with the integer index of its genre.

    Args:
        data_path: Root folder whose immediate sub-folders are the genres.
        fe_file: Path of the JSON file to write.
        num_mfcc: Number of MFCC coefficients per frame.
        n_fft: FFT window size passed to librosa.
        hop_length: Hop length passed to librosa.
        num_segments: Number of equal slices each track is cut into.
        total_samples: Samples per track; defaults to the notebook-level
            ``samples`` value when omitted.
        target_sr: Sample rate used when loading audio; defaults to the
            notebook-level ``sr`` value when omitted.
        skip_files: File names (not paths) that are never loaded, e.g. the
            known-corrupt ``jazz.00054.wav``.

    The JSON holds ``mapping`` (genre names, index == label), ``labels`` and
    ``mfcc`` (parallel lists, one entry per stored segment).
    """
    # fall back to the notebook-level values so existing calls keep working
    if total_samples is None:
        total_samples = samples
    if target_sr is None:
        target_sr = sr

    # dictionary to store mfccs, target labels, and corresponding mappings
    audio_info = {
        "mapping": [],
        "labels": [],
        "mfcc": []
    }

    samples_per_seg = int(total_samples / num_segments)
    mfcc_vectors_per_seg = math.ceil(samples_per_seg / hop_length)

    root_dir = os.fspath(data_path)

    # loop through all genre sub-folders
    for dirpath, dirnames, filenames in os.walk(root_dir):
        # os.walk yields entries in arbitrary filesystem order; sorting in
        # place makes the traversal (and so the label numbering) deterministic
        dirnames.sort()

        # the root itself is not a genre (compare by value, not identity)
        if dirpath == root_dir:
            continue

        # register the genre; its label is its position in the mapping, so
        # labels and mapping can never drift apart
        audio_info["mapping"].append(os.path.basename(dirpath))
        genre_index = len(audio_info["mapping"]) - 1

        # process all audio files in the genre sub-dir
        for f in sorted(filenames):
            # skip known-corrupt files without trying to decode them
            if f in skip_files:
                continue

            file_path = os.path.join(dirpath, f)
            signal, sample_rate = librosa.load(file_path, sr=target_sr)

            # process each audio segment of the file
            for s in range(num_segments):
                # find start and finish sample of current audio segment
                start = samples_per_seg * s
                finish = start + samples_per_seg

                # extract mfccs and transpose so that rows are frames
                mfcc = librosa.feature.mfcc(y=signal[start:finish], sr=sample_rate, n_mfcc=num_mfcc, n_fft=n_fft, hop_length=hop_length)
                mfcc = mfcc.T

                # keep only segments with the expected number of frames so
                # every stored segment has the same shape
                if len(mfcc) == mfcc_vectors_per_seg:
                    audio_info["mfcc"].append(mfcc.tolist())
                    audio_info["labels"].append(genre_index)

    # save extracted mfccs to the defined json file
    with open(fe_file, "w") as fp:
        json.dump(audio_info, fp, indent=4)
        
# Run the extraction over the whole dataset, cutting each track into 5 segments; the result is written to fe_file as JSON.

feature_extraction(data_path, fe_file, num_segments=5)


### Data Loading

In [None]:
# json_path = "/kaggle/working/fe.json"


def load_fe_data(json_path):
    """Read the extracted-feature JSON and return its three fields as arrays.

    Args:
        json_path: Path to a JSON file holding the keys "mfcc", "labels"
            and "mapping".

    Returns:
        Tuple ``(X, y, z)`` of NumPy arrays built from the "mfcc", "labels"
        and "mapping" entries, in that order.
    """
    with open(json_path, "r") as handle:
        payload = json.load(handle)

    # convert the fields in the same order they are returned
    features, targets, names = (
        np.array(payload[key]) for key in ("mfcc", "labels", "mapping")
    )
    return features, targets, names


In [None]:
# Load the extracted features: X = MFCC segments, y = integer labels, z = genre names
X,y,z = load_fe_data('/kaggle/working/fe.json')
assert len(X) == len(y), "every MFCC segment needs exactly one label"

sample_num = len(X)

# the test split takes whatever is left so the three sizes always sum to sample_num
train_samples = int(training_size * sample_num)
val_samples = int(validation_size * sample_num)
test_samples = sample_num - train_samples - val_samples

# convert numpy arrays to tensors for dataset
# (a singleton axis is inserted at position 1; the training/eval code squeezes it again)
ds = TensorDataset(torch.from_numpy(X).unsqueeze(1), torch.from_numpy(y))

# split data as defined
train_data, val_data, test_data = random_split(ds, [train_samples, val_samples, test_samples])

# load data using DataLoader with the configured batch size;
# only the training data needs to be reshuffled every epoch
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# shape of the reference clip's mfccs, and of the full feature array fed to the model
print("MFCCs shape:", mfccs.shape)
print("Dataset shape:", X.shape)

### RNN with LSTM

In [None]:
class RNN(nn.Module):
    """Stacked LSTM followed by a linear classifier over the flattened per-step outputs.

    Input to ``forward`` is batch-first: ``(batch, time, input_size)``.
    Output is ``(batch, 10)`` raw class scores.

    The linear head is sized for ``FC_TIME_STEPS`` time steps. Inputs with
    exactly that many steps are processed as before; inputs of any other
    length are average-pooled along time to ``FC_TIME_STEPS`` steps first, so
    a different segment length no longer causes a shape mismatch in ``fc``.

    Args:
        input_size: Number of features per time step.
        hidden_size: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        seq_length: Stored as ``self.seq_length`` for reference only; it does
            not size any layer.
    """

    # number of time steps the linear head is sized for
    FC_TIME_STEPS = 216

    def __init__(self, input_size, hidden_size, num_layers,seq_length):
        super().__init__()
        # define network layers
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.seq_length = seq_length
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,dropout=0.8)
        self.fc = nn.Linear(self.FC_TIME_STEPS * hidden_size, 10)
        # kept for interface compatibility; not applied in forward
        self.dropout= nn.Dropout(0.8)
        # parameter-free resampling of the time axis to the length fc expects
        self.time_pool = nn.AdaptiveAvgPool1d(self.FC_TIME_STEPS)

    def forward(self, x):
        # nn.LSTM starts from zero hidden/cell states when none are given,
        # created on the input's own device and dtype, so no global device
        # is needed here
        output, _ = self.lstm(x)

        # bring the time axis to the length fc was built for; skipped when it
        # already matches so the original computation is untouched
        if output.size(1) != self.FC_TIME_STEPS:
            output = self.time_pool(output.transpose(1, 2)).transpose(1, 2)

        # flatten (time, hidden) to feed to linear layer
        output = output.reshape(output.shape[0], -1)

        # final output through linear layer
        output = self.fc(output)
        return output

In [None]:
# define parameters
input_size =  mfccs.shape[0]   # number of MFCC coefficients per frame
hidden_size =256
num_layers=3
# number of MFCC frames per stored segment, read from the loaded feature
# array instead of a hard-coded raw-sample count
seq_length= X.shape[1]

# instantiate RNN model
model = RNN(input_size, hidden_size, num_layers,seq_length).to(device)

# define loss function
loss_fn = nn.CrossEntropyLoss()

# define optimiser with learning rate 0.0001 and weight decay for regularisation
optimizer = optim.Adam(model.parameters(), lr=0.0001,weight_decay=0.1)

# training loop
model.train()
for epoch in range(epochs):
    progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")
    for data, targets in progress:

        # drop the singleton axis added when the dataset was built and cast
        # the features to float32 to match the model weights
        data = data.to(device=device).squeeze(1).float()
        targets = targets.to(device=device)

        # forward pass
        scores = model(data)

        # get loss value
        loss = loss_fn(scores, targets)

        # backward pass
        optimizer.zero_grad()
        loss.backward()

        optimizer.step()

        # show the latest batch loss on the progress bar
        progress.set_postfix(loss=f"{loss.item():.4f}")

        
# define confusion matrix for 10 class output, placed on the same device as the model and batches
conmat=ConfusionMatrix(task='multiclass',num_classes=10).to(device)

# calculate accuracy on given model
def get_accuracy(loader, model, confmat=None):
    """Return the fraction of correctly classified samples in ``loader``.

    Every batch is also passed to a confusion-matrix metric via
    ``update(scores, targets)``.

    Args:
        loader: Iterable of ``(inputs, targets)`` batches; axis 1 of the
            inputs is squeezed before the forward pass.
        model: Module mapping inputs to per-class scores.
        confmat: Object with an ``update(scores, targets)`` method. Defaults
            to the notebook-level ``conmat`` when omitted.

    Returns:
        ``correct / num_samples`` (a 0-dim tensor).

    Raises:
        ValueError: If the loader yields no samples.
    """
    if confmat is None:
        confmat = conmat

    # run on the device the model lives on; fall back to the notebook-level
    # device for a model without parameters
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device(device)

    correct = 0
    num_samples = 0

    # remember the current mode so it can be restored afterwards
    was_training = model.training
    model.eval()

    try:
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device=run_device).squeeze(1).float()
                y = y.to(device=run_device)
                scores = model(x)
                _, predictions = scores.max(1)
                correct += (predictions == y).sum()
                num_samples += predictions.size(0)

                # update confusion matrix values
                confmat.update(scores, y)
    finally:
        # put the model back into whichever mode the caller had it in
        model.train(was_training)

    if num_samples == 0:
        raise ValueError("loader yielded no samples; accuracy is undefined")
    return correct / num_samples

# output accuracy
train_accuracy = get_accuracy(train_loader, model)
# get_accuracy also feeds the shared confusion matrix; clear the training-set
# counts so the matrix read afterwards reflects the test set only
conmat.reset()
test_accuracy = get_accuracy(test_loader, model)
print(f"Training set accuracy: {train_accuracy*100:.2f}")
print(f"Testing set accuracy: {test_accuracy*100:.2f}")


In [None]:
# produce confusion matrix heatmap from the accumulated counts (rows = true class)
x=conmat.compute().cpu().numpy()

# normalise raw values to convert to percentages of each true class;
# a class with no samples keeps an all-zero row instead of producing NaN
row_totals = x.sum(axis=1, keepdims=True)
conmat_normed = np.divide(
    x.astype('float'),
    row_totals,
    out=np.zeros(x.shape, dtype=float),
    where=row_totals != 0,
)
conmat_per = np.round(conmat_normed * 100, 2)

# plot confusion matrix heatmap
plt.figure(figsize=(10, 8))
sb.heatmap(conmat_per, annot=True, cmap='rocket', fmt='g')
plt.title('Confusion matrix (% of each true label)')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
