In [None]:
from google.colab import files
from zipfile import ZipFile
uploaded = files.upload()
with ZipFile("Othello.zip", 'r') as zip_file:
    zip_file.extractall()

In [None]:
from Arena import Arena
from OthelloGame import OthelloGame
from OthelloPlayers import RandomPlayer, HumanOthelloPlayer, GreedyOthelloPlayer

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import deque
import random
import torch.optim as optim
from Arena import Arena
from OthelloGame import OthelloGame
from OthelloPlayers import *
import math
from tqdm.notebook import tqdm
from random import shuffle

In [None]:
class PolicyNet(nn.Module):
    """
    This class implements the policy network
    """
    def __init__(self, game):
        super().__init__()

        # parameters
        self.board_x, self.board_y = game.getBoardSize()
        self.action_size = game.getActionSize()
        self.num_channels = 256  # number of channels for the Conv2d layer
        self.dropout = 0.3  # Dropout probability

        # convolutional layers
        self.conv1 = nn.Conv2d(1, self.num_channels, 3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(self.num_channels, self.num_channels, 3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(self.num_channels, self.num_channels, 3, stride=1)
        # self.conv4 = nn.Conv2d(self.num_channels, self.num_channels, 3, stride=1)

        self.bn1 = nn.BatchNorm2d(self.num_channels)
        self.bn2 = nn.BatchNorm2d(self.num_channels)
        self.bn3 = nn.BatchNorm2d(self.num_channels)
        # self.bn4 = nn.BatchNorm2d(self.num_channels)

        self.fc1 = nn.Linear(self.num_channels*(self.board_x-2)*(self.board_y-2), 512)
        self.fc_bn1 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, self.action_size)
        self.fc3 = nn.Linear(512, 1)

    def forward(self, s):
        """
        Args:
            s: board configurtion, torch.Tensor with shape (batch_size, board_x, board_y)
        Returns:
            pi: log probability of actions in state s, torch.Tensor with shape (batch_size, action_size)
            v: value of state s, torch.Tensor with shape (batch_size, 1)
        """
        s = s.view(-1, 1, self.board_x, self.board_y)                # batch_size x 1 x board_x x board_y
        s = F.relu(self.bn1(self.conv1(s)))                          # batch_size x num_channels x board_x x board_y
        s = F.relu(self.bn2(self.conv2(s)))                          # batch_size x num_channels x board_x x board_y
        s = F.relu(self.bn3(self.conv3(s)))                          # batch_size x num_channels x (board_x-2) x (board_y-2)
        s = s.view(-1, self.num_channels*(self.board_x-2)*(self.board_y-2))

        s = F.dropout(F.relu(self.fc_bn1(self.fc1(s))), p=self.dropout, training=self.training)  # batch_size x 512

        # log probability of actions in state s
        pi = F.log_softmax(self.fc2(s), dim=1)                                                   # batch_size x action_size
        # value of state s
        v = torch.tanh(self.fc3(s))                                                              # batch_size x 1

        return pi, v


In [1]:
class MCTS:
    """
    This class handles the MCTS tree.
    """
    def __init__(self, game, policy_net):
        self.game = game
        self.policy_net = policy_net.to("cuda")

        self.num_MCTS_sims = 40  # number of simulations for MCTS for each action
        self.bonus_term_factor = 1.0

        self.Qsa = {}  # stores Q values for s,a
        self.Nsa = {}  # stores number of times edge s,a was visited
        self.Ns = {}  # stores number of times board s was visited
        self.Ps = {}  # stores initial policy (returned by policy network)

        self.Es = {}  # stores game.getGameEnded for board s
        self.Vs = {}  # stores game.getValidMoves for board s
        self.device = "cuda"

    def getActionProb(self, canonicalBoard):
        """
        Args:
            canonicalBoard: canonical board configuration, a 2D numpy array:
                            1=current player, -1=the opponent, 0=empty
                            first dim is row , second is column
        Returns:
            probs: a list with len=action_size, which is a policy vector
                   where the probability of the ith action is proportional to Nsa[(s,a)]
        """
        # Doing self.num_MCTS_sims times of simulations starting from the state 'canonicalBoard'
        for i in range(self.num_MCTS_sims):
            self.search(canonicalBoard)

        # Use string representation for the state
        s = self.game.stringRepresentation(canonicalBoard)
        probs = [0] * self.game.getActionSize()
        for a in range(self.game.getActionSize()):
          if (s,a) in self.Nsa:
            probs[a] = self.Nsa[(s,a)] / self.Ns[s]

        return probs

    def search(self, canonicalBoard):
        """
        Args:
            canonicalBoard: canonical board configuration, a 2D numpy array:
                            1=current player, -1=the opponent, 0=empty
                            first dim is row , second is column
        Returns:
            v: the negative of the value of the current canonicalBoard
        """

        # Use string representation for the state
        s = self.game.stringRepresentation(canonicalBoard)
        # Update self.Es
        if s not in self.Es:
            self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)


        if self.Es[s] != 0:
            # The game ended, which means that s is a terminal node
            return -self.Es[s]

        if s not in self.Ps:

            # There is no policy for the current state s, which means that s is a leaf node (a new state)
            # self.Ps = {}  # stores initial policy (returned by policy network)
            # Set Q(s,a)=0 and N(s,a)=0 for all a
            for a in range(self.game.getActionSize()):
                self.Qsa[(s, a)] = 0
                self.Nsa[(s, a)] = 0

            # Calculate the output of the policy network, which are the policy and the value for state s
            # the numpy representation of board converted to torch tensor
            board = torch.FloatTensor(canonicalBoard.astype(np.float64),).view(1, self.policy_net.board_x,
                                                                              self.policy_net.board_y)
            board = board.to(self.device)
            self.policy_net.eval()

            # get two output of the policy network regarding to the newly seen state
            with torch.no_grad():
                pi, v = self.policy_net(board)
            self.Ps[s] = torch.exp(pi).data.cpu().numpy()[0]  # The policy for state s
            v = v.data.cpu().numpy()[0][0]  # The value of state s

            # Masking invalid moves
            valids = self.game.getValidMoves(canonicalBoard, 1)
            self.Ps[s] = self.Ps[s] * valids
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s  # renormalize
            else:
                # if all valid moves were masked make all valid moves equally probable
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])

            self.Vs[s] = valids  # Stores the valid moves
            self.Ns[s] = 0
            return -v

        # pick the action with the highest upper confidence bound (ucb) and assign it to best_act
        best_act = -1
        valids = self.Vs[s]
        cur_best = -float('inf')
        for a in range(self.game.getActionSize()):
            if valids[a]:
                # compute q-value plus UCB bonus
                Q_val = self.Qsa[(s, a)]
                UCB = self.bonus_term_factor * self.Ps[s][a] * math.sqrt(self.Ns[s]) / ( 1 + self.Nsa[(s, a)] )
                if (Q_val + UCB) > cur_best:
                  cur_best = Q_val + UCB
                  best_act = a


        # Continue the simulation: take action best_act in the simulation
        a = best_act
        next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
        next_s = self.game.getCanonicalForm(next_s, next_player)


        # This returns the value for the current player
        v = self.search(next_s)

        self.Qsa[(s,a)] = ( self.Nsa[(s, a)] * self.Qsa[(s,a)] + v ) / (self.Nsa[(s, a)] + 1)
        self.Nsa[(s,a)] += 1

        # Update the number of times that s has been visited
        self.Ns[s] += 1

        return -v


In [None]:
from torch import cuda
class Coach():
    """
    This class executes the self-play + learning.
    """
    def __init__(self, game):
        self.game = game
        self.nnet = PolicyNet(game).to("cuda")
        self.pnet = PolicyNet(game).to("cuda")  # the competitor network
        self.mcts = MCTS(game, self.nnet)
        self.epochs = 100  # number of training epochs for each iteration
        self.learning_rate = 0.00005
        self.batch_size = 64  # batch size
        self.trainExamples = []  # historical examples for training
        self.numIters = 12  # number of iterations
        self.numEps = 30  # number of complete self-play games for one iteration.
        self.arenaCompare = 30  # number of games to play during arena play to determine if new net will be accepted.
        self.updateThreshold = 0.6  # During arena playoff, new neural net will be accepted if threshold or more of games are won.
        self.device = torch.device("cuda")

    def train(self):
        for i in range(1, self.numIters + 1):
            print(f'Starting Iter #{i} ...')

            for _ in tqdm(range(self.numEps), desc="Self Play"):
                self.mcts = MCTS(self.game, self.nnet)  # reset search tree
                self.trainExamples.extend(self.executeEpisode()) # save the iteration examples to the history

            # shuffle examples before training
            shuffle(self.trainExamples)

            # training new network, keeping a copy of the old one
            self.pnet.load_state_dict(self.nnet.state_dict())

            optimizer = optim.Adam(self.nnet.parameters(), lr=self.learning_rate, weight_decay=2.0e-4)

            for epoch in range(self.epochs):
                print('EPOCH ::: ' + str(epoch + 1))
                self.nnet.train()

                board_list = []
                pi_list = []
                v_list = []
                for training_example in self.trainExamples[:self.batch_size]:
                    canonicalBoard, pi, v = training_example
                    torch_board = torch.from_numpy(canonicalBoard.copy()).float().view(1, game.getBoardSize()[0], game.getBoardSize()[1])
                    board_list.append(torch_board)
                    pi_list.append(pi)
                    v_list.append(v)

                # define batched states
                batch_board = torch.cat(board_list)
                batch_pi = torch.tensor(pi_list)
                batch_v = torch.tensor(v_list).view(-1)


                # to cuda
                batch_board = batch_board.cuda()
                batch_pi = batch_pi.cuda()
                batch_v = batch_v.cuda()

                net_pi, net_v = self.nnet(batch_board)
                l2_lambda = 0.0
                loss = (net_v - batch_v) ** 2 - torch.sum(batch_pi * net_pi, 1) + l2_lambda * sum((p**2).sum() for p in self.nnet.parameters())

                loss_final = torch.mean(loss)
                optimizer.zero_grad()
                loss_final.backward()
                optimizer.step()


            pmcts = MCTS(self.game, self.pnet)
            nmcts = MCTS(self.game, self.nnet)

            print('PITTING AGAINST PREVIOUS VERSION')
            arena = Arena(lambda x: np.argmax(pmcts.getActionProb(x)),
                          lambda x: np.argmax(nmcts.getActionProb(x)), self.game)
            pwins, nwins, draws = arena.playGames(self.arenaCompare)

            print('NEW/PREV WINS : %d / %d ; DRAWS : %d' % (nwins, pwins, draws))

            if pwins + nwins == 0 or float(nwins) / (pwins + nwins) < self.updateThreshold:
                print('REJECTING NEW MODEL')
                self.nnet.load_state_dict(self.pnet.state_dict())
            else:
                print('ACCEPTING NEW MODEL')
                self.pnet.load_state_dict(self.nnet.state_dict())
                self.trainExamples = []

    def play(self, canonicalBoard):
        """
        Args:
            canonicalBoard: canonical board configuration, a 2D numpy array:
                            1=current player, -1=the opponent, 0=empty
                            first dim is row , second is column
        Returns:
            action: Putting a disc on row x and column y of the board corresponds to action=x*n+y. action=n*n means passing.
            (Row and column are counting from 0 to n-1.)
        """
        mcts = MCTS(self.game, self.nnet)
        action = np.argmax(mcts.getActionProb(canonicalBoard))
        return action

    def executeEpisode(self):
        """
        Returns:
            trainExamples: a list of examples of the form (canonicalBoard, pi, v)
                           pi is the MCTS informed policy vector, v is +1 if
                           the player eventually won the game, -1 if the player lost the game, and otherwise 0.000001
        """
        trainExamples = []
        board = self.game.getInitBoard()
        self.curPlayer = 1
        episodeStep = 0

        while True:
            episodeStep += 1
            canonicalBoard = self.game.getCanonicalForm(board, self.curPlayer)

            # After 10 steps, we use the greedy action rather than a random action
            if episodeStep < 10:
                pi = self.mcts.getActionProb(canonicalBoard)
            else:
                pi = list(np.zeros((self.game.getActionSize(),)))
                pi[np.argmax(self.mcts.getActionProb(canonicalBoard))] = 1

            # Add symmetric samples
            sym = self.game.getSymmetries(canonicalBoard, pi)

            for b, p in sym:
                trainExamples.append([b, self.curPlayer, p, None])

            # Take action according to the policy pi
            action = np.random.choice(len(pi), p=pi)
            board, self.curPlayer = self.game.getNextState(board, self.curPlayer, action)

            r = self.game.getGameEnded(board, self.curPlayer)

            if r != 0:
                # if the current episode of game ended
                trainExamples = [(x[0], x[2], r * ((-1) ** (x[1] != self.curPlayer))) for x in trainExamples]
                return trainExamples



In [None]:
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
game = OthelloGame(6)  # An Othello game with a 6*6 board
greedy_player = GreedyOthelloPlayer(game).play
coach = Coach(game)

coach.train()
print("\nTESTING")
arena = Arena(coach.play, greedy_player, game)
oneWon, twoWon, draws = arena.playGames(100)
fraction_won = oneWon / 100
print("Fractin won: ", fraction_won)

In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = "rl_coding_proj2"
GOOGLE_DRIVE_PATH = os.path.join('drive', 'My Drive', GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)
actor_path = "actor_seventh.pth"
path = os.path.join(GOOGLE_DRIVE_PATH, actor_path)
torch.save(coach.nnet.to("cpu").state_dict(), path)

print(os.listdir(GOOGLE_DRIVE_PATH))

In [None]:
game = OthelloGame(6)
greedy_player = GreedyOthelloPlayer(game).play

old_player = Coach(game)
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = "rl_coding_proj2"
GOOGLE_DRIVE_PATH = os.path.join('drive', 'My Drive', GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)
actor_path = "actor_fourth.pth"
path = os.path.join(GOOGLE_DRIVE_PATH, actor_path)
old_player.nnet.load_state_dict(torch.load(path, map_location=torch.device('cpu')))


arena = Arena(old_player.play, greedy_player, game)
oneWon, twoWon, draws = arena.playGames(50)
fraction_won = oneWon / 50
print("Fractin won: ", fraction_won)



Arena.playGames (1):   0%|          | 0/25 [00:00<?, ?it/s]

Arena.playGames (2):   0%|          | 0/25 [00:00<?, ?it/s]

Fractin won:  0.48


In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = "rl_coding_proj2"
GOOGLE_DRIVE_PATH = os.path.join('drive', 'My Drive', GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)
actor_path = "actor_1.pth"
path = os.path.join(GOOGLE_DRIVE_PATH, actor_path)
print(os.listdir(GOOGLE_DRIVE_PATH))
torch.save(coach.nnet.to("cpu").state_dict(), path)


print(os.listdir(GOOGLE_DRIVE_PATH))

In [None]:
## Sample test
import os
uploaded = files.upload()

import random
import numpy as np
import torch
import agent2

random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
game = OthelloGame(6)  # An Othello game with a 6*6 board
greedy_player = GreedyOthelloPlayer(game).play
print("\nTESTING")
arena = Arena(agent2.play, greedy_player, game)
oneWon, twoWon, draws = arena.playGames(100)
fraction_won = oneWon / 100
print("Fractin won: ", fraction_won)

Upload the file 'agent.py':


Saving agent2.py to agent2.py

TESTING


Arena.playGames (1):   0%|          | 0/50 [00:00<?, ?it/s]

Arena.playGames (2):   0%|          | 0/50 [00:00<?, ?it/s]

Fractin won:  1.0
