In [None]:
"""
@ Author: Zachary Deng
@ Date: 2021/2/9
@ Brief: 使用 Actor-Critic算法训练CartPole-v0
"""

import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
import time

In [None]:
# Hyperparameters read by the Actor and Critic classes in this notebook
GAMMA = 0.95  # discount factor applied to V(s') in the TD target
LR = 0.01     # learning rate handed to the Adam optimizers

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Switch the cuDNN backend off entirely (original author's note: avoid
# non-deterministic algorithms / automatic selection of the fastest algorithm)
torch.backends.cudnn.enabled = False

In [None]:
class PGNetwork(nn.Module):
    """Policy network: a two-layer MLP mapping a state to unnormalized action logits.

    Args:
        state_dim: size of the input state vector.
        action_dim: number of discrete actions (size of the output vector).
    """

    def __init__(self, state_dim, action_dim):
        super(PGNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 40)
        self.fc2 = nn.Linear(40, action_dim)

    def forward(self, x):
        """Return raw logits for ``x``; no softmax is applied here."""
        out = F.relu(self.fc1(x))
        out = self.fc2(out)
        return out

    def initialize_weights(self):
        """Re-initialize every linear layer: weights ~ N(0, 0.1), biases = 0.01.

        ``self.modules()`` also yields this container module itself, which has
        no ``weight``/``bias`` attributes, so only ``nn.Linear`` layers are
        touched to avoid an ``AttributeError``.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.1)
                nn.init.constant_(m.bias, 0.01)
            
class Actor(object):
    """Policy-gradient actor: samples actions from a softmax policy and updates
    the policy network with a TD-error-weighted log-likelihood loss.
    """

    def __init__(self, env):
        """Build the policy network and its Adam optimizer from ``env``'s spaces.

        Args:
            env: environment exposing ``observation_space.shape`` and a
                discrete ``action_space.n``.
        """
        # Dimensions of the state space and the action space
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        # Policy network and its optimizer
        self.network = PGNetwork(self.state_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=LR)

        # Number of learn() calls performed so far
        self.time_step = 0

    def choose_action(self, observation):
        """Sample an action index from the softmax policy for ``observation``.

        Args:
            observation: a single 1-D state (sequence or array of floats).

        Returns:
            The sampled action index, as returned by ``np.random.choice``.
        """
        observation = torch.as_tensor(observation, dtype=torch.float32, device=device)
        # Sampling needs no gradients, so the whole forward pass runs without
        # building an autograd graph.
        with torch.no_grad():
            logits = self.network(observation)
            # Only .cpu() is used to reach host memory: it is valid for both CPU
            # and GPU tensors, whereas forcing .cuda() fails without CUDA.
            prob_weights = F.softmax(logits, dim=0).cpu().numpy()
        action = np.random.choice(range(prob_weights.shape[0]), p=prob_weights)
        return action

    def learn(self, state, action, td_error):
        """Take one policy-gradient step for a single transition.

        Args:
            state: the 1-D state in which ``action`` was taken.
            action: index of the action that was taken.
            td_error: TD error for the transition (number or one-element
                tensor); it is treated as a constant weight.
        """
        self.time_step += 1

        # Step 1: forward pass -> -log pi(action | state)
        state = torch.as_tensor(state, dtype=torch.float32, device=device)
        logits = self.network(state).unsqueeze(0)
        action = torch.tensor([action], dtype=torch.long, device=device)
        neg_log_prob = F.cross_entropy(input=logits, target=action, reduction='none')

        # The TD error only scales the gradient; never backpropagate through it.
        if torch.is_tensor(td_error):
            td_error = td_error.detach().to(device)

        # Step 2: backward pass.
        # The objective to maximize is log pi(a|s) * td_error, which is the same
        # as minimizing neg_log_prob * td_error.
        loss = (neg_log_prob * td_error).sum()
        self.optimizer.zero_grad()  # gradients accumulate across backward() calls, so reset them first
        loss.backward()             # backpropagate to obtain gradients
        self.optimizer.step()       # update all network parameters

In [None]:
# Critic: state-value network and its TD trainer (reuses GAMMA and LR defined earlier)
class QNetwork(nn.Module):
    """Value network: a two-layer MLP mapping a state to a single scalar value.

    Args:
        state_dim: size of the input state vector.
        action_dim: accepted for signature parity with the policy network but
            not used; the output layer always has width 1.
    """

    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 40)
        self.fc2 = nn.Linear(40, 1)  # unlike the policy network, the output is one value, not one per action

    def forward(self, x):
        """Return the value estimate for ``x`` (last dimension has size 1)."""
        out = F.relu(self.fc1(x))
        out = self.fc2(out)
        return out

    def initialize_weights(self):
        """Re-initialize every linear layer: weights ~ N(0, 0.1), biases = 0.01.

        ``self.modules()`` also yields this container module itself, which has
        no ``weight``/``bias`` attributes, so only ``nn.Linear`` layers are
        touched to avoid an ``AttributeError``.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.1)
                nn.init.constant_(m.bias, 0.01)
            
class Critic(object):
    """State-value critic trained with one-step TD learning.

    Each call to ``target_Q_network`` performs one gradient step on the value
    network and returns the TD error used to weight the actor update.
    """

    def __init__(self, env):
        """Build the value network, its Adam optimizer and the MSE loss.

        Args:
            env: environment exposing ``observation_space.shape`` and a
                discrete ``action_space.n``.
        """
        # Dimensions of the state space and the action space
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        # Value network, optimizer and loss
        self.network = QNetwork(self.state_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=LR)
        self.loss_func = nn.MSELoss()  # the critic's own parameters are fit with a mean-squared-error loss

        # Step counter (initialized here; not modified by this class)
        self.time_step = 0

    def target_Q_network(self, state, reward, next_state, done=False):
        """Update V on one transition and return its TD error.

        Args:
            state: the 1-D state before the step.
            reward: scalar reward received for the step.
            next_state: the 1-D state after the step.
            done: when true, ``next_state`` is treated as terminal and its
                value is not bootstrapped. Defaults to ``False``, which keeps
                the previous three-argument behavior.

        Returns:
            Detached tensor ``reward + GAMMA * V(next_state) - V(state)``,
            evaluated with the parameters from before this update.
        """
        s = torch.as_tensor(state, dtype=torch.float32, device=device)
        s_ = torch.as_tensor(next_state, dtype=torch.float32, device=device)
        reward = float(reward)

        # Forward pass. Only V(s) carries gradients; the bootstrapped target is
        # a constant so the update does not also push V(s') toward V(s).
        v = self.network(s)
        with torch.no_grad():
            v_ = self.network(s_)
            bootstrap_mask = 0.0 if done else 1.0
            td_target = reward + GAMMA * v_ * bootstrap_mask

        # Backward pass: regress V(s) onto the fixed TD target
        loss_q = self.loss_func(v, td_target)
        self.optimizer.zero_grad()
        loss_q.backward()
        self.optimizer.step()

        # td_error = r + GAMMA * V(s') - V(s)
        td_error = (td_target - v).detach()

        return td_error

In [None]:
# Gym environment id passed to gym.make
ENV_NAME = 'CartPole-v0'
# EPISODE: number of training episodes
# STEP:    upper bound on steps per episode
# TEST:    evaluation episodes averaged at each evaluation point
EPISODE, STEP, TEST = 50000, 3000, 10

def main():
    """Train an actor-critic agent on ``ENV_NAME`` and evaluate it periodically.

    Every transition triggers one critic update followed by one actor update.
    Every 100 episodes the current policy is run for ``TEST`` rendered episodes;
    training stops early once the average evaluation reward reaches the
    environment's registered reward threshold (300 if the spec provides none).
    The environment is always closed on exit.
    """
    env = gym.make(ENV_NAME)
    try:
        actor = Actor(env)
        critic = Critic(env)

        # Early-stop score. CartPole-v0 is registered with a 200-step episode
        # cap, so a fixed bar of 300 could never be reached; prefer the
        # threshold published in the environment spec when there is one.
        spec = getattr(env, 'spec', None)
        stop_reward = getattr(spec, 'reward_threshold', None)
        if stop_reward is None:
            stop_reward = 300

        for episode in range(EPISODE):
            # Start a fresh episode
            state = env.reset()

            # Train
            for step in range(STEP):
                action = actor.choose_action(state)  # sample an action from the softmax policy
                next_state, reward, done, _ = env.step(action)
                # td_error = r + GAMMA * V(s') - V(s); this call also updates the critic
                td_error = critic.target_Q_network(state, reward, next_state)
                # Policy gradient weighted by the TD error
                actor.learn(state, action, td_error)
                state = next_state
                if done:
                    break

            # Evaluate every 100 episodes
            if episode % 100 == 0:
                total_reward = 0
                for i in range(TEST):
                    state = env.reset()
                    for j in range(STEP):
                        env.render()
                        action = actor.choose_action(state)
                        state, reward, done, _ = env.step(action)
                        total_reward += reward
                        if done:
                            break
                ave_reward = total_reward / TEST
                print('episode:', episode, 'Evaluation Average Reward:', ave_reward)
                if ave_reward >= stop_reward:
                    break
    finally:
        # Release the render window / simulator resources even on error
        env.close()
                
                
if __name__ == '__main__':
    # Measure the wall-clock duration of the full training run and report it
    # in whole seconds.
    time_start = time.time()
    main()
    time_end = time.time()
    elapsed_seconds = int(time_end - time_start)
    print('Total time:', elapsed_seconds, 's')