# Install and import libraries

In [None]:
# Install the third-party packages used by this notebook.
# %pip (rather than !pip) installs into the environment of the running kernel.
# NOTE(review): no versions are pinned, so a fresh install may pull different
# releases than the ones this notebook was developed with — consider pinning.
%pip install pydot
%pip install tensorflow
%pip install scikit-learn

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle

import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.utils import to_categorical

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

from pathlib import Path

from constants import (
    DATA_INPUT_PATH,
    MODEL_PATH,
    METADATA_PATH,
)

from constants import CLASSES
# Number of gesture classes. Used further down as the one-hot width passed to
# to_categorical and as the size of the softmax output layer.
# NOTE(review): this assumes the gesture ids are the contiguous integers
# 0 .. len(CLASSES) - 1 — confirm against constants.py.
num_classes = len(CLASSES)

# Read the files in the data dir

In [None]:
# Collect every CSV file in the data folder.
files_in_folder = Path(DATA_INPUT_PATH).glob("*.csv")

# Path.glob yields entries in an arbitrary, filesystem-dependent order. Sorting
# fixes the order in which the recordings are later concatenated, so the
# sliding windows built from them are the same on every machine and run.
files = sorted(files_in_folder)
print(files)

## Convert .csv(s) to dataframes and concatenate

In [None]:
# Load every CSV found in the previous cell and stack them into one DataFrame.
print("Looking in:", DATA_INPUT_PATH)
# Report the list that is actually loaded below rather than globbing again.
print("Found CSVs:", files)

# pd.concat fails with an opaque "No objects to concatenate" on an empty list,
# so stop here with a message that names the folder that was searched.
if not files:
    raise ValueError(f"No .csv files found in {DATA_INPUT_PATH}")

# One DataFrame per recording; a separate name avoids shadowing the combined `df`.
dfs = [pd.read_csv(csv_file) for csv_file in files]

# Stack all recordings row-wise. ignore_index=True gives the result a unique
# 0..n-1 index instead of repeating each file's own row numbers.
df = pd.concat(dfs, axis=0, ignore_index=True)

# Keep only the label, the EMG channels s1..s8 and the four quaternion
# components, in exactly this column order.
columns = ["gesture_id"] \
        + [f"s{i}" for i in range(1, 9)] \
        + ["quat_w", "quat_x", "quat_y", "quat_z"]
df = df[columns]

print(df.head())

# Before removing duplicates
print(f"Shape of dataframe before removing duplicates {df.shape}")
# Drop rows that are exact duplicates across all kept columns.
# NOTE(review): the rows are later cut into sliding windows, so removing rows
# here also shortens the sample sequence — confirm that is intended.
df = df.drop_duplicates()
print(f"Shape of dataframe after removing duplicates {df.shape}")

In [None]:
# --- Calibration statistics derived from the 'Rest' gesture ---
# Produces `calibration_mean` and `calibration_std` (one value per EMG channel),
# which later cells use to z-score the EMG signals.
import numpy as np
from constants import NUM_EMG_SENSORS  # number of EMG channels (columns s1..sN)

print("Calculating calibration statistics from 'Rest' data...")

# Names of the EMG columns: s1 .. s<NUM_EMG_SENSORS>
emg_columns = [f"s{i}" for i in range(1, NUM_EMG_SENSORS + 1)]

# Rows whose label is 0 ('Rest'), copied so the frame is independent of `df`
rest_mask = df["gesture_id"] == 0
rest_df = df.loc[rest_mask].copy()
rest_emg_data = rest_df[emg_columns].to_numpy()

if rest_emg_data.shape[0] == 0:
    # No rest samples at all: report loudly and fall back to an identity
    # transform (mean 0, std 1) so the rest of the notebook can still run.
    print("ERROR: No 'Rest' data found to calculate calibration statistics!")
    calibration_mean = np.zeros(NUM_EMG_SENSORS)
    calibration_std = np.ones(NUM_EMG_SENSORS)
    print("Using default calibration stats (mean=0, std=1). CHECK YOUR DATA.")
else:
    # Per-channel (column-wise) mean and population standard deviation
    calibration_mean = rest_emg_data.mean(axis=0)
    calibration_std = rest_emg_data.std(axis=0)

    # A (near-)constant channel would cause a division by ~0 later; use 1 instead
    calibration_std = np.where(calibration_std < 1e-6, 1.0, calibration_std)

    print(f"Calculated Calibration Mean (shape {calibration_mean.shape}):\n{np.round(calibration_mean, 2)}")
    print(f"Calculated Calibration StdDev (shape {calibration_std.shape}):\n{np.round(calibration_std, 2)}")

# calibration_mean / calibration_std stay in the kernel namespace for the cells below.

In [None]:
# --- MODIFIED CELL 5: Sliding-window Feature Extraction with Z-Score ---
# Turns the sample-level frame `df` into one RMS feature vector per window:
#   X : [n_windows, NUM_EMG_SENSORS + 4]  RMS of z-scored EMG + RMS of raw quaternion
#   y : [n_windows]                       gesture_id of the window's centre sample
import numpy as np
from constants import WINDOW_SIZE, WINDOW_STEP, NUM_EMG_SENSORS

print("Starting feature extraction with Z-score normalization...")

# Channel list in the order the features are emitted (EMG first, then quaternion)
channels = [f"s{i}" for i in range(1, NUM_EMG_SENSORS + 1)] + ["quat_w", "quat_x", "quat_y", "quat_z"]
raw = df[channels].values          # shape = [total_samples, NUM_EMG_SENSORS + 4]
labels = df["gesture_id"].values   # shape = [total_samples,]

# Z-score the EMG channels once for the whole recording. The transform is
# element-wise, so this equals normalizing every window separately, without
# redoing the work for overlapping windows.
# If the calibration cell was not run, this line raises NameError right here
# instead of silently leaving X empty or partial for the cells below.
emg_normalized = (raw[:, :NUM_EMG_SENSORS] - calibration_mean) / calibration_std
imu_raw = raw[:, NUM_EMG_SENSORS:]   # quaternion channels are used as recorded

window_starts = range(0, len(raw) - WINDOW_SIZE + 1, WINDOW_STEP)
if len(window_starts) == 0:
    raise ValueError(
        f"Need at least WINDOW_SIZE={WINDOW_SIZE} samples to build a window, got {len(raw)}"
    )

# NOTE(review): windows are cut from the concatenated frame, so a window can
# span rows from more than one gesture or source file; its label is simply the
# gesture_id of the centre row.
X, y = [], []
for start in window_starts:
    stop = start + WINDOW_SIZE

    # Root-mean-square per channel over the window
    rms_emg = np.sqrt(np.mean(emg_normalized[start:stop] ** 2, axis=0))
    rms_imu = np.sqrt(np.mean(imu_raw[start:stop] ** 2, axis=0))
    X.append(np.concatenate([rms_emg, rms_imu]))

    # Assign the window's "center" label
    y.append(labels[start + WINDOW_SIZE // 2])

X = np.array(X)   # shape = [n_windows, NUM_EMG_SENSORS + 4]
y = np.array(y)   # shape = [n_windows,]
num_windows = len(X)

print(f"Processed {num_windows} windows.")
print("X shape (RMS features):", X.shape)
print("y shape (labels):", y.shape)
# --- END OF MODIFIED CELL 5 ---

## Scale, split, and one-hot encode RMS feature windows

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# ---- one-hot labels ----
y_cat = to_categorical(y, num_classes)

# ---- train / test split (on the UNSCALED features) ----
# stratify=y ensures each class is proportionally represented
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y_cat,
    test_size=0.2,
    random_state=42,
    stratify=y
)

# ---- scale features ----
# Fit the scaler on the training split only, so the test split's mean and
# variance never influence the transform; then apply that same fitted
# transform to the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

# Full feature matrix under the same transform, kept under its original name.
X_scaled = scaler.transform(X)

print("X_train:", X_train.shape, "  y_train:", y_train.shape)
print("X_test: ", X_test.shape, "  y_test: ",  y_test.shape)


## Build, compile & train a simple dense classifier

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam

# Seed Python, NumPy and TensorFlow so weight initialisation, dropout masks and
# batch shuffling are repeatable between runs (same seed as the data split).
tf.keras.utils.set_random_seed(42)

# Small fully-connected classifier over the per-window RMS feature vector.
# The input width is declared with an explicit Input layer rather than the
# `input_shape=` keyword on the first Dense layer, which newer Keras versions
# warn about.
model = Sequential([
    tf.keras.Input(shape=(X_train.shape[1],)),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.3),
    Dense(64, activation='relu'),
    BatchNormalization(),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dense(num_classes, activation='softmax')   # one probability per gesture class
])

# Stop once validation loss has not improved for 3 epochs and roll back to the
# best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)

model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',   # labels are one-hot encoded
    metrics=['accuracy']
)

# 20% of the training split is held out for validation / early stopping;
# epochs=100 is only an upper bound because of the callback.
history = model.fit(
    X_train, y_train,
    validation_split=0.2,
    epochs=100,
    batch_size=64,
    callbacks=[early_stopping]
)

# Final score on the held-out test split
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f"Test accuracy: {test_acc:.3%}")

## Save the model and the scaler + feature list + calibration stats

In [None]:
# --- MODIFIED CELL 8: Save Model and Metadata ---
# Writes two artifacts:
#   MODEL_PATH    - the trained Keras model
#   METADATA_PATH - pickled tuple (scaler, feature_names, calibration_mean, calibration_std)

# Make sure both target folders exist
Path(MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)
Path(METADATA_PATH).parent.mkdir(parents=True, exist_ok=True)

# Save the trained model
model.save(MODEL_PATH)
print("Saved model to", MODEL_PATH)

# Channel order used to build X (EMG channels first, then the quaternion)
feature_names = [f"s{i}" for i in range(1, NUM_EMG_SENSORS + 1)] + ["quat_w", "quat_x", "quat_y", "quat_z"]

# Build the payload BEFORE opening the file: opening with "wb" truncates it, so
# a missing variable must surface here rather than leave an empty pickle behind.
metadata = (scaler, feature_names, calibration_mean, calibration_std)

# Errors are allowed to propagate so a failed save stops the notebook instead
# of being printed and ignored.
with open(METADATA_PATH, "wb") as f:
    pickle.dump(metadata, f)
print("Saved metadata (scaler, features, calib_mean, calib_std) to", METADATA_PATH)

# --- END OF MODIFIED CELL 8 ---

In [None]:
# Diagnostic cell: show what the configured paths point to and whether the
# saved artifacts are present.
import os
from constants import DATA_INPUT_PATH, MODEL_INPUT_PATH, MODEL_PATH, METADATA_PATH

# Folders: print the path followed by a listing of its contents
for header, folder in (
    ("DATA_INPUT_PATH ->", DATA_INPUT_PATH),
    ("MODEL_INPUT_PATH ->", MODEL_INPUT_PATH),
):
    print(header, folder)
    print("  contains:", os.listdir(folder))
    print()

# Files: print the path and whether it exists on disk
for header, target in (
    ("MODEL_PATH       ->", MODEL_PATH),
    ("METADATA_PATH    ->", METADATA_PATH),
):
    print(header, target, "exists?", os.path.exists(target))


In [None]:
# Diagnostic: show where the imported `constants` module lives on disk.
import constants
print(constants.__file__)


In [13]:
# Demo / smoke test of the saved artifacts: reload the model and metadata from
# disk, build one RMS feature vector per gesture class from the newest CSV, and
# compare the predictions with the true labels.
# NOTE(review): the newest CSV is taken from DATA_INPUT_PATH, the same folder
# the training data was read from, so this accuracy is not a held-out estimate.
import numpy as np
import pandas as pd
from pathlib import Path
from tensorflow.keras.models import load_model
import pickle

# NUM_EMG_SENSORS is imported here as well, so the cell does not depend on an
# earlier cell having brought it into the kernel namespace.
from constants import (
    DATA_INPUT_PATH,
    MODEL_PATH,
    METADATA_PATH,
    CLASSES,
    WINDOW_SIZE,
    NUM_EMG_SENSORS,
)

# — 1) load the freshest CSV (most recent modification time) —
data_dir = Path(DATA_INPUT_PATH)
csv_path = max(data_dir.glob("*.csv"), key=lambda p: p.stat().st_mtime)
df      = pd.read_csv(csv_path)

# — 2) load scaler + feature names + calibration stats —
# SECURITY: pickle.load can execute arbitrary code; only load a metadata file
# that this notebook (or another trusted source) produced.
with open(METADATA_PATH, "rb") as f:
    scaler, feature_names, calibration_mean, calibration_std = pickle.load(f)

# — 3) load your trained model —
model = load_model(MODEL_PATH)

# — 4) for each class, take the first WINDOW_SIZE rows labelled with that
#         gesture, compute the RMS feature vector, and collect it —
# NOTE(review): rows are selected by label only, so they are contiguous in time
# only if that gesture forms a single run in the file.
demo_rows = []
for gid, name in CLASSES.items():
    sub = df[df["gesture_id"] == gid]
    if len(sub) < WINDOW_SIZE:
        print(f"⚠️  not enough samples for '{name}' (need {WINDOW_SIZE}, got {len(sub)})")
        continue

    window = sub[feature_names].values[:WINDOW_SIZE]

    # Split the window into its EMG and quaternion columns
    emg_part = window[:, :NUM_EMG_SENSORS]
    imu_part = window[:, NUM_EMG_SENSORS:]

    # Same preprocessing as in training: z-score the EMG with the stored
    # calibration stats, leave the quaternion as recorded.
    normalized_emg_part = (emg_part - calibration_mean) / calibration_std

    # RMS per channel over the window
    rms_emg = np.sqrt(np.mean(normalized_emg_part**2, axis=0))
    rms_imu = np.sqrt(np.mean(imu_part**2, axis=0))

    # Final feature vector for this demo window
    rms_vec = np.concatenate([rms_emg, rms_imu])

    demo_rows.append((gid, name, rms_vec))

# np.vstack fails with an unhelpful message on an empty list; say what happened.
if not demo_rows:
    raise ValueError(
        f"No gesture in {csv_path} has at least WINDOW_SIZE={WINDOW_SIZE} samples; nothing to predict"
    )

# assemble our demo set
y_true = [gid for gid,_,_ in demo_rows]
X_demo = np.vstack([vec for *_, vec in demo_rows])

# — 5) scale & predict —
X_scaled = scaler.transform(X_demo)
preds     = model.predict(X_scaled, verbose=0)
pred_ids  = preds.argmax(axis=1)

# — 6) report —
print("Demo results:")
for (true_id, true_name, _), pred in zip(demo_rows, pred_ids):
    print(f" • True: {true_id:2d} {true_name:17s} → Predicted: {pred:2d} {CLASSES[pred]}")

acc = np.mean(np.array(pred_ids) == np.array(y_true))
print(f"\nDemo accuracy (RMS‐window per class): {acc:.0%}")




Demo results:
 • True:  0 Rest              → Predicted:  0 Rest
 • True:  1 Wrist Flexion     → Predicted:  1 Wrist Flexion
 • True:  2 Wrist Extension   → Predicted:  2 Wrist Extension
 • True:  3 Elbow Flexion     → Predicted:  2 Wrist Extension
 • True:  4 Elbow Extension   → Predicted:  4 Elbow Extension
 • True:  5 Hand Close        → Predicted:  5 Hand Close
 • True:  6 Hand Open         → Predicted:  6 Hand Open
 • True:  7 Forearm Pronation → Predicted:  7 Forearm Pronation
 • True:  8 Forearm Supination → Predicted:  8 Forearm Supination

Demo accuracy (RMS‐window per class): 89%
