In [None]:
# imports
import math
import copy
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch import nn 
import numpy as np
from typing import NamedTuple



# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# Later cells read this module-level name when they create or move tensors.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
class Gomoku:
  """N-in-a-row game played on a square ``dim`` x ``dim`` board.

  Cells of ``board`` hold 0 (empty), 1 or -1 (the two players).
  ``available`` is an additive mask of the same shape: 0 for a free cell and
  ``-inf`` for an occupied one.

  Attributes:
    dim: side length of the board.
    win: number of aligned stones needed to win.
    total_moves: number of legal moves played so far.
    winner: 0 while undecided, otherwise the winning player (1 or -1).
    end: 1 once the game is over (win, draw or illegal move), else 0.
    log: list of ``(player, position)`` for every attempted move.
  """

  def __init__(self, dim, win):
    self.dim = dim
    self.win = win
    self.total_moves = 0
    self.winner = 0
    self.end = 0
    self.board = np.zeros((self.dim, self.dim))
    self.available = np.zeros((self.dim, self.dim))

    self.log = []

  def get_available_coord(self):
    """Return the ``(row, col)`` coordinates of every empty cell."""
    rows, cols = np.where(self.board == 0)
    return list(zip(rows, cols))

  def get_available_action(self):
    """Return the flat action index of every empty cell."""
    return [self.coordinates_to_action(coord) for coord in self.get_available_coord()]

  def action_to_coordinates(self, action):
    """Convert a flat action index into ``(row, col)``."""
    return ((action//self.dim), (action%self.dim))

  def coordinates_to_action(self, coord):
    """Convert ``(row, col)`` into a flat, row-major action index."""
    return self.dim * coord[0] + coord[1]

  def _window_sums(self, board):
    """Yield the sum of every straight line of ``win`` consecutive cells.

    Covers horizontal, vertical, diagonal and anti-diagonal windows. Yields
    nothing when ``win`` is larger than ``dim``.
    """
    size, length = self.dim, self.win
    starts = range(size - length + 1)
    steps = np.arange(length)

    for line in range(size):
      for start in starts:
        yield board[line, start:start + length].sum()   # horizontal
        yield board[start:start + length, line].sum()   # vertical

    for top in starts:
      for left in starts:
        # Diagonal going down-right from (top, left).
        yield board[top + steps, left + steps].sum()
        # Anti-diagonal going up-right from (top + length - 1, left).
        yield board[top + length - 1 - steps, left + steps].sum()

  def won(self, board=None):
    """Return ``(highest, lowest)`` window sums for a board.

    A window sum equals ``win`` only when player 1 owns every cell of the
    window, and ``-win`` only when player -1 does. Both values start at 0,
    so ``highest >= 0`` and ``lowest <= 0``.

    Args:
      board: board to inspect; defaults to the live ``self.board``.
    """
    if board is None:
      board = self.board

    highest = 0
    lowest = 0
    for total in self._window_sums(board):
      if total > highest:
        highest = total
      if total < lowest:
        lowest = total
    return highest, lowest

  def state_after_move(self, position, player):
    """Simulate a move without touching the live game.

    Args:
      position: ``(row, col)`` of the cell to play.
      player: 1 or -1.

    Returns:
      ``(new_state, new_available, tot, terminate, reward)`` where
      ``new_state`` / ``new_available`` are copies of the board and mask
      after the move, ``tot`` is the current (pre-move) move count,
      ``terminate`` is 1 if the move would end the game, and ``reward`` is
      10 for a win, -10 for an illegal move and -1 otherwise.
    """
    new_state = copy.deepcopy(self.board)
    new_available = copy.deepcopy(self.available)
    tot = self.total_moves

    # An illegal move ends the game and must not overwrite the stone
    # that is already on that cell.
    if position not in self.get_available_coord():
      return new_state, new_available, tot, 1, -10

    new_state[position] = player
    new_available[position] = float("-inf")

    reward = -1
    terminate = 0

    # Board full after this move.
    if tot + 1 == self.dim**2:
      terminate = 1

    highest, lowest = self.won(new_state)
    chains = {1: highest, -1: lowest}
    if abs(chains[player]) == self.win:
      terminate = 1
      reward = 10

    return new_state, new_available, tot, terminate, reward

  def make_move(self, position, player):
    """Play ``player`` at ``position`` and return the reward for that move.

    Every attempt is appended to ``self.log``.

    Returns:
      10 if the move wins the game (sets ``winner`` and ``end``),
      0 if the move fills the board without a winner (sets ``end``),
      -10 if the cell is not free (sets ``end``, board unchanged) or if the
      opponent is left with a window sum of ``win - 1``,
      -1 for any other move.
    """
    self.log.append((player, position))
    if position not in self.get_available_coord():
      self.end = 1
      return -10

    self.board[position] = player
    self.available[position] = float("-inf")
    self.total_moves += 1

    highest, lowest = self.won()
    chains = {1: highest, -1: lowest}

    # A completed line takes priority over everything else; otherwise a
    # winning move could be reported as a penalty and the game would go on.
    if abs(chains[player]) == self.win:
      self.winner = player
      self.end = 1
      return 10

    if self.total_moves == self.dim**2:
      self.end = 1
      return 0

    # Penalise leaving the opponent one stone short of a full window.
    if abs(chains[-player]) == self.win - 1:
      return -10

    return -1

  

In [None]:
#Network for Q learning
class DQN(nn.Module):
  """Small fully connected Q-network: one Q-value per board cell.

  Takes a flattened board of ``dim * dim`` values and returns ``dim * dim``
  action values. ``num_win`` is accepted but not used by the network.
  """

  def __init__(self, dim, num_win):
    super().__init__()
    n_cells = dim * dim
    hidden_units = 32
    self.fc1 = nn.Linear(n_cells, hidden_units)
    self.fc2 = nn.Linear(hidden_units, hidden_units)
    self.fc5 = nn.Linear(hidden_units, n_cells)

  def forward(self, x):
    """Two ReLU hidden layers followed by a linear output layer."""
    activation = x
    for layer in (self.fc1, self.fc2):
      activation = F.relu(layer(activation))
    return self.fc5(activation)

In [None]:
def epsilon_greedy(epsilon):
  """Build an epsilon-greedy policy function.

  The returned ``policy_fn(available_actions, available_board, q_net, state)``
  picks a uniformly random entry of ``available_actions`` with probability
  ``epsilon``; otherwise it returns the argmax of the network's Q-values after
  adding ``available_board`` to them (expected to be an additive mask such as
  ``Gomoku.available``). The result is a tensor of shape ``(1,)``.
  """
  def policy_fn(available_actions, available_board, q_net, state):
    explore = torch.rand(1) < epsilon
    if not explore:
      with torch.no_grad():
        masked_q = q_net(torch.flatten(state)) + available_board.view(1,-1)
        return torch.argmax(masked_q).view(1,)
    random_action = np.random.choice(available_actions)
    return torch.tensor([random_action], device = device)
  return policy_fn

In [None]:
def random_policy():
  """Build a policy that ignores the board and plays a uniformly random legal action.

  The returned function accepts the same arguments as the epsilon-greedy
  policy but only uses ``available_actions``; it returns a tensor of shape
  ``(1,)`` on ``device``.
  """
  def policy_fn(available_actions,available_board = None, q_net = None, state= None):
    chosen = np.random.choice(available_actions)
    return torch.tensor([chosen], device = device)
  return policy_fn

In [None]:
class Batch(NamedTuple):
  """A mini-batch of transitions, one row per sampled transition.

  ReplayBuffer.sample builds these by indexing its storage tensors, so every
  field has the sampled batch size as its first dimension.
  """
  # Board before the move.
  state: torch.Tensor
  # Flat action index that was played (int64 in ReplayBuffer).
  action: torch.Tensor
  # Reward received for the move.
  reward: torch.Tensor
  # Discount applied to the bootstrapped next-state value.
  discount: torch.Tensor
  # Board after the move.
  next_state: torch.Tensor

In [None]:
class ReplayBuffer:
  """Fixed-size circular store of transitions kept in pre-allocated tensors.

  Once ``buffer_size`` transitions have been added, new ones overwrite the
  oldest. All storage lives on the module-level ``device``.
  """

  def __init__(self, state_dim, act_dim, buffer_size):
    self.buffer_size = buffer_size
    self.ptr = 0        # slot the next transition is written to
    self.n_samples = 0  # number of valid slots, capped at buffer_size

    def allocate(*trailing_shape, dtype=torch.float32):
      # One row per slot; trailing dimensions depend on the field.
      return torch.zeros(buffer_size, *trailing_shape, dtype=dtype, device=device)

    self.state = allocate(*state_dim)
    self.action = allocate(act_dim, dtype=torch.int64)
    self.reward = allocate(1)
    self.discount = allocate(1)
    self.next_state = allocate(*state_dim)

  def add(self, state, action, reward, discount, next_state):
    """Write one transition at the current slot and advance the write pointer."""
    slot = self.ptr
    self.state[slot] = state
    self.action[slot] = action
    self.reward[slot] = reward
    self.discount[slot] = discount
    self.next_state[slot] = next_state

    self.n_samples = min(self.n_samples + 1, self.buffer_size)
    self.ptr = (slot + 1) % self.buffer_size

  def sample(self, batch_size):
    """Return a Batch of ``batch_size`` stored transitions drawn at random.

    Indices are drawn with replacement from the filled part of the buffer.
    """
    picks = np.random.choice(self.n_samples, batch_size)
    return Batch(
      state=self.state[picks],
      action=self.action[picks],
      reward=self.reward[picks],
      discount=self.discount[picks],
      next_state=self.next_state[picks],
    )

In [None]:
#Agent class as in week 11 tutorial.
class Agent:
  """Couples an action-selection policy with an optional Q-learning setup.

  A purely random player only needs ``policy``; a learning player also
  supplies ``qnet``, ``optimizer``, ``replay_buffer`` and ``batch_size``.
  """

  def __init__(self, policy, qnet= None, optimizer = None, replay_buffer= None, batch_size = None):
    self.policy = policy
    self.qnet = qnet
    self.optimizer = optimizer
    self.replay_buffer = replay_buffer
    self.batch_size = batch_size

  def act(self, available_actions, available_board, state):
    """Ask the policy for an action; no gradients are tracked."""
    with torch.no_grad():
      chosen = self.policy(available_actions, available_board, self.qnet, state)
    return chosen

  def train(self, state, action, reward, discount, next_state, i):
    """Store one transition, then take one Q-learning step on a sampled batch.

    ``i`` is only used to decide when to print the predicted Q-values
    (whenever it is a multiple of 1000).
    """
    buffer = self.replay_buffer
    buffer.add(state, action, reward, discount, next_state)
    batch = buffer.sample(self.batch_size)

    # Q-value of the action that was actually taken in each sampled state.
    q_all = self.qnet(torch.flatten(batch.state, start_dim = 1))
    q_pred = q_all.gather(1, batch.action)

    if i % 1000 == 0:
      print(q_pred)

    # Bootstrapped target from the same network; excluded from the graph.
    with torch.no_grad():
      next_q = self.qnet(torch.flatten(batch.next_state, start_dim = 1))
      best_next = next_q.max(dim=1)[0].view(-1, 1)
      q_target = batch.discount * best_next + batch.reward

    loss = F.mse_loss(q_target, q_pred).mean()

    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()



In [None]:

# #Agent class as in week 11 tutorial.
# class Agent:
#   def __init__(self, policy, qnet, optimizer):
#     self.policy = policy
#     self.qnet = qnet
#     self.optimizer = optimizer  

#   def act(self, available_random, available, state):
#     with torch.no_grad():
#       return self.policy(available_random, available, self.qnet, state)
  
#   def train(self, state, action, reward, gamma, game, player, rival):
    
#     if reward == 1 or reward == 0:
#       q_pred = self.qnet(state).view(1,-1)
#       actionr = action.view(1,-1)
#       q_pred = q_pred.gather(1, actionr)
#       with torch.no_grad():
#         q_target = torch.tensor([[reward]]).type(torch.float32).to(device)
#         if q_pred > 10000000:
#           print(q_pred, q_target)
#     else:
#       q_pred = self.qnet(state).view(1,-1)
#       actionr = action.view(1,-1)
#       q_pred = q_pred.gather(1, actionr)

#       position = ((action//dim).cpu().data.numpy()[0], (action%dim).cpu().data.numpy()[0])
#       new_state, new_available, tot, terminate, rewardd =  game.state_after_move(position, player)
#       new_state = torch.from_numpy(new_state).type(torch.float32).to(device)
#       q_target = self.qnet(new_state).view(1,-1)
#       q_target = torch.max(q_target).view(-1,1)
#       q_target = reward + gamma * q_target

      
#     #   q_pred = self.qnet(state).view(1,-1)

#     #  # print(f"q-values for player {player} is:")
#     # #  print(q_pred.view(3,3))
#     #   actionr = action.view(1,-1)
#     #   q_pred = q_pred.gather(1, actionr)

#     #   with torch.no_grad():
#     #     board_for_rival = torch.from_numpy(game.board).type(torch.float32).to(device)
#     #     #print("For rival", game.board, game.get_available_action())
#     #     available_for_rival_random = np.array(game.get_available_action())
#     #     available_for_rival = game.available


#     #     rival_q = rival.qnet(board_for_rival).view(1,-1)
#     #     rival_action = rival.act(available_for_rival_random, available_for_rival, board_for_rival)
#     #     rival_position =  ((rival_action//dim).cpu().data.numpy()[0], (rival_action%dim).cpu().data.numpy()[0])
#     #     new_state, new_available, tot, terminate, riv_reward =  game.state_after_move(rival_position, player*-1)
#     #     if riv_reward == 10:
#     #       q_target = -1* torch.tensor([[-1 * reward]]).type(torch.float32).to(device)
#     #     elif terminate == 1:
#     #       q_target = reward + gamma * q_pred
#     #     else:
#     #       q_riv = torch.from_numpy(available_for_rival).type(torch.float32).view(1,-1).to(device) - rival_q
#     #       #print(q_riv)
#     #       q_riv = torch.max(q_riv).view(-1,1)

#     #       new_state = torch.from_numpy(new_state).type(torch.float32).to(device)  
#     #    #   print(f"player {player} playing")
#     #     #  print(state.cpu().data.numpy())
#     #     #  print(new_state.cpu().data.numpy())
#     #       q_target = self.qnet(new_state).view(1,-1)
#     #   #   print(f"target q-values for player {player} is:")
#     #     #  print(q_target.view(3,3))
#     #       q_target = torch.max(q_target).view(-1,1)
#     #      # print("hi", q_pred, q_riv, q_target)
#     #       q_target = reward + gamma * q_target + gamma * q_riv
          

        
#       #print(q_target)
#     #print(q_pred, q_target)
#   #  print('\n')
#     #print(q_pred.size, q_target.size)
#     loss = F.mse_loss(q_pred, q_target)

#     self.optimizer.zero_grad()
#     loss.backward()
#     self.optimizer.step()


In [None]:
def train_agent2(agent1, agent2, gamma, num_games, dim, num_win):
  """Play ``num_games`` games of agent1 (player 1) vs agent2 (player -1).

  Only ``agent2`` learns: after each of its moves, ``agent2.train`` is called
  with the board before and after the move, the reward returned by
  ``Gomoku.make_move`` and a discount of ``gamma`` (0 once the game has
  ended). Every 1000th game the final board, move log and per-player reward
  totals are printed.
  """
  def as_tensor(array):
    # float32 copy of a numpy array on the training device
    return torch.from_numpy(array).type(torch.float32).to(device)

  players = {1 : agent1, -1 : agent2}

  for i in range(num_games):
    game = Gomoku(dim, num_win)
    rewards = {1: 0, -1: 0}
    curr_player = 1

    while game.end == 0:
      # Snapshot of the position the current player has to act on.
      state = as_tensor(game.board)
      available_actions = np.array(game.get_available_action())
      available_board = as_tensor(game.available)

      action = players[curr_player].act(available_actions, available_board, state)
      row = (action//dim).cpu().data.numpy()[0]
      col = (action%dim).cpu().data.numpy()[0]
      reward = game.make_move((row, col), curr_player)
      rewards[curr_player] += reward

      # No bootstrapping from a terminal position.
      discount = gamma*(1 - game.end)
      next_state = as_tensor(game.board)

      if curr_player == -1:
        agent2.train(state, action, reward, discount, next_state, i)
      curr_player = -curr_player

    if i % 1000 == 0:
      print(f"game {i} completed")
      print(game.board)
      print(game.log)
      print(len(game.log))
      print(rewards)


def play_against_agent(agent, dim, num_win):
  """Play an interactive console game against ``agent``.

  The human moves first as player 1 and types a position such as ``(1, 2)``
  or ``1, 2`` (row, column). The agent replies as player -1 using its own
  policy. The board is printed after every move and the loop stops once
  ``Gomoku.make_move`` marks the game as ended (win, draw or illegal move).

  Args:
    agent: object exposing ``act(available_actions, available_board, state)``.
    dim: side length of the board.
    num_win: number of aligned stones needed to win.

  Raises:
    ValueError or SyntaxError: if the typed position is not a Python literal.
  """
  import ast  # local import: only needed to parse the human's input

  game = Gomoku(dim, num_win)
  curr_player = 1
  while game.end == 0:
    if curr_player == -1:
      state = torch.from_numpy(game.board).type(torch.float32).to(device)
      available_actions = np.array(game.get_available_action())
      available_board = torch.from_numpy(game.available).type(torch.float32).to(device)

      action = agent.act(available_actions, available_board, state)
      position = ((action//dim).cpu().data.numpy()[0], (action%dim).cpu().data.numpy()[0])
      print(f"Agent has made a move: {position}")

    else:
      # literal_eval only accepts Python literals, so typed text can never
      # execute arbitrary code the way eval() would.
      position = ast.literal_eval(input("Enter position:"))
      print(f"You have made a move: {position}")

    game.make_move(position, curr_player)

    print(game.board)
    print("\n")
    curr_player = curr_player * -1

In [None]:

# Training hyper-parameters.
num_games = 10000  # number of games played by train_agent2
gamma = 0.9        # discount factor for the Q-learning target
epsilon = 0.1      # exploration probability of the epsilon-greedy policy

# Board configuration: 3x3 board, three in a row wins.
dim = 3
num_win = 3
num_actions = dim**2  # one action per board cell

# Player 1 moves at random; player -1 follows the epsilon-greedy Q-network policy.
policy1 = random_policy()
policy2 = epsilon_greedy(epsilon)

# Q-network, its optimizer and the replay buffer used by the learning agent.
qnet = DQN(dim, num_win).to(device)
optimizer = torch.optim.Adam(qnet.parameters(), lr=1e-3)
buffer_size = 30  # number of transitions kept in the replay buffer
batch_size = 9    # transitions sampled per training step
replay_buffer = ReplayBuffer((dim,dim), 1, buffer_size)


# agent1 never trains, so it needs no network, optimizer or buffer.
agent1 = Agent(policy = policy1, qnet= None, optimizer = None, replay_buffer= None, batch_size = None)
agent2 = Agent(policy = policy2, qnet= qnet, optimizer = optimizer, replay_buffer= replay_buffer, batch_size = batch_size)

# Run training; only agent2 is updated.
train_agent2(agent1, agent2, gamma, num_games, dim, num_win)

tensor([[0.1199],
        [0.1199],
        [0.1199],
        [0.1199],
        [0.1199],
        [0.1199],
        [0.1199],
        [0.1199],
        [0.1199]], device='cuda:0', grad_fn=<GatherBackward>)
tensor([[0.0595],
        [0.1117],
        [0.0595],
        [0.1117],
        [0.1117],
        [0.1117],
        [0.1117],
        [0.0595],
        [0.0595]], device='cuda:0', grad_fn=<GatherBackward>)
tensor([[ 0.0526],
        [-0.0239],
        [ 0.0526],
        [ 0.0526],
        [ 0.0526],
        [-0.0239],
        [ 0.1074],
        [ 0.1074],
        [ 0.0526]], device='cuda:0', grad_fn=<GatherBackward>)
tensor([[ 0.1044],
        [-0.0277],
        [ 0.0445],
        [-0.0277],
        [-0.0044],
        [ 0.1044],
        [ 0.0445],
        [-0.0044],
        [ 0.1044]], device='cuda:0', grad_fn=<GatherBackward>)
game 0 completed
[[-1.  1.  1.]
 [-1.  1.  1.]
 [ 1. -1. -1.]]
[(1, (1, 2)), (-1, (2, 1)), (1, (1, 1)), (-1, (0, 0)), (1, (2, 0)), (-1, (1, 0)), (1, (0, 1)), 

KeyboardInterrupt: ignored

In [None]:
# Interactive 3x3 game: the human moves first as player 1, agent2 replies as player -1.
play_against_agent(agent2, 3,3)

In [None]:
# Inspect the Q-values for a hand-written 3x3 position (board flattened row by row), shown in board layout.
agent2.qnet(torch.tensor([-1,1,0,0,1,0,0,0,0]).type(torch.float32).to(device)).view(3,3)

In [None]:
# Fresh 3x3 game (three in a row wins) for checking the game logic by hand.
gm = Gomoku(3,3)
gm.board

array([[0., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

In [None]:
# won() returns (largest window sum, smallest window sum); both are 0 on an empty board.
gm.won()

(0, 0)

In [None]:
# Player 1 takes the centre; -1 is the reward for an ordinary, non-terminal move.
gm.make_move((1,1),1)

-1

In [None]:
# The board now shows player 1's stone at (1, 1).
gm.board

array([[0., 0., 0.],
       [0., 1., 0.],
       [0., 0., 0.]])

In [None]:
# One stone for player 1 gives a largest window sum of 1; player -1 has no stones yet.
gm.won()

(1.0, 0)

In [None]:
# Player -1 answers at (1, 0); again an ordinary move with reward -1.
gm.make_move((1,0),-1)

-1

In [None]:
# The window-sum extremes are now 1 for player 1 and -1 for player -1.
gm.won()

(1.0, -1.0)