In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import cv2
import os

# Import matplotlib libraries
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import matplotlib.patches as patches
# Some modules to display an animation using imageio.
import imageio
# Show which GPU devices TensorFlow can see.
print(tf.config.list_physical_devices('GPU'))

import sys
# Make the parent directory importable; GRU and pytorch_utils are presumably
# located there — verify against the repository layout.
sys.path.insert(1, '..')
from GRU import BIGRU
import pytorch_utils

import torch
# Put a tensor on the CUDA device. NOTE(review): this looks like an early
# "is CUDA usable" check, since the training cell hard-codes device = 'cuda'.
torch.zeros(1).cuda()

# Root directory of the image dataset; relative image paths are joined onto it.
dataset_dir = 'training/'

In [None]:
#@title Helper functions for visualization

# Joint name -> keypoint index lookup. A joint's index is its position in the
# tuple below, so the order of the names is significant.
KEYPOINT_DICT = {
    joint: index
    for index, joint in enumerate((
        'nose',
        'left_eye', 'right_eye',
        'left_ear', 'right_ear',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
    ))
}

# Skeleton edges and their drawing colors. Each triple below is
# (start keypoint index, end keypoint index, matplotlib color name); the
# resulting dict is keyed by the (start, end) index pair.
KEYPOINT_EDGE_INDS_TO_COLOR = {
    (start, end): color
    for start, end, color in (
        (0, 1, 'm'),
        (0, 2, 'c'),
        (1, 3, 'm'),
        (2, 4, 'c'),
        (0, 5, 'm'),
        (0, 6, 'c'),
        (5, 7, 'black'),
        (7, 9, 'black'),
        (6, 8, 'white'),
        (8, 10, 'white'),
        (5, 6, 'y'),
        (5, 11, 'm'),
        (6, 12, 'c'),
        (11, 12, 'y'),
        (11, 13, 'm'),
        (13, 15, 'm'),
        (12, 14, 'c'),
        (14, 16, 'c'),
    )
}

In [None]:
model_name = "movenet_thunder" #@param ["movenet_lightning", "movenet_thunder", "movenet_lightning_f16.tflite", "movenet_thunder_f16.tflite", "movenet_lightning_int8.tflite", "movenet_thunder_int8.tflite"]

# TF Hub modules that have already been loaded, keyed by hub handle. Without
# this cache the model would be re-loaded on every call, i.e. once per image.
# The whole module (not only its signature) is stored so the loaded object
# stays referenced for as long as its signature is in use.
_MOVENET_MODULES = {}

def movenet(input_image):
    """Runs MoveNet single-pose detection on an input image.

    The model variant is selected from the notebook-level `model_name`:
    names containing "movenet_lightning" use the lightning model (expected
    input resolution 192x192), names containing "movenet_thunder" use the
    thunder model (expected input resolution 256x256).

    Args:
      input_image: A [1, height, width, 3] tensor holding the image pixels.
        It must already be resized to the resolution the selected model
        expects; no resizing is done here.

    Returns:
      A [1, 1, 17, 3] float numpy array with the predicted keypoint
      coordinates and scores.

    Raises:
      ValueError: If `model_name` matches neither supported variant.
    """
    if "movenet_lightning" in model_name:
        handle = "https://tfhub.dev/google/movenet/singlepose/lightning/4"
    elif "movenet_thunder" in model_name:
        handle = "https://tfhub.dev/google/movenet/singlepose/thunder/4"
    else:
        raise ValueError("Unsupported model name: %s" % model_name)

    # Load each variant at most once per kernel session.
    module = _MOVENET_MODULES.get(handle)
    if module is None:
        module = hub.load(handle)
        _MOVENET_MODULES[handle] = module

    model = module.signatures['serving_default']

    # SavedModel format expects tensor type of int32.
    input_image = tf.cast(input_image, dtype=tf.int32)
    # Run model inference.
    outputs = model(input_image)
    # Output is a [1, 1, 17, 3] tensor.
    keypoints_with_scores = outputs['output_0'].numpy()
    return keypoints_with_scores

In [None]:
def process(path):
    """Runs MoveNet on one image and plots the keypoint overlay.

    Args:
      path: Image file name, joined onto the image directory.

    The image directory is the notebook-level name `dir` when a cell has
    bound it to a path (as the commented-out `interact` snippet in this cell
    does). On a fresh kernel `dir` is still the Python builtin, in which case
    `dataset_dir` is used instead.

    NOTE(review): `draw_prediction_on_image` is not defined in the cells
    visible here; it has to be defined before this function is called.
    """
    # Only use `dir` if it was rebound to an actual path; the builtin function
    # would make os.path.join raise a TypeError.
    image_dir = dir if isinstance(dir, (str, os.PathLike)) else dataset_dir

    # Load the input image.
    image_path = os.path.join(image_dir, path)#'training/16/11783.3.jpg'
    image = tf.io.read_file(image_path)
    # Force 3 channels so grayscale JPEGs still match the model's RGB input.
    image = tf.image.decode_jpeg(image, channels=3)

    # The expected resolution mirrors the model selection done in movenet():
    # 192 for the lightning variant, 256 for thunder.
    input_size = 192 if "movenet_lightning" in model_name else 256

    # Resize and pad the image to keep the aspect ratio and fit the expected size.
    input_image = tf.expand_dims(image, axis=0)
    input_image = tf.image.resize_with_pad(input_image, input_size, input_size)

    # Run model inference.
    keypoints_with_scores = movenet(input_image)
    print(keypoints_with_scores)

    # Visualize the predictions with image.
    display_image = tf.expand_dims(image, axis=0)
    display_image = tf.cast(tf.image.resize_with_pad(
        display_image, 1280, 1280), dtype=tf.int32)
    output_overlay = draw_prediction_on_image(
        np.squeeze(display_image.numpy(), axis=0), keypoints_with_scores, threshold=0.3)

    plt.figure(figsize=(5, 5))
    plt.imshow(output_overlay)
    _ = plt.axis('off')

# import os
# from ipywidgets import interact
# dir = 'training/17'
# files = os.listdir(dir)
# interact(process, path=files)

In [None]:
def load_imigue(dir, num_classes=32):
    """Indexes the image files of a dataset organised in per-class folders.

    The expected layout is `<dir>/<class>/<video_id>.<frame>.<ext>` with
    class folders named "1" .. str(num_classes).

    Args:
      dir: Root directory that contains the numbered class folders.
      num_classes: Number of class folders to read, starting at folder "1".

    Returns:
      A pandas DataFrame with one row per file and the columns
      `path` (file path relative to `dir`), `class` (int folder number),
      `video_id` and `frame` (the first and second '.'-separated fields of
      the file name, kept as strings).

    Raises:
      FileNotFoundError: If one of the class folders does not exist.
      IndexError: If a file name contains no '.'.
    """
    import pandas as pd

    records = []
    for label in range(1, num_classes + 1):
        # Build every class path from the unchanged root; re-assigning `dir`
        # here would nest the folders ("root/1/2/...") after the first class.
        class_dir = os.path.join(dir, str(label))
        # os.listdir returns entries in arbitrary order; sort (lexicographic)
        # so the row order is reproducible between runs.
        for file in sorted(os.listdir(class_dir)):
            name_parts = file.split('.')
            records.append(
                [os.path.join(str(label), file), label, name_parts[0], name_parts[1]])
    # Create the frame once instead of growing it row by row.
    return pd.DataFrame(records, columns=['path', 'class', 'video_id', 'frame'])

# Index every image under dataset_dir; one row per image file.
df = load_imigue(dataset_dir)

In [None]:
def add_pose_data(df, input_size=256):
    """Runs MoveNet on every image listed in `df` and stores the keypoints.

    For each joint name in KEYPOINT_DICT three float columns are written to
    `df` in place: `<joint>_0`, `<joint>_1` (the two coordinate values, in the
    order the model returns them) and `<joint>_score`.

    Args:
      df: DataFrame with a `path` column holding image paths relative to
        `dataset_dir`. It is modified in place.
      input_size: Side length the images are resized/padded to before
        inference. The default of 256 is the resolution movenet() uses for
        the thunder model; pass 192 when `model_name` selects lightning.

    Returns:
      None.
    """
    num_keypoints = max(KEYPOINT_DICT.values()) + 1
    # Collect all predictions first and write the columns once at the end;
    # this avoids dozens of scalar DataFrame writes per image.
    keypoints = np.zeros((len(df), num_keypoints, 3), dtype=np.float64)

    for row_pos, rel_path in enumerate(df['path']):
        # Load the input image.
        image_path = os.path.join(dataset_dir, rel_path)
        image = tf.io.read_file(image_path)
        # Force 3 channels so grayscale JPEGs still match the model's RGB input.
        image = tf.image.decode_jpeg(image, channels=3)

        # Resize and pad the image to keep the aspect ratio and fit the expected size.
        input_image = tf.expand_dims(image, axis=0)
        input_image = tf.image.resize_with_pad(input_image, input_size, input_size)

        # Run model inference; the result has shape [1, 1, 17, 3].
        keypoints_with_scores = movenet(input_image)
        keypoints[row_pos] = keypoints_with_scores[0][0][:num_keypoints]

    # Arrays are assigned positionally, so this works for any DataFrame index.
    for joint, kp_index in KEYPOINT_DICT.items():
        df[f'{joint}_0'] = keypoints[:, kp_index, 0]
        df[f'{joint}_1'] = keypoints[:, kp_index, 1]
        df[f'{joint}_score'] = keypoints[:, kp_index, 2]

# Run pose estimation for every indexed image; the keypoint columns are
# written into df in place (add_pose_data returns nothing).
add_pose_data(df)
# Display the frame, now including the keypoint columns.
df

In [None]:
def make_lstm_input(df, seq_target_count=7):
    """Builds fixed-length keypoint sequences and labels, one per video.

    Rows are grouped by (`class`, `video_id`). For each group the first
    `seq_target_count` rows (in DataFrame order) are kept; shorter groups are
    padded with all-zero frames at the end. Only the keypoints from the nose
    down to the left wrist are used (10 joints x 3 values = 30 features).

    Args:
      df: DataFrame with `class`, `video_id` and the `<joint>_0`,
        `<joint>_1`, `<joint>_score` columns for the joints listed below.
      seq_target_count: Number of frames in every output sequence.

    Returns:
      A tuple `(X, y)`: `X` is a float array of shape
      (num_videos, seq_target_count, 30) and `y` an int array of shape
      (num_videos,) with the class of each video. Videos are ordered by
      sorted (`class`, `video_id`).
    """
    joints = ['nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
              'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
              'left_wrist']
    feature_cols = [f'{joint}_{suffix}'
                    for joint in joints
                    for suffix in ('0', '1', 'score')]

    grouped = df.groupby(['class', 'video_id'], sort=True)
    # Zero-initialised, so frames that are never filled act as the padding.
    sequences = np.zeros((grouped.ngroups, seq_target_count, len(feature_cols)),
                         dtype=np.float64)
    labels = np.empty(grouped.ngroups, dtype=int)

    for seq_pos, ((class_id, _video_id), frames) in enumerate(grouped):
        # Take only the numeric feature columns. Converting whole rows would
        # mix in the (possibly string) video_id and make numpy fall back to a
        # string array.
        values = frames[feature_cols].to_numpy(dtype=np.float64)[:seq_target_count]
        sequences[seq_pos, :len(values)] = values
        labels[seq_pos] = int(class_id)

    return sequences, labels

# Build the per-video keypoint sequences and their integer class labels.
data_X, data_Y = make_lstm_input(df)
# Class folders are numbered from 1 (see load_imigue); shift labels to start at 0.
data_Y = data_Y - 1

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 33% of the sequences for evaluation; random_state pins the split.
X_train, X_test, y_train, y_test = train_test_split(data_X, data_Y, test_size=0.33, random_state=42)

In [None]:
# Training runs on the GPU; this is hard-coded.
device = 'cuda'
# NOTE(review): 30 presumably is the per-frame feature count produced by
# make_lstm_input (10 joints x 3 values) and 50 the hidden size — confirm
# against GRU.BIGRU. num_class=32 matches the 32 class folders read by
# load_imigue.
model = BIGRU(30, 50, num_class=32, device=device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = torch.nn.CrossEntropyLoss()
# Train for 10 epochs. make_dataset is called again for every epoch; the value
# returned by train_one_epoch is stored in `loss` but not used in this cell.
for i in range(10):
    loss = pytorch_utils.train_one_epoch(i, model, pytorch_utils.make_dataset(X_train, y_train, batch_size=64), optimizer, loss_fn, device=device)
# torch.save(model, 'bigru.pth')

In [None]:
# Predict a class for every test sequence, with gradient tracking disabled.
with torch.no_grad():
    model.eval()
    # The whole test set is passed through the model in a single batch.
    output = model(torch.Tensor(X_test).to(device))
    maxk = 1
    # topk(k, dim, largest, sorted): indices of the top-1 entry along dim 1.
    # assumes output is (num_samples, num_classes) — TODO confirm against BIGRU
    _, y_pred = output.topk(maxk, 1, True, True)
    # Transpose and take row 0 to get one predicted index per sample as a
    # numpy array on the CPU.
    y_pred = y_pred.t()[0].cpu().numpy()

# y_pred = model(X_test)
# y_pred = np.argmax(y_pred, axis=1)

In [None]:
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
# Text summary of the per-class metrics on the held-out test set.
report = classification_report(y_test, y_pred)
print(report)

In [None]:
# Plot the confusion matrix of true vs. predicted classes; include_values=False
# leaves the per-cell counts out of the plot.
ConfusionMatrixDisplay.from_predictions(y_test, y_pred, include_values=False)