# Exercise 1.2

This application utilizes the same background extraction mechanism as the Exercise 1.1. To keep track of moving objects, this application has a Tracker class that internally has a list of objects with its coordinates and updates the coordinates of the objects based on the contours found in each frame. The movements of objects are predicted by using a Kalman filter implemented by cv2.KalmanFilter() method.

The Kalman filter is a mathematical model that uses noisy measurements observed over time to produce estimates of unknown variables. In this case, the unknown variables are the coordinates of the objects. The Kalman filter is used to predict the next position of the objects based on the previous positions. We assume that a car cannot disappear in the middle of the scene. So the predicted position is used when the objects are temporarily not detected during few frames. It makes the tracking more robust.

Install necessary packages:

In [1]:
%pip install opencv-python numpy tabulate

Note: you may need to restart the kernel to use updated packages.


Import necessary packages:

In [2]:
import time

import cv2
import numpy as np
import tabulate

The higher the value, the bigger a moving object should be to be detected

In [3]:
MIN_CONTOUR_AREA = 4000

In [4]:
class Tracker:
    """
    Tracks objects in a video stream.
    """
    class TrackingObject:
        """
        Represents a tracked object.
        """
        def __init__(self, initial_contour, tracking_id, ttl) -> None:
            """
            Initializes a new tracked object.

            initial_contour: the contour of the object in the first frame it appeared in.
            tracking_id: the unique ID of the object to keep track of it.
            ttl: the number of subsequent frames the object can be missing before it is removed.
            """
            self.tracking_id = tracking_id
            self.ttl = ttl
            self.location_history = []
            self.centroid = Tracker.calc_centroid(initial_contour)
            self.updated = True

            # Initialize the Kalman filter
            # It will be used to predict the location of the object in the next frame if it is not covered by any contour
            # it improves the robustness of the tracker
            kalman_2d = cv2.KalmanFilter(4, 2)
            kalman_2d.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
            kalman_2d.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
            kalman_2d.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32) * 1e-4
            self.kalman_2d = kalman_2d

        def reset_updated(self):
            """
            Resets the updated flag to False.
            """
            self.updated = False

        def update_contour(self, contour):
            """
            Updates the object with a new contour.
            """
            self.updated = True
            self.location_history.append(self.centroid)
            self.centroid = Tracker.calc_centroid(contour)

            # Update the Kalman filter with the new location
            self.kalman_2d.predict()
            self.kalman_2d.correct(np.array([[np.float32(self.centroid[0])], [np.float32(self.centroid[1])]]))
        
        def update_kalman(self):
            """
            Updates the object with a new Kalman prediction.
            """
            if len(self.location_history) < 5:
                return

            prediction = self.kalman_2d.predict()
            self.centroid = (int(prediction[0]), int(prediction[1]))
            self.location_history.append(self.centroid)

        def x_direction(self) -> float:
            """
            Calculates how much the object directed towards the right or left.
            """
            # We need at least 5 points to make a prediction
            if len(self.location_history) < 5:
                return 0

            xs = np.array(self.location_history)[:, 0]
            time = np.arange(len(xs))
    
            # Compute the slope and intercept of the line of best fit
            slope, _ = np.polyfit(time, xs, 1)
    
            # slope > 0 is right, slope < 0 is left, slope = 0 is no movement
            return slope

    def __init__(self, proximity_threshold = 20, object_ttl = 10) -> None:
        self.objects = []
        self.last_tracking_id = 0
        self.proximity_threshold = proximity_threshold
        self.object_ttl = object_ttl
        self.onObjectRemoved = None

    @classmethod
    def calc_centroid(cls, contour):
        """
        Calculates the centroid of a contour.
        """
        M = cv2.moments(contour)
        if M["m00"] == 0:
            # Avoid division by zero
            return None

        cX = int(M["m10"] / M["m00"])
        cY = int(M["m01"] / M["m00"])

        return (cX, cY)

    def update(self, contours):
        """
        Updates the tracker with a new frame information: contours and centroids.
        """
        original_contours = contours.copy()
        contours = contours.copy()

        # Reset the updated flag for all objects
        [obj.reset_updated() for obj in self.objects]

        # Find the closest object to each contour centroid
        for i, contour in enumerate(contours):
            # Find the centroid of each contour
            centroid = Tracker.calc_centroid(contour)

            if centroid is None:
                continue

            # Find the closest object to the centroid
            closest_object = None
            closest_distance = float('inf')
            for obj in self.objects:
                distance = np.linalg.norm(np.array(obj.centroid) - np.array(centroid))
                if distance < closest_distance:
                    closest_object = obj
                    closest_distance = distance

            # If the closest object is close enough, update it
            if closest_distance < self.proximity_threshold:
                closest_object.update_contour(contour)
                # Remove the contour from the list
                contours[i] = None

        # Add the remaining contours as new objects
        for contour in contours:
            if contour is None:
                continue

            for object in self.objects:
                # If the contour is already have any object, skip it
                if cv2.pointPolygonTest(contour, object.centroid, False) > 0:
                    break
            else:
                # Please note that the first object will have ID 1 will be the whole frame
                self.last_tracking_id = self.last_tracking_id + 1
                self.objects.append(Tracker.TrackingObject(contour, self.last_tracking_id, self.object_ttl))

        [obj.update_kalman() for obj in self.objects if not obj.updated]

        # Remove objects that are not covered by any contour
        for obj in self.objects:
            for contour in original_contours:
                if cv2.pointPolygonTest(contour, obj.centroid, False) > 0:
                    break
            else:
                # The object is not covered by any contour
                # Decrease its TTL
                obj.ttl -= 1

                # remove it if it reaches zero
                if obj.ttl <= 0:
                    self.objects.remove(obj)

                    # Notify the listener (callback)
                    if self.onObjectRemoved is not None:
                        self.onObjectRemoved(obj)


In [5]:
def count_cars_going_left(file: str, debug=False, slow_motion=False) -> (int, float):
    """
    Count number of cars going to the city centre
    
    Parameters:
        file: path to the video file to be processed

    Optional keyword arguments:
        debug: if True, the video will be displayed with the contours and tracking objects
        slow_motion: if True, the video will be displayed in slow motion

    Returns a tuple of the number of cars going to the city centre and the cars per minute.
    """

    # Open the sample video file:
    video = cv2.VideoCapture(file)

    # Ensure that the file was opened correctly:
    assert video.isOpened(), "Can't open the video file"

    # Initialize an instance of [Gaussian Mixture-based Background/Foreground Segmentation Algorithm](https://docs.opencv.org/3.4/d7/d7b/classcv_1_1BackgroundSubtractorMOG2.html):
    bg_subtractor = cv2.createBackgroundSubtractorKNN(detectShadows=True, history=10000, dist2Threshold=400)

    # Instantiate the tracker
    tracker = Tracker()

    if debug:
        # Initialize the window to display the debug video:
        cv2.namedWindow("Cam")

    # A set to keep track of the counted cars
    counted_car_ids = set()

    # Define the callback function for the tracker to be called when an object is removed
    def onObjectRemoved(obj):
        if obj.tracking_id not in counted_car_ids and obj.x_direction() < -2 and len(obj.location_history) > 80:
            counted_car_ids.add(obj.tracking_id)

    # Set the callback function for the tracker to be called when an object is removed
    tracker.onObjectRemoved = onObjectRemoved

    # Read the video frame by frame:
    while True:
        ret, frame = video.read()

        # Check for the last frame
        if not ret:
            break

        # blur the frame to remove noise
        blur = cv2.GaussianBlur(frame, (5,5), 0)

        # Apply the background subtraction algorithm:
        fg_mask = bg_subtractor.apply(blur)
        
        # Threshold the foreground mask to remove the shadows:
        _, fg_mask = cv2.threshold(fg_mask, 100, 255, cv2.THRESH_BINARY)

        # Apply morphological operations to remove corruptions:
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel, iterations=2)

        # Find the contours of the detected objects:
        contours, _ = cv2.findContours(fg_mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        # Filter out the contours that are too small to be a car:
        contours = [c for c in contours if cv2.contourArea(c) > MIN_CONTOUR_AREA]

        # Filter out the contours that in the upper half of the frame,
        # because we are only interested in the cars that are running along the Main Street:
        contours = [c for c in contours if cv2.boundingRect(c)[1] > frame.shape[0] / 2]

        # Convex hull to get clean contours without holes:
        contours = [cv2.convexHull(c) for c in contours]

        if debug:
            # Draw the contours on the original image:
            cv2.drawContours(frame, contours, -1, (0, 255, 0), 2)

        # # Update the tracker with contours:
        tracker.update(contours)

        if debug:
            # Draw tracking objects:
            for obj in tracker.objects:
                cv2.putText(frame, f"Car #{obj.tracking_id}", obj.centroid, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                cv2.circle(frame, obj.centroid, 2, (0, 0, 255), -1)

            cv2.putText(frame, f'Number of cars go to the city centre: {len(counted_car_ids)}', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

            
            # Display the resulting frame
            cv2.imshow("Cam", frame)

            if slow_motion:
                # Slow down the video for debugging purposes 
                time.sleep(0.05)

            # Press "q" to exit the loop
            if (cv2.waitKey(1) & 0xFF == ord('q')):
                break

    # When everything done, close the debug output window
    if debug:
        cv2.destroyWindow("Cam")
        cv2.waitKey(1)

    # Number of cars going to the city centre
    number_of_cars = len(counted_car_ids)

    # Get the video duration in seconds
    frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(video.get(cv2.CAP_PROP_FPS))
    duration_seconds = frame_count / fps

    # Cars per minute
    cars_per_minute = number_of_cars / duration_seconds * 60

    # Close the video file:
    video.release()

    return (len(counted_car_ids), cars_per_minute)

In [6]:
FILES = ["Traffic_Laramie_1.mp4", "Traffic_Laramie_2.mp4"]
table_data = [
    [file, *count_cars_going_left(file)] for file in FILES
]

  self.centroid = (int(prediction[0]), int(prediction[1]))


In [7]:
table = tabulate.tabulate(table_data, tablefmt='html', headers=["", "Total number of cars", "Cars per minute"])
table

Unnamed: 0,Total number of cars,Cars per minute
Traffic_Laramie_1.mp4,6,2.02338
Traffic_Laramie_2.mp4,4,2.27101
