# Mohammad Ali Mojtahed Soleimani

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import os
import cv2
import zipfile

# Requirements for Part A.

*   Load the Hockey dataset.
*   Import ResNet50.
*   Remove the last FC layer of ResNet50 so the network can be used as a feature extractor.
*   Extract frames from the videos.
*   Apply ImageNet normalization to the frames.


In [None]:
class HockeyDataset(Dataset):
    """Video dataset that yields a stack of sampled frames and a class label.

    The expected layout is ``root_dir/<class_name>/.../<video file>``: each
    sub-directory of ``root_dir`` is one class, and the sorted directory
    names define the integer labels.
    """

    # File extensions (compared case-insensitively) that are treated as videos.
    VIDEO_EXTENSIONS = ('.avi', '.mp4', '.mov')

    def __init__(self, root_dir, transform=None, num_frames=16):
        """
        Args:
            root_dir (string): Directory with all the videos categorized into subfolders.
            transform (callable, optional): Optional transform to be applied on each frame.
            num_frames (int): Number of frames to extract from each video.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.num_frames = num_frames
        # Only sub-directories are classes. Counting stray files (README,
        # .DS_Store, ...) would inflate len(self.classes) and shift the labels.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        self.samples = self._make_dataset(self.root_dir, self.class_to_idx)

    def _make_dataset(self, root_dir, class_to_idx):
        """Return a sorted list of ``(video_path, class_index)`` pairs.

        Every class directory is walked recursively (following symlinks) and
        each file whose extension is in ``VIDEO_EXTENSIONS`` is collected.
        """
        videos = []
        for target_class in sorted(class_to_idx.keys()):
            class_index = class_to_idx[target_class]
            target_dir = os.path.join(root_dir, target_class)
            if not os.path.isdir(target_dir):
                continue
            for root, _, fnames in sorted(os.walk(target_dir, followlinks=True)):
                for fname in sorted(fnames):
                    # Lower-case first so that e.g. ``clip.AVI`` is not skipped.
                    if fname.lower().endswith(self.VIDEO_EXTENSIONS):
                        videos.append((os.path.join(root, fname), class_index))
        return videos

    def __len__(self):
        """Return the number of indexed videos."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(frames_tensor, label)`` for the video at position ``idx``.

        ``frames_tensor`` stacks the per-frame results of ``self.transform``
        (or of ``transforms.ToTensor()`` when no transform was given) along a
        new leading dimension.

        Raises:
            RuntimeError: If no frame could be decoded from the video.
        """
        video_path, label = self.samples[idx]
        frames = self.extract_frames(video_path)
        if not frames:
            # torch.stack would fail on an empty list with a message that
            # does not say which file is broken.
            raise RuntimeError(f"Could not decode any frame from video: {video_path}")

        to_tensor = self.transform if self.transform else transforms.ToTensor()
        frames_tensor = torch.stack([to_tensor(frame) for frame in frames])

        return frames_tensor, label

    @staticmethod
    def _select_frame_indices(total_frames, num_frames):
        """Return the frame positions to sample, as a list of ints.

        All frames are taken when the video has fewer than ``num_frames``
        frames; otherwise ``num_frames`` positions spread evenly between the
        first and the last frame are returned.
        """
        if total_frames < num_frames:
            return list(range(total_frames))
        return torch.linspace(0, total_frames - 1, num_frames).long().tolist()

    def extract_frames(self, video_path):
        """
        Extracts frames from a video.

        Returns a list of RGB ``PIL.Image`` frames. When at least one frame
        can be decoded the list is padded with copies of the last decoded
        frame up to ``self.num_frames`` entries, so every sample has the same
        length and can be batched. An empty list is returned when nothing
        could be decoded.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # A set of plain ints makes the per-frame membership test cheap.
            wanted = set(self._select_frame_indices(total_frames, self.num_frames))
            last_wanted = max(wanted, default=-1)

            # Frames are read sequentially; stop right after the last one needed.
            for i in range(last_wanted + 1):
                ret, frame = cap.read()
                if not ret:
                    break
                if i in wanted:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(Image.fromarray(frame))
        finally:
            # Release the capture even if decoding or conversion raised.
            cap.release()

        if frames:
            # Short or truncated videos: repeat the last frame so that the
            # frame count is constant across samples.
            frames.extend([frames[-1]] * (self.num_frames - len(frames)))
        return frames

In [None]:
def unzip_dataset(zip_file_path, extract_path):
    """Unpack every member of a zip archive into a target directory.

    Args:
        zip_file_path: Path of the ``.zip`` file to read.
        extract_path: Directory the archive members are extracted into.

    Prints a confirmation line naming ``extract_path`` once extraction is done.
    """
    with zipfile.ZipFile(zip_file_path, mode='r') as archive:
        archive.extractall(path=extract_path)
    print(f"Dataset extracted to: {extract_path}")

In [None]:
# Per-frame preprocessing: resize to the ResNet50 input size, convert to a
# tensor, then normalise each channel with the ImageNet statistics.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]
frame_preprocessing = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
]
data_transform = transforms.Compose(frame_preprocessing)

In [None]:

# Unpack the archive, then index the extracted videos as a dataset that
# samples 16 frames per video.
zip_file_path = "/content/Hockey.zip"
extract_path = "extracted_data"
unzip_dataset(zip_file_path=zip_file_path, extract_path=extract_path)

root_directory = extract_path
hockey_dataset = HockeyDataset(root_directory, data_transform, 16)


Dataset extracted to: extracted_data


In [None]:
# Split the videos 50/50 into a training and a test subset.
num_samples = len(hockey_dataset)
train_size = num_samples // 2
test_size = num_samples - train_size  # The rest for testing

# Seed the split so the same videos land in each subset on every run;
# otherwise the saved features and the reported accuracy are not reproducible.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    hockey_dataset, [train_size, test_size], generator=split_generator
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
# Pretrained ResNet50 with its last child module dropped, so the network
# outputs features instead of class scores.
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour
# of `weights=` — confirm against the installed version before changing.
resnet50 = models.resnet50(pretrained=True)

backbone_layers = list(resnet50.children())
backbone_layers.pop()  # remove the final layer
feature_extractor = nn.Sequential(*backbone_layers)

# Switch to inference mode.
feature_extractor.eval()


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 181MB/s]


Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)


In [None]:
def extract_features(data_loader, feature_extractor):
    """Compute one feature vector per video by averaging per-frame features.

    Args:
        data_loader: Iterable yielding ``(video_frames, labels)`` pairs, where
            ``video_frames`` is a 5-D tensor unpacked as
            ``(batch_size, num_frames, C, H, W)`` and ``labels`` is a tensor.
        feature_extractor: Callable applied to a ``(batch_size * num_frames,
            C, H, W)`` tensor; its output is flattened per frame.

    Returns:
        Tuple ``(all_features, all_labels)``: the per-video features and the
        labels of all batches, concatenated along dimension 0.

    The extractor is called under ``torch.no_grad()``; it is used in whatever
    train/eval mode the caller left it in.
    """
    features_list = []
    labels_list = []
    with torch.no_grad():
        for video_frames, labels in data_loader:
            batch_size, num_frames, C, H, W = video_frames.shape
            # Fold the frame axis into the batch axis so that every frame is
            # processed as an independent image. ``reshape`` (unlike ``view``)
            # also accepts non-contiguous batches.
            frames = video_frames.reshape(batch_size * num_frames, C, H, W)
            features = feature_extractor(frames)
            # Restore the (video, frame) structure and flatten each frame's features.
            features = features.reshape(batch_size, num_frames, -1)
            video_features = features.mean(dim=1)  # Average pooling over frames
            features_list.append(video_features)
            labels_list.append(labels)
    all_features = torch.cat(features_list, dim=0)
    all_labels = torch.cat(labels_list, dim=0)
    return all_features, all_labels





In [None]:
# Run the frozen extractor over both splits, then report the resulting shapes.
train_features, train_labels = extract_features(
    data_loader=train_loader, feature_extractor=feature_extractor
)
test_features, test_labels = extract_features(
    data_loader=test_loader, feature_extractor=feature_extractor
)

print("Train features shape:", train_features.size())
print("Train labels shape:", train_labels.size())
print("Test features shape:", test_features.size())
print("Test labels shape:", test_labels.size())

Train features shape: torch.Size([500, 2048])
Train labels shape: torch.Size([500])
Test features shape: torch.Size([500, 2048])
Test labels shape: torch.Size([500])


In [None]:
# Bundle the extracted features and labels and persist them in a single file.
data_to_save = {}
data_to_save['train_features'] = train_features
data_to_save['train_labels'] = train_labels
data_to_save['test_features'] = test_features
data_to_save['test_labels'] = test_labels

torch.save(obj=data_to_save, f='extracted_data.pth')


In [None]:
class Classifier(nn.Module):
    """Two-layer fully connected classifier: Linear -> ReLU -> Linear."""

    def __init__(self, input_size, hidden_size, num_classes):
        """
        Args:
            input_size: Number of input features per sample.
            hidden_size: Width of the hidden layer.
            num_classes: Number of output scores per sample.
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for ``x``."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

In [None]:
# --- Hyperparameters ---

# Model dimensions
num_classes = len(hockey_dataset.classes)  # one output score per class
input_size = 2048  # length of each extracted video feature vector
hidden_size = 512

# Optimisation schedule
num_epochs = 10
learning_rate = 1e-3

In [None]:
# Model, loss, and optimizer for training on the extracted features.
classifier = Classifier(
    input_size=input_size,
    hidden_size=hidden_size,
    num_classes=num_classes,
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=classifier.parameters(), lr=learning_rate)

In [None]:
# Train the classifier on the pre-extracted features, one sample per
# optimizer step (batch size 1), visiting the samples in stored order.
num_train_samples = train_features.shape[0]

# Make sure the model is in training mode even if the evaluation cell
# (which calls classifier.eval()) was run before this one.
classifier.train()

for epoch in range(num_epochs):
    for step, (feature, label) in enumerate(zip(train_features, train_labels), start=1):
        # Forward pass on a single sample, with a batch dimension of 1.
        outputs = classifier(feature.unsqueeze(0))
        loss = criterion(outputs, label.unsqueeze(0))

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % 10 == 0:
            # Steps count samples, so the total is the number of samples,
            # not len(train_loader), which is the number of batches.
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{step}/{num_train_samples}], Loss: {loss.item():.4f}')

Epoch [1/10], Step [10/63], Loss: 1.4286
Epoch [1/10], Step [20/63], Loss: 0.2939
Epoch [1/10], Step [30/63], Loss: 0.6655
Epoch [1/10], Step [40/63], Loss: 0.1335
Epoch [1/10], Step [50/63], Loss: 1.2249
Epoch [1/10], Step [60/63], Loss: 2.0242
Epoch [1/10], Step [70/63], Loss: 0.1738
Epoch [1/10], Step [80/63], Loss: 0.0344
Epoch [1/10], Step [90/63], Loss: 0.0097
Epoch [1/10], Step [100/63], Loss: 0.2364
Epoch [1/10], Step [110/63], Loss: 0.0727
Epoch [1/10], Step [120/63], Loss: 0.0069
Epoch [1/10], Step [130/63], Loss: 0.0782
Epoch [1/10], Step [140/63], Loss: 0.7987
Epoch [1/10], Step [150/63], Loss: 0.1245
Epoch [1/10], Step [160/63], Loss: 0.1808
Epoch [1/10], Step [170/63], Loss: 0.0624
Epoch [1/10], Step [180/63], Loss: 0.7095
Epoch [1/10], Step [190/63], Loss: 0.2364
Epoch [1/10], Step [200/63], Loss: 0.0817
Epoch [1/10], Step [210/63], Loss: 0.1796
Epoch [1/10], Step [220/63], Loss: 0.8439
Epoch [1/10], Step [230/63], Loss: 0.0121
Epoch [1/10], Step [240/63], Loss: 0.0020
E

In [None]:
# Evaluate the classifier on the held-out features, one sample at a time.
classifier.eval()
correct = 0
total = 0
with torch.no_grad():
    for feature, label in zip(test_features, test_labels):
        outputs = classifier(feature.unsqueeze(0))
        predicted = outputs.argmax(dim=1)  # index of the highest score
        total += 1
        correct += int((predicted == label).item())

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')

Test Accuracy: 95.60%
