In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F

import numpy as np
import pandas as pd
import random

import itertools
import time



In [3]:
class ConvNet(nn.Module):
    """Configurable CNN: stem conv, optional conv+pool stages, global average pool, linear head.

    Args:
        input_channels: Number of channels of the input images.
        hidden_channels: Output channel count per convolution. The first entry
            configures the stem convolution; every further entry adds one
            conv -> ReLU -> max-pool stage.
        num_classes: Size of the output vector.
        kernel_sizes: Optional per-layer kernel configuration. Entry 0 is
            ``[conv_kernel]`` for the stem; entry ``i >= 1`` is
            ``[conv_kernel, pool_kernel]`` for stage ``i``. Entries past
            ``len(hidden_channels) - 1`` are never read. When ``None``, every
            convolution uses a 3x3 kernel and every pooling layer a 2x2 window.

    Note:
        All convolutions use ``stride=1, padding=1`` regardless of kernel size,
        and all pooling layers use ``stride=2``.
    """

    DEFAULT_CONV_KERNEL = 3
    DEFAULT_POOL_KERNEL = 2

    def __init__(self, input_channels, hidden_channels, num_classes, kernel_sizes=None):
        super(ConvNet, self).__init__()

        if kernel_sizes is None:
            # One entry per layer. The previous default of [[3]] only covered the
            # stem and raised IndexError as soon as a second layer was requested.
            k = [[self.DEFAULT_CONV_KERNEL]] + [
                [self.DEFAULT_CONV_KERNEL, self.DEFAULT_POOL_KERNEL]
                for _ in range(1, len(hidden_channels))
            ]
        else:
            k = kernel_sizes

        self.conv1 = nn.Conv2d(input_channels, hidden_channels[0], kernel_size=k[0][0], stride=1, padding=1)
        self.relu1 = nn.ReLU()

        self.conv_layers = nn.ModuleList()
        self.pool_layers = nn.ModuleList()
        # ReLU is stateless, so a single instance is shared by every stage.
        self.relu = nn.ReLU()

        for i in range(1, len(hidden_channels)):
            self.conv_layers.append(
                nn.Conv2d(hidden_channels[i - 1], hidden_channels[i], kernel_size=k[i][0], stride=1, padding=1)
            )
            self.pool_layers.append(nn.MaxPool2d(kernel_size=k[i][1], stride=2))

        self.fc = nn.Linear(hidden_channels[-1], num_classes)

    def forward(self, x):
        """Return logits in training mode and softmax probabilities in eval mode."""
        out = self.conv1(x)
        out = self.relu1(out)

        for conv_layer, pool_layer in zip(self.conv_layers, self.pool_layers):
            out = conv_layer(out)
            out = self.relu(out)
            out = pool_layer(out)

        # Global average pooling: collapse each feature map to a single value.
        out = F.avg_pool2d(out, kernel_size=out.size()[2:])
        out = out.view(out.size(0), -1)

        out = self.fc(out)

        # Raw logits are kept while training; softmax is applied only in eval mode.
        if not self.training:
            out = F.softmax(out, dim=1)

        return out

In [32]:
def _num_batches(loader):
    """Return the number of batches in `loader`, or 0 when it has no length."""
    try:
        return len(loader)
    except TypeError:
        return 0


def _progress_text(done, total):
    """Format progress as a percentage, or as a bare batch counter if `total` is unknown."""
    if total:
        return f"{(done / total) * 100:.3f}%"
    return f"batch {done}"


def _evaluate_accuracy(model, loader, label, epoch, start_epoch_time):
    """Return the fraction of correctly classified samples in `loader` (0.0 if it is empty)."""
    n_batches = _num_batches(loader)
    correct = 0
    total = 0

    model.eval()
    with torch.no_grad():
        for count, (images, labels) in enumerate(loader, start=1):
            print(
                f"Epoch: {epoch + 1} => {time.time() - start_epoch_time:.2f}s || "
                f"Calculating {label} Accuracy... {_progress_text(count, n_batches)}",
                end='', flush=True)

            # The predicted class is the index of the largest score per sample.
            predicted = model(images).argmax(dim=1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            print('\r', end='', flush=True)

    return correct / total if total else 0.0


def trainModel(model, data_loader, epochSize=20):
    """Train `model` with Adam and cross-entropy, tracking per-epoch metrics.

    Args:
        model: Module mapping an input batch to per-class scores.
        data_loader: Indexable pair ``(train_loader, test_loader)``; each
            yields ``(inputs, targets)`` batches.
        epochSize: Number of passes over the training loader.

    Returns:
        dict of lists with one entry per epoch:
        ``loss`` (sum of the batch losses), ``train_acc``, ``test_acc``,
        ``total_acc`` (``0.3 * train_acc + 0.7 * test_acc``),
        ``epoch_time`` and ``exec_time`` (seconds since training started).
    """
    train_loader, test_loader = data_loader[0], data_loader[1]

    loss_fn = nn.CrossEntropyLoss()
    opt = torch.optim.Adam(model.parameters())

    # Take batch counts from the loader itself instead of notebook globals, so
    # the function works on a fresh kernel and with any loader.
    n_train_batches = _num_batches(train_loader)

    loss_per_epoch = []
    train_acc_per_epoch = []
    test_acc_per_epoch = []
    total_acc_per_epoch = []
    time_per_epoch = []
    exec_time = []

    caller_mode = model.training
    start_total_time = time.time()
    for epoch in range(epochSize):
        # The accuracy passes below switch to eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()

        loss = 0
        start_epoch_time = time.time()

        for count, (input_batch, target_batch) in enumerate(train_loader, start=1):
            print(
                f"Epoch: {epoch + 1} => {time.time() - start_epoch_time:.2f}s "
                f"{_progress_text(count, n_train_batches)}", end='')

            opt.zero_grad()
            predict_batch = model(input_batch)
            loss_batch = loss_fn(predict_batch, target_batch)
            loss_batch.backward()
            opt.step()

            loss += loss_batch.item()
            print('\r', end='', flush=True)

        loss_per_epoch.append(loss)

        train_accuracy = _evaluate_accuracy(model, train_loader, "Training", epoch, start_epoch_time)
        train_acc_per_epoch.append(train_accuracy)

        test_accuracy = _evaluate_accuracy(model, test_loader, "Testing", epoch, start_epoch_time)
        test_acc_per_epoch.append(test_accuracy)

        time_epoch = time.time() - start_epoch_time
        time_current = time.time() - start_total_time

        time_per_epoch.append(time_epoch)
        exec_time.append(time_current)

        # Weighted score that favours test accuracy over training accuracy.
        total_accuracy = 0.3*train_accuracy + 0.7*test_accuracy
        total_acc_per_epoch.append(total_accuracy)

        print(f'Epoch: {epoch+1} || Loss: {loss} || Train Acc: {train_accuracy * 100:.4f}% || Test Acc: {test_accuracy * 100:.4f}% || Total Acc: {total_accuracy * 100:.4f}% || Epoch Time: {time_epoch:.4f} s || Current Runtime: {time_current:.4f} s')

    # Leave the model in the mode the caller handed it over in.
    model.train(caller_mode)

    output = {
        'loss': loss_per_epoch,
        'train_acc': train_acc_per_epoch,
        'test_acc': test_acc_per_epoch,
        'total_acc': total_acc_per_epoch,
        'epoch_time': time_per_epoch,
        'exec_time': exec_time
    }

    return output

In [48]:
def fitness(individual, dataloader, num_classes=5, epochs=10, input_channels=1):
    """Train a ConvNet described by `individual` and return its final weighted accuracy.

    Args:
        individual: Pair ``(hidden_channels, kernel_sizes)`` where
            ``kernel_sizes`` holds one ``[conv_kernel, pool_kernel]`` entry per layer.
        dataloader: ``[train_loader, test_loader]`` forwarded to ``trainModel``.
        num_classes: Number of output classes of the network.
        epochs: Number of training epochs; must be at least 1 because the
            score is read from the last epoch.
        input_channels: Number of channels of the input images.

    Returns:
        The ``total_acc`` value of the last epoch reported by ``trainModel``.
    """
    hidden_channels = individual[0]

    # The first convolution always gets a fixed 2x2 kernel; the individual's
    # kernel entries are appended after it.
    # NOTE(review): this prepend shifts the individual's entries by one, so
    # entry j configures ConvNet layer j + 1. When hidden_channels and
    # kernel_sizes have equal length, the last entry is never read by ConvNet.
    # Confirm this is intended.
    stem_kernel = [[2]]
    kernel_sizes = stem_kernel + list(individual[1])

    print(individual)
    model = ConvNet(
        input_channels,
        hidden_channels=hidden_channels,
        num_classes=num_classes,
        kernel_sizes=kernel_sizes,
    )
    result = trainModel(model, dataloader, epochSize=epochs)
    return result['total_acc'][-1]

def define_fitness(population, dataloader):
    """Score every individual of `population` with `fitness`, preserving order.

    Args:
        population: Iterable of individuals accepted by `fitness`.
        dataloader: Loader pair handed unchanged to `fitness` for each individual.

    Returns:
        List of fitness values, one per individual.
    """
    return [fitness(candidate, dataloader) for candidate in population]

def generate_population(population_size, layers, hidden_channels_range, kernel_size_range, seed=4):
    """Create a random population of network configurations.

    Args:
        population_size: Number of individuals to generate.
        layers: Number of layers encoded in each individual.
        hidden_channels_range: ``[low, high]`` bounds (both inclusive) for the
            channel count of each layer.
        kernel_size_range: ``[low, high]`` bounds (both inclusive) for the
            convolution and pooling kernel sizes.
        seed: Seed applied to NumPy's global RNG before sampling. The default
            of 4 reproduces the historical population; pass ``None`` to leave
            the RNG untouched and obtain a different population on every call.

    Returns:
        List of ``(chromosome_h, chromosome_k)`` tuples. ``chromosome_h`` is a
        list of `layers` channel counts, e.g. ``[256, 128, 64]``;
        ``chromosome_k`` is a list of `layers` ``[conv_kernel, pool_kernel]``
        pairs. All genes are plain Python ints.
    """
    if seed is not None:
        # This reseeds the global NumPy RNG, so later np.random calls are affected too.
        np.random.seed(seed)

    h_low, h_high = hidden_channels_range[0], hidden_channels_range[1]
    k_low, k_high = kernel_size_range[0], kernel_size_range[1]

    population = []
    for _ in range(population_size):
        # Draw order (all channel genes, then all kernel genes) is kept so a
        # given seed yields the same individuals as before.
        chromosome_h = [int(np.random.randint(h_low, h_high + 1)) for _ in range(layers)]

        # .tolist() converts NumPy scalars to Python ints so the genes print
        # cleanly and can be passed to torch layers as ordinary integers.
        chromosome_k = [
            np.random.randint(k_low, k_high + 1, (1, 2))[0].tolist()
            for _ in range(layers)
        ]

        population.append((chromosome_h, chromosome_k))

    return population

def DataFrame_Pop(pop_unstructured):
    """Flatten a population into a DataFrame with one row per individual.

    Args:
        pop_unstructured: Sequence of ``(hidden_channels, kernel_sizes)``
            individuals, where ``kernel_sizes`` holds one
            ``[conv_kernel, pool_kernel]`` pair per layer. The column layout
            is derived from the first individual.

    Returns:
        DataFrame with columns ``Individual`` (1-based number), one
        ``conv_h (Layer n)`` column per layer, then interleaved
        ``conv_k (Layer n)`` / ``pool_k (Layer n)`` columns. An empty
        population yields an empty frame containing only ``Individual``.
    """
    if len(pop_unstructured) == 0:
        # No individual to derive the layer columns from.
        return pd.DataFrame(columns=['Individual'])

    rows = []
    for number, individual in enumerate(pop_unstructured, start=1):
        hidden = np.array(individual[0])
        kernels = np.array(individual[1])
        # Row layout: [number, h_1..h_n, conv_k_1, pool_k_1, ..., conv_k_n, pool_k_n]
        rows.append(np.concatenate(([number], hidden, kernels.flatten())).tolist())

    layer_numbers = range(1, len(pop_unstructured[0][0]) + 1)
    hidden_cols = [f"conv_h (Layer {n})" for n in layer_numbers]
    # Kernel columns are interleaved per layer to match the flattened pairs.
    kernel_cols = list(itertools.chain.from_iterable(
        (f"conv_k (Layer {n})", f"pool_k (Layer {n})") for n in layer_numbers
    ))

    cols = ['Individual'] + hidden_cols + kernel_cols
    return pd.DataFrame(rows, columns=cols, index=None)

In [6]:
# Define the directory path
data_dir = './processed_dataset'

# Create the dataset from tensors stored as .pt files (DatasetFolder, not ImageFolder).
# NOTE(security): torch.load unpickles file contents; only point this at trusted data.
dataset = datasets.DatasetFolder(data_dir, loader=torch.load, extensions=".pt")

# Split the dataset into training and testing sets
train_size = int(0.8 * len(dataset))  # 80% for training
test_size = len(dataset) - train_size  # Remaining 20% for testing

# A dedicated, seeded generator makes the train/test split reproducible across kernel restarts.
SPLIT_SEED = 0
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size], generator=split_generator
)

# Set DataLoader
batchSize = 16  # Rule of thumb is to use a power of 2. In this case 2^4
train_loader = DataLoader(train_dataset, batch_size=batchSize, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batchSize, shuffle=False)  # no need to shuffle test data

# len(loader) gives the batch count directly; iterating the loaders just to
# count batches would read the whole dataset from disk twice.
count_batch_train, count_batch_test = len(train_loader), len(test_loader)
print(f'There are {count_batch_train} batches in train_loader')
print(f'There are {count_batch_test} batches in test_loader')

# Peek at a single batch to report the input tensor shape.
size = next(iter(train_loader))[0].shape

print(size)

There are 413 batches in train_loader
There are 104 batches in test_loader
torch.Size([16, 1, 224, 224])


In [50]:
# Sample five single-layer individuals: 4-16 channels, kernel sizes 1-5 (bounds inclusive).
pop = generate_population(
    population_size=5,
    layers=1,
    hidden_channels_range=[4, 16],
    kernel_size_range=[1, 5],
)
DataFrame_Pop(pop)

Unnamed: 0,Individual,conv_h (Layer 1),conv_k (Layer 1),pool_k (Layer 1)
0,1,14,2,1
1,2,11,1,3
2,3,13,3,5
3,4,11,2,1
4,5,8,3,5


In [31]:
# Smoke-test the fitness evaluation on the first individual of the population.
indv = pop[0]
fitness(individual=indv, dataloader=[train_loader, test_loader])

([6], [[3, 3]])
Epoch: 1 || Loss: 663.3225330114365 || Train Acc: 40.1515% || Test Acc: 41.8182% || Total Acc: 41.3182 || Epoch Time: 45.8957 s || Current Runtime: 45.8958 s
Epoch: 2 || Loss: 642.9703561067581 || Train Acc: 56.4848% || Test Acc: 56.9697% || Total Acc: 56.8242 || Epoch Time: 45.9603 s || Current Runtime: 91.8561 s
Epoch: 3 => 5.17s 22.034%

KeyboardInterrupt: 