## Import libraries

In [None]:
%%capture
!pip install --quiet wandb
import wandb
from wandb.keras import WandbCallback

import tensorflow as tf
from tensorflow import keras
from keras.utils.vis_utils import plot_model
from keras.preprocessing import image
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from sklearn.preprocessing import LabelBinarizer
import matplotlib.pyplot as plt
%matplotlib inline

import os
import cv2
import mmap
import pandas
import numpy as np
import subprocess
from tqdm.notebook import tqdm

from google.colab.patches import cv2_imshow
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

## Data

In [None]:
def wc_count(dataset_path: str) -> int:
  """Count the newline characters in a dataset's ``all.txt`` split file.

  The result matches what ``wc -l`` reports for the same file, but it is
  computed in Python so it does not depend on an external binary or on the
  exact formatting of its output.

  Args:
    dataset_path: dataset directory relative to ``<OCD_DATA_DIR>/splits``.

  Returns:
    Number of ``\\n`` bytes in the file (a final line without a trailing
    newline is therefore not counted, exactly like ``wc -l``).

  Raises:
    FileNotFoundError: if the split file does not exist.
  """
  file_path = os.path.join(OCD_DATA_DIR, 'splits', dataset_path, "all.txt")
  newline_count = 0
  with open(file_path, 'rb') as f:
    # read in 1 MiB chunks so large split files are never loaded at once
    for chunk in iter(lambda: f.read(1 << 20), b''):
      newline_count += chunk.count(b'\n')
  return newline_count

In [None]:
# TODO shuffle data

def image_generator(input_path: str, bs: int, mode="train", img_size=150, aug=None):
  """Endlessly yield ``(images, labels)`` batches listed in a split file.

  Each non-blank line of ``<OCD_DATA_DIR>/splits/<input_path>/all.txt`` must
  hold ``<relative image path> <label>``. Images are read with OpenCV,
  divided by 255 and resized to ``img_size`` x ``img_size``.

  Args:
    input_path: dataset directory relative to ``OCD_DATA_DIR``.
    bs: number of samples per batch (must be at least 1).
    mode: ``"eval"`` ends the current batch at the end of the file instead of
      topping it up with samples from the start; any other value wraps around.
    img_size: side length passed to ``cv2.resize``.
    aug: optional object exposing ``flow(images, labels, batch_size=...)``;
      when given, the batch is replaced by the first batch it produces.

  Yields:
    Tuple ``(np.ndarray of images, float32 np.ndarray of labels)``.

  Raises:
    ValueError: if ``bs`` is smaller than 1, or if a full pass over the split
      file produced no readable sample (empty file / only unreadable images).
  """
  if bs < 1:
    raise ValueError(f"bs must be at least 1, got {bs}")

  file_path = os.path.join(OCD_DATA_DIR, 'splits', input_path, "all.txt")

  # the context manager closes the file once the generator is closed or collected
  with open(file_path, "r") as f:
    # guards against spinning forever on a file without any usable sample
    samples_since_rewind = 0

    while True:
      images = []
      labels = []

      while len(images) < bs:
        line = f.readline()
        # an empty string means we have reached the end of the file
        if line == "":
          if samples_since_rewind == 0:
            raise ValueError(f"no readable samples listed in {file_path}")
          # rewind WITHOUT consuming a line, so the first sample is not lost
          f.seek(0)
          samples_since_rewind = 0
          # in eval mode do not fill the batch with samples from the beginning
          if mode == "eval":
            break
          continue

        entry = line.split()
        if not entry:
          # tolerate blank lines in the split file
          continue

        # extract the image path and label
        image_path, label = entry
        image_path = os.path.join(OCD_DATA_DIR, input_path, image_path)

        # cv2.imread signals a missing/corrupt file by returning None
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        if image is None:
          continue

        # scale to [0, 1] and resize to the preferred size
        image = cv2.resize(image / 255, (img_size, img_size))

        images.append(image)
        labels.append(label)
        samples_since_rewind += 1

      # eval mode can hit the end of the file exactly on a batch boundary
      if not images:
        continue

      # if the data augmentation object is not None, apply it
      if aug is not None:
        (images, labels) = next(aug.flow(np.array(images), labels, batch_size=bs))

      labels = np.array(labels).astype('float32')

      # yield the batch to the calling function
      yield (np.array(images), labels)

In [None]:
# NUM_TRAIN_IMAGES = 0

# file_path = os.path.join(root_dir, splits_dir, CNRParkAB, "all.txt")

# # initialize the unique set of class labels in the dataset
# f = open(file_path, "r")
# labels = set()

# for line in f:
#   label = line.strip().split(' ')[1]
#   labels.add(label)
#   NUM_TRAIN_IMAGES += 1

# f.close()

# lb = LabelBinarizer()
# lb.fit(list(labels))

## RUN

In [None]:
def get_test_data(input_path, img_size=150, num_samples=100):
  """Load the first readable samples listed in a dataset's split file.

  Each non-blank line of ``<OCD_DATA_DIR>/splits/<input_path>/all.txt`` must
  hold ``<relative image path> <label>``. Images are read with OpenCV,
  divided by 255 and resized to ``img_size`` x ``img_size``.

  Args:
    input_path: dataset directory relative to ``OCD_DATA_DIR``.
    img_size: side length passed to ``cv2.resize``.
    num_samples: maximum number of samples to load (defaults to the 100 that
      used to be hard-coded).

  Returns:
    Tuple ``(images, labels)`` of NumPy arrays; labels are ``float32``. Fewer
    than ``num_samples`` entries are returned when the split file runs out of
    readable images.
  """
  file_path = os.path.join(OCD_DATA_DIR, 'splits', input_path, "all.txt")
  images = []
  labels = []

  with open(file_path, "r") as f:
    for line in f:
      if len(images) >= num_samples:
        break

      entry = line.split()
      if not entry:
        # tolerate blank lines in the split file
        continue

      # extract the image path and label
      image_path, label = entry
      image_path = os.path.join(OCD_DATA_DIR, input_path, image_path)

      # cv2.imread signals a missing/corrupt file by returning None
      image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
      if image is None:
        continue

      # scale to [0, 1] and resize to the preferred size
      image = cv2.resize(image / 255, (img_size, img_size))

      images.append(image)
      labels.append(label)

  # labels = lb.transform(np.array(labels))
  return np.array(images), np.array(labels).astype('float32')

In [None]:
# start a Weights & Biases run; the hyperparameters below are stored as its config
run = wandb.init(
  project='DP',
  entity='michaelkrocka',
  config=dict(
    learning_rate=0.001,
    epochs=1,
    batch_size=32,
    loss_function="binary_crossentropy",
    optimizer="Adam",
    architecture="CNN",
    dataset="PKLot",
  ),
)
# shorthand used by the training cell to read the values back
config = wandb.config

<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


Debugging note: somewhere around item 242 * 10 (± 10) there is a `NoneType`.

In [None]:
# NOTE: PKLot_PATH and loaded_model are created by init(), so init() has to run before this cell
generator = image_generator(PKLot_PATH, config.batch_size)
NUM_TRAIN_IMAGES = wc_count(PKLot_PATH)

history = loaded_model.fit(
  x = generator,
  # one epoch = one pass over the split file, so the number of steps has to
  # follow the generator's batch size rather than a fixed divisor
  steps_per_epoch = max(1, NUM_TRAIN_IMAGES // config.batch_size),
  epochs = config.epochs,
  callbacks = [WandbCallback()])

# write the trained model to disk before handing the file to wandb;
# nothing else in this cell creates 'occupancy_model.h5'
loaded_model.save('occupancy_model.h5')
wandb.save('occupancy_model.h5')
run.finish()

In [None]:
# evaluate the model on the samples returned by get_test_data and plot a confusion matrix
X_test, y_test = get_test_data(PKLot_PATH)
y_pred = loaded_model.predict(X_test)
# threshold the model output at 0.5 to obtain hard 0/1 predictions
y_pred = np.where(y_pred > 0.5, 1, 0)

labels = np.array([0, 1])
# passing `labels` keeps the matrix 2x2 even if one class is absent from the
# sample, so it always matches the two display labels below
cm = confusion_matrix(y_test.astype(int), y_pred, labels=labels)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)

disp.plot(cmap=plt.cm.Blues)
plt.show()

# Pilot app

1.   Init
  1.   Init paths (keras model weights, image + mask, output?)
  2.   <font color=green>Load occupancy detection model</font>
  3.   Segment parking lot into parking spaces using Mask
2.   Main
  1.   Create main
      1.   Model detects occupied / empty
      2.   If occupied start LP detection pipeline
           1.   Described in ALPR
  2.   Loop while images in input



In [None]:
def init() -> None:
  """Populate the notebook-wide globals: dataset paths, model and label names.

  Side effects:
    * sets ``OCD_DATA_DIR``, ``CNRParkAB_PATH``, ``CNRParkEXT_PATH`` and
      ``PKLot_PATH``;
    * runs ``check_all_files_exist`` for each dataset and prints any missing
      files it reports;
    * loads the Keras model into ``loaded_model`` and stores the middle
      dimensions of its first input shape in ``target_img_size``;
    * sets ``labels_mapping`` (class index -> readable name).
  """
  global OCD_DATA_DIR, CNRParkAB_PATH, CNRParkEXT_PATH, PKLot_PATH
  global loaded_model, target_img_size, labels_mapping

  # directory layout, relative to the current working directory
  drive_root = 'drive/MyDrive/DP'
  OCD_DATA_DIR = os.path.join(drive_root, 'Datasets/OccupancyDetection')
  CNRParkAB_PATH, CNRParkEXT_PATH, PKLot_PATH = 'CNRParkAB', 'CNRPark-EXT', 'PKLot/PKLotSegmented'
  model_file = os.path.join(drive_root, 'Models/keras_mAlexNet/out_keras.h5')

  # report images that are listed in a split file but absent on disk
  for dataset in (CNRParkAB_PATH, CNRParkEXT_PATH, PKLot_PATH):
    missing = check_all_files_exist(dataset)
    if missing:
      print(dataset, 'files missing:', missing)

  # the model's first input shape minus its first and last dimension
  loaded_model = keras.models.load_model(model_file)
  target_img_size = loaded_model.inputs[0].shape[1:-1]

  labels_mapping = {0: 'free', 1: 'occupied'}

In [None]:
def get_num_lines(dataset_path: str) -> int:
  """Return the number of lines in a dataset's ``all.txt`` split file.

  A final line without a trailing newline is counted as a line; an empty
  file yields 0.

  Args:
    dataset_path: dataset directory relative to ``<OCD_DATA_DIR>/splits``.

  Returns:
    The line count.
  """
  file_path = os.path.join(OCD_DATA_DIR, 'splits', dataset_path, 'all.txt')
  # read-only binary access is enough for counting and does not require write
  # permission; the context manager releases the handle
  with open(file_path, 'rb') as fp:
    return sum(1 for _ in fp)

In [None]:
def check_all_files_exist(dataset_path: str) -> list:
  """Collect the images listed in a split file that are missing on disk.

  The first whitespace-separated token of every line in
  ``<OCD_DATA_DIR>/splits/<dataset_path>/all.txt`` is resolved against
  ``<OCD_DATA_DIR>/<dataset_path>``. Each missing path is printed as soon as
  it is found.

  Args:
    dataset_path: dataset directory relative to ``OCD_DATA_DIR``.

  Returns:
    List of the resolved paths that do not exist.
  """
  split_file = os.path.join(OCD_DATA_DIR, 'splits', dataset_path, 'all.txt')
  dataset_root = os.path.join(OCD_DATA_DIR, dataset_path)
  invalid_paths = []

  with open(split_file) as f:
    # the line count only feeds the progress bar's total
    for line in tqdm(f, total=get_num_lines(dataset_path)):
      candidate = os.path.join(dataset_root, line.split()[0])
      if os.path.exists(candidate):
        continue
      invalid_paths.append(candidate)
      print(candidate)

  return invalid_paths

In [None]:
def preprocess_parking_space(directory: str, filename: str, show_img=False) -> np.ndarray:
  """Load one image and turn it into a single-element batch.

  Args:
    directory: folder containing the image, or ``None`` when ``filename``
      already is the complete path.
    filename: image file name (or full path, see ``directory``).
    show_img: when true, display the loaded image with matplotlib first.

  Returns:
    The image array multiplied by ``1. / 256`` with a new leading axis.
  """
  path = filename if directory is None else os.path.join(directory, filename)
  img = image.load_img(path, target_size=target_img_size)

  if show_img:
    plt.imshow(img)
    plt.axis('off')
    plt.show()

  # convert to float and normalize
  scaled = image.img_to_array(img) * 1. / 256
  # add the leading batch axis
  return np.expand_dims(scaled, axis=0)

In [None]:
def print_formatted_model_output(filename: str, prediction: dict) -> None:
  """Print the file name (without ``.jpg``) next to the predicted class name.

  Args:
    filename: name of the image the prediction belongs to.
    prediction: model output; the index returned by ``np.argmax`` on it is
      looked up in ``labels_mapping``.
  """
  predicted_name = labels_mapping[np.argmax(prediction)]
  short_name = filename.replace('.jpg', '')
  print(short_name, '\t', predicted_name)

In [None]:
# OCD_DATA_DIR in the future -> too large

# def main() -> None:
#   for filename in os.listdir(OCD_DATA_DIR):
#     img = preprocess_parking_space(OCD_DATA_DIR, filename) # show_img=True
#     prediction = loaded_model.predict(img)
#     print_formatted_model_output(filename, prediction)
#     # TODO license plate detection
#     # if labels_mapping[np.argmax(prediction)] == 'occupied':
#     #   print('TODO :)')

In [None]:
# set the global dataset paths, check the dataset files and load the Keras model
init()
# main()

  0%|          | 0/12584 [00:00<?, ?it/s]

  0%|          | 0/144965 [00:00<?, ?it/s]

  0%|          | 0/695851 [00:00<?, ?it/s]

# Accuracy testing

*   CNRParkAB {'correct': 11193, 'incorrect': 1391} -> 88.9% accuracy
*   CNRParkEXT {'correct': 135149, 'incorrect': 9816} -> 93.2% accuracy
*   PKLot_SUM -> {'correct': 543178, 'incorrect': 152583} -> 78.1% accuracy
   *   PUC -> {'correct': 348010, 'incorrect': 76213} -> 82.0% accuracy
   *   UFPR04 -> {'correct': 60142, 'incorrect': 45701} -> 56.8% accuracy
   *   UFPR05 -> {'correct': 135026, 'incorrect': 30759} -> 81.5% accuracy

In [None]:
def test_accuracy_on_dataset(dataset_path:str, subdir_path:str="") -> dict:
  with open(os.path.join(OCD_DATA_DIR, 'splits', dataset_path, 'all.txt')) as f:
    true_labels_dict = dict([line.split() for line in f])

  pred_labels_dict = {'correct': 0, 'incorrect': 0}
  dir_path = os.path.join(OCD_DATA_DIR, dataset_path)

  for filename in tqdm([os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(os.path.join(dir_path, subdir_path))) for f in fn]):
    img = preprocess_parking_space(None, filename) # show_img=True
    prediction = loaded_model.predict(img)
    if str(np.argmax(prediction)) == true_labels_dict[filename.replace(dir_path + '/', '')]:
      pred_labels_dict['correct'] += 1
    else:
      pred_labels_dict['incorrect'] += 1

  return pred_labels_dict

In [None]:
# print(CNRParkAB_PATH, test_accuracy_on_dataset(CNRParkAB_PATH))
# print(CNRParkEXT_PATH, test_accuracy_on_dataset(CNRParkEXT_PATH))
# print(PKLot_PATH + 'PUC', test_accuracy_on_dataset(PKLot_PATH, 'PUC'))
# print(PKLot_PATH + 'UFPR04', test_accuracy_on_dataset(PKLot_PATH, 'UFPR04'))
# print(PKLot_PATH + 'UFPR05', test_accuracy_on_dataset(PKLot_PATH, 'UFPR05'))

# Segmentation mask
The segmentation mask code lives in separate `.py` files on Ubuntu, not in this notebook.