In [2]:
import pandas as pd
import numpy as np
import numpy as np
from sklearn.model_selection import train_test_split
import ast
import tensorflow as tf
from tensorflow.keras import layers

2025-01-28 11:18:50.658246: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
def _parse_board(serialized):
    """Parse one serialized "x" cell with ast.literal_eval and shape it as a (2, 6, 7) array."""
    return np.array(ast.literal_eval(serialized)).reshape(2, 6, 7)


# Read both CSV exports, then stack them row-wise into a single frame.
df2, df3 = (
    pd.read_csv(csv_path)
    for csv_path in ("data/large_board_dataset2.csv", "data/large_board_dataset3.csv")
)
df = pd.concat([df2, df3], axis=0)

df.head()
# Replace every serialized board in "x" with its numpy representation.
df["x"] = df["x"].apply(_parse_board)

def preprocess_board(board):
    """
    Normalize one dataset row so the board is seen from the perspective of "red" (aka plus).

    Args:
        board: row-like mapping (for example the Series that ``df.apply(..., axis=1)``
            passes in) with an "x" entry holding a 3-D array and a "whose_turn" entry.
    Returns:
        The row itself when "whose_turn" equals "red"; otherwise a shallow copy of the
        row whose "x" has its first axis (the layers) reversed.
    """
    if board["whose_turn"] == "red":
        return board

    # Work on a copy: pandas documents that mutating the object handed to a
    # user-defined function in apply() can produce unexpected behavior.
    normalized = board.copy()
    # Flip the layers to make the board look like it's from the perspective of "red".
    normalized["x"] = board["x"][::-1, :, :]
    return normalized

# Run preprocess_board once per row (axis=1 hands each row to the function) and keep the result.
df = df.apply(preprocess_board, axis=1)

In [4]:
def flip_board(board, col):
    """
    Mirror a board left-to-right and remap a column index to the mirrored board.

    Args:
        board: array-like with at least 3 dimensions; axis 2 is treated as the
            column axis (the boards in this notebook are 2 x 6 x 7 at this point).
        col: zero-based column index on the original board.
    Returns:
        (new_board, new_column): the board reversed along axis 2, and the index the
        same column has after mirroring.
    """
    # For now this is done on axis 2 because the input is still 2x6x7.
    new_board = np.flip(board, axis=2)
    # Derive the mirror index from the actual width instead of hard-coding 6,
    # so the mapping stays correct for boards that are not 7 columns wide.
    new_column = new_board.shape[2] - 1 - col
    return new_board, new_column

# Data augmentation: add a left-right mirrored copy of every (board, move) pair.
# NOTE(review): this cell overwrites `df`, so running it a second time without
# reloading the data mirrors the already-augmented frame and doubles it again.
# NOTE(review): the mirrored rows are appended before the train/validation/test
# split, so a board and its mirror image can land in different splits.
flipped_x = []
flipped_y = []

# Iterate over just the two columns that are needed; iterrows() would build a
# full Series for every row, which is slow on a dataset of this size.
for board, col in zip(df["x"], df["y"]):
    new_board, new_column = flip_board(board, col)

    flipped_x.append(new_board)
    flipped_y.append(new_column)

# Only "x" and "y" exist for the mirrored rows; any other column of `df`
# is filled with NaN for them by the concat below.
new_df = pd.DataFrame({"x": flipped_x, "y": flipped_y})

df = pd.concat([df, new_df], ignore_index = True)

In [5]:
# Collect every board into one array, then move the layer axis to the end:
# (N, 2, 6, 7) -> (N, 6, 7, 2).
X = np.transpose(np.stack(df["x"].values), (0, 2, 3, 1))

# Labels: the column stored in "y" for each board.
y = df["y"].values

In [6]:
# Flatten each board's grid into a single sequence axis. The last axis keeps size 2 and
# -1 lets numpy infer the sequence length (42 for boards that are 6 x 7 x 2 at this point).

X = X.reshape(X.shape[0], -1, 2)

In [7]:
# used chat to make the positional encoding function

def add_positional_encoding(X, d_model):
    """
    Zero-pad the channel axis to d_model and add fixed sinusoidal positional encodings.

    Even embedding indices use sin(pos / 10000 ** (i / d_model)); odd indices use
    cos(pos / 10000 ** ((i - 1) / d_model)).

    Args:
        X: numpy array of shape (N, seq_len, C) - the input boards
            (seq_len = 42 and C = 2 in this notebook). It is not modified.
        d_model: int - embedding size; must be at least C.
    Returns:
        New float64 numpy array of shape (N, seq_len, d_model): the input channels
        followed by zero padding, with the positional encoding added to every sample.
    Raises:
        ValueError: if d_model is smaller than the number of input channels.
    """
    N, seq_len, C = X.shape  # N = number of samples, C = input channels
    if d_model < C:
        raise ValueError(
            f"d_model ({d_model}) must be at least the number of input channels ({C})"
        )

    # Build the (seq_len, d_model) encoding table in one vectorized step.
    positions = np.arange(seq_len, dtype=np.float64)[:, np.newaxis]  # Shape: (seq_len, 1)
    dims = np.arange(d_model)
    is_even = dims % 2 == 0
    # Each odd index shares the frequency of the even index just before it.
    frequency_index = dims - (dims % 2)
    angles = positions / (10000.0 ** (frequency_index / d_model))  # Shape: (seq_len, d_model)
    positional_encoding = np.where(is_even, np.sin(angles), np.cos(angles))

    # Expand input channels to match positional encoding size
    padded = np.concatenate([X, np.zeros((N, seq_len, d_model - C))], axis=-1)

    # Broadcasting adds the same table to every sample, so no (N, seq_len, d_model)
    # tiled copy of the encoding has to be allocated.
    padded += positional_encoding
    return padded

In [8]:
# Pad the boards to d_model channels, add the positional encoding, and print the resulting shape as a sanity check.
d_model = 64  # Desired embedding size
X_with_pos_encoding = add_positional_encoding(X, d_model)
print("Shape after adding positional encoding:", X_with_pos_encoding.shape)  # Expected: (N, 42, 64)

Shape after adding positional encoding: (260848, 42, 64)


In [9]:
# Train / validation / test splits: 80% of the samples go to training, and the held-out 20%
# is split in half, giving 10% validation and 10% test. random_state is fixed at 22 for both splits.
# NOTE(review): mirrored copies of every board were appended to df before this split, so a board
# and its mirror image can end up on different sides of the split.

X_train, X_temp, y_train, y_temp = train_test_split(X_with_pos_encoding, y, test_size=0.2, random_state=22)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=22)

In [10]:
# Convert every split to a TensorFlow tensor: features as float32, labels as int32.
# NOTE(review): Keras `fit`/`evaluate` presumably accept the NumPy arrays directly, so this
# conversion may be optional - confirm before removing it.
# The variable names are deliberately reused (X_train rather than X_train_tensor) for brevity;
# after this cell the NumPy versions of the splits are no longer reachable under these names.

X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
y_train = tf.convert_to_tensor(y_train, dtype=tf.int32)
X_val = tf.convert_to_tensor(X_val, dtype=tf.float32)
y_val = tf.convert_to_tensor(y_val, dtype=tf.int32)
X_test = tf.convert_to_tensor(X_test, dtype=tf.float32)
y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)

In [11]:
def build_transformer_model(seq_len, d_model, num_heads, ff_dim, num_layers, num_classes):
    """
    Assemble and compile a transformer-encoder classifier for Connect4 move prediction.

    The input is expected to already carry its positional encoding. Each encoder
    layer is self-attention followed by a two-layer feedforward network; both
    sub-layers use dropout of 0.1, a residual connection, and layer normalization
    applied after the residual sum. The sequence is then average-pooled and fed
    to a softmax output layer.

    Args:
        seq_len: int - Sequence length (42 for Connect4).
        d_model: int - Embedding size (e.g., 64); also passed as key_dim to the attention layer.
        num_heads: int - Number of attention heads.
        ff_dim: int - Hidden width of the feedforward network.
        num_layers: int - Number of transformer encoder layers.
        num_classes: int - Number of output classes (e.g., 7 for Connect4 columns).

    Returns:
        A Keras model compiled with Adam (learning rate 0.001), sparse categorical
        cross-entropy loss, and an accuracy metric.
    """
    board_tokens = layers.Input(shape=(seq_len, d_model))

    hidden = board_tokens
    for _ in range(num_layers):
        # Self-attention sub-layer: queries and values both come from `hidden`.
        attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(hidden, hidden)
        attn = layers.Dropout(0.1)(attn)
        attn_normed = layers.LayerNormalization(epsilon=1e-6)(hidden + attn)

        # Position-wise feedforward sub-layer: expand to ff_dim, project back to d_model.
        ffn = layers.Dense(ff_dim, activation="relu")(attn_normed)
        ffn = layers.Dense(d_model)(ffn)
        ffn = layers.Dropout(0.1)(ffn)
        hidden = layers.LayerNormalization(epsilon=1e-6)(attn_normed + ffn)

    # Collapse the sequence axis by averaging, then classify.
    pooled = layers.GlobalAveragePooling1D()(hidden)
    move_probabilities = layers.Dense(num_classes, activation="softmax")(pooled)

    model = tf.keras.Model(inputs=board_tokens, outputs=move_probabilities)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

In [15]:
# Hyperparameters for the move-prediction transformer.

seq_len = 42       # length of each flattened board sequence
d_model = 64       # embedding size per sequence position
num_heads = 2      # attention heads
ff_dim = 36        # feedforward network dimension
num_layers = 1     # number of transformer encoder layers
num_classes = 7    # number of output classes (columns)

# Build the transformer and print its layer summary.
transformer_model = build_transformer_model(
    seq_len=seq_len,
    d_model=d_model,
    num_heads=num_heads,
    ff_dim=ff_dim,
    num_layers=num_layers,
    num_classes=num_classes,
)
transformer_model.summary()


In [16]:
# Train the transformer for 20 epochs in batches of 250, reporting validation metrics after each epoch.
# `history` keeps the object returned by fit().
history = transformer_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=20,
    batch_size=250
)


# Evaluate on the held-out test split; with the single "accuracy" metric configured at compile time,
# evaluate() returns the loss followed by the accuracy.
test_loss, test_accuracy = transformer_model.evaluate(X_test, y_test)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")


Epoch 1/20
[1m367/835[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m [1m3:11[0m 409ms/step - accuracy: 0.1770 - loss: 1.9424