In [1]:
!pip install kaggle_environments==0.1.6

from kaggle_environments import make, evaluate
import tensorflow as tf
from tensorflow import keras
import random
import numpy as np
import tqdm

['random', 'negamax']


In [202]:
# create the DQN

action_space = 7
input_space = (6,7)

model = tf.keras.Sequential([
  tf.keras.layers.Dense(20, activation=tf.nn.relu, input_shape=input_space),  
  tf.keras.layers.Dense(20, activation=tf.nn.relu),
  tf.keras.layers.Dense(action_space)
])

model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_11 (Dense)            (None, 6, 20)             160       
                                                                 
 dense_12 (Dense)            (None, 6, 20)             420       
                                                                 
 dense_13 (Dense)            (None, 6, 7)              147       
                                                                 
Total params: 727
Trainable params: 727
Non-trainable params: 0
_________________________________________________________________


In [213]:
# helper functions

def epsilonDecision(epsilon):
    return random.choices(['model','random'], weights = [1 - epsilon, epsilon])[0]


def chooseAction(model, observation, epsilon):
    action_decision = epsilonDecision(epsilon)
    observation = np.array([observation])
    preds = model.predict(observation)
    weights = tf.nn.softmax(preds).numpy()[0]
    
    if action_decision == 'model':
        action = np.argmax(weights)
    if action_decision == 'random':
        action = random.randint(0,6)
        
    return int(action), weights


def checkValid(obs, action):
    valid_actions = set([0,1,2,3,4,5,6])
    try:
        if obs[0,action] != 0:
            valid_actions = valid_actions - set([action])
            action = random.choice(list(valid_actions))
    except:
        action = random.choice(list(valid_actions))
    return action


def getReward(winner, state):
    if not state:
        reward = 0
    if state: 
        if winner == 1:
            reward = 50
        if winner == -1:
            reward = -50
        if winner == 0:
            reward = -50
    return reward

In [204]:
class Experience:
    def __init__(self):
        self.clear() 
        
    def clear(self):
        self.observations = []
        self.actions = []
        self.rewards = []
        
    def store_experience(self, new_obs, new_act, new_reward):
        self.observations.append(new_obs)
        self.actions.append(new_act)
        self.rewards.append(new_reward)

In [205]:
def compute_loss(target_q_values, predicted_q_values):
    return tf.reduce_mean(tf.square(target_q_values - predicted_q_values))


def update_q_network(states, actions, rewards, gamma=0.99):
    with tf.GradientTape() as tape:
        q_values = model(states)
        selected_action_indices = tf.range(0, tf.shape(actions)[0]) * tf.shape(q_values)[1] + actions
        selected_q_values = tf.gather(tf.reshape(q_values, [-1]), selected_action_indices)
        target_q_values = rewards
        loss = compute_loss(target_q_values, selected_q_values)
   
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

In [206]:
def train_agent(n):
    env = make("connectx", debug=True)
    optimizer = tf.keras.optimizers.Adam()
    replay = Experience()
    epsilon = 1
    epsilon_rate = 0.995
    win_track = []

    for episode in tqdm.tqdm(range(n)):
        trainer = env.train([None, 'random'])
        state = np.array(trainer.reset()['board']).reshape(6,7)
        replay.clear()
        epsilon = epsilon*epsilon_rate

        done=False
        while not done:
            action, w = chooseAction(model, state, epsilon)

            while True:
                t_action = action
                action = checkValid(state, t_action)
                if t_action==action:
                    break

            new_state, winner, done, info = trainer.step(action)
            state = np.array(new_state['board']).reshape(6,7)
            reward = getReward(winner, done)
            replay.store_experience(state, action, reward)
            
            if done:
                win_track.append(winner)
                update_q_network(np.asarray(replay.observations), replay.actions, replay.rewards)
                break
        
    print("Training against random agent finished")
    return (np.count_nonzero(np.asarray(win_track)==1)/n)*100

In [214]:
def q_agent(observation, configuration):
    action = chooseAction(model, np.asarray(observation.board).reshape(6,7), 1)
    return action[0]

def test(observation, configuration):
    return 1

In [215]:
env.reset()
env.run([q_agent, "random"])
env.render()

+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 2 | 0 | 2 | 0 | 1 |
+---+---+---+---+---+---+---+
| 0 | 0 | 1 | 0 | 2 | 0 | 2 |
+---+---+---+---+---+---+---+
| 0 | 1 | 1 | 0 | 1 | 1 | 1 |
+---+---+---+---+---+---+---+
| 2 | 2 | 2 | 2 | 1 | 2 | 1 |
+---+---+---+---+---+---+---+

