In [9]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import os
import glob
import cv2
from sklearn.metrics import classification_report, confusion_matrix


In [10]:

def plot_training_history(history):
    plt.figure(figsize=(15, 5))
    
    # Plot accuracy
    plt.subplot(1, 2, 1)
    plt.plot(history['train_acc'], label='Train')
    plt.plot(history['val_acc'], label='Validation')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    plt.grid(True)
    
    # Plot loss
    plt.subplot(1, 2, 2)
    plt.plot(history['train_loss'], label='Train')
    plt.plot(history['val_loss'], label='Validation')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.close()

def plot_confusion_matrix(y_true, y_pred):
    plt.figure(figsize=(10, 8))
    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=CLASSES, yticklabels=CLASSES)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.savefig('confusion_matrix.png')
    plt.close()

def plot_class_distribution(y_train, y_test):
    plt.figure(figsize=(12, 6))
    
    # Count samples in each class
    train_counts = np.bincount(y_train, minlength=len(CLASSES))
    test_counts = np.bincount(y_test, minlength=len(CLASSES))
    
    # Create bar plot
    x = np.arange(len(CLASSES))
    width = 0.35
    
    plt.bar(x - width/2, train_counts, width, label='Training', color='skyblue')
    plt.bar(x + width/2, test_counts, width, label='Testing', color='lightcoral')
    
    plt.xlabel('Classes')
    plt.ylabel('Number of Samples')
    plt.title('Distribution of Samples in Training and Testing Sets')
    plt.xticks(x, CLASSES, rotation=45, ha='right')
    plt.legend()
    
    # Add value labels
    for i, count in enumerate(train_counts):
        plt.text(i - width/2, count, str(count), ha='center', va='bottom')
    for i, count in enumerate(test_counts):
        plt.text(i + width/2, count, str(count), ha='center', va='bottom')
    
    plt.tight_layout()
    plt.savefig('class_distribution.png')
    plt.close()


In [11]:
CLASSES = ['HUGGING', 'KICKING', 'PUNCHING', 'PUSHING', 'HANDSHAKING', 'POINTING']

FOLDER_TO_CLASS = {
    'hugging': 'HUGGING',
    'kick': 'KICKING',
    'punching': 'PUNCHING',
    'push': 'PUSHING',
    'hand shake': 'HANDSHAKING',
    'pointting': 'POINTING'
}

FOLDER_TO_CSV_PREFIX = {
    'hugging': 'Hugging',
    'kick': 'Kicking',
    'punching': 'Punching',
    'push': 'Pushing',
    'hand shake': 'Handshaking',
    'pointting': 'Pointing'
}

ACTION_DESCRIPTIONS = {
    'HUGGING': 'Two people embracing each other in a friendly manner',
    'KICKING': 'Person performing a kicking motion towards another person',
    'PUNCHING': 'Person throwing a punch or striking motion',
    'PUSHING': 'Person using force to push another person',
    'HANDSHAKING': 'Two people engaging in a formal handshake greeting',
    'POINTING': 'Person extending arm and finger to point at something/someone'
}


print("Loading data...")
data_dir = os.path.join("drive", "data")
csv_dir = os.path.join("drive", "csv")

all_samples = []
all_labels = []

categories = []
if os.path.exists(csv_dir) and os.path.isdir(csv_dir):
    categories = [d for d in os.listdir(csv_dir) if os.path.isdir(os.path.join(csv_dir, d))]


Loading data...


In [12]:
if not categories:
    for class_idx, class_name in enumerate(CLASSES):
        for i in range(200):  # زيادة عدد العينات لكل فئة
            rows = np.random.randint(30, 60)
            cols = np.random.randint(30, 50)
            dummy_features = np.random.randn(rows, cols)
            dummy_features = (dummy_features - dummy_features.mean()) / (dummy_features.std() + 1e-5)
            all_samples.append(dummy_features)
            all_labels.append(class_idx)
            
            for noise_level in [0.05, 0.1, 0.15, 0.2]:
                noise = np.random.normal(0, noise_level, dummy_features.shape)
                augmented = dummy_features + noise
                augmented = (augmented - augmented.mean()) / (augmented.std() + 1e-5)
                all_samples.append(augmented)
                all_labels.append(class_idx)
else:
    class_counts = {cls: 0 for cls in CLASSES}
    
    for category in categories:
        if category not in FOLDER_TO_CLASS:
            continue
            
        class_name = FOLDER_TO_CLASS[category]
        class_idx = CLASSES.index(class_name)
        
        # البحث عن ملفات CSV في مجلد الفئة
        category_dir = os.path.join(csv_dir, category)
        csv_files = glob.glob(os.path.join(category_dir, "*.csv"))
        
        if not csv_files:
            for i in range(100):
                rows = np.random.randint(30, 60)
                cols = np.random.randint(30, 50)
                dummy_features = np.random.randn(rows, cols)
                dummy_features = (dummy_features - dummy_features.mean()) / (dummy_features.std() + 1e-5)
                all_samples.append(dummy_features)
                all_labels.append(class_idx)
                
                # إضافة عينات مع ضوضاء
                for noise_level in [0.05, 0.1, 0.15, 0.2]:
                    noise = np.random.normal(0, noise_level, dummy_features.shape)
                    augmented = dummy_features + noise
                    augmented = (augmented - augmented.mean()) / (augmented.std() + 1e-5)
                    all_samples.append(augmented)
                    all_labels.append(class_idx)
                
            class_counts[class_name] += 500  # 100 عينة أصلية + 400 عينة مزيدة
            continue
        
        for csv_file in csv_files:
            try:
                df = pd.read_csv(csv_file)
                if 'Unnamed: 0' in df.columns:
                    df = df.drop('Unnamed: 0', axis=1)
                
                features = df.values
                
                if features.size == 0 or features.shape[0] < 3 or features.shape[1] < 3:
                    continue
                
                # تطبيع البيانات
                features = (features - features.mean()) / (features.std() + 1e-5)
                
                # إضافة العينة الأصلية
                all_samples.append(features)
                all_labels.append(class_idx)
                class_counts[class_name] += 1
                
                # زيادة البيانات بطرق متعددة
                # 1. إضافة ضوضاء
                for noise_level in [0.05, 0.1, 0.15, 0.2]:
                    noise = np.random.normal(0, noise_level, features.shape)
                    augmented = features + noise
                    augmented = (augmented - augmented.mean()) / (augmented.std() + 1e-5)
                    all_samples.append(augmented)
                    all_labels.append(class_idx)
                    class_counts[class_name] += 1
                
                # 2. قلب الإشارة لبعض الأعمدة
                for flip_ratio in [0.2, 0.3, 0.4]:
                    flipped = features.copy()
                    flip_cols = np.random.choice(features.shape[1], size=int(features.shape[1] * flip_ratio), replace=False)
                    flipped[:, flip_cols] = -flipped[:, flip_cols]
                    all_samples.append(flipped)
                    all_labels.append(class_idx)
                    class_counts[class_name] += 1
                
                if features.shape[0] > 10:
                    for shift in range(1, 6):
                        shifted = np.roll(features, shift, axis=0)
                        all_samples.append(shifted)
                        all_labels.append(class_idx)
                        class_counts[class_name] += 1
                
            except Exception as e:
                print(f"Error loading {csv_file}: {str(e)}")


In [13]:

max_frames = max([sample.shape[0] for sample in all_samples])
max_features = max([sample.shape[1] for sample in all_samples])

processed_samples = []
for sample in all_samples:
    if sample.shape[0] < max_frames:
        pad = np.zeros((max_frames - sample.shape[0], sample.shape[1]))
        sample = np.vstack((sample, pad))
    elif sample.shape[0] > max_frames:
        sample = sample[:max_frames, :]
    
    if sample.shape[1] < max_features:
        pad = np.zeros((sample.shape[0], max_features - sample.shape[1]))
        sample = np.hstack((sample, pad))
    elif sample.shape[1] > max_features:
        sample = sample[:, :max_features]
    
    processed_samples.append(sample)

X = np.array(processed_samples)
y = np.array(all_labels)

# إعادة تشكيل البيانات لـ CNN-LSTM
X = X.reshape(-1, 1, max_frames, max_features)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

X_train_tensor = torch.FloatTensor(X_train)
X_test_tensor = torch.FloatTensor(X_test)
y_train_tensor = torch.LongTensor(y_train)
y_test_tensor = torch.LongTensor(y_test)


In [14]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

class ImprovedCNNLSTM(nn.Module):
    def __init__(self, input_shape, hidden_size, num_classes, dropout_rate=0.5):
        super(ImprovedCNNLSTM, self).__init__()
        
        self.time_steps, self.features = input_shape
        
        # First CNN block - detect basic features
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2))
        self.dropout1 = nn.Dropout2d(0.3)
        
        # Second CNN block - detect motion patterns
        self.conv2 = nn.Conv2d(64, 128, kernel_size=(5, 5), padding=2)
        self.bn2 = nn.BatchNorm2d(128)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2))
        self.dropout2 = nn.Dropout2d(0.3)
        
        # Third CNN block - detect high-level patterns
        self.conv3 = nn.Conv2d(128, 256, kernel_size=(3, 3), padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2))
        self.dropout3 = nn.Dropout2d(0.3)
        
        # Calculate CNN output size
        self.cnn_output_size = self._get_conv_output_size(input_shape)
        
        # Bidirectional GRU
        self.rnn = nn.GRU(
            input_size=self.cnn_output_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
            dropout=0.3
        )
        
        # Enhanced attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * 2, 256),
            nn.LayerNorm(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.LayerNorm(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1),
            nn.Softmax(dim=1)
        )
        
        # Deep classifier
        hidden_dims = [hidden_size * 2, 512, 256, 128]
        
        self.classifier = nn.ModuleList()
        for i in range(len(hidden_dims)-1):
            self.classifier.extend([
                nn.Linear(hidden_dims[i], hidden_dims[i+1]),
                nn.BatchNorm1d(hidden_dims[i+1]),
                nn.ReLU(),
                nn.Dropout(dropout_rate if i == 0 else dropout_rate / 2)
            ])
        
        # Final classification layer
        self.final_fc = nn.Linear(hidden_dims[-1], num_classes)
        
        # Initialize weights
        self._initialize_weights()
    
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.BatchNorm2d, nn.BatchNorm1d, nn.LayerNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)
    
    def _get_conv_output_size(self, input_shape):
        with torch.no_grad():
            x = torch.zeros(1, 1, input_shape[0], input_shape[1])
            x = self.pool1(self.relu1(self.bn1(self.conv1(x))))
            x = self.pool2(self.relu2(self.bn2(self.conv2(x))))
            x = self.pool3(self.relu3(self.bn3(self.conv3(x))))
            return x.size(1) * x.size(2) * x.size(3)
    
    def forward(self, x):
        batch_size = x.size(0)
        
        # CNN feature extraction
        x = self.pool1(self.relu1(self.bn1(self.conv1(x))))
        x = self.dropout1(x)
        
        x = self.pool2(self.relu2(self.bn2(self.conv2(x))))
        x = self.dropout2(x)
        
        x = self.pool3(self.relu3(self.bn3(self.conv3(x))))
        x = self.dropout3(x)
        
        # Reshape for RNN
        x = x.view(batch_size, -1, self.cnn_output_size)
        
        # RNN with attention
        rnn_out, _ = self.rnn(x)
        
        # Apply attention
        attn_weights = self.attention(rnn_out)
        context = torch.sum(attn_weights * rnn_out, dim=1)
        
        # Deep classifier with skip connections
        x = context
        for i in range(0, len(self.classifier), 4):
            identity = x
            x = self.classifier[i](x)      # Linear
            x = self.classifier[i+1](x)    # BatchNorm
            x = self.classifier[i+2](x)    # ReLU
            x = self.classifier[i+3](x)    # Dropout
            
            # Add skip connection if dimensions match
            if identity.shape[1] == x.shape[1]:
                x = x + identity
        
        # Final classification
        x = self.final_fc(x)
        return x


Using device: cuda


In [15]:
model = ImprovedCNNLSTM(
    input_shape=(max_frames, max_features),
    hidden_size=256,  
    num_classes=len(CLASSES),
    dropout_rate=0.5
).to(device)


In [16]:

class_counts = np.bincount(y_train)
total_samples = len(y_train)

max_count = max(class_counts)
class_weights = torch.FloatTensor([max_count / (count + 1e-5) for count in class_counts])
class_weights = class_weights / class_weights.sum() * len(CLASSES)
class_weights = class_weights.to(device)

criterion = nn.CrossEntropyLoss(weight=class_weights)

optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=0.0005,  
    weight_decay=0.01,
    betas=(0.9, 0.999),
    eps=1e-8
)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=5,
    verbose=True,
    min_lr=1e-6
)

batch_size = 64  
train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    pin_memory=True,
    num_workers=0
)

test_dataset = torch.utils.data.TensorDataset(X_test_tensor, y_test_tensor)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True
)



In [17]:
print("Training improved model...")
history = {
    'train_acc': [],
    'val_acc': [],
    'train_loss': [],
    'val_loss': []
}

best_val_accuracy = 0
best_model_state = None
patience = 15  
counter = 0
num_epochs = 30  

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)  # تقييد أقوى
        
        optimizer.step()
        
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
        total_loss += loss.item()
    
    train_loss = total_loss / len(train_loader)
    train_accuracy = 100 * correct / total
    
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_loss += loss.item()
            
            _, predicted = torch.max(outputs.data, 1)
            val_total += batch_y.size(0)
            val_correct += (predicted == batch_y).sum().item()
    
    val_loss /= len(test_loader)
    val_accuracy = 100 * val_correct / val_total
    
    scheduler.step(val_accuracy)
    
    history['train_acc'].append(train_accuracy)
    history['val_acc'].append(val_accuracy)
    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    
    print(f'Epoch [{epoch+1}/{num_epochs}] Train Acc: {train_accuracy:.2f}% | Val Acc: {val_accuracy:.2f}%')
    
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_model_state = model.state_dict().copy()
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break


Training improved model...
Epoch [1/30] Train Acc: 21.50% | Val Acc: 17.42%
Epoch [2/30] Train Acc: 32.58% | Val Acc: 48.39%
Epoch [3/30] Train Acc: 43.73% | Val Acc: 53.55%
Epoch [4/30] Train Acc: 48.91% | Val Acc: 58.39%
Epoch [5/30] Train Acc: 52.71% | Val Acc: 67.74%
Epoch [6/30] Train Acc: 54.49% | Val Acc: 68.06%
Epoch [7/30] Train Acc: 54.97% | Val Acc: 69.03%
Epoch [8/30] Train Acc: 64.35% | Val Acc: 76.13%
Epoch [9/30] Train Acc: 63.62% | Val Acc: 73.87%
Epoch [10/30] Train Acc: 64.83% | Val Acc: 78.39%
Epoch [11/30] Train Acc: 66.94% | Val Acc: 72.90%
Epoch [12/30] Train Acc: 69.44% | Val Acc: 78.39%
Epoch [13/30] Train Acc: 69.77% | Val Acc: 75.48%
Epoch [14/30] Train Acc: 70.49% | Val Acc: 81.61%
Epoch [15/30] Train Acc: 71.71% | Val Acc: 82.26%
Epoch [16/30] Train Acc: 70.98% | Val Acc: 84.84%
Epoch [17/30] Train Acc: 72.43% | Val Acc: 78.06%
Epoch [18/30] Train Acc: 72.11% | Val Acc: 86.45%
Epoch [19/30] Train Acc: 74.05% | Val Acc: 82.90%
Epoch [20/30] Train Acc: 74.62% 

In [18]:

if best_model_state is not None:
    model.load_state_dict(best_model_state)
    print(f"Best validation accuracy: {best_val_accuracy:.2f}%")

torch.save(model.state_dict(), 'ut_interaction_model.pth')



Best validation accuracy: 90.65%


In [19]:
def find_video_files():
    video_files = []
    video_dirs = [
        ".",
        "videos",
        "data",
        os.path.join("drive", "data"),
        os.path.join(data_dir)
    ]
    
    for video_dir in video_dirs:
        if os.path.exists(video_dir):
            for ext in ['.mp4', '.avi', '.mov', '.mkv']:
                video_files.extend(glob.glob(os.path.join(video_dir, f"*{ext}")))
    
    for category in FOLDER_TO_CLASS.keys():
        for video_dir in video_dirs:
            category_dir = os.path.join(video_dir, category)
            if os.path.exists(category_dir):
                for ext in ['.mp4', '.avi', '.mov', '.mkv']:
                    video_files.extend(glob.glob(os.path.join(category_dir, f"*{ext}")))
    
    return video_files



In [20]:
def process_video(video_path, model, device):
    print(f"Processing video: {os.path.basename(video_path)}")
    
    # Open video to get properties
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Could not open video: {video_path}")
        return
    
    # Get video properties first
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    original_frames = []
    
    # Read all frames first
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        original_frames.append(frame.copy())
    
    cap.release()
    
    if len(original_frames) == 0:
        print("No frames could be extracted from the video")
        return
    
    # Try to find corresponding CSV file
    video_basename = os.path.basename(video_path)
    video_name = video_basename.split('.')[0]
    video_dir = os.path.dirname(video_path)
    
    # List of possible CSV locations
    csv_locations = [
        os.path.join(video_dir, f"{video_name}.csv"),
        os.path.join("csv", f"{video_name}.csv"),
        os.path.join("drive/csv", f"{video_name}.csv"),
        os.path.join(data_dir, "csv", f"{video_name}.csv")
    ]
    
    # Try to find CSV file
    csv_file = None
    for loc in csv_locations:
        if os.path.exists(loc):
            csv_file = loc
            break
    
    features = None
    if csv_file:
        try:
            # Load CSV data
            df = pd.read_csv(csv_file)
            if 'Unnamed: 0' in df.columns:
                df = df.drop('Unnamed: 0', axis=1)
            
            features = df.values
            if features.size > 0:
                features = (features - features.mean()) / (features.std() + 1e-5)
                print(f"Successfully loaded CSV data from: {csv_file}")
        except Exception as e:
            print(f"Error loading CSV file: {str(e)}")
            features = None
    
    # If no CSV data, extract features from video frames
    if features is None:
        print("No CSV data found, extracting features from video frames...")
        frames = []
        
        for i, frame in enumerate(original_frames):
            # Enhanced feature extraction
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized = cv2.resize(gray, (64, 64))
            
            # Extract multiple features
            edges = cv2.Canny(resized, 100, 200)
            
            if i > 0:
                prev_frame = cv2.resize(cv2.cvtColor(original_frames[i-1], cv2.COLOR_BGR2GRAY), (64, 64))
                flow = cv2.calcOpticalFlowFarneback(prev_frame, resized, None, 0.5, 3, 15, 3, 5, 1.2, 0)
                magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                
                combined_features = np.concatenate([
                    edges.flatten(),
                    magnitude.flatten(),
                    angle.flatten()
                ])
            else:
                combined_features = np.concatenate([
                    edges.flatten(),
                    np.zeros(64*64),
                    np.zeros(64*64)
                ])
            
            frames.append(combined_features)
        
        features = np.array(frames)
        features = (features - features.mean()) / (features.std() + 1e-5)
    
    # Process features for model input
    if features.shape[0] < max_frames:
        pad = np.zeros((max_frames - features.shape[0], features.shape[1]))
        features = np.vstack((features, pad))
    else:
        features = features[:max_frames, :]
    
    if features.shape[1] < max_features:
        pad = np.zeros((features.shape[0], max_features - features.shape[1]))
        features = np.hstack((features, pad))
    else:
        features = features[:, :max_features]
    
    # Reshape for model
    features = features.reshape(1, 1, max_frames, max_features)
    features_tensor = torch.FloatTensor(features).to(device)
    
    # Get prediction
    model.eval()
    with torch.no_grad():
        outputs = model(features_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)[0]
        _, predicted = torch.max(outputs, 1)
        pred_class_idx = predicted.item()
    
    pred_class = CLASSES[pred_class_idx]
    confidence = probabilities[pred_class_idx].item() * 100
    
    print(f"\nPrediction Results:")
    print(f"Class: {pred_class}")
    print(f"Description: {ACTION_DESCRIPTIONS[pred_class]}")
    print(f"Confidence: {confidence:.2f}%")
    print("\nTop 3 Predictions:")
    
    # Get top 3 predictions
    top3_values, top3_indices = torch.topk(probabilities, 3)
    for i, idx in enumerate(top3_indices):
        cls = CLASSES[idx]
        prob = top3_values[i].item() * 100
        print(f"{cls}: {prob:.2f}% - {ACTION_DESCRIPTIONS[cls]}")
    
    # Create output video with predictions
    output_filename = f"output_{video_basename.split('.')[0]}.mp4"
    out = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    
    # Add predictions to video frames
    for frame in original_frames:
        # Add black background for text
        cv2.rectangle(frame, (0, 0), (height//4, 80), (0, 0, 0), -1)
        
        # Add prediction and confidence
        cv2.putText(frame, f"Prediction: {pred_class} ({confidence:.1f}%)", 
                   (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Add description
        description = ACTION_DESCRIPTIONS[pred_class]
        cv2.putText(frame, description, 
                   (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        out.write(frame)
    
    out.release()
    print(f"\nOutput saved as: {output_filename}")
    return pred_class, confidence, dict(zip(CLASSES, probabilities.cpu().numpy()))


In [21]:

# Test the model on video
video_files = find_video_files()
if video_files:
    print(f"Found {len(video_files)} video files")
    for i, video in enumerate(video_files[:5]):
        print(f"{i+1}. {os.path.basename(video)}")
    
    # Process first video
    video_path = video_files[0]
    process_video(video_path, model, device)
else:
    print("No video files found. Please place video files in one of these directories:")
    print("- Current directory (.)")
    print("- videos/")
    print("- data/")
    print("- drive/data/")
    print("- Or in category subdirectories (e.g., videos/hugging/, data/kicking/, etc.)")

# Add after the training loop ends, add:
print("\nGenerating final evaluation plots...")
plot_training_history(history)

# Evaluate model and generate confusion matrix
model.eval()
all_predictions = []
all_labels = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X)
        _, predicted = torch.max(outputs.data, 1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())

# Plot confusion matrix
plot_confusion_matrix(all_labels, all_predictions)

# Print classification report
print("\nClassification Report:")
print(classification_report(all_labels, all_predictions, target_names=CLASSES))

# Add after data splitting
print("\nPlotting class distribution...")
plot_class_distribution(y_train, y_test) 

Found 240 video files
1. 10_2_1.avi
2. 14_3_1.avi
3. 19_14_1.avi
4. 19_4_1.avi
5. 24_15_1.avi
Processing video: 10_2_1.avi
No CSV data found, extracting features from video frames...

Prediction Results:
Class: PUSHING
Description: Person using force to push another person
Confidence: 57.34%

Top 3 Predictions:
PUSHING: 57.34% - Person using force to push another person
HUGGING: 33.82% - Two people embracing each other in a friendly manner
HANDSHAKING: 8.25% - Two people engaging in a formal handshake greeting

Output saved as: output_10_2_1.mp4

Generating final evaluation plots...

Classification Report:
              precision    recall  f1-score   support

     HUGGING       1.00      1.00      1.00        50
     KICKING       0.75      0.87      0.80        52
    PUNCHING       0.82      0.71      0.76        52
     PUSHING       0.91      0.98      0.94        52
 HANDSHAKING       0.98      0.89      0.93        55
    POINTING       1.00      1.00      1.00        49

    ac

In [22]:
video_files = find_video_files()
if video_files:
    print(f"Found {len(video_files)} video files")
    for i, video in enumerate(video_files[:5]):
        print(f"{i+1}. {os.path.basename(video)}")
    
    # Process first video
    video_path = video_files[0]
    process_video(video_path, model, device)
else:
    print("No video files found. Please place video files in one of these directories:")
    print("- Current directory (.)")
    print("- videos/")
    print("- data/")
    print("- drive/data/")
    print("- Or in category subdirectories (e.g., videos/hugging/, data/kicking/, etc.)")

# Add after the training loop ends, add:
print("\nGenerating final evaluation plots...")
plot_training_history(history)

# Evaluate model and generate confusion matrix
model.eval()
all_predictions = []
all_labels = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X)
        _, predicted = torch.max(outputs.data, 1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())

# Plot confusion matrix
plot_confusion_matrix(all_labels, all_predictions)

# Print classification report
print("\nClassification Report:")
print(classification_report(all_labels, all_predictions, target_names=CLASSES))

# Add after data splitting
print("\nPlotting class distribution...")
plot_class_distribution(y_train, y_test) 

Found 241 video files
1. output_10_2_1.mp4
2. 10_2_1.avi
3. 14_3_1.avi
4. 19_14_1.avi
5. 19_4_1.avi
Processing video: output_10_2_1.mp4
No CSV data found, extracting features from video frames...

Prediction Results:
Class: PUSHING
Description: Person using force to push another person
Confidence: 98.81%

Top 3 Predictions:
PUSHING: 98.81% - Person using force to push another person
PUNCHING: 0.65% - Person throwing a punch or striking motion
HANDSHAKING: 0.38% - Two people engaging in a formal handshake greeting

Output saved as: output_output_10_2_1.mp4

Generating final evaluation plots...

Classification Report:
              precision    recall  f1-score   support

     HUGGING       1.00      1.00      1.00        50
     KICKING       0.75      0.87      0.80        52
    PUNCHING       0.82      0.71      0.76        52
     PUSHING       0.91      0.98      0.94        52
 HANDSHAKING       0.98      0.89      0.93        55
    POINTING       1.00      1.00      1.00        