In [1]:
from kaggle_environments import evaluate, make, utils
import random
import numpy as np
import torch
from tqdm import tqdm

# Create the ConnectX environment with debug mode switched on.
env = make("connectx", debug=True)

# Cache the board dimensions exposed by the environment configuration.
board_config = env.configuration
n_col = board_config.columns
n_row = board_config.rows

  from .autonotebook import tqdm as notebook_tqdm


termcolor not installed, skipping dependency
No pygame installed, ignoring import


In [6]:
# build your model
import torch.nn as nn
# We need two networks: an Actor and a Critic.
# The Critic estimates the value of a state.
# The Actor improves its policy using the value estimate provided by the Critic.

class Critic(nn.Module):
    """State-value network: one convolution followed by a small MLP head.

    The network maps a batch of board tensors to one scalar value estimate
    per sample.

    Args:
        input_shape: Number of channels of the input tensor. It is used as
            both the input and output channel count of the convolution.
        k_size: Side length of the square convolution kernel. The default of
            3 spans a 3x3 neighbourhood in both directions.
        board_shape: ``(rows, cols)`` of the board fed to ``forward``. It is
            needed to size the first fully connected layer. The default of
            ``(6, 7)`` is the standard ConnectX board; pass the real
            dimensions when the environment is configured differently.

    Raises:
        ValueError: If the kernel does not fit inside ``board_shape``.
    """

    def __init__(self, input_shape, k_size = 3, board_shape = (6, 7)):
        super(Critic, self).__init__()

        # The convolution has no padding and stride 1, so each spatial
        # dimension shrinks by (k_size - 1).
        rows, cols = board_shape
        conv_rows, conv_cols = rows - k_size + 1, cols - k_size + 1
        if conv_rows < 1 or conv_cols < 1:
            raise ValueError(
                f"kernel size {k_size} does not fit a board of shape {tuple(board_shape)}"
            )

        self.in_lay = nn.Conv2d(input_shape, input_shape, kernel_size = k_size)
        # fc1 consumes the flattened convolution output: channels * height * width.
        self.fc1 = nn.Linear(input_shape * conv_rows * conv_cols, 256)
        self.fc2 = nn.Linear(256, 128)
        self.out_lay = nn.Linear(128, 1)  # single scalar: the estimated state value

    def forward(self, x):
        """Return the value estimate for each board in the batch.

        Args:
            x: Tensor of shape ``(batch, input_shape, rows, cols)``.

        Returns:
            Tensor of shape ``(batch, 1)``.
        """
        x = torch.relu(self.in_lay(x))
        # Collapse (channels, height, width) into one feature vector per sample.
        x = torch.flatten(x, start_dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.out_lay(x)

class Actor(nn.Module):
    """Policy network: a three-layer MLP that outputs action probabilities.

    Args:
        input_shape: Number of input features per sample.
        output_shape: Number of actions; the size of the returned
            probability vector.
    """

    def __init__(self, input_shape, output_shape):
        # Inheriting from nn.Module registers the layers below as parameters
        # and makes the instance callable.
        super(Actor, self).__init__()
        self.in_lay = nn.Linear(input_shape, 256)
        self.hid_lay = nn.Linear(256, 128)
        self.out_lay = nn.Linear(128, output_shape)

    def forward(self, x):
        """Return a probability distribution over actions for each sample.

        Args:
            x: Tensor whose last dimension has ``input_shape`` features.

        Returns:
            Tensor with ``output_shape`` entries along the last dimension;
            the entries along that dimension sum to 1.
        """
        x = torch.relu(self.in_lay(x))
        x = torch.relu(self.hid_lay(x))
        logits = self.out_lay(x)
        logits = torch.clamp(logits, min=-10, max=10)  # Prevent overflow
        return torch.softmax(logits, dim=-1)

In [None]:
# Train against the built-in "negamax" agent
# (pass "random" to env.train instead for a weaker, faster opponent)

# NOTE: QTable and QLearner are not defined in the visible cells of this
# notebook; they must already exist in the kernel before this cell runs.
n_action_space = env.configuration['columns']
q_table = QTable(n_action_space)

# Exploration schedule: epsilon stays at eps_ub for the first eps_decay_rank
# games, then decays linearly towards eps_lb over the remaining games.
games = 10000
trainer = env.train([None, "negamax"])
mark = 1
progression = []  # summed (re-mapped) reward of every finished game
eps_ub = 1
eps_lb = 0.1
eps_decay_rank = 2000
shown = False  # makes sure the decay notice is printed only once


for g in tqdm(range(games)):

    if g > eps_decay_rank:
        if not shown:
            print('Now decaying epsilon to 0.1')
            shown = True
        eps = eps_ub - (g-eps_decay_rank)*(eps_ub-eps_lb)/(games-eps_decay_rank)
    else:
        eps = eps_ub
    observation = trainer.reset()
    epochs, total_reward = 0, 0
    done = False
    # A new learner is built for each game with that game's epsilon; every
    # learner is handed the same q_table object.
    q_learner = QLearner(q_table, n_action_space, epsilon = eps, mark = mark)

    while not done:
        # read state
        state = observation['board']

        # pick a move: a column is playable while its entry among the first
        # n_col board cells is still 0
        top_cells = state[0:n_col]
        valid_moves = [col for col, cell in enumerate(top_cells) if cell == 0]
        action = q_learner.choose_action(state, valid_moves, learning=True)

        # make a move, observe the next state
        next_state, reward, done, _ = trainer.step(action)

        # format reward: a finished game without a reward counts as a loss
        if done and reward is None:
            reward = -1
        # NOTE(review): this mapping looks written for a reward scale where
        # 0.5 is neutral and 0 is a loss. If the installed kaggle-environments
        # version reports 0 for ongoing or drawn positions, every such step is
        # penalised with -1 — confirm against the installed version.
        if reward == 0.5:
            reward = 0
        elif reward == 0:
            reward = -1

        # update the QTable
        q_learner.update_quality(action, state, next_state['board'], reward, done)

        # prepare next step
        observation = next_state
        total_reward += reward

        if done:
            progression.append(total_reward)

        epochs += 1

        



In [12]:
# Snapshot the Q-table as it is now, in a file tagged with the current time.
import pickle
from datetime import datetime

# Get the current date and time
current_datetime = datetime.now()

# Format the date and time as a string down to the second. Only digits, '-'
# and '_' are used so the resulting file name is valid on every platform
# (':' is not allowed in Windows file names, and spaces are awkward in shells).
formatted_datetime = current_datetime.strftime("%Y-%m-%d_%H-%M-%S")

# Save the table held by q_table (not the wrapper object itself)
with open(f'q_table_{formatted_datetime}.pkl', 'wb') as file:
    pickle.dump(q_table.table, file)

In [None]:
# Train against a heuristic agent

from out.heuristics.submission import my_agent

# NOTE: this cell keeps training the q_table that already exists in the
# kernel (the re-initialisation below is intentionally left disabled).
# QTable and QLearner are not defined in the visible cells of this notebook;
# they must already exist in the kernel before this cell runs.
n_action_space = env.configuration['columns']
# q_table = QTable(n_action_space)

# Exploration schedule: epsilon stays at eps_ub for the first eps_decay_rank
# games, then decays linearly towards eps_lb over the remaining games.
games = 10000
trainer = env.train([None, my_agent])
mark = 1
progression = []  # summed (re-mapped) reward of every finished game
eps_ub = 1
eps_lb = 0.1
eps_decay_rank = 2000
shown = False  # makes sure the decay notice is printed only once


for g in tqdm(range(games)):

    if g > eps_decay_rank:
        if not shown:
            print('Now decaying epsilon to 0.1')
            shown = True
        eps = eps_ub - (g-eps_decay_rank)*(eps_ub-eps_lb)/(games-eps_decay_rank)
    else:
        eps = eps_ub
    observation = trainer.reset()
    epochs, total_reward = 0, 0
    done = False
    # A new learner is built for each game with that game's epsilon; every
    # learner is handed the same q_table object.
    q_learner = QLearner(q_table, n_action_space, epsilon = eps, mark = mark)

    while not done:
        # read state
        state = observation['board']

        # pick a move: a column is playable while its entry among the first
        # n_col board cells is still 0
        top_cells = state[0:n_col]
        valid_moves = [col for col, cell in enumerate(top_cells) if cell == 0]
        action = q_learner.choose_action(state, valid_moves, learning=True)

        # make a move, observe the next state
        next_state, reward, done, _ = trainer.step(action)

        # format reward: a finished game without a reward counts as a loss
        if done and reward is None:
            reward = -1
        # NOTE(review): this mapping looks written for a reward scale where
        # 0.5 is neutral and 0 is a loss. If the installed kaggle-environments
        # version reports 0 for ongoing or drawn positions, every such step is
        # penalised with -1 — confirm against the installed version.
        if reward == 0.5:
            reward = 0
        elif reward == 0:
            reward = -1

        # update the QTable
        q_learner.update_quality(action, state, next_state['board'], reward, done)

        # prepare next step
        observation = next_state
        total_reward += reward

        if done:
            progression.append(total_reward)

        epochs += 1

        



In [17]:
# Reduce the learned table to a greedy lookup: state key -> index of the
# highest-valued action. States whose values are all zero are left out.
tmp_dict_q_table = q_table.table.copy()
dict_q_table = {
    state_key: int(np.argmax(action_values))
    for state_key, action_values in tmp_dict_q_table.items()
    if np.count_nonzero(action_values) > 0
}

In [18]:
# Source code of a self-contained agent, with the greedy policy embedded as a
# dict literal. The generated function:
#   * builds the state key from the board cells followed by the player's mark,
#     read as a base-3 number and written in hex without the '0x' prefix;
#   * plays the stored action when the key is known and that column is still
#     playable, otherwise plays a random playable column.
# The key is built from a copy of the board so the observation passed in by
# the caller is never modified.
q_learner_agent = '''def my_agent(observation, configuration):
    from random import choice

    q_table = ''' \
    + str(dict_q_table).replace(' ', '') \
    + '''

    # A column is playable while its top cell is empty.
    valid_moves = [c for c in range(configuration.columns) if observation.board[c] == 0]

    # Work on a copy: appending to observation.board would mutate the caller's data.
    cells = list(observation.board) + [observation.mark]
    state_key = hex(int(''.join(map(str, cells)), 3))[2:]

    action = q_table.get(state_key)
    if action is None or action not in valid_moves:
        return choice(valid_moves)

    return action
    '''

In [19]:
# Write the generated agent source to disk, replacing any previous file.
with open('new_submission.py', 'w') as submission_file:
    submission_file.write(q_learner_agent)

In [None]:
# Observe your model play against itself

from kaggle_environments import make
import importlib
import sys

# Load the agent from the file this notebook writes (new_submission.py).
# The file may have been created or rewritten after the kernel started, so:
#   * invalidate the import caches so a newly created file is found;
#   * reload the module if it was already imported in this session, otherwise
#     the stale cached version would be used.
AGENT_MODULE = "new_submission"
importlib.invalidate_caches()
if AGENT_MODULE in sys.modules:
    agent_module = importlib.reload(sys.modules[AGENT_MODULE])
else:
    agent_module = importlib.import_module(AGENT_MODULE)
my_agent = agent_module.my_agent

# Run the environment with the agent playing both sides
env = make("connectx", debug=True)
env.run([my_agent, my_agent])

# Both players must finish with status "DONE" for the run to count as valid
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

# Display the game
env.render(mode="ipython")