In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow.keras as keras

import cv2
import numpy as np

In [2]:
# Ids of the videos to process. Each id N selects the label file
# data/labels/videoN.txt and the video file data/videos/videoN.mp4.
# The trailing comment records the alternative setting that covers ids 1-5.
video_ids = [2] # range(1, 6)
# After the 'OTH' rows are dropped, only every `frame_skip`-th label row
# (and therefore its frame) is kept.
frame_skip = 8

In [3]:
# Load the per-frame labels of every selected video.
#
# `labels` ends up as a list with one 2-D string array per video, in the same
# order as `video_ids`. Column 0 holds the frame number and column 1 the class
# name. The per-video arrays are deliberately NOT stacked into one ndarray:
# videos generally have different numbers of labelled frames, and NumPy cannot
# build a regular array from such ragged input.
labels = []
for video_id in video_ids:
    with open(f'data/labels/video{video_id}.txt') as f:
        # Whitespace-separated columns; blank lines are skipped so that every
        # row has the same number of columns.
        rows = [line.split() for line in f if line.strip()]

    video_labels = np.array(rows)

    # Remove OTH label
    video_labels = video_labels[video_labels[:, 1] != 'OTH']

    # Prune frames: keep every `frame_skip`-th remaining row
    video_labels = video_labels[::frame_skip]

    labels.append(video_labels)

# Show frequency of labels
# unique, counts = np.unique(np.concatenate(labels)[:, 1], return_counts=True)
# import matplotlib.pyplot as plt
# plt.bar(unique, counts)
# plt.show()

In [4]:
# Fetch the single-pose MoveNet "lightning" model (version 4) from TF Hub and
# keep a handle on its default serving signature, which is what gets called
# for inference.
MOVENET_HANDLE = "https://tfhub.dev/google/movenet/singlepose/lightning/4"
model = hub.load(MOVENET_HANDLE)
movenet = model.signatures['serving_default']

In [5]:
# Run MoveNet on every labelled frame and collect one feature row per frame.
#
# Inputs : `labels` - one 2-D label array per video (column 0 = frame number),
#                     aligned with `video_ids`.
# Outputs: `data`   - 2-D array, one flattened MoveNet output per readable frame.
#          `labels` - rebound to a single 2-D array holding the label rows of
#                     exactly those frames, concatenated over all videos, so
#                     that data[k] and labels[k] always describe the same frame.
#
# Because `labels` is rebound at the end, this cell must be run right after the
# label-loading cell; re-running it on its own will not work.
per_video_labels = labels
data = []
kept_labels = []
for video_id, video_labels in zip(video_ids, per_video_labels):
    video_path = f"data/videos/video{video_id}.mp4"
    video = cv2.VideoCapture(video_path)
    if not video.isOpened():
        # Fail early with a clear message instead of crashing on the first read.
        raise FileNotFoundError(f"Could not open {video_path}")

    frame_numbers = video_labels[:, 0].astype(int)
    # readable[i] is True when frame i was decoded and contributed a row to `data`.
    readable = np.zeros(len(frame_numbers), dtype=bool)
    try:
        for i, frame_num in enumerate(frame_numbers):
            print(f'Loading video {video_id}... ({i}/{video_labels.shape[0]})', end='\r')
            video.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = video.read()
            if not ret:
                # Frame could not be decoded (e.g. index past the end of the
                # video); skip it and drop its label below.
                continue

            # OpenCV decodes to BGR, MoveNet expects RGB channel order.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            input_img = tf.expand_dims(frame, axis=0)
            input_img = tf.image.resize_with_pad(input_img, 192, 192)
            input_img = tf.cast(input_img, dtype=tf.int32)

            # Detection section: flatten the keypoints-with-scores output into
            # a 1-D feature vector.
            keypoints_with_scores = movenet(input_img)['output_0'].numpy().flatten()

            data.append(keypoints_with_scores)
            readable[i] = True
    finally:
        # Always free the capture handle, even if inference raised.
        video.release()

    kept_labels.append(video_labels[readable])

data = np.array(data)
# The encoding cell below indexes `labels[:, 1]`, so hand it one flat 2-D array.
labels = np.concatenate(kept_labels)

Loading video 2... (2490/2491)

In [6]:
# Convert the class names in column 1 of `labels` into one-hot rows.
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()
# Learn the class-name -> integer-id mapping and apply it in one step.
class_ids = label_encoder.fit_transform(labels[:, 1])

# `labels` is replaced by its one-hot encoding (one column per class id).
labels = keras.utils.to_categorical(class_ids)

In [12]:
# Sequence classifier over the flattened MoveNet output.
# Each sample is treated as a sequence of 17*3 scalar steps (one feature per step).
# Layers come from tensorflow.keras so they match the tf.keras.Sequential
# container instead of mixing in the standalone `keras` package.
from tensorflow.keras.layers import LSTM, Dense, LayerNormalization

# One softmax unit per one-hot label column, so the output layer always
# matches the targets instead of relying on a hard-coded class count.
num_classes = labels.shape[1]

model = tf.keras.models.Sequential([
    LSTM(64, return_sequences=True, activation="relu", input_shape=(17*3, 1)),
    # NOTE(review): on the (batch, 51, 64) sequence output, axis=1 normalizes
    # along the step axis (the recorded summary shows 2*51 = 102 parameters),
    # not along the 64 features. Confirm this is intended; axis=-1 would be
    # the usual choice.
    LayerNormalization(axis=1),
    # Only the first layer needs an input shape; later layers infer theirs.
    LSTM(32, return_sequences=False, activation="relu"),
    LayerNormalization(axis=1),
    Dense(32, activation='relu'),
    Dense(num_classes, activation='softmax'),
])

model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_7 (LSTM)               (None, 51, 64)            16896     
                                                                 
 layer_normalization_7 (Laye  (None, 51, 64)           102       
 rNormalization)                                                 
                                                                 
 lstm_8 (LSTM)               (None, 32)                12416     
                                                                 
 layer_normalization_8 (Laye  (None, 32)               64        
 rNormalization)                                                 
                                                                 
 dense_6 (Dense)             (None, 32)                1056      
                                                                 
 dense_7 (Dense)             (None, 10)               

In [13]:
import datetime
from sklearn.model_selection import train_test_split
X_train, X_temp, y_train, y_temp = train_test_split(data, labels, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

log_dir = "logs/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

model.compile(optimizer='adam',
                loss='categorical_crossentropy',
                metrics=['accuracy'])
# Add Early Stopping callback
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
model.fit(X_train, y_train, epochs=250, batch_size=32, validation_split=0.2, callbacks=[tensorboard_callback])


!tensorboard --logdir logs

Epoch 1/250
Epoch 2/250
Epoch 3/250
Epoch 4/250
Epoch 5/250
Epoch 6/250
Epoch 7/250
Epoch 8/250
Epoch 9/250
Epoch 10/250
Epoch 11/250
Epoch 12/250
Epoch 13/250
Epoch 14/250
Epoch 15/250
Epoch 16/250
Epoch 17/250
Epoch 18/250
Epoch 19/250
Epoch 20/250
Epoch 21/250
Epoch 22/250
Epoch 23/250
Epoch 24/250
Epoch 25/250
Epoch 26/250
Epoch 27/250
Epoch 28/250
Epoch 29/250
Epoch 30/250
Epoch 31/250
Epoch 32/250
Epoch 33/250
Epoch 34/250
Epoch 35/250
Epoch 36/250
Epoch 37/250
Epoch 38/250
Epoch 39/250
Epoch 40/250
Epoch 41/250
Epoch 42/250
Epoch 43/250
Epoch 44/250
Epoch 45/250
Epoch 46/250
Epoch 47/250
Epoch 48/250
Epoch 49/250
Epoch 50/250
Epoch 51/250
Epoch 52/250
Epoch 53/250
Epoch 54/250
Epoch 55/250
Epoch 56/250
Epoch 57/250
Epoch 58/250
Epoch 59/250
Epoch 60/250
Epoch 61/250
Epoch 62/250
Epoch 63/250
Epoch 64/250
Epoch 65/250
Epoch 66/250
Epoch 67/250
Epoch 68/250
Epoch 69/250
Epoch 70/250
Epoch 71/250
Epoch 72/250
Epoch 73/250
Epoch 74/250
Epoch 75/250
Epoch 76/250
Epoch 77/250
Epoch 78

KeyboardInterrupt: 

In [None]:
# Score the trained model on the held-out test split. With the single
# 'accuracy' metric compiled in, evaluate() yields the loss and the accuracy.
evaluation = model.evaluate(X_test, y_test)
test_loss, test_accuracy = evaluation
print(f'Test Loss: {test_loss}, Test Accuracy: {test_accuracy}')

# Persist the trained model to an HDF5 file.
model.save("your_model.h5")