<a href="https://colab.research.google.com/github/sabinnmc/machine_learning_n_image_classification/blob/main/memory_buffer_%2B_EWC_in_MLP_method_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import matplotlib.pyplot as plt
import torchvision
import numpy as np
from PIL import Image

In [None]:

import torch
# Query CUDA availability once and reuse the answer for both report lines.
gpu_ready = torch.cuda.is_available()
gpu_label = torch.cuda.get_device_name(0) if gpu_ready else 'N/A'
print(f"GPU available: {gpu_ready}")
print(f"GPU name: {gpu_label}")


GPU available: True
GPU name: NVIDIA GeForce RTX 2060 SUPER


# Data preparation -> define transformations for augmentation and normalization

`transforms.RandomHorizontalFlip()` ->
*   this transform flips an image into its mirror image with 50% probability
*   it effectively doubles our dataset without collecting more data
*   flipping teaches the model to recognize real-world scenario data more clearly
*   it improves the generalization of the model


In [None]:
# checking for GPU availblivity
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu" )

# for reproducibilty of same randomness
torch.manual_seed(42)
if torch.cuda.is_available():
  torch.cuda.manual_seed(42)

# data preparation
# tranform to tensor and normalize them using mean and std
transform_train = transforms.Compose([
    # transforms.RandomCrop(32, padding = 4),       # get 32*32 pixel chunk from image and add 4 pixel around that 32*32 -> 40*40
    transforms.CenterCrop(32),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),     # mean tuple
                         (0.2470, 0.2435, 0.2616))])   # standard deviation  tuple

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2470, 0.2435, 0.2616))
])

# Downloading data and loading the CIFAR-10 dataset

# 1. What is a transform?


*   an optional callable
*   Input -> PIL (Python Imaging Library) image or tensor
*   Output -> transformed version of the image
*   similar to a personal image editor that crops, flips or adjusts an image and prepares data for training and inference
*   transforms.RandomCrop  -> randomly crops a PIL image to the specified size (32*32)



In [None]:
train_dataset = torchvision.datasets.CIFAR10(
    # root = '/content/drive/MyDrive/Colab Notebooks/CIFAR_10 dataset',
    root = "lab/tree/model_resnet",
    train = True,                                     # used for training purpose
    download = True,                                  # download and put in "root" directory
    transform = transform_train               # it takes PIL image or tensor-> return tranformed version
)

test_dataset = torchvision.datasets.CIFAR10(
    # root = "/content/drive/MyDrive/Colab Notebooks/CIFAR_10 dataset",
    root = "lab/tree/model_resnet",
    train = False,
    download = True,
    transform = transform_test
)

#reading data info
print(f"Training data sets number : {len(train_dataset)}")
print(f"Testing data sets number:   {len(test_dataset)}")
# chckinf data and label return values
data, labels = train_dataset[0]
print(f"Data type: {type(data)} and Label types : {type(labels)}")

Training data sets number : 50000
Testing data sets number:   10000
Data type: <class 'torch.Tensor'> and Label types : <class 'int'>


## Splitting Task for CIFAR-10

In [None]:
from torch.utils.data import Subset
import random
# `dataset` receives the global `train_dataset` when splitting_task is called for continual learning (CL).
# The whole dataset is split into tasks of equal size, i.e. 50k / number of tasks samples each
# (the last task also receives any remainder).
def splitting_task(dataset, num_tasks):
    dataset_size = len(dataset)          # Total number of sample in dataset
    idx = list(range(dataset_size))      # Create list of indices 0,1,2,............
    random.shuffle(idx)                  # shuffle for creating randomness of data
    split_size = dataset_size // num_tasks  ## integer divion is done e.g 1000 / 5 = 250 -> one task have 250 data
    task_idx = []                         # for storing indices as data are shuffled

    for i in range(num_tasks):
        start_idx = i * split_size    # say 0, 10k, 20k, 30k,40k
        end_idx = (i + 1) * split_size if i < num_tasks -1 else dataset_size  # 10k, 20k, 30k, 40k, 50k
        task_idx.append(idx[start_idx:end_idx])

    tasks = [Subset(dataset, idx_counter) for idx_counter in task_idx]

    return tasks

# Creating the data loaders and specifying the classes in CIFAR-10

In [None]:
# number of core in default google collab  =  2
import os
print("maximum number of worker = ", os.cpu_count())

maximum number of worker =  16


# Why is transposing important before display?
*   it reorders the dimensions from (C, H, W) -> (H, W, C)
*   images stored as PyTorch tensors are [channels, height, width] (TensorFlow defaults to [height, width, channels])
*   images in NumPy, OpenCV, PIL and matplotlib are [height, width, channels]

# DataLoader  -> returns a tuple (images, labels)
images = tensor of shape [batch_size, channels, height, width]
labels = tensor of shape [batch_size]
iter() function -> converts the dataloader into an iterator that is ready to dispatch one batch at a time
next() function -> retrieves the next item from the iterator (i.e. 1st batch, 2nd batch, ...)


In [None]:
train_dataloader = DataLoader(train_dataset,
                              batch_size = 128,
                              shuffle = True,
                              num_workers = 2)
test_dataloader = DataLoader(test_dataset,
                             batch_size = 64,
                             shuffle = False,
                             num_workers = 2)

# define class in CIFAR-10
classes = ('airplane', 'automobile','bird','cat','deer',
           'dog','frog','horse','ship','truck')

# Building Resnet(Residual Network) Architecture

Q. How can the output dimension be preserved using padding?
*   Suppose the input tensor is (1, 1, 5, 5) -> (batch_size, i/p channels, height, width).
*   For a convolution: output size = (input size + 2 * padding - kernel size) / stride + 1.

---


*   **No padding:**
output size = (1, o/p_channel, 3, 3), because a 3*3 kernel with stride 1 shrinks each spatial dimension by (3 - 1) = 2: 5 - 3 + 1 = 3

---
* **With padding = 1:**
padded input size = (1, i/p_channel, 7, 7) -> 1 pixel is added all around the input features
output size = (1, o/p_channel, 5, 5) -> the 3*3 kernel again shrinks each spatial dimension by 2: 7 - 3 + 1 = 5
Therefore, the original spatial size is preserved.

In [None]:
class BasicResNet_block(nn.Module):
  expansion = 1
  # for bottle neck (used ib ResNets(50,101,152))
  #expansion = 4
  def __init__(self, in_channels, out_channels, stride = 1):
    super(BasicResNet_block, self).__init__()
    # first convolution
    self.conv1 = nn.Conv2d(in_channels, out_channels,
                           kernel_size = 3,
                           stride = stride,      # take jump of each pixel at a time
                           padding = 1,     # adding 1 pixel around input features map(i/p tensor)
                           bias = False)    # False -> not learnable and won`t be updated

    self.relu = nn.ReLU()
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels,
                           kernel_size = 3,
                           stride = 1,
                           padding = 1,
                           bias = False)
    self.bn2 = nn.BatchNorm2d(out_channels)
    # skip connection is performed by "shortcut"
    self.shortcut = nn.Sequential()
    # if dimension change , we need to adjust shortcut connection
    if stride !=1 or in_channels != self.expansion * out_channels:    # this occur only in bottlneck not in simple ResNet
      self.shortcut = nn.Sequential(nn.Conv2d(in_channels,
                                              self.expansion * out_channels,      # here output channel is changed in bottleneck ResNet
                                              kernel_size = 1, # 1
                                              stride = stride,
                                              bias = False),
                                    nn.BatchNorm2d(self.expansion * out_channels))


  def forward(self, x):
    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu(out)
    out = self.conv2(out)
    out = self.bn2(out)
    out += self.shortcut(x)
    out = self.relu(out)
    return out

# define ResNet model
class ResNet_model(nn.Module):
  def __init__(self, block, num_blocks, num_classes = 10):
    super(ResNet_model, self).__init__()
    self.in_channels = 64
    self.conv1 = nn.Conv2d(3, 64,
                           kernel_size = 3,
                           stride = 1,
                           padding = 1,
                           bias = False)
    self.bn1 = nn.BatchNorm2d(64)
    self.relu = nn.ReLU()
    # memo : block = BasicResNet_block class instance
    self.layer1 = self._make_layer(block, 64, num_blocks[0], stride = 1)
    self.layer2 = self._make_layer(block, 128, num_blocks[1], stride = 2)
    self.layer3 = self._make_layer(block, 256, num_blocks[2], stride = 2)
    self.layer4 = self._make_layer(block, 512, num_blocks[3], stride = 2)
    self.linear = nn.Linear(512 * block.expansion, num_classes)

  def _make_layer(self, block, out_channels, num_blocks, stride):
    strides = [stride] + [1]*(num_blocks - 1)
    # output is [1,1],[2,1],[2,1],[2,1]
    layers = []
    for stride in strides:
      # here block is BasicResNet_block class instance
      layers.append(block(self.in_channels, out_channels, stride))
      self.in_channels = out_channels * block.expansion

    return nn.Sequential(*layers)

  def forward(self, x):
    out = self.conv1(x)   # 32*32*3 -> 32*32*64
    out = self.bn1(out)
    out = self.relu(out)
    out = self.layer1(out)      # stride=1, size = N*32*32*64
    out = self.layer2(out)      # stride=2, size = N*16*16*128
    out = self.layer3(out)      # stride=2, size = N*8*8*256
    out = self.layer4(out)      # stride=2, size = N*4*4*512
    out = F.avg_pool2d(out, 4)  # agerage down 4*4 kernel to size = 1*1*512 -> preparing for linear layer
    out = out.view(out.size(0), -1)   # flatten to (batch_size, 512)
    out = self.linear(out)            #  N*512*1 = N*512 input features
    return out


In [None]:
model = ResNet_model(BasicResNet_block,[2, 2, 2, 2]).to(device)
# [2,2,2,2] is a "num_blocks" argument passed for ResNet_model
# print(model)
# define a loss function
criterion = nn.CrossEntropyLoss()
#optimizer = optim.Adam(model.parameters(), lr = 0.001)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay= 5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = 30)

# count the number of paramter
# .numel() is a method in PyTorch ."number of elements"
total_parameter = sum(p.numel() for p in model.parameters())
print(f"Total parameter: { total_parameter:,}")

Total parameter: 11,173,962


# Training and Evalution

In [None]:
# train function
def train_epoch(model, train_dataloader,optimizer, criterion, save_dir, train_epochs = 30):
  model.train()
  train_loss_list = []
  train_accuracy_list = []
  for epoch in range(train_epochs):
    train_ok_count = 0
    train_total = 0
    train_epoch_loss = 0
    for i, (data,labels) in enumerate(train_dataloader, 0):

      # tranfer data and label to same device
      data, labels = data.to(device), labels.to(device)

      # zero the parameter gardient
      optimizer.zero_grad()

      outputs = model(data)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      train_epoch_loss += loss.item()
      scheduler.step()
      # for training accuracy
      with torch.no_grad():
        maximum_predict, max_predicted_index = torch.max(outputs.data,1)
        # total number of target size
        train_total += labels.size(0)
        train_ok_count += (max_predicted_index == labels).sum().item()
    if save_dir is not None:
        torch.save(model.state_dict(), f"{save_dir}/epoch_{epoch}.pth")

    avg_loss = train_epoch_loss / len(train_dataloader)
    train_loss_list.append(avg_loss)
    # efficacy calculate
    train_accuracy = train_ok_count / train_total * 100
    train_accuracy_list.append(train_accuracy)

    #print(f"Epoch: {epoch + 1}, Training loss: {avg_loss:.4f}, Training accuracy: {train_accuracy:.4f}")

  return train_accuracy_list, train_loss_list

def test_epoch(model, test_dataloader, criterion, save_dir, test_epochs = 30):

    # model.load_state_dict(torch.load(f"/content/drive/MyDrive/Colab Notebooks/CIFAR_10 model location/epoch_{epoch}.pth"))
    test_loss_list = []
    test_accuracy_list = []
    #model.load_state_dict(torch.load(f"{save_dir}/epoch_29.pth"))
    for epoch in range(test_epochs):

        model.load_state_dict(torch.load(f"{save_dir}/epoch_29.pth"))
        model.eval()
        test_epoch_loss = 0
        test_ok_count = 0
        test_total = 0

    with torch.no_grad():
        for i, (data, labels) in enumerate(test_dataloader, 0):

            #print(f"Batch {i}: Data type: {type(data)}, Labels type: {type(labels)}")
            # tranfer data and label to same device
            data, labels = data.to(device), labels.to(device)

            test_output = model(data)
            test_loss = criterion(test_output, labels)
            test_epoch_loss += test_loss.item()

            # calculating accuracy and loss during evalution mode
            maxmimum_predict, max_predict_idx = torch.max(test_output.data, 1)
            test_total += labels.size(0)
            test_ok_count += (max_predict_idx == labels).sum().item()

    average_loss = test_epoch_loss / len(test_dataloader)
    test_loss_list.append(average_loss)
    test_accuracy = test_ok_count / test_total * 100
    test_accuracy_list.append(test_accuracy)

    print(f"Epoch: {epoch + 1}, Testing loss: {average_loss:.4f}, Testing accuracy: {test_accuracy:.4f}")

    return test_accuracy_list, test_loss_list

# Splitting a task for catastrophic forgetting

In [None]:
import os
# calling a splitting_task defintion to split the total  training dataset into fraction
num_tasks = 5
epoch_per_task = 20
tasks = splitting_task(train_dataset, num_tasks)
print(len(tasks))
print(tasks)


save_dir_catastropic = "/home/sabinnmc/work/image_classification/catastrophic forgetting model"
os.makedirs(save_dir_catastropic, exist_ok=True)  # Creates the folder if it’s not there

5
[<torch.utils.data.dataset.Subset object at 0x7489f6457740>, <torch.utils.data.dataset.Subset object at 0x7489f647d9d0>, <torch.utils.data.dataset.Subset object at 0x7489f647e3c0>, <torch.utils.data.dataset.Subset object at 0x7489f647dd30>, <torch.utils.data.dataset.Subset object at 0x7489f647f230>]


# Tracking all the accuracy and buliding a matrix of accuracy for comparing

# Training loop and testing loop

In [None]:
# tracking acccuracy for output anlaysis
train_accuracy_splitList = []
test_accuracy_by_split_on_whole  = []
test_accuracy_splitList = []
clock = 1
for task_id, task_dataset in enumerate(tasks):

    train_accuracy_list_1 = []
    test_accuracy_list_1  = []
    split_dataloader = DataLoader(task_dataset,
                                 batch_size = 128,
                                 shuffle = True,
                                 num_workers = 2)
    train_accuracy_list_1, _ = train_epoch(model, split_dataloader, optimizer, criterion, save_dir_catastropic)
    train_accuracy_splitList.append(train_accuracy_list_1)
    print("\n Test of last epoch on all test dataset. ")
    test_accuracy_list_1, _ = test_epoch(model, test_dataloader, criterion, save_dir_catastropic)
    test_accuracy_by_split_on_whole.append(test_accuracy_list_1)
    accuracies_chora = []
    print("\n Evaluation of past already Train dataset")
    for counter in range(clock):
        test_accuracy_list_child = []

        child_dataloader = DataLoader(tasks[counter],
                                     batch_size = 128,
                                     shuffle = True,
                                     num_workers = 2)

        print(f"\n Evaluation of task number {counter} / {clock}")
        test_accuracy_list_child, _ = test_epoch(model, child_dataloader, criterion, save_dir_catastropic)
        accuracies_chora.append(test_accuracy_list_child)
    # chora ko list inside bau ko main list vitra vai
    test_accuracy_splitList.append(accuracies_chora)
    clock += 1

In [None]:
# convert to  a NumPy array to match table structure
n_tasks = len(tasks)
# intializing all element as NaN
test_accuracy_matrix = np.full((n_tasks, n_tasks), np.nan)
for i in range(n_tasks):
    for ticker in range(0, i + 1):
        accuracy = test_accuracy_splitList[i][ticker]
        test_accuracy_matrix[i, ticker] = accuracy[-1] if isinstance(accuracy, list) else accuracy
print(test_accuracy_matrix)
print(test_accuracy_matrix.dtype)


# plotting a graph

In [None]:
plt.figure(figsize = (10,6))
colors = ["b", "g", "r", "c", "m"]
labels = [f" Test Task {i}" for i in range(5)]
ticker = 0
for counter in range(5):
    column_data = test_accuracy_matrix[:, counter]
    valid_data  = column_data[~np.isnan(column_data)]
    # valid_data = test_accuracy_matrix[counter,:counter + 1]
    valid_indices = np.where(~np.isnan(column_data))[0]
    ticker -= 1
    # plt.plot(range(counter  +1), valid_data, marker = 'o', color = colors[counter], label = labels[counter] )
    plt.plot(valid_indices, valid_data, marker='o', linestyle='-', linewidth=2, markersize=8, color=colors[counter],
             label=labels[counter])
plt.title("Test accuracies per training Task")
plt.xlabel("Evaluation Task")
plt.ylabel("Accuracy")
plt.grid(True)
plt.legend()
plt.show()

# <center> Comparing MLP with ResNet-18

# Equivalenting ResNet18 into MLP for catastrophic forgetting

In [None]:
class MLP(nn.Module):
    """ input size = 32 * 32 * 3 = weight * height * channel = 3072
    -> output layer = task is splitted into two tasks (5 per each task)
    -> mm.Faltten() = flat image to linear = 32 * 32 * 3 = 3072
    """
    def __init__(self, input_feature = 3072, hidden_layer = 1024, num_classes = 10):
        super().__init__()
        """ 1. reason for falttening a data
        If the input to an nn.Linear layer comes from a layer that outputs a multi-dimensional tensor (e.g., a convolutional layer
        with shape (batch_size, channels, height, width) or a pooling layer),
        you need to flatten it into a 1D tensor (e.g., (batch_size, channels * height * width)).
        """
        self.flatten = nn.Flatten()

        # define layer
        self.func1 = nn.Linear(input_feature, hidden_layer)
        self.bn1   = nn.BatchNorm1d(1024)
        self.func2 = nn.Linear(1024, 512)
        self.bn2   = nn.BatchNorm1d(512)
        self.func3 = nn.Linear(512, 256)
        self.bn3   = nn.BatchNorm1d(256)
        self.func4 = nn.Linear(256, num_classes)
        self.bn4   = nn.BatchNorm1d(10)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        x = self.flatten(x)
        # first hidden layer
        x = self.dropout(self.relu(self.bn1(self.func1(x))))
        # second hidden layer
        x = self.dropout(self.relu(self.bn2(self.func2(x))))
        # third hidden layer
        x = self.dropout(self.relu(self.bn3(self.func3(x))))
        # output layer
        x = self.func4(x)

        return x

In [None]:
# calling MLP
model_MLP = MLP().to(device)
# print(model)
# define a loss function
criterion = nn.CrossEntropyLoss()
#optimizer = optim.Adam(model.parameters(), lr = 0.001)
optimizer = optim.SGD(model_MLP.parameters(), lr=0.01, momentum=0.9, weight_decay= 5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = 30)

In [None]:
# Directory passed as `save_dir` to train_epoch / test_epoch for the MLP runs,
# i.e. where the per-epoch MLP checkpoints are written and read back.
save_dir_mlp = "/home/sabinnmc/work/image_classification/model_MLP"
os.makedirs(save_dir_mlp, exist_ok=True)  # Creates the folder if it’s not there

# Runnig same ResNet-18 model style but using <b><span style = "color: red">MLP(Multi layer Perceptron)</span>

In [None]:
# tracking acccuracy for output anlaysis
train_accuracy_splitList = []
test_accuracy_by_split_on_whole  = []
test_accuracy_splitList = []
clock = 1
for task_id, task_dataset in enumerate(tasks):

    train_accuracy_list_1 = []
    test_accuracy_list_1  = []
    split_dataloader = DataLoader(task_dataset,
                                 batch_size = 128,
                                 shuffle = True,
                                 num_workers = 2)
    train_accuracy_list_1, _ = train_epoch(model_MLP, split_dataloader, optimizer, criterion, save_dir_mlp)
    train_accuracy_splitList.append(train_accuracy_list_1)
    print("\n Test of last epoch on all test dataset. ")
    test_accuracy_list_1, _ = test_epoch(model_MLP, test_dataloader, criterion, save_dir_mlp)
    test_accuracy_by_split_on_whole.append(test_accuracy_list_1)
    accuracies_chora = []
    print("\n Evaluation of past already Train dataset")
    for counter in range(clock):
        test_accuracy_list_child = []

        child_dataloader = DataLoader(tasks[counter],
                                     batch_size = 128,
                                     shuffle = True,
                                     num_workers = 2)

        print(f"\n Evaluation of task number {counter} / {clock}")
        test_accuracy_list_child, _ = test_epoch(model_MLP, child_dataloader, criterion, save_dir_mlp)
        accuracies_chora.append(test_accuracy_list_child)
    # chora ko list inside bau ko main list vitra vai
    test_accuracy_splitList.append(accuracies_chora)
    clock += 1

In [None]:
# convert to  a NumPy array to match table structure
n_tasks = len(tasks)
# intializing all element as NaN
test_accuracy_matrix = np.full((n_tasks, n_tasks), np.nan)
for i in range(n_tasks):
    for ticker in range(0, i + 1):
        accuracy = test_accuracy_splitList[i][ticker]
        test_accuracy_matrix[i, ticker] = accuracy[-1] if isinstance(accuracy, list) else accuracy
print(test_accuracy_matrix)
print(test_accuracy_matrix.dtype)


# Plotting graph using MLP

In [None]:
plt.figure(figsize = (10,6))
colors = ["b", "g", "r", "c", "m"]
labels = [f" Test Task {i}" for i in range(5)]
ticker = 0
for counter in range(5):
    column_data = test_accuracy_matrix[:, counter]
    valid_data  = column_data[~np.isnan(column_data)]
    # valid_data = test_accuracy_matrix[counter,:counter + 1]
    valid_indices = np.where(~np.isnan(column_data))[0]
    ticker -= 1
    # plt.plot(range(counter  +1), valid_data, marker = 'o', color = colors[counter], label = labels[counter] )
    plt.plot(valid_indices, valid_data, marker='o', linestyle='-', linewidth=2, markersize=8, color=colors[counter],
             label=labels[counter])
plt.title("Test accuracies per training Task")
plt.xlabel("Evaluation Task")
plt.ylabel("Accuracy")
plt.grid(True)
plt.legend()
plt.show()

# <center><u><b>Continual Learning Implementation</b></u></center>
<ol><h2>
    <li>Using Memory Buffer </li>
    <li> Using Elastic Weight Consolidation (EWC) </li>
</h2>
</ol>EWC is a regularization-based approach.

# <center> <B> 1.  Using Memory Buffer </B> </center>
## 1.1 Memory Dataset class definition for fetching a sample, returning an (x, y) tuple, and reporting the number of samples
<ul><li> This class inherits from the <code style="color: red;"> Dataset </code> class in <code style="color: red;"> torch.utils.data </code> to create a custom dataset in PyTorch </li>
<li><code style="color: red;"> Dataset </code> class is part of the PyTorch data loading pipeline, which provides a standard interface for accessing data samples </li>
</ul>

In [None]:
from torch.utils.data import Dataset
from PIL import Image
# When we have custom data that doesn’t fit into pre-built datasets the  this is import
class MemoryDataset(Dataset):
    def __init__(self, x_data, y_data, transform):
        self.x_data = x_data
        self.y_data = y_data
        self.transform = transform

    """ Fetches a single data sample from your dataset based on the provided index.
        -> It returns a tuple (x, y) where:
        -> x is the input data ( an image) at position index in self.x_data.
        -> y is the corresponding label or target (e.g., a class label) at position index in self.y_data.
    """
    def __getitem__(self, index):
        x, y = self.x_data[index], self.y_data[index]
        # convert tensor to PIL imgage transformer
        if isinstance(x, torch.Tensor):
            if x.dim() == 3 and x.shape[0] == 3:                  #[C, H, W] format confirmation
                # converting numpy and then PIL
                x_np = x.numpy()
                if x_np.max() <=1.0:
                    x_np = (x_np * 255).astype(np.uint8)
                else:
                    x_np = x_np.astype(np.uint8)
                # convert to [H, W, C] for PIL
                x_np = np.transpose(x_np, (1,2,0))
                x = Image.fromarray(x_np)
        if self.transform:
            # # Convert tensor to PIL Image (assuming x is an image tensor)
            # x = x.cpu().numpy()  # Convert tensor to NumPy array
            # x = Image.fromarray(x.astype('uint8'), 'RGB')  # Convert to PIL Image (adjust mode as needed)
            x = self.transform(x)
        return x, y
    """
    Return number of samples from samples from dataset
    """
    def __len__(self):
        return len(self.x_data)

##  1.2 Memory Buffer class definition

In [None]:
from collections import defaultdict
class MemoryBuffer:
    def __init__(self, capacity = 1000):
        self.capacity = capacity
        self.memory_x = []
        self.memory_y = []
        # defaultdict can count the frequency of data or labels
        self.class_counts = defaultdict(int)

    def add_sample(self, x, y):

        """ .detach() remove tensor "x" (image) from computational graph i.e. no longer track gradients
                -> beacuse in CL, memory buffer store sample for later use not for immediate gradient compute
                -> if not detached , consume memory and compute resource unncessarily
        .cpu() remove tensor from GPU to CPU because memory buffer are oftern stored in CPU rather then GPU
                -> GPU are expensive and its resource are used carefully for computation not for storing
                ->  CL invloves long term storage of samples across tasks and keeping is GPU is not good idea
                ->  Transfering data from CPU to GPU doesnot create memory bottleneck but vice versa will do
        """
        x = x.detach().cpu()
        y = y.item()        if isinstance(y, torch.Tensor) else y
        #_______________________________________________________________________________________
        #  determing a balanced buffer eith sample number of sample for each class
        # ____________________________________________________________________ for future__________________
        if len(self.memory_x) < self.capacity:
            self.memory_x.append(x)
            self.memory_y.append(y)
            self.class_counts[y] += 1
        else:
            max_class = max(self.class_counts, key = self.class_counts.get)
            idx = [i for i, label in enumerate(self.memory_y) if label == max_class]
            # class balacing by random sampling
            # random_idx = buffer_size / total_samples_seen
            if idx:
                replace_idx = random.choice(idx)
                self.class_counts[self.memory_y[replace_idx]] -= 1
                self.memory_x[replace_idx] = x
                self.memory_y[replace_idx] = y
                self.class_counts[y] += 1               # since we just data so count of y label will increase by 1

    def get_memory_dataset(self, transform = None):
      # creating a dataset from memory samples
      memory_dataset = MemoryDataset(self.memory_x, self.memory_y, transform)
      return memory_dataset

    def __len__(self):
      return len(self.memory_x)

    def get_class_distribution(self, class_counts):
      # return class distribution in the memory buffer
      return dict(self.class_counts)

# 1.3 (i) Training model with Memory Buffer
<code>def train_epoch(model, train_dataloader,optimizer, criterion, save_dir, train_epochs = 30):
def test_epoch(model, test_dataloader, criterion, save_dir, test_epochs = 30):

In [None]:
# train function
def train_with_memory(model_MLP, memory_buffer,optimizer, criterion, train_epochs = 30):
  # creating a new dataset for using memory buffer
  memory_dataset = memory_buffer.get_memory_dataset(transform_train)   # transform_train is defined as global variable at top
  memory_dataloader = DataLoader(memory_dataset, batch_size = 128, shuffle = True)
  print(f"\nMemory dataloader during training with memory is : size( {len(memory_buffer)} )")

  return train_epoch(model_MLP, memory_dataloader,optimizer, criterion, save_dir = None, train_epochs = 30)

#  1.5 Train on task

In [None]:
def train_with_task(model_MLP, task_dataset, memory_buffer, optimizer, criterion, train_epochs = 10):
    # Creating a new dataset for using memory buffer
    task_train_dataloader = DataLoader(task_dataset, batch_size=128, shuffle=True, num_workers=2)
    model_MLP.train()
    train_loss_list = []
    train_accuracy_list = []

    accuracy_list, loss_list = train_epoch(model_MLP, task_train_dataloader,optimizer, criterion, save_dir = None, train_epochs = 30)
    # Initializing before using
    memory_dataloader = None
    memory_iteration = None

    # # Create a combined dataset if memory buffer has samples
    # if memory_buffer is not None and len(memory_buffer) > 0:
    #     # transform_train is global object used for augmentation
    #     memory_dataset = memory_buffer.get_memory_dataset(transform_train)
    #     # Oversample memory data (increasing weight of old data)
    #     memory_dataloader = DataLoader(memory_dataset, batch_size=64, shuffle=True, num_workers=2)
    #     memory_iteration = iter(memory_dataloader)

    # UPDATE memory buffer with samples from current task if needed
    if memory_buffer is not None:
        # add samples to memory buffer here and this part is missing
        for i, (data, labels) in enumerate(task_train_dataloader):
            for j in range(len(data)):
                if random.random() < 0.01:  # Sample with 1% probability to avoid bias
                    memory_buffer.add_sample(data[j].cpu(), labels[j].item())

    return accuracy_list, loss_list

# 1.6 Running continual learning model
## Definition of train and test
<code>train_accuracy_list, train_loss_list = def train_epoch(model, train_dataloader,optimizer, criterion, save_dir, train_epochs = 30):
<code>
test_accuracy_list, test_loss_list = def test_epoch(model, test_dataloader, criterion, save_dir, test_epochs = 30):

<code>def train_with_task(model, task_dataset, memory_buffer, optimizer, criterion, train_epochs = 10):

In [None]:
import random
#def Continual_learning_experiment():
num_tasks = 5
memory_capacity = 1000
epoch_per_task = 5

memory_buffer = MemoryBuffer(capacity = memory_capacity)
if memory_buffer is not None:
    print(" Object of class is not empty and it can train a model")
    print(" length if memory buffer is : ", len(memory_buffer))
# already called in above
# tasks = splitting_task(train_dataset, num_tasks)

In [None]:
# Tracking accuracy and loss across all tasks
train_accuracy_list_MB = []
train_loss_list_MB = []
train_accuracy_list_TASK = []
train_loss_list_TASK = []
test_accuracy_list_CL = []
test_loss_list_CL = []

# Tracking results when we train model using train_on_task method
for task_id, task_dataset in enumerate(tasks):
    print(f"\nTraining on Task {task_id + 1} / {num_tasks}")
    print(f"\nThe size of each task (size: {len(task_dataset)})")

    train_accuracy_list_task, train_loss_list_task = train_with_task(model_MLP,
                                                  task_dataset,
                                                  memory_buffer=memory_buffer,
                                                  optimizer=optimizer,
                                                  criterion=criterion,
                                                  train_epochs=30)

    # Store task training results
    train_accuracy_list_TASK.append(train_accuracy_list_task)
    train_loss_list_TASK.append(train_loss_list_task)

    # Train on memory buffer after task (replay)
    if memory_buffer is not None and len(memory_buffer) > 0:
        print(f"Training on memory buffer (size: {len(memory_buffer)})")
        train_accuracy_list_mb, train_loss_list_mb = train_with_memory(model_MLP,
                                                       memory_buffer=memory_buffer,
                                                       optimizer=optimizer,
                                                       criterion=criterion,
                                                       train_epochs=20)

        # Store memory buffer training results
        train_accuracy_list_MB.append(train_accuracy_list_mb)
        # You might want to store loss too
        # train_loss_list_MB.append(train_loss_list_mb)

    # Evaluating after each task
    print(f"Evaluating on test dataset (size: {len(test_dataloader.dataset)})")
    test_accuracy_list_cl, test_loss_list_cl = test_epoch(model_MLP,
                                                 test_dataloader,
                                                 criterion,
                                                 save_dir = save_dir_mlp ,
                                                 test_epochs=30)
    test_accuracy_list_CL.append(test_accuracy_list_cl)
    test_loss_list_CL.append(test_loss_list_cl)

# defining class for CutMix method

# trying a cutMix approach



1.  We draw a random number for every batch; CutMix is applied only if that number is less than 0.5 (the `prob` parameter).
2.  Not all images are mixed: only the randomly selected batches are.

In [None]:
class CutMix(object):
  def __init__(self, alpha= 0.1, prob = 0.5):
    self.alpha = alpha
    self.prob = prob

  def __call__(self, batch):
    data, labels = batch
    print("random.random return is ", random.random())

    if random.random() >= self.prob:
      return data, labels, None, None

    index = torch.randperm(data.size(0))
    shuffle_images = data[index]
    # it is special beta distribution. related to beta distribution
    lamda = np.random.beta(self.alpha,    # alpha > 0
                            self.alpha)    # beta > 0
    # data = images and tuples data look like (batch_no, channel, height, width)
    img_ht, img_wd = data.shape[2:]
    x_center = np.random.uniform(0, img_ht)   # creata random value among 0 <= value < img_ht
    y_center = np.random.uniform(0, img_wd)
    w = img_wd * np.sqrt(1 - lamda)
    h = img_ht * np.sqrt(1 - lamda)
    x0 = int(np.round(max(x_center - w / 2, 0)))
    y0 = int(np.round(max(y_center - h / 2, 0)))
    x1 = int(np.round(min(x_center + w / 2, img_wd)))
    y1 = int(np.round(min(y_center + w / 2, img_ht)))

    data[:, :, y0:y1, x0:x1] = shuffle_images[:, :, y0:y1, x0:x1]
    # 1-total patch area/total_area -> 1- portion % of patch area -> real image section remaining after patch
    lamda = 1 - ((x1 - x0) * (y1 - y0) /   (img_ht * img_wd))
    return data, labels, lamda, index

# Memory Buffer for replay based method (GDumb - paper method)