In [2]:
import json, math, time, numpy as np, pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, accuracy_score

import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Paths
ROOT = Path.cwd().parents[1]  # 'web/model-training/notebooks' -> ROOT='web'
PARQUET = ROOT / "model-training" / "output" / "embeddings_panns.parquet"
OUT_DIR = ROOT / "model-training" / "output"
OUT_DIR.mkdir(parents=True, exist_ok=True)

MODEL_PATH   = OUT_DIR / "multitask_head.pt"
SCALERS_JSON = OUT_DIR / "regression_scalers.json"
CLASSMAP_JSON= OUT_DIR / "class_maps.json"
METRICS_JSON = OUT_DIR / "metrics_multitask.json"

# Training config
RANDOM_SEED = 42
BATCH_SIZE  = 64
EPOCHS      = 20
LR          = 1e-3
WEIGHT_DECAY= 1e-4
HIDDEN      = 512
DROPOUT     = 0.2
device = torch.device("cpu")  # keep CPU for stability

# Targets
REG_COLS = [
    "acousticness","danceability","energy","instrumentalness","liveness",
    "loudness","speechiness","tempo","valence",
    "duration_ms"  # weak signal (we’ll down‑weight)
]
CLS_COLS = ["key","mode","time_signature"]

LOSS_WEIGHTS = {c:1.0 for c in REG_COLS}
LOSS_WEIGHTS["duration_ms"] = 0.2  # deemphasize duration (previews are ~30s)

In [4]:
# CELL 2 (FIXED): normalize label columns, standardize regression targets, build class maps

import re
import numpy as np
import pandas as pd

df = pd.read_parquet(PARQUET)
emb_cols = [c for c in df.columns if c.startswith("e")]
X = df[emb_cols].values.astype("float32")

# --- Helpers to normalize categorical labels ---

KEY_TO_INT = {
    "C":0, "C#":1, "D":2, "D#":3, "E":4, "F":5, "F#":6, "G":7, "G#":8, "A":9, "A#":10, "B":11
}
FLAT_TO_SHARP = {"DB":"C#", "EB":"D#", "GB":"F#", "AB":"G#", "BB":"A#"}

def normalize_key(val):
    """
    Accepts strings like 'C#', 'Db', 'A♭', 'C#/Db', 'C major', 'g#', etc.
    Returns int 0..11 or np.nan.
    """
    if pd.isna(val): return np.nan
    s = str(val).strip()
    # unify unicode accidentals
    s = s.replace("♯", "#").replace("♭", "b")
    # take first token (drop things like "C major" or "C#/Db")
    s = re.split(r"[\/\s]+", s)[0]
    s = s.upper()

    # map flats to sharps
    if s in FLAT_TO_SHARP:
        s = FLAT_TO_SHARP[s]

    # sometimes datasets include weird tags like 'N' or '-1'
    if s in KEY_TO_INT:
        return KEY_TO_INT[s]

    # if it already looks numeric (e.g., '0'..'11'), keep it
    try:
        v = int(s)
        if 0 <= v <= 11:
            return v
    except Exception:
        pass
    return np.nan

def normalize_mode(val):
    """
    Accepts 'Major'/'Minor', 'major'/'minor', 1/0, '1'/'0'.
    Spotify convention: major=1, minor=0.
    """
    if pd.isna(val): return np.nan
    s = str(val).strip().lower()
    if s in {"major", "maj"}: return 1
    if s in {"minor", "min"}: return 0
    # numeric-ish
    try:
        v = int(float(s))
        if v in (0, 1): return v
    except Exception:
        pass
    return np.nan

def normalize_time_signature(val):
    """
    Accepts '4/4', '3/4', '5', 4, etc. Returns int top-number (e.g., 4).
    """
    if pd.isna(val): return np.nan
    s = str(val).strip()
    m = re.search(r"\d+", s)
    if m:
        try:
            return int(m.group(0))
        except Exception:
            return np.nan
    return np.nan

# --- Apply normalization to categorical targets ---
df["key_norm"]  = df["key"].apply(normalize_key)              if "key" in df.columns  else np.nan
df["mode_norm"] = df["mode"].apply(normalize_mode)            if "mode" in df.columns else np.nan
df["tsig_norm"] = df["time_signature"].apply(normalize_time_signature) if "time_signature" in df.columns else np.nan

# --- Build mask for rows that have all required targets present ---
needed = REG_COLS + ["key_norm", "mode_norm", "tsig_norm"]
mask = df[needed].notna().all(axis=1)
df = df[mask].reset_index(drop=True)

# Update X after filtering
X = df[emb_cols].values.astype("float32")

print(f"After normalization & filtering, rows = {len(df)}")
print("Key class examples:", sorted(pd.unique(df['key_norm']))[:12])
print("Mode class examples:", sorted(pd.unique(df['mode_norm'])))
print("Time sig examples:", sorted(pd.unique(df['tsig_norm'])))

# --- Standardize regression targets (with safe transforms) ---
reg_means, reg_stds, Y_reg = {}, {}, {}
for c in REG_COLS:
    y = df[c].astype("float32").values
    # transforms for skewed targets
    if c == "tempo":
        y = np.log1p(np.clip(y, 0, None))         # log(BPM)
    if c == "duration_ms":
        y = np.log1p(np.clip(y, 1.0, None))       # previews make this weak; still normalize
    # loudness: already in dB; just standardize
    m, s = float(y.mean()), float(y.std() + 1e-8)
    reg_means[c], reg_stds[c] = m, s
    Y_reg[c] = ((y - m) / s).astype("float32")

# --- Build class maps (value -> index) for categorical targets ---
class_maps, Y_cls = {}, {}

# key
key_classes = sorted(pd.unique(df["key_norm"]).astype(int).tolist())
key_c2i = {v:i for i,v in enumerate(key_classes)}
class_maps["key"] = {"classes": key_classes, "cls_to_idx": key_c2i}
Y_cls["key"] = np.array([key_c2i[int(v)] for v in df["key_norm"].values], dtype="int64")

# mode
mode_classes = sorted(pd.unique(df["mode_norm"]).astype(int).tolist())
mode_c2i = {v:i for i,v in enumerate(mode_classes)}
class_maps["mode"] = {"classes": mode_classes, "cls_to_idx": mode_c2i}
Y_cls["mode"] = np.array([mode_c2i[int(v)] for v in df["mode_norm"].values], dtype="int64")

# time_signature
tsig_classes = sorted(pd.unique(df["tsig_norm"]).astype(int).tolist())
tsig_c2i = {v:i for i,v in enumerate(tsig_classes)}
class_maps["time_signature"] = {"classes": tsig_classes, "cls_to_idx": tsig_c2i}
Y_cls["time_signature"] = np.array([tsig_c2i[int(v)] for v in df["tsig_norm"].values], dtype="int64")

print("Class sizes:", {k: len(v["classes"]) for k,v in class_maps.items()})

# --- Train/val split (stratify by key to keep pitch distribution balanced) ---
from sklearn.model_selection import train_test_split
X_tr, X_va, idx_tr, idx_va = train_test_split(
    X, np.arange(len(X)), test_size=0.2, random_state=RANDOM_SEED, stratify=Y_cls["key"]
)

# Helper to split dicts of arrays based on indices
def split_dict(d, tr_idx, va_idx):
    return {k: v[tr_idx] for k,v in d.items()}, {k: v[va_idx] for k,v in d.items()}

Yreg_tr, Yreg_va = split_dict(Y_reg, idx_tr, idx_va)
Ycls_tr, Ycls_va = split_dict(Y_cls, idx_tr, idx_va)

print("Train size:", len(idx_tr), " Val size:", len(idx_va))
print("Embedding dim:", X.shape[1])

After normalization & filtering, rows = 6000
Key class examples: [np.int64(0), np.int64(1), np.int64(2), np.int64(3), np.int64(4), np.int64(5), np.int64(6), np.int64(7), np.int64(8), np.int64(9), np.int64(10), np.int64(11)]
Mode class examples: [np.int64(0), np.int64(1)]
Time sig examples: [np.int64(1), np.int64(3), np.int64(4), np.int64(5)]
Class sizes: {'key': 12, 'mode': 2, 'time_signature': 4}
Train size: 4800  Val size: 1200
Embedding dim: 2049


In [5]:
# CELL 3: PyTorch Dataset/DataLoader and multi-task model definition

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# --- Dataset that returns x plus each target by name ---
class MTSet(Dataset):
    def __init__(self, X, yreg, ycls):
        self.X = X
        self.yreg = yreg
        self.ycls = ycls
        self.reg_cols = list(yreg.keys())
        self.cls_cols = list(ycls.keys())
    def __len__(self): 
        return len(self.X)
    def __getitem__(self, i):
        item = {"x": self.X[i]}
        # standardized regression targets
        for c in self.reg_cols:
            item[f"reg_{c}"] = self.yreg[c][i]
        # class indices for classification targets
        for c in self.cls_cols:
            item[f"cls_{c}"] = self.ycls[c][i]
        return item

train_ds = MTSet(X_tr, Yreg_tr, Ycls_tr)
val_ds   = MTSet(X_va, Yreg_va, Ycls_va)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=False)
val_loader   = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)

# --- Simple heads ---
class HeadReg(nn.Module):
    def __init__(self, d_in):
        super().__init__()
        self.fc = nn.Linear(d_in, 1)
    def forward(self, h):
        return self.fc(h).squeeze(1)  # shape: (B,)

class HeadCls(nn.Module):
    def __init__(self, d_in, n_classes):
        super().__init__()
        self.fc = nn.Linear(d_in, n_classes)
    def forward(self, h):
        return self.fc(h)  # logits shape: (B, C)

# --- Multi-task model: shared trunk + per-task heads ---
class MTModel(nn.Module):
    def __init__(self, d_in, hidden, dropout, class_maps, reg_cols):
        super().__init__()
        self.trunk = nn.Sequential(
            nn.Linear(d_in, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, hidden // 2),
            nn.ReLU(),
        )
        h = hidden // 2
        self.reg_cols = list(reg_cols)
        self.cls_cols = list(class_maps.keys())
        # create a tiny head per target
        self.reg_heads = nn.ModuleDict({c: HeadReg(h) for c in self.reg_cols})
        self.cls_heads = nn.ModuleDict({c: HeadCls(h, len(class_maps[c]["classes"])) for c in self.cls_cols})

    def forward(self, x):
        h = self.trunk(x)
        out = {}
        for c, head in self.reg_heads.items():
            out[f"reg_{c}"] = head(h)
        for c, head in self.cls_heads.items():
            out[f"cls_{c}"] = head(h)
        return out

# Instantiate model & training components
device = torch.device("cpu")  # keep CPU for stability on M1
model = MTModel(X.shape[1], HIDDEN, DROPOUT, class_maps, REG_COLS).to(device)

opt = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
reg_loss = nn.SmoothL1Loss()
cls_loss = nn.CrossEntropyLoss()

print(model)

MTModel(
  (trunk): Sequential(
    (0): Linear(in_features=2049, out_features=512, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.2, inplace=False)
    (3): Linear(in_features=512, out_features=256, bias=True)
    (4): ReLU()
  )
  (reg_heads): ModuleDict(
    (acousticness): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (danceability): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (energy): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (instrumentalness): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (liveness): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (loudness): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (speechiness): HeadReg(
      (fc): Linear(in_features=256, out_features=1, bias=True)
    )
    (tempo): HeadReg(
      (fc): Linear(in_features=256, out_feat

In [6]:
# CELL 4: training + validation loop with per-target metrics and best-model saving

from sklearn.metrics import mean_absolute_error, r2_score, accuracy_score
import numpy as np
import torch

def train_epoch():
    model.train()
    total_loss, total_count = 0.0, 0
    for batch in train_loader:
        x = torch.as_tensor(batch["x"], dtype=torch.float32, device=device)
        out = model(x)

        # total multi-task loss = sum(regression heads) + sum(classification heads)
        loss = 0.0
        # regression heads: standardized targets
        for c in REG_COLS:
            y = torch.as_tensor(batch[f"reg_{c}"], dtype=torch.float32, device=device)
            loss = loss + LOSS_WEIGHTS[c] * reg_loss(out[f"reg_{c}"], y)
        # classification heads: class indices
        for c in CLS_COLS:
            y = torch.as_tensor(batch[f"cls_{c}"], dtype=torch.long, device=device)
            loss = loss + cls_loss(out[f"cls_{c}"], y)

        opt.zero_grad()
        loss.backward()
        opt.step()

        total_loss += loss.item() * x.size(0)
        total_count += x.size(0)
    return total_loss / total_count


@torch.no_grad()
def eval_epoch():
    model.eval()
    total_loss, total_count = 0.0, 0

    # store predictions/targets to compute metrics after the loop
    preds_reg = {c: [] for c in REG_COLS}
    trues_reg = {c: [] for c in REG_COLS}
    preds_cls = {c: [] for c in CLS_COLS}
    trues_cls = {c: [] for c in CLS_COLS}

    for batch in val_loader:
        x = torch.as_tensor(batch["x"], dtype=torch.float32, device=device)
        out = model(x)

        loss = 0.0
        for c in REG_COLS:
            y = torch.as_tensor(batch[f"reg_{c}"], dtype=torch.float32, device=device)
            p = out[f"reg_{c}"]
            loss += LOSS_WEIGHTS[c] * reg_loss(p, y)
            preds_reg[c].append(p.cpu().numpy())
            trues_reg[c].append(y.cpu().numpy())

        for c in CLS_COLS:
            y = torch.as_tensor(batch[f"cls_{c}"], dtype=torch.long, device=device)
            p = out[f"cls_{c}"]
            loss += cls_loss(p, y)
            preds_cls[c].append(p.argmax(1).cpu().numpy())
            trues_cls[c].append(y.cpu().numpy())

        total_loss += loss.item() * x.size(0)
        total_count += x.size(0)

    # aggregate metrics
    metrics = {"loss": total_loss / total_count}

    # Regression metrics in ORIGINAL units: invert standardization & any log transforms
    for c in REG_COLS:
        y_std = np.concatenate(trues_reg[c])
        p_std = np.concatenate(preds_reg[c])
        m, s = reg_means[c], reg_stds[c]
        y = y_std * s + m
        p = p_std * s + m
        if c == "tempo":
            y = np.expm1(y); p = np.expm1(p)
        if c == "duration_ms":
            y = np.expm1(y); p = np.expm1(p)
        metrics[f"{c}_MAE"] = float(mean_absolute_error(y, p))
        metrics[f"{c}_R2"]  = float(r2_score(y, p))

    # Classification accuracy
    for c in CLS_COLS:
        y = np.concatenate(trues_cls[c])
        p = np.concatenate(preds_cls[c])
        metrics[f"{c}_ACC"] = float(accuracy_score(y, p))

    return metrics


# ---- Run training ----
best = None
for epoch in range(1, EPOCHS + 1):
    tr_loss = train_epoch()
    val = eval_epoch()

    # concise status line (you can print more metrics if you want)
    status = (f"Epoch {epoch:02d} | train {tr_loss:.4f} | val {val['loss']:.4f} | "
              f"key_ACC {val['key_ACC']:.3f} | mode_ACC {val['mode_ACC']:.3f} | "
              f"time_sig_ACC {val['time_signature_ACC']:.3f}")
    print(status)

    # save best by validation loss
    if best is None or val["loss"] < best["loss"]:
        best = {"epoch": epoch, **val}
        torch.save(model.state_dict(), MODEL_PATH)

print("\nBest (by val loss):", best)

Epoch 01 | train 5.8961 | val 5.4085 | key_ACC 0.138 | mode_ACC 0.664 | time_sig_ACC 0.881
Epoch 02 | train 5.4147 | val 5.3610 | key_ACC 0.143 | mode_ACC 0.665 | time_sig_ACC 0.877
Epoch 03 | train 5.3268 | val 5.3159 | key_ACC 0.132 | mode_ACC 0.653 | time_sig_ACC 0.874
Epoch 04 | train 5.2712 | val 5.2869 | key_ACC 0.146 | mode_ACC 0.660 | time_sig_ACC 0.874
Epoch 05 | train 5.1993 | val 5.2839 | key_ACC 0.147 | mode_ACC 0.665 | time_sig_ACC 0.874
Epoch 06 | train 5.1462 | val 5.2250 | key_ACC 0.147 | mode_ACC 0.673 | time_sig_ACC 0.876
Epoch 07 | train 5.0864 | val 5.2995 | key_ACC 0.147 | mode_ACC 0.634 | time_sig_ACC 0.873
Epoch 08 | train 5.0454 | val 5.1902 | key_ACC 0.138 | mode_ACC 0.655 | time_sig_ACC 0.880
Epoch 09 | train 4.9868 | val 5.1525 | key_ACC 0.143 | mode_ACC 0.673 | time_sig_ACC 0.880
Epoch 10 | train 4.9441 | val 5.1726 | key_ACC 0.131 | mode_ACC 0.637 | time_sig_ACC 0.874
Epoch 11 | train 4.9035 | val 5.1434 | key_ACC 0.128 | mode_ACC 0.667 | time_sig_ACC 0.876

In [7]:
# CELL 5: save scalers, class maps, and best metrics for inference

import json
from pathlib import Path

# Save regression scalers (mean/std per target) for de-standardization at inference
with open(SCALERS_JSON, "w") as f:
    json.dump({"mean": reg_means, "std": reg_stds}, f, indent=2)

# Save class maps (actual class values ↔ indices)
with open(CLASSMAP_JSON, "w") as f:
    json.dump(class_maps, f, indent=2)

# Save metrics summary
with open(METRICS_JSON, "w") as f:
    json.dump(best, f, indent=2)

print(f"💾 Saved model:      {MODEL_PATH}")
print(f"💾 Saved scalers:    {SCALERS_JSON}")
print(f"💾 Saved class maps: {CLASSMAP_JSON}")
print(f"📈 Metrics:          {METRICS_JSON}")

💾 Saved model:      /Users/prajeetdarda/Desktop/All_Coding/AI-Project/web/model-training/output/multitask_head.pt
💾 Saved scalers:    /Users/prajeetdarda/Desktop/All_Coding/AI-Project/web/model-training/output/regression_scalers.json
💾 Saved class maps: /Users/prajeetdarda/Desktop/All_Coding/AI-Project/web/model-training/output/class_maps.json
📈 Metrics:          /Users/prajeetdarda/Desktop/All_Coding/AI-Project/web/model-training/output/metrics_multitask.json


In [8]:
# CELL 6: quick prediction on one validation sample (emb → feature dict)

import numpy as np
import torch

@torch.no_grad()
def predict_features_from_emb(emb_vec):
    x = torch.as_tensor(emb_vec[None, :], dtype=torch.float32, device=device)
    model.eval()
    out = model(x)
    # regressions
    reg_pred = {}
    for c in REG_COLS:
        p_std = out[f"reg_{c}"].cpu().numpy()[0]
        m, s = reg_means[c], reg_stds[c]
        val = float(p_std * s + m)
        if c == "tempo":        val = float(np.expm1(val))
        if c == "duration_ms":  val = float(np.expm1(val))
        reg_pred[c] = val
    # classifications
    cls_pred = {}
    for c in CLS_COLS:
        logits = out[f"cls_{c}"].cpu().numpy()[0]
        idx = int(np.argmax(logits))
        cls_val = class_maps[c]["classes"][idx]
        cls_pred[c] = int(cls_val)
    return {**reg_pred, **cls_pred}

sample_pred = predict_features_from_emb(X_va[0])
sample_pred

{'acousticness': 0.6405558586120605,
 'danceability': 0.515924334526062,
 'energy': 0.36192819476127625,
 'instrumentalness': 0.21889616549015045,
 'liveness': 0.11445026844739914,
 'loudness': -11.277266502380371,
 'speechiness': 0.03460095077753067,
 'tempo': 118.63714923378535,
 'valence': 0.3236296772956848,
 'duration_ms': 226190.83729823618,
 'key': 0,
 'mode': 0,
 'time_signature': 4}