In [7]:
import pandas as pd
import os
import ast
import matplotlib.pyplot as plt
import seaborn as sns

import IPython.display as ipd
from sklearn.decomposition import PCA 
from sklearn.preprocessing import *
import numpy as np

from sklearn.feature_selection import *
import librosa
import warnings
warnings.filterwarnings("ignore")

In [8]:
# Locations of the FMA dataset on Kaggle, plus the default figure size.

DATA_PATH = "/kaggle/input/fma-small/"
METADATA_PATH = f"{DATA_PATH}fma_metadata/fma_metadata/"
FMA_SMALL_PATH = f"{DATA_PATH}fma_small/fma_small/"
FIG_SIZE = (20, 20)

In [9]:
# Show which files are present in the metadata directory.
os.listdir(METADATA_PATH)

['echonest.csv',
 'raw_genres.csv',
 'raw_artists.csv',
 'features.csv',
 'genres.csv',
 'README.txt',
 'not_found.pickle',
 'tracks.csv',
 'raw_tracks.csv',
 'raw_albums.csv',
 'raw_echonest.csv',
 'checksums']

In [10]:
def load(filepath):
    """
        this function, will load one of the FMA metadata CSV files. The parsing rules are chosen
        from the base name of the file, checked in this order:
            - 'features' / 'echonest': three header rows, first column used as the index.
            - 'genres': one header row, first column used as the index.
            - 'tracks': two header rows, first column used as the index. List-like columns are
              parsed with ast.literal_eval, date columns are converted with pd.to_datetime,
              ('set', 'subset') becomes an ordered categorical (small < medium < large) and a
              few text columns become plain categoricals.
        Params:
            filepath(type: str): Path to the CSV file.
        Return(type: pd.DataFrame or None)
            this function, will return the parsed DataFrame, or None when the file name
            matches none of the kinds listed above.
    """
    filename = os.path.basename(filepath)

    if 'features' in filename or 'echonest' in filename:
        return pd.read_csv(filepath, index_col=0, header=[0, 1, 2])

    if 'genres' in filename:
        return pd.read_csv(filepath, index_col=0)

    if 'tracks' in filename:
        tracks = pd.read_csv(filepath, index_col=0, header=[0, 1])

        # Columns stored as the text form of a Python list, e.g. "[21, 5]".
        # ('track', 'genre_top') is intentionally NOT listed here: it holds a plain genre
        # name, and the previous misspelled entry ('track', 'genres_top') raised KeyError.
        list_columns = [('track', 'tags'), ('album', 'tags'), ('artist', 'tags'),
                        ('track', 'genres'), ('track', 'genres_all')]
        for column in list_columns:
            tracks[column] = tracks[column].map(ast.literal_eval)

        date_columns = [('track', 'date_created'), ('track', 'date_recorded'),
                        ('album', 'date_created'), ('album', 'date_released'),
                        ('artist', 'date_created'), ('artist', 'active_year_begin'),
                        ('artist', 'active_year_end')]
        for column in date_columns:
            tracks[column] = pd.to_datetime(tracks[column])

        # astype('category', categories=..., ordered=...) was removed from pandas;
        # an explicit CategoricalDtype is the supported way to request ordered categories.
        subsets = ('small', 'medium', 'large')
        tracks['set', 'subset'] = tracks['set', 'subset'].astype(
                pd.CategoricalDtype(categories=subsets, ordered=True))

        category_columns = [('track', 'license'), ('artist', 'bio'),
                            ('album', 'type'), ('album', 'information')]
        for column in category_columns:
            tracks[column] = tracks[column].astype('category')

        return tracks

    # Unknown file kind: keep the historical behaviour of returning None.
    return None

# **Creating npz files for train, test, val data**

In [1]:
def get_audio_path(audio_dir, track_id):
    """
    Return the path to the mp3 given the directory where the audio is stored
    and the track ID.

    The track ID is zero-padded to six digits; the first three digits name the
    sub-directory and the full six digits name the file.

    Params:
        audio_dir(type: str): Directory that contains the three-digit sub-folders.
        track_id(type: int): Integer track ID.
    Return(type: str)
        Path of the form <audio_dir>/<first 3 digits>/<6 digits>.mp3
    Raises:
        ValueError or TypeError when track_id cannot be formatted as an integer.
        (Previously the exception object was returned as if it were a path.)

    Examples
    --------
    >>> import utils
    >>> AUDIO_DIR = os.environ.get('AUDIO_DIR')
    >>> utils.get_audio_path(AUDIO_DIR, 2)
    '../data/fma_small/000/000002.mp3'
    """
    tid_str = '{:06d}'.format(track_id)
    return os.path.join(audio_dir, tid_str[:3], tid_str + '.mp3')

def create_spectogram(dir_path , track_id):
    """
        this function, will create a melspectrogram from the signal.
        Params:
            dir_path(type: str): Path to the main directory, where it contains, the music files like (.wav, .mp3).
            track_id(type: Int): Each row in the dataframe, have a track_id, which map to the music file.
        Return(type: np.NdArray)
            this function, will return the melspectrogram in decibels (relative to its maximum),
            transposed so that the first axis is time and the second axis is the mel band.
        Raises:
            Any error raised while building the path, decoding the audio or computing the
            spectrogram is propagated to the caller. (Previously the exception object was
            returned in place of the array, which callers could mistake for data.)
    """
    filepath = get_audio_path(dir_path, track_id)
    y, sr = librosa.load(filepath)
    spect = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=2048, hop_length=1024)
    # Convert power to dB using the loudest bin of this clip as the reference.
    spect = librosa.power_to_db(spect, ref=np.max)
    return spect.T

In [2]:
def get_combined_df():
    """
        this function, will read tracks.csv from METADATA_PATH and build the dataframe used by the
        rest of the notebook: only the ('set', 'split'), ('set', 'subset') and ('track', 'genre_top')
        columns, only the rows of the 'small' subset, plus a 'track_id' column copied from the index.
        Params:(none)
        Return (type: pd.DataFrame)
        Raises:
            Errors from reading the file or selecting the columns are propagated.
            (Previously the exception object was returned instead of a DataFrame.)
    """
    filepath = METADATA_PATH + "tracks.csv"
    tracks = pd.read_csv(filepath, index_col=0, header=[0, 1])
    keep_cols = [('set', 'split'),
                 ('set', 'subset'), ('track', 'genre_top')]

    is_small = tracks[('set', 'subset')] == 'small'
    # .copy() so the column assignment below writes to an independent frame
    # rather than to a slice of `tracks`.
    df_all = tracks.loc[is_small, keep_cols].copy()

    df_all['track_id'] = df_all.index

    return df_all

def get_processing_data(df):
    """
        this function, will get the training, testing and validation subset data from the dataframe.
        Rows are selected by the value of the ('set', 'split') column: 'training', 'validation'
        and 'test'.
        Params:
            df(type: pandas.DataFrame): DataFrame, that contains the data.
        Return(type: pd.DataFrame, pd.DataFrame, pd.DataFrame)
            this function, will return the train, val, test dataframes (in that order).
        Raises:
            KeyError when df has no ('set', 'split') column. (Previously the exception object
            was returned, which made the caller's three-way unpacking fail with an unrelated error.)
    """
    split = df['set', 'split']
    train = df[split == 'training']
    val = df[split == 'validation']
    test = df[split == 'test']

    return train, val, test

In [3]:
def create_label_map(labels):
    """
        this function, will create two dictionaries (similar to label encoding): one maps each label
        to its position in `labels`, the other maps the position back to the label.
        eg:
            labels = ['cat', 'dog']
            label_map = {
                'cat': 0,
                'dog': 1
            }
            reverse_label_map = {
                0: 'cat',
                1: 'dog'
            }
        If a label appears more than once, label_map keeps the last position.
        Params;
            labels(type: List): list containing all the label categories.
        Return(type: Dict, Dict)
            this function, will return (label -> integer, integer -> label).
        Raises:
            TypeError when labels is not iterable or contains unhashable values.
            (Previously the exception object was returned instead of the two dictionaries.)
    """
    reverse_label_map = dict(enumerate(labels))
    label_map = {label: indx for indx, label in reverse_label_map.items()}
    return label_map, reverse_label_map

## **Functions for creating the training data**

### **Function for creating Mel Spectrogram data**

In [4]:
import tqdm 

def create_melspectrogram_data(df, dir_path, label_map, max_samples=5000):
    """
        - this function, will create a dataset of melspectrogram features. For each row of the dataframe
        it calls create_spectogram, keeps the first 640 frames and stacks the resulting (640, 128)
        arrays along a new first axis. Tracks that cannot be processed, or that do not yield exactly
        (640, 128) values after truncation, are skipped without raising.
        - Params:
            df(type: pd.DataFrame): FMA dataframe with "track_id" and ("track", "genre_top") columns.
            dir_path(type: str): Path to the main directory, where it contains, the music files like (.wav, .mp3).
            label_map(type: Dict): label_map, which is nothing but a mapping of label to the integer value,
            max_samples(type: int or None): only the first max_samples rows of df are visited
                (default 5000, the cap that was previously hard-coded); None visits every row.
        - Return(type: (np.ndarray, np.array))
            this function, will return X with shape (n_tracks, 640, 128) and the matching integer labels.
    """
    n_frames, n_mels = 640, 128
    n_rows = len(df) if max_samples is None else min(len(df), max_samples)

    # Collect per-track arrays in a list and build X once at the end; appending to a
    # NumPy array inside the loop copies the whole dataset on every iteration.
    features = []
    labels = []
    for i in tqdm.tqdm(range(n_rows)):
        try:
            track_id = df["track_id"].iloc[i]
            genre_type = df[("track", "genre_top")].iloc[i]
            spect = create_spectogram(dir_path, track_id)
            spect = spect[:n_frames, :]
            if spect.shape != (n_frames, n_mels):
                # Clip shorter than n_frames (or unexpected band count): skip it.
                continue
        except Exception:
            # Best-effort: unreadable or malformed tracks are skipped.
            continue

        features.append(spect)
        labels.append(label_map[genre_type])

    if not features:
        return np.empty((0, n_frames, n_mels)), np.array(labels)
    # float64 matches the dtype the previous np.empty/np.append implementation produced.
    return np.asarray(features, dtype=np.float64), np.array(labels)


### **Function for creating MFCC data**

In [None]:

def create_mfcc_data(df, dir_path, label_map, max_samples=5000):
    """
        - this function, will create a dataset of MFCC (Mel-frequency cepstral coefficients) features.
        For each row of the dataframe the audio file is loaded at 22050 Hz, 13 MFCCs are computed
        (n_fft=2048, hop_length=512) and the first 1280 frames are kept. The per-track arrays are
        stacked along a new first axis, coefficients first (the axes are not moved). Tracks that
        cannot be processed, or that do not yield exactly (13, 1280) values after truncation, are
        skipped without raising.
        - Params:
            df(type: pd.DataFrame): FMA dataframe with "track_id" and ("track", "genre_top") columns.
            dir_path(type: str): Path to the main directory, where it contains, the music files like (.wav, .mp3).
            label_map(type: Dict): label_map, which is nothing but a mapping of label to the integer value,
            max_samples(type: int or None): only the first max_samples rows of df are visited
                (default 5000, the cap that was previously hard-coded); None visits every row.
        - Return(type: (np.ndarray, np.array))
            this function, will return X with shape (n_tracks, 13, 1280) and the matching integer labels.
    """
    n_mfcc, n_frames = 13, 1280
    hop_length = 512
    n_fft = 2048
    n_rows = len(df) if max_samples is None else min(len(df), max_samples)

    # Collect per-track arrays in a list and build X once at the end; appending to a
    # NumPy array inside the loop copies the whole dataset on every iteration.
    features = []
    labels = []
    for i in tqdm.tqdm(range(n_rows)):
        try:
            track_id = df["track_id"].iloc[i]
            genre_type = df[("track", "genre_top")].iloc[i]

            filepath = get_audio_path(dir_path, track_id)
            signal, sample_rate = librosa.load(filepath, sr=22050)
            mfcc = librosa.feature.mfcc(y=signal, sr=sample_rate, n_fft=n_fft,
                                        hop_length=hop_length, n_mfcc=n_mfcc)
            mfcc = mfcc[:, :n_frames]
            if mfcc.shape != (n_mfcc, n_frames):
                # Clip too short to provide n_frames frames: skip it.
                continue
        except Exception:
            # Best-effort: unreadable or malformed tracks are skipped.
            continue

        features.append(mfcc)
        labels.append(label_map[genre_type])

    if not features:
        return np.empty((0, n_mfcc, n_frames)), np.array(labels)
    # float64 matches the dtype the previous np.empty/np.append implementation produced.
    return np.asarray(features, dtype=np.float64), np.array(labels)

### **Function for creating Chromagram data**

In [None]:
def create_chromagram_data(df, dir_path, label_map, max_samples=5000):
    """
        - this function, will create a dataset of Chromagram features. For each row of the dataframe
        the audio file is loaded at 22050 Hz, a chromagram with 13 bins is computed (n_fft=4096)
        and the first 1280 frames are kept. The per-track arrays are stacked along a new first
        axis. Tracks that cannot be processed, or that do not yield exactly (13, 1280) values
        after truncation, are skipped without raising.
        - Params:
            df(type: pd.DataFrame): FMA dataframe with "track_id" and ("track", "genre_top") columns.
            dir_path(type: str): Path to the main directory, where it contains, the music files like (.wav, .mp3).
            label_map(type: Dict): label_map, which is nothing but a mapping of label to the integer value,
            max_samples(type: int or None): only the first max_samples rows of df are visited
                (default 5000, the cap that was previously hard-coded); None visits every row.
        - Return(type: (np.ndarray, np.array))
            this function, will return X with shape (n_tracks, 13, 1280) and the matching integer labels.
    """
    n_chroma, n_frames = 13, 1280
    n_rows = len(df) if max_samples is None else min(len(df), max_samples)

    # Collect per-track arrays in a list and build X once at the end; appending to a
    # NumPy array inside the loop copies the whole dataset on every iteration.
    features = []
    labels = []
    for i in tqdm.tqdm(range(n_rows)):
        try:
            track_id = df["track_id"].iloc[i]
            genre_type = df[("track", "genre_top")].iloc[i]

            filepath = get_audio_path(dir_path, track_id)
            signal, sample_rate = librosa.load(filepath, sr=22050)
            chroma_stft = librosa.feature.chroma_stft(y=signal, sr=sample_rate,
                                                      n_chroma=n_chroma, n_fft=4096)
            chroma_stft = chroma_stft[:, :n_frames]
            if chroma_stft.shape != (n_chroma, n_frames):
                # Clip too short to provide n_frames frames: skip it.
                continue
        except Exception:
            # Best-effort: unreadable or malformed tracks are skipped.
            continue

        features.append(chroma_stft)
        labels.append(label_map[genre_type])

    if not features:
        return np.empty((0, n_chroma, n_frames)), np.array(labels)
    # float64 matches the dtype the previous np.empty/np.append implementation produced.
    return np.asarray(features, dtype=np.float64), np.array(labels)

### **Function for creating Ensemble data**

In [5]:

def padding(array, xx, yy):
    """
        this function, will zero-pad a 2D array so that it is centred inside a canvas of at least
        (xx, yy). When the missing amount along an axis is odd, the extra row/column goes after
        the data. An axis that is already as large as (or larger than) the requested size is left
        untouched - nothing is cropped.
        :param array: numpy array
        :param xx: desired height
        :param yy: desired width
        :return: padded array
    """
    def _margins(target, current):
        # Amount of padding to put before and after the data along one axis.
        before = max(0, (target - current) // 2)
        after = max(0, target - before - current)
        return before, after

    row_margins = _margins(xx, array.shape[0])
    col_margins = _margins(yy, array.shape[1])
    return np.pad(array, pad_width=(row_margins, col_margins), mode='constant')


def generate_features(y_cut, sr):
    """
        this function, will build a 3-plane "image" from one audio signal:
            - plane 0: padded rows of normalized spectral bandwidth, spectral centroid and
              chroma (bandwidth + centroid once, then bandwidth + centroid + 12 chroma rows
              repeated 9 times, 2 + 9 * 14 = 128 rows);
            - plane 1: magnitude of the STFT (n_fft=255, hop_length=512), padded to 128 rows;
            - plane 2: 128 MFCCs (n_fft=255, hop_length=512), padded to 128 rows.
        Every plane is padded to a width of 1291 frames with `padding`, which never crops.
        NOTE(review): a signal that yields more than 1291 frames will therefore produce
        mismatched planes - confirm that inputs are cut to length before calling.
        `normalize` is expected to come from the star import of sklearn.preprocessing.
        Params:
            y_cut(type: np.ndarray): audio samples.
            sr(type: int): sampling rate of y_cut.
        Return(type: np.ndarray)
            the three planes stacked along the last axis.
    """
    max_size = 1291  # my max audio file feature width
    stft = padding(np.abs(librosa.stft(y=y_cut, n_fft=255, hop_length=512)), 128, max_size)
    # sr is passed explicitly so the MFCCs use the same sampling rate as the other
    # features (it was omitted before, silently falling back to librosa's default).
    MFCCs = padding(librosa.feature.mfcc(y=y_cut, sr=sr, n_fft=255, hop_length=512, n_mfcc=128),
                    128, max_size)
    spec_centroid = librosa.feature.spectral_centroid(y=y_cut, sr=sr)
    chroma_stft = librosa.feature.chroma_stft(y=y_cut, sr=sr)
    spec_bw = librosa.feature.spectral_bandwidth(y=y_cut, sr=sr)

    # Now the padding part. Each padded block is computed once and reused below.
    bw_row = padding(normalize(spec_bw), 1, max_size).reshape(1, max_size)
    centroid_row = padding(normalize(spec_centroid), 1, max_size)
    chroma_rows = padding(normalize(chroma_stft), 12, max_size)

    # Repeat the padded spec_bw, spec_centroid and chroma_stft until they are stft- and MFCC-sized.
    blocks = [bw_row, centroid_row] + [bw_row, centroid_row, chroma_rows] * 9
    image = np.concatenate(blocks, axis=0)

    return np.dstack((image, stft, MFCCs))


## **Creating Training, Testing and Validation DataFrame from the FMA dataframe(subset: small)**

In [11]:
combined_df = get_combined_df()
train, val, test = get_processing_data(combined_df)

# np.unique returns the sorted distinct genre names; their positions become the integer labels.
genres = np.unique(train[("track", "genre_top")])
label_map, rev_label_map = create_label_map(genres)

print(f"Number training samples: {train.shape[0]}")
print(f"Number validation samples: {val.shape[0]}")
print(f"Number testing samples: {test.shape[0]}")

Number training samples: 6400
Number validation samples: 800
Number testomg samples: 800


In [None]:
from concurrent.futures import ProcessPoolExecutor

def use_multiprocessing():
    """
        this function, will compute the melspectrogram data of the train, val and test dataframes
        in parallel, one worker process per dataframe. It reads the notebook-level names
        train, val, test, FMA_SMALL_PATH and label_map.
        Params:(none)
        Return(type: List)
            the (X, y) results of create_melspectrogram_data for train, val and test, in that
            order. (Previously the results were computed and then discarded.)
    """
    splits = (train, val, test)
    with ProcessPoolExecutor() as executor:
        futures = [
            executor.submit(create_melspectrogram_data, split, FMA_SMALL_PATH, label_map)
            for split in splits
        ]
        # result() blocks until the worker finishes and re-raises any error it hit.
        return [future.result() for future in futures]

### **Creating a Mel-Spectrogram data**

In [None]:
# Mel-spectrogram features and labels for each split.
mel_train_X, mel_train_y = create_melspectrogram_data(
    df=train, dir_path=FMA_SMALL_PATH, label_map=label_map)
mel_val_X, mel_val_y = create_melspectrogram_data(
    df=val, dir_path=FMA_SMALL_PATH, label_map=label_map)
mel_test_X, mel_test_y = create_melspectrogram_data(
    df=test, dir_path=FMA_SMALL_PATH, label_map=label_map)

### **Creating MFCC data**

In [57]:
# MFCC features and labels for each split.
mfcc_train_X, mfcc_train_y = create_mfcc_data(
    df=train, dir_path=FMA_SMALL_PATH, label_map=label_map)
mfcc_test_X, mfcc_test_y = create_mfcc_data(
    df=test, dir_path=FMA_SMALL_PATH, label_map=label_map)
mfcc_val_X, mfcc_val_y = create_mfcc_data(
    df=val, dir_path=FMA_SMALL_PATH, label_map=label_map)

100%|██████████| 800/800 [05:31<00:00,  2.41it/s]
100%|██████████| 800/800 [05:40<00:00,  2.35it/s]


### **Creating Chromagram data**

In [12]:
# Chromagram features and labels for each split.
chroma_train_X, chroma_train_y = create_chromagram_data(
    df=train, dir_path=FMA_SMALL_PATH, label_map=label_map)
chroma_test_X, chroma_test_y = create_chromagram_data(
    df=test, dir_path=FMA_SMALL_PATH, label_map=label_map)
chroma_val_X, chroma_val_y = create_chromagram_data(
    df=val, dir_path=FMA_SMALL_PATH, label_map=label_map)

100%|██████████| 800/800 [06:50<00:00,  1.95it/s]
100%|██████████| 800/800 [06:35<00:00,  2.02it/s]


### **Creating Ensemble data**

In [None]:
# Ensemble features and labels for each split.
# NOTE(review): generate_ensemble_features is not defined in the cells visible here (only
# generate_features is) - confirm where it comes from, otherwise this cell raises NameError.
en_train_X, en_train_y = generate_ensemble_features(train, FMA_SMALL_PATH, label_map)
en_test_X, en_test_y = generate_ensemble_features(test, FMA_SMALL_PATH, label_map)
en_val_X, en_val_y = generate_ensemble_features(val, FMA_SMALL_PATH, label_map)

# **Converting the np arrays of the data into npz compressed files**

In [15]:
def create_compressed_File(data_X, data_y, X_type, type_of_data):
    """
        this function, will save a feature array and its labels into one compressed NumPy archive
        named "<X_type>-<type_of_data>" (np.savez_compressed adds the .npz extension), with the
        features stored under the key "X" and the labels under the key "y".
        Params:
            data_X(type: np.ndarray): X(independent data).
            data_y(type: np.array): y(dependent data).
            X_type(type: str): Which type of audio feature(eg: mfcc, mel-spectrogram).
            type_of_data(type: str): type of data (eg: train, test or val).
        Return(type: None)
    """
    archive_name = "{}-{}".format(X_type, type_of_data)
    np.savez_compressed(archive_name, X=data_X, y=data_y)

### **Saving MelSpectrogram data**

In [None]:
# Write one "melspectrogram-<split>" archive per split.
for _split, _features, _labels in (
    ('train', mel_train_X, mel_train_y),
    ('val', mel_val_X, mel_val_y),
    ('test', mel_test_X, mel_test_y),
):
    create_compressed_File(_features, _labels, "melspectrogram", _split)

### **Saving MFCC (Mel-frequency cepstral coefficients) data**

In [95]:
# Write one "mfcc-<split>" archive per split.
for _split, _features, _labels in (
    ('train', mfcc_train_X, mfcc_train_y),
    ('val', mfcc_val_X, mfcc_val_y),
    ('test', mfcc_test_X, mfcc_test_y),
):
    create_compressed_File(_features, _labels, "mfcc", _split)

### **Saving MFCC (Mel-frequency cepstral coefficients) No Moving Axis data**

In [58]:
# Write one "mfcc-nomoveaxis-<split>" archive per split.
# NOTE(review): these are the same mfcc_* variables saved under the "mfcc" prefix; on a
# top-to-bottom run both sets of files would hold identical arrays - confirm this is intended.
for _split, _features, _labels in (
    ('train', mfcc_train_X, mfcc_train_y),
    ('val', mfcc_val_X, mfcc_val_y),
    ('test', mfcc_test_X, mfcc_test_y),
):
    create_compressed_File(_features, _labels, "mfcc-nomoveaxis", _split)

### **Saving Chromagram data**

In [122]:
# Write one "chroma-<split>" archive per split.
for _split, _features, _labels in (
    ('train', chroma_train_X, chroma_train_y),
    ('test', chroma_test_X, chroma_test_y),
    ('val', chroma_val_X, chroma_val_y),
):
    create_compressed_File(_features, _labels, "chroma", _split)

### **Saving Ensemble Feature data**

In [16]:
# Write one "ensemble-<split>" archive per split.
for _split, _features, _labels in (
    ('train', en_train_X, en_train_y),
    ('test', en_test_X, en_test_y),
    ('val', en_val_X, en_val_y),
):
    create_compressed_File(_features, _labels, "ensemble", _split)