This notebook contains the code to train a custom TFLite model on mobile phone images and to verify it with test images.  
The model runs on a Raspberry Pi car to locate a lost phone on the ground.  
Thus, the model is a TFLite model tuned for IoT devices, and the training images are all images of phones on the ground.  

# Reference
https://www.tensorflow.org/lite/inference_with_metadata/task_library/object_detector  
https://www.tensorflow.org/lite/tutorials/model_maker_object_detection  
https://www.tensorflow.org/lite/guide/model_maker


# Set Up

In [None]:
!pip3 install tflite-model-maker

# Import Modules

In [1]:
import os

import cv2
import numpy as np
import tensorflow as tf
from PIL import Image
from tflite_model_maker import model_spec
from tflite_model_maker import object_detector


### Utilities

In [3]:
# Pre-process images: skip incompatible (non-JPEG) files and remove the alpha channel.
def preprocess_images(input_dir, output_dir):
    """Copy the JPEG images found in ``input_dir`` into ``output_dir``.

    Files are selected by extension (``.jpg`` / ``.jpeg``, case-insensitive);
    every other entry of ``input_dir`` is skipped. An image whose mode is
    ``RGBA`` is converted to ``RGB`` before it is saved; all other images are
    re-saved as they are. Output files keep the input file name.

    Args:
        input_dir: Directory that holds the raw images.
        output_dir: Directory that receives the processed images. It is
            created, together with any missing parent directories, when it
            does not exist yet.

    Returns:
        None. Progress and the final image count are printed.
    """
    # makedirs(exist_ok=True) also handles a missing parent directory and an
    # already existing target, which a bare os.mkdir does not.
    os.makedirs(output_dir, exist_ok=True)

    count = 0
    for f in os.listdir(input_dir):
        # Compare the real extension so that '.JPG' is accepted and a name
        # that merely ends in the letters 'jpg' is not.
        if os.path.splitext(f)[1].lower() not in ('.jpg', '.jpeg'):
            continue
        print('pre process file: ', f)
        # The context manager closes the underlying file handle once the
        # image has been written out.
        with Image.open(os.path.join(input_dir, f), 'r') as image:
            converted = image.convert('RGB') if image.mode == 'RGBA' else image
            converted.save(os.path.join(output_dir, f))
        count += 1
    print('processed ', count, ' images')


## Prepare Input Files for Training

In [2]:
# Root folder of the phone data set; every directory below is a sub-folder of it.
BASE_PATH = os.path.abspath('./mobile_phone')

# Training split: raw images, pre-processed copies, and label files.
train_image_dir, train_process_image_dir, train_label_dir = (
    os.path.join(BASE_PATH, sub_dir)
    for sub_dir in ('train', 'train_preprocess', 'train_label')
)

# Test split, laid out the same way as the training split.
test_image_dir, test_process_image_dir, test_label_dir = (
    os.path.join(BASE_PATH, sub_dir)
    for sub_dir in ('test', 'test_preprocess', 'test_label')
)

# Single-class label map: id 1 is the phone.
label_map = {1: 'phone'}

# Pre-processing step, left disabled; uncomment to (re)generate the
# *_preprocess folders from the raw image folders.
# preprocess_images(train_image_dir, train_process_image_dir)
# preprocess_images(test_image_dir, test_process_image_dir)

### MANUAL ACTION: Use LabelImg to label the input images and save the label files under:  

$BASE_PATH/train_label  
$BASE_PATH/test_label

LabelImg Usage: https://github.com/tzutalin/labelImg

In [3]:
# Re-derive the two folders from BASE_PATH; the values are the same ones the
# path-setup cell assigns to these names.
train_process_image_dir, test_label_dir = (
    os.path.join(BASE_PATH, sub_dir)
    for sub_dir in ('train_preprocess', 'test_label')
)

# Train Model

In [5]:
# Training data: pre-processed images plus their Pascal VOC label files.
train_data_loader = object_detector.DataLoader.from_pascal_voc(
    train_process_image_dir,
    train_label_dir,
    label_map=label_map,
)
spec = model_spec.get('efficientdet_lite4')

# The test split is also used as validation data while training.
test_data_loader = object_detector.DataLoader.from_pascal_voc(
    test_process_image_dir,
    test_label_dir,
    label_map=label_map,
)

# NOTE(review): the run recorded in this cell's output stopped at epoch 30 with
# "No space left on device" while saving a checkpoint under the system temp
# directory; make sure enough disk space is free before re-running.
model = object_detector.create(
    train_data_loader,
    spec,
    validation_data=test_data_loader,
    batch_size=5,
    epochs=50,
    train_whole_model=True,
)

INFO:tensorflow:Cache will be stored in /var/folders/t6/xm8bbqvd10jcg3mxv5fdfq640000gn/T/tmpfubars_c with prefix filename 06dc7edd3f1e6e74d1358b2e7b35f5fc. Cache_prefix is /var/folders/t6/xm8bbqvd10jcg3mxv5fdfq640000gn/T/tmpfubars_c/06dc7edd3f1e6e74d1358b2e7b35f5fc


INFO:tensorflow:Cache will be stored in /var/folders/t6/xm8bbqvd10jcg3mxv5fdfq640000gn/T/tmpfubars_c with prefix filename 06dc7edd3f1e6e74d1358b2e7b35f5fc. Cache_prefix is /var/folders/t6/xm8bbqvd10jcg3mxv5fdfq640000gn/T/tmpfubars_c/06dc7edd3f1e6e74d1358b2e7b35f5fc


INFO:tensorflow:On image 0


INFO:tensorflow:On image 0


INFO:tensorflow:Retraining the models...


INFO:tensorflow:Retraining the models...


Epoch 1/50


2022-03-20 22:34:55.971507: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50

2022-03-20 23:02:50.368034: W tensorflow/core/framework/op_kernel.cc:1745] OP_REQUIRES failed at save_restore_v2_ops.cc:138 : RESOURCE_EXHAUSTED: /var/folders/t6/xm8bbqvd10jcg3mxv5fdfq640000gn/T/tmpl97081jo/ckpt-30_temp/part-00000-of-00001.data-00000-of-00001.tempstate2227047178749386079; No space left on device


ResourceExhaustedError: /var/folders/t6/xm8bbqvd10jcg3mxv5fdfq640000gn/T/tmpl97081jo/ckpt-30_temp/part-00000-of-00001.data-00000-of-00001.tempstate2227047178749386079; No space left on device [Op:SaveV2]

# Evaluate with Test Data


In [None]:
# Build the test loader again so this cell does not rely on the one created
# in the training cell, then evaluate the trained model on it.
test_data_loader = object_detector.DataLoader.from_pascal_voc(
    test_process_image_dir,
    test_label_dir,
    label_map=label_map,
)
model.evaluate(test_data_loader)

# Export Model

In [None]:
# Export the trained model into <BASE_PATH>/model. `model_path` is where the
# later cells look for the exported .tflite file.
model_dir = os.path.join(BASE_PATH, 'model')
model_path = os.path.join(model_dir, 'model.tflite')
model.export(export_dir=model_dir)


# Evaluate TFLite Model with Test Data

In [None]:
# Evaluate the exported .tflite file at `model_path` against the same test
# loader that was used for the in-memory model above.
model.evaluate_tflite(model_path, test_data_loader)

# Verify with Test Image

In [None]:
### Utility
# Build the list of class names. Label ids are shifted down by one so that the
# list can be indexed directly with the class id taken from a detection.
classes = ['???'] * model.model_spec.config.num_classes
# Keep the model's label map under its own name: re-binding `label_map` here
# would replace the {id: name} dict that the data-loader cells pass to
# `from_pascal_voc`, breaking them when they are re-run after this cell.
model_label_map = model.model_spec.config.label_map
for label_id, label_name in model_label_map.as_dict().items():
  classes[label_id-1] = label_name

# One random color per class, used when drawing the detections.
COLORS = np.random.randint(0, 255, size=(len(classes), 3), dtype=np.uint8)

def preprocess_image(image_path, input_size):
  """Read an image file and prepare it as input for the TFLite model.

  Args:
    image_path: Path of the image file to read.
    input_size: Target size passed to `tf.image.resize`.

  Returns:
    A pair `(resized, original)`: the image resized to `input_size` with a
    leading axis added and cast to uint8, and the decoded image before
    resizing.
  """
  raw_bytes = tf.io.read_file(image_path)
  decoded = tf.io.decode_image(raw_bytes, channels=3)
  original_image = tf.image.convert_image_dtype(decoded, tf.uint8)

  # Resize, add the leading axis, then cast back to uint8.
  model_input = tf.image.resize(original_image, input_size)[tf.newaxis, :]
  return tf.cast(model_input, dtype=tf.uint8), original_image


def detect_objects(interpreter, image, threshold):
  """Run the model on `image` and collect the detections that pass `threshold`.

  Args:
    interpreter: Object providing `get_signature_runner()`.
    image: Input passed to the signature runner as `images`.
    threshold: Minimum score (inclusive) a detection needs to be kept.

  Returns:
    A list of dicts with the keys 'bounding_box', 'class_id' and 'score',
    one per kept detection, in model output order.
  """
  run_model = interpreter.get_signature_runner()
  outputs = run_model(images=image)

  # output_0 is read as the detection count; the other three outputs are
  # squeezed and indexed per detection.
  num_detections = int(np.squeeze(outputs['output_0']))
  scores = np.squeeze(outputs['output_1'])
  class_ids = np.squeeze(outputs['output_2'])
  boxes = np.squeeze(outputs['output_3'])

  return [
      {
        'bounding_box': boxes[idx],
        'class_id': class_ids[idx],
        'score': scores[idx]
      }
      for idx in range(num_detections)
      if scores[idx] >= threshold
  ]


def run_odt_and_draw_results(image_path, interpreter, threshold=0.5):
  """Detect objects in the image at `image_path` and draw them on a copy of it.

  Args:
    image_path: Path of the image file to analyse.
    interpreter: TFLite interpreter used for inference.
    threshold: Minimum score a detection needs to be drawn (default 0.5).

  Returns:
    A uint8 numpy array holding the original image with a rectangle and a
    "<class name>: <score>%" label drawn for every kept detection.
  """
  # The height and width the model expects are read from its first input.
  _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']

  model_input, source_image = preprocess_image(
      image_path,
      (input_height, input_width)
    )
  detections = detect_objects(interpreter, model_input, threshold=threshold)

  canvas = source_image.numpy().astype(np.uint8)
  for detection in detections:
    # Boxes come as relative (ymin, xmin, ymax, xmax); scale them to pixel
    # positions in the original image.
    ymin, xmin, ymax, xmax = detection['bounding_box']
    left = int(xmin * canvas.shape[1])
    right = int(xmax * canvas.shape[1])
    top = int(ymin * canvas.shape[0])
    bottom = int(ymax * canvas.shape[0])

    class_id = int(detection['class_id'])
    color = [int(channel) for channel in COLORS[class_id]]
    cv2.rectangle(canvas, (left, top), (right, bottom), color, 2)

    # Put the label above the box, or just inside it when the box is too
    # close to the top edge for the text to fit.
    if top - 15 > 15:
      label_y = top - 15
    else:
      label_y = top + 15
    label = "{}: {:.0f}%".format(classes[class_id], detection['score'] * 100)
    cv2.putText(canvas, label, (left, label_y),
        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

  return canvas.astype(np.uint8)

In [None]:
DETECTION_THRESHOLD = 0.6

# Folder with the real-world photos used for the visual check. From here on
# `test_image_dir` points at this folder; the following cells rely on that.
# Only `test_image_dir` is bound: `model_dir` (the export directory) must keep
# its value and is no longer overwritten with the photo folder.
test_image_dir = os.path.join(BASE_PATH, 'test_real_image')

test_image = os.path.join(test_image_dir, 'test1.jpeg')

# Load the exported TFLite model
interpreter = tf.lite.Interpreter(model_path=model_path)
interpreter.allocate_tensors()

##### Image 1
# Run inference and draw the detection result on an in-memory copy of the image
detection_result_image = run_odt_and_draw_results(
    test_image,
    interpreter,
    threshold=DETECTION_THRESHOLD
)

# Show the detection result
Image.fromarray(detection_result_image)

In [None]:
##### Image 2
# Second real-world photo: detect, annotate, and display the result.
test_image2 = os.path.join(test_image_dir, 'test2.jpeg')
detection_result_image = run_odt_and_draw_results(
    test_image2, interpreter, threshold=DETECTION_THRESHOLD)

# The bare expression renders the annotated image as the cell output.
Image.fromarray(detection_result_image)

In [None]:
##### Image 3
# Third real-world photo: detect, annotate, and display the result.
test_image3 = os.path.join(test_image_dir, 'test3.jpeg')
detection_result_image = run_odt_and_draw_results(
    test_image3, interpreter, threshold=DETECTION_THRESHOLD)

# The bare expression renders the annotated image as the cell output.
Image.fromarray(detection_result_image)