In [14]:
import matplotlib.pyplot as plt
import numpy as np
import os
from tqdm import tqdm
import tensorflow as tf
import pandas as pd
from pathlib import Path
from tensorflow.keras.utils import to_categorical
from tensorflow import keras  
from tensorflow.keras.models import load_model

In [15]:
actions = ["Doing other things", "No gesture", 'Rolling Hand Backward', 'Rolling Hand Forward', 'Shaking Hand', 
           'Sliding Two Fingers Down', 'Sliding Two Fingers Left', 'Sliding Two Fingers Right', 'Sliding Two Fingers Up',
            'Stop Sign', 'Swiping Down','Swiping Left', 'Swiping Right', 'Swiping Up',
            'Thumb Down', 'Thumb Up',
            'Turning Hand Clockwise', 'Turning Hand Counterclockwise'
            ]

In [16]:
df = pd.read_csv(r"C:\Users\Admin\Documents\2024.2\CV\Project\Validation.csv")
df.drop(columns = ['format', 'shape', 'frames'], axis = 1, inplace = True)

In [17]:
df_filtered = df[df['label'].isin(actions)]

In [18]:
df_filtered

Unnamed: 0,video_id,label,label_id
1,30,Swiping Down,15
2,68,Shaking Hand,9
3,77,Thumb Down,19
4,96,Swiping Right,17
7,177,Thumb Down,19
...,...,...,...
7042,147945,Shaking Hand,9
7043,147960,Sliding Two Fingers Left,11
7044,148032,Doing other things,0
7045,148046,Doing other things,0


In [19]:
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Concatenate, Conv2D, Dense, InputLayer, Activation, GlobalAveragePooling2D, BatchNormalization, ReLU, AveragePooling2D, Layer

In [20]:
class MSST_Layer(Layer):
    def __init__(self, stride, filter1, filter2, filter3, filter4, filter5, **kwargs):
        super(MSST_Layer, self).__init__(**kwargs)
        self.stride = stride
        self.filters = [filter1, filter2, filter3, filter4, filter5]
        self.concat = Concatenate()
        
        # Tạo các nhánh
        self.branch1 = self._make_branch([(1, 1)], filter1, stride)
        self.branch2 = self._make_branch([(1, 1), (3, 3)], [filter1, filter2], stride)
        self.branch3 = self._make_branch([(1, 1), (5, 1), (1, 5)], [filter3]*3, stride)
        self.branch4 = self._make_branch([(1, 1), (7, 1), (1, 7)], [filter4]*3, stride)
        self.branch5 = self._make_branch([(1, 1), (11, 1), (1, 11)], [filter5]*3, stride)

    def _make_branch(self, kernel_sizes, filters, first_stride):
        layers = []
        if isinstance(filters, int):
            filters = [filters] * len(kernel_sizes)
        for i, (kernel, f) in enumerate(zip(kernel_sizes, filters)):
            stride = first_stride if i == 0 else (1, 1)
            layers.append(Conv2D(f, kernel_size=kernel, strides=stride, padding='same'))
            layers.append(BatchNormalization())
            layers.append(ReLU())
        return layers

    def _apply_branch(self, inputs, branch_layers, training):
        x = inputs
        for layer in branch_layers:
            if isinstance(layer, BatchNormalization):
                x = layer(x, training=training)
            else:
                x = layer(x)
        return x

    def call(self, inputs, training=None):
        b1 = self._apply_branch(inputs, self.branch1, training)
        b2 = self._apply_branch(inputs, self.branch2, training)
        b3 = self._apply_branch(inputs, self.branch3, training)
        b4 = self._apply_branch(inputs, self.branch4, training)
        b5 = self._apply_branch(inputs, self.branch5, training)
        return self.concat([b1, b2, b3, b4, b5])

    def get_config(self):
        config = super(MSST_Layer, self).get_config()
        keys = ['filter1', 'filter2', 'filter3', 'filter4', 'filter5']
        config.update({'stride': self.stride, **{k: v for k, v in zip(keys, self.filters)}})
        return config

In [21]:
POSE_CONNECTIONS = frozenset([(11, 12), (11, 13), (13, 15), (12, 14), (14, 16)])
    
# Mapping from pose landmark IDs to file landmark IDs
poseid2fileid = {12: 0, 14: 1, 16: 2, 11: 3, 13: 4, 15: 5}

# Create file connections based on mapped pose IDs
POSE_CONNECTIONS_FILE_INDEX = [
    (poseid2fileid[a], poseid2fileid[b])
    for (a, b) in POSE_CONNECTIONS
]

"Connection" 
HAND_PALM_CONNECTIONS = ((0, 1), (0, 5), (9, 13), (13, 17), (5, 9), (0, 17))

HAND_THUMB_CONNECTIONS = ((1, 2), (2, 3), (3, 4))

HAND_INDEX_FINGER_CONNECTIONS = ((5, 6), (6, 7), (7, 8))

HAND_MIDDLE_FINGER_CONNECTIONS = ((9, 10), (10, 11), (11, 12))

HAND_RING_FINGER_CONNECTIONS = ((13, 14), (14, 15), (15, 16))

HAND_PINKY_FINGER_CONNECTIONS = ((17, 18), (18, 19), (19, 20))

HAND_CONNECTIONS = frozenset().union(*[
    HAND_PALM_CONNECTIONS, HAND_THUMB_CONNECTIONS,
    HAND_INDEX_FINGER_CONNECTIONS, HAND_MIDDLE_FINGER_CONNECTIONS,
    HAND_RING_FINGER_CONNECTIONS, HAND_PINKY_FINGER_CONNECTIONS
])

handid2fileid = {a: a + 6 for a in range(21)}

#Mapping from joint_id to index_id of array

LEFT_HAND_CONNECTIONS_FILE_INDEX = [
    (handid2fileid[a], handid2fileid[b])
    for (a, b) in HAND_CONNECTIONS
]

RIGHT_HAND_CONNECTIONS_FILE_INDEX = [
    (handid2fileid[a] + 21, handid2fileid[b] + 21)
    for (a, b) in HAND_CONNECTIONS
]

In [22]:
def cal_sequence(sequence):
    """
    Input: <List> of keypoints from 37 frames of video -> Shape (37, 48, 3)
    """
    
    sequence1 = np.expand_dims(sequence, axis = 0) # Shape: (1, sequence_length, 48, 3)
    sequence2 = sequence[1:] -sequence[:-1] # Shape: (sequence_length - 1, 48, 3)
    # 3. Bone stream
    sequence3 = []

    def retrieve_bone_vector(x):
        return np.stack([sequence[:, b, :] - sequence[:, a, :] for (a, b) in x], axis=1)
    
    bone_pose = retrieve_bone_vector(POSE_CONNECTIONS_FILE_INDEX)
    bone_left = retrieve_bone_vector(LEFT_HAND_CONNECTIONS_FILE_INDEX)
    bone_right = retrieve_bone_vector(RIGHT_HAND_CONNECTIONS_FILE_INDEX)    
    sequence3 = np.concatenate([bone_pose, bone_left, bone_right], axis=1) # Shape: (sequence_length, 47, 3)

    # 4. Bone motion stream
    sequence4 = sequence3[1:] - sequence3[:-1] # Shape: (sequence_length - 1, 47, 3)

    return sequence1, np.expand_dims(sequence2, axis = 0), np.expand_dims(sequence3, axis = 0), np.expand_dims(sequence4, axis = 0)

In [23]:
model_path1 = r"C:\Users\Admin\Documents\2024.2\CV\Project\test\Hand-Tracking-Computer-Control\best_model\joint_stream.keras"
model_path2 = r"C:\Users\Admin\Documents\2024.2\CV\Project\test\Hand-Tracking-Computer-Control\best_model\joint_motion_stream.keras"
model_path3 = r"C:\Users\Admin\Documents\2024.2\CV\Project\test\Hand-Tracking-Computer-Control\best_model\bone_stream.keras"
model_path4 = r"C:\Users\Admin\Documents\2024.2\CV\Project\test\Hand-Tracking-Computer-Control\best_model\bone_motion_stream.keras"

In [24]:
models = {
    "joint_stream": load_model(model_path1, custom_objects={'MSST_Layer': MSST_Layer}),
    "joint_motion_stream": load_model(model_path2, custom_objects={'MSST_Layer': MSST_Layer}),
    "bone_stream": load_model(model_path3, custom_objects={'MSST_Layer': MSST_Layer}),
    "bone_motion_stream": load_model(model_path4, custom_objects={'MSST_Layer': MSST_Layer}),
}



In [25]:
test_case = r"C:\Users\Admin\Documents\2024.2\CV\Project\joint_stream\val\00030.npy"
test_case = np.load(test_case)
test_case  = np.expand_dims(test_case, axis=0)
print(test_case.shape)
model = models["joint_stream"]
pred = model.predict(test_case)
print(pred.shape)
print(actions[np.argmax(pred)])

(1, 37, 48, 3)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step
(1, 18)
Swiping Down


In [26]:
action2index = {a: i for i, a in enumerate(actions)}
one_hot_labels = to_categorical(range(len(actions)), num_classes=len(actions))

In [27]:
def predict_sequence(sequence):
    seqs = cal_sequence(sequence)
    preds = np.array([model.predict(s)[0] for model, s in zip(models.values(), seqs)]) #Softmax [0.1, 0.2,...]
    ensemble_pred = np.mean(preds, axis=0) #Softmax (ensemble)
    return preds + [ensemble_pred] 

In [28]:
# Prepare data holders
y_true, y_true_encoded = [], []
predictions = {k: [] for k in models.keys()}
predictions["ensemble"] = []
scores = {k: [] for k in models.keys()}
scores["ensemble"] = [] 

In [None]:
val_path = r"C:\Users\Admin\Documents\2024.2\CV\Project\joint_stream\val"

for _, row in tqdm(df_filtered.iterrows()):
    video_file = os.path.join(val_path, f"{row['video_id']:05d}.npy")
    if not os.path.exists(video_file):
        continue

    sequence = np.load(video_file)
    preds = predict_sequence(sequence)
    
    y_true.append(row['label']) # (4780,)
    y_true_encoded.append(one_hot_labels[action2index[row['label']]]) # (4780, 18)

    for key, pred in zip(list(models.keys()) + ["ensemble"], preds):
        pred_label = actions[np.argmax(pred)]
        predictions[key].append(pred_label) # (5, 4780)
        scores[key].append(pred) # (5, 4780, )

In [103]:
# scores (5, 4780, 18)
# predictions (5, 4780, 1)
model_keys = list(scores.keys())[:4]  # ['model_1', 'model_2', 'model_3', 'model_4']

arrays = [np.array(scores[k]) for k in model_keys]  # mỗi cái (4780, 18)

stacked = np.stack(arrays, axis=0)  

avg_softmax = np.mean(stacked, axis=0)

scores['ensemble'] = avg_softmax # 4780, 18
predictions['ensemble'] = np.argmax(scores['ensemble'], axis = 1)
predictions['ensemble'] = [actions[i] for i in predictions['ensemble']]

In [30]:
from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score, f1_score

In [104]:
def compute_metrics(y_true, y_pred, y_true_enc, y_scores):
    metrics = {}
    for key in y_pred.keys():
        
        metrics[key] = {
            "accuracy": accuracy_score(y_true, y_pred[key]),
            "precision": precision_score(y_true, y_pred[key], average='weighted'),
            "recall": recall_score(y_true, y_pred[key], average='weighted'),
            "f1": f1_score(y_true, y_pred[key], average='weighted')
        }

        if len(y_true_enc) > 0 and len(y_scores[key]) > 0:
            try:
                auc = roc_auc_score(np.array(y_true_enc), np.array(y_scores[key]), average='weighted', multi_class='ovr')
                metrics[key]["auc"] = auc
            except ValueError as e:
                print(f"AUC error for {key}: {e}")
                metrics[key]["auc"] = np.nan
        else:
            print(f"AUC skipped for {key} due to empty inputs.")
            metrics[key]["auc"] = np.nan

    return metrics

In [105]:
metrics = compute_metrics(y_true, predictions, y_true_encoded, scores)

In [106]:
table_data = []
for model_name, metric in metrics.items():
    table_data.append([
        model_name.replace('_', ' ').title(),
        metric['accuracy'],
        metric['precision'],
        metric['recall'],
        metric['f1'],
        metric['auc']
    ])

df_results = pd.DataFrame(table_data, columns=[
    "Model", "Accuracy", "Precision", "Recall", "F1 Score", "AUC"
])

df_results[["Accuracy", "Precision", "Recall", "F1 Score", "AUC"]] = df_results[
    ["Accuracy", "Precision", "Recall", "F1 Score", "AUC"]
].map(lambda x: f"{x:.4f}")


print(df_results.to_string(index=False))

              Model Accuracy Precision Recall F1 Score    AUC
       Joint Stream   0.8906    0.8920 0.8906   0.8904 0.9952
Joint Motion Stream   0.8877    0.8910 0.8877   0.8879 0.9952
        Bone Stream   0.8906    0.8913 0.8906   0.8904 0.9952
 Bone Motion Stream   0.8611    0.8633 0.8611   0.8606 0.9947
           Ensemble   0.9044    0.9062 0.9044   0.9043 0.9954
