In [None]:
# Mount Google Drive under /content/drive; later cells read the annotation file
# and the dataset from, and write masks to, /content/drive/MyDrive/...
from google.colab import drive
drive.mount('/content/drive')

## Mask Image


In [None]:
%matplotlib inline
from pycocotools.coco import COCO
import numpy as np
import skimage.io as io
import matplotlib.pyplot as plt
import pylab
from PIL import Image
# Default matplotlib figure size (width, height) used by the plots below.
pylab.rcParams['figure.figsize'] = (8.0, 10.0)

In [None]:
# Local folder (under /) and COCO split name. `dataType` is also the name of
# the image folder the mask-generation cell reads from.
dataDir = 'content'
dataType = 'val2017'

# Annotation file actually used by this notebook: a filtered COCO instances
# file stored on Drive. (The original cell first built a path from
# dataDir/dataType and then immediately overwrote it; that dead assignment
# has been dropped.)
annFile = '/content/drive/MyDrive/filtered.json'

In [None]:
# initialize COCO api for instance annotations
coco=COCO(annFile)
# Fetch every category id in the annotation file and load the matching
# category records so they can be inspected.
catIDs = coco.getCatIds()
cats = coco.loadCats(catIDs)
print(cats)

In [None]:
# Load the record of every image listed in the annotation file; the
# mask-generation loop reads 'file_name', 'id', 'height' and 'width' from each.
imgIds = coco.getImgIds()
images = coco.loadImgs(imgIds)
print(len(images))

In [None]:
import os  # imported here because the notebook-wide `import os` lives in a later cell

# Build one instance-id mask PNG per image: pixel value 0 is background and
# pixel value k belongs to the k-th annotation returned by coco.loadAnns().
# VehicleDataset relies on that numbering to look up category_id / iscrowd.
for img in images:
  filename = '/content/{}/{}'.format(dataType, img['file_name'])
  print(img['file_name'])
  I = io.imread(filename)

  annIds = coco.getAnnIds(imgIds=img['id'], iscrowd=None)
  anns = coco.loadAnns(annIds)
  if len(anns) > 255:
    # The mask is stored as uint8, so instance ids above 255 cannot be encoded.
    raise ValueError('{} has {} annotations; a uint8 mask can encode at most 255'
                     .format(img['file_name'], len(anns)))

  # Preview the image with its annotations. plt.show() finishes the figure so
  # each image is rendered on its own instead of piling onto the same axes.
  plt.imshow(I)
  plt.axis('off')
  coco.showAnns(anns)
  plt.show()

  mask = np.zeros((img['height'], img['width']), dtype=np.uint8)
  for instance_id, ann in enumerate(anns, start=1):
    print(ann)
    # Where instances overlap, the one with the larger id wins.
    mask = np.maximum(mask, coco.annToMask(ann) * np.uint8(instance_id))

  # File name without its extension, e.g. '0001.jpg' -> '0001'.
  savepath = os.path.splitext(img['file_name'])[0]
  print(savepath)
  Image.fromarray(mask).save(
      '/content/drive/MyDrive/vehicleDataset/vehicleMasks/{}_mask.png'.format(savepath))

## Train model

In [None]:
import os
import numpy as np
import torch
import torch.utils.data
from PIL import Image
from pycocotools.coco import COCO

In [None]:
class VehicleDataset(torch.utils.data.Dataset):
  """Instance-segmentation dataset pairing images with instance-id mask PNGs.

  Sample ``idx`` is assembled from three lists that are paired purely by
  position after sorting:

  * ``<root>/JPGImages``    - the input images,
  * ``<root>/vehicleMasks`` - single-channel masks where 0 is background and
    pixel value k marks the k-th annotation of the image,
  * the sorted image ids of the COCO annotation file (minus the excluded ids).

  ``__getitem__`` returns ``(img, target)`` where ``target`` holds the keys
  ``boxes``, ``labels``, ``masks``, ``image_id``, ``area`` and ``iscrowd``.
  """

  # Image ids removed from the annotation index by default. These are the ids
  # the original notebook removed by hand; presumably they have no matching
  # image/mask file - TODO confirm.
  DEFAULT_EXCLUDED_IMG_IDS = (52412, 183246, 210273, 344888, 336232, 426372, 460147)

  def __init__(self, root, transforms=None,
               ann_file='/content/drive/MyDrive/filtered.json',
               exclude_img_ids=DEFAULT_EXCLUDED_IMG_IDS):
    """Index the image folder, the mask folder and the annotation file.

    Args:
      root: dataset directory containing ``JPGImages`` and ``vehicleMasks``.
      transforms: optional callable applied as ``transforms(img, target)``.
      ann_file: path of the COCO-format annotation file.
      exclude_img_ids: image ids to leave out of the annotation index. Ids
        that are not present in the annotation file are ignored.
    """
    import warnings

    self.root = root
    self.transforms = transforms
    # Sort both listings so that image i and mask i belong together.
    self.imgs = sorted(os.listdir(os.path.join(root, "JPGImages")))
    self.masks = sorted(os.listdir(os.path.join(root, "vehicleMasks")))

    self.coco = COCO(ann_file)
    excluded = set(exclude_img_ids)
    self.sort_imgIds = [img_id for img_id in sorted(self.coco.getImgIds())
                        if img_id not in excluded]

    # The three lists are matched by index only, so a length mismatch means
    # samples would be paired with the wrong mask or annotations.
    if not (len(self.imgs) == len(self.masks) == len(self.sort_imgIds)):
      warnings.warn(
          'VehicleDataset index mismatch: {} images, {} masks, {} annotated image ids'
          .format(len(self.imgs), len(self.masks), len(self.sort_imgIds)))

  def __len__(self):
    """Number of samples, i.e. the number of files in ``JPGImages``."""
    return len(self.imgs)

  def __getitem__(self, idx):
    """Load sample ``idx`` and build its detection/segmentation target.

    Returns:
      ``(img, target)``; both are passed through ``self.transforms`` when set.
    """
    img_path = os.path.join(self.root, "JPGImages", self.imgs[idx])
    mask_path = os.path.join(self.root, "vehicleMasks", self.masks[idx])
    img = Image.open(img_path).convert("RGB")

    annIds = self.coco.getAnnIds(imgIds=self.sort_imgIds[idx])
    anns = self.coco.loadAnns(annIds)

    # The mask is deliberately not converted to RGB: every pixel value is an
    # instance id, with 0 being background.
    mask = np.array(Image.open(mask_path))

    # Drop the background id explicitly. Slicing off the first unique value
    # would discard a real instance whenever the mask has no background pixel.
    obj_ids = np.unique(mask)
    obj_ids = obj_ids[obj_ids != 0]

    # One boolean mask per instance, shape (num_instances, H, W).
    masks = mask == obj_ids[:, None, None]

    boxes = []
    labels = []
    iscrowd = []
    for obj_id, obj_mask in zip(obj_ids, masks):
      # Instance id k was written for the k-th annotation (1-based).
      ann = anns[int(obj_id) - 1]
      labels.append(ann['category_id'])
      iscrowd.append(ann['iscrowd'])
      pos = np.nonzero(obj_mask)
      ys, xs = pos[0], pos[1]
      boxes.append([int(xs.min()), int(ys.min()), int(xs.max()), int(ys.max())])

    # reshape keeps the (N, 4) layout even when the image has no instances,
    # so the area computation below does not fail on an empty sample.
    boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
    masks = torch.as_tensor(masks, dtype=torch.uint8)
    labels = torch.as_tensor(labels, dtype=torch.int64)
    iscrowd = torch.as_tensor(iscrowd, dtype=torch.int64)

    image_id = torch.tensor([idx])
    area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

    target = {}
    target["boxes"] = boxes
    target["labels"] = labels
    target["masks"] = masks
    target["image_id"] = image_id
    target["area"] = area
    target["iscrowd"] = iscrowd

    if self.transforms is not None:
      img, target = self.transforms(img, target)

    return img, target

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

      
def build_model(num_classes):
    """Return a COCO-pretrained Mask R-CNN whose heads predict ``num_classes``.

    Both the box predictor and the mask predictor of the pretrained
    ``maskrcnn_resnet50_fpn`` model are replaced by freshly created predictors
    sized for ``num_classes`` outputs; the rest of the network is kept.
    """
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)
    heads = model.roi_heads

    # Box branch: new predictor fed by the same number of input features
    # as the pretrained classifier.
    box_in_features = heads.box_predictor.cls_score.in_features
    heads.box_predictor = FastRCNNPredictor(box_in_features, num_classes)

    # Mask branch: new predictor fed by the same number of input channels
    # as the pretrained one, with a 256-channel hidden layer.
    mask_in_channels = heads.mask_predictor.conv5_mask.in_channels
    mask_hidden_channels = 256
    heads.mask_predictor = MaskRCNNPredictor(mask_in_channels,
                                             mask_hidden_channels,
                                             num_classes)

    return model

In [None]:
# Fetch torchvision's detection reference scripts (pinned to tag v0.3.0) and
# copy the helper modules one directory up so the next cell can import
# engine / utils / transforms.
# NOTE(review): `%cd vision` changes the working directory and nothing in this
# cell changes it back, so relative paths used afterwards would resolve inside
# vision/ - confirm this is intended.
!git clone https://github.com/pytorch/vision.git
%cd vision
!git checkout v0.3.0

!cp references/detection/utils.py ../
!cp references/detection/transforms.py ../
!cp references/detection/coco_eval.py ../
!cp references/detection/engine.py ../
!cp references/detection/coco_utils.py ../

In [None]:
from engine import train_one_epoch, evaluate
import utils
import transforms as T


def get_transform(train):
    """Build the transform pipeline for the dataset.

    The pipeline always starts with ``T.ToTensor()``; when ``train`` is truthy
    a ``T.RandomHorizontalFlip(0.5)`` augmentation step is added after it.
    """
    steps = [T.ToTensor()]
    if train:
        # augmentation is applied to the training split only
        steps.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(steps)

# Two views of the same folder: one with training-time augmentation,
# one without (used for evaluation).
dataset = VehicleDataset('/content/drive/MyDrive/vehicleDataset', get_transform(train=True))
dataset_test = VehicleDataset('/content/drive/MyDrive/vehicleDataset', get_transform(train=False))

# Reproducible random split: the last 50 shuffled indices are held out for
# evaluation, everything before them is used for training.
torch.manual_seed(1)
indices = torch.randperm(len(dataset)).tolist()
train_indices, test_indices = indices[:-50], indices[-50:]
dataset = torch.utils.data.Subset(dataset, train_indices)
dataset_test = torch.utils.data.Subset(dataset_test, test_indices)

# Both loaders share the worker count and the collate function from the
# torchvision detection references.
loader_options = dict(num_workers=4, collate_fn=utils.collate_fn)

data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, **loader_options)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=1, shuffle=False, **loader_options)

In [None]:
# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Five outputs: background plus the four vehicle names listed in CLASS_NAMES
# in the inference section (car, motorcycle, bus, truck).
# NOTE(review): this assumes the category ids in the annotation file are
# already 1..4 - confirm, since VehicleDataset uses category_id as the label.
num_classes = 5

# Build the network with the helper defined above and move it to the device.
model = build_model(num_classes)
model.to(device)

# SGD over the trainable parameters only.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Multiply the learning rate by 0.1 every 3 epochs.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
# number of epochs
# Total number of passes over the training loader.
num_epochs = 20

# Each epoch: train, then step the LR scheduler, then evaluate on the
# held-out loader. The order matters: the scheduler is stepped only after
# the optimizer has been used for that epoch.
for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    evaluate(model, data_loader_test, device=device)

In [None]:
# Save the whole model object to Drive, at the exact path the inference
# section loads it from. The original relative path wrote the checkpoint into
# the current working directory, which the loading cell never reads.
torch.save(model, '/content/drive/MyDrive/mask-rcnn-vehicle2.pt')

## Apply model

In [None]:
%matplotlib inline
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torchvision.transforms as T
import torchvision
import numpy as np

import cv2
import random
import warnings
warnings.filterwarnings('ignore')

In [None]:
PATH = '/content/drive/MyDrive/mask-rcnn-vehicle2.pt'
# Label names indexed by the class id the model predicts.
CLASS_NAMES = ['__background__', 'car','motorcycle','bus','truck']
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# map_location lets a checkpoint that was saved on a GPU load on a CPU-only
# runtime as well.
# NOTE: torch.load unpickles the file, which can execute arbitrary code -
# only load checkpoints you created yourself.
model = torch.load(PATH, map_location=device)
model.to(device)
# Inference mode: the model returns predictions instead of losses.
model.eval()

In [None]:
def get_centroid(x1, y1, x2, y2):
    """Return the integer centre ``(cx, cy)`` of the box (x1, y1)-(x2, y2).

    Half the width/height is taken with floor division before it is added
    to the top-left corner, and the sum is truncated to ``int``.
    """
    half_width = (x2 - x1) // 2
    half_height = (y2 - y1) // 2
    return int(x1 + half_width), int(y1 + half_height)

def get_coloured_mask(mask):
    """
    get_coloured_mask
      parameters:
        - mask - binary mask of one predicted object (truthy / 1 inside the object)
      method:
        - one colour is drawn at random from a fixed palette
        - pixels where the mask equals 1 get that colour, all others stay black
      returns:
        - uint8 array with the three colour channels stacked on axis 2
    """
    colours = [[0, 255, 0],[0, 0, 255],[255, 0, 0],[0, 255, 255],[255, 255, 0],[255, 0, 255],[80, 70, 180],[250, 80, 190],[245, 145, 50],[70, 150, 250],[50, 190, 190]]
    # random.choice covers the whole palette; the previous randrange(0, 10)
    # could never select the last of the eleven colours.
    colour = random.choice(colours)
    inside = np.asarray(mask) == 1
    channels = [np.where(inside, value, 0).astype(np.uint8) for value in colour]
    return np.stack(channels, axis=2)

def get_prediction(img, confidence):
    """
    get_prediction
      parameters:
        - img - input image (PIL image or H x W x C array accepted by ToTensor)
        - confidence - score threshold used to keep or drop predictions
      method:
        - the image is converted to a tensor and moved to the module-level `device`
        - the module-level `model` is run on it without tracking gradients
        - predictions are kept up to and including the last one whose score
          is above `confidence`
        - soft masks are made binary (True where the value is above 0.5)
      returns:
        - masks - boolean array of shape (kept, H, W)
        - pred_boxes - list of [(x1, y1), (x2, y2)] corner pairs
        - pred_class - list of class names taken from CLASS_NAMES
        - pred_s - list of the kept scores
    """
    transform = T.Compose([T.ToTensor()])
    img = transform(img).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        pred = model([img])[0]

    scores = pred['scores'].detach().cpu().numpy()
    pred_score = list(scores)
    print(pred_score)

    # Number of predictions to keep: one past the last index above the threshold.
    above = np.flatnonzero(scores > confidence)
    keep = int(above[-1]) + 1 if above.size else 0

    # squeeze(1) removes only the channel axis. A bare squeeze() would also
    # drop the instance axis when there is exactly one detection, so the
    # slice below would cut image rows instead of instances.
    masks = (pred['masks'] > 0.5).squeeze(1).detach().cpu().numpy()
    pred_class = [CLASS_NAMES[i] for i in pred['labels'].cpu().numpy()]
    pred_boxes = [[(b[0], b[1]), (b[2], b[3])] for b in pred['boxes'].detach().cpu().numpy()]

    masks = masks[:keep]
    pred_boxes = pred_boxes[:keep]
    pred_class = pred_class[:keep]
    pred_s = pred_score[:keep]
    print(pred_s)
    print(pred_class)
    return masks, pred_boxes, pred_class, pred_s

def segment_instance(img, confidence=0.5, rect_th=2, text_size=1, text_th=2):
    """
    segment_instance
      parameters:
        - img - H x W x 3 image array; it is passed to get_prediction unchanged
          and is also the canvas that gets annotated
        - confidence - score threshold forwarded to get_prediction
        - rect_th - rectangle thickness
        - text_size - font scale of the per-object label
        - text_th - text thickness
      method:
        - predictions are obtained with get_prediction
        - each mask gets a random colour and is blended onto the image
          with weights 1 : 0.5
        - box, label, score and centroid are drawn for every kept object
        - an object whose centroid lies within `offset` pixels of the vertical
          counting line (and below `line_height`) increments the module-level
          counters num_obj / num_car / num_motorcycle / num_bus / num_truck
        - the running totals are written onto the image
      returns:
        - the annotated image
      notes:
        - there is no tracking between frames, so an object that stays near
          the line for several frames is counted once per frame
        - NOTE(review): frames read with cv2 are BGR; confirm the caller
          passes the channel order the model was trained on
    """
    global num_obj, num_car, num_motorcycle, num_bus, num_truck

    masks, boxes, pred_cls, pred_s = get_prediction(img, confidence)

    offset = 10        # half-width of the counting band around the line, in pixels
    line_height = 120  # only centroids below this y coordinate are counted
    line_width = 300   # x coordinate of the vertical counting line
    height, width = img.shape[:2]
    print('height',height,width)

    matches = []
    for mask, box, label, score in zip(masks, boxes, pred_cls, pred_s):
        img = cv2.addWeighted(img, 1, get_coloured_mask(mask), 0.5, 0)

        (x1, y1), (x2, y2) = box
        # OpenCV drawing functions need integer pixel coordinates; the model
        # returns floats.
        top_left = (int(x1), int(y1))
        bottom_right = (int(x2), int(y2))

        centroid = get_centroid(x1, y1, x2, y2)
        matches.append((centroid, label))

        cv2.circle(img, centroid, 3, (0,0,255), -1)
        cv2.rectangle(img, top_left, bottom_right, color=(0, 255, 0), thickness=rect_th)
        cv2.putText(img, label+' '+"{:.2f}".format(score), top_left,
                    cv2.FONT_HERSHEY_SIMPLEX, text_size, (0,255,0), thickness=text_th)

    # Counting line, drawn once per frame even when nothing was detected.
    cv2.line(img, (line_width,0), (line_width, height), (255,255,0), 6)

    for (x, y), label in matches:
        if (line_width - offset < x < line_width + offset) and (y > line_height):
            # Repaint the line to signal a crossing.
            cv2.line(img, (line_width,0), (line_width, height), (0,100,255), 6)
            num_obj += 1
            if label == 'car':
                num_car += 1
            elif label == 'motorcycle':
                num_motorcycle += 1
            elif label == 'bus':
                num_bus += 1
            elif label == 'truck':
                # Explicit check: previously every other label, whatever it
                # was, ended up in the truck counter.
                num_truck += 1

    print('total_obj=',num_obj)

    overlay_lines = [
        ("Total Objs Detected: " + str(num_obj), 50),
        ("Total Cars Detected: " + str(num_car), 80),
        ("Total Motorcycles Detected: " + str(num_motorcycle), 110),
        ("Total Buses Detected: " + str(num_bus), 140),
        ("Total Trucks Detected: " + str(num_truck), 170),
    ]
    for text, y_pos in overlay_lines:
        cv2.putText(img, text, (10, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 1,
                    (0, 170, 0), 2)

    return img

In [None]:
VIDEO_STREAM = "/content/drive/MyDrive/traffic_day.mp4"
VIDEO_STREAM_OUT = "/content/drive/MyDrive/test_day6.mp4"

vs = cv2.VideoCapture(VIDEO_STREAM)
writer = None

# Running totals updated by segment_instance through `global` statements.
num_obj=0
num_car=0
num_motorcycle = 0
num_bus = 0
num_truck = 0
i = 0
try:
  while True:
    print(i)
    # read the next frame from the file
    (grabbed, frame) = vs.read()
    i += 1

    # If the frame was not grabbed, then we have reached the end
    # of the stream
    if not grabbed:
      print ("Not grabbed.")
      break

    # OpenCV decodes frames as BGR, while the model was trained on RGB images
    # (the dataset loads them with PIL). Convert before inference and convert
    # the annotated result back to BGR for the video writer; overlay colours
    # inside segment_instance are therefore interpreted as RGB.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    masked_frame = cv2.cvtColor(segment_instance(rgb_frame, confidence=0.7),
                                cv2.COLOR_RGB2BGR)

    # Check if the video writer is None
    if writer is None:
      # Initialize our video writer: "mp4v" matches the .mp4 container, and
      # the source frame rate is reused (30 when it is not reported).
      fourcc = cv2.VideoWriter_fourcc(*"mp4v")
      fps = vs.get(cv2.CAP_PROP_FPS) or 30
      writer = cv2.VideoWriter(VIDEO_STREAM_OUT, fourcc, fps,
        (masked_frame.shape[1], masked_frame.shape[0]), True)

    # Write the output frame to disk
    writer.write(masked_frame)
finally:
  # Release the file pointers, also when processing fails or when no frame
  # could be read (in which case no writer was ever created).
  print("[INFO] cleaning up...")
  vs.release()
  if writer is not None:
    writer.release()