In [None]:
import numpy as np
from pathlib import Path
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.preprocessing.sequence import pad_sequences
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import Dense, LayerNormalization, LSTM, Dropout, Input, GlobalAveragePooling1D, BatchNormalization, Masking
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import legacy as legacy_optimizers
from tensorflow.keras.callbacks import ReduceLROnPlateau
from sklearn.metrics import classification_report
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
import os
import optuna
from sklearn.metrics import f1_score
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score, classification_report, f1_score

In [None]:
from tensorflow.keras.mixed_precision import set_global_policy
set_global_policy('mixed_float16')  # May improve speed on some CPUs


In [None]:
# Functions for reading OpenCap outputs 

def read_trc(fpath):
    # read metadata in file header
    df_meta = pd.read_csv(fpath, delimiter='\t', header=0, skiprows=1, nrows=1)
    meta = df_meta.iloc[0].to_dict()
    fps = meta['DataRate']

    # read marker location names
    markers_df = pd.read_csv(fpath, delimiter='\t', header=None, skiprows=3, nrows=1)
    markers = markers_df.iloc[0].dropna().to_numpy()[2:]

    # read marker XYZ locations
    df = pd.read_csv(fpath, delimiter='\t', header=0, skiprows=4)
    df.rename(columns=dict(zip(df.columns[:2], ('n', 't'))), inplace=True)
    df.dropna(how='all', axis=1, inplace=True)

    N = df.shape[0]
    M = len(markers)
    xyz = df.iloc[:,2:].to_numpy().reshape((N, M, 3))
    xyz[:,:,[0,1,2]] = xyz[:,:,[2,1,0]]

    return fps, markers, xyz


def read_mot(fpath):
    with open(fpath, 'r') as f:
        # count = 0
        line = f.readline().strip()
        # while line and line.strip() != 'endheader':
        while line.lower() != 'endheader':
            line = f.readline().strip()
            # count += 1

        # df = pd.read_csv(f, delimiter='\t', header=0, skiprows=count-3)
        df = pd.read_csv(f, delimiter='\t', header=0)

    return df

In [None]:
def z_score_normalize(data_list):
    """
    Applies z-score normalization to a list of time series arrays.
    Arguments:
        data_list: List of numpy arrays, each of shape (num_timesteps, num_features).
    Returns:
        normalized_data: List of z-score normalized arrays.
        mean: Mean of each feature across all time series.
        std: Standard deviation of each feature across all time series.
    """
    # Concatenate all data to compute global mean and std
    all_data = np.concatenate(data_list, axis=0)
    mean = np.mean(all_data, axis=0)
    std = np.std(all_data, axis=0)
    
    # Normalize each array
    normalized_data = [(data - mean) / (std + 1e-8) for data in data_list]
    return normalized_data, mean, std

def prepare_timeseries_input(subject_dict):
    """
    Combines multiple time series for each subject along the time axis, applies z-score normalization,
    and pads them for transformer input.
    Arguments:
        subject_dict: Dictionary where keys are subject IDs and values are lists of 6 numpy arrays.
        Each numpy array corresponds to a time series with shape (num_timesteps, num_features).
    Returns:
        padded_input: A numpy array of shape (num_subjects, max_timesteps, num_features).
        subject_ids: List of subject IDs in the order they appear in the padded_input array.
        mean: Mean of features used for z-score normalization.
        std: Standard deviation of features used for z-score normalization.
    """
    # Sort the dictionary by keys to ensure consistent ordering
    subject_ids = sorted(subject_dict.keys())
    
    # Extract and concatenate the 6 time series for each subject along the time axis
    concatenated_series = [
        np.concatenate(subject_dict[subject_id], axis=0) for subject_id in subject_ids
    ]
    
    # Apply z-score normalization to the concatenated series
    normalized_series, mean, std = z_score_normalize(concatenated_series)
    
    # Determine the maximum length for padding
    max_length = max(series.shape[0] for series in normalized_series)
    
    # Pad all normalized time series to the same length
    padded_input = pad_sequences(normalized_series, maxlen=max_length, padding="post", dtype="float32")
    
    return padded_input, subject_ids, mean, std

In [None]:
def get_label_for_participant_trial(class_df, pid, trial):
    """
    Fetches the label for a given participant and trial based on class_info.csv.
    
    Parameters:
        pid (str): The participant ID
        trial (str): The trial ID
    
    Returns:
        label (int): Classification label (0 or 1).
    """
    label = class_df.loc[class_df['ID'] == pid, 'Class'].values[0]
    return label

In [None]:
# Prepare the input data

# Directory containing dataset files
root_dir = '/Users/scovitz/datadir/opencap_data'  # path to root data dir 
labels_csv = '/Users/scovitz/datadir/class_info.csv' # csv with classification labels
class_df = pd.read_csv(labels_csv)

time_series_data = {}  # dictionary of pid, timeseries pairs 

expected_columns = [
        'pelvis_tilt', 'pelvis_list', 'pelvis_rotation', 'pelvis_tx',
        'pelvis_ty', 'pelvis_tz', 'hip_flexion_r', 'hip_adduction_r',
        'hip_rotation_r', 'knee_angle_r', 'ankle_angle_r',
        'subtalar_angle_r', 'mtp_angle_r', 'hip_flexion_l', 'hip_adduction_l',
        'hip_rotation_l', 'knee_angle_l', 'ankle_angle_l',
        'subtalar_angle_l', 'mtp_angle_l', 'lumbar_extension', 'lumbar_bending',
        'lumbar_rotation', 'arm_flex_r', 'arm_add_r', 'arm_rot_r',
        'elbow_flex_r', 'pro_sup_r', 'arm_flex_l', 'arm_add_l', 'arm_rot_l',
        'elbow_flex_l', 'pro_sup_l'
    ]

# Loop through all .mot files
for i in range(129):
        # ensure going in order!
        num = str(i+1).zfill(3)

        for path in Path(root_dir).rglob(f"P{num}*/**/*.mot"): 
            
            # remove upper extremity trials 
            if "brooke" in str(path) or "curls" in str(path) or "arm_rom" in str(path):
                continue 
            
            trial = str(path).split('/')[-3]
            pid = str(path).split('/')[-4]
            activity = str(path).split('/')[-2]

            # Load the .mot file, get data, and cut first 200 timesteps (before activity starts)
            data = read_mot(str(path))
            data = data[200:]


            # Ensure the file has the expected columns
            if not all(col in data.columns for col in expected_columns + ['time']):
                raise ValueError(f"File {path} does not contain the expected columns.")
            
            # Drop the 'time' column and keep only relevant features
            data = np.array(data[expected_columns])
            
            if pid not in time_series_data:
                time_series_data[pid + '_' + trial] = []

            time_series_data[(pid + '_' + trial)].append(data) # value = list of timeseries arrays of num_timesteps x num_feats
            print(data.shape, pid, trial, str(path))




In [None]:
len(time_series_data)
time_series_data

In [None]:
# Prepare the transformer input
timeseries_input, subject_ids, mean, std = prepare_timeseries_input(time_series_data)

# Check the results
print("Timeseries Input Shape:", timeseries_input.shape)  # (162, max_timesteps, 35)
print("Subject IDs:", subject_ids[:5])  # subject ID ordering, should be in order! 
print("Mean Shape:", mean.shape)  # shape of mean: (35,)
print("Std Shape:", std.shape)  # shape of std: (35,)




In [None]:
# Convert labels to a numpy array
labels = []
for sid in subject_ids:
    pid = sid.split('_')[0]
    trial = sid.split('_')[1]
    label = get_label_for_participant_trial(class_df, pid, trial)
    labels.append(label)

In [None]:
# convert to numpy array
labels = np.array(labels)

In [None]:
# Split data into train, validation, and test sets - 80, 10, 10
def split_data(X, y, test_size=0.1, val_size=0.1):
    """
    Splits the data into training, validation, and testing sets.
    Arguments:
        X: Input features (numpy array).
        y: Labels (numpy array).
        test_size: Proportion of data for the test set.
        val_size: Proportion of data for the validation set from the training set.
    Returns:
        X_train, X_val, X_test, y_train, y_val, y_test
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=val_size, random_state=42)
    return X_train, X_val, X_test, y_train, y_val, y_test


# Split data
X_train, X_val, X_test, y_train, y_val, y_test = split_data(timeseries_input, labels)


In [None]:
# Transformer Architecture

class MultiHeadSelfAttention(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, regularizer=None):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.projection_dim = embed_dim // num_heads
        self.query_dense = Dense(embed_dim, kernel_regularizer=regularizer)
        self.key_dense = Dense(embed_dim, kernel_regularizer=regularizer)
        self.value_dense = Dense(embed_dim, kernel_regularizer=regularizer)
        self.combine_heads = Dense(embed_dim, kernel_regularizer=regularizer)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True) / tf.sqrt(tf.cast(self.projection_dim, query.dtype))
        weights = tf.nn.softmax(score, axis=-1)
        return tf.matmul(weights, value), weights

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        inputs = tf.cast(inputs, dtype=tf.float32)

        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        attention, _ = self.attention(query, key, value)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
        return self.combine_heads(concat_attention)

class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, regularizer=None):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads, regularizer)
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation='relu', kernel_regularizer=regularizer),
            Dense(embed_dim, kernel_regularizer=regularizer)
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# Build Transformer Model 
def build_transformer_model(input_shape, num_heads=4, embed_dim=64, ff_dim=128, num_blocks=2, l2_reg=1e-3):
    inputs = Input(shape=input_shape)
    
    # Embedding layer with regularization
    x = Dense(embed_dim, kernel_regularizer=l2(l2_reg))(inputs)
    
    # Add Transformer blocks with regularization
    for _ in range(num_blocks):
        x = TransformerBlock(embed_dim, num_heads, ff_dim, rate=0.3, regularizer=l2(l2_reg))(x, training=True)

    # Global pooling and output layer
    x = GlobalAveragePooling1D()(x)
    x = Dropout(0.5)(x) # INCREASED FROM 0.3
    outputs = Dense(1, activation="sigmoid", kernel_regularizer=l2(l2_reg))(x)

    model = Model(inputs=inputs, outputs=outputs)
    # learning rate decreaesd from 0.001
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

In [None]:
# Build the Transformer model
input_shape = (timeseries_input.shape[1], timeseries_input.shape[2])  # timesteps, features
transformer_model = build_transformer_model(input_shape)

# Print a summary of the model
transformer_model.summary()

# Early stopping
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=50,
    restore_best_weights=True
)


# Compute class weights to balance the dataset
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
class_weight_dict = dict(enumerate(class_weights))

lr_scheduler = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,  # Reduce the learning rate by half
    patience=5,  # Wait for 5 epochs without improvement
    min_lr=1e-6  # Minimum learning rate
)

history = transformer_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=70,
    batch_size=16,
    callbacks=[early_stopping], # Trying bringing back early stopping or lr_scheduler
    class_weight=class_weight_dict,
    verbose=1
)

train_accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']

# Print final training and validation accuracy
print(f"Final Training Accuracy: {train_accuracy[-1]:.4f}")
print(f"Final Validation Accuracy: {val_accuracy[-1]:.4f}")

# Evaluate the model on test data
test_loss, test_accuracy = transformer_model.evaluate(X_test, y_test, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# Get predictions
y_pred = (transformer_model.predict(X_test) > 0.5).astype("int32")  # Threshold at 0.5

# Print classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred, digits=4))

test_f1 = f1_score(y_test, y_pred, average="weighted")
print("Weighted F1 Score: ", test_f1)

In [None]:
# Plot Training Loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss Over Epochs - Kinematic Timeseries')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Print final training and validation accuracy
train_accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
print(f"Final Training Accuracy: {train_accuracy[-1]:.4f}")
print(f"Final Validation Accuracy: {val_accuracy[-1]:.4f}")



# Evaluate the model on test data
test_loss, test_accuracy = transformer_model.evaluate(X_test, y_test, verbose=1)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# Get predictions
y_pred_prob = transformer_model.predict(X_test).ravel()  # Predicted probabilities
y_pred = (y_pred_prob > 0.5).astype("int32")  # Threshold at 0.5

# Print classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred, digits=4))

weighted_recall = classification_report(y_test, (y_pred_prob > 0.5).astype(int), output_dict=True)['weighted avg']['recall']
weighted_precision = classification_report(y_test, (y_pred_prob > 0.5).astype(int), output_dict=True)['weighted avg']['precision']

print('weighted average precision: ', weighted_precision)
print("average recall: ", weighted_recall)


# Compute and print Weighted F1 Score
test_f1 = f1_score(y_test, y_pred, average="weighted")
print("Weighted F1 Score: ", test_f1)

# ROC-AUC and Precision-Recall Curves
fpr, tpr, _ = roc_curve(y_test, y_pred_prob)
roc_auc = auc(fpr, tpr)

precision, recall, _ = precision_recall_curve(y_test, y_pred_prob)
average_precision = average_precision_score(y_test, y_pred_prob)

# Plot ROC Curve
plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})', color='blue')
plt.plot([0, 1], [0, 1], 'k--', label='Chance')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')

# Plot Precision-Recall Curve
plt.subplot(1, 2, 2)
plt.plot(recall, precision, label=f'PR curve (AUPRC = {average_precision:.2f})', color='green')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall (PR) Curve')
plt.legend(loc='lower left')

plt.tight_layout()
plt.show()

# Print ROC-AUC and PR-AUC scores
print(f"\nFinal ROC-AUC: {roc_auc:.4f}")
print(f"Final Precision-Recall AUC (AUPRC): {average_precision:.4f}")