Term Project

Team: Rain Price, Weston Scott

ECE 523 | Engineering Applications of Machine Learning and Data Analytics

Professor Abhijit Mahalanobis

# Vehicle and People Detection with the FLIR Thermal Dataset

![alt text](problemStatement.png)

## TODO

- Get simple training model working
- Get resulting images and predictions showing
- Run all images through model and check error
- Rewrite the ResNet layers as our own homegrown solution?
- Survive this class ....

## Import Libraries

In [None]:
import os
import random
import math
from datetime import datetime
from collections import Counter
import pandas as pd
import numpy as np

import cv2
from PIL import Image, ImageFilter
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
from sklearn.model_selection import train_test_split
import xml.etree.ElementTree as ET
from torchvision.transforms.functional import pad

import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torchsummary import summary
from torch.autograd import Variable
from torchvision import transforms as T
from torchvision.transforms import ToTensor, transforms
from copy import copy
import json
from tqdm import tqdm
from sklearn import metrics

## Random Seed, Device Architecture, and Hyperparameters

In [None]:
%matplotlib inline
randomSeed = 2024
np.random.seed(randomSeed)
torch.manual_seed(randomSeed)

print(f'PyTorch Version: {torch.__version__}')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = 'cpu'
print(f'PyTorch Device: {device}')

numWorkers = 2
normalizeImages = False
trackingLabels = ['person', 'car']
colors = ['m', 'b']
maxObjects = 83
numClasses = len(trackingLabels)

## Classifier Params

In [None]:
# Where the classifier checkpoint is written to / read from.
classifierModelPath = './classificationModel_20240502.pt'

# Optimisation settings for the classifier.
learnRateClass = 0.001
batchSizeClass = 64
maxEpochsClassifier = 10

# Checkpoint switches: load an existing checkpoint instead of training,
# and/or write checkpoints while training.
loadClassifierModel = False
saveClassifierModel = True

## Detector Params

In [None]:
# Where the detector checkpoint is written to / read from.
detectorModelPath = './DetectionModel_20240502.pt'

# Optimisation settings for the detector.
learnRateDetect = 0.001
maxEpochsDetect = 10
batchSizeDetect = 32

# Checkpoint switches: load an existing checkpoint instead of training,
# and/or write checkpoints while training.
loadDetectionModel = False
saveDetectionModel = True

## Check to see if input images are the same size

In [None]:
# Survey every file in the training image directory and tally the image sizes.
path = 'data/images_thermal_train/data'

imgs = os.listdir(path)
sizes = []
for img in imgs:
    filename = os.path.join(path, img)
    # The context manager releases the file handle after reading the size;
    # previously one handle per image was left open.
    with Image.open(filename) as image:
        sizes.append(image.size)
if sizes:
    print(sizes[-1])
# axis=0 treats each (width, height) pair as one item. Without it the pairs
# are flattened and the counts describe individual numbers instead of sizes.
np.unique(sizes, axis=0, return_counts=True)

## Load Training/Testing Data

In [None]:
# One directory per split; each is expected to hold a COCO-style annotation file.
dataDir = 'data'
trainPath = './data/images_thermal_train'
valPath = './data/images_thermal_val'
testPath = './data/video_thermal_test'

jsonFile = 'coco.json'
trimmedJsonFile = 'trimmed_coco.json'

imagePaths = dict(train=trainPath, val=valPath, test=testPath)

# Annotation file locations (original and trimmed) derived from the split directories.
jsonFiles = {split: os.path.join(root, jsonFile) for split, root in imagePaths.items()}
jsonFilesTrimmed = {split: os.path.join(root, trimmedJsonFile) for split, root in imagePaths.items()}

# Report which inputs are present; missing ones are silently skipped.
for key, val in jsonFiles.items():
    if not os.path.isfile(val):
        continue
    print(f'coco.json Exists: {key}, {val}')

for key, val in imagePaths.items():
    if not os.path.isdir(val):
        continue
    print(f'Data Directory Exists: {key}, {val}')

# Category id -> short class name.
labelMap = {
    1: 'person',
    2: 'bike',            # renamed from "bicycle"
    3: 'car',             # includes pick-up trucks and vans
    4: 'motor',           # renamed from "motorcycle" for brevity
    6: 'bus',
    7: 'train',
    8: 'truck',           # semi/freight truck, excluding pickup truck
    10: 'light',          # renamed from "traffic light" for brevity
    11: 'hydrant',        # renamed from "fire hydrant" for brevity
    12: 'sign',           # renamed from "street sign" for brevity
    17: 'dog',
    18: 'deer',
    37: 'skateboard',
    73: 'stroller',       # four-wheeled carriage for a child, also called pram
    75: 'scooter',
    79: 'other vehicle',  # less common vehicles like construction equipment and trailers
}

In [None]:
# For every split, write a "trimmed" annotation file that keeps only the
# annotations whose class is in trackingLabels (and the images they refer to),
# while recording the widest and tallest tracked box seen in any split.
boxMaxWidth = 1
boxMaxHeight = 1
for key, val in jsonFiles.items():

    print(f"Generating trimmed coco.json files from: {val} for {key} data at: {jsonFilesTrimmed[key]} ...")
    with open(val, 'r') as f:
        data = json.load(f)

    annotations = []
    for annot in data['annotations']:
        # .get() lets category ids that are absent from labelMap be skipped
        # instead of aborting the whole run with a KeyError.
        if labelMap.get(annot['category_id']) not in trackingLabels:
            continue
        annotations.append(annot)

        # Indices 2 and 3 of the box are used as its width and height.
        bbox = annot['bbox']
        boxMaxWidth = max(boxMaxWidth, bbox[2])
        boxMaxHeight = max(boxMaxHeight, bbox[3])

    # A set gives constant-time membership tests; scanning an array of ids
    # once per image made this step quadratic.
    imageIDs = {annot['image_id'] for annot in annotations}

    images = []
    # NOTE: `image` (like `bbox` above) is intentionally a plain loop variable:
    # later code in this file reads `image` and deletes `bbox`.
    for image in data['images']:
        if image['id'] in imageIDs:
            images.append(image)

    trimmedData = { 'annotations': annotations,
                    'images': images,
                    'ids': dict(enumerate(trackingLabels))}

    with open(jsonFilesTrimmed[key], "w") as outfile:
        json.dump(trimmedData, outfile, indent=4)
        
##TODO: Calculate mean and std of images, calculate maxObjects

In [None]:
# Express the largest raw box in a 640-pixel frame. `image` here is whatever
# entry the preceding trimming loop visited last, so this relies on that
# image's dimensions being representative.
boxMaxWidth = int(boxMaxWidth * 640/image['width'])
boxMaxHeight = int(boxMaxHeight * 640/image['height'])

for caption, value in (('Maximum Object Box Width', boxMaxWidth),
                       ('Maximum Object Box Height', boxMaxHeight),
                       ('Maximum Number of Objects', maxObjects),
                       ('Number of Classes', numClasses)):
    print(f'{caption}: {value}')

# Drop the large intermediates left behind by the trimming loop.
del data, images, annotations
del imageIDs, trimmedData, bbox

In [None]:
def sizeBoxImage(img, bbox):
    """Crop ``bbox`` out of ``img`` and return the crop as an 80x80 image.

    The crop is resized to the smallest square canvas (80, 160, 320 or 640
    pixels) that is at least as large as the box in both dimensions, then
    halved with ``cv2.pyrDown`` until it measures 80x80.

    Args:
        img: 2-D image array indexed as ``img[row, col]``.
        bbox: ``(x, y, w, h)`` in pixel units of ``img``; each value is
            truncated to ``int``.

    Returns:
        The resampled 80x80 crop.
    """
    x, y, w, h = (int(v) for v in bbox)
    crop = img[y:y+h, x:x+w]

    # Exactly one size bucket may apply. The third test used to be a plain
    # `if`, so crops that matched one of the first two buckets were upsampled
    # to 320x320 and shrunk a second time, blurring small objects.
    if w <= 80 and h <= 80:
        canvas, halvings = 80, 0
    elif w <= 160 and h <= 160:
        canvas, halvings = 160, 1
    elif w <= 320 and h <= 320:
        canvas, halvings = 320, 2
    else:
        canvas, halvings = 640, 3

    crop = cv2.resize(crop, (canvas, canvas))
    for _ in range(halvings):
        crop = cv2.pyrDown(crop)  # each call halves both dimensions
    return crop

class ThermalCocoDataset(Dataset):
    """Dataset over a COCO-style annotation file of grayscale images.

    Two modes are supported:

    * ``singleObject=True``: one sample per annotation. The annotated object
      is cropped from the image (resized to 640x640) and returned as an
      80x80 patch together with its class index.
    * ``singleObject=False``: one sample per image. The image is returned at
      320x320 together with labels and boxes padded to ``maxObjects`` rows.

    Every sample is the tuple ``(image, labels, bboxes, numObjects)``.
    """

    def __init__(self, jsonFile:str, imageDir:str, trackingLabels:list, labelMap:dict, maxWidth:int=2, maxHeight:int=2, singleObject:bool=False, transform=None, maxObjects:int=4):
        """
        Args:
            jsonFile (str): Path to the COCO-style JSON file containing annotations.
            imageDir (str): Directory containing the images.
            trackingLabels (list): Class names to keep; a class index is its position in this list.
            labelMap (dict): Mapping of category IDs to class names.
            maxWidth (int): Stored on the instance; not used by this class itself.
            maxHeight (int): Stored on the instance; not used by this class itself.
            singleObject (bool): Whether each sample is a single annotated object.
            transform (callable, optional): Optional transform applied to the image.
            maxObjects (int): The max number of objects returned per image in multi-object mode.
        """

        self.jsonFile = jsonFile
        self.imageDir = imageDir
        self.transform = transform
        self.trackingLabels = trackingLabels
        self.labelMap = labelMap
        self.maxWidth = maxWidth
        self.maxHeight = maxHeight
        self.singleObject = singleObject
        self.maxObjects = maxObjects
        self._load_json()

    def _load_json(self):
        """Load JSON annotations and build the lookup tables used per sample."""

        with open(self.jsonFile, 'r') as f:
            data = json.load(f)

        self.annotations = data['annotations']
        self.images = data['images']

        # Built once so that fetching a sample does not rescan every image /
        # annotation. On duplicate image ids the last entry wins.
        self._imageById = {entry['id']: entry for entry in self.images}
        self._annotationsByImage = {}
        for entry in self.annotations:
            self._annotationsByImage.setdefault(entry['image_id'], []).append(entry)

    def _adjust_bounding_box(self, bbox, width, height):
        """Scale ``[x, y, w, h]`` from the source resolution to the working one.

        The working frame is 640x640 in single-object mode and 320x320 in
        multi-object mode; results are truncated to ``int``.
        """

        target = 640 if self.singleObject else 320
        return [int(bbox[0] * (target / width)),
                int(bbox[1] * (target / height)),
                int(bbox[2] * (target / width)),
                int(bbox[3] * (target / height))]

    def _read_image(self, file_name):
        """Read ``file_name`` as a float32 grayscale image resized to 640x640."""

        image_file = os.path.join(self.imageDir, f"{file_name}")
        img = cv2.imread(str(image_file), cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f'Could not read image: {image_file}')
        return cv2.resize(img.astype(np.float32), (640, 640))

    def _get_single_object(self, idx):
        """Get image, label, bounding box, and number of objects for single object mode."""

        annotation = self.annotations[idx]
        entry = self._imageById[annotation['image_id']]

        tmpLabel = self.labelMap.get(annotation['category_id'])
        if tmpLabel not in self.trackingLabels:
            raise ValueError(f"Annotation {idx} has untracked category id {annotation['category_id']}")

        img = self._read_image(entry['file_name'])
        # Assuming annotation format: [x, y, width, height]
        bbox = self._adjust_bounding_box(annotation['bbox'], entry['width'], entry['height'])

        img = sizeBoxImage(img, bbox)
        label = torch.tensor(self.trackingLabels.index(tmpLabel))
        bbox = torch.tensor(bbox)

        if self.transform:
            img = self.transform(img)

        return img, label, bbox, torch.tensor(1)

    def _get_multi_objects(self, idx):
        """Get image, labels, bounding boxes, and number of objects for multi-object mode."""

        image = self.images[idx]
        # Halve 640 -> 320 unconditionally: the boxes are always scaled to a
        # 320x320 frame, so the image must match whether or not a transform is set.
        img = cv2.pyrDown(self._read_image(image['file_name']))

        bboxes = []
        labels = []
        for entry in self._annotationsByImage.get(image['id'], []):
            tmpLabel = self.labelMap.get(entry['category_id'])
            if tmpLabel in self.trackingLabels:
                bboxes.append(self._adjust_bounding_box(entry['bbox'], image['width'], image['height']))
                labels.append(self.trackingLabels.index(tmpLabel))

        # Never report more objects than there are rows in the padded tensors.
        kept = min(len(labels), self.maxObjects)
        numObjects = torch.tensor(kept)

        padded_labels = torch.full((self.maxObjects,), 0)
        padded_bboxes = torch.full((self.maxObjects,4), 0, dtype=torch.float32)
        if kept:
            padded_labels[:kept] = torch.tensor(labels[:kept])
            padded_bboxes[:kept] = torch.tensor(bboxes[:kept], dtype=torch.float32)

        if self.transform:
            img = self.transform(img)

        return img, padded_labels, padded_bboxes, numObjects

    def __len__(self):
        """Return the number of samples: annotations in single-object mode, images otherwise."""

        if self.singleObject:
            return len(self.annotations)
        else:
            return len(self.images)

    def __getitem__(self, idx):
        """Get the image, labels, and bounding boxes for the given index."""

        if self.singleObject:
            return self._get_single_object(idx)
        else:
            return self._get_multi_objects(idx)

# ToTensor is always applied; Normalize is appended only when normalisation
# is switched on.
_transformSteps = [transforms.ToTensor()]
if normalizeImages:
    _transformSteps.append(transforms.Normalize(mean=[262.6299], std=[117.4840]))
transform = transforms.Compose(_transformSteps)


In [None]:
# Classification datasets: one sample per annotated object (singleObject=True).
trainDataClass = ThermalCocoDataset(jsonFilesTrimmed['train'], imagePaths['train'], trackingLabels, labelMap, 640, 640, True, transform=transform)
valDataClass = ThermalCocoDataset(jsonFilesTrimmed['val'], imagePaths['val'], trackingLabels, labelMap, 640, 640, True, transform=transform)
testDataClass = ThermalCocoDataset(jsonFilesTrimmed['test'], imagePaths['test'], trackingLabels, labelMap, 640, 640, True, transform=transform)

# Only training data is shuffled. Validation and test metrics are averages,
# so a fixed order keeps them comparable between runs.
trainLoaderClass = DataLoader(trainDataClass, batch_size=batchSizeClass, shuffle=True, num_workers=numWorkers)
valLoaderClass = DataLoader(valDataClass, batch_size=batchSizeClass, shuffle=False, num_workers=numWorkers)
testLoaderClass = DataLoader(testDataClass, batch_size=batchSizeClass, shuffle=False, num_workers=numWorkers)

# Detection datasets: one sample per image, padded to maxObjects boxes.
trainDataDetect = ThermalCocoDataset(jsonFilesTrimmed['train'], imagePaths['train'], trackingLabels, labelMap, transform=transform, maxObjects=maxObjects)
valDataDetect = ThermalCocoDataset(jsonFilesTrimmed['val'], imagePaths['val'], trackingLabels, labelMap, transform=transform, maxObjects=maxObjects)
testDataDetect = ThermalCocoDataset(jsonFilesTrimmed['test'], imagePaths['test'], trackingLabels, labelMap, transform=transform, maxObjects=maxObjects)

# The detector's training loader is shuffled as well: iterating the images in
# file order feeds the optimiser batches of neighbouring, highly similar samples.
trainLoaderDetect = DataLoader(trainDataDetect, batch_size=batchSizeDetect, shuffle=True, num_workers=numWorkers)
valLoaderDetect = DataLoader(valDataDetect, batch_size=batchSizeDetect, shuffle=False, num_workers=numWorkers)
# batch_size=1 so the test cells can handle one image at a time.
testLoaderDetect = DataLoader(testDataDetect, batch_size=1, shuffle=False, num_workers=numWorkers)

## Helpful Function Definitions

In [None]:
def create_corner_rect(bb, color='red'):
    """Return an unfilled rectangle patch for a box given as ``[x, y, width, height]``.

    ``bb`` is converted to a float32 array first; ``color`` is passed through
    to the patch.
    """
    box = np.array(bb, dtype=np.float32)
    corner = (box[0], box[1])
    return plt.Rectangle(corner, box[2], box[3], fill=False, lw=1, color=color)

def show_corner_bb(im, bb, c=None, cLabel='', color='red', createFig=False):
    """Show ``im`` in grayscale on the current axes and outline box ``bb`` on it.

    When ``createFig`` is set a new 6x6 figure is opened first and, if
    ``cLabel`` is non-empty, titled with ``cLabel`` and class ``c``.
    """
    if createFig:
        plt.figure(figsize=(6,6))
        if cLabel != '':
            plt.title(f'{cLabel} Class: {c}')
    plt.imshow(im.squeeze(), cmap=plt.cm.gray)
    outline = create_corner_rect(bb, color=color)
    plt.gca().add_patch(outline)
    
def _draw_labelled_box(ax, bbox, label):
    """Outline one ``(x, y, w, h)`` box on ``ax`` and write its class name above it."""
    x, y, w, h = bbox
    ax.add_patch(plt.Rectangle((x, y), w, h, linewidth=1, edgecolor=colors[label], facecolor='none'))
    ax.text(x, y-5, f'{trackingLabels[label]}', color=colors[label])


def plot_sample(image, labels, bboxes, num, showbb=True):
    """Show a grayscale sample on the current axes, optionally with its boxes.

    Args:
        image: Image with singleton dimensions allowed; it is squeezed before display.
        labels: A single class index, or a sequence of class indices (one per box).
        bboxes: ``(x, y, w, h)`` boxes. With a single label this may be one box
            or a sequence whose first box is used.
        num: Object count shown in the title when boxes are drawn.
        showbb: Draw boxes and class names when true; otherwise only title the
            sample with its class name.
    """
    plt.imshow(image.squeeze(), cmap="gray")  # squeeze drops singleton dims so a 2-D array is shown
    if not showbb:
        plt.title(f'Sample {trackingLabels[labels]}')
        return

    plt.title(f'Number of Objects: {num}')
    ax = plt.gca()
    # Branch explicitly on "one label" vs "many labels". This used to be decided
    # by catching every exception, which also hid real errors such as a bad
    # label index or a malformed box.
    if np.ndim(labels) == 0:
        firstBox = np.asarray(bboxes, dtype=np.float32).reshape(-1, 4)[0]
        _draw_labelled_box(ax, firstBox, int(labels))
    else:
        for bbox, label in zip(bboxes, labels):
            _draw_labelled_box(ax, bbox, int(label))
    plt.axis('off')

## Sample Imagery From Training Data

### Original Images (Training)

In [None]:
# 3x3 grid of randomly drawn classifier training patches, titled by class.
figure = plt.figure(figsize=(12, 12))
plt.suptitle('Training Data Sample')
cols, rows = 3, 3
for i in range(1, rows * cols + 1):
    idx = torch.randint(len(trainDataClass), size=(1,)).item()
    sample = trainDataClass[idx]
    image, labels, bboxes, num = sample
    figure.add_subplot(rows, cols, i)
    plot_sample(image, labels.tolist(), bboxes.tolist(), num.item(), showbb=False)
plt.show()

In [None]:
# 3x3 grid of randomly drawn detector training images with their boxes.
figure = plt.figure(figsize=(12, 12))
plt.suptitle('Training Data Sample')
cols, rows = 3, 3
for i in range(1, rows * cols + 1):
    idx = torch.randint(len(trainDataDetect), size=(1,)).item()
    image, labels, bboxes, num = trainDataDetect[idx]
    figure.add_subplot(rows, cols, i)
    # Keep only the first `num` rows; the rest is padding.
    labels, bboxes = labels[:num], bboxes[:num,:]
    plot_sample(image, labels.tolist(), bboxes.tolist(), num.item())
plt.show()

### Original Images (Validation)

In [None]:
# 3x3 grid of randomly drawn classifier validation patches, titled by class.
figure = plt.figure(figsize=(12, 12))
# These samples come from valDataClass, so the title says "Validation"
# (it previously read "Training").
plt.suptitle('Validation Data Sample')
cols, rows = 3, 3
for i in range(1, cols * rows + 1):
    idx = torch.randint(len(valDataClass), size=(1,)).item()
    image, labels, bboxes, num = valDataClass[idx]
    figure.add_subplot(rows, cols, i)
    plot_sample(image, labels.tolist(), bboxes.tolist(), num.item(), showbb=False)
plt.show()

In [None]:
# 3x3 grid of randomly drawn detector validation images with their boxes.
figure = plt.figure(figsize=(12, 12))
plt.suptitle('Validation Data Sample')

cols, rows = 3, 3
for i in range(1, rows * cols + 1):
    idx = torch.randint(len(valDataDetect), size=(1,)).item()
    image, labels, bboxes, num = valDataDetect[idx]
    figure.add_subplot(rows, cols, i)
    # Keep only the first `num` rows; the rest is padding.
    labels, bboxes = labels[:num], bboxes[:num,:]
    plot_sample(image, labels.tolist(), bboxes.tolist(), num.item())
plt.show()

### Original Images (Testing)

In [None]:
# 3x3 grid of randomly drawn classifier test patches, titled by class.
figure = plt.figure(figsize=(12, 12))
# These samples come from testDataClass, so the title says "Testing"
# (it previously read "Training").
plt.suptitle('Testing Data Sample')
cols, rows = 3, 3
for i in range(1, cols * rows + 1):
    idx = torch.randint(len(testDataClass), size=(1,)).item()
    image, labels, bboxes, num = testDataClass[idx]
    figure.add_subplot(rows, cols, i)
    plot_sample(image, labels.tolist(), bboxes.tolist(), num.item(), showbb=False)
plt.show()

In [None]:
# 3x3 grid of randomly drawn detector test images with their boxes.
figure = plt.figure(figsize=(12, 12))
plt.suptitle('Testing Data Sample')
cols, rows = 3, 3
for i in range(1, rows * cols + 1):
    idx = torch.randint(len(testDataDetect), size=(1,)).item()
    image, labels, bboxes, num = testDataDetect[idx]
    figure.add_subplot(rows, cols, i)
    # Keep only the first `num` rows; the rest is padding.
    labels, bboxes = labels[:num], bboxes[:num,:]
    plot_sample(image, labels.tolist(), bboxes.tolist(), num.item())
plt.show()

## Model Definitions

![alt text](image.png)

## Classification Model

In [None]:
class Classifier(nn.Module):
    """Patch classifier: truncated single-channel ResNet-18 plus a small MLP head.

    The input is batch-normalised, passed through the first six children of a
    randomly initialised ResNet-18 (with ``conv1`` replaced by a 1-channel
    3x3 convolution), globally average-pooled, and mapped by two linear
    layers to ``num_classes`` values that are normalised with a softmax.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.batchNorm = nn.BatchNorm2d(1)

        backbone = models.resnet18(weights=None)
        self.relu = nn.ReLU(inplace=False)
        # Single-channel stem in place of the stock first convolution.
        backbone.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=2, padding=1, bias=False)

        # Keep only the first six child modules. The head below assumes they
        # emit 128 channels -- TODO confirm against the installed torchvision.
        self.features = nn.Sequential(*list(backbone.children())[:6])

        head = [nn.Linear(128, 32),
                nn.ReLU(inplace=False),
                nn.Linear(32, num_classes),
                nn.Softmax(1)]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return per-class softmax scores of shape ``(batch, num_classes)``."""
        features = self.relu(self.features(self.batchNorm(x)))
        pooled = F.adaptive_avg_pool2d(features, (1, 1))
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)
    
# Build the classifier on the CPU, move it to the GPU when that is the
# selected device, then print a layer summary for an 80x80 single-channel input.
classifier = Classifier(numClasses).cpu()

if str(device) == 'cuda':
    classifier = classifier.to(device)

classifierInputShape = (1,80,80)
summary(classifier, classifierInputShape)
print()

In [None]:
class _ProbabilityCrossEntropy(nn.Module):
    """Cross-entropy for inputs that are already class probabilities.

    ``nn.CrossEntropyLoss`` applies log-softmax to its input, so feeding it
    the classifier's softmax output normalises twice and flattens the
    gradients. This module takes the log of the probabilities and applies
    the negative log-likelihood, which is the cross-entropy of the
    probabilities as given.

    Args:
        eps: Lower clamp applied before the log so a probability of exactly
            zero cannot produce ``-inf``.
    """

    def __init__(self, eps=1e-8):
        super().__init__()
        self.eps = eps

    def forward(self, probabilities, targets):
        """Return the mean cross-entropy of ``probabilities`` against class-index ``targets``."""
        return F.nll_loss(torch.log(probabilities.clamp_min(self.eps)), targets)


# The classifier's last layer is a softmax, so the criterion must consume
# probabilities. The model output is left as probabilities because later
# cells read it as a confidence score.
criterionClassifier = _ProbabilityCrossEntropy()
print(f'Criterion: {criterionClassifier}')

optimizerClassifier = optim.Adam(classifier.parameters(), lr = learnRateClass)
print(f'\nOptimizer: {optimizerClassifier}')

# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizerClassifier, mode='min')
# print(f'\nscheduler: {scheduler}')

In [None]:
if loadClassifierModel:
    # Restore into the existing `classifier` instance. `optimizerClassifier`
    # was built from that instance's parameters; creating a fresh model here
    # left the optimizer updating parameters that were no longer in use.
    # map_location lets a checkpoint written on a GPU load on a CPU-only machine.
    checkpoint = torch.load(classifierModelPath, map_location=device)
    classifier.load_state_dict(checkpoint['model_state_dict'])
    optimizerClassifier.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    trainLoss = [checkpoint['loss']]

In [None]:
# Train the classifier, validating after every epoch, unless a checkpoint
# was loaded instead.
classifier = classifier.to(device)
criterionClassifier = criterionClassifier.to(device)

if not loadClassifierModel:

    # Per-epoch history; "test*" holds the validation metrics.
    trainLoss = []
    trainAcc = []
    testLoss = []
    testAcc = []

    for epoch in range(1, maxEpochsClassifier+1):
        epochLoss = []
        epochAcc = []
        testEpochLoss = []
        testEpochAcc = []
        classifier.train()

        for images, labels, __, __ in tqdm(trainLoaderClass):
            optimizerClassifier.zero_grad()

            # Move the batch and run the forward pass on every device. The
            # forward pass used to sit inside the CUDA-only branch, so on a
            # CPU `outputs` was never defined.
            images = images.to(device)
            labels = labels.to(device)
            outputs = classifier(images)

            loss = criterionClassifier(outputs, labels)
            loss.backward()
            optimizerClassifier.step()

            pred = torch.argmax(outputs, dim=1)
            accuracy = (pred == labels).sum().item() / float(labels.size(0))

            epochLoss.append(loss.item())
            epochAcc.append(float(accuracy))

        trainLoss.append(float(np.mean(epochLoss)))
        trainAcc.append(float(np.mean(epochAcc)))
        # scheduler.step(trainLoss[-1])

        classifier.eval()
        with torch.no_grad():
            for testImages, testLabels, __, __ in tqdm(valLoaderClass):
                testImages = testImages.to(device)
                testLabels = testLabels.to(device)

                testOutput = classifier(testImages)
                # .item() stores a plain float instead of a tensor.
                lossVal = criterionClassifier(testOutput, testLabels).item()

                predVal = torch.argmax(testOutput, dim=1)
                accuracyVal = (predVal == testLabels).sum().item() / float(testLabels.size(0))

                testEpochLoss.append(lossVal)
                testEpochAcc.append(accuracyVal)

            testLoss.append(float(np.mean(testEpochLoss)))
            testAcc.append(float(np.mean(testEpochAcc)))

        print(f'[Epoch: {epoch}/{maxEpochsClassifier}] Loss: {np.round(trainLoss[-1], 5)}')

        if saveClassifierModel and epoch % 1 == 0: ## save model every epoch
            torch.save({
                    'epoch': epoch,
                    'model_state_dict': classifier.state_dict(),
                    'optimizer_state_dict': optimizerClassifier.state_dict(),
                    # plain float: NumPy scalars are rejected by torch.load's
                    # weights-only unpickler
                    'loss': float(trainLoss[-1]),
                    }, classifierModelPath)

In [None]:
# Write the final classifier checkpoint (model, optimizer, last epoch and loss).
if saveClassifierModel:
    torch.save({
            'epoch': epoch,
            'model_state_dict': classifier.state_dict(),
            'optimizer_state_dict': optimizerClassifier.state_dict(),
            # Stored as a plain float: a NumPy scalar in the checkpoint is
            # rejected by torch.load's weights-only unpickler.
            'loss': float(trainLoss[-1]),
            }, classifierModelPath)

In [None]:
# Learning curves for the classifier; only available when it was trained in
# this session.
if not loadClassifierModel:
    # The criterion is a cross-entropy, not a mean squared error.
    print(f'Final CrossEntropy loss ({maxEpochsClassifier} epochs): {trainLoss[-1]}\n')

    f = plt.figure(figsize=(10,8))
    plt.plot(trainLoss, label="train")
    plt.plot(testLoss, label="val")
    plt.xlabel("epochs")
    plt.ylabel("CrossEntropy")
    plt.title("Epochs vs. Loss Function")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.show()

    f = plt.figure(figsize=(10,8))
    plt.plot(trainAcc, label="train")
    # testAcc is filled from the validation loader, so label it like the loss curve.
    plt.plot(testAcc, label="val")
    plt.xlabel("epochs")
    plt.ylabel("accuracy")
    plt.title("Epochs vs. Accuracy")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.show()

In [None]:
# Run the classifier over the test set, collecting predicted and true class indices.
predictions = []
actuals = []
classifier.eval()
with torch.no_grad():
    for images, labels, __, __ in tqdm(testLoaderClass):
        images = images.to(device)
        labels = labels.to(device)

        test_output = classifier(images)
        # argmax over the class dimension keeps the batch dimension even for
        # a final batch of one sample; squeezing it produced a 0-d array that
        # cannot be passed to extend().
        pred_y = torch.argmax(test_output, dim=1)
        predictions.extend(pred_y.cpu().numpy())
        actuals.extend(labels.cpu().numpy())

In [None]:
# Per-sample flag: True where the prediction matches the true class.
isCorrect = [predicted == actual for predicted, actual in zip(predictions, actuals)]
numCorrect = np.sum(isCorrect)
numSamples = len(predictions)
print(f"\nCorrect classifications on {numSamples} images: {numCorrect}/{numSamples} | {numCorrect/numSamples*100}%")

## Create Confusion matrix
confusion_matrix = metrics.confusion_matrix(actuals, predictions)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels=trackingLabels)

f, ax = plt.subplots(figsize=(8,8))
cm_display.plot(ax=ax)
plt.title("Confusion Matrix for Test Data")
plt.tight_layout()
plt.show()

In [None]:
# Column vectors of predicted / true classes, then the row indices of the
# misclassified (bidxs) and correctly classified (gidxs) samples.
predictions = np.array(predictions).reshape(len(predictions), 1)
actuals = np.array(actuals).reshape(len(actuals), 1)
agrees = actuals == predictions
differs = actuals != predictions
bidxs = np.where(differs)[0]
gidxs = np.where(agrees)[0]

In [None]:
# 5x5 grid of randomly chosen test patches that were classified correctly.
if len(gidxs) == 0:
    # torch.randint cannot draw from an empty range.
    print('No correct classifications to display.')
else:
    figure = plt.figure(figsize=(12, 12))
    # Scale to percent before rounding so the title does not show float
    # artefacts such as 97.31999999999999.
    plt.suptitle(f'Correct Classifications: {np.round(100 * len(gidxs) / len(predictions), 2)}% (Actual - Predicted)')
    cols, rows = 5, 5
    for i in range(1, cols * rows + 1):
        sample_idx = gidxs[torch.randint(len(gidxs), size=(1,)).item()]
        img, label, __, __ = testDataClass[sample_idx]
        pred = predictions[sample_idx]
        figure.add_subplot(rows, cols, i)
        plt.title(f'{trackingLabels[label]} - {trackingLabels[pred[0]]}')
        plt.axis("off")
        plt.imshow(img.squeeze(), cmap="gray")
    plt.show()

In [None]:
# 5x5 grid of randomly chosen test patches that were misclassified.
if len(bidxs) == 0:
    # A perfect score leaves nothing to sample; torch.randint cannot draw
    # from an empty range.
    print('No incorrect classifications to display.')
else:
    figure = plt.figure(figsize=(12, 12))
    # Scale to percent before rounding so the title does not show float
    # artefacts such as 2.6799999999999997.
    plt.suptitle(f'Incorrect Classifications: {np.round(100 * len(bidxs) / len(predictions), 2)}% (Actual - Predicted)')
    cols, rows = 5, 5
    for i in range(1, cols * rows + 1):
        sample_idx = bidxs[torch.randint(len(bidxs), size=(1,)).item()]
        img, label, __, __ = testDataClass[sample_idx]
        pred = predictions[sample_idx]
        figure.add_subplot(rows, cols, i)
        plt.title(f'{trackingLabels[label]} - {trackingLabels[pred[0]]}')
        plt.axis("off")
        plt.imshow(img.squeeze(), cmap="gray")
    plt.show()

## Detection Model

In [None]:
class Detector(nn.Module):
    """Box regressor: truncated single-channel ResNet-18 plus a small MLP head.

    The input is batch-normalised, passed through the first six children of a
    randomly initialised ResNet-18 (with ``conv1`` replaced by a 1-channel
    3x3 convolution), globally average-pooled, and mapped by two linear
    layers to ``4 * max_objects`` values per image.
    """

    def __init__(self, max_objects):
        super().__init__()

        self.batchNorm = nn.BatchNorm2d(1)
        backbone = models.resnet18(weights=None)

        self.relu = nn.ReLU(inplace=False)
        # Single-channel stem in place of the stock first convolution.
        backbone.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=2, padding=1, bias=False)

        # Keep only the first six child modules. The head below assumes they
        # emit 128 channels -- TODO confirm against the installed torchvision.
        self.features = nn.Sequential(*list(backbone.children())[:6])

        self.max_objects = max_objects

        head = [nn.Linear(128, 64),
                nn.ReLU(inplace=False),
                nn.Linear(64, 4*max_objects)]
        self.bb = nn.Sequential(*head)

    def forward(self, x):
        """Return a ``(batch, 4 * max_objects)`` tensor of box values."""
        features = self.relu(self.features(self.batchNorm(x)))
        pooled = F.adaptive_avg_pool2d(features, (1, 1))
        flat = pooled.view(pooled.size(0), -1)
        return self.bb(flat)
    
# Build the detector on the CPU, move it to the GPU when that is the selected
# device, then print a layer summary for a 320x320 single-channel input.
detector = Detector(maxObjects).cpu()

if str(device) == 'cuda':
    detector = detector.to(device)

detectorInputShape = (1,320,320)
summary(detector, detectorInputShape)
print()

## Set Criterion and Optimizer

In [None]:
class DetectionLoss(nn.Module):
    """Smooth-L1 loss between predicted and ground-truth boxes.

    Only the first ``max(num)`` box slots of every image in the batch are
    compared. Images with fewer objects therefore contribute their padded
    rows up to that limit, and slots beyond it are ignored.
    """

    def __init__(self):
        super().__init__()
        self.bboxLoss = nn.SmoothL1Loss()

    def forward(self, bboxes, num, bboxesPred):
        """Compute the loss for one batch.

        Args:
            bboxes: Ground-truth boxes, ``(batch, slots, 4)``.
            num: Number of real objects per image, ``(batch,)``.
            bboxesPred: Predicted boxes, ``(batch, slots, 4)``.

        Returns:
            A scalar tensor. When no image in the batch has any object the
            result is zero (still attached to ``bboxesPred`` so ``backward``
            works) rather than the NaN a mean over an empty selection gives.
        """
        maxNum = int(torch.max(num))
        if maxNum == 0:
            return bboxesPred.sum() * 0.0

        preds = torch.flatten(bboxesPred[:, :maxNum, :4])
        truths = torch.flatten(bboxes[:, :maxNum, :4])
        return self.bboxLoss(preds, truths)
    
# Loss used for box regression.
criterionDetector = DetectionLoss()
print(f'\nCriterion: {criterionDetector}')

# Adam over every detector parameter at the configured learning rate.
detectorParameters = detector.parameters()
optimizerDetector = optim.Adam(detectorParameters, lr=learnRateDetect)
print(f'Optimizer: {optimizerDetector}')

# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)  # Adjust the scheduler parameters
# print(f'Scheduler: {scheduler}')

## Load Trained Model (If Needed)

In [None]:
if loadDetectionModel:
    # Restore into the existing `detector` instance. `optimizerDetector` was
    # built from that instance's parameters; creating a fresh model here left
    # the optimizer updating parameters that were no longer in use.
    # map_location lets a checkpoint written on a GPU load on a CPU-only machine.
    checkpoint = torch.load(detectorModelPath, map_location=device)
    detector.load_state_dict(checkpoint['model_state_dict'])
    optimizerDetector.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']

## Train Model

In [32]:
# Train the detector, validating after every epoch, unless a checkpoint was
# loaded instead.
detector = detector.to(device)
criterionDetector = criterionDetector.to(device)

if not loadDetectionModel:

    # Per-epoch history; "testLossDetector" holds the validation loss.
    trainLossDetector = []
    testLossDetector = []

    for epoch in range(1, maxEpochsDetect+1):
        epochLoss = []
        testEpochLoss = []
        detector.train()

        for images, __, bboxes, num in tqdm(trainLoaderDetect):
            optimizerDetector.zero_grad()

            # .to(device) is a no-op on the CPU, so no device check is needed.
            images = images.to(device)
            bboxes = bboxes.to(device)
            num = num.to(device)

            # Flat (batch, 4 * maxObjects) output -> one row of 4 values per slot.
            outputs = detector(images).view(-1, maxObjects, 4)
            loss = criterionDetector(bboxes, num, outputs)
            loss.backward()
            optimizerDetector.step()
            epochLoss.append(loss.item())
        trainLossDetector.append(float(np.mean(epochLoss)))

        detector.eval()
        with torch.no_grad():
            for testImages, __, testBoxes, num in tqdm(valLoaderDetect):
                testImages = testImages.to(device)
                testBoxes = testBoxes.to(device)
                num = num.to(device)

                testOutputs = detector(testImages).view(-1, maxObjects, 4)
                # .item() stores a plain float instead of a tensor.
                testEpochLoss.append(criterionDetector(testBoxes, num, testOutputs).item())
            testLossDetector.append(float(np.mean(testEpochLoss)))

        print(f'[Epoch: {epoch}/{maxEpochsDetect}] Loss: {np.round(trainLossDetector[-1], 5)}')

        if saveDetectionModel and epoch % 1 == 0: ## save model every epoch
            torch.save({
                    'epoch': epoch,
                    'model_state_dict': detector.state_dict(),
                    'optimizer_state_dict': optimizerDetector.state_dict(),
                    # plain float: NumPy scalars are rejected by torch.load's
                    # weights-only unpickler
                    'loss': float(trainLossDetector[-1]),
                    }, detectorModelPath)

KeyboardInterrupt: 

## Save Model (If Needed)

In [None]:
# Write the final detector checkpoint (model, optimizer, last epoch and loss).
if saveDetectionModel:
    torch.save({
            'epoch': epoch,
            'model_state_dict': detector.state_dict(),
            'optimizer_state_dict': optimizerDetector.state_dict(),
            # Stored as a plain float: a NumPy scalar in the checkpoint is
            # rejected by torch.load's weights-only unpickler.
            'loss': float(trainLossDetector[-1]),
            }, detectorModelPath)

## Learning Curve

In [None]:
# Learning curve for the detector; only available when it was trained in this
# session.
if not loadDetectionModel:
    # The criterion is SmoothL1Loss, not a mean squared error.
    print(f'Final SmoothL1Loss ({maxEpochsDetect} epochs): {trainLossDetector[-1]}\n')

    f = plt.figure(figsize=(10,8))
    plt.plot(trainLossDetector, label="train")
    plt.plot(testLossDetector, label="val")
    plt.xlabel("epochs")
    plt.ylabel("SmoothL1Loss")
    plt.title("Epochs vs. Loss Function")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.show()

# Object Detection and Classification

In [None]:
# Detect boxes on the first test image, classify the crop inside every box
# that passes the filter, and print the results for inspection.
predictions = []
actuals = []
detector.eval()
classifier.eval()  # the classifier contains batch-norm layers, so it must be in eval mode too
with torch.no_grad():
    for images, labels, bboxes, num in tqdm(testLoaderDetect):
        images = images.to(device)
        labels = labels.to(device)
        bboxes = bboxes.to(device)
        num = num.to(device)

        predBoxes = detector(images)
        predBoxes = predBoxes.view(-1, maxObjects, 4)

        predLabels = []
        predScores = []
        boxesKeep = []
        # Pre-set so the summary print below works when no box passes the filter.
        output = None
        for box in predBoxes.squeeze():
            # Keep a box only when all four of its values exceed 1.
            if not torch.all(box > 1):
                continue

            img = sizeBoxImage(images.squeeze().detach().cpu().numpy(), box)
            img = transform(img).unsqueeze(0)
            output = classifier(img.to(device))
            print(output.detach().cpu().tolist())
            print(torch.max(output))
            predLabels.append(torch.max(output, 1)[1].data.squeeze().item())
            predScores.append(torch.max(output, 1)[0].data.squeeze().item())
            boxesKeep.append(box.detach().cpu().tolist())

        print(output)
        boxesKeep = torch.tensor(boxesKeep, dtype=torch.float32)
        print(predLabels)
        print()
        print(predScores)
        break  # only the first test image is inspected

        # test_output = classifier(images)
        # pred_y = torch.max(test_output, 1)[1].data.squeeze()
        # accuracy = (pred_y == labels).sum().item() / float(labels.size(0))
        # predictions.extend(pred_y.cpu().numpy())
        # actuals.extend(labels.cpu().numpy())

#     # During evaluation, filter out predictions below a certain probability threshold
#     bbox = bbox_output.view(-1, self.max_objects, 4)
#     probabilities = F.softmax(classifier, dim=2)
#     # Apply thresholding to filter out predictions with low confidence
#     thresholded_probabilities, indices = torch.max(probabilities, dim=2)
#     mask = thresholded_probabilities > 0.5
#     classifier = classifier[mask]
#     bbox = bbox[mask]

In [None]:
# Show the inspected test image with the predicted boxes and classes.
figure = plt.figure(figsize=(12, 12))
plt.suptitle('Testing Data')
shownImage = images.detach().cpu()
objectCount = num.item()
plot_sample(shownImage, predLabels, boxesKeep, objectCount)
plt.show()

In [None]:
# Show the inspected test image with its ground-truth boxes and classes.
figure = plt.figure(figsize=(12, 12))
plt.suptitle('Testing Data')
# The test loader yields batches of one image, so index 0 removes the batch
# dimension. Only the first `num` rows are real objects; the rest is padding.
# Passing the raw batch tensors gave plot_sample nested lists it could not unpack.
numTruth = int(num.item())
plot_sample(images.detach().cpu(), labels[0, :numTruth].tolist(), bboxes[0, :numTruth].tolist(), numTruth)
plt.show()

## Sample Imagery of Model Output

### Training Images Samples

In [None]:
# trainImages, trainLabels, trainImagesFlipped = next(iter(trainLoader))
# samples = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# model.eval()
# with torch.no_grad():
#     outputs, __ = model(trainImages.to(device))

# j = 0
# for i, label in enumerate(trainLabels):
#     if label.item() in samples:
#         f = plt.figure(figsize=(12, 4))      
#         ax1 = f.add_subplot(131)
#         ax1.imshow(trainImages[i].squeeze(), cmap='gray')
#         ax1.axis('off')
#         ax1.set_title(f'Original - {label.item()}')
#         ax2 = f.add_subplot(132)
#         ax2.imshow(outputs[i].detach().cpu()[0].squeeze(), cmap='gray')
#         ax2.axis('off')
#         ax2.set_title(f'Reconstruction (Flipped) - {label.item()}')
#         plt.tight_layout()
#         plt.show()
        
#         samples.remove(label.item())
#         j += 1

### Test Images Samples

In [None]:
# testImages, testLabels, testImagesFlipped = next(iter(testLoader))
# samples = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# model.eval()
# with torch.no_grad():
#     outputs, __ = model(testImages.to(device))

# j = 0
# for i, label in enumerate(testLabels):
#     if label.item() in samples:
#         f = plt.figure(figsize=(12, 4))      
#         ax1 = f.add_subplot(131)
#         ax1.imshow(testImages[i].squeeze(), cmap='gray')
#         ax1.axis('off')
#         ax1.set_title(f'Original - {label.item()}')
#         ax2 = f.add_subplot(132)
#         ax2.imshow(outputs[i].detach().cpu()[0].squeeze(), cmap='gray')
#         ax2.axis('off')
#         ax2.set_title(f'Reconstruction (Flipped) - {label.item()}')
#         plt.tight_layout()
#         plt.show()
        
#         samples.remove(label.item())
#         j += 1