# Importing all the required libraries

In [1]:
import os
import cv2
import uuid
import time
import tensorflow as tf
import matplotlib.pyplot as plt
import wget

In [2]:
gpus = tf.config.list_physical_devices('GPU')
if gpus: [tf.config.experimental.set_memory_growth(gpu, True) for gpu in gpus]
print("Num GPUs Available: ", len(gpus))

Num GPUs Available:  1


# Collecting the data

In [3]:
labels = ['zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
num_imgs = 5

In [4]:
img_path = os.path.join('Workspace','Training','Images','Collected_images')

In [5]:
if not os.path.exists(img_path):
    if os.name == 'nt':
        !mkdir {img_path}
for label in labels:
    path = os.path.join(img_path,label)
    if not os.path.exists(path):
        !mkdir {path}

In [None]:
cap = cv2.VideoCapture(0)

for label in labels:
    print('Collecting images for {}'.format(label))
    time.sleep(5)

    for img_num in range(num_imgs):
        print('Collecting images for {}, image num {}'.format(label, img_num))

        ret, frame = cap.read()

        img_name = os.path.join(img_path, label, label + '.' + str(uuid.uuid1()) + '.jpg')
        cv2.imwrite(img_name, frame)
        cv2.imshow('IMG Collection', frame)

        # Wait for user input to continue or break out of inner for loop
        key = cv2.waitKey(0) & 0xFF
        if key == ord('q'):  # Quit the loop entirely
            break
        elif key == ord('c'):  # Continue with the next image
            continue

        time.sleep(3)

    # Wait for user input to continue or break out of outer for loop
    key = cv2.waitKey(0) & 0xFF
    if key == ord('q'):  # Quit the loop entirely
        break
    elif key == ord('c'):  # Continue with the next label
        continue

cap.release()
cv2.destroyAllWindows()

# Annotating the collected images

In [None]:
!pip install labelImg

In [None]:
!labelImg

# Setting up the Model and verifying the installation

In [6]:
model_name = 'my_ssd_mobilenetv2_fpn_640'
pre_trained_model_name = 'ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8'
pre_trained_model_url = 'http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8.tar.gz'
tf_record_script_name = 'generate_tfrecord.py'
label_map_name = 'label_map.pbtxt'

In [7]:
paths = {
    'WORKSPACE_PATH': os.path.join('Workspace'),
    'SCRIPTS_PATH': os.path.join('scripts'),
    'APIMODEL_PATH': os.path.join('models'),
    'ANNOTATION_PATH': os.path.join('Workspace','Training','Annotations'),
    'IMAGE_PATH': os.path.join('Workspace','Training','Images'),
    'MODEL_PATH': os.path.join('Workspace','Training','models'),
    'PRETRAINED_MODEL_PATH': os.path.join('Workspace','Training','pre_trained_models'),
    'CHECKPOINT_PATH': os.path.join('Workspace','Training','models',model_name), 
    'OUTPUT_PATH': os.path.join('Workspace','Training','models',model_name, 'export'), 
    'TFJS_PATH':os.path.join('Workspace','Training','models',model_name, 'tfjsexport'), 
    'TFLITE_PATH':os.path.join('Workspace','Training','models',model_name, 'tfliteexport'), 
 }

In [8]:
files = {
    'PIPELINE_CONFIG':os.path.join('Workspace','Training','models', model_name, 'pipeline.config'),
    'TF_RECORD_SCRIPT': os.path.join(paths['SCRIPTS_PATH'], tf_record_script_name), 
    'LABELMAP': os.path.join(paths['ANNOTATION_PATH'], label_map_name)
}

In [9]:
for path in paths.values():
    if not os.path.exists(path):
        if os.name == 'nt':
            !mkdir {path}

In [None]:
VERIFICATION_SCRIPT = os.path.join(paths['APIMODEL_PATH'], 'research', 'object_detection', 'builders', 'model_builder_tf2_test.py')
# Verify Installation
!python {VERIFICATION_SCRIPT}

In [11]:
import object_detection

In [12]:
wget.download(pre_trained_model_url)
!move {pre_trained_model_name+'.tar.gz'} {paths['PRETRAINED_MODEL_PATH']}
!cd {paths['PRETRAINED_MODEL_PATH']} && tar -zxvf {pre_trained_model_name+'.tar.gz'}

100% [........................................................................] 20518283 / 20518283        1 file(s) moved.


x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/checkpoint/
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/checkpoint/ckpt-0.data-00000-of-00001
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/checkpoint/checkpoint
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/checkpoint/ckpt-0.index
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/pipeline.config
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/saved_model/
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/saved_model/saved_model.pb
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/saved_model/variables/
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/saved_model/variables/variables.data-00000-of-00001
x ssd_mobilenet_v2_fpnlite_640x640_coco17_tpu-8/saved_model/variables/variables.index


# Creating label and record files

In [13]:
labels = [{'name':'zero','id':1},
          {'name':'one','id':2},
          {'name':'two','id':3},
          {'name':'three','id':4},
          {'name':'four','id':5},
          {'name':'five','id':6},
          {'name':'six','id':7},
          {'name':'seven','id':8},
          {'name':'eight','id':9},
          {'name':'nine','id':10}]
with open(files['LABELMAP'],'w') as f:
    for label in labels:
        f.write('item { \n')
        f.write('\tname:\'{}\'\n'.format(label['name']))
        f.write('\tid:{}\n'.format(label['id']))
        f.write('}\n')

In [14]:
len(labels)

10

In [15]:
!python {files['TF_RECORD_SCRIPT']} -x {os.path.join(paths['IMAGE_PATH'], 'train_images')} -l {files['LABELMAP']} -o {os.path.join(paths['ANNOTATION_PATH'], 'train.record')} 
!python {files['TF_RECORD_SCRIPT']} -x {os.path.join(paths['IMAGE_PATH'], 'test_images')} -l {files['LABELMAP']} -o {os.path.join(paths['ANNOTATION_PATH'], 'test.record')}

Successfully created the TFRecord file: Workspace\Training\Annotations\train.record
Successfully created the TFRecord file: Workspace\Training\Annotations\test.record


In [16]:
 !copy {os.path.join(paths['PRETRAINED_MODEL_PATH'], pre_trained_model_name, 'pipeline.config')} {os.path.join(paths['CHECKPOINT_PATH'])}

        1 file(s) copied.


# Configuring the model for the task

In [17]:
import tensorflow as tf
from object_detection.utils import config_util
from object_detection.protos import pipeline_pb2
from google.protobuf import text_format

In [18]:
config = config_util.get_configs_from_pipeline_file(files['PIPELINE_CONFIG'])

In [None]:
config

In [20]:
pipeline_config = pipeline_pb2.TrainEvalPipelineConfig()
with tf.io.gfile.GFile(files['PIPELINE_CONFIG'], "r") as f:                                                                                                                                                                                                                     
    proto_str = f.read()                                                                                                                                                                                                                                          
    text_format.Merge(proto_str, pipeline_config) 

In [21]:
pipeline_config.model.ssd.num_classes = len(labels)
pipeline_config.train_config.batch_size = 4
pipeline_config.train_config.fine_tune_checkpoint = os.path.join(paths['PRETRAINED_MODEL_PATH'], pre_trained_model_name, 'checkpoint', 'ckpt-0')
pipeline_config.train_config.fine_tune_checkpoint_type = "detection"
pipeline_config.train_input_reader.label_map_path= files['LABELMAP']
pipeline_config.train_input_reader.tf_record_input_reader.input_path[:] = [os.path.join(paths['ANNOTATION_PATH'], 'train.record')]
pipeline_config.eval_input_reader[0].label_map_path = files['LABELMAP']
pipeline_config.eval_input_reader[0].tf_record_input_reader.input_path[:] = [os.path.join(paths['ANNOTATION_PATH'], 'test.record')]

In [22]:
config_text = text_format.MessageToString(pipeline_config)                                                                                                                                                                                                        
with tf.io.gfile.GFile(files['PIPELINE_CONFIG'], "wb") as f:                                                                                                                                                                                                                     
    f.write(config_text)  

# Training the model

In [23]:
TRAINING_SCRIPT = os.path.join(paths['APIMODEL_PATH'], 'research', 'object_detection', 'model_main_tf2.py')

In [24]:
command = "python {} --model_dir={} --pipeline_config_path={} --num_train_steps=8000".format(TRAINING_SCRIPT, paths['CHECKPOINT_PATH'],files['PIPELINE_CONFIG'])

In [25]:
print(command) # run the command in the miniconda prompt after activating the environment and changing to the working directory

python models\research\object_detection\model_main_tf2.py --model_dir=Workspace\Training\models\my_ssd_mobilenetv2_fpn_640 --pipeline_config_path=Workspace\Training\models\my_ssd_mobilenetv2_fpn_640\pipeline.config --num_train_steps=8000


# Evaluating the model

In [26]:
command = "python {} --model_dir={} --pipeline_config_path={} --checkpoint_dir={}".format(TRAINING_SCRIPT, paths['CHECKPOINT_PATH'],files['PIPELINE_CONFIG'], paths['CHECKPOINT_PATH'])

In [27]:
print(command)

# run the command in the miniconda prompt after activating the environment and changing to the working directory
# after the completion of the training 

python models\research\object_detection\model_main_tf2.py --model_dir=Workspace\Training\models\my_ssd_mobilenetv2_fpn_640 --pipeline_config_path=Workspace\Training\models\my_ssd_mobilenetv2_fpn_640\pipeline.config --checkpoint_dir=Workspace\Training\models\my_ssd_mobilenetv2_fpn_640


# Real-time detections and saving

In [39]:
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
from object_detection.utils import config_util

In [40]:
# Load pipeline config and build a detection model
configs = config_util.get_configs_from_pipeline_file(files['PIPELINE_CONFIG'])
detection_model = model_builder.build(model_config=configs['model'], is_training=False)

# Restore checkpoint
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(paths['CHECKPOINT_PATH'], 'ckpt-9')).expect_partial()

@tf.function
def detect_fn(image):
    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)
    return detections

In [42]:
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline

category_index = label_map_util.create_category_index_from_labelmap(files['LABELMAP'])

In [55]:
cap = cv2.VideoCapture(0)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('output.avi', fourcc, 20.0, (width, height))

while cap.isOpened(): 
    ret, frame = cap.read()
    image_np = np.array(frame)
    
    input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
    detections = detect_fn(input_tensor)
    
    num_detections = int(detections.pop('num_detections'))
    detections = {key: value[0, :num_detections].numpy()
                  for key, value in detections.items()}
    detections['num_detections'] = num_detections

    # detection_classes should be ints.
    detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

    label_id_offset = 1
    image_np_with_detections = image_np.copy()

    viz_utils.visualize_boxes_and_labels_on_image_array(
                image_np_with_detections,
                detections['detection_boxes'],
                detections['detection_classes']+label_id_offset,
                detections['detection_scores'],
                category_index,
                use_normalized_coordinates=True,
                max_boxes_to_draw=10,
                min_score_thresh=.7,
                agnostic_mode=False)

    # Write the video frame to file
    out.write(image_np_with_detections)

    cv2.imshow('object detection',  cv2.resize(image_np_with_detections, (800, 600)))
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        cap.release()
        out.release()  # Release the video writer
        cv2.destroyAllWindows()
        break


# Resources

for installation and doing project these youtube videos helped me a lot

1. https://www.youtube.com/watch?v=rRwflsS67ow&t=803s&ab_channel=LazyTech

2. https://www.youtube.com/watch?v=yqkISICHH-U&t=11494s&ab_channel=NicholasRenotte

# Further Improvements

1. Collecting more data and doing data agumentation for a robust model

2. Converting it to tensorflow lite version and deploying it on modile device