# Question A4

In this section, we will understand the utility of such a neural network in real world scenarios.

#### Please use the real record data named ‘record.wav’  as a test sample. Preprocess the data using the provided preprocessing script (data_preprocess.ipynb) and prepare the dataset.
Do a model prediction on the sample test dataset and obtain the predicted label using a threshold of 0.5. The model used is the optimized pretrained model using the selected optimal batch size and optimal number of neurons.
Find the most important features on the model prediction for the test sample using SHAP. Plot the local feature importance with a force plot and explain your observations.  (Refer to the documentation and these three useful references:
https://christophm.github.io/interpretable-ml-book/shap.html#examples-5,
https://towardsdatascience.com/deep-learning-model-interpretation-using-shap-a21786e91d16,  
https://medium.com/mlearning-ai/shap-force-plots-for-classification-d30be430e195)



1. Firstly, we import relevant libraries.

In [116]:
import tqdm
import time
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from scipy.io import wavfile as wav

from sklearn import preprocessing
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix
from common_utils import set_seed

# setting seed
set_seed()

To reduce repeated code, place your
network (MLP defined in QA1)
torch datasets (CustomDataset defined in QA1)
loss function (loss_fn defined in QA1)
in a separate file called common_utils.py

Import them into this file. You will not be repenalised for any error in QA1 here as the code in QA1 will not be remarked.

The following code cell will not be marked.


In [117]:
# YOUR CODE HERE
import librosa
import soundfile as sf

import numpy as np
import pandas as pd

import os
from os import listdir
from os.path import isfile, join

from collections import OrderedDict

import json

def extract_features(filepath):
    
    '''
    Source: https://github.com/danz1ka19/Music-Emotion-Recognition/blob/master/Feature-Extraction.py
    Modified to process a single file

        function: extract_features
        input: path to mp3 files
        output: csv file containing features extracted

        This function reads the content in a directory and for each audio file detected
        reads the file and extracts relevant features using librosa library for audio
        signal processing
    '''

    feature_set = {}  # Features

    # Reading audio file
    y, sr = librosa.load(filepath)
    S = np.abs(librosa.stft(y, n_fft=512)) 
    # https://librosa.org/doc/main/generated/librosa.stft.html (set 512 for speech processing)

    # Extracting Features
    tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
    chroma_stft = librosa.feature.chroma_stft(y=y, sr=sr, n_fft=512)
    
    chroma_cq = librosa.feature.chroma_cqt(y=y, sr=sr)
    
    chroma_cens = librosa.feature.chroma_cens(y=y, sr=sr)
    melspectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=512)
    rmse = librosa.feature.rms(y=y)[0]
    cent = librosa.feature.spectral_centroid(y=y, sr=sr, n_fft=512)
    spec_bw = librosa.feature.spectral_bandwidth(y=y, sr=sr, n_fft=512)
    contrast = librosa.feature.spectral_contrast(S=S, sr=sr, n_fft=512)
    rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, n_fft=512)
    poly_features = librosa.feature.poly_features(S=S, sr=sr, n_fft=512)
    
    tonnetz = librosa.feature.tonnetz(y=y, sr=sr)
    
    zcr = librosa.feature.zero_crossing_rate(y)
    harmonic = librosa.effects.harmonic(y)
    percussive = librosa.effects.percussive(y)

    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_fft=512)
    mfcc_delta = librosa.feature.delta(mfcc)

    onset_frames = librosa.onset.onset_detect(y=y, sr=sr)
    frames_to_time = librosa.frames_to_time(onset_frames[:20], sr=sr)

    # Concatenating Features into one csv and json format
    feature_set['filename'] = filepath  # song name
    feature_set['tempo'] = tempo  # tempo 
    feature_set['total_beats'] = sum(beats)  # beats
    feature_set['average_beats'] = np.average(beats)
    feature_set['chroma_stft_mean'] = np.mean(chroma_stft)  # chroma stft
    feature_set['chroma_stft_var'] = np.var(chroma_stft)
    
    feature_set['chroma_cq_mean'] = np.mean(chroma_cq)  # chroma cq
    feature_set['chroma_cq_var'] = np.var(chroma_cq)
    
    feature_set['chroma_cens_mean'] = np.mean(chroma_cens)  # chroma cens
    feature_set['chroma_cens_var'] = np.var(chroma_cens)
    feature_set['melspectrogram_mean'] = np.mean(melspectrogram)  # melspectrogram
    feature_set['melspectrogram_var'] = np.var(melspectrogram)
    feature_set['mfcc_mean'] = np.mean(mfcc)  # mfcc
    feature_set['mfcc_var'] = np.var(mfcc)
    feature_set['mfcc_delta_mean'] = np.mean(mfcc_delta)  # mfcc delta
    feature_set['mfcc_delta_var'] = np.var(mfcc_delta)
    feature_set['rmse_mean'] = np.mean(rmse)  # rmse
    feature_set['rmse_var'] = np.var(rmse)
    feature_set['cent_mean'] = np.mean(cent)  # cent
    feature_set['cent_var'] = np.var(cent)
    feature_set['spec_bw_mean'] = np.mean(spec_bw)  # spectral bandwidth
    feature_set['spec_bw_var'] = np.var(spec_bw)
    feature_set['contrast_mean'] = np.mean(contrast)  # contrast
    feature_set['contrast_var'] = np.var(contrast)
    feature_set['rolloff_mean'] = np.mean(rolloff)  # rolloff
    feature_set['rolloff_var'] = np.mean(rolloff)
    feature_set['poly_mean'] = np.mean(poly_features)  # poly features
    feature_set['poly_var'] = np.mean(poly_features)
    
    feature_set['tonnetz_mean'] = np.mean(tonnetz)  # tonnetz
    feature_set['tonnetz_var'] = np.var(tonnetz)
    
    feature_set['zcr_mean'] = np.mean(zcr)  # zero crossing rate
    feature_set['zcr_var'] = np.var(zcr)
    feature_set['harm_mean'] = np.mean(harmonic)  # harmonic
    feature_set['harm_var'] = np.var(harmonic)
    feature_set['perc_mean'] = np.mean(percussive)  # percussive
    feature_set['perc_var'] = np.var(percussive)
    feature_set['frame_mean'] = np.mean(frames_to_time)  # frames
    feature_set['frame_var'] = np.var(frames_to_time)
    
    for ix, coeff in enumerate(mfcc):
        feature_set['mfcc' + str(ix) + '_mean'] = coeff.mean()
        feature_set['mfcc' + str(ix) + '_var'] = coeff.var()
    
    return feature_set


2. Install and import shap

In [118]:
# YOUR CODE HERE
import shap

3. Read the csv data preprocessed from 'record.wav', using variable name 'df', and fill the size of 'df' in 'size_row' and 'size_column'.

In [119]:
new_features_dict = extract_features('record.wav')
df = pd.DataFrame([new_features_dict])
df.to_csv('./new_record.csv', index=False)

# df = 0
# size_row = 0
# size_column = 0
# YOUR CODE HERE
# Use data_preprocess.ipynb? I think we use data_preprocess.ipynb here to process record.wav for using as test_data.
# Here you will use record.wav as test data, the normalization or preprocessing idea will be same with previous tasks. 
# The preprocess function defined here is for the real data 'df', and it would be better not to modify it. 

df = pd.read_csv('new_record.csv')
size_row, size_column = df.shape

 4.  Preprocess to obtain the test data, save the test data as numpy array.

In [136]:
# Are we allowed to save and load the pretrained model from A3? Otherwise, for retraining the model I believe the preprocess needs to return the scaled X_train and scaled X-test. 
# Yes, you are allowed to save the pre-trained model from A3.
# You can use the pre-trained model from the last step in Part A3. You can either save the pre-trained model in PartA 3, or retrain and save it using the similar data split with PartA 3 in the code cells of PartA 4. Then you can proceed with the prediction of 'record.wav'.
# so we do five folds cv, and use the best model from the 5 folds?, or do we just do train test split, and train it once and use that model?
# You just do train test split, and train it once and then use that model, no need k-fold cv.

# Basically, what I think it means is that you have already done the validation in A3 using the whole simplified.csv and the training in A3 so you can just use the model in A3 to do the prediction on record.wav.

# No, you need to use the optimal number of neurons found in A3 only for the first layer of the model in A4. The number of neurons for other layers can be set the same with previous tasks.
from common_utils import preprocess_dataset, split_dataset
# preprocess_dataset scales both df_train and df_test

# print(size_row)
# print(size_column)
# print(df)

class CustomDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# def preprocess(df):
#     # YOUR CODE HERE
    
#     X_train, y_train, X_test, y_test = split_dataset(df, 'filename', 0.30, 1)
#     X_train_scaled, X_test_scaled = preprocess_dataset(X_train, X_test)
    
#     return X_train_scaled, y_train, X_test_scaled, y_test

# X_train, y_train, X_test, y_test = preprocess(df)
# X_train and y_train go into intialise loaders batch

# def intialise_loaders_batch(X_train_scaled, y_train, X_test_scaled, y_test, batch_size):

# #     print("X_train_scaled in initialise loaders batch")
# #     print(len(X_train_scaled[0]))
#     train_data = BatchCustomDataset(X_train_scaled,y_train)
# #     print(len(train_data[1]))
#     test_data = BatchCustomDataset(X_test_scaled,y_test)
    
#     train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
#     test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True)
    
#     return train_dataloader, test_dataloader

# just bring out the train_data and throw into preprocess

def preprocess(X_train, df):
    """preprocess your dataset to obtain your test dataset, remember to remove the 'filename' as Q1
    """
    # YOUR CODE HERE
    df = df.drop(columns=['filename'])
    X_train_scaled, X_test_scaled_eg = preprocess_dataset(X_train, df)

    # note: already a numpy array
    # X_test_scaled_eg is the input features into our model later
    return X_test_scaled_eg

# for training the model
simplified = pd.read_csv('simplified.csv')
simplified['label'] = simplified['filename'].str.split('_').str[-2]
simplified['label'].value_counts()

X_train_simplified, y_train_simplified, X_test_simplified, y_test_simplified = split_dataset(simplified, 'filename', 0.30, 1)
print(X_train_simplified)
X_train_simplified = X_train_simplified.drop(columns=['label'])
X_test_simplified = X_test_simplified.drop(columns=['label'])
X_train_scaled_simplified, X_test_scaled_simplified = preprocess_dataset(X_train_simplified, X_test_simplified)
# for training the model

X_test_scaled_eg = preprocess(X_train_scaled_simplified, df)

# print(df.drop(columns=['filename']))
# print(len(X_test_scaled_eg[0]))
# print(X_test_scaled_eg)
# print(len(X_train_scaled_simplified[0]))
# print(X_train_scaled_simplified)

# X_test_scaled_eg is to be used for testing

# print(X_test_scaled_eg)

def train(model, X_train_scaled, y_train2, X_val_scaled, y_val2, batch_size):

    # YOUR CODE HERE
    
    epochs = 100
    times = []
    train_dataloader, test_dataloader = intialise_loaders(X_train_scaled, y_train2, X_val_scaled, y_val2)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.CrossEntropyLoss()

    tr_loss, tr_correct = [], []
    te_loss, te_correct = [], []
    for t in range(epochs):
        train_loss, train_correct = train_loop(train_dataloader, model, loss_fn, optimizer)
        test_loss, test_correct = test_loop(test_dataloader, model, loss_fn)

        tr_loss.append(train_loss), tr_correct.append(train_correct)
        te_loss.append(test_loss), te_correct.append(test_correct)
        times.append(t+1)
        
        print(f"Epoch {t+1}: Train_accuracy: {(100*train_correct):>0.2f}%, Train_loss: {train_loss:>8f}, Test_accuracy: {(100*test_correct):>0.2f}%, Test_loss: {test_loss:>8f}")
        
    train_accuracies = tr_correct
    train_losses = tr_loss
    test_accuracies = te_correct
    test_losses = te_loss

    return train_accuracies, train_losses, test_accuracies, test_losses, times

def intialise_loaders(X_train_scaled, y_train, X_test_scaled, y_test):
    # YOUR CODE HERE
#     train_dataset = CustomDataset(X_train_scaled)
#     test_dataset = CustomDataset(X_test_scaled)
    
#     train_dataloader = DataLoader(train_dataset, batch_size=256, shuffle=True)
#     test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=False)

    train_data = CustomDataset(X_train_scaled,y_train)
    test_data = CustomDataset(X_test_scaled,y_test)
    
    train_dataloader = DataLoader(train_data, batch_size=1024, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=1024, shuffle=True)
    
    return train_dataloader, test_dataloader

def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    train_loss, train_correct = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        train_correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    
    train_loss /= num_batches
    train_correct /=size

    return train_loss, train_correct

def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, test_correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            test_correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    test_correct /= size
    
    return test_loss, test_correct

class FirstHiddenLayerMLP(nn.Module):

    def __init__(self, no_features, no_hidden_first_layer, no_labels):
        super().__init__()
        self.mlp_stack = nn.Sequential(
            # YOUR CODE HERE
            nn.Linear(no_features, no_hidden_first_layer),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(no_hidden_first_layer, 128),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(128, no_labels),
            nn.Sigmoid()
        )

    # YOUR CODE HERE
    def forward(self, x):
         logits = self.mlp_stack(x)
         return logits

model = FirstHiddenLayerMLP(77,256,2)
train_accuracies, train_losses, test_accuracies, test_losses, times = train(model, X_train_scaled_simplified, y_train_simplified, X_test_scaled_simplified, y_test_simplified, 1024)

            tempo  total_beats  average_beats  chroma_stft_mean  \
5358    95.703125         1874     187.400000          0.567137   
642    103.359375          477      79.500000          0.549953   
7565    78.302557          875     125.000000          0.646271   
9584   112.347147         3430     201.764706          0.599859   
9374   198.768029         6870     214.687500          0.724747   
...           ...          ...            ...               ...   
7813   151.999081         3349     176.263158          0.591543   
10955  107.666016         3107     194.187500          0.514742   
905    161.499023        16138     375.302326          0.492115   
5192    92.285156          247      61.750000          0.526634   
235     95.703125          602      86.000000          0.500863   

       chroma_stft_var  chroma_cq_mean  chroma_cq_var  chroma_cens_mean  \
5358          0.088985        0.515726       0.076869          0.262738   
642           0.088597        0.488051       

X has feature names, but StandardScaler was fitted without feature names


Epoch 1: Train_accuracy: 53.18%, Train_loss: 0.691306, Test_accuracy: 53.70%, Test_loss: 0.688969
Epoch 2: Train_accuracy: 56.42%, Train_loss: 0.684554, Test_accuracy: 56.58%, Test_loss: 0.682408
Epoch 3: Train_accuracy: 57.58%, Train_loss: 0.677616, Test_accuracy: 57.88%, Test_loss: 0.675637
Epoch 4: Train_accuracy: 59.54%, Train_loss: 0.667388, Test_accuracy: 59.37%, Test_loss: 0.669969
Epoch 5: Train_accuracy: 60.53%, Train_loss: 0.661852, Test_accuracy: 59.51%, Test_loss: 0.668730
Epoch 6: Train_accuracy: 61.78%, Train_loss: 0.654298, Test_accuracy: 60.01%, Test_loss: 0.665563
Epoch 7: Train_accuracy: 63.56%, Train_loss: 0.644742, Test_accuracy: 61.28%, Test_loss: 0.659559
Epoch 8: Train_accuracy: 63.93%, Train_loss: 0.641845, Test_accuracy: 60.92%, Test_loss: 0.656881
Epoch 9: Train_accuracy: 65.41%, Train_loss: 0.631541, Test_accuracy: 62.30%, Test_loss: 0.654725
Epoch 10: Train_accuracy: 66.23%, Train_loss: 0.625010, Test_accuracy: 61.64%, Test_loss: 0.653578
Epoch 11: Train_acc

5. Do a model prediction on the sample test dataset and obtain the predicted label using a threshold of 0.5. The model used is the optimized pretrained model using the selected optimal batch size and optimal number of neurons. Note: Please define the variable of your final predicted label as 'pred_label'.

In [138]:
# YOUR CODE HERE
# Do model prediction on record.wav.
# remember to set the 0.5 threshold.

print(len(X_test_scaled_eg[0]))
X_test_scaled_eg = torch.tensor(X_test_scaled_eg, dtype=torch.float32)
model.eval()
with torch.no_grad():
    predictions = model(X_test_scaled_eg)
threshold = 0.5
predicted_labels = (predictions > threshold).type(torch.long)
predicted_labels_array = predicted_labels.numpy()
predicted_label = torch.max(predictions, 1)
predicted_label_array = predicted_labels.numpy()
pred_label = np.argmax(predicted_label_array)
print(pred_label)
print(predicted_labels_array)
print(predicted_label_array)
# one hot vector classifies as class on the zeroth column
# negative is 0 and positive is 1
# so it has classified the record.wav as negative

77
0
[[1 0]]
[[1 0]]


To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).


6. Find the most important features on the model prediction for your test sample using SHAP. Create an instance of the DeepSHAP which is called DeepExplainer using traianing dataset: https://shap-lrjball.readthedocs.io/en/latest/generated/shap.DeepExplainer.html.

Plot the local feature importance with a force plot and explain your observations.  (Refer to the documentation and these three useful references:
https://christophm.github.io/interpretable-ml-book/shap.html#examples-5,
https://towardsdatascience.com/deep-learning-model-interpretation-using-shap-a21786e91d16,  
https://medium.com/mlearning-ai/shap-force-plots-for-classification-d30be430e195)


In [159]:
'''
Fit the explainer on a subset of the data (you can try all but then gets slower)
Return approximate SHAP values for the model applied to the data given by X.
Plot the local feature importance with a force plot and explain your observations.
'''
# YOUR CODE HERE
# Can try shap.force_plot(..., matplotlib=True)
# This way your force plot is presented as a matplotlib image rather than an interactive element in your ipynb.


X_train_scaled_simplified = torch.tensor(X_train_scaled_simplified, dtype=torch.float)
explainer = shap.DeepExplainer(model, X_train_scaled_simplified)
shap_values = explainer.shap_values(X_train_scaled_simplified)
shap.initjs()
# assuming I expect a negative sentiment classification
shap.force_plot(explainer.expected_value[0], shap_values[0][76])

To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).


KeyboardInterrupt: 