# MAIN NEAT CODE IS HERE, WHERE A WINNER IS IDENTIFIED

In [None]:
# Imports
import numpy as np
import pandas as pd

import sklearn
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
# Seed every random number generator used so far so reruns are repeatable.
# neat-python draws from the standard-library `random` module, so seeding
# NumPy alone leaves the evolutionary search unseeded.
import random

random.seed(42)
np.random.seed(42)

In [None]:
# Training dataset: `total_cases` is the target; it and the
# `reanalysis_precip_amt_kg_per_m2` column are removed from the features.
train_df = pd.read_csv('final_training.csv')
y = train_df['total_cases']
# DataFrame.drop returns a new frame, so train_df itself is left untouched.
X = train_df.drop(columns=['total_cases', 'reanalysis_precip_amt_kg_per_m2'])

In [None]:
# Out-of-distribution dataset (Taiwan): same target column, used only for
# evaluation further down.
taiwan = pd.read_csv('final_taiwan.csv')
yout = taiwan['total_cases']
# DataFrame.drop returns a new frame, so `taiwan` itself is left untouched.
Xout = taiwan.drop(columns=['total_cases'])

In [None]:
# Train / validation / test split.
# First hold out 20% of the rows as the test set ...
xTV, xTest, yTV, yTest = train_test_split(
    X, y, random_state=42, test_size=0.2
)

# ... then hold out 20% of the remaining rows as the validation set.
xTrain, xValid, yTrain, yValid = train_test_split(
    xTV, yTV, random_state=42, test_size=0.2
)

In [None]:
#Import NEAT
import neat
import os

In [None]:
# Importing the default settings (including weight, skip and sparse connection
# types of mutation). The config file is attached alongside this notebook.
config_path = 'config.ini'

# Fail fast if the configuration file is missing. Raising gives a traceback and
# keeps the session usable, whereas the interactive `exit` helper is meant for
# the REPL and would stop the whole session.
if not os.path.exists(config_path):
    raise FileNotFoundError(f"Configuration file {config_path} not found.")

# Load configuration
config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                     neat.DefaultSpeciesSet, neat.DefaultStagnation,
                     config_path)

# Ready up the algorithm
p = neat.Population(config)

# Report every generation on stdout, collect statistics, and write a
# checkpoint with a generation interval of 5.
p.add_reporter(neat.StdOutReporter(True))
stats = neat.StatisticsReporter()
p.add_reporter(stats)
p.add_reporter(neat.Checkpointer(5))

In [None]:
generation = []


# The default evaluation function
def eval_genomes_default(genomes, config):
    """Score every genome by its negative mean absolute error on xTV / yTV.

    Each genome is turned into a NEAT feed-forward network, run on every row
    of ``xTV``, and its first output (rounded to the nearest integer) is
    compared with the matching entry of ``yTV``.

    Args:
        genomes: Iterable of ``(genome_id, genome)`` pairs supplied by NEAT.
        config: NEAT configuration used to build each network.

    Returns:
        None. The result is written to ``genome.fitness`` (higher is better,
        so the MAE is negated).
    """
    # The data is the same for every genome, so extract it once.
    rows = xTV.values
    targets = yTV
    # Use the real sample count instead of a hard-coded row total so the
    # metric stays a true mean if the dataset or split size changes.
    n_samples = len(targets)

    for genome_id, genome in genomes:
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        absDiff = 0
        for xi, xo in zip(rows, targets):
            output = net.activate(xi)
            absDiff += abs(round(output[0]) - xo)
        # Negative MAE: NEAT maximises fitness.
        genome.fitness = -absDiff / n_samples

In [None]:
import warnings

# Silence all warnings for the duration of the long run.
warnings.filterwarnings(action='ignore')

# Run the evolutionary algorithm for at most 1000 generations and keep the
# genome that NEAT returns as the winner.
winner = p.run(eval_genomes_default, n=1000)


 ****** Running generation 0 ****** 

Population's average fitness: -273.21745 stdev: 375.97745
Best fitness: -19.84554 - size: (1, 7) - species 1 - id 1695
Average adjusted fitness: 0.898
Mean genetic distance 1.109, standard deviation 0.378
Population of 7500 members in 1 species:
   ID   age  size  fitness  adj fit  stag
     1    0  7500    -19.8    0.898     0
Total extinctions: 0
Generation time: 37.275 sec

 ****** Running generation 1 ****** 

Population's average fitness: -98.33215 stdev: 192.69137
Best fitness: -19.61518 - size: (1, 6) - species 1 - id 11116
Average adjusted fitness: 0.958
Mean genetic distance 1.411, standard deviation 0.697
Population of 7500 members in 2 species:
   ID   age  size  fitness  adj fit  stag
     1    1  7450    -19.6    0.958     0
     2    0    50       --       --     0
Total extinctions: 0
Generation time: 27.182 sec (32.229 average)

 ****** Running generation 2 ****** 

Population's average fitness: -89.51888 stdev: 183.52191
Best fitn

In [None]:
# Test set: build the winner's network and measure its MAE on held-out rows.
net = neat.nn.FeedForwardNetwork.create(winner, config)
# Sum of |rounded prediction - true value| over the test set.
absDiff = sum(
    abs(round(net.activate(row)[0]) - target)
    for row, target in zip(xTest.values, yTest)
)
print(f'Test set MAE: {(absDiff)/len(yTest):.2f}')

Test set MAE: 19.05


In [None]:
# Out-of-distribution test: score the same network (`net`) on the Taiwan data.
# Sum of |rounded prediction - true value| over the OoD set.
absDiff = sum(
    abs(round(net.activate(row)[0]) - target)
    for row, target in zip(Xout.values, yout)
)
print(f'OoD set MAE: {(absDiff)/len(yout):.2f}')

OoD set MAE: 87.49


In [None]:
# Test 2: only evolving the topology (hidden layers, hidden nodes)
config_path_Connected = 'config_FullyConnected.ini'

# Fail fast if the configuration file is missing. Raising gives a traceback and
# keeps the session usable, whereas the interactive `exit` helper is meant for
# the REPL and would stop the whole session.
if not os.path.exists(config_path_Connected):
    raise FileNotFoundError(f"Configuration file {config_path_Connected} not found.")

# Load configuration
config2 = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                      neat.DefaultSpeciesSet, neat.DefaultStagnation,
                      config_path_Connected)

# Separate population for this experiment, with the same reporters as before.
# NOTE: `stats` is rebound here, so it now refers to this run's statistics.
p2 = neat.Population(config2)
p2.add_reporter(neat.StdOutReporter(True))
stats = neat.StatisticsReporter()
p2.add_reporter(stats)
p2.add_reporter(neat.Checkpointer(5))

In [None]:
#Dynamic FFNN class that creates x number of hidden layers and y number of hidden unit per layer based on the evolution changes
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_absolute_error
from neat.graphs import feed_forward_layers

# Main neural network class
class FullyConnectedFFNN(nn.Module):
    """Fully connected feed-forward network whose layer widths mirror a NEAT genome.

    Only the topology is taken from the genome: each NEAT layer becomes an
    ``nn.Linear`` with as many units as that layer has nodes. The genome's
    connection weights are not used; the network starts from the default
    ``nn.Linear`` initialisation.
    """

    def __init__(self, inputs, outputs, layers):
        """Create the stack of linear layers.

        Args:
            inputs: Sequence of input node keys; only its length is used.
            outputs: Sequence of output node keys; only its length is used.
            layers: Sequence of node collections, one per layer, in
                feed-forward order. May be empty.
        """
        super().__init__()
        self.input_nodes = inputs
        self.output_nodes = outputs
        self.layers = layers
        self.NeuralNetwork = nn.ModuleList()

        n_in, n_out = len(inputs), len(outputs)

        if not layers or len(layers) == 1:
            # No hidden layer: connect the inputs directly to the outputs.
            widths = [n_in, n_out]
        else:
            # One linear layer per NEAT layer, sized by its node count.
            widths = [n_in] + [len(layer) for layer in layers]
            # If the last NEAT layer does not have exactly the output width,
            # add a final projection down (or up) to the output size.
            if widths[-1] != n_out:
                widths.append(n_out)

        for fan_in, fan_out in zip(widths, widths[1:]):
            self.NeuralNetwork.append(nn.Linear(fan_in, fan_out))

    def forward(self, x):
        """Run ``x`` through the network: ReLU after every layer but the last."""
        *hidden, head = self.NeuralNetwork
        for layer in hidden:
            x = torch.relu(layer(x))
        # The final layer is left linear.
        return head(x)

    @staticmethod
    def create(genome, config):
        """Build a network whose layer widths follow ``genome``'s enabled connections.

        Args:
            genome: NEAT genome; only its enabled connection keys are read.
            config: NEAT config providing ``genome_config.input_keys`` and
                ``genome_config.output_keys``.

        Returns:
            A freshly initialised ``FullyConnectedFFNN``.
        """
        genome_config = config.genome_config

        # Gather expressed (enabled) connections.
        connections = [cg.key for cg in genome.connections.values() if cg.enabled]

        # Group the nodes into feed-forward layers; only the layer sizes are
        # needed, so no per-node weights or activation functions are collected.
        layers = feed_forward_layers(genome_config.input_keys,
                                     genome_config.output_keys,
                                     connections)

        return FullyConnectedFFNN(genome_config.input_keys,
                                  genome_config.output_keys,
                                  layers)

# Fitness evaluation, after the weights are trained through back-propagation
def eval_genome_FC(genomes, config):
    """Train each genome's topology briefly and score it on the validation split.

    For every genome a ``FullyConnectedFFNN`` is built, trained for 30
    full-batch Adam steps on ``xTrain`` / ``yTrain`` with L1 loss, and then
    evaluated on ``xValid`` / ``yValid``.

    Args:
        genomes: Iterable of ``(genome_id, genome)`` pairs supplied by NEAT.
        config: NEAT configuration used to derive each network's layers.

    Returns:
        None. ``genome.fitness`` is set to the negative validation MAE
        (higher is better).
    """
    # The data is identical for every genome, so build the tensors once.
    X_train_tensor = torch.tensor(xTrain.to_numpy(dtype=float), dtype=torch.float32)
    # Convert via NumPy (a Series with a shuffled index is not a safe input for
    # torch.tensor) and reshape to a column so it matches the model output.
    y_train_tensor = torch.tensor(np.asarray(yTrain, dtype=float),
                                  dtype=torch.float32).reshape(-1, 1)
    X_valid_tensor = torch.tensor(xValid.to_numpy(dtype=float), dtype=torch.float32)
    y_valid = np.asarray(yValid, dtype=float)

    criterion = nn.L1Loss()  # Mean Absolute Error loss

    for genome_id, genome in genomes:
        model = FullyConnectedFFNN.create(genome, config)
        optimizer = optim.Adam(model.parameters())  # Adam optimizer

        # Train the model, 30 epochs to avoid overfitting the training set
        for epoch in range(30):
            optimizer.zero_grad()  # Zero the gradients
            # Keep prediction and target the same shape. An (N, 1) output
            # against an (N,) target would broadcast to an (N, N) loss matrix.
            output = model(X_train_tensor).reshape(-1, 1)  # Forward pass

            loss = criterion(output, y_train_tensor)  # Calculate loss

            loss.backward()  # Backward pass
            optimizer.step()  # Update weights

        # No gradients are needed for scoring.
        with torch.no_grad():
            y_pred = model(X_valid_tensor).numpy().reshape(-1)

        # Store the negative validation MAE as the fitness.
        genome.fitness = -mean_absolute_error(y_valid, y_pred)


In [None]:
import warnings

# Silence all warnings for the duration of the run.
warnings.filterwarnings(action='ignore')

# Run the topology-only evolution for at most 100 generations and keep the
# genome that NEAT returns as the winner.
winnerFC = p2.run(eval_genome_FC, n=100)


 ****** Running generation 0 ****** 

Population's average fitness: -73.68719 stdev: 73.13322
Best fitness: -17.78289 - size: (1, 7) - species 1 - id 100

Best individual in generation 0 meets fitness threshold - complexity: (1, 7)


In [None]:
# Rebuild the winning topology with the configuration it was evolved under
# (config2), then train it the same way the fitness function does.
model = FullyConnectedFFNN.create(winnerFC, config2)
criterion = nn.L1Loss()  # Mean Absolute Error loss
optimizer = optim.Adam(model.parameters())  # Adam optimizer

# Fit on the training split only: the test split has to stay unseen until it
# is scored, otherwise the reported test error is not a held-out estimate.
X_train_tensor = torch.tensor(xTrain.to_numpy(dtype=float), dtype=torch.float32)
# Column vector so the target lines up with the model output instead of
# broadcasting to an (n_samples, n_samples) loss matrix.
y_train_tensor = torch.tensor(np.array(yTrain), dtype=torch.float32).reshape(-1, 1)

# Train the model, 30 epochs to avoid overfitting the training set
for epoch in range(30):
    optimizer.zero_grad()  # Zero the gradients
    output = model(X_train_tensor).reshape(-1, 1)  # Forward pass

    loss = criterion(output, y_train_tensor)  # Calculate loss

    loss.backward()  # Backward pass
    optimizer.step()  # Update weights

# Score on the held-out test split (no gradients needed).
with torch.no_grad():
    y_pred = model(torch.tensor(xTest.to_numpy(dtype=float), dtype=torch.float32)).numpy()
mae = mean_absolute_error(np.array(yTest), y_pred)
# Displaying the test set performance
print(mae)

23.72659775699888


In [None]:
# Out-of-distribution test: score the already-trained `model` on the Taiwan data.
ood_features = torch.tensor(Xout.to_numpy(dtype=float), dtype=torch.float32)
y_pred = model(ood_features).detach().numpy()
mae = mean_absolute_error(np.array(yout), y_pred)
# Bare expression so the notebook displays the value.
mae

89.48397169526167