# Execution environment

In [113]:
!pip install kaggle-environments --upgrade
print("Import started")
from kaggle_environments import make
from kaggle_environments.envs.halite.helpers import *
import random
import numpy as np
from scipy.optimize import linear_sum_assignment
from queue import PriorityQueue
print("Import ended")

Requirement already up-to-date: kaggle-environments in /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages (1.0.10)
Import started
Import ended


# Test Environment

In [121]:
# Build a Halite environment on a 21x21 board with 25000 starting halite
# (debug=True is forwarded to make()).
environment = make("halite", configuration={"size": 21, "startingHalite": 25000}, debug=True)
agent_count = 2
# Reset the environment for a game with `agent_count` agents.
environment.reset(agent_count)
# Take the first entry of environment.state and wrap its observation in a Board
# so the cells below can be tried out interactively.
state = environment.state[0]
board = Board(state.observation, environment.configuration)

# Framework

## Static
Module-level state shared between turns: the `init` flag and the `nav` / `calc` instances.

## Navigation
Contains helper functions for *Points* (wrap-around distance, direction, index unpacking) and for *Movement* (A*-based safe moves).

## Calculator
Encodes the *Board* into NumPy arrays and runs the most computationally intensive calculations and heuristics.

In [122]:
# Static
# Module-level state; agent() declares these names global and fills them in
# on its first call.
init = False  # set to True once nav and calc have been constructed
nav, calc = None, None  # Navigation and Calculator instances built in agent()

#TODO: Move CFG to static?


class Navigation:
    """Point helpers and collision-aware movement on the wrap-around board.

    ``self.next`` is a ``size x size`` grid in which a non-zero entry marks a
    cell already claimed for the coming turn by an earlier ``safeMoveTo``
    call.  ``update`` clears it and should be called once per turn.
    """

    # Orthogonal neighbour offsets, in the order the search expands them.
    _NEIGHBOURS = ((-1, 0), (0, -1), (0, 1), (1, 0))

    # Helper
    def __init__(self, board: Board):
        self.CFG = board.configuration
        # Create the reservation grid right away so safeMoveTo does not fail
        # with AttributeError if it is called before the first update().
        self.next = np.zeros((self.CFG.size, self.CFG.size))

    def dist(self, a: Point, b: Point) -> int:
        """Return the Manhattan distance between a and b with wrap-around on both axes."""
        size = self.CFG.size
        dx = abs(a.x - b.x)
        dy = abs(a.y - b.y)
        return min(dx, size - dx) + min(dy, size - dy)

    def directionTo(self, s: Point, t: Point) -> ShipAction:
        """Return an action that moves s one step towards t, or None if s == t.

        On each axis that differs, the direction with the smaller modular
        offset is chosen.  When both axes differ, one of the two candidate
        actions is picked at random.
        """
        size = self.CFG.size
        candidate = []  # [E/W, N/S]
        if s.x - t.x != 0:
            candidate.append(ShipAction.WEST if (s.x - t.x) % size < (t.x - s.x) % size else ShipAction.EAST)
        if s.y - t.y != 0:
            candidate.append(ShipAction.SOUTH if (s.y - t.y) % size < (t.y - s.y) % size else ShipAction.NORTH)
        return random.choice(candidate) if candidate else None

    def unpack(self, n):
        """Convert a flat index n into Point(n // size, n % size)."""
        return Point(n // self.CFG.size, n % self.CFG.size)

    # Navigation

    def update(self):
        """Clear all reservations; call once at the start of every turn."""
        self.next = np.zeros((self.CFG.size, self.CFG.size))

    def _neighbours(self, p: Point):
        """Return the cells at wrap-around distance exactly 1 from p."""
        size = self.CFG.size
        cells = (p.translate(Point(dx, dy), size) for dx, dy in self._NEIGHBOURS)
        return [q for q in cells if self.dist(p, q) == 1]

    def _blockedCells(self, threshold):
        """Return a 0/1 grid of cells a ship carrying `threshold` halite must avoid."""
        # Enemy ships carrying no more halite than we do, plus enemy shipyards.
        enemyBlock = np.where(calc.enemyShipHalite <= threshold, 1, 0)
        enemyBlock = enemyBlock + calc.enemyShipyard

        #Temporary: Delete when swapping system done
        enemyBlock = enemyBlock + calc.ally

        #TODO: Improve obstacle calculation
        return np.where(self.next + enemyBlock > 0, 1, 0)

    def _fallbackMove(self, sPos: Point, blocked):
        """Step onto the first free neighbour of sPos, or stay if there is none.

        Whatever cell the ship ends up on is reserved in ``self.next`` so that
        ships routed later in the same turn do not pick it as well.
        """
        #TODO: Catch this exception. Or make sure this never happens. Don't just move randomly.
        print("Blocked!")
        for candidate in self._neighbours(sPos):
            if not blocked[candidate.x][candidate.y]:
                self.next[candidate.x][candidate.y] = 1
                return self.directionTo(sPos, candidate)
        self.next[sPos.x][sPos.y] = 1
        return None

    def safeMoveTo(self, s : Ship, t : Point): #A* Movement. Suggested move by priority.
        """Return the first action of a shortest unblocked path from ship s to t.

        The chosen next cell is reserved in ``self.next``.  Returns None when
        the ship should stay where it is (already at t, or nowhere to go).
        If t is blocked or cannot be reached, the ship falls back to stepping
        onto any free neighbouring cell.
        """
        sPos = s.position
        # Stay still
        if sPos == t:
            self.next[sPos.x][sPos.y] = 1
            return None

        blocked = self._blockedCells(s.halite)

        # A blocked target can never be reached, so skip the search entirely.
        if blocked[t.x][t.y]:
            return self._fallbackMove(sPos, blocked)

        #A*
        pred = {sPos: sPos}
        dist = {sPos: 0}
        pq = PriorityQueue()
        # pq holds priorities only; pqMap maps each priority to the points
        # waiting at that priority.
        pqMap = {}

        startPriority = self.dist(sPos, t)
        pqMap[startPriority] = [sPos]
        pq.put(startPriority)

        # Main
        while not pq.empty() and t not in dist:
            currentPoint = pqMap[pq.get()].pop()
            for processPoint in self._neighbours(currentPoint):
                if blocked[processPoint.x][processPoint.y] or processPoint in dist:
                    continue
                dist[processPoint] = dist[currentPoint] + 1
                priority = dist[processPoint] + self.dist(processPoint, t)
                pqMap.setdefault(priority, []).append(processPoint)
                pq.put(priority)
                pred[processPoint] = currentPoint

        # The target is free but walled off: no path exists, so pred[t] would
        # raise KeyError.  Use the fallback move instead.
        if t not in pred:
            return self._fallbackMove(sPos, blocked)

        # Path reconstruction: walk back until the cell adjacent to the start.
        step = t
        while pred[step] != sPos:
            step = pred[step]

        desired = self.directionTo(sPos, step)
        self.next[step.x][step.y] = 1
        # Swapping
        if calc.ally[step.x][step.y]:
            #TODO: Do swapping
            pass

        return desired

class Calculator:
    """Encodes a Board into NumPy grids, indexed ``[x][y]``, and derives maps from them.

    After ``update(board)`` the following attributes are available:
    ``haliteMap``, ``haliteMean``, ``shipMap`` / ``shipyardMap`` (one layer
    per player), ``ally``, ``enemy``, ``allyShipyard``, ``enemyShipyard`` and
    ``enemyShipHalite``.
    """

    def __init__(self, board: "Board"):
        self.CFG = board.configuration
        self.me = board.current_player_id
        self.playerNum = len(board.players)

    def update(self, board: "Board") -> None:
        """Re-encode `board` and recompute every derived map."""
        # Updates
        self.board = board
        # Refresh the player identity on every call: one Calculator may be
        # shared by several agents (e.g. self-play with the same agent
        # function), and each must see its own ships as allies.
        self.me = board.current_player_id
        self.playerNum = len(board.players)

        # Encoding
        self.encode()

        # Calculate
        self.haliteMean = np.mean(self.haliteMap, axis=None)
        self.ally = self.shipMap[self.me]
        self.allyShipyard = self.shipyardMap[self.me]
        self.enemy = np.sum(self.shipMap, axis=0) - self.ally
        self.enemyShipyard = np.sum(self.shipyardMap, axis=0) - self.allyShipyard
        self.enemyShipHaliteMap()

    # Encodes halite and units to matrices
    def encode(self) -> None:
        """Fill ``haliteMap``, ``shipMap`` and ``shipyardMap`` from ``self.board``."""
        size = self.CFG.size
        # Map
        self.haliteMap = np.zeros((size, size))
        self.shipMap = np.zeros((self.playerNum, size, size))
        self.shipyardMap = np.zeros((self.playerNum, size, size))
        for cell in self.board.cells.values():
            self.haliteMap[cell.position.x][cell.position.y] = cell.halite
        for ship in self.board.ships.values():
            self.shipMap[ship.player_id][ship.position.x][ship.position.y] = 1
        for shipyard in self.board.shipyards.values():
            self.shipyardMap[shipyard.player_id][shipyard.position.x][shipyard.position.y] = 1

        # TODO: Add encoding for individual ships and yards (not necessary now)

    # Calculations

    def enemyShipHaliteMap(self) -> None:
        """Store each enemy ship's cargo in ``enemyShipHalite``.

        Cells without an enemy ship hold +inf, so a ``<=`` comparison against
        any finite halite amount is False there.
        """
        # np.inf rather than the np.Infinity alias, which NumPy 2.0 removed.
        self.enemyShipHalite = np.full((self.CFG.size, self.CFG.size), np.inf)
        for ship in self.board.ships.values():
            if ship.player_id != self.me:
                self.enemyShipHalite[ship.position.x][ship.position.y] = ship.halite

    def controlMap(self): # TODO: rename or refactor
        """Return ``ally - enemy`` per cell (also stored as ``self.control``).

        The result is kept under ``self.control`` rather than
        ``self.controlMap``: assigning to the method's own name would replace
        the method on the instance and make every later call fail.
        """
        # TODO: Consider enemyShipHalite and shipyards
        self.control = self.ally - self.enemy
        # TODO: avg pooling
        return self.control
        # TODO: avg pooling
    
    



# Agent

In [123]:
def cost(ship, cell):
    """Score sending `ship` to mine `cell`; lower is better.

    The score is the wrap-around travel distance minus the cell's halite
    scaled by ``size / maxCellHalite`` from the environment configuration.
    """
    # TODO: much to improve
    config = environment.configuration
    halite_weight = config.size / config.maxCellHalite
    travel = nav.dist(ship.position, cell.position)
    reward = halite_weight * cell.halite
    return travel - reward

@board_agent
def agent(board):
    """Choose next_action for every ship and shipyard of the current player.

    Each turn:
      * a ship on a cell holding at least half the mean cell halite stays put
        (its next_action is left untouched);
      * otherwise a ship carrying more than 500 halite heads for the first
        shipyard, if there is one;
      * every remaining ship is matched to one of the richest cells by
        solving a linear assignment over `cost`, then routed with
        nav.safeMoveTo;
      * with no shipyard, the first ship converts;
      * every shipyard whose cell holds no ship spawns.
    """
    global init, nav, calc
    if not init:
        init = True
        nav = Navigation(board)
        calc = Calculator(board)

    # Process map
    calc.update(board)
    nav.update()
    ships = board.current_player.ships
    shipyards = board.current_player.shipyards

    # Decide tasks
    # Terrible mining algorithm, should probably come up with something entirely new
    assign = []
    for ship in ships:
        if ship.cell.halite >= calc.haliteMean / 2:
            continue
        if ship.halite > 500 and shipyards:
            ship.next_action = nav.directionTo(ship.position, shipyards[0].position)
        else:
            assign.append(ship)

    # Without this guard, -len(assign) == 0 makes the [-0:] slice select
    # every cell on the board.
    if assign:
        # Flat indices of the len(assign) richest cells.
        richest = np.argpartition(calc.haliteMap, -len(assign), axis=None)[-len(assign):]
        miningCells = [board.cells[nav.unpack(i)] for i in richest.tolist()]

        # Rows are ships and columns are cells, so the solver's (row, col)
        # pairs read directly as (ship, cell).  Both index arrays are needed:
        # the row indices alone are just 0..n-1 and carry no assignment.
        costMatrix = np.array([[cost(ship, cell) for cell in miningCells] for ship in assign])
        shipIdx, cellIdx = linear_sum_assignment(costMatrix)
        for i, j in zip(shipIdx, cellIdx):
            assign[i].next_action = nav.safeMoveTo(assign[i], miningCells[j].position)

    # Guard on `ships` as well: indexing ships[0] on an empty fleet would raise.
    if not shipyards and ships:
        ships[0].next_action = ShipAction.CONVERT
    for shipyard in shipyards:
        if shipyard.cell.ship is None:
            shipyard.next_action = ShipyardAction.SPAWN

# Run

In [127]:
# Start a fresh game, play the agent against itself, then render the result
# in the notebook.
environment.reset(agent_count)
environment.run([agent, agent])
environment.render(mode="ipython", width=500, height=450)

Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!
Blocked!


In [None]:
# Display the environment's configuration as the cell output.
environment.configuration