***Implementation of the paper - "Do CNNs encode augmentations?"***

By: Sourav Sharan* and Rohan Banerjee* <br>
*equal contributions


Importing all needed libraries

In [None]:
import random

import matplotlib.pyplot as plt
import numpy as np
from numpy import save
from sklearn.model_selection import train_test_split

import tensorflow as tf
import keras
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import GlobalMaxPooling2D, MaxPooling2D
from tensorflow.keras.layers import Input, Conv2D, Dense, Flatten, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical

from google.colab import drive
from google.colab.patches import cv2_imshow
# Mount Google Drive at /content/drive/ (Colab only); the trained classifier
# is later saved to and reloaded from a path under this mount point.
drive.mount('/content/drive/')

Setting up and splitting the CIFAR-10 dataset for training the CNN model and the RankNet model, and for testing both



In [None]:
# Shape of one image: 32x32 pixels, 3 colour channels.
input_shape = (32, 32, 3)

# Official CIFAR-10 train / test partition.
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

# Hold out 20% of the training partition as a validation split; the fixed
# random_state keeps the split reproducible between runs.
x_train, x_val, y_train, y_val = train_test_split(
    x_train,
    y_train,
    test_size=0.20,
    random_state=1,
)

print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_val shape: {x_val.shape} - y_val shape: {y_val.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")

# Persist every split (still raw, un-normalised pixels at this point) as a
# .npy file in the working directory.
splits_to_save = {
    'x_train.npy': x_train,
    'y_train.npy': y_train,
    'x_val.npy': x_val,
    'y_val.npy': y_val,
    'x_test.npy': x_test,
    'y_test.npy': y_test,
}
for npy_name, split in splits_to_save.items():
    save(npy_name, split)

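Data generator for augmenting the data. For our experiment, the augmentation of interest is zoom ("zoom_in" and "zoom_out", configured through `zoom_range`); note that the generator below additionally applies random horizontal and vertical flips.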

In [None]:
# Augmentation pipeline used while training the classifier: random horizontal
# and vertical flips plus a random zoom factor drawn from [0.5, 1.0].
# No featurewise statistics (centering / std-normalisation / ZCA) are enabled,
# so the generator does not need to be fitted on the data first.
datagen = ImageDataGenerator(
    horizontal_flip=True,
    vertical_flip=True,
    zoom_range=[0.5, 1.0],
)

# Display the data to verify the augmentation: one augmented sample for each
# of the first 25 training images, laid out on a 5x5 grid.
# NOTE: this cell must run before the normalisation cell -- the uint8 cast
# below assumes pixel values are still in the raw 0-255 range.
fig, axes = plt.subplots(5, 5)
aug_iter = datagen.flow(x_train[:25], batch_size=1, shuffle=False)

for ax in axes.flat:
    # Every batch holds exactly one image; imshow needs integer pixels.
    aug_img = next(aug_iter)[0].astype('uint8')
    ax.imshow(aug_img, aspect='auto')

plt.show()

Normalizing the data for training

In [None]:
# Scale pixel values from 0-255 to [0, 1]. The integer-dtype guard makes the
# cell idempotent: re-running it will not divide already-scaled data by 255 a
# second time. float32 is what Keras trains in and halves the memory compared
# with the float64 that `uint8 / 255.0` would produce.
# x_val is intentionally left untouched here; only x_train / x_test are scaled.
if np.issubdtype(x_train.dtype, np.integer):
    x_train = x_train.astype('float32') / 255.0
if np.issubdtype(x_test.dtype, np.integer):
    x_test = x_test.astype('float32') / 255.0

# Labels arrive with shape (N, 1); flatten to (N,) for the sparse loss.
y_train, y_test = y_train.flatten(), y_test.flatten()

Building the model for training (with augmentations)

In [None]:
# Number of distinct labels; this is the width of the softmax output layer.
K = len(set(y_train))
print("number of classes:", K)


def conv_stage(tensor, filters):
    """Two 3x3 same-padded ReLU convolutions, each followed by batch
    normalisation, then a 2x2 max-pool that halves the spatial size."""
    for _ in range(2):
        tensor = Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        tensor = BatchNormalization()(tensor)
    return MaxPooling2D((2, 2))(tensor)


# Functional-API model: three convolutional stages of increasing width ...
inputs = Input(shape=x_train[0].shape)
features = inputs
for n_filters in (32, 64, 128):
    features = conv_stage(features, n_filters)

# ... followed by a dense classification head with dropout on both sides of
# the hidden layer.
features = Flatten()(features)
features = Dropout(0.2)(features)
features = Dense(1024, activation='relu')(features)
features = Dropout(0.2)(features)

# Output layer: one softmax unit per class.
outputs = Dense(K, activation='softmax')(features)

model = Model(inputs, outputs)

# Model description.
model.summary()

Fitting the model and training

In [None]:
# Adam optimiser with integer-label cross-entropy; accuracy is reported each epoch.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Train on augmented mini-batches streamed from `datagen`; the (un-augmented)
# test split is what gets evaluated after every epoch.
train_batches = datagen.flow(x_train, y_train, batch_size=32)
r = model.fit(
    train_batches,
    validation_data=(x_test, y_test),
    epochs=50,
    verbose=1,
)

In [None]:
# Write the trained classifier to Google Drive and immediately read it back,
# so the rest of the notebook works from the on-disk copy.
# NOTE(review): the file name says "without_aug" although the model above is
# fitted on `datagen.flow(...)` batches -- confirm the name is intended.
saved_model_path = '/content/drive/MyDrive/Colab Notebooks/models/model_without_aug_test.h5'
model.save(saved_model_path)
model = tf.keras.models.load_model(saved_model_path)

Extracting the convolutional part of the trained model (every layer up to and including `Flatten`, i.e. without the dense classification head) for the RankNet model and freezing it

In [None]:
# Cut the classifier at its Flatten layer: everything before it is the
# convolutional feature extractor, everything after it is the dense
# classification head that the RankNet model replaces with its own head.
# The layer is looked up by type rather than by a hard-coded index (it sits
# at index 16 in the architecture built above) so the cut survives edits to
# the convolutional stack.
flatten_layer = next(layer for layer in model.layers if isinstance(layer, Flatten))
feature_extractor = Model(inputs=model.input, outputs=flatten_layer.output)

# Freeze the extractor so that RankNet training only updates the new head.
feature_extractor.trainable = False
feature_extractor.summary()

Preparing training data for the RankNet model

In [None]:
def _zoomed(batch, factor):
    """Return `batch` (a single image with a leading batch axis) zoomed by
    exactly `factor`, still carrying the leading batch axis.

    A dedicated local generator is used so that the global `datagen` needed
    for classifier training is never overwritten.
    """
    zoomer = ImageDataGenerator(zoom_range=[factor, factor])
    return next(zoomer.flow(batch, batch_size=1, shuffle=False))


def make_zoom_pairs(images, n_passes, rng):
    """Build RankNet samples from `images`.

    For every image (and for each of `n_passes` passes over the set) two zoom
    factors are drawn from {0.5, 0.6, 0.7, 0.8, 0.9}; the sample is the pair
    of zoomed copies and its label is `a_fact - b_fact`.

    Args:
        images: iterable of HxWxC image arrays.
        n_passes: how many times to iterate over `images`.
        rng: a `random.Random` instance, so the pairs are reproducible.

    Returns:
        (pairs, labels): `pairs[k]` is `[zoomed_a, zoomed_b]`, each of shape
        (1, H, W, C); `labels[k]` is the float difference of the two factors.
    """
    pairs, labels = [], []
    for _ in range(n_passes):
        for image in images:
            a_fact = rng.randint(5, 9) / 10
            b_fact = rng.randint(5, 9) / 10
            # The label is the signed difference of the two zoom factors.
            labels.append(a_fact - b_fact)

            batch = np.expand_dims(image, axis=0)
            pairs.append([_zoomed(batch, a_fact), _zoomed(batch, b_fact)])
    return pairs, labels


# Seeded generator: the same pairs are produced on every Restart & Run All.
pair_rng = random.Random(1)

# Training pairs come from the validation split, which is still in the raw
# 0-255 pixel range (it was not rescaled with x_train / x_test).
x_rank_train, y_rank_train = make_zoom_pairs(x_val, n_passes=4, rng=pair_rng)

# Test pairs: `x_test` has already been rescaled to [0, 1], so the raw copy
# written to 'x_test.npy' by the dataset cell is reloaded instead, keeping
# train and test pairs in the same pixel range.
# NOTE(review): the later cells use x_rank_test / y_rank_test but nothing in
# the notebook defined them; building them from the test split is an
# assumption -- confirm this is the intended evaluation set.
x_rank_test, y_rank_test = make_zoom_pairs(np.load('x_test.npy'), n_passes=1, rng=pair_rng)

# Summarise instead of printing tens of thousands of image arrays.
print(len(x_rank_train), len(y_rank_train), len(x_rank_test), len(y_rank_test))
print(x_rank_train[0][0].shape, y_rank_train[:5])

Defining the RankNet loss

In [None]:
def rank_loss(y_true, y_pred):
  """RankNet-style pairwise logistic loss.

  Computes log(1 + exp(-sign(y_true) * y_pred)) element-wise.

  Args:
    y_true: signed target difference; only its sign is used. A target of 0
      yields the constant log(2), i.e. no gradient for that sample.
    y_pred: predicted score difference.

  Returns:
    A tensor of per-element losses.
  """
  # `K` in this notebook is the integer number of classes, not the Keras
  # backend, so TensorFlow ops are called directly.
  y_true = tf.cast(y_true, y_pred.dtype)
  # softplus(z) == log(1 + exp(z)) but does not overflow for large z.
  return tf.math.softplus(-tf.math.sign(y_true) * y_pred)

Creating and compiling the RankNet model

In [None]:
def pair_order_accuracy(y_true, y_pred):
    """Per-sample 1.0 when the predicted score difference has the same sign as
    the label, else 0.0 (Keras averages this over the batch).

    Replaces the built-in 'accuracy', which is not meaningful for a signed,
    real-valued target. A label of exactly 0 only counts as correct when the
    prediction is exactly 0 as well.
    """
    # Flatten both so a (batch,) label and a (batch, 1) prediction line up.
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), [-1])
    y_pred = tf.reshape(y_pred, [-1])
    same_sign = tf.equal(tf.math.sign(y_true), tf.math.sign(y_pred))
    return tf.cast(same_sign, y_pred.dtype)


input_dim = x_train[0].shape

# Scoring network: frozen convolutional features followed by a new trainable
# head that outputs a single scalar score per image.
# (Named `base_input` rather than `input` to avoid shadowing the builtin.)
base_input = Input(shape=input_dim)
x = feature_extractor(base_input)

# adding new layers to the trained model
x = Dropout(0.2)(x)
x = Dense(1024, activation='relu')(x)
x = Dropout(0.2)(x)
x = Dense(1, activation='linear')(x)
base_model = Model(base_input, x)

# printing model summary for verification
for i, layer in enumerate(base_model.layers):
    print(i, layer.name, layer.trainable)
# summary() prints by itself and returns None, so it is not wrapped in print().
base_model.summary()


# creating the ranknet model: the same scoring network (shared weights) is
# applied to both images of a pair
input_a = Input(shape=input_dim)
input_b = Input(shape=input_dim)

a_score = base_model(input_a)
b_score = base_model(input_b)

# subtract scores {scores are amount of augmentation in our case}
diff = tf.keras.layers.Subtract()([a_score, b_score])
# The output is the raw score difference; the logistic part lives in rank_loss.
prob = diff

# Build model
meta_model = Model(inputs=[input_a, input_b], outputs=prob)
meta_model.compile(optimizer='adam', loss=rank_loss, metrics=[pair_order_accuracy])
meta_model.summary()

Normalizing data for training the RankNet

In [None]:
# x_rank_train / x_rank_test are Python lists of [image_a, image_b] pairs, so
# they cannot be divided by a scalar directly (list / float raises TypeError);
# the pixel scaling is done once, below, while the pairs are split into
# separate "a" and "b" arrays.
print(x_rank_train[0][1].shape)


def _split_and_scale(pairs):
    """Split a list of [image_a, image_b] pairs into two stacked arrays and
    divide the pixel values by 255.

    Each image in a pair carries a leading batch axis of size 1, which is
    dropped by the `[0]` index.
    """
    first = np.array([pair[0][0] for pair in pairs]) / 255.0
    second = np.array([pair[1][0] for pair in pairs]) / 255.0
    return first, second


x_rank_train_a, x_rank_train_b = _split_and_scale(x_rank_train)
x_rank_test_a, x_rank_test_b = _split_and_scale(x_rank_test)
y_rank_train = np.array(y_rank_train)
y_rank_test = np.array(y_rank_test)

print(len(x_rank_train_a), len(x_rank_train_b), len(x_rank_test_a), len(x_rank_test_b))
print(x_rank_train_a.shape)

Training the RankNet model

In [None]:
# Fit the pairwise model: inputs are the two zoomed views of each image, the
# target is the signed difference of their zoom factors.
# Note that `r` now holds the RankNet history and no longer the classifier's.
rank_inputs = [x_rank_train_a, x_rank_train_b]
r = meta_model.fit(
    rank_inputs,
    y_rank_train,
    batch_size=32,
    epochs=50,
    verbose=1,
)

Counting how many test pairs the RankNet model orders correctly by zoom factor, thereby showing that CNNs encode augmentations

In [None]:
# Score the held-out pairs. (`results` was not computed anywhere else in the
# notebook, so the prediction is made here.)
results = meta_model.predict([x_rank_test_a, x_rank_test_b])

# Flatten both to 1-D so they can be compared element by element.
gt_diff = np.asarray(y_rank_test).ravel()
pred_diff = np.asarray(results).ravel()

# Label = a_fact - b_fact. A positive label means image b got the smaller zoom
# factor, which this notebook reports as "b has more aug" (presumably because
# a smaller Keras zoom factor is a stronger zoom-in -- confirm).
b_more = gt_diff > 0
a_more = gt_diff < 0
# Pairs with identical zoom factors have no correct ordering; they are
# excluded instead of being counted as "a has more aug".
n_ties = int((gt_diff == 0).sum())

gt_a = int(a_more.sum())
gt_b = int(b_more.sum())
pred_a = int((pred_diff[a_more] < 0).sum())
pred_b = int((pred_diff[b_more] > 0).sum())

print('where a has more aug : ', pred_a, ' / ', gt_a)
print('where b has more aug : ', pred_b, ' / ', gt_b)
print('ties (equal zoom, skipped) : ', n_ties)