In [1]:
import os
import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from PIL import Image
import torchvision
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import time
import torch
import cv2

In [2]:
def generate_box(obj):
    """Read one bounding box from a parsed annotation node.

    Args:
        obj: Node exposing ``find(tag)``; its ``xmin``, ``ymin``, ``xmax``
            and ``ymax`` children must carry numeric text.

    Returns:
        list[float]: ``[xmin, ymin, xmax, ymax]``.
    """
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
    return [float(obj.find(tag).text) for tag in corner_tags]

# Offset added to every class id returned by generate_label
# (presumably so that id 0 stays reserved for the detector's background
# class -- confirm against the model configuration).
adjust_label = 1

def generate_label(obj):
    """Map the ``name`` text of an annotation node to a class id.

    Args:
        obj: Node exposing ``find('name')`` whose result has a ``text``.

    Returns:
        int: ``1 + adjust_label`` when the name is exactly ``"1"``,
        otherwise ``0 + adjust_label``.
    """
    class_id = 1 if obj.find('name').text == "1" else 0
    return class_id + adjust_label

def generate_target(file):
    """Build a detection target from one XML annotation file.

    Args:
        file: Path of the annotation file. Every ``<object>`` element in it
            contributes one box (via ``generate_box``) and one label (via
            ``generate_label``).

    Returns:
        dict: ``{"boxes": float32 tensor of shape (N, 4),
        "labels": int64 tensor of shape (N,)}`` where N is the number of
        ``<object>`` elements (N may be 0).
    """
    with open(file) as f:
        soup = BeautifulSoup(f.read(), "html.parser")

    objects = soup.find_all("object")
    boxes = [generate_box(obj) for obj in objects]
    labels = [generate_label(obj) for obj in objects]

    return {
        # reshape(-1, 4) keeps the (N, 4) layout even when the file holds no
        # objects; an empty list would otherwise become a tensor of shape (0,).
        "boxes": torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
        "labels": torch.as_tensor(labels, dtype=torch.int64),
    }

def plot_image_from_output(img, annotation):
    """Draw annotation boxes on an image and show it in an OpenCV window.

    Note: a later cell defines another ``plot_image_from_output`` with the
    signature ``(image_path, predictions)``; once that cell has run, it
    replaces this definition.

    Args:
        img: Image array accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.
        annotation: Mapping with ``"boxes"`` (rows of xmin, ymin, xmax, ymax)
            and ``"labels"``; the two are iterated in parallel.

    The call blocks in ``cv2.waitKey(0)`` until a key is pressed, then closes
    all OpenCV windows.
    """
    # Convert the image from BGR to RGB
    # NOTE(review): cv2.imshow displays arrays as BGR, so after this swap the
    # red and blue channels appear exchanged for a BGR input -- confirm the
    # channel order callers actually pass in.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    for box, label in zip(annotation["boxes"], annotation["labels"]):
        # cv2.rectangle needs integer pixel coordinates; the boxes built by
        # generate_target are floats, so truncate them here.
        xmin, ymin, xmax, ymax = (int(v) for v in box)
        color = (0, 255, 0) if label == 1 else (0, 0, 255)
        img = cv2.rectangle(img, (xmin, ymin), (xmax, ymax), color, 2)

    cv2.imshow('Image', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


def plot_image_from_output1(img, annotation):
    """Show a tensor image with its boxes using matplotlib.

    Args:
        img: Tensor whose axis 0 is moved to the end before display
            (``permute(1, 2, 0)``); it is copied to the CPU first.
        annotation: Mapping with ``"boxes"`` (rows of xmin, ymin, xmax, ymax)
            and ``'labels'`` indexed in step with the boxes.

    Boxes whose label equals 1 are outlined in red, all others in green.
    """
    _, ax = plt.subplots(1)
    ax.imshow(img.cpu().permute(1, 2, 0))

    for idx, box in enumerate(annotation["boxes"]):
        xmin, ymin, xmax, ymax = box
        edge = 'r' if annotation['labels'][idx] == 1 else 'g'
        outline = patches.Rectangle(
            (xmin, ymin),
            (xmax - xmin),
            (ymax - ymin),
            linewidth=1,
            edgecolor=edge,
            facecolor='none',
        )
        ax.add_patch(outline)

    plt.show()

In [3]:
class TrafficDataset(object):
    """Dataset pairing every file in an image folder with its XML annotation.

    Indexing returns ``(img, target)`` where ``img`` is the RGB image (after
    ``transforms`` if given) and ``target`` is the dict built by
    ``generate_target`` from the annotation file with the same base name.
    """

    # Annotation folders used when ``annotation_dir`` is not supplied.
    _DEFAULT_TRAIN_ANNOTATIONS = "drive/MyDrive/Colab Notebooks/dataset/annotations/"
    _DEFAULT_TEST_ANNOTATIONS = "drive/MyDrive/Colab Notebooks/dataset/test_annotations/"

    def __init__(self, transforms, path, annotation_dir=None):
        '''
        transforms: callable applied to each loaded image, or None
        path: path to train folder or test folder
        annotation_dir: folder holding the XML files; when None, the test
            annotation folder is used if 'test' occurs in ``path`` and the
            training annotation folder otherwise
        '''
        # define the path to the images and what transform will be used
        self.transforms = transforms
        self.path = path
        if annotation_dir is None:
            if 'test' in self.path:
                annotation_dir = self._DEFAULT_TEST_ANNOTATIONS
            else:
                annotation_dir = self._DEFAULT_TRAIN_ANNOTATIONS
        self.annotation_dir = annotation_dir
        self.imgs = list(sorted(os.listdir(self.path)))

    def _label_path(self, idx):
        """Return the annotation path for the image at ``idx``."""
        # splitext handles extensions of any length ('.jpeg', '.png', ...);
        # slicing off a fixed three characters only worked for 3-letter ones.
        stem, _ = os.path.splitext(self.imgs[idx])
        return os.path.join(self.annotation_dir, stem + '.xml')

    def __getitem__(self, idx):
        """Load the image at ``idx`` and its target; returns ``(img, target)``."""
        img_path = os.path.join(self.path, self.imgs[idx])
        img = Image.open(img_path).convert("RGB")

        # Generate Label
        target = generate_target(self._label_path(idx))

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of files found in the image folder."""
        return len(self.imgs)

# transforms.Compose calls the transforms in its list one after another;
# ToTensor is the only step here and turns the loaded image into a torch.Tensor.
data_transform = transforms.Compose([transforms.ToTensor()])

def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``,
    so the items are grouped without being stacked into one tensor.
    """
    transposed = zip(*batch)
    return tuple(transposed)

# Training and test datasets read from the image folders below.
dataset = TrafficDataset(
    data_transform,
    'drive/MyDrive/Colab Notebooks/dataset/images/',
)
test_dataset = TrafficDataset(
    data_transform,
    'drive/MyDrive/Colab Notebooks/dataset/test_images/',
)

# NOTE(review): no shuffle argument is passed, so the training loader walks
# the sorted file list in the same order every epoch -- confirm that is intended.
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=8,
    collate_fn=collate_fn,
)
test_data_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=8,
    collate_fn=collate_fn,
)

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'drive/MyDrive/Colab Notebooks/dataset/images/'

# Import Model

In [4]:
def get_model(num_classes):
    """Create a pretrained Faster R-CNN (ResNet-50 FPN) with a new box head.

    Args:
        num_classes: Number of output classes for the replacement
            ``FastRCNNPredictor``.

    Returns:
        The detector, with ``roi_heads.box_predictor`` swapped for a fresh
        predictor that takes the same number of input features.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

# Transfer learning

In [5]:
# generate_label produces the ids 1 and 2, so the head is built with 3 outputs
# (presumably id 0 is the background class -- confirm).
model = get_model(3)

# Prefer the GPU when one is visible, otherwise run on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)



FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [6]:
# Build the path from its parts so it resolves on every OS; the previous
# literal used Windows backslashes and only worked there.
checkpoint_path = os.path.join("..", "weight", "checkpoint_epoch_171.pth")
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted
# source. map_location keeps the tensors on the CPU regardless of where they
# were saved.
checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))

model.load_state_dict(checkpoint['model_state_dict'])

<All keys matched successfully>

In [7]:
# Move the model (with the loaded weights) to the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [None]:
num_epochs = 100
# Hand only the parameters that still require gradients to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005,
)

In [None]:
# Epoch number given to the first iteration of this run; it is used for the
# log line, the checkpoint file name and the 'epoch' field of the checkpoint.
first_epoch = 29
checkpoint_dir = "drive/MyDrive/Khoa-luan/checkpoint"
# torch.save does not create missing folders, so make sure the target exists
# before an epoch of training is spent.
os.makedirs(checkpoint_dir, exist_ok=True)

print('----------------------train start--------------------------')
for epoch in range(num_epochs):
    start = time.time()
    model.train()
    epoch_number = epoch + first_epoch
    epoch_loss = 0.0

    for imgs, annotations in data_loader:
        imgs = [img.to(device) for img in imgs]
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        # In training mode the model returns a dict of losses.
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        # .item() adds a plain float; summing the tensor itself would keep
        # autograd history alive across iterations and store a tensor in the
        # checkpoint.
        epoch_loss += losses.item()

    print(f'epoch : {epoch_number}, Loss : {epoch_loss}, time : {time.time() - start}')

    # Save checkpoint
    checkpoint_path = f"{checkpoint_dir}/checkpoint_epoch_{epoch_number}.pth"
    torch.save({
        'epoch': epoch_number,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': epoch_loss,
    }, checkpoint_path)

In [None]:
# Store only the model weights; the file name carries the configured epoch count.
torch.save(
    model.state_dict(),
    f'model_{num_epochs}.pt',
)

In [8]:
def make_prediction(model, img, threshold):
    """Run the detector and keep only detections scoring above ``threshold``.

    Args:
        model: Detector; it is switched to eval mode and called as
            ``model(img)``, which must return one dict per image with
            ``'boxes'``, ``'labels'`` and ``'scores'`` tensors.
        img: Input passed to the model unchanged.
        threshold: Detections with ``score > threshold`` are kept.

    Returns:
        The list returned by the model, with the three tensors of every
        entry filtered in place.
    """
    model.eval()
    preds = model(img)
    for pred in preds:
        # One boolean mask replaces the per-score Python loop (and the index
        # variable that shadowed the builtin ``id``).
        keep = pred['scores'] > threshold
        for key in ('boxes', 'labels', 'scores'):
            pred[key] = pred[key][keep]

    return preds

# Me

In [10]:
from PIL import Image
import torchvision.transforms as T

def predict_image(image_path, model, device, detection_threshold):
    """Run the detector on one image file.

    Args:
        image_path: Path of the image to load.
        model: Detector passed on to ``make_prediction``.
        device: Device the image tensor is moved to before inference.
        detection_threshold: Score threshold passed to ``make_prediction``.

    Returns:
        The filtered predictions from ``make_prediction`` for a batch that
        contains just this image.
    """
    # Force three channels, as TrafficDataset does: grayscale, palette or
    # RGBA files would otherwise produce tensors the 3-channel model rejects.
    img = Image.open(image_path).convert("RGB")

    # Convert to tensor
    img = T.ToTensor()(img)

    # The model takes a list of image tensors; this batch holds one image.
    batch = [img.to(device)]

    # Make the prediction without tracking gradients
    with torch.no_grad():
        pred = make_prediction(model, batch, detection_threshold)

    return pred

# Build the path from its parts so it resolves on every OS; the previous
# literal used Windows backslashes and only worked there.
image_path = os.path.join("..", "input", "test5.jpg")
predictions = predict_image(image_path, model, device, 0.5)
print(predictions)

[{'boxes': tensor([[249.0571, 119.7171, 320.7277, 206.8495],
        [319.1904,  58.2207, 337.5952,  81.7413],
        [  1.3798, 127.2829, 129.2709, 288.0000],
        [164.5637, 120.5090, 250.7750, 185.8293],
        [118.8279, 117.3995, 183.6174, 174.8367],
        [207.5799,  62.1505, 244.1538, 113.1640],
        [187.0535,  95.1887, 206.5218, 127.1008],
        [368.0802, 148.4127, 383.2691, 190.4060],
        [172.7169,  92.6391, 186.4944, 119.7706],
        [154.0199,  87.2254, 168.4612, 118.6675],
        [ 30.1839, 105.5751,  56.1299, 137.7566],
        [363.0086, 106.5690, 374.5385, 137.1214],
        [122.2572, 196.3792, 212.3884, 265.6895],
        [391.4474, 159.4713, 408.0204, 199.8583],
        [331.9738, 178.7650, 353.0315, 231.8273],
        [259.7886,  41.7615, 287.3807,  79.8549],
        [324.8332, 122.9944, 337.9163, 162.3100],
        [237.9453,  51.9720, 263.2780,  98.2543],
        [309.7321, 201.9450, 332.3269, 267.3112],
        [364.0207, 193.1598, 385.2540, 

In [26]:
import cv2
import numpy as np


def plot_image_from_output(image_path, predictions, scale_percent=150):
    """Show an image file with its predicted boxes in an OpenCV window.

    Args:
        image_path: Path of the image to read with ``cv2.imread``.
        predictions: Sequence whose first entry is a mapping with ``'boxes'``
            (rows of xmin, ymin, xmax, ymax), ``'labels'`` and ``'scores'``.
        scale_percent: Display size as a percentage of the original size;
            box coordinates are scaled by the same factor.

    Raises:
        FileNotFoundError: If the image cannot be read.

    The call blocks in ``cv2.waitKey(0)`` until a key is pressed, then closes
    all OpenCV windows.
    """
    # cv2.imread returns None instead of raising when the file is missing or
    # unreadable, which would otherwise fail later with an unrelated error.
    img = cv2.imread(image_path)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    # The image stays in BGR order: cv2.imshow displays BGR, so converting to
    # RGB first would show red and blue swapped.

    # Resize image
    width = int(img.shape[1] * scale_percent / 100)
    height = int(img.shape[0] * scale_percent / 100)
    # INTER_AREA is meant for shrinking; use INTER_LINEAR when enlarging.
    interpolation = cv2.INTER_AREA if scale_percent < 100 else cv2.INTER_LINEAR
    img = cv2.resize(img, (width, height), interpolation=interpolation)

    # For each prediction
    detections = predictions[0]
    for box, label, score in zip(detections['boxes'], detections['labels'], detections['scores']):
        xmin, ymin, xmax, ymax = [int(x * scale_percent / 100) for x in box]
        # Colours are BGR tuples: green for label 1, red for everything else.
        color = (0, 255, 0) if label == 1 else (0, 0, 255)
        thickness = 1  # thin outline so small boxes stay readable
        img = cv2.rectangle(img, (xmin, ymin), (xmax, ymax), color, thickness)
        font_scale = 0.4
        # Keep the caption inside the image for boxes that touch the top edge.
        text_origin = (xmin, max(ymin - 10, 10))
        img = cv2.putText(img, f'{int(label)}: {float(score):.2f}', text_origin,
                          cv2.FONT_HERSHEY_SIMPLEX, font_scale, (36, 255, 12), 2)

    # Display the image
    cv2.imshow('Image', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

# Show the detections for the image predicted in the earlier cell.
plot_image_from_output(image_path=image_path, predictions=predictions)

In [None]:
with torch.no_grad():
    # Only the first batch of the test loader is used here, hence the break.
    for imgs, annotations in test_data_loader:
        imgs = [img.to(device) for img in imgs]

        pred = make_prediction(model, imgs, 0.5)
        print(pred)
        break

In [None]:
_idx = 1
# imgs[_idx] is an image tensor from the data loader. plot_image_from_output1
# is the helper that accepts a tensor (it calls img.cpu().permute(...)); both
# plot_image_from_output definitions pass their first argument to cv2 and
# cannot take a tensor.
print("Target : ", annotations[_idx]['labels'])
plot_image_from_output1(imgs[_idx], annotations[_idx])
print("Prediction : ", pred[_idx]['labels'])
# The predictions live on `device`; copy them to the CPU for matplotlib.
plot_image_from_output1(imgs[_idx], {k: v.cpu() for k, v in pred[_idx].items()})

Next, we evaluate the inference results on the entire test set. First, the predictions and the ground-truth annotations of every test batch are collected in `preds_adj_all` and `annot_all`, respectively.

In [None]:
from tqdm import tqdm

labels = []          # every ground-truth label of the test set, flattened
preds_adj_all = []   # one list of filtered predictions per batch
annot_all = []       # the matching ground-truth annotations per batch

for im, annot in tqdm(test_data_loader, position = 0, leave = True):
    im = [img.to(device) for img in im]

    for t in annot:
        labels.extend(t['labels'])

    with torch.no_grad():
        preds_adj = make_prediction(model, im, 0.5)

    # Copy the predictions to the CPU before storing them.
    preds_adj = [{k: v.to(torch.device('cpu')) for k, v in t.items()} for t in preds_adj]
    preds_adj_all.append(preds_adj)
    annot_all.append(annot)

In [None]:
import utils

In [None]:
# Gather the per-sample statistics of every batch at IoU threshold 0.5.
sample_metrics = []
for batch_i in range(len(preds_adj_all)):
    batch_stats = utils.get_batch_statistics(
        preds_adj_all[batch_i], annot_all[batch_i], iou_threshold=0.5
    )
    sample_metrics.extend(batch_stats)

# Regroup the per-sample triples by field and concatenate each field over all batches.
true_positives, pred_scores, pred_labels = [
    torch.cat(field, 0) for field in zip(*sample_metrics)
]
precision, recall, AP, f1, ap_class = utils.ap_per_class(
    true_positives, pred_scores, pred_labels, torch.tensor(labels)
)
mAP = torch.mean(AP)
print(f'mAP : {mAP}')
print(f'AP : {AP}')