In [1]:
import random

In [2]:
class Maze:
    """Grid maze stored as a list of rows of integer cell codes.

    Cell codes interpreted by this class: 1 is a wall, 2 marks the start
    position and 3 marks a route cell that earns a point in ``score_route``.
    Positions are ``(x, y)`` pairs where ``x`` is the column index and ``y``
    the row index, so a cell is read as ``maze[y][x]``.
    """

    def __init__(self, maze):
        self.maze = maze
        # (-1, -1) is the "not located yet" sentinel for the cached start cell.
        self.start_position = (-1, -1)

    def get_start_position(self):
        """Return the ``(x, y)`` of the first cell holding 2.

        Rows are scanned top to bottom and columns left to right. The result
        is cached on the instance once found. When the maze has no start
        cell, ``(0, 0)`` is returned and nothing is cached.
        """
        # Both coordinates must be set for the cache to count as valid.
        if -1 not in self.start_position:
            return self.start_position

        for row_index, row in enumerate(self.maze):
            for col_index, cell in enumerate(row):
                # 2 is the type for start position
                if cell == 2:
                    self.start_position = (col_index, row_index)
                    return self.start_position

        # Default return value
        return (0, 0)

    def get_position_value(self, x, y):
        """Return the cell code at ``(x, y)``; anything off the grid reads as a wall (1)."""
        # Check the row first so an empty maze reads as "all wall" instead of
        # failing on maze[0].
        if y < 0 or y >= len(self.maze):
            return 1
        row = self.maze[y]
        if x < 0 or x >= len(row):
            return 1
        return row[x]

    def is_wall(self, x, y):
        """Return True when ``(x, y)`` is a wall cell or lies outside the grid."""
        return self.get_position_value(x, y) == 1

    def get_max_x(self):
        """Return the largest valid column index (based on the first row)."""
        return len(self.maze[0]) - 1

    def get_max_y(self):
        """Return the largest valid row index."""
        return len(self.maze) - 1

    def score_route(self, route):
        """Count the distinct route cells (code 3) visited by ``route``.

        ``route`` is an iterable of ``(x, y)`` positions. Each rewarded cell
        scores once no matter how often it is revisited. Positions outside
        the grid score nothing.
        """
        score = 0
        rewarded = set()

        for step in route:
            position = (step[0], step[1])
            # Going through get_position_value keeps out-of-range steps from
            # wrapping around (negative index) or raising.
            if position not in rewarded and self.get_position_value(*position) == 3:
                score += 1
                rewarded.add(position)

        return score

In [3]:
class Direction:
    """Compass headings as consecutive integers: north, east, south, west."""
    NORTH = 0
    EAST = 1
    SOUTH = 2
    WEST = 3

class Robot:
    """Maze-walking robot controlled by a sensor-value -> action lookup table.

    The controller is a flat sequence of bits. Every consecutive pair of bits
    encodes one action (0 = stop, 1 = move forward, 2 = turn clockwise,
    3 = turn anti-clockwise) and the pair's index is the sensor value that
    triggers it. The sensor value packs six wall detectors into one integer
    (see ``get_sensor_value``).

    Args:
        sensor_actions: indexable sequence of bits; an element counts as set
            only when it equals the integer 1.
        maze: object providing ``get_start_position``, ``get_position_value``,
            ``is_wall``, ``get_max_x`` and ``get_max_y``.
        max_moves: number of actions the robot may execute in ``run``.
    """

    def __init__(self, sensor_actions, maze, max_moves):
        self.sensor_actions = self.calc_sensor_actions(sensor_actions)
        self.maze = maze
        start_pos = self.maze.get_start_position()
        self.x_position = start_pos[0]
        self.y_position = start_pos[1]
        # -1 means "not computed for the current pose"; reset after each action.
        self.sensor_val = -1
        self.heading = Direction.EAST
        self.max_moves = max_moves
        self.moves = 0
        self.route = [start_pos]

    # Runs the robot's actions based on sensor inputs
    def run(self):
        """Execute actions until the robot stops, reaches the goal or runs out of moves."""
        while True:
            self.moves += 1

            # Break if the robot stops moving
            if self.get_next_action() == 0:
                return

            # Break if we reach the goal (cell code 4)
            if self.maze.get_position_value(self.x_position, self.y_position) == 4:
                return

            # Break if we reach a maximum number of moves
            if self.moves > self.max_moves:
                return

            # Run action
            self.make_next_action()

    # Map robot's sensor data to actions from binary string
    def calc_sensor_actions(self, sensor_actions_str):
        """Decode a bit sequence into a list of action codes (0-3).

        Bits are consumed in pairs, high bit first; a trailing unpaired bit
        is ignored.
        """
        # Floor division keeps the count an int on Python 3 as well.
        num_actions = len(sensor_actions_str) // 2
        sensor_actions = []

        for sensor_value in range(num_actions):
            high_bit = sensor_actions_str[sensor_value * 2]
            low_bit = sensor_actions_str[sensor_value * 2 + 1]

            sensor_action = 0
            if high_bit == 1:
                sensor_action += 2
            if low_bit == 1:
                sensor_action += 1

            # Position in the list is the sensor value that selects the action.
            sensor_actions.append(sensor_action)

        return sensor_actions

    # Runs the next action
    def make_next_action(self):
        """Apply the action selected by the current sensor reading."""
        action = self.get_next_action()

        if action == 1:
            self._move_forward()
        elif action == 2:
            self.heading = self._turned(clockwise=True)
        elif action == 3:
            self.heading = self._turned(clockwise=False)

        # The pose may have changed, so the cached reading is stale.
        self.sensor_val = -1

    def _move_forward(self):
        """Step one cell along the heading, staying inside the grid and out of walls."""
        previous = (self.x_position, self.y_position)

        # Move depending on current direction, clamped to the grid edge.
        if self.heading == Direction.NORTH:
            self.y_position = max(self.y_position - 1, 0)
        elif self.heading == Direction.EAST:
            self.x_position = min(self.x_position + 1, self.maze.get_max_x())
        elif self.heading == Direction.SOUTH:
            self.y_position = min(self.y_position + 1, self.maze.get_max_y())
        elif self.heading == Direction.WEST:
            self.x_position = max(self.x_position - 1, 0)

        if self.maze.is_wall(self.x_position, self.y_position):
            # We can't move here
            self.x_position, self.y_position = previous
        elif (self.x_position, self.y_position) != previous:
            # Only real position changes are recorded on the route.
            self.route.append((self.x_position, self.y_position))

    def _turned(self, clockwise):
        """Return the heading reached by a quarter turn from the current one."""
        compass = [Direction.NORTH, Direction.EAST, Direction.SOUTH, Direction.WEST]
        if self.heading not in compass:
            return self.heading
        step = 1 if clockwise else -1
        return compass[(compass.index(self.heading) + step) % len(compass)]

    def get_next_action(self):
        """Return the action code mapped to the current sensor value."""
        return self.sensor_actions[self.get_sensor_value()]

    # Get sensor value
    def get_sensor_value(self):
        """Return the six wall detectors packed into one integer (0-63).

        Bit weights: front = 1, front-left = 2, front-right = 4, left = 8,
        right = 16, back = 32. The value is cached until the next action.
        """
        # If sensor value has already been calculated
        if self.sensor_val > -1:
            return self.sensor_val

        # Unit vectors for "forward" and "left" relative to the heading, in
        # grid coordinates (north decreases y). Any heading other than north,
        # east or south is treated as west.
        if self.heading == Direction.NORTH:
            forward, left = (0, -1), (-1, 0)
        elif self.heading == Direction.EAST:
            forward, left = (1, 0), (0, -1)
        elif self.heading == Direction.SOUTH:
            forward, left = (0, 1), (1, 0)
        else:
            forward, left = (-1, 0), (0, 1)

        x, y = self.x_position, self.y_position
        fx, fy = forward
        lx, ly = left

        probes = (
            (1, x + fx, y + fy),                # front
            (2, x + fx + lx, y + fy + ly),      # front-left
            (4, x + fx - lx, y + fy - ly),      # front-right
            (8, x + lx, y + ly),                # left
            (16, x - lx, y - ly),               # right
            (32, x - fx, y - fy),               # back
        )

        sensor_val = 0
        for weight, probe_x, probe_y in probes:
            if self.maze.is_wall(probe_x, probe_y):
                sensor_val += weight

        self.sensor_val = sensor_val

        return sensor_val

    # Returns route in printable format
    def print_route(self):
        """Return the visited positions as a string such as ``{0,0}{1,0}``."""
        return ''.join('{' + str(step[0]) + ',' + str(step[1]) + '}' for step in self.route)

In [4]:
class Individual:
    """One candidate solution: a random bit chromosome plus its fitness.

    Args:
        chromosome_length: number of genes (each 0 or 1) to generate.

    ``fitness`` stays ``None`` until something assigns it.
    """

    def __init__(self, chromosome_length):
        self.chromosome = None
        self.set_chromosome(chromosome_length)
        self.fitness = None

    # Initialize with random chromosome
    def set_chromosome(self, chromosome_length):
        """Replace the chromosome with ``chromosome_length`` random bits."""
        # One uniform draw per gene; a draw below 0.5 yields 0, otherwise 1.
        # ``range`` (instead of ``xrange``) keeps this runnable on Python 3.
        self.chromosome = [
            0 if random.uniform(0, 1) < 0.5 else 1
            for _ in range(chromosome_length)
        ]

In [5]:
class Population:
    """Fixed-size collection of individuals.

    Args:
        population_size: number of slots in the population.
        chromosome_length: when given, every slot is filled with a fresh
            ``Individual`` of that length; when omitted the slots hold
            ``None`` so that crossover or mutation can fill them in.
    """

    def __init__(self, population_size, chromosome_length=None):
        # Set None list for crossover or mutation to set individual
        self.individuals = [None] * population_size
        if chromosome_length is not None:
            self.set_individuals(population_size, chromosome_length)
        self.population_fitness = None

    # Initialize individuals in population
    def set_individuals(self, population_size, chromosome_length):
        """Replace the contents with ``population_size`` new random individuals."""
        self.individuals = [Individual(chromosome_length) for _ in range(population_size)]

    # The offset of the individual you want, sorted by fitness
    # 0 is the strongest, len(population)-1 is the weakest
    def get_fittest(self, offset):
        """Return the individual at rank ``offset`` (0 = highest fitness).

        Sorts ``self.individuals`` in place, best first. Individuals whose
        fitness is still ``None`` rank below every evaluated one.
        """
        # The leading flag orders unevaluated individuals last without ever
        # comparing None with a number, which Python 3 rejects.
        self.individuals.sort(
            key=lambda individual: (individual.fitness is not None, individual.fitness),
            reverse=True,
        )
        return self.individuals[offset]

    def shuffle(self):
        """Randomly reorder the individuals in place."""
        random.shuffle(self.individuals)

In [6]:
class GeneticAlgorithm:
    """Genetic algorithm with tournament selection, single-point crossover,
    bit-flip mutation and elitism.

    Args:
        population_size: number of individuals created by ``init_population``.
        mutation_rate: per-gene probability of flipping a bit.
        crossover_rate: per-individual probability of being replaced by an
            offspring.
        elitism_count: number of top-ranked individuals that are exempt from
            both crossover and mutation.
        tournament_size: number of randomly picked contenders per parent
            selection.
    """

    # Number of moves each robot gets while its chromosome is being scored.
    MAX_MOVES = 100

    def __init__(self, population_size, mutation_rate, crossover_rate, elitism_count, tournament_size):
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.elitism_count = elitism_count
        self.tournament_size = tournament_size

    def init_population(self, chromosome_length):
        """Create a random population of ``population_size`` individuals."""
        return Population(self.population_size, chromosome_length)

    def _rank_key(self, individual):
        """Sort key that ranks unevaluated (``None``) fitness below any number."""
        fitness = individual.fitness
        return (fitness is not None, fitness)

    def _ranked(self, population):
        """Return the individuals as a new list ordered best-first."""
        return sorted(population.individuals, key=self._rank_key, reverse=True)

    # Calculate fitness for an individual
    def calc_fitness(self, individual, maze):
        """Run a robot with the individual's chromosome and store/return its route score."""
        robot = Robot(individual.chromosome, maze, self.MAX_MOVES)
        robot.run()
        fitness = maze.score_route(robot.route)

        # Store fitness
        individual.fitness = fitness

        return fitness

    # Evaluate the whole population
    def eval_population(self, population, maze):
        """Score every individual and store the summed fitness on the population."""
        population.population_fitness = sum(
            self.calc_fitness(individual, maze) for individual in population.individuals
        )

    # Check if population has met termination condition
    def termination_condition_met(self, generations_count, max_generations):
        """Return True once ``generations_count`` exceeds ``max_generations``."""
        return generations_count > max_generations

    # Selects parent for crossover using tournament selection
    def select_parent(self, population):
        """Shuffle the population in place and return the fittest of the
        first ``tournament_size`` individuals."""
        population.shuffle()
        contenders = population.individuals[:self.tournament_size]
        # max() returns the first of several equally fit contenders, the same
        # pick a stable best-first sort would make.
        return max(contenders, key=self._rank_key)

    # Crossover population using single point crossover
    def crossover_population(self, population):
        """Return a new population built by single-point crossover.

        Individuals are processed best-first. The top ``elitism_count`` and
        those that fail the ``crossover_rate`` draw are carried over as the
        same objects; the others are replaced by an offspring of themselves
        and a tournament-selected second parent.
        """
        # Rank once up front. select_parent() shuffles the population inside
        # the loop, so re-sorting per iteration would reorder equally fit
        # individuals and could hand the same one out twice.
        ranked = self._ranked(population)
        new_population = Population(len(ranked))

        for population_index, parent1 in enumerate(ranked):
            # Apply crossover to this individual?
            if self.crossover_rate > random.uniform(0, 1) and population_index >= self.elitism_count:
                chromosome_length = len(parent1.chromosome)

                # Initialize offspring
                offspring = Individual(chromosome_length)

                # Find second parent
                parent2 = self.select_parent(population)

                # Get random swap point
                swap_point = int(random.uniform(0, 1) * (chromosome_length + 1))

                # Genes before the swap point come from parent1, the rest from parent2.
                for gene_index in range(chromosome_length):
                    if gene_index < swap_point:
                        offspring.chromosome[gene_index] = parent1.chromosome[gene_index]
                    else:
                        offspring.chromosome[gene_index] = parent2.chromosome[gene_index]

                # Add offspring to new population
                new_population.individuals[population_index] = offspring
            else:
                # Add individual to new population without applying crossover
                new_population.individuals[population_index] = parent1

        return new_population

    def mutate_population(self, population):
        """Return a new population whose non-elite individuals had each gene
        flipped with probability ``mutation_rate``.

        Individuals are ranked best-first by their currently stored fitness
        (unevaluated ones last) and mutated in place; the top
        ``elitism_count`` are left untouched.
        """
        ranked = self._ranked(population)
        # Size the result from the input, matching crossover_population, so a
        # population whose size differs from self.population_size still fits.
        new_population = Population(len(ranked))

        for population_index, individual in enumerate(ranked):
            # Skip mutation if this is an elite individual
            if population_index >= self.elitism_count:
                for gene_index in range(len(individual.chromosome)):
                    # Does this gene need mutation?
                    if self.mutation_rate > random.uniform(0, 1):
                        # Flip the bit: 1 becomes 0, anything else becomes 1.
                        new_gene = 0 if individual.chromosome[gene_index] == 1 else 1
                        individual.chromosome[gene_index] = new_gene

            # Add individual to population
            new_population.individuals[population_index] = individual

        return new_population

In [7]:
# --- Configuration ---------------------------------------------------------
max_generations = 1000

POPULATION_SIZE = 200
MUTATION_RATE = 0.05
CROSSOVER_RATE = 0.9
ELITISM_COUNT = 2
TOURNAMENT_SIZE = 10
# The robot's sensor value ranges over 0-63 and every sensor value maps to a
# 2-bit action, so a chromosome needs 64 * 2 = 128 bits.
CHROMOSOME_LENGTH = 128
# Set to an integer to make a run repeatable; None keeps runs non-deterministic.
RANDOM_SEED = None

random.seed(RANDOM_SEED)

# Cell codes: 1 = wall, 2 = start, 3 = route cell (scores a point), 4 = goal.
maze = Maze([
        [ 0, 0, 0, 0, 1, 0, 1, 3, 2 ],
        [ 1, 0, 1, 1, 1, 0, 1, 3, 1 ],
        [ 1, 0, 0, 1, 3, 3, 3, 3, 1 ],
        [ 3, 3, 3, 1, 3, 1, 1, 0, 1 ],
        [ 3, 1, 3, 3, 3, 1, 1, 0, 0 ],
        [ 3, 3, 1, 1, 1, 1, 0, 1, 1 ],
        [ 1, 3, 0, 1, 3, 3, 3, 3, 3 ],
        [ 0, 3, 1, 1, 3, 1, 0, 1, 3 ],
        [ 1, 3, 3, 3, 3, 1, 1, 1, 4 ]
    ])

# Create genetic algorithm
ga = GeneticAlgorithm(POPULATION_SIZE, MUTATION_RATE, CROSSOVER_RATE, ELITISM_COUNT, TOURNAMENT_SIZE)
population = ga.init_population(CHROMOSOME_LENGTH)
ga.eval_population(population, maze)
# Keep track of current generation
generation = 1
# Start evolution loop
while not ga.termination_condition_met(generation, max_generations):
    # Print fittest individual from population.
    # print(...) with a single argument behaves the same on Python 2 and 3.
    fittest = population.get_fittest(0)
    print('G' + str(generation) + ' Best solution (' + str(fittest.fitness) + '): ' + ''.join([str(x) for x in fittest.chromosome]))

    # Apply crossover
    population = ga.crossover_population(population)

    # Apply mutation
    population = ga.mutate_population(population)

    # Evaluate population (offspring have no fitness until this runs)
    ga.eval_population(population, maze)

    # Increment the current generation
    generation += 1

print('Stopped after ' + str(max_generations) + ' generations.')
fittest = population.get_fittest(0)
print('Best solution (' + str(fittest.fitness) + '): ' + ''.join([str(x) for x in fittest.chromosome]))

G1 Best solution (1): 00110110010000001100111111101000011001100011001101110100010101111110011100011111011001010110111010100111111111111100010001011101
G2 Best solution (1): 00110110010000001100111111101000011001100011001101110100010101111110011100011111011001010110111010100111111111111100010001011101
G3 Best solution (2): 01100101000010100100101100110001111101011000101111101101111110110011111101110000000111001011010100101101111101110110101101011010
G4 Best solution (3): 11001100001101001100011111000011100111001111111111010101110110111100001001100001000111001001010100101101111101110100101101011010
G5 Best solution (3): 11001100001101001100011111000011100111001111111111010101110110111100001001100001000111001001010100101101111101110100101101011010
G6 Best solution (5): 10001100100101100101110000011011100111000110111111010111110110110100001001100101110111001001010100000101111101110100101101001000
G7 Best solution (5): 100011001001011001011100000110111001110001101111110101111101101101000010