In [None]:
# Usual Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

import IPython.display as ipd
import warnings
warnings.filterwarnings('ignore')
import os
import glob
import librosa
import librosa.display
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import mirdata
from xgboost import XGBRegressor
from sklearn.multioutput import MultiOutputRegressor
from sklearn.metrics import mean_squared_error

In [13]:
SR = 22050
SEGMENT_SEC = 1
NUM_SEGMENTS = 8 
AUDIO_LEN = (SR * SEGMENT_SEC)
N_MELS = 64
N_FFT = 1024
HOP_LENGTH = 256   # ~12ms at 22050Hz
WIN_LENGTH = 1024  # ~46ms at 22050Hz

In [5]:
dataset_path = './deam'
# Define the paths for "DEAM_audio/MEMD_audio" and the static annotations CSV
audio_dir = os.path.join(dataset_path, 'DEAM_audio', 'MEMD_audio')
dynamic_dir= os.path.join(dataset_path, 'DEAM_Annotations', 'annotations', 
                          'annotations averaged per song', 'dynamic (per second annotations)')

# Print to verify the paths
print("Audio Directory Path:", audio_dir)
print("Dynamic directory Path:", dynamic_dir)

# Check if the paths exist
if os.path.exists(audio_dir):
    print("Audio directory exists.")
else:
    print("Audio directory does not exist!")

if os.path.exists(dynamic_dir):
    print("Dynamic directory exists.")
else:
    print("Dynamic directory does not exist!")

files = os.listdir(dynamic_dir)
print("dynamic files: ")
print(files)

Audio Directory Path: ./deam\DEAM_audio\MEMD_audio
Dynamic directory Path: ./deam\DEAM_Annotations\annotations\annotations averaged per song\dynamic (per second annotations)
Audio directory exists.
Dynamic directory exists.
dynamic files: 
['arousal.csv', 'valence.csv']


In [6]:
arousal_file = os.path.join(dynamic_dir,'arousal.csv')
valence_file = os.path.join(dynamic_dir,'valence.csv')

PREPROCESSING

In [52]:
# Function to extract features from a 1-second audio segment
def extract_genre_segment_features(y_seg, sr=SR):
    """Extract features from a 1-second audio segment."""
    features = {}

    # length
    features['length'] = len(y_seg)

    # chroma_stft
    chroma_stft = librosa.feature.chroma_stft(y=y_seg, sr=sr)
    features['chroma_stft_mean'] = chroma_stft.mean()
    features['chroma_stft_var'] = chroma_stft.var()

    # rms
    rms = librosa.feature.rms(y=y_seg)
    features['rms_mean'] = rms.mean()
    features['rms_var'] = rms.var()

    # spectral_centroid
    spec_cent = librosa.feature.spectral_centroid(y=y_seg, sr=sr)
    features['spectral_centroid_mean'] = spec_cent.mean()
    features['spectral_centroid_var'] = spec_cent.var()

    # spectral_bandwidth
    spec_bw = librosa.feature.spectral_bandwidth(y=y_seg, sr=sr)
    features['spectral_bandwidth_mean'] = spec_bw.mean()
    features['spectral_bandwidth_var'] = spec_bw.var()

    # rolloff
    rolloff = librosa.feature.spectral_rolloff(y=y_seg, sr=sr)
    features['rolloff_mean'] = rolloff.mean()
    features['rolloff_var'] = rolloff.var()

    # zero_crossing_rate
    zcr = librosa.feature.zero_crossing_rate(y_seg)
    features['zero_crossing_rate_mean'] = zcr.mean()
    features['zero_crossing_rate_var'] = zcr.var()

    # harmony & perceptr
    try:
        harmony, perceptr = librosa.effects.hpss(y_seg)
        features['harmony_mean'] = harmony.mean()
        features['harmony_var'] = harmony.var()
        features['perceptr_mean'] = perceptr.mean()
        features['perceptr_var'] = perceptr.var()
    except Exception:
        features['harmony_mean'] = 0
        features['harmony_var'] = 0
        features['perceptr_mean'] = 0
        features['perceptr_var'] = 0

    # tempo
    tempo, _ = librosa.beat.beat_track(y=y_seg, sr=sr)
    features['tempo'] = tempo

    # MFCCs (20 coefficients, mean + var = 40 total)
    mfcc = librosa.feature.mfcc(y=y_seg, sr=sr, n_mfcc=20)
    for i in range(1, 21):
        features[f'mfcc{i}_mean'] = mfcc[i-1].mean()
        features[f'mfcc{i}_var'] = mfcc[i-1].var()
    
    return features

In [30]:
def extract_pitch_segment_features(y_seg, sr=SR):
    """Extract features from a 1-second audio segment."""
    # pitch
    fmin = 80.0
    fmax = 2000.0
    pitch = librosa.yin(y_seg, fmin=fmin, fmax=fmax, sr=sr)
    if len(pitch) > 0:
        return np.mean(pitch), np.var(pitch)
    else:
        return 0, 0

In [None]:
# Load dynamic annotations
arousal_df = pd.read_csv(arousal_file, index_col=0)
valence_df = pd.read_csv(valence_file, index_col=0)

print("Arousal shape:", arousal_df.shape)
print("Valence shape:", valence_df.shape)
print("Sample arousal columns:", arousal_df.columns[:10])
print("Sample valence columns:", valence_df.columns[:10])

Arousal shape: (1802, 1224)
Valence shape: (1802, 1223)
Sample arousal columns: Index(['sample_15000ms', 'sample_15500ms', 'sample_16000ms', 'sample_16500ms',
       'sample_17000ms'],
      dtype='object')
Sample valence columns: Index(['sample_15000ms', 'sample_15500ms', 'sample_16000ms', 'sample_16500ms',
       'sample_17000ms'],
      dtype='object')


In [98]:
import os

genre_path = './genre_classification/genre_mlp_model.pkl'
if os.path.exists(genre_path):
    genre_pipeline = joblib.load(genre_path)
    print("Loaded genre pipeline")
else:
    print("Genre file not found, skipping")

min_max_scaler = joblib.load('./genre_classification/gztan_minmax_scaler.pkl')

Loaded genre pipeline


In [99]:
print(min_max_scaler)

MinMaxScaler()


In [102]:
import glob
import librosa
import numpy as np
import os

all_data = []

audio_files = glob.glob(os.path.join(audio_dir, '*.mp3'))
print(f"Found {len(audio_files)} audio files")

def get_song_id(filepath):
    filename = os.path.basename(filepath)
    return int(filename.split('.')[0])  # assuming format like '1.mp3'

for audio_file in audio_files:
    song_id = get_song_id(audio_file)
    print(f"Processing song {song_id}")

    # Skip if no annotations
    if song_id not in arousal_df.index or song_id not in valence_df.index:
        print(f"Skipping song {song_id}, no annotations")
        continue

    arousal_list = list(arousal_df.loc[song_id].values)
    valence_list = list(valence_df.loc[song_id].values)

    # Load audio
    y, _ = librosa.load(audio_file, sr=SR)
    duration = len(y) / SR
    num_segments = int(duration // SEGMENT_SEC)

    # Start from segment 15 (15s) to ensure annotations exist
    for i in range(15, num_segments):
        start = int(i * SEGMENT_SEC * SR)
        end = int((i + 1) * SEGMENT_SEC * SR)
        y_seg = y[start:end]

        if len(y_seg) < SEGMENT_SEC * SR * 0.8:
            continue

        features = extract_genre_segment_features(y_seg, sr=SR)

        # Predict genre
        feat_values = np.array(list(features.values())).reshape(1, -1)
        feat_scaled = min_max_scaler.transform(feat_values)
        features['predicted_genre'] = genre_pipeline.predict(feat_scaled)[0]

        # Rename chroma features
        if 'chroma_stft_mean' in features:
            features['chord_mean'] = features.pop('chroma_stft_mean')
        if 'chroma_stft_var' in features:
            features['chord_var'] = features.pop('chroma_stft_var')

        # Add pitch features
        features['pitch_mean'], features['pitch_var'] = extract_pitch_segment_features(y_seg, sr=SR)

        # Add song_id, segment index
        features['song_id'] = song_id
        features['time_sec'] = i

        # Add annotations (arousal/valence)
        idx = int((i - 15) / 2)  # annotation every 0.5s
        if idx < len(arousal_list):
            features['arousal'] = arousal_list[idx]
            features['valence'] = valence_list[idx]
        else:
            continue  # skip segment nếu không có annotation

        all_data.append(features)


Found 1802 audio files
Processing song 10
Processing song 1000
Processing song 1001
Processing song 1002
Processing song 1003
Processing song 1004
Processing song 1005
Processing song 1006
Processing song 1007
Processing song 1008
Processing song 1009
Processing song 101
Processing song 1010
Processing song 1011
Processing song 1012
Processing song 1013
Processing song 1014
Processing song 1015
Processing song 1016
Processing song 1017
Processing song 1018
Processing song 1019
Processing song 102
Processing song 1020
Processing song 1021
Processing song 1022
Processing song 1023
Processing song 1024
Processing song 1025
Processing song 1026
Processing song 1027
Processing song 1028
Processing song 1029
Processing song 103
Processing song 1030
Processing song 1031
Processing song 1032
Processing song 1033
Processing song 1034
Processing song 1035
Processing song 1036
Processing song 1037
Processing song 1038
Processing song 1039
Processing song 104
Processing song 1040
Processing song 1

In [103]:
# Create Data frame
features_df = pd.DataFrame(all_data)
print("Extracted features shape:", features_df.shape)
print("Unique predicted genres:", features_df['predicted_genre'].unique())
features_df.head()
# Save to CSV
features_df.to_csv('deam_features.csv', index=False)

Extracted features shape: (64983, 65)
Unique predicted genres: [np.str_('classical') np.str_('country') np.str_('rock') np.str_('pop')
 np.str_('reggae') np.str_('disco') np.str_('hiphop') np.str_('jazz')
 np.str_('blues') np.str_('metal')]


In [107]:
from sklearn.preprocessing import OneHotEncoder
# Load dataset
df = pd.read_csv("deam_features.csv") 

# Tách target
y = df[["arousal", "valence"]].values  # 2 outputs

# Tách input features, giữ DataFrame
X = df.drop(columns=["arousal", "valence"])  # <-- KHÔNG .values

# -------------------------------
# Split dataset
# -------------------------------
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Lấy cột genre
genre_train = X_train[['predicted_genre']]
genre_test  = X_test[['predicted_genre']]

# One-hot encode
encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
genre_encoded_train = encoder.fit_transform(genre_train)
genre_encoded_test  = encoder.transform(genre_test)

# Numeric features
X_train_numeric = X_train.drop(columns=['predicted_genre'])
X_test_numeric  = X_test.drop(columns=['predicted_genre'])

# Ghép numeric + one-hot genre
X_train_encoded = np.hstack([X_train_numeric.values, genre_encoded_train])
X_test_encoded  = np.hstack([X_test_numeric.values, genre_encoded_test])

In [108]:
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.pipeline import Pipeline

# -------------------------------
# Define MLP Regressor
# -------------------------------
mlp_pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("mlp", MLPRegressor(random_state=42))
])

mlp_param_grid = {
    "mlp__hidden_layer_sizes": [(256, 128, ), (64, 32, ), (128, 64, 32)],
    "mlp__activation": ["relu"],
    "mlp__solver": ["adam"], 
    "mlp__alpha": [1e-4, 1e-3, 1e-2],
    "mlp__learning_rate_init": [1e-3, 5e-4],
    "mlp__max_iter": [200, 300, 350]
}

mlp_grid = GridSearchCV(
    mlp_pipeline,
    mlp_param_grid,
    cv=5,
    n_jobs=-1,
    verbose=1
)

mlp_grid.fit(X_train_encoded, y_train)

print("Best MLP params:", mlp_grid.best_params_)

best_mlp = mlp_grid.best_estimator_
y_pred = best_mlp.predict(X_test_encoded)

mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R2 score: {r2:.4f}")

# -------------------------------
# Save pipeline
# -------------------------------
joblib.dump(best_mlp, "mlp_emotion_regressor.pkl")
print("Saved MLP regressor pipeline (includes scaler).")

Fitting 5 folds for each of 54 candidates, totalling 270 fits
Best MLP params: {'mlp__activation': 'relu', 'mlp__alpha': 0.01, 'mlp__hidden_layer_sizes': (256, 128), 'mlp__learning_rate_init': 0.001, 'mlp__max_iter': 200, 'mlp__solver': 'adam'}
Test MSE: 0.0177
Test MAE: 0.1010
Test R2 score: 0.7025
Saved MLP regressor pipeline (includes scaler).


In [117]:
def map_av_to_emotion(arousal, valence, threshold=0.05):
    if abs(arousal) < threshold and abs(valence) < threshold:
        return "Neutral"
    elif arousal >= 0 and valence >= 0:
        return "Happy/Excited"
    elif arousal >= 0 and valence < 0:
        return "Angry/Tense"
    elif arousal < 0 and valence >= 0:
        return "Relaxed/Calm"
    else:
        return "Sad/Depressed"


In [113]:
joblib.dump(encoder, "emotion_label_encoder.pkl")

['emotion_label_encoder.pkl']

In [119]:
import glob
from pathlib import Path
import os

test_path = r"C:\workspace\HMUD\kh"
mp3_files = glob.glob(os.path.join(test_path, "*.mp3"))

pipeline = joblib.load("mlp_emotion_regressor.pkl")

genre_encoder = joblib.load("emotion_label_encoder.pkl")

min_max_scaler = joblib.load('./genre_classification/gztan_minmax_scaler.pkl')
genre_pipeline = joblib.load('./genre_classification/genre_mlp_model.pkl')


for idx, mp3_file in enumerate(mp3_files):
    if not Path(mp3_file).exists():
        print("File not found:", mp3_file)
        continue

    y, sr = librosa.load(mp3_file, sr=SR)
    num_segments = int(len(y) / SR)  # số segment 1 giây
    segment_preds = []

    for i in range(num_segments):
        start = i * SR
        end = (i + 1) * SR
        y_seg = y[start:end]
    
        feat = extract_genre_segment_features(y_seg)  # giống training
        features = extract_genre_segment_features(y_seg, sr=SR)

        # Predict genre
        feat_values = np.array(list(features.values())).reshape(1, -1)
        feat_scaled = min_max_scaler.transform(feat_values)
        genre_label = genre_pipeline.predict(feat_scaled)[0]
        encoded_genre = genre_encoder.transform([[genre_label]])[0]

        # Rename chroma features
        if 'chroma_stft_mean' in features:
            features['chord_mean'] = features.pop('chroma_stft_mean')
        if 'chroma_stft_var' in features:
            features['chord_var'] = features.pop('chroma_stft_var')

        # Add pitch features
        features['pitch_mean'], features['pitch_var'] = extract_pitch_segment_features(y_seg, sr=SR)

        # Add song_id, segment index
        features['song_id'] = idx
        features['time_sec'] = i

        X_mp3_segment = np.hstack([np.array(list(features.values())), encoded_genre]).reshape(1, -1)
        segment_pred = pipeline.predict(X_mp3_segment)[0]  # shape: (2,)

        # Add song_id, segment index
        segment_preds.append(segment_pred)  # shape: (2,)

    # Average predictions over all segments
    segment_preds = np.array(segment_preds)  # shape: [n_segments, 2]
    print(f"Predictions for file: {mp3_file}")
    print(segment_preds)

    song_arousal = np.median(segment_preds[:, 0])
    song_valence = np.median(segment_preds[:, 1])

    print(f"Song-level emotion: Arousal={song_arousal:.3f}, Valence={song_valence:.3f}")

    emotion_label = map_av_to_emotion(song_arousal, song_valence)
    print(emotion_label)


Predictions for file: C:\workspace\HMUD\kh\fashion.mp3
[[ 1.14036815e-01  3.30586178e-01]
 [-1.86101087e-01 -1.14863176e-01]
 [ 1.44051825e-01 -9.22024419e-02]
 [ 2.02115738e-01 -5.92926135e-02]
 [-3.64248960e-02 -8.79883242e-02]
 [ 5.77997346e-02 -4.03184195e-02]
 [ 8.68244241e-02 -4.54686650e-04]
 [ 1.14566585e-01  2.87214685e-04]
 [ 1.87300833e-01  2.39558091e-02]
 [ 5.92115420e-02 -1.24071634e-01]
 [ 2.05324755e-01 -4.35792077e-03]
 [ 1.15970844e-01 -7.47278834e-02]
 [ 1.23407856e-02 -1.57672464e-01]
 [ 1.26358662e-01  1.11872469e-01]]
Song-level emotion: Arousal=0.114, Valence=-0.050
Angry/Tense
Predictions for file: C:\workspace\HMUD\kh\go.mp3
[[ 0.33089554  0.48952748]
 [ 0.25670368  0.23104914]
 [ 0.16730262  0.11169463]
 [ 0.13771512  0.13723586]
 [-0.04572613  0.08195824]
 [-0.04482995 -0.09983672]
 [ 0.19932398  0.11594545]
 [ 0.36129661  0.17417947]]
Song-level emotion: Arousal=0.183, Valence=0.127
Happy/Excited
Predictions for file: C:\workspace\HMUD\kh\indianCM.mp3
[[ 0.0

In [None]:
def quantize_int8(arr):
    """
    Symmetric per-tensor int8 quantization
    return: q_arr, scale
    """
    max_val = np.max(np.abs(arr))
    scale = max_val / 127.0 if max_val != 0 else 1.0
    q = np.round(arr / scale).astype(np.int8)
    return q, scale

In [None]:
best_mlp = joblib.load("mlp_emotion_regressor.pkl")

mlp_model = best_mlp.named_steps["mlp"]
mlp_scaler = best_mlp.named_steps["scaler"]

# Extract weights and biases for all layers
coefs = mlp_model.coefs_
intercepts = mlp_model.intercepts_
n_layers = len(coefs)  # Total layers (input + hidden + output)

# Quantize all weights and biases
q_coefs = []
s_coefs = []
q_intercepts = []
s_intercepts = []
for i in range(n_layers):
    qW, sW = quantize_int8(coefs[i])
    qb, sb = quantize_int8(intercepts[i])
    q_coefs.append(qW)
    s_coefs.append(sW)
    q_intercepts.append(qb)
    s_intercepts.append(sb)

mean = mlp_scaler.mean_.astype(np.float32)
scale = mlp_scaler.scale_.astype(np.float32)

with open("emotion_mlp.h", "w") as f:
    f.write("// INT8 MLP (multi-layer) for ESP32\n")
    f.write("#pragma once\n")
    f.write("#include <stdint.h>\n\n")
    
    # Defines
    f.write(f"#define N_FEATURES {coefs[0].shape[0]}\n")
    f.write(f"#define N_CLASSES {coefs[-1].shape[1]}\n")  # Correct: Use output layer size
    f.write(f"#define N_HIDDEN_LAYERS {n_layers - 1}\n")  # Number of hidden layers
    
    # Hidden layer sizes (array for dynamic access)
    hidden_sizes = [coefs[i].shape[1] for i in range(n_layers - 1)]  # Sizes of hidden layers
    f.write(f"static const int HIDDEN_SIZES[N_HIDDEN_LAYERS] = {{{', '.join(map(str, hidden_sizes))}}};\n\n")
    
    # Input scaler
    f.write("static const float INPUT_MEAN[N_FEATURES] = {" + ",".join(map(str, mean)) + "};\n")
    f.write("static const float INPUT_SCALE[N_FEATURES] = {" + ",".join(map(str, scale)) + "};\n\n")

    # Scales for weights and biases
    for i in range(n_layers):
        layer_name = f"LAYER_{i+1}" if i < n_layers - 1 else "OUTPUT"
        f.write(f"static const float W{i+1}_SCALE = {s_coefs[i]};\n")
        f.write(f"static const float B{i+1}_SCALE = {s_intercepts[i]};\n")
    
    f.write("\n")
    
    # Weights (2D arrays)
    for i in range(n_layers):
        in_size = coefs[i].shape[0]
        out_size = coefs[i].shape[1]
        layer_name = f"W{i+1}"
        f.write(f"static const int8_t {layer_name}[{in_size}][{out_size}] = {{\n")
        for row in q_coefs[i]:
            f.write("{" + ",".join(map(str, row.tolist())) + "},\n")
        f.write("};\n\n")
    
    # Biases (1D arrays)
    for i in range(n_layers):
        out_size = coefs[i].shape[1]
        layer_name = f"B{i+1}"
        f.write(f"static const int8_t {layer_name}[{out_size}] = {{" + ",".join(map(str, q_intercepts[i].tolist())) + "}};\n\n")

print("Exported with corrected N_CLASSES and multi-layer support")

# ...existing code...