## Video Poker using RLlib and PPO

In [1]:
!pip install ray[rllib]



In [2]:
!pip install ipywidgets gputil



In [3]:
import random
import gymnasium as gym
from gymnasium import spaces, vector
import numpy as np
from ray import tune
from collections import Counter
from scipy.stats import binom, nbinom, beta, poisson, gamma, norm, geom
from ray.rllib.env import MultiAgentEnv

## 1) Write up the environment and test that it is working.



In [4]:
# Define global variables
# Flat list of 52 rank labels (13 ranks x 4 copies) with no suit information.
# NOTE(review): nothing visible in this notebook reads this global --
# initialize_deck() builds its own local (rank, suit) deck and the other
# helpers receive the deck as a parameter. Confirm whether it is still needed.
deck = ['2', '3', '4', '5', '6', '7', '8', '9', '10', 'Jack', 'Queen', 'King', 'Ace'] * 4

def initialize_deck():
    """Build a shuffled 52-card deck.

    Returns:
        A list of ``(rank, suit)`` string tuples, one per card, shuffled in
        place with the module-level ``random`` generator.
    """
    rank_names = ['Ace', '2', '3', '4', '5', '6', '7', '8', '9', '10',
                  'Jack', 'Queen', 'King']
    cards = []
    # Suit-major order before shuffling: all Clubs, then Diamonds, ...
    for suit in ('Clubs', 'Diamonds', 'Hearts', 'Spades'):
        for rank in rank_names:
            cards.append((rank, suit))
    random.shuffle(cards)
    return cards

def partition_selected_unselected(data_list, indicator_list, max_select=None):
    """Split ``data_list`` into unselected and selected items.

    An item is selected when its matching entry in ``indicator_list`` equals
    1 and fewer than ``max_select`` items have been selected so far; every
    other item is unselected. Relative order is preserved in both outputs.

    Args:
        data_list: Items to partition.
        indicator_list: Iterable of 0/1 flags aligned with ``data_list``.
            Items without a matching flag are treated as unselected, so no
            item is ever dropped; surplus flags are ignored.
        max_select: Maximum number of items to select. ``None`` means no
            limit; ``0`` selects nothing.

    Returns:
        A ``(unselected_list, selected_list)`` tuple of new lists.
    """
    # Compare against None explicitly so that max_select=0 is honoured
    # instead of being mistaken for "no limit".
    if max_select is None:
        max_select = len(data_list)

    unselected_list = []
    selected_list = []

    flags = iter(indicator_list)
    for data in data_list:
        # A missing flag defaults to 0 (unselected) rather than silently
        # truncating the data the way a plain zip() would.
        indicator = next(flags, 0)
        if indicator == 1 and len(selected_list) < max_select:
            selected_list.append(data)
        else:
            unselected_list.append(data)

    return unselected_list, selected_list

def create_card_matrix(cards):
    """Encode cards as a 13x4 binary matrix.

    Rows follow rank order Ace, 2, ..., King; columns follow suit order
    Clubs, Diamonds, Hearts, Spades. A cell is 1 when that card appears in
    ``cards``. Entries whose rank or suit is not recognised are ignored.

    Args:
        cards: Iterable of ``(rank, suit)`` pairs.

    Returns:
        A list of 13 lists, each holding 4 ints (0 or 1).
    """
    rank_names = ('Ace', '2', '3', '4', '5', '6', '7', '8', '9', '10',
                  'Jack', 'Queen', 'King')
    suit_names = ('Clubs', 'Diamonds', 'Hearts', 'Spades')
    row_of = {name: row for row, name in enumerate(rank_names)}
    col_of = {name: col for col, name in enumerate(suit_names)}

    # Start from an all-zero grid, one independent row per rank.
    grid = [[0 for _ in suit_names] for _ in rank_names]

    for rank, suit in cards:
        row = row_of.get(rank)
        col = col_of.get(suit)
        if row is None or col is None:
            continue  # unknown rank or suit: skip silently
        grid[row][col] = 1

    return grid

def deal_hand(deck, hand_size):
    """Remove ``hand_size`` cards from the end of ``deck`` and return them.

    The deck list is mutated in place. Cards are returned in the order they
    were popped (last card of the deck first). ``list.pop`` raises
    ``IndexError`` if the deck runs out before ``hand_size`` cards are drawn.
    """
    dealt = []
    for _ in range(hand_size):
        dealt.append(deck.pop())
    return dealt

def state_function(player_hand, deck):
    """Build the observation dict for the current hand and remaining deck.

    Both card collections are encoded with ``create_card_matrix``.

    Returns:
        A dict with keys ``'player_hand'`` and ``'cards_left_in_deck'``.
    """
    hand_matrix = create_card_matrix(player_hand)
    deck_matrix = create_card_matrix(deck)
    return dict(player_hand=hand_matrix, cards_left_in_deck=deck_matrix)


# Define the rewards for each hand
# Payout table keyed by hand category; the environments' step() looks up
# the category of a played hand here.
# NOTE(review): "Royal Flush" and "Straight Flush" share the same payout
# (800) -- confirm this is intended.
rewards = {
    "Royal Flush": 800,
    "Straight Flush": 800,
    "Four of a Kind": 420,
    "Full House": 160,
    "Flush": 140,
    "Straight": 120,
    "Three of a Kind": 90,
    "Two Pair": 40,
    "Pair": 20,
    "High Card": 5
}

In [5]:
import gymnasium as gym
from collections import Counter

class CardGameEnv(gym.Env):
    """Poker-style hand-building game exposed as a Gymnasium environment.

    The player holds ``hand_size`` (8) cards drawn from a shuffled 52-card
    deck. On every step the agent flags cards in its hand and either plays
    them (scored through the module-level ``rewards`` table) or discards
    them (reward 0); the hand is then refilled from the deck. The episode
    terminates once ``MAX_PLAYS`` hands have been played or the deck is
    empty.

    Action space: ``MultiDiscrete([2] * 9)``. ``action[0]`` is 1 to play and
    0 to discard; ``action[1:]`` flags cards of the sorted hand. Only the
    first five flagged cards are used.

    Observation space: dict of two 13x4 binary matrices, ``player_hand`` and
    ``cards_left_in_deck``, as produced by ``state_function``.
    """

    # Position of each rank with the Ace counted low; used to detect runs.
    _CARD_ORDER = {'Ace': 0, '2': 1, '3': 2, '4': 3, '5': 4, '6': 5, '7': 6,
                   '8': 7, '9': 8, '10': 9, 'Jack': 10, 'Queen': 11, 'King': 12}
    # The Ace-high straight is not a consecutive run under the Ace-low
    # ordering above, so it is matched explicitly.
    _ACE_HIGH_RANKS = frozenset(['10', 'Jack', 'Queen', 'King', 'Ace'])

    def __init__(self, seed=None):
        # NOTE(review): `seed` is intentionally unused. RLlib appears to pass
        # its env_config object as the first positional argument, so the
        # value may not be an integer -- confirm before wiring it to an RNG.
        self.hand_size = 8
        self.MAX_PLAYS = 4  # Limit the number of plays to 4
        self.MAX_DISCARDS = 3  # Limit the number of discards to 3

        self.action_space = gym.spaces.MultiDiscrete([2] * (1 + self.hand_size))
        self.observation_space = gym.spaces.Dict({
            'player_hand': gym.spaces.MultiBinary([13, 4]),
            'cards_left_in_deck': gym.spaces.MultiBinary([13, 4])
        })

        self._start_new_game()

    def _start_new_game(self):
        """Reset the counters, shuffle a new deck and deal the opening hand."""
        self.num_discards = 0
        self.num_plays = 0
        self.deck = initialize_deck()
        self.player_hand = deal_hand(self.deck, self.hand_size)
        self.player_hand.sort()
        self.state = state_function(self.player_hand, self.deck)

    # added by Ron
    # NOTE(review): this class never assigns `self.model`, so both checkpoint
    # helpers raise AttributeError unless a `model` attribute is attached
    # externally. They look like leftovers from a trainable class -- confirm
    # whether they are still needed.
    def save_checkpoint(self, tmp_checkpoint_dir):
        """Save ``self.model``'s state dict under ``tmp_checkpoint_dir``."""
        import os
        import torch

        checkpoint_path = os.path.join(tmp_checkpoint_dir, "multiturn_model.pth")
        torch.save(self.model.state_dict(), checkpoint_path)
        return tmp_checkpoint_dir

    def load_checkpoint(self, tmp_checkpoint_dir):
        """Load ``self.model``'s state dict from ``tmp_checkpoint_dir``."""
        import os
        import torch

        checkpoint_path = os.path.join(tmp_checkpoint_dir, "multiturn_model.pth")
        self.model.load_state_dict(torch.load(checkpoint_path))

    def reset(self, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: Optional integer. When given, the module-level ``random``
                generator used for shuffling is seeded with it so the deal
                is reproducible.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            # initialize_deck() shuffles with the global `random` module.
            random.seed(seed)
        self._start_new_game()
        return self.state, {}

    @classmethod
    def _classify_hand(cls, cards):
        """Return the hand-category name (a key of ``rewards``) for ``cards``."""
        rank_counts = Counter(rank for rank, _ in cards)
        suit_counts = Counter(suit for _, suit in cards)
        multiplicities = list(rank_counts.values())

        is_flush = len(cards) == 5 and len(suit_counts) == 1

        is_ace_high = set(rank_counts) == cls._ACE_HIGH_RANKS
        is_straight = False
        if len(rank_counts) == 5 and all(rank in cls._CARD_ORDER for rank in rank_counts):
            positions = [cls._CARD_ORDER[rank] for rank in rank_counts]
            # Five distinct ranks spanning exactly four steps are consecutive.
            is_straight = is_ace_high or max(positions) - min(positions) == 4

        if is_flush and is_ace_high:
            return "Royal Flush"
        if is_flush and is_straight:
            return "Straight Flush"
        if 4 in multiplicities:
            return "Four of a Kind"
        if 3 in multiplicities and 2 in multiplicities:
            return "Full House"
        if is_flush:
            return "Flush"
        if is_straight:
            return "Straight"
        if 3 in multiplicities:
            return "Three of a Kind"
        if multiplicities.count(2) == 2:
            return "Two Pair"
        if 2 in multiplicities:
            return "Pair"
        return "High Card"

    def step(self, action):
        """Play or discard the flagged cards, then refill the hand.

        Args:
            action: Sequence of nine 0/1 values; see the class docstring.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always False and ``info`` is always empty.

        Raises:
            ValueError: If the play/discard flag is neither 0 nor 1.
        """
        decision = action[0]
        # With no discards left the move is forced to be a play; with no
        # plays left it is forced to be a discard.
        if self.num_discards >= self.MAX_DISCARDS:
            decision = 1
        if self.num_plays >= self.MAX_PLAYS:
            decision = 0

        remaining_hand, selected_hand = partition_selected_unselected(
            self.player_hand, action[1:], max_select=5)

        if decision == 1:  # Play
            self.num_plays += 1
            reward = rewards.get(self._classify_hand(selected_hand), 0)
        elif decision == 0:  # Discard
            self.num_discards += 1
            reward = 0  # Discarding has no reward
        else:
            raise ValueError(f"action[0] must be 0 or 1, got {decision!r}")

        # Replace exactly the cards that left the hand.
        self.player_hand = remaining_hand + deal_hand(self.deck, len(selected_hand))
        self.player_hand.sort()
        self.state = state_function(self.player_hand, self.deck)

        done = self.num_plays == self.MAX_PLAYS or len(self.deck) == 0

        return self.state, reward, done, False, {}

In [6]:
# Smoke test: build the environment and deal a fresh hand.
env = CardGameEnv()
env.reset()

# Sample a random action to exercise the action space before handing the
# environment to RLlib; the hand is printed before and after the step.
action = env.action_space.sample()
print(action)
print(env.player_hand)
state, _, _, _, _ = env.step(action)
# Bare expression in the middle of the cell: evaluated but never printed.
len(state['cards_left_in_deck'])
print(env.player_hand)

[0 0 1 0 0 1 1 1 0]
[('10', 'Hearts'), ('2', 'Spades'), ('3', 'Diamonds'), ('3', 'Spades'), ('7', 'Spades'), ('Ace', 'Spades'), ('King', 'Clubs'), ('King', 'Spades')]
[('10', 'Hearts'), ('3', 'Diamonds'), ('3', 'Spades'), ('9', 'Clubs'), ('9', 'Spades'), ('Jack', 'Diamonds'), ('King', 'Spades'), ('Queen', 'Hearts')]


In [7]:
# Create an instance of the CardGameEnv class
env = CardGameEnv()
env.reset()

# Draw a random observation; the sampled value is discarded.
env.observation_space.sample()

# Check that an observation returned by step() lies inside the declared
# observation space. `state` here is the value left over from the earlier
# cell's env.step(action) call, not from the env created above.
env.observation_space.contains(state)

True

In [8]:
# Check that the observation returned by reset() lies inside the declared
# observation space.
state, _ = env.reset()
env.observation_space.contains(state)

True

In [9]:
# Play one full episode with uniformly random actions, printing every turn.
env = CardGameEnv()
env.reset()

total_reward = 0  # running sum of the rewards collected so far
done = False

# Keep stepping until the environment reports the episode is over.
while not done:
    print("Player's Hand:")
    for rank, suit in env.player_hand:
        print(f"{rank} of {suit}")

    action = env.action_space.sample()

    # The truncated flag and the info dict are not needed here.
    state, reward, done, _, _ = env.step(action)
    total_reward += reward

    print(f"Decision made by the player: {action}")
    print(f"Total Reward after this turn: {total_reward}")

print("Game over")

Player's Hand:
10 of Clubs
2 of Hearts
3 of Diamonds
4 of Spades
6 of Hearts
9 of Diamonds
Ace of Clubs
King of Hearts
Decision made by the player: [1 1 1 1 0 0 0 0 1]
Total Reward after this turn: 5
Player's Hand:
4 of Spades
6 of Diamonds
6 of Hearts
7 of Hearts
8 of Clubs
9 of Diamonds
Ace of Clubs
King of Clubs
Decision made by the player: [0 0 0 0 0 1 1 0 0]
Total Reward after this turn: 5
Player's Hand:
4 of Hearts
4 of Spades
6 of Diamonds
6 of Hearts
7 of Hearts
7 of Spades
Ace of Clubs
King of Clubs
Decision made by the player: [1 1 0 0 1 1 0 0 1]
Total Reward after this turn: 10
Player's Hand:
10 of Diamonds
3 of Spades
4 of Diamonds
4 of Spades
6 of Diamonds
7 of Spades
Ace of Clubs
Queen of Spades
Decision made by the player: [0 0 0 1 1 0 0 0 0]
Total Reward after this turn: 10
Player's Hand:
10 of Diamonds
3 of Spades
6 of Diamonds
7 of Spades
Ace of Clubs
Jack of Diamonds
Queen of Clubs
Queen of Spades
Decision made by the player: [0 0 0 1 1 0 1 1 1]
Total Reward after th

## 2) Use RLlib to get as high an expected score as possible with 4 Plays and 3 Discards.

In [10]:
import ray
from ray import tune, air, train
from ray.rllib.algorithms.ppo import PPOConfig
from functools import partial

In [11]:
# Make the cell safe to re-run: shut down any live Ray instance first, then
# start a local one limited to 20 CPUs.
if ray.is_initialized():
  ray.shutdown()
ray.init(num_cpus=20)

2024-04-22 18:57:36,748	INFO worker.py:1752 -- Started a local Ray instance.


0,1
Python version:,3.10.11
Ray version:,2.10.0


[36m(PPO pid=14588)[0m Trainable.setup took 15.198 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


In [None]:
# PPO configuration for the 4-play / 3-discard environment.
config = (PPOConfig()
          .environment(CardGameEnv)
          .framework('torch')
          # gamma=1: future rewards are not discounted.
          .training(gamma=1)
          # NOTE(review): 19 workers with ray.init(num_cpus=20) presumably
          # leaves one CPU for the trainer process -- confirm.
          .rollouts(num_rollout_workers=19)
)

# Stop criterion handed to Tune: 2,000,000 total timesteps.
stop = {"timesteps_total": 2000000}

# Run PPO through Tune with the config above, requesting a checkpoint at a
# frequency of 50.
tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=air.RunConfig(stop=stop, checkpoint_config=train.CheckpointConfig(checkpoint_frequency=50)),
)

# Launch the experiment.
tuner.fit()

0,1
Current time:,2024-04-22 19:03:08
Running for:,00:05:29.62
Memory:,20.8/127.9 GiB

Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_CardGameEnv_dd24e_00000,RUNNING,127.0.0.1:14588,27,307.185,108000,79.1069,470,20,6.37321


## 3) Use RLlib to get as high an expected score as possible with 1 Play and 1 Discard.

In [None]:
import gymnasium as gym
from collections import Counter

class CardGameEnv_One_play(gym.Env):
    """One-play variant of the poker-style hand-building environment.

    The player holds ``hand_size`` (8) cards drawn from a shuffled 52-card
    deck and is limited to ``MAX_PLAYS`` = 1 play and ``MAX_DISCARDS`` = 1
    discard. On every step the agent flags cards in its hand and either
    plays them (scored through the module-level ``rewards`` table) or
    discards them (reward 0); the hand is then refilled from the deck. The
    episode terminates once the single hand has been played or the deck is
    empty.

    Action space: ``MultiDiscrete([2] * 9)``. ``action[0]`` is 1 to play and
    0 to discard; ``action[1:]`` flags cards of the sorted hand. Only the
    first five flagged cards are used.

    Observation space: dict of two 13x4 binary matrices, ``player_hand`` and
    ``cards_left_in_deck``, as produced by ``state_function``.
    """

    # Position of each rank with the Ace counted low; used to detect runs.
    _CARD_ORDER = {'Ace': 0, '2': 1, '3': 2, '4': 3, '5': 4, '6': 5, '7': 6,
                   '8': 7, '9': 8, '10': 9, 'Jack': 10, 'Queen': 11, 'King': 12}
    # The Ace-high straight is not a consecutive run under the Ace-low
    # ordering above, so it is matched explicitly.
    _ACE_HIGH_RANKS = frozenset(['10', 'Jack', 'Queen', 'King', 'Ace'])
    # Single file name shared by save_checkpoint and load_checkpoint so that
    # a saved checkpoint can actually be loaded back.
    _CHECKPOINT_FILE = "singleturn_model.pth"

    def __init__(self, seed=None):
        # NOTE(review): `seed` is intentionally unused. RLlib appears to pass
        # its env_config object as the first positional argument, so the
        # value may not be an integer -- confirm before wiring it to an RNG.
        self.hand_size = 8
        self.MAX_PLAYS = 1  # Limit the number of plays to 1
        self.MAX_DISCARDS = 1  # Limit the number of discards to 1

        self.action_space = gym.spaces.MultiDiscrete([2] * (1 + self.hand_size))
        self.observation_space = gym.spaces.Dict({
            'player_hand': gym.spaces.MultiBinary([13, 4]),
            'cards_left_in_deck': gym.spaces.MultiBinary([13, 4])
        })

        self._start_new_game()

    def _start_new_game(self):
        """Reset the counters, shuffle a new deck and deal the opening hand."""
        self.num_discards = 0
        self.num_plays = 0
        self.deck = initialize_deck()
        self.player_hand = deal_hand(self.deck, self.hand_size)
        self.player_hand.sort()
        self.state = state_function(self.player_hand, self.deck)

    def reset(self, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: Optional integer. When given, the module-level ``random``
                generator used for shuffling is seeded with it so the deal
                is reproducible.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            # initialize_deck() shuffles with the global `random` module.
            random.seed(seed)
        self._start_new_game()
        return self.state, {}

    # added by Ron
    # NOTE(review): this class never assigns `self.model`, so both checkpoint
    # helpers raise AttributeError unless a `model` attribute is attached
    # externally. They look like leftovers from a trainable class -- confirm
    # whether they are still needed.
    def save_checkpoint(self, tmp_checkpoint_dir):
        """Save ``self.model``'s state dict under ``tmp_checkpoint_dir``."""
        import os
        import torch

        checkpoint_path = os.path.join(tmp_checkpoint_dir, self._CHECKPOINT_FILE)
        torch.save(self.model.state_dict(), checkpoint_path)
        return tmp_checkpoint_dir

    def load_checkpoint(self, tmp_checkpoint_dir):
        """Load ``self.model``'s state dict from ``tmp_checkpoint_dir``."""
        import os
        import torch

        checkpoint_path = os.path.join(tmp_checkpoint_dir, self._CHECKPOINT_FILE)
        self.model.load_state_dict(torch.load(checkpoint_path))

    @classmethod
    def _classify_hand(cls, cards):
        """Return the hand-category name (a key of ``rewards``) for ``cards``."""
        rank_counts = Counter(rank for rank, _ in cards)
        suit_counts = Counter(suit for _, suit in cards)
        multiplicities = list(rank_counts.values())

        is_flush = len(cards) == 5 and len(suit_counts) == 1

        is_ace_high = set(rank_counts) == cls._ACE_HIGH_RANKS
        is_straight = False
        if len(rank_counts) == 5 and all(rank in cls._CARD_ORDER for rank in rank_counts):
            positions = [cls._CARD_ORDER[rank] for rank in rank_counts]
            # Five distinct ranks spanning exactly four steps are consecutive.
            is_straight = is_ace_high or max(positions) - min(positions) == 4

        if is_flush and is_ace_high:
            return "Royal Flush"
        if is_flush and is_straight:
            return "Straight Flush"
        if 4 in multiplicities:
            return "Four of a Kind"
        if 3 in multiplicities and 2 in multiplicities:
            return "Full House"
        if is_flush:
            return "Flush"
        if is_straight:
            return "Straight"
        if 3 in multiplicities:
            return "Three of a Kind"
        if multiplicities.count(2) == 2:
            return "Two Pair"
        if 2 in multiplicities:
            return "Pair"
        return "High Card"

    def step(self, action):
        """Play or discard the flagged cards, then refill the hand.

        Args:
            action: Sequence of nine 0/1 values; see the class docstring.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always False and ``info`` is always empty.

        Raises:
            ValueError: If the play/discard flag is neither 0 nor 1.
        """
        decision = action[0]
        # With no discards left the move is forced to be a play; with no
        # plays left it is forced to be a discard.
        if self.num_discards >= self.MAX_DISCARDS:
            decision = 1
        if self.num_plays >= self.MAX_PLAYS:
            decision = 0

        remaining_hand, selected_hand = partition_selected_unselected(
            self.player_hand, action[1:], max_select=5)

        if decision == 1:  # Play
            self.num_plays += 1
            reward = rewards.get(self._classify_hand(selected_hand), 0)
        elif decision == 0:  # Discard
            self.num_discards += 1
            reward = 0  # Discarding has no reward
        else:
            raise ValueError(f"action[0] must be 0 or 1, got {decision!r}")

        # Replace exactly the cards that left the hand.
        self.player_hand = remaining_hand + deal_hand(self.deck, len(selected_hand))
        self.player_hand.sort()
        self.state = state_function(self.player_hand, self.deck)

        done = self.num_plays == self.MAX_PLAYS or len(self.deck) == 0

        return self.state, reward, done, False, {}

In [None]:
# Smoke test: build the one-play environment and deal a fresh hand.
env = CardGameEnv_One_play()
env.reset()

# Sample a random action to exercise the action space before handing the
# environment to RLlib; the hand is printed before and after the step.
action = env.action_space.sample()
print(action)
print(env.player_hand)
state, _, _, _, _ = env.step(action)
# Bare expression in the middle of the cell: evaluated but never printed.
len(state['cards_left_in_deck'])
print(env.player_hand)

In [None]:
# Create an instance of the CardGameEnv_One_play class
env = CardGameEnv_One_play()
env.reset()

# Draw a random observation; the sampled value is discarded.
env.observation_space.sample()

# Check that an observation returned by step() lies inside the declared
# observation space. `state` here is whatever an earlier cell last assigned,
# not something produced by the env created above.
env.observation_space.contains(state)

In [None]:
# Check that the observation returned by reset() lies inside the declared
# observation space.
state, _ = env.reset()
env.observation_space.contains(state)

In [None]:
# Play one full episode of the one-play variant with uniformly random
# actions, printing every turn.
env = CardGameEnv_One_play()
env.reset()

total_reward = 0  # running sum of the rewards collected so far
done = False

# Keep stepping until the environment reports the episode is over.
while not done:
    print("Player's Hand:")
    for rank, suit in env.player_hand:
        print(f"{rank} of {suit}")

    action = env.action_space.sample()

    # The truncated flag and the info dict are not needed here.
    state, reward, done, _, _ = env.step(action)
    total_reward += reward

    print(f"Decision made by the player: {action}")
    print(f"Total Reward after this turn: {total_reward}")

print("Game over")

In [None]:
# PPO configuration for the 1-play / 1-discard environment.
config = (PPOConfig()
          .environment(CardGameEnv_One_play)
          .framework('torch')
          # gamma=1: future rewards are not discounted.
          .training(gamma=1)
          # NOTE(review): 19 workers with ray.init(num_cpus=20) presumably
          # leaves one CPU for the trainer process -- confirm.
          .rollouts(num_rollout_workers=19)
)

# Stop criterion handed to Tune: 1,000,000 total timesteps.
stop = {"timesteps_total": 1000000}

# Run PPO through Tune with the config above, requesting a checkpoint at a
# frequency of 50.
tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=air.RunConfig(stop=stop, checkpoint_config=train.CheckpointConfig(checkpoint_frequency=50)),
)

# Launch the experiment.
tuner.fit()

## Thank you!