# **Data augmentation research** #

In [None]:
import librosa
import soundfile as sf
import os
import numpy as np
from sklearn.preprocessing import StandardScaler
from tensorflow import keras
import shutil
import cv2
import matplotlib.pyplot as plt
import multiprocessing
import concurrent.futures
from time import perf_counter

**The following cell clears the Kaggle output directory; run it only if needed.**

In [None]:
# Kaggle's writable output directory whose contents are wiped below
output_directory = "/kaggle/working/"

# Remove every top-level entry: directories recursively, regular files directly
for entry_name in os.listdir(output_directory):
    entry_path = os.path.join(output_directory, entry_name)
    if os.path.isdir(entry_path):
        shutil.rmtree(entry_path)
    elif os.path.isfile(entry_path):
        os.remove(entry_path)

print("Output directory cleared.")

### Create distinct dataset
No song (including its augmented variants) appears in both the train and the test set.

NOTE: The code for generating N segments per song was lost during an unfortunate incident. This method now assumes you want to make full spectrograms of the whole 30 seconds. This is because the project had to proceed, and after running the experiments we had the results and wanted to continue with the full dataset anyway.

In [None]:
def prepare_dataset(base_path, test_size=10, validation_size=10, seed=None):
    """
    Prepares the dataset by selecting songs for each set (train, test, validation)
    without moving or copying files.

    Every sub-directory of ``base_path`` is treated as one genre. Its entries are
    shuffled; the first ``test_size`` go to the test set, the next
    ``validation_size`` to the validation set and the remainder to the train set.

    :param base_path: Directory containing one sub-directory per genre.
    :param test_size: Number of songs per genre assigned to the test set.
    :param validation_size: Number of songs per genre assigned to the validation set.
    :param seed: Optional seed for a reproducible split. When None, the global
        NumPy random state is used.
    :return: Dict with the keys 'train', 'test' and 'validation'; each value is a
        list of (song_path, genre) tuples.
    """
    rng = np.random if seed is None else np.random.RandomState(seed)
    dataset_info = {'train': [], 'test': [], 'validation': []}

    # Sorted listings make the split depend only on the random state,
    # not on the order in which the filesystem returns entries.
    for genre_folder in sorted(os.listdir(base_path)):
        genre_path = os.path.join(base_path, genre_folder)
        if not os.path.isdir(genre_path):
            continue

        all_songs = sorted(os.listdir(genre_path))
        rng.shuffle(all_songs)

        validation_end = test_size + validation_size
        splits = {
            'test': all_songs[:test_size],
            'validation': all_songs[test_size:validation_end],
            'train': all_songs[validation_end:],
        }
        for set_name, songs in splits.items():
            dataset_info[set_name].extend(
                (os.path.join(genre_path, song), genre_folder) for song in songs
            )

    return dataset_info

def generate_spectrogram(file_path, genre, output_path):
    """
    Generates and saves a mel spectrogram image for a given audio file.

    The image is written to ``<output_path>/<genre>/<song_name>.png`` where
    ``song_name`` is the first two dot-separated parts of the file name.

    :param file_path: Path of the audio file to load.
    :param genre: Genre label; used as the name of the sub-directory of ``output_path``.
    :param output_path: Directory under which the genre sub-directory is created.
    """
    # Import the submodule explicitly rather than relying on `import librosa` exposing it
    import librosa.display

    y, sr = librosa.load(file_path)
    spectrogram = librosa.feature.melspectrogram(y=y, sr=sr)
    spectrogram_db = librosa.power_to_db(spectrogram, ref=np.max)

    # Keep the first two dot-separated parts of the file name, joined by a dot
    # (a name of the form "a.b.c" becomes "a.b")
    song_name = '.'.join(os.path.basename(file_path).split('.')[:2])

    output_genre_path = os.path.join(output_path, genre)
    # exist_ok avoids a FileExistsError when several pool workers try to create
    # the same genre folder at the same time
    os.makedirs(output_genre_path, exist_ok=True)

    librosa.display.specshow(spectrogram_db)
    try:
        plt.savefig(os.path.join(output_genre_path, f'{song_name}.png'))
    finally:
        # Always release the figure, even when saving fails
        plt.close()

def process_songs_parallel(dataset_info, output_path):
    """
    Processes the songs in parallel using multiprocessing to generate spectrograms.

    Every split except "train" is rendered with ``generate_spectrogram`` into
    ``<output_path>/<set_name>``. Failures of individual songs are printed
    instead of being silently discarded, and do not stop the other songs.

    :param dataset_info: Dict mapping a set name to a list of (song_path, genre) tuples.
    :param output_path: Base directory that receives one sub-directory per set.
    """
    # Collect the work first so that no process pool is started for nothing
    jobs = [
        (song_path, genre, os.path.join(output_path, set_name))
        for set_name, songs_info in dataset_info.items()
        if set_name != "train"
        for song_path, genre in songs_info
    ]
    if not jobs:
        return

    with multiprocessing.Pool() as pool:
        pending = [
            (job[0], pool.apply_async(generate_spectrogram, args=job))
            for job in jobs
        ]
        pool.close()
        pool.join()

        # apply_async stores worker exceptions in the result object; they only
        # surface when get() is called.
        for song_path, result in pending:
            try:
                result.get()
            except Exception as error:
                print(f"Failed to generate spectrogram for {song_path}: {error}")

# Usage:
# Root folder that is scanned for one sub-folder per genre
base_path = "/kaggle/input/gtzan-dataset-music-genre-classification/Data/genres_original"
# Destination folder for the generated spectrogram images
spectrogram_output_path = "/kaggle/working/spectrograms_computed"
# Split the songs of every genre into train / test / validation lists
dataset_info = prepare_dataset(base_path)
# Render spectrograms for every split except "train" (skipped inside process_songs_parallel)
process_songs_parallel(dataset_info, spectrogram_output_path)

### Create train set
Generate the spectrograms of the train set audio.

In [None]:
# Define the function to compute and save the spectrograms

def spectrogram_creation(audio_path, song_name, spectrogram_save_path, num_augment=3):
    """
    Create mel spectrogram images of a song: the original plus augmented variants.

    The first image is rendered from the unmodified signal and saved as
    ``-<song_name>.png``. Each further image (when ``num_augment`` > 1) is
    rendered after one more call to ``apply_random_augmentation`` and saved
    as ``Aug_<i>-<song_name>.png`` so that variants do not overwrite each other.

    :param audio_path: Path of the audio file to load.
    :param song_name: Name used inside the output file names.
    :param spectrogram_save_path: Directory that receives the images.
    :param num_augment: Total number of images to write, including the
        un-augmented one.
    """
    # Import the submodule explicitly rather than relying on `import librosa` exposing it
    import librosa.display

    # Load audio file
    y, sr = librosa.load(audio_path)

    try:
        for i in range(num_augment):
            if i > 0:
                # NOTE(review): augmentations are cumulative (variant i is built
                # from variant i-1, not from the original signal) - confirm intent.
                y = apply_random_augmentation(y, sr)
                file_name = f'Aug_{i}-{song_name}.png'
            else:
                file_name = f'-{song_name}.png'

            # Compute the spectrogram and convert to dB
            spectrogram = librosa.feature.melspectrogram(y=y, sr=sr)
            spectrogram_db = librosa.power_to_db(spectrogram, ref=np.max)

            # Start from an empty figure so that the previous spectrogram does
            # not stay underneath the new one
            plt.clf()
            librosa.display.specshow(spectrogram_db)

            # The target path is rebuilt from the unchanged directory on every
            # iteration instead of being appended to
            plt.savefig(os.path.join(spectrogram_save_path, file_name))
    finally:
        # Close the Figure to release resources, even if an iteration failed
        plt.close()

# Apply one randomly selected augmentation method to the signal
def apply_random_augmentation(signal, sr):
    """
    Pick one of the augmentation functions at random and return its output.

    :param signal: Audio samples handed to the chosen augmentation.
    :param sr: Sampling rate handed to the chosen augmentation.
    :return: The signal as returned by the chosen augmentation function.
    """
    candidates = [add_white_noise, pitch_scale, random_gain]
    augment = np.random.choice(candidates)
    return augment(signal, sr)

def add_white_noise(signal, sr, noise_percentage_factor=0.005):
    """
    Add zero-mean Gaussian noise, scaled relative to the signal's own spread.

    The noise is drawn with the standard deviation of ``signal`` and multiplied
    by ``noise_percentage_factor`` before being added.

    :param signal: Array of audio samples; any shape is accepted.
    :param sr: Sampling rate; unused, kept so all augmentations share one signature.
    :param noise_percentage_factor: Multiplier applied to the noise.
    :return: New array with the same shape as ``signal``.
    """
    signal = np.asarray(signal)
    # Draw the noise in the signal's shape so multi-channel input also works
    noise = np.random.normal(0, signal.std(), signal.shape)
    return signal + noise * noise_percentage_factor

def pitch_scale(signal, sr, num_semitones=2):
    """
    Pitch-shift the signal with ``librosa.effects.pitch_shift``.

    :param signal: Audio samples to shift.
    :param sr: Sampling rate of ``signal``.
    :param num_semitones: Value passed to librosa as ``n_steps``.
    :return: The shifted signal returned by librosa.
    """
    shifted_signal = librosa.effects.pitch_shift(y=signal, sr=sr, n_steps=num_semitones)
    return shifted_signal

def random_gain(signal, sr, min_factor=0.8, max_factor=1.2):
    """
    Scale the signal by a gain drawn uniformly between the two factors.

    :param signal: Audio samples to scale.
    :param sr: Sampling rate; unused, kept so all augmentations share one signature.
    :param min_factor: Lower bound of the random gain.
    :param max_factor: Upper bound of the random gain.
    :return: ``signal`` multiplied by the drawn gain.
    """
    return signal * np.random.uniform(min_factor, max_factor)


## Code to actually process the train set 
Calls the parallelized routines to convert audio files to spectrograms and create the augmentations if needed (based on 30 second full clips)

In [None]:
def process_genre_group(genre_group, output_base_path):
    """
    Process a group of songs of the same genre.

    Songs are handled one after another by ``worker``. A song that raises is
    reported and skipped so that the remaining songs of the group are still
    processed.

    :param genre_group: List of tuples (song_path, genre).
    :param output_base_path: Base output path for saving spectrograms.
    """
    for song_path, genre in genre_group:
        try:
            worker(song_path, genre, output_base_path)
        except Exception as error:
            # Without this, one bad file would abort the rest of the genre,
            # and the exception would be discarded by the process pool.
            print(f"Failed to process {song_path}: {error}", flush=True)

def worker(song_path, genre, output_base_path):
    """
    Create the spectrogram images of one training song.

    :param song_path: Path of the audio file.
    :param genre: Genre label; names the folder below ``<output_base_path>/train``.
    :param output_base_path: Base output path for saving spectrograms.
    """
    # Keep the first two dot-separated parts of the file name, joined by a dot
    # (a name of the form "a.b.c" becomes "a.b")
    song_name = '.'.join(os.path.basename(song_path).split('.')[:2])

    genre_folder_path = os.path.join(output_base_path, 'train', genre)
    # exist_ok replaces the check-then-create sequence, which can fail when two
    # processes create the folder at the same time
    os.makedirs(genre_folder_path, exist_ok=True)

    spectrogram_creation(song_path, song_name, genre_folder_path)

def process_training_set_parallel(training_songs_info, output_path):
    """
    Processes the training set songs in parallel using multiprocessing,
    with each genre submitted as one task to the process pool.

    Failures of a whole genre task are printed instead of being silently
    discarded.

    :param training_songs_info: List of tuples containing song paths and their genres.
    :param output_path: Path where the training set spectrograms will be saved.
    """
    training_folder_path = os.path.join(output_path, 'train')
    os.makedirs(training_folder_path, exist_ok=True)

    # Group songs by genre
    genre_groups = {}
    for song_path, genre in training_songs_info:
        genre_groups.setdefault(genre, []).append((song_path, genre))

    # Nothing to do: avoid starting worker processes
    if not genre_groups:
        return

    # Submit each genre group as its own pool task
    with multiprocessing.Pool() as pool:
        pending = {
            genre: pool.apply_async(process_genre_group, args=(genre_group, output_path))
            for genre, genre_group in genre_groups.items()
        }
        pool.close()
        pool.join()

        # apply_async stores worker exceptions in the result object; they only
        # surface when get() is called.
        for genre, result in pending.items():
            try:
                result.get()
            except Exception as error:
                print(f"Failed to process genre {genre}: {error}")
        
        
# Assuming you have the dataset_info dictionary from prepare_dataset function
# Only the 'train' split is handled by this cell
training_songs_info = dataset_info['train']
# Spectrograms are written below <spectrogram_output_path>/train/<genre>
process_training_set_parallel(training_songs_info, spectrogram_output_path)

One audio file from the dataset is not used:

* one audio file that can be loaded with neither the librosa nor the soundfile library.


**Check if the number of files from each directory is Correct**

In [None]:
# Directory tree whose per-folder file counts are reported
parent_directory = '/kaggle/working/spectrograms_computed/train'
# parent_directory = "/kaggle/input/gtzan-dataset-music-genre-classification/Data/images_original"

# Map every directory visited by os.walk (the parent included) to the number
# of files located directly inside it
directory_file_counts = {
    current_dir: len(file_names)
    for current_dir, _subdirs, file_names in os.walk(parent_directory)
}

# Report one line per directory
for directory, file_count in directory_file_counts.items():
    print(f"Directory: {directory}, File Count: {file_count}")

In [None]:
# Class names are the entries of images_original without a dot in their name.
# Sorting fixes the name -> label-index mapping, which would otherwise follow
# the arbitrary order returned by os.listdir.
classes = sorted(
    a
    for a in os.listdir('/kaggle/input/gtzan-dataset-music-genre-classification/Data/images_original')
    if '.' not in a
)
print(classes)

In [None]:
def process_dataset(folder_path, classes, img_size=256):
    """
    Processes images in the given folder into a dataset.

    Every image below ``<folder_path>/<class name>`` is read with OpenCV,
    converted from BGR to RGB and resized to ``img_size`` x ``img_size``.
    Images that cannot be read or resized are reported and skipped.

    :param folder_path: Path to the folder containing one sub-folder per class.
    :param classes: List of class names; the position in the list is the label.
    :param img_size: Size to which images are resized.
    :return: Tuple of numpy arrays (features, labels). Both are empty (with
        shapes ``(0, img_size, img_size, 3)`` and ``(0,)``) when no image
        could be loaded.
    """
    features = []
    labels = []

    for class_num, label in enumerate(classes):
        class_path = os.path.join(folder_path, label)

        for img in os.listdir(class_path):
            try:
                img_arr = cv2.imread(os.path.join(class_path, img))
                if img_arr is None:
                    # cv2.imread signals an unreadable file by returning None
                    print(f"Error processing {img}: file could not be read as an image")
                    continue
                rgb_arr = img_arr[..., ::-1]  # Convert BGR to RGB
                resized_arr = cv2.resize(rgb_arr, (img_size, img_size))  # Reshape images
            except Exception as e:
                # Best effort: a single broken image must not stop the whole load
                print(f"Error processing {img}: {e}")
                continue

            features.append(resized_arr)
            labels.append(class_num)

    if not features:
        # Keep the return shape contract instead of failing on an empty dataset
        empty_features = np.empty((0, img_size, img_size, 3), dtype=np.uint8)
        return empty_features, np.empty((0,), dtype=int)

    return np.array(features), np.array(labels)

# Root folder of the generated spectrogram images; the split folders are joined onto it below
base_directory = "/kaggle/working/spectrograms_computed"

# Process each dataset
# Each call returns a (features, labels) pair of numpy arrays for one split
x_train, y_train = process_dataset(os.path.join(base_directory, 'train'), classes)
x_test, y_test = process_dataset(os.path.join(base_directory, 'test'), classes)
x_valid, y_valid = process_dataset(os.path.join(base_directory, 'validation'), classes)
print("Done...")


In [None]:
# Scale pixel values to [0, 1]. Casting to float32 first keeps the arrays at
# half the size of the float64 result that a plain division would produce.
# NOTE: re-running this cell divides the data again; run it once per load.
x_train = x_train.astype(np.float32) / 255
x_test = x_test.astype(np.float32) / 255
x_valid = x_valid.astype(np.float32) / 255
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)
print(x_valid.shape, y_valid.shape)

# **Transfer learning part** #

In [None]:
import tensorflow as tf
from tensorflow.keras import applications, layers, models, regularizers
from tensorflow.keras.optimizers import Adam
from keras.applications.vgg16 import VGG16
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
# CNN built layer by layer: four Conv2D + MaxPool2D stages, then a dense head
model = Sequential()
for stage, filters in enumerate((32, 32, 64, 64)):
    conv_options = {
        "padding": "same",
        "activation": tf.keras.layers.LeakyReLU(alpha=0.01),
        "kernel_regularizer": 'l2',
    }
    if stage == 0:
        # Only the first layer declares the input shape
        conv_options["input_shape"] = (256, 256, 3)
    model.add(Conv2D(filters, 3, **conv_options))
    model.add(MaxPool2D())

# Classification head: flatten, one hidden dense layer with dropout, 10-way softmax
model.add(Flatten())
model.add(Dense(128, activation=tf.keras.layers.LeakyReLU(alpha=0.01)))
model.add(Dropout(0.2))
model.add(Dense(10, activation="softmax"))

# Compile the model
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Fit the model
model.fit(
    x_train,
    y_train,
    validation_data=(x_valid, y_valid),
    epochs=40,
    batch_size=16,
)

# Evaluate the model on the test set
results = model.evaluate(x_test, y_test)

# Print the evaluation results (loss first, then accuracy)
for metric_label, metric_value in zip(("Test Loss:", "Test Accuracy:"), results):
    print(metric_label, metric_value)


In [None]:
# Load the model chosen for transfer learning, excluding the top (classification) layer
# NOTE(review): the data fed below was only divided by 255; confirm that this
# matches the input scaling and input size the MobileNetV2 ImageNet weights expect.
base_model = tf.keras.applications.MobileNetV2(weights='imagenet', include_top=False, input_shape=(256, 256, 3))

# Freeze the pre-trained layers so only the new head is trained
for layer in base_model.layers:
    layer.trainable = False

x = base_model.output
# Add your custom classification head
x = keras.layers.GlobalAveragePooling2D()(x)
x = layers.Dense(128, activation=tf.keras.layers.LeakyReLU(alpha=0.005))(x)
x = layers.Dropout(0.2)(x)
output = layers.Dense(10, activation='softmax')(x)  # 10 classes for music genres

# Create a new model with the custom input and classification head
model = models.Model(inputs=base_model.input, outputs=output)

# model.summary()

# Compile the model
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and
# rejected by newer Keras releases.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# callbacks = [EarlyStopping(monitor='val_loss', patience=7, verbose=1)]

# Fit the model
model.fit(x_train, y_train, validation_data=(x_valid, y_valid), epochs=30, batch_size=32)

# Evaluate the model on the test set
results = model.evaluate(x_test, y_test)

# Print the evaluation results
print("Test Loss:", results[0])
print("Test Accuracy:", results[1])

In [None]:
from tensorflow.keras.applications import Xception

# NOTE(review): the data fed below was only divided by 255; confirm that this
# matches the input scaling the Xception ImageNet weights expect.
base_model = Xception(weights='imagenet', include_top=False, input_shape=(256, 256, 3))
# Freeze the layers of the pre-trained model
for layer in base_model.layers:
    layer.trainable = False

# Un-freeze the last layer of the base model.
# NOTE(review): if that last layer holds no weights (e.g. an activation layer),
# this has no effect on training - verify with base_model.layers[-1].weights.
base_model.layers[-1].trainable = True

# Define the custom classification head
x = base_model.output
x = keras.layers.GlobalAveragePooling2D()(x)
x = layers.Dense(128, activation='relu')(x)
x = layers.Dropout(0.2)(x)
output = layers.Dense(10, activation='softmax')(x)  # 10 classes for music genres

# Create a new model with the custom input and classification head
model = models.Model(inputs=base_model.input, outputs=output)

# Compile the model
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and
# rejected by newer Keras releases.
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.7), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Fit the model
model.fit(x_train, y_train, validation_data=(x_valid, y_valid), epochs=50, batch_size=8)

# Evaluate the model on the test set
results = model.evaluate(x_test, y_test)
# Print the evaluation results
print("Test Loss:", results[0])
print("Test Accuracy:", results[1])