# <div style="font-family: Cambria; font-weight:bold; letter-spacing: 0px; color:white; font-size:200%; text-align:center;padding:3.0px; background: #6A1B9A; border-bottom: 8px solid #9C27B0">Santa 2023 - A-Star Algorithm</div>
#### <div style= "font-family: Cambria; font-weight:bold; letter-spacing: 0px; color:white; font-size:150%; text-align:left;padding:3.0px; background: #6A1B9A; border-bottom: 8px solid #9C27B0" >TABLE OF CONTENTS<br><div>
* [IMPORTS](#1)
* [LOAD DATA](#2)
* [FUNCTIONS](#3)
* [SOLVE](#4)

Code modified from: https://www.kaggle.com/code/whats2000/a-star-algorithm-polytope-permutation

<a id="1"></a>
# <div style= "font-family: Cambria; font-weight:bold; letter-spacing: 0px; color:white; font-size:120%; text-align:left;padding:3.0px; background: #6A1B9A; border-bottom: 8px solid #9C27B0" > IMPORTS<br><div> 

In [2]:
import os
import time
import json
import heapq
import pandas as pd
from ast import literal_eval
from dataclasses import dataclass
import random
from sympy.combinatorics import Permutation
from typing import Dict, List
import zipfile
import numpy as np
import sqlite3
from tqdm import tqdm


In [3]:
database_file = '../solutions.db'

<a id="2"></a>
# <div style= "font-family: Cambria; font-weight:bold; letter-spacing: 0px; color:white; font-size:120%; text-align:left;padding:3.0px; background: #6A1B9A; border-bottom: 8px solid #9C27B0" >LOAD DATA<br><div> 

In [12]:
with zipfile.ZipFile('../../../res/data/santa-2023.zip', 'r') as z:
    
    with z.open('puzzle_info.csv') as f:
        puzzle_info = pd.read_csv(f, index_col = 'puzzle_type')        
                
    with z.open('puzzles.csv') as f:
        puzzles = pd.read_csv(f, index_col = 'id')
    
    with z.open('sample_submission.csv') as f:
        submission = pd.read_csv(f)

In [13]:
# Parsing the initial_state and solution_state columns
# Converting the semicolon-separated string values into lists of colors
puzzles['parsed_initial_state'] = puzzles['initial_state'].apply(lambda x: x.split(';'))
puzzles['parsed_solution_state'] = puzzles['solution_state'].apply(lambda x: x.split(';'))

# Converting the string representation of allowed_moves to dictionary
puzzle_info['allowed_moves'] = puzzle_info['allowed_moves'].apply(lambda x: json.loads(x.replace("'", '"')))

# Selecting an example puzzle type and displaying its allowed moves
example_puzzle_type = 'cube_2/2/2'
example_allowed_moves = puzzle_info.loc[example_puzzle_type, 'allowed_moves']


<a id="3"></a>
# <div style= "font-family: Cambria; font-weight:bold; letter-spacing: 0px; color:white; font-size:120%; text-align:left;padding:3.0px; background: #6A1B9A; border-bottom: 8px solid #9C27B0" >FUNCTIONS<br><div> 

In [14]:
def apply_move(state, move, inverse=False):
    """
    Apply a move or its inverse to the puzzle state.

    :param state: List of colors representing the current state of the puzzle.
    :param move: List representing the move as a permutation.
    :param inverse: Boolean indicating whether to apply the inverse of the move.
    :return: New state of the puzzle after applying the move.
    """
    if inverse:
        # Creating a dictionary to map the original positions to the new positions
        inverse_move = {v: k for k, v in enumerate(move)}
        return [state[inverse_move[i]] for i in range(len(state))]
    else:
        return [state[i] for i in move]



In [15]:
# Testing the function with an example move from the 'cube_2/2/2' puzzle
test_state = puzzles['parsed_initial_state'].iloc[0]
test_move = example_allowed_moves['f1']

# Applying the move and its inverse to the test state
applied_state = apply_move(test_state, test_move)
reverted_state = apply_move(applied_state, test_move, inverse=True)

In [16]:

def heuristic(state, goal_state):
    """
    Heuristic function estimating the cost from the current state to the goal state.
    Here, we use the number of mismatched colors between the current state and the goal state.
    """
    return sum(s != g for s, g in zip(state, goal_state))

import time

def a_star_search_with_timeout(initial_state, goal_state, allowed_moves, timeout=300):
    """
    A* search algorithm with a timeout feature.

    :param initial_state: The starting state of the puzzle.
    :param goal_state: The target state to reach.
    :param allowed_moves: A dictionary of moves that can be applied to the state.
    :param timeout: The maximum time (in seconds) allowed for the search.
    :return: The shortest sequence of moves to solve the puzzle, or None if unsolved within timeout.
    """
    start_time = time.time()
    open_set = []  # Priority queue for states to explore
    heapq.heappush(open_set, (0, initial_state, []))  # Each entry: (priority, state, path taken)

    closed_set = set()  # Set to keep track of already explored states

    while open_set:
        if time.time() - start_time > timeout:
            return None  # Timeout check

        _, current_state, path = heapq.heappop(open_set)

        if current_state == goal_state:
            return path  # Goal state reached

        state_tuple = tuple(current_state)
        if state_tuple in closed_set:
            continue  # Skip already explored states

        closed_set.add(state_tuple)

        for move_name, move in allowed_moves.items():
            for inverse in [False, True]:  # Consider both move and its inverse
                new_state = apply_move(current_state, move, inverse)
                new_state_tuple = tuple(new_state)
                if new_state_tuple not in closed_set:
                    priority = len(path) + 1 + heuristic(new_state, goal_state)
                    heapq.heappush(open_set, (priority, new_state, path + [(move_name, inverse)]))

# Testing the A* search algorithm with an example
test_initial_state = puzzles['parsed_initial_state'].iloc[0]
test_goal_state = puzzles['parsed_solution_state'].iloc[0]
test_allowed_moves = example_allowed_moves

# Running the A* search to find a solution
a_star_solution = a_star_search_with_timeout(test_initial_state, test_goal_state, test_allowed_moves)
a_star_solution  # Display the solution moves (if found)

[('r1', False), ('f1', True)]

In [17]:
# Modifying the A* search algorithm to improve efficiency
def improved_heuristic_with_wildcards(state, goal_state, num_wildcards):
    """
    Improved heuristic function considering wildcards.
    """
    mismatches = sum(s != g for s, g in zip(state, goal_state))
    return max(0, mismatches - num_wildcards)

def improved_a_star_search_with_wildcards(initial_state, goal_state, allowed_moves, num_wildcards, max_depth=30, timeout=100):
    """
    Improved A* search algorithm with wildcards, depth limit, and timeout.

    :param initial_state: List representing the initial state of the puzzle.
    :param goal_state: List representing the goal state of the puzzle.
    :param allowed_moves: Dictionary of allowed moves and their corresponding permutations.
    :param num_wildcards: Number of wildcards allowed for the puzzle.
    :param max_depth: Maximum depth to search to limit the search space.
    :param timeout: Time limit in seconds for the search.
    :return: Shortest sequence of moves to solve the puzzle, or None if no solution is found.
    """
    start_time = time.time()
    open_set = []
    heapq.heappush(open_set, (0, initial_state, [], num_wildcards))  # (priority, state, path, remaining wildcards)
    closed_set = set()

    while open_set:
        if time.time() - start_time > timeout:
            return None  # Timeout

        _, current_state, path, remaining_wildcards = heapq.heappop(open_set)

        if len(path) > max_depth:  # Depth limit
            continue

        if current_state == goal_state or improved_heuristic_with_wildcards(current_state, goal_state, remaining_wildcards) == 0:
            return path

        closed_set.add((tuple(current_state), remaining_wildcards))

        for move_name, move in allowed_moves.items():
            for inverse in [False, True]:
                new_state = apply_move(current_state, move, inverse)
                if (tuple(new_state), remaining_wildcards) not in closed_set:
                    priority = len(path) + 1 + improved_heuristic_with_wildcards(new_state, goal_state, remaining_wildcards)
                    heapq.heappush(open_set, (priority, new_state, path + [(move_name, inverse)], remaining_wildcards))

    return None  # No solution found

# Running the improved A* search to find a solution
test_num_wildcards = puzzles['num_wildcards'].iloc[0]
improved_a_star_solution = improved_a_star_search_with_wildcards(test_initial_state, test_goal_state, test_allowed_moves, test_num_wildcards)

In [19]:
def format_solution_for_submission(puzzle_id, solution_moves):
    """
    Format the solution to a puzzle for submission.

    :param puzzle_id: The unique identifier of the puzzle.
    :param solution_moves: List of tuples representing the solution moves.
    :return: Formatted string suitable for submission.
    """
    formatted_moves = []
    for move, inverse in solution_moves:
        move_str = '-' + move if inverse else move
        formatted_moves.append(move_str)

    # Joining the moves into a single string separated by periods
    return {'id': puzzle_id, 'moves': '.'.join(formatted_moves)}

# Example: Formatting the solution for the first puzzle in the dataframe for submission
puzzle_id_example = puzzles.index[0]
formatted_solution = format_solution_for_submission(puzzle_id_example, a_star_solution)
formatted_solution

{'id': 0, 'moves': 'r1.-f1'}

In [33]:
def solve_puzzles(database_file, puzzles, puzzle_info, submission, num_puzzles=None, limit_index=30, solution_method='a-star algo'):
    """
    Solve a set of puzzles using the A* search algorithm.

    :param database_file: path to solution database
    :param puzzles: DataFrame containing puzzles.
    :param puzzle_info: DataFrame containing allowed moves for each puzzle type.
    :param submission: DataFrame containing sample submission format.
    :param num_puzzles: Number of puzzles to solve (if None, solve all).
    :return: DataFrame with the solutions formatted for submission.
    """
    solutions = []
    
    # Connect to the SQLite database
    conn = sqlite3.connect(database_file)
    cursor = conn.cursor()

    # Limit the number of puzzles if specified
    puzzles_to_solve = puzzles if num_puzzles is None else puzzles.head(num_puzzles)

    for index, row in tqdm(puzzles_to_solve.iterrows(), total=puzzles_to_solve.shape[0], desc="Solving Puzzles"):
        puzzle_id = index
        initial_state = row['parsed_initial_state']
        goal_state = row['parsed_solution_state']
        puzzle_type = row['puzzle_type']
        num_wildcards = row['num_wildcards']
        allowed_moves = puzzle_info.loc[puzzle_type, 'allowed_moves']
        
        solution_moves = None
        
        # Attempt to find a solution
        if index < limit_index:
            solution_moves = improved_a_star_search_with_wildcards(initial_state, goal_state, allowed_moves, num_wildcards)

        # If no solution found, use the sample submission's result
        if solution_moves is None:
            solution_moves = submission.loc[puzzle_id, 'moves'].split('.')
            solution_moves = [(move.replace('-', ''), move.startswith('-')) for move in solution_moves]
        else:
            game_id = int(puzzle_id)
            moves = format_solution_for_submission(puzzle_id, solution_moves)['moves']            
            move_count = len(moves.split('.'))
    
            select_query = "SELECT count FROM solutions WHERE id = ?"
    
            # Execute the query
            cursor.execute(select_query, (game_id,))
            best_move_count = cursor.fetchone()
            # print(f'Solved {game_id}')
            if best_move_count[0] > move_count:
                print(f'Improvement to {game_id}')
                # Insert the moves into the database
                insert_query = "INSERT OR REPLACE INTO solutions (id, moves, count, solution_method) VALUES (?, ?, ?, ?)"
                cursor.execute(insert_query, (game_id, moves, move_count, solution_method))
                conn.commit()

        formatted_solution = format_solution_for_submission(puzzle_id, solution_moves)
        solutions.append(formatted_solution)
    
    # Commit the changes and close the connection
    conn.commit()
    conn.close()

    return pd.DataFrame(solutions)



# Solving the first 3 puzzles in the dataset for testing
solved_puzzles = solve_puzzles(database_file, puzzles, puzzle_info, submission, num_puzzles=3)
solved_puzzles

Solving Puzzles: 100%|██████████| 3/3 [00:05<00:00,  1.79s/it]


Unnamed: 0,id,moves
0,0,r1.-f1
1,1,f0.r1.f1.-d0.-d0.-f0.-r0.f0.d0
2,2,-d1.-r0.f0.-r1.f1.d1.-r1.-f0.d1.f0.d1.d1


<a id="4"></a>
# <div style= "font-family: Cambria; font-weight:bold; letter-spacing: 0px; color:white; font-size:120%; text-align:left;padding:3.0px; background: #6A1B9A; border-bottom: 8px solid #9C27B0" >SOLVE<br><div> 

In [35]:
# Solving the first 30 puzzles in the dataset
solved_puzzles = solve_puzzles(database_file, puzzles, puzzle_info, submission, num_puzzles=None, limit_index=398)
solved_puzzles

Solving Puzzles: 100%|██████████| 398/398 [9:29:16<00:00, 85.82s/it]   


Unnamed: 0,id,moves
0,0,r1.-f1
1,1,f0.r1.f1.-d0.-d0.-f0.-r0.f0.d0
2,2,-d1.-r0.f0.-r1.f1.d1.-r1.-f0.d1.f0.d1.d1
3,3,-f0.d0.-r0.f0.-d0.-r0.d0.-f0.-r0.-f0
4,4,-r1.-f0.d0.r0.-d1.-d1.r1.d1.f0.r1.-d1.-r1
...,...,...
393,393,f19.f21.-f39.f20.f2.-f5.f7.-r3.f55.-f12.f65.-f...
394,394,-f31.-f22.f16.-f17.-f13.-f24.-f14.f2.f21.f44.f...
395,395,-r0.-f42.-f8.f16.-f49.f14.-f1.f56.f26.f35.f62....
396,396,f25.-f29.f46.f49.-f8.f27.f26.-f20.f2.-f20.f6.f...
