In [None]:
# Fetch the DeepLogo2 repository into the current working directory.
!git clone https://github.com/satojkovic/DeepLogo2.git

In [None]:
import os

# Enter the cloned repository. Skip the chdir when the kernel is already
# inside it so that re-running this cell does not fail looking for a nested
# 'DeepLogo2' directory.
if os.path.basename(os.getcwd()) != 'DeepLogo2':
  os.chdir('DeepLogo2')

In [None]:
import os
import pathlib

# Clone the tensorflow models repository if it doesn't already exist
#
# When the working directory is somewhere inside a path containing a "models"
# component, walk upwards until no component is named "models"; nothing is
# cloned in that case.
if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
# Otherwise make a shallow clone (latest commit only) unless a `models` entry
# already exists in the working directory.
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
%%bash
# Install the Object Detection API
# (the %%bash cell magic has to be the very first line of the cell, so this
# comment lives below it).
# Stop at the first failing command instead of running pip on a broken tree.
set -e
cd models/research/
# Compile the protobuf definitions into Python modules.
protoc object_detection/protos/*.proto --python_out=.
cp object_detection/packages/tf2/setup.py .
python -m pip install .

In [None]:
import matplotlib
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage
from tqdm import tqdm

import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.utils import colab_utils
from object_detection.builders import model_builder

%matplotlib inline

In [None]:
def load_image_into_numpy_array(path):
  """Load an image file into a uint8 numpy array of shape (height, width, 3).

  Args:
    path: path of the image file; it is opened through `tf.io.gfile.GFile`.

  Returns:
    A `np.uint8` array with shape (image height, image width, 3).
  """
  # Read through a context manager so the file handle is always released.
  with tf.io.gfile.GFile(path, 'rb') as image_file:
    img_data = image_file.read()
  # Force three channels: a grayscale, palette or RGBA image would otherwise
  # not fit the (height, width, 3) layout the rest of the notebook relies on.
  image = Image.open(BytesIO(img_data)).convert('RGB')
  return np.array(image, dtype=np.uint8)

In [None]:
def convert_csv_into_numpy_array(csv, im_width, im_height):
  """Convert one annotation's pixel box into a normalized box array.

  Args:
    csv: sequence of four integer-like values in the order
      xmin, ymin, xmax, ymax (pixel coordinates).
    im_width: image width in pixels, used to normalize the x coordinates.
    im_height: image height in pixels, used to normalize the y coordinates.

  Returns:
    A float32 array of shape (1, 4) holding [ymin, xmin, ymax, xmax], each
    value divided by the matching image dimension.
  """
  # Input order: xmin,ymin,xmax,ymax
  xmin, ymin, xmax, ymax = (int(value) for value in csv)
  # The Object Detection API (groundtruth boxes and the visualization
  # utilities) expects boxes as [ymin, xmin, ymax, xmax], so the columns are
  # reordered here in addition to being normalized.
  return np.array(
      [[ymin / im_height, xmin / im_width, ymax / im_height, xmax / im_width]],
      dtype=np.float32)

In [None]:
def plot_detections(image_np, boxes, classes, scores, category_index, figsize=(12, 16), image_name=None):
  """Draw boxes and labels on a copy of an image, then show or save it.

  Args:
    image_np: image array; it is copied, so the caller's array is untouched.
    boxes: boxes handed to the visualization helper (drawn as normalized
      coordinates).
    classes: class ids, one per box, looked up in `category_index`.
    scores: score per box; boxes scoring below 0.8 are not drawn.
    category_index: mapping from class id to a category dictionary.
    figsize: accepted for call compatibility; this function does not use it.
    image_name: when truthy, the annotated image is written to this path
      instead of being shown on the current matplotlib axes.
  """
  annotated = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      annotated, boxes, classes, scores, category_index,
      use_normalized_coordinates=True, min_score_thresh=0.8)
  if not image_name:
    plt.imshow(annotated)
    return
  plt.imsave(image_name, annotated)

In [None]:
%%bash
# Download and unpack the Flickr Logos 27 dataset.
# Abort on the first failing step so a broken download is not silently ignored.
set -e
ARCHIVE=flickr_logos_27_dataset.tar.gz
# Skip the download when the archive is already present (re-running the cell
# would otherwise fetch a second copy). Delete the archive to force a refresh.
[ -f "$ARCHIVE" ] || wget "http://image.ntua.gr/iva/datasets/flickr_logos/$ARCHIVE"
tar zxvf "$ARCHIVE"
# The images ship as a nested archive inside the dataset directory.
cd flickr_logos_27_dataset
tar zxvf flickr_logos_27_dataset_images.tar.gz

In [None]:
%%bash
# Run the repository's annotation preprocessing script.
# NOTE(review): presumably this writes the *_annotation_cropped.txt file that
# the data-loading cell reads — confirm against preproc_annot.py.
python preproc_annot.py

In [None]:
train_annot_csv = 'flickr_logos_27_dataset/flickr_logos_27_dataset_training_set_annotation_cropped.txt'
train_img_dir = 'flickr_logos_27_dataset/flickr_logos_27_dataset_images'
train_images_np = []
gt_boxes = []
gt_class_ids = []
# Each row is: image file name, four box coordinates, class id.
# ndmin=2 keeps a single-row file two-dimensional so the loop still sees rows.
csvs = np.loadtxt(train_annot_csv, dtype=str, delimiter=',', ndmin=2)
for csv in tqdm(csvs):
  img_fname = csv[0]
  # Decode every image once and take its size from the array, rather than
  # opening the file a second time just to read the dimensions.
  image_np = load_image_into_numpy_array(os.path.join(train_img_dir, img_fname))
  height, width = image_np.shape[:2]

  train_images_np.append(image_np)
  gt_boxes.append(convert_csv_into_numpy_array(csv[1:-1], width, height))
  gt_class_ids.append(int(csv[-1]))

# Preview the first (up to) six training images on a 2x3 grid, without grid lines.
plt.rcParams['axes.grid'] = False
plt.rcParams['figure.figsize'] = [12, 5]

for position, preview_np in enumerate(train_images_np[:6], start=1):
  plt.subplot(2, 3, position)
  plt.imshow(preview_np)
plt.show()

In [None]:
# The images are not guaranteed to share one shape, and recent NumPy versions
# refuse to build an array from such a ragged list implicitly. Pack them into
# an explicit object array, one image per slot, before saving.
# Reading the file back requires np.load(..., allow_pickle=True).
train_images_obj = np.empty(len(train_images_np), dtype=object)
for slot, image_array in enumerate(train_images_np):
  train_images_obj[slot] = image_array
np.save('train_images.npy', train_images_obj)
np.save('gt_boxes.npy', gt_boxes)
np.save('gt_class_ids.npy', gt_class_ids)

In [None]:
# Logo class names. The order matters: create_category_index() assigns ids
# 1..N by position in this list.
# NOTE(review): presumably this order matches the class ids written by
# preproc_annot.py — confirm.
class_names = [
  "Adidas", "Apple", "BMW", "Citroen", "Cocacola",
  "DHL", "Fedex", "Ferrari", "Ford", "Google", 
  "HP", "Heineken", "Intel", "McDonalds", "Mini", 
  "Nbc", "Nike", "Pepsi", "Porsche", "Puma", 
  "RedBull", "Sprite", "Starbucks", "Texaco", "Unicef",
  "Vodafone", "Yahoo"
]

In [None]:
def create_category_index(class_names):
  """Map 1-based class ids to category dictionaries.

  Args:
    class_names: iterable of class name strings; position decides the id.

  Returns:
    A dict {id: {'id': id, 'name': class_name}} with ids starting at 1.
  """
  return {
      class_id: {'id': class_id, 'name': class_name}
      for class_id, class_name in enumerate(class_names, start=1)
  }

In [None]:
# Build the id -> category lookup used for visualization, and derive the
# number of classes (one entry per class name) used to configure the model.
category_index = create_category_index(class_names)
num_classes = len(category_index)

In [None]:
# Sanity check: show the class count and the full id -> category mapping.
import pprint
pprint.pprint(num_classes)
pprint.pprint(category_index)

In [None]:
# Convert class labels to one-hot and convert everything to tensors.
#
# `label_id_offset` is the gap between the 1-based ids in `category_index` and
# the zero-based class slots of the model. It is only defined here; it is NOT
# subtracted from `gt_class_ids` below.
# NOTE(review): this assumes the ids in `gt_class_ids` are already zero-based
# (otherwise the highest class id one-hot encodes to all zeros) — confirm
# against the output of preproc_annot.py.
label_id_offset = 1
train_image_tensors = []
gt_box_tensors = []
# The detection model expects one class tensor per image with shape
# [num_boxes, num_classes]. Every image here has exactly one box, so add that
# axis: the result is [num_images, 1, num_classes] and indexing one image
# gives [1, num_classes].
gt_classes_one_hot_tensors = tf.expand_dims(
    tf.one_hot(gt_class_ids, depth=num_classes), axis=1
)
for (train_image_np, gt_box_np) in zip(train_images_np, gt_boxes):
  # Images get a leading batch axis of size 1.
  train_image_tensors.append(
      tf.expand_dims(tf.convert_to_tensor(train_image_np, dtype=tf.float32), axis=0)
  )
  gt_box_tensors.append(
      tf.convert_to_tensor(gt_box_np, dtype=tf.float32)
  )
print('train_image_tensors[0]:', tf.shape(train_image_tensors[0]))
print('gt_box_tensors[0]:', tf.shape(gt_box_tensors[0]))
print('gt_classes_one_hot_tensors[0]:', tf.shape(gt_classes_one_hot_tensors[0]))
print('Done prepping data')

In [None]:
# Ground-truth boxes carry no score; use 1.0 so they pass the drawing threshold.
dummy_scores = np.array([1.0], dtype=np.float32)

plt.figure(figsize=(12, 15))
for idx in range(5):
  plt.subplot(2, 3, idx + 1)
  # Label each box with its own class instead of a constant class 1.
  # `category_index` ids start at 1, hence the offset.
  # NOTE(review): assumes `gt_class_ids` are zero-based, consistent with how
  # they are one-hot encoded for training — confirm.
  gt_classes_np = np.full(
      shape=[gt_boxes[idx].shape[0]],
      fill_value=gt_class_ids[idx] + label_id_offset,
      dtype=np.int32
  )
  plot_detections(
      train_images_np[idx], gt_boxes[idx],
      gt_classes_np,
      dummy_scores, category_index
  )
plt.show()

In [None]:
# Download the pre-trained SSD ResNet50 V1 FPN 640x640 (COCO17) archive, unpack it,
# and move its `checkpoint` directory into models/research/object_detection/test_data/
# (the model-building cell restores `checkpoint/ckpt-0` from that location).

!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

In [None]:
tf.keras.backend.clear_session()

print('Building model and restoring weights for fine-tuning.', flush=True)
pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# Load the pipeline config and build a detection model.
#
# The COCO architecture predicts 90 class slots by default, so override the
# `num_classes` field with the number of logo classes used in this notebook.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
    model_config=model_config, is_training=True
)

# Define checkpoints for the layers to reuse.
#
# Only part of `detection_model` is restored from the pre-trained checkpoint:
# 1. Define a checkpoint for the box predictor (tower layers + box head only).
# 2. Define a checkpoint for the model, pointing at that box predictor
#    checkpoint and at the feature extractor.
# 3. Restore the checkpoint for those layers.
#
# Checkpoint restoration matches objects by attribute name, so each keyword
# below must be spelled exactly like the attribute on the box predictor. A
# misspelled keyword is silently skipped because of `expect_partial()`.
# The class prediction head (`_prediction_heads`) is deliberately left out: it
# is sized for the new classes and is trained from scratch.
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
)
fake_model = tf.compat.v2.train.Checkpoint(
    _feature_extractor=detection_model._feature_extractor,
    _box_predictor=fake_box_predictor
)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial()

# Run model through a dummy image so that variables are created
image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
print('Weights restored')

In [None]:
# Inspect the built model: print the object itself, then its instance
# attributes (via vars()).
# `detection_model` is an SSDMetaArch object (per the original author's note).
import pprint
pprint.pprint(detection_model)
pprint.pprint(vars(detection_model))

In [None]:
# Inspect the box predictor and its instance attributes; these attribute names
# are the ones referenced when restoring the checkpoint.
pprint.pprint(detection_model._box_predictor)
pprint.pprint(vars(detection_model._box_predictor))

In [None]:
# Display the box predictor's repr as the cell output.
detection_model._box_predictor

## Training loop

In [None]:
# Put Keras into training mode before the fine-tuning cells run.
tf.keras.backend.set_learning_phase(True)

# Hyperparameters read by the training cells below:
# images per training step, SGD learning rate, and number of training steps.
batch_size = 4
learning_rate = 0.01
num_batches = 100

In [None]:
# List every trainable variable of `detection_model` (index, name, shape and
# dtype) to help choose which ones to fine-tune.
summary_template = 'i: {}, name: {}, shape: {}, dtype: {}'
for position, variable in enumerate(detection_model.trainable_variables):
  pprint.pprint(summary_template.format(
      position, variable.name, variable.shape, variable.dtype))

In [None]:
# Pick the variables to fine-tune: only those whose name starts with one of
# the box-head / class-head prefixes of the box predictor.
trainable_variables = detection_model.trainable_variables
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
# str.startswith accepts a tuple of prefixes and matches any of them.
to_fine_tune = [
    var for var in trainable_variables
    if var.name.startswith(tuple(prefixes_to_train))
]

In [None]:
# Set up for fwd + bwd pass for a single train step
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  """Build a `tf.function` that runs one training step.

  Args:
    model: detection model providing `provide_groundtruth`, `preprocess`,
      `predict` and `loss`.
    optimizer: optimizer whose `apply_gradients` updates the variables.
    vars_to_fine_tune: variables to differentiate against and update.

  Returns:
    A function taking (image_tensors, groundtruth_boxes_list,
    groundtruth_classes_list) and returning the step's total loss.
  """
  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    """Run one forward and backward pass over a batch.

    Args:
      image_tensors: list of image tensors, one per example; they are
        preprocessed individually and concatenated along axis 0.
      groundtruth_boxes_list: list of groundtruth box tensors, one per image.
      groundtruth_classes_list: list of one-hot class tensors, one per image.

    Returns:
      Scalar tensor: localization loss plus classification loss.
    """
    # Size the shapes tensor from the batch actually passed in rather than
    # from the notebook-level `batch_size`, so a smaller batch stays consistent.
    shapes = tf.constant(len(image_tensors) * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list
    )
    with tf.GradientTape() as tape:
      # Use the `model` argument (not a notebook global) for preprocessing.
      preprocessed_images = tf.concat(
          [model.preprocess(image_tensor)[0]
           for image_tensor in image_tensors], axis=0
      )
      prediction_dict = model.predict(preprocessed_images, shapes)
      losses_dict = model.loss(prediction_dict, shapes)
      total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']
    # Only the forward pass needs recording; compute and apply the gradients
    # after leaving the tape context.
    gradients = tape.gradient(total_loss, vars_to_fine_tune)
    optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return total_loss

  return train_step_fn

In [None]:
optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
train_step_fn = get_model_train_step_function(detection_model, optimizer, to_fine_tune)

print('Start fine-tuning', flush=True)
# The index list never changes, so build it once outside the loop.
all_keys = list(range(len(train_images_np)))
for idx in range(num_batches):
  # Draw a random batch of distinct examples without shuffling the whole
  # index list; cap at the dataset size so tiny datasets do not raise.
  example_keys = random.sample(all_keys, min(batch_size, len(all_keys)))

  gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
  gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
  image_tensors = [train_image_tensors[key] for key in example_keys]

  total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

  # Report the loss every 10th step.
  if idx % 10 == 0:
    print('batch ' + str(idx) + ' of ' + str(num_batches) + ', loss=' + str(total_loss.numpy()), flush=True)

print('Done fine-tuning')

## Run Test

In [None]:
# Load the test frames out1.jpg .. out49.jpg, each with a leading batch axis.
test_image_dir = 'models/research/object_detection/test_images/ducky/test'
test_images_np = [
    np.expand_dims(
        load_image_into_numpy_array(
            os.path.join(test_image_dir, 'out' + str(frame_number) + '.jpg')),
        axis=0)
    for frame_number in range(1, 50)
]

@tf.function
def detect(input_tensor):
  """Run `detection_model` end to end on a batch of images.

  Args:
    input_tensor: image tensor passed to `detection_model.preprocess`.

  Returns:
    The output of `detection_model.postprocess`; callers read its
    'detection_boxes', 'detection_classes' and 'detection_scores' entries.
  """
  preprocessed, true_shapes = detection_model.preprocess(input_tensor)
  raw_predictions = detection_model.predict(preprocessed, true_shapes)
  return detection_model.postprocess(raw_predictions, true_shapes)

# The first call traces the tf.function and is therefore slow; subsequent
# calls should be fast.
# Detected classes are shifted by `label_id_offset` before being looked up in
# `category_index`, whose ids start at 1.
label_id_offset = 1
for i, frame_batch in enumerate(test_images_np):
  input_tensor = tf.convert_to_tensor(frame_batch, dtype=np.float32)
  detections = detect(input_tensor)

  detected_boxes = detections['detection_boxes'][0].numpy()
  detected_classes = (
      detections['detection_classes'][0].numpy().astype(np.uint32)
      + label_id_offset)
  detected_scores = detections['detection_scores'][0].numpy()

  # Passing image_name makes plot_detections write each frame to disk.
  plot_detections(
      frame_batch[0], detected_boxes, detected_classes, detected_scores,
      category_index, figsize=(15, 20),
      image_name='gif_frame_' + ('%02d' % i) + ".jpg"
  )

In [None]:
# List the working directory, e.g. to check that the gif_frame_*.jpg files were written.
!ls

In [None]:
# Fetch the FreeImage plugin needed by the 'GIF-FI' writer used below.
imageio.plugins.freeimage.download()

anim_file = 'duckies_test.gif'

# Collect the saved frames in name order (the zero-padded frame index keeps
# lexical order equal to frame order).
filenames = sorted(glob.glob('gif_frame_*.jpg'))
images = [imageio.imread(filename) for filename in filenames]

imageio.mimsave(anim_file, images, 'GIF-FI', fps=5)
# Read the GIF through a context manager so the file handle is closed.
with open(anim_file, 'rb') as gif_file:
  display(IPyImage(gif_file.read()))