<a href="https://colab.research.google.com/github/yannnn126/2022CCE/blob/main/notebook/22.%E7%9F%A5%E8%AD%98%E8%92%B8%E9%A4%BE_%E9%90%B5%E4%BA%BA%E8%B3%BD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 知識蒸餾 Knowledge Distillation

- 此為鐵人賽系列文示範文件，參考[Keras官方範例](https://www.tensorflow.org/lite/performance/post_training_quantization)修改而成。

- 知識蒸餾  Knowledge Distillation 為模型壓縮技術，其中student模型從可以更複雜的 teacher 模型中 "學習" ，實作過程包含:
  1. 自定義一個`Distiller`類別。
  2. 用 CNN 訓練 teacher 模型。
  3. student 模型向 teacher 學習。
  4. 訓練一個沒向老師學的 student_scratch 模型進行比較。


In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

import numpy as np
import os

In [None]:
ACCURACY = {}

## 準備資料

- 模型採用`tf.keras.datasets.mnist`，用CNN進行建模。

In [2]:
import tensorflow as tf

# 載入 CIFAR-10
cifar10 = tf.keras.datasets.cifar10
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

# 正規化
train_images = train_images / 255.0
test_images = test_images / 255.0

# 標籤轉一維
train_labels = train_labels.flatten()
test_labels = test_labels.flatten()


Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
[1m170498071/170498071[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 0us/step


## 建立Distiller類別

- 此直接使用 Keras 官方範例定義的 `Distiller` 類別。
- 該類別繼承於 `th.keras.Model`，並改寫以下方法:
  - `compile`：這個模型需要一些額外的參數來編譯，比如老師和學生的損失，alpha 和 temp 。
  - `train_step`：控制模型的訓練方式。這將是真正的知識蒸餾邏輯所在。這個方法就是你做的時候調用的方法model.fit。
  - `test_step`：控制模型的評估。這個方法就是你做的時候調用的方法model.evaluate。

In [7]:
class Distiller(keras.Model):
    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.4,
        temperature=5,
    ):
        """ Configure the distiller.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
    # Unpack data
      x, y = data

      # Forward pass of teacher
      teacher_predictions = self.teacher(x, training=False)

      with tf.GradientTape() as tape:
          # Forward pass of student
          student_predictions = self.student(x, training=True)

          # Compute losses
          student_loss = self.student_loss_fn(y, student_predictions)
          distillation_loss = self.distillation_loss_fn(
              tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
              tf.nn.softmax(student_predictions / self.temperature, axis=1),
          )
          loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

      # Compute gradients
      trainable_vars = self.student.trainable_variables
      gradients = tape.gradient(loss, trainable_vars)

      # Update weights
      self.optimizer.apply_gradients(zip(gradients, trainable_vars))

      # Update the metrics configured in `compile()`.
      self.compiled_metrics.update_state(y, student_predictions)

      # Return a dict of performance
      results = {m.name: m.result() for m in self.metrics}
      results.update(
          {"student_loss": student_loss, "distillation_loss": distillation_loss}
      )
      return results


    def test_step(self, data):
        # Unpack the data
        x, y = data

        # Compute predictions
        y_prediction = self.student(x, training=False)

        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)

        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results


## 建立老師與學生模型

- 這裡定義大模型與小模型，老師使用大模型，學生使用小模型。
- 有兩個重要的事情需要注意：
  - 最後一層沒有使用激勵函數 softmax ，因為知識蒸餾需要原始權重特徵。
  - 通過 dropout 層的正則化將應用於教師而不是學生。這是因為學生應該能夠通過蒸餾過程學習這種正則化。

- 可以將學生模型視為教師模型的簡化（或壓縮）版本。

In [10]:
def big_model_builder():
    keras = tf.keras

    model = keras.Sequential([
        keras.layers.InputLayer(input_shape=(32, 32, 3)),
        keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Flatten(),
        keras.layers.Dense(10)
    ])

    return model


def small_model_builder():
    keras = tf.keras

    model = keras.Sequential([
        keras.layers.InputLayer(input_shape=(32, 32, 3)),
        keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Flatten(),
        keras.layers.Dense(10)
    ])

    return model


In [None]:
teacher = big_model_builder()

student = small_model_builder()

student_scratch = small_model_builder()

## 訓練老師

In [11]:
# Train teacher as usual
teacher.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
teacher.summary()

# Train and evaluate teacher on data.
teacher.fit(train_images, train_labels, epochs=5)
_ , ACCURACY['teacher model'] = teacher.evaluate(test_images, test_labels)

NameError: name 'teacher' is not defined

## 透過知識蒸餾訓練學生

- 要執行知識提煉過程，您將使用您之前compline的模型。
- 為此，首先創建`Distiller`類別的實例並傳入學生和教師模型`distiller = Distiller(student=student, teacher=teacher)
`。然後用合適的參數編譯它並訓練它！

- 老師可以用更高的epochs，學生會向老師學習。

In [None]:
distiller = Distiller(student=student, teacher=teacher)
distiller.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# Distill teacher to student
distiller.fit(
    train_images,
    train_labels,
    epochs=5,
    shuffle=False
)

# Evaluate student on test dataset
results = distiller.evaluate(test_images, test_labels)
ACCURACY['distiller student model'] = results[1]  # 確保存取 SparseCategoricalAccuracy

Epoch 1/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 14ms/step - sparse_categorical_accuracy: 0.8659 - distillation_loss: 0.0507 - loss: -4.7086 - student_loss: 0.2686
Epoch 2/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 15ms/step - sparse_categorical_accuracy: 0.9700 - distillation_loss: 0.0151 - loss: -7.2765 - student_loss: 0.0916
Epoch 3/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 15ms/step - sparse_categorical_accuracy: 0.9776 - distillation_loss: 0.0117 - loss: -7.4786 - student_loss: 0.0709
Epoch 4/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 15ms/step - sparse_categorical_accuracy: 0.9809 - distillation_loss: 0.0104 - loss: -7.4649 - student_loss: 0.0614
Epoch 5/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 14ms/step - sparse_categorical_accuracy: 0.9833 - distillation_loss: 0.0095 - loss: -7.3888 - student_loss: 0.0547
[1m313/313[0m [32m━━━━━━━━━

In [None]:
ACCURACY

{'teacher model': 0.9848999977111816,
 'distiller student model': {'sparse_categorical_accuracy': <tf.Tensor: shape=(), dtype=float32, numpy=0.9781000018119812>}}

## 比較模型 - 從頭訓練學生

In [None]:
# Train student as doen usually
student_scratch.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
student_scratch.summary()

# Train and evaluate student trained from scratch.
student_scratch.fit(
    train_images,
    train_labels,
    epochs=5,
    shuffle=False
    )
# student_scratch.evaluate(x_test, y_test)
_, ACCURACY['student from scrath model'] = student_scratch.evaluate(test_images, test_labels)

Epoch 1/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 11ms/step - loss: 0.4807 - sparse_categorical_accuracy: 0.8739
Epoch 2/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 10ms/step - loss: 0.1238 - sparse_categorical_accuracy: 0.9649
Epoch 3/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 11ms/step - loss: 0.0891 - sparse_categorical_accuracy: 0.9747
Epoch 4/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 10ms/step - loss: 0.0717 - sparse_categorical_accuracy: 0.9795
Epoch 5/5
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 11ms/step - loss: 0.0607 - sparse_categorical_accuracy: 0.9825
[1m313/313[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - loss: 0.0748 - sparse_categorical_accuracy: 0.9742


## 小結

In [None]:
ACCURACY

{'teacher model': 0.9848999977111816,
 'distiller student model': {'sparse_categorical_accuracy': <tf.Tensor: shape=(), dtype=float32, numpy=0.9781000018119812>},
 'student from scrath model': 0.9787999987602234}

- 老師的準確率應會高於學生，畢竟可以採用大模型、更多的epoch等方式優化。
- 「接受知識蒸餾的學生」表現通常會優於「自己從頭開始的學生」。
- 學生的模型雖然較簡易，知識蒸餾甚至會青出於藍勝於藍。

## 參考
- https://keras.io/examples/vision/knowledge_distillation/