In [5]:
!export KERAS_BACKEND="torch"


In [47]:
import numpy as np
import keras
from keras import layers
import torch
import pandas as pd
import os
import sys
from sklearn.preprocessing import LabelEncoder,StandardScaler
from scipy.spatial.transform import Rotation
module_path = os.path.abspath(os.path.join('../tslearn')) # or the path to your source code
sys.path.insert(0, module_path)
from sklearn.model_selection import train_test_split


# Create a CNN model with conv1D , batch norm and Relu X 3 layers

In [2]:
def make_model(input_shape):
    input_layer = keras.layers.Input(input_shape)

    conv1 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(input_layer)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.ReLU()(conv1)

    conv2 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv1)
    conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.ReLU()(conv2)

    conv3 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv2)
    conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.ReLU()(conv3)

    gap = keras.layers.GlobalAveragePooling1D()(conv3)

    output_layer = keras.layers.Dense(num_classes, activation="softmax")(gap)

    return keras.models.Model(inputs=input_layer, outputs=output_layer)


model = make_model(input_shape=x_train.shape[1:])
keras.utils.plot_model(model, show_shapes=True)

You must install pydot (`pip install pydot`) for `plot_model` to work.


## Define Routines to calculate Quaternion angle

In [7]:
def quaternion_inverse(q):
  """Calculate the inverse of a quaternion.

  Args:
    q: A quaternion represented as a 4-element numpy array.

  Returns:
    The inverse of the quaternion, represented as a 4-element numpy array.
  """

  q_conj = np.conj(q)
  q_norm = np.linalg.norm(q)
  return q_conj / (q_norm * q_norm)
def quaternion_multiply(q1, q2):
  """Multiply two quaternions.

  Args:
    q1: A quaternion represented as a 4-element numpy array.
    q2: A quaternion represented as a 4-element numpy array.

  Returns:
    The product of the two quaternions, represented as a 4-
    element numpy array.
    
    """
  w1, x1, y1, z1 = q1
  w2, x2, y2, z2 = q2
  w = w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2
  x = w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2
  y = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2
  z = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2
  return w, x, y, z
def vec_length(v: np.array):
    return np.sqrt(sum(i**2 for i in v))

def normalize(v):
    norm = np.linalg.norm(v)
    if norm == 0: 
       return v
    return v / norm

def orientation(rs: np.array, ls: np.array):
    axis_z = normalize((rs - ls))
    if vec_length(axis_z) == 0:
        axis_z = np.array((0, -1, 0))
        
    axis_x = np.cross(np.array((0, 0, 1)), axis_z)
    if vec_length(axis_x) == 0:
        axis_x = np.array((1, 0, 0))
    
    axis_y = np.cross(axis_z, axis_x)
    rot_matrix = np.matrix([axis_x, axis_y, axis_z]).transpose()
    #quat=Rotation.from_matrix(rot_matrix).as_quat()
    return Rotation.from_matrix(rot_matrix).as_quat()
def to_quat(df: pd.DataFrame):
    return df.apply(lambda x: orientation(np.array([x[0],x[1],x[2]]),np.array([x[3],x[4],x[5]])),axis=1).to_list()
def get_distance(df:list):
    dfr=[]
    for i in range(0,len(df)-1,2):
        xi = quaternion_inverse(df[i])
        xy = quaternion_multiply(xi,df[i+1])
        dfr.append(2*np.arccos(xy[0])*180/np.pi)
        #dfr.append(np.arccos(xy[0]))
    return dfr

## loop thru keypoint files and create Workouts Pandas dataframe and calculate Quaterneon angles 

In [12]:
workouts= pd.DataFrame()
y=[]
for root, dirs, files in os.walk('./Data/MediaPipe/Train'):
        for name in files:
                if(name.endswith('.csv') == False):
                    continue
                filepath = root + os.sep + name
                worktype = filepath.split(os.sep )[-2]
                filename = filepath.split(os.sep)[-1].replace('.csv','')
                if(os.path.getsize(filepath) > 0):
                    df = pd.read_csv(filepath,index_col=0)
                    df1 = pd.DataFrame()
                    #df = df[::round((df.shape[0]/4))]
                    #df_d=calc_euc_dist(df.diff().dropna().pow(2))
                    #df_a=pd.DataFrame()
                    #for i in range(0,33):
                    #    df1 = pd.DataFrame()
                    #    df1['0'] = df.iloc[:,i,]
                    #    df1['1'] = df.iloc[:,i+33]
                    #    df1['2'] = df.iloc[:,i+66]
                    #    df_a= pd.concat([df_a,calculate_euler_angles_from_df(df1)],axis=1,ignore_index=True)
                    #df_x = pd.concat([df_d.reset_index(drop=True),df_a],axis=1,ignore_index=True)
                    """df1[['shoulder_x','shoulder_y','shoulder_z','shoulder_w']] = to_quat(pd.concat([df['11'],df['11.1'],df['11.2'],df['23'],df['23.1'],df['23.2']],axis=1))
                    df1[['hip_x','hip_y','hip_z','hip_w']] = to_quat(pd.concat([df['23'],df['23.1'],df['23.2'],df['24'],df['24.1'],df['24.2']],axis=1))
                    df1[['lhand1_x','lhand1_y','lhand1_z','lhand1_w']] = to_quat(pd.concat([df['11'],df['11.1'],df['11.2'],df['13'],df['13.1'],df['13.2']],axis=1))
                    df1[['rhand1_x','rhand1_y','rhand1_z','rhand1_w']] = to_quat(pd.concat([df['12'],df['12.1'],df['12.2'],df['14'],df['14.1'],df['14.2']],axis=1))
                    df1[['lhand2_x','lhand2_y','lhand2_z','lhand2_w']] = to_quat(pd.concat([df['13'],df['13.1'],df['13.2'],df['15'],df['15.1'],df['15.2']],axis=1))
                    df1[['rhand2_x','rhand2_y','rhand2_z','rhand2_w']] = to_quat(pd.concat([df['14'],df['14.1'],df['14.2'],df['16'],df['16.1'],df['16.2']],axis=1))
                    df1[['lleg1_x','lleg1_y','lleg1_z','lleg1_w']] = to_quat(pd.concat([df['23'],df['23.1'],df['23.2'],df['25'],df['25.1'],df['25.2']],axis=1))
                    df1[['rleg1_x','rleg1_y','rleg1_z','rleg1_w']] = to_quat(pd.concat([df['24'],df['24.1'],df['24.2'],df['26'],df['26.1'],df['26.2']],axis=1))    
                    df1[['lleg2_x','lleg2_y','lleg2_z','lleg2_w']]= to_quat(pd.concat([df['25'],df['25.1'],df['25.2'],df['27'],df['27.1'],df['27.2']],axis=1))
                    df1[['rleg2_x','rleg2_y','rleg2_z','rleg2_w']] = to_quat(pd.concat([df['26'],df['26.1'],df['26.2'],df['28'],df['28.1'],df['28.2']],axis=1))
                    df1[['filename']]= filename
                    """
                    df1['shoulder'] = get_distance(to_quat(pd.concat([df['11'],df['11.1'],df['11.2'],df['23'],df['23.1'],df['23.2']],axis=1)))
                    df1['hip'] = get_distance(to_quat(pd.concat([df['23'],df['23.1'],df['23.2'],df['24'],df['24.1'],df['24.2']],axis=1)))
                    df1['lhand1'] = get_distance(to_quat(pd.concat([df['11'],df['11.1'],df['11.2'],df['13'],df['13.1'],df['13.2']],axis=1)))
                    df1['rhand1'] = get_distance(to_quat(pd.concat([df['12'],df['12.1'],df['12.2'],df['14'],df['14.1'],df['14.2']],axis=1)))
                    df1['lhand2'] = get_distance(to_quat(pd.concat([df['13'],df['13.1'],df['13.2'],df['15'],df['15.1'],df['15.2']],axis=1)))
                    df1['rhand2'] = get_distance(to_quat(pd.concat([df['14'],df['14.1'],df['14.2'],df['16'],df['16.1'],df['16.2']],axis=1)))
                    df1['lleg1'] = get_distance(to_quat(pd.concat([df['23'],df['23.1'],df['23.2'],df['25'],df['25.1'],df['25.2']],axis=1)))
                    df1['rleg1'] = get_distance(to_quat(pd.concat([df['24'],df['24.1'],df['24.2'],df['26'],df['26.1'],df['26.2']],axis=1)))
                    df1['lleg2'] = get_distance(to_quat(pd.concat([df['25'],df['25.1'],df['25.2'],df['27'],df['27.1'],df['27.2']],axis=1)))
                    df1['rleg2'] = get_distance(to_quat(pd.concat([df['26'],df['26.1'],df['26.2'],df['28'],df['28.1'],df['28.2']],axis=1)))
                    #df1['weight'] = df1.diff().fillna(1).pow(2).apply([lambda x: 1 if x >0 else 0 ]).sum(axis=1)/df.shape[1]
                    df1['filename'] = filename
                    
                    #df2=pd.concat([df['11'],df['11.1'],df['11.2'],df['23'],df['23.1'],df['23.2']],axis=1)
                    #df_q = pd.DataFrame(df2.apply(lambda x: orientation(np.array([x[0],x[1],x[2]]),np.array([x[3],x[4],x[5]])),axis=1).to_list(),columns=['q1','q2','q3','q4'])
                    #if(len(df) < 250):
                    #df = pd.concat([df,df_q],axis=1)
                    workouts = pd.concat([workouts,df1],axis=0)
                    y.append(worktype)


#Scale the data using standard Scaler

In [15]:
for x in workouts.select_dtypes(include=np.number).columns:
    workouts[x] = StandardScaler().fit_transform(workouts[x].values.reshape(-1,1))


In [44]:
def reshape(dataset):
    n_ts = len(dataset)
    max_sz = max(
        [ts.shape[0] for ts in dataset]
    )
    d = dataset[0].shape[1]
    dataset_out = np.zeros((n_ts, max_sz, d))
    for i in range(n_ts):
        ts = dataset[i]
        dataset_out[i, : ts.shape[0]] = ts
    return dataset_out

## Convert the Dataframe to Timeseries Dataset using reshape method above. This will create numpy array of shape no of Batches, number of time steps, number of bones

In [54]:
X=workouts.set_index('filename').groupby('filename').apply(pd.DataFrame.to_numpy).to_numpy()
encoder = LabelEncoder()
y = encoder.fit_transform(y)
#formatted_time_series = np.nan_to_num(formatted_time_series,0)
formatted_time_series = reshape(X)
formatted_time_series.shape
X_train, X_test, y_train, y_test = train_test_split(formatted_time_series, y, test_size=0.3, random_state=42, stratify=y)

# Train The model with loss function as sparse Categorical crossentropy and metrics as sparse_categorical_accuracy . Batch size 32 and 100 epochs

In [61]:
epochs = 100
batch_size = 32
num_classes = len(np.unique(y_train))
model = make_model(input_shape=X_train.shape[1:])
callbacks = [
    keras.callbacks.ModelCheckpoint(
        "best_model.keras", save_best_only=True, monitor="val_loss"
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001
    ),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1),
]
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)
history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_split=0.2,
    verbose=1,
)

Epoch 1/100
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 347ms/step - loss: 1.6591 - sparse_categorical_accuracy: 0.1521 - val_loss: 1.5454 - val_sparse_categorical_accuracy: 0.5625 - learning_rate: 0.0010
Epoch 2/100
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 168ms/step - loss: 1.3939 - sparse_categorical_accuracy: 0.3049 - val_loss: 1.5011 - val_sparse_categorical_accuracy: 0.6250 - learning_rate: 0.0010
Epoch 3/100
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 123ms/step - loss: 1.2145 - sparse_categorical_accuracy: 0.5437 - val_loss: 1.4569 - val_sparse_categorical_accuracy: 0.6250 - learning_rate: 0.0010
Epoch 4/100
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 131ms/step - loss: 1.0533 - sparse_categorical_accuracy: 0.6618 - val_loss: 1.4188 - val_sparse_categorical_accuracy: 0.6875 - learning_rate: 0.0010
Epoch 5/100
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 124ms/step - loss: 0.9872 - sparse

In [62]:
model = keras.models.load_model("best_model.keras")

test_loss, test_acc = model.evaluate(X_test, y_test)

print("Test accuracy", test_acc)
print("Test loss", test_loss)

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.5825 - sparse_categorical_accuracy: 0.9186  
Test accuracy 0.9090909361839294
Test loss 0.6243724822998047
