Train

In [1]:
import gym
from gym import spaces
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import os

# Directory that receives the best checkpoint written by the training loop.
save_dir = 'models'
# exist_ok=True replaces the check-then-create pattern, which can fail if the
# directory appears between the existence check and the makedirs call.
os.makedirs(save_dir, exist_ok=True)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"device:{device}\n")

class IntersectionEnv(gym.Env):
    """Synthetic stand-in for an intersection-driving task.

    Observations are random ``uint8`` arrays of shape ``(4, 84, 84)`` that do
    not depend on the action taken.  The reward depends on the action and on
    the number of steps survived, and an episode ends either on a randomly
    drawn "collision" or once more than 50 steps have elapsed.
    """

    def __init__(self):
        super().__init__()

        # Five discrete actions: 0 forward, 1 turn left, 2 turn right,
        # 3 brake, 4 wait.
        self.action_space = spaces.Discrete(5)

        # Observation: a stack of four 84x84 single-byte frames.
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(4, 84, 84), dtype=np.uint8
        )

        # Bring the per-episode attributes into existence.
        self.reset()

    def reset(self):
        """Start a new episode and return the initial all-zero observation."""
        self.steps = 0
        self.collisions = 0  # collision counter, cleared every episode
        self.done = False
        self.state = np.zeros((4, 84, 84), dtype=np.uint8)  # placeholder frames
        return self.state

    def step(self, action):
        """Advance one step.

        Returns:
            A ``(observation, reward, done, info)`` tuple; ``info`` is always
            an empty dict.
        """
        self.steps += 1

        # Base reward for the chosen action.
        if action == 0:  # forward
            base_reward = 1
        elif action == 1 or action == 2:  # turn left / turn right
            base_reward = 2
        elif action == 3:  # brake
            base_reward = 0.5
        else:  # wait (4), or any value outside 0-3
            base_reward = -0.1

        # Survival bonus that grows with the number of steps taken so far.
        reward = base_reward + 0.1 * self.steps

        # Driving forward carries half the collision probability of any
        # other action.
        crash_probability = 0.1 if action != 0 else 0.05
        crashed = random.random() < crash_probability
        if crashed:
            # A collision replaces the whole step reward and ends the episode.
            reward = -10
            self.done = True
            self.collisions += 1
        elif self.steps > 50:
            # Step limit reached without a collision.
            self.done = True

        # Next observation is fresh random noise; the upper bound of
        # np.random.randint is exclusive, so values lie in [0, 254].
        self.state = np.random.randint(0, 255, (4, 84, 84), dtype=np.uint8)

        return self.state, reward, self.done, {}

    def render(self, mode='human'):
        """No-op: this environment has nothing to draw."""
        pass

    def close(self):
        """No-op: this environment holds no resources."""
        pass

# R2D2-style recurrent Q-network
class R2D2(nn.Module):
    """Convolutional encoder followed by an LSTM and a linear Q-value head.

    Args:
        input_shape: ``(channels, height, width)`` of one observation. It
            determines the number of input channels of the first convolution
            and the LSTM input size.
        n_actions: number of Q-values produced per observation.
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Derive the flattened feature size from a dummy forward pass instead
        # of hard-coding it; for (4, 84, 84) this yields 64 * 7 * 7.
        with torch.no_grad():
            feature_size = self.conv(torch.zeros(1, *input_shape)).numel()
        self.lstm = nn.LSTM(input_size=feature_size, hidden_size=512, batch_first=True)
        self.fc = nn.Linear(512, n_actions)

    def forward(self, x, hidden):
        """Run a single time step for every sample in the batch.

        Args:
            x: batch of observations, ``(B, C, H, W)``, with values on a
                0-255 scale (they are divided by 255 here).
            hidden: ``(h, c)`` LSTM state or ``None``. A state whose batch
                dimension is 1 is broadcast to all ``B`` samples.

        Returns:
            ``(q_values, hidden)`` where ``q_values`` is ``(B, n_actions)``
            and ``hidden`` is the updated ``(h, c)`` pair of shape
            ``(1, B, 512)``.
        """
        batch_size = x.size(0)
        x = x / 255.0
        x = self.conv(x)
        x = x.view(batch_size, -1)

        # Share a single-sequence state across a larger batch so callers that
        # hold a (1, 1, 512) state can still evaluate B samples at once.
        if hidden is not None and batch_size > 1:
            hidden = tuple(
                h.expand(-1, batch_size, -1).contiguous() if h.size(1) == 1 else h
                for h in hidden
            )

        # batch_first=True expects (batch, seq, feature): each sample is its
        # own length-1 sequence. Unsqueezing dim 0 instead would turn the
        # batch into one B-step sequence and leak state between samples.
        x, hidden = self.lstm(x.unsqueeze(1), hidden)
        return self.fc(x.squeeze(1)), hidden

# Training hyperparameters
BATCH_SIZE = 32  # transitions drawn from the replay buffer per update
GAMMA = 0.99  # discount factor applied to the bootstrapped next-state value
EPSILON = 0.1  # initial exploration rate; decayed per episode in the training loop
TARGET_UPDATE = 10  # NOTE(review): not referenced in the visible code — confirm whether it is still needed
MEMORY_SIZE = 10000  # replay buffer capacity, in transitions
LEARNING_RATE = 1e-4  # Adam learning rate
TAU = 0.005  # interpolation factor for the soft target-network update

class PrioritizedReplayBuffer:
    """Fixed-capacity ring buffer with proportional prioritized sampling.

    Args:
        capacity: maximum number of stored transitions; once full, the oldest
            entry is overwritten.
        alpha: exponent applied to priorities before normalising them into
            sampling probabilities (0 gives uniform sampling).
        device: device that sampled tensors are moved to. Defaults to CUDA
            when available, otherwise the CPU.
    """

    def __init__(self, capacity, alpha=0.6, device=None):
        self.buffer = []
        self.capacity = capacity
        self.alpha = alpha
        self.position = 0  # next slot to write
        self.priorities = np.zeros((capacity,), dtype=np.float32)
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = device

    @staticmethod
    def _as_array(value):
        """Convert a tensor (on any device) or array-like to a NumPy array."""
        if isinstance(value, torch.Tensor):
            return value.detach().cpu().numpy()
        return np.asarray(value)

    def push(self, state, action, reward, next_state, done):
        """Store one transition, giving it the current maximum priority."""
        # New transitions get the highest known priority so that each one is
        # likely to be sampled at least once.
        max_prio = self.priorities.max() if self.buffer else 1.0
        transition = (
            self._as_array(state),
            action,
            reward,
            self._as_array(next_state),
            done,
        )
        if len(self.buffer) < self.capacity:
            self.buffer.append(transition)
        else:
            self.buffer[self.position] = transition
        self.priorities[self.position] = max_prio
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size, beta=0.4):
        """Draw ``batch_size`` transitions in proportion to priority ** alpha.

        Args:
            batch_size: number of transitions to draw (with replacement).
            beta: exponent of the importance-sampling correction.

        Returns:
            ``(states, actions, rewards, next_states, dones, weights,
            indices)``: tensors on ``self.device`` plus the NumPy array of
            sampled buffer indices. ``weights`` are normalised so the largest
            is 1.

        Raises:
            ValueError: if the buffer is empty.
        """
        size = len(self.buffer)
        if size == 0:
            raise ValueError("cannot sample from an empty replay buffer")

        # Only the filled slots take part, so priorities are honoured from
        # the first sample instead of only once the buffer is full.
        scaled = self.priorities[:size].astype(np.float64) ** self.alpha
        probs = scaled / scaled.sum()

        indices = np.random.choice(size, batch_size, p=probs)
        samples = [self.buffer[idx] for idx in indices]

        # Importance-sampling weights correct the bias introduced by
        # non-uniform sampling.
        weights = (size * probs[indices]) ** (-beta)
        weights /= weights.max()
        weights = torch.tensor(weights, dtype=torch.float32).to(self.device)

        batch = list(zip(*samples))
        states = torch.tensor(np.array(batch[0]), dtype=torch.float32).to(self.device)
        actions = torch.tensor(batch[1], dtype=torch.long).to(self.device)
        rewards = torch.tensor(batch[2], dtype=torch.float32).to(self.device)
        next_states = torch.tensor(np.array(batch[3]), dtype=torch.float32).to(self.device)
        dones = torch.tensor(batch[4], dtype=torch.float32).to(self.device)

        return states, actions, rewards, next_states, dones, weights, indices

    def update_priorities(self, batch_indices, priorities):
        """Overwrite the priorities stored at ``batch_indices``."""
        self.priorities[batch_indices] = priorities

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

# Environment and model initialisation
env = IntersectionEnv()
n_actions = env.action_space.n
input_shape = (4, 84, 84)
policy_net = R2D2(input_shape, n_actions).to(device)
target_net = R2D2(input_shape, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
memory = PrioritizedReplayBuffer(MEMORY_SIZE)

# Per-sample squared TD error; reduction is done manually so the same values
# can be used as replay priorities.
td_loss_fn = nn.MSELoss(reduction='none')

best_reward = float('-inf')


def _zero_hidden():
    """Return a fresh all-zero (h, c) LSTM state for a single sequence."""
    size = policy_net.lstm.hidden_size
    return (torch.zeros(1, 1, size, device=device),
            torch.zeros(1, 1, size, device=device))


# Training loop
for episode in range(1000):
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32).to(device)
    done = False
    total_reward = 0
    # Exponentially decayed exploration rate with a floor of 0.01.
    epsilon = max(0.01, EPSILON * (0.995 ** episode))
    hidden = _zero_hidden()
    while not done:
        # Run the network on every step, including exploratory ones, so the
        # recurrent state always reflects the full observation history.
        with torch.no_grad():
            q_values, hidden = policy_net(state.unsqueeze(0), hidden)
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = q_values.max(1)[1].item()

        next_state, reward, done, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32).to(device)
        memory.push(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        if len(memory) > BATCH_SIZE:
            states, actions, rewards, next_states, dones, weights, indices = memory.sample(BATCH_SIZE)

            # Replay samples are unrelated to the current episode, so they are
            # evaluated from a zero state rather than the acting hidden state.
            q_values, _ = policy_net(states, _zero_hidden())
            q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
            # The bootstrap target needs no gradient.
            with torch.no_grad():
                next_q_values, _ = target_net(next_states, _zero_hidden())
                expected_q_values = rewards + GAMMA * next_q_values.max(1)[0] * (1 - dones)

            losses = td_loss_fn(q_values, expected_q_values)
            # Small constant keeps every sampled transition's priority non-zero.
            priorities = losses.detach().cpu().numpy() + 1e-5
            memory.update_priorities(indices, priorities)
            loss = (losses * weights).mean()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Keep only the checkpoint from the best-scoring episode so far.
    if total_reward > best_reward:
        best_reward = total_reward
        torch.save(policy_net.state_dict(), os.path.join(save_dir, 'model.pth'))

    # Soft (Polyak) update of the target network.
    # NOTE(review): this runs once per episode and TARGET_UPDATE is unused
    # here — confirm the intended target-update schedule.
    for target_param, param in zip(target_net.parameters(), policy_net.parameters()):
        target_param.data.copy_(TAU * param.data + (1.0 - TAU) * target_param.data)

    print(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {epsilon:.3f}")

env.close()

device:cuda

Episode: 0, Total Reward: 6.800000000000001, Epsilon: 0.100
Episode: 1, Total Reward: 5.800000000000001, Epsilon: 0.100
Episode: 2, Total Reward: -1.0, Epsilon: 0.099
Episode: 3, Total Reward: -3.3999999999999995, Epsilon: 0.099
Episode: 4, Total Reward: -7.9, Epsilon: 0.098
Episode: 5, Total Reward: 20.3, Epsilon: 0.098
Episode: 6, Total Reward: 19.400000000000002, Epsilon: 0.097
Episode: 7, Total Reward: -10, Epsilon: 0.097
Episode: 8, Total Reward: -5.699999999999999, Epsilon: 0.096
Episode: 9, Total Reward: -1.0, Epsilon: 0.096
Episode: 10, Total Reward: 96.39999999999999, Epsilon: 0.095
Episode: 11, Total Reward: 4.1, Epsilon: 0.095
Episode: 12, Total Reward: 2.5999999999999996, Epsilon: 0.094
Episode: 13, Total Reward: 57.0, Epsilon: 0.094
Episode: 14, Total Reward: 41.800000000000004, Epsilon: 0.093
Episode: 15, Total Reward: 68.7, Epsilon: 0.093
Episode: 16, Total Reward: -6.7, Epsilon: 0.092
Episode: 17, Total Reward: 26.200000000000003, Epsilon: 0.092
Episode: 18

Test

In [3]:
import gymnasium as gym
import torch
import numpy as np
import torch.nn.functional as F
import highway_env
import warnings

# Silence every warning category for the duration of the test run.
warnings.filterwarnings("ignore")

# Test configuration: evaluation device and checkpoint location.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
MODEL_PATH = 'models/model.pth'

# Model definition (layer names and shapes must match the saved checkpoint)
class R2D2(torch.nn.Module):
    """Convolutional encoder followed by an LSTM and a linear Q-value head.

    Args:
        input_shape: ``(channels, height, width)`` of one observation. It
            determines the number of input channels of the first convolution
            and the LSTM input size.
        n_actions: number of Q-values produced per observation.
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()
        self.conv = torch.nn.Sequential(
            torch.nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 64, kernel_size=4, stride=2),
            torch.nn.ReLU(),
            torch.nn.Conv2d(64, 64, kernel_size=3, stride=1),
            torch.nn.ReLU()
        )
        # Derive the flattened feature size from a dummy forward pass instead
        # of hard-coding it; for (4, 84, 84) this yields 64 * 7 * 7.
        with torch.no_grad():
            feature_size = self.conv(torch.zeros(1, *input_shape)).numel()
        self.lstm = torch.nn.LSTM(input_size=feature_size, hidden_size=512, batch_first=True)
        self.fc = torch.nn.Linear(512, n_actions)

    def forward(self, x, hidden):
        """Run a single time step for every sample in the batch.

        Args:
            x: batch of observations, ``(B, C, H, W)``, with values on a
                0-255 scale (they are divided by 255 here).
            hidden: ``(h, c)`` LSTM state or ``None``. A state whose batch
                dimension is 1 is broadcast to all ``B`` samples.

        Returns:
            ``(q_values, hidden)`` where ``q_values`` is ``(B, n_actions)``
            and ``hidden`` is the updated ``(h, c)`` pair of shape
            ``(1, B, 512)``.
        """
        batch_size = x.size(0)
        x = x / 255.0
        x = self.conv(x)
        x = x.view(batch_size, -1)

        # Share a single-sequence state across a larger batch.
        if hidden is not None and batch_size > 1:
            hidden = tuple(
                h.expand(-1, batch_size, -1).contiguous() if h.size(1) == 1 else h
                for h in hidden
            )

        # batch_first=True expects (batch, seq, feature): each sample is its
        # own length-1 sequence. Unsqueezing dim 0 instead would turn the
        # batch into one B-step sequence and leak state between samples.
        x, hidden = self.lstm(x.unsqueeze(1), hidden)
        return self.fc(x.squeeze(1)), hidden

# Load the trained weights
input_shape = (4, 84, 84)
policy_net = R2D2(input_shape, 5).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
policy_net.load_state_dict(torch.load(MODEL_PATH, map_location=device))
policy_net.eval()

# Evaluate the model (run at least 3 episodes)
env = gym.make('intersection-v0', render_mode='human')
episodes = 3
try:
    for episode in range(episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0

        # Fresh recurrent state at the start of every episode.
        hidden = (torch.zeros(1, 1, 512).to(device), torch.zeros(1, 1, 512).to(device))

        while not done:
            # Observation preprocessing: move to a channel-first layout.
            if len(state.shape) == 3:  # treated as (H, W, C)
                state = np.transpose(state, (2, 0, 1))  # (C, H, W)
            elif len(state.shape) == 2:  # (H, W)
                state = np.expand_dims(state, axis=0)  # (1, H, W)

            # Repeat along the channel axis and resize to 84x84.
            # NOTE(review): tiling by 4 only yields the 4 channels the network
            # expects when the observation has a single channel at this point
            # — confirm against the environment's observation configuration.
            state = np.tile(state, (4, 1, 1))
            state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
            state = F.interpolate(state, size=(84, 84), mode='bilinear', align_corners=False)

            # Greedy action from the Q-network.
            with torch.no_grad():
                q_values, hidden = policy_net(state, hidden)
                action = q_values.max(1)[1].item()

            # Step the environment; an episode ends when it is either
            # terminated or truncated, so both flags must be checked.
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = next_state
            total_reward += reward

        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
finally:
    # Release the render window even if an episode raises.
    env.close()

Episode 1: Total Reward = 9.0
Episode 2: Total Reward = 9.0
Episode 3: Total Reward = 0.0
