In [1]:
import tensorflow as tf
import os
import random
import numpy as np

def seed_everything(seed):
  random.seed(seed)
  os.environ['PYTHONHASHSEED'] = str(seed)
  np.random.seed(seed)
  tf.random.set_seed(seed)

SEED = 22
seed_everything(seed=SEED)

In [2]:
from functools import reduce
from itertools import accumulate

landmark_lens = (
    (33, 4),
    (468, 3),
    (21, 3),
    (21, 3)
)
landmark_locs = list(accumulate(landmark_lens, lambda a, b: a + b[0]*b[1], initial=0))
landmarks_len = reduce(lambda r, loc: r + loc[0] * loc[1], landmark_lens, 0)
print(landmark_locs, landmarks_len)

[0, 132, 1536, 1599, 1662] 1662


In [3]:
# data_folder = 'tracks_binary_manual'
data_folder = 'tracks_binary'
labels = [label for label in os.listdir(data_folder) if os.path.isdir(f'{data_folder}/{label}')]
NUM_CLASSES = len(labels)

labels_tensor = tf.constant(labels)
ids_tensor = tf.constant(range(len(labels)))

ids_from_labels = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        labels_tensor,
        ids_tensor
    ),
    default_value=-1
)

labels_from_ids = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        ids_tensor,
        labels_tensor
    ),
    default_value=""
)

def to_categorical(label):
    return tf.one_hot(
        ids_from_labels.lookup(label),
        depth=NUM_CLASSES
    )

In [4]:
def process_binary(file_path):
    label = tf.strings.split(file_path, os.sep)[-2]

    raw = tf.io.read_file(file_path)
    data = tf.io.decode_raw(raw, tf.float32)
    data = tf.reshape(data, [-1, landmarks_len])

    pose = tf.reshape(data[:, 0:132], [-1, 33, 4])
    # lh = tf.reshape(data[:, 132:195], [-1, 21, 3])
    # rh = tf.reshape(data[:, 195:258], [-1, 21, 3])
    
    face = tf.reshape(data[:, 132:1536], [-1, 468, 3])
    lh = tf.reshape(data[:, 1536:1599], [-1, 21, 3])
    rh = tf.reshape(data[:, 1599:1662], [-1, 21, 3])
    
    # without z
    # new_pose = tf.concat((pose[:, :, :2], pose[:, :, -1:]), axis=2)
    new_pose = pose[:, :, :2]
    new_lh = lh[:, :, :2]
    new_rh = rh[:, :, :2]

    return (new_pose, new_lh, new_rh), to_categorical(label)
    # return (pose, face, lh, rh), to_categorical(label)

In [5]:
from tensorflow import reduce_max, reduce_min

FRAMES = 64

def flatten(x):
    pose = tf.reshape(x[0], shape=[-1, 33*2])
    lh = tf.reshape(x[1], shape=[-1, 21*2])
    rh = tf.reshape(x[2], shape=[-1, 21*2])
    return tf.concat([pose, lh, rh], axis=1)

def concat_joints(x):
    pose = x[0]
    lh = x[1]
    rh = x[2]
    return tf.concat([pose, lh, rh], axis=1)

def random_window(x):
    def pad(x):
        missing = FRAMES - size
        start_pad = tf.math.ceil(missing / 2)
        end_pad = tf.math.floor(missing / 2)
        return tf.concat([
            tf.tile([x[0]], [start_pad, 1, 1]),
            x,
            tf.tile([x[-1]], [end_pad, 1, 1])
        ], axis=0)

    def random_slice(x):
        i = tf.random.uniform(shape=(), maxval=size-FRAMES, dtype=tf.int32)
        return x[i: i+FRAMES]

    size = tf.shape(x)[0]
    return tf.cond(
        size <= FRAMES,
        lambda: pad(x),
        lambda: random_slice(x)
    )
    
def calc_bounding(pose, lh, rh):
    max_x = reduce_max(tf.stack([reduce_max(pose[:, :, :1]), reduce_max(lh[:, :, :1]), reduce_max(rh[:, :, :1])], axis=0))
    min_x = reduce_min(tf.stack([reduce_min(pose[:, :, :1]), reduce_min(lh[:, :, :1]), reduce_min(rh[:, :, :1])], axis=0))
    
    max_y = reduce_max(tf.stack([reduce_max(pose[:, :, 1:2]), reduce_max(lh[:, :, 1:2]), reduce_max(rh[:, :, 1:2])], axis=0))
    min_y = reduce_min(tf.stack([reduce_min(pose[:, :, 1:2]), reduce_min(lh[:, :, 1:2]), reduce_min(rh[:, :, 1:2])], axis=0))

    window = tf.cast((max_x - min_x, max_y - min_y), dtype=tf.float32)
    mid = ((max_x + min_x)/2, (max_y + min_y)/2)
    return (window, mid)

def scale(x, factor):
    pose, lh, rh = x[0], x[1], x[2]
    window, mid = calc_bounding(pose, lh, rh)
    scale = factor * window
    pose_shape, lh_shape, rh_shape = tf.shape(pose), tf.shape(lh), tf.shape(rh)
    
    pose_center = tf.tile([[[mid[0], mid[1]]]], [pose_shape[0], pose_shape[1], 1])
    lh_center = tf.tile([[[mid[0], mid[1]]]], [lh_shape[0], lh_shape[1], 1])
    rh_center = tf.tile([[[mid[0], mid[1]]]], [rh_shape[0], rh_shape[1], 1])
    
    pose_scale = tf.tile([[[scale[0], scale[1]]]], [pose_shape[0], pose_shape[1], 1])
    lh_scale = tf.tile([[[scale[0], scale[1]]]], [lh_shape[0], lh_shape[1], 1])
    rh_scale = tf.tile([[[scale[0], scale[1]]]], [rh_shape[0], rh_shape[1], 1])
    
    scaled_pose = pose_center + (pose - pose_center) * pose_scale
    scaled_lh = lh_center + (lh - lh_center) * lh_scale
    scaled_rh = rh_center + (rh - rh_center) * rh_scale

    return (scaled_pose, scaled_lh, scaled_rh)


def random_translation(x):
    pose, lh, rh = x[0], x[1], x[2]
    magnitude = tf.random.uniform(shape=[2], minval=-0.25, maxval=0.25)
    pose_shape, lh_shape, rh_shape = tf.shape(pose), tf.shape(lh), tf.shape(rh)
    
    pose_trans = tf.tile([[[magnitude[0], magnitude[1]]]], [pose_shape[0], pose_shape[1], 1])
    lh_trans = tf.tile([[[magnitude[0], magnitude[1]]]], [lh_shape[0], lh_shape[1], 1])
    rh_trans = tf.tile([[[magnitude[0], magnitude[1]]]], [rh_shape[0], rh_shape[1], 1])

    return (pose+pose_trans, lh+lh_trans, rh+rh_trans)
    
def flip(x):
    pose, lh, rh = x[0], x[1], x[2]
    pose_shape, lh_shape, rh_shape = tf.shape(pose), tf.shape(lh), tf.shape(rh)
    
    pose_neg = tf.tile([[[-1.0, 1]]], [pose_shape[0], pose_shape[1], 1])
    lh_neg = tf.tile([[[-1.0, 1]]], [lh_shape[0], lh_shape[1], 1])
    rh_neg = tf.tile([[[-1.0, 1]]], [rh_shape[0], rh_shape[1], 1])
    
    pose_trans = tf.tile([[[1.0, 0]]], [pose_shape[0], pose_shape[1], 1])
    lh_trans = tf.tile([[[1.0, 0]]], [lh_shape[0], lh_shape[1], 1])
    rh_trans = tf.tile([[[1.0, 0]]], [rh_shape[0], rh_shape[1], 1])

    flipped_pose = pose_trans + pose * pose_neg
    flipped_lh = lh_trans + lh * lh_neg
    flipped_rh = rh_trans + rh * rh_neg
    
    return (flipped_pose, flipped_lh, flipped_rh)
    
    
def prepare(ds, shuffle=False, augment=False, isTrans=False, isScale=False, isFlip=False):
    if augment:
        if isTrans:
            ds = ds.map(lambda x, y: (random_translation(x), y), num_parallel_calls=tf.data.AUTOTUNE)
        if isScale:
            ds = ds.map(lambda x, y: (scale(x, 0.1), y), num_parallel_calls=tf.data.AUTOTUNE)
        if isFlip:
            ds = ds.map(lambda x, y: (flip(x), y), num_parallel_calls=tf.data.AUTOTUNE)
               
    ds = ds.map(lambda x, y: (concat_joints(x), y), num_parallel_calls=tf.data.AUTOTUNE)

    ds = ds.map(lambda x, y: (random_window(x), y), num_parallel_calls=tf.data.AUTOTUNE)

    if shuffle:
        ds = ds.shuffle(1000, seed=SEED, reshuffle_each_iteration=False)
        
    ds = ds.batch(32)

    return ds.prefetch(buffer_size=tf.data.AUTOTUNE)

In [6]:
# a = tf.data.Dataset.list_files(f'{data_folder}/*/*')
# a = a.map(process_binary)
# a = a.map(lambda x, y: (concat_joints(x), y), num_parallel_calls=tf.data.AUTOTUNE)
# a = a.map(lambda x, y: (random_window(x), y), num_parallel_calls=tf.data.AUTOTUNE)
# a = a.map(lambda x, y: ([x[:-1, :, :], x[1:, :, :]-x[:-1, :, :]], y), num_parallel_calls=tf.data.AUTOTUNE)

# a = prepare(a)


In [6]:
from sklearn.model_selection import train_test_split
import shutil

# Split dataset into folders
def split_dataset(train_size=0.8, val_size=0.2):
    filenames = tf.io.matching_files(f"{data_folder}/*/*")
    filenames = tf.random.shuffle(filenames, seed=SEED)

    video = filenames.numpy()
    label = tf.strings.split(filenames, os.sep)[:, 1:2].flat_values.numpy()

    video_train, video_test, label_train, label_test = train_test_split(video, label, train_size=train_size, test_size=val_size)

    shutil.rmtree('dataset', ignore_errors=True)
    
    for video_path, label in zip(video_train, label_train):
        dest = os.path.join(b'dataset', b'train', label)
        os.makedirs(dest, exist_ok=True)
        shutil.copy2(video_path, dest)

    for video_path, label in zip(video_test, label_test):
        dest = os.path.join(b'dataset', b'test', label)
        os.makedirs(dest, exist_ok=True)
        shutil.copy2(video_path, dest)

In [7]:
split_dataset()

In [8]:
isTrans = False
isScale = False
isFlip = True
isAugment = isTrans or isScale or isFlip
print(isAugment)

True


In [9]:
train_ds = tf.data.Dataset.list_files('dataset/train/*/*')
train_ds = train_ds.map(process_binary)
train_ds = prepare(train_ds, augment=isAugment, isTrans=isTrans, isScale=isScale, isFlip=isFlip)

test_ds = tf.data.Dataset.list_files('dataset/test/*/*')
test_ds = test_ds.map(process_binary)
test_ds = prepare(test_ds)

In [10]:
print(train_ds)
print(test_ds)
print(len(train_ds), len(test_ds))

<PrefetchDataset shapes: ((None, None, 75, 2), (None, 10)), types: (tf.float32, tf.float32)>
<PrefetchDataset shapes: ((None, None, 75, 2), (None, 10)), types: (tf.float32, tf.float32)>
8 2


In [11]:
from keras.models import Model
from keras import Sequential
from keras.layers import *
from keras.layers.core import *
from keras.layers.convolutional import *
from keras import backend as K
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Bidirectional
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ReduceLROnPlateau
from tensorflow import keras
from wandb.keras import WandbCallback
import wandb

In [12]:
JOINTS = 75
JOINT_DIM = 2

In [13]:
class HCN(keras.Model):
  def __init__(self, num_class):
    super(HCN, self).__init__()
    self.num_class = num_class
    
    # position
    self.conv1 = Conv2D(filters=64, kernel_size=(1,1), padding='same')
    self.bn1 = BatchNormalization()
    self.act1 = LeakyReLU()
    
    self.conv2 = Conv2D(filters=32, kernel_size=(3,1), padding='same')
    self.bn2 = BatchNormalization()
    self.act2 = LeakyReLU()
        
    self.permute = Permute((1, 3, 2))
    
    self.conv3 = Conv2D(filters=32, kernel_size=(3, 3), padding='same')
    self.bn3 = BatchNormalization()
    self.act3 = LeakyReLU()
    self.mp3 = MaxPooling2D(pool_size=(2, 2), strides=2)
    
    self.conv4 = Conv2D(filters=64, kernel_size=(3, 3), padding='same')
    self.bn4 = BatchNormalization()
    self.act4 = LeakyReLU()
    self.do4 = SpatialDropout2D(rate=0.5)
    self.mp4 = MaxPooling2D(pool_size=(2, 2), strides=2)
       
       
    # motion
    self.conv1m = Conv2D(filters=64, kernel_size=(1,1), padding='same', strides=1)
    self.bn1m = BatchNormalization()
    self.act1m = LeakyReLU()
    
    self.conv2m = Conv2D(filters=32, kernel_size=(3,1), padding='same')
    self.bn2m = BatchNormalization()
    self.act2m = LeakyReLU()
    
    self.permutem = Permute((1, 3, 2))
    
    self.conv3m = Conv2D(filters=32, kernel_size=(3, 3), padding='same')
    self.mp3m = MaxPooling2D(pool_size=(2, 2), strides=2)
    self.bn3m = BatchNormalization()
    self.act3m = LeakyReLU()
    
    self.conv4m = Conv2D(filters=64, kernel_size=(3, 3), padding='same')
    self.bn4m = BatchNormalization()
    self.act4m = LeakyReLU()
    self.do4m = SpatialDropout2D(rate=0.5)
    self.mp4m = MaxPooling2D(pool_size=(2, 2), strides=2)
    
    
    # concat position and motion
    self.conv5 = Conv2D(filters=128, kernel_size=(3, 3), padding='same')
    self.bn5 = BatchNormalization()
    self.act5 = LeakyReLU()
    self.do5 = SpatialDropout2D(rate=0.5)
    self.mp5 = MaxPooling2D(pool_size=(2, 2), strides=2)
    
    self.conv6 = Conv2D(filters=256, kernel_size=(3, 3), padding='same')
    self.bn6 = BatchNormalization()
    self.act6 = LeakyReLU()
    self.do6 = SpatialDropout2D(rate=0.5)
    self.mp6 = MaxPooling2D(pool_size=(2, 2), strides=2)
    
    self.flatten = Flatten()
    
    self.fc7 = Dense(256, activation='relu')
    # self.do7 = SpatialDropout2D(rate=0.5)
    self.bn7 = BatchNormalization()
    self.act7 = LeakyReLU()
    self.fc8 = Dense(num_class, activation='softmax')
    
    
  def call(self, inputs):
    # N = num of batch
    # F = num of frames
    # J = num of joints
    # D = num of joint dimension
    
    # N, F, J, D = tf.shape(inputs)
    motion = inputs[:, 1:, :, :] - inputs[:, :-1, :, :]
    motion = tf.concat((motion, motion[:, -1:, :, :]), axis=1)
    
    # position
    x = self.conv1(inputs)
    x = self.bn1(x)
    x = self.act1(x)
    
    x = self.conv2(x)
    x = self.bn2(x)
    x = self.act2(x)
    
    x = self.permute(x)
    
    x = self.conv3(x)
    x = self.bn3(x)
    x = self.act3(x)
    x = self.mp3(x)
    
    x = self.conv4(x)
    x = self.bn4(x)
    x = self.act4(x)
    x = self.do4(x)
    x_p = self.mp4(x)
    
    # motion 
    x = self.conv1m(motion)
    x = self.bn1m(x)
    x = self.act1m(x)
    
    x = self.conv2m(x)
    x = self.bn2m(x)
    x = self.act2m(x)
    
    x = self.permutem(x)
    
    x = self.conv3m(x)
    x = self.bn3m(x)
    x = self.act3m(x)
    x = self.mp3m(x)
    
    x = self.conv4m(x)
    x = self.bn4m(x)
    x = self.act4m(x)
    x = self.do4m(x)
    x_m = self.mp4m(x)

    # concat
    x = tf.concat((x_p, x_m), axis=1)
    x = self.conv5(x)
    x = self.bn5(x)
    x = self.act5(x)
    x = self.do5(x)
    x = self.mp5(x)
    
    x = self.conv6(x)
    x = self.bn6(x)
    x = self.act6(x)
    x = self.do6(x)
    x = self.mp6(x)
    
    x = self.flatten(x)
    
    x = self.fc7(x)
    x = self.bn7(x)
    x = self.act7(x)
    # x = self.do7(x)
    
    x = self.fc8(x)

    return x

  def model(self):
    x = Input(shape=(64, 75, 2))
    return Model(inputs=[x], outputs=self.call(x))
  
  # def build(self):
  #   # Initialize the graph
  #   self._is_graph_network = True
  #   self._init_graph_network(
  #       inputs=self.input_layer,
  #       outputs=self.out
  #   )

In [15]:
wandb.init(
  project="HCN",
  entity="richardsonqiu",
  config={
    "input": (FRAMES, JOINTS, JOINT_DIM),
    
    "last_layer": NUM_CLASSES,
    "last_act": "softmax",
    
    "optimizer": "adam",
    "init_lr": 0.01,
    "loss": "categorical_crossentropy",
    "metric": "accuracy",
    "epoch": 500,
    "batch_size": 32,
    "data": "default",
    "landmarks": "pose, lh, rh",
    "landmarks_metadata": "without z",
    "isAugment": isAugment,
    "isTrans": isTrans,
    "isScale": isScale,
    "isFlip": isFlip
    })
wc = wandb.config

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mrichardsonqiu[0m (use `wandb login --relogin` to force relogin)
[34m[1mwandb[0m: wandb version 0.12.9 is available!  To upgrade, please run:
[34m[1mwandb[0m:  $ pip install wandb --upgrade


In [16]:
if wc.optimizer == "adam":
  opt = keras.optimizers.Adam(learning_rate=wc.init_lr)
elif wc.optimizer == "sgd":
  opt = keras.optimizers.SGD(learning_rate=wc.init_lr, nesterov=True)
  
model = HCN(wc.last_layer)

model.compile(optimizer=opt, loss=wc.loss, metrics=[wc.metric])

In [43]:
# input_shape = (32, 64, 75, 2)

In [17]:
model.model().summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 64, 75, 2)]  0           []                               
                                                                                                  
 tf.__operators__.getitem (Slic  (None, 63, 75, 2)   0           ['input_1[0][0]']                
 ingOpLambda)                                                                                     
                                                                                                  
 tf.__operators__.getitem_1 (Sl  (None, 63, 75, 2)   0           ['input_1[0][0]']                
 icingOpLambda)                                                                                   
                                                                                              

In [45]:
isAugment

True

In [46]:
es_callback = EarlyStopping(monitor='val_loss', patience=20)
lr_callback = ReduceLROnPlateau(monitor='val_loss', patience=20, factor=0.5, min_lr=1e-6)
wandb_callback = WandbCallback(log_evaluation=True)

In [47]:
history = model.fit(train_ds, validation_data=test_ds, epochs=wc.epoch, callbacks=[lr_callback, wandb_callback])



Epoch 1/500

[34m[1mwandb[0m: [32m[41mERROR[0m Can't save model, h5py returned error: Saving the model to HDF5 format requires the model to be a Functional model or a Sequential model. It does not work for subclassed models, because such models are defined via the body of a Python method, which isn't safely serializable. Consider saving to the Tensorflow SavedModel format (by setting save_format="tf") or using `save_weights`.


Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Epoch 77/500
Epoch 78/500
Epoch 7

Exception in thread Thread-23:
Traceback (most recent call last):
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python39\lib\threading.py", line 973, in _bootstrap_inner
    self.run()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python39\lib\threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\wandb\sdk\wandb_run.py", line 149, in check_network_status
    status_response = self._interface.communicate_network_status()
  File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\wandb\sdk\interface\interface.py", line 120, in communicate_network_status
    resp = self._communicate_network_status(status)
  File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\wandb\sdk\interface\interfac

In [31]:
# model.save('models/glamorous-wave', save_format='tf')

INFO:tensorflow:Assets written to: models/glamorous-wave\assets


In [18]:
model.load_weights('models/glamorous-wave')

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x26207398520>

## Evaluate Model

In [67]:
test_ds

<PrefetchDataset shapes: ((None, None, 75, 2), (None, 10)), types: (tf.float32, tf.float32)>

In [52]:
y_pred = []  # store predicted labels
y_true = []  # store true labels

# iterate over the dataset
for image_batch, label_batch in test_ds:   # use dataset.unbatch() with repeat
   # append true labels
   y_true.append(label_batch)
   # compute predictions
   preds = model.predict(image_batch)
   # append predicted labels
   y_pred.append(np.argmax(preds, axis = - 1))

# convert the true and predicted labels into tensors
correct_labels = tf.argmax(tf.concat([item for item in y_true], axis = 0), axis=1)
predicted_labels = tf.concat([item for item in y_pred], axis = 0)

In [53]:
correct_labels

<tf.Tensor: shape=(33,), dtype=int64, numpy=
array([0, 4, 2, 0, 9, 3, 7, 4, 4, 2, 5, 7, 8, 9, 3, 7, 8, 2, 8, 8, 6, 4,
       8, 3, 0, 9, 2, 0, 7, 2, 7, 9, 3], dtype=int64)>

In [54]:
predicted_labels

<tf.Tensor: shape=(33,), dtype=int64, numpy=
array([3, 4, 2, 0, 9, 3, 7, 4, 4, 2, 9, 0, 8, 9, 3, 7, 8, 2, 8, 8, 6, 4,
       8, 3, 0, 9, 2, 0, 7, 9, 7, 9, 3], dtype=int64)>

In [55]:
from sklearn.metrics import classification_report

print(classification_report(correct_labels, predicted_labels))

              precision    recall  f1-score   support

           0       0.75      0.75      0.75         4
           2       1.00      0.80      0.89         5
           3       0.80      1.00      0.89         4
           4       1.00      1.00      1.00         4
           5       0.00      0.00      0.00         1
           6       1.00      1.00      1.00         1
           7       1.00      0.80      0.89         5
           8       1.00      1.00      1.00         5
           9       0.67      1.00      0.80         4

    accuracy                           0.88        33
   macro avg       0.80      0.82      0.80        33
weighted avg       0.87      0.88      0.87        33



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


## Test in Real Time

In [19]:
import mediapipe as mp
import matplotlib.pyplot as plt

mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results
  
def draw_styled_landmarks(image, results):
    # # Draw face connections
    # mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS, 
    #                          mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
    #                          mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
    #                          ) 
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

In [24]:
import cv2
cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)

        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

In [25]:
# colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, labels, input_frame):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), (16,117,245), -1)
        cv2.putText(output_frame, labels[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

In [26]:
def a(results):    
    pose = []
    if results.pose_landmarks:
        for res in results.pose_landmarks.landmark:
            test = np.array([res.x, res.y])
            pose.append(test)
    else:
        pose.append(np.tile([0], (33, 2)))

In [27]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y] for res in results.pose_landmarks.landmark]) if results.pose_landmarks else np.tile([0], (33, 2))
    lh = np.array([[res.x, res.y] for res in results.left_hand_landmarks.landmark]) if results.left_hand_landmarks else np.tile([0], (21, 2))
    rh = np.array([[res.x, res.y] for res in results.right_hand_landmarks.landmark]) if results.right_hand_landmarks else np.tile([0], (21, 2))
    return np.concatenate([pose, lh, rh])

In [28]:
extract_keypoints(results)

array([[0.5254308 , 0.49476978],
       [0.55823445, 0.42292893],
       [0.57605416, 0.42291734],
       [0.5918318 , 0.42355078],
       [0.49740019, 0.42255518],
       [0.47713074, 0.42223257],
       [0.45775756, 0.42170289],
       [0.61595953, 0.44830227],
       [0.43009824, 0.44464445],
       [0.55537087, 0.57297027],
       [0.48438868, 0.57045341],
       [0.72707999, 0.77820545],
       [0.30725774, 0.76373208],
       [0.91141617, 1.09877181],
       [0.17225239, 1.0729785 ],
       [0.83203924, 0.65800077],
       [0.07695107, 1.561324  ],
       [0.85637426, 0.4788157 ],
       [0.03064613, 1.66476631],
       [0.80121231, 0.46717927],
       [0.06015164, 1.6627934 ],
       [0.76437086, 0.53582078],
       [0.08912392, 1.62979555],
       [0.63620865, 1.57962072],
       [0.3476004 , 1.5641284 ],
       [0.60690308, 2.27096772],
       [0.33939591, 2.24724078],
       [0.60023874, 2.85648012],
       [0.34476224, 2.84007692],
       [0.60607922, 2.93607545],
       [0.

In [29]:
actions = labels

### Real Time Video

In [30]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.85

cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-64:] # last 32 frames
        
        if len(sequence) == 64:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0] == np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])

            if len(sentence) > 4: 
                sentence = sentence[-4:]

            # Viz probabilities
            image = prob_viz(res, actions, image)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

TypeError: in user code:

    File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\keras\engine\training.py", line 1621, in predict_function  *
        return step_function(self, iterator)
    File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\keras\engine\training.py", line 1611, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\keras\engine\training.py", line 1604, in run_step  **
        outputs = model.predict_step(data)
    File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\keras\engine\training.py", line 1572, in predict_step
        return self(x, training=False)
    File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None

    TypeError: Exception encountered when calling layer "hcn" (type HCN).
    
    in user code:
    
        File "C:\Users\lenovo\AppData\Local\Temp/ipykernel_10424/2781504093.py", line 85, in call  *
            x = self.conv1(inputs)
        File "d:\FYP\sign-recognition\ActionDetectionforSignLanguage-main\ActionDetectionforSignLanguage-main\venv\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
    
        TypeError: Exception encountered when calling layer "conv2d" (type Conv2D).
        
        Input 'filter' of 'Conv2D' Op has type float32 that does not match type int32 of argument 'input'.
        
        Call arguments received:
          • inputs=tf.Tensor(shape=(None, 64, 75, 2), dtype=int32)
    
    
    Call arguments received:
      • inputs=tf.Tensor(shape=(None, 64, 75, 2), dtype=int32)


### Offline Video

In [50]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.85

cap = cv2.VideoCapture('test_videos/test1.avi')

frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
size = (frame_width, frame_height)
writer = cv2.VideoWriter('test_videos/model1_test1.avi',
                         cv2.VideoWriter_fourcc(*'MJPG'),
                         10, size)

# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-64:] # last 32 frames
        
        if len(sequence) == 64:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0] == np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])

            if len(sentence) > 4: 
                sentence = sentence[-4:]

            # Viz probabilities
            image = prob_viz(res, actions, image)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
                
        # Save Video
        writer.write(image)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        
        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
        
    cap.release()
    writer.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

error: OpenCV(4.5.4) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\color.cpp:182: error: (-215:Assertion failed) !_src.empty() in function 'cv::cvtColor'
