# Vehicle detection, tracking, and speed estimation
https://www.pyimagesearch.com/2019/12/02/opencv-vehicle-detection-tracking-and-speed-estimation/

In [1]:
from pyimagesearch.centroidtracker import CentroidTracker
from imutils.video import VideoStream
from imutils.io import TempFile
from imutils.video import FPS
from datetime import datetime
from threading import Thread
import numpy as np
import argparse
import imutils
import dlib
import time
import cv2
import os

## Classes and functions

In [2]:
class TrackableObject:
    """Per-object record holding an ID, centroid history, point timestamps/positions and speed."""

    def __init__(self, objectID, centroid):
        # Remember which object this is and seed its centroid history with the first centroid
        self.objectID = objectID
        self.centroids = [centroid]

        # Two dictionaries keyed by point name: the timestamp (0 until set) and the
        # position (None until set) of the object at each of the points "A".."D"
        pointNames = ("A", "B", "C", "D")
        self.timestamp = dict.fromkeys(pointNames, 0)
        self.position = dict.fromkeys(pointNames)
        self.lastPoint = False

        # Object speed in MPH and KMPH, unknown until calculate_speed() is called
        self.speedMPH = None
        self.speedKMPH = None

        # Two flags: (1) whether the object's speed has already been estimated, and
        # (2) whether the object's speed has already been logged
        self.estimated = False
        self.logged = False

        # Direction of the object, unknown at construction time
        self.direction = None

    def calculate_speed(self, estimatedSpeeds):
        """Store the average of `estimatedSpeeds` as KMPH and its conversion to MPH."""
        milesPerKilometer = 0.621371
        averageKMPH = np.average(estimatedSpeeds)
        self.speedKMPH = averageKMPH
        self.speedMPH = averageKMPH * milesPerKilometer

## Configuration

In [3]:
# --- Centroid tracker ---
# An object may be marked "disappeared" for at most this many consecutive frames
# before it is deregistered from tracking
max_disappear = 10
# Centroids farther apart than this are not associated with the same object; the
# object starts being marked as "disappeared" instead
max_distance = 175

# --- Detection / tracking schedule ---
# Number of frames to perform object tracking instead of object detection
track_object = 4
# Minimum confidence
confidence = 0.4

# --- Frame geometry and speed estimation ---
# Frame width in pixels
frame_width = 400
# Speed estimation columns, keyed by point name
speed_estimation_zone = {
    "A": 120,
    "B": 160,
    "C": 200,
    "D": 240,
}
# Real world distance in meters
distance = 16
# Speed limit in Km/h
speed_limit = 30

# --- Input files ---
# Video to process
input_path = "sample_data/cars.mp4"
# Object detection model and its prototxt file
model_path = "model/MobileNetSSD_deploy.caffemodel"
prototxt_path = "model/MobileNetSSD_deploy.prototxt"

# --- Output ---
# Output directory and csv file name
output_path = "output"
csv_name = "log.csv"

## Initialization

In [4]:
# Class labels MobileNet SSD was trained to detect; a detection's class index is
# looked up in this list
CLASSES = [
    "background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair",
    "cow", "diningtable", "dog", "horse", "motorbike",
    "person", "pottedplant", "sheep", "sofa", "train",
    "tvmonitor",
]

# Read the serialized Caffe model (prototxt + weights) from disk
net = cv2.dnn.readNetFromCaffe(prototxt_path, model_path)

# Open the video stream; the frame dimensions stay unset until a frame has been read
vs = cv2.VideoCapture(input_path)
H = W = None

# Centroid tracker, the list of dlib correlation trackers, and the dictionary that
# maps each unique object ID to its TrackableObject
ct = CentroidTracker(maxDisappeared=max_disappear, maxDistance=max_distance)
trackers, trackableObjects = [], {}

# Running count of frames processed so far
totalFrames = 0

# The log file is opened later, on demand
logFile = None

# Consecutive pairs of points used to calculate the avg of the vehicle speed:
# ("A", "B"), ("B", "C"), ("C", "D")
points = list(zip("ABC", "BCD"))

# Start the frames per second throughput estimator
fps = FPS().start()

## Vehicle detection

In [5]:
def _record_zone_crossing(to, centroid, ts):
    """Record the next not-yet-reached speed-estimation point if `centroid` has passed it.

    Points are visited in order A, B, C, D when `to.direction` is positive and in order
    D, C, B, A otherwise. Only the first point whose timestamp is still 0 is considered,
    so at most one point is recorded per call. Recording the final point of the sequence
    also sets `to.lastPoint`.
    """
    movingRight = to.direction > 0
    order = ("A", "B", "C", "D") if movingRight else ("D", "C", "B", "A")

    for name in order:
        # Skip the points that already have a timestamp
        if to.timestamp[name] != 0:
            continue

        column = speed_estimation_zone[name]
        crossed = centroid[0] > column if movingRight else centroid[0] < column
        if crossed:
            to.timestamp[name] = ts
            to.position[name] = centroid[0]
            if name == order[-1]:
                to.lastPoint = True

        # Only the first pending point is examined, whether or not it was crossed
        break


def _estimate_segment_speeds(to):
    """Return the KMPH estimates for each pair in `points` that has a usable measurement.

    A pair is skipped when the pixel distance or the elapsed time between its two points
    is zero, since no speed can be derived from it.
    """
    speeds = []
    for (i, j) in points:
        # Distance travelled between the two points, in pixels
        distanceInPixels = abs(to.position[j] - to.position[i])
        if distanceInPixels == 0:
            continue

        # Elapsed time between the two points; a zero delta would divide by zero below
        timeInSeconds = abs((to.timestamp[j] - to.timestamp[i]).total_seconds())
        if timeInSeconds == 0:
            continue
        timeInHours = timeInSeconds / (60 * 60)

        # Convert pixels -> meters -> kilometers and derive the speed
        distanceInMeters = distanceInPixels * meterPerPixel
        distanceInKM = distanceInMeters / 1000
        speeds.append(distanceInKM / timeInHours)
    return speeds


try:
    # Loop over the frames of the video
    while True:
        # Grab the next frame from the stream and store the current timestamp.
        # NOTE(review): `ts` is wall-clock time, so if `vs` reads a pre-recorded file the
        # estimated speeds depend on how fast frames are processed rather than on the
        # video's own timeline - confirm this is acceptable for the intended input.
        _, frame = vs.read()
        ts = datetime.now()

        # Check if the frame is None, if so, break out of the loop
        if frame is None:
            break

        # If the log file has not been created or opened
        if logFile is None:
            # Make sure the output directory exists, otherwise open() below would fail
            if output_path:
                os.makedirs(output_path, exist_ok=True)

            # Build the log file path and create/open the log file in append mode
            logPath = os.path.join(output_path, csv_name)
            logFile = open(logPath, mode="a")

            # If this is an empty log file, then write the column headings. The speed
            # column holds `speedKMPH`, so the heading names that unit.
            if logFile.seek(0, os.SEEK_END) == 0:
                logFile.write("Year,Month,Day,Time,Speed (in KMPH)\n")

        # Resize the frame and build the RGB copy handed to the dlib trackers
        frame = imutils.resize(frame, width=frame_width)
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # If the frame dimensions are empty, set them. The meters-per-pixel scale
        # spreads `distance` across the full frame width.
        if W is None or H is None:
            (H, W) = frame.shape[:2]
            meterPerPixel = distance / W

        # Initialize our list of bounding box rectangles returned by the correlation
        # trackers (detection frames only start trackers and leave this list empty)
        rects = []

        # Check to see if we should run a more computationally expensive
        # object detection method to aid our tracker
        if totalFrames % track_object == 0:
            # Initialize our new set of object trackers
            trackers = []

            # Convert the frame to a blob and pass the blob through the
            # network and obtain the detections
            blob = cv2.dnn.blobFromImage(frame, size=(300, 300), ddepth=cv2.CV_8U)
            net.setInput(blob, scalefactor=1.0/127.5, mean=[127.5, 127.5, 127.5])
            detections = net.forward()

            # Loop over the detections
            for i in np.arange(0, detections.shape[2]):
                # Filter out weak detections by ensuring the prediction's confidence
                # is greater than the minimum confidence
                conf = detections[0, 0, i, 2]
                if conf <= confidence:
                    continue

                # If the class label is not a car, ignore it
                idx = int(detections[0, 0, i, 1])
                if CLASSES[idx] != "car":
                    continue

                # Compute the (x, y)-coordinates of the bounding box for the object
                box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                (startX, startY, endX, endY) = box.astype("int")

                # Construct a dlib rectangle object from the bounding box coordinates and
                # then start the dlib correlation tracker. The coordinates are cast to
                # plain ints so the call does not depend on NumPy integers being accepted.
                tracker = dlib.correlation_tracker()
                rect = dlib.rectangle(int(startX), int(startY), int(endX), int(endY))
                tracker.start_track(rgb, rect)

                # Add the tracker to our list of trackers so we can utilize it during
                # skip frames
                trackers.append(tracker)

        # Otherwise, we should utilize our object *trackers* rather than object
        # *detectors* to obtain a higher frame processing throughput
        else:
            # Loop over the trackers
            for tracker in trackers:
                # Update the tracker and grab the updated position
                tracker.update(rgb)
                trackedBox = tracker.get_position()

                # Unpack the position object and add the bounding box coordinates to
                # the rectangles list
                rects.append((
                    int(trackedBox.left()),
                    int(trackedBox.top()),
                    int(trackedBox.right()),
                    int(trackedBox.bottom()),
                ))

        # Use the centroid tracker to associate the (1) old object
        # centroids with (2) the newly computed object centroids
        objects = ct.update(rects)

        # Loop over the tracked objects
        for (objectID, centroid) in objects.items():
            # Check to see if a trackable object exists for the current object ID
            to = trackableObjects.get(objectID, None)

            # If there is no existing trackable object, create one
            if to is None:
                to = TrackableObject(objectID, centroid)

            # Otherwise, if there is a trackable object and its speed has not yet been
            # estimated, then estimate it
            elif not to.estimated:
                # Derive the direction from the horizontal displacement relative to the
                # stored centroids. A displacement of exactly zero carries no direction,
                # so it is left unset and retried on a later frame instead of being
                # stored as 0 (which would match neither branch, permanently).
                if to.direction is None:
                    xs = [c[0] for c in to.centroids]
                    direction = centroid[0] - np.mean(xs)
                    if direction != 0:
                        to.direction = direction

                # Positive direction means left to right, negative right to left
                if to.direction is not None:
                    _record_zone_crossing(to, centroid, ts)

                # Check to see if the vehicle is past the last point and the vehicle's
                # speed has not yet been estimated, if yes, then calculate the vehicle
                # speed
                if to.lastPoint and not to.estimated:
                    estimatedSpeeds = _estimate_segment_speeds(to)

                    # Calculate the average speed; with no usable segment the speed
                    # stays None rather than becoming NaN
                    if estimatedSpeeds:
                        to.calculate_speed(estimatedSpeeds)
                        print("Speed of the vehicle that just passed is: {:.2f} KPH".format(to.speedKMPH))

                    # Set the object as estimated
                    to.estimated = True

            # Store the trackable object in our dictionary
            trackableObjects[objectID] = to

            # Draw both the ID of the object and the centroid of the object on the
            # output frame
            text = "ID {}".format(objectID)
            cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

            # Log the object once, if its speed has been estimated and it is higher than
            # the speed limit
            speeding = (to.estimated and to.speedKMPH is not None
                        and to.speedKMPH > speed_limit)
            if not to.logged and speeding:
                # Set the current year, month, day, and time. The time is kept in its
                # own name so the imported `time` module is not overwritten.
                year = ts.strftime("%Y")
                month = ts.strftime("%m")
                day = ts.strftime("%d")
                timeOfDay = ts.strftime("%H:%M:%S")

                # Log the event in the log file
                info = "{},{},{},{},{}\n".format(year, month, day, timeOfDay, to.speedKMPH)
                logFile.write(info)

                # Set the object as logged
                to.logged = True

        # Display the current frame to the screen and record if a user presses a key
        cv2.imshow("frame", frame)
        key = cv2.waitKey(1) & 0xFF

        # If the `q` key is pressed, break from the loop
        if key == ord("q"):
            break

        # Increment the total number of frames processed thus far and then update the
        # FPS counter
        totalFrames += 1
        fps.update()

    # Stop the timer and display FPS information
    fps.stop()
    print("Elapsed time: {:.2f}".format(fps.elapsed()))
    print("Approx. FPS: {:.2f}".format(fps.fps()))

finally:
    # Release resources even if processing a frame raised an exception

    # Check if the log file object exists, if it does, then close it
    if logFile is not None:
        logFile.close()

    # Close any open windows
    cv2.destroyAllWindows()

    # Clean up
    vs.release()

Speed of the vehicle that just passed is: 83.44 KPH
Speed of the vehicle that just passed is: 56.13 KPH
Speed of the vehicle that just passed is: 47.19 KPH
Speed of the vehicle that just passed is: 51.36 KPH
Speed of the vehicle that just passed is: 57.48 KPH
Elapsed time: 17.73
Approx. FPS: 39.70
