# Eager Few Shot Object Detection Colab

Welcome to the Eager Few Shot Object Detection Colab --- in this colab we demonstrate fine tuning of a (TF2 friendly) RetinaNet architecture on very few examples of a novel class after initializing from a pre-trained COCO checkpoint.
Training runs in eager mode.

Estimated time to run through this colab (with GPU): < 5 minutes.

## Imports

In [None]:
# git clone --depth 1 https://github.com/tensorflow/models

# %cd  "C:\Users\sky66\Downloads\models\research" (cd to this folder)
# jupyter notebook

# conda install tensorflow=2.5.0=gpu_py39h7dc34a2_0
# conda uninstall tensorflow
# pip install "tensorflow==2.7.0"

# pip uninstall pyparsing -y
# pip install pyparsing==2.4.2

# pip uninstall pyyaml -y
# pip install pyyaml==5.1

# protoc object_detection/protos/*.proto --python_out=.
# python object_detection/packages/tf2/setup.py build
# python object_detection/packages/tf2/setup.py install

# conda list

# References
# https://www.jianshu.com/p/f8ffbf18c312
# https://stackoverflow.com/questions/68737130/error-while-import-keras-attributeerror-module-tensorflow-compat-v2-interna
# https://github.com/tensorflow/tensorflow/issues/53060
# https://medium.com/ching-i/win10-%E5%AE%89%E8%A3%9D-cuda-cudnn-%E6%95%99%E5%AD%B8-c617b3b76deb
# https://github.com/tensorflow/tensorflow/issues/52988
# https://cppsecrets.com/users/17211410511511610510710997106117109100971144964103109971051084699111109/Python-tqdmsetpostfixstr.php
# https://www.delftstack.com/zh-tw/howto/python/python-print-flush/

In [None]:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage

from PIL import Image
from PIL import ImageColor
from PIL import ImageDraw
from PIL import ImageFont
from PIL import ImageOps
from IPython.display import display as ds
import math
import time
from datetime import datetime
import cv2
import shutil
from tqdm import tqdm

import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array, array_to_img

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder

%matplotlib inline

In [None]:
# Turn on memory growth for every visible GPU, then report how many physical
# and logical GPUs TensorFlow sees.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

# Custom Image data

We will start with some toy (literally) data consisting of 6 images of two insects.  Note that the [coco](https://cocodataset.org/#explore) dataset contains a number of animals, but notably, it does *not* contain this data, so this is a novel class.

In [None]:
# The bounding-box CSV is created by Get_BBox.ipynb.
#
# After this cell:
#   info - the header row, split on ","
#   name - one image file name ("<column 0>.jpg") per data row
#   bbox - the matching box as [col 3, col 1, col 4, col 2]; the rest of the
#          notebook unpacks it as (ymin, xmin, ymax, xmax)
max_rows = 4170  # only the first 4170 data rows are used
name = []
bbox = []
with open("open_images_v6_bbox.csv", "r", encoding="utf8") as f:
    # next(f, "") keeps an empty file from raising StopIteration.
    info = next(f, "").strip().split(",")
    for ie, line in enumerate(f, start=1):
        if ie > max_rows:
            break
        e = line.strip().split(",")
        try:
            content = [float(e[i]) for i in (3, 1, 4, 2)]
        except (IndexError, ValueError):
            # As before, reading stops at the first malformed row, but the
            # reason is now reported instead of being silently swallowed.
            print("Stopped reading at malformed CSV row %d: %r" % (ie, line))
            break
        name.append(e[0] + ".jpg")
        bbox.append(content)

In [None]:
# Count how many times the file name changes while walking through `name`
# (i.e. the number of runs of identical consecutive entries) and print it.
run_count = 0
previous = 0
for e in name:
    if e != previous:
        run_count += 1
        previous = e
print(run_count)
del run_count, previous

In [None]:
# Number of entries in `name` (one per CSV row that was loaded).
len(name)

In [None]:
# Directory holding the image files listed in `name`; passed to the data
# generator as `images_path`.
image_path = r"C:\Users\sky66\fiftyone\open-images-v6\validation\data"

In [None]:
# for i in name:
#     if cv2.imread(image_path+"\\"+i).shape != (640, 640, 3):
#         print(i)

In [None]:
# Class index -> display name.
# NOTE(review): not referenced by any other visible cell — confirm it is still needed.
label_map= {0: "Person"}

In [None]:
# Number of object classes; used for the one-hot labels and written into
# model_config.ssd.num_classes when the model is built.
num_classes = 1

In [None]:
# Copied from Train_Unet_Model.ipynb
def _random_contrast_brightness(img):
    """Randomly jitter the contrast and brightness of an image (values 0-255).

    Contrast reference:
    https://www.wongwonggoods.com/python/python_opencv/opencv-modify-contrast/

    Args:
        img: numpy image array with values in the range 0-255.

    Returns:
        The (possibly) modified image, clipped to 0-255. The input is returned
        unchanged when neither adjustment is drawn.
    """
    # randint(1, 101) yields 1..100, so ">= 35" fires in about 66% of calls.
    if np.random.randint(1, 101) >= 35:
        brightness = 0
        contrast = np.random.randint(1, 70)  # magnitude 1..69
        # Negative lowers the contrast, positive raises it. The direction is
        # drawn independently: the previous check (`ctr >= 25` nested inside
        # `ctr >= 35`) was always true, so contrast could only ever decrease.
        if np.random.randint(0, 2):
            contrast = -contrast
        B = brightness / 255.0
        c = contrast / 255.0
        k = math.tan((45 + 44 * c) / 180 * math.pi)
        img = (img - 127.5 * (1 - B)) * k + 127.5 * (1 + B)
        # Every value must stay inside 0..255.
        img = np.clip(img, 0, 255)
    if np.random.randint(1, 101) >= 35:
        # Gamma curve: phi > 1 darkens, phi < 1 brightens; phi is 0.5..1.5.
        phi = np.random.randint(5, 16) / 10
        img = (img / 255) ** phi
        img = np.clip(img * 255, 0, 255)
    return img


def random_image_process_v1(img_np, box, recorded, ids):
    """Augment one training image and transform its box to match.

    The image gets a random channel permutation (via `color`/`case`), one of
    six geometric variants (identity, three rotations, two flips) and a random
    contrast/brightness jitter.

    Args:
        img_np: image array as returned by cv2 (values 0-255). It is passed
            through `color`, which reads channels 0-2.
        box: (ymin, xmin, ymax, xmax). The `1 - value` arithmetic below
            assumes coordinates normalised to 0-1.
        recorded: when falsy a new variant is drawn and stored in the global
            `now_point` as [ids, variant]; when truthy the variant stored in
            `now_point[1]` is reused.
        ids: identifier stored in `now_point[0]` when a new variant is drawn.

    Returns:
        (img, box): the processed image and the transformed box as a float16
        array ordered [ymin, xmin, ymax, xmax].
    """
    global now_point
    ymin, xmin, ymax, xmax = box
    # The rotations are only valid because the training images are square.
    if not recorded:
        ctr = np.random.randint(0, 6)
        now_point = [ids, ctr]
    else:
        ctr = now_point[1]
    img = color(img_np, case[ctr])

    if ctr == 0:
        # Keep the geometry unchanged.
        new_ymin, new_xmin, new_ymax, new_xmax = ymin, xmin, ymax, xmax
    elif ctr == 1:
        # Rotate 90 degrees clockwise.
        img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
        new_ymin, new_xmin, new_ymax, new_xmax = xmin, 1 - ymax, xmax, 1 - ymin
    elif ctr == 2:
        # Rotate 180 degrees.
        img = cv2.rotate(img, cv2.ROTATE_180)
        new_ymin, new_xmin, new_ymax, new_xmax = 1 - ymax, 1 - xmax, 1 - ymin, 1 - xmin
    elif ctr == 3:
        # Rotate 270 degrees clockwise (90 degrees counter-clockwise).
        img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
        new_ymin, new_xmin, new_ymax, new_xmax = 1 - xmax, ymin, 1 - xmin, ymax
    elif ctr == 4:
        # Horizontal flip.
        img = cv2.flip(img, 1)
        new_ymin, new_xmin, new_ymax, new_xmax = ymin, 1 - xmax, ymax, 1 - xmin
    elif ctr == 5:
        # Vertical flip.
        img = cv2.flip(img, 0)
        new_ymin, new_xmin, new_ymax, new_xmax = 1 - ymax, xmin, 1 - ymin, xmax

    img = _random_contrast_brightness(img)
    box = np.array([new_ymin, new_xmin, new_ymax, new_xmax], dtype="float16")
    return img, box


# Copied from Train_Unet_Model.ipynb
def image_process_default(img_np, box):
    """Apply only the random contrast/brightness jitter; the box is unchanged.

    Args:
        img_np: image array with values 0-255.
        box: (ymin, xmin, ymax, xmax).

    Returns:
        (img, box): the processed image and the box as a float16 array
        ordered [ymin, xmin, ymax, xmax].
    """
    ymin, xmin, ymax, xmax = box
    img = _random_contrast_brightness(img_np)
    box = np.array([ymin, xmin, ymax, xmax], dtype="float16")
    return img, box

In [None]:
# test_path = r"C:\Users\sky66\fiftyone\coco-2017\raw\nlp"
# test_id = 0
# test_img = cv2.imread(test_path+"\\"+name[test_id], cv2.IMREAD_GRAYSCALE)
# process_img, process_box = random_image_process(test_img, bbox[test_id])
# draw_test = draw_bounding_box_on_image(process_img, *process_box, display_str_list=["Drawing Test"])
# ds(draw_test)

In [None]:
# Variant index -> channel order applied by color().
case = dict(enumerate(["RGB", "RBG", "BGR", "BRG", "GRB", "GBR"]))
def color(img_np_, mode):
    """Return a copy of a cv2 (BGR) image with its first three channels permuted.

    `mode` names the channel order of the result: R, G and B can be arranged
    in 3! = 6 ways. Any other mode returns an unmodified copy. The input array
    is never written to.
    """
    # For every mode: which input channel (B=0, G=1, R=2) feeds output
    # channels 0, 1 and 2.
    source_channels = {
        "RGB": (2, 1, 0),
        "RBG": (2, 0, 1),
        "BGR": (0, 1, 2),
        "BRG": (0, 2, 1),
        "GRB": (1, 2, 0),
        "GBR": (1, 0, 2),
    }
    shuffled = img_np_.copy()
    for dst, src in enumerate(source_channels.get(mode, ())):
        shuffled[:, :, dst:dst + 1] = img_np_[:, :, src:src + 1]
    return shuffled

In [None]:
def on_train(point, batch_size, total):
    """Return `batch_size` consecutive indices starting at `point`.

    `total` is the largest valid index; indices that would run past it wrap
    around and continue from 0.
    """
    last = point + batch_size - 1  # the start point itself is included
    if last <= total:
        return list(range(point, last + 1))
    overflow = last - total
    return list(range(point, total + 1)) + list(range(overflow))

def flag(point, batch_size, data_size):
    """Return the indices of the next batch and the start point after it.

    The indices come from on_train(); the new start point wraps back by
    `data_size` once it reaches the end of the data.
    """
    batch_indices = on_train(point, batch_size, data_size - 1)
    next_point = point + batch_size
    if next_point >= data_size:
        next_point -= data_size
    return batch_indices, next_point

img_size = [640, 640, 1]
def data_generator(images_path, image_ids, bbox, batch_size, image_process):
    """Yield batches of augmented images and boxes forever.

    The read position is kept in the global `train_point` and advanced by
    flag() for every batch, wrapping around at the end of the data.

    Args:
        images_path: directory containing the image files.
        image_ids: image file names, one per box.
        bbox: boxes aligned with `image_ids`.
        batch_size: number of samples per yielded batch.
        image_process: callable (img, box, recorded, img_id) -> (img, box).
            `recorded` is True when `img_id` equals the global `now_point[0]`,
            so the callable can reuse its previous choice for that image.

    Yields:
        (imgs, boxes): two lists of float32 tensors, each tensor carrying a
        leading axis of size 1.

    Raises:
        FileNotFoundError: if cv2.imread cannot read an image file.
    """
    global train_point

    data_size = len(image_ids)
    while True:
        rnd_ind, train_point = flag(train_point, batch_size, data_size)
        imgs = []
        boxes = []
        for i in rnd_ind:
            img_id = image_ids[i]
            # os.path.join instead of a hard-coded "\\" keeps this portable.
            img_file = os.path.join(images_path, img_id)
            img = cv2.imread(img_file)
            if img is None:
                # cv2.imread signals failure by returning None; fail here with
                # a clear message rather than later inside image_process.
                raise FileNotFoundError("cv2.imread could not read image: %s" % img_file)
            same_image = img_id == now_point[0]
            img, box = image_process(img, bbox[i], same_image, img_id)
            imgs.append(tf.expand_dims(tf.convert_to_tensor(img, dtype=tf.float32), axis=0))
            boxes.append(tf.expand_dims(tf.convert_to_tensor(box, dtype=tf.float32), axis=0))
        yield imgs, boxes
        
        
def defalut_data_generator(images_path, image_ids, bbox, batch_size, image_process):
    """Yield batches of grayscale-loaded images and boxes forever.

    Each image is loaded as grayscale at `img_size[:-1]`, passed through
    `image_process(img, box)` and then expanded to three channels. The read
    position is kept in the global `train_point` and advanced by flag().

    Args:
        images_path: directory containing the image files.
        image_ids: image file names, one per box.
        bbox: boxes aligned with `image_ids`.
        batch_size: number of samples per yielded batch.
        image_process: callable (img, box) -> (img, box).

    Yields:
        (imgs, boxes): two lists of float32 tensors, each tensor carrying a
        leading axis of size 1.
    """
    global train_point

    data_size = len(image_ids)
    while True:
        rnd_ind, train_point = flag(train_point, batch_size, data_size)
        imgs = []
        boxes = []
        for i in rnd_ind:
            # os.path.join instead of a hard-coded "\\" keeps this portable.
            img_file = os.path.join(images_path, image_ids[i])
            img = load_img(img_file, target_size=img_size[:-1], color_mode = 'grayscale')
            img = img_to_array(img)
            img, box = image_process(img, bbox[i])
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
            imgs.append(tf.expand_dims(tf.convert_to_tensor(img, dtype=tf.float32), axis=0))
            boxes.append(tf.expand_dims(tf.convert_to_tensor(box, dtype=tf.float32), axis=0))
        yield imgs, boxes


# Correctly spelled alias; the original (misspelled) name is kept so existing
# callers keep working.
default_data_generator = defalut_data_generator

In [None]:
# train_point = 0
# val_point = 0
# now_point = ["file jpg", 0]

In [None]:
# s = data_generator(image_path, name, bbox, 5, random_image_process_v1)

In [None]:
# i, b = next(s)
# plt.imshow(i[0].numpy()[0]/255)

In [None]:
# b

In [None]:
# i

# Annotate images with bounding boxes




# Prepare data for training

Below we add the class annotations (for simplicity, we assume a single class in this colab; though it should be straightforward to extend this to handle multiple classes).  We also convert everything to the format that the training
loop below expects (e.g., everything converted to tensors, classes converted to one-hot representations, etc.).

In [None]:
# Every box belongs to class 0; build its one-hot label of length
# num_classes and add a leading axis of size 1.
zero_indexed_groundtruth_classes = tf.convert_to_tensor(0)
gt_classes = tf.expand_dims(tf.one_hot(zero_indexed_groundtruth_classes, num_classes), axis=0)

In [None]:
# gt_classes

# Create model and restore weights for all but last layer

In this cell we build a single stage detection architecture (RetinaNet) and restore all but the classification layer at the top (which will be automatically randomly initialized).

For simplicity, we have hardcoded a number of things in this colab for the specific RetinaNet architecture at hand (including assuming that the image size will always be 640x640), however it is not difficult to generalize to other model configurations.

In [None]:
# Download the checkpoint and put it into models/research/object_detection/test_data/
# 
# !wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
# !tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
# !mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

In [None]:
tf.keras.backend.clear_session()
# num_classes = 1
# print('Building model and restoring weights for fine-tuning...', flush=True)
# pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
# checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# pipeline_config = r"C:\Users\sky66\Downloads\models\research\ckpt\ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config"
pipeline_config = r"C:\Users\sky66\Downloads\models\research\ckpt\pipeline.config"
checkpoint_path = r"C:\Users\sky66\Downloads\models\research\ckpt"

# Load pipeline config and build a detection model.
#
# Since we are working off of a COCO architecture which predicts 90
# class slots by default, we override the `num_classes` field here with the
# notebook's own `num_classes`.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
# model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
      model_config=model_config, is_training=True)

# Set up object-based checkpoint restore --- RetinaNet has two prediction
# `heads` --- one for classification, the other for box regression.
#
# The `_prediction_heads=...` line below decides whether the classification
# head is restored as well:
#   * On the very first training run the checkpoint being loaded was trained
#     with a different num_classes, so that line has to be commented out and
#     the classification head starts from scratch.
#   * On later runs the checkpoint saved by this notebook is loaded, so the
#     line stays active and both heads are restored.
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    _prediction_heads=detection_model._box_predictor._prediction_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )

fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt_manager = tf.train.CheckpointManager(ckpt, directory=checkpoint_path, max_to_keep=5)

# latest_checkpoint is None when the directory holds no checkpoint; restoring
# None silently does nothing, so report that case instead of claiming success.
latest_checkpoint = ckpt_manager.latest_checkpoint
if latest_checkpoint is not None:
    # Only part of the saved model is mapped here, hence expect_partial().
    ckpt.restore(latest_checkpoint).expect_partial()
# ckpt.restore(checkpoint_path).expect_partial()

# Run model through a dummy image so that variables are created
image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
if latest_checkpoint is not None:
    print('Weights restored!')
else:
    print('WARNING: no checkpoint found in %s; the model keeps its initial weights.' % checkpoint_path)

In [None]:
# pipeline_config = r"C:\Users\sky66\Downloads\models\research\my_model\ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config"
# checkpoint_path = r"C:\Users\sky66\Downloads\models\research\my_model\new_model"

# Custom training loop



In [None]:
tf.keras.backend.set_learning_phase(True)

# Pick the variables to fine-tune: only those whose name starts with one of
# the two prediction-head prefixes.
trainable_variables = detection_model.trainable_variables
prefixes_to_train = [
    'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
    'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
to_fine_tune = [
    var for var in trainable_variables
    if var.name.startswith(tuple(prefixes_to_train))
]

# Set up forward + backward pass for a single train step.
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
    """Build the tf.function that performs one training step.

    Args:
        model: detection model exposing provide_groundtruth, preprocess,
            predict and loss.
        optimizer: optimizer whose apply_gradients updates the variables.
        vars_to_fine_tune: the variables that receive gradient updates.

    Returns:
        train_step_fn(image_tensors, groundtruth_boxes_list,
        groundtruth_classes_list) -> scalar total loss.
    """

    # Use tf.function for a bit of speed.
    # Comment out the tf.function decorator if you want the inside of the
    # function to run eagerly.
    @tf.function
    def train_step_fn(image_tensors,
                      groundtruth_boxes_list,
                      groundtruth_classes_list):
        """A single training iteration.

        Args:
            image_tensors: A list of [1, height, width, 3] Tensor of type tf.float32.
              Note that the height and width can vary across images, as they are
              reshaped within this function to be 640x640.
            groundtruth_boxes_list: A list of Tensors of shape [N_i, 4] with type
              tf.float32 representing groundtruth boxes for each image in the batch.
            groundtruth_classes_list: A list of Tensors of shape [N_i, num_classes]
              with type tf.float32 representing groundtruth classes for each image
              in the batch.

        Returns:
            A scalar tensor representing the total loss for the input batch.
        """
        # One shape row per image actually passed in, instead of relying on
        # a global `batch_size` that might not match the batch.
        shapes = tf.constant(len(image_tensors) * [[640, 640, 3]], dtype=tf.int32)
        model.provide_groundtruth(
            groundtruth_boxes_list=groundtruth_boxes_list,
            groundtruth_classes_list=groundtruth_classes_list)
        with tf.GradientTape() as tape:
            # Use the `model` argument (not the global detection_model) so the
            # step always preprocesses with the model it trains.
            preprocessed_images = tf.concat(
                [model.preprocess(image_tensor)[0] for image_tensor in image_tensors],
                axis=0)
            prediction_dict = model.predict(preprocessed_images, shapes)
            losses_dict = model.loss(prediction_dict, shapes)
            total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']
        # Gradients are taken after leaving the tape context so the tape does
        # not record the gradient and update ops themselves.
        gradients = tape.gradient(total_loss, vars_to_fine_tune)
        optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
        return total_loss

    return train_step_fn

In [None]:
def custom_train(batch_size, train):
    """Run one training step on the next batch and update the loss counters.

    Pulls a batch from the `train` generator, runs the global `train_step_fn`
    with the global `gt_classes_list`, then bumps the global step counter
    `now_loss` and adds the step loss to `now_loss_total`. `batch_size` is
    accepted but not used here.
    """
    global now_loss, now_loss_total
    batch_images, batch_boxes = next(train)
    step_loss = train_step_fn(batch_images, batch_boxes, gt_classes_list)
    now_loss = now_loss + 1
    now_loss_total = now_loss_total + float(step_loss.numpy())

In [None]:
# Scratch cell: shows 1e-02 written out with ten decimal places.
"%.10f"%(1e-02)

# Start fine-tuning !

# Train on a Random Subset of the Data (Images: 5)

In [None]:
# # https://developers.google.com/machine-learning/crash-course/reducing-loss/learning-rate
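# # Start here with a small amount of data for a quick test: observe how different learning rates affect accuracy and how fast the model improves.
# # Learning rate (adjusted by hand)
# # If you ran "Train ALL Data" first and came here to refine the model, the learning rate here should be lower than the one used in "Train ALL Data".
# # You can experiment here first, look at the Total loss, and then move on to "Train ALL Data".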
# 
# learning_rate = 1e-3
# optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
# train_step_fn = get_model_train_step_function(
#     detection_model, optimizer, to_fine_tune)

In [None]:
# # 隨機抽取出 count_train_images 張圖片，待會用來訓練及觀察。
# 
# batch_size = 5
# count_train_images = 25
# total_size = round(count_train_images/batch_size)
# now_loss = 0
# now_loss_total = 0
# train_point = 0
# gt_classes_list = [gt_classes for i in range(batch_size)]
# epochs = 100
# history = []
# 
# part_name = []
# part_bbox = []
# for i in range(count_train_images):
#     part_rnd_int = np.random.randint(0,len(name))
#     part_name.append(name[part_rnd_int])
#     part_bbox.append(bbox[part_rnd_int])
#     
# part_train = data_generator(image_path, part_name, part_bbox, batch_size)    

In [None]:
# try:
#     for epoch in range(1,epochs+1):
#         now_loss = 0
#         now_loss_total = 0
#         pbar = tqdm(total = total_size, ncols = 100)
#         for one in range(total_size):
#             custom_train(batch_size, part_train)
#             now_loss_value = now_loss_total/now_loss
#             pbar.set_postfix_str("epoch %d, Total loss = %.7f"%(epoch, now_loss_value))
#             pbar.update(1)
#         pbar.close() 
#         history.append(now_loss_value)
# except:  
#     pbar.close()
# print("Finish")    

In [None]:
# text = "optimizer : %s \nlearning_rate : %s  batch_size : %s  count_train_images : %s"%(optimizer, learning_rate, batch_size, count_train_images)
# plt.plot([i for i in range(1,len(history)+1)], history)
# plt.title(text)
# plt.xlabel("Epochs")
# plt.ylabel("Total Loss")
# plt.show()
# plt.clf()

# Train on All Data

In [None]:
# Training history: "Date" collects timestamp strings and "Loss" the mean
# loss of each epoch.
history = {"Date":[], "Loss":[]}

In [None]:
# Load the loss history written by earlier runs. Each line has the form
# "<date> Total Loss#<loss>[,<loss>...]".
try:
    with open("ckpt/history.txt", "r", encoding="utf8") as txt:
        for e in txt:
            if not e.strip():
                continue  # tolerate blank lines
            content = e.split("#")
            history["Loss"].extend([float(y) for y in content[1].strip().split(",")])
            history["Date"].append(content[0].split(" Total Loss")[0])
except FileNotFoundError:
    # First run: there is no history file yet, so start from an empty history.
    print("ckpt/history.txt not found; starting with an empty history.")

In [None]:
# Scratch calculation: 1e-3 divided by (1561 / 5).
# NOTE(review): presumably a learning-rate estimate scaled by the number of
# batches (1561 images at batch size 5) — confirm.
(1e-3)/(1561/5)

In [None]:
# learning_rate = 3e-06
# optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
# train_step_fn = get_model_train_step_function(
#     detection_model, optimizer, to_fine_tune)

In [None]:
# learning_rate = 1e-06
# optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)
# train_step_fn = get_model_train_step_function(
#     detection_model, optimizer, to_fine_tune)

In [None]:
# Learning rate (adjusted by hand).
# (1e-3)/(4170/5): 1e-3 was the best Adam learning rate found for
# batch size 5 with 5 images.
learning_rate = 5e-06
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
# Build the training step for the selected variables with this optimizer.
train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

In [None]:
# Global state shared with the data generators and custom_train().
train_point = 0  # index of the next sample the generator will read
val_point = 0
now_point = ["file jpg", 0]  # [image id, augmentation variant] recorded last
batch_size = 5
total_size = round(len(name)/batch_size)  # training steps per epoch
now_loss = 0  # number of steps accumulated into now_loss_total
now_loss_total = 0  # running sum of step losses
epochs = 20
train = data_generator(image_path, name, bbox, batch_size, random_image_process_v1)
# train = default_data_generator(image_path, name, bbox, batch_size, image_process_default)
# The same one-hot class tensor is used for every image in a batch.
gt_classes_list = [gt_classes for i in range(batch_size)]

In [None]:
# Train for `epochs` epochs of `total_size` steps each. After every epoch the
# mean loss and a timestamp are appended to `history`.
#
# Interrupting the kernel stops training cleanly. Any other error is no
# longer swallowed: it propagates so that a failed run is not mistaken for a
# finished one. The progress bar is closed by its `with` block in every case.
try:
    for epoch in range(1, epochs+1):
        now_loss = 0
        now_loss_total = 0
        with tqdm(total=total_size, ncols=100, ascii=' =') as pbar:
            for _ in range(total_size):
                custom_train(batch_size, train)
                now_loss_value = now_loss_total/now_loss
                pbar.set_postfix_str("epoch %d, Total loss = %.7f"%(epoch, now_loss_value))
                pbar.update(1)
        history["Loss"].append(now_loss_value)
        history["Date"].append(str(datetime.now()).split(".")[0]+",")
except KeyboardInterrupt:
    print("Training interrupted; the unfinished epoch was not added to history.")
print("Finish")

In [None]:
# Free-text notes on the learning rates and optimizers tried so far; they are
# only shown in the title of the loss plot.
learning_rate_ = "1e-03, 3e-04, 3e-05, 3e-06"
optimizer_ = "SGD、Adam、RMSprop"

In [None]:
# Plot the recorded loss history (x axis starts at 1), with the run settings
# in the title.
text = "Date : %s \noptimizer : %s \nlearning_rate : %s  batch_size : %s  images : %s"%(
    str(datetime.now()), optimizer_, learning_rate_, batch_size, len(name))
plt.plot(list(range(1, len(history["Loss"]) + 1)), history["Loss"])
plt.title(text)
plt.xlabel("Epochs")
plt.ylabel("Total Loss")
plt.show()
plt.clf()

In [None]:
# Write the history back to disk, one "<date> Total Loss#<loss>" line per
# (date, loss) pair.
with open("ckpt/history.txt", "w", encoding="utf8") as txt:
    for e, r in zip(history["Date"], history["Loss"]):
        txt.write("%s Total Loss#%s\n" % (str(e), str(r)))

In [None]:
# https://github.com/tensorflow/models/issues/8862#issuecomment-920330306
# Save the pipeline config into "ckpt" and then a checkpoint of the whole
# detection model into the same directory (keeping at most 5 checkpoints).
detection_model
new_pipeline_proto = config_util.create_pipeline_proto_from_configs(configs)
config_util.save_pipeline_config(new_pipeline_proto, 'ckpt')

exported_ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt_manager = tf.train.CheckpointManager(exported_ckpt, directory="ckpt", max_to_keep=5)
print('Done fine-tuning!')

ckpt_manager.save()
print('Checkpoint saved!')

In [None]:
# Scratch cell: prints the completion message again.
print('Done fine-tuning!')

In [None]:
# Scratch cell: echoes the path of the pipeline config.
r"C:\Users\sky66\Downloads\models\research\ckpt\pipeline.config"

In [None]:
# https://github.com/tensorflow/models/blob/master/research/object_detection/exporter_main_v2.py
# python C:\Users\sky66\Downloads\models\research\object_detection\exporter_main_v2.py --pipeline_config_path C:\Users\sky66\Downloads\models\research\ckpt\pipeline.config --trained_checkpoint_dir C:\Users\sky66\Downloads\models\research\ckpt --output_directory new_model

# Load test images and run inference with new model!

In [None]:
# Again, comment out this decorator if you want to run inference eagerly.
# The input signature restricts calls to float32 batches of shape
# [batch, 640, 640, 3].
@tf.function(input_signature=[tf.TensorSpec(shape=[None,640,640,3], dtype=tf.float32)])
def detect(input_tensor):
    """Run detection on a batch of input images.
    
    Args:
      input_tensor: A [batch, 640, 640, 3] Tensor of type tf.float32, as
        required by the `input_signature` on the decorator.
    
    Returns:
      The dict returned by `detection_model.postprocess`; the inference cells
        read its `detection_boxes`, `detection_classes` and
        `detection_scores` entries.
    """
    preprocessed_image, shapes = detection_model.preprocess(input_tensor)
    prediction_dict = detection_model.predict(preprocessed_image, shapes)
    return detection_model.postprocess(prediction_dict, shapes)

# Note that the first frame will trigger tracing of the tf.function, which will
# take some time, after which inference should be fast.    

def img_to_tensor(img_np):
    """Convert an image array to a float32 tensor with a leading axis of size 1."""
    return tf.expand_dims(tf.convert_to_tensor(img_np, dtype=tf.float32), axis=0)

def img_proccess(img):
    """Open an image and prepare both a full-size copy and a detector input.

    Args:
        img: anything accepted by PIL's Image.open.

    Returns:
        A dict with 'origin_img_np' (float64 array copy of the image at its
        original size) and 'detection_img_tensor' (the image resized to
        640x640 and converted by img_to_tensor).
    """
    pil_image = Image.open(img)
    resized_np = np.array(pil_image.resize((640,640)))
    full_size_np = np.array(pil_image)
    return {
        'origin_img_np': full_size_np.astype(np.float64),
        'detection_img_tensor': img_to_tensor(resized_np),
    }

def draw_bounding_box_on_image(image,
                                 ymin,
                                 xmin,
                                 ymax,
                                 xmax,
                                 color = list(ImageColor.colormap.values())[0],
                                 font = ImageFont.load_default(),
                                 thickness=4,
                                 display_str_list=()):
    """Adds a bounding box (and optional label strings) to an image.

    Args:
        image: RGB image with values 0-255, as a numpy array or PIL image.
        ymin, xmin, ymax, xmax: box corners as fractions of the image height
            and width (they are multiplied by the image size below).
        color: colour of the outline and of the label background.
        font: PIL font used for the labels.
        thickness: outline width in pixels.
        display_str_list: label strings drawn at the top-left box corner.

    Returns:
        A new PIL RGB image with the box and labels drawn on it.
    """
    def text_size(text):
        # font.getsize was removed in Pillow 10; keep using it where it still
        # exists and fall back to getbbox otherwise.
        if hasattr(font, "getsize"):
            return font.getsize(text)
        text_box = font.getbbox(text)
        return text_box[2], text_box[3]

    image = Image.fromarray(np.uint8(image)).convert("RGB")
    draw = ImageDraw.Draw(image)
    im_width, im_height = image.size
    (left, right, top, bottom) = (xmin * im_width, xmax * im_width,
                                  ymin * im_height, ymax * im_height)
    draw.line([(left, top), (left, bottom), (right, bottom), (right, top),
               (left, top)],
              width=thickness,
              fill=color)

    # If the total height of the display strings added to the top of the bounding
    # box exceeds the top of the image, stack the strings below the bounding box
    # instead of above.
    display_str_heights = [text_size(label)[1] for label in display_str_list]
    # Each display_str has a top and bottom margin of 0.05x.
    total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)

    if top > total_display_str_height:
        text_bottom = top
    else:
        text_bottom = top + total_display_str_height
    # Reverse list and print from bottom to top.
    for display_str in display_str_list[::-1]:
        text_width, text_height = text_size(display_str)
        margin = np.ceil(0.05 * text_height)
        draw.rectangle([(left, text_bottom - text_height - 2 * margin),
                        (left + text_width, text_bottom)],
                       fill=color)
        draw.text((left + margin, text_bottom - text_height - margin),
                  display_str,
                  fill="black",
                  font=font)
        # Move up by the full height of the label just drawn (text plus both
        # margins) so stacked labels do not overlap.
        text_bottom -= text_height + 2 * margin
    return image  
  
  
def draw_boxes_s(
         image, 
         class_names, 
         boxes,
         scores,
         score_limit,
         max_box,
         display = True):
    """Draw the highest-scoring detections on a copy of `image`.

    Args:
        image: RGB image as a numpy array with values 0-255.
        class_names: accepted for interface compatibility; every box is
            labelled "Person", so the values are not used.
        boxes: numpy array of boxes, each ordered (ymin, xmin, ymax, xmax).
        scores: numpy array of scores aligned with `boxes`.
        score_limit: only boxes scoring strictly above this are drawn.
        max_box: at most this many of the best-scoring boxes are considered.
        display: when true, show the result via `ds` (IPython display).

    Returns:
        A float array with the same shape as `image` holding the annotated
        picture.
    """
    now_image_np = np.zeros((image.shape))
    np.copyto(now_image_np , image)

    boxes = boxes.tolist()
    scores = scores.tolist()

    # Rank the boxes by score, best first. sorted() is stable, so equal
    # scores keep their original order, as the old repeated-max loop did.
    keep = max(0, min(max_box, len(boxes)))
    ranked = sorted(range(len(boxes)), key=lambda idx: scores[idx], reverse=True)[:keep]

    box_color = list(ImageColor.colormap.values())[5]
    for idx in ranked:
        score = scores[idx]
        if float(score) > score_limit:
            image_pil = Image.fromarray(np.uint8(now_image_np)).convert("RGB")
            image_pil = draw_bounding_box_on_image(
                image_pil,
                *boxes[idx],
                box_color,
                ImageFont.load_default(),
                display_str_list=["Person"+":"+str(score)])
            np.copyto(now_image_np, np.array(image_pil) )
    if display:
        ds( Image.fromarray(np.uint8( np.array(now_image_np) )).convert("RGB") )    
    return now_image_np

In [None]:
# test_path = r"C:\Users\sky66\fiftyone\coco-2017\raw\nlp"
# test_id = 0
# test_img = cv2.imread(test_path+"\\"+name[test_id], cv2.IMREAD_GRAYSCALE)
# process_img, process_box = random_image_process(test_img, bbox[test_id])
# draw_test = draw_bounding_box_on_image(process_img, *process_box, display_str_list=["Drawing Test"])
# ds(draw_test)

In [None]:
# tf.keras.backend.clear_session()

In [None]:
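# # ymin, xmin, ymax, xmax range from 0 to 1 (0%~100%)
# # x coordinates are measured from the left edge of the image
# # y coordinates are measured from the top edge of the image
# # (ymin, xmin) is the top-left corner
# # (ymax, xmax) is the bottom-right corner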

# test_img = r"C:\Users\sky66\fiftyone\coco-2017\raw\train2017\000000000436.jpg"
# origin_img = cv2.cvtColor(cv2.imread(test_img), cv2.COLOR_BGR2RGB)
#                           ymin, xmin, ymax, xmax
# ymin, xmin, ymax, xmax = [0.02, 0.02,  0.7, 0.9]
# draw_test = draw_bounding_box_on_image(origin_img, ymin, xmin, ymax, xmax , display_str_list=["Drawing Test"])
# ds(draw_test)

In [None]:
# Run the detector on aa.png: the model input is the grayscale image resized
# to 640x640 and expanded to three channels; boxes are drawn on the original.
test_img = r"aa.png"
predict_img = img_to_array(load_img(test_img, color_mode = 'grayscale'))
predict_img = cv2.resize(predict_img, (640,640))
predict_img = cv2.cvtColor(predict_img, cv2.COLOR_GRAY2RGB)
predict_img = tf.expand_dims(tf.convert_to_tensor(predict_img, dtype=tf.float32), axis=0)
origin_img = cv2.cvtColor(cv2.imread(test_img), cv2.COLOR_BGR2RGB)
detection_result = detect(predict_img)
min_predict_score = 0.2
max_view_box = 3
predict_result = draw_boxes_s(
    origin_img,
    detection_result['detection_classes'][0].numpy(),
    detection_result['detection_boxes'][0].numpy(),
    detection_result["detection_scores"][0].numpy(),
    min_predict_score,
    max_view_box)

In [None]:
# Same pipeline for a second test image: grayscale -> 640x640 -> 3 channels
# for the model, plus an RGB copy of the original for drawing.
test_img = r"C:\Users\sky66\Downloads\tumblr_464a8a1770ac6b2c09c80232911673b7_0306a7f3_1280.jpg"
predict_img = cv2.imread(test_img, cv2.IMREAD_GRAYSCALE)
predict_img = cv2.resize(predict_img, (640,640))
predict_img = cv2.cvtColor(predict_img, cv2.COLOR_GRAY2RGB)
predict_img = tf.expand_dims(tf.convert_to_tensor(predict_img, dtype=tf.float32), axis=0)
origin_img = cv2.cvtColor(cv2.imread(test_img), cv2.COLOR_BGR2RGB)
detection_result = detect(predict_img)
min_predict_score = 0.2
max_view_box = 3

In [None]:
# Draw up to max_view_box of the best detections scoring above
# min_predict_score on the original image (first item of the batch).
predict_result = draw_boxes_s(
                     origin_img,
                     detection_result['detection_classes'][0].numpy(),
                     detection_result['detection_boxes'][0].numpy(),
                     detection_result["detection_scores"][0].numpy(),
                     min_predict_score,
                     max_view_box)

In [None]:
# tf.saved_model.save(
#     detection_model , 'detection_model',
#     signatures={
#       'detection': detect.get_concrete_function()
#     })

In [None]:
# new_model = tf.saved_model.load('detection_model')

In [None]:
# new_detection_model = new_model.signatures['detection']