### Import Libraries
This section imports all necessary Python libraries for image processing, model building, feature extraction (EfficientNet, Zernike), and training the model.


In [None]:
import os
import numpy as np
import random
from PIL import Image
import pickle
import matplotlib.pyplot as plt
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout, Concatenate
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.preprocessing import image
from skimage.io import imread
from skimage.color import rgb2gray
import mahotas



### Define Feature Extractors
This part loads EfficientNetB0 (without top layers) to extract features from RGB hand images and defines Zernike moment extraction using Mahotas from binary hand shape diagrams.


In [None]:

# EfficientNetB0 backbone without its classification head; pooling='avg'
# applies global average pooling so each image yields one flat vector.
base_model = EfficientNetB0(include_top=False, input_shape=(224, 224, 3), pooling='avg')
# Wrapper model exposing the backbone's input and pooled output; this is the
# object extract_eff_features() calls predict() on.
eff_model = Model(inputs=base_model.input, outputs=base_model.output)
FEATURE_SIZE = base_model.output_shape[-1]  # length of one EfficientNet feature vector

ZERN_ORDER = 8  # passed to mahotas as the Zernike `degree`
ZERN_RADIUS = 200  # passed to mahotas as `radius`; half the side of the 400x400 diagrams checked for below
# Number of Zernike moments produced for this degree, measured once by running
# mahotas on a dummy all-True 400x400 mask.
ZERN_FEATURE_SIZE = len(mahotas.features.zernike_moments(np.ones((400, 400), dtype=bool), radius=ZERN_RADIUS, degree=ZERN_ORDER))

def extract_eff_features(img_path):
    """Return the EfficientNet feature vector for a single image file.

    The image is converted to RGB, resized to 224x224, passed through
    ``preprocess_input`` and then through ``eff_model``.

    Args:
        img_path: Path of the image file to read.

    Returns:
        A 1-D NumPy array holding the flattened model output. On any
        failure the error is printed and a zero vector of length
        ``FEATURE_SIZE`` is returned, so one bad file does not abort a
        whole dataset build.
    """
    try:
        # The context manager releases the file handle deterministically.
        # convert()/resize() return new in-memory images, so `img` stays
        # usable after the source file has been closed.
        with Image.open(img_path) as src:
            img = src.convert('RGB').resize((224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)  # add the batch axis expected by predict()
        x = preprocess_input(x)
        features = eff_model.predict(x, verbose=0)
        return features.flatten()
    except Exception as e:
        # Deliberate best-effort: report and fall back to zeros.
        print(f"Error processing {img_path}: {e}")
        return np.zeros(FEATURE_SIZE)

def extract_zernike_features(img_path):
    """Return the Zernike moments of a hand-shape diagram image.

    The image is reduced to a single channel, binarized against its own
    mean intensity, and passed to ``mahotas.features.zernike_moments`` with
    ``ZERN_RADIUS`` and ``ZERN_ORDER``.

    Args:
        img_path: Path of the diagram image.

    Returns:
        A 1-D NumPy array of at most ``ZERN_FEATURE_SIZE`` moments. On any
        failure the error is printed and a zero vector of length
        ``ZERN_FEATURE_SIZE`` is returned.
    """
    try:
        img = imread(img_path)

        # Recent scikit-image releases make rgb2gray() raise unless the input
        # has exactly 3 channels. Without this branching, grayscale or RGBA
        # PNGs would fall into the except-path and silently become zeros.
        if img.ndim == 2:
            # Already single-channel.
            img_gray = img
        elif img.shape[-1] < 3:
            # Gray (+ alpha): keep the intensity channel only.
            img_gray = img[..., 0]
        else:
            # RGB or RGBA: ignore any alpha channel.
            img_gray = rgb2gray(img[..., :3])

        if img_gray.shape != (400, 400):
            print(f"Warning: {img_path} is not 400x400, found {img_gray.shape}")

        # Thresholding at the mean is scale-independent, so it does not
        # matter whether img_gray is uint8 or float in [0, 1].
        binarized = img_gray > img_gray.mean()

        features = mahotas.features.zernike_moments(binarized, radius=ZERN_RADIUS, degree=ZERN_ORDER)
        return np.array(features[:ZERN_FEATURE_SIZE])
    except Exception as e:
        # Deliberate best-effort: report and fall back to zeros.
        print(f"Zernike error processing {img_path}: {e}")
        return np.zeros(ZERN_FEATURE_SIZE)

### Feature Extraction Functions
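Using the two extractors defined above (one for EfficientNet features, one for Zernike moments from grayscale, binarized shape images), this cell defines `process_frame_combined`, which builds the per-frame features: EfficientNet features for the left- and right-hand images, plus Zernike moments for up to six shape diagrams per hand.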


In [None]:
def _extract_hand_features(hand_dir, n_diagrams=6):
    """Return ``(eff, zern)`` features for one hand directory.

    ``eff`` is the EfficientNet vector of the first ``.png`` in ``hand_dir``
    whose name does not start with ``point``; ``zern`` is a list of exactly
    ``n_diagrams`` Zernike vectors taken from ``hand_dir/results/results-*.png``.
    Anything missing is represented by zero vectors.
    """
    eff = np.zeros(FEATURE_SIZE)
    zern = [np.zeros(ZERN_FEATURE_SIZE) for _ in range(n_diagrams)]

    if not os.path.isdir(hand_dir):
        return eff, zern

    # os.listdir() order is arbitrary; sorting makes the chosen image
    # reproducible across runs and machines.
    candidates = sorted(
        f for f in os.listdir(hand_dir)
        if f.endswith('.png') and not f.startswith('point')
    )
    if candidates:
        eff = extract_eff_features(os.path.join(hand_dir, candidates[0]))

    results_dir = os.path.join(hand_dir, 'results')
    if os.path.isdir(results_dir):
        diagram_files = sorted(
            f for f in os.listdir(results_dir)
            if f.startswith('results-') and f.endswith('.png')
        )[:n_diagrams]
        zern = [extract_zernike_features(os.path.join(results_dir, f)) for f in diagram_files]
        # Pad with zero vectors if fewer than n_diagrams diagrams were found.
        zern += [np.zeros(ZERN_FEATURE_SIZE) for _ in range(n_diagrams - len(zern))]

    return eff, zern


def process_frame_combined(frame_dir):
    """Build the combined two-hand features for one frame directory.

    Looks for ``left_hand`` and ``right_hand`` sub-directories of
    ``frame_dir`` and extracts features from each.

    Args:
        frame_dir: Directory of a single frame.

    Returns:
        A tuple ``(combined_eff, combined_zern)``:
        ``combined_eff`` is the left and right EfficientNet vectors
        concatenated (length ``2 * FEATURE_SIZE``); ``combined_zern`` is an
        array of shape ``(12, ZERN_FEATURE_SIZE)`` holding six left-hand
        rows followed by six right-hand rows.
    """
    left_eff, left_zern = _extract_hand_features(os.path.join(frame_dir, 'left_hand'))
    right_eff, right_zern = _extract_hand_features(os.path.join(frame_dir, 'right_hand'))

    combined_eff = np.concatenate([left_eff, right_eff], axis=0)
    combined_zern = np.array(left_zern + right_zern)

    return combined_eff, combined_zern


### Build Sequences from Dataset
Traverses video directories to extract frame-wise feature sequences for each sign class, returning lists of EfficientNet features, Zernike sequences, and their corresponding labels.


In [None]:
def build_sequences_combined(data_path, label_map=None, limit_labels=39):
    """Walk ``data_path/<label>/<video>/<frame>/`` and collect feature sequences.

    Each frame directory is passed to ``process_frame_combined``; the
    per-frame results are gathered into one sequence per video.

    Args:
        data_path: Root directory whose sub-directories are class labels.
        label_map: Optional ``{label_name: index}`` dict. When given it is
            updated in place with any label not already present (new indices
            continue after the current maximum). When ``None`` a fresh map is
            created.
        limit_labels: Maximum number of label directories to process, taken
            in sorted name order.

    Returns:
        ``(eff_seqs, zern_seqs, labels, label_map)`` when ``label_map`` was
        ``None``; otherwise ``(eff_seqs, zern_seqs, labels)``. Videos with no
        frame directories are skipped.
    """
    eff_seqs, zern_seqs, labels = [], [], []

    new_label_map = label_map is None
    # Test against None explicitly: `label_map or {}` would replace a
    # caller-supplied *empty* dict and the caller would never see the labels.
    if label_map is None:
        label_map = {}
    label_counter = max(label_map.values(), default=-1) + 1

    # Keep only directories BEFORE applying the limit, so stray files in
    # data_path do not use up label slots.
    label_names = [
        name for name in sorted(os.listdir(data_path))
        if os.path.isdir(os.path.join(data_path, name))
    ][:limit_labels]

    for label in label_names:
        label_dir = os.path.join(data_path, label)

        if label not in label_map:
            label_map[label] = label_counter
            label_counter += 1

        for video in sorted(os.listdir(label_dir)):
            video_dir = os.path.join(label_dir, video)
            if not os.path.isdir(video_dir):
                continue

            print(f"Processing label: {label} | video: {video}")

            eff_seq, zern_seq = [], []

            for frame in sorted(os.listdir(video_dir)):
                frame_dir = os.path.join(video_dir, frame)
                if os.path.isdir(frame_dir):
                    eff, zern = process_frame_combined(frame_dir)
                    eff_seq.append(eff)
                    zern_seq.append(zern)

            if eff_seq:
                eff_seqs.append(eff_seq)
                zern_seqs.append(zern_seq)
                labels.append(label_map[label])

    outputs = (eff_seqs, zern_seqs, labels)
    return (*outputs, label_map) if new_label_map else outputs


### Load or Cache Training Data
Attempts to load preprocessed training data from disk. If no cache file is found, it builds the data using `build_sequences_combined` and caches the result as a `.pkl` file.


In [None]:
# Root directory of the training videos and the pickle file that caches the
# features extracted from it.
train_path = 'F:/letters/train'
train_cache = 'train_data_dual_stream_letters_10_zern8.pkl'

if not os.path.exists(train_cache):
    # Cache miss: extract features from the raw frames, then persist them so
    # later runs can skip the extraction.
    train_eff, train_zern, train_labels, label_map = build_sequences_combined(train_path)
    with open(train_cache, 'wb') as f:
        pickle.dump((train_eff, train_zern, train_labels, label_map), f)
    print("✓ Combined training data preprocessed and cached.")
else:
    # Cache hit: restore the four objects written by a previous run.
    with open(train_cache, 'rb') as f:
        train_eff, train_zern, train_labels, label_map = pickle.load(f)
    print("✓ Loaded cached combined training data.")


### Load or Cache Test Data
Same as above, but for test data. Ensures testing features and labels are ready.


In [None]:
# Root directory of the test videos and the pickle file that caches the
# features extracted from it.
test_path = 'F:/letters/test'
test_cache = 'test_data_dual_stream_letters_10_zern8.pkl'

if os.path.exists(test_cache):
    # NOTE: a cache written by an earlier run keeps whatever label indices
    # that run produced; delete the file to rebuild with the current mapping.
    with open(test_cache, 'rb') as f:
        test_eff, test_zern, test_labels = pickle.load(f)
    print("✓ Loaded cached combined test data.")
else:
    # Reuse the training label map so a class folder gets the SAME integer
    # index in both splits. Building an independent map would mislabel the
    # test set whenever its folders differ from the training folders.
    # A copy is passed so the training map itself is left untouched.
    test_label_map = dict(label_map)
    test_eff, test_zern, test_labels = build_sequences_combined(test_path, label_map=test_label_map)

    # Labels that exist only in the test split have no training class.
    unseen = sorted(set(test_label_map) - set(label_map))
    if unseen:
        print(f"Warning: test labels not present in training data: {unseen}")

    with open(test_cache, 'wb') as f:
        pickle.dump((test_eff, test_zern, test_labels), f)
    print("✓ Combined test data preprocessed and cached.")


### Preprocess Feature Sequences
Pads or truncates both EfficientNet and Zernike sequences to a fixed length of 10 frames (`max_seq_len`) and converts labels to one-hot encoding for training.


In [None]:
import pickle
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np


# Reload both cached datasets from disk so this cell can be run on its own.
with open('train_data_dual_stream_letters_10_zern8.pkl', 'rb') as f:
    train_eff, train_zern, train_labels, label_map = pickle.load(f)

with open('test_data_dual_stream_letters_10_zern8.pkl', 'rb') as f:
    test_eff, test_zern, test_labels = pickle.load(f)

# Every video is brought to exactly this many frames.
max_seq_len = 10


def pad(data):
    """Pad (with zeros) or truncate each sequence at its end to max_seq_len."""
    return pad_sequences(
        data,
        maxlen=max_seq_len,
        dtype='float32',
        padding='post',
        truncating='post',
    )


# EfficientNet stream: one feature vector per frame.
X_train_eff, X_test_eff = pad(train_eff), pad(test_eff)

# Zernike stream: each frame keeps its 2-D (12, ZERN_FEATURE_SIZE) block.
X_train_zern, X_test_zern = pad(train_zern), pad(test_zern)

# === One-hot encode labels ===
num_classes = len(label_map)
y_train, y_test = (
    to_categorical(train_labels, num_classes=num_classes),
    to_categorical(test_labels, num_classes=num_classes),
)


### Define Custom F1 Score Metric
Implements a custom Keras metric to calculate F1 Score during training using internal precision and recall metrics.


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import Precision, Recall, AUC, TopKCategoricalAccuracy
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K


class F1Score(tf.keras.metrics.Metric):
    """Streaming F1 score computed from Keras ``Precision`` and ``Recall``.

    Both inner metrics are created with their default arguments, so the
    score inherits their default thresholding of ``y_pred`` (0.5 according
    to the Keras documentation — confirm for the installed version).
    """

    def __init__(self, name='f1_score', **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision = Precision()
        self.recall = Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate one batch into both inner metrics."""
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        """Return the harmonic mean of the accumulated precision and recall."""
        p = self.precision.result()
        r = self.recall.result()
        # epsilon keeps the division defined when p + r == 0.
        return 2 * ((p * r) / (p + r + K.epsilon()))

    def reset_state(self):
        """Clear the accumulated statistics of both inner metrics.

        ``reset_state`` is the current Keras hook name. The inner metrics
        are reset through whichever of ``reset_state`` / ``reset_states``
        the installed Keras provides.
        """
        for inner in (self.precision, self.recall):
            reset = getattr(inner, 'reset_state', None)
            if reset is None:
                reset = inner.reset_states
            reset()

    def reset_states(self):
        """Deprecated alias of ``reset_state`` kept for older Keras versions."""
        self.reset_state()


### Build Dual-Stream Model
Defines a dual-stream model: one for EfficientNet features processed with Multi-Head Attention, and another for Zernike features processed with LSTM. The two are fused via attention and residual connection before final classification.


In [None]:
from tensorflow.keras.layers import (
    Input, Dense, Dropout, Bidirectional, LSTM, LayerNormalization,
    MultiHeadAttention, GlobalAveragePooling1D, Concatenate, TimeDistributed, Flatten, Add
)
from tensorflow.keras.models import Model


# --- Dimensions -----------------------------------------------------------
EFF_DIM = 2 * FEATURE_SIZE           # left + right EfficientNet vectors concatenated per frame
ZERN_SEQ_DIM = 12                    # 6 left-hand + 6 right-hand Zernike diagrams per frame
ZERN_FEAT_DIM = ZERN_FEATURE_SIZE   # Zernike moments per diagram
SEQ_LEN = 10                       # frames per video; should match max_seq_len used for padding
NUM_CLASSES = len(label_map)         # one output unit per label in label_map
FUSION_DIM = 64                     # shared width used by both streams before fusion


# --- EfficientNet stream: projection -> self-attention over frames ---------
eff_input = Input(shape=(SEQ_LEN, EFF_DIM), name='eff_input')
x_eff = Dense(FUSION_DIM)(eff_input)  # project each frame to FUSION_DIM
x_eff = LayerNormalization()(x_eff)
# Self-attention: the sequence is used as both query and value.
x_eff = MultiHeadAttention(num_heads=4, key_dim=FUSION_DIM)(x_eff, x_eff)
x_eff = Dropout(0.3)(x_eff)
x_eff = Dense(FUSION_DIM, activation='relu')(x_eff)

# --- Zernike stream: flatten per frame -> projection -> BiLSTM -------------
zern_input = Input(shape=(SEQ_LEN, ZERN_SEQ_DIM, ZERN_FEAT_DIM), name='zern_input')
zern_flat = TimeDistributed(Flatten())(zern_input)  # flatten the 12 x ZERN_FEAT_DIM block of every frame
zern_proj = Dense(FUSION_DIM)(zern_flat) 
# return_sequences=True keeps one output per frame so it can be attended over.
zern_lstm = Bidirectional(LSTM(FUSION_DIM, return_sequences=True))(zern_proj)
zern_lstm = Dropout(0.3)(zern_lstm)


# --- Fusion: EfficientNet frames query the Zernike sequence ----------------
attended = MultiHeadAttention(num_heads=4, key_dim=FUSION_DIM)(
    query=x_eff,
    key=zern_lstm,
    value=zern_lstm
)


# Residual connection around the cross-attention, then normalization.
fused = Add()([x_eff, attended])
fused = LayerNormalization()(fused)


# --- Classification head: average over frames, then dense layers -----------
fused_pooled = GlobalAveragePooling1D()(fused)
x = Dropout(0.4)(fused_pooled)
x = Dense(128, activation='relu')(x)
x = Dropout(0.3)(x)
x = Dense(64, activation='relu')(x)
output = Dense(NUM_CLASSES, activation='softmax')(x)


# Input names ('eff_input', 'zern_input') are the dict keys used by model.fit.
model = Model(inputs=[eff_input, zern_input], outputs=output)

# The model is compiled here; the training cell only calls fit().
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',
        Precision(name='precision'),
        F1Score(name='f1_score')  # custom metric class defined earlier in this file
    ]
)


### Compile and Train Model
Trains the model using early stopping on the validation loss. The model was already compiled in the previous cell with categorical crossentropy loss and accuracy, precision, and F1 metrics.


In [None]:
from tensorflow.keras.callbacks import EarlyStopping


# Stop once val_loss has gone 10 epochs without improving, and roll the model
# back to the weights of its best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)

# Inputs are passed by name, matching the model's Input layers.
# Note: the test split doubles as the validation set monitored above.
history = model.fit(
    x=dict(eff_input=X_train_eff, zern_input=X_train_zern),
    y=y_train,
    batch_size=128,
    epochs=200,
    validation_data=(
        dict(eff_input=X_test_eff, zern_input=X_test_zern),
        y_test,
    ),
    callbacks=[early_stopping],
)



### Visualize Training Metrics
Plots loss, accuracy, precision, and F1 score over training epochs for both training and validation datasets using Matplotlib and Seaborn.


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# === Configure Seaborn Theme ===
# Sets the "whitegrid" style once, before any of the figures below are drawn.
sns.set_theme(style="whitegrid")

# === Function to Plot Metrics ===
def plot_metric(history, metric, title=None, ylabel=None):
    """Draw the training and validation curves of one metric in a new figure.

    Args:
        history: Object whose ``.history`` dict maps metric names to
            per-epoch value lists (as returned by ``model.fit``).
        metric: Metric name; the validation series is read from
            ``val_<metric>``.
        title: Figure title; defaults to ``"<Metric> Over Epochs"``.
        ylabel: Y-axis label; defaults to the capitalized metric name.

    Returns:
        None. Nothing is drawn if either series is missing from the history.
    """
    logs = history.history
    train_curve = logs.get(metric)
    val_curve = logs.get(f'val_{metric}')

    # Guard clause: skip metrics that were not recorded.
    if train_curve is None or val_curve is None:
        return

    x_axis = range(1, len(train_curve) + 1)

    # (legend label, values, line color) for each curve, in drawing order.
    curves = (
        ("Train", train_curve, '#1f77b4'),  # Blue
        ("Test", val_curve, '#ff5733'),     # Red/Orange
    )

    plt.figure(figsize=(10, 6))
    for legend_label, values, line_color in curves:
        plt.plot(x_axis, values, label=legend_label, color=line_color, linewidth=2)

    # Fall back to names derived from the metric when none are supplied.
    heading = title if title else f"{metric.capitalize()} Over Epochs"
    y_text = ylabel if ylabel else metric.capitalize()

    plt.title(heading, fontsize=16)
    plt.xlabel("Epoch", fontsize=14)
    plt.ylabel(y_text, fontsize=14)
    plt.legend(fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.show()

# === Plotting ===
# One figure per metric. plot_metric() silently skips any metric that is not
# present in the history, so 'precision' and 'f1_score' are optional.
for metric in ('loss', 'accuracy', 'precision', 'f1_score'):
    plot_metric(
        history,
        metric,
        title=f"Training vs Validation {metric.capitalize()}",
        ylabel=metric.capitalize(),
    )


In [None]:

# Save model
# Writes the trained model to an HDF5 file in the working directory.
# NOTE(review): the model was compiled with the custom F1Score metric, so
# reloading presumably needs custom_objects={'F1Score': F1Score} — confirm.
model.save('model_dual_stream_letters.h5')