<h1> This part of the code corresponds to the research methods and the results of my thesis </h1>

<h1 style="color:orange"> The first step is to import all libraries</h1>


In [None]:
import torch
from torch import nn
import os
import numpy as np
import wandb
import platform
import sys
from tqdm import tqdm
import random
from datasets import load_dataset
from transformers import Trainer, TrainingArguments
from functools import partial
from transformers import PreTrainedModel
from torchvision import transforms
from torch.utils.data import Dataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from transformers import EarlyStoppingCallback
from transformers import PretrainedConfig
from collections import Counter



<h2 style="color:orange"> Next we introduce a set of global configs which will be used throughout the search </h2>


In [None]:
image_size = 224 # since we want to compare to pretrained models like ViT
batch_size = 128 # our GPU can handle it
num_classes = 3 # since we are doing the 3-class classification on pneumonia dataset
epochs = 5 # since 5 epochs of pretraining per model is enough
channels = 3 # since we are using RGB images
seed = 1234 # for reproducibility
# Seed the global RNGs as well: the population initialisation uses `random`
# and the evolutionary operators use `np.random`, neither of which is
# affected by `random_generator` below.
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
random_generator = np.random.default_rng(seed)
folder_for_best_architectures = './best_architectures/'
dataset_path = "pawlo2013/chest_xray" # this is the dataset we are using it contains the exact images as the Kermany et al. dataset.
vector_of_choices = [2, 5, 2, 3, 4, 2] # this can be done a bit more elegantly, basically this encodes all of the possible architectural choices for the model
use_wandb = True # since we want to log the results
population_size = 5 # number of models in the population can be higher but for the sake of GPU time we are keeping it low.

# getting the device
has_gpu = torch.cuda.is_available()
# `torch.has_mps` is deprecated (it emits a warning on recent PyTorch); query
# the MPS backend instead and fall back to False when the backend is missing.
_mps_backend = getattr(torch.backends, "mps", None)
has_mps = bool(_mps_backend is not None and _mps_backend.is_available())
# Same priority as before: Apple MPS first, then CUDA, then CPU.
device = "mps" if has_mps else "cuda" if has_gpu else "cpu"
print(f"Python Platform: {platform.platform()}")
print(f"PyTorch Version: {torch.__version__}")
print(f"Python {sys.version}")


print("GPU is", "available" if has_gpu else "NOT AVAILABLE")
print("MPS (Apple Metal) is", "AVAILABLE" if has_mps else "NOT AVAILABLE")




# function to calculate class weights as mention in the paper
def calculate_class_weights(dataset_path, device):
    """Return inverse-frequency class weights for the training split.

    The dataset at ``dataset_path`` is loaded with ``load_dataset`` and the
    ``'label'`` column of its ``'train'`` split is counted. Each class gets
    the weight ``total_samples / class_count``.

    Args:
        dataset_path: identifier passed straight to ``load_dataset``.
        device: device the resulting tensor is moved to.

    Returns:
        A float tensor whose i-th entry is the weight of label ``i``. Labels
        are looked up as ``0 .. number_of_distinct_labels - 1``.
    """
    labels = load_dataset(dataset_path)['train']['label']
    n_samples = len(labels)

    # weight = total / count, i.e. rarer classes receive larger weights
    inverse_frequency = {
        cls: n_samples / occurrences for cls, occurrences in Counter(labels).items()
    }

    # order the weights by label id so index i corresponds to label i
    ordered_weights = [inverse_frequency[cls] for cls in range(len(inverse_frequency))]
    return torch.tensor(ordered_weights, dtype=torch.float).to(device)


# Inverse-frequency class weights (one entry per class, already on `device`);
# they are passed to the model wrapper and used to weight the loss function.
class_weights = calculate_class_weights(dataset_path, device)


# The dataset is loaded here only to read the class names from the 'label'
# feature of the train split.
dataset = load_dataset(dataset_path)
train_dataset = dataset['train']
# `class_names` is read as a global further down (e.g. by compute_metrics).
class_names = train_dataset.features['label'].names
print(class_names)


  has_mps = getattr(torch, 'has_mps', False)
  device = "mps" if getattr(torch, 'has_mps', False) \
  device = "mps" if getattr(


Python Platform: macOS-14.4.1-arm64-arm-64bit
PyTorch Version: 2.3.0
Python 3.10.9 | packaged by conda-forge | (main, Feb  2 2023, 20:26:08) [Clang 14.0.6 ]
GPU is NOT AVAILABLE
MPS (Apple Metal) is AVAILABLE
tensor([2.0147, 4.1631, 3.7958], device='mps:0')
['Bacterial', 'Normal', 'Viral']


<h1 style="color:orange"> Reusable code, some helper functions</h1>


In [None]:
# Strictly speaking the CustomDataset is not needed: the same thing can be done by passing a collate function to the Trainer class.

class CustomDataset(Dataset):
    """Map-style dataset that adapts dataset rows to the model's input format.

    Each row of the wrapped dataset is expected to expose an ``"image"`` and a
    ``"label"`` entry. Items are returned as
    ``{"input_ids": image, "labels": label}``, which are the argument names of
    the model wrapper's ``forward``.

    Args:
        dataset: indexable collection of rows (supports ``len`` and ``[]``).
        device: stored on the instance; not used by this class itself.
        transform: optional callable applied to the image only.
        shuffle: when True the wrapped dataset is shuffled once, using the
            global ``seed``, via its own ``shuffle`` method.
    """

    def __init__(self, dataset, device, transform=None, shuffle=False):
        self.dataset = dataset
        self.transform = transform
        self.device = device

        if shuffle:
            self.dataset = self.dataset.shuffle(seed)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        image = item["image"]
        label = item["label"]
        if self.transform:
            # The transform takes the image only. The previous call also passed
            # `self.data_augmentation`, an attribute that is never assigned, so
            # every item access raised AttributeError.
            image = self.transform(image)
        return {"input_ids": image, "labels": label}
    

        
    
# Initial preprocessing applied to every image, as described in the paper:
# resize to (image_size, image_size), convert to a tensor, then apply
# t -> 2t - 1 (which maps a [0, 1] tensor onto [-1, 1]).
def _rescale_to_signed_unit_range(t):
    return (t * 2) - 1


_preprocessing_steps = [
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Lambda(_rescale_to_signed_unit_range),
]
initial_transforms = transforms.Compose(_preprocessing_steps)


# callback function that gets called on every step for on the fly data normalization
def perform_transforms(image):
    """Convert ``image`` to RGB and run it through ``initial_transforms``."""
    rgb_image = image.convert("RGB")
    return initial_transforms(rgb_image)


# function for initializing the population of the genetic algorithm
def init_population(population_size, vector_of_choices):
    """Draw a random starting population.

    Args:
        population_size: number of individuals to create.
        vector_of_choices: for each gene, the number of available options;
            gene ``j`` is drawn uniformly from ``0 .. vector_of_choices[j] - 1``.

    Returns:
        ``np.ndarray`` built from one row of genes per individual.
    """
    genomes = [
        [random.randint(0, n_options - 1) for n_options in vector_of_choices]
        for _ in range(population_size)
    ]
    return np.asarray(genomes)


# the decode function mentioned in the paper, again this can be done more elegantly. 
def decode_vector(x, choices=(2, 5, 2, 3, 4, 2)):
    """Wrap every gene of ``x`` into its valid range of option indices.

    Gene ``i`` is reduced modulo ``choices[i]``, so arbitrary integers (for
    example the output of mutation) map back onto a valid option index.

    The default ``choices`` mirrors the global ``vector_of_choices``. Gene 1
    (the activation) used to be reduced modulo 4 although five activations
    are declared, which made the fifth one unreachable and aliased index 4
    to index 0.

    Args:
        x: indexable sequence with at least ``len(choices)`` integer genes.
        choices: number of options per gene.

    Returns:
        A list with one decoded gene per entry of ``choices``.
    """
    return [x[i] % n_options for i, n_options in enumerate(choices)]

# an important function that is used to remove duplicates from the population
def remove_duplicates_order_matters(lst):
    """Drop repeated rows while keeping the first occurrence of each.

    Two rows count as duplicates when they are equal as tuples, so the order
    of the elements inside a row matters.

    Returns:
        ``np.ndarray`` of the surviving rows, in their original order.
    """
    first_occurrence = {}
    for row in lst:
        # setdefault keeps the row stored the first time this key was seen
        first_occurrence.setdefault(tuple(row), row)
    return np.asarray(list(first_occurrence.values()))
# helper function that returns True when a list is NOT present in a nested list
def is_list_not_in_nested_list(list1, nested_list):
    """Return True when ``list1`` equals none of the sequences in ``nested_list``.

    Both sides are compared as tuples, so lists, tuples and array rows with
    the same elements are treated as equal.
    """
    candidate = tuple(list1)
    return not any(candidate == tuple(entry) for entry in nested_list)

# metrics that will be used for the evaluation of the model important to get the per class accuracies
def compute_metrics(p):
    """Compute overall and per-class metrics for one evaluation pass.

    Args:
        p: prediction object exposing ``predictions`` (per-class scores, the
            arg-max over axis 1 is taken as the predicted label) and
            ``label_ids`` (the true labels).

    Returns:
        Dict with ``accuracy``, weighted ``precision``/``recall``/``f1`` and
        one ``accuracy_class_<name>`` entry per entry of the global
        ``class_names``.
    """
    preds = np.argmax(p.predictions, axis=1)
    labels = p.label_ids
    # Overall accuracy
    acc = accuracy_score(labels, preds)
    # Precision, recall, and F1 score
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average="weighted", zero_division=0)
    # Confusion matrix to calculate per-class accuracy. The label set is
    # pinned so that row i always belongs to class_names[i], even when a
    # class is missing from both the labels and the predictions.
    class_ids = list(range(len(class_names)))
    cm = confusion_matrix(labels, preds, labels=class_ids)
    support = cm.sum(axis=1)
    # A class without any true sample gets 0.0 instead of a NaN from 0 / 0.
    per_class_acc = np.divide(
        cm.diagonal(),
        support,
        out=np.zeros(len(class_ids), dtype=float),
        where=support != 0,
    )
    # Combine all metrics into one dictionary
    metrics = {
        "accuracy": acc,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }
    metrics.update(
        {f"accuracy_class_{name}": value for name, value in zip(class_names, per_class_acc)}
    )
    # log the confusion matrix to W&B, but only while a run is active;
    # wandb.log raises when wandb.init has not been called.
    if wandb.run is not None:
        wandb.log({"confusion_matrix": wandb.plot.confusion_matrix(probs=None,
                                                                    y_true=labels,
                                                                    preds=preds,
                                                                    class_names=class_names)})
    return metrics
   





# Custom config class for the ConvNext model, can be useful when exporting the model to the Hugging Face model hub.
class CustomConvNextConfig(PretrainedConfig):
    """Configuration holding the architectural choices of the custom CNN.

    Args:
        input_channels: number of channels of the input images.
        num_classes: number of output classes.
        device: device string the model and class weights are moved to.
        convnext_mult: width multiplier of the hidden convolution in each block.
        layer_norm: whether the blocks use normalisation layers.
        dropout: dropout probability used in the classification head.
        pooling_choice: name of the ``torch.nn`` pooling class to use.
        activation_choice: name of the ``torch.nn`` activation class to use.
        use_residual: whether the blocks add a residual connection.
        image_size: side length of the (square) input images. The model
            wrapper reads ``config.image_size``; it is declared explicitly
            here (with a default) instead of relying on it being passed
            through ``**kwargs``.
        **kwargs: forwarded to ``PretrainedConfig``.
    """
    model_type = "custom_convnext"

    def __init__(self,
                 input_channels: int = 3,
                 num_classes: int = 3,
                 device: str = 'cpu',
                 convnext_mult: int = 2,
                 layer_norm: bool = False,
                 dropout: float = 0.0,
                 pooling_choice: str = "MaxPool2d",
                 activation_choice: str = "ReLU",
                 use_residual: bool = False,
                 image_size: int = 224,
                 **kwargs
                 ):

        # Initialize class attributes
        self.input_channels = input_channels
        self.num_classes = num_classes
        self.device = device
        self.convnext_mult = convnext_mult
        self.layer_norm = layer_norm
        self.dropout = dropout
        self.pooling_choice = pooling_choice
        self.activation_choice = activation_choice
        self.use_residual = use_residual
        self.image_size = image_size

        # Call the super class's constructor
        super().__init__(**kwargs)



<h1 style="color:orange"> Basic CNN Model</h1>

In [None]:

# this function calculates how many times the input size can be halved before it reaches 1,
# since we want the last feature map to be Cx1x1 where C is the number of resulting channels
# using something like a global average pooling layer results in too many parameters
def calculate_halving_steps(input_size):
    """Count how many integer halvings bring ``input_size`` down to 1.

    Args:
        input_size: positive size to halve (floor division by 2 each step).

    Returns:
        The number of halvings performed; 0 when ``input_size`` is already 1.

    Raises:
        ValueError: if ``input_size`` is zero or negative.
    """
    if input_size <= 0:
        raise ValueError("The input size must be a positive integer")

    remaining, steps = input_size, 0
    while remaining > 1:
        remaining, steps = remaining // 2, steps + 1
    return steps




# the main model class: a Hugging Face PreTrainedModel wrapper around the CNN that is used to train the model
class PretrainedWraper(PreTrainedModel):
    """``PreTrainedModel`` wrapper that builds the CNN from a config.

    ``forward`` returns a dict with ``"logits"`` and, when labels are given,
    a class-weighted NLL ``"loss"``.

    Args:
        config: ``CustomConvNextConfig`` describing the architecture.
        class_weights: per-class loss weights. Defaults to a vector of ones
            with ``config.num_classes`` entries.
    """

    config_class = CustomConvNextConfig


    def __init__(self, config, class_weights: torch.Tensor = None):

        super().__init__(config)

        if class_weights is None:
            # sized from the config instead of a hard-coded 3-element tensor
            class_weights = torch.ones(config.num_classes)
        self.class_weights = class_weights.to(config.device)

        # `device` is not forwarded to CNN: its constructor has no such
        # parameter in this notebook, so passing it raised TypeError. The
        # module is moved with `.to(config.device)` below instead.
        self.model = CNN(
            input_size=config.image_size,
            num_classes=config.num_classes,
            channels=config.input_channels,
            pooling_choice=config.pooling_choice,
            activation_choice=config.activation_choice,
            convnext_mult=config.convnext_mult,
            layer_norm=config.layer_norm,
            dropout=config.dropout,
            use_residual=config.use_residual,
        ).to(config.device)


    def forward(self, input_ids, labels=None):
        logits = self.model(input_ids)
        if labels is not None:
            # Keep the weights on the same device as the logits: the weight
            # tensor is a plain attribute and does not follow `model.to(...)`.
            weights = self.class_weights.to(logits.device)
            loss = torch.nn.NLLLoss(weight=weights)(logits, labels)
            return {"loss": loss, "logits": logits}
        return {"logits": logits}


# the main 'dynamic' CNN model that will be used in the genetic algorithm
class CNN(nn.Module):
    """Stack of ConvNext blocks and 2x2 pooling layers with a linear head.

    One (block, pooling) stage is created per halving step of ``input_size``;
    stage ``i`` maps ``channels * 2**i`` to ``channels * 2**(i + 1)`` channels.
    After the last stage a head (Flatten, Linear, Dropout, Linear, LogSoftmax)
    is appended. The head's first Linear layer takes ``output_channels``
    features, i.e. it expects the last feature map to be 1x1 spatially.

    Args:
        input_size: side length of the input images.
        num_classes: number of output classes.
        channels: number of input channels.
        pooling_choice: name of a ``torch.nn`` pooling class.
        activation_choice: name of a ``torch.nn`` activation class.
        convnext_mult: hidden-width multiplier of each block.
        layer_norm: whether blocks use normalisation.
        dropout: dropout probability of the head; ``None`` means no dropout.
        use_residual: whether blocks add a residual connection.
        device: optional device to move the module to after construction.
            Accepted because the model wrapper passes ``device=...``.
    """
    def __init__(self, input_size, num_classes, channels=3, pooling_choice=None, activation_choice=None, convnext_mult=None, layer_norm = None, dropout=None, use_residual=None, device=None ):
        super(CNN, self).__init__()
        # check how many halving steps we can do
        self.halving_steps = calculate_halving_steps(input_size)
        # set the number of classes
        self.num_classes = num_classes

        self.conv_layers = nn.ModuleList([])
        self.input_channels = [channels * (2**i) for i in range(self.halving_steps)]
        self.output_channels = [
            channels * (2 ** (i + 1)) for i in range(self.halving_steps)
        ]
        self.combinations = list(zip(self.input_channels, self.output_channels))
        self.pooling_choice = pooling_choice

        block_klass = partial(ConvNextBlock, mult=convnext_mult, activation_choice=activation_choice, norm=layer_norm, use_residual=use_residual)
        # nn.Dropout rejects None, which is this constructor's default
        dropout_p = 0.0 if dropout is None else dropout

        for i, (input_channels, output_channels) in enumerate(self.combinations):
            is_last = i == self.halving_steps - 1
            self.conv_layers.append(
                nn.Sequential(
                    block_klass(input_channels, output_channels),
                    self.get_pooling_layer()(kernel_size=2, stride=2),
                )
            )
            if is_last:
                # classification head, appended once after the final stage
                self.conv_layers.append(
                    nn.Sequential(
                        nn.Flatten(),
                        nn.Linear(output_channels, output_channels // 2),
                        nn.Dropout(dropout_p),
                        nn.Linear(output_channels // 2, num_classes),
                        nn.LogSoftmax(dim=1),
                    )
                )

        if device is not None:
            self.to(device)

    def get_pooling_layer(self):
        """Return the ``torch.nn`` class named by ``self.pooling_choice``."""
        return getattr(nn, self.pooling_choice)

    def forward(self, input_ids):
        """Apply every stage and the head in order; returns log-probabilities."""
        x = input_ids
        for layer in self.conv_layers:
            x = layer(x)
        return x


class ConvNextBlock(nn.Module):
    """ConvNeXt-style block, see https://arxiv.org/abs/2201.03545.

    A 7x7 depthwise convolution is followed by
    norm -> 3x3 conv (``dim`` to ``dim_out * mult``) -> activation ->
    norm -> 3x3 conv (back to ``dim_out``). With ``use_residual`` the input is
    added to the result, projected by a 1x1 convolution when ``dim`` and
    ``dim_out`` differ.

    Args:
        dim: number of input channels.
        dim_out: number of output channels.
        mult: hidden-width multiplier.
        norm: use single-group ``GroupNorm`` layers when truthy, else identity.
        activation_choice: name of a ``torch.nn`` activation class.
        use_residual: whether to add the residual connection.
    """

    def __init__(self, dim, dim_out, *, mult=2, norm=True, activation_choice, use_residual=False):
        super().__init__()
        self.use_residual = use_residual
        self.ds_conv = nn.Conv2d(dim, dim, 7, padding=3, groups=dim)
        self.activation_choice = activation_choice

        hidden_dim = dim_out * mult

        def make_norm(num_channels):
            # GroupNorm with one group, used here in place of LayerNorm
            return nn.GroupNorm(1, num_channels) if norm else nn.Identity()

        stages = [
            make_norm(dim),
            nn.Conv2d(dim, hidden_dim, 3, padding=1),
            self.get_activation_layer(),
            make_norm(hidden_dim),
            nn.Conv2d(hidden_dim, dim_out, 3, padding=1),
        ]
        self.net = nn.Sequential(*stages)

        if use_residual:
            if dim != dim_out:
                self.res_conv = nn.Conv2d(dim, dim_out, 1)
            else:
                self.res_conv = nn.Identity()

    def get_activation_layer(self):
        """Instantiate the ``torch.nn`` activation named by ``activation_choice``."""
        activation_cls = getattr(nn, self.activation_choice)
        return activation_cls()

    def forward(self, x):
        out = self.net(self.ds_conv(x))
        if not self.use_residual:
            return out
        return out + self.res_conv(x)




In [None]:
# the code for the architecture that facilitates the genetic algorithm, sort of a wrapper around the model.

class MetaNeuralFramework():
    """Turns an encoded architecture vector into a trained, scored model.

    The constructor loads the dataset and wraps its train / validation / test
    splits. ``prepare_run`` decodes a vector into concrete hyper-parameters,
    builds the model, starts a W&B run and creates a ``Trainer``.
    ``evaluate`` trains that model and returns its fitness.

    Args:
        image_size: side length of the input images.
        num_classes: number of output classes.
        epochs: training epochs per candidate.
        batch_size: per-device train and eval batch size.
        population: stored on the instance; not used by this class itself.
        class_weigths: per-class loss weights handed to the model wrapper.
        device: device string stored in the model config.
        use_wandb: whether the Trainer reports to W&B.
        dataset_path: identifier passed to ``load_dataset``.
        f1: if True the fitness is the F1 score, otherwise the sum of the
            per-class accuracies of the 'Viral' and 'Bacterial' classes.
        data_augmentation: flag that is only stored, printed and logged; no
            transform in this class consumes it.
    """
    def __init__(self, image_size, num_classes, epochs, batch_size, population, class_weigths, device , use_wandb, dataset_path, f1 = False, data_augmentation = False ):

        self.epochs = epochs
        self.batch_size = batch_size
        self.population = population
        self.class_weigths = class_weigths
        self.device = device
        self.use_wandb = use_wandb
        self.dataset_path = dataset_path
        # prepare_run() reads this attribute; it used to be read without ever
        # being assigned, which raised AttributeError on the first run.
        self.data_augmentation = data_augmentation


        # IMPORTANT!!
        # This encodes possible choices for the model architecture
        # This should correspond to the choice vector that is globally defined
        # Again extremely hacky, but it works for now, can be done more elegantly.
        self.layer_norm = [True,False]
        self.activations = ["ReLU", "GELU", "Tanh", "Softplus", "Sigmoid"]
        self.pooling_layer_choices = ["MaxPool2d", "AvgPool2d"]
        self.dropout = [0, 0.33, 0.66]
        self.learning_rate = [0.00001, 0.0001, 0.001, 0.01]
        self.use_residual = [True, False]


        self.image_size = image_size
        self.num_classes = num_classes
        self.f1 = f1


        self.dataset = load_dataset(self.dataset_path)
        self.class_names =  self.dataset["train"].features["label"].names

        # Only the training split is shuffled.
        self.train_loader = CustomDataset( self.dataset["train"], transform=perform_transforms, shuffle=True, device=device)
        self.val_loader = CustomDataset( self.dataset["validation"], transform=perform_transforms, shuffle=False, device=device)
        self.test_loader = CustomDataset( self.dataset["test"], transform=perform_transforms, shuffle=False, device=device)


    def decode(self):
        """Decode ``self.vector`` (set by ``evaluate``) and cache the result."""

        self.decoded_vector = decode_vector(self.vector)

        return self.decoded_vector

    # Function for preparing the run
    # It gets as input a vector that encodes the model and based on this vector it prepares the model for training.
    def prepare_run(self, vector):
        """Build the model, W&B run and Trainer for one encoded ``vector``.

        Stores the trainer in ``self.trainer`` and the number of trainable
        parameters in ``self.num_of_params``.
        """

        decoded_vector = decode_vector(vector)

        layer_norm = self.layer_norm [decoded_vector[0]]
        activation_choice = self.activations[decoded_vector[1]]
        pooling_choice = self.pooling_layer_choices[decoded_vector[2]]
        # the block width multiplier is fixed to 1 during the search
        convnext_mult = 1
        dropout = self.dropout[decoded_vector[3]]
        learning_rate = self.learning_rate[decoded_vector[4]]
        use_residual = self.use_residual[decoded_vector[5]]

        data_augmentation = self.data_augmentation


        print(f"Layer Norm: {layer_norm}")
        print(f"Activation: {activation_choice}")
        print(f"ConvNext Multiplier: {convnext_mult}")
        print(f"Pooling Layer: {pooling_choice}")
        print(f"Dropout: {dropout}")
        print("epochs", self.epochs)
        print("batch_size", self.batch_size)
        print("class_weigths", self.class_weigths)
        print("device", self.device)
        print('data_augmentation', data_augmentation)
        print("use_wandb", self.use_wandb)
        print("learning_rate", learning_rate)
        print("use_residual", use_residual)


        # Use the values this instance was constructed with rather than the
        # notebook globals of the same name (`channels` has no instance
        # counterpart and is still read from the global config).
        config = CustomConvNextConfig(
            input_channels=channels,
            num_classes=self.num_classes,
            device=self.device,
            convnext_mult=convnext_mult,
            layer_norm=layer_norm,
            dropout=dropout,
            pooling_choice=pooling_choice,
            activation_choice=activation_choice,
            image_size=self.image_size,
            use_residual=use_residual,
        )

        model = PretrainedWraper(config, class_weights=self.class_weigths)


        self.num_of_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

        wandb.init(
            project="neural_evolution_variable_learning_variable_residual_is_long",
            name=str(decoded_vector),
            config={
                "layer_norm": layer_norm,
                "activation_choice": activation_choice,
                "convnext_mult": convnext_mult,
                "pooling_choice": pooling_choice,
                "dropout": dropout,
                "epochs": self.epochs,
                "batch_size": self.batch_size,
                "class_weigths": self.class_weigths,
                "device": self.device,
                "data_augmentation": data_augmentation,
                "num_of_params": self.num_of_params,
                "learning_rate": learning_rate,
                "use_residual": use_residual,
            },
        )


        # Training arguments
        training_args = TrainingArguments(
            output_dir="./results",
            num_train_epochs=self.epochs,
            per_device_train_batch_size=self.batch_size,
            per_device_eval_batch_size=self.batch_size,
            evaluation_strategy="epoch",
            logging_dir="./logs",
            logging_steps=10,
            weight_decay=0.01,
            save_strategy="epoch",
            # `None` is treated as "report to all installed integrations";
            # the string "none" is what actually disables reporting.
            report_to="wandb" if self.use_wandb else "none",
            lr_scheduler_type="cosine",
            learning_rate=learning_rate,
        )


        # Trainer
        self.trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=self.train_loader,
            eval_dataset=self.val_loader,
            compute_metrics=compute_metrics,
        )


    def evaluate(self, vector, generation):
        """Train the model encoded by ``vector`` and return its fitness.

        The fitness and ``generation`` are logged to W&B and the run is
        finished before returning.
        """

        self.vector = vector
        self.prepare_run(self.vector)
        self.trainer.train()
        # NOTE(review): the fitness that drives selection is measured on the
        # test split, not the validation split - confirm this is intended.
        self.results = self.trainer.evaluate(self.test_loader)

        if self.f1:
            fitness = self.results['eval_f1']
        else:
            fitness = self.results['eval_accuracy_class_Viral'] + self.results['eval_accuracy_class_Bacterial']

        wandb.log({"fitness": fitness, "generation": generation})
        wandb.finish()
        return fitness



In [None]:

# The code for the evolutionary algorithm, majority of it was taken from the paper https://arxiv.org/abs/2002.02869
class MetaEvolutionaryAlgoritm():
    """Differential-evolution search over encoded architecture vectors.

    One call to ``evolution`` performs: differential mutation, uniform
    crossover, decoding plus de-duplication, fitness evaluation of unseen
    candidates, and selection of the ``number_of_survivors`` fittest
    individuals from parents and candidates together.

    Args:
        scaling_factor: factor F applied to the difference vectors.
        cross_over_probability: per-gene probability of taking the mutant's
            gene instead of the parent's.
        framework: object whose ``evaluate(vector, generation)`` returns the
            fitness of a candidate.
        number_of_survivors: population size kept after selection.
    """
    def __init__(self, scaling_factor = 2, cross_over_probability=0.9, framework = None, number_of_survivors = 5):
        self.number_of_survivors = number_of_survivors
        self.scaling_factor = scaling_factor
        self.cross_over_probability = cross_over_probability
        self.framework = framework
        # helper functions
        self.remove_duplicates = remove_duplicates_order_matters
        self.decode = decode_vector
        self.check = is_list_not_in_nested_list

    def differential_mutation(self):
        """Create three mutants per population slot (stored in ``self.mutants``).

        Implemented after https://arxiv.org/abs/2002.02869:
            y1 = x1 + F * (x2 - x3)
            y2 = x2 + F * (x3 - y1)
            y3 = x3 + F * (y1 - y2)
        The terms are differences (they were sums before), so identical
        parents reproduce themselves. Negative genes are fine because the
        decode step reduces every gene modulo its number of options.
        """
        n_parents = self.x.shape[0]
        self.mutants = np.ones((n_parents * 3, self.x.shape[1]), dtype=int)
        for i in range(0, self.mutants.shape[0], 3):
            # sample 3 distinct random members of the population
            random_index = np.random.choice(n_parents, 3, replace=False)
            x1, x2, x3 = self.x[random_index]
            self.mutants[i] = x1 + self.scaling_factor * (x2 - x3)
            self.mutants[i + 1] = x2 + self.scaling_factor * (x3 - self.mutants[i])
            self.mutants[i + 2] = x3 + self.scaling_factor * (self.mutants[i] - self.mutants[i + 1])

    def uniform_cross_over(self):
        """Mix every mutant with its parent gene by gene.

        The mask holds one 0/1 entry per gene; a 1 (drawn with probability
        ``cross_over_probability``) keeps the mutant's gene, a 0 takes the
        parent's gene. A single mask is drawn per call and shared by all
        candidates. Mutants ``3k .. 3k+2`` are paired with parent ``k``.
        """
        mask = np.random.binomial(n=1, p=self.cross_over_probability, size=(self.x.shape[1]))
        parents = np.repeat(self.x, 3, axis=0)
        self.crossover_population = (mask * self.mutants + (1 - mask) * parents).astype(int)

    def remove_duplicates_order_matters_and_decode(self):
        """Decode every candidate, then drop repeated candidates.

        Decoding first makes vectors that differ only by a multiple of a
        gene's option count collapse to the same row, so they are trained at
        most once.
        """
        self.crossover_population = np.asarray([self.decode(member) for member in self.crossover_population])
        self.crossover_population = self.remove_duplicates(self.crossover_population)

    def evaluate(self):
        """Return one fitness per candidate in ``self.crossover_population``.

        Candidates that are already in the current population or were
        evaluated before are not trained again and receive -1. Newly
        evaluated candidates are appended to ``self.visited``.
        """
        f_evaluated = []
        for x in (self.crossover_population):
            if (self.check(x, self.x) and self.check(x, self.visited)):
                f_evaluated.append(self.framework.evaluate(x,self.current_generation))
                self.visited.append(x)
            else:
                # -1 marks "not evaluated again"; real fitness values are
                # never negative, so these entries sort last.
                f_evaluated.append(-1)
        return f_evaluated

    def evolution(self, x, f, visited, current_generation):
        """Run one generation and return ``(population, fitness, visited)``.

        Args:
            x: current population, one encoded vector per row.
            f: fitness of each member of ``x``.
            visited: list of vectors evaluated so far (extended in place).
            current_generation: generation index forwarded to the framework.
        """
        # Accept plain lists as well (e.g. a population read back from disk).
        self.x = np.asarray(x)
        self.f = np.asarray(f, dtype=float)
        self.visited = visited
        self.current_generation = current_generation
        # print the best member of the population
        print('Best member of the population: ', self.x[0])
        self.differential_mutation()
        self.uniform_cross_over()
        self.remove_duplicates_order_matters_and_decode()
        f_evaluated = self.evaluate()
        # pool parents and candidates, then keep the fittest
        self.x = np.concatenate((self.x, self.crossover_population))
        self.f = np.concatenate((self.f, f_evaluated))
        # sort the population based on the fitness value from highest to lowest
        sorted_index = np.argsort(self.f)[::-1]
        self.x = self.x[sorted_index]
        self.f = self.f[sorted_index]
        # select the best members of the population
        self.x = self.x[:self.number_of_survivors]
        self.f = self.f[:self.number_of_survivors]
        # decode
        self.x = np.asarray([self.decode(member) for member in self.x])
        print('Current population: ', self.x)
        print('Current fitness: ', self.f)
        # sanity check on the descending sort above
        if self.f[0] < self.f[-1]:

            raise ValueError('The algorithm did not converge')

        return self.x, self.f, self.visited






In [None]:
print("Initializing the Nural Evolution Framework")
print("=====================================================================================================")


# Make sure the output folder for the best architectures exists.
if not os.path.exists(folder_for_best_architectures):
    os.makedirs(folder_for_best_architectures)


# Random starting population: one encoded vector per individual.
x = init_population(population_size, vector_of_choices)

# The framework trains and scores a single candidate; the evolutionary
# algorithm proposes candidates and selects the survivors.
framework = MetaNeuralFramework(image_size, num_classes, epochs, batch_size, x, class_weights, device , use_wandb, dataset_path)
meta = MetaEvolutionaryAlgoritm(framework=framework, number_of_survivors=population_size)

# The initial population is not trained here: every member starts with the
# placeholder fitness -1.
f =  np.ones(population_size) * -1
# number of generations
generations = 10
# Vectors that have already been evaluated; they are not trained a second
# time, which saves computation.
visited = []



# Each generation replaces (x, f, visited) with the values returned by
# `evolution`.
for i in tqdm(range(generations)):
    tqdm.write("################Generation################: " + str(i))
    print("=====================================================================================================")
    print("Current Population: ", x)
    print("Current Fitness: ", f)

    x,f,visited  = meta.evolution(x=x, f=f, visited=visited, current_generation=i)
   




    


<h2 style='color:teal'> After the training get the best solutions to a txt file</h2>

In [None]:

print("=====================================================================================================")
print("Best solutions found are:")
print(x)
print("With fitness values:")
print(f)
print(len(visited))

# save the best solutions into a txt file

with open(folder_for_best_architectures  + 'best_solutions.txt', 'w') as filehandle:

    filehandle.write('Best solutions found are:\n')
    for listitem in x:
        # Write each solution as a plain Python list ("[0, 1, 2]"). Formatting
        # the numpy row directly produced "[0 1 2]", which cannot be split on
        # commas when the file is read back.
        filehandle.write('%s\n' % [int(gene) for gene in listitem])
    filehandle.write('With fitness values:\n')
    for listitem in f:
        filehandle.write('%s\n' % float(listitem))



In [None]:
def parse_file(filename):
    """Read solutions and fitness values back from a results text file.

    Expected layout: a line ``Best solutions found are:``, one bracketed
    solution per line, a line ``With fitness values:``, then one float per
    line. Genes may be separated by commas (``[0, 1, 2]``) or by whitespace
    (``[0 1 2]``, the way numpy arrays are printed). Blank lines are skipped.

    Args:
        filename: path of the file to read.

    Returns:
        ``(solutions, fitnesses)``: a list of integer lists and a list of
        floats.

    Raises:
        ValueError: if a header line is missing or a value cannot be parsed.
    """
    solutions = []
    fitnesses = []

    with open(filename, 'r') as file:
        lines = file.readlines()

    # Locate the two section headers
    sol_start_index = lines.index("Best solutions found are:\n") + 1
    fit_start_index = lines.index("With fitness values:\n") + 1

    # Solutions sit between the two headers
    for raw_line in lines[sol_start_index:fit_start_index - 1]:
        line = raw_line.strip()
        if line:
            # treat commas as whitespace so both list and numpy formats parse
            genes = line.strip('[]').replace(',', ' ').split()
            solutions.append([int(gene) for gene in genes])

    # Fitness values follow the second header
    for raw_line in lines[fit_start_index:]:
        line = raw_line.strip()
        if line:
            fitnesses.append(float(line))

    return solutions, fitnesses





# Reload the saved population and fitness values from disk. Note that this
# rebinds `x` and `f` to the plain Python lists returned by parse_file.
x, f = parse_file(folder_for_best_architectures + 'best_solutions.txt')

print("Solutions:", x)
print("Fitnesses:", f)


<h1 style='color:orange'> Full (grid search) training of the final (best) population
 </h1>

In [None]:
api = wandb.Api()

final_epochs = 50
early_stopping = 5
data_augmentation = False
convnext_multiplier_choices = [1, 2, 3]
number_of_repeats = 10
# test_accuracies_array[i][j]: mean test accuracy of individual i with
# multiplier j, averaged over `number_of_repeats` runs.
test_accuracies_array = np.ones((len(x), len(convnext_multiplier_choices))) 
final_population = x


# The flag is attached as an attribute after construction instead of being
# passed as a constructor keyword, so this cell only relies on the
# constructor's required arguments.
framework = MetaNeuralFramework(image_size, num_classes, final_epochs, batch_size, final_population, class_weights, device , use_wandb, dataset_path)
framework.data_augmentation = data_augmentation

for i, individual in enumerate(final_population):
    # These lookups depend only on the individual, so they are done once per
    # individual rather than once per run.
    decoded_vector = decode_vector(individual)
    layer_norm = framework.layer_norm[decoded_vector[0]]
    activation_choice = framework.activations[decoded_vector[1]]
    pooling_choice = framework.pooling_layer_choices[decoded_vector[2]]
    dropout = framework.dropout[decoded_vector[3]]
    learning_rate = framework.learning_rate[decoded_vector[4]]
    use_residual = framework.use_residual[decoded_vector[5]]

    for j, convnext_multiplier in enumerate(convnext_multiplier_choices):
        convnext_mult = convnext_multiplier
        avg = 0
        for k in range(number_of_repeats):

            name = str(decoded_vector) + "_mult_" + str(convnext_multiplier) + "_run_nr_" + str(k)

            # A fresh model is built for every repeat.
            config = CustomConvNextConfig(
                input_channels=channels,
                num_classes=num_classes,
                device=device,
                convnext_mult=convnext_mult,
                layer_norm=layer_norm,
                dropout=dropout,
                pooling_choice=pooling_choice,
                activation_choice=activation_choice,
                image_size=image_size,
                use_residual=use_residual
            )
            model_to_evaluate = PretrainedWraper(config, class_weights=class_weights)
            num_of_params = sum(p.numel() for p in model_to_evaluate.parameters() if p.requires_grad)

            print(f"Layer Norm: {layer_norm}")
            print(f"Activation: {activation_choice}")
            print(f"ConvNext Multiplier: {convnext_mult}")
            print(f"Pooling Layer: {pooling_choice}")
            print(f"Dropout: {dropout}")
            print("epochs", final_epochs)
            print("batch_size", batch_size)
            print("class_weigths", class_weights)
            print("device", device)
            print('data_augmentation', data_augmentation)
            print('num_of_params', num_of_params)
            print("learning_rate", learning_rate)
            print("use_residual", use_residual)

            wandb.init(
                project="neural_evolution_final_best_evaluation_stopping_on_val_loss_10_runs_each_no_data_augmentation",
                name=name,
                config={
                    "layer_norm": layer_norm,
                    "activation_choice": activation_choice,
                    "convnext_mult": convnext_mult,
                    "pooling_choice": pooling_choice,
                    "dropout": dropout,
                    "num_of_params": num_of_params,
                    "epochs": final_epochs,
                    "batch_size": batch_size,
                    "class_weigths": class_weights,
                    "device": device,
                    "data_augmentation": data_augmentation,
                    "learning_rate": learning_rate,
                    "use_residual": use_residual,
                },
            )

            training_args = TrainingArguments(
                output_dir="./results",
                num_train_epochs=final_epochs,
                per_device_train_batch_size=batch_size,
                per_device_eval_batch_size=batch_size,
                evaluation_strategy="epoch",
                logging_dir="./logs",
                logging_steps=10,
                weight_decay=0.01,
                save_strategy="epoch",
                # the best checkpoint (lowest eval loss) is restored at the end
                load_best_model_at_end=True,
                metric_for_best_model="eval_loss",
                # `None` means "all installed integrations"; "none" disables reporting
                report_to="wandb" if use_wandb else "none",
                overwrite_output_dir=True,
                lr_scheduler_type="cosine",
                learning_rate=learning_rate,
                use_mps_device=True if device == "mps" else False,
            )

            trainer = Trainer(
                model=model_to_evaluate,
                args=training_args,
                train_dataset=framework.train_loader,
                eval_dataset=framework.val_loader,
                compute_metrics=compute_metrics,
                callbacks=[
                    EarlyStoppingCallback(early_stopping_patience=early_stopping)
                ],
            )

            print(trainer.model.device)

            trainer.train()

            # metrics from this call are prefixed with "test"
            results = trainer.evaluate(framework.test_loader, metric_key_prefix='test')

            avg += results['test_accuracy']

            wandb.finish()
        test_accuracies_array[i][j] = avg / number_of_repeats



    









In [None]:
import wandb
from collections import defaultdict
import pandas as pd
import numpy as np
import ast

# Initialize the API
api = wandb.Api()
# Experiment settings. In this cell `final_epochs` and `data_augmentation` are passed to
# MetaNeuralFramework below and `convnext_multiplier_choices` sizes the array below;
# `early_stopping` and `number_of_repeats` are not read in this cell.
final_epochs = 50
early_stopping = 5
data_augmentation = False
convnext_multiplier_choices = [1, 2, 3]
number_of_repeats = 10
# NOTE: `x` is not defined in this cell; it must already exist in the kernel.
# The array has one slot per (entry of x, multiplier choice) pair and starts filled with ones.
test_accuracies_array = np.ones((len(x), len(convnext_multiplier_choices))) 
final_population = x

# Define the project path
# (the following cell also passes this name to get_best_architecture_stats)
project_path = "neural_evolution_final_best_evaluation_stopping_on_val_loss_10_runs_each_no_data_augmentation"

# Get all runs from the project
project_runs = api.runs(project_path)

# Dictionary to store test accuracies for each unique architecture-multiplier combination
# key:   "<decoded architecture list>_mult_<multiplier>"
# value: list of (test_accuracy, num_of_params) tuples, one per run
accuracy_dict = defaultdict(list)

# The loop below reads only the framework's choice tables (layer_norm, activations,
# pooling_layer_choices, dropout, learning_rate, use_residual) to decode architecture indices.
# `class_weights` is not defined in this cell; it must already exist in the kernel.
framework = MetaNeuralFramework(image_size, num_classes, final_epochs, batch_size, final_population, class_weights, device , use_wandb, dataset_path, data_augmentation=data_augmentation )

# Iterate over the runs and group (test accuracy, parameter count) by decoded
# architecture and multiplier.
for run in project_runs:
    # Extract the name of the run
    run_name = run.name
    
    # Split the run name to extract the architecture and multiplier
    try:
        # Run names are expected to look like
        # "[architecture]_mult_<multiplier>_run_nr_<number>", so splitting on "_"
        # puts the architecture literal at index 0 and the multiplier at index 2.
        parts = run_name.split('_')
        architecture = parts[0]
        multiplier = parts[2]

        try:
            architecture_to_list = ast.literal_eval(architecture)
        except (ValueError, SyntaxError) as e:
            # literal_eval reports a name that does not start with a valid literal
            # through ValueError/SyntaxError. Skip such a run like any other
            # unparseable name instead of aborting the whole aggregation. This
            # handler is kept narrow so the deliberate ValueError raised below
            # for a missing parameter count still propagates.
            print(f"Error processing run {run_name}: {e}")
            continue

        # Decode each index of the architecture vector through the framework's choice tables.
        layer_norm = framework.layer_norm[architecture_to_list[0]]
        activation_choice = framework.activations[architecture_to_list[1]]
        pooling_choice = framework.pooling_layer_choices[architecture_to_list[2]]
        dropout = framework.dropout[architecture_to_list[3]]
        learning_rate = framework.learning_rate[architecture_to_list[4]]
        use_residual = framework.use_residual[architecture_to_list[5]]

        architecture = [layer_norm, activation_choice, pooling_choice, dropout, learning_rate, use_residual]
        
        # Get the test accuracy from the run's summary
        test_accuracy = run.summary.get('test/accuracy')
        nr_of_params = run.config.get('num_of_params')

        # A run without a logged parameter count is treated as a hard error (not skipped).
        if nr_of_params is None:
            raise ValueError("Number of parameters not found in the run's configuration")
        
        if test_accuracy is not None:
            # Store the test accuracy in the dictionary
            key = f"{architecture}_mult_{multiplier}"
            accuracy_dict[key].append((test_accuracy, nr_of_params))
    except (IndexError, AttributeError) as e:
        print(f"Error processing run {run_name}: {e}")

# Function to calculate mean and standard deviation
def calculate_mean_and_std(accuracies):
    """Return the mean and standard deviation of the first element of each entry.

    Args:
        accuracies: sequence of indexable records; only ``record[0]`` is used.

    Returns:
        ``(mean, std)`` as computed by ``np.mean`` and ``np.std`` with their
        default arguments.
    """
    scores = [record[0] for record in accuracies]
    return np.mean(scores), np.std(scores)

# Data for DataFrame
# Each row appended below is [architecture string, multiplier string, "mean (+/-std)" text, mean parameter count].
data = []

# Calculate the average test accuracy, number of parameters, and standard deviation for each combination
# Note: `architecture` and `multiplier` assigned here overwrite the same-named variables
# left over from the run loop above.
for key, accuracies in accuracy_dict.items():
    if accuracies:
        mean, std = calculate_mean_and_std(accuracies)
        # Keys were built as f"{architecture}_mult_{multiplier}", so this split recovers both parts as strings.
        architecture, multiplier = key.split('_mult_')
        accuracy_with_std = f"{mean:.4f} (+/-{std:.4f})"
        # Second tuple element is the run's num_of_params; it is averaged over the runs of this combination.
        num_params = np.mean([acc[1] for acc in accuracies])
        data.append([architecture, multiplier, accuracy_with_std, num_params])

# Create a DataFrame
df = pd.DataFrame(data, columns=['Architecture', 'Multiplier', 'Accuracy with Std', 'Number of Parameters'])

# Pivot the DataFrame to get the desired matrix format
# (rows: multiplier, columns: architecture, for both value columns)
pivot_df = df.pivot(index='Multiplier', columns='Architecture', values=['Accuracy with Std', 'Number of Parameters'])

# Print the results
# Only the header line is emitted; the table print below is commented out.
print("Average Test Accuracy, Standard Deviation, and Number of Parameters for each Architecture-Multiplier combination:")
#print(pivot_df)

def get_highest_mean_accuracies(accuracy_dict):
    """Find the entry of ``accuracy_dict`` whose accuracies have the highest mean.

    Args:
        accuracy_dict: mapping from a key to a list of indexable records; the
            accuracy is taken from ``record[0]`` of each record.

    Returns:
        ``(best_key, best_mean, best_accuracies)`` where ``best_accuracies`` is
        the list of accuracies extracted for the winning key.

    Raises:
        ValueError: if no key ends up selected (e.g. every list is empty).
    """
    # (key, mean, accuracies) of the best candidate seen so far.
    winner = (None, -np.inf, [])

    for combo_key, records in accuracy_dict.items():
        scores = [record[0] for record in records]
        if not scores:
            continue

        combo_mean = np.mean(scores)
        # Strict comparison: on a tie the earlier entry is kept.
        if combo_mean > winner[1]:
            winner = (combo_key, combo_mean, scores)

    if winner[0] is None:
        raise ValueError("No runs found with valid accuracies")

    return winner


# Get the best run key, mean accuracy, and accuracies
# (selected over every architecture-multiplier combination collected in accuracy_dict)

best_run_key, best_mean_accuracy, best_accuracies = get_highest_mean_accuracies(accuracy_dict)
print(f"Best Architecture-Multiplier combination: {best_run_key}")
print(f"Mean Test Accuracy: {best_mean_accuracy:.4f}")


# Print the results
# The key and mean are printed a second time here, the mean now at full precision,
# followed by the individual accuracies behind it.
print("Best run key:", best_run_key)
print("Highest mean test accuracy:", best_mean_accuracy)
print("Test accuracies that make up the best mean:")
print(best_accuracies)



Average Test Accuracy, Standard Deviation, and Number of Parameters for each Architecture-Multiplier combination:
Best Architecture-Multiplier combination: [True, 'ReLU', 'MaxPool2d', 0.33, 0.0001, True]_mult_2
Mean Test Accuracy: 0.7814
Best run key: [True, 'ReLU', 'MaxPool2d', 0.33, 0.0001, True]_mult_2
Highest mean test accuracy: 0.7813993174061433
Test accuracies that make up the best mean:
[0.7662116040955631, 0.7969283276450512, 0.757679180887372, 0.7986348122866894, 0.7662116040955631, 0.7986348122866894, 0.7662116040955631, 0.7986348122866894, 0.7662116040955631, 0.7986348122866894]


<h2> Gets stats for the best Baseline CNN model </h2>


In [None]:
import wandb
from collections import defaultdict
import pandas as pd
import numpy as np
import ast

# Initialize the API
# Module-level client: get_stats and get_best_architecture_stats below read this global `api`.
api = wandb.Api()

def get_stats(wandb_project_name):
    """Collect the logged test metrics of every run in a W&B project.

    Uses the module-level ``api`` client and prints the number of runs found.

    Args:
        wandb_project_name: project identifier passed to ``api.runs``.

    Returns:
        ``defaultdict(list)`` with one list per metric name, holding one value
        per run: whatever ``run.summary.get(...)`` returns for that metric.
    """
    # Result key -> key under which the metric is stored in the run summary.
    summary_keys = {
        "test_accuracy": "test/accuracy",
        "test_precision": "test/precision",
        "test_recall": "test/recall",
        "test_f1": "test/f1",
        "bacterial_accuracy": "test/accuracy_class_Bacterial",
        "viral_accuracy": "test/accuracy_class_Viral",
        "normal_accuracy": "test/accuracy_class_Normal",
    }

    results = defaultdict(list)
    project_runs = api.runs(wandb_project_name)

    print(f"Number of runs in project {wandb_project_name}: {len(project_runs)}")

    for run in project_runs:
        run_summary = run.summary
        for result_key, summary_key in summary_keys.items():
            results[result_key].append(run_summary.get(summary_key))

    return results

def get_best_architecture_stats(wandb_project_name):
    """Gather per-run test metrics for the best architecture-multiplier combination.

    Runs are grouped by ``"<architecture>_mult_<multiplier>"``, both parsed from
    the run name. The group with the highest mean ``test/accuracy`` is chosen
    with ``get_highest_mean_accuracies`` and the metrics of its runs are returned.

    Uses the module-level ``api`` client. Runs whose name cannot be parsed are
    reported with a printed message and skipped; runs without a
    ``test/accuracy`` summary value are ignored.

    Args:
        wandb_project_name: project identifier passed to ``api.runs``.

    Returns:
        ``defaultdict(list)`` mapping each metric name to a list with one value
        per run of the winning combination.

    Raises:
        ValueError: propagated from ``get_highest_mean_accuracies`` when no run
            yields a usable accuracy.
    """
    # Result key -> key under which the metric is stored in the run summary.
    summary_keys = {
        "test_accuracy": "test/accuracy",
        "test_precision": "test/precision",
        "test_recall": "test/recall",
        "test_f1": "test/f1",
        "bacterial_accuracy": "test/accuracy_class_Bacterial",
        "viral_accuracy": "test/accuracy_class_Viral",
        "normal_accuracy": "test/accuracy_class_Normal",
    }

    project_runs = api.runs(wandb_project_name)
    accuracy_dict = defaultdict(list)

    for run in project_runs:
        run_name = run.name
        try:
            # Expected name format: "[architecture]_mult_<multiplier>_run_nr_<number>".
            parts = run_name.split('_')
            architecture = parts[0]
            multiplier = parts[2]

            # Validation only: the parsed value is not needed, but a name whose
            # first part is not a valid literal must not be grouped.
            ast.literal_eval(architecture)

            test_accuracy = run.summary.get("test/accuracy")
            if test_accuracy is not None:
                key = f"{architecture}_mult_{multiplier}"
                accuracy_dict[key].append((test_accuracy, run.summary, run.config.get('num_of_params')))

        # literal_eval signals a malformed literal with ValueError/SyntaxError;
        # without catching them a single stray run would abort the aggregation.
        except (IndexError, AttributeError, ValueError, SyntaxError) as e:
            print(f"Error processing run {run_name}: {e}")

    best_run_key, _, _ = get_highest_mean_accuracies(accuracy_dict)

    best_architecture_stats = defaultdict(list)
    for _, summary, _ in accuracy_dict[best_run_key]:
        for stat_name, summary_key in summary_keys.items():
            best_architecture_stats[stat_name].append(summary.get(summary_key))

    return best_architecture_stats

def make_table_with_mean_and_sd(results_dict):
    """Summarise each metric as a ``"<mean> ± <std>"`` string.

    Args:
        results_dict: mapping of metric name to a list of per-run values; it is
            turned into a DataFrame with one column per metric.

    Returns:
        DataFrame indexed by metric name with the single column
        ``"mean ± standard_deviation"``. Mean and standard deviation come from
        ``DataFrame.mean()`` and ``DataFrame.std()`` with default arguments.
    """
    column_label = "mean ± standard_deviation"
    metrics = pd.DataFrame(results_dict)

    # Column-wise statistics, rendered as text so they can be concatenated.
    mean_text = metrics.mean().astype(str)
    std_text = metrics.std().astype(str)

    return mean_text.add(" ± ").add(std_text).to_frame(name=column_label)

# Get stats for the best architecture
# `project_path` is not defined in this cell; it is assigned in the previous cell.
best_architecture_stats = get_best_architecture_stats(project_path)
best_architecture_table = make_table_with_mean_and_sd(best_architecture_stats)

# Print the results
print("Best Architecture Stats:")
print(best_architecture_table)

# Get the stats of the single best run of that architecture (highest test accuracy).

best_result = max(best_architecture_stats["test_accuracy"])

# list.index returns the first position holding the maximum, so on ties the earliest run is reported.
best_result_index = best_architecture_stats["test_accuracy"].index(best_result)
print("Best result: ", best_result)
print("Best result index: ", best_result_index)
# Printed order: accuracy, precision, recall, f1, bacterial accuracy, viral accuracy, normal accuracy.
print("Best result stats: ", best_architecture_stats["test_accuracy"][best_result_index], best_architecture_stats["test_precision"][best_result_index], best_architecture_stats["test_recall"][best_result_index], best_architecture_stats["test_f1"][best_result_index], best_architecture_stats["bacterial_accuracy"][best_result_index], best_architecture_stats["viral_accuracy"][best_result_index], best_architecture_stats["normal_accuracy"][best_result_index])




Best Architecture Stats:
                                    mean ± standard_deviation
test_accuracy       0.7813993174061433 ± 0.017996020279174324
test_precision      0.8068541158339165 ± 0.002118162060278354
test_recall         0.7813993174061433 ± 0.017996020279174324
test_f1              0.7816511835497144 ± 0.01729369296575994
bacterial_accuracy  0.918348623853211 ± 0.0042153054287737855
viral_accuracy       0.7552238805970151 ± 0.05058994554780559
normal_accuracy      0.6688034188034189 ± 0.07432703902105167
Best result:  0.7986348122866894
Best result index:  3
Best result stats:  0.7986348122866894 0.8061901546653281 0.7986348122866894 0.7982339482035974 0.9174311926605504 0.7089552238805971 0.7393162393162394
