In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from scipy.fft import fft

In [2]:
data_df=pd.read_csv('/Users/mm/Desktop/Myolab/prepared_data.csv')

In [3]:
data_df.columns

Index(['exercise', 'rep_count_from_intermediate', 'rep_count_from_start',
       'ref_xy_rotation', 'time', 'euler_X', 'euler_Y', 'euler_Z'],
      dtype='object')

In [4]:
data_df.shape

(1188661, 8)

In [5]:
def process_chunk(chunk):
    for angle in ['euler_X', 'euler_Y', 'euler_Z']:
        chunk[f'{angle}_diff'] = chunk[angle].diff().fillna(0)
        chunk[f'{angle}_roll_mean'] = chunk[angle].rolling(window=5).mean().fillna(method='bfill')
        chunk[f'{angle}_fft'] = np.abs(fft(chunk[angle].to_numpy()))
        chunk[f'{angle}_skew'] = chunk[angle].rolling(window=5).skew().fillna(method='bfill')
        chunk[f'{angle}_kurt'] = chunk[angle].rolling(window=5).kurt().fillna(method='bfill')

    return chunk

chunk_size = 100000 
chunks = [process_chunk(chunk) for chunk in np.array_split(data_df, len(data_df) // chunk_size + 1)]
data_df = pd.concat(chunks, ignore_index=True)

In [6]:
data_df.columns

Index(['exercise', 'rep_count_from_intermediate', 'rep_count_from_start',
       'ref_xy_rotation', 'time', 'euler_X', 'euler_Y', 'euler_Z',
       'euler_X_diff', 'euler_X_roll_mean', 'euler_X_fft', 'euler_X_skew',
       'euler_X_kurt', 'euler_Y_diff', 'euler_Y_roll_mean', 'euler_Y_fft',
       'euler_Y_skew', 'euler_Y_kurt', 'euler_Z_diff', 'euler_Z_roll_mean',
       'euler_Z_fft', 'euler_Z_skew', 'euler_Z_kurt'],
      dtype='object')

In [7]:
numerical_features = data_df[['rep_count_from_intermediate', 'rep_count_from_start',
       'ref_xy_rotation', 'time', 'euler_X', 'euler_Y', 'euler_Z',
       'euler_X_diff', 'euler_X_roll_mean', 'euler_X_fft', 'euler_X_skew',
       'euler_X_kurt', 'euler_Y_diff', 'euler_Y_roll_mean', 'euler_Y_fft',
       'euler_Y_skew', 'euler_Y_kurt', 'euler_Z_diff', 'euler_Z_roll_mean',
       'euler_Z_fft', 'euler_Z_skew', 'euler_Z_kurt']]  
categorical_features = data_df['exercise'] . append("other") # Target variable

# Encode the categorical target variable
encoder = OneHotEncoder(sparse_output=False)
target_encoded = encoder.fit_transform(categorical_features.values.reshape(-1, 1))

In [32]:
data_df['exercise'].unique()

array(['armraise', 'tricepkickback', 'overheadpress', 'bicepcurl',
       'hammercurl', 'burpee', 'vup', 'crunch', 'pushup', 'birddog'],
      dtype=object)

In [8]:
scaler = StandardScaler()
numerical_features_scaled = scaler.fit_transform(numerical_features)

In [9]:
num_samples = numerical_features_scaled.shape[0]
num_time_steps = 1 
num_features = numerical_features_scaled.shape[1]

X_reshaped = numerical_features_scaled.reshape((num_samples, num_time_steps, num_features))

In [10]:
X_train, X_test, y_train, y_test = train_test_split(X_reshaped, target_encoded, test_size=0.2, random_state=42)

In [11]:
model = Sequential([
    LSTM(50, return_sequences=True, input_shape=(num_time_steps, num_features)),
    Dropout(0.2),
    LSTM(50),
    Dropout(0.2),
    Dense(target_encoded.shape[1], activation='softmax') 
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [12]:
history = model.fit(X_train, y_train, epochs=30, validation_data=(X_test, y_test), batch_size=32)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [13]:
performance = model.evaluate(X_test, y_test)



In [14]:
predictions = model.predict(X_test)



In [15]:
predicted_indices = np.argmax(predictions, axis=1)

predicted_one_hot = np.zeros((predicted_indices.size, predictions.shape[1]))
predicted_one_hot[np.arange(predicted_indices.size), predicted_indices] = 1

predicted_labels = encoder.inverse_transform(predicted_one_hot)

In [16]:
predicted_labels

array([['overheadpress'],
       ['birddog'],
       ['birddog'],
       ...,
       ['birddog'],
       ['burpee'],
       ['overheadpress']], dtype=object)

In [17]:
actual_labels = encoder.inverse_transform(y_test)

In [18]:
actual_labels

array([['overheadpress'],
       ['birddog'],
       ['birddog'],
       ...,
       ['birddog'],
       ['burpee'],
       ['overheadpress']], dtype=object)

In [28]:
type(X_test[1])

numpy.ndarray

In [23]:
X_test[1]

array([[-9.28317111e-01, -9.26705018e-01, -1.33821092e+00,
         1.59151996e-01, -9.80534562e-01,  1.13180968e+00,
        -8.14734264e-01, -1.29092260e-02, -1.01997518e+00,
         2.10817179e+00,  2.12091685e-02, -1.51470508e-03,
        -4.27905404e-02,  1.15207013e+00,  1.26888526e+00,
        -6.85882976e-01, -1.34530801e-01, -2.87846841e-02,
        -8.30967007e-01,  2.60652480e+00, -2.04741732e-01,
        -1.45619779e-03]])

In [25]:
predictions = model.predict(X_test[1:2])



In [26]:
predicted_indices = np.argmax(predictions, axis=1)

predicted_one_hot = np.zeros((predicted_indices.size, predictions.shape[1]))
predicted_one_hot[np.arange(predicted_indices.size), predicted_indices] = 1

predicted_labels = encoder.inverse_transform(predicted_one_hot)

In [27]:
predicted_labels

array([['birddog']], dtype=object)

In [30]:
model.save('LSTM_euler_temporal_dynamics', save_format='tf') 

INFO:tensorflow:Assets written to: LSTM_euler_temporal_dynamics/assets


INFO:tensorflow:Assets written to: LSTM_euler_temporal_dynamics/assets
