# ECSE 415: Final Project
Mathieu Geoffroy, 260986559
Ryan Reszetnik, 260948454


December 5th, 2023

In [26]:
import numpy as np
import cv2
import os
from ultralytics import YOLO
import torch
from collections import defaultdict
from efficientnet_pytorch import EfficientNet
from pathlib import Path

# Base directory for the input videos and generated artifacts
# (the current directory of the notebook process).
working_dir = os.curdir

In [27]:
# Pick the compute backend in order of preference: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# YOLOv8-nano detector; analyze_video() uses it through model.track().
model = YOLO('yolov8n.pt').to(device)

# Class-id -> label-name mapping exposed by the YOLO model.
yolo_labels = model.names

# Checkpoint holding the state dict for the optical-flow network.
MODEL_F = 'b0.pth'

# EfficientNet-B0 configured for a 2-channel input and a single output value.
effnet = EfficientNet.from_pretrained('efficientnet-b0', in_channels=2, num_classes=1).to(device)
state = torch.load(MODEL_F, map_location=torch.device(device))
effnet.load_state_dict(state)

# The network is only used for prediction here. Modules start in training
# mode, which keeps dropout active and makes BatchNorm use per-batch
# statistics (with batch size 1), so switch to evaluation mode explicitly.
# The trailing ';' suppresses the module repr in the cell output.
effnet.eval();

Loaded pretrained weights for efficientnet-b0


In [28]:
def inference(of_f):
    """Run the optical-flow network on one saved flow field.

    Args:
        of_f: Path (str or path-like) of a ``.npy`` file holding a flow array
            laid out as (H, W, 2).

    Returns:
        The output tensor produced by ``effnet`` for the single-sample batch,
        on ``device`` and detached from autograd.
    """
    flow = np.load(of_f).astype(np.float32)
    # (H, W, 2) -> (2, H, W), then prepend the batch axis -> (1, 2, H, W)
    flow = np.expand_dims(np.transpose(flow, (2, 0, 1)), axis=0)
    batch = torch.from_numpy(flow).to(device)

    # Prediction only: skip building the autograd graph to save memory/time.
    with torch.no_grad():
        pred = effnet(batch)

    del batch
    # The MPS cache only exists on the MPS backend; calling this on CUDA/CPU
    # builds raises, so guard it by the selected device.
    if device == "mps":
        torch.mps.empty_cache()
    return pred

In [29]:
def calc_optical_flow(video_name, frame_index, frame, prev_frame, show_flow=False):
    """Compute dense Farneback optical flow between two frames and save it.

    The flow is written to
    ``{working_dir}/{video_name}/{frame_index}_optical_flow.npy``; the target
    directory is created if it does not exist yet.

    Args:
        video_name: Name of the sub-directory (under ``working_dir``) to save into.
        frame_index: Index of ``frame``; used in the output file name.
        frame: Current frame as a BGR image.
        prev_frame: Previous frame as a BGR image.
        show_flow: When True, also show an HSV visualisation of the flow in an
            OpenCV window named 'optical flow'.

    Returns:
        None. The result is the ``.npy`` file written to disk.
    """
    prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Positional args after `None` (per the OpenCV signature): pyr_scale=0.5,
    # levels=3, winsize=15, iterations=3, poly_n=5, poly_sigma=1.2, flags=0
    flow = cv2.calcOpticalFlowFarneback(prev_frame_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # np.save does not create missing directories, so make sure it exists
    os.makedirs(f'{working_dir}/{video_name}', exist_ok=True)
    # save the optical flow
    np.save(f'{working_dir}/{video_name}/{frame_index}_optical_flow.npy', flow)

    # show the optical flow
    if show_flow:
        hsv = np.zeros_like(frame)
        hsv[...,1] = 255  # full saturation
        mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1])
        # direction -> hue: radians to degrees, halved
        hsv[...,0] = ang*180/np.pi/2
        # magnitude -> brightness, stretched to 0..255
        hsv[...,2] = cv2.normalize(mag,None,0,255,cv2.NORM_MINMAX)
        rgb = cv2.cvtColor(hsv,cv2.COLOR_HSV2BGR)
        cv2.imshow('optical flow',rgb)

In [36]:
def analyze_video(video_path, display=True, show_flow=False, save=True):
    """Track objects in a video, count unique people and cars, and record the
    largest per-frame speed prediction.

    For every frame the YOLO tracker is run; each track id seen for the first
    time with label 'person' or 'car' increments the matching counter. From
    the second frame on, the optical flow against the previous frame is
    computed and saved by ``calc_optical_flow`` and then passed to
    ``inference`` to obtain a speed value.

    Args:
        video_path: Path of the input video, using '/' as separator. The part
            of the file name before its first '.' is used as the video name.
        display: Show the annotated frames in an OpenCV window. While a
            window is open, pressing 'q' stops processing.
        show_flow: Forwarded to ``calc_optical_flow`` to show the flow image.
        save: Write the annotated frames to ``{video_name}_analyzed.mp4``.

    Returns:
        Tuple ``(people_count, car_count, max_speed)``. ``max_speed`` is the
        largest value returned by the speed model (the overlay labels it
        km/h), or 0 if no prediction was made.
    """
    cap = cv2.VideoCapture(video_path)

    # get video name
    video_name = video_path.split('/')[-1].split('.')[0]

    # Only create the writer when saving, so save=False leaves no empty file.
    out = None
    if save:
        # Keep the source frame rate; fall back to 30 when it is unavailable
        # (0 or NaN).
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not (fps and fps > 0):
            fps = 30
        frame_size = (
            int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        out = cv2.VideoWriter(
            f'{video_name}_analyzed.mp4',
            cv2.VideoWriter_fourcc('m', 'p', '4', 'v'),
            fps,
            frame_size,
        )

    # HighGUI calls are only needed (and only meaningful) when a window exists.
    uses_gui = display or show_flow

    # Store the track history: track id -> recent (x, y) box centres
    track_history = defaultdict(list)

    people_count = 0
    car_count = 0
    max_speed = 0

    frame_index = 0
    prev_frame = None

    try:
        # Loop through the video frames
        while cap.isOpened():
            # Read a frame from the video
            success, frame = cap.read()
            if not success:
                # Break the loop if the end of the video is reached
                break

            start = cv2.getTickCount()
            # Run YOLOv8 tracking on the frame, persisting tracks between frames
            results = model.track(frame, persist=True, verbose=False)

            # calculate optical flow
            if prev_frame is not None:
                calc_optical_flow(video_name, frame_index, frame, prev_frame, show_flow)
                # created optical flow file, now predict
                speed = inference(Path(f'{working_dir}/{video_name}/{frame_index}_optical_flow.npy')).item()
                print(f'Frame {frame_index} speed: {round(speed, 2)}')
                max_speed = max(max_speed, speed)

            # Get the boxes, track IDs, class, for the frame.
            # `boxes.id` is None when the frame has no tracked objects.
            detections = results[0].boxes
            if detections.id is not None:
                boxes = detections.xywh.cpu()
                track_ids = detections.id.int().cpu().tolist()
                classes = detections.cls.int().cpu().tolist()
            else:
                boxes, track_ids, classes = [], [], []

            # Visualize the results on the frame
            annotated_frame = results[0].plot()

            # calculate the number of new people and cars; a track id counts
            # as new until it gets its first entry in track_history below
            for cls, track_id in zip(classes, track_ids):
                if track_id in track_history:
                    continue
                label = yolo_labels[cls]
                if label == 'person':
                    people_count += 1
                elif label == 'car':
                    car_count += 1
            # NOTE: an earlier experiment estimated speed from the apparent
            # motion of stationary objects (traffic lights, stop signs, fire
            # hydrants); it is disabled in favour of the optical-flow model.

            # Display the number of people
            cv2.putText(annotated_frame, f"Number of people: {people_count}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            # Display the number of cars
            cv2.putText(annotated_frame, f"Number of cars: {car_count}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            # Display the speed
            cv2.putText(annotated_frame, f"Max speed: {max_speed} km/h", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            # Plot the tracks
            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                track = track_history[track_id]
                track.append((float(x), float(y)))  # x, y center point
                if len(track) > 30:  # keep only the 30 most recent points
                    track.pop(0)

                # Draw the tracking lines
                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=10)

            # Display the annotated frame
            if display:
                cv2.imshow(f'{video_name} Tracking', annotated_frame)

            # save the annotated frame to a new video
            if out is not None:
                out.write(annotated_frame)

            # Break the loop if 'q' is pressed
            if uses_gui and cv2.waitKey(1) & 0xFF == ord("q"):
                break

            # calculate the time it took to process the frame
            end = cv2.getTickCount()
            elapsed = (end - start)/cv2.getTickFrequency()

            # Print the time it took to process the frame
            print(f"Frame {frame_index} took {elapsed} seconds to process")

            # increment the frame index
            frame_index += 1
            prev_frame = frame
    finally:
        # Always release resources, including on errors or a manual interrupt,
        # so the output file is finalized and windows do not linger.
        cap.release()
        if out is not None:
            out.release()
        if uses_gui:
            cv2.destroyAllWindows()

    return people_count, car_count, max_speed

In [37]:
# Analyze the McGill drive clip with the live preview window enabled and
# print (people_count, car_count, max_speed).
print(
    analyze_video(
        f'{working_dir}/mcgill_drive.mp4',
        display=True,
        show_flow=False,
        save=True,
    )
)

Frame 0 took 0.242956417 seconds to process


KeyboardInterrupt: 

In [None]:
# Analyze the St-Catherine drive clip without a preview window and
# print (people_count, car_count, max_speed).
print(
    analyze_video(
        f'{working_dir}/st-catherines_drive.mp4',
        display=False,
        save=True,
    )
)