In [14]:
import tensorflow as tf
import cv2
import mediapipe as mp
import numpy as np
print("TensorFlow version:", tf.__version__)
print("OpenCV version:", cv2.__version__)

TensorFlow version: 2.12.0
OpenCV version: 4.11.0


In [15]:
import os
import random

# 📂 Path to your ISLVT videos folder
DATASET_DIR = r"C:\Users\param\OneDrive\Desktop\Main\Coding\1M1B\Workplace\credentials\model\videos"  # <-- Change to your WLASL folder

# Find all MP4 files
all_videos = []
for root, _, files in os.walk(DATASET_DIR):
    for file in files:
        if file.endswith(".MOV"):
            all_videos.append(os.path.join(root, file))

print(f"📊 Found {len(all_videos)} total videos.")

# 🎯 Randomly pick 500
sample_size = 500 if len(all_videos) >= 500 else len(all_videos)
sampled_videos = random.sample(all_videos, sample_size)

# Save the selected paths to a text file for later use
with open("selected_videos.txt", "w") as f:
    for video in sampled_videos:
        f.write(video + "\n")

print(f"✅ Saved {sample_size} sampled video paths to selected_videos.txt")


📊 Found 152 total videos.
✅ Saved 152 sampled video paths to selected_videos.txt


In [16]:
# MediaPipe hands setup
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    min_detection_confidence=0.7,
    min_tracking_confidence=0.5
)

# Where to save processed landmark data
OUTPUT_DIR = "processed_landmarks"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Read sampled videos
with open("selected_videos.txt", "r") as f:
    video_paths = [line.strip() for line in f.readlines()]

# Process each video
processed_count = 0
failed_count = 0

for video_path in video_paths:
    try:
        cap = cv2.VideoCapture(video_path)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        landmarks_all_frames = []
        
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
                
            frame_count += 1
            
            # Convert BGR to RGB
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = hands.process(rgb_frame)
            
            # Initialize frame landmarks with zeros (consistent shape: 126 values)
            frame_landmarks = [0.0] * 126  # 2 hands × 21 landmarks × 3 coords
            
            if results.multi_hand_landmarks:
                # Process detected hands
                for hand_idx, hand_landmarks in enumerate(results.multi_hand_landmarks):
                    if hand_idx >= 2:  # Only process first 2 hands
                        break
                        
                    # Extract coordinates for this hand
                    hand_coords = []
                    for lm in hand_landmarks.landmark:
                        hand_coords.extend([lm.x, lm.y, lm.z])
                    
                    # Place hand data in the correct position
                    start_idx = hand_idx * 63  # 63 values per hand
                    frame_landmarks[start_idx:start_idx + 63] = hand_coords
            
            landmarks_all_frames.append(frame_landmarks)
        
        cap.release()
        
        # Convert to NumPy array (should work now - consistent shapes!)
        landmarks_array = np.array(landmarks_all_frames)
        
        # Only save if we got some frames
        if landmarks_array.shape[0] > 0:
            np.save(os.path.join(OUTPUT_DIR, f"{video_name}_landmarks.npy"), landmarks_array)
            print(f"✅ Saved landmarks for {video_name} - Shape: {landmarks_array.shape}")
            processed_count += 1
        else:
            print(f"⚠️ Skipped {video_name} - No frames processed")
            failed_count += 1
            
    except Exception as e:
        print(f"❌ Error processing {video_path}: {e}")
        failed_count += 1
        continue

print(f"\n🎯 Processing Complete!")
print(f"✅ Successfully processed: {processed_count} videos")
print(f"❌ Failed/Skipped: {failed_count} videos")

# Test load one file to verify
if processed_count > 0:
    test_files = [f for f in os.listdir(OUTPUT_DIR) if f.endswith('.npy')]
    if test_files:
        test_data = np.load(os.path.join(OUTPUT_DIR, test_files[0]))
        print(f"\n🧪 Test load successful! Sample shape: {test_data.shape}")
        print(f"   - Frames: {test_data.shape[0]}")
        print(f"   - Features per frame: {test_data.shape[1]} (should be 126)")
else:
    print("❌ No files were processed successfully")

✅ Saved landmarks for 75 sleep well  (2) - Shape: (105, 126)
✅ Saved landmarks for 33 pune cold  - Shape: (138, 126)
✅ Saved landmarks for 71 birds sky fly   - Shape: (267, 126)
✅ Saved landmarks for 25 raju first come tanu after - Shape: (270, 126)
✅ Saved landmarks for 60 shoes dirty  (2) - Shape: (132, 126)
✅ Saved landmarks for 70 dog  pet animal  - Shape: (165, 126)
✅ Saved landmarks for 70 dog  pet animal  (2) - Shape: (171, 126)
✅ Saved landmarks for 11 grand mother market go  - Shape: (219, 126)
✅ Saved landmarks for 3 family big  (2) - Shape: (129, 126)
✅ Saved landmarks for 39 egg white  (2) - Shape: (147, 126)
✅ Saved landmarks for 6 girl hungry  - Shape: (189, 126)
✅ Saved landmarks for 22 we this week holiday  - Shape: (174, 126)
✅ Saved landmarks for 76 his hair  comb will  - Shape: (246, 126)
✅ Saved landmarks for 41 dress color green  (2) - Shape: (141, 126)
✅ Saved landmarks for 69 Cat  apet animal  - Shape: (174, 126)
✅ Saved landmarks for 44 i black coffe like  (2) -

In [3]:
import os
import json
import numpy as np
import re
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pickle
from collections import Counter

class ISLVTModelTrainer:
    def __init__(self, landmarks_dir="processed_landmarks"):
        self.landmarks_dir = landmarks_dir
        self.label_encoder = LabelEncoder()
        
    def extract_label_from_filename(self, filename):
        """
        Extract sentence label from ISLVT filename
        Format: "number sentence text _landmarks.npy"
        Example: "21 we night go out _landmarks.npy" -> "we night go out"
        """
        # Remove file extension and landmarks suffix
        basename = filename.replace('_landmarks.npy', '').strip()
        
        # Split by spaces
        parts = basename.split()
        
        if len(parts) > 1:
            # First part is usually a number, rest is the sentence
            # Check if first part is a number
            try:
                int(parts[0])  # If this works, first part is a number
                sentence = ' '.join(parts[1:])  # Join the rest as sentence
                return sentence.lower().strip()
            except ValueError:
                # First part is not a number, use entire string
                return basename.lower().strip()
        else:
            # Single word or part
            return basename.lower().strip()
    
    def load_processed_data(self):
        """Load all processed landmark files and create dataset"""
        print("📁 Loading processed ISLVT landmark data...")
        
        X_data = []
        y_labels = []
        loaded_count = 0
        skipped_count = 0
        filename_to_label = {}
        
        # Get all .npy files
        npy_files = [f for f in os.listdir(self.landmarks_dir) if f.endswith('_landmarks.npy')]
        
        if len(npy_files) == 0:
            raise ValueError("No landmark files found! Check your processed_landmarks directory.")
        
        print(f"Found {len(npy_files)} landmark files to process...")
        
        for npy_file in npy_files:
            try:
                # Extract label from filename
                label = self.extract_label_from_filename(npy_file)
                filename_to_label[npy_file] = label
                
                # Load landmark data
                landmarks = np.load(os.path.join(self.landmarks_dir, npy_file))
                
                # Skip if too few frames
                if landmarks.shape[0] < 10:
                    print(f"⚠️ Skipping {npy_file} - only {landmarks.shape[0]} frames")
                    skipped_count += 1
                    continue
                
                # Pad or truncate to fixed length
                max_frames = 30  # Fixed sequence length
                if landmarks.shape[0] > max_frames:
                    landmarks = landmarks[:max_frames]
                else:
                    # Pad with zeros
                    padding = np.zeros((max_frames - landmarks.shape[0], 126))
                    landmarks = np.vstack([landmarks, padding])
                
                X_data.append(landmarks)
                y_labels.append(label)
                loaded_count += 1
                
                if loaded_count <= 5:  # Show first few for verification
                    print(f"✅ {npy_file} -> '{label}' (shape: {landmarks.shape})")
                
            except Exception as e:
                print(f"❌ Error loading {npy_file}: {e}")
                skipped_count += 1
        
        print(f"\n📊 Data Loading Summary:")
        print(f"✅ Successfully loaded: {loaded_count} sequences")
        print(f"⚠️ Skipped: {skipped_count} files")
        
        # Show label distribution
        label_counts = Counter(y_labels)
        print(f"\n🏷️ Found {len(label_counts)} unique signs:")
        for label, count in label_counts.most_common(10):
            print(f"   {label}: {count} videos")
        if len(label_counts) > 10:
            print(f"   ... and {len(label_counts) - 10} more")
        
        if loaded_count == 0:
            raise ValueError("No data loaded! Check your landmark files and filename patterns.")
        
        # Filter out classes with too few samples - ADJUSTED FOR SINGLE SAMPLES
        min_samples = 1  # Allow single samples since ISLVT has unique sentences
        filtered_X = []
        filtered_y = []
        
        # For sentence-level data with single samples, we'll use all data
        # but warn about potential overfitting
        if max(label_counts.values()) == 1:
            print(f"\n⚠️  WARNING: All sentences appear only once!")
            print(f"   This means each sentence is unique - typical for sentence-level ISLVT")
            print(f"   The model will learn individual sentences, not generalize to new ones")
            print(f"   Consider training on individual words instead for better generalization")
            
            # Use all data
            filtered_X = X_data
            filtered_y = y_labels
        else:
            # Filter normally
            for label, count in label_counts.items():
                if count >= min_samples:
                    indices = [i for i, y in enumerate(y_labels) if y == label]
                    for idx in indices:
                        filtered_X.append(X_data[idx])
                        filtered_y.append(y_labels[idx])
        
        print(f"\n🔍 After filtering (min {min_samples} samples per class):")
        print(f"   Final dataset: {len(filtered_y)} sequences")
        print(f"   Unique classes: {len(set(filtered_y))}")
        
        if len(filtered_y) == 0:
            raise ValueError("No data remaining after filtering! All classes had too few samples.")
        
        return np.array(filtered_X), np.array(filtered_y)
    
    def create_lstm_model(self, input_shape, num_classes):
        """Create LSTM model for sign language recognition"""
        model = Sequential([
            # First LSTM layer
            LSTM(128, return_sequences=True, input_shape=input_shape),
            BatchNormalization(),
            Dropout(0.3),
            
            # Second LSTM layer
            LSTM(64, return_sequences=True),
            BatchNormalization(),
            Dropout(0.3),
            
            # Third LSTM layer
            LSTM(32),
            BatchNormalization(),
            Dropout(0.4),
            
            # Dense layers
            Dense(64, activation='relu'),
            Dropout(0.5),
            Dense(32, activation='relu'),
            Dense(num_classes, activation='softmax')
        ])
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def train_model(self, epochs=50):
        """Train the LSTM model"""
        print("🚀 Starting ISLVT LSTM model training...")
        
        # Load data
        X, y = self.load_processed_data()
        
        print(f"\n📊 Final Dataset Shape: {X.shape}")
        print(f"🏷️ Unique Signs: {len(np.unique(y))}")
        
        # Encode labels
        y_encoded = self.label_encoder.fit_transform(y)
        
        # Split data - handle single-sample case
        if len(set(y)) == len(y):
            # Each sample is unique - can't stratify
            print("⚠️  Using random split (no stratification possible with unique samples)")
            X_train, X_test, y_train, y_test = train_test_split(
                X, y_encoded, test_size=0.2, random_state=42
            )
        else:
            # Normal stratified split
            X_train, X_test, y_train, y_test = train_test_split(
                X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
            )
        
        print(f"\n🎯 Training samples: {len(X_train)}")
        print(f"🧪 Testing samples: {len(X_test)}")
        
        # Create model
        model = self.create_lstm_model(
            input_shape=(X.shape[1], X.shape[2]),  # (frames, features)
            num_classes=len(np.unique(y_encoded))
        )
        
        print("\n🧠 Model Architecture:")
        model.summary()
        
        # Callbacks
        callbacks = [
            EarlyStopping(patience=10, restore_best_weights=True, verbose=1),
            ReduceLROnPlateau(factor=0.5, patience=5, min_lr=1e-6, verbose=1)
        ]
        
        # Train model
        print("\n🏋️ Starting training...")
        history = model.fit(
            X_train, y_train,
            validation_data=(X_test, y_test),
            epochs=epochs,
            batch_size=16,  # Smaller batch size for better training
            callbacks=callbacks,
            verbose=1
        )
        
        # Evaluate
        test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
        
        print(f"\n🎉 Training Complete!")
        print(f"📊 Test Accuracy: {test_accuracy:.4f}")
        print(f"📉 Test Loss: {test_loss:.4f}")
        
        # Show some predictions
        print("\n🔮 Sample Predictions:")
        predictions = model.predict(X_test[:5])
        for i in range(min(5, len(X_test))):
            predicted_idx = np.argmax(predictions[i])
            actual_idx = y_test[i]
            predicted_word = self.label_encoder.classes_[predicted_idx]
            actual_word = self.label_encoder.classes_[actual_idx]
            confidence = predictions[i][predicted_idx]
            
            result = "✅" if predicted_word == actual_word else "❌"
            print(f"   {result} Predicted: '{predicted_word}' | Actual: '{actual_word}' | Confidence: {confidence:.3f}")
        
        return model, history, test_accuracy
    
    def save_model_and_labels(self, model):
        """Save trained model and label mappings"""
        print("\n💾 Saving model and labels...")
        
        # Create models directory
        os.makedirs("models", exist_ok=True)
        
        # Save model
        model_path = "models/islvt_model.h5"
        model.save(model_path)
        print(f"✅ Model saved: {model_path}")
        
        # Create labels dictionary
        labels_dict = {}
        for i, label in enumerate(self.label_encoder.classes_):
            labels_dict[str(i)] = label
        
        # Save labels
        labels_path = "models/islvt_labels.json"
        with open(labels_path, 'w') as f:
            json.dump(labels_dict, f, indent=2)
        print(f"✅ Labels saved: {labels_path}")
        
        # Save label encoder
        encoder_path = "models/islvt_encoder.pkl"
        with open(encoder_path, 'wb') as f:
            pickle.dump(self.label_encoder, f)
        print(f"✅ Label encoder saved: {encoder_path}")
        
        # Create summary
        summary = {
            "dataset": "ISLVT",
            "model_info": {
                "num_classes": len(self.label_encoder.classes_),
                "input_shape": [30, 126],  # frames, features
                "architecture": "LSTM",
                "framework": "TensorFlow/Keras"
            },
            "classes": list(self.label_encoder.classes_),
            "files": {
                "model": model_path,
                "labels": labels_path,
                "encoder": encoder_path
            }
        }
        
        summary_path = "models/islvt_summary.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
        print(f"✅ Summary saved: {summary_path}")
        
        return model_path, labels_path

def main():
    """Main training function"""
    print("🤟 ISLVT LSTM Model Training")
    print("=" * 50)
    
    # Check if required files exist
    if not os.path.exists("processed_landmarks"):
        print("❌ processed_landmarks directory not found!")
        print("Make sure you've run the landmark processing step first.")
        return
    
    # Initialize trainer
    trainer = ISLVTModelTrainer()
    
    try:
        # Train model
        model, history, accuracy = trainer.train_model(epochs=100)
        
        # Save model and labels
        model_path, labels_path = trainer.save_model_and_labels(model)
        
        print(f"\n🎉 SUCCESS!")
        print(f"🤖 Model trained with {accuracy:.4f} accuracy")
        print(f"📁 Files created:")
        print(f"   - {model_path}")
        print(f"   - {labels_path}")
        print(f"   - models/islvt_encoder.pkl")
        print(f"   - models/islvt_summary.json")
        
        print(f"\n🚀 Ready to use in your Flask app!")
        print(f"Update your .env file:")
        print(f"WLASL_MODEL_PATH=./models/islvt_model.h5")
        print(f"WLASL_LABELS_PATH=./models/islvt_labels.json")
        
    except Exception as e:
        print(f"❌ Training failed: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()

🤟 ISLVT LSTM Model Training
🚀 Starting ISLVT LSTM model training...
📁 Loading processed ISLVT landmark data...
Found 80 landmark files to process...
✅ 1 i am good _landmarks.npy -> 'i am good' (shape: (30, 126))
✅ 10 granfather sick _landmarks.npy -> 'granfather sick' (shape: (30, 126))
✅ 11 grand mother market go _landmarks.npy -> 'grand mother market go' (shape: (30, 126))
✅ 12 grandmother market go will _landmarks.npy -> 'grandmother market go will' (shape: (30, 126))
✅ 13 before grand mother market go finish _landmarks.npy -> 'before grand mother market go finish' (shape: (30, 126))

📊 Data Loading Summary:
✅ Successfully loaded: 80 sequences
⚠️ Skipped: 0 files

🏷️ Found 80 unique signs:
   i am good: 1 videos
   granfather sick: 1 videos
   grand mother market go: 1 videos
   grandmother market go will: 1 videos
   before grand mother market go finish: 1 videos
   baby cute: 1 videos
   both just seperate: 1 videos
   they apply divorce: 1 videos
   we place see go which: 1 video