### Extract Data

In [1]:
import zipfile
import os

zip_path = 'wlasl-processed.zip'

os.makedirs('dataset', exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall('dataset')
    
print("Dataset extracted successfully to 'dataset' folder")

Dataset extracted successfully to 'dataset' folder


### Preprocess WLASL dataset

In [2]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import audio
import pickle
from tqdm.notebook import tqdm

In [3]:
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(static_image_mode=True, 
                      max_num_hands=2, 
                      min_detection_confidence=0.5)


DATASET_PATH = 'dataset' 
OUTPUT_PATH = 'data\\processed_data'   
METADATA_PATH = os.path.join('dataset', 'WLASL_v0.3.json')


os.makedirs(OUTPUT_PATH, exist_ok=True)


print(f"Dataset path exists: {os.path.exists(DATASET_PATH)}")
print(f"Metadata file exists: {os.path.exists(METADATA_PATH)}")

Dataset path exists: True
Metadata file exists: True


In [4]:
def extract_landmarks(frame):

    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)    

    results = hands.process(rgb_frame)
    
    landmarks = []
    
    # check if hands are detected
    if results.multi_hand_landmarks:
        # For each hand found
        for hand_landmarks in results.multi_hand_landmarks:
            # Extract each landmark
            for lm in hand_landmarks.landmark:
                landmarks.extend([lm.x, lm.y, lm.z])
    

    while len(landmarks) < 21 * 3 * 2:  # Max 2 hands
        landmarks.append(0.0)
    
    return landmarks[:21 * 3 * 2]  

In [5]:
import os
import json


VIDEO_DIR = 'dataset/videos'
METADATA_PATH = 'dataset/WLASL_v0.3.json'
FILTERED_METADATA_PATH = 'data/filtered_wlasl_metadata.json'  


with open(METADATA_PATH, 'r') as f:
    full_metadata = json.load(f)

available_videos = set(os.listdir(VIDEO_DIR))

filtered_metadata = []

for entry in full_metadata:
    valid_instances = []
    for instance in entry['instances']:
        video_file = f"{instance['video_id']}.mp4"
        if video_file in available_videos:
            valid_instances.append(instance)

    # only keep entries with at least one valid video
    if valid_instances:
        filtered_entry = dict(entry)
        filtered_entry['instances'] = valid_instances
        filtered_metadata.append(filtered_entry)


# save filtered metadata
os.makedirs(os.path.dirname(FILTERED_METADATA_PATH), exist_ok=True)
with open(FILTERED_METADATA_PATH, 'w') as f:
    json.dump(filtered_metadata, f, indent=4)

In [8]:
def process_wlasl_dataset():

    with open(FILTERED_METADATA_PATH, 'r') as f:
        metadata = json.load(f)

    processed_data = {
        'data': [],
        'labels': [],
        'label_map': {}
    }
    
    total_videos = 0
    videos_found = 0
    videos_processed = 0
    videos_with_landmarks = 0
    

    for i, entry in enumerate(metadata):
        gloss = entry['gloss']  # sign/word
        instances = entry['instances']
        

        if i % 1000 == 0:
            print(f"Progress: {i}/{len(metadata)} classes ({i/len(metadata)*100:.1f}%)")
        
        # add to label map
        if gloss not in processed_data['label_map']:
            processed_data['label_map'][gloss] = len(processed_data['label_map'])
        
        label_id = processed_data['label_map'][gloss]
        
        # process each video instance of this sign
        for instance in instances:
            total_videos += 1
            video_id = instance['video_id']
            video_path = os.path.join(DATASET_PATH, 'videos', f'{video_id}.mp4')
            
            if not os.path.exists(video_path):
                continue
            
            videos_found += 1
            
            # reead video
            cap = cv2.VideoCapture(video_path)
            frames_landmarks = []
            frame_count = 0
            
            # process each frame
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                frame_count += 1
                try:
                    landmarks = extract_landmarks(frame)
                    if landmarks is not None:
                        landmarks = normalize_landmarks(landmarks)
                        frames_landmarks.append(landmarks)
                except Exception as e:
                    pass  
            
            cap.release()
            

            if not frames_landmarks:
                continue
            
            videos_with_landmarks += 1
            

            if len(instances) < 10:  # only augment for classes with few examples
                augmented_sequences = augment_sequence(frames_landmarks)
            else:
                augmented_sequences = [frames_landmarks]

            for sequence in augmented_sequences:
                max_seq_length = 30  
                
                if len(sequence) > max_seq_length:
                    sequence = sequence[:max_seq_length]

                elif len(sequence) < max_seq_length:
                    if len(sequence) > 0:
                        last_frame = sequence[-1]
                        padding = [last_frame.copy() for _ in range(max_seq_length - len(sequence))]
                        sequence.extend(padding)
                    else:
                        continue  
                
                processed_data['data'].append(sequence)
                processed_data['labels'].append(label_id)
                videos_processed += 1
                
                # sve intermediate results to prevent data loss
                if videos_processed % 5000 == 0:
                    print(f"Progress: Processed {videos_processed} videos so far")
                    temp_save_path = os.path.join(OUTPUT_PATH, f'processed_wlasl_intermediate_{videos_processed}.pickle')
                    with open(temp_save_path, 'wb') as f:
                        pickle.dump(processed_data, f)
    

    X = np.array(processed_data['data']) if processed_data['data'] else np.array([])
    y = np.array(processed_data['labels']) if processed_data['labels'] else np.array([])
    

    try:
        with open(os.path.join(OUTPUT_PATH, 'processed_wlasl_full.pickle'), 'wb') as f:
            pickle.dump(processed_data, f)
        

        with open(os.path.join(OUTPUT_PATH, 'label_map_full.json'), 'w') as f:
            json.dump(processed_data['label_map'], f, indent=4)
            
        print("Successfully saved processed data and label map")
    except Exception as e:
        print(f"Error saving data: {e}")
        try:
            print("save data chunks")
            with open(os.path.join(OUTPUT_PATH, 'label_map_full.json'), 'w') as f:
                json.dump(processed_data['label_map'], f, indent=4)
                
            chunk_size = 5000
            for i in range(0, len(processed_data['data']), chunk_size):
                chunk_data = {
                    'data': processed_data['data'][i:i+chunk_size],
                    'labels': processed_data['labels'][i:i+chunk_size],
                    'label_map': processed_data['label_map']
                }
                with open(os.path.join(OUTPUT_PATH, f'processed_wlasl_chunk_{i//chunk_size}.pickle'), 'wb') as f:
                    pickle.dump(chunk_data, f)
            print(f"Successfully saved data in {math.ceil(len(processed_data['data'])/chunk_size)} chunks")
        except Exception as e2:
            print(f"Error saving chunked data: {e2}")
    
    return processed_data, X, y


def normalize_landmarks(landmarks):
    min_val = np.min(landmarks)
    max_val = np.max(landmarks)
    if max_val > min_val:
        return (landmarks - min_val) / (max_val - min_val)
    return landmarks


def augment_sequence(sequence):
    augmented = [sequence]  
    

    if len(sequence) >= 10:
        # slow
        slow = []
        for i in range(len(sequence)):
            slow.append(sequence[i])
            # duplicate every third frame to create a slower sequence
            if i % 3 == 0 and i > 0:
                slow.append(sequence[i])
        augmented.append(slow[:30])  
        
        # speed up
        if len(sequence) >= 15:
            fast = sequence[::2]  
            augmented.append(fast)
            
        noise_level = 0.03  # 3% noise
        noisy = []
        for frame in sequence:
            frame_array = np.array(frame)
            noise = np.random.normal(0, noise_level, frame_array.shape)
            noisy_frame = frame_array + noise
            noisy.append(noisy_frame)
        augmented.append(noisy)
    
    return augmented



In [9]:
processed_data = process_wlasl_dataset()

Progress: 0/2000 classes (0.0%)
Progress: Processed 5000 videos so far
Progress: Processed 10000 videos so far
Progress: Processed 15000 videos so far
Progress: Processed 20000 videos so far
Progress: Processed 25000 videos so far
Progress: 1000/2000 classes (50.0%)
Progress: Processed 30000 videos so far
Progress: Processed 35000 videos so far
Progress: Processed 40000 videos so far
Successfully saved processed data and label map


### Training

In [1]:
import os
import pickle
import numpy as np
import json
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [2]:
DATA_PATH = 'data/processed_data/processed_wlasl_full.pickle'
MODEL_PATH = 'models/'
LABEL_MAP_PATH = 'data/processed_data/label_map_full.json'


os.makedirs(MODEL_PATH, exist_ok=True)

with open(DATA_PATH, 'rb') as f:
    data = pickle.load(f)

with open(LABEL_MAP_PATH, 'r') as f:
    label_map = json.load(f)


id_to_label = {v: k for k, v in label_map.items()}

X = np.array(data['data'])
y = np.array(data['labels'])

In [3]:
num_samples = X.shape[0]
timesteps = X.shape[1]
features = X.shape[2]

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

num_classes = len(label_map)

In [4]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Dropout, Input, Flatten, Conv1D, GlobalMaxPooling1D, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras import metrics
import numpy as np

def create_hierarchical_labels(y_train, y_val, n_super_classes=50):
    all_classes = np.unique(np.concatenate([y_train, y_val]))
    n_fine_classes = len(all_classes)
    
    print(f"Total unique classes: {n_fine_classes}")
    print(f"Class range: {all_classes.min()} to {all_classes.max()}")
    
    # Map original labels to continuous [0, n_fine_classes)
    class_mapping = {old_label: new_label for new_label, old_label in enumerate(all_classes)}
    
    y_train_mapped = np.array([class_mapping[label] for label in y_train])
    y_val_mapped = np.array([class_mapping[label] for label in y_val])
    
    # Create super-class mapping (group consecutive fine classes)
    actual_n_super = min(n_super_classes, n_fine_classes)
    classes_per_super = n_fine_classes // actual_n_super
    
    super_mapping = {}
    for fine_idx in range(n_fine_classes):
        super_idx = min(fine_idx // classes_per_super, actual_n_super - 1)
        super_mapping[fine_idx] = super_idx
    
    y_train_super = np.array([super_mapping[label] for label in y_train_mapped])
    y_val_super = np.array([super_mapping[label] for label in y_val_mapped])
    
    print(f"Created {actual_n_super} super-classes from {n_fine_classes} fine classes")
    print(f"Super-class range: {y_train_super.min()} to {y_train_super.max()}")
    print(f"Fine-class range: {y_train_mapped.min()} to {y_train_mapped.max()}")
    
    return (y_train_super, y_val_super, y_train_mapped, y_val_mapped, 
            actual_n_super, n_fine_classes)

def create_model(input_shape, n_super_classes, n_fine_classes):
    inputs = Input(shape=input_shape, name='input_data')
    
    if len(input_shape) == 2:  
        conv1 = Conv1D(64, 3, activation='relu', padding='same')(inputs)
        conv1 = BatchNormalization()(conv1)
        conv1 = Dropout(0.1)(conv1)
        
        conv2 = Conv1D(128, 5, activation='relu', padding='same')(conv1)
        conv2 = BatchNormalization()(conv2)
        conv2 = Dropout(0.1)(conv2)
        
        conv3 = Conv1D(256, 3, activation='relu', padding='same')(conv2)
        conv3 = BatchNormalization()(conv3)
        conv3 = Dropout(0.2)(conv3)
        
        x = GlobalMaxPooling1D()(conv3)
    else:
        x = Flatten()(inputs)
    
    x = Dense(512, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.3)(x)
    
    shared_features = Dense(256, activation='relu')(x)
    shared_features = BatchNormalization()(shared_features)
    shared_features = Dropout(0.2)(shared_features)
    
    super_branch = Dense(128, activation='relu')(shared_features)
    super_branch = BatchNormalization()(super_branch)
    super_branch = Dropout(0.2)(super_branch)
    
    super_output = Dense(n_super_classes, activation='softmax', name='super_output')(super_branch)
    
    fine_input = tf.keras.layers.Concatenate()([shared_features, super_branch])
    
    fine_branch = Dense(512, activation='relu')(fine_input)
    fine_branch = BatchNormalization()(fine_branch)
    fine_branch = Dropout(0.3)(fine_branch)
    
    fine_branch = Dense(256, activation='relu')(fine_branch)
    fine_branch = BatchNormalization()(fine_branch)
    fine_branch = Dropout(0.3)(fine_branch)
    
    fine_output = Dense(n_fine_classes, activation='softmax', name='fine_output')(fine_branch)
    
    model = Model(inputs, [super_output, fine_output], name='hierarchical_classifier')
    
    print(f"Model created: Super({n_super_classes}), Fine({n_fine_classes})")
    return model

def train_hierarchical_model(X_train, y_train, X_val, y_val, n_super_classes=50):

    print("=== DATA PREPROCESSING ===")
    print(f"Training data shape: {X_train.shape}")
    print(f"Training labels shape: {y_train.shape}")
    
    result = create_hierarchical_labels(y_train, y_val, n_super_classes)
    (y_train_super, y_val_super, y_train_fine, y_val_fine, 
     actual_n_super, n_fine_classes) = result
   
    y_train_super = y_train_super.astype(np.int32)
    y_val_super = y_val_super.astype(np.int32)
    y_train_fine = y_train_fine.astype(np.int32)
    y_val_fine = y_val_fine.astype(np.int32)
    
    print(f"Label statistics:")
    print(f"  Super classes: {len(np.unique(y_train_super))} unique")
    print(f"  Fine classes: {len(np.unique(y_train_fine))} unique")

    model = create_model(X_train.shape[1:], actual_n_super, n_fine_classes)
    
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss=['sparse_categorical_crossentropy', 'sparse_categorical_crossentropy'],
        loss_weights=[0.3, 0.7],
        metrics=[
            ['accuracy'],
            ['accuracy', metrics.SparseTopKCategoricalAccuracy(k=5, name='top_5_accuracy')]
        ]
    )

    model.summary()
    
    train_labels = [y_train_super, y_train_fine]
    val_labels = [y_val_super, y_val_fine]
    
    print("\n=== FULL TRAINING ===")
    callbacks = [
        EarlyStopping(
            monitor='val_fine_output_accuracy', 
            patience=20, 
            restore_best_weights=True, 
            mode='max',
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_fine_output_loss', 
            patience=10, 
            factor=0.3, 
            min_lr=1e-6,
            mode='min',
            verbose=1
        )
    ]
    
    history = model.fit(
        X_train, train_labels,
        validation_data=(X_val, val_labels),
        epochs=200,
        batch_size=32,
        callbacks=callbacks,
        verbose=1
    )
    
    print("\n=== FINAL EVALUATION ===")
    final_metrics = model.evaluate(X_val, val_labels, verbose=1)
    
    print("\n=== RESULTS ===")
    metric_names = model.metrics_names
    for name, value in zip(metric_names, final_metrics):
        if 'accuracy' in name:
            print(f"{name}: {value:.4f} ({value*100:.2f}%)")
        else:
            print(f"{name}: {value:.4f}")
    
    return model, history


def run_hierarchical_classification(X_train, y_train, X_val, y_val, n_super_classes=50):
    if X_train.max() > 1.0:
        print("Normalizing input data...")
        X_train = X_train / X_train.max()
        X_val = X_val / X_val.max()
    
    model, history = train_hierarchical_model(
        X_train, y_train, X_val, y_val, n_super_classes
    )
    
    if model is not None:
        model.save('hierarchical_model.keras')
        print("✓ Model saved as 'hierarchical_model.keras'")
    
    return model, history


=== IMPROVED HIERARCHICAL CLASSIFICATION ===


In [5]:
model, history = run_hierarchical_classification(X_train, y_train, X_val, y_val)

=== IMPROVED HIERARCHICAL CLASSIFICATION ===
Normalizing input data...
=== DATA PREPROCESSING ===
Training data shape: (35552, 30, 126)
Training labels shape: (35552,)
Total unique classes: 2000
Class range: 0 to 1999
Created 50 super-classes from 2000 fine classes
Super-class range: 0 to 49
Fine-class range: 0 to 1999
Label statistics:
  Super classes: 50 unique
  Fine classes: 2000 unique
Model created: Super(50), Fine(2000)

=== MODEL ARCHITECTURE ===



=== SMALL BATCH TEST ===
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 77ms/step - fine_output_accuracy: 0.0000e+00 - fine_output_loss: 5.4893 - fine_output_top_5_accuracy: 0.0000e+00 - loss: 6.8308 - super_output_accuracy: 0.0000e+00 - super_output_loss: 1.3415
✓ Small batch test PASSED!

=== FULL TRAINING ===
Epoch 1/200
[1m1111/1111[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 73ms/step - fine_output_accuracy: 0.0017 - fine_output_loss: 5.2497 - fine_output_top_5_accuracy: 0.0083 - loss: 6.5321 - super_output_accuracy: 0.0237 - super_output_loss: 1.2825 - val_fine_output_accuracy: 0.0041 - val_fine_output_loss: 4.8857 - val_fine_output_top_5_accuracy: 0.0178 - val_loss: 6.0700 - val_super_output_accuracy: 0.0299 - val_super_output_loss: 1.1844 - learning_rate: 0.0010
Epoch 2/200
[1m1111/1111[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 60ms/step - fine_output_accuracy: 0.0051 - fine_output_loss: 4.7185 - fine_output_top_5_accuracy: 0.0277 - lo

### Real-time Detection

In [24]:
import cv2
import numpy as np
import mediapipe as mp
import tensorflow as tf
import json
from collections import deque

In [25]:
MODEL_PATH = 'hierarchical_model.keras'
model = tf.keras.models.load_model(MODEL_PATH)

LABEL_MAP_PATH = 'data/processed_data/label_map_full.json'
with open(LABEL_MAP_PATH, 'r') as f:
    label_map = json.load(f)  
    
reverse_mapping = {v: k for k, v in label_map.items()}


In [26]:
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
hands = mp_hands.Hands(static_image_mode=False, 
                       max_num_hands=2, 
                       min_detection_confidence=0.5,
                       min_tracking_confidence=0.5)

def extract_landmarks(frame):
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    results = hands.process(rgb_frame)
    
    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            mp_drawing.draw_landmarks(
                frame,
                hand_landmarks,
                mp_hands.HAND_CONNECTIONS
            )

    landmarks = []

    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            for lm in hand_landmarks.landmark:
                landmarks.extend([lm.x, lm.y, lm.z])
    
    while len(landmarks) < 21 * 3 * 2:  # Max 2 hands
        landmarks.append(0.0)
    
    return landmarks[:21 * 3 * 2]  

In [27]:
def realtime_detection():
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return
    
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    
    sequence_length = 30  
    landmarks_buffer = deque(maxlen=sequence_length)
    
    prediction_frequency = 10 
    frame_counter = 0
    current_prediction = "No prediction yet"
    confidence = 0.0
    
    print("Starting webcam detection. Press 'q' in the OpenCV window to quit.")
    
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Could not read frame.")
                break
            
            frame = cv2.flip(frame, 1)
            
            landmarks = extract_landmarks(frame)
            landmarks_buffer.append(landmarks)
            
            frame_counter += 1
            
            if frame_counter >= prediction_frequency and len(landmarks_buffer) == sequence_length:
                input_sequence = np.array([list(landmarks_buffer)])
                
                super_output, fine_output = model.predict(input_sequence, verbose=0)
         
                fine_prediction = fine_output[0]  
                
                predicted_class_id = np.argmax(fine_prediction)
                confidence = fine_prediction[predicted_class_id] * 100

                if predicted_class_id in reverse_mapping and confidence > 50:
                    current_prediction = reverse_mapping[predicted_class_id]
                else:
                    current_prediction = "Unknown sign"
                
                frame_counter = 0

            # Show prediction on screen
            text = f"{current_prediction} ({confidence:.1f}%)"
            cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
            cv2.putText(frame, "Press 'q' to quit", (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            cv2.imshow('Sign Language Detection', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    
    finally:
        cap.release()
        cv2.destroyAllWindows()
        print("Webcam detection stopped.")


In [28]:
realtime_detection()

Starting webcam detection. Press 'q' in the OpenCV window to quit.
Webcam detection stopped.
