In [1]:
# === Imports ===
import os
import numpy as np
import librosa
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import SMOTE
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, LSTM, Bidirectional, Dropout, Dense, Masking
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay

# === Paths ===
# Root folders of the speech and song recordings that are scanned for .wav files.
speech_path = '/Users/yathamlohithreddy/Desktop/vscodefloder /marsproject/Audio_Speech_Actors_01-24'
song_path = '/Users/yathamlohithreddy/Desktop/vscodefloder /marsproject/Audio_Song_Actors_01-24'

# === Settings ===
SAMPLE_RATE = 16000  # sample rate requested when loading audio
max_len = 300        # timesteps
sequences = []       # one feature matrix per loaded file
labels = []          # emotion name matching each entry of `sequences`

# === Emotion Mapping ===
# Two-digit emotion code (third dash-separated filename field) -> emotion name.
ravdess_emotion_map = {
    '01': 'neutral',
    '02': 'calm',
    '03': 'happy',
    '04': 'sad',
    '05': 'angry',
    '06': 'fearful',
    '07': 'disgust',
    '08': 'surprised',
}

# === Functions ===
def load_and_preprocess(file_path, sr=SAMPLE_RATE):
    """Load an audio file, trim its silent edges and peak-normalise it.

    Args:
        file_path: Path of the audio file to read.
        sr: Sample rate passed to ``librosa.load``.

    Returns:
        The trimmed waveform divided by its largest absolute sample. A
        waveform that is empty or entirely zero is returned unscaled.
    """
    y, _ = librosa.load(file_path, sr=sr)
    # Trim leading/trailing audio using a 30 dB threshold.
    y, _ = librosa.effects.trim(y, top_db=30)
    # np.max raises ValueError on a zero-size array, so bail out first.
    if y.size == 0:
        return y
    peak = np.max(np.abs(y))
    # Skip the division for an all-zero signal to avoid dividing by zero.
    return y / peak if peak > 0 else y

def extract_features(y, sr):
    """Build a per-frame feature matrix for one waveform.

    Stacks 13 MFCCs, spectral roll-off, RMS energy, zero-crossing rate and a
    YIN pitch track, then transposes so that frames run along the first axis.
    Every track is cut or zero-padded to the MFCC frame count before stacking.

    Args:
        y: Audio waveform.
        sr: Sample rate of ``y``.

    Returns:
        2-D array with one row per frame and one column per feature.
    """
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    n_frames = mfcc.shape[1]
    rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
    rms = librosa.feature.rms(y=y)
    zcr = librosa.feature.zero_crossing_rate(y=y)
    try:
        f0 = librosa.yin(y, sr=sr, fmin=librosa.note_to_hz('C2'), fmax=librosa.note_to_hz('C7'))
        f0 = f0.reshape(1, -1)
    except Exception:
        # Best-effort fallback: use an all-zero pitch track when estimation
        # fails. `Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so a long extraction run can still be stopped.
        f0 = np.zeros((1, n_frames))

    def fit_frames(track):
        """Cut or right-pad ``track`` with zeros to ``n_frames`` columns."""
        width = track.shape[1]
        if width >= n_frames:
            return track[:, :n_frames]
        return np.pad(track, ((0, 0), (0, n_frames - width)))

    all_features = np.vstack([
        mfcc,
        fit_frames(rolloff),
        fit_frames(rms),
        fit_frames(zcr),
        fit_frames(f0),
    ])
    return all_features.T

def process_ravdess(path, tag=""):
    """Load every labelled .wav file below ``path`` into the global dataset.

    Walks each sub-folder of ``path``, reads the emotion code from the third
    dash-separated field of the filename, extracts features and appends them
    to the module-level ``sequences`` / ``labels`` lists. Files that are not
    ``.wav``, have fewer than three fields or carry an unknown emotion code
    are skipped.

    Args:
        path: Directory containing one sub-folder per actor.
        tag: Name shown in the summary line that is printed at the end.
    """
    count = 0
    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and therefore any seeded split made from it) reproducible.
    for actor_folder in sorted(os.listdir(path)):
        folder_path = os.path.join(path, actor_folder)
        if not os.path.isdir(folder_path):
            continue
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith(".wav"):
                continue
            parts = file_name.split("-")
            if len(parts) < 3:
                continue
            label = ravdess_emotion_map.get(parts[2])
            if not label:
                continue

            file_path = os.path.join(folder_path, file_name)
            y = load_and_preprocess(file_path)
            features = extract_features(y, SAMPLE_RATE)
            sequences.append(features)
            labels.append(label)
            count += 1
    print(f"✔️ {tag} - Loaded {count} samples")

# === Load Dataset ===
# Both calls append to the module-level `sequences` / `labels` lists.
process_ravdess(speech_path, tag="Speech")
process_ravdess(song_path, tag="Song")

# === Prepare Dataset ===
# Zero-pad or cut every sequence at its end so all have `max_len` timesteps.
X = pad_sequences(
    sequences,
    maxlen=max_len,
    dtype='float32',
    padding='post',
    truncating='post',
)
le = LabelEncoder()
y = le.fit_transform(labels)

print("✅ Total samples:", len(labels))
print("✅ X shape:", X.shape)
print("✅ y shape:", y.shape)

# === Train-Val Split ===
X_train, X_val, y_train, y_val = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# === SMOTE Resampling ===
# Flatten each (timesteps, features) sample to one row for SMOTE, then
# restore the 3-D shape on the resampled result.
n_samples, timesteps, feat_dim = X_train.shape
flat_dim = timesteps * feat_dim
X_flat = X_train.reshape(n_samples, flat_dim)
smote = SMOTE(random_state=42)
X_flat_res, y_res = smote.fit_resample(X_flat, y_train)
n_resampled = X_flat_res.shape[0]
X_res = X_flat_res.reshape(n_resampled, timesteps, feat_dim)

# === Augmentation ===
def add_noise(X, noise_level=0.01):
    """Return a copy of ``X`` with zero-mean Gaussian noise added.

    For inputs with two or more dimensions, positions whose values along the
    last axis are all zero are left untouched. This keeps zero-padded
    timesteps exactly zero, so a ``Masking(mask_value=0.0)`` layer still
    recognises them as padding after augmentation.

    Args:
        X: Array of feature sequences; the last axis holds the features.
        noise_level: Standard deviation of the added noise.

    Returns:
        A new array of the same shape as ``X``; ``X`` itself is not modified.
    """
    X = np.asarray(X)
    noise = np.random.normal(0, noise_level, X.shape)
    if X.ndim >= 2:
        # True where a feature vector has at least one non-zero value.
        active = np.any(X != 0, axis=-1, keepdims=True)
        noise = noise * active
    return X + noise

def random_time_scaling(X, scale_range=(0.9, 1.1)):
    """Multiply every sequence in ``X`` by its own random factor.

    Despite the name, the values of each sequence are scaled; the time axis
    is not stretched or compressed.

    Args:
        X: Iterable of sequences (e.g. an array of shape (n, timesteps, feats)).
        scale_range: Arguments forwarded to ``np.random.uniform`` for each draw.

    Returns:
        Array built from the scaled sequences, in the same order as ``X``.
    """
    scaled = []
    for seq in X:
        # One independent factor per sequence.
        factor = np.random.uniform(*scale_range)
        scaled.append(seq * factor)
    return np.array(scaled)

def time_mask(X, max_mask_size=5):
    """Zero out one random contiguous span of timesteps in every sequence.

    Args:
        X: Array of shape (n_sequences, timesteps, features).
        max_mask_size: Number of consecutive timesteps to zero. It is clamped
            to the sequence length, so short sequences are masked entirely
            instead of raising.

    Returns:
        A masked copy of ``X``; ``X`` itself is not modified.
    """
    X_masked = X.copy()
    n_steps = X.shape[1]
    width = min(max_mask_size, n_steps)
    if width <= 0:
        return X_masked
    for i in range(X.shape[0]):
        # randint's upper bound is exclusive: +1 makes the last valid start
        # (n_steps - width) reachable so the final frames can be masked too.
        start = np.random.randint(0, n_steps - width + 1)
        X_masked[i, start:start + width, :] = 0
    return X_masked

def time_shift(X, shift_max=5):
    """Shift every sequence along the time axis by a random number of steps.

    Timesteps vacated by the shift are filled with zeros rather than with the
    values that ``np.roll`` wraps around from the other end.

    Args:
        X: Iterable of 2-D sequences shaped (timesteps, features).
        shift_max: Largest shift in either direction; the shift is drawn
            uniformly from ``-shift_max`` to ``shift_max`` inclusive.

    Returns:
        Array built from the shifted sequences, in the same order as ``X``.
    """
    shifted = []
    for seq in X:
        # randint's upper bound is exclusive: +1 makes the range symmetric
        # and keeps shift_max=0 valid (it then always yields 0).
        shift = np.random.randint(-shift_max, shift_max + 1)
        rolled = np.roll(seq, shift, axis=0)
        if shift > 0:
            rolled[:shift, :] = 0  # blank the frames wrapped to the front
        elif shift < 0:
            rolled[shift:, :] = 0  # blank the frames wrapped to the back
        shifted.append(rolled)
    return np.array(shifted)

# Training set = resampled data plus one copy per augmentation. The labels
# are repeated once per variant, in the same order as the stacked arrays.
augmented_variants = [
    X_res,
    add_noise(X_res),
    random_time_scaling(X_res),
    time_mask(X_res),
    time_shift(X_res),
]
X_aug = np.concatenate(augmented_variants, axis=0)
y_aug = np.concatenate([y_res for _ in augmented_variants])

# === LSTM Model ===
n_classes = len(np.unique(y))
lstm_layers = [
    Input(shape=(timesteps, feat_dim)),
    # Masks timesteps whose features all equal 0.0.
    Masking(mask_value=0.0),
    Bidirectional(LSTM(128, return_sequences=True)),
    Dropout(0.3),
    Bidirectional(LSTM(64)),
    Dropout(0.3),
    Dense(64, activation='relu'),
    Dropout(0.3),
    # One softmax output per encoded emotion class.
    Dense(n_classes, activation='softmax'),
]
model = Sequential(lstm_layers)

# === Optimizer with fixed LR ===
optimizer = Adam(learning_rate=1e-4)

# === Compile ===
# Sparse loss: targets are integer class ids, not one-hot vectors.
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# === Callbacks ===
# All three callbacks watch the validation loss.
checkpoint = ModelCheckpoint('best_lstm_model.h5', monitor='val_loss', save_best_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# === Class Weights ===
# Derive the weights from the labels the model is actually fitted on (y_aug).
# Using the pre-resampling y_train here would re-weight classes that the
# resampling step has already balanced, compensating for imbalance twice.
train_classes = np.unique(y_aug)
balanced_weights = compute_class_weight('balanced', classes=train_classes, y=y_aug)
# Map each class id to its weight explicitly instead of relying on position.
class_weights = {int(c): float(w) for c, w in zip(train_classes, balanced_weights)}

# === Train ===
history = model.fit(
    X_aug, y_aug,
    validation_data=(X_val, y_val),
    epochs=90,
    batch_size=32,
    class_weight=class_weights,
    callbacks=[checkpoint, reduce_lr, early_stop]
)


# === Evaluate ===
y_val_pred_probs = model.predict(X_val)
y_val_pred = np.argmax(y_val_pred_probs, axis=1)

print("✅ Validation Accuracy:", accuracy_score(y_val, y_val_pred))
print("✅ Classification Report:\n", classification_report(y_val, y_val_pred, target_names=le.classes_))

# === Confusion Matrix ===
class_names = le.classes_
# Fix the label set so the matrix always has one row/column per class, even
# if a class is absent from both the true and the predicted labels.
cm = confusion_matrix(y_val, y_val_pred, labels=np.arange(len(class_names)))
# Row-normalise; rows with no true samples stay at 0 instead of becoming NaN.
row_totals = cm.sum(axis=1, keepdims=True)
cm_norm = np.divide(cm.astype('float'), row_totals,
                    out=np.zeros(cm.shape, dtype=float),
                    where=row_totals != 0)

fig, axes = plt.subplots(1, 2, figsize=(16, 6))
for i, mat in enumerate([cm, cm_norm]):
    ax = axes[i]
    im = ax.imshow(mat, cmap=plt.cm.Blues)
    ax.set_title("Counts" if i == 0 else "Normalized")
    ax.set_xticks(np.arange(len(class_names)))
    ax.set_yticks(np.arange(len(class_names)))
    ax.set_xticklabels(class_names, rotation=45)
    ax.set_yticklabels(class_names)
    # Loop names are `row`/`col` rather than `x`/`y` so the module-level
    # label array `y` is not overwritten by the loop variable.
    for row in range(len(class_names)):
        for col in range(len(class_names)):
            value = f"{mat[row, col]:.2f}" if i == 1 else str(mat[row, col])
            # White text on dark cells, black on light ones.
            ax.text(col, row, value, ha='center', va='center',
                    color="white" if mat[row, col] > mat.max() / 2 else "black")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
plt.tight_layout()
plt.show()


✔️ Speech - Loaded 1440 samples
✔️ Song - Loaded 1012 samples
✅ Total samples: 2452
✅ X shape: (2452, 300, 17)
✅ y shape: (2452,)
Epoch 1/90
 65/377 ━━━━━━━━━━━━━━━━━━━━ 1:18 251ms/step - accuracy: 0.1138 - loss: 2.3245

KeyboardInterrupt: 