In [None]:
# Colab-only setup cell. %pip (rather than !pip) installs into the environment
# of the running kernel. Every package is pinned so a fresh runtime resolves the
# same versions; the specifier containing "*" is quoted so the shell cannot
# glob-expand it.
%pip install mediapipe==0.9.3 "protobuf==3.20.*"
%pip install py7zr==0.21.1

Collecting mediapipe==0.9.3
  Downloading mediapipe-0.9.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (21 kB)
Collecting sounddevice>=0.4.4 (from mediapipe==0.9.3)
  Downloading sounddevice-0.4.7-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.9.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m21.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.4.7-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.9.3.0 sounddevice-0.4.7
Collecting py7zr
  Downloading py7zr-0.21.1-py3-none-any.whl.metadata (17 kB)
Collecting texttable (from py7zr)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting pycryptodomex>=3.16.0 (from py7zr)
  Downloading pycryptodomex-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting pyz

In [None]:
import pandas as pd
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Masking
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import to_categorical
from collections import defaultdict
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import mediapipe as mp
import zipfile
import py7zr
from google.colab import drive
from tensorflow.keras.utils import to_categorical
# Mount Google Drive so the dataset archives under MyDrive are readable.
drive.mount('/content/drive')

# Where the compressed dataset lives on Drive, and where it is unpacked on the
# Colab VM's local disk.
base_path = '/content/drive/MyDrive/KArSL-502'
output_base_path='/content/kArSL-502'

# exist_ok=True makes this cell safe to re-run and avoids the
# check-then-create race of `if not os.path.exists(...)`.
os.makedirs(output_base_path, exist_ok=True)

# Width of the one-hot label vectors and of the softmax output layer.
num_classes = 502

Mounted at /content/drive


In [None]:
# Read the label spreadsheet that sits next to the archives on Drive.
df=pd.read_excel(os.path.join(base_path,"KARSL-502_Labels.xlsx"))
# Last expression of the cell: show the first rows as a quick sanity check.
df.head()

Unnamed: 0,SignID,Sign-Arabic,Sign-English
0,1,0,0
1,2,1,1
2,3,2,2
3,4,3,3
4,5,4,4


In [None]:
# Map each SignID to its English label.
# Built column-wise: the previous row loop bound a global named `id`, which
# shadowed the builtin `id()` for every later cell, and went through the slow
# `iterrows` path. If a SignID repeats, the last row wins (as before).
id_to_label = dict(zip(df['SignID'], df['Sign-English']))

In [None]:
df_trains=[]
df_tests=[]

# Unpack every archive found at base_path/<item>/<subitem>/<archive> into
# output_base_path/<item>/<subitem>/<archive name without ".7z">.
for item in os.listdir(base_path):
    item_path = os.path.join(base_path, item)
    if not os.path.isdir(item_path):
        continue  # e.g. the labels spreadsheet sitting next to the folders
    for subitem in os.listdir(item_path):
        subitem_path = os.path.join(item_path, subitem)
        if not os.path.isdir(subitem_path):
            continue  # os.listdir() below would raise on a plain file
        # The loop variable is not called `zipfile`: that name would overwrite
        # the imported `zipfile` module for the rest of the notebook.
        for archive_name in os.listdir(subitem_path):
            # Remove the ".7z" extension as a suffix. str.rstrip('.7z') strips
            # any trailing '.', '7' or 'z' characters, so "0007.7z" became
            # "000" and different archives were extracted into the same folder.
            if archive_name.endswith('.7z'):
                stem = archive_name[:-len('.7z')]
            else:
                stem = archive_name
            output_path = os.path.join(output_base_path, item, subitem, stem)
            os.makedirs(output_path, exist_ok=True)
            with py7zr.SevenZipFile(os.path.join(subitem_path, archive_name), 'r') as archive:
                archive.extractall(output_path)

In [None]:
# MediaPipe Holistic model, shared by the landmark-extraction cells below.
mp_holistic = mp.solutions.holistic
holistic = mp_holistic.Holistic(
    static_image_mode=True,
    model_complexity=1,
    smooth_landmarks=True,
)

# Drawing helpers: one spec for landmark points (also reused for most
# connections) and a lighter grey spec used for the face-mesh connections.
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(
    color=(128, 0, 128), thickness=1, circle_radius=1
)
connect_drawing_spec = mp_drawing.DrawingSpec(
    color=(200, 200, 200), thickness=1, circle_radius=1
)

def mediapipe_detection(image, model):
    """Run a landmark model on one frame and draw whatever it detected.

    Args:
        image: Frame as a NumPy array. It is handed to ``model.process``
            unchanged and is never modified; drawing happens on a copy.
        model: Object exposing ``process(image)`` whose result carries
            ``pose_landmarks``, ``face_landmarks``, ``left_hand_landmarks``
            and ``right_hand_landmarks`` attributes (for example the
            module-level ``holistic`` instance).

    Returns:
        Tuple ``(annotated_image, results)``: the annotated copy of ``image``
        and the raw result object returned by ``model.process``.
    """
    # Use the model the caller passed in. The argument used to be ignored and
    # the module-level `holistic` was always used instead.
    results = model.process(image)
    image_copy = image.copy()

    # Each landmark group is drawn only if it was detected in this frame.
    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
            image_copy, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
            landmark_drawing_spec=drawing_spec, connection_drawing_spec=drawing_spec)
    if results.face_landmarks:
        # The face mesh is the only overlay drawn with the grey connection spec.
        mp_drawing.draw_landmarks(
            image_copy, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
            landmark_drawing_spec=drawing_spec, connection_drawing_spec=connect_drawing_spec)
    if results.left_hand_landmarks:
        mp_drawing.draw_landmarks(
            image_copy, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            landmark_drawing_spec=drawing_spec, connection_drawing_spec=drawing_spec)
    if results.right_hand_landmarks:
        mp_drawing.draw_landmarks(
            image_copy, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
            landmark_drawing_spec=drawing_spec, connection_drawing_spec=drawing_spec)
    return image_copy, results

In [None]:
def extract_keypoints(results):
    """Flatten the left- and right-hand landmarks into one feature vector.

    For each hand, the ``x``, ``y``, ``z`` of every entry in ``.landmark`` are
    laid out consecutively. A hand that is missing (falsy attribute) is
    replaced by ``21 * 3`` zeros. The left-hand values come first, followed
    by the right-hand values.
    """
    hand_vectors = []
    for hand in (results.left_hand_landmarks, results.right_hand_landmarks):
        if hand:
            coords = np.array([[pt.x, pt.y, pt.z] for pt in hand.landmark]).flatten()
        else:
            coords = np.zeros(21 * 3)
        hand_vectors.append(coords)
    return np.concatenate(hand_vectors)

In [None]:
def pad_or_truncate_videos(videos, max_len=25, frame_size=126):
    """Force every video to exactly ``max_len`` frames.

    Args:
        videos: Iterable of 2-D arrays, one row per frame.
        max_len: Number of frames each output video must have.
        frame_size: Width of the zero rows appended to short videos.

    Returns:
        ``np.array`` stacking the adjusted videos. Videos shorter than
        ``max_len`` get zero rows appended at the end; the others keep only
        their first ``max_len`` frames.
    """
    def _fit(clip):
        missing = max_len - clip.shape[0]
        if missing > 0:
            # Too short: append all-zero frames at the end.
            return np.vstack((clip, np.zeros((missing, frame_size))))
        # Long enough: keep the leading frames only.
        return clip[:max_len]

    return np.array([_fit(clip) for clip in videos])

In [None]:
def process_batch(videos):
    """Turn a batch of raw videos into fixed-length landmark sequences.

    Every frame of every video is run through ``mediapipe_detection`` using
    the module-level ``holistic`` model, and the detection result is reduced
    to a keypoint vector by ``extract_keypoints``. The per-video sequences
    are then padded or truncated by ``pad_or_truncate_videos`` with its
    default arguments.

    Args:
        videos: Iterable of videos, each an iterable of frame images.

    Returns:
        The array returned by ``pad_or_truncate_videos``.
    """
    per_video = []
    for frames in videos:
        # Index [1] selects the detection results; the annotated image is not needed.
        frame_vectors = [
            extract_keypoints(mediapipe_detection(frame, holistic)[1])
            for frame in frames
        ]
        per_video.append(np.array(frame_vectors))

    batch = pad_or_truncate_videos(per_video)
    print("made  it")
    return batch

In [None]:
def load_videos_in_batches(folder_paths, batch_size):
    all_videos = []
    all_labels = []
    current_batch_size = 0

    for folder_path, label in folder_paths:
        images = []
        for image_name in os.listdir(folder_path):
            image_path = os.path.join(folder_path, image_name)
            image = mpimg.imread(image_path)
            images.append(image)

        if images:
            all_videos.append(np.array(images))
            all_labels.append(label)
            current_batch_size += 1

        if current_batch_size >= batch_size:
            yield all_videos, np.array(all_labels)
            all_videos = []
            all_labels = []
            current_batch_size = 0

    if all_videos:
        yield all_videos, np.array(all_labels)

In [None]:
def encode_labels(y):
    """One-hot encode sign IDs.

    ``y - 1`` is computed first, so ID 1 lands in column 0. The result is
    passed to ``to_categorical`` with the module-level ``num_classes`` as the
    vector width.
    """
    zero_based = y - 1
    return to_categorical(zero_based, num_classes=num_classes)

In [None]:
def train_model(X_train, batch_labels):
    """Fit the module-level ``model`` on one batch of landmark sequences.

    The batch is split 80/20 into training and validation parts. Labels are
    one-hot encoded after the split, because stratification needs the raw
    integer IDs.

    Args:
        X_train: Array of landmark sequences, one entry per video.
        batch_labels: Array of integer sign IDs aligned with ``X_train``.

    Returns:
        The object returned by ``model.fit`` (previously discarded), so
        callers can inspect the training curves.

    Raises:
        ValueError: If the batch is too small to be split at all.
    """
    try:
        X_fit, X_val, y_fit, y_val = train_test_split(
            X_train, batch_labels, test_size=0.2,
            stratify=batch_labels, random_state=42)
    except ValueError:
        # A stratified split is impossible when some sign ID occurs only once
        # in the batch (typical for a small final batch). Fall back to a plain
        # split instead of aborting the whole run. If the batch cannot be
        # split at all, this second call raises the ValueError to the caller.
        X_fit, X_val, y_fit, y_val = train_test_split(
            X_train, batch_labels, test_size=0.2, random_state=42)

    # Convert the labels to one-hot encoding after splitting
    y_fit = encode_labels(y_fit)
    y_val = encode_labels(y_val)

    # Train the model with validation data
    print("training")
    history = model.fit(X_fit, y_fit, epochs=50, batch_size=32,
                        validation_data=(X_val, y_val),
                        callbacks=[early_stopping])
    return history



In [None]:
def process_data_in_batches(output_base_path, batch_size):
    """Collect all training sample folders, then extract landmarks and train batch by batch.

    The tree walked is
    ``output_base_path/<fold>/train/<folder>/<sign id>/<sample>``. Only
    second-level directories named exactly ``"train"`` are used. The
    ``<sign id>`` directory name is converted with ``int`` and used as the
    sample's label.

    Args:
        output_base_path: Root of the extracted dataset.
        batch_size: Number of videos loaded, processed and trained on at a time.
    """
    def _children(parent):
        # (entry name, full path) for every entry directly under `parent`.
        return [(name, os.path.join(parent, name)) for name in os.listdir(parent)]

    folder_paths = []
    for _, fold_path in _children(output_base_path):
        for split_name, split_path in _children(fold_path):
            if split_name != "train":
                continue
            for _, signer_path in _children(split_path):
                for sign_id, sign_path in _children(signer_path):
                    for _, sample_path in _children(sign_path):
                        folder_paths.append((sample_path, int(sign_id)))

    for batch_videos, batch_labels in load_videos_in_batches(folder_paths, batch_size):
        print(len(batch_videos),batch_videos[0].shape)
        videos_landmarks = process_batch(batch_videos)
        print(videos_landmarks.shape)
        train_model(videos_landmarks, batch_labels)

In [None]:
# Sequence classifier over per-frame hand landmarks.
# Input: 25 frames x (42 * 3) values per frame. The Masking layer is
# configured with mask_value 0, the same value used to pad short videos.
model = Sequential([
    Masking(mask_value=0., input_shape=(25, 42 * 3)),
    LSTM(64, return_sequences=True, activation='relu'),
    LSTM(128, return_sequences=True, activation='relu'),
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(num_classes, activation='softmax'),
])

# Categorical cross-entropy matches the one-hot labels and softmax output.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Stop a fit once val_loss has not improved for 5 epochs and restore the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

In [None]:
# Number of videos loaded, converted to landmarks and trained on per round.
batch_size = 100

# Walk the extracted dataset and train the model batch by batch.
process_data_in_batches(output_base_path, batch_size)

100 (40, 256, 256, 3)
made  it
(100, 25, 126)
training
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
100 (28, 256, 256, 3)
made  it
(100, 25, 126)
training
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
100 (49, 256, 256, 3)


In [None]:
# Save the trained model as 'Sign-Translator.h5'. The path is relative, so the
# file lands in the notebook's current working directory.
model.save('Sign-Translator.h5')