# Download data and unzip

In [2]:
!rm -rf *
!wget https://storage.googleapis.com/wandb_datasets/nature_12K.zip
!unzip -q nature_12K.zip
!rm nature_12K.zip 
!find . -name '.DS_Store' -type f -delete
!pip install rich wandb

# Imports

In [3]:
import yaml
import os
import cv2
import time
import glob
import random
import numpy as np
from tqdm import tqdm
from PIL import Image
from rich import print
from pprint import pprint
from cv2 import imread, cvtColor

from sklearn.utils import shuffle

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import  Dense, Input, InputLayer, Flatten, Conv2D, BatchNormalization, MaxPooling2D, Activation , GlobalAveragePooling2D

from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.inception_resnet_v2 import InceptionResNetV2 as IRV2
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.xception import Xception


from matplotlib import pyplot as plt
%matplotlib inline

import wandb
from wandb.keras import WandbCallback

# Constants

In [4]:
TRAIN_DIR = "./inaturalist_12K/train/"
TEST_DIR = "./inaturalist_12K/val/"
os.environ["WANDB_API_KEY"] = "f058e3418fb166c3bee2d5131ef0bcc4b642793f"
IMAGE_SIZE = (224,224)

class_labels = ['Amphibia', 'Animalia', 'Arachnida', 'Aves', 'Fungi', 'Insecta', 'Mammalia', 'Mollusca', 'Plantae', 'Reptilia']
class_labels = sorted(class_labels)

# Model

In [5]:
class Model():

    def __init__(self, image_size, config):
        
        self.IMG_HEIGHT = image_size[0]
        self.IMG_WIDTH = image_size[1]        
        self.input_shape = (self.IMG_HEIGHT, self.IMG_WIDTH, 3)
        
        self.num_hidden_cnn_layers= config["num_hidden_cnn_layers"]
        self.filter_size = config["filter_size"]
        
        self.number_of_filters_base  = config["number_of_filters_base"]
        self.filter_distribution = config["filter_distribution"]
        
        self.padding = config["padding"]
        self.activation = config["activation"]
        self.optimizer = config["optimizer"]
        
        self.pool_size = config["pool_size"]

        self.batch_normalization = config["batch_normalization"]
        self.batch_normalisation_location = config["batch_normalisation_location"]  
        self.dropout_fraction = config["dropout_fraction"]
        self.dropout_location = config["dropout_location"]
        
        self.global_average_pooling = config["global_average_pooling"]
        self.dense_neurons = config["dense_neurons"]
        self.num_classes = config["num_classes"]


    def build_cnnmodel(self):
        with tf.device('/device:GPU:0'):
            tf.keras.backend.clear_session()

            model = Sequential()
            model.add(
                Conv2D(
                    self.number_of_filters_base, 
                    self.filter_size, 
                    padding = self.padding,
                    kernel_initializer = "he_uniform",
                    input_shape = (self.IMG_HEIGHT, self.IMG_WIDTH, 3)
                    )
            )
            
            # if batchnorm is True, location is Before, add batch normalisation layer then activation
            if self.batch_normalisation_location == "Before" and self.batch_normalization: 
                model.add(BatchNormalization())
            
            model.add(Activation(self.activation))
            
            # if batchnorm is True, location is After, add activation then batch normalisation layer
            if self.batch_normalisation_location == "After" and self.batch_normalization: model.add(BatchNormalization())
            
            # Max pooling
            model.add(MaxPooling2D(pool_size=self.pool_size))  
            
            # Dropout
            if self.dropout_fraction != None:
                model.add(tf.keras.layers.Dropout(self.dropout_fraction))

            # Convolutional layers
            for i in range(self.num_hidden_cnn_layers-1):
                # TODO: changing the kernel size in each layer
                # filter distribution 
                # double - double number of filters in each Convolutional layers
                # half - halve the filter size in each successive convolutional layers
                # o/w same number of filters in each Convolutional layers

                if self.filter_distribution == "double":
                    model.add(
                        Conv2D(
                            2**(i+1)*self.number_of_filters_base, 
                            self.filter_size,
                            kernel_initializer = "he_uniform", 
                            padding = self.padding
                        )
                    )
                elif self.filter_distribution == "half":
                    model.add(
                        Conv2D(
                            int(self.number_of_filters_base/2**(i+1)), 
                            self.filter_size,
                            kernel_initializer = "he_uniform", 
                            padding = self.padding
                        )
                    )
                else:
                    model.add(
                        Conv2D(
                            self.number_of_filters_base,
                            self.filter_size,
                            kernel_initializer="he_uniform",
                            padding=self.padding
                        )
                    )

                if self.batch_normalisation_location == "Before" and self.batch_normalization: 
                    model.add(BatchNormalization())
                
                model.add(Activation(self.activation))
            
                if self.batch_normalisation_location == "After" and self.batch_normalization: 
                    model.add(BatchNormalization())
                
                # Max pooling
                model.add(MaxPooling2D(pool_size=self.pool_size))
                
                # Dropout
                if self.dropout_fraction != None and (self.dropout_location == "convlayer" or self.dropout_location == "all"):
                    model.add(tf.keras.layers.Dropout(self.dropout_fraction))

            # Global average pooling    
            if self.global_average_pooling == True:
                model.add(GlobalAveragePooling2D())
            else:
                model.add(Flatten())

            model.add(Dense(self.dense_neurons, activation = 'sigmoid'))

            if self.dropout_fraction != None and (self.dropout_location == "denselayer" or self.dropout_location == "all"):
                model.add(tf.keras.layers.Dropout(self.dropout_fraction))
            
            model.add(Dense(self.num_classes, activation = 'softmax'))

            return model

# Data Generator

In [6]:
def get_data_generator(train_dir, test_dir,  data_augmentation, batch_size, img_size):
    
    if data_augmentation == True:
        train_datagen = ImageDataGenerator(
            rescale=1./255,
            validation_split = 0.1,
            zoom_range=0.2,
            rotation_range=10, 
            horizontal_flip=True,
        )
    
    else:
        train_datagen = ImageDataGenerator(rescale=1./255, validation_split = 0.1)

    test_datagen = ImageDataGenerator(rescale=1./255)

    train_generator = train_datagen.flow_from_directory(
        train_dir,
        subset='training',
        target_size=img_size,
        batch_size=batch_size,
        class_mode='categorical',
        shuffle = True,
        seed = 42
    )
        
    validation_generator = train_datagen.flow_from_directory(
        train_dir,
        subset = 'validation',
        target_size=img_size,
        batch_size=batch_size,
        class_mode='categorical',
        shuffle = False,
        seed = 42
    )

    test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=img_size,
        batch_size=batch_size,
        class_mode='categorical',
        shuffle = False,
        seed = 42
    )
    
    return train_generator, validation_generator, test_generator

# Train and Test function

In [7]:
def train():
    
    config_defaults = dict(
        image_size = IMAGE_SIZE,

        num_hidden_cnn_layers = 5,
        filter_size = (3,3),

        number_of_filters_base  = 128,
        filter_distribution = "half",

        padding = 'valid',
        activation = 'relu',
        optimizer = 'nadam',
        
        pool_size = (2,2),
        
        batch_normalization = True,
        batch_normalisation_location = "After",
        dropout_fraction = 0.01,
        dropout_location = "dense",
        
        global_average_pooling = True,
        dense_neurons = 256,
        num_classes = 10,
        
        epochs = 5,
        batch_size = 64,
        data_augmentation = False,
    ) 


    # WandB logging 
    wandb.init(project = 'CS6910_Assignment2', config = config_defaults)
    CONFIG = wandb.config
    # Run name  
    wandb.run.name = "CNN_" + str(CONFIG.num_hidden_cnn_layers) + "_dn_" + str(CONFIG.dense_neurons) \
    + "_opt_" + CONFIG.optimizer + "_dro_" + str(CONFIG.dropout_fraction) + "_bs_"+str(CONFIG.batch_size) \
    + "_fd_" + CONFIG.filter_distribution + "_bnl_" + CONFIG.batch_normalisation_location + "_dpl_" + CONFIG.dropout_location


    data_augmentation = CONFIG.data_augmentation
    BATCH_SIZE = CONFIG.batch_size
    train_generator, validation_generator, test_generator = get_data_generator(TRAIN_DIR, TEST_DIR,  data_augmentation, BATCH_SIZE, CONFIG.image_size)

    with tf.device('/device:GPU:0'):        
        _model = Model(CONFIG.image_size, CONFIG)
        model = _model.build_cnnmodel()

        model.compile(
            optimizer=CONFIG.optimizer,  
            loss=tf.keras.losses.CategoricalCrossentropy(),
            metrics=['accuracy'],
        )
      
        history = model.fit(
            train_generator,
            steps_per_epoch = train_generator.samples // CONFIG.batch_size,
            validation_data = validation_generator, 
            validation_steps = validation_generator.samples // CONFIG.batch_size,
            epochs = CONFIG.epochs, 
            callbacks=[WandbCallback()]
        )

        model.save('./TrainedModel/'+wandb.run.name)
        test_loss, test_acc = model.evaluate(test_generator)
        
        # Log the test loss and accuracy
        wandb.log(
            {
                "test_acc": test_acc,
                "test_loss": test_loss, 
            }
        )
        wandb.finish()
        
        return model, history

In [8]:
model, history = train()

# Sweep Config

In [9]:
sweep_config = {
    "name": "Test Sweep",
    "method": "bayes",
    "metric":{
        "name": "val_accuracy",
        "goal": "maximize"
    },
    'early_terminate': {
        'type':'hyperband',
        'min_iter': 3,
        's': 2
    },
    "parameters": {
        "epochs": {
            "values": [5, 10]
        },
        "data_augmentation": {
            "values": [True, False]
        },
        "batch_size": {
            "values": [32, 64]
        },
        "filter_size": {
            "values": [(7,7), (5,5), (3,3)]
        },
        "number_of_filters_base": {
            "values": [32, 64]
        },
        "filter_distribution": {
            "values": ["half", "full", "same"]
        },
        "activation":{
            "values": ["relu", "elu", "selu"]
        },
        "padding": {
            "values": ["valid", "same"]
        },
        "optimizer": {
            "values": ["adam", "nadam"]
        },
        "batch_normalization": {
            "values": [True, False]
        },
        "batch_normalisation_location": {
            "values": ["Before", "After"]
        },
        "dropout_fraction": {
            "values": [None, 0.2,0.3]
        },  
        "dropout_location": {
            "values": ["convlayer", "denselayer", "all"]
        },
        "dense_neurons": {
            "values": [32, 64, 128]
        },   
        "global_average_pooling": {
            "values": [False,True]
        },        
    }
}

sweep_id = wandb.sweep(sweep_config, project='CS6910_Assignment2')
# wandb.agent(sweep_id, train, count=1)


# Filter Visualization

### Load best model for visualization

In [10]:
best_run_path = 'ms20/CS6910_Assignment2/runs/odlwk86t/'

api = wandb.Api()
run = api.run(best_run_path)

model_file = run.file('model-best.h5').download(replace=True)
model = tf.keras.models.load_model(model_file.name)
config_file = run.file('config.yaml').download(replace=True)

with open(config_file.name, 'r') as file:
    config = yaml.safe_load(file)

### Load Random Images

In [11]:
# Load n samples from the test set for analysis
n = 30

files = glob.glob('./inaturalist_12K/val/*/*')
# select n random files
random_image_samples = random.sample(files, n)
# Convert them to np arrays
random_images = []
actual_labels = []
predicted_labels = []
for image in random_image_samples:
    img = Image.open(image)
    img = img.resize(config["img_size"]["value"])
    img = np.array(img)
    random_images.append(img)
    actual_labels.append(image.split('/')[-2])

    
predicted_class_number = np.argmax(model.predict(np.array(random_images)), axis=-1)
predicted_labels = [class_labels[class_no]
                    for class_no in predicted_class_number]


### Plot the images with prediction results

In [12]:
row, col = 10, 3
fig, ax = plt.subplots(row, col, figsize=(15, 18))
fig.suptitle("Model Evaluation on random images", fontsize='x-large')
for i in range(row):
    for j in range(col):
        idx = i*col + j
        ax[i][j].imshow(Image.open(
            random_image_samples[idx]).resize((500, 500)))
        ax[i][j].axis('off')
        ax[i][j].set_title(f'True: {actual_labels[idx]}')

        if actual_labels[idx] == predicted_labels[idx]:
            text_color = "green"
        else:
            text_color = "red"

        ax[i][j].text(0., -0.5, "Pred : ", color='black', transform=ax[i][j].transAxes)
        ax[i][j].text(0.7, -0.5, predicted_labels[idx], color=text_color, transform=ax[i][j].transAxes)

fig.tight_layout(rect=[0, 0.03, 1, 0.95])


In [13]:
# Log the image 
wandb.init()
wandb.log({"plot": fig})
wandb.finish()

In [16]:
# Filter Visualization
random_image_path = random.sample(glob.glob('./inaturalist_12K/val/*/*'), 1)[0]
rand_image = np.array(Image.open(random_image_path).resize(config['img_size']['value']))

# The first convolution layer of the model is named 'conv2d'
layer = model.get_layer(name='conv2d')
# Create an intermediate model to tap the output of the first layer
intermediate_model = tf.keras.Model(inputs=model.input, outputs=layer.output)
# Get the output of the first convolution layer
layer_outputs = intermediate_model.predict(rand_image.reshape(1, *rand_image.shape))
# Getting layer weights of the first convolution layer
layer_weights = layer.weights

# Plot the sample image
fig = plt.figure()
plt.title('Image used for visualizing filters', fontsize='x-large')
plt.axis('off')
plt.imshow(np.array(Image.open(random_image_path)))

# Plot the filters and layer weights
row, col = int(config['number_of_filters_base']['value'] / 8), 8
fig1, ax1 = plt.subplots(row, col, figsize=(25, 50), )
fig2, ax2 = plt.subplots(row, col, figsize=(25, 50))
# Set title of the figure
fig1.suptitle(f"Visualizing output of {config['number_of_filters_base']['value']} filters in first layer", fontsize='x-large')
fig2.suptitle(f"Visualizing {config['number_of_filters_base']['value']} filters in first layer", fontsize='x-large')

for i in range(row):
    for j in range(col):
        idx = i*col + j
        ax1[i][j].set_title(f'Filter : {idx+1}')
        ax1[i][j].axis('off')
        ax1[i][j].imshow(layer_outputs[0][:, :, idx], cmap='plasma')

        ax2[i][j].set_title(f'Filter : {idx+1}')
        ax2[i][j].axis('off')
        ax2[i][j].imshow(layer_weights[0].numpy()[:, :, 0, idx], cmap="gray")

fig1.tight_layout(rect=[0, 0.03, 0.5, 0.95])
fig2.tight_layout(rect=[0, 0.03, 0.5, 0.95])

fig1.show()
fig2.show()

wandb.init()
wandb.log({'random_image': wandb.Image(Image.open(random_image_path))})
wandb.log({'filters': fig2})
wandb.log({'filter_outputs': fig1})
run.finish()

In [42]:
@tf.custom_gradient
def guidedRelu(x):
    def grad(dy):
        return tf.cast(dy>0,"float32") * tf.cast(x>0, "float32") * dy
    return tf.nn.relu(x), grad

In [86]:
def guided_backprop(config, model):
    # Function to perform guided backpropagation on the model
    random_image_path = random.sample(
        glob.glob('./inaturalist_12K/val/*/*'), 1)[0]
    rand_inp_image = np.array(Image.open(
        random_image_path).resize(config['img_size']['value']))

    # Plot the image used for guided backprop
    plt.title('Random Image used for guided backpropogation', fontsize='x-large')
    plt.axis('off')
    plt.imshow(np.array(Image.open(random_image_path).resize(
        config['img_size']['value'])))
    plt.show()

    gb_model = tf.keras.Model(
        inputs=[model.inputs],
        outputs=[model.get_layer("conv2d_4").output]
    )

    output_shape = model.get_layer("conv2d_4").output.shape[1:]
    layer_dict = [layer for layer in gb_model.layers if hasattr(
        layer, 'activation')]

    for layer in layer_dict:
        if layer.activation == tf.keras.activations.relu:
            layer.activation = guidedRelu

    row, col = 5, 2
    fig, ax = plt.subplots(row, col, figsize=(15, 28))

    fig.suptitle(
        "Visualizing gradient for 10 neurons in CONV-5 layer", fontsize='x-large')

    for i in range(10):
        rand_neuron_index = [
            0] + [random.randint(0, dim_max-1) for dim_max in output_shape]

        mask = np.zeros((1, *output_shape))
        mask[rand_neuron_index[0], rand_neuron_index[1],
             rand_neuron_index[2], rand_neuron_index[3]] = 1

        with tf.GradientTape() as tape:
            inputs = tf.cast(rand_inp_image.reshape(
                1, *rand_inp_image.shape), tf.float32)
            tape.watch(inputs)
            outputs = (gb_model(inputs) * mask)

        grad = tape.gradient(outputs, inputs)[0]
        grad -= np.mean(grad)
        grad /= (np.std(grad) + 1e-5)
        ax[i//col, i % col].set_title(f'Neuron-{i+1}')
        ax[i//col, i % col].axis('off')
        ax[i//col, i % col].imshow(grad)

    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    fig.show()

    # Visulizing the guided back propagation for the whole image
    with tf.GradientTape() as tape:
        inputs = tf.cast(rand_inp_image.reshape(
            1, *rand_inp_image.shape), tf.float32)
        tape.watch(inputs)
        outputs = gb_model(inputs)

    grad = tape.gradient(outputs, inputs)[0]

    fig_whole = plt.figure()
    plt.title('Visualizing gradient for whole CONV-5 layer', fontsize='x-large')
    plt.axis('off')
    plt.imshow(np.array(grad))
    plt.show()

    # Gradcam
#     with tf.GradientTape() as tape:
#         inputs = tf.cast(rand_inp_image.reshape(
#             1, *rand_inp_image.shape), tf.float32)
#         tape.watch(inputs)
#         outputs = gb_model(inputs)[0]

#     grad = tape.gradient(outputs, inputs)[0]

#     weights = tf.reduce_mean(grad, axis=(0, 1))
#     grad_cam = np.ones(outputs.shape[0: 2], dtype=np.float32)

#     for j, w in enumerate(weights):
#         grad_cam += w * outputs[:, :, j]

#     grad_cam_img = cv2.resize(grad_cam.numpy(), (224, 224))
#     grad_cam_img = np.maximum(grad_cam_img, 0)
#     heatmap = (grad_cam_img - grad_cam_img.min()) / (grad_cam_img.max() - grad_cam_img.min())
#     grad_cam_img = cv2.applyColorMap(np.uint8(255*heatmap), cv2.COLORMAP_JET)
#     output_image = cv2.addWeighted(cv2.cvtColor(rand_inp_image.astype('uint8'), cv2.COLOR_RGB2BGR), 0.5, grad_cam_img, 1, 0)

#     fig_grad_cam = plt.figure()
#     plt.title('Visualizing gradient for whole CONV-5 layer', fontsize='x-large')
#     plt.axis('off')
#     plt.imshow(np.array(output_image))
#     plt.show()

    return fig, fig_whole, random_image_path


wandb.init()
fig, fig_whole, random_image_path = guided_backprop(config, model)
wandb.log({'random_image': wandb.Image(Image.open(random_image_path))})
wandb.log({'guided_backprop_10_neurons_visualization': fig})
wandb.log({'guided_backprop_whole_visualization': fig_whole})
# wandb.log({'Gradcam_visualization': fig_gradcam})
wandb.finish()
