# Fixed Ensemble Learning for Music Genre Classification

This notebook implements a properly designed ensemble approach combining:
- LSTM model for vocal features
- CNN model for accompaniment features
- Proper train/validation/test splits
- Correct stacking implementation without data leakage

In [None]:
import json
import os

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
from sklearn.model_selection import cross_val_predict, train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, Bidirectional, LSTM
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam
from xgboost import XGBClassifier

# Record the TensorFlow version and the GPUs TensorFlow can see, so the
# environment that produced the outputs below is visible in the notebook.
print("TensorFlow version:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))

## Configuration and Parameters

In [None]:
# Parameters
# Mini-batch size used when fitting both base models.
batch_size = 64
data_path_vocal = '../Data/data.json'  # Vocal MFCCs (this file also supplies the labels)
data_path_accomp = '../Data/accompaniment_mfcc.json'  # Accompaniment MFCCs (optional; vocal data is reused if missing)
# Cached base models: when one of these files exists it is loaded instead of
# being retrained.
lstm_model_path = '../models/lstm_vocal_classifier.keras'
cnn_model_path = '../models/cnn_accompaniment_classifier.keras'

# Display names used for the confusion-matrix ticks and the classification report.
# NOTE(review): assumes encoded class id i corresponds to genres[i] — confirm
# against how 'genre_num' was produced.
genres = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']
num_classes = len(genres)
# Upper bound on base-model training epochs; early stopping may end training sooner.
epochs = 100
# Adam learning rate for both base models (the neural meta-model sets its own).
learning_rate = 0.0001

# Set random seeds for reproducibility
# Only NumPy and TensorFlow are seeded here; Python's `random` module is not.
np.random.seed(42)
tf.random.set_seed(42)

## Step 1: Load and Prepare Data

**FIX**: Load separate vocal and accompaniment data sources

In [None]:
print("Loading data...")


def _read_json(path):
    """Decode one UTF-8 encoded JSON file and return its contents."""
    with open(path, 'r', encoding='utf-8') as handle:
        return json.load(handle)


# The vocal file is always read: it provides the vocal MFCCs and the labels.
data_vocal = _read_json(data_path_vocal)

# The accompaniment file is optional; when it is absent the vocal data stands
# in for it so the rest of the notebook can still run.
if not os.path.exists(data_path_accomp):
    print("⚠ Warning: Accompaniment data not found. Using vocal data for both (not ideal).")
    data_accomp = data_vocal
else:
    data_accomp = _read_json(data_path_accomp)
    print("✓ Loaded separate vocal and accompaniment data")

# Pull the raw feature lists and the label list out of the decoded JSON.
mfccs_vocal, mfccs_accomp = data_vocal['mfcc'], data_accomp['mfcc']
labels = data_vocal['genre_num']

print(f"Vocal MFCCs: {len(mfccs_vocal)}, Accomp MFCCs: {len(mfccs_accomp)}, Labels: {len(labels)}")

# Trim all three sequences to the shortest one so they have equal length.
# NOTE(review): this pairs rows purely by position — presumably both files
# list the samples in the same order; confirm.
min_length = min(map(len, (mfccs_vocal, mfccs_accomp, labels)))
mfccs_vocal, mfccs_accomp, labels = (
    sequence[:min_length] for sequence in (mfccs_vocal, mfccs_accomp, labels)
)

# Materialise everything as NumPy arrays for the preprocessing steps.
X_vocal, X_accomp, y = np.array(mfccs_vocal), np.array(mfccs_accomp), np.array(labels)

print(f"\nInitial shapes:")
print(f"X_vocal: {X_vocal.shape}, X_accomp: {X_accomp.shape}, y: {y.shape}")

# Handle non-finite values
def _zero_non_finite(X, name):
    """Return X with NaN / +-inf entries replaced by 0.0.

    Args:
        X: numeric NumPy array of MFCC features.
        name: label used in the warning message.

    Returns:
        X itself when every entry is finite, otherwise a new array of the
        same shape in which the non-finite entries are 0.0.
    """
    finite_mask = np.isfinite(X)
    if finite_mask.all():
        return X
    print(f"⚠ Warning: Non-finite values in {name} MFCCs, replacing with zeros")
    return np.where(finite_mask, X, 0.0)


# np.where builds a new array, so the result must be assigned back to the
# module-level names; rebinding a loop variable would leave X_vocal and
# X_accomp unchanged and the non-finite values would reach the models.
X_vocal = _zero_non_finite(X_vocal, "Vocal")
X_accomp = _zero_non_finite(X_accomp, "Accompaniment")

## Step 2: Preprocess Features for Different Models

**FIX**: Properly reshape data for LSTM (40, 132) and CNN (40, 132, 1)

In [None]:
def prepare_mfcc_for_lstm(X, target_shape=(40, 132)):
    """Fit a batch of 2-D MFCC matrices to the LSTM input shape.

    Each of the two feature axes is cropped to at most the target length and
    then zero-padded at the end up to exactly the target length, so the
    result always has shape ``(samples, target_shape[0], target_shape[1])``.
    Inputs that already have the target shape are returned as an unchanged
    copy.

    Args:
        X: array-like of shape ``(samples, a, b)``.
        target_shape: ``(rows, cols)`` of each output matrix.

    Returns:
        A new array of shape ``(samples, rows, cols)`` with the dtype of X;
        X itself is never modified.

    Raises:
        ValueError: if X is not 3-dimensional or ``target_shape`` does not
            have exactly two entries.
    """
    X = np.asarray(X)
    if X.ndim != 3:
        raise ValueError(f"Expected MFCCs of shape (samples, a, b), got {X.shape}")
    if len(target_shape) != 2:
        raise ValueError(f"target_shape must have two entries, got {target_shape}")

    rows, cols = target_shape

    # NOTE(review): axes are fitted positionally and never transposed. For the
    # (130, 13) inputs this means axis 1 is cropped 130 -> 40 and axis 2 is
    # zero-padded 13 -> 132. If (130, 13) is laid out as (frames, coefficients),
    # the output is not a (coefficients, frames) matrix — confirm the intended
    # layout before changing this, since cached models depend on it.
    cropped = X[:, :rows, :cols]

    # Writing into a zero-filled buffer crops and pads in one step and always
    # yields a fresh array.
    X_processed = np.zeros((X.shape[0], rows, cols), dtype=X.dtype)
    X_processed[:, :cropped.shape[1], :cropped.shape[2]] = cropped

    return X_processed

def prepare_mfcc_for_cnn(X, target_shape=(40, 132, 1)):
    """Prepare MFCCs for CNN input by adding a trailing channel axis.

    Args:
        X: MFCC batch accepted by ``prepare_mfcc_for_lstm``.
        target_shape: ``(rows, cols, channels)``. The first two entries are
            forwarded to ``prepare_mfcc_for_lstm``; exactly one channel axis
            is always appended.

    Returns:
        The array returned by ``prepare_mfcc_for_lstm`` with one extra
        trailing axis of length 1.
    """
    # Forward the 2-D part of the requested shape so a non-default
    # target_shape is passed on rather than dropped at this level.
    X_processed = prepare_mfcc_for_lstm(X, target_shape=tuple(target_shape[:2]))
    # Add channel dimension
    X_processed = X_processed[..., np.newaxis]
    return X_processed

# Prepare data
# The vocal features feed the LSTM (no channel axis); the accompaniment
# features feed the CNN (trailing channel axis added).
X_vocal_processed = prepare_mfcc_for_lstm(X_vocal)
X_accomp_processed = prepare_mfcc_for_cnn(X_accomp)

# Print the resulting shapes so a mismatch with the model input shapes is
# visible before any training starts.
print("Processed shapes:")
print(f"X_vocal (for LSTM): {X_vocal_processed.shape}")
print(f"X_accomp (for CNN): {X_accomp_processed.shape}")

# Encode labels
# LabelEncoder re-indexes the raw label values to consecutive integer ids,
# which is the form the sparse categorical loss used below works with.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
print(f"\nEncoded labels: {np.unique(y_encoded)}")

## Step 3: Split Data Properly

**FIX**: Use proper train/val/test split (60/20/20) with stratification

In [None]:
# First split: 60% train, 40% temp (for val and test)
# Both feature arrays and the labels go through the same call, so row i of
# the vocal split, the accompaniment split and the label split stay aligned.
X_vocal_train, X_vocal_temp, X_accomp_train, X_accomp_temp, y_train, y_temp = train_test_split(
    X_vocal_processed, X_accomp_processed, y_encoded, 
    test_size=0.4, stratify=y_encoded, random_state=42
)

# Second split: 50% of temp for val, 50% for test (20% each of total)
# Stratifying on y_temp keeps the class proportions equal in val and test.
X_vocal_val, X_vocal_test, X_accomp_val, X_accomp_test, y_val, y_test = train_test_split(
    X_vocal_temp, X_accomp_temp, y_temp, 
    test_size=0.5, stratify=y_temp, random_state=42
)

# Report each split's size and its share of the full dataset.
print("Data split:")
print(f"Train: {X_vocal_train.shape[0]} samples ({X_vocal_train.shape[0]/len(X_vocal_processed)*100:.1f}%)")
print(f"Val:   {X_vocal_val.shape[0]} samples ({X_vocal_val.shape[0]/len(X_vocal_processed)*100:.1f}%)")
print(f"Test:  {X_vocal_test.shape[0]} samples ({X_vocal_test.shape[0]/len(X_vocal_processed)*100:.1f}%)")

# Verify stratification
# np.bincount prints the number of samples per encoded class id in each split.
print("\nClass distribution:")
print(f"Train: {np.bincount(y_train)}")
print(f"Val:   {np.bincount(y_val)}")
print(f"Test:  {np.bincount(y_test)}")

## Step 4: Load or Train Base Models

**FIX**: Use consistent, clean architectures

In [None]:
def build_lstm_model(input_shape=(40, 132), num_classes=10):
    """Assemble the (uncompiled) bidirectional-LSTM classifier for vocal features.

    Two stacked bidirectional LSTM layers of 256 units are followed by a
    dense head of 256, 128 and 32 ReLU units, each with 50% dropout, and a
    softmax output with ``num_classes`` units.
    """
    model = Sequential()

    # Recurrent encoder: the first layer returns the full sequence so the
    # second one can consume it; the second returns only its final output.
    model.add(Bidirectional(LSTM(256, return_sequences=True), input_shape=input_shape))
    model.add(Bidirectional(LSTM(256)))

    # Dense head with dropout after every hidden layer.
    for hidden_units in (256, 128, 32):
        model.add(Dense(hidden_units, activation='relu'))
        model.add(Dropout(0.5))

    model.add(Dense(num_classes, activation='softmax'))
    return model

def build_cnn_model(input_shape=(40, 132, 1), num_classes=10):
    """Assemble the (uncompiled) CNN classifier for accompaniment features.

    Four 3x3 'same'-padded ReLU convolution stages (64, 128, 256, 512
    filters), each followed by 2x2 max pooling, feed a flattened dense head
    of 256 and 128 ReLU units with 50% dropout and a softmax output.
    """
    model = Sequential()

    # Convolutional feature extractor; only the first layer declares the input shape.
    for stage, n_filters in enumerate((64, 128, 256, 512)):
        conv_options = {'activation': 'relu', 'padding': 'same'}
        if stage == 0:
            conv_options['input_shape'] = input_shape
        model.add(Conv2D(n_filters, (3, 3), **conv_options))
        model.add(MaxPooling2D((2, 2)))

    model.add(Flatten())

    # Dense head with dropout after every hidden layer.
    for hidden_units in (256, 128):
        model.add(Dense(hidden_units, activation='relu'))
        model.add(Dropout(0.5))

    model.add(Dense(num_classes, activation='softmax'))
    return model

def _load_or_train_base_model(label, model_path, build_fn, X_train, y_train, X_val, y_val):
    """Return a base model, loading it from disk when a saved copy exists.

    Args:
        label: short model name used in the progress messages.
        model_path: file the model is loaded from / saved to.
        build_fn: zero-argument callable returning an uncompiled Keras model.
        X_train, y_train: training features and encoded labels.
        X_val, y_val: validation features and labels, used for early stopping
            and learning-rate reduction.

    Returns:
        ``(model, history)``; ``history`` is ``None`` when the model was
        loaded from ``model_path`` instead of being trained.
    """
    if os.path.exists(model_path):
        # NOTE(review): a cached model is presumably trained on this same
        # split (random_state=42); if it was trained on a different split,
        # validation/test rows may have been seen during training — confirm.
        print(f"Loading {label} model...")
        return load_model(model_path), None

    print(f"Training {label} model...")

    # Make sure the target directory exists before the long training run,
    # so saving at the end cannot fail on a missing directory.
    model_dir = os.path.dirname(model_path)
    if model_dir:
        os.makedirs(model_dir, exist_ok=True)

    model = build_fn()
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Stop when validation loss stalls and roll back to the best weights;
    # halve the learning rate on shorter plateaus first.
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-7)
    ]

    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        verbose=1
    )
    model.save(model_path)
    return model, history


# Load or build LSTM model (vocal features)
lstm_model, lstm_history = _load_or_train_base_model(
    "LSTM", lstm_model_path, build_lstm_model,
    X_vocal_train, y_train, X_vocal_val, y_val
)

# Load or build CNN model (accompaniment features)
cnn_model, cnn_history = _load_or_train_base_model(
    "CNN", cnn_model_path, build_cnn_model,
    X_accomp_train, y_train, X_accomp_val, y_val
)

print("\n✓ Base models ready")

## Step 5: Evaluate Individual Models

In [None]:
# Evaluate LSTM
# evaluate() is unpacked into (loss, accuracy) on the untouched test split.
# NOTE(review): the two-value unpacking assumes the model was compiled with
# exactly one metric — confirm this also holds for models loaded from disk.
lstm_loss, lstm_acc = lstm_model.evaluate(X_vocal_test, y_test, verbose=0)
print(f"LSTM Test Accuracy: {lstm_acc:.4f}")

# Evaluate CNN
# Same protocol, on the accompaniment features of the same test samples.
cnn_loss, cnn_acc = cnn_model.evaluate(X_accomp_test, y_test, verbose=0)
print(f"CNN Test Accuracy: {cnn_acc:.4f}")

## Step 6: Generate Predictions for Ensemble

**IMPORTANT**: Generate predictions on validation and test sets separately

In [None]:
print("Generating predictions...")

# Validation set predictions (for training meta-models)
# Each base model's predict() output on the validation split; these are what
# the voting weights and the stacking meta-models are fitted on.
lstm_probs_val = lstm_model.predict(X_vocal_val, verbose=0)
cnn_probs_val = cnn_model.predict(X_accomp_val, verbose=0)

# Test set predictions (for final evaluation)
# Kept separate from the validation outputs so nothing fitted on validation
# data is scored on the rows it was fitted on.
lstm_probs_test = lstm_model.predict(X_vocal_test, verbose=0)
cnn_probs_test = cnn_model.predict(X_accomp_test, verbose=0)

print(f"Validation predictions shape: {lstm_probs_val.shape}, {cnn_probs_val.shape}")
print(f"Test predictions shape: {lstm_probs_test.shape}, {cnn_probs_test.shape}")

## Step 7: Bagging Methods (Voting)

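**FIX**: Proper averaging and weighted voting. Note that these methods combine the outputs of two already-trained models; no bootstrap resampling is involved, so strictly speaking this is averaging/voting rather than bagging.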

In [None]:
print("\n=== BAGGING METHODS ===")

# Hard decision and top probability of each base model on the test set,
# computed once and reused by the max-voting rule below.
lstm_class_preds = np.argmax(lstm_probs_test, axis=1)
cnn_class_preds = np.argmax(cnn_probs_test, axis=1)
lstm_top_prob = np.max(lstm_probs_test, axis=1)
cnn_top_prob = np.max(cnn_probs_test, axis=1)

# 1. Simple Average (Equal weights)
mean_probs = (lstm_probs_test + cnn_probs_test) / 2
mean_preds = mean_probs.argmax(axis=1)
mean_accuracy = accuracy_score(y_test, mean_preds)
print(f"\n1. Mean Averaging Accuracy: {mean_accuracy:.4f}")

# 2. Weighted Voting: each model's weight is its share of the summed
#    validation accuracies, so the two weights add up to 1.
lstm_val_acc, cnn_val_acc = (
    accuracy_score(y_val, val_probs.argmax(axis=1))
    for val_probs in (lstm_probs_val, cnn_probs_val)
)
total_acc = lstm_val_acc + cnn_val_acc
w_lstm, w_cnn = lstm_val_acc / total_acc, cnn_val_acc / total_acc

print(f"\nWeights based on validation accuracy:")
print(f"LSTM: {w_lstm:.3f} (val_acc={lstm_val_acc:.4f})")
print(f"CNN:  {w_cnn:.3f} (val_acc={cnn_val_acc:.4f})")

weighted_probs = w_lstm * lstm_probs_test + w_cnn * cnn_probs_test
weighted_preds = weighted_probs.argmax(axis=1)
weighted_accuracy = accuracy_score(y_test, weighted_preds)
print(f"\n2. Weighted Voting Accuracy: {weighted_accuracy:.4f}")

# 3. Max Voting: per sample, take the class predicted by whichever model is
#    more confident; the CNN's prediction is used when the two are tied.
max_probs = np.maximum(lstm_top_prob, cnn_top_prob)
lstm_more_confident = lstm_top_prob > cnn_top_prob
max_preds = np.where(lstm_more_confident, lstm_class_preds, cnn_class_preds)
max_accuracy = accuracy_score(y_test, max_preds)
print(f"\n3. Max Voting Accuracy: {max_accuracy:.4f}")

## Step 8: Stacking Methods

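**FIX**: Train meta-models on the VALIDATION set and evaluate on the TEST set, so the test set is never used for fitting. Caveat: when the base models are trained in this notebook, the validation set also drives their early stopping and learning-rate schedule, so base-model probabilities on the validation set may be slightly optimistic; the test set remains untouched.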

In [None]:
print("\n=== STACKING METHODS ===")

# Number of folds for the out-of-fold validation estimate of the meta-models.
meta_cv_folds = 5

# Prepare stacking features: the two base models' class-probability vectors
# are placed side by side, one row per sample.
stacking_features_val = np.concatenate([lstm_probs_val, cnn_probs_val], axis=1)
stacking_features_test = np.concatenate([lstm_probs_test, cnn_probs_test], axis=1)

print(f"Stacking features shape - Val: {stacking_features_val.shape}, Test: {stacking_features_test.shape}")

# 1. Logistic Regression Meta-Model
print("\n1. Training Logistic Regression meta-model...")
lr_meta = LogisticRegression(max_iter=1000, random_state=42)
lr_meta.fit(stacking_features_val, y_val)

# The meta-model is fitted on the validation rows, so predicting those same
# rows with it would report training accuracy. cross_val_predict refits
# clones of the estimator on folds and returns out-of-fold predictions,
# leaving the fitted lr_meta untouched for the test-set predictions.
lr_val_preds = cross_val_predict(lr_meta, stacking_features_val, y_val, cv=meta_cv_folds)
lr_test_preds = lr_meta.predict(stacking_features_test)

lr_val_acc = accuracy_score(y_val, lr_val_preds)
lr_test_acc = accuracy_score(y_test, lr_test_preds)

print(f"Logistic Regression - Val Acc: {lr_val_acc:.4f}, Test Acc: {lr_test_acc:.4f}")

# 2. XGBoost Meta-Model
print("\n2. Training XGBoost meta-model...")
# `use_label_encoder` is deprecated in XGBoost and only triggers a warning on
# current releases; y_val is already integer-encoded, so it is not needed.
xgb_meta = XGBClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    random_state=42,
    eval_metric='mlogloss'
)
xgb_meta.fit(stacking_features_val, y_val)

# Out-of-fold validation predictions, for the same reason as above.
xgb_val_preds = cross_val_predict(xgb_meta, stacking_features_val, y_val, cv=meta_cv_folds)
xgb_test_preds = xgb_meta.predict(stacking_features_test)

xgb_val_acc = accuracy_score(y_val, xgb_val_preds)
xgb_test_acc = accuracy_score(y_test, xgb_test_preds)

print(f"XGBoost - Val Acc: {xgb_val_acc:.4f}, Test Acc: {xgb_test_acc:.4f}")

# 3. Neural Network Meta-Model
print("\n3. Training Neural Network meta-model...")

def build_meta_model(input_dim=20, num_classes=10):
    """Assemble the (uncompiled) dense meta-classifier for stacked features.

    Three ReLU hidden layers of 128, 64 and 32 units, each followed by 30%
    dropout, lead into a softmax output with ``num_classes`` units.
    """
    model = Sequential()
    for depth, hidden_units in enumerate((128, 64, 32)):
        # Only the first layer declares the width of the stacked feature vector.
        first_layer_options = {'input_dim': input_dim} if depth == 0 else {}
        model.add(Dense(hidden_units, activation='relu', **first_layer_options))
        model.add(Dropout(0.3))
    model.add(Dense(num_classes, activation='softmax'))
    return model

# Size the meta-model from the data: one input per stacked probability and
# one output per configured class.
nn_meta = build_meta_model(
    input_dim=stacking_features_val.shape[1],
    num_classes=num_classes
)
nn_meta.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Further split validation set for meta-model training: 80% fits the
# meta-model, 20% is held out for early stopping and the validation score.
X_meta_train, X_meta_val, y_meta_train, y_meta_val = train_test_split(
    stacking_features_val, y_val, test_size=0.2, stratify=y_val, random_state=42
)

nn_history = nn_meta.fit(
    X_meta_train, y_meta_train,
    validation_data=(X_meta_val, y_meta_val),
    epochs=50,
    batch_size=32,
    callbacks=[EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)],
    verbose=0
)

# Score validation accuracy on the held-out 20% only. Evaluating on the whole
# validation set would include the 80% the meta-model was trained on and
# report an inflated number. (This split also drove early stopping, so it is
# still a mildly optimistic estimate; the test score is the unbiased one.)
nn_val_loss, nn_val_acc = nn_meta.evaluate(X_meta_val, y_meta_val, verbose=0)
nn_test_loss, nn_test_acc = nn_meta.evaluate(stacking_features_test, y_test, verbose=0)

print(f"Neural Network - Val Acc: {nn_val_acc:.4f}, Test Acc: {nn_test_acc:.4f}")

## Step 9: Summary of Results

In [None]:
banner = "=" * 60

print("\n" + banner)
print("ENSEMBLE RESULTS SUMMARY")
print(banner)

# (method name, test accuracy) pairs, ordered from best to worst.
results = sorted(
    [
        ("LSTM (individual)", lstm_acc),
        ("CNN (individual)", cnn_acc),
        ("Mean Averaging", mean_accuracy),
        ("Weighted Voting", weighted_accuracy),
        ("Max Voting", max_accuracy),
        ("Stacking - Logistic Regression", lr_test_acc),
        ("Stacking - XGBoost", xgb_test_acc),
        ("Stacking - Neural Network", nn_test_acc),
    ],
    key=lambda entry: entry[1],
    reverse=True,
)

# Baseline for the "(+gain)" suffix: the stronger of the two base models.
best_individual = max(lstm_acc, cnn_acc)

print("\nTest Accuracy Rankings:\n")
for rank, (name, acc) in enumerate(results, start=1):
    # Only methods that beat the best single model get a gain annotation.
    improvement = f" (+{acc - best_individual:.4f})" if acc > best_individual else ""
    print(f"{rank}. {name:.<45} {acc:.4f}{improvement}")

print("\n" + banner)

## Step 10: Visualize Best Model Performance

In [None]:
# Pick the best performing method from the measured accuracies instead of
# assuming a particular one wins.
nn_test_preds = np.argmax(nn_meta.predict(stacking_features_test, verbose=0), axis=1)

# Hard test-set predictions of every method, keyed by the same names that
# appear in the results summary.
candidate_preds = {
    "LSTM (individual)": lstm_class_preds,
    "CNN (individual)": cnn_class_preds,
    "Mean Averaging": mean_preds,
    "Weighted Voting": weighted_preds,
    "Max Voting": max_preds,
    "Stacking - Logistic Regression": lr_test_preds,
    "Stacking - XGBoost": xgb_test_preds,
    "Stacking - Neural Network": nn_test_preds,
}
candidate_accs = {
    method: accuracy_score(y_test, preds) for method, preds in candidate_preds.items()
}

# On a tie the first method in the dictionary order above is kept.
# Note: the winner is chosen by test accuracy (the same ranking as the
# summary), so its test accuracy is an optimistic estimate.
best_name = max(candidate_accs, key=candidate_accs.get)
best_preds = candidate_preds[best_name]
best_acc = candidate_accs[best_name]

# Pin the label set to every class id so the matrix always has one row and
# column per genre and lines up with the tick labels, even if some class is
# missing from y_test or from the predictions.
class_ids = np.arange(num_classes)

# Confusion Matrix
cm = confusion_matrix(y_test, best_preds, labels=class_ids)

plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=genres, yticklabels=genres,
            cbar_kws={'label': 'Count'})
plt.xlabel('Predicted Genre', fontsize=12)
plt.ylabel('True Genre', fontsize=12)
plt.title(f'Confusion Matrix - {best_name}\nTest Accuracy: {best_acc:.4f}', fontsize=14)
plt.tight_layout()
plt.show()

# Classification Report
print("\nClassification Report:")
print(classification_report(y_test, best_preds, labels=class_ids, target_names=genres))

## Step 11: Compare All Methods Visually

In [None]:
def _method_colour(method_name):
    """Map a method name to its bar colour: base model, voting/averaging, or stacking."""
    if 'individual' in method_name.lower():
        return '#FF6B6B'
    if 'Voting' in method_name or 'Averaging' in method_name:
        return '#4ECDC4'
    return '#45B7D1'


# Bar plot comparison: one horizontal bar per method, in the summary's order.
methods = [name for name, _ in results]
accuracies = [score for _, score in results]

plt.figure(figsize=(12, 6))
colors = [_method_colour(method) for method in methods]
bars = plt.barh(methods, accuracies, color=colors)

# Write each accuracy just to the right of the end of its bar.
for row, acc in enumerate(accuracies):
    plt.text(acc + 0.005, row, f'{acc:.4f}', va='center', fontsize=10)

plt.xlabel('Test Accuracy', fontsize=12)
plt.title('Comparison of Ensemble Methods', fontsize=14, fontweight='bold')
plt.xlim(0, 1.0)
plt.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()

print("\n✓ Ensemble learning analysis complete!")