In [None]:
import nbformat as nbf
import glob

def py_to_nb(py_file):
    """Wrap the full text of a Python source file in a notebook code cell.

    Arguments:
      py_file: Path of the file to read.

    Returns:
      A new nbformat v4 code cell whose source is the file's content.
    """
    with open(py_file) as source_file:
        return nbf.v4.new_code_cell(source=source_file.read())

# glob.glob returns paths in arbitrary (filesystem-dependent) order; sort them
# so the cell order of the generated notebook is reproducible between runs.
py_files = sorted(glob.glob('*.py'))

nb = nbf.v4.new_notebook()

# One code cell per script, in sorted filename order.
for py_file in py_files:
    new_cell = py_to_nb(py_file)
    nb.cells.append(new_cell)

# Notebook files are JSON and are expected to be UTF-8 encoded, so do not rely
# on the platform's default text encoding.
with open('combined.ipynb', 'w', encoding='utf-8') as f:
    nbf.write(nb, f)

In [None]:
import math
import tensorflow as tf
import utils
import os
import model as m
import losses
import numpy as np
import json
import argparse
import datetime
import glob
import cv2
from tqdm import tqdm
from utils import visualize_detections

# Maps integer class ids (1..7) to human-readable traffic-sign category names.
# run_prediction() looks up every predicted class id here to label the
# visualized detections.
LABEL_MAP = {
    1: "No entry",
    2: "No parking / waiting",
    3: "No turning",
    4: "Max Speed",
    5: "Other prohibition signs",
    6: "Warning",
    7: "Mandatory",
}

class Prediction:
    """Runs an inference model on whole images and, optionally, on image tiles.

    The image can be split into a grid of fixed-size tiles (`crop_size` wide,
    `crop_height` tall). Detections found on tiles are shifted back into full
    image coordinates and merged with the detections found on the resized full
    image through a final non-max suppression.

    Attributes:
      inference_model: Model used for prediction; must provide `predict` and
        `predict_on_batch` returning `nmsed_boxes`, `nmsed_scores`,
        `nmsed_classes` and `valid_detections`.
      crop_size: Tile width in pixels.
      crop_height: Tile height in pixels.
      image_width / image_height: Size of the images being tiled.
      overlap: Horizontal shift (pixels) applied to every tile but the first.
      dynamic_size: If True, the tile grid is recomputed from every input
        image (tile size becomes a quarter of the image size).
      tiling_size: Side length tiles are resized to before inference.
      g_slice_indices / g_slice_indices_y: `[start, end]` pairs of the tile
        columns / rows.
      seperate: Number of tile columns (tiles per row).
      seperate_y: Number of tile rows.
    """

    # Vertical counterpart of `overlap`, applied between tile rows.
    _VERTICAL_OVERLAP = 30

    def __init__(self,
                 inference_model,
                 crop_size=200,
                 image_height=626,
                 image_width=1622,
                 crop_height=300,
                 overlap=75,
                 dynamic_size=False,
                 tiling_size=968):
        self.crop_size = crop_size
        self.crop_height = crop_height
        self.image_width = image_width
        self.tiling_size = tiling_size
        self.image_height = image_height
        self.overlap = overlap
        self.dynamic_size = dynamic_size
        # The axis is passed explicitly so square images (height == width) do
        # not confuse the height/width detection in get_slice_indices.
        self.g_slice_indices = self.get_slice_indices(image_width, is_height=False)
        self.g_slice_indices_y = self.get_slice_indices(image_height, is_height=True)
        self.seperate_y = len(self.g_slice_indices_y)
        self.inference_model = inference_model

    def set_height(self, height):
        """Sets a new image height; tile height becomes a quarter of it."""
        self.image_height = height
        self.crop_height = height // 4
        self.g_slice_indices_y = self.get_slice_indices(height, is_height=True)
        self.seperate_y = len(self.g_slice_indices_y)

    def set_width(self, width):
        """Sets a new image width; tile width becomes a quarter of it."""
        self.image_width = width
        self.crop_size = width // 4
        self.g_slice_indices = self.get_slice_indices(width, is_height=False)

    def get_offset(self, idx):
        """Returns the `(x, y)` top-left offset of the tile with flat index `idx`.

        Tiles are numbered row by row, `self.seperate` tiles per row.
        """
        idx_y, idx_x = divmod(idx, self.seperate)
        return self.g_slice_indices[idx_x][0], self.g_slice_indices_y[idx_y][0]

    def get_slice_indices(self, full_size, is_height=None):
        """Computes the `[start, end]` tile ranges along one image axis.

        Arguments:
          full_size: Length of the axis in pixels.
          is_height: True for the vertical axis, False for the horizontal one.
            When None (legacy call style) the axis is guessed: `full_size` is
            treated as a height if it equals `self.image_height` and differs
            from `self.image_width`.

        Returns:
          A list of `[start, end]` pairs. For the horizontal axis the number
          of tiles is also stored in `self.seperate`.
        """
        if is_height is None:
            is_height = (full_size == self.image_height
                         and full_size != self.image_width)

        if is_height:
            if self.crop_height == 0:
                return [[0, self.image_height]]
            crop_s = self.crop_height
            over = self._VERTICAL_OVERLAP
        else:
            crop_s = self.crop_size
            over = self.overlap

        # The tile count must come from the crop size of THIS axis. Every tile
        # after the first is shifted back by `over`, so `over` is added to the
        # length to make sure the last tile still reaches the far edge.
        num_paths = math.ceil((full_size + over) / crop_s)
        if not is_height:
            self.seperate = num_paths

        slices = []
        for i in range(num_paths):
            start = max(crop_s * i - over, 0)
            end = start + crop_s
            if end > full_size:
                # Clamp the last tile so it ends exactly at the image border.
                end = full_size
                start = end - crop_s

            slices.append([start, end])

        return slices

    def get_input_img(self, image, crop=False, crop_size=512):
        """Prepares model input for the full image or for every tile.

        Arguments:
          image: Input image, converted with `tf.convert_to_tensor`.
          crop: If True, return one preprocessed input per tile; otherwise a
            single preprocessed input for the whole image.
          crop_size: Target side passed to `utils.resize_and_pad_image` as both
            `min_side` and `max_side`.

        Returns:
          `(inputs, image, ratio)` where `inputs` is a list of batched tile
          tensors (crop=True) or one batched tensor (crop=False), `image` is
          the input as a tensor and `ratio` is the resize ratio reported by
          `utils.resize_and_pad_image` (for tiles: the one of the last tile).
        """
        image = tf.convert_to_tensor(image)

        if self.dynamic_size:
            shape = image.shape
            self.set_height(shape[0])
            self.set_width(shape[1])

        if crop:
            ratio = 0
            train_imgs = []
            for start_y, _ in self.g_slice_indices_y:
                for start_x, _ in self.g_slice_indices:
                    # Keep the fixed-size window inside the image.
                    if start_x + self.crop_size > self.image_width:
                        start_x = self.image_width - self.crop_size
                    if start_y + self.crop_height > self.image_height:
                        start_y = self.image_height - self.crop_height

                    small_img = tf.slice(image, [start_y, start_x, 0], [self.crop_height, self.crop_size, 3])

                    croped, _, ratio = utils.resize_and_pad_image(small_img,
                                                                  crop_size,
                                                                  crop_size, jitter=None)
                    train_imgs.append(tf.expand_dims(croped, axis=0))

            return [tf.keras.applications.resnet.preprocess_input(i) for i in train_imgs], image, ratio

        else:
            train_img, _, ratio = utils.resize_and_pad_image(image,
                                                             crop_size,
                                                             crop_size,
                                                             jitter=None)
            train_img = tf.keras.applications.resnet.preprocess_input(train_img)
            return tf.expand_dims(train_img, axis=0), image, ratio

    def revert_bboxes(self, boxes, idx):
        """Shifts the boxes of tile `idx` from tile to full-image coordinates.

        Arguments:
          boxes: Boxes indexed as `[tile, box, coordinate]` with coordinates
            ordered `x1, y1, x2, y2`.
          idx: Flat tile index.

        Returns:
          The boxes of tile `idx` with the tile offset added.
        """
        offset_x, offset_y = self.get_offset(idx)
        return tf.stack([
            boxes[idx, :, 0] + offset_x,
            boxes[idx, :, 1] + offset_y,
            boxes[idx, :, 2] + offset_x,
            boxes[idx, :, 3] + offset_y,
        ], axis=-1)

    def detect_single_image(self, image, crop_sizes=None, tiling=False):
        """Detects objects on one image, optionally adding tile detections.

        Arguments:
          image: Input image.
          crop_sizes: Sizes the full image is resized to, one prediction per
            size. Defaults to `[1024]` when empty or None.
          tiling: If True, also predict on every tile of the image.

        Returns:
          `(image, boxes, scores, classes)`. When nothing was detected the last
          three are empty lists; otherwise they are tensors holding the
          detections kept by the final non-max suppression.
        """
        if not crop_sizes:
            crop_sizes = [1024]

        tile_boxes, tile_scores, tile_classes = [], [], []
        if tiling:
            input_img, image, ratio = self.get_input_img(image, crop=True, crop_size=self.tiling_size)

            detections = self.inference_model.predict_on_batch(tf.concat(input_img, 0))

            boxes = detections.nmsed_boxes / ratio
            for i, valids in enumerate(detections.valid_detections):
                if valids > 0:
                    # Shift the whole tile once instead of once per box.
                    tile_boxes.append(self.revert_bboxes(boxes, i)[:valids])
                    tile_classes.append(detections.nmsed_classes[i][:valids])
                    tile_scores.append(detections.nmsed_scores[i][:valids])

        full_boxes, full_scores, full_classes = [], [], []
        for crop_size in crop_sizes:
            input_img, image, ratio = self.get_input_img(image, crop=False, crop_size=crop_size)
            detections = self.inference_model.predict(input_img)
            num_detections = detections.valid_detections[0]

            if num_detections:
                full_boxes.append(detections.nmsed_boxes[0][:num_detections] / ratio)
                full_scores.append(detections.nmsed_scores[0][:num_detections])
                full_classes.append(detections.nmsed_classes[0][:num_detections])

        # Full-image detections first, tile detections after them.
        box_parts = full_boxes + tile_boxes
        if not box_parts:
            return image, [], [], []

        all_boxes = tf.concat(box_parts, 0)
        all_scores = tf.concat(full_scores + tile_scores, 0)
        all_classes = tf.concat(full_classes + tile_classes, 0)

        selected_indices = tf.image.non_max_suppression(
            all_boxes,
            all_scores,
            50,
            iou_threshold=0.1,
            score_threshold=0.5,
        )

        # Always return the suppressed set: if NMS keeps nothing the result is
        # empty rather than the unfiltered candidate boxes.
        return (image,
                tf.gather(all_boxes, selected_indices),
                tf.gather(all_scores, selected_indices),
                tf.gather(all_classes, selected_indices))


def get_inference_model(weight_path, backbone="resnet50"):
    """Builds a RetinaNet, loads its weights and appends the decoding layer.

    Arguments:
      weight_path: Path of the weights handed to `load_weights`.
      backbone: Backbone name forwarded to `m.RetinaNet`.

    Returns:
      A `tf.keras.Model` mapping an image batch to the output of
      `m.DecodePredictions`.
    """
    class_count = 7
    retinanet = m.RetinaNet(class_count, backbone=backbone)
    retinanet.compile(optimizer="adam", loss=losses.RetinaNetLoss(class_count))
    retinanet.build((1, None, None, 3))
    image_input = tf.keras.Input(shape=[None, None, 3], name="image")
    retinanet.load_weights(weight_path)
    raw_predictions = retinanet(image_input, training=False)
    decoder = m.DecodePredictions(
        confidence_threshold=0.5,
        num_classes=class_count,
        max_detections_per_class=10,
        nms_iou_threshold=0.5,
        verbose=0,
    )
    decoded = decoder(image_input, raw_predictions)

    return tf.keras.Model(inputs=image_input, outputs=decoded)

def combine_prediction(
    prediction_1,
    prediction_2,
    weight_1=1,
    max_detections=50,
    iou_threshold=0.5,
    score_threshold=0.65):
    """Merges two sets of detections with weighted scores and NMS.

    Scores of the first prediction are scaled by `weight_1`, scores of the
    second by `1 - weight_1`. The merged boxes go through non-max suppression
    and the surviving scores are rescaled by the larger of the two weights.

    Arguments:
      prediction_1: `(boxes, scores, classes)` of the first model.
      prediction_2: `(boxes, scores, classes)` of the second model.
      weight_1: Weight of the first prediction. With the default of 1 the
        second prediction gets weight 0.
      max_detections: Maximum number of boxes kept by NMS.
      iou_threshold: IOU threshold of the NMS.
      score_threshold: Score threshold of the NMS, scaled by the larger weight
        so it applies to the weighted scores.

    Returns:
      `(boxes, scores, classes)` of the kept detections. If both predictions
      are empty, `prediction_1` is returned unchanged.
    """
    boxes_1, scores_1, classes_1 = prediction_1
    boxes_2, scores_2, classes_2 = prediction_2

    weight_2 = 1 - weight_1
    highest = max(weight_1, weight_2)
    score_threshold *= highest

    has_1 = len(scores_1) > 0
    has_2 = len(scores_2) > 0

    if not has_1 and not has_2:
        return boxes_1, scores_1, classes_1

    if not has_1:
        scores = scores_2 * weight_2
        boxes = boxes_2
        classes = classes_2
    elif not has_2:
        scores = scores_1 * weight_1
        boxes = boxes_1
        classes = classes_1
    else:
        # Build new weighted arrays instead of `scores *= weight`, which would
        # modify the caller's numpy arrays in place.
        weighted_1 = scores_1 * weight_1
        weighted_2 = scores_2 * weight_2

        boxes = tf.concat([boxes_1, boxes_2], 0)
        scores = tf.concat([weighted_1, weighted_2], 0)
        classes = tf.concat([classes_1, classes_2], 0)

    selected_indices = tf.image.non_max_suppression(
        boxes,
        scores,
        max_detections,
        iou_threshold=iou_threshold,
        score_threshold=score_threshold,
    )

    return (tf.gather(boxes, selected_indices),
            tf.gather(scores / highest, selected_indices),
            tf.gather(classes, selected_indices))


def run_prediction(args):
    """Runs detection on one image or a directory and writes a JSON submission.

    Arguments:
      args: Parsed command line arguments providing `input`, `output`,
        `weight`, `save_dir`, `scales` and `tiling`.

    Raises:
      ValueError: If the output path does not end with the `json` extension.
    """
    input_path, output_path, weight, save_dir = (
        args.input, args.output, args.weight, args.save_dir)

    # Validate the output path before creating any directory.
    if output_path.split(".")[-1] != "json":
        raise ValueError("Output file should be json format")

    # The backbone name is the last "_"-separated part of the weight name.
    backbone = weight.split("_")[-1].replace(".h5", "")
    crop_sizes = list(map(int, args.scales.split(",")))

    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    # A bare file name has no directory part; os.makedirs("") would fail.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Get list of test images (sorted so the submission order is stable)
    if os.path.isdir(input_path):
        image_files = sorted(glob.glob(os.path.join(input_path, '*')))
    else:
        # it's file
        image_files = [input_path]

    print(f"Test on {len(image_files)} images")

    # Create submission.json
    submission = []
    predictor = Prediction(get_inference_model(weight, backbone))

    start = datetime.datetime.now()
    for file_path in tqdm(image_files):
        bgr_image = cv2.imread(file_path)
        if bgr_image is None:
            # cv2.imread returns None for anything it cannot decode
            # (sub-directories, non-image files, ...).
            tqdm.write(f"Skip unreadable image: {file_path}")
            continue

        image, boxes, scores, classes = predictor.detect_single_image(
            bgr_image[..., ::-1],  # BGR -> RGB
            crop_sizes=crop_sizes,
            tiling=args.tiling
        )
        # Empty results come back as plain lists, everything else as tensors.
        if not isinstance(boxes, list):
            boxes = boxes.numpy()
            scores = scores.numpy()
            classes = classes.numpy()

        if save_dir:
            save_path = os.path.join(save_dir, os.path.basename(file_path))
            cls_name = [
                LABEL_MAP[int(x)] for x in classes
            ]
            visualize_detections(image, boxes, cls_name, scores, save_path=save_path)

        for box, score, cls in zip(boxes, scores, classes):
            x1, y1, x2, y2 = box
            # Corner format -> [x, y, width, height]
            xywh = [x1, y1, x2 - x1, y2 - y1]
            submission.append({
                "image_id": file_path,
                "category_id": int(cls),
                "bbox": [float(z) for z in xywh],
                "score": float(score),
            })

    print("Predict in {}".format(datetime.datetime.now() - start))

    with open(output_path, "w") as f:
        json.dump(submission, f, indent=2)

    print("Submission saved at {}".format(output_path))


if __name__ == "__main__":
    # Command line options as (flag, add_argument keyword arguments) pairs.
    option_specs = [
        ("--input", dict(metavar="I", type=str, default="/data/images",
                         help="Path to input images")),
        ("--output", dict(metavar="O", type=str,
                          default="/data/result/submission.json",
                          help="Output file path")),
        ("--weight", dict(metavar="W", type=str,
                          default="pretrained_densenet121",
                          help="Weight path")),
        ("--save-dir", dict(type=str, default="/content/infernece_images")),
        ("--tiling", dict(action="store_true")),
        ("--scales", dict(type=str, default="1024",
                          help="Separated by comma ','")),
    ]

    parser = argparse.ArgumentParser(description='Traffic sign detection')
    for flag, options in option_specs:
        parser.add_argument(flag, **options)

    args = parser.parse_args()

    # Echo the effective configuration before starting the run.
    print(args)

    run_prediction(args)


In [None]:
import tensorflow as tf
import numpy as np
import math
from utils import convert_to_corners, compute_iou
from data_processing import resize_and_pad_image
from tensorflow import keras

def get_backbone(name="resnet50", weight=None):
    """Builds a backbone model returning the C3, C4 and C5 feature maps.

    Supported backbone: resnet50, resnet101, densenet121

    Arguments:
      name: Name of the backbone.
      weight: Forwarded as `weights` to the `keras.applications` constructor.

    Returns:
      A `keras.Model` mapping an image batch to `[c3, c4, c5]` outputs.

    Raises:
      ValueError: If `name` is not a supported backbone.
    """
    # NOTE(review): "conv4_block6_out" is also used for resnet101, where the
    # conv4 stage presumably has more than 6 blocks - confirm this is the
    # intended C4 output before changing it (trained weights depend on it).
    resnet_layers = ["conv3_block4_out", "conv4_block6_out", "conv5_block3_out"]
    supported = {
        "resnet50": ("ResNet50", resnet_layers),
        "resnet101": ("ResNet101", resnet_layers),
        "densenet121": ("DenseNet121", ["pool3_conv", "pool4_conv", "relu"]),
    }

    # Fail early with a clear message instead of calling `None` or reading an
    # unbound `output_layers` further down.
    if name not in supported:
        raise ValueError(
            "Unsupported backbone {!r}; expected one of {}".format(
                name, sorted(supported)
            )
        )

    constructor_name, output_layers = supported[name]
    backbone = getattr(keras.applications, constructor_name)

    backbone_model = backbone(include_top=False, input_shape=[None, None, 3], weights=weight)
    c3_output, c4_output, c5_output = [
        backbone_model.get_layer(layer_name).output
        for layer_name in output_layers
    ]
    return keras.Model(
        inputs=[backbone_model.inputs], outputs=[c3_output, c4_output, c5_output]
    )


class FeaturePyramid(keras.layers.Layer):
    """Builds the Feature Pyramid with the feature maps from the backbone.

    Attributes:
      backbone: The backbone model producing the C3, C4 and C5 feature maps,
        created by `get_backbone`.
      upsample_2x: Layer doubling the spatial size of a feature map, used for
        the top-down pathway.
    """

    def __init__(self, backbone="resnet50", weight=None, **kwargs):
        super(FeaturePyramid, self).__init__(name="FeaturePyramid", **kwargs)
        self.backbone = get_backbone(backbone, weight)
        self.conv_c3_1x1 = keras.layers.Conv2D(256, 1, 1, "same")
        self.conv_c4_1x1 = keras.layers.Conv2D(256, 1, 1, "same")
        self.conv_c5_1x1 = keras.layers.Conv2D(256, 1, 1, "same")
        self.conv_c3_3x3 = keras.layers.Conv2D(256, 3, 1, "same")
        self.conv_c4_3x3 = keras.layers.Conv2D(256, 3, 1, "same")
        self.conv_c5_3x3 = keras.layers.Conv2D(256, 3, 1, "same")
        self.conv_c6_3x3 = keras.layers.Conv2D(256, 3, 2, "same")
        self.conv_c7_3x3 = keras.layers.Conv2D(256, 3, 2, "same")
        # Created once here rather than on every call(); the layer has no
        # weights, so the set of trainable variables is unchanged.
        self.upsample_2x = keras.layers.UpSampling2D(2)

    def call(self, images, training=False):
        """Returns the P3..P7 feature maps for a batch of images."""
        c3_output, c4_output, c5_output = self.backbone(images, training=training)
        # Lateral 1x1 convolutions.
        p3_output = self.conv_c3_1x1(c3_output)
        p4_output = self.conv_c4_1x1(c4_output)
        p5_output = self.conv_c5_1x1(c5_output)
        # Top-down pathway: add the upsampled coarser level.
        p4_output = p4_output + self.upsample_2x(p5_output)
        p3_output = p3_output + self.upsample_2x(p4_output)
        p3_output = self.conv_c3_3x3(p3_output)
        p4_output = self.conv_c4_3x3(p4_output)
        p5_output = self.conv_c5_3x3(p5_output)
        # Extra coarse levels computed with stride-2 convolutions.
        p6_output = self.conv_c6_3x3(c5_output)
        p7_output = self.conv_c7_3x3(tf.nn.relu(p6_output))
        return p3_output, p4_output, p5_output, p6_output, p7_output


def build_head(output_filters, bias_init):
    """Creates the convolutional head used for class or box prediction.

    Arguments:
      output_filters: Number of filters of the last convolution.
      bias_init: Bias initializer of the last convolution.

    Returns:
      A keras sequential model: four 3x3 convolutions with 256 filters, each
        followed by a ReLU, then a final 3x3 convolution with
        `output_filters` filters.
    """
    head = keras.Sequential([keras.Input(shape=[None, None, 256])])
    weights_init = tf.initializers.RandomNormal(0.0, 0.01)

    hidden_blocks = 4
    block = 0
    while block < hidden_blocks:
        hidden_conv = keras.layers.Conv2D(
            256, 3, padding="same", kernel_initializer=weights_init
        )
        head.add(hidden_conv)
        head.add(keras.layers.ReLU())
        block += 1

    output_conv = keras.layers.Conv2D(
        output_filters,
        3,
        1,
        padding="same",
        kernel_initializer=weights_init,
        bias_initializer=bias_init,
    )
    head.add(output_conv)
    return head


class RetinaNet(keras.Model):
    """A subclassed Keras model implementing the RetinaNet architecture.

    Attributes:
      num_classes: Number of classes in the dataset.
      backbone_name: Name of the backbone the feature pyramid is built from
        (see `get_backbone` for the supported names).
      fpn: The feature pyramid network.
      cls_head: Classification head shared by all pyramid levels.
      box_head: Box regression head shared by all pyramid levels.
    """

    def __init__(self, num_classes, backbone=None, weight=None, **kwargs):
        super(RetinaNet, self).__init__(name="RetinaNet", **kwargs)
        self.backbone_name = backbone
        self.fpn = FeaturePyramid(backbone, weight)
        self.num_classes = num_classes

        # Bias chosen so the initial foreground probability is 0.01.
        prior_probability = tf.constant_initializer(-np.log((1 - 0.01) / 0.01))
        # 9 anchors per feature map location.
        self.cls_head = build_head(9 * num_classes, prior_probability)
        self.box_head = build_head(9 * 4, "zeros")

    def call(self, image, training=False):
        """Returns per-anchor predictions `[box (4), class scores]`.

        Arguments:
          image: Batch of input images.
          training: Forwarded to the feature pyramid (and its backbone) so
            layers such as batch normalization run in inference mode when the
            model is called with `training=False`.
        """
        features = self.fpn(image, training=training)
        N = tf.shape(image)[0]
        cls_outputs = []
        box_outputs = []
        for feature in features:
            box_outputs.append(tf.reshape(self.box_head(feature), [N, -1, 4]))
            cls_outputs.append(
                tf.reshape(self.cls_head(feature), [N, -1, self.num_classes])
            )
        cls_outputs = tf.concat(cls_outputs, axis=1)
        box_outputs = tf.concat(box_outputs, axis=1)
        return tf.concat([box_outputs, cls_outputs], axis=-1)

class AnchorBox:
    """Anchor box generator for the five feature pyramid levels.

    Anchors are produced for feature maps at strides `[8, 16, 32, 64, 128]`
    and every anchor box has the format `[x, y, width, height]`.

    Attributes:
      aspect_ratios: A list of float values representing the aspect ratios of
        the anchor boxes at each location on the feature map
      scales: A list of float values representing the scale of the anchor boxes
        at each location on the feature map.
      num_anchors: The number of anchor boxes at each location on feature map
      areas: A list of float values representing the areas of the anchor
        boxes for each feature map in the feature pyramid.
      strides: A list of float value representing the strides for each feature
        map in the feature pyramid.
    """

    def __init__(self):
        self.aspect_ratios = [0.5, 1.0, 2.0]
        self.scales = [2 ** exponent for exponent in (0, 1 / 3, 2 / 3)]

        self._num_anchors = len(self.aspect_ratios) * len(self.scales)
        self._strides = [2 ** level for level in range(3, 8)]
        self._areas = [side ** 2 for side in (32.0, 64.0, 128.0, 256.0, 512.0)]
        self._anchor_dims = self._compute_dims()

    def _compute_dims(self):
        """Returns, per pyramid level, the anchor `(width, height)` pairs for
        every combination of aspect ratio and scale.
        """
        dims_per_level = []
        for area in self._areas:
            level_dims = []
            for ratio in self.aspect_ratios:
                height = tf.math.sqrt(area / ratio)
                width = area / height
                base_dims = tf.reshape(
                    tf.stack([width, height], axis=-1), [1, 1, 2]
                )
                level_dims.extend(scale * base_dims for scale in self.scales)
            dims_per_level.append(tf.stack(level_dims, axis=-2))
        return dims_per_level

    def _get_anchors(self, feature_height, feature_width, level):
        """Creates the anchor boxes of one feature map.

        Arguments:
          feature_height: An integer representing the height of the feature map.
          feature_width: An integer representing the width of the feature map.
          level: An integer representing the level of the feature map in the
            feature pyramid.

        Returns:
          anchor boxes with the shape
          `(feature_height * feature_width * num_anchors, 4)`
        """
        level_index = level - 3
        xs = tf.range(feature_width, dtype=tf.float32) + 0.5
        ys = tf.range(feature_height, dtype=tf.float32) + 0.5
        # Cell centers scaled back to image coordinates by the level's stride.
        grid = tf.stack(tf.meshgrid(xs, ys), axis=-1) * self._strides[level_index]
        grid = tf.expand_dims(grid, axis=-2)
        grid = tf.tile(grid, [1, 1, self._num_anchors, 1])
        sizes = tf.tile(
            self._anchor_dims[level_index], [feature_height, feature_width, 1, 1]
        )
        boxes = tf.concat([grid, sizes], axis=-1)
        total = feature_height * feature_width * self._num_anchors
        return tf.reshape(boxes, [total, 4])

    def get_anchors(self, image_height, image_width):
        """Creates the anchor boxes of all pyramid levels for one image size.

        Arguments:
          image_height: Height of the input image.
          image_width: Width of the input image.

        Returns:
          anchor boxes for all the feature maps, stacked as a single tensor
            with shape `(total_anchors, 4)`
        """
        per_level = []
        for level in range(3, 8):
            downscale = 2 ** level
            per_level.append(
                self._get_anchors(
                    tf.math.ceil(image_height / downscale),
                    tf.math.ceil(image_width / downscale),
                    level,
                )
            )
        return tf.concat(per_level, axis=0)


class DecodePredictions(tf.keras.layers.Layer):
    """A Keras layer that decodes predictions of the RetinaNet model.

    Attributes:
      num_classes: Number of classes in the dataset
      confidence_threshold: Minimum class probability, below which detections
        are pruned.
      nms_iou_threshold: IOU threshold for the NMS operation
      max_detections_per_class: Maximum number of detections to retain per
       class.
      max_detections: Maximum number of detections to retain across all
        classes.
      box_variance: The scaling factors used to scale the bounding box
        predictions.
      verbose: Stored on the layer; not used by the decoding itself.
    """

    def __init__(
        self,
        num_classes=80,
        confidence_threshold=0.05,
        nms_iou_threshold=0.5,
        max_detections_per_class=100,
        max_detections=100,
        box_variance=(0.1, 0.1, 0.2, 0.2),
        verbose=0,
        **kwargs
    ):
        super(DecodePredictions, self).__init__(**kwargs)
        self.num_classes = num_classes
        self.verbose = verbose
        self.confidence_threshold = confidence_threshold
        self.nms_iou_threshold = nms_iou_threshold
        self.max_detections_per_class = max_detections_per_class
        self.max_detections = max_detections

        self._anchor_box = AnchorBox()
        # Honor the `box_variance` argument instead of a hard-coded copy of
        # its default value.
        self._box_variance = tf.convert_to_tensor(box_variance, dtype=tf.float32)

    def _decode_box_predictions(self, anchor_boxes, box_predictions):
        """Turns box regression outputs into corner-format boxes.

        The predictions are scaled by the box variance, applied to the anchor
        centers/sizes and converted with `convert_to_corners`.
        """
        boxes = box_predictions * self._box_variance
        boxes = tf.concat(
            [
                boxes[:, :, :2] * anchor_boxes[:, :, 2:] + anchor_boxes[:, :, :2],
                tf.math.exp(boxes[:, :, 2:]) * anchor_boxes[:, :, 2:],
            ],
            axis=-1,
        )
        boxes_transformed = convert_to_corners(boxes)
        return boxes_transformed

    def call(self, images, predictions):
        """Decodes raw predictions and applies class-wise NMS.

        Arguments:
          images: Batch of input images; only its shape is used, to generate
            the anchor boxes.
          predictions: Model output whose last axis holds 4 box values
            followed by the class logits.

        Returns:
          The result of `tf.image.combined_non_max_suppression`.
        """
        image_shape = tf.cast(tf.shape(images), dtype=tf.float32)
        anchor_boxes = self._anchor_box.get_anchors(image_shape[1], image_shape[2])
        box_predictions = predictions[:, :, :4]
        cls_predictions = tf.nn.sigmoid(predictions[:, :, 4:])
        boxes = self._decode_box_predictions(anchor_boxes[None, ...], box_predictions)
        return tf.image.combined_non_max_suppression(
            tf.expand_dims(boxes, axis=2),
            cls_predictions,
            self.max_detections_per_class,
            self.max_detections,
            self.nms_iou_threshold,
            self.confidence_threshold,
            clip_boxes=False,
        )


class LabelEncoder:
    """Converts raw labels into training targets.

    Works on a batch of samples consisting of the input images, the bounding
    boxes of the objects present and their class ids.

    Attributes:
      anchor_box: Anchor box generator to encode the bounding boxes.
      box_variance: The scaling factors used to scale the bounding box targets.
    """

    def __init__(self):
        self._anchor_box = AnchorBox()
        self._box_variance = tf.convert_to_tensor(
            [0.1, 0.1, 0.2, 0.2], dtype=tf.float32
        )

    def _match_anchor_boxes(
        self, anchor_boxes, gt_boxes, match_iou=0.5, ignore_iou=0.4
    ):
        """Assigns a ground truth box to every anchor box based on IOU.

        The pairwise IOU of the M `anchor_boxes` and N `gt_boxes` forms a
        `(M, N)` matrix. For each anchor (row) the ground truth box with the
        highest IOU is selected. The anchor is positive when that IOU is at
        least `match_iou`, background when it is below `ignore_iou`, and
        ignored during training otherwise.

        Arguments:
          anchor_boxes: A float tensor with the shape `(total_anchors, 4)`
            representing all the anchor boxes for a given input image shape,
            where each anchor box is of the format `[x, y, width, height]`.
          gt_boxes: A float tensor with shape `(num_objects, 4)` representing
            the ground truth boxes, where each box is of the format
            `[x, y, width, height]`.
          match_iou: A float value representing the minimum IOU threshold for
            determining if a ground truth box can be assigned to an anchor box.
          ignore_iou: A float value representing the IOU threshold under which
            an anchor box is assigned to the background class.

        Returns:
          matched_gt_idx: Index of the matched object
          positive_mask: A mask for anchor boxes that have been assigned ground
            truth boxes.
          ignore_mask: A mask for anchor boxes that need to be ignored during
            training
        """
        pairwise_iou = compute_iou(anchor_boxes, gt_boxes)
        best_iou = tf.reduce_max(pairwise_iou, axis=1)
        best_gt_idx = tf.argmax(pairwise_iou, axis=1)
        is_positive = tf.greater_equal(best_iou, match_iou)
        is_negative = tf.less(best_iou, ignore_iou)
        # Neither positive nor negative -> ignored.
        is_ignored = tf.logical_not(tf.logical_or(is_positive, is_negative))
        positive_mask = tf.cast(is_positive, dtype=tf.float32)
        ignore_mask = tf.cast(is_ignored, dtype=tf.float32)
        return best_gt_idx, positive_mask, ignore_mask

    def _compute_box_target(self, anchor_boxes, matched_gt_boxes):
        """Encodes matched ground truth boxes relative to their anchors"""
        anchor_centers = anchor_boxes[:, :2]
        anchor_sizes = anchor_boxes[:, 2:]
        center_offsets = (matched_gt_boxes[:, :2] - anchor_centers) / anchor_sizes
        log_size_ratios = tf.math.log(matched_gt_boxes[:, 2:] / anchor_sizes)
        encoded = tf.concat([center_offsets, log_size_ratios], axis=-1)
        return encoded / self._box_variance

    def _encode_sample(self, image_shape, gt_boxes, cls_ids):
        """Builds the box and class targets of one sample"""
        anchors = self._anchor_box.get_anchors(image_shape[1], image_shape[2])
        cls_ids = tf.cast(cls_ids, dtype=tf.float32)
        gt_idx, positive_mask, ignore_mask = self._match_anchor_boxes(
            anchors, gt_boxes
        )
        box_target = self._compute_box_target(anchors, tf.gather(gt_boxes, gt_idx))
        anchor_cls_ids = tf.gather(cls_ids, gt_idx)
        # -1 marks background anchors, -2 marks ignored anchors.
        cls_target = tf.where(
            tf.not_equal(positive_mask, 1.0), -1.0, anchor_cls_ids
        )
        cls_target = tf.where(tf.equal(ignore_mask, 1.0), -2.0, cls_target)
        cls_target = tf.expand_dims(cls_target, axis=-1)

        return tf.concat([box_target, cls_target], axis=-1)

    def encode_batch(self, batch_images, gt_boxes, cls_ids):
        """Builds the targets of a whole batch and preprocesses the images"""
        images_shape = tf.shape(batch_images)
        batch_size = images_shape[0]

        encoded = tf.TensorArray(dtype=tf.float32, size=batch_size, dynamic_size=True)
        for i in range(batch_size):
            sample_label = self._encode_sample(images_shape, gt_boxes[i], cls_ids[i])
            encoded = encoded.write(i, sample_label)
        batch_images = tf.keras.applications.resnet.preprocess_input(batch_images)
        return batch_images, encoded.stack()


In [None]:
import cv2
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import requests
import sys


def image_resize(image, width=None, height=None, inter=cv2.INTER_AREA):
    """Resizes an image to a target width or height, keeping the aspect ratio.

    If both `width` and `height` are None the image is returned unchanged.
    If `width` is given it takes precedence over `height`.
    """
    src_h, src_w = image.shape[:2]
    if width is None and height is None:
        return image

    if width is not None:
        scale = width / float(src_w)
        target_size = (width, int(src_h * scale))
    else:
        scale = height / float(src_h)
        target_size = (int(src_w * scale), height)

    return cv2.resize(image, target_size, interpolation=inter)


def get_img(path, width=None):
    """Reads an image with `cv2.imread`, resizing it to `width` when given."""
    loaded = cv2.imread(path)
    return image_resize(loaded, width=width) if width else loaded


def resize_and_pad_image(
    image,
    min_side=512,
    max_side=1024,
    jitter=[512, 1024],
    stride=128.0
):
    """Resizes an image keeping its aspect ratio, then zero-pads it.

    The shorter side is scaled to `min_side` (or to a random value drawn from
    `jitter`). If the longer side would then exceed `max_side`, the scale is
    chosen so that the longer side equals `max_side` instead. Finally the
    image is padded on the right and bottom so both sides are divisible by
    `stride`.

    Arguments:
      image: A 3-D tensor of shape `(height, width, channels)` representing an
        image.
      min_side: The shorter side of the image is resized to this value, if
        `jitter` is set to None.
      max_side: If the longer side of the image exceeds this value after
        resizing, the image is resized such that the longer side now equals to
        this value.
      jitter: A list of floats containing minimum and maximum size for scale
        jittering. If available, the shorter side of the image will be
        resized to a random value in this range.
      stride: The stride of the smallest feature map in the feature pyramid.
        Can be calculated using `image_size / feature_map_size`.

    Returns:
      image: Resized and padded image.
      image_shape: Shape of the image before padding.
      ratio: The scaling factor used to resize the image
    """
    source_shape = tf.cast(tf.shape(image)[:2], dtype=tf.float32)
    if jitter is not None:
        min_side = tf.random.uniform((), jitter[0], jitter[1], dtype=tf.float32)

    longest_side = tf.reduce_max(source_shape)
    ratio = min_side / tf.reduce_min(source_shape)
    if ratio * longest_side > max_side:
        ratio = max_side / longest_side

    resized_shape = ratio * source_shape
    image = tf.image.resize(image, tf.cast(resized_shape, dtype=tf.int32))

    # Round both sides up to the next multiple of `stride`.
    padded_shape = tf.cast(
        tf.math.ceil(resized_shape / stride) * stride, dtype=tf.int32
    )
    padded = tf.image.pad_to_bounding_box(
        image, 0, 0, padded_shape[0], padded_shape[1]
    )
    return padded, resized_shape, ratio


def swap_xy(boxes):
    """Exchanges the x and y coordinate of both corners of every box.

    Arguments:
      boxes: A tensor with shape `(num_boxes, 4)` representing bounding boxes.

    Returns:
      A tensor of the same shape whose columns are reordered from
      `[c0, c1, c2, c3]` to `[c1, c0, c3, c2]`.
    """
    first, second = boxes[:, 0], boxes[:, 1]
    third, fourth = boxes[:, 2], boxes[:, 3]
    swapped_columns = [second, first, fourth, third]
    return tf.stack(swapped_columns, axis=-1)


def to_xyxy(bbox):
    """Converts `[x, y, width, height]` boxes to `[x1, y1, x2, y2]` corners.

    Arguments:
      bbox: A tensor with shape `(num_boxes, 4)`.

    Returns:
      A tensor of the same shape where the last two columns are the first
      two columns plus the original width and height.
    """
    left, top = bbox[:, 0], bbox[:, 1]
    box_width, box_height = bbox[:, 2], bbox[:, 3]
    return tf.stack([left, top, box_width + left, box_height + top], axis=-1)


def normalize_bbox(bbox, w=1622, h=626):
    """Divides box coordinates by the image size.

    Arguments:
      bbox: A tensor with shape `(num_boxes, 4)`; columns 0 and 2 are divided
        by `w`, columns 1 and 3 by `h`.
      w: Image width used as divisor for the x columns.
      h: Image height used as divisor for the y columns.

    Returns:
      A tensor with shape `(num_boxes, 4)` holding the scaled coordinates.
    """
    divisors = (w, h, w, h)
    scaled_columns = [bbox[:, col] / divisor for col, divisor in enumerate(divisors)]
    return tf.stack(scaled_columns, axis=-1)


def convert_to_xywh(boxes):
    """Converts corner boxes to center / size format.

    Arguments:
      boxes: A tensor of rank 2 or higher with a shape of `(..., num_boxes, 4)`
        where each box is `[xmin, ymin, xmax, ymax]`.

    Returns:
      A tensor of the same shape where each box is
      `[center_x, center_y, width, height]`.
    """
    min_corner, max_corner = boxes[..., :2], boxes[..., 2:]
    centers = (min_corner + max_corner) / 2.0
    sizes = max_corner - min_corner
    return tf.concat([centers, sizes], axis=-1)


def convert_to_corners(boxes):
    """Converts center / size boxes to corner format.

    Arguments:
      boxes: A tensor of rank 2 or higher with a shape of `(..., num_boxes, 4)`
        where each box is `[center_x, center_y, width, height]`.

    Returns:
      A tensor of the same shape where each box is
      `[xmin, ymin, xmax, ymax]`.
    """
    centers = boxes[..., :2]
    half_sizes = boxes[..., 2:] / 2.0
    return tf.concat([centers - half_sizes, centers + half_sizes], axis=-1)


def compute_iou(boxes1, boxes2):
    """Builds the pairwise intersection-over-union matrix of two box sets.

    Arguments:
      boxes1: A tensor with shape `(N, 4)` where each box is
        `[x, y, width, height]` (center format, as expected by
        `convert_to_corners`).
      boxes2: A tensor with shape `(M, 4)` in the same format.

    Returns:
      A tensor with shape `(N, M)`; entry `[i, j]` is the IOU between box `i`
      of `boxes1` and box `j` of `boxes2`, clipped to `[0, 1]`.
    """
    corners1 = convert_to_corners(boxes1)
    corners2 = convert_to_corners(boxes2)

    # Broadcasting (N, 1, 2) against (M, 2) yields every pair of boxes.
    overlap_min = tf.maximum(corners1[:, None, :2], corners2[:, :2])
    overlap_max = tf.minimum(corners1[:, None, 2:], corners2[:, 2:])
    overlap_size = tf.maximum(0.0, overlap_max - overlap_min)
    overlap_area = overlap_size[:, :, 0] * overlap_size[:, :, 1]

    area1 = boxes1[:, 2] * boxes1[:, 3]
    area2 = boxes2[:, 2] * boxes2[:, 3]
    # The floor keeps the division below finite for degenerate boxes.
    union_area = tf.maximum(area1[:, None] + area2 - overlap_area, 1e-8)

    iou = overlap_area / union_area
    return tf.clip_by_value(iou, 0.0, 1.0)


def visualize_detections(
    image, boxes, classes, scores, figsize=(15, 15), linewidth=2, color=[1, 0, 0],
    box_true=None, label_true=None, save_path=''
):
    """Draws predicted (and optionally ground-truth) boxes on an image.

    Arguments:
      image: Array-like image; converted to `uint8` before display.
      boxes: Predicted boxes, each `[x1, y1, x2, y2]`.
      classes: Class name/id of every predicted box (used in the caption).
      scores: Score of every predicted box (used in the caption).
      figsize: Matplotlib figure size.
      linewidth: Line width of the predicted boxes.
      color: Edge and caption colour of the predicted boxes.
      box_true: Optional ground-truth boxes, each `[x, y, width, height]`.
      label_true: Ground-truth labels; ground-truth boxes are only drawn when
        both `box_true` and `label_true` are given.
      save_path: When non-empty the figure is written to this path and then
        closed; otherwise it is shown with `plt.show()`.

    Returns:
      The matplotlib axes the detections were drawn on.
    """
    image = np.array(image, dtype=np.uint8)
    figure = plt.figure(figsize=figsize)

    plt.axis("off")
    plt.imshow(image)
    ax = plt.gca()
    for i in range(len(boxes)):
        box, _cls, score = boxes[i], classes[i], scores[i]

        text = "{}: {:.2f}".format(_cls, score)
        x1, y1, x2, y2 = box
        w, h = x2 - x1, y2 - y1
        patch = plt.Rectangle(
            [x1, y1], w, h, fill=False, edgecolor=color, linewidth=linewidth
        )
        ax.add_patch(patch)
        ax.text(
            x1,
            y1,
            text,
            bbox={"facecolor": color, "alpha": 0.4},
            clip_box=ax.clipbox,
            clip_on=True,
        )

    if box_true is not None and label_true is not None:
        # Ground-truth boxes are drawn in white without a caption.
        for i in range(len(box_true)):
            x1, y1, w, h = box_true[i]
            patch = plt.Rectangle(
                [x1, y1], w, h, fill=False,
                edgecolor=[1,1,1], linewidth=3
            )
            ax.add_patch(patch)

    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
        # Release the figure so repeated calls (e.g. one per predicted image)
        # do not pile up open figures in pyplot.
        plt.close(figure)
    else:
        plt.show()

    return ax


def try_ignore_error(func, *argv, **kwargs):
    """
    Call `func` and turn any exception into a printed warning.
    @params:
      + func: function
      + *argv: positional arguments of func
      + **kwargs: keyword arguments of func
    @returns:
      + the value returned by func, or None when func raised
    """
    try:
        return func(*argv, **kwargs)
    except Exception as e:
        # Best-effort call: report the failure and carry on.
        print("WARN: ", e)
        return None


In [None]:
from genericpath import exists
import json
import tensorflow as tf
import model as m
import data_processing
import losses
import utils
import argparse
import os

def parse_args(argv=None):
    """Parses the command-line options of the training script.

    Arguments:
      argv: Optional list of argument strings. When None (the default)
        argparse reads `sys.argv[1:]`; pass an explicit list (e.g. `[]`) when
        calling from a notebook, where `sys.argv` holds kernel arguments.

    Returns:
      The populated `argparse.Namespace`.
    """
    parser = argparse.ArgumentParser(description='Traffic sign detection')
    parser.add_argument("--input", dest="input_path",
                        metavar="I", type=str, default="/data/images",
                        help="Path to training images")
    parser.add_argument("--backbone", type=str, default='resnet50')
    parser.add_argument("--init-from", type=str, default='resnet50',
                        help='Path to pretrained weight or backbone name')
    parser.add_argument("--batch-size", type=int, default=2)
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--n-classes", type=int, default=7)
    parser.add_argument("--checkpoint-dir", type=str, default='weights')
    parser.add_argument("--force-tfrec", action='store_true')
    parser.add_argument("--debug-samples", type=int, default=0)

    return parser.parse_args(argv)

def main(args):
    """Builds the input pipeline and trains RetinaNet.

    Arguments:
      args: Namespace produced by `parse_args`. The fields read here are
        `checkpoint_dir`, `force_tfrec`, `input_path`, `batch_size`,
        `debug_samples`, `backbone`, `n_classes` and `init_from`.

    Returns:
      The history object returned by `model.fit`.
    """
    TFRECORDS_FILE = "/tmp/images.tfrecords"
    # Close the annotation file as soon as it has been parsed.
    with open("./train_traffic_sign_dataset.json", "r") as metadata_file:
        metadata = json.load(metadata_file)
    os.makedirs(args.checkpoint_dir, exist_ok=True)

    if args.force_tfrec or not os.path.isfile(TFRECORDS_FILE):
        print("Create tfrecords dataset")
        data_processing.write_tfrecords(
            data_processing.create_dataset_list(metadata["annotations"]),
            TFRECORDS_FILE,
            args.input_path
        )

    autotune = tf.data.experimental.AUTOTUNE
    batch_size = args.batch_size

    fdataset = tf.data.TFRecordDataset(TFRECORDS_FILE)
    data_processor = data_processing.DataProcessing(width=400, height=154)
    label_encoder = m.LabelEncoder()
    dataset = fdataset.map(data_processor.preprocess_data)
    # NOTE(review): the shuffle buffer equals the batch size, so examples are
    # only shuffled within a very small window - confirm this is intended.
    dataset = dataset.shuffle(batch_size)
    dataset = dataset.padded_batch(
        batch_size,
        padding_values=(0.0, 1e-8, tf.cast(-1, tf.int64)),
        drop_remainder=True,
    )
    dataset = dataset.map(
        label_encoder.encode_batch, num_parallel_calls=autotune
    )
    dataset = dataset.apply(tf.data.experimental.ignore_errors())
    dataset = dataset.prefetch(autotune)

    train_size = args.debug_samples or 4500
    train_data = dataset
    # At least one step per epoch, otherwise a `--debug-samples` value below
    # the batch size makes the epoch computation divide by zero.
    train_steps_per_epoch = max(train_size // batch_size, 1)
    train_steps = 6 * 10000
    # NOTE(review): the epoch count is derived from `train_steps`;
    # `args.epochs` is not consulted.
    epochs = train_steps // train_steps_per_epoch

    learning_rates = [1e-4, 0.000625, 0.00125, 0.0025, 0.00025, 2.5e-05]
    learning_rate_boundaries = [125, 250, 500, 240000, 360000]
    learning_rate_fn = tf.optimizers.schedules.PiecewiseConstantDecay(
        boundaries=learning_rate_boundaries, values=learning_rates
    )
    optimizer = tf.optimizers.SGD(learning_rate=learning_rate_fn, momentum=0.9)

    callbacks_list = [
        tf.keras.callbacks.ModelCheckpoint(
            filepath=os.path.join(args.checkpoint_dir, f'weight_{args.backbone}.h5'),
            monitor="loss",
            save_best_only=False,
            save_weights_only=True,
            verbose=1,
        )
    ]

    model = m.RetinaNet(args.n_classes, backbone=args.backbone)
    model.compile(optimizer=optimizer, loss=losses.RetinaNetLoss(args.n_classes))
    model.build((1, None, None, 3))
    # Loading initial weights is best-effort: a failure is only reported.
    utils.try_ignore_error(model.load_weights, args.init_from)

    history = model.fit(train_data.repeat(),
                        epochs=epochs,
                        steps_per_epoch=train_steps_per_epoch,
                        callbacks=callbacks_list)

    return history


# Script entry point: parse the command line, then start training.
if __name__ == '__main__':
    cli_args = parse_args()
    main(cli_args)

In [None]:
import tensorflow as tf
import numpy as np
import data_augmentation as augmentation
from utils import (
    resize_and_pad_image,
    swap_xy,
    convert_to_xywh,
    convert_to_corners,
    to_xyxy,
    normalize_bbox,
)
import math
import os
from tqdm import tqdm

# Schema of one serialized example: every field is a single byte string.
image_feature_description = {
    field_name: tf.io.FixedLenFeature([], tf.string)
    for field_name in ("bbox", "label", "image")
}

def bytes_feature(value):
    """Wraps a single string / byte value in a bytes_list feature."""
    eager_tensor_type = type(tf.constant(0))
    if isinstance(value, eager_tensor_type):
        # BytesList cannot unpack an EagerTensor, so extract its payload first.
        value = value.numpy()
    wrapped = tf.train.BytesList(value=[value])
    return tf.train.Feature(bytes_list=wrapped)


def float_feature(value):
    """Wraps a single float / double in a float_list feature."""
    wrapped = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=wrapped)


def int64_feature(value):
    """Wraps a single bool / enum / int / uint in an int64_list feature."""
    wrapped = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=wrapped)


def float_array_feature(value):
    """Wraps an iterable of floats in a float_list feature (passed as is)."""
    wrapped = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=wrapped)


def has_small_bbox(bboxes, min_area=650):
    """Tells whether any box covers less than `min_area`.

    Arguments:
      bboxes: A tensor with shape `(num_boxes, 4)` where each box is
        `[x1, y1, x2, y2]`.
      min_area: Area threshold in squared coordinate units. Defaults to 650,
        the value that was previously hard-coded.

    Returns:
      A scalar boolean tensor, True when at least one box area is strictly
      below `min_area`.
    """
    widths = bboxes[:, 2] - bboxes[:, 0]
    heights = bboxes[:, 3] - bboxes[:, 1]
    areas = widths * heights
    threshold = tf.cast(min_area, dtype=tf.float32)
    return tf.math.reduce_any(tf.math.less(areas, threshold))

def create_dataset_list(annotations):
    """Groups per-object annotations by image.

    Arguments:
      annotations: Iterable of dicts with the keys `image_id`, `bbox` and
        `category_id`.

    Returns:
      A list with one dict per image, in order of first appearance, of the
      form `{"id": image_id, "bbox": [...], "label": [...]}`.
    """
    grouped = {}
    for annotation in annotations:
        image_id = annotation.get("image_id")
        entry = grouped.get(image_id)
        if entry is None:
            # First annotation seen for this image: start its record.
            grouped[image_id] = {
                "id": image_id,
                "bbox": [annotation["bbox"]],
                "label": [annotation["category_id"]],
            }
            continue
        entry["bbox"].append(annotation["bbox"])
        entry["label"].append(annotation["category_id"])

    return list(grouped.values())


def image_example(image_string, label, bbox):
    """Packs one image with its labels and boxes into a `tf.train.Example`.

    All three arguments are stored as byte strings under the keys `image`,
    `label` and `bbox` respectively.
    """
    payloads = (("bbox", bbox), ("label", label), ("image", image_string))
    feature = {key: bytes_feature(raw) for key, raw in payloads}
    features = tf.train.Features(feature=feature)

    return tf.train.Example(features=features)


def write_tfrecords(data, file_path, train_dir):
    """Serializes images with their labels and boxes into a TFRecord file.

    Arguments:
      data: Iterable of dicts with the keys `id`, `label` and `bbox`, as
        produced by `create_dataset_list`.
      file_path: Destination TFRecord file.
      train_dir: Either the directory that holds the `<id>.png` files (when
        the path ends with `images`) or its parent, in which case the images
        are read from `<train_dir>/images`.
    """
    # Only the end of the path decides whether it already points at the image
    # directory; "images" occurring elsewhere in the path is left untouched.
    if train_dir.endswith("images"):
        images_dir = train_dir
    else:
        images_dir = os.path.join(train_dir, "images")

    with tf.io.TFRecordWriter(file_path) as writer:
        for img_info in tqdm(data):
            ipath = os.path.join(images_dir, "{}.png".format(img_info["id"]))
            with open(ipath, "rb") as image_file:
                image_string = image_file.read()
            # The reader decodes both fields as int64, so the dtype is pinned
            # instead of relying on numpy's platform/input dependent default.
            tf_example = image_example(
                image_string,
                np.array(img_info["label"], dtype=np.int64).tobytes(),
                np.array(img_info["bbox"], dtype=np.int64).tobytes(),
            )
            writer.write(tf_example.SerializeToString())

class DataProcessing:
    """Turns serialized TFRecord examples into `(image, bbox, label)` samples.

    Arguments:
      origin_width, origin_height: Size in pixels of the source images.
      width, height: Size in pixels of the crop window.
      augment: When True `preprocess_data` runs the augmentation pipeline;
        when False it only crops and resizes back to the origin size.
      mix_iterator: Stored on the instance; not used by the methods below.
      convert_xywh: When True boxes are returned as
        `[center_x, center_y, width, height]` instead of corners.
      random_cropping: Enables the random crop in the augmentation pipeline.
      dynamic_size: When True the origin size is read from every decoded
        image instead of the constructor arguments.
      overlap_x: Horizontal offset in pixels subtracted from the tile starts
        computed by `get_slice_indices`.

    NOTE(review): with `dynamic_size` the origin size attributes are replaced
    by tensors taken from `tf.shape(image)` while `scale_x` / `scale_y` keep
    the values derived from the constructor arguments - confirm this is
    intended.
    """

    def __init__(self, origin_width=1622, origin_height=626 , width=400,
                height=154, augment=True, mix_iterator=None,convert_xywh=True,
                random_cropping=True, dynamic_size=False, overlap_x=0):
        self.origin_width = origin_width
        self.origin_height = origin_height
        self.dynamic_size = dynamic_size
        self.width = width
        self.height = height
        self.random_cropping = random_cropping
        self.overlap_x = overlap_x
        # Factors that map crop coordinates back to the origin resolution.
        self.scale_x = self.origin_width / self.width
        self.scale_y = self.origin_height / self.height
        self.convert_xywh = convert_xywh
        self.augment = augment
        self.mix_iterator = mix_iterator

    def set_width(self, width):
        """Sets the crop width and refreshes the horizontal scale factor."""
        self.width = width
        self.scale_x = self.origin_width / self.width

    def set_height(self, height):
        """Sets the crop height and refreshes the vertical scale factor."""
        self.height = height
        self.scale_y = self.origin_height / self.height

    def moved_box(self, box, x1, y1, width, height):
        """Expresses boxes relative to a crop origin, scaled to origin size.

        Arguments:
          box: A tensor with shape `(num_boxes, 4)`, each `[x1, y1, x2, y2]`.
          x1, y1: Top-left corner of the crop.
          width, height: Accepted for interface compatibility; not used.

        Returns:
          A tensor with shape `(num_boxes, 4)` where the crop origin has been
          subtracted and the result multiplied by `scale_x` / `scale_y`.
        """
        x1, y1 = tf.cast(x1, tf.float32), tf.cast(y1, tf.float32)

        return tf.stack([
            (box[:, 0] - x1) * self.scale_x,
            (box[:, 1] - y1) * self.scale_y,
            (box[:, 2] - x1) * self.scale_x,
            (box[:, 3] - y1) * self.scale_y,
        ], axis=1)


    def get_slice_indices(self):
        """Returns `[start, end]` column ranges that tile the origin width.

        Every range is `self.width` pixels wide (or the full origin width
        when the image is narrower than one tile). A tile that would run past
        the right border is shifted left so that it ends exactly on it.
        """
        num_paths = math.ceil(self.origin_width / self.width)
        slices = []
        for i in range(num_paths):
            start = max(self.width * i - self.overlap_x, 0)
            end = start + self.width
            if end > self.origin_width:
                end = self.origin_width
                start = max(end - self.width, 0)

            slices.append([start, end])

        return slices

    def random_crop(self, image, bbox, labels):
        """Crops a `width` x `height` window containing a randomly chosen box.

        Arguments:
          image: Image tensor of shape `(height, width, 3)`.
          bbox: A tensor with shape `(num_boxes, 4)`, each `[x1, y1, x2, y2]`.
          labels: A tensor with one label per box.

        Returns:
          cropped: The cropped image.
          bbox: Boxes kept for the crop, moved and scaled by `moved_box`.
          labels: Labels of the kept boxes.
        """
        width = self.width
        height = self.height
        idx = tf.random.uniform((), 0, tf.shape(bbox)[0], tf.int32)
        selected_box = bbox[idx]
        x1, y1, x2, y2 = tf.unstack(selected_box, axis=0)
        x1 = tf.cast(x1, tf.int32)
        x2 = tf.cast(x2, tf.int32)
        y1 = tf.cast(y1, tf.int32)
        y2 = tf.cast(y2, tf.int32)

        # Boxes may stick out of the crop by up to `accept_ratio` times the
        # mean box width and still be kept.
        accept_ratio = 0.6
        mean_x1, mean_x2 = tf.reduce_mean(bbox[:, 0]), tf.reduce_mean(bbox[:, 2])
        pad_size = accept_ratio * (mean_x2 - mean_x1)

        # Draw a crop origin up to one window size above/left of the box.
        x1 = tf.random.uniform((), x1 - width, x1, dtype=tf.int32)
        y1 = tf.random.uniform((), y1 - height, y1, dtype=tf.int32)

        # Keep the window inside the image.
        if tf.less(x1, 0):
            x1 = 0

        if tf.less(y1, 0):
            y1 = 0

        if tf.greater(x1 + width, self.origin_width):
            x1 = self.origin_width - width

        if tf.greater(y1 + height, self.origin_height):
            y1 = self.origin_height - height

        # Shift the window so the selected box ends inside it.
        if tf.greater(y2, y1 + height):
            y1 = y1 + (y2 - (y1 + height))

        if tf.greater(x2, x1 + width):
            x1 = x1 + (x2 - (x1 + width))

        # [height, width, channels]
        cropped = tf.slice(image, [y1, x1, 0], [height, width, 3])

        x1 = tf.cast(x1, tf.float32)
        y1 = tf.cast(y1, tf.float32)
        width = tf.cast(width, tf.float32)
        height = tf.cast(height, tf.float32)

        # Filter out boxes that do not lie inside the (padded) crop.
        x1_b, y1_b, x2_b, y2_b = tf.unstack(bbox, axis=1)

        # Left edge right of the padded crop start and right edge left of the
        # padded crop end.
        x_condition = tf.logical_and(
            tf.greater(x1_b, x1 - pad_size),
            tf.less(x2_b, x1 + width + pad_size)
        )
        # Top edge below the padded crop start and bottom edge above the
        # padded crop end.
        y_condition = tf.logical_and(
            tf.greater(y1_b, y1 - pad_size),
            tf.less(y2_b, y1 + height + pad_size)
        )

        cond = tf.logical_and(x_condition, y_condition)
        positive_mask = tf.where(cond)

        bbox = self.moved_box(bbox, x1, y1, width, height)
        bbox = tf.gather_nd(bbox, positive_mask)
        labels = tf.gather_nd(labels, positive_mask)

        return cropped, bbox, labels

    def preprocess_data(self, example):
        """
        Applies preprocessing step to a single example

        Arguments:
          example: A serialized example following `image_feature_description`.

        Returns:
          image: The processed image.
          bbox: Boxes of the image, in center format when `convert_xywh` is
            set, otherwise as corners.
          label: Labels of the returned boxes.
        """
        sample = tf.io.parse_single_example(example, image_feature_description)
        # The crop below slices exactly 3 channels, so decode to RGB whatever
        # the channel count stored in the file.
        image = tf.image.decode_png(sample["image"], channels=3)
        bbox = tf.cast(
            tf.io.decode_raw(sample["bbox"], out_type=tf.int64), dtype=tf.float32
        )

        label = tf.io.decode_raw(sample["label"], out_type=tf.int64)
        bbox = to_xyxy(tf.reshape(bbox, (-1, 4)))

        if self.dynamic_size:
            shape = tf.shape(image)
            self.origin_width = shape[1]
            self.origin_height = shape[0]

        if not self.augment:
            image, bbox, label = self.random_crop(image, bbox, label)
            image = tf.image.resize(image, (self.origin_height, self.origin_width))
            if self.convert_xywh:
                bbox = convert_to_xywh(bbox)
            return image, bbox, label

        # Data augmentation
        image = augmentation.random_adjust_brightness(image)
        image = augmentation.random_adjust_contrast(image)
        # crop the region contain at least 1 bounding box
        has_smallb = has_small_bbox(bbox)
        if self.random_cropping and tf.logical_or(has_smallb, tf.random.uniform(()) > 0.5):
            image, bbox, label = self.random_crop(image, bbox, label)

        bbox = normalize_bbox(bbox, self.origin_width, self.origin_height)
        image, bbox = augmentation.random_flip_horizontal(image, bbox)

        if not has_smallb:
            image = augmentation.random_gaussian_blur(image, 0.5)

        image, image_shape, _ = resize_and_pad_image(image, jitter=None)
        # `image_shape` is ordered (height, width).
        resized_height, resized_width = image_shape[0], image_shape[1]

        # Scale the normalized boxes to the resized image.
        bbox = tf.stack([
            bbox[:, 0] * resized_width,
            bbox[:, 1] * resized_height,
            bbox[:, 2] * resized_width,
            bbox[:, 3] * resized_height,
        ], axis=-1)

        if self.convert_xywh:
            bbox = convert_to_xywh(bbox)

        return image, bbox, label


In [None]:
import tensorflow as tf

class RetinaNetBoxLoss(tf.losses.Loss):
    """Implements Smooth L1 loss

    The loss is quadratic for absolute errors below `delta` and linear above
    it. The linear branch is `delta * (|d| - 0.5 * delta)`, which meets the
    quadratic branch at `|d| == delta` for every `delta` and equals the
    former `|d| - 0.5` when `delta` is 1.
    """

    def __init__(self, delta):
        super(RetinaNetBoxLoss, self).__init__(
            reduction="none", name="RetinaNetBoxLoss"
        )
        self._delta = delta

    def call(self, y_true, y_pred):
        """Returns the loss summed over the last axis."""
        difference = y_true - y_pred
        absolute_difference = tf.abs(difference)
        squared_difference = difference ** 2
        loss = tf.where(
            tf.less(absolute_difference, self._delta),
            0.5 * squared_difference,
            self._delta * (absolute_difference - 0.5 * self._delta),
        )
        return tf.reduce_sum(loss, axis=-1)

class RetinaNetClassificationLoss(tf.losses.Loss):
    """Implements Focal loss

    Arguments:
      alpha: Weight of the positive entries; negatives use `1 - alpha`.
      gamma: Focusing exponent applied to `1 - pt`.
      label_smoothing: Stored for reference. Positive entries are detected
        with a 0.5 threshold, which holds for hard one-hot labels and for
        smoothed labels alike.
    """

    def __init__(self, alpha, gamma, label_smoothing):
        super(RetinaNetClassificationLoss, self).__init__(
            reduction="none", name="RetinaNetClassificationLoss"
        )
        self._alpha = alpha
        self._gamma = gamma
        self._label_smoothing = label_smoothing

    def call(self, y_true, y_pred):
        """Returns the focal loss summed over the class axis.

        `y_true` holds (optionally smoothed) one-hot targets and `y_pred` the
        matching logits.
        """
        cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=y_true, logits=y_pred
        )
        probs = tf.nn.sigmoid(y_pred)

        # A smoothed positive is 0.9 + 0.1 / num_classes, which drops to the
        # former 0.91 cut-off or below once there are 10 or more classes;
        # 0.5 separates positives from negatives for any class count.
        is_positive = tf.greater(y_true, 0.5)
        alpha = tf.where(is_positive, self._alpha, (1.0 - self._alpha))
        pt = tf.where(is_positive, probs, 1 - probs)

        loss = alpha * tf.pow(1.0 - pt, self._gamma) * cross_entropy
        return tf.reduce_sum(loss, axis=-1)

class RetinaNetLoss(tf.losses.Loss):
    """Combines the classification and the box regression loss.

    `y_true` and `y_pred` are indexed as `(batch, anchors, channels)`: the
    first four channels are box values; channel 4 of `y_true` is the class id
    (`-1` marks a non-positive anchor, `-2` an ignored one) and channels 4+ of
    `y_pred` are the class logits.
    """

    def __init__(self, num_classes=80, alpha=0.25,
                gamma=2.0, delta=1.0, label_smoothing=False,
                ):
        super().__init__(reduction="auto", name="RetinaNetLoss")
        self._num_classes = num_classes
        self._label_smoothing = label_smoothing
        self._factor = 0.1
        self._max_label = (1 - self._factor)
        self._box_loss = RetinaNetBoxLoss(delta)
        self._clf_loss = RetinaNetClassificationLoss(alpha, gamma, label_smoothing)

    def call(self, y_true, y_pred):
        """Returns one loss value per batch element."""
        y_pred = tf.cast(y_pred, dtype=tf.float32)
        class_ids = y_true[:, :, 4]

        box_targets, box_outputs = y_true[:, :, :4], y_pred[:, :, :4]
        cls_outputs = y_pred[:, :, 4:]
        cls_targets = tf.one_hot(
            tf.cast(class_ids, dtype=tf.int32),
            depth=self._num_classes,
            dtype=tf.float32,
        )
        if self._label_smoothing:
            cls_targets = _smooth_labels(cls_targets)

        positive_mask = tf.cast(tf.greater(class_ids, -1.0), dtype=tf.float32)
        ignore_mask = tf.cast(tf.equal(class_ids, -2.0), dtype=tf.float32)

        clf_loss = self._clf_loss(cls_targets, cls_outputs)
        box_loss = self._box_loss(box_targets, box_outputs)

        if self._label_smoothing:
            is_ignored = tf.greater(ignore_mask, 0.8)
            is_positive = tf.equal(positive_mask, 1)
        else:
            is_ignored = tf.equal(ignore_mask, 1.0)
            is_positive = tf.equal(positive_mask, 1.0)

        # Ignored anchors add no classification loss; only positive anchors
        # add box loss.
        clf_loss = tf.where(is_ignored, 0.0, clf_loss)
        box_loss = tf.where(is_positive, box_loss, 0.0)

        # Both terms are averaged over the number of positive anchors.
        normalizer = tf.reduce_sum(positive_mask, axis=-1)
        clf_total = tf.reduce_sum(clf_loss, axis=-1)
        box_total = tf.reduce_sum(box_loss, axis=-1)

        return (
            tf.math.divide_no_nan(clf_total, normalizer)
            + tf.math.divide_no_nan(box_total, normalizer)
        )


def _smooth_labels(labels):
    """Apply label smoothing"""
    factor = 0.1
    labels = labels * (1 - factor)
    labels = labels + (factor / tf.cast(tf.shape(labels)[1], tf.float32))

    return labels


In [None]:
import tensorflow as tf
import numpy as np
import math

# Horizontal motion-blur kernel for a depthwise convolution over 3 channels:
# shape (size, size, 3, 1), the middle row averages `size` neighbours.
size = 3
kernel_motion_blur = np.zeros((size, size))
kernel_motion_blur[(size - 1) // 2, :] = np.ones(size) / size
kernel_motion_blur = np.tile(kernel_motion_blur[:, :, None, None], (1, 1, 3, 1))
kernel_motion_blur = tf.cast(kernel_motion_blur, tf.float32)


def random_flip_horizontal(image, boxes, prob=0.5):
    """Randomly mirrors an image and its boxes left to right.

    Arguments:
      image: Image tensor.
      boxes: A tensor with shape `(num_boxes, 4)` of `[x1, y1, x2, y2]`
        boxes; the x columns are mirrored as `1 - x`.
      prob: Threshold for the random draw. The flip is applied when a
        `tf.random.uniform(())` sample is greater than `prob`.

    Returns:
      The (possibly flipped) image and boxes.
    """
    draw = tf.random.uniform(())
    if draw > prob:
        left, top = boxes[:, 0], boxes[:, 1]
        right, bottom = boxes[:, 2], boxes[:, 3]
        image = tf.image.flip_left_right(image)
        # Mirroring swaps the roles of the left and right edges.
        boxes = tf.stack([1 - right, top, 1 - left, bottom], axis=-1)
    return image, boxes


def random_adjust_contrast(image, prob=0.5):
    """Randomly rescales the contrast of an image.

    Arguments:
      image: Image tensor.
      prob: Threshold for the random draw. The adjustment is applied when a
        `tf.random.uniform(())` sample is greater than `prob`.

    Returns:
      The image, with its contrast adjusted by a factor drawn from
      `[0.5, 2.0)` when the draw succeeds, unchanged otherwise.
    """
    if tf.random.uniform(()) > prob:
        contrast_factor = tf.random.uniform((), 0.5, 2.0)
        image = tf.image.adjust_contrast(image, contrast_factor)
    return image


def random_adjust_brightness(image, prob=0.5):
    """Randomly perturbs the brightness of an image.

    Arguments:
      image: Image tensor.
      prob: Threshold for the random draw. The adjustment is applied when a
        `tf.random.uniform(())` sample is greater than `prob`.

    Returns:
      The image passed through `tf.image.random_brightness` with a maximum
      delta of 0.06 when the draw succeeds, unchanged otherwise.
    """
    if tf.random.uniform(()) > prob:
        image = tf.image.random_brightness(image, 0.06)
    return image


def _gaussian_kernel(kernel_size, sigma, n_channels, dtype):
    """Builds a normalized 2-D Gaussian kernel for a depthwise convolution.

    Arguments:
      kernel_size: Number of taps per side.
      sigma: Standard deviation of the Gaussian.
      n_channels: Number of channels the kernel is repeated for.
      dtype: Floating dtype of the kernel.

    Returns:
      A tensor with shape `(kernel_size, kernel_size, n_channels, 1)`.
    """
    offsets = tf.range(-kernel_size // 2 + 1, kernel_size // 2 + 1, dtype=dtype)
    variance_term = 2 * tf.pow(tf.cast(sigma, dtype), 2)
    weights_1d = tf.math.exp(-(tf.pow(offsets, 2) / variance_term))
    # The outer product of the 1-D weights sums to (sum of weights) squared.
    normalizer = tf.pow(tf.reduce_sum(weights_1d), 2)
    kernel_2d = tf.tensordot(weights_1d, weights_1d, axes=0) / normalizer
    per_channel = tf.tile(tf.expand_dims(kernel_2d, axis=-1), (1, 1, n_channels))
    return tf.expand_dims(per_channel, axis=-1)


def random_gaussian_blur(img, prob=0.9):
    """Randomly blurs an image with a Gaussian or a motion-blur kernel.

    Arguments:
      img: Image tensor of shape `(height, width, 3)`.
      prob: Threshold for the random draw. The blur is applied when a
        `tf.random.uniform(())` sample is greater than `prob`.

    Returns:
      The blurred image cast back to the dtype of `img`, or `img` itself when
      the draw does not succeed. Both outcomes therefore share one dtype.
    """
    if tf.random.uniform(()) > prob:
        source = tf.convert_to_tensor(img)
        blurred = tf.cast(source, dtype=tf.float32)
        # Pick one of the two kernels with equal chance.
        if tf.random.uniform(()) > 0.5:
            kernel = _gaussian_kernel(7, 3, 3, blurred.dtype)
        else:
            kernel = kernel_motion_blur
        # The convolution works on batches, hence the temporary leading axis.
        blurred = tf.nn.depthwise_conv2d(blurred[None], kernel, [1, 1, 1, 1], "SAME")

        return tf.cast(blurred[0], dtype=source.dtype)

    return img
