### Pipeline Overview
- Load and Parse Data: Read and preprocess the keypoint data from .pkl files.
- Feature Augmentation: Add relative positional encodings and unique feature embeddings.
- Model Design: Define a Transformer-based model leveraging these augmented features.
- Training Loop: Prepare data loaders, define loss function, and train the model.


#### Step 1: Data Loading and Parsing

In [21]:
import pickle
import torch
import numpy as np

# Define the dataset loader
def read_pkl(path):
    with open(path, 'rb') as f:
        data = pickle.load(f)
    return data

path = '/nas/Dataset/Phoenix/phoenix-2014-keypoints.pkl'
data = read_pkl(path)

# Process dataset structure
def process_dataset(data):
    videos = {}
    for video_key, video_data in data.items():
        keypoints = video_data['keypoints']  # [frames, 133, 3]
        videos[video_key] = keypoints
    return videos

videos = process_dataset(data)
print("Number of videos:", len(videos))


Number of videos: 6841


In [None]:
# Define adjacency list for hand joints (simplified example)
ADJACENCY_LIST = {
    91: [92],  # Wrist to first joint
    92: [93],  # First joint to second joint
    93: [94, 95],  # Second joint to two fingertips
    96: [97],  # Repeat for other fingers...
    # Add the remaining keypoints according to the hierarchy
}

### Step 2: Feature Augmentation
Relative Positional Encoding

In [22]:
# Define adjacency list for hand joints
ADJACENCY_LIST = {
    91: [92],  # Wrist to first joint
    92: [93],  # First joint to second joint
    93: [94, 95],  # Second joint to two fingertips
    96: [97],  # Repeat for other fingers...
    # Add the remaining keypoints according to the hierarchy
}

# Create adjacency matrix
def create_adjacency_matrix(num_keypoints, adjacency_list):
    adjacency_matrix = np.zeros((num_keypoints, num_keypoints))
    for parent, children in adjacency_list.items():
        for child in children:
            adjacency_matrix[parent, child] = 1
            adjacency_matrix[child, parent] = 1
    return adjacency_matrix

adj_matrix = create_adjacency_matrix(133, ADJACENCY_LIST)

# Compute relative positional encoding
def relative_position_encoding(keypoints, adjacency_matrix):
    num_keypoints = keypoints.shape[1]
    rel_pos_encodings = np.zeros_like(keypoints)
    for i in range(num_keypoints):
        neighbors = np.where(adjacency_matrix[i] == 1)[0]
        for neighbor in neighbors:
            rel_pos_encodings[:, i, :2] += keypoints[:, neighbor, :2] - keypoints[:, i, :2]  # x, y differences
    return rel_pos_encodings


##### Unique Feature Embeddings

In [23]:
from sklearn.preprocessing import OneHotEncoder

# Keypoint types
KEYPOINT_TYPES = {
    91: 'wrist',
    92: 'joint1',
    93: 'joint2',
    94: 'tip',
    # Add remaining keypoint types...
}

# Create feature embeddings
def create_feature_embeddings(num_keypoints, keypoint_types):
    encoder = OneHotEncoder()
    types = [keypoint_types.get(i, 'unknown') for i in range(num_keypoints)]
    type_embeddings = encoder.fit_transform(np.array(types).reshape(-1, 1)).toarray()
    return type_embeddings

type_embeddings = create_feature_embeddings(133, KEYPOINT_TYPES)
type_embeddings = torch.tensor(type_embeddings, dtype=torch.float32)


### 4. Attention Mechanism
- Adjust attention scores based on relative positions.

In [24]:
# Compute attention scores
def compute_attention_scores(keypoints, adjacency_matrix):
    num_keypoints = keypoints.shape[1]
    batch_size = keypoints.shape[0]
    attention_scores = np.zeros((batch_size, num_keypoints, num_keypoints))
    for i in range(num_keypoints):
        for j in range(num_keypoints):
            if adjacency_matrix[i, j] == 1:
                diff = keypoints[:, i, :2] - keypoints[:, j, :2]
                distance = np.linalg.norm(diff, axis=1)  # Compute distance
                attention_scores[:, i, j] = 1 / (1 + distance)  # Higher for closer points
    return attention_scores


5. End-to-End Pipeline
- Combine all components.

In [25]:
def preprocess_video(video_keypoints, adj_matrix, type_embeddings):
    """
    Process a single video's keypoints:
    - Add relative positional encodings.
    - Add unique feature embeddings.
    - Compute adjusted attention scores.
    """
    # Compute relative positional encoding
    rel_pos_enc = relative_position_encoding(video_keypoints, adj_matrix)
    
    # Combine relative positional encodings with keypoints
    augmented_keypoints = np.concatenate([video_keypoints, rel_pos_enc], axis=-1)

    # Add type embeddings
    num_frames = augmented_keypoints.shape[0]
    repeated_embeddings = np.tile(type_embeddings.numpy(), (num_frames, 1, 1))
    augmented_keypoints = np.concatenate([augmented_keypoints, repeated_embeddings], axis=-1)

    # Compute attention scores
    attention_scores = compute_attention_scores(video_keypoints, adj_matrix)

    return augmented_keypoints, attention_scores

# Process all videos
processed_data = {}
for video_key, keypoints in videos.items():
    augmented_keypoints, attention_scores = preprocess_video(keypoints, adj_matrix, type_embeddings)
    processed_data[video_key] = {
        'keypoints': augmented_keypoints,
        'attention_scores': attention_scores,
    }

print("Processed data for all videos.")


  s = (x.conj() * x).real
  return sqrt(add.reduce(s, axis=axis, keepdims=keepdims))


Processed data for all videos.


### 6. Transformer Model Integration
- You can now integrate this data into a Transformer model. Here's an example of a simplified model:

In [26]:
import torch.nn as nn

class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, num_classes):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, 256)
        self.transformer = nn.Transformer(
            d_model=256, nhead=num_heads, num_encoder_layers=num_layers, batch_first=True
        )
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x, attention_mask=None):
        x = self.embedding(x)
        x = self.transformer(x, src_mask=attention_mask)
        x = x.mean(dim=1)  # Global average pooling
        x = self.fc(x)
        return x

# Instantiate the model
model = TransformerModel(input_dim=133 + 4, num_heads=8, num_layers=6, num_classes=100)
print(model)


TransformerModel(
  (embedding): Linear(in_features=137, out_features=256, bias=True)
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
          )
          (linear1): Linear(in_features=256, out_features=2048, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=2048, out_features=256, bias=True)
          (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
        (1): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_feature

### 1. Data Preprocessing for Gesture Recognition
- The input consists of video data structured as sequences of frames, with keypoints for each frame. We'll use the processed data to create gesture sequences.

In [27]:
# Define preprocessing for each video
def preprocess_videos_for_model(videos, adj_matrix, type_embeddings, max_frames=200):
    processed_videos = {}
    for video_key, keypoints in videos.items():
        # Normalize keypoints (optional)
        normalized_keypoints = keypoints.copy()
        normalized_keypoints[..., :2] = (keypoints[..., :2] - 0.5) * 2  # Scale x, y between -1 and 1
        
        # Process each video
        augmented_keypoints, attention_scores = preprocess_video(normalized_keypoints, adj_matrix, type_embeddings)
        
        # Padding or truncating to `max_frames`
        num_frames = augmented_keypoints.shape[0]
        if num_frames > max_frames:
            augmented_keypoints = augmented_keypoints[:max_frames]
            attention_scores = attention_scores[:max_frames, :, :]
        else:
            pad_size = max_frames - num_frames
            pad_shape = (pad_size,) + augmented_keypoints.shape[1:]
            augmented_keypoints = np.concatenate([augmented_keypoints, np.zeros(pad_shape)], axis=0)
            attention_scores = np.concatenate([attention_scores, np.zeros((pad_size,) + attention_scores.shape[1:])], axis=0)

        processed_videos[video_key] = {
            'keypoints': torch.tensor(augmented_keypoints, dtype=torch.float32),
            'attention_scores': torch.tensor(attention_scores, dtype=torch.float32),
        }
    
    return processed_videos

# Process all videos
processed_videos = preprocess_videos_for_model(videos, adj_matrix, type_embeddings)


  import sys


### 2. Transformer Model Design
- A Transformer model tailored for sequence-to-sequence classification for sign gesture recognition.

In [28]:
import torch.nn as nn
import torch.nn.functional as F

class SignLanguageTransformer(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, num_classes, max_frames):
        super(SignLanguageTransformer, self).__init__()
        self.embedding = nn.Linear(input_dim, 256)  # Embedding layer
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_frames, 256))  # Learnable positional encoding
        self.transformer = nn.Transformer(
            d_model=256, nhead=num_heads, num_encoder_layers=num_layers, batch_first=True
        )
        self.fc = nn.Linear(256, num_classes)  # Final classification layer

    def forward(self, x, attention_mask=None):
        batch_size, seq_len, _ = x.size()
        x = self.embedding(x)
        x += self.positional_encoding[:, :seq_len, :]  # Add positional encoding
        x = self.transformer(x, src_key_padding_mask=attention_mask)
        x = x.mean(dim=1)  # Global average pooling
        x = self.fc(x)
        return x

# Define model
input_dim = 133 + 4 + type_embeddings.shape[1]  # Keypoints, relative encoding, and type embeddings
num_heads = 8
num_layers = 6
num_classes = 50  # Number of gestures in the dataset
max_frames = 200

model = SignLanguageTransformer(input_dim, num_heads, num_layers, num_classes, max_frames)


### 3. Training Pipeline
Define the loss function, optimizer, and training loop.

In [29]:
import torch.optim as optim

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
def train_model(model, processed_videos, labels, epochs=10):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for video_key, data in processed_videos.items():
            keypoints = data['keypoints']  # Input keypoints
            attention_mask = None  # Placeholder for optional attention masks
            label = torch.tensor(labels[video_key], dtype=torch.long)  # Ground truth label

            # Forward pass
            optimizer.zero_grad()
            output = model(keypoints.unsqueeze(0), attention_mask)
            loss = criterion(output, label.unsqueeze(0))
            
            # Backward pass
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}")

# Example labels (dummy data)
labels = {video_key: i % num_classes for i, video_key in enumerate(processed_videos.keys())}

# Train the model
train_model(model, processed_videos, labels, epochs=10)


ValueError: too many values to unpack (expected 3)

### GPU-Enabled Pipeline for Sign Gesture Recognition

In [32]:
import random
from sklearn.model_selection import train_test_split

# Assuming `data` is the dictionary with keypoint data loaded from the .pkl file
# data structure example: {'video_key1': {'keypoints': [...], 'label': ...}, ...}

# Convert data to a list of samples
all_data = [{'keypoints': torch.tensor(video_data['keypoints'], dtype=torch.float32), 
             'label': video_data['label']}
            for video_data in data.values()]

# Split data into train, validation, and test sets
train_split, test_split = train_test_split(all_data, test_size=0.2, random_state=42)
train_split, val_split = train_test_split(train_split, test_size=0.2, random_state=42)

# Dataset objects
train_dataset = GestureRecognitionDataset(train_split)
val_dataset = GestureRecognitionDataset(val_split)
test_dataset = GestureRecognitionDataset(test_split)

# Data preparation
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


KeyError: 'label'

In [30]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

# Assuming the above stages for preprocessing and dataset creation are in place

class GestureRecognitionDataset(Dataset):
    def __init__(self, data):
        self.data = data
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        sample = self.data[idx]
        keypoints = torch.tensor(sample['keypoints'], dtype=torch.float32)
        label = torch.tensor(sample['label'], dtype=torch.long)
        return keypoints, label

# Model definition
class TransformerModel(nn.Module):
    def __init__(self, input_dim, embedding_dim, nhead, num_layers, num_classes):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1, embedding_dim))
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=nhead), 
            num_layers=num_layers
        )
        self.fc = nn.Linear(embedding_dim, num_classes)
    
    def forward(self, x):
        x = self.embedding(x) + self.positional_encoding
        x = self.transformer(x)
        x = self.fc(x.mean(dim=1))  # Average pooling
        return x

# Data preparation
# Assuming the dataset is already split into train, validation, and test
train_data = GestureRecognitionDataset(train_dataset)
val_data = GestureRecognitionDataset(val_dataset)
test_data = GestureRecognitionDataset(test_dataset)

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

# Model initialization
input_dim = 133 * 2  # x, y coordinates for 133 keypoints
embedding_dim = 256
nhead = 8
num_layers = 4
num_classes = len(label_map)  # Number of unique gesture classes

model = TransformerModel(input_dim, embedding_dim, nhead, num_layers, num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for keypoints, labels in train_loader:
        keypoints, labels = keypoints.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(keypoints)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss/len(train_loader)}")

    # Validation loop
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for keypoints, labels in val_loader:
            keypoints, labels = keypoints.to(device), labels.to(device)

            outputs = model(keypoints)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Validation Loss: {val_loss/len(val_loader)}, Accuracy: {100 * correct / total}%")


NameError: name 'train_dataset' is not defined

In [34]:
# Inspect video keys
for video_key in data.keys():
    print(video_key)
    break  # Print one example


fullFrame-210x260px/train/01April_2010_Thursday_heute_default-0/1/01April_2010_Thursday_heute


In [36]:
# Inspect video keys
for video_key in data.keys():
    print(video_key)
    break  # Print one example


AttributeError: 'str' object has no attribute 'keys'

In [31]:
model.eval()
test_accuracy = 0.0
total = 0

with torch.no_grad():
    for keypoints, labels in test_loader:
        keypoints, labels = keypoints.to(device), labels.to(device)

        outputs = model(keypoints)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        test_accuracy += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * test_accuracy / total}%")


NameError: name 'test_loader' is not defined

In [33]:
pwd

'/home/muhiddin/SLRT/TwoStreamNetwork'

### 4. Evaluation Metrics
- Use evaluation metrics such as accuracy and Word Error Rate (WER).

In [None]:
from sklearn.metrics import accuracy_score

def evaluate_model(model, processed_videos, labels):
    model.eval()
    predictions = []
    ground_truth = []
    
    with torch.no_grad():
        for video_key, data in processed_videos.items():
            keypoints = data['keypoints']
            label = labels[video_key]
            output = model(keypoints.unsqueeze(0))
            pred = torch.argmax(output, dim=1).item()
            
            predictions.append(pred)
            ground_truth.append(label)
    
    # Compute accuracy
    accuracy = accuracy_score(ground_truth, predictions)
    print(f"Accuracy: {accuracy:.4f}")
    
    return accuracy

# Evaluate the model
evaluate_model(model, processed_videos, labels)
