Note: Before running this notebook, download the dataset from https://github.com/Armin1337/RebarDSC into your project directory, so that it is available as `./RebarDSC`.

In [None]:
# Install Dependencies

%pip install torch==2.2.0 torchvision==0.17.0 -f https://download.pytorch.org/whl/torch_stable.html
%pip install opencv-python
%pip install pycocotools
%pip install pandas

In [None]:
# Imports

import importlib
import os
import random
import shutil
import urllib.request
from importlib import reload

# The torchvision detection reference scripts (engine, utils, ...) are plain
# files that must sit next to the notebook. Fetch them BEFORE `import engine`
# below; on a fresh kernel in a clean directory the import would otherwise
# fail. Files that already exist are kept, so a re-run needs no network access.
# urlretrieve raises on a failed download instead of failing silently.
REFERENCE_SCRIPTS_URL = "https://raw.githubusercontent.com/pytorch/vision/main/references/detection"
REFERENCE_SCRIPTS = ("engine.py", "utils.py", "coco_utils.py", "coco_eval.py", "transforms.py")

for script_name in REFERENCE_SCRIPTS:
    if not os.path.exists(script_name):
        urllib.request.urlretrieve(f"{REFERENCE_SCRIPTS_URL}/{script_name}", script_name)

# Modules were just created on disk while the interpreter is running; make
# sure the import system notices them.
importlib.invalidate_caches()

import cv2
import numpy as np
import pandas as pd
import pycocotools.cocoeval
import torch
import torchvision
from matplotlib import pyplot
from PIL import Image
from torch.optim.lr_scheduler import StepLR
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

import engine
from engine import train_one_epoch, evaluate

# Configuration
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available
MODEL_DIR = "./model"        # checkpoints are written to / read from here
DATASET_DIR = "./RebarDSC"   # root folder of the downloaded dataset

In [None]:
# Sanity check: draw the annotated boxes of one training image

key = "861_20MM"
train_df = pd.read_csv(f"{DATASET_DIR}/annotations/100_percent_train.csv", header=None)

# Keep only the annotation rows whose first column names the chosen image;
# the second column holds one box string per row.
is_selected_image = train_df[0] == f"rebar_{key}.jpg"
train_df = train_df[is_selected_image]
str_boxes = train_df[1].tolist()

img = Image.open(f"{DATASET_DIR}/images/rebar_{key}.jpg").convert("RGB")
width_px, height_px = img.size
# Figure size (in inches) is the pixel size divided by 400.
pyplot.figure(figsize=(width_px / 400.0, height_px / 400.0))

# Convert to a numpy array so OpenCV can draw on it in place.
img = np.array(img)
for str_box in str_boxes:
  # Each box string is four whitespace-separated integers.
  xmin, ymin, xmax, ymax = map(int, str_box.split())
  cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 4)

pyplot.imshow(img)
pyplot.show()

In [None]:
# Dataset 

class RebarDataset(torch.utils.data.Dataset):
  """Object-detection dataset backed by a header-less annotation CSV.

  Column 0 of the CSV is an image file name and column 1 is a box string of
  whitespace-separated integers, read here as "xmin ymin xmax ymax". Every
  row carries one box; an image appears once per box. Images are loaded from
  ``{DATASET_DIR}/images``.

  All target tensors are created on the CPU. Moving them to the training
  device is left to the training loop, which keeps the target dict on a
  single device.
  """

  def __init__(self, csv_path, transforms=None):
    """Read the annotations.

    Args:
      csv_path: path of the annotation CSV (no header row).
      transforms: optional callable ``(img, target) -> (img, target)``
        applied to every sample.
    """
    self.df = pd.read_csv(csv_path, header=None)
    # Unique image names in order of first appearance; the sample index is
    # the position in this list.
    self.imgs = self.df[0].unique().flatten().tolist()
    self.transforms = transforms

  @staticmethod
  def _parse_boxes(str_boxes):
    """Turn box strings into a float32 tensor, dropping degenerate boxes."""
    valid_boxes = []
    for str_box in str_boxes:
      b = [int(c) for c in str_box.split()]
      # Skip boxes with zero or negative width/height.
      if b[0] < b[2] and b[1] < b[3]:
        valid_boxes.append(b)
    if not valid_boxes:
      # Keep the column dimension so `boxes[:, i]` works for empty images.
      return torch.zeros((0, 4), dtype=torch.float32)
    return torch.as_tensor(valid_boxes, dtype=torch.float32)

  @staticmethod
  def _make_target(idx, boxes):
    """Build the target dict (image_id, boxes, labels, area, iscrowd)."""
    num_boxes = len(boxes)
    return {
        "image_id": idx,
        "boxes": boxes,
        # Every box gets label 1.
        "labels": torch.ones((num_boxes,), dtype=torch.int64),
        "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
        "iscrowd": torch.zeros((num_boxes,), dtype=torch.int64),
    }

  def __getitem__(self, idx):
    """Return ``(img, target)`` for the idx-th unique image.

    ``img`` is an RGB PIL image unless ``transforms`` converts it.
    """
    img_name = self.imgs[idx]
    img = Image.open(f"{DATASET_DIR}/images/{img_name}").convert("RGB")
    str_boxes = self.df[self.df[0] == img_name][1].values.flatten().tolist()
    target = self._make_target(idx, self._parse_boxes(str_boxes))

    if self.transforms:
      img, target = self.transforms(img, target)

    return img, target

  def __len__(self):
    """Number of unique images."""
    return len(self.imgs)

  def get_ids(self):
    """Return every valid sample index as a list."""
    return list(range(len(self.imgs)))
       

In [None]:
# Loaders

import transforms as T

def get_transform(train):
  """Build the (image, target) transform pipeline.

  The image is always converted from PIL to a tensor and then to a scaled
  float tensor. When ``train`` is truthy, a horizontal flip applied with
  probability 0.5 is appended as augmentation.
  """
  pipeline = [T.PILToTensor(), T.ToDtype(torch.float, scale=True)]
  if not train:
    return T.Compose(pipeline)
  pipeline.append(T.RandomHorizontalFlip(0.5))
  return T.Compose(pipeline)

def collate_fn(batch):
    """Regroup a list of samples into one tuple per sample field.

    A batch ``[(img0, tgt0), (img1, tgt1)]`` becomes
    ``((img0, img1), (tgt0, tgt1))``; nothing is stacked.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Training data: augmented samples, shuffled, two images per batch.
train_csv_path = f"{DATASET_DIR}/annotations/100_percent_train.csv"
train_full = RebarDataset(train_csv_path, get_transform(train=True))
# The Subset is built over every index returned by get_ids().
train_dataset = torch.utils.data.Subset(train_full, train_full.get_ids())
train_data_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=collate_fn,
)

# Test data: no augmentation, fixed order, one image per batch.
test_csv_path = f"{DATASET_DIR}/annotations/test.csv"
test_full = RebarDataset(test_csv_path, get_transform(train=False))
test_dataset = torch.utils.data.Subset(test_full, test_full.get_ids())
test_data_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# Train model

# Start from an empty checkpoint directory: any previous contents of
# MODEL_DIR are deleted.
if os.path.exists(MODEL_DIR):
  shutil.rmtree(MODEL_DIR)
os.makedirs(MODEL_DIR)

# `weights="DEFAULT"` replaces the deprecated `pretrained=True` flag.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn_v2(
    weights="DEFAULT", progress=True)
model.roi_heads.detections_per_img = 1000
# Swap the box predictor for one with 2 output classes.
model.roi_heads.box_predictor = FastRCNNPredictor(
    model.roi_heads.box_predictor.cls_score.in_features, 2)
model.to(DEVICE)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=3e-4)
# The learning rate is multiplied by 0.06 after every epoch.
lr_scheduler = StepLR(optimizer, step_size=1, gamma=0.06)

for epoch in range(5):
    train_one_epoch(model, optimizer, train_data_loader, DEVICE, epoch, print_freq=50)
    lr_scheduler.step()
    evaluate(model, test_data_loader, device=DEVICE)

    # Save a CPU copy of the weights instead of moving the whole model to the
    # CPU and back while the optimizer is alive. Reassigning entries of the
    # dict returned by state_dict() does not touch the model's parameters.
    checkpoint = model.state_dict()
    for tensor_name, tensor in checkpoint.items():
        checkpoint[tensor_name] = tensor.cpu()
    torch.save(checkpoint, f"{MODEL_DIR}/model_{epoch}.pth")

In [None]:
# Look at the results

SCORE_THRESHOLD = 0.75   # only detections scoring above this are counted
NUM_PREVIEW_IMAGES = 5   # number of random test images to display

# All weights are loaded from the checkpoint below, so build the network
# without pretrained weights instead of fetching ones that get overwritten.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn_v2(
    weights=None, weights_backbone=None, progress=True)
model.roi_heads.detections_per_img = 1000
model.roi_heads.box_predictor = FastRCNNPredictor(
    model.roi_heads.box_predictor.cls_score.in_features, 2)
model.to(DEVICE)

# Find the newest `model_<epoch>.pth`, ignoring any other file that may live
# in MODEL_DIR (e.g. editor or OS metadata files).
checkpoint_epochs = []
for file_name in os.listdir(MODEL_DIR):
    stem, extension = os.path.splitext(file_name)
    prefix, _, epoch_text = stem.partition("_")
    if prefix == "model" and extension == ".pth" and epoch_text.isdecimal():
        checkpoint_epochs.append(int(epoch_text))
if not checkpoint_epochs:
    raise FileNotFoundError(
        f"No model_<epoch>.pth checkpoint found in {MODEL_DIR}; run the training cell first.")
epoch = max(checkpoint_epochs)

# weights_only=True restricts unpickling to tensors and plain containers.
model.load_state_dict(
    torch.load(f"{MODEL_DIR}/model_{epoch}.pth", map_location=DEVICE, weights_only=True))
model.eval()

df = pd.read_csv(f"{DATASET_DIR}/annotations/test.csv", header=None)
img_list = df[0].unique().flatten().tolist()
random.shuffle(img_list)

to_tensor = torchvision.transforms.ToTensor()

for i, name in enumerate(img_list[:NUM_PREVIEW_IMAGES]):
    # Annotated boxes for this image; degenerate boxes are skipped.
    str_boxes = df[df[0] == name][1].values.flatten().tolist()
    boxes = []
    for str_box in str_boxes:
        b = [int(c) for c in str_box.split()]
        if b[0] < b[2] and b[1] < b[3]:
            boxes.append(b)

    image_src = Image.open(f"{DATASET_DIR}/images/{name}").convert("RGB")
    img = np.array(image_src)  # writable copy for drawing
    for xmin, ymin, xmax, ymax in boxes:
        cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 4)

    with torch.no_grad():
        result_dict = model([to_tensor(image_src).to(DEVICE)])
    pred_boxes = result_dict[0]["boxes"].cpu().numpy()
    scores = result_dict[0]["scores"].cpu().numpy()

    rebar_count = 0
    for pred_box, score in zip(pred_boxes, scores):
        if score > SCORE_THRESHOLD:
            rebar_count += 1
            # Filled circle at the box centre; radius is 60% of half the box width.
            center = (int((pred_box[0] + pred_box[2]) * 0.5),
                      int((pred_box[1] + pred_box[3]) * 0.5))
            radius = int((pred_box[2] - pred_box[0]) * 0.5 * 0.6)
            cv2.circle(img, center, radius, (255, 0, 0), -1)

    print("Rebar count:", rebar_count, f"(red circles) vs {len(boxes)} expected (green boxes)")
    pyplot.figure(i, figsize=(10, 10))
    pyplot.imshow(img)
    pyplot.show()