In [2]:
import gymnasium as gym
from gymnasium import spaces, vector
import numpy as np
from scipy.stats import binom, nbinom, beta, poisson, gamma, norm, geom
import random
import ray
from ray import tune, air
from ray.rllib.algorithms.ppo import PPOConfig
from functools import partial
from itertools import combinations

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils import seeding
import random



# Question 1: 1 play and 1 discard

In [12]:
suit_to_int = {'Hearts': 0, 'Diamonds': 1, 'Clubs': 2, 'Spades': 3}
val_to_int = {'2': 0, '3': 1, '4': 2, '5': 3, '6': 4, '7': 5, '8': 6, '9': 7, '10': 8, 'J': 9, 'Q': 10, 'K': 11, 'A': 12}
def card_to_int(card):
    """converts each cart to a unique integer

    Args:
        card (_type_): _description_

    Returns:
        _type_: _description_
    """
    return card[1] * 13 + card[0]


class CardGameEnv(gym.Env):
    def __init__(self, seed = None):
        # self.action_space = spaces.Box(low=0, high=1, shape=(8,), dtype=np.float32)  # Binary array to represent discarding individual cards
        # action space is one of 2**8 options, later turned into a list of 1s and 0s
        # 1s representing a card to discard
        # 0 representing a card to keep
        self.action_space = spaces.Tuple([spaces.Discrete(2**8), spaces.Discrete(2)])
        self.observation_space = spaces.Box(low=0, high=51, shape = [8] ,dtype=np.int16)
        self.deck =  [(value, suit) for value in range(0, 13) for suit in range(4)] 
        self.plays = 1
        self.discards = 1
        self.total_reward = 0
        self.seed()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self, *, seed=None, options=None):
        self.hand = self.draw_hand(8)  # Number, Suit
        # print(self.hand)
        self.discards = 1
        self.plays = 1
        self.total_reward = 0
        self.deck =  [(value, suit) for value in range(0, 13) for suit in range(4)] 
        return np.array([card_to_int(card) for card in self.hand]), {}

    def draw_hand(self, num_cards_to_draw):
        """draws num_cards_to_draw number of cards from remaining deck and removes from avail deck

    Args:
        num_cards_to_draw (_type_): number of cards to draw

    Returns:
        _type_: list of newly drawn cards
    """
        cards = []
        for i in range(num_cards_to_draw):
            card = random.choice(self.deck)
            #  Remove the selected card from available deck
            self.deck.remove(card)
            cards.append(card)
        # print("Cards remaining:", len(self.deck))
        return cards
    def action_to_list(self, action):
        bin_num = bin(action)
        list_of_nums = [int(x) for x in str(bin_num)[2:]]
        while len(list_of_nums) < 8:
            list_of_nums.insert(0, 0)
        return list_of_nums
    def step(self, action):
        if action[1] == 0:
            self.discards -= 1
        else: # choosing to play, so collect reward
            reward = self.calculate_hand_value(self.hand)
            self.total_reward += reward
            self.plays -= 1

        # either way, perform discard stuff

        # Grab cards to discard
        action = self.action_to_list(action[0])
        discards = [idx for idx, val in enumerate(action) if val == 1]
        num_discards = len(discards)
        # Ensure that number of discards is at most 5 (prevent invalid action space)
        while len(discards) > 5:
            discards.pop()
            num_discards = len(discards)

        if num_discards > 0:  # choosing to discard something
            new_cards = self.draw_hand(num_discards)
            # Calculate remaining indices ensuring they are within the range of self.hand
            remaining_indices = [i for i in range(8) if i not in discards]
            self.hand = [self.hand[idx] for idx in remaining_indices] + new_cards

        # check for terminal state
        if self.discards == 0 or self.plays == 0:
            done = True
        else:
            done = False

        # If game is over, need to add up total_reward if last action was discard
        if self.discards == 0:
            reward = self.calculate_hand_value(self.hand)
            self.total_reward += reward
        
        return np.array([card_to_int(card) for card in self.hand]), self.total_reward, done, False, {}


    def calculate_hand_value(self, hand):
        # Generate all combinations of 5 cards
        combinations_5 = combinations(hand, 5)
        best_reward = 0
        for combo in combinations_5:
            card_values = sorted([card[0] for card in combo])
            card_suits = [card[1] for card in combo]
            counts = np.bincount(card_values)
            num_unique_cards = np.count_nonzero(counts)
            is_flush = len(set(card_suits)) == 1

            if num_unique_cards == 2:
                if 4 in counts:
                    reward = 420  # Four of a Kind
                else:
                    reward = 160  # Full House
            elif num_unique_cards == 3:
                if 3 in counts:
                    reward = 90  # Three of a Kind
                else:
                    reward = 40  # Two Pair
            elif num_unique_cards == 4:
                reward = 20  # Pair
            elif num_unique_cards == 5:
                is_straight = all(card_values[i] == card_values[i-1] + 1 for i in range(1, len(card_values)))
                if is_straight:
                    if is_flush:
                        reward = 800  # Straight Flush
                    else:
                        reward = 120  # Straight
                elif is_flush:
                    reward = 140  # Flush
                else:
                    reward = 5  # High Card
            
            if reward > best_reward:
                best_reward = reward
        return best_reward


In [13]:
# Test the environment
env = CardGameEnv()
obs, _ = env.reset()
total_reward = 0
done = False
while not done:
    assert obs.shape 
    action = env.action_space.sample()  # Random action for testing
    obs, reward, done, _, _ = env.step(action)
    total_reward += reward
print("Reward:", total_reward)

Reward: 20


In [14]:
if ray.is_initialized():
  ray.shutdown()

runtime_env = {"py_modules": ["../.."]}
ray.init(runtime_env=runtime_env)

2024-05-11 14:02:39,030	INFO worker.py:1642 -- Started a local Ray instance.
2024-05-11 14:02:39,106	INFO packaging.py:518 -- Creating a file package for local directory '../..'.
2024-05-11 14:02:39,213	INFO packaging.py:346 -- Pushing file package 'gcs://_ray_pkg_8be5750dc399be9c.zip' (9.74MiB) to Ray cluster...
2024-05-11 14:02:39,265	INFO packaging.py:359 -- Successfully pushed file package 'gcs://_ray_pkg_8be5750dc399be9c.zip'.


0,1
Python version:,3.10.12
Ray version:,2.7.0


In [15]:
config = (PPOConfig()
          .environment(CardGameEnv)
          .framework('torch')
          .training(gamma=0.9,)
          .rollouts(num_rollout_workers=3)
)
stop = {"timesteps_total": 100000}

tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=air.RunConfig(stop=stop),
)

res = tuner.fit()

0,1
Current time:,2024-05-11 14:07:55
Running for:,00:05:14.90
Memory:,32.8/125.5 GiB

Trial name,# failures,error file
PPO_CardGameEnv_cda9a_00000,1,/home/healthcare/ray_results/PPO_2024-05-11_14-02-40/PPO_CardGameEnv_cda9a_00000_0_2024-05-11_14-02-40/error.txt

Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_CardGameEnv_cda9a_00000,ERROR,206.211.132.160:1212702,22,300.708,88000,62.7263,800,5,1


2024-05-11 14:07:55,312	ERROR tune_controller.py:1502 -- Trial task failed for trial PPO_CardGameEnv_cda9a_00000
Traceback (most recent call last):
  File "/home/healthcare/.local/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/home/healthcare/.local/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/healthcare/.local/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/healthcare/.local/lib/python3.10/site-packages/ray/_private/worker.py", line 2547, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(UnboundLocalError): [36mray::PPO.train()[39m (pid=1212702, ip=206.211.132.160, actor_id=d872ee18875f55032aba028201000000, repr=PPO)
  File "/home/healthcare/.local/lib/python3.10/site-packages/ray/tune/trainabl

# Question 2: 4 plays and 3 discards

In [8]:


suit_to_int = {'Hearts': 0, 'Diamonds': 1, 'Clubs': 2, 'Spades': 3}
val_to_int = {'2': 0, '3': 1, '4': 2, '5': 3, '6': 4, '7': 5, '8': 6, '9': 7, '10': 8, 'J': 9, 'Q': 10, 'K': 11, 'A': 12}
def card_to_int(card):
    """converts each cart to a unique integer

    Args:
        card (_type_): _description_

    Returns:
        _type_: _description_
    """
    return card[1] * 13 + card[0]


class CardGameEnv(gym.Env):
    def __init__(self, seed = None):
        # self.action_space = spaces.Box(low=0, high=1, shape=(8,), dtype=np.float32)  # Binary array to represent discarding individual cards
        # action space is one of 2**8 options, later turned into a list of 1s and 0s
        # 1s representing a card to discard
        # 0 representing a card to keep
        self.action_space = spaces.Tuple([spaces.Discrete(2**8), spaces.Discrete(2)])
        self.observation_space = spaces.Box(low=0, high=51, shape = [8] ,dtype=np.int16)
        self.deck =  [(value, suit) for value in range(0, 13) for suit in range(4)] 
        self.plays = 4
        self.discards = 3
        self.total_reward = 0
        self.seed()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self, *, seed=None, options=None):
        self.hand = self.draw_hand(8)  # Number, Suit
        # print(self.hand)
        self.discards = 3
        self.plays = 4
        self.total_reward = 0
        self.deck =  [(value, suit) for value in range(0, 13) for suit in range(4)] 
        return np.array([card_to_int(card) for card in self.hand]), {}

    def draw_hand(self, num_cards_to_draw):
        """draws num_cards_to_draw number of cards from remaining deck and removes from avail deck

    Args:
        num_cards_to_draw (_type_): number of cards to draw

    Returns:
        _type_: list of newly drawn cards
    """
        cards = []
        for i in range(num_cards_to_draw):
            card = random.choice(self.deck)
            #  Remove the selected card from available deck
            self.deck.remove(card)
            cards.append(card)
        # print("Cards remaining:", len(self.deck))
        return cards
    def action_to_list(self, action):
        bin_num = bin(action)
        list_of_nums = [int(x) for x in str(bin_num)[2:]]
        while len(list_of_nums) < 8:
            list_of_nums.insert(0, 0)
        return list_of_nums
    def step(self, action):
        if action[1] == 0:
            self.discards -= 1
        else: # choosing to play, so collect reward
            reward = self.calculate_hand_value(self.hand)
            self.total_reward += reward
            self.plays -= 1

        # either way, perform discard stuff

        # Grab cards to discard
        action = self.action_to_list(action[0])
        discards = [idx for idx, val in enumerate(action) if val == 1]
        num_discards = len(discards)
        # Ensure that number of discards is at most 5 (prevent invalid action space)
        while len(discards) > 5:
            discards.pop()
            num_discards = len(discards)

        if num_discards > 0:  # choosing to discard something
            new_cards = self.draw_hand(num_discards)
            # Calculate remaining indices ensuring they are within the range of self.hand
            remaining_indices = [i for i in range(8) if i not in discards]
            self.hand = [self.hand[idx] for idx in remaining_indices] + new_cards

        # check for terminal state
        if self.discards == 0 or self.plays == 0:
            done = True
        else:
            done = False

        # If game is over, need to add up total_reward if last action was discard
        if self.discards == 0:
            reward = self.calculate_hand_value(self.hand)
            self.total_reward += reward
        
        return np.array([card_to_int(card) for card in self.hand]), self.total_reward, done, False, {}


    def calculate_hand_value(self, hand):
        # Generate all combinations of 5 cards
        combinations_5 = combinations(hand, 5)
        best_reward = 0
        for combo in combinations_5:
            card_values = sorted([card[0] for card in combo])
            card_suits = [card[1] for card in combo]
            counts = np.bincount(card_values)
            num_unique_cards = np.count_nonzero(counts)
            is_flush = len(set(card_suits)) == 1

            if num_unique_cards == 2:
                if 4 in counts:
                    reward = 420  # Four of a Kind
                else:
                    reward = 160  # Full House
            elif num_unique_cards == 3:
                if 3 in counts:
                    reward = 90  # Three of a Kind
                else:
                    reward = 40  # Two Pair
            elif num_unique_cards == 4:
                reward = 20  # Pair
            elif num_unique_cards == 5:
                is_straight = all(card_values[i] == card_values[i-1] + 1 for i in range(1, len(card_values)))
                if is_straight:
                    if is_flush:
                        reward = 800  # Straight Flush
                    else:
                        reward = 120  # Straight
                elif is_flush:
                    reward = 140  # Flush
                else:
                    reward = 5  # High Card
            
            if reward > best_reward:
                best_reward = reward
        return best_reward


Reward: 280
[0, 0, 0, 0, 1, 1, 0, 0]


(array([10, 44, 37, 40, 46, 12, 29, 27]), {})

In [None]:
# Test the environment
env = CardGameEnv()
obs, _ = env.reset()
total_reward = 0
done = False
while not done:
    assert obs.shape 
    action = env.action_space.sample()  # Random action for testing
    obs, reward, done, _, _ = env.step(action)
    total_reward += reward
print("Reward:", total_reward)

In [9]:
if ray.is_initialized():
  ray.shutdown()

runtime_env = {"py_modules": ["../.."]}
ray.init(runtime_env=runtime_env)

2024-05-11 13:51:39,380	INFO worker.py:1642 -- Started a local Ray instance.
2024-05-11 13:51:39,471	INFO packaging.py:518 -- Creating a file package for local directory '../..'.
2024-05-11 13:51:39,565	INFO packaging.py:346 -- Pushing file package 'gcs://_ray_pkg_8be5750dc399be9c.zip' (9.74MiB) to Ray cluster...
2024-05-11 13:51:39,612	INFO packaging.py:359 -- Successfully pushed file package 'gcs://_ray_pkg_8be5750dc399be9c.zip'.


0,1
Python version:,3.10.12
Ray version:,2.7.0


In [11]:
config = (PPOConfig()
          .environment(CardGameEnv)
          .framework('torch')
          .training(gamma=0.9,)
          .rollouts(num_rollout_workers=3)
)
stop = {"timesteps_total": 100000}

tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),
    run_config=air.RunConfig(stop=stop),
)

res = tuner.fit()


0,1
Current time:,2024-05-11 14:01:27
Running for:,00:05:24.77
Memory:,32.7/125.5 GiB

Trial name,status,loc,iter,total time (s),ts,reward,episode_reward_max,episode_reward_min,episode_len_mean
PPO_CardGameEnv_e07b5_00000,TERMINATED,206.211.132.160:1208549,25,317.005,100000,634.359,3430,20,4.74228


[2m[36m(PPO pid=1208549)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/healthcare/ray_results/PPO_2024-05-11_13-56-02/PPO_CardGameEnv_e07b5_00000_0_2024-05-11_13-56-02/checkpoint_000000)
2024-05-11 14:01:27,849	INFO tune.py:1143 -- Total run time: 325.32 seconds (324.72 seconds for the tuning loop).
