In [38]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
import numpy as np
from pypokerengine.utils.card_utils import gen_cards, estimate_hole_card_win_rate
from itertools import count
from pypokerengine.api.game import setup_config, start_poker


In [29]:
device = torch.device("cuda")
print(f"Is device avalible: {torch.cuda.is_available()}")
print(f"Number of Devices: {torch.cuda.device_count()}")
print(f"Current Device: {torch.cuda.current_device()}")
print(f"Current Device Name : {torch.cuda.get_device_name(0)}")
!nvidia-smi


Is device avalible: True
Number of Devices: 1
Current Device: 0
Current Device Name : NVIDIA GeForce RTX 3060 Laptop GPU
Wed Mar  5 18:03:55 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 572.60                 Driver Version: 572.60         CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 3060 ...  WDDM  |   00000000:01:00.0 Off |                  N/A |
| N/A   45C    P0             19W /   80W |     103MiB /   6144MiB |      0%      Default |
|                                         |                        |                  N/A |
+------------------

In [30]:
# One recorded step of experience: the state seen, the action taken, the
# state that followed, and the reward received.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)


class ReplayMemory:
    """Bounded FIFO buffer of ``Transition`` records.

    The buffer holds at most ``capacity`` entries; appending to a full
    buffer silently discards the oldest entry.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def push(self, *args):
        """Build a ``Transition`` from ``args`` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn without replacement.

        Raises ``ValueError`` (from ``random.sample``) when ``batch_size``
        exceeds the number of stored transitions.
        """
        return random.sample(self.memory, batch_size)

In [None]:
class PokerBot:
    """Skeleton of the learning agent.

    Every method is still a placeholder that does nothing and returns
    ``None``; the docstrings describe the intended role of each hook.
    """

    def __init__(self):
        # `self` was missing from the signature, which made `PokerBot()`
        # raise TypeError before any method could be used.
        pass

    def act(self, state):
        """Given a state, choose an epsilon-greedy action"""
        pass

    def cache(self, experience):
        """Add the experience to memory"""
        pass

    def recall(self):
        """Sample experiences from memory"""
        pass

    def learn(self):
        """Update online action value (Q) function with a batch of experiences"""
        pass



class PokerDQN(nn.Module):
    """Fully connected Q-network: state vector in, one value per action out.

    Two hidden layers of 128 units with ReLU activations feed a linear
    output layer of width ``n_actions``.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        hidden_width = 128
        self.layer1 = nn.Linear(n_observations, hidden_width)
        self.layer2 = nn.Linear(hidden_width, hidden_width)
        self.layer3 = nn.Linear(hidden_width, n_actions)

    def forward(self, x):
        """Map ``x`` (last dimension ``n_observations``) to per-action values.

        Used both for a single state when picking an action and for a
        whole batch during optimisation.
        """
        hidden = F.relu(self.layer1(x))
        hidden = F.relu(self.layer2(hidden))
        q_values = self.layer3(hidden)
        return q_values

In [32]:
def encode_state(hole_card, round_state, nb_simulation=1000, max_chips=10000):
    """Convert a PyPokerEngine game state into a fixed-length vector for the DQN.

    Parameters
    ----------
    hole_card : list of str
        The acting player's hole cards in PyPokerEngine string form.
    round_state : dict
        The round state dict handed to a player by the engine.
    nb_simulation : int, optional
        Number of Monte Carlo roll-outs used for the win-rate estimate.
    max_chips : int or float, optional
        Divisor used to scale pot and stack sizes.

    Returns
    -------
    numpy.ndarray
        ``float32`` vector of length 7:
        ``[hand_strength, pot_size, stack_size, bb_position,
        last_fold, last_call, last_raise]``.
    """
    # Compute hand strength (Monte Carlo estimation, heads-up)
    hand_strength = estimate_hole_card_win_rate(
        nb_simulation=nb_simulation,
        nb_player=2,
        hole_card=gen_cards(hole_card),
        community_card=gen_cards(round_state["community_card"]),
    )

    # The engine nests the main pot as pot["main"]["amount"]; a flat
    # pot["amount"] layout is still accepted for hand-built states.
    pot = round_state["pot"]
    pot_amount = pot["main"]["amount"] if "main" in pot else pot["amount"]
    pot_size = pot_amount / max_chips
    stack_size = round_state["seats"][round_state["next_player"]]["stack"] / max_chips

    # One-hot of the most recent action on the current street, ordered
    # [Fold, Call, Raise]. Histories are keyed by the street *name*
    # (e.g. "preflop"), and the engine records action names in upper case,
    # so look up by round_state["street"] and compare case-insensitively.
    one_hot_by_action = {
        "fold": [1, 0, 0],
        "call": [0, 1, 0],
        "raise": [0, 0, 1],
    }
    histories = round_state.get("action_histories") or {}
    street_history = histories.get(round_state.get("street"))
    if street_history is None:
        # Legacy layout that stored the history under the literal key "street".
        street_history = histories.get("street")
    last_action = [0, 0, 0]  # blinds / antes / no action yet
    if street_history:  # guards against an empty history list
        last_move = str(street_history[-1]["action"]).lower()
        last_action = one_hot_by_action.get(last_move, [0, 0, 0])

    # Big Blind position (1 if the acting player is not the small blind, else 0)
    bb_position = int(round_state["small_blind_pos"] != round_state["next_player"])

    # Combine all features into a state vector
    state_vector = np.array([
        hand_strength,
        pot_size,
        stack_size,
        bb_position,
        *last_action,
    ], dtype=np.float32)

    return state_vector

# Action vocabulary for the agent. The list length is what the rest of the
# notebook uses as the number of discrete actions, and an action index is a
# position in this list.
actions = ["fold", "call", "raise"]  # Map action indices to poker moves


In [33]:
BATCH_SIZE = 128    # BATCH_SIZE is the number of transitions sampled from the replay buffer
GAMMA = 0.99        # GAMMA is the discount factor applied to future rewards
EPS_START = 0.9     # EPS_START is the starting value of epsilon
EPS_END = 0.05      # EPS_END is the final value of epsilon
EPS_DECAY = 1000    # EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
TAU = 0.005         # TAU is the update rate of the target network
LR = 1e-4           # LR is the learning rate of the ``AdamW`` optimizer


# Get number of actions
n_actions = len(actions)
# Number of state observations. encode_state() returns a 7-element vector:
# hand_strength, pot_size, stack_size, bb_position, plus a 3-way one-hot of
# the last action. (np.size() of the function object itself evaluates to 1,
# which would build networks with a single input feature.)
n_states = 7

# Online network (trained) and target network (slow-moving copy), started
# from identical weights.
policy_net = PokerDQN(n_states, n_actions).to(device)
target_net = PokerDQN(n_states, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


# Global step counter read and advanced by select_action() for epsilon decay.
steps_done = 0


def select_action(state):
    """Pick an action index for ``state`` with an epsilon-greedy rule.

    Epsilon decays exponentially from ``EPS_START`` towards ``EPS_END`` as
    the global ``steps_done`` counter grows; the counter is advanced by one
    on every call.

    Returns a ``(1, 1)`` ``torch.long`` tensor holding the chosen index:
    either a uniformly random index into ``actions`` (exploration) or the
    arg-max of ``policy_net(state)`` along dimension 1 (exploitation).
    """
    global steps_done
    roll = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    # Explore: uniformly random action.
    if roll <= eps_threshold:
        random_index = random.randint(0, len(actions) - 1)
        return torch.tensor([[random_index]], device=device, dtype=torch.long)

    # Exploit: index of the largest predicted value in each row.
    with torch.no_grad():
        q_values = policy_net(state)
        best = q_values.max(1).indices
        return best.view(1, 1)


In [34]:
def optimize_model():
    """Run one gradient step on ``policy_net`` from a replay-memory batch.

    Does nothing (returns ``None``) until ``memory`` holds at least
    ``BATCH_SIZE`` transitions. Otherwise it samples a batch, forms the
    one-step TD target ``reward + GAMMA * max_a Q_target(next_state, a)``
    (with the bootstrap term set to 0 for transitions whose ``next_state``
    is ``None``), and minimises the Huber loss between that target and
    ``Q_policy(state, action)``.

    Transition fields may be tensors (as pushed by the training loop) or
    plain numbers / arrays; everything is moved to ``device`` here so the
    batch lives where the networks do.
    """
    if len(memory) < BATCH_SIZE:
        return

    transitions = memory.sample(BATCH_SIZE)  # Get batch of experiences
    batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

    def _rows(items, dtype, shape):
        # Normalise every item to one row of the given shape on `device`,
        # then stack the rows along dimension 0.
        return torch.cat([
            torch.as_tensor(item, dtype=dtype, device=device).reshape(shape)
            for item in items
        ])

    state_batch = _rows(batch_state, torch.float32, (1, -1))   # (B, n_obs)
    action_batch = _rows(batch_action, torch.long, (1, 1))     # (B, 1)
    reward_batch = _rows(batch_reward, torch.float32, (1,))    # (B,)

    # Compute Q-values from policy network for the actions actually taken
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Terminal transitions carry next_state=None; their future value is 0.
    non_final_mask = torch.tensor(
        [nxt is not None for nxt in batch_next_state],
        dtype=torch.bool,
        device=device,
    )
    next_state_values = torch.zeros(len(transitions), device=device)
    if non_final_mask.any():
        non_final_next_states = _rows(
            [nxt for nxt in batch_next_state if nxt is not None],
            torch.float32,
            (1, -1),
        )
        # Targets come from the target network and must not carry gradients.
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]

    # Compute expected Q-values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute loss and update model
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
num_episodes = 100

for i_episode in range(num_episodes):
    # Initialize the environment and get its state.
    # NOTE(review): `env` is not defined anywhere in this notebook. The loop
    # uses it like a Gymnasium-style environment (step() returning
    # observation, reward, terminated, truncated, info), so reset() is
    # assumed to return (observation, info) -- confirm against the wrapper
    # that ends up driving PyPokerEngine.
    observation, _ = env.reset()
    state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        # End the episode here; without this the next iteration would pass
        # a terminal (None) state to select_action and never leave the loop.
        if done:
            break

NameError: name 'env' is not defined