Basic imports

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential, optimizers

Basic building block

In [None]:
class BasicBlock(layers.Layer):
  """Two-convolution residual block: conv-BN-ReLU-conv-BN plus a shortcut.

  The main path applies two 3x3 convolutions, each followed by batch
  normalisation. The shortcut is added element-wise to the main path and
  the sum goes through a final ReLU.

  Args:
    filter_num: Number of output filters of both 3x3 convolutions (and of
      the 1x1 shortcut convolution when one is created).
    stride: Stride of the first convolution. When it is not 1, the shortcut
      becomes a strided 1x1 convolution + batch normalisation so that its
      output lines up with the main path.

  Note:
    With ``stride == 1`` the shortcut returns the input unchanged, so the
    input must already have a shape that can be added to the main-path
    output (i.e. ``filter_num`` channels).
  """

  def __init__(self, filter_num, stride=1):
    super().__init__()

    # padding='same' keeps the spatial size at (X/stride) x (X/stride).
    self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
    self.bn1 = layers.BatchNormalization()
    self.relu = layers.Activation('relu')
    # Spatial reduction happens only once per block, in conv1.
    self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
    self.bn2 = layers.BatchNormalization()

    if stride != 1:
      # The main path shrinks the feature map, so the shortcut has to be
      # projected with the same stride before the addition.
      self.downsample = Sequential()
      self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride, padding='same'))
      self.downsample.add(layers.BatchNormalization())
    else:
      # Identity shortcut (saves computation). It accepts `training` so both
      # shortcut variants can be invoked the same way from call().
      self.downsample = lambda x, training=None: x

    self.stride = stride

  def call(self, inputs, training=None):
    """Run the block on `inputs`.

    Args:
      inputs: Input feature map, laid out as [b, h, w, c].
      training: Forwarded to the batch-normalisation layers and to the
        shortcut branch so they switch between training and inference
        behaviour together with the enclosing model.

    Returns:
      ReLU of (main path output + shortcut).
    """
    # Main path. [b, h, w, c]
    out = self.conv1(inputs)
    out = self.bn1(out, training=training)
    out = self.relu(out)
    out = self.conv2(out)
    out = self.bn2(out, training=training)

    # The residual (shortcut) branch.
    identity = self.downsample(inputs, training=training)

    # Element-wise addition followed by the final activation. A plain tensor
    # addition avoids building a new Add layer on every call.
    out = tf.nn.relu(out + identity)
    return out



Building the ResNet model from stacks of BasicBlocks

In [None]:
class ResNet(keras.Model):
  """ResNet-style classifier built from four stages of BasicBlock layers.

  Pipeline: stem (conv-BN-ReLU-maxpool) -> four residual stages with
  64/128/256/512 filters -> global average pooling -> dense layer that
  outputs one logit per class (no softmax is applied).

  Args:
    layer_dims: Sequence of four ints; the number of BasicBlocks in each of
      the four stages, e.g. ``[2, 2, 2, 2]``.
    num_classes: Number of output units of the final dense layer.
  """

  def __init__(self, layer_dims, num_classes=100): # e.g. [2, 2, 2, 2]
    super().__init__()

    # Stem: the convolution uses the default padding, and the pooling has
    # stride 1, so the stem itself does not downsample by a factor.
    self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                            layers.BatchNormalization(),
                            layers.Activation('relu'),
                            layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')])

    # Stages 2-4 start with a stride-2 block; stage 1 keeps the resolution.
    self.layer1 = self._build_resblock(64, blocks=layer_dims[0])
    self.layer2 = self._build_resblock(128, blocks=layer_dims[1], stride=2)
    self.layer3 = self._build_resblock(256, blocks=layer_dims[2], stride=2)
    self.layer4 = self._build_resblock(512, blocks=layer_dims[3], stride=2)

    # Stage output is [b, h, w, 512] with the default channels-last layout;
    # global average pooling collapses it to [b, 512].
    self.avgpool = layers.GlobalAveragePooling2D()
    self.fc = layers.Dense(num_classes)

  def call(self, inputs, training=None):
    """Compute class logits for a batch of images.

    Args:
      inputs: Batch of input images.
      training: Forwarded to the stem and to every residual stage so their
        batch-normalisation layers follow the model's mode.

    Returns:
      Logits of shape [b, num_classes].
    """
    x = self.stem(inputs, training=training)

    x = self.layer1(x, training=training)
    x = self.layer2(x, training=training)
    x = self.layer3(x, training=training)
    x = self.layer4(x, training=training)

    # [b, c]
    x = self.avgpool(x)
    # [b, num_classes]
    x = self.fc(x)
    return x

  def _build_resblock(self, filter_num, blocks, stride=1):
    """Build one stage: `blocks` BasicBlocks with `filter_num` filters.

    Only the first block uses `stride`; the remaining ones use stride 1.
    A stage always contains at least one block.
    """
    res_blocks = Sequential()
    res_blocks.add(BasicBlock(filter_num, stride))

    for _ in range(1, blocks):
      res_blocks.add(BasicBlock(filter_num, 1))

    return res_blocks

  # The factories take no instance; @staticmethod makes them callable both
  # as ResNet.resnet18() and on an instance.
  @staticmethod
  def resnet18(num_classes=100):
    """Return a ResNet with the [2, 2, 2, 2] stage layout."""
    return ResNet([2, 2, 2, 2], num_classes=num_classes)

  @staticmethod
  def resnet34(num_classes=100):
    """Return a ResNet with the [3, 4, 6, 3] stage layout."""
    return ResNet([3, 4, 6, 3], num_classes=num_classes)



Train and evaluate the model on CIFAR-100

In [None]:

# Fix the TensorFlow random seed so runs are repeatable.
tf.random.set_seed(2345)

# Create the network and build its weights for 32x32 RGB inputs.
model = ResNet.resnet18()
model.build(input_shape=(None, 32, 32, 3))
# `learning_rate` replaces the deprecated `lr` keyword.
optimizer = optimizers.Adam(learning_rate=1e-3)

# Get dataset
(x, y), (x_test, y_test) = keras.datasets.cifar100.load_data()
# Drop the trailing axis of size 1 from the label arrays.
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)

def preprocess(x,y):
  """Convert one (image, label) pair to the dtypes used for training.

  The image is cast to float32 and divided by 255; the label is cast to
  int32. Returns the converted pair.
  """
  image = tf.cast(x, dtype=tf.float32) / 255.
  label = tf.cast(y, dtype=tf.int32)
  return image, label

# Build the input pipelines: shuffle (training only), preprocess, batch.
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(64)

test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(64)

# Pull a single batch up front (kept from the original cell).
sample=next(iter(train_db))


# Run the training step for multiple epochs, evaluating after each one.
# The loop variables are named x_batch / y_batch so they do not overwrite
# the full dataset arrays x / y.
for epoch in range(50):
  for step, (x_batch, y_batch) in enumerate(train_db):

    with tf.GradientTape() as tape:
      # [b, 32, 32, 3] -> [b, 100]
      # training=True is required in a hand-written loop: otherwise the
      # BatchNormalization layers run in inference mode and never update
      # their moving statistics.
      logits = model(x_batch, training=True)
      # One-hot encode the integer labels.
      y_onehot = tf.one_hot(y_batch, depth=100)
      # Cross-entropy on raw logits, averaged over the batch.
      loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
      loss = tf.reduce_mean(loss)

    # Use the Adam optimizer to apply the gradients to the weights.
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    if step%100==0:
      print(epoch,step,"loss:", float(loss))

  # Evaluate on the test set.
  total_number=0
  total_correct=0
  for x_batch, y_batch in test_db:
    # Inference mode: batch normalisation uses its moving statistics.
    logits = model(x_batch, training=False)
    # Softmax is monotonic, so the arg-max of the logits is already the
    # most probable category; no need to compute probabilities first.
    pred = tf.argmax(logits, axis=1)
    # Recast the prediction to tf.int32 so it can be compared to the labels.
    pred = tf.cast(pred, dtype=tf.int32)

    correct = tf.cast(tf.equal(pred, y_batch), dtype=tf.int32)
    correct = tf.reduce_sum(correct)

    total_correct += int(correct)
    total_number += x_batch.shape[0]

  accuracy = total_correct/total_number

  print(epoch, "acc", accuracy)

<keras.layers.core.dense.Dense object at 0x7f331775cfd0>


  super(Adam, self).__init__(name, **kwargs)


0 0 loss: 4.613809585571289
0 100 loss: 4.607126712799072


KeyboardInterrupt: ignored