<a href="https://colab.research.google.com/github/smiliyo55/psoCNN_Brain_Tumor/blob/main/psoCNN1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install import_ipynb
!pip install torchsummary
import import_ipynb

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchsummary import summary

import numpy as np
import json
from copy import deepcopy

import utils1
import particle1
from population1 import Population
from torchvision.datasets import ImageFolder

In [None]:
class psoCNN:
  """Particle-swarm search over CNN architectures for an ImageFolder dataset.

  A ``Population`` of particles (candidate architectures) is created and each
  particle is trained briefly. The particle with the highest *training*
  accuracy is kept as ``self.gBest``; ``fit`` then iterates velocity/update
  steps and ``fit_gBest`` trains the final best architecture further.

  Attributes set by the constructor:
    gBest_acc / gBest_test_acc: per-iteration train / validation accuracy of gBest.
    ctr: number of swarm iterations run by ``fit`` (added to the epoch budget).
    ctr1: number of iterations in which gBest improved (added to ``epochs1``
      in ``fit_gBest``).
    redd_count: number of particles skipped because an identical one existed.
    output_str: textual log of every architecture, written by ``fit_gBest``.
  """

  def __init__(self, dataset, n_iter, pop_size, batch_size, epochs, min_layer, max_layer,
               conv_prob, pool_prob, fc_prob, max_conv_kernel, max_out_ch, max_fc_neurons, dropout_rate,
               data_root="/content/drive/MyDrive/Colab Notebooks/psoCNN2"):
    """Load the data, build the swarm and evaluate every initial particle.

    Args:
      dataset: currently ignored, see the NOTE below.
      n_iter: number of swarm iterations (length of the accuracy histories).
      pop_size: number of particles in the population.
      batch_size: batch size for the data loaders and ``model_fit``.
      epochs: training epochs per particle evaluation.
      min_layer, max_layer, conv_prob, pool_prob, fc_prob, max_conv_kernel,
      max_out_ch, max_fc_neurons: forwarded unchanged to ``Population``.
      dropout_rate: forwarded to each particle's ``model_compile``.
      data_root: directory containing ``Training/`` and ``Testing/`` image
        folders; results are written under ``<data_root>/results/``.

    Raises:
      ValueError: if ``n_iter`` or ``pop_size`` is smaller than 1.
    """
    # Fail fast: both histories are indexed at [0] and particle[0] is required.
    if n_iter < 1 or pop_size < 1:
        raise ValueError("n_iter and pop_size must both be at least 1")

    self.pop_size = pop_size
    self.n_iter = n_iter
    self.epochs = epochs

    self.batch_size = batch_size
    self.gBest_acc = np.zeros(n_iter)
    self.gBest_test_acc = np.zeros(n_iter)

    self.ctr = 0
    self.ctr1 = 0
    self.redd_count = 0

    # NOTE(review): the caller-supplied `dataset` is overwritten here, as in the
    # original code; it only influences the results directory name.
    dataset = "Brain Tumor MRI Dataset"
    input_width = 224
    input_height = 224
    input_channels = 3
    output_dim = 4

    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    # Training pipeline: light augmentation.
    train_transform = transforms.Compose([
        transforms.Resize((input_height, input_width)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        normalize,
    ])

    # Evaluation pipeline: no random augmentation, so validation metrics are
    # deterministic and measured on the unmodified images.
    test_transform = transforms.Compose([
        transforms.Resize((input_height, input_width)),
        transforms.ToTensor(),
        normalize,
    ])

    # Load the image-folder dataset (one sub-directory per class).
    trainset = ImageFolder(data_root + '/Training', transform=train_transform)
    testset = ImageFolder(data_root + '/Testing', transform=test_transform)

    # Create data loaders
    self.trainloader = torch.utils.data.DataLoader(trainset, batch_size=self.batch_size, shuffle=True)
    self.testloader = torch.utils.data.DataLoader(testset, batch_size=self.batch_size, shuffle=False)

    self.results_path = data_root + "/results/" + dataset + "/"

    self.output_str = "Initializing population..." + "\n"

    print("Initializing population...")
    self.population = Population(pop_size, min_layer, max_layer, input_width, input_height, input_channels,
                                 conv_prob, pool_prob, fc_prob, max_conv_kernel, max_out_ch, max_fc_neurons, output_dim)
    swarm = self.population.particle

    # --- Particle 0 is evaluated first and becomes the initial gBest. ---
    print("Verifying accuracy of the current gBest..." + "\n")
    first = swarm[0]
    self._log_particle("Particle - " + str(1), first)

    first.model_compile(dropout_rate)
    print(first)
    print(first.layers)
    print()

    hist = first.model_fit(self.trainloader, self.testloader, batch_size=batch_size, epochs=epochs)

    # Record the accuracy before copying so gBest and pBest carry it too.
    first.acc = hist['train accuracy'][-1]

    self.gBest = deepcopy(first)  # gBest keeps its trained model
    first.pBest = deepcopy(first)
    first.pBest.model_delete()
    first.pBest.acc = first.acc

    self.gBest_acc[0] = hist['train accuracy'][-1]
    self.gBest_test_acc[0] = hist['val accuracy'][-1]

    first.model_delete()

    print("Current gBest acc: " + str(self.gBest_acc[0]) + "\n")
    print("Current gBest test acc: " + str(self.gBest_test_acc[0]) + "\n")

    # --- Remaining particles: train each and keep the best as gBest. ---
    print("Looking for a new gBest in the population...")
    for i in range(1, self.pop_size):
        candidate = swarm[i]
        print('Initialization - Particle: ' + str(i+1))
        self._log_particle("Particle - " + str(i+1), candidate)
        print(candidate)
        print(candidate.layers)

        # An architecture identical to an earlier particle is not retrained;
        # it inherits that particle's accuracy.
        twin = self._find_duplicate(i, i)
        if twin is not None:
            self.redd_count += 1
            print('Particule ' + str(i+1) + ' redondante')
            candidate.acc = swarm[twin].acc
            candidate.pBest.acc = candidate.acc
            continue

        candidate.model_compile(dropout_rate)
        hist = candidate.model_fit(self.trainloader, self.testloader, batch_size=batch_size, epochs=epochs)

        candidate.acc = hist['train accuracy'][-1]
        candidate.pBest.acc = hist['train accuracy'][-1]

        if candidate.pBest.acc > self.gBest_acc[0]:
            print("Found a new gBest.")
            self.gBest.model_delete()
            self.gBest = deepcopy(candidate)
            self.gBest_acc[0] = candidate.pBest.acc
            print("New gBest acc: " + str(self.gBest_acc[0]))
            self.gBest_test_acc[0] = hist['val accuracy'][-1]
            print("New gBest test acc: " + str(self.gBest_test_acc[0]))

        candidate.model_delete()

  def _log_particle(self, prefix, particle):
    """Append a particle's description and its JSON-encoded layers to the log."""
    self.output_str = self.output_str + prefix + " architecture : " + str(particle) + "\n"
    self.output_str = self.output_str + json.dumps(particle.layers) + "\n"

  def _find_duplicate(self, index, limit):
    """Return the lowest k < limit (k != index) whose particle equals particle[index], else None."""
    swarm = self.population.particle
    for k in range(limit):
        if k != index and swarm[k] == swarm[index]:
            return k
    return None

  def _adopt_duplicate(self, j, k):
    """Give particle j the accuracy of its identical twin k and refresh j's pBest if better."""
    swarm = self.population.particle
    self.redd_count += 1
    print('Particule ' + str(j+1) + ' redondante')

    swarm[j].acc = swarm[k].acc

    f_test = swarm[j].acc
    pBest_acc = swarm[j].pBest.acc

    if f_test > pBest_acc:
        print("Found a new pBest.")
        print("Current acc: " + str(f_test))
        print("Past pBest acc: " + str(pBest_acc))
        swarm[j].pBest = deepcopy(swarm[k])

  def fit(self, Cg, dropout_rate):
    """Run the swarm iterations 1 .. n_iter-1, updating pBest/gBest as particles improve.

    Args:
      Cg: forwarded to each particle's ``velocity`` together with gBest's layers.
      dropout_rate: forwarded to each particle's ``model_compile``.

    Side effects: fills ``gBest_acc[i]`` / ``gBest_test_acc[i]`` for every
    iteration, increments ``ctr`` once per iteration and ``ctr1`` once per
    iteration in which gBest improved.
    """
    swarm = self.population.particle

    for i in range(1, self.n_iter):
        gBest_acc = self.gBest_acc[i-1]
        gBest_test_acc = self.gBest_test_acc[i-1]

        self.ctr += 1

        self.output_str = self.output_str + 'Iteration: ' + str(i) + "\n"
        incremented = False  # has ctr1 already been bumped in this iteration?

        for j in range(self.pop_size):
            current = swarm[j]
            print('Iteration: ' + str(i) + ' - Particle: ' + str(j+1))

            # Update particle velocity
            current.velocity(self.gBest.layers, Cg)

            # Update particle architecture
            current.update()

            print('Particle NEW architecture: ')
            self._log_particle('Iteration: ' + str(i) + "Particle - " + str(j+1), current)

            print(current)
            print(current.layers)

            # Check for an identical architecture BEFORE building a model (as
            # __init__ does); otherwise a model would be compiled and never
            # released for every redundant particle.
            twin = self._find_duplicate(j, self.pop_size)
            if twin is not None:
                self._adopt_duplicate(j, twin)
                continue

            current.model_compile(dropout_rate)

            # Compute the acc in the updated particle; the epoch budget grows
            # by one with every swarm iteration (self.ctr).
            hist = current.model_fit(self.trainloader, self.testloader,
                                     batch_size=self.batch_size, epochs=self.epochs + self.ctr)

            current.acc = hist['train accuracy'][-1]

            f_test = current.acc
            pBest_acc = current.pBest.acc

            if f_test > pBest_acc:
                print("Found a new pBest.")
                print("Current acc: " + str(f_test))
                print("Past pBest acc: " + str(pBest_acc))
                pBest_acc = f_test
                current.pBest = deepcopy(current)
                current.pBest.model_delete()

                if pBest_acc > gBest_acc:
                    print("Found a new gBest.")
                    self.gBest.model_delete()
                    gBest_acc = pBest_acc
                    self.gBest = deepcopy(current)
                    gBest_test_acc = hist['val accuracy'][-1]
                    if not incremented:
                        # Count each improving iteration only once.
                        self.ctr1 += 1
                        incremented = True

            current.model_delete()

        self.gBest_acc[i] = gBest_acc
        self.gBest_test_acc[i] = gBest_test_acc

        print("Current gBest acc: " + str(self.gBest_acc[i]))
        print("Current gBest test acc: " + str(self.gBest_test_acc[i]))

  def fit_gBest(self, batch_size, epochs1, epochs, dropout_rate):
    """Write the swarm log to disk, then train the gBest model further.

    Args:
      batch_size: forwarded to gBest's ``model_fit_complete``.
      epochs1: forwarded as ``epochs1`` after adding ``self.ctr1``.
      epochs: forwarded as ``epochs``.
      dropout_rate: unused; gBest keeps the model it was trained with.

    Returns:
      (trainable_count, metrics): the parameter count reported by
      ``count_parameters`` and the history returned by ``model_fit_complete``.
    """
    import os  # local import: only needed to create the results directory

    print("\nFurther training gBest model...")
    self.output_str = self.output_str + "gBest-architecture : " + str(self.gBest) + "\n"
    self.output_str = self.output_str + json.dumps(self.gBest.layers) + "\n"

    # The results directory may not exist yet; open() would fail without it.
    os.makedirs(self.results_path, exist_ok=True)
    with open(self.results_path + "swarms.txt", "w", encoding="utf-8") as f:
        print(self.output_str, file=f)

    print(self.gBest)
    print(self.gBest.layers)
    self.gBest.model_summary()

    # NOTE(review): count_parameters is not defined or imported in the visible
    # part of this notebook; it must be available as a global when this runs.
    trainable_count = count_parameters(self.gBest.model)
    print("gBest's number of trainable parameters: " + str(trainable_count))

    metrics = self.gBest.model_fit_complete(self.trainloader, self.testloader, batch_size=batch_size,
                                            epochs1=epochs1 + self.ctr1, epochs=epochs)

    print("\ngBest model loss in the test set: " + str(metrics['val loss'][-1]) + " - Test set accuracy: " + str(metrics['val accuracy'][-1]))

    self.gBest.model_delete()

    return trainable_count, metrics