Description: This notebook contains the code that turns an ordinary model into a rotation-invariant
model. We use a ResNet-style architecture and have also implemented a color-invariant block at the beginning of the model.
Inside each rotation-invariant block, only one layer is allowed to train and the rest of the block is frozen. After training
that one layer, we simply transfer its kernel weights to the non-trainable layers with the appropriate orientation.

Issue: One issue we faced when working directly with the weights of the model is that it tampers with the inference mechanism of the network.
Using transferred weights, we can improve the feature-detection ability of the model, but this impacts the inference mechanism
of the model, which in turn reduces the accuracy of the model's predictions. To handle this, after transferring the weights, we
set all the layers to trainable and train the model on randomly augmented data for 2-3 epochs. This readjusts the
inference mechanism of the model, and the model then runs as intended.

In our tests, the model trained with this method showed similar accuracy irrespective of the rotation of the image. So, if
we improve the feature detection of the model for one orientation, we can simply transfer that knowledge to all orientations.

In [1]:
# Importing the required libraries
import tensorflow as tf
import os
import cv2 as cv
from tensorflow import keras
import numpy as np
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, BatchNormalization, LeakyReLU, DepthwiseConv2D, \
GlobalMaxPooling2D, Input, Dropout, Add
from tensorflow.keras.models import Model, load_model
import pathlib as pl
from itertools import permutations
from scipy import ndimage
from keras.preprocessing.image import ImageDataGenerator

In [2]:
# This function is related to Activation Suppression technique. This function takes input of normal image and applies
# oil paint stylizing effect
def oilpaint(img):
    """Return an oil-paint-like stylised version of ``img``.

    The image is morphologically opened with a 2 x 2 elliptical structuring
    element, and the opened image is then min-max normalised into the
    range [50, 255].
    """
    structuring_element = cv.getStructuringElement(cv.MORPH_ELLIPSE, (2, 2))
    opened = cv.morphologyEx(img, cv.MORPH_OPEN, structuring_element)
    return cv.normalize(opened, None, 50, 255, cv.NORM_MINMAX)

# This function is related to Activation Suppression, to get the coordinates of the activation regions
def relative_coordinates_getter_circle(activation_area, dim, image_size=300):
    """Scale a feature-map location to pixel coordinates of the input image.

    Args:
        activation_area: Pair of indices into the feature map. Element 1 is
            scaled into the first returned value and element 0 into the
            second, i.e. the pair is swapped on the way out.
        dim: Value the indices are divided by before scaling (the caller
            passes the feature-map side length minus one).
        image_size: Side length, in pixels, that the indices are scaled to.
            Defaults to 300, the image size used throughout this notebook.

    Returns:
        A tuple of two ints: ``(scaled activation_area[1],
        scaled activation_area[0])``, each truncated towards zero.

    Raises:
        ZeroDivisionError: If ``dim`` is 0.
    """
    # The division happens before the multiplication so results are
    # bit-for-bit identical to the previous hard-coded-300 version.
    scaled_second_index = int((activation_area[1] / dim) * image_size)
    scaled_first_index = int((activation_area[0] / dim) * image_size)
    return (scaled_second_index, scaled_first_index)

# This function is related to Activation Suppression, it is used to blur any shape edges of the suppressions
def diffuser(img, center, dim):
    """Blur ``img`` inside a filled circle around ``center`` and return the result.

    A 31 x 31 Gaussian-blurred copy of the image is computed, a filled white
    disk of radius ``int(300/dim) + 3`` is drawn on a 300 x 300 x 3 black
    mask, and the output takes the blurred pixels wherever the mask is white
    and the original pixels everywhere else.
    """
    softened = cv.GaussianBlur(img, (31, 31), 0)

    disk_mask = np.zeros((300, 300, 3), dtype=np.uint8)
    radius = int(300/dim) + 3
    cv.circle(disk_mask, (center), radius, (255, 255, 255), -1)

    inside_disk = disk_mask == np.array([255, 255, 255])
    return np.where(inside_disk, softened, img)
 
# This function is used to draw the circles on the regions where we want to suppress
def supress_circle(img, center, dim):
    """Paint a grey disk at ``center`` on a stylised copy of ``img`` and soften its edge.

    ``dim`` is increased by 4 before it is used, the image is passed through
    ``oilpaint``, a filled (127, 127, 127) circle of radius ``int(300/dim)``
    is drawn on it, and the result is handed to ``diffuser`` with the same
    centre and the increased ``dim``.
    """
    padded_dim = dim + 4
    canvas = oilpaint(img)
    radius = int(300/padded_dim)
    cv.circle(canvas, (center), radius, (127, 127, 127), -1)
    return diffuser(canvas, center, padded_dim)


# This is the main function that handles everything related to activation suppression
def activation_suppressor(model, taker_data, saver_data):
    """Write an activation-suppressed copy of every image under ``taker_data``.

    For each class name in the global ``class_names``, every file in
    ``taker_data/<class>`` is read, converted to RGB, resized to 300 x 300 and
    passed through ``model``. The output of the last layer whose class is not
    Flatten, Dense, Dropout or GlobalMaxPooling2D is summed over its channels,
    the five strongest locations of that map are picked, and a blurred grey
    circle is drawn over each of them. The result is written to
    ``saver_data/<class>/aug<name>as.jpg``.

    Args:
        model: Keras model whose intermediate activations are inspected.
        taker_data: Directory that holds one sub-directory per class.
        saver_data: Directory the suppressed images are written to.

    Side effects:
        Increments the global ``counter`` once per image written.
    """
    global class_names, counter
    # Auxiliary model exposing the output of every layer of `model`.
    features = keras.Model(inputs=model.inputs, outputs=[layer.output for layer in model.layers])
    not_req = ("Flatten", "Dense", "Dropout", "GlobalMaxPooling2D")
    req = [index for index, layer in enumerate(model.layers)
           if layer.__class__.__name__ not in not_req]
    last_feature_layer = req[-1]

    for each_class in class_names:
        # pathlib / os.path.join instead of a literal "\\" so the function
        # also works where the path separator is not a backslash.
        class_dir = pl.Path(taker_data) / each_class
        for each_img in class_dir.glob("*"):
            img = cv.imread(str(each_img))
            if img is None:
                # cv.imread signals an unreadable / non-image file with None;
                # skip it instead of crashing inside cvtColor.
                continue
            img = cv.cvtColor(img, cv.COLOR_BGR2RGB)
            img = cv.resize(img, (300, 300))

            layer_outputs = features(np.expand_dims(img/255.0, 0))
            l_features = layer_outputs[last_feature_layer].numpy()[0]
            dim = len(l_features[..., 0])

            # Channel-wise sum, accumulated one channel at a time.
            final_img = np.zeros((dim, dim))
            for channel in range(l_features.shape[-1]):
                final_img = final_img + l_features[..., channel]

            # Pick the five strongest locations; each one is zeroed so the
            # next search finds the next strongest.
            top_five_activations = []
            for _ in range(5):
                row, col = np.unravel_index(np.argmax(final_img), final_img.shape)
                final_img[row, col] = 0
                top_five_activations.append((row, col))

            for location in top_five_activations:
                img = supress_circle(img, relative_coordinates_getter_circle(location, dim-1), dim-1)

            img = cv.cvtColor(img, cv.COLOR_RGB2BGR)
            # Same naming rule as before: the piece just before the last dot
            # of the file name; fall back to the whole name if it has no dot.
            name_parts = each_img.name.split(".")
            parts = name_parts[-2] if len(name_parts) > 1 else name_parts[0]
            out_path = os.path.join(saver_data, each_class, "aug" + str(parts) + "as.jpg")
            cv.imwrite(out_path, img)
            counter += 1

In [3]:
# Defining necessary paths of training and validation datasets
# NOTE(review): the right-hand sides of the two assignments below are empty in this copy of the
# notebook (presumably the dataset locations were stripped out); fill them in before running the cell.
training_path = 
validation_path = 
main_file_path = training_path
main_dataset = pl.Path(main_file_path)
# Names of the entries found directly under the training directory, sorted; used as the class list
class_names = np.array(sorted(item.name for item in main_dataset.glob("*")))

In [4]:
# All of these functions are used to build the tf.data input pipeline of TensorFlow

def get_label(file_name):
    """Return the label of ``file_name`` as a boolean vector over ``class_names``.

    The path is split on ``os.path.sep`` and its second-to-last component
    (the directory that contains the file) is compared element-wise with the
    module-level ``class_names``.
    """
    path_components = tf.strings.split(file_name, os.path.sep)
    parent_directory = path_components[-2]
    return parent_directory == class_names

def get_image(file_name):
    """Load ``file_name`` as a 3-channel image resized to 300 x 300 and divided by 255."""
    raw_bytes = tf.io.read_file(file_name)
    decoded = tf.io.decode_image(contents=raw_bytes, channels=3, expand_animations=False)
    resized = tf.image.resize(decoded, [300, 300])
    return resized/255.0

def process_data(file_name):
    """Map a file path to an ``(image, label)`` pair using ``get_image`` and ``get_label``."""
    # The label is computed first, then the image, as before.
    one_hot = get_label(file_name)
    pixels = get_image(file_name)
    return pixels, one_hot

def configure_input(dataset, batch_size):
    """Shuffle ``dataset`` with a 1000-element buffer, batch it and add autotuned prefetching."""
    shuffled = dataset.shuffle(buffer_size=1000)
    batched = shuffled.batch(batch_size=batch_size)
    return batched.prefetch(buffer_size=tf.data.AUTOTUNE)

In [5]:
# This single function, handles everything related to the input pipeline of the training and validation datasets
def input_pipeline(train_path, validation_path, train_batch_size, validation_batch_size):
    """Build the training and validation ``tf.data`` datasets.

    Files are listed from ``<path>/*/*`` for both directories, shuffled once
    with a buffer as large as the file count, mapped through ``process_data``
    and then shuffled, batched and prefetched by ``configure_input``. The
    dataset sizes and batch counts are printed along the way.

    Returns:
        A ``(train_data_set, validation_data_set)`` tuple.
    """
    train_root = pl.Path(train_path)
    val_root = pl.Path(validation_path)

    # File counts double as the shuffle buffer sizes, so each list is fully shuffled.
    n_train_files = sum(1 for _ in train_root.glob("*/*"))
    n_val_files = sum(1 for _ in val_root.glob("*/*"))

    train_data_set = tf.data.Dataset.list_files(str(train_root/"*/*"), shuffle=False).shuffle(
        n_train_files, reshuffle_each_iteration=False)
    validation_data_set = tf.data.Dataset.list_files(str(val_root/"*/*"), shuffle=False).shuffle(
        n_val_files, reshuffle_each_iteration=False)

    print(f"Size of the Training dataset: {tf.data.experimental.cardinality(train_data_set)}")
    print(f"Size of the Testing dataset: {tf.data.experimental.cardinality(validation_data_set)}")

    parallelism = tf.data.AUTOTUNE
    train_data_set = train_data_set.map(process_data, num_parallel_calls=parallelism)
    validation_data_set = validation_data_set.map(process_data, num_parallel_calls=parallelism)

    train_data_set = configure_input(train_data_set, train_batch_size)
    validation_data_set = configure_input(validation_data_set, validation_batch_size)

    print(f"Number of batches in training dataset: {tf.data.experimental.cardinality(train_data_set)}")
    print(f"Number of batches in validation dataset: {tf.data.experimental.cardinality(validation_data_set)}")
    return train_data_set, validation_data_set

In [6]:
# Getting Training and Validation datasets
# Both pipelines are built with a batch size of 8
train, val = input_pipeline(training_path, validation_path, 8, 8)

Size of the Training dataset: 9000
Size of the Testing dataset: 400
Number of batches in training dataset: 1125
Number of batches in validation dataset: 50


In [7]:
# This function returns a block containing a set of layers where each layer is associated with a particular order
# of RGB channels. Here, only one layer is allowed for training and the rest of the layers are frozen and initialized with zeros.
# This function is inspired by the Inception architecture. Here, the input image is passed onto 6 layers (each layer
# represents a particular combination of RGB channels), and in each layer we perform convolution with a 2 x 2 kernel (for better
# texture detection) and a 3 x 3 kernel (for better low-level feature detection). But unlike the Inception architecture, we are not
# concatenating the results; rather, we are adding them. This is mainly to reduce the memory requirements of the model.


def color_invariant_block(n_filters, l1_kernel_size, l2_kernel_size, X_input, block_code):
    """Build the six-branch colour block on top of ``X_input``.

    Every branch applies two convolutions to ``X_input`` (a "texture" one with
    an ``l1_kernel_size`` kernel and an "edges" one with an ``l2_kernel_size``
    kernel), passes each through a LeakyReLU and adds the pair. The first
    branch uses layers named ``trainable1``/``trainable2`` with no built-in
    activation; the other five use ``nontrainable1`` .. ``nontrainable10``
    with a ReLU activation. All names get ``block_code`` appended. The six
    branch sums are added together and batch-normalised over axis 3.
    """
    suffix = str(block_code)
    texture_kernel = (l1_kernel_size, l1_kernel_size)
    edges_kernel = (l2_kernel_size, l2_kernel_size)

    # (texture layer name, edges layer name, conv activation) per branch.
    branch_specs = [("trainable1", "trainable2", None)]
    branch_specs += [
        ("nontrainable%d" % (2*k - 1), "nontrainable%d" % (2*k), "relu")
        for k in range(1, 6)
    ]

    # Pass 1: every convolution, texture before edges, branch by branch.
    conv_pairs = []
    for texture_name, edges_name, activation in branch_specs:
        texture = Conv2D(filters=n_filters, kernel_size=texture_kernel, padding="same",
                         activation=activation, name=texture_name+suffix)(X_input)
        edges = Conv2D(filters=n_filters, kernel_size=edges_kernel, padding="same",
                       activation=activation, name=edges_name+suffix)(X_input)
        conv_pairs.append((texture, edges))

    # Pass 2: LeakyReLU on each convolution output, in the same order.
    activated_pairs = []
    for texture, edges in conv_pairs:
        texture = LeakyReLU(alpha=0.0001)(texture)
        edges = LeakyReLU(alpha=0.001)(edges)
        activated_pairs.append((texture, edges))

    # Pass 3: add texture and edges within each branch, then add the branches.
    branch_sums = [Add()([texture, edges]) for texture, edges in activated_pairs]
    combined = Add()(branch_sums)
    return BatchNormalization(axis=3)(combined)

In [8]:
# This function returns a block containing a set of layers associated with different angles.
# In this function, we have 4 layers and only one layer is allowed to train. After training that one layer, its kernel weights
# rotated by 90, 180 and 270 degrees are transferred to the non-trainable layers. The main reason to choose only 90, 180 and 270 is that
# these are the angles which have the highest variance between them, so the model can capture more information; also, rotating a
# kernel by arbitrary angles can introduce new weights into the kernel due to interpolation.

def rotational_equivariant_block(n_filters, kernel_size, input_layer, block_code):
    """Build the four-branch rotation block on top of ``input_layer``.

    Four ReLU convolutions with identical hyper-parameters are applied to the
    same input. They are named ``trainable11``, ``nontrainable11``,
    ``nontrainable12`` and ``nontrainable13`` (each followed by
    ``block_code``). Their element-wise maximum is batch-normalised over
    axis 3 and returned.
    """
    suffix = str(block_code)
    name_prefixes = ("trainable11", "nontrainable11", "nontrainable12", "nontrainable13")
    orientation_outputs = [
        Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), padding="same",
               activation="relu", name=prefix+suffix)(input_layer)
        for prefix in name_prefixes
    ]
    strongest = tf.keras.layers.maximum(orientation_outputs)
    return BatchNormalization(axis=3)(strongest)

In [9]:
# This function marks which layers are allowed for training and which are not allowed for training
def set_non_trainable(model):
    """Initialise and freeze the layers that must not train, selected by name prefix.

    * ``nontrainable*``: every weight array is set to zeros and the layer is frozen.
    * ``nonzero*``: weights are left as they are and the layer is frozen.
    * ``blur*``: the single weight array is filled with 0.25 and the layer is frozen.

    All other layers are left untouched.

    Args:
        model: Object exposing ``layers``; each layer needs ``name``,
            ``get_weights()``, ``set_weights()`` and a ``trainable`` attribute.

    Returns:
        The same ``model`` object, modified in place.
    """
    for layer in model.layers:
        name = layer.name
        if name.startswith("nontrainable"):
            layer.set_weights([np.zeros_like(weight) for weight in layer.get_weights()])
            # This used to assign to `trainable_` (a typo), which created an
            # unrelated attribute and left the layer trainable.
            layer.trainable = False
        elif name.startswith("nonzero"):
            layer.trainable = False
        elif name.startswith("blur"):
            kernel = layer.get_weights()[0]
            layer.set_weights([np.full(kernel.shape, 0.25)])
            layer.trainable = False
    return model

In [10]:
# Defining the model with neccessary blocks and layers

# Input: 300 x 300 images with 3 channels
X_input = Input(shape=(300, 300, 3))
n_filters = 32

# Stage 1: colour block plus a plain convolutional shortcut taken from the input,
# followed by the fixed "blur1" depthwise filter and max pooling
X = color_invariant_block(n_filters=n_filters, l1_kernel_size=2, l2_kernel_size=3, X_input=X_input, block_code=1)
X_copy = Conv2D(filters=n_filters, kernel_size=(3,3), padding="same", activation="relu")(X_input)
X = Add()([X, X_copy])
X = DepthwiseConv2D(kernel_size=2, use_bias=False, padding="same", name="blur1")(X)
X = MaxPooling2D()(X)

# Stages 2-5: rotation block plus a "nonzero<k>" convolutional shortcut,
# followed by the "blur<stage>" depthwise filter and max pooling
for stage_code in range(2, 6):
    X_copy = X
    X = rotational_equivariant_block(n_filters, 3, X, block_code=stage_code)
    X_copy = Conv2D(filters=n_filters, kernel_size=(3, 3), padding="same", activation="relu",
                    name="nonzero" + str(stage_code - 1))(X_copy)
    X = Add()([X, X_copy])
    X = DepthwiseConv2D(kernel_size=2, use_bias=False, padding="same", name="blur" + str(stage_code))(X)
    X = MaxPooling2D()(X)

# Classification head: dropout, global max pooling and a 3-way softmax
X = Dropout(0.2)(X)
X = GlobalMaxPooling2D()(X)
X = Dense(3, activation="softmax")(X)


In [11]:
# Creating the model with appropriate inputs and outputs
# X_input is the Input tensor and X the softmax output built in the previous cell
model = Model(inputs=X_input, outputs=X)

In [12]:
# Calling the set_non_trainable function to mark which layers are allowed for training
# This is done before compile() so the settings are in place when the model is compiled
model = set_non_trainable(model)

In [13]:
# Compiling the model with the Adam optimizer, categorical cross-entropy loss and accuracy as the metric
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

In [14]:
# Training the model
# NOTE(review): with epochs=1 the EarlyStopping callback (patience=3) can never trigger;
# it only becomes relevant if the number of epochs is raised.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=3)
history = model.fit(train, validation_data=val, epochs=1, callbacks=[callback])



In [11]:
# Calling Activation Suppression function, to generate the suppressed data based on the models performance
# `counter` is the module-level global that activation_suppressor increments for the images it writes
# NOTE(review): `temporary_save_path` is not assigned anywhere in the visible part of this notebook —
# confirm it is defined before running this cell.
counter = 100001
activation_suppressor(model, training_path, temporary_save_path)

In [15]:
# Using this function, based on the block code, we will transfer weights from trainable layers to non-trainable layers with
# necessary modifications

def transfer_weights(model, block_code):
    """Copy trained kernels into the frozen sibling layers of one block.

    Args:
        model: Object exposing ``layers``; each layer needs ``name``,
            ``get_weights()`` and ``set_weights()``.
        block_code: ``1`` selects the colour block; any other value selects
            the rotation block whose layer names end with ``str(block_code)``.

    Behaviour:
        * ``block_code == 1``: the weights of ``model.layers[1]`` and
          ``model.layers[2]`` are copied into ``model.layers[3]`` ..
          ``model.layers[12]``, one pair per non-identity permutation of the
          three input channels, with the kernel's input-channel axis
          reordered accordingly. Biases are copied unchanged.
          NOTE(review): this relies on the colour-block layers occupying
          exactly these positions in ``model.layers`` — confirm if the
          architecture changes.
        * otherwise: the first layer whose name starts with ``nontrainable``
          and ends with ``str(block_code)`` is located; the layer just before
          it is used as the source, and its kernel rotated by 90, 180 and 270
          degrees in the spatial plane is written to that layer and the two
          after it. Remaining weights (the bias) are copied unchanged.

    Returns:
        The same ``model`` object, modified in place.

    Raises:
        IndexError: If no matching ``nontrainable`` layer exists for
            ``block_code``.
    """
    if block_code == 1:  # For transferring colour information
        w1 = model.layers[1].get_weights()
        w2 = model.layers[2].get_weights()
        channels = ["r", "g", "b"]
        # The first permutation is the identity, i.e. the trained pair itself.
        channel_orders = list(permutations(channels))[1:]
        target_index = 3
        for order in channel_orders:
            source_channels = [channels.index(channel) for channel in order]
            for source_weights in (w1, w2):
                permuted_kernel = source_weights[0][:, :, source_channels, :]
                model.layers[target_index].set_weights([permuted_kernel, source_weights[1]])
                target_index += 1
    else:  # For transferring rotational information
        suffix = str(block_code)
        first_target = next(
            (index for index, layer in enumerate(model.layers)
             if layer.name.startswith("nontrainable") and layer.name.endswith(suffix)),
            None,
        )
        if first_target is None:
            raise IndexError(
                "no layer named 'nontrainable*" + suffix + "' found in the model"
            )
        kernel, *other_weights = model.layers[first_target - 1].get_weights()
        for offset, angle in enumerate((90, 180, 270)):
            # Rotate the untouched trained kernel each time. np.rot90 only
            # rearranges elements, so multiples of 90 degrees are exact and
            # no rotation is applied on top of a previous one. The three
            # targets together cover the same set of orientations whichever
            # direction of rotation is used.
            rotated_kernel = np.rot90(kernel, k=angle // 90, axes=(0, 1))
            model.layers[first_target + offset].set_weights([rotated_kernel, *other_weights])

    return model
    

In [16]:
# Calling transfer_weights function for all block codes
# Block code 1 is the colour block; block codes 2 to 5 are the four rotation blocks
for i in range(1, 6):
    model = transfer_weights(model, i)

In [17]:
# Defining an ImageDataGenerator that rotates images by arbitrary angles, so that we can retrain the transferred model and make sure
# that the inference mechanism stays intact

# Pixel values are rescaled by 1/255, matching the scaling applied in get_image;
# rotation_range=180 requests random rotations (see the Keras ImageDataGenerator docs for the exact range)
gen = ImageDataGenerator(
    rescale=1.0/255,
    rotation_range=180
)

# NOTE(review): this directory is hard-coded rather than taken from training_path — confirm it is the intended dataset
data = gen.flow_from_directory(r"D:\Samsung Prism\Datasets\Cat and Dog\Training",
                              target_size=(300, 300), batch_size=8, class_mode="categorical")

Found 18000 images belonging to 3 classes.


In [18]:
# Unfreezing all the layers except the fixed "blur*" depthwise filters and setting them to trainable
for layer in model.layers:
    if not layer.name.startswith("blur"):
        layer.trainable = True

# Keras only takes changes to `trainable` into account when the model is compiled again,
# so recompile with the same settings as the first compile before the next fit() call.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

In [19]:
# Retraining the model on the rotation-augmented generator, validating on the same validation set as before
# NOTE(review): the original note mentioned "5 or under 5 epochs", but this call runs a single epoch,
# so the EarlyStopping callback (patience=3) cannot trigger as written.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=3)
history = model.fit(data, validation_data=val, epochs=1, callbacks=[callback])



In [20]:
# Finally, saving the model
# NOTE(review): `save_path` is not assigned anywhere in the visible part of this notebook — confirm it is defined before running this cell
model.save(save_path)



INFO:tensorflow:Assets written to: Model generated\Temp\assets


INFO:tensorflow:Assets written to: Model generated\Temp\assets
