# Connect 4 board class

In [None]:
import numpy as np
import pandas as pd
from IPython import display

'''
Class for the connect 4 game:
Number of rows = 6, Number of columns = 7
The board will be a 2D Numpy array consisting of 0s, 1s, and 2s (where 1 is player 1, 2 is player 2, 0 is an empty slot)
Rewards are as follows: {win: 1, draw: -0.5, lose: -1} (we want to maximize winning)
'''

class C4:
    """Connect 4 environment on a 6-row by 7-column board.

    ``state`` is a 2D uint8 array in which 0 is an empty slot, 1 is a 'P1'
    token and 2 is a 'P2' token. Row 0 is the top of the board: a dropped
    token lands on the largest row index that is still empty in its column.
    ``Finished`` becomes True once a win or a draw has been detected.
    """

    def __init__(self):
        self.width = 7
        self.height = 6
        self.state = np.zeros([self.height, self.width], dtype=np.uint8)
        self.players = {'P1': 1, 'P2': 2}
        self.rewards = {'Win': 1, 'Draw': -0.5, 'Lose': -1}
        self.Finished = False
        self.actions = list(range(self.width))

    def resetGame(self):
        """Restore the empty board and clear the finished flag."""
        self.__init__()

    def free_cols(self):
        """Return the columns that are not full (top slot still 0)."""
        return [col for col in range(self.width) if self.state[0, col] == 0]

    @staticmethod
    def _cells_to_text(cells):
        """Render a sequence of cell values as space-separated digits."""
        return ' '.join(str(int(cell)) for cell in cells)

    def check_vertical(self, sub_str, col):
        """Return True if ``sub_str`` occurs in column ``col`` read top to bottom."""
        return sub_str in self._cells_to_text(self.state[:, col])

    def check_horizontal(self, sub_str, row):
        """Return True if ``sub_str`` occurs in row ``row`` read left to right."""
        return sub_str in self._cells_to_text(self.state[row, :])

    def check_diagonal(self, sub_str, row, col):
        """Return True if ``sub_str`` occurs on either diagonal through (row, col).

        Both diagonals are read from their topmost cell downwards.
        """
        # Diagonal running down-right: start from its top-left end.
        shift = min(row, col)
        i, j = row - shift, col - shift
        down_right = []
        while i < self.height and j < self.width:
            down_right.append(self.state[i, j])
            i += 1
            j += 1

        # Diagonal running down-left: start from its top-right end.
        shift = min(row, self.width - 1 - col)
        i, j = row - shift, col + shift
        down_left = []
        # j >= 0 so that the cell in column 0 is part of the diagonal.
        while i < self.height and j >= 0:
            down_left.append(self.state[i, j])
            i += 1
            j -= 1

        return (sub_str in self._cells_to_text(down_right)
                or sub_str in self._cells_to_text(down_left))

    def is_draw(self):
        """Return True (and mark the game finished) when every column is full."""
        if any(self.state[0][col] == 0 for col in range(self.width)):
            return False
        self.Finished = True
        return True

    def render(self):
        """Display the board as a DataFrame: ' ' empty, 'O' for P1, 'X' for P2."""
        # Imported here because the file-level name `display` is the module,
        # not the callable.
        from IPython.display import display as show

        symbols = np.array([' ', 'O', 'X'])
        show(pd.DataFrame(symbols[self.state]))

    def check_win(self, player, row, col):
        """Evaluate the board after ``player`` played at (row, col).

        Returns rewards['Win'] if the game is finished by a win (or was
        already flagged finished), rewards['Draw'] if the board is full,
        otherwise 0.
        """
        win_substr = ' '.join([str(self.players[player])] * 4)
        # If any direction contains four in a row, the current player has won.
        if (self.check_vertical(win_substr, col)
                or self.check_horizontal(win_substr, row)
                or self.check_diagonal(win_substr, row, col)):
            self.Finished = True

        if self.Finished:
            return self.rewards['Win']
        if self.is_draw():
            return self.rewards['Draw']
        return 0

    def move(self, player, col):
        """Drop ``player``'s token into ``col`` and evaluate the result.

        Returns a tuple ``(board copy, reward)``. For an out-of-range or full
        column the board is left untouched, 'Invalid move' is printed and the
        reward is 0.
        """
        if not 0 <= col < self.width or self.state[0, col] != 0:
            print('Invalid move')
            return self.state.copy(), 0

        # Lowest empty slot = largest row index that still holds 0.
        row = np.flatnonzero(self.state[:, col] == 0)[-1]
        self.state[row, col] = self.players[player]
        return self.state.copy(), self.check_win(player, row, col)

# Experience Replay Class

In [None]:
'''
Experience Replay will be used to train.
Any transition that is observed will be stored: (state, action taken, reward received, next state)
We can randomly sample from this list to use for training instead of training on each state-action pair
'''

import random

class Expr_Replay:
    """Unbounded buffer of observed transitions with uniform random sampling."""

    def __init__(self):
        # Transitions are kept in insertion order.
        self.store = list()

    def sample(self, num):
        """Return ``num`` distinct transitions drawn at random from the buffer."""
        drawn = random.sample(self.store, num)
        return drawn

    def add(self, transition):
        """Append one transition to the end of the buffer."""
        self.store += [transition]

    def _len(self):
        """Return how many transitions are currently stored."""
        count = len(self.store)
        return count

# Neural Network to approximate the Q table

In [None]:
import torch
from torch import nn
import torch.nn.functional as F

class DQN(nn.Module):
    """Convolutional Q-network: a (N, 1, 6, 7) board batch in, 7 column values out."""

    def __init__(self):
        super().__init__()

        # Two 5x5 convolutions; padding=2 keeps the 6x7 spatial size.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=5, padding=2)

        # Dense head over the flattened 32 x 6 x 7 feature map.
        self.fc1 = nn.Linear(in_features=32 * 6 * 7, out_features=42)
        self.fc2 = nn.Linear(in_features=42, out_features=42)
        self.out = nn.Linear(in_features=42, out_features=7)

    def forward(self, x):
        """Return one value per column for every board in the batch."""
        features = F.relu(self.conv2(F.relu(self.conv1(x))))
        # Collapse channels and spatial dims, keeping the batch dimension.
        flat = features.view(features.size(0), -1)
        hidden = F.relu(self.fc2(F.relu(self.fc1(flat))))
        return self.out(hidden)

# Parameters used in training

In [None]:
import torch.optim as optim
import math
from tqdm import tqdm

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class Agent:
    """DQN agent that learns Connect 4 as 'P1' against a random 'P2'.

    Holds the environment, the replay memory, a main network that is trained
    and a target network that is periodically synchronised with it.
    """

    def __init__(self):
        self.learning_rate = 0.001
        self.batch_size = 200
        self.gamma = 0.9
        self.e_max = 1.0
        self.e_min = 0.01
        self.decay_rate = 0.001
        self.env = C4()
        self.memory = Expr_Replay()
        self.main_net = DQN().to(device)
        self.target_net = DQN().to(device)
        self.target_net.load_state_dict(self.main_net.state_dict())
        self.optimizer = optim.Adam(self.main_net.parameters(), lr=self.learning_rate)
        self.loss_fn = nn.MSELoss()
        self.steps_done = 0
        self.episodes = 25000
        self.target_lag = 15
        self.training_history = []

    def epsilon_decay(self, step):
        """Return epsilon after ``step`` steps, decaying exponentially from e_max to e_min."""
        return self.e_min + (self.e_max - self.e_min) * math.exp(-self.decay_rate * step)

    def transform_input(self, state):
        """Turn a 2D board array into a float tensor of shape (1, 1, rows, cols)."""
        return torch.tensor(state, dtype=torch.float, device=device).view(1, 1, *state.shape)

    def epsilon_greedy(self, state, free_actions, step=None, training=True):
        """Pick a column with the epsilon-greedy strategy.

        With probability epsilon (0 when ``training`` is False) a random free
        column is returned; otherwise the free column with the highest value
        predicted by the main network. When ``step`` is omitted during
        training, ``self.steps_done`` is used.
        """
        if training:
            if step is None:
                step = self.steps_done
            threshold = self.epsilon_decay(step)
        else:
            threshold = 0

        # Below epsilon: explore. Otherwise: exploit the main network.
        if random.random() < threshold:
            return random.choice(free_actions)

        with torch.no_grad():
            q_values = self.main_net(self.transform_input(state))[0, :]
        vals = [q_values[a].item() for a in free_actions]
        return free_actions[int(np.argmax(vals))]

    def rand_agent(self, free_cols):
        """Return a uniformly random column from ``free_cols``."""
        return random.choice(free_cols)

    def optimize(self):
        """Run one gradient step on a random batch from the replay memory.

        Each transition is ``[state, action, reward, next_state]`` where
        ``next_state`` is None for terminal transitions; those keep a next-state
        value of 0.
        """
        transitions = self.memory.sample(self.batch_size)

        # Stack with NumPy first: one conversion instead of per-element copies.
        states = np.stack([t[0] for t in transitions])[:, None, :, :]
        state_batch = torch.tensor(states, dtype=torch.float, device=device)
        action_batch = torch.tensor([[t[1]] for t in transitions], dtype=torch.long, device=device)
        reward_batch = torch.tensor([t[2] for t in transitions], dtype=torch.float, device=device)

        # Prediction of the main network for the actions actually taken.
        state_action_values = self.main_net(state_batch).gather(1, action_batch)

        # Targets from the target network; terminal entries stay at 0.
        next_state_values = torch.zeros(self.batch_size, device=device)
        non_final_states = [t[3] for t in transitions if t[3] is not None]
        if non_final_states:  # a batch may consist of terminal transitions only
            non_final_mask = torch.tensor(
                [t[3] is not None for t in transitions], dtype=torch.bool, device=device
            )
            next_batch = torch.tensor(
                np.stack(non_final_states)[:, None, :, :], dtype=torch.float, device=device
            )
            with torch.no_grad():
                next_state_values[non_final_mask] = self.target_net(next_batch).max(1)[0]

        expected_state_action_values = (next_state_values * self.gamma) + reward_batch

        loss = self.loss_fn(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self):
        """Play ``self.episodes`` games against the random agent and learn from them.

        The starting player is chosen at random each episode so the agent sees
        games where it moves second. The target network is synchronised with
        the main network every ``self.target_lag`` episodes.
        """
        win = self.env.rewards['Win']
        lose = self.env.rewards['Lose']
        draw = self.env.rewards['Draw']

        for episode in range(self.episodes):
            self.env.resetGame()

            # P1 is always our agent, P2 always the random agent.
            if random.choice(['P1', 'P2']) == 'P2':
                opening = self.rand_agent(self.env.free_cols())
                s_p1, _ = self.env.move('P2', opening)
            else:
                s_p1 = self.env.state.copy()

            while True:
                a_p1 = self.epsilon_greedy(s_p1, self.env.free_cols(), self.steps_done)
                _, reward_p1 = self.env.move('P1', a_p1)
                self.steps_done += 1

                if self.env.Finished:
                    self.memory.add([s_p1, a_p1, reward_p1, None])
                    break

                a_p2 = self.rand_agent(self.env.free_cols())
                s_next, reward_p2 = self.env.move('P2', a_p2)

                if self.env.Finished:
                    # The opponent ended the game: a loss if it won, else a draw.
                    outcome = lose if reward_p2 == win else draw
                    self.memory.add([s_p1, a_p1, outcome, None])
                    break

                # Non-terminal step: small penalty per move.
                self.memory.add([s_p1, a_p1, -0.1, s_next])
                s_p1 = s_next

                if self.memory._len() >= self.batch_size:
                    self.optimize()

            if episode % self.target_lag == self.target_lag - 1:
                self.target_net.load_state_dict(self.main_net.state_dict())

        print('Completed')

In [None]:
# Train the agent and persist the weights of the main network.
agent = Agent()
agent.train()
torch.save(agent.main_net.state_dict(), 'C4.pth')

# The download helper only exists on Google Colab; elsewhere the weights
# simply stay in 'C4.pth' in the working directory.
try:
    from google.colab import files
except ImportError:
    pass
else:
    files.download('C4.pth')

In [None]:
#Simulate a game against the trained bot

env = C4()

# 'C4.pth' stores a state_dict, so rebuild the network and load the weights into it.
model = DQN().to(device)
model.load_state_dict(torch.load('C4.pth', map_location=device))
model.eval()

p1_turn = True
last = 'P1'
reward = 0
env.render()
while not env.Finished:
    if p1_turn:
        last = 'P1'
        # Ask again until the input is an integer naming a non-full column.
        col = None
        while col not in env.free_cols():
            try:
                col = int(input("Enter a column number: "))
            except ValueError:
                col = None
        _, reward = env.move('P1', col)
        env.render()
        p1_turn = not p1_turn

    else:
        last = 'P2'
        available = env.free_cols()

        # The network was trained with its own tokens encoded as 1 and the
        # opponent's as 2; here the bot plays as P2, so swap the encoding.
        board = env.state.copy()
        net_view = board.copy()
        net_view[board == 1] = 2
        net_view[board == 2] = 1

        with torch.no_grad():
            net_input = torch.tensor(net_view, dtype=torch.float, device=device).view(1, 1, *net_view.shape)
            actions = model(net_input)[0, :]
        vals = [actions[i].item() for i in available]
        move = available[int(np.argmax(vals))]
        _, reward = env.move('P2', move)
        env.render()
        p1_turn = not p1_turn

# The reward of the last move tells a draw apart from a win.
if reward == env.rewards['Draw']:
    print('The game ended in a draw!')
elif last == 'P1':
    print('You have won against the bot!')
else:
    print('The bot has won against you!')