# Install and import all the packages we will be using

In [None]:
!pip install rlcard
!pip install torch
!pip install numpy

In [None]:
# Clone the project repository into the current working directory.
!git clone https://github.com/yunjiezhong/ST449_Group_N.git

In [None]:
# Work from the repository root so the relative paths used below
# (examples/, experiments/, models/, style_config.json) resolve.
# NOTE(review): the absolute /content prefix assumes a Colab-style layout — adjust when running elsewhere.
%cd /content/ST449_Group_N

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import rlcard
from rlcard.agents import RandomAgent
from rlcard.agents import DQNAgent
from rlcard.agents import nfsp_agent
import torch
import torch.nn as nn
import sys
sys.path.insert(0, '/content/ST449_Group_N')
from my_agent_utils import LoadedAgent
from sklearn.metrics import confusion_matrix, accuracy_score, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import json

# Train the model

## Train baseline model

This part of the code builds on the existing `RLCard` package and uses its "limit-holdem" environment. The baseline models are trained with RLCard's example training script `run_rl.py`, using the NFSP and DQN agents provided in the examples. We implemented our action-bias adjustment on top of `dqn_agent.py` and `nfsp_agent.py`, and created a new `StyledNFSPAgent` class in `styled_nfsp_agent.py`. `run_rl_styled.py`, which is based on `run_rl.py`, is the script used to train the styled NFSP agents. Specific training hyperparameters can be adjusted in the initialization of the `NFSPAgent` class in `styled_nfsp_agent.py`, and the training parameters required by `run_rl_styled.py` can be found at the bottom of the file.

In [None]:
# First train the DQN baseline: 100000 episodes on limit-holdem,
# with logs and the resulting model written under experiments/dqn_baseline/.
!python examples/run_rl.py --env limit-holdem --algorithm dqn --num_episodes 100000 --log_dir experiments/dqn_baseline/

In [None]:
# Train the NFSP baseline with the same environment and episode budget,
# writing its output under experiments/nfsp_baseline/.
!python examples/run_rl.py --env limit-holdem --algorithm nfsp --num_episodes 100000 --log_dir experiments/nfsp_baseline/

In [None]:
# Copy the trained baseline checkpoints into models/ under descriptive names.
# The later cells load models/dqn_baseline.pth and models/nfsp_baseline.pth.
!mkdir -p models
!cp experiments/dqn_baseline/model.pth models/dqn_baseline.pth
!cp experiments/nfsp_baseline/model.pth models/nfsp_baseline.pth

## Train style agents

Check the agents' action-bias configurations.

In [None]:
# Read the style configuration (UTF-8 JSON) and echo it for inspection.
with open('style_config.json', 'r', encoding='utf-8') as config_file:
    style_config = json.loads(config_file.read())
print(style_config)

Train all style agents.

In [None]:
import os
import subprocess
import sys

# One training run per style declared under the "styles" key of the configuration.
styles = list(style_config['styles'].keys())

for style in styles:
    log_dir = f"experiments/{style}/"
    os.makedirs(log_dir, exist_ok=True)
    cmd = [
        # Launch with the kernel's own interpreter so the subprocess sees the
        # same installed packages as this notebook.
        sys.executable, "run_rl_styled.py",
        "--env", "limit-holdem",
        "--style", style,
        "--log_dir", log_dir,
        "--num_episodes", "60000",
        "--style_config", "style_config.json",
        # Each style agent is trained against the saved NFSP baseline checkpoint.
        "--opponent", "checkpoint",
        "--opponent_path", "models/nfsp_baseline.pth"
    ]
    print(f"Training {style} ...")
    # check=True raises CalledProcessError on a non-zero exit status, so a
    # failed run is never reported as completed.
    subprocess.run(cmd, check=True)
    print(f"{style} training completed, model saved in {log_dir} folder.")

In [None]:
import glob
import os
import shutil

# Copy each style agent's trained checkpoint to models/<agent>.pth.
os.makedirs("models", exist_ok=True)
for agent_name in ["agent0", "agent1", "agent2", "agent3", "agent4", "agent5"]:
    pattern = f"experiments/{agent_name}/NFSP_{agent_name}_*.pth"
    matches = glob.glob(pattern)
    if not matches:
        # Fail here instead of letting a later model-loading cell hit a missing file.
        raise FileNotFoundError(f"No checkpoint matches {pattern}")
    # A shell `cp <glob> dest` fails when the glob expands to several files.
    # NOTE(review): picking the most recently modified match assumes it is the
    # final checkpoint — confirm against how run_rl_styled.py names its outputs.
    newest = max(matches, key=os.path.getmtime)
    shutil.copyfile(newest, f"models/{agent_name}.pth")

# Implement the UCB Algorithm

In [None]:
class UCBbandit:
    """UCB1-style multi-armed bandit over ``k`` arms.

    Attributes:
        k: number of arms.
        c: exploration parameter multiplying the confidence bonus.
        t: number of times ``selection`` has been called.
        index: arm most recently returned by ``selection`` (``None`` before
            the first call); ``update`` credits the reward to this arm.
        n_i: per-arm count of rewards received.
        u: per-arm running mean reward.
    """

    def __init__(self, k: int, c: float = np.sqrt(2)):
        """Create a bandit with ``k`` arms and exploration parameter ``c``.

        Raises:
            ValueError: if ``k`` is not positive.
        """
        if k <= 0:
            raise ValueError("k must be a positive number of arms")
        self.k = k
        self.c = c
        self.t = 0
        self.index = None
        self.n_i = np.zeros(self.k, dtype=int)
        self.u = np.zeros(self.k)

    def selection(self):
        """Choose the next arm, remember it in ``self.index`` and return it."""
        self.t += 1
        # Try every arm once first (lowest index first); this also keeps the
        # division by n_i below free of zeros.
        untried = np.flatnonzero(self.n_i == 0)
        if untried.size:
            self.index = int(untried[0])
            return self.index

        ucb = self.u + self.c * np.sqrt(np.log(self.t) / self.n_i)
        self.index = int(np.argmax(ucb))
        return self.index

    def update(self, reward):
        """Fold ``reward`` into the running mean of the arm in ``self.index``.

        Raises:
            RuntimeError: if no arm has been selected yet.
        """
        i = self.index
        if i is None:
            # numpy treats a None index as np.newaxis, so without this guard
            # every arm's count and mean would be modified silently.
            raise RuntimeError("update() called before any arm was selected")
        self.n_i[i] += 1
        # Incremental mean: u <- u + (reward - u) / n
        self.u[i] += (reward - self.u[i]) / self.n_i[i]

## Define the Markov Switching Process

In [None]:
class opponentswitching:
    """Opponent type that evolves as a Markov chain.

    ``M`` is the transition matrix: row ``i`` holds the probabilities of
    moving from type ``i`` to each type on the next step. ``start`` is the
    initial type and ``seed`` initialises the random generator.
    """

    def __init__(self, M, start=0, seed=1):
        self.random = np.random.default_rng(seed)
        self.type = start
        self.M = np.array(M, dtype=float)

    def switching(self):
        """Draw the next opponent type from the current type's row and return it."""
        transition_row = self.M[self.type]
        n_types = len(self.M)
        drawn = self.random.choice(n_types, p=transition_row)
        self.type = int(drawn)
        return self.type

## Define the Opponent Identification Algorithm

In [None]:
#the agent need to the style of current opponent
#we use ema
class spotopponent:
    def __init__(self, rate = 0.05):
        self.rate = rate
        self.aggression = 0.5
        self.prediction = 1

    def score(self, action):
        s = 0.5
        if action == 1:
            s = 1
        elif action == 2:
            s = 0
        elif action == 0 or action == 3:
            s = 0.5
    
        self.aggression = (1 - self.rate) * self.aggression + self.rate * s
  #get the prediected type
    def predict(self):
        if self.prediction== 1: 
            if self.aggression > 0.84: 
                self.prediction = 2 
            elif self.aggression < 0.54: 
                self.prediction = 0 

        elif self.prediction == 2: 
            if self.aggression < 0.79: 
                 self.prediction = 1

        elif self.prediction == 0:
            if self.aggression > 0.59: 
                self.prediction = 1
        
        return self.prediction


## Main Function

In [None]:
def normal_reward(payoff, k = 0.6):
    """Normalise a payoff with a logistic sigmoid.

    Returns ``1 / (1 + exp(-k * payoff))``; ``k`` controls the steepness and
    a payoff of 0 maps to 0.5.
    """
    exponent = -k * float(payoff)
    denominator = 1.0 + np.exp(exponent)
    return 1.0 / denominator
def play_game(env, my_agent, opp_agent, num_games=200):
    """Play ``num_games`` games and report the result.

    ``my_agent`` is seated as player 0 and ``opp_agent`` as player 1.

    Returns:
        A tuple ``(mean_payoff, opp_actions)``: the mean of player 0's payoff
        over the games, and the list of player 1's actions encoded as the
        integers 0-3, collected from player 1's trajectory in every game.
    """
    # map action names to their numeric ids
    name_to_id = {'call': 0, 'raise': 1, 'fold': 2, 'check': 3}
    env.set_agents([my_agent, opp_agent])

    hero_payoffs = []
    opp_actions = []
    for _game in range(num_games):
        trajectories, game_payoffs = env.run(is_training=False)
        hero_payoffs.append(game_payoffs[0])

        if len(trajectories) <= 1:
            continue

        # Keep only trajectory entries that are actions: integer ids in 0..3
        # or known action names. Everything else is ignored.
        for entry in trajectories[1]:
            if isinstance(entry, (int, np.integer)) and 0 <= entry <= 3:
                opp_actions.append(int(entry))
            elif isinstance(entry, str) and entry in name_to_id:
                opp_actions.append(name_to_id[entry])

    mean_payoff = float(np.mean(hero_payoffs))
    return mean_payoff, opp_actions

## Run experiments

In [None]:
# Rows of [agent name, opponent name, total reward]; later cells append to this list.
experiment_results = []

if __name__ == "__main__":
    # Initialise the limit hold'em environment with a fixed seed
    env = rlcard.make('limit-holdem', config={'seed': 42})
    # Define the agent library

    # Rule-based opponent, version 1
    from rlcard.models.limitholdem_rule_models import LimitholdemRuleModelV1
    rule_model = LimitholdemRuleModelV1()
    rule_agent = rule_model.agents[0]

    # Rule-based opponent, version 2
    from rlcard.models.limitholdem_rule_models_2 import LimitholdemRuleModelV2
    rule_model_2 = LimitholdemRuleModelV2()
    rule_agent_2 = rule_model_2.agents[0]

    random_agent = RandomAgent(num_actions=env.num_actions)

    # Baseline checkpoints
    dqn_agent_path = 'models/dqn_baseline.pth'
    dqn_agent = LoadedAgent(dqn_agent_path, env=env, device='cpu')

    nfsp_agent_path = 'models/nfsp_baseline.pth'
    nfsp_agent = LoadedAgent(nfsp_agent_path, env=env, device='cpu')

    my_agents = []
    # Checkpoint paths of the six style agents
    model_paths = {
        0:'models/agent0.pth',
        1:'models/agent1.pth',
        2:'models/agent2.pth',
        3:'models/agent3.pth',
        4:'models/agent4.pth',
        5:'models/agent5.pth',
    }
    agent0 = LoadedAgent(model_paths[0], env=env, device='cpu')
    agent1 = LoadedAgent(model_paths[1], env=env, device='cpu')
    agent2 = LoadedAgent(model_paths[2], env=env, device='cpu')
    agent3 = LoadedAgent(model_paths[3], env=env, device='cpu')
    agent4 = LoadedAgent(model_paths[4], env=env, device='cpu')
    agent5 = LoadedAgent(model_paths[5], env=env, device='cpu')

    # The arms of each bandit are these six style agents
    using_agents = [agent0, agent1, agent2, agent3, agent4, agent5]
    for agent in using_agents:
        my_agents.append(agent)
    # True opponent type index -> opponent agent.
    # NOTE(review): this ordering differs from the `opp_agents` dict built in the
    # "Run all experiments" cell ({0: rule_agent_2, 1: rule_agent, 2: dqn_agent}) — confirm which is intended.
    opp_agents = {0: rule_agent, 1:dqn_agent, 2: rule_agent_2}


    K = len(my_agents)
    # Opponent tracker
    tracker = spotopponent(rate=0.1)
    # One bandit per predicted opponent type (the context)
    bandits = {0: UCBbandit(K), 1: UCBbandit(K), 2: UCBbandit(K)}
    # Markov process: each type persists with probability 0.95 and switches to
    # either other type with probability 0.025
    M = [[0.95, 0.025, 0.025], [0.025, 0.95, 0.025], [0.025, 0.025, 0.95]]
    # Degenerate alternatives (unused in this cell). With the opp_agents mapping above:
    M1 = [[1.0, 0.0, 0.0], [1.0, 0.0, 0.0], [1.0, 0.0, 0.0]] # always type 0 (rule_agent)
    M2 = [[0.0, 1.0, 0.0], [0.0, 1.0, 0.0], [0.0, 1.0, 0.0]] # always type 1 (dqn_agent)
    M3 = [[0.0, 0.0, 1.0], [0.0, 0.0, 1.0], [0.0, 0.0, 1.0]] # always type 2 (rule_agent_2)
    # Uses the class defaults start=0, seed=1
    opp_switch = opponentswitching(M)


    # Start
    ROUNDS = 300
    GAMES_PER_ROUND = 30
    total_reward = 0
    reward_history = []

    # Per-round predicted and true opponent types, used by the confusion-matrix cell
    opponent_pred = []
    opponent_true = []

    for r in range(ROUNDS):
        # Simulate the environment: advance the Markov chain to get this round's opponent
        true_type = opp_switch.switching()
        current_opp = opp_agents[true_type]

        # The prediction is made before this round is played, so it reflects
        # only the actions observed in earlier rounds
        context_type = tracker.predict()
        opponent_true.append(true_type)
        opponent_pred.append(context_type)
        
        # Select the bandit for the predicted type and let it choose an agent
        curr_bandit = bandits[context_type]
        chosen_model_idx = curr_bandit.selection()
        my_agent = my_agents[chosen_model_idx]
        # selection() already stored this index on the bandit; the assignment repeats it
        curr_bandit.index = chosen_model_idx

        # Game playing
        avg_payoff, opp_actions = play_game(env, my_agent, current_opp, num_games=GAMES_PER_ROUND)

        # Update the tracker with every opponent action seen this round
        for a in opp_actions:
            tracker.score(a)

        # Update the chosen bandit with the normalised mean payoff
        reward = normal_reward(avg_payoff)
        curr_bandit.update(reward)
        total_reward += reward
        # reward_history stores the cumulative reward after each round
        reward_history.append(total_reward)

        if (r+1) % 5 == 0:
           print(f"Round {r+1}: Step Reward={reward:.2f}, Total={total_reward:.2f}")

    experiment_results.append(['UCB All agents', 'Markov opponents', total_reward])
    print("\nDone.")

### Plot confusion matrix

In [None]:
# Accuracy of the opponent-type classifier: fraction of rounds in which the
# predicted type equals the true type
accuracy = accuracy_score(opponent_true, opponent_pred)
print(f"Prediction Accuracy: {accuracy:.2%}")
# Confusion matrix over the three opponent types (true labels vs. predicted labels)
cm = confusion_matrix(opponent_true, opponent_pred, labels=[0, 1, 2])
conf = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['0', '1', '2'])
conf.plot(cmap=plt.cm.Blues)
plt.title(f'(Accuracy: {accuracy:.2%})')
plt.show()

#### Define functions for three conditions

In [None]:
def run_research_single(agent1, agent2, rounds=300, games_per_round=30, seed=42):
    """Play one fixed agent against one fixed opponent.

    Args:
        agent1: agent seated as player 0 (the agent being evaluated).
        agent2: opponent seated as player 1.
        rounds: number of rounds to play.
        games_per_round: number of games averaged into each round's payoff.
        seed: seed passed to the limit-holdem environment.

    Returns:
        The sum over all rounds of the sigmoid-normalised mean payoff.
    """
    env = rlcard.make('limit-holdem', config={'seed': seed})

    total_reward = 0
    for _ in range(rounds):
        avg_payoff, _opp_actions = play_game(env, agent1, agent2, num_games=games_per_round)
        total_reward += normal_reward(avg_payoff)

    return total_reward


def run_single_vs_markov(agent1, opp_agents, rounds=300, games_per_round=30,
                         seed=42, transition_matrix=None, switch_seed=42):
    """Play one fixed agent against an opponent that switches by a Markov chain.

    Args:
        agent1: agent seated as player 0 (the agent being evaluated).
        opp_agents: mapping from opponent type index to opponent agent.
        rounds: number of rounds to play.
        games_per_round: number of games averaged into each round's payoff.
        seed: seed passed to the limit-holdem environment.
        transition_matrix: Markov transition matrix over opponent types. When
            ``None``, each type persists with probability 0.95 and switches to
            either other type with probability 0.025.
        switch_seed: seed for the opponent-switching random generator.

    Returns:
        The sum over all rounds of the sigmoid-normalised mean payoff.
    """
    env = rlcard.make('limit-holdem', config={'seed': seed})

    if transition_matrix is None:
        transition_matrix = [[0.95, 0.025, 0.025],
                             [0.025, 0.95, 0.025],
                             [0.025, 0.025, 0.95]]
    opp_switch = opponentswitching(transition_matrix, start=0, seed=switch_seed)

    total_reward = 0
    for _ in range(rounds):
        # Advance the chain, then face whichever opponent it selects this round.
        current_opp = opp_agents[opp_switch.switching()]
        avg_payoff, _opp_actions = play_game(env, agent1, current_opp, num_games=games_per_round)
        total_reward += normal_reward(avg_payoff)

    return total_reward

def run_ucb_vs_single(agents, opp_agent, rounds=300, games_per_round=30, seed=42, c=2):
    """Let a single UCB bandit choose among ``agents`` against one fixed opponent.

    Args:
        agents: sequence of candidate agents; each one is an arm of the bandit.
        opp_agent: opponent seated as player 1.
        rounds: number of rounds (bandit decisions) to play.
        games_per_round: number of games averaged into each round's payoff.
        seed: seed passed to the limit-holdem environment.
        c: exploration parameter of the bandit.

    Returns:
        The sum over all rounds of the sigmoid-normalised mean payoff.
    """
    env = rlcard.make('limit-holdem', config={'seed': seed})
    bandit = UCBbandit(len(agents), c=c)

    total_reward = 0
    for _ in range(rounds):
        # selection() records the chosen arm on the bandit, so update() below
        # credits the reward to the agent that actually played.
        my_agent = agents[bandit.selection()]

        avg_payoff, _opp_actions = play_game(env, my_agent, opp_agent, num_games=games_per_round)

        reward = normal_reward(avg_payoff)
        bandit.update(reward)
        total_reward += reward

    return total_reward


### Run all experiments

In [None]:
# Initialise the limit hold'em environment with a fixed seed
env = rlcard.make('limit-holdem', config={'seed': 42})
# Define the agent library

# Rule-based opponent, version 1
from rlcard.models.limitholdem_rule_models import LimitholdemRuleModelV1
rule_model = LimitholdemRuleModelV1()
rule_agent = rule_model.agents[0]

# Rule-based opponent, version 2
from rlcard.models.limitholdem_rule_models_2 import LimitholdemRuleModelV2
rule_model_2 = LimitholdemRuleModelV2()
rule_agent_2 = rule_model_2.agents[0]

random_agent = RandomAgent(num_actions=env.num_actions)

# Baseline checkpoints
dqn_agent_path = 'models/dqn_baseline.pth'
dqn_agent = LoadedAgent(dqn_agent_path, env=env, device='cpu')

nfsp_agent_path = 'models/nfsp_baseline.pth'
nfsp_agent = LoadedAgent(nfsp_agent_path, env=env, device='cpu')

# Checkpoint paths of the six style agents
model_paths = {
    0:'models/agent0.pth',
    1:'models/agent1.pth',
    2:'models/agent2.pth',
    3:'models/agent3.pth',
    4:'models/agent4.pth',
    5:'models/agent5.pth',
}
agent0 = LoadedAgent(model_paths[0], env=env, device='cpu')
agent1 = LoadedAgent(model_paths[1], env=env, device='cpu')
agent2 = LoadedAgent(model_paths[2], env=env, device='cpu')
agent3 = LoadedAgent(model_paths[3], env=env, device='cpu')
agent4 = LoadedAgent(model_paths[4], env=env, device='cpu')
agent5 = LoadedAgent(model_paths[5], env=env, device='cpu')

# Arms available to the UCB bandit
research_agents = [agent0, agent1, agent2, agent3, agent4, agent5]
# Opponent type index -> opponent agent for the Markov-switching runs.
# NOTE(review): this ordering differs from the `opp_agents` dict in the UCB
# Markov experiment cell ({0: rule_agent, 1: dqn_agent, 2: rule_agent_2}) — confirm which is intended.
opp_agents = {0: rule_agent_2, 1:rule_agent, 2: dqn_agent}

# Display names, keyed by the agent objects themselves
agent_name_map = {}
for i, agent in enumerate([agent0, agent1, agent2, agent3, agent4, agent5]):
    agent_name_map[agent] = f'Agent {i}'
agent_name_map[random_agent] = 'Random'
agent_name_map[nfsp_agent] = 'NFSP baseline'

opp_name_map = {
    rule_agent_2: 'Rule base 2',
    rule_agent: 'Rule base 1',
    dqn_agent: 'DQN baseline',
    random_agent: 'Random'
    }

all_agents = [agent0, agent1, agent2, agent3, agent4, agent5, random_agent, nfsp_agent]
all_opp_agents = [rule_agent_2, rule_agent, dqn_agent, random_agent]


# Condition 1: every single agent against every fixed opponent.
# Names fall back to the object's `name` attribute, then to 'Unknown'.
for agent in all_agents:
    for opp_agent in all_opp_agents:
        agent_name = agent_name_map.get(agent, getattr(agent, 'name', 'Unknown'))
        opp_name = opp_name_map.get(opp_agent, getattr(opp_agent, 'name', 'Unknown'))
        print(f"Running {agent_name} vs {opp_name}")
        result = run_research_single(agent, opp_agent)
        experiment_results.append([agent_name, opp_name, result])

# Condition 2: UCB selection over the six style agents against every fixed opponent
for opp_agent in all_opp_agents:
    opp_name = opp_name_map.get(opp_agent, getattr(opp_agent, 'name', 'Unknown'))
    print(f"Running UCB All agents vs {opp_name}")
    result = run_ucb_vs_single(research_agents, opp_agent)
    experiment_results.append(['UCB All agents', opp_name, result])

# Condition 3: every single agent against the Markov-switching opponent
for agent in all_agents:
    agent_name = agent_name_map.get(agent, getattr(agent, 'name', 'Unknown'))
    print(f"Running {agent_name} vs Markov opponents")
    result = run_single_vs_markov(agent, opp_agents)
    experiment_results.append([agent_name, 'Markov opponents', result])


### Get final plot for report

In [None]:
import pandas as pd
import seaborn as sns

df = pd.DataFrame(experiment_results, columns=['Agent', 'Opponent', 'Total Reward'])

# Re-running an experiment cell appends the same (Agent, Opponent) pair again,
# and DataFrame.pivot raises ValueError on duplicate index/column pairs.
# Keep only the most recent result for each pair.
df = df.drop_duplicates(subset=['Agent', 'Opponent'], keep='last')

# Agents as rows, opponents as columns, total reward as the cell value
heatmap_data = df.pivot(index='Agent', columns='Opponent', values='Total Reward')

plt.figure(figsize=(8, 6), dpi=300)
sns.heatmap(heatmap_data, annot=True, fmt='.2f', cmap='YlOrRd', cbar_kws={'label': 'Total Reward'}, annot_kws={'size': 8})
plt.xlabel('Opponents', fontsize=10)
plt.ylabel('Agents', fontsize=10)
# Unrotated labels are centred on their ticks so each sits under its column
plt.xticks(rotation=0, ha='center', fontsize=9)
plt.yticks(fontsize=9)
plt.tight_layout()
plt.show()

print("\nNumerical Results:")
print(heatmap_data)