# Autoencoder Tic Tac Toe Verifier

This is another approach to verifying games, using anomaly detection instead of classification.

Make sure to use a GPU; otherwise training will take a very long time.

In [None]:
# Check whether the notebook is running in Colab; only then install the
# dependencies with pip.
try:
    import google.colab  # noqa: F401 -- imported only to detect Colab
except ImportError:
    # Not in Colab: rely on the local installation of ezkl and onnx.
    pass
else:
    import subprocess
    import sys

    # Install through the running interpreter so the packages land in this
    # kernel's environment. A failed install now raises CalledProcessError
    # here instead of being silently swallowed and surfacing later as a
    # confusing import error.
    for package in ("ezkl", "onnx"):
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

!RUST_LOG=trace

# Create TicTacToe Game States

The game states are in the form of
```json
[
    {
        "history": [
            [null, null, null, null, null, null, null, null, null],
            ["X", null, null, null, null, null, null, null, null],
            ...
        ],
        "outcome": "X"
    }
]
```

To create the game states for tic tac toe, do a recursive tree search of all possible gameplay.

In [None]:
import json

def check_winner(board):
    """Return the mark that fills a complete line on ``board``, else ``None``.

    ``board`` is indexed 0-8 in row-major order. A line only counts when its
    three cells are equal and not ``None``. Lines are checked rows first,
    then columns, then diagonals, and the first match is returned.
    """
    lines = (
        (0, 1, 2), (3, 4, 5), (6, 7, 8),  # rows
        (0, 3, 6), (1, 4, 7), (2, 5, 8),  # columns
        (0, 4, 8), (2, 4, 6),             # diagonals
    )
    for first, second, third in lines:
        mark = board[first]
        if mark is None:
            # An empty cell can never start a winning line.
            continue
        if mark == board[second] == board[third]:
            return mark
    return None

def generate_games(board, player):
    """Enumerate every game reachable from ``board`` with ``player`` to move.

    Returns a list of dicts, one per finished game. Each dict has a
    ``"history"`` list of board snapshots, starting at ``board`` and ending
    at the terminal position, and an ``"outcome"`` that is the winning mark
    or ``"Draw"``.
    """
    winner = check_winner(board)
    board_is_full = None not in board
    if winner or board_is_full:
        # Terminal position: the history starts with just this board; the
        # callers up the recursion prepend the earlier positions.
        outcome = winner or "Draw"
        return [{"history": [list(board)], "outcome": outcome}]

    opponent = 'O' if player == 'X' else 'X'
    finished = []
    for cell in range(9):
        if board[cell] is not None:
            continue
        child = board.copy()
        child[cell] = player
        for game in generate_games(child, opponent):
            # Each game gets its own copy of the current position.
            game["history"].insert(0, list(board))
            finished.append(game)

    return finished

# Enumerate every legal game from the empty board with X moving first.
initial_board = [None] * 9
games = generate_games(initial_board, 'X')

# Write one game per line inside a JSON list. Each game line except the last
# ends with a comma, so the file can later be parsed line by line.
with open("tic_tac_toe_games.json", "w") as file:
    file.write("[\n")  # Start of the list
    for i, game in enumerate(games):
        if i:
            # Terminate the previous game's line before starting this one.
            file.write(",\n")
        json.dump(game, file, separators=(',', ': '))
    if games:
        file.write("\n")
    file.write("]\n")

Now, let's make a list of illegal gameplay by running through the legal games and corrupting every board state in each one.

In [None]:
# Derive one "bad" game from every legal game by remapping each cell:
# empty -> 'X', 'X' -> 'O', anything else ('O') -> empty.
cell_swap = {None: 'X', 'X': 'O'}

with open("tic_tac_toe_games_bad.json", "w") as file:
    file.write("[\n")
    for i, game in enumerate(games):
        # Permute every board in the game's history; .get() yields None for
        # any mark that is not a key of cell_swap.
        bad_history = [
            [cell_swap.get(move) for move in moves]
            for moves in game['history']
        ]

        # Build a new dict instead of assigning into `game`: the legal games
        # in `games` must stay intact so that re-running this cell produces
        # the same file rather than permuting already-permuted boards.
        bad_game = {**game, 'history': bad_history}

        json.dump(bad_game, file, separators=(',', ': '))

        if i != len(games) - 1:  # If it's not the last game, add a comma
            file.write(",\n")
        else:
            file.write("\n")
    file.write("]\n")


## Create a neural net to verify the execution of the tic tac toe model

1. We use an autoencoder to verify the execution. An autoencoder is essentially a fan-in, fan-out architecture. It can be used for generative tasks, but in this case we use it as an anomaly detection approach.

2. The autoencoder helps us extract out the latent distribution of normal tic tac toe games. If there's a weird tic tac toe game, we can then reject it if the distribution is not within some threshold we want. We can quantify this distribution via the mean absolute error between the data point and the reconstructed data.

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, IterableDataset, random_split
import torch.optim as optim
import json
import torch.nn.functional as F

class TicTacToeNet(nn.Module):
    """Fully connected autoencoder over a flattened game of width 11 * 9 * 3.

    The layers narrow 297 -> 50 -> 20 -> 10 and widen back 10 -> 20 -> 50 ->
    297. ``forward`` also computes the mean absolute error between the
    reconstruction and the input, which serves as the anomaly score.
    """

    def __init__(self):
        super().__init__()
        # True: forward returns (reconstruction, loss); False: loss only.
        self.return_x = True

        flat_size = 11 * 9 * 3
        # Fan-in half
        self.dense = nn.Linear(flat_size, 50)
        self.dense1 = nn.Linear(50, 20)
        self.dense2 = nn.Linear(20, 10)
        # Fan-out half
        self.dense3 = nn.Linear(10, 20)
        self.dense4 = nn.Linear(20, 50)
        self.dense5 = nn.Linear(50, flat_size)

    def forward(self, x):
        """Reconstruct ``x`` and return the L1 reconstruction error.

        Returns ``(reconstruction, l1_loss)`` when ``self.return_x`` is true,
        otherwise just ``l1_loss``. The loss is a single mean over every
        element of the batch.
        """
        # Keep the input around to compare against the reconstruction.
        original_x = x.clone()

        # Every layer but the last is followed by a ReLU.
        hidden_layers = (
            self.dense, self.dense1, self.dense2, self.dense3, self.dense4,
        )
        for layer in hidden_layers:
            x = F.relu(layer(x))
        x = self.dense5(x)  # no activation on the output layer

        l1_loss = (x - original_x).abs().mean()

        if not self.return_x:
            return l1_loss
        return x, l1_loss

## Load all possible tictactoe games

We want to load good games and bad games for the classification later.

In [None]:
class JsonDataset(IterableDataset):
    """In-memory dataset of one-hot encoded tic-tac-toe games.

    The file is read line by line: the first line is skipped (it holds the
    opening "["), every following line that parses as JSON after removing a
    trailing comma is one game, and anything else (such as the closing "]"
    or a blank line) is ignored.

    Each stored sample is ``(history, label)``. ``history`` has 11 rows of 27
    values: ten one-hot boards (shorter games are padded with empty boards)
    followed by one row holding the one-hot outcome padded with "empty"
    cells. ``label`` is ``[1, 0]`` for every sample loaded by ``load_data``.

    Although the class derives from ``IterableDataset`` it also implements
    ``__len__`` and ``__getitem__``, so it can be indexed like a map-style
    dataset.
    """

    def __init__(self, file):
        """Load and encode every game found in ``file``."""
        self.file_good = file
        self.data = self.load_data(self.file_good)
        # Take the length from the parsed samples (one file read instead of
        # two) so it can never disagree with what __getitem__ can serve.
        self.length = len(self.data)

    def __iter__(self):
        """Yield the stored ``(history, label)`` pairs as plain nested lists."""
        yield from self.data

    def parse_json_object(self, line):
        """Return ``line`` parsed as JSON, or ``None`` if it is not valid JSON."""
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            return None

    def encode_board(self, board):
        """One-hot encode every cell of ``board`` into one flat list.

        Uses the same mapping as ``encode_outcome``: 'X' -> [1, 0, 0],
        'O' -> [0, 1, 0], anything else (an empty cell) -> [0, 0, 1].
        """
        return [bit for cell in board for bit in self.encode_outcome(cell)]

    def encode_outcome(self, outcome):
        """Return a fresh one-hot list for ``outcome``.

        'X' -> [1, 0, 0], 'O' -> [0, 1, 0], anything else -> [0, 0, 1].
        """
        if outcome == 'X':
            return [1, 0, 0]
        if outcome == 'O':
            return [0, 1, 0]
        return [0, 0, 1]

    def compute_length(self, file_good):
        """Count the lines after the first that are not a bare "," or "]".

        Kept for backward compatibility; ``__init__`` no longer uses it
        because it also counts lines that do not parse into a sample.
        """
        with open(file_good, 'r') as f:
            # Skip the first line (which is "["); tolerate an empty file.
            next(f, None)
            return sum(1 for line in f if line.strip() not in (",", "]"))

    def __len__(self):
        """Return the number of samples that were actually loaded."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a ``(history, label)`` pair of float tensors."""
        padded_history, sample_outcome = self.data[idx]

        return (
            torch.tensor(padded_history, dtype=torch.float),
            torch.tensor(sample_outcome, dtype=torch.float),
        )

    def process_file(self, file, is_good):
        """Parse ``file`` and return a list of ``(history, label)`` samples.

        ``label`` is ``[1, 0]`` when ``is_good`` is true and ``[0, 1]``
        otherwise. An empty file yields an empty list.
        """
        max_length = 10  # Maximum length of a Tic Tac Toe game
        label = [1, 0] if is_good else [0, 1]

        data = []
        with open(file, 'r') as f:
            next(f, None)  # skip the opening "[" line
            for line in f:
                # Drop surrounding whitespace and the trailing comma that
                # separates games; the last game line has no comma.
                line = line.strip()
                if line.endswith(","):
                    line = line[:-1]
                sample = self.parse_json_object(line)
                if sample is None:
                    # Not a game line (for example "]" or a blank line).
                    continue

                history = sample['history']
                # Pad short games with empty boards up to max_length.
                padding = [[None] * 9 for _ in range(max_length - len(history))]
                padded_history = [
                    self.encode_board(board) for board in history + padding
                ]

                # Final row: the outcome followed by eight "empty" cells so
                # it is as wide as an encoded board.
                outcome_row = self.encode_outcome(sample['outcome'])
                outcome_row.extend([0, 0, 1] * 8)
                padded_history.append(outcome_row)

                data.append((padded_history, list(label)))

        return data

    def load_data(self, file_good):
        """Return the samples of ``file_good``, all labelled as good games."""
        return self.process_file(file_good, True)

def collate_fn(batch):
    """Stack a batch of ``(history, outcome)`` pairs into two tensors.

    Returns ``(histories, outcomes)`` where the histories are float32 and
    the outcomes are int64.
    """
    # Transpose the list of pairs into one tuple per field.
    boards, labels = zip(*batch)

    return (
        torch.tensor(boards, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.int64),
    )

# Load every legal game, then hold out 20% of them for evaluation.
dataset = JsonDataset('tic_tac_toe_games.json')

total_size = len(dataset)
train_size = int(0.8 * total_size)
test_size = total_size - train_size  # whatever is left after the 80% cut

split_sizes = [train_size, test_size]
train_dataset, test_dataset = random_split(dataset, split_sizes)

# Only the training batches are shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

## Train

Note that during training we want the neural network to try to recreate its inputs. To do this we compare the prediction with the input using the L1 loss (mean absolute error).

In [None]:
# Use the GPU when one is available; the model and every batch live on `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TicTacToeNet().to(device)
criterion = nn.L1Loss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# You may want to increase this
MAX_EPOCH = 1

for epoch in range(MAX_EPOCH):
    # ---- Training phase: learn to reproduce the flattened input ----
    model.train()
    for history, valid in train_loader:
        # Flatten each game to a single vector per sample.
        history = history.to(device).view(history.size(0), -1)

        optimizer.zero_grad()
        prediction, _ = model(history)
        loss = criterion(prediction, history)
        loss.backward()
        optimizer.step()

    # ---- Validation phase: average the per-batch reconstruction loss ----
    model.eval()
    with torch.no_grad():
        total_loss = 0
        total_count = 0
        for history, valid in test_loader:
            history = history.to(device).view(history.size(0), -1)
            valid = valid.to(device)

            prediction, _ = model(history)
            loss = criterion(prediction, history)

            total_loss += loss
            total_count += 1

    avg_loss = total_loss / total_count
    print(f"Epoch {epoch + 1}/{MAX_EPOCH} - Loss: {avg_loss:.2f}")
    # Show the last validation sample next to its reconstruction.
    print(history[-1])
    print(prediction[-1])


# Get the ranges for normal data

We plot the range of possible losses for normal data.



In [None]:
# Make forward() return only the loss and put the model in eval mode.
model.return_x = False
model.eval()

# One loss value per test batch of legal games.
loss_list = []
for history, valid in test_loader:
    history = history.to(device).view(history.size(0), -1)
    loss = model(history)
    loss_list.append(loss.detach().cpu().numpy())



In [None]:
# visualize
import matplotlib.pyplot as plt

# Histogram of the losses measured on legal games.
ax = plt.gca()
ax.hist(loss_list, bins=50)
ax.set_xlabel("Normal loss")
ax.set_ylabel("No of examples")
plt.show()

# Get the ranges for bad data

Likewise, we plot the ranges of losses for bad data.

In [None]:
# Load the corrupted games and split them the same way as the good ones.
bad_dataset = JsonDataset('tic_tac_toe_games_bad.json')

bad_total_size = len(bad_dataset)
bad_train_size = int(0.8 * bad_total_size)
bad_test_size = bad_total_size - bad_train_size  # remainder after the 80% cut

bad_split_sizes = [bad_train_size, bad_test_size]
bad_train_dataset, bad_test_dataset = random_split(bad_dataset, bad_split_sizes)

bad_train_loader = DataLoader(bad_train_dataset, shuffle=True, batch_size=32)
bad_test_loader = DataLoader(bad_test_dataset, shuffle=False, batch_size=32)

In [None]:
# One loss value per test batch of corrupted games.
bad_loss_list = []

for history, valid in bad_test_loader:
    history = history.to(device).view(history.size(0), -1)
    loss = model(history)
    bad_loss_list.append(loss.detach().cpu().numpy())

In [None]:
import matplotlib.pyplot as plt


# Histogram of the losses measured on the corrupted games. The x-axis label
# names the bad data so this plot is not mistaken for the normal-loss one.
plt.hist(bad_loss_list, bins=50)
plt.xlabel("Bad-game loss")
plt.ylabel("No of examples")
plt.show()

**Note:** By visual inspection we can see that if the loss is greater than `~0.200` the data is likely anomalous (this value will probably change depending on the runs). There seems to be no overlap in loss between good gameplay and bad gameplay.

We could alternatively set the threshold to 4 standard deviations above the mean of the normal losses. You can vary this according to the error tolerance required.

In [None]:
import numpy as np
# Threshold = mean of the normal losses plus four standard deviations.
normal_mean = np.mean(loss_list)
normal_std = np.std(loss_list)
threshold = normal_mean + 4 * normal_std
print(threshold)

## Export Onnx and Data

In [None]:
# Set the model to evaluation mode (on the CPU, to match the sample input)
model = model.cpu()
model.eval()

# Obtain a sample batch from the DataLoader
sample_data_iter = iter(train_loader)
sample_history, _ = next(sample_data_iter)

# Move the sample batch to the cpu so it is on the same device as the model,
# then flatten each game to one vector per sample
x = sample_history.to('cpu')
x = x.view(x.size(0), -1)

# Export the model using ONNX
torch.onnx.export(
    model,                        # model being run
    x,                            # model input (or a tuple for multiple inputs)
    "tictactoe_network.onnx",     # where to save the model (can be a file or file-like object)
    export_params=True,           # store the trained parameter weights inside the model file
    opset_version=10,             # the ONNX version to export the model to
    do_constant_folding=True,     # whether to execute constant folding for optimization
    input_names=['input'],        # the model's input names
    output_names=['output'],      # the model's output names
    dynamic_axes={
        'input': {0: 'batch_size'},     # variable length axes
        'output': {0: 'batch_size'}
    }
)

# Use the first sample of the batch, flattened, as the input data
data_array = ((x[0]).detach().numpy()).reshape([-1]).tolist()

data = dict(input_data = [data_array])

# Serialize data into file. The context manager closes the file, so the
# contents are flushed to disk before anything else reads data.json.
with open("data.json", 'w') as f:
    json.dump(data, f)


# Prove the Tic Tac Toe Anomaly Detection Model

We import the ezkl library to setup the zk system.

In [None]:
import os
import ezkl

# File names used by the ezkl steps below, all relative to the working
# directory.

# Inputs produced by the export step
data_path = "data.json"
model_path = "tictactoe_network.onnx"

# Circuit, keys and settings
compiled_model_path = "network.ezkl"
pk_path = "test.pk"
vk_path = "test.vk"
settings_path = "settings.json"

# Witness and proof outputs
witness_path = "witness.json"
proof_path = "proof.json"

In [None]:
# Ask ezkl to generate settings for the exported ONNX model.
# NOTE(review): presumably this writes the settings to settings_path -- confirm against the ezkl docs.
res = ezkl.gen_settings(model_path, settings_path)

In [None]:
# IMPORTANT NOTE: You may want to set the scale ranges and set to "accuracy"
# In its current state, it will likely truncate values
# For testing we will just stick to resources to reduce computational costs
# Example:
# ezkl.calibrate_settings(data_path, model_path, settings_path, "accuracy", scales = [2,9])
cal_path = os.path.join("calibration.json")

# Flatten up to the first 20 rows of x into one calibration input
data_array = ((x[0:20]).detach().numpy()).reshape([-1]).tolist()

data = dict(input_data = [data_array])

# Serialize data into file. The context manager closes the file, so the
# contents are flushed to disk before ezkl reads cal_path.
with open(cal_path, 'w') as f:
    json.dump(data, f)


ezkl.calibrate_settings(cal_path, model_path, settings_path, "resources", scales = [11])

In [None]:
# Top-level await: this only works in a notebook / IPython cell.
# NOTE(review): presumably fetches the structured reference string that matches the settings -- confirm against the ezkl docs.
await ezkl.get_srs( settings_path)

In [None]:
# Compile the ONNX model with the current settings; the later setup, witness,
# mock and prove steps all take compiled_model_path as input.
ezkl.compile_circuit(model_path, compiled_model_path, settings_path)

In [None]:
# Run the ezkl setup for the compiled circuit.
# NOTE(review): presumably this writes the verifying key to vk_path and the proving key to pk_path -- confirm.
ezkl.setup(
    compiled_model_path,
    vk_path,
    pk_path,
)

In [None]:
import json

# Sanity check: read the input file back and show how many values it holds.
with open(data_path, "r") as f:
    data = json.load(f)
print(len(data['input_data'][0]))

# Generate the witness for that input with the compiled circuit.
ezkl.gen_witness(data_path, compiled_model_path, witness_path)

In [None]:
# Run ezkl's mock step on the witness and compiled circuit.
# NOTE(review): presumably a cheap dry run before real proving -- confirm against the ezkl docs.
ezkl.mock(witness_path, compiled_model_path)

In [None]:
# Generate the proof from the witness, compiled circuit and proving key.
res = ezkl.prove(
        witness_path,
        compiled_model_path,
        pk_path,
        proof_path,
        
            )

print(res)
# The proof file must exist at proof_path after proving.
assert os.path.isfile(proof_path)

In [None]:
# Verify the proof against the settings and the verifying key.
res = ezkl.verify(
        proof_path,
        settings_path,
        vk_path,
        
    )

# Stop here unless verification returned True.
assert res == True
print("verified")