Knowledge Distillation 
===============================

**Author**: [Clara Martinez](https://github.com/moonblume/LIVIA.git)


Knowledge distillation is a technique that enables knowledge transfer
from large, computationally expensive models to smaller ones without
losing validity. This allows for deployment on less powerful hardware,
making evaluation faster and more efficient.

Libraries
================


In [101]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau

from sklearn.model_selection import train_test_split
from scipy.signal import savgol_filter

from typing import List, Union, Tuple, Any
import statistics

# Check if GPU is available, and if not, use the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Loading dataset
================

First, I focus on the physiological signals of the Biovid dataset. Each sample is a CSV file with 6 columns and is labeled with one of 5 pain levels (0 to 4):

-   Time: This could be the timestamp or time index when the signal was recorded.
-   GSR (Galvanic Skin Response): A measure of the electrical conductance of the skin, which varies with the moisture level of the skin. It's often associated with emotional arousal.
-   ECG (Electrocardiogram): A recording of the electrical activity of the heart over time. It typically consists of waves representing the depolarization and repolarization of the heart muscle during each heartbeat.
-   EMG (Electromyography) - Trapezius: Measures the electrical activity produced by skeletal muscles. The trapezius muscle is a large superficial muscle that extends longitudinally from the occipital bone to the lower thoracic vertebrae and laterally to the spine of the scapula.
-   EMG - Corrugator: Electromyography signal from the corrugator supercilii muscle, which is a small facial muscle involved in frowning and expressing negative emotions.
-   EMG - Zygomaticus: Electromyography signal from the zygomaticus major muscle, which is involved in smiling and expressing positive emotions.

Our objective is to predict the pain level of input signals. One signal corresponds to one csv file.



In [104]:
# Define the directory containing the CSV files
biosignals_path = '/home/ens/AU59350/LIVIA/physio/physio_organised/'

# Initialize an empty list to store data for DataFrame
data = []

# Iterate over each pain level directory
for pain_level in os.listdir(biosignals_path):
    pain_level_dir = os.path.join(biosignals_path, pain_level)
    
    # Check if it's a directory
    if os.path.isdir(pain_level_dir):
        # Iterate over each CSV file in the pain level directory
        for csv_file in os.listdir(pain_level_dir):
            # Check if it's a CSV file
            if csv_file.endswith('.csv'):
                csv_path = os.path.join(pain_level_dir, csv_file)
                # Read the CSV file
                df = pd.read_csv(csv_path, sep='\t')
                # Extract GSR values
                gsr_signal = df['gsr'].values
                # Append the CSV name, GSR signals, and Pain level to the data list
                data.append({'CSV name': csv_file, 'GSR signals': gsr_signal, 'Pain level': int(pain_level)})

# Create a DataFrame from the collected data
df = pd.DataFrame(data)

# Display the DataFrame
df.head()


|   | CSV name | GSR signals | Pain level |
|---|----------|-------------|------------|
| 0 | 072414_m_23-PA2-034_bio.csv | [6.966839, 6.966161, 6.966, 6.966839, 6.966161... | 2 |
| 1 | 081609_w_40-PA2-028_bio.csv | [0.872, 0.872, 0.872, 0.872, 0.872, 0.872, 0.8... | 2 |
| 2 | 081714_m_36-PA2-065_bio.csv | [6.089862, 6.091, 6.091432, 6.092, 6.092432, 6... | 2 |
| 3 | 102514_w_40-PA2-046_bio.csv | [1.462, 1.462, 1.462, 1.462, 1.462, 1.462, 1.4... | 2 |
| 4 | 120514_w_56-PA2-019_bio.csv | [2.226, 2.226, 2.226, 2.226, 2.226, 2.226, 2.2... | 2 |


Preprocessing
================

Preprocessing the GSR signals involves handling missing values, smoothing the signal to reduce noise (Savitzky-Golay filtering), removing outliers (z-score), and normalizing the data to a fixed range such as [0, 1] or [-1, 1], which makes signals comparable across subjects.

In [105]:
# Function to preprocess GSR signals
def preprocess_gsr_signal(gsr_signal):
    # Handle missing values (if any)
    gsr_signal = np.array(gsr_signal)  # Convert to NumPy array
    gsr_signal = gsr_signal[~np.isnan(gsr_signal)]  # Remove NaN values
    
    # Check if the length of the signal is sufficient for smoothing
    if len(gsr_signal) < 5:
        # If the signal is too short, return the original signal
        return gsr_signal
    
    try:
        # Smoothing using Savitzky-Golay filter
        gsr_signal_smooth = savgol_filter(gsr_signal, window_length=5, polyorder=2)
    except ValueError:
        # If an error occurs during smoothing, return the original signal
        return gsr_signal
    
    # Removing outliers based on absolute Z-scores (skipped for constant signals, where std is 0)
    std = gsr_signal_smooth.std()
    if std > 0:
        z_scores = (gsr_signal_smooth - gsr_signal_smooth.mean()) / std
        gsr_signal_smooth = gsr_signal_smooth[np.abs(z_scores) < 3]

    # If there are no valid values after removing outliers, return the original signal
    if len(gsr_signal_smooth) == 0:
        return gsr_signal

    # Min-max normalization to [0, 1]; a flat signal has zero range and maps to all zeros
    value_range = gsr_signal_smooth.max() - gsr_signal_smooth.min()
    if value_range == 0:
        return np.zeros_like(gsr_signal_smooth)
    return (gsr_signal_smooth - gsr_signal_smooth.min()) / value_range

# Apply preprocessing to each row in the DataFrame
df['GSR signals'] = df['GSR signals'].apply(preprocess_gsr_signal)

# Display the updated DataFrame
df.head()



|   | CSV name | GSR signals | Pain level |
|---|----------|-------------|------------|
| 0 | 072414_m_23-PA2-034_bio.csv | [0.9899913791193845, 0.9877131683489699, 0.986... | 2 |
| 1 | 081609_w_40-PA2-028_bio.csv | [0.9678938384198197, 0.9678938384197773, 0.967... | 2 |
| 2 | 081714_m_36-PA2-065_bio.csv | [0.9020912464663925, 0.9035627741584942, 0.904... | 2 |
| 3 | 102514_w_40-PA2-046_bio.csv | [0.9675465896733677, 0.9675465896732949, 0.967... | 2 |
| 4 | 120514_w_56-PA2-019_bio.csv | [0.9986433711307843, 0.9986433711307986, 0.998... | 2 |
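
The outlier and normalization steps are easy to get subtly wrong: a one-sided z-score test keeps extreme low values, and a flat signal has zero standard deviation and zero range, which leads to division by zero. Below is a minimal plain-Python sketch of a two-sided variant with both guards. The function name `filter_and_scale` and the synthetic `demo` signal are made up for illustration and are not part of the notebook.

```python
import statistics

def filter_and_scale(signal, threshold=3.0):
    """Drop samples with |z| >= threshold, then min-max scale the rest to [0, 1]."""
    values = [float(v) for v in signal]
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)  # population std, the same convention as NumPy's default
    if std == 0:
        # Constant signal: nothing to remove and no range to scale by
        return [0.0] * len(values)
    kept = [v for v in values if abs((v - mean) / std) < threshold]
    if not kept:
        return []
    low, high = min(kept), max(kept)
    if high == low:
        return [0.0] * len(kept)
    return [(v - low) / (high - low) for v in kept]

# Synthetic signal: 50 slowly rising values plus one extreme low and one extreme high sample
demo = [1.0 + 0.002 * i for i in range(50)] + [-50.0, 50.0]
scaled = filter_and_scale(demo)
print(len(demo), len(scaled), min(scaled), max(scaled))
```

Both extreme samples are dropped here because the test uses the absolute z-score; with a one-sided test the value -50.0 would survive and dominate the min-max scaling.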


In [106]:
# Step 1: Prepare the data
max_length = max(len(signal) for signal in df['GSR signals'])  # Find the maximum length of GSR signals

# Pad or truncate the GSR signals to the maximum length
gsr_signals = np.array([np.pad(signal, (0, max_length - len(signal))) if len(signal) < max_length else signal[:max_length] for signal in df['GSR signals']])

pain_levels = df['Pain level'].values

# Step 2: Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(gsr_signals, pain_levels, test_size=0.2, random_state=42)

# Step 3: Convert the data into PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)  
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)  
# CrossEntropyLoss expects integer class indices, so the labels are stored as int64 (torch.long)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Check the shape of tensors
print("X_train_tensor shape:", X_train_tensor.shape)
print("X_test_tensor shape:", X_test_tensor.shape)
print("y_train_tensor shape:", y_train_tensor.shape)
print("y_test_tensor shape:", y_test_tensor.shape)


X_train_tensor shape: torch.Size([6960, 1, 2816])
X_test_tensor shape: torch.Size([1740, 1, 2816])
y_train_tensor shape: torch.Size([6960])
y_test_tensor shape: torch.Size([1740])


X_train_tensor: The training data tensor with a shape of [6960, 1, 2816], indicating that there are 6960 samples, each with 1 channel (for the GSR signal), and each signal has been padded or truncated to a length of 2816.  
X_test_tensor: The test data tensor with a shape of [1740, 1, 2816], indicating that there are 1740 samples in the test set, each with 1 channel, and the signals have the same length as the training data.  
y_train_tensor: The training labels tensor with a shape of [6960], containing the corresponding pain levels for the training samples.  
y_test_tensor: The test labels tensor with a shape of [1740], containing the corresponding pain levels for the test samples.

In [107]:
#check dimensionality of the tensor
inputs.shape

torch.Size([816, 1, 2816])

The first dimension (816) represents the batch size, indicating that there are 816 samples in this batch.  
The second dimension (1) represents the number of channels. In this case, there is only one channel.  
The third dimension (2816) represents the length of the input signal for each channel.

Input data has a shape of (batch_size, channels, sequence_length).
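
The batch size of 816 in the printed shape is consistent with `inputs` being the last batch of the training loop from the Model section. That is an assumption about cell execution order, together with the values 6960 training samples and `batch_size = 1024` taken from the notebook. A quick arithmetic check:

```python
import math

num_train_samples = 6960  # size of the training split
batch_size = 1024         # batch size used by the training DataLoader

# Number of full batches and the size of the final, smaller batch
full_batches, last_batch = divmod(num_train_samples, batch_size)
num_batches = math.ceil(num_train_samples / batch_size)

print(full_batches, last_batch, num_batches)  # 6 816 7
```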

Model
================
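
The first fully connected layer in the model below expects 22336 input features. Here is a short sketch of where that number comes from, using the output-length formula documented for `nn.Conv1d` and `nn.MaxPool1d` (no padding, no dilation). The helper name `out_len` is made up for this illustration.

```python
def out_len(length, kernel_size, stride=1, padding=0, dilation=1):
    """Output length of a 1D convolution or pooling layer (floor mode)."""
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1

length = 2816                                        # padded GSR signal length
length = out_len(length, kernel_size=5, stride=2)    # conv1    -> 1406
length = out_len(length, kernel_size=2, stride=2)    # maxpool1 -> 703
length = out_len(length, kernel_size=5)              # conv2    -> 699
length = out_len(length, kernel_size=2, stride=2)    # maxpool2 -> 349

flattened = 64 * length  # 64 output channels from conv2
print(length, flattened)  # 349 22336
```

If the signal length or any kernel or stride changes, the `in_features` of `fc1` has to be recomputed in the same way.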

In [108]:
# Define the Conv1D model
class Conv1D_model(nn.Module):
    def __init__(self, num_classes=5):
        super(Conv1D_model, self).__init__()
        
        # First Convolutional Layer
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=5, stride=2)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool1d(kernel_size=2)
        
        # Second Convolutional Layer
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool1d(kernel_size=2)
        
        # Fully Connected Layers
        self.fc1 = nn.Linear(22336, 512)  
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(512, num_classes)
        
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
        
        x = x.view(x.size(0), -1)  # Flatten the tensor to 1D
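        x = self.fc1(x)
        x = self.relu3(x)  # Non-linearity between the two fully connected layers

        x = self.fc2(x)
        # No sigmoid here: nn.CrossEntropyLoss expects raw logits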
        return x

# Create an instance of the model
model = Conv1D_model(num_classes=5)

# Print the model architecture
print(model)

# Define learning parameters
lr = 0.0001
num_epochs = 100
batch_size = 1024

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

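# Placeholder random tensors: these OVERWRITE the real GSR tensors built above.
# With random inputs and labels the loss stays near ln(5) ≈ 1.609 (chance level for 5 classes).
# Remove these four lines to train on the real data (labels must then have dtype torch.long).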
X_train_tensor = torch.randn(6960, 1, 2816)
y_train_tensor = torch.randint(0, 5, (6960,))
X_test_tensor = torch.randn(1740, 1, 2816)
y_test_tensor = torch.randint(0, 5, (1740,))

# Create DataLoader for training
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Train the model
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

# Evaluate the model
model.eval()
with torch.no_grad():
    outputs = model(X_test_tensor)
    predictions = torch.argmax(outputs, dim=1)
    y_test_tensor_sampled = y_test_tensor[:len(predictions)]
    accuracy = torch.mean((predictions == y_test_tensor_sampled).float()).item()
    print(f"Test Accuracy: {accuracy:.4f}")


Conv1D_model(
  (conv1): Conv1d(1, 32, kernel_size=(5,), stride=(2,))
  (relu1): ReLU()
  (maxpool1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 64, kernel_size=(5,), stride=(1,))
  (relu2): ReLU()
  (maxpool2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=22336, out_features=512, bias=True)
  (relu3): ReLU()
  (fc2): Linear(in_features=512, out_features=5, bias=True)
  (sigmoid): Sigmoid()
)
Epoch 1/100, Loss: 1.6100
Epoch 2/100, Loss: 1.6099
Epoch 3/100, Loss: 1.6098
Epoch 4/100, Loss: 1.6097
Epoch 5/100, Loss: 1.6095
Epoch 6/100, Loss: 1.6094
Epoch 7/100, Loss: 1.6093
Epoch 8/100, Loss: 1.6092
Epoch 9/100, Loss: 1.6091
Epoch 10/100, Loss: 1.6090
Epoch 11/100, Loss: 1.6089
Epoch 12/100, Loss: 1.6087
Epoch 13/100, Loss: 1.6086
Epoch 14/100, Loss: 1.6085
Epoch 15/100, Loss: 1.6084
Epoch 16/100, Loss: 1.6083
Epoch 17/100, Loss: 1.6081
Epoch 18/100, Loss: 1.6080
Epoch 19/100, L

Cross-Validation
================

In [111]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold

# Define the Conv1D model
class Conv1D_model(nn.Module):
    def __init__(self, num_classes=5):
        super(Conv1D_model, self).__init__()
        # First Convolutional Layer
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=5, stride=2)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool1d(kernel_size=2)
        
        # Second Convolutional Layer
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool1d(kernel_size=2)
        
        # Fully Connected Layers
        self.fc1 = nn.Linear(22336, 512)  
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(512, num_classes)
        
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
        
        x = x.view(x.size(0), -1)  # Flatten the tensor to 1D
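        x = self.fc1(x)
        x = self.relu3(x)  # Non-linearity between the two fully connected layers

        x = self.fc2(x)
        # No sigmoid here: nn.CrossEntropyLoss expects raw logits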
        return x

# Define learning parameters
lr = 0.0001
num_epochs = 100
batch_size = 1024
num_classes = 5

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()

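# Placeholder random tensors: these OVERWRITE the real GSR tensors built earlier.
# With random inputs and labels, accuracy stays near 0.20 (chance level for 5 classes).
# Remove these four lines to cross-validate on the real data (labels must then have dtype torch.long).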
X_train_tensor = torch.randn(6960, 1, 2816)
y_train_tensor = torch.randint(0, num_classes, (6960,))
X_test_tensor = torch.randn(1740, 1, 2816)
y_test_tensor = torch.randint(0, num_classes, (1740,))

# Initialize K-Fold cross-validation
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True)

# Perform cross-validation
for fold, (train_index, val_index) in enumerate(kf.split(X_train_tensor)):
    print(f"Fold {fold + 1}/{k_folds}")

    # Split data into training and validation sets for this fold
    X_train_fold, X_val_fold = X_train_tensor[train_index], X_train_tensor[val_index]
    y_train_fold, y_val_fold = y_train_tensor[train_index], y_train_tensor[val_index]

    # Create DataLoader for training and validation sets
    train_dataset = TensorDataset(X_train_fold, y_train_fold)
    val_dataset = TensorDataset(X_val_fold, y_val_fold)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Initialize the model
    model = Conv1D_model(num_classes=num_classes)

    # Initialize the optimizer
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Train the model
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)

            # Calculate training accuracy
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_accuracy = correct / total
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

    # Evaluate the model on the validation set
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for inputs, labels in val_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        accuracy = correct / total
        print(f"Validation Accuracy: {accuracy:.4f}")


Fold 1/5
Epoch 1/100, Loss: 1.6199, Accuracy: 0.2044
Epoch 2/100, Loss: 1.6155, Accuracy: 0.2042
Epoch 3/100, Loss: 1.6117, Accuracy: 0.2051
Epoch 4/100, Loss: 1.6103, Accuracy: 0.2055
Epoch 5/100, Loss: 1.6099, Accuracy: 0.2094
Epoch 6/100, Loss: 1.6099, Accuracy: 0.2119
Epoch 7/100, Loss: 1.6098, Accuracy: 0.2125
Epoch 8/100, Loss: 1.6096, Accuracy: 0.2123
Epoch 9/100, Loss: 1.6094, Accuracy: 0.2119
Epoch 10/100, Loss: 1.6092, Accuracy: 0.2117
Epoch 11/100, Loss: 1.6091, Accuracy: 0.2123
Epoch 12/100, Loss: 1.6089, Accuracy: 0.2108
Epoch 13/100, Loss: 1.6088, Accuracy: 0.2094
Epoch 14/100, Loss: 1.6087, Accuracy: 0.2101
Epoch 15/100, Loss: 1.6086, Accuracy: 0.2119
Epoch 16/100, Loss: 1.6084, Accuracy: 0.2110
Epoch 17/100, Loss: 1.6083, Accuracy: 0.2110
Epoch 18/100, Loss: 1.6082, Accuracy: 0.2139
Epoch 19/100, Loss: 1.6080, Accuracy: 0.2143
Epoch 20/100, Loss: 1.6079, Accuracy: 0.2135
Epoch 21/100, Loss: 1.6077, Accuracy: 0.2144
Epoch 22/100, Loss: 1.6077, Accuracy: 0.2164
Epoch 23/1

Evaluation
================

In [112]:
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Evaluate the model
model.eval()
with torch.no_grad():
    outputs = model(X_test_tensor)
    predictions = torch.argmax(outputs, dim=1)
    y_test_tensor_sampled = y_test_tensor[:len(predictions)]
    accuracy = torch.mean((predictions == y_test_tensor_sampled).float()).item()
    print(f"Test Accuracy: {accuracy:.4f}")

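    # Calculate MAE and RMSE. RMSE is taken as the square root of MSE because the
    # `squared` argument is deprecated in recent scikit-learn releases.
    y_true, y_pred = y_test_tensor_sampled.numpy(), predictions.numpy()
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))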
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")

Test Accuracy: 0.1902
MAE: 1.4368
RMSE: 1.7590
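
Because the pain levels are ordered (0 to 4), MAE and RMSE complement accuracy: they measure how many levels a prediction is off by, not only whether it is wrong. Here is a toy, plain-Python sketch of the three metrics. The label lists are made up for illustration and are not notebook results.

```python
import math

def summarize(y_true, y_pred):
    """Return (accuracy, MAE, RMSE) for integer class labels."""
    errors = [p - t for t, p in zip(y_true, y_pred)]
    accuracy = sum(e == 0 for e in errors) / len(errors)
    mae = sum(abs(e) for e in errors) / len(errors)
    rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
    return accuracy, mae, rmse

y_true = [0, 1, 2, 3, 4]
y_pred = [0, 2, 2, 1, 4]  # two mistakes: off by 1 level and off by 2 levels
accuracy, mae, rmse = summarize(y_true, y_pred)
print(accuracy, mae, rmse)  # 0.6 0.6 1.0
```

RMSE penalizes the two-level mistake more heavily than MAE does, which is why it is larger here.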




Defining model classes and utility functions
============================================

Next, I define the model classes. Several user-defined parameters need
to be set here. I use two different architectures, keeping the number of
filters fixed across the experiments to ensure fair comparisons. Both
architectures are Convolutional Neural Networks (CNNs) with a different
number of convolutional layers that serve as feature extractors,
followed by a classifier with 10 classes. The number of filters and
neurons is smaller for the student.
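
The classifier input sizes in the classes below (`nn.Linear(2048, 512)` for the teacher, `nn.Linear(1024, 256)` for the student) depend on the image size. The sketch below reproduces them under the assumption of 32x32 RGB inputs. That size is not stated in this notebook and is inferred from the layer sizes; the helper name `flattened_features` is made up.

```python
def flattened_features(input_size, out_channels, num_pools):
    """Flattened size after size-preserving 3x3 convs (padding=1) and 2x2 max-pools."""
    side = input_size
    for _ in range(num_pools):
        side //= 2  # each MaxPool2d(kernel_size=2, stride=2) halves height and width
    return out_channels * side * side

ASSUMED_INPUT_SIZE = 32  # assumption: 32x32 images

teacher_features = flattened_features(ASSUMED_INPUT_SIZE, out_channels=32, num_pools=2)
student_features = flattened_features(ASSUMED_INPUT_SIZE, out_channels=16, num_pools=2)
print(teacher_features, student_features)  # 2048 1024
```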


In [5]:
# Deeper neural network class to be used as teacher:
class DeepNN(nn.Module):
    def __init__(self, num_classes=10):
        super(DeepNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# Lightweight neural network class to be used as student:
class LightNN(nn.Module):
    def __init__(self, num_classes=10):
        super(LightNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

I employ 2 functions to help produce and evaluate the results on the
original classification task. One function is called `train` and takes
the following arguments:

-   `model`: A model instance to train (update its weights) via this
    function.
-   `train_loader`: We defined our `train_loader` above, and its job is
    to feed the data into the model.
-   `epochs`: How many times we loop over the dataset.
-   `learning_rate`: The learning rate determines how large our steps
    towards convergence should be. Too large or too small steps can be
    detrimental.
-   `device`: Determines the device to run the workload on. Can be
    either CPU or GPU depending on availability.

Our test function is similar, but it will be invoked with `test_loader`
to load images from the test set.


In [6]:
def train(model, train_loader, epochs, learning_rate, device):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    model.train()

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            # inputs: A collection of batch_size images
            # labels: A vector of dimensionality batch_size with integers denoting class of each image
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)

            # outputs: Output of the network for the collection of images. A tensor of dimensionality batch_size x num_classes
            # labels: The actual labels of the images. Vector of dimensionality batch_size
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}")

def test(model, test_loader, device):
    model.to(device)
    model.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

Cross-entropy runs
==================

For reproducibility, we need to set the torch manual seed. I train
networks using different methods, so to compare them fairly, it makes
sense to initialize the networks with the same weights. I start by
training the teacher network using cross-entropy:


In [7]:
torch.manual_seed(42)
nn_deep = DeepNN(num_classes=10).to(device)
train(nn_deep, train_loader, epochs=10, learning_rate=0.001, device=device)
test_accuracy_deep = test(nn_deep, test_loader, device)

# Instantiate the lightweight network:
torch.manual_seed(42)
nn_light = LightNN(num_classes=10).to(device)

Epoch 1/10, Loss: 1.3609035784936012
Epoch 2/10, Loss: 0.8999937804763579
Epoch 3/10, Loss: 0.698583966935687
Epoch 4/10, Loss: 0.5613005085064627
Epoch 5/10, Loss: 0.4368233955119882
Epoch 6/10, Loss: 0.3273872912997175
Epoch 7/10, Loss: 0.23821778524943324
Epoch 8/10, Loss: 0.18469191213016925
Epoch 9/10, Loss: 0.14930837982527131
Epoch 10/10, Loss: 0.12943355578100285
Test Accuracy: 75.09%


I instantiate one more lightweight network model to compare the two
students' performance. Backpropagation is sensitive to weight
initialization, so I need to make sure these two networks have exactly
the same initialization.


In [None]:
torch.manual_seed(42)
new_nn_light = LightNN(num_classes=10).to(device)

To check that the second network is a copy of the first, we inspect the
norm of its first layer. If the norms match, the two networks were very likely initialized identically.


In [None]:
# Print the norm of the first layer of the initial lightweight model
print("Norm of 1st layer of nn_light:", torch.norm(nn_light.features[0].weight).item())
# Print the norm of the first layer of the new lightweight model
print("Norm of 1st layer of new_nn_light:", torch.norm(new_nn_light.features[0].weight).item())
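
Matching norms are a quick sanity check, but two different weight tensors can share the same norm. A stricter check compares every tensor in the two `state_dict`s. The sketch below uses a small stand-in network: `make_tiny_net` and `same_weights` are hypothetical helpers for illustration, not part of the notebook.

```python
import torch
import torch.nn as nn

def same_weights(model_a, model_b):
    """Return True if both models hold identical parameter and buffer tensors."""
    state_a, state_b = model_a.state_dict(), model_b.state_dict()
    if list(state_a) != list(state_b):
        return False
    return all(torch.equal(state_a[key], state_b[key]) for key in state_a)

def make_tiny_net():
    # Hypothetical stand-in for LightNN, kept small for the example
    return nn.Sequential(
        nn.Conv2d(3, 16, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.Flatten(),
        nn.Linear(16 * 4 * 4, 10),
    )

torch.manual_seed(42)
net_a = make_tiny_net()
torch.manual_seed(42)
net_b = make_tiny_net()   # same seed right before construction
net_c = make_tiny_net()   # no re-seeding, so it draws different initial weights

print(same_weights(net_a, net_b))
print(same_weights(net_a, net_c))
```

The same helper could be applied to `nn_light` and `new_nn_light` before either of them is trained.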

Print the total number of parameters in each model:


In [None]:
total_params_deep = "{:,}".format(sum(p.numel() for p in nn_deep.parameters()))
print(f"DeepNN parameters: {total_params_deep}")
total_params_light = "{:,}".format(sum(p.numel() for p in nn_light.parameters()))
print(f"LightNN parameters: {total_params_light}")

Train and test the lightweight network with cross entropy loss:


In [None]:
train(nn_light, train_loader, epochs=10, learning_rate=0.001, device=device)
test_accuracy_light_ce = test(nn_light, test_loader, device)

Based on test accuracy, we can now compare the deeper network that is
to be used as a teacher with the lightweight network that is the
intended student. So far, the student has not interacted with the
teacher, so this performance is achieved by the student alone. The
metrics so far can be printed with the following lines:


In [None]:
print(f"Teacher accuracy: {test_accuracy_deep:.2f}%")
print(f"Student accuracy: {test_accuracy_light_ce:.2f}%")
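
The distillation cell that follows trains the student on a weighted sum of two terms. As a reading aid, here is that objective written out from the code of `train_knowledge_distillation`. The symbols ($z^{t}$, $z^{s}$, $p$, $q$, $N$) are notation introduced here and do not appear in the notebook:

$$
p_{n} = \mathrm{softmax}\left(z^{t}_{n} / T\right), \qquad
q_{n} = \mathrm{softmax}\left(z^{s}_{n} / T\right)
$$

$$
\mathcal{L} = w_{\text{soft}} \cdot \frac{T^{2}}{N} \sum_{n=1}^{N} \sum_{c} p_{n,c} \left( \log p_{n,c} - \log q_{n,c} \right) + w_{\text{CE}} \cdot \mathrm{CE}\left(z^{s}, y\right)
$$

Here $z^{t}_{n}$ and $z^{s}_{n}$ are the teacher and student logits for sample $n$, $N$ is the batch size, $w_{\text{soft}}$ is `soft_target_loss_weight`, and $w_{\text{CE}}$ is `ce_loss_weight`. The first term is the KL divergence between the softened teacher and student distributions, averaged over the batch and scaled by $T^{2}$; the second is the usual cross-entropy on the true labels.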

In [None]:
def train_knowledge_distillation(teacher, student, train_loader, epochs, learning_rate, T, soft_target_loss_weight, ce_loss_weight, device):
    ce_loss = nn.CrossEntropyLoss()
    optimizer = optim.Adam(student.parameters(), lr=learning_rate)

    teacher.eval()  # Teacher set to evaluation mode
    student.train() # Student to train mode

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Forward pass with the teacher model - do not save gradients here as we do not change the teacher's weights
            with torch.no_grad():
                teacher_logits = teacher(inputs)

            # Forward pass with the student model
            student_logits = student(inputs)

            # Soften the teacher logits with a temperature softmax, and take the log-softmax of the softened student logits
            soft_targets = nn.functional.softmax(teacher_logits / T, dim=-1)
            soft_prob = nn.functional.log_softmax(student_logits / T, dim=-1)

            # Calculate the soft targets loss. Scaled by T**2 as suggested by the authors of the paper "Distilling the knowledge in a neural network"
            soft_targets_loss = torch.sum(soft_targets * (soft_targets.log() - soft_prob)) / soft_prob.size()[0] * (T**2)

            # Calculate the true label loss
            label_loss = ce_loss(student_logits, labels)

            # Weighted sum of the two losses
            loss = soft_target_loss_weight * soft_targets_loss + ce_loss_weight * label_loss

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}")

# Apply ``train_knowledge_distillation`` with a temperature of 2. Arbitrarily set the weights to 0.75 for CE and 0.25 for distillation loss.
train_knowledge_distillation(teacher=nn_deep, student=new_nn_light, train_loader=train_loader, epochs=10, learning_rate=0.001, T=2, soft_target_loss_weight=0.25, ce_loss_weight=0.75, device=device)
test_accuracy_light_ce_and_kd = test(new_nn_light, test_loader, device)

# Compare the student test accuracy with and without the teacher, after distillation
print(f"Teacher accuracy: {test_accuracy_deep:.2f}%")
print(f"Student accuracy without teacher: {test_accuracy_light_ce:.2f}%")
print(f"Student accuracy with CE + KD: {test_accuracy_light_ce_and_kd:.2f}%")
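
To see what the temperature `T` does to the soft targets, here is a small plain-Python sketch of a temperature-scaled softmax. The logits are made-up numbers, and `softmax_with_temperature` is a helper written for this illustration.

```python
import math

def softmax_with_temperature(logits, temperature=1.0):
    """Softmax of logits / temperature, computed in a numerically stable way."""
    scaled = [z / temperature for z in logits]
    peak = max(scaled)  # subtracting the max avoids overflow in exp()
    exps = [math.exp(s - peak) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.1]  # made-up teacher logits for three classes
for T in (1.0, 2.0, 4.0):
    probs = softmax_with_temperature(logits, T)
    print(T, [round(p, 3) for p in probs])
```

Higher temperatures flatten the distribution, so the smaller classes keep more probability mass. That extra information about class similarities is what the student receives from the teacher.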

Cosine loss minimization run
============================

The temperature parameter controls the softness of the softmax function,
while the loss coefficients set the relative weight of the two loss
terms. In neural networks, it is easy to add extra loss functions to the
main objective to achieve goals like better generalization. Let's try
including an additional objective for the student, but this time focus
on the hidden states rather than the output layers.

The goal is to convey information from the teacher's representation to
the student by including a naive loss function, whose minimization
implies that the flattened vectors that are subsequently passed to the
classifiers become more *similar* as the loss decreases. Of course, the
teacher does not update its weights, so the minimization depends only on
the student's weights.

The rationale behind this method is the assumption that the teacher
model has a better internal representation, one that the student is
unlikely to reach without external intervention, so we artificially push
the student to mimic the internal representation of the teacher. Whether
or not this will end up helping the student is not straightforward,
though. Pushing the lightweight network to reach this point could be a
good thing, assuming that we have found an internal representation that
leads to better test accuracy, but it could also be harmful because the
networks have different architectures and the student does not have the
same learning capacity as the teacher. In other words, there is no
reason for these two vectors, the student's and the teacher's, to match
component by component. The student could reach an internal
representation that is a permutation of the teacher's and it would be
just as efficient.

Nonetheless, we can still run a quick experiment to measure the impact
of this method. We will use `CosineEmbeddingLoss`, which is given by the
following formula:

![Formula for
CosineEmbeddingLoss](https://pytorch.org/tutorials//../_static/img/knowledge_distillation/cosine_embedding_loss.png){.align-center
width="450px"}
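
In case the image does not render, here is the same formula in LaTeX, as given in the PyTorch documentation for `CosineEmbeddingLoss` (where `margin` defaults to 0):

$$
\mathrm{loss}(x_1, x_2, y) =
\begin{cases}
1 - \cos(x_1, x_2), & \text{if } y = 1 \\
\max\bigl(0,\ \cos(x_1, x_2) - \text{margin}\bigr), & \text{if } y = -1
\end{cases}
$$

With a target of $y = 1$ for every pair, minimizing this loss pushes the cosine similarity between the two vectors toward 1.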

There is one thing to resolve first. When we applied distillation to the
output layer, both networks had the same number of output neurons, equal
to the number of classes. However, this is not the case for the layer
following the convolutional layers. Here, the teacher has more neurons
than the student after the flattening of the final convolutional layer
(2048 versus 1024, as the classifier input sizes show). Our loss
function accepts two vectors of equal dimensionality as inputs,
therefore we need to somehow match them. We will solve this by including
an average pooling layer after the teacher's convolutional layer to
reduce its dimensionality to match that of the student.

To proceed, I will modify our model classes, or create new ones. Now,
the forward function returns not only the logits of the network but also
the flattened hidden representation after the convolutional layer. We
include the aforementioned pooling for the modified teacher.
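
Here is a minimal sketch of the dimension matching described above, using random tensors in place of real hidden representations. The batch size of 4 and the explicit `unsqueeze`/`squeeze` around the pooling are choices made for this example; the notebook's own code passes the 2-D tensor to `avg_pool1d` directly.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
batch_size = 4  # arbitrary, for illustration only

teacher_hidden = torch.randn(batch_size, 2048)  # flattened teacher features
student_hidden = torch.randn(batch_size, 1024)  # flattened student features

# Averaging neighbouring pairs halves the last dimension: 2048 -> 1024.
# A channel dimension is added so the call uses the (N, C, L) layout.
teacher_pooled = F.avg_pool1d(teacher_hidden.unsqueeze(1), kernel_size=2).squeeze(1)
print(teacher_pooled.shape)

cosine_loss = nn.CosineEmbeddingLoss()
target = torch.ones(batch_size)  # +1 means the two vectors in each pair should be similar
loss = cosine_loss(student_hidden, teacher_pooled, target)
print(loss.item())
```

Average pooling is only one way to match the sizes. It adds no trainable parameters, which keeps the comparison between student variants simple.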


In [None]:
class ModifiedDeepNNCosine(nn.Module):
    def __init__(self, num_classes=10):
        super(ModifiedDeepNNCosine, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        flattened_conv_output = torch.flatten(x, 1)
        x = self.classifier(flattened_conv_output)
        # Average each pair of adjacent features: 2048 -> 1024, matching the student
        flattened_conv_output_after_pooling = torch.nn.functional.avg_pool1d(flattened_conv_output, 2)
        return x, flattened_conv_output_after_pooling

# Create a similar student class where we return a tuple. We do not apply pooling after flattening.
class ModifiedLightNNCosine(nn.Module):
    def __init__(self, num_classes=10):
        super(ModifiedLightNNCosine, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        flattened_conv_output = torch.flatten(x, 1)
        x = self.classifier(flattened_conv_output)
        return x, flattened_conv_output

# We do not have to train the modified deep network from scratch of course, we just load its weights from the trained instance
modified_nn_deep = ModifiedDeepNNCosine(num_classes=10).to(device)
modified_nn_deep.load_state_dict(nn_deep.state_dict())

# Once again ensure the norm of the first layer is the same for both networks
print("Norm of 1st layer for deep_nn:", torch.norm(nn_deep.features[0].weight).item())
print("Norm of 1st layer for modified_deep_nn:", torch.norm(modified_nn_deep.features[0].weight).item())

# Initialize a modified lightweight network with the same seed as our other lightweight instances. This will be trained from scratch to examine the effectiveness of cosine loss minimization.
torch.manual_seed(42)
modified_nn_light = ModifiedLightNNCosine(num_classes=10).to(device)
print("Norm of 1st layer:", torch.norm(modified_nn_light.features[0].weight).item())

Naturally, I need to change the train loop because the model now
returns a tuple `(logits, hidden_representation)`. Using a sample input
tensor, we can print the shapes of both outputs.


In [None]:
# Create a sample input tensor
sample_input = torch.randn(128, 3, 32, 32).to(device) # Batch size: 128, Channels: 3, Image size: 32x32

# Pass the input through the student
logits, hidden_representation = modified_nn_light(sample_input)

# Print the shapes of the tensors
print("Student logits shape:", logits.shape) # batch_size x total_classes
print("Student hidden representation shape:", hidden_representation.shape) # batch_size x hidden_representation_size

# Pass the input through the teacher
logits, hidden_representation = modified_nn_deep(sample_input)

# Print the shapes of the tensors
print("Teacher logits shape:", logits.shape) # batch_size x total_classes
print("Teacher hidden representation shape:", hidden_representation.shape) # batch_size x hidden_representation_size

In [None]:
def train_cosine_loss(teacher, student, train_loader, epochs, learning_rate, hidden_rep_loss_weight, ce_loss_weight, device):
    ce_loss = nn.CrossEntropyLoss()
    cosine_loss = nn.CosineEmbeddingLoss()
    optimizer = optim.Adam(student.parameters(), lr=learning_rate)

    teacher.to(device)
    student.to(device)
    teacher.eval()  # Teacher set to evaluation mode
    student.train() # Student to train mode

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Forward pass with the teacher model and keep only the hidden representation
            with torch.no_grad():
                _, teacher_hidden_representation = teacher(inputs)

            # Forward pass with the student model
            student_logits, student_hidden_representation = student(inputs)

            # Calculate the cosine loss. Target is a vector of ones: from the loss formula above,
            # this is the case where minimizing the loss increases the cosine similarity.
            hidden_rep_loss = cosine_loss(student_hidden_representation, teacher_hidden_representation, target=torch.ones(inputs.size(0), device=device))

            # Calculate the true label loss
            label_loss = ce_loss(student_logits, labels)

            # Weighted sum of the two losses
            loss = hidden_rep_loss_weight * hidden_rep_loss + ce_loss_weight * label_loss

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}")

I need to modify our test function for the same reason. Here I ignore
the hidden representation returned by the model.


In [None]:
def test_multiple_outputs(model, test_loader, device):
    model.to(device)
    model.eval()

    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs, _ = model(inputs) # Disregard the second tensor of the tuple
            _, predicted = torch.max(outputs.data, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

In this case, we could easily include both knowledge distillation and
cosine loss minimization in the same function; here, `train_cosine_loss`
combines only cross-entropy with the cosine loss. It is common to
combine methods to achieve better performance in teacher-student
paradigms. For now, we can run a simple train-test session.


In [None]:
# Train and test the lightweight network with cross entropy loss plus cosine loss
train_cosine_loss(teacher=modified_nn_deep, student=modified_nn_light, train_loader=train_loader, epochs=10, learning_rate=0.001, hidden_rep_loss_weight=0.25, ce_loss_weight=0.75, device=device)
test_accuracy_light_ce_and_cosine_loss = test_multiple_outputs(modified_nn_light, test_loader, device)
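
Since knowledge distillation and cosine loss minimization can be combined, here is a minimal sketch of what a single combined loss might look like. The function name `combined_distillation_loss`, the temperature `T`, and the three weights are assumptions chosen for illustration, not values used in the runs above; the soft-target term is the usual temperature-scaled KL divergence.

```python
import torch
import torch.nn.functional as F

def combined_distillation_loss(student_logits, teacher_logits,
                               student_hidden, teacher_hidden, labels,
                               T=2.0, kd_weight=0.25, cos_weight=0.25, ce_weight=0.5):
    # Hard-label cross entropy on the student's predictions
    ce = F.cross_entropy(student_logits, labels)

    # Soft-target term: KL(teacher || student) at temperature T, scaled by T^2
    kd = F.kl_div(F.log_softmax(student_logits / T, dim=-1),
                  F.softmax(teacher_logits / T, dim=-1),
                  reduction="batchmean") * (T ** 2)

    # Cosine term with a target of ones: pull hidden representations together
    target = torch.ones(student_hidden.size(0), device=student_hidden.device)
    cos = F.cosine_embedding_loss(student_hidden, teacher_hidden, target)

    return ce_weight * ce + kd_weight * kd + cos_weight * cos

# Toy tensors standing in for one batch of model outputs
torch.manual_seed(0)
student_logits = torch.randn(8, 10, requires_grad=True)
teacher_logits = torch.randn(8, 10)
student_hidden = torch.randn(8, 1024, requires_grad=True)
teacher_hidden = torch.randn(8, 1024)
labels = torch.randint(0, 10, (8,))

loss = combined_distillation_loss(student_logits, teacher_logits,
                                  student_hidden, teacher_hidden, labels)
loss.backward()  # gradients reach both the logits and the hidden representation
print(loss.item())
```

Using this in a training loop would require the teacher's forward pass to keep its logits as well as its hidden representation.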

Intermediate regressor run
==========================

The naive minimization does not guarantee better results for several
reasons, one being the dimensionality of the vectors. Cosine similarity
generally works better than Euclidean distance for vectors of higher
dimensionality, but here each vector has 1024 components, so it is much
harder to extract meaningful similarities. Furthermore, as mentioned
earlier, pushing the student's hidden representation to match the
teacher's is not supported by theory: there is no good reason to aim for
a 1:1 match of these vectors.

I will provide a final example of training intervention by including an
extra network called a regressor. The objective is to extract the
feature map of the teacher after a convolutional layer, extract the
feature map of the student after a convolutional layer, and then try to
match these maps. This time, however, a trainable regressor sits between
the networks to facilitate the matching, and ideally it will do a better
job than our naive cosine loss minimization scheme. Its main job is to
match the dimensionality of the feature maps so that we can properly
define a loss function between the teacher and the student. Defining
such a loss function provides a teaching "path", which is basically a
route for back-propagating gradients that will change the student's
weights. Focusing on the output of the convolutional layers right before
each classifier in our original networks, I have the following shapes:


In [None]:
# Pass the sample input through the convolutional feature extractor only
convolutional_fe_output_student = nn_light.features(sample_input)
convolutional_fe_output_teacher = nn_deep.features(sample_input)

# Print their shapes
print("Student's feature extractor output shape: ", convolutional_fe_output_student.shape)
print("Teacher's feature extractor output shape: ", convolutional_fe_output_teacher.shape)

In [None]:
class ModifiedDeepNNRegressor(nn.Module):
    def __init__(self, num_classes=10):
        super(ModifiedDeepNNRegressor, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        conv_feature_map = x
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x, conv_feature_map

class ModifiedLightNNRegressor(nn.Module):
    def __init__(self, num_classes=10):
        super(ModifiedLightNNRegressor, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
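        # Include an extra regressor (here a single convolution without activation, so it is linear);
        # it maps the student's 16 channels to the teacher's 32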
        self.regressor = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, padding=1)
        )
        self.classifier = nn.Sequential(
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        regressor_output = self.regressor(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x, regressor_output
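
To see why the regressor makes the loss well defined, the sketch below uses random tensors as stand-ins for the two feature maps, assuming 32x32 inputs so that both maps are 8x8 spatially. The batch size of 4 and the variable names are illustrative only.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
student_map = torch.randn(4, 16, 8, 8, requires_grad=True)  # stand-in for the student's conv output
teacher_map = torch.randn(4, 32, 8, 8)                      # stand-in for the teacher's conv output

# Same layer as the student's regressor: 16 -> 32 channels, spatial size preserved
regressor = nn.Conv2d(16, 32, kernel_size=3, padding=1)
regressed = regressor(student_map)

# With matching shapes, an element-wise MSE between the maps is well defined
hint_loss = nn.MSELoss()(regressed, teacher_map)
hint_loss.backward()

print(regressed.shape, teacher_map.shape)
# Gradients flow through the regressor back into the student's feature map
print(student_map.grad is not None, regressor.weight.grad is not None)
```

Because the gradient passes through the regressor, the student's convolutional layers are updated even though the student's own feature map never has the teacher's shape.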

After that, I have to update the train loop again. This time, I extract
the regressor output of the student and the feature map of the teacher,
calculate the `MSE` between these tensors (they have exactly the same
shape, so it is properly defined), and back-propagate gradients based on
that loss in addition to the regular cross-entropy loss of the
classification task.


In [None]:
def train_mse_loss(teacher, student, train_loader, epochs, learning_rate, feature_map_weight, ce_loss_weight, device):
    ce_loss = nn.CrossEntropyLoss()
    mse_loss = nn.MSELoss()
    optimizer = optim.Adam(student.parameters(), lr=learning_rate)

    teacher.to(device)
    student.to(device)
    teacher.eval()  # Teacher set to evaluation mode
    student.train() # Student to train mode

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Again ignore teacher logits
            with torch.no_grad():
                _, teacher_feature_map = teacher(inputs)

            # Forward pass with the student model
            student_logits, regressor_feature_map = student(inputs)

            # Calculate the loss
            hidden_rep_loss = mse_loss(regressor_feature_map, teacher_feature_map)

            # Calculate the true label loss
            label_loss = ce_loss(student_logits, labels)

            # Weighted sum of the two losses
            loss = feature_map_weight * hidden_rep_loss + ce_loss_weight * label_loss

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}")

# Notice that our test function is the same one we used in the previous case. We only care about the actual outputs because we measure accuracy.

# Initialize a ModifiedLightNNRegressor
torch.manual_seed(42)
modified_nn_light_reg = ModifiedLightNNRegressor(num_classes=10).to(device)

# We do not have to train the modified deep network from scratch of course, we just load its weights from the trained instance
modified_nn_deep_reg = ModifiedDeepNNRegressor(num_classes=10).to(device)
modified_nn_deep_reg.load_state_dict(nn_deep.state_dict())

# Train and test once again
train_mse_loss(teacher=modified_nn_deep_reg, student=modified_nn_light_reg, train_loader=train_loader, epochs=10, learning_rate=0.001, feature_map_weight=0.25, ce_loss_weight=0.75, device=device)
test_accuracy_light_ce_and_mse_loss = test_multiple_outputs(modified_nn_light_reg, test_loader, device)

It is expected that the final method will work better than `CosineLoss`
because I have now allowed a trainable layer between the teacher and
the student, which gives the student some wiggle room when it comes to
learning, rather than pushing the student to copy the teacher's
representation. Including the extra network is the idea behind
hint-based distillation.


In [None]:
print(f"Teacher accuracy: {test_accuracy_deep:.2f}%")
print(f"Student accuracy without teacher: {test_accuracy_light_ce:.2f}%")
print(f"Student accuracy with CE + KD: {test_accuracy_light_ce_and_kd:.2f}%")
print(f"Student accuracy with CE + CosineLoss: {test_accuracy_light_ce_and_cosine_loss:.2f}%")
print(f"Student accuracy with CE + RegressorMSE: {test_accuracy_light_ce_and_mse_loss:.2f}%")

Conclusion
==========

None of the methods above increases the number of parameters of the
deployed network or its inference time (the regressor is only needed for
the training loss and can be dropped before deployment), so the
performance increase comes at the small cost of calculating gradients
during training. In ML applications, we mostly care about inference time
because training happens before the model deployment. If our lightweight
model is still too heavy for deployment, we can apply different ideas,
such as post-training quantization. Additional losses can be applied in
many tasks, not just classification, and you can experiment with
quantities like coefficients, temperature, or number of neurons. Feel
free to tune any numbers in the tutorial above, but keep in mind that
changing the number of neurons or filters is likely to cause a shape
mismatch.
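
One way to reduce the risk of such shape mismatches is to compute the classifier's input size from the feature extractor instead of hard-coding it. The helper `infer_flat_features` below is a hypothetical utility, not part of the code above, and it assumes 3x32x32 inputs like the sample tensor used earlier.

```python
import torch
import torch.nn as nn

def infer_flat_features(features, input_shape=(3, 32, 32)):
    """Return the flattened output size of `features` for a single dummy input."""
    was_training = features.training
    features.eval()
    with torch.no_grad():
        dummy = torch.zeros(1, *input_shape)
        out = features(dummy)
    features.train(was_training)  # restore the original mode
    return out.flatten(1).size(1)

# Same layers as the student's feature extractor
student_features = nn.Sequential(
    nn.Conv2d(3, 16, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2, 2),
    nn.Conv2d(16, 16, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2, 2),
)

flat_size = infer_flat_features(student_features)  # 16 channels * 8 * 8
classifier_in = nn.Linear(flat_size, 256)          # first classifier layer sized automatically
print(flat_size)
```

The same idea applies to the teacher, whose pooled output would still need to match the student's size for the cosine loss.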

For more information, refer to:

-   [Hinton, G., Vinyals, O., Dean, J.: Distilling the knowledge in a
    neural network. In: Neural Information Processing Systems Deep
    Learning Workshop (2015)](https://arxiv.org/abs/1503.02531)
-   [Romero, A., Ballas, N., Kahou, S.E., Chassang, A., Gatta, C.,
    Bengio, Y.: Fitnets: Hints for thin deep nets. In: Proceedings of
    the International Conference on Learning
    Representations (2015)](https://arxiv.org/abs/1412.6550)
