<a href="https://colab.research.google.com/github/rainmaker29/Advanced-Regression/blob/master/resnet50.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np

import matplotlib
import matplotlib.pyplot as plt

%matplotlib inline

from __future__ import absolute_import, division, print_function, unicode_literals
from tensorflow.keras import datasets, layers, models
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, InputSpec
import tensorflow_addons as tfa

from tensorflow.keras.layers import BatchNormalization
BatchNormalization._USE_V2_BEHAVIOR = False

# Used to make the tensorflow_addons work (Used for Group Norm Operation)
!pip install typeguard



# Layers

In [None]:
class AdditionLayer(Layer):
  def __init__(self):
    super(self.__class__,self).__init__()
    self.add = tf.keras.layers.Add()
  
  @tf.function
  def call(self,input1,input2):
    return self.add([input1,input2])


In [None]:
class BatchNorm(Layer):
  def __init__(self,scale=True, center=True,name=None):
    super(self.__class__,self).__init__()
    self.bn = BatchNormalization(scale=True,center=True,trainable=True,name=name)

  @tf.function
  def call(self,inputs,training=True):
    return self.bn(inputs,training=training)

In [294]:
class Relu(Layer):
  
  def __init__(self,name='None'):
    super(self.__class__,self).__init__()
    # self.relu = tf.keras.layers.Activation('relu',name=name)
    self.relu = tf.nn.relu
    self._name = name

  
  @tf.function
  def call(self,inputs):
    return self.relu(inputs)

In [295]:
class Convolution(Layer):
  def __init__(self,filters,kernel_size,conv_name,bn_name,padding = 'valid',stride = (1,1),use_bias = True,train_bn = True):
    super(self.__class__,self).__init__()   
    self.conv = tf.keras.layers.Conv2D(filters=filters, kernel_size = kernel_size,name=conv_name
                                       , padding = padding, strides = stride,use_bias=use_bias)
    self.bn = BatchNorm(name=bn_name)
    self.train_bn=train_bn

  @tf.function
  def call(self,inputs):
    x = self.conv(inputs)
    return self.bn(x,training=self.train_bn)

In [331]:
class conv_block(Layer):
  def __init__(self,kernel_size,filters,stage,block,strides = (2,2),use_bias=True,train_bn=True):
    super(self.__class__,self).__init__()
    self.nb_filter1, self.nb_filter2, self.nb_filter3 = filters
    self.conv_name_base = 'res' + str(stage) + block + '_branch'
    self.bn_name_base = 'bn' + str(stage) + block + '_branch'
    self.strides = strides
    self.use_bias = use_bias
    self.train_bn = train_bn
    self.kernel_size = kernel_size
    self.stage = stage
    self.block=block

  @tf.function
  def call(self,inputs):
    x = Convolution(filters = self.nb_filter1, kernel_size = (1,1), conv_name = self.conv_name_base + '2a', 
                    stride=self.strides ,bn_name = self.bn_name_base + '2a',use_bias=self.use_bias,train_bn=self.train_bn)(inputs)
    x = Relu()(x)

    x = Convolution(filters = self.nb_filter2, kernel_size = (self.kernel_size,self.kernel_size), conv_name = self.conv_name_base + '2b',bn_name = self.bn_name_base + '2b',padding='same',use_bias=self.use_bias,train_bn=self.train_bn)(x)
    x = Relu()(x)

    x = Convolution(filters = self.nb_filter3, kernel_size = (1,1), conv_name = self.conv_name_base + '2c', 
                    bn_name = self.bn_name_base + '2c',use_bias=self.use_bias,train_bn=self.train_bn)(x)
    
    shortcut = Convolution(filters = self.nb_filter3, kernel_size = (1,1), conv_name = self.conv_name_base + '1', 
                    stride=self.strides ,bn_name = self.bn_name_base + '1',use_bias=self.use_bias,train_bn=self.train_bn)(inputs)
    

    x = AdditionLayer()(x,shortcut)

    x = Relu(name='res' + str(self.stage) + self.block + '_out')(x)

    return x
    

In [344]:
class identity_block(Layer):
  def __init__(self,kernel_size,filters,stage,block,use_bias=True,train_bn=True):
    super(self.__class__,self).__init__()
    self.nb_filter1, self.nb_filter2, self.nb_filter3 = filters
    self.conv_name_base = 'res' + str(stage) + block + '_branch'
    self.bn_name_base = 'bn' + str(stage) + block + '_branch'
    self.kernel_size = kernel_size
    self.stage = stage
    self.block = block

  @tf.function
  def call(self,inputs):
    x = Convolution(filters = self.nb_filter1, kernel_size = (1,1), conv_name = self.conv_name_base + '2a', 
                    bn_name = self.bn_name_base + '2a')(inputs)
    x = Relu()(x)

    x = Convolution(filters = self.nb_filter2, kernel_size = (self.kernel_size,self.kernel_size), conv_name = self.conv_name_base + '2b', 
                    bn_name = self.bn_name_base + '2b',padding='same')(x)
    x = Relu()(x)

    x = Convolution(filters = self.nb_filter3, kernel_size = (1,1), conv_name = self.conv_name_base + '2c', 
                    bn_name = self.bn_name_base + '2c')(x)
    x = AdditionLayer()(x,inputs)

    x = Relu(name='res' + str(self.stage) + self.block + '_out')(x)

    return x
    

# Resnet Graph

In [386]:
class Resnet(Model):
  def __init__(self,architecture,stage_5=False,train_bn=True):
    super(self.__class__,self).__init__()

    assert architecture in ['resnet50','resnet101']
    self.C1,self.C2,self.C3,self.C4,self.C5 = None , None , None , None , None
  
    #stage 1
    self.l1 = tf.keras.layers.ZeroPadding2D((3,3))
    self.l2 = Convolution(filters=64,kernel_size=(7,7),conv_name='conv1',bn_name='conv1',use_bias=True,train_bn=True)
    self.l3 = Relu()

    self.l4 = tf.keras.layers.MaxPool2D((3,3),strides=(2,2),padding='same')

    #stage 2
    self.l5 = conv_block(kernel_size=3,filters=[64,64,256],stage=2,block='a',strides=(1,1),train_bn=train_bn)
    self.l6 = identity_block(kernel_size=3,filters=[64,64,256],stage=2,block='b',train_bn=train_bn)
    self.l7 = identity_block(kernel_size=3,filters=[64,64,256],stage=2,block='c',train_bn=train_bn)

    #stage 3
    self.l8 = conv_block(kernel_size=3,filters=[128,128,512],stage=3,block='a',train_bn=train_bn)
    self.l9 = identity_block(kernel_size=3,filters=[128,128,512],stage=3,block='a',train_bn=train_bn)
    self.l10 = identity_block(kernel_size=3,filters=[128,128,512],stage=3,block='a',train_bn=train_bn)
    self.l11 = identity_block(kernel_size=3,filters=[128,128,512],stage=3,block='a',train_bn=train_bn)

    #stage 4
    self.l12 = conv_block(kernel_size=3,filters=[256,256,1024],stage=4,block='a',train_bn=train_bn)
    self.block_count = {"resnet50": 5, "resnet101": 22}[architecture]
    self.arch_wise_layers = list()
    
    for i in range(self.block_count):
      self.arch_wise_layers.append(identity_block(kernel_size=3,filters=[256,256,1024],stage=4,block=chr(98 + i),train_bn=train_bn))
    
    #stage5
    self.stage5=False
    if stage_5:
      self.stage5=True
      
      self.s5_l1 = conv_block(kernel_size=3,filters=[512,512,2048],stage=5,block='a',train_bn = train_bn)
      self.s5_l2 = identity_block(kernel_size=3,filters=[512,512,2048],stage=5,block='b',train_bn = train_bn)
      self.s5_l3 = identity_block(kernel_size=3,filters=[512,512,2048],stage=5,block='c',train_bn = train_bn)

  def build(self,input_shape):
    super(self.__class__,self).build(input_shape)

  @tf.function
  def call(self,inputs):
    #stage 1
    x = self.l1(inputs)
    print('shape check')
    print(x.shape)
    
    x = self.l2(x)

    print('shap check')
    print(x.shape)

    x = self.l3(x)
    print('shape check')
    print(x.shape)

    x = self.l4(x)
    print('shape check')
    print(x.shape)
    self.C1 = x

    #stage 2
    x = self.l5(x)
    print('shape check')
    print(x.shape)

    x = self.l6(x)
    print('shape check')
    print(x.shape)

    x = self.l7(x)
    print('shape check')
    print(x.shape)

    self.C2 = x

    #stage 3
    x = self.l8(x)
    print('shape check')
    print(x.shape)

    x = self.l9(x)
    print('shape check')
    print(x.shape)

    x = self.l10(x)
    print('shape check')
    print(x.shape)

    x = self.l11(x)
    print('shape check')
    print(x.shape)

    self.C3 = x

    #stage 4
    x = self.l12(x)
    print('shape check')
    print(x.shape)

    # return x
    for l in self.arch_wise_layers:
      x = l(x)
      print('shape-l check')
      print(x.shape)
    self.C4 = x

    #stage 5
    if self.stage5:
      x = self.s5_l1(x)
      x = self.s5_l2(x)
      x = self.s5_l3(x)
      self.C5 = x

    print(type(x))
    return x








In [387]:
m = Resnet('resnet50')

In [388]:
s = (20, 224, 224, 3)
nx = np.random.rand(*s).astype(np.float32)/ 255
print(nx.shape)

m._set_inputs(nx)

train_dataset = tf.data.Dataset.from_tensor_slices(nx)
train_dataset = train_dataset.shuffle(buffer_size=10).batch(5)

indices = [0, 1, 2, 3, 4]
depth = 14
sample_labels = tf.one_hot(indices, depth)
print(sample_labels.shape)

(20, 224, 224, 3)
(5, 14)


In [389]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-2)

In [390]:
m.call(z)

shape check
(20, 230, 230, 3)
shap check
(20, 224, 224, 64)
shape check
(20, 224, 224, 64)
shape check
(20, 112, 112, 64)
shape check
(20, 112, 112, 256)
shape check
(20, 112, 112, 256)
shape check
(20, 112, 112, 256)
shape check
(20, 56, 56, 512)
shape check
(20, 56, 56, 512)
shape check
(20, 56, 56, 512)
shape check
(20, 56, 56, 512)
shape check
(20, 28, 28, 1024)
shape-l check
(20, 28, 28, 1024)
shape-l check
(20, 28, 28, 1024)
shape-l check
(20, 28, 28, 1024)
shape-l check
(20, 28, 28, 1024)
shape-l check
(20, 28, 28, 1024)
<class 'tensorflow.python.framework.ops.Tensor'>


FailedPreconditionError: ignored

In [391]:
epochs = 2
loss_metric = tf.keras.metrics.Mean()

# Iterate over epochs.
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            x = m(x_batch_train)
            print(x.shape)
            
            
            # Compute reconstruction loss
            loss = tf.nn.softmax_cross_entropy_with_logits(logits=x, labels=sample_labels)
            print(loss)

        grads = tape.gradient(loss, m.trainable_weights)
        optimizer.apply_gradients(zip(grads, m.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print('step %s: mean loss = %s' % (step, loss_metric.result()))
            
# NOTE: tf2test/MobilenetV2 is directory path, "checkpoint" at the end is for the name of checkpoint
# m.save_weights('../../tf2test/MobilenetV2/MobilenetV2')

Start of epoch 0
shape check
(5, 230, 230, 3)
shap check
(5, 224, 224, 64)
shape check
(5, 224, 224, 64)
shape check
(5, 112, 112, 64)
shape check
(5, 112, 112, 256)
shape check
(5, 112, 112, 256)
shape check
(5, 112, 112, 256)
shape check
(5, 56, 56, 512)
shape check
(5, 56, 56, 512)
shape check
(5, 56, 56, 512)
shape check
(5, 56, 56, 512)
shape check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 1024)
<class 'tensorflow.python.framework.ops.Tensor'>
shape check
(5, 230, 230, 3)
shap check
(5, 224, 224, 64)
shape check
(5, 224, 224, 64)
shape check
(5, 112, 112, 64)
shape check
(5, 112, 112, 256)
shape check
(5, 112, 112, 256)
shape check
(5, 112, 112, 256)
shape check
(5, 56, 56, 512)
shape check
(5, 56, 56, 512)
shape check
(5, 56, 56, 512)
shape check
(5, 56, 56, 512)
shape check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 1024)
shape-l check
(5, 28, 28, 10

FailedPreconditionError: ignored

In [374]:
z = tf.constant(nx)