<a href="https://colab.research.google.com/github/umesh6396/DAIassignment/blob/main/Mars.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import zipfile
import os
import shutil

# Paths to your original ZIP files
speech_zip = "Audio_Speech_Actors_01-24.zip"
song_zip = "Audio_Song_Actors_01-24 - Copy.zip"

# Temporary extract directories
extract_base = "temp_extracted"
speech_extract_dir = os.path.join(extract_base, "speech")
song_extract_dir = os.path.join(extract_base, "song")

# Output subset directory
subset_dir = "subset_audio"

# Remove the subset_dir if it exists to avoid FileExistsError
if os.path.exists(subset_dir):
    shutil.rmtree(subset_dir)

os.makedirs(subset_dir, exist_ok=True)


# Step 1: Unzip full archives locally
def unzip_selected(zip_path, target_dir, selected_actors=(1, 2)):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        for member in zip_ref.namelist():
            for actor in selected_actors:
                actor_str = f"Actor_{actor:02d}"
                if actor_str in member and member.endswith(".wav"):
                    zip_ref.extract(member, target_dir)

# Extract selected actors
unzip_selected(speech_zip, speech_extract_dir)
unzip_selected(song_zip, song_extract_dir)

# Step 2: Copy relevant folders into a clean subset folder
def copy_actors(src_dir, dest_dir, label):
    for actor_folder in os.listdir(src_dir):
        actor_path = os.path.join(src_dir, actor_folder)
        if os.path.isdir(actor_path) and actor_folder.startswith("Actor_"):
            dest_path = os.path.join(dest_dir, f"{label}_{actor_folder}")
            shutil.copytree(actor_path, dest_path)

copy_actors(speech_extract_dir, subset_dir, "speech")
copy_actors(song_extract_dir, subset_dir, "song")

# Step 3: Zip the subset folder
shutil.make_archive("subset_audio_dataset", 'zip', subset_dir)

print("✅ Subset ZIP created: subset_audio_dataset.zip (ready to upload)")

✅ Subset ZIP created: subset_audio_dataset.zip (ready to upload)


In [3]:
import os
import librosa
import numpy as np
import pandas as pd

AUDIO_DIR = "subset_audio"
SAMPLE_RATE = 16000
N_MFCC = 40

def get_emotion_from_filename(filename):
    """Extracts emotion label from the filename based on the format."""
    # Assuming filename format like 'EmotionCode-ActorNumber-additional_info.wav'
    # or similar where emotion is in the first part
    parts = filename.split('-')
    # This is a placeholder; adjust based on your actual filename format
    # For RAVDESS dataset, filename format is usually like:
    # 03-01-01-01-01-01-01.wav (Emotion-Emotional intensity-Statement-Repetition-Actor-...)
    # Emotion codes: 01=neutral, 02=calm, 03=happy, 04=sad, 05=angry, 06=fearful, 07=disgust, 08=surprised.
    emotion_codes = {
        '01': 'neutral', '02': 'calm', '03': 'happy', '04': 'sad',
        '05': 'angry', '06': 'fearful', '07': 'disgust', '08': 'surprised'
    }
    try:
        emotion_code = parts[2] # Adjust index based on your filename structure
        return emotion_codes.get(emotion_code, 'unknown')
    except IndexError:
        return 'unknown' # Handle cases where filename format is unexpected

def extract_features_with_augment(path, label):
    features = []

    y, sr = librosa.load(path, sr=SAMPLE_RATE)
    versions = {
        'original': y,
        'pitch_shift': librosa.effects.pitch_shift(y, sr=sr, n_steps=2),
        'time_stretch': librosa.effects.time_stretch(y, rate=0.9)
    }

    for name, audio in versions.items():
        mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=N_MFCC)
        mfcc_mean = np.mean(mfcc.T, axis=0)
        features.append([*mfcc_mean, label])

    return features

data = []
for root, _, files in os.walk(AUDIO_DIR):
    for file in files:
        if file.endswith(".wav"):
            path = os.path.join(root, file)
            emotion = get_emotion_from_filename(file)  # reuse your existing label parser
            data.extend(extract_features_with_augment(path, emotion))

df_aug = pd.DataFrame(data)
df_aug.to_csv("mfcc_features_augmented.csv", index=False)
print("✅ Saved: mfcc_features_augmented.csv with original + augmented features")

✅ Saved: mfcc_features_augmented.csv with original + augmented features


In [5]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout
from tensorflow.keras.layers import Reshape, Bidirectional, LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import classification_report, confusion_matrix

# Step 1: Load and preprocess data
df = pd.read_csv("mfcc_features_augmented.csv")
X = df.iloc[:, :-1].values
y = df.iloc[:, -1].values

# Encode labels
le = LabelEncoder()
y_encoded = le.fit_transform(y)
y_cat = to_categorical(y_encoded)  # One-hot encoding

# Reshape MFCCs (40,) → (10, 4, 1) for CNN input
X_reshaped = X.reshape(-1, 10, 4, 1)

# Split into train/val/test
X_train, X_temp, y_train, y_temp = train_test_split(
    X_reshaped, y_cat, test_size=0.3, stratify=y_encoded, random_state=42)

X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=np.argmax(y_temp, axis=1), random_state=42)

# Step 2: Compute class weights
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(np.argmax(y_train, axis=1)),
    y=np.argmax(y_train, axis=1)
)
class_weights_dict = {i: class_weights[i] for i in range(len(class_weights))}

# Step 3: Define focal loss for one-hot labels
def categorical_focal_loss(gamma=2.0, alpha=0.25):
    def loss_fn(y_true, y_pred):
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0)
        ce = -y_true * tf.math.log(y_pred)
        fl = alpha * tf.math.pow(1 - y_pred, gamma) * ce
        return tf.reduce_mean(tf.reduce_sum(fl, axis=1))
    return loss_fn

focal_loss = categorical_focal_loss(gamma=2.0, alpha=0.25)

# Step 4: Build CNN + BiLSTM model
input_shape = X_train.shape[1:]
inp = Input(shape=input_shape)

x = Conv2D(32, (3, 3), activation='relu', padding='same')(inp)
x = MaxPooling2D((2, 2))(x)
x = Dropout(0.4)(x)

# Reshape the output of MaxPooling2D to be 3D for the LSTM layer
# The new shape will be (batch_size, number_of_time_steps, number_of_features)
# Here, time_steps is the product of the height and width of the MaxPooling2D output
# and features is the number of filters (32).
reshape_shape = (x.shape[1] * x.shape[2], x.shape[3])
x = Reshape(reshape_shape)(x)


from tensorflow.keras.layers import Attention, LayerNormalization

# After BiLSTM
x = Bidirectional(LSTM(64, return_sequences=True))(x)
x = Attention()([x, x])  # self-attention
x = tf.keras.layers.GlobalAveragePooling1D()(x)

# x = Reshape((x.shape[1] * x.shape[2], x.shape[3]))(x) # This reshape is now incorrect after Attention and GlobalAveragePooling1D
# The output of GlobalAveragePooling1D is already 2D (batch_size, features), so no reshape is needed before the next Dense layer.
# If you intended to have another BiLSTM *after* attention, you would need return_sequences=True in the first BiLSTM
# and then potentially reshape or handle the output for the second BiLSTM.
# Assuming the intention was a single BiLSTM followed by attention and pooling, the next layers should be Dense.


x = Dropout(0.4)(x)

x = Dense(64, activation='relu')(x)
out = Dense(y_cat.shape[1], activation='softmax')(x)

model = Model(inputs=inp, outputs=out)
model.compile(optimizer='adam', loss=focal_loss, metrics=['accuracy'])
model.summary()

# Step 5: Train model
early_stop = EarlyStopping(patience=10, restore_best_weights=True)

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=16,
    class_weight=class_weights_dict,
    callbacks=[early_stop]
)

# Step 6: Evaluate model
y_pred = model.predict(X_test)
y_pred_labels = np.argmax(y_pred, axis=1)
y_true_labels = np.argmax(y_test, axis=1)

print("Confusion Matrix:")
print(confusion_matrix(y_true_labels, y_pred_labels))
print("\nClassification Report:")
print(classification_report(y_true_labels, y_pred_labels, target_names=le.classes_))

Epoch 1/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 25ms/step - accuracy: 0.1342 - loss: 0.4006 - val_accuracy: 0.2021 - val_loss: 0.3798
Epoch 2/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 12ms/step - accuracy: 0.1659 - loss: 0.3851 - val_accuracy: 0.2021 - val_loss: 0.3701
Epoch 3/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.1999 - loss: 0.3557 - val_accuracy: 0.2340 - val_loss: 0.3663
Epoch 4/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - accuracy: 0.1641 - loss: 0.3728 - val_accuracy: 0.2553 - val_loss: 0.3604
Epoch 5/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - accuracy: 0.2288 - loss: 0.3756 - val_accuracy: 0.1702 - val_loss: 0.3596
Epoch 6/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 11ms/step - accuracy: 0.1983 - loss: 0.3797 - val_accuracy: 0.2447 - val_loss: 0.3494
Epoch 7/100
[1m28/28[0m [3

In [10]:
model.save("models/final_model.h5")


