# Setting Up

In [1]:
# Colab-only setup: mount Google Drive at /content/drive so files stored on
# the drive can be accessed through the local filesystem.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os

# Switch the working directory to the project folder on the mounted drive;
# relative paths such as "dataset/Original" are resolved against it.
os.chdir('/content/drive/MyDrive/HCMUT/ML242/GA')

In [3]:
# Sanity check: print "OK" when the project folder exists, "NO" otherwise.
print("OK" if os.path.isdir('/content/drive/MyDrive/HCMUT/ML242/GA') else "NO")

OK



**Dataset structure**
```
dataset
|-- Original
|        |-- Training
|        |       |-- glioma     (Tr_gl_xxxx.jpg)    --> 1321 images
|        |       |-- meningioma (Tr_me_xxxx.jpg)    --> 1339 images
|        |       |-- notumor    (Tr_no_xxxx.jpg)    --> 1595 images
|        |       |-- pituitary  (Tr_pi_xxxx.jpg)    --> 1457 images
|        |-- Testing
|        |       |-- glioma     (Te_gl_xxxx.jpg)    --> 300 images
|        |       |-- meningioma (Te_me_xxxx.jpg)    --> 306 images
|        |       |-- notumor    (Te_no_xxxx.jpg)    --> 405 images
|        |       |-- pituitary  (Te_pi_xxxx.jpg)    --> 300 images
|-- AfterPreprocess
|        |-- Training
|        |       |-- glioma     (Tr_gl_xxxx.jpg)    --> 1321 images
|        |       |-- meningioma (Tr_me_xxxx.jpg)    --> 1339 images
|        |       |-- notumor    (Tr_no_xxxx.jpg)    --> 1595 images
|        |       |-- pituitary  (Tr_pi_xxxx.jpg)    --> 1457 images
|        |-- Testing
|        |       |-- glioma     (Te_gl_xxxx.jpg)    --> 300 images
|        |       |-- meningioma (Te_me_xxxx.jpg)    --> 306 images
|        |       |-- notumor    (Te_no_xxxx.jpg)    --> 405 images
|        |       |-- pituitary  (Te_pi_xxxx.jpg)    --> 300 images
|        |-- augmented_img_paths.json
```



# Preprocessing Images

In [None]:
import numpy as np
import tqdm
import cv2
import imutils
import matplotlib.pyplot as plt
import random
import json

In [None]:
class ImgPreprocess:
    """Crop, optionally augment, resize and save every image of the dataset.

    Images are read from ``<origin_dir>/<split>/<class>/`` and written under
    the same relative layout inside ``save_dir``. The paths of the images that
    were augmented are collected in ``augmented_img_path`` and persisted as
    JSON next to the processed images.
    """

    def __init__(self,
                 origin_dir:str = "dataset/Original",
                 save_dir:str = "dataset/AfterPreprocess"):
        self.origin_dir = origin_dir
        self.save_dir = save_dir
        # Starts as an empty list; run_on_dataset() / load_augmented_paths()
        # replace it with a nested {split: {class_name: [path, ...]}} dict.
        self.augmented_img_path = []

    def crop_img(self, img, extra_padding:int=0):
        """
        Crop ``img`` to the bounding box of its largest bright contour.

        The image is converted to grayscale, blurred, thresholded at 45 and
        cleaned with erode/dilate; the extreme points of the largest external
        contour define the crop rectangle.

        Args:
            img: 3-channel image array accepted by ``cv2.cvtColor``.
            extra_padding: pixels added on every side of the bounding box.
                The padded box is clamped to the image borders.

        Returns:
            A copy of the cropped region.

        Raises:
            ValueError: if thresholding leaves no contour at all.
        """
        # NOTE(review): cv2.imread returns BGR while this uses COLOR_RGB2GRAY.
        # The result is identical when all channels are equal (gray images
        # stored as 3-channel) -- confirm before feeding colour images.
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        gray = cv2.GaussianBlur(gray, (3, 3), 0)

        thresh = cv2.threshold(gray, 45, 255, cv2.THRESH_BINARY)[1]
        thresh = cv2.erode(thresh, None, iterations=2)
        thresh = cv2.dilate(thresh, None, iterations=2)

        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        cnts = imutils.grab_contours(cnts)
        if len(cnts) == 0:
            # Same exception type max() would raise on an empty sequence,
            # but with a message that explains what went wrong.
            raise ValueError("no contour found after thresholding; cannot crop image")
        c = max(cnts, key=cv2.contourArea)

        xs = c[:, :, 0]
        ys = c[:, :, 1]
        height, width = img.shape[:2]
        # Clamp to the image: a negative slice start would silently wrap
        # around and select the wrong (often empty) region.
        top = max(int(ys.min()) - extra_padding, 0)
        bottom = min(int(ys.max()) + extra_padding, height)
        left = max(int(xs.min()) - extra_padding, 0)
        right = min(int(xs.max()) + extra_padding, width)

        return img[top:bottom, left:right].copy()

    def augment_img(self, img):
        """
        Return a randomly augmented copy of ``img``.

        Applies, in order: a horizontal flip with probability 0.5, a rotation
        by an angle drawn uniformly from [-30, 30) degrees around the image
        centre (borders filled by reflection), and a contrast/brightness
        change ``alpha * img + beta`` with alpha in [0.8, 1.2) and integer
        beta in [-20, 20). Randomness comes from the global NumPy RNG.
        """
        new_img = img.copy()

        if np.random.rand() < 0.5:
            new_img = cv2.flip(new_img, 1)

        angle = np.random.uniform(-30, 30)
        (h, w) = new_img.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        new_img = cv2.warpAffine(new_img, M, (w, h),
                                 flags=cv2.INTER_LINEAR,
                                 borderMode=cv2.BORDER_REFLECT_101)

        alpha = np.random.uniform(0.8, 1.2)
        beta = np.random.randint(-20, 20)
        new_img = cv2.convertScaleAbs(new_img, alpha=alpha, beta=beta)

        return new_img

    def run_on_dataset(self, augment_ratio:float=0.25, final_size=(256, 256)):
        """
        Preprocess every ``.jpg`` of every split/class folder.

        Each image is cropped with ``crop_img``; a random ``augment_ratio``
        fraction per folder is additionally passed through ``augment_img``
        (the augmented version replaces the plain one). All images are resized
        to ``final_size`` and written to ``save_dir``. Both the ``random`` and
        NumPy RNGs are seeded with 42 first.

        The augmented output paths are stored in ``self.augmented_img_path``
        and dumped to ``<save_dir>/augmented_img_paths.json``.

        Args:
            augment_ratio: fraction of images per folder to augment.
            final_size: size passed to ``cv2.resize`` for the saved image.
        """
        sub_dirs = ["Training", "Testing"]
        class_names = ["glioma", "meningioma", "notumor", "pituitary"]
        seed = 42

        random.seed(seed)
        np.random.seed(seed)

        augmented_paths = {}

        print(f"----- Preprocessing -----")

        for split in sub_dirs:
            for name in class_names:
                os.makedirs(os.path.join(self.save_dir, split, name), exist_ok=True)

        for split in sub_dirs:
            augmented_paths[split] = {}
            for name in class_names:
                augmented_paths[split][name] = []
                in_dir = os.path.join(self.origin_dir, split, name)
                out_dir = os.path.join(self.save_dir, split, name)

                if not os.path.isdir(in_dir):
                    print(f"Bug found: in_dir not exist - in_dir: {in_dir}")
                    continue
                else:
                    print(f"Processing in {in_dir}")

                # Sort first: os.listdir order is arbitrary, so the seeded
                # shuffle below is only reproducible from a fixed order.
                all_imgs = sorted(f for f in os.listdir(in_dir)
                                  if f.lower().endswith('.jpg'))
                if not all_imgs:
                    continue

                random.shuffle(all_imgs)
                n_augment = int(len(all_imgs) * augment_ratio)
                # Take the subset from the shuffled image list (not the raw
                # directory listing) so it is random and contains images only.
                imgs_to_augment = set(all_imgs[:n_augment])
                print(f"--> Processing {len(all_imgs)} | Augmenting {n_augment} images.")

                rl_img = 0
                rl_augment = 0
                for img in all_imgs:
                    in_path = os.path.join(in_dir, img)
                    img_0 = cv2.imread(in_path)
                    if img_0 is None:
                        print(f"Failed to read image: {in_path}")
                        continue

                    is_augmented = img in imgs_to_augment
                    out_path = os.path.join(out_dir, img)
                    try:
                        cropped_img = self.crop_img(img_0)
                        if is_augmented:
                            cropped_img = self.augment_img(cropped_img)

                        resized_img = cv2.resize(cropped_img, final_size, interpolation=cv2.INTER_AREA)
                        # cv2.imwrite reports failure through its return value.
                        if not cv2.imwrite(out_path, resized_img):
                            raise IOError(f"could not write {out_path}")
                    except Exception as e:
                        # Best effort: report the image and keep going.
                        print(f"Error processing {in_path}: {e}")
                        continue

                    # Count and record only images that were actually saved.
                    rl_img += 1
                    if is_augmented:
                        augmented_paths[split][name].append(out_path)
                        rl_augment += 1

                print(f"----> Finish processing in {in_dir}")
                print(f"----> Total preprocessed img: {rl_img} | Total augmented img: {rl_augment}")

        self.augmented_img_path = augmented_paths
        augmented_json_file = "augmented_img_paths.json"
        augmented_file_path = os.path.join(self.save_dir, augmented_json_file)
        with open(augmented_file_path, 'w') as f:
            json.dump(augmented_paths, f, indent=4)

        print(f"Preprocessing complete | Augmented image paths saved to {augmented_file_path}")


    def load_augmented_paths(self, json_filename:str="augmented_img_paths.json"):
        """
        Load the augmented-path record written by ``run_on_dataset``.

        Args:
            json_filename: file name inside ``save_dir``.

        Returns:
            The decoded JSON content (also stored in
            ``self.augmented_img_path``), or ``None`` when the file is missing.
        """
        print(f"----- Loading augmented images paths from json -----")
        file_path = os.path.join(self.save_dir, json_filename)
        if not os.path.exists(file_path):
            print(f"JSON file not found: {file_path}")
            return None

        with open(file_path, 'r') as f:
            data = json.load(f)

        self.augmented_img_path = data
        print(f"--> Loaded augmented image paths from {file_path} successfully")
        return data


    def plot_single_img(self, img):
        """Show ``img`` with matplotlib; 2-D arrays use the gray colormap."""
        plt.figure(figsize=(8, 8))
        if len(img.shape) == 2:
            plt.imshow(img, cmap='gray')
        else:
            plt.imshow(img)
        plt.axis('off')
        plt.show()

In [None]:
# One-off preprocessing pass with the default directories and settings:
# reads dataset/Original and writes the processed copies (plus the
# augmented-path JSON) to dataset/AfterPreprocess.
img_prep = ImgPreprocess()
img_prep.run_on_dataset()

----- Preprocessing -----
Processing in dataset/Original/Training/glioma
--> Processing 1321 | Augmenting 330 images.
----> Finish processing in dataset/Original/Training/glioma
----> Total preprocessed img: 1321 | Total augmented img: 330
Processing in dataset/Original/Training/meningioma
--> Processing 1339 | Augmenting 334 images.
----> Finish processing in dataset/Original/Training/meningioma
----> Total preprocessed img: 1339 | Total augmented img: 334
Processing in dataset/Original/Training/notumor
--> Processing 1595 | Augmenting 398 images.
----> Finish processing in dataset/Original/Training/notumor
----> Total preprocessed img: 1595 | Total augmented img: 398
Processing in dataset/Original/Training/pituitary
--> Processing 1457 | Augmenting 364 images.
----> Finish processing in dataset/Original/Training/pituitary
----> Total preprocessed img: 1457 | Total augmented img: 364
Processing in dataset/Original/Testing/glioma
--> Processing 300 | Augmenting 75 images.
----> Finish 

# Data Loader

In [39]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np

In [40]:
class BrainTumorDataset(Dataset):
    """Dataset of ``.jpg`` images laid out as ``<data_dir>/<mode>/<class_name>/``.

    The label of a sample is the index of its class folder in ``class_names``.
    Images are read as grayscale and scaled to float32 in [0, 1] before the
    optional ``transform`` is applied.
    """

    def __init__(self, data_dir, class_names, transform=None, mode="Training"):
        """
        Args:
            data_dir: root folder containing the ``mode`` sub-folder.
            class_names: class folder names; list position is the label.
            transform: optional callable applied to the float32 image.
            mode: sub-folder to read, e.g. "Training" or "Testing".
        """
        self.data_dir = data_dir
        self.class_names = class_names
        self.transform = transform
        self.mode = mode
        self.img_paths = []
        self.labels = []

        for class_id, class_name in enumerate(self.class_names):
            class_dir = os.path.join(self.data_dir, self.mode, class_name)
            # Sort the listing: os.listdir order is arbitrary, and a stable
            # sample order is needed for seeded splits to be reproducible.
            for img_name in sorted(os.listdir(class_dir)):
                if img_name.lower().endswith('.jpg'):
                    self.img_paths.append(os.path.join(class_dir, img_name))
                    self.labels.append(class_id)


    def __len__(self):
        """Return the number of indexed images."""
        return len(self.img_paths)


    def __getitem__(self, idx):
        """
        Return ``(image, label)`` for sample ``idx``.

        Raises:
            OSError: if the image file cannot be read or decoded.
        """
        img_path = self.img_paths[idx]
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"Failed to read image: {img_path}")
        img = img.astype('float32') / 255.0

        if self.transform:
            img = self.transform(img)

        label = self.labels[idx]
        return img, label

In [41]:
class Loader:
    """Factory for the train / validation / test ``DataLoader`` objects.

    The "Training" folder is split randomly into train and validation parts
    according to ``validation_split``; the "Testing" folder is used as is.
    """

    def __init__(self,
                 data_dir:str="dataset/AfterPreprocess",
                 batch_size=32,
                 validation_split=0.2):
        self.class_names = ["glioma", "meningioma", "notumor", "pituitary"]
        self.validation_split = validation_split
        self.batch_size = batch_size
        self.data_dir = data_dir


    def get_dataloader(self):
        """
        Build the three loaders.

        Returns:
            ``(train_loader, val_loader, test_loader)``; only the training
            loader shuffles its samples.
        """
        # Tensor conversion followed by normalisation with mean 0.5 / std 0.5.
        preprocessing = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5])
        ])

        def make_dataset(split):
            return BrainTumorDataset(data_dir=self.data_dir,
                                     class_names=self.class_names,
                                     transform=preprocessing,
                                     mode=split)

        def make_loader(dataset, shuffle):
            return DataLoader(dataset,
                              batch_size=self.batch_size,
                              shuffle=shuffle)

        training_pool = make_dataset("Training")

        n_val = int(len(training_pool) * self.validation_split)
        n_train = len(training_pool) - n_val
        train_part, val_part = torch.utils.data.random_split(
            training_pool,
            [n_train, n_val]
        )

        train_loader = make_loader(train_part, True)
        val_loader = make_loader(val_part, False)
        test_loader = make_loader(make_dataset("Testing"), False)

        return train_loader, val_loader, test_loader

In [None]:
loader = Loader(data_dir="dataset/AfterPreprocess", batch_size=32, validation_split=0.2)
train_loader, val_loader, test_loader = loader.get_dataloader()

# Peek at the first training batch (if any) to confirm tensor shapes.
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    images, labels = first_batch
    print(images.shape)
    print(labels.shape)


torch.Size([32, 1, 256, 256])
torch.Size([32])


In [None]:
# Peek at the first validation batch (if any) to confirm tensor shapes.
first_batch = next(iter(val_loader), None)
if first_batch is not None:
    images, labels = first_batch
    print(images.shape)
    print(labels.shape)

torch.Size([32, 1, 256, 256])
torch.Size([32])


In [None]:
# Peek at the first test batch (if any) to confirm tensor shapes.
first_batch = next(iter(test_loader), None)
if first_batch is not None:
    images, labels = first_batch
    print(images.shape)
    print(labels.shape)

torch.Size([32, 1, 256, 256])
torch.Size([32])


# Dynamic Neural Networks

In [42]:
import torch
import torch.nn as nn

In [46]:
class DynamicNN(nn.Module):
    """CNN classifier whose depth and layer sizes come from constructor arguments.

    The network is ``num_cnns`` blocks of Conv2d -> activation -> MaxPool2d,
    followed by ``num_fcns`` Linear -> activation blocks and a final
    Linear(-> 4) + Softmax, so ``forward`` returns class probabilities.

    Args:
        num_cnns: number of convolution blocks.
        num_fcns: number of hidden fully connected layers.
        img_size: input size as (channels, height, width).
        filters, kernel_sizes, strides, pool_sizes: per-conv-block settings;
            only the first ``num_cnns`` entries are used and stored.
        fcn_units: hidden layer widths; only the first ``num_fcns`` entries
            are used and stored.
        activation_index: 0 = ReLU, 1 = Sigmoid, 2 = Tanh.
        lr: learning rate; only stored on the model, not used by it.
    """

    def __init__(self,
                 num_cnns=2,
                 num_fcns=2,
                 img_size=(1, 256, 256),
                 filters=(32, 64),
                 kernel_sizes=(3, 3),
                 strides=(1, 1),
                 pool_sizes=(2, 2),
                 fcn_units=(128, 64),
                 activation_index=0,
                 lr=0.01):
        super(DynamicNN, self).__init__()

        self.num_cnns = num_cnns
        self.num_fcns = num_fcns
        self.img_size = img_size
        # Keep private list copies truncated to the layers actually built:
        # this avoids aliasing the caller's lists and guarantees that the
        # last stored entry describes the last built layer.
        self.filters = list(filters[:num_cnns])
        self.kernel_sizes = list(kernel_sizes[:num_cnns])
        self.strides = list(strides[:num_cnns])
        self.pool_sizes = list(pool_sizes[:num_cnns])
        self.fcn_units = list(fcn_units[:num_fcns])

        self.activate_function_list = [nn.ReLU(), nn.Sigmoid(), nn.Tanh()]
        self.activation_index = activation_index
        self.activation_function = self.activate_function_list[self.activation_index]

        self.conv_layers = self._build_conv_layers()
        self.fc_layers = self._build_fc_layers()

        self.lr = lr

    def _build_conv_layers(self):
        """Build the Conv2d -> activation -> MaxPool2d stack."""
        layers = []
        in_channels = self.img_size[0]

        for i in range(self.num_cnns):
            layers.append(nn.Conv2d(in_channels,
                                    self.filters[i],
                                    kernel_size=self.kernel_sizes[i],
                                    stride=self.strides[i]))
            layers.append(self.activation_function)
            layers.append(nn.MaxPool2d(self.pool_sizes[i]))
            in_channels = self.filters[i]

        return nn.Sequential(*layers)

    def _flattened_features(self):
        """Return channels * height * width of the conv stack output."""
        H, W = self.img_size[1], self.img_size[2]
        for i in range(self.num_cnns):
            # Unpadded convolution followed by floor-mode pooling.
            H = (H - self.kernel_sizes[i]) // self.strides[i] + 1
            W = (W - self.kernel_sizes[i]) // self.strides[i] + 1
            H = H // self.pool_sizes[i]
            W = W // self.pool_sizes[i]

        # Channel count of the last conv layer that was actually built.
        channels = self.filters[-1] if self.filters else self.img_size[0]
        return channels * H * W

    def _build_fc_layers(self):
        """Build the hidden Linear layers and the 4-way softmax classifier."""
        layers = []
        in_features = self._flattened_features()

        for units in self.fcn_units:
            layers.append(nn.Linear(in_features, units))
            layers.append(self.activation_function)
            in_features = units

        layers.append(nn.Linear(in_features, 4))
        layers.append(nn.Softmax(dim=1))
        return nn.Sequential(*layers)


    def forward(self, x):
        """Return softmax class probabilities of shape (batch, 4)."""
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x


    def __str__(self):
        """Return a human-readable summary of the architecture."""
        info = "DynamicNN Architecture:\n"
        info += f" Input Image Size: {self.img_size}\n"
        info += f" Number of CNN Layers: {self.num_cnns}\n"
        for i in range(self.num_cnns):
            in_channels = self.img_size[0] if i == 0 else self.filters[i-1]
            info += f"  CNN Layer {i+1}:\n"
            info += f"    Conv2d(in_channels={in_channels}, out_channels={self.filters[i]}, kernel_size={self.kernel_sizes[i]}, stride={self.strides[i]})\n"
            info += f"    MaxPool2d(pool_size={self.pool_sizes[i]})\n"
        info += f" Number of FCN Layers: {self.num_fcns}\n"
        for i, units in enumerate(self.fcn_units):
            info += f"  FCN Layer {i+1}: Linear(units={units})\n"
        final_in = self.fcn_units[-1] if self.fcn_units else self._flattened_features()
        info += f" Final Classification Layer: Linear({final_in} -> 4) + Softmax\n"
        info += f" Activation Function: {self.activation_function.__class__.__name__}\n"
        info += f" Learning Rate: {self.lr}\n"
        return info

In [47]:
# Build a network with all default arguments and print its architecture
# summary (DynamicNN.__str__).
temp_nn = DynamicNN()
print(temp_nn)

DynamicNN Architecture:
 Input Image Size: (1, 256, 256)
 Number of CNN Layers: 2
  CNN Layer 1:
    Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)
    MaxPool2d(pool_size=2)
  CNN Layer 2:
    Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
    MaxPool2d(pool_size=2)
 Number of FCN Layers: 2
  FCN Layer 1: Linear(units=128)
  FCN Layer 2: Linear(units=64)
 Final Classification Layer: Linear(64 -> 4) + Softmax
 Activation Function: ReLU
 Learning Rate: 0.01



# Genetic Algorithm Optimizer

In [48]:
import random
import torch
import torch.optim as optim
import torch.nn as nn
import time

In [None]:
class GAOptimizer:
    """Genetic-algorithm search over ``DynamicNN`` architectures.

    Every individual is a ``DynamicNN``. Each generation trains all
    individuals, keeps the fitter half unchanged and refills the population
    with mutated crossovers of the survivors.

    NOTE(review): fitness is the mean of the last validation accuracy and the
    test accuracy, so the test set takes part in model selection and the
    reported test accuracy is optimistic.

    Args:
        population_size: number of individuals per generation.
        generations: maximum number of generations.
        mutation_rate: probability of mutating each gene.
        device: torch device the models are moved to.
        epochs_per_model: maximum training epochs per individual.
        lr_choices: candidate learning rates (defaults to a built-in list).
        activation_functions: candidate activations; only the length is used,
            to draw ``activation_index``.
        train_loader, val_loader, test_loader: iterables of (inputs, labels).
        criterion: loss applied to ``log(model output)``; default ``NLLLoss``.
    """

    # Search space shared by initialisation, crossover and mutation.
    POSSIBLE_FILTERS = (16, 32, 64, 128)
    ALLOWED_KERNEL_SIZES = (3, 5, 7)
    ALLOWED_POOL_SIZES = (2, 3, 4)
    STRIDE_RANGE = (1, 2)
    FCN_UNIT_RANGE = (32, 256)
    # Lower bound applied to probabilities before taking the logarithm.
    LOG_EPS = 1e-12

    def __init__(self,
                 population_size=10,
                 generations=10,
                 mutation_rate=0.1,
                 device='cuda' if torch.cuda.is_available() else 'cpu',
                 epochs_per_model=20,
                 lr_choices=None,
                 activation_functions=None,
                 train_loader=None,
                 val_loader=None,
                 test_loader=None,
                 criterion=None):
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.device = device
        print(f"Training on {device}")
        self.epochs_per_model = epochs_per_model

        if lr_choices is None:
            self.lr_choices = [0.1, 0.05, 0.01, 0.005, 0.001, 0.0005, 0.0001, 0.00005, 0.00001]
        else:
            self.lr_choices = lr_choices

        if activation_functions is None:
            self.activation_functions = [nn.ReLU(), nn.Sigmoid(), nn.Tanh()]
        else:
            self.activation_functions = activation_functions

        # Depth ranges used by mutate(); initialize_population() overwrites
        # them with the ranges it was called with.
        self.num_cnn_layers_range = (1, 4)
        self.num_fcn_layers_range = (1, 4)

        self.population = []
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        if criterion is None:
            self.criterion = nn.NLLLoss()
        else:
            self.criterion = criterion

    @staticmethod
    def _draw_capped(choices, previous=None):
        """Draw one of ``choices``, not larger than ``previous`` when given."""
        if previous is None:
            return random.choice(choices)
        return random.choice([c for c in choices if c <= previous])

    @staticmethod
    def _enforce_non_increasing(values):
        """Clamp ``values`` in place so that no entry exceeds its predecessor."""
        for i in range(1, len(values)):
            if values[i] > values[i - 1]:
                values[i] = values[i - 1]

    def _draw_non_increasing(self, choices, length):
        """Return ``length`` random draws from ``choices`` that never increase."""
        values = []
        for _ in range(length):
            values.append(self._draw_capped(choices, values[-1] if values else None))
        return values

    def initialize_population(self,
                              num_cnn_layers_range=(1, 4),
                              num_fcn_layers_range=(1, 4)):
        """
        Replace the population with ``population_size`` random individuals.

        Filters, kernel sizes and pool sizes are drawn so that they never
        increase from one conv layer to the next.

        Args:
            num_cnn_layers_range: inclusive (min, max) number of conv blocks.
            num_fcn_layers_range: inclusive (min, max) number of hidden FC layers.
        """
        print(f"Initialize population")
        self.num_cnn_layers_range = tuple(num_cnn_layers_range)
        self.num_fcn_layers_range = tuple(num_fcn_layers_range)
        # Start from scratch so repeated runs do not grow the population.
        self.population = []

        for _ in range(self.population_size):
            cnn_layers = random.randint(*num_cnn_layers_range)
            fcn_layers = random.randint(*num_fcn_layers_range)
            activation_index = random.randint(0, len(self.activation_functions) - 1)
            lr = random.choice(self.lr_choices)

            filters = self._draw_non_increasing(self.POSSIBLE_FILTERS, cnn_layers)
            kernel_sizes = self._draw_non_increasing(self.ALLOWED_KERNEL_SIZES, cnn_layers)
            strides = [random.randint(*self.STRIDE_RANGE) for _ in range(cnn_layers)]
            pool_sizes = self._draw_non_increasing(self.ALLOWED_POOL_SIZES, cnn_layers)
            fcn_units = [random.randint(*self.FCN_UNIT_RANGE) for _ in range(fcn_layers)]

            individual = DynamicNN(num_cnns=cnn_layers,
                                   num_fcns=fcn_layers,
                                   filters=filters,
                                   kernel_sizes=kernel_sizes,
                                   strides=strides,
                                   pool_sizes=pool_sizes,
                                   fcn_units=fcn_units,
                                   activation_index=activation_index,
                                   lr=lr).to(self.device)
            self.population.append(individual)


    def crossover(self, parent1, parent2):
        """
        Create a new, untrained child from the hyper-parameters of two parents.

        Scalar genes are taken from either parent at random. Per-layer genes
        are taken position-wise from either parent when both define that
        layer, otherwise from the parent that does; a position neither parent
        defines is drawn at random.
        """
        child_cnn_layers = random.choice([parent1.num_cnns, parent2.num_cnns])
        child_fcn_layers = random.choice([parent1.num_fcns, parent2.num_fcns])
        child_activation_index = random.choice([parent1.activation_index, parent2.activation_index])
        child_lr = random.choice([parent1.lr, parent2.lr])

        def choose_param(list1, list2, idx):
            if idx < len(list1) and idx < len(list2):
                return random.choice([list1[idx], list2[idx]])
            elif idx < len(list1):
                return list1[idx]
            elif idx < len(list2):
                return list2[idx]
            else:
                return None

        child_filters = []
        child_kernel_sizes = []
        child_strides = []
        child_pool_sizes = []
        for i in range(child_cnn_layers):
            filt = choose_param(parent1.filters, parent2.filters, i)
            kernel = choose_param(parent1.kernel_sizes, parent2.kernel_sizes, i)
            stride = choose_param(parent1.strides, parent2.strides, i)
            pool = choose_param(parent1.pool_sizes, parent2.pool_sizes, i)
            # Fall back to a random gene instead of passing None on.
            if filt is None:
                filt = self._draw_capped(self.POSSIBLE_FILTERS,
                                         child_filters[-1] if child_filters else None)
            if kernel is None:
                kernel = self._draw_capped(self.ALLOWED_KERNEL_SIZES,
                                           child_kernel_sizes[-1] if child_kernel_sizes else None)
            if stride is None:
                stride = random.randint(*self.STRIDE_RANGE)
            if pool is None:
                pool = self._draw_capped(self.ALLOWED_POOL_SIZES,
                                         child_pool_sizes[-1] if child_pool_sizes else None)
            child_filters.append(filt)
            child_kernel_sizes.append(kernel)
            child_strides.append(stride)
            child_pool_sizes.append(pool)

        # Mixing parents can break the non-increasing constraint; repair it.
        self._enforce_non_increasing(child_filters)
        self._enforce_non_increasing(child_kernel_sizes)
        self._enforce_non_increasing(child_pool_sizes)

        child_fcn_units = []
        for i in range(child_fcn_layers):
            unit = choose_param(parent1.fcn_units, parent2.fcn_units, i)
            if unit is None:
                unit = random.randint(*self.FCN_UNIT_RANGE)
            child_fcn_units.append(unit)

        child = DynamicNN(num_cnns=child_cnn_layers,
                          num_fcns=child_fcn_layers,
                          filters=child_filters,
                          kernel_sizes=child_kernel_sizes,
                          strides=child_strides,
                          pool_sizes=child_pool_sizes,
                          fcn_units=child_fcn_units,
                          activation_index=child_activation_index,
                          lr=child_lr).to(self.device)
        return child


    def mutate(self, individual):
        """
        Mutate each gene of ``individual`` with probability ``mutation_rate``.

        Returns:
            ``individual`` itself when no gene changed, otherwise a freshly
            built (untrained) ``DynamicNN`` with the mutated genes.
        """
        mutated = False

        # Work on local copies so ``individual`` is never left half-mutated.
        num_cnns = individual.num_cnns
        num_fcns = individual.num_fcns
        activation_index = individual.activation_index
        lr = individual.lr

        if random.random() < self.mutation_rate:
            num_cnns = random.randint(*self.num_cnn_layers_range)
            mutated = True
        if random.random() < self.mutation_rate:
            num_fcns = random.randint(*self.num_fcn_layers_range)
            mutated = True
        if random.random() < self.mutation_rate:
            activation_index = random.randint(0, len(self.activation_functions) - 1)
            mutated = True
        if random.random() < self.mutation_rate:
            lr = random.choice(self.lr_choices)
            mutated = True

        # Truncate to the (possibly reduced) depth, then pad up to it, so the
        # per-layer lists always have exactly ``num_cnns`` entries.
        filters = list(individual.filters[:num_cnns])
        kernel_sizes = list(individual.kernel_sizes[:num_cnns])
        strides = list(individual.strides[:num_cnns])
        pool_sizes = list(individual.pool_sizes[:num_cnns])

        while len(filters) < num_cnns:
            filters.append(self._draw_capped(self.POSSIBLE_FILTERS,
                                             filters[-1] if filters else None))
        while len(kernel_sizes) < num_cnns:
            kernel_sizes.append(self._draw_capped(self.ALLOWED_KERNEL_SIZES,
                                                  kernel_sizes[-1] if kernel_sizes else None))
        while len(strides) < num_cnns:
            strides.append(random.randint(*self.STRIDE_RANGE))
        while len(pool_sizes) < num_cnns:
            pool_sizes.append(self._draw_capped(self.ALLOWED_POOL_SIZES,
                                                pool_sizes[-1] if pool_sizes else None))

        for i in range(num_cnns):
            if random.random() < self.mutation_rate:
                filters[i] = self._draw_capped(self.POSSIBLE_FILTERS,
                                               filters[i - 1] if i else None)
                mutated = True
            if random.random() < self.mutation_rate:
                kernel_sizes[i] = self._draw_capped(self.ALLOWED_KERNEL_SIZES,
                                                    kernel_sizes[i - 1] if i else None)
                mutated = True
            if random.random() < self.mutation_rate:
                strides[i] = random.randint(*self.STRIDE_RANGE)
                mutated = True
            if random.random() < self.mutation_rate:
                pool_sizes[i] = self._draw_capped(self.ALLOWED_POOL_SIZES,
                                                  pool_sizes[i - 1] if i else None)
                mutated = True

        fcn_units = list(individual.fcn_units[:num_fcns])
        while len(fcn_units) < num_fcns:
            fcn_units.append(random.randint(*self.FCN_UNIT_RANGE))
        for i in range(num_fcns):
            if random.random() < self.mutation_rate:
                fcn_units[i] = random.randint(*self.FCN_UNIT_RANGE)
                mutated = True

        if not mutated:
            return individual

        # Mutating layer i may have pushed it below layer i + 1; repair.
        self._enforce_non_increasing(filters)
        self._enforce_non_increasing(kernel_sizes)
        self._enforce_non_increasing(pool_sizes)

        return DynamicNN(num_cnns=num_cnns,
                         num_fcns=num_fcns,
                         filters=filters,
                         kernel_sizes=kernel_sizes,
                         strides=strides,
                         pool_sizes=pool_sizes,
                         fcn_units=fcn_units,
                         activation_index=activation_index,
                         lr=lr).to(self.device)

    def _accuracy(self, model, loader):
        """Return the fraction of samples in ``loader`` classified correctly."""
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for inputs, labels in loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        return correct / total if total > 0 else 0

    def train_model(self, model):
        """
        Train ``model`` with Adam at ``model.lr`` and evaluate it.

        Training stops early once validation accuracy has failed to improve
        for two consecutive epochs. The model is expected to output
        probabilities; the loss is applied to their logarithm.

        Returns:
            dict with ``'train_losses'`` (mean loss per epoch),
            ``'val_accuracies'`` (per epoch) and ``'test_accuracy'``.
        """
        optimizer = optim.Adam(model.parameters(), lr=model.lr)
        best_val_accuracy = 0.0
        patience = 2
        no_improvement = 0

        train_loss_list = []
        val_accuracy_list = []

        print(f"-----Training model-----")
        for epoch in range(self.epochs_per_model):
            print(f"Epoch {epoch + 1}: ...")
            model.train()
            epoch_losses = []
            for inputs, labels in self.train_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                optimizer.zero_grad()
                outputs = model(inputs)
                # Softmax can underflow to exactly 0; clamp so the log stays
                # finite instead of producing inf loss and NaN gradients.
                log_probs = torch.log(outputs.clamp_min(self.LOG_EPS))
                loss = self.criterion(log_probs, labels)
                loss.backward()
                optimizer.step()
                epoch_losses.append(loss.item())

            avg_train_loss = sum(epoch_losses) / len(epoch_losses) if epoch_losses else 0.0
            train_loss_list.append(avg_train_loss)

            val_accuracy = self._accuracy(model, self.val_loader)
            val_accuracy_list.append(val_accuracy)

            print(f"Train Loss = {avg_train_loss:.4f}, Validation Accuracy = {val_accuracy:.4f}")

            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                no_improvement = 0
            else:
                no_improvement += 1

            if no_improvement >= patience:
                print("Early stopping triggered.")
                break

        test_accuracy = self._accuracy(model, self.test_loader)
        print(f"Final Test Accuracy = {test_accuracy:.4f}")

        return {
            'train_losses': train_loss_list,
            'val_accuracies': val_accuracy_list,
            'test_accuracy': test_accuracy
        }

    def _evaluate(self, model):
        """Train ``model`` and return ``(train_info, fitness)``."""
        train_info = self.train_model(model)
        last_val_accuracy = train_info['val_accuracies'][-1] if train_info['val_accuracies'] else 0.0
        fitness = (last_val_accuracy + train_info['test_accuracy']) / 2.0
        return train_info, fitness


    def calculate_fitness(self, model):
        """
        Train ``model`` and return its fitness.

        Fitness is the mean of the last validation accuracy and the test
        accuracy; a ``RuntimeError`` during training yields 0.0.
        """
        try:
            train_info, fitness = self._evaluate(model)

            last_train_loss = train_info['train_losses'][-1] if train_info['train_losses'] else float('inf')
            last_val_accuracy = train_info['val_accuracies'][-1] if train_info['val_accuracies'] else 0.0
            test_accuracy = train_info['test_accuracy']

            print(f"Last epoch metrics - Train Loss: {last_train_loss:.4f}, Val Accuracy: {last_val_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")
            print(f"Calculated Fitness = {fitness:.4f}")

            return fitness
        except RuntimeError as e:
            print(f"RuntimeError encountered during fitness evaluation: {e}")
            return 0.0


    def save_population_metrics_history(self, population_metrics_history):
        """Write the per-generation metrics to result/history as JSON."""
        # Imported here because the cell defining this class does not import
        # json itself.
        import json

        history_dir = "result/history"
        os.makedirs(history_dir, exist_ok=True)

        file_path = os.path.join(history_dir, "population_metrics_history.json")

        with open(file_path, 'w') as f:
            json.dump(population_metrics_history, f, indent=4)

        print(f"Population metrics history saved to: {file_path}")


    def run(self):
        """
        Run the genetic search.

        For every generation: train and score all individuals, save the best
        model's weights under result/best_model, keep the fitter half and
        refill the population with mutated crossovers of the survivors.
        Stops early on near-perfect fitness or after 10 generations without a
        relative improvement of at least 0.5 %.

        Returns:
            list (one entry per generation) of lists (one entry per
            individual) of metric dicts; the same data is saved as JSON.

        Raises:
            ValueError: if a loader or the criterion is missing, or if
                ``population_size`` is smaller than 1.
        """
        if not (self.train_loader and self.val_loader and self.test_loader and self.criterion):
            raise ValueError("Data loaders and criterion must be provided in the constructor.")
        if self.population_size < 1:
            raise ValueError("population_size must be at least 1.")

        best_model_dir = "result/best_model"
        os.makedirs(best_model_dir, exist_ok=True)

        self.initialize_population(num_cnn_layers_range=(1, 4), num_fcn_layers_range=(1, 4))
        population_metrics_history = []
        previous_best_child = None
        previous_best_fitness = 0.0
        generation_without_change = 0
        best_child = None

        for generation in range(self.generations):
            print(f"\nGeneration {generation + 1}/{self.generations}")
            generation_metrics = []
            fitness_scores = []

            for individual in self.population:
                try:
                    train_info, fitness = self._evaluate(individual)
                except RuntimeError as e:
                    # E.g. an architecture whose layers do not fit the input.
                    print(f"RuntimeError encountered: {e}")
                    train_info = {'train_losses': [], 'val_accuracies': [], 'test_accuracy': 0.0}
                    fitness = 0.0

                generation_metrics.append({
                    'train_losses': train_info['train_losses'],
                    'val_accuracies': train_info['val_accuracies'],
                    'test_accuracy': train_info['test_accuracy'],
                    'fitness': fitness
                })
                fitness_scores.append(fitness)

            population_metrics_history.append(generation_metrics)

            sorted_indices = sorted(range(len(fitness_scores)), key=lambda k: fitness_scores[k], reverse=True)
            best_index = sorted_indices[0]
            best_child = self.population[best_index]
            best_fitness = fitness_scores[best_index]
            print(f"Best fitness in generation {generation + 1}: {best_fitness:.4f}")

            filename = os.path.join(best_model_dir, f"generation_{generation + 1:02d}.pth")
            torch.save(best_child.state_dict(), filename)
            print(f"Saved best model for generation {generation + 1} as {filename}")

            # Always keep at least one survivor so tiny populations work.
            num_selected = max(1, self.population_size // 2)
            selected_individuals = [self.population[i] for i in sorted_indices[:num_selected]]

            new_population = selected_individuals.copy()
            while len(new_population) < self.population_size:
                if len(selected_individuals) >= 2:
                    parent1, parent2 = random.sample(selected_individuals, 2)
                else:
                    # random.sample cannot draw two parents from one survivor.
                    parent1 = parent2 = selected_individuals[0]
                child = self.crossover(parent1, parent2)
                child = self.mutate(child)
                new_population.append(child)
            self.population = new_population

            if previous_best_child is not None:
                improvement = best_fitness - previous_best_fitness
                if improvement < 0.005 * previous_best_fitness:
                    generation_without_change += 1
                else:
                    generation_without_change = 0

            if best_fitness >= 0.9999:
                print("Achieved nearly perfect fitness. Early stopping (potential overfitting).")
                torch.save(best_child.state_dict(), os.path.join(best_model_dir, f"generation_{generation + 1:02d}_overfitting.pth"))
                if previous_best_child is not None:
                    torch.save(previous_best_child.state_dict(), os.path.join(best_model_dir, f"generation_{generation + 1:02d}_overfitting_prev.pth"))
                break

            if generation_without_change >= 10:
                print("No significant improvement in recent generations. Early stopping.")
                torch.save(best_child.state_dict(), os.path.join(best_model_dir, f"generation_{generation + 1:02d}_no_improvement.pth"))
                break

            previous_best_child = best_child
            previous_best_fitness = best_fitness

        else:
            # Reached only when the loop was not left through ``break``.
            print("Finished all generations.")
            if best_child is not None:
                torch.save(best_child.state_dict(), os.path.join(best_model_dir, f"generation_{self.generations:02d}_final.pth"))

        self.save_population_metrics_history(population_metrics_history)

        return population_metrics_history

In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
import os

# Seed every random number generator involved so the run can be repeated.
for seed_everything in (random.seed, np.random.seed, torch.manual_seed):
    seed_everything(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# Data pipeline settings.
data_dir = "dataset/AfterPreprocess"
batch_size = 32
validation_split = 0.2

loader_instance = Loader(data_dir=data_dir,
                         batch_size=batch_size,
                         validation_split=validation_split)
train_loader, val_loader, test_loader = loader_instance.get_dataloader()

criterion = nn.NLLLoss()

# Genetic search with the optimizer's default population and generation counts.
ga_optimizer = GAOptimizer(train_loader=train_loader,
                           val_loader=val_loader,
                           test_loader=test_loader,
                           criterion=criterion)

population_metrics_history = ga_optimizer.run()

# Report the metrics of every individual, generation by generation.
print("\nPopulation Metrics History:")
for gen_index, gen_metrics in enumerate(population_metrics_history, start=1):
    print(f"\nGeneration {gen_index}:")
    for ind_index, metric in enumerate(gen_metrics, start=1):
        print("\n".join([
            f"  Individual {ind_index}:",
            f"    Fitness: {metric['fitness']:.4f}",
            f"    Final Test Accuracy: {metric['test_accuracy']:.4f}",
            f"    Training Losses per Epoch: {metric['train_losses']}",
            f"    Validation Accuracies per Epoch: {metric['val_accuracies']}",
        ]))
