In [None]:
###### PART A: Transformer architecture and training ############

In [5]:
## Working Tx code

import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import load_model

class TransformerBlock(layers.Layer):
    """Self-attention + feed-forward encoder block with residual connections.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input, and the sum is layer-normalized.

    Args:
        d_model: Width of the token representation; also the output width of
            the feed-forward network.
        num_heads: Number of attention heads. Each head uses
            ``key_dim = d_model // num_heads``.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate applied to both sub-layer outputs.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, d_model=64, num_heads=4, ff_dim=128, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        # Remember the constructor arguments so get_config() reports the values
        # this instance was actually built with.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout

        self.attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model//num_heads)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation='gelu'),
            layers.Dense(d_model)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)

    def call(self, inputs, training=False):
        """Apply self-attention and the feed-forward net to ``inputs``.

        Args:
            inputs: Tensor used as both query and value of the attention layer.
            training: Whether the dropout layers are active.

        Returns:
            The layer-normalized output of the second residual connection.
        """
        # Self-attention: the same tensor is used as query and value.
        attn_output = self.attn(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments of this instance for serialization."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "dropout": self.dropout_rate
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild a block from the dictionary produced by ``get_config``."""
        return cls(**config)


class Connect4Transformer(Model):
    """Transformer that maps a Connect 4 board to a distribution over 7 columns.

    The input is reshaped to ``(batch, 6, 7, 2)``, projected to ``d_model``
    features per cell, combined with row/column embeddings, flattened to 42
    tokens and passed through ``num_layers`` ``TransformerBlock`` layers. A
    final attention layer with 7 queries reads the tokens, and a softmax
    ``Dense(7)`` layer produces the output.

    Args:
        d_model: Feature width per board cell (split evenly between the row
            and column embeddings, so it should be even).
        num_heads: Attention heads in each ``TransformerBlock``.
        ff_dim: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked ``TransformerBlock`` layers.
        dropout: Dropout rate used inside each block.
        **kwargs: Forwarded to ``Model`` (e.g. ``name``).
    """

    def __init__(self, d_model=64, num_heads=4, ff_dim=128, num_layers=3, dropout=0.1, **kwargs):
        super().__init__(**kwargs)  # Pass **kwargs to the parent class
        # Keep every hyperparameter so get_config() can serialize the real
        # architecture instead of relying on constructor defaults at load time.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.num_layers = num_layers
        self.dropout_rate = dropout

        self.input_projection = layers.Dense(d_model)
        self.row_embed = layers.Embedding(input_dim=6, output_dim=d_model // 2)
        self.col_embed = layers.Embedding(input_dim=7, output_dim=d_model // 2)
        self.transformer_blocks = [TransformerBlock(d_model, num_heads, ff_dim, dropout) for _ in range(num_layers)]
        self.column_attention = layers.MultiHeadAttention(num_heads=2, key_dim=d_model)
        self.output_layer = layers.Dense(7, activation='softmax')

    def call(self, inputs, training=False):
        """Compute column probabilities for a batch of boards.

        Args:
            inputs: Tensor reshapeable to ``(batch, 6, 7, 2)``.
            training: Whether dropout inside the encoder blocks is active.

        Returns:
            Tensor of shape ``(batch, 7)`` produced by the softmax output layer.
        """
        x = tf.reshape(inputs, (-1, 6, 7, 2))
        batch_size = tf.shape(x)[0]

        # Row / column index grids, one (6, 7) grid per batch element.
        rows = tf.tile(tf.range(6)[None, :, None], [batch_size, 1, 7])
        cols = tf.tile(tf.range(7)[None, None, :], [batch_size, 6, 1])

        row_emb = self.row_embed(rows)
        col_emb = self.col_embed(cols)
        # Two d_model//2 halves concatenated into one positional vector per cell.
        pos_encoding = tf.concat([row_emb, col_emb], axis=-1)

        x = self.input_projection(x)
        x += pos_encoding

        # Flatten the 6x7 grid into a sequence of 42 tokens.
        x = tf.reshape(x, (-1, 6 * 7, self.d_model))
        for transformer in self.transformer_blocks:
            # Forward the training flag explicitly rather than relying on
            # Keras' implicit call-context propagation.
            x = transformer(x, training=training)

        # One query per column; each query is the column index repeated d_model
        # times (a constant integer tensor, not a learned embedding).
        column_queries = tf.tile(tf.range(7)[None, :, None], [batch_size, 1, self.d_model])
        context = self.column_attention(column_queries, x)

        # Only feature 0 of each column's context vector is fed to the output layer.
        return self.output_layer(context[:, :, 0])

    def get_config(self):
        """Return the constructor arguments so a saved model can be rebuilt."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "num_layers": self.num_layers,
            "dropout": self.dropout_rate
        })
        return config


def load_and_preprocess(data, test_size=0.2, random_state=42):
    """Split a board/move table into training and validation arrays.

    Args:
        data: DataFrame whose last column is the target move and whose other
            columns hold the flattened board (reshapeable to 6 x 7 x 2 per row).
        test_size: Fraction of rows held out for validation (default 0.2).
        random_state: Seed for the shuffled split (default 42).

    Returns:
        ``(X_train, X_val, y_train, y_val)`` where the ``X`` arrays are float32
        with shape ``(n, 6, 7, 2)`` and the ``y`` arrays are int32.

    Raises:
        ValueError: If the feature columns cannot be reshaped to ``(-1, 6, 7, 2)``.
    """
    X = data.iloc[:, :-1].values.astype(np.float32)
    y = data.iloc[:, -1].values.astype(np.int32)
    X = X.reshape(-1, 6, 7, 2)
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    return X_train, X_val, y_train, y_val

def create_model():
    """Build and compile the Connect4Transformer used for training.

    Returns:
        A compiled ``Connect4Transformer`` (d_model=64, 8 heads, ff_dim=128,
        3 layers, dropout 0.1) using Adam at 3e-4, sparse categorical
        cross-entropy loss and an accuracy metric.
    """
    hyperparams = dict(d_model=64, num_heads=8, ff_dim=128, num_layers=3, dropout=0.1)
    net = Connect4Transformer(**hyperparams)

    adam = tf.keras.optimizers.Adam(3e-4)
    net.compile(optimizer=adam, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return net

# Training entry point: read the dataset, train the transformer, save it to disk.
if __name__ == "__main__":
    # Last CSV column is the target move; the rest is the flattened board
    # (see load_and_preprocess).
    df = pd.read_csv("converted_board_with_play_y.csv")
    X_train, X_val, y_train, y_val = load_and_preprocess(df)
    model = create_model()

    # Build the model's weights by running one all-zero board through it
    # before fit() is called.
    dummy_input = tf.zeros((1, 6, 7, 2))
    _ = model(dummy_input)

    # Train for 10 epochs with mini-batches of 64, validating on the held-out split.
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        batch_size=64,
        epochs=10
    )

    # Earlier HDF5 export, kept for reference only: model.save('tx.h5')
    # The later cells reload the model from this exact file name.
    model.save('Converted_8H_10E_64B.keras')

[1m3321/3321[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m761s[0m 225ms/step - accuracy: 0.2335 - loss: 1.8251 - val_accuracy: 0.3212 - val_loss: 1.6799


In [9]:
###### PART B: Retrieve the model ############

In [8]:
import tensorflow as tf
from tensorflow.keras.models import load_model

# Define the custom TransformerBlock and Connect4Transformer classes
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention + feed-forward encoder block with residual connections.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input, and the sum is layer-normalized.

    Args:
        d_model: Width of the token representation; also the output width of
            the feed-forward network.
        num_heads: Number of attention heads. Each head uses
            ``key_dim = d_model // num_heads``.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate applied to both sub-layer outputs.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, d_model=64, num_heads=4, ff_dim=128, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        # Remember the constructor arguments so get_config() reports the values
        # this instance was actually built with.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout

        self.attn = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model//num_heads)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='gelu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout)
        self.dropout2 = tf.keras.layers.Dropout(dropout)

    def call(self, inputs, training=False):
        """Apply self-attention and the feed-forward net to ``inputs``.

        Args:
            inputs: Tensor used as both query and value of the attention layer.
            training: Whether the dropout layers are active.

        Returns:
            The layer-normalized output of the second residual connection.
        """
        # Self-attention: the same tensor is used as query and value.
        attn_output = self.attn(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments of this instance for serialization."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "dropout": self.dropout_rate
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild a block from the dictionary produced by ``get_config``."""
        return cls(**config)

class Connect4Transformer(tf.keras.Model):
    """Transformer that maps a Connect 4 board to a distribution over 7 columns.

    The input is reshaped to ``(batch, 6, 7, 2)``, projected to ``d_model``
    features per cell, combined with row/column embeddings, flattened to 42
    tokens and passed through ``num_layers`` ``TransformerBlock`` layers. A
    final attention layer with 7 queries reads the tokens, and a softmax
    ``Dense(7)`` layer produces the output.

    Args:
        d_model: Feature width per board cell (split evenly between the row
            and column embeddings, so it should be even).
        num_heads: Attention heads in each ``TransformerBlock``.
        ff_dim: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked ``TransformerBlock`` layers.
        dropout: Dropout rate used inside each block.
        **kwargs: Forwarded to ``tf.keras.Model`` (e.g. ``name``).
    """

    def __init__(self, d_model=64, num_heads=8, ff_dim=128, num_layers=3, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep every hyperparameter so get_config() can serialize the real
        # architecture instead of relying on constructor defaults at load time.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.num_layers = num_layers
        self.dropout_rate = dropout

        self.input_projection = tf.keras.layers.Dense(d_model)
        self.row_embed = tf.keras.layers.Embedding(input_dim=6, output_dim=d_model // 2)
        self.col_embed = tf.keras.layers.Embedding(input_dim=7, output_dim=d_model // 2)
        self.transformer_blocks = [TransformerBlock(d_model, num_heads, ff_dim, dropout) for _ in range(num_layers)]
        self.column_attention = tf.keras.layers.MultiHeadAttention(num_heads=2, key_dim=d_model)
        self.output_layer = tf.keras.layers.Dense(7, activation='softmax')

    def call(self, inputs, training=False):
        """Compute column probabilities for a batch of boards.

        Args:
            inputs: Tensor reshapeable to ``(batch, 6, 7, 2)``.
            training: Whether dropout inside the encoder blocks is active.

        Returns:
            Tensor of shape ``(batch, 7)`` produced by the softmax output layer.
        """
        x = tf.reshape(inputs, (-1, 6, 7, 2))
        batch_size = tf.shape(x)[0]

        # Row / column index grids, one (6, 7) grid per batch element.
        rows = tf.tile(tf.range(6)[None, :, None], [batch_size, 1, 7])
        cols = tf.tile(tf.range(7)[None, None, :], [batch_size, 6, 1])

        row_emb = self.row_embed(rows)
        col_emb = self.col_embed(cols)
        # Two d_model//2 halves concatenated into one positional vector per cell.
        pos_encoding = tf.concat([row_emb, col_emb], axis=-1)

        x = self.input_projection(x)
        x += pos_encoding

        # Flatten the 6x7 grid into a sequence of 42 tokens.
        x = tf.reshape(x, (-1, 6 * 7, self.d_model))
        for transformer in self.transformer_blocks:
            # Forward the training flag explicitly rather than relying on
            # Keras' implicit call-context propagation.
            x = transformer(x, training=training)

        # One query per column; each query is the column index repeated d_model
        # times (a constant integer tensor, not a learned embedding).
        column_queries = tf.tile(tf.range(7)[None, :, None], [batch_size, 1, self.d_model])
        context = self.column_attention(column_queries, x)

        # Only feature 0 of each column's context vector is fed to the output layer.
        return self.output_layer(context[:, :, 0])

    def get_config(self):
        """Return the constructor arguments so a saved model can be rebuilt."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "num_layers": self.num_layers,
            "dropout": self.dropout_rate
        })
        return config

# Restore the trained network from disk. The custom classes defined in this
# cell are passed in so Keras can resolve them by name while deserializing.
model = load_model(
    'Converted_8H_10E_64B.keras',
    custom_objects={
        'TransformerBlock': TransformerBlock,
        'Connect4Transformer': Connect4Transformer,
    },
)

# Show the layer / parameter summary as a sanity check on the restored model.
model.summary()


In [None]:
###### PART C: Play the game ############

In [10]:
# prompt: download model to local


# -*- coding: utf-8 -*-
"""connect4_final_1

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1CCgmf1e6eGHhf6_3p7b5wg8If0scxkWo
"""

#import anvil.server
import numpy as np
import os
os.environ["TF_TRT_ALLOW_BUILD_FAILURE"] = "1"
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras import layers


# Supply the Anvil Uplink key through the environment; never commit a real key to the notebook.
#ANVIL_UPLINK_KEY = os.environ["ANVIL_UPLINK_KEY"]

# Connect to Anvil
#anvil.server.connect(ANVIL_UPLINK_KEY)

# NOTE(review): the anvil.server.connect(...) call above is commented out, so
# this message is printed unconditionally and does not reflect a live connection.
print("Connected to Anvil successfully!")

#@anvil.server.callable
def initialize_board():
    """Create a fresh Connect 4 board filled with zeros.

    Returns:
        A nested Python list of shape 6 x 7 x 2 containing integer zeros. A
        plain list (not an ndarray) is returned for client-side compatibility.
    """
    empty_grid = np.zeros(shape=(6, 7, 2), dtype=int)
    return empty_grid.tolist()

#@anvil.server.callable
def drop_token(board, column, player):
    """Place ``player``'s token in the lowest free cell of ``column``.

    A cell counts as free when channel 0 of that cell equals 0; the token is
    written to channel 0. Rows are probed from index 5 (bottom) up to index 0.

    Args:
        board: Nested list (or array-like) indexable as ``[row, column, channel]``.
        column: Column index to drop the token into.
        player: Value written into the chosen cell.

    Returns:
        A new nested list with the token placed, or an unchanged copy of the
        board when the column has no free cell.
    """
    grid = np.array(board)  # work on an ndarray copy; the caller's list is untouched

    # Lazily scan bottom-to-top and stop at the first free row (None if full).
    landing_row = next(
        (r for r in reversed(range(6)) if grid[r, column, 0] == 0),
        None,
    )
    if landing_row is not None:
        grid[landing_row, column, 0] = player

    return grid.tolist()  # lists again, so the result stays client-friendly

###################

import tensorflow as tf
from tensorflow.keras.models import load_model

# Define the custom TransformerBlock and Connect4Transformer classes
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention + feed-forward encoder block with residual connections.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input, and the sum is layer-normalized.

    Args:
        d_model: Width of the token representation; also the output width of
            the feed-forward network.
        num_heads: Number of attention heads. Each head uses
            ``key_dim = d_model // num_heads``.
        ff_dim: Hidden width of the feed-forward network.
        dropout: Dropout rate applied to both sub-layer outputs.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, d_model=64, num_heads=4, ff_dim=128, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        # Remember the constructor arguments so get_config() reports the values
        # this instance was actually built with.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout

        self.attn = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model//num_heads)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='gelu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout)
        self.dropout2 = tf.keras.layers.Dropout(dropout)

    def call(self, inputs, training=False):
        """Apply self-attention and the feed-forward net to ``inputs``.

        Args:
            inputs: Tensor used as both query and value of the attention layer.
            training: Whether the dropout layers are active.

        Returns:
            The layer-normalized output of the second residual connection.
        """
        # Self-attention: the same tensor is used as query and value.
        attn_output = self.attn(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments of this instance for serialization."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "dropout": self.dropout_rate
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild a block from the dictionary produced by ``get_config``."""
        return cls(**config)

class Connect4Transformer(tf.keras.Model):
    """Transformer that maps a Connect 4 board to a distribution over 7 columns.

    The input is reshaped to ``(batch, 6, 7, 2)``, projected to ``d_model``
    features per cell, combined with row/column embeddings, flattened to 42
    tokens and passed through ``num_layers`` ``TransformerBlock`` layers. A
    final attention layer with 7 queries reads the tokens, and a softmax
    ``Dense(7)`` layer produces the output.

    Args:
        d_model: Feature width per board cell (split evenly between the row
            and column embeddings, so it should be even).
        num_heads: Attention heads in each ``TransformerBlock``.
        ff_dim: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked ``TransformerBlock`` layers.
        dropout: Dropout rate used inside each block.
        **kwargs: Forwarded to ``tf.keras.Model`` (e.g. ``name``).
    """

    def __init__(self, d_model=64, num_heads=8, ff_dim=128, num_layers=3, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep every hyperparameter so get_config() can serialize the real
        # architecture instead of relying on constructor defaults at load time.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.num_layers = num_layers
        self.dropout_rate = dropout

        self.input_projection = tf.keras.layers.Dense(d_model)
        self.row_embed = tf.keras.layers.Embedding(input_dim=6, output_dim=d_model // 2)
        self.col_embed = tf.keras.layers.Embedding(input_dim=7, output_dim=d_model // 2)
        self.transformer_blocks = [TransformerBlock(d_model, num_heads, ff_dim, dropout) for _ in range(num_layers)]
        self.column_attention = tf.keras.layers.MultiHeadAttention(num_heads=2, key_dim=d_model)
        self.output_layer = tf.keras.layers.Dense(7, activation='softmax')

    def call(self, inputs, training=False):
        """Compute column probabilities for a batch of boards.

        Args:
            inputs: Tensor reshapeable to ``(batch, 6, 7, 2)``.
            training: Whether dropout inside the encoder blocks is active.

        Returns:
            Tensor of shape ``(batch, 7)`` produced by the softmax output layer.
        """
        x = tf.reshape(inputs, (-1, 6, 7, 2))
        batch_size = tf.shape(x)[0]

        # Row / column index grids, one (6, 7) grid per batch element.
        rows = tf.tile(tf.range(6)[None, :, None], [batch_size, 1, 7])
        cols = tf.tile(tf.range(7)[None, None, :], [batch_size, 6, 1])

        row_emb = self.row_embed(rows)
        col_emb = self.col_embed(cols)
        # Two d_model//2 halves concatenated into one positional vector per cell.
        pos_encoding = tf.concat([row_emb, col_emb], axis=-1)

        x = self.input_projection(x)
        x += pos_encoding

        # Flatten the 6x7 grid into a sequence of 42 tokens.
        x = tf.reshape(x, (-1, 6 * 7, self.d_model))
        for transformer in self.transformer_blocks:
            # Forward the training flag explicitly rather than relying on
            # Keras' implicit call-context propagation.
            x = transformer(x, training=training)

        # One query per column; each query is the column index repeated d_model
        # times (a constant integer tensor, not a learned embedding).
        column_queries = tf.tile(tf.range(7)[None, :, None], [batch_size, 1, self.d_model])
        context = self.column_attention(column_queries, x)

        # Only feature 0 of each column's context vector is fed to the output layer.
        return self.output_layer(context[:, :, 0])

    def get_config(self):
        """Return the constructor arguments so a saved model can be rebuilt."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "num_layers": self.num_layers,
            "dropout": self.dropout_rate
        })
        return config


###################

#@anvil.server.callable
def predict_move(board_state):
    """Return the column index the model scores highest for ``board_state``.

    Args:
        board_state: Nested list (or array-like) reshapeable to ``(1, 6, 7, 2)``.

    Returns:
        The argmax of the model's prediction for the single board, as a
        Python ``int``.

    Raises:
        ValueError: If ``board_state`` cannot be reshaped to ``(1, 6, 7, 2)``.
    """
    # Reuse the already-loaded model; reading the .keras file on every move
    # dominated the cost of a single prediction.
    model = _get_connect4_model()

    # Convert board to model input shape (a batch containing one board).
    input_board = np.array(board_state).reshape(1, 6, 7, 2)

    # Predict best move.
    # NOTE(review): the chosen column is not checked for being full — confirm
    # whether the caller validates the move before applying it.
    predictions = model.predict(input_board)
    best_move = int(np.argmax(predictions[0].flatten()))

    return best_move


def _get_connect4_model(path='Converted_8H_10E_64B.keras'):
    """Load the saved model once per ``path`` and return the cached instance.

    The cache lives on the function object, so re-running this cell (which
    redefines the function) or restarting the kernel discards it. A file that
    is overwritten on disk afterwards is NOT picked up until then.
    """
    cache = _get_connect4_model.__dict__.setdefault('_cache', {})
    if path not in cache:
        cache[path] = load_model(
            path,
            custom_objects={'TransformerBlock': TransformerBlock, 'Connect4Transformer': Connect4Transformer},
        )
    return cache[path]


# Smoke test: start from an empty board, play one human move, then ask the model to reply.
board =initialize_board()
# NOTE(review): ai_type and current_player are assigned but never read in this
# cell; the model loaded by predict_move is the Connect4Transformer, not a CNN.
ai_type = "CNN"
current_player = 1


# Player 1 drops a token into column index 4.
column = 4
board = drop_token(board, column, 1)


# Ask the model for its reply; the bare last expression displays the chosen column.
current_player = 2
ai_move = predict_move(board)
ai_move

Connected to Anvil successfully!
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 612ms/step


3