# Sitting Posture Classification

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

from dvclive import Live
from dvclive.keras import DVCLiveCallback

import polars as pl
from sklearn.model_selection import train_test_split

from sklearn.metrics import classification_report, confusion_matrix, recall_score, precision_score, f1_score, roc_auc_score
import matplotlib.pyplot as plt
import scikitplot as skplt
import seaborn as sns

from data import BodyPart

In [2]:
df = pl.read_csv('data/data.csv')
df.head()

file_name,NOSE_x,NOSE_y,NOSE_score,LEFT_EYE_x,LEFT_EYE_y,LEFT_EYE_score,RIGHT_EYE_x,RIGHT_EYE_y,RIGHT_EYE_score,LEFT_EAR_x,LEFT_EAR_y,LEFT_EAR_score,RIGHT_EAR_x,RIGHT_EAR_y,RIGHT_EAR_score,LEFT_SHOULDER_x,LEFT_SHOULDER_y,LEFT_SHOULDER_score,RIGHT_SHOULDER_x,RIGHT_SHOULDER_y,RIGHT_SHOULDER_score,LEFT_ELBOW_x,LEFT_ELBOW_y,LEFT_ELBOW_score,RIGHT_ELBOW_x,RIGHT_ELBOW_y,RIGHT_ELBOW_score,LEFT_WRIST_x,LEFT_WRIST_y,LEFT_WRIST_score,RIGHT_WRIST_x,RIGHT_WRIST_y,RIGHT_WRIST_score,LEFT_HIP_x,LEFT_HIP_y,LEFT_HIP_score,RIGHT_HIP_x,RIGHT_HIP_y,RIGHT_HIP_score,class_no,class_name
str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,str
"""ergonomis\DSC02412.JPG""",2673.0,1426.0,0.6860288,2742.0,1352.0,0.5782869,2587.0,1360.0,0.6361119,2851.0,1428.0,0.763622,2451.0,1449.0,0.709723,3015.0,1902.0,0.711977,2257.0,1916.0,0.7672756,3173.0,2415.0,0.658837,2157.0,2475.0,0.703141,3157.0,2557.0,0.440902,2205.0,2631.0,0.334904,2893.0,2918.0,0.447213,2432.0,2901.0,0.5723911,0,"""ergonomis"""
"""ergonomis\DSC02414.JPG""",2553.0,1548.0,0.6246008,2645.0,1465.0,0.405759,2488.0,1457.0,0.49856,2805.0,1512.0,0.657731,2427.0,1489.0,0.566506,2995.0,1900.0,0.6918683,2262.0,1900.0,0.6520261,3030.0,2511.0,0.793446,1991.0,2432.0,0.5237641,2623.0,2633.0,0.7455008,1606.0,2570.0,0.641524,2749.0,3070.0,0.390012,2271.0,3009.0,0.5088232,0,"""ergonomis"""
"""ergonomis\DSC02416.JPG""",2811.0,1522.0,0.4514594,2869.0,1428.0,0.7665354,2712.0,1434.0,0.6812546,2896.0,1464.0,0.366023,2503.0,1490.0,0.674537,3044.0,1925.0,0.79644,2277.0,1934.0,0.826779,3267.0,2465.0,0.514814,2165.0,2508.0,0.670309,3631.0,2616.0,0.7749762,2415.0,2661.0,0.381569,2920.0,3101.0,0.5792116,2453.0,3108.0,0.5448222,0,"""ergonomis"""
"""ergonomis\DSC02418.JPG""",2654.0,1438.0,0.5282986,2731.0,1362.0,0.493264,2566.0,1370.0,0.449793,2842.0,1430.0,0.749468,2449.0,1465.0,0.608965,3016.0,1913.0,0.7614244,2282.0,1911.0,0.7253464,3171.0,2504.0,0.6839093,2126.0,2538.0,0.5690689,3031.0,2791.0,0.298675,2337.0,2777.0,0.193512,2888.0,2989.0,0.543253,2411.0,2988.0,0.5330165,0,"""ergonomis"""
"""ergonomis\DSC02416.JPG""",2557.0,1550.0,0.6233348,2648.0,1460.0,0.4383338,2494.0,1458.0,0.6176586,2841.0,1497.0,0.674965,2466.0,1486.0,0.5448318,3010.0,1893.0,0.776701,2278.0,1862.0,0.7632377,3072.0,2539.0,0.7402708,1983.0,2402.0,0.5696551,2737.0,2744.0,0.286435,1663.0,2656.0,0.610937,2741.0,2973.0,0.5816682,2270.0,2919.0,0.606949,0,"""ergonomis"""


In [3]:
df_to_process = df.clone()
classes = df_to_process.select('class_name').unique().to_numpy()
y = tf.keras.utils.to_categorical(df_to_process.select('class_no').to_numpy())
X = df_to_process.drop(['file_name', 'class_name', 'class_no'])

In [4]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=0.5, random_state=42)

In [5]:
def get_center_point(landmarks, left_bodypart, right_bodypart):
    """Calculates the center point of the two given landmarks."""

    left = tf.gather(landmarks, left_bodypart.value, axis=1)
    right = tf.gather(landmarks, right_bodypart.value, axis=1)
    center = left * 0.5 + right * 0.5
    return center


def get_pose_size(landmarks, torso_size_multiplier=2.5):
    """Calculates pose size.
  
    It is the maximum of two values:
      * Torso size multiplied by `torso_size_multiplier`
      * Maximum distance from pose center to any pose landmark
    """
    # Hips center
    hips_center = get_center_point(landmarks, BodyPart.LEFT_HIP,
                                   BodyPart.RIGHT_HIP)

    # Shoulders center
    shoulders_center = get_center_point(landmarks, BodyPart.LEFT_SHOULDER,
                                        BodyPart.RIGHT_SHOULDER)

    # Torso size as the minimum body size
    torso_size = tf.linalg.norm(shoulders_center - hips_center)

    # Pose center
    pose_center_new = get_center_point(landmarks, BodyPart.LEFT_HIP,
                                       BodyPart.RIGHT_HIP)
    pose_center_new = tf.expand_dims(pose_center_new, axis=1)
    # Broadcast the pose center to the same size as the landmark vector to
    # perform substraction
    pose_center_new = tf.broadcast_to(pose_center_new,
                                      [tf.size(landmarks) // (13*2), 13, 2])

    # Dist to pose center
    d = tf.gather(landmarks - pose_center_new, 0, axis=0,
                  name="dist_to_pose_center")
    # Max dist to pose center
    max_dist = tf.reduce_max(tf.linalg.norm(d, axis=0))

    # Normalize scale
    pose_size = tf.maximum(torso_size * torso_size_multiplier, max_dist)

    return pose_size


def normalize_pose_landmarks(landmarks):
    """Normalizes the landmarks translation by moving the pose center to (0,0) and
    scaling it to a constant pose size.
    """
    # Move landmarks so that the pose center becomes (0,0)
    pose_center = get_center_point(landmarks, BodyPart.LEFT_HIP,
                                   BodyPart.RIGHT_HIP)
    pose_center = tf.expand_dims(pose_center, axis=1)
    # Broadcast the pose center to the same size as the landmark vector to perform
    # substraction
    pose_center = tf.broadcast_to(pose_center,
                                  [tf.size(landmarks) // (13*2), 13, 2])
    landmarks = landmarks - pose_center

    # Scale the landmarks to a constant pose size
    pose_size = get_pose_size(landmarks)
    landmarks /= pose_size

    return landmarks


def landmarks_to_embedding(landmarks_and_scores):
    """Converts the input landmarks into a pose embedding."""
    # Reshape the flat input into a matrix with shape=(17, 3)
    reshaped_inputs = layers.Reshape((13, 3))(landmarks_and_scores)

    # Normalize landmarks 2D
    landmarks = normalize_pose_landmarks(reshaped_inputs[:, :, :2])

    # Flatten the normalized landmark coordinates into a vector
    return layers.Flatten()(landmarks)


In [6]:
inputs = tf.keras.Input(shape=39)
embedding = landmarks_to_embedding(inputs)

layer = layers.Dense(128, activation='relu')(embedding)
layer = layers.Dropout(0.5)(layer)
layer = layers.Dense(64, activation='relu')(layer)
layer = layers.Dropout(0.5)(layer)
outputs = layers.Dense(2, activation='softmax')(layer)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 39)]         0           []                               
                                                                                                  
 reshape (Reshape)              (None, 13, 3)        0           ['input_1[0][0]']                
                                                                                                  
 tf.__operators__.getitem (Slic  (None, 13, 2)       0           ['reshape[0][0]']                
 ingOpLambda)                                                                                     
                                                                                                  
 tf.compat.v1.gather (TFOpLambd  (None, 2)           0           ['tf.__operators__.getitem[0]

In [7]:
def log_confusion_matrix(cm, title=None, cmap='Blues', class_names=classes):
    """Plots the confusion matrix."""
    plt.rcParams["font.family"] = "serif"
    plt.rcParams["font.size"] = 10
    fig, ax = plt.subplots(figsize=(7, 6), dpi=300)
    sns.heatmap(cm, annot=True, cmap=cmap,
                ax=ax, annot_kws={"fontsize": 11},
                xticklabels=class_names, yticklabels=class_names)
    ax.set_xlabel('Predicted Class')
    ax.set_ylabel('Actual Class')
    if title:
        ax.set_title(title)
    # log the confusion matrix
    live.log_image('confusion_matrix.png', fig)
    plt.show()
    plt.close(fig)
    
def log_roc_auc_curve(y_true, y_pred_proba):
    fig, ax = plt.subplots(figsize=(7, 6), dpi=300)
    skplt.metrics.plot_roc(y_true, y_pred_proba, ax=ax)
    live.log_image('roc_auc_curve.png', fig)
    plt.show()
    plt.close(fig)

In [8]:
tf.keras.backend.clear_session()

In [9]:
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Add a checkpoint callback to store the checkpoint that has the highest
# validation accuracy.
checkpoint_path = "dumps/weights.best.hdf5"
checkpoint = tf.keras.callbacks.ModelCheckpoint(checkpoint_path,
                                             monitor='val_accuracy',
                                             verbose=1,
                                             save_best_only=True,
                                             mode='max')
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy',
                                              patience=20)

# Start training
with Live(report='html') as live:
    history = model.fit(X_train.to_numpy(), y_train,
                        epochs=200,
                        batch_size=16,
                        validation_data=(X_val.to_numpy(), y_val),
                        callbacks=[checkpoint, earlystopping, DVCLiveCallback(live=live)])
    model.save('dumps/mymodel.keras')
    live.log_artifact('dumps/mymodel.keras', type='model')
    test_loss, test_accuracy = model.evaluate(X_test.to_numpy(), y_test)

    # Classify pose in the TEST dataset using the trained model
    y_pred_proba = model.predict(X_test.to_numpy())
    y_pred = np.argmax(y_pred_proba, axis=1)
    y_test_ravel = np.argmax(y_test, axis=1)
    
    # log test metric
    live.log_metric('test_loss', test_loss)
    live.log_metric('test_accuracy', test_accuracy)
    live.log_metric('test_recall', recall_score(y_test_ravel, y_pred))
    live.log_metric('test_precision', precision_score(y_test_ravel, y_pred))
    live.log_metric('test_f1', f1_score(y_test_ravel, y_pred))
    live.log_metric('test_roc_auc', roc_auc_score(y_test, y_pred_proba))

    # Convert the prediction result to class name
    y_pred_label = [classes[i][0] for i in y_pred]
    y_true_label = [classes[i][0] for i in y_test_ravel]

    # Plot the confusion matrix
    cm = confusion_matrix(y_test_ravel, y_pred)
    log_confusion_matrix(cm, class_names=['ergonomic', 'non-ergonomic'])
    log_roc_auc_curve(y_test_ravel, y_pred_proba)

# Print the classification report
print('\nClassification Report:\n', classification_report(y_true_label,
                                                          y_pred_label))

Epoch 1/200
Epoch 1: val_accuracy improved from -inf to 0.69032, saving model to dumps\weights.best.hdf5
Epoch 2/200
Epoch 2: val_accuracy improved from 0.69032 to 0.87097, saving model to dumps\weights.best.hdf5
Epoch 3/200
Epoch 3: val_accuracy improved from 0.87097 to 0.87742, saving model to dumps\weights.best.hdf5
Epoch 4/200
Epoch 4: val_accuracy improved from 0.87742 to 0.88387, saving model to dumps\weights.best.hdf5
Epoch 5/200
Epoch 5: val_accuracy improved from 0.88387 to 0.90323, saving model to dumps\weights.best.hdf5
Epoch 6/200
Epoch 6: val_accuracy did not improve from 0.90323
Epoch 7/200
Epoch 7: val_accuracy improved from 0.90323 to 0.90968, saving model to dumps\weights.best.hdf5
Epoch 8/200
Epoch 8: val_accuracy did not improve from 0.90968
Epoch 9/200
Classification Report:
                precision    recall  f1-score   support

    ergonomis       0.93      0.76      0.84        92
non-ergonomis       0.72      0.92      0.81        63

     accuracy             