# Object Detection API Demo

<table align="left"><td>
  <a target="_blank"  href="https://colab.sandbox.google.com/github/tensorflow/models/blob/master/research/object_detection/colab_tutorials/object_detection_tutorial.ipynb">
    <img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab
  </a>
</td><td>
  <a target="_blank"  href="https://github.com/tensorflow/models/blob/master/research/object_detection/colab_tutorials/object_detection_tutorial.ipynb">
    <img width=32px src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
</td></table>

Welcome to the [Object Detection API](https://github.com/tensorflow/models/tree/master/research/object_detection). This notebook will walk you step by step through the process of using a pre-trained model to detect objects in an image.

> **Important**: This tutorial helps you take the first step towards using the [Object Detection API](https://github.com/tensorflow/models/tree/master/research/object_detection) to build models. If you just need an off-the-shelf model that does the job, see the [TFHub object detection example](https://colab.sandbox.google.com/github/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb).

# Setup

Important: If you're running on a local machine, be sure to follow the [installation instructions](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/tf2.md). This notebook includes only what's necessary to run in Colab.

### Install

In [None]:
!pip install -U --pre tensorflow=="2.*"
!pip install tf_slim

Collecting tensorflow==2.*
  Downloading tensorflow-2.13.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (524.1 MB)
Collecting keras<2.14,>=2.13.1 (from tensorflow==2.*)
  Downloading keras-2.13.1-py3-none-any.whl (1.7 MB)
Collecting tensorboard<2.14,>=2.13 (from tensorflow==2.*)
  Downloading tensorboard-2.13.0-py3-none-any.whl (5.6 MB)
Collecting tensorflow-estimator<2.14,>=2.13.0 (from tensorflow==2.*)
  Downloading tensorflow_estimator-2.13.0-py2.py3-none-any.whl (440 kB)
Collecting typing-ex

Make sure you have `pycocotools` installed.

In [None]:
!pip install pycocotools



Get `tensorflow/models`, or `cd` to the parent directory of the repository.

In [None]:
import os
import pathlib
import sys


if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

Cloning into 'models'...
remote: Enumerating objects: 3916, done.
remote: Counting objects: 100% (3916/3916), done.
remote: Compressing objects: 100% (3054/3054), done.
remote: Total 3916 (delta 1117), reused 1766 (delta 809), pack-reused 0
Receiving objects: 100% (3916/3916), 49.66 MiB | 16.30 MiB/s, done.
Resolving deltas: 100% (1117/1117), done.
Updating files: 100% (3545/3545), done.


Compile the protobufs and install the `object_detection` package.

In [None]:
%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.

In [None]:
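%%capture
if "research" not in os.getcwd():
  # Clone only if the repository is not already present.
  if not pathlib.Path('models').exists():
    !git clone --depth 1 https://github.com/tensorflow/models
  # `! cd` runs in a throwaway subshell, so change directory with os.chdir instead.
  os.chdir('models/research')
  sys.path.append('.')
  ! pip install .
else:
  !git pull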

### Imports

In [None]:
import numpy as np
import os
import six.moves.urllib as urllib
import tarfile
import tensorflow as tf
import zipfile

from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt
from PIL import Image
from IPython.display import display

Import the object detection module.

In [None]:
from object_detection.utils import ops as utils_ops
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

Patches:

In [None]:
# patch tf1 into `utils.ops`
utils_ops.tf = tf.compat.v1

# Patch the location of gfile
tf.gfile = tf.io.gfile

# Model preparation

## Variables

Any model exported using the `export_inference_graph.py` tool can be loaded here simply by changing the path.

The original tutorial defaults to an "SSD with Mobilenet" model; this notebook loads `efficientdet_d0_coco17_tpu-32` in the Detection section. See the [detection model zoo](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md) for a list of other models that can be run out-of-the-box with varying speeds and accuracies.

## Loader

In [None]:
def load_model(model_name):
  ## URL updated
  base_url = 'http://download.tensorflow.org/models/object_detection/tf2/20200711/'
  model_file = model_name + '.tar.gz'
  model_dir = tf.keras.utils.get_file(
    fname=model_name,
    origin=base_url + model_file,
    untar=True)

  model_dir = pathlib.Path(model_dir)/"saved_model"

  model = tf.saved_model.load(str(model_dir))

  return model

## Loading label map
Label maps map indices to category names, so that when our convolutional network predicts `5`, we know that this corresponds to `airplane`. Here we use internal utility functions, but anything that returns a dictionary mapping integers to appropriate string labels would be fine.
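As a rough illustration of "anything that returns a dictionary mapping integers to labels", the sketch below parses a label map in the `item { ... }` text layout into a dictionary keyed by class ID. The `parse_label_map` helper and the two sample entries are hypothetical and only stand in for the real `mscoco_label_map.pbtxt`; the notebook itself relies on `label_map_util`.

```python
import re

# Illustrative sample in the label-map text layout (not the full COCO file).
SAMPLE_LABEL_MAP = '''
item {
  name: "/m/01g317"
  id: 1
  display_name: "person"
}
item {
  name: "/m/05czz6l"
  id: 5
  display_name: "airplane"
}
'''

def parse_label_map(text):
    """Hypothetical helper: map each integer id to {'id': ..., 'name': ...}."""
    category_index = {}
    # Each `item { ... }` block holds one class definition.
    for block in re.findall(r'item\s*\{(.*?)\}', text, flags=re.DOTALL):
        item_id = int(re.search(r'\bid:\s*(\d+)', block).group(1))
        display_name = re.search(r'display_name:\s*"([^"]*)"', block).group(1)
        category_index[item_id] = {'id': item_id, 'name': display_name}
    return category_index

sample_index = parse_label_map(SAMPLE_LABEL_MAP)
print(sample_index[5]['name'])
```

A dictionary shaped like this can be indexed with a predicted class ID in the same way `category_index` is used later in the notebook.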

In [None]:

os.getcwd()

'/content/models/research'

In [None]:
# converting Json to protobuf

import json

from google.protobuf.json_format import Parse, ParseDict

# d = {
#     "first": "a string",
#     "second": True,
#     "third": 123456789
# }

# message = ParseDict(d, Thing())
# # or
# message = Parse(json.dumps(d), Thing())

# print(message.first)  # "a string"
# print(message.second) # True
# print(message.third)  # 123456789

In [None]:
# Label map used to attach the correct label to each box.
# Modify the path depending on os.getcwd().
PATH_TO_LABELS = 'object_detection/data/mscoco_label_map.pbtxt'
category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS, use_display_name=True)

For the sake of simplicity we will test on the few sample images in `object_detection/test_images`:

In [None]:
# To test the code with your own images, add their paths to TEST_IMAGE_PATHS.

# Collect test images with any of the supported extensions.
PATH_TO_TEST_IMAGES_DIR = pathlib.Path('object_detection/test_images')
types = ('*.jpg', '*.jpeg', '*.png')
files_grabbed = []
for pattern in types:
    files_grabbed.extend(PATH_TO_TEST_IMAGES_DIR.glob(pattern))
TEST_IMAGE_PATHS = sorted(files_grabbed)
TEST_IMAGE_PATHS

[PosixPath('object_detection/test_images/image1.jpg'),
 PosixPath('object_detection/test_images/image2.jpg'),
 PosixPath('object_detection/test_images/image3.jpg')]

# Detection

Load an object detection model:

In [None]:
# Download the saved model and put it into models/research/object_detection/test_data/
# !wget http://download.tensorflow.org/models/object_detection/tf2/20200711/efficientdet_d0_coco17_tpu-32.tar.gz
# !tar -xf efficientdet_d0_coco17_tpu-32.tar.gz
# !mv efficientdet_d0_coco17_tpu-32/ models/research/object_detection/test_data/

In [None]:
model_name = 'efficientdet_d0_coco17_tpu-32'
#http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v1_fpn_640x640_coco17_tpu-8.tar.gz
detection_model = load_model(model_name)


Downloading data from http://download.tensorflow.org/models/object_detection/tf2/20200711/efficientdet_d0_coco17_tpu-32.tar.gz




In [None]:
import sys
sys.getsizeof(detection_model)

48

In [None]:

import sys

def get_size(obj, seen=None):
    """Recursively finds size of objects"""
    size = sys.getsizeof(obj)
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    # Important mark as seen *before* entering recursion to gracefully handle
    # self-referential objects
    seen.add(obj_id)
    if isinstance(obj, dict):
        size += sum([get_size(v, seen) for v in obj.values()])
        size += sum([get_size(k, seen) for k in obj.keys()])
    elif hasattr(obj, '__dict__'):
        size += get_size(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        size += sum([get_size(i, seen) for i in obj])
    return size

In [None]:
# get_size returns bytes; divide twice by 1024 to report MiB.
size_bytes = get_size(detection_model)
size_bytes / 1024 / 1024

20.532941818237305

Check the model's input signature. It expects a batch of 3-channel color images of type uint8:

In [None]:
print(detection_model.signatures['serving_default'].inputs)

[<tf.Tensor 'input_tensor:0' shape=(1, None, None, 3) dtype=uint8>, <tf.Tensor 'unknown:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_0:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_1:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_2:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_3:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_4:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_5:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_6:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_7:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_8:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_9:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_10:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_11:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_12:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_13:0' shape=<unknown> dtype=resource>, <tf.Tensor 'unknown_14:0' shape=<unknown> dtype=resource>, <

And returns several outputs:

In [None]:
detection_model.signatures['serving_default'].output_dtypes

{'detection_classes': tf.float32,
 'detection_scores': tf.float32,
 'detection_boxes': tf.float32,
 'num_detections': tf.float32,
 'raw_detection_boxes': tf.float32,
 'detection_multiclass_scores': tf.float32,
 'raw_detection_scores': tf.float32,
 'detection_anchor_indices': tf.float32}

In [None]:
detection_model.signatures['serving_default'].output_shapes

{'detection_classes': TensorShape([1, 100]),
 'detection_scores': TensorShape([1, 100]),
 'detection_boxes': TensorShape([1, 100, 4]),
 'num_detections': TensorShape([1]),
 'raw_detection_boxes': TensorShape([1, 49104, 4]),
 'detection_multiclass_scores': TensorShape([1, 100, 90]),
 'raw_detection_scores': TensorShape([1, 49104, 90]),
 'detection_anchor_indices': TensorShape([1, 100])}

`detection_classes` and `detection_scores` are needed to identify the detected objects.
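The `detection_boxes` output listed above holds four values per detection. In the Object Detection API these are normalized `[ymin, xmin, ymax, xmax]` coordinates, which is why the visualization call later passes `use_normalized_coordinates=True`. The sketch below shows how such a box could be converted to pixel coordinates; `to_pixel_box` is a hypothetical helper and the sample numbers are made up for illustration.

```python
def to_pixel_box(box, image_width, image_height):
    """Hypothetical helper: convert a normalized [ymin, xmin, ymax, xmax] box
    to integer pixel coordinates (left, top, right, bottom)."""
    ymin, xmin, ymax, xmax = box
    left = int(round(xmin * image_width))
    top = int(round(ymin * image_height))
    right = int(round(xmax * image_width))
    bottom = int(round(ymax * image_height))
    return left, top, right, bottom

# Made-up box on a 640x480 image.
print(to_pixel_box((0.1, 0.2, 0.5, 0.6), image_width=640, image_height=480))
```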

Add helper functions to filter the detections, call the model, and clean up the outputs:

In [None]:
import six
import collections

def get_classes_name_and_scores(
        boxes,
        classes,
        scores,
        category_index,
        model_name=model_name,
        max_boxes_to_draw=20,  # currently unused
        min_score_thresh=.5,  # change the threshold as needed
        food_list=(52, 53, 55, 56, 57, 58, 59, 60, 61)  # class IDs to keep
        ):
    """Return {detection index: {'name', 'score', 'model'}} for detections
    in `food_list` whose score exceeds `min_score_thresh`."""
    display_str = collections.defaultdict(dict)
    for i in range(boxes.shape[0]):
        if scores[i] > min_score_thresh:
            if classes[i] in six.viewkeys(category_index):
                if classes[i] in food_list:  # limit to food items
                    display_str[i]['name'] = category_index[classes[i]]['name']
                    display_str[i]['score'] = '{}%'.format(int(100 * scores[i]))
                    display_str[i]['model'] = model_name
    # sort the nested dictionary so that it lists by the highest confidence scores
    # images = list(result.keys())
    # sorted(images, key=lambda x: (display_str[x]['score'], display_str[x]['date']))

    return dict(display_str)
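The commented-out lines in the cell above hint at sorting the detections by confidence. One possible approach is sketched below. Because the scores are stored as strings such as `'91%'`, they are converted back to integers before sorting. `sort_by_score` and the sample dictionary are hypothetical and not part of the notebook.

```python
def sort_by_score(detections):
    """Hypothetical helper: order detections from highest to lowest confidence."""
    def score_value(item):
        _, info = item
        # Scores look like '91%'; compare them numerically, not as strings.
        return int(info['score'].rstrip('%'))
    return dict(sorted(detections.items(), key=score_value, reverse=True))

# Made-up detections in the shape returned by get_classes_name_and_scores.
sample = {
    0: {'name': 'banana', 'score': '62%'},
    3: {'name': 'pizza', 'score': '91%'},
    7: {'name': 'apple', 'score': '55%'},
}
print(list(sort_by_score(sample)))
```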

In [None]:
def run_inference_for_single_image(model, image):
  image = np.asarray(image)
  # The input needs to be a tensor, convert it using `tf.convert_to_tensor`.
  input_tensor = tf.convert_to_tensor(image)
  # The model expects a batch of images, so add an axis with `tf.newaxis`.
  input_tensor = input_tensor[tf.newaxis,...]

  # Run inference
  model_fn = model.signatures['serving_default']
  output_dict = model_fn(input_tensor)

  # All outputs are batched tensors.
  # Convert to numpy arrays, and take index [0] to remove the batch dimension.
  # We're only interested in the first num_detections.
  num_detections = int(output_dict.pop('num_detections'))
  output_dict = {key:value[0, :num_detections].numpy()
                 for key,value in output_dict.items()}
  output_dict['num_detections'] = num_detections

  # detection_classes should be ints.
  output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)

  # Handle models with masks:
  if 'detection_masks' in output_dict:
    # Reframe the bbox mask to the image size.
    detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
              output_dict['detection_masks'], output_dict['detection_boxes'],
               image.shape[0], image.shape[1])
    detection_masks_reframed = tf.cast(detection_masks_reframed > 0.5,
                                       tf.uint8)
    output_dict['detection_masks_reframed'] = detection_masks_reframed.numpy()

  return output_dict

Run it on each test image and show the results:

In [None]:
def show_inference(model, image_path):
  # the array based representation of the image will be used later in order to prepare the
  # result image with boxes and labels on it.
  image_np = np.array(Image.open(image_path))
  # Actual detection.
  output_dict = run_inference_for_single_image(model, image_np)
  # Visualization of the results of a detection.
  vis_util.visualize_boxes_and_labels_on_image_array(
      image_np,
      output_dict['detection_boxes'],
      output_dict['detection_classes'],
      output_dict['detection_scores'],
      category_index,
      instance_masks=output_dict.get('detection_masks_reframed', None),
      use_normalized_coordinates=True,
      line_thickness=8)
  print(get_classes_name_and_scores(
      output_dict['detection_boxes'],
      output_dict['detection_classes'],
      output_dict['detection_scores'],
      category_index))

  display(Image.fromarray(image_np))

In [None]:
# Run inference on multiple images and collect the results in a dictionary
# keyed by image file name (without extension).


def run_inference(image_paths, model=detection_model):
  result = {}

  for image_path in image_paths:
    image_np = np.array(Image.open(image_path))
    # Use the `model` argument rather than the global `detection_model`.
    output_dict = run_inference_for_single_image(model, image_np)
    filename = os.path.basename(image_path).split(".")[0]
    result[filename] = get_classes_name_and_scores(
        output_dict['detection_boxes'],
        output_dict['detection_classes'],
        output_dict['detection_scores'],
        category_index)
  return result
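The result keys above come from `os.path.basename(image_path).split(".")[0]`. For names such as `image1.jpg` this matches `pathlib.Path(...).stem`, but the two differ when a file name contains more than one dot. The comparison below is only a sketch; both helper names are hypothetical.

```python
import os
import pathlib

def name_by_split(path):
    # Same expression the notebook uses: keep everything before the first dot.
    return os.path.basename(path).split(".")[0]

def name_by_stem(path):
    # pathlib drops only the final suffix.
    return pathlib.Path(path).stem

for p in ('object_detection/test_images/image1.jpg', 'uploads/image1.v2.jpg'):
    print(p, name_by_split(p), name_by_stem(p))
```

If two uploads share the text before the first dot, the split-based key would make one result overwrite the other, so the stem may be the safer key in that case.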



In [None]:
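%%capture
if "research" not in os.getcwd():
  if not pathlib.Path('models').exists():
    !git clone --depth 1 https://github.com/tensorflow/models
  os.chdir('models/research')
  sys.path.append('.')
# `! cd` only affects a throwaway subshell, so create the folder
# relative to the current working directory (models/research).
os.makedirs('outputs', exist_ok=True)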

In [None]:
import json
def save_results(result, output_dir="/content/models/research/outputs"):
  # Write the result dictionary as JSON; the file name joins the image names.
  if os.path.exists(output_dir):
    image_names = '_'.join(list(result.keys()))
    with open(
        os.path.join(output_dir, "{}_result.txt".format(image_names)),
        'w',
        encoding='utf-8') as f:
      json.dump(result, f, ensure_ascii=False)
  else:
    raise ValueError('Output folder does not exist: {}'.format(output_dir))
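When reading a saved result back, note that JSON object keys are always strings, so the integer detection indices come back as `'0'`, `'1'`, and so on. The round trip below is a small sketch using a temporary directory; the sample result and the `round_trip` helper are made up for illustration.

```python
import json
import os
import tempfile

# Made-up result in the shape produced by run_inference.
sample_result = {
    'image1': {0: {'name': 'banana', 'score': '87%',
                   'model': 'efficientdet_d0_coco17_tpu-32'}},
    'image2': {},
}

def round_trip(result):
    """Hypothetical helper: dump `result` to a JSON file and load it again."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'result.txt')
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False)
        with open(path, encoding='utf-8') as f:
            return json.load(f)

loaded = round_trip(sample_result)
print(list(loaded['image1'].keys()))
```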



In [None]:
result = run_inference(TEST_IMAGE_PATHS, model=detection_model)
save_results(result)

In [None]:
# flatten and reduce to only food items?

In [None]:

result
# results only include non-empty dict values see outputs

In [None]:
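# Keep only the images that produced at least one detection.
updated_result = {name: detections for name, detections in result.items() if detections}
print(updated_result)

In [None]:
# Flatten to the detected class names across all images.
[val2['name'] for val in result.values() for val2 in val.values()]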

In [None]:
#output_dict['detection_boxes'].shape

In [None]:
for image_path in TEST_IMAGE_PATHS:
  show_inference(detection_model, image_path)


# 2.0 Run inference on API endpoints with FastAPI

In [None]:
!pip install colabcode
!pip install fastapi
!pip install python-multipart

In [None]:
from colabcode import ColabCode
from fastapi import FastAPI

API deployment process

1. Load images and save the files
2. Run an inference
3. Save the result dictionary as JSON
4. Output into the link

#### 1. Load images

app.py

In [None]:
cc_images = ColabCode(port=12000, code=False)

In [None]:
from fastapi import File, UploadFile
from typing import List

app = FastAPI()

@app.post("/upload")
def upload(files: List[UploadFile] = File(...)):
    for file in files:
        try:
            with open(file.filename, 'wb') as f:
                while contents := file.file.read(640 * 640):
                    f.write(contents)
        except Exception:
            return {"message": "There was an error uploading the file(s)"}
        finally:
            file.file.close()

    return {"message": f"Successfully uploaded {[file.filename for file in files]}"}
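The upload handler above copies each file to disk in chunks of `640 * 640` bytes. The same chunked-copy loop can be tried without a server, as in the sketch below. `save_stream` is a hypothetical helper. Unlike the handler, it also reduces the client-supplied name to its final path component, which is an added precaution and not something the notebook does.

```python
import io
import os
import tempfile

def save_stream(stream, filename, dest_dir, chunk_size=640 * 640):
    """Hypothetical helper: copy a binary stream into dest_dir in chunks."""
    # Keep only the last path component so '../x.jpg' stays inside dest_dir.
    safe_name = os.path.basename(filename)
    target = os.path.join(dest_dir, safe_name)
    with open(target, 'wb') as f:
        # read() returns b'' at end of stream, which ends the loop.
        while contents := stream.read(chunk_size):
            f.write(contents)
    return target

with tempfile.TemporaryDirectory() as tmp:
    saved = save_stream(io.BytesIO(b'fake image bytes'), '../image1.jpg', tmp)
    print(os.path.basename(saved))
```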

In [None]:
cc_images.run_app(app=app)

test.py

In [None]:
# import requests
# import glob

# paths = glob.glob("images/*", recursive=True) # returns a list of file paths
# images = [('files', open(p, 'rb')) for p in paths] # or paths[:3] to select the first 3 images
# # change the URL or port as necessary
# url = 'https://f125-34-29-119-31.ngrok-free.app/upload'#'http://127.0.0.1:12000/upload'
# resp = requests.post(url=url, files=images)
# print(resp.json())

In [None]:
cc = ColabCode(port=8000, code=False)

#### Create an architecture to receive images on the server and client sides

In [None]:
import os
import logging
from io import BytesIO
from typing import List

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from PIL import Image

app = FastAPI()

# output the results in dictionaries
result = run_inference(TEST_IMAGE_PATHS, model=detection_model)

model = None

@app.on_event("startup")
def load_model_to_server():
    # Load the detection model once at startup and keep it in the global `model`.
    global model
    model_name = 'efficientdet_d0_coco17_tpu-32'
    #http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v1_fpn_640x640_coco17_tpu-8.tar.gz
    model = load_model(model_name)

@app.post("/upload")
def upload(files: List[UploadFile] = File(...)):
    for file in files:
        try:
            with open(file.filename, 'wb') as f:
                while contents := file.file.read(640 * 640):
                    f.write(contents)
        except Exception:
            return {"message": "There was an error uploading the file(s)"}
        finally:
            file.file.close()

    return {"message": f"Successfully uploaded {[file.filename for file in files]}"}

@app.post("/object_detect")
async def image_detect(input_file: List[UploadFile] = File(...)):
    try:
        result = {}
        for upload_file in input_file:
            # Decode the uploaded bytes into a 3-channel uint8 image array.
            image = Image.open(BytesIO(await upload_file.read())).convert('RGB')
            output_dict = run_inference_for_single_image(model, np.array(image))
            filename = os.path.basename(upload_file.filename).split(".")[0]
            result[filename] = get_classes_name_and_scores(
                output_dict['detection_boxes'],
                output_dict['detection_classes'],
                output_dict['detection_scores'],
                category_index)

        #logger.info("detection results", result)

        return JSONResponse({"data": result,
                             "message": "object detected successfully",
                             "errors": None},
                            status_code=200)
    except Exception as error:
        #logger.error(["process failed", error])
        # Report the actual error text instead of the literal string "error".
        return JSONResponse({"message": "object detection failed",
                             "errors": str(error)},
                            status_code=400)

In [None]:
cc.run_app(app=app)

#### Curl command


```
curl -X 'POST' \
  'https://91d8-34-143-217-78.ngrok-free.app/object_detect' \
  -H 'accept: application/json' \
  -F 'input_file=@object_detection/test_images/image1.jpg'
```



#### Request URL

https://91d8-34-143-217-78.ngrok-free.app/object_detect


In [None]:
import requests
import json
resp = requests.post(
    'https://httpbin.org/post',
    json={'website': 'datagy.io'},
)

print(resp)

In [None]:
from fastapi import FastAPI, File, UploadFile
from typing_extensions import Annotated

app = FastAPI()


@app.post("/files/")
async def create_file(file: Annotated[bytes, File()]):
    return {"file_size": len(file)}


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
    return {"filename": file.filename}

In [None]:
cc.run_app(app=app)

Object Detection && Embeddings

1. Set up architecture to receive images and apply those (test.py) to the model
2. Collect inference results per upload by user
3. Prepare embeddings of the results so that there is an embedding model behind the scenes to improve the accuracy of object detection

Object Detection (later)
1. Feed datasets coming in from users' upload and train the model to increase the accuracy

Text Recognition
1. Package item recognition

Receipt Recognition
1. Scan receipts with OCR