<a href="https://colab.research.google.com/github/praneshnikhar/NEAT/blob/main/NEAT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import random
import math
import copy
import pickle

In [3]:
class NodeGene:
    def __init__(self, node_id, layer, activation, bias):
        self.node_id = node_id
        self.layer = layer
        self.activation = activation
        self.bias = bias

    def copy(self):
        return NodeGene(self.node_id, self.layer, self.activation, self.bias)

class ConnectionGene:
    def __init__(self, in_node_id, out_node_id, weight, innovation_number, enabled=True):
        self.in_node_id = in_node_id
        self.out_node_id = out_node_id
        self.weight = weight
        self.innovation_number = innovation_number
        self.enabled = enabled

    def copy(self):
        return ConnectionGene(self.in_node_id, self.out_node_id, self.weight, self.innovation_number, self.enabled)

In [4]:
class Genome:
    def __init__(self, nodes, connections):
        self.nodes = {node.node_id: node.copy() for node in nodes}
        self.connections = [c.copy() for c in connections]
        self.fitness = 0

    def get_node(self, node_id):
        return self.nodes.get(node_id, None)

    def evaluate(self, input_values):


        node_values = {}
        input_nodes = [node for node in self.nodes.values() if node.layer == "input"]
        output_nodes = [node for node in self.nodes.values() if node.layer == "output"]

        for node, val in zip(input_nodes, input_values):
            node_values[node.node_id] = val

        sorted_nodes = [node for node in self.nodes.values() if node.layer != "input"]
        sorted_nodes.sort(key=lambda n: n.layer)

        for node in sorted_nodes:
            if node.node_id not in node_values:
                incoming_connections = [
                    conn for conn in self.connections
                    if conn.out_node_id == node.node_id and conn.enabled
                ]

                total_input = sum(
                    node_values.get(conn.in_node_id, 0) * conn.weight
                    for conn in incoming_connections
                ) + node.bias

                node_values[node.node_id] = node.activation(total_input)

        return [node_values.get(node.node_id, 0) for node in output_nodes]



In [5]:
class InnovationTracker:
    def __init__(self):
        self.current_innovation = 0
        self.connection_innovations = {}
        self.node_innovations = {}
        self.node_id_counter = 0

    def get_connection_innovation(self, in_node_id, out_node_id):
        key = (in_node_id, out_node_id)
        if key not in self.connection_innovations:
            self.connection_innovations[key] = self.current_innovation
            self.current_innovation += 1
        return self.connection_innovations[key]

    def get_node_innovation(self, old_connection_innovation):
        if old_connection_innovation not in self.node_innovations:
            node_id = self.node_id_counter
            self.node_id_counter += 1

            conn1_innov = self.current_innovation
            self.current_innovation += 1
            conn2_innov = self.current_innovation
            self.current_innovation += 1

            self.node_innovations[old_connection_innovation] = (node_id, conn1_innov, conn2_innov)
        return self.node_innovations[old_connection_innovation]

In [6]:
class Species:
    def __init__(self, representative):
        self.representative = representative
        self.members = [representative]
        self.adjusted_fitness = 0
        self.best_fitness = -float('inf')
        self.stagnant_generations = 0

    def add_member(self, genome):
        self.members.append(genome)

    def clear_members(self):
        self.members = []

    def update_fitness_stats(self):
        if not self.members:
            self.adjusted_fitness = 0
            return

        current_best_fitness = max(m.fitness for m in self.members)
        if current_best_fitness > self.best_fitness:
            self.best_fitness = current_best_fitness
            self.stagnant_generations = 0
        else:
            self.stagnant_generations += 1

        self.adjusted_fitness = sum(m.fitness for m in self.members) / len(self.members)
        self.representative = max(self.members, key=lambda g: g.fitness)


def compatibility_distance(genome1, genome2, c1=1.0, c2=1.0, c3=0.4):
    genes1 = {g.innovation_number: g for g in genome1.connections}
    genes2 = {g.innovation_number: g for g in genome2.connections}

    innovations1 = set(genes1.keys())
    innovations2 = set(genes2.keys())

    matching = innovations1 & innovations2
    disjoint = (innovations1 | innovations2) - matching

    excess = set()
    max_innov1 = max(innovations1) if innovations1 else 0
    max_innov2 = max(innovations2) if innovations2 else 0
    max_shared_innov = min(max_innov1, max_innov2)

    for innov in disjoint.copy():
        if innov > max_shared_innov:
            excess.add(innov)
            disjoint.remove(innov)

    avg_weight_diff = 0
    if matching:
        weight_diff = sum(abs(genes1[i].weight - genes2[i].weight) for i in matching)
        avg_weight_diff = weight_diff / len(matching)

    N = max(len(genome1.connections), len(genome2.connections))
    if N < 20: N = 1

    delta = (c1 * len(excess) / N) + (c2 * len(disjoint) / N) + (c3 * avg_weight_diff)
    return delta


class Speciator:
    def __init__(self, compatibility_threshold=3.0):
        self.species = []
        self.compatibility_threshold = compatibility_threshold

    def speciate(self, population):
        for s in self.species: s.clear_members()

        for genome in population:
            found_species = False
            for s in self.species:
                if compatibility_distance(genome, s.representative) < self.compatibility_threshold:
                    s.add_member(genome)
                    found_species = True
                    break
            if not found_species:
                self.species.append(Species(representative=genome))

        self.species = [s for s in self.species if s.members]
        for s in self.species: s.update_fitness_stats()

    def get_species(self):
        return self.species

In [7]:
def crossover(parent1, parent2):
    if parent2.fitness > parent1.fitness:
        parent1, parent2 = parent2, parent1

    offspring_connections = []
    parent1_genes = {g.innovation_number: g for g in parent1.connections}
    parent2_genes = {g.innovation_number: g for g in parent2.connections}
    all_innovs = set(parent1_genes.keys()) | set(parent2_genes.keys())

    for innov in sorted(all_innovs):
        gene1 = parent1_genes.get(innov)
        gene2 = parent2_genes.get(innov)

        if gene1 and gene2:
            chosen_gene = random.choice([gene1, gene2])
            offspring_connections.append(chosen_gene.copy())
        elif gene1:
            offspring_connections.append(gene1.copy())


    offspring_nodes = set()
    for conn in offspring_connections:
        offspring_nodes.add(parent1.get_node(conn.in_node_id))
        offspring_nodes.add(parent1.get_node(conn.out_node_id))


        if parent1.get_node(conn.in_node_id) is None:
            offspring_nodes.add(parent2.get_node(conn.in_node_id))
        if parent1.get_node(conn.out_node_id) is None:
            offspring_nodes.add(parent2.get_node(conn.out_node_id))

    input_nodes = [n for n in parent1.nodes.values() if n.layer == "input"]
    output_nodes = [n for n in parent1.nodes.values() if n.layer == "output"]
    offspring_nodes.update(input_nodes)
    offspring_nodes.update(output_nodes)

    return Genome(list(offspring_nodes), offspring_connections)

In [8]:
def _path_exists(self, start_node_id, end_node_id, checked_nodes=None):
    if checked_nodes is None: checked_nodes = set()
    if start_node_id == end_node_id: return True
    checked_nodes.add(start_node_id)

    for conn in self.connections:
        if conn.enabled and conn.in_node_id == start_node_id:
            if conn.out_node_id not in checked_nodes:
                if self._path_exists(conn.out_node_id, end_node_id, checked_nodes):
                    return True
    return False

def mutate_add_connection(self, innov):
    node_list = list(self.nodes.values())
    max_tries = 10
    for _ in range(max_tries):
        node1, node2 = random.sample(node_list, 2)

        if (node1.layer == "output" or (node1.layer == "hidden" and node2.layer == "input")):
            node1, node2 = node2, node1

        if node1 == node2 or node1.layer == "output" or node2.layer == "input": continue
        if self._path_exists(node2.node_id, node1.node_id): continue
        if any(c.in_node_id == node1.node_id and c.out_node_id == node2.node_id for c in self.connections): continue

        innov_num = innov.get_connection_innovation(node1.node_id, node2.node_id)
        new_conn = ConnectionGene(node1.node_id, node2.node_id, random.uniform(-1, 1), innov_num, True)
        self.connections.append(new_conn)
        return

def mutate_add_node(self, innov):
    enabled_conn = [c for c in self.connections if c.enabled]
    if not enabled_conn: return

    connection = random.choice(enabled_conn)
    connection.enabled = False

    node_id, conn1_innov, conn2_innov = innov.get_node_innovation(connection.innovation_number)

    in_node = self.get_node(connection.in_node_id)
    out_node = self.get_node(connection.out_node_id)

    new_node = NodeGene(node_id, "hidden", in_node.activation, random.uniform(-1,1))
    conn1 = ConnectionGene(connection.in_node_id, node_id, 1, conn1_innov, True)
    conn2 = ConnectionGene(node_id, connection.out_node_id, connection.weight, conn2_innov, True)

    self.nodes[node_id] = new_node
    self.connections.extend([conn1, conn2])

def mutate_weights(self, rate=0.8, power=0.5):
    for conn in self.connections:
        if random.random() < rate:
            if random.random() < 0.1: conn.weight = random.uniform(-1, 1)
            else: conn.weight += random.gauss(0, power)

def mutate_bias(self, rate=0.7, power=0.5):
    for node in self.nodes.values():
        if node.layer != "input" and random.random() < rate:
            if random.random() < 0.1: node.bias = random.uniform(-1, 1)
            else: node.bias += random.gauss(0, power)

def mutate(self, innov, conn_mutation_rate=0.05, node_mutation_rate=0.03):
    self.mutate_weights()
    self.mutate_bias()
    if random.random() < conn_mutation_rate: self.mutate_add_connection(innov)
    if random.random() < node_mutation_rate: self.mutate_add_node(innov)

Genome.mutate_add_connection = mutate_add_connection
Genome.mutate_add_node = mutate_add_node
Genome.mutate_weights = mutate_weights
Genome.mutate_bias = mutate_bias
Genome.mutate = mutate
Genome._path_exists = _path_exists

In [9]:
def sigmoid(x):
    return 1 / (1 + math.exp(-x))

def create_initial_genome(num_inputs, num_outputs, innov):
    input_nodes = [NodeGene(i, "input", lambda x: x, 0) for i in range(num_inputs)]
    output_nodes = [NodeGene(num_inputs + i, "output", sigmoid, random.uniform(-1, 1)) for i in range(num_outputs)]

    innov.node_id_counter = num_inputs + num_outputs

    connections = []
    for i in range(num_inputs):
        for j in range(num_outputs):
            innov_num = innov.get_connection_innovation(i, num_inputs + j)
            weight = random.uniform(-1, 1)
            connections.append(ConnectionGene(i, num_inputs + j, weight, innov_num, True))

    return Genome(input_nodes + output_nodes, connections)

def create_initial_population(size, num_inputs, num_outputs, innov):
    return [create_initial_genome(num_inputs, num_outputs, innov) for _ in range(size)]

def reproduce_species(species, offspring_count, innov):
    offspring = []

    if species.members and offspring_count > 0:
        best_genome = max(species.members, key=lambda g: g.fitness)
        offspring.append(best_genome.copy())
        offspring_count -= 1

    for _ in range(offspring_count):
        parent1 = random.choice(species.members)
        parent2 = random.choice(species.members)

        child = crossover(parent1, parent2)
        child.mutate(innov)
        offspring.append(child)

    return offspring

In [10]:
def evolution(population, fitness_scores, speciator, innov, stagnation_limit=15):
    for genome, fitness in zip(population, fitness_scores):
        genome.fitness = fitness

    # Speciate population
    speciator.speciate(population)
    species_list = speciator.get_species()
    species_list.sort(key=lambda s: s.best_fitness, reverse=True)

    # Remove stagnant species (and keep the best one)
    surviving_species = []
    if species_list:
        surviving_species.append(species_list[0])
        for s in species_list[1:]:
            if s.stagnant_generations < stagnation_limit:
                surviving_species.append(s)

    species_list = surviving_species
    total_adjusted_fitness = sum(s.adjusted_fitness for s in species_list)

    new_population = []

    #Reproduction and Elitism
    if species_list:
        remaining_offspring = len(population)
        for s in species_list:
            if s.members and s.best_fitness > -float('inf'):
                best_genome = max(s.members, key=lambda g: g.fitness)
                new_population.append(best_genome.copy())
                remaining_offspring -= 1

        #Allocate the remaining offspring
        for s in species_list:
            if total_adjusted_fitness > 0:
                offspring_count = int((s.adjusted_fitness / total_adjusted_fitness) * remaining_offspring)
            else:
                offspring_count = remaining_offspring // len(species_list)

            new_population.extend(reproduce_species(s, offspring_count, innov))

    # Fill up the rest with the best species if needed
    while len(new_population) < len(population):
        best_species = max(species_list, key=lambda s: s.adjusted_fitness) if species_list else None
        if best_species:
            new_population.extend(reproduce_species(best_species, 1, innov))
        else: # Handle edge case of no species
            new_population.append(create_initial_genome(len(population[0].nodes)-1, len(population[0].nodes)-1, innov))

    return new_population[:len(population)]

In [11]:
def fitness_xor(genome):
    X = [[0, 0], [0, 1], [1, 0], [1, 1]]
    y = [0, 1, 1, 0]
    total_error = 0
    for i in range(len(X)):
        try:
            output = genome.evaluate(X[i])
            if output:
                error = abs(output[0] - y[i])
                total_error += error
            else:
                error = y[i]
                total_error += error
        except Exception:
            return 0  # Return 0 fitness on error
    fitness = 4 - total_error
    return max(0, fitness)

def run_neat_xor(generations=50, pop_size=50, target_fitness=3.9, speciator_threshold=3.0):
    NUM_INPUTS = 2
    NUM_OUTPUTS = 1

    # Initialize components
    innov = InnovationTracker()
    speciator = Speciator(speciator_threshold)
    population = create_initial_population(pop_size, NUM_INPUTS, NUM_OUTPUTS, innov)

    # Stats tracking
    best_fitness_history = []

    # Main loop
    for gen in range(generations):
        fitness_scores = [fitness_xor(g) for g in population]
        best_fitness = max(fitness_scores)
        best_genome = population[fitness_scores.index(best_fitness)]
        best_fitness_history.append(best_fitness)

        print(f"Generation {gen}: Best Fitness = {best_fitness:.4f}, Pop Size = {len(population)}, Species = {len(speciator.get_species())}")

        if best_fitness >= target_fitness:
            print(f"Problem solved in {gen} generations with fitness {best_fitness:.4f}")
            with open("best_xor_genome.pkl", "wb") as f:
                pickle.dump(best_genome, f)
            return best_genome, best_fitness_history

        population = evolution(population, fitness_scores, speciator, innov)

    print(f"Did not solve XOR in {generations} generations. Best fitness: {max(best_fitness_history):.4f}")
    return None, best_fitness_history



In [15]:
best_genome, fitness_history = run_neat_xor()

Generation 0: Best Fitness = 2.0623, Pop Size = 50, Species = 0


AttributeError: 'Genome' object has no attribute 'copy'