In [None]:
# emg_train_with_augmentation.py
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
import random

# -----------------------
# USER PARAMETERS
# -----------------------
csv_paths = ["data/session_rest.csv", "data/session_walk.csv", "data/session_lift.csv"]
emg_col = "emg"
label_col = "label"
sampling_rate = 1000   # Hz
window_ms = 50         # smaller so short files give windows (50 ms => 50 samples @1000Hz)
step_ms = 25           # overlap step (50% overlap)
target_windows_per_class = 200   # how many windows per class after augmentation (set lower if you want)
min_real_windows_required = 1    # must have at least 1 real window per class to augment from
batch_size = 64
epochs = 30
random_seed = 42

np.random.seed(random_seed)
random.seed(random_seed)
tf.random.set_seed(random_seed)

# -----------------------
# helpers: label inference, loading, windowing
# -----------------------
def infer_label_from_filename(path):
    fname = os.path.basename(path).lower()
    if "rest" in fname: return "rest"
    if "walk" in fname: return "walk"
    if "lift" in fname or "flex" in fname: return "lift"
    return os.path.splitext(fname)[0]

def load_single_file(path):
    df = pd.read_csv(path)
    if label_col not in df.columns or df[label_col].isna().all():
        df[label_col] = infer_label_from_filename(path)
    return df

def build_windows_from_array(arr_emg, win_samples, step_samples):
    n = len(arr_emg)
    windows = []
    for start in range(0, n - win_samples + 1, step_samples):
        windows.append(arr_emg[start:start+win_samples].astype(np.float32))
    return np.array(windows)

# -----------------------
# Augmentation functions (simple, realistic)
# -----------------------
def augment_window(window, max_jitter=5, noise_std=0.02, scale_range=(0.9, 1.1)):
    """Return a single augmented version of window."""
    w = window.copy()
    L = len(w)

    # 1) random jitter: shift left/right by up to max_jitter samples (pad with edge values)
    shift = np.random.randint(-max_jitter, max_jitter+1)
    if shift > 0:
        w = np.concatenate([w[shift:], np.full(shift, w[-1], dtype=w.dtype)])
    elif shift < 0:
        s = -shift
        w = np.concatenate([np.full(s, w[0], dtype=w.dtype), w[:-s]])

    # 2) amplitude scaling
    scale = np.random.uniform(scale_range[0], scale_range[1])
    w = w * scale

    # 3) additive gaussian noise (relative to signal range)
    sig_range = np.max(w) - np.min(w)
    noise = np.random.normal(0.0, noise_std * (sig_range + 1e-8), size=w.shape)
    w = w + noise

    # 4) small DC offset
    offset = np.random.normal(0.0, 0.01 * (sig_range + 1e-8))
    w = w + offset

    return w

def create_augmented_windows(real_windows, target_count):
    """Given array(real_windows, win_samples), generate target_count windows."""
    if len(real_windows) == 0:
        return np.zeros((0,0), dtype=np.float32)
    out = []
    n_real = len(real_windows)
    # always include real windows first (shuffled)
    indices = list(range(n_real))
    random.shuffle(indices)
    for i in indices:
        out.append(real_windows[i])
    # augment until reaching target_count
    while len(out) < target_count:
        # pick a random real window as base
        base = real_windows[np.random.randint(0, n_real)]
        new_w = augment_window(base)
        out.append(new_w)
    return np.array(out, dtype=np.float32)

# -----------------------
# Build windows per-file, then augment per-class
# -----------------------
win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)
if win_samples <= 0:
    raise ValueError("window_ms too small for given sampling_rate")

print("Window params:", win_samples, "samples per window ; step:", step_samples)

per_class_windows = {}   # class -> list of windows

for p in csv_paths:
    if not os.path.exists(p):
        raise FileNotFoundError(f"Missing file: {p}")
    df = load_single_file(p)
    label = str(df[label_col].iloc[0])
    emg = df[emg_col].values
    windows = build_windows_from_array(emg, win_samples, step_samples)
    print(f"{os.path.basename(p)} -> rows: {len(emg)} ; windows: {len(windows)} ; label: {label}")
    per_class_windows.setdefault(label, []).extend(list(windows))

# ensure all three classes present (inform user if missing)
print("\nClasses found and real-window counts:")
for cls, arr in per_class_windows.items():
    print(" ", cls, ":", len(arr))

required_classes = ["rest", "walk", "lift"]
for rc in required_classes:
    if rc not in per_class_windows:
        print(f"[ERROR] Class '{rc}' missing entirely. You need at least some real recordings for this class.")
        # do NOT attempt to fabricate an entire class from nothing
        # exit early
        raise ValueError(f"Missing required class: {rc}")

# check minimum real windows
for cls, arr in per_class_windows.items():
    if len(arr) < min_real_windows_required:
        raise ValueError(f"Class '{cls}' has {len(arr)} real windows (< {min_real_windows_required}). Record at least {min_real_windows_required} windows before augmenting.")

# Augment to target count per class
X_aug = []
y_aug = []
for cls, arr in per_class_windows.items():
    arr_np = np.array(arr)  # shape (n_real, win_samples)
    print(f"[INFO] Augmenting class '{cls}': real {len(arr_np)} -> target {target_windows_per_class}")
    augmented = create_augmented_windows(arr_np, target_windows_per_class)
    X_aug.append(augmented)
    y_aug.extend([cls] * len(augmented))

X = np.vstack(X_aug)
y = np.array(y_aug)
print("Total windows after augmentation:", X.shape)

# -----------------------
# Prepare for training
# -----------------------
le = LabelEncoder()
y_enc = le.fit_transform(y)
class_names = le.classes_
num_classes = len(class_names)
print("Class names:", class_names)

# reshape for Keras
X = X[..., np.newaxis]

# Standardize (fit on X)
mean = X.mean()
std = X.std()
X = (X - mean) / (std + 1e-8)

# Train/val/test split (stratify)
X_train, X_temp, y_train, y_temp = train_test_split(X, y_enc, test_size=0.25, random_state=random_seed, stratify=y_enc)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=random_seed, stratify=y_temp)

print("Shapes -> train:", X_train.shape, "val:", X_val.shape, "test:", X_test.shape)

# -----------------------
# Build model (same 1D-CNN)
# -----------------------
input_shape = X_train.shape[1:]
def build_model(input_shape, num_classes):
    model = models.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv1D(32, 5, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        layers.Conv1D(64, 5, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        layers.Conv1D(128, 3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.GlobalAveragePooling1D(),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
    return model

model = build_model(input_shape, num_classes)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint("best_emg_model.h5", save_best_only=True)
]

history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=epochs, batch_size=batch_size, callbacks=callbacks, verbose=2)

# -----------------------
# Evaluate & save
# -----------------------
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print("Test acc:", test_acc)

y_pred = np.argmax(model.predict(X_test), axis=1)
print("Classification report:")
print(classification_report(y_test, y_pred, target_names=class_names))
print("Confusion matrix:\n", confusion_matrix(y_test, y_pred))

model.save("final_emg_model.h5")
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open("emg_model.tflite", "wb") as f:
    f.write(tflite_model)
print("Saved final_emg_model.h5 and emg_model.tflite")


Window params: 50 samples per window ; step: 25
session_rest.csv -> rows: 101 ; windows: 3 ; label: rest
session_walk.csv -> rows: 52 ; windows: 1 ; label: walk
session_lift.csv -> rows: 50 ; windows: 1 ; label: lift

Classes found and real-window counts:
  rest : 3
  walk : 1
  lift : 1
[INFO] Augmenting class 'rest': real 3 -> target 200
[INFO] Augmenting class 'walk': real 1 -> target 200
[INFO] Augmenting class 'lift': real 1 -> target 200
Total windows after augmentation: (600, 50)
Class names: ['lift' 'rest' 'walk']
Shapes -> train: (450, 50, 1) val: (75, 50, 1) test: (75, 50, 1)


Epoch 1/30




8/8 - 3s - 436ms/step - accuracy: 0.9022 - loss: 0.3135 - val_accuracy: 0.6667 - val_loss: 0.9675
Epoch 2/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 0.0223 - val_accuracy: 0.6667 - val_loss: 0.8763
Epoch 3/30




8/8 - 0s - 35ms/step - accuracy: 1.0000 - loss: 0.0102 - val_accuracy: 0.6667 - val_loss: 0.8072
Epoch 4/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 0.0064 - val_accuracy: 0.6800 - val_loss: 0.7509
Epoch 5/30




8/8 - 0s - 32ms/step - accuracy: 1.0000 - loss: 0.0042 - val_accuracy: 0.8133 - val_loss: 0.7013
Epoch 6/30




8/8 - 0s - 50ms/step - accuracy: 1.0000 - loss: 0.0029 - val_accuracy: 0.8933 - val_loss: 0.6557
Epoch 7/30




8/8 - 0s - 51ms/step - accuracy: 1.0000 - loss: 0.0029 - val_accuracy: 0.9200 - val_loss: 0.6155
Epoch 8/30




8/8 - 0s - 50ms/step - accuracy: 1.0000 - loss: 0.0018 - val_accuracy: 0.9333 - val_loss: 0.5803
Epoch 9/30




8/8 - 0s - 51ms/step - accuracy: 1.0000 - loss: 0.0019 - val_accuracy: 0.9467 - val_loss: 0.5471
Epoch 10/30




8/8 - 0s - 51ms/step - accuracy: 1.0000 - loss: 0.0018 - val_accuracy: 0.9467 - val_loss: 0.5138
Epoch 11/30




8/8 - 0s - 60ms/step - accuracy: 1.0000 - loss: 0.0015 - val_accuracy: 0.9600 - val_loss: 0.4824
Epoch 12/30




8/8 - 0s - 34ms/step - accuracy: 1.0000 - loss: 0.0013 - val_accuracy: 0.9600 - val_loss: 0.4515
Epoch 13/30




8/8 - 0s - 32ms/step - accuracy: 1.0000 - loss: 0.0012 - val_accuracy: 0.9600 - val_loss: 0.4206
Epoch 14/30




8/8 - 0s - 31ms/step - accuracy: 1.0000 - loss: 0.0010 - val_accuracy: 0.9600 - val_loss: 0.3901
Epoch 15/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 7.5981e-04 - val_accuracy: 0.9600 - val_loss: 0.3593
Epoch 16/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 0.0011 - val_accuracy: 0.9600 - val_loss: 0.3298
Epoch 17/30




8/8 - 0s - 32ms/step - accuracy: 1.0000 - loss: 0.0012 - val_accuracy: 0.9600 - val_loss: 0.3012
Epoch 18/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 8.7278e-04 - val_accuracy: 0.9600 - val_loss: 0.2741
Epoch 19/30




8/8 - 0s - 34ms/step - accuracy: 1.0000 - loss: 8.5246e-04 - val_accuracy: 0.9600 - val_loss: 0.2486
Epoch 20/30




8/8 - 0s - 34ms/step - accuracy: 1.0000 - loss: 0.0010 - val_accuracy: 0.9867 - val_loss: 0.2237
Epoch 21/30




8/8 - 0s - 32ms/step - accuracy: 1.0000 - loss: 8.2477e-04 - val_accuracy: 0.9867 - val_loss: 0.2001
Epoch 22/30




8/8 - 0s - 34ms/step - accuracy: 1.0000 - loss: 5.1289e-04 - val_accuracy: 0.9867 - val_loss: 0.1772
Epoch 23/30




8/8 - 0s - 34ms/step - accuracy: 1.0000 - loss: 8.3586e-04 - val_accuracy: 0.9867 - val_loss: 0.1562
Epoch 24/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 8.2539e-04 - val_accuracy: 0.9867 - val_loss: 0.1390
Epoch 25/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 6.0533e-04 - val_accuracy: 0.9867 - val_loss: 0.1215
Epoch 26/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 7.0312e-04 - val_accuracy: 0.9867 - val_loss: 0.1046
Epoch 27/30




8/8 - 0s - 34ms/step - accuracy: 1.0000 - loss: 4.9961e-04 - val_accuracy: 0.9867 - val_loss: 0.0900
Epoch 28/30




8/8 - 0s - 36ms/step - accuracy: 1.0000 - loss: 3.8618e-04 - val_accuracy: 0.9867 - val_loss: 0.0778
Epoch 29/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 3.5496e-04 - val_accuracy: 0.9867 - val_loss: 0.0667
Epoch 30/30




8/8 - 0s - 33ms/step - accuracy: 1.0000 - loss: 3.7115e-04 - val_accuracy: 0.9867 - val_loss: 0.0574
Test acc: 0.9866666793823242
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 74ms/step




Classification report:
              precision    recall  f1-score   support

        lift       1.00      0.96      0.98        25
        rest       0.96      1.00      0.98        25
        walk       1.00      1.00      1.00        25

    accuracy                           0.99        75
   macro avg       0.99      0.99      0.99        75
weighted avg       0.99      0.99      0.99        75

Confusion matrix:
 [[24  1  0]
 [ 0 25  0]
 [ 0  0 25]]
Saved artifact at '/tmp/tmpb_zqdqea'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 50, 1), dtype=tf.float32, name='keras_tensor_13')
Output Type:
  TensorSpec(shape=(None, 3), dtype=tf.float32, name=None)
Captures:
  135768143558224: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135768637998096: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135768637998480: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135768637999440: TensorSpec(shape=(), dtype=tf.

In [None]:
# convert_tflite_to_c_header.py
from tensorflow.lite.python.util import convert_bytes_to_c_source
# If you saved emg_model.tflite earlier:
with open("emg_model.tflite", "rb") as f:
    tflite_bytes = f.read()

# convert -> returns (c_src, header_src)
c_src, h_src = convert_bytes_to_c_source(tflite_bytes, "my_model")

# Rename to TF-Micro naming
h_src = h_src.replace('unsigned char my_model[]', 'unsigned char g_my_model[]')
h_src = h_src.replace('unsigned int my_model_len', 'unsigned int g_my_model_len')

# Write header
with open("emg_model.h", "w") as fh:
    fh.write(h_src)

print("Wrote emg_model.h with g_my_model[], g_my_model_len")


Wrote emg_model.h with g_my_model[], g_my_model_len


In [None]:
# tflite_evaluate_real_windows.py
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, balanced_accuracy_score
from sklearn.preprocessing import LabelEncoder

# -----------------------
# USER PARAMETERS (change if needed)
# -----------------------
csv_paths = ["data/session_rest.csv", "data/session_walk.csv", "data/session_lift.csv"]
emg_col = "emg"
label_col = "label"   # if missing in CSV, script will infer label from filename
sampling_rate = 1000   # Hz used to compute samples from window_ms
window_ms = 50         # must match window size you used in training (50 ms -> 50 samples @1000Hz)
step_ms = 25
tflite_path = "emg_model.tflite"
mean_path = "mean.npy"
std_path = "std.npy"
classes_path = "classes.npy"
save_predictions_csv = "tflite_predictions.csv"

# -----------------------
# Helpers
# -----------------------
def infer_label_from_filename(path):
    fname = os.path.basename(path).lower()
    if "rest" in fname: return "rest"
    if "walk" in fname: return "walk"
    if "lift" in fname or "flex" in fname: return "lift"
    return os.path.splitext(fname)[0]

def load_single_file(path):
    df = pd.read_csv(path)
    if label_col not in df.columns or df[label_col].isna().all():
        df[label_col] = infer_label_from_filename(path)
    return df

def build_windows_from_array(arr_emg, win_samples, step_samples):
    windows = []
    n = len(arr_emg)
    for start in range(0, n - win_samples + 1, step_samples):
        windows.append(arr_emg[start:start+win_samples].astype(np.float32))
    return np.array(windows)

# -----------------------
# Build real windows & labels (no augmentation)
# -----------------------
win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)
if win_samples <= 0:
    raise ValueError("Invalid window settings: win_samples <= 0")

print("Window size (samples):", win_samples, " step:", step_samples)
all_windows = []
all_labels = []
file_info = []

for p in csv_paths:
    if not os.path.exists(p):
        print(f"[WARN] File not found: {p} (skipping)")
        continue
    df = load_single_file(p)
    label = str(df[label_col].iloc[0])
    arr = df[emg_col].values
    windows = build_windows_from_array(arr, win_samples, step_samples)
    print(f"Loaded {os.path.basename(p)} -> rows: {len(arr)} ; windows: {len(windows)} ; label: {label}")
    for w in windows:
        all_windows.append(w)
        all_labels.append(label)
    file_info.append((p, label, len(arr), len(windows)))

if len(all_windows) == 0:
    raise ValueError("No real windows built from CSVs. Try reducing window_ms or increasing overlap (step_ms).")

all_windows = np.array(all_windows)  # shape (N, win_samples)
all_labels = np.array(all_labels)    # shape (N,)

print("Total real windows:", all_windows.shape[0])
print("Raw label counts:", pd.Series(all_labels).value_counts().to_dict())

# -----------------------
# Load mean/std/classes if available; fallback compute from real windows
# -----------------------
if os.path.exists(mean_path) and os.path.exists(std_path) and os.path.exists(classes_path):
    mean = np.load(mean_path).item() if os.path.exists(mean_path) else np.load(mean_path)
    std  = np.load(std_path).item()  if os.path.exists(std_path) else np.load(std_path)
    classes = np.load(classes_path, allow_pickle=True)
    print("[INFO] Loaded mean/std/classes from files.")
else:
    print("[WARN] mean/std/classes not all found. Computing mean/std from real windows (FALLBACK).")
    mean = float(all_windows.mean())
    std  = float(all_windows.std())
    # infer classes from labels present
    classes = np.unique(all_labels)
    print(f"[INFO] Inferred classes = {classes}")
    # Save these so you can reuse downstream
    np.save("mean_inferred.npy", mean)
    np.save("std_inferred.npy", std)
    np.save("classes_inferred.npy", classes)
    print("[INFO] Saved mean_inferred.npy, std_inferred.npy, classes_inferred.npy")

# Build mapping label -> index using classes order (important to match training order if available)
le = LabelEncoder()
le.fit(classes)
y_true = le.transform(all_labels)  # integers 0..C-1
class_names = le.classes_
print("Class names used (order):", class_names)

# -----------------------
# Load TFLite model
# -----------------------
if not os.path.exists(tflite_path):
    raise FileNotFoundError(f"TFLite model not found at {tflite_path}")

interpreter = tf.lite.Interpreter(model_path=tflite_path)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print("TFLite input shape:", input_details[0]['shape'], " dtype:", input_details[0]['dtype'])
print("TFLite output shape:", output_details[0]['shape'], " dtype:", output_details[0]['dtype'])

# -----------------------
# Inference loop over real windows
# -----------------------
y_pred = []
y_pred_probs = []

for i, w in enumerate(all_windows):
    # preprocess: normalize with mean/std
    x = (w.astype(np.float32) - mean) / (std + 1e-8)
    x = x.reshape(1, win_samples, 1)

    # If the tflite model expects a different dtype (e.g., int8), cast accordingly
    # We will attempt to cast to input_details dtype if needed:
    if input_details[0]['dtype'] == np.int8 or input_details[0]['dtype'] == np.uint8:
        # scale float [-1,1] to int8 using quantization params if present
        # check if quantization params are available
        scale, zero_point = input_details[0].get('quantization', (None, None))
        if scale and zero_point is not None:
            x_int = (x / scale + zero_point).astype(input_details[0]['dtype'])
            interpreter.set_tensor(input_details[0]['index'], x_int)
        else:
            # fallback: cast float32 to required dtype
            interpreter.set_tensor(input_details[0]['index'], x.astype(input_details[0]['dtype']))
    else:
        interpreter.set_tensor(input_details[0]['index'], x.astype(np.float32))

    interpreter.invoke()
    out = interpreter.get_tensor(output_details[0]['index'])[0]

    # If output is quantized, dequantize if needed
    if output_details[0]['dtype'] == np.int8 or output_details[0]['dtype'] == np.uint8:
        scale_o, zp_o = output_details[0].get('quantization', (None, None))
        if scale_o and zp_o is not None:
            out = (out.astype(np.float32) - zp_o) * scale_o

    probs = np.asarray(out)
    pred_idx = int(np.argmax(probs))
    y_pred.append(pred_idx)
    y_pred_probs.append(probs)

y_pred = np.array(y_pred)
y_pred_probs = np.vstack(y_pred_probs)

# -----------------------
# Metrics & Reporting
# -----------------------
acc = accuracy_score(y_true, y_pred)
bal_acc = balanced_accuracy_score(y_true, y_pred)
print(f"\nOverall accuracy: {acc:.4f}  |  Balanced accuracy: {bal_acc:.4f}\n")

print("Classification report (per-class):\n")
print(classification_report(y_true, y_pred, target_names=class_names))

print("Confusion matrix:\n")
cm = confusion_matrix(y_true, y_pred)
print(cm)

# Per-class accuracy
per_class_acc = cm.diagonal() / cm.sum(axis=1)
for i, n in enumerate(class_names):
    print(f"Class {n}: accuracy {per_class_acc[i]:.3f} (support {cm.sum(axis=1)[i]})")

# -----------------------
# Save predictions CSV for inspection
# -----------------------
rows = []
for i in range(len(all_windows)):
    true_label = all_labels[i]
    pred_label = class_names[y_pred[i]]
    probs = y_pred_probs[i]
    rows.append({
        "file": None,
        "true_label": true_label,
        "pred_label": pred_label,
        "pred_idx": int(y_pred[i]),
        "probabilities": ",".join([f"{p:.6f}" for p in probs])
    })

df_out = pd.DataFrame(rows)
df_out.to_csv(save_predictions_csv, index=False)
print(f"\nSaved predictions to {save_predictions_csv}")

# -----------------------
# Done
# -----------------------
print("\nFinished TFLite evaluation on real windows.")


Window size (samples): 50  step: 25
Loaded session_rest.csv -> rows: 101 ; windows: 3 ; label: rest
Loaded session_walk.csv -> rows: 52 ; windows: 1 ; label: walk
Loaded session_lift.csv -> rows: 50 ; windows: 1 ; label: lift
Total real windows: 5
Raw label counts: {'rest': 3, 'walk': 1, 'lift': 1}
[WARN] mean/std/classes not all found. Computing mean/std from real windows (FALLBACK).
[INFO] Inferred classes = ['lift' 'rest' 'walk']
[INFO] Saved mean_inferred.npy, std_inferred.npy, classes_inferred.npy
Class names used (order): ['lift' 'rest' 'walk']
TFLite input shape: [ 1 50  1]  dtype: <class 'numpy.float32'>
TFLite output shape: [1 3]  dtype: <class 'numpy.float32'>

Overall accuracy: 1.0000  |  Balanced accuracy: 1.0000

Classification report (per-class):

              precision    recall  f1-score   support

        lift       1.00      1.00      1.00         1
        rest       1.00      1.00      1.00         3
        walk       1.00      1.00      1.00         1

    accura

    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# -----------------------------
# USER SETTINGS
# -----------------------------
csv_paths = [
    "data/session_rest.csv",
    "data/session_walk.csv",
    "data/session_lift.csv"
]

emg_col = "emg"
sampling_rate = 1000    # Hz
window_ms = 50          # You used 50ms windows
step_ms = 25            # 50% overlap

# -----------------------------
# LOAD mean / std / classes
# -----------------------------
mean = np.load("mean_inferred.npy")
std  = np.load("std_inferred.npy")
classes = np.load("classes_inferred.npy", allow_pickle=True)
class_to_idx = {c:i for i,c in enumerate(classes)}

print("Classes:", classes)

# -----------------------------
# LOAD TFLITE MODEL
# -----------------------------
interpreter = tf.lite.Interpreter(model_path="emg_model.tflite")
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# -----------------------------
# WINDOWING FUNCTION
# -----------------------------
win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)

def build_windows(arr):
    windows = []
    for start in range(0, len(arr) - win_samples + 1, step_samples):
        windows.append(arr[start:start+win_samples].astype(np.float32))
    return np.array(windows)

# -----------------------------
# BUILD TEST WINDOWS
# -----------------------------
X = []
Y = []

for path in csv_paths:
    df = pd.read_csv(path)

    # infer class from filename
    fname = path.lower()
    if "rest" in fname: label = "rest"
    elif "walk" in fname: label = "walk"
    elif "lift" in fname or "flex" in fname: label = "lift"
    else: label = "unknown"

    arr = df[emg_col].values
    wins = build_windows(arr)

    for w in wins:
        X.append(w)
        Y.append(class_to_idx[label])

X = np.array(X)
Y = np.array(Y)

print("Total windows built:", len(X))

# -----------------------------
# RUN TFLITE INFERENCE
# -----------------------------
y_pred = []

for w in X:
    # normalize
    w_norm = (w - mean) / (std + 1e-8)
    w_norm = w_norm.reshape(1, win_samples, 1).astype(np.float32)

    interpreter.set_tensor(input_details[0]['index'], w_norm)
    interpreter.invoke()

    out = interpreter.get_tensor(output_details[0]['index'])[0]
    pred = np.argmax(out)
    y_pred.append(pred)

y_pred = np.array(y_pred)

# -----------------------------
# PRINT ACCURACY
# -----------------------------
print("\n==============================")
print(" ✔ MODEL ACCURACY RESULTS")
print("==============================")

print("\nOverall Accuracy:", round(accuracy_score(Y, y_pred), 4))

print("\nClassification Report:")
print(classification_report(Y, y_pred, target_names=classes))

print("Confusion Matrix:")
print(confusion_matrix(Y, y_pred))


Classes: ['lift' 'rest' 'walk']
Total windows built: 5

 ✔ MODEL ACCURACY RESULTS

Overall Accuracy: 1.0

Classification Report:
              precision    recall  f1-score   support

        lift       1.00      1.00      1.00         1
        rest       1.00      1.00      1.00         3
        walk       1.00      1.00      1.00         1

    accuracy                           1.00         5
   macro avg       1.00      1.00      1.00         5
weighted avg       1.00      1.00      1.00         5

Confusion Matrix:
[[1 0 0]
 [0 3 0]
 [0 0 1]]


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


In [None]:
# retrain_with_holdout_flexible.py  (copy & run)
import os, random, numpy as np, pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support

# --- user params (adjust if needed) ---
csv_paths = ["data/session_rest.csv", "data/session_walk.csv", "data/session_lift.csv"]
emg_col = "emg"
label_col = "label"
sampling_rate = 1000
window_ms = 50
step_ms = 25
target_windows_per_class = 100   # lower if you want faster runs
batch_size = 64
epochs = 20
random_seed = 42

np.random.seed(random_seed); random.seed(random_seed); tf.random.set_seed(random_seed)

# --- helpers ---
def infer_label_from_filename(path):
    fn = os.path.basename(path).lower()
    if "rest" in fn: return "rest"
    if "walk" in fn: return "walk"
    if "lift" in fn or "flex" in fn: return "lift"
    return os.path.splitext(fn)[0]

def load_single_file(path):
    df = pd.read_csv(path)
    if label_col not in df.columns or df[label_col].isna().all():
        df[label_col] = infer_label_from_filename(path)
    return df

def build_windows(arr, win_samples, step_samples):
    outs=[]
    n=len(arr)
    for s in range(0, n - win_samples + 1, step_samples):
        outs.append(arr[s:s+win_samples].astype(np.float32))
    return np.array(outs)

def augment_window(w):
    # small jitter, scale and noise
    w = w.copy()
    shift = np.random.randint(-3, 4)
    if shift>0: w = np.concatenate([w[shift:], np.full(shift, w[-1], dtype=w.dtype)])
    elif shift<0:
        s=-shift; w = np.concatenate([np.full(s, w[0], dtype=w.dtype), w[:-s]])
    w = w * np.random.uniform(0.95, 1.05)
    rng = np.max(w) - np.min(w)
    w = w + np.random.normal(0, 0.02*(rng+1e-8), size=w.shape)
    return w

def create_augmented(bases, target):
    if len(bases)==0: return np.zeros((0,0), dtype=np.float32)
    out = [b for b in bases]
    while len(out) < target:
        b = bases[np.random.randint(0, len(bases))]
        out.append(augment_window(b))
    return np.array(out, dtype=np.float32)

# --- compute window sizes ---
win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)
if win_samples <= 0: raise SystemExit("Invalid window settings")

print("Window samples:", win_samples, " step:", step_samples)

per_class_bases = {}
per_class_test = {}

for p in csv_paths:
    if not os.path.exists(p):
        print("[WARN] missing file:", p); continue
    df = load_single_file(p)
    cls = str(df[label_col].iloc[0])
    arr = df[emg_col].values
    wins = build_windows(arr, win_samples, step_samples)
    print(f"{os.path.basename(p)} rows:{len(arr)} windows:{len(wins)} class:{cls}")
    if len(wins) == 0:
        continue
    if len(wins) >= 2:
        # reserve last as test
        per_class_test.setdefault(cls, []).append(wins[-1])
        per_class_bases.setdefault(cls, []).extend(list(wins[:-1]))
    else:
        # exactly 1: use as augmentation base, no held-out
        per_class_bases.setdefault(cls, []).extend(list(wins))
        print(f"[INFO] only 1 window for class '{cls}' — used as base, no held-out reserved.")

print("\nBases per class:")
for c,v in per_class_bases.items(): print(" ", c, len(v))
print("\nHeld-out per class:")
for c,v in per_class_test.items(): print(" ", c, len(v))

no_heldout = [c for c in per_class_bases.keys() if c not in per_class_test]
if no_heldout:
    print("\n[WARN] Classes with NO held-out windows (cannot evaluate on unseen real windows):", no_heldout)

# --- create augmented training data ---
X_train_list=[]; y_train_list=[]
X_test=[]; y_test=[]
for cls, bases in per_class_bases.items():
    bases_np = np.array(bases) if len(bases)>0 else np.zeros((0,win_samples))
    print(f"[AUG] {cls}: bases={len(bases_np)} -> target {target_windows_per_class}")
    aug = create_augmented(bases_np, target_windows_per_class)
    X_train_list.append(aug)
    y_train_list.extend([cls]*len(aug))
    for tw in per_class_test.get(cls, []):
        X_test.append(tw); y_test.append(cls)

X_train = np.vstack(X_train_list)
y_train = np.array(y_train_list)
X_test = np.array(X_test) if len(X_test)>0 else np.zeros((0,win_samples))
y_test = np.array(y_test) if len(y_test)>0 else np.array([])

print("Train shape:", X_train.shape, "Test shape:", X_test.shape)

# --- encode + normalize (fit on train only) ---
le = LabelEncoder(); le.fit(y_train); y_train_enc = le.transform(y_train)
y_test_enc = le.transform(y_test) if len(y_test)>0 else np.array([], dtype=int)
class_names = le.classes_
print("Class order:", class_names)

X_train = X_train[..., np.newaxis]; X_test = X_test[..., np.newaxis] if X_test.shape[0]>0 else np.zeros((0,win_samples,1))
mean = X_train.mean(); std = X_train.std()
X_train = (X_train - mean) / (std + 1e-8)
if X_test.shape[0]>0: X_test = (X_test - mean) / (std + 1e-8)

np.save("mean.npy", mean); np.save("std.npy", std); np.save("classes.npy", class_names)

# --- model ---
input_shape = X_train.shape[1:]
def build_model(inp, ncls):
    m = models.Sequential([
        layers.Input(shape=inp),
        layers.Conv1D(32,5,activation='relu',padding='same'),
        layers.BatchNormalization(), layers.MaxPooling1D(2),
        layers.Conv1D(64,5,activation='relu',padding='same'),
        layers.BatchNormalization(), layers.MaxPooling1D(2),
        layers.Conv1D(128,3,activation='relu',padding='same'),
        layers.BatchNormalization(), layers.GlobalAveragePooling1D(),
        layers.Dense(64,activation='relu'), layers.Dropout(0.3),
        layers.Dense(ncls, activation='softmax')
    ])
    return m

model = build_model(input_shape, len(class_names))
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

# --- train (quick) ---
from sklearn.model_selection import train_test_split
X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train_enc, test_size=0.15, stratify=y_train_enc, random_state=random_seed)
callbacks = [tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)]
model.fit(X_tr, y_tr, validation_data=(X_val,y_val), epochs=epochs, batch_size=batch_size, callbacks=callbacks, verbose=2)

# --- evaluate on held-out if exists ---
if X_test.shape[0] > 0:
    preds = np.argmax(model.predict(X_test), axis=1)

    # Use labels = full set of class indices so report always matches class_names
    all_label_indices = list(range(len(class_names)))

    print("\nClassification report (held-out):")
    print(classification_report(y_test_enc, preds,
                                labels=all_label_indices,
                                target_names=class_names,
                                zero_division=0))

    print("Confusion matrix (held-out):")
    cm = confusion_matrix(y_test_enc, preds, labels=all_label_indices)
    print(cm)

    # Show per-class support (how many held-out samples per class)
    support = cm.sum(axis=1)
    for i, cname in enumerate(class_names):
        print(f"Class {cname}: support (held-out) = {support[i]}")
else:
    print("\n[WARN] No held-out real windows to evaluate.")

Window samples: 50  step: 25
session_rest.csv rows:101 windows:3 class:rest
session_walk.csv rows:52 windows:1 class:walk
[INFO] only 1 window for class 'walk' — used as base, no held-out reserved.
session_lift.csv rows:50 windows:1 class:lift
[INFO] only 1 window for class 'lift' — used as base, no held-out reserved.

Bases per class:
  rest 2
  walk 1
  lift 1

Held-out per class:
  rest 1

[WARN] Classes with NO held-out windows (cannot evaluate on unseen real windows): ['walk', 'lift']
[AUG] rest: bases=2 -> target 100
[AUG] walk: bases=1 -> target 100
[AUG] lift: bases=1 -> target 100
Train shape: (300, 50) Test shape: (1, 50)
Class order: ['lift' 'rest' 'walk']


Epoch 1/20
4/4 - 4s - 1s/step - accuracy: 0.8863 - loss: 0.3569 - val_accuracy: 0.6667 - val_loss: 1.0231
Epoch 2/20
4/4 - 0s - 38ms/step - accuracy: 1.0000 - loss: 0.0286 - val_accuracy: 0.6667 - val_loss: 0.9628
Epoch 3/20
4/4 - 0s - 39ms/step - accuracy: 1.0000 - loss: 0.0071 - val_accuracy: 0.9333 - val_loss: 0.9065
Epoch 4/20
4/4 - 0s - 43ms/step - accuracy: 1.0000 - loss: 0.0029 - val_accuracy: 1.0000 - val_loss: 0.8515
Epoch 5/20
4/4 - 0s - 43ms/step - accuracy: 1.0000 - loss: 0.0031 - val_accuracy: 1.0000 - val_loss: 0.8018
Epoch 6/20
4/4 - 0s - 37ms/step - accuracy: 1.0000 - loss: 0.0030 - val_accuracy: 1.0000 - val_loss: 0.7596
Epoch 7/20
4/4 - 0s - 41ms/step - accuracy: 1.0000 - loss: 0.0020 - val_accuracy: 1.0000 - val_loss: 0.7245
Epoch 8/20
4/4 - 0s - 38ms/step - accuracy: 1.0000 - loss: 0.0016 - val_accuracy: 1.0000 - val_loss: 0.6955
Epoch 9/20
4/4 - 0s - 40ms/step - accuracy: 1.0000 - loss: 0.0014 - val_accuracy: 1.0000 - val_loss: 0.6714
Epoch 10/20
4/4 - 0s - 42ms/st



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 162ms/step

Classification report (held-out):
              precision    recall  f1-score   support

        lift       0.00      0.00      0.00         0
        rest       1.00      1.00      1.00         1
        walk       0.00      0.00      0.00         0

    accuracy                           1.00         1
   macro avg       0.33      0.33      0.33         1
weighted avg       1.00      1.00      1.00         1

Confusion matrix (held-out):
[[0 0 0]
 [0 1 0]
 [0 0 0]]
Class lift: support (held-out) = 0
Class rest: support (held-out) = 1
Class walk: support (held-out) = 0


In [None]:
# emg_full_pipeline_safe.py
import os
import random
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models

# -----------------------
# USER PARAMETERS (edit if needed)
# -----------------------
csv_paths = ["data/session_rest.csv", "data/session_walk.csv", "data/session_lift.csv"]
emg_col = "emg"
label_col = "label"      # if missing in CSV, label inferred from filename
sampling_rate = 1000     # Hz
window_ms = 50           # window length in ms
step_ms = 25             # step in ms
target_windows_per_class = 100   # augment to this many windows per class (reduce to 50 for speed)
batch_size = 64
epochs = 20
random_seed = 42

np.random.seed(random_seed)
random.seed(random_seed)
tf.random.set_seed(random_seed)

# -----------------------
# Helpers
# -----------------------
def infer_label_from_filename(path):
    fname = os.path.basename(path).lower()
    if "rest" in fname: return "rest"
    if "walk" in fname: return "walk"
    if "lift" in fname or "flex" in fname: return "lift"
    return os.path.splitext(fname)[0]

def load_single_file(path):
    df = pd.read_csv(path)
    if label_col not in df.columns or df[label_col].isna().all():
        df[label_col] = infer_label_from_filename(path)
    return df

def build_windows_from_array(arr_emg, win_samples, step_samples):
    outs = []
    n = len(arr_emg)
    for start in range(0, n - win_samples + 1, step_samples):
        outs.append(arr_emg[start:start+win_samples].astype(np.float32))
    return np.array(outs)

def augment_window(window):
    w = window.copy()
    # small random shift (jitter)
    shift = np.random.randint(-4, 5)
    if shift > 0:
        w = np.concatenate([w[shift:], np.full(shift, w[-1], dtype=w.dtype)])
    elif shift < 0:
        s = -shift
        w = np.concatenate([np.full(s, w[0], dtype=w.dtype), w[:-s]])
    # amplitude scale
    w = w * np.random.uniform(0.95, 1.05)
    # additive noise proportional to signal range
    rng = np.max(w) - np.min(w)
    w = w + np.random.normal(0.0, 0.02 * (rng + 1e-8), size=w.shape)
    return w

def create_augmented_windows(bases, target):
    """Given bases (n_base, win_samples), produce 'target' windows by augmentation."""
    if len(bases) == 0:
        return np.zeros((0,0), dtype=np.float32)
    out = [b for b in bases]
    while len(out) < target:
        b = bases[np.random.randint(0, len(bases))]
        out.append(augment_window(b))
    return np.array(out, dtype=np.float32)

def safe_label_encode_fit(y_train):
    """Fit LabelEncoder on y_train and return mapping dict and encoder."""
    le = LabelEncoder()
    le.fit(y_train)
    classes = list(le.classes_)
    label_to_idx = {c: int(i) for i,c in enumerate(classes)}
    return le, classes, label_to_idx

def safe_label_transform(labels, label_to_idx):
    """Transform labels into indices using label_to_idx; unseen labels -> dropped (with warning)."""
    out_idx = []
    keep_mask = []
    unseen = set()
    for v in labels:
        if v in label_to_idx:
            out_idx.append(label_to_idx[v])
            keep_mask.append(True)
        else:
            unseen.add(v)
            keep_mask.append(False)
    if unseen:
        print("[WARN] Dropping samples with unseen labels:", unseen)
    return np.array(out_idx, dtype=int), np.array(keep_mask, dtype=bool)

# -----------------------
# Windowing & holdout reservation (flexible)
# - reserve last window as held-out only if file has >= 2 windows
# - if file has exactly 1 window, use it as augmentation base (no held-out)
# -----------------------
win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)
if win_samples <= 0:
    raise ValueError("Invalid window parameters. win_samples <= 0")

print("Window samples:", win_samples, " step:", step_samples)

per_class_bases = {}   # class -> list of windows (bases for augmentation)
per_class_heldout = {} # class -> list of windows reserved for test (no augmentation)

for p in csv_paths:
    if not os.path.exists(p):
        print(f"[WARN] File not found: {p} (skipping)")
        continue
    df = load_single_file(p)
    cls = str(df[label_col].iloc[0])
    arr = df[emg_col].values
    wins = build_windows_from_array(arr, win_samples, step_samples)
    print(f"{os.path.basename(p)} rows:{len(arr)} windows:{len(wins)} class:{cls}")
    if len(wins) == 0:
        continue
    if len(wins) >= 2:
        # reserve last window as held-out test, use the rest as augmentation bases
        per_class_heldout.setdefault(cls, []).append(wins[-1])
        per_class_bases.setdefault(cls, []).extend(list(wins[:-1]))
    else:
        # exactly 1 window: use as base only (no held-out)
        per_class_bases.setdefault(cls, []).extend(list(wins))
        print(f"[INFO] only 1 window for class '{cls}' — used as base, no held-out reserved.")

print("\nBases per class:")
for c, arr in per_class_bases.items():
    print(" ", c, len(arr))
print("\nHeld-out per class:")
for c, arr in per_class_heldout.items():
    print(" ", c, len(arr))

classes_without_heldout = [c for c in per_class_bases.keys() if c not in per_class_heldout]
if classes_without_heldout:
    print("\n[WARN] Classes without held-out windows (cannot evaluate unseen real windows for these):", classes_without_heldout)

# -----------------------
# Create augmented training data (balanced per-class)
# -----------------------
X_train_parts = []
y_train_parts = []
X_test = []
y_test = []

for cls, bases in per_class_bases.items():
    bases_np = np.array(bases) if len(bases) > 0 else np.zeros((0, win_samples))
    print(f"[AUG] class {cls}: bases={len(bases_np)} -> augment to {target_windows_per_class}")
    augmented = create_augmented_windows(bases_np, target_windows_per_class)
    if augmented.shape[0] > 0:
        X_train_parts.append(augmented)
        y_train_parts.extend([cls] * len(augmented))
    # attach any reserved held-out windows for that class (may be empty)
    for tw in per_class_heldout.get(cls, []):
        X_test.append(tw)
        y_test.append(cls)

if len(X_train_parts) == 0:
    raise RuntimeError("No training data (no bases produced). Collect more recordings or adjust window params.")

X_train = np.vstack(X_train_parts)
y_train = np.array(y_train_parts)
X_test = np.array(X_test) if len(X_test) > 0 else np.zeros((0, win_samples))
y_test = np.array(y_test) if len(y_test) > 0 else np.array([])

print("Train shape:", X_train.shape, "Test shape:", X_test.shape)
print("Train class counts:", {c: sum(y_train == c) for c in np.unique(y_train)} )
print("Test class counts:", {c: sum(y_test == c) for c in np.unique(y_test)} )

# -----------------------
# Safe label encoding (fit on train only)
# -----------------------
le, class_names, label_to_idx = safe_label_encode_fit(y_train)
print("Class order (training):", class_names)

# transform training labels
y_train_enc = np.array([label_to_idx[v] for v in y_train], dtype=int)

# transform test labels safely (drop unseen)
if X_test.shape[0] > 0:
    y_test_enc, keep_mask = safe_label_transform(y_test, label_to_idx)
    if not keep_mask.all():
        X_test = X_test[keep_mask]
        y_test = y_test[keep_mask]
else:
    y_test_enc = np.array([], dtype=int)

# -----------------------
# Preprocessing: reshape and normalize (fit mean/std on train only)
# -----------------------
X_train = X_train[..., np.newaxis]  # (N, win_samples, 1)
X_test = X_test[..., np.newaxis] if X_test.shape[0] > 0 else np.zeros((0, win_samples, 1))

mean = float(X_train.mean())
std  = float(X_train.std())
print("Train mean/std:", mean, std)

# Normalize
X_train = (X_train - mean) / (std + 1e-8)
if X_test.shape[0] > 0:
    X_test = (X_test - mean) / (std + 1e-8)

# Save preprocessing artifacts
np.save("mean_inferred.npy", mean)
np.save("std_inferred.npy", std)
np.save("classes_inferred.npy", np.array(class_names, dtype=object))
print("Saved mean_inferred.npy, std_inferred.npy, classes_inferred.npy")

# -----------------------
# Build model (compact 1D-CNN)
# -----------------------
input_shape = X_train.shape[1:]
def build_model(input_shape, num_classes):
    model = models.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv1D(32, 5, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        layers.Conv1D(64, 5, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(2),
        layers.Conv1D(128, 3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.GlobalAveragePooling1D(),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
    return model

model = build_model(input_shape, len(class_names))
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

# -----------------------
# Train (with a small val split)
# -----------------------
X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train_enc, test_size=0.15, random_state=random_seed, stratify=y_train_enc)
callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True)
]
history = model.fit(X_tr, y_tr, validation_data=(X_val, y_val), epochs=epochs, batch_size=batch_size, callbacks=callbacks, verbose=2)

# -----------------------
# Evaluate on held-out real windows (if any)
# -----------------------
if X_test.shape[0] > 0:
    test_loss, test_acc = model.evaluate(X_test, y_test_enc, verbose=0)
    print(f"\nKeras held-out evaluation -> loss: {test_loss:.4f} acc: {test_acc:.4f}")
    preds = np.argmax(model.predict(X_test), axis=1)

    # safe full-class report: supply labels=list(range(n_classes))
    labels_full = list(range(len(class_names)))
    print("\nClassification report (held-out):")
    print(classification_report(y_test_enc, preds, labels=labels_full, target_names=class_names, zero_division=0))
    print("Confusion matrix (held-out):")
    print(confusion_matrix(y_test_enc, preds, labels=labels_full))

    # per-class support in held-out
    cm = confusion_matrix(y_test_enc, preds, labels=labels_full)
    for i, cname in enumerate(class_names):
        print(f"Class {cname}: held-out support = {cm.sum(axis=1)[i]}")
else:
    print("\n[WARN] No held-out real windows were available to evaluate. Collect additional recordings to measure real generalization.")

# -----------------------
# Save Keras model and convert to TFLite
# -----------------------
model.save("final_emg_model_safe.h5")
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open("emg_model_safe.tflite", "wb") as f:
    f.write(tflite_model)
print("Saved final_emg_model_safe.h5 and emg_model_safe.tflite")

# -----------------------
# Optional: Quick TFLite parity check on held-out (if present)
# -----------------------
if X_test.shape[0] > 0:
    interpreter = tf.lite.Interpreter(model_path="emg_model_safe.tflite")
    interpreter.allocate_tensors()
    in_d = interpreter.get_input_details()[0]
    out_d = interpreter.get_output_details()[0]
    preds_tflite = []
    for w in X_test:
        x = w.astype(np.float32)
        # if quantized input expected, scaling would be needed; here model is float
        interpreter.set_tensor(in_d['index'], x.reshape(1, x.shape[0], 1).astype(np.float32))
        interpreter.invoke()
        out = interpreter.get_tensor(out_d['index'])[0]
        preds_tflite.append(np.argmax(out))
    preds_tflite = np.array(preds_tflite)
    print("\nTFLite held-out accuracy:", accuracy_score(y_test_enc, preds_tflite))
    print("TFLite confusion:\n", confusion_matrix(y_test_enc, preds_tflite, labels=labels_full))

print("\nFinished pipeline.")


Window samples: 50  step: 25
session_rest.csv rows:101 windows:3 class:rest
session_walk.csv rows:52 windows:1 class:walk
[INFO] only 1 window for class 'walk' — used as base, no held-out reserved.
session_lift.csv rows:50 windows:1 class:lift
[INFO] only 1 window for class 'lift' — used as base, no held-out reserved.

Bases per class:
  rest 2
  walk 1
  lift 1

Held-out per class:
  rest 1

[WARN] Classes without held-out windows (cannot evaluate unseen real windows for these): ['walk', 'lift']
[AUG] class rest: bases=2 -> augment to 100
[AUG] class walk: bases=1 -> augment to 100
[AUG] class lift: bases=1 -> augment to 100
Train shape: (300, 50) Test shape: (1, 50)
Train class counts: {np.str_('lift'): np.int64(100), np.str_('rest'): np.int64(100), np.str_('walk'): np.int64(100)}
Test class counts: {np.str_('rest'): np.int64(1)}
Class order (training): [np.str_('lift'), np.str_('rest'), np.str_('walk')]
Train mean/std: 774.6346435546875 230.83572387695312
Saved mean_inferred.npy, st

Epoch 1/20
4/4 - 3s - 857ms/step - accuracy: 0.8941 - loss: 0.3525 - val_accuracy: 0.6667 - val_loss: 1.0173
Epoch 2/20
4/4 - 0s - 38ms/step - accuracy: 1.0000 - loss: 0.0299 - val_accuracy: 0.6667 - val_loss: 0.9505
Epoch 3/20
4/4 - 0s - 60ms/step - accuracy: 1.0000 - loss: 0.0087 - val_accuracy: 0.9333 - val_loss: 0.8900
Epoch 4/20
4/4 - 0s - 61ms/step - accuracy: 1.0000 - loss: 0.0031 - val_accuracy: 1.0000 - val_loss: 0.8329
Epoch 5/20
4/4 - 0s - 77ms/step - accuracy: 1.0000 - loss: 0.0031 - val_accuracy: 1.0000 - val_loss: 0.7819
Epoch 6/20
4/4 - 0s - 57ms/step - accuracy: 1.0000 - loss: 0.0026 - val_accuracy: 1.0000 - val_loss: 0.7387
Epoch 7/20
4/4 - 0s - 77ms/step - accuracy: 1.0000 - loss: 0.0018 - val_accuracy: 1.0000 - val_loss: 0.7030
Epoch 8/20
4/4 - 0s - 60ms/step - accuracy: 1.0000 - loss: 0.0017 - val_accuracy: 1.0000 - val_loss: 0.6743
Epoch 9/20
4/4 - 0s - 64ms/step - accuracy: 1.0000 - loss: 0.0012 - val_accuracy: 1.0000 - val_loss: 0.6513
Epoch 10/20
4/4 - 0s - 74ms




Classification report (held-out):
              precision    recall  f1-score   support

        lift       0.00      0.00      0.00         0
        rest       1.00      1.00      1.00         1
        walk       0.00      0.00      0.00         0

    accuracy                           1.00         1
   macro avg       0.33      0.33      0.33         1
weighted avg       1.00      1.00      1.00         1

Confusion matrix (held-out):
[[0 0 0]
 [0 1 0]
 [0 0 0]]
Class lift: held-out support = 0
Class rest: held-out support = 1
Class walk: held-out support = 0
Saved artifact at '/tmp/tmprtuwt639'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 50, 1), dtype=tf.float32, name='keras_tensor_52')
Output Type:
  TensorSpec(shape=(None, 3), dtype=tf.float32, name=None)
Captures:
  135768687631184: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135768687629648: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13576

    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


In [None]:
# lofo_quick.py  -- quick LOFO evaluation (fast)
import os, random, numpy as np, pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import tensorflow as tf
from tensorflow.keras import layers, models

# --- USER (edit if you'd like) ---
csv_files = ["data/session_rest.csv", "data/session_walk.csv", "data/session_lift.csv"]
emg_col = "emg"
sampling_rate = 1000
window_ms = 50
step_ms = 25
target_windows_per_class = 50   # smaller -> faster
batch_size = 64
epochs = 10                     # fewer epochs -> faster
random_seed = 42

np.random.seed(random_seed); random.seed(random_seed); tf.random.set_seed(random_seed)

# --- helpers ---
def infer_label_from_filename(path):
    fn = os.path.basename(path).lower()
    if "rest" in fn: return "rest"
    if "walk" in fn: return "walk"
    if "lift" in fn or "flex" in fn: return "lift"
    return os.path.splitext(fn)[0]

def load_windows(path, win_samples, step_samples):
    df = pd.read_csv(path)
    if 'label' not in df.columns or df['label'].isna().all():
        df['label'] = infer_label_from_filename(path)
    arr = df[emg_col].values
    outs=[]
    for s in range(0, len(arr)-win_samples+1, step_samples):
        outs.append(arr[s:s+win_samples].astype(np.float32))
    return np.array(outs), df['label'].iloc[0]

def augment_window(w):
    w = w.copy()
    shift = np.random.randint(-4,5)
    if shift>0: w = np.concatenate([w[shift:], np.full(shift, w[-1], dtype=w.dtype)])
    elif shift<0:
        s=-shift; w = np.concatenate([np.full(s, w[0], dtype=w.dtype), w[:-s]])
    w = w * np.random.uniform(0.95, 1.05)
    rng = np.max(w)-np.min(w)
    w = w + np.random.normal(0, 0.02*(rng+1e-8), size=w.shape)
    return w

def create_augmented(bases, target):
    if len(bases)==0: return np.zeros((0,0), dtype=np.float32)
    out = [b for b in bases]
    while len(out) < target:
        b = bases[np.random.randint(0, len(bases))]
        out.append(augment_window(b))
    return np.array(out, dtype=np.float32)

# --- window params ---
win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)
if win_samples <= 0: raise SystemExit("check window params")

fold_results = []
for i, test_file in enumerate(csv_files):
    print(f"\n=== FOLD {i+1}/{len(csv_files)}: test = {os.path.basename(test_file)} ===")
    train_bases = {}
    test_windows = None
    test_label = None

    for f in csv_files:
        wins, label = load_windows(f, win_samples, step_samples)
        if f == test_file:
            test_windows = wins
            test_label = label
            print(" Test windows:", len(test_windows), " label:", test_label)
        else:
            if len(wins) > 0:
                train_bases.setdefault(label, []).extend(list(wins))
                print(" Train base added:", os.path.basename(f), "wins:", len(wins), "label:", label)

    # skip if missing train data
    if len(train_bases) == 0:
        print("[SKIP] No training bases for this fold"); continue
    if test_windows is None or len(test_windows)==0:
        print("[SKIP] No test windows in test file"); continue

    # build augmented train set
    X_train_parts = []; y_train_parts = []
    for cls, bases in train_bases.items():
        bases_np = np.array(bases)
        aug = create_augmented(bases_np, target_windows_per_class)
        if aug.shape[0] > 0:
            X_train_parts.append(aug)
            y_train_parts.extend([cls]*len(aug))
    if len(X_train_parts) == 0:
        print("[SKIP] No augmented train data"); continue

    X_train = np.vstack(X_train_parts); y_train = np.array(y_train_parts)
    X_test = test_windows; y_test = np.array([test_label]*len(X_test))

    # encode
    le = LabelEncoder(); le.fit(y_train)
    y_train_enc = le.transform(y_train)
    # if test contains unseen class, skip fold (shouldn't happen with LOFO)
    try:
        y_test_enc = le.transform(y_test)
    except ValueError:
        print("[SKIP] Test contains unseen labels compared to train"); continue

    # reshape + normalize (fit on train only)
    X_train = X_train[..., np.newaxis]; X_test = X_test[..., np.newaxis]
    mean = X_train.mean(); std = X_train.std()
    X_train = (X_train - mean) / (std + 1e-8)
    X_test  = (X_test - mean) / (std + 1e-8)

    # model (smaller for speed)
    def build_model(inp, ncls):
        m = models.Sequential([
            layers.Input(shape=inp),
            layers.Conv1D(32,5,activation='relu',padding='same'),
            layers.BatchNormalization(), layers.MaxPooling1D(2),
            layers.Conv1D(64,5,activation='relu',padding='same'),
            layers.BatchNormalization(), layers.GlobalAveragePooling1D(),
            layers.Dense(64,activation='relu'), layers.Dropout(0.3),
            layers.Dense(ncls, activation='softmax')
        ])
        return m

    model = build_model(X_train.shape[1:], len(le.classes_))
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    from sklearn.model_selection import train_test_split
    X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train_enc, test_size=0.15, stratify=y_train_enc, random_state=random_seed)
    model.fit(X_tr, y_tr, validation_data=(X_val,y_val), epochs=epochs, batch_size=batch_size, verbose=0)

    preds = np.argmax(model.predict(X_test), axis=1)
    acc = accuracy_score(y_test_enc, preds)
    print(f" Fold accuracy: {acc:.4f}  (n_test={len(X_test)})")
    print(classification_report(y_test_enc, preds, zero_division=0, target_names=le.classes_))
    print(" Confusion:\n", confusion_matrix(y_test_enc, preds))
    fold_results.append(acc)

# summary
if fold_results:
    print("\nLOFO mean accuracy:", np.mean(fold_results), "per-fold:", fold_results)
else:
    print("No folds completed successfully. Collect more windows per file.")



=== FOLD 1/3: test = session_rest.csv ===
 Test windows: 3  label: rest
 Train base added: session_walk.csv wins: 1 label: walk
 Train base added: session_lift.csv wins: 1 label: lift
[SKIP] Test contains unseen labels compared to train

=== FOLD 2/3: test = session_walk.csv ===
 Train base added: session_rest.csv wins: 3 label: rest
 Test windows: 1  label: walk
 Train base added: session_lift.csv wins: 1 label: lift
[SKIP] Test contains unseen labels compared to train

=== FOLD 3/3: test = session_lift.csv ===
 Train base added: session_rest.csv wins: 3 label: rest
 Train base added: session_walk.csv wins: 1 label: walk
 Test windows: 1  label: lift
[SKIP] Test contains unseen labels compared to train
No folds completed successfully. Collect more windows per file.


In [None]:
# quick_stratified_eval.py  -- optimistic but immediate evaluation (not file-held-out)
import os, random, numpy as np, pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import tensorflow as tf
from tensorflow.keras import layers, models

# --- USER ---
csv_paths = ["data/session_rest.csv", "data/session_walk.csv", "data/session_lift.csv"]
emg_col = "emg"
sampling_rate = 1000
window_ms = 50
step_ms = 25
target_windows_per_class = 100   # lower to 50 to speed up
batch_size = 64
epochs = 20
random_seed = 42

np.random.seed(random_seed); random.seed(random_seed); tf.random.set_seed(random_seed)

def infer_label_from_filename(path):
    fn = os.path.basename(path).lower()
    if "rest" in fn: return "rest"
    if "walk" in fn: return "walk"
    if "lift" in fn or "flex" in fn: return "lift"
    return os.path.splitext(fn)[0]

def build_windows(path, win_samples, step_samples):
    df = pd.read_csv(path)
    label = df['label'].iloc[0] if 'label' in df.columns and not df['label'].isna().all() else infer_label_from_filename(path)
    arr = df[emg_col].values
    outs = []
    for s in range(0, len(arr)-win_samples+1, step_samples):
        outs.append(arr[s:s+win_samples].astype(np.float32))
    return np.array(outs), label

def augment_window(w):
    w = w.copy()
    shift = np.random.randint(-4,5)
    if shift>0: w = np.concatenate([w[shift:], np.full(shift, w[-1], dtype=w.dtype)])
    elif shift<0:
        s=-shift; w = np.concatenate([np.full(s, w[0], dtype=w.dtype), w[:-s]])
    w = w * np.random.uniform(0.95, 1.05)
    rng = np.max(w)-np.min(w)
    w = w + np.random.normal(0, 0.02*(rng+1e-8), size=w.shape)
    return w

def create_augmented(bases, target):
    if len(bases)==0: return np.zeros((0,0), dtype=np.float32)
    out = [b for b in bases]
    while len(out) < target:
        b = bases[np.random.randint(0, len(bases))]
        out.append(augment_window(b))
    return np.array(out, dtype=np.float32)

win_samples = int(sampling_rate * window_ms / 1000)
step_samples = int(sampling_rate * step_ms / 1000)

# collect real windows per file then aggregate by class
per_class_bases = {}
for p in csv_paths:
    if not os.path.exists(p): continue
    wins, lbl = build_windows(p, win_samples, step_samples)
    per_class_bases.setdefault(lbl, []).extend(list(wins))
    print(f"{os.path.basename(p)} -> {len(wins)} windows label={lbl}")

# augment
X_list=[]; y_list=[]
for cls, bases in per_class_bases.items():
    bases_np = np.array(bases)
    if bases_np.shape[0]==0: continue
    aug = create_augmented(bases_np, target_windows_per_class)
    X_list.append(aug)
    y_list.extend([cls]*len(aug))
X = np.vstack(X_list)
y = np.array(y_list)
print("Total windows (augmented):", X.shape, "classes:", np.unique(y))

# encode
le = LabelEncoder(); y_enc = le.fit_transform(y); class_names = le.classes_
print("Class order:", class_names)

# reshape & normalize
X = X[..., np.newaxis]
mean = X.mean(); std = X.std()
X = (X - mean) / (std + 1e-8)

# stratified split
X_train, X_temp, y_train, y_temp = train_test_split(X, y_enc, test_size=0.25, random_state=random_seed, stratify=y_enc)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=random_seed, stratify=y_temp)

print("Shapes -> train:", X_train.shape, "val:", X_val.shape, "test:", X_test.shape)

# build model
def build_model(input_shape, ncls):
    m = models.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv1D(32,5,activation='relu',padding='same'),
        layers.BatchNormalization(), layers.MaxPooling1D(2),
        layers.Conv1D(64,5,activation='relu',padding='same'),
        layers.BatchNormalization(), layers.GlobalAveragePooling1D(),
        layers.Dense(64,activation='relu'), layers.Dropout(0.3),
        layers.Dense(ncls, activation='softmax')
    ])
    return m

model = build_model(X_train.shape[1:], len(class_names))
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
from sklearn.model_selection import train_test_split
X_tr, X_v, y_tr, y_v = train_test_split(X_train, y_train, test_size=0.15, random_state=random_seed, stratify=y_train)
model.fit(X_tr, y_tr, validation_data=(X_v,y_v), epochs=epochs, batch_size=batch_size, verbose=2)

# evaluate
y_pred = np.argmax(model.predict(X_test), axis=1)
print("Overall accuracy (optimistic):", accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred, target_names=class_names, zero_division=0))
print("Confusion:\n", confusion_matrix(y_test, y_pred))


session_rest.csv -> 3 windows label=rest
session_walk.csv -> 1 windows label=walk
session_lift.csv -> 1 windows label=lift
Total windows (augmented): (300, 50) classes: ['lift' 'rest' 'walk']
Class order: ['lift' 'rest' 'walk']
Shapes -> train: (225, 50, 1) val: (37, 50, 1) test: (38, 50, 1)
Epoch 1/20
3/3 - 3s - 871ms/step - accuracy: 0.4398 - loss: 1.0856 - val_accuracy: 0.6471 - val_loss: 1.0505
Epoch 2/20
3/3 - 0s - 39ms/step - accuracy: 0.9948 - loss: 0.3421 - val_accuracy: 0.6471 - val_loss: 0.9990
Epoch 3/20
3/3 - 0s - 38ms/step - accuracy: 0.9948 - loss: 0.1734 - val_accuracy: 0.6471 - val_loss: 0.9428
Epoch 4/20
3/3 - 0s - 44ms/step - accuracy: 0.9948 - loss: 0.0973 - val_accuracy: 0.7647 - val_loss: 0.8887
Epoch 5/20
3/3 - 0s - 44ms/step - accuracy: 1.0000 - loss: 0.0585 - val_accuracy: 0.8824 - val_loss: 0.8397
Epoch 6/20
3/3 - 0s - 39ms/step - accuracy: 1.0000 - loss: 0.0384 - val_accuracy: 0.9412 - val_loss: 0.7941
Epoch 7/20
3/3 - 0s - 39ms/step - accuracy: 1.0000 - loss: