In [1]:
import numpy as np
from itertools import groupby
from pathlib import Path
from IPython.core.debugger import set_trace
import time

In [2]:
# Load the puzzles: one puzzle string per line. Lines are stripped first so
# that blank/whitespace-only lines and '#' comment lines are skipped reliably.
# The `with` block closes the file handle once the content has been read.
with open('sample-input.txt') as sample_input:
    puzzles = [line
               for line in (raw_line.strip() for raw_line in sample_input.read().splitlines())
               if line and not line.startswith('#')]
puzzles

['BBIJ....IJCC..IAAMGDDK.MGH.KL.GHFFL.',
 '..I...BBI.K.GHAAKLGHDDKLG..JEEFF.J..',
 'JBBCCCJDD..MJAAL.MFFKL.N..KGGN.HH...',
 'BBB..MCCDD.MAAKL.MJ.KLEEJ.GG..JHHHII J0 B4',
 'IJBBCCIJDDL.IJAAL.EEK.L...KFF..GGHH. F0 G6',
 'BB.G.HE..G.HEAAG.I..FCCIDDF..I..F...']

In [3]:
def create_grid(puzzle:str):
    """Split a puzzle string into its board and its fuel table.

    The first 36 characters are the board cells, laid out row by row; the
    remainder is a whitespace-separated list of ``<letter><fuel>`` tokens
    (for example ``J0 B4``).

    Returns:
        A ``(grid, fuel)`` tuple where ``grid`` is a 6x6 numpy array of
        single-character strings and ``fuel`` maps a car letter to its
        integer fuel amount (empty when the puzzle lists no fuel).
    """
    board_part, fuel_part = puzzle[:36], puzzle[36:]
    grid = np.array(list(board_part), dtype=str).reshape(6, 6)

    fuel = {}
    for token in fuel_part.split():
        # first character is the car letter, the rest is its fuel
        fuel[token[0]] = int(token[1:])
    return grid, fuel

def grid_to_string(current_grid_state:np.ndarray, fuel:dict):
    """Serialise a board and its fuel table back into a puzzle string.

    Args:
        current_grid_state: numpy array holding the board cells; it is
            flattened and its cells are concatenated in that order.
        fuel: mapping of car letter to fuel amount.

    Returns:
        The concatenated cells, one space, then one ``<letter><fuel> `` token
        per fuel entry (each token is followed by a space, so the result
        always ends with a space).
    """
    cells = ''.join(str(cell) for cell in current_grid_state.flatten())
    fuel_tokens = ''.join('{key}{value} '.format(key=letter, value=amount)
                          for letter, amount in fuel.items())
    return cells + ' ' + fuel_tokens

def get_only_grid_string(puzzle:str):
    """Return only the board part of a puzzle string (its first 36 characters)."""
    board_length = 36
    return puzzle[:board_length]

In [4]:
class Car():
    """Plain record describing one car found on the board."""

    def __init__(self, is_horizontal:bool, letter:str, car_length:int, arr_indices, fuel):
        # True for a car lying in a row, False for one lying in a column
        self.horizontal, self.letter = is_horizontal, letter
        # arr_indices is indexed as [0] -> i (row) indices, [1] -> j (column) indices
        self.car_length, self.arr_indices = car_length, arr_indices
        self.fuel = fuel

    def __str__(self):
        """Human-readable one-line summary of the car."""
        orientation = 'Horizontal' if self.horizontal else 'Vertical'
        rows, cols = self.arr_indices[0], self.arr_indices[1]
        template = ("{direction} car '{letter}' with a length of {length}."
                    " Indices i:{indices_i} and j:{indices_j}. It has {fuel} fuel.")
        return template.format(
            direction=orientation,
            letter=self.letter,
            length=self.car_length,
            indices_i=rows,
            indices_j=cols,
            fuel=self.fuel)

In [5]:
def get_all_cars_in_grid(puzzle:str):
    """Collect every car on the board described by ``puzzle``.

    A car is a run of at least two identical, non-'.' cells along a row
    (horizontal) or along a column (vertical). Cars without an entry in the
    puzzle's fuel list get 100 fuel.

    Returns:
        A list of ``Car`` objects: horizontal cars first (row by row), then
        vertical cars (column by column).
    """
    grid, fuel_by_letter = create_grid(puzzle)

    def cars_along(lines, is_horizontal):
        # Scan each line for runs of equal letters and keep the ones that form a car.
        # https://stackoverflow.com/a/6352456 for finding consecutive duplicates in a list
        found = []
        for line in lines:
            for letter, cells in groupby(line):
                run_length = len(list(cells))
                if letter == '.' or run_length <= 1:
                    continue
                car_fuel = fuel_by_letter[letter] if letter in fuel_by_letter else 100
                found.append(Car(is_horizontal, letter, run_length, np.where(grid == letter), car_fuel))
        return found

    # Rows of the transposed grid are the columns of the board.
    return cars_along(grid, True) + cars_along(grid.T, False)

1. Group the consecutive duplicate letters
2. Find the neighbours of the car's letter.
3. For the group directly behind and the group directly in front of the car, take its size if it is a group of empty spaces (otherwise 0), capped by the car's remaining fuel
4. Return the car together with its back and front space counts

Front spaces are defined as j+1 or i+1, i.e. going to the right (horizontal cars) or going down (vertical cars) in the matrix.
Back spaces are defined as j-1 or i-1, i.e. going to the left (horizontal cars) or going up (vertical cars) in the matrix.

In [6]:
def find_available_spaces_for_car_on_grid(car:Car, puzzle:str):
    """Work out how far ``car`` can move backwards and forwards on the board.

    The row (horizontal car) or column (vertical car) holding the car is
    split into runs of identical cells. A neighbouring run of '.' cells
    contributes its length, capped by the car's fuel; any other neighbour
    contributes nothing.

    Returns:
        ``(car, back_spaces, front_spaces)`` where "back" is towards lower
        indices (left/up) and "front" towards higher indices (right/down).
    """
    grid, _ = create_grid(puzzle)

    # Select the line of cells the car travels along.
    if car.horizontal:
        lane = grid[car.arr_indices[0][1], :]
    else:
        lane = grid[:, car.arr_indices[1][0]]

    runs = [(letter, len(list(cells))) for letter, cells in groupby(lane)]
    car_position = [pos for pos, run in enumerate(runs) if run[0] == car.letter][0]

    def usable_spaces(neighbour):
        # Empty cells offered by a neighbouring run, limited by the car's fuel.
        letter, length = neighbour
        empty = length if letter == '.' else 0
        return car.fuel if empty > car.fuel else empty

    # No run after the car means it already touches the far edge.
    if car_position + 1 < len(runs):
        front_spaces = usable_spaces(runs[car_position + 1])
    else:
        front_spaces = 0

    # A car in the first run has no neighbour behind it; use a non-empty placeholder.
    back_neighbour = runs[car_position - 1] if car_position > 0 else ('', 0)
    back_spaces = usable_spaces(back_neighbour)

    return car, back_spaces, front_spaces

1. Pass the number of spaces to move (one of the car_moves amounts) into move_car. We assume that the passed amount is valid for the puzzle
2. Update the car's array indices
3. Update the grid/empty space, replace the car's old position with '.'s, then place the car at its new position
4. return new puzzle grid

In [7]:
def goal_state(puzzle:str):
    """Tell whether the cell in the third row, last column holds the 'A' car."""
    grid, _ = create_grid(puzzle)
    exit_cell = grid[2, 5]
    return exit_cell == 'A'

In [8]:
def move_car(car:Car, puzzle:str, amount:int):
    """Return the puzzle string obtained by sliding ``car`` by ``amount`` cells.

    A positive amount shifts column indices (horizontal car) or row indices
    (vertical car) upwards, a negative amount downwards. The move is not
    validated here. The car's fuel entry in the returned string is its
    current fuel minus ``abs(amount)``.
    """
    grid, fuel = create_grid(puzzle)

    # Shift the car's cell coordinates along its axis of travel.
    rows, cols = car.arr_indices
    if car.horizontal:
        cols = cols + amount
    else:
        rows = rows + amount

    # Erase the car from a copy of the board, then redraw it at the shifted cells.
    moved_grid = np.copy(grid)
    moved_grid[moved_grid == car.letter] = '.'
    for row, col in zip(rows, cols):
        moved_grid[row, col] = car.letter

    # Record the fuel left after the move so it ends up in the puzzle string.
    fuel[car.letter] = car.fuel - abs(amount)

    return grid_to_string(moved_grid, fuel)

In [9]:
def remove_car(puzzle:str):
    """Take the car occupying the third row's last cell off the board, if allowed.

    Returns:
        ``(new_puzzle_string, letter)`` when ``can_remove_car`` accepts the
        puzzle, otherwise ``(None, '')``.
    """
    grid, fuel = create_grid(puzzle)
    if not can_remove_car(puzzle):
        return None, ''

    exiting_letter = grid[2][5]
    cleared_grid = np.copy(grid)
    # blank out every cell of the removed car
    cleared_grid[cleared_grid == exiting_letter] = '.'
    return grid_to_string(cleared_grid, fuel), exiting_letter

In [10]:
def can_remove_car(puzzle:str):
    """Check whether a car other than 'A' sits at the end of the third row.

    Returns False when the puzzle is already in the goal state. Otherwise the
    result is truthy only if the last run of identical cells in the third
    row is not empty space and is longer than one cell.
    """
    grid, _ = create_grid(puzzle)
    if goal_state(puzzle):
        return False

    runs = [(letter, len(list(cells))) for letter, cells in groupby(grid[2, :])]
    last_letter, last_length = runs[-1]
    return last_letter != '.' and last_length > 1

In [11]:
def explore_all_states(current_grid_state_string:str):
    """Generate every successor of the given puzzle state.

    Successors are, in order: the state with the car at the end of the third
    row removed (if ``remove_car`` allows it), then for each car every
    backward move followed by every forward move it can make.

    Returns:
        A list of ``(state_string, car_letter, offset, remaining_fuel)``
        tuples. ``offset`` is negative for backward moves, positive for
        forward moves and 0 for a removal.
    """
    _, fuel = create_grid(current_grid_state_string)

    new_states = []

    # Successor where a car is removed from the board. It carries the same
    # four fields as a move so that callers can unpack every successor the
    # same way; the fuel is the removed car's current fuel (100 when the
    # puzzle string does not list it, as in get_all_cars_in_grid).
    remove_car_state, removed_car_letter = remove_car(current_grid_state_string)
    if remove_car_state is not None:
        removed_car_fuel = fuel.get(removed_car_letter, 100)
        new_states.append((remove_car_state, removed_car_letter, 0, removed_car_fuel))

    # Successors where a car slides along its row or column.
    for car in get_all_cars_in_grid(current_grid_state_string):
        current_car, back_spaces, front_spaces = find_available_spaces_for_car_on_grid(
            car, current_grid_state_string)
        offsets = [-i for i in range(1, back_spaces+1)] + list(range(1, front_spaces+1))
        for offset in offsets:
            remaining_fuel = current_car.fuel - abs(offset)
            if remaining_fuel < 0:
                # not enough fuel for a move of this length
                continue
            moved_car_state = move_car(current_car, current_grid_state_string, offset)
            new_states.append((moved_car_state, current_car.letter, offset, remaining_fuel))
    return new_states

In [12]:
def blocking_heuristic(puzzle:str):
    """Count the runs of non-empty cells that follow the 'A' car in the third row."""
    grid, _ = create_grid(puzzle)
    # the 'A' car is always looked up in the third row; empty runs are ignored
    occupied_runs = [letter for letter, _cells in groupby(grid[2]) if letter != '.']
    position_of_a = [pos for pos, letter in enumerate(occupied_runs) if letter == 'A'][0]
    # everything listed after 'A' is blocking it
    return len(occupied_runs) - (position_of_a + 1)

In [13]:
def blocked_positions_heuristic(puzzle:str):
    """Count the occupied cells that follow the 'A' car in the third row."""
    grid, _ = create_grid(puzzle)
    # runs of identical, non-empty cells in the third row, with their lengths
    occupied_runs = [(letter, len(list(cells)))
                     for letter, cells in groupby(grid[2]) if letter != '.']
    position_of_a = [pos for pos, run in enumerate(occupied_runs) if run[0] == 'A'][0]

    # add up the lengths of all runs listed after 'A'
    blocked_positions = 0
    for _letter, run_length in occupied_runs[position_of_a + 1:]:
        blocked_positions += run_length
    return blocked_positions

In [14]:
def blocking_heuristic_with_multiplier(puzzle:str, multiplier:int=4):
    """Return ``blocking_heuristic(puzzle)`` scaled by a constant factor.

    Args:
        puzzle: puzzle string to evaluate.
        multiplier: factor applied to the blocking heuristic. Defaults to 4,
            the value that used to be hard-coded, so one-argument calls
            behave as before.
    """
    return blocking_heuristic(puzzle)*multiplier

In [15]:
#a* algorithm from https://leetcode.com/problems/shortest-path-in-binary-matrix/discuss/313347/a-search-in-python

from heapq import heappush, heappop

class PriorityQueue:
    """Minimal min-priority queue on top of ``heapq``.

    Entries are stored as ``(priority, value)`` tuples, so entries with equal
    priority are ordered by comparing the values themselves.
    """

    def __init__(self, iterable=()):
        # The default is an immutable tuple rather than a list so the default
        # object can never be shared and mutated between instances.
        self.heap = []
        for value in iterable:
            heappush(self.heap, (0, value))

    def add(self, value, priority=0):
        """Insert ``value`` with the given ``priority`` (lower pops first)."""
        heappush(self.heap, (priority, value))

    def pop(self):
        """Remove and return the value with the smallest priority."""
        _priority, value = heappop(self.heap)
        return value

    def __len__(self):
        # also makes an empty queue falsy, which `while frontier:` relies on
        return len(self.heap)

In [16]:
#a* algorithm from https://leetcode.com/problems/shortest-path-in-binary-matrix/discuss/313347/a-search-in-python

def a_star_search(
        start,
        goal,
        successors,
        heuristic
    ):
    """A* search over puzzle strings where every move costs 1.

    Args:
        start: initial state (puzzle string).
        goal: callable returning a truthy value for goal states.
        successors: callable returning, for a state, an iterable of
            ``(state, car_letter, offset, remaining_fuel)`` tuples.
        heuristic: callable estimating the remaining cost of a state.

    Returns:
        On success ``(goal_path, actions, search_count, search_states)``:
        the states from ``start`` to the goal, the ``(car_letter, offset)``
        action that produced each of them (``None`` for ``start``), the
        number of generated successors and a dict mapping each generated
        state to ``(g, h)``.
        On failure the three-element tuple
        ``(None, search_count, search_states)``.
    """
    visited = set()
    came_from = dict()
    distance = {start:0}
    frontier = PriorityQueue()
    frontier.add(start)
    actions_info = {start:(None)}
    search_states = dict()
    search_count = 0
    while frontier:
        node = frontier.pop()
        # States are considered already expanded based on the board alone
        # (the fuel suffix is ignored for this check).
        node_grid = get_only_grid_string(node)
        if node_grid in visited:
            continue
        if goal(node):
            goal_path = reconstruct_path(came_from, start, node)
            relevant_actions = [actions_info[state] for state in goal_path]
            return goal_path, relevant_actions, search_count, search_states
        visited.add(node_grid)
        successor_cost = distance[node] + 1
        for successor_state, successor_letter, successor_distance, _successor_fuel in successors(node):
            # evaluate the heuristic once and reuse it for the queue and the log
            estimate = heuristic(successor_state)
            frontier.add(successor_state, priority=successor_cost + estimate)

            search_states[successor_state] = (successor_cost, estimate)
            search_count = search_count + 1

            if (successor_state not in distance
                or successor_cost < distance[successor_state]):
                distance[successor_state] = successor_cost
                came_from[successor_state] = node
                # Store the action together with the predecessor so the
                # reported action always belongs to the reconstructed path.
                actions_info[successor_state] = (successor_letter, successor_distance)
    return None, search_count, search_states

In [17]:
def reconstruct_path(came_from, start, end):
    """Follow the predecessor links in ``came_from`` from ``end`` back to ``start``.

    Returns the visited states as a list ordered from ``start`` to ``end``.
    """
    path = [end]
    # keep prepending the predecessor of the earliest known state
    while path[0] != start:
        path.insert(0, came_from[path[0]])
    return path

In [18]:
def determine_dir_of_car(cars, action):
    """Translate a ``(letter, distance)`` action into a direction word.

    The orientation of the car is looked up by letter in ``cars``. Horizontal
    cars give 'right' for a positive distance and 'left' otherwise; vertical
    cars give 'down' for a positive distance and 'up' otherwise.
    """
    orientation_by_letter = {}
    for car in cars:
        orientation_by_letter[car.letter] = car.horizontal

    letter, distance = action
    if orientation_by_letter[letter]:
        positive_word, other_word = 'right', 'left'
    else:
        positive_word, other_word = 'down', 'up'
    return positive_word if distance > 0 else other_word

In [19]:
def format_explored_states(search_states):
    """Render the A* search log as ``'<f> <g> <h> <state>'`` lines.

    ``search_states`` maps a state to its ``(g, h)`` pair; ``f`` is their sum.
    Each returned string ends with a newline.
    """
    return [
        '{f} {g} {h} {state}\n'.format(f=dist_val + heur_val, g=dist_val, h=heur_val, state=state)
        for state, (dist_val, heur_val) in search_states.items()
    ]

In [20]:
# Heuristics evaluated by the A* runs; the 1-based position in this list is
# the heuristic number used in the output file names.
heuristics = [
    blocking_heuristic,
    blocked_positions_heuristic,
    blocking_heuristic_with_multiplier,
]

In [21]:
# Run A* on every puzzle with every heuristic and write two reports per run:
# a solution file and a search file listing every generated state.

# Create the report directory up front so the cell also works on a fresh checkout.
output_dir = Path('output')
output_dir.mkdir(parents=True, exist_ok=True)

for p, current_puzzle in enumerate(puzzles):
    for h, current_heuristic in enumerate(heuristics):
        search_states_with_eval = dict()

        cars = get_all_cars_in_grid(current_puzzle)
        fuel_str = ' '.join('{}{}'.format(car.letter, car.fuel) for car in cars)

        #run a* algorithm
        start_time = time.perf_counter()
        solution = a_star_search(current_puzzle, goal_state, explore_all_states, current_heuristic)
        stop_time = time.perf_counter()

        # solution file -- opened with 'w' so that re-running the cell
        # replaces the previous report instead of appending a duplicate
        file_name = 'a-h{h_num}-sol-{sol_num}.txt'.format(h_num=h+1, sol_num=p+1)
        with (output_dir / file_name).open('w') as file:
            if solution[0] is None:
                # a failed search returns (None, search_count, search_states)
                search_states_with_eval = solution[2]
                file.write('no solution')
            else:
                solution_path, solution_actions, states_explored, search_states_with_eval = solution

                #get solution actions as strings (the first action belongs to the start state and is skipped)
                solution_actions = [(letter,determine_dir_of_car(cars,(letter, distance)),distance) for letter, distance in solution_actions[1:]]
                solution_actions = ['{letter} {direction} {distance}'.format(letter=x, direction=y, distance=abs(z)) for x,y,z in solution_actions]

                file.writelines([
                            'Initial Board Configuration: {puzzle}\n'.format(puzzle=current_puzzle),
                            '\n{}\n'.format(str(create_grid(current_puzzle)[0])),
                            '\nCar Fuel Available: {}\n'.format(fuel_str),
                            '\nRuntime: {} seconds\n'.format(stop_time-start_time),
                            '\nSearch path length: {} states\n'.format(states_explored),
                            '\nSolution path length: {} moves\n'.format(len(solution_path)-1),
                            '\nSolution path: {}\n'.format('; '.join(solution_actions))
                ])

                # one line per move: the action and the state it leads to
                for action, state in zip(solution_actions, solution_path[1:]):
                    file.write('\n{}\t{}\n'.format(action, state))

                file.write('\n{}\n'.format(str(create_grid(solution_path[-1])[0])))

        # search file creation
        file_name = 'a-h{h_num}-search-{sol_num}.txt'.format(h_num=h+1, sol_num=p+1)
        formatted_search_states = format_explored_states(search_states_with_eval)

        with (output_dir / file_name).open('w') as file:
            file.writelines(formatted_search_states)

# TODO:
# - Start on UCS
# - Add the other 2 heuristics

In [22]:
def ucs(start, goal, successors):
    """Uniform-cost search over puzzle strings where every move costs 1.

    Args:
        start: initial state (puzzle string).
        goal: callable returning a truthy value for goal states.
        successors: callable returning, for a state, an iterable of
            ``(state, car_letter, offset, remaining_fuel)`` tuples.

    Returns:
        On success ``(goal_path, actions, search_count, search_states)``:
        the states from ``start`` to the goal, the ``(car_letter, offset)``
        action that produced each of them (``None`` for ``start``), the
        number of generated successors and a dict mapping each generated
        state to its path cost.
        On failure the three-element tuple
        ``(None, search_count, search_states)``.
    """
    visited = set()
    came_from = dict()
    distance = {start:0}
    frontier = PriorityQueue()
    frontier.add(start)
    actions_info = {start:(None)}
    search_states = dict()
    search_count = 0
    while frontier:
        node = frontier.pop()
        # States are considered already expanded based on the board alone
        # (the fuel suffix is ignored for this check).
        node_grid = get_only_grid_string(node)
        if node_grid in visited:
            continue
        if goal(node):
            goal_path = reconstruct_path(came_from, start, node)
            relevant_actions = [actions_info[state] for state in goal_path]
            return goal_path, relevant_actions, search_count, search_states
        visited.add(node_grid)
        successor_cost = distance[node] + 1
        for successor_state, successor_letter, successor_distance, _successor_fuel in successors(node):
            frontier.add(successor_state, priority=successor_cost)

            search_states[successor_state] = successor_cost
            search_count = search_count + 1

            if (successor_state not in distance
                or successor_cost < distance[successor_state]):
                distance[successor_state] = successor_cost
                came_from[successor_state] = node
                # Store the action together with the predecessor so the
                # reported action always belongs to the reconstructed path.
                actions_info[successor_state] = (successor_letter, successor_distance)
    return None, search_count, search_states

# UCS Version

In [25]:
def format_explored_states_ucs(search_states):
    """Render the UCS search log as ``'<g> <state>'`` lines.

    ``search_states`` maps a state to its path cost ``g``. Each returned
    string ends with a newline.
    """
    return ['{g} {state}\n'.format(g=dist_val, state=state)
            for state, dist_val in search_states.items()]

In [26]:
# Run UCS on every puzzle and write two reports per run: a solution file and
# a search file listing every generated state.

# Create the report directory up front so the cell also works on a fresh checkout.
output_dir = Path('output')
output_dir.mkdir(parents=True, exist_ok=True)

for p, current_puzzle in enumerate(puzzles):
    search_states_with_eval = dict()

    cars = get_all_cars_in_grid(current_puzzle)
    fuel_str = ' '.join('{}{}'.format(car.letter, car.fuel) for car in cars)

    #run ucs algorithm
    start_time = time.perf_counter()
    solution = ucs(current_puzzle, goal_state, explore_all_states)
    stop_time = time.perf_counter()

    # solution file -- opened with 'w' so that re-running the cell replaces
    # the previous report instead of appending a duplicate
    file_name = 'ucs-sol-{sol_num}.txt'.format(sol_num=p+1)
    with (output_dir / file_name).open('w') as file:
        if solution[0] is None:
            # a failed search returns (None, search_count, search_states)
            search_states_with_eval = solution[2]
            file.write('no solution')
        else:
            solution_path, solution_actions, states_explored, search_states_with_eval = solution

            #get solution actions as strings (the first action belongs to the start state and is skipped)
            solution_actions = [(letter,determine_dir_of_car(cars,(letter, distance)),distance) for letter, distance in solution_actions[1:]]
            solution_actions = ['{letter} {direction} {distance}'.format(letter=x, direction=y, distance=abs(z)) for x,y,z in solution_actions]

            file.writelines([
                        'Initial Board Configuration: {puzzle}\n'.format(puzzle=current_puzzle),
                        '\n{}\n'.format(str(create_grid(current_puzzle)[0])),
                        '\nCar Fuel Available: {}\n'.format(fuel_str),
                        '\nRuntime: {} seconds\n'.format(stop_time-start_time),
                        '\nSearch path length: {} states\n'.format(states_explored),
                        '\nSolution path length: {} moves\n'.format(len(solution_path)-1),
                        '\nSolution path: {}\n'.format('; '.join(solution_actions))
            ])

            # one line per move: the action and the state it leads to
            for action, state in zip(solution_actions, solution_path[1:]):
                file.write('\n{}\t{}\n'.format(action, state))

            file.write('\n{}\n'.format(str(create_grid(solution_path[-1])[0])))

    # search file creation
    file_name = 'ucs-search-{sol_num}.txt'.format(sol_num=p+1)
    formatted_search_states = format_explored_states_ucs(search_states_with_eval)

    with (output_dir / file_name).open('w') as file:
        file.writelines(formatted_search_states)