In [None]:
import mjx
from mjx.agents import RandomAgent, ShantenAgent

from ppo_agent import PPOAgent, GymEnv
from tqdm import tqdm
import numpy as np

import json


In [None]:
RANK_DICT = {
    90 : 1,
    45 : 2,
    0 : 3,
    -135 : 4
}

def score_func(avg_score, avg_rank, std_score, std_rank, rank_distribution, 
          w_score=0.4, w_rank=0.2, w_stability=0.2, w_distribution=0.2):
    """
    rank_distribution: list or array of length 4, e.g. [1st%, 2nd%, 3rd%, 4th%]
    """

    # Normalize avg_score: from [0, 25000] to [0, 1]
    score_norm = avg_score / 25000

    # Normalize avg_rank: from [1, 4] to [1, 0]
    rank_norm = (4 - avg_rank) / 3

    # Stability score: low std is better
    std_score_norm = 1 - (std_score / 10000)
    std_rank_norm = 1 - std_rank  # since max std(rank) is 1 in 4-rank system
    stability = 0.5 * std_score_norm + 0.5 * std_rank_norm

    # Rank distribution quality: encourage 1st, penalize 4th
    dist_score = (
        1.0 * rank_distribution[0] +   # 1st
        0.6 * rank_distribution[1] +   # 2nd
        0.2 * rank_distribution[2] +   # 3rd
        -0.5 * rank_distribution[3]    # 4th (penalty)
    )

    final_score = (
        w_score * score_norm +
        w_rank * rank_norm +
        w_stability * stability +
        w_distribution * dist_score
    ) * 100  # Optional: scale to 0–100

    return final_score

    

def eval(records):
    avg_score = sum([record["score"] for record in records]) / len(records)
    avg_rank = sum([record["rank"] for record in records]) / len(records)
    std_score = (sum([(record["score"] - avg_score) ** 2 for record in records]) / len(records)) ** 0.5
    std_rank = (sum([(record["rank"] - avg_rank) ** 2 for record in records]) / len(records)) ** 0.5
    ranks = np.array([r["rank"] for r in records])
    total_games = len(ranks)

    rank_counts = np.bincount(ranks)[1:5]  
    rank_distribution = rank_counts / total_games

    score = score_func(
        avg_score,
        avg_rank,
        std_score,
        std_rank,
        rank_distribution
    )
    
    print(f"Average score: {avg_score:.2f} ± {std_score:.2f}")
    print(f"Average rank: {avg_rank:.2f} ± {std_rank:.2f}")
    print(f"Rank distribution: {rank_distribution}")
    print(f"Score: {score:.2f}")





## Test Base Model: Base Model vs. Base Model

In [None]:
def test_base_agent(my_agent, num_games=100):
    env = mjx.MjxEnv()
    results = []
    players = [
        my_agent,
        ShantenAgent(),
        ShantenAgent(),
        ShantenAgent(),
    ]

    for _ in tqdm(range(num_games)):
        obs_dict = env.reset()
        while not env.done():
            # actions = {
            #     player_id: my_agent.act(obs_dict[player_id])
            #     for player_id in obs_dict.keys()
            # }
            actions = {
                player_id: players[i].act(obs_dict[player_id])
                for i, player_id in enumerate(obs_dict.keys())
            }

            obs_dict = env.step(actions)

        my_index = obs_dict["player_0"].who()
        score = obs_dict["player_0"].tens()[my_index]
        my_reward = env.rewards()["player_0"]
        my_rank = RANK_DICT[my_reward]

        results.append({"rank": my_rank, "score": score})
        
    return results

### Test Random Agent

In [None]:
random_agent = RandomAgent()
random_agent_results = test_base_agent(random_agent, num_games=100)
# store results in a json file
with open("logs/battle_results/random_agent_results.json", "w") as f:
    json.dump(random_agent_results, f)
eval(random_agent_results)

100%|██████████| 100/100 [00:04<00:00, 22.02it/s]

Average score: 25187.00 ± 2639.76
Average rank: 2.30 ± 1.07
Rank distribution: [0.3  0.27 0.26 0.17]
Score: 66.85





### Test Shanten Agent

In [None]:
shanten_agent = ShantenAgent()
shanten_agent_results = test_base_agent(shanten_agent, num_games=100)
# store results in a json file
with open("logs/battle_results/shanten_agent_results.json", "w") as f:
    json.dump(shanten_agent_results, f)
eval(shanten_agent_results)

100%|██████████| 100/100 [00:08<00:00, 12.20it/s]

Average score: 24367.00 ± 11017.75
Average rank: 2.67 ± 1.12
Rank distribution: [0.22 0.19 0.29 0.3 ]
Score: 50.45





## Test PPO Model: PPO Model vs. Base Model

In [None]:
def test_ppo_agent(pretrained_model, num_games=100):
    my_agent = PPOAgent(
        input_dim=544,
        hidden_dim=128,
        output_dim=181,
        pretrained_model=pretrained_model, # If have a pretrained model, load it
    )
    env = GymEnv(opponent_agents=[RandomAgent(), RandomAgent(), RandomAgent()], info_type="default")

    records = []
    for _ in tqdm(range(num_games)):
        obs, info = env.reset()
        done = False
        while not done:
            action_mask = info["action_mask"]
            action = my_agent.act(obs, action_mask)

            # env.step returns obs, reward, done, info
            obs, reward, done, info = env.step(action)

        my_index = env.curr_obs_dict['player_0'].who()
        my_score = env.curr_obs_dict['player_0'].tens()[my_index]
        my_reward = env.mjx_env.rewards()['player_0']
        my_rank = RANK_DICT[my_reward]


        records.append({
            "score": my_score,
            "rank": my_rank,
        })

    return records

### Test PPO Agent 1
PPO Agent 1 learned from random agent opponents and use the final rank as the reward.


In [None]:
ppo1_model = "pretrained_models/ppo1_model.pt"
ppo_agent1_results = test_ppo_agent(ppo1_model, num_games=100)


  state_dict = torch.load(pretrained_model)


Loaded pretrained model from pretrained_models/ppo1_model.pt


  model.load_state_dict(torch.load(path, map_location=DEVICE)['model_state'])
100%|██████████| 100/100 [34:23<00:00, 20.63s/it]


KeyError: 'score'

In [None]:
# store results in a json file
with open("logs/battle_results/ppo_agent1_results.json", "w") as f:
    json.dump(ppo_agent1_results, f)
eval(ppo_agent1_results)

Average score: 28005.00 ± 3454.40
Average rank: 1.55 ± 0.91


### Test PPO Agent 2 
PPO Agent 2 learned from shanten agent opponents and use the final rank as the reward

In [12]:
ppo2_model = "pretrained_models/ppo2_model.pt"
ppo_agent2_results = test_ppo_agent(ppo2_model, num_games=100)
eval(ppo_agent2_results)

  state_dict = torch.load(pretrained_model)


Loaded pretrained model from pretrained_models/ppo2_model.pt


  model.load_state_dict(torch.load(path, map_location=DEVICE)['model_state'])
100%|██████████| 100/100 [34:30<00:00, 20.70s/it]


Average score: 3302.00
Average rank: 1.50


In [None]:
# store results in a json file
with open("logs/battle_results/ppo_agent2_results.json", "w") as f:
    json.dump(ppo_agent2_results, f)
eval(ppo_agent2_results)

Average score: 28302.00 ± 4475.00
Average rank: 1.50 ± 0.93


### Test PPO Agent 3
PPO Agent 3 learned from shanten agent opponents and use the custom reward

In [None]:
ppo3_model = "pretrained_models/ppo3_model.pt"
ppo_agent3_results = test_ppo_agent(ppo3_model, num_games=100)

Loaded pretrained model from pretrained_models/ppo3_model.pt


100%|██████████| 100/100 [30:27<00:00, 18.28s/it]


Average score: 5968.00
Average rank: 1.10


In [None]:
# store results in a json file
with open("logs/battle_results/ppo_agent3_results.json", "w") as f:
    json.dump(ppo_agent3_results, f)
eval(ppo_agent3_results)

In [None]:
eval(ppo_agent3_results)

Average score: 29943.00 ± 4552.74
Average rank: 1.13 ± 0.44


### Test PPO Agent 4
PPO Agent 4 learned from shanten agent opponents, use the custom reward, with curriculum learning

In [None]:
ppo4_model = "pretrained_models/ppo4_model.pt"
ppo_agent4_results = test_ppo_agent(ppo4_model, num_games=100)


Loaded pretrained model from pretrained_models/ppo4_model.pt


100%|██████████| 100/100 [38:39<00:00, 23.20s/it]

Average score: 54544.00 ± 3902.62
Average rank: 1.27 ± 0.65





In [None]:
# store results in a json file
with open("logs/battle_results/ppo_agent4_results.json", "w") as f:
    json.dump(ppo_agent4_results, f)
eval(ppo_agent4_results)

### Test PPO Agent 5
PPO Agent 5 learned from shanten agen opponents, use the custom reward with discard supervised learning model

In [None]:
ppo5_model = "pretrained_models/ppo5_model.pt"
ppo_agent5_results = test_ppo_agent(ppo5_model, num_games=100)

  state_dict = torch.load(pretrained_model)


Loaded pretrained model from pretrained_models/ppo5_model.pt


  model.load_state_dict(torch.load(path, map_location=DEVICE)['model_state'])
100%|██████████| 100/100 [30:48<00:00, 18.48s/it]


Average score: 54265.00 ± 3428.51
Average rank: 1.31 ± 0.76


In [None]:
# store results in a json file
with open("logs/battle_results/ppo_agent5_results.json", "w") as f:
    json.dump(ppo_agent5_results, f)
eval(ppo_agent5_results)

## Test PPO Agent: Inter-Model Performance Evaluation

In [7]:
def run_game_between_ppo_agents(pretrained_models, num_games=100):
    agents = [
        PPOAgent(
            input_dim=544,
            hidden_dim=128,
            output_dim=181,
            pretrained_model=model,
        )
        for model in pretrained_models
    ]
    env = GymEnv(opponent_agents=agents, info_type="default")
    records = []
    for _ in tqdm(range(num_games)):
        obs, info = env.reset()
        done = False
        while not done:
            action_mask = info["action_mask"]
            action = agents[0].act(obs, action_mask)

            # env.step 只需要 PPO 的动作，内部会处理其他 agent
            obs, reward, done, info = env.step(action)

        my_index = env.curr_obs_dict['player_0'].who()
        my_score = env.curr_obs_dict['player_0'].tens()[my_index]
        my_reward = env.mjx_env.rewards()['player_0']
        my_rank = RANK_DICT[my_reward]


        records.append({
            "score": my_score,
            "rank": my_rank,
        })

    return records

In [None]:
ppo2_model = "pretrained_models/ppo2_model.pt"
ppo3_model = "pretrained_models/ppo3_model.pt"
ppo4_model = "pretrained_models/ppo4_model.pt"
ppo5_model = "pretrained_models/ppo5_model.pt"

game_players = [ppo2_model, ppo3_model, ppo4_model, ppo5_model]
ppo_agent_results = run_game_between_ppo_agents(game_players, num_games=100)

  state_dict = torch.load(pretrained_model)


Loaded pretrained model from pretrained_models/ppo2_model.pt


  state_dict = torch.load(pretrained_model)


Loaded pretrained model from pretrained_models/ppo3_model.pt
Loaded pretrained model from pretrained_models/ppo4_model.pt
Loaded pretrained model from pretrained_models/ppo5_model.pt


  model.load_state_dict(torch.load(path, map_location=DEVICE)['model_state'])
 91%|█████████ | 91/100 [38:11<03:42, 24.77s/it]

In [None]:
# store results in a json file
with open("logs/battle_results/ppo_agent_results.json", "w") as f:
    json.dump(ppo_agent_results, f)
eval(ppo_agent_results)