<a href="https://colab.research.google.com/github/michalgorul/classification-of-musical-genres/blob/main/FMA_MG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [None]:
# !pip install tensorflow==2.10.0 pandas==1.5.0 numpy==1.23.3 seaborn==0.12.0 scikit-learn==1.1.2 librosa==0.9.2 imageio==2.22.3
!pip install pydub==0.25.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pydub==0.25.1
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [36]:
import glob
import os
import ast
import random
import shutil
from typing import Dict, List, Any

import librosa
import numpy as np
import pandas as pd
from keras.callbacks import History
from matplotlib import pyplot as plt
from matplotlib.pyplot import figure
from pandas import DataFrame
from PIL import ImageFile
from keras import models, layers, activations, optimizers, losses, metrics, Sequential, regularizers
from keras.layers import Flatten

# Global variables

In [23]:
metadata_path = "/content/drive/MyDrive/FMA/dataset/metadata"

genres_path =  "/content/drive/MyDrive/FMA/dataset/genres"
small_dataset = "/content/drive/MyDrive/FMA/dataset/fma_small"
images_path = "/content/drive/MyDrive/FMA/dataset/images"

fma_train_dir = "/content/drive/MyDrive/FMA/dataset/train"
fma_val_dir = "/content/drive/MyDrive/FMA/dataset/validation"
fma_test_dir = "/content/drive/MyDrive/FMA/dataset/test"

directories: Dict[str, str] = {
    "train_dir": fma_train_dir,
    "val_dir": fma_val_dir,
    "test_dir": fma_test_dir,
}

image_target_size = (288, 432)

# Dataset

In [9]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Utils

In [34]:
from typing import Optional

def load_util(filepath: str) -> Optional[DataFrame]:
  filename = os.path.basename(filepath)

  if 'features' in filename:
    return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

  if 'echonest' in filename:
    return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

  if 'genres' in filename:
    return pd.read_csv(filepath, index_col=0)

  if 'tracks' in filename:
    tracks = pd.read_csv(filepath, index_col=0, header=[0, 1])

    COLUMNS = [('track', 'tags'), ('album', 'tags'), ('artist', 'tags'),
              ('track', 'genres'), ('track', 'genres_all')]
    for column in COLUMNS:
      tracks[column] = tracks[column].map(ast.literal_eval)

    COLUMNS = [('track', 'date_created'), ('track', 'date_recorded'),
              ('album', 'date_created'), ('album', 'date_released'),
              ('artist', 'date_created'), ('artist', 'active_year_begin'),
              ('artist', 'active_year_end')]
    for column in COLUMNS:
      tracks[column] = pd.to_datetime(tracks[column])

    SUBSETS = ('small', 'medium', 'large')
    try:
      tracks['set', 'subset'] = tracks['set', 'subset'].astype(
          'category', categories=SUBSETS, ordered=True)
    except (ValueError, TypeError):
      # the categories and ordered arguments were removed in pandas 0.25
      tracks['set', 'subset'] = tracks['set', 'subset'].astype(
          pd.CategoricalDtype(categories=SUBSETS, ordered=True))

    COLUMNS = [('track', 'genre_top'), ('track', 'license'),
              ('album', 'type'), ('album', 'information'),
              ('artist', 'bio')]
    for column in COLUMNS:
      tracks[column] = tracks[column].astype('category')

    return tracks

def load() -> DataFrame:
  tracks: DataFrame = load_util(f"{metadata_path}/tracks.csv")
  return tracks


def subset(tracks: DataFrame, subset_name: str) -> DataFrame:
  assert subset_name in ["small", "medium"]

  subset = tracks.index[tracks["set", "subset"] <= "small"]
  assert subset.isin(tracks.index).all()

  return tracks.loc[subset]


def list_specific_genre_tracks(tracks: DataFrame, genre: str) -> None:
  tracks_with_genre_top = tracks.index[tracks["track", "genre_top"] == genre]
  print(genre.upper())
  print(tracks_with_genre_top.to_list())


def get_track_ids_for_genre(tracks: DataFrame, genres: List[str]) -> Dict[str, List[Any]]:
  return {
      genre: tracks.index[tracks["track", "genre_top"] == genre].to_list() for genre in genres
  }


def genres_top_track_ids(tracks: DataFrame) -> Dict[str, List[Any]]:
  subset_small = subset(tracks, "small")
  genres_top = list(tracks["track", "genre_top"].unique())
  return get_track_ids_for_genre(subset_small, genres_top)


def get_genres_top(tracks: DataFrame) -> List[str]:
  genres_top_fixed: List[str] = []
  try:
    genres_top = list(tracks["track", "genre_top"].unique())
    for genre in genres_top:
      if " " not in str(genre):
        genres_top_fixed.append(genre)
      else:
        genres_top_fixed.append(genre.split(" ")[0])
    print("Top genres got")
  except Exception as e:
    print(f"Failed to get top genres, error={e}")
  return genres_top_fixed

def copy_files(file_paths: List[str], dest_dir: str) -> int:
  count = 0
  try:
    for file in file_paths:
      shutil.copy(
        file,
        os.path.join(
          os.path.join(dest_dir),
          os.path.split(file)[1],
        ),
      )
      count += 1
  except KeyError as e:
      print(f"Failed to copy files to destination directory, error={e}")
  return count


def create_directories(directories: List[str]) -> None:
  # Create folders
  for path in directories:
    if os.path.exists(path):
      shutil.rmtree(path)
      os.mkdir(path)
    else:
      os.mkdir(path)



def create_directory(path: str) -> None:
  # Create folder
  if os.path.exists(path):
    shutil.rmtree(path)
    os.mkdir(path)
  else:
    os.mkdir(path)


def remove_directories(directories: Dict[str, str]) -> None:
  # Remove folders
  for folder_name, path in directories.items():
    if os.path.exists(path):
      shutil.rmtree(path)


def delete_dir_if_empty(self, dir_names: List[str]) -> None:
  dirs_to_remove = []
  count = 0
  for dir_name in dir_names:
    songs = glob.glob(
      os.path.join(self.genres_path, f"{dir_name}", "*.mp3"), recursive=True
    )
    if len(songs) == 0:
      dirs_to_remove.append(f"{self.genres_path}/{dir_name}")

  print("Found empty directories:", len(dirs_to_remove))
  print("Removing found empty directories...")
  for path in dirs_to_remove:
    try:
      if os.path.exists(path):
        shutil.rmtree(path)
        count += 1
    except Exception as e:
      print(f"Failed to remove dir, dir={path}, error={e}")
  print("Removed empty directories:", count)

## Initialization

In [32]:
def fill_directories_with_songs() -> None:
  tracks = load()
  ids = genres_top_track_ids(tracks)
  all_dirs = os.listdir(small_dataset)

  print("Getting Top genres...")
  genres_top = get_genres_top(tracks)
  print(genres_top)

  print("Creating directories...")
  try:
    genres_dir_paths = [f"{genres_path}/{genre}" for genre in genres_top]
    create_directories(genres_dir_paths)
    print("Directories created")
  except Exception as e:
    print(f"Failed to create directories, error={e}")
    raise e

  print("Getting files to copy...")
  files_to_copy: Dict[str, List[str]] = {}
  try:
    files_to_copy = {f"{genres_path}/{genre}": [] for genre in genres_top}
    for dir_name in all_dirs:
      for song_file in glob.glob(
        os.path.join(small_dataset, f"{dir_name}", "*.mp3"), recursive=True
      ):
        song_index = int(song_file.split("/")[-1].split(".")[0])
        for genre, ids_list in ids.items():
          if " " in str(genre):
            genre = genre.split(" ")[0]
          if song_index in ids_list:
            files_to_copy[f"{genres_path}/{genre}"].append(song_file)
    print("Files to copy got")
  except Exception as e:
    print(f"Failed to get files to copy, error={e}")
    raise e

  print("Copying files to desired directories...")
  try:
    files_copied = 0
    for genre_dir, songs in files_to_copy.items():
      print("\tCurrent genre:", genre_dir.split("/")[-1])
      files_copied += copy_files(file_paths=songs, dest_dir=genre_dir)
    if files_copied == 0:
      raise ValueError("Zero files were copied")
    print("Files copied...")
  except Exception as e:
    print(f"Failed to get files to copy, error={e}")
    raise e

  delete_dir_if_empty(os.listdir(genres_path))

def make_spectograms(genre: str, last_song: int = 0) -> None:
  j = 0
  songs_path = genres_path
  for g in songs_path:
    for filename in os.listdir(os.path.join(songs_path, f"{g}")):
      j = j + 1
      if j > last_song:
        print(f"Current file in {g}: {j}")

        song = os.path.join(f"{songs_path}/{g}", f"{filename}")

        y, sr = librosa.load(song)
        # print(sr)
        mels = librosa.feature.melspectrogram(y=y, sr=sr)
        figure(figsize=(4, 2))
        plt.imshow(librosa.power_to_db(mels, ref=np.max), aspect="auto")
        plt.axis("off")

        genre_dir_path = f"{images_path}/{g}"
        if not os.path.exists(genre_dir_path):
          os.mkdir(genre_dir_path)

        plt.savefig(f"{genre_dir_path}/{g + str(j)}.png")
        plt.close()

def data_init() -> None:
  create_directories(list(directories.values()))

  genres = list(os.listdir(images_path))
  for genre in genres:
    print(f"Current genre: {genre}")

    # Finding all images & split in train, test, and validation
    src_file_paths = []

    for file in glob.glob(os.path.join(images_path, f"{genre}", "*.png"), recursive=True):
      src_file_paths.append(file)

    # Randomizing directories content
    random.shuffle(src_file_paths)

    test_files = src_file_paths[0:50]
    val_files = src_file_paths[50:200]
    train_files = src_file_paths[200:]

    #  make destination folders for train and test images
    for folder_name, path in directories.items():
      if not os.path.exists(f"{path}/{genre}"):
        os.mkdir(f"{path}/{genre}")

    # Coping training and testing images over
    copy_files(
        file_paths=train_files, dest_dir=f"{directories['train_dir']}/{genre}/"
    )
    copy_files(
        file_paths=test_files, dest_dir=f"{directories['test_dir']}/{genre}/"
    )
    copy_files(file_paths=val_files, dest_dir=f"{directories['val_dir']}/{genre}/")
  return

In [35]:
fill_directories_with_songs()

Getting Top genres...
Top genres got
['Hip-Hop', 'Pop', nan, 'Rock', 'Experimental', 'Folk', 'Jazz', 'Electronic', 'Spoken', 'International', 'Soul-RnB', 'Blues', 'Country', 'Classical', 'Old-Time', 'Instrumental', 'Easy']
Creating directories...
Directories created
Getting files to copy...
Files to copy got
Copying files to desired directories...
	Current genre: Hip-Hop
	Current genre: Pop
	Current genre: nan
	Current genre: Rock
	Current genre: Experimental
	Current genre: Folk
	Current genre: Jazz
	Current genre: Electronic
	Current genre: Spoken
	Current genre: International
	Current genre: Soul-RnB
	Current genre: Blues
	Current genre: Country
	Current genre: Classical
	Current genre: Old-Time
	Current genre: Instrumental
	Current genre: Easy
Failed to get files to copy, error=Zero files were copied


ValueError: ignored

## Sanity data test

In [12]:
def sanity_data_test() -> None:
  print(
    "Genres directories in train data:",
    len(os.listdir(directories["train_dir"])),
  )
  print(
    "Genres directories in test data:", len(os.listdir(directories["test_dir"]))
  )
  print(
    "Genres directories in validation data:",
    len(os.listdir(directories["val_dir"])),
  )

  print("\nTotal number of images in:")
  for genre in genres:
    print()
    for folder_name, path in directories.items():
      print(
        f"\t{folder_name} of {genre} songs: "
        + str(len(os.listdir(f"{directories[folder_name]}/{genre}"))),
      )

sanity_data_test()

FileNotFoundError: ignored

# Data generators

In [37]:
train_dir_on_colab_instance = "/content/bin/data/train"
val_dir_on_colab_instance = "/content/bin/data/val"
test_dir_on_colab_instance = "/content/bin/data/test"

In [38]:
if os.path.exists(train_dir_on_colab_instance):
  shutil.rmtree(train_dir_on_colab_instance)

if os.path.exists(val_dir_on_colab_instance):
  shutil.rmtree(val_dir_on_colab_instance)

if os.path.exists(test_dir_on_colab_instance):
  shutil.rmtree(test_dir_on_colab_instance)

shutil.copytree(fma_train_dir, train_dir_on_colab_instance)
shutil.copytree(fma_val_dir, val_dir_on_colab_instance)
shutil.copytree(fma_val_dir, test_dir_on_colab_instance)

'/content/bin/data/test'

In [39]:
from keras.preprocessing.image import ImageDataGenerator, DirectoryIterator

# BATCH_SIZE = 128
BATCH_SIZE = 20



def get_train_data_generator() -> DirectoryIterator:
  print("Creating train data generator")
  train_datagen = ImageDataGenerator(rescale=1.0 / 255)

  train_dir = train_dir_on_colab_instance
  target_size = image_target_size

  train_generator = train_datagen.flow_from_directory(
      train_dir, 
      target_size=target_size, 
      batch_size=BATCH_SIZE, 
      class_mode="categorical", 
      color_mode="rgba"
  )
  return train_generator


def get_validation_data_generator() -> DirectoryIterator:
  print("Creating validation data generator")
  validation_datagen = ImageDataGenerator(rescale=1.0 / 255)

  validation_dir = val_dir_on_colab_instance
  target_size = image_target_size

  validation_generator = validation_datagen.flow_from_directory(
      validation_dir, 
      target_size=target_size, 
      batch_size=BATCH_SIZE, 
      class_mode="categorical",  
      color_mode="rgba"
  )
  return validation_generator


def list_output_of_generators() -> None:
  for data_batch, labels_batch in get_train_data_generator():
    print("Train generator:")
    print("\tData batch shape:", data_batch.shape)
    print("\tLabels batch shape:", labels_batch.shape)
    print()
    break

  for data_batch, labels_batch in get_validation_data_generator():
    print("Validation generator:")
    print("\tData batch shape:", data_batch.shape)
    print("\tLabels batch shape:", labels_batch.shape)
    break


# list_output_of_generators()

# Model (My network)

In [40]:
ImageFile.LOAD_TRUNCATED_IMAGES = True


def build_model() -> Sequential:
  """
  Function creating keras model
  :return: a model
  """

  input_shape = (*image_target_size, 4)
  
  model = models.Sequential()
  
  model.add(layers.Conv2D(8, (3, 3), activation=activations.relu, input_shape=input_shape))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Dropout(0.3))

  model.add(layers.Conv2D(16, (3, 3), activation=activations.relu))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Dropout(0.3))

  model.add(layers.Conv2D(32, (3, 3), activation=activations.relu))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Dropout(0.3))

  model.add(layers.Conv2D(64, (3, 3), activation=activations.relu))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Dropout(0.3))

  model.add(layers.Conv2D(128, (3, 3), activation=activations.relu))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Dropout(0.3))

  # flattening the data to be passed to a dense layer
  model.add(Flatten())

  model.add(layers.Dense(256, activation=activations.relu))
  model.add(layers.Dense(8, activation=activations.softmax))

  model.summary()

  model.compile(
      optimizer=optimizers.RMSprop(learning_rate=0.0005),
      loss=losses.categorical_crossentropy,
      metrics=[metrics.categorical_accuracy],
  )

  return model

# Plotting functions

In [41]:
def show_training_and_validation_loss(
    epochs: range, loss_values: List[float], val_loss_values: List[float]
) -> None:
  plt.plot(epochs, loss_values, "bo", label="Training loss")
  plt.plot(epochs, val_loss_values, "b", label="Validation loss")
  plt.title("Training and validation loss")
  plt.xlabel("Epochs")
  plt.ylabel("Loss")
  plt.legend()
  plt.show()


def show_training_and_validation_accuracy(
    epochs: range, acc: List[float], val_acc: List[float]
) -> None:
  plt.plot(epochs, acc, "bo", label="Training acc")
  plt.plot(epochs, val_acc, "b", label="Validation acc")
  plt.title("Training and validation accuracy")
  plt.xlabel("Epochs")
  plt.ylabel("Accuracy")
  plt.legend()
  plt.show()

# Training the model

### Checking if CUDA is available

In [None]:
!nvcc --version

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2021 NVIDIA Corporation
Built on Sun_Feb_14_21:12:58_PST_2021
Cuda compilation tools, release 11.2, V11.2.152
Build cuda_11.2.r11.2/compiler.29618528_0


In [None]:
import tensorflow as tf
tf.test.gpu_device_name()
# Standard output is '/device:GPU:0'

'/device:GPU:0'

### Training

In [None]:
from keras.callbacks import History

train_data = get_train_data_generator()
val_data = get_validation_data_generator()

EPOCHS = 30

model = build_model()

history: History = model.fit(
        train_data,
        steps_per_epoch=train_data.samples / train_data.batch_size,
        epochs=EPOCHS,
        validation_data=val_data,
        validation_steps=val_data.samples / val_data.batch_size,
    )

model_path = "/content/drive/MyDrive/GTZAN/models/gztan_provided_spectograms.h5"
try:
  model.save(model_path)
  print(f"Model saved at {model_path}")
except Exception as e:
  print(f"Failed to save model, error={e}")


train_loss_values = history.history.get("loss")
val_loss_values = history.history.get("val_loss")
train_accuracy = history.history.get("categorical_accuracy")
val_accuracy = history.history.get("val_categorical_accuracy")
num_of_epochs = range(1, len(train_accuracy) + 1)

print(f"categorical_accuracy max: {max(history.history.get('categorical_accuracy'))}")
print(f"val_categorical_accuracy max: {max(history.history.get('val_categorical_accuracy'))}")
print(f"loss min: {min(history.history.get('loss'))}")
print(f"val_loss min: {min(history.history.get('val_loss'))}")

show_training_and_validation_loss(
    epochs=num_of_epochs, loss_values=train_loss_values, val_loss_values=val_loss_values
)

show_training_and_validation_accuracy(
    epochs=num_of_epochs, acc=train_accuracy, val_acc=val_accuracy
)

# Results

In [None]:
# make test folder with one class

labels = train_data.class_indices
genres = dict((v,k) for k,v in labels.items())

test_dir = fma_test_dir + "/images/" 
create_directory(test_dir)

for genre in genres:
  source = fma_test_dir + "/" + genre + "/"

  # code to move the files from sub-folder to main folder.
  files = os.listdir(source)
  for file in files:
    file_name = os.path.join(source, file)
    shutil.move(file_name, test_dir)
  os.rmdir(source)
  print(f"Files from {genre} moved")

In [None]:
test_dir = fma_test_dir
test_datagen = ImageDataGenerator(rescale=1./255)

BATCH_SIZE = 20

# shuffle=False in order to preserve the order of filenames and predictions.
test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=image_target_size,
        batch_size=BATCH_SIZE,
        class_mode="categorical", 
        color_mode="rgba",
        shuffle=False
)

Found 49 images belonging to 1 classes.


In [None]:
# Predict from generator (returns probabilities)
test_generator.reset()
predictions = model.predict(
    test_generator, 
    steps=len(test_generator), 
    verbose=1
)



In [None]:
import pandas as pd

predicted_class_indices=np.argmax(predictions,axis=1)
labels = genres
# labels = train_data.class_indices
# labels = dict((v,k) for k,v in labels.items())
prediction_results = [labels[k] for k in predicted_class_indices]

filenames = test_generator.filenames
results = pd.DataFrame({"Filename": filenames,
                      "Predictions": prediction_results})
print(results)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, cohen_kappa_score
import re


classes = sorted([
    genres.index(re.sub(r'\d', '', c.replace(".png", ""))) 
    for c in os.listdir(test_dir + "/images/")
])
print(classes)
y_pred = np.argmax(predictions, axis=1)
print(y_pred)
print('Confusion Matrix')
print(confusion_matrix(classes, y_pred))
print('Classification Report')
target_names = genres
print(classification_report(classes, y_pred, target_names=target_names))
print("Cohen's Kappa: {}".format(cohen_kappa_score(classes, y_pred)))
print("Accuracy: ",accuracy_score(classes, y_pred))


#### Results 
