In [1]:
import torch
from torch import nn 
from torch.nn import functional
import gym
import numpy as np
from collections import deque
import random

def cuda_tensor(x):
    return torch.tensor(x, dtype=torch.float32, device='cuda')

# 과거 기억 메모리
# Policy gradient 기반 알고리즘들은 과거 기억 메모리를 적용하기가 곤란하지만,
# DDPG는 예외적으로 가능함.
class ReplayMemory:
    def __init__(self, maxlen):
        self.memory = deque(maxlen=maxlen)
    
    def push(self, i_state:np.ndarray, i_action:np.ndarray, reward:float, done:bool, f_state:np.ndarray):
        self.memory.append((i_state, i_action, reward, done, f_state))
    
    def len(self):
        return len(self.memory)
    
    def sample(self, batch_size):
        i_states, i_actions, rewards, dones, f_states = zip(*random.sample(self.memory, batch_size))
        i_states = torch.stack(tuple(map(cuda_tensor, i_states)))
        f_states = torch.stack(tuple(map(cuda_tensor, f_states)))
        i_actions = torch.stack(tuple(map(cuda_tensor, i_actions)))
        rewards = torch.stack(tuple(map(cuda_tensor, rewards)))
        dones = torch.stack(tuple(map(cuda_tensor, dones)))

        return i_states, i_actions, rewards, dones, f_states

# actor
# 상태 (8개의 실수) 를 입력받아 
# 2개의 -1 ~ 1 사이의 실수로 이루어진 행위 벡터를 출력
# 범위를 맞추기 위해 출력층에는 tanh를 돌려줌
class Actor(nn.Module):
    def __init__(self):
        super(Actor, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(8, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, 2),
            nn.Tanh() 
        ).to('cuda')
        self.optimizer=torch.optim.Adam(self.network.parameters(), 0.0001)
    
    def forward(self, x):
        return self.network.forward(x)
    
    # 다양한 경험을 해 볼 수 있어야 하기 떄문에, 액션 출력 전
    # 신경망의 파라미터들에 표준정규분포에 따라 고른 무작위의 실수에 noise_scalar를 곱한 값을 더해줌
    # noise scalar는 학습이 진행될수록 점차 줄여나갔음
    def choose_action(self, state, noise_scalar):
        with torch.no_grad():
            noisy_network = Actor()
            noisy_network.load_state_dict(self.state_dict())
            for i in noisy_network.parameters():
                i.data += torch.randn_like(i.data) * noise_scalar
            return noisy_network.forward(state).cpu().numpy()

# critic
# 상태를 나타내는 8개의 실수와 actor의 행위를 나타내는 2개의 실수를 입력받아
# 1개의 추후 누적 보상의 예상 값을 출력함
class Critic(nn.Module):
    def __init__(self):
        super(Critic, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(10, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, 1)
        ).to('cuda')
        self.optimizer=torch.optim.Adam(self.network.parameters(), 0.001, weight_decay=0.01)
    
    def forward(self, x):
        return self.network.forward(x)

class Agent:
    def __init__(self):
        self.noise = 0.3
        self.noise_decay = .002
        self.gamma = .99
        
        # actor와 critic의 분신(target)을 만들어서, 
        # 기존의 놈들 (main) 을 편미분할 때 상수 취급해야 할 값들은,
        # 전부 분신들이 출력하게 함. 
        # 분신은 따로 학습시키지 않으며, 대신 학습된 main 놈들이
        # 일정 주기마다 분신에게 뇌를 복사해 주는거임.
        # 뻘짓 같아 보이지만, 학습의 안정성을 크게 높여 줌
        self.main_actor = Actor()
        self.target_actor = Actor()

        self.main_critic = Critic()
        self.target_critic= Critic()
    
        self.target_actor.load_state_dict(self.main_actor.state_dict())
        self.target_critic.load_state_dict(self.main_critic.state_dict())

        self.replay = ReplayMemory(1000000)

    def learn(self):
        if self.replay.len()<128:
            return
        
        i_states, i_actions, rewards, dones, f_states= self.replay.sample(128)
        
        # critic 신경망 업데이트
        # 상태 s, 행위 a, 그로 인해 받은 보상 R, 다음 상태 s', s'에서 actor가 고를 행위 a'에 대해
        # critic이 예측한 값 Q(s, a) == R + self.gamma * Q(s', a') 이 성립하게끔 학습시킴
        with torch.no_grad():
            f_actions = self.target_actor.forward(f_states)
            f_state_action_vals = self.target_critic.forward(torch.cat((f_states, f_actions), dim=1)).squeeze()
            td_targets = f_state_action_vals * (1-dones) * self.gamma + rewards

        i_state_action_vals = self.main_critic.forward(torch.cat((i_states, i_actions), dim=1)).squeeze()

        critic_loss = functional.smooth_l1_loss(i_state_action_vals, td_targets)
        self.main_critic.optimizer.zero_grad()
        critic_loss.backward()
        self.main_critic.optimizer.step()

        # actor 신경망 업데이트
        # s를 입력하여 a를 얻고, a를 critic에 입력하여 Q(s, a)를 얻은 뒤,
        # 이를 역전파(연쇄법칙에 따라 미분)를 해주면 
        # Q(s, a)를 actor 신경망의 파라미터들로 미분한 값가지 모두 얻을 수 있음
        # 다만 파이토치 내장 옵티마이저는 역전파된 값(Q(s, a))을 최소화 하려 할 것이기 때문에,
        # 우리의 목표대로 Q(s, a)를 최대화 하려면, 
        # 부호를 바꾼 -Q(s, a)를 역전파 시켜서 이를 최소화 하라고 하면 됨.
        det_actions = self.main_actor.forward(i_states)
        state_action_vals = -self.main_critic.forward(torch.cat((i_states, det_actions), dim=1))
        
        self.main_actor.optimizer.zero_grad()
        state_action_vals.mean().backward()
        self.main_actor.optimizer.step()

    def choose_action(self, state:np.ndarray ):
        return self.main_actor.choose_action(cuda_tensor(state), self.noise)
    
    def update_target(self):
        self.target_actor.load_state_dict(self.main_actor.state_dict())
        self.target_critic.load_state_dict(self.main_critic.state_dict())


In [2]:
env=gym.make('LunarLanderContinuous-v2')
agent = Agent()
total_steps = 0

max_score=0
actor_model_path="ddpg_actor.pth"
critic_model_path = "ddpg_critic.pth"

In [None]:

for episode in range(1000):
    i_state = env.reset()
    score=0
    while True:
        total_steps += 1
        action = agent.choose_action(i_state)
        f_state, reward, done, _ = env.step(action)
        
        agent.replay.push(np.copy(i_state), np.copy(action), reward, done, np.copy(f_state))
        agent.learn()
        score += reward
        
        env.render('rgb_array')
        
        if total_steps % 30 == 0:
            agent.update_target()
            
        if done:
            break

        i_state = np.copy(f_state)

    if score > max_score:
        max_score = score
        torch.save(agent.main_actor.state_dict(), actor_model_path)
        torch.save(agent.main_critic.state_dict(), critic_model_path)
    agent.noise = max(0.01, agent.noise - agent.noise_decay)
        