Faster RCNN Model

In [5]:
import torchvision
import torch
from torch.utils.data import Dataset, DataLoader
import os
import cv2
import pandas as pd
from torch.utils.data import Dataset
from torchvision.io import read_image
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.optim import SGD
from torchvision.transforms import Compose, ToTensor, Normalize
from sklearn.model_selection import train_test_split



Data Preprocessing

bb_left and bb_top are the coordinates of the top-left corner of the bounding box, and bb_width and bb_height are its dimensions.
The detection model expects boxes in corner format (x_min, y_min, x_max, y_max), so:
x_max = bb_left + bb_width
y_max = bb_top + bb_height
x_min = bb_left
y_min = bb_top
The aim is to track the player ID in each frame.

In [None]:

def load_annotations(csv_path):
    """Read the annotation CSV at ``csv_path`` and return it as a DataFrame."""
    annotation_table = pd.read_csv(csv_path)
    return annotation_table

def extract_frames(video_path, output_dir, frame_numbers):
    """
    Save selected frames of a video as JPEG files.

    Frames are counted from 0 in decoding order; every frame whose index is in
    `frame_numbers` is written to `output_dir` as ``frame_<index:04d>.jpg``.

    Parameters:
    - video_path (str): Path of the video file to decode.
    - output_dir (str): Directory that receives the images (created if missing).
    - frame_numbers (iterable of int): Indices of the frames to save.

    Returns:
    - None
    """
    # NOTE(review): indices here are 0-based; confirm the annotation files use
    # the same convention for their 'frame' column.
    os.makedirs(output_dir, exist_ok=True)

    # A set gives O(1) membership tests and lets us stop as soon as every
    # requested frame has been written.
    pending = set(frame_numbers)

    cap = cv2.VideoCapture(video_path)
    try:
        frame_idx = 0
        while pending:
            success, frame = cap.read()
            if not success:
                # End of stream (or decode failure): `frame` is None here and
                # must not be handed to cv2.imwrite.
                break
            if frame_idx in pending:
                frame_path = os.path.join(output_dir, f"frame_{frame_idx:04d}.jpg")
                cv2.imwrite(frame_path, frame)
                pending.discard(frame_idx)
            frame_idx += 1
    finally:
        # Release the capture even if writing a frame raised.
        cap.release()

def get_frame_path(frame_number, output_dir):
    """Return the path of the JPEG for `frame_number` inside `output_dir`."""
    file_name = f"frame_{frame_number:04d}.jpg"
    return os.path.join(output_dir, file_name)

def preprocess_annotations(annotations, output_dir):
    """
    Add corner coordinates and image paths to an annotation table.

    The input DataFrame is left untouched; a new DataFrame with three extra
    columns is returned:
    - x_max: bb_left + bb_width
    - y_max: bb_top + bb_height
    - frame_path: path of the extracted image for the row's 'frame' value

    Parameters:
    - annotations (DataFrame): Must contain 'frame', 'bb_left', 'bb_top',
      'bb_width' and 'bb_height' columns.
    - output_dir (str): Directory that holds the extracted frame images.

    Returns:
    - DataFrame: A copy of `annotations` with the additional columns.
    """
    frame_paths = annotations['frame'].apply(
        lambda frame_number: get_frame_path(frame_number, output_dir)
    )
    # `assign` builds a new frame, so the caller's table is not modified and
    # no SettingWithCopyWarning is triggered when a slice is passed in.
    return annotations.assign(
        x_max=annotations['bb_left'] + annotations['bb_width'],
        y_max=annotations['bb_top'] + annotations['bb_height'],
        frame_path=frame_paths,
    )

def split_dataset(annotations, val_size=0.2, test_size=0.1, random_state=None):
    """
    Randomly split annotations into training, validation and test subsets.

    `val_size` and `test_size` are fractions of the whole dataset. The test
    subset is split off first; the validation fraction is then rescaled so it
    still refers to the original dataset size.

    Parameters:
    - annotations: Table (or sequence) of annotations to split.
    - val_size (float): Fraction of all samples used for validation.
    - test_size (float): Fraction of all samples used for testing.
    - random_state (int or None): Seed forwarded to both splits so the
      partition can be reproduced; None keeps the split non-deterministic.

    Returns:
    - (train_annots, val_annots, test_annots)

    Raises:
    - ValueError: If the fractions are not positive or leave no training data.
    """
    # NOTE(review): the split is done per annotation row, so boxes belonging
    # to the same frame can end up in different subsets.
    if not (0 < test_size < 1) or val_size <= 0 or val_size + test_size >= 1:
        raise ValueError(
            "val_size and test_size must be positive fractions whose sum is below 1, "
            f"got val_size={val_size}, test_size={test_size}"
        )

    train_val_annots, test_annots = train_test_split(
        annotations, test_size=test_size, random_state=random_state
    )
    # Express the validation share relative to what remains after removing
    # the test subset.
    adjusted_val_size = val_size / (1 - test_size)
    train_annots, val_annots = train_test_split(
        train_val_annots, test_size=adjusted_val_size, random_state=random_state
    )
    return train_annots, val_annots, test_annots

def main(annotations_dir, videos_dir, frames_output_dir, num_videos=60):
    """
    Extract annotated frames for every clip and split the annotations.

    For each clip index the matching CSV and MP4 are loaded, the annotated
    frames are written to a per-clip subdirectory of `frames_output_dir`, and
    the annotations are augmented with corner coordinates and image paths.
    All clips are then concatenated and split into train/val/test subsets.

    Parameters:
    - annotations_dir (str): Directory containing the annotation CSV files.
    - videos_dir (str): Directory containing the video files.
    - frames_output_dir (str): Root directory for the extracted frames.
    - num_videos (int): Number of CSV/video pairs to process (top view
      dataset: 60).

    Returns:
    - (train_annots, val_annots, test_annots): The three annotation subsets.
    """
    all_annotations = []

    for i in range(num_videos):
        clip_name = f"D_20220220_1_{i:04d}_0030"
        csv_path = os.path.join(annotations_dir, f"{clip_name}.csv")
        video_path = os.path.join(videos_dir, f"{clip_name}.mp4")

        # Frame file names only encode the frame number, so each clip needs
        # its own directory; otherwise later clips overwrite earlier ones and
        # every annotation ends up pointing at the last clip's images.
        clip_frames_dir = os.path.join(frames_output_dir, clip_name)

        annotations = load_annotations(csv_path)
        frame_numbers = sorted(annotations['frame'].unique())

        extract_frames(video_path, clip_frames_dir, frame_numbers)

        preprocessed_annotations = preprocess_annotations(annotations, clip_frames_dir)
        all_annotations.append(preprocessed_annotations)

    all_annotations_df = pd.concat(all_annotations, ignore_index=True)

    train_annots, val_annots, test_annots = split_dataset(all_annotations_df, val_size=0.2, test_size=0.1)

    #  use train_annots, val_annots, and test_annots for training, validating, and testing the model
    print("Training annotations:", len(train_annots))
    print("Validation annotations:", len(val_annots))
    print("Testing annotations:", len(test_annots))

    return train_annots, val_annots, test_annots

# Example usage: point these at the dataset locations, then run the
# preprocessing pipeline.
videos_dir = '/path/to/videos/'
annotations_dir = '/path/to/annotations/'
frames_output_dir = '/path/to/extracted_frames/'
main(
    annotations_dir=annotations_dir,
    videos_dir=videos_dir,
    frames_output_dir=frames_output_dir,
)


In [None]:
class SoccerTrackDataset(Dataset):
    """
    Detection dataset yielding one frame image together with all of its boxes.

    Each sample corresponds to one distinct 'frame_path' in the annotation
    table; every row that shares that path contributes one bounding box and
    one label. Serving a frame with only a single box would make the detector
    treat the other annotated players in that image as background.

    Parameters:
    - annotations (DataFrame): Must contain 'frame_path', 'bb_left', 'bb_top',
      'x_max', 'y_max' and 'PlayerID' columns.
    - img_dir (str): Directory that 'frame_path' values are joined onto.
    - transforms (callable or None): Optional callable applied to the image
      tensor only.
    """

    def __init__(self, annotations, img_dir, transforms=None):
        self.annotations = annotations
        self.img_dir = img_dir
        self.transforms = transforms

        # Row positions grouped by frame, in order of first appearance.
        rows_by_frame = {}
        for position, frame_path in enumerate(annotations['frame_path']):
            rows_by_frame.setdefault(frame_path, []).append(position)
        self._rows_by_frame = rows_by_frame
        self._frame_paths = list(rows_by_frame)

    def __len__(self):
        """Return the number of distinct frames."""
        return len(self._frame_paths)

    def _build_target(self, rows, idx):
        """Build the target dict for the annotation `rows` of sample `idx`."""
        boxes = torch.tensor(
            rows[['bb_left', 'bb_top', 'x_max', 'y_max']].to_numpy(dtype='float32'),
            dtype=torch.float32,
        )
        # Increment player ID by 1 to reserve 0 for the background
        labels = torch.tensor(
            rows['PlayerID'].to_numpy(dtype='int64') + 1,
            dtype=torch.int64,
        )
        return {
            'boxes': boxes,                      # shape (num_boxes, 4)
            'labels': labels,                    # shape (num_boxes,)
            'image_id': torch.tensor([idx]),
        }

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the frame at position `idx`."""
        frame_path = self._frame_paths[idx]
        rows = self.annotations.iloc[self._rows_by_frame[frame_path]]

        img_path = os.path.join(self.img_dir, frame_path)
        image = read_image(img_path).float() / 255.0  # Normalize to [0, 1]

        target = self._build_target(rows, idx)

        if self.transforms:
            image = self.transforms(image)

        return image, target


In [None]:

def get_faster_rcnn_model(num_classes):
    """
    Build a Faster R-CNN detector whose box classifier predicts `num_classes`.

    A ResNet-50 FPN Faster R-CNN is created with pre-trained weights and its
    box predictor is swapped for a freshly initialised one sized for the
    background class plus the player IDs.

    Parameters:
    - num_classes (int): The total number of classes including the background.

    Returns:
    - model (FasterRCNN): A Faster R-CNN model adjusted for the specified number of classes.
    """
    # Object detection model created with pretrained=True
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # The new predictor must accept the same feature size as the one it replaces
    roi_heads = detector.roi_heads
    feature_dim = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(feature_dim, num_classes)

    return detector


In [None]:
def train_model(model, data_loader, optimizer, device, num_epochs=10000):
    """
    Train a detection model and report the mean loss of every epoch.

    The model is called as ``model(images, targets)`` and must return a dict
    of loss tensors; their sum is back-propagated on every batch.

    Parameters:
    - model (torch.nn.Module): Model to train; moved to `device`.
    - data_loader (iterable): Yields ``(images, targets)`` batches, where
      `images` is a sequence of tensors and `targets` a sequence of dicts of
      tensors.
    - optimizer (torch.optim.Optimizer): Optimizer over the model parameters.
    - device (torch.device): Device used for the model and every batch.
    - num_epochs (int): Number of passes over `data_loader`.

    Returns:
    - list of float: Mean batch loss for each epoch, in order.

    Raises:
    - ValueError: If `data_loader` yields no batches.
    """
    model.to(device)
    epoch_losses = []

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        # Count batches ourselves so loaders without __len__ also work.
        num_batches = 0

        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            total_loss += losses.item()
            num_batches += 1

        if num_batches == 0:
            # Averaging over zero batches would divide by zero.
            raise ValueError("data_loader yielded no batches; nothing to train on")

        mean_loss = total_loss / num_batches
        epoch_losses.append(mean_loss)
        print(f"Epoch #{epoch+1} Loss: {mean_loss}")

    return epoch_losses


In [None]:
def main():
    """
    Train a Faster R-CNN player detector on the annotated frames.

    Reads the annotation CSV, sizes the classifier head from the largest
    player ID, builds the dataset and data loader, and trains for 10 epochs on
    the GPU when one is available (CPU otherwise).
    """
    annotations_path = '/path/to/your/annotations.csv'
    img_dir = '/path/to/your/images'
    annotations_df = pd.read_csv(annotations_path)
    max_player_id = annotations_df['PlayerID'].max()

    # Player IDs 0..max are shifted to labels 1..max+1 and label 0 is the
    # background, so the head needs max + 2 outputs. Cast to a plain int
    # because pandas returns a NumPy scalar.
    num_classes = int(max_player_id) + 2
    model = get_faster_rcnn_model(num_classes)

    # The dataset already returns float tensors scaled to [0, 1], so no
    # ToTensor transform is needed (it only accepts PIL images / ndarrays).
    dataset = SoccerTrackDataset(annotations_df, img_dir)
    # Detection targets differ in size per image, so keep each batch as
    # tuples instead of stacking tensors.
    data_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=lambda x: list(zip(*x)))

    optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.005)

    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    train_model(model, data_loader, optimizer, device, num_epochs=10)

if __name__ == "__main__":
    main()


When interpreting the model's predictions, remember that the player ID predictions are shifted by +1. To match predictions with the original player IDs, we need to decrement the predicted class IDs by 1: