# Exercise 1.2

Installing libraries: 

In [13]:
%pip install opencv-python numpy tabulate

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


Import libraries

In [14]:
import numpy as np
import cv2
import tabulate
import time

The higher the value, the bigger a moving object should be to be detected

In [15]:
minimun_area_contour = 4000

In [16]:
class Tracker:
    """
    Tracks objects in a video stream.
    """
    class TrackingObject:
        """
        Represents a tracked object.
        """
        def __init__(self, initial_contour, tracking_id, ttl) -> None:
            """
            Initializes a tracked object with its initial state.

            initial_contour: The contour of the object in the initial frame.
            tracking_id: A unique identifier for the object.
            ttl: Time-to-live, the number of frames until the object is considered lost.
            """
            self.tracking_id = tracking_id
            self.ttl = ttl
            self.location_history = []
            self.centroid = Tracker.calculate_centroid(initial_contour)
            self.updated = True

            # Set up the Kalman filter for location prediction
            filter_2d_kalman = cv2.KalmanFilter(4, 2)
            filter_2d_kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
            filter_2d_kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
            filter_2d_kalman.processNoiseCov = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32) * 1e-4
            self.filter_2d_kalman = filter_2d_kalman

        def status_update_reset(self):
            """
            Marks the object as not updated.
            """
            self.updated = False

        def contour_update(self, contour):
            """
            Updates the object's location based on a new contour.
            """
            self.updated = True
            self.location_history.append(self.centroid)
            self.centroid = Tracker.calculate_centroid(contour)

            # Input the new location into the Kalman filter for future predictions
            self.filter_2d_kalman.predict()
            self.filter_2d_kalman.correct(np.array([[np.float32(self.centroid[0])], [np.float32(self.centroid[1])]]))
        
        def filter_kalman_update(self):
            """
            Updates the object's location based on the Kalman filter's prediction.
            """
            if len(self.location_history) < 5:
                return

            kalman_prediction = self.filter_2d_kalman.predict()
            self.centroid = (int(kalman_prediction[0]), int(kalman_prediction[1]))
            self.location_history.append(self.centroid)

        def x_axis_direction(self) -> float:
            """
            Determines the horizontal movement direction of the object.
            """
            if len(self.location_history) < 5:
                return 0

            x_values = np.array(self.location_history)[:, 0]
            measure_of_time = np.arange(len(x_values))
    
            # Linear regression to determine movement direction
            contour_slope, _ = np.polyfit(measure_of_time, x_values, 1)
    
            return contour_slope

    def __init__(self, proximity_threshold = 20, object_ttl = 10) -> None:
        self.objects = []
        self.last_tracking_id = 0
        self.proximity_threshold = proximity_threshold
        self.object_ttl = object_ttl
        self.onObjectRemoved = None

    @classmethod
    def calculate_centroid(cls, contour):
        """
        Computes the center point of a contour.
        """
        M = cv2.moments(contour)
        if M["m00"] == 0:
            return None

        centroid_x = int(M["m10"] / M["m00"])
        centroid_y = int(M["m01"] / M["m00"])

        return (centroid_x, centroid_y)

    def update(self, video_contours):
        """
        Processes a new set of contours to update tracked objects.
        """
        contours_originals = video_contours.copy()
        video_contours = video_contours.copy()

        # Mark all objects as not updated
        [obj.status_update_reset() for obj in self.objects]

        # Match contours with existing tracked objects
        for i, contour in enumerate(video_contours):
            centroid_point = Tracker.calculate_centroid(contour)

            if centroid_point is None:
                continue

            nearest_object = None
            nearest_distance = float('inf')
            for obj in self.objects:
                distance_measure = np.linalg.norm(np.array(obj.centroid) - np.array(centroid_point))
                if distance_measure < nearest_distance:
                    nearest_object = obj
                    nearest_distance = distance_measure

            if nearest_distance < self.proximity_threshold:
                nearest_object.contour_update(contour)
                video_contours[i] = None

        # Introduce new objects for unmatched contours
        for contour in video_contours:
            if contour is None:
                continue

            for object in self.objects:
                if cv2.pointPolygonTest(contour, object.centroid, False) > 0:
                    break
            else:
                self.last_tracking_id += 1
                self.objects.append(Tracker.TrackingObject(contour, self.last_tracking_id, self.object_ttl))

        # Predict the location for objects not updated
        [obj.filter_kalman_update() for obj in self.objects if not obj.updated]

        # Clean up lost objects
        for obj in self.objects:
            for contour in contours_originals:
                if cv2.pointPolygonTest(contour, obj.centroid, False) > 0:
                    break
            else:
                obj.ttl -= 1

                if obj.ttl <= 0:
                    self.objects.remove(obj)

                    if self.onObjectRemoved is not None:
                        self.onObjectRemoved(obj)


In [17]:
def count_cars_going_left(file: str, debug_mode=False, slow_motion=False) -> (int, float):
    """
    Counts the number of cars moving left in a video file.

    Parameters:
        file: The file path to the video to be analyzed.

    Optional Parameters:
        debug: If True, displays additional information for troubleshooting purposes.
        slow_motion: If True, plays back the video at a reduced speed for closer inspection.

    Returns:
        A pair consisting of the total count of left-moving cars and their frequency per minute.
    """

    # Initialize video capture
    video = cv2.VideoCapture(file)
    assert video.isOpened(), "Failed to open the specified video file."

    # Setup for detecting moving objects
    background_subtractor = cv2.createBackgroundSubtractorKNN(detectShadows=True, history=10000, dist2Threshold=400)

    # Initialize the object tracking system
    object_tracker = Tracker()

    # Prepare a debugging window if requested
    if debug_mode:
        cv2.namedWindow("Debugging Output")

    # Keep track of cars moving left
    ids_cars_counted = set()

    # Callback for when an object is no longer being tracked
    def onObjectRemoved(obj):
        if (obj.tracking_id not in ids_cars_counted 
            and obj.x_axis_direction() < -2 
            and len(obj.location_history) > 80):

            ids_cars_counted.add(obj.tracking_id)

    object_tracker.onObjectRemoved = onObjectRemoved

    # Main loop for frame processing
    while True:
        read_success, video_frame = video.read()

        if not read_success:
            break

        # Apply Gaussian blur to smoothen the frame
        blurred_image = cv2.GaussianBlur(video_frame, (5, 5), 0)

        # Create a mask for foreground objects
        foreground_mask = background_subtractor.apply(blurred_image)

        # Clean up the mask to remove shadows
        _, foreground_mask = cv2.threshold(foreground_mask, 100, 255, cv2.THRESH_BINARY)

        # Further refine the mask to eliminate noise
        morphological_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
        foreground_mask = cv2.morphologyEx(foreground_mask, cv2.MORPH_CLOSE, morphological_kernel, iterations=2)

        # Identify potential cars using contours
        video_contours, _ = cv2.findContours(foreground_mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        # Eliminate contours that are too small or located in the top half of the frame
        video_contours = [c for c in video_contours if cv2.contourArea(c) > minimun_area_contour and cv2.boundingRect(c)[1] > video_frame.shape[0] / 2]
        video_contours = [cv2.convexHull(c) for c in video_contours]

        # In debug mode, draw contours on the frame for visualization
        if debug_mode:
            cv2.drawContours(video_frame, video_contours, -1, (0, 255, 0), 2)

        # Inform the tracker about the detected contours
        object_tracker.update(video_contours)

        # If in debug mode, annotate the frame with tracking info
        if debug_mode:
            for obj in object_tracker.objects:
                cv2.putText(video_frame, f"Car #{obj.tracking_id}", obj.centroid, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                cv2.circle(video_frame, obj.centroid, 2, (0, 0, 255), -1)
        
        
        cv2.putText(video_frame, f'Total Leftward Cars: {len(ids_cars_counted)}', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
        cv2.imshow("Debugging Output", video_frame)

        # Slow-motion playback if enabled
        if slow_motion:
            time.sleep(0.05)

        # Exit on 'q' key press
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break


    # Cleanup on exit
    if debug_mode:
        cv2.destroyWindow("Debug View")
        cv2.waitKey(1)

    # Calculate the final counts and rate
    car_count = len(ids_cars_counted)
    count_of_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    frames_per_second = int(video.get(cv2.CAP_PROP_FPS))
    duration_in_seconds = count_of_frames / frames_per_second
    cars_x_minute = car_count / duration_in_seconds * 60

    video.release()

    return (car_count, cars_x_minute)


In [18]:
file_paths = ["Traffic_Laramie_1.mp4", "Traffic_Laramie_2.mp4"]
table_data = [
    [file, *count_cars_going_left(file)] for file in file_paths
]

  self.centroid = (int(kalman_prediction[0]), int(kalman_prediction[1]))


In [19]:
table = tabulate.tabulate(table_data, tablefmt='html', headers=["", "Total number of cars", "Cars per minute"])
table

Unnamed: 0,Total number of cars,Cars per minute
Traffic_Laramie_1.mp4,6,2.02338
Traffic_Laramie_2.mp4,4,2.27101
