# 1. Imports

In [None]:

# notebook init
import os
from notebook_init import init_notebook_path

# Get project root path
project_root = init_notebook_path()

# Set environment variable for Ray workers to find the app module.
# Entries are joined with os.pathsep so the value is valid on every platform,
# empty entries are dropped (an empty PYTHONPATH entry puts the current
# working directory on sys.path), and the project root is listed only once
# even when this cell is re-run.
os.environ["PYTHONPATH"] = os.pathsep.join(
    [
        str(project_root),
        *(
            entry
            for entry in os.environ.get("PYTHONPATH", "").split(os.pathsep)
            if entry and entry != str(project_root)
        ),
    ]
)




In [None]:

# Imports
from typing import Any, Sequence, List, Tuple, Dict
import numpy as np
from numpy.typing import NDArray
import pandas as pd
from app.workflow.utils import read_files_recursive

from sklearn.metrics import accuracy_score, confusion_matrix

# 2. Data loading

In [None]:
# Load everything found under <project_root>/data/turns
labeled_data = read_files_recursive(
    path=os.path.join(project_root, "data", "turns"),
)


# 3. Data preview

In [None]:
# Show the first five rows of the first loaded frame
labeled_data[0].head(n=5)

# 4. Behavior filling & cleanup

In [None]:
def fill_behavior(df_):
    """Return a copy of ``df_`` whose ``Behavior`` column is forward-filled.

    Any value other than ``'left'`` or ``'right'`` is blanked out first, so
    every row takes the most recent turn label above it. Rows before the
    first turn label stay missing. The input frame is not modified.
    """
    result = df_.copy()
    behavior = result['Behavior']
    is_turn = behavior.isin(['left', 'right'])
    # Blank out the non-turn values, then carry the last turn label forward.
    result['Behavior'] = behavior.mask(~is_turn).ffill()
    return result

In [None]:
# Forward-fill the turn labels in every frame, then discard the Status
# column where present (errors='ignore' tolerates frames without it)
labeled_data_filled = [
    fill_behavior(frame).drop(columns=['Status'], errors='ignore')
    for frame in labeled_data
]

In [None]:
# Show the first five rows of the first cleaned frame
labeled_data_filled[0].head(n=5)

# 5. Feature engineering

In [None]:
# Names of the sensor columns whose cells hold a vector of readings
VECTOR_COLS: List[str] = [
    "Accelerometer",
    "Gyroscope",
    "Gravity",
    "TotalAcceleration",
    "Orientation",
    "Magnetometer",
    "GyroscopeUncalibrated",
    "MagnetometerUncalibrated",
]
# Number of components kept per vector column; expand_vec_col falls back to 3
# for any column that is not listed here
DIM = {"Orientation": 7}


def to_vec(x: Any) -> NDArray[np.float64]:
    """Convert one sensor cell into a 1-D float array.

    Args:
        x: Anything ``np.asarray(..., dtype=float)`` accepts: a sequence of
            numbers, an existing array, a single number, or a numeric string.

    Returns:
        A float array with at least one dimension. Scalar-like input (a bare
        number, ``None``) becomes a length-1 array, so callers can always
        slice the result and take its ``len``.

    Raises:
        ValueError: If ``x`` cannot be converted to floats.
    """
    # np.asarray alone returns a 0-d array for scalar input, which supports
    # neither slicing nor len(); atleast_1d lifts it to shape (1,).
    return np.atleast_1d(np.asarray(x, dtype=float))


def expand_vec_col(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Split a vector column into one column per component plus a norm.

    Args:
        df: Frame containing ``col``; each cell is converted with ``to_vec``.
        col: Name of the vector column to expand.

    Returns:
        A frame sharing ``df``'s index with columns ``{col}_0`` ..
        ``{col}_{dim-1}`` and ``{col}_norm``, where ``dim`` is ``DIM[col]``
        or 3 when the column has no entry in ``DIM``. Vectors longer than
        ``dim`` are truncated; shorter ones are padded with NaN, which makes
        their norm NaN as well. An empty ``df`` yields an empty frame with
        the same columns instead of raising.
    """
    dim = DIM.get(col, 3)
    # Start from an all-NaN matrix so short vectors are padded implicitly and
    # a frame with zero rows needs no special case.
    arr = np.full((len(df), dim), np.nan, dtype=float)
    for row, raw in enumerate(df[col]):
        vec = to_vec(raw)[:dim]
        arr[row, :len(vec)] = vec
    out = pd.DataFrame(arr, columns=[f"{col}_{i}" for i in range(dim)], index=df.index)
    out[f"{col}_norm"] = np.linalg.norm(arr, axis=1)
    return out

def build_feature_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Build the per-row feature table for one labeled frame.

    Only rows whose ``Behavior`` is ``'left'`` or ``'right'`` are kept. Every
    column named in ``VECTOR_COLS`` that exists in the frame is expanded with
    ``expand_vec_col``; the results are placed side by side and the
    ``Behavior`` label is appended as a categorical column.

    Raises:
        ValueError: If the frame has none of the columns in ``VECTOR_COLS``.
    """
    # Boolean indexing already yields a new frame, so the input is untouched.
    labeled = df.loc[df["Behavior"].isin(["left", "right"])]

    expanded = [
        expand_vec_col(labeled, name)
        for name in VECTOR_COLS
        if name in labeled.columns
    ]
    if not expanded:
        raise ValueError("No known sensor columns found")

    features = pd.concat(expanded, axis=1)
    features["Behavior"] = labeled["Behavior"].astype("category")
    return features

# One feature frame per entry of labeled_data_filled, in the same order
data_with_features = list(map(build_feature_frame, labeled_data_filled))

In [None]:
# Show the first five rows of the first feature frame
data_with_features[0].head(n=5)

# 6. Train/Test split

In [None]:
def split_frames(
    feature_frames: Sequence[pd.DataFrame],
    test_size: float = 0.2,
    random_state: int = 42,
) -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]:
    """Randomly split whole frames into a train group and a test group.

    The split is done per frame, never per row, so all rows of one frame end
    up on the same side.

    Args:
        feature_frames: Frames to distribute.
        test_size: Fraction of frames assigned to the test group, in [0, 1].
        random_state: Seed for the shuffle; the same seed gives the same split.

    Returns:
        ``(train_frames, test_frames)``. With at least two frames and
        ``0 < test_size < 1`` both lists are guaranteed to be non-empty.

    Raises:
        ValueError: If ``test_size`` is outside [0, 1].
    """
    if not 0.0 <= test_size <= 1.0:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size!r}")

    rng = np.random.default_rng(random_state)
    idx = np.arange(len(feature_frames))
    rng.shuffle(idx)

    cut = int(round(len(idx) * (1 - test_size)))
    # Rounding can push the cut to either end for small inputs (e.g. 2 frames
    # with test_size=0.2), leaving one side empty even though a real split was
    # requested. Keep at least one frame on each side in that case.
    if len(idx) >= 2 and 0.0 < test_size < 1.0:
        cut = min(max(cut, 1), len(idx) - 1)

    train_frames = [feature_frames[i] for i in idx[:cut]]
    test_frames = [feature_frames[i] for i in idx[cut:]]
    return train_frames, test_frames



In [None]:
# Split the feature frames into train and test groups (80/20, fixed seed)
train_frames, test_frames = split_frames(
    data_with_features,
    test_size=0.2,
    random_state=42,
)

# 7. Modeling & Evaluation

In [None]:
class FakePredictor:
    """Placeholder model that ignores its training data and guesses labels.

    Predictions are drawn uniformly from ``{0, 1}`` using a generator seeded
    with ``random_state``, so a given seed always produces the same sequence.
    """

    def __init__(self, random_state: int = 42):
        """Create the seeded random generator used by ``predict``."""
        self.rng = np.random.default_rng(random_state)

    def fit(self, X: List[pd.DataFrame], y: List[pd.DataFrame]) -> "FakePredictor":
        """Accept training data without using it and return ``self``."""
        return self

    def predict(self, n: int) -> np.ndarray:
        """Return ``n`` random integer labels, each either 0 or 1."""
        return self.rng.integers(low=0, high=2, size=n)

def make_dataset_single(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """Separate one feature frame into model inputs and integer targets.

    ``Behavior`` is encoded as ``left -> 0`` and ``right -> 1``. Rows with any
    other (or missing) label are dropped from both outputs.

    Returns:
        ``(X, y)``: ``X`` holds every column except ``Behavior``; ``y`` holds
        the matching 0/1 labels as plain integers. The input is not modified.
    """
    # Nullable Int64 lets unmapped labels appear as <NA> instead of forcing
    # the whole column to float.
    labels = df["Behavior"].map({"left": 0, "right": 1}).astype("Int64")
    keep = labels.isin([0, 1])
    features = df.drop(columns=["Behavior"]).loc[keep]
    target = labels.loc[keep].astype(int)
    return features, target

def train_fake(
    train_frames: Sequence[pd.DataFrame],
    random_state: int = 42,
) -> FakePredictor:
    """Fit a ``FakePredictor`` on the given training frames.

    Args:
        train_frames: Feature frames, each split into inputs and targets
            with ``make_dataset_single``. May be empty.
        random_state: Seed handed to the predictor.

    Returns:
        The fitted predictor.
    """
    # Build the two lists explicitly: unpacking zip(*...) raises ValueError
    # when there are no training frames at all.
    X_list: List[pd.DataFrame] = []
    y_list: List[pd.Series] = []
    for frame in train_frames:
        X, y = make_dataset_single(frame)
        X_list.append(X)
        y_list.append(y)
    return FakePredictor(random_state=random_state).fit(X_list, y_list)

In [None]:
def evaluate_fake(
    clf: FakePredictor,
    test_frames: Sequence[pd.DataFrame],
) -> Dict[str, Any]:
    """Score the predictor frame by frame and aggregate the results.

    Args:
        clf: Predictor whose ``predict(n)`` returns ``n`` labels.
        test_frames: Feature frames to evaluate on.

    Returns:
        A dict with:
        ``macro_accuracy``: mean of the per-frame accuracies (NaN when no
        frame could be scored);
        ``weighted_accuracy``: correct predictions divided by all tested
        samples (NaN when nothing was tested);
        ``confusion_matrix_sum``: 2x2 confusion matrix summed over frames,
        label order ``[0, 1]``;
        ``tested_samples``: number of samples scored;
        ``per_frame``: one entry per frame, either its size and accuracy or
        a marker that it was skipped for having no labels.
    """
    confusion_total = np.zeros((2, 2), dtype=int)
    n_correct = 0
    n_tested = 0
    accuracies = []
    per_frame = []

    for frame_index, frame in enumerate(test_frames):
        _, y_true = make_dataset_single(frame)
        n = len(y_true)
        if n == 0:
            # Nothing to score; record the frame so indices stay traceable.
            per_frame.append({"frame_index": frame_index, "skipped": True, "reason": "no labels"})
            continue

        y_pred = clf.predict(n)
        accuracy = float(accuracy_score(y_true, y_pred))
        confusion_total += confusion_matrix(y_true, y_pred, labels=[0, 1])
        n_correct += int((y_pred == y_true).sum())
        n_tested += n
        accuracies.append(accuracy)
        per_frame.append({"frame_index": frame_index, "n": n, "accuracy": accuracy})

    # Macro: every scored frame counts equally. Weighted: every sample does.
    if accuracies:
        macro_acc = float(np.mean(accuracies))
    else:
        macro_acc = float("nan")
    if n_tested > 0:
        weighted_acc = float(n_correct / n_tested)
    else:
        weighted_acc = float("nan")

    return {
        "macro_accuracy": macro_acc,
        "weighted_accuracy": weighted_acc,
        "confusion_matrix_sum": confusion_total,
        "tested_samples": n_tested,
        "per_frame": per_frame,
    }

In [None]:
# Fit the placeholder predictor on the training frames with a fixed seed
clf = train_fake(
    train_frames,
    random_state=42,
)

In [None]:
# Evaluate on the held-out frames and report the aggregate metrics
summary = evaluate_fake(clf, test_frames)
print("Macro accuracy:", summary["macro_accuracy"])
print("Weighted accuracy:", summary["weighted_accuracy"])
# Header and matrix are printed on separate lines via sep="\n"
print(
    "Summed confusion matrix [left=0 right=1]:",
    summary["confusion_matrix_sum"],
    sep="\n",
)