In [1]:
from kaggle_environments import make
from kaggle_environments.envs.halite.helpers import *

# Create a test environment for use later
# Board size handed to the environment as its "size" configuration value
board_size = 9
environment = make("halite", configuration={"size": board_size, "startingHalite": 1000})
# Number of agents passed to environment.reset() in the cells below
agent_count = 4

# Greedy Mining

There are some bugs with this code, but the general approach is sound. We compare the values of staying, moving directly toward a specific cell, and moving generally toward clusters of halite.

In [2]:
import random
from board_viz import draw_game, draw_board

def vector_distance(origin: Point, dest: Point, board_size: int):
    ''' Signed offset from origin to dest on a board whose coordinates wrap modulo board_size'''
    half = board_size // 2
    shift = Point(half, half)
    # shifting by half a board before the modulo centres the result range on zero
    wrapped = (dest - origin + shift) % board_size
    return wrapped - shift

def manhattan_distance(p1: Point, p2: Point, board_size: int):
    ''' Sum of the absolute x and y components of the wrapped offset between p1 and p2'''
    offset = abs(vector_distance(p1, p2, board_size))
    horizontal, vertical = offset.x, offset.y
    return horizontal + vertical

def magnitude(p):
    ''' Taxicab length of a vector: |x| + |y|'''
    width, height = abs(p.x), abs(p.y)
    return width + height

def safe_float_div(p: Point, scalar):
    ''' Floating point divide of each component of p by a scalar number.

    Returns a zero vector instead of raising an exception when scalar is 0.
    A component that is already 0 is passed through as 0 without dividing.
    '''
    if scalar == 0:
        # Guard the divisor itself: guarding only the zero components would
        # still raise ZeroDivisionError for a non-zero vector.
        return Point(0, 0)
    return Point(p.x / scalar if p.x != 0 else 0, p.y / scalar if p.y != 0 else 0)

def direction(origin, destination):
    ''' Vector pointing from origin toward destination, scaled so |x| + |y| is 1.

    Reads the module-level board_size for wrap-around. The result is a zero
    vector when origin and destination are the same cell.
    '''
    offset = vector_distance(origin, destination, board_size)
    steps = manhattan_distance(origin, destination, board_size)
    return safe_float_div(offset, steps)

def halite_move_force(ship, cell):
    ''' Pull vector that the halite in cell exerts on ship, pointing from the ship toward the cell'''
    steps = manhattan_distance(ship.position, cell.position, board_size)
    if steps > 0:
        # strength is the cell's halite divided by how far away it is
        strength = cell.halite / steps
    else:
        # the cell under the ship exerts no pull
        strength = 0
    return direction(ship.position, cell.position) * strength

# Multiplier applied to the stay force in force_to_action
stay_power = 1.4
# Multiplier applied to the summed pull of all cells before it is added to the best single-cell pull
cluster_power = 0.1

def force_to_action(best_move_force, sum_move_force, stay_force):
    ''' Turns three input values into a Ship action
    best_move_force is the best calculated single cell to move to
    sum_move_force is the best overall direction to move toward
    stay_force is the magnitude of how good our current cell is
    we use stay_power and cluster_power to scale them
    returns a ShipAction direction, or None to stay in place
    '''
    move_force = best_move_force + sum_move_force * cluster_power
    stay_force = stay_force * stay_power
    # staying wins only when it strictly beats the pull along both axes
    if stay_force > abs(move_force.x) and stay_force > abs(move_force.y):
        return None
    if move_force.x == 0 and move_force.y == 0:
        # nothing pulls in any direction: stay put rather than falling
        # through to the SOUTH default at the bottom
        return None
    # move along the dominant axis; a tie between the axes goes vertical
    if abs(move_force.x) > abs(move_force.y):
        if move_force.x > 0:
            return ShipAction.EAST
        return ShipAction.WEST
    if move_force.y > 0:
        return ShipAction.NORTH
    return ShipAction.SOUTH

def next_move(board, ship, t):
    ''' Picks the action for one ship on turn t.

    On turn 0 the ship always converts. Otherwise the pull of every cell on
    the board is computed, and the strongest single pull, the sum of all
    pulls and the halite under the ship are handed to force_to_action.
    '''
    if t == 0:
        return ShipAction.CONVERT
    total_pull = Point(0, 0)
    strongest_pull = Point(0, 0)
    for cell in board.cells.values():
        pull = halite_move_force(ship, cell)
        # on a tie the newer pull wins, because max() returns its first argument
        strongest_pull = max(pull, strongest_pull, key=magnitude)
        total_pull += pull
    here = board.cells[ship.position]
    return force_to_action(strongest_pull, total_pull, here.halite)

# Start a fresh episode with agent_count agents
environment.reset(agent_count)
# The Board is built from the observation in the first entry of environment.state
state = environment.state[0]
board = Board(state.observation, environment.configuration)
# Turn counter; next_move() converts every ship while t == 0
t = 0
def step(board, t):
    ''' Queues the next action for every ship and shipyard on the board.

    Ships get whatever next_move picks for turn t. Shipyards are told to
    spawn only while fewer than two ships exist on the whole board.
    '''
    for vessel in board.ships.values():
        vessel.next_action = next_move(board, vessel, t)
    if len(board.ships) >= 2:
        # enough ships already, leave the shipyards idle
        return
    for yard in board.shipyards.values():
        yard.next_action = ShipyardAction.SPAWN

# Keep every board so draw_game can replay the whole run
boards = [board]
for i in range(50):
    # queue this turn's actions, then advance the board by one turn
    step(board, t)
    t += 1
    board = board.next()
    boards.append(board)
    # stop early once no ships and no shipyards are left on the board
    if len(board.ships)+ len(board.shipyards)==0:
        break
draw_game(boards)


interactive(children=(IntSlider(value=0, description='t', max=50), Checkbox(value=True, description='cell_hali…

# DFS Path Mining
Search all possible paths to see which paths are projected to produce the most halite by some t_max.

This approach is computationally expensive. Even with optimization, this is unlikely to get past t_max > 10.

In [3]:
import time

def score_solution(board, positions):
    ''' Projected halite mined by following positions, one entry per turn.

    A turn counts as mining only when its position equals the previous entry;
    each mining turn collects 25% of the halite still left in that cell.
    The first entry has no predecessor and therefore never scores.
    The board itself is not modified.
    '''
    remaining = {}
    for where, cell in board.cells.items():
        remaining[where] = cell.halite
    total = 0.0
    previous = None
    for current in positions:
        stayed_put = bool(previous) and current == previous
        if stayed_put:
            total += remaining[current] * 0.25
            remaining[current] *= 0.75
        previous = current
    return total

def is_valid(board, t, t_max, candidate, destination):
    ''' True when destination is no further from candidate than the t_max - t turns that remain'''
    turns_left = t_max - t
    return manhattan_distance(candidate, destination, board.configuration.size) <= turns_left

def find_candidates(board, t, t_max, positions, origin, destination):
    ''' Positions the ship may occupy at step t: the four neighbours of its
    previous position plus that position itself, keeping only those that
    pass is_valid for destination.
    '''
    size = board.configuration.size
    current = origin if t <= 0 else positions[t - 1]
    reachable = []
    for move in (Point(1, 0), Point(-1, 0), Point(0, 1), Point(0, -1), Point(0, 0)):
        neighbour = (move + current) % size
        if is_valid(board, t, t_max, neighbour, destination):
            reachable.append(neighbour)
    return reachable

def dfs_solutions(board, t, t_max, positions, origin, destination):
    ''' Depth-first search over every path of t_max positions starting from origin.

    positions is a scratch list of length t_max that is overwritten in place.
    Returns a (path, score) tuple for the best-scoring path found; when no
    path scores above 0 the result is ([], 0).
    '''
    if t == t_max:
        # every slot is filled: score the complete path
        return positions, score_solution(board, positions)
    best_path, best_score = [], 0
    for candidate in find_candidates(board, t, t_max, positions, origin, destination):
        positions[t] = candidate
        path, score = dfs_solutions(board, t + 1, t_max, positions, origin, destination)
        if score > best_score:
            # copy, because positions keeps being overwritten as the search continues
            best_path, best_score = path[:], score
    return best_path, best_score

# Rebuild the environment with size 8 and a single agent.
# Note: this rebinds the module-level board_size that direction() reads.
board_size = 8
environment = make("halite", configuration={"size": board_size, "startingHalite": 1000})
agent_count = 1
environment.reset(agent_count)
state = environment.state[0]
board = Board(state.observation, environment.configuration)

boards = [board]
# Take the first ship on the board; its position is both origin and destination of the search
ship = list(board.ships.values())[0]
start_pos = ship.position

# Time a search over paths of 6 positions that start and end within reach of start_pos
start_time = time.time()
solution = dfs_solutions(board, 0, 6, [None]*6, start_pos, start_pos)
# solution is a (path, score) tuple
print(solution)
print('time:', time.time()-start_time, 'seconds')

# Maps a one-cell offset to the ship action that produces it (None means stay put)
action_lookup = {
    Point(0,0): None,
    Point(1,0): ShipAction.EAST,
    Point(-1,0): ShipAction.WEST,
    Point(0,1): ShipAction.NORTH,
    Point(0,-1): ShipAction.SOUTH
}
# Convert the starting ship, then spawn a new ship from the first shipyard on the board
ship.next_action = ShipAction.CONVERT
board = board.next()
boards.append(board)
shipyard = list(board.shipyards.values())[0]
shipyard.next_action = ShipyardAction.SPAWN
board = board.next()
boards.append(board)
# Follow the planned positions, then take one more step toward start_pos
for pos in solution[0] +[start_pos]:
    ship = list(board.ships.values())[0]
    # Use the wrapped offset: a plain `pos - ship.position` yields something like
    # (7, 0) for a step across the board edge, which is not a key of action_lookup.
    step_offset = vector_distance(ship.position, pos, board.configuration.size)
    ship.next_action = action_lookup[step_offset]
    board = board.next()
    boards.append(board)
# NOTE(review): the final board is appended a second time here, so the last
# frame shows up twice in the replay — confirm whether that is intended.
boards.append(board)
draw_game(boards)



([(3, 3), (3, 3), (3, 3), (4, 3), (4, 3), (4, 3)], 33.25)
time: 0.26666760444641113 seconds


interactive(children=(IntSlider(value=0, description='t', max=10), Checkbox(value=True, description='cell_hali…

# DFS Traveling Salesman
Any moves that are not mining don't influence the final score. The only thing that influences score is the positions we choose to mine. The distance between mining targets is the cost in time of mining there.

The halite is concentrated, so most cells have negligible halite. If we ignore those cells, we are left with a relatively small number of mining targets to consider. Our solution space will be much wider than in the DFS path search but much shallower, so we won't have to waste as much time traversing and backtracking.

In [4]:
def score_tsp_solution(board, positions):
    ''' Projected halite mined when every entry of positions is one mining turn.

    Each visit collects 25% of what is left in the cell, so repeated entries
    for the same cell yield less each time. The board itself is not modified.
    '''
    remaining = dict((where, cell.halite) for where, cell in board.cells.items())
    gains = []
    for stop in positions:
        gains.append(remaining[stop] * 0.25)
        remaining[stop] *= 0.75
    # summing from 0.0 in visit order keeps the same floating point result as a running total
    return sum(gains, 0.0)

def tsp_is_valid(board, t, t_max, position, candidate, destination):
    ''' True when the trip to candidate, one extra turn there, and the trip on
    to destination (skipped when destination is None) fit in the t_max - t
    turns that remain.
    '''
    size = board.configuration.size
    to_candidate = manhattan_distance(position, candidate, size)
    to_home = 0 if destination is None else manhattan_distance(candidate, destination, size)
    # the extra 1 matches the per-target cost of 1 + distance used by tsp_dfs_solutions
    return to_candidate + to_home + 1 <= t_max - t



def find_tsp_candidates(board, t, t_max, positions, origin, destination, halite_cells):
    ''' Entries of halite_cells that pass tsp_is_valid from the current position
    (origin at t == 0, otherwise the last entry of positions).
    '''
    here = origin if t <= 0 else positions[-1]
    within_reach = []
    for target in halite_cells:
        if tsp_is_valid(board, t, t_max, here, target, destination):
            within_reach.append(target)
    return within_reach

def tsp_dfs_solutions(board, t, t_max, positions, origin, destination, halite_cells, timeout):
    ''' Depth-first search over sequences of mining targets.

    positions holds the targets chosen so far; each further target costs its
    travel distance plus one turn. timeout is an absolute time.time() value:
    once it has passed, each level returns the best it has found so far.
    Returns a (targets, score) tuple.
    '''
    options = find_tsp_candidates(board, t, t_max, positions, origin, destination, halite_cells)
    if not options:
        # nothing else fits in the remaining turns: score what we have
        return positions, score_tsp_solution(board, positions)
    here = origin if t <= 0 else positions[-1]
    best_route, best_score = [], 0
    for target in options:
        turns = manhattan_distance(here, target, board.configuration.size) + 1
        route, score = tsp_dfs_solutions(
            board, t + turns, t_max, positions + [target],
            origin, destination, halite_cells, timeout)
        if score > best_score:
            best_route, best_score = route[:], score
        if time.time() > timeout:
            # out of time: settle for the best branch explored so far
            break
    return best_route, best_score

def next_action(pos, next_pos, board_size):
    ''' Ship action for one step from pos toward next_pos, or None when the wrapped offset is zero'''
    offset = vector_distance(pos, next_pos, board_size)
    # checked in this order, so the first matching direction wins
    ordered = (
        (offset.x > 0, ShipAction.EAST),
        (offset.y > 0, ShipAction.NORTH),
        (offset.x < 0, ShipAction.WEST),
        (offset.y < 0, ShipAction.SOUTH),
    )
    for applies, action in ordered:
        if applies:
            return action
    return None
    
def get_ship(board):
    ''' First ship in board.ships whose player_id is 0; raises IndexError when there is none'''
    owned = []
    for candidate in board.ships.values():
        if candidate.player_id == 0:
            owned.append(candidate)
    return owned[0]
    
# Rebuild the environment with size 21 and four agents.
# Note: this rebinds the module-level board_size that direction() reads.
board_size = 21
environment = make("halite", configuration={"size": board_size})
agent_count = 4
environment.reset(agent_count)
state = environment.state[0]
board = Board(state.observation, environment.configuration)
ship = get_ship(board)
# Remember the starting position; the final search below uses it as its destination
start_pos = ship.position

# Keep every board so draw_game can replay the whole run
boards = [board]
# Convert player 0's ship, then spawn a new ship from the first shipyard on the board
ship.next_action = ShipAction.CONVERT
board = board.next()
boards.append(board)
shipyard = list(board.shipyards.values())[0]
shipyard.next_action = ShipyardAction.SPAWN
board = board.next()
boards.append(board)
ship = get_ship(board)
print(ship.id)
# Turn budget handed to each search
t_max = 10
# Only cells holding more halite than this are considered as mining targets
negligible_halite_threshold = 100

def _plan_route(board, ship, destination, t_max, halite_threshold, time_limit=1.0):
    ''' Searches for the best list of mining targets for ship and prints timing and result.

    destination is the position the route must still be able to reach at the
    end, or None for no constraint. Only cells holding more than
    halite_threshold halite and closer than t_max steps to the ship are
    considered. The search stops after time_limit seconds.
    '''
    halite_cells = [cell.position for cell in board.cells.values() if cell.halite > halite_threshold]

    start_time = time.time()
    halite_cells = [cell for cell in halite_cells if manhattan_distance(ship.position, cell, board.configuration.size)<t_max]
    solution, score = tsp_dfs_solutions(board, 0, t_max, [], ship.position, destination, halite_cells, time.time()+time_limit)
    print('time:', time.time()-start_time, 'seconds')
    print('solution:', solution)
    return solution


def _follow_route(board, ship, route, boards, home):
    ''' Steps the board until every entry of route has been handled, appending each new board to boards.

    For each entry the ship keeps moving until it has spent one turn standing
    on that position, or has arrived at home when the entry is home.
    Returns the final (board, ship) pair.
    '''
    for pos in route:
        while True:
            ship.next_action = next_action(ship.position, pos, board.configuration.size)
            board = board.next()
            boards.append(board)
            # ship still refers to the pre-step board here, so this is where it stood before moving
            last_pos = ship.position
            ship = get_ship(board)
            if ship.position == home and pos == home:
                break
            if ship.position == pos and pos == last_pos:
                break
    return board, ship


# Three unconstrained searches chained back to back ...
for _ in range(3):
    solution = _plan_route(board, ship, None, t_max, negligible_halite_threshold)
    board, ship = _follow_route(board, ship, solution, boards, start_pos)

# ... then one search constrained to end within reach of start_pos, followed by the trip home
solution = _plan_route(board, ship, start_pos, t_max, negligible_halite_threshold)
board, ship = _follow_route(board, ship, solution+[start_pos], boards, start_pos)

draw_game(boards)

2-1
time: 0.06383299827575684 seconds
solution: [(5, 12), (5, 12), (5, 12), (5, 10), (5, 10)]
time: 0.40507960319519043 seconds
solution: [(5, 10), (5, 10), (5, 10), (5, 8), (5, 8), (5, 8), (5, 8), (5, 8)]
time: 0.848074197769165 seconds
solution: [(3, 7), (3, 7), (3, 7), (3, 7), (3, 6), (3, 6)]
time: 0.0009968280792236328 seconds
solution: []


interactive(children=(IntSlider(value=0, description='t', max=43), Checkbox(value=True, description='cell_hali…

* DFS TSP can get away with a higher t_max than DFS Path Mining because we ignore paths that go to negligible halite
* Defining negligible halite is tricky
* Manhattan distance gets hit a lot harder in TSP, so a distance table will probably speed things up a lot
* DFS can easily keep track of the best solution so far so timeouts work pretty well
  * It would work even better if we searched solutions in a more useful order (I suppose that's A*)
* If you just can't hit a high enough t_max, you can chain solutions together
  * One solution without constraining endpoint, another solution to constrain the endpoint to home
  * This risks missing a global maximum because the first maximum takes you in the wrong direction

## Next Steps
### Tighten solution
Many solutions are pretty much equivalent, find a canonical form that is optimal.

* If the order of destinations is known, the optimal amount of time to mine each one is pretty easily solvable.
* If the destinations are known, there is a shortest order, but it can be expensive to calculate
  * Good solutions usually have a relatively short number of destinations.

### Neighboring Solutions
What is the smallest set of change types that, applied repeatedly, can turn any solution into any other solution?

* Remove one of the destinations
* Add a destination
* Swap the order of destinations

### Hill Climbing
Given a solution, find the adjacent solution that has the highest score.

### Simulated Annealing
Given a solution and a lower score bound N, find a random solution that has at least a score of N.

