In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

In [2]:
from tensorflow.keras import activations
from statistics import mean

In [3]:
class Distiller(keras.Model):
    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        """ Configure the distiller.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        # Unpack data
        x, y = data

        # Forward pass of teacher
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictions)
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1),
            )
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        # Unpack the data
        x, y = data

        # Compute predictions
        y_prediction = self.student(x, training=False)

        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)

        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results

In [4]:
# Create the teacher
teacher = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(256, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
        layers.Conv2D(512, (3, 3), strides=(2, 2), padding="same"),
        layers.Flatten(),
        layers.Dense(10,activation=activations.sigmoid),
    ],
    name="teacher",
)

# Create the student
student = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(16, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
        layers.Conv2D(32, (3, 3), strides=(2, 2), padding="same"),
        layers.Flatten(),
        layers.Dense(10,activation=activations.sigmoid),
    ],
    name="student",
)

# Clone student for later comparison
student_scratch = keras.models.clone_model(student)

In [5]:
from sklearn.model_selection import train_test_split
import tensorflow as tf

DATASET_SIZE = 70000
TRAIN_RATIO = 0.5
VALIDATION_RATIO = 0.4
TEST_RATIO = 0.1

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

X = np.concatenate([x_train, x_test])
y = np.concatenate([y_train, y_test])



In [6]:
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=(1-TRAIN_RATIO))
X_val, X_test, y_val, y_test = train_test_split(X_val, y_val, test_size=((TEST_RATIO/(VALIDATION_RATIO+TEST_RATIO))))


X_train = X_train.astype("float32") / 255.0
X_train = np.reshape(X_train, (-1, 28, 28, 1))

X_test = X_test.astype("float32") / 255.0
X_test = np.reshape(X_test, (-1, 28, 28, 1))

X_val = X_val.astype("float32") / 255.0
X_val = np.reshape(X_val, (-1, 28, 28, 1))

In [7]:
# Train teacher as usual
teacher.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Train and evaluate teacher on data.
teacher.fit(X_train, y_train, epochs=1)
teacher.evaluate(X_test, y_test)

  return dispatch_target(*args, **kwargs)




[0.11583765596151352, 0.9634285569190979]

In [8]:
# Initialize and compile distiller
distiller = Distiller(student=student, teacher=teacher)
distiller.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# Distill teacher to student
distiller.fit(X_train, y_train, epochs=3)

# Evaluate student on test dataset
distiller.evaluate(X_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.9727143049240112, 0.01151325087994337]

In [9]:
# Train student as doen usually
student_scratch.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Train and evaluate student trained from scratch.
student_scratch.fit(X_train, y_train, epochs=3)
student_scratch.evaluate(X_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.09160300344228745, 0.9708571434020996]

In [10]:
st = student_scratch.predict(X_val)
te = teacher.predict(X_val)



In [11]:
a = sum(st)/len(st)
b = sum(te)/len(te)

In [12]:
a

array([0.36415863, 0.29683653, 0.5021533 , 0.5268581 , 0.35597712,
       0.3947639 , 0.30693948, 0.45571566, 0.6729485 , 0.5363645 ],
      dtype=float32)

In [13]:
dist = np.linalg.norm(a-b)

In [14]:
print(dist)

0.27411267


In [15]:
te

array([[5.06090820e-02, 1.24666452e-01, 4.86135483e-04, ...,
        9.99999285e-01, 3.99537027e-01, 9.78013635e-01],
       [1.00001991e-02, 8.54404425e-05, 6.99907541e-04, ...,
        1.16840005e-03, 9.09332573e-01, 9.45450008e-01],
       [2.30843633e-01, 1.22550696e-01, 2.23343462e-01, ...,
        9.99997377e-01, 5.86990297e-01, 9.51521754e-01],
       ...,
       [2.87127554e-01, 3.55236828e-02, 1.05021000e-02, ...,
        9.94034588e-01, 2.13733047e-01, 9.99998093e-01],
       [7.56102860e-01, 7.61580467e-03, 9.99934137e-01, ...,
        1.00814253e-01, 5.47037125e-01, 6.64544940e-01],
       [8.84786844e-02, 9.65439200e-01, 1.67714119e-01, ...,
        9.95305181e-01, 8.18448067e-01, 9.98833060e-01]], dtype=float32)

In [16]:
st

array([[1.43336058e-02, 3.82459164e-02, 1.04820728e-03, ...,
        9.99999523e-01, 5.44908702e-01, 9.98031199e-01],
       [2.51694322e-02, 3.84807587e-04, 1.03825629e-02, ...,
        3.45051289e-04, 9.63757873e-01, 9.89703536e-01],
       [1.47953808e-01, 1.61029011e-01, 2.83679307e-01, ...,
        9.99999523e-01, 3.84923637e-01, 9.91968989e-01],
       ...,
       [4.07401323e-02, 5.96967340e-02, 7.94822574e-02, ...,
        9.94008064e-01, 3.68721187e-01, 9.99990106e-01],
       [3.28835964e-01, 6.97404146e-04, 9.99994516e-01, ...,
        1.25106305e-01, 8.48600268e-01, 1.37853146e-01],
       [2.98619270e-02, 9.89575863e-01, 1.02544725e-01, ...,
        9.92730141e-01, 7.37377048e-01, 9.98594761e-01]], dtype=float32)

In [17]:
training_error = 0   
for i in range(28000):       
    out = st[i] - X_val[i]       
    out = out*out       
    training_error+=out    
training_error = training_error/28000    
training_error = sum(training_error)

In [18]:
avg = sum(training_error)/len(training_error)


In [19]:
bavg= sum(avg)/len(avg)

In [20]:
bavg

10.233070468902588

In [21]:
student_scratch.evaluate(X_train, y_train)



[0.0697353407740593, 0.9793714284896851]

In [24]:
class Distiller_new(keras.Model):
    def __init__(self, student, teacher):
        super(Distiller_new, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        """ Configure the distiller.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        super(Distiller_new, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature
        
    def train_step(self, data):
        # Unpack data
        x, y = data

        # Forward pass of teacher
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Compute losses
            training_loss = 6e-02
            distill_loss = dist
            loss = self.alpha * training_loss + (1 - self.alpha) * distill_loss

        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results


    def test_step(self, data):
        # Unpack the data
        x, y = data

        # Compute predictions
        y_prediction = self.student(x, training=False)

        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)

        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results










In [25]:
distiller1 = Distiller_new(student=student, teacher=teacher)
distiller1.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# Distill teacher to student
distiller1.fit(X_train, y_train, epochs=3)

# Evaluate student on test dataset
distiller1.evaluate(X_test, y_test)

Epoch 1/3


AttributeError: in user code:

    File "C:\Users\Tejaswi\anaconda3\lib\site-packages\keras\engine\training.py", line 878, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\Tejaswi\anaconda3\lib\site-packages\keras\engine\training.py", line 867, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\Tejaswi\anaconda3\lib\site-packages\keras\engine\training.py", line 860, in run_step  **
        outputs = model.train_step(data)
    File "C:\Users\Tejaswi\AppData\Local\Temp/ipykernel_3880/830864529.py", line 53, in train_step
        gradients = tape.gradient(loss, trainable_vars)

    AttributeError: 'numpy.float64' object has no attribute '_id'


In [87]:
student_loss

NameError: name 'student_loss' is not defined

In [None]:
from utils import losses, saveload
from models import lenet5, Resnet2,Resnet4


def get_model(model,compile=True, weights=None, init_temp=2.5):
    if isinstance(model, str):
        if model == 'lenet5':
            model = lenet5.get_model(dataset_name, compile=False, softmax=False)
        elif model == 'Resnet2':
            model = Resnet2.get_model(dataset_name, compile=False, softmax=False)
        elif model == 'Resnet4':
            model == Resnet4.get_model(dataset_name, compile=False, softmax=False)
        else:
            raise ValueError("model not defined")

    if weights:
        saveload.load_weights(model, weights)

    if compile:
        model.compile(optimizer='adam', loss=losses.ENDLoss(init_temp=init_temp))

    return model