# N-Queens Problem

Here is a solution I put together for the N-Queens problem.  Like Travis's solution [https://github.com/aiml-satx/n-queen-evolutionary-algo/blob/master/tvaught/n-queens-exploration.ipynb] I use an evolutionary algorithm(EA) but I also wanted to see how an EA would handle removing the unique row and column constraint (A.K.A the N-rooks problem) so I allow queens to occupy any row/column regardless of whether there is already a queen in the row/column.  

In [1]:
import random
from itertools import product
from numpy.random import choice
import math
import datetime
import os
import shutil
import matplotlib.pyplot as plt
import collections

Since queens can occupy any sqaure on the board, I represent a creature (layout of N queens on the board) as an array of N tuples representing the x,y coordinates of each queen.

In [2]:
# Size of the board (NxN) and the number of queens to be placed
N = 8

# Maximum number of generations to run before exiting if a valid solution is not found
MAX_GENERATIONS = 10000

# Number of creatures that exist in each generation
POPULATION_SIZE = 100

# A way to modify how many mutations occurr in every generation.  
# Mutations in this solution are a nudge of either an x or y component of any queen by 1 or -1 (should make sense further down in the code)
MUTATIONS_PER_GENERATION = 5

# Probability that any x or y coordinate for any queen in a generation will be adjusted +1 or -1
# To calculate this we take the desired number of mutations divided by the total number of coordinates that exist
# in any generation (population * N * 2) where the 2 represents an x and y coordinate.  We then divide by 2 again
# to account for the two sided distribution representing +1 and -1.  
# Again hopefully this will make more sense further down in the code.
MUTATION_RATE = MUTATIONS_PER_GENERATION / (POPULATION_SIZE * N * 2) / 2

print(f"Run with N={N}, Generations={MAX_GENERATIONS}, Pop size={POPULATION_SIZE}, Mutations/gen={MUTATIONS_PER_GENERATION} (mutate rate={MUTATION_RATE}")

Run with N=8, Generations=10000, Pop size=100, Mutations/gen=5 (mutate rate=0.0015625


## Diagnostic Utilities

The following 3 classes are utilities to log some information about how the algorithm is progressing.

In [3]:
class Passthrough:
    '''
    A "filter" that simply returns back the current value (no filtering)
    '''
    def __init__(self):
        self.value = None

    def add_value(self, value):
        self.value = value

    def get_value(self):
        return self.value

    def apply_filter(self, value):
        self.add_value(value)
        return self.get_value()


class ExponentialAvgFilter:
    '''
    Performs an exponential moving average calculation on the values passed in
    '''
    def __init__(self, alpha):
        self.alpha = alpha
        self.value = None

    def add_value(self, value):
        if self.value is None:
            # If this is the first iteration of the algorithm simply seed the value with the value to avoid warm up time
            self.value = value
        else:
            self.value = self.value * (1-self.alpha) + value * self.alpha

    def get_value(self):
        return self.value

    def apply_filter(self, value):
        self.add_value(value)
        return self.get_value()


class ResultWriter:
    '''
    Where the real (diagnostic) magic happens.  Constructor will make a copy of the current file being run so we have 
    the ability to experiment with code and parameter changes and can still go back to see the source for any given 
    run.  It also periodically checkpoints the current data in case we need to kill the run we'll still have some data.
    '''
    def __init__(self, checkpoint_rate=250):
        self.run_id = f"{datetime.datetime.now():%y%m%d_%H%M%S}"
        self.generation = 0
        self.elements = []
        self.checkpoint_rate = checkpoint_rate
        self.filters = [Passthrough(), Passthrough(), Passthrough(), Passthrough(),
                        ExponentialAvgFilter(.1), ExponentialAvgFilter(.1), ExponentialAvgFilter(.1), ExponentialAvgFilter(.1)]
        self.winner_found = False
        self._initialize()

    def _initialize(self):
        os.makedirs(self._get_dir(), exist_ok=True)
        shutil.copy2("queens_ea.ipynb", self._get_dir())

    def apply_filters(self, datas):
        return [filter.apply_filter(data) for data, filter in zip(datas, self.filters)]

    def add_entry(self, datas):
        # NOTE this is pretty ugly as it requires values to be provided in a predetermined order but it was late
        # and I ran out of time to clean this up further.
        self.elements.append([self.generation] + self.apply_filters(datas + datas))
        self.generation += 1
        if self.generation % self.checkpoint_rate == 0:
            self.flush()

    def log_winner(self, creature):
        with open(f"{self._get_dir()}winner_{self.generation}.txt", "w") as f:
            f.write(str(creature))
        self.winner_found = True

    def _get_dir(self):
        return f"results/{self.run_id}/"

    def flush(self):
        with open(f"{self._get_dir()}data.csv", "w") as f:
            for e in self.elements:
                f.write(f"{','.join(str(el) for el in e)}\n")

        x = [x[0] for x in self.elements]
        y_fit_raw = [x[1] for x in self.elements]
        y_div_raw = [x[2] for x in self.elements]
        y_fit_min_raw = [x[3] for x in self.elements]
        y_fit_max_raw = [x[4] for x in self.elements]
        y_fit_ema = [x[5] for x in self.elements]
        y_div_ema = [x[6] for x in self.elements]
        y_fit_min_ema = [x[7] for x in self.elements]
        y_fit_max_ema = [x[8] for x in self.elements]
        fig, ax1 = plt.subplots()
        ax1.set_xlabel("Generations")
        ax1.set_ylabel("Average fitness", color="tab:blue")
        ax2 = ax1.twinx()
        ax2.set_ylabel("Diversity", color="tab:red")

        ax1.plot(x, y_fit_raw, "c+")
        ax1.plot(x, y_fit_min_raw, "c.")
        ax1.plot(x, y_fit_max_raw, "c.")

        ax2.plot(x, y_div_raw, "r+")

        ax1.plot(x, y_fit_ema, "b")
        ax1.plot(x, y_fit_min_ema, "b")
        ax1.plot(x, y_fit_max_ema, "b")

        ax2.plot(x, y_div_ema, "m")
        fig.set_size_inches(12, 8)
        fig.savefig(f"{self._get_dir()}plot.png")
        fig.clf()

## Fitness Function
Here we have the fitness function I used.  It is very similar to the solution Travis and Mark came up with for the diagonals but also includes fitness of rows and columns.  Also, I initially tried using a sum of the 4 fitness components (rows, columns, positive diagonals, and negative diagonals) but ended up finding that a normalized product of each seemed to work better but this could probably use some further investigation to see if this truly results in better performance or not.

In [4]:
def measure_fitness(creature, rw):
    unique_rows = len(set([x[0] for x in creature]))
    unique_cols = len(set([x[1] for x in creature]))
    unique_diag1 = len(set([x[0] + x[1] for x in creature]))
    unique_diag2 = len(set([x[0] - x[1] for x in creature]))
    if N*4 == unique_rows + unique_cols + unique_diag1 + unique_diag2:
        print(f"Found a winner: {creature}")
        rw.log_winner(creature)
    # return math.pow(unique_rows + unique_cols + unique_diag1 + unique_diag2, 2)
    return (unique_rows/N) * (unique_cols/N) * (unique_diag1/N) * (unique_diag2/N)


## Generate Children

`next_gen_creature` chooses two parents randomly using fitness to weight

`procreate` then takes these 2 parents and creates a child with mutations.  It combines the queen locations for each parent and then selects N child queen locations as a weighted random selection where the weights are based on how frequently the queen location appears in each parent (a location that appears in both parents will have a higher liklihood of being selected for the child).  Finally it applies any mutations and returns the child.

In [5]:
def procreate(p1, p2):
    gene_counts = collections.Counter(p1 + p2)
    genes = list(gene_counts.keys())
    gene_probs = [p/(N*2) for p in gene_counts.values()]
    idxs = choice(range(len(genes)), size=N, replace=False, p=gene_probs)
    child_genes = [genes[i] for i in idxs]

    # Mutate child genes
    # Generate an array of N tuples representing x,y mutations to apply.  This is where the 2-sided distribution
    # comes into play because we select one of -1, 0, or +1 for the mutation.
    mutations = [choice([-1, 0, 1], size=2, replace=True, p=[MUTATION_RATE, 1-2*MUTATION_RATE, MUTATION_RATE]) for _ in range(N)]
    
    # We need a way to avoid applying a mutation that will create an unviable child (coordinates are off the board)
    clamp = lambda x: min(max(x, 0), N-1)
    mutated_child = [(clamp(g[0] + m[0]), clamp(g[1] + m[1])) for g, m in zip(child_genes, mutations)]
    if len(set(mutated_child)) != N:
        # Occasionally a mutation will result in 2 queens in the same square, if this happens simply return the
        # unmutated child
        return child_genes
    else:
        return mutated_child


def next_gen_creature(parents, fitness_probabilities):
    chosen_parents = choice(range(POPULATION_SIZE), size=2, replace=True, p = fitness_probabilities)
    p1 = parents[chosen_parents[0]]
    p2 = parents[chosen_parents[1]]
    return procreate(p1, p2)


In [6]:
def calculate_diversity(population):
    '''
    A diagnostic measure to calculate how much diversity exists in the population by computing the number of
    unique queen locations present in the entire population
    '''
    c = []
    for p in population:
        c += p

    return len(set(c))

## Run the EA

In [16]:
# Fill our first generation with random queen locations for the specified number of creatures
population = [random.sample(list(product(range(N), repeat=2)), N) for i in range(POPULATION_SIZE)]

rw = ResultWriter()
for generation in range(MAX_GENERATIONS):
    # assess fitness of each generation
    fitness = [measure_fitness(c, rw) for c in population]

    # Compute and log some diagnostic information about the current generation
    fit_min = 0
    fit_sum = sum(f - fit_min for f in fitness)
    fitness_probabilities = [(f-fit_min)/fit_sum for f in fitness]
    rw.add_entry([fit_sum/POPULATION_SIZE, calculate_diversity(population), min(fitness), max(fitness)])

    if rw.winner_found:
        print(f"Winner found in generation {generation}")
        break

    # generate next generation    
    new_population = [next_gen_creature(population, fitness_probabilities) for i in range(POPULATION_SIZE)]
    population = new_population

# Flush any remaining results before exiting
rw.flush()

Found a winner: [(0, 7), (6, 6), (2, 0), (4, 5), (5, 1), (3, 2), (1, 3), (7, 4)]
Winner found in generation 648


<Figure size 864x576 with 0 Axes>

<Figure size 864x576 with 0 Axes>

<Figure size 864x576 with 0 Axes>

## Results

Some data on number of generations required to solve for N=8:

[194, 1945, 718, 784, 353, 181, 2224, 4126, 678, 648]

For an average of **1185.1** generations and at 100 creatures per generations comes out to **118,510** evalutations.