# READ ME (This is a supplemental notebook, main.ipynb is the full project)

This script is **not meant to be run**, and the stacked time-series file won't be provided, as it is 50 GB of patient health information. In the project we will use the features derived from this time series instead of the raw health data. This script outlines how the channel labels were mapped to brain regions and how the Fourier transform was used to extract the alpha power present in our features.csv, along with other inspection methods that helped us understand how to load and work with the data (scipy matfile reader? h5py?). 

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import re
# Source: channel mapping CSV (must contain a 'label' column).
# Destination: electrode table written at the end of this cell.
SRC = Path(r"C:\Users\0218s\Desktop\channels_1_185_with_labels.csv")
DST = Path(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv")
DST.parent.mkdir(parents=True, exist_ok=True)

m = pd.read_csv(SRC)
if "label" not in m.columns:
    raise ValueError(f"'label' column not found. Columns: {m.columns.tolist()}")

# Case-insensitive lookup from lower-cased column name to the real column name.
cols_lower = {c.lower(): c for c in m.columns}
has_xy  = ("x" in cols_lower) and ("y" in cols_lower)
has_xyz = {"x","y","z"}.issubset(cols_lower.keys())

# The 3-D case must be tested first: a file with x/y/z columns also has x/y,
# so checking has_xy first would make the normalisation branch unreachable.
if has_xyz:
    xyz = m[[cols_lower["x"], cols_lower["y"], cols_lower["z"]]].apply(pd.to_numeric, errors="coerce")
    arr = xyz.to_numpy(dtype=float)
    # Scale every position to unit length; zero-length rows are left as-is
    # (divided by 1) to avoid a division by zero.
    norm = np.linalg.norm(arr, axis=1, keepdims=True)
    norm[norm==0] = 1.0
    arr = arr / norm
    x = pd.Series(arr[:,0])
    y = pd.Series(arr[:,1])
elif has_xy:
    # Only 2-D coordinates available: use them directly (non-numeric -> NaN).
    x = pd.to_numeric(m[cols_lower["x"]], errors="coerce")
    y = pd.to_numeric(m[cols_lower["y"]], errors="coerce")
else:
    # No coordinates in the mapping file: emit NaN placeholders.
    x = pd.Series([np.nan]*len(m))
    y = pd.Series([np.nan]*len(m))
def label_to_region(lbl: str) -> str:
    """Map an EEG channel label to a coarse scalp-region name.

    The label is stripped and upper-cased, then compared against an ordered
    list of prefixes. Longer / more specific prefixes come before the
    single-letter lobes so that, for example, "FC3" is not swallowed by "F"
    and "AFp3" is not swallowed by "AF".

    Parameters
    ----------
    lbl : str
        Channel label such as "Fp1", "AFp3h" or "POz".

    Returns
    -------
    str
        The region name, or "unknown" when no prefix matches.
    """
    s = lbl.strip().upper()
    # Order matters: first matching prefix wins.
    prefix_to_region = (
        ("AFP", "frontopolar"),         # AFp* sites; must precede "AF"
        ("FP", "frontopolar"),
        ("AF", "frontal (anterior)"),
        ("FA", "frontal (anterior)"),
        ("FC", "fronto-central"),
        ("FT", "fronto-temporal"),
        ("CP", "centro-parietal"),
        ("PO", "parieto-occipital"),
        ("TP", "temporo-parietal"),
        ("OP", "occipito-parietal"),
        ("O", "occipital"),
        ("P", "parietal"),
        ("C", "central"),
        ("F", "frontal"),
        ("T", "temporal"),
    )
    for prefix, region in prefix_to_region:
        if s.startswith(prefix):
            return region
    # High-density suffixes (h) don't change lobe: retry without the suffix.
    if s.endswith("H"):
        return label_to_region(s[:-1])
    return "unknown"
# Per-channel label, its upper-cased form (for prefix tests) and its region.
names = m["label"].astype(str)
upper = names.str.upper()
regions = names.map(label_to_region)

# Cost is 1.0 everywhere except FP* sites and T3/T4/T7/T8, which cost 1.5.
mask_fp = upper.str.startswith("FP")
mask_inf_temp = upper.isin(["T3","T4","T7","T8"])  # handle both naming conventions
cost = pd.Series(1.0, index=names.index)
cost[mask_fp | mask_inf_temp] = 1.5

# One row per unique electrode name, sorted by name.
out = (
    pd.DataFrame({
        "name": names,
        "cost": cost.to_numpy(),
        "location": regions.to_numpy(),
        "x": x.to_numpy(),
        "y": y.to_numpy(),
    })
    .drop_duplicates(subset=["name"])
    .sort_values("name")
    .reset_index(drop=True)
)
out.to_csv(DST, index=False)
print(f"Wrote {DST} with {len(out)} rows and columns: {list(out.columns)}")


Wrote C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv with 185 rows and columns: ['name', 'cost', 'location', 'x', 'y']


In [None]:
import re
import pandas as pd
# Re-open the electrode table and patch it in place.
df = pd.read_csv(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv")
upper = df["name"].str.upper()

# AFp* sites are relabelled as frontopolar.
mask_afp = upper.str.startswith("AFP")      # AFp3, AFpz...
df.loc[mask_afp, "location"] = "frontopolar"

# Electrodes that receive the higher cost of 1.5.
starts_fp = upper.str.startswith("FP")      # FP1, FP2, FPz...
listed_hard = upper.isin([
    "T7", "T8", "T3", "T4",                 # temporal legacy/new
    "FT7", "FT8", "TP7", "TP8", "A1", "A2",
])
hard = starts_fp | mask_afp | listed_hard

# Reset every cost to 1.0, then raise the hard sites.
df["cost"] = 1.0
df.loc[hard, "cost"] = 1.5

df.to_csv(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv", index=False)
print("Updated locations/costs written.")


Updated locations/costs written.


In [None]:
# test with surrogate data 1st (not in main.ipynb deliverable)
import pandas as pd, numpy as np
elec = pd.read_csv(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv")  # expects columns: name,location,…
subjects = [f"S{i:02d}" for i in range(1, 11)]  # 10 demo subjects

# 1.0 where the location mentions "occipital" or "parietal", else 0.0.
occ = elec["location"].str.contains("occipital", case=False, na=False)
par = elec["location"].str.contains("parietal",  case=False, na=False)
posterior_mask = (occ | par).astype(float).to_numpy()

rng = np.random.default_rng(42)
n_electrodes = len(elec)
rows = []
for s in subjects:
    # Draw order (baseline, then lift) is kept so the seeded values are unchanged.
    base = rng.uniform(0.20, 0.50, size=n_electrodes)                    # baseline
    bump = rng.uniform(0.05, 0.15, size=n_electrodes) * posterior_mask   # posterior lift
    vals = np.clip(base + bump, 0, 1)
    rows.extend(
        {"subject": s, "electrode": name, "usefulness": float(v)}
        for name, v in zip(elec["name"], vals)
    )

pd.DataFrame(rows).to_csv(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\usefulness_demo.csv", index=False)


Wrote data/usefulness_demo.csv


In [None]:
# reader formats: AI Note: This code snippet is generated by ChatGPT to test different methods of loading in the time series
import os, numpy as np
# Location of the stacked time-series .mat file that the readers below are tried on.
path = r"C:\Users\0218s\Desktop\stack03312023.mat" 
def pretty_shape(x):
    """Return ``x.shape`` as a tuple, or ``None`` if that is not possible.

    Any exception raised while reading or converting the shape (missing
    attribute, non-iterable shape, ...) results in ``None``.
    """
    try:
        dims = x.shape
        result = tuple(dims)
    except Exception:
        result = None
    return result
def try_scipy(p):
    """List the variables of a .mat file as read by ``scipy.io.loadmat``.

    Parameters
    ----------
    p : path to the .mat file.

    Returns
    -------
    list of (name, type description, shape) tuples, one per variable whose
    name does not start with "__". Errors from ``loadmat`` propagate to the
    caller.
    """
    import scipy.io as sio

    def describe(k, v):
        # Default description is the Python type name; arrays with a
        # structured dtype get their field names spelled out instead.
        # NOTE(review): with struct_as_record=False structs are presumably
        # loaded as object arrays, so this branch may never trigger — confirm.
        t = type(v).__name__
        if isinstance(v, np.ndarray) and getattr(v.dtype, "names", None):
            t = f"np.ndarray(struct dtype={v.dtype.names})"
        return (k, t, pretty_shape(v))

    d = sio.loadmat(p, struct_as_record=False, squeeze_me=False)
    return [describe(k, v) for k, v in d.items() if not k.startswith("__")]
def try_h5py(p):
    """List the top-level entries of an HDF5 file opened with h5py.

    Parameters
    ----------
    p : path to the file.

    Returns
    -------
    list of (name, kind, shape) tuples. Datasets report their dtype and
    shape; anything else is reported as "h5py.Group" with shape ``None``.
    """
    import h5py

    def describe(k, obj):
        if not isinstance(obj, h5py.Dataset):
            return (k, "h5py.Group", None)
        return (k, f"h5py.Dataset[{obj.dtype}]", tuple(obj.shape))

    # Build the whole listing while the file is still open.
    with h5py.File(p, "r") as f:
        return [describe(k, f[k]) for k in f.keys()]
# Try the SciPy reader first; if it raises, report why and fall back to h5py.
if os.path.exists(path):
    try:
        print("=== SciPy loadmat (v5/v7) ===")
        for k, t, shp in try_scipy(path):
            print(f"- {k:25s} | {t:35s} | shape={shp}")
    except Exception as e:
        # Best-effort inspection: show the SciPy error, then try the HDF5 reader.
        print(f"SciPy loadmat fallback: {e}")
        try:
            print("\n=== h5py (v7.3/HDF5) ===")
            for k, t, shp in try_h5py(path):
                print(f"- {k:25s} | {t:35s} | shape={shp}")
        except Exception as e2:
            print(f"HDF5 fallback: {e2}")
else:
    print(f"File not found: {path}")


=== SciPy loadmat (v5/v7) ===
SciPy loadmat fallback: Please use HDF reader for matlab v7.3 files, e.g. h5py

=== h5py (v7.3/HDF5) ===
- stackedEEG_noreport_60s   | h5py.Dataset[float64]               | shape=(358, 20, 1500, 185)
- stackedEEG_report_60s     | h5py.Dataset[float64]               | shape=(341, 20, 1500, 185)
- stackedEEG_something_60s  | h5py.Dataset[float64]               | shape=(355, 20, 1500, 185)


In [None]:
import h5py, numpy as np, pandas as pd
from pathlib import Path
from scipy.signal import welch
# --- Input / output locations -------------------------------------------
MAT_PATH = r"C:\Users\0218s\Desktop\stack03312023.mat"
MAP_CSV = r"C:\Users\0218s\Desktop\channels_1_185_with_labels.csv"  # has Channel,label
ELEC_CSV = r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv"
OUT_CSV = r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\features.csv"

# --- Datasets to process: dataset name -> class label ---------------------
DATASETS = {
    "stackedEEG_report_60s": 1,    # CE
    "stackedEEG_noreport_60s": 0,  # NCE
}

# --- Welch PSD settings ---------------------------------------------------
FS = 500.0      # samples per second
WIN_SEC = 2.0   # window length in seconds
NOVERLAP = 0.5  # overlap as a fraction of the window
NPERSEG = int(WIN_SEC * FS)
NOVERLAP_SAMPLES = int(NPERSEG * NOVERLAP)

# --- Frequency bands as (fmin, fmax) ---------------------------------------
BANDS = {
    "delta": (0.5, 4.0),
    "theta": (4.0, 8.0),
    "alpha": (8.0, 12.0),
    "sigma": (12.0, 16.0),
    "beta": (16.0, 30.0),
}
# Range used as the denominator ("total" power) of the relative alpha power.
LOW, HIGH = 0.5, 30.0
def bandpower_from_psd(freqs, psd, fmin, fmax):
    """Integrate a PSD over the half-open band ``[fmin, fmax)``.

    Parameters
    ----------
    freqs : 1-D array of frequencies.
    psd : 1-D array of PSD values, same length as ``freqs``.
    fmin, fmax : band edges; ``fmin`` is included, ``fmax`` is excluded.

    Returns
    -------
    float
        Trapezoidal integral of ``psd`` over the selected frequencies, or
        0.0 when no frequency falls inside the band.
    """
    idx = (freqs >= fmin) & (freqs < fmax)
    if not np.any(idx):
        return 0.0
    # np.trapz is deprecated in NumPy 2.x in favour of np.trapezoid; keep the
    # old name as a fallback so the code still runs on NumPy 1.x.
    integrate = np.trapezoid if hasattr(np, "trapezoid") else np.trapz
    return float(integrate(psd[idx], freqs[idx]))
def rel_alpha_power(x_1d):
    """Return alpha-band power as a fraction of LOW..HIGH power for one signal.

    The PSD is estimated with Welch's method using the module-level FS,
    NPERSEG and NOVERLAP_SAMPLES settings. A small constant (1e-12) is added
    to the denominator so an all-zero spectrum does not divide by zero.
    """
    alpha_lo, alpha_hi = BANDS["alpha"]
    freqs, psd = welch(
        x_1d,
        fs=FS,
        nperseg=NPERSEG,
        noverlap=NOVERLAP_SAMPLES,
        scaling="density",
    )
    alpha_power = bandpower_from_psd(freqs, psd, alpha_lo, alpha_hi)
    total_power = bandpower_from_psd(freqs, psd, LOW, HIGH) + 1e-12
    return float(alpha_power / total_power)
def main():
    """Compute mean relative alpha power per channel and write features.csv.

    For every dataset listed in DATASETS, each entry along the first axis is
    split into its segments, the relative alpha power of every channel is
    computed per segment and then averaged over segments. The result is
    written to OUT_CSV in long format with the columns
    subject, electrode, feature, label.

    Raises
    ------
    ValueError
        If MAP_CSV lacks the 'Channel' or 'label' column.
    """
    # Channel index (1-based) -> stripped label.
    mapping = pd.read_csv(MAP_CSV)
    if not {"Channel", "label"}.issubset(mapping.columns):
        raise ValueError(f"{MAP_CSV} must have columns 'Channel' and 'label'")
    mapping["label_norm"] = mapping["label"].astype(str).str.strip()
    idx2label = dict(zip(mapping["Channel"].astype(int), mapping["label_norm"]))

    # Warn about mapped labels that electrodes.csv does not know.
    known_names = set(pd.read_csv(ELEC_CSV)["name"].astype(str).str.strip())
    missing = sorted(set(idx2label.values()) - known_names)
    if missing:
        print(f"[WARN] {len(missing)} labels in mapping not found in electrodes.csv (showing up to 10): {missing[:10]}")

    rows = []
    with h5py.File(MAT_PATH, "r") as f:
        for ds_name, label in DATASETS.items():
            if ds_name not in f:
                print(f"[WARN] dataset {ds_name} not found; skipping")
                continue

            dset = f[ds_name]  # shape = (N, 20, 1500, 185)
            N, S, T, C = dset.shape
            assert C == 185, f"Expected 185 channels, got {C}"
            assert S*T == int(60*FS), f"Expected 60s total: S*T={S*T} vs 60*FS={60*FS}"
            print(f"[INFO] {ds_name}: N={N}, segments={S}, samples/seg={T}, channels={C}")

            # Channel names depend only on the channel count, so resolve them
            # once per dataset; unmapped indices fall back to "CH<index>".
            channel_names = [
                idx2label.get(ch_idx_mat, f"CH{ch_idx_mat}")
                for ch_idx_mat in range(1, C + 1)
            ]

            for n in range(N):
                trial = dset[n]  # (S, T, C)
                feat_seg = np.zeros((S, C), dtype=np.float32)
                for s, seg in enumerate(trial):  # seg: (T, C)
                    feat_seg[s] = [rel_alpha_power(seg[:, c]) for c in range(C)]
                feat_ch = feat_seg.mean(axis=0)  # (C,)

                subj_id = f"{ds_name}_S{n:03d}"
                rows.extend(
                    {
                        "subject":   subj_id,
                        "electrode": name,
                        "feature":   float(value),
                        "label":     int(label),
                    }
                    for name, value in zip(channel_names, feat_ch)
                )

    out = pd.DataFrame(rows)
    Path(OUT_CSV).parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(OUT_CSV, index=False)
    print(f"[DONE] {OUT_CSV}  subjects={out['subject'].nunique()}  electrodes={out['electrode'].nunique()}  rows={len(out)}")

if __name__ == "__main__":
    main()


[INFO] stackedEEG_report_60s: N=341, segments=20, samples/seg=1500, channels=185


  return float(np.trapz(psd[idx], freqs[idx])) if np.any(idx) else 0.0


[INFO] stackedEEG_noreport_60s: N=358, segments=20, samples/seg=1500, channels=185
[DONE] C:\Users\0218s\Desktop\optimal-electrode-mip\data\features.csv  subjects=699  electrodes=185  rows=129315


In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
# Inputs and output of the usefulness computation.
FEATURES_PATH = Path(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\features.csv")
ELECTRODES_PATH = Path(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\electrodes.csv")
OUT_PATH = Path(r"C:\Users\0218s\Desktop\optimal-electrode-mip\data\usefulness_real.csv")

# Classifier settings.
SEED = 42
MAX_ITER = 300

# True: min-max scale each subject's row on its own; False: one global min-max.
NORMALIZE_PER_SUBJECT = True
def main():
    """Turn per-electrode features into per-subject usefulness scores in [0, 1].

    A logistic regression is fitted on the standardised subject-by-electrode
    feature matrix. Each score is the absolute value of (standardised feature
    x model weight), min-max scaled either per subject or globally depending
    on NORMALIZE_PER_SUBJECT, then clipped to [0, 1] and rounded to three
    decimals. The long-format result is written to OUT_PATH.

    Raises
    ------
    FileNotFoundError
        If FEATURES_PATH or ELECTRODES_PATH does not exist.
    """
    if not FEATURES_PATH.exists():
        raise FileNotFoundError(f"Missing {FEATURES_PATH}. Create it from your .mat first.")
    if not ELECTRODES_PATH.exists():
        raise FileNotFoundError(f"Missing {ELECTRODES_PATH} (expects column 'name').")

    features_long = pd.read_csv(FEATURES_PATH)   # subject,electrode,feature,label
    electrode_names = pd.read_csv(ELECTRODES_PATH)["name"].astype(str).tolist()

    # Wide matrix: one row per subject, one column per electrode.
    design = features_long.pivot_table(
        index="subject", columns="electrode", values="feature", aggfunc="mean"
    )
    # One label per subject, in the same row order as the matrix.
    labels = (
        features_long.drop_duplicates("subject")
        .set_index("subject")["label"]
        .reindex(design.index)
    )
    # Column order follows electrodes.csv; electrodes without data become 0.0.
    design = design.reindex(columns=electrode_names).fillna(0.0)

    standardized = StandardScaler(with_mean=True, with_std=True).fit_transform(design.to_numpy())

    model = LogisticRegression(
        penalty="l2",
        solver="liblinear",
        class_weight="balanced",
        max_iter=MAX_ITER,
        random_state=SEED,
    )
    model.fit(standardized, labels.to_numpy())
    weights = model.coef_.ravel()  # aligned with the matrix columns

    standardized_df = pd.DataFrame(standardized, index=design.index, columns=design.columns)
    contrib = (standardized_df * weights).abs()

    if NORMALIZE_PER_SUBJECT:
        row_min = contrib.min(axis=1)
        row_span = contrib.max(axis=1) - row_min + 1e-12
        usefulness = contrib.sub(row_min, axis=0).div(row_span, axis=0)
    else:
        flat = contrib.values
        lo, hi = flat.min(), flat.max()
        usefulness = (contrib - lo) / (hi - lo + 1e-12)

    out = usefulness.reset_index().melt(
        id_vars="subject", var_name="electrode", value_name="usefulness"
    )
    out["usefulness"] = out["usefulness"].clip(0, 1).round(3)

    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(OUT_PATH, index=False)
    print(f"Wrote {OUT_PATH}  rows={len(out)}  subjects={out['subject'].nunique()}  electrodes={out['electrode'].nunique()}")
    print("Example:")
    print(out.head())
if __name__ == "__main__":
    main()

Wrote C:\Users\0218s\Desktop\optimal-electrode-mip\data\usefulness_real.csv  rows=129315  subjects=699  electrodes=185
Example:
                        subject electrode  usefulness
0  stackedEEG_noreport_60s_S000      AF1h       0.127
1  stackedEEG_noreport_60s_S001      AF1h       0.149
2  stackedEEG_noreport_60s_S002      AF1h       0.097
3  stackedEEG_noreport_60s_S003      AF1h       0.005
4  stackedEEG_noreport_60s_S004      AF1h       0.124
