In [1]:
import os
from typing import Dict, List, Tuple

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from IPython.display import clear_output
from torch.distributions import Normal
import math
import random
import torch.nn as nn


In [2]:
env = gym.make("Pendulum-v1", render_mode="rgb_array")

device = torch.device(
    "cuda" if torch.cuda.is_available() else "cpu"
)

obs_dim = env.observation_space.shape[0]
action_info = env.action_space
max_action = float(env.action_space.high[0])
print(env.observation_space.shape, obs_dim, action_info, max_action)

(3,) 3 Box(-2.0, 2.0, (1,), float32) 2.0


In [19]:
class QNetwork(nn.Module):
    def __init__(self, in_dim: int):
        """Initialization."""
        super(QNetwork, self).__init__()

        self.base = nn.Sequential(
            nn.Linear(in_dim, 400), 
            nn.ReLU(),
            nn.Linear(400, 300), 
            nn.ReLU()
        )
        self.mean = nn.Sequential(
            nn.Linear(300, 1)
        )
        self.std = nn.Sequential(
            nn.Linear(300, 1)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward method implementation."""
        val = self.base(x)
        mean = self.mean(val)
        std = self.std(val)
        return mean, torch.sqrt(torch.exp(std))
    
class VNetwork(nn.Module):
    def __init__(self, in_dim: int):
        """Initialization."""
        super(VNetwork, self).__init__()

        self.base = nn.Sequential(
            nn.Linear(in_dim, 400), 
            nn.ReLU(),
            nn.Linear(400, 300), 
            nn.ReLU(),
            nn.Linear(300, 1)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward method implementation."""
        return self.base(x)

In [43]:
actor = QNetwork(obs_dim).to(device)
critic = VNetwork(obs_dim).to(device)
def select_action(state, is_test = False):
    mean, std = actor(torch.FloatTensor(state).to(device))
    # 得到动作策略的概率质量分布,用该分布初始化采样器
    m = Normal(mean, std)
    # 按该分布做采样,得到一个动作
    action = m.sample()
    # 对概率质量函数取对数,并采样该分布在action的值,即log P在action点的概率密度函数值
    loss = m.log_prob(action)
    return max_action * torch.tanh(action), loss

def critic_value(state):
    return critic(torch.FloatTensor(state).to(device))

In [None]:
gamma = 0.98

actor_optimizer = optim.Adam(actor.parameters(), lr=0.0001)
critic_optimizer = optim.Adam(critic.parameters(), lr=0.0001)

steps = []
U_s = []
view_losses = []
scores = []
max_epoch = 5000
for i in range(max_epoch):
    score = 0
    step = 0
    
    state, _ = env.reset(seed=39)
    trajectories = []
    while True:
        action, log_prob = select_action(state)
        '''从环境中观测到奖励和新的状态'''
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        value = critic_value(state)
        
        step += 1
        score += reward
        
        trajectories.append([state, action, reward, next_state, log_prob, done, value])
        if done:
            steps.append(step)
            scores.append(score)
            break
        
        state = next_state
        
    # 结束一轮游戏
        
#         '''让价值网络打分'''
    roll_count = math.floor(len(trajectories) / 8) * 8
    start_idx = 0
    log_probs = []
    values    = []
    masks     = []
    G = []
    for k in range(roll_count):
        state, action, reward, next_state, log_prob, done, value = trajectories[start_idx + k]
        log_probs.append(log_prob)
        values.append(reward + gamma * value)
        masks.append(1 - done)
        G.append(critic_value(next_state))

    log_probs = torch.cat(log_probs)
    G = torch.cat(G)
    values = torch.cat(values)
    advantage = G - values

    actor_loss  = -(log_probs * advantage.detach()).mean()
#         actor_loss += -(log_probs[indx] * advantage.detach()[indx])
#     print('actor_loss', actor_loss)
    view_losses.append(actor_loss.item())

#     critic_loss = 0.5 * advantage.pow(2).mean()
    critic_loss = F.mse_loss(G, values)
#     print('critic_loss', critic_loss)
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    critic_optimizer.zero_grad()
    critic_loss.backward()
    critic_optimizer.step()

In [None]:
# 观察曲线
plt.figure(figsize=(20, 5))
plt.subplot(131)
# plt.title('frame %s. score: %s' % (frame_idx, np.mean(scores[-10:])))
plt.title('score')
plt.plot(scores)
plt.subplot(132)
plt.title('steps')
plt.plot(steps)
plt.subplot(133)
plt.title('actor loss')
plt.plot(view_losses)
plt.show()

In [42]:
# 观察效果
def test():
    dvideo_env = gym.wrappers.RecordVideo(env, video_folder='.')
    state, _ = dvideo_env.reset(seed=3)
    done = False
    score = 0
    step = 0
    while not done:
        action, _ = select_action(state)
        next_state, reward, done, truncated, _ = dvideo_env.step([action.item()])

        state = next_state
        score += reward
        step +=1
        
        if step > 200:
            break

    print("score: ", score, "step:", step)
    dvideo_env.close()
    
test()

  logger.warn(


Moviepy - Building video /Users/webberg/workspace/code/reinforce_learning/policy/rl-video-episode-0.mp4.
Moviepy - Writing video /Users/webberg/workspace/code/reinforce_learning/policy/rl-video-episode-0.mp4



                                                                                

Moviepy - Done !
Moviepy - video ready /Users/webberg/workspace/code/reinforce_learning/policy/rl-video-episode-0.mp4
score:  -1358.848910913532 step: 201


In [None]:
,«2q12yykm,,,,,,,,,,i89[]