In [1]:
import gym
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import Adam

In [2]:
# actor
class ActorNet(nn.Module):
    """Small two-headed network producing the parameters of a Gaussian policy.

    A single hidden layer with ReLU feeds two linear heads:
    one squashed by tanh (the mean, in [-1, 1]) and one passed through
    softplus (the spread, non-negative).
    """

    def __init__(self, n_features, n_hidden, n_outputs):
        super().__init__()
        # Layers are created in this order so seeded initialisation is stable.
        self.l1 = nn.Linear(n_features, n_hidden)
        self.mu = nn.Linear(n_hidden, n_outputs)
        self.sigma = nn.Linear(n_hidden, n_outputs)

    def forward(self, x):
        """Return ``(mu, sigma)`` for the input batch ``x``."""
        hidden = F.relu(self.l1(x))
        mean = torch.tanh(self.mu(hidden))        # bounded to [-1, 1]
        spread = F.softplus(self.sigma(hidden))   # non-negative
        return mean, spread

class Actor(object):
    """Gaussian policy (the "actor") for a one-dimensional continuous action.

    The underlying network outputs a tanh-squashed mean in [-1, 1] and a
    non-negative spread. The mean is mapped linearly onto ``action_bound``
    and the spread is offset by 0.1 so the standard deviation never
    collapses to zero.

    Args:
        n_features: size of the state vector fed to the network.
        action_bound: ``[low, high]`` limits of the action; sampled actions
            are clamped to this interval and the policy mean is scaled to it.
        n_hidden: width of the hidden layer.
        lr: learning rate of the Adam optimizer.
    """

    def __init__(self, n_features, action_bound, n_hidden=30, lr=0.0001):
        self.n_features = n_features
        self.action_bound = action_bound
        self.n_hidden = n_hidden
        self.lr = lr

        self.__build_net()

    def __build_net(self):
        # The network has a single output: the action is one-dimensional.
        self.actor_net = ActorNet(self.n_features, self.n_hidden, 1)
        self.optimizer = Adam(self.actor_net.parameters(), lr=self.lr)

    def normal_dist(self, s):
        """Build the action distribution for a single state ``s``.

        Args:
            s: one state (1-D array-like of length ``n_features``).

        Returns:
            ``torch.distributions.Normal`` with scalar mean and std.
        """
        state = torch.as_tensor(s, dtype=torch.float32).unsqueeze(0)
        mu, sigma = self.actor_net(state)

        # Map the tanh output from [-1, 1] onto [low, high] instead of a
        # hard-coded factor; for the symmetric bound [-2, 2] this is mu * 2.
        low, high = float(self.action_bound[0]), float(self.action_bound[1])
        centre, half_range = (high + low) / 2.0, (high - low) / 2.0
        mu = (mu * half_range + centre).squeeze()
        # Offset keeps the standard deviation at or above 0.1.
        sigma = (sigma + 0.1).squeeze()
        return torch.distributions.Normal(mu, sigma)

    def choose_action(self, s):
        """Sample an action for state ``s`` and clamp it to ``action_bound``.

        Returns:
            The action as a Python float.
        """
        self.actor_net.eval()
        with torch.no_grad():
            # Sample an action according to the current policy distribution.
            action = self.normal_dist(s).sample()
        action = torch.clamp(action, self.action_bound[0], self.action_bound[1])
        return action.item()

    def learn(self, s, a, td):
        """Take one policy-gradient step.

        Args:
            s: state in which the action was taken.
            a: the action that was taken (scalar).
            td: TD error used as the advantage estimate; may be a Python
                number or a one-element tensor. It is treated as a constant,
                so no gradient flows back into whatever produced it.

        Returns:
            The maximised objective (``log_prob * td + 0.01 * entropy``) as a
            tensor detached from the autograd graph.
        """
        self.actor_net.train()
        dist = self.normal_dist(s)

        action = torch.as_tensor(a, dtype=torch.float32)
        advantage = torch.tensor(float(td))  # plain constant, no graph attached

        # Advantage-weighted log-likelihood plus a small entropy bonus that
        # encourages exploration.
        exp_v = dist.log_prob(action) * advantage + 0.01 * dist.entropy()
        # Maximising exp_v is minimising its negation.
        loss = -exp_v

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Detach so the caller does not keep the computation graph alive.
        return exp_v.detach()

In [3]:
class CriticNet(nn.Module):
    """State-value network: one ReLU hidden layer followed by a linear head."""

    def __init__(self, n_features, n_hidden, n_outputs):
        super().__init__()
        self.l1 = nn.Linear(n_features, n_hidden)
        self.v = nn.Linear(n_hidden, n_outputs)

    def forward(self, x):
        """Return the value estimate for the input batch ``x``."""
        return self.v(F.relu(self.l1(x)))

class Critic:
    """State-value estimator (the "critic") trained with one-step TD learning.

    Args:
        n_features: size of the state vector fed to the network.
        n_hidden: width of the hidden layer.
        n_output: number of network outputs.
        lr: learning rate of the Adam optimizer.
        gamma: discount factor applied to the next state's value.
    """

    def __init__(self, n_features, n_hidden=30, n_output=1, lr=0.01, gamma=0.9):
        self.n_features = n_features
        self.n_hidden = n_hidden
        self.n_output = n_output
        self.lr = lr
        self.gamma = gamma
        self.__build_net()

    def __build_net(self):
        self.critic_net = CriticNet(self.n_features, self.n_hidden, self.n_output)
        self.optimizer = Adam(self.critic_net.parameters(), lr=self.lr)

    def learn(self, s, r, s_):
        """Take one TD(0) step on the transition ``(s, r, s_)``.

        Args:
            s: current state (1-D array-like).
            r: scalar reward received for the transition.
            s_: next state (1-D array-like).

        Returns:
            The TD error ``r + gamma * V(s_) - V(s)`` computed before the
            update, as a scalar float64 tensor detached from the graph.
        """
        state = torch.as_tensor(s, dtype=torch.float32).unsqueeze(0)
        next_state = torch.as_tensor(s_, dtype=torch.float32).unsqueeze(0)

        v = self.critic_net(state)
        # The bootstrap target is a constant: only V(s) is moved towards it.
        # Letting gradients flow through V(s_) would also push the target
        # towards the current estimate.
        with torch.no_grad():
            v_ = self.critic_net(next_state)

        # float(r) keeps the arithmetic in torch even when r is a NumPy scalar.
        target = float(r) + self.gamma * v_.double()
        td_error = torch.mean(target - v.double())
        loss = td_error ** 2

        self.optimizer.zero_grad()
        # The graph is used exactly once, so it does not need to be retained.
        loss.backward()
        self.optimizer.step()

        return td_error.detach()

In [4]:
# Training configuration.
MAX_EPISODE = 1000  # number of episodes the training loop runs
MAX_EP_STEPS = 200  # an episode is cut off once its step counter exceeds this
DISPLAY_REWARD_THRESHOLD = -100  # rendering is switched on once the running reward exceeds this
RENDER = False  # whether the training loop calls env.render()
LR_A = 0.001  # learning rate passed to the Actor
LR_C = 0.01  # learning rate passed to the Critic

In [5]:
# Build the Pendulum environment and keep the unwrapped (raw) environment object.
env = gym.make('Pendulum-v0').unwrapped

In [6]:
# Size of the first axis of the observation space; used as the networks' input width.
N_S = env.observation_space.shape[0]
# Upper limit of the action space as reported by the environment;
# negated below to form the lower limit passed to the Actor.
A_BOUND = env.action_space.high

In [7]:
# `.item()` extracts the single bound as a Python float. Unlike `float(array)`
# it does not rely on implicit array-to-scalar conversion (deprecated in newer
# NumPy), and it raises if the action space has more than one element.
actor = Actor(n_features=N_S, lr=LR_A, action_bound=[-A_BOUND.item(), A_BOUND.item()])
critic = Critic(n_features=N_S, lr=LR_C)

In [8]:
# Online actor-critic training: after every environment step the critic is
# updated first and its TD error is then used as the actor's advantage signal.

# Reset explicitly so that re-running this cell starts from a clean slate
# instead of inheriting the running average of a previous run.
running_reward = None
# Local copy: training may switch rendering on without mutating the config constant.
render = RENDER

try:
    for i_episode in range(MAX_EPISODE):
        s = env.reset()
        t = 0
        ep_rs = []
        while True:
            if render:
                env.render()
            a = actor.choose_action(s)

            # `done` and `info` are not used; the episode is ended by the
            # step counter below.
            s_, r, done, info = env.step([a])
            r /= 10  # scale the reward down before learning from it

            td_error = critic.learn(s, r, s_)   # TD error: r + gamma * V(s_) - V(s)
            actor.learn(s, a, td_error)         # policy step weighted by the TD error

            s = s_
            t += 1
            ep_rs.append(r)
            if t > MAX_EP_STEPS:
                ep_rs_sum = sum(ep_rs)
                if running_reward is None:
                    running_reward = ep_rs_sum
                else:
                    # Exponential moving average of the (scaled) episode return.
                    running_reward = running_reward * 0.9 + ep_rs_sum * 0.1
                if running_reward > DISPLAY_REWARD_THRESHOLD:
                    render = True
                print('episode: ', i_episode, '  reward:', int(running_reward))
                break
finally:
    # Release the environment even if training is interrupted or raises.
    env.close()

episode:  0   reward: -142
episode:  1   reward: -139
episode:  2   reward: -140
episode:  3   reward: -138
episode:  4   reward: -139
episode:  5   reward: -135
episode:  6   reward: -136
episode:  7   reward: -139
episode:  8   reward: -136
episode:  9   reward: -138
episode:  10   reward: -139
episode:  11   reward: -139
episode:  12   reward: -139
episode:  13   reward: -137
episode:  14   reward: -136
episode:  15   reward: -138
episode:  16   reward: -140
episode:  17   reward: -136
episode:  18   reward: -138
episode:  19   reward: -139
episode:  20   reward: -139
episode:  21   reward: -141
episode:  22   reward: -142
episode:  23   reward: -141
episode:  24   reward: -142
episode:  25   reward: -143
episode:  26   reward: -138
episode:  27   reward: -139
episode:  28   reward: -140
episode:  29   reward: -140
episode:  30   reward: -140
episode:  31   reward: -137
episode:  32   reward: -139
episode:  33   reward: -139
episode:  34   reward: -140
episode:  35   reward: -139
ep