In [None]:
!pip install --upgrade pip
!pip install --upgrade --force-reinstall tflite_model_maker
!sudo apt-get -y install libusb-1.0-0-dev
!pip install pycocotools
!pip install gdown

In [None]:
import os
from datetime import datetime

from math import floor
from pathlib import Path
import pandas as pd
import numpy as np
import tensorflow as tf
from tflite_model_maker.config import QuantizationConfig
from tflite_model_maker.config import ExportFormat
from tflite_model_maker import model_spec
from tflite_model_maker import object_detector
from numba import cuda
from absl import logging

import yaml
import PIL

In [None]:
!apt-get zip unzip
!gdown 1tq7vkIzMnezhOsWurwRYEaaQ-QhvQpFK
!unzip vedai_corrected.zip

In [None]:
!nvidia-smi

In [None]:
device = cuda.get_current_device() 
device.reset()

In [None]:
def decode_img(img_path, img_height=1024, img_width=1024, convert_dtype=True):
    # Convert the compressed string to a 3D uint8 tensor
    img = tf.io.read_file(img_path)
    img = tf.io.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, [img_height, img_width])
    if convert_dtype:
        img = tf.image.convert_image_dtype(img, dtype=tf.uint8, saturate=False)
    img = tf.reshape(img, [1, img_height, img_width, 3])
    # Resize the image to the desired size
    return img

def create_nms_bbox(test_img, detector_output):
    suppressed_idx = tf.image.non_max_suppression(
    tf.squeeze(detector_output["detection_boxes"]),
    tf.squeeze(detector_output["detection_scores"]),
    max_output_size=tf.constant(500),
    iou_threshold=0.3,
    score_threshold=float('-inf'),
    name=None
    )

    bounding_boxes_suppressed = tf.gather(tf.squeeze(detector_output["detection_boxes"]), suppressed_idx).numpy().squeeze()
    labels_suppressed = tf.gather(tf.squeeze(detector_output["detection_classes"]), suppressed_idx).numpy().astype(int).astype(str).squeeze().tolist()
    draw_bounding_boxes_on_image(test_img, bounding_boxes_suppressed, color='red', thickness=4, display_str_list_list=labels_suppressed)
    return test_img

# VEDAI Dataset creation

In [None]:
CORRECTED_PATH = os.path.join("vedai_corrected")
ANNOTATIONS_DIR = os.path.join(CORRECTED_PATH, "annotations")
ANNOTATIONS_MERGED_FILE_PATH = os.path.join(CORRECTED_PATH, "merged_annotations.csv")
IMAGES_DIR = os.path.join(CORRECTED_PATH, "images")
BACKBONE = 'efficientdet_lite4'
CLASSES_DICT = {1: "car",
                2: "truck",
                3: "pickup",
                4: "tractor",
                5: "camping car",
                6: "boat",
                7: "motorcycle",
                8: "bus",
                9: "van",
                10: "other",
                11: "small plane",
                12: "large plane"}

def merge_annotation_folder(annotations_dir, classes_dict, train_proportion=0.8, test_proportion=0.1):
    if train_proportion+test_proportion>=1:
        raise ValueError("Input a train and test proportion summing up to strictly less than 1.")
    annotation_files = list(Path(annotations_dir).rglob('*.txt'))
    indices = list(range(len(annotation_files)))
    files = [str(idx) + '.txt' for idx in indices]
    # else:
    #   files = [filename for filename in list(sorted(os.listdir(self.annotations_dir))) if filename.endswith('.txt')]
    #   indices = [convert_id_to_idx(filename.replace('.txt', '')) for filename in files]
    abs_filepaths = [os.path.join(annotations_dir, annotation_file) for annotation_file in files]
    annotations = pd.DataFrame(columns=["dataset", "x", "y", "width", "length", "idx", "empty"])
    # for img_file, filepath in zip(files, abs_filepaths):
    for idx, img_file in zip(indices, abs_filepaths):
        temp_annotation = pd.read_csv(img_file, sep=' ', names=["x", "y", "width", "length"]).reset_index(drop=False)
    # temp_annotation["image_id"] = img_file.split('.')[0]
    temp_annotation["idx"] = idx
    annotations = pd.concat([annotations, temp_annotation])
    annotations = annotations.rename(columns={"index":"labels"})
    annotations["labels"] = (annotations["labels"] + 1).astype(int)
    # annotations.index.name = None
    annotations["labels_name"] = annotations["labels"].replace(classes_dict)
    annotations["x_min"] = (annotations["x"] - annotations["width"]/2)
    annotations["y_min"] = (annotations["y"] - annotations["length"]/2)
    annotations["x_max"] = (annotations["x"] + annotations["width"]/2)
    annotations["y_max"] = (annotations["y"] + annotations["length"]/2)
    annotations["image_path"] = annotations["idx"].astype(str) + ".jpg"
    # Train test split
    images_idx = annotations.index.unique()
    train_idx = np.random.choice(images_idx, replace=False, size=floor(train_proportion*len(images_idx)))
    test_idx = np.random.choice(list(set(images_idx) - set(train_idx)), replace=False, size=floor(test_proportion*len(images_idx)))
    validation_idx = list(set(images_idx) - set(train_idx) - set(test_idx))
    annotations.loc[train_idx, "dataset"] = "TRAIN"
    annotations.loc[test_idx, "dataset"] = "TEST"
    annotations.loc[validation_idx, "dataset"] = "VALIDATION"
    annotations = annotations[["dataset", "image_path", "labels_name", "x_min", "y_min", "empty", "empty", "x_max", "y_max", "empty", "empty"]]
    return annotations

if not os.path.exists(ANNOTATIONS_MERGED_FILE_PATH):
    annotations = merge_annotation_folder(ANNOTATIONS_DIR, CLASSES_DICT, train_proportion=0.8, test_proportion=0.1)
    annotations.to_csv(ANNOTATIONS_MERGED_FILE_PATH, index=False, header=False)

In [None]:
# def create_tf_example(example):
#   # TODO(user): Populate the following variables from your example.
#   height = None # Image height
#   width = None # Image width
#   filename = None # Filename of the image. Empty if image is not from file
#   encoded_image_data = None # Encoded image bytes
#   image_format = None # b'jpeg' or b'png'

#   xmins = [] # List of normalized left x coordinates in bounding box (1 per box)
#   xmaxs = [] # List of normalized right x coordinates in bounding box
#              # (1 per box)
#   ymins = [] # List of normalized top y coordinates in bounding box (1 per box)
#   ymaxs = [] # List of normalized bottom y coordinates in bounding box
#              # (1 per box)
#   classes_text = [] # List of string class name of bounding box (1 per box)
#   classes = [] # List of integer class id of bounding box (1 per box)

#   tf_example = tf.train.Example(features=tf.train.Features(feature={
#       'image/height': dataset_util.int64_feature(height),
#       'image/width': dataset_util.int64_feature(width),
#       'image/filename': dataset_util.bytes_feature(filename),
#       'image/source_id': dataset_util.bytes_feature(filename),
#       'image/encoded': dataset_util.bytes_feature(encoded_image_data),
#       'image/format': dataset_util.bytes_feature(image_format),
#       'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),
#       'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),
#       'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),
#       'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),
#       'image/object/class/text': dataset_util.bytes_list_feature(classes_text),
#       'image/object/class/label': dataset_util.int64_list_feature(classes),
#   }))
#   return tf_example

# TF Lite - Detection Module

In [None]:
BATCH_SIZE = 32
IMG_HEIGHT = 1024
IMG_WIDTH = 1024
EPOCHS = 250
EXPORT_NAME = f"tflite_detector_{BACKBONE}_{str(datetime.now().date())}"

In [None]:
tf.get_logger().setLevel('ERROR')
logging.set_verbosity(logging.ERROR)

In [None]:
# !rm -rf /tmp/tmp*

In [None]:
# TO-DO : generate TFRecord files to accelerate loading
train_data, validation_data, test_data = object_detector.DataLoader.from_csv(os.path.join(ANNOTATIONS_MERGED_FILE_PATH), images_dir=IMAGES_DIR)

In [None]:
spec = model_spec.get(BACKBONE)
# spec.config.image_size = "1024x1024"
spec.config.num_classes = 13
spec.config.label_map = CLASSES_DICT

In [None]:
model = object_detector.create(train_data, model_spec=spec, batch_size=BATCH_SIZE, train_whole_model=False, epochs=EPOCHS, validation_data=validation_data)

In [None]:
config = QuantizationConfig.for_float16()
model.export(export_dir=f'./{EXPORT_NAME}', export_format=[ExportFormat.SAVED_MODEL, ExportFormat.TFLITE], quantization_config=config)

!apt-get install zip
!zip -r {EXPORT_NAME}.zip {EXPORT_NAME}

In [None]:
!zip -r tflite_detector.zip tflite_detector_efficientdet_lite4

In [None]:
model.evaluate(test_data)

# TF Classical

In [None]:
tf_image = decode_img(TEST_IMG_PATH)
image = ...  # A batch of preprocessed images with shape [batch_size, height, width, 3].
base_model = hub.KerasLayer("https://tfhub.dev/tensorflow/efficientdet/lite4/feature-vector/1")
cls_outputs, box_outputs = base_model(image, training=training)

In [None]:

detector_output = efficient_det_2(tf_image)

test_img = PIL.Image.open(TEST_IMG_PATH)

test_img = create_nms_bbox(test_img, detector_output)
test_img

In [None]:
m = tf.keras.Sequential([hub.KerasLayer("efficientdet_d6_1", trainable=True),
    tf.keras.layers.Dense(13, activation='softmax')
])

In [None]:
m.build([1, 1024, 1024, 3])
m.summary()

In [None]:
efficient_det_2 = hub.load("https://tfhub.dev/tensorflow/efficientdet/d6/1")

In [None]:
tf.keras.utils.plot_model(
    efficient_det_2,
    to_file="model.png",
    show_shapes=False,
    show_dtype=False,
    show_layer_names=True,
    rankdir="TB",
    expand_nested=False,
    dpi=96,
    layer_range=None,
    show_layer_activations=False,
)

In [None]:
# model_test = tf.load("detection_retinanet_spinenet-96.tar")
with open("spinenet96_retinanet.yaml", "r") as file:
    model_config = yaml.load(file, Loader=yaml.FullLoader)