**23. Transfer Learning in PyTorch**

* Transfer Learning is an ML technique where a model trained on one task is reused (partially or fully) for a different but related task. Instead of training a model from scratch, which can be computationally expensive and require large datasets, transfer learning leverages knowledge from a pre-trained model to improve learning efficiency and performance.
* Working of Transfer Learning:
  1. Pretraining on a Large Dataset
    * A model is first trained on a large dataset (e.g. ImageNet for image models, or a large text corpus for language models such as GPT).
    * The model learns the general features, such as edges and shapes in images or syntax and semantics in text.
  2. Fine-tuning for a new task
    * The pre-trained model is then adapted to a new, often smaller, dataset.
    * Some layers may be frozen (not updated), while others are fine-tuned for the specific task.
* Steps:
  * import model (to be used for transfer-learning)
  * detach the classifier
  * attach custom/our classifier
  * freeze the feature-extraction layers
  * train the model
* We will be using the VGG16 model, but its input requires a little pre-processing:
  * reshape: 784 --> (28,28)
  * dtype = np.uint8
  * convert from 1 channel to 3 channels: (28,28) --> (28,28,3), i.e. (height, width, channels) as PIL expects
  * convert to PIL image
  * resize from 28x28 to 256x256 using InterpolationMode.BILINEAR
  * center-crop to 224x224
  * convert to a PyTorch tensor of shape (3,224,224) and scale values to the range 0 to 1
  * normalize using mean = [0.485, 0.456, 0.406] and std = [0.229, 0.224, 0.225]


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

import numpy as np
import torch as tr
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader

# import plotting libraries
import matplotlib.pyplot as plt
import matplotlib_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')

# set random seeds for reproducibility
random_seed = 14
# Storing the value alone changes nothing: the global NumPy and PyTorch
# generators have to be seeded explicitly for runs to repeat.
np.random.seed(random_seed)
tr.manual_seed(random_seed)

In [None]:
# pick the GPU when CUDA is usable, otherwise fall back to the CPU
if tr.cuda.is_available():
    device = tr.device('cuda')
else:
    device = tr.device('cpu')
print(f"Using device: {device}")

In [None]:
# load the CSV into a DataFrame and preview its first five rows
df = pd.read_csv(filepath_or_buffer='datasets/fmnist_small.csv')
df.head(n=5)

In [None]:
# transformations
from torchvision import transforms

# per-channel statistics used to normalize the input for the pre-trained model
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]

# steps run in order on a PIL image
_resize = transforms.Resize(
    (256,256),
    interpolation=transforms.InterpolationMode.BILINEAR,
)
_crop = transforms.CenterCrop((224,224))
_to_tensor = transforms.ToTensor()  # PIL image -> PyTorch tensor with values scaled to [0, 1]
_normalize = transforms.Normalize(mean=NORM_MEAN, std=NORM_STD)

custom_transform = transforms.Compose([_resize, _crop, _to_tensor, _normalize])

In [None]:
# demo: where np.stack puts the new axis for three copies of a 2x2 array
arr = np.arange(1, 5).reshape((2, 2))
copies = [arr, arr, arr]
leading_axis = np.stack(copies)            # new axis first
trailing_axis = np.stack(copies, axis=-1)  # new axis last
leading_axis.shape, trailing_axis.shape

In [None]:
# getting data as numpy arrays
# NOTE(review): assumes every column other than 'label' is a pixel value
# (784 per row) - confirm against the CSV.
X = df.drop(columns=['label']).to_numpy()
y = df['label'].to_numpy()

# human-readable class names, looked up by integer label
class_labels =  ["T-shirt/top",
                "Trouser",
                "Pullover",
                "Dress",
                "Coat",
                "Sandal",
                "Shirt",
                "Sneaker",
                "Bag",
                "Ankle boot"]

# perform train-test split (20% held out); random_state pins the shuffle so
# the same rows land in each split on every run
X_train,X_test,y_train,y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=random_seed,
)

from PIL import Image
# creating dataset class
class FMNIST_DATASET(Dataset):
    """Dataset that turns flat pixel rows into transformed 3-channel images.

    Each item is rebuilt as a (28, 28, 3) uint8 PIL image, passed through
    ``transform``, and returned with its label as a ``torch.long`` tensor.
    """

    def __init__(self,X,y,transform):
        # X: indexable collection of rows, each reshapeable to (28, 28)
        # y: labels aligned with X
        # transform: callable applied to the PIL image
        self.X, self.y, self.transform = X, y, transform

    def __len__(self):
        """Return the number of samples (rows of ``X``)."""
        return len(self.X)

    def __getitem__(self, index):
        """Return ``(transformed_image, label)`` for the sample at ``index``."""
        # flat pixel row -> (28, 28) grid of uint8 values
        pixels = self.X[index].reshape(28, 28).astype(np.uint8)

        # copy the single plane three times along a trailing axis
        # -> (height, width, channels)
        three_channel = np.stack([pixels, pixels, pixels], axis=-1)

        # PIL image -> transform pipeline
        sample = self.transform(Image.fromarray(three_channel))
        target = tr.tensor(self.y[index], dtype=tr.long)
        return sample, target

    
# wrap both splits in the dataset class, sharing one transform pipeline
train_dataset = FMNIST_DATASET(X=X_train, y=y_train, transform=custom_transform)
test_dataset = FMNIST_DATASET(X=X_test, y=y_test, transform=custom_transform)

# mini-batch loaders: training data is shuffled, test order stays fixed
BATCH_SIZE = 32
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
# preview the first eight images of one training batch on a 2x4 grid
fig,axes = plt.subplots(nrows=2, ncols=4, figsize=(8,4))

batch_iterator = iter(train_loader)
images,labels = next(batch_iterator)

for idx, axis in enumerate(axes.flat):
    # show channel 0 of the 3-channel image tensor
    first_channel = images[idx][0]
    axis.imshow(first_channel, cmap='binary')
    axis.axis('off')

    # title each panel with the readable class name of its label
    title = class_labels[labels[idx]]
    axis.set_title(f'{title}')

plt.tight_layout(rect=[0,0,1,0.96])
plt.show()

In [None]:
# using a pre-trained model
import torchvision.models as models

# `pretrained=True` is deprecated since torchvision 0.13; the `weights` enum is
# the supported way to request the same ImageNet checkpoint.
vgg16 = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)

In [None]:
# freeze the feature extractor: its weights will receive no gradients
for frozen_param in vgg16.features.parameters():
    frozen_param.requires_grad = False

In [None]:
class FMNIST_NET(nn.Module):
    """Transfer-learning classifier: a feature extractor plus a small MLP head.

    Args:
        feature_extractor: module whose output is flattened and fed to the
            classifier; it must yield 25088 values per sample. When ``None``
            (the default) the module-level ``vgg16.features`` is used, which
            is the original behavior.
        num_classes: number of output logits (default 10).
    """

    def __init__(self, feature_extractor=None, num_classes=10):
        super().__init__()

        # feature extractor: fall back to the global VGG16 backbone only when
        # the caller did not supply one, so the class can be built and tested
        # without that global
        if feature_extractor is None:
            feature_extractor = vgg16.features
        self.features = feature_extractor

        # classifier
        self.classifier = nn.Sequential(
            nn.Flatten(),
            # 25088 = 512 * 7 * 7, the VGG16 feature-map size for 224x224 input
            nn.Linear(in_features = 25088, out_features = 64),
            nn.ReLU(),
            nn.Dropout(p=0.5),

            nn.Linear(in_features = 64, out_features = 32),
            nn.ReLU(),
            nn.Dropout(p=0.5),

            # raw logits; no softmax here
            nn.Linear(in_features = 32, out_features = num_classes)
        )

    def forward(self,x):
        """Return class logits of shape (batch, num_classes) for input ``x``."""
        x = self.features(x)
        x = self.classifier(x)
        return x

# from torchinfo import summary
# demo_model = FMNIST_NET()
# summary(demo_model)

In [None]:
# training hyper-parameters
epochs = 5
lr = 1e-3

# model, objective and optimizer; Adam is given only the classifier's
# parameters
fnet = FMNIST_NET()
loss_fn = nn.CrossEntropyLoss()
optimizer = tr.optim.Adam(params=fnet.classifier.parameters(), lr=lr)

In [None]:
# place the model's parameters on the selected device (GPU when available)
fnet = fnet.to(device=device)

In [None]:
# training loop: one optimisation pass over train_loader, then one accuracy
# pass over test_loader, per epoch
for epochi in range(epochs):
    # ---- training phase ----
    fnet.train()
    batch_loss = []
    for X_batch,y_batch in train_loader:
        # inputs and targets must live on the same device as the model
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        # clear gradients left over from the previous step
        optimizer.zero_grad()

        # forward pass and loss
        y_pred = fnet(X_batch)
        loss = loss_fn(y_pred,y_batch)

        # backward pass and parameter update
        loss.backward()
        optimizer.step()

        batch_loss.append(loss.item())

    mean_batch_loss = np.mean(batch_loss)

    # ---- validation phase ----
    fnet.eval()
    val_correct = 0
    val_total = 0

    with tr.no_grad():  # no gradient tracking needed for scoring
        for X_val,y_val in test_loader:
            X_val = X_val.to(device)
            y_val = y_val.to(device)

            y_pred = fnet(X_val)
            preds = y_pred.argmax(dim=1)
            val_correct += preds.eq(y_val).sum().item()
            val_total += y_val.shape[0]
    val_acc = (val_correct / val_total)*100

    print(f'Epoch: {epochi+1}, Loss: {mean_batch_loss}, Val Accuracy:{val_acc:.2f}%')

In [None]:
def evaluate_model(model,test_loader):
    """Return the fraction of correctly classified samples in ``test_loader``.

    The model is moved to CUDA when available (otherwise CPU) and scored in
    evaluation mode so that layers such as dropout are deterministic; the
    mode it had on entry is restored before returning.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to per-class scores.
        test_loader: iterable yielding ``(inputs, labels)`` tensor batches.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` when the loader yields no
        samples.
    """
    device = tr.device('cuda' if tr.cuda.is_available() else 'cpu')
    model.to(device)

    # Remember the caller's mode: scoring in train mode would keep dropout
    # active and give noisy, pessimistic accuracy.
    was_training = model.training
    model.eval()

    correct_preds = 0
    total_preds = 0
    try:
        with tr.no_grad():
            for X_batch,y_batch in test_loader:
                # moving data to device(GPU)
                X_batch,y_batch = X_batch.to(device),y_batch.to(device)

                y_pred = model(X_batch)
                preds = tr.argmax(y_pred, dim = 1)
                correct_preds += (preds == y_batch).sum().item()
                total_preds += y_batch.size(0)
    finally:
        # hand the model back in the mode the caller left it in
        model.train(was_training)

    # an empty loader would otherwise divide by zero
    if total_preds == 0:
        return 0.0
    return correct_preds / total_preds
    
# accuracy of the trained network on the held-out loader
evaluate_model(model=fnet, test_loader=test_loader)