In [1]:
import numpy as np
import pygame
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import sys
import os
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import json
from tqdm import tqdm

# 你原有的代碼保持不變，這裡添加改進和可視化功能

# ---------- 改進的環境類 -----------
class RLGameEnv:
    def __init__(self, render_mode=False):
        pygame.init()
        self.screen = pygame.display.set_mode((640, 480))
        self.clock = pygame.time.Clock()
        self.render_mode = render_mode
        # 假設你的Game類存在，這裡保持原有邏輯
        # self.game = Game(self.screen, self.clock, ai_archetype="rl")
        self.action_space = 5
        self.state_dim = (10,)
        
        # 添加統計信息
        self.episode_stats = {
            'wins': 0,
            'losses': 0,
            'draws': 0,
            'total_episodes': 0
        }

    def reset(self):
        # self.game.setup_initial_state()
        return self.get_state()

    def step(self, action):
        # 原有邏輯保持不變
        # 這裡添加更詳細的獎勵設計
        state = self.get_state()
        reward, done = self.compute_reward_done()
        
        # 添加形狀獎勵（shaping reward）
        reward += self.compute_shaping_reward(action)
        
        return state, reward, done, {}

    def compute_shaping_reward(self, action):
        """計算形狀獎勵，幫助AI學習更好的策略"""
        reward = 0.0
        
        # 獎勵存活時間
        reward += 0.01
        
        # 獎勵靠近敵人（鼓勵攻擊性）
        # p2 = self.game.player2_ai
        # p1 = self.game.player1
        # if p2 and p1:
        #     distance = abs(p2.rect.x - p1.rect.x) + abs(p2.rect.y - p1.rect.y)
        #     reward += max(0, (200 - distance) / 1000)  # 距離越近獎勵越高
        
        # 懲罰無意義的移動
        if action == 0:  # 停留
            reward -= 0.005
            
        return reward

    def get_state(self):
        # 原有邏輯，可以考慮添加更多特徵
        # 比如：炸彈位置、道具位置、牆壁信息等
        return np.zeros(self.state_dim, dtype=np.float32)

    def compute_reward_done(self):
        # 原有邏輯保持不變
        done = False  # self.game.game_state in ["GAME_OVER", "SCORE_SUBMITTED"]
        reward = 0.0
        
        # 更新統計信息
        if done:
            self.episode_stats['total_episodes'] += 1
            # 根據遊戲結果更新統計
            
        return reward, done
    
# ---------- Q Network ----------
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def forward(self, x):
        return self.model(x)

# ---------- 改進的DQN Agent ----------
class DQNAgent:
    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.995):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.q_net = QNetwork(state_dim, action_dim)
        self.target_net = QNetwork(state_dim, action_dim)
        self.target_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
        
        # 使用優先經驗回放
        self.memory = deque(maxlen=50000)  # 增大經驗池
        self.batch_size = 64
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.update_target_steps = 200  # 增加更新頻率
        self.step_count = 0
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_net.to(self.device)
        self.target_net.to(self.device)
        
        # 添加訓練統計
        self.training_stats = {
            'losses': [],
            'q_values': [],
            'epsilon_history': []
        }
        
    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.q_net(state_tensor)
            
        # 記錄Q值用於分析
        self.training_stats['q_values'].append(q_values.max().item())
        
        return q_values.argmax().item()

    def train_step(self):
        if len(self.memory) < self.batch_size:
            return
            
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        # 使用Double DQN
        q_values = self.q_net(states).gather(1, actions)
        with torch.no_grad():
            next_actions = self.q_net(next_states).argmax(1).unsqueeze(1)
            max_next_q = self.target_net(next_states).gather(1, next_actions)
            target_q = rewards + (1 - dones) * self.gamma * max_next_q

        loss = nn.MSELoss()(q_values, target_q)
        
        # 梯度裁剪
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_net.parameters(), 1.0)
        self.optimizer.step()

        # 記錄損失
        self.training_stats['losses'].append(loss.item())
        
        # Epsilon decay
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
            
        self.training_stats['epsilon_history'].append(self.epsilon)

        # 更新 target net
        self.step_count += 1
        if self.step_count % self.update_target_steps == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())

    def save_model(self, filepath):
        """保存模型和訓練統計"""
        torch.save({
            'q_net_state_dict': self.q_net.state_dict(),
            'target_net_state_dict': self.target_net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'training_stats': self.training_stats,
            'epsilon': self.epsilon,
            'step_count': self.step_count
        }, filepath)

    def load_model(self, filepath):
        """加載模型"""
        checkpoint = torch.load(filepath, map_location=self.device)
        self.q_net.load_state_dict(checkpoint['q_net_state_dict'])
        self.target_net.load_state_dict(checkpoint['target_net_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.training_stats = checkpoint['training_stats']
        self.epsilon = checkpoint['epsilon']
        self.step_count = checkpoint['step_count']

# ---------- 可視化和分析工具 ----------
class TrainingVisualizer:
    def __init__(self):
        plt.style.use('seaborn-v0_8')
        self.fig = None
        
    def plot_training_results(self, rewards_log, agent, env, save_path="training_results.png"):
        """繪製訓練結果"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('炸彈人RL訓練結果分析', fontsize=16, fontweight='bold')
        
        # 1. 獎勵曲線
        axes[0, 0].plot(rewards_log, alpha=0.7, linewidth=1)
        if len(rewards_log) > 20:
            # 移動平均
            window = min(50, len(rewards_log) // 10)
            moving_avg = np.convolve(rewards_log, np.ones(window)/window, mode='valid')
            axes[0, 0].plot(range(window-1, len(rewards_log)), moving_avg, 
                           color='red', linewidth=2, label=f'移動平均({window})')
            axes[0, 0].legend()
        axes[0, 0].set_title('每回合獎勵')
        axes[0, 0].set_xlabel('回合數')
        axes[0, 0].set_ylabel('獎勵')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 損失曲線
        if agent.training_stats['losses']:
            losses = agent.training_stats['losses']
            axes[0, 1].plot(losses, alpha=0.7)
            if len(losses) > 100:
                window = len(losses) // 50
                moving_avg = np.convolve(losses, np.ones(window)/window, mode='valid')
                axes[0, 1].plot(range(window-1, len(losses)), moving_avg, 
                               color='red', linewidth=2)
            axes[0, 1].set_title('訓練損失')
            axes[0, 1].set_xlabel('訓練步數')
            axes[0, 1].set_ylabel('MSE Loss')
            axes[0, 1].set_yscale('log')
            axes[0, 1].grid(True, alpha=0.3)
        
        # 3. Epsilon衰減
        if agent.training_stats['epsilon_history']:
            axes[0, 2].plot(agent.training_stats['epsilon_history'])
            axes[0, 2].set_title('探索率(Epsilon)衰減')
            axes[0, 2].set_xlabel('訓練步數')
            axes[0, 2].set_ylabel('Epsilon')
            axes[0, 2].grid(True, alpha=0.3)
        
        # 4. Q值統計
        if agent.training_stats['q_values']:
            q_values = agent.training_stats['q_values']
            axes[1, 0].plot(q_values, alpha=0.7)
            if len(q_values) > 100:
                window = len(q_values) // 50
                moving_avg = np.convolve(q_values, np.ones(window)/window, mode='valid')
                axes[1, 0].plot(range(window-1, len(q_values)), moving_avg, 
                               color='red', linewidth=2)
            axes[1, 0].set_title('最大Q值變化')
            axes[1, 0].set_xlabel('動作選擇次數')
            axes[1, 0].set_ylabel('最大Q值')
            axes[1, 0].grid(True, alpha=0.3)
        
        # 5. 獎勵分布
        axes[1, 1].hist(rewards_log, bins=30, alpha=0.7, edgecolor='black')
        axes[1, 1].axvline(np.mean(rewards_log), color='red', linestyle='--', 
                          label=f'平均: {np.mean(rewards_log):.2f}')
        axes[1, 1].axvline(np.median(rewards_log), color='green', linestyle='--', 
                          label=f'中位數: {np.median(rewards_log):.2f}')
        axes[1, 1].set_title('獎勵分布')
        axes[1, 1].set_xlabel('獎勵值')
        axes[1, 1].set_ylabel('頻率')
        axes[1, 1].legend()
        axes[1, 1].grid(True, alpha=0.3)
        
        # 6. 勝率統計（如果有環境統計）
        if hasattr(env, 'episode_stats') and env.episode_stats['total_episodes'] > 0:
            stats = env.episode_stats
            labels = ['勝利', '失敗', '平局']
            sizes = [stats['wins'], stats['losses'], stats['draws']]
            colors = ['gold', 'lightcoral', 'lightblue']
            
            # 只顯示非零的項目
            non_zero_data = [(label, size, color) for label, size, color in zip(labels, sizes, colors) if size > 0]
            if non_zero_data:
                labels, sizes, colors = zip(*non_zero_data)
                axes[1, 2].pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
                axes[1, 2].set_title('遊戲結果分布')
        else:
            axes[1, 2].text(0.5, 0.5, '暫無統計數據', ha='center', va='center', fontsize=14)
            axes[1, 2].set_title('遊戲結果分布')
        
        plt.tight_layout()
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        
    def plot_action_distribution(self, action_history, save_path="action_distribution.png"):
        """分析動作選擇分布"""
        plt.figure(figsize=(10, 6))
        action_names = ['停留', '向上', '向下', '向左', '向右']
        action_counts = np.bincount(action_history, minlength=5)
        
        bars = plt.bar(action_names, action_counts, color=['gray', 'blue', 'green', 'orange', 'red'])
        plt.title('動作選擇分布', fontsize=14, fontweight='bold')
        plt.xlabel('動作類型')
        plt.ylabel('選擇次數')
        
        # 添加數值標籤
        for bar, count in zip(bars, action_counts):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(action_counts)*0.01,
                    str(count), ha='center', va='bottom')
        
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()

# ---------- 改進的訓練函數 ----------
def train(env, num_episodes=1000, save_interval=100, visualize=True):
    """改進的訓練函數"""
    agent = DQNAgent(state_dim=env.state_dim[0], action_dim=env.action_space)
    visualizer = TrainingVisualizer() if visualize else None
    
    rewards_log = []
    action_history = []
    best_avg_reward = float('-inf')
    
    print(f"開始訓練 - 總回合數: {num_episodes}")
    print(f"設備: {agent.device}")
    print("-" * 50)
    
    for episode in tqdm(range(num_episodes), desc="Training", ncols=80, dynamic_ncols=False):
        state = env.reset()
        total_reward = 0
        episode_actions = []
        done = False
        steps = 0
        max_steps = 1000  # 防止無限循環

        while not done and steps < max_steps:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.store(state, action, reward, next_state, done)
            agent.train_step()
            
            state = next_state
            total_reward += reward
            episode_actions.append(action)
            steps += 1
            
            if env.render_mode:
                env.render()

        rewards_log.append(total_reward)
        action_history.extend(episode_actions)
        
        # 計算最近100回合的平均獎勵
        recent_avg = np.mean(rewards_log[-100:]) if len(rewards_log) >= 100 else np.mean(rewards_log)
        
        # 每10回合打印一次進度
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode+1:4d}/{num_episodes} | "
                  f"Reward: {total_reward:6.2f} | "
                  f"Avg(100): {recent_avg:6.2f} | "
                  f"Epsilon: {agent.epsilon:.3f} | "
                  f"Steps: {steps:3d}")

        # 保存最佳模型
        if recent_avg > best_avg_reward:
            best_avg_reward = recent_avg
            agent.save_model("best_model.pt")
            
        # 定期保存和可視化
        if (episode + 1) % save_interval == 0:
            # 保存檢查點
            agent.save_model(f"checkpoint_episode_{episode+1}.pt")
            
            # 生成可視化
            if visualizer:
                print(f"\n生成訓練進度可視化...")
                visualizer.plot_training_results(rewards_log, agent, env, 
                                                f"training_progress_ep{episode+1}.png")
                
            print(f"已保存檢查點和可視化結果")
            print("-" * 50)

    # 訓練完成後的最終分析
    print(f"\n訓練完成！")
    print(f"最佳平均獎勵: {best_avg_reward:.2f}")
    print(f"最終平均獎勵: {np.mean(rewards_log[-100:]):.2f}")
    
    if visualizer:
        print("生成最終分析報告...")
        visualizer.plot_training_results(rewards_log, agent, env, "final_training_results.png")
        visualizer.plot_action_distribution(action_history, "action_distribution.png")
    
    # 保存訓練日誌
    training_log = {
        'rewards': rewards_log,
        'training_stats': agent.training_stats,
        'env_stats': env.episode_stats if hasattr(env, 'episode_stats') else {},
        'hyperparameters': {
            'learning_rate': 1e-3,
            'gamma': 0.99,
            'epsilon_decay': 0.995,
            'batch_size': agent.batch_size,
            'memory_size': len(agent.memory)
        },
        'timestamp': datetime.now().isoformat()
    }
    
    with open('training_log.json', 'w') as f:
        json.dump(training_log, f, indent=2)
    
    return rewards_log, agent

# ---------- 模型評估函數 ----------
def evaluate_model(env, model_path, num_episodes=100):
    """評估訓練好的模型"""
    agent = DQNAgent(state_dim=env.state_dim[0], action_dim=env.action_space)
    agent.load_model(model_path)
    agent.epsilon = 0  # 評估時不使用隨機動作
    
    rewards = []
    
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False
        
        while not done:
            action = agent.select_action(state)
            state, reward, done, _ = env.step(action)
            total_reward += reward
            
            if env.render_mode:
                env.render()
                
        rewards.append(total_reward)
        
    print(f"評估結果 (共{num_episodes}回合):")
    print(f"平均獎勵: {np.mean(rewards):.2f} ± {np.std(rewards):.2f}")
    print(f"最佳獎勵: {np.max(rewards):.2f}")
    print(f"最差獎勵: {np.min(rewards):.2f}")
    
    return rewards

# ---------- 主程序 ----------
if __name__ == "__main__":
    # 創建環境
    env = RLGameEnv(render_mode=False)  # 訓練時建議關閉渲染加速
    
    # 開始訓練
    rewards_log, trained_agent = train(env, num_episodes=500, visualize=True)
    
    # 評估最佳模型
    print("\n評估最佳模型:")
    eval_env = RLGameEnv(render_mode=True)  # 評估時可以開啟渲染觀看
    evaluate_model(eval_env, "best_model.pt", num_episodes=10)

pygame 2.6.1 (SDL 2.28.4, Python 3.8.19)
Hello from the pygame community. https://www.pygame.org/contribute.html
開始訓練 - 總回合數: 500
設備: cuda
--------------------------------------------------


  states = torch.FloatTensor(states).to(self.device)
Training:   2%|▋                               | 10/500 [00:29<24:46,  3.03s/it]

Episode   10/500 | Reward:   9.89 | Avg(100):   9.88 | Epsilon: 0.100 | Steps: 1000


Training:   3%|▉                               | 14/500 [00:42<24:32,  3.03s/it]

: 

In [None]:
# import numpy as np
# import pygame
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import random
# from collections import deque
# import sys
# import os

# # ---------- 改進的環境類 - 包含炸彈機制 -----------
# class RLGameEnv:
#     def __init__(self, render_mode=False):
#         pygame.init()
#         self.screen = pygame.display.set_mode((640, 480))
#         self.clock = pygame.time.Clock()
#         self.render_mode = render_mode
#         # self.game = Game(self.screen, self.clock, ai_archetype="rl")
        
#         # 擴展動作空間 - 增加放炸彈
#         self.action_space = 6  # 0:停留, 1:上, 2:下, 3:左, 4:右, 5:放炸彈
        
#         # 擴展狀態空間 - 包含更多炸彈人相關信息
#         self.state_dim = (20,)  # 增加到20維狀態
        
#         # 遊戲統計
#         self.episode_stats = {
#             'wins': 0,
#             'losses': 0,
#             'draws': 0,
#             'bombs_placed': 0,
#             'enemies_killed_by_bomb': 0,
#             'self_killed_by_bomb': 0,
#             'total_episodes': 0
#         }
        
#         # 上一步的信息（用於計算獎勵）
#         self.prev_enemy_lives = None
#         self.prev_ai_lives = None
#         self.prev_distance_to_enemy = None

#     def reset(self):
#         # self.game.setup_initial_state()
#         self.prev_enemy_lives = None
#         self.prev_ai_lives = None
#         self.prev_distance_to_enemy = None
#         return self.get_state()

#     def step(self, action):
#         # 記錄動作前的狀態
#         prev_state = self.get_state()
        
#         # 應用動作到AI玩家
#         if self.game.player2_ai and self.game.player2_ai.is_alive:
#             self.apply_action(action)
        
#         # 更新遊戲狀態
#         self.game.update()
        
#         # 獲取新狀態
#         new_state = self.get_state()
        
#         # 計算獎勵和結束條件
#         reward, done = self.compute_reward_done(action, prev_state)
        
#         return new_state, reward, done, {}

#     def apply_action(self, action):
#         """應用動作到AI玩家"""
#         ai = self.game.player2_ai
#         if not ai:
#             return
            
#         if action == 0:
#             pass  # 停留
#         elif action == 1:
#             ai.move(0, -1)  # 向上
#         elif action == 2:
#             ai.move(0, 1)   # 向下
#         elif action == 3:
#             ai.move(-1, 0)  # 向左
#         elif action == 4:
#             ai.move(1, 0)   # 向右
#         elif action == 5:
#             # 放置炸彈 - 模擬按下F鍵
#             self.place_bomb(ai)
#             self.episode_stats['bombs_placed'] += 1

#     def place_bomb(self, player):
#         """
#         放置炸彈的邏輯 - 你需要根據你的遊戲實現來調整
#         這裡假設你的Game類有相應的方法
#         """
#         # 假設你的遊戲有這樣的方法：
#         # self.game.place_bomb(player)
        
#         # 或者直接觸發按鍵事件：
#         # pygame.event.post(pygame.event.Event(pygame.KEYDOWN, key=pygame.K_f))
        
#         # 這裡你需要根據你的實際遊戲代碼來實現
#         pass

#     def get_state(self):
#         """
#         獲取更詳細的遊戲狀態 - 專為炸彈人遊戲設計
#         """
#         # 如果遊戲未初始化，返回零狀態
#         if not hasattr(self, 'game') or not self.game:
#             return np.zeros(self.state_dim, dtype=np.float32)
            
#         p2 = self.game.player2_ai  # AI玩家
#         p1 = self.game.player1     # 人類玩家
        
#         if p2 is None or p1 is None:
#             return np.zeros(self.state_dim, dtype=np.float32)
        
#         # 基本位置和狀態信息
#         state_list = [
#             # AI玩家信息
#             p2.rect.x / 640.0,           # AI位置X
#             p2.rect.y / 480.0,           # AI位置Y
#             p2.lives / 3.0,              # AI生命值（假設最大3條命）
#             p2.score / 1000.0,           # AI分數
#             int(p2.is_alive),            # AI是否存活
            
#             # 敵人信息
#             p1.rect.x / 640.0,           # 敵人位置X
#             p1.rect.y / 480.0,           # 敵人位置Y
#             p1.lives / 3.0,              # 敵人生命值
#             p1.score / 1000.0,           # 敵人分數
#             int(p1.is_alive),            # 敵人是否存活
            
#             # 遊戲全局信息
#             self.game.time_elapsed_seconds / 120.0,  # 遊戲時間（假設2分鐘）
#         ]
        
#         # 距離信息
#         distance_to_enemy = abs(p2.rect.x - p1.rect.x) + abs(p2.rect.y - p1.rect.y)
#         state_list.append(distance_to_enemy / 1000.0)  # 正規化距離
        
#         # 炸彈相關狀態（如果你的遊戲有炸彈列表）
#         # 這裡需要根據你的遊戲實現來調整
#         bomb_states = self.get_bomb_states()
#         state_list.extend(bomb_states)
        
#         # 地圖信息（周圍的牆壁、道具等）
#         surrounding_info = self.get_surrounding_info(p2)
#         state_list.extend(surrounding_info)
        
#         # 確保狀態維度正確
#         state_array = np.array(state_list[:self.state_dim[0]], dtype=np.float32)
        
#         # 如果不足維度，用0填充
#         if len(state_array) < self.state_dim[0]:
#             padding = np.zeros(self.state_dim[0] - len(state_array), dtype=np.float32)
#             state_array = np.concatenate([state_array, padding])
            
#         return state_array

#     def get_bomb_states(self):
#         """
#         獲取炸彈相關狀態信息
#         你需要根據你的遊戲實現來調整
#         """
#         bomb_info = []
        
#         # 假設你的遊戲有炸彈列表
#         # if hasattr(self.game, 'bombs'):
#         #     # 最近炸彈的距離和剩餘時間
#         #     ai_pos = (self.game.player2_ai.rect.x, self.game.player2_ai.rect.y)
#         #     nearest_bomb_dist = float('inf')
#         #     nearest_bomb_time = 0
#         #     
#         #     for bomb in self.game.bombs:
#         #         dist = abs(bomb.x - ai_pos[0]) + abs(bomb.y - ai_pos[1])
#         #         if dist < nearest_bomb_dist:
#         #             nearest_bomb_dist = dist
#         #             nearest_bomb_time = bomb.timer
#         #     
#         #     bomb_info = [
#         #         min(nearest_bomb_dist / 200.0, 1.0),  # 最近炸彈距離
#         #         nearest_bomb_time / 100.0,            # 最近炸彈剩餘時間
#         #         len(self.game.bombs) / 10.0           # 場上炸彈數量
#         #     ]
#         # else:
#         bomb_info = [0.0, 0.0, 0.0]  # 默認值
        
#         return bomb_info

#     def get_surrounding_info(self, player):
#         """
#         獲取玩家周圍的環境信息
#         """
#         surrounding = []
        
#         # 檢查周圍8個方向是否有障礙物
#         directions = [(-1,-1), (-1,0), (-1,1), (0,-1), (0,1), (1,-1), (1,0), (1,1)]
        
#         for dx, dy in directions:
#             # 這裡需要根據你的地圖實現來檢查
#             # is_blocked = self.game.is_position_blocked(player.rect.x + dx*32, player.rect.y + dy*32)
#             # surrounding.append(float(is_blocked))
#             surrounding.append(0.0)  # 暫時用0填充
            
#         return surrounding[:5]  # 只取前5個方向

#     def compute_reward_done(self, action, prev_state):
#         """
#         計算獎勵和判斷是否結束 - 專為炸彈人遊戲設計的獎勵函數
#         """
#         done = self.game.game_state in ["GAME_OVER", "SCORE_SUBMITTED"]
#         reward = 0.0
        
#         p2 = self.game.player2_ai
#         p1 = self.game.player1
        
#         if p2 is None or p1 is None:
#             return 0.0, True
        
#         # === 1. 遊戲結果獎勵（最重要） ===
#         if done:
#             self.episode_stats['total_episodes'] += 1
#             if self.game.time_up_winner == "AI":
#                 reward += 100.0  # 大獎勵：獲勝
#                 self.episode_stats['wins'] += 1
#             elif self.game.time_up_winner == "P1":
#                 reward -= 100.0  # 大懲罰：失敗
#                 self.episode_stats['losses'] += 1
#             elif self.game.time_up_winner == "DRAW":
#                 reward += 10.0   # 小獎勵：平局
#                 self.episode_stats['draws'] += 1
        
#         # === 2. 生命值變化獎勵 ===
#         current_ai_lives = p2.lives
#         current_enemy_lives = p1.lives
        
#         if self.prev_ai_lives is not None:
#             # AI失去生命 - 大懲罰
#             if current_ai_lives < self.prev_ai_lives:
#                 reward -= 50.0
#                 # 如果是被自己炸彈炸死，額外懲罰
#                 if action == 5:  # 剛才放了炸彈
#                     reward -= 20.0
#                     self.episode_stats['self_killed_by_bomb'] += 1
            
#             # 敵人失去生命 - 大獎勵
#             if current_enemy_lives < self.prev_enemy_lives:
#                 reward += 50.0
#                 self.episode_stats['enemies_killed_by_bomb'] += 1
        
#         # === 3. 距離相關獎勵 ===
#         current_distance = abs(p2.rect.x - p1.rect.x) + abs(p2.rect.y - p1.rect.y)
        
#         if self.prev_distance_to_enemy is not None:
#             # 靠近敵人獎勵（鼓勵攻擊性）
#             if current_distance < self.prev_distance_to_enemy:
#                 reward += 0.5
#             # 遠離敵人小懲罰
#             elif current_distance > self.prev_distance_to_enemy:
#                 reward -= 0.2
        
#         # === 4. 動作相關獎勵 ===
#         # 鼓勵積極行動
#         if action == 0:  # 停留
#             reward -= 0.1
#         elif action == 5:  # 放炸彈
#             reward += 1.0  # 鼓勵使用炸彈
            
#             # 如果在敵人附近放炸彈，額外獎勵
#             if current_distance < 100:  # 假設100像素內算"附近"
#                 reward += 2.0
            
#             # 如果太靠近自己放炸彈，小懲罰（危險）
#             if current_distance < 50:
#                 reward -= 1.0
        
#         # === 5. 存活時間獎勵 ===
#         if p2.is_alive:
#             reward += 0.01  # 每個時間步的小獎勵
        
#         # === 6. 分數變化獎勵 ===
#         # 如果你的遊戲有分數系統
#         # score_diff = p2.score - p1.score
#         # reward += score_diff * 0.01
        
#         # === 7. 安全性考慮 ===
#         # 如果AI在炸彈爆炸範圍內，給予懲罰
#         danger_penalty = self.calculate_danger_penalty(p2)
#         reward += danger_penalty
        
#         # 更新上一步信息
#         self.prev_ai_lives = current_ai_lives
#         self.prev_enemy_lives = current_enemy_lives
#         self.prev_distance_to_enemy = current_distance
        
#         return reward, done

#     def calculate_danger_penalty(self, player):
#         """
#         計算危險懲罰 - 如果AI在炸彈爆炸範圍內
#         """
#         penalty = 0.0
        
#         # 這裡需要根據你的炸彈爆炸機制來實現
#         # if hasattr(self.game, 'bombs'):
#         #     for bomb in self.game.bombs:
#         #         # 檢查是否在爆炸範圍內
#         #         if self.is_in_blast_range(player, bomb):
#         #             # 根據炸彈剩餘時間給予不同程度的懲罰
#         #             time_factor = max(0, 1 - bomb.timer / 100.0)
#         #             penalty -= 5.0 * time_factor
        
#         return penalty

#     def is_in_blast_range(self, player, bomb):
#         """
#         檢查玩家是否在炸彈爆炸範圍內
#         """
#         # 這裡需要根據你的遊戲爆炸機制來實現
#         # 假設爆炸範圍是十字形，每個方向3格
#         blast_range = 3 * 32  # 假設每格32像素
        
#         # 檢查是否在同一行或同一列，且在爆炸範圍內
#         same_row = abs(player.rect.y - bomb.y) < 16  # 允許小誤差
#         same_col = abs(player.rect.x - bomb.x) < 16
        
#         if same_row:
#             return abs(player.rect.x - bomb.x) <= blast_range
#         elif same_col:
#             return abs(player.rect.y - bomb.y) <= blast_range
        
#         return False

# # ---------- 改進的Q網絡 - 適應炸彈人遊戲 ----------
# class BombermanQNetwork(nn.Module):
#     def __init__(self, state_dim, action_dim):
#         super(BombermanQNetwork, self).__init__()
        
#         # 更深的網絡結構，適合複雜的炸彈人策略
#         self.feature_layers = nn.Sequential(
#             nn.Linear(state_dim, 256),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(256, 256),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(256, 128),
#             nn.ReLU()
#         )
        
#         # 分別處理移動和攻擊動作
#         self.movement_head = nn.Linear(128, 5)  # 0-4: 停留和移動
#         self.bomb_head = nn.Linear(128, 1)      # 5: 放炸彈
        
#     def forward(self, x):
#         features = self.feature_layers(x)
#         movement_q = self.movement_head(features)
#         bomb_q = self.bomb_head(features)
        
#         # 合併輸出
#         q_values = torch.cat([movement_q, bomb_q], dim=1)
#         return q_values

# # ---------- 專用的炸彈人DQN智能體 ----------
# class BombermanDQNAgent:
#     def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, epsilon=1.0, 
#                  epsilon_min=0.1, epsilon_decay=0.995):
#         self.state_dim = state_dim
#         self.action_dim = action_dim
        
#         # 使用專門的炸彈人網絡
#         self.q_net = BombermanQNetwork(state_dim, action_dim)
#         self.target_net = BombermanQNetwork(state_dim, action_dim)
#         self.target_net.load_state_dict(self.q_net.state_dict())
        
#         # 使用較小的學習率，因為炸彈人策略比較複雜
#         self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr, weight_decay=1e-4)
        
#         # 更大的經驗回放池
#         self.memory = deque(maxlen=100000)
#         self.batch_size = 128  # 增大批次大小
#         self.gamma = gamma
#         self.epsilon = epsilon
#         self.epsilon_min = epsilon_min
#         self.epsilon_decay = epsilon_decay
#         self.update_target_steps = 500  # 更頻繁的目標網絡更新
#         self.step_count = 0
        
#         self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#         self.q_net.to(self.device)
#         self.target_net.to(self.device)
        
#         # 動作統計
#         self.action_stats = {i: 0 for i in range(action_dim)}
        
#         print(f"炸彈人DQN智能體初始化完成")
#         print(f"狀態維度: {state_dim}, 動作維度: {action_dim}")
#         print(f"設備: {self.device}")

#     def select_action(self, state, training=True):
#         """選擇動作，在訓練時使用epsilon-greedy策略"""
#         if training and np.random.rand() < self.epsilon:
#             # 隨機動作，但偏向於移動而不是放炸彈
#             if np.random.rand() < 0.8:
#                 action = np.random.randint(0, 5)  # 移動動作
#             else:
#                 action = 5  # 放炸彈
#         else:
#             state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
#             with torch.no_grad():
#                 q_values = self.q_net(state_tensor)
#             action = q_values.argmax().item()
        
#         # 記錄動作統計
#         self.action_stats[action] += 1
#         return action

#     def store(self, state, action, reward, next_state, done):
#         self.memory.append((state, action, reward, next_state, done))

#     def train_step(self):
#         if len(self.memory) < self.batch_size:
#             return None
            
#         batch = random.sample(self.memory, self.batch_size)
#         states, actions, rewards, next_states, dones = zip(*batch)

#         states = torch.FloatTensor(states).to(self.device)
#         actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
#         rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
#         next_states = torch.FloatTensor(next_states).to(self.device)
#         dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

#         # Double DQN
#         q_values = self.q_net(states).gather(1, actions)
#         with torch.no_grad():
#             next_actions = self.q_net(next_states).argmax(1).unsqueeze(1)
#             max_next_q = self.target_net(next_states).gather(1, next_actions)
#             target_q = rewards + (1 - dones) * self.gamma * max_next_q

#         # Huber Loss（對異常值更魯棒）
#         loss = nn.SmoothL1Loss()(q_values, target_q)
        
#         self.optimizer.zero_grad()
#         loss.backward()
        
#         # 梯度裁剪
#         torch.nn.utils.clip_grad_norm_(self.q_net.parameters(), 1.0)
#         self.optimizer.step()

#         # Epsilon衰減
#         if self.epsilon > self.epsilon_min:
#             self.epsilon *= self.epsilon_decay

#         # 更新目標網絡
#         self.step_count += 1
#         if self.step_count % self.update_target_steps == 0:
#             self.target_net.load_state_dict(self.q_net.state_dict())
            
#         return loss.item()

#     def get_action_distribution(self):
#         """獲取動作分布統計"""
#         total = sum(self.action_stats.values())
#         if total == 0:
#             return self.action_stats
        
#         return {action: count/total for action, count in self.action_stats.items()}

# # ---------- 主訓練函數 ----------
# def train_bomberman_ai(env, num_episodes=1000, save_interval=100, visualize=False):
#     visualizer = TrainingVisualizer() if visualize else None
#     """訓練炸彈人AI"""
#     agent = BombermanDQNAgent(
#         state_dim=env.state_dim[0], 
#         action_dim=env.action_space,
#         lr=5e-4,  # 較小的學習率
#         gamma=0.95,  # 稍微降低折扣因子
#         epsilon_decay=0.9995  # 較慢的探索衰減
#     )
    
#     rewards_log = []
#     loss_log = []
#     episode_lengths = []
    
#     print(f"開始訓練炸彈人AI - 總回合數: {num_episodes}")
#     print(f"動作空間: {['停留', '上', '下', '左', '右', '放炸彈']}")
#     print("-" * 60)
    
#     for episode in range(num_episodes):
#         state = env.reset()
#         total_reward = 0
#         episode_loss = []
#         steps = 0
#         max_steps = 2000  # 炸彈人遊戲可能較長
#         done = False

#         while not done and steps < max_steps:
#             action = agent.select_action(state, training=True)
#             next_state, reward, done, _ = env.step(action)
            
#             agent.store(state, action, reward, next_state, done)
#             loss = agent.train_step()
#             if loss is not None:
#                 episode_loss.append(loss)
            
#             state = next_state
#             total_reward += reward
#             steps += 1
            
#             if env.render_mode and episode % 50 == 0:  # 每50回合渲染一次
#                 env.render()

#         rewards_log.append(total_reward)
#         episode_lengths.append(steps)
#         if episode_loss:
#             loss_log.append(np.mean(episode_loss))
        
#         # 計算統計信息
#         recent_avg_reward = np.mean(rewards_log[-100:]) if len(rewards_log) >= 100 else np.mean(rewards_log)
#         recent_avg_length = np.mean(episode_lengths[-100:]) if len(episode_lengths) >= 100 else np.mean(episode_lengths)
        
#         # 每10回合報告一次
#         if (episode + 1) % 10 == 0:
#             action_dist = agent.get_action_distribution()
#             bomb_rate = action_dist.get(5, 0) * 100  # 放炸彈的比例
            
#             print(f"Episode {episode+1:4d}/{num_episodes} | "
#                   f"Reward: {total_reward:7.2f} | "
#                   f"Avg(100): {recent_avg_reward:6.2f} | "
#                   f"Steps: {steps:3d} | "
#                   f"Epsilon: {agent.epsilon:.3f} | "
#                   f"Bomb%: {bomb_rate:4.1f}")
        
#         # 每100回合詳細報告
#         if (episode + 1) % 100 == 0:
#             print(f"\n=== 第 {episode+1} 回合統計 ===")
#             print(f"環境統計: {env.episode_stats}")
#             print(f"動作分布: {['停留', '上', '下', '左', '右', '放炸彈']}")
#             action_dist = agent.get_action_distribution()
#             for i, action_name in enumerate(['停留', '上', '下', '左', '右', '放炸彈']):
#                 print(f"  {action_name}: {action_dist.get(i, 0)*100:.1f}%")
#             print("-" * 60)
        
#         # 保存檢查點
#         if (episode + 1) % save_interval == 0:
#             torch.save({
#                 'q_net_state_dict': agent.q_net.state_dict(),
#                 'target_net_state_dict': agent.target_net.state_dict(),
#                 'optimizer_state_dict': agent.optimizer.state_dict(),
#                 'episode': episode,
#                 'rewards_log': rewards_log,
#                 'env_stats': env.episode_stats,
#                 'action_stats': agent.action_stats
#             }, f"bomberman_checkpoint_ep{episode+1}.pt")

#     print(f"\n訓練完成！")
#     print(f"最終統計: {env.episode_stats}")
    
#     return rewards_log, agent

# # 使用示例
# if __name__ == "__main__":
#     env = RLGameEnv(render_mode=False)  # 訓練時建議關閉渲染加速
    
#     # 開始訓練
#     rewards_log, trained_agent = train_bomberman_ai(env, num_episodes=500, visualize=True)
    
#     # 評估最佳模型
#     print("\n評估最佳模型:")
#     eval_env = RLGameEnv(render_mode=True)  # 評估時可以開啟渲染觀看
#     evaluate_model(eval_env, "best_model.pt", num_episodes=10)

pygame 2.6.1 (SDL 2.28.4, Python 3.8.19)
Hello from the pygame community. https://www.pygame.org/contribute.html


TypeError: train_bomberman_ai() got an unexpected keyword argument 'visualize'