In [1]:
# Make Kaggle Env
from kaggle_environments import evaluate, make, utils
import numpy as np
import gym
import random
from random import choice
from tqdm import tqdm

env = make("connectx", debug=True)

Loading environment lux_ai_s2 failed: No module named 'vec_noise'


In [2]:
# Test Out the environment
env.render(mode="ipython", width=500, height=450)

In [3]:
class QTable:
    def __init__(self, action_space):
        self.table = dict()
        self.action_space = action_space
        
    def add_item(self, state_key):
        self.table[state_key] = list(np.zeros(self.action_space.n))
    
    def __call__(self, state):
        board = state.board[:] # Get a copy
        board.append(state.mark)
        state_key = np.array(board).astype(str)
        state_key = hex(int(''.join(state_key), 3))[2:]
        if state_key not in self.table.keys():
            self.add_item(state_key)
            
        return self.table[state_key]
    

In [4]:
# Environment parameters
cols = 7
rows = 6

action_space = gym.spaces.Discrete(cols)
observation_space = gym.spaces.Discrete(cols * rows)

In [5]:
# configure hyper-parameters
alpha =  0.1
gamma = 0.6
epsilon = 0.99
min_epsilon = 0.1

episodes = 500
alpha_decay_step = 1000
alpha_decay_rate = 0.9
epsilon_decay_rate = 0.9999

In [6]:
q_table = QTable(action_space)
trainer = env.train([None, "negamax"])

all_epochs = []
all_total_rewards = []
all_avg_rewards = []
all_q_table_rows = []
all_epsilons = []

for i in tqdm(range(episodes)):
    state = trainer.reset()
    epsilon = max(min_epsilon, epsilon * epsilon_decay_rate)
    epochs, total_rewards = 0, 0
    done = False
    
    while not done:
        if random.uniform(0, 1) < epsilon:
            action = choice([c for c in range(action_space.n) if state.board[c] == 0])
        
        else:
            row = q_table(state)[:]
            selected_items = []
            for j in range(action_space.n):
                if state.board[j] == 0:
                    selected_items.append(row[j])
                else:
                    selected_items.append(-1e7)
            action = int(np.argmax(selected_items))
                
        next_state, reward, done, info = trainer.step(action)
        
        # apply new rules
        if done:
            if reward == 1:
                reward = 20
            elif reward == 0:
                reward = -20
            else:
                reward = 10
                
        else:
            reward = -0.05
        
        old_value = q_table(state)[action]
        next_max = np.argmax(q_table(next_state))
        
        # update q value
        new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
        q_table(state)[action] = new_value
        
        state = next_state
        epochs += 1
        total_rewards += reward
    
    all_epochs.append(epochs)
    all_total_rewards.append(total_rewards)
    avg_rewards = np.mean(all_total_rewards[max(0, i - 100) : (i + 1)])
    all_avg_rewards.append(avg_rewards)
    all_q_table_rows.append(len(q_table.table))
    all_epsilons.append(epsilon)
    
    if (i +1) % alpha_decay_step == 0:
        alpha += alpha_decay_rate
    

100%|██████████| 500/500 [11:51<00:00,  1.42s/it]


In [7]:
tmp_dict_q_table = q_table.table.copy()
dict_q_table = dict()

for k in tmp_dict_q_table:
    if np.count_nonzero(tmp_dict_q_table[k]) > 0:
        dict_q_table[k] = int(np.argmax(tmp_dict_q_table[k]))

In [8]:
def my_agent(observation, configuration):
    from random import choice
    
    q_table = dict_q_table
    board = observation.board[:]
    board.append(observation.mark)
    state_key = list(map(str, board))
    state_key = hex(int(''.join(state_key), 3))[2:]
    
    if state_key not in q_table.keys():
        return choice([c for c in range(configuration.columns) 
                       if observation.board[c] == 0])
    action = q_table[state_key]
    
    if observation.board[action] != 0:
        return choice([c for c in range(configuration.columns) 
                       if observation.board[c] == 0])
    return action

In [9]:
# Run against negamax
env.reset()
env.run([my_agent, "negamax"])
env.render(mode="ipython")

In [9]:
agent = """def my_agent(observation, configuration):
    from random import choice
    
    q_table = """+ str(dict_q_table).replace(" ", "") +"""
    board = observation.board[:]
    board.append(observation.mark)
    state_key = list(map(str, board))
    state_key = hex(int(''.join(state_key), 3))[2:]
    
    if state_key not in q_table.keys():
        return choice([c for c in range(configuration.columns) 
                       if observation.board[c] == 0])
    action = q_table[state_key]
    
    if observation.board[action] != 0:
        return choice([c for c in range(configuration.columns) 
                       if observation.board[c] == 0])
    return action """

In [10]:
with open("submission.py", 'w') as f:
    f.write(agent)

In [35]:
# import saved agent
from submission import my_agent

In [11]:
def agent_alphabeta(obs, config):

    
    import numpy as np
    import random

    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    def is_terminal_node(grid, config):

        if list(grid[0, :]).count(0) == 0:
            return True

        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if is_terminal_window(window, config):
                    return True
 
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if is_terminal_window(window, config):
                    return True
      
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if is_terminal_window(window, config):
                    return True
    
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if is_terminal_window(window, config):
                    return True
        return False


    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

 
    def count_windows(grid, num_discs, piece, config):
        num_windows = 0

        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1

        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1

        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1

        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if check_window(window, num_discs, piece, config):
                    num_windows += 1
        return num_windows

 
    def get_heuristic(grid, mark, config):
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        num_fours_opp = count_windows(grid, 4, mark%2+1, config)
        return num_threes - 1e2*num_threes_opp - 1e4*num_fours_opp + 1e6*num_fours

    def drop_piece(grid, col, mark, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid


    def minimax_alphabeta(node, depth, alpha, beta, maximizingPlayer, mark, config, columns_centered): # columns_centered
        if depth == 0 or is_terminal_node(node, config):
            return get_heuristic(node, mark, config)
  
        valid_moves = [c for c in columns_centered if node[0][c] == 0] 
        if maximizingPlayer:
            value = -np.Inf
            for col in valid_moves:
                child = drop_piece(node, col, mark, config)
                value = max(value, minimax_alphabeta(child, depth-1, alpha, beta, False, mark, config, columns_centered)) 
                alpha = max(alpha, value) 
                if alpha >= beta: 
                    break 
            return value
        else:
            value = np.Inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1, config)
                value = min(value, minimax_alphabeta(child, depth-1, alpha, beta, True, mark, config, columns_centered)) 
                beta = min(beta, value) 
                if beta <= alpha: 
                    break 
            return value


    def score_move(grid, col, mark, config, nsteps, columns_centered): 
        next_grid = drop_piece(grid, col, mark, config)
        return minimax_alphabeta(next_grid, nsteps-1, -np.Inf, np.Inf, False, mark, config, columns_centered) 


    N_STEPS = 5

  
    valid_moves = [c for c in range(config.columns) if obs.board[c] ==0]
    print("Valid Moves", len(valid_moves))

    columns = [c for c in range(config.columns) if obs.board[c] == 0]
    dist_from_center = {c: abs(c-(config.columns-1)/2) for c in columns}
    columns_centered = [k for k, v in sorted(dist_from_center.items(), key=lambda item: item[1])]

    grid = np.asarray(obs.board).reshape(config.rows, config.columns)

    scores = dict(zip(columns_centered, [score_move(grid, col, obs.mark, config, N_STEPS, columns_centered) for col in columns_centered])) # columns_centered

    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]

    return random.choice(max_cols)

In [45]:

env.run([agent_alphabeta, agent_alphabeta])


env.render(mode="ipython")

Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 7
Valid Moves 6
Valid Moves 5
Valid Moves 5
Valid Moves 5
Valid Moves 5
Valid Moves 4
Valid Moves 3
Valid Moves 3


In [14]:

env.run([my_agent, agent_alphabeta])


env.render(mode="ipython")

Valid Moves 7
Timeout: 


In [None]:
import matplotlib.pyplot as plt

In [37]:
def agent_smart(obs, config):
    

    
    import numpy as np
    import random


    def drop_piece(grid, col, piece, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = piece
        return next_grid

 
    def check_winning_move(obs, config, col, piece):
  
        grid = np.asarray(obs.board).reshape(config.rows, config.columns)
        next_grid = drop_piece(grid, col, piece, config)
  
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(next_grid[row,col:col+config.inarow])
                if window.count(piece) == config.inarow:
                    return True

        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(next_grid[row:row+config.inarow,col])
                if window.count(piece) == config.inarow:
                    return True
 
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(next_grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if window.count(piece) == config.inarow:
                    return True

        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(next_grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if window.count(piece) == config.inarow:
                    return True
        return False
    

    
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark%2+1):
            return col
    return random.choice(valid_moves)

In [12]:
def eval_players(p1 , p2 , num_battles : int, games_per_battle = 100, loc='best'):
    p1_wins = []
    p2_wins = []
    draws = []
    count = []    

    for i in range(num_battles):
        p1win, p2win, draw = play_games(games_per_battle,p1, p2 )
        p1_wins.append(p1win*100.0/games_per_battle)
        p2_wins.append(p2win*100.0/games_per_battle)
        draws.append(draw*100.0/games_per_battle)
        count.append(i*games_per_battle)
        p1_wins.append(p1win*100.0/games_per_battle)
        p2_wins.append(p2win*100.0/games_per_battle)
        draws.append(draw*100.0/games_per_battle)
        count.append((i+1)*games_per_battle)

    plt.ylabel('Game outcomes in %')
    plt.xlabel('Game number')

    plt.plot(count, draws, 'r-', label='Draw')
    plt.plot(count, p1_wins, 'g-', label='Player 1 wins')
    plt.plot(count, p2_wins, 'b-', label='Player 2 wins')
    plt.legend(loc=loc, shadow=True, fancybox=True, framealpha =0.7)
    plt.show()

In [13]:

def get_win_percentages(agent1, agent2, n_rounds):

    config = {'rows': 6, 'columns': 7, 'inarow': 4}         
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)   
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]
    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/len(outcomes), 2))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/len(outcomes), 2))
    

    

In [14]:
get_win_percentages(agent1=my_agent, agent2=agent_alphabeta, n_rounds=2)

Agent 1 Win Percentage: 0.0
Agent 2 Win Percentage: 1.0


In [15]:
get_win_percentages(agent1=agent_alphabeta, agent2=my_agent, n_rounds=2)

Agent 1 Win Percentage: 1.0
Agent 2 Win Percentage: 0.0


In [41]:
get_win_percentages(agent1="random", agent2=my_agent, n_rounds=2)

Agent 1 Win Percentage: 0.5
Agent 2 Win Percentage: 0.5


In [42]:
get_win_percentages(agent1=my_agent, agent2="random", n_rounds=10)

Agent 1 Win Percentage: 0.6
Agent 2 Win Percentage: 0.4


In [16]:
get_win_percentages(agent1="random", agent2=agent_alphabeta, n_rounds=2)

Agent 1 Win Percentage: 0.0
Agent 2 Win Percentage: 1.0


In [49]:
get_win_percentages(agent1=agent_alphabeta, agent2="random", n_rounds=10)

Agent 1 Win Percentage: 1.0
Agent 2 Win Percentage: 0.0


In [19]:
env.run([agent_smart, agent_alphabeta])

# Show the game
env.render(mode="ipython")

NameError: name 'agent_smart' is not defined

In [None]:
env.run([agent_smart, my_agent])

# Show the game
env.render(mode="ipython")

In [None]:
get_win_percentages(agent1=agent_smart, agent2=agent_alphabeta, n_rounds=100)