In [2]:
import cv2
import os
import shutil

def extract_frames(video_path, output_dir, fps=5):
    """
    Extract frames from a video and save them as JPEG images.

    The output directory is wiped and recreated on every call, so it only ever
    holds the frames of the most recent extraction.

    Args:
    - video_path (str): Path to the video file.
    - output_dir (str): Directory to save the frames (deleted first if it exists).
    - fps (int): Approximate number of frames per second to extract.

    Returns:
    - int: Number of frames written to output_dir (0 if the video cannot be opened).

    Raises:
    - ValueError: If fps is not positive.
    """
    # Validate before touching the filesystem so a bad argument deletes nothing.
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    # Start from a clean directory so frames of an earlier run never mix in.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    cap = cv2.VideoCapture(video_path)
    saved_count = 0
    try:
        if not cap.isOpened():
            print(f"Could not open video: {video_path}")
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Keep every `interval`-th frame. Clamp to 1 so that a requested fps above
        # the source fps (or a missing/zero fps reading) never yields `% 0`.
        interval = max(1, int(video_fps / fps)) if video_fps > 0 else 1

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % interval == 0:
                frame_filename = os.path.join(output_dir, f"frame_{frame_count}.jpg")
                # imwrite reports failure through its return value, not an exception.
                if cv2.imwrite(frame_filename, frame):
                    saved_count += 1
            frame_count += 1
    finally:
        # Always free the decoder handle, even if reading or writing fails.
        cap.release()

    return saved_count

# Example usage
# extract_frames('path_to_video.mp4', 'output_frames/', fps=5)


In [3]:
def find_videos_and_run_extract_frames(base_path, label_map):
    """
    Run extract_frames on every .avi file found under each class folder.

    Expected layout: <base_path>/<class_name>/<subfolder>/<video>.avi. Frames are
    written to <base_path>/<class_name>/<subfolder>/output_frames. Because
    extract_frames clears its output directory, a subfolder holding several
    .avi files ends up with the frames of the last one processed only.

    Args:
    - base_path (str): Root directory containing one folder per class.
    - label_map (dict): Maps class index -> class folder name; only the names are used.
    """
    for class_name in label_map.values():
        class_folder = os.path.join(base_path, class_name)
        print(f"Checking folder: {class_folder}\n")  # Print the folder being checked

        if not os.path.isdir(class_folder):
            print(f"Folder does not exist: {class_folder}\n")  # Print if the folder doesn't exist
            continue

        # Sorted so the processing order (and the printed log) is reproducible.
        for subfolder in sorted(os.listdir(class_folder)):
            subfolder_path = os.path.join(class_folder, subfolder)
            # Skip stray files such as .DS_Store; listing them would raise.
            if not os.path.isdir(subfolder_path):
                continue

            for file_name in sorted(os.listdir(subfolder_path)):
                video_path = os.path.join(subfolder_path, file_name)
                # Match on the file extension, not on a substring of the whole path.
                if not file_name.lower().endswith('.avi') or not os.path.isfile(video_path):
                    continue
                print(video_path)
                extract_frames(video_path, os.path.join(subfolder_path, 'output_frames'), fps=9)

# Class index -> class folder name; each name is looked up as a folder under base_path
label_map = dict(enumerate([
    "Diving-Side",
    "Golf-Swing-Back",
    "Golf-Swing-Front",
    "Golf-Swing-Side",
    "Kicking-Front",
    "Kicking-Side",
    "Lifting",
    "Riding-Horse",
    "Run-Side",
    "SkateBoarding-Front",
    "Swing-Bench",
    "Walk-Front",
]))

# Root directory that holds one folder per class
base_path = "./ucf_sports_actions/ucf action"

# Walk the class folders and run frame extraction on every .avi found
find_videos_and_run_extract_frames(base_path, label_map)

Checking folder: ./ucf_sports_actions/ucf action/Diving-Side

./ucf_sports_actions/ucf action/Diving-Side/007/4475-6_70099.avi
./ucf_sports_actions/ucf action/Diving-Side/001/2538-5_70133.avi
./ucf_sports_actions/ucf action/Diving-Side/006/4475-2_70045.avi
./ucf_sports_actions/ucf action/Diving-Side/003/2538-12_70246.avi
./ucf_sports_actions/ucf action/Diving-Side/004/2538-16_70032.avi
./ucf_sports_actions/ucf action/Diving-Side/005/4475-1_70541.avi
./ucf_sports_actions/ucf action/Diving-Side/002/2538-11_70015.avi
Checking folder: ./ucf_sports_actions/ucf action/Golf-Swing-Back

./ucf_sports_actions/ucf action/Golf-Swing-Back/001/3283-8_700741.avi
./ucf_sports_actions/ucf action/Golf-Swing-Back/003/7608-12_70275.avi
./ucf_sports_actions/ucf action/Golf-Swing-Back/004/7616-7_70270.avi
./ucf_sports_actions/ucf action/Golf-Swing-Back/005/RF1-13903_70070.avi
./ucf_sports_actions/ucf action/Golf-Swing-Back/002/3283-8_701201.avi
Checking folder: ./ucf_sports_actions/ucf action/Golf-Swing-Fro

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class ActionRecognitionModel(nn.Module):
    """
    Frame encoder (ResNet-50 trunk) followed by an LSTM and a linear classifier.

    forward() accepts either a batch of single frames, shaped
    (batch, channels, height, width), or a batch of clips, shaped
    (batch, time_steps, channels, height, width). A 4-D batch is treated as
    clips of length one, which is what the original implementation did.
    """

    def __init__(self, num_classes):
        """
        Args:
        - num_classes (int): Size of the classification output.
        """
        super().__init__()

        # Pretrained 2D CNN (ResNet) for feature extraction.
        # `pretrained=True` is deprecated in newer torchvision; prefer the explicit
        # weights enum and fall back for older releases that do not define it.
        try:
            weights = models.ResNet50_Weights.IMAGENET1K_V1
        except AttributeError:
            resnet = models.resnet50(pretrained=True)
        else:
            resnet = models.resnet50(weights=weights)
        # Drop the last two children (pooling + fc head) to keep spatial feature maps.
        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-2])

        # 3D Convolution Layer.
        # Not used by forward(); kept so the module's state_dict keys stay unchanged.
        self.conv3d = nn.Conv3d(1, 64, kernel_size=(3, 3, 3), stride=1, padding=1)

        # LSTM for Temporal Dynamics
        self.lstm = nn.LSTM(2048, 512, batch_first=True)

        # Fully Connected Layer for classification
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        """
        Args:
        - x (Tensor): (batch, c, h, w) frames or (batch, time_steps, c, h, w) clips.

        Returns:
        - Tensor: (batch, num_classes) logits taken from the last time step.

        Raises:
        - ValueError: If x is neither 4-D nor 5-D.
        """
        if x.dim() == 4:
            x = x.unsqueeze(1)  # Add a time dimension of length one
        elif x.dim() != 5:
            raise ValueError(
                f"expected a 4-D or 5-D input, got a tensor with {x.dim()} dimensions"
            )

        batch_size, time_steps, c, h, w = x.shape

        # Fold time into the batch so the CNN runs once instead of once per step.
        frames = x.reshape(batch_size * time_steps, c, h, w)
        feature_maps = self.feature_extractor(frames)  # (batch*time, C', H', W')

        # Global average pool to one vector per frame; flatten(1) infers C'.
        pooled = nn.functional.adaptive_avg_pool2d(feature_maps, (1, 1)).flatten(1)

        # Restore the (batch, time, features) layout expected by the LSTM.
        sequence = pooled.reshape(batch_size, time_steps, -1)

        # LSTM for sequence processing
        lstm_out, _ = self.lstm(sequence)

        # Classification layer on the output of the last time step
        return self.fc(lstm_out[:, -1, :])

# Instantiate the model
# NOTE(review): label_map in the frame-extraction cell lists 12 class names; confirm that 13
# matches the number of class folders ImageFolder discovers in the training cell.
model = ActionRecognitionModel(num_classes=13)


In [None]:
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
import torch.nn.functional as F
from PIL import Image
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'device ===========> {device}')
# Move the model to the appropriate device: the training loop sends every batch to
# `device`, so the weights must live there too or the forward pass fails on a GPU host.
model = model.to(device)

# Custom function to pad images to a target size
def pad_tensor(image, target_size):
    """
    Convert an image to a tensor and fit it to a fixed height and width.

    Images smaller than the target are zero-padded on the right and bottom.
    Images larger than the target are cropped from the right and bottom,
    which is what the original negative-padding call did implicitly.

    Args:
    - image: A tensor laid out as (C, H, W), or anything ToTensor accepts.
    - target_size (tuple): (C, H, W); only the H and W entries are used.

    Returns:
    - Tensor: The image with height target_size[1] and width target_size[2].
    """
    # Convert only when needed; ToTensor rejects inputs that are already tensors.
    tensor = image if torch.is_tensor(image) else transforms.ToTensor()(image)

    target_h, target_w = target_size[1], target_size[2]

    # Crop anything that exceeds the target so the padding below is never negative.
    tensor = tensor[..., :target_h, :target_w]

    # F.pad takes the pads for the last dimension first: (left, right, top, bottom).
    pad_w = target_w - tensor.size(-1)
    pad_h = target_h - tensor.size(-2)
    return F.pad(tensor, (0, pad_w, 0, pad_h))

# Custom transform to resize or pad images to the target size
class ResizeOrPadTransform:
    """
    Callable transform that forwards an image to pad_tensor with a fixed target size.

    Despite the name, no resizing is performed here; all work is delegated to pad_tensor.
    """

    def __init__(self, target_size):
        # Stored as given and handed to pad_tensor on every call.
        self.target_size = target_size

    def __call__(self, image):
        padded = pad_tensor(image, self.target_size)
        return padded

# Target size as (C, H, W); every image is padded to this size by the transform
target_size = (3, 404, 720)

# Transformation pipeline: a single padding step
transform = transforms.Compose([ResizeOrPadTransform(target_size)])

# Dataset read from the class folders, served in shuffled batches of 16
train_dataset = ImageFolder(
    './ucf_sports_actions/ucf action/',
    transform=transform,
)
train_loader = DataLoader(
    train_dataset,
    shuffle=True,
    batch_size=16,
)

# Class names as discovered by ImageFolder, and how many there are
class_names = train_dataset.classes
num_classes = len(class_names)

print(f"Number of classes: {num_classes}")
print(f"Class names: {class_names}")

# Objective and optimizer (all model parameters are optimized)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
model.train()

for epoch in range(num_epochs):
    running_loss = 0.0  # Sum of per-batch losses for this epoch

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass, loss, backward pass, parameter update
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean batch loss of the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")


In [None]:
from sklearn.metrics import f1_score

def evaluate_model(model, test_loader, device=None):
    """
    Compute the weighted F1 score of a model over a data loader.

    The model is switched to eval mode and left in that mode.

    Args:
    - model (nn.Module): Model to evaluate.
    - test_loader: Iterable yielding (inputs, labels) batches.
    - device (torch.device, optional): Device to run on. When omitted, the device
      of the model's first parameter is used (CPU if it has no parameters), so the
      inputs always land where the weights are instead of on a notebook global.

    Returns:
    - float: f1_score(..., average='weighted') over all batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))
            preds = outputs.argmax(dim=1)
            # Labels are only compared on the host, so they never need to leave the CPU.
            all_labels.extend(labels.cpu().tolist())
            all_preds.extend(preds.cpu().tolist())

    return f1_score(all_labels, all_preds, average='weighted')

# Example evaluation
# NOTE(review): `test_dataset` is not defined in any cell visible in this notebook, so this
# cell raises NameError on a fresh kernel unless a held-out dataset is created first.
test_loader = DataLoader(test_dataset, batch_size=16)
f1 = evaluate_model(model, test_loader)
print(f"F1 Score: {f1:.4f}")


In [None]:
import torch.nn.utils.prune as prune

def apply_pruning(model, amount=0.3):
    """
    Prune the model in place with L1 unstructured pruning.

    Every nn.Conv2d and nn.Linear submodule has the given share of its
    smallest-magnitude weights set to zero. The pruning mask is then removed,
    so the zeros are baked into a plain `weight` parameter.

    Args:
    - model (nn.Module): Model to prune; modified in place.
    - amount (float or int): Fraction (0.0-1.0) or absolute number of weights to
      prune per layer, as accepted by prune.l1_unstructured. Defaults to 30%.
    """
    for module in model.modules():
        # Prune the linear and 2D convolutional layers only
        if isinstance(module, (nn.Conv2d, nn.Linear)):
            prune.l1_unstructured(module, name='weight', amount=amount)
            prune.remove(module, 'weight')  # Remove the mask and make pruning permanent

# Example usage:
# Prunes the notebook-level `model` in place; apply_pruning has no return value.
apply_pruning(model)


In [None]:
# Post-training dynamic quantization
def apply_quantization(model):
    """
    Return an int8 dynamically quantized CPU copy of the model.

    The previous static-quantization recipe could not run on this notebook's
    model: it fused submodules named 'bn' and 'relu' that the model does not
    have, and it set `qconfig` on a different object than the one it prepared.
    Dynamic quantization needs no fusion, no quant/dequant stubs and no
    calibration pass. It converts the nn.Linear and nn.LSTM layers and leaves
    every other layer in float.

    Args:
    - model (nn.Module): Float model. It is switched to eval mode but otherwise
      left untouched.

    Returns:
    - nn.Module: A quantized deep copy of the model, placed on the CPU.
    """
    import copy

    model.eval()

    # Quantize a CPU copy so the caller's float model stays usable afterwards.
    cpu_copy = copy.deepcopy(model).to('cpu')

    quantized_model = torch.quantization.quantize_dynamic(
        cpu_copy,
        {nn.Linear, nn.LSTM},
        dtype=torch.qint8,
        inplace=True,
    )
    return quantized_model

# Example usage:
# Quantizes the notebook-level `model` and keeps the result under its own name.
quantized_model = apply_quantization(model)


In [None]:
# Save only the state_dict of the quantized model, to a path relative to the working directory.
torch.save(quantized_model.state_dict(), 'optimized_model.pth')


In [None]:
# Use the %pip magic so the package is installed into the running kernel's environment.
%pip install Flask


In [None]:
from flask import Flask, request, render_template
import torch
import cv2
from torchvision import transforms
from PIL import Image
import numpy as np

app = Flask(__name__)

# Output size of the classifier head. It must equal the value the saved model was
# built with (num_classes=13 in the model cell); otherwise load_state_dict rejects
# the `fc` weights because their shapes differ.
NUM_CLASSES = 13
CHECKPOINT_PATH = 'optimized_model.pth'

# Load the optimized model
# NOTE(review): the checkpoint is written from `quantized_model.state_dict()`. Confirm that
# the architecture built here matches the one that was saved before relying on this load.
model = ActionRecognitionModel(num_classes=NUM_CLASSES)
# map_location keeps loading working on hosts without the device used when saving.
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location='cpu'))
model.eval()

# Transform for frame preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

def process_video(video_path):
    """
    Decode every frame of a video and preprocess it with the module-level `transform`.

    Args:
    - video_path (str): Path to the video file.

    Returns:
    - Tensor: All preprocessed frames stacked along a new first dimension.

    Raises:
    - ValueError: If no frame could be read (missing, empty or undecodable file).
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV decodes to BGR; convert to RGB before handing the frame to PIL.
            rgb_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            frames.append(transform(rgb_image))
    finally:
        # Release the capture handle even if decoding or preprocessing fails.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from {video_path!r}")

    return torch.stack(frames)

@app.route('/')
def index():
    """Render the index.html template and return it as the response for '/'."""
    page = render_template('index.html')
    return page

@app.route('/predict', methods=['POST'])
def predict():
    """
    Classify an uploaded video and return the predicted class index as text.

    The upload is stored under ./uploads with a server-generated name, decoded
    with process_video and run through the model in chunks of frames. The
    per-frame logits are averaged so the whole video yields one prediction.
    The temporary file is deleted before the response is returned.

    Returns:
    - "Predicted Action Class: <index>" on success.
    - ("No file uploaded", 400) when the request carries no file.
    - A 400 response when no frame can be read from the upload.
    """
    import os
    import tempfile

    upload = request.files.get('file')
    if upload is None or not upload.filename:
        return "No file uploaded", 400

    upload_dir = './uploads'
    os.makedirs(upload_dir, exist_ok=True)

    # Never build the path from the client-supplied name (path traversal). Keep
    # only a plain alphanumeric extension and let mkstemp choose the file name.
    extension = os.path.splitext(os.path.basename(upload.filename))[1]
    if not extension[1:].isalnum():
        extension = ''
    fd, video_path = tempfile.mkstemp(suffix=extension, dir=upload_dir)
    os.close(fd)

    try:
        upload.save(video_path)

        # Process video and predict actions
        try:
            video_frames = process_video(video_path)
        except (RuntimeError, ValueError):
            return "Could not read any frames from the uploaded video", 400

        with torch.no_grad():
            # Chunked inference keeps memory bounded for long videos.
            frame_logits = torch.cat([model(chunk) for chunk in video_frames.split(16)])

        # The model returns one row of logits per frame; average them so that
        # `.item()` receives a single value whatever the number of frames.
        predicted_class = frame_logits.mean(dim=0).argmax().item()
    finally:
        if os.path.exists(video_path):
            os.remove(video_path)

    return f"Predicted Action Class: {predicted_class}"

if __name__ == '__main__':
    import os

    # Debug mode exposes Werkzeug's interactive debugger, which lets any client that
    # reaches the server execute code. Keep it off unless explicitly requested.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1')


In [None]:
<!DOCTYPE html>
<!-- Flask's render_template('index.html') loads this file from the app's templates/ folder. -->
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Sports Action Recognition</title>
</head>
<body>
    <h1>Upload a Sports Video</h1>
    <!-- Posts the file under the field name "file", which the /predict handler reads. -->
    <form action="/predict" method="post" enctype="multipart/form-data">
        <label for="video-file">Video file</label>
        <!-- `required` blocks empty submissions; `accept` limits the picker to videos. -->
        <input type="file" id="video-file" name="file" accept="video/*" required>
        <button type="submit">Predict Action</button>
    </form>
</body>
</html>


In [None]:
# Shell escape: a bare `python app.py` is not valid Python inside a notebook cell.
# Requires the Flask application code to have been saved as app.py next to this notebook.
!python app.py
