In [1]:
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms


# 1. Initialize dataset

We define the required transformations, initialize the dataset objects, and create the training and evaluation data loaders.

In [2]:
# --- Run configuration -------------------------------------------------------
batch_size = 64          # training mini-batch size
test_batch_size = 1000   # evaluation mini-batch size
seed = 1                 # applied below to every RNG the notebook draws from

# The settings below are not referenced by the cells shown in this notebook;
# they are kept so that any later cell relying on them keeps working.
epochs = 14
lr = 1.0
gamma = 0.7
log_interval = 10
save_model = False

# Seed Python, NumPy and PyTorch so that genome generation, mutation and weight
# initialisation are repeatable after "Restart Kernel -> Run All".
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Use Apple's Metal backend when it is available, otherwise fall back to the CPU.
use_mps = torch.backends.mps.is_available()
device = torch.device("mps" if use_mps else "cpu")

train_kwargs = {'batch_size': batch_size}
test_kwargs = {'batch_size': test_batch_size}

In [3]:


# Convert images to tensors, then standardise the single channel
# (constants are presumably the MNIST mean / std -- confirm if the data changes).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

# dataset1 is the training split (downloaded on demand), dataset2 the test split.
dataset1, dataset2 = (
    datasets.MNIST('../data', train=is_train, download=is_train, transform=transform)
    for is_train in (True, False)
)

train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

In [4]:
# Sanity check: shape of the first transformed training image.
dataset1[0][0].size()

torch.Size([1, 28, 28])

# 2. Define the model creation function

In [5]:
class ConstructNet(nn.Module):
    """Image classifier assembled from a genome ("DNA").

    ``DNA`` is a sequence of ``[kind, value]`` genes:

    * ``"C"`` -- 3x3 convolution with ``2 * value`` output channels, then ReLU
    * ``"D"`` -- dense layer with ``value`` units, then ReLU
    * ``"R"`` -- dropout with probability ``value``

    The first gene is always built as a convolution over a single input
    channel, whatever its kind. After the genome layers the network flattens
    and adds ``Linear -> ReLU -> Linear -> Softmax`` producing ``output_size``
    values per sample.

    NOTE(review): the network ends in ``Softmax``. ``nn.CrossEntropyLoss``
    (which the driver cell passes as ``loss_fn``) expects unnormalised logits,
    so the two together apply softmax twice -- confirm this is intended.

    Args:
        DNA: genome describing the hidden layers (must be non-empty).
        loss_fn: callable ``loss_fn(output, target)`` returning a scalar tensor.
        input_size: height/width of the square, single-channel input image.
        output_size: number of classes.
    """

    def __init__(self, DNA, loss_fn, input_size, output_size):
        super(ConstructNet, self).__init__()
        self.DNA = DNA
        self.input_size = input_size
        self.output_size = output_size
        self.layers = []
        self.loss_fn = loss_fn

        # First layer: convolution over the single input channel.
        channels = 2 * self.DNA[0][1]
        self.layers.append(nn.Conv2d(1, channels, kernel_size=3, stride=1, padding=1))
        self.layers.append(nn.ReLU())

        for gene in self.DNA[1:]:
            kind, value = gene[0], gene[1]
            if kind == "C":
                # Chain from the channel count actually produced so far, not from
                # the previous gene's value (which may be a dropout rate).
                out_channels = 2 * value
                self.layers.append(nn.Conv2d(channels, out_channels, kernel_size=3, stride=1, padding=1))
                self.layers.append(nn.ReLU())
                channels = out_channels
            elif kind == "D":
                # The input size is the output of the last dense layer.
                # NOTE(review): this is None when no Linear layer precedes the
                # gene, in which case nn.Linear rejects it.
                dense_in = self.last_layer_output_size()
                self.layers.append(nn.Linear(dense_in, value))
                self.layers.append(nn.ReLU())
            elif kind == "R":
                self.layers.append(nn.Dropout(value))

        # Output head: flatten, then two dense layers.
        self.layers.append(nn.Flatten())

        flat_shape = self.cnn_output_size()
        self.layers.append(nn.Linear(flat_shape[1], self.output_size*10))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.Linear(self.output_size*10, self.output_size))

        # nn.Sequential registers the layers as sub-modules (self.layers is a plain list).
        self.net = nn.Sequential(*self.layers, nn.Softmax(dim=1))

    def cnn_output_size(self):
        """Return the output shape of the layers built so far for a dummy batch of 64."""
        # Build the probe where the weights live so this also works after .to(device).
        probe = torch.ones(64, 1, self.input_size, self.input_size,
                           device=self.layers[0].weight.device)
        # Only the shape is needed; do not record an autograd graph.
        with torch.no_grad():
            for layer in self.layers:
                probe = layer(probe)
        return probe.shape

    def forward(self, x):
        """Run ``x`` through the full network, including the final Softmax."""
        return self.net(x)

    def last_layer_output_size(self):
        """Return ``out_features`` of the most recently added Linear layer, or None."""
        for layer in reversed(self.layers):
            if isinstance(layer, nn.Linear):
                return layer.out_features
        return None

    def train_net(self, device, train_loader, optimizer, epoch, log_interval, print_stats):
        """Train for one pass over ``train_loader``.

        ``epoch`` is only used as a label in the progress message; progress is
        printed every ``log_interval`` batches when ``print_stats`` is true.
        """
        self.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = self(data)
            loss = self.loss_fn(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % log_interval == 0 and print_stats:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                           100. * batch_idx / len(train_loader), loss.item()))

    def test(self, device, test_loader, print_stats=False):
        """Evaluate on ``test_loader``.

        Returns:
            ``(test_loss, accuracy)`` -- the average loss per sample and the
            fraction of correctly classified samples.
        """
        self.eval()
        # A mean-reduced loss has to be weighted by the batch size before it is
        # summed; otherwise dividing by the dataset size under-reports the loss
        # and the result depends on the batch size. A sum-reduced loss is
        # already a per-batch total.
        sum_reduced = getattr(self.loss_fn, 'reduction', 'mean') == 'sum'
        total_loss = 0.0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = self(data)
                batch_loss = self.loss_fn(output, target).item()
                total_loss += batch_loss if sum_reduced else batch_loss * len(data)
                pred = output.argmax(dim=1, keepdim=True)  # index of the highest score
                correct += pred.eq(target.view_as(pred)).sum().item()
        sample_count = len(test_loader.dataset)
        test_loss = total_loss / sample_count
        accuracy = correct / sample_count
        if print_stats:
            print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
                test_loss, correct, len(test_loader.dataset),
                100. * accuracy))
        return test_loss, accuracy

    def count_parameters(self):
        """Return the number of trainable parameters in the network."""
        # https://discuss.pytorch.org/t/how-do-i-check-the-number-of-parameters-of-a-model/4325/7
        return sum(p.numel() for p in self.net.parameters() if p.requires_grad)

# 3. Define the genetic algorithm (GA) class

In [6]:

import random
import numpy as np
class GA():
    """Genetic algorithm that evolves ``ConstructNet`` genomes.

    Each generation: build a population, train every model, evaluate it, save
    it to disk, rank the population by fitness and print summary statistics.
    After ``calculate_fitness`` the population is sorted by ascending fitness,
    so the best individuals sit at the END of the list.

    Args:
        generation_length: number of generations run by ``auto_evolve``.
        population_size: number of individuals per generation.
        initial_size: upper bound for the random layer size of initial genomes.
        initial_depth: upper bound for the random depth of initial genomes.
        lossFn: loss callable handed to every ``ConstructNet``.
        input_size, output_size: forwarded to ``ConstructNet``.
        device: device the models are moved to and trained on.
        train_dataloader, test_dataloader: data for training / evaluation.
        optimizerFn: optimizer name; only ``"Adam"`` is supported.
        epoch: passed to ``train_net`` as its ``epoch`` argument.
            NOTE(review): ``train_net`` is called once per individual and uses
            this value only as a log label -- confirm one pass is intended.
    """

    def __init__(self, generation_length, population_size, initial_size, initial_depth, 
                 lossFn, input_size, output_size, device, train_dataloader, test_dataloader, optimizerFn, epoch):
        self.generation_length = generation_length
        self.population_size = population_size
        self.initial = True  # No previous individuals can be used for mutations
        self.initial_size = initial_size
        self.initial_depth = initial_depth

        self.lossFn = lossFn
        self.optimizerFn = optimizerFn
        self.input_size = input_size
        self.output_size = output_size

        self.device = device
        self.train_dataloader = train_dataloader
        self.test_dataloader = test_dataloader
        self.epoch = epoch
        self.population = []

    def auto_evolve(self):
        """Run ``generation_length`` generations, printing one raw statistics line each."""
        for i in range(self.generation_length):
            self.generate_population()
            self.train_population()
            self.evaluate_population()
            self.save_population(gen_nm=i)
            self.calculate_fitness()
            self.print_statistics(gen_nm=i, raw=True)

    def save_population(self, gen_nm):
        """Save every model of the current population under ``models/gen_<gen_nm>/``."""
        gen_dir = f"models/gen_{gen_nm}"
        # Creates the parent "models" directory too and tolerates re-runs.
        os.makedirs(gen_dir, exist_ok=True)
        for index, individual in enumerate(self.population):
            acc = individual['accuracy']
            params = individual['parameters']
            torch.save(individual['model'], f"{gen_dir}/ind_{index}_{params}_{acc}.pt")

    def print_statistics(self, gen_nm, raw=False):
        """Print min / mean / max of fitness, parameter count and accuracy.

        With ``raw=True`` the values are printed as one comma-separated line
        prefixed by the generation number.
        """
        fitness_list = [item['fitness'] for item in self.population]
        parameters_list = [item['parameters'] for item in self.population]
        accuracy_list = [item['accuracy'] for item in self.population]
        if raw:
            print(f"{gen_nm}, {np.min(fitness_list)}, {np.min(parameters_list)}, {np.min(accuracy_list)}, {np.mean(fitness_list)}, {np.mean(parameters_list)}, {np.mean(accuracy_list)}, {np.max(fitness_list)}, {np.max(parameters_list)}, {np.max(accuracy_list)}")
        else:
            print(f"Average fitness: {np.mean(fitness_list)}, Average parameters: {np.mean(parameters_list)}, Average accuracy: {np.mean(accuracy_list)}, Min fitness: {np.min(fitness_list)}, Min parameters: {np.min(parameters_list)}, Min accuracy: {np.min(accuracy_list)}, Max fitness: {np.max(fitness_list)}, Max parameters: {np.max(parameters_list)}, Max accuracy: {np.max(accuracy_list)}")

    def generate_population(self):
        """Build the next generation and create a fresh model for every individual.

        The first call creates ``population_size`` random genomes. Later calls
        drop the first third of the (fitness-sorted) population and refill it
        with mutated copies of the best survivors. Every individual, survivor
        or new, gets a newly initialised model.
        """
        # Keep only genome and model; metrics of the previous generation are dropped.
        self.population = [
            {'genome': item['genome'], 'model': item['model']}
            for item in self.population
        ]

        if self.initial:
            missing = self.population_size
        else:
            # Delete 33% of the population (the lowest-fitness individuals come first).
            del self.population[:int(np.floor(self.population_size/3))]
            missing = self.population_size - len(self.population)

        # Breed all offspring BEFORE appending any of them: generate_individual
        # picks parents from the tail of self.population, which must still hold
        # the best survivors rather than freshly created mutants.
        offspring = [{'genome': self.generate_individual()} for _ in range(missing)]
        self.population.extend(offspring)

        # Create models for each individual
        for individual in self.population:
            model = ConstructNet(individual['genome'], self.lossFn, self.input_size, self.output_size)
            model.to(self.device)
            individual['model'] = model
        self.initial = False

    def generate_individual(self):
        """Return a new genome.

        While ``self.initial`` is true the genome is random; afterwards it is
        a mutated copy of one of the (up to) five last individuals of the
        population, which are the best ones once the population is sorted.
        """
        if self.initial:
            genome = []
            for chromosome in range(random.randint(1, self.initial_depth)):
                layer_size = random.randint(1, self.initial_size)
                genome.append(["C", layer_size])
            return genome

        # Clamp the window so populations smaller than five still work.
        last_index = len(self.population) - 1
        first_index = max(0, len(self.population) - 5)
        parent = self.population[random.randint(first_index, last_index)]
        # Copy every gene, not just the outer list, so the mutation cannot
        # rewrite the parent's genome.
        child_genome = [list(gene) for gene in parent['genome']]
        return self.mutate_genome(child_genome)

    def mutate_genome(self, genome):
        """Mutate ``genome`` IN PLACE and return it.

        Four independent mutations, each applied with probability 0.15:
        insert a layer one unit larger than a random layer, insert a layer one
        unit smaller (at least 1), grow/shrink a random layer by one, and
        remove a random layer. The genome never becomes empty and layer sizes
        never drop below 1.
        """
        # example genome: [["C",16],["C",8],["C",4]]
        # 1. Add new larger layer
        if random.random() < 0.15:
            layer_index = random.randint(0, len(genome)-1)
            new_layer_size = genome[layer_index][1] + 1
            genome.insert(layer_index, ["C", new_layer_size])
        # 2. Add new smaller layer
        if random.random() < 0.15:
            layer_index = random.randint(0, len(genome)-1)
            new_layer_size = max(genome[layer_index][1] - 1, 1)
            genome.insert(layer_index, ["C", new_layer_size])
        # 3. Change layer size by growing or shrinking
        if random.random() < 0.15:
            layer_index = random.randint(0, len(genome)-1)
            direction = random.choice([-1, 1])
            new_layer_size = genome[layer_index][1] + direction
            if new_layer_size >= 1:
                genome[layer_index][1] = new_layer_size
            elif len(genome) > 1:
                # A layer shrunk to zero is removed, unless it is the only one.
                del genome[layer_index]
        # 4. Remove layer
        if random.random() < 0.15 and len(genome) > 1:
            layer_index = random.randint(0, len(genome)-1)
            del genome[layer_index]
        return genome

    def print_population(self, top_n=False):
        """Print the last ``top_n`` individuals (all of them when ``top_n`` is falsy)."""
        if not top_n:
            top_n = len(self.population)
        for index, individual in enumerate(self.population[-top_n:]):
            print(f"Individual #{index}: {individual}")

    def train_population(self):
        """Train every individual's model for one pass with Adam (lr=0.01).

        Raises:
            ValueError: if ``optimizerFn`` is not ``"Adam"``.
        """
        for individual in self.population:
            if self.optimizerFn != "Adam":
                raise ValueError(f"Unsupported optimizerFn: {self.optimizerFn!r} (only 'Adam' is implemented)")
            optimizer = torch.optim.Adam(individual['model'].parameters(), lr=0.01)
            individual['model'].train_net(self.device, self.train_dataloader, optimizer, self.epoch, 100, False)

    def evaluate_population(self):
        """Evaluate every model and store id, loss, accuracy and parameter count."""
        for index, individual in enumerate(self.population):
            loss, accuracy = individual['model'].test(self.device, self.test_dataloader)
            individual['id'] = index
            individual['loss'] = loss
            individual['accuracy'] = accuracy
            individual['parameters'] = individual['model'].count_parameters()

    def calculate_fitness(self):
        """Assign a fitness to every individual and sort the population by it.

        Individuals are ranked by ``(accuracy, parameters)`` ascending and get
        ``fitness = rank * 10 * accuracy``. The parameter count therefore only
        acts as a tie-breaker between equal accuracies. The population ends up
        sorted by ascending fitness (best last).
        """
        ranking = sorted(
            range(len(self.population)),
            key=lambda i: (self.population[i]['accuracy'], self.population[i]['parameters']),
        )
        for rank, position in enumerate(ranking):
            individual = self.population[position]
            individual['fitness'] = rank * 10 * individual['accuracy']
        # Sort population
        self.population = sorted(self.population, key=lambda d: d['fitness'])
        #print(self.population)

In [7]:
# Evolve 100 generations of 30 networks on the 28x28 input, 10-class task
# set up by the loaders above.
generation = GA(
    generation_length=100,
    population_size=30,
    initial_size=20,
    initial_depth=5,
    lossFn=nn.CrossEntropyLoss(),
    input_size=28,
    output_size=10,
    device=device,
    train_dataloader=train_loader,
    test_dataloader=test_loader,
    optimizerFn="Adam",
    epoch=15,
)
generation.auto_evolve()

0, 0.0, 315404, 0.0958, 40.1999, 1670996.7333333334, 0.19628000000000004, 214.832, 3150132, 0.7408
1, 0.0, 315348, 0.0958, 52.38473333333334, 1287945.5333333334, 0.25392333333333333, 233.856, 3150132, 0.8064
2, 0.0, 315348, 0.0959, 62.6222, 766954.9333333333, 0.30787, 235.27700000000002, 3150132, 0.8113
3, 0.0, 314750, 0.0915, 77.23963333333333, 482818.93333333335, 0.3909633333333333, 233.04399999999998, 1730508, 0.8036
4, 0.0, 157930, 0.098, 80.7477, 404007.93333333335, 0.41276999999999997, 263.465, 944808, 0.9085
5, 0.0, 157930, 0.098, 108.21976666666667, 346302.5333333333, 0.6385966666666665, 270.715, 942630, 0.9335
6, 0.0, 314750, 0.101, 99.57373333333334, 372377.2, 0.5808666666666666, 235.799, 942630, 0.8131
7, 0.0, 314750, 0.1011, 97.29889999999999, 346274.86666666664, 0.56108, 241.106, 471900, 0.8314
8, 0.0, 314750, 0.098, 99.11146666666667, 351473.2, 0.5580166666666667, 272.281, 471900, 0.9389
9, 0.0, 314750, 0.1135, 105.94506666666666, 346232.6, 0.5970133333333335, 265.495, 47