<a href="https://colab.research.google.com/github/kaneelgit/ConnectFourZero/blob/main/Basic_Deep_QL_Connect4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
#clone the repo
!git clone https://github.com/kaneelgit/ConnectFourZero.git

fatal: destination path 'ConnectFourZero' already exists and is not an empty directory.


In [5]:
#import libraries
import numpy as np
import pandas
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow_probability as tfp

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Accuracy

tfd = tfp.distributions
tfpl = tfp.layers

from sklearn.preprocessing import MinMaxScaler
from collections import deque
import random
import sys
# sys.path.append('/ConnectFourZero')
from ConnectFourZero.ConnectFour.game import ConnectFour

In [6]:
# Ask TensorFlow for the name of a GPU device; the value is reused later in `tf.device(device)`.
# The bare `device` on the last line just echoes the value as the cell output.
device = tf.test.gpu_device_name()
device

'/device:GPU:0'

In [7]:
#create a Neural Network Model

#create a convolution block
class ConvBlock(tf.keras.layers.Layer):
    """Conv2D followed by batch normalization and a ReLU.

    Args:
        in_channels: accepted for readability at the call site; not used by this layer.
        out_channels: number of convolution filters.
        kernel_size: convolution kernel size (default 3).
        stride: convolution stride (default 1).
        padding: padding mode forwarded to Conv2D (default "same").
    """

    def __init__(self, in_channels, out_channels, kernel_size = 3, stride = 1, padding = "same"):
        super().__init__()
        # Sub-layers are created in the same order and under the same attribute names as before.
        self.conv = tf.keras.layers.Conv2D(
            filters = out_channels,
            kernel_size = kernel_size,
            strides = stride,
            padding = padding,
        )
        self.batchnorm = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.layers.ReLU()

    def call(self, inputs):
        """Apply conv -> batch norm -> ReLU to `inputs` and return the result."""
        return self.relu(self.batchnorm(self.conv(inputs)))


class ResidualBlock(tf.keras.layers.Layer):
    """Two ConvBlocks with a skip connection, followed by a ReLU.

    Args:
        in_channels: channel count of the block input.
        out_channels: channel count produced by both ConvBlocks.

    When `in_channels` differs from `out_channels` the skip path is a 1x1 convolution
    that projects the input to `out_channels`; otherwise the input is added unchanged.
    """

    def __init__(self, in_channels, out_channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = ConvBlock(in_channels, out_channels)
        # conv2 consumes conv1's output, so its input width is out_channels, not in_channels.
        self.conv2 = ConvBlock(out_channels, out_channels)
        if in_channels != out_channels:
            self.shortcut = tf.keras.layers.Conv2D(filters=out_channels, kernel_size=1, strides=1, padding="same")
        else:
            # Identity skip: no parameters, the input is passed through as-is.
            self.shortcut = lambda x: x

    def call(self, inputs):
        """Return relu(conv2(conv1(inputs)) + shortcut(inputs))."""
        residual = self.shortcut(inputs)
        x = self.conv1(inputs)
        x = self.conv2(x)
        x = x + residual
        x = tf.nn.relu(x)
        return x


# Q-network: a stem ConvBlock, four 64-channel residual blocks, then a linear
# head that emits 6 values (one per selectable column).
# The stem is instantiated before the residual blocks, exactly as before, so the
# automatically generated layer names stay the same.
model = Sequential([
    ConvBlock(3, 64, kernel_size = 3),
    *[ResidualBlock(64, 64) for _ in range(4)],
    layers.Flatten(),
    layers.Dense(6, activation = 'linear'),
])

# Regress the Q-values with MSE; build for 5 x 6 boards encoded with 3 channels.
model.compile(optimizer = Adam(), loss = 'mean_squared_error')
model.build(input_shape = (None, 5, 6, 3))

In [9]:
# Restore the weights saved as the episode-59 checkpoint; training in this notebook resumes at episode 59.
# NOTE(review): absolute Colab path - the file must have been uploaded to /content beforehand.
model.load_weights('/content/model_weights_59.h5')

In [10]:
# Print the per-layer output shapes and parameter counts of the built network.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_block (ConvBlock)      (None, 5, 6, 64)          2048      
                                                                 
 residual_block (ResidualBl  (None, 5, 6, 64)          74368     
 ock)                                                            
                                                                 
 residual_block_1 (Residual  (None, 5, 6, 64)          74368     
 Block)                                                          
                                                                 
 residual_block_2 (Residual  (None, 5, 6, 64)          74368     
 Block)                                                          
                                                                 
 residual_block_3 (Residual  (None, 5, 6, 64)          74368     
 Block)                                                 

In [11]:
def board_state_int(board, current_player):
    """Encode a board of 'X' / 'O' / ' ' cells as a 5 x 6 x 3 float array.

    Channel 0 is set to 1 where a cell holds 'X', channel 1 is set to 2 where a
    cell holds 'O', and channel 2 is set to `current_player` where a cell is
    empty (' '). Every other entry stays 0.

    Args:
        board: iterable of rows, each an iterable of cell markers.
        current_player: value written into channel 2 for empty cells.

    Returns:
        numpy array of shape (5, 6, 3).
    """
    # cell marker -> (channel to write, value to write)
    plane_and_value = {
        'X': (0, 1),
        'O': (1, 2),
        ' ': (2, current_player),
    }

    encoded = np.zeros([5, 6, 3])

    for row_idx, cells in enumerate(board):
        for col_idx, cell in enumerate(cells):
            target = plane_and_value.get(cell)
            if target is None:
                # Unknown marker: leave all three channels at zero.
                continue
            plane, value = target
            encoded[row_idx, col_idx, plane] = value

    return encoded



In [14]:
import pickle
from collections import deque

# Load the saved replay memory from the pickle file.
# NOTE(review): pickle.load can execute arbitrary code - only open files written by this notebook.
file_name = 'my_deque.pkl'
try:
    with open(file_name, 'rb') as file:
        loaded_data = pickle.load(file)
except FileNotFoundError:
    print(f"File '{file_name}' not found.")
    # Start from an empty, bounded buffer instead of None so that the later
    # `memory = loaded_data` / `len(memory)` still work on a fresh runtime.
    loaded_data = deque(maxlen = 2000)

# Initialize a deque with the loaded data (always defined, empty when nothing was loaded)
loaded_deque = deque()
if loaded_data:
    try:
        loaded_deque = deque(loaded_data)
        print("Deque loaded from pickle file:", len(loaded_data))
    except TypeError as e:
        print(f"Error loading deque: {e}")

Deque loaded from pickle file: 389


In [15]:
#variables
gamma = 0.95
epsilon = 0.9
epsilon_min = 0.05
epsilon_decay = 0.003
# memory = deque(maxlen = 2000)
memory = loaded_data
batch_size = 32
episodes = 100

In [16]:
#training loop

def _played_column(cs, ns):
    """Return the column in which a piece was dropped between two consecutive states.

    Replay-memory entries are (state, next_state, reward) and carry no action, so the
    action is recovered from the cell whose 'X' / 'O' channels (0 and 1) differ.
    Channel 2 is ignored because it holds the player id on every empty cell.

    Raises:
        ValueError: if the two states have identical 'X' / 'O' channels.
    """
    changed = np.argwhere(cs[..., :2] != ns[..., :2])
    if changed.size == 0:
        raise ValueError("consecutive states are identical; cannot recover the move")
    return int(changed[0, 1])


#episode to resume from; the weights loaded earlier are the episode-59 checkpoint
start_episode = 59

for episode in range(start_episode, episodes):

    #start new game
    c4 = ConnectFour()

    #pick the side whose result defines the reward: 1 -> computer is 'X', 2 -> computer is 'O'
    computer = 'X' if np.random.randint(1, 3) == 1 else 'O'

    #current state
    cp = 1 if c4.current_player == 'X' else 2 #since this is the beginning current player is passed on to the first state representation
    state = board_state_int(c4.board, cp)

    #states visited in this game, starting with the empty board
    current_game_states = [state]

    #the loop ends only through the win / draw breaks below
    while True:

        if np.random.rand() <= epsilon:
            ## random action
            move = np.random.randint(0, 6)

        else:
            ## predict from the model (verbose = 0 keeps the progress bars out of the cell output)
            with tf.device(device):
                q_preds = model.predict(state[np.newaxis, :], verbose = 0)

            move = np.argmax(q_preds)

        #make_move returning a falsy value means the move was rejected; the board is unchanged
        if c4.make_move(move):
            next_player =  2 if c4.current_player == 'X' else 1 #find the next player using the current player
            state = board_state_int(c4.board, next_player)
            current_game_states.append(state)

            if c4.check_winner():
                winner = c4.winner
                break
            if all(cell != ' ' for row in c4.board for cell in row):
                winner = 'draw'
                break

            c4.current_player = 'O' if c4.current_player == 'X' else 'X'

        if len(memory) > batch_size:

            #get a mini batch
            mini_batch = random.sample(memory, batch_size)

            #real numpy batches: writing into `tensor.numpy()` only edits a temporary copy,
            #which previously left the network fitting all-zero inputs and targets
            cur_states = np.stack([cs for cs, _, _ in mini_batch])
            nxt_states = np.stack([ns for _, ns, _ in mini_batch])
            rewards = np.array([r for _, _, r in mini_batch], dtype = np.float32)

            #action of each stored transition (not the move of the game being played right now)
            actions = np.array([_played_column(cs, ns) for cs, ns, _ in mini_batch])

            #two batched forward passes instead of two predict calls per sample
            with tf.device(device):
                next_q = model.predict(nxt_states, verbose = 0)
                targets = np.array(model.predict(cur_states, verbose = 0))

            #only the Q-value of the action that was taken is moved towards r + gamma * max Q(s', .)
            targets[np.arange(batch_size), actions] = rewards + gamma * next_q.max(axis = 1)

            model.fit(cur_states, targets, verbose = 0, epochs = 3)

            #decay exploration but never below epsilon_min, so random moves can still
            #get the game past a column the greedy policy keeps choosing in vain
            epsilon = max(epsilon_min, epsilon - epsilon_decay)

    #reward from the computer's point of view: +1 win, -1 loss, 0 draw
    #(the draw case is checked first; it used to fall into the loss branch)
    if winner == 'draw':
        reward = 0
    elif winner == computer:
        reward = 1
    else:
        reward = -1

    #load stuff to memory: one (state, next_state, reward) tuple per move of this game
    for cs, ns in zip(current_game_states, current_game_states[1:]):
        memory.append((cs, ns, reward))

    #save model every 5 episodes
    if episode % 5 == 0:
        # save_dir = f'/content/drive/MyDrive/Colab Notebooks/C4/model_weights_{episode}.h5' #google drive
        save_dir = f"/content/model_weights_{episode}.h5"
        model.save_weights(save_dir)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m


KeyboardInterrupt: ignored

In [17]:
# Echo the loop counter to see which episode was running when training was interrupted.
episode

71

In [None]:
# Echo the path of the last periodic checkpoint written by the training loop.
save_dir

'/content/model_weights_20.h5'

In [18]:
# Manual checkpoint after interrupting training; 71 is the value `episode` had at that point.
model.save_weights('/content/model_weights_71.h5')

In [19]:
import pickle
from collections import deque

# Persist the replay memory so a later session can reload it and keep training.
my_deque = memory
file_name = 'my_deque2.pkl'  # written next to the notebook

try:
    with open(file_name, 'wb') as file:
        pickle.dump(my_deque, file)
except Exception as e:
    # Report the problem instead of stopping the notebook.
    print(f"Error saving deque: {e}")
else:
    print(f"Deque saved to '{file_name}'")

Deque saved to 'my_deque2.pkl'
