# Sign Language to Speech Translation System

This notebook implements a complete pipeline for sign language translation:
1. Sign language video to text translation
2. Grammatical error correction using an LLM
3. Text to speech conversion using ESPnet

Let's start by installing the necessary dependencies.

In [None]:
# Install required packages
!pip install torch torchvision torchaudio
!pip install transformers datasets evaluate
!pip install opencv-python mediapipe
!pip install openai
!pip install espnet espnet_model_zoo
!pip install soundfile
!pip install matplotlib seaborn pandas scikit-learn
!pip install tqdm requests gdown sentencepiece

## Part 1: Sign Language to Text Translation

For this part, we'll use the WLASL dataset (Word-Level American Sign Language) and implement a model to recognize signs from video frames. We'll use MediaPipe for pose estimation and a sequence model to classify the signs.
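
Because a sequence model is easiest to batch when every clip has the same number of frames, videos of different lengths need to be reduced to a common length. The following is a minimal sketch of evenly spaced frame sampling; `sample_frame_indices` is a hypothetical helper introduced here for illustration, and it uses integer arithmetic rather than any particular library routine.

```python
def sample_frame_indices(frame_count, max_frames=30):
    """Return up to `max_frames` evenly spaced frame indices (hypothetical helper)."""
    if frame_count <= 0:
        return []                          # unreadable or empty video
    if frame_count <= max_frames:
        return list(range(frame_count))    # short clip: keep every frame
    if max_frames == 1:
        return [0]
    # Spread the indices from the first frame to the last frame (inclusive)
    last = frame_count - 1
    return [(i * last) // (max_frames - 1) for i in range(max_frames)]


print(sample_frame_indices(100, 5))  # [0, 24, 49, 74, 99]
print(sample_frame_indices(3, 5))    # [0, 1, 2]
```

Clips shorter than `max_frames` come back with fewer indices, so they still need padding before they can be stacked into a batch.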

In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import mediapipe as mp
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import random
import json
import requests
from pathlib import Path
import zipfile
import gdown

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

### 1.1 Download and Prepare the WLASL Dataset

We'll use a subset of the WLASL dataset for this demonstration.
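
To make the metadata layout concrete, here is a small sketch that works on a made-up two-entry sample in the same shape as `WLASL_v0.3.json`. Only the `gloss`, `instances`, `video_id`, `url`, and `split` fields are shown; the values, the `example.com` URLs, and the helper name `summarize_subset` are illustrative assumptions, not real dataset content.

```python
import json

# Made-up sample in the shape of WLASL_v0.3.json (values are illustrative only)
sample_json = """
[
  {"gloss": "book", "instances": [
      {"video_id": "00001", "url": "https://example.com/a.mp4", "split": "train"},
      {"video_id": "00002", "url": "https://example.com/b.mp4", "split": "test"}]},
  {"gloss": "drink", "instances": [
      {"video_id": "00003", "url": "https://example.com/c.mp4", "split": "train"}]}
]
"""


def summarize_subset(entries, num_glosses):
    """Map label indices to glosses and count instances per split (hypothetical helper)."""
    subset = entries[:num_glosses]
    label_to_gloss = {idx: entry["gloss"] for idx, entry in enumerate(subset)}
    split_counts = {}
    for entry in subset:
        for instance in entry["instances"]:
            split = instance.get("split", "unknown")
            split_counts[split] = split_counts.get(split, 0) + 1
    return label_to_gloss, split_counts


entries = json.loads(sample_json)
label_to_gloss_demo, split_counts = summarize_subset(entries, num_glosses=2)
print(label_to_gloss_demo)
print(split_counts)
```

Each gloss becomes one class, and its position in the list becomes the integer label used for training.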

In [None]:
# Create directories for dataset
!mkdir -p data/wlasl

# Download WLASL dataset metadata
!wget -O data/wlasl/WLASL_v0.3.json https://raw.githubusercontent.com/dxli94/WLASL/master/start_kit/WLASL_v0.3.json

In [None]:
# Load the WLASL metadata
with open('data/wlasl/WLASL_v0.3.json', 'r') as f:
    wlasl_data = json.load(f)

print(f"Total number of glosses (words): {len(wlasl_data)}")

# For demonstration, we'll use a subset of the data (first 10 words)
subset_data = wlasl_data[:10]
print(f"Using a subset of {len(subset_data)} words")

# Print the words in our subset
words = [item['gloss'] for item in subset_data]
print(f"Words in our subset: {words}")

In [None]:
# Function to download videos from the dataset
def download_wlasl_videos(data, output_dir='data/wlasl/videos'):
    os.makedirs(output_dir, exist_ok=True)
    
    # Keep track of videos and their labels
    video_paths = []
    video_labels = []
    label_to_gloss = {}
    
    for label_idx, entry in enumerate(tqdm(data, desc="Downloading videos")):
        gloss = entry['gloss']
        label_to_gloss[label_idx] = gloss
        
        for instance in entry['instances']:
            video_id = instance['video_id']
            video_url = instance.get('url', '')
            
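            # Skip if no URL is provided, or if the URL points to a YouTube page:
            # requests would save the HTML page, not a playable video file
            if not video_url or 'youtube' in video_url or 'youtu.be' in video_url:
                continue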
                
            # Define output path
            output_path = os.path.join(output_dir, f"{video_id}.mp4")
            
            # Skip if already downloaded
            if os.path.exists(output_path):
                video_paths.append(output_path)
                video_labels.append(label_idx)
                continue
                
            try:
                # Download the video
                response = requests.get(video_url, stream=True, timeout=10)
                if response.status_code == 200:
                    with open(output_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=1024):
                            if chunk:
                                f.write(chunk)
                    video_paths.append(output_path)
                    video_labels.append(label_idx)
                else:
                    print(f"Failed to download {video_id}: HTTP {response.status_code}")
            except Exception as e:
                print(f"Error downloading {video_id}: {e}")
    
    return video_paths, video_labels, label_to_gloss

In [None]:
# Download the videos for our subset
video_paths, video_labels, label_to_gloss = download_wlasl_videos(subset_data)

print(f"Downloaded {len(video_paths)} videos")
print(f"Label to gloss mapping: {label_to_gloss}")

### 1.2 Implement MediaPipe for Pose Estimation

We'll use MediaPipe Holistic to extract pose, hand, and face landmarks from sampled video frames, then flatten them into one feature vector per frame.
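
The size of that per-frame vector follows from the landmark groups this section concatenates: 33 pose landmarks, 21 per hand, and a 10-landmark face subset, each with x, y, and z. The sketch below works out the offsets of each group; `LANDMARK_GROUPS` and `feature_layout` are names introduced here for illustration only.

```python
# Landmark groups in the order they are concatenated per frame
LANDMARK_GROUPS = [
    ("pose", 33),
    ("left_hand", 21),
    ("right_hand", 21),
    ("face_subset", 10),
]
COORDS_PER_LANDMARK = 3  # x, y, z


def feature_layout(groups=LANDMARK_GROUPS, coords=COORDS_PER_LANDMARK):
    """Return {group: (start, end)} slices and the total feature length."""
    layout, offset = {}, 0
    for name, count in groups:
        width = count * coords
        layout[name] = (offset, offset + width)
        offset += width
    return layout, offset


layout, feature_dim = feature_layout()
print(layout)
print(feature_dim)  # 255
```

The total, 255, is the value the model's input size has to match; it differs from 543, which is the number of landmarks MediaPipe Holistic reports when the full face mesh is included.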

In [None]:
# Initialize MediaPipe solutions
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

def extract_landmarks(video_path, max_frames=30):
    """Extract pose, face, and hand landmarks from a video using MediaPipe."""
    cap = cv2.VideoCapture(video_path)
    
    # Get the frame count (0 if the video could not be opened)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # Calculate frames to sample
    if frame_count <= max_frames:
        frames_to_sample = list(range(frame_count))
    else:
        frames_to_sample = np.linspace(0, frame_count-1, max_frames, dtype=int)
    
    landmarks_sequence = []
    
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        for frame_idx in frames_to_sample:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            success, image = cap.read()
            
            if not success:
                # If the frame read failed, append a zero vector with the same
                # length as a real frame: 33*3 + 21*3 + 21*3 + 10*3 = 255 values
                landmarks_sequence.append(np.zeros(255))
                continue
                
            # Convert the BGR image to RGB
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            
            # Process the image and detect landmarks
            results = holistic.process(image)
            
            # Extract landmarks
            frame_landmarks = []
            
            # Pose landmarks (33 landmarks x 3 coordinates)
            if results.pose_landmarks:
                pose = [[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]
                frame_landmarks.extend(np.array(pose).flatten())
            else:
                frame_landmarks.extend(np.zeros(33*3))
                
            # Left hand landmarks (21 landmarks x 3 coordinates)
            if results.left_hand_landmarks:
                left_hand = [[lm.x, lm.y, lm.z] for lm in results.left_hand_landmarks.landmark]
                frame_landmarks.extend(np.array(left_hand).flatten())
            else:
                frame_landmarks.extend(np.zeros(21*3))
                
            # Right hand landmarks (21 landmarks x 3 coordinates)
            if results.right_hand_landmarks:
                right_hand = [[lm.x, lm.y, lm.z] for lm in results.right_hand_landmarks.landmark]
                frame_landmarks.extend(np.array(right_hand).flatten())
            else:
                frame_landmarks.extend(np.zeros(21*3))
                
            # Face landmarks (we'll use a subset of 10 landmarks for simplicity)
            if results.face_landmarks:
                # Select a subset of face landmarks (e.g., eyes, nose, mouth)
                face_indices = [0, 4, 6, 8, 10, 152, 234, 454, 109, 338]  # Example indices, no duplicates
                face = [[results.face_landmarks.landmark[idx].x,
                         results.face_landmarks.landmark[idx].y,
                         results.face_landmarks.landmark[idx].z] for idx in face_indices]
                frame_landmarks.extend(np.array(face).flatten())
            else:
                frame_landmarks.extend(np.zeros(10*3))
                
            landmarks_sequence.append(frame_landmarks)
    
    cap.release()
    
    # Pad with zero frames so every sequence has exactly max_frames entries
    # (this also covers videos from which no frame could be read)
    if len(landmarks_sequence) < max_frames:
        pad_length = max_frames - len(landmarks_sequence)
        landmarks_sequence.extend([np.zeros(255)] * pad_length)
    
    return np.array(landmarks_sequence)

In [None]:
# Function to visualize the landmarks
def visualize_landmarks(video_path):
    cap = cv2.VideoCapture(video_path)
    
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        success, image = cap.read()
        
        if not success:
            print("Failed to read video")
            cap.release()  # release the capture before the early return
            return
            
        # Convert the BGR image to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        # Process the image and detect landmarks
        results = holistic.process(image)
        
        # Draw landmarks
        image_copy = image.copy()
        
        # Draw pose landmarks
        mp_drawing.draw_landmarks(
            image_copy, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
        
        # Draw left hand landmarks
        mp_drawing.draw_landmarks(
            image_copy, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        
        # Draw right hand landmarks
        mp_drawing.draw_landmarks(
            image_copy, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        
        # Draw face landmarks
        mp_drawing.draw_landmarks(
            image_copy, results.face_landmarks)
        
        plt.figure(figsize=(10, 10))
        plt.imshow(image_copy)
        plt.title("MediaPipe Landmarks")
        plt.axis('off')
        plt.show()
    
    cap.release()

In [None]:
# Visualize landmarks for the first video
if video_paths:
    visualize_landmarks(video_paths[0])

### 1.3 Create a Dataset and DataLoader for Training
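
The dataset class in this section calls `extract_landmarks` inside `__getitem__`, so MediaPipe runs again for every sample in every epoch. One possible mitigation is to cache the extracted features on disk. The sketch below is a generic wrapper under that assumption; `cached_extract` and the stand-in `fake_extract` are hypothetical names, and the cache key ignores parameters such as `max_frames`.

```python
import os
import pickle
import tempfile


def cached_extract(video_path, extract_fn, cache_dir):
    """Return extract_fn(video_path), reusing a pickled result if one exists."""
    os.makedirs(cache_dir, exist_ok=True)
    # One cache file per video, named after the video file itself
    stem = os.path.splitext(os.path.basename(video_path))[0]
    cache_path = os.path.join(cache_dir, stem + ".pkl")
    if os.path.exists(cache_path):
        with open(cache_path, "rb") as f:
            return pickle.load(f)
    features = extract_fn(video_path)
    with open(cache_path, "wb") as f:
        pickle.dump(features, f)
    return features


# Stand-in extractor that counts how often it is called
calls = {"n": 0}


def fake_extract(path):
    calls["n"] += 1
    return [[0.0, 1.0], [2.0, 3.0]]


with tempfile.TemporaryDirectory() as tmp:
    first = cached_extract("videos/00001.mp4", fake_extract, tmp)
    second = cached_extract("videos/00001.mp4", fake_extract, tmp)

print(calls["n"])  # the extractor ran only once
```

In the notebook, the same idea would mean passing the real extraction function in place of `fake_extract`, at the cost of having to clear the cache whenever the extraction settings change.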

In [None]:
class SignLanguageDataset(Dataset):
    def __init__(self, video_paths, labels, transform=None, max_frames=30):
        self.video_paths = video_paths
        self.labels = labels
        self.transform = transform
        self.max_frames = max_frames
        
    def __len__(self):
        return len(self.video_paths)
    
    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.labels[idx]
        
        # Extract landmarks
        landmarks = extract_landmarks(video_path, self.max_frames)
        
        # Convert to tensor
        landmarks_tensor = torch.tensor(landmarks, dtype=torch.float32)
        label_tensor = torch.tensor(label, dtype=torch.long)
        
        return landmarks_tensor, label_tensor

In [None]:
# Split the data into train and test sets
from sklearn.model_selection import train_test_split

train_paths, test_paths, train_labels, test_labels = train_test_split(
    video_paths, video_labels, test_size=0.2, random_state=42, stratify=video_labels
)

print(f"Training set: {len(train_paths)} videos")
print(f"Test set: {len(test_paths)} videos")

In [None]:
# Create datasets
train_dataset = SignLanguageDataset(train_paths, train_labels)
test_dataset = SignLanguageDataset(test_paths, test_labels)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

### 1.4 Define the Sign Language Recognition Model

We'll use a two-layer bidirectional LSTM followed by two fully connected layers to classify each landmark sequence as one of the glosses.
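
The shapes flowing through this architecture can be written out per sample as follows. The symbols are introduced here for illustration: $T$ frames per clip, $D$ features per frame, hidden size $H$, and $C$ classes.

```latex
\begin{aligned}
x &= (x_1, \dots, x_T), \qquad x_t \in \mathbb{R}^{D} \\
h_t &= \bigl[\overrightarrow{h}_t \,;\, \overleftarrow{h}_t\bigr] \in \mathbb{R}^{2H}, \qquad t = 1, \dots, T \\
z &= \mathrm{ReLU}(W_1 h_T + b_1) \in \mathbb{R}^{H}, \qquad W_1 \in \mathbb{R}^{H \times 2H} \\
\hat{y} &= W_2\, \mathrm{Dropout}(z) + b_2 \in \mathbb{R}^{C}, \qquad W_2 \in \mathbb{R}^{C \times H}
\end{aligned}
```

One consequence of reading only $h_T$ is that the backward half $\overleftarrow{h}_T$ has processed just the final frame, so most of the sequence context reaches the classifier through the forward direction.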

In [None]:
class SignLanguageModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes):
        super(SignLanguageModel, self).__init__()
        
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.2,
            bidirectional=True
        )
        
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)  # *2 for bidirectional
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        
    def forward(self, x):
        # x shape: (batch_size, sequence_length, input_dim)
        
        # LSTM output: (batch_size, sequence_length, hidden_dim * 2)
        lstm_out, _ = self.lstm(x)
        
        # Take the output from the last time step
        lstm_out = lstm_out[:, -1, :]
        
        # Fully connected layers
        out = self.fc1(lstm_out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)
        
        return out

In [None]:
# Initialize the model
input_dim = 255  # Features per frame: 33*3 (pose) + 2*21*3 (hands) + 10*3 (face subset)
hidden_dim = 128
num_layers = 2
num_classes = len(label_to_gloss)

model = SignLanguageModel(input_dim, hidden_dim, num_layers, num_classes).to(device)
print(model)

### 1.5 Train the Model

In [None]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        
        for i, (inputs, labels) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")):
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            
            # Statistics
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = 100 * correct / total
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")
    
    return model

# Evaluation function
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Evaluating"):
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    
    return accuracy, all_preds, all_labels

In [None]:
# Train the model
model = train_model(model, train_loader, criterion, optimizer, num_epochs=10)

In [None]:
# Evaluate the model
accuracy, all_preds, all_labels = evaluate_model(model, test_loader)

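# Plot the confusion matrix
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

# Pass every class index explicitly so the matrix matches the tick labels
# even when a class is missing from the test set
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(label_to_gloss))))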
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=[label_to_gloss[i] for i in range(len(label_to_gloss))],
            yticklabels=[label_to_gloss[i] for i in range(len(label_to_gloss))])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Print classification report
print("Classification Report:")
print(classification_report(
    all_labels, all_preds,
    labels=list(range(len(label_to_gloss))),  # keep labels aligned with target_names
    target_names=[label_to_gloss[i] for i in range(len(label_to_gloss))],
    zero_division=0
))

### 1.6 Save the Model

In [None]:
# Save the model
torch.save({
    'model_state_dict': model.state_dict(),
    'label_to_gloss': label_to_gloss,
    'input_dim': input_dim,
    'hidden_dim': hidden_dim,
    'num_layers': num_layers,
    'num_classes': num_classes
}, 'sign_language_model.pth')

print("Model saved successfully!")

### 1.7 Function to Translate Sign Language Videos to Text

In [None]:
def translate_sign_to_text(video_path, model, label_to_gloss):
    """Translate a sign language video to text."""
    model.eval()
    
    # Extract landmarks
    landmarks = extract_landmarks(video_path)
    landmarks_tensor = torch.tensor(landmarks, dtype=torch.float32).unsqueeze(0).to(device)
    
    # Get prediction
    with torch.no_grad():
        outputs = model(landmarks_tensor)
        _, predicted = torch.max(outputs.data, 1)
        predicted_label = predicted.item()
    
    # Convert to text
    predicted_text = label_to_gloss[predicted_label]
    
    return predicted_text

In [None]:
# Test the translation function on a sample video
if test_paths:
    sample_video = test_paths[0]
    predicted_text = translate_sign_to_text(sample_video, model, label_to_gloss)
    print(f"Predicted sign: {predicted_text}")

## Part 2: Grammatical Error Correction Using an LLM

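Next, we correct grammatical errors in the translated text. The first cell sketches how OpenAI's GPT models could be called, but substitutes a rule-based mock so the notebook runs without an API key. The second cell uses an open-source T5 model from Hugging Face, which is what the rest of the pipeline relies on.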
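
Whichever chat-style LLM is used, the request comes down to a system instruction plus the raw gloss text. The sketch below only builds that message list and makes no API call; `build_correction_messages` is a hypothetical helper, and the extra "reply with the corrected sentence only" instruction is an assumption added to keep the response easy to pass on to speech synthesis.

```python
def build_correction_messages(gloss_text):
    """Build chat messages asking an LLM to fix gloss-style text (hypothetical helper)."""
    cleaned = " ".join(gloss_text.split())  # collapse stray whitespace
    if not cleaned:
        raise ValueError("gloss_text is empty")
    return [
        {
            "role": "system",
            "content": (
                "You are a helpful assistant that corrects grammatical errors "
                "in text. Reply with the corrected sentence only."
            ),
        },
        {
            "role": "user",
            "content": f"Correct any grammatical errors in the following text: '{cleaned}'",
        },
    ]


messages = build_correction_messages("  me   go store ")
print(messages[1]["content"])
```

Keeping the prompt construction separate from the network call makes it possible to test the wording without an API key.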

In [None]:
# Import OpenAI library
import openai
from openai import OpenAI

# Set your OpenAI API key
# client = OpenAI(api_key="your-api-key")

# For demonstration, we'll use a mock function
def correct_grammar_with_llm(text):
    """Correct grammatical errors in the text using an LLM."""
    # In a real implementation, you would use the OpenAI API like this:
    # response = client.chat.completions.create(
    #     model="gpt-3.5-turbo",
    #     messages=[
    #         {"role": "system", "content": "You are a helpful assistant that corrects grammatical errors in text."},
    #         {"role": "user", "content": f"Correct any grammatical errors in the following text: '{text}'"},
    #     ]
    # )
    # corrected_text = response.choices[0].message.content
    
    # For demonstration, we'll use a simple mock function
    print(f"Original text: {text}")
    
    # Simulate some common ASL to English grammar corrections
    # ASL often omits articles and uses different word order
    if text.lower() == "book on table":
        corrected_text = "The book is on the table."
    elif text.lower() == "me go store":
        corrected_text = "I am going to the store."
    elif text.lower() == "yesterday me sick":
        corrected_text = "I was sick yesterday."
    else:
        # Add articles and proper verb forms as a simple example
        words = text.split()
        if len(words) > 0 and words[0].lower() not in ["the", "a", "an", "i", "you", "he", "she", "we", "they"]:
            words.insert(0, "The")
        corrected_text = " ".join(words)
    
    print(f"Corrected text: {corrected_text}")
    return corrected_text

In [None]:
# Alternative: Use Hugging Face's T5 model for grammar correction
from transformers import T5ForConditionalGeneration, T5Tokenizer

def load_grammar_correction_model():
    """Load a pre-trained T5 model for grammar correction."""
    tokenizer = T5Tokenizer.from_pretrained("prithivida/grammar_error_correcter_v1")
    model = T5ForConditionalGeneration.from_pretrained("prithivida/grammar_error_correcter_v1")
    model.to(device)
    return model, tokenizer

def correct_grammar_with_t5(text, model, tokenizer):
    """Correct grammatical errors in the text using T5."""
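    # Gramformer, the project this checkpoint comes from, prepends "gec: "
    input_text = f"gec: {text}"
    input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
    
    # Inference only, so gradients are not needed
    with torch.no_grad():
        outputs = model.generate(input_ids, max_length=128)
    corrected_text = tokenizer.decode(outputs[0], skip_special_tokens=True)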
    
    print(f"Original text: {text}")
    print(f"Corrected text: {corrected_text}")
    
    return corrected_text

In [None]:
# Load the T5 model for grammar correction
grammar_model, grammar_tokenizer = load_grammar_correction_model()

# Test the grammar correction function
sample_texts = [
    "book on table",
    "me go store",
    "yesterday me sick",
    "she eat apple"
]

for text in sample_texts:
    # Using the mock function
    print("\nUsing mock function:")
    corrected_text_mock = correct_grammar_with_llm(text)
    
    # Using T5
    print("\nUsing T5 model:")
    corrected_text_t5 = correct_grammar_with_t5(text, grammar_model, grammar_tokenizer)

## Part 3: Text to Speech Conversion using ESPnet

Finally, we'll convert the corrected text to speech using ESPnet.
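
The last step of any TTS stage is turning a floating-point waveform into an audio file. As a sketch of what 16-bit PCM output involves, the block below writes a WAV file with Python's standard `wave` module, using a sine tone as a stand-in for model output. The helper `write_pcm16_wav` and the 22050 Hz rate (the sampling rate of LJSpeech recordings) are assumptions for illustration, not part of ESPnet.

```python
import math
import os
import struct
import tempfile
import wave


def write_pcm16_wav(samples, sample_rate, path):
    """Write float samples in [-1.0, 1.0] as a mono 16-bit PCM WAV file."""
    frames = bytearray()
    for s in samples:
        s = max(-1.0, min(1.0, s))                      # clip to the valid range
        frames += struct.pack("<h", int(round(s * 32767)))
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)        # mono
        wf.setsampwidth(2)        # 2 bytes = 16 bits per sample
        wf.setframerate(sample_rate)
        wf.writeframes(bytes(frames))


sample_rate = 22050  # assumed here; matches LJSpeech recordings
# Half a second of a 440 Hz tone standing in for synthesized speech
tone = [0.3 * math.sin(2 * math.pi * 440 * n / sample_rate)
        for n in range(sample_rate // 2)]

out_path = os.path.join(tempfile.mkdtemp(), "tone.wav")
write_pcm16_wav(tone, sample_rate, out_path)

with wave.open(out_path, "rb") as wf:
    print(wf.getframerate(), wf.getnframes(), wf.getsampwidth())
```

The file's sample rate must match the rate the TTS model was trained at; otherwise the speech plays back at the wrong speed and pitch.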

In [None]:
# Import ESPnet libraries
import soundfile as sf
from espnet2.bin.tts_inference import Text2Speech
from espnet_model_zoo.downloader import ModelDownloader

In [None]:
# Download and load a pre-trained TTS model
def load_tts_model():
    """Download and load a pre-trained ESPnet TTS model."""
    d = ModelDownloader()
    # download_and_unpack returns the config and checkpoint paths as keyword
    # arguments that Text2Speech accepts directly
    tts = Text2Speech(**d.download_and_unpack("kan-bayashi/ljspeech_tacotron2"))
    return tts

In [None]:
# Function to convert text to speech
def text_to_speech(text, tts_model, output_path="output.wav"):
    """Convert text to speech using ESPnet."""
    with torch.no_grad():
        wav = tts_model(text)["wav"]
    
    sf.write(output_path, wav.numpy(), tts_model.fs, "PCM_16")
    print(f"Speech saved to {output_path}")
    
    # Play the audio
    from IPython.display import Audio
    return Audio(output_path)

In [None]:
# Load the TTS model
tts_model = load_tts_model()

# Test the TTS function
sample_text = "The book is on the table."
audio = text_to_speech(sample_text, tts_model, "sample_output.wav")
audio

## Part 4: Putting It All Together - Complete Pipeline

Now, let's create a complete pipeline that takes a sign language video as input and outputs speech.

In [None]:
def sign_language_to_speech_pipeline(video_path, sign_model, label_to_gloss, grammar_model, grammar_tokenizer, tts_model, output_path="output.wav"):
    """Complete pipeline from sign language video to speech."""
    # Step 1: Translate sign language to text
    print("Step 1: Translating sign language to text...")
    raw_text = translate_sign_to_text(video_path, sign_model, label_to_gloss)
    print(f"Raw text: {raw_text}")
    
    # Step 2: Correct grammar
    print("\nStep 2: Correcting grammar...")
    corrected_text = correct_grammar_with_t5(raw_text, grammar_model, grammar_tokenizer)
    
    # Step 3: Convert to speech
    print("\nStep 3: Converting to speech...")
    audio = text_to_speech(corrected_text, tts_model, output_path)
    
    return raw_text, corrected_text, audio

In [None]:
# Test the complete pipeline
from IPython.display import display

if test_paths:
    sample_video = test_paths[0]
    raw_text, corrected_text, audio = sign_language_to_speech_pipeline(
        sample_video, model, label_to_gloss, grammar_model, grammar_tokenizer, tts_model, "final_output.wav"
    )
    display(audio)  # a bare expression inside an if-block is not rendered

## Part 5: Evaluation of the Complete System

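Let's run the system on the test set. The evaluation below measures sign recognition accuracy and records the grammar-corrected text for inspection; it does not score the corrected text or the synthesized speech.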
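
A single accuracy figure can hide glosses that are never recognized. As a hedged sketch, the helper below breaks accuracy down per gloss from result dictionaries that carry `true_text` and `raw_text` keys, the same keys the evaluation in this section records. `per_gloss_accuracy` is a hypothetical helper, and the demo rows are made up.

```python
from collections import defaultdict


def per_gloss_accuracy(results):
    """Return overall and per-gloss accuracy (in percent) from result dicts."""
    totals = defaultdict(int)
    hits = defaultdict(int)
    for row in results:
        totals[row["true_text"]] += 1
        if row["raw_text"] == row["true_text"]:
            hits[row["true_text"]] += 1
    per_gloss = {gloss: 100.0 * hits[gloss] / totals[gloss] for gloss in totals}
    n = sum(totals.values())
    overall = 100.0 * sum(hits.values()) / n if n else 0.0
    return overall, per_gloss


# Made-up rows for illustration only
demo_results = [
    {"true_text": "book", "raw_text": "book"},
    {"true_text": "book", "raw_text": "drink"},
    {"true_text": "drink", "raw_text": "drink"},
    {"true_text": "drink", "raw_text": "drink"},
]
overall, per_gloss = per_gloss_accuracy(demo_results)
print(overall)     # 75.0
print(per_gloss)   # {'book': 50.0, 'drink': 100.0}
```

With only a handful of test videos per gloss, these per-class figures are coarse and should be read as a rough indication rather than a precise estimate.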

In [None]:
def evaluate_complete_system(test_paths, test_labels, sign_model, label_to_gloss, grammar_model, grammar_tokenizer):
    """Evaluate the complete system on the test set."""
    sign_recognition_correct = 0
    total = len(test_paths)
    
    results = []
    
    for i, (video_path, true_label) in enumerate(zip(test_paths, test_labels)):
        print(f"Processing video {i+1}/{total}...")
        
        # Step 1: Translate sign language to text
        raw_text = translate_sign_to_text(video_path, sign_model, label_to_gloss)
        true_text = label_to_gloss[true_label]
        
        # Check if sign recognition is correct
        sign_correct = (raw_text == true_text)
        if sign_correct:
            sign_recognition_correct += 1
        
        # Step 2: Correct grammar
        corrected_text = correct_grammar_with_t5(raw_text, grammar_model, grammar_tokenizer)
        
        results.append({
            'video_path': video_path,
            'true_label': true_label,
            'true_text': true_text,
            'raw_text': raw_text,
            'corrected_text': corrected_text,
            'sign_correct': sign_correct
        })
    
    # Calculate accuracy (guard against an empty test set)
    sign_recognition_accuracy = 100 * sign_recognition_correct / total if total else 0.0
    print(f"Sign Recognition Accuracy: {sign_recognition_accuracy:.2f}%")
    
    return results

In [None]:
# Evaluate the complete system
evaluation_results = evaluate_complete_system(
    test_paths[:10],  # Use a subset for demonstration
    test_labels[:10],
    model,
    label_to_gloss,
    grammar_model,
    grammar_tokenizer
)

In [None]:
# Display the evaluation results
import pandas as pd

results_df = pd.DataFrame(evaluation_results)
results_df = results_df[['true_text', 'raw_text', 'corrected_text', 'sign_correct']]
results_df

## Conclusion

In this notebook, we've built a complete sign language to speech translation system with three main components:

1. **Sign Language to Text Translation**: We used MediaPipe to extract landmarks from sign language videos and trained an LSTM-based model to recognize signs.

2. **Grammatical Error Correction**: We used a T5 model to correct any grammatical errors in the translated text, making it more natural and fluent.

3. **Text to Speech Conversion**: We used ESPnet to convert the corrected text to speech, completing the pipeline.

This system can be further improved by:
- Using a larger and more diverse dataset for sign language recognition
- Implementing continuous sign language recognition for full sentences
- Fine-tuning the grammar correction model specifically for sign language translation
- Using a more advanced TTS model for more natural-sounding speech

The system demonstrates the potential of AI to bridge communication gaps between deaf and hearing communities.