In [13]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import gym

class ValueNetwork(nn.Module):
    def __init__(self, num_features, hidden_size, learning_rate=0.01):
        '''
        Tác dụng: Khởi tạo mạng giá trị để ước lượng giá trị trạng thái.
        Tham số:
            num_features: số chiều đầu vào (đặc trưng trạng thái)
            hidden_size: kích thước lớp ẩn
            learning_rate: tốc độ học cho bộ tối ưu
        Trả về: None
        '''
        super(ValueNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(num_features, hidden_size),
            nn.Sigmoid(),
            nn.Linear(hidden_size, 1)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def forward(self, states):
        '''
        Tác dụng: Dự đoán giá trị trạng thái.
        Tham số:
            states: trạng thái đầu vào (tensor hoặc ndarray)
        Trả về: Giá trị trạng thái (tensor)
        '''
        if not isinstance(states, torch.Tensor):
            states = torch.as_tensor(states, dtype=torch.float32).to(self.device)
        return self.network(states).squeeze(-1)

    def update(self, states, discounted_rewards):
        '''
        Tác dụng: Cập nhật mạng giá trị bằng MSE Loss giữa giá trị dự đoán và phần thưởng chiết khấu.
        Tham số:
            states: tập trạng thái
            discounted_rewards: phần thưởng chiết khấu tương ứng
        Trả về: Giá trị loss
        '''
        if not isinstance(states, torch.Tensor):
            states = torch.as_tensor(states, dtype=torch.float32).to(self.device)
        if not isinstance(discounted_rewards, torch.Tensor):
            discounted_rewards = torch.as_tensor(discounted_rewards, dtype=torch.float32).to(self.device)
        self.optimizer.zero_grad()
        predictions = self(states)
        loss = nn.MSELoss()(predictions, discounted_rewards)
        loss.backward()
        self.optimizer.step()
        return loss.item()

class PPOPolicyNetwork(nn.Module):
    def __init__(self, num_features, layer_1_size, layer_2_size, layer_3_size, num_actions, epsilon=0.2, learning_rate=9e-4):
        '''
        Tác dụng: Khởi tạo mạng chính sách PPO với kiến trúc 3 lớp ẩn và softmax đầu ra.
        Tham số:
            num_features: số chiều đầu vào (trạng thái)
            layer_1_size, layer_2_size, layer_3_size: kích thước các lớp ẩn
            num_actions: số lượng hành động
            epsilon: ngưỡng cắt trong PPO
            learning_rate: tốc độ học
        Trả về: None
        '''
        super(PPOPolicyNetwork, self).__init__()
        self.epsilon = epsilon
        self.network = nn.Sequential(
            nn.Linear(num_features, layer_1_size),
            nn.ReLU(),
            nn.Linear(layer_1_size, layer_2_size),
            nn.ReLU(),
            nn.Linear(layer_2_size, layer_3_size),
            nn.ReLU(),
            nn.Linear(layer_3_size, num_actions),
            nn.Softmax(dim=-1)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def forward(self, states):
        '''
        Tác dụng: Trả về phân phối xác suất hành động.
        Tham số:
            states: trạng thái đầu vào
        Trả về: phân phối xác suất hành động (tensor)
        '''
        if not isinstance(states, torch.Tensor):
            states = torch.as_tensor(states, dtype=torch.float32).to(self.device)
        return self.network(states)

    def update(self, states, chosen_actions, ep_advantages, old_probabilities):
        '''
        Tác dụng: Cập nhật mạng chính sách theo thuật toán PPO.
        Tham số:
            states: tập trạng thái
            chosen_actions: hành động đã chọn dưới dạng one-hot
            ep_advantages: lợi thế thu được
            old_probabilities: phân phối xác suất hành động cũ
        Trả về: giá trị loss
        '''
        if not isinstance(states, torch.Tensor):
            states = torch.as_tensor(states, dtype=torch.float32).to(self.device)
        if not isinstance(chosen_actions, torch.Tensor):
            chosen_actions = torch.as_tensor(chosen_actions, dtype=torch.float32).to(self.device)
        if not isinstance(ep_advantages, torch.Tensor):
            ep_advantages = torch.as_tensor(ep_advantages, dtype=torch.float32).to(self.device)
        if not isinstance(old_probabilities, torch.Tensor):
            old_probabilities = torch.as_tensor(old_probabilities, dtype=torch.float32).to(self.device)

        self.optimizer.zero_grad()
        new_probabilities = self(states)
        new_responsible_outputs = (chosen_actions * new_probabilities).sum(dim=1)
        old_responsible_outputs = (chosen_actions * old_probabilities).sum(dim=1)
        ratio = new_responsible_outputs / (old_responsible_outputs + 1e-10)
        clipped_ratio = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
        loss = -torch.min(ratio * ep_advantages, clipped_ratio * ep_advantages).mean()
        loss.backward()
        self.optimizer.step()
        return loss.item()

class PPO:
    def __init__(self, env, num_features=1, num_actions=1, gamma=0.98, lam=1, epsilon=0.2,
                 value_network_lr=0.001, policy_network_lr=0.01, value_network_hidden_size=100,
                 policy_network_hidden_size_1=40, policy_network_hidden_size_2=35, policy_network_hidden_size_3=30):
        ''''
        Tác dụng: Khởi tạo agent PPO với mạng giá trị và mạng chính sách tương ứng.
        Tham số:
            env: môi trường huấn luyện (Gym environment)
            num_features: số lượng đặc trưng (input size của mạng)
            num_actions: số hành động có thể chọn trong môi trường
            gamma: hệ số chiết khấu phần thưởng tương lai
            lam: hệ số lambda trong tính toán lợi thế GAE
            epsilon: ngưỡng cắt tỉ lệ trong PPO để tránh cập nhật quá mạnh
            value_network_lr: tốc độ học của mạng giá trị
            policy_network_lr: tốc độ học của mạng chính sách
            value_network_hidden_size: kích thước lớp ẩn trong mạng giá trị
            policy_network_hidden_size_1: kích thước lớp ẩn 1 trong mạng chính sách
            policy_network_hidden_size_2: kích thước lớp ẩn 2 trong mạng chính sách
            policy_network_hidden_size_3: kích thước lớp ẩn 3 trong mạng chính sách
        Trả về: None
        '''
        self.env = env
        self.num_features = num_features
        self.num_actions = num_actions
        self.gamma = gamma
        self.lam = lam
        self.Pi = PPOPolicyNetwork(
            num_features=num_features, num_actions=num_actions,
            layer_1_size=policy_network_hidden_size_1,
            layer_2_size=policy_network_hidden_size_2,
            layer_3_size=policy_network_hidden_size_3,
            epsilon=epsilon, learning_rate=policy_network_lr
        )
        self.V = ValueNetwork(num_features, value_network_hidden_size, learning_rate=value_network_lr)

    def discount_rewards(self, rewards):
        '''
        Tác dụng: Tính phần thưởng chiết khấu từ danh sách phần thưởng
        Tham số:
            rewards: danh sách phần thưởng
        Trả về: phần thưởng chiết khấu cùng chiều
        '''
        running_total = 0
        discounted = np.zeros_like(rewards)
        for r in reversed(range(len(rewards))):
            running_total = running_total * self.gamma + rewards[r]
            discounted[r] = running_total
        return discounted

    def calculate_advantages(self, rewards, values):
        '''
        Tác dụng: Tính lợi thế bằng công thức GAE (Generalized Advantage Estimation)
        Tham số:
            rewards: danh sách phần thưởng
            values: giá trị trạng thái tương ứng
        Trả về: lợi thế chuẩn hóa (advantage)
        '''
        advantages = np.zeros_like(rewards, dtype=np.float32)
        for t in range(len(rewards)):
            ad = 0
            for l in range(0, len(rewards) - t):
                delta = rewards[t + l] + (self.gamma * values[t + l + 1] if t + l + 1 < len(values) else 0) - values[t + l]
                ad += ((self.gamma * self.lam) ** l) * delta
            advantages[t] = ad
        return (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-10)

    def run_model(self):
        '''
        Tác dụng: Huấn luyện mô hình PPO trên môi trường đã khởi tạo.
        '''
        episode = 1
        running_reward = []
        environment_solved = False
        while not environment_solved:
            reset_result = self.env.reset()
            s0 = reset_result[0] if isinstance(reset_result, tuple) else reset_result
            is_terminal = False
            ep_rewards = []
            ep_actions = []
            ep_states = []
            score = 0
            while not is_terminal:
                dist = self.Pi(np.array(s0)[None, :]).detach().cpu().numpy()[0]
                action = np.random.choice(range(self.num_actions), p=dist)
                a_binarized = np.zeros(self.num_actions)
                a_binarized[action] = 1
                s1, r, terminated, truncated, _ = self.env.step(action)
                is_terminal = terminated or truncated
                score += r
                ep_actions.append(a_binarized)
                ep_rewards.append(r)
                ep_states.append(s0)
                s0 = s1
                if is_terminal:
                    ep_actions = np.array(ep_actions, dtype=np.float32)
                    ep_rewards = np.array(ep_rewards, dtype=np.float32)
                    ep_states = np.array(ep_states, dtype=np.float32)
                    targets = self.discount_rewards(ep_rewards)
                    for i in range(len(ep_states)):
                        self.V.update(ep_states[i:i+1], targets[i:i+1])
                    ep_values = self.V(ep_states).detach().cpu().numpy()
                    ep_advantages = self.calculate_advantages(ep_rewards, ep_values)
                    old_probabilities = self.Pi(ep_states).detach().cpu().numpy()
                    self.Pi.update(ep_states, ep_actions, ep_advantages, old_probabilities)
                    running_reward.append(score)
                    if episode % 25 == 0:
                        avg_score = np.mean(running_reward[-25:])
                        print(f"Episode: {episode} - Score: {avg_score}")
                        if avg_score >= 500:
                            if not environment_solved:
                                print("Environment solved!")
                                environment_solved = True
                            break
                    episode += 1

In [14]:
# Khởi tạo môi trường và chạy mô hình
env = gym.make('CartPole-v1', new_step_api=True)
agent = PPO(
    env, num_features=4, num_actions=2, gamma=0.98, lam=1, epsilon=0.2,
    value_network_lr=0.001, policy_network_lr=0.01, value_network_hidden_size=100,
    policy_network_hidden_size_1=40, policy_network_hidden_size_2=35, policy_network_hidden_size_3=30
)
agent.run_model()
env.close()

Episode: 25 - Score: 16.2
Episode: 50 - Score: 13.2
Episode: 75 - Score: 23.92
Episode: 100 - Score: 76.12
Episode: 125 - Score: 171.96
Episode: 150 - Score: 167.72
Episode: 175 - Score: 393.64
Episode: 200 - Score: 478.88
Episode: 225 - Score: 497.96
Episode: 250 - Score: 434.88
Episode: 275 - Score: 446.4
Episode: 300 - Score: 419.52
Episode: 325 - Score: 500.0
Environment solved!
