In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score
from block_differential_evolution import block_differential_evolution

# Prefer the GPU whenever PyTorch reports one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print("Running on device:", DEVICE.upper())

Running on device: CUDA


In [2]:
# Fully connected classifier: 784 -> 128 -> 64 -> 10 with ReLU activations.
class NeuralNetwork(nn.Module):
    """Three-layer perceptron producing 10 raw class scores (logits).

    Inputs are flattened per sample to 784 features before the first linear
    layer (784 = 28 * 28, the MNIST image size used in this notebook).
    """

    def __init__(self):
        super().__init__()
        # Attribute names determine the state_dict keys ("fc1.weight", ...),
        # so they are part of this class's contract.
        self.fc1 = nn.Linear(784, 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        """Return the logits for a batch ``x``; everything after dim 0 is flattened."""
        batch_size = x.size(0)
        flat = x.view(batch_size, -1)
        hidden1 = self.relu1(self.fc1(flat))
        hidden2 = self.relu2(self.fc2(hidden1))
        logits = self.fc3(hidden2)
        return logits

In [3]:
# MNIST pipeline: convert images to tensors, then normalise with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Training split (downloaded to ./data on first use), served in shuffled batches of 64.
train_dataset = torchvision.datasets.MNIST(
    root='./data', train=True, transform=transform, download=True
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Held-out test split, served in a fixed order.
test_dataset = torchvision.datasets.MNIST(
    root='./data', train=False, transform=transform, download=True
)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [4]:
# Step 4: Instantiate the network on the selected device and choose a loss function.
# No gradient-based optimizer is created here; the weights are searched with
# block differential evolution further down in the notebook.
model = NeuralNetwork().to(DEVICE)
criterion = nn.CrossEntropyLoss()

In [5]:
# Map every state_dict entry name to its tensor size, so a flat parameter
# vector can later be cut back into correctly shaped tensors.
params_sizes = {
    entry_name: entry_tensor.size()
    for entry_name, entry_tensor in model.state_dict().items()
}

In [6]:
def set_weights(model_state, all_parameters):
    """Overwrite the entries of ``model_state`` with values from a flat vector.

    The vector is consumed front to back in the iteration order of
    ``model_state``. Each entry takes as many values as it has elements and is
    reshaped to that entry's own shape. Shapes, dtype and device are read from
    the tensors already stored in ``model_state``, so the function does not
    depend on any notebook-level bookkeeping.

    Entries whose name contains ``'num_batches_tracked'`` are left untouched and
    consume no values.

    Args:
        model_state: Mapping of names to tensors (e.g. ``model.state_dict()``).
            It is modified in place.
        all_parameters: 1-D sequence (NumPy array, list, ...) holding the
            flattened values. Surplus trailing values are ignored.

    Returns:
        The same ``model_state`` object, with its entries replaced.

    Raises:
        ValueError: If ``all_parameters`` runs out of values before every entry
            has been filled.
    """
    offset = 0
    # Snapshot the items so that replacing values cannot disturb the iteration.
    for name, current in list(model_state.items()):
        if 'num_batches_tracked' in name:
            continue
        count = current.numel()
        chunk = all_parameters[offset:offset + count]
        if len(chunk) != count:
            raise ValueError(
                f"all_parameters is too short: entry '{name}' needs {count} values "
                f"starting at index {offset}, but only {len(chunk)} are available"
            )
        # torch.tensor copies the data, so the result never aliases the input
        # vector; dtype/device follow the tensor being replaced.
        model_state[name] = torch.tensor(
            chunk, dtype=current.dtype, device=current.device
        ).reshape(current.shape)
        offset += count
    return model_state

In [7]:
def f1score_in_optimization(model, loader):
    """Return the weighted F1-score of ``model`` over every batch in ``loader``.

    The forward passes run under ``torch.no_grad()``: the score is used by a
    gradient-free optimizer, so building autograd graphs would only cost time
    and memory. The function does not change the model's train/eval mode; the
    caller is responsible for that.

    Args:
        model: Network that returns one row of class scores per sample.
        loader: Iterable yielding ``(data, target)`` batches.

    Returns:
        The ``average='weighted'`` F1-score over all samples seen.
    """
    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for data, target in loader:
            output = model(data.to(DEVICE))
            # The predicted class is the index of the largest score in each row.
            _, predicted = torch.max(output, 1)

            true_labels.extend(target.tolist())
            predicted_labels.extend(predicted.tolist())

    return f1_score(true_labels, predicted_labels, average='weighted')

In [8]:
def fitness_func(parameters):
    """Objective for the optimizer: negated training F1-score of a weight vector.

    The flat ``parameters`` vector is loaded into the notebook-level ``model``,
    which is then scored on ``train_loader``. The score is negated before being
    returned, and it is also printed on every call.
    """
    candidate_state = set_weights(model.state_dict(), parameters)
    model.load_state_dict(candidate_state)
    model.to(DEVICE)
    model.eval()
    score = f1score_in_optimization(model, train_loader)
    fitness = -1 * score
    print(fitness)
    return fitness

In [9]:
def get_parameters(model):
    """Flatten the model's state_dict into a single 1-D NumPy array.

    Entries are concatenated in state_dict order; any entry whose name contains
    'num_batches_tracked' is skipped.
    """
    flat_chunks = [
        tensor.view(-1)
        for name, tensor in model.state_dict().items()
        if 'num_batches_tracked' not in name
    ]
    return torch.cat(flat_chunks).cpu().detach().numpy()

In [10]:
# Flattened weights of the notebook-level model as first created; passed later
# as the `x0` argument of the optimizer call.
params = get_parameters(model)

In [11]:
def population_initializer(Network, popsize):
    """Build an initial population from freshly constructed networks.

    Each individual is the flattened parameter vector (see ``get_parameters``)
    of a new ``Network()`` instance, so the individuals differ only through
    whatever initialisation ``Network`` performs on construction.

    Args:
        Network: Zero-argument callable returning a model (e.g. the class itself).
        popsize: Number of individuals requested. At least one individual is
            always produced, even for ``popsize < 1``.

    Returns:
        A 2-D NumPy array with one row per individual.
    """
    n_individuals = max(popsize, 1)
    # Collect the rows first and stack once: concatenating inside the loop
    # would re-copy the whole population on every iteration.
    individuals = [
        get_parameters(Network().to(DEVICE)) for _ in range(n_individuals)
    ]
    return np.stack(individuals, axis=0)

In [12]:
# Settings forwarded to block_differential_evolution in the optimisation cell.
block_size = 10
max_iterations = 1000000
popsize = 100

# One row per individual; each row is the flattened parameter vector of a new network.
initial_population = population_initializer(NeuralNetwork, popsize)
# Length of one flattened parameter vector (number of columns).
dimensions = initial_population.shape[1]
print(initial_population.shape)

(100, 109386)


In [13]:
# Inspect the shape of the per-dimension maximum over the population (one value per column).
initial_population.max(axis=0).shape

(109386,)

In [14]:
# Sanity check: evaluate the first individual of the initial population.
# fitness_func prints the value itself, so the score shows up twice in the output.
print(fitness_func(initial_population[0]))
    

-0.019342378413953216
-0.019342378413953216


In [15]:
# Per-dimension search bounds taken from the initial population:
# column 0 holds the minimum, column 1 the maximum of each parameter.
lower_bounds = initial_population.min(axis=0)
upper_bounds = initial_population.max(axis=0)
bounds = np.column_stack([lower_bounds, upper_bounds])

In [16]:
# Run block differential evolution over the flattened network weights.
# NOTE(review): the "FE" figure in the history file name multiplies by a hard-coded
# 100, which matches popsize only for the current setting — confirm the intent.
result = block_differential_evolution(
    fitness_func,
    bounds,
    maxiter=max_iterations,
    block_size=block_size,
    save_link=f'ann_bde_b{block_size}_FE{max_iterations*100}_mnist_training_history.npz',
    popsize=popsize,
    callback=None,
    polish=False,
    disp=True,
    x0=params,
    updating='deferred',
    init=initial_population,
)

-0.017278213501007465
-0.01894681818410932
-0.019744070524912602
-0.017278213501007465
-0.014976374449608611
-0.017278213501007465
-0.014973398450038977
-0.030410258608663428
-0.017744106484115327
-0.02409832286994464
-0.019744070524912602
-0.05785402641199138
-0.019744070524912602
-0.01793957973255708
-0.017278213501007465
-0.014406011011845904
-0.01771021085266613
-0.017887863348951462
-0.017278213501007465
-0.05734506568315532
-0.01894681818410932
-0.017329122311480972
-0.019744070524912602
-0.017738841274416113
-0.017278213501007465
-0.01492070807052137
-0.01761820418951558
-0.022704010389350916
-0.04909697336657917
-0.021579742344017563
-0.02205180341532373
-0.017738841274416113
-0.01894681818410932
-0.01894681818410932
-0.023097243162549675
-0.017278213501007465
-0.02101765696410012
-0.05953510843668279
-0.01894681818410932
-0.017528713392592248
-0.017887863348951462
-0.019519148161389797
-0.017940939760132214
-0.022701629159050274
-0.014974314016234311
-0.055742385607384724
-0.0

KeyboardInterrupt: 

In [None]:
# fitness_func returns the negated F1-score, so multiplying by -100 turns the
# best objective value back into a percentage.
print(f'Train best fitness F1-score: {result.fun * -100:.2f}%')

In [None]:
# Load the optimizer's best parameter vector (result.x) into the model.
model.load_state_dict(set_weights(model.state_dict(), result.x))

In [None]:
# Save the trained model's state_dict; the file name encodes the block size.
# (Only the weights are written here, no metrics.)
torch.save(model.state_dict(), f'ann_bde_b{block_size}_mnist_model.pth')

In [None]:
# Load the trained model
block_size=10
# map_location remaps the stored tensors onto the current device, so a checkpoint
# saved from a CUDA model can also be loaded on a CPU-only machine.
model.load_state_dict(
    torch.load(f'ann_bde_b{block_size}_mnist_model.pth', map_location=DEVICE)
)

In [None]:
# Step 6: Test the network
model.eval()
true_labels = []
predicted_labels = []

with torch.no_grad():
    for data, target in test_loader:
        # The model lives on DEVICE, so each batch must be moved there as well;
        # feeding CPU tensors to a CUDA model raises a device-mismatch error.
        output = model(data.to(DEVICE))
        _, predicted = torch.max(output, 1)
        true_labels.extend(target.tolist())
        predicted_labels.extend(predicted.tolist())

# Both metrics are computed once over the predictions of the whole test set.
test_accuracy = accuracy_score(true_labels, predicted_labels)
test_f1score = f1_score(true_labels, predicted_labels, average='weighted')

print(f'Test Accuracy: {test_accuracy*100:.2f}%, Test F1-score: {test_f1score*100:.2f}%')

In [None]:
# Plot the training loss and training F1-score side by side.
# NOTE(review): train_loss_history and train_f1score_history are not defined in any
# visible cell, so this cell fails on a fresh kernel. Presumably they should be
# loaded from the history file written through `save_link` — confirm and add that step.
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_loss_history, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Trained ANN by Block DE optimizer')
plt.subplot(1, 2, 2)
plt.plot(train_f1score_history, label='Training F1-score')
plt.xlabel('Epoch')
# This panel shows the F1-score, so the axis is labelled accordingly (it previously said accuracy).
plt.ylabel('F1-score')
plt.legend()

plt.show()