<a href="https://colab.research.google.com/github/sddyshou/lux-ai/blob/main/Lux_ai_p1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install kaggle-environments -U



In [2]:
# from kaggle_environments import make

In [3]:
# env = make("lux_ai_2021", configuration={"seed": 562124210, "loglevel": 2, "annotations": True}, debug=True)

# print(list(env.agents))

In [4]:
# run a match between two simple agents, which are the agents we will walk you through on how to build!
# steps = env.run(["simple_agent", "simple_agent"])

# if you are viewing this outside of the interactive jupyter notebook / kaggle notebooks mode, this may look cutoff
# render the game, feel free to change width and height to your liking. We recommend keeping them as large as possible for better quality.
# you may also want to close the output of this render cell or else the notebook might get laggy
# env.render(mode="ipython", width=1200, height=800)

In [5]:
!cp -r 'drive/MyDrive/Colab Notebooks/Kaggle/Lux-ai/lux-lib/lux' .

In [6]:
# resource block
#

# TODO
## M1: release it (1W)
1. Clustering algorithm V1 (3D) - **DONE**
1. Conflict detection (2D) - **DONE**
1. Bug: Unit move out of city at night (bonus) (1D)
1. Global state (0.5D) - **DONE**
1. Assign First Worker as builder
1. Don't die in random map

## M2: make it better
1. Smarter res collection strategy
1. Smarter path finding strategy (1D)
1. Research strategy
1. Collecting other res
1. support 2+ clusters
1. return to tile with least resource
1. Clustering algorithm V2 (3D)

## M3: make it combat
1. Kill oponent

## M4: icing on the top

1. Dynamic clustering algorithm, cluster changes
1. Use Cart
1. Smarter routing algorithm
1. Delete a tile
1. implement Builder, Collector



In [80]:
from typing import Dict
# for kaggle-environments
from kaggle_environments import make
from lux.game import Game
from lux.game_map import Cell, RESOURCE_TYPES, Position
from lux.game_objects import Player, City, CityTile, Cargo, Unit
from lux.constants import Constants
from lux.game_constants import GAME_CONSTANTS
from lux import annotate
import math
import sys
from collections import deque
import heapq

DIRECTIONS = Constants.DIRECTIONS
RESOURCE_TYPES = Constants.RESOURCE_TYPES
PARAMS = GAME_CONSTANTS["PARAMETERS"]

CHECK_DIRS = [
    DIRECTIONS.NORTH,
    DIRECTIONS.EAST,
    DIRECTIONS.SOUTH,
    DIRECTIONS.WEST,
]

RESOURCE_TEXT = {
    "wood": "WOOD",
    "coal": "COAL",
    "uranium": "URANIUM"
}

game_manager = None
unit_service = None
turn_state = None

def log(message):
    global turn_state
    print("{}-{} | {}".format("DAY" if turn_state.is_day() else "NIGHT", turn_state.turn, message))

class TurnState:

    def __init__(self, game_state, observation, configuration):
        self.game_state = game_state
        self.width, self.height = game_state.map.width, game_state.map.height
        self.day_length, self.night_length = PARAMS["DAY_LENGTH"], PARAMS["NIGHT_LENGTH"]
        self.turn_length = self.day_length + self.night_length
        self.update(observation, configuration)
        self.player_start_pos = self.player.units[0].pos
        self.opponent_start_pos = self.opponent.units[0].pos

    def update(self, observation, configuration):
        self.game_state._update(observation["updates"])
        self.turn = observation["step"]
        self.player = self.game_state.players[observation.player]
        self.opponent = self.game_state.players[(observation.player + 1) % 2]
        self.map = self.game_state.map
    
    def is_day(self):
        return self.turn % self.turn_length <= self.day_length

class CityTileRepo:
    global turn_state

    def __init__(self, citytiles_plan, citytiles_pos):
        self.citytiles_plan : List[Position] = []
        self.citytiles_pos : List[Position] = []
        self.citytiles_pos_to_build : List[Position] = []
        self.citytiles : List[CityTile] = []
        self.hip_citytiles = set()
        self.lop_citytiles = set()

        self.citytiles_plan.extend(citytiles_plan)
        self.citytiles_pos.extend(citytiles_pos)
        self.visited_citytile_ids = set()
        self.update()
    
    def update(self):
        self.citytiles = []
        if self.citytiles_pos_to_build:
            self.citytiles_pos.extend(self.citytiles_pos_to_build)
            self.citytiles_pos_to_build.clear()

        self.hip_citytiles.clear()
        self.lop_citytiles.clear()
        new_citytiles_pos = []
        for pos in self.citytiles_pos:
            tile = self._fetch_citytile(pos)
            if tile is not None:
                self.citytiles.append(tile)
                new_citytiles_pos.append(tile.pos)
                city = turn_state.player.cities[tile.cityid]
                if self._is_high_priority(city):
                    self.hip_citytiles.add(tile)
                else:
                    self.lop_citytiles.add(tile)

        self.citytiles_pos = new_citytiles_pos
        # self.citytiles_pos.sort(key=lambda pos : turn_state.player.cities[self._fetch_citytile(pos).cityid].fuel) # sort by resource 

        self.visited_citytile_ids.clear()
    
    def add_citytile(self, pos):
        self.citytiles_pos_to_build.append(pos)

    def get_citytile_count(self):
        return len(self.citytiles_pos)
    
    def get_eot_citytile_count(self):
        return self.get_citytile_count + len(self.citytiles_pos_to_build)

    def get_unvisited_tiles(self):
        hip_unvisited_tile = [x for x in self.hip_citytiles if x.pos not in self.visited_citytile_ids]
        if hip_unvisited_tile:
            return hip_unvisited_tile
        return [x for x in self.lop_citytiles if x.pos not in self.visited_citytile_ids]

    def mark_visited(self, citytile):
        self.visited_citytile_ids.add(citytile.pos)

    def _fetch_citytile(self, pos):
        return turn_state.map.get_cell_by_pos(pos).citytile

    def _is_high_priority(self, city):
        num_nights_left = turn_state.night_length - max(0, turn_state.turn % turn_state.turn_length - turn_state.day_length)
        total_res_needed = num_nights_left * city.light_upkeep
        return total_res_needed > city.fuel


class UnitService:
    def __init__(self, turn_state, observation, configuration):
        self.unit_pos_map : List[List[Unit]] = []
        self.unit_id_map : Dict[str, Unit] = {}
        self.player = None
        self.update(turn_state)

    def update(self, turn_state):
        game_state = turn_state.game_state
        self.player = turn_state.player
        self.width, self.height = game_state.map.width, game_state.map.height
        self.unit_pos_map : List[List[Unit]] = [None] * self.height
        self.eot_unit_pos_map : List[List[Unit]] = [None] * self.height
        self.unit_id_map.clear()
        for y in range(0, self.height):
            self.unit_pos_map[y] = [None] * self.width
            self.eot_unit_pos_map[y] = [None] * self.width

        for unit in self.player.units:
            self.unit_pos_map[unit.pos.y][unit.pos.x] = unit
            self.eot_unit_pos_map[unit.pos.y][unit.pos.x] = unit
            self.unit_id_map[unit.id] = unit

    def find_unit_by_pos(self, pos, eot = True):
        return self.eot_unit_pos_map[pos.y][pos.x] if eot else self.unit_pos_map[pos.y][pos.x]
    
    def find_unit_by_id(self, id):
        return self.unit_id_map.get(id)
    
    def update_unit_pos(self, unit, new_pos):
        self.eot_unit_pos_map[unit.pos.y][unit.pos.x] = None
        self.eot_unit_pos_map[new_pos.y][new_pos.x] = Unit

class MapUtils:
    global unit_service
    global turn_state

    @staticmethod
    def direction_to(start_pos, end_pos, map, avoid_citytile = False, avoid_unit = True, max_dist = 5):
        """
        this function finds the shortest path to end_pos using BFS
        It will return two values: 
        1. next_direction, None if max_depth exceeded
        2. distance to end_pos, -1 if max_depth exceeded, 0 if start_pos = end_pos
        """
        width, height = map.width, map.height
        visited = [[None for j in range(width)] for i in range(height)]
        if start_pos.equals(end_pos):
            return DIRECTIONS.CENTER, 0
        queue = deque()
        queue.append(start_pos)
        dist = 0
        while(queue and dist <= max_dist):
            size = len(queue)
            for i in range(size):
                cur_pos = queue.popleft()
                if cur_pos.equals(end_pos):
                    return visited[cur_pos.x][cur_pos.y], dist
                
                for dir in CHECK_DIRS:
                    new_pos = cur_pos.translate(dir, 1)
                    if ( MapUtils.is_valid(new_pos, avoid_citytile, avoid_unit) and 
                        visited[new_pos.x][new_pos.y] is None):
                        visited[new_pos.x][new_pos.y] = visited[cur_pos.x][cur_pos.y] if dist!=0 else dir
                        queue.append(new_pos)

            dist += 1

        return None, -1 # max_depth exceeded or can't get to end_pos
    
    @staticmethod
    def is_valid(pos, avoid_citytile, avoid_unit, avoid_resource = False):
        width, height, map = turn_state.width, turn_state.height, turn_state.map
        if pos.x <0 or pos.x >= width or pos.y <0 or pos.y >= height: # out of map
            return False
        cell = map.get_cell_by_pos(pos)
        if ((cell.citytile is not None and cell.citytile.team == turn_state.opponent.team) or # opponent tile
            (avoid_citytile and cell.citytile is not None) or # hit a city tile
            (avoid_unit and unit_service.find_unit_by_pos(pos) is not None) or # hit a unit
            (avoid_resource and cell.has_resource())): # hit a resource
            return False
        return True
    
    @staticmethod
    def find_resources(cluster):
        '''
        finds all resources stored on the map and puts them into a list so we can search over them
        '''
        resource_tiles: list[Cell] = []
        width, height = turn_state.width, turn_state.height
        for y in range(height):
            for x in range(width):
                cell = turn_state.map.get_cell(x, y)
                if cell.has_resource():
                    resource_tiles.append(cell)
        return resource_tiles

class ResourceBlock: 
    def __init__(self):
        pass

class ClusterStrategy:
    global unit_service, turn_state

    @staticmethod
    def create_clusters(pos, radius=10):
        '''
        find all clusters within x radius of pos
        '''
        width, height, map = turn_state.width, turn_state.height, turn_state.map
        score_data = [[0 for j in range(width)] for i in range(height)]

        for y in range(height):
            for x in range(width):
                cell = map.get_cell(x, y)
                if cell.has_resource() and cell.resource.type == GAME_CONSTANTS['RESOURCE_TYPES']['WOOD']:
                    ClusterStrategy._score_resource(score_data, cell, map)

        
        pos_score = []
        radius = max(width, height)
        for y in range(max(pos.y-radius, 0), min(pos.y+radius, height)):
            for x in range(max(pos.x-radius, 0), min(pos.x+radius, width)):
                if score_data[x][y] > 0:
                    pos_score.append((Position(x, y), score_data[x][y]))
        
        best_pos = heapq.nlargest(4, pos_score, key=lambda e:e[1])
        for bp in best_pos:
            print("{} score is {}".format(bp[0], bp[1]))
        citytile_plan = [x[0] for x in best_pos]

        citytile_repo = CityTileRepo(citytile_plan, [pos])
        return Cluster("cluster1", citytile_repo)

    _score_resource_d1 = [1, -1]

    @staticmethod
    def _score_resource(score_data, cell, map, score_radius=5):
        if not cell.has_resource():
            return
        pos = cell.pos
        per_res_fuel = PARAMS["RESOURCE_TO_FUEL_RATE"][RESOURCE_TEXT[cell.resource.type]]
        per_dist_score = cell.resource.amount * per_res_fuel / score_radius
        for i in range(score_radius+1):
            for j in range(score_radius+1):
                dist = i+j
                if dist > score_radius:
                    break
                if dist == 0:
                    continue
                for k1 in ClusterStrategy._score_resource_d1:
                    for k2 in ClusterStrategy._score_resource_d1:
                        x1 = pos.x + i*k1
                        y1 = pos.y + j*k2
                        new_pos = Position(x1, y1)
                        if (MapUtils.is_valid(Position(x1, y1), False, False, avoid_resource=True) and 
                            turn_state.player_start_pos.distance_to(new_pos) <= turn_state.opponent_start_pos.distance_to(new_pos)
                            ):
                            score = per_dist_score * (score_radius - dist + 1)
                            if i==0 or j ==0:
                                score = score/2
                            score_data[x1][y1] += score

class CityTileStrategy:
    global turn_state

    @staticmethod
    def find_return_resource_citytile(pos, citytile_repo):

        # return citytile_repo.citytiles.pop()
        return CityTileStrategy.find_closest_citytile(pos, citytile_repo)

    @staticmethod
    def find_closest_citytile(pos, citytile_repo):
        tiles = citytile_repo.get_unvisited_tiles()
        if len(tiles) > 0:
            closest_city_tile = None
            closest_dist = math.inf
            for city_tile in tiles:
                dist = city_tile.pos.distance_to(pos)
                if dist < closest_dist:
                    closest_dist = dist
                    closest_city_tile = city_tile
            return closest_city_tile 
        else:
            return None
    
    @staticmethod
    def find_next_building_location(unit, citytile_repo):
        # getting from cluster
        for pos in citytile_repo.citytiles_plan:
            new_tile = turn_state.map.get_cell_by_pos(pos)
            if not new_tile.has_resource() and new_tile.citytile is None:
                    return new_tile

        # for city in self.player.cities.values():
        #     for tile in city.citytiles:
        #         newpos = tile.pos.translate(Constants.DIRECTIONS.NORTH, 1)
        #         new_tile = game_manager.game_state.map.get_cell_by_pos(newpos)
        #         if not new_tile.has_resource() and new_tile.citytile is None:
        #             return new_tile

        # return None
        # res = self.find_closest_resources(unit)

        # for direction in GameManager.CHECK_DIRS:
        #     newpos = unit.pos.translate(direction, 1)
        #     new_tile = game_manager.game_state.map.get_cell_by_pos(newpos)
        #     if not new_tile.has_resource() and new_tile.citytile is None:
        #         return new_tile
        return None

class Cluster:
    global unit_service
    global turn_state
    
    def __init__(self, name, citytile_repo):
        # fixed config
        self.max_unit_count = 10
        self.name = name
        self.total_resource = 10
        self.resource_blocks = []

        self.turn_state = turn_state
        self.citytile_repo = citytile_repo

        self.collectors_to_build : List[Position] = []
        self.collectors_ids : List[str] = []
        self.collectors : List[str] = []

        self.actions = []
        self.builder_id = 'u_1'
        self.builder = None

    def update(self): 
        self.game_state = self.turn_state.game_state
        self.player = self.turn_state.player

        # update citytiles
        self.citytile_repo.update()
        
        # update collectors
        self.collectors = []
        new_collectors_ids = []
        for id in self.collectors_ids:
            unit = unit_service.find_unit_by_id(id)
            if unit is not None:
                self.collectors.append(unit) 
                new_collectors_ids.append(unit.id)
            else:
                log("losing existing unit {}".format(id))

        for pos in self.collectors_to_build:
            unit = unit_service.find_unit_by_pos(pos)
            if unit is not None:
                log("adding new unit {} at {}".format(pos, unit.id))
                self.collectors.append(unit)
                new_collectors_ids.append(unit.id)
            else:
                log("losing new unit at {}".format(pos))

        self.collectors_ids = new_collectors_ids
        self.collectors_to_build = []

        # update builder
        self.builder = None if self.builder_id is None else unit_service.find_unit_by_id(self.builder_id)        

        self.actions = [] # refresh actions

        self.map = self.game_state.map
        self.resource_cells = MapUtils.find_resources(self)
        self.unit_service = unit_service

        self.citytiles_to_build = []
        self.collectors_to_build = []

    
    def next_turn(self):
        self.update()
        self.builder_action()
        self.collector_action()
        self.city_tile_action()
        for pos in self.citytile_repo.citytiles_plan:
            self.actions.append(annotate.circle(pos.x, pos.y))
        
        return self.actions
    
    def collector_action(self):
        for unit in self.collectors:
            self._collect_resource(unit, avoid_citytile=False)
    
    def _collect_resource(self, unit, avoid_citytile):
        if unit.can_act():
            citytile = CityTileStrategy.find_return_resource_citytile(unit.pos, self.citytile_repo)
            if citytile is None:
                return
            self.actions.append(annotate.line(unit.pos.x, unit.pos.y, citytile.pos.x, citytile.pos.y))
            if self._should_mine_res(unit):
                resource_cell = self.find_closest_resources(citytile.pos, avoid_citytile)
                if resource_cell is not None:
                    if resource_cell.has_resource():
                        self.resource_cells.remove(resource_cell)
                    self.move_unit(unit, resource_cell)
            else:
                self.citytile_repo.mark_visited(citytile)
                self.move_unit(unit, citytile)
    
    def _should_mine_res(self, unit):
        if turn_state.is_day():
            return unit.get_cargo_space_left() > 0
        else:
            return unit.get_cargo_space_left() > 10

    
    def builder_action(self):
        if self.builder is not None and self.builder.can_act():
            building_tile = CityTileStrategy.find_next_building_location(self.builder, self.citytile_repo)
            self.actions.append(annotate.x(self.builder.pos.x, self.builder.pos.y))
            if building_tile is not None: # building something
                if self._should_mine_res(self.builder):
                    resource_cell = self.find_closest_resources(building_tile.pos, avoid_citytile=True)
                    # log("Builder {} collect resource at {}".format(self.builder.id, resource_tile))
                    if resource_cell is not None:
                        if resource_cell.has_resource():
                            self.resource_cells.remove(resource_cell)
                        self.move_unit(self.builder, resource_cell, avoid_citytile=True)
                else:
                    # log("Builder {} build tile on at {}".format(self.builder.id, building_tile))
                    if self.builder.pos.equals(building_tile.pos):
                        if self.builder.can_build(self.map):
                            self.build_city(self.builder)
                            log("build a new tile at {}".format(building_tile.pos))
                    else:
                        self.move_unit(self.builder, building_tile, avoid_citytile=True)
                self.actions.append(annotate.line(self.builder.pos.x, self.builder.pos.y, building_tile.pos.x, building_tile.pos.y))
            else: # not building block left, just collect
                self._collect_resource(self.builder, avoid_citytile=False)


    def city_tile_action(self):
        # log("There are {} tiles, {} units".format(self.citytile_repo.get_citytile_count(), self.get_unit_count()))
        for tile in self.citytile_repo.citytiles:
            if tile.can_act():
                if ( self.get_unit_count() < self.citytile_repo.get_citytile_count() and 
                    self.unit_service.find_unit_by_pos(tile.pos) is None and 
                    self.get_unit_count() < self.max_unit_count) :
                    self.build_worker(tile)
                else:
                    self.actions.append(tile.research())


    def find_closest_resources(self, pos, avoid_citytile = False):
        '''
        finds the closest resources that we can mine given position on a map
        '''
        cell = self.map.get_cell_by_pos(pos)
        if not avoid_citytile and cell.citytile is not None and self.has_res_surrounding(pos):
            return cell # stay same place

        closest_dist = math.inf
        closest_resource_tile = None
        for resource_tile in self.resource_cells:
            # we skip over resources that we can't mine due to not having researched them
            if resource_tile.resource.type == Constants.RESOURCE_TYPES.COAL and not self.player.researched_coal(): continue
            if resource_tile.resource.type == Constants.RESOURCE_TYPES.URANIUM and not self.player.researched_uranium(): continue
            dist = resource_tile.pos.distance_to(pos)
            if dist < closest_dist:
                closest_dist = dist
                closest_resource_tile = resource_tile
        return closest_resource_tile
    
    def has_res_surrounding(self, pos):
        for dir in CHECK_DIRS:
            new_pos = pos.translate(dir, 1)
            cell = self.map.get_cell_by_pos(new_pos)
            # log("checking surrounding for pos {}, new_pos {}, res {}".format(pos, new_pos, cell.has_resource()))
            if cell.has_resource():
                return True
        return False

    def build_city(self, unit):
        self.actions.append(unit.build_city())
        self.citytile_repo.add_citytile(unit.pos)

    def build_worker(self, tile):
        self.actions.append(tile.build_worker())
        self.collectors_to_build.append(tile.pos)
    
    def move_unit(self, unit, cell, avoid_citytile=False):
        ## Need to improve move unit logic
        # new_dir = unit.pos.direction_to(cell.pos)
        new_dir, dist = MapUtils.direction_to(unit.pos, cell.pos, self.map, avoid_citytile)
        if new_dir is None:
            # log("Unit {} has no move".format(unit.id))
            dist = -1
        else:
            new_pos = unit.pos.translate(new_dir, 1)
            unit_service.update_unit_pos(unit, new_pos)
            # log("unit {} move from {} to {}".format(unit.id, unit.pos, new_pos))
            action = unit.move(new_dir)
            self.actions.append(action)
        

    def get_unit_count(self):
        return len(self.collectors_to_build) + len(self.collectors) + (0 if self.builder is None else 1)

class Builder(Unit):
    pass

class Collecter(Unit):
    pass

class GameManager:

    def __init__(self, turn_state, unit_service, observation, configuration):
        self.turn_state = turn_state
        self.unit_service = unit_service
        self.cluster = ClusterStrategy.create_clusters(self.turn_state.player.units[0].pos)

    def update(self, observation, configuration):
        self.turn_state.update(observation, configuration)
        self.unit_service.update(self.turn_state)

    def next_turn(self, observation):
        self.actions = []
        self.actions.extend(self.cluster.next_turn())                
        return self.actions

def agent(observation, configuration):
    global game_manager
    global unit_service
    global turn_state

    ### Do not edit ###
    if observation["step"] == 0:
        game_state = Game()
        game_state._initialize(observation["updates"])
        game_state._update(observation["updates"][2:])
        game_state.id = observation.player
        turn_state = TurnState(game_state, observation, configuration)
        unit_service = UnitService(turn_state, observation, configuration)
        game_manager = GameManager(turn_state, unit_service, observation, configuration)
        print("Agent is running!", file=sys.stderr)
    else:
        game_manager.update(observation, configuration)
    

    ### AI Code goes down here! ###          

    actions = game_manager.next_turn(observation)
    
    return actions

In [81]:
env = make("lux_ai_2021", configuration={"seed": 92590156, "loglevel": 2, "annotations": True}, debug=True) #"seed": 562124210
steps = env.run([agent, "simple_agent"])
env.render(mode="ipython", width=1200, height=800)

(4, 18) score is 2337.4
(3, 19) score is 2337.4
(2, 17) score is 2240.0
(5, 18) score is 2124.2
Agent is running!
DAY-6 | build a new tile at (4, 18)
DAY-8 | adding new unit (4, 18) at u_3
DAY-11 | build a new tile at (3, 19)
DAY-13 | adding new unit (3, 19) at u_4
DAY-22 | build a new tile at (5, 18)
DAY-24 | adding new unit (5, 18) at u_5
NIGHT-33 | adding new unit (3, 19) at u_6
NIGHT-74 | losing existing unit u_5
NIGHT-78 | adding new unit (4, 18) at u_7
NIGHT-79 | losing existing unit u_7
DAY-83 | adding new unit (3, 19) at u_8
NIGHT-156 | losing existing unit u_4
NIGHT-191 | losing existing unit u_6
NIGHT-191 | losing existing unit u_8
NIGHT-194 | adding new unit (5, 18) at u_9
NIGHT-195 | losing existing unit u_9
NIGHT-198 | adding new unit (4, 18) at u_10
DAY-200 | losing existing unit u_10
DAY-201 | adding new unit (2, 17) at u_11
DAY-204 | adding new unit (5, 18) at u_12
NIGHT-271 | losing existing unit u_12
NIGHT-272 | losing existing unit u_3
NIGHT-274 | adding new unit (5,