<h1>Music Genre Classification</h1>

<h2>configuration</h2>

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import sklearn
import matplotlib.pyplot as plt
%matplotlib inline

from time import time
import librosa
import librosa.display
import IPython.display as ipd

In [2]:
#CONFIG
src_path = './music-data/audio_data' # Input audio data root directory path
outfile_path = "data.csv" # Output path for extracted features

NUMBER_OF_MFCC = 20 # Mel coefficients
N_FFT = 2048  # Fourier Transform Settings
HOP_LENGTH = 512 # Hop length on audio

In [3]:
import os
genres = list(os.listdir(f'{src_path}'))
print(genres)

['pop', '.DS_Store', 'metal', 'disco', 'blues', 'reggae', 'classical', 'rock', 'hiphop', 'country', 'jazz']


<h2>Audio Exploration and Visualizations</h2>

In [None]:
y_sig, sr_viz = librosa.load(f'{src_path}/reggae/reggae.00036.wav')
tempo, beats = librosa.beat.beat_track(y=y_sig, sr=sr_viz)

In [None]:
#Load the audio file
y_sig, sr_viz = librosa.load(f'{src_path}/reggae/reggae.00036.wav')

print('y:', y_sig, '\n')
print('Datatype of y:', type(y_sig))
print('Datatype of sr:', type(sr_viz), '\n')
print('y shape:', np.shape(y_sig), '\n')
print('Sampling Rate (Hz):', sr_viz, '\n')

#Verify the length of the audio
audio_length_seconds = np.shape(y_sig)[0] / sr_viz
print('Checking the length of the audio:', audio_length_seconds)
ipd.Audio(f'{src_path}/reggae/reggae.00036.wav')

In [None]:
#Trimming any leading or trailing silences from the audio
yt, index = librosa.effects.trim(y_sig)

#Printing the durations
print('Durations before and after trimming:\n')
print('Before:', librosa.get_duration(y=y_sig, sr=sr_viz), 'After:', librosa.get_duration(y=yt, sr=sr_viz), '\n')

print('Audio File:', yt, '\n')
print('Audio File shape:', np.shape(yt))

In [None]:
#Creating a figure and axes for subplots
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(15, 20))

#Initializing row and column indices
i = 0
j = 0

#Iterate over each genre
for genre in genres:
    if genre.startswith('.'): #Skip hidden files like .DS_Store
        continue
        
    #Reading the first audio file
    path = f"{src_path}/{genre}/{genre}.00000.wav"
    y_sig, sampling_rate_viz = librosa.load(path)
    
    #Plotting the waveform
    librosa.display.waveshow(y=y_sig, sr=sampling_rate_viz, color="#A300F9", ax=axes[i][j])
    axes[i][j].set_title(genre)
    
    #Updating row and column indices for the next subplot
    if j == 1:
        i += 1
        j = 0
    else:
        j += 1

#Adjusting layout and display the plot
plt.suptitle("Waveform of Audio Files from Different Genres\n", fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
#Default FFT window size
vis_n_fft = 2048
vis_hop_length = 512

#Creating a figure and axes for subplots
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(15, 20))

#Initializing row and column indices
i = 0
j = 0

#Iterating over each genre
for genre in genres:
    if genre.startswith('.'): #Skip hidden files like .DS_Store
        continue
        
    #Reading the first audio file
    file = f"{src_path}/{genre}/{genre}.00000.wav"
    y_sig, sampling_rate_viz = librosa.load(file)
    
    #Computing the short-time Fourier transform (STFT)
    stft_data = np.abs(librosa.stft(y=y_sig, n_fft=vis_n_fft, hop_length=vis_hop_length))
    
    #Convert the amplitude spectrogram to Decibels-scaled spectrogram
    DB_viz = librosa.amplitude_to_db(stft_data, ref=np.max)

    #Display the spectrogram
    img = librosa.display.specshow(DB_viz, sr=sampling_rate_viz, hop_length=vis_hop_length, x_axis='time', y_axis='log', ax=axes[i][j])
    fig.colorbar(img, ax=axes[i][j], format="%+2.0f dB")

    axes[i][j].set_title(genre)

    #Update row and column indices for the next subplot
    if j == 1:
        i += 1
        j = 0
    else:
        j += 1

# Adjust layout and display the plot
plt.suptitle("Power Spectrograms of Audio Files from Different Genres\n", fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
#Creating a figure and axes for subplots
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(15, 20))

#Initializing row and column indices
i = 0
j = 0

#Iterate over each genre
for genre in genres:
    if genre.startswith('.'): #Skip hidden files like .DS_Store
        continue
        
    #Reading the first audio file
    path = f"{src_path}/{genre}/{genre}.00000.wav"
    y_sig, sampling_rate_viz = librosa.load(path)

    #Calculating the spectral centroids
    spectral_centroids = librosa.feature.spectral_centroid(y=y_sig, sr=sampling_rate_viz)[0]
    
    # Computing the time variable for visualization
    frames_viz = range(len(spectral_centroids))

    #Converts frame counts to time (seconds)
    ti = librosa.frames_to_time(frames_viz)
    
    #Plotting the waveform
    librosa.display.waveshow(y=y_sig, sr=sampling_rate_viz, alpha=0.4, color = 'purple',ax=axes[i][j])
    axes[i][j].plot(ti, sklearn.preprocessing.minmax_scale(spectral_centroids, axis=0), color='orange')
    axes[i][j].set_title(genre)

    # Adding x and y labels
    axes[i][j].set_xlabel('Time (s)')
    axes[i][j].set_ylabel('Normalized Spectral Centroids')
    
    #Updating row and column indices for the next subplot
    if j == 1:
        i += 1
        j = 0
    else:
        j += 1

#Adjusting layout and display the plot
plt.suptitle("Spectral Centroid along the waveform\n", fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
#Creating a figure and axes for subplots
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(15, 20))

#Initializing row and column indices
i = 0
j = 0

#Iterate over each genre
for genre in genres:
    if genre.startswith('.'): #Skip hidden files like .DS_Store
        continue
        
    #Reading the first audio file
    file = f"{src_path}/{genre}/{genre}.00000.wav"
    y_sig, sampling_rate_viz = librosa.load(file)

    #Calculating the spectral rolloffs
    spectral_rolloff_viz = librosa.feature.spectral_rolloff(y=y_sig, sr=sampling_rate_viz)[0]
    
    # Computing the time variable for visualization
    frames_viz = range(len(spectral_rolloff_viz))

    #Converts frame counts to time (seconds)
    ti = librosa.frames_to_time(frames_viz)
    
    #Plotting the waveform
    librosa.display.waveshow(y=y_sig, sr=sampling_rate_viz, alpha=0.4, color = 'purple',ax=axes[i][j])
    axes[i][j].plot(ti, sklearn.preprocessing.minmax_scale(spectral_rolloff_viz, axis=0), color='orange')
    axes[i][j].set_title(genre)

    # Adding x and y labels
    axes[i][j].set_xlabel('Time (s)')
    axes[i][j].set_ylabel('Normalized Spectral Rolloff')
    
    #Updating row and column indices for the next subplot
    if j == 1:
        i += 1
        j = 0
    else:
        j += 1

#Adjusting layout and display the plot
plt.suptitle("Spectral Rolloff along the waveform\n", fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
fig, axes= plt.subplots(nrows=5, ncols=2, figsize=(15, 20))
i = 0
j = 0
for genre in genres:
    if genre.startswith('.'): #Skip hidden files like .DS_Store
        continue
        
    # Reading the first audio file
    file = f"{src_path}/{genre}/{genre}.00000.wav"
    y_sig, sampling_rate_viz = librosa.load(file)
    
    zero_crossing_rate_viz = librosa.feature.zero_crossing_rate(y=y_sig, hop_length = vis_hop_length)[0]

    #Plot
    axes[i][j].plot(zero_crossing_rate_viz,color = "purple")
    axes[i][j].set_title(genre)

    # Adding x and y labels
    axes[i][j].set_xlabel("Sample Index")
    axes[i][j].set_ylabel("Amplitude")

    if(j == 1):
        i = i + 1
        j = 0
    else:
        j = j + 1

plt.suptitle("Signal Segment with Zero-Crossings\n", fontsize=16)
plt.tight_layout()
plt.show()

In [None]:
#Defining the start and end points for the segment
start_seg = 1000
end_seg = 1200

#Calculating the zero-crossing rate for reggae.00036.wav in the above defined segment
zero_cross_rate_viz = librosa.zero_crossings(yt[start_seg:end_seg], pad=False)

#Counting the number of zero crossings
num_zero_crossings_viz = sum(zero_cross_rate_viz)
print("The number of zero crossings is:", num_zero_crossings_viz)

<h2>Feature Extraction from audio data</h2>

In [4]:
import multiprocessing as mp

import dask
from dask import dataframe as dd
from dask import delayed, compute
from dask.system import CPU_COUNT
from dask.distributed import LocalCluster, Client  # For distributed schedulers
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



<h3> Feature Extraction Pipeline </h3>

In [12]:
# Configuration constants

path = src_path # Input audio data root directory path
outfile_path = "data.csv" # Output path for extracted features

NUMBER_OF_MFCC = 20
N_FFT = 2048
HOP_LENGTH = 512
BLOCK_LENGTH = 11
FRAME_LENGTH = 6616
HOP_LENGTH_STREAM = 6615

def extract_features(trial_audio_file_path: str) -> pd.DataFrame:
    #Extracting features for each chunk of the audio file
    # print("Started processing file")
    df0 = extract_features_per_chunk(trial_audio_file_path)
    # print("Ended processing file")
    #Concatenate dataframes
    df0 = pd.concat(df0)
    return df0

def extract_features_per_chunk(audio_file_path):
    # print("Started features per chunk")
    stream = librosa.stream(audio_file_path, block_length=BLOCK_LENGTH, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH_STREAM)
    processed_chunks = []
    for i, y_block in enumerate(stream):
        #Trimming starting and trailing silences
        y_trimmed, _ = librosa.effects.trim(y_block)
        #Extracting features from trimmed audio
        df = extract_feature_means(y_block, f'{audio_file_path}', 22050)
        df['label'] = audio_file_path.split("/")[-2]
        processed_chunks.append(df)
    # print("Ended features per chunk")
    return processed_chunks

def extract_feature_means(signal, audio_file_path, sr) -> pd.DataFrame:
    # print("Started feature means")
    # print(f"{audio_file_path}")
    n_fft = N_FFT
    hop_length = HOP_LENGTH
    # print("1")
    #Computing Short-time Fourier Transform (STFT) and spectrograms
    d_audio = np.abs(librosa.stft(signal, n_fft=n_fft, hop_length=hop_length))
    # print("2")
    db_audio = librosa.amplitude_to_db(d_audio, ref = np.max)
    # print("3")
    #Extracting harmonic and percussive components
    y_harm, y_perc = librosa.effects.hpss(signal)
    # print("4")
    #Computing spectral centroids
    spectral_centroids = librosa.feature.spectral_centroid(y=signal, sr=sr)[0]
    # print("5")
    spectral_centroids_delta = librosa.feature.delta(spectral_centroids)
    # print("6")
    spectral_centroids_accelerate = librosa.feature.delta(spectral_centroids, order=2)
    # print("7")
    #Computing chroma features
    chromagram = librosa.feature.chroma_stft(y=signal, sr=sr, hop_length=hop_length)
    # print("8")
    #Computing tempo (beats per minute) and spectral rolloff
    # tempo_y, _ = librosa.beat.beat_track(y=signal, sr=sr)
    # print("9")
    spectral_rolloff = librosa.feature.spectral_rolloff(y=signal, sr=sr)[0]
    # print("10")
    #Computing spectral flux and spectral bandwidth
    onset_env = librosa.onset.onset_strength(y=signal, sr=sr)
    # print("11")
    spectral_bandwidth_2 = librosa.feature.spectral_bandwidth(y=signal, sr=sr)[0]
    # print("12")
    spectral_bandwidth_3 = librosa.feature.spectral_bandwidth(y=signal, sr=sr, p=3)[0]
    # print("13")
    spectral_bandwidth_4 = librosa.feature.spectral_bandwidth(y=signal, sr=sr, p=4)[0]
    # print("14")
    #Computing the Mel Spectrograms
    s_audio = librosa.feature.melspectrogram(y=signal, sr=sr)
    # print("15")
    s_db_audio = librosa.amplitude_to_db(s_audio, ref=np.max)
    # print("16")
    #Creating a dictionary to store audio features
    audio_features = {
        "file_name": audio_file_path,
        "zero_crossing_rate_mean": np.mean(librosa.feature.zero_crossing_rate(y=signal)[0]),
        "zero_crossings_mean": np.sum(librosa.zero_crossings(y=signal, pad=False)),
        "spectrogram_mean": np.mean(db_audio[0]),
        "mel_spectrogram_mean": np.mean(s_db_audio[0]),
        "harmonics_mean": np.mean(y_harm),
        "perceptual_shock_wave_mean": np.mean(y_perc),
        "spectral_centroids_mean": np.mean(spectral_centroids),
        "spectral_centroids_delta_mean": np.mean(spectral_centroids_delta),
        "spectral_centroids_accelerate_mean": np.mean(spectral_centroids_accelerate),
        "chroma1_mean": np.mean(chromagram[0]),
        "chroma2_mean": np.mean(chromagram[1]),
        "chroma3_mean": np.mean(chromagram[2]),
        "chroma4_mean": np.mean(chromagram[3]),
        "chroma5_mean": np.mean(chromagram[4]),
        "chroma6_mean": np.mean(chromagram[5]),
        "chroma7_mean": np.mean(chromagram[6]),
        "chroma8_mean": np.mean(chromagram[7]),
        "chroma9_mean": np.mean(chromagram[8]),
        "chroma10_mean": np.mean(chromagram[9]),
        "chroma11_mean": np.mean(chromagram[10]),
        "chroma12_mean": np.mean(chromagram[11]),
        # "tempo_bpm": tempo_y,
        "spectral_rolloff_var": np.var(spectral_rolloff),
        "spectral_flux_var": np.var(onset_env),
        "spectral_bandwidth_2_var": np.var(spectral_bandwidth_2),
        "spectral_bandwidth_3_var": np.var(spectral_bandwidth_3),
        "spectral_bandwidth_4_var": np.var(spectral_bandwidth_4),
        "zero_crossing_rate_var": np.var(librosa.feature.zero_crossing_rate(y=signal)[0]),
        "zero_crossings_var": np.var(librosa.zero_crossings(y=signal, pad=False)),
        "spectrogram_var": np.var(db_audio[0]),
        "mel_spectrogram_var": np.var(s_db_audio[0]),
        "harmonics_var": np.var(y_harm),
        "perceptual_shock_wave_var": np.var(y_perc),
        "spectral_centroids_var": np.var(spectral_centroids),
        "spectral_centroids_delta_var": np.var(spectral_centroids_delta),
        "spectral_centroids_accelerate_var": np.var(spectral_centroids_accelerate),
        "chroma1_var": np.var(chromagram[0]),
        "chroma2_var": np.var(chromagram[1]),
        "chroma3_var": np.var(chromagram[2]),
        "chroma4_var": np.var(chromagram[3]),
        "chroma5_var": np.var(chromagram[4]),
        "chroma6_var": np.var(chromagram[5]),
        "chroma7_var": np.var(chromagram[6]),
        "chroma8_var": np.var(chromagram[7]),
        "chroma9_var": np.var(chromagram[8]),
        "chroma10_var": np.var(chromagram[9]),
        "chroma11_var": np.var(chromagram[10]),
        "chroma12_var": np.var(chromagram[11]),
        "spectral_rolloff_var": np.var(spectral_rolloff),
        "spectral_flux_var": np.var(onset_env),
        "spectral_bandwidth_2_var": np.var(spectral_bandwidth_2),
        "spectral_bandwidth_3_var": np.var(spectral_bandwidth_3),
        "spectral_bandwidth_4_var": np.var(spectral_bandwidth_4),
    }
    #Extracting MFCC features
    mfcc_df = extract_mfcc_feature_means(audio_file_path, signal, sample_rate=sr, number_of_mfcc=NUMBER_OF_MFCC)

    #Combining audio features and MFCC features into a single dataframe
    df = pd.DataFrame.from_records(data=[audio_features])
    df = pd.merge(df, mfcc_df, on='file_name')
    # print("Ended feature means")
    return df

def extract_mfcc_feature_means(audio_file_name: str, signal: np.ndarray, sample_rate: int, number_of_mfcc: int) -> pd.DataFrame:
    # print("Started mfcc feature means")
    #Computing MFCC features
    mfcc_alt = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=number_of_mfcc)
    delta = librosa.feature.delta(mfcc_alt)
    accelerate = librosa.feature.delta(mfcc_alt, order=2)

    #Creating a dictionary to store MFCC features
    mfcc_features = {"file_name": audio_file_name}
    for i in range(0, number_of_mfcc):
        mfcc_features[f'mfcc{i}_mean'] = np.mean(mfcc_alt[i])
        mfcc_features[f'mfcc_delta_{i}_mean'] = np.mean(delta[i])
        mfcc_features[f'mfcc_accelerate_{i}_mean'] = np.mean(accelerate[i])
    # print("Ended mfcc feature means")
    return pd.DataFrame.from_records(data=[mfcc_features])

In [13]:
def list_files_recursively(directory):
    file_paths = []
    for root, directories, files in os.walk(directory):
        for filename in files:
            if filename.startswith('.'): #Skip hidden files like .DS_Store
                continue
            file_path = os.path.join(root, filename)
            file_paths.append(file_path)
    return file_paths

files = list_files_recursively(path)

<h3> Estimating Feature Extraction Times </h3>

In [14]:
SUBSET_SIZE = 50

In [None]:
# Serial code
start = time()
with mp.Pool(1) as p:
    for i in files[:SUBSET_SIZE]:
        p.apply(extract_features, (i,))
serial_extraction_time = time()-start
print(f'took {time()-start} seconds')

<h4>Mutiprocessing module</h4>

In [None]:
start = time()
with mp.Pool(2) as p:
    for i in files[:SUBSET_SIZE]:
        p.apply(extract_features, (i,))
print(f'took {time()-start} seconds')

In [None]:
start = time()
with mp.Pool(2) as p:
    for i in files[:SUBSET_SIZE]:
        res = p.apply_async(extract_features, (i,))
        res.get()
print(f'took {time()-start} seconds')

In [None]:
start = time()
with mp.Pool(2) as p:
    res = p.map(extract_features, files[:SUBSET_SIZE])
print(f'took {time()-start} seconds')

In [None]:
start = time()
with mp.Pool(2) as p:
    res = p.map_async(extract_features, files[:SUBSET_SIZE])
    res.get()
print(f'took {time()-start} seconds')

In [None]:
start = time()
jobs = []
for i in files[:SUBSET_SIZE]:
    p1 = mp.Process(target=extract_features, args=(i,))
    p1.start()
    jobs.append(p1)
for job in jobs:
    job.join()
p1.close()
print(f'took {time()-start} seconds')

<h4>Parallel with Dask</h4>

In [15]:
#Setting the number of workers
n_workers = min(4, CPU_COUNT)

def try_scheduler(scheduler_name, files):
    print(f"Scheduler: {scheduler_name}")
    start = time()

    if scheduler_name == "distributed":
        #Using Local Cluster for distributed scheduler
        # cluster = LocalCluster(n_workers=n_workers)  
        client = Client(n_workers=5, threads_per_worker=2)
    else:
        dask.config.set(scheduler=scheduler_name, num_workers=n_workers)  # Set the scheduler for threads or processes or synchronous
    
    res = []
    for file_path in files:
        f = delayed(extract_features)(file_path)
        res.append(f)
        
    extracted_features = delayed(pd.concat)(res)
    df = compute(extracted_features)

    if scheduler_name == "distributed":
        cluster.close()  # Close the local cluster

    duration = time() - start
    print(f"Extracted features in {duration} seconds\n")
    return duration

In [None]:
#Comparison between different dask schedulers to see which is best with available Dask schedulers
schedulers = ["synchronous", "threads", "processes", "distributed"]
execution_times = []
for scheduler in schedulers:
    execution_time = try_scheduler(scheduler, files[:SUBSET_SIZE])
    execution_times.append(execution_time)

Scheduler: synchronous


In [None]:
#Calculating the speedup achieved by using parallel on feature extraction
parallel_extraction_time = min(execution_times)
speedup_feature_extraction = serial_extraction_time / parallel_extraction_time

print("Speedup achieved for feature extraction: ", round(speedup_feature_extraction))

#Calculating the efficiency achieved by using parallel on feature extraction
efficiency_feature_extraction = speedup_feature_extraction / n_workers

print("Efficiency achieved for feature extraction: ", round(efficiency_feature_extraction))

In [None]:
#Plotting the execution time comparison for each scheduler
plt.figure(figsize=(10, 6))

bars = plt.bar(schedulers, execution_times, color='skyblue')
plt.xlabel('Schedulers')
plt.ylabel('Time (seconds)')
plt.title('Extraction Time comparison of Dask Schedulers for 50 audio files')

#Displaying time on top of each bar
for bar, time_value in zip(bars, execution_times):
    plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height(), f'{time_value:.2f}', 
             ha='center', va='bottom')

plt.show()

In [16]:
#Parallel feature extraction using the Dask Distributed Scheduler
def extract_features_distributed(files, n_workers):
    # cluster = LocalCluster()
    client = Client(n_workers=5, threads_per_worker=2)
    
    res = []
    for file_path in files:
        f = delayed(extract_features)(file_path)
        res.append(f)

    extracted_features = delayed(pd.concat)(res)
    with Profiler() as prof, ResourceProfiler() as rprof, CacheProfiler() as cprof:
        df = compute(extracted_features)
    visualize([prof, rprof, cprof])
    
    client.close()
    # cluster.close()
    
    return df

In [None]:
#Range of CPUs to test
num_workers = [7, 10, 14, 20, 28]

#List to store execution times
execution_times = []

#Extracting features for each configuration and recording execution time
for n_workers in num_workers:
    start = time()
    df = extract_features_distributed(files, n_workers)
    duration = time() - start
    execution_times.append(duration)
    print(f"FEATURE EXTRACTION: Number of CPUs: {n_workers}\tTime taken: {duration:.2f} seconds")

In [17]:
#FINAL
# try_scheduler("distributed", files[:SUBSET_SIZE])
# for file_path in files:
#     f = extract_features(file_path)
#     res.append(f)
#     break
df = extract_features_distributed(files, 4)

Logs for tcp://127.0.0.1:50052:

(('INFO', '2024-10-24 16:40:41,830 - distributed.worker - INFO - -------------------------------------------------'), ('INFO', '2024-10-24 16:40:41,830 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:50039'), ('INFO', '2024-10-24 16:40:41,830 - distributed.worker - INFO - Starting Worker plugin shuffle'), ('INFO', '2024-10-24 16:40:41,647 - distributed.worker - INFO - -------------------------------------------------'), ('INFO', '2024-10-24 16:40:41,647 - distributed.worker - INFO -       Local Directory: /var/folders/s0/my61_z_12tn1l1j8crj94kmw0000gn/T/dask-scratch-space/worker-0daxmw8m'), ('INFO', '2024-10-24 16:40:41,647 - distributed.worker - INFO -                Memory:                   3.20 GiB'), ('INFO', '2024-10-24 16:40:41,647 - distributed.worker - INFO -               Threads:                          2'), ('INFO', '2024-10-24 16:40:41,647 - distributed.worker - INFO - ---------------------------------------------

  return pitch_tuning(
  return pitch_tuning(
  return pitch_tuning(


In [None]:
# Plotting execution times as a bar chart with equal spacing
plt.figure(figsize=(10, 6))
bar_width = 0.5  # Adjust the width of the bars as needed
x_ticks = np.arange(len(num_workers))  # Position of the ticks

bars = plt.bar(x_ticks, execution_times, color='skyblue', width=bar_width)
plt.xlabel('Number of CPUs')
plt.ylabel('Execution Time (seconds)')
plt.title('Feature Extraction Times by Number of CPUs (Lower is better)')
plt.xticks(x_ticks, num_workers)

# Displaying time on top of each bar
for bar, time_value in zip(bars, execution_times):
    plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height(), f'{time_value:.2f}', 
             ha='center', va='bottom')

plt.show()

In [None]:
res = []
for file_path in files[:10]:
    f = delayed(extract_features)(file_path)
    res.append(f)

extracted_features = delayed(pd.concat)(res)
extracted_features.visualize()

<h3>Dask Task Graph</h3>

In [None]:
# Save as csv file
data = df[0]
data.to_csv(outfile_path, index=False)

<h2>Exploratory Data Analysis and Data Preprocessing</h2>

In [None]:
from sklearn.preprocessing import MinMaxScaler, LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split

In [None]:
data = pd.read_csv(outfile_path)
data.head()

In [None]:
data.info()

In [None]:
data.describe()

In [None]:
data.shape

In [None]:
data.dtypes

In [None]:
# Display count of data rows for each genre
genre_counts = data['label'].value_counts()
print(genre_counts)

In [None]:
#checking for missing value
data.isna().sum()

In [None]:
#Computing the correlation matrix between the features of the dataset
correlation_matrix = data.iloc[:, 1:-1].corr()

#Plot
plt.figure(figsize=(16, 11))
color_palette = sns.color_palette("viridis", as_cmap=True)

#Drawing the heatmap
sns.heatmap(correlation_matrix, cmap=color_palette, mask=np.triu(correlation_matrix), vmax=0.3, center=0, square=True, linewidths=0.5, cbar_kws={"shrink":0.5})

plt.title('Feature Correlation Heatmap', fontsize=25)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.savefig("Feature_Correlation_Heatmap.jpg")
plt.show()

In [None]:
#Splitting data into features(X) and target (y)
target_column = 'label'
features = data.drop(columns=[target_column, 'file_name'])
target = data[target_column]

# Standardizing features to have a mean of 0 and variance of 1
scaler = StandardScaler()
features_normalized_df = scaler.fit_transform(features.astype(float))

#Updating variables for clarity
X = features_normalized_df
y = target

In [None]:
#Encoding target labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

In [None]:
X

In [None]:
#70-30 split
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.3, random_state=42, stratify=y)

In [None]:
len(y_train)

In [None]:
len(y_test)

<h2>Training and Classifying</h2>

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from xgboost import XGBClassifier, XGBRFClassifier
from xgboost import plot_tree, plot_importance

from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score, roc_curve, classification_report
from sklearn.feature_selection import RFE

import xgboost.dask as bst
import xgboost as xgb

In [None]:
def evaluate_model(model, model_name = "", cr = False):
    model.fit(X_train, y_train)
    y_hat = model.predict(X_test)
    print('Accuracy', model_name, ':', round(accuracy_score(y_test, y_hat), 5), '\n')
    if cr:
        target_names = sorted(set(label_encoder.inverse_transform(y_encoded)))
        print(classification_report(y_test, y_hat, target_names=target_names))

In [None]:
knn = KNeighborsClassifier(n_neighbors=19)
evaluate_model(knn, "KNN")

rforest = RandomForestClassifier(n_estimators=1000, max_depth=10, random_state=0)
evaluate_model(rforest, "Random Forest")

svm = SVC(decision_function_shape="ovo")
evaluate_model(svm, "SVM")

xgb = XGBClassifier(n_estimators=1000)
evaluate_model(xgb, "XGBoost Classifier")

In [None]:
model = XGBClassifier(n_estimators=1000)
model.fit(X_train, y_train, eval_metric='merror')

In [None]:
from sklearn.metrics import classification_report

#Predictions on the training set
train_predictions = model.predict(X_train)
print("Training set predictions:", train_predictions)

#Predictions on the testing set
test_predictions = model.predict(X_test)
print("Testing set predictions:", test_predictions)

target_names = sorted(set(label_encoder.inverse_transform(y_encoded)))

#Training accuracy
training_accuracy = accuracy_score(y_train, train_predictions)
print(f'Training accuracy: {training_accuracy}')

#Classification report for the training set
print("Training Classification Report:")
train_classification_report = classification_report(y_train, train_predictions, target_names=target_names)
print(train_classification_report)

#Testing accuracy
testing_accuracy = accuracy_score(y_test, test_predictions)
print(f'Testing accuracy: {testing_accuracy}')

#Classification report for the testing set
print("Testing Classification Report:")
test_classification_report = classification_report(y_test, test_predictions, target_names=target_names)
print(test_classification_report)

In [None]:
# Compute the confusion matrix
conf_matrix = confusion_matrix(y_test, test_predictions)

# Setting up the plot figure
plt.figure(figsize=(16, 9))

# Plotting the heatmap
plt_labels = sorted(set(y))
sns.heatmap(conf_matrix, cmap="BuPu", annot=True, xticklabels=plt_labels, yticklabels=plt_labels)

# Show plot
plt.show()

<h2>Testing parallelization for model training</h2>

In [None]:
from joblib import parallel_backend
from dask.distributed import Client
from dask import array as da

In [None]:
# serial execution time for baseline
start = time()
model = XGBClassifier(n_estimators=1000)
evaluate_model(model)
serial_execution_time = time()-start
print(f'serial XGBoost on took {serial_execution_time} seconds')

<h3>Try Dask for parallelization</h3>

In [None]:
# dict to hold execution times with number of cpu as key
dask_parallel_times = {}
# create dask array with appropriate chunk size
da_X_train, da_y_train = da.from_array(X_train, chunks=(1000,108)), da.from_array(y_train, chunks=(1000,))
da_X_test, da_y_test = da.from_array(X_test, chunks=(100,108)), da.from_array(y_test, chunks=(100))
da_X_train

In [None]:
start = time()
with Client(n_workers=2) as client:
    model = XGBClassifier(n_estimators=1000)
    model.client = client
    model.fit(da_X_train, da_y_train)
duration = time()-start
dask_parallel_times[2] = duration
print(f'Dask XGBoost on 2 CPUs took {duration} seconds')

In [None]:
start = time()
with Client(n_workers=4) as client:
    model = XGBClassifier(n_estimators=1000)
    model.client = client
    model.fit(da_X_train, da_y_train)
duration = time()-start
dask_parallel_times[4] = duration
print(f'Dask XGBoost on 4 CPUs took {duration} seconds')

In [None]:
start = time()
with Client(n_workers=7) as client:
    model = XGBClassifier(n_estimators=1000)
    model.client = client
    model.fit(da_X_train, da_y_train)
duration = time()-start
dask_parallel_times[7] = duration
print(f'Dask XGBoost on 7 CPUs took {duration} seconds')

In [None]:
start = time()
with Client(n_workers=14) as client:
    model = XGBClassifier(n_estimators=1000)
    model.client = client
    model.fit(da_X_train, da_y_train)
duration = time()-start
dask_parallel_times[14] = duration
print(f'Dask XGBoost on 14 CPUs took {duration} seconds')

In [None]:
start = time()
with Client(n_workers=21) as client:
    model = XGBClassifier(n_estimators=1000)
    model.client = client
    model.fit(da_X_train, da_y_train)
duration = time()-start
dask_parallel_times[21] = duration
print(f'Dask XGBoost on 21 CPUs took {duration} seconds')

In [None]:
start = time()
with Client(n_workers=28) as client:
    model = XGBClassifier(n_estimators=1000)
    model.client = client
    model.fit(da_X_train, da_y_train)
duration = time()-start
dask_parallel_times[28] = duration
print(f'Dask XGBoost on 28 CPUs took {duration} seconds')

<h3>Try Joblib for parallelization</h3>

In [None]:
joblib_parallel_times = {}

In [None]:
start = time()
with parallel_backend('threading', n_jobs=2):
    model = XGBClassifier(n_estimators=1000)
    model.fit(X_train, y_train)
duration = time()-start
joblib_parallel_times[2] = duration
print(f'joblib XGBoost on 2 CPUs took {duration} seconds')

In [None]:
start = time()
with parallel_backend('threading', n_jobs=4):
    model = XGBClassifier(n_estimators=1000)
    model.fit(X_train, y_train)
duration = time()-start
joblib_parallel_times[4] = duration
print(f'joblib XGBoost on 4 CPUs took {duration} seconds')

In [None]:
start = time()
with parallel_backend('threading', n_jobs=7):
    model = XGBClassifier(n_estimators=1000)
    model.fit(X_train, y_train)
duration = time()-start
joblib_parallel_times[7] = duration
print(f'joblib XGBoost on 7 CPUs took {duration} seconds')

In [None]:
start = time()
with parallel_backend('threading', n_jobs=14):
    model = XGBClassifier(n_estimators=1000)
    model.fit(X_train, y_train)
duration = time()-start
joblib_parallel_times[14] = duration
print(f'joblib XGBoost on 14 CPUs took {duration} seconds')

In [None]:
start = time()
with parallel_backend('threading', n_jobs=21):
    model = XGBClassifier(n_estimators=1000)
    model.fit(X_train, y_train)
duration = time()-start
joblib_parallel_times[21] = duration
print(f'joblib XGBoost on 21 CPUs took {duration} seconds')

In [None]:
start = time()
with parallel_backend('threading', n_jobs=28):
    model = XGBClassifier(n_estimators=1000)
    model.fit(X_train, y_train)
duration = time()-start
joblib_parallel_times[28] = duration
print(f'joblib XGBoost on 28 CPUs took {duration} seconds')

In [None]:
#finding the min times among dask and joblib
fastest_dask = min(dask_parallel_times, key=dask_parallel_times.get)
fastest_joblib = min(joblib_parallel_times, key=joblib_parallel_times.get)
fastest_parallel = min(dask_parallel_times.get(fastest_dask), dask_parallel_times.get(fastest_joblib))
fastest_cpu = -1
if fastest_parallel == dask_parallel_times.get(fastest_dask):
    fastest_cpu = fastest_dask
else:
    fastest_cpu = fastest_joblib

In [None]:
#Calculating the speedup achieved by using parallel on xgboost
speedup = {}
speedup_xgboost = serial_execution_time / fastest_parallel

print("Speedup achieved for xgboost training: ", round(speedup_xgboost,2),"X")

# #Calculating the efficiency achieved by using parallel on feature extraction
efficiency_xgboost = speedup_xgboost / fastest_cpu

print("Efficiency achieved for xgboost training: ", round(efficiency_xgboost,2), "%")

In [None]:
execution_times = []
cpus = []
for key in joblib_parallel_times:
    cpus.append(key)
    execution_times.append(joblib_parallel_times.get(key))

In [None]:
print(dask_parallel_times)
print(joblib_parallel_times)

In [None]:
num_cpus = joblib_parallel_times.keys()
parallel_techniques = {
    'Joblib': joblib_parallel_times.values(),
    'Dask': dask_parallel_times.values(),
}

x = np.arange(len(num_cpus))  # the label locations
width = 0.25  # the width of the bars
multiplier = 0

fig, ax = plt.subplots(layout='constrained')

for attribute, exec_time in parallel_techniques.items():
    offset = width * multiplier
    rects = ax.bar(x + offset, exec_time, width, label=attribute)
    multiplier += 1

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_xlabel('Number of CPUs')
ax.set_ylabel('Execution Time (seconds)')
ax.set_title('Execution Time Comparison: Joblib vs Dask')
ax.set_xticks(x + width, num_cpus)
ax.legend(loc='upper left', ncols=2)

plt.show()

<h2>Tuning XGBoost Model</h2>

In [None]:
from dask_ml.model_selection import GridSearchCV
from joblib import parallel_backend

In [None]:
X_train = pd.DataFrame(X_train)
y_train = pd.Series(y_train)

dd_X_train = dd.from_pandas(X_train, npartitions=4)
dd_y_train = dd.from_pandas(y_train, npartitions=4)

In [None]:
xgb = XGBClassifier(n_estimators=1000)

In [None]:
param_grid = {
    'learning_rate': [0.05, 0.1, 0.2],
}

In [None]:
grid_search = GridSearchCV(xgb, param_grid, cv=3)
with Client(n_workers=16) as client:
    grid_search.fit(dd_X_train, dd_y_train)

In [None]:
model = XGBClassifier(n_estimators=1000, learning_rate=0.05)
evaluate_model(model)