In [None]:
# !wget "https://www.dropbox.com/scl/fi/acun1rm43ge7ljr5qo6p2/wlasl.zip?rlkey=4o90zt8bhip49m7nows9gcsc8&dl=0"
# !pip install gdown
# !gdown --id "1e7nvLl2sdQO9VZDxYbWAZL9UBfUGmONv"

In [None]:
# !mv wlasl.zip* /wlasl.zip
# !unzip -qq /wlasl.zip -d dw-data
# !mv dw-data/data data
# !rm -r dw-data
# !rm wlasl.zip*
# !rm -r sample_data
# !git clone -b feature/mediapipe https://github.com/sceredi/VAR-wlals-recognition.git ./code
# !mv ./code/* ./
# !rm -r code

In [None]:
# !pip install -r requirements.txt
# !pip install mediapipe==0.10.9
# !pip uninstall -y keras
# !pip install keras==2.15.0

In [None]:
import gc
import pandas as pd
import numpy as np

from handcrafted.app.dataset.dataset import Dataset 
from wlasl_mediapipe.app.mp.mp_video import MediapipeVideo

In [None]:
def split_data(word_number: int):
  """Load the train/val/test videos restricted to the first `word_number` glosses.

  The gloss list is read from column 1 of the tab-separated file
  ``data/wlasl_class_list.txt`` and truncated to `word_number` entries.
  Videos are selected from the dataset described by ``data/WLASL_v0.3.json``
  through ``Dataset.get_videos`` with a predicate on each video's ``split``
  and ``gloss``; every selected video is then wrapped in a ``MediapipeVideo``.

  Args:
    word_number: Number of glosses (classes) to keep.

  Returns:
    A tuple ``(train_videos, val_videos, test_videos, glosses)`` where the
    first three items are lists of ``MediapipeVideo`` objects and ``glosses``
    is the truncated list of gloss names.
  """
  wlasl = Dataset('data/WLASL_v0.3.json')
  class_table = pd.read_csv("data/wlasl_class_list.txt", sep="\t", header=None)
  glosses = class_table[1].tolist()[:word_number]

  def select(split_name):
    # Videos belonging to `split_name` whose gloss is among the kept ones.
    return wlasl.get_videos(
      lambda video: (video.split == split_name) and video.gloss in glosses
    )

  def wrap(videos):
    # Wrap every selected video in a MediapipeVideo (no plotting, no keypoint expansion).
    return [MediapipeVideo(item, plot=False, expand_keypoints=False) for item in videos]

  # Select all three splits first, then wrap them one split at a time.
  selected_train = select("train")
  selected_val = select("val")
  selected_test = select("test")

  train_videos = wrap(selected_train)
  print("Train videos loaded")
  val_videos = wrap(selected_val)
  print("Val videos loaded")
  test_videos = wrap(selected_test)
  print("Test videos loaded")
  return train_videos, val_videos, test_videos, glosses

In [None]:
# Number of glosses (classes) to keep; it is also used further down as the
# length of the one-hot label vectors and as the size of the model output.
word_number = 20
train_videos, val_videos, test_videos, glosses = split_data(word_number)

In [None]:
# Integer class label of every video: the position of its gloss inside `glosses`.
Y_train = [glosses.index(video.get_base_video().gloss) for video in train_videos]
Y_val = [glosses.index(video.get_base_video().gloss) for video in val_videos]
Y_test = [glosses.index(video.get_base_video().gloss) for video in test_videos]

In [None]:
def concatenate_data(video_list):
    """Build one feature vector per frame for every video.

    For each frame index (as many as there are entries in the video's
    ``sign_model.left_hand_list``), the left-hand, right-hand, pose and face
    embeddings of that frame are joined, in that order, with
    ``np.concatenate``.

    Args:
        video_list: Iterable of objects exposing a ``sign_model`` with
            ``left_hand_list``, ``lh_embedding``, ``rh_embedding``,
            ``pose_embedding`` and ``face_embedding``.

    Returns:
        A list with one entry per video; each entry is a list holding one
        concatenated array per frame.
    """
    per_video_features = []
    for clip in video_list:
        sign = clip.sign_model
        frame_count = len(sign.left_hand_list)
        per_video_features.append([
            np.concatenate((
                sign.lh_embedding[frame],
                sign.rh_embedding[frame],
                sign.pose_embedding[frame],
                sign.face_embedding[frame],
            ))
            for frame in range(frame_count)
        ])
    return per_video_features


In [None]:
# Build the per-frame feature vectors of each split. The video objects are
# deleted right after they are used and the garbage collector is invoked, so
# this cell cannot be re-run without re-running the loading cell first.
X_train_concatenated = concatenate_data(train_videos)
del train_videos
gc.collect()
X_val_concatenated = concatenate_data(val_videos)
del val_videos
gc.collect()
X_test_concatenated = concatenate_data(test_videos)
del test_videos
gc.collect()

In [None]:
# Shape of the feature vector of the first frame of the first training video.
print(X_train_concatenated[0][0].shape)

# Model definition
## Libraries used for machine learning

In [None]:

from tensorflow import keras
import tensorflow as tf

from tensorflow.keras.utils import to_categorical
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam

## Preparing the data

In [None]:
# Convert the nested per-video / per-frame feature lists of each split into a
# float32 RaggedTensor. Each Python list is deleted as soon as it has been
# converted, so this cell cannot be re-run on its own.
X_train_ragged = tf.ragged.constant(X_train_concatenated, dtype=tf.float32)
del X_train_concatenated
gc.collect()
X_val_ragged = tf.ragged.constant(X_val_concatenated, dtype=tf.float32)
del X_val_concatenated
gc.collect()
X_test_ragged = tf.ragged.constant(X_test_concatenated, dtype=tf.float32)
del X_test_concatenated
gc.collect()

In [None]:
# One-hot encode the integer labels into vectors of length `word_number`.
Y_train_one_hot = to_categorical(Y_train, num_classes=word_number)
Y_val_one_hot = to_categorical(Y_val, num_classes=word_number)
Y_test_one_hot = to_categorical(Y_test, num_classes=word_number)

In [None]:
# Model input shape: an unspecified number of frames (None) by the per-frame
# feature length, read from the first frame of the first training video.
input_shape = (None, len(X_train_ragged[0][0]))

In [None]:
# Number of videos per batch.
batch_size = 1

# Only the training split is shuffled; `shuffle` draws a new order every time
# the dataset is iterated (i.e. at every epoch) by default.
X_train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train_ragged, Y_train_one_hot))
    .shuffle(buffer_size=X_train_ragged.shape[0])
    .batch(batch_size)
)
del X_train_ragged
gc.collect()

# The validation and test splits keep their original order. Shuffling brings
# nothing for evaluation, and the test predictions are later compared
# position by position with `Y_test`, which is only meaningful when the
# dataset yields the samples in the same order as the label list.
X_val_dataset = tf.data.Dataset.from_tensor_slices((X_val_ragged, Y_val_one_hot)).batch(batch_size)
del X_val_ragged
gc.collect()
X_test_dataset = tf.data.Dataset.from_tensor_slices((X_test_ragged, Y_test_one_hot)).batch(batch_size)
del X_test_ragged
gc.collect()

In [None]:
# Print the full one-hot label matrix of the test split.
print(Y_test_one_hot)

### Defining the model

In [None]:
def build_rnn_gru(
  input_shape,
  gru_units_per_layer=[256, 256],
  output_count=2000,
  neuron_count_per_hidden_layer=[128,128],
  activation='relu',
):
  """Build a Sequential classifier made of stacked GRU layers.

  The network takes a ragged input of shape `input_shape`, followed by one GRU
  layer per entry of `gru_units_per_layer`, one Dense layer per entry of
  `neuron_count_per_hidden_layer`, and a final softmax Dense layer.

  Args:
    input_shape: Shape passed to the ragged `Input` layer.
    gru_units_per_layer: Units of each GRU layer, in order. Must contain at
      least one entry (the last one is accessed by index).
    output_count: Number of units of the softmax output layer.
    neuron_count_per_hidden_layer: Units of each hidden Dense layer; may be
      empty.
    activation: Activation used by the GRU and hidden Dense layers.

  Returns:
    The (uncompiled) `keras.Sequential` model.
  """
  # Settings shared by every GRU layer of the stack.
  gru_options = dict(activation=activation, dropout=0.2, recurrent_dropout=0.2)

  network = keras.Sequential()
  network.add(layers.Input(shape=input_shape, ragged=True))

  # Every GRU layer except the last one returns the whole sequence so that it
  # can feed the next recurrent layer.
  for width in gru_units_per_layer[:-1]:
    network.add(layers.GRU(units=width, return_sequences=True, **gru_options))

  # The last GRU layer keeps the default `return_sequences` setting.
  network.add(layers.GRU(units=gru_units_per_layer[-1], **gru_options))

  for width in neuron_count_per_hidden_layer:
    network.add(layers.Dense(width, activation=activation))

  network.add(layers.Dense(output_count, activation="softmax"))
  return network

In [None]:
def build_rnn_lstm(
    input_shape,
    lstm_units_per_layer=[256, 256],
    output_count=2000,
    neuron_count_per_hidden_layer=[128, 128],
    activation='relu',
):
    """Build a Sequential classifier made of stacked LSTM layers.

    The network takes a ragged input of shape `input_shape`, followed by one
    LSTM layer per entry of `lstm_units_per_layer`, one Dense layer per entry
    of `neuron_count_per_hidden_layer`, and a final softmax Dense layer.

    Args:
        input_shape: Shape passed to the ragged `Input` layer.
        lstm_units_per_layer: Units of each LSTM layer, in order. Must contain
            at least one entry (the last one is accessed by index).
        output_count: Number of units of the softmax output layer.
        neuron_count_per_hidden_layer: Units of each hidden Dense layer; may
            be empty.
        activation: Activation used by the LSTM and hidden Dense layers.

    Returns:
        The (uncompiled) `keras.Sequential` model.
    """
    network = keras.Sequential()
    network.add(layers.Input(shape=input_shape, ragged=True))

    # Every LSTM layer except the last one returns the whole sequence so that
    # it can feed the next recurrent layer.
    for width in lstm_units_per_layer[:-1]:
        network.add(layers.LSTM(units=width, return_sequences=True, activation=activation))

    # The last LSTM layer keeps the default `return_sequences` setting.
    closing_width = lstm_units_per_layer[-1]
    network.add(layers.LSTM(units=closing_width, activation=activation))

    for width in neuron_count_per_hidden_layer:
        network.add(layers.Dense(width, activation=activation))

    network.add(layers.Dense(output_count, activation="softmax"))
    return network


### Model creation

In [None]:
# Instantiate the GRU classifier: two GRU layers (64 then 32 units), no hidden
# Dense layers, and one softmax output unit per kept gloss.
model = build_rnn_gru(
    input_shape=input_shape,
    gru_units_per_layer=[64, 32],
    output_count=word_number,
    neuron_count_per_hidden_layer=[]
)

In [None]:
# model = build_rnn_lstm(
#     input_shape=input_shape,
#     lstm_units_per_layer=[256, 256, 256],
#     output_count=word_number,
#     neuron_count_per_hidden_layer=[128, 64, 32]
# )

In [None]:
# Compile for one-hot multi-class classification (categorical cross-entropy)
# with Adam at a learning rate of 0.001, tracking accuracy, then print the
# layer summary.
model.compile(optimizer=Adam(learning_rate=0.001), loss="categorical_crossentropy", metrics=["accuracy"])
model.summary()

In [None]:
# Render the model architecture, including tensor shapes, to a PNG file.
keras.utils.plot_model(model, "multi_input_and_output_model.png", show_shapes=True)

## Model fitting

In [None]:
# Upper bound on the number of training epochs, and the number of epochs
# without validation-loss improvement tolerated before stopping.
n_epochs = 100
patience = 10

# Stop once the validation loss has not improved for `patience` epochs and
# roll the model back to the weights of its best epoch.
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)

# `batch_size` is deliberately not passed here: the datasets are already
# batched with `.batch(batch_size)`, and Keras documents that `batch_size`
# must not be specified when the input is a `tf.data.Dataset`.
model.fit(
    X_train_dataset,
    validation_data=X_val_dataset,
    epochs=n_epochs,
    callbacks=[early_stop],
)

## Model predictions

In [None]:
# Evaluate the trained model on the test split and print the returned values
# (the loss followed by the compiled accuracy metric).
results = model.evaluate(X_test_dataset)
print("test loss, test acc:", results)

In [None]:
# Predict the class probabilities of the test split and compare the arg-max
# class of every prediction with the integer labels in `Y_test`.
# NOTE(review): this positional comparison is only valid if `X_test_dataset`
# yields its samples in the same order as `Y_test` (i.e. the test dataset is
# not shuffled) — confirm.
Y_pred = model.predict(X_test_dataset)
accuracy = np.mean(np.argmax(Y_pred, axis=1) == Y_test)
print(f"Accuracy: {accuracy}")

In [None]:
# Show the predicted class indices followed by the true integer labels.
print(np.argmax(Y_pred, axis=1))
print(Y_test)