In [1]:
# mounted to my drive so no need for manual authentication
# from google.colab import drive
# drive.mount('/content/drive')

Install Detectron2

In [2]:
%%capture
# install dependencies: 
!pip install pyyaml==5.1
import torch, torchvision
print(torch.__version__, torch.cuda.is_available())
!gcc --version
# opencv is pre-installed on colab

In [3]:
# install detectron2: (Colab has CUDA 10.1 + torch 1.7)
# See https://detectron2.readthedocs.io/tutorials/install.html for instructions
import torch
assert torch.__version__.startswith("1.7")
!pip install detectron2 -f https://dl.fbaipublicfiles.com/detectron2/wheels/cu101/torch1.7/index.html
# exit(0)  # After installation, you need to "restart runtime" in Colab. This line can also restart runtime

Looking in links: https://dl.fbaipublicfiles.com/detectron2/wheels/cu101/torch1.7/index.html


In [4]:
%%capture
# Some basic setup:
# Setup detectron2 logger
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

# import some common libraries
import numpy as np
import os, json, cv2, random
from google.colab.patches import cv2_imshow

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog

Import trained model from directory

In [5]:
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.8 # Set threshold for this model
cfg.MODEL.WEIGHTS = '/content/drive/MyDrive/Scott_Portfolio/traffic_control/model_650.pth' # Set path model .pth
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 2
predictor = DefaultPredictor(cfg)

Just running some images to test imported model, notice the model is mostly interested in vehicles coming towards the camera since the model is trained this way

In [12]:
# calculation support functions
def area_triangle(p1, p2, p3):
  area = 0.5*(p1[0]*(p2[1]-p3[1])+p2[0]*(p3[1]-p1[1])+p3[0]*(p1[1]-p2[1]))
  return abs(area)

def inside_tetragon(pt, p1, p2, p3, p4):
  area_true = area_triangle(p1,p2,p3) + area_triangle(p1,p3,p4)
  area_point = area_triangle(pt, p1, p2) + area_triangle(pt, p2, p3) + area_triangle(pt, p3, p4) + area_triangle(pt, p4, p1)
  #print(area_triangle(pt, p1, p2),area_triangle(pt, p2, p3),area_triangle(pt, p3, p4),area_triangle(pt, p4, p1))
  #print("true=", area_true, "check=",area_point)
  if area_true < area_point:
    return False
  else:
    return True

def draw_traffic_light(im, motion):
  if motion:
    cv2.circle(im,(1790,150),50,green,-1)
  else:
    cv2.circle(im,(1790,150),50,red,-1)

def draw_timer(im, clr_timer):
  if clr_timer:
    t.append(0)
    #print("clear timer")
  else:
    t.append(round(t[-1]+1/30.0*fps_scale,2))
    #print("continue")
  cv2.putText(im,f'{t[-1]}s',(1600,300), cv2.FONT_HERSHEY_SIMPLEX, 3,(0,0,0),2,cv2.LINE_AA)

In [7]:
# motiong tracking helper functions
def dis(l1, l2):
  if l1[0] == -1 or l1[1] == -1:
    d = 9999   # return big distance for no detection
  else:
    d = pow(pow((l2[0]-l1[0]),2) + pow((l2[1]-l1[1]),2),0.5)   #consider signed differences?
  return d
  

def car_timer(lane,lane_new): #obtain minimum movement across both lanes
  for i in range(2):
    d = [dis(lane[0], lane_new[0]), dis(lane[1], lane_new[1])]
  #print(min(d))
  return min(d)

def buff_timer(buffer, buff_size, movement, thresh):
  last_motion = buffer[-1]
  # append 1 or 0 depending on motion
  if movement < thresh:
    buffer.append(0)
  else:
    buffer.append(1)
  n = len(buffer)
  move_count = 0
  motion = 0
  for x in buffer:
    if x == 1:
      move_count = move_count + 1
  if move_count/n > buff_thresh:
    motion = 1
  if n > buff_size: # limit buffer size
    buffer.pop(0)
  if motion == last_motion:
    return 0, motion # motion is consistent and timer continues
  else:
    return 1, motion  # motion has changed and timer restarts

In [15]:
# get detection outputs and obtain boundaries and centers
def process_im(im, lane, buffer): # t is the time on current timer
  lane_new = [(-1,-1),(-1,-1)]
  outputs = predictor(im)
  car_centers = outputs["instances"].pred_boxes.get_centers().data.cpu().numpy()
  car_boxes = outputs["instances"].pred_boxes.tensor.cpu().numpy()
  classes = outputs["instances"].pred_classes.cpu().numpy()
  # defining some local parameters
  aoi = [(1261,920), (1514,930), (1784,939),(1167,523),(1280,519),(1398,521)] #area of interest (in terms of detection and processing)
  wid = 4 
  # draw area of interest on frame
  cv2.line(im,aoi[0],aoi[3],green,wid)
  cv2.line(im,aoi[1],aoi[4],green,wid)
  cv2.line(im,aoi[2],aoi[5],green,wid)
  cv2.line(im,aoi[0],aoi[2],green,wid)

  # now we process the centers to only display the front row vehicles detected in the area of interest (two lanes on on-coming traffic)
  i = 0
  line1 = 0
  id = [-1,-1]
  line2 = 0
  for center in car_centers:
    # check if center is in area1 or area2
    # obtain index for each area, if none then -1
    if inside_tetragon(center, aoi[0], aoi[1], aoi[4], aoi[3]) is True:
      if car_centers[i][1] > line1 and classes[i] == 0: # exclude motorcycles because they always skip the line
        line1 = car_centers[i][1]
        id[0] = i
    if inside_tetragon(center, aoi[1], aoi[2], aoi[5], aoi[4]) is True:
      if car_centers[i][1] > line2 and classes[i] == 0:
        line2 = car_centers[i][1]
        id[1] = i
    i = i+1
  #print(id)
  
  # draw bounding boxes for detected vehicles and output information for motion tracking
  for i in range(2):
    if id[i] > -1: # if i = -1 then no vehicles in the region
      #cv2.circle(im,(car_centers[i][0],car_centers[i][1]),20,(0,0,255),-1)
      lane_new[i] = car_centers[id[i]]
      cv2.putText(im,'Car',(car_boxes[id[i]][0], int(car_boxes[id[i]][1]-6)), cv2.FONT_HERSHEY_SIMPLEX, 1,red,2,cv2.LINE_AA)
      cv2.rectangle(im, (car_boxes[id[i]][0], car_boxes[id[i]][1]), (car_boxes[id[i]][2], car_boxes[id[i]][3]), red, 5)
  #cv2_imshow(im)

  # movement detection and timer for traffic lights
  movement = car_timer(lane,lane_new)
  #cv2.putText(im,f'{movement}',(300,200), cv2.FONT_HERSHEY_SIMPLEX, 3,red,2,cv2.LINE_AA)
  clr_timer,motion = buff_timer(buffer,buff_size,movement,thresh)
  draw_traffic_light(im, motion)
  draw_timer(im, clr_timer)
  
    # cv2.putText(im,"ticking",(300,400), cv2.FONT_HERSHEY_SIMPLEX, 3,red,2,cv2.LINE_AA)

  return im, lane_new

Here we start processing the video, simulating processing real-time surveillance footage, output to output.mp4

In [16]:
%%time
import numpy as np
from google.colab.patches import cv2_imshow
from IPython.display import clear_output

cap = cv2.VideoCapture('/content/drive/MyDrive/Scott_Portfolio/traffic_control/traffic_long.mp4')
f_time = {0, 25, 57} # specific seconds when you want a picture to be exported to directory (for sample collection)
lane = [(-1,-1),(-1,-1)] # initialize some global variables
buff_thresh = 0.5
buff = [0]
fps_scale = 5
thresh = 4   # distance threshold
buff_size = int(30/fps_scale)
t = [0]
green = (0,131,81)
red = (16,30,187)

if cap.isOpened():
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    res=(int(width), int(height))
    # this format fail to play in Chrome/Win10/Colab
    # fourcc = cv2.VideoWriter_fourcc(*'MP4V') #codec
    # fourcc = cv2.VideoWriter_fourcc(*'H264') #codec
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('/content/drive/MyDrive/Scott_Portfolio/traffic_control/output_v1.mp4', fourcc, 30.0/fps_scale, res)

    frame = None
    frame_count = 0
    time = 0
    while (time < 300):
        time = int(frame_count/30)
        frame_count = frame_count + 1
        try:
            is_success, frame = cap.read()
        except cv2.error:
            continue

        if not is_success:
            break

        # if time in f_time:
        # if (time%2 == 0): #capture every 2 seconds
          # cv2.imwrite('/content/drive/MyDrive/Scott_Portfolio/traffic_control/frames/frame'+str(time)+"s.jpg", frame)
          # f_time.remove(time)

        # processing the image
        if (frame_count%fps_scale == 0):
          #print(time,t[-1])
          image,lane = process_im(frame,lane,buff)
          #image = process_im_v2(frame)
          out.write(frame)

    out.release() 

cap.release()

CPU times: user 6min 37s, sys: 1min 23s, total: 8min
Wall time: 6min 42s
