<h1>Install Dependcies</h1>

In [18]:
!pip install torch --user
!pip install gymnasium --user
!pip install stable-baselines3[extra] --user

Collecting torch
  Obtaining dependency information for torch from https://files.pythonhosted.org/packages/74/07/edce54779f5c3fe8ab8390eafad3d7c8190fce68f922a254ea77f4a94a99/torch-2.1.0-cp311-cp311-win_amd64.whl.metadata
  Using cached torch-2.1.0-cp311-cp311-win_amd64.whl.metadata (25 kB)
Collecting filelock (from torch)
  Obtaining dependency information for filelock from https://files.pythonhosted.org/packages/5e/5d/97afbafd9d584ff1b45fcb354a479a3609bd97f912f8f1f6c563cb1fae21/filelock-3.12.4-py3-none-any.whl.metadata
  Using cached filelock-3.12.4-py3-none-any.whl.metadata (2.8 kB)
Collecting networkx (from torch)
  Using cached networkx-3.1-py3-none-any.whl (2.1 MB)
Collecting fsspec (from torch)
  Obtaining dependency information for fsspec from https://files.pythonhosted.org/packages/fe/d3/e1aa96437d944fbb9cc95d0316e25583886e9cd9e6adc07baad943524eda/fsspec-2023.9.2-py3-none-any.whl.metadata
  Using cached fsspec-2023.9.2-py3-none-any.whl.metadata (6.7 kB)
Using cached torch-2.1.0



Collecting stable-baselines3[extra]
  Obtaining dependency information for stable-baselines3[extra] from https://files.pythonhosted.org/packages/5e/81/7a0fbfc45240ec36cc3fcfe8f135996ef03277e2305d941a6d9186eb14e8/stable_baselines3-2.1.0-py3-none-any.whl.metadata
  Using cached stable_baselines3-2.1.0-py3-none-any.whl.metadata (5.2 kB)
Collecting matplotlib (from stable-baselines3[extra])
  Obtaining dependency information for matplotlib from https://files.pythonhosted.org/packages/40/d9/c1784db9db0d484c8e5deeafbaac0d6ed66e165c6eb4a74fb43a5fa947d9/matplotlib-3.8.0-cp311-cp311-win_amd64.whl.metadata
  Using cached matplotlib-3.8.0-cp311-cp311-win_amd64.whl.metadata (5.9 kB)
Collecting opencv-python (from stable-baselines3[extra])
  Obtaining dependency information for opencv-python from https://files.pythonhosted.org/packages/38/d2/3e8c13ffc37ca5ebc6f382b242b44acb43eb489042e1728407ac3904e72f/opencv_python-4.8.1.78-cp37-abi3-win_amd64.whl.metadata
  Using cached opencv_python-4.8.1.78-cp37



<h1>Imports</h1>

In [63]:
import gymnasium as gym
from gym import Env
from gym.spaces import Discrete,MultiDiscrete, Box
import numpy as np

<h1>Create Game Classes</h1>

In [7]:
# %load classes.py
import functools
import random
class Domino():
    def __init__(self, s1:int,s2:int):
        self.sides = (s1,s2)
        self.isDouble = (s1 == s2)
    def evalute_side(self, side: int):
        if(self.sides[0] == side): return 0
        elif (self.sides[1] == side): return 1
    def calc_points(self):
        return self.sides[0] + self.sides[1]
    def __str__(self):
        return str(self.sides)


class BoneYard():
    def __init__(self):
        self.dominos = []
        self.build()
        self.shuffle()
    def build(self):
        for i in range(0,13):
            for j in range(i,13):
                self.dominos.append(Domino(i,j))
    def shuffle(self):
        random.shuffle(self.dominos)
    def draw(self):
        if len(self.dominos) == 0: return False
        return self.dominos.pop()


class Train():
    startingSide = 12
    def __init__(self, id):
        self.openSides = [Train.startingSide]
        self.trainUp = False
        self.id = id
    def add(self,placement,domino):
        placeIndex = self.openSides.index(placement)
        trainSide = self.openSides[placeIndex]
        if(trainSide in domino.sides):
            if(domino.isDouble):
                self.openSides.pop(placeIndex)
                self.openSides.append(domino.sides[0])
                self.openSides.append(domino.sides[1])
            else:
                self.openSides.pop(placeIndex)
                self.openSides.append(domino.sides[1-domino.evalute_side(trainSide)])
        else:
            return False
    def __str__(self):
        return f"id: {self.id} trainUp?:{self.trainUp} openSides: {self.openSides}"
        

class Player():
    handSize = 12
    nextID = 0 
    def __init__(self,boneYard:BoneYard):
        self.id = Player.nextID
        Player.nextID +=1
        self.hand = []
        for _ in range(Player.handSize): self.hand.append(boneYard.draw()) 
    def highestDouble(self):
        highest = -1
        for domino in self.hand:
            if(domino.isDouble and domino.sides[0] > highest): highest = domino.sides[0]
        return highest
    def intializeTrain(self):
        if not hasattr(self,"train"):
            self.train = Train(self.id)
        return self.train
    def getDominoFromSides(self,s1:int,s2:int):
        for domino in self.hand:
            if (domino.sides == (s1,s2) or domino.sides == (s2,s1)):
                return domino
    def play(self, domino:Domino, placement:int, train:Train|None=None,firstDouble:bool=False):
        selfTrain = False
        played = False
        if train is None: 
            train = self.train
            selfTrain = True
        if firstDouble: 
            self.hand.remove(domino)
            played = True
        elif train.add(placement, domino) != False:
            self.hand.remove(domino)
            if selfTrain: self.train.trainUp = False
            played = True
        if len(self.hand) <= 0: return None
        else: return played
        

    def pointsInHand(self):
        return functools.reduce(lambda acc, domino: acc + domino.calc_points(), self.hand, 0)
    def pickup(self,boneYard:BoneYard):
        domino = boneYard.draw()
        if domino is not False:
            self.hand.append(domino)
        return domino
    def __str__(self):
        return f"id:{self.id} train:{self.train.id}"
    



class Game():
    def __init__(self,numPlayers:int):
        self.boneyard = BoneYard()
        self.players = []
        self.trains = []
        self.done = False
        self.numPlayers = numPlayers
        self.unsastifiedDouble = None
        if(numPlayers<= 4): Player.handSize = 15
        elif(numPlayers<=6): Player.handSize = 12
        elif(numPlayers<=8): Player.handSize = 10
        Player.nextID = 0
        for _ in range(numPlayers): 
            self.players.append(Player(self.boneyard))
        doubles = [player.highestDouble() for player in self.players]
        highestDouble = max(doubles)
        firstPlayer = doubles.index(highestDouble)
        firstDomino = self.players[firstPlayer].getDominoFromSides(highestDouble,highestDouble)
        for player in self.players: 
            player.intializeTrain()
            self.trains.append(player.train)
        if (len(self.players)<8): self.mexican = Train(8)
        self.centerDouble = max(doubles)
        Train.startingSide = self.centerDouble
        self.players[firstPlayer].play(firstDomino,0,firstDouble=True) #removing first double
        self.startingPlayer = self.nextPlayer(firstPlayer)
        self.prevPlayer = None
    def getTrain(self,id:int):
        if id == 8: return self.mexican
        else:
            for t in self.trains:
                if t.id==id:
                    return t
            else:
                return None
    def getPlayer(self,id:int):
        if id > 7: return None
        else: 
            for p in self.players:
                if p.id==id:
                    return p
            else:
                return None
    def nextPlayer(self, currPlayer:int):
        next = currPlayer+1 
        if (next>=self.numPlayers): next = 0 # looping around if its not an actual player
        return next

class BoardState():
    def __init__(self, trains:list[Train],centerDouble:int, mexican:Train|None = None,unsastifiedDouble=None):
        self.mexican = mexican
        self.trains = trains
        self.unsastifiedDouble = unsastifiedDouble
    #train up returns only sides that are on trains with thier trains up
    #maybe this signature should be changed to just take a list of trains? and let caller deal with filtering?
    def getPlacements(self, trainUp: bool=False,include:list[Train]=[], exclude:list[Train]=[]):
        trains = [*self.trains,self.mexican]
        placements = []
        if trainUp:
            for train in trains:
                if (train in include or train.trainUp) and train not in exclude:
                    for side in train.openSides: 
                        placements.append((train.id,side))
        else:
            for train in trains: 
                if train not in exclude:
                    for side in train.openSides: placements.append((train.id,side))
        return placements
    def getTrain(self, id):
        for train in self.trains:
            if train.id == id: return train
    def availablePlays(self, player:Player,placements:list|None=None):
        plays = []
        places = []
        if self.unsastifiedDouble is not None:
            places = [self.unsastifiedDouble]
        elif placements is not None:
            places = placements
        elif player.train.trainUp:
            places = [(player.id, side) for side in player.train.openSides]
        else: places = self.getPlacements(trainUp=True, include=[player.train])
        for placement in places:
            for domino in player.hand:
                eval = domino.evalute_side(placement[1])
                if( eval is not None): plays.append((domino.sides, placement))
        return plays
    def isValidPlay(self, player:Player, action:list[list]):
        valid = False
        plays = self.availablePlays(player)
        for play in plays:
            tuplist = [tuple(list) for list in action]
            if tuple(tuplist) == play:
                valid = True
                print(f"valid play: {play}")
        return valid
    @staticmethod
    def fromGame(game:Game):
        return BoardState(game.trains,game.centerDouble, game.mexican,game.unsastifiedDouble)


<h2>Test Game Objects</h2>

<h3>Testing intialization</h3>

In [31]:
game = Game(6)
bs = BoardState.fromGame(game)
print([str(player) for player in game.players])
print([str(domino) for domino in game.players[0].hand])
print(Train.startingSide)
plays = bs.availablePlays(game.players[0])
print(plays)


['id:0 train:0', 'id:1 train:1', 'id:2 train:2', 'id:3 train:3', 'id:4 train:4', 'id:5 train:5']
['(1, 3)', '(3, 10)', '(2, 5)', '(0, 1)', '(0, 6)', '(7, 10)', '(4, 4)', '(9, 11)', '(4, 11)', '(8, 8)', '(10, 12)', '(6, 8)']
11
[((9, 11), (0, 11)), ((4, 11), (0, 11))]


<h3>Testing random play</h3>

In [43]:
plays = bs.availablePlays(game.players[0])
if len(plays)> 0:
    play = plays[random.randint(0, len(plays) - 1)] # play = (dominoSideTuple,placementTuple)
    player = game.getPlayer(0)
    domino = player.getDominoFromSides(*play[0])
    train = bs.getTrain(play[1][0]) # play[1] is placement, = (train.id, side)
    print(train)
    player.play(domino,play[1][1],train)
    print(train)
else:
    print("no possible plays")

id: 0 trainUp?:False openSides: [12]
id: 0 trainUp?:False openSides: [4]


<h1>Start Making Env</h1>

In [39]:
# %load DominoEnv.py
from gymnasium import Env
from gym.spaces import Dict, Discrete , MultiDiscrete, Box, Sequence
import numpy as np
class DominoTrainEnv(Env):
    def __init__(self,numPlayers:int):
        # Actions we can take, 13,13 for possible domino sides, [9,13] for possible domino placements
        self.action_space = MultiDiscrete(np.array([[13, 13], [9, 13]]))
        # observation space
        obsv =  {
        "hand": Sequence(MultiDiscrete(np.array([13, 13]), dtype=np.int8)),
        "placements": Sequence(MultiDiscrete(np.array([9, 13]), dtype=np.int8)),
        "available-actions": Sequence(MultiDiscrete(np.array([[13, 13], [9, 13]]), dtype=np.int8)),
        "trains": Sequence(MultiDiscrete(np.array([9, 2]), dtype=np.int8))
        }
        self.observation_space = Dict(obsv)
        #setup game
        self.game = Game(numPlayers)
        self.player = self.game.getPlayer(0)
        bs = BoardState.fromGame(self.game)
        handarray = [domino.sides for domino in self.player.hand]
        placements = bs.getPlacements()
        state = {
            "hand": handarray,
            "placements": placements,
            "available-actions": bs.availablePlays(self.player),
            "trains": [[train.id,train.trainUp]for train in bs.trains]
        }
        self.state = state
        self.fails = 0
        
    def play(self,domino:Domino, placement,player:Player, bs:BoardState):
        print(f"attempting to play {domino} on {placement} from {player}")
        game = self.game
        played = False
        if domino is not None:
            train = game.getTrain(placement[0])
            if player.play(domino,placement[1],train):
                played = True   
                if domino.isDouble:
                    print("double played")
                    players = game.players
                    start_index = players.index(player)
                    #loop through all players at "table" starting with person who played double
                    for i in range(len(players)):
                        index = (start_index + i) % len(players)
                        loop_player = players[index] 
                        newPlacement = (placement[0],domino.sides[0])
                        plays = bs.availablePlays(loop_player, placements=[placement])
                        print(f"checking if {loop_player} can play on {newPlacement}\n, plays {plays}")
                        #if player can't play on double
                        if len(plays)<=0:
                            pickupDomino = loop_player.pickup(self.game.boneyard)
                            #if a domino was actually picked up
                            if pickupDomino:
                                print(f"{loop_player} pickedup {pickupDomino}")
                                plays = bs.availablePlays(loop_player, placements=[newPlacement])
                                if len(plays)<=0: 
                                    print("player can't play pickup")
                                    loop_player.train.trainUp = True
                                    continue
                                #if possible to play pickup
                                else:
                                    print(f"attepmting to play pickup on {newPlacement}, train {train}")
                                    if loop_player.play(pickupDomino,newPlacement[1],train) is None:
                                        game.done = True
                                    break
                                    
                            # no domino was pickedup, meaning boneyard is empty and end of game
                            else:
                                game.done = True
                                break
                        #if player only has one choice to play
                        elif len(plays) == 1:
                            play = plays[0]
                            ranDomino = loop_player.getDominoFromSides(*play[0])
                            print(f"attempting to play {ranDomino} on {newPlacement}, player.train: {player.train}")
                            if loop_player.play(ranDomino,newPlacement[1],train) is None:
                                game.done = True
                            break
                        #if player has many choices to play
                        else:
                            #if ai player
                            if loop_player.id == 0:
                                print(f"letting ai make choice for double play")
                                self.game.unsastifiedDouble = (train.id,domino.sides[0])
                                self.game.prevPlayer = player.id
                                break
                            #if other players, random choice
                            else:
                                print(player.train)
                                play = plays[random.randint(0, len(plays) - 1)]
                                ranDomino = loop_player.getDominoFromSides(*play[0])
                                print(f"attempting to play {ranDomino} on {newPlacement}, player.train: {player.train}\n available plays: {plays}")
                                if loop_player.play(ranDomino,newPlacement[1],train) is None:
                                    game.done = True
                            break
            else:
                   print("Invalid Placement")
        else:
            print("Invlaid Domino")
        return played
    def maskAction(self,availablActions):
        pass
    def step(self, action):
        start_index = self.game.startingPlayer
        players = self.game.players
        length = len(players)
        stateChanged = False
        reward = 0
        bs= BoardState.fromGame(self.game)
        if bs.isValidPlay(self.player,action):
            if bs.unsastifiedDouble is not None:
                domino = action[0]
                domino = self.player.getDominoFromSides(*domino)
                self.play(domino,action[1],self.player,bs)
                reward += domino.calc_points()
                stateChanged = True
                start_index = self.game.nextPlayer(self.player.id)
                length = length - 1
                bs.unsastifiedDouble = None #probably a better way of doing this
                self.game.unsastifiedDouble = None
            if stateChanged or bs.unsastifiedDouble is None:    
                for i in range(length):
                    index = (start_index + i) % length
                    player = players[index]
                    posActions = bs.availablePlays(player)
                    #if current turn is ai
                    if player.id == 0:
                        #if no action available
                        if len(posActions)<=0:
                            self.player.pickup(self.game.boneyard)
                            posActions = bs.availablePlays(self.player)
                            if len(posActions)>0:
                                ranAction = posActions[0]
                                ranDomino = self.player.getDominoFromSides(*ranAction[0])
                                self.play(ranDomino,ranAction[1],self.player,bs)
                                stateChanged = True
                            # Check if action is valid
                        else:
                            # Apply action
                            domino = action[0]
                            domino = self.player.getDominoFromSides(*domino)
                            self.play(domino,action[1],self.player,bs)
                            reward += domino.calc_points()
                            stateChanged = True
                        #invalid action   
                        
                    else:
                        print(f"current turn: {player}, plays: {posActions}")
                        if len(posActions)<=0:
                            player.pickup(self.game.boneyard)
                            posActions = bs.availablePlays(player)
                        if len(posActions)>0:
                            ranIndex = 0
                            if len(posActions)>1: ranIndex = random.randint(0, len(posActions)-1)
                            ranAction = posActions[ranIndex]
                            ranDomino = player.getDominoFromSides(*ranAction[0])
                            self.play(ranDomino,ranAction[1],player,bs)
                            stateChanged = True
        else: 
            reward += -500
            self.fails +=1
        if stateChanged:    
            #assigning state
            handarray = [domino.sides for domino in self.player.hand]
            placements = bs.getPlacements()
            state = {
                "hand": handarray,
                "placements": placements,
                "available-actions": bs.availablePlays(self.player),
                "trains": [[train.id,train.trainUp]for train in bs.trains]
            }
            self.state = state
        
        done = self.game.done
        #hard limit on game length since, the ai can make invalid plays which could loop forever
        if self.fails >=10000: done = True
        if done:
            # add negative reward for points remaining in hand at game end
            reward += -1*self.player.pointsInHand()  
        
        # Set placeholder for info
        info = {}
        
        # Return step information
        return self.state, reward, done, info

    def render(self):
        # Implement viz
        pass
    
    def reset(self):
        self.game = Game(self.game.numPlayers)
        self.player = self.game.getPlayer(0)
        bs = BoardState.fromGame(self.game)
        handarray = [domino.sides for domino in self.player.hand]
        placements = bs.getPlacements()
        state = {
            "hand": handarray,
            "placements": placements,
            "available-actions": bs.availablePlays(self.player),
            "trains": [[train.id,train.trainUp]for train in bs.trains]
        }
        self.state = state
        self.fails = 0
        return self.state
    

In [30]:
env = DominoTrainEnv(6)

In [10]:
episodes = 10
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    step = 0
    while not done:
        #env.render()
        #print(step)
        step+=1
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score+=reward
    print('Episode:{} Score:{}'.format(episode, score))
    print(step)

Episode:1 Score:-5000164
10000
Episode:2 Score:-5000160
10000
valid play: ((2, 12), (0, 12))
current turn: id:5 train:5, plays: [((1, 12), (5, 12)), ((3, 12), (5, 12)), ((7, 12), (5, 12))]
attempting to play (3, 12) on (5, 12) from id:5 train:5
attempting to play (2, 12) on [ 0 12] from id:0 train:0
current turn: id:1 train:1, plays: [((9, 12), (1, 12))]
attempting to play (9, 12) on (1, 12) from id:1 train:1
current turn: id:2 train:2, plays: [((8, 12), (2, 12)), ((6, 12), (2, 12))]
attempting to play (6, 12) on (2, 12) from id:2 train:2
current turn: id:3 train:3, plays: [((4, 12), (3, 12)), ((5, 12), (3, 12))]
attempting to play (5, 12) on (3, 12) from id:3 train:3
current turn: id:4 train:4, plays: []
Episode:3 Score:-5000100
10001
Episode:4 Score:-5000152
10000
valid play: ((9, 12), (0, 12))
current turn: id:3 train:3, plays: [((4, 12), (3, 12)), ((2, 12), (3, 12)), ((10, 12), (3, 12))]
attempting to play (4, 12) on (3, 12) from id:3 train:3
current turn: id:4 train:4, plays: []
c

<h1>Make RL Model</h1>

In [40]:
import numpy as np
from stable_baselines3 import A2C
from stable_baselines3 import PPO
from gym.wrappers import FlattenObservation

In [41]:
from stable_baselines3.common.base_class import BaseAlgorithm


def evaluate(
    model: BaseAlgorithm,
    num_episodes: int = 100,
    deterministic: bool = True,
) -> float:
    """
    Evaluate an RL agent for `num_episodes`.

    :param model: the RL Agent
    :param env: the gym Environment
    :param num_episodes: number of episodes to evaluate it
    :param deterministic: Whether to use deterministic or stochastic actions
    :return: Mean reward for the last `num_episodes`
    """
    # This function will only work for a single environment
    vec_env = model.get_env()
    obs = vec_env.reset()
    all_episode_rewards = []
    for _ in range(num_episodes):
        episode_rewards = []
        done = False
        # Note: SB3 VecEnv resets automatically:
        # https://stable-baselines3.readthedocs.io/en/master/guide/vec_envs.html#vecenv-api-vs-gym-api
        # obs = vec_env.reset()
        while not done:
            # _states are only useful when using LSTM policies
            # `deterministic` is to use deterministic actions
            action, _states = model.predict(obs, deterministic=deterministic)
            # here, action, rewards and dones are arrays
            # because we are using vectorized env
            obs, reward, done, _info = vec_env.step(action)
            episode_rewards.append(reward)

        all_episode_rewards.append(sum(episode_rewards))

    mean_episode_reward = np.mean(all_episode_rewards)
    print(f"Mean reward: {mean_episode_reward:.2f} - Num episodes: {num_episodes}")

    return mean_episode_reward

In [44]:
env = DominoTrainEnv(6)
print(env.observation_space)
env = FlattenObservation(env)
print(env.observation_space)
# Initialize the PPO agent
model = A2C('MlpPolicy', env, verbose=1)

# Train the agent
model.learn(total_timesteps=10000)

# Save the trained model
model.save("A2C_DominoTrain")

# Load the saved model
model = A2C.load("A2C_DominoTrain")

# Evaluate the agent
mean_reward, _ = model.evaluate(env, n_eval_episodes=10)

Dict('available-actions': Sequence(MultiDiscrete([[13 13]
 [ 9 13]])), 'hand': Sequence(MultiDiscrete([13 13])), 'placements': Sequence(MultiDiscrete([ 9 13])), 'trains': Sequence(MultiDiscrete([9 2])))
Dict('available-actions': Sequence(Box(0, 1, (48,), int8)), 'hand': Sequence(Box(0, 1, (26,), int8)), 'placements': Sequence(Box(0, 1, (22,), int8)), 'trains': Sequence(Box(0, 1, (11,), int8)))
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


TypeError: 'NoneType' object is not iterable