# 使用機器學習的方法改善策略

# import package

In [1]:
# import packages
## niuniu function
from niuniu_func import *

## caculating
import random
import numpy as np
from collections import Counter

## torch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import torch.nn.functional as F
import multiprocessing as mp

## os
import os

# build niuniu env

In [2]:
# env of niuniu
# set myself as player 0
class NiuNiuEnv:
    # init 
    def __init__(self):
        # generate deck
        self.deck = self.generate_deck()
        # generate player == 4
        self.players = [[] for _ in range(4)]
        self.banker_index = -1
        # banker multiplier
        self.banker_multiplier = 1
        # bet number
        self.bets = [0, 0, 0, 0]
        # state, 0: bank step, 1: bet step, 2: result step
        self.current_phase = 0
        # reset
        self.reset()

    # generate deck
    def generate_deck(self):
        suits = ['heart', 'spade', 'diamond', 'club']
        ranks = ['2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K', 'A']
        return [(suit, rank) for suit in suits for rank in ranks]

    # reset
    def reset(self):
        # regenerate deck & shuffle
        self.deck = self.generate_deck()        
        random.shuffle(self.deck)

        # every player have 4 cards
        self.players = [[self.deck.pop() for _ in range(4)] for _ in range(4)]

        # init bets
        self.bets = [0] * 4
        self.banker_index = -1
        self.banker_multiplier = 1

        # init step
        self.current_phase = 0

        # reload state
        self.state = self.get_state()
        return self.state
    
    # get myself's hand number
    def get_state(self):
        # myself's hand
        state = []
        for card in self.players[0]:
            state.append(get_suit_rank(card))
            state.append(get_card_rank(card))
        # which step
        state.append(self.current_phase)
        # who is banker
        state.append(self.banker_index)
        # banker multiplayer
        state.append(self.banker_multiplier)
        # every player's bet
        state.extend(self.bets)
        return np.array(state, dtype=np.float32)
        
    # step    
    def step(self, action):
        """
        action: [banker_action, bet_action]
        banker_action: 0-4 is baker multi
        bet_action: 1-5 is bet multi
        """
        # unpack action
        banker_action, bet_action = action 
        # banker
        self.banker_bid = banker_action
        # bet
        self.bet_amount = bet_action

        """
        step 1 : decide whether to get banker
            * myself : by ppo agent
            * others : by simulate_ev to decide
        """
        bank_multipliers = [simulate_ev(self.players[i], 100000)[0] for i in range(4)]
        bank_multipliers[0] = self.banker_bid
        # run time : 22s

        """
        step 2 : decide final banker(the max multiplier)
            * if all not want to be banker, random choose one & set multiplier = 1
            * if more than one have same multiplier, random choose one
        """
        max_bet = max(bank_multipliers)
        if max_bet == 1:
            random_banker = random.choice(range(4))
            bank_multipliers[random_banker] = 1
        banker_candidates = [i for i, b in enumerate(bank_multipliers) if b == max_bet]
        self.banker_index = random.choice(banker_candidates)
        self.banker_multiplier = max_bet
        banker_hand = self.players[self.banker_index]

        # whether myself is banker
        is_banker = (self.banker_index == 0)

        # go to next action -- bet
        self.current_phase = 1

        # bet action
        if is_banker:
            """
            step 3 : if myself is banker
                * I don't need to bet
                * others use `calculate_ev_against_banker` to bet, besides
                if banker multiplier over 3, we assume banker have niu
            """
            self.bets[0] = self.bet_amount
            for i in range(1, 4):
                have_niu = self.banker_multiplier >= 3
                self.bets[i] = calculate_ev_against_banker(self.players[i], 100000, have_niu)[1]
        else:
            """
            step 4 : if myself is not banker
                * let ppo decide bet
                * others we don't care
            """
            self.bets[0] = max(1, min(5, action[1]))

        """
        step 5 : add the 5th card to every player
        """
        for i in range(4):
            self.players[i].append(self.deck.pop())

        # go to next action -- result
        self.current_phase = 2

        """
        step 6 : caculate ev of myself
            * I am banker : caculate payout of the sum of me against others(use negative)
            * I am not banker : calculate the payout against the banker
        """
        if is_banker:
            # I am banker
            total_payout = -sum(
                calculate_payout(self.players[i], banker_hand, False) * self.bets[i] * self.banker_multiplier
                for i in range(4) if i != self.banker_index
            )

            """
            step 7 : caculate reward(scaling & punishing)
            """
            # def max payout : 3 player * max bet(5) * bank multi
            max_payout = 3 * 5 * self.banker_multiplier
            # # min max function, let reward in [0, 1]
            # min_payout = -max_payout
            # reward = (total_payout - min_payout) / (max_payout - min_payout)
            reward = (total_payout) / (max_payout)

            # win big, give prize
            if reward > 0.7:
                reward += 0.2
            # lose, punish
            elif reward < 0:
                reward -= 0.2
            elif reward < -0.5:
                reward -= 0.4
            elif reward < -0.8:
                reward -= 0.6
            
            # different bank multiplier have different reward
            if self.banker_multiplier == 4:
                reward -= 0.3
            elif self.banker_multiplier == 3:
                reward -= 0.15
            elif self.banker_multiplier == 2:
                reward += 0.1

            if self.banker_multiplier >= 3 and total_payout < 0:
                reward -= 0.3


        else:
            # I am not banker
            total_payout = calculate_payout(self.players[0], banker_hand, False) * self.bets[0] * self.banker_multiplier
            
            """
            step 7 : caculate reward(scaling & punishing)
            """
            # def max payout : max bet(5) * bank multi
            max_possible_profit = 5 * self.banker_multiplier
            # # min max function, let reward in [0, 1]
            # max_possible_loss = -max_possible_profit
            # reward = (total_payout - max_possible_loss) / (max_possible_profit - max_possible_loss)
            reward = (total_payout) / (max_possible_profit)

            # win big, give prize
            if reward > 0.8:
                reward += 0.3
            elif reward > 0.5:
                reward += 0.2
            elif reward > 0.2:
                reward += 0.1
            # lose, punish
            elif reward < -0.2:
                reward -= 0.1
            elif reward < -0.5:
                reward -= 0.3
            elif reward < -0.8:
                reward -= 0.6

        """
        step 8 : finish one round
        """
        done = True

        """
        step 9 : reset
        """
        self.reset()

        return self.get_state(), reward, done, {}



## simple test
test whether niuniu env is runnable <br>
to avoid getting error of having NaN <br>

In [3]:
# test env of niuniu
def test_env():
    env = NiuNiuEnv()
    state = env.reset()
    print("Initial State:", state)
    # test 10 times
    for i in range(10):
        # random action
        action = [np.random.randint(0, 5), np.random.randint(1, 6)]
        state, reward, done, _ = env.step(action)
        if np.isnan(state).any():
            print(f"NaN detected in state at step {i}!")
        if np.isnan(reward):
            print(f"NaN detected in reward at step {i}!")
        print(f"Step {i} - State: {state}, Reward: {reward}")

test_env()
# windows run time : 3m 55s
# mac run time : 2m 37.1s
"""
result explain :
    * the first 8 numbers represent 4 card in hands, (suit, card)
    * the others represent the other states
"""

Initial State: [ 1. 10.  2.  9.  4.  2.  2.  1.  0. -1.  1.  0.  0.  0.  0.]
Step 0 - State: [ 4.  7.  1.  4.  1.  3.  3. 11.  0. -1.  1.  0.  0.  0.  0.], Reward: -0.7
Step 1 - State: [ 4. 13.  2.  9.  2.  3.  3.  3.  0. -1.  1.  0.  0.  0.  0.], Reward: -0.7
Step 2 - State: [ 1.  3.  2.  4.  2. 10.  4. 12.  0. -1.  1.  0.  0.  0.  0.], Reward: 0.5
Step 3 - State: [ 2. 11.  2.  5.  3. 12.  4.  4.  0. -1.  1.  0.  0.  0.  0.], Reward: 0.5
Step 4 - State: [ 1.  4.  3. 11.  3.  1.  1.  9.  0. -1.  1.  0.  0.  0.  0.], Reward: -0.9
Step 5 - State: [ 3.  6.  3.  5.  3.  7.  3.  1.  0. -1.  1.  0.  0.  0.  0.], Reward: 1.0
Step 6 - State: [ 1.  7.  2.  9.  1.  6.  3.  4.  0. -1.  1.  0.  0.  0.  0.], Reward: 2.3
Step 7 - State: [ 4.  5.  3.  3.  3. 13.  3.  2.  0. -1.  1.  0.  0.  0.  0.], Reward: 1.5
Step 8 - State: [ 1.  3.  3.  9.  4.  2.  4. 11.  0. -1.  1.  0.  0.  0.  0.], Reward: 0.8
Step 9 - State: [ 2.  1.  1.  2.  3.  5.  2. 13.  0. -1.  1.  0.  0.  0.  0.], Reward: -1.3


'\nresult explain :\n    * the first 8 numbers represent 4 card in hands, (suit, card)\n    * the others represent the other states\n'

# build PPO

In [4]:
# PPO value network (V(s))
class ValueNetwork(nn.Module):
    def __init__(self, input_dim):
        super(ValueNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            # avoid pop NA in normalization
            nn.LayerNorm(128, eps=1e-5),
            nn.LeakyReLU(),
            nn.Linear(128, 128),
            nn.LayerNorm(128, eps=1e-5),
            nn.LeakyReLU(),
            # output V(s)
            nn.Linear(128, 1)
        )

    def forward(self, x):
        # let output become (batch, )
        return self.fc(x).squeeze(-1)

In [5]:
# PPO policy network
class PolicyNetwork(nn.Module):
    def __init__(self, input_dim, output_dim1, output_dim2):
        super(PolicyNetwork, self).__init__()
        self.shared_fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )
        # get banker
        self.banker_fc = nn.Linear(128, output_dim1)
        # bet
        self.bet_fc = nn.Linear(128, output_dim2)

    def forward(self, x):
        x = self.shared_fc(x)

        banker_logits = self.banker_fc(x)
        bet_logits = self.bet_fc(x)

        banker_probs = F.softmax(banker_logits, dim=-1)
        bet_probs = F.softmax(bet_logits, dim=-1)

        return banker_probs, bet_probs

    def select_action(self, state):
        # turns to batch
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        banker_probs, bet_probs = self.forward(state)

        banker_dist = Categorical(banker_probs)
        bet_dist = Categorical(bet_probs)

        banker_action = banker_dist.sample().item()
        bet_action = bet_dist.sample().item()

        banker_log_prob = banker_dist.log_prob(torch.tensor(banker_action))
        bet_log_prob = bet_dist.log_prob(torch.tensor(bet_action))

        return (banker_action, bet_action), banker_log_prob, bet_log_prob


In [6]:
# PPO Agent
class PPOAgent:
    def __init__(self, input_dim, output_dim1, output_dim2, lr=3e-4, gamma=0.99, eps_clip=0.2, K_epochs=10, model_path="niu_ppo_model", num_envs=8):
        self.device = torch.device("cpu")
        self.num_envs = num_envs
        
        self.policy = PolicyNetwork(input_dim, output_dim1, output_dim2).to(self.device)
        self.value = ValueNetwork(input_dim).to(self.device)
        self.optimizer_policy = optim.Adam(self.policy.parameters(), lr=lr)
        self.optimizer_value = optim.Adam(self.value.parameters(), lr=lr)

        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs
        self.model_path = model_path
        # load saved model
        self.load_model()


    def compute_returns(self, rewards, dones):
        returns = []
        # init
        R = torch.zeros(1, dtype=torch.float32).to(self.device)
        # ensure rewards & dones is shape (T, 1)
        rewards = torch.tensor(rewards, dtype=torch.float32).view(-1, 1)
        dones = torch.tensor(dones, dtype=torch.float32).view(-1, 1)

        for t in reversed(range(len(rewards))):
            R = rewards[t] + self.gamma * R * (1 - dones[t])
            returns.insert(0, R)

        return torch.cat(returns).detach()


    def update(self, states, actions, log_probs, rewards, dones):
        returns = self.compute_returns(rewards, dones)

        states = torch.tensor(states, dtype=torch.float32).to(self.device)
        # ensure dimention
        actions = torch.tensor(actions, dtype=torch.long).view(-1, 2).to(self.device)
        # turns 1D
        old_log_probs = torch.tensor(log_probs, dtype=torch.float32).view(-1).to(self.device)

        for _ in range(self.K_epochs):
            banker_probs, bet_probs = self.policy(states)

            banker_probs = torch.nan_to_num(banker_probs, nan=0.0)
            bet_probs = torch.nan_to_num(bet_probs, nan=0.0)

            banker_dist = Categorical(banker_probs)
            bet_dist = Categorical(bet_probs)

            new_banker_log_prob = banker_dist.log_prob(actions[:, 0])
            new_bet_log_prob = bet_dist.log_prob(actions[:, 1] - 1)
            new_log_probs = new_banker_log_prob + new_bet_log_prob
            new_log_probs = torch.nan_to_num(new_banker_log_prob, nan=0.0) + torch.nan_to_num(new_bet_log_prob, nan=0.0)
    
            value_estimates = self.value(states).view(-1)
            value_estimates = torch.nan_to_num(value_estimates, nan=0.0)

            advantages = returns - value_estimates.detach()

            ratio = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages

            policy_loss = -torch.min(surr1, surr2).mean()
            value_loss = F.mse_loss(value_estimates, returns)

            self.optimizer_policy.zero_grad()
            policy_loss.backward()
            self.optimizer_policy.step()

            self.optimizer_value.zero_grad()
            value_loss.backward()
            self.optimizer_value.step()

    def select_action(self, state):
        # state = torch.FloatTensor(state).unsqueeze(0)
        # state = torch.nan_to_num(state, nan=0.0, posinf=1.0, neginf=-1.0)
        state = torch.FloatTensor(state).to(self.device)
        state = torch.nan_to_num(state, nan=0.0)

        banker_probs, bet_probs = self.policy(state)
        banker_probs = torch.nan_to_num(banker_probs, nan=0.2)
        bet_probs = torch.nan_to_num(bet_probs, nan=0.2)

        banker_dist = Categorical(banker_probs)
        bet_dist = Categorical(bet_probs)

        banker_action = banker_dist.sample().cpu().numpy()
        bet_action = (bet_dist.sample() + 1).cpu().numpy()

        banker_log_prob = banker_dist.log_prob(torch.tensor(banker_action, device=self.device))
        bet_log_prob = bet_dist.log_prob(torch.tensor(bet_action - 1, device=self.device))

        return np.array([banker_action, bet_action]), banker_log_prob.detach().cpu().numpy(), bet_log_prob.detach().cpu().numpy()
    
    def save_model(self):
        os.makedirs(os.path.dirname(self.model_path), exist_ok=True)
        torch.save(self.policy.state_dict(), f"{self.model_path}_policy.pth")
        torch.save(self.value.state_dict(), f"{self.model_path}_value.pth")
        print("model saved")

    def load_model(self):
        policy_path = f"{self.model_path}_policy.pth"
        value_path = f"{self.model_path}_value.pth"

        if os.path.exists(policy_path) and os.path.exists(value_path):
            self.policy.load_state_dict(torch.load(policy_path))
            self.value.load_state_dict(torch.load(value_path))
            print("load saved model")
        else:
            print("model not found, start training from begining")




# train

In [None]:
# train parameter
num_episodes = 1200
batch_size = 2048
gamma = 0.99
clip_epsilon = 0.2
lr=1e-4
update_epochs = 10
# save interval
save_interval = 5

# save path
save_model_path = "D:/python/poker_gto/ppo_models/ppo_model_v2"
num_envs = 12

In [None]:
# set niuniu env
env = NiuNiuEnv()
state_dim = len(env.get_state())
# banker bet (0~4)
banker_action_dim = 5
# player bet (1~5)
bet_action_dim = 5

# ppo agent
ppo_agent = PPOAgent(state_dim, banker_action_dim, bet_action_dim, model_path=save_model_path, num_envs=num_envs)

model not found, start training from begining


In [None]:
# train
for episode in range(num_episodes):
    state = env.reset()
    done = False
    episode_reward = 0
    states, actions, log_probs, rewards, dones = [], [], [], [], []

    while not done:
        action, banker_log_prob, bet_log_prob = ppo_agent.select_action(state)

        # ensure action
        banker_action, bet_action = action
        # ensure env
        next_state, reward, done, _ = env.step((banker_action, bet_action))

        # record data
        states.append(state)
        actions.append([banker_action, bet_action])
        log_probs.append(banker_log_prob + bet_log_prob)
        rewards.append(reward)
        dones.append(done)

        state = next_state
        episode_reward += reward

    # update PPO
    ppo_agent.update(states, actions, log_probs, rewards, dones)

    # show result
    print(f"Episode {episode + 1}: Total Reward = {episode_reward}")
    
    # save model every interval
    if (episode + 1) % save_interval == 0:
        ppo_agent.save_model()

# 1200 episode / 403m 45.3s


Episode 1: Total Reward = 2.3
Episode 2: Total Reward = 2.3
Episode 3: Total Reward = 1.3
Episode 4: Total Reward = 2.3
Episode 5: Total Reward = 3.3
model saved
Episode 6: Total Reward = 1.3
Episode 7: Total Reward = 2.3
Episode 8: Total Reward = -1.1
Episode 9: Total Reward = -1.1
Episode 10: Total Reward = -1.1
model saved
Episode 11: Total Reward = 1.6666666666666665
Episode 12: Total Reward = -1.7000000000000002
Episode 13: Total Reward = -1.9000000000000001
Episode 14: Total Reward = 1.0
Episode 15: Total Reward = 0.05000000000000002
model saved
Episode 16: Total Reward = 2.3
Episode 17: Total Reward = 3.3
Episode 18: Total Reward = -0.7166666666666666
Episode 19: Total Reward = -1.1
Episode 20: Total Reward = -1.1
model saved
Episode 21: Total Reward = 1.2666666666666666
Episode 22: Total Reward = -1.1
Episode 23: Total Reward = 0.5
Episode 24: Total Reward = 1.3
Episode 25: Total Reward = -0.7166666666666666
model saved
Episode 26: Total Reward = 1.3
Episode 27: Total Reward = 

In [7]:
# def load model
def load_model(agent, policy_model_path, value_model_path):
    print(f"using model: policy -> {policy_model_path}, value -> {value_model_path}")
    agent.policy.load_state_dict(torch.load(policy_model_path))
    agent.value.load_state_dict(torch.load(value_model_path))
    print("finish model loading")

In [8]:
# def test function
# use trained model to test
# let user input 4 card in hand, and model will output banker bet and player bet
def test_trained_model(env, agent, policy_model, value_model):
    print("start testing, enter 'exit' can leave testing mode")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    agent.policy.to(device)
    load_model(agent, policy_model, value_model)
    while True:
        try:
            # get init state
            state = env.reset()

            # let user input 4 cards in hand
            print("\n please enter 4 cards in hand(type : heart J daimond 10 club K spade A), or enter 'exit' to leave :")
            input_cards = input().strip()

            # eixt & wrong input
            if input_cards.lower() == 'exit':
                print("finish testing !")
                break
            input_cards = input_cards.split()
            if len(input_cards) != 8:
                print("wrong input ! please enter the suit & value of 4 cards (total 8 str)")
                continue

            # parse the input hand
            player_hand = [(input_cards[i], input_cards[i + 1]) for i in range(0, 8, 2)]
            print(f"your hand : {player_hand}")

            # update state, make sure the hand information is correct
            for i in range(4):
                state[i * 2] = get_suit_rank(player_hand[i])
                state[i * 2 + 1] = get_card_rank(player_hand[i])

            # trun into pytorch and move to GPU
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)

            # predict banker multiplier by model
            with torch.no_grad():
                banker_dist, bet_dist = agent.policy(state_tensor)
                # print all banker multiplier's probability
                print(f"banker_dist: {banker_dist}")
                # print all bet multiplier's probability
                print(f"bet_dist: {bet_dist}")
                
                # use max probability to be our decision
                if hasattr(banker_dist, 'probs'):
                    banker_action = torch.argmax(banker_dist.probs).item()
                else:
                    banker_action = torch.argmax(banker_dist).item()
                    
                if hasattr(bet_dist, 'probs'):
                    bet_action = torch.argmax(bet_dist.probs).item()
                else:
                    bet_action = torch.argmax(bet_dist).item()

            print(f"the banker multiplier predict by model : {banker_action}")

            # enter whether get banker
            is_banker = input("get banker (y/n): ").strip().lower()
            if is_banker == 'y':
                print("you are banker, no need to bet")
            else:
                print("you are not banker")
                banker_multiplier = float(input("input the banker multiplier : ").strip())
                print(f"banker multiplier: {banker_multiplier}")

                # reload the state of banker multiplier
                state[-4] = banker_multiplier

                # transfer pytorch into GPU
                state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)

                # bet multiplier predicted by model
                with torch.no_grad():
                    _, bet_dist = agent.policy(state_tensor)
                    # print all bet multiplier's probability
                    print(f"🔍 bet_dist: {bet_dist}")
                    if hasattr(bet_dist, 'probs'):
                        bet_action = torch.argmax(bet_dist.probs).item()
                    else:
                        bet_action = torch.argmax(bet_dist).item()

                print(f"the bet multiplier predict by model: {bet_action}")

        except Exception as e:
            print(f"ERROR : {e}")


In [None]:
# practical operate
use_policy_model = "D:\python\poker_gto\ppo_models\ppo_model_v2_policy.pth"
use_value_model = "D:\python\poker_gto\ppo_models\ppo_model_v2_value.pth"

test_trained_model(env, ppo_agent, use_policy_model, use_value_model)

🔍 測試模式啟動！輸入 `exit` 可離開測試模式。
🔍 正在使用的模型: policy -> D:\python\poker_gto\ppo_models\ppo_model_v2_policy.pth, value -> D:\python\poker_gto\ppo_models\ppo_model_v2_value.pth
✅ 模型載入完成！

請輸入 4 張手牌（格式：heart J diamond 10 club J spade A），或輸入 `exit` 離開:


🎴 你的手牌: [('heart', 'J'), ('diamond', '10'), ('club', 'J'), ('spade', 'A')]
🔍 banker_dist: tensor([[1.0000e+00, 5.3097e-09, 7.0077e-09, 7.7462e-08, 5.9058e-08]])
🔍 bet_dist: tensor([[3.2299e-07, 1.0000e+00, 2.4663e-10, 4.5769e-09, 2.0982e-08]])
🤖 模型預測的搶莊倍率: 0
沒有搶到莊家
🤖 莊家的倍率: 4.0
🔍 bet_dist: tensor([[1.2889e-07, 1.0000e+00, 9.3902e-11, 1.8543e-09, 9.2701e-09]])
🤖 模型建議的下注倍率: 1

請輸入 4 張手牌（格式：heart J diamond 10 club J spade A），或輸入 `exit` 離開:
🎴 你的手牌: [('heart', 'J'), ('diamond', '10'), ('club', 'J'), ('spade', '8')]
🔍 banker_dist: tensor([[1.0000e+00, 1.1953e-09, 9.0293e-10, 1.5182e-08, 1.4504e-08]])
🔍 bet_dist: tensor([[1.8523e-09, 1.0000e+00, 1.3970e-12, 8.2057e-11, 3.9673e-10]])
🤖 模型預測的搶莊倍率: 0
沒有搶到莊家
🤖 莊家的倍率: 2.0
🔍 bet_dist: tensor([[1.1957e-09, 1.0000e+00, 9.2904e-13, 5.7513e-11, 2.8250e-10]])
🤖 模型建議的下注倍率: 1

請輸入 4 張手牌（格式：heart J diamond 10 club J spade A），或輸入 `exit` 離開:
👋 測試結束！


# find banker distribution

In [None]:
# def random hand
def generate_random_hand():
    suits = ["heart", "diamond", "club", "spade"]
    ranks = ["2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K", "A"]
    hand = np.random.choice([f"{suit} {rank}" for suit in suits for rank in ranks], 4, replace=False)
    hand = [tuple(card.split()) for card in hand]
    return hand

# def banker distribution
def test_banker_distribution(env, agent, num_tests=100):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    agent.policy.to(device)
    banker_choices = []
    
    for _ in range(num_tests):
        state = env.reset()
        player_hand = generate_random_hand()
        
        for i in range(4):
            state[i * 2] = get_suit_rank(player_hand[i])
            state[i * 2 + 1] = get_card_rank(player_hand[i])

        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        
        with torch.no_grad():
            banker_dist, _ = agent.policy(state_tensor)
            banker_action = torch.argmax(banker_dist).item()
            banker_choices.append(banker_action)

    # caculate distribution
    counter = Counter(banker_choices)
    total = sum(counter.values())
    for action, count in sorted(counter.items()):
        print(f"倍率 {action}: {count} 次 ({count / total:.2%})")
    
    return counter


In [124]:
# test banker distribution for random hand
test_banker_distribution(env, ppo_agent, num_tests=500)

倍率 0: 500 次 (100.00%)


Counter({0: 500})

In [37]:
test_trained_model(env, ppo_agent)

🔍 測試模式啟動！輸入 `exit` 可離開測試模式。

請輸入 4 張手牌（格式：heart J diamond 10 club J spade A），或輸入 `exit` 離開:
🎴 你的手牌: [('heart', 'J'), ('diamond', '10'), ('club', 'J'), ('spade', 'A')]
📊 banker_dist 統計 (測試 100 次): {3: 100}

請輸入 4 張手牌（格式：heart J diamond 10 club J spade A），或輸入 `exit` 離開:
👋 測試結束！


# to do list
* **(finish)** 訓練時應該包含花色 : 可能將花色也轉為數值，變成一個 2 維度的 state  <br>
* **(finish)** 訓練後應該可以保存模型，並且疊家每次訓練的成果上去 <br>
* 新增一個輸入 : 假設我不是莊家時，現在的倍率是幾倍，這會影響到我後續的下注策略 <br>
* <b>(maybe finish)</b>輸入我的手牌之類的資訊後應該要可以當作回測，紀錄到模型訓練當中，並加以改進 <br>

* 完成後可能可以架設簡單的 app 或 api  <br>
* 完成後可以接著改做 德州撲克的訓練 <br>

In [None]:
def backtest_with_real_hands(env, agent):
    """
    讓使用者輸入實際手牌，讓 AI 提供決策建議，並將結果回報給模型，增強學習
    """
    print("\U0001F4CA 進入回測模式！輸入 `exit` 可離開回測模式。")
    
    while True:
        try:
            state = env.reset()

            print("\n請輸入 4 張手牌（格式：heart J diamond 10 club J spade A），或輸入 `exit` 離開:")
            input_cards = input().strip()

            if input_cards.lower() == 'exit':
                print("\U0001F44B 回測結束！")
                break

            input_cards = input_cards.split()

            if len(input_cards) != 8:
                print("❌ 錯誤！請輸入 4 張手牌的花色與數值（共 8 個字串）。")
                continue

            player_hand = [(input_cards[i], input_cards[i + 1]) for i in range(0, 8, 2)]
            print(f"\U0001F3B4 你的手牌: {player_hand}")

            for i in range(4):
                state[i] = card_value(player_hand[i])

            state_tensor = torch.tensor(state, dtype=torch.float32)

            with torch.no_grad():
                banker_dist, bet_dist = agent.policy(state_tensor)
                banker_action = torch.argmax(banker_dist.probs).item()
                bet_action = torch.argmax(bet_dist.probs).item()

            print(f"\U0001F916 AI 建議的搶莊倍率: {banker_action}")

            is_banker = input("✅ 是否搶到莊？ (y/n): ").strip().lower()
            if is_banker == 'y':
                print("\U0001F389 你是莊家！不需要下注")
                bet_action = 0
            else:
                print(f"\U0001F916 AI 建議的下注倍率: {bet_action}")

            reward = float(input("\U0001F4B0 請輸入這局的最終收益（負值代表虧損）: ").strip())

            states = [state]
            actions = [[banker_action, bet_action]]
            rewards = [reward]
            dones = [True]

            agent.update(states, actions, rewards, dones)
            print("\U0001F4C8 AI 已學習這局的結果！")

        except Exception as e:
            print(f"❌ 發生錯誤: {e}")


# testing reward settings

In [11]:
import itertools
import numpy as np
import torch
from collections import Counter
from niuniu_func import get_card_rank, get_suit_rank, calculate_payout

# === 1. 自動產生 reward config 組合 ===
scale_modes = ["normalize", "raw"]
punish_negatives = [0.1, 0.2, 0.3]
bonus_highs = [0.1, 0.2, 0.3]
penalty_big_losses = [0.1, 0.2, 0.3]
penalty_high_bid_losses = [0.1, 0.2, 0.3]

reward_configs = []
for scale, punish, bonus, big_loss, high_bid_loss in itertools.product(
        scale_modes, punish_negatives, bonus_highs, penalty_big_losses, penalty_high_bid_losses):
    config = {
        "scale_mode": scale,
        "punish_negative": punish,
        "bonus_high": bonus,
        "penalty_big_loss": big_loss,
        "penalty_high_bid_loss": high_bid_loss
    }
    reward_configs.append(config)

print(f"✅ 共產生 {len(reward_configs)} 組 reward config")

# === 2. 測試 banker 分佈 ===
def generate_random_hand():
    suits = ["heart", "diamond", "club", "spade"]
    ranks = ["2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K", "A"]
    hand = np.random.choice([f"{suit} {rank}" for suit in suits for rank in ranks], 4, replace=False)
    return [tuple(card.split()) for card in hand]

def test_banker_distribution(env, agent, num_tests=100):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    agent.policy.to(device)
    banker_choices = []

    for _ in range(num_tests):
        state = env.reset()
        player_hand = generate_random_hand()

        for i in range(4):
            state[i * 2] = get_suit_rank(player_hand[i])
            state[i * 2 + 1] = get_card_rank(player_hand[i])

        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            banker_dist, _ = agent.policy(state_tensor)
            banker_action = torch.argmax(banker_dist).item()
            banker_choices.append(banker_action)

    counter = Counter(banker_choices)
    return counter

# === 3. 注入 reward 設定進 Env ===
def apply_reward_config_to_env_class(reward_config):
    class CustomNiuNiuEnv(NiuNiuEnv):
        def step(self, action):
            state, _, done, _ = super().step(action)

            is_banker = self.banker_index == 0
            if is_banker:
                total_payout = -sum(
                    calculate_payout(self.players[i], self.players[0], False) * self.bets[i] * self.banker_multiplier
                    for i in range(4) if i != self.banker_index
                )
                max_payout = 3 * 5 * self.banker_multiplier
                reward = total_payout / max_payout if reward_config["scale_mode"] == "raw" else (total_payout + max_payout) / (2 * max_payout)
                if reward < 0:
                    reward -= reward_config["punish_negative"]
                if reward > 0.7:
                    reward += reward_config["bonus_high"]
                if self.banker_multiplier >= 3 and total_payout < 0:
                    reward -= reward_config["penalty_high_bid_loss"]
            else:
                total_payout = calculate_payout(self.players[0], self.players[self.banker_index], False) * self.bets[0] * self.banker_multiplier
                max_possible = 5 * self.banker_multiplier
                reward = total_payout / max_possible if reward_config["scale_mode"] == "raw" else (total_payout + max_possible) / (2 * max_possible)
                if reward > 0.7:
                    reward += reward_config["bonus_high"]
                if reward < 0:
                    reward -= reward_config["punish_negative"]

            return self.get_state(), reward, True, {}

    return CustomNiuNiuEnv

# === 4. 執行訓練與測試 ===
def evaluate_reward_function(config_idx, config, episodes=60):
    EnvClass = apply_reward_config_to_env_class(config)
    env = EnvClass()
    state_dim = len(env.get_state())
    agent = PPOAgent(state_dim, 5, 5, model_path=f"grid_model_{config_idx}", num_envs=1)

    for ep in range(episodes):
        state = env.reset()
        done = False
        while not done:
            action, _, _ = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.update([state], [action], [0.0], [reward], [done])
            state = next_state

    counter = test_banker_distribution(env, agent)
    top_ratio = max(counter.values()) / sum(counter.values())
    is_bad = top_ratio > 0.8

    print(f"\n=== Config {config_idx} 完成 ===")
    print(f"是否偏斷 (>80%): {'❌' if is_bad else '✅'}")
    print(f"分佈: {counter}")
    print(f"Config: {config}")

    return {"index": config_idx, "is_bad": is_bad, "counter": counter, "config": config}

# === 5. 執行所有 config ===
results = [evaluate_reward_function(i, config) for i, config in enumerate(reward_configs)]

# === 6. 輸出結果 ===
print("\n===== 總結結果 =====")
for result in results:
    print(f"\n--- Config {result['index']} ---")
    print(f"是否偏斷 (>80%): {'❌' if result['is_bad'] else '✅'}")
    print(f"分佈: {result['counter']}")
    print(f"Config: {result['config']}")


✅ 共產生 162 組 reward config
model not found, start training from begining


KeyboardInterrupt: 

上方程式碼預計: 
3 times/ 1 minutes
60 times one config
with 162 config
3240 times --> 1080 minute

# parellel computing 