In [None]:
import os
import cv2
import xml.etree.ElementTree as ET
import numpy as np
import tensorflow as tf
from object_detection.utils import dataset_util
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util


In [None]:
# --- Filesystem locations (placeholders: point these at real paths before running) ---
IMAGE_DIR = 'path/to/images'
ANNOTATION_DIR = 'path/to/annotations'
LABEL_MAP_FILE = 'path/to/label_map.pbtxt'
OUTPUT_DIR = 'path/to/output'
# The following names are read by the training and inference cells further down
# but were never assigned anywhere in the notebook, which made a fresh
# "Restart & Run All" stop with NameError.
NEW_IMAGE_DIR = 'path/to/new_images'    # images to run inference on
CHECKPOINT_DIR = 'path/to/checkpoints'  # ModelCheckpoint output directory
LOG_DIR = 'path/to/logs'                # TensorBoard log directory

# --- Training / inference hyper-parameters (starting points, tune as needed) ---
# Batch size 1 because the pipeline yields variable-sized images and box lists,
# which a plain `Dataset.batch` cannot stack into one tensor.
BATCH_SIZE = 1
LEARNING_RATE = 0.001        # same rate the first compile call uses for SGD
EPOCHS = 10
PATIENCE = 3                 # epochs without improvement before early stopping
CONFIDENCE_THRESHOLD = 0.5   # same value as the commented-out alternate cell


In [None]:
# Upper bound on how many categories are taken from the label map.
NUM_CLASSES = 2

# Read the label-map file, convert it to a list of category entries (capped at
# NUM_CLASSES, using display names), then index those entries for lookup.
# Per the Object Detection API docs the resulting index is keyed by the
# integer category id.
label_map = label_map_util.load_labelmap(LABEL_MAP_FILE)
categories = label_map_util.convert_label_map_to_categories(
    label_map,
    max_num_classes=NUM_CLASSES,
    use_display_name=True,
)
category_index = label_map_util.create_category_index(categories)
def create_model(num_classes):
    """Wire a detection network on top of an ImageNet-initialised InceptionResNetV2.

    The backbone's feature map feeds a 3x3 convolution (``rpn_conv``) with two
    1x1 branches (``rpn_class``, ``rpn_bbox``); their outputs go through a
    proposal layer and an ROI-align layer, and the pooled regions are flattened
    into a 1024-unit dense layer that drives the two output heads.

    Args:
        num_classes: Number of units in the ``class_logits`` head; the
            ``bboxes`` head has ``num_classes * 4`` units.

    Returns:
        A ``tf.keras.Model`` taking the backbone's input and producing
        ``[class_logits, bboxes]``.
    """
    layers = tf.keras.layers

    backbone = tf.keras.applications.InceptionResNetV2(include_top=False, weights='imagenet')
    features = backbone.output

    # Shared convolution followed by the two per-location branches.
    rpn_shared = layers.Conv2D(512, (3, 3), activation='relu', padding='same', name='rpn_conv')(features)
    rpn_scores = layers.Conv2D(2, (1, 1), activation='softmax', name='rpn_class')(rpn_shared)
    rpn_deltas = layers.Conv2D(4, (1, 1), activation='linear', name='rpn_bbox')(rpn_shared)

    # NOTE(review): `ProposalLayer` and `ROIAlign` do not appear in the public
    # `tf.keras.layers` API as far as I can tell, so these two lookups are
    # expected to raise AttributeError. Confirm which custom implementations
    # were intended and import them explicitly.
    region_proposals = layers.ProposalLayer()([rpn_scores, rpn_deltas])
    pooled_regions = layers.ROIAlign((7, 7), 1)([region_proposals, features])

    flattened = layers.Flatten()(pooled_regions)
    hidden = layers.Dense(1024, activation='relu')(flattened)

    # Output heads: raw class scores plus four box values per class.
    class_head = layers.Dense(num_classes, name='class_logits')(hidden)
    box_head = layers.Dense(num_classes * 4, activation='linear', name='bboxes')(hidden)

    return tf.keras.Model(inputs=backbone.input, outputs=[class_head, box_head])


In [None]:
# Build the detector and configure training. The two losses are matched by
# position to the model outputs [class_logits, bboxes]: cross-entropy on the raw
# class logits and Huber on the box regression.
model = create_model(NUM_CLASSES)
model.compile(
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
    # newer Keras releases reject.
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.001, momentum=0.9),
    loss=[
        tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        tf.keras.losses.Huber(),
    ],
    # NOTE(review): a flat metrics list is applied to every output, so this
    # accuracy is also computed for `bboxes` - consider per-output metrics.
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)
def load_image(image_path):
    """Read an image from disk and return it in RGB channel order.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The image array produced by ``cv2.imread`` after BGR -> RGB conversion.

    Raises:
        FileNotFoundError: If OpenCV could not read ``image_path``
            (``cv2.imread`` signals this by returning ``None``).
    """
    image = cv2.imread(image_path)
    if image is None:
        # Fail here with the offending path instead of letting cvtColor raise
        # an opaque OpenCV assertion on a None input.
        raise FileNotFoundError('Could not read image: {}'.format(image_path))
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

def load_annotations(annotation_path):
    """Parse a Pascal VOC-style XML annotation file into boxes and labels.

    Args:
        annotation_path: Path to an XML file whose root holds ``object``
            elements, each with a ``name`` and a ``bndbox`` containing
            ``xmin``, ``ymin``, ``xmax`` and ``ymax``.

    Returns:
        A ``(bboxes, labels)`` tuple of NumPy arrays. ``bboxes`` has shape
        ``(num_objects, 4)`` with rows ordered ``[ymin, xmin, ymax, xmax]``;
        ``labels`` has shape ``(num_objects,)`` and holds the class names.
        A file without any ``object`` yields shapes ``(0, 4)`` and ``(0,)``.
    """
    root = ET.parse(annotation_path).getroot()

    def _coordinate(bndbox, tag):
        # Go through float so values written as "12.0" parse as well; the
        # fractional part is truncated.
        return int(float(bndbox.find(tag).text))

    bboxes = []
    labels = []
    for obj in root.findall('object'):
        bndbox = obj.find('bndbox')
        bboxes.append([_coordinate(bndbox, tag) for tag in ('ymin', 'xmin', 'ymax', 'xmax')])
        labels.append(obj.find('name').text)

    # reshape(-1, 4) keeps the box array two-dimensional even when it is empty,
    # so callers can always slice columns such as bboxes[:, 1].
    return np.array(bboxes, dtype=int).reshape(-1, 4), np.array(labels, dtype=str)



In [None]:

def create_tfrecord(output_file, image_dir, annotation_dir, label_map, category_index):
    """Write every image in ``image_dir`` and its annotation to one TFRecord file.

    For each file in ``image_dir`` the XML annotation with the same base name
    is read from ``annotation_dir``; the image is re-encoded as JPEG and stored
    with its box coordinates (divided by image width/height), class names and
    class ids in a ``tf.train.Example``.

    Args:
        output_file: Path of the TFRecord file to create. Its parent directory
            is created if it does not exist.
        image_dir: Directory containing the images.
        annotation_dir: Directory containing ``<image base name>.xml`` files.
        label_map: Unused; kept so existing callers keep working.
        category_index: Mapping whose values are dicts with ``'id'`` and
            ``'name'`` entries (e.g. the result of
            ``label_map_util.create_category_index``).

    Raises:
        KeyError: If an annotation uses a class name missing from
            ``category_index``.
    """
    # The category index is keyed by integer id while annotations carry class
    # names, so indexing it with a name fails; build a name -> id lookup.
    name_to_id = {entry['name']: entry['id'] for entry in category_index.values()}

    output_parent = os.path.dirname(output_file)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)

    # The context manager closes the writer even if an image or annotation
    # fails to load part-way through.
    with tf.io.TFRecordWriter(output_file) as writer:
        # Sorted so the record order is reproducible across runs and machines.
        for image_file in sorted(os.listdir(image_dir)):
            image_path = os.path.join(image_dir, image_file)
            annotation_path = os.path.join(annotation_dir, os.path.splitext(image_file)[0] + '.xml')

            image = load_image(image_path)
            bboxes, labels = load_annotations(annotation_path)

            height, width = image.shape[:2]
            # Force an (N, 4) float array so column slicing also works when an
            # annotation contains no objects.
            boxes = np.asarray(bboxes, dtype=np.float64).reshape(-1, 4)
            class_names = [str(label) for label in labels]
            file_name_bytes = image_file.encode('utf8')

            feature_dict = {
                'image/height': dataset_util.int64_feature(height),
                'image/width': dataset_util.int64_feature(width),
                'image/filename': dataset_util.bytes_feature(file_name_bytes),
                'image/source_id': dataset_util.bytes_feature(file_name_bytes),
                'image/encoded': dataset_util.bytes_feature(tf.io.encode_jpeg(image).numpy()),
                'image/format': dataset_util.bytes_feature('jpeg'.encode('utf8')),
                # Box rows are [ymin, xmin, ymax, xmax]; normalise by image size.
                'image/object/bbox/xmin': dataset_util.float_list_feature((boxes[:, 1] / width).tolist()),
                'image/object/bbox/xmax': dataset_util.float_list_feature((boxes[:, 3] / width).tolist()),
                'image/object/bbox/ymin': dataset_util.float_list_feature((boxes[:, 0] / height).tolist()),
                'image/object/bbox/ymax': dataset_util.float_list_feature((boxes[:, 2] / height).tolist()),
                # A bytes list only accepts bytes, so encode the class names.
                'image/object/class/text': dataset_util.bytes_list_feature(
                    [name.encode('utf8') for name in class_names]),
                'image/object/class/label': dataset_util.int64_list_feature(
                    [name_to_id[name] for name in class_names]),
            }

            example = tf.train.Example(features=tf.train.Features(feature=feature_dict))
            writer.write(example.SerializeToString())


In [None]:
# Serialise the annotated images once, then stream them back for training.
# BATCH_SIZE, LEARNING_RATE, EPOCHS, PATIENCE, CHECKPOINT_DIR and LOG_DIR are
# expected to be defined in the configuration cell.
train_record_path = os.path.join(OUTPUT_DIR, 'train.tfrecord')
create_tfrecord(train_record_path, IMAGE_DIR, ANNOTATION_DIR, label_map, category_index)

# Fields read back from each serialized tf.train.Example. Per-object fields are
# variable-length because the number of boxes differs between images.
TRAIN_FEATURE_SPEC = {
    'image/encoded': tf.io.FixedLenFeature([], tf.string),
    'image/object/bbox/xmin': tf.io.VarLenFeature(tf.float32),
    'image/object/bbox/xmax': tf.io.VarLenFeature(tf.float32),
    'image/object/bbox/ymin': tf.io.VarLenFeature(tf.float32),
    'image/object/bbox/ymax': tf.io.VarLenFeature(tf.float32),
    'image/object/class/text': tf.io.VarLenFeature(tf.string),
}


def _parse_training_example(serialized_example):
    """Decode one serialized example into ``(image, targets)``.

    ``targets`` is a dict with ``'bbox'`` (rows of ``[ymin, xmin, ymax, xmax]``)
    and ``'class_text'`` (the class names).
    """
    # Parsing lives in tf.io; dataset_util has no parse_single_example helper.
    parsed = tf.io.parse_single_example(serialized_example, TRAIN_FEATURE_SPEC)

    image = tf.io.decode_jpeg(parsed['image/encoded'], channels=3)
    xmin = tf.sparse.to_dense(parsed['image/object/bbox/xmin'])
    ymin = tf.sparse.to_dense(parsed['image/object/bbox/ymin'])
    xmax = tf.sparse.to_dense(parsed['image/object/bbox/xmax'])
    ymax = tf.sparse.to_dense(parsed['image/object/bbox/ymax'])

    targets = {
        'bbox': tf.stack([ymin, xmin, ymax, xmax], axis=-1),
        'class_text': tf.sparse.to_dense(parsed['image/object/class/text']),
    }
    return image, targets


train_dataset = tf.data.TFRecordDataset(train_record_path)
train_dataset = train_dataset.map(_parse_training_example)
# NOTE(review): plain `batch` needs equally shaped elements; with variable
# image sizes / box counts this only works for BATCH_SIZE == 1 unless the
# elements are resized or `padded_batch` is used.
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)

# `label_map` is a protobuf message and has no len(); count the categories
# through the category index instead.
model = create_model(num_classes=len(category_index))

optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

# The notebook only imports submodules of `object_detection`, so the bare name
# is unbound here, and the referenced focal-loss class could not be resolved.
# Use the same per-output losses as the earlier compile call:
# cross-entropy for `class_logits`, Huber for `bboxes`.
# NOTE(review): the dataset yields targets keyed 'bbox'/'class_text' (with
# string class names), which do not line up with the model outputs
# 'class_logits'/'bboxes' - the target encoding still has to be reconciled
# before `fit` can train meaningfully.
model.compile(
    optimizer=optimizer,
    loss=[
        tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        tf.keras.losses.Huber(),
    ],
)

callbacks = [
    tf.keras.callbacks.ModelCheckpoint(os.path.join(CHECKPOINT_DIR, 'rcnn_{epoch:02d}.h5')),
    tf.keras.callbacks.TensorBoard(log_dir=LOG_DIR),
    # No validation data is passed to `fit`, so `val_loss` is never produced;
    # monitor the training loss so early stopping can actually trigger.
    tf.keras.callbacks.EarlyStopping(monitor='loss', patience=PATIENCE),
]

model.fit(train_dataset, epochs=EPOCHS, callbacks=callbacks)


In [None]:

def predict(image_path, model, category_index, confidence_threshold=None):
    """Run ``model`` on one image and return the detections above a threshold.

    Args:
        image_path: Path of the image to load with ``load_image``.
        model: Object whose ``predict`` returns four values for a batch of one
            image: boxes, scores, classes and a fourth value that is ignored.
        category_index: Mapping from integer class id to a dict with a
            ``'name'`` entry.
        confidence_threshold: Minimum score (exclusive) for a detection to be
            kept. Defaults to the module-level ``CONFIDENCE_THRESHOLD``.

    Returns:
        A list of dicts with keys ``'label'``, ``'score'`` and ``'box'``, where
        ``'box'`` is ordered ``[xmin, ymin, xmax, ymax]``.
    """
    if confidence_threshold is None:
        confidence_threshold = CONFIDENCE_THRESHOLD

    image = load_image(image_path)

    # NOTE(review): this unpacks four outputs, whereas the model assembled by
    # create_model exposes two ([class_logits, bboxes]) - confirm which model
    # this function is meant to be used with.
    boxes, scores, classes, _ = model.predict(np.expand_dims(image, axis=0))

    detections = []
    # Index 0 selects the single image of the batch.
    for box, score, cls in zip(boxes[0], scores[0], classes[0]):
        if score > confidence_threshold:
            # Model boxes arrive as [ymin, xmin, ymax, xmax]; store them x-first.
            ymin, xmin, ymax, xmax = box
            # Class ids may come back as floats; normalise to the int keys
            # used by the category index.
            label = category_index[int(cls)]['name']
            detections.append({'label': label, 'score': score, 'box': [xmin, ymin, xmax, ymax]})

    return detections

# Run the detector on every file in NEW_IMAGE_DIR and show the annotated image;
# each window stays open until a key is pressed.
for image_file in os.listdir(NEW_IMAGE_DIR):
    image_path = os.path.join(NEW_IMAGE_DIR, image_file)
    detections = predict(image_path, model, category_index)

    # load_image returns RGB, but cv2.imshow interprets arrays as BGR, so
    # convert back to keep the displayed colours correct.
    image = cv2.cvtColor(load_image(image_path), cv2.COLOR_RGB2BGR)

    for detection in detections:
        label = detection['label']
        score = detection['score']
        # predict() stores boxes as [xmin, ymin, xmax, ymax]; unpack in that
        # same order so x and y are not swapped when drawing.
        xmin, ymin, xmax, ymax = detection['box']
        top_left = (int(xmin), int(ymin))
        bottom_right = (int(xmax), int(ymax))

        cv2.rectangle(image, top_left, bottom_right, (0, 255, 0), 2)
        cv2.putText(image, '{}: {:.2f}'.format(label, score), top_left, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    cv2.imshow('image', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


In [None]:
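# Alternate approach (disabled): run inference with a TF Object Detection API model built from pipeline.config and restored from a checkpoint.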

# import os
# import numpy as np
# import cv2
# import tensorflow as tf
# from object_detection.utils import visualization_utils as viz_utils
# from object_detection.utils import label_map_util
# from object_detection.builders import model_builder

# IMAGE_DIR = '/path/to/image/directory'
# ANNOTATION_PATH = '/path/to/annotation/file'
# CHECKPOINT_PATH = '/path/to/checkpoint'
# LABEL_MAP_PATH = '/path/to/label/map'
# CONFIDENCE_THRESHOLD = 0.5

# # Load the label map
# label_map = label_map_util.load_labelmap(LABEL_MAP_PATH)
# categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=2, use_display_name=True)
# category_index = label_map_util.create_category_index(categories)

# # Load the model
# pipeline_config = model_builder.create_pipeline_proto_from_configs_file(os.path.join(CHECKPOINT_PATH, 'pipeline.config'))
# model_config = pipeline_config.model
# detection_model = model_builder.build(model_config=model_config, is_training=False)

# # Restore checkpoint
# ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
# ckpt.restore(os.path.join(CHECKPOINT_PATH, 'checkpoint', 'ckpt-0')).expect_partial()

# @tf.function
# def detect_fn(image):
#     image, shapes = detection_model.preprocess(image)
#     prediction_dict = detection_model.predict(image, shapes)
#     detections = detection_model.postprocess(prediction_dict, shapes)
#     return detections, prediction_dict, tf.reshape(shapes, [-1])

# def detect(image_path):
#     image_np = cv2.imread(image_path)
#     input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
#     detections, predictions_dict, shapes = detect_fn(input_tensor)

#     label_id_offset = 1
#     image_np_with_detections = image_np.copy()

#     viz_utils.visualize_boxes_and_labels_on_image_array(
#           image_np_with_detections,
#           detections['detection_boxes'][0].numpy(),
#           (detections['detection_classes'][0].numpy() + label_id_offset).astype(int),
#           detections['detection_scores'][0].numpy(),
#           category_index,
#           use_normalized_coordinates=True,
#           max_boxes_to_draw=200,
#           min_score_thresh=CONFIDENCE_THRESHOLD,
#           agnostic_mode=False)

#     return image_np_with_detections

# # Test the model on a single image
# image_file = 'image.jpg'
# image_path = os.path.join(IMAGE_DIR, image_file)
# output_image = detect(image_path)

# # Display the image
# cv2.imshow('Output', output_image)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
