In [None]:
!pip install tflite
!pip install opencv-python
!pip install tf-models-official
!pip install protobuf

In [None]:
import os, sys
import pathlib
import numpy as np
import pickle
import cv2 as cv
import matplotlib.pyplot as plt
import tensorflow as tf

np.set_printoptions(threshold=sys.maxsize)
tf.get_logger().setLevel('ERROR')

Extract the list of operator names with their corresponding operator indexes from schema.fbs file of Tensorflow

In [None]:
import re

# Path to the schema.fbs file
schema_fbs_file = os.path.join(os.getcwd(), "schema.fbs")

# Function to extract builtin operators from schema.fbs
def extract_builtin_operators(schema_fbs_file):
    builtin_operators = {}

    with open(schema_fbs_file, "r") as f:
        content = f.read()

        # Regex pattern to find enum definitions
        enum_pattern = r'enum BuiltinOperator(.*?)}'
        enums = re.findall(enum_pattern, content, re.DOTALL)

        # Extract enum values and names
        for enum in enums:
            enum_values = re.findall(r'(\w+)\s*=\s*(-?\d+)', enum)
            for enum_name, enum_value in enum_values:
                builtin_operators[int(enum_value)] = enum_name

    return builtin_operators

# Extract builtin operators
builtin_operators = extract_builtin_operators(schema_fbs_file)

# Print the dictionary
print("Builtin Operators Dictionary:")
for enum_value, enum_name in builtin_operators.items():
    print(f"{enum_value}: {enum_name}")

In [None]:
from tflite.Model import Model
from tflite.TensorType import TensorType
from tflite.BuiltinOperator import BuiltinOperator

def extract_tensor_values(tensor, buffer):
  buffer_data = buffer.DataAsNumpy()
  tensor_type = tensor.Type()
  data_length = buffer.DataLength()
  shape = tuple(tensor.ShapeAsNumpy())
  # bytes_per_data = 0

  # Convert buffer data to a NumPy array based on the datatype
  if tensor_type == TensorType.FLOAT32:
    buffer_data = np.empty((data_length,), dtype=np.float32)
    for i in range(data_length):
      buffer_data[i] = buffer.Data(i)
    tensor_values = np.frombuffer(buffer_data, dtype=np.float32)
    # bytes_per_data = 4
  elif tensor_type == TensorType.UINT8:
    # print("uint8")
    buffer_data = np.empty((data_length,), dtype=np.uint8)
    for i in range(data_length):
      buffer_data[i] = buffer.Data(i)
    tensor_values = np.frombuffer(buffer_data, dtype=np.uint8)
    # bytes_per_data = 1
  elif tensor_type == TensorType.INT8:
    # print("int8")
    buffer_data = np.empty((data_length,), dtype=np.uint8)
    for i in range(data_length):
      buffer_data[i] = buffer.Data(i)
    # print(buffer_data)
    tensor_values = np.frombuffer(buffer_data.tobytes(), dtype=np.int8)
    # bytes_per_data = 1
  elif tensor_type == TensorType.INT32:
    # print("int32")
    buffer_data = np.empty((data_length,), dtype=np.uint8)
    for i in range(data_length):
      buffer_data[i] = buffer.Data(i)
    # print(buffer_data)
    tensor_values = np.frombuffer(buffer_data.tobytes(), dtype=np.int32)
    # bytes_per_data = 4
  else:
    raise ValueError("Unsupported datatype")

  return tensor_values, shape #, bytes_per_data

def extract_quantization_params(tensor):
  quantization_data = tensor.Quantization()
  return quantization_data.ScaleAsNumpy(), quantization_data.ZeroPointAsNumpy()

In [None]:
def extract_operator_params(model, op_code, tensors):
  # num_bytes_tensor = 0
  # Extract properties of input and output tensors
  op_data = {}
  op_data['op_code'] = builtin_operators[op_code]

  if op_code == BuiltinOperator().QUANTIZE \
  or op_code == BuiltinOperator().LOGISTIC \
  or op_code == BuiltinOperator().SOFTMAX:
    for tensor_idx, tensor in enumerate(tensors):
      scale, zero_point = extract_quantization_params(tensor)
      if tensor_idx == 0:
        op_data['i_scale'] = scale
        op_data['i_zero_point'] = zero_point
      elif tensor_idx == 1:
        op_data['o_scale'] = scale
        op_data['o_zero_point'] = zero_point

  elif op_code == BuiltinOperator().CONV_2D \
  or op_code == BuiltinOperator().DEPTHWISE_CONV_2D:
    for tensor_idx, tensor in enumerate(tensors):
      buffer_idx = tensor.Buffer()
      buffer = model.Buffers(buffer_idx)
      data, shape = extract_tensor_values(tensor, buffer)
      scale, zero_point = extract_quantization_params(tensor)
      if tensor_idx == 0:
        op_data['i_scale'] = scale
        op_data['i_zero_point'] = zero_point
      elif tensor_idx == 1:
        tmp_data = data.reshape(shape)
        reshaped_data = np.zeros(dtype=np.int8, shape=(shape[0], shape[3], shape[2], shape[1]))
        for l in range(shape[0]):
          for k in range(shape[1]):
            for j in range(shape[2]):
              for i in range(shape[3]):
                reshaped_data[l][i][k][j] = tmp_data[l][k][j][i]
        op_data['w_data'] = reshaped_data
        op_data['w_scale'] = scale
        op_data['w_zero_point'] = zero_point
      elif tensor_idx == 2:
        op_data['b_data'] = data.reshape(shape)
        op_data['b_scale'] = scale
        op_data['b_zero_point'] = zero_point
      elif tensor_idx == 3:
        op_data['o_scale'] = scale
        op_data['o_zero_point'] = zero_point

  elif op_code == BuiltinOperator().FULLY_CONNECTED:
    for tensor_idx, tensor in enumerate(tensors):
      buffer_idx = tensor.Buffer()
      buffer = model.Buffers(buffer_idx)
      data, shape = extract_tensor_values(tensor, buffer)
      scale, zero_point = extract_quantization_params(tensor)
      if tensor_idx == 0:
        op_data['i_scale'] = scale
        op_data['i_zero_point'] = zero_point
      elif tensor_idx == 1:
        tmp_data = data.reshape(shape)
        reshaped_data = np.transpose(tmp_data)
        op_data['w_data'] = reshaped_data
        op_data['w_scale'] = scale
        op_data['w_zero_point'] = zero_point
      elif tensor_idx == 2:
        op_data['b_data'] = data.reshape(shape)
        op_data['b_scale'] = scale
        op_data['b_zero_point'] = zero_point
      elif tensor_idx == 3:
        op_data['o_scale'] = scale
        op_data['o_zero_point'] = zero_point

  elif op_code == BuiltinOperator().ADD:
    for tensor_idx, tensor in enumerate(tensors):
      scale, zero_point = extract_quantization_params(tensor)
      if tensor_idx == 0:
        op_data['i0_scale'] = scale
        op_data['i0_zero_point'] = zero_point
      elif tensor_idx == 1:
        op_data['i1_scale'] = scale
        op_data['i1_zero_point'] = zero_point
      elif tensor_idx == 2:
        op_data['o_scale'] = scale
        op_data['o_zero_point'] = zero_point

  elif op_code == BuiltinOperator().MAX_POOL_2D:
    pass

  elif op_code == BuiltinOperator().PACK:
    pass

  elif op_code == BuiltinOperator().RESHAPE:
    for tensor_idx, tensor in enumerate(tensors):
      buffer_idx = tensor.Buffer()
      buffer = model.Buffers(buffer_idx)
      data, shape = extract_tensor_values(tensor, buffer)
      if tensor_idx == 1:
        op_data['new_shape'] = data

  elif op_code == BuiltinOperator().CONCATENATION:
    pass

  elif op_code == BuiltinOperator().DEQUANTIZE:
    for tensor_idx, tensor in enumerate(tensors):
      scale, zero_point = extract_quantization_params(tensor)
      if tensor_idx == 0:
        op_data['i_scale'] = scale
        op_data['i_zero_point'] = zero_point
        
  elif op_code == BuiltinOperator().CUSTOM:
    for tensor_idx, tensor in enumerate(tensors):
      buffer_idx = tensor.Buffer()
      buffer = model.Buffers(buffer_idx)
      data, shape = extract_tensor_values(tensor, buffer)
      if tensor_idx == 2:
        reshaped_data = np.zeros(dtype=np.float32, shape=(shape[0], shape[1]))
        for i in range(shape[0]):
          for j in range(shape[1]):
            reshaped_data[i][j] = data[i*shape[1] + j]
        op_data['anchor'] = reshaped_data
  else:
    raise ValueError("Unsupported op_code")
  
  return op_data

In [None]:
compute_graph = []

def extract_model_params(model_file):
  # num_bytes_op = 0
  # Parse the model's flatbuffers
  with open(model_file, "rb") as f:
    model_buf = f.read()

  model = Model.GetRootAsModel(model_buf, 0)

  # Access operators and buffers
  for subgraph_idx in range(model.SubgraphsLength()):
    subgraph = model.Subgraphs(subgraph_idx)
    # Extract operator details
    for op_idx in range(subgraph.OperatorsLength()):
      op = subgraph.Operators(op_idx)
      op_code_idx = op.OpcodeIndex()
      op_code = model.OperatorCodes(op_code_idx).BuiltinCode()
      # Extract inputs and outputs
      input_tensors = [subgraph.Tensors(op.Inputs(j)) for j in range(0, op.InputsLength())]
      output_tensors = [subgraph.Tensors(op.Outputs(j)) for j in range(0, op.OutputsLength())]
      op_params = extract_operator_params(model, op_code, input_tensors + output_tensors)
      compute_graph.append(op_params)
      # print("number of bytes per operator:", num_bytes_op)
  # return num_bytes_op

In [None]:
model_file = "ssd_mobilenetV2_fpnlite_UINT8_AP24.tflite"
# model_file = "lenet5_int8.tflite"
dataset = "APtest_resized"
# output = "lenet5"
tflite_models_dir = pathlib.Path("../../pretrained_models")
tflite_models_dir.mkdir(exist_ok=True, parents=True)
tflite_model_quant_file = os.path.join(tflite_models_dir, model_file)
# dataset_dir = os.path.join(os.path.join(os.getcwd(), "../../datasets"), dataset)
output_dir = pathlib.Path(os.path.join(os.path.join(os.getcwd(), '../../model_output'), output))
output_dir.mkdir(exist_ok=True, parents=True)

In [None]:
extract_model_params(tflite_model_quant_file)
for node in compute_graph:
  print(node)
# print(f"total number of bytes required: {num_bytes_model}")

In [None]:
# fname = 'lenet5.pkl'
fname = 'autopolls.pkl'
with open(os.path.join(output_dir, fname), 'wb') as handle:
    pickle.dump(compute_graph, handle, protocol=pickle.HIGHEST_PROTOCOL)

### Code for visualizing images with bounding boxes, label and score drawn

In [None]:
def extract_metadata(dir):
  in1 = open(os.path.join(dir, "key"),'rb')
  fKey = pickle.load(in1)
  in1.close()
  return fKey

In [None]:
# Visualize an image with bounding boxes from labels
def visualize_before_prediction(dir, metadata, fre):
  # read in the image
  in1 = cv.imread(os.path.join(dir, (metadata[fre]['file_name']).split('/')[-1]))
  sz1 = np.shape(in1)
  in1 = cv.cvtColor(in1, cv.COLOR_BGR2RGB)
  tbox1 = metadata[fre]['bbox']

  # overlay the bounding box and save
  for ele in tbox1:
    cv.rectangle(in1,(int(sz1[1]*ele[0]),int(sz1[0]*ele[1])),(int(sz1[1]*ele[2]),int(sz1[0]*ele[3])),color=(0,0,255),thickness=4)
  # cv.imwrite(dir + '/test_' + fre + '.jpg',in1)
  plt.imshow(in1)

In [None]:
# Visualize an image with bounding boxes from prediction
def visualize_after_prediction(dir, metadata, fre, bbox):
  # read in the image
  in1 = cv.imread(os.path.join(dir, (metadata[fre]['file_name']).split('/')[-1]))
  sz1 = np.shape(in1)
  in1 = cv.cvtColor(in1, cv.COLOR_BGR2RGB)
  tbox1 = bbox

  # overlay the bounding box and save
  for ele in tbox1:
    cv.rectangle(in1,(int(sz1[1]*ele[0]),int(sz1[0]*ele[1])),(int(sz1[1]*ele[2]),int(sz1[0]*ele[3])),color=(0,0,255),thickness=4)
  # cv.imwrite(dir + '/test_' + str(fre) + '.jpg',in1)
  plt.imshow(in1)

In [None]:
def create_category_index(label_path=None):
  """
  To create dictionary of label map
  Parameters
  ----------
  label_path : string, optional
      Path to labelmap.txt. The default is None.
  Returns
  -------
  category_index : dict
      nested dictionary of labels.
  """
  f = open(label_path)
  category_index = {}
  for i, val in enumerate(f):
      if i != 0:
          val = val[:-1]
          if val != '???':
              category_index.update({(i-1): {'id': (i-1), 'name': val}})
          
  f.close()
  return category_index

In [None]:
def apply_nms(output_dict, num_classes=1, iou_thresh=0.5, score_thresh=0.6):
    """
    Function to apply non-maximum suppression on different classes
    Parameters
    ----------
    output_dict : dictionary
        dictionary containing:
            'detection_boxes' : Bounding boxes coordinates. Shape (N, 4)
            'detection_classes' : Class indices detected. Shape (N)
            'detection_scores' : Shape (N)
            'num_detections' : Total number of detections i.e. N. Shape (1)
    iou_thresh : int, optional
        Intersection Over Union threshold value. The default is 0.5.
    score_thresh : int, optional
        Score threshold value below which to ignore. The default is 0.6.
    Returns
    -------
    output_dict : dictionary
        dictionary containing only scores and IOU greater than threshold.
            'detection_boxes' : Bounding boxes coordinates. Shape (N2, 4)
            'detection_classes' : Class indices detected. Shape (N2)
            'detection_scores' : Shape (N2)
            where N2 is the number of valid predictions after those conditions.
    """
    num_detections = int(output_dict['num_detections'])
    boxes = np.zeros([1, num_detections, num_classes, 4])
    scores = np.zeros([1, num_detections, num_classes])
    # val = [0]*q
    for i in range(num_detections):
        # indices = np.where(classes == output_dict['detection_classes'][i])[0][0]
        boxes[0, i, output_dict['detection_classes'][i], :] = output_dict['detection_boxes'][i]
        scores[0, i, output_dict['detection_classes'][i]] = output_dict['detection_scores'][i]
    nmsd = tf.image.combined_non_max_suppression(boxes=boxes,
                                                 scores=scores,
                                                 max_output_size_per_class=1,
                                                 max_total_size=num_detections,
                                                 iou_threshold=iou_thresh,
                                                 score_threshold=score_thresh,
                                                 pad_per_class=False,
                                                 clip_boxes=False)
    valid = nmsd.valid_detections[0].numpy()
    output_dict = {
                   'detection_boxes' : nmsd.nmsed_boxes[0].numpy()[:valid],
                   'detection_classes' : nmsd.nmsed_classes[0].numpy().astype(np.int64)[:valid],
                   'detection_scores' : nmsd.nmsed_scores[0].numpy()[:valid],
                   }
    return output_dict

In [None]:
from object_detection.utils import visualization_utils as vis_util

# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_file, dataset_dir, output_dir):
  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]
  # print(input_details['dtype'])

  category_index = create_category_index(label_path=os.path.join(dataset_dir, 'label.txt'))

  metadata = extract_metadata(dataset_dir)

  for i in range(len(metadata)):
    test_image = cv.imread(os.path.join(dataset_dir, (metadata[i]['file_name']).split('/')[-1]))
    test_image_expanded = np.expand_dims(test_image, axis=0).astype(input_details["dtype"])
    interpreter.set_tensor(input_details["index"], test_image_expanded)
    interpreter.invoke()

    output_details = interpreter.get_output_details()

    # outputs are:
    # - scores
    # - bboxes [top, left, bottom, right] normalized 0-1
    # - n_boxes (shape 1)
    # - classes
    assert len(output_details) == 4

    # make fake output with shape (n_boxes, 6)
    # 6 = [class, score, top, left, bottom, right]
    n_boxes = output_details[0]['shape'][1]

    # validate output shape
    assert tuple(output_details[0]['shape']) == (1, n_boxes)
    assert tuple(output_details[1]['shape']) == (1, n_boxes, 4)
    assert tuple(output_details[2]['shape']) == (1,)
    assert tuple(output_details[3]['shape']) == (1, n_boxes)

    scores, bboxes, _, classes = [np.squeeze(interpreter.get_tensor(td['index'])) for td in output_details]

    # Adding quantization factors for SSD models
    scores = scores*output_details[0]['quantization'][0]
    scale, zeropoint = output_details[1]['quantization']
    bboxes = (bboxes - zeropoint) * scale
    scale, zeropoint = output_details[3]['quantization']
    classes = np.round((classes - zeropoint) * scale)

    output_dict = {
                  'detection_boxes' : bboxes,
                  'detection_classes' : classes,
                  'detection_scores' : scores,
                  'num_detections' : n_boxes
                  }

    output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)

    output_dict = apply_nms(output_dict, num_classes=output_details[3]['shape'][0], iou_thresh=0.6, score_thresh=0)

    # Visualization of the results of a detection.
    vis_util.visualize_boxes_and_labels_on_image_array(
    test_image,
    output_dict['detection_boxes'],
    output_dict['detection_classes'],
    output_dict['detection_scores'],
    category_index,
    max_boxes_to_draw=1,
    use_normalized_coordinates=True,
    min_score_thresh=0,
    line_thickness=3)
    
    filename = output_dir + '/' + ((metadata[i]['file_name']).split('/')[-1]).split('.')[0] + 'out.jpg'
    cv.imwrite(filename, test_image)

In [None]:
# run_tflite_model(tflite_model_quant_file, dataset_dir, output_dir)
# visualize_before_prediction(dataset_dir, extract_metadata(dataset_dir), 16)