In [None]:
import os
import sys
import cv2
import torch
import torchvision

from tl_dataset import TLDataset

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# load a model pre-trained pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

In [None]:
# replace the classifier with a new one, that has
# num_classes which is user-defined
num_classes = 2  # 1 class (traffic light) + background
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [None]:
from engine import train_one_epoch, evaluate
import utils

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = 2

list_fpath = './apollo_tl_demo_data/trainsets/list'
dataset = TLDataset(list_fpath)
dataset_test = TLDataset(list_fpath)

# split the dataset in train and test set
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-20])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-80:])

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=1,
    collate_fn=utils.collate_fn)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=1, shuffle=False, num_workers=1,
    collate_fn=utils.collate_fn)

model.to(device)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.01,
                            momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                               step_size=6,
                                               gamma=0.1)

# let's train it for 10 epochs
num_epochs = 10

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    evaluate(model, data_loader_test, device=device)

print("Done!")

# save model
TL_MODEL_PATH = 'tl_detect_model.pth'
torch.save(model.state_dict(), TL_MODEL_PATH)

In [None]:
from torchvision import transforms
from PIL import Image
import cv2
import numpy as np

# model.load_state_dict(torch.load(TL_MODEL_PATH))
model.eval()
trans = transforms.Compose([transforms.ToTensor()])

test_img_dir = './apollo_tl_demo_data/testsets/images/'

for i in range(100):
    test_img_path = '{}/{:0>5d}.jpg'.format(test_img_dir, i)
    test_img = Image.open(test_img_path).convert("RGB")
    cv_img = cv2.cvtColor(np.asarray(test_img),cv2.COLOR_RGB2BGR)
    x = [trans(test_img).to(device)]
    predictions = model(x)
    # print(predictions)
    num_boxes = predictions[0]['scores'].size()[0]
    for j in range(num_boxes):
        xmin = int(predictions[0]['boxes'][j][0].item())
        ymin = int(predictions[0]['boxes'][j][1].item())
        xmax = int(predictions[0]['boxes'][j][2].item())
        ymax = int(predictions[0]['boxes'][j][3].item())
        confidence = float(predictions[0]['scores'][j].item())
        print(xmin, ymin, xmax, ymax)
        print(confidence)
        if confidence < 0.7:
            continue
        cv2.rectangle(cv_img, (xmin, ymin), (xmax, ymax), (0,0,255), 2)
    cv2.imwrite('./vis_det/{:0>5d}.jpg'.format(i), cv_img)