# Project B: Knowledge Distillation for Building Lightweight Deep Learning Models in Visual Classification Tasks

In [1]:
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds
from typing import Union

tf.enable_v2_behavior()

# Not originally included
import numpy as np
import os
import csv
from keras.preprocessing.image import ImageDataGenerator


BATCH_SIZE = 2
NUM_INIT_EPOCHS = 10
NUM_FINE_EPOCHS = 25

ENTROPY_ZERO_FILLER = 1e-15
NUM_CLASSES = 2 #SSA or HP
TRAIN_BATCHES = int(np.floor(2176/BATCH_SIZE))
TEST_BATCHES = int(np.floor(976/BATCH_SIZE))

# For quick testing purposes ONLY
"""
TRAIN_BATCHES = 4
TEST_BATCHES = 4
"""

'\nTRAIN_BATCHES = 4\nTEST_BATCHES = 4\n'

## Data loading/augmenting

In [2]:
# Adapted from Project A's HMT Dataset loading code to use the ImageDataGenerator to augment the data and preload labels/image batches

img_dir = 'mhist_dataset/images'
train_dir = 'mhist_dataset/images/train'
test_dir = 'mhist_dataset/images/test'
anno_csv = 'mhist_dataset/annotations.csv'


if not os.path.isdir(train_dir):
    os.mkdir(train_dir)
if not os.path.isdir(test_dir):
    os.mkdir(test_dir)
    
if not os.path.isdir(os.path.join(train_dir, '01_HP')):
    os.mkdir(os.path.join(train_dir, '01_HP'))
if not os.path.isdir(os.path.join(train_dir, '02_SSA')):
    os.mkdir(os.path.join(train_dir, '02_SSA'))
if not os.path.isdir(os.path.join(test_dir, '01_HP')):
    os.mkdir(os.path.join(test_dir, '01_HP'))
if not os.path.isdir(os.path.join(test_dir, '02_SSA')):
    os.mkdir(os.path.join(test_dir, '02_SSA'))
    
# load csv
# label struct: [HP, SSA]
# labels as a list are sorted in alphabetical order as per the csv

train_labels = []
test_labels = []
train_img = []
test_img = []

with open(anno_csv, 'r') as csvfile:
    first_row = True
    for row in csv.reader(csvfile):
        if first_row:
            first_row = False
            continue
        if row[3] == 'train':
            train_img.append(row[0])
            if row[1] == 'HP':
                train_labels.append([1, 0])
            elif row[1] == 'SSA':
                train_labels.append([0, 1])
        elif row[3] == 'test':
            test_img.append(row[0])
            if row[1] == 'HP':
                test_labels.append([1, 0])
            elif row[1] == 'SSA':
                test_labels.append([0, 1])
        if row[0] in os.listdir(img_dir):
            if row[1] == 'HP' and row[3] == 'train':
                os.rename(os.path.join(img_dir, row[0]), os.path.join(train_dir, '01_HP', row[0]))
            elif row[1] == 'SSA' and row[3] == 'train':
                os.rename(os.path.join(img_dir, row[0]), os.path.join(train_dir, '02_SSA', row[0]))
            elif row[1] == 'HP' and row[3] == 'test':
                os.rename(os.path.join(img_dir, row[0]), os.path.join(test_dir, '01_HP', row[0]))
            elif row[1] == 'SSA' and row[3] == 'test':
                os.rename(os.path.join(img_dir, row[0]), os.path.join(test_dir, '02_SSA', row[0]))
    
# Data Augmentation using ImageDataGenerator
train_datagen = ImageDataGenerator(rescale=1/255.,
shear_range=0.1,
rotation_range=15,
horizontal_flip=True,
vertical_flip=True)

test_datagen = ImageDataGenerator(rescale=1/255.)

train_generator = train_datagen.flow_from_directory(train_dir,
class_mode='categorical',
interpolation='bilinear',
target_size=(224, 224),
batch_size=BATCH_SIZE,
shuffle=True) # 68 batches of 32

t2_generator = test_datagen.flow_from_directory(train_dir,
class_mode='categorical',
interpolation='bilinear',
target_size=(224, 224),
batch_size=BATCH_SIZE,
shuffle=True) # 68 batches of 32

test_generator = test_datagen.flow_from_directory(test_dir,
class_mode='categorical',
interpolation='bilinear',
target_size=(224, 224),
batch_size=BATCH_SIZE,
shuffle=False) # 30.5 batches of 32

Found 2176 images belonging to 2 classes.
Found 2176 images belonging to 2 classes.
Found 976 images belonging to 2 classes.


# Model creation

In [9]:
#@test {"output": "ignore"}
# citing https://www.tensorflow.org/api_docs/python/tf/keras/applications/resnet_v2/ResNet50V2, https://www.kaggle.com/code/suniliitb96/tutorial-keras-transfer-learning-with-resnet50/notebook,
# https://www.tensorflow.org/api_docs/python/tf/keras/applications/mobilenet_v2/MobileNetV2 and https://github.com/Abhi-T/MNIST-CLASSIFIER-From-Scratch/blob/main/MNIST__handwritten_digit_Model.ipynb

# Build CNN teacher.

teacher_model = tf.keras.Sequential()

# your code start from here for step 2

teacher_model.add(tf.keras.applications.resnet_v2.ResNet50V2(classifier_activation = None, input_shape = (224, 224, 3)))
teacher_model.add(tf.keras.layers.Dense(NUM_CLASSES)) 
print(teacher_model.summary())

# Build fully connected students
student_kd_model = tf.keras.Sequential()
student_kd_model.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_kd_model.add(tf.keras.layers.Dense(NUM_CLASSES)) 
print(student_kd_model.summary())

student_scratch_model = tf.keras.Sequential()
student_scratch_model.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_scratch_model.add(tf.keras.layers.Dense(NUM_CLASSES)) 
print(student_scratch_model.summary())


# your code start from here for step 2




Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50v2 (Functional)     (None, 1000)              25613800  
                                                                 
 dense_3 (Dense)             (None, 2)                 2002      
                                                                 
Total params: 25,615,802
Trainable params: 25,570,362
Non-trainable params: 45,440
_________________________________________________________________
None
Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 mobilenetv2_1.00_224 (Funct  (None, 1000)             3538984   
 ional)                                                          
                                                                 
 dense_4 (Dense)             (None, 2)                 2002      
          

# Loss Function

## Teacher Loss Function

In [10]:
def compute_teacher_loss(model, images, labels, **kwargs):
    """Compute class knowledge distillation teacher loss for given images
     and labels.

    Args:
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.
    kwargs: n/a

    Returns:
    Scalar loss Tensor.
    """
    
    class_logits = model(images, training=True)

    # Compute cross-entropy loss for classes.
    
    cross_entropy_loss_value = tf.keras.losses.categorical_crossentropy(labels, tf.nn.softmax(class_logits))

    return cross_entropy_loss_value

## Student (KD) Loss Function

In [11]:
# adapted from https://keras.io/examples/vision/knowledge_distillation/

def distillation_loss(teacher_logits: tf.Tensor, student_logits: tf.Tensor,
                      temperature: Union[float, tf.Tensor]):
    """Compute distillation loss.

    This function computes cross entropy between softened logits and softened
    targets. The resulting loss is scaled by the squared temperature so that
    the gradient magnitude remains approximately constant as the temperature is
    changed. For reference, see Hinton et al., 2014, "Distilling the knowledge in
    a neural network."

    Args:
    teacher_logits: A Tensor of logits provided by the teacher.
    student_logits: A Tensor of logits provided by the student, of the same
      shape as `teacher_logits`.
    temperature: Temperature to use for distillation.

    Returns:
    A scalar Tensor containing the distillation loss.
    """
    # your code start from here for step 3
    soft_targets = teacher_logits / temperature

    return tf.reduce_mean(
      tf.nn.softmax_cross_entropy_with_logits(
          soft_targets, student_logits / temperature)) * temperature ** 2

def compute_student_loss(student_model, images, labels, **kwargs):
    """Compute class knowledge distillation student loss for given images
     and labels.

    Args:
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.
    kwargs:
        teacher_model: Teacher model
        temperature: Temperature hyperparameter
        alpha: Alpha hyperparameter

    Returns:
    Scalar loss Tensor.
    """
    
    teacher_model = kwargs['teacher_model']
    temperature = kwargs['temperature']
    alpha = kwargs['alpha']
    
    student_class_logits = student_model(images, training=True)

    # Compute class distillation loss between student class logits and
    # softened teacher class targets probabilities.

    teacher_class_logits = teacher_model(images, training=False)
    distillation_loss_value = distillation_loss(teacher_class_logits, student_class_logits, temperature)

    # Compute cross-entropy loss with hard targets.
    
    cross_entropy_loss_value = tf.keras.losses.categorical_crossentropy(labels, tf.nn.softmax(student_class_logits))

    total_loss = alpha * cross_entropy_loss_value + (1 - alpha) * distillation_loss_value

    return total_loss

## Student (Scratch) Loss Function

In [12]:
def compute_student_scratch_loss(model, images, labels, **kwargs):
    """Compute class student (scratch) loss for given images
     and labels.

    Args:
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.
    kwargs:
        temperature: Temperature hyperparameter

    Returns:
    Scalar loss Tensor.
    """
    temperature = kwargs['temperature']
    
    class_logits = model(images, training=True)

    # Compute cross-entropy loss for classes.
    
    cross_entropy_loss_value = tf.keras.losses.categorical_crossentropy(labels, tf.nn.softmax(class_logits/temperature)) * temperature ** 2

    return cross_entropy_loss_value

# Train and Evaluation

In [15]:
@tf.function
def compute_num_correct(model, images, labels):
    """Compute number of correctly classified images in a batch.

    Args:
    model: Instance of tf.keras.Model.
    images: Tensor representing a batch of images.
    labels: Tensor representing a batch of labels.

    Returns:
    Number of correctly classified images.
    """
    class_logits = model(images, training=False)
    return tf.reduce_sum(
        tf.cast(tf.math.equal(tf.argmax(class_logits, -1), tf.argmax(labels, -1)),
              tf.float32)), tf.argmax(class_logits, -1), tf.argmax(labels, -1)


def train_and_evaluate(model, compute_loss_fn, num_epochs, learning_rate, **kwargs):
    """Perform training and evaluation for a given model.

    Args:
    model: Main Instance of tf.keras.Model.
    compute_loss_fn: A function that computes the training loss given the
        images, and labels.
    num_epochs: Number of epochs to train for
    learning_rate: Optimizer learning rate
    kwargs: Passed through to loss fn
    """

    # your code start from here for step 4
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    train_generator.reset()
    
    accuracy = 0
    
    for epoch in range(1, num_epochs + 1):
        # Run training.
        print('Epoch {}: '.format(epoch), end='')

        #for images, labels in mhist_train:
        for batch in range(TRAIN_BATCHES):
            
            images, labels = train_generator.next()

            with tf.GradientTape() as tape:

                loss_value = compute_loss_fn(model, images, labels, **kwargs)

            grads = tape.gradient(loss_value, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # Run evaluation.
        num_correct = 0
        num_total = BATCH_SIZE*TEST_BATCHES
        
        test_generator.reset()
        
        for batch in range(TEST_BATCHES):
            images, labels = test_generator.next()
            num_correct += compute_num_correct(model,images,labels)[0]
        accuracy = num_correct / num_total * 100
        print("Class_accuracy: " + '{:.2f}%'.format(accuracy))
    
    return accuracy


# Training models

In [16]:
# your code start from here for step 5 

print("Teacher Run: Initial")
train_and_evaluate(teacher_model, compute_teacher_loss, NUM_INIT_EPOCHS, 1e-4)
print("\nTeacher Run: Fine")
teach_acc = train_and_evaluate(teacher_model, compute_teacher_loss, NUM_FINE_EPOCHS, 1e-5)

print("\n\nStudent (KD) Run: Initial")
train_and_evaluate(student_kd_model, compute_student_loss, NUM_INIT_EPOCHS, 1e-3, teacher_model=teacher_model, temperature=4, alpha=0.5)
print("\nStudent (KD) Run: Fine")
skd_acc = train_and_evaluate(student_kd_model, compute_student_loss, NUM_FINE_EPOCHS, 1e-4, teacher_model=teacher_model, temperature=4, alpha=0.5)

print("\n\nStudent (Scratch) Run: Initial")
train_and_evaluate(student_scratch_model, compute_student_scratch_loss, NUM_INIT_EPOCHS, 1e-3, temperature=4)
print("\nStudent (Scratch) Run: Fine")
ss_acc = train_and_evaluate(student_scratch_model, compute_student_scratch_loss, NUM_FINE_EPOCHS, 1e-4, temperature=4)

Teacher Run: Initial
Epoch 1: Class_accuracy: 77.46%
Epoch 2: Class_accuracy: 62.70%
Epoch 3: Class_accuracy: 68.44%
Epoch 4: Class_accuracy: 76.33%
Epoch 5: Class_accuracy: 60.66%
Epoch 6: Class_accuracy: 74.18%
Epoch 7: Class_accuracy: 75.10%
Epoch 8: Class_accuracy: 78.07%
Epoch 9: Class_accuracy: 79.51%
Epoch 10: Class_accuracy: 73.87%

Teacher Run: Fine
Epoch 1: Class_accuracy: 80.23%
Epoch 2: Class_accuracy: 81.97%
Epoch 3: Class_accuracy: 83.20%
Epoch 4: Class_accuracy: 82.48%
Epoch 5: Class_accuracy: 83.50%
Epoch 6: Class_accuracy: 82.89%
Epoch 7: Class_accuracy: 84.32%
Epoch 8: Class_accuracy: 83.30%
Epoch 9: Class_accuracy: 83.40%
Epoch 10: Class_accuracy: 83.50%
Epoch 11: Class_accuracy: 84.12%
Epoch 12: Class_accuracy: 84.53%
Epoch 13: Class_accuracy: 82.38%
Epoch 14: Class_accuracy: 85.45%
Epoch 15: Class_accuracy: 85.66%
Epoch 16: Class_accuracy: 85.04%
Epoch 17: Class_accuracy: 85.55%
Epoch 18: Class_accuracy: 85.86%
Epoch 19: Class_accuracy: 85.04%
Epoch 20: Class_accur

# Test Accuracy vs. Temperature Curve

In [None]:
# your code start from here for step 6

# Original student uses temperature 4
student_kd_model1 = tf.keras.Sequential()
student_kd_model1.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_kd_model1.add(tf.keras.layers.Dense(NUM_CLASSES)) 

student_kd_model2 = tf.keras.Sequential()
student_kd_model2.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_kd_model2.add(tf.keras.layers.Dense(NUM_CLASSES)) 

student_kd_model16 = tf.keras.Sequential()
student_kd_model16.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_kd_model16.add(tf.keras.layers.Dense(NUM_CLASSES)) 

student_kd_model32 = tf.keras.Sequential()
student_kd_model32.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_kd_model32.add(tf.keras.layers.Dense(NUM_CLASSES)) 

student_kd_model64 = tf.keras.Sequential()
student_kd_model64.add(tf.keras.applications.mobilenet_v2.MobileNetV2(classifier_activation = None, input_shape = (224, 224, 3)))
student_kd_model64.add(tf.keras.layers.Dense(NUM_CLASSES)) 

student_dict = {1: student_kd_model1, 2: student_kd_model2, 4: student_kd_model, 16: student_kd_model16, 32: student_kd_model32, 64: student_kd_model64}
acc_dict = {4: skd_acc}

for temp in [1, 2, 16, 32, 64]:
    print(f'Student (KD, Temperature = {temp}) Run: Initial')
    train_and_evaluate(student_dict[temp], compute_student_loss, NUM_INIT_EPOCHS, 1e-3, teacher_model=teacher_model, temperature=temp, alpha=0.5)
    print(f'\nStudent (KD, Temperature = {temp}) Run: Fine')
    acc_dict[temp] = train_and_evaluate(student_dict[temp], compute_student_loss, NUM_FINE_EPOCHS, 1e-4, teacher_model=teacher_model, temperature=temp, alpha=0.5)
    print('\n')

## Save Models

In [17]:
teacher_model.save('teacher.h5')
student_kd_model.save('student_kd4.h5')
student_scratch_model.save('student_scratch.h5')



# Comparing the teacher and student model (number of of parameters and FLOPs) 

In [None]:
# your code start from here for step 8


# Implementing the state-of-the-art KD algorithm

In [None]:
# your code start from here for step 13
