In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from keras_detection.models.box_detector import BoxDetectionOutput
import keras_detection.datasets.datasets_ops as datasets_ops
import keras_detection.datasets.random_rectangles as random_rects
from keras_detection import ImageData

In [None]:
# Shorthand alias for the Keras API bundled with TensorFlow (used later for the optimizer).
keras = tf.keras

In [None]:
# Wrap the random-rectangles generator into a dataset object.
# NOTE(review): min_max_num_boxes=(5, 20) presumably bounds the number of boxes
# drawn per sample — confirm in random_rectangles.
dataset = datasets_ops.from_numpy_generator(
    random_rects.create_random_rectangles_dataset_generator(min_max_num_boxes=(5, 20))
)
# Bare expression so the notebook displays the dataset's repr.
dataset

In [None]:
# Benchmark dataset sampling
# %timeit next(iter(dataset))

In [None]:
# Pull a single sample from the dataset and wrap it in ImageData for attribute access.
image_data = ImageData.from_dict(next(iter(dataset)))
# Convert the sample's label boxes and class labels to NumPy and build a
# BoxDetectionOutput from them so they can be drawn.
image_frame_data = BoxDetectionOutput.from_tf_boxes(
    boxes=image_data.labels.boxes.numpy(),
    labels=image_data.labels.labels.numpy()
)
# Last expression of the cell: the result of drawing the annotations on the sample image.
image_frame_data.draw(image_data.features.image.numpy())

In [None]:
import keras_detection.datasets.datasets_ops as datasets_ops

def aug_fn(image_data: ImageData) -> ImageData:
    """Apply a random brightness shift to the sample's image.

    Args:
        image_data: Sample whose ``features.image`` is augmented.

    Returns:
        The result of ``image_data.replace_image`` called with the
        brightness-shifted float32 image.
    """
    float_image = tf.cast(image_data.features.image, tf.float32)
    # NOTE(review): for float inputs the brightness delta is added as-is, so
    # max_delta=1.2 is a very small shift if pixels are in [0, 255] — confirm
    # the intended scale.
    shifted_image = tf.image.random_brightness(float_image, max_delta=1.2)
    return image_data.replace_image(shifted_image)


# Notebook-wide configuration shared by the dataset pipeline and the model.
image_dim = 224  # images are square: (image_dim, image_dim) is used as the model image size
batch_size = 32
shuffle_buffer_size = 1 # we don't have to shuffle random rectangles dataset
num_classes = 9 # random rectangles have 9 classes
num_parallel_calls = tf.data.experimental.AUTOTUNE

# Seed the global NumPy and TensorFlow RNGs so that stochastic steps drawing
# from them (e.g. tf.image.random_brightness in aug_fn, weight initialisation)
# start from a known state on every run.
# NOTE(review): the rectangle generator is only covered if it draws from the
# global NumPy RNG — confirm.
random_seed = 42
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

In [None]:
# Re-create the raw random-rectangles dataset (same arguments as the earlier exploration cell).
dataset = datasets_ops.from_numpy_generator(
    random_rects.create_random_rectangles_dataset_generator(min_max_num_boxes=(5, 20))
)

# Build the training input pipeline from the raw dataset using the configuration
# values defined above (image size, augmentation, batching, prefetching).
# NOTE(review): num_epochs=-1 presumably means "repeat indefinitely" — confirm
# in datasets_ops.prepare_dataset.
train_dataset = datasets_ops.prepare_dataset(
    dataset,
    model_image_size=(image_dim, image_dim),
    augmentation_fn=aug_fn,
    num_epochs=-1,
    batch_size=batch_size ,
    shuffle_buffer_size=shuffle_buffer_size,
    prefetch_buffer_size=4,
    num_parallel_calls=num_parallel_calls
)

In [None]:
# %timeit next(iter(train_dataset))

In [None]:
from keras_detection import FPNBuilder
from keras_detection.tasks import standard_tasks
from keras_detection.backbones import resnet
from keras_detection.utils import plotting


# Feature-extraction backbone taking (image_dim, image_dim, 3) inputs.
backbone = resnet.ResNetBackbone(
    input_shape=(image_dim, image_dim, 3),
    units_per_block=(1, 1, 1),
    num_last_blocks=2, # number of feature pyramids
)

# Prediction tasks attached to the model; each one produces a per-anchor output.
tasks = [
    # predicts objectness score for each anchor
    standard_tasks.get_objectness_task(label_smoothing=0.02, obj_class="center_ignore_margin"),
    # predicts [height, width, y_center, x_center] location of the box
    standard_tasks.get_box_shape_task("box_shape"),
    # predicts [num_classes] for each anchor (focal loss is not supported yet)
    standard_tasks.get_multiclass_task(num_classes, fl_gamma=0.0, label_smoothing=0, activation='softmax'),
]

# Combine the backbone and the task list into a model builder.
builder = FPNBuilder(backbone=backbone, tasks=tasks)

In [None]:
# Build the model from the builder (backbone + tasks) defined in the previous cell.
model = builder.build()

In [None]:
# Print the model summary for a quick look at its structure.
model.summary()

# Training model

In [None]:
# Map every element of the training dataset through the builder's
# target-building function; elements are later unpacked as (features, labels).
prepared_train_dataset = train_dataset.map(builder.get_build_training_targets_fn())

In [None]:
# Bare expression so the notebook displays the prepared dataset's repr.
prepared_train_dataset

In [None]:
# Fetch one element of the prepared dataset for inspection; it unpacks into a
# (features, labels) pair.
features, labels = next(iter(prepared_train_dataset))

In [None]:
# Name of the feature map whose targets are visualised.
# NOTE(review): presumably tied to image_dim and the backbone strides —
# confirm when changing either.
fm = "fm28x28"

# Gather the label tensor of every model output with its last channel dropped,
# then let the builder convert the list into a dict keyed by output name.
# NOTE(review): the dropped channel is presumably a per-anchor weight — confirm.
targets = builder.predictions_to_dict(
    [labels[name][..., :-1] for name in builder.get_outputs_names()],
    postprocess=True,
)

# Index of the example (along the first axis) to draw.
idx = 0
target = {
    "objectness": targets[f'{fm}/objectness'][idx],
    "boxes_shape_map": targets[f'{fm}/box_shape'][idx],
    "classes_map": targets[f'{fm}/classes'][idx],
}

# Render the targets only (no predictions yet) on top of the image, which is
# divided by 255 before being passed to the plotting helper.
render = plotting.draw_compares(
    image=features['image'][idx] / 255,
    target=target,
    predicted=None,
    all_targets=True,
    score_threshold=0.2,
    draw_fns=[
        plotting.draw_boxes,
        plotting.draw_classes_map,
        plotting.draw_objectness_map,
        plotting.draw_classes_max_score_map,
    ],
)
render

In [None]:
import keras_detection.models.utils as kd_utils

# Create an L2 regularisation loss function for the model (l2_reg=1e-5) and
# register it on the model as an additional loss.
# NOTE(review): add_loss runs every time this cell is executed; re-running the
# cell without rebuilding the model presumably registers the regulariser more
# than once — confirm.
l2_reg_fn = kd_utils.get_l2_loss_fn(l2_reg=1e-5, model=model)
model.add_loss(l2_reg_fn)

In [None]:
# Adam optimizer with learning rate 1e-3 and beta_2 set to 0.995.
optimizer = keras.optimizers.Adam(learning_rate=0.001, beta_2=0.995)
# The remaining compile arguments (presumably losses and metrics — confirm)
# are supplied by the builder.
model.compile(optimizer, **builder.get_model_compile_args())
# Train for a single epoch of 100 steps drawn from the prepared dataset.
model.fit(prepared_train_dataset, epochs=1, steps_per_epoch=100)