# Deep Deterministic Policy Gradient

코드 구현에 필요한 패키지 불러오기

In [None]:
# Import packages

import gym

import random
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

사용할 게임 환경 설정 & Hyper parameter 정의하기

게임 환경 : Pendulum-v0

In [None]:
# Hyper parameters
# Network architecture
H1 = 400  # Number of nodes in the first hidden layer
H2 = 300  # Number of nodes in the second hidden layer
EPS = 0.03  # Output-layer weights are initialised uniformly in [-EPS, EPS]

# Training schedule
BATCH_SIZE = 128  # Number of transitions sampled per learning step
N_EPISODE = 200  # Number of training episodes
MAX_STEP = 1000  # Upper bound on the number of steps in one episode
MEMORY_CAPACITY = 1000000  # Maximum number of transitions in the replay buffer

# Optimisation
TAU = 0.001  # Soft-update
LR = 0.001  # Critic learning rate
LR_A = 0.0001  # Actor learning rate
GAMMA = 0.99  # Reward discount

# Exploration noise
EPSILON = 1.0  # Initial scale of the exploration noise
EPSILON_MIN = 0.001  # Lower bound of the exploration-noise scale
EPS_DECAY = 0.001  # EPSILON decay (subtracted once per episode)

USE_CUDA = torch.cuda.is_available()  # Use GPU

# Create the environment and read the problem dimensions from its spaces.
env = gym.make('Pendulum-v0')
N_ACTIONS = env.action_space.shape[0]  # Length of the action vector
ACTIONS_LIM = env.action_space.high[0]  # Upper bound of the first action component
N_STATES = env.observation_space.shape[0]  # Length of the observation vector

Replay Buffer 정의 

경험(s, a, r, t, s_)를 구성하여 replay buffer에 저장

replay buffer의 수용량을 초과하면 가장 오래된 경험부터 삭제하며, 학습 시에는 저장된 경험 중 일부를 무작위로 가져와 학습 진행

In [None]:
class ReplayBuffer(object):
    """Fixed-capacity store of transitions with uniform random sampling.

    Transitions are kept in a plain list that is used as a ring buffer:
    while there is room new transitions are appended, and once the buffer
    is full each new transition overwrites the oldest one in place.  This
    keeps ``add`` O(1) (no ``list.pop(0)`` shifting) and keeps O(1) indexing
    for ``random.sample``.
    """

    def __init__(self, buffer_size, random_seed=123):
        """
        :param buffer_size: maximum number of transitions kept (at least 1)
        :param random_seed: seed handed to ``random.seed``; this reseeds the
            module-level generator of ``random``
        :raises ValueError: if ``buffer_size`` is smaller than 1
        """
        if buffer_size < 1:
            raise ValueError("buffer_size must be at least 1")
        self.buffer_size = buffer_size
        self.count = 0
        self.buffer = []
        # Index of the oldest stored transition, i.e. the slot that the next
        # overwrite replaces once the buffer is full.
        self._oldest = 0
        random.seed(random_seed)

    def add(self, s, a, r, t, s2):
        """
        Store one transition, evicting the oldest one when the buffer is full.
        :param s: state
        :param a: action
        :param r: reward
        :param t: terminal flag
        :param s2: next state
        """
        experience = (s, a, r, t, s2)
        if self.count < self.buffer_size:
            self.buffer.append(experience)
            self.count += 1
        else:
            self.buffer[self._oldest] = experience
            self._oldest = (self._oldest + 1) % len(self.buffer)

    def size(self):
        """Return the number of transitions currently stored."""
        return self.count

    def sample_batch(self, batch_size):
        """
        Draw transitions uniformly at random, without replacement.
        :param batch_size: requested number of transitions; when fewer are
            stored, all stored transitions are returned
        :return: tuple (s_batch, a_batch, r_batch, t_batch, s2_batch) of
            numpy arrays, one per transition field
        """
        batch = random.sample(self.buffer, min(batch_size, self.count))

        s_batch = np.array([exp[0] for exp in batch])
        a_batch = np.array([exp[1] for exp in batch])
        r_batch = np.array([exp[2] for exp in batch])
        t_batch = np.array([exp[3] for exp in batch])
        s2_batch = np.array([exp[4] for exp in batch])

        return s_batch, a_batch, r_batch, t_batch, s2_batch

    def clear(self):
        """Drop every stored transition."""
        self.buffer = []
        self.count = 0
        self._oldest = 0

행동(Action)에 더해 줄 탐험용 Noise(Ornstein-Uhlenbeck noise)를 생성하는 클래스 정의

In [None]:
class OUNoise:
    """Temporally correlated noise generator.

    Keeps an internal ``state`` vector that is pulled towards ``mu`` with
    strength ``theta`` and perturbed by Gaussian samples scaled by ``sigma``
    every time ``noise`` is called.
    """

    def __init__(self, action_dimension, mu=0, theta=0.15, sigma=0.2):
        self.action_dimension = action_dimension
        self.mu, self.theta, self.sigma = mu, theta, sigma
        # reset() creates self.state, so no separate initialisation is needed.
        self.reset()

    def reset(self):
        """Put the internal state back at ``mu``."""
        self.state = np.ones(self.action_dimension) * self.mu

    def noise(self):
        """Advance the process by one step and return the new state."""
        previous = self.state
        pull = self.theta * (self.mu - previous)  # Drift back towards mu
        kick = self.sigma * np.random.randn(len(previous))  # Random perturbation
        self.state = previous + (pull + kick)
        return self.state

Fully Connected Neural Network 구현

모든 노드 간에 연결 되어있는 모델

입력(Input) : 게임의 상태 정보

출력(Output) : Critic network의 경우 행동 가치 함수의 반환 값 / Actor network의 경우 정책 함수의 반환 값

In [None]:
# Create actor-critic network

class Critic(nn.Module):
    """Action-value network Q(s, a).

    The state passes through the first layer on its own; the action is
    concatenated to that layer's activations before the second layer.
    """

    def __init__(self):
        super().__init__()

        # Each layer is initialised right after it is created, so random
        # numbers are drawn in the order fc1, fc2, fc3.
        self.fc1 = nn.Linear(N_STATES, H1)
        nn.init.normal_(self.fc1.weight, mean=0, std=0.1)

        self.fc2 = nn.Linear(H1 + N_ACTIONS, H2)
        nn.init.normal_(self.fc2.weight, mean=0, std=0.1)

        self.fc3 = nn.Linear(H2, 1)
        nn.init.uniform_(self.fc3.weight, a=-EPS, b=EPS)

        self.relu = nn.ReLU()

    def forward(self, state, action):
        """
        Evaluate the critic Q(s,a).
        :param state: state [n, state_dim] (n is batch_size)
        :param action: action [n, action_dim]
        :return: Q(s,a) [n, 1]
        """
        state_features = self.relu(self.fc1(state))
        joint = torch.cat([state_features, action], dim=1)
        hidden = self.relu(self.fc2(joint))
        return self.fc3(hidden)
    

class Actor(nn.Module):
    """Deterministic policy network Pi(s).

    Layer sizes come from the module-level constants N_STATES, H1, H2 and
    N_ACTIONS; the constructor takes no arguments.
    """

    def __init__(self):
        super().__init__()

        # Each layer is initialised right after it is created, so random
        # numbers are drawn in the order fc1, fc2, fc3.
        self.fc1 = nn.Linear(N_STATES, H1)
        nn.init.normal_(self.fc1.weight, mean=0, std=0.1)

        self.fc2 = nn.Linear(H1, H2)
        nn.init.normal_(self.fc2.weight, mean=0, std=0.1)

        self.fc3 = nn.Linear(H2, N_ACTIONS)
        nn.init.uniform_(self.fc3.weight, a=-EPS, b=EPS)

        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()

    def forward(self, state):
        """
        Evaluate the policy function Pi(s).
        :param state: state [n, state_dim]
        :return: action [n, action_dim], bounded to (-1, 1) by the final tanh
        """
        hidden = self.relu(self.fc1(state))
        hidden = self.relu(self.fc2(hidden))
        return self.tanh(self.fc3(hidden))
        

DDPG Agent 정의

1. Actor Network와 Critic Network를 정의 & Optimizer(최적화 기법)와 손실 함수(Loss function)정의

2. 행동 선택 : Actor network가 출력한 행동에 탐험용 노이즈를 더한 뒤 [-1, 1] 범위로 clip하여 선택 (환경의 연속적인 행동 범위는 (-2, 2))

continuous action spaces에서의 주된 과제는 탐험(exploration)이다. DDPG는 지속적인 탐험을 위해 action에 노이즈를 준다.

3. 경험(s, a, r, t, s_)이 저장된 replay buffer로부터 batch size만큼 가져와 tensor형태로 변환 후, 손실 함수(Loss function)를 최소화하는 방향으로 가중치를 학습

4. Soft target update : 업데이트 중인 Q train network를 target 값의 계산에도 그대로 사용하면 Q 업데이트가 발산하는 경향이 있다.

따라서 actor와 critic train network의 weights를 복사한 별도의 target network를 만들어 target 값을 계산하는 데 사용한다.

이 target network의 weights는 학습된 networks를 천천히 추적하도록 함으로써 업데이트 된다.

즉 target network 값을 천천히 변하도록 제한하여 학습의 안정성을 높이게 되는 것이다.

5. Critic network의 손실 함수를 최소화하는 방향으로 가중치 업데이트 후, Actor network의 가중치 학습을 진행. 이때 Critic network에서 업데이트한 가중치를 포함하여 정책 함수의 가중치를 업데이트 한다.

In [None]:
class DDPG(object):
    """DDPG agent.

    Owns the actor and critic train networks, their target copies, the two
    optimizers, the exploration-noise process and the replay buffer.
    """

    def __init__(self):
        self.is_training = True
        self.epsilon = EPSILON  # Current scale of the exploration noise
        self.eps_decay = EPS_DECAY
        self.randomer = OUNoise(N_ACTIONS)
        self.buffer = ReplayBuffer(MEMORY_CAPACITY)

        self.actor = Actor()  # Train network
        self.actor_target = Actor()  # Target network
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=LR_A)

        self.critic = Critic()  # Train network
        self.critic_target = Critic()  # Target network
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=LR)

        # Target networks start as exact copies of the train networks.
        self.hard_update(self.actor_target, self.actor)
        self.hard_update(self.critic_target, self.critic)

        if USE_CUDA:
            self.cuda()

    def select_action(self, state):
        """
        Pick an action for a single state.
        :param state: one observation (without batch dimension)
        :return: numpy action clipped to [-1, 1]; also stored in self.action
        """
        state = torch.tensor(state, dtype=torch.float).unsqueeze(0)

        if USE_CUDA:
            state = state.cuda()

        action = self.actor(state).detach()
        action = action.squeeze(0).cpu().numpy()
        # Scale the noise by the agent's own decaying epsilon.  Using the
        # module constant EPSILON here would make decay_epsilon() a no-op.
        noise_scale = self.is_training * max(self.epsilon, EPSILON_MIN)
        action += noise_scale * self.randomer.noise()
        action = np.clip(action, -1.0, 1.0)

        self.action = action
        return action

    def learn(self):
        """
        Run one critic update, one actor update and one soft target update
        on a batch sampled from the replay buffer.
        :return: (actor loss, critic loss) as Python floats
        """
        s1, a1, r1, t1, s2 = self.buffer.sample_batch(BATCH_SIZE)
        # Terminal flag -> "not terminal" mask (1.0 where the episode goes on)
        not_done = np.logical_not(t1)
        s1 = torch.tensor(s1, dtype=torch.float)
        a1 = torch.tensor(a1, dtype=torch.float)
        r1 = torch.tensor(r1, dtype=torch.float)
        t1 = torch.tensor(not_done, dtype=torch.float)
        s2 = torch.tensor(s2, dtype=torch.float)
        if USE_CUDA:
            s1 = s1.cuda()
            a1 = a1.cuda()
            r1 = r1.cuda()
            t1 = t1.cuda()
            s2 = s2.cuda()

        # The bootstrap target is a constant for the critic update, so it is
        # built without tracking gradients.
        with torch.no_grad():
            a2 = self.actor_target(s2)
            target_q = self.critic_target(s2, a2)
            y_expected = r1[:, None] + t1[:, None] * GAMMA * target_q
        y_predicted = self.critic(s1, a1)

        # critic gradient
        loss_critic = nn.functional.mse_loss(y_predicted, y_expected)
        self.critic_optimizer.zero_grad()
        loss_critic.backward()
        self.critic_optimizer.step()

        # actor gradient: maximise Q(s, Pi(s)) by minimising its negative
        pred_a = self.actor(s1)
        loss_actor = (-self.critic(s1, pred_a)).mean()
        self.actor_optimizer.zero_grad()
        loss_actor.backward()
        self.actor_optimizer.step()

        self.soft_update(self.actor_target, self.actor, TAU)
        self.soft_update(self.critic_target, self.critic, TAU)

        return loss_actor.item(), loss_critic.item()

    def cuda(self):
        """Move all four networks to the GPU."""
        self.actor.cuda()
        self.actor_target.cuda()
        self.critic.cuda()
        self.critic_target.cuda()

    def soft_update(self, target, source, tau=0.001):
        """
        update target by target = tau * source + (1 - tau) * target
        :param target: Target network
        :param source: source network
        :param tau: 0 < tau << 1
        :return:
        """
        for target_param, param in zip(target.parameters(), source.parameters()):
            target_param.data.copy_(
                target_param.data * (1.0 - tau) + param.data * tau
            )

    def hard_update(self, target, source):
        """
        update target by target = source
        :param target: Target network
        :param source: source network
        :return:
        """
        for target_param, param in zip(target.parameters(), source.parameters()):
            target_param.data.copy_(param.data)

    def decay_epsilon(self):
        """Lower the exploration-noise scale by eps_decay, floored at EPSILON_MIN."""
        self.epsilon = max(self.epsilon - self.eps_decay, EPSILON_MIN)

    def reset(self):
        """Reset the exploration-noise process."""
        self.randomer.reset()

In [None]:
# Instantiate the DDPG agent that the training loop uses
agent = DDPG()

학습

에피소드 내에 제한된 Timestep에서 경험을 생성하여 저장한 후, replay buffer에 저장된 경험의 수가 BATCH_SIZE보다 클 경우 학습을 진행

In [None]:
def train(pre_episodes=0, pre_total_step=0):
    """
    Train the global agent on the global env and plot the episode rewards.
    :param pre_episodes: number of episodes already done; training runs
        episodes pre_episodes + 1 .. N_EPISODE
    :param pre_total_step: number of steps already done, used as the start
        value of the printed total step counter
    :return:
    """
    total_step = pre_total_step

    all_rewards = []
    try:
        for ep in range(pre_episodes + 1, N_EPISODE + 1):
            # NOTE(review): written against the Gym API in which reset()
            # returns the observation and step() returns a 4-tuple.
            s0 = env.reset()
            agent.reset()

            done = False
            step = 0
            reward = 0

            # decay noise
            agent.decay_epsilon()

            while not done:
                action = agent.select_action(s0)

                s1, r1, done, _ = env.step(action)
                agent.buffer.add(s0, action, r1, done, s1)
                s0 = s1

                # Learn only once the buffer holds more than one batch.
                if agent.buffer.size() > BATCH_SIZE:
                    agent.learn()

                reward += r1
                step += 1
                total_step += 1

                if step >= MAX_STEP:
                    break

            all_rewards.append(reward)

            print('total step: %5d, episodes %3d, episode_step: %5d, episode_reward: %5f'
                  % (total_step, ep, step, reward))
    finally:
        # Release the environment even when training is interrupted.
        env.close()

    # Size the x axis from the episodes actually run so that the plot also
    # works when N_EPISODE is not 200 or pre_episodes is not 0.
    episodes_run = len(all_rewards)
    plt.title("Pendulum scores for %d episodes" % episodes_run)
    plt.plot(np.arange(episodes_run), all_rewards)
    plt.xlabel("episode")
    plt.ylabel("ep_reward")
    plt.show()

In [None]:
# Run the training loop with its default arguments
train()
