In [2]:
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from kaggle_secrets import UserSecretsClient

# Read the Google Drive folder id from the Kaggle secret store; it is used
# below as the parent folder for every uploaded file.
user_secrets = UserSecretsClient()
gdrive_kaggle_wl_ar_sl = user_secrets.get_secret("gdrive_kaggle_wl_ar_sl")

# Authenticate the user, then build a PyDrive2 client that reuses the
# application-default credentials obtained by that authentication step.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

def upload_to_gdrive(file_name, file_content):
    """Upload a text payload to Google Drive.

    Creates a Drive file titled `file_name` inside the folder whose id is
    stored in `gdrive_kaggle_wl_ar_sl`, sets `file_content` as its string
    content and uploads it through the module-level `drive` client.

    Args:
        file_name: title given to the Drive file.
        file_content: string content of the file.
    """
    remote_file = drive.CreateFile(
        {
            'title': file_name,
            'parents': [{'id': gdrive_kaggle_wl_ar_sl}],
        }
    )
    remote_file.SetContentString(file_content)
    remote_file.Upload()  # Files.insert()

In [1]:
# Download the KARSL-502 labels spreadsheet (KARSL-502_Labels.xlsx) into the working directory.
!wget https://github.com/issamjebnouni/Arabic-Word-level-Sign-Language-Recognition/raw/refs/heads/main/KARSL-502_Labels.xlsx

--2025-08-03 22:57:54--  https://github.com/issamjebnouni/Arabic-Word-level-Sign-Language-Recognition/raw/refs/heads/main/KARSL-502_Labels.xlsx
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/issamjebnouni/Arabic-Word-level-Sign-Language-Recognition/refs/heads/main/KARSL-502_Labels.xlsx [following]
--2025-08-03 22:57:54--  https://raw.githubusercontent.com/issamjebnouni/Arabic-Word-level-Sign-Language-Recognition/refs/heads/main/KARSL-502_Labels.xlsx
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 26778 (26K) [application/octet-stream]
Saving to: ‘KARSL-502_Labels.xlsx’


2025-08-03 22:57:54 (13

In [116]:
import os 
from tqdm.notebook import tqdm
from collections import defaultdict

def get_karsl_words_min_frames_cnt():
    """Scan the KArSL-502 frame folders and summarise frame counts per word.

    For every (signer, split, word) group the repetition with the fewest
    frames is located (only counts below 999 are considered). Across all
    groups of the same word, the group minimum with the *largest* frame count
    is kept.

    Returns:
        defaultdict mapping the zero-padded word id ("0001".."0502") to a
        ``(frame_count, repetition_dir)`` tuple; unseen keys default to
        ``(0, None)``.
    """
    dataset_root = "/kaggle/input/karsl-502"
    word_ids = [f"{idx:04d}" for idx in range(1, 503)]
    best_per_word = defaultdict(lambda: (0, None))

    for signer_num in tqdm([1, 2, 3], desc='signer'):
        signer_id = f"{signer_num:02d}"
        # The dataset nests the signer folder twice: <root>/<signer>/<signer>/...
        signer_root = os.path.join(dataset_root, signer_id, signer_id)
        for split in tqdm(['train', 'test'], desc='split', leave=False):
            for word in tqdm(word_ids, desc='words', leave=False):
                word_root = os.path.join(signer_root, split, word)

                # Shortest repetition of this word for the current signer/split.
                shortest = (999, None)
                for rep_name in os.listdir(word_root):
                    rep_root = os.path.join(word_root, rep_name)
                    n_frames = len(os.listdir(rep_root))
                    if n_frames < shortest[0]:
                        shortest = (n_frames, rep_root)

                # Keep the largest of the per-group minima seen so far.
                if shortest[0] > best_per_word[word][0]:
                    best_per_word[word] = shortest

    return best_per_word

# words_frames = get_karsl_words_min_frames_cnt()

In [75]:
# Samples flagged as unusable; each entry is the sample's directory path
# starting at 'karsl-502/'.
# NOTE(review): not referenced in the visible cells — confirm where the
# filtering is actually applied.
bad_samples = [
    # this sample has >260 frames, and after inspection it has many unrelated frames, so just drop it
    'karsl-502/02/02/train/0443/03_02_0443_(15_11_17_15_52_07)_c',
]

# NOTE(review): presumably the fill value for padded frames and the fixed
# sequence length (in frames); neither is referenced in the visible cells —
# confirm against their users.
PAD_TKN = -1
SEQ_LEN = 80

In [None]:
# !tar -cf sample.tar.gz '/kaggle/input/karsl-502/03/03/test/0102/03_03_0102_(22_12_16_10_40_19)_c'
# sorted(words_frames.values())

In [None]:
import mediapipe as mp
import cv2
import numpy as np
import os
from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor


# Keep the Holistic *class* here rather than an instance: the extraction code
# builds a freshly configured model per video by calling `mp_holistic(...)`,
# which is impossible on an already constructed Holistic object.
mp_holistic = mp.solutions.holistic.Holistic
# mp_hands = mp.solutions.hands.Hands()
# mp_pose = mp.solutions.pose.Pose()
# mp_face = mp.solutions.face_mesh.FaceMesh(refine_landmarks=True)

# Reference landmarks used to centre each body part. These are indices into
# the FULL landmark list returned by MediaPipe, not into the selected subsets
# defined below.
# FACEMESH_NOSE is an unordered frozenset of (start, end) landmark pairs, so
# it cannot be indexed positionally; the smallest landmark id is taken so the
# reference point is deterministic.
mp_face_nose_idx = min(
    point
    for edge in mp.solutions.face_mesh_connections.FACEMESH_NOSE
    for point in edge
)
mp_hand_wrist_idx = mp.solutions.hands.HandLandmark.WRIST
mp_pose_nose_idx = mp.solutions.pose.PoseLandmark.NOSE


# Pose landmarks 11..16 (shoulders, elbows and wrists in MediaPipe's pose model).
pose_kps_idx = tuple(range(11, 17))
# Every landmark id that appears in the face contour or iris connections.
# Each connection set holds (start, end) pairs, hence the two nested loops;
# sorting gives a stable, reproducible keypoint order.
# The iris landmarks are only produced when the model is created with
# refine_face_landmarks=True.
face_kps_idx = tuple(
    sorted(
        {
            point
            for connections in (
                mp.solutions.face_mesh_connections.FACEMESH_CONTOURS,
                mp.solutions.face_mesh_connections.FACEMESH_IRISES,
            )
            for edge in connections
            for point in edge
        }
    )
)
# Every landmark id that appears in the hand connections, in ascending order.
hand_kps_idx = tuple(
    sorted({point for edge in mp.solutions.hands.HAND_CONNECTIONS for point in edge})
)

# Number of keypoints kept per body part (hands: per hand).
POSE_NUM = len(pose_kps_idx)
FACE_NUM = len(face_kps_idx)
HAND_NUM = len(hand_kps_idx)
# POSE_NUM, FACE_NUM, HAND_NUM
# POSE_NUM, FACE_NUM, HAND_NUM

In [None]:
# def adjust_keypoints(arr, center):
#     # arr_reshaped = arr.reshape(-1, 3)
#     # center_repeated = np.tile(center, (len(arr_reshaped), 1))
#     # arr_adjusted = arr_reshaped - center_repeated
#     # return arr_adjusted.reshape(-1)
#     return arr - center


def extract_frame_keypoints(results):
    """Turn one MediaPipe Holistic result into a fixed-size keypoint array.

    Rows are laid out as pose -> face -> right hand -> left hand, using the
    module-level index tuples (`pose_kps_idx`, `face_kps_idx`) and sizes
    (`POSE_NUM`, `FACE_NUM`, `HAND_NUM`). Each part is translated so that its
    reference landmark (pose nose, face nose, hand wrist) sits at the origin.

    Args:
        results: object returned by `Holistic.process`, exposing
            `pose_landmarks`, `face_landmarks`, `right_hand_landmarks` and
            `left_hand_landmarks` (each either None or holding `.landmark`).

    Returns:
        np.ndarray of shape (POSE_NUM + FACE_NUM + 2 * HAND_NUM, 3) with
        (x, y, z) per keypoint. Rows of a part that was not detected in the
        frame stay zero.

    Raises:
        IndexError: if a selected index is not present in the detected
            landmark list (e.g. iris indices without refined face landmarks).
    """
    # TODO: normalize(?) keypoints after adjustment

    def get_xyz(lm):
        return (lm.x, lm.y, lm.z)

    # Derive the layout from the configured sizes instead of a literal row
    # count so the views below always line up with the index tuples.
    face_start = POSE_NUM
    rh_start = face_start + FACE_NUM
    lh_start = rh_start + HAND_NUM
    total = lh_start + HAND_NUM

    # define numpy views, pose -> face -> rh -> lh
    all_kps = np.zeros((total, 3))
    pose_kps = all_kps[:face_start]
    face_kps = all_kps[face_start:rh_start]
    rh_kps = all_kps[rh_start:lh_start]
    lh_kps = all_kps[lh_start:]

    if results.pose_landmarks is not None:
        lms = results.pose_landmarks.landmark
        # A list (not a generator) is required for NumPy slice assignment.
        pose_kps[:] = [get_xyz(lms[idx]) for idx in pose_kps_idx]
        # The nose is not among the selected pose keypoints, so the reference
        # point is read from the full landmark list.
        pose_kps -= np.array(get_xyz(lms[mp_pose_nose_idx]))

    if results.face_landmarks is not None:
        lms = results.face_landmarks.landmark
        face_kps[:] = [get_xyz(lms[idx]) for idx in face_kps_idx]
        # Same as for the pose: the reference index addresses the full mesh.
        face_kps -= np.array(get_xyz(lms[mp_face_nose_idx]))

    if results.right_hand_landmarks is not None:
        rh_kps[:] = [get_xyz(lm) for lm in results.right_hand_landmarks.landmark]
        # All hand landmarks are kept in order, so the wrist index is valid
        # inside the hand view itself.
        rh_kps -= rh_kps[mp_hand_wrist_idx].copy()

    if results.left_hand_landmarks is not None:
        lh_kps[:] = [get_xyz(lm) for lm in results.left_hand_landmarks.landmark]
        lh_kps -= lh_kps[mp_hand_wrist_idx].copy()

    return all_kps


def mediapipe_detection(image, model):
    """Run `model` on a BGR image.

    The image is converted from BGR to RGB channel order and handed to
    `model.process`; whatever that call returns is passed back unchanged.
    """
    return model.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))


def store_keypoint_arrays(data_dir, out_dir, signer, split, selected_words=None):
    """Extract and save per-video keypoint arrays for one signer and split.

    Every video (a directory of frame images) is run frame by frame through a
    fresh MediaPipe Holistic model; the per-frame arrays produced by
    `extract_frame_keypoints` are saved together as one `.npy` file at
    ``<out_dir>/karsl-502/<signer>/<split>/all_kps/<signer>-<split>/<word>/<video>.npy``.

    Args:
      data_dir(str): dataset root containing the signer folders.
      out_dir(str): root directory for the generated arrays.
      signer(int | str): the signer of interest (1, 2, 3 or "01", "02", "03");
        normalised to the two-digit folder name.
      split(str): can be 'train', 'test' or 'val'
      selected_words(iterable[str] | None): word folder names to process;
        defaults to every entry found in the split directory.
    """
    # Signer folders are two-digit strings; accept ints as the docstring allows.
    signer = str(signer).zfill(2)

    out_dir = os.path.join(out_dir, "karsl-502", signer, split)
    os.makedirs(out_dir, exist_ok=True)

    # The Kaggle copy of the dataset nests the signer folder twice
    # (<root>/01/01/train/...); fall back to the flat layout otherwise.
    split_dir = os.path.join(data_dir, signer, signer, split)
    if not os.path.isdir(split_dir):
        split_dir = os.path.join(data_dir, signer, split)

    if selected_words is None:
        selected_words = sorted(os.listdir(split_dir))

    for word in tqdm(selected_words):
        word_kps_dir = os.path.join(out_dir, "all_kps", f"{signer}-{split}", word)
        os.makedirs(word_kps_dir, exist_ok=True)

        word_dir = os.path.join(split_dir, word)
        for video in os.listdir(word_dir):
            video_dir = os.path.join(word_dir, video)
            # Frame files are processed in name order.
            frame_names = sorted(os.listdir(video_dir))

            # np.save appends ".npy" to this path.
            video_kps_path = os.path.join(word_kps_dir, video)

            all_kps = []
            # One model per video so tracking state never leaks between
            # videos; the context manager releases it afterwards.
            # refine_face_landmarks=True is needed for the iris landmarks.
            with mp.solutions.holistic.Holistic(
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5,
                refine_face_landmarks=True,
            ) as holistic:
                for frame_name in frame_names:
                    image = cv2.imread(os.path.join(video_dir, frame_name))
                    all_kps.append(
                        extract_frame_keypoints(mediapipe_detection(image, holistic))
                    )

            np.save(video_kps_path, all_kps)

In [2]:
def extract_keypoints_from_frames(
    data_root_dir, signers=None, splits=None, selected_words=None
):
    """Run keypoint extraction for every requested signer/split combination.

    Args:
        data_root_dir: dataset root handed to `store_keypoint_arrays`.
        signers: signer folder names; defaults to ["01", "02", "03"].
        splits: split names; defaults to ["train", "test"].
        selected_words: word folder names to process; defaults to all 502
            zero-padded word ids ("0001".."0502").

    Output is written below "./karsl-data/".
    """
    if signers is None:
        signers = ["01", "02", "03"]
    if splits is None:
        splits = ["train", "test"]
    if selected_words is None:
        selected_words = [f"{num:04d}" for num in range(1, 503)]

    for signer in signers:
        for split in splits:
            # `selected_words` is a required argument of store_keypoint_arrays.
            store_keypoint_arrays(
                data_root_dir, "./karsl-data/", signer, split, selected_words
            )


# Run the extraction over the Kaggle dataset with the default signers and splits.
extract_keypoints_from_frames("/kaggle/input/karsl-502")

In [None]:
def load_keypoints(kps_data_path, signers, split, words, f_avg):
    """Load per-part keypoint sequences and build fixed-length model inputs.

    For every word and signer, each sequence file is read from the
    ``pose_kps``, ``face_kps``, ``lh_kps`` and ``rh_kps`` sub-directories of
    ``<kps_data_path>/<signer>-<split>/<word id>``, brought to exactly
    `f_avg` frames and concatenated along axis 1.

    Relies on `w2id`, `label_map` and `OneHotEncoder` being defined elsewhere
    in the notebook.
    NOTE(review): store_keypoint_arrays in this notebook writes one combined
    array per video under a different directory layout — confirm which layout
    this loader is expected to read.

    Args:
        kps_data_path: root directory of the stored keypoints.
        signers: signer folder names, e.g. ["01", "02", "03"].
        split: split name, e.g. "train".
        words: iterable of words; each is mapped to its id through `w2id`.
        f_avg: target number of frames per sequence.

    Returns:
        (X, y): X stacks one entry per sequence; y is the one-hot encoding of
        the per-sequence labels, so both have the same length.
    """
    part_dirs = ["pose_kps", "face_kps", "lh_kps", "rh_kps"]

    def fit_length(seq):
        # Truncate long sequences and pad short ones by repeating the last
        # frame, so every sequence has f_avg frames and X can be stacked.
        n_frames = seq.shape[0]
        if n_frames >= f_avg:
            return seq[:f_avg]
        # seq[-1:] keeps the frame axis, so repeat duplicates whole frames.
        padding = np.repeat(seq[-1:], f_avg - n_frames, axis=0)
        return np.concatenate((seq, padding), axis=0)

    sequences = []
    labels = []
    for word in tqdm(words):
        word_id = f"{w2id[word]:04}"
        for signer in signers:
            word_dir = os.path.join(kps_data_path, f"{signer}-{split}", word_id)
            # List the same directory the left-hand arrays are loaded from.
            for sequence in sorted(os.listdir(os.path.join(word_dir, "lh_kps"))):
                parts = [
                    fit_length(np.load(os.path.join(word_dir, part, sequence)))
                    for part in part_dirs
                ]
                sequences.append(np.concatenate(parts, axis=1))
                # One label per sequence keeps X and y aligned.
                labels.append(label_map[word])

    X = np.array(sequences)
    y = np.array(labels)
    y = OneHotEncoder(sparse=False).fit_transform(y.reshape(-1, 1))

    return X, y

In [None]:
# NOTE(review): `words` is passed as None, but load_keypoints iterates over
# `words`, so this call needs a real word list before it can run.
load_keypoints("./karsl-data/", ["01", "02", "03"], "train", None, 100)