In [2]:
import os
import json
import numpy as np
import pandas as pd
import tensorflow as tf

import mediapipe as mp

from tqdm import tqdm
import logging

# import matplotlib.pyplot as plt
from mediapipe.framework.formats import landmark_pb2

# Shorthand for the MediaPipe holistic solution module.
mp_holistic = mp.solutions.holistic

# Module-level logger that emits INFO-level records and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

ModuleNotFoundError: No module named 'tensorflow'

In [None]:
# Constants

# MediaPipe connection sets keyed by body-part name; the key selects which
# edges are drawn for a group of landmarks.
connections_type = dict(
    face=mp_holistic.FACE_CONNECTIONS,
    left_hand=mp_holistic.HAND_CONNECTIONS,
    right_hand=mp_holistic.HAND_CONNECTIONS,
    pose=mp_holistic.POSE_CONNECTIONS
)

IMG_SIZE = (512, 512, 3)  # array shape of every rendered frame / model input
VAL_SIZE = 0.2            # fraction of rows held out for validation
SEED = 42                 # fixed seed so the shuffle (and the split) is reproducible

train_dataset_path = "train_landmark_files/"
save_dataset_path = "transformed_imgs/"
class_path = "train.csv"
class_to_label_path = "sign_to_prediction_index_map.json"
train_data_with_label = "train_data_with_label.csv"

df = pd.read_csv(train_data_with_label)
# Shuffle all rows once. Seeding the shuffle keeps the train/validation
# membership identical across kernel restarts.
df = df.sample(frac=1.0, random_state=SEED)

# The first VAL_SIZE fraction of the shuffled rows is the validation split,
# everything after it is the training split.
n_val = int(np.floor(df.shape[0] * VAL_SIZE))
val_df = df.iloc[:n_val, :]
train_df = df.iloc[n_val:, :]

In [None]:
class ASLModel():
    """
    Image classifier made of a ``tf.keras.applications`` backbone followed by
    a small dense classification head.

    The constructor builds ``input -> backbone`` as a ``tf.keras.Sequential``;
    ``get_model`` appends the head and compiles it, and ``finetune_model``
    unfreezes the backbone and recompiles with a low learning rate.
    """

    def __init__(
        self,
        n_classes:int,
        pretrained_model:str="mobilenet_v3",
        input_shape:tuple=None,
        load_weights:bool=True
    ) -> None:
        """
        Builds the backbone and wraps it in a Sequential model

        :params:
            n_classes {int}             -- Number of units of the output layer
            pretrained_model {str}      -- One of 'mobilenet', 'mobilenet_v3',
            'resnet', 'efficientnet', 'efficientnet_v2'
            input_shape {tuple}         -- Shape passed to the backbone and to
            the model's input layer
            load_weights {bool}         -- When True, ImageNet weights are
            loaded and the backbone is frozen; when False the backbone is
            randomly initialised and left trainable

        :returns:
            None

        :raises:
            ValueError                  -- If `pretrained_model` is not one of
            the supported names
        """
        logger.info(f"Loading {pretrained_model} pre-trained model")

        self.n_classes = n_classes
        # Tracks whether the classification head is already attached, so that
        # calling get_model() more than once does not stack a second head.
        self._head_added = False

        weights = 'imagenet' if load_weights else None

        if pretrained_model == 'mobilenet':
            self.base = tf.keras.applications.MobileNetV2(
                weights=weights,
                include_top=False,
                input_shape=input_shape,
            )

        elif pretrained_model == 'mobilenet_v3':
            self.base = tf.keras.applications.MobileNetV3Small(
                weights=weights,
                include_top=False,
                input_shape=input_shape,
                pooling='avg',
                minimalistic=True
            )

        elif pretrained_model == 'resnet':
            self.base = tf.keras.applications.ResNet152V2(
                weights=weights,
                include_top=False,
                input_shape=input_shape
            )

        elif pretrained_model == 'efficientnet':
            self.base = tf.keras.applications.EfficientNetB7(
                weights=weights,
                include_top=False,
                input_shape=input_shape,
            )

        elif pretrained_model == 'efficientnet_v2':
            self.base = tf.keras.applications.EfficientNetV2S(
                weights=weights,
                include_top=False,
                input_shape=input_shape,
            )

        else:
            # Previously an unknown name fell through and failed later with an
            # AttributeError on `self.base`; fail early with a clear message.
            raise ValueError(
                f"Unsupported pretrained_model {pretrained_model!r}; expected one of "
                "'mobilenet', 'mobilenet_v3', 'resnet', 'efficientnet', 'efficientnet_v2'"
            )

        # Pre-trained weights are frozen; a randomly initialised backbone has
        # to be trained from scratch.
        self.base.trainable = not load_weights
        inputs = tf.keras.Input(shape=input_shape)

        self.model = tf.keras.Sequential()
        self.model.add(inputs)
        self.model.add(self.base)

    def _initialize_network(self) -> None:
        """
        Initializes the network on top of pre-trained model

        Does nothing when the head has already been added.

        :params:
            None

        :returns:
            None
        """
        if self._head_added:
            return

        self.model.add(tf.keras.layers.Flatten())
        self.model.add(tf.keras.layers.Dense(512, activation = 'relu'))
        self.model.add(tf.keras.layers.BatchNormalization())
        self.model.add(tf.keras.layers.Dropout(0.5))
        # The output layer already applies softmax. The extra Softmax layer
        # that used to follow it applied softmax twice, which squashes the
        # predicted distribution towards uniform.
        self.model.add(tf.keras.layers.Dense(
            self.n_classes, activation = 'softmax'
        ))

        self._head_added = True

    def get_model(self) -> tf.keras.Model:
        """
        Compiles the model and return it

        :params:
            None

        :returns:
            {tf.keras.Model}
        """
        self._initialize_network()

        self.model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=[
                'acc'
            ]
        )

        # summary() prints and returns None, so route its text through the
        # logger instead of formatting the return value.
        self.model.summary(print_fn=logger.info)

        return self.model

    def finetune_model(self, learning_rate: float = 1e-5):
        """
        Compiles the model for finetuning and returns it.
        Please make sure that model is trained first before finetuning the pretrained model

        Reference: https://keras.io/guides/transfer_learning/#do-a-round-of-finetuning-of-the-entire-model

        :params:
            learning_rate {float}       -- Learning rate to be used while
            finetuning the model. Needs to be low so that the pre-trained model
            weights don't blow up

        :returns:
            {tf.keras.Model}
        """
        # NOTE(review): the referenced guide keeps the backbone's
        # BatchNormalization layers in inference mode while fine-tuning; that
        # is not enforced here -- confirm whether it matters for this model.
        self.base.trainable = True

        # Same loss/metric as get_model(): the output is a softmax over
        # n_classes. The previous 'binary_crossentropy' loss and binary
        # confusion-matrix metrics do not match that output.
        self.model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate),
            loss='sparse_categorical_crossentropy',
            metrics=[
                'acc'
            ]
        )

        self.model.summary(print_fn=logger.info)

        return self.model

    @property
    def classifier(self):
        """The underlying ``tf.keras.Sequential`` model (compiled or not)."""
        return self.model

In [None]:
# One output unit per distinct label in the dataset; inputs have the shape
# of the rendered landmark frames.
num_labels = df["y_label"].nunique()
asl_detector = ASLModel(n_classes=num_labels, input_shape=IMG_SIZE)

# Attach the classification head, compile, and keep the Keras model.
model = asl_detector.get_model()

In [None]:
class GISLRSequence(tf.keras.utils.Sequence):
    """
    Keras ``Sequence`` that renders landmark parquet files into images on the fly.

    A batch is built from up to ``batch_size`` *files*. Every frame found in
    those files becomes one image labelled with the file's label, so the
    number of samples per batch varies with the number of frames per file.
    """

    def __init__(self, df: pd.DataFrame, x_col: str, y_col: str, batch_size: int, shuffle: bool = True) -> None:
        """
        :params:
            df {pd.DataFrame}   -- One row per parquet file; must contain
            `x_col`, `y_col` and an `nframes` column
            x_col {str}         -- Column holding the parquet file paths
            y_col {str}         -- Column holding the labels
            batch_size {int}    -- Number of files used to build one batch
            shuffle {bool}      -- Reshuffle the file order after every epoch

        :raises:
            ValueError          -- If `batch_size` is smaller than 1
        """
        super().__init__()

        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        self.batch_size = batch_size
        self.shuffle = shuffle

        self.x_files_path = df[x_col]
        self.y = df[y_col]

        self.nframes = df.nframes
        # Index labels (not positions) of `df`; batches are selected by label.
        self.indices = df.index.to_list()

    def __len__(self) -> int:
        """Number of batches per epoch: ceil(number of files / batch_size)."""
        # __getitem__ slices `self.indices` (files), so the batch count has to
        # be derived from the number of files. Counting frames instead made
        # most batch indices resolve to an empty slice.
        return (len(self.indices) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, idx):
        """Returns `(X, y)` for the `idx`-th group of `batch_size` files."""
        subset = self.indices[(idx * self.batch_size):((idx + 1) * self.batch_size)]
        batch_x = self.x_files_path.loc[subset]
        batch_y = self.y.loc[subset]

        return self._get_data(batch_x, batch_y)

    def on_epoch_end(self):
        """Shuffles the file order in place when shuffling is enabled."""
        # Must read the instance attribute; a bare `shuffle` is an undefined
        # name here and raised NameError at the end of the first epoch.
        if self.shuffle:
            np.random.shuffle(self.indices)

    def _get_data(self, batch_x, batch_y):
        """
        Renders every file in `batch_x` and stacks all frames into one array.

        :returns:
            x {np.ndarray}  -- All rendered frames, concatenated along axis 0
            y {np.ndarray}  -- The file's label repeated once per frame
        """
        frames, labels = [], []

        for file_path, label in zip(batch_x, batch_y):
            imgs = self._get_transformed_data(file_path)

            frames.append(imgs)
            labels.append(np.full(imgs.shape[0], label))

        if not frames:
            # Empty batch: keep the image dimensions so the result can still
            # be concatenated with regular batches.
            return np.empty((0, *IMG_SIZE), dtype=np.uint8), np.empty((0), dtype=np.uint8)

        # Concatenate once instead of re-copying the accumulated array for
        # every additional file.
        return np.concatenate(frames), np.concatenate(labels)

    def _get_pts_to_img(self, data: pd.DataFrame) -> np.ndarray:
        """
        Draws the landmarks of a single frame on a blank image.

        :params:
            data {pd.DataFrame} -- Rows of one frame with columns `type`,
            `x`, `y` and `z`; `type` must be a key of `connections_type`

        :returns:
            {np.ndarray}        -- uint8 image of shape `IMG_SIZE`
        """
        image = np.zeros(IMG_SIZE, np.uint8)

        # One landmark list per body part, in the key order of connections_type.
        pts = {body_part: [] for body_part in connections_type}

        for elem in data.itertuples():
            pts[elem.type].append(
                landmark_pb2.NormalizedLandmark(
                    x=elem.x,
                    y=elem.y,
                    z=elem.z,
                    visibility=1.0
                )
            )

        for body_part, landmarks in pts.items():
            if not landmarks:
                # Nothing to draw for this body part. The connection indices
                # would not refer to any landmark in an empty list.
                continue

            landmark_subset = landmark_pb2.NormalizedLandmarkList(landmark =  landmarks)

            mp.solutions.drawing_utils.draw_landmarks(
                image,
                landmark_subset,
                connections_type[body_part]
            )

        return image

    def _get_transformed_data(self, dataset_file_path) -> np.ndarray:
        """
        Reads one parquet file and renders each of its frames to an image.

        :params:
            dataset_file_path   -- Path of the parquet file; it must contain a
            `frame` column plus the columns used by `_get_pts_to_img`

        :returns:
            {np.ndarray}        -- Array of shape `(n_frames, *IMG_SIZE)`, in
            order of first appearance of each frame value
        """
        frames_df = pd.read_parquet(dataset_file_path)
        logger.info(f"Found: {frames_df.frame.nunique()} frames in {dataset_file_path}")

        # A single groupby pass replaces one boolean-mask scan per frame.
        # sort=False keeps the groups in order of first appearance.
        all_imgs = [
            self._get_pts_to_img(frame_rows)
            for _, frame_rows in frames_df.groupby("frame", sort=False)
        ]

        if not all_imgs:
            # Keep the image dimensions so that a file without frames can
            # still be concatenated with the other files of the batch.
            return np.empty((0, *IMG_SIZE), dtype=np.uint8)

        return np.array(all_imgs)

In [None]:
# Both generators read the file path from "filename" and the label from
# "y_label", and build each batch from 16 files.
sequence_options = dict(x_col="filename", y_col="y_label", batch_size=16)

dataset = GISLRSequence(train_df, **sequence_options)
val_dataset = GISLRSequence(val_df, **sequence_options)

In [None]:
# Train on the training generator and evaluate on the validation generator
# after every epoch; `history` keeps what `fit` returns.
fit_options = dict(epochs=12, verbose=1, validation_data=val_dataset)
history = model.fit(dataset, **fit_options)