In [1]:
import os
import logging
import cv2
import numpy as np
import json
from typing import List, Dict, Any, Set
import concurrent.futures

class VideoLoader:
    """Read a range of frames from a video file and crop each one to a bounding box.

    Problems are reported through a module-level logger that writes to
    ``<log_dir>/frame_normaliser.log``.
    """

    def __init__(self, log_dir='logs'):
        """Create the log directory if needed and attach a file handler to the logger.

        Args:
            log_dir: Directory that receives ``frame_normaliser.log``.
        """
        # exist_ok avoids the check-then-create race when several loaders start together.
        os.makedirs(log_dir, exist_ok=True)
        self.logger = logging.getLogger(__name__)
        log_file = os.path.abspath(os.path.join(log_dir, "frame_normaliser.log"))

        # logging.getLogger returns the same object on every call, so attaching a
        # handler unconditionally would duplicate every log line once per instance
        # (or per notebook cell re-run). Only attach one handler per log file.
        already_attached = any(
            isinstance(existing, logging.FileHandler) and existing.baseFilename == log_file
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def load_frames(self, video_path: str, bbox: List[int], frame_start: int, frame_end: int) -> np.ndarray:
        """Load frames ``frame_start..frame_end`` (inclusive) cropped to ``bbox``.

        Frames are counted from 1 in decoding order.

        Args:
            video_path: Path of the video file to open.
            bbox: ``[x1, y1, x2, y2]`` crop rectangle in pixel coordinates.
            frame_start: Number of the first frame to keep.
            frame_end: Number of the last frame to keep, or ``-1`` to read
                until the end of the video.

        Returns:
            ``np.array`` built from the list of cropped frames (empty when no
            frame falls in the requested range), or ``None`` when the video
            cannot be read or any other error occurs; the error is logged.
        """
        cap = None
        try:
            x1, y1, x2, y2 = bbox
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError(f"Error opening video file: {video_path}")

            frames = []
            frame_count = 0
            # grab() advances the stream without decoding, so frames before
            # frame_start are skipped cheaply; retrieve() decodes only the
            # frames that are kept. Together they are what read() does.
            while cap.grab():
                frame_count += 1
                if frame_count < frame_start:
                    continue
                if frame_end != -1 and frame_count > frame_end:
                    break

                ret, frame = cap.retrieve()
                if not ret:
                    break
                # Crop the frame using bbox
                frames.append(frame[y1:y2, x1:x2])

            if len(frames) == 0:
                self.logger.warning(f"No frames extracted from video: {video_path}")
            return np.array(frames)
        except Exception as e:
            self.logger.error(f"Error loading frames from video {video_path}: {e}")
            return None
        finally:
            # Release the capture on every path, including errors raised mid-read.
            if cap is not None:
                cap.release()

class WLASLDatasetLoader:
    """Load cropped video clips described by a JSON metadata file.

    The metadata is a list of entries, each with an ``'instances'`` list; every
    instance supplies ``video_id``, ``bbox``, ``fps``, ``frame_start`` and
    ``frame_end``. Video IDs listed in the "missing" text file are skipped.
    """

    def __init__(self, json_path: str, missing_file_path: str, video_dir: str, log_dir='logs', max_workers: int = 4, batch_size: int = 10):
        """Read the metadata and the missing-video list and set up the frame loader.

        Args:
            json_path: Path of the JSON metadata file.
            missing_file_path: Text file with one unavailable video ID per line.
            video_dir: Directory containing ``<video_id>.mp4`` files.
            log_dir: Directory handed to ``VideoLoader`` for its log file.
            max_workers: Number of threads used to decode videos.
            batch_size: Number of instances submitted to the pool at a time.
        """
        self.json_path = json_path
        self.missing_file_path = missing_file_path
        self.video_dir = video_dir
        self.metadata = self._load_json()
        self.missing_videos = self._load_missing_videos()
        self.video_loader = VideoLoader(log_dir=log_dir)
        # load_dataset reports worker failures through self.logger; share the
        # frame loader's logger so those messages land in the same log file.
        self.logger = self.video_loader.logger
        self.max_workers = max_workers
        self.batch_size = batch_size

    def _load_json(self) -> List[Dict[str, Any]]:
        """Return the parsed content of the metadata file."""
        with open(self.json_path, 'r') as f:
            return json.load(f)

    def _load_missing_videos(self) -> Set[str]:
        """Return the set of video IDs listed in the missing-video file.

        Blank lines are ignored so that an empty string never counts as an ID.
        """
        with open(self.missing_file_path, 'r') as f:
            return {line.strip() for line in f if line.strip()}

    def _get_video_path(self, video_id: str) -> str:
        """Return the expected ``.mp4`` path for ``video_id`` inside ``video_dir``."""
        return os.path.join(self.video_dir, f"{video_id}.mp4")

    def _load_frames_for_instance(self, instance: Dict[str, Any], gloss: str = None) -> Dict[str, Any]:
        """Load the frames of one instance and bundle them with its metadata.

        Args:
            instance: Instance dict from the metadata.
            gloss: Label to use when the instance itself has no ``'gloss'``
                key; ``'unknown'`` is used when this is ``None`` as well.

        Returns:
            A dict with ``gloss``, ``video_id``, ``bbox``, ``fps``,
            ``frame_start``, ``frame_end`` and ``frames``, or ``None`` when the
            video is listed as missing or its frames could not be loaded.
        """
        video_id = instance['video_id']
        if video_id in self.missing_videos:
            return None

        bbox = instance['bbox']
        frame_start = instance['frame_start']
        frame_end = instance['frame_end']
        frames = self.video_loader.load_frames(
            self._get_video_path(video_id), bbox, frame_start, frame_end
        )
        if frames is None:
            print(f"Warning: Unable to load frames for video {video_id}")
            return None

        fallback_gloss = 'unknown' if gloss is None else gloss
        return {
            'gloss': instance.get('gloss', fallback_gloss),
            'video_id': video_id,
            'bbox': bbox,
            'fps': instance['fps'],
            'frame_start': frame_start,
            'frame_end': frame_end,
            'frames': frames
        }

    def _run_batch(self, executor, batch) -> List[Dict[str, Any]]:
        """Decode one batch of ``(instance, gloss)`` pairs and return the successful results.

        Results are collected in submission order, so the returned list does
        not depend on which worker thread finishes first.
        """
        futures = [
            executor.submit(self._load_frames_for_instance, instance, gloss)
            for instance, gloss in batch
        ]
        loaded = []
        for future in futures:
            try:
                result = future.result()
            except Exception as e:
                self.logger.error(f"Error loading frames for instance: {e}")
                continue
            if result is not None:
                loaded.append(result)
        return loaded

    def load_dataset(self, limit=None) -> List[Dict[str, Any]]:
        """Load every available instance, optionally stopping after ``limit`` samples.

        Args:
            limit: Maximum number of samples to return; ``None`` (or 0) loads
                everything.

        Returns:
            Sample dicts in metadata order. At most ``limit`` samples are
            returned when a limit is given.
        """
        dataset = []
        batch = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for entry in self.metadata:
                # NOTE(review): WLASL-style metadata appears to keep the gloss on
                # the entry rather than on each instance; pass it down so samples
                # are not all labelled 'unknown' — confirm against METADATA.json.
                entry_gloss = entry.get('gloss')
                for instance in entry['instances']:
                    if limit and len(dataset) >= limit:
                        # A batch may push the count past the limit; trim it.
                        return dataset[:limit]
                    batch.append((instance, entry_gloss))
                    if len(batch) >= self.batch_size:
                        dataset.extend(self._run_batch(executor, batch))
                        batch = []
            if batch:
                dataset.extend(self._run_batch(executor, batch))
        return dataset[:limit] if limit else dataset

    def get_statistics(self) -> Dict[str, int]:
        """Return counts derived from the metadata and the missing-video list.

        Returns:
            ``total_videos``: number of instances in the metadata;
            ``loaded_videos``: instances whose ID is not in the missing list;
            ``missing_videos``: number of IDs in the missing list.
        """
        video_ids = [
            instance['video_id']
            for entry in self.metadata
            for instance in entry['instances']
        ]
        available = sum(1 for video_id in video_ids if video_id not in self.missing_videos)
        return {
            'total_videos': len(video_ids),
            'loaded_videos': available,
            'missing_videos': len(self.missing_videos)
        }

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

class SignLanguageDataset(Dataset):
    """Torch dataset that turns loaded clips into ``(C, T, H, W)`` float tensors.

    Each element of ``data`` is a dict with at least ``'gloss'`` (the class
    name) and ``'frames'`` (a sequence of frames indexed as ``(T, H, W, C)``).
    """

    def __init__(self, data, transform=None, label_encoder=None, num_frames=None):
        """Store the samples and encode their glosses as integer labels.

        Args:
            data: List of sample dicts.
            transform: Optional callable applied to every frame individually.
            label_encoder: Optional already-fitted encoder exposing
                ``transform``. Pass the same encoder to every split so that a
                gloss maps to the same integer everywhere; when omitted a new
                ``LabelEncoder`` is fitted on ``data`` alone.
            num_frames: Optional fixed clip length. When given, that many
                frames are sampled at evenly spaced positions, which makes
                clips of different lengths stackable into one batch.
        """
        self.data = data
        self.transform = transform
        self.num_frames = num_frames
        glosses = [item['gloss'] for item in data]
        if label_encoder is None:
            self.label_encoder = LabelEncoder()
            self.labels = self.label_encoder.fit_transform(glosses)
        else:
            self.label_encoder = label_encoder
            self.labels = label_encoder.transform(glosses)

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(clip, label)`` for sample ``idx``.

        Returns:
            ``clip``: float tensor laid out as ``(C, T, H, W)``;
            ``label``: 0-dim long tensor holding the encoded gloss.

        Raises:
            ValueError: If the sample contains no frames.
        """
        item = self.data[idx]
        frames = item['frames']
        label = self.labels[idx]

        if len(frames) == 0:
            raise ValueError(f"Sample {idx} contains no frames")

        if self.num_frames is not None:
            # Evenly spaced indices; repeats frames when the clip is shorter.
            picks = np.linspace(0, len(frames) - 1, self.num_frames).round().astype(int)
            frames = [frames[i] for i in picks]

        if self.transform:
            # Image transforms such as ToPILImage/Resize/ToTensor operate on a
            # single image, so the pipeline is applied frame by frame rather
            # than to the whole 4-D clip.
            frames = [self.transform(frame) for frame in frames]

        if isinstance(frames[0], torch.Tensor):
            # Tensor frames are taken to be (C, H, W), as produced by ToTensor:
            # stacking gives (T, C, H, W), which is rearranged to (C, T, H, W).
            clip = torch.stack(list(frames)).permute(1, 0, 2, 3).contiguous()
        else:
            # Array frames are taken to be (H, W, C): (T, H, W, C) -> (C, T, H, W).
            stacked = np.transpose(np.asarray(frames), (3, 0, 1, 2))
            clip = torch.from_numpy(np.ascontiguousarray(stacked))

        return clip.float(), torch.tensor(int(label), dtype=torch.long)

class Sign3DCNN(nn.Module):
    """Four-stage 3-D convolutional classifier for clips shaped ``(N, C, T, H, W)``.

    Each stage is a 3x3x3 convolution followed by ReLU and max pooling. The
    first pooling halves only the two spatial axes; the remaining three halve
    time as well. Global average pooling feeds a two-layer classifier head.
    """

    def __init__(self, num_classes, input_channels=3):
        """Build the layers.

        Args:
            num_classes: Size of the output logit vector.
            input_channels: Number of channels of the input clip.
        """
        super().__init__()
        # Convolution / pooling stages (channel widths 64 -> 128 -> 256 -> 256).
        self.conv1 = nn.Conv3d(input_channels, 64, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))
        self.conv2 = nn.Conv3d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv3d(128, 256, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv3d(256, 256, kernel_size=3, padding=1)
        self.pool4 = nn.MaxPool3d(kernel_size=2, stride=2)

        # Classifier head.
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc1 = nn.Linear(256, 512)
        self.fc2 = nn.Linear(512, num_classes)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class logits of shape ``(N, num_classes)`` for the input batch."""
        stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
            (self.conv4, self.pool4),
        )
        for conv, pool in stages:
            x = pool(self.relu(conv(x)))

        # Collapse the pooled (N, 256, 1, 1, 1) volume to (N, 256).
        features = self.avgpool(x).flatten(1)
        hidden = self.dropout(self.relu(self.fc1(features)))
        return self.fc2(hidden)

def train_model(model, train_loader, val_loader, num_epochs=50, learning_rate=0.001):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    The learning rate is multiplied by 0.1 every 20 epochs (``StepLR``). The
    per-epoch loss and accuracy of both loaders are printed.

    Args:
        model: Network mapping an input batch to class logits.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        num_epochs: Number of passes over the training data.
        learning_rate: Initial Adam learning rate.

    Returns:
        The model after training, placed on CUDA when available, else on CPU.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

    def run_pass(loader, training):
        """Run one pass over ``loader``; return (mean loss, accuracy) per sample."""
        loss_total = 0.0
        hits = 0
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            if training:
                optimizer.zero_grad()
            logits = model(batch_inputs)
            batch_loss = criterion(logits, batch_labels)
            if training:
                batch_loss.backward()
                optimizer.step()
            # The criterion averages over the batch; weight by batch size so the
            # final division by the dataset size gives a per-sample mean.
            loss_total += batch_loss.item() * batch_inputs.size(0)
            guesses = logits.max(dim=1).indices
            hits += (guesses == batch_labels).sum().item()
        sample_count = len(loader.dataset)
        return loss_total / sample_count, hits / sample_count

    for epoch in range(num_epochs):
        model.train()
        train_loss, train_acc = run_pass(train_loader, training=True)

        model.eval()
        with torch.no_grad():
            val_loss, val_acc = run_pass(val_loader, training=False)

        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        scheduler.step()

    return model

In [6]:
# Run configuration.
# The data and log locations default to the original author's machine; set the
# WLASL_DATA_DIR / WLASL_LOG_DIR environment variables to run elsewhere
# without editing the notebook.
data_dir = os.environ.get('WLASL_DATA_DIR', '/Users/dxt/Documents/ObjectDetection/WordForSign/data')
dataset_metadata = os.path.join(data_dir, 'METADATA.json')    # JSON metadata file
missing_metadata = os.path.join(data_dir, 'missingInfo.txt')  # one unavailable video ID per line
videos_dataset = os.path.join(data_dir, 'videos')             # directory of <video_id>.mp4 files
max_workers = 4        # threads used to decode videos
batch_size = 4         # instances submitted to the thread pool at a time
num_epochs = 2
learning_rate = 0.001
num_classes = 100      # placeholder; reassigned from the loaded data before the model is built
log_dir = os.environ.get('WLASL_LOG_DIR', '/Users/dxt/Desktop/beta_/logs')

In [7]:
# Load every available clip described by the metadata.
dataset_loader = WLASLDatasetLoader(
    json_path=dataset_metadata,
    missing_file_path=missing_metadata,
    video_dir=videos_dataset,
    max_workers=max_workers,
    batch_size=batch_size,
    log_dir=log_dir
)
full_dataset = dataset_loader.load_dataset()
if not full_dataset:
    # Fail here with a clear message instead of deep inside train_test_split.
    raise RuntimeError("No video clips could be loaded; check the dataset paths and the log file.")

train_data, val_data = train_test_split(full_dataset, test_size=0.2, random_state=42)

# NOTE(review): ToPILImage accepts a single 2-D/3-D image, so this pipeline has
# to be applied to each frame of a clip separately by the dataset.
transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((112, 112)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

train_dataset = SignLanguageDataset(train_data, transform=transform)
val_dataset = SignLanguageDataset(val_data, transform=transform)

# Encode glosses once over the whole dataset and give both splits the same
# mapping. Fitting one encoder per split would assign different integers to
# the same gloss, making validation labels incomparable to training labels.
label_encoder = LabelEncoder().fit([item['gloss'] for item in full_dataset])
for split_dataset in (train_dataset, val_dataset):
    split_dataset.label_encoder = label_encoder
    split_dataset.labels = label_encoder.transform([item['gloss'] for item in split_dataset.data])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=4)

# Initialize and train the model
num_classes = len(label_encoder.classes_)
model = Sign3DCNN(num_classes)
# Pass the configured hyperparameters; otherwise train_model silently falls
# back to its own defaults (50 epochs).
trained_model = train_model(
    model,
    train_loader,
    val_loader,
    num_epochs=num_epochs,
    learning_rate=learning_rate,
)

# Save the trained model
torch.save(trained_model.state_dict(), 'sign_language_3dcnn.pth')

print("Model training completed and saved.")

: 