In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import librosa
import librosa.display
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    classification_report,
    ConfusionMatrixDisplay,
)
from sklearn.pipeline import make_pipeline
import warnings

# Suppress warnings
# NOTE(review): with no category/module filter this hides every warning raised
# for the rest of the notebook session, which can mask useful diagnostics from
# the libraries used below — consider narrowing the filter.
warnings.filterwarnings("ignore")

In [None]:
# Root directory that is walked recursively for labelled .wav recordings.
dataset_path = "./dataset"

In [None]:
# Configuration
# Class names. The position of a name in this list is the integer label used
# for training (labels are produced with MOVES.index(move)), so reordering the
# list changes the meaning of every stored label and trained model.
MOVES = [
    "center",
    "left",
    "right",
    "up",
    "down",
    "downleft",
    "downright",
    "upleft",
    "upright",
    "updown",
    "leftright",
]
SAMPLE_RATE = 16000  # Standard sample rate for audio, in Hz
# NOTE(review): WINDOW_SIZES is not referenced by any cell visible here.
WINDOW_SIZES = [1, 5, "full"]  # in seconds - 3 required window sizes

In [None]:
# Sample function to resample the input audio to 16kHz
def load_audio_16k(path):
    """
    Load an audio file and resample it to 16 kHz.

    Args:
        path: path of the audio file to read.

    Returns:
        Tuple ``(audio, sr)`` as returned by ``librosa.load``; ``sr`` is 16000.
        With librosa's default ``mono=True`` the signal is down-mixed to a
        single channel.
    """
    # librosa's keyword for the target rate is `sr`; any other keyword name
    # raises TypeError.
    audio, sr = librosa.load(path, sr=16000)  # Resample to 16 kHz
    return audio, sr

## Feature Extraction

Note: the extracted feature set includes the cross-correlation between wave 1 and wave 2.

In [None]:
from features import (
    extract_all_features_with_xcorr,
)
from stomp_detector import StompDetector
from file_stream import FileStream

## Data Processing

In [None]:
def list_audio_files(base_path):
    """
    Recursively collect the labelled ``.wav`` files below ``base_path``.

    A file is kept when the text after the last underscore of its name
    (extension removed) is one of ``MOVES``.

    Args:
        base_path: directory to search.

    Returns:
        Sorted list of file paths; an empty list when ``base_path`` does not
        exist or contains no matching files.
    """
    if not os.path.exists(base_path):
        return []

    matches = []
    for root, _dirs, files in os.walk(base_path):
        for file_name in files:
            if not file_name.endswith(".wav"):
                continue
            # The move label is the last underscore-separated token of the stem.
            move = os.path.splitext(file_name)[0].split("_")[-1]
            if move in MOVES:
                matches.append(os.path.join(root, file_name))

    # os.walk yields entries in filesystem order, which varies between
    # machines; sorting keeps the sample order (and therefore the seeded
    # train/test split built from it) reproducible.
    return sorted(matches)


def load_and_extract_features_xcorr(audio_files):
    """
    Detect individual stomps in each audio file and build one feature vector
    per stomp (instead of one feature vector per whole file).

    Every file is streamed in fixed-size chunks into a rolling window that is
    handed to a ``StompDetector``; each detected stomp is passed to
    ``extract_all_features_with_xcorr`` and labelled with the move parsed from
    the file name (the text after the last underscore, looked up in ``MOVES``).

    Files with no detected stomps are skipped. Any exception raised while
    processing a file is printed together with its traceback and the file is
    skipped; processing continues with the next file.

    Args:
        audio_files: iterable of audio file paths.

    Returns:
        Tuple ``(X, y, metadata)``: ``np.array`` built from the feature
        vectors, ``np.array`` of integer indices into ``MOVES``, and a
        ``pd.DataFrame`` with one row per kept stomp (``file``, ``move``,
        ``stomp_idx``, ``n_samples``).
    """
    X, y, metadata = [], [], []

    # Streaming parameters: a 200 ms window advanced in 100 ms steps,
    # expressed in frames at SAMPLE_RATE.
    window_ms = 200
    step_ms = 100
    sr = SAMPLE_RATE

    window_frames = int((window_ms / 1000.0) * sr)
    step_frames = int((step_ms / 1000.0) * sr)
    channels = 2  # Assuming stereo

    for file in audio_files:
        try:
            # Default detector and rolling buffer, sized for SAMPLE_RATE
            detector = StompDetector(sr=sr, energy_threshold=0.3)
            audio_buffer = np.zeros((window_frames, channels), dtype=np.float32)

            stomps = []

            # Stream the file in chunks of step_frames frames
            stream_ctx = FileStream(file, step_frames)

            # If the file's sample rate differs from SAMPLE_RATE, rebuild the
            # detector and the rolling buffer for the file's own rate.
            # Per the original author's notes (not verifiable from this
            # notebook): FileStream keeps the file's native rate and
            # StompDetector.detect resamples to 16 kHz when needed.
            # NOTE(review): this branch uses energy_threshold=0.2 while the
            # default path uses 0.3 — confirm the difference is intentional.
            # NOTE(review): step_frames is still derived from SAMPLE_RATE, so
            # for such files the step no longer corresponds to step_ms.
            if stream_ctx.sr != sr:
                detector = StompDetector(sr=stream_ctx.sr, energy_threshold=0.2)
                file_window_frames = int((window_ms / 1000.0) * stream_ctx.sr)
                audio_buffer = np.zeros(
                    (file_window_frames, channels), dtype=np.float32
                )

            with stream_ctx as stream:
                while True:
                    if stream.finished:
                        break

                    # `overflow` is not used below
                    chunk, overflow = stream.read(step_frames)

                    # Update rolling buffer: shift left by len(chunk) frames
                    # and write the new chunk at the end.
                    # NOTE(review): for an empty chunk the slice [-0:] selects
                    # the whole buffer and the assignment raises, which sends
                    # the file to the except branch below — confirm that
                    # FileStream.read never returns an empty chunk.
                    audio_buffer = np.roll(audio_buffer, -len(chunk), axis=0)
                    audio_buffer[-len(chunk) :] = chunk

                    # Detect on the full window
                    detected_stomps = detector.detect(audio_buffer)
                    stomps.extend(detected_stomps)

            print(f"{file}: {len(stomps)} stomps detected")

            if len(stomps) == 0:
                print(f"Skipping {file}: no stomps found")
                continue

            # The label is the last underscore-separated token of the file
            # name; MOVES.index raises ValueError for an unknown move, which
            # is handled by the except branch below.
            basename = os.path.basename(file)
            basename_no_ext = os.path.splitext(basename)[0]
            move = basename_no_ext.split("_")[-1]

            move_idx = MOVES.index(move)

            for i, stomp in enumerate(stomps):
                # Skip tiny fragments (shorter than 50 ms at SAMPLE_RATE)
                if len(stomp) < SAMPLE_RATE * 0.05:
                    continue
                # Features are always computed with SAMPLE_RATE as the rate
                features = extract_all_features_with_xcorr(stomp, SAMPLE_RATE)

                X.append(features)
                y.append(move_idx)
                metadata.append(
                    {
                        "file": basename,
                        "move": move,
                        "stomp_idx": i,
                        "n_samples": len(stomp),
                    }
                )

        except Exception as e:
            # Best effort: report the failure and carry on with the next file
            print(f"Skipping {file} due to error: {e}")
            import traceback

            traceback.print_exc()

    return np.array(X), np.array(y), pd.DataFrame(metadata)

In [None]:
def prepare_data(X, y, metadata, test_size=0.2, random_state=42):
    """
    Split features and labels into stratified train and test partitions.

    No feature scaling happens here, and ``metadata`` is accepted but not
    used by the split.

    Args:
        X: feature matrix.
        y: labels; also used as the stratification target.
        metadata: per-sample metadata (currently ignored).
        test_size: fraction of samples assigned to the test partition.
        random_state: seed forwarded to ``train_test_split``.

    Returns:
        Dict with the keys ``X_train``, ``X_test``, ``y_train``, ``y_test``.
    """
    split_keys = ("X_train", "X_test", "y_train", "y_test")
    partitions = train_test_split(
        X,
        y,
        stratify=y,
        test_size=test_size,
        random_state=random_state,
    )
    # train_test_split returns the partitions in exactly the key order above.
    return dict(zip(split_keys, partitions))

In [None]:
# Build the per-stomp dataset and split it into train/test partitions
audio_files = list_audio_files(dataset_path)
X, y, metadata = load_and_extract_features_xcorr(audio_files)
data = prepare_data(X, y, metadata)

# The training cells below refer to the partitions by name, so unpack them
# from the dict returned by prepare_data.
X_train, X_test = data["X_train"], data["X_test"]
y_train, y_test = data["y_train"], data["y_test"]

In [None]:
# Define the neural network
mlp_clr = MLPClassifier(
    hidden_layer_sizes=(256, 128),  # two hidden layers
    activation="relu",
    solver="adam",
    alpha=1e-4,  # L2 regularization
    batch_size=32,
    learning_rate="adaptive",
    max_iter=300,
    early_stopping=True,  # use a validation split internally
    n_iter_no_change=10,
    random_state=42,
    verbose=True,  # prints training progress
)

# Scaling is part of the pipeline, so it is fitted on the training data only
mlp = make_pipeline(StandardScaler(), mlp_clr)

# Train the neural network
print("Training MLP neural network...")
mlp.fit(X_train, y_train)

# Predict on the test set
y_pred_nn = mlp.predict(X_test)

# Evaluate performance
nn_accuracy = accuracy_score(y_test, y_pred_nn)
print("\n=== Neural Network (MLP) Performance ===")
print(f"Test Accuracy: {nn_accuracy:.4f}")

# Pin the label set to every move. Without it, a move that is absent from the
# test split makes classification_report raise (target_names length mismatch)
# and shrinks the confusion matrix so the tick labels no longer line up.
all_move_labels = np.arange(len(MOVES))

print("\n=== Classification Report (Neural Network) ===")
print(
    classification_report(
        y_test, y_pred_nn, labels=all_move_labels, target_names=MOVES
    )
)

# Confusion matrix
cm_nn = confusion_matrix(y_test, y_pred_nn, labels=all_move_labels)
print("\n=== Confusion Matrix (Neural Network) ===")
print(cm_nn)

fig_nn, ax_nn = plt.subplots(figsize=(10, 8))
sns.heatmap(
    cm_nn,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=MOVES,
    yticklabels=MOVES,
    ax=ax_nn,
)
ax_nn.set_ylabel("True Label")
ax_nn.set_xlabel("Predicted Label")
ax_nn.set_title("Confusion Matrix - Neural Network (MLP)")
fig_nn.tight_layout()
plt.show()


# Optional: function to predict direction for a new feature vector
def predict_direction_nn(feature_vector, model=mlp, moves=MOVES):
    """
    Predict the move label for a single feature vector.

    Args:
        feature_vector: 1D array-like (numpy array, list, tuple) of the same
            length as a row in X.
        model: fitted estimator exposing ``predict``; defaults to the
            11-move pipeline as it exists when this function is defined.
        moves: sequence mapping a predicted class index to its label.

    Returns:
        The predicted move label, i.e. ``moves[predicted_index]``.
    """
    # np.asarray lets plain Python sequences through; the model expects a
    # 2D (1, n_features) input.
    row = np.asarray(feature_vector).reshape(1, -1)
    # Cast to a built-in int so any sequence type can be indexed.
    pred_idx = int(model.predict(row)[0])
    return moves[pred_idx]

## Simple Version

In [None]:
# Restrict the problem to the five basic directions
BASIC_MOVES = ["center", "left", "right", "up", "down"]

# Positions of those moves inside the full MOVES list
basic_move_indices = list(map(MOVES.index, BASIC_MOVES))

# Boolean masks selecting the samples whose label is one of the five moves
train_mask = np.isin(y_train, basic_move_indices)
test_mask = np.isin(y_test, basic_move_indices)

X_train_basic, y_train_basic = X_train[train_mask], y_train[train_mask]
X_test_basic, y_test_basic = X_test[test_mask], y_test[test_mask]

print("5-direction training examples:", X_train_basic.shape[0])
print("5-direction test examples:", X_test_basic.shape[0])

# Re-number the kept labels as 0..4 (label_map: original index -> new index)
label_map = dict(zip(basic_move_indices, range(len(basic_move_indices))))
# Reverse lookup (new index -> original index), if you want to go back later
inv_label_map = dict(enumerate(basic_move_indices))

y_train_basic_remap = np.array(list(map(label_map.__getitem__, y_train_basic)))
y_test_basic_remap = np.array(list(map(label_map.__getitem__, y_test_basic)))

print("Unique remapped train labels:", np.unique(y_train_basic_remap))
print("Unique remapped test labels:", np.unique(y_test_basic_remap))

In [None]:
# Define a separate neural network for 5 basic directions
# NOTE(review): batch_size=1 here versus 32 for the 11-move model — confirm
# this is intentional.
mlp_basic_clr = MLPClassifier(
    hidden_layer_sizes=(256, 128),  # same architecture
    activation="relu",
    solver="adam",
    alpha=1e-4,
    batch_size=1,
    learning_rate="adaptive",
    max_iter=300,
    early_stopping=True,
    n_iter_no_change=10,
    random_state=42,
    verbose=True,
)

mlp_basic = make_pipeline(StandardScaler(), mlp_basic_clr)

print("Training 5-direction MLP neural network...")
mlp_basic.fit(X_train_basic, y_train_basic_remap)

# Predict on the filtered test set
y_pred_basic = mlp_basic.predict(X_test_basic)

# Evaluate performance
nn_basic_accuracy = accuracy_score(y_test_basic_remap, y_pred_basic)
print("\n=== 5-Direction Neural Network (MLP) Performance ===")
print(f"Test Accuracy (5 classes): {nn_basic_accuracy:.4f}")

# Pin the label set so the report and the matrix always cover all 5 classes,
# even when one of them is missing from the test split.
basic_labels = np.arange(len(BASIC_MOVES))

print("\n=== Classification Report (5 classes) ===")
print(
    classification_report(
        y_test_basic_remap,
        y_pred_basic,
        labels=basic_labels,
        target_names=BASIC_MOVES,
    )
)

# Confusion matrix for the 5 directions
cm_basic = confusion_matrix(y_test_basic_remap, y_pred_basic, labels=basic_labels)

# ConfusionMatrixDisplay.plot() opens its own figure unless it is given an
# Axes, so create the figure here and pass the Axes in; otherwise an empty
# extra figure is shown and figsize is ignored.
fig_basic, ax_basic = plt.subplots(figsize=(6, 5))
disp_basic = ConfusionMatrixDisplay(
    confusion_matrix=cm_basic, display_labels=BASIC_MOVES
)
disp_basic.plot(ax=ax_basic, values_format="d", cmap="Blues")
ax_basic.set_title("Confusion Matrix - 5-Direction Neural Network (MLP)")
fig_basic.tight_layout()
plt.show()

## Only left or right

In [None]:
# We only care about these directions
SUPER_BASIC_MOVES = ["left", "right"]

# Get their indices in the full MOVES list
# NOTE: basic_move_indices, train_mask, test_mask, label_map and inv_label_map
# are also assigned by the 5-direction section; this cell overwrites them.
basic_move_indices = [MOVES.index(m) for m in SUPER_BASIC_MOVES]

# Keep only samples where the label is one of these 2
train_mask = np.isin(y_train, basic_move_indices)
test_mask = np.isin(y_test, basic_move_indices)

X_train_lr = X_train[train_mask]
y_train_lr = y_train[train_mask]
X_test_lr = X_test[test_mask]
y_test_lr = y_test[test_mask]

print("2-direction training examples:", X_train_lr.shape[0])
print("2-direction test examples:", X_test_lr.shape[0])

# Re-number the kept labels as 0..1 (label_map: original index -> new index)
label_map = {old_idx: new_idx for new_idx, old_idx in enumerate(basic_move_indices)}
inv_label_map = {v: k for k, v in label_map.items()}  # if you want to go back later

y_train_lr_remap = np.array([label_map[i] for i in y_train_lr])
y_test_lr_remap = np.array([label_map[i] for i in y_test_lr])

print("Unique remapped train labels:", np.unique(y_train_lr_remap))
print("Unique remapped test labels:", np.unique(y_test_lr_remap))

# Define a separate neural network for the 2 left/right directions
# NOTE(review): batch_size=1 here versus 32 for the 11-move model — confirm
# this is intentional.
mlp_only_left_right_clr = MLPClassifier(
    hidden_layer_sizes=(256, 128),  # same architecture
    activation="relu",
    solver="adam",
    alpha=1e-4,
    batch_size=1,
    learning_rate="adaptive",
    max_iter=300,
    early_stopping=True,
    n_iter_no_change=10,
    random_state=42,
    verbose=True,
)

mlp_only_left_right = make_pipeline(StandardScaler(), mlp_only_left_right_clr)

print("Training 2-direction MLP neural network...")
mlp_only_left_right.fit(X_train_lr, y_train_lr_remap)

# Predict on the filtered test set
y_pred_lr = mlp_only_left_right.predict(X_test_lr)

# Evaluate performance
nn_lr_accuracy = accuracy_score(y_test_lr_remap, y_pred_lr)
print("\n=== 2-Direction Neural Network (MLP) Performance ===")
print(f"Test Accuracy (2 classes): {nn_lr_accuracy:.4f}")

# Pin the label set so the report and the matrix always cover both classes,
# even when one of them is missing from the test split.
lr_labels = np.arange(len(SUPER_BASIC_MOVES))

print("\n=== Classification Report (2 classes) ===")
print(
    classification_report(
        y_test_lr_remap,
        y_pred_lr,
        labels=lr_labels,
        target_names=SUPER_BASIC_MOVES,
    )
)

# Confusion matrix for the 2 directions
cm_lr = confusion_matrix(y_test_lr_remap, y_pred_lr, labels=lr_labels)

# ConfusionMatrixDisplay.plot() opens its own figure unless it is given an
# Axes, so create the figure here and pass the Axes in; otherwise an empty
# extra figure is shown and figsize is ignored.
fig_lr, ax_lr = plt.subplots(figsize=(6, 5))
disp_lr = ConfusionMatrixDisplay(
    confusion_matrix=cm_lr, display_labels=SUPER_BASIC_MOVES
)
disp_lr.plot(ax=ax_lr, values_format="d", cmap="Blues")
ax_lr.set_title("Confusion Matrix - 2-Direction Neural Network (MLP)")
fig_lr.tight_layout()
plt.show()

In [None]:
from sklearn.preprocessing import StandardScaler

# X_train: full 11-move training features
# X_train_basic: only center/left/right/up/down
# X_train_lr: only left/right samples

# No separate scaler objects are needed for the export: mlp, mlp_basic and
# mlp_only_left_right are pipelines that already contain their own fitted
# StandardScaler, so the scaling step is converted together with each model.

from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType


def save_mlp_with_scaler_to_onnx(model, onnx_path):
    """
    Convert a fitted (scaler → model) pipeline to ONNX and write it to disk.

    The pipeline is converted as passed in; nothing is wrapped or refitted
    here.

    Args:
        model: fitted estimator or pipeline exposing ``n_features_in_``.
        onnx_path: destination file path; missing parent directories are
            created.
    """

    # Number of input features
    n_features = model.n_features_in_

    # One float input tensor named "input" with a free first dimension
    initial_type = [("input", FloatTensorType([None, n_features]))]

    onnx_model = convert_sklearn(model, initial_types=initial_type)

    # open() does not create directories, so make sure the target folder
    # exists before writing.
    parent_dir = os.path.dirname(onnx_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(onnx_path, "wb") as f:
        f.write(onnx_model.SerializeToString())

    print(f"Saved ONNX model to {onnx_path}")


# Export every trained pipeline, in order:
#   1) full multi-move model
#   2) 5-direction model (center/left/right/up/down)
#   3) left/right-only model
onnx_exports = (
    (mlp, "./models/mlp_all_moves.onnx"),
    (mlp_basic, "./models/mlp_five_directions.onnx"),
    (mlp_only_left_right, "./models/mlp_left_right.onnx"),
)

for trained_pipeline, target_path in onnx_exports:
    save_mlp_with_scaler_to_onnx(model=trained_pipeline, onnx_path=target_path)