In [None]:
# Mount Google Drive inside the Colab runtime so files stored there can be
# accessed through the local filesystem.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


In [None]:
# Switch the working directory to the project folder so the relative paths
# used by later cells ("train/", "val/", "models/") resolve inside it.
# The explicit %cd magic is used instead of a bare `cd`, which only works
# while IPython automagic is enabled and no variable named `cd` exists.
%cd "/content/drive/MyDrive/final_proj/task2"


In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import cv2
import numpy as np
from PIL import Image
import json


In [None]:
# Piece letter -> integer class id. Ids run from 1 to 12 in exactly this
# letter order (lowercase letters first, then uppercase); id 0 is never
# assigned here, leaving it free for the detector's background class.
label_map = {piece: class_id for class_id, piece in enumerate('rnbkqpRNBKQP', start=1)}


In [None]:
class ChessDataset(Dataset):
    """Object-detection dataset of board images with per-piece annotations.

    Every ``<name>.png`` found directly inside ``root_dir`` is one sample; its
    annotation is read from the sibling ``<name>.json``. The JSON must contain
    a ``'pieces'`` list whose entries have a ``'piece'`` key (looked up in the
    module-level ``label_map``) and a ``'box'`` key holding
    ``[x, y, width, height]``.

    Args:
        root_dir: Directory containing the ``.png`` / ``.json`` pairs.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # Sorted so that sample indices are stable between runs.
        self.image_list = sorted(
            file for file in os.listdir(root_dir) if file.endswith('.png')
        )

    def __len__(self):
        """Return the number of ``.png`` images found in ``root_dir``."""
        return len(self.image_list)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            A tuple ``(image, targets, img_name)`` where ``targets`` is a dict
            with ``'boxes'`` (float32 tensor of shape ``[N, 4]`` in
            ``(xmin, ymin, xmax, ymax)`` order) and ``'labels'`` (int64 tensor
            of shape ``[N]``), and ``img_name`` is the path of the image file.

        Raises:
            KeyError: If the annotation lacks an expected key or names a piece
                that is not present in ``label_map``.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir, self.image_list[idx])
        json_name = os.path.splitext(img_name)[0] + ".json"

        image = Image.open(img_name).convert("RGB")
        with open(json_name) as f:
            annotation = json.load(f)

        # extract labels and boxes of chess pieces
        pieces_info = annotation['pieces']
        labels = [label_map[piece['piece']] for piece in pieces_info]

        # convert (x, y, width, height) to (xmin, ymin, xmax, ymax)
        boxes = [
            [box[0], box[1], box[0] + box[2], box[1] + box[3]]
            for box in (piece['box'] for piece in pieces_info)
        ]

        if self.transform:
            image = self.transform(image)

        # reshape(-1, 4) and the explicit int64 dtype keep the tensors well
        # formed ([0, 4] boxes, integer labels) when an image has no pieces;
        # without them an empty list would give a (0,) float tensor for both.
        targets = {
            'boxes': torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            'labels': torch.tensor(labels, dtype=torch.int64)
        }

        return image, targets, img_name


In [None]:
def collate_fn(batch):
    """Transpose a batch of per-sample tuples into one tuple per field.

    A list such as ``[(img0, tgt0, name0), (img1, tgt1, name1)]`` becomes
    ``((img0, img1), (tgt0, tgt1), (name0, name1))``. Nothing is stacked, so
    the samples may differ in size.
    """
    fields = zip(*batch)
    return tuple(fields)


In [None]:
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator


In [None]:
def calculate_iou(box1, box2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Args:
        box1: Box as ``(xmin, ymin, xmax, ymax)``.
        box2: Box in the same format.

    Returns:
        The IoU, or ``0.0`` when the boxes do not overlap with positive area
        or when the union has no area (degenerate boxes).
    """
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])

    # Boxes that are disjoint or only touch share no area.
    if x_right <= x_left or y_bottom <= y_top:
        return 0.0
    intersection_area = (x_right - x_left) * (y_bottom - y_top)

    # calculate areas of individual boxes
    area_box1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_box2 = (box2[2] - box2[0]) * (box2[3] - box2[1])

    # calculate union area
    union_area = area_box1 + area_box2 - intersection_area

    # Guard against malformed boxes so the division below can never be by zero.
    if union_area <= 0:
        return 0.0

    return intersection_area / union_area


In [None]:
def find_golden_match(gts, pred, pred_idx, threshold=0.5):
    """Pick the ground-truth box that overlaps ``pred`` the most.

    Args:
        gts: Iterable of ground-truth boxes.
        pred: One predicted box.
        pred_idx: Index of ``pred`` among the predictions (not used here).
        threshold: Minimum IoU for a ground-truth box to count as a match.

    Returns:
        Index into ``gts`` of the box with the highest IoU that is at least
        ``threshold`` (the earliest one on ties), or ``-1`` if none qualifies.
    """
    winner = -1
    winner_iou = -1

    for candidate, gt_box in enumerate(gts):
        overlap = calculate_iou(gt_box, pred)

        # Skip boxes below the threshold.
        if not overlap >= threshold:
            continue
        # Strict comparison keeps the first of equally good candidates.
        if overlap > winner_iou:
            winner, winner_iou = candidate, overlap

    return winner


In [None]:
def calculate_metrics(gts, preds, threshold = 0.5):
    """Compute precision, recall and F1 for the detections of one image.

    Each prediction is assigned to the ground-truth box it overlaps most (via
    ``find_golden_match``). A ground-truth box can be claimed only once: the
    first prediction that claims it is a true positive and any later
    prediction landing on the same box is a false positive. Pass ``preds``
    ordered by decreasing confidence so the most confident prediction wins.

    Args:
        gts: Sequence of ground-truth boxes.
        preds: Sequence of predicted boxes.
        threshold: Minimum IoU for a prediction to match a ground-truth box.

    Returns:
        ``(precision, recall, f1)``. A ratio whose denominator is zero is
        reported as ``0.0``.
    """
    claimed = [False] * len(gts)
    tp = 0
    fp = 0

    for pred_idx, pred in enumerate(preds):
        gt_idx = find_golden_match(gts, pred, pred_idx, threshold=threshold)

        if gt_idx >= 0 and not claimed[gt_idx]:
            claimed[gt_idx] = True
            tp += 1
        else:
            # Either no ground truth overlaps enough, or it is a duplicate
            # detection of a box that was already claimed.
            fp += 1

    # Ground-truth boxes that no prediction claimed.
    fn = claimed.count(False)

    p = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    r = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * p * r / (p + r) if (p + r) > 0 else 0.0
    return p, r, f1


In [None]:
def evaluate_val(model, val_loader, device):
    """Average per-image detection precision, recall and F1 over a loader.

    The model is switched to eval mode and run without gradients on every
    batch. For each image the predicted boxes are ordered by decreasing score
    and compared with the ground-truth boxes by ``calculate_metrics`` at an
    IoU threshold of 0.5.

    Args:
        model: Detection model; called as ``model(images)`` it must return one
            dict per image containing ``'boxes'`` and ``'scores'`` tensors.
        val_loader: Iterable of ``(images, targets, image_names)`` batches,
            each target being a dict with a ``'boxes'`` tensor.
        device: Device the images are moved to before inference.

    Returns:
        ``(precision, recall, f1)`` averaged over all images, or
        ``(0.0, 0.0, 0.0)`` when the loader yields no images.

    Note:
        The model is left in eval mode.
    """
    model.eval()
    validation_image_precisions = []
    validation_image_recalls = []
    validation_image_f1s = []

    with torch.no_grad():
        for images, targets, image_names in val_loader:
            images = [image.to(device) for image in images]
            outputs = model(images)

            # Score the images of THIS batch before moving on to the next one,
            # so every validation batch contributes to the averages.
            for output, target in zip(outputs, targets):
                boxes = output['boxes'].detach().cpu().numpy()
                scores = output['scores'].detach().cpu().numpy()
                # Ground truth is only needed on the CPU.
                gt_boxes = target['boxes'].cpu().numpy()

                # Most confident predictions first.
                preds_sorted = boxes[np.argsort(scores)[::-1]]

                # Argument order matters: ground truth first, predictions second.
                image_precision, image_recall, image_f1 = calculate_metrics(
                    gt_boxes,
                    preds_sorted,
                    threshold = 0.5
                )
                validation_image_precisions.append(image_precision)
                validation_image_recalls.append(image_recall)
                validation_image_f1s.append(image_f1)

    if not validation_image_f1s:
        return 0.0, 0.0, 0.0

    valid_prec = np.mean(validation_image_precisions)
    valid_recall = np.mean(validation_image_recalls)
    valid_f1 = np.mean(validation_image_f1s)
    return valid_prec, valid_recall, valid_f1


In [None]:
def train(model,device,epochs,optimizer,train_loader,val_loader,print_every=50):
    """Train a detection model, validating and checkpointing periodically.

    Every ``print_every`` steps, and on the last step of each epoch, the mean
    training loss since the previous report is printed together with the
    validation precision, recall and F1 from ``evaluate_val``. Whenever the
    validation F1 beats the best value seen so far, the model's state dict is
    written under ``models/`` (the directory is created if missing).

    Args:
        model: Model that returns a dict of losses when called as
            ``model(images, targets)`` in training mode.
        device: Device that images and targets are moved to.
        epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer updating the model's parameters.
        train_loader: Sized iterable of ``(images, targets, image_names)``.
        val_loader: Validation loader forwarded to ``evaluate_val``.
        print_every: Number of steps between reports (default 50).
    """
    # torch.save does not create missing directories.
    os.makedirs('models', exist_ok=True)

    best_val_f1 = -float('inf')
    steps_per_epoch = len(train_loader)

    for epoch in range(epochs):

        running_train_loss = 0.0
        steps_since_report = 0

        for step, (images, targets, image_names) in enumerate(train_loader):
            # evaluate_val() leaves the model in eval mode, so re-enable
            # training mode on every step.
            model.train()
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            running_train_loss += losses.item()
            steps_since_report += 1

            # report every print_every steps and at the end of the epoch
            if (step + 1) % print_every == 0 or step + 1 == steps_per_epoch:
                # Divide by the number of steps actually accumulated: the last
                # chunk of an epoch is usually shorter than print_every.
                epoch_train_loss = running_train_loss / steps_since_report

                running_train_loss = 0.0
                steps_since_report = 0

                val_prec,val_recall,val_f1 = evaluate_val(model, val_loader,device)
                print(f"Epoch [{epoch+1}/{epochs}] - Step : [{step+1}/{len(train_loader)}] - Train Loss: {epoch_train_loss:.4f} - Val Precision: {val_prec:.4f} - Val Recall: {val_recall:.4f}- Val F1: {val_f1:.4f}")

                # save the best model based on validation F1
                if val_f1 > best_val_f1:
                    best_val_f1 = val_f1
                    torch.save(model.state_dict(), 'models/best_model_ep{}_s{}_f1{}.pth'.format(epoch+1,step+1,round(val_f1,4)))


In [None]:
# --- Hyperparameters ---
batch_size = 18
epochs = 10
lr = 3e-4

# --- Data locations (relative to the current working directory) ---
train_directory = "train/"
val_directory = "val/"

# Images are only converted to tensors; no augmentation is applied.
transform = transforms.Compose([transforms.ToTensor()])

# --- Datasets ---
train_dataset = ChessDataset(train_directory, transform=transform)
val_dataset = ChessDataset(val_directory, transform=transform)

# --- Loaders ---
# collate_fn keeps each batch as tuples of per-sample items instead of
# stacking them. Only the training data is shuffled.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=collate_fn,
)


In [None]:
# Build a pretrained Faster R-CNN (ResNet-50 FPN) and replace its box
# predictor so that it outputs the classes of this task.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 13  # 12 class + background

in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Move the model to its device BEFORE building the optimizer, so the optimizer
# is created from the parameters that will actually be trained.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Optimize only the parameters that require gradients.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=lr)

train(model,device,epochs,optimizer, train_loader,val_loader)




Epoch [1/10] - Step : [50/245] - Train Loss: 1.0387 - Val Precision: 0.2850 - Val Recall: 0.2850- Val F1: 0.2850
Epoch [1/10] - Step : [100/245] - Train Loss: 0.3443 - Val Precision: 0.3877 - Val Recall: 0.3877- Val F1: 0.3877
Epoch [1/10] - Step : [150/245] - Train Loss: 0.2189 - Val Precision: 0.4828 - Val Recall: 0.4828- Val F1: 0.4828
Epoch [1/10] - Step : [200/245] - Train Loss: 0.1661 - Val Precision: 0.5747 - Val Recall: 0.5747- Val F1: 0.5747
Epoch [1/10] - Step : [245/245] - Train Loss: 0.1315 - Val Precision: 0.6292 - Val Recall: 0.6292- Val F1: 0.6292
Epoch [2/10] - Step : [50/245] - Train Loss: 0.1299 - Val Precision: 0.7969 - Val Recall: 0.7969- Val F1: 0.7969
Epoch [2/10] - Step : [100/245] - Train Loss: 0.1198 - Val Precision: 0.8409 - Val Recall: 0.8409- Val F1: 0.8409
Epoch [2/10] - Step : [150/245] - Train Loss: 0.1071 - Val Precision: 0.8385 - Val Recall: 0.8385- Val F1: 0.8385
