# Library

In [None]:
!uv add tsfresh seglearn tsassure-feature pandas scikit-learn openpyxl gdown imbalanced-learn keras-tuner scipy pyts tslearn optuna catboost imblearn lightgbm xgboost tensorflow numpy nolds saxpy
!uv add numba
!uv add tslearn

# Dataset

In [None]:
!rm -rf ./dataset
!gdown --folder https://drive.google.com/drive/folders/1BeZmzXCMpUWKL9zFrNoWoJsvkiBwpfQp?usp=sharing

In [None]:
# =========================
# WARMUP-AWARE EXPORTER (LR/SVC(linear)/RF) — ONE CELL
# =========================
import numpy as np, pandas as pd
from pathlib import Path
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier

# ---------- CONFIG ----------
# Folder holding the Excel workbooks (the previous cell downloads a folder with gdown).
DATA_DIR = Path("./dataset")
# Workbooks read for fitting and for the held-out split; each file is tagged as
# its own stream by load_xlsx_list.
TRAIN_FILES = [DATA_DIR/"Train-set_1.xlsx", DATA_DIR/"Train-set_2.xlsx"]
TEST_FILES  = [DATA_DIR/"Test-set_1.xlsx",  DATA_DIR/"Test-set_2.xlsx"]
# Column names that must be present in every loaded workbook.
COL_TIME = "Time"
# Order matters: the feature builders iterate the signals in this order.
COLS_SIGNAL = ["Temperature","Humidity","Humidity_WeatherStation","Temperature_WeatherStation"]
COL_LABEL = "Label"

# Destination of the generated C headers; created on import of this cell.
OUT_DIR = Path("include"); OUT_DIR.mkdir(parents=True, exist_ok=True)

# ---------- LOAD ----------
def load_xlsx_list(files):
    """Read every workbook in *files* and stack the rows into one frame.

    Each row gets a ``__stream_id__`` column holding the name of the file it
    came from, so later steps can group by stream. The result has a fresh
    0..n-1 index.

    Args:
        files: Iterable of path objects exposing ``.name``.

    Returns:
        The concatenated DataFrame.

    Raises:
        ValueError: If *files* is empty (nothing to concatenate).
    """
    def _read_tagged(path):
        frame = pd.read_excel(path)
        frame["__stream_id__"] = path.name
        return frame

    return pd.concat([_read_tagged(path) for path in files], ignore_index=True)

df_tr = load_xlsx_list(TRAIN_FILES)
df_te = load_xlsx_list(TEST_FILES)

# Fail early, and say which split lacks which columns, instead of a bare
# "missing" message that forces the user to inspect the workbooks by hand.
need = {COL_TIME, COL_LABEL, *COLS_SIGNAL}
missing = {
    split: sorted(need - set(frame.columns))
    for split, frame in (("train", df_tr), ("test", df_te))
    if need - set(frame.columns)
}
if missing:
    raise ValueError(f"Dataset missing required columns: {missing}")

# ---------- HELPERS ----------
def _as_c_floats(a):
    a = np.asarray(a, dtype=np.float32).ravel()
    # enforce decimal part to avoid tokens like '1f' or '29f'
    out=[]
    for v in a:
        s = f"{float(v):.8g}"
        if ("." not in s) and ("e" not in s) and ("E" not in s):
            s = s + ".0"
        out.append(s + "f")
    return ", ".join(out)

def write_lr_header(tag, scaler, lr):
    """Write ``model_edge_lr_<tag>.h`` with scaler statistics and LR weights.

    Args:
        tag: Phase tag; used verbatim in the C identifiers (``LR_<tag>_...``)
            and lower-cased in the file name.
        scaler: Fitted scaler exposing ``mean_`` and ``scale_``.
        lr: Fitted logistic regression exposing ``coef_`` and ``intercept_``.
            The coefficients are flattened and only the first intercept is
            exported.
    """
    T = f"LR_{tag}"
    n = scaler.mean_.shape[0]
    coef = lr.coef_.reshape(-1)
    # Format the scalar through _as_c_floats too: a bare ``{bias:.8g}f`` turns
    # an integral bias into ``1f``, which C compilers reject.
    bias = _as_c_floats([lr.intercept_.reshape(-1)[0]])
    H = [
        "#pragma once",
        f"// Auto-generated Logistic Regression ({T})",
        f"#define {T}_N_FEATURES {n}",
        f"static const float {T}_SCALE_MEAN[{T}_N_FEATURES] = {{ {_as_c_floats(scaler.mean_)} }};",
        f"static const float {T}_SCALE_STD [{T}_N_FEATURES] = {{ {_as_c_floats(scaler.scale_)} }};",
        f"static const float {T}_COEF       [{T}_N_FEATURES] = {{ {_as_c_floats(coef)} }};",
        f"static const float {T}_BIAS = {bias};",
    ]
    (OUT_DIR/f"model_edge_lr_{tag.lower()}.h").write_text("\n".join(H)+"\n", encoding="utf-8")

def write_svc_header(tag, scaler, svc):
    """Write ``model_edge_svc_<tag>.h`` with scaler statistics and SVM weights.

    Args:
        tag: Phase tag; used verbatim in the C identifiers (``SVC_<tag>_...``)
            and lower-cased in the file name.
        scaler: Fitted scaler exposing ``mean_`` and ``scale_``.
        svc: Fitted linear SVM exposing ``coef_`` and ``intercept_``. The
            coefficients are flattened and only the first intercept is
            exported.
    """
    T = f"SVC_{tag}"
    n = scaler.mean_.shape[0]
    w = svc.coef_.reshape(-1)
    # LinearSVC has no Platt; use monotone default
    A, B = -1.0, 0.0
    # All scalars go through _as_c_floats: formatting them with ``:.8g`` plus
    # an ``f`` suffix produced ``-1f`` and ``0f``, which are not valid C
    # floating literals and made the generated header fail to compile.
    b_lit = _as_c_floats([svc.intercept_.reshape(-1)[0]])
    a_lit = _as_c_floats([A])
    pb_lit = _as_c_floats([B])
    H = [
        "#pragma once",
        f"// Auto-generated Linear SVM ({T})",
        f"#define {T}_N_FEATURES {n}",
        f"static const float {T}_SCALE_MEAN[{T}_N_FEATURES] = {{ {_as_c_floats(scaler.mean_)} }};",
        f"static const float {T}_SCALE_STD [{T}_N_FEATURES] = {{ {_as_c_floats(scaler.scale_)} }};",
        f"static const float {T}_COEF       [{T}_N_FEATURES] = {{ {_as_c_floats(w)} }};",
        f"static const float {T}_BIAS = {b_lit};",
        f"static const float {T}_PROB_A = {a_lit};",
        f"static const float {T}_PROB_B = {pb_lit};",
    ]
    (OUT_DIR/f"model_edge_svc_{tag.lower()}.h").write_text("\n".join(H)+"\n", encoding="utf-8")

def _extract_tree_arrays(clf):
    t = clf.tree_
    feature   = t.feature.astype(np.int32)
    threshold = t.threshold.astype(np.float32)
    left      = t.children_left.astype(np.int32)
    right     = t.children_right.astype(np.int32)
    val       = t.value[:,0,:]
    leaf_val  = (val[:,1] - val[:,0]).astype(np.float32)
    is_leaf   = (left == -1).astype(np.uint8)
    return feature, threshold, left, right, is_leaf, leaf_val

def write_rf_header(tag, scaler, rf):
    """Write ``model_edge_rf_<tag>.h`` with a flattened random forest.

    The node arrays of all trees are concatenated in estimator order.
    ``TREE_OFFSETS`` holds the start of each tree in the flat arrays plus a
    final end marker. Child indices are written as they come from each tree,
    i.e. relative to that tree's own first node.

    Args:
        tag: Phase tag; used verbatim in the C identifiers (``RF_<tag>_...``)
            and lower-cased in the file name.
        scaler: Fitted scaler exposing ``mean_`` and ``scale_``.
        rf: Fitted forest exposing ``estimators_``.
    """
    T = f"RF_{tag}"
    n = scaler.mean_.shape[0]
    per_tree = [_extract_tree_arrays(est) for est in rf.estimators_]

    def _flat(slot, dtype):
        return np.concatenate([arrays[slot] for arrays in per_tree]).astype(dtype)

    def _ints(values):
        return ', '.join(map(str, values))

    node_counts = [arrays[0].size for arrays in per_tree]
    OFF = np.asarray(np.cumsum([0] + node_counts), dtype=np.int32)
    F = _flat(0, np.int32)
    Th = _flat(1, np.float32)
    L = _flat(2, np.int32)
    R = _flat(3, np.int32)
    IS = _flat(4, np.uint8)
    LV = _flat(5, np.float32)

    header_lines = [
        "#pragma once",
        f"// Auto-generated RandomForest ({T})",
        f"#define {T}_N_FEATURES {n}",
        f"#define {T}_N_TREES {len(rf.estimators_)}",
        f"#define {T}_N_NODES {F.size}",
        f"static const float {T}_SCALE_MEAN[{T}_N_FEATURES] = {{ {_as_c_floats(scaler.mean_)} }};",
        f"static const float {T}_SCALE_STD [{T}_N_FEATURES] = {{ {_as_c_floats(scaler.scale_)} }};",
        f"static const int   {T}_TREE_OFFSETS[{T}_N_TREES+1] = {{ {_ints(OFF)} }};",
        f"static const int   {T}_FEATURE    [{T}_N_NODES]     = {{ {_ints(F)} }};",
        f"static const float {T}_THRESHOLD  [{T}_N_NODES]     = {{ {_as_c_floats(Th)} }};",
        f"static const int   {T}_LEFT       [{T}_N_NODES]     = {{ {_ints(L)} }};",
        f"static const int   {T}_RIGHT      [{T}_N_NODES]     = {{ {_ints(R)} }};",
        f"static const unsigned char {T}_IS_LEAF[{T}_N_NODES] = {{ {_ints(IS)} }};",
        f"static const float {T}_LEAF_VALUE [{T}_N_NODES]     = {{ {_as_c_floats(LV)} }};",
    ]
    target = OUT_DIR/f"model_edge_rf_{tag.lower()}.h"
    target.write_text("\n".join(header_lines)+"\n", encoding="utf-8")

# ---------- PHASE FEATURES ----------
def make_stream_index(df):
    """Return a copy of *df* with a 1-based per-stream row counter ``__t__``.

    Rows are counted in their existing order within each ``__stream_id__``
    group; the input frame is left untouched.
    """
    position_in_stream = df.groupby("__stream_id__").cumcount() + 1
    return df.assign(__t__=position_in_stream)

# Add the per-stream row counter `__t__` to both splits.
# NOTE(review): the feature builders in this cell do not read `__t__`
# (build_p0 derives its own step from the loop index) — confirm whether it is
# still needed.
df_tr = make_stream_index(df_tr)
df_te = make_stream_index(df_te)

def build_p0(df):
    """Build phase-0 (cold start) features from the first two rows of each stream.

    Per row the feature vector is, in order: the raw signals, the first
    difference, its absolute value, its sign (-1/0/1), the symmetric relative
    change ``|dx| / (0.5 * (|curr| + |prev|))``, then the 1-based step ``t``
    and ``1/t``. For the first row of a stream all difference-based entries
    are zero.

    The number of signals is taken from ``COLS_SIGNAL`` rather than assumed to
    be four, and an empty input yields a correctly shaped ``(0, n_features)``
    matrix instead of a 1-D empty array.

    Args:
        df: Frame with ``__stream_id__``, the ``COLS_SIGNAL`` columns and
            ``COL_LABEL``.

    Returns:
        ``(X, y)``: float32 features with NaN/inf replaced by 0, and int32 labels.
    """
    n_sig = len(COLS_SIGNAL)
    n_feat = 5 * n_sig + 2
    feats = []
    labels = []
    for _, g in df.groupby("__stream_id__"):
        g = g.reset_index(drop=True)
        for i in range(min(len(g), 2)):
            curr = [g.loc[i, c] for c in COLS_SIGNAL]
            if i == 0:
                dx = [0] * n_sig
                prd = [0] * n_sig
                sgn = [0] * n_sig
            else:
                prev = [g.loc[i - 1, c] for c in COLS_SIGNAL]
                dx = [c_val - p_val for c_val, p_val in zip(curr, prev)]
                prd = []
                sgn = []
                for c_val, p_val, delta in zip(curr, prev, dx):
                    den = 0.5 * (abs(c_val) + abs(p_val))
                    # Near-zero (or NaN) scale: fall back to 1.0 to avoid
                    # dividing by ~0.
                    if not den > 1e-6:
                        den = 1.0
                    prd.append(abs(c_val - p_val) / den)
                    sgn.append(1 if delta > 0 else (-1 if delta < 0 else 0))
            t = i + 1
            feats.append(curr + dx + [abs(x) for x in dx] + sgn + prd + [t, 1.0 / t])
            labels.append(int(g.loc[i, COL_LABEL]))
    X = np.array(feats, dtype=np.float32).reshape(-1, n_feat)
    y = np.array(labels, dtype=np.int32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    return X, y

def build_p1(df,k=5):
    """Build phase-1 (warm-up) features once at least *k* rows of a stream exist.

    For every signal, in ``COLS_SIGNAL`` order, six values are emitted: the
    rolling mean and std of the first difference over *k* rows, the EWMA of
    the raw signal (``alpha = 2/(k+1)``, ``adjust=False``), the rolling mean of
    the squared difference, and the lag-1 and lag-2 raw values. Missing
    statistics fall back to 0, missing EWMA/lags fall back to the current raw
    value. The first ``k-1`` rows of each stream are skipped.

    The width used for an empty result is derived from ``COLS_SIGNAL`` instead
    of the previous hard-coded 24.

    Args:
        df: Frame with ``__stream_id__``, the ``COLS_SIGNAL`` columns and
            ``COL_LABEL``.
        k: Window length in rows.

    Returns:
        ``(X, y)``: float32 features with NaN/inf replaced by 0, and int32 labels.
    """
    n_feat = 6 * len(COLS_SIGNAL)

    def _or(value, fallback):
        return value if pd.notna(value) else fallback

    feats = []
    labels = []
    for _, g in df.groupby("__stream_id__"):
        g = g.reset_index(drop=True)
        sig = g[COLS_SIGNAL]
        d = sig.diff().fillna(0.0)
        ew = sig.ewm(alpha=2.0/(k+1), adjust=False).mean()
        rv = d.pow(2).rolling(k, min_periods=k).mean()
        m = d.rolling(k, min_periods=k).mean()
        s = d.rolling(k, min_periods=k).std().fillna(0.0)
        lag1 = sig.shift(1)
        lag2 = sig.shift(2)
        # Rows before index k-1 do not have a full window yet.
        for i in range(max(k - 1, 0), len(g)):
            row = []
            for c in COLS_SIGNAL:
                raw = g.loc[i, c]
                row += [
                    _or(m.loc[i, c], 0.0),
                    _or(s.loc[i, c], 0.0),
                    _or(ew.loc[i, c], raw),
                    _or(rv.loc[i, c], 0.0),
                    _or(lag1.loc[i, c], raw),
                    _or(lag2.loc[i, c], raw),
                ]
            feats.append(row)
            labels.append(int(g.loc[i, COL_LABEL]))
    X = np.array(feats, dtype=np.float32).reshape(-1, n_feat)
    y = np.array(labels, dtype=np.int32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    return X, y

def _build_p2_rfe(df):
    """Steady-state features from the optional ``RFE.RobustFeatureExtractor``."""
    from RFE import RobustFeatureExtractor
    rfe = RobustFeatureExtractor()
    Xs = []
    Ys = []
    for _, g in df.groupby("__stream_id__"):
        Xf = rfe.extract_from_dataframe(g[[COL_TIME]+COLS_SIGNAL])
        yv = g[COL_LABEL].values
        # Truncate both to the shorter length in case the extractor returns a
        # different number of rows than the stream has.
        n = min(len(Xf), len(yv))
        Xs.append(np.asarray(Xf[:n], np.float32))
        Ys.append(yv[:n].astype(np.int32))
    X = np.vstack(Xs) if Xs else np.zeros((0,1), np.float32)
    y = np.concatenate(Ys) if Ys else np.zeros((0,), np.int32)
    return np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0), y


def _build_p2_basic(df):
    """Fallback steady-state features: rolling mean/std/min/max (15 rows, min 5)."""
    feats = []
    labels = []
    for _, g in df.groupby("__stream_id__"):
        g = g.reset_index(drop=True)
        roll = g[COLS_SIGNAL].rolling(15, min_periods=5)
        mean = roll.mean()
        std = roll.std().fillna(0.0)
        mn = roll.min()
        mx = roll.max()
        for i in range(len(g)):
            row = []
            for c in COLS_SIGNAL:
                raw = g.loc[i, c]
                # Until min_periods is reached the statistics are NaN; use the
                # raw value (0 for std) instead.
                row += [
                    mean.loc[i, c] if pd.notna(mean.loc[i, c]) else raw,
                    std.loc[i, c] if pd.notna(std.loc[i, c]) else 0.0,
                    mn.loc[i, c] if pd.notna(mn.loc[i, c]) else raw,
                    mx.loc[i, c] if pd.notna(mx.loc[i, c]) else raw,
                ]
            feats.append(row)
            labels.append(int(g.loc[i, COL_LABEL]))
    X = np.array(feats, np.float32).reshape(-1, 4 * len(COLS_SIGNAL))
    y = np.array(labels, np.int32)
    return np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0), y


def build_p2(df):
    """Build phase-2 (steady state) features.

    Uses ``RFE.RobustFeatureExtractor`` when that module can be imported and
    otherwise a built-in rolling-statistics fallback. A missing module is the
    expected case and stays silent; any other failure of the extractor still
    falls back, but is now reported instead of being swallowed, because the
    two modes produce different feature spaces.

    Args:
        df: Frame with ``__stream_id__``, ``COL_TIME``, the ``COLS_SIGNAL``
            columns and ``COL_LABEL``.

    Returns:
        ``(X, y, mode)`` where *mode* is ``"RFE"`` or ``"BASIC"``.
    """
    try:
        X, y = _build_p2_rfe(df)
        return X, y, "RFE"
    except ImportError:
        pass  # optional extractor not installed
    except Exception as exc:
        print(f"[build_p2] RFE extraction failed ({exc!r}); using BASIC features")
    X, y = _build_p2_basic(df)
    return X, y, "BASIC"

# ---------- BUILD ----------
X0_tr,y0_tr = build_p0(df_tr); X0_te,y0_te = build_p0(df_te)
X1_tr,y1_tr = build_p1(df_tr, k=5); X1_te,y1_te = build_p1(df_te, k=5)
X2_tr,y2_tr,mode2 = build_p2(df_tr); X2_te,y2_te,mode2_te = build_p2(df_te)
# build_p2 chooses its extractor per call; if the two splits ended up in
# different modes their columns do not mean the same thing.
if mode2_te != mode2:
    raise RuntimeError(
        f"Steady-state feature mode differs between train ({mode2}) and test ({mode2_te})"
    )

print("Training files used:", [p.name for p in TRAIN_FILES])
print("Phase shapes: P0", X0_tr.shape, " P1", X1_tr.shape, " P2", X2_tr.shape, "(steady:", mode2, ")")

# ---------- FIT+EXPORT ----------
def fit_export(tag, Xtr,ytr, Xte,yte):
    """Fit LR, linear SVM and RF for one phase and write their C headers.

    A ``StandardScaler`` is fitted on the training features; all three models
    are trained on the scaled data and exported together with the scaler
    statistics. The held-out split, which was previously transformed and then
    discarded, is now used to print each model's accuracy as a sanity check.

    Args:
        tag: Phase tag passed on to the header writers (e.g. ``"P0"``).
        Xtr, ytr: Training features and labels.
        Xte, yte: Held-out features and labels.
    """
    scaler = StandardScaler().fit(Xtr)
    Ztr = scaler.transform(Xtr)
    Zte = scaler.transform(Xte)

    lr = LogisticRegression(max_iter=1000, solver='lbfgs').fit(Ztr, ytr)
    write_lr_header(tag, scaler, lr)

    # Seeded like the forest below so repeated runs export the same weights.
    svc = LinearSVC(random_state=42).fit(Ztr, ytr)
    write_svc_header(tag, scaler, svc)

    rf = RandomForestClassifier(n_estimators=25, max_depth=6, min_samples_leaf=2,
                                max_features="sqrt", bootstrap=False, random_state=42).fit(Ztr, ytr)
    write_rf_header(tag, scaler, rf)

    for name, model in (("LR", lr), ("SVC", svc), ("RF", rf)):
        print(f"[{tag}] {name} test accuracy: {model.score(Zte, yte):.4f}")

# One export per phase; each call writes three headers (LR, SVC, RF) into OUT_DIR.
fit_export("P0", X0_tr,y0_tr, X0_te,y0_te)
fit_export("P1", X1_tr,y1_tr, X1_te,y1_te)
fit_export("P2", X2_tr,y2_tr, X2_te,y2_te)

print("✅ Headers written to ./include")


# **new FE - dual mode**

## RF

In [2]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silences every Python warning for the rest of the session, not just this cell.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# Seed NumPy's and Python's global RNGs; SMOTE and the forests below also
# receive SEED explicitly.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Workbooks used for training (plain string paths in this cell).
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]

# Rolling-window lengths in rows; the first entry is also used as the EWMA span.
ROLLING_WINDOWS = [5, 15]
# Number of lagged copies (1..NUM_LAGS) of every raw channel.
NUM_LAGS = 3
# Leading rows excluded from warm-model training; also written to the C header.
WARMUP_PERIOD = 15

# ================= FEATURE EXTRACTORS =================
class ColdFeatureExtractor:
    """History-free features: raw numeric channels plus pairwise products.

    The frame is copied on construction. If *time_col* exists it is parsed to
    datetimes (unparseable entries become NaT), which keeps it out of the
    numeric channel list. Numeric columns whose name contains ``'Label'`` are
    excluded as well.
    """

    def __init__(self, df, time_col="Time"):
        frame = df.copy()
        if time_col in frame.columns:
            frame[time_col] = pd.to_datetime(frame[time_col], errors='coerce')
        self.df = frame
        numeric_names = frame.select_dtypes(include=np.number).columns
        self.numeric_cols = [name for name in numeric_names if 'Label' not in name]

    def extract(self):
        """Return the feature frame (same index as the input), NaN filled with 0."""
        channels = self.numeric_cols
        df_num = self.df[channels]
        out = pd.DataFrame(index=self.df.index)
        for name in channels:
            out[name] = df_num[name]
        # Every unordered pair once, in channel order (first index < second).
        for pos, first in enumerate(channels):
            for second in channels[pos + 1:]:
                out[f'inter_{first}_x_{second}'] = df_num[first] * df_num[second]
        return out.fillna(0)

class WarmFeatureExtractor:
    """History-based features: differences, rolling statistics, lags, EWMA, products.

    Numeric columns whose name contains ``'Label'`` are excluded. Unlike the
    cold extractor, the time column is not converted here, so a numeric time
    column would be treated as a channel.

    Args:
        df: Input frame; copied on construction.
        rolling_windows: Window lengths in rows. The first one is also used
            as the EWMA span, so the list must not be empty.
        num_lags: Lags ``1..num_lags`` are emitted for every channel.
    """

    def __init__(self, df, rolling_windows, num_lags):
        self.df = df.copy()
        self.rolling_windows = rolling_windows
        self.num_lags = num_lags
        self.numeric_cols = [c for c in self.df.select_dtypes(include=np.number).columns if 'Label' not in c]

    def extract(self):
        """Return the feature frame, indexed like the input.

        Column order: raw channels, first differences, rolling statistics of
        the raw channels, rolling mean/std of the differences, lags, EWMA,
        pairwise products. Rolling columns have NaN replaced by 0; the
        difference and lag columns keep their leading NaN so callers can mask
        those rows.

        The columns are collected in a dict and the frame is built once, which
        avoids the fragmentation caused by inserting columns one by one. The
        former bare ``except: pass`` around the rolling statistics is gone:
        every statistic name is a valid rolling method, so the guard could
        only hide genuine errors and silently drop feature columns.
        """
        cols = self.numeric_cols
        df_num = self.df[cols]
        df_speed = df_num.diff()
        feats = {}
        for c in cols:
            feats[c] = df_num[c]
        for c in cols:
            feats[f'speed_change_{c}'] = df_speed[c]
        stats_list = ['mean', 'median', 'std', 'var', 'min', 'max']
        for w in self.rolling_windows:
            for c in cols:
                r = df_num[c].rolling(window=w, min_periods=1)
                for s in stats_list:
                    feats[f'rolling_{s}_{w}_{c}'] = getattr(r, s)().fillna(0)
        for w in self.rolling_windows:
            for c in cols:
                r = df_speed[c].rolling(window=w, min_periods=1)
                for s in ['mean', 'std']:
                    feats[f'rolling_{s}_{w}_speed_change_{c}'] = getattr(r, s)().fillna(0)
        for i in range(1, self.num_lags + 1):
            for c in cols:
                feats[f'lag_{i}_{c}'] = df_num[c].shift(i)
        span = self.rolling_windows[0]
        for c in cols:
            feats[f'ewma_{span}_{c}'] = df_num[c].ewm(span=span).mean()
        for i, c1 in enumerate(cols):
            for c2 in cols[i+1:]:
                feats[f'inter_{c1}_x_{c2}'] = df_num[c1] * df_num[c2]
        return pd.DataFrame(feats, index=self.df.index)

# ================= C++ GENERATORS =================
def generate_specs_code(feature_names, raw_cols, array_name):
    """Render a C ``FeatureSpec`` table that mirrors the Python feature names.

    Each feature name is classified by its naming pattern and turned into one
    initializer ``{ kind, stat, window, lag, channel1, channel2 }`` followed by
    the name as a comment. Fields that do not apply keep their defaults
    (``stat=-1``, ``window=0``, ``lag=0``, channels ``-1``); a name that
    matches no pattern becomes ``FEAT_UNKNOWN``.

    Args:
        feature_names: Iterable of feature column names, in model input order.
        raw_cols: List of raw channel names; the list position is the channel index.
        array_name: Name of the generated C array.

    Returns:
        The C source of the array definition, ending with a newline.
    """
    stat_codes = {'mean':0, 'median':1, 'std':2, 'var':3, 'min':4, 'max':5}

    def channel_of(column):
        # -1 marks "no matching raw channel".
        return raw_cols.index(column) if column in raw_cols else -1

    rows = []
    for feat in feature_names:
        kind, stat, win, lag, ch1, ch2 = "FEAT_UNKNOWN", -1, 0, 0, -1, -1
        if feat in raw_cols:
            kind = "FEAT_RAW"
            ch1 = raw_cols.index(feat)
        elif feat.startswith("inter_"):
            kind = "FEAT_INTER"
            operands = feat[6:].split('_x_')
            # Both operands must resolve, otherwise neither channel is set.
            if len(operands) == 2 and operands[0] in raw_cols and operands[1] in raw_cols:
                ch1, ch2 = (raw_cols.index(name) for name in operands)
        elif feat.startswith("speed_change_"):
            kind = "FEAT_DIFF"
            ch1 = channel_of(feat.replace("speed_change_", ""))
        elif "speed_change" in feat and "rolling_" in feat:
            # rolling_<stat>_<win>_speed_change_<column>
            kind = "FEAT_ROLL_DIFF"
            tokens = feat.split('_')
            stat = stat_codes.get(tokens[1], -1)
            win = int(tokens[2])
            ch1 = channel_of("_".join(tokens[5:]))
        elif feat.startswith("rolling_"):
            # rolling_<stat>_<win>_<column>
            kind = "FEAT_ROLL_RAW"
            tokens = feat.split('_')
            stat = stat_codes.get(tokens[1], -1)
            win = int(tokens[2])
            ch1 = channel_of("_".join(tokens[3:]))
        elif feat.startswith("lag_"):
            # lag_<n>_<column>
            kind = "FEAT_LAG"
            tokens = feat.split('_')
            lag = int(tokens[1])
            ch1 = channel_of("_".join(tokens[2:]))
        elif feat.startswith("ewma_"):
            # ewma_<span>_<column>
            kind = "FEAT_EWMA"
            tokens = feat.split('_')
            win = int(tokens[1])
            ch1 = channel_of("_".join(tokens[2:]))
        rows.append("  {{ {}, {}, {}, {}, {}, {} }}, // {}".format(kind, stat, win, lag, ch1, ch2, feat))
    opening = "static const FeatureSpec {}[] = {{\n".format(array_name)
    return opening + "\n".join(rows) + "\n};\n"

def export_rf_model(rf, scaler, prefix, filename):
    """Append a flattened random forest and its scaler to a C header.

    All trees are laid out in one set of parallel arrays. Child indices of
    internal nodes are shifted by the tree's start offset, so they index the
    flat arrays directly; leaves get ``-1`` for both children. ``PROB1`` holds
    the class-1 share of the leaf's value row (0 for internal nodes), which
    assumes a two-class model.

    Scaler statistics are written with nine significant digits. The earlier
    fixed ``.6f`` format flattened small values to ``0.000000``, which loses
    the statistic and, for a standard deviation, leads to a division by zero
    on the device.

    Args:
        rf: Fitted forest exposing ``estimators_``.
        scaler: Fitted scaler exposing ``n_features_in_``, ``mean_``, ``scale_``.
        prefix: Prefix for every generated C identifier.
        filename: Header to append to (opened in append mode).
    """
    all_left, all_right, all_feat, all_thresh, all_prob1 = [], [], [], [], []
    offsets = [0]

    for est in rf.estimators_:
        tree = est.tree_
        base = offsets[-1]
        for i in range(tree.node_count):
            all_thresh.append(tree.threshold[i])
            all_feat.append(tree.feature[i])
            # A node is a leaf when both children carry the same (-1) marker.
            if tree.children_left[i] == tree.children_right[i]:
                all_left.append(-1)
                all_right.append(-1)
                v = tree.value[i][0]
                all_prob1.append(v[1] / v.sum() if v.sum() > 0 else 0)
            else:
                all_left.append(tree.children_left[i] + base)
                all_right.append(tree.children_right[i] + base)
                all_prob1.append(0)
        offsets.append(base + tree.node_count)

    def sig(values):
        return ', '.join(f'{x:.9g}' for x in values)

    with open(filename, "a") as f:
        f.write(f"\n// ===== MODEL: {prefix} =====\n")
        f.write(f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n")
        f.write(f"#define {prefix}_N_TREES {len(rf.estimators_)}\n")

        f.write(f"static const float {prefix}_SCALE_MEAN[] = {{ {sig(scaler.mean_)} }};\n")
        f.write(f"static const float {prefix}_SCALE_STD[]  = {{ {sig(scaler.scale_)} }};\n")

        def w(n, t, d):
            f.write(f"static const {t} {prefix}_{n}[] = {{ {', '.join(str(x) for x in d)} }};\n")

        w("TREE_OFFSETS", "int", offsets)
        w("FEATURE", "int", all_feat)
        w("THRESHOLD", "float", all_thresh)
        w("LEFT", "int", all_left)
        w("RIGHT", "int", all_right)
        w("PROB1", "float", all_prob1)

# ================= MAIN =================
def main():
    """Train the cold and warm random forests and write the two C headers.

    Steps: load the training workbooks, fit a cold model on history-free
    features and a warm model on history-based features (both after SMOTE and
    standard scaling), then write ``rfe_settings.h`` (feature layout) and
    ``model_edge_dual.h`` (model parameters) to the working directory.

    Differences from the earlier version:
      * A workbook that cannot be read is reported instead of being skipped
        silently.
      * A workbook without a ``Label`` column gets label 1 for every row. The
        old ``d.get('Label', 1)`` path raised inside the bare ``except`` and
        dropped the whole file.
      * Warm features and the warm-up mask are computed per source file, so
        differences, rolling windows, lags and EWMA no longer run across the
        boundary between two recordings.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
        except Exception as exc:
            print(f"Warning: could not read {f!r}, skipping it: {exc}")
            continue
        if 'Label' in d.columns:
            d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        else:
            d['Label'] = 1
        dfs.append(d)

    if not dfs:
        print("Error: No data loaded. Check file paths!")
        return

    df_all = pd.concat(dfs, ignore_index=True)
    # Source-file id of every row of df_all, kept outside the frame so it does
    # not become a numeric feature.
    file_id = np.repeat(np.arange(len(dfs)), [len(d) for d in dfs])
    raw_cols = [c for c in df_all.select_dtypes(include=np.number).columns if c != 'Label']

    sm = SMOTE(random_state=SEED)

    # --- TRAIN COLD MODEL ---
    print("2. Training COLD Model (with SMOTE)...")
    cold_ext = ColdFeatureExtractor(df_all)
    X_cold = cold_ext.extract()
    y_cold = df_all['Label']

    X_cold_res, y_cold_res = sm.fit_resample(X_cold, y_cold)
    scaler_cold = StandardScaler()
    X_cold_s = scaler_cold.fit_transform(X_cold_res)

    # n_jobs=1 ensures deterministic behavior
    rf_cold = RandomForestClassifier(n_estimators=10, max_depth=6, random_state=SEED, n_jobs=1)
    rf_cold.fit(X_cold_s, y_cold_res)

    # --- TRAIN WARM MODEL ---
    print("3. Training WARM Model (with SMOTE)...")
    # Each group keeps df_all's columns and index, so the per-file feature
    # frames line up with df_all again after concatenation.
    warm_parts = [
        WarmFeatureExtractor(part, ROLLING_WINDOWS, NUM_LAGS).extract()
        for _, part in df_all.groupby(file_id, sort=False)
    ]
    X_warm = pd.concat(warm_parts).sort_index()
    row_in_file = df_all.groupby(file_id).cumcount()
    # Drop rows with undefined history and the warm-up rows of every file.
    mask = (~X_warm.isna().any(axis=1)) & (row_in_file >= WARMUP_PERIOD)
    X_warm_clean = X_warm[mask]
    y_warm_clean = df_all['Label'][mask]

    X_warm_res, y_warm_res = sm.fit_resample(X_warm_clean, y_warm_clean)
    scaler_warm = StandardScaler()
    X_warm_s = scaler_warm.fit_transform(X_warm_res)

    # n_jobs=1 ensures deterministic behavior
    rf_warm = RandomForestClassifier(n_estimators=20, max_depth=8, random_state=SEED, n_jobs=1)
    rf_warm.fit(X_warm_s, y_warm_res)

    # --- EXPORT ---
    print("4. Writing headers...")
    if os.path.exists("rfe_settings.h"): os.remove("rfe_settings.h")
    if os.path.exists("model_edge_dual.h"): os.remove("model_edge_dual.h")

    with open("rfe_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        f.write(f"#define NUM_RAW_INPUTS {len(raw_cols)}\n")
        f.write(f"#define WARMUP_PERIOD {WARMUP_PERIOD}\n")
        f.write(f"#define N_FEATURES_COLD {X_cold.shape[1]}\n")
        f.write(f"#define N_FEATURES_WARM {X_warm.shape[1]}\n\n")
        f.write("enum FeatureKind { FEAT_RAW, FEAT_DIFF, FEAT_ROLL_RAW, FEAT_ROLL_DIFF, FEAT_LAG, FEAT_EWMA, FEAT_INTER, FEAT_ROLL_TD, FEAT_UNKNOWN };\n")
        f.write("typedef struct { FeatureKind kind; int stat; int window; int lag; int channel1; int channel2; } FeatureSpec;\n\n")
        f.write(generate_specs_code(X_cold.columns, raw_cols, "FEATURE_SPECS_COLD"))
        f.write(generate_specs_code(X_warm.columns, raw_cols, "FEATURE_SPECS_WARM"))

    # The exporter appends, so start the model header with a fresh preamble.
    with open("model_edge_dual.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    export_rf_model(rf_cold, scaler_cold, "RF_COLD", "model_edge_dual.h")
    export_rf_model(rf_warm, scaler_warm, "RF_WARM", "model_edge_dual.h")

    print("Done. Copy 'rfe_settings.h' and 'model_edge_dual.h' to your ESP32/src folder.")

if __name__ == "__main__":
    main()

1. Loading Data...
2. Training COLD Model (with SMOTE)...
3. Training WARM Model (with SMOTE)...
4. Writing headers...
Done. Copy 'rfe_settings.h' and 'model_edge_dual.h' to your ESP32/src folder.


## LR

In [2]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silences every Python warning for the rest of the session, not just this cell.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# Enforce reproducibility: seed NumPy's and Python's global RNGs; SMOTE and
# the regressions below also receive SEED explicitly.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Workbooks used for training (plain string paths in this cell).
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]

# Rolling-window lengths in rows; the first entry is also used as the EWMA span.
ROLLING_WINDOWS = [5, 15]
# Number of lagged copies (1..NUM_LAGS) of every raw channel.
NUM_LAGS = 3
# Leading rows excluded from warm-model training; also written to the C header.
WARMUP_PERIOD = 15

# ================= FEATURE EXTRACTORS =================
class ColdFeatureExtractor:
    """History-free features: raw numeric channels plus pairwise products.

    The frame is copied on construction. If *time_col* exists it is parsed to
    datetimes (unparseable entries become NaT), which keeps it out of the
    numeric channel list. Numeric columns whose name contains ``'Label'`` are
    excluded as well.
    """

    def __init__(self, df, time_col="Time"):
        frame = df.copy()
        if time_col in frame.columns:
            frame[time_col] = pd.to_datetime(frame[time_col], errors='coerce')
        self.df = frame
        numeric_names = frame.select_dtypes(include=np.number).columns
        self.numeric_cols = [name for name in numeric_names if 'Label' not in name]

    def extract(self):
        """Return the feature frame (same index as the input), NaN filled with 0."""
        channels = self.numeric_cols
        df_num = self.df[channels]
        out = pd.DataFrame(index=self.df.index)
        for name in channels:
            out[name] = df_num[name]
        # Every unordered pair once, in channel order (first index < second).
        for pos, first in enumerate(channels):
            for second in channels[pos + 1:]:
                out[f'inter_{first}_x_{second}'] = df_num[first] * df_num[second]
        return out.fillna(0)

class WarmFeatureExtractor:
    """History-based features: differences, rolling statistics, lags, EWMA, products.

    Numeric columns whose name contains ``'Label'`` are excluded. Unlike the
    cold extractor, the time column is not converted here, so a numeric time
    column would be treated as a channel.

    Args:
        df: Input frame; copied on construction.
        rolling_windows: Window lengths in rows. The first one is also used
            as the EWMA span, so the list must not be empty.
        num_lags: Lags ``1..num_lags`` are emitted for every channel.
    """

    def __init__(self, df, rolling_windows, num_lags):
        self.df = df.copy()
        self.rolling_windows = rolling_windows
        self.num_lags = num_lags
        self.numeric_cols = [c for c in self.df.select_dtypes(include=np.number).columns if 'Label' not in c]

    def extract(self):
        """Return the feature frame, indexed like the input.

        Column order: raw channels, first differences, rolling statistics of
        the raw channels, rolling mean/std of the differences, lags, EWMA,
        pairwise products. Rolling columns have NaN replaced by 0; the
        difference and lag columns keep their leading NaN so callers can mask
        those rows.

        The columns are collected in a dict and the frame is built once, which
        avoids the fragmentation caused by inserting columns one by one. The
        former bare ``except: pass`` around the rolling statistics is gone:
        every statistic name is a valid rolling method, so the guard could
        only hide genuine errors and silently drop feature columns.
        """
        cols = self.numeric_cols
        df_num = self.df[cols]
        df_speed = df_num.diff()
        feats = {}
        for c in cols:
            feats[c] = df_num[c]
        for c in cols:
            feats[f'speed_change_{c}'] = df_speed[c]
        stats_list = ['mean', 'median', 'std', 'var', 'min', 'max']
        for w in self.rolling_windows:
            for c in cols:
                r = df_num[c].rolling(window=w, min_periods=1)
                for s in stats_list:
                    feats[f'rolling_{s}_{w}_{c}'] = getattr(r, s)().fillna(0)
        for w in self.rolling_windows:
            for c in cols:
                r = df_speed[c].rolling(window=w, min_periods=1)
                for s in ['mean', 'std']:
                    feats[f'rolling_{s}_{w}_speed_change_{c}'] = getattr(r, s)().fillna(0)
        for i in range(1, self.num_lags + 1):
            for c in cols:
                feats[f'lag_{i}_{c}'] = df_num[c].shift(i)
        span = self.rolling_windows[0]
        for c in cols:
            feats[f'ewma_{span}_{c}'] = df_num[c].ewm(span=span).mean()
        for i, c1 in enumerate(cols):
            for c2 in cols[i+1:]:
                feats[f'inter_{c1}_x_{c2}'] = df_num[c1] * df_num[c2]
        return pd.DataFrame(feats, index=self.df.index)

# ================= C++ GENERATORS =================
def generate_specs_code(feature_names, raw_cols, array_name):
    """Render a C ``FeatureSpec`` table that mirrors the Python feature names.

    Each feature name is classified by its naming pattern and turned into one
    initializer ``{ kind, stat, window, lag, channel1, channel2 }`` followed by
    the name as a comment. Fields that do not apply keep their defaults
    (``stat=-1``, ``window=0``, ``lag=0``, channels ``-1``); a name that
    matches no pattern becomes ``FEAT_UNKNOWN``.

    Args:
        feature_names: Iterable of feature column names, in model input order.
        raw_cols: List of raw channel names; the list position is the channel index.
        array_name: Name of the generated C array.

    Returns:
        The C source of the array definition, ending with a newline.
    """
    stat_codes = {'mean':0, 'median':1, 'std':2, 'var':3, 'min':4, 'max':5}

    def channel_of(column):
        # -1 marks "no matching raw channel".
        return raw_cols.index(column) if column in raw_cols else -1

    rows = []
    for feat in feature_names:
        kind, stat, win, lag, ch1, ch2 = "FEAT_UNKNOWN", -1, 0, 0, -1, -1
        if feat in raw_cols:
            kind = "FEAT_RAW"
            ch1 = raw_cols.index(feat)
        elif feat.startswith("inter_"):
            kind = "FEAT_INTER"
            operands = feat[6:].split('_x_')
            # Both operands must resolve, otherwise neither channel is set.
            if len(operands) == 2 and operands[0] in raw_cols and operands[1] in raw_cols:
                ch1, ch2 = (raw_cols.index(name) for name in operands)
        elif feat.startswith("speed_change_"):
            kind = "FEAT_DIFF"
            ch1 = channel_of(feat.replace("speed_change_", ""))
        elif "speed_change" in feat and "rolling_" in feat:
            # rolling_<stat>_<win>_speed_change_<column>
            kind = "FEAT_ROLL_DIFF"
            tokens = feat.split('_')
            stat = stat_codes.get(tokens[1], -1)
            win = int(tokens[2])
            ch1 = channel_of("_".join(tokens[5:]))
        elif feat.startswith("rolling_"):
            # rolling_<stat>_<win>_<column>
            kind = "FEAT_ROLL_RAW"
            tokens = feat.split('_')
            stat = stat_codes.get(tokens[1], -1)
            win = int(tokens[2])
            ch1 = channel_of("_".join(tokens[3:]))
        elif feat.startswith("lag_"):
            # lag_<n>_<column>
            kind = "FEAT_LAG"
            tokens = feat.split('_')
            lag = int(tokens[1])
            ch1 = channel_of("_".join(tokens[2:]))
        elif feat.startswith("ewma_"):
            # ewma_<span>_<column>
            kind = "FEAT_EWMA"
            tokens = feat.split('_')
            win = int(tokens[1])
            ch1 = channel_of("_".join(tokens[2:]))
        rows.append("  {{ {}, {}, {}, {}, {}, {} }}, // {}".format(kind, stat, win, lag, ch1, ch2, feat))
    opening = "static const FeatureSpec {}[] = {{\n".format(array_name)
    return opening + "\n".join(rows) + "\n};\n"

def export_lr_model(lr, scaler, prefix, filename):
    """Append a logistic-regression model and its scaler to a C header.

    Numbers are written with nine significant digits. The earlier fixed
    ``.6f`` format flattened small values to ``0.000000``, which loses small
    coefficients and, for a standard deviation, leads to a division by zero
    on the device.

    Args:
        lr: Fitted model exposing ``coef_`` and ``intercept_``; only the first
            row of coefficients and the first intercept are exported.
        scaler: Fitted scaler exposing ``n_features_in_``, ``mean_``, ``scale_``.
        prefix: Prefix for every generated C identifier.
        filename: Header to append to (opened in append mode).
    """
    def sig(values):
        return ', '.join(f'{x:.9g}' for x in values)

    with open(filename, "a") as f:
        f.write(f"\n// ===== MODEL: {prefix} (Logistic Regression) =====\n")
        f.write(f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n")
        f.write(f"static const float {prefix}_SCALE_MEAN[] = {{ {sig(scaler.mean_)} }};\n")
        f.write(f"static const float {prefix}_SCALE_STD[]  = {{ {sig(scaler.scale_)} }};\n")
        f.write(f"static const float {prefix}_COEF[] = {{ {sig(lr.coef_[0])} }};\n")
        f.write(f"static const float {prefix}_BIAS = {lr.intercept_[0]:.9g};\n")

# ================= MAIN =================
def main():
    """Train the cold and warm logistic regressions and write the two C headers.

    Steps: load the training workbooks, fit a cold model on history-free
    features and a warm model on history-based features (both after SMOTE and
    standard scaling), then write ``rfe_settings.h`` (feature layout) and
    ``model_edge_dual.h`` (model parameters) to the working directory.

    Differences from the earlier version:
      * A workbook that cannot be read is reported instead of being skipped
        silently.
      * A workbook without a ``Label`` column gets label 1 for every row. The
        old ``d.get('Label', 1)`` path raised inside the bare ``except`` and
        dropped the whole file.
      * Warm features and the warm-up mask are computed per source file, so
        differences, rolling windows, lags and EWMA no longer run across the
        boundary between two recordings.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
        except Exception as exc:
            print(f"Warning: could not read {f!r}, skipping it: {exc}")
            continue
        if 'Label' in d.columns:
            d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        else:
            d['Label'] = 1
        dfs.append(d)

    if not dfs:
        print("Error: No data loaded.")
        return

    df_all = pd.concat(dfs, ignore_index=True)
    # Source-file id of every row of df_all, kept outside the frame so it does
    # not become a numeric feature.
    file_id = np.repeat(np.arange(len(dfs)), [len(d) for d in dfs])
    raw_cols = [c for c in df_all.select_dtypes(include=np.number).columns if c != 'Label']

    sm = SMOTE(random_state=SEED)

    # --- TRAIN COLD MODEL (LR) ---
    print("2. Training COLD Model (LR + SMOTE)...")
    cold_ext = ColdFeatureExtractor(df_all)
    X_cold = cold_ext.extract()
    y_cold = df_all['Label']

    # APPLY SMOTE
    X_cold_res, y_cold_res = sm.fit_resample(X_cold, y_cold)

    scaler_cold = StandardScaler()
    X_cold_s = scaler_cold.fit_transform(X_cold_res)

    lr_cold = LogisticRegression(random_state=SEED, max_iter=1000)
    lr_cold.fit(X_cold_s, y_cold_res)

    # --- TRAIN WARM MODEL (LR) ---
    print("3. Training WARM Model (LR + SMOTE)...")
    # Each group keeps df_all's columns and index, so the per-file feature
    # frames line up with df_all again after concatenation.
    warm_parts = [
        WarmFeatureExtractor(part, ROLLING_WINDOWS, NUM_LAGS).extract()
        for _, part in df_all.groupby(file_id, sort=False)
    ]
    X_warm = pd.concat(warm_parts).sort_index()
    row_in_file = df_all.groupby(file_id).cumcount()
    # Drop rows with undefined history and the warm-up rows of every file.
    mask = (~X_warm.isna().any(axis=1)) & (row_in_file >= WARMUP_PERIOD)
    X_warm_clean = X_warm[mask]
    y_warm_clean = df_all['Label'][mask]

    # APPLY SMOTE
    X_warm_res, y_warm_res = sm.fit_resample(X_warm_clean, y_warm_clean)

    scaler_warm = StandardScaler()
    X_warm_s = scaler_warm.fit_transform(X_warm_res)

    lr_warm = LogisticRegression(random_state=SEED, max_iter=1000)
    lr_warm.fit(X_warm_s, y_warm_res)

    # --- EXPORT ---
    print("4. Writing headers...")
    if os.path.exists("rfe_settings.h"): os.remove("rfe_settings.h")
    if os.path.exists("model_edge_dual.h"): os.remove("model_edge_dual.h")

    with open("rfe_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        f.write(f"#define NUM_RAW_INPUTS {len(raw_cols)}\n")
        f.write(f"#define WARMUP_PERIOD {WARMUP_PERIOD}\n")
        f.write(f"#define N_FEATURES_COLD {X_cold.shape[1]}\n")
        f.write(f"#define N_FEATURES_WARM {X_warm.shape[1]}\n\n")
        f.write("enum FeatureKind { FEAT_RAW, FEAT_DIFF, FEAT_ROLL_RAW, FEAT_ROLL_DIFF, FEAT_LAG, FEAT_EWMA, FEAT_INTER, FEAT_ROLL_TD, FEAT_UNKNOWN };\n")
        f.write("typedef struct { FeatureKind kind; int stat; int window; int lag; int channel1; int channel2; } FeatureSpec;\n\n")
        f.write(generate_specs_code(X_cold.columns, raw_cols, "FEATURE_SPECS_COLD"))
        f.write(generate_specs_code(X_warm.columns, raw_cols, "FEATURE_SPECS_WARM"))

    # The exporter appends, so start the model header with a fresh preamble.
    with open("model_edge_dual.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    export_lr_model(lr_cold, scaler_cold, "LR_COLD", "model_edge_dual.h")
    export_lr_model(lr_warm, scaler_warm, "LR_WARM", "model_edge_dual.h")

    print("Done. Ready for ESP32 (LR Version).")

if __name__ == "__main__":
    main()

1. Loading Data...
2. Training COLD Model (LR + SMOTE)...
3. Training WARM Model (LR + SMOTE)...
4. Writing headers...
Done. Ready for ESP32 (LR Version).


## SVM

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silences every Python warning for the rest of the session, not just this cell.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# Seed NumPy's and Python's global RNGs.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Workbooks used for training (plain string paths in this cell).
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]

# NOTE(review): presumably consumed by WarmFeatureExtractor and the training
# code further down this cell, as in the RF/LR cells — confirm against that code.
ROLLING_WINDOWS = [5, 15]
NUM_LAGS = 3
WARMUP_PERIOD = 15

# ================= FEATURE EXTRACTORS =================
class ColdFeatureExtractor:
    """History-free features: raw numeric channels plus pairwise products.

    The frame is copied on construction. If *time_col* exists it is parsed to
    datetimes (unparseable entries become NaT), which keeps it out of the
    numeric channel list. Numeric columns whose name contains ``'Label'`` are
    excluded as well.
    """

    def __init__(self, df, time_col="Time"):
        frame = df.copy()
        if time_col in frame.columns:
            frame[time_col] = pd.to_datetime(frame[time_col], errors='coerce')
        self.df = frame
        numeric_names = frame.select_dtypes(include=np.number).columns
        self.numeric_cols = [name for name in numeric_names if 'Label' not in name]

    def extract(self):
        """Return the feature frame (same index as the input), NaN filled with 0."""
        channels = self.numeric_cols
        df_num = self.df[channels]
        out = pd.DataFrame(index=self.df.index)
        for name in channels:
            out[name] = df_num[name]
        # Every unordered pair once, in channel order (first index < second).
        for pos, first in enumerate(channels):
            for second in channels[pos + 1:]:
                out[f'inter_{first}_x_{second}'] = df_num[first] * df_num[second]
        return out.fillna(0)

class WarmFeatureExtractor:
    """Build history-based features (diffs, rolling stats, lags, EWMA, products).

    ``rolling_windows`` is the list of rolling-window lengths and ``num_lags``
    the number of lagged copies per column. Numeric columns whose name
    contains ``'Label'`` are excluded.
    """

    def __init__(self, df, rolling_windows, num_lags):
        self.df = df.copy()
        self.rolling_windows = rolling_windows
        self.num_lags = num_lags
        numeric = self.df.select_dtypes(include=np.number).columns
        self.numeric_cols = [name for name in numeric if 'Label' not in name]

    def extract(self):
        """Return the feature frame; NaNs from diff/lag rows are left in place.

        Column order (the C feature specs are generated from it):
        raw columns, ``speed_change_*``, rolling stats of the raw signal,
        rolling mean/std of the diff, lags, EWMA, pairwise products.
        """
        names = self.numeric_cols
        values = self.df[names]
        deltas = values.diff()
        feats = {}

        for name in names:
            feats[name] = values[name]
        for name in names:
            feats[f'speed_change_{name}'] = deltas[name]

        # Rolling statistics of the raw signal (min_periods=1, NaN -> 0).
        for width in self.rolling_windows:
            for name in names:
                roller = values[name].rolling(window=width, min_periods=1)
                for stat in ('mean', 'median', 'std', 'var', 'min', 'max'):
                    # NOTE(review): a failing statistic is skipped silently;
                    # the catch-all is kept as-is to preserve behaviour.
                    try:
                        feats[f'rolling_{stat}_{width}_{name}'] = getattr(roller, stat)().fillna(0)
                    except:
                        continue

        # Rolling mean/std of the first difference.
        for width in self.rolling_windows:
            for name in names:
                roller = deltas[name].rolling(window=width, min_periods=1)
                for stat in ('mean', 'std'):
                    feats[f'rolling_{stat}_{width}_speed_change_{name}'] = getattr(roller, stat)().fillna(0)

        for step in range(1, self.num_lags + 1):
            for name in names:
                feats[f'lag_{step}_{name}'] = values[name].shift(step)

        # The EWMA span is the first configured rolling window.
        span = self.rolling_windows[0]
        for name in names:
            feats[f'ewma_{span}_{name}'] = values[name].ewm(span=span).mean()

        for pos, left in enumerate(names):
            for right in names[pos + 1:]:
                feats[f'inter_{left}_x_{right}'] = values[left] * values[right]

        return pd.DataFrame(feats, index=self.df.index)

# ================= C++ GENERATORS =================
def generate_specs_code(feature_names, raw_cols, array_name):
    """Render a C ``FeatureSpec`` array describing each feature by its name.

    Args:
        feature_names: feature column names, in model input order.
        raw_cols: list of raw input column names; a column's position in this
            list is its channel index.
        array_name: identifier of the generated C array.

    Returns:
        The C source of ``static const FeatureSpec <array_name>[]`` with one
        ``{ kind, stat, window, lag, channel1, channel2 }`` row per feature.
        Unrecognised names become ``FEAT_UNKNOWN``; unresolved channels and
        stats stay at -1.
    """
    stat_codes = {'mean': 0, 'median': 1, 'std': 2, 'var': 3, 'min': 4, 'max': 5}

    def channel_of(column):
        # -1 marks "no matching raw channel".
        return raw_cols.index(column) if column in raw_cols else -1

    rows = []
    for name in feature_names:
        kind, stat, win, lag, ch1, ch2 = "FEAT_UNKNOWN", -1, 0, 0, -1, -1

        if name in raw_cols:
            kind = "FEAT_RAW"
            ch1 = raw_cols.index(name)
        elif name.startswith("inter_"):
            kind = "FEAT_INTER"
            pair = name[6:].split('_x_')
            # Both sides must be known raw columns, otherwise channels stay -1.
            if len(pair) == 2 and pair[0] in raw_cols and pair[1] in raw_cols:
                ch1 = raw_cols.index(pair[0])
                ch2 = raw_cols.index(pair[1])
        elif name.startswith("speed_change_"):
            kind = "FEAT_DIFF"
            ch1 = channel_of(name.replace("speed_change_", ""))
        elif "speed_change" in name and "rolling_" in name:
            # rolling_<stat>_<win>_speed_change_<column>
            kind = "FEAT_ROLL_DIFF"
            tokens = name.split('_')
            stat = stat_codes.get(tokens[1], -1)
            win = int(tokens[2])
            ch1 = channel_of("_".join(tokens[5:]))
        elif name.startswith("rolling_"):
            # rolling_<stat>_<win>_<column>
            kind = "FEAT_ROLL_RAW"
            tokens = name.split('_')
            stat = stat_codes.get(tokens[1], -1)
            win = int(tokens[2])
            ch1 = channel_of("_".join(tokens[3:]))
        elif name.startswith("lag_"):
            # lag_<n>_<column>
            kind = "FEAT_LAG"
            tokens = name.split('_')
            lag = int(tokens[1])
            ch1 = channel_of("_".join(tokens[2:]))
        elif name.startswith("ewma_"):
            # ewma_<span>_<column>
            kind = "FEAT_EWMA"
            tokens = name.split('_')
            win = int(tokens[1])
            ch1 = channel_of("_".join(tokens[2:]))

        rows.append(f"  {{ {kind}, {stat}, {win}, {lag}, {ch1}, {ch2} }}, // {name}")

    body = "\n".join(rows)
    return f"static const FeatureSpec {array_name}[] = {{\n" + body + "\n};\n"

def export_svm_model(svm, scaler, prefix, filename):
    """Append a linear-SVM model section to the C header ``filename``.

    Writes the scaler mean/scale, the first row of ``svm.coef_``, the first
    intercept and the Platt-scaling parameters as ``<prefix>_*`` constants.

    Args:
        svm: fitted linear ``SVC`` (reads ``coef_``, ``intercept_`` and, when
            available, ``probA_`` / ``probB_``).
        scaler: fitted ``StandardScaler`` (reads ``n_features_in_``,
            ``mean_``, ``scale_``).
        prefix: prefix for every generated C identifier.
        filename: header to append to; it is opened in append mode.
    """
    def c_floats(values):
        return ', '.join(f'{x:.6f}' for x in values)

    def platt_param(attr):
        # A fitted SVC always exposes probA_/probB_, but they are empty
        # arrays unless probability=True, so hasattr() alone is not a
        # sufficient guard before indexing [0].
        values = getattr(svm, attr, None)
        if values is None or len(values) == 0:
            return 0.0
        return values[0]

    with open(filename, "a") as f:
        f.write(f"\n// ===== MODEL: {prefix} (Linear SVM) =====\n")
        f.write(f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n")
        f.write(f"static const float {prefix}_SCALE_MEAN[] = {{ {c_floats(scaler.mean_)} }};\n")
        f.write(f"static const float {prefix}_SCALE_STD[]  = {{ {c_floats(scaler.scale_)} }};\n")
        f.write(f"static const float {prefix}_COEF[] = {{ {c_floats(svm.coef_[0])} }};\n")
        f.write(f"static const float {prefix}_BIAS = {svm.intercept_[0]:.6f};\n")
        f.write(f"static const float {prefix}_PROB_A = {platt_param('probA_'):.6f};\n")
        f.write(f"static const float {prefix}_PROB_B = {platt_param('probB_'):.6f};\n")

# ================= MAIN =================
def main():
    """Train the cold and warm linear-SVM models and write both C headers.

    Loads every workbook in TRAIN_FILES, oversamples with SMOTE, standardises,
    fits one SVC on ColdFeatureExtractor features and one on
    WarmFeatureExtractor features, then writes ``rfe_settings.h`` (feature
    specs) and ``model_edge_dual.h`` (model constants) to the working
    directory. Returns early, with a message, if no workbook could be used.
    """
    print("1. Loading Data...")
    dfs = []
    for path in TRAIN_FILES:
        # Loading stays best-effort, but every skipped file is now reported
        # instead of being swallowed by a bare `except: pass`.
        try:
            d = pd.read_excel(path)
        except Exception as exc:
            print(f"Warning: could not read {path}: {exc}")
            continue
        if 'Label' not in d.columns:
            # Previously this case raised inside the try block and the file
            # was dropped without any message.
            print(f"Warning: {path} has no 'Label' column; skipping it.")
            continue
        # Unparseable or missing labels default to 1.
        d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        dfs.append(d)

    if not dfs:
        print("Error: No data loaded.")
        return

    df_all = pd.concat(dfs, ignore_index=True)
    raw_cols = [c for c in df_all.select_dtypes(include=np.number).columns if c != 'Label']

    sm = SMOTE(random_state=SEED)

    # --- TRAIN COLD MODEL (SVM) ---
    print("2. Training COLD Model (SVM + SMOTE)...")
    cold_ext = ColdFeatureExtractor(df_all)
    X_cold = cold_ext.extract()
    y_cold = df_all['Label']

    X_cold_res, y_cold_res = sm.fit_resample(X_cold, y_cold)
    scaler_cold = StandardScaler()
    X_cold_s = scaler_cold.fit_transform(X_cold_res)

    svm_cold = SVC(kernel='linear', probability=True, random_state=SEED)
    svm_cold.fit(X_cold_s, y_cold_res)

    # --- TRAIN WARM MODEL (SVM) ---
    print("3. Training WARM Model (SVM + SMOTE)...")
    warm_ext = WarmFeatureExtractor(df_all, ROLLING_WINDOWS, NUM_LAGS)
    X_warm = warm_ext.extract()
    # Drop rows with NaN features plus the first WARMUP_PERIOD rows.
    # NOTE(review): the files are concatenated before extraction, so only the
    # first file's warm-up rows are masked; rows at the start of later files
    # carry rolling/lag values that span the file boundary. Confirm intended.
    mask = ~X_warm.isna().any(axis=1)
    mask.iloc[:WARMUP_PERIOD] = False
    X_warm_clean = X_warm[mask]
    y_warm_clean = df_all['Label'][mask]

    X_warm_res, y_warm_res = sm.fit_resample(X_warm_clean, y_warm_clean)
    scaler_warm = StandardScaler()
    X_warm_s = scaler_warm.fit_transform(X_warm_res)

    svm_warm = SVC(kernel='linear', probability=True, random_state=SEED)
    svm_warm.fit(X_warm_s, y_warm_res)

    # --- EXPORT ---
    print("4. Writing headers...")
    if os.path.exists("rfe_settings.h"): os.remove("rfe_settings.h")
    if os.path.exists("model_edge_dual.h"): os.remove("model_edge_dual.h")

    with open("rfe_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        f.write(f"#define NUM_RAW_INPUTS {len(raw_cols)}\n")
        f.write(f"#define WARMUP_PERIOD {WARMUP_PERIOD}\n")
        f.write(f"#define N_FEATURES_COLD {X_cold.shape[1]}\n")
        f.write(f"#define N_FEATURES_WARM {X_warm.shape[1]}\n\n")
        f.write("enum FeatureKind { FEAT_RAW, FEAT_DIFF, FEAT_ROLL_RAW, FEAT_ROLL_DIFF, FEAT_LAG, FEAT_EWMA, FEAT_INTER, FEAT_ROLL_TD, FEAT_UNKNOWN };\n")
        f.write("typedef struct { FeatureKind kind; int stat; int window; int lag; int channel1; int channel2; } FeatureSpec;\n\n")
        f.write(generate_specs_code(X_cold.columns, raw_cols, "FEATURE_SPECS_COLD"))
        f.write(generate_specs_code(X_warm.columns, raw_cols, "FEATURE_SPECS_WARM"))

    # The header preamble is written and closed first; the exporters then
    # append their sections to the same file.
    with open("model_edge_dual.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    export_svm_model(svm_cold, scaler_cold, "SVM_COLD", "model_edge_dual.h")
    export_svm_model(svm_warm, scaler_warm, "SVM_WARM", "model_edge_dual.h")

    print("Done. Ready for ESP32 (SVM Version).")

if __name__ == "__main__":
    main()

# tsassure (new)

In [None]:
!uv add tsassure_feature

## RF

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silence every warning raised while this cell runs.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One seed shared by the stdlib RNG, NumPy's global RNG and the estimators.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Training workbooks, relative to the current working directory.
TRAIN_FILES = [f"./dataset/Train-set_{part}.xlsx" for part in (1, 2)]

class TsAssureExtractor:
    """Extract TsAssure-style features relative to one "main" sensor column.

    On construction, the numeric columns (names containing ``'Label'`` are
    excluded) become ``raw_cols`` and ``raw_cols[main_col_idx]`` is the main
    column. Columns whose absolute correlation with the main column exceeds
    0.7 are candidates; every candidate pair whose mutual absolute
    correlation exceeds 0.65 is stored in ``correlated_pairs``.
    """

    def __init__(self, df, main_col_idx=0):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        self.main_col = self.raw_cols[main_col_idx]
        self.correlated_pairs = []

        print(f"Finding correlations relative to main column: '{self.main_col}'...")
        candidates = [
            col for col in self.raw_cols
            if col != self.main_col and abs(df[self.main_col].corr(df[col])) > 0.7
        ]

        for i, c1 in enumerate(candidates):
            for c2 in candidates[i+1:]:
                if abs(df[c1].corr(df[c2])) > 0.65:
                    self.correlated_pairs.append((c1, c2))

    def extract(self, df):
        """Return the feature frame for ``df`` with every NaN replaced by 0.

        Column order: main column; for each other raw column the column and
        ``DiffMain_<col>``; ``speed_change_<col>`` for every raw column;
        ``PRD``; ``DiffPair_<c1>_<c2>`` for every correlated pair.
        """
        out = pd.DataFrame(index=df.index)
        df_num = df[self.raw_cols]
        col_0 = df_num[self.main_col]

        # 1. Raw & DiffMain
        out[self.main_col] = col_0
        for col in self.raw_cols:
            if col != self.main_col:
                out[col] = df_num[col]
                out[f'DiffMain_{col}'] = col_0 - df_num[col]

        # 2. Speed (first difference of every raw column)
        df_speed = df_num.diff()
        for col in self.raw_cols:
            out[f'speed_change_{col}'] = df_speed[col]

        # 3. PRD: |x_t - x_{t-1}| divided by the mean of the two samples
        prev_0 = col_0.shift(1)
        mean_val = (col_0 + prev_0) * 0.5
        with np.errstate(divide='ignore', invalid='ignore'):
            prd = abs(col_0 - prev_0) / mean_val
        # A zero mean yields +/-inf, which fillna() does not remove and which
        # the downstream SMOTE/StandardScaler steps reject; map it to 0 like
        # the undefined (NaN) cases.
        out['PRD'] = prd.replace([np.inf, -np.inf], np.nan).fillna(0)

        # 4. Pairs
        for c1, c2 in self.correlated_pairs:
            out[f'DiffPair_{c1}_{c2}'] = df_num[c1] - df_num[c2]

        return out.fillna(0)

def generate_tsassure_settings(extractor, raw_cols):
    """Build the TsAssure settings for the C header.

    Returns a tuple ``(defines, pair_rows)``: ``defines`` is a list of
    ``#define`` lines (raw input count, main column index, pair count) and
    ``pair_rows`` is the initializer body for the correlated-pair table, one
    ``{ i, j }, // a-b`` row per pair (empty string when there are none).
    Indices are positions within ``raw_cols``.
    """
    index_of = dict(zip(raw_cols, range(len(raw_cols))))
    defines = [
        f"#define NUM_RAW_INPUTS {len(raw_cols)}",
        f"#define IDX_MAIN_COL {index_of[extractor.main_col]}",
        f"#define NUM_PAIRS {len(extractor.correlated_pairs)}",
    ]
    pair_rows = "".join(
        f"  {{ {index_of[first]}, {index_of[second]} }}, // {first}-{second}\n"
        for first, second in extractor.correlated_pairs
    )
    return defines, pair_rows

def export_rf_model(rf, scaler, prefix, filename):
    """Append a flattened random forest to the C header ``filename``.

    All trees are laid out in shared node arrays. Child indices of internal
    nodes are shifted by the tree's starting offset; leaves get -1 for both
    children. ``PROB1`` holds, for leaves, the share of ``value[node][0][1]``
    in the node's value sum (0 when that sum is not positive) and 0 for
    internal nodes. ``TREE_OFFSETS`` has one start index per tree plus the
    total node count as its last entry.
    """
    feat_ids, cuts, left_ids, right_ids, leaf_probs = [], [], [], [], []
    offsets = [0]

    for estimator in rf.estimators_:
        t = estimator.tree_
        base = offsets[-1]
        for node in range(t.node_count):
            cuts.append(t.threshold[node])
            feat_ids.append(t.feature[node])
            lo, hi = t.children_left[node], t.children_right[node]
            if lo != hi:
                # Internal node: re-base the children into the global arrays.
                left_ids.append(lo + base)
                right_ids.append(hi + base)
                leaf_probs.append(0)
                continue
            # Leaf: both child indices are identical.
            left_ids.append(-1)
            right_ids.append(-1)
            counts = t.value[node][0]
            total = counts.sum()
            leaf_probs.append(counts[1] / total if total > 0 else 0)
        offsets.append(base + t.node_count)

    tables = (
        ("TREE_OFFSETS", "int", offsets),
        ("FEATURE", "int", feat_ids),
        ("THRESHOLD", "float", cuts),
        ("LEFT", "int", left_ids),
        ("RIGHT", "int", right_ids),
        ("PROB1", "float", leaf_probs),
    )

    with open(filename, "a") as out:
        out.write(f"\n// ===== MODEL: {prefix} =====\n")
        out.write(f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n")
        out.write(f"#define {prefix}_N_TREES {len(rf.estimators_)}\n")
        out.write(f"static const float {prefix}_SCALE_MEAN[] = {{ {', '.join(f'{x:.6f}' for x in scaler.mean_)} }};\n")
        out.write(f"static const float {prefix}_SCALE_STD[]  = {{ {', '.join(f'{x:.6f}' for x in scaler.scale_)} }};\n")
        for suffix, ctype, values in tables:
            joined = ', '.join(map(str, values))
            out.write(f"static const {ctype} {prefix}_{suffix}[] = {{ {joined} }};\n")

def main():
    """Train the TsAssure random forest and write its C headers.

    Loads every workbook in TRAIN_FILES, extracts TsAssure features,
    oversamples with SMOTE, standardises, fits a 20-tree forest and writes
    ``tsassure_settings.h`` and ``model_edge.h`` to the working directory.
    Returns early, with a message, if no workbook could be used.
    """
    print("1. Loading Data...")
    dfs = []
    for path in TRAIN_FILES:
        # Loading stays best-effort, but every skipped file is now reported
        # instead of being swallowed by a bare `except: pass`.
        try:
            d = pd.read_excel(path)
        except Exception as exc:
            print(f"Warning: could not read {path}: {exc}")
            continue
        if 'Label' not in d.columns:
            # Previously this case raised inside the try block and the file
            # was dropped without any message.
            print(f"Warning: {path} has no 'Label' column; skipping it.")
            continue
        # Unparseable or missing labels default to 1.
        d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        dfs.append(d)
    if not dfs:
        print("Error: No data loaded.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training TsAssure Model (RF with SMOTE)...")
    ext = TsAssureExtractor(df_all, main_col_idx=0)
    X = ext.extract(df_all)
    y = df_all['Label']

    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X, y)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    # Single Thread for reproducibility
    rf = RandomForestClassifier(n_estimators=20, max_depth=8, random_state=SEED, n_jobs=1)
    rf.fit(X_s, y_res)

    print("3. Writing headers...")
    if os.path.exists("tsassure_settings.h"): os.remove("tsassure_settings.h")
    if os.path.exists("model_edge.h"): os.remove("model_edge.h")

    setting_lines, pairs_str = generate_tsassure_settings(ext, ext.raw_cols)

    with open("tsassure_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(ext.raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines: f.write(line + "\n")
        f.write(f"\nstatic const int CORR_PAIRS[NUM_PAIRS > 0 ? NUM_PAIRS : 1][2] = {{\n{pairs_str}}};\n")
        f.write(f"\n#define TS_N_FEATURES {X.shape[1]}\n")

    # The preamble is written and closed before the exporter appends to it.
    with open("model_edge.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    export_rf_model(rf, scaler, "RF", "model_edge.h")

    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

## LR

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silence every warning raised while this cell runs.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One seed shared by NumPy's global RNG, the stdlib RNG and the estimators.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, matching the "./dataset" folder the
# dataset cell downloads into. The former absolute "/content/dataset" path
# only exists on Colab; elsewhere loading failed and main() returned without
# training anything.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]

# ================= TSASSURE EXTRACTOR =================
class TsAssureExtractor:
    """Extract TsAssure-style features relative to one "main" sensor column.

    On construction, the numeric columns (names containing ``'Label'`` are
    excluded) become ``raw_cols`` and ``raw_cols[main_col_idx]`` is the main
    column. Columns whose absolute correlation with the main column exceeds
    0.7 are candidates; every candidate pair whose mutual absolute
    correlation exceeds 0.65 is stored in ``correlated_pairs``.
    """

    def __init__(self, df, main_col_idx=0):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        self.main_col = self.raw_cols[main_col_idx]
        self.correlated_pairs = []

        print(f"Finding correlations relative to main column: '{self.main_col}'...")
        candidates = [
            col for col in self.raw_cols
            if col != self.main_col and abs(df[self.main_col].corr(df[col])) > 0.7
        ]

        for i, c1 in enumerate(candidates):
            for c2 in candidates[i+1:]:
                if abs(df[c1].corr(df[c2])) > 0.65:
                    self.correlated_pairs.append((c1, c2))

        print(f"Found {len(self.correlated_pairs)} correlated pairs: {self.correlated_pairs}")

    def extract(self, df):
        """Return the feature frame for ``df`` with every NaN replaced by 0.

        Column order: main column; for each other raw column the column and
        ``DiffMain_<col>``; ``speed_change_<col>`` for every raw column;
        ``PRD``; ``DiffPair_<c1>_<c2>`` for every correlated pair.
        """
        out = pd.DataFrame(index=df.index)
        df_num = df[self.raw_cols]
        col_0 = df_num[self.main_col]

        # 1. Raw & DiffMain
        out[self.main_col] = col_0
        for col in self.raw_cols:
            if col != self.main_col:
                out[col] = df_num[col]
                out[f'DiffMain_{col}'] = col_0 - df_num[col]

        # 2. Speed (first difference of every raw column)
        df_speed = df_num.diff()
        for col in self.raw_cols:
            out[f'speed_change_{col}'] = df_speed[col]

        # 3. PRD: |x_t - x_{t-1}| divided by the mean of the two samples
        prev_0 = col_0.shift(1)
        mean_val = (col_0 + prev_0) * 0.5
        with np.errstate(divide='ignore', invalid='ignore'):
            prd = abs(col_0 - prev_0) / mean_val
        # A zero mean yields +/-inf, which fillna() does not remove and which
        # the downstream SMOTE/StandardScaler steps reject; map it to 0 like
        # the undefined (NaN) cases.
        out['PRD'] = prd.replace([np.inf, -np.inf], np.nan).fillna(0)

        # 4. Pairs
        for c1, c2 in self.correlated_pairs:
            out[f'DiffPair_{c1}_{c2}'] = df_num[c1] - df_num[c2]

        return out.fillna(0)

# ================= C++ GENERATORS =================
def generate_tsassure_settings(extractor, raw_cols):
    """Build the TsAssure settings for the C header.

    Returns a tuple ``(defines, pair_rows)``: ``defines`` is a list of
    ``#define`` lines (raw input count, main column index, pair count) and
    ``pair_rows`` is the initializer body for the correlated-pair table, one
    ``{ i, j }, // a-b`` row per pair (empty string when there are none).
    Indices are positions within ``raw_cols``.
    """
    index_of = dict(zip(raw_cols, range(len(raw_cols))))
    defines = [
        f"#define NUM_RAW_INPUTS {len(raw_cols)}",
        f"#define IDX_MAIN_COL {index_of[extractor.main_col]}",
        f"#define NUM_PAIRS {len(extractor.correlated_pairs)}",
    ]
    pair_rows = "".join(
        f"  {{ {index_of[first]}, {index_of[second]} }}, // {first}-{second}\n"
        for first, second in extractor.correlated_pairs
    )
    return defines, pair_rows

def export_lr_model(lr, scaler, prefix, filename):
    """Append a logistic-regression model section to the C header ``filename``.

    Emits the scaler mean/scale, the first row of ``lr.coef_`` and the first
    intercept as ``<prefix>_*`` constants, every float with six decimals.
    The file is opened in append mode.
    """
    def c_floats(values):
        return ', '.join(f'{v:.6f}' for v in values)

    with open(filename, "a") as out:
        section = [
            f"\n// ===== MODEL: {prefix} (Logistic Regression) =====\n",
            f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n",
            f"static const float {prefix}_SCALE_MEAN[] = {{ {c_floats(scaler.mean_)} }};\n",
            f"static const float {prefix}_SCALE_STD[]  = {{ {c_floats(scaler.scale_)} }};\n",
            f"static const float {prefix}_COEF[] = {{ {c_floats(lr.coef_[0])} }};\n",
            f"static const float {prefix}_BIAS = {lr.intercept_[0]:.6f};\n",
        ]
        out.writelines(section)

# ================= MAIN =================
def main():
    """Train the TsAssure logistic-regression model and write its C headers.

    Loads every workbook in TRAIN_FILES, extracts TsAssure features,
    oversamples with SMOTE, standardises, fits the model and writes
    ``tsassure_settings.h`` and ``model_edge.h`` to the working directory.
    Returns early, with a message, if no workbook could be used.
    """
    print("1. Loading Data...")
    dfs = []
    for path in TRAIN_FILES:
        # Loading stays best-effort, but every skipped file is now reported
        # instead of being swallowed by a bare `except: pass`.
        try:
            d = pd.read_excel(path)
        except Exception as exc:
            print(f"Warning: could not read {path}: {exc}")
            continue
        if 'Label' not in d.columns:
            # Previously this case raised inside the try block and the file
            # was dropped without any message.
            print(f"Warning: {path} has no 'Label' column; skipping it.")
            continue
        # Unparseable or missing labels default to 1.
        d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        dfs.append(d)
    if not dfs:
        print("Error: No data loaded.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    # --- TRAIN TSASSURE MODEL (LR) ---
    print("2. Training TsAssure Model (LR with SMOTE)...")
    ext = TsAssureExtractor(df_all, main_col_idx=0)
    X = ext.extract(df_all)
    y = df_all['Label']

    # SMOTE REQUIREMENT
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X, y)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    lr = LogisticRegression(random_state=SEED, max_iter=1000)
    lr.fit(X_s, y_res)

    # --- EXPORT ---
    print("3. Writing headers...")
    if os.path.exists("tsassure_settings.h"): os.remove("tsassure_settings.h")
    if os.path.exists("model_edge.h"): os.remove("model_edge.h")

    setting_lines, pairs_str = generate_tsassure_settings(ext, ext.raw_cols)

    with open("tsassure_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(ext.raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines: f.write(line + "\n")
        f.write(f"\nstatic const int CORR_PAIRS[NUM_PAIRS > 0 ? NUM_PAIRS : 1][2] = {{\n{pairs_str}}};\n")
        f.write(f"\n#define TS_N_FEATURES {X.shape[1]}\n")

    with open("model_edge.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    # The exporter must run only after the preamble handle is closed. When it
    # appended while that handle was still open, the buffered preamble was
    # flushed last, at offset 0, and overwrote the start of the model section.
    export_lr_model(lr, scaler, "LR", "model_edge.h")

    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

## SVM

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silence every warning raised while this cell runs.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One seed shared by NumPy's global RNG, the stdlib RNG and the estimators.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, matching the "./dataset" folder the
# dataset cell downloads into. The former absolute "/content/dataset" path
# only exists on Colab; elsewhere loading failed and main() returned without
# training anything.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]

class TsAssureExtractor:
    """Extract TsAssure-style features relative to one "main" sensor column.

    On construction, the numeric columns (names containing ``'Label'`` are
    excluded) become ``raw_cols`` and ``raw_cols[main_col_idx]`` is the main
    column. Columns whose absolute correlation with the main column exceeds
    0.7 are candidates; every candidate pair whose mutual absolute
    correlation exceeds 0.65 is stored in ``correlated_pairs``.
    """

    def __init__(self, df, main_col_idx=0):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        self.main_col = self.raw_cols[main_col_idx]
        self.correlated_pairs = []

        print(f"Finding correlations relative to main column: '{self.main_col}'...")
        candidates = [
            col for col in self.raw_cols
            if col != self.main_col and abs(df[self.main_col].corr(df[col])) > 0.7
        ]

        for i, c1 in enumerate(candidates):
            for c2 in candidates[i+1:]:
                if abs(df[c1].corr(df[c2])) > 0.65:
                    self.correlated_pairs.append((c1, c2))

    def extract(self, df):
        """Return the feature frame for ``df`` with every NaN replaced by 0.

        Column order: main column; for each other raw column the column and
        ``DiffMain_<col>``; ``speed_change_<col>`` for every raw column;
        ``PRD``; ``DiffPair_<c1>_<c2>`` for every correlated pair.
        """
        out = pd.DataFrame(index=df.index)
        df_num = df[self.raw_cols]
        col_0 = df_num[self.main_col]

        # 1. Raw & DiffMain
        out[self.main_col] = col_0
        for col in self.raw_cols:
            if col != self.main_col:
                out[col] = df_num[col]
                out[f'DiffMain_{col}'] = col_0 - df_num[col]

        # 2. Speed (first difference of every raw column)
        df_speed = df_num.diff()
        for col in self.raw_cols:
            out[f'speed_change_{col}'] = df_speed[col]

        # 3. PRD: |x_t - x_{t-1}| divided by the mean of the two samples
        prev_0 = col_0.shift(1)
        mean_val = (col_0 + prev_0) * 0.5
        with np.errstate(divide='ignore', invalid='ignore'):
            prd = abs(col_0 - prev_0) / mean_val
        # A zero mean yields +/-inf, which fillna() does not remove and which
        # the downstream SMOTE/StandardScaler steps reject; map it to 0 like
        # the undefined (NaN) cases.
        out['PRD'] = prd.replace([np.inf, -np.inf], np.nan).fillna(0)

        # 4. Pairs
        for c1, c2 in self.correlated_pairs:
            out[f'DiffPair_{c1}_{c2}'] = df_num[c1] - df_num[c2]

        return out.fillna(0)

def generate_tsassure_settings(extractor, raw_cols):
    """Build the TsAssure settings for the C header.

    Returns a tuple ``(defines, pair_rows)``: ``defines`` is a list of
    ``#define`` lines (raw input count, main column index, pair count) and
    ``pair_rows`` is the initializer body for the correlated-pair table, one
    ``{ i, j }, // a-b`` row per pair (empty string when there are none).
    Indices are positions within ``raw_cols``.
    """
    index_of = dict(zip(raw_cols, range(len(raw_cols))))
    defines = [
        f"#define NUM_RAW_INPUTS {len(raw_cols)}",
        f"#define IDX_MAIN_COL {index_of[extractor.main_col]}",
        f"#define NUM_PAIRS {len(extractor.correlated_pairs)}",
    ]
    pair_rows = "".join(
        f"  {{ {index_of[first]}, {index_of[second]} }}, // {first}-{second}\n"
        for first, second in extractor.correlated_pairs
    )
    return defines, pair_rows

def export_svm_model(svm, scaler, prefix, filename):
    """Append a linear-SVM model section to the C header ``filename``.

    Writes the scaler mean/scale, the first row of ``svm.coef_``, the first
    intercept and the Platt-scaling parameters as ``<prefix>_*`` constants.

    Args:
        svm: fitted linear ``SVC`` (reads ``coef_``, ``intercept_`` and, when
            available, ``probA_`` / ``probB_``).
        scaler: fitted ``StandardScaler`` (reads ``n_features_in_``,
            ``mean_``, ``scale_``).
        prefix: prefix for every generated C identifier.
        filename: header to append to; it is opened in append mode.
    """
    def c_floats(values):
        return ', '.join(f'{x:.6f}' for x in values)

    def platt_param(attr):
        # A fitted SVC always exposes probA_/probB_, but they are empty
        # arrays unless probability=True, so hasattr() alone is not a
        # sufficient guard before indexing [0].
        values = getattr(svm, attr, None)
        if values is None or len(values) == 0:
            return 0.0
        return values[0]

    with open(filename, "a") as f:
        f.write(f"\n// ===== MODEL: {prefix} (Linear SVM) =====\n")
        f.write(f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n")
        f.write(f"static const float {prefix}_SCALE_MEAN[] = {{ {c_floats(scaler.mean_)} }};\n")
        f.write(f"static const float {prefix}_SCALE_STD[]  = {{ {c_floats(scaler.scale_)} }};\n")
        f.write(f"static const float {prefix}_COEF[] = {{ {c_floats(svm.coef_[0])} }};\n")
        f.write(f"static const float {prefix}_BIAS = {svm.intercept_[0]:.6f};\n")
        f.write(f"static const float {prefix}_PROB_A = {platt_param('probA_'):.6f};\n")
        f.write(f"static const float {prefix}_PROB_B = {platt_param('probB_'):.6f};\n")

def main():
    """Train the TsAssure linear-SVM model and write its C headers.

    Loads every workbook in TRAIN_FILES, extracts TsAssure features,
    oversamples with SMOTE, standardises, fits the SVC and writes
    ``tsassure_settings.h`` and ``model_edge.h`` to the working directory.
    Returns early, with a message, if no workbook could be used.
    """
    print("1. Loading Data...")
    dfs = []
    for path in TRAIN_FILES:
        # Loading stays best-effort, but every skipped file is now reported
        # instead of being swallowed by a bare `except: pass`.
        try:
            d = pd.read_excel(path)
        except Exception as exc:
            print(f"Warning: could not read {path}: {exc}")
            continue
        if 'Label' not in d.columns:
            # Previously this case raised inside the try block and the file
            # was dropped without any message.
            print(f"Warning: {path} has no 'Label' column; skipping it.")
            continue
        # Unparseable or missing labels default to 1.
        d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        dfs.append(d)
    if not dfs:
        print("Error: No data loaded.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training TsAssure Model (SVM with SMOTE)...")
    ext = TsAssureExtractor(df_all, main_col_idx=0)
    X = ext.extract(df_all)
    y = df_all['Label']

    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X, y)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    svm = SVC(kernel='linear', probability=True, random_state=SEED)
    svm.fit(X_s, y_res)

    print("3. Writing headers...")
    if os.path.exists("tsassure_settings.h"): os.remove("tsassure_settings.h")
    if os.path.exists("model_edge.h"): os.remove("model_edge.h")

    setting_lines, pairs_str = generate_tsassure_settings(ext, ext.raw_cols)

    with open("tsassure_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(ext.raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines: f.write(line + "\n")
        f.write(f"\nstatic const int CORR_PAIRS[NUM_PAIRS > 0 ? NUM_PAIRS : 1][2] = {{\n{pairs_str}}};\n")
        f.write(f"\n#define TS_N_FEATURES {X.shape[1]}\n")

    with open("model_edge.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    # The exporter must run only after the preamble handle is closed. When it
    # appended while that handle was still open, the buffered preamble was
    # flushed last, at offset 0, and overwrote the start of the model section.
    export_svm_model(svm, scaler, "SVM", "model_edge.h")

    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

# Hjorth Parameters

## LR

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silence every warning raised while this cell runs.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One seed shared by NumPy's global RNG, the stdlib RNG and the estimators.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, matching the "./dataset" folder the
# dataset cell downloads into. The former absolute "/content/dataset" path
# only exists on Colab; elsewhere loading failed and main() returned without
# training anything.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]
WINDOW_SIZE = 10  # Window size for Hjorth parameter calculation

# ================= HJORTH EXTRACTOR =================
class HjorthExtractor:
    """Compute rolling Hjorth parameters for every numeric sensor column.

    Each numeric column (names containing ``'Label'`` are excluded) is
    treated as an independent signal and yields three features:
    ``Act_<col>``, ``Mob_<col>`` and ``Comp_<col>``.

    Args:
        df: frame used to discover the numeric columns.
        window_size: rolling-window length; defaults to the module-level
            WINDOW_SIZE when omitted.
    """

    def __init__(self, df, window_size=None):
        # We process all numeric columns as independent sensors
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        self.window_size = WINDOW_SIZE if window_size is None else window_size
        print(f"Extracting Hjorth parameters for: {self.raw_cols}")

    def extract(self, df):
        """Return the Hjorth feature frame for ``df``.

        Rows where a rolling window is not yet full produce NaN, which is
        replaced by 0 in the returned frame.
        """
        window = self.window_size
        out = pd.DataFrame(index=df.index)

        for col in self.raw_cols:
            x = df[col]

            # First and second differences of the signal
            dx = x.diff()
            ddx = dx.diff()

            # 1. Activity = rolling variance of the signal
            activity = x.rolling(window).var()

            # Rolling variance of the differences
            var_dx = dx.rolling(window).var()
            var_ddx = ddx.rolling(window).var()

            # 2. Mobility = sqrt(Var(dx) / Var(x)); the epsilon avoids
            # division by zero on a constant window.
            mobility = np.sqrt(var_dx.div(activity + 1e-9))

            # 3. Complexity = Mobility(dx) / Mobility(x),
            # with Mobility(dx) = sqrt(Var(ddx) / Var(dx))
            mob_dx = np.sqrt(var_ddx.div(var_dx + 1e-9))
            complexity = mob_dx.div(mobility + 1e-9)

            out[f'Act_{col}'] = activity
            out[f'Mob_{col}'] = mobility
            out[f'Comp_{col}'] = complexity

        return out.fillna(0)

# ================= C++ GENERATORS =================
def generate_hjorth_settings(extractor):
    """Return the ``#define`` lines describing the Hjorth feature setup.

    The exported window is the extractor's own ``window_size`` attribute when
    it has one, so the header matches the window the features were computed
    with; otherwise the module-level WINDOW_SIZE is used.
    """
    window = extractor.window_size if hasattr(extractor, "window_size") else WINDOW_SIZE
    return [
        f"#define NUM_RAW_INPUTS {len(extractor.raw_cols)}",
        f"#define HJORTH_WINDOW_SIZE {window}",
    ]

def export_lr_model(lr, scaler, prefix, filename):
    """Append a logistic-regression model section to the C header ``filename``.

    Emits the scaler mean/scale, the first row of ``lr.coef_`` and the first
    intercept as ``<prefix>_*`` constants, every float with six decimals.
    The file is opened in append mode.
    """
    def c_floats(values):
        return ', '.join(f'{v:.6f}' for v in values)

    with open(filename, "a") as out:
        section = [
            f"\n// ===== MODEL: {prefix} (Logistic Regression) =====\n",
            f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n",
            f"static const float {prefix}_SCALE_MEAN[] = {{ {c_floats(scaler.mean_)} }};\n",
            f"static const float {prefix}_SCALE_STD[]  = {{ {c_floats(scaler.scale_)} }};\n",
            f"static const float {prefix}_COEF[] = {{ {c_floats(lr.coef_[0])} }};\n",
            f"static const float {prefix}_BIAS = {lr.intercept_[0]:.6f};\n",
        ]
        out.writelines(section)

# ================= MAIN =================
def main():
    """Train the Hjorth logistic-regression model and write its C headers.

    Loads every workbook in TRAIN_FILES, extracts Hjorth features,
    oversamples with SMOTE, standardises, fits the model and writes
    ``hjorth_settings.h`` and ``model_edge.h`` to the working directory.
    Returns early, with a message, if no workbook could be used.
    """
    print("1. Loading Data...")
    dfs = []
    for path in TRAIN_FILES:
        # Loading stays best-effort, but every skipped file is now reported
        # instead of being swallowed by a bare `except: pass`.
        try:
            d = pd.read_excel(path)
        except Exception as exc:
            print(f"Warning: could not read {path}: {exc}")
            continue
        if 'Label' not in d.columns:
            # Previously this case raised inside the try block and the file
            # was dropped without any message.
            print(f"Warning: {path} has no 'Label' column; skipping it.")
            continue
        # Unparseable or missing labels default to 1.
        d['Label'] = pd.to_numeric(d['Label'], errors='coerce').fillna(1)
        dfs.append(d)
    if not dfs:
        print("Error: No data loaded.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    # --- TRAIN HJORTH MODEL (LR with SMOTE) ---
    print("2. Training Hjorth Model (LR + SMOTE)...")
    ext = HjorthExtractor(df_all)
    X = ext.extract(df_all)
    y = df_all['Label']

    # Drop rows that still contain NaN.
    # NOTE(review): extract() already replaces NaN with 0, so this mask keeps
    # every row and the leading warm-up rows are trained on as zeros rather
    # than dropped. Confirm which behaviour is intended.
    mask = ~X.isna().any(axis=1)
    X_clean = X[mask]
    y_clean = y[mask]

    # --- APPLY SMOTE ---
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X_clean, y_clean)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    lr = LogisticRegression(random_state=SEED, max_iter=1000)
    lr.fit(X_s, y_res)

    # --- EXPORT ---
    print("3. Writing headers...")
    if os.path.exists("hjorth_settings.h"): os.remove("hjorth_settings.h")
    if os.path.exists("model_edge.h"): os.remove("model_edge.h")

    setting_lines = generate_hjorth_settings(ext)

    with open("hjorth_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        for i, c in enumerate(ext.raw_cols): f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines: f.write(line + "\n")
        f.write(f"\n#define HJORTH_N_FEATURES {X.shape[1]}\n")

    with open("model_edge.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    # The exporter must run only after the preamble handle is closed. When it
    # appended while that handle was still open, the buffered preamble was
    # flushed last, at offset 0, and overwrote the start of the model section.
    export_lr_model(lr, scaler, "LR", "model_edge.h")

    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

## RF

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silence every warning raised while this cell runs.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One seed shared by NumPy's global RNG, the stdlib RNG and the estimators.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, matching the "./dataset" folder the
# dataset cell downloads into. The former absolute "/content/dataset" path
# only exists on Colab; elsewhere loading failed and main() returned without
# training anything.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]
# Rolling-window length used for the Hjorth parameters.
WINDOW_SIZE = 10

# ================= HJORTH EXTRACTOR =================
class HjorthExtractor:
    """Compute rolling Hjorth parameters for every numeric sensor column.

    Each numeric column (names containing ``'Label'`` are excluded) is
    treated as an independent signal and yields three features:
    ``Act_<col>``, ``Mob_<col>`` and ``Comp_<col>``.

    Args:
        df: frame used to discover the numeric columns.
        window_size: rolling-window length; defaults to the module-level
            WINDOW_SIZE when omitted.
    """

    def __init__(self, df, window_size=None):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        self.window_size = WINDOW_SIZE if window_size is None else window_size
        print(f"Extracting Hjorth parameters for: {self.raw_cols}")

    def extract(self, df):
        """Return the Hjorth feature frame for ``df``.

        Rows where a rolling window is not yet full produce NaN, which is
        replaced by 0 in the returned frame.
        """
        window = self.window_size
        out = pd.DataFrame(index=df.index)
        for col in self.raw_cols:
            x = df[col]
            # First and second differences of the signal
            dx = x.diff()
            ddx = dx.diff()

            # Activity = rolling variance of the signal
            activity = x.rolling(window).var()
            var_dx = dx.rolling(window).var()
            var_ddx = ddx.rolling(window).var()

            # Mobility = sqrt(Var(dx) / Var(x)); the epsilon avoids division
            # by zero on a constant window.
            mobility = np.sqrt(var_dx.div(activity + 1e-9))
            # Complexity = Mobility(dx) / Mobility(x)
            mob_dx = np.sqrt(var_ddx.div(var_dx + 1e-9))
            complexity = mob_dx.div(mobility + 1e-9)

            out[f'Act_{col}'] = activity
            out[f'Mob_{col}'] = mobility
            out[f'Comp_{col}'] = complexity
        return out.fillna(0)

# ================= C++ GENERATORS =================
def generate_hjorth_settings(extractor):
    """Return the ``#define`` lines describing the Hjorth feature setup.

    The exported window is the extractor's own ``window_size`` attribute when
    it has one, so the header matches the window the features were computed
    with; otherwise the module-level WINDOW_SIZE is used.
    """
    window = extractor.window_size if hasattr(extractor, "window_size") else WINDOW_SIZE
    return [
        f"#define NUM_RAW_INPUTS {len(extractor.raw_cols)}",
        f"#define HJORTH_WINDOW_SIZE {window}",
    ]

def export_rf_model(rf, scaler, filename):
    """Write a flattened random forest as a complete C header.

    ``filename`` is overwritten. All trees share global node arrays;
    ``RF_TREE_ROOTS`` holds each tree's first node index. Children of
    internal nodes are shifted by the tree's offset, leaves keep -1.
    ``RF_VALUE`` stores, for every node, the share of ``value[:, 0, 1]`` in
    the node's value sum (NaN from an empty node becomes 0).
    """
    roots, lefts, rights, feats, cuts, values = [], [], [], [], [], []
    base = 0

    for member in rf.estimators_:
        t = member.tree_
        count = t.node_count
        roots.append(base)

        counts = t.value[:, 0, :]
        with np.errstate(divide='ignore', invalid='ignore'):
            share = np.nan_to_num(counts[:, 1] / counts.sum(axis=1))

        kids_left = t.children_left[:count]
        kids_right = t.children_right[:count]
        # A node is internal when its left child index is not -1.
        internal = kids_left != -1
        lefts.extend(np.where(internal, kids_left + base, -1))
        rights.extend(np.where(internal, kids_right + base, -1))
        feats.extend(t.feature[:count])
        cuts.extend(t.threshold[:count])
        values.extend(share[:count])
        base += count

    def ints(seq):
        return ', '.join(map(str, seq))

    def floats(seq):
        return ', '.join(f'{x:.6f}' for x in seq)

    n_trees = len(rf.estimators_)
    with open(filename, "w") as out:
        out.write("#pragma once\n#include <stdint.h>\n\n")
        out.write(f"// Random Forest: {n_trees} estimators\n")
        out.write(f"#define RF_N_FEATURES {scaler.n_features_in_}\n")
        out.write(f"static const float RF_SCALE_MEAN[] = {{ {floats(scaler.mean_)} }};\n")
        out.write(f"static const float RF_SCALE_STD[]  = {{ {floats(scaler.scale_)} }};\n\n")
        out.write(f"#define RF_NUM_TREES {n_trees}\n")
        out.write(f"static const int RF_TREE_ROOTS[] = {{ {ints(roots)} }};\n")
        out.write(f"static const int RF_LEFT[] = {{ {ints(lefts)} }};\n")
        out.write(f"static const int RF_RIGHT[] = {{ {ints(rights)} }};\n")
        out.write(f"static const int RF_FEATURE[] = {{ {ints(feats)} }};\n")
        out.write(f"static const float RF_THRESHOLD[] = {{ {floats(cuts)} }};\n")
        out.write(f"static const float RF_VALUE[] = {{ {floats(values)} }};\n")

# ================= MAIN =================
def main():
    """Train the Hjorth-feature random forest and export the C headers.

    Reads every workbook in TRAIN_FILES, extracts features with
    HjorthExtractor, balances the classes with SMOTE, standardises the
    resampled features, fits a 10-tree / depth-8 RandomForestClassifier and
    writes ``hjorth_settings.h`` and ``model_edge.h`` to the working directory.

    A workbook that cannot be loaded is skipped with a printed reason; when
    nothing could be loaded the function reports it and returns without
    exporting.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
            # Labels that cannot be parsed as numbers become 1.
            d['Label'] = pd.to_numeric(d.get('Label', 1), errors='coerce').fillna(1)
        except Exception as exc:
            # Best-effort load, but never silently: say what was dropped and why.
            print(f"   Skipping {f}: {exc}")
            continue
        dfs.append(d)
    if not dfs:
        print("No training data could be loaded; nothing exported.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training Hjorth Model (RF + SMOTE)...")
    ext = HjorthExtractor(df_all)
    X = ext.extract(df_all)
    y = df_all['Label']

    # Keep only rows whose features are all non-NaN.
    mask = ~X.isna().any(axis=1)
    X_clean = X[mask]
    y_clean = y[mask]

    # --- APPLY SMOTE ---
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X_clean, y_clean)

    # The scaler is fitted on the resampled data; its statistics are exported
    # together with the forest.
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    rf = RandomForestClassifier(n_estimators=10, max_depth=8, random_state=SEED, n_jobs=1)
    rf.fit(X_s, y_res)

    print("3. Exporting...")
    for stale in ("hjorth_settings.h", "model_edge.h"):
        if os.path.exists(stale):
            os.remove(stale)

    setting_lines = generate_hjorth_settings(ext)

    with open("hjorth_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        # One index macro per raw input column, in extractor order.
        for i, c in enumerate(ext.raw_cols):
            f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines:
            f.write(line + "\n")
        f.write(f"\n#define HJORTH_N_FEATURES {X.shape[1]}\n")

    export_rf_model(rf, scaler, "model_edge.h")
    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

## SVM


In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One fixed seed for NumPy, the stdlib RNG, SMOTE and the classifier.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, which is where the dataset download
# cell puts the files. An absolute "/content/..." path only resolves on
# Colab and made the loader find nothing anywhere else.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx",
]
# Number of consecutive samples in each rolling Hjorth window.
WINDOW_SIZE = 10

# ================= HJORTH EXTRACTOR =================
class HjorthExtractor:
    """Rolling Hjorth parameters (activity, mobility, complexity) per column.

    Args:
        df: frame used only to pick the input columns: every numeric column
            whose name does not contain ``'Label'``.
        window_size: rolling window length in samples. ``None`` (default)
            uses the module-level ``WINDOW_SIZE`` at extraction time. When
            overriding it, the exported ``HJORTH_WINDOW_SIZE`` define must be
            kept in step by the caller.
    """

    def __init__(self, df, window_size=None):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        self.window_size = window_size
        print(f"Extracting Hjorth parameters for: {self.raw_cols}")

    def extract(self, df):
        """Return a frame with ``Act_<col>``, ``Mob_<col>``, ``Comp_<col>`` columns.

        The result keeps ``df``'s index. Rows for which a rolling variance is
        not yet defined (window warm-up) are filled with 0 rather than dropped.
        """
        window = WINDOW_SIZE if self.window_size is None else self.window_size
        eps = 1e-9  # keeps the ratios finite on flat (zero-variance) windows

        columns = {}
        for col in self.raw_cols:
            x = df[col]
            dx = x.diff()      # first difference
            ddx = dx.diff()    # second difference

            activity = x.rolling(window).var()
            var_dx = dx.rolling(window).var()
            var_ddx = ddx.rolling(window).var()

            # mobility(x) = sqrt(var(dx) / var(x));
            # complexity = mobility(dx) / mobility(x)
            mobility = np.sqrt(var_dx.div(activity + eps))
            mob_dx = np.sqrt(var_ddx.div(var_dx + eps))
            complexity = mob_dx.div(mobility + eps)

            columns[f'Act_{col}'] = activity
            columns[f'Mob_{col}'] = mobility
            columns[f'Comp_{col}'] = complexity

        out = pd.DataFrame(columns, index=df.index)
        return out.fillna(0)

# ================= C++ GENERATORS =================
def generate_hjorth_settings(extractor):
    """Return the ``#define`` lines describing the Hjorth feature front end.

    Args:
        extractor: object exposing ``raw_cols``. If it also carries a
            non-``None`` ``window_size`` attribute, that value is exported so
            the header matches the window actually used; otherwise the
            module-level ``WINDOW_SIZE`` is used.

    Returns:
        A list of two strings: ``NUM_RAW_INPUTS`` and ``HJORTH_WINDOW_SIZE``.
    """
    window = getattr(extractor, "window_size", None)
    if window is None:
        window = WINDOW_SIZE
    return [
        f"#define NUM_RAW_INPUTS {len(extractor.raw_cols)}",
        f"#define HJORTH_WINDOW_SIZE {window}",
    ]

def export_svm_model(svm, scaler, filename):
    """Write a fitted linear SVM and its scaler to a C header.

    Args:
        svm: fitted estimator exposing ``coef_`` and ``intercept_``; the
            first row / first element is exported.
        scaler: fitted scaler exposing ``n_features_in_``, ``mean_`` and
            ``scale_``.
        filename: header path; created or overwritten.

    Only the linear decision function is exported (no probability
    calibration parameters): the consumer classifies on the sign of
    ``w . x + b``.
    """
    def _c_floats(values):
        # 9 significant digits round-trip a float32. Fixed "%.6f" turned
        # anything below 5e-7 into 0.000000, which is fatal for a scale divisor.
        return ', '.join(f'{float(v):.9g}' for v in values)

    lines = [
        "#pragma once\n#include <stdint.h>\n\n",
        "// Linear SVM Model\n",
        f"#define SVM_N_FEATURES {scaler.n_features_in_}\n",
        f"static const float SVM_SCALE_MEAN[] = {{ {_c_floats(scaler.mean_)} }};\n",
        f"static const float SVM_SCALE_STD[]  = {{ {_c_floats(scaler.scale_)} }};\n\n",
        f"static const float SVM_COEF[] = {{ {_c_floats(svm.coef_[0])} }};\n",
        f"static const float SVM_BIAS = {float(svm.intercept_[0]):.9g};\n",
    ]
    # Build everything first so a bad attribute cannot leave a half-written header.
    with open(filename, "w") as f:
        f.writelines(lines)

# ================= MAIN =================
def main():
    """Train the Hjorth-feature linear SVM and export the C headers.

    Reads every workbook in TRAIN_FILES, extracts features with
    HjorthExtractor, balances the classes with SMOTE, standardises the
    resampled features, fits ``SVC(kernel='linear', C=1.0)`` and writes
    ``hjorth_settings.h`` and ``model_edge.h`` to the working directory.

    A workbook that cannot be loaded is skipped with a printed reason; when
    nothing could be loaded the function reports it and returns without
    exporting.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
            # Labels that cannot be parsed as numbers become 1.
            d['Label'] = pd.to_numeric(d.get('Label', 1), errors='coerce').fillna(1)
        except Exception as exc:
            # Best-effort load, but never silently: say what was dropped and why.
            print(f"   Skipping {f}: {exc}")
            continue
        dfs.append(d)
    if not dfs:
        print("No training data could be loaded; nothing exported.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training Hjorth Model (Linear SVM + SMOTE)...")
    ext = HjorthExtractor(df_all)
    X = ext.extract(df_all)
    y = df_all['Label']

    # Safety net: keep only rows whose features are all non-NaN.
    mask = ~X.isna().any(axis=1)
    X_clean = X[mask]
    y_clean = y[mask]

    # --- APPLY SMOTE ---
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X_clean, y_clean)

    # The scaler is fitted on the resampled data; its statistics are exported
    # together with the SVM weights.
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    # Linear SVM
    svm = SVC(kernel='linear', C=1.0, random_state=SEED)
    svm.fit(X_s, y_res)

    print("3. Exporting...")
    for stale in ("hjorth_settings.h", "model_edge.h"):
        if os.path.exists(stale):
            os.remove(stale)

    setting_lines = generate_hjorth_settings(ext)

    with open("hjorth_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        # One index macro per raw input column, in extractor order.
        for i, c in enumerate(ext.raw_cols):
            f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines:
            f.write(line + "\n")
        f.write(f"\n#define HJORTH_N_FEATURES {X.shape[1]}\n")

    export_svm_model(svm, scaler, "model_edge.h")
    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

# Catch22


In [None]:
!pip install pycatch22

## LR

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
try:
    import pycatch22 as catch22
except ImportError:
    raise SystemExit("pycatch22 not found. Install with: pip install pycatch22")

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One fixed seed for NumPy, the stdlib RNG, SMOTE and the classifier.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, which is where the dataset download
# cell puts the files. An absolute "/content/..." path only resolves on
# Colab and made the loader find nothing anywhere else.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx",
]
# Number of past samples fed to catch22 for each feature row.
WINDOW_SIZE = 40

# The subset of features implemented in C++
CATCH_LITE_FEATURES = [
    "DN_HistogramMode_5",
    "DN_HistogramMode_10",
    "CO_f1ecac",
    "CO_FirstMin_ac",
    "CO_trev_1_num",
    "MD_hrv_classic_pnn40",
]

# ================= CATCH22 EXTRACTOR =================
class Catch22Extractor:
    """Rolling catch22 features (the CATCH_LITE_FEATURES subset) per column.

    Input columns are every numeric column of the frame passed to the
    constructor whose name does not contain ``'Label'``.
    """

    def __init__(self, df):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        print(f"Extracting Catch22 (Lite) for: {self.raw_cols}")

    def extract(self, df):
        """Return one ``<col>_<feature>`` column per input column and feature.

        Row ``i`` is computed from the ``WINDOW_SIZE`` samples *before* ``i``
        (``series[i-WINDOW_SIZE:i]``, current sample excluded). The first
        ``WINDOW_SIZE`` rows have no full window and are dropped from the
        result, so callers must slice their labels the same way.

        A window on which catch22 raises keeps 0 for the features not yet
        filled; the number of such windows is printed instead of being
        swallowed silently.
        """
        out = pd.DataFrame(index=df.index)
        n_failed = 0

        for col in self.raw_cols:
            series = df[col].to_numpy()
            n_samples = len(series)

            # Zero-initialised so warm-up rows and failed windows read as 0.
            feats = {f: np.zeros(n_samples) for f in CATCH_LITE_FEATURES}

            # Simple rolling window loop
            for i in range(WINDOW_SIZE, n_samples):
                window = series[i - WINDOW_SIZE:i]
                try:
                    res = catch22.catch22_all(window)
                    for f_name in CATCH_LITE_FEATURES:
                        feats[f_name][i] = res['values'][res['names'].index(f_name)]
                except Exception:
                    # Best effort per window, but keep count so it is visible.
                    n_failed += 1

            for f_name in CATCH_LITE_FEATURES:
                out[f'{col}_{f_name}'] = feats[f_name]

        if n_failed:
            print(f"Warning: catch22 failed on {n_failed} window(s); unfilled features stay 0")

        return out.iloc[WINDOW_SIZE:].fillna(0)

# ================= C++ GENERATORS =================
def generate_catch22_settings(extractor):
    """Return the ``#define`` lines for the Catch22 front end.

    The two lines carry the number of raw input columns on ``extractor`` and
    the module-level ``WINDOW_SIZE``.
    """
    n_inputs = len(extractor.raw_cols)
    return [
        f"#define NUM_RAW_INPUTS {n_inputs}",
        f"#define C22_WINDOW_SIZE {WINDOW_SIZE}",
    ]

def export_lr_model(lr, scaler, prefix, filename):
    """Append a logistic-regression section to a C header.

    Args:
        lr: fitted estimator exposing ``coef_`` and ``intercept_``; the first
            row / first element is exported.
        scaler: fitted scaler exposing ``n_features_in_``, ``mean_`` and
            ``scale_``.
        prefix: identifier prepended to every emitted macro / array name.
        filename: header path, opened in append mode. Any other handle on
            the same file must be closed (flushed) before calling this,
            otherwise its buffered text can land on top of this section.
    """
    def _c_floats(values):
        # 9 significant digits round-trip a float32. Fixed "%.6f" turned
        # anything below 5e-7 into 0.000000, which is fatal for a scale divisor.
        return ', '.join(f'{float(v):.9g}' for v in values)

    section = [
        f"\n// ===== MODEL: {prefix} (Logistic Regression) =====\n",
        f"#define {prefix}_N_FEATURES {scaler.n_features_in_}\n",
        f"static const float {prefix}_SCALE_MEAN[] = {{ {_c_floats(scaler.mean_)} }};\n",
        f"static const float {prefix}_SCALE_STD[]  = {{ {_c_floats(scaler.scale_)} }};\n",
        f"static const float {prefix}_COEF[] = {{ {_c_floats(lr.coef_[0])} }};\n",
        f"static const float {prefix}_BIAS = {float(lr.intercept_[0]):.9g};\n",
    ]
    # Build everything first so a bad attribute cannot leave a partial section.
    with open(filename, "a") as f:
        f.writelines(section)

# ================= MAIN =================
def main():
    """Train the Catch22-Lite logistic regression and export the C headers.

    Reads every workbook in TRAIN_FILES, extracts features with
    Catch22Extractor, balances the classes with SMOTE, standardises the
    resampled features, fits a LogisticRegression and writes
    ``catch22_settings.h`` and ``model_edge.h`` to the working directory.

    A workbook that cannot be loaded is skipped with a printed reason; when
    nothing could be loaded the function reports it and returns without
    exporting.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
            # Labels that cannot be parsed as numbers become 1.
            d['Label'] = pd.to_numeric(d.get('Label', 1), errors='coerce').fillna(1)
        except Exception as exc:
            # Best-effort load, but never silently: say what was dropped and why.
            print(f"   Skipping {f}: {exc}")
            continue
        dfs.append(d)
    if not dfs:
        print("No training data could be loaded; nothing exported.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training Catch22-Lite Model (LR with SMOTE)...")
    ext = Catch22Extractor(df_all)
    X = ext.extract(df_all)
    # The extractor drops the first WINDOW_SIZE rows; slice labels to match.
    y = df_all['Label'].iloc[WINDOW_SIZE:]

    mask = ~X.isna().any(axis=1)
    X_clean = X[mask]
    y_clean = y[mask]

    # --- ADDED SMOTE ---
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X_clean, y_clean)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    lr = LogisticRegression(random_state=SEED, max_iter=1000)
    lr.fit(X_s, y_res)

    print("3. Exporting...")
    for stale in ("catch22_settings.h", "model_edge.h"):
        if os.path.exists(stale):
            os.remove(stale)

    setting_lines = generate_catch22_settings(ext)

    with open("catch22_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        # One index macro per raw input column, in extractor order.
        for i, c in enumerate(ext.raw_cols):
            f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines:
            f.write(line + "\n")
        f.write(f"\n#define C22_N_FEATURES {X.shape[1]}\n")

    with open("model_edge.h", "w") as f:
        f.write("#pragma once\n#include <stdint.h>\n")
    # export_lr_model appends through its own handle, so the header handle
    # must be closed first. Called inside the `with`, the still-buffered
    # header was flushed afterwards at offset 0 and overwrote the first bytes
    # of the model section.
    export_lr_model(lr, scaler, "LR", "model_edge.h")

    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

## RF

In [None]:
import pandas as pd
import numpy as np
import warnings
import os
import random
try:
    import pycatch22 as catch22
except ImportError:
    raise SystemExit("pycatch22 not found. Install with: pip install pycatch22")

from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One fixed seed for NumPy, the stdlib RNG, SMOTE and the classifier.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Relative to the working directory, which is where the dataset download
# cell puts the files. An absolute "/content/..." path only resolves on
# Colab and made the loader find nothing anywhere else.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx",
]
# Number of past samples fed to catch22 for each feature row.
WINDOW_SIZE = 40

# catch22 feature names extracted for every raw input column.
CATCH_LITE_FEATURES = [
    "DN_HistogramMode_5",
    "DN_HistogramMode_10",
    "CO_f1ecac",
    "CO_FirstMin_ac",
    "CO_trev_1_num",
    "MD_hrv_classic_pnn40",
]

class Catch22Extractor:
    """Rolling catch22 features (the CATCH_LITE_FEATURES subset) per column.

    Input columns are every numeric column of the frame passed to the
    constructor whose name does not contain ``'Label'``.
    """

    def __init__(self, df):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        print(f"Extracting Catch22 (Lite) for: {self.raw_cols}")

    def extract(self, df):
        """Return one ``<col>_<feature>`` column per input column and feature.

        Row ``i`` is computed from the ``WINDOW_SIZE`` samples *before* ``i``
        (``series[i-WINDOW_SIZE:i]``, current sample excluded). The first
        ``WINDOW_SIZE`` rows have no full window and are dropped from the
        result, so callers must slice their labels the same way.

        A window on which catch22 raises keeps 0 for the features not yet
        filled; the number of such windows is printed instead of being
        swallowed silently.
        """
        out = pd.DataFrame(index=df.index)
        n_failed = 0

        for col in self.raw_cols:
            series = df[col].to_numpy()
            n_samples = len(series)
            # Zero-initialised so warm-up rows and failed windows read as 0.
            feats = {f: np.zeros(n_samples) for f in CATCH_LITE_FEATURES}

            for i in range(WINDOW_SIZE, n_samples):
                window = series[i - WINDOW_SIZE:i]
                try:
                    res = catch22.catch22_all(window)
                    for f_name in CATCH_LITE_FEATURES:
                        feats[f_name][i] = res['values'][res['names'].index(f_name)]
                except Exception:
                    # Best effort per window, but keep count so it is visible.
                    n_failed += 1

            for f_name in CATCH_LITE_FEATURES:
                out[f'{col}_{f_name}'] = feats[f_name]

        if n_failed:
            print(f"Warning: catch22 failed on {n_failed} window(s); unfilled features stay 0")

        return out.iloc[WINDOW_SIZE:].fillna(0)

def generate_catch22_settings(extractor):
    """Build the settings-header defines for the Catch22 feature extractor.

    Emits the raw-input count taken from ``extractor.raw_cols`` followed by
    the module-level ``WINDOW_SIZE``.
    """
    defines = [f"#define NUM_RAW_INPUTS {len(extractor.raw_cols)}"]
    defines += [f"#define C22_WINDOW_SIZE {WINDOW_SIZE}"]
    return defines

def export_rf_model(rf, scaler, filename):
    """Flatten a fitted random forest into parallel arrays in a C header.

    All trees are concatenated into one node table. ``RF_TREE_ROOTS[t]`` is
    the index of tree ``t``'s root; ``RF_LEFT`` / ``RF_RIGHT`` hold absolute
    child indices (``-1`` on leaves); ``RF_FEATURE`` / ``RF_THRESHOLD`` are
    copied from the tree as-is; ``RF_VALUE`` is, per node, the share of the
    node's value vector that falls on class index 1.

    Args:
        rf: fitted forest exposing ``estimators_`` whose items have a
            ``tree_`` with ``node_count``, ``children_left``,
            ``children_right``, ``feature``, ``threshold`` and ``value``.
        scaler: fitted scaler exposing ``n_features_in_``, ``mean_`` and
            ``scale_``.
        filename: header path; created or overwritten.
    """
    def _c_floats(values):
        # 9 significant digits round-trip a float32. Fixed "%.6f" turned
        # anything below 5e-7 into 0.000000 and coarsened split thresholds.
        return ', '.join(f'{float(v):.9g}' for v in values)

    n_trees = len(rf.estimators_)
    g_left, g_right, g_feature, g_threshold, g_value = [], [], [], [], []
    tree_roots = []
    offset = 0

    for estimator in rf.estimators_:
        tree = estimator.tree_
        n_nodes = tree.node_count
        tree_roots.append(offset)

        left = np.asarray(tree.children_left)[:n_nodes]
        right = np.asarray(tree.children_right)[:n_nodes]
        t_value = tree.value[:, 0, :]
        # An all-zero value row would divide 0/0; map that to probability 0.
        with np.errstate(divide='ignore', invalid='ignore'):
            probs = np.nan_to_num(t_value[:, 1] / t_value.sum(axis=1))

        # Shift child links from per-tree to global indices; leaves stay -1.
        is_split = left != -1
        g_left.extend(np.where(is_split, left + offset, -1).tolist())
        g_right.extend(np.where(is_split, right + offset, -1).tolist())
        g_feature.extend(np.asarray(tree.feature)[:n_nodes].tolist())
        g_threshold.extend(np.asarray(tree.threshold)[:n_nodes].tolist())
        g_value.extend(probs[:n_nodes].tolist())
        offset += n_nodes

    def _c_ints(values):
        return ', '.join(map(str, values))

    lines = [
        "#pragma once\n#include <stdint.h>\n\n",
        f"// Random Forest: {n_trees} estimators\n",
        f"#define RF_N_FEATURES {scaler.n_features_in_}\n",
        f"static const float RF_SCALE_MEAN[] = {{ {_c_floats(scaler.mean_)} }};\n",
        f"static const float RF_SCALE_STD[]  = {{ {_c_floats(scaler.scale_)} }};\n\n",
        f"#define RF_NUM_TREES {n_trees}\n",
        f"static const int RF_TREE_ROOTS[] = {{ {_c_ints(tree_roots)} }};\n",
        f"static const int RF_LEFT[] = {{ {_c_ints(g_left)} }};\n",
        f"static const int RF_RIGHT[] = {{ {_c_ints(g_right)} }};\n",
        f"static const int RF_FEATURE[] = {{ {_c_ints(g_feature)} }};\n",
        f"static const float RF_THRESHOLD[] = {{ {_c_floats(g_threshold)} }};\n",
        f"static const float RF_VALUE[] = {{ {_c_floats(g_value)} }};\n",
    ]
    with open(filename, "w") as f:
        f.writelines(lines)

def main():
    """Train the Catch22-Lite random forest and export the C headers.

    Reads every workbook in TRAIN_FILES, extracts features with
    Catch22Extractor, balances the classes with SMOTE, standardises the
    resampled features, fits a 10-tree / depth-8 RandomForestClassifier and
    writes ``catch22_settings.h`` and ``model_edge.h`` to the working
    directory.

    A workbook that cannot be loaded is skipped with a printed reason; when
    nothing could be loaded the function reports it and returns without
    exporting.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
            # Labels that cannot be parsed as numbers become 1.
            d['Label'] = pd.to_numeric(d.get('Label', 1), errors='coerce').fillna(1)
        except Exception as exc:
            # Best-effort load, but never silently: say what was dropped and why.
            print(f"   Skipping {f}: {exc}")
            continue
        dfs.append(d)
    if not dfs:
        print("No training data could be loaded; nothing exported.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training Catch22-Lite Model (RF with SMOTE)...")
    ext = Catch22Extractor(df_all)
    X = ext.extract(df_all)
    # The extractor drops the first WINDOW_SIZE rows; slice labels to match.
    y = df_all['Label'].iloc[WINDOW_SIZE:]

    mask = ~X.isna().any(axis=1)
    X_clean = X[mask]
    y_clean = y[mask]

    # --- ADDED SMOTE ---
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X_clean, y_clean)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    rf = RandomForestClassifier(n_estimators=10, max_depth=8, random_state=SEED, n_jobs=1)
    rf.fit(X_s, y_res)

    print("3. Exporting...")
    for stale in ("catch22_settings.h", "model_edge.h"):
        if os.path.exists(stale):
            os.remove(stale)

    setting_lines = generate_catch22_settings(ext)

    with open("catch22_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        # One index macro per raw input column, in extractor order.
        for i, c in enumerate(ext.raw_cols):
            f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines:
            f.write(line + "\n")
        f.write(f"\n#define C22_N_FEATURES {X.shape[1]}\n")

    export_rf_model(rf, scaler, "model_edge.h")
    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

## SVM

In [3]:
import pandas as pd
import numpy as np
import warnings
import os
import random
try:
    import pycatch22 as catch22
except ImportError:
    raise SystemExit("pycatch22 not found. Install with: pip install pycatch22")

from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

# Silence every warning for the rest of the session.
warnings.filterwarnings("ignore")

# ================= CONFIGURATION =================
# One fixed seed shared by NumPy, the stdlib RNG, SMOTE and the classifier.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Training workbooks, resolved relative to the current working directory.
TRAIN_FILES = [
    "./dataset/Train-set_1.xlsx",
    "./dataset/Train-set_2.xlsx"
]
# Number of past samples fed to catch22 for each feature row.
WINDOW_SIZE = 40

# catch22 feature names extracted for every raw input column.
# NOTE(review): presumably the subset the C++ side implements — confirm.
CATCH_LITE_FEATURES = [
    "DN_HistogramMode_5",
    "DN_HistogramMode_10",
    "CO_f1ecac",
    "CO_FirstMin_ac",
    "CO_trev_1_num",
    "MD_hrv_classic_pnn40"
]

class Catch22Extractor:
    """Rolling catch22 features (the CATCH_LITE_FEATURES subset) per column.

    Input columns are every numeric column of the frame passed to the
    constructor whose name does not contain ``'Label'``.
    """

    def __init__(self, df):
        self.raw_cols = [c for c in df.select_dtypes(include=np.number).columns if 'Label' not in c]
        print(f"Extracting Catch22 (Lite) for: {self.raw_cols}")

    def extract(self, df):
        """Return one ``<col>_<feature>`` column per input column and feature.

        Row ``i`` is computed from the ``WINDOW_SIZE`` samples *before* ``i``
        (``series[i-WINDOW_SIZE:i]``, current sample excluded). The first
        ``WINDOW_SIZE`` rows have no full window and are dropped from the
        result, so callers must slice their labels the same way.

        A window on which catch22 raises keeps 0 for the features not yet
        filled; the number of such windows is printed instead of being
        swallowed silently.
        """
        out = pd.DataFrame(index=df.index)
        n_failed = 0

        for col in self.raw_cols:
            series = df[col].to_numpy()
            n_samples = len(series)
            # Zero-initialised so warm-up rows and failed windows read as 0.
            feats = {f: np.zeros(n_samples) for f in CATCH_LITE_FEATURES}

            for i in range(WINDOW_SIZE, n_samples):
                window = series[i - WINDOW_SIZE:i]
                try:
                    res = catch22.catch22_all(window)
                    for f_name in CATCH_LITE_FEATURES:
                        feats[f_name][i] = res['values'][res['names'].index(f_name)]
                except Exception:
                    # Best effort per window, but keep count so it is visible.
                    n_failed += 1

            for f_name in CATCH_LITE_FEATURES:
                out[f'{col}_{f_name}'] = feats[f_name]

        if n_failed:
            print(f"Warning: catch22 failed on {n_failed} window(s); unfilled features stay 0")

        return out.iloc[WINDOW_SIZE:].fillna(0)

def generate_catch22_settings(extractor):
    """Produce the two ``#define`` lines written into the Catch22 settings header.

    First the count of ``extractor.raw_cols``, then the module-level
    ``WINDOW_SIZE``.
    """
    raw_input_count = len(extractor.raw_cols)
    templates = (
        ("NUM_RAW_INPUTS", raw_input_count),
        ("C22_WINDOW_SIZE", WINDOW_SIZE),
    )
    return [f"#define {macro} {value}" for macro, value in templates]

def export_svm_model(svm, scaler, filename):
    """Write a fitted linear SVM and its scaler to a C header.

    Args:
        svm: fitted estimator exposing ``coef_`` and ``intercept_``; the
            first row / first element is exported.
        scaler: fitted scaler exposing ``n_features_in_``, ``mean_`` and
            ``scale_``.
        filename: header path; created or overwritten.
    """
    def _c_floats(values):
        # 9 significant digits round-trip a float32. Fixed "%.6f" turned
        # anything below 5e-7 into 0.000000, which is fatal for a scale divisor.
        return ', '.join(f'{float(v):.9g}' for v in values)

    lines = [
        "#pragma once\n#include <stdint.h>\n\n",
        "// Linear SVM Model\n",
        f"#define SVM_N_FEATURES {scaler.n_features_in_}\n",
        f"static const float SVM_SCALE_MEAN[] = {{ {_c_floats(scaler.mean_)} }};\n",
        f"static const float SVM_SCALE_STD[]  = {{ {_c_floats(scaler.scale_)} }};\n\n",
        f"static const float SVM_COEF[] = {{ {_c_floats(svm.coef_[0])} }};\n",
        f"static const float SVM_BIAS = {float(svm.intercept_[0]):.9g};\n",
    ]
    # Build everything first so a bad attribute cannot leave a half-written header.
    with open(filename, "w") as f:
        f.writelines(lines)

def main():
    """Train the Catch22-Lite linear SVM and export the C headers.

    Reads every workbook in TRAIN_FILES, extracts features with
    Catch22Extractor, balances the classes with SMOTE, standardises the
    resampled features, fits ``SVC(kernel='linear', C=1.0)`` and writes
    ``catch22_settings.h`` and ``model_edge.h`` to the working directory.

    A workbook that cannot be loaded is skipped with a printed reason; when
    nothing could be loaded the function reports it and returns without
    exporting.
    """
    print("1. Loading Data...")
    dfs = []
    for f in TRAIN_FILES:
        try:
            d = pd.read_excel(f)
            # Labels that cannot be parsed as numbers become 1.
            d['Label'] = pd.to_numeric(d.get('Label', 1), errors='coerce').fillna(1)
        except Exception as exc:
            # Best-effort load, but never silently: say what was dropped and why.
            print(f"   Skipping {f}: {exc}")
            continue
        dfs.append(d)
    if not dfs:
        print("No training data could be loaded; nothing exported.")
        return
    df_all = pd.concat(dfs, ignore_index=True)

    print("2. Training Catch22-Lite Model (SVM with SMOTE)...")
    ext = Catch22Extractor(df_all)
    X = ext.extract(df_all)
    # The extractor drops the first WINDOW_SIZE rows; slice labels to match.
    y = df_all['Label'].iloc[WINDOW_SIZE:]

    mask = ~X.isna().any(axis=1)
    X_clean = X[mask]
    y_clean = y[mask]

    # --- ADDED SMOTE ---
    sm = SMOTE(random_state=SEED)
    X_res, y_res = sm.fit_resample(X_clean, y_clean)

    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_res)

    svm = SVC(kernel='linear', C=1.0, random_state=SEED)
    svm.fit(X_s, y_res)

    print("3. Exporting...")
    for stale in ("catch22_settings.h", "model_edge.h"):
        if os.path.exists(stale):
            os.remove(stale)

    setting_lines = generate_catch22_settings(ext)

    with open("catch22_settings.h", "w") as f:
        f.write("#pragma once\n\n")
        # One index macro per raw input column, in extractor order.
        for i, c in enumerate(ext.raw_cols):
            f.write(f"#define IDX_{c.upper().replace(' ','_')} {i}\n")
        for line in setting_lines:
            f.write(line + "\n")
        f.write(f"\n#define C22_N_FEATURES {X.shape[1]}\n")

    export_svm_model(svm, scaler, "model_edge.h")
    print(f"Done. Features: {X.shape[1]}")

if __name__ == "__main__":
    main()

1. Loading Data...
2. Training Catch22-Lite Model (SVM with SMOTE)...
Extracting Catch22 (Lite) for: ['Temperature', 'Humidity', 'Temperature_WeatherStation', 'Humidity_WeatherStation']
3. Exporting...
Done. Features: 24
