<a href="https://colab.research.google.com/github/rawdhikagupta/Texture-Analysis-Model/blob/main/DeepResidualEncoding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

import matplotlib.pyplot as plt
import tensorflow as tf

In [None]:
# Download and unpack the Describable Textures Dataset (DTD) into the Keras cache.
path = 'https://www.robots.ox.ac.uk/~vgg/data/dtd/download/dtd-r1.0.1.tar.gz'
path_to_file = tf.keras.utils.get_file("dtd-r1.0.1_data", path, archive_format = 'tar', untar = True, extract=True)

# The archive unpacks to dtd/images. Locate that folder relative to the path
# get_file returned instead of hard-coding /root/.keras/datasets, which only
# exists when the notebook runs as root.
# NOTE(review): assumes the archive is unpacked either next to the returned
# path or inside it, depending on the installed Keras release -- confirm.
_candidate_dirs = [
    os.path.join(os.path.dirname(path_to_file), "dtd", "images"),
    os.path.join(path_to_file, "dtd", "images"),
]
path_to_dir = next((d for d in _candidate_dirs if os.path.isdir(d)), None)
if path_to_dir is None:
    raise FileNotFoundError(
        "Could not find the extracted 'dtd/images' folder; looked in: "
        + ", ".join(_candidate_dirs)
    )

# Both subsets are drawn from the same directory; they must share the seed and
# validation_split so the two calls describe the same 70/30 partition.
_split_kwargs = dict(
    validation_split=0.3,
    seed=42,
    shuffle=True,
    label_mode='int',
    batch_size=32,
    image_size=(448, 448),
)

ds_train = tf.keras.utils.image_dataset_from_directory(
    path_to_dir, subset='training', **_split_kwargs
)

ds_test = tf.keras.utils.image_dataset_from_directory(
    path_to_dir, subset='validation', **_split_kwargs
)

# Number of texture categories, inferred from the sub-directory names.
num_classes = len(ds_train.class_names)

print("Number of train batches: ", ds_train.cardinality().numpy())
print("Number of test batches: ", ds_test.cardinality().numpy())

def pre_processing(image, label):
  """Run the Keras ResNet input preprocessing on `image`.

  The label is passed through untouched so the function can be used directly
  with `Dataset.map` on (image, label) pairs.
  """
  return tf.keras.applications.resnet.preprocess_input(image), label

# Apply the ResNet preprocessing to both splits, letting tf.data choose the
# degree of parallelism.
ds_train, ds_test = (
    split.map(pre_processing, num_parallel_calls=tf.data.AUTOTUNE)
    for split in (ds_train, ds_test)
)

def tfds_to_tensor(ds):
  """Collapse a dataset of (image, label) batches into two tensors.

  The dataset is iterated exactly once; every image batch and every label
  batch is kept and then concatenated along axis 0.

  Returns:
    A tuple `(images, labels)` of concatenated tensors.
  """
  # Single pass only: images and labels must come from the same iteration.
  batches = [(image, label) for image, label in ds]
  image_batches = [image for image, _ in batches]
  label_batches = [label for _, label in batches]
  return tf.concat(image_batches, axis=0), tf.concat(label_batches, axis=0)

Downloading data from https://www.robots.ox.ac.uk/~vgg/data/dtd/download/dtd-r1.0.1.tar.gz
Found 5640 files belonging to 47 classes.
Using 3948 files for training.
Found 5640 files belonging to 47 classes.
Using 1692 files for validation.
Number of train batches:  124
Number of test batches:  53


In [None]:
def _residual_encoding(features, dropout_rate):
  """Turn one convolutional feature map into a pooled residual descriptor.

  Args:
    features: symbolic feature-map tensor; its last axis is the channel axis.
    dropout_rate: rate of the dropout applied after the 1x1 convolution.

  Returns:
    The globally average-pooled descriptor tensor.
  """
  channels = features.shape[-1]
  #--------------Convolutional Transfer-----------------------------
  # 1x1 conv keeps the channel count so it can be subtracted element-wise.
  ctm = tf.keras.layers.Conv2D(channels, kernel_size=1)(features)
  ctm = tf.keras.layers.Dropout(dropout_rate)(ctm)
  ctm = tf.keras.layers.BatchNormalization()(ctm)
  ctm = tf.keras.layers.Activation(activation='sigmoid')(ctm)
  #--------------Residual Calculation-------------------------------
  squashed = tf.keras.layers.Activation(activation='sigmoid')(features)
  # Subtract computes inputs[0] - inputs[1], i.e. ctm - sigmoid(features).
  # Using the built-in layer instead of a Lambda keeps the model serializable.
  res = tf.keras.layers.Subtract()([ctm, squashed])
  #--------------Aggregation----------------------------------------
  pooled = tf.keras.layers.Activation(activation='relu')(res)
  pooled = tf.keras.layers.BatchNormalization()(pooled)
  pooled = tf.keras.layers.Activation(activation='relu')(pooled)
  return tf.keras.layers.GlobalAveragePooling2D()(pooled)


def residualnet(basemodel, num_classes, dropout_rate, layer_index=None):
  """Build a softmax classifier from residual encodings of backbone features.

  The backbone's final output is always encoded. When `layer_index` is given,
  the output of that backbone layer is encoded the same way and the two
  descriptors are concatenated before the classification layer.

  Args:
    basemodel: Keras model used as the feature extractor.
    num_classes: number of units in the final softmax layer.
    dropout_rate: dropout rate used inside each encoding branch.
    layer_index: optional index (as accepted by `get_layer(index=...)`) of an
      additional backbone layer to encode.

  Returns:
    A `tf.keras.Model` mapping `basemodel.input` to class probabilities.
  """
  descriptors = [_residual_encoding(basemodel.output, dropout_rate)]
  if layer_index is not None:
    intermediate = basemodel.get_layer(index=layer_index).output
    descriptors.append(_residual_encoding(intermediate, dropout_rate))

  if len(descriptors) == 1:
    x = descriptors[0]
  else:
    # Final-output descriptor first, intermediate descriptor second.
    x = tf.keras.layers.Concatenate()(descriptors)

  x = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
  return tf.keras.Model(inputs=basemodel.input, outputs=x)

In [None]:
# ImageNet-pretrained ResNet50 without its classification head, frozen so only
# the layers added by residualnet are trained.
backbone_model = tf.keras.applications.resnet50.ResNet50(include_top=False, weights='imagenet', input_shape=(448,448,3))
backbone_model.trainable = False

# num_classes is derived from the dataset's class folders in the data-loading
# cell, so the output layer always matches the labels (no magic 47).
model_1 = residualnet(backbone_model, num_classes, 0.6)
model_1.compile(
    optimizer = tf.keras.optimizers.Adam(),
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

# Keep the History object so the training curves can be inspected afterwards.
history_1 = model_1.fit(ds_train, epochs=15, validation_data=ds_test)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7f74bbc0afd0>

In [None]:
# Two-branch variant: also encodes the output of backbone layer -33.
# Reuses the frozen backbone_model created in the previous cell.
# num_classes is derived from the dataset's class folders in the data-loading
# cell, so the output layer always matches the labels (no magic 47).
model_2 = residualnet(backbone_model, num_classes, 0.6, layer_index=-33)
model_2.compile(
    optimizer = tf.keras.optimizers.Adam(),
    loss = tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

# Keep the History object so the training curves can be inspected afterwards.
history_2 = model_2.fit(ds_train, epochs=15, validation_data=ds_test)

In [None]:
# use the encoding module after extracting features from last conv layer of a CNN
class EncodingLayer(tf.keras.layers.Layer):
  """Residual encoding of a feature map against a learnable codebook.

  A 4-D input (batch, H, W, D) is treated as H*W descriptors of size D. Each
  descriptor's residual to each of the K codewords is soft-assigned with a
  softmax over the codewords, and the weighted residuals are summed over the
  descriptors, giving an output of shape (batch, K, D).

  Args:
    K: number of codewords in the codebook.
    **kwargs: standard `tf.keras.layers.Layer` keyword arguments (name, dtype, ...).
  """
  def __init__(self, K=32, **kwargs):
    super(EncodingLayer, self).__init__(**kwargs)
    self.K = K

  def build(self, input_shape):
    self.D = input_shape[-1]
    std_1 = 1/self.K**0.5 # half-width of the uniform initialisation range
    init = tf.random_uniform_initializer(minval=-std_1, maxval=std_1)
    # Leading axis of size 1 lets the codebook broadcast over the descriptors.
    self.codes = self.add_weight(name='codebook', initializer = init, trainable = True, shape=(1,self.K, self.D))
    self.scale = self.add_weight(name='scale_factors', initializer = init, trainable = True, shape = (self.K, ))

  def call(self, inputs):
    # (batch, H, W, D) -> (batch, H*W, 1, D); requires static H and W.
    x = tf.keras.backend.reshape(inputs, (-1,inputs.shape[1]*inputs.shape[2], 1, self.D ))
    # Broadcast against (1, K, D): residuals have shape (batch, H*W, K, D).
    res = x - self.codes
    # Soft-assignment weights, softmax over the K codewords (last axis).
    # NOTE(review): this scales the unsquared residual norm by factors that are
    # initialised symmetrically around zero; the Deep TEN formulation uses
    # -s_k * ||r||^2. Left unchanged -- confirm this is intended.
    W_i = tf.keras.backend.softmax(tf.multiply(tf.linalg.norm(res, axis=-1), self.scale))
    # Weighted sum of residuals over the H*W descriptors -> (batch, K, D).
    E = tf.einsum('bik, bikd->bkd', W_i, res)
    # Normalise each sample's encoding on its own. Without an axis the norm is
    # taken over the whole batch, making a sample's output depend on the other
    # samples and on the batch size.
    E = tf.math.l2_normalize(E, axis=[1, 2])
    return E

  def get_config(self):
    """Return the layer config so models containing this layer can be saved."""
    config = super(EncodingLayer, self).get_config()
    config.update({'K': self.K})
    return config

def encoding_module(shape, filters, K=32):
  """Build the encoding sub-model applied to a backbone feature map.

  The sub-model is: 1x1 convolution to `filters` channels, batch
  normalisation, ReLU, `EncodingLayer` with `K` codewords, then flatten.

  Args:
    shape: shape of one input feature map, without the batch axis.
    filters: number of output channels of the 1x1 convolution.
    K: number of codewords passed to `EncodingLayer`.

  Returns:
    A `tf.keras.Model` from the feature map to the flattened encoding.
  """
  features_in = tf.keras.Input(shape=shape)
  stack = [
      tf.keras.layers.Conv2D(filters, 1),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.Activation(activation='relu'),
      EncodingLayer(K=K),
      tf.keras.layers.Flatten(),
  ]
  encoded = features_in
  for layer in stack:
    encoded = layer(encoded)
  return tf.keras.Model(inputs=features_in, outputs=encoded)

def DeepTEN(basemodel, num_classes=47, filters=128, K=32):
  """Attach the encoding module and a softmax classifier to a backbone.

  Args:
    basemodel: Keras model whose output feature map is encoded.
    num_classes: number of units in the final softmax layer (default 47,
      the value previously hard-coded here).
    filters: channels of the 1x1 convolution inside the encoding module.
    K: number of codewords used by the encoding layer.

  Returns:
    A `tf.keras.Model` mapping `basemodel.input` to class probabilities.
  """
  x = basemodel.output
  # output_shape[1:] drops the batch axis, as expected by encoding_module.
  x = encoding_module(basemodel.output_shape[1:], filters, K)(x)
  x = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
  return tf.keras.Model(inputs= basemodel.input, outputs=x)

# Frozen ImageNet-pretrained ResNet50 (no classification head) as backbone.
basemodel = tf.keras.applications.resnet50.ResNet50(
    input_shape=(448, 448, 3),
    weights='imagenet',
    include_top=False,
)
basemodel.trainable = False

# Encoding-layer classifier on top of the backbone; integer labels, hence the
# sparse categorical cross-entropy loss.
model = DeepTEN(basemodel)
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'],
)
model.fit(ds_train, validation_data=ds_test, epochs=15)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7fc87ac82d10>

In [None]:
def EncodingModule(shape, dropout_rate):
  """Build a sub-model that pools a feature map into a residual descriptor.

  Args:
    shape: full shape of the feature-map tensor INCLUDING the leading batch
      axis; the batch axis is dropped for the sub-model input and the last
      entry is used as the channel count.
    dropout_rate: rate of the dropout applied after the 1x1 convolution.

  Returns:
    A `tf.keras.Model` from the feature map to its globally average-pooled
    descriptor.
  """
  features_in = tf.keras.Input(shape=shape[1:])
  #--------------Convolutional Transfer-----------------------------
  # 1x1 conv keeps the channel count so it can be subtracted element-wise.
  ctm = tf.keras.layers.Conv2D(shape[-1], kernel_size= 1)(features_in)
  ctm = tf.keras.layers.Dropout(dropout_rate)(ctm)
  ctm = tf.keras.layers.BatchNormalization()(ctm)
  ctm = tf.keras.layers.Activation(activation='sigmoid')(ctm)
  #--------------Residual Calculation-------------------------------
  x = tf.keras.layers.Activation(activation='sigmoid')(features_in)
  # Subtract computes inputs[0] - inputs[1], i.e. ctm - sigmoid(input).
  # Using the built-in layer instead of a Lambda keeps the model serializable.
  res = tf.keras.layers.Subtract()([ctm, x])
  #--------------Aggregation----------------------------------------
  x = tf.keras.layers.Activation(activation='relu')(res)
  x = tf.keras.layers.BatchNormalization()(x)
  x = tf.keras.layers.Activation(activation='relu')(x)
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  return tf.keras.Model(inputs=features_in, outputs = x)

In [None]:
def testmodel(basemodel, dropout_rate=0.5, num_classes=47, layer_indices=(-1, -33, -95, -137)):
  """Classifier built from residual descriptors of several backbone layers.

  The output of every backbone layer listed in `layer_indices` is passed
  through its own `EncodingModule`; the resulting descriptors are
  concatenated (in the given order) and fed to a softmax layer.

  Args:
    basemodel: Keras model used as the feature extractor.
    dropout_rate: dropout rate forwarded to each `EncodingModule`.
    num_classes: number of units in the final softmax layer (default 47,
      the value previously hard-coded here).
    layer_indices: indices, as accepted by `get_layer(index=...)`, of the
      backbone layers to tap. Defaults to the four previously hard-coded ones.

  Returns:
    A `tf.keras.Model` mapping `basemodel.input` to class probabilities.

  Raises:
    ValueError: if `layer_indices` is empty.
  """
  if len(layer_indices) == 0:
    raise ValueError("layer_indices must contain at least one layer index")

  taps = [basemodel.get_layer(index=index).output for index in layer_indices]
  # Each tap gets its own encoder; tap.shape includes the batch axis, which
  # EncodingModule strips itself.
  descriptors = [EncodingModule(tap.shape, dropout_rate)(tap) for tap in taps]

  if len(descriptors) == 1:
    x = descriptors[0]
  else:
    x = tf.keras.layers.Concatenate()(descriptors)
  x = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=basemodel.input, outputs = x)
  return model

# Frozen ImageNet-pretrained ResNet50 (no classification head) as backbone.
basemodel = tf.keras.applications.ResNet50(
    input_shape=(448, 448, 3),
    weights='imagenet',
    include_top=False,
)
basemodel.trainable = False

# Build the multi-layer descriptor model and render its graph.
testmodel_1 = testmodel(basemodel)
tf.keras.utils.plot_model(testmodel_1)

In [None]:
# Integer labels, hence the sparse categorical cross-entropy loss.
testmodel_1.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'],
)

# The held-out split is used as validation data during training.
testmodel_1.fit(ds_train, validation_data=ds_test, epochs=15)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7f800a532d10>