In [1]:
import os
is_running_on_colab = 'COLAB_GPU' in os.environ
print('Running on CoLab?', is_running_on_colab)

Running on CoLab? False


In [2]:
from PIL import Image
import matplotlib.pyplot as plt
import cv2
import xml.etree.ElementTree as ET
from torch.utils.data import *
from imutils import paths
from typing import Union
import torch
import torch.nn as nn
import time
import torch.optim as optim
from tqdm.notebook import tqdm


if is_running_on_colab:
  from google.colab import drive
  drive.mount('/content/gdrive')
  data_path = '/content/gdrive/MyDrive/license_plate/data/'
  test_path = '/content/gdrive/MyDrive/license_plate/testdata/'
else:
  current_dir = os.getcwd()
  data_path = os.path.join(current_dir,'dataset','license_plate','data')
  test_path = os.path.join(current_dir,'dataset','license_plate','test_data')
#simple directory check for the data_files

def check_directories_exist(*paths):
    for path in paths:
        if not os.path.isdir(path):
            assert False
            print(f"The directory {path} does not exist.")

check_directories_exist(data_path, test_path)

data_link = "https://drive.google.com/open?id=1rdEsCUcIUaYOVRkx5IMTRNA7PcGMmSgc"

In [3]:

class labelFpsDataLoader(Dataset):
    def __init__(self, img_dir, imgSize, is_transform=None):
        self.img_dir = img_dir
        self.img_paths = []
        for i in range(len(img_dir)):
            self.img_paths += [el for el in paths.list_images(img_dir[i])]
        self.img_size = imgSize
        self.is_transform = is_transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        # Get image name
        img_name = self.img_paths[index]
        img = cv2.imread(img_name)
        # Resize image
        resizedImage = cv2.resize(img, self.img_size)
        resizedImage = resizedImage.astype('float32')
        resizedImage /= 255.0

        if is_running_on_colab:
            iname = img_name.rsplit('/', 1)[-1].rsplit('.', 1)[0].split('-')
        else:
            iname = img_name.rsplit('\\', 1)[-1].rsplit('.', 1)[0].split('-')
        [leftUp, rightDown] = [[int(eel) for eel in el.split('&')] for el in iname[2].split('_')]
            # [leftUp, rightDown] = [[int(eel) for eel in el.split('_')] for el in iname[1:3]]

        # Find leftUp and rightDown from file-name of original image

        # Find original width and original height of original image
        ori_w, ori_h = [float(int(el)) for el in [img.shape[1], img.shape[0]]]

        # Find scale values for width and height
        scale_height, scale_width = self.img_size[1] / ori_h, self.img_size[0] / ori_w

        scaled_leftUp = (int(leftUp[0] * scale_width), int(leftUp[1] * scale_height))
        scaled_rightDown = (int(rightDown[0] * scale_width), int(rightDown[1] * scale_height))

        # All four values of leftUp and rightDown scaled in 1 vector
        scaled_labels = (scaled_leftUp, scaled_rightDown)
        # scaled_labels = [int(leftUp[0] * scale_x, leftUp[1] * scale_y), (rightDown[0] * scale_x, rightDown[1] * scale_y)]

        # Flatten scaled_labels into a 1D list
        flattened_labels = [coordinate for point in scaled_labels for coordinate in point]

        # Convert the 1D list to a tensor and reshape it to (1, 4)
        scaled_labels_tensor = torch.tensor(flattened_labels, dtype=torch.float32)


        return resizedImage, scaled_labels_tensor, img_name


In [4]:
# Initialize your dataset
img_dir = [data_path]  # Replace with actual paths to your image directories
img_size = (224, 224)  # Replace with your desired image size
dataset = labelFpsDataLoader(img_dir=img_dir, imgSize=img_size)

# Test the length of the dataset
print(f"Length of dataset: {len(dataset)}")

# Test getting an item
try:
    # Retrieve the first item
    img, labels, img_name = dataset[0]

    # Check the shapes and types
    print(f"Image shape: {img.shape}")
    print(f"Labels: {labels}")
    print(f"Image name: {img_name}")

    # If the code reaches this point, the item has been retrieved successfully
    print("Item retrieval successful.")
except Exception as e:
    # If there is any error, print it out
    print(f"An error occurred: {e}")
print("got out here")

Length of dataset: 327
Image shape: (224, 224, 3)
Labels: tensor([ 98., 105., 123., 111.])
Image name: c:\Users\au616584\OneDrive - Aarhus Universitet\Datalogi\Deep_learning\License_plate\dataset\license_plate\data\0031-1_1-315&547_397&579-397&579_315&577_315&547_397&549-0_0_9_9_24_32_26-128-19.jpg
Item retrieval successful.
got out here


In [5]:
import numpy as np
import cv2
import matplotlib.pyplot as plt


def plot_img_and_boundingbox(image, labels):
    numpy_array = image.cpu().detach().numpy()

    cv2_image = cv2.cvtColor(numpy_array, cv2.COLOR_RGB2BGR)

    # Extract numerical values from tensors and convert to integers
    leftUp = (int(labels[0]),int(labels[1]))
    rightDown = (int(labels[2]),int(labels[3]))
    # Draw rectangle on the image
    new_image = cv2.rectangle(cv2_image, leftUp, rightDown, (255, 0, 0), 2)

    # Display the image
    plt.imshow(new_image)
    plt.show()

# scaled_labels = batch[1][0]
# plot_img_and_boundingbox(image, scaled_labels)

In [6]:
# model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)
# model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v3_large', pretrained=True)
# model = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_efficientnet_b0', pretrained=True)
model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v3_large', pretrained=True)
net = nn.Sequential(
    nn.Linear(960, 100),
    nn.ReLU(),
    nn.Dropout(0.01),
    nn.Linear(100, out_features=4, bias=True),
)
model.classifier = net


Using cache found in C:\Users\au616584/.cache\torch\hub\pytorch_vision_v0.10.0
Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to C:\Users\au616584/.cache\torch\hub\checkpoints\mobilenet_v3_large-8738ca79.pth
100%|██████████| 21.1M/21.1M [00:05<00:00, 3.93MB/s]


In [7]:
# net = nn.Sequential(
#     nn.Linear(1280, 1024),
#     nn.ReLU(),
#     nn.Dropout(0.5),
#     nn.ReLU(),
#     nn.Linear(1024, out_features=4, bias=True),
# )
net = nn.Sequential(
    nn.Linear(960, 100),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.BatchNorm1d(100),
    nn.Linear(100, out_features=4, bias=True),
)
model.classifier = net

In [8]:
#Sets model to training mode,e this is important for layers that have different functionalities depending on training or eval. for example BatchNorm
model.train()

MobileNetV3(
  (features): Sequential(
    (0): ConvBNActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): ConvBNActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): ConvBNActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): Identity()
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): ConvBNActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride=

In [9]:
layers_of_classifier = 4
for i, param in enumerate(model.parameters()):
    if i < len(list(model.parameters())) - layers_of_classifier:
        param.requires_grad = False
    else:
        param.requires_grad = True

number_of_layers = len(list(enumerate(model.parameters())))
print(f"Total number of layers is {number_of_layers}")
print(f"Number of pretrained base layers is {number_of_layers - layers_of_classifier}")

Total number of layers is 176
Number of pretrained base layers is 172


In [10]:
def check_requires_grad():
  for name, param in model.named_parameters(): # Just to check
    if name.startswith('classifier'):
        print(f'Layer {name} - requires_grad: {param.requires_grad}')
    if name.startswith('features'):
        print(f'Layer {name} - requires_grad: {param.requires_grad}')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# check_requires_grad()

In [11]:
from datetime import datetime
def bb_intersection_over_union(boxA, boxB): # https://pyimagesearch.com/2016/11/07/intersection-over-union-iou-for-object-detection/
	# determine the (x, y)-coordinates of the intersection rectangle
	xA = max(boxA[0], boxB[0])
	yA = max(boxA[1], boxB[1])
	xB = min(boxA[2], boxB[2])
	yB = min(boxA[3], boxB[3])
	# compute the area of intersection rectangle
	interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
	# compute the area of both the prediction and ground-truth
	# rectangles
	boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
	boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
	# compute the intersection over union by taking the intersection
	# area and dividing it by the sum of prediction + ground-truth
	# areas - the interesection area
	iou = interArea / float(boxAArea + boxBArea - interArea)
	# return the intersection over union value
	return iou

def calculate_true_positives_in_batch(y_pred, y_batch):
  threshold = 0.5
  true_positive_amount = 0
  for i in range(y_pred.shape[0]):
    y_pred_numpy = y_pred[i].cpu().detach().numpy()
    y_batch_numpy = y_batch[i].cpu().detach().numpy()
    iou = bb_intersection_over_union(y_pred_numpy, y_batch_numpy)
    if iou > threshold:
      true_positive_amount +=1
  return true_positive_amount

#wrapper of tqdm to enable toggling it off easily
def toggle_tqdm(iterable, use_tqdm=True):
  if use_tqdm:
    return tqdm(iterable)
  else:
    return iterable

def calculate_loss_and_accuracy(loader):
  total_loss = 0
  total_true_positives = 0
  total_samples = 0

  for X_batch_, y_batch, img_name in loader:
      X_batch = X_batch_.permute(0,3,1,2).to(device)
      y_batch = y_batch.to(device)
      y_pred = model(X_batch)

      total_true_positives += calculate_true_positives_in_batch(y_pred, y_batch)

      loss = loss_fn(y_pred, y_batch)
      total_loss += loss.item() * len(y_batch)  # Accumulate scaled loss
      total_samples += len(y_batch)  # Accumulate number of samples
  sample_loss = total_loss / total_samples  # Calculate average loss
  accuracy = total_true_positives / total_samples
  return sample_loss, accuracy

def save_model(model):
  # Get the current time
  current_time = datetime.now()

  # Format the time in a 'YearMonthDay_HourMinuteSecond' format for the filename
  timestamp = current_time.strftime('%Y%m%d_%H%M')
  model_path = f'/content/gdrive/MyDrive/license_plate/models/model_weights_{timestamp}.pth'
  torch.save(model.state_dict(), model_path)

def load_model(path):
  model.load_state_dict(torch.load(path))


In [12]:
# load_model('/content/gdrive/MyDrive/license_plate/models/model_weights_20231106_1410.pth')


In [13]:
scaler = torch.cuda.amp.GradScaler()

# Determine hardware
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("using device: ", device)

# Define dataloader for training
data_loader = labelFpsDataLoader([data_path],(500,1000))
train_loader = DataLoader(data_loader, batch_size=16, shuffle=True, num_workers=2)

test_loader = labelFpsDataLoader([test_path],(500,1000))
validation_loader = DataLoader(test_loader, batch_size=64, shuffle=True, num_workers=2)
# Determine subset?? (mabeto??)
subset = not is_running_on_colab
subset = True
subset_data = Subset(data_loader, indices=range(327))
subset_val = Subset(test_loader, indices=range(100))
if subset:
  print("Running on subset!")
  train_loader =DataLoader(subset_data, batch_size=16, shuffle=True, num_workers=2, pin_memory=True)
  train_loader_finetuning =DataLoader(subset_data, batch_size=16, shuffle=True, num_workers=2, pin_memory=True)
  validation_loader = DataLoader(subset_val, batch_size=16, shuffle=True, num_workers=2)

decoder_epochs = 1
n_epochs = 1
# dataiter = iter(trainloader)
# first_batch = next(dataiter)

# # Now you can access the data and target tensors
# X_batch, y_batch, img_name = first_batch

# print(X_batch.shape)
# Define dataloader for validation

# Define model and hyperparameters
loss_fn = nn.MSELoss()
optimizer_decoder = optim.AdamW(model.parameters(), lr=0.001)
#We believe the scheduler LR overrules adam?
scheduler_decoder = optim.lr_scheduler.CyclicLR(optimizer_decoder,base_lr=0.01, max_lr=5, cycle_momentum=False)

# scheduler_decoder = optim.lr_scheduler.OneCycleLR(optimizer_decoder, max_lr=0.01,total_steps=decoder_epochs)

optimizer_full = optim.AdamW(model.parameters(), lr=0.00005)
# scheduler_full = optim.lr_scheduler.OneCycleLR(optimizer_decoder, max_lr=0.001,total_steps=n_epochs)
scheduler_full = optim.lr_scheduler.CyclicLR(optimizer_decoder,base_lr=0.0001, max_lr=0.1, cycle_momentum=False)
model.train()
model.to(device)

use_tqdm = True
# Train
def train_model(n_epochs, optimizer, scheduler, dataloader,validation_loader):
  test_loss_history = []
  val_loss_history = []
  test_accuracy_history = []
  val_accuracy_history = []
  i = 0
  for epoch in toggle_tqdm(range(n_epochs), use_tqdm):
    # print("epoch:", i)
    for X_batch, y_batch, img_name in toggle_tqdm(dataloader, False):
      optimizer.zero_grad()
      X_batch = X_batch.permute(0,3,1,2).to(device) # Needs to have shape [batch_size, channels, height, width]
      y_batch = y_batch.to(device)
      with torch.cuda.amp.autocast():
        y_pred = model(X_batch)
        loss = loss_fn(y_pred,y_batch)
      scaler.scale(loss).backward()
      scaler.step(optimizer)
      scaler.update()
      # loss.backward()
      # optimizer.step()
    scheduler.step()
    # In-sample & out-sample loss calculation
    model.eval()  # Switch to evaluation mode to disable features like dropout
    #time the following

    if epoch % 10 == 0:
      with torch.no_grad():  # Disable gradient calculation to save memory
          # In sample
          in_sample_loss, in_sample_accuracy = calculate_loss_and_accuracy(dataloader)
          test_loss_history.append(in_sample_loss)
          test_accuracy_history.append(in_sample_accuracy)
          # print(f'Epoch {epoch+1}, In-sample Loss : {in_sample_loss:.4f}, In-sample accuracy : {in_sample_accuracy:.4f}' )

          # Out sample
          out_sample_loss, out_sample_accuracy = calculate_loss_and_accuracy(validation_loader)
          val_loss_history.append(out_sample_loss)
          val_accuracy_history.append(out_sample_accuracy)
          tqdm.write(f'Epoch {epoch+1}, Out-sample Loss: {out_sample_loss:.4f}, Out-sample accuracy : {out_sample_accuracy:.4f}')
          tqdm.write(f'Epoch {epoch+1}, In-sample Loss: {in_sample_loss:.4f}, In-sample Accuracy: {in_sample_accuracy:.4f}')
    i += 1

  return test_loss_history, val_loss_history, test_accuracy_history, val_accuracy_history

# Train decoder (initial)
train_model(decoder_epochs, optimizer_decoder, scheduler_decoder,train_loader, validation_loader)
save_model(model)
print("DECODER TRAINING DONE")
# Unfreeze encoder
for i, param in enumerate(model.parameters()):
  param.requires_grad = True

# Train encoder
test_loss_history, val_loss_history, test_accuracy_history, val_accuracy_history = train_model(n_epochs, optimizer_full, scheduler_full,train_loader_finetuning, validation_loader)





using device:  cpu
Running on subset!




  0%|          | 0/1 [00:00<?, ?it/s]

RuntimeError: DataLoader worker (pid(s) 31288, 25520) exited unexpectedly

In [None]:
save_model(model)


In [None]:
# initial_lr = 0.00001
# lr_epochs = 100
# optimizer = optim.SGD(model.parameters(), lr=initial_lr)


# lr_hist = []
# train_loss_history = []

# def train_model(n_epochs):
#   i = 0
#   lr = initial_lr
#   for epoch in toggle_tqdm(range(n_epochs), use_tqdm):
#     # print("epoch:", i)
#     for X_batch, y_batch, img_name in toggle_tqdm(trainloader, False):
#       agg_loss = 0
#       X_batch = X_batch.permute(0,3,1,2).to(device) # Needs to have shape [batch_size, channels, height, width]
#       y_batch = y_batch.to(device)
#       y_pred = model(X_batch)
#       loss = loss_fn(y_pred,y_batch)
#       optimizer.zero_grad()
#       loss.backward()
#       optimizer.step()
#       agg_loss += loss.item()
#       train_loss_history.append(agg_loss)

#       lr_hist.append(lr)

#       lr = 1.1*lr
#       for g in optimizer.param_groups:
#         g['lr'] = lr
#       if lr >= 0.0005:
#         break

# train_model(3)




In [None]:
plt.figure(figsize=(10, 5))
plt.xscale('log')  # Set the y-axis to a logarithmic scale

plt.plot(lr_hist,train_loss_history, label='Train Loss')
plt.xlabel('LR')
plt.ylabel('Loss')
plt.grid(True)
plt.legend()  # Add a legend to differentiate between train and validation loss
plt.show()
print(train_loss_history)
print(lr_hist)

In [None]:
plt.figure(figsize=(10, 5))
#plt.yscale('log')  # Set the y-axis to a logarithmic scale
plt.plot(test_loss_history, label='Train Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.title('Loss Curve')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.legend()  # Add a legend to differentiate between train and validation loss
plt.show()
print(val_accuracy_history)

In [None]:
#print a trainset with predicted label
# train_loader = DataLoader(data_loader, batch_size=1, shuffle=True, num_workers=1)

train_iter = iter(train_loader)

batch = next(train_iter)

X_batch, y_batch, _ = batch
print(X_batch.shape)
print(y_batch.shape)

image = batch[0][0]
labels = batch[1][0]

# Get predicted labels
X_batch = X_batch.permute(0,3,1,2).to(device)
print(X_batch.shape)
pred = model(X_batch)

plot_img_and_boundingbox(image, labels)
plot_img_and_boundingbox(image, pred[0])

In [None]:
# Print a validation image with its predicted labels

validationloader = DataLoader(test_loader, batch_size=1, shuffle=True, num_workers=1)

validationloader_iter = iter(validationloader)

batch = next(validationloader_iter)

X_batch, y_batch, _ = batch
print(X_batch.shape)
print(y_batch.shape)

image = batch[0][0]
labels = batch[1][0]

# Get predicted labels
X_batch = X_batch.permute(0,3,1,2).to(device)
print(X_batch.shape)
pred = model(X_batch)

plot_img_and_boundingbox(image, labels)

In [None]:
plot_img_and_boundingbox(image,pred[0])
print(pred)
print(calculate_true_positives_in_batch(labels.view(1,4),pred[0].view(1,4))) # Need to change view since we dont have batch