In [None]:
# Import necessary libraries
import os
import statistics
import time
from statistics import mode

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms

In [None]:
# Class labels taken from the fine-tuning code; a label's position in this list
# is the class index the models predict.
classes = ['Bike', 'Car']

# Load the fine-tuned AlexNet model.
PATH1 = "alexnetpretrained.pt"
# Build the bare architecture only: load_state_dict below is strict by default
# and overwrites every parameter, so requesting the ImageNet weights with
# pretrained=True would just trigger a needless download.
alexnetmodel = models.alexnet()
# Swap the stock classifier for the 2-class head; its layout has to match the
# classifier keys stored in the checkpoint.
alexnetmodel.classifier = nn.Sequential(
    nn.Linear(9216, 1024),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(1024, 2),
    nn.LogSoftmax(dim=1),
)
# map_location="cpu" lets a checkpoint that was saved from a GPU run load on a
# CPU-only machine; this notebook never moves the models or inputs off the CPU.
alexnetmodel.load_state_dict(torch.load(PATH1, map_location="cpu"))
# Switch to inference mode (disables dropout). As the last expression of the
# cell this also displays the model structure.
alexnetmodel.eval()

AlexNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
  (classifier): Sequential(
    (0): Linear(in_features=9216, out_features=1024, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, 

In [None]:
# Load the fine-tuned DenseNet-121 model.
PATH2 = "densenetpretrained.pt"
# Build the bare architecture only: load_state_dict below is strict by default
# and overwrites every parameter and buffer, so requesting the ImageNet weights
# with pretrained=True would just trigger a needless download.
densenetmodel = models.densenet121()
# Swap the stock classifier for the 2-class head; its layout has to match the
# classifier keys stored in the checkpoint.
densenetmodel.classifier = nn.Sequential(
    nn.Linear(1024, 1024),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(1024, 2),
    nn.LogSoftmax(dim=1),
)
# map_location="cpu" lets a checkpoint that was saved from a GPU run load on a
# CPU-only machine; this notebook never moves the models or inputs off the CPU.
densenetmodel.load_state_dict(torch.load(PATH2, map_location="cpu"))
# Switch to inference mode (disables dropout, freezes batch-norm statistics).
# As the last expression of the cell this also displays the model structure.
densenetmodel.eval()

DenseNet(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): _DenseBlock(
      (denselayer1): _DenseLayer(
        (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
        (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (denselayer2): _DenseLayer(
        (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu

In [None]:
# Load the fine-tuned EfficientNet-B0 model.
PATH3 = "efficientnetpretrained.pt"
# Build the bare architecture only: load_state_dict below is strict by default
# and overwrites every parameter and buffer, so requesting the ImageNet weights
# with pretrained=True would just trigger a needless download.
efficientnetmodel = models.efficientnet_b0()
# Swap the stock classifier for the 2-class head; its layout has to match the
# classifier keys stored in the checkpoint.
efficientnetmodel.classifier = nn.Sequential(
    nn.Linear(1280, 1024),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(1024, 2),
    nn.LogSoftmax(dim=1),
)
# map_location="cpu" lets a checkpoint that was saved from a GPU run load on a
# CPU-only machine; this notebook never moves the models or inputs off the CPU.
efficientnetmodel.load_state_dict(torch.load(PATH3, map_location="cpu"))
# Switch to inference mode (disables dropout, freezes batch-norm statistics).
# As the last expression of the cell this also displays the model structure.
efficientnetmodel.eval()

EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2): Conv2dNormActivat

In [None]:
# Transform pipeline applied to every image before recognition. It does not
# depend on the input, so it is built once instead of on every call.
_RECOGNITION_TRANSFORM = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((300, 300)),
    transforms.ToTensor(),
    # Standard ImageNet per-channel mean / std, listed in R, G, B order.
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225])
])


# Preprocess the image into tensor for recognition
def preprocess_image_for_recognition(frame, bgr_input=True):
    """Convert an image array into a normalized, batched tensor for the models.

    Args:
        frame: H x W x 3 image array, e.g. a frame or crop read with OpenCV.
        bgr_input: True (default) when ``frame`` is in OpenCV's BGR channel
            order. The channels are then reordered to RGB, because ToPILImage
            interprets a 3-channel array as RGB and the normalization
            statistics are in RGB order. Pass False for input that is already
            RGB.
            NOTE(review): this assumes the models were fine-tuned on RGB
            images (the usual torchvision/PIL loading path) - confirm against
            the fine-tuning code.

    Returns:
        A float tensor of shape (1, 3, 300, 300): resized, scaled by ToTensor,
        normalized, with a leading batch dimension added.
    """
    if bgr_input:
        # cap.read() delivers BGR; without this swap red and blue are exchanged.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return _RECOGNITION_TRANSFORM(frame).unsqueeze(0)

# Classify one image with the given model and return its class label
def recognize_objects(frame, model):
    """Run ``model`` on ``frame`` and return the name of the top-scoring class.

    Args:
        frame: image array handed to preprocess_image_for_recognition.
        model: network called on the preprocessed batch; it must return one
            row of per-class scores.

    Returns:
        The entry of the module-level ``classes`` list at the index of the
        highest score.
    """
    batch = preprocess_image_for_recognition(frame)
    # Inference only: no gradients are needed for the forward pass.
    with torch.no_grad():
        scores = model(batch)
    # The batch holds a single image, so this is a one-element index tensor.
    best = torch.max(scores, 1).indices
    return classes[int(best)]

In [None]:
# Frame capturing
cap = cv2.VideoCapture("highway.mp4")
if not cap.isOpened():
    # Fail here with a clear message: otherwise the processing loop would be
    # skipped silently and an empty output video would be written.
    cap.release()
    raise IOError('Could not open video source "highway.mp4"')

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Writer for the annotated output video: XVID codec, 5 fps, 1280x720 frames
# (every frame written must be resized to exactly this size).
fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D')
out = cv2.VideoWriter("output.avi", fourcc, 5.0, (1280, 720))

# Motion detection compares consecutive frames, so two frames are needed up front.
ret, frame1 = cap.read()  # 1st frame
ret, frame2 = cap.read()  # 2nd frame
if frame1 is None or frame2 is None:
    # cap.read() returns None for the frame when nothing could be decoded.
    cap.release()
    out.release()
    raise IOError('Could not read two frames from "highway.mp4"')

# start time
start_time = time.time()

In [None]:
# Contours with an area below this many pixels are treated as noise.
MIN_CONTOUR_AREA = 800
# Folder that receives the cropped moving objects.
RESULTS_DIR = "results"

# cv2.imwrite does not create missing folders (it only returns False), so make
# sure the target folder exists before the loop starts.
os.makedirs(RESULTS_DIR, exist_ok=True)
saved_roi_count = 0  # running index that keeps saved filenames unique

try:
    # cap.isOpened() stays True after the last frame; cap.read() then returns
    # None, so the frames themselves are checked to end the loop cleanly.
    while cap.isOpened() and frame1 is not None and frame2 is not None:
        # Frame preprocessing and motion detection: difference of consecutive
        # frames -> grayscale -> blur -> binary threshold -> dilation.
        diff = cv2.absdiff(frame1, frame2)
        gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
        blur = cv2.GaussianBlur(gray, (5, 5), 0)
        _, thresh = cv2.threshold(blur, 20, 255, cv2.THRESH_BINARY)
        dilated = cv2.dilate(thresh, None, iterations=3)
        contours, _ = cv2.findContours(dilated, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        # Draw on a copy so that frame1 stays clean: the crops passed to the
        # classifiers and saved to disk must not contain rectangles or text.
        annotated = frame1.copy()
        motion_detected = False

        # Each contour as individual moving object
        for contour in contours:
            if cv2.contourArea(contour) < MIN_CONTOUR_AREA:
                continue
            (x, y, w, h) = cv2.boundingRect(contour)
            motion_detected = True

            # Crop the moving object (ROI) from the unannotated frame
            roi = frame1[y:y+h, x:x+w]

            # Perform object recognition on the same crop with the 3 models
            recognized_class_arr = [
                recognize_objects(roi, alexnetmodel),
                recognize_objects(roi, densenetmodel),
                recognize_objects(roi, efficientnetmodel),
            ]

            # Ensemble the results by finding the highest voted prediction
            recognized_class = mode(recognized_class_arr)
            print("Array: ", recognized_class_arr)
            print("Ensembled prediction: ", recognized_class)

            # Save the ROI under a unique filename. A timestamp in seconds
            # alone repeats for every object seen within the same second, so
            # a running index is appended.
            timestamp = int(time.time())
            saved_roi_count += 1
            roi_path = f"{RESULTS_DIR}/{recognized_class}_{timestamp}_{saved_roi_count:06d}.jpg"
            if not cv2.imwrite(roi_path, roi):
                print("Warning: could not save ROI to", roi_path)

            # Draw rectangle around the moving object
            cv2.rectangle(annotated, (x, y), (x+w, y+h), (0, 255, 0), 2)

        if motion_detected:
            cv2.putText(annotated, "Status: {}".format('Movement'), (10, 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        # Record the annotated frame; the writer expects 1280x720 image data.
        image = cv2.resize(annotated, (1280, 720))
        out.write(image)
        cv2.imshow("feed", annotated)

        # Slide the two-frame window forward by one frame
        frame1 = frame2
        ret, frame2 = cap.read()

        # Close frame capturing using escape key (key code 27)
        if cv2.waitKey(40) == 27:
            break
finally:
    # Release everything even if an error or interrupt stops the loop
    cv2.destroyAllWindows()  # close window
    cap.release()  # close video source
    out.release()  # finalize the output video
