In [1]:
!pip install Pydub
!pip install PyWavelets
# !pip install openl3
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import pywt
import pandas as pd
import os
from sklearn.preprocessing import StandardScaler, LabelEncoder
import audioread
from pydub import AudioSegment
import tensorflow as tf
#from tensorflow.keras import layers, models
#import openl3
import audioread
import tensorflow_hub as hub
from keras import layers, models, preprocessing
import keras.backend as K
from keras.preprocessing.sequence import pad_sequences
from keras import mixed_precision
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping
import math
import random
import matplotlib.pyplot as plt



In [2]:
# Colab-only: mount Google Drive at /content/drive so the dataset paths used
# by the following cells (all under /content/drive/MyDrive) can be read.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Locate the GTZAN dataset on the mounted Drive and read the pre-computed
# feature tables (one row per 30-second clip / per 3-second segment).
# Local alternative for the CSVs: 'data/GTZAN/features_30_sec.csv' and
# 'data/GTZAN/features_3_sec.csv'.
gtzan_root = '/content/drive/MyDrive/Colab Notebooks/GTZAN'

file_30_sec = f'{gtzan_root}/features_30_sec.csv'
file_3_sec = f'{gtzan_root}/features_3_sec.csv'
audio_directory = f'{gtzan_root}/genres_original'

df_30_sec, df_3_sec = (pd.read_csv(csv_path) for csv_path in (file_30_sec, file_3_sec))

In [4]:
# Load VGGish model from TensorFlow Hub.
# Bound at module level because extract_vggish_features() below reads the
# global `vggish_model`; this cell must run before any feature extraction.
vggish_model = hub.load('https://tfhub.dev/google/vggish/1')

# Function to extract VGGish features with correct preprocessing
def extract_vggish_features(y, sr, file_path):
    """Run the global VGGish model on a waveform and return a flat feature vector.

    Parameters
    ----------
    y : array-like
        Audio samples at sample rate `sr`.
    sr : int
        Sample rate of `y`; the audio is resampled from this rate to 16 kHz.
    file_path : str
        Only used to identify the file in the error message.

    Returns
    -------
    The flattened model output, or None if any step raised (the error is
    printed rather than propagated, so extraction is best-effort).
    """
    try:
        # VGGish is fed a 1-D float32 waveform at 16 kHz, without a batch axis.
        audio_16k = np.array(
            librosa.resample(y, orig_sr=sr, target_sr=16000),
            dtype=np.float32,
        )
        embeddings = vggish_model(tf.convert_to_tensor(audio_16k, dtype=tf.float32))

        # The hub model may hand back a tf.Tensor; downstream code wants NumPy.
        if isinstance(embeddings, tf.Tensor):
            embeddings = embeddings.numpy()

        # One flat vector per file for use in ML models.
        return embeddings.flatten()
    except Exception as e:
        print(f"Error extracting VGGish features from {file_path}: {e}")
        return None

def extract_features(file_path, sr=22050, n_mels=128, wavelet='db1'):
    """Load one audio file and compute its per-file feature arrays.

    Parameters
    ----------
    file_path : str
        Path of the audio file to load.
    sr : int
        Sample rate requested from ``librosa.load``.
    n_mels : int
        Number of Mel bands for the Mel spectrogram.
    wavelet : str
        Wavelet name passed to ``pywt.wavedec`` (decomposition level is 5).

    Returns
    -------
    dict
        Always has the keys 'mel_spectrogram', 'wavelet', 'vggish' and
        'waveform'. 'vggish' is an empty array when VGGish extraction failed.
        If loading or any other step raises, the error is printed and every
        key maps to an empty array, so callers can detect a failed file by
        checking that all values are empty.
    """
    try:
        # Load audio file and make sure the samples are float32.
        y, sr = librosa.load(file_path, sr=sr)
        y = np.array(y, dtype=np.float32)

        # Mel spectrogram in dB (relative to its own maximum), flattened.
        mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)
        mel_spectrogram_db = librosa.power_to_db(mel_spectrogram, ref=np.max)
        mel_spectrogram_flattened = mel_spectrogram_db.flatten()

        # Multi-level wavelet decomposition, all coefficient bands concatenated.
        coeffs = pywt.wavedec(y, wavelet, level=5)
        wavelet_features = np.concatenate([np.array(c).flatten() for c in coeffs])

        # VGGish features (None when the helper failed).
        vggish_features_flattened = extract_vggish_features(y, sr, file_path)

        # Raw waveform. When the length is not a whole number of seconds, one
        # extra second of silence is appended. The padding is float32 so the
        # waveform keeps its dtype instead of being upcast to float64.
        # NOTE(review): this appends a full `sr` samples rather than padding up
        # to the next multiple of `sr` - confirm which was intended.
        waveform = y
        if waveform.shape[0] % sr != 0:
            waveform = np.concatenate([waveform, np.zeros(sr, dtype=np.float32)])

        # Combine all features independently into a dictionary (without chroma).
        features = {
            'mel_spectrogram': mel_spectrogram_flattened,
            'wavelet': wavelet_features,
            'vggish': vggish_features_flattened if vggish_features_flattened is not None else np.array([]),
            'waveform': waveform
        }
    except Exception as e:
        # Include the exception type: some decoder errors have an empty message.
        print(f"Error processing audio file {file_path}: {type(e).__name__}: {e}")
        # Same keys as the success path so callers never hit a missing key.
        features = {
            'mel_spectrogram': np.array([]),
            'wavelet': np.array([]),
            'vggish': np.array([]),
            'waveform': np.array([])
        }

    return features

# Function to process the entire dataset and extract features
# and combine them with existing CSV features
def process_dataset(df, audio_directory):
    """Build per-file feature lists for every row of `df`.

    For each row the audio file at ``audio_directory/<label>/<filename>`` is
    passed to ``extract_features``; its outputs are stored next to the row's
    pre-computed CSV columns, grouped by column-name substring.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide 'label' and 'filename' columns plus the CSV feature columns.
    audio_directory : str
        Root directory holding one sub-directory per label.

    Returns
    -------
    (features, labels)
        `features` maps each feature name to an object-dtype NumPy array with
        one entry per kept row (no padding or truncation); `labels` is a NumPy
        array of the kept rows' labels. Rows whose audio features are all empty
        are reported and left out of both.
    """
    audio_keys = ('mel_spectrogram', 'wavelet', 'vggish', 'waveform')
    csv_keys = (
        'mfcc',
        'chroma',
        'rms',
        'spectral_centroid',
        'spectral_bandwidth',
        'zero_crossing_rate',
        'tempo',
    )
    collected = {name: [] for name in audio_keys + csv_keys}
    kept_labels = []

    for row_index, record in df.iterrows():
        audio_path = os.path.join(audio_directory, record['label'], record['filename'])
        audio_features = extract_features(audio_path)

        # A file that produced nothing at all is treated as unreadable.
        if not any(len(values) for values in audio_features.values()):
            print(f"Skipping row {row_index} due to audio extraction issues.")
            continue

        # CSV columns are selected by substring match on the column name.
        csv_features = {
            name: record.filter(like=name).values.astype(np.float32)
            for name in csv_keys
        }

        for name, values in csv_features.items():
            collected[name].append(values)
        for name, values in audio_features.items():
            collected[name].append(values)

        kept_labels.append(record['label'])

    # Object arrays keep per-file vectors of different lengths side by side.
    packed = {name: np.array(values, dtype=object) for name, values in collected.items()}

    return packed, np.array(kept_labels)

# Function to standardize features (without padding or truncation)
def standardize_features(features):
    """Standardize every sample of every feature independently.

    Each individual sample vector is scaled with its own ``StandardScaler``
    (fitted on that vector alone, reshaped to a single column). No padding or
    truncation is applied, so vectors keep their original lengths.

    Parameters
    ----------
    features : dict
        Maps a feature name to a sequence of 1-D arrays (one per sample).

    Returns
    -------
    (standardized_features, scalers)
        `standardized_features` maps each name to an object-dtype NumPy array
        of the scaled vectors. `scalers` maps each name to a list holding the
        fitted scaler for each sample, in order. For an empty sample (for
        example a file whose VGGish extraction failed) the scaled vector is
        empty and the scaler entry is ``None``.
    """
    standardized_features = {}
    scalers = {}

    for key in features:
        scaled_samples = []
        key_scalers = []
        for feature in features[key]:
            column = np.asarray(feature).reshape(-1, 1)  # 2D column for StandardScaler

            if column.shape[0] == 0:
                # StandardScaler refuses zero-sample input; pass empties through
                # so one failed extraction does not abort the whole dataset.
                scaled_samples.append(np.array([], dtype=np.float64))
                key_scalers.append(None)
                continue

            scaler = StandardScaler()
            scaled_samples.append(scaler.fit_transform(column).flatten())
            key_scalers.append(scaler)  # kept so the transform can be reused or inverted

        # Object array so vectors of different lengths can sit side by side.
        standardized_features[key] = np.array(scaled_samples, dtype=object)
        scalers[key] = key_scalers

    return standardized_features, scalers

In [None]:
# Extract features for every 30-second clip (slow: reads each audio file).
features, labels = process_dataset(df_30_sec, audio_directory)

# Per-sample standardization of every feature.
# NOTE(review): the splits below are built from the raw `features`, not from
# `standardized_features` - confirm whether the standardized values were meant
# to be used here.
standardized_features, scalers = standardize_features(features)

# Pair each sample's feature dict with its label so features and labels are
# always split together.
combined_data = [
    ({key: features[key][i] for key in features}, labels[i])
    for i in range(len(labels))
]

# 80/20 train+val/test split, then 75/25 train/val (0.25 * 0.8 = 0.2 of the
# full set for validation); both stratified by label.
train_data, test_data = train_test_split(
    combined_data, test_size=0.2, random_state=42, stratify=labels
)
train_data, val_data = train_test_split(
    train_data,
    test_size=0.25,
    random_state=42,
    stratify=[label for _, label in train_data],
)

# Regroup each split as {feature name: list of per-sample arrays}.
train_features, val_features, test_features = (
    {key: [sample[key] for sample, _ in split] for key in features}
    for split in (train_data, val_data, test_data)
)

train_labels = np.array([label for _, label in train_data])
val_labels = np.array([label for _, label in val_data])
test_labels = np.array([label for _, label in test_data])


  y, sr = librosa.load(file_path, sr=sr)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Error processing audio file /content/drive/MyDrive/Colab Notebooks/GTZAN/genres_original/jazz/jazz.00054.wav: 
Skipping row 554 due to audio extraction issues.


In [None]:
# Fix every source of randomness used by the notebook to one seed.
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)
os.environ['TF_DETERMINISTIC_OPS'] = '1'
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Encode labels as integers: the encoder is fitted on the full label array and
# then applied to each split, so all splits share one class-to-index mapping.
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)
train_labels, val_labels, test_labels = (
    label_encoder.transform(split_labels)
    for split_labels in (train_labels, val_labels, test_labels)
)

In [None]:
# Display the training split: a list of (feature dict, label) pairs.
# NOTE(review): this renders the whole list, including every sample's feature
# arrays - consider showing only its length or a single element.
train_data

In [None]:
# VGG-ish Model Implementation
# Loads a VGGish model from the hub and runs it on the training waveforms.
import tensorflow_hub as hub
import numpy as np

# Genre names in alphabetical order.
# NOTE(review): not referenced by any other cell visible here - confirm it is still needed.
classes = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']

# Load the model.
# NOTE(review): a VGGish model is already loaded earlier as `vggish_model`;
# confirm whether this second load from a different URL is intentional.
model = hub.load('https://kaggle.com/models/google/vggish/frameworks/TensorFlow2/variations/vggish/versions/1')
# model = tf.saved_model.load('/kaggle/input/music-genre-classification-vggish-model/VGGish')

# Run the model, check the output.
# The input is a Python list holding one waveform per training sample.
# NOTE(review): these waveforms come from the 30-second table and were loaded
# with extract_features' default sr=22050, while extract_vggish_features feeds
# VGGish a single 1-D 16 kHz waveform - confirm this call and the name
# `pred_3s` are what was intended.
pred_3s = model([w[0]['waveform'] for w in train_data])

print(pred_3s)


In [None]:
from sklearn.metrics import classification_report

# Treat the highest-scoring column of each model output row as the predicted
# class and compare it with the integer-encoded training labels.
# NOTE(review): this assumes the columns of `pred_3s` are per-class scores in
# the label encoder's class order - confirm against the model's output.
predicted_classes = np.argmax(pred_3s, axis=1)
print(classification_report(train_labels, predicted_classes))

In [None]:
# Plotting model loss and accuracy for 30-sec and 3-sec
import matplotlib.pyplot as plt

def plot_losses(hist, title=None):
  """Plot training vs. validation loss and accuracy side by side.

  Parameters
  ----------
  hist : object with a `.history` dict
      Must contain the keys 'loss', 'val_loss', 'accuracy' and 'val_accuracy',
      each a per-epoch sequence.
  title : str, optional
      Figure-level title, e.g. to tell several models' plots apart. Omitted
      when None.
  """
  # (train key, validation key, panel title, y-axis label) for each panel.
  panels = (
      ('loss', 'val_loss', 'Model Loss', 'Loss'),
      ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
  )

  fig, axs = plt.subplots(1, len(panels), figsize=(12, 4))
  for ax, (train_key, val_key, panel_title, ylabel) in zip(axs, panels):
    ax.plot(hist.history[train_key])
    ax.plot(hist.history[val_key])
    ax.set_title(panel_title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('Epoch')
    # The second curve is the val_* metric, i.e. validation data, not the test set.
    ax.legend(['Train', 'Validation'], loc='upper right')

  if title is not None:
    fig.suptitle(title)
  plt.show()

# NOTE(review): `history_30s` and `history_3s` are not assigned in any cell
# visible here; presumably they are training histories from model-fitting
# cells - confirm they exist on a fresh "Restart & Run All".

# Plot for 30-sec model
plot_losses(history_30s)

# Plot for 3-sec model
plot_losses(history_3s)