# 12. Play with Connect 4

## Connect 4 (중력4목)

이전까지 오목을 학습하려고 이런저런 시도를 했지만
그리 좋은 결과를 얻지는 못했습니다.

여기서는 상태와 입력이 더 적은 Connect 4에
RL을 시도해봅니다.

## 고려할 점들

- Value network 등 neural network에는
직관 이상으로 많은 layer가 필요합니다.
AlphaGo의 경우에도 layer를 12개가량 쌓아서 구성을
하였고, Connect 4도 layer 7개로 하루 종일 학습시켜도
value network만으로는 greedy algorithm을 넘기는
쉽지 않다고 합니다.
결국에는 complexity를 최대한 늘리고 resource를 부어서
value나 policy를 학습해야 하고, 그걸로 부족한 부분은
최대한 가지를 쳐서 트리 탐색으로 메울 수밖에 없는 듯합니다.
- 이전까지는 단순히 Monte Carlo 방식으로 value
function만을 근사하려고 했습니다만,
여기서는 더 정석적인 방법을 시도합니다.

## 환경설정

여기서는 Mock4.py를 사용합니다.

https://github.com/lumiknit/mock4.py

In [1]:
!rm -rf mock4.py m4
!git clone https://github.com/lumiknit/mock4.py.git
!mv mock4.py m4
!mv m4/mock4.py .
from mock4 import *

Cloning into 'mock4.py'...
remote: Enumerating objects: 10, done.
remote: Counting objects: 100% (10/10), done.
remote: Compressing objects: 100% (8/8), done.
remote: Total 10 (delta 2), reused 6 (delta 1), pack-reused 0
Unpacking ob

Mock5.py와 거의 비슷하게 사용하면 됩니다.

In [2]:
# Smoke test: play one game with agent_greedy on both sides.
# NOTE(review): p_msg=False presumably suppresses per-move messages — confirm in mock4.
Mock4().play(agent_greedy, agent_greedy, p_msg=False)

-----------------
[ Turn  29 ; 2P ]
| 0 1 2 3 4 5 6 |
| O . O X X . . |
| X O O X X . . |
| O O O X X . . |
| O X X O O . . |
| X O O X X . . |
| X O O O X . . |
1P Win (<function agent_greedy at 0x7f5623d55710>)


2

In [3]:
# Baseline check: 100 games of agent_random (A1) against agent_greedy (A2);
# the helper prints the win/draw tallies shown below.
test_mock4(100, agent_random, agent_greedy)

** Test
* A1 = <function agent_random at 0x7f5623d49290>
* A2 = <function agent_greedy at 0x7f5623d55710>
Total = 100 games
W1 0 (0.000) / Dr 0 (0.000) / W2 100 (1.000)


그 외에 PyTorch, NumPy를 불러옵니다.

In [4]:
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

# Run on the GPU when one is visible to PyTorch, otherwise fall back to CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
print(f"Device: {device}")

Device: cpu


In [5]:
class Flatten(nn.Module):
  """Collapse feature dimensions so convolution output can feed a linear layer.

  A 3-D input (one unbatched sample) becomes a 1-D vector. Any other input
  keeps dimension 0 and has all remaining dimensions merged into one.
  """

  def forward(self, x):
    is_single_sample = x.dim() == 3
    if is_single_sample:
      return x.view(-1)
    return torch.flatten(x, start_dim=1, end_dim=-1)

In [6]:
class Replay():
  """Fixed-capacity FIFO replay memory of (S0, A0, R0, S1) transitions.

  `size` is the capacity and `b` is the list of stored transition tuples,
  oldest first. Once the capacity is exceeded the oldest entries are dropped.
  """

  def __init__(self, size):
    # size: maximum number of transitions kept in the buffer.
    self.size = size
    self.b = []

  def remove_olds(self):
    """Drop the oldest transitions so that at most `size` remain."""
    # Trim by count and in place. The slice `self.b[-self.size:]` would be
    # the whole list when size == 0, so nothing would ever be evicted, and
    # it also allocates a fresh list on every call.
    excess = len(self.b) - self.size
    if excess > 0:
      del self.b[:excess]

  def add(self, S0, A0, R0, S1):
    """Append one transition and evict the oldest entries if over capacity."""
    self.b.append((S0, A0, R0, S1))
    self.remove_olds()

  def sample(self, size):
    """Draw `size` transitions uniformly at random, with replacement.

    Returns:
      Four parallel lists `(S0, A0s, R0, S1)` of length `size`.

    Raises:
      ValueError: if `size` > 0 and the buffer is empty.
    """
    n_stored = len(self.b)
    if size > 0 and n_stored == 0:
      raise ValueError("cannot sample from an empty replay buffer")
    # One randint call per draw, so duplicates are possible.
    Z = [self.b[np.random.randint(n_stored)] for _ in range(size)]
    S0 = [z[0] for z in Z]
    A0s = [z[1] for z in Z]
    R0 = [z[2] for z in Z]
    S1 = [z[3] for z in Z]
    return S0, A0s, R0, S1

## DQN

Deep neural network를 이용해서
Q-Learning을 합니다.

- Action value function $q$를
deep neural network로 구성합니다.
- Policy $\pi$는 $q$를 그대로 쓰되,
action을 선택할 때 argmax로 선택합니다.
(Softmax로 확률처럼 바꿀 수는 있습니다만..)
Policy improvement를 할 경우에
$\pi$를 $q$로 바꿉니다.
- $\alpha$, $\epsilon$을 모두 decay합니다.
- Batch normalization을 사용합니다.
- Episode를 진행하며 replay memory를 누적시키고
replay memory에서 샘플링한 batch로 학습시킵니다.


In [4]:
## nn
def new_nn():
  """Build a freshly initialised Q-network and move it to `device`.

  The network takes a 3-channel input, applies five 3x3 'same'-padded
  convolution stages (the second one also max-pools by 2), flattens, and
  ends with two linear layers producing W outputs.
  NOTE(review): W/H presumably match the Mock4 board (7 x 6) — confirm.
  """
  W, H = 7, 6

  def conv_stage(c_in, c_out, pool=False):
    # Conv -> BatchNorm -> (optional 2x2 max-pool) -> ReLU, in that order.
    stage = [nn.Conv2d(c_in, c_out, 3, padding='same'), nn.BatchNorm2d(c_out)]
    if pool:
      stage.append(nn.MaxPool2d(2))
    stage.append(nn.ReLU())
    return stage

  layers = []
  layers += conv_stage(3, 32)                # 01
  layers += conv_stage(32, 64, pool=True)    # 02
  layers += conv_stage(64, 64)               # 03
  layers += conv_stage(64, 64)               # 04
  layers += conv_stage(64, 4)                # 05
  # Lin01: the pooled feature map is (W // 2) x (H // 2) with 4 channels.
  layers += [
      Flatten(),
      nn.Linear(4 * (W // 2) * (H // 2), 20),
      nn.BatchNorm1d(20),
      nn.ReLU(),
  ]
  # Lin02: one output per W.
  layers.append(nn.Linear(20, W))
  return nn.Sequential(*layers).to(device)

def update_policy(policy, q_fn):
  """Copy q_fn's state into policy, then set q_fn to train and policy to eval mode."""
  snapshot = q_fn.state_dict()
  policy.load_state_dict(snapshot)
  # Mode switches keep the original order: train first, eval second.
  q_fn.train()
  policy.eval()

def init_nn():
  """(Re)create the global `policy` and `q_fn` networks and sync their weights."""
  global policy, q_fn
  # Two independent networks are built; policy then receives q_fn's state.
  policy, q_fn = new_nn(), new_nn()
  update_policy(policy, q_fn)

In [None]:
# Policy = epsilon-greedy for q
def agent_policy(epsilon):
  """Return an epsilon-greedy agent backed by the global `policy` network.

  Args:
    epsilon: probability of delegating a move to `agent_random`.

  Returns:
    A callable `agent(game)`. On a greedy move it evaluates `policy` on
    `game.tensor()`, masks out the entries flagged by `game.tensor_full()`,
    and returns the index of the largest remaining Q-value as a plain
    Python int.
  """
  def agent(game):
    if np.random.uniform() < epsilon:
      return agent_random(game)
    # Add a batch dimension of 1 for the network.
    X = game.tensor().unsqueeze(dim=0).to(device)
    # NOTE(review): presumably a boolean mask of unplayable (full) columns —
    # confirm against mock4.
    M = game.tensor_full()
    with torch.no_grad():
      Q = policy(X).squeeze(dim=0)
    # Masked entries can never win the argmax.
    Q[M] = -float('inf')
    # Convert to a Python int so no (possibly GPU) tensor leaks into the
    # game's move bookkeeping.
    return int(torch.argmax(Q))
  return agent

In [None]:
# Main replay memory for every transition, plus a smaller memory that only
# receives transitions whose next state is None (filled in learn()).
replay, replay_terminal = Replay(65536), Replay(2048)

In [None]:
# Q-Learning
def learn(
    opt,
    loss_fn,
    n_episode,
    n_epoch,
    int_policy_update,
    gamma,
    alpha_fn,
    epsilon_fn,
    sz_sample,
    sz_sample_terminal
):
  """Train the global `q_fn` with self-play and replay-sampled Q-learning.

  Each loop iteration plays one game, pushes its transitions into the global
  `replay` / `replay_terminal` buffers and, once `replay` holds at least
  `sz_sample` entries, fits `q_fn` on a sampled batch. Only iterations that
  perform a fit advance the episode counter.

  Args:
    opt: optimizer; `zero_grad()` and `step()` are called once per epoch.
    loss_fn: callable `(prediction, target)` returning the loss.
    n_episode: number of fitting episodes to run.
    n_epoch: gradient steps taken on the same batch in each episode.
    int_policy_update: the global `policy` is refreshed from `q_fn` whenever
      `(epi + 1) % int_policy_update == 0`, checked after `epi` is incremented.
    gamma: discount applied to the max next-state Q-value.
    alpha_fn: maps the episode index to alpha, the step size of the target.
    epsilon_fn: maps the episode index to the first player's exploration rate.
    sz_sample: batch entries drawn from `replay`; also the minimum buffer
      fill before any fitting happens.
    sz_sample_terminal: extra batch entries drawn from `replay_terminal`.
  """
  epi = 0
  while epi < n_episode:
    # -- Get parameters
    alpha = alpha_fn(epi)
    epsilon = epsilon_fn(epi)
    # -- Run Game
    # First player explores with epsilon; second player is purely greedy.
    game = Mock4()
    result = game.play(agent_policy(epsilon), agent_policy(0), p_msg=False, p_res=False)
    # NOTE(review): `result` is presumably the winning player's number, with
    # 0 meaning a draw — confirm against mock4.
    reward = 1
    if result == 0: # Draw
      # A draw yields zero reward; player 1 is used as the reference side.
      reward = 0
      result = 1
    # -- Append to Replay
    # Walk the move history backwards, undoing one move per iteration.
    # S1_* hold the state seen one undo earlier; None marks the final move.
    S1_p, S1_o = None, None
    while len(game.history) > 0:
      h = game.history[-1]
      # NOTE(review): presumably converts a flat board index to a column —
      # confirm the board layout in mock4.
      a = int(h / game.h)
      del game.history[-1]
      game.board[h] = 0
      # Same position encoded from both players' points of view.
      S0_p = game.tensor(player=result)
      S0_o = game.tensor(player=(3 - result))
      # Both views are stored with the same action and opposite rewards.
      replay.add(S0_p, a, reward, S1_p)
      replay.add(S0_o, a, -reward, S1_o)
      if S1_p is None:
        # Only the game's last move also goes into the terminal buffer.
        replay_terminal.add(S0_p, a, reward, S1_p)
        replay_terminal.add(S0_o, a, -reward, S1_o)
      S1_p, S1_o = S0_p, S0_o
      # Every earlier move gets zero immediate reward.
      reward = 0
    # -- Sampling and learning
    if len(replay.b) >= sz_sample:
      S_0, As, Rs, S_1 = replay.sample(sz_sample)
      # Append Terminal states
      t_S_0, t_As, t_Rs, t_S_1 = replay_terminal.sample(sz_sample_terminal)
      S_0 += t_S_0
      As += t_As
      Rs += t_Rs
      S_1  += t_S_1
      # Tensor-fy
      X_0 = torch.stack(S_0).to(device)
      R = torch.tensor(Rs, dtype=torch.float).to(device)
      # Missing next states become zero tensors so the batch can be stacked;
      # their targets are overwritten with R below.
      Sz_1 = [torch.zeros(3, game.w, game.h) if s is None else s for s in S_1]
      Snone_1 = [s is None for s in S_1]
      X_1 = torch.stack(Sz_1).to(device)
      # Calc Curr Q
      # Targets come from q_fn itself and are computed without gradients.
      with torch.no_grad():
        Q_0 = q_fn(X_0)
        Q_1 = q_fn(X_1)
      # Q-value of the action actually taken in each sampled transition.
      Qa_0 = Q_0[range(len(As)), As]
      Qmax_1 = torch.max(Q_1, dim=1).values
      # Q_0 <- Q_0 + alpha * (R + gamma * max Q_1 - Q_0) if not terminated
      Qtgt_0 = Qa_0 + alpha * (R + gamma * Qmax_1 - Qa_0)
      # Q_0 <- R otherwise
      Qtgt_0[Snone_1] = R[Snone_1]
      # Learn
      # Regress the taken-action Q-values onto the fixed targets n_epoch times.
      loss_list = []
      for e in range(n_epoch):
        opt.zero_grad()
        Q_0 = q_fn(X_0)
        Qa_0 = Q_0[range(len(As)), As]
        loss = loss_fn(Qa_0, Qtgt_0)
        loss_list.append(loss.mean().item())
        loss.backward()
        opt.step()
      epi += 1
      # Report first and last epoch loss in units of alpha.
      # NOTE(review): loss_list is empty when n_epoch == 0, so the indexing
      # below would raise IndexError.
      print("Ep #{} (#Repl={}) Loss {:.8f}α -> {:.8f}α".format(
          epi, len(replay.b), loss_list[0] / alpha, loss_list[-1] / alpha))
      # Update Policy
      if (epi + 1) % int_policy_update == 0:
        update_policy(policy, q_fn)
    else: print("Accumulating Replay... (#={})".format(len(replay.b)))

In [None]:
init_nn()

# Optimiser and loss for the network being trained.
opt = optim.Adam(q_fn.parameters(), lr=1e-3, weight_decay=1e-5)
loss_fn = nn.SmoothL1Loss()

# Schedule lengths.
n_episode = 1000
n_epoch = 50
int_policy_update = 10

# Discount factor.
gamma = 0.99


def alpha_fn(n):
  # Step size decays as 1 / (1 + n).
  return 1 / (1 + n)


def epsilon_fn(n):
  # Exploration rate starts at 0.3 and shrinks by 1% per episode.
  return 0.3 * (0.99 ** n)


# Batch composition: 2048 entries in total, 512 of them from the terminal buffer.
sz_sample_terminal = 512
sz_sample = 2048 - sz_sample_terminal

learn(
    opt=opt,
    loss_fn=loss_fn,
    n_episode=n_episode,
    n_epoch=n_epoch,
    int_policy_update=int_policy_update,
    gamma=gamma,
    alpha_fn=alpha_fn,
    epsilon_fn=epsilon_fn,
    sz_sample=sz_sample,
    sz_sample_terminal=sz_sample_terminal,
)

In [None]:
# Evaluate the greedy (epsilon = 0) learned agent over 100 games against
# each baseline in turn; the baseline is passed first, the learned agent second.
for baseline in (agent_random, agent_greedy):
  test_mock4(100, baseline, agent_policy(0))

## Policy Gradient

#WIP