In [7]:
import numpy as np 
from IPython.display import clear_output
from tabulate import tabulate
import copy
import time

In [8]:
ROW_COUNT = 6
COLUMN_COUNT = 7
 
def create_board():
    board = np.zeros((6,7))
    return board
 
def drop_piece(board,row,col,piece):
    board[row][col]= piece

def get_valid_locations(board, ROW_COUNT,COLUMN_COUNT):
    top_row = board[ROW_COUNT-1]
    valid_acts = []
    for i in range(COLUMN_COUNT):
        if top_row[i] == 0:
            valid_acts.append(i)
    return valid_acts

def is_valid_location(board,col):
    return  col < COLUMN_COUNT and board[5][col]==0
 
def get_next_open_row(board,col):
    for r in range(ROW_COUNT):
        if board[r][col]==0:
            return r
    
def winning_move(board, piece):
    # Check horizontal locations for win
    for c in range(COLUMN_COUNT-3):
        for r in range(ROW_COUNT):
            if board[r][c] == piece and board[r][c+1] == piece and board[r][c+2] == piece and board[r][c+3] == piece:
                return True
 
    # Check vertical locations for win
    for c in range(COLUMN_COUNT):
        for r in range(ROW_COUNT-3):
            if board[r][c] == piece and board[r+1][c] == piece and board[r+2][c] == piece and board[r+3][c] == piece:
                return True
 
    # Check positively sloped diaganols
    for c in range(COLUMN_COUNT-3):
        for r in range(ROW_COUNT-3):
            if board[r][c] == piece and board[r+1][c+1] == piece and board[r+2][c+2] == piece and board[r+3][c+3] == piece:
                return True
 
    # Check negatively sloped diaganols
    for c in range(COLUMN_COUNT-3):
        for r in range(3, ROW_COUNT):
            if board[r][c] == piece and board[r-1][c+1] == piece and board[r-2][c+2] == piece and board[r-3][c+3] == piece:
                return True
    
    return False

def turn(player,board,agent='human',valid=True):
    
    if agent == 'human':
        if player == -1: name = 2
        else: name = 1
        
        if valid: 
            text = 'Make your Selection(0-6):'
        else: 
            text = 'Invalid choice, make new selection(0-6):'

        col = int(input(f"Player {name}, {text}"))
    
    else:
        valid_acts = get_valid_locations(board,ROW_COUNT=ROW_COUNT,COLUMN_COUNT = COLUMN_COUNT)
        col = agent.make_choice(board,valid_acts,player)

    if is_valid_location(board,col):
        row = get_next_open_row(board,col)
        drop_piece(board,row,col,player)    
        return col
    else: 
        turn(player,board,agent=agent, valid=False)

            
def play_game(init_board = np.zeros((ROW_COUNT,COLUMN_COUNT)), agent_1 = 'human',agent_2 = 'human',printy=True,starting_player=1):
    board = init_board
    
    if printy: 
        print(tabulate(np.flip(board,0)))
    game_over = False
    player = starting_player
    agent = agent_1
    
    while not game_over:
        if player == 1:
            agent = agent_1
        else:
            agent = agent_2
        
        t = turn(player,board, agent)
        
        if printy: 
            clear_output()
            printable_board = np.where(board==-1,2,board)
            print(tabulate(np.flip(printable_board,0)))
            print(f'Move played: {t}')

        if winning_move(board, player): 
            if printy: 
                print(f'player {player} won')
                time.sleep(5)
            game_over = True
            return player
        
        if board.all() != 0:
            game_over = True
            return 0
        player*=-1
        

In [9]:
# play_game()

In [10]:
class random_agent():
    def __init__(self,low,high,agent_name="random ronald"):
        self.name = agent_name
        self.low = low
        self.high = high
    def make_choice(self,board,valid_acts,player):
        try: return np.random.choice(valid_acts)
        except: print('hi',board)

In [11]:
random_ronald = random_agent(0,7)

In [13]:
board=create_board()
play_game(board,agent_1='human',agent_2 = random_ronald)

-  -  -  -  -  -  -
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
-  -  -  -  -  -  -


KeyboardInterrupt: Interrupted by user

In [14]:
class Node():
    def __init__(self,board, action, player, parent = None,actions=[0,1,2,3,4,5,6], done=False,cupt = 5, gamma = 0.9):
        
        self.board = copy.deepcopy(board)
        self.parent = parent
        self.action = action
        self.children = []
        self.n = 0
        self.player = player
        self.q_value = 0
        self.actions = actions
        self.done = done
        self.cupt = cupt
        self.gamma = gamma
        
    def is_root(self):
        return self.parent is None
    
    def select_best_leaf(self):

        if self.n == 0 or self.done:
            return self
        if len(self.children)==0:
            self.expand()
        best_child = self.children[np.argmax([x.ucb_value() for x in self.children])]
        return best_child.select_best_leaf()
         
        
    def ucb_value(self):
        #q values are for the player in that state, so the neg of the q value is the player taking the action that gets you there.
        return -self.q_value + (self.cupt/np.sqrt(2))*np.sqrt(2*np.log(self.parent.n))/(self.n+1e-5)
        
    def expand(self):
        for i in self.actions:
            board_copy = copy.deepcopy(self.board)
            if is_valid_location(board_copy,i):
                row = get_next_open_row(board_copy,i)
                drop_piece(board_copy,row,i,self.player) 
            done = winning_move(board_copy,self.player)
            child = Node(board=board_copy,action=i,player=self.player*-1,parent=self,done=done,cupt=self.cupt, gamma=self.gamma)
            self.children.append(child)

        #should expand and then end with 'select best_leaf'
    
    def propagate(self,winner):
        self.q_value = (self.q_value * self.n ) + winner*self.player
        self.n+=1
        self.q_value /= self.n
        if not self.is_root():
            self.parent.propagate(winner)

            
def simulation(board,player,iterations, actions=[0,1,2,3,4,5,6], cupt=1, gamma = 0.9):
    reward_log =[]
    starting_point = board
    game_over = False
    root = Node(board, action=None,player=player,parent=None,actions=actions, cupt=cupt, gamma = gamma)
    for i in range(iterations):
        #select the ubc recursion
        node = root
        node = node.select_best_leaf()
        boardy = copy.deepcopy(node.board)
        
        #check if that board is complete
        if node.done: 
            reward = -node.player

        elif boardy.all() != 0:
            reward = 0
        #play out game randomly
        else:
            reward = play_game(boardy,agent_1 = random_ronald, agent_2 = random_ronald, printy=False, starting_player=node.player) 
        node.propagate(reward)
    return root

In [15]:
# time_start = time.time()
# board= create_board()
# node = simulation(board, 1, 400)
# print('This took' ,round(-time_start + time.time(),2),' seconds')

In [16]:
class monte_carlo_player:
    def __init__(self,simulations, actions=7,gamma=0.9, cupt =5):
        self.simulations = simulations
        self.actions = actions
        self.gamma = gamma
        self.cupt = cupt
    def make_choice(self,board,actions_, player):
        available_options = []
        root = simulation(board, player, self.simulations, actions=actions_, cupt = self.cupt, gamma = self.gamma)
        action = actions_[np.argmin([x.q_value for x in root.children])]
        return action

magic_monty1 = monte_carlo_player(150,cupt=1)
magic_monty2 = monte_carlo_player(5000,cupt=20)


In [17]:
winners = []
for i in range(100):
    board=create_board()
    winner = play_game(init_board=board,agent_1=alfie ,agent_2=magic_monty1,printy=False)
    winners.append(winner)
    # board=create_board()
    # winner = play_game(init_board=board,agent_1='human',agent_2=magic_monty1,printy=False)
    # winners.append(winner*-1)
    

NameError: name 'alfie' is not defined

---

---

#### ALPHA-ZERO

In [67]:
x = np.array([0.15,0.15,0.17,0.2,0.23,0.05,0.05])
y = np.array([0.2, 0.1, 0.1,0.3,0.1, 0.15, 0.05])
x@y/7

0.02214285714285714

In [18]:
import torch
from torch.nn import Linear, ReLU, Softmax, Sigmoid

In [19]:
class LambdaLayer(torch.nn.Module):
    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd
    def forward(self, x):
        return self.lambd(x)

def lambd(x):
    if x < 0:
        return -1
    else:
        return 1
        
class Linear_Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        #general use
        self.relu = ReLU()
        self.dense1 = Linear(42,50)
        self.dense2 = Linear(50,50)
        self.dense3 = Linear(50,30)
        self.dense4p = Linear(30,7)
        self.dense4r = Linear(30,1)
        self.softmax = Softmax(dim=0)
        self.sigmoid = Sigmoid()
        self.lamb = LambdaLayer(lambd)
        self.tanh = torch.nn.Tanh()
        
    def forward(self,flat_board): 
        x1 = self.dense1(flat_board); x1=self.relu(x1)
        x2 = self.dense2(x1); x2=self.relu(x2)
        x3 = self.dense3(x2); x3=self.relu(x3)
        x4p = self.dense4p(x3)
        x4r = self.dense4r(x3)
        probs = self.softmax(x4p)
        v = self.tanh(x4r)
#         v = self.lamb(v)
        return probs, v
    
class Dataset(torch.utils.data.Dataset):      
    def __init__(self, x, y_P, y_R): 
        self.x = x.float()
        self.y_P = y_P.float()
        self.y_R = y_R.float()

    def __getitem__(self, index):
        return self.x[index],self.y_P[index],self.y_R[index]

    def __len__(self):
        return len(self.y_R)

In [None]:
class AZ_node():
    def __init__(self, board, action, player, model, actions=7, parent=None, done=False, cupt=1, gamma =0.9, valid=True):
        self.n = 0
        self.board = board
        self.player = player
        self.parent = parent
        self.children = []
        self.action = action
        self.model = model
        self.prob_vector, self.exp_value = self.model((torch.from_numpy(board.flatten()*self.player)).float())
        self.prob_vector = self.prob_vector.detach().numpy()
        self.q_value = 0
        self.cupt = cupt
        self.actions = 7
        self.gamma = gamma
        self.done = done
        self.valid = valid

        
    def perform_mcts_search(self):
        if self.n == 0:
            return self,self.exp_value * self.player #multiply by self.player so it puts in right format

        if self.done:
            return self,-self.player
        

        if len(self.children)==0:
            self.expand()
        best_child = self.children[np.argmax([x.ucb_value() for x in self.children])]
        return best_child.perform_mcts_search()

    def ucb_value(self):
        if not self.valid: 
            return float('-inf')
        return  ( -self.q_value + ( (self.cupt * self.parent.prob_vector[self.action] * np.sqrt(self.parent.n)) / (1+self.n) ))

    def expand(self):
        for i in range(self.actions):
            board_copy = copy.deepcopy(self.board)
            if is_valid_location(board_copy,i):
                row = get_next_open_row(board_copy,i)
                drop_piece(board_copy,row,i,self.player) 
                won = winning_move(board_copy,self.player)
                child = AZ_node(board_copy, i, self.player*-1, model=self.model,parent=self,done=won, cupt=self.cupt, gamma=self.gamma)
            else:
                child = AZ_node(board_copy, i, self.player*-1, model=self.model,parent=self, done = False, cupt=self.cupt, gamma=self.gamma, valid =False)
            self.children.append(child)

    def mcts_propagate(self, reward):
        self.q_value = self.q_value*self.n + reward*self.player
        self.n+=1
        self.q_value = self.q_value / self.n
        if self.parent is not None:
            self.parent.mcts_propagate(reward)

    def sampled_prob_vector(self):
        return [x.n/self.n for x in self.children]

class Alpha_player():
    def __init__(self,model,simulations, actions=7,gamma=0.9, cupt =1):
        self.simulations = simulations
        self.actions = actions
        self.gamma = gamma
        self.cupt = cupt
        self.model = model
        
    def make_choice(self,board,actions_,player):
        
        root = AZ_node(board, model = self.model, player=player,action=None, cupt=self.cupt)
        for i in range(self.simulations):
            node, reward = root.perform_mcts_search() #gets valid options frm board
            node.mcts_propagate(reward)
        col = np.argmax(root.sampled_prob_vector())
        print([np.random.normal(loc=10,scale=10)*x.ucb_value() for x in root.children])
        time.sleep(5)
        return col

In [None]:
np.random.normal(loc=1,scale=0.05)

In [None]:
model = Linear_Model()
alfie = Alpha_player(simulations=120, model = model,cupt=2)

In [55]:
board=create_board()
play_game(board, agent_1 = alfie, agent_2 = 'human')

-  -  -  -  -  -  -
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  0  0  0  0  0  0
0  2  0  0  0  1  0
-  -  -  -  -  -  -
Move played: 1
[tensor([0.1953], grad_fn=<AddBackward0>), tensor([0.1982], grad_fn=<AddBackward0>), tensor([0.1979], grad_fn=<AddBackward0>), tensor([0.1959], grad_fn=<AddBackward0>), tensor([0.1976], grad_fn=<AddBackward0>), tensor([0.1961], grad_fn=<AddBackward0>), tensor([0.1968], grad_fn=<AddBackward0>)]


KeyboardInterrupt: 

## def pit_two_agents(agent_1, agent_2, matches):
    winners = []
    for i in range(matches//2):
        board = create_board()
        w = play_game(board, agent_1, agent_2, printy= False)
        winners.append(w)
        board=create_board()
        w = play_game(board, agent_1, agent_2, printy= False, starting_player=-1)
        winners.append(w)
        
    return winners

In [48]:
magic_monty1 = monte_carlo_player(50,cupt=1)
pit_result = pit_two_agents(alfie, magic_monty1, 200)
pit_result

[1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 -1,
 1,
 -1,
 1,
 1,
 1,
 -1,
 -1,
 1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 -1,
 -1,
 1,
 -1,
 1,
 -1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 -1,
 1,
 -1,
 -1,
 1,
 -1,
 -1,
 -1,
 -1,
 -1,
 1,
 1,
 1,
 -1]

In [19]:
def training_loop(model, training_epochs, games_per_epoch, mcts_search, batch_size, crit1, crit2, mini_epochs, cupt):
    candidate_mod = Linear_Model()
    candidate_mod.load_state_dict(model.state_dict())
    for i in range(training_epochs):
        m = 0
        episodes = {}
 
        for game in range(games_per_epoch):
            states, prob_vecs, rewards = [], [] ,[]
            states, prob_vecs, rewards= run_episode(mcts_search, candidate_mod, cupt, states, prob_vecs, rewards)
            m+=len(rewards)
            episodes[game] = {
                              'states':states,
                              'prob_vecs': prob_vecs,
                              'rewards':rewards,
                                'm': m
                             }
            
            
        x, y_probs, y_rewards = tensorfy_data(episodes,m)    
        dLoader = dataloader_func(x, y_probs, y_rewards, batch_size)
        candidate_mod = train_system(dLoader, candidate_mod, optimizer, crit1, crit2, mini_epochs)
    return 1, 2, 3
    candidate_agent = Alpha_player(simulations = mcts_searches, model = candidate_model, cupt=1)
    old_agent = Alpha_player(simulations = mcts_searches, model = model, cupt=1)
    score_v_monty = pit_two_agents(candidate_agent, magic_monty1, 4)
    print(f'Score v Monty: {np.mean(score_v_monty)}')
    score_v_old_agent = pit_two_agents(candidate_agent, magic_monty1, 5)
    print(f'Score v old agent: {np.mean(score_v_old_agent)}')
    return candidate_mod, score_v_monty, score_v_old_agent

def run_episode(mcts_searches,candidate_mod,cupt, states, prob_vecs, rewards, gamma=0.9):
    
    board = create_board()
    current_player = 1
    is_done = False
    
    while True:
        
        root = AZ_node(board, action= None, player=current_player, model=candidate_mod, cupt=cupt, gamma=gamma) #node with starting board
        
        for i in range(mcts_searches):
            node, reward = root.perform_mcts_search() #gets valid options frm board
            node.mcts_propagate(reward)
            
        #log 
        states.append(root.board.flatten() * root.player) #if current_player = -1, store the state as *= -1 so it's in first person mode.
        prob_vecs.append(root.sampled_prob_vector())
        rewards.append(current_player) #we can use this to multiply by the reward later
        
        #makemove
        col = np.argmax(root.sampled_prob_vector())
        row = get_next_open_row(board,col)
        drop_piece(board,row,col,current_player)    
        
        #check win and add rewards
        if winning_move(board,current_player):
            reward = current_player
            rewards = [x*reward for x in rewards] 
            break
        if board.all() != 0:
            reward = 0
            rewards = [x*0 for x in rewards]
            break
        current_player*=-1
    return states, prob_vecs, rewards

def tensorfy_data(episodes,m):
    x = torch.zeros((m,42))
    y_P = torch.zeros((m,7))
    y_R = torch.zeros((m,1))
    progress = 0 
    for value in episodes.values():
        x[progress:value['m']] = torch.from_numpy(np.array(value['states']))
        y_P[progress:value['m']] = torch.from_numpy(np.array(value['prob_vecs']))
        y_R[progress:value['m']] = torch.from_numpy(np.array(value['rewards']).reshape(-1,1))
        progress = value['m']
    return x, y_P, y_R

def dataloader_func(x, y_probs, y_rewards, batch_size):
    dataset = Dataset(x, y_probs, y_rewards)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,shuffle=True)
    return dataloader
    
def train_system(dataloader, model, optimizer, crit1, crit2, epochs):
    model.train()
    for x, y_p, y_r in dataloader:
        x, y_p, y_r = x.float(), y_p.float(), y_r.float()
        probas, reward = model(x)
        loss1 = crit1(reward, y_r)
        loss2 = crit2(probas, y_p)
        loss = loss1 - loss2
        print(loss)
        loss.backward()
        optimizer.step(); optimizer.zero_grad()
    return model

In [20]:
####NEED TO REMOVE GAMMA
model = Linear_Model()
optimizer = torch.optim.RMSprop(model.parameters(),lr=0.001)
crit_1 = torch.nn.MSELoss()
def prob_loss(probas, y_p):
    return torch.mean(torch.sum(y_p * torch.log(probas),dim=1))
crit_2 = prob_loss
mcts_searches = 300
training_epochs = 2
games_per_epoch = 2
batch_size = 32
mini_epochs = 2
cupt = 2
cand_mod, score_v_monty, score_v_old_agent = training_loop(model, training_epochs, games_per_epoch, mcts_searches, batch_size, crit_1, crit_2, mini_epochs, cupt)
# score_v_monty, score_v_old_agent = training_loop(model, training_epochs, games_per_epoch, mcts_searches, batch_size, crit_1, crit_2, mini_epochs, cupt)

In [None]:
score_v_monty

Error: Kernel is dead