In [1]:
# Root folder that is searched recursively for the recording CSV files.
data_folder = "quest_training_data/"

In [2]:
#Wanna do a quick check to see if TensorFlow can access the GPU
from tensorflow.python.client import device_lib

def get_available_devices():
    """Return the name of every local device that TensorFlow reports."""
    device_names = []
    for device in device_lib.list_local_devices():
        device_names.append(device.name)
    return device_names

print(get_available_devices())

['/device:CPU:0']


In [4]:
import pandas as pd
import numpy as np
import os
import glob # Library for finding files that match a pattern


import random
import tensorflow as tf

# Seed the Python, NumPy and TensorFlow random number generators with the
# same fixed value; intended to make repeated runs of the notebook comparable.
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)


# Function to process a single quest data file
# This function loads the data, downsamples it, and calculates velocity and acceleration features
# It returns a DataFrame with the processed data
# If the file is missing required columns, it returns None
def process_quest_file(file_path, optional_cols=('Pressed_Letter',)):
    """
    Load one recording CSV, downsample it, and derive velocity and
    acceleration features from the finger-tip position columns.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to read.
    optional_cols : iterable of str, optional
        Extra columns that are copied through unchanged when the file
        contains them (appended after the derived columns). Missing values
        in these columns never cause a row to be dropped.

    Returns
    -------
    pandas.DataFrame or None
        The required columns, followed by ``vel_x/y/z`` and ``accel_x/y/z``,
        followed by any optional columns that were present. Rows whose
        required or derived values are missing or non-finite are removed.
        ``None`` is returned when a required column is absent.
    """
    # low_memory=False helps prevent data type warnings
    df = pd.read_csv(file_path, low_memory=False)

    # Downsample by a factor of 2 (keep every 2nd row) to speed up processing.
    # This assumes the rows are evenly spaced in time.
    df = df.iloc[::2, :]

    feature_cols = [
        'TimeStamp',
        'Meta_R_Index_Distal_GLOBAL_X',
        'Meta_R_Index_Distal_GLOBAL_Y',
        'Meta_R_Index_Distal_GLOBAL_Z'
    ]
    label_col = 'KeyPressFlag'

    # Check that every required column exists before using it
    required_cols = feature_cols + [label_col]
    if any(col not in df.columns for col in required_cols):
        print(f"  -> Skipping {os.path.basename(file_path)}: missing required columns.")
        return None  # Return nothing if a file is missing columns

    processed_df = df[required_cols].copy()

    delta_time = processed_df['TimeStamp'].diff()

    # Velocity: first difference of each position column over the time step
    for axis in ('x', 'y', 'z'):
        position_col = f'Meta_R_Index_Distal_GLOBAL_{axis.upper()}'
        processed_df[f'vel_{axis}'] = processed_df[position_col].diff() / delta_time

    # Acceleration: first difference of each velocity column over the time step
    for axis in ('x', 'y', 'z'):
        processed_df[f'accel_{axis}'] = processed_df[f'vel_{axis}'].diff() / delta_time

    derived_cols = [
        'vel_x', 'vel_y', 'vel_z',
        'accel_x', 'accel_y', 'accel_z',
    ]

    # A repeated timestamp gives delta_time == 0, so the divisions above yield
    # +/-inf. dropna() does not treat inf as missing, so convert it to NaN first.
    processed_df[derived_cols] = processed_df[derived_cols].replace(
        [np.inf, -np.inf], np.nan
    )

    # Carry optional columns through (index-aligned with the downsampled rows).
    for col in optional_cols:
        if col in df.columns and col not in processed_df.columns:
            processed_df[col] = df[col]

    # Only the required and derived columns decide whether a row is dropped;
    # optional columns may legitimately be empty on some rows.
    processed_df = processed_df.dropna(subset=required_cols + derived_cols)

    return processed_df




# Use glob to find all .csv files recursively
search_pattern = os.path.join(data_folder, '**', '*.csv')

# glob returns paths in an arbitrary, filesystem-dependent order. Sort them so
# the row order of the combined DataFrame (and therefore every window built
# from it later) is the same on every run and every machine. Sorting also
# keeps files from the same subfolder next to each other for the log below.
all_files = sorted(
    f for f in glob.glob(search_pattern, recursive=True)
    if "_0deg_" in os.path.basename(f)  # keep only files whose name contains "_0deg_"
)


# A list to hold the processed data from each file
list_of_dfs = []

print(f"Found {len(all_files)} files to process...")

current_subfolder = None

for file in all_files:
    # Print a header whenever the parent folder changes, to keep the log clean
    subfolder_name = os.path.basename(os.path.dirname(file))

    if subfolder_name != current_subfolder:
        current_subfolder = subfolder_name
        print(f"\n--- Processing subfolder: {current_subfolder}---")
    #print(f"Processing {os.path.basename(file)}...")
    try:
        processed_df = process_quest_file(file)
        if processed_df is not None:
            list_of_dfs.append(processed_df)
    except Exception as e:
        # Best effort: report the failing file and carry on with the rest
        print(f"  -> ERROR processing {os.path.basename(file)}. Error: {e}")

# Combine all the processed data into one master DataFrame.
# NOTE: master_df is only defined when at least one file was processed.
if list_of_dfs:
    master_df = pd.concat(list_of_dfs, ignore_index=True)

    print("\n--- Processing Complete ---")
    print("Shape of the final master DataFrame:", master_df.shape)
    
    print("\nClass Distribution ('1' is a Tap):")
    # We check if 'KeyPressFlag' exists before trying to access it
    if 'KeyPressFlag' in master_df.columns:
        print(master_df['KeyPressFlag'].value_counts(normalize=True))
    else:
        print("Column 'KeyPressFlag' not found in the final DataFrame.")
else:
    print("\nNo files were processed. Please check your data_folder path and file contents.")

Found 10 files to process...

--- Processing subfolder: flap---

--- Processing subfolder: while---

--- Processing Complete ---
Shape of the final master DataFrame: (424, 11)

Class Distribution ('1' is a Tap):
KeyPressFlag
0    0.568396
1    0.431604
Name: proportion, dtype: float64


In [7]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix

# --- Step 1: Prepare Data for Windowing ---
# Position, velocity and acceleration for the Y axis, followed by the same
# three quantities for the Z axis (six feature columns in total).
feature_columns = [
    column
    for axis in ('Y', 'Z')
    for column in (
        f'Meta_R_Index_Distal_GLOBAL_{axis}',  # Position
        f'vel_{axis.lower()}',                 # Velocity
        f'accel_{axis.lower()}',               # Acceleration
    )
]

# Column 0 of the array is the label; the remaining columns are the features.
selected_columns = ['KeyPressFlag', *feature_columns]
timeseries_data = master_df[selected_columns].to_numpy()

# Number of consecutive rows in each window
window_size = 100

# --- Step 2: Create Time-Series Windows ---
def make_timeseries_instances(time_series, window_size):
    """Chop the data into overlapping windows.

    Parameters
    ----------
    time_series : array-like, shape (n_rows, 1 + n_features)
        Column 0 holds the label, the remaining columns hold the features.
    window_size : int
        Number of consecutive rows per window; must be at least 1.

    Returns
    -------
    X : numpy.ndarray, shape (n_windows, window_size, n_features)
        Window ``k`` contains the features of rows ``k .. k + window_size - 1``.
    y : numpy.ndarray of int, shape (n_windows,)
        Label of the row immediately after each window
        (row ``k + window_size``).

    ``n_windows`` is ``max(n_rows - window_size, 0)``. When the input is too
    short to form a window, the returned arrays are empty but still have the
    shapes above, so downstream code that reads ``X.shape[1:]`` keeps working.

    Raises
    ------
    ValueError
        If ``time_series`` is not 2-D or ``window_size`` is smaller than 1.
    """
    time_series = np.asarray(time_series)
    if time_series.ndim != 2:
        raise ValueError("time_series must be a 2-D array (rows x columns)")
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    n_rows, n_cols = time_series.shape
    n_windows = max(n_rows - window_size, 0)

    # Pre-allocate so the result has the right rank even when n_windows == 0
    X = np.empty((n_windows, window_size, n_cols - 1), dtype=time_series.dtype)
    for start in range(n_windows):
        X[start] = time_series[start:start + window_size, 1:]

    # The label for window k is the label of the first row after the window
    y = time_series[window_size:, 0].astype(int)
    return X, y

print("Creating time-series windows...")
X_windowed, y_windowed = make_timeseries_instances(timeseries_data, window_size)
print("Shape of X_windowed (samples, timesteps, features):", X_windowed.shape)
print("Shape of y_windowed:", y_windowed.shape)

# --- Step 3: Split and Scale the Data ---
# NOTE(review): consecutive windows share all but one row, and this split is
# random, so near-identical windows can land on both sides of the split.
# Test-set scores may therefore be optimistic - consider a chronological or
# per-recording split.
X_train, X_test, y_train, y_test = train_test_split(
    X_windowed,
    y_windowed,
    test_size=0.2,
    stratify=y_windowed,
    random_state=42,
)

# StandardScaler expects 2-D input: flatten (samples, timesteps) into rows,
# scale each feature column, then restore the original 3-D shape. The scaler
# is fitted on the training rows only.
num_features = X_train.shape[-1]
train_rows = X_train.reshape(-1, num_features)
test_rows = X_test.reshape(-1, X_test.shape[-1])

scaler = StandardScaler().fit(train_rows)
X_train_scaled = scaler.transform(train_rows).reshape(X_train.shape)
X_test_scaled = scaler.transform(test_rows).reshape(X_test.shape)

print("\nData successfully prepared with Y and Z-axis features.")

Creating time-series windows...
Shape of X_windowed (samples, timesteps, features): (324, 100, 6)
Shape of y_windowed: (324,)

Data successfully prepared with Y and Z-axis features.


In [8]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping

# --- Step 4: Build and Train the LSTM Model ---

print("\nBuilding the LSTM model...")
model = Sequential([
    # Declare the input as (window_size, num_features) with an explicit Input
    # object; passing input_shape= to the first layer triggers a Keras warning.
    tf.keras.Input(shape=(X_train_scaled.shape[1], X_train_scaled.shape[2])),
    # The LSTM layer processes the sequence (unrolled for the later export)
    LSTM(64, unroll=True, name="lstm"),
    Dropout(0.5), # Dropout helps prevent overfitting
    # The final Dense layer gives a single output (tap or no-tap)
    Dense(1, activation='sigmoid') 
])

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# To handle any class imbalance, calculate 'balanced' class weights so that
# errors on the less frequent class are penalised more heavily.
from sklearn.utils import class_weight
classes = np.unique(y_train)
weights = class_weight.compute_class_weight('balanced', classes=classes, y=y_train)
# Key the weights by the actual class labels rather than by position, so the
# mapping stays correct even if a label is missing from y_train.
class_weights = {int(label): float(weight) for label, weight in zip(classes, weights)}

print("\nTraining the LSTM model... (This may take a long time)")
# EarlyStopping will stop training if the validation loss stops improving,
# and restores the weights from the best epoch
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

history = model.fit(
    X_train_scaled,
    y_train,
    epochs=10,
    batch_size=256,
    validation_split=0.2, # Use part of the training data for validation - perhaps 20% is a good start
    class_weight=class_weights,
    callbacks=[early_stopping]
)


# --- Step 5: Evaluate the Final Model ---

print("\nEvaluating the final model on the test set...")
# We predict probabilities and use a threshold of 0.5 to get 0s and 1s
y_pred_probs = model.predict(X_test_scaled)
y_pred = (y_pred_probs > 0.5).astype(int)

print("\nFinal LSTM Model Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))


Building the LSTM model...


  super().__init__(**kwargs)



Training the LSTM model... (This may take a long time)
Epoch 1/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m74s[0m 74s/step - accuracy: 0.4493 - loss: 0.7101 - val_accuracy: 0.5577 - val_loss: 0.6934
Epoch 2/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 337ms/step - accuracy: 0.5072 - loss: 0.6980 - val_accuracy: 0.6731 - val_loss: 0.6889
Epoch 3/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 311ms/step - accuracy: 0.4928 - loss: 0.6946 - val_accuracy: 0.5192 - val_loss: 0.6847
Epoch 4/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 354ms/step - accuracy: 0.5652 - loss: 0.6826 - val_accuracy: 0.5385 - val_loss: 0.6810
Epoch 5/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 376ms/step - accuracy: 0.5845 - loss: 0.6712 - val_accuracy: 0.5385 - val_loss: 0.6776
Epoch 6/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 355ms/step - accuracy: 0.5990 - loss: 0.6660 - val_accuracy: 0.5192 - va

In [9]:
# Write the trained model to disk so it can be reloaded without retraining.
model.save('ltsm_tap_simple.keras')  # Save the model for later use

In [10]:

# Load the trained model back from the .keras file; this rebinds `model`
# to the reloaded copy.
model = tf.keras.models.load_model('ltsm_tap_simple.keras')

# Export it to a new directory
model.export('ltsm_tap_simple_exported')

INFO:tensorflow:Assets written to: ltsm_tap_simple_exported\assets


INFO:tensorflow:Assets written to: ltsm_tap_simple_exported\assets


Saved artifact at 'ltsm_tap_simple_exported'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 100, 6), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  2473036113744: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2473176725840: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2473176724880: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2473176726416: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2473176727376: TensorSpec(shape=(), dtype=tf.resource, name=None)


In [None]:
# Probably a better way to do this, but this is a quick fix -- Unity isn't happy
# unless the LSTM layer is unrolled, so rebuild the same architecture with unroll=True.
# Redundant now (the model above is already unrolled), but it was needed for my first model.

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

# Drop the batch dimension to recover (timesteps, features) from the loaded model
input_shape = model.input_shape[1:]  
print("Recovered input shape:", input_shape)

new_model = Sequential([
    # Explicit Input object instead of the input_shape= layer argument, which
    # triggers a Keras warning.
    tf.keras.Input(shape=input_shape),
    LSTM(64, unroll=True, name="lstm"),
    Dropout(0.5, name="dropout"),
    Dense(1, activation='sigmoid', name="dense")
])

new_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

Recovered input shape: (100, 3)


  super().__init__(**kwargs)


In [None]:
# Transfer weights from the old model to the new model, pairing layers by position.
# This assumes both models have the same architecture; the checks below make a
# violation of that assumption visible instead of silently exporting a model
# that is only partially initialised.
old_layers = model.layers
new_layers = new_model.layers

if len(old_layers) != len(new_layers):
    # zip() stops at the shorter list, so surplus layers would be ignored silently
    print(
        f"WARNING: layer count differs ({len(old_layers)} old vs {len(new_layers)} new); "
        f"only the first {min(len(old_layers), len(new_layers))} layer pairs are considered."
    )

skipped_layers = []
for old_layer, new_layer in zip(old_layers, new_layers):
    try:
        new_layer.set_weights(old_layer.get_weights())
        print(f"Transferred weights for {old_layer.name}")
    except ValueError:
        # Best effort: remember the layer and keep going
        skipped_layers.append(old_layer.name)
        print(f"Skipped {old_layer.name} (shape mismatch)")

if skipped_layers:
    print(f"WARNING: no weights were transferred for: {skipped_layers}")

Transferred weights for lstm
Transferred weights for dropout_1
Transferred weights for dense_1


In [None]:
# NOTE(review): hard-coded absolute path - this only works on the machine it
# was written on; consider making it configurable.
export_path = "/data/transient/ahmedszz/Documents/vr_text_entry_models/vr_text_entry/typing_classifier/ltsm_tap_detector_unrolled_25.keras"
new_model.save(export_path)


# Reload the model from the exact path it was just saved to. Loading by bare
# file name only works when the working directory happens to be the export
# folder, and could otherwise fail or pick up a stale file.
model = tf.keras.models.load_model(export_path)

# Export it to a new directory
model.export('ltsm_tap_detector_unrolled_25_exported')

  saveable.load_own_variables(weights_store.get(inner_path))


INFO:tensorflow:Assets written to: ltsm_tap_detector_unrolled_25_exported/assets


INFO:tensorflow:Assets written to: ltsm_tap_detector_unrolled_25_exported/assets


Saved artifact at 'ltsm_tap_detector_unrolled_25_exported'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 100, 3), dtype=tf.float32, name='input_layer_2')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  140012378406160: TensorSpec(shape=(), dtype=tf.resource, name=None)
  140005534535504: TensorSpec(shape=(), dtype=tf.resource, name=None)
  140005534541456: TensorSpec(shape=(), dtype=tf.resource, name=None)
  140005534543760: TensorSpec(shape=(), dtype=tf.resource, name=None)
  140005534543568: TensorSpec(shape=(), dtype=tf.resource, name=None)


In [None]:
# Quick sanity check to see if the model is valid, courtesy of ChatGPT

import onnx
import onnxruntime as ort
import numpy as np

# 1. Load the ONNX model
# Bound to its own name so the Keras `model` used by earlier cells is not overwritten.
onnx_model_path = "/data/transient/ahmedszz/Documents/vr_text_entry_models/vr_text_entry/typing_classifier/ltsm_tap_detector.onnx"
onnx_model = onnx.load(onnx_model_path)

# 2. Check model structure
onnx.checker.check_model(onnx_model)
print("✅ Model is structurally valid ONNX")

# 3. Create an ONNX Runtime session
session = ort.InferenceSession(onnx_model_path)

# Print model I/O info
print("Inputs:", [(i.name, i.shape, i.type) for i in session.get_inputs()])
print("Outputs:", [(o.name, o.shape, o.type) for o in session.get_outputs()])

# 4. Run a dummy inference
# Build the dummy input from the shape the model itself declares, so the check
# works regardless of how many features the model was trained with.
# Dynamic dimensions (reported as None or a symbolic name) are replaced by 1.
input_meta = session.get_inputs()[0]
dummy_shape = [dim if isinstance(dim, int) and dim > 0 else 1 for dim in input_meta.shape]
dummy_input = np.random.rand(*dummy_shape).astype(np.float32)

# Feed into the session
input_name = input_meta.name
output_name = session.get_outputs()[0].name

result = session.run([output_name], {input_name: dummy_input})
print("Dummy inference output:", result)

Moving on from tap detection to letter detection. The first step is to produce a visualisation of the tap locations.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Make sure master_df is your fully processed dataframe from the previous step

# 1. Create a new dataframe containing ONLY the rows where a real tap occurred
tap_events_df = master_df[master_df['KeyPressFlag'] == 1].copy()

print(f"Found {len(tap_events_df)} tap events to visualize.")

# Colour the points by letter only when that column is actually available;
# asking seaborn for a hue column that does not exist raises an error.
hue_kwargs = {}
if 'Pressed_Letter' in tap_events_df.columns:
    hue_kwargs = {
        'hue': 'Pressed_Letter', # Color each point by the letter that was pressed
        'palette': 'viridis',
    }
else:
    print("Column 'Pressed_Letter' not found; plotting taps without per-letter colours.")

# 2. Create a 2D scatter plot of the tap locations
plt.figure(figsize=(12, 6))
sns.scatterplot(
    data=tap_events_df,
    x='Meta_R_Index_Distal_GLOBAL_X',
    y='Meta_R_Index_Distal_GLOBAL_Z',
    legend=False, # Turning off legend for clarity as there will be many letters
    **hue_kwargs
)

plt.title('2D Visualization of All Detected Tap Locations')
plt.xlabel('X Coordinate')
plt.ylabel('Z Coordinate')
plt.grid(True)
plt.axis('equal') # Ensure the scaling of X and Z axes is the same
plt.show()