In [1]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
import matplotlib.pyplot as plt
from collections import namedtuple
from IPython.display import clear_output
from statistics import mean
from utils.wrappers import make_atari, wrap_deepmind, wrap_pytorch
from itertools import count
from torch.utils.tensorboard import SummaryWriter
import time
import os

In [2]:
# Output size of a convolution along one spatial axis:
#   W = input size, F = filter size, S = stride, P = padding (in pixels)
#   N = (W - F + 2P) / S + 1
class CNN_Actor_critic(nn.Module):
    """Actor-critic network with a shared convolutional trunk.

    The trunk is three Conv2d + BatchNorm2d + ReLU stages. Its flattened
    output feeds two independent MLP heads: a critic producing one scalar
    value per sample and an actor producing action probabilities.

    Args:
        out_dim: Number of discrete actions (width of the actor output).
        in_channels: Accepted for interface compatibility but currently NOT
            used: the first convolution is fixed at 3 input channels.
            NOTE(review): wiring this through would also require the caller
            to pass the real channel count here.
    """

    def __init__(self, out_dim, in_channels=3):
        super(CNN_Actor_critic, self).__init__()
        
        self.common_net = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=8, stride=4),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )
        
        # 352 * 64 is the flattened trunk output: 64 channels of 22 x 16,
        # which is what the three convolutions above yield for a 210 x 160
        # input. Other input resolutions will not match these Linear layers.
        self.critic_net = nn.Sequential(
            nn.Linear(352 * 64, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
        
        self.actor_net = nn.Sequential(
            nn.Linear(352 * 64, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, out_dim),
            # Normalise over the action axis explicitly. Leaving `dim` unset
            # relies on a deprecated implicit choice and emits a warning on
            # every forward pass.
            nn.Softmax(dim=-1)
        )
    
    def forward(self, x):
        """Return the action distribution and state value for a batch.

        Args:
            x: Image batch with 3 channels in axis 1 (batch, 3, H, W).

        Returns:
            A tuple ``(dist, value)`` where ``dist`` is a ``Categorical`` over
            ``out_dim`` actions and ``value`` has shape ``(batch, 1)``.
        """
        common = self.common_net(x)
        # Flatten everything except the batch axis for the Linear heads.
        common = common.view(common.size(0), -1)
        
        value = self.critic_net(common)
        probs = self.actor_net(common)
        
        return Categorical(probs), value

In [3]:
def smooth_plot(factor, item, plot_decay):
    """Smooth a series and thin it out for plotting.

    For index ``i <= factor`` the smoothed value is the mean of the prefix
    ``item[0:i+1]``; for ``i > factor`` it is the mean of the forward window
    ``item[i:i+factor]``. Both the x positions and the smoothed values are
    then halved (every second point kept) ``len(item) // plot_decay`` times.

    Args:
        factor: Window length used for smoothing.
        item: Sequence of numeric values.
        plot_decay: Number of points per halving pass.

    Returns:
        ``(x, smoothed)`` where ``x`` is a NumPy index array and ``smoothed``
        is a list of means, both of the same (thinned) length.
    """
    n_points = len(item)

    smoothed = []
    for idx in range(n_points):
        if idx > factor:
            window = item[idx:idx + factor]
        else:
            window = item[0:idx + 1]
        smoothed.append(np.mean(window))

    positions = np.arange(n_points)
    for _ in range(n_points // plot_decay):
        positions = positions[::2]
        smoothed = smoothed[::2]
    return positions, smoothed
    
def plot(episode, rewards, losses, episode_steps):
    """Redraw the three training curves (rewards, losses, episode lengths).

    Clears the current cell output, then draws one figure with three stacked
    panels. Each panel shows the raw series plus the smoothed and thinned
    series returned by ``smooth_plot(10, series, 1000)``.

    Args:
        episode: Episode identifier shown in the panel titles.
        rewards: Per-episode reward series.
        losses: Loss series.
        episode_steps: Per-episode step-count series.
    """
    clear_output(True)

    window, decay = 10, 1000
    (rewards_x, rewards_smooth), (losses_x, losses_smooth), (steps_x, steps_smooth) = [
        smooth_plot(window, series, decay)
        for series in (rewards, losses, episode_steps)
    ]

    def draw_pair(raw, raw_label, smooth_x, smooth_y, smooth_label):
        # Thin light line for the raw data, thick orange line for the smoothed data.
        plt.plot(raw, label=raw_label, color='lightsteelblue', linewidth='1')
        plt.plot(smooth_x, smooth_y, label=smooth_label, color='darkorange', linewidth='3')
        plt.legend(loc='best')

    plt.figure(figsize=(18, 12))

    plt.subplot(311)
    plt.title('Episode %s. reward: %s'%(episode, rewards_smooth[-1]))
    draw_pair(rewards, "Rewards", rewards_x, rewards_smooth, 'Smothed_Rewards')

    plt.subplot(312)
    plt.title('Losses')
    draw_pair(losses, "Losses", losses_x, losses_smooth, "Smoothed_Losses")

    plt.subplot(313)
    plt.title('Episode_Steps %s.'%(episode))
    draw_pair(episode_steps, "Episode_Steps", steps_x, steps_smooth, "Episode_Steps_Losses")

    plt.show()

In [4]:
## Environment, logging, hyperparameters and model setup

env_name = "PongNoFrameskip-v4"
# NOTE(review): this wrapped environment is replaced by the plain
# `gym.make(env_name)` environment created further down, so these wrappers
# never affect training. Kept to preserve the existing behaviour.
env = make_atari(env_name)
env = wrap_deepmind(env)
env = wrap_pytorch(env)

# One log directory per run. The timestamp avoids ':' because it is not a
# valid path character on Windows.
current_time = time.strftime('%Y-%m-%d_%H-%M-%S')
ROOT_DIR = "./running_log/PPO2_stable_pic_input/{}".format(current_time)
model_dir = os.path.join(ROOT_DIR, "model")
plot_dir = os.path.join(ROOT_DIR, "tensorboard")
# exist_ok so that re-running this cell within the same second does not fail.
os.makedirs(model_dir, exist_ok=True)
os.makedirs(plot_dir, exist_ok=True)
writer = SummaryWriter(plot_dir)

## hyperparameters
max_epoch = 20000        # number of episodes run by the training loop
num_timesteps = 2000     # not referenced elsewhere in this notebook
clip_param = 0.2         # PPO ratio clipping range (1 +/- clip_param)
ppo_epochs = 4           # gradient steps per collected batch
gamma = 0.99             # reward discount factor
learning_rate = 0.002
betas = (0.9, 0.999)     # Adam betas
update_timestep = 2000   # environment steps collected between PPO updates
## hyperparameters

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The environment actually used for training and evaluation (raw frames).
env = gym.make(env_name)

# Channel count of the frames fed to the network; the network's first
# convolution is built for 3 input channels.
in_dim = 3
out_dim = env.action_space.n
# CNN_Actor_critic's signature is (out_dim, in_channels=3). The action count
# must be the first argument, otherwise the policy head gets the wrong width.
network = CNN_Actor_critic(out_dim, in_channels=in_dim).to(device)
old_network = CNN_Actor_critic(out_dim, in_channels=in_dim).to(device)
old_network.load_state_dict(network.state_dict())

optimizer = optim.Adam(network.parameters(), lr=learning_rate, betas=betas)

# One stored environment transition; `buffer` accumulates them between updates.
Trajectory = namedtuple('Trajectory', ['state', 'action', 'reward', 'done', 'log_prob'])
buffer = []

In [5]:
def trans_state(state):
    """Turn one observation array into a float tensor batch of size one.

    The array's last axis is moved to the front (axes reordered to
    ``(2, 0, 1)``), the result is converted to a float tensor, a leading
    batch axis is added, and the tensor is moved to ``device``.
    Assumes `state` is a 3-D array in height x width x channel order.
    """
    channels_first = state.transpose((2, 0, 1))
    batch = torch.FloatTensor(channels_first)[None]
    return batch.to(device)

def choose_action(state):
    """Sample an action from `old_network`, step `env`, and record the transition.

    Args:
        state: Raw observation accepted by ``trans_state``.

    Returns:
        ``(next_state, reward, done)`` as reported by ``env.step``.

    Side effects:
        Appends a ``Trajectory`` (transformed state, sampled action, reward,
        done flag, log-probability of the action) to the global ``buffer``.
    """
    obs = trans_state(state)
    with torch.no_grad():
        policy, _value = old_network(obs)
    picked = policy.sample()
    next_state, reward, done, _info = env.step(picked.item())

    transition = Trajectory(
        state=obs,
        action=picked,
        reward=reward,
        done=done,
        log_prob=policy.log_prob(picked),
    )
    buffer.append(transition)

    return next_state, reward, done

In [6]:
def ppo_train(next_state, update_time):
    """Run ``ppo_epochs`` clipped-PPO updates on the transitions in ``buffer``.

    Reads the module-level ``buffer``, ``network``, ``old_network``,
    ``optimizer``, ``gamma``, ``ppo_epochs``, ``clip_param``, ``device`` and
    ``writer``. After the updates, ``old_network`` is synchronised with
    ``network``, ``buffer`` is emptied in place, and the mean loss over the
    epochs is logged under the tag "Loss".

    Args:
        next_state: Observation following the last stored transition.
            Currently unused: returns are not bootstrapped from it.
        update_time: Step index under which the mean loss is logged.
    """
    losses = []

    # Stored states carry a leading batch axis of size 1; drop it after stacking.
    state = torch.stack([t.state for t in buffer]).squeeze(1)
    # Flatten to one entry per transition. Each stored action/log-prob may
    # itself have shape [1]; stacking would then give [N, 1], and combining
    # that with the [N]-batched distribution below would silently broadcast
    # to an [N, N] matrix that mixes unrelated transitions.
    action = torch.stack([t.action for t in buffer]).reshape(-1)
    old_log_prob = torch.stack([t.log_prob for t in buffer]).reshape(-1).detach()
    reward = [t.reward for t in buffer]
    dones = [t.done for t in buffer]

    # Discounted return for every step, accumulated backwards. The running
    # return is reset whenever a step ended an episode, so returns never leak
    # across episode boundaries.
    Gt, R = [], 0
    for r, done in zip(reversed(reward), reversed(dones)):
        if done:
            R = 0
        R = r + gamma * R
        Gt.append(R)
    Gt.reverse()  # back to chronological order
    Gt = torch.FloatTensor(Gt).to(device)
    # Standardise the returns; the epsilon guards against a zero std.
    Gt = (Gt - Gt.mean()) / (Gt.std() + 1e-5)

    for _ in range(ppo_epochs):
        dist, value = network(state)
        # The critic returns [N, 1]; use [N] so it lines up element-wise with
        # Gt instead of broadcasting to [N, N].
        value = value.squeeze(-1)
        entropy = dist.entropy().mean()
        advantage = Gt - value.detach()

        action_prob = dist.log_prob(action)

        # Probability ratio between the current and the data-collecting policy.
        ratio = torch.exp(action_prob - old_log_prob)
        surr1 = ratio * advantage
        surr2 = torch.clamp(ratio, 1 - clip_param, 1 + clip_param) * advantage
        actor_loss = - torch.min(surr1, surr2).mean()

        critic_loss = (Gt - value).pow(2).mean()

        loss = 0.5 * critic_loss + actor_loss - 0.01 * entropy
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    old_network.load_state_dict(network.state_dict())
    del buffer[:]

    writer.add_scalar("Loss", np.array(losses).mean(), update_time)

In [None]:
# Counters: PPO updates performed, environment steps since the last update,
# and number of finished episodes.
update_time, timestep, done_time = 0, 0, 0

for epoch in range(max_epoch):
    
    state = env.reset()
    rewards = 0  # undiscounted reward total of the current episode
    
    for i in count():
        timestep += 1
        
        # Acts with old_network, steps env, and appends the transition to `buffer`.
        next_state, reward, done = choose_action(state)
        
        state = next_state
        rewards += reward
        
        # `timestep` is not reset per episode, so one update batch can span
        # several episodes; it is reset only after each PPO update.
        if timestep % update_timestep == 0:
            ppo_train(next_state, update_time)
            update_time += 1
            timestep = 0
        
        if done :
            # `i` is the zero-based index of the episode's final step.
            writer.add_scalar("Episode_Steps", i, done_time)
            done_time += 1
            break
    
    writer.add_scalar("Rewards", rewards, epoch)
# Runs once, after the epoch loop has finished; saves the whole module object
# (not just its state_dict).
torch.save(network, model_dir + "/model.pth")

  input = module(input)


In [None]:
# torch.save(old_network.state_dict(), "./model/LunarLander-v2_PPO2.pth")

In [None]:
# env_name = "LunarLander-v2"
# env = gym.make(env_name)
# in_dim = env.observation_space.shape[0]
# out_dim = env.env.action_space.n
test_episodes = 100  # number of evaluation episodes run by test_ppo
test_steps = 300  # maximum environment steps per evaluation episode

def test_ppo():
    """Evaluate `old_network` greedily and return the reward total per episode.

    Runs ``test_episodes`` episodes on the global ``env``. In each step the
    action with the highest probability is taken. An episode stops when the
    environment reports ``done`` or after ``test_steps`` steps.

    Returns:
        List with one summed reward per episode.
    """
    episode_totals = []
    for _episode in range(test_episodes):
        obs = env.reset()
        total = 0
        for _step in range(test_steps):
            frame = trans_state(obs)
            with torch.no_grad():
                policy, _value = old_network(frame)
            greedy = policy.probs.argmax()
            obs, step_reward, finished, _info = env.step(greedy.item())
            total += step_reward
            if finished:
                break

        episode_totals.append(total)
    return episode_totals

In [None]:
# Evaluate the policy and plot the reward total of every test episode.
test_result = test_ppo()
plt.title("Average reward : {}".format(np.array(test_result).mean()))
# Axis labels so the figure can be read on its own.
plt.xlabel("Test episode")
plt.ylabel("Total reward")
plt.plot(test_result)
# Render explicitly so the cell does not end on the Line2D list repr.
plt.show()

Reference:<br>
[https://github.com/nikhilbarhate99/PPO-PyTorch/blob/master/PPO.py](https://github.com/nikhilbarhate99/PPO-PyTorch/blob/master/PPO.py)<br>
[https://github.com/sweetice/Deep-reinforcement-learning-with-pytorch/tree/master/Char07%20PPO](https://github.com/sweetice/Deep-reinforcement-learning-with-pytorch/tree/master/Char07%20PPO)<br>