In [None]:
print("\n... PIP INSTALLS STARTING ...\n")
!pip install tflite-runtime
import tflite_runtime.interpreter as tflite
print("\n... PIP INSTALLS COMPLETE ...\n")


print("\n... IMPORTS STARTING ...\n")
print("\n\tVERSION INFORMATION")

# Machine Learning and Data Science Imports (basics)
import tensorflow as tf; print(f"\t\t– TENSORFLOW VERSION: {tf.__version__}");
import tensorflow_io as tfio; print(f"\t\t– TENSORFLOW-IO VERSION: {tfio.__version__}");
import tensorflow_addons as tfa; print(f"\t\t– TENSORFLOW-ADDONS VERSION: {tfa.__version__}");
import pandas as pd; pd.options.mode.chained_assignment = None; pd.set_option('display.max_columns', None);
import numpy as np; print(f"\t\t– NUMPY VERSION: {np.__version__}");
import sklearn; print(f"\t\t– SKLEARN VERSION: {sklearn.__version__}");

# Built-In Imports (mostly don't worry about these)
from sklearn.model_selection import StratifiedKFold, StratifiedGroupKFold
from kaggle_datasets import KaggleDatasets
from collections import Counter
from datetime import datetime
from zipfile import ZipFile
from glob import glob
import Levenshtein
import warnings
import requests
import hashlib
import imageio
import IPython
import sklearn
import urllib
import zipfile
import pickle
import random
import shutil
import string
import json
import math
import time
import gzip
import ast
import sys
import io
import os
import gc
import re

# Visualization Imports (overkill)
from matplotlib.animation import FuncAnimation
from matplotlib.colors import ListedColormap
from matplotlib.patches import Rectangle
import matplotlib.patches as patches
import plotly.graph_objects as go
from IPython.display import HTML
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm; tqdm.pandas();
import plotly.express as px
import tifffile as tif
import seaborn as sns
from PIL import Image, ImageEnhance; Image.MAX_IMAGE_PIXELS = 5_000_000_000;
import matplotlib; print(f"\t\t– MATPLOTLIB VERSION: {matplotlib.__version__}");
from matplotlib import animation, rc; rc('animation', html='jshtml')
import plotly
import PIL
import cv2

import plotly.io as pio
print(pio.renderers)

def seed_it_all(seed=7):
    """Seed every random number source used by this notebook.

    Args:
        seed (int, optional):
            – Value written to PYTHONHASHSEED and handed to the `random`,
              NumPy and TensorFlow seeding functions
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Same order as before: Python stdlib, then NumPy, then TensorFlow
    for apply_seed in (random.seed, np.random.seed, tf.random.set_seed):
        apply_seed(seed)

seed_it_all()

print("\n\n... IMPORTS COMPLETE ...\n")

In [None]:
def flatten_l_o_l(nested_list):
    """Collapse one level of nesting from a list of iterables.

    Args:
        nested_list (list): 
            – Iterable whose elements are themselves iterable

    Returns:
        list: Every inner item, in encounter order, in one new list.
    """
    flattened = []
    for sublist in nested_list:
        flattened.extend(sublist)
    return flattened


def print_ln(symbol="-", line_len=110, newline_before=False, newline_after=False):
    """Write a horizontal rule to stdout, optionally padded by blank lines.

    Args:
        symbol (str, optional): 
            – Text that is repeated to form the rule
        line_len (int, optional): 
            – Number of times `symbol` is repeated
        newline_before (bool, optional): 
            – Emit an empty line before the rule
        newline_after (bool, optional): 
            – Emit an empty line after the rule
    """
    rows = []
    if newline_before:
        rows.append("")
    rows.append(symbol * line_len)
    if newline_after:
        rows.append("")
    for row in rows:
        print(row)
        
        
def read_json_file(file_path):
    """Load the JSON document stored at `file_path`.

    Args:
        file_path (str): Location of the JSON file.

    Returns:
        The Python object produced by `json.load` (a dict for the JSON
        object files used in this notebook).

    Raises:
        FileNotFoundError: If nothing exists at `file_path`.
        ValueError: If a ValueError (which includes JSON decode errors) is
            raised while opening or parsing the file.
    """
    try:
        with open(file_path, 'r') as handle:
            return json.load(handle)
    except FileNotFoundError:
        # Re-raise with a message that names the offending path
        raise FileNotFoundError(f"File not found: {file_path}")
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so it lands here
        raise ValueError(f"Invalid JSON data in file: {file_path}")
        
def get_sign_df(pq_path, invert_y=True):
    """Read one parquet file into a DataFrame, optionally negating `y`.

    Args:
        pq_path (str): Path handed to `pd.read_parquet`.
        invert_y (bool, optional): When True the "y" column is multiplied
            by -1 before the frame is returned.

    Returns:
        pd.DataFrame: The loaded (and possibly y-flipped) frame.
    """
    frame = pd.read_parquet(pq_path)
    if not invert_y:
        return frame

    # y value is inverted (Thanks @danielpeshkov)
    frame["y"] = frame["y"] * -1
    return frame

ROWS_PER_FRAME = 543  # number of landmarks per frame
def load_relevant_data_subset(pq_path):
    """Read the x/y/z columns of a parquet file as a per-frame float32 array.

    Args:
        pq_path (str): Path handed to `pd.read_parquet`.

    Returns:
        np.ndarray: float32 array of shape (n_frames, ROWS_PER_FRAME, 3),
        where n_frames is the row count divided by ROWS_PER_FRAME.
    """
    coord_cols = ['x', 'y', 'z']
    coords = pd.read_parquet(pq_path, columns=coord_cols).values
    frame_count = int(len(coords) / ROWS_PER_FRAME)
    per_frame = coords.reshape(frame_count, ROWS_PER_FRAME, len(coord_cols))
    return per_frame.astype(np.float32)

In [None]:
# Define the path to the root data directory
DATA_DIR         = "/kaggle/input/asl-signs"
EXTEND_TRAIN_DIR = "/kaggle/input/gislr-extended-train-dataframe" 
NP_FILE_DIR      = "/kaggle/input/isolated-sign-language-aggregation-preparation"

print("\n... BASIC DATA SETUP STARTING ...\n")
print("\n\n... LOAD TRAIN DATAFRAME FROM CSV FILE ...\n")

# Prefer the pre-computed extended dataframe when it is available; otherwise
# fall back to the competition CSV and make its relative paths absolute.
LOAD_EXTENDED = True
if LOAD_EXTENDED and os.path.isfile(os.path.join(EXTEND_TRAIN_DIR, "extended_train.csv")):
    train_df = pd.read_csv(os.path.join(EXTEND_TRAIN_DIR, "extended_train.csv"))
else:
    train_df = pd.read_csv(os.path.join(DATA_DIR, "train.csv"))
    train_df["path"] = DATA_DIR+"/"+train_df["path"]
display(train_df)

print("\n\n... LOAD SIGN TO PREDICTION INDEX MAP FROM JSON FILE ...\n")
# Parse the JSON file once and derive both lookup directions from it
# (previously the same file was opened and parsed twice).
_sign_to_idx = read_json_file(os.path.join(DATA_DIR, "sign_to_prediction_index_map.json"))
s2p_map = {k.lower():v for k,v in _sign_to_idx.items()}  # lower-cased sign -> index
p2s_map = {v:k for k,v in _sign_to_idx.items()}          # index -> sign as spelled in the file
encoder = lambda x: s2p_map.get(x.lower())
decoder = lambda x: p2s_map.get(x)

DEMO_ROW = 283
print(f"\n\n... DEMO SIGN/EVENT DATAFRAME FOR ROW {DEMO_ROW} - SIGN={train_df.iloc[DEMO_ROW]['sign']} ...\n")
demo_sign_df = get_sign_df(train_df.iloc[DEMO_ROW]["path"])
display(demo_sign_df)

# Landmark IDs start at 0 for each respective type and count up
FRAME_TYPE_ORDER_DETAIL = demo_sign_df.groupby("frame")["type"].apply(list).values[0]
FRAME_TYPE_ORDER = sorted(set(FRAME_TYPE_ORDER_DETAIL))
print(FRAME_TYPE_ORDER)

# https://www.kaggle.com/competitions/asl-signs/discussion/391812#2168354
# NOTE: the four lip contours share some indices (e.g. 291, 78, 308), so the
# concatenated `lips` list contains repeats; get_model's default lips length
# of 43 matches this list as written.
lipsUpperOuter = [61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291]
lipsLowerOuter = [146, 91, 181, 84, 17, 314, 405, 321, 375, 291]
lipsUpperInner = [78, 191, 80, 81, 82, 13, 312, 311, 310, 415, 308]
lipsLowerInner = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308]
lips = lipsUpperOuter + lipsLowerOuter + lipsUpperInner + lipsLowerInner
FRAME_TYPE_IDX_MAP = {
    "lips"       : np.array(lips),
    "left_hand"  : np.arange(468, 489),
    "pose"       : np.arange(489, 522),
    "right_hand" : np.arange(522, 543),
}

for k,v in FRAME_TYPE_IDX_MAP.items():
    print(k, len(v))

In [None]:
# Load the pre-aggregated feature matrix and its labels from NP_FILE_DIR.
# Labels are stored as uint8 (range 0..255) — presumably 250 classes, confirm.
all_x = np.load(os.path.join(NP_FILE_DIR, "feature_data.npy")).astype(np.float32)
all_y = np.load(os.path.join(NP_FILE_DIR, "feature_labels.npy")).astype(np.uint8)

# add nan back in not to mess up means/std
# (every exact 0.0 is treated as "missing"; NOTE(review): a genuine 0.0 feature
#  value would also be turned into NaN here)
all_x = np.where(all_x==0.0, np.nan, all_x)

# Get mean and std ignoring nans (per feature column, kept 2-D via keepdims)
all_mean = np.nanmean(all_x, keepdims=True, axis=0)
all_std = np.nanstd(all_x, keepdims=True, axis=0)

# Standardize around 0
all_x = (all_x-all_mean)/all_std

# Back to 0s (np.nan_to_num replaces NaN with 0)
all_x = np.nan_to_num(all_x)

# K_FOLDS is set to the number of participants further down, so the earlier
# "7 folds / 3 participants per validation fold" plan no longer matches the code;
# presumably each fold now validates on a single participant — confirm.
N_PARTICIPANTS = train_df.participant_id.nunique()
RH_SIGNERS = [26734, 28656, 25571, 62590, 29302, 
                       49445, 53618, 18796,  4718,  2044, 
                       37779, 30680]

# We are including 37055 in LH Signer
LH_SIGNERS  = [16069, 32319, 36257, 22343, 27610, 61333, 34503, 55372, 37055]


# One fold per participant
K_FOLDS = N_PARTICIPANTS
def get_folds(df, k_folds, force_lh=True, lh_signers=LH_SIGNERS[:-1]):
    """Build a stratified, participant-grouped K-fold split of `df`.

    Rows are stratified on `df.sign` and grouped on `df.participant_id`, so a
    participant never appears in both the train and validation part of a fold.

    Args:
        df (pd.DataFrame): Frame with `sign` and `participant_id` columns.
        k_folds (int): Number of folds to create.
        force_lh (bool, optional): When True, keep re-shuffling until every
            validation fold contains at least one participant from `lh_signers`.
        lh_signers (list, optional): Participant ids counted as left-handed.

    Returns:
        dict: fold index -> {"train": positional indices, "val": positional indices}.

    Raises:
        ValueError: If `force_lh` is requested but there are fewer left-handed
            participants in `df` than folds. Each participant lands in exactly one
            validation fold, so the constraint could never be met and the retry
            loop would spin forever.
    """
    lh_set = set(lh_signers)

    if force_lh:
        n_lh_present = len(lh_set.intersection(df["participant_id"].unique()))
        if k_folds > n_lh_present:
            raise ValueError(
                f"force_lh=True needs at least one left-handed signer per fold, but only "
                f"{n_lh_present} are present for k_folds={k_folds}"
            )

    while True:
        # Honour the k_folds argument (the global K_FOLDS was used here before)
        sgkf = StratifiedGroupKFold(n_splits=k_folds, shuffle=True)
        _fold_ds_idx_map = {
            i: {"train": t_idxs, "val": v_idxs}
            for i, (t_idxs, v_idxs) in enumerate(sgkf.split(df.index, df.sign, df.participant_id))
        }

        if not force_lh:
            return _fold_ds_idx_map

        # Ensure at least one left hander in every val group
        every_val_has_lh = all(
            len(lh_set.intersection(df.iloc[_idxs["val"]]["participant_id"].unique())) >= 1
            for _idxs in _fold_ds_idx_map.values()
        )
        if every_val_has_lh:
            return _fold_ds_idx_map

        # Progress marker for each rejected shuffle
        print(".", end="")
    
# Build the folds without the left-hander constraint
fold_ds_idx_map = get_folds(train_df, K_FOLDS, force_lh=False)

# When there is one fold per participant, record the participant id of the first
# validation row of each fold. The name is always bound (None otherwise) because
# later cells pass it to plot_training_data unconditionally.
if K_FOLDS==N_PARTICIPANTS:
    fold_2_val_pid_map = {k:train_df.iloc[v["val"]].participant_id.values[0] for k,v in fold_ds_idx_map.items()}
    print(fold_2_val_pid_map)
else:
    fold_2_val_pid_map = None
print(" APPROPRIATE KFOLD SPLIT FOUND!\n")

In [None]:
class EpochPrintCB(tf.keras.callbacks.Callback):
    """Keras callback that prints a one-line training summary every N epochs.

    The line has the form
    ``|| Epoch e | lr: x || loss:.. | acc:.. [| m:..] || val_loss:.. | val_acc:.. [| val_m:..] ||``
    where each extra metric ``m`` is appended to both the train and the val group.

    Args:
        n_epochs_btwn_prints (int, optional): Print when ``epoch % n == 0``.
        extra_metrics_to_incl (list | str | iterable | None, optional): Names of
            additional entries in ``logs`` to report (their ``val_`` counterparts
            are reported too). A single string is treated as one metric name.
    """

    def __init__(self, n_epochs_btwn_prints=5, extra_metrics_to_incl=None):
        # Let the Keras base class initialise its own state
        super().__init__()
        self.n_epochs_btwn_prints = n_epochs_btwn_prints

        if extra_metrics_to_incl is None or isinstance(extra_metrics_to_incl, list):
            self.extra_metrics_to_incl = extra_metrics_to_incl
        elif isinstance(extra_metrics_to_incl, str):
            # list("t3_acc") would split the name into characters
            self.extra_metrics_to_incl = [extra_metrics_to_incl]
        else:
            self.extra_metrics_to_incl = list(extra_metrics_to_incl)

    def on_epoch_end(self, epoch, logs=None):
        """Print the summary line for `epoch` if it falls on the print interval."""
        if epoch % self.n_epochs_btwn_prints != 0:
            return
        logs = logs or {}
        extras = self.extra_metrics_to_incl or []

        header = f" Epoch {epoch:>3} | lr: {self.model.optimizer.lr.numpy():10.7f} "
        train_names = ["loss", "acc", *extras]
        train_group = " | ".join(f"{name}:{logs[name]:8.5f}" for name in train_names)
        val_group = " | ".join(f"val_{name}:{logs['val_' + name]:8.5f}" for name in train_names)

        print(f"||{header}|| {train_group} || {val_group} ||")

def fc_block(inputs, output_channels, dropout=0.2, gaussian_noise=0.01, _act="relu", do_bn=True):
    """Dense -> (BatchNorm) -> Activation -> Dropout -> GaussianNoise.

    Args:
        inputs: Tensor fed to the Dense layer.
        output_channels: Unit count of the Dense layer.
        dropout (float, optional): Rate for the Dropout layer.
        gaussian_noise (float, optional): Stddev for the GaussianNoise layer.
        _act (str, optional): Activation applied after the (optional) batch norm.
        do_bn (bool, optional): Insert BatchNormalization after the Dense layer.

    Returns:
        The output tensor of the final GaussianNoise layer.
    """
    layers = tf.keras.layers
    hidden = layers.Dense(output_channels)(inputs)
    if do_bn:
        hidden = layers.BatchNormalization()(hidden)
    hidden = layers.Activation(_act)(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    return layers.GaussianNoise(gaussian_noise)(hidden)

def get_model(n_labels=250, init_fc=512, n_blocks=2, _dropout_1=0.2, _dropout_2=0.6, _fc_step_rate=2, n_ax=2, n_feats=2,
              types_to_use=("lips", "left_hand", "pose", "right_hand"), do_L1=False, do_L2=False, _gaussian_noise=0.1, _do_bn=True,
              _per_block_gaussian_noise=0.01, type_frame_len={"lips":43, "left_hand":21, "pose":33, "right_hand":21}):
    """Assemble the fully connected classifier used for every fold.

    The input width is the sum over `types_to_use` of
    ``type_frame_len[type] * n_ax * n_feats``. Input noise is followed by
    `n_blocks` `fc_block`s whose width is ``init_fc // _fc_step_rate**i``; the
    last block uses `_dropout_2`, all earlier ones `_dropout_1`. A softmax Dense
    layer with `n_labels` units closes the network.

    `do_L1` and `do_L2` are accepted but not used by this function.

    Returns:
        tf.keras.Model: The (uncompiled) model.
    """
    flat_frame_len = 0
    for type_name in types_to_use:
        flat_frame_len += type_frame_len[type_name]*n_ax*n_feats

    net_in = tf.keras.layers.Input(shape=(flat_frame_len,))
    hidden = tf.keras.layers.GaussianNoise(_gaussian_noise)(net_in)

    # Stack the fully connected blocks
    last_block = n_blocks-1
    for block_idx in range(n_blocks):
        block_dropout = _dropout_2 if block_idx == last_block else _dropout_1
        hidden = fc_block(
            hidden,
            output_channels=init_fc//(_fc_step_rate**block_idx),
            dropout=block_dropout,
            gaussian_noise=_per_block_gaussian_noise,
            do_bn=_do_bn,
        )

    # Classification head
    probs = tf.keras.layers.Dense(n_labels, activation="softmax")(hidden)

    return tf.keras.models.Model(inputs=net_in, outputs=probs)

# Training hyper-parameters. BATCH_SIZE, LR, N_EPOCHS, CB_MONITOR, LOSS_FN and
# METRICS are consumed by the training loop; the remaining values are forwarded
# to get_model through model_kwargs below.
BATCH_SIZE   = 1024
LR           = 0.0004
DO_BN        = True
DROPOUT_1    = 0.25
DROPOUT_2    = 0.5
GAUSS_NOISE  = 0.25
PER_BLOCK_GN = 0.05
N_EPOCHS     = 400
INIT_FC      = 384
N_BLOCKS     = 3
# Block i gets INIT_FC // FC_STEP_RATE**i units (see get_model)
FC_STEP_RATE = 1.2
CB_MONITOR   = "val_acc"
LOSS_FN      = "sparse_categorical_crossentropy"
# "acc" comes first, so index 1 of model.evaluate(...) is the accuracy
METRICS      = ["acc", tf.keras.metrics.SparseTopKCategoricalAccuracy(k=3, name='t3_acc')]

# Keyword arguments for get_model; anything not listed keeps get_model's default
model_kwargs =dict(
    init_fc=INIT_FC, 
    n_blocks=N_BLOCKS, 
    _dropout_1=DROPOUT_1, 
    _dropout_2=DROPOUT_2, 
    _fc_step_rate=FC_STEP_RATE,
    _do_bn=DO_BN,
    _gaussian_noise=GAUSS_NOISE,
    _per_block_gaussian_noise=PER_BLOCK_GN,
)

In [None]:
histories, MODEL_DIR = [], "/kaggle/working/models"
os.makedirs(MODEL_DIR, exist_ok=True)

# Train one model per fold and save it as
#   islr_model__fold_<NN>__<val accuracy * 1e5>
# Later cells parse both the fold number and the accuracy back out of that name.
for fold_num, fold_idxs in fold_ds_idx_map.items():
    print(f"\n\n... STARTING TRAINING FOR FOLD #{fold_num+1} ...\n")
    
    # Get the dataset
    val_x, val_y     = all_x[fold_idxs["val"]],   all_y[fold_idxs["val"]]
    train_x, train_y = all_x[fold_idxs["train"]], all_y[fold_idxs["train"]]

    # Initialize optimizer
    optimizer = tf.keras.optimizers.Adam(LR)
    
    # Initialize CB list
    _pct_to_drop = 2  # ReduceLROnPlateau shrinks the LR by this many percent
    cb_list = [
        tf.keras.callbacks.EarlyStopping(patience=40, restore_best_weights=True, verbose=1, monitor=CB_MONITOR),
        tf.keras.callbacks.ReduceLROnPlateau(patience=2, factor=(1-0.01*_pct_to_drop), verbose=0, monitor=CB_MONITOR),
        EpochPrintCB(extra_metrics_to_incl=["t3_acc",])
    ]
            
    # Initialize model
    model = get_model(**model_kwargs)
    model.compile(optimizer, loss=LOSS_FN, metrics=METRICS)
    
    # See the structure and number of parameters
    if fold_num==0: print(f"\n\nFIRST FOLD... PRINTING MODEL SUMMARY:\n"); model.summary()
        
    # Fit!
    print("\n\n... BEGINNING MODEL TRAINING ...\n")
    histories.append(model.fit(train_x, train_y, validation_data=(val_x, val_y), epochs=N_EPOCHS, callbacks=cb_list, batch_size=BATCH_SIZE, verbose=0))
    
    # Save
    # Index 1 of evaluate() is the first compiled metric ("acc").
    val_acc = model.evaluate(val_x, val_y, verbose=0)[1]
    acc_str = f"{val_acc:.5f}"
    # Turn "0.71234" into "71234" and "1.00000" into "100000". The old code ran
    # .replace("0.", "") over the whole path, which could touch other parts of the
    # path and left an unparsable "1.00000" suffix for a perfect score.
    acc_tag = acc_str[2:] if acc_str.startswith("0.") else acc_str.replace(".", "")
    model.save(os.path.join(MODEL_DIR, f"islr_model__fold_{fold_num+1:>02}__{acc_tag}"))
    
    # Cleanup 
    del model, train_x, train_y, val_x, val_y; gc.collect(); gc.collect();

In [None]:
def plot_training_data(histories, fold_2_pid_map=None, _cmap="tab20", skip_epochs=10, max_epochs=400):
    """
    Plots every logged metric for all folds, one subplot per metric, using Matplotlib.

    Args:
    - histories: A non-empty list of history objects returned by the `fit` method of a Keras model.
      All of them are expected to hold the same metrics in the same order.
    - fold_2_pid_map: Optional mapping fold index -> participant id, added to the legend labels.
    - _cmap: Name of the Matplotlib colormap the fold colours are sampled from.
    - skip_epochs: Left x-axis limit (hides the noisy first epochs).
    - max_epochs: Right x-axis limit.

    Raises:
    - ValueError: If `histories` is empty.
    """
    if len(histories) == 0:
        raise ValueError("histories must contain at least one History object")

    cmap = plt.get_cmap(_cmap)
    clrs = [cmap(x) for x in np.linspace(0, 1, len(histories[:20]))]
    if len(clrs)<21: clrs=[(0.0, 0.0, 0.502, 1.0),]+clrs

    n_plots = len(histories[0].history)
    # squeeze=False keeps `axs` indexable when there is a single metric, and the
    # max(1, ...) avoids a zero-height figure in that case.
    fig, axs = plt.subplots(n_plots, 1, figsize=(20, 20*max(1, n_plots//2)), squeeze=False)
    axs = axs[:, 0]
    min_vals, max_vals = [1e5,]*n_plots, [0.0,]*n_plots

    # plot every metric for each fold
    for i, fold_history in enumerate(histories):
        # Re-use colours instead of raising IndexError when there are more folds than colours
        fold_clr = clrs[i % len(clrs)]
        if fold_2_pid_map is None:
            fold_label = f'Fold {i+1}'
        else:
            fold_label = f'Fold {i+1} - Participant {fold_2_pid_map.get(i)}'
        for j, (_mkey, _mval) in enumerate(fold_history.history.items()):
            axs[j].plot(_mval, label=fold_label, c=fold_clr)
            max_vals[j] = max(max_vals[j], max(_mval))
            min_vals[j] = min(min_vals[j], min(_mval))

    for j, _mkey in enumerate(histories[0].history):
        # set overall title and adjust spacing
        axs[j].set_title(f'{_mkey.title()} for {len(histories)} Folds')
        axs[j].set_xlabel('Epochs')
        axs[j].set_ylabel(f'{_mkey.title()}')
        axs[j].set_xlim([skip_epochs, max_epochs])
        axs[j].set_ylim([min_vals[j]*0.75, max_vals[j]*1.05])
        axs[j].grid(True)
        axs[j].legend()

    fig.tight_layout(pad=3.0)
    plt.show()
    
plot_training_data(histories, fold_2_val_pid_map)

In [None]:
def evaluate_model(model, data_x, data_y, decoder):
    """
    Evaluates the given model on the given data and prints predictions next to
    the ground truth for the last ten and the first ten samples of that data.
    
    Args:
    - model: The trained (compiled) model to evaluate.
    - data_x: The input data to evaluate the model on.
    - data_y: The target data to evaluate the model on.
    - decoder: A function to decode a class index into readable text.
    """
    # evaluate() returns a scalar (loss only) or [loss, metric_1, metric_2, ...];
    # only the loss and the first metric are reported, so extra metrics no longer
    # break the unpacking.
    results = np.atleast_1d(model.evaluate(data_x, data_y))
    if len(results) > 1:
        print(f"Model Loss: {results[0]:.4f}, Model Accuracy: {results[1]:.4f}\n")
    else:
        print(f"Model Loss: {results[0]:.4f}\n")

    def _print_predictions(samples_x, samples_y):
        # One batched predict call instead of one call per sample
        if len(samples_x) == 0:
            return
        preds = np.argmax(model.predict(np.asarray(samples_x), verbose=0), axis=-1)
        for pred, y in zip(preds, samples_y):
            print(f"PRED: {decoder(pred):<20} – GT: {decoder(y)}")

    # Both slices come from the same `data_x`/`data_y`; the headings used to claim
    # "Training Data" and "Validation Data" respectively.
    print("Predictions and Ground Truth for Last Few Samples in Given Data:\n")
    _print_predictions(data_x[-10:], data_y[-10:])

    print("\nPredictions and Ground Truth for First Few Samples in Given Data:\n")
    _print_predictions(data_x[:10], data_y[:10])
        

def compute_evaluation_metrics(model, data_x, data_y, decoder, plt_cm=False, verbose=True, num_classes=None):
    """
    Computes one-vs-rest accuracy, precision, recall and F1 for every class from
    the confusion matrix of the model's predictions on the given data.
    
    Args:
    - model: The trained model to evaluate (only `predict` is used).
    - data_x: The input data to evaluate the model on.
    - data_y: The integer class labels for `data_x`.
    - decoder: A function to decode a class index into readable text (used when printing).
    - plt_cm: Accepted for compatibility; not used by this function.
    - verbose: Forwarded to `model.predict`; when truthy the per-class results are
      also printed and returned sorted by descending F1 score.
    - num_classes: Size of the confusion matrix. Defaults to the largest label or
      prediction seen plus one.

    Returns:
    - dict mapping class index -> {"accuracy", "precision", "recall", "f1_score"}.
      A metric whose denominator is zero (e.g. precision of a class that was never
      predicted) is NaN.
    """
    # Predicted and true classes as plain integer vectors
    y_prob = model.predict(data_x, batch_size=1024, verbose=verbose)
    y_pred = np.argmax(y_prob, axis=1).astype(np.int64)
    y_true = np.asarray(data_y).astype(np.int64).ravel()

    if num_classes is None:
        num_classes = int(max(y_true.max(initial=-1), y_pred.max(initial=-1))) + 1

    # Rows are true classes, columns are predicted classes
    confusion_mtx = np.zeros((num_classes, num_classes), dtype=np.int64)
    np.add.at(confusion_mtx, (y_true, y_pred), 1)

    total = confusion_mtx.sum()
    tp = np.diag(confusion_mtx).astype(np.float64)
    fp = confusion_mtx.sum(axis=0) - tp
    fn = confusion_mtx.sum(axis=1) - tp
    # True negatives are everything that is neither in the class row nor in the
    # class column (the previous formula used only the row sum and wrong signs).
    tn = total - tp - fp - fn

    with np.errstate(divide="ignore", invalid="ignore"):
        accuracy = (tp + tn) / (tp + fp + tn + fn)
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1_score = 2 * (precision * recall) / (precision + recall)

    classwise_performance = {
        i: dict(
            accuracy=accuracy[i],
            precision=precision[i],
            recall=recall[i],
            f1_score=f1_score[i],
        )
        for i in range(num_classes)
    }

    # Sort the classwise performance by f1_score and print the results
    if verbose:
        classwise_performance = dict(sorted(classwise_performance.items(), key=lambda x: x[1]["f1_score"], reverse=True))
        print("\n\n... OOF CLASSWISE CONFUSION MATRIX... \n")
        for i, perf in classwise_performance.items():
            print(f"Class {i:<3}  ({decoder(i):^13})  -->  Accuracy: {perf['accuracy']:.2f}, Precision: {perf['precision']:.2f}, Recall: {perf['recall']:.2f}, F1 Score: {perf['f1_score']:.2f}")
    return classwise_performance

# Saved models in fold order; the fold number is the third "_"-separated field from the end
MODEL_PATHS = sorted(glob(os.path.join(MODEL_DIR, "*")), key=lambda x: int(x.rsplit("_")[-3]))

# Score every fold model on its own validation rows (out-of-fold)
model_perf_dfs = []
for mpath in MODEL_PATHS:
    fold_model = tf.keras.models.load_model(mpath, compile=False)
    val_idxs = fold_ds_idx_map[int(mpath.rsplit("_")[-3])-1]["val"]
    c_perf = compute_evaluation_metrics(
        fold_model,
        all_x[val_idxs],
        all_y[val_idxs],
        decoder=decoder, verbose=False
    )
    # One row per class (ascending class index), one column per metric
    c_perf_by_class = dict(sorted(c_perf.items(), key=lambda x:x[0]))
    model_perf_dfs.append(pd.DataFrame(c_perf_by_class).T)

# From here on MODEL_PATHS is ordered by the accuracy encoded in the file name, best first
MODEL_PATHS = sorted(MODEL_PATHS, reverse=True, key=lambda x: int(x.rsplit("__", 1)[-1]))

# Prefix the metric columns with the model's position in fold order
for i, mdf in enumerate(model_perf_dfs):
    mdf.columns = [f"model_{i}_oof_"+_c for _c in mdf.columns]

oof_perf_df = pd.concat(model_perf_dfs, axis=1)
oof_perf_df = oof_perf_df.reset_index().rename(columns={"index":"class_idx"})
oof_perf_df.insert(0, "class_str", oof_perf_df['class_idx'].apply(decoder))
display(oof_perf_df)

In [None]:
### PLOT ALL CLASSES:
# fig = px.bar(oof_perf_df, [_c for _c in oof_perf_df if "f1_score" in _c], "class_str", barmode="group", orientation="h", height=10000)
# fig.show()

## Row positions of the classes to highlight (worst first, then best)
worst_5 = [150, 12, 41, 224, 97]
best_5  = [2, 162, 89, 59, 182]

## PLOT WORST 5 CLASSES, THEN BEST 5 CLASSES:
for _title, _rows in (
    ("<b>OOF PERFORMANCE ON WORST 5 CLASSES</b>", worst_5),
    ("<b>OOF PERFORMANCE ON BEST 5 CLASSES</b>", best_5),
):
    fig = px.bar(
        oof_perf_df.iloc[_rows],
        "class_str",
        [_c for _c in oof_perf_df if "f1_score" in _c],
        barmode="group",
        title=_title,
        labels={"value":"<b>F1 Score</b>", "variable":"<b>Fold</b>"},
    )
    fig.update_layout(showlegend=False)
    fig.show()

In [None]:
# Map each fold's validation participant to the accuracy encoded in its model file name
signer_to_oof_acc = {}
for _mpath in MODEL_PATHS:
    _fold_idx = int(_mpath.rsplit("_")[-3])-1
    signer_to_oof_acc[fold_2_val_pid_map[_fold_idx]] = float(_mpath.rsplit("__" , 1)[-1])/100000

for signer, oof_acc in signer_to_oof_acc.items():
    _hand = 'LH' if signer in LH_SIGNERS else 'RH'
    print(f"SINGER: {signer:>5} ({_hand}) ––> {oof_acc}")

In [None]:
def dumb_tf_mean(x, axis=None):
    """Mean of `x` along `axis` (all elements when `axis` is None)."""
    averaged = tf.reduce_mean(x, axis=axis)
    return averaged

def dumb_tf_std(x, axis=None):
    """Standard deviation of `x` along `axis`, computed with ddof=1 in float32."""
    tnp = tf.experimental.numpy
    return tnp.sqrt(tnp.var(x, axis=axis, dtype=tf.float32, ddof=1))

class PrepInputs(tf.keras.layers.Layer):
    """Turn a (frames, 543, 3) landmark tensor into one standardized feature row.

    For each landmark group (lips, left hand, pose, right hand) the x/y
    coordinates are flattened per frame, frames with any NaN in that group are
    dropped, and the per-feature mean and std over the remaining frames are
    taken. All means followed by all stds form a single row, NaNs become 0, and
    non-zero entries are standardized with the supplied distribution statistics.
    """

    def __init__(self, lh_idx_range=(468, 489), pose_idx_range=(489, 522), rh_idx_range=(522, 543), distribution_mean=all_mean, distribution_std=all_std):
        super().__init__()
        # Lip landmark indices (the list contains a few repeated indices)
        self.lips = tf.constant([61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291, 146, 91, 181, 84, 17, 314, 405, 321, 375, 291, 78, 191, 80, 81, 82, 13, 312, 311, 310, 415, 308, 78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308])
        self.idx_ranges = [lh_idx_range, pose_idx_range, rh_idx_range]
        # Two coordinates (x, y) per landmark in every group
        range_feat_lens = [2*(_range[1]-_range[0]) for _range in self.idx_ranges]
        self.flat_feat_lens = [2*self.lips.shape[0],]+range_feat_lens
        self.distribution_mean = tf.constant(distribution_mean, dtype=tf.float32)
        self.distribution_std  = tf.constant(distribution_std, dtype=tf.float32)

    def call(self, x_in):
        # Split the landmarks into the 4 groups, keeping x and y only
        lip_xy = tf.gather(x_in[..., :2], self.lips, axis=1)
        range_xy = [x_in[:, _range[0]:_range[1], :2] for _range in self.idx_ranges]
        groups = [lip_xy,]+range_xy

        # One flat row per frame for every group
        groups = [tf.reshape(_grp, (-1, _flat_len)) for _grp, _flat_len in zip(groups, self.flat_feat_lens)]

        # Keep only the frames that have no NaN within the group
        groups = [
            tf.boolean_mask(_grp, tf.reduce_all(tf.logical_not(tf.math.is_nan(_grp)), axis=1), axis=0)
            for _grp in groups
        ]

        # Per-feature statistics over the surviving frames
        group_means = [dumb_tf_mean(_grp, axis=0) for _grp in groups]
        group_stds  = [dumb_tf_std(_grp,  axis=0) for _grp in groups]

        features = tf.concat([*group_means, *group_stds], axis=0)
        features = tf.where(tf.math.is_nan(features), tf.zeros_like(features), features)
        features = tf.expand_dims(features, axis=0)
        return self.standardize_tensor(features)

    def standardize_tensor(self, tensor):
        """Standardize non-zero entries; entries equal to 0 stay 0."""
        standardized = (tensor-self.distribution_mean)/self.distribution_std
        return tf.where(tensor!=0, standardized, tf.zeros_like(tensor))
    
p_demo = PrepInputs()(load_relevant_data_subset(train_df.path[0]))
print(p_demo.shape)

In [None]:
class ISLRModel(tf.keras.Model):
    """
    Ensemble wrapper around the per-fold ISLR models.

    Every fold model is applied to the same inputs and the results are averaged.
    Unlike TFLiteModel, no preprocessing layer is applied here.

    NOTE(review): this class overrides `__call__` rather than `call` — confirm
    that bypassing the usual Keras call wrapper is intended.
    """

    def __init__(self, islr_fold_models):
        """
        Stores the fold models.

        Args:
            islr_fold_models: Mapping whose values are the fold models and whose
                keys can be converted with `float(key)` (divided by 100,000 to
                build `model_weights`).
        """
        super(ISLRModel, self).__init__()

        # Keep the fold models in mapping order
        self.islr_fold_models  = list(islr_fold_models.values())
        # One row per fold, the fold's score repeated 250 times.
        # Not referenced anywhere else in this class.
        self.model_weights = tf.repeat(tf.expand_dims(tf.constant([float(k)/100_000. for k in islr_fold_models.keys()], dtype=tf.float32), axis=-1), 250, axis=-1)
    
    def __call__(self, inputs, training=None):
        """
        Applies every fold model to the inputs and averages the outputs.

        Args:
            inputs: Input tensor passed unchanged to each fold model.
            training: Forwarded to each fold model.

        Returns:
            A tensor holding the mean over axis 0 of the concatenated fold
            outputs (with keepdims=True). Because the outputs are concatenated
            along axis 0 before the mean, the average also runs over the batch
            dimension when more than one sample is passed.
        """
        # Computed but not used below
        batch_size = tf.shape(inputs)[0]
        outputs    = tf.concat([_model(inputs, training=training) for _model in self.islr_fold_models], axis=0)
        outputs = tf.reduce_mean(outputs, axis=0, keepdims=True)

        # Return the averaged output tensor
        return outputs

class TFLiteModel(tf.Module):
    """
    Export wrapper that chains, for one [n_frames, 543, 3] landmark tensor:
        – the preprocessing layer
        – every per-fold ISLR model, whose outputs are averaged
    """

    def __init__(self, islr_fold_models, islr_fold_pp_fn):
        """
        Args:
            islr_fold_models: Mapping whose values are the fold models and whose
                keys can be converted with `float(key)`.
            islr_fold_pp_fn: Zero-argument callable that returns the preprocessing layer.
        """
        super().__init__()

        # Preprocessing layer and fold models
        self.prep_inputs = islr_fold_pp_fn()
        self.islr_fold_models = list(islr_fold_models.values())

        # One row per fold with the fold's score (key / 100,000) repeated 250 times.
        # Not referenced by __call__.
        fold_scores = [float(k)/100_000. for k in islr_fold_models.keys()]
        score_column = tf.expand_dims(tf.constant(fold_scores, dtype=tf.float32), axis=-1)
        self.model_weights = tf.repeat(score_column, 250, axis=-1)

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 543, 3], dtype=tf.float32, name='inputs')])
    def __call__(self, inputs):
        """
        Preprocesses the inputs, runs every fold model and averages the results.

        Args:
            inputs: float32 tensor with shape [None, 543, 3].

        Returns:
            A dictionary with a single key 'outputs' holding the mean over
            axis 0 (keepdims=True) of the concatenated fold outputs.
        """
        features = self.prep_inputs(tf.cast(inputs, dtype=tf.float32))
        stacked = tf.concat([_model(features) for _model in self.islr_fold_models], axis=0)
        averaged = tf.reduce_mean(stacked, axis=0, keepdims=True)

        return {'outputs': averaged}
    
# Switch for the k-fold-only export path; with False the block below (and the
# TFLite conversion cell that follows) is skipped.
ONLY_KFOLD=False

if ONLY_KFOLD:
    # Keyed by the text after the last "__" in each path (the encoded accuracy).
    # NOTE(review): two folds with the same encoded accuracy would share a key and
    # one model would be dropped — confirm this cannot happen.
    ISLR_FOLD_MODELS = {_path.rsplit("__", 1)[-1]:tf.keras.models.load_model(_path, compile=False) for _path in MODEL_PATHS}
    tflite_keras_model = TFLiteModel(ISLR_FOLD_MODELS, PrepInputs)
    # Smoke test on the first training sample
    out = tflite_keras_model(load_relevant_data_subset(train_df.path[0]))["outputs"]
    # Result is not displayed: this is not the last expression of the cell
    np.argmax(out)

In [None]:
if ONLY_KFOLD:
    # Convert the wrapper built in the previous cell to a TFLite flatbuffer.
    # NOTE(review): `tflite_keras_model` is a tf.Module, not a Keras model —
    # confirm `from_keras_model` is the intended converter entry point.
    keras_model_converter = tf.lite.TFLiteConverter.from_keras_model(tflite_keras_model)
    tflite_model = keras_model_converter.convert()

    # Write the model next to the saved fold models and zip it for submission.
    # NOTE(review): this file lives in MODEL_DIR, which an earlier cell globs and
    # parses by file name — re-running that cell afterwards may trip over it.
    TFLITE_PATH = '/kaggle/working/models/model.tflite'
    with open(TFLITE_PATH, 'wb') as f:
        f.write(tflite_model)
    !zip submission.zip {TFLITE_PATH}

    # Reload the written file with the TFLite runtime and run one prediction
    interpreter = tflite.Interpreter(TFLITE_PATH)
    found_signatures = list(interpreter.get_signature_list().keys())
    prediction_fn = interpreter.get_signature_runner("serving_default")
    output = prediction_fn(inputs=load_relevant_data_subset(train_df.path[0]))
    sign = np.argmax(output["outputs"])

    print("PRED : ", decoder(sign))
    print("GT   : ", train_df.sign[0])

In [None]:
def get_input_shape(num_frames, landmarks, flag_drop_z):
    """Return the (frames, flattened coordinates) shape of one sample.

    Args:
        num_frames (int): Number of frames per sample.
        landmarks (int): Number of landmarks per frame.
        flag_drop_z (bool): When truthy only x and y are counted (2 values per
            landmark), otherwise x, y and z (3 values per landmark).

    Returns:
        tuple: ``(num_frames, landmarks * num_coords)``.
    """
    # The unused preliminary `input_shape` assignment was removed
    num_coords = 2 if flag_drop_z else 3
    return (num_frames, landmarks * num_coords)

output_bias = tf.keras.initializers.Constant(1.0 / 250.0)
class MSD(tf.keras.layers.Layer):
    """Output head that averages one shared Dense layer over five dropout masks.

    The same Dense layer is applied to five differently dropped-out copies of
    the input (rates 0.3 to 0.7, seeds offset by `fold_num`) and the five
    results are each divided by 5 and summed.
    """

    def __init__(
        self,
        units,
        fold_num=1,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # Shared linear projection; bias starts at `output_bias`
        self.lin = tf.keras.layers.Dense(
            units,
            activation=None,
            use_bias=True,
            bias_initializer=output_bias,
            # kernel_regularizer=R.l2(WEIGHT_REGULARIZE)
        )

        rate_dropout = 0.5
        # (dropout rate, base seed) for each of the five branches
        rate_seed_pairs = (
            ((rate_dropout - 0.2), 135),
            ((rate_dropout - 0.1), 690),
            ((rate_dropout), 275),
            ((rate_dropout + 0.1), 348),
            ((rate_dropout + 0.2), 861),
        )
        self.dropouts = [
            tf.keras.layers.Dropout(rate, seed=base_seed + fold_num)
            for rate, base_seed in rate_seed_pairs
        ]

    def call(self, inputs):
        first_drop, *other_drops = self.dropouts
        out = self.lin(first_drop(inputs)) / 5.0
        for drop in other_drops:
            out += self.lin(drop(inputs)) / 5.0
        return out


class ResidualBlock(tf.keras.layers.Layer):
    """Dense -> BatchNorm -> GELU -> optional Dropout.

    The block itself adds no skip connection; any residual sum is done by the
    caller.

    Args:
        units: Unit count of the Dense layer.
        dropout: Dropout rate; a value of 0 disables the Dropout layer entirely.
    """

    def __init__(self, units, dropout):
        super().__init__()
        self.linear = tf.keras.layers.Dense(units)
        self.bn = tf.keras.layers.BatchNormalization()
        self.act = tf.keras.layers.Activation("gelu")
        # Only create the Dropout layer when it would actually drop something
        self.flag_use_drop = dropout != 0
        if self.flag_use_drop:
            self.drop = tf.keras.layers.Dropout(dropout)

    def call(self, x):
        activated = self.act(self.bn(self.linear(x)))
        if not self.flag_use_drop:
            return activated
        return self.drop(activated)

class GRUModel(tf.keras.layers.Layer):
    """Stack of GRU layers that reduces a sequence to a single vector.

    The stack always contains a first GRU (no dropout, returns sequences) and a
    last GRU (returns only the final state). `num_blocks - 2` additional
    sequence-returning GRUs are placed in between, so any `num_blocks` of 2 or
    less yields exactly those two layers.

    Args:
        units: Unit count of every GRU layer.
        dropout: Dropout rate of the middle and last GRU layers.
        num_blocks: Requested total number of GRU layers (see above).
    """

    def __init__(self, units, dropout, num_blocks):
        super().__init__()
        self.start_gru = tf.keras.layers.GRU(
            units=units, dropout=0.0, return_sequences=True
        )
        self.end_gru = tf.keras.layers.GRU(
            units=units, dropout=dropout, return_sequences=False
        )

        n_middle = num_blocks - 2
        if n_middle > 0:
            # One independent GRU layer per middle block. The previous code
            # multiplied a single layer object by an int, which raises TypeError.
            self.gru_blocks = [
                tf.keras.layers.GRU(units=units, dropout=dropout, return_sequences=True)
                for _ in range(n_middle)
            ]
            self.flag_use_gru_blocks = True
        else:
            self.flag_use_gru_blocks = False

    def call(self, x):
        x = self.start_gru(x)
        if self.flag_use_gru_blocks:
            for blk in self.gru_blocks:
                x = blk(x)
        x = self.end_gru(x)
        return x

def model_utils(cfg, fold_num):
    """Create the metrics, callbacks and optimizer for training one fold.

    Args:
        cfg: Mapping read for the keys "TARGET_METRIC", "FLAG_WANDB" and "LR".
        fold_num: Fold identifier used in the checkpoint file name.

    Returns:
        tuple: ``(metric_ls, cb_list, opt)``.
    """
    metric_ls = [
        tf.keras.metrics.SparseCategoricalAccuracy(),
        tf.keras.metrics.SparseTopKCategoricalAccuracy(k=5),
    ]

    early_stopping = tf.keras.callbacks.EarlyStopping(
        patience=5,
        restore_best_weights=True,
        verbose=1,
        monitor=cfg["TARGET_METRIC"],
    )
    # No monitor is passed here, so the Keras default is used
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(patience=2, factor=0.8, verbose=1)
    # Weights-only checkpoint of the best epoch, written under SAVE_DIR
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        f"{SAVE_DIR}/best_acc_{fold_num}.h5",
        monitor=cfg["TARGET_METRIC"],
        verbose=0,
        save_best_only=True,
        save_weights_only=True,
        mode="max",
        save_freq="epoch",
    )
    cb_list = [early_stopping, reduce_lr, checkpoint]

    if cfg["FLAG_WANDB"]:
        # Optional Weights & Biases logging (metrics only)
        cb_list.append(
            WandbCallback(
                monitor=cfg["TARGET_METRIC"],
                log_weights=False,
                log_evaluation=False,
                save_model=False,
            )
        )

    # Alternatives tried earlier: Adam, RectifiedAdam, Lookahead(sync_period=5)
    opt = tfa.optimizers.AdamW(weight_decay=0, learning_rate=cfg["LR"])

    return metric_ls, cb_list, opt

# Analyzing Handedness
# Participant ids grouped by signing hand (same ids as LH_SIGNERS / RH_SIGNERS above)
left_handed_signer = [16069, 32319, 36257, 22343, 27610, 61333, 34503, 55372, 37055]  # both_hands_signer-> 37055
right_handed_signer = [26734, 28656, 25571, 62590, 29302, 49445, 53618, 18796, 4718, 2044, 37779, 30680,]
# 40 lip landmark indices, each listed once (the earlier `lips` list has 43 entries with repeats)
lip_landmarks = [61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291, 146, 91, 181, 84, 17, 314, 405, 321, 375, 78, 191, 80, 81, 82, 13, 312, 311, 310, 415, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308]

# participant id -> 0 for left-handed, 1 for right-handed
di = {}
for k in left_handed_signer:
    di[k] = 0
for k in right_handed_signer:
    di[k] = 1

# 21 landmarks per hand
left_hand_landmarks = list(range(468, 468 + 21))
right_hand_landmarks = list(range(522, 522 + 21))

# Each entry looks like [start index, count]; not referenced by the code visible
# here (its contribution to LANDMARKS below is commented out).
averaging_sets = [
    [0, 468],
    [489, 33],
]  ## average over the entire face, and the entire 'pose'

# Lips, then left hand, then right hand: 40 + 21 + 21 = 82 indices
point_landmarks = [
    item
    for sublist in [lip_landmarks, left_hand_landmarks, right_hand_landmarks]
    for item in sublist
]

LANDMARKS = len(point_landmarks) #+ len(averaging_sets)

# Fixed  ##################################################################################

FLAG_DROP_Z = False
# Re-assigns the value already set next to load_relevant_data_subset
ROWS_PER_FRAME = 543
NUM_FRAMES = 15
# (NUM_FRAMES, LANDMARKS * 3) because z is kept
INPUT_SHAPE = get_input_shape(NUM_FRAMES, LANDMARKS, FLAG_DROP_Z)
SEGMENTS = 3
# Mean and std vectors for each of the SEGMENTS segments plus the whole clip
NUM_BASE_FEATS = (SEGMENTS + 1) * INPUT_SHAPE[1] * 2
# Base statistics followed by the flattened fixed-length frame block
FLAT_FRAME_SHAPE = NUM_BASE_FEATS + (INPUT_SHAPE[0] * INPUT_SHAPE[1])
# NOTE(review): this rebinds `decoder` from the lookup function defined earlier to a
# plain dict (index -> sign); earlier cells that call decoder(...) will fail if they
# are re-run after this line — confirm this is intended.
decoder = {v: k for k, v in read_json_file("/kaggle/input/asl-signs/sign_to_prediction_index_map.json").items()}

    
# Build the GWG model on one flat feature row per sample and load trained weights
_inputs = tf.keras.layers.Input(shape=(FLAT_FRAME_SHAPE,))

# The first NUM_BASE_FEATS columns are the mean/std statistics. This slice is
# overwritten by `x = gru_out` below, so the statistics do not reach the network.
x = _inputs[:, :NUM_BASE_FEATS]
# The remaining columns are reshaped back into a (NUM_FRAMES, features) sequence
x_conv = tf.reshape(_inputs[:, NUM_BASE_FEATS:], (-1, NUM_FRAMES, INPUT_SHAPE[1]))

# Run the frame sequence through the GRU stack (num_blocks=1 builds the first
# and last GRU only); nothing is concatenated with the statistics here
gru_out = GRUModel(512, 0.5, 1)(x_conv)
x = gru_out

# Residual Block
x = ResidualBlock(1024, 0.25)(x)
# Skip connection: add the second block's output to its own input
x += ResidualBlock(1024, 0.0)(x)

# Final output MSD Layer
x = MSD(units=250)(x)
_outputs = tf.keras.layers.Softmax(dtype="float32")(x)

# Build the model
gwg_model = tf.keras.models.Model(inputs=_inputs, outputs=_outputs)
gwg_model.summary()
# The layers above must match the architecture these weights were saved from
gwg_model.load_weights("/kaggle/input/gwg-dataset-version-4/models/best_acc_1.h5")

In [None]:
def tf_nan_mean(x, axis=0):
    """
    Mean of `x` along `axis` that ignores NaN entries.

    NaN positions contribute 0 to the sum and 0 to the element count. When every
    entry along `axis` is NaN the result is 0/0, i.e. NaN.

    Args:
        x: Floating point tensor.
        axis: Axis (or axes) passed to tf.reduce_sum.

    Returns:
        Tensor with `axis` reduced away.
    """
    nan_mask = tf.math.is_nan(x)
    zeros = tf.zeros_like(x)
    valid_sum = tf.reduce_sum(tf.where(nan_mask, zeros, x), axis=axis)
    valid_count = tf.reduce_sum(tf.where(nan_mask, zeros, tf.ones_like(x)), axis=axis)
    return valid_sum / valid_count

def tf_nan_std(x, axis=0):
    """
    Standard deviation of `x` along `axis` that ignores NaN entries.

    Computed as sqrt(nan_mean((x - nan_mean(x))**2)), so the divisor is the number
    of non-NaN entries. The mean is subtracted without keeping the reduced
    dimension, so it has to broadcast against `x` (which it does for axis=0).

    Args:
        x: Floating point tensor.
        axis: Axis forwarded to tf_nan_mean for both reductions.

    Returns:
        Tensor with `axis` reduced away.
    """
    centered = x - tf_nan_mean(x, axis=axis)
    variance = tf_nan_mean(tf.multiply(centered, centered), axis=axis)
    return tf.math.sqrt(variance)

def flatten_means_and_stds(x, axis=0):
    """
    Build one flat row holding the NaN-ignoring mean and std of `x` over axis 0.

    Args:
        x: Tensor whose leading axis is reduced; the remaining elements must
            total INPUT_SHAPE[1] for the reshape to succeed.
        axis: Accepted for call-site compatibility but not used; the reduction
            always runs over axis 0.

    Returns:
        Tensor of shape (1, INPUT_SHAPE[1] * 2): all means followed by all stds,
        with every non-finite value replaced by 0.
    """
    stats = tf.concat(
        [tf_nan_mean(x, axis=0), tf_nan_std(x, axis=0)],
        axis=0,
    )
    flat = tf.reshape(stats, (1, INPUT_SHAPE[1] * 2))
    # NaN/inf appear when a landmark is missing in every frame; zero them out.
    return tf.where(tf.math.is_finite(flat), flat, tf.zeros_like(flat))

class FeatureGen_1(tf.keras.layers.Layer):
    """
    Preprocessing layer that turns a clip of landmark frames into one flat row.

    The row is the concatenation of
      - mean/std statistics for each of SEGMENTS temporal segments,
      - mean/std statistics for the whole clip,
      - the clip resized to NUM_FRAMES frames and flattened.
    """

    def __init__(self):
        super().__init__()

    def call(self, x_in):
        """
        Args:
            x_in: Landmark tensor indexed as (frame, landmark row, coordinate);
                presumably [n_frames, 543, 3] as in the TFLite signature - confirm.

        Returns:
            Tensor of shape (1, (SEGMENTS + 1) * INPUT_SHAPE[1] * 2
            + INPUT_SHAPE[0] * INPUT_SHAPE[1]).
        """
        # Keep only the selected lip and hand rows.
        x = tf.gather(x_in, point_landmarks, axis=1)

        # Mirror-pad one frame at a time, alternating end (even step) and start
        # (odd step), for as long as the frame count is not a multiple of SEGMENTS.
        x_padded = x
        for i in range(SEGMENTS):
            is_odd_step = (i % 2) != 0
            has_remainder = (tf.shape(x_padded)[0] % SEGMENTS) > 0
            pad_before = tf.where(has_remainder & is_odd_step, 1, 0)
            pad_after = tf.where(has_remainder & (not is_odd_step), 1, 0)
            x_padded = tf.pad(
                x_padded,
                [[pad_before, pad_after], [0, 0], [0, 0]],
                mode="SYMMETRIC",
            )

        # Per-segment statistics, then statistics over the unpadded clip.
        features = [
            flatten_means_and_stds(segment, axis=0)
            for segment in tf.split(x_padded, SEGMENTS)
        ]
        features.append(flatten_means_and_stds(x, axis=0))

        ## Resize only dimension 0. Resize can't handle nan, so replace nan with that dimension's avg value to reduce impact.
        nan_filled = tf.where(tf.math.is_finite(x), x, tf_nan_mean(x, axis=0))
        frames = tf.image.resize(nan_filled, [NUM_FRAMES, LANDMARKS])
        frames = tf.reshape(frames, (1, INPUT_SHAPE[0] * INPUT_SHAPE[1]))
        # Landmarks missing in every frame are still NaN after the fill; zero them.
        frames = tf.where(tf.math.is_nan(frames), tf.zeros_like(frames), frames)
        features.append(frames)

        return tf.concat(features, axis=1)
    
# Settings for the R__-prefixed preprocessing pipeline (used by RobertFeatureGen).

# When True only the first two coordinate channels of each landmark are kept.
R__DROP_Z = False

R__NUM_FRAMES = 15
R__SEGMENTS = 3

# Row offsets of the landmark groups inside one frame: 21 left-hand rows start at
# 468, followed by 33 pose rows, followed by the right-hand rows.
R__LEFT_HAND_OFFSET = 468
R__POSE_OFFSET = 21 + R__LEFT_HAND_OFFSET
R__RIGHT_HAND_OFFSET = 33 + R__POSE_OFFSET

## average over the entire face, and the entire 'pose'
# Each entry is [first_row, number_of_rows].
R__averaging_sets = [
    [0, 468],
    [R__POSE_OFFSET, 33],
]

R__lip_landmarks = [
    61, 185, 40, 39, 37, 0, 267, 269,
    270, 409, 291, 146, 91, 181, 84, 17,
    314, 405, 321, 375, 78, 191, 80, 81,
    82, 13, 312, 311, 310, 415, 95, 88,
    178, 87, 14, 317, 402, 318, 324, 308,
]
R__left_hand_landmarks = [R__LEFT_HAND_OFFSET + i for i in range(21)]
R__right_hand_landmarks = [R__RIGHT_HAND_OFFSET + i for i in range(21)]

# Lips first, then left hand, then right hand.
R__point_landmarks = [
    *R__lip_landmarks,
    *R__left_hand_landmarks,
    *R__right_hand_landmarks,
]

# Individually selected points plus one averaged pseudo-landmark per averaging set.
R__LANDMARKS = len(R__averaging_sets) + len(R__point_landmarks)

# (frames, values per frame): two or three coordinates per landmark.
R__INPUT_SHAPE = (R__NUM_FRAMES, R__LANDMARKS * (2 if R__DROP_Z else 3))

# Resized frames plus a mean and a std row for each segment and for the whole clip.
R__FLAT_INPUT_SHAPE = R__INPUT_SHAPE[1] * (R__INPUT_SHAPE[0] + 2 * (R__SEGMENTS + 1))

def R__flatten_means_and_stds(x, axis=0):
    """
    Build one flat row holding the NaN-ignoring mean and std of `x` over axis 0.

    Args:
        x: Tensor whose leading axis is reduced; the remaining elements must
            total R__INPUT_SHAPE[1] for the reshape to succeed.
        axis: Accepted for call-site compatibility but not used; the reduction
            always runs over axis 0.

    Returns:
        Tensor of shape (1, R__INPUT_SHAPE[1] * 2): all means followed by all
        stds, with every non-finite value replaced by 0.
    """
    stats = tf.concat(
        [tf_nan_mean(x, axis=0), tf_nan_std(x, axis=0)],
        axis=0,
    )
    flat = tf.reshape(stats, (1, R__INPUT_SHAPE[1] * 2))
    # NaN/inf appear when a landmark is missing in every frame; zero them out.
    return tf.where(tf.math.is_finite(flat), flat, tf.zeros_like(flat))
    
class RobertFeatureGen(tf.keras.layers.Layer):
    """
    Preprocessing layer that turns a clip of landmark frames into one flat row.

    Compared with the raw input it first prepends one averaged pseudo-landmark per
    entry of R__averaging_sets to the selected point landmarks. The row is then
    the concatenation of
      - mean/std statistics for each of R__SEGMENTS temporal segments,
      - mean/std statistics for the whole clip,
      - the clip resized to R__NUM_FRAMES frames and flattened.
    """

    def __init__(self):
        super().__init__()

    def call(self, x_in):
        """
        Args:
            x_in: Landmark tensor indexed as (frame, landmark row, coordinate);
                presumably [n_frames, 543, 3] as in the TFLite signature - confirm.

        Returns:
            Tensor of shape (1, (R__SEGMENTS + 1) * R__INPUT_SHAPE[1] * 2
            + R__INPUT_SHAPE[0] * R__INPUT_SHAPE[1]).
        """
        if R__DROP_Z:
            # Keep only the first two coordinate channels.
            x_in = x_in[:, :, 0:2]

        # One NaN-ignoring average per [first_row, row_count] range, kept as a
        # single landmark row so it can be stacked with the point landmarks.
        landmark_parts = []
        for first_row, row_count in R__averaging_sets:
            region_mean = tf_nan_mean(x_in[:, first_row:first_row + row_count, :], axis=1)
            landmark_parts.append(tf.expand_dims(region_mean, axis=1))
        landmark_parts.append(tf.gather(x_in, R__point_landmarks, axis=1))
        x = tf.concat(landmark_parts, 1)

        # Mirror-pad one frame at a time, alternating end (even step) and start
        # (odd step), for as long as the frame count is not a multiple of R__SEGMENTS.
        x_padded = x
        for i in range(R__SEGMENTS):
            is_odd_step = (i % 2) != 0
            has_remainder = (tf.shape(x_padded)[0] % R__SEGMENTS) > 0
            pad_before = tf.where(has_remainder & is_odd_step, 1, 0)
            pad_after = tf.where(has_remainder & (not is_odd_step), 1, 0)
            x_padded = tf.pad(
                x_padded,
                [[pad_before, pad_after], [0, 0], [0, 0]],
                mode="SYMMETRIC",
            )

        # Per-segment statistics, then statistics over the unpadded clip.
        features = [
            R__flatten_means_and_stds(segment, axis=0)
            for segment in tf.split(x_padded, R__SEGMENTS)
        ]
        features.append(R__flatten_means_and_stds(x, axis=0))

        ## Resize only dimension 0. Resize can't handle nan, so replace nan with that dimension's avg value to reduce impact.
        nan_filled = tf.where(tf.math.is_finite(x), x, tf_nan_mean(x, axis=0))
        frames = tf.image.resize(nan_filled, [R__NUM_FRAMES, R__LANDMARKS])
        frames = tf.reshape(frames, (1, R__INPUT_SHAPE[0] * R__INPUT_SHAPE[1]))
        # Landmarks missing in every frame are still NaN after the fill; zero them.
        frames = tf.where(tf.math.is_nan(frames), tf.zeros_like(frames), frames)
        features.append(frames)

        return tf.concat(features, axis=1)

In [None]:
class TFLiteModel(tf.Module):
    """
    Ensemble wrapper that is exported to TensorFlow Lite.

    Three model families are combined, each with its own preprocessing layer:
        – the ISLR fold models (a list, all sharing one preprocessing layer)
        – the gwg model
        – the robert model
    Their predictions are merged by a weighted average inside `__call__`.
    """

    def __init__(self, islr_fold_models, islr_fold_pp_fn, gwg_model, gwg_pp_fn, robert_model, robert_pp_fn):
        """
        Stores the models and instantiates one preprocessing layer per family.

        Args:
            islr_fold_models: Mapping whose values are the ISLR fold models; only
                the values are used, in the mapping's iteration order.
            islr_fold_pp_fn: Zero-argument callable returning the preprocessing
                layer shared by the ISLR fold models.
            gwg_model: The gwg model.
            gwg_pp_fn: Zero-argument callable returning its preprocessing layer.
            robert_model: The robert model.
            robert_pp_fn: Zero-argument callable returning its preprocessing layer.
        """
        super().__init__()

        # Preprocessing layers are constructed here, once per family.
        self.prep_inputs_1 = islr_fold_pp_fn()
        self.prep_inputs_2 = gwg_pp_fn()
        self.prep_inputs_3 = robert_pp_fn()

        # The models themselves.
        self.models_1      = list(islr_fold_models.values())
        self.model_2       = gwg_model
        self.model_3       = robert_model

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 543, 3], dtype=tf.float32, name='inputs')])
    def __call__(self, inputs):
        """
        Runs every preprocessing layer and model on one clip and averages the results.

        Args:
            inputs: float32 tensor of shape [None, 543, 3]; the leading dimension
                is unconstrained (presumably the number of frames in the clip).

        Returns:
            A dictionary with a single key 'outputs' holding the averaged
            prediction; the leading dimension is kept, so it has one row.
        """
        frames = tf.cast(inputs, dtype=tf.float32)
        x1 = self.prep_inputs_1(frames)
        x2 = self.prep_inputs_2(frames)
        x3 = self.prep_inputs_3(frames)

        # Weighted average implemented by row repetition: every ISLR fold model
        # contributes one row, the gwg prediction is repeated 11 times and the
        # robert prediction 9 times, then all rows are averaged.
        stacked_predictions = [
            tf.concat([fold_model(x1) for fold_model in self.models_1], axis=0),
            tf.repeat(self.model_2(x2), 11, axis=0),
            tf.repeat(self.model_3(x3), 9, axis=0),
        ]
        outputs = tf.reduce_mean(
            tf.concat(stacked_predictions, axis=0),
            axis=0,
            keepdims=True,
        )

        # Return a dictionary with the output tensor
        return {'outputs': outputs}

# Saved-model location of the third ensemble member (loaded below without compiling).
R__MODEL_PATH = "/kaggle/input/ensemble-dataset/ensemble_basic/robert/models/asl_model"
# Number of fold models taken from the front of MODEL_PATHS.
# NOTE(review): MODEL_PATHS is defined elsewhere; presumably sorted best-first - confirm.
N_TOP_MODELS=7
# Load each selected fold model for inference only (compile=False), keyed by the
# part of its path after the last "__".
ISLR_FOLD_MODELS = {_path.rsplit("__", 1)[-1]:tf.keras.models.load_model(_path, compile=False) for _path in MODEL_PATHS[:N_TOP_MODELS]}
# Assemble the ensemble: each model (or model dict) is paired with the class of its
# preprocessing layer, which TFLiteModel instantiates itself.
tflite_keras_model = TFLiteModel(
    ISLR_FOLD_MODELS, PrepInputs, 
    gwg_model, FeatureGen_1,
    tf.keras.models.load_model(R__MODEL_PATH, compile=False), RobertFeatureGen
)
# Smoke test on the first training sample; the cell displays the predicted class index.
out = tflite_keras_model(load_relevant_data_subset(train_df.path[0]))["outputs"]
np.argmax(out)

In [None]:
# Helps reduce overall size but can decrease performance
DO_OPTIMIZATION = True

if ONLY_KFOLD:
    !rm -rf {TFLITE_PATH}
    !rm -rf ./submission.zip

keras_model_converter = tf.lite.TFLiteConverter.from_keras_model(tflite_keras_model)
keras_model_converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = keras_model_converter.convert()

TFLITE_PATH = '/kaggle/working/models/model.tflite'
with open(TFLITE_PATH, 'wb') as f:
    f.write(tflite_model)
!zip submission.zip {TFLITE_PATH}

interpreter = tflite.Interpreter(TFLITE_PATH)
found_signatures = list(interpreter.get_signature_list().keys())
prediction_fn = interpreter.get_signature_runner("serving_default")
output = prediction_fn(inputs=load_relevant_data_subset(train_df.path[0]))
sign = np.argmax(output["outputs"])

print("PRED : ", decoder[sign])
print("GT   : ", train_df.sign[0])