In [4]:
import tensorflow as tf
import pandas as pd
import glob
import json
import numpy as np

In [5]:
class CFG:
    left_ROWS_per_frame = 21
    sequence_length = 600
    batch_size = 32

In [6]:
class CustomData(tf.keras.utils.Sequence):
    def __init__(self,df_path,num_frames=20,batch_size=8,shuffle=True,\
                 labels_path='sign_to_prediction_index_map.json'):
        self.df = pd.read_csv(df_path)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_frames = num_frames
        self.labels  = json.load(open('sign_to_prediction_index_map.json','r'))
        self.on_epoch_end()
        
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.df))
        if self.shuffle:
            np.random.shuffle(self.indexes)
    
    def __getitem__(self,index):
        batches = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        left_hand_input = np.zeros(shape=(self.batch_size,self.num_frames,CFG.left_ROWS_per_frame,3))
        right_hand_input = np.zeros(shape=(self.batch_size,self.num_frames,CFG.left_ROWS_per_frame,3))
        labels = []
        for i,row_val in enumerate(batches):
            row = self.df.iloc[row_val]
            left_hand,right_hand = self.load_video(row['path'])
            left_hand_input[i,:] = left_hand
            right_hand_input[i,:] = right_hand
            labels.append(self.labels[row['sign']])
        return [left_hand_input,right_hand_input],np.asarray(labels)
            
    def load_video(self,video_path):
        video_df = pd.read_parquet(video_path, engine='pyarrow')
        video_df.fillna(0,inplace=True)
        left_df = video_df[video_df.type=='left_hand']
        left_values = left_df[['x','y','z']].values
        left_hand_array = np.zeros(shape=(600*CFG.left_ROWS_per_frame,3),dtype=np.float32)
        left_hand_array[-len(left_values):] = left_values
        left_hand_array = left_hand_array.reshape(600,CFG.left_ROWS_per_frame,3)
        right_df = video_df[video_df.type=='right_hand']
        right_values = right_df[['x','y','z']].values
        right_hand_array = np.zeros(shape=(600*CFG.left_ROWS_per_frame,3),dtype=np.float32)
        right_hand_array[-len(right_values):] = right_values
        right_hand_array = right_hand_array.reshape(600,CFG.left_ROWS_per_frame,3)
        return left_hand_array, right_hand_array
    
    def __len__(self):
        return len(self.df)//self.batch_size
    

In [7]:
datagen = CustomData('train.csv',num_frames=600,batch_size=CFG.batch_size)

In [9]:
def conv1d_lstm_block(inputs, filters):
    vector = tf.keras.layers.ConvLSTM1D(filters=32, kernel_size=3)(inputs)
    #for f in filters:
    #    vector = tf.keras.layers.Conv1D(filters=f, kernel_size=8)(vector)
    #    vector = tf.keras.layers.MaxPooling1D()(vector)
    vector = tf.keras.layers.Dropout(0.3)(vector)
    return vector

def get_model():
    input1 = tf.keras.Input((CFG.sequence_length, CFG.left_ROWS_per_frame, 3), dtype=tf.float32)
    input2 = tf.keras.Input((CFG.sequence_length, CFG.left_ROWS_per_frame, 3), dtype=tf.float32)
    left_hand_vector = conv1d_lstm_block(input1, [64])
    right_hand_vector = conv1d_lstm_block(input2, [64])
    vector = tf.keras.layers.Concatenate(axis=1)([left_hand_vector, right_hand_vector])
    vector = tf.keras.layers.Flatten()(vector)
    output = tf.keras.layers.Dense(250, activation="softmax")(vector)
    model = tf.keras.Model(inputs=[input1,input2], outputs=output)
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[
            "accuracy",
        ]
    )
    return model

In [10]:
model = get_model()

In [11]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 600, 21, 3)  0           []                               
                                ]                                                                 
                                                                                                  
 input_2 (InputLayer)           [(None, 600, 21, 3)  0           []                               
                                ]                                                                 
                                                                                                  
 conv_lstm1d (ConvLSTM1D)       (None, 19, 32)       13568       ['input_1[0][0]']                
                                                                                              

In [None]:
file_name = "model.h5"
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        file_name, 
        save_best_only=True, 
        restore_best_weights=True, 
        monitor="val_accuracy",
        mode="max"
    ),
    tf.keras.callbacks.EarlyStopping(
        patience=5, 
        monitor="val_accuracy",
        mode="max"
    )
]
model.fit(datagen, epochs=30, callbacks=callbacks)
model = tf.keras.models.load_model(file_name)

Epoch 1/30

In [55]:
x,y = datagen[0]