In [3]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.5/33.5 MB[0m [31m30.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.3 sounddevice-0.4.6


**Library Imports**

In [4]:
import os
import io
import shutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import tensorflow as tf
import json
import mediapipe
import imghdr
import matplotlib
import matplotlib.pyplot as plt
import random
from mediapipe.framework.formats import landmark_pb2
from skimage.transform import resize
from tensorflow import keras
from tensorflow.keras import layers
from tqdm.notebook import tqdm
from PIL import Image

caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl6StatusC1EN10tensorflow5error4CodeESt17basic_string_viewIcSt11char_traitsIcEENS_14SourceLocationE']
caused by: ['/opt/conda/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZTVN10tensorflow13GcsFileSystemE']


Create Mediapipe Hand Model 

In [6]:
# Render the per-frame hand landmarks as images using the MediaPipe drawing utilities.
# Both hands are rendered for every frame.
# Ref: https://www.kaggle.com/code/gusthema/asl-fingerspelling-recognition-w-tensorflow

mp_pose = mediapipe.solutions.pose
mp_hands = mediapipe.solutions.hands
mp_drawing = mediapipe.solutions.drawing_utils
mp_drawing_styles = mediapipe.solutions.drawing_styles

# Height, width and channel count of every rendered hand image.
HAND_IMAGE_SHAPE = (256, 256, 3)


def _render_hand(xs, ys, zs):
    """Draw one hand for one frame on a blank canvas.

    Args:
        xs, ys, zs: Equal-length iterables with the x, y and z coordinate of
            each landmark of the hand.

    Returns:
        A ``(image, landmarks)`` tuple: the canvas converted to ``uint8`` and
        the ``NormalizedLandmarkList`` that was drawn on it.
    """
    canvas = np.zeros(HAND_IMAGE_SHAPE)
    landmarks = landmark_pb2.NormalizedLandmarkList()
    for x, y, z in zip(xs, ys, zs):
        landmarks.landmark.add(x=float(x), y=float(y), z=float(z))

    mp_drawing.draw_landmarks(
        canvas,
        landmarks,
        mp_hands.HAND_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_hand_landmarks_style())
    return canvas.astype(np.uint8), landmarks


def get_hands(seq_df):
    """Render the right and left hand of every row (frame) of ``seq_df``.

    Args:
        seq_df: DataFrame with one row per frame whose column names contain
            ``{x,y,z}_right_hand`` and ``{x,y,z}_left_hand``.

    Returns:
        ``(images, all_hand_landmarks)``. ``images[i]`` is
        ``[right_image, left_image]`` (``uint8`` arrays) and
        ``all_hand_landmarks[i]`` is ``[right_landmarks, left_landmarks]``
        for the i-th row.
    """
    # Select the coordinate columns once instead of running the regex filter
    # on every single row.
    coords = {
        side: [
            seq_df.filter(regex=f"{axis}_{side}_hand.*").to_numpy()
            for axis in ("x", "y", "z")
        ]
        for side in ("right", "left")
    }

    images = []
    all_hand_landmarks = []
    for frame_idx in range(len(seq_df)):
        right_image, right_landmarks = _render_hand(
            *(axis_values[frame_idx] for axis_values in coords["right"]))
        left_image, left_landmarks = _render_hand(
            *(axis_values[frame_idx] for axis_values in coords["left"]))

        images.append([right_image, left_image])
        all_hand_landmarks.append([right_landmarks, left_landmarks])
    return images, all_hand_landmarks

Load and Preprocess Data


In [7]:
# Read train csv
train_df = pd.read_csv('/kaggle/input/asl-fingerspelling/train.csv')
no_phrases = train_df.shape[0]
unique_file_ids = train_df['file_id'].unique()
print(len(unique_file_ids))

# Get hand and pose columns
LH = [f'x_left_hand_{i}' for i in range(21)] + [f'y_left_hand_{i}' for i in range(21)]
RH = [f'x_right_hand_{i}' for i in range(21)] + [f'y_right_hand_{i}' for i in range(21)]
pose = [f'x_pose_{i}' for i in range(33)] + [f'y_pose_{i}' for i in range(33)]
select_features = LH + RH + pose

# Every sequence shorter than this is padded up to this many frames.
MAX_LEN = 413

# Output directory for the generated TFRecord files (created once, up front).
TFRECORD_DIR = "tfrecords"
os.makedirs(TFRECORD_DIR, exist_ok=True)

# Blank frame used for padding. It is defined up front so that padding never
# depends on whether (or in which file) a frame with equally incomplete hands
# happened to be seen.
padding_image = np.zeros((256, 256, 3), dtype=np.uint8)


def _nan_count(hand_landmarks):
    """Number of NaN coordinates (x, y and z) in one hand's landmark list."""
    coords = [[lm.x, lm.y, lm.z] for lm in hand_landmarks.landmark]
    return int(np.isnan(coords).sum())


# Read sequences
# NOTE(review): only the first sequence listed for each file_id is converted —
# confirm that skipping the remaining sequences of a file is intended.
for file_id in tqdm(unique_file_ids):
    file_rows = train_df[train_df['file_id'] == file_id]
    sequence_id, phrase = file_rows.iloc[0][['sequence_id', 'phrase']]
    # Read the frames of that sequence only
    seq = pq.read_table(f"/kaggle/input/asl-fingerspelling/train_landmarks/{str(file_id)}.parquet",
        filters=[[('sequence_id', '=', sequence_id)],]).to_pandas()

    hand_images, hand_landmarks = get_hands(seq)
    print(len(hand_images))

    # Per frame, keep the hand with fewer NaN coordinates; ties go to the right hand.
    chosen_images = []
    for (right_hand_image, left_hand_image), (right_hand_landmarks, left_hand_landmarks) in zip(
            hand_images, hand_landmarks):
        if _nan_count(left_hand_landmarks) < _nan_count(right_hand_landmarks):
            chosen_images.append(left_hand_image)
        else:
            chosen_images.append(right_hand_image)

    # Pad short sequences with blank frames (a non-positive count adds nothing).
    chosen_images.extend([padding_image] * (MAX_LEN - len(chosen_images)))

    print('Adjusted chosen image length is:',len(chosen_images))

    tf_record = f"{TFRECORD_DIR}/{file_id}.tfrecord"
    phrase_bytes = bytes(phrase, 'utf-8')

    # Create TF Records: one Example per frame, each carrying the full phrase.
    with tf.io.TFRecordWriter(tf_record) as file_writer:
        for image in chosen_images:
            # JPEG-encode the frame so it can be stored as a bytes feature
            image_bytes = tf.io.encode_jpeg(image.astype(np.uint8)).numpy()

            feature = {
                "image": tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[image_bytes])
                ),
                "phrase": tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[phrase_bytes])
                ),
            }

            record_bytes = tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()
            file_writer.write(record_bytes)


68


  0%|          | 0/68 [00:00<?, ?it/s]

123
Adjusted chosen image length is: 413
294
Adjusted chosen image length is: 413
127
Adjusted chosen image length is: 413
126
Adjusted chosen image length is: 413
122
Adjusted chosen image length is: 413
300
Adjusted chosen image length is: 413
132
Adjusted chosen image length is: 413
94
Adjusted chosen image length is: 413
107
Adjusted chosen image length is: 413
186
Adjusted chosen image length is: 413
209
Adjusted chosen image length is: 413
118
Adjusted chosen image length is: 413
61
Adjusted chosen image length is: 413
103
Adjusted chosen image length is: 413
8
Adjusted chosen image length is: 413
173
Adjusted chosen image length is: 413
60
Adjusted chosen image length is: 413
204
Adjusted chosen image length is: 413
150
Adjusted chosen image length is: 413
140
Adjusted chosen image length is: 413
244
Adjusted chosen image length is: 413
84
Adjusted chosen image length is: 413
77
Adjusted chosen image length is: 413
6
Adjusted chosen image length is: 413
293
Adjusted chosen image

Read TF Records

In [11]:
def decode_fn(record_bytes):
    """Parse one serialized ``tf.train.Example`` into an ``(image, phrase)`` pair.

    The ``image`` feature is JPEG-decoded with three channels, cast to float32
    and divided by 255; the ``phrase`` feature is returned as the stored string
    tensor without further decoding.
    """
    feature_spec = {
        name: tf.io.FixedLenFeature([], dtype=tf.string)
        for name in ("image", "phrase")
    }
    example = tf.io.parse_single_example(record_bytes, feature_spec)

    pixels = tf.io.decode_jpeg(example['image'], channels=3)
    scaled = tf.cast(pixels, tf.float32) / 255.0

    return scaled, example['phrase']

Preprocess input sequences

In [30]:
# Ref: https://www.kaggle.com/code/gusthema/asl-fingerspelling-recognition-w-tensorflow
with open("/kaggle/input/asl-fingerspelling/character_to_prediction_index.json", "r") as f:
    char_to_num = json.load(f)

# Special tokens (padding, sequence start, sequence end) and the ids assigned to them.
pad_token, start_token, end_token = 'P', '<', '>'
pad_token_idx, start_token_idx, end_token_idx = 59, 60, 61

char_to_num.update({
    pad_token: pad_token_idx,
    start_token: start_token_idx,
    end_token: end_token_idx,
})
# Reverse mapping: token id -> character.
num_to_char = {index: char for char, index in char_to_num.items()}

# Graph-friendly character -> id lookup; characters missing from the table map to -1.
table = tf.lookup.StaticHashTable(
    initializer=tf.lookup.KeyValueTensorInitializer(
        keys=[*char_to_num],
        values=[*char_to_num.values()],
    ),
    default_value=tf.constant(-1),
    name="class_weight"
)

def convert_fn(landmarks, phrase):
    """Convert a phrase string tensor into a vector of 64 token ids.

    The phrase is wrapped in the start and end tokens, split into single
    bytes, mapped to ids through ``table`` and right-padded with
    ``pad_token_idx`` up to length 64. ``landmarks`` is returned unchanged.
    """
    wrapped = start_token + phrase + end_token
    token_ids = table.lookup(tf.strings.bytes_split(wrapped))
    # Number of padding ids needed to reach the fixed length.
    n_missing = 64 - tf.shape(token_ids)[0]
    padded = tf.pad(token_ids, paddings=[[0, n_missing]], mode='CONSTANT',
                    constant_values=pad_token_idx)
    return landmarks, padded

Split Data

In [38]:
# Get TF Records
tf_records = train_df.file_id.map(lambda x: f'/kaggle/working/tfrecords/{x}.tfrecord').unique()
print(f"List of {len(tf_records)} TFRecord files.")

batch_size = 16
# NOTE(review): this sends the first 10% of the files to training and the
# remaining 90% to validation — confirm the ratio is not inverted.
train_len = int(0.1 * len(tf_records))


def _make_dataset(files):
    """Build the decode -> tokenize -> batch input pipeline for ``files``.

    ``cache()`` is placed before ``prefetch()`` so that prefetching is the last
    stage of the pipeline and overlaps input preparation with model execution.
    """
    return (
        tf.data.TFRecordDataset(files)
        .map(decode_fn)
        .map(convert_fn)
        .batch(batch_size)
        .cache()
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )


train_ds = _make_dataset(tf_records[:train_len])
valid_ds = _make_dataset(tf_records[train_len:])

# Verify TF Records being read properly

# for landmarks, phrase in train_ds:
#         # Convert the TensorFlow tensor to a NumPy array
#     batch_of_images_np = landmarks.numpy()

#     # Determine the shape of the batch (assuming the first dimension is the batch size)
#     batch_size, height, width, channels = batch_of_images_np.shape

#     # Calculate the number of rows and columns for the grid
#     num_rows = int(np.sqrt(batch_size))
#     num_cols = (batch_size + num_rows - 1) // num_rows

#     # Create a figure and a grid of subplots
#     fig, axs = plt.subplots(num_rows, num_cols, figsize=(10, 10))

#     # Flatten the subplots array if it's a 1D array
#     if num_rows == 1:
#         axs = axs.reshape(1, -1)

#     # Loop through the images and display them in the grid
#     for i in range(batch_size):
#         ax = axs[i // num_cols, i % num_cols]
#         ax.imshow(batch_of_images_np[i])
#         ax.axis('off')  # Turn off axis labels and ticks

#     # Hide any remaining empty subplots
#     for i in range(batch_size, num_rows * num_cols):
#         axs[i // num_cols, i % num_cols].axis('off')

#     # Show the grid of images
#     plt.show()
    
#     print("Phrase:", phrase)
#     print("------------------------------")

List of 68 TFRecord files.


Transformer Model (work in progress)

In [8]:
# Token And Landmark Image Embedding
class TokenEmbedding(layers.Layer):
    """Adds a learned position embedding to a learned token embedding.

    Args:
        num_vocab: Size of the token embedding table.
        maxlen: Size of the position embedding table.
        num_hid: Width of both embeddings.
    """

    def __init__(self, num_vocab=1000, maxlen=100, num_hid=64):
        super().__init__()
        self.emb = layers.Embedding(num_vocab, num_hid)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        # Positions run over the last axis of the token id tensor.
        seq_len = tf.shape(x)[-1]
        token_vectors = self.emb(x)
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        return token_vectors + self.pos_emb(position_ids)

class LandmarkEmbedding2D(layers.Layer):
    """Two stride-2 3x3 ReLU convolutions, a flatten, then an ``Embedding`` call.

    NOTE(review): ``pos_emb`` is an ``Embedding`` layer but it is called on the
    flattened convolution activations rather than on integer position indices
    (compare ``TokenEmbedding.call``, which looks up ``tf.range`` positions and
    adds them to the features) — confirm the intended design before training.

    Args:
        num_hid: Number of filters of each convolution and width of ``pos_emb``.
        maxlen: Size of the ``pos_emb`` table.
    """

    def __init__(self, num_hid=64, maxlen=100):
        super().__init__()
        conv_kwargs = dict(strides=(2, 2), padding="same", activation="relu")
        self.conv1 = layers.Conv2D(num_hid, (3, 3), **conv_kwargs)
        self.conv2 = layers.Conv2D(num_hid, (3, 3), **conv_kwargs)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=num_hid)

    def call(self, x):
        features = self.conv2(self.conv1(x))
        flat = layers.Flatten()(features)
        return self.pos_emb(flat)


Transformer Encoder

In [9]:
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention then a two-layer feed-forward network.

    Each sub-layer's output goes through dropout, is added to its input and is
    layer-normalized.

    Args:
        embed_dim: Output width of the feed-forward network and ``key_dim`` of
            the attention layer.
        num_heads: Number of attention heads.
        feed_forward_dim: Hidden width of the feed-forward network.
        rate: Dropout rate used after both sub-layers.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(feed_forward_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        residual = self.layernorm1(inputs + attended)
        projected = self.dropout2(self.ffn(residual), training=training)
        return self.layernorm2(residual + projected)

Transformer Decoder

In [10]:
# Customized to add `training` variable
# Reference: https://www.kaggle.com/code/shlomoron/aslfr-a-simple-transformer/notebook

class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, encoder attention, feed-forward.

    Each sub-layer's output goes through dropout, is added to its input and is
    layer-normalized.

    Args:
        embed_dim: Output width of the feed-forward network and ``key_dim`` of
            both attention layers.
        num_heads: Number of attention heads.
        feed_forward_dim: Hidden width of the feed-forward network.
        dropout_rate: Dropout rate applied after the encoder attention and the
            feed-forward network. The self-attention dropout stays fixed at
            0.5, so the default of 0.1 reproduces the previous hard-coded rates.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.self_dropout = layers.Dropout(0.5)
        self.enc_dropout = layers.Dropout(dropout_rate)
        self.ffn_dropout = layers.Dropout(dropout_rate)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Masks the upper half of the dot product matrix in self attention.

        This prevents flow of information from future tokens to current token.
        1's in the lower triangle, counting from the lower right corner.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        # Repeat the single [1, n_dest, n_src] mask once per batch element.
        mult = tf.concat(
            [batch_size[..., tf.newaxis], tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, enc_out, target, training):
        """Run the block on ``target``, attending to ``enc_out``.

        Args:
            enc_out: Encoder output used as key/value of the encoder attention.
            target: Embedded target sequence; axis 0 is the batch and axis 1
                the sequence position.
            training: Forwarded to the dropout layers.
        """
        target_shape = tf.shape(target)
        batch_size, seq_len = target_shape[0], target_shape[1]
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)

        self_attended = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(target + self.self_dropout(self_attended, training=training))

        # Kept in its own name so the ``enc_out`` argument is not overwritten.
        cross_attended = self.enc_att(target_norm, enc_out)
        cross_norm = self.layernorm2(self.enc_dropout(cross_attended, training=training) + target_norm)

        ffn_out = self.ffn(cross_norm)
        return self.layernorm3(cross_norm + self.ffn_dropout(ffn_out, training=training))

Transformer Model

In [11]:
class Transformer(keras.Model):
    """Encoder-decoder transformer that maps a source tensor to a token sequence.

    The encoder is a ``LandmarkEmbedding2D`` followed by ``num_layers_enc``
    ``TransformerEncoder`` blocks. The decoder is a ``TokenEmbedding`` followed
    by ``num_layers_dec`` ``TransformerDecoder`` blocks and a dense classifier
    that emits ``num_classes`` logits per target position.
    """

    def __init__(
        self,
        num_hid=64,
        num_head=2,
        num_feed_forward=128,
        source_maxlen=100,
        target_maxlen=100,
        num_layers_enc=4,
        num_layers_dec=1,
        num_classes=60,
    ):
        super().__init__()
        self.loss_metric = keras.metrics.Mean(name="loss")
        self.acc_metric = keras.metrics.Mean(name="edit_dist")
        self.num_layers_enc = num_layers_enc
        self.num_layers_dec = num_layers_dec
        self.target_maxlen = target_maxlen
        self.num_classes = num_classes

        self.enc_input = LandmarkEmbedding2D(num_hid=num_hid, maxlen=source_maxlen)
        self.dec_input = TokenEmbedding(
            num_vocab=num_classes, maxlen=target_maxlen, num_hid=num_hid
        )

        self.encoder = keras.Sequential(
            [self.enc_input]
            + [
                TransformerEncoder(num_hid, num_head, num_feed_forward)
                for _ in range(num_layers_enc)
            ]
        )

        # Decoder blocks are stored as attributes dec_layer_0, dec_layer_1, ...
        for i in range(num_layers_dec):
            setattr(
                self,
                f"dec_layer_{i}",
                TransformerDecoder(num_hid, num_head, num_feed_forward),
            )

        self.classifier = layers.Dense(num_classes)

    def decode(self, enc_out, target, training):
        """Embed ``target`` and run it through every decoder block."""
        y = self.dec_input(target)
        for i in range(self.num_layers_dec):
            y = getattr(self, f"dec_layer_{i}")(enc_out, y, training)
        return y

    def call(self, inputs, training):
        """Return per-position class logits for ``inputs = [source, target]``."""
        source = inputs[0]
        target = inputs[1]
        x = self.encoder(source, training)
        y = self.decode(x, target, training)
        return self.classifier(y)

    @property
    def metrics(self):
        # Both trackers are listed so Keras resets them at the start of each
        # epoch and of each evaluation run.
        return [self.loss_metric, self.acc_metric]

    def _forward(self, batch, training):
        """Teacher-forced forward pass; returns ``(loss, preds, dec_target)``."""
        source = batch[0]
        target = batch[1]
        # The decoder sees tokens [0, n-1) and must predict tokens [1, n).
        dec_input = target[:, :-1]
        dec_target = target[:, 1:]
        preds = self([source, dec_input], training=training)
        one_hot = tf.one_hot(dec_target, depth=self.num_classes)
        # Padding positions get zero weight in the loss.
        mask = tf.math.logical_not(tf.math.equal(dec_target, pad_token_idx))
        loss = self.compiled_loss(one_hot, preds, sample_weight=mask)
        return loss, preds, dec_target

    def _update_metrics(self, loss, preds, dec_target):
        """Update the loss / edit-distance trackers and return their values.

        The Levenshtein distance is tracked because the evaluation metric for
        this contest is the normalized total levenshtein distance.
        """
        # The class axis is the last one, so argmax over it gives one token id
        # per target position, aligned with ``dec_target``.
        pred_ids = tf.cast(tf.argmax(preds, axis=-1), dec_target.dtype)
        # ``tf.sparse.from_dense`` drops zeros, so shift every id by one to
        # keep token id 0 in the compared sequences.
        edit_dist = tf.edit_distance(
            tf.sparse.from_dense(pred_ids + 1),
            tf.sparse.from_dense(dec_target + 1),
        )
        self.acc_metric.update_state(tf.reduce_mean(edit_dist))
        self.loss_metric.update_state(loss)
        return {"loss": self.loss_metric.result(), "edit_dist": self.acc_metric.result()}

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        with tf.GradientTape() as tape:
            # training=True is passed explicitly so the dropout layers are active.
            loss, preds, dec_target = self._forward(batch, training=True)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        return self._update_metrics(loss, preds, dec_target)

    def test_step(self, batch):
        """Processes one batch inside model.evaluate() / validation."""
        loss, preds, dec_target = self._forward(batch, training=False)
        return self._update_metrics(loss, preds, dec_target)

    def generate(self, source, target_start_token_idx):
        """Performs inference over one batch of inputs using greedy decoding."""
        bs = tf.shape(source)[0]
        enc = self.encoder(source, training = False)
        dec_input = tf.ones((bs, 1), dtype=tf.int32) * target_start_token_idx
        for _ in range(self.target_maxlen - 1):
            dec_out = self.decode(enc, dec_input, training = False)
            logits = self.classifier(dec_out)
            token_ids = tf.argmax(logits, axis=-1, output_type=tf.int32)
            # Only the prediction for the last position is new; append it.
            last_token = token_ids[:, -1][..., tf.newaxis]
            dec_input = tf.concat([dec_input, last_token], axis=-1)
        return dec_input

Display Output

In [12]:
class DisplayOutputs(keras.callbacks.Callback):
    def __init__(
        self, batch, idx_to_token, target_start_token_idx=60, target_end_token_idx=61
    ):
        """Displays a batch of outputs after every 4 epoch

        Args:
            batch: A test batch
            idx_to_token: A List containing the vocabulary tokens corresponding to their indices
            target_start_token_idx: A start token index in the target vocabulary
            target_end_token_idx: An end token index in the target vocabulary
        """
        super().__init__()
        self.batch = batch
        self.target_start_token_idx = target_start_token_idx
        self.target_end_token_idx = target_end_token_idx
        self.idx_to_char = idx_to_token

    def _to_text(self, token_ids):
        """Join the tokens of one sequence up to and including the first end token."""
        chars = []
        for idx in token_ids:
            chars.append(self.idx_to_char[idx])
            if idx == self.target_end_token_idx:
                break
        return "".join(chars)

    def on_epoch_end(self, epoch, logs=None):
        """Print target and greedy prediction for every sample of the stored batch."""
        if epoch % 4 != 0:
            return
        source = self.batch[0]
        targets = self.batch[1].numpy()
        preds = self.model.generate(source, self.target_start_token_idx).numpy()
        # Target and prediction are rendered the same way: both stop at the end
        # token and no character is stripped from either of them.
        for target_ids, pred_ids in zip(targets, preds):
            print(f"target:     {self._to_text(target_ids)}")
            print(f"prediction: {self._to_text(pred_ids)}\n")




Train transformer 

In [1]:
# One fixed validation batch; the callback prints its decoded predictions.
batch = next(iter(valid_ds))

# The callback indexes this list with token ids, so it follows the key order of char_to_num.
idx_to_char = list(char_to_num)
display_cb = DisplayOutputs(
    batch,
    idx_to_char,
    target_start_token_idx=char_to_num['<'],
    target_end_token_idx=char_to_num['>'],
)  # start / end ids are looked up for '<' and '>' in the vocabulary

model = Transformer(
    num_hid=200, num_head=4, num_feed_forward=400,
    source_maxlen=413, target_maxlen=64,
    num_layers_enc=2, num_layers_dec=1,
    num_classes=62,
)

loss_fn = keras.losses.CategoricalCrossentropy(from_logits=True, label_smoothing=0.1)
optimizer = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer, loss=loss_fn)

history = model.fit(train_ds, validation_data=valid_ds, callbacks=[display_cb], epochs=13)

NameError: name 'valid_ds' is not defined

Create TFLite Model and Prepare for Submission

In [None]:
 class TFLiteModel(tf.Module):
    """Wraps ``model.generate`` behind one ``tf.function`` for TFLite export.

    NOTE(review): ``FEATURE_COLUMNS`` and ``pre_process`` are not defined in any
    visible cell of this notebook — they must exist before this cell is run.
    """

    def __init__(self, model):
        super().__init__()
        self.target_start_token_idx = start_token_idx
        self.target_end_token_idx = end_token_idx
        # Trained model whose ``generate`` method performs the decoding
        self.model = model

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, len(FEATURE_COLUMNS)], dtype=tf.float32, name='inputs')])
    def __call__(self, inputs, training=False):
        # Add a leading batch axis and swap an empty input for a single all-zero row.
        frames = tf.cast(inputs, tf.float32)[None]
        frames = tf.cond(
            tf.shape(frames)[1] == 0,
            lambda: tf.zeros((1, 1, len(FEATURE_COLUMNS))),
            lambda: tf.identity(frames),
        )
        features = pre_process(frames[0])[None]
        tokens = self.model.generate(features, self.target_start_token_idx)[0]
        # Index of the first end token (argmax returns 0 when there is none);
        # indices below 1 are replaced by 2.
        end_pos = tf.argmax(tf.cast(tf.equal(tokens, self.target_end_token_idx), tf.int32))
        end_pos = tf.where(tf.math.less(end_pos, 1), tf.constant(2, dtype=tf.int64), end_pos)
        # Drop the start token, cut before the end position, one-hot over 59 classes.
        return {'outputs': tf.one_hot(tokens[1:end_pos], 59)}
    
tflitemodel_base = TFLiteModel(model)
model.save_weights("model.h5")

keras_model_converter = tf.lite.TFLiteConverter.from_keras_model(tflitemodel_base)
keras_model_converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]#, tf.lite.OpsSet.SELECT_TF_OPS]
tflite_model = keras_model_converter.convert()
with open('/kaggle/working/model.tflite', 'wb') as f:
    f.write(tflite_model)
    
infargs = {"selected_columns" : FEATURE_COLUMNS}

with open('inference_args.json', "w") as json_file:
    json.dump(infargs, json_file)

!zip submission.zip  './model.tflite' './inference_args.json'

interpreter = tf.lite.Interpreter("model.tflite")

REQUIRED_SIGNATURE = "serving_default"
REQUIRED_OUTPUT = "outputs"

with open ("/kaggle/input/asl-fingerspelling/character_to_prediction_index.json", "r") as f:
    character_map = json.load(f)
rev_character_map = {j:i for i,j in character_map.items()}

found_signatures = list(interpreter.get_signature_list().keys())

if REQUIRED_SIGNATURE not in found_signatures:
    raise KernelEvalException('Required input signature not found.')

prediction_fn = interpreter.get_signature_runner("serving_default")
output = prediction_fn(inputs=batch[0][0])
prediction_str = "".join([rev_character_map.get(s, "") for s in np.argmax(output[REQUIRED_OUTPUT], axis=1)])
print(prediction_str)