In [None]:
#!pip install "gym [accept-rom-license, atari]"

In [None]:
!pip install gym[atari,accept-rom-license] --quiet
!pip install ale-py --quiet

In [1]:
from collections import deque

def compute_gae(next_value, rewards, masks, values, gamma=0.999, tau=0.95):
    """Compute Generalized Advantage Estimates and the matching return targets.

    The trajectory is walked from its last step back to its first while a
    decaying sum of one-step TD errors is accumulated.

    Args:
        next_value: Value estimate for the state that follows the final step.
        rewards: Per-step rewards (indexable, supports ``len``).
        masks: Per-step multipliers applied to the bootstrap value and to the
            running advantage; 0 cuts the recursion at that step.
        values: Per-step value estimates.
        gamma: Discount factor.
        tau: Decay factor of the advantage sum (the GAE lambda).

    Returns:
        ``(returns, advantages)``: two deques in chronological order. Each
        return is the advantage plus the value estimate of the same step,
        which serves as the regression target for the value function.
    """
    value_targets = deque()
    advantages = deque()
    running_advantage = 0
    bootstrap = next_value

    for t in range(len(rewards) - 1, -1, -1):
        value_t = values[t]
        keep = masks[t]

        # One-step TD error for step t.
        td_error = rewards[t] + gamma * bootstrap * keep - value_t
        # Fold it into the discounted running sum.
        running_advantage = td_error + gamma * tau * keep * running_advantage
        # The step before this one bootstraps from this step's value.
        bootstrap = value_t

        value_targets.appendleft(running_advantage + value_t)
        advantages.appendleft(running_advantage)

    return value_targets, advantages

In [16]:
import torch
from torch import nn
import torch as T
import torch.nn.functional as F

import numpy as np
from torch.distributions.categorical import Categorical
from torch.distributions.normal import Normal

def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    """Initialise a layer in place: orthogonal weights and a constant bias.

    Args:
        layer: Module exposing ``weight`` and ``bias`` tensors.
        std: Gain handed to the orthogonal initialiser.
        bias_const: Value written into every bias entry.

    Returns:
        The same ``layer`` object, so the call can be nested inside a
        container constructor.
    """
    initializers = torch.nn.init
    initializers.orthogonal_(layer.weight, gain=std)
    initializers.constant_(layer.bias, val=bias_const)
    return layer


class Agent(nn.Module):
    """Actor-critic model built from two independent 64-unit Tanh MLPs.

    Both networks take the flattened observation as input. The critic ends in
    a single output unit; the actor ends in ``envs.action_space.n`` logits
    that parameterise a categorical action distribution.
    """

    def __init__(self, envs):
        """Build the critic and actor from the spaces exposed by ``envs``.

        Args:
            envs: Object providing ``observation_space.shape`` and
                ``action_space.n``.
        """
        super().__init__()
        # Number of input features = product of the observation dimensions.
        obs_dim = np.array(envs.observation_space.shape).prod()

        # The critic is built before the actor so parameter initialisation
        # draws from the RNG in a fixed order.
        critic_layers = [
            layer_init(nn.Linear(obs_dim, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 1), std=1.0),
        ]
        self.critic = nn.Sequential(*critic_layers)

        actor_layers = [
            layer_init(nn.Linear(obs_dim, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            # Small gain on the output layer keeps the initial logits close together.
            layer_init(nn.Linear(64, envs.action_space.n), std=0.01),
        ]
        self.actor = nn.Sequential(*actor_layers)

    def get_value(self, x):
        """Return the critic's output for observation batch ``x``."""
        return self.critic(x)

    def get_action_and_value(self, x, action=None):
        """Evaluate the policy and the critic on ``x``.

        Args:
            x: Observation batch fed to both networks.
            action: Actions to score. When ``None`` an action is sampled from
                the current policy instead.

        Returns:
            ``(action, log_prob, entropy, value)`` where ``log_prob`` and
            ``entropy`` come from the categorical distribution over the
            actor's logits and ``value`` is the critic's output.
        """
        policy = Categorical(logits=self.actor(x))
        if action is None:
            action = policy.sample()
        return action, policy.log_prob(action), policy.entropy(), self.critic(x)


In [17]:
import torch

def ppo_loss(new_dist, old_log_probs, advantages, clip_param):
    """Clipped-surrogate PPO policy loss, averaged over the batch.

    Args:
        new_dist: Log-probabilities of the taken actions under the current
            policy (despite the name, these are log-probs, not a
            distribution object).
        old_log_probs: Log-probabilities of the same actions under the policy
            that collected the data.
        advantages: Advantage estimate for each sample.
        clip_param: Half-width of the allowed probability-ratio interval
            around 1.

    Returns:
        Scalar tensor to minimise: the mean over samples of the larger of the
        negated unclipped and negated clipped surrogate terms.
    """
    # Probability ratio pi_new(a|s) / pi_old(a|s), computed in log space.
    ratio = (new_dist - old_log_probs).exp()
    clipped_ratio = ratio.clamp(1 - clip_param, 1 + clip_param)

    unclipped_term = -advantages * ratio
    clipped_term = -advantages * clipped_ratio

    # Max of the negated terms == negated min of the surrogate objectives.
    return torch.max(unclipped_term, clipped_term).mean()

In [None]:
import torch

def clipped_critic_loss(new_value, old_value, returns, clip_param):
    """Clipped value-function loss.

    Two squared errors against ``returns`` are computed: one for the current
    prediction, and one for a prediction that may move at most ``clip_param``
    away from ``old_value``. The element-wise larger of the two is averaged
    and halved.

    Args:
        new_value: Value predictions of the current network.
        old_value: Value predictions recorded when the data was collected.
        returns: Regression targets.
        clip_param: Maximum allowed change of the prediction relative to
            ``old_value``.

    Returns:
        Scalar tensor ``0.5 * mean(max(unclipped_error, clipped_error))``.
    """
    # Current prediction, limited to a +/- clip_param band around the old one.
    limited_step = torch.clamp(new_value - old_value, -clip_param, clip_param)
    clipped_prediction = old_value + limited_step

    unclipped_error = (new_value - returns).pow(2)
    clipped_error = (clipped_prediction - returns).pow(2)

    # Taking the larger error per element gives the pessimistic estimate.
    worst_case_error = torch.max(unclipped_error, clipped_error)
    return 0.5 * worst_case_error.mean()

In [88]:
def xuly_dulieu(env, model, gamma, tau, device,tong_up):
    """Collect a fixed-length rollout and package it for the PPO update.

    The environment is reset at the start of the call, then stepped
    ``tong_up`` times with actions sampled from ``model``. Whenever a step
    reports the episode as finished the environment is reset and collection
    continues. Advantages and returns are then computed with ``compute_gae``.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step()`` returns a 4-tuple ``(obs, reward, done, info)``.
        model: Policy/value network exposing ``get_action_and_value`` and
            ``get_value``.
        gamma: Discount factor forwarded to ``compute_gae``.
        tau: GAE decay factor forwarded to ``compute_gae``.
        device: Torch device the returned tensors are placed on.
        tong_up: Number of environment steps to collect.

    Returns:
        ``(minibatch, rs)`` where ``minibatch`` is a dict of flattened tensors
        with keys ``"obs"``, ``"action"``, ``"logprob"``, ``"value"``,
        ``"returns"`` and ``"advantage"``, and ``rs`` is the sum of all
        collected rewards as a 0-dim tensor.
    """
    states, actions, rewards = [], [], []
    logprobs, values, masks = [], [], []

    current = env.reset()
    for _ in range(tong_up):
        # Materialise the observation as an ndarray (avoids storing LazyFrames).
        state = np.array(current)
        with torch.no_grad():
            model_input = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
            action, logprob, _, value = model.get_action_and_value(model_input)
        following, reward, terminated, _ = env.step(action[0].cpu().numpy())

        states.append(state)
        actions.append(action)
        rewards.append(reward)
        logprobs.append(logprob)
        values.append(value)
        # Continuation mask for GAE: 0 on the step that ended an episode, else 1.
        masks.append(0 if terminated else 1)

        # Start a new episode when the current one has finished.
        current = env.reset() if terminated else following

    with torch.no_grad():
        # Bootstrap value for the observation after the last collected step.
        final_input = torch.tensor(np.array(current), dtype=torch.float32).unsqueeze(0).to(device)
        bootstrap_value = model.get_value(final_input).view(-1)
        returns, advantages = compute_gae(
            bootstrap_value, rewards, masks, values, gamma, tau
        )

    obs_shape = (-1,) + env.observation_space.shape
    action_shape = (-1,) + env.action_space.shape

    minibatch = {
        "obs": torch.tensor(np.array(states), dtype=torch.float32).to(device).reshape(obs_shape),
        "action": torch.cat(actions).to(device).reshape(action_shape),
        "logprob": torch.cat(logprobs).to(device).reshape(-1),
        "value": torch.cat(values).squeeze(-1).to(device).reshape(-1),
        "returns": torch.tensor(returns, dtype=torch.float32).to(device).reshape(-1),
        "advantage": torch.tensor(advantages, dtype=torch.float32).to(device).reshape(-1),
    }

    # Total reward gathered over the whole rollout (may span several episodes).
    rs = torch.tensor(rewards).sum()

    return minibatch, rs


In [89]:
def ppo_update(data_buffer, ppo_epochs, clip_param, model, optimizer, device, minibatch_size=32):
    """Run several epochs of clipped-PPO minibatch updates over one rollout.

    Each epoch reshuffles the rollout and walks it in chunks of
    ``minibatch_size``. For every chunk the advantages are normalised, the
    policy and value losses are evaluated, and one optimiser step is taken
    with the gradient norm clipped to 0.5.

    Args:
        data_buffer: Dict of tensors with keys ``"obs"``, ``"action"``,
            ``"logprob"``, ``"value"``, ``"returns"`` and ``"advantage"``,
            all sharing the same leading (sample) dimension.
        ppo_epochs: Number of passes over the rollout.
        clip_param: Clipping range used for both the policy ratio and the
            value prediction.
        model: Network exposing ``get_action_and_value(obs, action)``.
        optimizer: Optimiser over ``model``'s parameters.
        device: Torch device on which the update is computed.
        minibatch_size: Number of samples per gradient step.

    Returns:
        0-dim detached tensor holding the mean policy entropy of the last
        minibatch processed, or NaN when no minibatch was processed (empty
        buffer or ``ppo_epochs < 1``).
    """
    obs = data_buffer["obs"].to(device)
    old_logprobs = data_buffer["logprob"].to(device)
    old_values = data_buffer["value"].to(device)
    returns = data_buffer["returns"].to(device)
    advantages = data_buffer["advantage"].to(device)
    # Move actions to the device like every other field and cast once here
    # rather than on every minibatch.
    actions = data_buffer["action"].to(device).long()

    batch_size = obs.shape[0]
    # Returned as-is when the loops below never run, instead of hitting an
    # unbound local at the return statement.
    last_entropy = torch.tensor(float("nan"), device=device)

    for _ in range(ppo_epochs):
        perm = torch.randperm(batch_size)
        for start in range(0, batch_size, minibatch_size):
            mb_idx = perm[start:start + minibatch_size]

            mb_advantages = advantages[mb_idx]
            # The sample standard deviation of a single element is NaN, which
            # would poison the loss and the weights; only normalise when the
            # minibatch holds at least two samples.
            if mb_advantages.numel() > 1:
                mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + 1e-8)

            # Re-evaluate the stored actions under the current policy.
            _, new_logprob, entropy, new_value = model.get_action_and_value(obs[mb_idx], actions[mb_idx])

            # Loss terms: clipped policy loss, clipped value loss, entropy bonus.
            actor_loss = ppo_loss(new_logprob, old_logprobs[mb_idx], mb_advantages, clip_param)
            critic_loss = clipped_critic_loss(new_value.view(-1), old_values[mb_idx], returns[mb_idx], clip_param)
            entropy_mean = entropy.mean()

            total_loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy_mean

            # Optimisation step with gradient-norm clipping.
            optimizer.zero_grad()
            total_loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()

            # Detach so the caller receives a plain value with no autograd graph.
            last_entropy = entropy_mean.detach()

    return last_entropy


In [125]:
import gym
import torch
from torch.optim.lr_scheduler import CosineAnnealingLR
import numpy as np
import torch.optim as optim


# --- Hyperparameters ---
lr = 2e-4          # initial Adam learning rate
ppo_epochs = 4     # passes over each rollout
clip_param = 0.2   # PPO clipping range
gamma=0.99         # discount factor
tau=0.95           # GAE decay factor
seed=42

# Apply the seed (it was previously defined but never used) so that network
# initialisation, action sampling and minibatch shuffling are repeatable.
np.random.seed(seed)
torch.manual_seed(seed)

# --- Environment ---
name="LunarLander-v2"
env = gym.make(name)
# Rewards are normalised by the wrapper and then clipped to [-10, 10].
env = gym.wrappers.NormalizeReward(env)
env = gym.wrappers.TransformReward(env, lambda reward: np.clip(reward, -10, 10))
# Seed the environment's own RNG once; later reset() calls continue the stream.
env.reset(seed=seed)

# --- Model and optimiser ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
agent=Agent(env).to(device)
optimizer = optim.Adam(agent.parameters(), lr=lr, eps=1e-5)


  deprecation(
  deprecation(


In [None]:

# Number of environment steps collected per update.
tong_up=2048
# Index of the first update executed by the loop below.
bat_dau=1
total_timesteps=10000000
num_updates = total_timesteps // tong_up

for update in range(bat_dau, num_updates + 1):

   # Linear learning-rate decay: `lr` at update 1, `lr / num_updates` at the
   # last update. Only the optimiser's first parameter group is modified.
   fraction = 1.0 - ((update - 1.0) / num_updates)
   lr_current = fraction * lr
   optimizer.param_groups[0]['lr'] = lr_current

   # Collect one rollout; `rs` is the summed reward over that rollout.
   du_lieu,rs= xuly_dulieu(env,agent,gamma, tau,device,tong_up)

   # Run the PPO epochs on the rollout; `ep` is the entropy value it returns.
   ep=ppo_update(du_lieu, ppo_epochs, clip_param,agent,optimizer,device,minibatch_size=128)
   # Checkpoint model and optimiser state every 50 updates.
   if update % 50 ==0:
       torch.save({
            'update': update,
            'model_state_dict': agent.state_dict(),
            'optimizer_state_dict': optimizer.state_dict()
        }, f"agent_update_{update}.pth")
   # Progress report every 20 updates.
   if update % 20 ==0:

       print(f"num_up: {update}/{num_updates} -- total_timesteps: {update*tong_up}/{total_timesteps}")
       print(f"phần thưởng: {rs.item():.3f} -- entropy: {ep.item():.3f}")
       # NOTE(review): `tes` is not defined in the visible part of this
       # notebook; confirm it exists before running, otherwise this line
       # raises NameError at update 20. Its result must support `.item()`.
       k=tes()
       print( "phần thưởng thử",k.item())
       print()

In [97]:
from typing import Tuple, Dict, Optional, Iterable, Callable

import numpy as np
import seaborn as sns

import matplotlib
from matplotlib import animation

from IPython.display import HTML

import gym
from gym import spaces
from gym.error import DependencyNotInstalled


import numpy as np
import matplotlib.pyplot as plt

In [98]:
def display_video(frames):
    """Turn a sequence of image frames into an inline HTML5 video.

    Args:
        frames: Indexable, iterable collection of images accepted by
            ``imshow``; the first frame initialises the canvas.

    Returns:
        An ``IPython.display.HTML`` object embedding the animation, shown at
        one frame every 50 ms without looping.
    """
    # Copied from: https://colab.research.google.com/github/deepmind/dm_control/blob/master/tutorial.ipynb
    previous_backend = matplotlib.get_backend()
    # Create the figure under the non-interactive Agg backend, then restore
    # whichever backend was active before.
    matplotlib.use('Agg')
    figure, axes = plt.subplots(1, 1, figsize=(5, 5))
    matplotlib.use(previous_backend)

    # Borderless canvas: no axis decorations, square pixels, full-figure axes.
    axes.set_axis_off()
    axes.set_aspect('equal')
    axes.set_position([0, 0, 1, 1])
    canvas_image = axes.imshow(frames[0])

    def _show(frame):
        # Swap the pixel data only; return the artist for blitting.
        canvas_image.set_data(frame)
        return [canvas_image]

    movie = animation.FuncAnimation(
        fig=figure,
        func=_show,
        frames=frames,
        interval=50,
        blit=True,
        repeat=False,
    )
    return HTML(movie.to_html5_video())

In [127]:
# Rebind `env` to a fresh LunarLander-v2 instance created with
# render_mode='rgb_array'. Unlike the training setup, no reward wrappers are
# applied here, so rewards from this instance are the environment's own.
env = gym.make("LunarLander-v2",render_mode='rgb_array')


In [152]:
# Play one episode (at most 1024 steps) with the current `agent`, collecting
# rendered frames in `frames` and the summed reward in `k`.
obs= env.reset()
k=0
frames = []
for i in range(1024):
    with torch.no_grad():
        obs_array = np.array(obs)
        # Add a leading batch dimension before feeding the network.
        obs_tensor = torch.tensor(obs_array, dtype=torch.float32).unsqueeze(0).to(device)

        # No action is passed in, so one is sampled from the policy.
        action, logprob, _, value = agent.get_action_and_value(obs_tensor)
        next_obs, reward, done, info = env.step(action[0].cpu().numpy())
        obs=next_obs
        k += reward
        # NOTE(review): indexing with [0] assumes render() returns a sequence
        # of frames; this depends on the installed gym version — confirm.
        img = env.render()
        frames.append(img[0])
        # Stop at the end of the episode.
        if done:
            break
env.close()

In [None]:
# Show the frames captured during the evaluation episode as the cell output.
display_video(frames)