In [None]:
"""
Author: Milad Pooladsanj, April 2024
"""

# King's Planning Problem
The king's planning problem was given as a take-home project by Instock. In the following section, we describe the problem formulation.

## Problem Description
Design and implement in code a Planner algorithm for a square chessboard of arbitrary size with certain cells marked red as restricted areas. There are $N$ kings on the board, each with a specific target cell to reach. The kings move sequentially, one by one, one step at a time ($K_1$ step1, $K_2$ step1…. $K_1$ step2, $K_2$ step2…), and must adhere to two main rules: they cannot move into restricted cells, and they cannot end a move adjacent to another king (including diagonally). A king may stay in place (empty move). The task is to compute a path for each king to its target, ensuring compliance with the movement rules and the sequential order of moves.

## Input
Planner’s input would be given as 2 files:

- File1: king starting and target positions
  x_from, y_from, x_to, y_to

- File2: chessboard size and restricted areas
  $S$,
  $x$, $y$,
  where $x$ and $y$ are integer coordinates starting from zero.

## Restrictions
- If plan (solution) does not exist, Planner should detect and output that plan does not exist.
- If plan does exist, Planner should output a plan in format: x_from, y_from, x_to, y_to, where each line is one step of one king, and king_id would be deduced by a test program from “from” coordinates.
- For a 100x100 chessboard size the Planner should finish planning in reasonable time.
- The Planner program should use not more than 1GB of RAM.
- A reasonable simple visualization would be a plus but is not a requirement.
- Your ideas on optimality of your algorithm would be a valuable add-on when discussing your solution on a call.
- Implementations can be done in Go, C++ or python.

# Planner Magnus
In this section, we introduce our planning algorithm, called Planner Magnus, and provide its implementation in Python. 

## Notations
For a set $\mathcal{A}$, let $|\mathcal{A}|$ denote its cardinality. Given an integer $n$, we use $[n]$ to denote the set $\{1, 2, \cdots, n\}$. We let $\mathcal{K}$ be the set of kings, where $|\mathcal{K}| = N$. We let $\mathcal{I}$ be the set of squares on the chess board, and $\mathcal{I}_{s}$ be the set of restricted squares. For king $k \in \mathcal{K}$, let $o_k \in \mathcal{I}$ denote its initial square and $d_k \in \mathcal{I}$ denote its target square. With a slight abuse of notation, given a square $i \in \mathcal{I}$, we let $\text{adj}(i)$ denote all of its adjacent squares *and* square $i$.


## Optimization Framework
Since empty moves are allowed, the sequential order of moves is redundant. In other words, the problem is equivalent to a planning problem with no constraint on the sequential order of moves. We now describe the optimization framework used in the proposed planner (Planner Magnus). For a given king $k \in \mathcal{K}$, we define a binary variable $w_{i,t}^{k}$, which indicates whether king $k$ is occupying square $i$ of the chess board at time $t$ (if $w_{i,t}^{k} = 1$) or not (if $w_{i,t}^{k} = 0$). 

Planner Magnus solves an optimization problem in order to find a path (if it exists) for all the kings. Since the goal is merely to find a path, we cast the problem as a feasiblity problem with the following constraints:
\begin{align}
w_{o_k, 1}^{k} &= 1,~~~ \forall k \in \mathcal{K}, \label{eq:initial-condition} \tag{1} \\
w_{d_k, T}^{k} &= 1,~~~ \forall k \in \mathcal{K}, \label{eq:final-condition} \tag{2} \\
w_{i, t}^{k} &= 0,~~~ \forall i \in \mathcal{I}_{s},~ \forall t = 1, \cdots, T,~ \forall k \in \mathcal{K}, \label{eq:resrticted-square} \tag{3} \\
\sum_{j \in \text{adj}(i)} w_{j, t + 1}^{k} & \geq w_{i, t}^{k},~~~ \forall i \in \mathcal{I},~ \forall t = 1, \cdots, T-1,~ \forall k \in \mathcal{K}, \label{eq:king-move} \tag{4} \\
\sum_{i \in \mathcal{I}}w_{i,t}^{k} &= 1,~~~ \forall t = 1, \cdots, T,~ \forall k \in \mathcal{K}, \label{eq:occupy-one-square} \tag{5} \\
w_{i,t}^{\ell} + \sum_{j \in \text{adj}(i)} w_{j, t}^{k} &\leq 1,~~~ \forall t = 1, \cdots, T,~ \forall k \in \mathcal{K}: k \neq \ell, \label{eq:no-two-kings-adj} \tag{6} \\
w_{i, t}^{k} &\in \{0, 1\},~~~ \forall i \in \mathcal{I},~ \forall t = 1, \cdots, T,~ \forall k \in \mathcal{K}. \label{eq:w-binary} \tag{7}
\end{align}

Here,  $T$ is a rough upper bound on the time it takes for all the kings to reach their target squares, and is determined by the user. Constraints \eqref{eq:initial-condition} and \eqref{eq:final-condition} enforce the initial and final condition of all kings, respectively. Constraint \eqref{eq:resrticted-square} ensures that no king can occupy the restricted squares at any time. Constraint \eqref{eq:king-move} ensures that, upon occupying a sqaure, a king either stays in that square or moves to an adjacent square in the next move. Constraint \eqref{eq:occupy-one-square} ensures that each king occupies exactly one square, and constraint \eqref{eq:no-two-kings-adj} ensures that no two kings occupy adjacent squares during the planning horizon. 

**Remark)** While an optimization framework seems like a natural choice to solve this problem, it certainly is not the only approach. Other choices such as Dynamic Programming (DP) and Breath-First Search (BFS) algorithms are also solid options to consider. However, the provided optimization framework seems to be more flexible, especially if the user needs more than just finding a plan, e.g., a plan that minimizes the total time it takes for all kings to reach their target squares. 

## Implementation
Among the plethora of options, we choose the *PuLP* library to implement and solve the integer linear program described in Planner Magnus. PuLP is a modeling framework for linear and integer programming problems in Python that relies on a solver to compute a solution. One advantage of PuLP over other frameworks is that it works with many popular solvers such as CPLEX, Gurobi, and COIN. Here, we use the Gurobi solver, which is considered to be the state-of-the-art in solving integer linear programs. References [1-3] at the end of this report provide instructions on how to set up PuLp and Gurobi. 

Although Gurobi is quite efficient, it still suffers from the curse of dimensionality, i.e., computational time increases exponentially as the problem size increases. To overcome this issue, we implement Planner Magnus using an *incremental batch* approach. That is, we first divide the kings into batches, starting from batch size of 1. We then use Planner Magnus to find a plan for each batch while keeping other kings idle. If a plan is not found, we increase the batch size until a plan is found (if any). As we will see, this approach improves computational time substantially.  

In [244]:
import os
import time
from pulp import *
import collections
import time
import numpy as np
import matplotlib.pyplot as plt
path_to_gurobi = r'C:\gurobi1101\win64\bin\gurobi_cl.exe'
# Make Matplotlib plots appear inline
%matplotlib inline

In [245]:
"""
All the functions used in Planner Magnus are provided in this cell.
"""
def read_positions(file_path: str) -> (list, list):
    """Read the position of all kings.
    
    Args:
        string (file_path): path to file that contains king position information
    
    Returns:
        list (initial_positions): starting squares of the kings
        list (target_positions): target squares of the kings
    """
    with open(file_path, 'r') as f:
        data = [tuple(map(int, line.strip().split(','))) for line in f]
    initial_positions = [(x_from, y_from) for x_from, y_from, _, _ in data]
    target_positions = [(x_to, y_to) for _, _, x_to, y_to in data]
    return initial_positions, target_positions

def read_board(file_path: str) -> (int, set):
    """Read the chess board information.
    
    Args:
        string (file_path): path to file that contains board information
    
    Returns:
        int (S): size of chess board
        set (restricted): restricted squares on chess board
    """
    with open(file_path, 'r') as f:
        lines = f.readlines()
        S = int(lines[0].strip())
        restricted = {tuple(map(int, line.strip().split(','))) for line in lines[1:]}
    return S, restricted

def create_adjacency_dict(board_size: int) -> dict:
    """Store adjacent squares of each chess square in a dictionary.
    
    Args:
        int (board_size): size of chess board
    
    Returns:
        dict (adjacency_dict): dictionary of adjacent squares
    """
    square_list = ["i" + str(j) for j in range(board_size * board_size)]
    adjacency_dict = {}
    # Directions including the square itself
    directions = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1), (0, 0)]  

    for idx, label in enumerate(square_list):
        i, j = divmod(idx, board_size)  # Calculate row and column from index
        adj_labels = set()
        for dx, dy in directions:
            ni, nj = i + dx, j + dy
            if 0 <= ni < board_size and 0 <= nj < board_size:
                adj_index = ni * board_size + nj
                adj_labels.add(square_list[adj_index])
        adjacency_dict[label] = adj_labels
    return adjacency_dict

def split_kings_into_batches(num_kings, batch_size):
    """Generate batches of kings.
    
    Args:
        int (num_kings): number of kings
        int (batch_size): batch size
    
    Returns:
        list: king numbers in each batch 
    """
    return [range(i, min(i + batch_size, num_kings)) for i in range(0, num_kings, batch_size)]

def create_time_indexed_dict(prob: object, board_size: int) -> dict:
    """Store king positions after finding a plan in a dictionary.
    
    Args:
        object (prob): output of PuLP solution
        int (board_size): size of chess board
    
    Returns:
        dict (time_indexed_dict): dictionary of king positions
    """
    time_indexed_dict = {}
    # Extract all variables and filter only those with a value of 1
    for var in prob.variables():
        if var.varValue == 1:
            _, k, i, t = var.name.split('_')
            time_key = int(t[1:])
            if time_key not in time_indexed_dict:
                time_indexed_dict[time_key] = []
            # Convert index to coordinates
            x, y = divmod(int(i[1:]), board_size)
            time_indexed_dict[time_key].append((int(k[1:]), x, y))

    return time_indexed_dict

def extract_moves_from_dict(time_indexed_dict: dict, batch: list) -> list:
    """Extract king moves after finding a plan.
    
    Args:
        dict (time_indexed_dict): dictionary of king positions
        list (batch): batch of kings
    
    Returns:
        list (moves): king moves
    """
    moves = []
    last_positions = [None] * len(batch)  # Track last positions for all kings
    # Sort the time steps
    sorted_time_keys = sorted(time_indexed_dict.keys())
    for time_key in sorted_time_keys:
        current_positions = {k: (x, y) for k, x, y in time_indexed_dict[time_key]}
        for i in range(len(batch)):
            if last_positions[i] is not None:
                last_x, last_y = last_positions[i]
                curr_x, curr_y = current_positions[batch[i]]
                moves.append((batch[i], last_x, last_y, curr_x, curr_y))  # Including king index in the move
            last_positions[i] = current_positions.get(batch[i])  # Update last position for next time
    return moves

def write_moves_to_file(moves: list, file_path: str) -> None:
    """Write king moves into a file.
    
    Args:
        list (moves): king moves
        str (file_path): file path to write the king moves into
    
    Returns:
        None
    """
    with open(file_path, 'w') as file:
        for move in moves:
            line = f"{move[1]}, {move[2]}, {move[3]}, {move[4]}\n"
            file.write(line)

In [246]:
def solve_batch(batch, restricted_squares, initial_positions, target_positions, board_size, adj_dict):
    """Solve an optimization problem to find a plan for current batch.
    
    Args:
        list (batch): current batch of kings
        set (restricted_squares): restricted squares on chess board
        list (initial_positions): initial positions of kings in batch
        list (target_positions): target positions of kings in batch
        int (board_size): size of chess board
        dict (adj_dict): adjacent squares of each chess square
    
    Returns:
        list (moves): king moves if a plan is found for current batch
    """    
    # Create an indexed list of kings in the batch
    num_kings = len(batch)
    king_list = ["k" + str(i) for i in batch]
    
    # Create an indexed list of time stamps
    T = int(2 * board_size)
    time_list = ["t" + str(i) for i in range(T)]
    
    # Create an indexed list of chess squares
    squares_list = ["i" + str(j) for j in range(board_size * board_size)]
    
    restricted_list = ["i" + str((x) * board_size + y) for x, y in restricted_squares]
    initial_dict = {}
    target_dict = {}
    for i in range(num_kings): 
        (x_from, y_from) = initial_positions[i]
        initial_index = x_from * board_size + y_from
        initial_label = ["i" + str(initial_index)]
        initial_dict["k" + str(batch[i])] = initial_label
        (x_to, y_to) = target_positions[i]
        target_index = x_to * board_size + y_to
        target_label = ["i" + str(target_index)]
        target_dict["k" + str(batch[i])] = target_label
    
    # Create the 'prob' variable to contain the problem data
    prob = LpProblem("Planner Magnus", LpMinimize)
    
    # A dictionary called 'w' is created to contain the decision variables w_{i,t}^{k}
    w = {}
    w = LpVariable.dicts("w", (king_list, squares_list, time_list), cat="Binary")
    
    # We do not define an objective function since none is needed
    
    # Add constraints
    for k in king_list:
        prob += w[k][initial_dict[k][0]][time_list[0]] == 1 # Eq (2): initial condition
        prob += w[k][target_dict[k][0]][time_list[-1]] == 1 # Eq (3): final condition
        for n in range(len(time_list)):
            for i in restricted_list:           
                prob += w[k][i][time_list[n]] == 0 # Eq (4): no king on restrictd squares
            for i in squares_list:    
                if n < len(time_list) - 1:
                    prob += (
                        lpSum(w[k][j][time_list[n + 1]] for j in adj_dict[i]) -
                        w[k][i][time_list[n]] >= 0
                    )                  # Eq (5): king stays or moves to adjacent squares
                for l in king_list:
                    if l != k:
                        prob += (
                            w[k][i][time_list[n]] +
                            lpSum(w[l][j][time_list[n]] for j in adj_dict[i]) <= 1
                        )             # Eq (7): no two kings are in adjacent squares
                                      
            prob += lpSum(w[k][i][time_list[n]] for i in squares_list) == 1 # Eq (6): each king occupies one square

    # Gurobi is chosen as the solver
    solver = GUROBI_CMD(path=path_to_gurobi, options=[("MIPFocus", 1), ("LogFile", "unique_gurobi.log")])
    
    # The problem data is written to an .lp file
    prob.writeLP("Planner_Magnus.lp")
    
    # The problem is solved using Gurobi
    prob.solve(solver)
    # The status of the solution is printed to the screen
    moves = []
    if LpStatus[prob.status] == "Not Solved":
        return moves
    time_indexed_dict = create_time_indexed_dict(prob, board_size)
    moves = extract_moves_from_dict(time_indexed_dict, batch)
    return moves

In [254]:
def find_feasible_plan(kings_file_path, board_file_path):
    """Find a feasible plan for kings (if exists) through an incremental batch approach.
    
    Args:
        kings_file_path (str): Path to the file containing kings' starting and target positions
        board_file_path (str): Path to the file containing the chessboard size and restricted squares
    
    Returns:
        list: king moves if a plan is found.
    """
    initial_positions, target_positions = read_positions(kings_file_path)
    board_size, restricted_squares = read_board(board_file_path)
    
    num_kings = len(initial_positions)
    
    adj_dict = create_adjacency_dict(board_size)
    # Start with the smallest batch size and incrementally increase
    for current_batch_size in range(1, num_kings + 1):
        print(f"Trying with batch size: {current_batch_size}")
        batches = split_kings_into_batches(num_kings, current_batch_size)
        all_moves = []  # to store all moves across all batches
    
        # Assume a flag that checks if the current batch size was successful
        all_batches_successful = True
        current_positions = initial_positions[:]
        for batch in batches:
            current_batch_fail = False
            additional_restricted = []
            batch_initial_positions = [initial_positions[k] for k in batch]
            batch_target_positions = [target_positions[k] for k in batch]
            # Update restricted squares based on the current and settled kings
            updated_restricted_squares = set(restricted_squares)
            for k in range(num_kings):
                if k not in batch:
                    updated_restricted_squares.add(current_positions[k])
                    square_index = current_positions[k][0] * board_size + current_positions[k][1]
                    adjacent_restricted = adj_dict["i" + str(square_index)]
                    for square in adjacent_restricted:
                        i, j = divmod(int(square[1:]), board_size)
                        updated_restricted_squares.add((i, j))
                        
            for position in batch_target_positions:
                if position in updated_restricted_squares:
                    current_batch_fail = True
            if current_batch_fail:
                all_batches_successful = False
                break
            # Solve for the current batch with updated restrictions
            batch_moves = []
            batch_moves = solve_batch(batch, updated_restricted_squares,
                                      batch_initial_positions,
                                      batch_target_positions,
                                      board_size, adj_dict
                                     )
            for time in range(len(batch_moves) // len(batch)):
                # Update current positions and record moves, including empty moves for idle kings
                for k in range(num_kings):
                    if k in batch:
                        relative_index = k - batch[0]
                        move = batch_moves[int(time * len(batch) + relative_index)]
                        current_positions[k] = (move[3], move[4])
                        all_moves.append(move)
                    else:
                        # Record an empty move for kings not in the current batch
                        all_moves.append((k, current_positions[k][0], current_positions[k][1], current_positions[k][0], current_positions[k][1]))
            if len(batch_moves) == 0:
                all_batches_successful = False
                break
        if all_batches_successful:
            print(f"Feasible plan found with batch size: {current_batch_size}")
            return all_moves

In [258]:
start_time = time.time()
kings_file_path = "problem-tests/10/kings.txt"
board_file_path = "problem-tests/10/map.txt"

all_moves = find_feasible_plan(kings_file_path, board_file_path)
if all_moves == []:
    print("Plan does not exists")
else:
    output_file_path = 'king_moves.txt'
    write_moves_to_file(all_moves, output_file_path)
    print(f"Moves have been written to {output_file_path}.")
end_time = time.time()
duration = end_time - start_time
print(f"Time taken: {duration} seconds")

Trying with batch size: 1
Trying with batch size: 2
Trying with batch size: 3
Feasible plan found with batch size: 3
Moves have been written to king_moves.txt.
Time taken: 0.4528214931488037 seconds


## Results
In this section, we discuss the performance of Planner Magnus for the 10 problem tests provided. For all problem tests, we set $T = 2S$, where $S$ is the size of the chess board, and use the default Gurobi parameters. We also use an incremental batch approach described in Section 2.3, starting from batch size of 1. All problem sets were run on an Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz   3.19 GHz desktop computer.

In problem test 1, there are $N = 8$ kings located on an $8 \times 8$ chess board. Planner Magnus takes about 9.0 seconds to find a plan for this case using a batch size of $8$ (the king moves are stored in "king_moves_1.txt"). On the other hand, for problem test 2, which includes $N = 8$ kings on a $16 \times 16$ chess board, Planner Magnus is able to find a plan in about 9.6 seconds using a batch size of $1$ (the king moves are stored in "king_moves_2.txt"). As we can see, even though the size of problem test 2 is twice the size of problem test 1, we are able to control the computational speed by using the incremental batch approach. The results from all simulation runs are shown in Table below: 

- simulation test 3: plan exists, batch size 1, 26.2 seconds
- simulation test 4: plan exists, batch size 1, 13.0 seconds
- simulation test 5: plan exists, batch size 3, 229.7 seconds
- simulation test 6: plan exists, batch size 1, 833.6 seconds
- simulation test 7: plan exists, batch size 2, 303.2 seconds
- simulation test 8: plan exists, batch size 1, 183.6 seconds
- simulation test 9: plan exists, batch size 1, 114.2 seconds
- simulation test 10: plan exists, batch size 3, 0.4 seconds

As we can see from Table, Planner Magnus performs very well in all cases. However, we might still face some issues with computational time if small batch sizes lead to no feasible plans. Potential solutions to overcome this issue include using memoization, reducing the size of the problem by dividing the chess board into smaller sub-boards, and relaxing the optimization problem by looking for approximate solutions. A recent example of the latter approach can be found in [4], where the authors use neural networks to store the integer part of a mixed linear integer program, and use the neural network to solve new instances of the problem in milliseconds.  

# References
[1] https://coin-or.github.io/pulp/main/installing_pulp_at_home.html 

[2] https://support.gurobi.com/hc/en-us/articles/4534161999889-How-do-I-install-Gurobi-Optimizer

[3] https://coin-or.github.io/pulp/guides/how_to_configure_solvers.html

[4] Bertsimas, Dimitris, and Bartolomeo Stellato. "Online mixed-integer optimization in milliseconds." INFORMS Journal on Computing 34.4 (2022): 2229-2248.