In [1]:
import glob
import pandas as pd
import numpy as np
from pathlib import Path

# Collect every per-run telemetry CSV in name order; if none match, fall back
# to the single combined file.
files = sorted(glob.glob("../data/telemetry_run*.csv")) or ["../data/telemetry.csv"]

# Read each file into its own frame, then stack them into one frame with a
# fresh 0..n-1 index.
dfs = []
for csv_path in files:
    dfs.append(pd.read_csv(csv_path))
df = pd.concat(dfs, ignore_index=True)

# Schema guard: the combined frame must carry exactly these columns
# (no extras, none missing).
expected_columns = {
    "timestamp", "x", "y", "vx", "vy", "action", "ping_ms", "cheat_flag",
}
assert set(df.columns) == expected_columns


In [2]:
# Feature engineering: derive speed, acceleration and trailing speed statistics
# from the raw velocity columns, then assemble the model matrix X and labels y.
df = df.copy()

# `df` is the row-wise concatenation of `dfs` (same order), so repeating each
# frame's position by its length recovers which run every row came from.
# Differences and rolling windows are computed per run so that the first rows
# of one run never borrow velocity/speed history from the tail of the previous
# run. With a single input file this is identical to the ungrouped computation.
assert len(df) == sum(len(d) for d in dfs), "df is out of sync with dfs; re-run the load cell"
run_id = pd.Series(
    np.repeat(np.arange(len(dfs)), [len(d) for d in dfs]),
    index=df.index,
    name="run_id",
)

# speed = magnitude of the velocity vector
df["speed"] = np.sqrt(df["vx"]**2 + df["vy"]**2)

# accel from per-run differences of velocity (Δvx, Δvy); the first row of each
# run has no predecessor, so its NaN delta is filled with 0.0
dv = df.groupby(run_id)[["vx", "vy"]].diff().fillna(0.0)
df["ax"] = dv["vx"]
df["ay"] = dv["vy"]
df["accel_mag"] = np.sqrt(df["ax"]**2 + df["ay"]**2)

# trailing rolling window (size=5) of speed, restarted at every run boundary.
# NOTE(review): the original comment says this mirrors the agent; presumably
# the agent also starts each run with empty history — confirm.
W = 5
speed_by_run = df.groupby(run_id)["speed"]
df["speed_roll_mean"] = speed_by_run.transform(
    lambda s: s.rolling(W, min_periods=1).mean()
)
# std of a single-sample window is NaN; treat it as 0.0
df["speed_roll_std"] = speed_by_run.transform(
    lambda s: s.rolling(W, min_periods=1).std()
).fillna(0.0)

# Column order here defines the column order of X.
FEATURES = ["speed","accel_mag","speed_roll_mean","speed_roll_std","ping_ms","action"]
X = df[FEATURES].values
y = df["cheat_flag"].values.astype(int)


In [3]:
# Simple blocked split: first 80% of rows train, last 20% test (no shuffling,
# so row order is preserved on both sides).
split = int(len(df)*0.80)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

# Safety: both classes must be present on BOTH sides of the split. The model
# cannot be fit on a single-class train set, and the ROC-AUC / PR-AUC computed
# on the test set are undefined when it contains only one class.
# minlength=2 keeps the printed counts two-wide even if a class is missing.
print("Train counts:", np.bincount(y_train, minlength=2))
print("Test  counts:", np.bincount(y_test, minlength=2))
assert len(np.unique(y_train)) == 2, "Adjust split or generate more data"
assert len(np.unique(y_test)) == 2, "Test split has a single class; adjust split or generate more data"


Train counts: [218  22]
Test  counts: [49 11]


In [4]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (confusion_matrix, precision_recall_fscore_support,
                             roc_auc_score, average_precision_score)

# Model: standardise every feature, then fit a class-weighted logistic
# regression. The step names "scaler" and "clf" are looked up again at export.
logreg = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(class_weight="balanced", max_iter=1000, random_state=42)),
])
logreg.fit(X_train, y_train)

# Second predict_proba column = score for the higher class label; hard
# predictions use a 0.5 cut-off.
test_scores = logreg.predict_proba(X_test)
proba_test = test_scores[:, 1]
pred_test = (proba_test >= 0.5).astype(int)

# Threshold-dependent metrics (from pred_test) ...
cm = confusion_matrix(y_test, pred_test, labels=[0, 1])
prec, rec, f1 = precision_recall_fscore_support(
    y_test, pred_test, average="binary"
)[:3]

# ... and ranking metrics (from the raw scores).
roc = roc_auc_score(y_test, proba_test)
pr = average_precision_score(y_test, proba_test)

print("CM:\n", cm)
print(f"precision={prec:.3f} recall={rec:.3f} f1={f1:.3f} ROC-AUC={roc:.3f} PR-AUC={pr:.3f}")


CM:
 [[49  0]
 [ 0 11]]
precision=1.000 recall=1.000 f1=1.000 ROC-AUC=1.000 PR-AUC=1.000


In [5]:
import numpy as np
from sklearn.metrics import precision_recall_fscore_support

# Sweep 36 evenly spaced decision thresholds in [0.2, 0.9] and score each one.
# Candidates are (F1, precision, recall, threshold) tuples, so the maximum is
# the best F1, with ties broken by precision, then recall, then the higher
# threshold.
# NOTE(review): the threshold is selected on the same test split that the
# metrics are reported on, so the reported numbers are optimistic.
candidates = []
for thr in np.linspace(0.2, 0.9, 36):
    labels_at_thr = (proba_test >= thr).astype(int)
    p, r, f, _ = precision_recall_fscore_support(y_test, labels_at_thr, average='binary')
    candidates.append((f, p, r, thr))

best = max(candidates)
print("Best by F1 -> F1=%.3f, P=%.3f, R=%.3f at thr=%.2f" % best)


Best by F1 -> F1=1.000, P=1.000, R=1.000 at thr=0.76


In [6]:
import json, os, time

# Export the fitted pipeline as plain JSON: the scaler statistics and the
# logistic-regression weights, stored as lists/floats, plus metadata.
scaler = logreg.named_steps["scaler"]
clf = logreg.named_steps["clf"]

export = {
    "schema_version": 1,
    "type": "logistic_regression",
    # Order matters: mean/scale/coef below are listed in FEATURES order.
    "features": FEATURES,
    "scaler_mean": scaler.mean_.tolist(),
    "scaler_scale": scaler.scale_.tolist(),
    "coef": clf.coef_[0].tolist(),
    "intercept": float(clf.intercept_[0]),
    # Hand-entered value; it is NOT read from the threshold sweep result.
    "decision_threshold": 0.70,
    "notes": "trained on concatenated runs; trailing window=5; accel=diff(v)",
}

# One timestamped file per export so earlier exports are not overwritten
# (timestamp resolution is one second).
ts = time.strftime("%Y%m%d_%H%M%S")
out_path = f"../models/logreg_export_{ts}.json"

os.makedirs("../models", exist_ok=True)
with open(out_path, "w") as f:
    f.write(json.dumps(export, indent=2))
print("Wrote", out_path)


Wrote ../models/logreg_export_20251014_114917.json
