https://www.kaggle.com/jnesbit6/pytorch-deep-q-learning-train

In [1]:
!pip install kaggle-environments -U
!git clone https://github.com/Lux-AI-Challenge/Lux-Design-2021.git
!cp -r ./Lux-Design-2021/kits/python/simple/lux .

fatal: destination path 'Lux-Design-2021' already exists and is not an empty directory.


In [2]:
from kaggle_environments import make
import json

Loading environment football failed: No module named 'gfootball'


In [3]:
from lux.game import Game
from lux.game_map import Cell, RESOURCE_TYPES, Position
from lux.game_objects import Unit
from lux.constants import Constants
from lux.game_constants import GAME_CONSTANTS
from lux import annotate

In [4]:
import math, sys
import random
import numpy as np
import matplotlib.pyplot as plt

from collections import namedtuple, deque
from itertools import count
from PIL import Image

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

In [6]:
# Shorthand for the direction constants exposed by the Lux kit.
DIRECTIONS = Constants.DIRECTIONS
# Module-level game state; `agent` replaces it with a Game instance on step 0
# and updates it on every later step.
game_state = None

In [7]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [8]:
# One replay-memory record: the state an action was chosen in, the action,
# the state observed afterwards, and the reward.
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

In [9]:
class ReplayMemory:
    """Fixed-capacity buffer of Transition records used for experience replay."""

    def __init__(self, capacity):
        """Create an empty buffer that holds at most `capacity` entries."""
        # A bounded deque discards its oldest entry once `capacity` is exceeded.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a Transition from `args` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return a list of `batch_size` stored entries drawn at random without replacement."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Return the number of entries currently stored."""
        return len(self.memory)

In [10]:
class DQN(nn.Module):
    """Convolutional Q-network: three conv/batch-norm/ReLU stages and a linear head.

    Args:
        h: Height of the input feature map.
        w: Width of the input feature map.
        outputs: Number of Q-values produced per input (one per action).
        in_channels: Number of input feature planes. Defaults to 10.

    Raises:
        ValueError: If `h` or `w` is too small to survive the three
            kernel-4 / stride-2 convolutions.
    """

    def __init__(self, h, w, outputs, in_channels=10):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=4, stride=2)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=4, stride=2)
        self.bn3 = nn.BatchNorm2d(64)

        # The linear head needs the flattened size of the last feature map, so
        # track how each convolution shrinks one spatial dimension.
        def conv2d_size_out(size, kernel_size=4, stride=2):
            return (size - (kernel_size - 1) - 1) // stride + 1

        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))

        # Two negative sizes would multiply into a positive (but meaningless)
        # feature count, so reject undersized inputs explicitly.
        if convw < 1 or convh < 1:
            raise ValueError(
                "input of size %dx%d is too small for three kernel-4/stride-2 convolutions" % (h, w)
            )

        linear_input_size = convw * convh * 64

        self.head = nn.Linear(linear_input_size, outputs)

    def forward(self, x):
        """Return a (batch, outputs) tensor of Q-values for a (batch, in_channels, h, w) input."""
        # Move the input to wherever this module's weights live rather than
        # relying on a module-level `device` variable.
        x = x.to(self.head.weight.device)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.flatten(start_dim=1))

In [11]:
def get_screen(game_state, unit = 0):
    """Encode the game state as a channel-first float tensor of shape (1, 10, H, W).

    H and W are `game_state.map.height` and `game_state.map.width`.

    Planes:
        0    resource amount on the cell (0 when the cell has no resource)
        1-5  type, cooldown, wood, coal, uranium of a unit of players[0] on the cell
        6-9  cooldown, city fuel, city light upkeep, city team of a city tile
             of players[1] on the cell

    Args:
        game_state: Object exposing `map.width`, `map.height`, `map.map[y][x]`
            and `players`, as read below.
        unit: Optional unit to highlight. When it is not an int, its cell is
            rewritten with the unit's type offset by 100. The default `0`
            means "no highlighted unit".

    Returns:
        torch.FloatTensor of shape (1, 10, H, W).
    """
    w, h = game_state.map.width, game_state.map.height

    # Fill the planes channel-first from the start. Reshaping an (H, W, 10)
    # stack into (1, 10, H, W) would reinterpret memory and interleave the
    # features across planes instead of transposing them.
    planes = np.zeros((10, h, w), dtype=np.float32)

    # Plane 0: resource amount per cell.
    for y in range(h):
        for x in range(w):
            resource = game_state.map.map[y][x].resource
            if resource is not None:
                planes[0, y, x] = resource.amount

    def unit_features(u, type_offset=0):
        return [u.type + type_offset, u.cooldown, u.cargo.wood, u.cargo.coal, u.cargo.uranium]

    # Planes 1-5: units. When several units share a cell the last one wins.
    # NOTE(review): player indices 0 and 1 are hard-coded here; presumably this
    # assumes the learning agent is player 0 - confirm against the caller.
    for u in game_state.players[0].units:
        planes[1:6, u.pos.y, u.pos.x] = unit_features(u)

    # Highlight the unit of interest by shifting its type value by 100.
    if not isinstance(unit, int):
        planes[1:6, unit.pos.y, unit.pos.x] = unit_features(unit, 100)

    # Planes 6-9: city tiles of players[1].
    for city in game_state.players[1].cities.values():
        for tile in city.citytiles:
            planes[6:10, tile.pos.y, tile.pos.x] = [tile.cooldown, city.fuel, city.light_upkeep, city.team]

    return torch.from_numpy(planes).unsqueeze(0)

In [12]:
# Turn a predicted action index `y` into a command for unit `i`.
def get_prediction_actions(y,i):
    """Translate action index `y` into a command for unit `i`.

    Indices 0-4 select a move whose direction is the letter at that position
    in "csnwe" (issued only when the unit can act), 5 requests a city build
    (only when the unit can build on the current map), and 6 requests a
    pillage. Any other case yields no command.

    Returns:
        A tuple `(command, 1)`, where `command` is whatever the unit method
        returned, or an empty list when nothing is issued.
    """
    no_command = []

    # Movement: look the direction letter up first, then check the unit.
    if y <= 4:
        direction = "csnwe"[y]
        return (i.move(direction) if i.can_act() else no_command), 1

    # Build a city tile.
    if y == 5 and i.can_build(game_state.map):
        return i.build_city(), 1

    # Pillage.
    if y == 6:
        return i.pillage(), 1

    return no_command, 1

In [13]:
# Number of transitions sampled from replay memory per optimization step;
# no optimization happens until the memory holds at least this many.
BATCH_SIZE = 128
# Discount factor applied to the next-state value in the Q-learning target.
GAMMA = 0.8
# Epsilon-greedy schedule: the exploration threshold decays exponentially from
# EPS_START towards EPS_END, with EPS_DECAY controlling the decay rate in steps.
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
# NOTE(review): not referenced anywhere else in this notebook; presumably the
# intended number of episodes between target-network updates - confirm.
TARGET_UPDATE = 10

In [14]:
# Spatial size of the feature map fed to the networks.
screen_height, screen_width = 32, 32

# Number of discrete actions a unit can choose from (7 in total).
n_actions = 7

policy_net = DQN(screen_height, screen_width, n_actions).to(device)

target_net = DQN(screen_height, screen_width, n_actions).to(device)
# Start the target network from the policy network's weights. Otherwise the
# bootstrap targets come from an unrelated random network until the first
# synchronisation.
target_net.load_state_dict(policy_net.state_dict())
# The target network is only used for inference.
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

# Bookkeeping shared with `agent` and `select_action` through globals.
total_reward = 0
first = 0
steps_done = 0
last = 0

In [15]:
def select_action(state):
    """Choose an action index for `state` with an epsilon-greedy rule.

    Increments the global `steps_done` counter on every call. Returns a
    (1, 1) long tensor: either a uniformly random index below `n_actions`, or
    the index with the highest output from `policy_net`.
    """
    global steps_done
    roll = random.random()

    # Exploration threshold decays exponentially from EPS_START towards EPS_END.
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    exploration_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    # Exploration: pick a random action.
    if roll <= exploration_threshold:
        random_index = random.randrange(n_actions)
        return torch.tensor([[random_index]], device=device, dtype=torch.long)

    # Exploitation: pick the action the policy network rates highest.
    with torch.no_grad():
        _, best_action = policy_net(state).max(1)
        return best_action.view(1, 1)

In [16]:
def optimize_model():
    """Run one optimization step of `policy_net` on a batch sampled from `memory`.

    Does nothing until the replay memory holds at least BATCH_SIZE transitions.
    The target for each sampled transition is
    `reward + GAMMA * max_a target_net(next_state)[a]`, with the second term
    set to 0 when the transition has no next state. The loss is the smooth L1
    loss between that target and the policy network's value for the action
    actually taken.
    """
    # Skip the update until enough transitions have been collected.
    if len(memory) < BATCH_SIZE:
        return

    # Sample a batch and turn a list of Transitions into a Transition of tuples.
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    non_final_flags = [s is not None for s in batch.next_state]
    non_final_mask = torch.tensor(non_final_flags, device=device, dtype=torch.bool)

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Value predicted by the policy network for the action that was taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # torch.cat raises on an empty list, so only evaluate the target network
    # when at least one sampled transition has a next state.
    if any(non_final_flags):
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        # The target is a constant with respect to the loss; skip graph construction.
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]

    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1]; parameters without a gradient are skipped.
    nn.utils.clip_grad_value_(policy_net.parameters(), 1)
    optimizer.step()

In [17]:
def agent(observation, configuration):
    """Lux AI agent that trains the DQN online while it plays.

    Each call:
      1. updates the global `game_state` from the observation;
      2. on every turn after the first recorded one, stores the transition
         (previous state, previous action, current screen, previous reward)
         in replay memory and runs one optimization step;
      3. lets every worker that can act pick an action with `select_action`;
      4. makes every city tile that can act build a worker while the unit
         count is below the city tile count, otherwise research.

    Args:
        observation: Kaggle observation; reads `step`, `updates`, `player`
            and `reward`.
        configuration: Kaggle environment configuration (unused).

    Returns:
        List of action command strings for this turn.
    """
    global game_state, done, memory, action, state, reward, first, total_reward, last

    ### Do not edit ###
    if observation["step"] == 0:
        game_state = Game()
        game_state._initialize(observation["updates"])
        game_state._update(observation["updates"][2:])
        game_state.id = observation.player
    else:
        game_state._update(observation["updates"])

    # A new episode has no pending decision. Clearing these keeps a transition
    # from being built out of undefined values or a previous episode's values.
    if observation["step"] == 0:
        state = action = reward = None

    actions = []

    ### AI Code goes down here! ###
    player = game_state.players[observation.player]

    # Number of city tiles owned by this player.
    num_citytiles = sum(len(city.citytiles) for city in player.cities.values())

    if first == 0:
        # Very first call: remember the starting turn; there is nothing to learn from yet.
        last = game_state.turn
        first = 1
    elif last < game_state.turn:
        # NOTE(review): the end of an episode is never detected here, so every
        # stored transition has a next state - confirm this is intended.
        done = False
        if state is not None:
            next_state = get_screen(game_state)
            # Store the transition produced by the most recent decision.
            memory.push(state, action, next_state, reward)
        # Learn from replay memory.
        optimize_model()

    # Let each worker that can act choose an action.
    for unit in player.units:
        if unit.is_worker() and unit.can_act():
            # NOTE(review): `unit` is not passed to get_screen, so every worker
            # sees the same input this turn, and only the last worker's decision
            # is kept for the next transition - confirm this is intended.
            state = get_screen(game_state)
            action = select_action(state)

            # Scale the reward reported by the environment (treated as 0 when missing).
            reward = (observation["reward"] or 0) / 1000
            total_reward += reward
            reward = torch.tensor([reward], device=device)

            # Convert the chosen index into an actual command.
            actions2, _ = get_prediction_actions(action, unit)

            if len(actions2) != 0:
                actions.append(actions2)

    # City tiles: build workers until there is one unit per city tile, then research.
    # Workers queued earlier in this same turn are counted, so no city action
    # is spent on a build beyond that limit.
    unit_count = len(player.units)
    for city in player.cities.values():
        for citytile in city.citytiles:
            if not citytile.can_act():
                continue
            if unit_count < num_citytiles:
                actions.append(citytile.build_worker())
                unit_count += 1
            else:
                actions.append(citytile.research())

    return actions

In [18]:
import time
import tqdm
from IPython.display import clear_output
# Accumulated reward of each episode.
t_list = []
t = tqdm.tqdm(range(10), position=0, leave=True)
for ep in t:
    # Every episode is played on the same seeded map against the built-in "simple_agent".
    env = make('lux_ai_2021', configuration={'seed': 562124210, 'loglevel': 2, 'annotations': True}, debug=True)
    steps = env.run([agent, "simple_agent"])
    # Synchronise the target network with the policy network after every episode.
    target_net.load_state_dict(policy_net.state_dict())
    t_list.append(total_reward)
    clear_output()
    # NOTE(review): 360 is presumably the number of turns per episode - confirm.
    t.set_description_str(str(total_reward/360))
    # Reset the accumulator that `agent` adds to during the next episode.
    total_reward = 0

3.945033333333323: 100%|██████████| 10/10 [01:12<00:00,  7.21s/it]


In [19]:
# Render the environment left over from the final training episode inside the notebook.
env.render(mode='ipython', width=1200, height=1000)