## Tracking local learning coefficient with a tic-tac-toe bot.

This is a project for the SERIMATS 2024 Winter application - developmental interpretability stream.

Sources and references:
- Tic tac toe bot based off this source: 
- Also referred to https://plainenglish.io/blog/building-a-tic-tac-toe-game-with-reinforcement-learning-in-python
- paper on LLC
- this post https://www.alignmentforum.org/posts/6g8cAftfQufLmFDYT/you-re-measuring-model-complexity-wrong

Total hours spent:

In [1]:
#%pip install devinterp matplotlib seaborn torchvision

In [2]:
import tensorflow as tf
import numpy as np
import random

KeyboardInterrupt: 

Tic tac toe game:

def totuple(a):
    """Recursively convert a nested iterable into nested tuples.

    Anything that raises TypeError when iterated (e.g. a plain number) is
    returned unchanged; that is what ends the recursion at the leaves.
    """
    try:
        converted = tuple(map(totuple, a))
    except TypeError:
        return a
    return converted

class TicTacToe:
    """A 3x3 tic-tac-toe game.

    The board is a 3x3 numpy array in which 0 marks an empty cell and
    1 / 2 mark cells taken by ``players[0]`` ("X") / ``players[1]`` ("O").
    ``winner`` stays ``None`` until a line is completed; ``game_over`` is
    set on a win and also when the board fills up without a winner (tie).
    """

    def __init__(self):
        self.players = ["X", "O"]
        self.reset()

    def reset(self):
        """Clear the board and forget the current player, winner and end flag."""
        self.board = np.zeros((3, 3))
        self.current_player = None
        self.winner = None
        self.game_over = False

    def available_moves(self):
        """Return the empty cells as a tuple of ``(row, col)`` tuples."""
        return tuple(
            (i, j) for i in range(3) for j in range(3) if self.board[i][j] == 0
        )

    def make_move(self, move):
        """Place the current player's mark at ``move`` and pass the turn.

        Args:
            move: ``(row, col)`` index of the target cell.

        Returns:
            False (leaving the game untouched) if the cell is already taken,
            True otherwise.
        """
        if self.board[move[0]][move[1]] != 0:
            return False
        if self.current_player is None:
            # No starting player was chosen: let the first player begin
            # instead of failing inside players.index(None).
            self.current_player = self.players[0]
        self.board[move[0]][move[1]] = self.players.index(self.current_player) + 1
        self.check_winner()
        self.switch_player()
        return True

    def switch_player(self):
        """Hand the turn to the other player."""
        if self.current_player == self.players[0]:
            self.current_player = self.players[1]
        else:
            self.current_player = self.players[0]

    def check_winner(self):
        """Update ``winner`` and ``game_over`` from the current board.

        A completed row, column or diagonal sets both. A full board with no
        completed line ends the game as a tie: ``game_over`` becomes True
        while ``winner`` stays ``None``.
        """
        b = self.board
        # Rows, then columns, then the two diagonals.
        lines = [[b[i][0], b[i][1], b[i][2]] for i in range(3)]
        lines += [[b[0][j], b[1][j], b[2][j]] for j in range(3)]
        lines.append([b[0][0], b[1][1], b[2][2]])
        lines.append([b[0][2], b[1][1], b[2][0]])
        for first, second, third in lines:
            if first == second == third != 0:
                # Cell values are 1-based indices into self.players.
                self.winner = self.players[int(first - 1)]
                self.game_over = True
        if not self.available_moves():
            # Without this a drawn game would never be flagged as finished.
            self.game_over = True

    def print_board(self):
        """Print the board as an ASCII grid, using the player symbols."""
        print("-------------")
        for i in range(3):
            print("|", end=" ")
            for j in range(3):
                cell = self.board[i][j]
                print(self.players[int(cell - 1)] if cell != 0 else " ", end=" | ")
            print()
            print("-------------")

Check that tictactoe game works

# Manual smoke test: two humans play one game on the console.
game = TicTacToe()
game.current_player = game.players[0]
game.print_board()

# Stop on a win, or when the board is full (tie); checking available_moves()
# here keeps the loop from spinning forever on a drawn game.
while not game.game_over and game.available_moves():
    prompt = f"{game.current_player}'s turn. Enter row and column (e.g. 0 0): "
    while True:
        try:
            move = tuple(map(int, input(prompt).split()))
        except ValueError:
            # Non-numeric input is treated like any other invalid move.
            move = None
        if move in game.available_moves():
            break
        prompt = "Invalid move. Try again: "
    game.make_move(move)
    game.print_board()

if game.winner:
    print(f"{game.winner} wins!")
else:
    print("It's a tie!")

Implement RL agent:

import random

class QLearningAgent:
    """Tabular Q-learning agent.

    Q-values are stored in ``self.Q``, a dict keyed by ``(state, action)``.
    States are converted to nested tuples with ``totuple`` before being used
    as keys so that array-like boards become hashable. Actions are used as
    given and must therefore be hashable (e.g. ``(row, col)`` tuples).
    """

    def __init__(self, alpha, epsilon, discount_factor):
        self.Q = {}
        self.alpha = alpha  # step size of each Q update
        self.epsilon = epsilon  # probability of picking a random move
        self.discount_factor = discount_factor  # weight of the next state's best Q

    def get_Q_value(self, state, action):
        """Return Q(state, action), creating the entry as 0.0 on first access."""
        return self.Q.setdefault((totuple(state), action), 0.0)

    def choose_action(self, state, available_moves):
        """Pick a move epsilon-greedily.

        With probability ``epsilon`` a uniformly random move is returned;
        otherwise the move with the highest Q-value, with ties broken at
        random.

        Args:
            state: current board (any nested iterable).
            available_moves: non-empty sequence of candidate actions.
        """
        state = totuple(state)
        if random.uniform(0, 1) < self.epsilon:
            return random.choice(available_moves)
        Q_values = [self.get_Q_value(state, action) for action in available_moves]
        max_Q = max(Q_values)
        best_moves = [
            move for move, q in zip(available_moves, Q_values) if q == max_Q
        ]
        if len(best_moves) == 1:
            return best_moves[0]
        return random.choice(best_moves)

    def update_Q_value(self, state, action, reward, next_state):
        """Apply one Q-learning update for the transition just observed.

        Q(s, a) += alpha * (reward + discount_factor * max_a' Q(s', a') - Q(s, a))

        Args:
            state: board before the move (any nested iterable).
            action: the move that was played.
            reward: reward received for the move.
            next_state: board after the move, as an iterable of rows in
                which empty cells equal 0. When it has no empty cell the
                future value is taken to be 0.0.
        """
        # The legal follow-up moves are read directly off next_state; the
        # TicTacToe constructor does not accept a board to wrap.
        next_moves = [
            (i, j)
            for i, row in enumerate(next_state)
            for j, cell in enumerate(row)
            if cell == 0
        ]
        max_next_Q = max(
            (self.get_Q_value(next_state, move) for move in next_moves),
            default=0.0,
        )
        # get_Q_value both normalises the state into a hashable key and
        # creates the entry, so pairs chosen by random exploration work too.
        current = self.get_Q_value(state, action)
        self.Q[(totuple(state), action)] = current + self.alpha * (
            reward + self.discount_factor * max_next_Q - current
        )

Train and test methods:

def train(num_episodes, alpha, epsilon, discount_factor):
    """Train a QLearningAgent by letting it play both sides of tic-tac-toe.

    Args:
        num_episodes: number of games to play.
        alpha, epsilon, discount_factor: passed to ``QLearningAgent``.

    Returns:
        The trained agent.

    NOTE(review): a move is rewarded 1.0 when it wins the game and 0.0
    otherwise, and one Q-table is shared by both players -- confirm this is
    the intended training signal.
    """
    agent = QLearningAgent(alpha, epsilon, discount_factor)
    for _ in range(num_episodes):
        game = TicTacToe()
        # make_move needs to know whose turn it is; "X" opens every game.
        game.current_player = game.players[0]
        while not game.game_over:
            available_moves = game.available_moves()
            if not available_moves:
                break  # board is full without a winner: tie
            # Snapshot the board as nested tuples: game.board is mutated in
            # place, so keeping a reference would make state and next_state
            # the same object.
            state = totuple(game.board)
            mover = game.current_player
            action = agent.choose_action(state, available_moves)
            # make_move only reports whether the move was legal, so the next
            # state and reward are read from the game afterwards.
            game.make_move(action)
            next_state = totuple(game.board)
            reward = 1.0 if game.winner == mover else 0.0
            agent.update_Q_value(state, action, reward, next_state)
    return agent

def test(agent, num_games):
    """Play the agent (as "X", moving first) against a uniformly random "O".

    Args:
        agent: object with ``choose_action(state, available_moves)``.
        num_games: number of games to play.

    Returns:
        Percentage (0-100) of games won by the agent; 0.0 when
        ``num_games`` is not positive.
    """
    if num_games <= 0:
        return 0.0
    num_wins = 0
    for _ in range(num_games):
        # One game object is kept per game; TicTacToe cannot be rebuilt from
        # a board, and it tracks the turn, winner and end flag itself.
        game = TicTacToe()
        game.current_player = game.players[0]
        while not game.game_over:
            moves = game.available_moves()
            if not moves:
                break  # board is full without a winner: tie
            if game.current_player == game.players[0]:
                action = agent.choose_action(game.board, moves)
            else:
                action = random.choice(moves)
            game.make_move(action)
        if game.winner == game.players[0]:
            num_wins += 1
    return num_wins / num_games * 100

Training and testing the bot:

# Train the Q-learning agent
# (only 10 episodes: this is a quick check that the pipeline runs)
agent = train(num_episodes=10, alpha=0.5, epsilon=0.1, discount_factor=1.0)



# Test the Q-learning agent
# test() returns a percentage; the trailing "#10000" is the game count
# intended for a full evaluation run.
win_percentage = test(agent, num_games=10)#10000
print("Win percentage: {:.2f}%".format(win_percentage))

Bot attempt #2

Based on tutorial from: https://towardsdatascience.com/reinforcement-learning-implement-tictactoe-189582bea542

The bot above has setup issues. It may be fixable, but I am trying another implementation due to time constraints.

In [None]:
import numpy as np
import pickle

In [None]:
# Board dimensions, read by State and Player when creating and hashing boards.
BOARD_ROWS = 3
BOARD_COLS = 3

In [None]:
class State:
    """Tic-tac-toe environment that runs games between two players.

    The board is a BOARD_ROWS x BOARD_COLS numpy array: 0 is an empty cell,
    1 a cell taken by ``p1`` and -1 a cell taken by ``p2``. ``p1`` always
    moves first.
    """

    def __init__(self, p1, p2):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.p1 = p1
        self.p2 = p2
        self.isEnd = False
        self.boardHash = None
        # init p1 plays first
        self.playerSymbol = 1

    # get unique hash of current board state
    def getHash(self):
        """Return (and cache in ``boardHash``) the flattened board as a string."""
        self.boardHash = str(self.board.reshape(BOARD_COLS*BOARD_ROWS))
        return self.boardHash

    def winner(self):
        """Evaluate the board and update ``isEnd``.

        Returns:
            1 if p1 has completed a line, -1 if p2 has, 0 for a tie (board
            full, no line) and None while the game is still running.
        """
        # Sums of every row, every column and both diagonals. Each diagonal
        # is checked on its own: taking max() of the two would hide a p2
        # diagonal (-3) behind the other, larger, diagonal sum.
        line_sums = [sum(self.board[i, :]) for i in range(BOARD_ROWS)]
        line_sums += [sum(self.board[:, j]) for j in range(BOARD_COLS)]
        line_sums.append(sum(self.board[i, i] for i in range(BOARD_COLS)))
        line_sums.append(
            sum(self.board[i, BOARD_COLS - i - 1] for i in range(BOARD_COLS))
        )
        for total in line_sums:
            if total == 3:
                self.isEnd = True
                return 1
            if total == -3:
                self.isEnd = True
                return -1

        # tie
        # no available positions
        if len(self.availablePositions()) == 0:
            self.isEnd = True
            return 0
        # not end
        self.isEnd = False
        return None

    def availablePositions(self):
        """Return the empty cells as a list of ``(row, col)`` tuples."""
        positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self.board[i, j] == 0:
                    positions.append((i, j))  # need to be tuple
        return positions

    def updateState(self, position):
        """Mark ``position`` for the player to move, then switch player."""
        self.board[position] = self.playerSymbol
        # switch to another player
        self.playerSymbol = -1 if self.playerSymbol == 1 else 1

    # only when game ends
    def giveReward(self):
        """Feed the end-of-game reward to both players.

        The winner gets 1 and the loser 0; on a tie p1 gets 0.1 and p2 0.5.
        """
        result = self.winner()
        # backpropagate reward
        if result == 1:
            self.p1.feedReward(1)
            self.p2.feedReward(0)
        elif result == -1:
            self.p1.feedReward(0)
            self.p2.feedReward(1)
        else:
            self.p1.feedReward(0.1)
            self.p2.feedReward(0.5)

    # board reset
    def reset(self):
        """Restore an empty board with p1 to move."""
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.boardHash = None
        self.isEnd = False
        self.playerSymbol = 1

    def _endRound(self):
        """Hand out rewards, then reset both players and the board."""
        self.giveReward()
        self.p1.reset()
        self.p2.reset()
        self.reset()

    def play(self, rounds=100):
        """Play ``rounds`` training games between ``p1`` and ``p2``.

        After each move the resulting board hash is recorded with the player
        who moved; when a game ends, rewards are fed back and everything is
        reset for the next round.
        """
        for i in range(rounds):
            if i % 1000 == 0:
                print("Rounds {}".format(i))
            while not self.isEnd:
                # Player 1
                positions = self.availablePositions()
                p1_action = self.p1.chooseAction(positions, self.board, self.playerSymbol)
                # take action and update board state
                self.updateState(p1_action)
                self.p1.addState(self.getHash())
                # ended with p1 either win or draw
                if self.winner() is not None:
                    self._endRound()
                    break

                # Player 2
                positions = self.availablePositions()
                p2_action = self.p2.chooseAction(positions, self.board, self.playerSymbol)
                self.updateState(p2_action)
                self.p2.addState(self.getHash())
                # ended with p2 either win or draw
                if self.winner() is not None:
                    self._endRound()
                    break

    # play with human
    def play2(self):
        """Play one game in which ``p2`` picks moves from ``positions`` alone.

        ``p1.chooseAction`` is called with the positions, the board and the
        symbol; ``p2.chooseAction`` is called with the positions only. The
        board is printed after every move and the result when the game ends.
        """
        while not self.isEnd:
            # Player 1
            positions = self.availablePositions()
            p1_action = self.p1.chooseAction(positions, self.board, self.playerSymbol)
            # take action and update board state
            self.updateState(p1_action)
            self.showBoard()
            # check board status if it is end
            win = self.winner()
            if win is not None:
                if win == 1:
                    print(self.p1.name, "wins!")
                else:
                    print("tie!")
                self.reset()
                break

            # Player 2
            positions = self.availablePositions()
            p2_action = self.p2.chooseAction(positions)

            self.updateState(p2_action)
            self.showBoard()
            win = self.winner()
            if win is not None:
                if win == -1:
                    print(self.p2.name, "wins!")
                else:
                    print("tie!")
                self.reset()
                break

    def showBoard(self):
        """Print the board, drawing p1 as 'x' and p2 as 'o'."""
        # p1: x  p2: o
        for i in range(0, BOARD_ROWS):
            print('-------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                if self.board[i, j] == 1:
                    token = 'x'
                elif self.board[i, j] == -1:
                    token = 'o'
                else:
                    token = ' '
                out += token + ' | '
            print(out)
        print('-------------')

In [None]:
class Player:
    """Learning player that keeps a value estimate per board state.

    ``states_value`` maps a board hash (see ``getHash``) to its estimated
    value; ``states`` lists the hashes visited during the current game.
    """

    def __init__(self, name, exp_rate=0.3):
        self.name = name
        self.states = []  # record all positions taken
        self.lr = 0.2
        self.exp_rate = exp_rate
        self.decay_gamma = 0.9
        self.states_value = {}  # state -> value

    def getHash(self, board):
        """Return the flattened board rendered as a string."""
        return str(board.reshape(BOARD_COLS*BOARD_ROWS))

    def chooseAction(self, positions, current_board, symbol):
        """Choose one of ``positions`` for the player using ``symbol``.

        With probability ``exp_rate`` a random position is taken; otherwise
        the position whose resulting board has the highest stored value
        (unknown boards count as 0; on equal values the later position wins).

        Raises:
            ValueError: if ``positions`` is empty.
        """
        if len(positions) == 0:
            raise ValueError("no available positions to choose from")
        if np.random.uniform(0, 1) <= self.exp_rate:
            # take random action
            idx = np.random.choice(len(positions))
            return positions[idx]
        value_max = float("-inf")
        action = None
        for p in positions:
            next_board = current_board.copy()
            next_board[p] = symbol
            value = self.states_value.get(self.getHash(next_board), 0)
            if value >= value_max:
                value_max = value
                action = p
        return action

    # append a hash state
    def addState(self, state):
        self.states.append(state)

    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        """Propagate ``reward`` backwards through the visited states.

        Each state's value moves towards ``decay_gamma`` times the value
        coming from its successor; the updated value is then passed on to
        the state visited before it.
        """
        for st in reversed(self.states):
            value = self.states_value.get(st, 0)
            value += self.lr*(self.decay_gamma*reward - value)
            self.states_value[st] = value
            reward = value

    def reset(self):
        """Forget the states visited in the game that just ended."""
        self.states = []

    def savePolicy(self):
        """Pickle ``states_value`` to the file ``policy_<name>``."""
        with open('policy_' + str(self.name), 'wb') as fw:
            pickle.dump(self.states_value, fw)

    def loadPolicy(self, file):
        """Replace ``states_value`` with the pickled dict stored in ``file``."""
        # NOTE(review): pickle.load can execute arbitrary code; only load
        # policy files that were produced by savePolicy.
        with open(file, 'rb') as fr:
            self.states_value = pickle.load(fr)

In [None]:
class HumanPlayer:
    """Player whose moves are typed in on the console.

    ``addState``, ``feedReward`` and ``reset`` are no-ops: a human player
    keeps no learning state.
    """

    def __init__(self, name):
        self.name = name

    def chooseAction(self, positions):
        """Prompt for a row and a column until they name one of ``positions``."""
        while True:
            try:
                row = int(input("Input your action row:"))
                col = int(input("Input your action col:"))
            except ValueError:
                # Non-numeric input: ask again instead of crashing the game.
                continue
            action = (row, col)
            if action in positions:
                return action

    # append a hash state
    def addState(self, state):
        pass

    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        pass

    def reset(self):
        pass

In [None]:
# Two learning players trained against each other by self-play.
p1 = Player("p1")
p2 = Player("p2")

st = State(p1, p2)
print("training...")
# Number of training games to play.
st.play(200)

In [None]:
# Persist both learned value tables (written to "policy_p1" and "policy_p2").
p1.savePolicy()
p2.savePolicy()

In [None]:
# Reload p1's value table from the file written by savePolicy.
p1.loadPolicy("policy_p1")

Human vs Computer

In [None]:
# exp_rate=0 disables random exploration, so the computer always plays the
# highest-valued move of the loaded policy.
p1 = Player("computer", exp_rate=0)
p1.loadPolicy("policy_p1")

p2 = HumanPlayer("human")

st = State(p1, p2)
# play2 runs a single game and prints the board after every move.
st.play2()

In [None]:
#%pip install devinterp matplotlib transformers torchvision 

### Notes on local learning coefficient (LLC) for RL: 

The estimator for LLC uses the number of samples in its calculation. For supervised learning this is the size of training data; for RL I am assuming it would be the number of training episodes (i.e. rounds of tic tac toe for this bot). I'm not sure about this assumption - more notes on it to follow. Below I try computing LLC based on this assumption.




In [None]:
from typing import Callable, Dict, List, Literal, Optional, Type, Union

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader

from devinterp.optim.sgld import SGLD
from devinterp.slt.sampler import sample

In [None]:
# changing to take num. of rounds as param

def estimate_learning_coeff_RL(
    model: torch.nn.Module,
    loader: DataLoader,
    criterion: Callable,
    num_rounds: int = 0, #added this
    sampling_method: Type[torch.optim.Optimizer] = SGLD,
    optimizer_kwargs: Optional[Dict[str, Union[float, Literal["adaptive"]]]] = None,
    num_draws: int = 100,
    num_chains: int = 10,
    num_burnin_steps: int = 0,
    num_steps_bw_draws: int = 1,
    cores: int = 1,
    seed: Optional[Union[int, List[int]]] = None,
    pbar: bool = True,
    device: torch.device = torch.device("cpu"),
    verbose: bool = True,
) -> float:
    """Estimate the local learning coefficient with an explicit sample count.

    The estimate is ``(avg_loss - baseline_loss) * n / log(n)``, where
    ``avg_loss`` is the mean over chains of each chain's mean sampled loss,
    ``baseline_loss`` is the first recorded loss of chain 0 and ``n`` is the
    sample count.

    Args:
        model, loader, criterion: forwarded unchanged to ``sample``.
        num_rounds: value to use for ``n`` (e.g. the number of training
            episodes). When left at 0, ``len(loader.dataset)`` is used.
        sampling_method, optimizer_kwargs, num_draws, num_chains,
        num_burnin_steps, num_steps_bw_draws, cores, seed, pbar, device,
        verbose: forwarded unchanged to ``sample``.

    Returns:
        The estimated learning coefficient.

    Raises:
        ValueError: if the resulting sample count is not greater than 1,
            since ``log(n)`` is undefined for n <= 0 and zero for n == 1.
    """
    num_samples = num_rounds if num_rounds else len(loader.dataset)
    if num_samples <= 1:
        raise ValueError(
            f"number of samples must be greater than 1, got {num_samples}"
        )

    trace = sample(
        model=model,
        loader=loader,
        criterion=criterion,
        sampling_method=sampling_method,
        optimizer_kwargs=optimizer_kwargs,
        num_draws=num_draws,
        num_chains=num_chains,
        num_burnin_steps=num_burnin_steps,
        num_steps_bw_draws=num_steps_bw_draws,
        cores=cores,
        seed=seed,
        pbar=pbar,
        device=device,
        verbose=verbose,
    )
    # The trace is indexed by its "chain" and "loss" columns.
    baseline_loss = trace.loc[trace["chain"] == 0, "loss"].iloc[0]
    avg_loss = trace.groupby("chain")["loss"].mean().mean()

    return (avg_loss - baseline_loss) * num_samples / np.log(num_samples)


In [None]:
from devinterp.slt import estimate_learning_coeff
import torch.nn as nn

# First attempt at an LLC estimate, using the tabular Player as the "model".
# NOTE(review): p1 is a Player, not a torch.nn.Module, and num_rounds (an int)
# is passed in the position where the commented-out train_loader would go.
model = p1
num_rounds = 10
criterion = nn.CrossEntropyLoss() #look into this, may change

test_llc = estimate_learning_coeff(
        model,
        num_rounds,
        #train_loader,
        criterion=criterion,
        )

print(test_llc)

# Error raised by the call above:
#AttributeError: 'Player' object has no attribute 'to'
# Facing issues with bot and given methods

Bot 3

Source: https://github.com/nestedsoftware/tictac/blob/master/tictac/qneural.py

In [1]:
import torch
from torch.nn import MSELoss
import numpy as np
#import board
#import qneural

from board import play_random_move, play_games, Board
from qneural import (TicTacNet, NetContext, create_qneural_player,
                            get_q_values, play_training_games_x,
                            play_training_games_o)

# TicTacNet, NetContext, Board and the helpers used below come from the
# local `board` / `qneural` modules imported above (not shown here).
policy_net = TicTacNet()
target_net = TicTacNet()

# Only policy_net's parameters are handed to the optimizer.
sgd = torch.optim.SGD(policy_net.parameters(), lr=0.1)
loss = MSELoss()
net_context = NetContext(policy_net, target_net, sgd, loss)

# Print the target network's Q-values for one fixed board before training;
# no_grad because nothing is being optimized here.
with torch.no_grad():
    board = Board(np.array([1, -1, -1, 0, 1, 1, 0, 0, -1]))
    q_values = get_q_values(board, net_context.target_net)
    print(f"Before training q_values = {q_values}")

print("Training qlearning X vs. random...")
play_training_games_x(net_context=net_context,
                      o_strategies=[play_random_move])

Before training q_values = tensor([0.4891, 0.4739, 0.4953, 0.4377, 0.4838, 0.4530, 0.5147, 0.5072, 0.4520])
Training qlearning X vs. random...
200/2000 games, using epsilon=0.6...
400/2000 games, using epsilon=0.5...
600/2000 games, using epsilon=0.4...
800/2000 games, using epsilon=0.30000000000000004...
1000/2000 games, using epsilon=0.20000000000000004...
1200/2000 games, using epsilon=0.10000000000000003...
1400/2000 games, using epsilon=2.7755575615628914e-17...
1600/2000 games, using epsilon=0...
1800/2000 games, using epsilon=0...
2000/2000 games, using epsilon=0...


Need to figure out train_loader var: https://stackoverflow.com/questions/57258323/how-can-i-use-a-pytorch-dataloader-for-reinforcement-learning

Looks like https://slm-lab.gitbook.io/slm-lab/analyzing-results/analytics may have some method to store data from RL?

Possibly-relevant discussion: https://github.com/Lightning-AI/lightning/issues/713

There may be some hacks, but I am not sure they would make sense with respect to calculating the LLC. This needs further exploration.

In [4]:
from devinterp.slt import estimate_learning_coeff
import torch.nn as nn

# Second attempt at an LLC estimate, this time on the torch policy network.
model = policy_net
# NOTE(review): a list of ten zeros stands in for the data loader here. The
# run below fails with "cannot unpack non-iterable int object", presumably
# because the sampler expects each item to be an (input, target) pair --
# a real DataLoader over stored game data is still needed.
num_rounds = [0]*10 #change 10 
criterion = nn.CrossEntropyLoss() #look into this, may change

test_llc = estimate_learning_coeff(
        model,
        num_rounds,
        #train_loader,
        criterion=criterion,
        )

print(test_llc)


Chain 0:   0%|                                                                                 | 0/100 [00:00<?, ?it/s]


TypeError: cannot unpack non-iterable int object