In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.initializers import RandomNormal, Constant
from tensorflow.keras.layers import (Input,
                                     Conv2D, 
                                     Concatenate,
                                     BatchNormalization,
                                     Lambda,
                                     ReLU,
                                     Reshape,
                                     Add)
from tensorflow.keras import backend as K
print('TensorFlow:', tf.__version__)

TensorFlow: 2.0.0


In [2]:
def conv_block(input_tensor=None,
               filters=None,
               kernel_size=None,
               strides=1,
               padding='same',
               kernel_init='he_normal',
               bias_init='zeros',
               bn_act=True,
               name_prefix=None):
    
    _x = Conv2D(filters=filters, kernel_size=kernel_size,
                padding=padding, strides=strides,
                kernel_initializer=kernel_init,
                bias_initializer=bias_init,
                name='{}_conv_{}x{}'.format(name_prefix,
                                            kernel_size,
                                            kernel_size))(input_tensor)
    if bn_act:
        _x = BatchNormalization(
            name='{}_bn'.format(name_prefix))(_x)
        _x = ReLU(name='{}_relu'.format(name_prefix))(_x)
    return _x


def upsample_like(input_tensor, target_tensor, name=None):
    _, fh, fw, _ = target_tensor.shape
    _upsampled_tensor = tf.image.resize(input_tensor,
                                        size=[fh, fw],
                                        method='nearest', 
                                        name=name)
    return _upsampled_tensor



class Scale(tf.keras.layers.Layer):
    def __init__(self, init_value=1.0, **kwargs):
        super(Scale, self).__init__(**kwargs)
        self.init_value = init_value

    def build(self, input_shape):
        self.scale = \
            self.add_weight(name='scale',
                            shape=[1],
                            dtype=K.floatx(),
                            trainable=True,
                            initializer=Constant(value=self.init_value))

    def call(self, x):
        scaled_inputs = tf.multiply(self.scale, x)
        return tf.exp(scaled_inputs)

    def compute_output_shape(self, input_shape):
        return input_shape

    def get_config(self):
        config = super(Scale, self).get_config()
        return config

In [12]:
class FCOS:
    def __init__(self, config):
        self._validate_config(config)
        for attr in config:
            setattr(self, attr, config[attr])
        self._build_fpn()
        self._build_model()
        self._build_datasets()
        self._build_optimizer()
        self._build_callbacks()

    def _validate_config(self, config):
        attr_list = [
            'mode',
            'distribute_strategy',
            'image_height',
            'image_width',
            'num_classes',
            'data_dir',
            'dataset_fn',
            'batch_size',
            'epochs',
            'learning_rate',
            'model_dir',
            'tensorboard_log_dir'
        ]
        for attr in attr_list:
            assert attr in config, 'Missing {} in config'.format(attr)

    def _build_fpn(self):
        '''
            From the FPN paper, "To start the iteration, we simply attach a
            1×1 convolutional layer on C5 to produce the coarsest resolution
            map. Finally, we append a 3×3 convolution on each merged map to
            generate the final feature map, which is to reduce the aliasing
            effect of upsampling. This final set of feature maps is called
            {P2, P3, P4, P5}, corresponding to {C2, C3, C4, C5} that are
            respectively of the same spatial sizes".
            From the FCOS paper, "P6 and P7 are produced by applying one
            convolutional layer with the stride being 2 on P5 and P6,
            respectively".
        '''
        with self.distribute_strategy.scope():
            print('****Building FPN')
            self._backbone = tf.keras.applications.ResNet50V2(
                input_shape=[self.image_height, self.image_width, 3],
                weights='imagenet',
                include_top=False)
            C5 = self._backbone.get_layer('post_relu').output
            C4 = self._backbone.get_layer('conv4_block6_1_relu').output
            C3 = self._backbone.get_layer('conv3_block4_1_relu').output

            M5 = conv_block(C5, 256, 1, bn_act=False, name_prefix='C5')
            P5 = conv_block(M5, 256, 3, bn_act=False, name_prefix='P5')
            M5_upsampled = upsample_like(M5, C4, name='M5_upsampled')

            M4 = conv_block(C4, 256, 1, bn_act=False, name_prefix='C4')
            M4 = tf.keras.layers.Add(name='M4_M5_add')([M4, M5_upsampled])
            P4 = conv_block(M4, 256, 3, bn_act=False, name_prefix='P4')
            M4_upsampled = upsample_like(M4, C3, name='M4_upsampled')

            M3 = conv_block(C3, 256, 1, bn_act=False, name_prefix='C3')
            P3 = Add(name='M3_M4_add')([M3, M4_upsampled])
            P3 = conv_block(P3, 256, 3, bn_act=False, name_prefix='P3')

            P6 = conv_block(P5, 256, 3, 2, bn_act=False, name_prefix='P6')
            P6_relu = ReLU(name='P6_relu')(P6)
            P7 = conv_block(P6_relu, 256, 3, 2, bn_act=False, name_prefix='P7')

            self._pyramid_features = {
                'P3': P3,
                'P4': P4,
                'P5': P5,
                'P6': P6,
                'P7': P7
            }

    def _get_classification_head(self, p=0.01):
        kernel_init = RandomNormal(0.0, 0.01)
        bias_init = Constant(-np.log((1 - p) / p))

        input_layer = Input(shape=[None, None, 256])
        x = input_layer

        for i in range(4):
            x = conv_block(x, 256, 3, kernel_init=kernel_init,
                           name_prefix='c_head_{}'.format(i))
        classification_logits = conv_block(x, self.num_classes,
                                           3, kernel_init=kernel_init,
                                           bias_init=bias_init, bn_act=False,
                                           name_prefix='cls_logits')
        centerness_logits = conv_block(x, 1, 3,
                                       kernel_init=kernel_init, bn_act=False,
                                       name_prefix='ctr_logits')
        classification_logits = Reshape(
            target_shape=[-1, self.num_classes])(classification_logits)
        centerness_logits = Reshape(target_shape=[-1, 1])(centerness_logits)

        outputs = [classification_logits, centerness_logits]
        return tf.keras.Model(inputs=[input_layer],
                              outputs=[outputs],
                              name='classification_head')

    def _get_regression_head(self):
        '''
            From the FCOS paper, "since the regression targets are always
            positive we employ exp(x) to map any real number to (0, ∞) on
            the top of the regression branch"
        '''
        kernel_init = RandomNormal(0.0, 0.01)
        input_layer = Input(shape=[None, None, 256])
        x = input_layer

        for i in range(4):
            x = conv_block(x, 256, 3, kernel_init=kernel_init,
                           name_prefix='r_head_{}'.format(i))
        regression_logits = conv_block(x, 4, 3, kernel_init=kernel_init,
                                       bn_act=False, name_prefix='reg_logits')
        regression_logits = Reshape(target_shape=[-1, 4])(regression_logits)
        return tf.keras.Model(inputs=[input_layer],
                              outputs=[regression_logits],
                              name='regression_head')

    def _build_model(self):
        with self.distribute_strategy.scope():
            print('****Building FCOS')
            self._classification_head = self._get_classification_head()
            self._regression_head = self._get_regression_head()

            self._classification_logits = []
            self._centerness_logits = []
            self._regression_logits = []

            for i in range(3, 8):
                feature = self._pyramid_features['P{}'.format(i)]
                _cls_head_logits = self._classification_head(feature)
                _reg_head_logits = self._regression_head(feature)
                _reg_head_logits = \
                    Scale(init_value=1.0,
                          name='P{}_reg_outputs'.format(i))(_reg_head_logits)

                self._classification_logits.append(_cls_head_logits[0][0])
                self._centerness_logits.append(_cls_head_logits[0][1])
                self._regression_logits.append(_reg_head_logits)

            self._classification_logits = Concatenate(
                axis=1,
                name='classification_outputs')(self._classification_logits)
            self._centerness_logits = Concatenate(
                axis=1, name='centerness_outputs')(self._centerness_logits)
            self._regression_logits = Concatenate(
                axis=1, name='regression_outputs')(self._regression_logits)

            _image_input = self._backbone.input
            outputs = [self._classification_logits,
                       self._centerness_logits,
                       self._regression_logits]
            self.model = tf.keras.Model(
                inputs=[_image_input], outputs=outputs, name='FCOS')
            self.model.build([self.image_height, self.image_width, 3])

    def _build_datasets(self):
        print('****Building Datasets')
        with self.distribute_strategy.scope():
            self.train_dataset, self.val_dataset =  \
                self.dataset_fn(self.image_height,
                                self.image_width,
                                self.data_dir,
                                self.batch_size)

    def _build_callbacks(self):
        print('****Setting Up Callbacks')
        self.callbacks = [
            TensorBoard(log_dir=self.tensorboard_log_dir),class FCOS:
    def __init__(self, config):
        self._validate_config(config)
        for attr in config:
            setattr(self, attr, config[attr])
        self._build_fpn()
        self._build_model()
#         self._build_datasets()

    def _validate_config(self, config):
        attr_list = [
            'mode',
            'distribute_strategy',
            'image_height',
            'image_width',
            'num_classes',
            'data_dir',
            'dataset_fn',
            'batch_size',
            'epochs',
            'learning_rate',
            'model_dir',
            'tensorboard_log_dir'
        ]
        for attr in attr_list:
            assert attr in config, 'Missing {} in config'.format(attr)

    def _build_fpn(self):
        '''
            From the FPN paper, "To start the iteration, we simply attach a
            1×1 convolutional layer on C5 to produce the coarsest resolution
            map. Finally, we append a 3×3 convolution on each merged map to
            generate the final feature map, which is to reduce the aliasing
            effect of upsampling. This final set of feature maps is called
            {P2, P3, P4, P5}, corresponding to {C2, C3, C4, C5} that are
            respectively of the same spatial sizes".
            From the FCOS paper, "P6 and P7 are produced by applying one
            convolutional layer with the stride being 2 on P5 and P6,
            respectively".
        '''
        with self.distribute_strategy.scope():
            print('****Building FPN')
            self._backbone = tf.keras.applications.ResNet50V2(
                input_shape=[self.image_height, self.image_width, 3],
                weights='imagenet',
                include_top=False)
            C5 = self._backbone.get_layer('post_relu').output
            C4 = self._backbone.get_layer('conv4_block6_1_relu').output
            C3 = self._backbone.get_layer('conv3_block4_1_relu').output

            M5 = conv_block(C5, 256, 1, bn_act=False, name_prefix='C5')
            P5 = conv_block(M5, 256, 3, bn_act=False, name_prefix='P5')
            M5_upsampled = upsample_like(M5, C4, name='M5_upsampled')

            M4 = conv_block(C4, 256, 1, bn_act=False, name_prefix='C4')
            M4 = tf.keras.layers.Add(name='M4_M5_add')([M4, M5_upsampled])
            P4 = conv_block(M4, 256, 3, bn_act=False, name_prefix='P4')
            M4_upsampled = upsample_like(M4, C3, name='M4_upsampled')

            M3 = conv_block(C3, 256, 1, bn_act=False, name_prefix='C3')
            P3 = Add(name='M3_M4_add')([M3, M4_upsampled])
            P3 = conv_block(P3, 256, 3, bn_act=False, name_prefix='P3')

            P6 = conv_block(P5, 256, 3, 2, bn_act=False, name_prefix='P6')
            P6_relu = ReLU(name='P6_relu')(P6)
            P7 = conv_block(P6_relu, 256, 3, 2, bn_act=False, name_prefix='P7')

            self._pyramid_features = {
                'P3': P3,
                'P4': P4,
                'P5': P5,
                'P6': P6,
                'P7': P7
            }

    def _get_classification_head(self, p=0.01):
        kernel_init = RandomNormal(0.0, 0.01)
        bias_init = Constant(-np.log((1 - p) / p))

        input_layer = Input(shape=[None, None, 256])
        x = input_layer

        for i in range(4):
            x = conv_block(x, 256, 3, kernel_init=kernel_init,
                           name_prefix='c_head_{}'.format(i))
        classification_logits = conv_block(x, self.num_classes,
                                           3, kernel_init=kernel_init,
                                           bias_init=bias_init, bn_act=False,
                                           name_prefix='cls_logits')
        centerness_logits = conv_block(x, 1, 3,
                                       kernel_init=kernel_init, bn_act=False,
                                       name_prefix='ctr_logits')
        classification_logits = Reshape(
            target_shape=[-1, self.num_classes])(classification_logits)
        centerness_logits = Reshape(target_shape=[-1, 1])(centerness_logits)

        outputs = [classification_logits, centerness_logits]
        return tf.keras.Model(inputs=[input_layer],
                              outputs=[outputs],
                              name='classification_head')

    def _get_regression_head(self):
        '''
            From the FCOS paper, "since the regression targets are always
            positive we employ exp(x) to map any real number to (0, ∞) on
            the top of the regression branch"
        '''
        kernel_init = RandomNormal(0.0, 0.01)
        input_layer = Input(shape=[None, None, 256])
        x = input_layer

        for i in range(4):
            x = conv_block(x, 256, 3, kernel_init=kernel_init,
                           name_prefix='r_head_{}'.format(i))
        regression_logits = conv_block(x, 4, 3, kernel_init=kernel_init,
                                       bn_act=False, name_prefix='reg_logits')
        regression_logits = Reshape(target_shape=[-1, 4])(regression_logits)
        return tf.keras.Model(inputs=[input_layer],
                              outputs=[regression_logits],
                              name='regression_head')

    def _get_predictions_decoder(self):
        # TODO
        pass

    def _build_model(self):
        with self.distribute_strategy.scope():
            print('****Building FCOS')
            self._classification_head = self._get_classification_head()
            self._regression_head = self._get_regression_head()

            self._classification_logits = []
            self._centerness_logits = []
            self._regression_logits = []

            for i in range(3, 8):
                feature = self._pyramid_features['P{}'.format(i)]
                _cls_head_logits = self._classification_head(feature)
                _reg_head_logits = self._regression_head(feature)
                _reg_head_logits = \
                    Scale(init_value=1.0,
                          name='P{}_reg_outputs'.format(i))(_reg_head_logits)

                self._classification_logits.append(_cls_head_logits[0][0])
                self._centerness_logits.append(_cls_head_logits[0][1])
                self._regression_logits.append(_reg_head_logits)

            self._classification_logits = Concatenate(
                axis=1,
                name='classification_outputs')(self._classification_logits)
            self._centerness_logits = Concatenate(
                axis=1, name='centerness_outputs')(self._centerness_logits)
            self._regression_logits = Concatenate(
                axis=1, name='regression_outputs')(self._regression_logits)

            _image_input = self._backbone.input
            outputs = [self._classification_logits,
                       self._centerness_logits,
                       self._regression_logits]
            self.model = tf.keras.Model(
                inputs=[_image_input], outputs=outputs, name='FCOS')
            self.model.build([self.image_height, self.image_width, 3])

    def _build_datasets(self):
        print('****Building Datasets')
        with self.distribute_strategy.scope():
            self.train_dataset, self.val_dataset =  \
                self.dataset_fn(self.image_height,
                                self.image_width,
                                self.data_dir,
                                self.batch_size)

    def __call__(self):
        # TODO
        pass

    def _classification_loss(self, alpha=0.25, gamma=2):
        # TODO
        #   a) mask negative locations
        #   b) normalize loss value
        def focal_loss(y_true, y_pred):
            y_true = tf.one_hot(
                tf.cast(y_true, dtype=tf.int32), depth=self.num_classes + 1)
            y_true = y_true[:, :, 1:]
            y_pred_ = tf.sigmoid(y_pred)

            at = alpha * y_true + (1 - y_true) * (1 - alpha)
            pt = y_true * y_pred_ + (1 - y_true) * (1 - y_pred_)
            f_loss = at * \
                tf.pow(1 - pt, gamma) * \
                tf.nn.sigmoid_cross_entropy_with_logits(
                    labels=y_true, logits=y_pred)
            return f_loss
        return focal_loss

    def _centerness_loss(self, labels, logits):
        # TODO
        #   a) mask negative locations
        #   b) normalize loss value
        bce_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=labels, logits=logits)
        return bce_loss

    def _regression_loss(self, labels, logits):
        # TODO
        #   a) IOU loss
        #   b) mask negative locations
        #   c) normalize loss value
        pass
            ModelCheckpoint(filepath=self.model_dir + '/ckpt-{epoch:02d}',
                            monitor='val_loss',
                            save_weights_only=True,
                            save_best_only=True)
        ]

    def _build_optimizer(self):
        print('****Setting Up Optimizer')
        self.optimizer = tf.keras.optimizers.Adam(lr=self.learning_rate)

    def _classification_loss(self, alpha=0.25, gamma=2):
        # TODO
        #   a) mask negative locations
        #   b) normalize loss value
        def focal_loss(y_true, y_pred):
            y_true = tf.one_hot(
                tf.cast(y_true, dtype=tf.int32), depth=self.num_classes + 1)
            y_true = y_true[:, :, 1:]
            y_pred_ = tf.sigmoid(y_pred)

            at = alpha * y_true + (1 - y_true) * (1 - alpha)
            pt = y_true * y_pred_ + (1 - y_true) * (1 - y_pred_)
            f_loss = at * \
                tf.pow(1 - pt, gamma) * \
                tf.nn.sigmoid_cross_entropy_with_logits(
                    labels=y_true, logits=y_pred)
            return f_loss
        return focal_loss

    def _centerness_loss(self, labels, logits):
        # TODO
        #   a) mask negative locations
        #   b) normalize loss value
        bce_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=labels, logits=logits)
        return bce_loss

    def _regression_loss(self, labels, logits):
        # TODO
        #   a) IOU loss
        #   b) mask negative locations
        #   c) normalize loss value
        pass

    def train(self):
        loss_dict = {
            'classification_outputs': self._classification_loss(alpha=0.25,
                                                                gamma=2),
            'centerness_outputs': self._centerness_loss,
            'regression_outputs': self._regression_loss
        }
        with self.distribute_strategy.scope():
            self.model.compile(optimizer=self.optimizer,
                               loss=loss_dict)
            # self.model.fit(...)

In [13]:
config = {
    'mode': 'train',
    'distribute_strategy': tf.distribute.OneDeviceStrategy(device='/cpu:0'),
    'image_height': 720,
    'image_width': 1280,
    'num_classes': 10,
    'dataset_fn': None,
    'data_dir': '../tfrecords',
    'batch_size': 4,
    'epochs': 250,
    'learning_rate': 1e-4,
    'model_dir': 'model_files',
    'tensorboard_log_dir': 'logs'
}

In [14]:
fcos = FCOS(config)
dummy_tensor = tf.random.normal(shape=[1, 720, 1280, 3])
dummy_output = fcos.model(dummy_tensor, training=False)

****Building FPN
****Building FCOS


In [15]:
dummy_output

[<tf.Tensor: id=25365, shape=(1, 19220, 10), dtype=float32, numpy=
 array([[[-4.5939984, -4.5933757, -4.6110363, ..., -4.5984144,
          -4.601757 , -4.583133 ],
         [-4.599728 , -4.5907536, -4.6028256, ..., -4.6014833,
          -4.602731 , -4.5780544],
         [-4.600538 , -4.5935106, -4.601371 , ..., -4.6003246,
          -4.603682 , -4.5814824],
         ...,
         [-4.5972333, -4.5994215, -4.592283 , ..., -4.596968 ,
          -4.5902376, -4.603656 ],
         [-4.5970044, -4.5971713, -4.591698 , ..., -4.5915704,
          -4.587244 , -4.5974708],
         [-4.5913167, -4.592581 , -4.587546 , ..., -4.5954504,
          -4.5853906, -4.597687 ]]], dtype=float32)>,
 <tf.Tensor: id=25363, shape=(1, 19220, 1), dtype=float32, numpy=
 array([[[-0.00088442],
         [-0.0060561 ],
         [-0.00249656],
         ...,
         [-0.01048212],
         [-0.00427284],
         [-0.01161309]]], dtype=float32)>,
 <tf.Tensor: id=25361, shape=(1, 19220, 4), dtype=float32, numpy=
 ar

In [16]:
fcos.model.outputs

[<tf.Tensor 'classification_outputs_1/Identity:0' shape=(None, None, 10) dtype=float32>,
 <tf.Tensor 'centerness_outputs_1/Identity:0' shape=(None, None, 1) dtype=float32>,
 <tf.Tensor 'regression_outputs_1/Identity:0' shape=(None, None, 4) dtype=float32>]

In [17]:
fcos._regression_head.summary(), fcos._classification_head.summary()

Model: "regression_head"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_6 (InputLayer)         [(None, None, None, 256)] 0         
_________________________________________________________________
r_head_0_conv_3x3 (Conv2D)   multiple                  590080    
_________________________________________________________________
r_head_0_bn (BatchNormalizat multiple                  1024      
_________________________________________________________________
r_head_0_relu (ReLU)         multiple                  0         
_________________________________________________________________
r_head_1_conv_3x3 (Conv2D)   multiple                  590080    
_________________________________________________________________
r_head_1_bn (BatchNormalizat multiple                  1024      
_________________________________________________________________
r_head_1_relu (ReLU)         multiple              

(None, None)

In [18]:
fcos._pyramid_features

{'P3': <tf.Tensor 'P3_conv_3x3_1/Identity:0' shape=(None, 90, 160, 256) dtype=float32>,
 'P4': <tf.Tensor 'P4_conv_3x3_1/Identity:0' shape=(None, 45, 80, 256) dtype=float32>,
 'P5': <tf.Tensor 'P5_conv_3x3_1/Identity:0' shape=(None, 23, 40, 256) dtype=float32>,
 'P6': <tf.Tensor 'P6_conv_3x3_1/Identity:0' shape=(None, 12, 20, 256) dtype=float32>,
 'P7': <tf.Tensor 'P7_conv_3x3_1/Identity:0' shape=(None, 6, 10, 256) dtype=float32>}

In [19]:
fcos.model.summary()

Model: "FCOS"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_4 (InputLayer)            [(None, 720, 1280, 3 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 726, 1286, 3) 0           input_4[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 360, 640, 64) 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
pool1_pad (ZeroPadding2D)       (None, 362, 642, 64) 0           conv1_conv[0][0]                 
_______________________________________________________________________________________________