In [45]:
!pip install --upgrade pip



In [46]:
!pip install pandas



In [47]:
!pip install joblib
!pip install scikit-learn



In [None]:
# =========================
# STEP 0 — Setup
# =========================
# Optional installs:
# pip install numpy pandas scikit-learn xgboost lightgbm joblib pyarrow fastparquet

import os, re, ast, json, math, joblib
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import (
    accuracy_score, f1_score, classification_report, confusion_matrix
)
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier
from sklearn.utils.class_weight import compute_class_weight

# ---- Paths / config ----
DATA_PATH = "/Users/rohan/Documents/Research/distrubution_data_ml/Compiled_Networks.csv"
LABEL_COL = "NeuronType"  #The thing you’re trying to predict (your target/class).

columns_to_process = [
    "Burst_Peak_List",
    "Abs_Burst_Peak_List",
    "Burst_Times_List",      # used for timing, L, bursts/min
    "IBI_List",
    "SpikesPerBurst_List",    #columns_to_process = [...]
                            #The only input columns you want to use. Each is a “list-like” column (arrays per recording).
]

TIME_COL = "Burst_Times_List"   # column with burst times
N_BINS = 10                     # histogram bins per feature

assert os.path.exists(DATA_PATH), f"File not found: {DATA_PATH}"
df = pd.read_csv(DATA_PATH)

# Each row = a recording (no unique id present) → synthesize one
df = df.reset_index(drop=True)
df["recording_id"] = [f"row_{i:06d}" for i in range(len(df))]

# Sanity checks
missing = [c for c in [LABEL_COL] + columns_to_process if c not in df.columns]
assert not missing, f"Missing columns in CSV: {missing}"

df[[LABEL_COL] + columns_to_process].head()


Unnamed: 0,NeuronType,Burst_Peak_List,Abs_Burst_Peak_List,Burst_Times_List,IBI_List,SpikesPerBurst_List
0,MxHEMI,"14.6441,3.9478,2.4697,4.4859,3.7445,1.552,1.52...","14995.5074,4042.5851,2528.9483,4593.6003,3834....","0.5,0.4,0.5,0.4,0.5,0.6,0.5,0.7,0.5,0.6,0.6,0....","21,7.9,21.8,13.3,5.1,11.3,13.7,24.3,22.2,3.3,3...","7349,1673,1135,1915,1842,839,740,1049,7600,850..."
1,MxWT,8.4741,8677.4995,0.5,,4043
2,FxHET,"1.4235,6.7396,6.7726,6.6778","1457.6317,6901.3812,6935.1657,6838.0787","0.6,0.5,0.4,0.5","7.1,83,85.4",814328628603139
3,MxHEMI,"12.4079,2.199,6.2069,4.4418,1.8126,1.8054,3.72...","12705.7113,2251.7803,6355.8459,4548.4475,1856....","0.5,0.7,0.4,0.4,0.4,0.7,0.6,0.5,0.5,0.5,0.6,0....","24.5,25.7,16.4,5.3,25.2,15.3,14.6,21.5,23.3,17...","6015,1387,2649,1885,762,1115,2030,5998,886,172..."
4,MxWT,8.3836,8584.808,0.5,,4153


In [49]:
# ---- Input paths ----
DATA_PATH = "/Users/rohan/Documents/Research/distrubution_data_ml/Compiled_Networks.csv"

In [50]:
df = pd.read_csv(DATA_PATH)
df.head()

Unnamed: 0,Run_ID,DIV,Assay,Well,NeuronType,Time,Chip_ID,mean_IBI,cov_IBI,mean_Burst_Peak,...,LogISI_NumBursts,LogISI_MeanBurstDur,LogISI_CV_BurstDur,LogISI_MeanSpikes,LogISI_CV_Spikes,LogISI_MeanIBI,LogISI_CV_IBI,LogISI_BurstIBIList,LogISI_BurstPeaks,LogISI_BurstDuration
0,6,5,Network Today,1,MxHEMI,20-May-2024 10:06:26,M07039,15.447368,61.446644,4.364867,...,3,0.701933,14.47591,14.611772,3.712001,91.68895,41.539625,"118.6207,64.7572","14.6441,15.1373,14.054","0.7291,0.5895,0.7872"
1,6,5,Network Today,2,MxWT,20-May-2024 10:06:26,M07039,,,8.474121,...,1,0.344,0.0,8.474121,0.0,,,,8.4741,0.344
2,6,5,Network Today,3,FxHET,20-May-2024 10:06:26,M07039,58.5,76.119449,5.403383,...,3,0.268267,9.218217,6.730021,0.715162,84.1856,2.008796,"82.9898,85.3814","6.7396,6.7726,6.6778","0.2911,0.242,0.2717"
3,6,5,Network Today,4,MxHEMI,20-May-2024 10:06:26,M07039,22.025,56.108645,5.103559,...,3,0.6506,6.362262,12.529625,0.861041,123.06435,4.469738,"126.9539,119.1748","12.4079,12.6135,12.5674","0.6474,0.6935,0.6109"
4,6,5,Network Today,5,MxWT,20-May-2024 10:06:26,M07039,,,8.383602,...,1,0.4419,0.0,8.383602,0.0,,,,8.3836,0.4419


In [52]:

# Only these columns are used (lists per recording)
columns_to_process = [
    "Burst_Peak_List",
    "Abs_Burst_Peak_List",
    "Burst_Times_List",      # used for timing, L, bursts/min
    "IBI_List",
    "SpikesPerBurst_List",
]

In [None]:
# =========================
# STEP 1 — Parse list columns into numeric arrays
# =========================
def parse_list_cell(x): 
    if pd.isna(x):
        return np.array([], dtype=float)
    if isinstance(x, (list, tuple, np.ndarray)):
        return np.array(x, dtype=float) #If it’s already a Python list/tuple/NumPy array → just cast to float and return it.
    s = str(x).strip()
    if s == "" or s.lower() in {"nan", "none"}:
        return np.array([], dtype=float)
    # Try Python/JSON literal like "[1,2,3]"
    try:
        v = ast.literal_eval(s) #Try ast.literal_eval(s): 
        # this safely parses strings that look like Python/JSON lists, e.g. "[1, 2, 3]" → [1,2,3].
        if isinstance(v, (list, tuple, np.ndarray)):
            return np.array(v, dtype=float)
    except Exception:
        pass
    # Fallback: comma/space separated
    try:
        toks = [t for t in re.split(r"[,\s]+", s) if t != ""]
        return np.array([float(t) for t in toks], dtype=float)
    except Exception:
        return np.array([], dtype=float)

parsed = {c: [] for c in columns_to_process} #Make a dictionary whose keys are your five columns, 
                                            #and whose values will become lists of arrays (one array per row).
rec_ids = df["recording_id"].tolist() #Keep the synthetic recording IDs in a list, same order as the rows. We’ll use this to keep features lined up with rows later.

for _, row in df.iterrows():
    for c in columns_to_process:
        parsed[c].append(parse_list_cell(row[c]))


In [None]:
# =========================
# STEP 2 — Global histogram bins per feature
# =========================
def global_edges(all_arrays, n_bins=N_BINS):
    """
    Build a single set of histogram bin edges for one feature,
    shared by ALL recordings. This lets every recording's histogram
    be comparable (same bin edges for everyone).

    Parameters
    ----------
    all_arrays : list[np.ndarray]
        For one feature (e.g., IBI_List), this is a list of 1D arrays,
        one array per recording.
    n_bins : int
        Number of histogram bins to produce.

    Returns
    -------
    np.ndarray
        Array of length (n_bins + 1) with bin edges (monotonically increasing).
    """
    # Keep only proper, non-empty numpy arrays (skip Nones/empties)
    vals = [a for a in all_arrays if isinstance(a, np.ndarray) and a.size]
    if not vals:
        # If literally no data at all for this feature, return a safe default
        # edges from 0 to 1 split into n_bins bins.
        return np.linspace(0, 1, n_bins + 1)

    # Concatenate values from all recordings to find a global min/max range
    concat = np.concatenate(vals)

    # Compute global min/max while ignoring NaNs
    vmin, vmax = float(np.nanmin(concat)), float(np.nanmax(concat))

    # If min/max are not finite (all NaNs?) or equal (constant feature),
    # build a small padded range so bins aren't degenerate
    if not np.isfinite(vmin) or not np.isfinite(vmax) or vmin == vmax:
        # Fallback to a basic 0..1 range if min/max are not finite
        vmin = vmin if np.isfinite(vmin) else 0.0
        vmax = vmax if np.isfinite(vmax) else 1.0
        # Add a tiny pad on both sides so we can still create n_bins intervals
        return np.linspace(vmin - 0.5, vmax + 0.5, n_bins + 1)

    # Normal case: equal-width bins spanning [vmin, vmax]
    return np.linspace(vmin, vmax, n_bins + 1)

# Build a dict: for each feature column, compute its shared bin edges once.
# Example: hist_bins["IBI_List"] is a 1D array of edges used for that feature's histograms.
hist_bins = {c: global_edges(parsed[c], N_BINS) for c in columns_to_process}


In [None]:
# =========================
# STEP 3 — Feature helpers
# =========================
def median_absolute_deviation(x):
    """
    Compute MAD (Median Absolute Deviation) for a 1D numeric array x.
    MAD is robust to outliers compared to standard deviation.
    Returns NaN if x is empty.
    """
    if x.size == 0:
        return np.nan
    # Median of the data (ignores NaNs)
    med = np.nanmedian(x)
    # Median of absolute deviations from the median
    return float(np.nanmedian(np.abs(x - med)))

def robust_stats(x):
    """
    Given a 1D numeric array x (e.g., all IBI values for one recording),
    compute a set of *robust* summary stats:
      - location/scale: mean, std, median, MAD
      - range/quantiles: min, q10, q25, q50, q75, q90, max
      - shape: skew, kurtosis
    If x is empty, return all NaNs (we'll handle with indicators later).
    """
    if x.size == 0:
        # Return a dict of NaNs for all requested stats when no data
        return {k: np.nan for k in [
            "mean","std","median","mad","min","q10","q25","q50","q75","q90","max","skew","kurtosis"
        ]}
    # Wrap in a pandas Series for convenient stats; force float dtype
    s = pd.Series(x, dtype=float)

    # Build the output dict; use np.nan-safe versions where relevant
    out = {
        "mean": float(s.mean()),                 # average
        "std": float(s.std(ddof=1)),            # sample std (ddof=1)
        "median": float(s.median()),            # robust central tendency
        "mad": float(median_absolute_deviation(s.values)),  # robust spread
        "min": float(s.min()),                  # minimum
        "q10": float(s.quantile(0.10)),         # 10th percentile
        "q25": float(s.quantile(0.25)),         # 25th percentile
        "q50": float(s.quantile(0.50)),         # 50th (same as median)
        "q75": float(s.quantile(0.75)),         # 75th percentile
        "q90": float(s.quantile(0.90)),         # 90th percentile
        "max": float(s.max()),                  # maximum
        "skew": float(s.skew()),                # asymmetry of distribution
        "kurtosis": float(s.kurtosis()),        # tail heaviness / peakedness
    }
    return out

def hist_features(x, edges, prefix):
    """
    Turn the array x into a normalized histogram using *shared* bin edges.
    - edges: 1D array of length (bins+1) created globally for this feature.
    - prefix: base name to use for the output columns (e.g., "IBI_List").
    Returns a dict like { "<prefix>_hist_bin0": p0, "<prefix>_hist_bin1": p1, ... }.

    Design choice:
    - If x is empty, we emit NaNs (not zeros) so that later the
      '__isnan' indicators capture "no data present" explicitly.
    """
    # No data for this recording → mark all bins as NaN (will be imputed later)
    if x.size == 0:
        return {f"{prefix}_hist_bin{i}": np.nan for i in range(len(edges) - 1)}

    # Keep only finite values before binning
    counts, _ = np.histogram(x[np.isfinite(x)], bins=edges)

    # Normalize counts so they sum to 1 (probability mass per bin)
    total = counts.sum()
    if total > 0:
        counts = counts / total

    # Emit bin features as floats
    return {f"{prefix}_hist_bin{i}": float(counts[i]) for i in range(len(counts))}


In [None]:
# =========================
# STEP 4 — Build per-recording baseline features (ONLY from your five lists)
# =========================

FEATURE_ROWS = []  # we'll fill this with one feature-dict per recording (row)

for i, rec_id in enumerate(rec_ids):
    # ----- 1) TIMING + COUNT (L) from burst times -----
    # Grab the times array for this row (empty array if missing)
    times = parsed.get(TIME_COL, [np.array([], dtype=float)])[i]
    # Sort so first/last times make sense
    times = np.sort(times) if times.size else times

    # L = number of bursts.
    # Prefer the number of time stamps; if times are missing,
    # fall back to the largest length among the other lists.
    fallback_lengths = [parsed[c][i].size for c in columns_to_process if parsed[c][i] is not None]
    L = int(times.size) if times.size else (int(max(fallback_lengths)) if fallback_lengths else 0)

    # First/last burst times (NaN if no times)
    first_t = float(times[0]) if times.size else np.nan
    last_t  = float(times[-1]) if times.size else np.nan

    # Span of the recording ≈ last - first (seconds). If no times, NaN.
    span = float(max(0.0, last_t - first_t)) if times.size else np.nan

    # Bursts per minute = L divided by (span in minutes), only if span > 0.
    bursts_per_min = (L / (span / 60.0)) if (times.size and span > 0) else np.nan

    # ----- 2) START A FEATURE DICT FOR THIS RECORDING -----
    # We don't have per-burst durations in your five columns,
    # so fraction_time_active/duty_cycle are left as NaN (we'll add
    # missingness indicators + impute later so the model can still use them).
    feat = {
        "recording_id": rec_id,                  # synthetic ID for tracking
        "L": L,                                  # count of bursts
        "bursts_per_min": bursts_per_min,        # rate
        "first_burst_time": first_t,             # timing: start
        "last_burst_time": last_t,               # timing: end
        "approx_recording_span_sec": span,       # timing: span
        "fraction_time_active": np.nan,          # needs per-burst durations (not provided)
        "duty_cycle": np.nan,                    # alias of fraction_time_active
        "has_bursts": 1 if L > 0 else 0,         # edge flag used later
    }

    # ----- 3) ADD DISTRIBUTION FEATURES FOR EACH LIST COLUMN -----
    # For each of your five list columns:
    #   - compute robust summary stats (mean/median/MAD/quantiles/etc.)
    #   - compute a normalized histogram (shape info) using shared bin edges
    for c in columns_to_process:
        x = parsed[c][i]                         # this row's array for feature c

        # Robust stats: returns a dict like {"mean": ..., "std": ..., "q25": ..., ...}
        for k, v in robust_stats(x).items():
            feat[f"{c}__{k}"] = v

        # Histogram bins: { "<c>_hist_bin0": p0, "<c>_hist_bin1": p1, ... }
        # If x is empty, these are NaN; later we'll add __isnan flags and impute.
        feat.update(hist_features(x, hist_bins[c], prefix=f"{c}"))

    # Save this recording's features
    FEATURE_ROWS.append(feat)

# Turn the list of dicts into a DataFrame: 1 row per recording, many feature columns
feat_table = pd.DataFrame(FEATURE_ROWS)

# Quick peek to make sure it looks reasonable
feat_table.head()


Unnamed: 0,recording_id,L,bursts_per_min,first_burst_time,last_burst_time,approx_recording_span_sec,fraction_time_active,duty_cycle,has_bursts,Burst_Peak_List__mean,...,SpikesPerBurst_List_hist_bin0,SpikesPerBurst_List_hist_bin1,SpikesPerBurst_List_hist_bin2,SpikesPerBurst_List_hist_bin3,SpikesPerBurst_List_hist_bin4,SpikesPerBurst_List_hist_bin5,SpikesPerBurst_List_hist_bin6,SpikesPerBurst_List_hist_bin7,SpikesPerBurst_List_hist_bin8,SpikesPerBurst_List_hist_bin9
0,row_000000,20,3000.0,0.4,0.8,0.4,,,1,4.36487,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,row_000001,1,,0.5,0.5,0.0,,,1,8.4741,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,row_000002,4,1200.0,0.4,0.6,0.2,,,1,5.403375,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,row_000003,13,2600.0,0.4,0.7,0.3,,,1,5.103554,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,row_000004,1,,0.5,0.5,0.0,,,1,8.3836,...,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [None]:
# =========================
# STEP 5 — L=0-safe: missingness indicators + median impute
# =========================

# Start from the per-recording feature table we just built
X = feat_table.copy()

# 1) Add explicit "was missing?" flags for every numeric feature.
#    Why: If a feature is NaN (e.g., because a list was empty, L=0),
#    we want the model to *know* it was missing (this often carries signal).
#    We skip 'has_bursts' because it's already a binary flag we created.
num_cols = [c for c in X.columns if pd.api.types.is_numeric_dtype(X[c]) and c != "has_bursts"]
for c in num_cols:
    # Create a companion column like "<feature>__isnan" with 1 if NaN else 0
    X[f"{c}__isnan"] = X[c].isna().astype(int)

# 2) Median-impute all numeric NaNs so models can ingest a complete matrix.
#    Why median? It's simple, robust to outliers, and publication-friendly.
for c in num_cols:
    # If the column has any non-NaN values, use its median; otherwise fall back to 0.0
    med = float(np.nanmedian(X[c].values)) if X[c].notna().any() else 0.0
    X[c] = X[c].fillna(med)

# 3) Keep 'recording_id' around for reference/joins, but we won't feed it to the model.
#    (All remaining columns after 'recording_id' are numeric features + __isnan flags.)
X = X[["recording_id"] + [c for c in X.columns if c != "recording_id"]]

# 4) Pull the target labels aligned by row order (same order as df/feat_table).
#    Here, LABEL_COL = "NeuronType" from Step 0.
y = df[LABEL_COL].copy()

print("Shapes → X:", X.shape, " y:", y.shape)
X.head()  # quick peek at the model-ready feature matrix (+ missingness indicators)


Shapes → X: (186, 246)  y: (186,)


  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}__isnan"] = X[c].isna().astype(int)
  X[f"{c}_

Unnamed: 0,recording_id,L,bursts_per_min,first_burst_time,last_burst_time,approx_recording_span_sec,fraction_time_active,duty_cycle,has_bursts,Burst_Peak_List__mean,...,SpikesPerBurst_List_hist_bin0__isnan,SpikesPerBurst_List_hist_bin1__isnan,SpikesPerBurst_List_hist_bin2__isnan,SpikesPerBurst_List_hist_bin3__isnan,SpikesPerBurst_List_hist_bin4__isnan,SpikesPerBurst_List_hist_bin5__isnan,SpikesPerBurst_List_hist_bin6__isnan,SpikesPerBurst_List_hist_bin7__isnan,SpikesPerBurst_List_hist_bin8__isnan,SpikesPerBurst_List_hist_bin9__isnan
0,row_000000,20,3000.0,0.4,0.8,0.4,0.0,0.0,1,4.36487,...,0,0,0,0,0,0,0,0,0,0
1,row_000001,1,4800.0,0.5,0.5,0.0,0.0,0.0,1,8.4741,...,0,0,0,0,0,0,0,0,0,0
2,row_000002,4,1200.0,0.4,0.6,0.2,0.0,0.0,1,5.403375,...,0,0,0,0,0,0,0,0,0,0
3,row_000003,13,2600.0,0.4,0.7,0.3,0.0,0.0,1,5.103554,...,0,0,0,0,0,0,0,0,0,0
4,row_000004,1,4800.0,0.5,0.5,0.0,0.0,0.0,1,8.3836,...,0,0,0,0,0,0,0,0,0,0


In [None]:
# =========================
# STEP 6 — Train/test split & label encoding
# =========================

# 1) Some rows might be missing the label (NeuronType). Drop those so training is clean.
mask = y.notna()
X = X.loc[mask].reset_index(drop=True)
y = y.loc[mask].reset_index(drop=True)

# 2) Keep the human-readable row ID for later (reports, joins), but DO NOT feed it into the model.
recording_ids = X["recording_id"].tolist()
X_model = X.drop(columns=["recording_id"])

# 3) Convert string labels like ["MxWT","FxHET","MxHEMI", ...] → integers [0,1,2,...].
#    Models need numeric targets; LabelEncoder stores the mapping so we can decode later.
le = LabelEncoder()
y_enc = le.fit_transform(y)

# 4) Split into training and test sets.
#    - test_size=0.25 → 25% test, 75% train
#    - stratify=y_enc → keep the class proportions similar in train and test (important for imbalance)
#    - random_state=42 → reproducible split
X_tr, X_te, y_tr, y_te = train_test_split(
    X_model, y_enc, test_size=0.25, random_state=42, stratify=y_enc
)

# 5) Compute class weights to handle class imbalance.
#    'balanced' gives higher weight to rare classes and lower to common ones.
classes = np.unique(y_tr)
class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_tr)
class_weight_map = {cls: w for cls, w in zip(classes, class_weights)}

# 6) Turn class weights into per-sample weights (one weight for each training example).
#    Many models (e.g., LightGBM/XGBoost) can accept sample_weight during .fit(...)
sample_weight_tr = np.array([class_weight_map[c] for c in y_tr], dtype=float)

print("Classes:", list(le.classes_))       # The original class names in order of their encoded IDs
print("Class weights:", class_weight_map)  # e.g., {0: 0.8, 1: 1.3, 2: 1.9} → rarer classes get larger weights


Classes: ['FxHET', 'MxHEMI', 'MxWT']
Class weights: {np.int64(0): np.float64(0.8910256410256411), np.int64(1): np.float64(2.0144927536231885), np.int64(2): np.float64(0.7239583333333334)}


In [59]:
!pip install lightgbm
!pip install xgboost



In [61]:
# =========================
# STEP 7 — Train multiple models; pick best by macro-F1
# =========================
models = {}
reports = {}

# 7A) LightGBM (if available)
try:
    from lightgbm import LGBMClassifier
    lgbm = LGBMClassifier(
        objective="multiclass",
        n_estimators=600,
        learning_rate=0.05,
        num_leaves=63,
        subsample=0.9,
        colsample_bytree=0.9,
        random_state=42
    )
    lgbm.fit(X_tr, y_tr, sample_weight=sample_weight_tr)
    y_pred = lgbm.predict(X_te)
    f1m = f1_score(y_te, y_pred, average="macro")
    models["LightGBM"] = lgbm
    reports["LightGBM"] = {
        "f1_macro": f1m,
        "accuracy": accuracy_score(y_te, y_pred),
        "y_pred": y_pred
    }
    print(f"LightGBM → F1_macro={f1m:.3f}  Acc={reports['LightGBM']['accuracy']:.3f}")
except Exception as e:
    print("LightGBM not available:", e)

# 7B) XGBoost (if available)
try:
    from xgboost import XGBClassifier
    lgbm = LGBMClassifier(
        objective="multiclass",
        n_estimators=800,
        learning_rate=0.05,
        num_leaves=15,          # smaller trees
        max_depth=5,            # cap depth to avoid overfitting
        min_data_in_leaf=5,     # <— critical for 139 rows
        feature_fraction=0.8,
        bagging_fraction=0.8,
        bagging_freq=1,
        reg_lambda=1e-2,        # mild L2
        random_state=42,
        force_col_wise=True     # just removes the overhead message
    )
    # make sure features are numeric
    X_tr = X_tr.astype(np.float32)
    X_te = X_te.astype(np.float32)
    lgbm.fit(X_tr, y_tr, sample_weight=sample_weight_tr)
    y_pred = lgbm.predict(X_te)
    f1m = f1_score(y_te, y_pred, average="macro")
    models["XGBoost"] = xgb
    reports["XGBoost"] = {
        "f1_macro": f1m,
        "accuracy": accuracy_score(y_te, y_pred),
        "y_pred": y_pred
    }
    print(f"XGBoost  → F1_macro={f1m:.3f}  Acc={reports['XGBoost']['accuracy']:.3f}")
except Exception as e:
    print("XGBoost not available:", e)

# 7C) MLP (always available; scale features)
mlp = Pipeline([
    ("scaler", StandardScaler(with_mean=False)),  # sparse-safe; features can be different scales
    ("clf", MLPClassifier(hidden_layer_sizes=(256,128), alpha=1e-4, max_iter=600, random_state=42))
])
mlp.fit(X_tr, y_tr)
y_pred = mlp.predict(X_te)
f1m = f1_score(y_te, y_pred, average="macro")
models["MLP"] = mlp
reports["MLP"] = {
    "f1_macro": f1m,
    "accuracy": accuracy_score(y_te, y_pred),
    "y_pred": y_pred
}
print(f"MLP      → F1_macro={f1m:.3f}  Acc={reports['MLP']['accuracy']:.3f}")

# Pick best
best_name = max(reports, key=lambda k: reports[k]["f1_macro"])
best_model = models[best_name]
best_pred = reports[best_name]["y_pred"]
print("\nBEST MODEL:", best_name, " | F1_macro:", reports[best_name]["f1_macro"], " | Acc:", reports[best_name]["accuracy"])


[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000929 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 3412
[LightGBM] [Info] Number of data points in the train set: 139, number of used features: 93
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
LightGBM → F1_macro=0.585  Acc=0.574
[LightGBM] [Info] Total Bins 3475
[LightGBM] [Info] Number of data points in the train set: 139, number of used features: 115
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
XGBoost  → F1_macro=0.629  Acc=0.638
MLP      → F1_macro=0.646  Acc=0.638

BEST MODEL: MLP  | F1_macro: 0.6464498843015467  | Acc: 0.6382978723404256


# MLP Model metrics

In [66]:
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

# Which model won and its predictions
print("Best model:", best_name)
labels = list(le.classes_)
y_true = y_te
y_hat  = best_pred  # predictions for the best model

# Raw confusion matrix (rows = true, cols = predicted)
cm = confusion_matrix(y_true, y_hat, labels=np.arange(len(labels)))
print("Label order:", labels)
print("Confusion matrix (rows=true, cols=pred):\n", cm)

# Row-normalized (per-true-class recall)
cm_norm = cm.astype(float) / cm.sum(axis=1, keepdims=True)
np.set_printoptions(precision=3, suppress=True)
print("Row-normalized (%):\n", np.round(100 * cm_norm, 1))

# Full per-class report (precision/recall/F1)
print("\nClassification report (best):\n")
print(classification_report(y_true, y_hat, target_names=labels, digits=3))


Best model: MLP
Label order: ['FxHET', 'MxHEMI', 'MxWT']
Confusion matrix (rows=true, cols=pred):
 [[ 7  3  8]
 [ 0  6  1]
 [ 6  0 16]]
Row-normalized (%):
 [[38.9 16.7 44.4]
 [ 0.  85.7 14.3]
 [27.3  0.  72.7]]

Classification report (best):

              precision    recall  f1-score   support

       FxHET      0.538     0.389     0.452        18
      MxHEMI      0.667     0.857     0.750         7
        MxWT      0.640     0.727     0.681        22

    accuracy                          0.617        47
   macro avg      0.615     0.658     0.627        47
weighted avg      0.605     0.617     0.603        47



FxHET (support=18): model got 7 right, misclassified 3 as MxHEMI and 8 as MxWT.
Row-normalized: 38.9% correct, 16.7% → MxHEMI, 44.4% → MxWT.
→ FxHET is the hardest; the model often confuses it with MxWT.

MxHEMI (support=7): model got 6 right, only 1 mistake (as MxWT).
Row-normalized: 85.7% correct → best recall.

MxWT (support=22): model got 16 right, 6 were predicted as FxHET.
Row-normalized: 72.7% correct.


Your report:

FxHET: precision 0.538, recall 0.389, F1 0.452 → biggest pain point (lots of misses to MxWT).

MxHEMI: precision 0.667, recall 0.857, F1 0.750 → strong.

MxWT: precision 0.640, recall 0.727, F1 0.681 → decent.

# XGBoost Model

In [69]:
# === Metrics for the XGBoost model ===
import numpy as np
from sklearn.metrics import (
    f1_score, accuracy_score, confusion_matrix, classification_report, log_loss
)

# Ensure we have a trained XGBoost model in `models`
assert "XGBoost" in models, "XGBoost model not found in `models`. Re-run Step 7 with the fixed XGBoost block."

# XGBoost likes float32 features
X_te_np = X_te.astype(np.float32)

# Predictions
y_pred_xgb = models["XGBoost"].predict(X_te_np)

# Core metrics
acc_xgb  = accuracy_score(y_te, y_pred_xgb)
f1m_xgb  = f1_score(y_te, y_pred_xgb, average="macro")
print(f"XGBoost — Acc: {acc_xgb:.3f} | F1_macro: {f1m_xgb:.3f}")

# Optional: probability-based metric (log loss)
if hasattr(models["XGBoost"], "predict_proba"):
    y_proba_xgb = models["XGBoost"].predict_proba(X_te_np)
    ll_xgb = log_loss(y_te, y_proba_xgb, labels=np.arange(len(le.classes_)))
    print(f"LogLoss: {ll_xgb:.4f}")

# Confusion matrix
labels = list(le.classes_)
cm_xgb = confusion_matrix(y_te, y_pred_xgb, labels=np.arange(len(labels)))
print("Label order:", labels)
print("Confusion matrix (rows=true, cols=pred):\n", cm_xgb)

# Row-normalized (per-true-class recall breakdown)
cm_norm_xgb = cm_xgb.astype(float) / cm_xgb.sum(axis=1, keepdims=True)
np.set_printoptions(precision=3, suppress=True)
print("Row-normalized (%):\n", np.round(100 * cm_norm_xgb, 1))

# Per-class precision/recall/F1
print("\nClassification report (XGBoost):\n")
print(classification_report(y_te, y_pred_xgb, target_names=labels, digits=3))


XGBoost — Acc: 0.574 | F1_macro: 0.562
LogLoss: 1.0343
Label order: ['FxHET', 'MxHEMI', 'MxWT']
Confusion matrix (rows=true, cols=pred):
 [[ 9  1  8]
 [ 4  3  0]
 [ 7  0 15]]
Row-normalized (%):
 [[50.   5.6 44.4]
 [57.1 42.9  0. ]
 [31.8  0.  68.2]]

Classification report (XGBoost):

              precision    recall  f1-score   support

       FxHET      0.450     0.500     0.474        18
      MxHEMI      0.750     0.429     0.545         7
        MxWT      0.652     0.682     0.667        22

    accuracy                          0.574        47
   macro avg      0.617     0.537     0.562        47
weighted avg      0.589     0.574     0.575        47



Row-normalized (recall per class):

FxHET: 50.0% correct (9/18). The rest mostly go to MxWT (44.4%).

MxHEMI: 42.9% correct (3/7). Many are misclassified as FxHET (57.1%).

MxWT: 68.2% correct (15/22). Misses mostly go to FxHET (31.8%).

So XGBoost confuses FxHET with MxWT a lot, and often calls MxHEMI → FxHET.

Per-class precision/recall/F1 (what they mean)
FxHET: precision 0.450, recall 0.500, F1 0.474
→ When XGB predicts FxHET it’s right ~45%, and it catches ~50% of true FxHETs.

MxHEMI: precision 0.750, recall 0.429, F1 0.545
→ It predicts MxHEMI sparingly (high precision), but misses many true MxHEMIs (low recall).

MxWT: precision 0.652, recall 0.682, F1 0.667
→ Best of the three, but still some confusion with FxHET

In [68]:
# Class distribution in the full labeled set
print(pd.Series(le.inverse_transform(y_enc)).value_counts())

# Majority-class baseline accuracy on the test set
maj = np.bincount(y_tr).argmax()
maj_acc = (y_te == maj).mean()
print(f"Majority-class baseline (test): {maj_acc:.3f}")


MxWT      86
FxHET     70
MxHEMI    30
Name: count, dtype: int64
Majority-class baseline (test): 0.468


Total = 86 + 70 + 30 = 186 samples overall → mild imbalance:

MxWT ≈ 46%

FxHET ≈ 38%

MxHEMI ≈ 16%

# Not Important skip

In [None]:
# === PATCH CELL: small-data-friendly baseline + classifier for NeuronType ===
# - Uses ONLY the five list columns you specified
# - Histogram bins are quantile-based (better on small data)
# - Empty-list histograms -> NaN so missingness indicators carry signal
# - Adds per-list flags: __has_values and __len
# - Trains LightGBM (small-data tuned), XGBoost (if available), and MLP; picks best by macro-F1

import os, re, ast, json, math, warnings
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier
from sklearn.utils.class_weight import compute_class_weight

warnings.filterwarnings("ignore", category=UserWarning)

# ---------------------------
# 0) Inputs / fallbacks
# ---------------------------
LABEL_COL = "NeuronType"

if "columns_to_process" not in locals():
    columns_to_process = [
        "Burst_Peak_List",
        "Abs_Burst_Peak_List",
        "Burst_Times_List",      # used for timing, L, bursts/min
        "IBI_List",
        "SpikesPerBurst_List",
    ]

TIME_COL = "Burst_Times_List"
N_BINS = 10

# If df not already loaded, try to load it (edit DATA_PATH if needed)
if "df" not in locals():
    DATA_PATH = "your_file.csv"  # <- change if needed
    assert os.path.exists(DATA_PATH), f"File not found: {DATA_PATH}"
    df = pd.read_csv(DATA_PATH)

# synth recording_id if missing
if "recording_id" not in df.columns:
    df = df.reset_index(drop=True).copy()
    df["recording_id"] = [f"row_{i:06d}" for i in range(len(df))]

# sanity checks
missing = [c for c in [LABEL_COL] + columns_to_process if c not in df.columns]
assert not missing, f"Missing columns in df: {missing}"

# ---------------------------
# 1) Parse list columns
# ---------------------------
def parse_list_cell(x):
    if pd.isna(x):
        return np.array([], dtype=float)
    if isinstance(x, (list, tuple, np.ndarray)):
        return np.array(x, dtype=float)
    s = str(x).strip()
    if s == "" or s.lower() in {"nan", "none"}:
        return np.array([], dtype=float)
    try:
        v = ast.literal_eval(s)  # e.g., "[1,2,3]"
        if isinstance(v, (list, tuple, np.ndarray)):
            return np.array(v, dtype=float)
    except Exception:
        pass
    try:
        toks = [t for t in re.split(r"[,\s]+", s) if t != ""]
        return np.array([float(t) for t in toks], dtype=float)
    except Exception:
        return np.array([], dtype=float)

parsed = {c: [] for c in columns_to_process}
for _, row in df.iterrows():
    for c in columns_to_process:
        parsed[c].append(parse_list_cell(row[c]))

# ---------------------------
# 2) Quantile histogram bins (global)
# ---------------------------
def global_edges_quantile(all_arrays, n_bins=N_BINS):
    arrs = [a for a in all_arrays if isinstance(a, np.ndarray) and a.size]
    if not arrs:
        return np.linspace(0, 1, n_bins + 1)
    concat = np.concatenate(arrs)
    qs = np.linspace(0, 1, n_bins + 1)
    edges = np.quantile(concat, qs)
    edges = np.unique(edges)  # ensure strictly increasing
    if edges.size < n_bins + 1:
        # slight jitter to avoid identical edges on tiny data
        jitter = np.linspace(-1e-6, 1e-6, n_bins + 1)
        edges = np.quantile(concat, qs) + jitter
    return edges

hist_bins = {c: global_edges_quantile(parsed[c], N_BINS) for c in columns_to_process}

# ---------------------------
# 3) Stats + hist helpers (patched)
# ---------------------------
def median_absolute_deviation(x):
    if x.size == 0:
        return np.nan
    med = np.nanmedian(x)
    return float(np.nanmedian(np.abs(x - med)))

def robust_stats(x):
    if x.size == 0:
        return {k: np.nan for k in [
            "mean","std","median","mad","min","q10","q25","q50","q75","q90","max","skew","kurtosis"
        ]}
    s = pd.Series(x, dtype=float)
    return {
        "mean": float(s.mean()),
        "std": float(s.std(ddof=1)),
        "median": float(s.median()),
        "mad": float(median_absolute_deviation(s.values)),
        "min": float(s.min()),
        "q10": float(s.quantile(0.10)),
        "q25": float(s.quantile(0.25)),
        "q50": float(s.quantile(0.50)),
        "q75": float(s.quantile(0.75)),
        "q90": float(s.quantile(0.90)),
        "max": float(s.max()),
        "skew": float(s.skew()),
        "kurtosis": float(s.kurtosis()),
    }

def hist_features(x, edges, prefix):
    # PATCH: empty list -> NaN bins so __isnan indicators capture "no data"
    if x.size == 0:
        return {f"{prefix}_hist_bin{i}": np.nan for i in range(len(edges) - 1)}
    x = x[np.isfinite(x)]
    if x.size == 0:
        return {f"{prefix}_hist_bin{i}": np.nan for i in range(len(edges) - 1)}
    counts, _ = np.histogram(x, bins=edges)
    total = counts.sum()
    if total > 0:
        counts = counts / total
    return {f"{prefix}_hist_bin{i}": float(counts[i]) for i in range(len(counts))}

# ---------------------------
# 4) Build features (adds __has_values and __len)
# ---------------------------
rows = []
for i in range(len(df)):
    rec_id = df.loc[i, "recording_id"]

    times = parsed.get(TIME_COL, [np.array([], dtype=float)])[i]
    times = np.sort(times) if isinstance(times, np.ndarray) and times.size else np.array([], dtype=float)

    fallback_lengths = [parsed[c][i].size for c in columns_to_process if parsed[c][i] is not None]
    L = int(times.size) if times.size else (int(max(fallback_lengths)) if fallback_lengths else 0)
    first_t = float(times[0]) if times.size else np.nan
    last_t  = float(times[-1]) if times.size else np.nan
    span    = float(max(0.0, last_t - first_t)) if times.size else np.nan
    bpm     = (L / (span/60.0)) if (times.size and span > 0) else np.nan

    feat = {
        "recording_id": rec_id,
        "L": L,
        "bursts_per_min": bpm,
        "first_burst_time": first_t,
        "last_burst_time": last_t,
        "approx_recording_span_sec": span,
        "fraction_time_active": np.nan,  # no per-burst duration in provided columns
        "duty_cycle": np.nan,
        "has_bursts": 1 if L > 0 else 0,
    }

    for c in columns_to_process:
        x = parsed[c][i]
        # flags
        feat[f"{c}__has_values"] = 1 if x.size > 0 else 0
        feat[f"{c}__len"] = int(x.size)
        # stats + hist
        for k, v in robust_stats(x).items():
            feat[f"{c}__{k}"] = v
        feat.update(hist_features(x, hist_bins[c], prefix=f"{c}"))

    rows.append(feat)

feat_table = pd.DataFrame(rows)

# ---------------------------
# 5) L=0-safe: indicators + median impute
# ---------------------------
X = feat_table.copy()

num_cols = [c for c in X.columns if pd.api.types.is_numeric_dtype(X[c]) and c != "has_bursts"]
for c in num_cols:
    X[f"{c}__isnan"] = X[c].isna().astype(int)
    med = float(np.nanmedian(X[c].values)) if X[c].notna().any() else 0.0
    X[c] = X[c].fillna(med)

# Keep ID aside; build model matrix
recording_ids = X["recording_id"].tolist()
X_model = X.drop(columns=["recording_id"]).astype(np.float32)

# Labels
y = df[LABEL_COL].copy()
mask = y.notna()
X_model = X_model.loc[mask].reset_index(drop=True)
y = y.loc[mask].reset_index(drop=True)

# Encode classes
le = LabelEncoder()
y_enc = le.fit_transform(y)

# Train/test split
X_tr, X_te, y_tr, y_te = train_test_split(
    X_model, y_enc, test_size=0.25, random_state=42, stratify=y_enc
)

# Class weights → per-sample weights
classes = np.unique(y_tr)
class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_tr)
cw_map = {cls: w for cls, w in zip(classes, class_weights)}
sample_weight_tr = np.array([cw_map[c] for c in y_tr], dtype=float)

# ---------------------------
# 6) Train models; pick best by macro-F1
# ---------------------------
models, reports = {}, {}

# LightGBM (small-data tuned)
try:
    from lightgbm import LGBMClassifier
    lgbm = LGBMClassifier(
        objective="multiclass",
        n_estimators=800,
        learning_rate=0.05,
        num_leaves=15,
        max_depth=5,
        min_data_in_leaf=5,
        feature_fraction=0.8,
        bagging_fraction=0.8,
        bagging_freq=1,
        reg_lambda=1e-2,
        random_state=42,
        force_col_wise=True,
    )
    lgbm.fit(X_tr, y_tr, sample_weight=sample_weight_tr)
    y_pred = lgbm.predict(X_te)
    reports["LightGBM"] = {
        "f1_macro": f1_score(y_te, y_pred, average="macro"),
        "accuracy": accuracy_score(y_te, y_pred),
        "y_pred": y_pred,
    }
    models["LightGBM"] = lgbm
    print(f"LightGBM → F1_macro={reports['LightGBM']['f1_macro']:.3f}  Acc={reports['LightGBM']['accuracy']:.3f}")
except Exception as e:
    print("LightGBM not available:", e)

# XGBoost (optional)
try:
    from xgboost import XGBClassifier
    xgb = XGBClassifier(
        objective="multi:softprob",
        eval_metric="mlogloss",
        n_estimators=700,
        learning_rate=0.05,
        max_depth=7,
        subsample=0.9,
        colsample_bytree=0.9,
        random_state=42,
        tree_method="hist"
    )
    xgb.fit(X_tr, y_tr, sample_weight=sample_weight_tr)
    y_pred = xgb.predict(X_te)
    reports["XGBoost"] = {
        "f1_macro": f1_score(y_te, y_pred, average="macro"),
        "accuracy": accuracy_score(y_te, y_pred),
        "y_pred": y_pred,
    }
    models["XGBoost"] = xgb
    print(f"XGBoost  → F1_macro={reports['XGBoost']['f1_macro']:.3f}  Acc={reports['XGBoost']['accuracy']:.3f}")
except Exception as e:
    print("XGBoost not available:", e)

# MLP (always available)
mlp = Pipeline([
    ("scaler", StandardScaler(with_mean=False)),
    ("clf", MLPClassifier(hidden_layer_sizes=(256, 128), alpha=1e-4, max_iter=700, random_state=42))
])
mlp.fit(X_tr, y_tr)
y_pred = mlp.predict(X_te)
reports["MLP"] = {
    "f1_macro": f1_score(y_te, y_pred, average="macro"),
    "accuracy": accuracy_score(y_te, y_pred),
    "y_pred": y_pred,
}
models["MLP"] = mlp
print(f"MLP      → F1_macro={reports['MLP']['f1_macro']:.3f}  Acc={reports['MLP']['accuracy']:.3f}")

# Pick best
best_name = max(reports, key=lambda k: reports[k]["f1_macro"])
best_model = models[best_name]
best_pred = reports[best_name]["y_pred"]

print("\nBEST MODEL:", best_name, 
      "| F1_macro:", f"{reports[best_name]['f1_macro']:.3f}", 
      "| Acc:", f"{reports[best_name]['accuracy']:.3f}")

# ---------------------------
# 7) Detailed report
# ---------------------------
print("\nClassification report (best):\n")
print(classification_report(y_te, best_pred, target_names=list(le.classes_), digits=3))

cm = confusion_matrix(y_te, best_pred)
print("Confusion matrix:\n", cm)

# Optional quick plot
try:
    import matplotlib.pyplot as plt
    plt.figure(figsize=(6,5))
    plt.imshow(cm, interpolation="nearest")
    plt.title(f"Confusion Matrix — {best_name}")
    plt.colorbar()
    ticks = np.arange(len(le.classes_))
    plt.xticks(ticks, le.classes_, rotation=45, ha="right")
    plt.yticks(ticks, le.classes_)
    plt.xlabel("Predicted"); plt.ylabel("True"); plt.tight_layout(); plt.show()
except Exception:
    pass
