In [54]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dense, Flatten
from keras import regularizers
import keras.backend as K
from keras.models import load_model
from tensorflow.core.util import event_pb2
from tensorflow.python.lib.io import tf_record
import matplotlib.pyplot as plt
import sklearn.metrics as metrics
import csv
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds
from typing import Union

In [14]:
train_path_01_SSA = os.path.join(os.getcwd(), 'mhist_dataset', 'mhist_dataset', 'HMT_train', '01_SSA')
train_path_02_HP = os.path.join(os.getcwd(), 'mhist_dataset', 'mhist_dataset', 'HMT_train', '02_HP')

test_path_01_SSA = os.path.join(os.getcwd(), 'mhist_dataset', 'mhist_dataset', 'HMT_test', '01_SSA')
test_path_02_HP = os.path.join(os.getcwd(), 'mhist_dataset', 'mhist_dataset', 'HMT_test', '02_HP')

#print(train_path_01_SSA)

/Users/nirmal/Downloads/Project_B_Supp/mhist_dataset/mhist_dataset/HMT_train/01_SSA


# Preprocessing the data into train and test folders

In [16]:
import shutil

annotation_path = os.path.join(os.getcwd(), 'mhist_dataset', 'annotations.csv')
with open(annotation_path, 'r') as file:
    csvreader = csv.reader(file)
    header = next(csvreader)
    for row in csvreader:
        filePath = os.path.join(os.getcwd(), 'mhist_dataset', 'images', row[0])
        curr_image = os.path.join(os.getcwd(), 'mhist_dataset', 'images')
        
        if row[3] == 'train' and row[1] == 'SSA':
            shutil.copy(filePath, train_path_01_SSA)
        elif row[3] == 'train' and row[1] == 'HP':
            shutil.copy(filePath, train_path_02_HP)
            
        elif row[3] == 'test' and row[1] == 'SSA':
            shutil.copy(filePath, test_path_01_SSA)
        elif row[3] == 'test' and row[1] == 'HP':
            shutil.copy(filePath, test_path_02_HP)

In [125]:
# Reusing datagenerator from Project A and applying same data augmentations as Project A

train_dir = os.path.join(os.getcwd(), 'mhist_dataset', 'mhist_dataset', 'HMT_train') #you should change to your directory
test_dir = os.path.join(os.getcwd(), 'mhist_dataset', 'mhist_dataset', 'HMT_test') #you should change to your directory

train_datagen = ImageDataGenerator(rescale=1/255.,
shear_range=0.1,
rotation_range=15,
horizontal_flip=True,
vertical_flip=True)

test_datagen = ImageDataGenerator(rescale=1/255.)

train_generator = train_datagen.flow_from_directory(train_dir,
class_mode='categorical',
interpolation='bilinear',
target_size=(224, 224),
batch_size=32,
shuffle=True)

test_generator = test_datagen.flow_from_directory(test_dir,
class_mode='categorical',
interpolation='bilinear',
target_size=(224, 224),
batch_size=32,
shuffle=False)

Found 2175 images belonging to 2 classes.
Found 977 images belonging to 2 classes.


# Creating Teacher & Student Models

## Teacher Model

In [160]:
from keras.models import Model
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, InputLayer

resnet = tf.keras.applications.resnet_v2.ResNet50V2(
    include_top=False,
    weights='imagenet',
    input_tensor=None,
    input_shape=(224, 224, 3),
    pooling=None,
    classes=2,
)

output = resnet.layers[-1].output
output = keras.layers.Flatten()(output)
teacher_model = Model(resnet.input, output)

teacher_model.trainable = False

teacher = Sequential()
teacher.add(teacher_model)
teacher.add(Dense(2))

## Student with KD Model

In [166]:
mobilenet = tf.keras.applications.MobileNetV2(input_shape=(224, 224, 3),
                                               include_top=False,
                                               weights='imagenet')

output = mobilenet.layers[-1].output
output = keras.layers.Flatten()(output)
mobile_model = Model(mobilenet.input, output)

mobile_model.trainable = False

student_KD = Sequential()
student_KD.add(mobile_model)
student_KD.add(Dense(2))

## Student without KD Model

In [170]:
mobilenet = tf.keras.applications.MobileNetV2(input_shape=(224, 224, 3),
                                               include_top=False,
                                               weights='imagenet')
output = mobilenet.layers[-1].output
output = keras.layers.Flatten()(output)
mobile_model = Model(mobilenet.input, output)

mobile_model.trainable = False

student_noKD = Sequential()
student_noKD.add(mobile_model)
student_noKD.add(Dense(2))

# Creating Loss Functions

## Teacher loss function

In [169]:
def compute_teacher_loss(images, labels):
  # REUSING TASK 1 CODE
    
  #Compute the subclass_logits and apply cross entropy on the logits of the model.
  subclass_logits = teacher(images, training=True)

  #The model was built without softmax so the last layer or output will be the logits.
  cross_entropy_for_batch = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=subclass_logits)

  #Since this is a batch of images, we need to average the loss and return it.
  cross_entropy_loss_value = tf.reduce_mean(cross_entropy_for_batch)

  return cross_entropy_loss_value

## Student with KD loss function

In [168]:
#@test {"output": "ignore"}

# Hyperparameters for distillation (need to be tuned).
ALPHA = 0.5 # task balance between cross-entropy and distillation loss
DISTILLATION_TEMPERATURE = 4. #temperature hyperparameter

def distillation_loss(teacher_logits: tf.Tensor, student_logits: tf.Tensor,
                      temperature: Union[float, tf.Tensor]):

  # RESUSING TASK 1 CODE

  soft_targets = tf.nn.softmax(teacher_logits/temperature)
  return tf.reduce_mean(
      tf.nn.softmax_cross_entropy_with_logits(
          soft_targets, student_logits / temperature)) * temperature ** 2

def compute_student_loss(images, labels):
  # RESUSING TASK 1 CODE

  student_subclass_logits = student_KD(images, training=True)
  teacher_subclass_logits = teacher(images, training=False)
  distillation_loss_value = distillation_loss(teacher_subclass_logits, student_subclass_logits, DISTILLATION_TEMPERATURE)

  cross_entropy_loss_batch = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=student_subclass_logits)
  hard_cross_entropy_loss = tf.reduce_mean(cross_entropy_loss_batch)

  cross_entropy_loss_value = ALPHA*hard_cross_entropy_loss + (1-ALPHA)*distillation_loss_value

  return cross_entropy_loss_value

## Student without KD loss function

In [208]:
def compute_student_noKD_loss(images, labels):
  # REUSING TASK 1 CODE
    
  #Compute the subclass_logits and apply cross entropy on the logits of the model.
  subclass_logits = student_noKD(images, training=True)

  #The model was built without softmax so the last layer or output will be the logits.
  cross_entropy_for_batch = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=subclass_logits)

  #Since this is a batch of images, we need to average the loss and return it.
  cross_entropy_loss_value = tf.reduce_mean(cross_entropy_for_batch)

  return cross_entropy_loss_value

## Train and evaluation

In [218]:
logits = 0
@tf.function
def compute_num_correct(model, images, labels):
  # REUSING TASK 1 CODE
  class_logits = model(images, training=False)
  #print(tf.argmax(labels, -1))
  return tf.reduce_sum(
      tf.cast(tf.math.equal(tf.argmax(class_logits, -1), tf.argmax(labels, -1)),
              tf.float32)), tf.argmax(class_logits, -1), tf.argmax(labels, -1)


def train_and_evaluate(model, compute_loss_fn, l_rate):
  # REUSING TASK 1 CODE
  optimizer = tf.keras.optimizers.Adam(learning_rate=l_rate)
  
    
  for epoch in range(NUM_EPOCHS):
    # Run training.
    print('Epoch {}: '.format(epoch), end='')
    for i in range(len(train_generator)):
          images, labels = train_generator[i]
          #print(len(labels))
          #print(len(images))
          #print(i)
          with tf.GradientTape() as tape:
            loss_value = compute_loss_fn(images, labels)

          # Obtained from tensorflow documentation
          grads = tape.gradient(loss_value, model.trainable_variables)
          optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Run evaluation.
    num_correct = 0
    num_total = len(test_generator)*32
    for i in range(len(test_generator)):
        images, labels = test_generator[i] 
        #print(compute_num_correct(model, images, labels)[0]) #Prints sum of correct answers
        #print(compute_num_correct(model, images, labels)[1]) #Numbers that were predicted correctly
        num_correct += compute_num_correct(model, images, labels)[0]
        
        
        # The logits output do not make sense as they do not map well with the labels very often
        # I suspect that there is an issue with the implementation of the model itself.
        
        #logits = model(images, training=False)
        #print(logits)
        #print(labels)
        #from sklearn.metrics import precision_score
        #precision = precision_score(labels, logits, labels=[0,1], average='micro')
        #print("The precision on the test set is " + str(precision))

        #from sklearn.metrics import recall_score
        #recall = recall_score(labels, logits, labels=[0,1], average='micro')
        #print("The recall on the test set is "+str(recall))

        #f_1 = (2 * precision * recall) / (precision + recall)
        #print("The F-1 score on the test set is "+str(f_1))
        
    print("Class_accuracy: " + '{:.2f}%'.format(
            num_correct / num_total * 100))


## Training and Test models

In [153]:
teacher.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=['accuracy'])

loss, acc = teacher.evaluate(test_generator)



In [220]:
# Training and Testing for Teacher Model.
NUM_EPOCHS=10
train_generator.reset()
test_generator.reset()
print("Testing the Teacher Model")
train_and_evaluate(teacher, compute_teacher_loss, l_rate=1e-4)

train_generator.reset()
test_generator.reset()
print("Testing the Student with KD Model")
train_and_evaluate(student_KD, compute_student_loss, l_rate=1e-3)

train_generator.reset()
test_generator.reset()
print("Testing the Student without KD Model")
train_and_evaluate(student_noKD, compute_student_noKD_loss, l_rate=1e-3)

Testing the Teacher Model
Epoch 0: Class_accuracy: 71.47%
Epoch 1: Class_accuracy: 76.31%
Epoch 2: Class_accuracy: 77.22%
Epoch 3: Class_accuracy: 77.32%
Epoch 4: Class_accuracy: 73.39%
Epoch 5: Class_accuracy: 73.39%
Epoch 6: Class_accuracy: 76.11%
Epoch 7: Class_accuracy: 74.90%
Epoch 8: Class_accuracy: 77.12%
Epoch 9: Class_accuracy: 73.89%
Testing the Student with KD Model
Epoch 0: Class_accuracy: 73.59%
Epoch 1: Class_accuracy: 64.82%
Epoch 2: Class_accuracy: 68.75%
Epoch 3: Class_accuracy: 68.15%
Epoch 4: Class_accuracy: 68.45%
Epoch 5: Class_accuracy: 74.29%
Epoch 6: Class_accuracy: 71.47%
Epoch 7: Class_accuracy: 70.77%
Epoch 8: Class_accuracy: 72.68%
Epoch 9: Class_accuracy: 72.58%
Testing the Student without KD Model
Epoch 0: Class_accuracy: 72.68%
Epoch 1: Class_accuracy: 72.38%
Epoch 2: Class_accuracy: 73.89%
Epoch 3: Class_accuracy: 75.30%
Epoch 4: Class_accuracy: 75.71%
Epoch 5: Class_accuracy: 74.50%
Epoch 6: Class_accuracy: 74.70%
Epoch 7: Class_accuracy: 72.88%
Epoch 8

# Comparing the Teacher and Student models (number of of parameters and FLOPs)

In [206]:
# your code start from here for step 8
'''
#The following FLOPS code obtained from Keras-Flops:
/***************************************************************************************
*    Title: FLOPs calculator for neural network architecture written in tensorflow
*    Author: tokusumi
*    Date: August 17, 2020
*    Code version: N/A
*    Availability: https://github.com/tokusumi/keras-flops
*
***************************************************************************************/
'''
from keras_flops import get_flops
flops = get_flops(teacher, batch_size=32)
print("Teacher Model: ")
print(f"FLOPS: {flops / 10**9:.03} G")

flops = get_flops(student_KD, batch_size=32)
print("Student Model with KD: ")
print(f"FLOPS: {flops / 10**9:.03} G")

flops = get_flops(student_noKD, batch_size=32)
print("Student Model without KD: ")
print(f"FLOPS: {flops / 10**9:.03} G")

Instructions for updating:
Use `tf.compat.v1.graph_util.tensor_shape_from_node_def_name`
Teacher Model: 
FLOPS: 2.24e+02 G
Student Model with KD: 
FLOPS: 19.6 G
Student Model without KD: 
FLOPS: 19.6 G


# Fine Tuning

In [None]:
#print("Number of layers in the base model: ", len(teacher_model.layers))

# Fine-tune from this layer onwards
#fine_tune_at = 180

# Freeze all the layers before the `fine_tune_at` layer
#for layer in teacher_model.layers[:fine_tune_at]:
#  layer.trainable = False

#import pandas as pd
#pd.set_option('max_colwidth', -1)
#layers = [(layer, layer.name, layer.trainable) for layer in teacher_model.layers]
#pd.DataFrame(layers, columns=['Layer Type', 'Layer Name', 'Layer Trainable']) 