In [None]:
!pip install wget

In [None]:
import os
import pathlib
import shutil
import subprocess
import sys
import tarfile
import zipfile

import wget

In [None]:
# Root folder beneath which every other path of this notebook is built.
TF_ROOT = pathlib.Path('tf')

# Folder name of the custom model inside the workspace models directory.
CUSTOM_MODEL_NAME = 'od_ssd_mobnet'

# Pre-trained model archive that is downloaded and unpacked further down.
PRETRAINED_MODEL_NAME = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8'
PRETRAINED_MODEL_URL = (
    'http://download.tensorflow.org/models/object_detection/tf2/20200711/'
    + '{}.tar.gz'.format(PRETRAINED_MODEL_NAME)
)

# File names that are joined onto the scripts / annotations folders later on.
TF_RECORD_SCRIPT_NAME = 'generate_tfrecord.py'
LABEL_MAP_NAME = 'label_map.pbtxt'

# Versions of the protoc release and the TensorFlow package that get installed.
PROTOC_VERSION = '3.19.1'
TENSORFLOW_VERSION = '2.6.2'

In [None]:
# Folders directly beneath TF_ROOT.
APIMODEL_PATH = TF_ROOT / 'models'
PROTOC_PATH = TF_ROOT / 'protoc'
SCRIPTS_PATH = TF_ROOT / 'scripts'
WORKSPACE_PATH = TF_ROOT / 'workspace'

# Folders inside the workspace.
ANNOTATION_PATH = WORKSPACE_PATH / 'annotations'
IMAGE_PATH = WORKSPACE_PATH / 'images'
MODEL_PATH = WORKSPACE_PATH / 'models'
PRETRAINED_MODEL_PATH = WORKSPACE_PATH / 'pre-trained-models'

# Folders belonging to the custom model.
CHECKPOINT_PATH = MODEL_PATH / CUSTOM_MODEL_NAME
OUTPUT_PATH = CHECKPOINT_PATH / 'export'
TFJS_PATH = CHECKPOINT_PATH / 'tfjsexport'
TFLITE_PATH = CHECKPOINT_PATH / 'tfliteexport'

# Every directory listed here is created by the mkdir loop further down.
paths = {
    APIMODEL_PATH, PROTOC_PATH, SCRIPTS_PATH, WORKSPACE_PATH,
    ANNOTATION_PATH, IMAGE_PATH, MODEL_PATH, PRETRAINED_MODEL_PATH,
    CHECKPOINT_PATH, OUTPUT_PATH, TFJS_PATH, TFLITE_PATH,
}

In [None]:
# Individual files the notebook reads or writes inside the folders defined above.
PIPELINE_CONFIG = CHECKPOINT_PATH.joinpath('pipeline.config')
TF_RECORD_SCRIPT = SCRIPTS_PATH.joinpath(TF_RECORD_SCRIPT_NAME)
LABELMAP = ANNOTATION_PATH.joinpath(LABEL_MAP_NAME)

files = {PIPELINE_CONFIG, TF_RECORD_SCRIPT, LABELMAP}

In [None]:
# `paths` is a set, so its iteration order is arbitrary and a nested folder can
# come up before its parent exists; parents=True makes the creation
# order-independent, exist_ok=True keeps the cell safe to re-run.
for path in paths:
    path.mkdir(parents=True, exist_ok=True)

### Clone the TensorFlow models repository

In [None]:
if not APIMODEL_PATH.joinpath('research', 'object_detection').exists():
    !git clone https://github.com/tensorflow/models {str(APIMODEL_PATH)}

### Install TensorFlow and the object detection library

In [None]:
# NOTE(review): the release asset name is hard-coded to the 64-bit Windows build.
protoc_asset = 'protoc-' + PROTOC_VERSION + '-win64.zip'
protoc_url = 'https://github.com/protocolbuffers/protobuf/releases/download/v' + PROTOC_VERSION + '/' + protoc_asset

# wget.download returns the name of the file it actually wrote; use that name
# rather than assuming it equals the asset name (e.g. when a stale copy of the
# archive is still lying in the working directory).
protoc_zip = wget.download(protoc_url)

try:
    with zipfile.ZipFile(protoc_zip, 'r') as zip_ref:
        zip_ref.extractall(str(PROTOC_PATH))
finally:
    # Remove the archive even when extraction fails.
    os.remove(protoc_zip)

# Append protoc's bin folder to PATH only once so re-running the cell does not
# keep growing the variable.
protoc_bin = str(PROTOC_PATH.joinpath('bin').resolve())
if protoc_bin not in os.environ['PATH'].split(os.pathsep):
    os.environ['PATH'] += os.pathsep + protoc_bin

In [None]:
!pip install tensorflow-gpu=={TENSORFLOW_VERSION}
subprocess.run(['protoc', 'object_detection/protos/*.proto', '--python_out=.'], cwd=str(TF_ROOT / 'models' / 'research'))

In [None]:
research_dir = TF_ROOT / 'models' / 'research'

# The TF2 setup.py has to sit in the research folder for the editable install below.
shutil.copyfile(research_dir / 'object_detection' / 'packages' / 'tf2' / 'setup.py', research_dir / 'setup.py')

# Run pip through the kernel's own interpreter so the package is installed into
# the environment this notebook is running in.
result = subprocess.run(
    [sys.executable, '-m', 'pip', 'install', '-e', '.'],
    cwd=str(research_dir),
    capture_output=True,
    text=True,
)
print(result.stdout)

# Show pip's error output and stop when the installation failed.
if result.returncode != 0:
    print(result.stderr)
    raise RuntimeError('pip install of the object detection library failed with exit code {}'.format(result.returncode))

### Verify installation of object detection library

In [None]:
VERIFICATION_SCRIPT = APIMODEL_PATH / 'research' / 'object_detection' / 'builders' / 'model_builder_tf2_test.py'
!python {str(VERIFICATION_SCRIPT)}

## Prepare training

In [None]:
# wget.download returns the name of the file it actually wrote; use that name
# rather than assuming it equals PRETRAINED_MODEL_NAME + '.tar.gz' (e.g. when a
# stale copy of the archive is still lying in the working directory).
model_tar = wget.download(PRETRAINED_MODEL_URL)

try:
    # NOTE(review): extractall trusts the member paths stored in the downloaded archive.
    with tarfile.open(model_tar, 'r:gz') as tar_ref:
        tar_ref.extractall(path=PRETRAINED_MODEL_PATH)
finally:
    # Remove the archive even when extraction fails.
    os.remove(model_tar)

### Create label map

In [None]:
# Classes of the custom model; len(labels) is written into the pipeline config later.
labels = [
    {'name': 'SmallYellowMachine', 'id': 1},
    {'name': 'BigYellowMachine', 'id': 2},
]

# Write one `item { ... }` entry per label to the label map file.
with LABELMAP.open('w') as lm_file:
    for entry in labels:
        lm_file.writelines([
            'item { \n',
            '\tname:\'{}\'\n'.format(entry['name']),
            '\tid:{}\n'.format(entry['id']),
            '}\n',
        ])

### Generate TFRecords from image annotations

In [None]:
from object_detection.utils import label_map_util

In [None]:
import io
import pandas as pd
import tensorflow as tf
import xml.etree.ElementTree as et

from collections import namedtuple
from object_detection.utils import label_map_util
from PIL import Image

In [None]:
def gen_tfrecord(xml_dir, output_path):
    """Convert the XML annotations below ``xml_dir`` into one TFRecord file.

    Every ``*.xml`` file found recursively below ``xml_dir`` is parsed, its
    ``object`` entries are grouped by the annotation's ``filename``, and one
    ``tf.train.Example`` per image is written to ``output_path`` in sorted
    filename order. Box coordinates are divided by the size of the decoded
    image, and class ids are looked up in the label map loaded from
    ``LABELMAP``.

    Args:
        xml_dir: ``pathlib.Path`` of the folder holding the XML files; the image
            named in each annotation is opened relative to this folder.
        output_path: Destination of the TFRecord file (converted with ``str``).

    Raises:
        KeyError: If an annotation uses a class name that is missing from the
            label map.
    """
    # Imported locally because none of the notebook's import cells brings
    # dataset_util into scope; without it the feature construction below fails
    # with a NameError.
    from object_detection.utils import dataset_util

    boxes_by_image = _read_voc_boxes(xml_dir)
    label_map = label_map_util.get_label_map_dict(label_map_util.load_labelmap(str(LABELMAP)))

    with tf.io.TFRecordWriter(str(output_path)) as writer:
        for image_name, boxes in sorted(boxes_by_image.items()):
            with tf.io.gfile.GFile(str(xml_dir.joinpath(image_name)), 'rb') as fid:
                enc_jpg = fid.read()

            # Take the size from the image itself rather than from the XML.
            with Image.open(io.BytesIO(enc_jpg)) as img:
                img_width, img_height = img.size

            filename = image_name.encode('utf8')
            # NOTE(review): the format is always recorded as jpg, whatever the file really is.
            img_format = b'jpg'
            xmins = []
            xmaxs = []
            ymins = []
            ymaxs = []
            classes_text = []
            classes = []

            for class_name, xmin, ymin, xmax, ymax in boxes:
                xmins.append(xmin / img_width)
                xmaxs.append(xmax / img_width)
                ymins.append(ymin / img_height)
                ymaxs.append(ymax / img_height)
                classes_text.append(class_name.encode('utf8'))
                classes.append(label_map[class_name])

            tf_example = tf.train.Example(features=tf.train.Features(feature={
                # Height and width were swapped in the previous version.
                'image/height': dataset_util.int64_feature(img_height),
                'image/width': dataset_util.int64_feature(img_width),
                'image/filename': dataset_util.bytes_feature(filename),
                'image/source_id': dataset_util.bytes_feature(filename),
                'image/encoded': dataset_util.bytes_feature(enc_jpg),
                'image/format': dataset_util.bytes_feature(img_format),
                'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),
                'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),
                'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),
                'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),
                'image/object/class/text': dataset_util.bytes_list_feature(classes_text),
                'image/object/class/label': dataset_util.int64_list_feature(classes),
            }))
            writer.write(tf_example.SerializeToString())


def _read_voc_boxes(xml_dir):
    """Collect the annotated boxes of every XML file below ``xml_dir``.

    Returns:
        A dict mapping each annotation's ``filename`` text to a list of
        ``(class_name, xmin, ymin, xmax, ymax)`` tuples, with the integer
        coordinates as stored in the XML. Annotations without any ``object``
        entry contribute nothing.
    """
    boxes_by_image = {}
    for xml_file in xml_dir.glob('**/*.xml'):
        root = et.parse(xml_file).getroot()
        filename = root.find('filename').text
        for annot in root.findall('object'):
            # Look the elements up by tag name instead of by child position so
            # every value is tied to the coordinate it is named after.
            bbox = annot.find('bndbox')
            boxes_by_image.setdefault(filename, []).append((
                annot.find('name').text,
                int(bbox.find('xmin').text),
                int(bbox.find('ymin').text),
                int(bbox.find('xmax').text),
                int(bbox.find('ymax').text),
            ))
    return boxes_by_image


In [None]:
# Write one TFRecord per dataset split from the images and annotations of that split.
for split in ('train', 'test'):
    gen_tfrecord(IMAGE_PATH / split, ANNOTATION_PATH / '{}.record'.format(split))

### Update pipeline config for model

In [None]:
# Start the custom model's pipeline config from the one shipped with the pre-trained model.
shutil.copyfile(
    src=PRETRAINED_MODEL_PATH / PRETRAINED_MODEL_NAME / 'pipeline.config',
    dst=CHECKPOINT_PATH / 'pipeline.config',
)

In [None]:
from object_detection.protos import pipeline_pb2
from google.protobuf import text_format

In [None]:
# Parse the copied pipeline config into a TrainEvalPipelineConfig message.
pipeline_config = pipeline_pb2.TrainEvalPipelineConfig()
with tf.io.gfile.GFile(PIPELINE_CONFIG, 'r') as f:
    proto_str = f.read()
text_format.Merge(proto_str, pipeline_config)

# Model and training settings: class count, batch size and the checkpoint to fine-tune from.
pipeline_config.model.ssd.num_classes = len(labels)
pipeline_config.train_config.batch_size = 2
pipeline_config.train_config.fine_tune_checkpoint = str(
    PRETRAINED_MODEL_PATH / PRETRAINED_MODEL_NAME / 'checkpoint' / 'ckpt-0'
)
pipeline_config.train_config.fine_tune_checkpoint_type = 'detection'

# Point the train and eval readers at this notebook's label map and record files.
train_reader = pipeline_config.train_input_reader
train_reader.label_map_path = str(LABELMAP)
train_reader.tf_record_input_reader.input_path[:] = [str(ANNOTATION_PATH / 'train.record')]

eval_reader = pipeline_config.eval_input_reader[0]
eval_reader.label_map_path = str(LABELMAP)
eval_reader.tf_record_input_reader.input_path[:] = [str(ANNOTATION_PATH / 'test.record')]

# Serialise the edited message back over the original config file.
config_text = text_format.MessageToString(pipeline_config)
with tf.io.gfile.GFile(PIPELINE_CONFIG, 'wb') as f:
    f.write(config_text)

## Run training

In [None]:
TRAINING_SCRIPT = APIMODEL_PATH / 'research' / 'object_detection' / 'model_main_tf2.py'
command = "python {} --model_dir={} --pipeline_config_path={} --num_train_steps=2000".format(TRAINING_SCRIPT, str(CHECKPOINT_PATH), str(PIPELINE_CONFIG))
!{command}

## Run detection

In [None]:
import os
import tensorflow as tf
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
from object_detection.utils import config_util

# NOTE(review): this sets a flag in a private TensorFlow module; presumably it lets the
# tf.function-wrapped detect_fn create variables after its first trace — confirm it is
# still required for the pinned TensorFlow version.
from tensorflow.python.eager import def_function
def_function.ALLOW_DYNAMIC_VARIABLE_CREATION = True

In [None]:
# Load pipeline config and build a detection model
configs = config_util.get_configs_from_pipeline_file(str(PIPELINE_CONFIG))
detection_model = model_builder.build(model_config=configs['model'], is_training=False)

# Restore the newest checkpoint found in the model directory. A hard-coded
# checkpoint name only matches one particular combination of training steps
# and checkpoint interval.
latest_checkpoint = tf.train.latest_checkpoint(str(CHECKPOINT_PATH))
if latest_checkpoint is None:
    raise FileNotFoundError('No training checkpoint found in {}'.format(CHECKPOINT_PATH))

ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(latest_checkpoint).expect_partial()

@tf.function
def detect_fn(image):
    """Run ``detection_model`` on ``image`` and return its post-processed detections.

    Args:
        image: Input handed to ``detection_model.preprocess``
            (the notebook passes a float32 tensor with a leading batch axis).

    Returns:
        The result of ``detection_model.postprocess``.
    """
    preprocessed, true_shapes = detection_model.preprocess(image)
    raw_predictions = detection_model.predict(preprocessed, true_shapes)
    return detection_model.postprocess(raw_predictions, true_shapes)

In [None]:
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline

In [None]:
category_index = label_map_util.create_category_index_from_labelmap(str(LABELMAP))

# PIL decodes to RGB; convert('RGB') additionally turns greyscale or alpha
# images into a 3-channel array.
image_np = np.array(Image.open(str(IMAGE_PATH / 'test' / 'sym17.jpg')).convert('RGB'))

# Add a leading batch axis before calling detect_fn.
input_tensor = tf.convert_to_tensor(image_np, dtype=tf.float32)
input_tensor = input_tensor[tf.newaxis, ...]
detections = detect_fn(input_tensor)

# Take the first batch entry and keep only the first num_detections results.
num_detections = int(detections.pop('num_detections'))
detections = {key: value[0, :num_detections].numpy()
              for key, value in detections.items()}
detections['num_detections'] = num_detections

# detection_classes should be ints.
detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

# NOTE(review): presumably shifts the model's 0-based class indices onto the
# 1-based ids used in the label map — confirm.
label_id_offset = 1
image_np_with_detections = image_np.copy()

viz_utils.visualize_boxes_and_labels_on_image_array(
            image_np_with_detections,
            detections['detection_boxes'],
            detections['detection_classes']+label_id_offset,
            detections['detection_scores'],
            category_index,
            use_normalized_coordinates=True,
            max_boxes_to_draw=5,
            min_score_thresh=.8,
            agnostic_mode=False)

# The array is already RGB (loaded through PIL), which is what imshow expects.
# The previous cv2 BGR->RGB conversion used a module that is never imported and
# would have swapped the red and blue channels.
plt.imshow(image_np_with_detections)
plt.show()