Importing Libraries

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os, json, math, librosa
import IPython.display as ipd
import librosa.display
import tensorflow as tf
import tensorflow.keras as keras
import random
from sklearn.metrics import confusion_matrix
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Conv2D, GlobalAveragePooling2D, ZeroPadding2D, Input
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint
# from google.colab import drive
from statistics import mode
# from google.colab import files

In [None]:
# Mount Google Drive so the dataset uploaded there is reachable under /content/drive.
# The import is done here because the top-of-notebook `google.colab` import is
# commented out, which left `drive` undefined on a fresh kernel.
try:
    from google.colab import drive
except ImportError:
    # Not running on Colab: there is nothing to mount.
    drive = None

if drive is not None:
    drive.mount('/content/drive')
else:
    print("google.colab is unavailable; skipping the Google Drive mount.")



# Exploring, Testing and Visualizing Dataset

Loading and exploring the Dataset, testing the audio, making waveplots for the audio, generating spectrograms

In [None]:
MUSIC = '/content/drive/MyDrive/urdu_data/Genres'


def list_genre_tracks(root_dir):
    """Collect every file under ``root_dir`` together with its genre.

    The genre of a file is the name of the folder that directly contains it.
    (The previous code took component 5 of the "/"-split path, which for the
    MUSIC root above is the literal folder "Genres" for every track and breaks
    as soon as the dataset lives at a different depth.)

    Returns:
        (paths, genres): two parallel lists of file paths and genre names.
    """
    paths, genres = [], []
    for root, _subdirs, names in os.walk(root_dir):
        # normpath drops a trailing separator so basename is never empty
        genre = os.path.basename(os.path.normpath(root))
        for name in names:
            paths.append(os.path.join(root, name))  # adding song path
            genres.append(genre)                    # adding song genre
    return paths, genres


music_dataset, genre_target = list_genre_tracks(MUSIC)

In [None]:
# Pick one track (index 10, i.e. the 11th collected file) as a sample for the plots below.
# Raises IndexError if fewer than 11 files were found.
audio_path = music_dataset[10]
print(audio_path)
# x is the waveform, sr the sampling rate returned by librosa.load
x , sr = librosa.load(audio_path) #loading the 11th file just for trial

In [None]:
plt.figure(figsize=(16, 5))
# `librosa.display.waveplot` was removed in librosa 0.10 in favour of `waveshow`
# (available since 0.9). Prefer the new name and fall back on older installs.
_plot_waveform = getattr(librosa.display, "waveshow", None) or librosa.display.waveplot
_plot_waveform(x, sr=sr)

In [None]:
# Short-time Fourier transform of the sample, magnitude converted to decibels.
X = librosa.stft(x)
Xdb = librosa.amplitude_to_db(np.abs(X))
plt.figure(figsize=(14, 5))
librosa.display.specshow(Xdb, sr=sr, x_axis='time', y_axis='hz')
plt.title('STFT Spectrogram')  # fixed typo: was 'Spectogram'
plt.colorbar()

In [None]:
# Mel spectrogram of the same sample track.
file_location = audio_path
y, sr = librosa.load(file_location)

# 128 mel bands, power expressed in dB relative to the maximum value
melSpec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
melSpec_dB = librosa.power_to_db(melSpec, ref=np.max)

plt.figure(figsize=(10, 5))
librosa.display.specshow(
    melSpec_dB,
    sr=sr,
    fmax=8000,
    x_axis='time',
    y_axis='mel',
)
plt.colorbar(format='%+1.0f dB')
plt.title("MelSpectrogram")
plt.tight_layout()
plt.show()  # display spectrogram

# Loading MFCC of data in JSON




Set up the paths and audio constants, then extract MFCCs from several random 30-second excerpts of each track (a simple form of data augmentation) and save them to a JSON file.

In [None]:
# Dataset root on the mounted Drive (same folder as MUSIC in the exploration cells).
DATASET_PATH = '/content/drive/MyDrive/urdu_data/Genres'
# Intended output file for the extracted MFCCs (see the commented save_mfcc call).
JSON_PATH = "data.json"
# Sampling rate passed to librosa.load when extracting features.
SAMPLE_RATE = 22050
# Length of one excerpt, in seconds.
TRACK_DURATION = 30
# Number of audio samples in one excerpt.
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION

In [None]:
def save_mfcc(dataset_path, json_path, num_mfcc=32, n_fft=2048, hop_length=512, num_segments=5):
    """Extract MFCCs from random excerpts of every track and write them to JSON.

    Every folder below ``dataset_path`` is treated as one genre. For each audio
    file in it, ``num_segments`` excerpts of ``SAMPLES_PER_TRACK`` samples are
    drawn at random start positions (data augmentation) and their MFCCs stored.

    Args:
        dataset_path: root folder containing one sub-folder per genre.
        json_path: file the result is written to.
        num_mfcc: number of MFCC coefficients per frame.
        n_fft: FFT window size passed to librosa.
        hop_length: hop length passed to librosa.
        num_segments: number of random excerpts taken per track.

    The JSON holds three keys: "mapping" (genre names), "labels" (index into
    "mapping" for each excerpt) and "mfcc" (one frames-by-coefficients list per
    excerpt). Reads the module-level SAMPLE_RATE and SAMPLES_PER_TRACK.
    """
    # Seconds skipped at the start of a track before an excerpt may begin.
    # Genres not listed here (e.g. hiphop) may start at the very beginning.
    intro_skip_seconds = {"qawwali": 30, "ghazal": 30, "rock": 20}

    data = {
        "mapping": [],
        "labels": [],
        "mfcc": []
    }
    root = os.path.normpath(dataset_path)

    for dirpath, dirnames, filenames in os.walk(dataset_path):
        # Sort in place so os.walk visits genres alphabetically: the label
        # indices then no longer depend on filesystem listing order.
        dirnames.sort()

        # Compare by value (the old `is not` identity test only worked by accident).
        if os.path.normpath(dirpath) == root:
            continue

        semantic_label = os.path.basename(dirpath)
        data["mapping"].append(semantic_label)  # stores the genre label
        label_index = len(data["mapping"]) - 1
        print("\nProcessing: {}".format(semantic_label))

        for f in sorted(filenames):
            file_path = os.path.join(dirpath, f)
            signal, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE)

            # Latest sample at which a full excerpt still fits in the track.
            last_start = len(signal) - SAMPLES_PER_TRACK
            if last_start < 0:
                print("Skipping {} (shorter than one excerpt)".format(file_path))
                continue
            # Clamp the intro offset so short tracks do not make randint fail
            # with an empty range.
            first_start = min(SAMPLE_RATE * intro_skip_seconds.get(semantic_label, 0), last_start)

            # data augmentation: MFCCs for random excerpts of the song
            for d in range(num_segments):
                start = random.randint(first_start, last_start)
                finish = start + SAMPLES_PER_TRACK

                # Keyword arguments: librosa >= 0.10 rejects positional y/sr.
                mfcc = librosa.feature.mfcc(y=signal[start:finish], sr=sample_rate, n_mfcc=num_mfcc,
                                            n_fft=n_fft, hop_length=hop_length)
                mfcc = mfcc.T

                data["mfcc"].append(mfcc.tolist())  # numpy arrays are not JSON serialisable
                data["labels"].append(label_index)  # stores the genre label index
                print("{}, segment:{}".format(file_path, d+1))

    # one JSON file with all MFCCs and the label details of each excerpt
    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)

In [None]:
# save_mfcc(DATASET_PATH, JSON_PATH)



---



# Loading JSON

In [None]:
def load_data(data_path):
    """Read the MFCC JSON at ``data_path`` and return its contents as arrays.

    Returns:
        A tuple ``(X, y, z)`` of numpy arrays built from the JSON keys
        "mfcc", "labels" and "mapping" respectively.
    """
    with open(data_path, "r") as source:
        payload = json.load(source)

    # Convert each stored list to an array, in the order callers unpack them.
    features, label_ids, genre_names = (
        np.array(payload[key]) for key in ("mfcc", "labels", "mapping")
    )
    return features, label_ids, genre_names

def plot_history(history):
    """Plot accuracy and loss curves from a Keras training history.

    Draws two stacked panels: accuracy on top, loss below. The curves labelled
    "test" are read from the ``val_*`` entries of ``history.history``.
    """
    curves = history.history
    # (train key, validation key, train label, validation label, y label, legend position, title)
    panels = (
        ("accuracy", "val_accuracy", "train accuracy", "test accuracy",
         "Accuracy", "lower right", "Accuracy eval"),
        ("loss", "val_loss", "train error", "test error",
         "Error", "upper right", "Error eval"),
    )

    fig, axs = plt.subplots(2)
    for ax, (train_key, val_key, train_label, val_label, y_label, legend_loc, title) in zip(axs, panels):
        ax.plot(curves[train_key], label=train_label)
        ax.plot(curves[val_key], label=val_label)
        ax.set_ylabel(y_label)
        ax.legend(loc=legend_loc)
        ax.set_title(title)

    # Only the bottom panel carries the shared x-axis label.
    axs[1].set_xlabel("Epoch")
    plt.show()

def prepare_datasets(test_size, validation_size, data_path=None, random_state=None):
    """Load the MFCC data and split it into train / validation / test sets.

    Args:
        test_size: fraction of all samples held out for testing.
        validation_size: fraction of the remaining (non-test) samples used
            for validation.
        data_path: JSON file to load; defaults to the module-level
            DATASET_PATH_DRIVE so existing two-argument calls keep working.
        random_state: optional seed forwarded to both splits so the partition
            can be reproduced; None keeps the previous unseeded behaviour.

    Returns:
        X_train, X_validation, X_test, y_train, y_validation, y_test, z
        where the X arrays have a trailing axis of size 1 added and ``z`` is
        the genre mapping returned by load_data.
    """
    if data_path is None:
        # Looked up at call time, so the constant may be defined in a later cell.
        data_path = DATASET_PATH_DRIVE

    # load data
    X, y, z = load_data(data_path)

    # create train, validation and test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state)
    X_train, X_validation, y_train, y_validation = train_test_split(
        X_train, y_train, test_size=validation_size, random_state=random_state)

    # add a trailing axis to the input sets (Conv2D layers expect it)
    X_train = X_train[..., np.newaxis]
    X_validation = X_validation[..., np.newaxis]
    X_test = X_test[..., np.newaxis]

    return X_train, X_validation, X_test, y_train, y_validation, y_test, z

def predict(model, X, y, mapping=None):
    """Predict the genre of one sample and print it next to the true genre.

    Args:
        model: object with a Keras-style ``predict(batch)`` method returning
            one row of class scores per sample.
        X: a single input sample (no batch axis).
        y: index of the true genre for this sample.
        mapping: sequence of genre names indexed by label. Defaults to the
            notebook-level ``z`` so existing three-argument calls keep working.

    Returns:
        (predicted, target): predicted and true genre names.
    """
    labels = z if mapping is None else mapping

    batch = X[np.newaxis, ...]  # the model expects a batch axis in front
    prediction = model.predict(batch)
    # Reduce to a plain int so both numpy arrays and lists can be indexed.
    predicted_index = int(np.argmax(prediction, axis=1)[0])

    target = labels[y]  # use actual label index to get genre
    predicted = labels[predicted_index]  # use predicted label index to get genre

    print("Target: {}, Predicted label: {}".format(target, predicted))
    return predicted, target

# Models

In [None]:
# Original Model
def build_model_BN(input_shape):
    """Build the CNN classifier with batch normalization after each conv stage.

    Three Conv2D -> MaxPooling2D -> BatchNormalization stages are followed by
    a 64-unit dense layer and a 4-way softmax output.

    Args:
        input_shape: shape of one input sample, passed to the first Conv2D.

    Returns:
        An uncompiled ``keras.Sequential`` model.
    """
    layers = keras.layers
    # (filters, kernel size, pooling window) of each convolutional stage
    conv_stages = (
        (64, (3, 3), (3, 3)),
        (32, (3, 3), (3, 3)),
        (32, (2, 2), (2, 2)),
    )

    model = keras.Sequential()
    for stage, (filters, kernel, pool) in enumerate(conv_stages):
        # only the very first layer declares the input shape
        first_layer_kwargs = {"input_shape": input_shape} if stage == 0 else {}
        model.add(layers.Conv2D(filters, kernel, activation='relu', **first_layer_kwargs))
        model.add(layers.MaxPooling2D(pool, strides=(2, 2), padding='same'))
        model.add(layers.BatchNormalization())

    # classifier head
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(4, activation='softmax'))

    return model

In [None]:
#failed attempts at other models
def build_model_GlobalPool(input_shape):
    """Build the CNN variant that ends in global average pooling.

    The last convolution has 4 filters (one per genre); global average pooling
    reduces each feature map to a single score and a softmax turns the four
    scores into class probabilities.

    The softmax was missing before: the model emitted raw pooled
    batch-normalized activations, while the notebook trains with
    'sparse_categorical_crossentropy', which expects probabilities.

    Args:
        input_shape: shape of one input sample, passed to the first Conv2D.

    Returns:
        An uncompiled ``keras.Sequential`` model.
    """
    model = keras.Sequential()

    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=input_shape))
    model.add(keras.layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same'))
    model.add(keras.layers.BatchNormalization())

    # 1x1 convolution followed by a 3x3 convolution
    model.add(keras.layers.Conv2D(64, (1, 1), activation='relu'))
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(keras.layers.MaxPooling2D((3, 3), strides=(1, 1), padding='same'))
    model.add(keras.layers.BatchNormalization())

    model.add(keras.layers.Conv2D(64, (1, 1), activation='relu'))
    model.add(keras.layers.Conv2D(32, (2, 2), activation='relu'))
    model.add(keras.layers.MaxPooling2D((2, 2), strides=(1, 1), padding='same'))
    model.add(keras.layers.BatchNormalization())

    # one feature map per class
    model.add(keras.layers.Conv2D(4, (2, 2), activation='relu'))
    model.add(keras.layers.MaxPooling2D((2, 2), strides=(1, 1), padding='same'))
    model.add(keras.layers.BatchNormalization())

    model.add(keras.layers.GlobalAveragePooling2D())
    # convert the four pooled scores into a probability distribution
    model.add(keras.layers.Activation('softmax'))

    return model

# Training model and Plotting results

In [None]:
# Path of the MFCC JSON when it is created for the first time.
DATA_PATH = "./data.json" #path of json when json created first time
# Copy of the MFCC JSON uploaded to Drive; prepare_datasets() reads this constant.
DATASET_PATH_DRIVE = "/content/drive/MyDrive/urdu_data/data.json" #using json that has been uploaded to drive

In [None]:
# Hold out 20% of the data for testing, then 20% of the remainder for validation.
(X_train, X_validation, X_test,
 y_train, y_validation, y_test, z) = prepare_datasets(0.2, 0.2)

# Two data dimensions of one sample plus a single channel.
input_shape = (*X_train.shape[1:3], 1)

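The following cell builds, trains, and evaluates one model at a time: the CNN with batch normalization (default), the variant with global average pooling, or a variant with dropout. Switch models by changing which `build_model_*` line is uncommented.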

In [None]:
# Choose the architecture to train (swap the comments to try the other one).
model = build_model_BN(input_shape)
# model = build_model_GlobalPool(input_shape)

optimiser = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(
    optimizer=optimiser,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary()

# Save the full model whenever validation accuracy improves.
checkpoint = ModelCheckpoint(
    "model-{epoch:02d}-{val_accuracy:0.2f}",
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='auto',
    save_freq='epoch',
)

history = model.fit(
    X_train,
    y_train,
    validation_data=(X_validation, y_validation),
    batch_size=32,
    epochs=5000,
    callbacks=[checkpoint],
)

plot_history(history)

# Final score on the held-out test split.
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# Transfer Learning with VGG-16


Run the following cells to build and train the VGG-16 transfer-learning model.

In [None]:
# from keras.applications import VGG16
# VGG16 expects 3-channel inputs, so copy the single MFCC channel three times.
# np.repeat along the last axis replaces the per-sample Python loops and the
# hard-coded (1292, 32) reshape, so other MFCC settings work as well.
assert X_train.shape[-1] == 1 and X_validation.shape[-1] == 1, "expected single-channel inputs"
t = np.repeat(X_train, 3, axis=-1)       # 3-channel training inputs
g = np.repeat(X_validation, 3, axis=-1)  # 3-channel validation inputs

# ImageNet-pretrained convolutional base without its classifier head.
model = keras.applications.vgg16.VGG16(include_top=False, weights="imagenet", input_shape=t.shape[1:])
model.summary()
model.trainable = False  # freeze the pretrained weights

base_outputs = model.layers[-1].output

# New classifier head trained on top of the frozen base.
x = keras.layers.AveragePooling2D(padding="same")(base_outputs)
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(256, activation='relu')(x)
final_outputs = keras.layers.Dense(4, activation='softmax')(x)

new_model = keras.Model(inputs=model.layers[0].input, outputs=final_outputs)
# summary() prints by itself; wrapping it in print() added a stray "None" line
new_model.summary()

In [None]:
optimiser = keras.optimizers.Adam(learning_rate=0.0001)
new_model.compile(optimizer=optimiser, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Save the full model whenever validation accuracy improves.
# NOTE: this file-name pattern is the same one used for the CNN run above, so
# checkpoints from both runs end up side by side in the working directory.
checkpoint = ModelCheckpoint("model-{epoch:02d}-{val_accuracy:0.2f}", monitor='val_accuracy', verbose=1,
                             save_best_only=True, save_weights_only=False, mode='auto', save_freq='epoch')

history = new_model.fit(t, y_train, validation_data=(g, y_validation), batch_size=32, epochs=1000,
                        callbacks=[checkpoint])

plot_history(history)

# The model takes 3-channel inputs (like t and g); X_test still has a single
# channel, so evaluating on it directly fails with a shape mismatch.
X_test_rgb = np.repeat(X_test, 3, axis=-1)
test_loss, test_acc = new_model.evaluate(X_test_rgb, y_test, verbose=2)
print('\nTest accuracy:', test_acc)

Loading the checkpoint with the best validation accuracy (uncomment the last line of the cell to save it to Drive)

In [None]:
# Reload a checkpoint written by ModelCheckpoint; following the file-name pattern
# "model-{epoch:02d}-{val_accuracy:0.2f}", this one is epoch 2319 with val_accuracy 0.91.
model = tf.keras.models.load_model('model-2319-0.91')
# Uncomment to store that model on Drive:
# model.save("/content/drive/MyDrive/urdu_data/Best Model")

# Testing model on test data and unseen data

Load best accuracy model from drive

In [None]:
# Restore the saved model into the global `model`, which the prediction cells below
# pass to predict() and which new_song_predict() reads directly.
model = tf.keras.models.load_model("/content/drive/MyDrive/urdu_data/Best Model")

Making predictions on test data using model

In [None]:
# # testing and printing predictions on one particular instance
# print("Predicting on the 106th song segment from test data")
# X_to_predict = X_test[105]
# y_to_predict = y_test[105]
# predict(model, X_to_predict, y_to_predict)

# Run the model on every test segment, collecting predicted and true genres.
preds, targets = [], []
print("Predicting on all song segments from test data and saving them too...")
for segment, label_index in zip(X_test, y_test):
    y_pred, y_act = predict(model, segment, label_index)
    preds.append(y_pred)
    targets.append(y_act)

Plotting a confusion matrix on test data

In [None]:
# Confusion matrix
cm = confusion_matrix(targets, preds)
# print(cm)

class_label = z #mapping in json had a list of all classes
df_cm = pd.DataFrame(cm, index = class_label, columns = class_label)
sns.heatmap(df_cm, annot = True, fmt= "d", cbar = False)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("Actual Label")
plt.show()
count = 0
for i in range(len(targets)):
  if targets[i] == preds[i]:
    count += 1
print(count/len(targets) * 100)

Testing model on unseen data that is not part of dataset

In [None]:
# These repeat the values defined earlier for MFCC extraction;
# new_song_predict() below reads SAMPLE_RATE and SAMPLES_PER_TRACK.
SAMPLE_RATE = 22050  # sampling rate passed to librosa.load
TRACK_DURATION = 30  # excerpt length in seconds
SAMPLES_PER_TRACK = SAMPLE_RATE * TRACK_DURATION #30secs

def new_song_predict(dirc, label, segments):
    """Predict the genre of an unseen track by majority vote over random excerpts.

    Args:
        dirc: path of the audio file.
        label: expected genre; only used in the printed report.
        segments: number of random excerpts to classify.

    Raises:
        ValueError: if the track is shorter than one excerpt.

    Uses the notebook-level ``model`` and prints the track name, the expected
    genre and the most frequent predicted genre.
    """
    # Genre names in label order; must match the "mapping" used during training.
    genres = ['ghazal', 'hiphop', 'qawwali', 'rock']

    signal, sample_rate = librosa.load(dirc, sr=SAMPLE_RATE)
    name = dirc.split('/')[-1]

    # Latest sample at which a full excerpt still fits in the track.
    last_start = len(signal) - SAMPLES_PER_TRACK
    if last_start < 0:
        raise ValueError("{} is shorter than one excerpt ({} samples)".format(name, SAMPLES_PER_TRACK))

    preds = []  # to store predictions of the individual excerpts
    for _ in range(segments):
        start = random.randint(0, last_start)
        end = start + SAMPLES_PER_TRACK

        # Keyword arguments: librosa >= 0.10 rejects positional y/sr.
        mfcc_test = librosa.feature.mfcc(y=signal[start:end], sr=sample_rate, n_mfcc=32, n_fft=2048,
                                         hop_length=512)
        mfcc_test = mfcc_test.T

        # Add the batch axis and the trailing channel axis, matching the
        # layout used for training (see prepare_datasets / predict).
        mfcc_test = mfcc_test[np.newaxis, ..., np.newaxis]
        pred = model.predict(mfcc_test)

        predic_index = np.argmax(pred, axis=1)
        preds.append(str(genres[predic_index[0]]))

    # print(preds)
    pred = mode(preds)  # most frequent predicted genre across the excerpts

    print("Track: {}, Target: {}, Predicted label: {}".format(name, label, pred))

In [None]:
# Unseen tracks (not part of the dataset) with their expected genre.
unseen_tracks = [
    ("/content/drive/MyDrive/testing/ghazal/Humko Kisi Ke Gham Ne Maara.wav", "ghazal"),
    ("/content/drive/MyDrive/testing/ghazal/Dil dhadakne ka sabab yaad aaya Noor Jahan.wav", "ghazal"),
    ("/content/drive/MyDrive/testing/ghazal/Yoon Zindagi Ki Raah Mein.wav", "ghazal"),
    ("/content/drive/MyDrive/testing/rock/Alif Allah.wav", "rock"),
    ("/content/drive/MyDrive/testing/rock/Dosti.wav", "rock"),
    ("/content/drive/MyDrive/testing/rock/Meri Zindagi.wav", "rock"),
    ("/content/drive/MyDrive/testing/hiphop/Kabhi Kabhi.wav", "hiphop"),
    ("/content/drive/MyDrive/testing/hiphop/Nazar.wav", "hiphop"),
    ("/content/drive/MyDrive/testing/hiphop/Stunner.wav", "hiphop"),
    ("/content/drive/MyDrive/testing/qawwali/Allah Hoo.wav", "qawwali"),
    ("/content/drive/MyDrive/testing/qawwali/Teri Yaad Ibadat Meri.wav", "qawwali"),
    ("/content/drive/MyDrive/testing/qawwali/Naara e Haideri.wav", "qawwali"),
]

# Classify each track from 11 random excerpts.
for track_path, expected_genre in unseen_tracks:
    new_song_predict(track_path, expected_genre, 11)