In [24]:
import os
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
import tensorflow as tf
import logging

from scikeras.wrappers import KerasClassifier
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.compose import ColumnTransformer, make_column_selector as selector
from sklearn.utils.class_weight import compute_class_weight
from ast import literal_eval

### Load and explore data files

In [2]:
# Load data
X_train = pd.read_csv("../data/X_train.csv")
X_test = pd.read_csv("../data/X_test.csv")
y_train = pd.read_csv("../data/y_train.csv").squeeze()
y_test = pd.read_csv("../data/y_test.csv").squeeze()

# Print exploratory dataset information
print("Train shape:", X_train.shape)
print("Test shape:", X_test.shape)
print("\nLabel distribution in training set:")
print(y_train.value_counts(normalize=True))

Train shape: (796, 595)
Test shape: (199, 595)

Label distribution in training set:
y
2    0.201005
6    0.154523
1    0.153266
0    0.141960
4    0.136935
3    0.115578
5    0.096734
Name: proportion, dtype: float64


In [3]:
# Use non-OpenL3 features
def drop_l3_embeddings(df):
    def is_l3(col):
        if not col.startswith("e"): return False
        suf = col[1:]
        if not suf.isdigit(): return False
        idx = int(suf)
        return 0 <= idx <= 511
    cols_to_drop = [c for c in df.columns if is_l3(c)]
    return df.drop(columns=cols_to_drop, errors="ignore")

X_train_f = drop_l3_embeddings(X_train)
X_test_f  = drop_l3_embeddings(X_test)

print("Shapes (features):", X_train_f.shape, X_test_f.shape)

Shapes (features): (796, 83) (199, 83)


### Train and evaluate DL models

In [6]:
# Evaluate function
def evaluate(name, est):
    est.fit(X_train_f, y_train)
    y_pred = est.predict(X_test_f)
    acc = accuracy_score(y_test, y_pred)
    mf1 = f1_score(y_test, y_pred, average="macro")
    print(f"\n=== {name} ===")
    print("Accuracy:", acc)
    print("Macro-F1:", mf1)
    print(classification_report(y_test, y_pred, zero_division=0)) # i am not exactly sure what zero division does but it helped me get rid of an annoying warning
    return name, acc, mf1

# Define CV splits
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

In [None]:
# Compute class weights for future data balancing
classes = np.unique(y_train)
cw = compute_class_weight('balanced', classes=classes, y=y_train)
class_weight = dict(zip(classes, cw))

# Constants for model building
n_classes = len(np.unique(y_train))
input_dim = X_train_f.shape[1]

#### Model 1: Simple feed-forward neural network (FFNN) with a dropout layer

In [76]:
def build_ffnn(hidden_units, hidden_layers, dropout, lr):
    inputs = keras.Input(shape=(input_dim,))
    x = inputs
    for _ in range(hidden_layers):
        x = keras.layers.Dense(hidden_units, activation="relu")(x)
        x = keras.layers.Dropout(dropout)(x)
        x = keras.layers.BatchNormalization()(x)
    outputs = keras.layers.Dense(n_classes, activation="softmax")(x)
    model = keras.Model(inputs, outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

pipe_ffnn = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", KerasClassifier(
        model=build_ffnn,
        epochs=80,
        batch_size=64,
        verbose=0,
        validation_split = 0.15,
        callbacks=[keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=10, restore_best_weights=True
        )]
    ))
])

param_grid_ffnn = {
    "clf__model__hidden_units": [64, 128],
    "clf__model__hidden_layers": [2, 3, 4],
    "clf__model__dropout": [0.2, 0.3, 0.4],
    "clf__model__lr": [1e-2, 1e-3, 1e-4],
}

gs_ffnn = GridSearchCV(
    pipe_ffnn, param_grid_ffnn,
    scoring="f1_macro", cv=cv, n_jobs=-1, verbose=0
)

In [None]:
gs_ffnn.fit(X_train_f, y_train, clf__class_weight=class_weight)

#### Model 2: FFNN with dense skip connections

In [79]:
def build_ds_ffnn(meta, hidden_units, hidden_layers, dropout, lr):
    input_dim = meta["n_features_in_"]
    n_classes = meta["n_classes_"]
    
    inputs = keras.Input(shape=(input_dim,))
    x = keras.layers.Dense(hidden_units, activation="relu")(inputs)
    skips = [x]

    for _ in range(hidden_layers - 1):
        y = keras.layers.Concatenate()(skips)
        y = keras.layers.Dense(hidden_units, activation="relu")(y)
        y = keras.layers.Dropout(dropout)(y)
        skips.append(y)

    x = keras.layers.Concatenate()(skips)
    outputs = keras.layers.Dense(n_classes, activation="softmax")(x)

    model = keras.Model(inputs, outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

pipe_ds_ffnn = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", KerasClassifier(
        model=build_ds_ffnn,
        epochs=80,
        batch_size=64,
        verbose=0,
        validation_split = 0.15,
        callbacks=[keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=10, restore_best_weights=True
        )]
    ))
])

param_grid_ds_ffnn = {
    "clf__model__hidden_units": [64, 128, 256],
    "clf__model__hidden_layers": [2, 3, 4],
    "clf__model__dropout": [0.1, 0.2],
    "clf__model__lr": [1e-3, 1e-4],
}

gs_ds_ffnn = GridSearchCV(
    pipe_ds_ffnn, param_grid_ds_ffnn,
    scoring="f1_macro", cv=cv, n_jobs=-1, verbose=0
)

In [80]:
gs_ds_ffnn.fit(X_train_f, y_train, clf__class_weight=class_weight)



#### Evaluate and compare models

In [81]:
rows = []
rows.append(evaluate("Simple Feed-Forward Network (scaled, balanced)", gs_ffnn.best_estimator_))
rows.append(evaluate("DenseNet (scaled, balanced)", gs_ds_ffnn.best_estimator_))

# Prints summaries, also outputs best parameters for function
summary = pd.DataFrame(rows, columns=["model","test_accuracy","test_macro_f1"]).sort_values("test_macro_f1", ascending=False)
print("Summaries (sorted on macro_f1)")
print(summary)
print("\nBest params:")
print("- FFNN:", gs_ffnn.best_params_)
print("- DenseNet:", gs_ds_ffnn.best_params_)


=== Simple Feed-Forward Network (scaled, balanced) ===
Accuracy: 0.24623115577889448
Macro-F1: 0.1794031396534996
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        28
           1       0.47      0.27      0.34        30
           2       0.27      0.60      0.38        40
           3       0.29      0.26      0.27        23
           4       0.12      0.04      0.06        27
           5       0.00      0.00      0.00        20
           6       0.16      0.32      0.21        31

    accuracy                           0.25       199
   macro avg       0.19      0.21      0.18       199
weighted avg       0.20      0.25      0.20       199


=== DenseNet (scaled, balanced) ===
Accuracy: 0.271356783919598
Macro-F1: 0.23248629394591108
              precision    recall  f1-score   support

           0       0.21      0.14      0.17        28
           1       0.19      0.10      0.13        30
           2       0.33      0

### Train and evaluate DenseNet with feature selection

In [62]:
# Drop unselected features
with open("../outputs/selected_features.unbalanced.json") as f:
    selected_features = json.load(f)

X_train_fs = X_train_f[selected_features]
X_test_fs = X_test_f[selected_features]

print("Shapes (features):", X_train_fs.shape, X_test_fs.shape)

Shapes (features): (796, 68) (199, 68)


In [65]:
param_grid_ds_ffnn_f = {
    "clf__model__hidden_units": [64, 128, 256],
    "clf__model__hidden_layers": [2, 3, 4],
    "clf__model__dropout": [0, 0.1, 0.2],
    "clf__model__lr": [1e-3, 1e-4],
}

gs_ds_ffnn_f = GridSearchCV(
    pipe_ds_ffnn, param_grid_ds_ffnn,
    scoring="f1_macro", cv=cv, n_jobs=-1, verbose=0
)

gs_ds_ffnn_f.fit(X_train_fs, y_train, clf__class_weight=class_weight)

In [75]:
best = gs_ds_ffnn_f.best_estimator_
print("\n Best params:", gs_ds_ffnn_f.best_params_)
print("Best CV Macro-F1:", round(gs_ds_ffnn_f.best_score_, 3))

y_pred = best.predict(X_test_fs)
print("\n DenseNet + Feature Selection")
print("Accuracy:", round(accuracy_score(y_test, y_pred), 3))
print("Macro-F1:", round(f1_score(y_test, y_pred, average='macro'), 3))
print(classification_report(y_test, y_pred, zero_division=0))


 Best params: {'clf__model__dropout': 0.2, 'clf__model__hidden_layers': 3, 'clf__model__hidden_units': 128, 'clf__model__lr': 0.001}
Best CV Macro-F1: 0.234

 DenseNet + Feature Selection
Accuracy: 0.266
Macro-F1: 0.246
              precision    recall  f1-score   support

           0       0.14      0.11      0.12        28
           1       0.25      0.23      0.24        30
           2       0.43      0.45      0.44        40
           3       0.23      0.35      0.28        23
           4       0.27      0.33      0.30        27
           5       0.21      0.30      0.24        20
           6       0.20      0.06      0.10        31

    accuracy                           0.27       199
   macro avg       0.25      0.26      0.25       199
weighted avg       0.26      0.27      0.25       199



### Train and evaluate DenseNet (with response_id)

#### Prepare dataset with response_id

In [68]:
df_r_all= pd.read_csv("../data/combined_openl3_audiofeatures.csv")
df_r = drop_l3_embeddings(df_r_all)
df_r = df_r.drop(columns="track_id")

df_r['strategy'] = df_r['strategy'].apply(
    lambda s: literal_eval(s) if isinstance(s, str) and s.strip().startswith('[') else s
)

def process_multi_strategies(x):
    """
    If songs occur more than once, but for the same strategy (e.g. [('solace', 2)]) keep the strategy
    """
    if isinstance(x,str):
        # keep current x
        return x
    elif isinstance(x,list) and len(x) == 1:
        new_x = x[0][0]
        return new_x
    elif isinstance(x,list) and len(x) > 1:
        return np.nan

df_r['strategy'] = df_r['strategy'].apply(process_multi_strategies)

# drop tracks corresponding to multiple strategies
df_r = df_r.dropna(subset=['strategy'])

X_r = df_r.drop(columns='strategy')
y_r = df_r['strategy']

# Encode strategies into numerical labels
label_mapping = { 
    0:'discharge', 
    1:'diversion', 
    2:'entertainment', 
    3:'mental_work',
    4:'revival', 
    5:'solace', 
    6:'strong_sensation'
}

inverse_label_mapping = {v: k for k, v in label_mapping.items()}
y_r_enc = y_r.map(inverse_label_mapping)

X_train_r, X_test_r, y_train_r, y_test_r = train_test_split(X_r, y_r_enc, train_size=0.8, random_state=42, shuffle=True, stratify=y_r)

print("Train shape:", X_train_r.shape)
print("Test shape:", X_test_r.shape)
print("\nLabel distribution in training set:")
print(y_train_r.value_counts(normalize=True))

Train shape: (796, 70)
Test shape: (199, 70)

Label distribution in training set:
strategy
2    0.201005
6    0.154523
1    0.153266
0    0.141960
4    0.136935
3    0.115578
5    0.096734
Name: proportion, dtype: float64


#### Train and evaluate

In [72]:
cat_cols = selector(dtype_include=['object', 'category', 'string'])(X_train_r)
num_cols = selector(dtype_include=['number', 'bool'])(X_train_r)

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", Pipeline([
            ("scaler", StandardScaler()), 
        ]), num_cols),
    ],
)

pipe_ds_ffnn_r = Pipeline([
    ("preprocess", preprocess), 
    ("clf", KerasClassifier(
        model=build_ds_ffnn,  
        epochs=80,
        batch_size=64,
        verbose=0,
        validation_split=0.15,
        callbacks=[keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=10, restore_best_weights=True
        )]
    ))
])

param_grid_ds_ffnn_r = {
    "clf__model__hidden_units": [64, 128, 256],
    "clf__model__hidden_layers": [3, 4, 5],
    "clf__model__dropout": [0, 0.1, 0.2],
    "clf__model__lr": [1e-3, 1e-4],
}

gs_ds_ffnn_r = GridSearchCV(
    pipe_ds_ffnn_r, param_grid_ds_ffnn_r,
    scoring="f1_macro", cv=cv, n_jobs=-1, verbose=0
)

gs_ds_ffnn_r.fit(X_train_r, y_train_r, clf__class_weight=class_weight)



In [74]:
best = gs_ds_ffnn_r.best_estimator_
print("\n Best params:", gs_ds_ffnn_r.best_params_)
print("Best CV Macro-F1:", round(gs_ds_ffnn_r.best_score_, 3))

y_pred = best.predict(X_test_r)
print("\n DenseNet with response_id")
print("Accuracy:", round(accuracy_score(y_test_r, y_pred), 3))
print("Macro-F1:", round(f1_score(y_test_r, y_pred, average='macro'), 3))
print(classification_report(y_test_r, y_pred, zero_division=0))


 Best params: {'clf__model__dropout': 0.2, 'clf__model__hidden_layers': 3, 'clf__model__hidden_units': 256, 'clf__model__lr': 0.001}
Best CV Macro-F1: 0.243

 DenseNet with response_id
Accuracy: 0.281
Macro-F1: 0.256
              precision    recall  f1-score   support

           0       0.21      0.11      0.14        28
           1       0.28      0.23      0.25        30
           2       0.41      0.53      0.46        40
           3       0.28      0.22      0.24        23
           4       0.24      0.37      0.29        27
           5       0.17      0.25      0.20        20
           6       0.24      0.16      0.19        31

    accuracy                           0.28       199
   macro avg       0.26      0.27      0.26       199
weighted avg       0.27      0.28      0.27       199

