In [None]:
# Import Statements
import os
import librosa
import librosa.display
import numpy as np
from numpy import argmax
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Conv2D
from keras.layers import LeakyReLU
from keras.layers import Concatenate
from keras.layers import Input
from keras.models import Model
from keras.layers import Embedding
import IPython.display as ipd
from scipy.io.wavfile import write
import soundfile as sf
import math

In [None]:
# Setting the seeds to ensure reproducibility
# The NumPy and TensorFlow modules are already imported at the top of the notebook
# (as `np` and `tf`), so their aliases are used instead of importing them again here.
np.random.seed(1)
tf.random.set_seed(2)

In [None]:
# Loading the drive
# Mounts Google Drive at /content/drive so that the dataset stored under MyDrive can be read.
# NOTE: the google.colab package is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

**Loading the Dataset (Audio files):**

In [None]:
# The location of the original dataset in the Drive
# (load_dataset treats every sub-folder of this directory as one genre/class)

path = "/content/drive/MyDrive/genres_original"

In [None]:
def load_dataset(path):
  """Load the genre dataset and split every 30 second track into 3 second segments.

  `path` is expected to contain one sub-folder per genre, each holding `.wav` files.

  Parameters
  ----------
  path : str
      Root folder of the dataset.

  Returns
  -------
  X : list of np.ndarray
      One waveform per 3 second segment (10 segments per accepted track).
  y : list of str
      Genre name (sub-folder name) of each segment in `X`.
  classes : list of str
      Names of the sub-folders found under `path`, in alphabetical order.
  """
  num_segments = 10
  segment_duration = 3

  # There is an error with this audio file, so it is skipped (999 tracks instead of 1000).
  # It is matched by file name so that the skip still works when the dataset lives under
  # a different root folder.
  unreadable_files = {"jazz.00054.wav"}

  classes = []
  X = []
  y = []

  for root, subdirs, files in os.walk(path):
    # os.walk yields entries in arbitrary (filesystem) order; sorting `subdirs` in place
    # makes the traversal, and therefore the class indices, identical on every run
    subdirs.sort()

    # The root folder itself is not a genre
    if root == path:
      continue

    genre = os.path.basename(root)
    classes.append(genre)

    for fp in sorted(files):
      # Ignore the known-bad track and anything that is not a wav file
      if fp in unreadable_files or not fp.lower().endswith(".wav"):
        continue

      filepath = root + "/" + fp

      # librosa.load returns a time series representation of the audio signal and the sampling rate
      # Sample rate = 22,050 per second, so a 30 second track has 30 * 22,050 = 661,500 samples
      waveform, sample_rate = librosa.load(filepath, duration = 30)

      # If the audio file is not 30 seconds long, we will not process it
      if waveform.shape[0] / sample_rate != 30:
        continue

      # Splitting into 3 second audio segments
      samples_per_segment = int(sample_rate * segment_duration)
      for i in range(num_segments):
        start_segment = samples_per_segment * i
        X.append(waveform[start_segment:start_segment + samples_per_segment])
        y.append(genre)

  return X, y, classes

In [None]:
# X is the time series representation of the 3 second audio segments with a shape of => (9900, 66150)
# y_cat is the genre name of each segment => length of 9900
# classes => length of 10
X, y_cat, classes = load_dataset(path)

In [None]:
# Encoding the categories using numbers:
# every genre name is replaced by its position in `classes`
y = [classes.index(genre_name) for genre_name in y_cat]

**Preparing the Training, Validation and Test Set:**

In [None]:
# Converting the Python lists into NumPy arrays so that they can be split and indexed
# X shape => (9900, 66150)
X = np.array(X)
# y shape => (9900,)
y = np.array(y)

In [None]:
# 10% of the segments are held out for testing; 20% of the remaining 90% are used for validation
# (i.e. 72% train / 18% validation / 10% test)
# No random_state is passed, so the shuffling depends on the NumPy seed set at the top of the notebook
# trainX shape => (7128, 66150)
# valX shape => (1782, 66150)
# testX shape => (990, 66150)
trainX, testX, trainy, testy = train_test_split(X, y, test_size = 0.1)
trainX, valX, trainy, valy = train_test_split(trainX, trainy, test_size = 0.2)

**Mel-frequency cepstral coefficients Feature Extraction (MFCC):**

In [None]:
def mfcc_feature_extraction(X, sampling_rate_per_sec = 22050, n_mfcc = 20, n_fft = 2048, hop_length = 512):
  """Compute the MFCC matrix of every 3 second audio segment.

  Parameters
  ----------
  X : sequence of 1-D arrays
      The waveforms of the audio segments.
  sampling_rate_per_sec : int
      Sampling rate of the waveforms.
  n_mfcc : int
      Number of MFCC coefficients extracted per frame.
  n_fft : int
      FFT window size.
  hop_length : int
      Number of samples between successive frames.

  Returns
  -------
  np.ndarray
      Shape (number of kept segments, frames per segment, n_mfcc); with the defaults
      each segment contributes a (130, 20) matrix. An empty input gives an empty array.

  Notes
  -----
  A segment whose number of frames differs from the expected one is left out of the
  result. The caller's labels are NOT filtered, so a warning is issued whenever this
  happens because features and labels would no longer line up.
  """
  import warnings

  num_segments = 10
  audio_duration = 30
  sampling_rate_for_audio = sampling_rate_per_sec * audio_duration
  num_samples_per_segment = sampling_rate_for_audio / num_segments
  # Expected number of frames per segment (130 with the default settings)
  f_length = math.ceil(num_samples_per_segment / hop_length)

  mfcc_features = []
  dropped = 0

  # For each 3 second audio segment, getting n_mfcc features per frame
  for segment in X:

    # librosa returns an array of shape [n_mfcc, number of frames];
    # it is transposed so that time is the first axis => (frames, n_mfcc)
    f = librosa.feature.mfcc(y = segment, sr = sampling_rate_per_sec, n_mfcc = n_mfcc, n_fft = n_fft, hop_length = hop_length)
    f = f.T

    if f.shape[0] == f_length:
      mfcc_features.append(f.tolist())
    else:
      dropped += 1

  if dropped:
    warnings.warn(
      str(dropped) + " segment(s) did not produce " + str(f_length)
      + " frames and were left out; the labels must be filtered accordingly"
    )

  return np.array(mfcc_features)

In [None]:
# MFCC features of the training segments; the labels are carried over unchanged
# X_train shape => (7128, 130, 20)
X_train = mfcc_feature_extraction(trainX)
y_train = trainy

In [None]:
# MFCC features of the validation segments; the labels are carried over unchanged
# X_val shape => (1782, 130, 20)
X_val = mfcc_feature_extraction(valX)
y_val = valy

In [None]:
# MFCC features of the test segments; the labels are carried over unchanged
# X_test shape => (990, 130, 20)
X_test = mfcc_feature_extraction(testX)
y_test = testy

# **Network 1:**

In [None]:
# Defining an LSTM model using keras:
# two stacked LSTM layers, one dense hidden layer and a 10-way softmax output (one unit per genre)

model1 = Sequential([
  # The first LSTM returns the whole sequence so that the second LSTM can consume it
  LSTM(128, input_shape=(X_train.shape[1:]), return_sequences=True),
  LSTM(128, activation = "relu"),
  Dense(64, activation = "relu"),
  Dense(10, activation = "softmax"),
])

# The labels are integer class indices, hence the sparse categorical cross-entropy loss
optimiser = tf.keras.optimizers.Adam(learning_rate = 0.0001)
model1.compile(optimizer = optimiser, loss = "SparseCategoricalCrossentropy", metrics = ["accuracy"])

In [None]:
# Training Network 1 on the original MFCC features; the validation set is evaluated after every epoch
history = model1.fit(X_train, y_train, validation_data = (X_val, y_val), batch_size = 32, epochs = 50)

**Visualisation of the Loss and Accuracy of the model:**

In [None]:
def plots(history):
  """Plot the training/validation accuracy and loss curves of a training run.

  Parameters
  ----------
  history : object returned by `model.fit`
      Its `history` dict must contain the keys 'accuracy', 'val_accuracy',
      'loss' and 'val_loss', each holding one value per epoch.
  """

  # Accuracy
  train_accuracy = history.history['accuracy']
  validation_accuracy = history.history['val_accuracy']

  # Loss
  train_loss = history.history['loss']
  val_loss = history.history['val_loss']

  # Epochs are numbered starting from 1
  epochs = list(range(1, len(train_accuracy) + 1))

  # Plotting the Accuracy
  plt.plot(epochs, train_accuracy, label = "Training Accuracy")
  plt.plot(epochs, validation_accuracy, label = "Validation Accuracy")
  plt.xlabel("Epochs")
  plt.ylabel("Accuracy")
  plt.legend()
  plt.show()
  print()
  print()

  # Plotting the Loss
  plt.plot(epochs, train_loss, label = "Training Loss")
  plt.plot(epochs, val_loss, label = "Validation Loss")
  plt.xlabel("Epochs")
  plt.ylabel("Loss")
  plt.legend()
  plt.show()

In [None]:
# Accuracy and loss curves of Network 1
plots(history)

In [None]:
# Testing the performance of the model on the held-out test set
# evaluate() returns the loss followed by the compiled metrics (here: accuracy)
score1 = model1.evaluate(X_test, y_test)
test_loss, test_accuracy = score1[0], score1[1]
print("Test Loss = ", test_loss)
print("Test Accuracy = ", test_accuracy)

# **Network 2:**

## **1. Conditional Generative Adversarial Network (CGAN) for augmenting audio samples**

**1.1. Defining the Discriminator:**

In [None]:
# The Discriminator for Conditional GAN's will learn on both input and labels
# The output of the Discriminator => (32, 1); labelling whether the input is true or false
def discriminator(in_shape = 66150):
    """Build the discriminator of the conditional GAN.

    Parameters
    ----------
    in_shape : int or tuple of int
        Number of samples of one audio clip (66150 = 3 seconds at 22,050 Hz),
        or the equivalent shape tuple.

    Returns
    -------
    Model
        Takes [audio, one-hot label] and outputs one sigmoid score per clip.
    """

    embedding_dimension = 16
    num_classes = 10

    # Defining the input audio and passing it to the model
    model = Sequential()
    # The input of this stack is a batch of audio clips concatenated with their label embeddings
    # Input shape = (32, 66150 + 16) = (32, 66166) for a batch of 32
    model.add(Dense(20))
    model.add(LeakyReLU(alpha = 0.1))
    # Output shape = (32, 20)
    model.add(Dense(20))
    model.add(LeakyReLU(alpha = 0.1))
    # Output shape = (32, 20)
    model.add(Dense(20))
    model.add(LeakyReLU(alpha = 0.1))
    # Output shape = (32, 20)
    model.add(Dense(1, activation = "sigmoid"))
    # Output shape = (32, 1)

    # Audio input of shape => (32, 66150)
    # Input() expects a shape tuple, so the bare int used as the default is wrapped
    audio_shape = (in_shape,) if isinstance(in_shape, int) else tuple(in_shape)
    audio_input = Input(shape = audio_shape)

    # Label input of shape => (32, 10)
    label = Input(shape=(num_classes,), dtype='int32')

    # Projecting the one-hot label to the embedding size
    # Input shape = (32, 10)
    label_embedding = Dense(embedding_dimension)(label)
    # Output shape = (32, 16)

    # The Discriminator will take both the audio and labels as input
    # audio_input => (32, 66150)
    # label_embedding => (32, 16)
    # concat shape => (32, 66166)
    concat = Concatenate(axis=-1)([audio_input, label_embedding])

    pred = model(concat)

    return Model([audio_input, label], pred)

**1.2. Defining the Generator:**

In [None]:
# Output of the generator will be (batch_size, number of samples for a 3 second audio file) => (32, 66150)
def generator(latent_dimension, out_shape = 66150):
    """Build the generator of the conditional GAN.

    Parameters
    ----------
    latent_dimension : int
        Size of the noise vector.
    out_shape : int
        Number of audio samples produced per clip (66150 = 3 seconds at 22,050 Hz).

    Returns
    -------
    Model
        Takes [noise, one-hot label] and outputs one waveform of `out_shape` samples per row.
    """

    embedding_dimension = 16
    num_classes = 10

    # Defining the generator
    model = Sequential()
    # Input will be noise and label embedding
    # Noise shape => (32, 64)
    # Label embedding shape => (32, 16)
    # Input to model => (32, 80)
    model.add(Dense(20, input_shape = (latent_dimension + embedding_dimension, )))
    model.add(LeakyReLU(alpha = 0.1))
    # Output shape => (32, 20)
    model.add(Dense(20))
    model.add(LeakyReLU(alpha = 0.1))
    # Output shape => (32, 20)
    model.add(Dense(20))
    model.add(LeakyReLU(alpha = 0.1))
    # Output shape => (32, 20)
    # Linear output layer: one value per audio sample
    model.add(Dense(out_shape))
    # Output shape => (32, 66150)

    # Noise
    noise = Input(shape=(latent_dimension, ))

    # Label input of shape => (32, 10)
    label = Input(shape=(num_classes,), dtype='int32')

    # Projecting the one-hot label to the embedding size
    # (must be the same size that was used for the input shape of `model`)
    # Input shape = (32, 10)
    label_embedding = Dense(embedding_dimension)(label)
    # Output shape = (32, 16)

    # noise (32, 64) + label (32, 16) = output (32, 80)
    concat = Concatenate()([noise, label_embedding])

    pred = model(concat)

    # This is given as the input to the discriminator
    return Model([noise, label], pred)

**1.3. Building the Model:**

In [None]:
# Size of the noise vector fed to the generator
latent_dimension = 64

# Build the discriminator
# It is compiled here, before `trainable` is switched off below
dis_model = discriminator()
dis_model.compile(loss = "binary_crossentropy", metrics = ["accuracy"], optimizer = Adam())

# Build the generator
gen_model = generator(latent_dimension)

# Input to the Generator
# 1. Labels
gen_label = Input(shape = (10, ))
# 2. Noise
gen_input = Input(shape = (latent_dimension, ))

# Output of the generator (the fake audio)
gen_pred = gen_model([gen_input, gen_label])

# The discriminator is not being trained here
dis_model.trainable = False

# The output of the generator is given as the input to the discriminator
dis_model_pred = dis_model([gen_pred, gen_label])

# When building a combined GAN model, we want the generator to produce audio to trick the discriminator
# So the Discriminator is not trained for the Combined GAN model
# And the generator will be trained and the weights will be updated
c_gan = Model([gen_input, gen_label], dis_model_pred)
c_gan.compile(loss = "binary_crossentropy", optimizer = Adam())

**1.4. Training the GAN:**

In [None]:
# Filled by training(): one (discriminator loss, GAN loss) pair and one discriminator accuracy per batch
loss, accuracy = [], []
# The GAN is trained on the raw training waveforms
X = trainX
# Converting the integer training labels back to their genre names
new_y = [classes[label_index] for label_index in trainy]

In [None]:
# Using one hot encoding for the labels passed to train the GAN
# Step 1: genre names -> integer codes
l_encoder = LabelEncoder()
int_encoder = l_encoder.fit_transform(new_y)
# Step 2: integer codes -> dense one-hot rows
# Newer scikit-learn releases use the keyword `sparse_output` and no longer accept `sparse`,
# so the new keyword is tried first with a fallback for older releases
try:
  onehot_encoder = OneHotEncoder(sparse_output=False)
except TypeError:
  onehot_encoder = OneHotEncoder(sparse=False)
Y = onehot_encoder.fit_transform(int_encoder.reshape(len(int_encoder), 1))

In [None]:
def training(input_features, batch_size, epochs, out_features, discriminator, generator, latent_dimension):
    """Alternately train the discriminator and the generator of the conditional GAN.

    Relies on module-level state: the training waveforms `X`, their one-hot labels `Y`,
    the combined model `c_gan`, and the lists `loss` / `accuracy`, which receive one
    entry per batch.

    Parameters
    ----------
    input_features, out_features : int
        Kept for interface compatibility; not used by the training loop.
    batch_size : int
        Number of real clips (and of generated clips) per training step.
    epochs : int
        Number of passes over `X`.
    discriminator : model
        Compiled discriminator, trained on [audio, labels] with targets 1 (real) / 0 (fake).
    generator : model
        Generator used to produce the fake audio from [noise, labels].
    latent_dimension : int
        Size of the noise vector expected by the generator.
    """

    # Samples that do not fill a whole batch at the end of `X` are not used
    batches_per_epoch = int(X.shape[0] / batch_size)

    # Targets: 1 = real audio, 0 = generated audio
    real_label = np.ones((batch_size, 1))
    fake_label = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        for batch_num in range(batches_per_epoch):
            start = batch_num * batch_size
            stop = start + batch_size

            # Training the Discriminator
            # Trained on both real and fake data
            # Real data with shape => (32, 66150), where 32 is the batch size
            X_real_train = X[start:stop, :]
            # Shape => (32, 10)
            y_real_train_labels = Y[start:stop, :]

            # The fake data is produced by the generator, conditioned on the same labels
            noise = np.random.normal(loc = 0, scale = 1, size = (batch_size, latent_dimension))
            # Shape => (32, 66150) => fake audio
            X_fake_train = generator.predict([noise, y_real_train_labels], verbose = 0)

            # Real clips are labelled 1 and generated clips 0
            dis_real_loss = discriminator.train_on_batch([X_real_train, y_real_train_labels], real_label)
            dis_fake_loss = discriminator.train_on_batch([X_fake_train, y_real_train_labels], fake_label)
            dis_loss = 0.5 * np.add(dis_real_loss, dis_fake_loss)

            # Training the Generator through the combined model, where the Discriminator is not training
            # Since the generator tries to fool the Discriminator, its fake audio is given the label 1
            noise = np.random.normal(loc = 0, scale = 1, size = (batch_size, latent_dimension))
            cgan_loss = c_gan.train_on_batch([noise, y_real_train_labels], real_label)

            loss.append((dis_loss[0], cgan_loss))
            accuracy.append(dis_loss[1])
            print("Epoch: ", epoch, " ; Batch No: ", batch_num, "  ; d_loss: ", dis_loss[0], " ; dis_acc: ", 100 * dis_loss[1], " ; gan_loss: ", cgan_loss)

In [None]:
# Training the conditional GAN on the raw training waveforms (X) and their one-hot labels (Y)
training(input_features = X.shape[1],
         batch_size = 32,
         epochs = 50,
         out_features = X.shape[1],
         discriminator = dis_model,
         generator = gen_model,
         latent_dimension = latent_dimension
        )

**1.5. Visualising the Generator and Discriminator Loss:**

In [None]:
# Sampling the losses recorded for the last batch of every epoch
# `loss` holds one (discriminator loss, GAN loss) pair per batch, so the stride has to be
# the number of batches per epoch that training() used: X.shape[0] // batch size
gan_batch_size = 32  # must match the batch_size passed to training()
batches_per_epoch = X.shape[0] // gan_batch_size

# length should be 50 (one entry per epoch)
dis_loss_epoch = []
gen_loss_epoch = []
for i in range(batches_per_epoch - 1, len(loss), batches_per_epoch):
  dis_loss_epoch.append(loss[i][0])
  gen_loss_epoch.append(loss[i][1])

# One x-axis value per sampled epoch
epochs = list(range(len(dis_loss_epoch)))

In [None]:
# Discriminator and generator loss at the end of every epoch, drawn on one set of axes
fig, ax = plt.subplots()
ax.plot(epochs, dis_loss_epoch, label = "Discriminator Loss")
ax.plot(epochs, gen_loss_epoch, label = "Generator Loss")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
ax.set_title("Loss per Epoch")
ax.legend()
plt.show()

**1.6. Saving the audio files to the drive:**

In [None]:
# Changing the current working directory to the location on the drive
# (%cd is an IPython magic, so the change persists for the following cells)
%cd "/content/drive/MyDrive/"

In [None]:
# Creating the folder that will hold the generated audio
# (-p: no error if the folder already exists)
!mkdir -p Augmented_Data

In [None]:
# Changing the current working directory to the location of the new folder,
# so that the relative mkdir commands of the next cell create the genre folders inside it
%cd "/content/drive/MyDrive/Augmented_Data"

In [None]:
# Creating the sub-directories for each class (one folder per genre) with a single command
!mkdir -p blues classical country disco hiphop jazz metal pop reggae rock

In [None]:
# Creating fake audio samples (duration = 3 seconds) and putting them in a separate folder
num_fake_per_class = 1000
label_vectors = np.identity(10)
dir_path = "/content/drive/MyDrive/Augmented_Data"

for i in range(label_vectors.shape[0]):
  # Row i is the one-hot vector of encoded class i; l_encoder maps that code back to the genre name
  inverted = l_encoder.inverse_transform([argmax(label_vectors[i])])
  label = inverted[0]

  # All clips of one genre are generated with a single predict call instead of one call per clip
  # The noise size must match the latent dimension the generator was built with
  noise = np.random.normal(loc = 0, scale = 1, size = (num_fake_per_class, latent_dimension))
  class_labels = np.tile(label_vectors[i], (num_fake_per_class, 1))
  # Shape => (1000, 66150): one generated waveform per row
  audio_batch = gen_model.predict([noise, class_labels])

  for j in range(num_fake_per_class):
    # File name pattern: <dir_path>/<genre>/<genre>.<j>.wav
    name = os.path.join(dir_path, label, label + "." + str(j) + ".wav")
    sf.write(name, audio_batch[j], 22050, format = "wav", subtype = "PCM_16")

In [None]:
# Changing the current working directory back to the original
# (%cd changes the directory of the notebook itself; `!cd` only affects a temporary subshell)
%cd "/content"

## **2. Building Network 2:**

In [None]:
# Path to the augmented dataset (the audio files generated by the GAN, one sub-folder per genre)
path1 = "/content/drive/MyDrive/Augmented_Data"

In [None]:
# Converting the training features and labels to lists so that the features of the
# generated audio can be appended to them; they are converted back to arrays afterwards
X_train = list(X_train)
y_train = list(y_train)

**2.1. Reading the new 3 second audio files generated by the GAN and extracting the features:**

In [None]:
# Framing parameters used to extract the MFCCs of the generated audio
sampling_rate_per_sec, audio_duration, num_segments = 22050, 30, 10
hop_length = 512

# Samples in a full track, and in each of its segments
sampling_rate_for_audio = audio_duration * sampling_rate_per_sec
num_samples_per_segment = sampling_rate_for_audio / num_segments

# Number of frames expected per segment
f_length = math.ceil(num_samples_per_segment / hop_length)

In [None]:
# Every sub-folder of path1 is a genre holding the audio clips generated for it
for rootdir, subdir, files in os.walk(path1):
  if rootdir != path1:

    # The folder name is the genre; its index in `classes` is the integer label used for training
    genre = rootdir.split("/")[-1]
    label = classes.index(genre)

    for filename in files:
      # Only the generated wav clips are processed
      if not filename.lower().endswith(".wav"):
        continue

      filepath = os.path.join(rootdir, filename)
      # Loading at the same sampling rate that is used for the feature extraction
      waveform, sample_rate = librosa.load(filepath, sr = sampling_rate_per_sec)

      # Returns a numpy array with the shape [n_mfcc, number of frames];
      # it is transposed so that time is the first axis, as for the original features
      mfcc = librosa.feature.mfcc(y = waveform, sr = sample_rate, n_mfcc = 20, n_fft = 2048, hop_length = hop_length)
      mfcc = mfcc.T

      # Features and label are appended together, so they stay aligned
      if mfcc.shape[0] == f_length:
        X_train.append(mfcc.tolist())
        y_train.append(label)

In [None]:
# Converting the extended lists (original + generated features) back to NumPy arrays
# X_train shape =>(17128, 130, 20)
X_train = np.array(X_train)
# y_train shape => (17128,)
y_train = np.array(y_train)

**2.2. LSTM with augmented Data:**

In [None]:
# Same architecture as Network 1, trained on the augmented training set
model2 = Sequential([
  # The first LSTM returns the whole sequence so that the second LSTM can consume it
  LSTM(128, input_shape=(X_train.shape[1:]), return_sequences=True),
  LSTM(128, activation='relu'),
  Dense(64, activation='relu'),
  Dense(10, activation='softmax'),
])

# The labels are integer class indices, hence the sparse categorical cross-entropy loss
optimiser = tf.keras.optimizers.Adam(learning_rate=0.0001)
model2.compile(optimizer=optimiser, loss='SparseCategoricalCrossentropy', metrics=['accuracy'])

In [None]:
# Training Network 2 on the augmented training set; validation still uses the original (non-generated) validation set
history = model2.fit(X_train, y_train, validation_data = (X_val, y_val), batch_size = 64, epochs = 50)

**2.3. Visualisation of the Accuracy and the Loss:**

In [None]:
# Accuracy and loss curves of Network 2
plots(history)

In [None]:
# Testing the performance of the model on the held-out test set
# evaluate() returns the loss followed by the compiled metrics (here: accuracy)
score2 = model2.evaluate(X_test, y_test)
test_loss, test_accuracy = score2[0], score2[1]
print("Test Loss = ", test_loss)
print("Test Accuracy = ", test_accuracy)