# Prerequisite

In [None]:
# Install ViT-Pose Framework
!git clone https://github.com/JunkyByte/easy_ViTPose.git
!cd easy_ViTPose/ && pip install -r requirements.txt && pip install -e . 
# Install ViT-Posepretrain weight
!pip install huggingface_hub 
!pip install gdown
!pip install vidaug

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import shutil
import os
import cv2
import numpy as np

2024-07-25 05:04:07.912705: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-25 05:04:07.912804: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-25 05:04:08.066357: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
# Reset the plotly graph
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot

# Initialize plotly notebook mode
init_notebook_mode(connected=True)

# Load dataset

In [3]:
import os
import pandas as pd
data_list = []

for root, dirs, files in os.walk('/kaggle/input/new-data/kaggle/working/new_data'): #Change: file path
    print(f"Currently in directory: {root}")
    label = root[47:] #Change base on len
    for file in files:
      video_path = os.path.join(root, file)
      data_list.append((video_path,label))

import pandas as pd
df_action = pd.DataFrame(data_list, columns=['file_path', 'label'])
df_action = df_action.sample(frac=1, random_state=42).reset_index(drop=True)

Currently in directory: /kaggle/input/new-data/kaggle/working/new_data
Currently in directory: /kaggle/input/new-data/kaggle/working/new_data/Normal
Currently in directory: /kaggle/input/new-data/kaggle/working/new_data/Abnormal


In [5]:
df_action_fall = df_action[(df_action['label'] == 'Abnormal')]
df_action_walk = df_action[(df_action['label'] == 'Normal')]
df_action_walk = df_action_walk.sample(frac=1).reset_index(drop=True)
df_action_fall = df_action_fall.sample(frac=1).reset_index(drop=True)

In [7]:
df_action = pd.concat([df_action_fall,df_action_walk],axis=0)
df_action = df_action.sample(frac=1, random_state=42).reset_index(drop=True)

In [None]:
from sklearn.model_selection import train_test_split

# split with sklearn
df_train, df_temp = train_test_split(df_action, test_size=0.2, shuffle=True, random_state=42)
df_val, df_test = train_test_split(df_temp, test_size=0.5, shuffle=True, random_state=42)


# Setting up VitPose

In [9]:
#@title Choose model and run this cell

MODEL_SIZE = 'b'  #@param ['s', 'b', 'l', 'h']
YOLO_SIZE = 's'  #@param ['s', 'n']
DATASET = 'coco'  #@param ['coco_25', 'coco', 'wholebody', 'mpii', 'aic', 'ap10k', 'apt36k']
ext = '.pth'
ext_yolo = '.pt'

In [10]:
import os
from huggingface_hub import hf_hub_download
MODEL_TYPE = "torch"
YOLO_TYPE = "torch"
REPO_ID = 'JunkyByte/easy_ViTPose'
FILENAME = os.path.join(MODEL_TYPE, f'{DATASET}/vitpose-' + MODEL_SIZE + f'-{DATASET}') + ext
FILENAME_YOLO = 'yolov8/yolov8' + YOLO_SIZE + ext_yolo

print(f'Downloading model {REPO_ID}/{FILENAME}')
model_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME) # ViT-Pose - Human Pose
yolo_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME_YOLO) # YOLO - Bouding box

Downloading model JunkyByte/easy_ViTPose/torch/coco/vitpose-b-coco.pth


In [11]:
# Load model
# If you get an error from PIL restart environment and rerun this cell to update packages version
from easy_ViTPose import VitInference
model_pose2 = VitInference(model_path, yolo_path, MODEL_SIZE,
                     dataset=DATASET, yolo_size=320, is_video=False)

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

In [12]:
def load_all_frames(video_path):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        return {'frames':None,'frames_dim':None,'success':False}
    frames_dims = []
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        h,w,c = frame.shape
        frames_dims.append(list([0,h,w,c]))
        # frame = cv2.resize(frame, (128, 128), interpolation=cv2.INTER_CUBIC)
        frames.append(frame)

    cap.release()
    return {'frames':np.asarray(frames),'frames_dim':frames_dims,'success':True}

# Vit-Pose (Pose Estimation)

In [15]:
# Extract pose
# Trim video to a given frames (40 frames)
def trim_video_frames(video,max_frame):
    f,_,_,_ = video.shape
    startf = f//2 - max_frame//2
    return video[startf:startf+max_frame, :, :, :]

def prepare_data(df):
    videos_path = df['file_path'].values.tolist()
    labels = df['label'].to_list()
    pos_frames = [] # keeped all the video's pose
    updated_label = []
    for video_path,label in zip(videos_path,labels):
        # check frames
        load_data = load_all_frames(video_path)
        if load_data['success']==False:
            continue
        new_video = trim_video_frames(load_data['frames'],40)
        frames=[]
        count_success_frame=0 # Triggered unconsistent issues
        for frame in new_video:
            
            # Extract keypoint using Vit-Pose
            frame_keypoints = model_pose2.inference(frame)
            if 0 in frame_keypoints: # Validate item in frame_keypoints
                frames.append(frame_keypoints[0][:,:2])
                count_success_frame+=1

        # Make sure each frame had contain human pose
        if count_success_frame==40:
            updated_label.append(label)
            pos_frames.append(frames)
        
    return pos_frames, updated_label

train_data, train_labels = prepare_data(df_train)
test_data, test_labels = prepare_data(df_test)
val_data, val_labels = prepare_data(df_val)

In [16]:
# Reformat labels

train_labels = [1 if label == "Abnormal" else 0 for label in train_labels]
test_labels = [1 if label == "Abnormal" else 0 for label in test_labels]
val_labels = [1 if label == "Abnormal" else 0 for label in val_labels]


In [17]:
# convert input to tensor
X_train = tf.convert_to_tensor(train_data,dtype=tf.float32)
y_train = tf.convert_to_tensor(train_labels,dtype=tf.float32)

X_val = tf.convert_to_tensor(val_data,dtype=tf.float32)
y_val = tf.convert_to_tensor(val_labels,dtype=tf.float32)

X_test = tf.convert_to_tensor(test_data,dtype=tf.float32)
y_test = tf.convert_to_tensor(test_labels,dtype=tf.float32)

# GRU (After Tuned)

In [144]:
# clear backend
keras.backend.clear_session()

In [145]:
from tensorflow.keras import layers, models

model = models.Sequential()
model.add(layers.Input(shape=(40, 17, 2)))
model.add(layers.Reshape((40, 34))) 
model.add(layers.GRU(16, return_sequences=True))  
model.add(layers.GRU(32, return_sequences=True)) 
model.add(layers.GRU(64))

model.add(layers.Dropout(0.5))
# Dense output layer with 2 units and softmax activation
model.add(layers.Dense(2, activation='softmax'))

print(model.summary())


None


In [146]:
optimizer = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    loss=keras.losses.SparseCategoricalCrossentropy(),
    optimizer=optimizer,
    metrics=["accuracy"],
)	

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# apply early stoping
early_stopping = EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True)
history = model.fit(
    X_train, y_train, validation_data=(X_val, y_val), batch_size=64, epochs=100,
    callback=[early_stopping]
)

## Loss Valus

In [108]:
import plotly.graph_objs as go
import plotly.subplots as sp

# Sample data (replace these with your actual loss values)
loss2 = history.history['loss']
val_loss = history.history['val_loss']

epochs = list(range(1, len(loss2) + 1))

# Create subplots
fig = sp.make_subplots(rows=1, cols=2, subplot_titles=('Accuracy', 'Loss'))

# Remove accuracy subplot by creating only one subplot for loss
fig = sp.make_subplots(rows=1, cols=1, subplot_titles=('Loss',))

# Add traces for loss
fig.add_trace(
    go.Scatter(x=epochs, y=loss2, mode='lines', name='Train Loss', line=dict(color='blue')),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=epochs, y=val_loss, mode='lines', name='Test Loss', line=dict(color='red')),
    row=1, col=1
)

# Update layout
fig.update_layout(
    title_text='VP-GRU Training and Validation Loss',
    showlegend=True,
    xaxis_title='Epoch',
    yaxis_title='Value'
)

# Update xaxis and yaxis titles for the loss subplot
fig.update_xaxes(title_text='Epoch', row=1, col=1)
fig.update_yaxes(title_text='Loss', dtick=0.1, row=1, col=1)  # Change dtick to desired step size for Loss

# Show figure
fig.show()

## Classification report

In [121]:
from sklearn.metrics import classification_report
def calculate_metrics(model, X_test, y_test):
    # Get the true labels and predictions
    y_true = []
    y_pred = []
    label_dict = {0: 'Normal', 1: 'Abnormal'}

    for frame, label in zip(X_test,y_test):
        pred = model.predict(tf.expand_dims(frame, axis=0))[0]
        y_true.append(label)
        y_pred.append(np.argmax(pred))

    # Calculate metrics
    report = classification_report(y_true, y_pred, target_names=label_dict.values(), output_dict=True)
    
    accuracy = report['accuracy']
    precision = report['weighted avg']['precision']
    recall = report['weighted avg']['recall']
    f1_score = report['weighted avg']['f1-score']

    return accuracy, precision, recall, f1_score


accuracy, precision, recall, f1_score = calculate_metrics(model,X_test,y_test)
print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'f1_score{f1_score}')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18

## Confusion matrix


In [171]:
# Create Plotly heatmap for confusion matrix
fig = go.Figure(data=go.Heatmap(
                   z=conf_matrix,
                   x=['0', '1'],
                   y=['0', '1'],
                   colorscale=[[0, 'rgb(173, 216, 230)'], [1, 'rgb(0, 0, 255)']],  
                   showscale=True,  
                   colorbar=dict(
                       titleside='right'
                   )
               ))

# Add annotations
annotations = []
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        annotations.append(
            dict(
                text=str(conf_matrix[i][j]),
                x=j,
                y=i,
                xref='x1',
                yref='y1',
                showarrow=False,
                font=dict(
                    color="white" if conf_matrix[i][j] > conf_matrix.max() / 2 else "black"
                )
            )
        )

fig.update_layout(
    title='Confusion Matrix - Test Set',
    xaxis=dict(title='Predicted Label'),
    yaxis=dict(title='Actual Label'),
    annotations=annotations
)

# Show the Plotly figure
fig.show()


# GRU (Before Tuned)

In [None]:
# clear backend
keras.backend.clear_session()
from tensorflow.keras import layers, models

model = models.Sequential()
model.add(layers.Input(shape=(40, 17, 2)))
model.add(layers.Reshape((40, 34))) 
model.add(layers.GRU(16, return_sequences=True))  
model.add(layers.GRU(32, return_sequences=True)) 
model.add(layers.GRU(64))

model.add(layers.Dropout(0.4))
# Dense output layer with 2 units and softmax activation
model.add(layers.Dense(2, activation='softmax'))

In [None]:
optimizer = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    loss=keras.losses.SparseCategoricalCrossentropy(),
    optimizer=optimizer,
    metrics=["accuracy"],
)	

In [None]:
history = model.fit(
    X_train, y_train, validation_data=(X_val, y_val), batch_size=64, epochs=100
)

## Loss Values

In [76]:
import plotly.graph_objs as go
import plotly.subplots as sp

# Sample data (replace these with your actual loss values)
loss2 = history.history['loss']
val_loss = history.history['val_loss']

epochs = list(range(1, len(loss2) + 1))

# Create subplots
fig = sp.make_subplots(rows=1, cols=2, subplot_titles=('Accuracy', 'Loss'))

# Remove accuracy subplot by creating only one subplot for loss
fig = sp.make_subplots(rows=1, cols=1, subplot_titles=('Loss',))

# Add traces for loss
fig.add_trace(
    go.Scatter(x=epochs, y=loss2, mode='lines', name='Train Loss', line=dict(color='blue')),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=epochs, y=val_loss, mode='lines', name='Test Loss', line=dict(color='red')),
    row=1, col=1
)

# Update layout
fig.update_layout(
    title_text='VP-GRU Training and Validation Loss',
    showlegend=True,
    xaxis_title='Epoch',
    yaxis_title='Value'
)

# Update xaxis and yaxis titles for the loss subplot
fig.update_xaxes(title_text='Epoch', row=1, col=1)
fig.update_yaxes(title_text='Loss', dtick=0.1, row=1, col=1)  # Change dtick to desired step size for Loss

# Show figure
fig.show()

# Hyperparameter tuning

NOTES:
- In this section only GRU will be invovled in this tuning process

In [128]:
from tensorflow.keras import layers, models
import keras_tuner as kt

def build_model(hp):
    keras.backend.clear_session()
    model = models.Sequential()
    model.add(layers.Input(shape=(40, 17, 2)))
    model.add(layers.Reshape((40, 34))) 
    model.add(layers.GRU(16, return_sequences=True))  
    model.add(layers.GRU(32, return_sequences=True)) 
    model.add(layers.GRU(64))

    # Setup params for hyper tune [learning_rate, dropout]
    DROPOUT_RATE = hp.Float('dropout_rate', min_value=0.2, max_value=0.8, step=0.1) #NOTE: Total 7
    LEARNING_RATE = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4, 1e-5]) #NOTE: total 4
    
    model.add(layers.Dropout(DROPOUT_RATE))
    model.add(layers.Dense(2, activation='softmax'))

    optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(
        loss=keras.losses.SparseCategoricalCrossentropy(),
        optimizer=optimizer,
        metrics=["accuracy"],
    )
    return model

In [129]:
from tensorflow.keras import layers, models
import keras_tuner as kt

tuner = kt.GridSearch(
    hypermodel=build_model,
    objective="val_accuracy",
    max_trials=50,
    executions_per_trial=1,
    overwrite=True,
    directory='my_dir',
    project_name="VP-GRU_GridSearch",
)

In [126]:
tuner.search_space_summary()

Search space summary
Default search space size: 2
dropout_rate (Float)
{'default': 0.2, 'conditions': [], 'min_value': 0.2, 'max_value': 0.8, 'step': 0.1, 'sampling': 'linear'}
learning_rate (Choice)
{'default': 0.01, 'conditions': [], 'values': [0.01, 0.001, 0.0001, 1e-05], 'ordered': True}


In [167]:
tuner.search(X_train, y_train, epochs=40, validation_data=(X_val, y_val))
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""The hyperparameter grid serach is completed.The optimal parameter for
dropput is {round(best_hps.get('dropout_rate'),1)} and for the learning rate is {best_hps.get('learning_rate')}""")

Trial 28 Complete [00h 00m 10s]
val_accuracy: 0.800000011920929

Best val_accuracy So Far: 0.9090909090909091
Total elapsed time: 00h 15m 37s

The hyperparameter grid serach is completed.The optimal parameter for
dropput is 0.5 and for the learning rate is 1e-4


# Save model

In [None]:
model.save('VP-GRU_25July.keras')

# Demostration

## Setup VitPose

In [None]:
#@title Choose model and run this cell

MODEL_SIZE = 'b'  #@param ['s', 'b', 'l', 'h']
YOLO_SIZE = 's'  #@param ['s', 'n']
DATASET = 'coco'  #@param ['coco_25', 'coco', 'wholebody', 'mpii', 'aic', 'ap10k', 'apt36k']
ext = '.pth'
ext_yolo = '.pt'

import os
from huggingface_hub import hf_hub_download
MODEL_TYPE = "torch"
YOLO_TYPE = "torch"
REPO_ID = 'JunkyByte/easy_ViTPose'
FILENAME = os.path.join(MODEL_TYPE, f'{DATASET}/vitpose-' + MODEL_SIZE + f'-{DATASET}') + ext
FILENAME_YOLO = 'yolov8/yolov8' + YOLO_SIZE + ext_yolo

print(f'Downloading model {REPO_ID}/{FILENAME}')
model_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME) # ViT-Pose - Human Pose
yolo_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME_YOLO) # YOLO - Bouding box

from easy_ViTPose import VitInference
model_pose2 = VitInference(model_path, yolo_path, MODEL_SIZE,
                     dataset=DATASET, yolo_size=320, is_video=False)

def load_all_frames(video_path):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        return {'frames':None,'frames_dim':None,'success':False}
    frames_dims = []
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        h,w,c = frame.shape
        frames_dims.append(list([0,h,w,c]))
        # frame = cv2.resize(frame, (128, 128), interpolation=cv2.INTER_CUBIC)
        frames.append(frame)

    cap.release()
    return {'frames':np.asarray(frames),'frames_dim':frames_dims,'success':True}

## Preprocess frame

In [None]:
# Preprocess
# video_path = '/content/video_251.avi'
def load_all_frames(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return {'frames': None, 'frames_dim': None, 'success': False}

    frames_dims = []
    frames = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        h, w, c = frame.shape
        frames_dims.append([0, h, w, c])
        frame = cv2.resize(frame, (224, 224), interpolation=cv2.INTER_CUBIC)
        frames.append(frame)

    cap.release()
    return {'frames':np.asarray(frames),'frames_dim':frames_dims,'success':True}

def trim_video_frames(video,max_frame):
    f,_,_,_ = video.shape
    startf = f//2 - max_frame//2
    return video[startf:startf+max_frame, :, :, :]


def prepare_test(file_path):
    load_data = load_all_frames(file_path)
    trimmed_vid = trim_video_frames(load_data['frames'],40) # trim vid
    key_frames = []
    for frame in trimmed_vid:
        frame_keypoints = model_pose2.inference(frame)
        if 0 in frame_keypoints: # shoudl write this way to prevent 
            key_frames.append(frame_keypoints[0][:,:2])
    return key_frames

# Abnormal video_254_flip  Fall53_Cam3_cutup
# Normal video_317
file_path = '/kaggle/working/dataset_resized/train/Walking/video_317.avi' 
key_frame = prepare_test(file_path)
print(np.array(key_frame).shape) # number of frame detected by vitpose

## Predicting

In [None]:
# load model
prev_model = tf.keras.models.load_model('/kaggle/working/CNN-RNN_26Jul_1.keras')

In [None]:
# make pred
label_dict = {0:'Normal', 1:'Abnormal'} 
output = prev_model.predict([test[0],test[1]])[0]
print(output)
pred = np.argmax(output.tolist(),axis=0)
print(label_dict[pred])