*Task*: For each frame of a soccer match video, return the cropped images of the 10 players in that frame together with the jersey numbers of those 10 players.

In [4]:
import os
import json
import cv2
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, ToTensor, Resize
import shutil
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.optim.lr_scheduler import MultiStepLR
from torchvision.transforms import Compose, ToTensor, Resize
from torch.optim import SGD, Adagrad, Adam
from torch.utils.tensorboard import SummaryWriter
import pickle
import numpy as np
# from google.colab.patches import cv2_imshow
from torchsummary import summary
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import argparse
from tqdm.autonotebook import tqdm

  from .autonotebook import tqdm as notebook_tqdm





In [5]:
class FootballDataset(Dataset):
    """Frame-level dataset of player crops taken from annotated match videos.

    ``root`` is expected to contain one sub-folder per match, each holding an
    annotation file ``<name>.json`` and the matching video ``<name>.mp4``.
    The JSON must provide an ``"images"`` list (one entry per frame) and an
    ``"annotations"`` list whose entries carry ``image_id``, ``category_id``,
    ``bbox`` (x_min, y_min, width, height) and ``attributes`` with
    ``jersey_number`` and ``team_jersey_color``.

    One item corresponds to one video frame and is a tuple
    ``(cropped_images, jerseys, colors)`` of three equally long lists, one
    entry per player annotation in that frame.

    Args:
        root: Directory containing the match folders.
        transform: Optional callable applied to every cropped image.

    Raises:
        FileNotFoundError: If a match folder contains no ``.json`` file.
    """

    # Annotation category used for players ("human" in the original comment).
    PLAYER_CATEGORY_ID = 4

    def __init__(self, root, transform = None):
        self.images = []
        self.labels = []
        self.file_names = []
        self.num_frames = []
        # One dict per video: frame index -> list of player annotations.
        # Built once here so __getitem__ does not re-parse the JSON per frame.
        self._players_by_frame = []
        self.transform = transform

        # Sorted so the index -> video mapping is the same on every machine.
        for match in sorted(os.listdir(root)):
            folder_path = os.path.join(root, match)
            if not os.path.isdir(folder_path):
                # Stray files next to the match folders are not matches.
                continue

            json_names = sorted(name for name in os.listdir(folder_path)
                                if name.endswith(".json"))
            if not json_names:
                raise FileNotFoundError(
                    "No .json annotation file found in {}".format(folder_path))
            json_name = json_names[0]

            # Path without extension; ".mp4" / ".json" are appended on use.
            self.file_names.append(
                os.path.join(folder_path, os.path.splitext(json_name)[0]))
            with open(os.path.join(folder_path, json_name), "r") as json_file:
                json_data = json.load(json_file)

            # count number of frame
            self.num_frames.append(len(json_data["images"]))

            players = {}
            for anno in json_data["annotations"]:
                # Annotation image ids are offset by one from the frame index.
                frame_id = anno["image_id"] - 1
                if anno["category_id"] == self.PLAYER_CATEGORY_ID:
                    players.setdefault(frame_id, []).append(anno)
            self._players_by_frame.append(players)

    def __len__(self):
        # Returns the total number of frames
        return sum(self.num_frames)

    def _locate(self, index):
        """Map a global frame index to ``(video_id, frame_id)``.

        Works for any number of videos and accepts negative indices.

        Raises:
            IndexError: If ``index`` is outside ``[-len(self), len(self))``.
        """
        total = sum(self.num_frames)
        position = index + total if index < 0 else index
        if position < 0 or position >= total:
            raise IndexError(
                "index {} is out of range for {} frames".format(index, total))
        for video_id, frame_count in enumerate(self.num_frames):
            if position < frame_count:
                return video_id, position
            position -= frame_count
        # Unreachable: the range check above guarantees a match.
        raise IndexError("index {} is out of range".format(index))

    def __getitem__(self, index):
        """Return the player crops, jersey numbers and team colours of a frame.

        Returns:
            tuple: ``(cropped_images, jerseys, colors)``. ``jerseys`` holds
            ``int(jersey_number)``; ``colors`` holds 0 for ``"black"`` and 1
            for any other team colour.

        Raises:
            IndexError: If ``index`` is out of range.
            RuntimeError: If the frame cannot be read from the video.
        """
        video_id, frame_id = self._locate(index)
        video_path = "{}.mp4".format(self.file_names[video_id])

        # Read video; always release the capture so handles do not pile up.
        cap = cv2.VideoCapture(video_path)
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            flag, image = cap.read()
        finally:
            cap.release()
        if not flag or image is None:
            raise RuntimeError(
                "Could not read frame {} from {}".format(frame_id, video_path))
        # NOTE(review): OpenCV decodes frames as BGR; confirm the channel
        # order matches what the downstream model expects.

        frame_height, frame_width = image.shape[:2]
        cropped_images = []
        jerseys = []
        colors = []
        for anno in self._players_by_frame[video_id].get(frame_id, ()):
            x_min, y_min, width, height = anno["bbox"]
            # Clamp to the frame: a negative start would wrap around in
            # NumPy slicing and silently yield a wrong or empty crop.
            left = max(int(x_min), 0)
            top = max(int(y_min), 0)
            right = min(int(x_min + width), frame_width)
            bottom = min(int(y_min + height), frame_height)
            if right <= left or bottom <= top:
                # Degenerate box: drop it together with its labels so the
                # three lists stay aligned.
                continue

            crop = image[top:bottom, left:right, :]
            if self.transform is not None:
                crop = self.transform(crop)
            cropped_images.append(crop)
            # NOTE(review): jersey numbers are used directly as class
            # indices by the training loop; confirm they fit the
            # classifier's output size.
            jerseys.append(int(anno["attributes"]["jersey_number"]))
            colors.append(
                0 if anno["attributes"]["team_jersey_color"] == "black" else 1)

        return cropped_images, jerseys, colors

In [6]:
def get_args():
    """Build the training configuration from command-line options.

    Unrecognised options (for example the ones a notebook kernel injects into
    ``sys.argv``) are ignored rather than treated as errors.

    Returns:
        argparse.Namespace: the parsed options, with defaults filled in.
    """
    # (short flag, long flag, value type, default)
    option_table = (
        ('-p', '--data_path_train', str, "./Data/football_train"),
        ('-a', '--data_path_test', str, "./Data/football_test"),
        ('-b', '--batch_size', int, 4),
        ('-e', '--epochs', int, 10),
        ('-l', '--lr', float, 1e-3),  # SGD: lr = 1e-2. Adam: lr = 1e-3
        ('-s', '--image_size', int, 224),
        ('-c', '--checkpoint_path', str, None),
        ('-t', '--tensorboard_path', str, "tensorboard"),
        ('-r', '--trained_path', str, "trained_models"),
    )

    cli = argparse.ArgumentParser(description='Football')
    for short_flag, long_flag, value_type, fallback in option_table:
        cli.add_argument(short_flag, long_flag, type=value_type, default=fallback)

    recognised, _leftover = cli.parse_known_args()
    return recognised

In [7]:
def collate_fn(batch):
    """Flatten a batch of per-frame samples into per-player tensors.

    Each element of ``batch`` is a ``(images, labels, colors)`` triple of
    lists for one frame. The lists of all frames are concatenated in batch
    order.

    Returns:
        tuple: ``(images, labels, colors)`` where ``images`` is the stacked
        image tensor and ``labels`` / ``colors`` are ``LongTensor`` vectors
        with one entry per stacked image.
    """
    frames_images, frames_labels, frames_colors = zip(*batch)

    stacked_images = torch.stack(
        [crop for frame_crops in frames_images for crop in frame_crops])
    flat_labels = torch.LongTensor(
        [number for frame_numbers in frames_labels for number in frame_numbers])
    flat_colors = torch.LongTensor(
        [team for frame_teams in frames_colors for team in frame_teams])

    return stacked_images, flat_labels, flat_colors

In [8]:
class ResNet_two_header2(nn.Module):
    """ResNet-50 backbone with two classification heads.

    The backbone is initialised with ImageNet weights. Two linear heads are
    attached to the pooled backbone features: ``fc1`` for jersey numbers and
    ``fc2`` for team colours.

    Args:
        num_jerseys: Number of output classes of the jersey-number head.
        num_colors: Number of output classes of the team-colour head.
    """

    def __init__(self, num_jerseys = 20, num_colors = 2):
        super().__init__()
        # Newer torchvision deprecates `pretrained=True` in favour of the
        # `weights=` API; IMAGENET1K_V1 is the checkpoint `pretrained=True`
        # selects. Fall back to the old keyword on releases without the enum.
        weights_enum = getattr(models, "ResNet50_Weights", None)
        if weights_enum is not None:
            self.model = models.resnet50(weights = weights_enum.IMAGENET1K_V1)
        else:
            self.model = models.resnet50(pretrained = True)

        # Read the feature width from the backbone instead of hard-coding it.
        # The backbone's own `fc` layer is left in place (forward() does not
        # use it) so the state_dict keys of existing checkpoints stay valid.
        feature_dim = self.model.fc.in_features
        self.model.fc1 = nn.Linear(in_features = feature_dim, out_features = num_jerseys)
        self.model.fc2 = nn.Linear(in_features = feature_dim, out_features = num_colors)

    def forward(self, x):
        """Return ``(jersey_logits, color_logits)`` for a batch of images."""
        # Stem
        x = self.model.conv1(x)
        x = self.model.bn1(x)
        x = self.model.relu(x)
        x = self.model.maxpool(x)

        # Residual stages
        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)

        # Pool, flatten, then feed the same features to both heads.
        x = self.model.avgpool(x)
        x = torch.flatten(x, 1)
        x1 = self.model.fc1(x)
        x2 = self.model.fc2(x)

        return x1, x2

In [9]:
def plot_confusion_matrix(writer, cm, class_names, epoch, tag='confusion_matrix'):
    """
    Plots a confusion matrix and logs the figure to TensorBoard.

    The cells are coloured by the raw counts and annotated with the
    row-normalised values (each row divided by its total). Rows with no
    samples are annotated with 0 instead of NaN.

    Args:
       writer: object exposing ``add_figure(tag, figure, global_step)``
       cm (array, shape = [n, n]): a confusion matrix of integer classes
       class_names (array, shape = [n]): String names of the integer classes
       epoch (int): step under which the figure is logged
       tag (str): name of the logged figure; defaults to 'confusion_matrix'

    Returns:
       None. The figure is handed to ``writer.add_figure``.
    """
    cm = np.asarray(cm)

    figure, ax = plt.subplots(figsize=(20, 20))
    # color map: https://matplotlib.org/stable/gallery/color/colormap_reference.html
    heatmap = ax.imshow(cm, interpolation='nearest', cmap="cool")
    ax.set_title("Confusion matrix")
    figure.colorbar(heatmap, ax=ax)
    tick_marks = np.arange(len(class_names))
    ax.set_xticks(tick_marks)
    ax.set_xticklabels(class_names, rotation=45)
    ax.set_yticks(tick_marks)
    ax.set_yticklabels(class_names)

    # Normalize the confusion matrix row by row. A class that never occurs in
    # the ground truth has a zero row total; leave its row at 0 rather than
    # dividing by zero.
    row_totals = cm.sum(axis=1, keepdims=True)
    normalized = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    )
    normalized = np.around(normalized, decimals=2)

    # Use white text if squares are dark; otherwise black.
    threshold = normalized.max() / 2. if normalized.size else 0.

    for i in range(normalized.shape[0]):
        for j in range(normalized.shape[1]):
            color = "white" if normalized[i, j] > threshold else "black"
            ax.text(j, i, normalized[i, j], horizontalalignment="center", color=color)

    ax.set_ylabel('True label')
    ax.set_xlabel('Predicted label')
    # Lay out after the labels exist so they are included in the layout.
    figure.tight_layout()
    writer.add_figure(tag, figure, epoch)


In [10]:
def train(args):
    """Train and validate the two-headed jersey/colour classifier.

    Every epoch runs one pass over the training set and one over the
    validation set, logs losses and the mean of the two head accuracies to
    TensorBoard, and writes ``last.pt`` (always) and ``best.pt`` (when the
    mean accuracy improves) into ``args.trained_path``.

    Args:
        args: Namespace providing ``data_path_train``, ``data_path_test``,
            ``batch_size``, ``epochs``, ``lr``, ``image_size``,
            ``checkpoint_path``, ``tensorboard_path`` and ``trained_path``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = Compose([
        ToTensor(),
        Resize((args.image_size, args.image_size))
    ])

    train_set = FootballDataset(root=args.data_path_train, transform=transform)
    valid_set = FootballDataset(root=args.data_path_test, transform=transform)

    training_params = {
        "batch_size": args.batch_size,
        "shuffle": True,
        "drop_last": True,
        # "num_workers": 6,
        "collate_fn": collate_fn
    }

    valid_params = {
        "batch_size": args.batch_size,
        "shuffle": False,
        "drop_last": False,
        # "num_workers": 6,
        "collate_fn": collate_fn
    }

    train_dataloader = DataLoader(train_set, **training_params)
    valid_dataloader = DataLoader(valid_set, **valid_params)

    model = ResNet_two_header2(20, 2).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=args.lr)
    scheduler = MultiStepLR(optimizer, milestones=[30, 60, 90], gamma=0.1)

    resuming = bool(args.checkpoint_path) and os.path.isfile(args.checkpoint_path)
    if resuming:
        # map_location lets a checkpoint saved on GPU be resumed on CPU.
        checkpoint = torch.load(args.checkpoint_path, map_location=device)
        model.load_state_dict(checkpoint["model"])
        optimizer.load_state_dict(checkpoint["optimizer"])
        start_epoch = checkpoint["epoch"] + 1
        best_acc = checkpoint["best_acc"]
        if "scheduler" in checkpoint:
            scheduler.load_state_dict(checkpoint["scheduler"])
        else:
            # Checkpoints written before the scheduler state was saved: the
            # learning rate itself comes back with the optimizer state, so
            # only the epoch counter is realigned (best effort).
            scheduler.last_epoch = start_epoch
    else:
        start_epoch = 0
        best_acc = 0

    # Start from a clean log directory on a fresh run, but keep the existing
    # curves when continuing from a checkpoint.
    if os.path.isdir(args.tensorboard_path) and not resuming:
        shutil.rmtree(args.tensorboard_path)
    os.makedirs(args.tensorboard_path, exist_ok=True)
    os.makedirs(args.trained_path, exist_ok=True)

    writer = SummaryWriter(args.tensorboard_path)
    num_iters = len(train_dataloader)

    try:
        for epoch in range(start_epoch, args.epochs):
            # TRAIN
            model.train()
            losses = []
            progress_bar = tqdm(train_dataloader, colour="yellow")
            for batch_idx, (cropped_images, jerseys, colors) in enumerate(progress_bar):
                # Move tensor to configured device:
                cropped_images = cropped_images.to(device)
                jerseys = jerseys.to(device)
                colors = colors.to(device)

                # Forward pass
                predictions_jerseys, predictions_colors = model(cropped_images)
                loss_jerseys = criterion(predictions_jerseys, jerseys)
                loss_colors = criterion(predictions_colors, colors)
                loss = loss_jerseys + loss_colors

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                loss_value = loss.item()
                progress_bar.set_description("Epoch {}/{}. Loss value: {:.4f}".format(epoch + 1, args.epochs, loss_value))
                losses.append(loss_value)
                # Logged value is the running mean over the current epoch.
                writer.add_scalar("Train/Loss", np.mean(losses), epoch*num_iters+batch_idx)

            # VALIDATE
            model.eval()
            losses = []
            all_predictions_jerseys = []
            all_predictions_colors = []
            all_gts_jerseys = []
            all_gts_colors = []
            with torch.no_grad():
                for images, jerseys, colors in valid_dataloader:
                    # Move tensor to configured device:
                    images = images.to(device)
                    jerseys = jerseys.to(device)
                    colors = colors.to(device)

                    # Forward pass
                    predictions_jerseys, predictions_colors = model(images)
                    loss_jerseys = criterion(predictions_jerseys, jerseys)
                    loss_colors = criterion(predictions_colors, colors)
                    loss = loss_jerseys + loss_colors
                    losses.append(loss.item())

                    max_idx_jerseys = torch.argmax(predictions_jerseys, 1)
                    max_idx_colors = torch.argmax(predictions_colors, 1)

                    all_gts_jerseys.extend(jerseys.tolist())
                    all_gts_colors.extend(colors.tolist())
                    all_predictions_jerseys.extend(max_idx_jerseys.tolist())
                    all_predictions_colors.extend(max_idx_colors.tolist())

            writer.add_scalar("Val/Loss", np.mean(losses), epoch)
            acc_jerseys = accuracy_score(all_gts_jerseys, all_predictions_jerseys)
            acc_colors = accuracy_score(all_gts_colors, all_predictions_colors)
            avg_acc = (acc_jerseys + acc_colors) / 2
            writer.add_scalar("Val/Accuracy", avg_acc, epoch)
            # TODO: log per-head confusion matrices with plot_confusion_matrix.

            # Advance the schedule before saving so the checkpoint carries the
            # scheduler/optimizer state the next epoch should start from.
            scheduler.step()

            # Update best_acc first; otherwise the checkpoint stores the
            # previous best and a resumed run overwrites best.pt too eagerly.
            is_best = avg_acc > best_acc
            if is_best:
                # Plain float keeps the checkpoint free of NumPy scalars.
                best_acc = float(avg_acc)

            checkpoint = {
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "scheduler": scheduler.state_dict(),
                "epoch": epoch,
                "best_acc": best_acc,
                "batch_size": args.batch_size
            }

            torch.save(checkpoint, os.path.join(args.trained_path, "last.pt"))
            if is_best:
                torch.save(checkpoint, os.path.join(args.trained_path, "best.pt"))
    finally:
        # Flush pending events even when training is interrupted by hand.
        writer.close()

In [11]:
if __name__ == '__main__':
    # Parse the options (unknown kernel arguments are ignored) and start training.
    args = get_args()
    train(args)
    # NOTE: the run recorded below did not finish; it was stopped by a
    # KeyboardInterrupt during epoch 5.

Epoch 1/10. Loss value: 0.0308: 100%|[33m██████████[0m| 1131/1131 [5:59:23<00:00, 19.07s/it]  
Epoch 2/10. Loss value: 0.0654: 100%|[33m██████████[0m| 1131/1131 [10:53:22<00:00, 34.66s/it]    
Epoch 3/10. Loss value: 0.0850: 100%|[33m██████████[0m| 1131/1131 [6:52:00<00:00, 21.86s/it]   
Epoch 4/10. Loss value: 0.0249: 100%|[33m██████████[0m| 1131/1131 [12:25:09<00:00, 39.53s/it]    
Epoch 5/10. Loss value: 0.0020:   2%|[33m▏         [0m| 25/1131 [10:30<8:01:01, 26.10s/it]Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x0000024431A21310>>
Traceback (most recent call last):
  File "C:\Users\Minh Quan\AppData\Roaming\Python\Python311\site-packages\ipykernel\ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 
Epoch 5/10. Loss value: 0.0020:   2%|[33m▏         [0m| 25/1131 [10:53<8:01:53, 26.14s/it]


KeyboardInterrupt: 