# Mar 14 Meeting

- File1: All kinds of components of a CNN, e.g. residual connections, batch norm.
- File2:

# Genetic CNN

- We describe a way of representing the network structure by a fixed-length binary string.
- Several genetic operations are defined, including selection, mutation and crossover, so that we can traverse the search space efficiently and find high-quality solutions.
- ReLU and batch normalization are added after each convolution.

In [57]:
from torchsummary import summary
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
import torch
import torch.nn as nn
import torchvision
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.nn import functional as F
# from gpu import get_gpu_status
from torchsummary import summary
import gc
from PIL import Image
import glob

import os
import copy
import time
import random
from functools import partial
import itertools
import torch
from torchvision import datasets, transforms
import numpy as np
import copy
import random
import random
from torch.utils.data import Subset
import numpy as np

In [58]:

# from ops import Identity, Sep_Conv, Conv, Stacked_conv, Pooling, Dil_Conv, Op

class Block(torch.nn.Module):
    """
    A chain of operations with an optional residual (skip) branch.

    The block is described by a list of action names (``str_2_action`` lists
    the supported names).  When the first action is ``"identity"`` the block
    is residual: the remaining actions form the main branch and the output is
    ``relu(main(inputs) + skip(inputs))``.  Otherwise the output is simply
    the result of applying every action in order.

    Parameters
    ----------

    num_actions : int
        number of entries in ``action_list``
    action_list : List[str]
        operation names, e.g. ["identity", "3*3 avgpool", "1*7-7*1 conv"]
    num_channels : int
        channel width of every operation when ``strides == 1``; not used for
        the layer widths of the input configuration (``strides != 1``)
    strides : int
        1 builds a normal block; any other value builds the input
        configuration, whose layers use stride 2 and the fixed widths
        3 -> 32 -> 64 -> 128

    Raises
    ------

    AssertionError
        if ``num_actions`` disagrees with ``len(action_list)`` or the input
        configuration is given more than 3 actions
    ValueError
        if an action name is not one of the supported operations
    """

    # Channel widths of the input configuration: entry k is the width after
    # k layers, starting from the 3-channel image.
    _STEM_CHANNELS = (3, 32, 64, 128)

    def __init__(self, num_actions, action_list, num_channels, strides):
        super().__init__()
        assert len(action_list) == num_actions, "the length of the action list must be equal to the number of actions for the block"
        if strides != 1:
            # The input configuration only defines widths for three layers.
            assert len(action_list) <= 3 and num_actions <= 3, "the input block must have less than or equal to 3 layers"
        self.num_actions = num_actions
        self.action_list = action_list
        self.num_channels = num_channels
        self.strides = strides
        self.out_channels = num_channels
        self.identity = True
        self.build_block()

    def build_block(self):
        """Create the skip branch (for residual blocks) and the main-branch layers."""
        self.block = nn.ModuleList([])

        # A leading "identity" only marks the block as residual; it is not a
        # layer of the main branch.  An empty action list is a plain no-op block.
        self.identity = bool(self.action_list) and self.action_list[0] == "identity"
        main_actions = list(self.action_list[1:] if self.identity else self.action_list)

        if self.identity:
            if self.strides == 1:
                self.skip_layer = self.str_2_action(self.num_channels, self.num_channels, 'identity', 1)
            else:
                # The projection must end on the same width as the main branch,
                # otherwise the residual addition in forward() fails.
                skip_channels = self._STEM_CHANNELS[len(main_actions)]
                self.skip_layer = self.str_2_action(self.num_channels, skip_channels, 'identity', 2)

        for idx, action in enumerate(main_actions):
            if self.strides == 1:
                layer = self.str_2_action(self.num_channels, self.num_channels, action, 1)
            else:
                layer = self.str_2_action(self._STEM_CHANNELS[idx], self._STEM_CHANNELS[idx + 1], action, 2)
            self.block.append(layer)

    def forward(self, inputs):
        """Apply the main branch and, for residual blocks, add the skip branch and apply ReLU."""
        x = inputs
        for op in self.block:
            x = op(x)
        if self.identity:
            skip = self.skip_layer(inputs)
            x = nn.functional.relu(x + skip)
        return x

    def str_2_action(self, in_channels, num_channels, action, strides):
        """
        Build the operation named by ``action``.

        Parameters
        ----------

        in_channels : int
            input width; only passed to the operations that take it
            (separable/dilated convolutions and pooling)
        num_channels : int
            output width of the operation
        action : str
            one of "3*3 dconv", "5*5 dconv", "3*3 conv", "5*5 conv",
            "1*7-7*1 conv", "3*3 dil conv", "identity", "3*3 maxpool",
            "3*3 avgpool"
        strides : int
            stride handed to the operation

        Raises
        ------

        ValueError
            if ``action`` is not one of the names above
        """
        if action == "3*3 dconv":
            return Sep_Conv(in_channels, num_channels, 3, strides)
        if action == "5*5 dconv":
            return Sep_Conv(in_channels, num_channels, 5, strides)
        if action == "3*3 conv":
            return Conv(num_channels, 3, strides)
        if action == "5*5 conv":
            return Conv(num_channels, 5, strides)
        if action == "1*7-7*1 conv":
            return Stacked_conv([num_channels, num_channels], [strides, strides])
        if action == "3*3 dil conv":
            return Dil_Conv(in_channels, num_channels, strides)
        if action == "identity":
            return Identity(num_channels, strides)
        if action == "3*3 maxpool":
            return Pooling(in_channels, "max", strides)
        if action == "3*3 avgpool":
            return Pooling(in_channels, "average", strides)
        # Returning None here would only surface later as an opaque
        # "'NoneType' object is not callable" inside forward().
        raise ValueError(f"unknown action {action!r}")


# # test
# action_list = ["identity", "3*3 avgpool", "3*3 avgpool", "1*7-7*1 conv"]
# b_1 = Block(4, action_list, 128*2, 1)
# x1 = torch.randn(32, 128*2, 32, 32)
# y1 = b_1(x1)

# print(summary(b_1, (128*2, 32, 32)))
# print(y1.shape)

# action_list = ["identity", "5*5 dconv", "3*3 dconv"]
# b_2 = Block(3, action_list, 3, 2)
# x2 = torch.randn(32, 3, 32, 32)
# y2 = b_2(x2)

# print(summary(b_2, (3, 32, 32)))
# print(y2.shape)


class Cell(torch.nn.Module):
    """
    A normal cell: a 1x1 lazy convolution followed by repeated Blocks.

    The 1x1 convolution brings the incoming tensor to ``num_channels``
    channels; after it, ``num_blocks`` Blocks built from the same action
    list (all with stride 1) are applied one after another.

    Parameters
    ----------

    cell_idx : int
        idx of the cell

    cell_encoding : List
        [filter_size: int, num_blocks: int, action_list: ["identity", "3*3 avgpool", "1*7-7*1 conv"]]

    strides : int
        stored on the instance; the Blocks themselves are always built with stride 1

    """

    def __init__(self, cell_idx, cell_encoding, strides=1):
        super().__init__()
        self.cell_idx = cell_idx
        self.num_channels = cell_encoding[0]
        self.num_blocks = cell_encoding[1]
        self.action_list = cell_encoding[2]
        self.strides = strides
        self.build_cell()

    def build_cell(self):
        """Assemble the 1x1 projection and the stack of Blocks."""
        self.cell = nn.ModuleList([])
        self.first_layer = nn.LazyConv2d(self.num_channels, kernel_size=1, stride=1)
        self.cell.append(self.first_layer)
        remaining = self.num_blocks
        while remaining > 0:
            self.cell.append(Block(len(self.action_list), self.action_list, self.num_channels, 1))
            remaining -= 1

    def forward(self, inputs):
        """Run the projection and every Block in sequence."""
        out = inputs
        for stage in self.cell:
            out = stage(out)
        return out

    def cell_summary(self):
        """Print a one-line description of the cell and a torchsummary report of its first Block."""
        header = f"For Cell {self.cell_idx} | the Resolution of the image is 32 * 32 | the channel size is {self.num_channels} | the number of blocks are {self.num_blocks}."
        print(header)
        print("The summary of the block is")
        # Index 0 is the 1x1 projection, so the first Block sits at index 1.
        # The report is produced on a copy of that Block, not on the Block itself.
        probe = copy.deepcopy(self.cell[1])
        input_shape = (int(self.num_channels), 32, 32)
        print(summary(probe, input_shape))

# # testing
# ed = [300, 3, ["identity", "3*3 avgpool", "1*7-7*1 conv", "5*5 dconv"]]
# c = Cell(3, ed)
# x = torch.randn(32, 200, 32, 32)
# y = c(x)
# print(c.cell_summary())
# print(y.shape)

class Cell_input(torch.nn.Module):
    """
    The input cell of a network: a chain of Blocks built with stride 2.

    Parameters
    ----------

    cell_idx : int
        idx of the cell

    cell_encoding : List
        [filter_size: int, num_blocks: int, action_list: ["identity", "5*5 dconv", "3*3 conv"]]

    strides : int
        stored on the instance; the Blocks themselves are always built with stride 2

    """

    def __init__(self, cell_idx, cell_encoding, strides=2):
        super().__init__()
        self.cell_idx = cell_idx
        self.num_channels = cell_encoding[0]
        self.num_blocks = cell_encoding[1]
        self.action_list = cell_encoding[2]
        self.strides = strides
        self.build_cell()

    def build_cell(self):
        """Create ``num_blocks`` stride-2 Blocks from the cell's action list."""
        self.cell = nn.ModuleList([])
        remaining = self.num_blocks
        while remaining > 0:
            self.cell.append(Block(len(self.action_list), self.action_list, self.num_channels, 2))
            remaining -= 1

    def forward(self, inputs):
        """Run every Block in sequence."""
        out = inputs
        for stage in self.cell:
            out = stage(out)
        return out

    def cell_summary(self):
        """Print a one-line description of the cell and a torchsummary report of its first Block."""
        header = f"For Cell {self.cell_idx} | the Resolution of the image is 32 * 32 | the channel size is 3 -> 32 -> 64 | the number of blocks are {self.num_blocks}."
        print(header)
        print("The summary of the block is")
        # The report is produced on a copy of the first Block, not on the Block itself.
        probe = copy.deepcopy(self.cell[0])
        print(summary(probe, (3, 32, 32)))

# # testing
# ed = [np.inf, 1, ["identity", "5*5 dconv", "3*3 conv"]]
# c = Cell_input(0, ed)
# x = torch.randn(32, 3, 32, 32)
# y = c(x)
# print(y.shape)
# c.cell_summary()


class Net(torch.nn.Module):
    """
    A five-cell network: one input cell, four normal cells and a classification head.

    Parameters
    ----------

    net_encoding : List[List]
        exactly five cell encodings; the first one describes the input cell
    """
    def __init__(self, net_encoding):
        assert len(net_encoding) == 5, "the number of cell in an individual must be 5"
        super().__init__()
        self.net_ed = net_encoding
        # Head: global average pooling -> flatten -> 10-way linear layer -> softmax.
        head_layers = [
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.LazyLinear(10),
            nn.Softmax(dim=1),
        ]
        self.post_process_layer = nn.Sequential(*head_layers)
        self.build_net()

    def build_net(self):
        """Create the input cell (index 0) followed by normal cells 1 to 4."""
        self.net = nn.ModuleList([])
        self.net.append(Cell_input(0, self.net_ed[0]))
        for position in range(1, 5):
            self.net.append(Cell(position, self.net_ed[position]))

    def forward(self, inputs):
        """Run all cells in order, then the classification head."""
        features = inputs
        for stage in self.net:
            features = stage(features)
        return self.post_process_layer(features)

    def net_summary(self):
        """Print the summary of every cell, each followed by a blank line."""
        for stage in self.net:
            stage.cell_summary()
            print("\n")
        print("Plus the post processing layer.\n")



















In [59]:


class Network(torch.nn.Module):
    """
    A trainable individual of the genetic search: five cells plus a classification head.

    Parameters
    ----------

    net_encoding : List[List] or None
        exactly five cell encodings (the first describes the input cell);
        when None, a random encoding is drawn with ``full_ed_generator(0.5)``
    learning_rate : float
        learning rate handed to the Adam optimizer in ``train``
    device : str
        device the assembled model is moved to and on which batches are processed
    """
    def __init__(self, net_encoding = None, learning_rate = 0.001, device='cpu'):
        super().__init__()
        # Resolve the default before validating: len(None) would raise a TypeError.
        if net_encoding is None:
            net_encoding = full_ed_generator(0.5)
        assert len(net_encoding) == 5, "the number of cell in an individual must be 5"
        self.net_ed = net_encoding

        self.post_process_layer = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)),
                                                nn.Flatten(),
                                                nn.LazyLinear(10),
                                                nn.Softmax(dim=1))
        self.build_net()
        self.performance_history=[]
        self.learning_rate = learning_rate
        self.device = device
        self.assemble_model()

    def assemble_model(self):
        """Chain the cells and the head into ``self.model`` and move it to ``self.device``."""
        self.model = nn.Sequential(*self.net, self.post_process_layer).to(self.device)

    def update_performance(self, new_score):
        """Append ``new_score`` to the performance history."""
        self.performance_history.append(new_score)

    def build_net(self):
        """Create the input cell (index 0) followed by normal cells 1 to 4."""
        self.net = nn.ModuleList([])
        self.net.append(Cell_input(0, self.net_ed[0]))
        for cell_idx in range(1, 5):
            self.net.append(Cell(cell_idx, self.net_ed[cell_idx]))

    def forward(self, inputs):
        """Run all cells in order, then the head; the head ends in a softmax over 10 outputs."""
        x = inputs
        for cell in self.net:
            x = cell(x)
        output = self.post_process_layer(x)
        return output

    def net_summary(self):
        """Print the summary of every cell, each followed by a blank line."""
        for cell in self.net:
            cell.cell_summary()
            print("\n")
        print("Plus the post processing layer.\n")

    @property
    def average_performance(self):
        """Mean of the recorded scores, or 0 when nothing has been recorded."""
        if not self.performance_history:
            return 0
        return sum(self.performance_history) / len(self.performance_history)

    def train(self, train_data=True, epochs=20, device='cpu'):
        """
        Train the model with Adam, or switch the training mode.

        This method shares its name with ``nn.Module.train(mode)``, which
        ``nn.Module.eval()`` calls internally.  A boolean first argument is
        therefore forwarded to the base class, so ``net.train()``,
        ``net.train(False)`` and ``net.eval()`` keep their usual meaning.

        Parameters
        ----------

        train_data : Dataset or bool
            dataset of (inputs, labels) pairs to train on, or a bool to set
            the module's training mode
        epochs : int
            number of passes over ``train_data``
        device : str
            accepted for call compatibility only; batches are moved to the
            device given at construction (``self.device``)

        Returns
        -------

        Network or None
            ``self`` when called with a bool, otherwise None
        """
        if isinstance(train_data, bool):
            return super().train(train_data)

        self.model.train()
        # The model already ends in a softmax, so the loss is taken on the log
        # of its output.  CrossEntropyLoss would apply a second softmax on top
        # of the probabilities.
        criterion = nn.NLLLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        loader = DataLoader(train_data, batch_size=64, shuffle=True)

        for _ in range(epochs):
            for inputs, labels in loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                optimizer.zero_grad()
                outputs = self.model(inputs)
                # Clamp so that a probability of exactly 0 cannot produce -inf.
                loss = criterion(torch.log(outputs.clamp_min(1e-12)), labels)
                loss.backward()
                optimizer.step()

    def evaluate_node(self, validation_data, device='cpu'):
        """
        Measure classification accuracy and append it to the performance history.

        Parameters
        ----------

        validation_data : Dataset
            dataset of (inputs, labels) pairs
        device : str
            accepted for call compatibility only; ``self.device`` is used

        Returns
        -------

        float
            fraction of correctly classified samples (0.0 for an empty dataset)
        """
        self.model.eval()
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, labels in DataLoader(validation_data, batch_size=64, shuffle=False):
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = correct / total if total else 0.0
        self.performance_history.append(accuracy)
        return accuracy















In [60]:

class Op(torch.nn.Module):
    """
    Common base class of the primitive operations.

    Op adds nothing to torch.nn.Module; it only gives the operations a
    shared parent type.  Each subclass defines its own ``forward``, which
    takes a single tensor and returns a single tensor.

    Usage
    -----

    operation = SomeOpSubclass(...)
    inputs = torch.randn(32, 3, 32, 32)
    output = operation(inputs)


    Note
    ----

    The operations are written for the CIFAR10 image format, i.e. a 4d tensor
    with shape [batch_size, num_channel, 32, 32]
    """


class Identity(Op):
    """
    Skip-connection operation.

    With ``strides == 2`` the operation is a 1x1 lazy convolution with stride
    2 and padding 16 that outputs ``num_channels`` channels.  With any other
    stride the input is returned unchanged.

    Parameters
    ----------

    num_channels : int
        channel size
    strides : int
        1, or 2
    """

    def __init__(self, num_channels, strides):
        super().__init__()

        if strides == 2:
            self.op = nn.LazyConv2d(num_channels, kernel_size=1, stride=strides, padding=16)
        else:
            # nn.Identity rather than a lambda: a lambda stored on the module
            # cannot be pickled and is not registered as a submodule.
            self.op = nn.Identity()
        self.out_channels = num_channels

    def forward(self, inputs):
        """Apply the projection (stride 2) or return ``inputs`` as is."""
        return self.op(inputs)


class Sep_Conv(Op):
    """
    Separable convolution: depthwise conv -> pointwise (1x1) conv -> batchnorm -> relu

    Parameters
    ----------

    in_channels : int
        in_channel size of input
    num_channels : int
        channel size, num of filters
    kernel : int
        kernel size, {3, 5, 7}
    strides : int
        either 1 or 2
    """
    def __init__(self, in_channels, num_channels, kernel, strides):
        super().__init__()
        assert kernel in [3, 5, 7], "kernel not in the range {3, 5, 7}"

        # Padding of the depthwise convolution when the stride is not 1;
        # with stride 1 "same" padding is used instead.
        strided_padding = {3: 17, 5: 18, 7: 19}
        depthwise_padding = "same" if strides == 1 else strided_padding[kernel]

        self.conv_d = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel,
            stride=strides,
            padding=depthwise_padding,
            groups=in_channels,
        )
        self.conv_p = nn.LazyConv2d(num_channels, kernel_size=1)
        self.bn = nn.LazyBatchNorm2d()
        self.relu = nn.ReLU(inplace=True)
        self.out_channels = num_channels

    def forward(self, inputs):
        """Apply depthwise conv, pointwise conv, batch norm and ReLU in that order."""
        mixed = self.conv_p(self.conv_d(inputs))
        return self.relu(self.bn(mixed))

class Conv(Op):
    """
    Base convolution object: lazy conv -> batchnorm -> relu

    With stride 1 the convolution uses "same" padding.  With any other stride
    each spatial dimension is padded by ``16 + k // 2`` for kernel extent
    ``k``, which reproduces the paddings 17 (3x3), 18 (5x5), (16, 19) (1x7)
    and (19, 16) (7x1).  For a 32x32 input and stride 2 this padding keeps
    the output at 32x32.

    Parameters
    ----------

    num_channels : int
        output channels
    kernel : int or tuple[int, int]
        kernel size; every extent must be a positive odd integer,
        e.g. 3, 5, 7, (1, 7) or (7, 1)
    strides : int
        stride size, {1, 2}

    Raises
    ------

    ValueError
        if ``kernel`` is not a positive odd int or a pair of positive odd ints
    """

    def __init__(self, num_channels, kernel, strides):
        super().__init__()

        is_square = isinstance(kernel, int)
        try:
            extents = (kernel, kernel) if is_square else tuple(kernel)
        except TypeError:
            extents = ()
        valid = len(extents) == 2 and all(
            isinstance(k, int) and k > 0 and k % 2 == 1 for k in extents
        )
        if not valid:
            # Fail at construction rather than leave self.conv undefined and
            # crash later inside forward().
            raise ValueError(f"unsupported kernel {kernel!r}: expected a positive odd int or a pair of them")

        if strides == 1:
            padding = "same"
        elif is_square:
            padding = 16 + kernel // 2
        else:
            padding = (16 + extents[0] // 2, 16 + extents[1] // 2)

        self.conv = nn.LazyConv2d(
            num_channels,
            kernel_size=kernel if is_square else extents,
            stride=strides,
            padding=padding,
        )
        self.bn = nn.LazyBatchNorm2d()
        self.relu = nn.ReLU(inplace=True)
        self.out_channels = num_channels

    def forward(self, inputs):
        """Apply convolution, batch norm and ReLU in that order."""
        x = self.conv(inputs)
        x = self.bn(x)
        x = self.relu(x)
        return x

class Stacked_conv(Op):
    """
    Stacked convolution of 1 * 7 followed by 7 * 1 convolution

    Parameters
    ----------

    channel_list : list[int]
        output channels of the two convolutions, e.g. [64, 128]
    stride_list : list[int]
        strides of the two convolutions, e.g. [1, 2]
    kernel_list : sequence of (int, int)
        must be equal to [(1, 7), (7, 1)]; kept as a parameter for
        call compatibility

    Raises
    ------

    AssertionError
        if ``kernel_list`` is not [(1, 7), (7, 1)] or the three lists differ in length
    """
    def __init__(self, channel_list, stride_list, kernel_list=((1, 7), (7, 1))):
        super().__init__()
        # The default is an immutable tuple (no shared mutable default);
        # normalise so a list or a tuple of pairs is accepted alike.
        kernel_list = [tuple(k) for k in kernel_list]
        assert kernel_list == [(1, 7), (7, 1)], "kernel list must be [(1, 7), (7, 1)]"
        assert len(channel_list) == len(kernel_list) and len(kernel_list) == len(stride_list), "List lengths must match"
        self.convs = nn.ModuleList([])
        for c, k, s in zip(channel_list, kernel_list, stride_list):
            self.convs.append(Conv(c, k, s))
        self.out_channels = channel_list[1]

    def forward(self, inputs):
        """Apply the 1x7 convolution and then the 7x1 convolution."""
        x = inputs
        for op in self.convs:
            x = op(x)
        return x


class Pooling(Op):
    """
    Pooling operation, two variations
    1. 3 by 3 average pooling
    2. 3 by 3 max pooling

    With stride 1 the pooling layer uses padding 1.  With any other stride a
    ZeroPad2d(16) layer is created as well; forward() applies it before the
    pooling layer when the stride is 2.

    Parameters
    ----------
    in_channels : int
        input channel number (pooling does not change it)
    type : str
        "max" or "average"
    strides : int
        1 or 2
    size : int
        pooling window, must be 3

    Raises
    ------
    AssertionError
        if ``size`` is not 3
    ValueError
        if ``type`` is neither "max" nor "average"
    """

    def __init__(self, in_channels, type, strides, size = 3):
        super().__init__()
        assert size == 3, "kernel size must be 3"
        if type == "max":
            pool_cls = nn.MaxPool2d
        elif type == "average":
            pool_cls = nn.AvgPool2d
        else:
            # Without this check self.pool would stay undefined and the
            # error would only appear inside forward().
            raise ValueError(f"unknown pooling type {type!r}: expected 'max' or 'average'")

        self.strides = strides
        if strides == 1:
            self.pool = pool_cls(size, strides, padding=size // 2)
        else:
            self.pad = nn.ZeroPad2d(16)
            self.pool = pool_cls(size, strides, padding=1)
        self.out_channels = in_channels

    def forward(self, inputs):
        """Pool ``inputs``; with stride 2 the input is zero-padded first."""
        if self.strides == 2:
            x = self.pad(inputs)
            x = self.pool(x)
        else:
            x = self.pool(inputs)
        return x


class Dil_Conv(Op):
    """
    3 by 3 separable dilated convolution: dilated depthwise conv -> pointwise (1x1) conv -> batchnorm -> relu

    Parameters
    ----------

    in_channels : int
        in_channel size of input
    num_channels : int
        channel size, num of filters
    strides : int
        either 1 or 2
    kernel : int
        kernel size, must be 3
    dilation : int
        dilation of the depthwise convolution
    """
    def __init__(self, in_channels, num_channels, strides, kernel=3, dilation=2):
        super().__init__()
        assert kernel == 3, "kernel not equal to 3"

        # "same" padding with stride 1, a fixed padding of 18 otherwise.
        depthwise_kwargs = dict(kernel_size=kernel, groups=in_channels, stride=strides, dilation=dilation)
        depthwise_kwargs["padding"] = "same" if strides == 1 else 18
        self.conv_d = nn.Conv2d(in_channels, in_channels, **depthwise_kwargs)

        self.conv_p = nn.LazyConv2d(num_channels, kernel_size=1)
        self.bn = nn.LazyBatchNorm2d()
        self.relu = nn.ReLU(inplace=True)
        self.out_channels = num_channels

    def forward(self, inputs):
        """Apply depthwise conv, pointwise conv, batch norm and ReLU in that order."""
        mixed = self.conv_p(self.conv_d(inputs))
        return self.relu(self.bn(mixed))











In [61]:

# Hyper-parameter grids from which the operation factories below are built.
kernel_sizes = [3, 5, 7]
strides = [1, 2]
in_channels_options = [32, 64]
num_channels_options = [32, 64, 128]

# One Conv factory per (num_channels, kernel, stride) combination.
conv_combinations = [
    partial(Conv, num_channels=width, kernel=size, strides=step)
    for width in num_channels_options
    for size in kernel_sizes
    for step in strides
]

# One Sep_Conv factory per (in_channels, num_channels, kernel, stride) combination.
sep_conv_combinations = [
    partial(Sep_Conv, in_channels=width_in, num_channels=width_out, kernel=size, strides=step)
    for width_in in in_channels_options
    for width_out in num_channels_options
    for size in kernel_sizes
    for step in strides
]

available_ops = conv_combinations + sep_conv_combinations

# Action names available to normal cells.
action_ls_full = [
    "identity",
    "3*3 dconv",
    "5*5 dconv",
    "3*3 conv",
    "5*5 conv",
    "1*7-7*1 conv",
    "3*3 dil conv",
    "3*3 maxpool",
    "3*3 avgpool",
]
# The input cell may only use the first five of them (identity and the plain/separable convolutions).
action_ls_input = action_ls_full[:5]



def load_cifar10_data():
    """
    Download (if needed) and load CIFAR-10 from ./data.

    Images are converted to tensors and normalized with mean 0.5 and std 0.5
    per channel.  The official training set is randomly split 80% / 20% into
    a training part and a validation part.

    Returns
    -------

    tuple
        (train_data, validation_data, test_data)
    """
    normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    preprocessing = transforms.Compose([transforms.ToTensor(), normalize])

    shared_kwargs = dict(root='./data', download=True, transform=preprocessing)
    full_train = datasets.CIFAR10(train=True, **shared_kwargs)
    held_out_test = datasets.CIFAR10(train=False, **shared_kwargs)

    n_train = int(0.8 * len(full_train))
    n_validation = len(full_train) - n_train
    train_part, validation_part = torch.utils.data.random_split(full_train, [n_train, n_validation])

    return train_part, validation_part, held_out_test



def input_ed_generator(random_pre):
    """
    Draw a random action list of length 3 for the input cell.

    A uniform random number is drawn first; if it is >= ``random_pre`` the
    list starts with "identity".  The remaining positions are filled with
    random choices from ``action_ls_input`` without its first entry.
    """
    encoding = ['identity'] if random.random() >= random_pre else []
    # The first entry of action_ls_input is never a candidate for the remaining slots.
    candidates = action_ls_input[1:]
    missing = 3 - len(encoding)
    for _ in range(missing):
        encoding.append(random.choice(candidates))
    return encoding

def normal_ed_generator(random_pre):
    """
    Draw a random action list of length 2, 3 or 4 for a normal cell.

    A uniform random number is drawn first; if it is >= ``random_pre`` the
    list starts with "identity" and the first entry of ``action_ls_full`` is
    removed from the candidates for the remaining positions.  Otherwise
    every entry of ``action_ls_full`` remains a candidate.
    """
    if random.random() >= random_pre:
        prefix = ['identity']
        candidates = action_ls_full[1:]
    else:
        prefix = []
        candidates = list(action_ls_full)

    total_actions = random.choice([2, 3, 4])
    missing = total_actions - len(prefix)
    tail = [random.choice(candidates) for _ in range(missing)]
    return prefix + tail

def full_ed_generator(random_pre):
    """
    Net encoding generator

    Builds the encoding of a five-cell network.  Every cell is encoded as

    [num_channels: int, num_blocks: int, action_list: ["identity", "3*3 avgpool", "1*7-7*1 conv"]]

    Cell 0 (the input cell) uses ``np.inf`` as its channel entry, one block
    and an action list from ``input_ed_generator``.  Cell i (i = 1..4) uses a
    channel size drawn from [24, 40, 64, 80, 128, 256], i blocks and an
    action list from ``normal_ed_generator``.

    Parameters
    ----------

    random_pre : float
        [0, 1], threshold forwarded to the action-list generators

    Returns
    -------

    List[List]
        the five cell encodings, input cell first
    """
    channel_list = [24, 40, 64, 80, 128, 256]

    net_ed = [[np.inf, 1, input_ed_generator(random_pre)]]
    for num_blocks in range(1, 5):
        # The channel size is drawn before the action list of the same cell.
        cell_ed = [random.choice(channel_list), num_blocks, normal_ed_generator(random_pre)]
        net_ed.append(cell_ed)

    return net_ed

In [62]:

def initialize_population(size=10, device = 'cpu'):
    """Create ``size`` Networks, each from its own random encoding drawn with full_ed_generator(0.5)."""
    networks = []
    for _ in range(size):
        encoding = full_ed_generator(0.5)
        networks.append(Network(net_encoding=encoding, device=device))
    return networks

def evaluate_fitness(node, train_data, validation_data, epoch = 20, device ='cpu', save=True):
    """
    Train a node, measure its performance and record the score at most once.

    Parameters
    ----------

    node : object
        must provide ``train``, ``evaluate_node``, ``update_performance``
        and a ``performance_history`` list
    train_data, validation_data : Dataset
        data handed to ``node.train`` and ``node.evaluate_node``
    epoch : int
        number of training epochs
    device : str
        forwarded to ``node.train`` and ``node.evaluate_node``
    save : bool
        when True the score ends up in the node's history exactly once;
        when False the history is left as it was before the call

    Returns
    -------

    the score returned by ``node.evaluate_node``
    """
    node.train(train_data, epochs=epoch, device = device)

    history_len = len(node.performance_history)
    performance_score = node.evaluate_node(validation_data, device = device)
    # evaluate_node may record the score itself; reconcile with `save` so the
    # score is never stored twice and never stored when save is False.
    already_recorded = len(node.performance_history) > history_len

    if save and not already_recorded:
        node.update_performance(performance_score)
    elif not save and already_recorded:
        del node.performance_history[history_len:]
    return performance_score


def select_population(population, to_keep=5):
    """Return the ``to_keep`` nodes with the highest average performance, best first; the input is not modified."""
    def _score(node):
        return node.average_performance

    ranked = list(population)
    ranked.sort(key=_score, reverse=True)
    return ranked[:to_keep]

def weighted_selection(nodes, number_of_parents=2):
    """
    Draw ``number_of_parents`` nodes with replacement, weighted by average performance.

    Each node's weight is its share of the summed average performance.  When
    that sum is 0 every node gets the same weight.
    """
    scores = [node.average_performance for node in nodes]
    total = sum(scores)

    if total == 0:
        weights = [1/len(nodes)] * len(nodes)
    else:
        weights = [score / total for score in scores]

    return random.choices(nodes, weights=weights, k=number_of_parents)

def crossover_and_mutate(parents, available_ops, population_size=15, to_keep=5, device = 'cpu'):
    """
    Build the next population: all parents plus children until ``population_size`` is reached.

    Parameters
    ----------

    parents : list
        the surviving nodes; they are carried over unchanged and are the
        only candidates for parent selection
    available_ops : list
        accepted for call compatibility; not used
    population_size : int
        target size of the returned population; if ``parents`` is already
        at least this long no children are created
    to_keep : int
        accepted for call compatibility; not used
    device : str
        forwarded to ``create_new_child``

    Returns
    -------

    list
        a new list (``parents`` itself is left untouched)
    """
    # Copy instead of aliasing: appending children to `parents` itself would
    # mutate the caller's list and let untrained children enter the parent pool.
    new_population = list(parents)
    while len(new_population) < population_size:
        parent1, parent2 = weighted_selection(parents, number_of_parents=2)
        child = create_new_child(parent1, parent2, device = device)
        new_population.append(child)

    return new_population

def create_new_child(parent1, parent2, base_mutation_rate=0.05, mutation_increase=0.05, device = 'cpu', random_pre=0.2, channel_list = (24, 40, 64, 80, 128, 256)):
    """
    Create a child Network by per-cell crossover followed by per-cell mutation.

    For every cell position the encoding is copied from one of the two
    parents with equal probability.  The cell is then replaced by a freshly
    generated encoding with a probability that grows linearly with the
    position, from ``base_mutation_rate`` at the first cell to
    ``base_mutation_rate + mutation_increase`` at the last.

    Parameters
    ----------

    parent1, parent2 : Network
        parents; their ``net_ed`` and ``learning_rate`` are read
    base_mutation_rate : float
        mutation probability of the first cell
    mutation_increase : float
        additional mutation probability reached at the last cell
    device : str
        device of the child Network
    random_pre : float
        threshold forwarded to the action-list generators
    channel_list : sequence of int
        channel sizes a mutated normal cell may receive

    Returns
    -------

    Network
        the child; its learning rate is picked at random from the two parents
    """
    child_net_ed = []
    num_cells = len(parent1.net_ed)

    for i in range(num_cells):
        donor = parent1 if random.random() < 0.5 else parent2
        child_net_ed.append(copy.deepcopy(donor.net_ed[i]))

        # Relative position in [0, 1]; a single-cell encoding has no "last" cell to scale to.
        position = float(i / (num_cells - 1)) if num_cells > 1 else 0.0
        adjusted_mutation_rate = float(base_mutation_rate) + float(mutation_increase) * position

        if random.random() < adjusted_mutation_rate:
            if i == 0:
                ed_cell = [np.inf, 1, input_ed_generator(random_pre)]
            else:
                # Keep the inherited number of blocks: full_ed_generator gives
                # cell i exactly i blocks, and resetting it to 1 on every
                # mutation would only ever shrink the networks.
                inherited_blocks = child_net_ed[i][1]
                ed_cell = [random.choice(channel_list), inherited_blocks, normal_ed_generator(random_pre)]
            child_net_ed[i] = ed_cell

    child_net = Network(net_encoding=child_net_ed, learning_rate=random.choice([parent1.learning_rate, parent2.learning_rate]), device = device)
    return child_net

def select_data_subsets(train_data, subset_size=20):
    """
    Draw two disjoint random subsets of equal size from ``train_data``.

    Parameters:
    - train_data: The dataset from which to select subsets.
    - subset_size (int): The size of each of the two subsets.

    Returns:
    - A tuple (training subset, validation subset) of torch Subset objects.
    """
    # One draw without replacement of twice the size guarantees the two halves do not overlap.
    drawn = np.random.choice(len(train_data), 2 * subset_size, replace=False)
    first_half, second_half = drawn[:subset_size], drawn[subset_size:]
    return Subset(train_data, first_half), Subset(train_data, second_half)

def genetic_algorithm(train_data, validation_data, generations=100, population_size=10, to_keep=5, subset_size=120, train_epoches = 10, device = 'cpu'):
    """
    Run the genetic architecture search.

    In every generation two fresh random subsets of ``train_data`` are
    drawn, every node is trained on the first and scored on the second, the
    ``to_keep`` best nodes (by average performance) are selected, and the
    population is refilled with children of the selected nodes.

    Parameters
    ----------

    train_data : Dataset
        data from which the per-generation train/test subsets are drawn
    validation_data : Dataset
        currently not used by the search
    generations : int
        number of generations
    population_size : int
        number of nodes per generation
    to_keep : int
        number of nodes that survive a generation
    subset_size : int
        size of each per-generation subset
    train_epoches : int
        training epochs per node and generation
    device : str
        device used for the networks

    Returns
    -------

    Network or None
        the node with the best score in the last generation,
        or None when ``generations`` is 0
    """
    population = initialize_population(size=population_size, device = device)
    best_child = None
    for generation in tqdm(range(generations)):
        print(f"Generation {generation + 1}")
        train_subset, test_subset = select_data_subsets(train_data, subset_size=subset_size)

        test_performances = [
            evaluate_fitness(node, train_subset, test_subset, train_epoches, device)
            for node in tqdm(population)
        ]

        print(f"Finish Generation {generation + 1}")
        avg_test_performance = sum(test_performances) / len(test_performances)
        # First index of the highest score, i.e. ties go to the earlier node.
        best_child_index = max(range(len(test_performances)), key=test_performances.__getitem__)
        best_child = population[best_child_index]
        best_child_test_score = test_performances[best_child_index]

        print(f"Average performance on test subset: {avg_test_performance}")
        print(f"Best child's performance on test subset: {best_child_test_score}")
        print(f"Best child's structure: {summary(best_child, (3, 32, 32))}")

        selected = select_population(population, to_keep=to_keep)
        population = crossover_and_mutate(selected, available_ops=available_ops, population_size=population_size, to_keep=to_keep, device = device)

    return best_child



In [63]:

# Prefer the GPU when one is available.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using device: {device}')

# The third return value of load_cifar10_data (the test split) is not needed here.
train_data, validation_data, _ = load_cifar10_data()
genetic_algorithm(train_data, validation_data, device=device)

Using device: cuda
Files already downloaded and verified
Files already downloaded and verified


  0%|          | 0/100 [00:00<?, ?it/s]

Generation 1



  0%|          | 0/10 [00:00<?, ?it/s][A
 10%|█         | 1/10 [00:09<01:28,  9.85s/it]
  0%|          | 0/100 [00:09<?, ?it/s]


KeyboardInterrupt: 