# Poker AI Setup (Deep MCCFR)

## Imports

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random
import pokerenv.obs_indices as indices
from pokerenv.table import Table
from treys import Deck, Evaluator, Card
from pokerenv.common import GameState, PlayerState, PlayerAction, TablePosition, Action, action_list
from pokerenv.player import Player
from pokerenv.utils import pretty_print_hand, approx_gt, approx_lte
import types

## Agent

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import defaultdict
from pokerenv import Poker
import random
from typing import List, Dict, Tuple, Optional

class PokerStateEncoder:
    """Handles state encoding for poker observations"""
    def __init__(self, num_cards: int = 52):
        self.num_cards = num_cards
        
    def encode_cards(self, cards: List[int]) -> np.ndarray:
        """One-hot encode cards"""
        encoding = np.zeros(self.num_cards)
        for card in cards:
            if card is not None:  # Handle hidden cards
                encoding[card] = 1
        return encoding
    
    def encode_state(self, obs: dict) -> np.ndarray:
        """
        Encode full poker state
        Expected obs keys: hole_cards, community_cards, pot, current_bet, 
                         position, betting_history
        """
        # Encode cards
        hole_cards_enc = self.encode_cards(obs['hole_cards'])
        community_cards_enc = self.encode_cards(obs['community_cards'])
        
        # Encode numerical values
        pot_enc = np.array([obs['pot'] / 1000.0])  # Normalize pot
        bet_enc = np.array([obs['current_bet'] / 1000.0])  # Normalize bet
        
        # Encode position
        position_enc = np.zeros(6)  # Assuming 6-max table
        position_enc[obs['position']] = 1
        
        # Encode betting history (last 4 actions)
        history_enc = np.zeros(3 * 4)  # 3 possible actions * 4 last actions
        for i, action in enumerate(obs['betting_history'][-4:]):
            history_enc[i * 3 + action] = 1
            
        # Concatenate all features
        return np.concatenate([
            hole_cards_enc,
            community_cards_enc,
            pot_enc,
            bet_enc,
            position_enc,
            history_enc
        ])

class PopulationManager:
    """Holds a population of ``DeepMCCFR`` agents and their reward histories.

    Attributes are plain containers:

    * ``agents`` - list of agents, indexed by agent id.
    * ``performance_history`` - ``defaultdict(list)`` mapping agent id to the
      rewards recorded through :meth:`update_performance`.
    """

    def __init__(self, num_agents: int, env: Poker):
        """Create ``num_agents`` agents that all share the same ``env``."""
        self.agents = [DeepMCCFR(env) for _ in range(num_agents)]
        self.performance_history = defaultdict(list)

    def select_training_pair(self) -> Tuple[int, int]:
        """Return the indices of two distinct, randomly chosen agents.

        Raises:
            ValueError: From ``random.sample`` when fewer than two agents exist.
        """
        idx1, idx2 = random.sample(range(len(self.agents)), 2)
        return idx1, idx2

    def update_performance(self, agent_idx: int, reward: float):
        """Append ``reward`` to the history of agent ``agent_idx``."""
        self.performance_history[agent_idx].append(reward)

    def merge_experiences(self, frequency: int = 1000):
        """Share advantage memories among the (up to) three best agents.

        The merge only runs when the number of results recorded for agent 0
        is a multiple of ``frequency`` (which includes zero results).
        Agents are ranked by the mean of their last 100 recorded rewards;
        agents with no recorded reward are not ranked.

        Args:
            frequency: Gate on agent 0's history length, see above.
        """
        # ``.get`` instead of ``[0]``: indexing the defaultdict would insert
        # an empty history for agent 0, whose mean is NaN and would corrupt
        # the ranking below.
        recorded_for_first = len(self.performance_history.get(0, ()))
        if recorded_for_first % frequency != 0:
            return

        avg_performances = {
            idx: np.mean(perfs[-100:])
            for idx, perfs in self.performance_history.items()
            if perfs  # an empty history has no meaningful average
        }
        top_agents = sorted(
            avg_performances,
            key=avg_performances.get,
            reverse=True,
        )[:3]

        # _share_experiences already copies in both directions, so each
        # unordered pair is visited exactly once.
        for offset, idx1 in enumerate(top_agents):
            for idx2 in top_agents[offset + 1:]:
                self._share_experiences(idx1, idx2)

    def _share_experiences(self, agent1_idx: int, agent2_idx: int):
        """Exchange sampled advantage memories between two agents.

        For each of the two player seats, both agents' memories are sampled
        first (``sample(1000)``) and only then cross-added, so an agent never
        receives back what it has just been given in the same call.

        NOTE(review): assumes every ``advantage_memories[player]`` object
        offers ``sample(k)`` returning an iterable and ``add(item)``; the
        memory class is not defined in this part of the file.
        """
        agent1 = self.agents[agent1_idx]
        agent2 = self.agents[agent2_idx]

        for player in range(2):
            memories1 = agent1.advantage_memories[player].sample(1000)
            memories2 = agent2.advantage_memories[player].sample(1000)

            for memory in memories1:
                agent2.advantage_memories[player].add(memory)
            for memory in memories2:
                agent1.advantage_memories[player].add(memory)

class DeepMCCFR:
    """Deep MCCFR agent: per-seat advantage networks plus game-tree traversal.

    NOTE(review): the code in this block reads ``self.advantage_memories`` and
    calls ``self.get_strategy`` and ``PokerNetwork``, none of which are
    defined here. They must be provided elsewhere before
    :meth:`traverse_game_tree` can run.
    """

    def __init__(
        self,
        env: Poker,
        hidden_size: int = 256,
        learning_rate: float = 0.001,
        memory_size: int = 1000000,
        batch_size: int = 32,
        update_freq: int = 100
    ):
        """Build the state encoder and one advantage network per seat.

        Args:
            env: Environment to traverse. It is reset once here to obtain a
                sample observation.
            hidden_size: Passed to ``PokerNetwork`` as its hidden width.
            learning_rate, memory_size, batch_size, update_freq: Accepted for
                interface compatibility; not used by the code in this block.
        """
        self.env = env
        self.state_encoder = PokerStateEncoder()

        # The network input width is whatever the encoder produces for a
        # real observation, so it is measured rather than hard-coded.
        sample_obs = env.reset()
        encoded_state = self.state_encoder.encode_state(sample_obs)
        self.input_size = len(encoded_state)

        # One advantage network per seat (two-player game).
        self.advantage_nets = [
            PokerNetwork(self.input_size, hidden_size, env.action_space.n)
            for _ in range(2)
        ]

    def get_state_tensor(self, obs: dict) -> torch.Tensor:
        """Encode ``obs`` and return it as a float tensor with a leading batch axis."""
        encoded_state = self.state_encoder.encode_state(obs)
        return torch.FloatTensor(encoded_state).unsqueeze(0)

    def traverse_game_tree(self, player: int, traverser: int):
        """Walk the game tree from the environment's current state.

        At the traverser's nodes every legal action is explored and the
        resulting per-action values are stored in
        ``self.advantage_memories[player]``. At the other seat's nodes a
        single action is sampled from that seat's strategy (external
        sampling). The environment is restored with ``undo_step`` after every
        explored action, so it is left in the state it was found in.

        Args:
            player: Seat to act at the current node (0 or 1).
            traverser: Seat whose values are being collected.

        Returns:
            The traverser's payoff at a terminal node; the strategy-weighted
            value at a traverser node; the sampled continuation's value at an
            opponent node.

        Raises:
            ValueError: If an opponent node reports no legal actions.
        """
        if self.env.is_terminal():
            return self.env.get_payoff(traverser)

        obs = self.env.get_observation()
        legal_actions = self.env.legal_actions()
        strategy = self.get_strategy(obs, player)

        if player != traverser:
            return self._sample_opponent_branch(
                strategy, legal_actions, player, traverser
            )

        advantages = np.zeros(self.env.action_space.n)
        for action in legal_actions:
            self.env.step(action)
            payoff = self.traverse_game_tree(1 - player, traverser)
            self.env.undo_step()

            advantages[action] = payoff

        # NOTE(review): the stored vector holds raw per-action values (zero
        # for illegal actions), not values relative to the node's expected
        # value - confirm this is what the training code expects.
        self.advantage_memories[player].add(
            (self.state_encoder.encode_state(obs), advantages, legal_actions)
        )

        return advantages.dot(strategy)

    def _sample_opponent_branch(self, strategy, legal_actions, player: int, traverser: int):
        """Sample one legal action for the non-traversing seat and recurse."""
        candidates = list(legal_actions)
        if not candidates:
            raise ValueError("non-terminal state has no legal actions to sample from")

        # Restrict the strategy to legal actions and renormalise; negative
        # entries are treated as zero so they cannot break the sampler.
        weights = np.clip(np.asarray(strategy, dtype=float)[candidates], 0.0, None)
        total = weights.sum()
        if total > 0:
            probabilities = weights / total
        else:
            # No probability mass on any legal action: fall back to uniform.
            probabilities = np.full(len(candidates), 1.0 / len(candidates))

        chosen = candidates[np.random.choice(len(candidates), p=probabilities)]

        self.env.step(chosen)
        value = self.traverse_game_tree(1 - player, traverser)
        self.env.undo_step()
        return value

def train_population(
    num_agents: int,
    env: Poker,
    num_iterations: int,
    merge_freq: int = 1000
):
    """Train a population of agents by repeated two-agent games.

    Each iteration picks two distinct agents, plays one game on ``env`` with
    the first agent in seat 0 and the second in seat 1, records the result
    for both, and runs one training step on each agent's networks.

    Args:
        num_agents: Population size (at least 2, since a pair is sampled).
        env: Shared environment instance.
        num_iterations: Number of games to play.
        merge_freq: Experiences are merged across the top agents after every
            ``merge_freq`` completed games.

    Returns:
        The ``PopulationManager`` holding the trained agents and their
        reward histories.
    """
    pop_manager = PopulationManager(num_agents, env)

    for iteration in range(num_iterations):
        # Select two agents to play against each other
        idx1, idx2 = pop_manager.select_training_pair()
        agent1 = pop_manager.agents[idx1]
        agent2 = pop_manager.agents[idx2]

        # Play one game: agent1 acts for seat 0, agent2 for every other seat.
        env.reset()
        done = False
        while not done:
            current_player = env.current_player()
            current_agent = agent1 if current_player == 0 else agent2

            obs = env.get_observation()
            action = current_agent.act(obs, current_player)
            _, reward, done, _ = env.step(action)

        # NOTE(review): this treats the final ``reward`` as belonging to the
        # seat that made the last move - confirm against the environment.
        seat0_reward = reward if current_player == 0 else -reward
        pop_manager.update_performance(idx1, seat0_reward)
        pop_manager.update_performance(idx2, -seat0_reward)

        # Merge after every ``merge_freq`` completed games (not at iteration
        # 0, when only a single game has been played). ``frequency=1`` makes
        # merge_experiences' own history-length gate always pass, so this
        # check is the only thing deciding when a merge happens.
        if (iteration + 1) % merge_freq == 0:
            pop_manager.merge_experiences(frequency=1)

        # Train both agents for the seat they just played
        agent1.train_advantage_network(0)
        agent1.train_strategy_network(0)
        agent2.train_advantage_network(1)
        agent2.train_strategy_network(1)

    return pop_manager

# Example usage: train a ten-agent population for 100k games, merging
# experiences every 1000 games.
if __name__ == "__main__":
    env = Poker()
    pop_manager = train_population(
        env=env,
        num_agents=10,
        num_iterations=100000,
        merge_freq=1000,
    )

## Play against Agent

In [None]:
from typing import Dict, List, Optional
import numpy as np
from pokerenv import Poker
from multi_agent_mccfr import DeepMCCFR, PopulationManager  # Import from your previous code

class PokerGame:
    """Console interface for playing hands against a trained agent.

    Card ids are mapped to text as ``suit_index * 13 + rank_index`` with
    ranks ordered 2..A and suits ordered spades, clubs, hearts, diamonds.
    """

    _RANKS = ("2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K", "A")
    _SUITS = ("♠", "♣", "♥", "♦")
    # Labels shown for the action ids this interface knows about; any other
    # id is still displayed, under a generic label.
    _HUMAN_ACTION_LABELS = {0: "Fold", 1: "Check/Call", 2: "Bet/Raise"}
    _AGENT_ACTION_VERBS = {0: "folds", 1: "checks/calls", 2: "bets/raises"}

    def __init__(self, agent: DeepMCCFR, env: Poker):
        """Store the opponent ``agent`` and the ``env`` the game is played on."""
        self.agent = agent
        self.env = env
        self.card_symbols = {
            suit_idx * len(self._RANKS) + rank_idx: f"{rank}{suit}"
            for suit_idx, suit in enumerate(self._SUITS)
            for rank_idx, rank in enumerate(self._RANKS)
        }

    def format_cards(self, cards: List[int]) -> str:
        """Return the cards as space-separated symbols, skipping ``None`` entries.

        Raises:
            KeyError: If a card id is not in ``card_symbols``.
        """
        return " ".join(self.card_symbols[c] for c in cards if c is not None)

    def display_game_state(self, obs: Dict):
        """Print pot, current bet, hole cards, community cards and stack.

        The community-card line is omitted while every community card is
        still ``None``.
        """
        print("\n" + "="*50)
        print(f"Pot: ${obs['pot']}")
        print(f"Current bet: ${obs['current_bet']}")
        print(f"Your hole cards: {self.format_cards(obs['hole_cards'])}")

        community_cards = obs['community_cards']
        if any(card is not None for card in community_cards):
            print(f"Community cards: {self.format_cards(community_cards)}")

        print(f"Your stack: ${obs['stack']}")
        print("="*50 + "\n")

    def get_human_action(self) -> int:
        """Prompt on stdin until the user enters one of the legal action ids."""
        legal_actions = self.env.legal_actions()

        while True:
            print("\nAvailable actions:")
            for action in legal_actions:
                label = self._HUMAN_ACTION_LABELS.get(action, f"Action {action}")
                print(f"{action}: {label}")

            try:
                action = int(input("Enter your action (number): "))
            except ValueError:
                print("Please enter a valid number!")
                continue

            if action in legal_actions:
                return action
            print("Invalid action! Please choose from available actions.")

    def play_game(self, human_position: int = 0):
        """Play one game against the agent and return the human's result.

        Args:
            human_position: Seat index the human plays (0 is announced as
                "SB", anything else as "BB").

        Returns:
            The final reward from the human's point of view.
        """
        obs = self.env.reset()
        done = False
        total_reward = 0

        print("\nNew game started!")
        print("You are in position", "SB" if human_position == 0 else "BB")

        while not done:
            current_player = self.env.current_player()

            if current_player == human_position:
                # Only show the observation on the human's own turn: on the
                # agent's turn ``obs`` is what the agent acts on, and
                # printing its hole cards as "Your hole cards" would reveal
                # the opponent's hand.
                self.display_game_state(obs)
                action = self.get_human_action()
            else:
                print("\nAgent is thinking...")
                action = self.agent.act(obs, current_player)
                verb = self._AGENT_ACTION_VERBS.get(action, f"plays action {action}")
                print(f"Agent {verb}")

            obs, reward, done, _ = self.env.step(action)
            if done:
                # NOTE(review): assumes the terminal reward is expressed from
                # seat 0's point of view - confirm against the environment.
                total_reward = reward if human_position == 0 else -reward

        # Game over - display results
        self.display_game_state(obs)
        if total_reward > 0:
            print(f"\nYou won ${abs(total_reward)}!")
        elif total_reward < 0:
            print(f"\nYou lost ${abs(total_reward)}")
        else:
            print("\nIt's a draw!")

        return total_reward

def play_against_agent(agent: DeepMCCFR, env: Poker, num_games: int = 5):
    """Run an interactive session of ``num_games`` games against ``agent``.

    The human's seat alternates between 0 and 1 from game to game, a running
    balance is printed after each game, and the user is asked to press Enter
    before every game except the first.

    Returns:
        The sum of the per-game results returned by ``PokerGame.play_game``.
    """
    session = PokerGame(agent, env)
    running_total = 0
    final_index = num_games - 1

    for index in range(num_games):
        print(f"\nGame {index + 1}/{num_games}")

        # Even games seat the human at 0, odd games at 1.
        running_total += session.play_game(human_position=index % 2)

        print(f"\nCurrent profit/loss: ${running_total}")

        if index != final_index:
            input("\nPress Enter to start next game...")

    print(f"\nFinal profit/loss over {num_games} games: ${running_total}")
    return running_total

# Example usage: train a population, pick its strongest member, then play it.
if __name__ == "__main__":
    # Training comes first (uses train_population from the training code).
    env = Poker()
    pop_manager = train_population(
        env=env,
        num_agents=10,
        num_iterations=100000,
    )

    # The best agent is the one with the highest mean over its most recent
    # 1000 recorded rewards.
    best_agent_idx = max(
        pop_manager.performance_history,
        key=lambda idx: np.mean(pop_manager.performance_history[idx][-1000:]),
    )
    best_agent = pop_manager.agents[best_agent_idx]

    # Interactive session against that agent.
    play_against_agent(best_agent, env, num_games=5)