In [None]:
!pip install gym

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import gym
import collections
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import tqdm
import matplotlib.pyplot as plt
from torch.distributions.categorical import Categorical

In [None]:
# --- PPO hyperparameters ---------------------------------------------------
# Optimiser step size for Adam.
learning_rate = 5e-3
# Discount factor applied to future rewards.
gamma = 0.98
# Decay coefficient used by GAE (Generalized Advantage Estimation).
lmbda = 0.95
# Clipping range for the PPO probability ratio.
eps_clip = 0.1
# Number of optimisation passes over each collected batch.
K_epochs = 5

# --- Loss histories (appended to on every optimisation pass) ---------------
value_losses = []
policy_losses = []

In [None]:
class PPO(nn.Module):
  """Minimal PPO (clipped surrogate objective) actor-critic network.

  A shared hidden layer (``fc1``) feeds a policy head (``fc_pi``, 2 action
  logits) and a value head (``fc_v``, one scalar). Transitions are buffered
  with ``put_data`` and consumed by ``train_net`` (also reachable through the
  legacy no-argument ``train()`` call).

  Reads the module-level names ``learning_rate``, ``gamma``, ``lmbda``,
  ``eps_clip`` and ``K_epochs``, and appends to the module-level lists
  ``policy_losses`` and ``value_losses``.
  """

  def __init__(self):
    super(PPO, self).__init__()
    self.data = []  # rollout buffer of (s, a, r, s_prime, prob_a, done) tuples
    self.lmbda = lmbda  # coefficient used by GAE (Generalized Advantage Estimation)
    self.gamma = gamma  # discount factor
    self.eps = eps_clip  # clipping range for the probability ratio

    self.fc1 = nn.Linear(4, 256)
    self.fc_pi = nn.Linear(256, 2)  # action logits
    self.fc_v = nn.Linear(256, 1)  # state value
    self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

  # policy
  def pi(self, x, softmax_dim=0):
    """Return the action probability distribution for state(s) ``x``.

    ``softmax_dim`` selects the axis the softmax normalises over; the
    training code passes 1 for a batch, the rollout uses the default 0 for a
    single state vector.
    """
    x = F.relu(self.fc1(x))
    x = self.fc_pi(x)
    prob = F.softmax(x, dim=softmax_dim)
    return prob

  # value
  def v(self, x):
    """Return the value-head output for state(s) ``x``."""
    x = F.relu(self.fc1(x))
    v = self.fc_v(x)
    return v

  # put data: store one transition of the trajectory
  def put_data(self, transition):
    """Append one ``(s, a, r, s_prime, prob_a, done)`` tuple to the buffer."""
    self.data.append(transition)

  # make batch
  def make_batch(self):
    """Convert the buffered transitions into tensors and clear the buffer.

    Returns:
      ``(s, a, r, s_prime, prob_a, done_mask)``. ``a`` is int64, everything
      else float32. Per-step scalars are wrapped in a list so they come out
      as ``(N, 1)`` columns. ``done_mask`` is 0 where ``done`` was true and 1
      otherwise.
    """
    s_list, a_list, r_list, s_prime_list, prob_a_list, done_mask_list = [], [], [], [], [], []

    for s, a, r, s_prime, prob_a, done in self.data:
      s_list.append(s)
      a_list.append([a])  # wrapping in a list acts like unsqueeze
      r_list.append([r])
      s_prime_list.append(s_prime)
      prob_a_list.append([prob_a])
      done_mask_list.append([0 if done else 1])

    # Stack the states with numpy first: building a tensor straight from a
    # list of ndarrays is very slow. Dtypes are explicit so integer or
    # float64 rewards cannot leak a non-float32 tensor into the loss.
    s = torch.tensor(np.array(s_list), dtype=torch.float)
    a = torch.tensor(a_list, dtype=torch.int64)
    r = torch.tensor(r_list, dtype=torch.float)
    s_prime = torch.tensor(np.array(s_prime_list), dtype=torch.float)
    prob_a = torch.tensor(prob_a_list, dtype=torch.float)
    done_mask = torch.tensor(done_mask_list, dtype=torch.float)

    self.data = []
    return s, a, r, s_prime, prob_a, done_mask

  def _compute_gae(self, delta, done_mask):
    """Accumulate TD errors backwards into GAE advantages, shape ``(N, 1)``.

    ``done_mask`` zeroes the running sum at terminal transitions so an
    advantage never carries over from the following episode.
    """
    deltas = delta.detach().reshape(-1).tolist()
    masks = done_mask.reshape(-1).tolist()

    advantage_list = []
    advantage = 0.0
    # Walk from the last step to the first, so earlier steps pick up
    # repeated gamma * lambda factors.
    for delta_t, mask_t in zip(reversed(deltas), reversed(masks)):
      advantage = delta_t + self.gamma * self.lmbda * mask_t * advantage
      advantage_list.append([advantage])
    advantage_list.reverse()  # restore chronological order
    return torch.tensor(advantage_list, dtype=torch.float)

  def train_net(self):
    """Run ``K_epochs`` PPO updates on the buffered transitions.

    Does nothing when the buffer is empty. Appends the detached policy and
    value losses of every pass to ``policy_losses`` / ``value_losses``.
    """
    if not self.data:
      return

    s, a, r, s_prime, prob_a, done_mask = self.make_batch()

    for _ in range(K_epochs):
      # One-step target: reward plus the discounted value of the next state
      # (masked out when the episode ended).
      td_target = r + self.gamma * self.v(s_prime) * done_mask
      v_s = self.v(s)
      delta = td_target - v_s  # TD error, the one-step advantage
      advantage = self._compute_gae(delta, done_mask)

      # Probability of the taken action under the current policy.
      pi = self.pi(s, softmax_dim=1)
      pi_a = pi.gather(1, a)
      # pi_theta / pi_theta_old, written as exp(log a - log b).
      # prob_a is the probability recorded when the action was sampled.
      ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))

      # Left and right arguments of the clipped-surrogate min().
      surr1 = ratio * advantage
      surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
      policy_loss = -torch.min(surr1, surr2)
      value_loss = F.smooth_l1_loss(v_s, td_target.detach())
      loss = policy_loss + value_loss

      # Log detached copies so the histories do not keep autograd graphs alive.
      value_losses.append(value_loss.detach())
      policy_losses.append(policy_loss.detach())

      self.optimizer.zero_grad()
      loss.mean().backward()
      self.optimizer.step()

  def train(self, mode=None):
    """Run a PPO update, or forward to ``nn.Module.train`` when ``mode`` is given.

    ``model.train()`` with no argument keeps its meaning in this notebook:
    one call to ``train_net``. An explicit ``mode`` is passed to
    ``nn.Module.train`` so that ``model.eval()`` and ``model.train(False)``
    work instead of raising ``TypeError``.
    """
    if mode is None:
      return self.train_net()
    return super(PPO, self).train(mode)


In [None]:
# Run configuration.
num_episodes = 10_000
print_interval = 20
T = 20  # number of timesteps of trajectory collected before each update

# Running reward total; the training loop resets it after each log line.
score = 0.0

# Environment and agent.
env = gym.make('CartPole-v1')
model = PPO()

In [None]:
for episode in range(num_episodes):
  # Reset the environment and get the first observation.
  # gym >= 0.26 returns (observation, info); older releases return only the
  # observation, so accept both.
  reset_result = env.reset()
  state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
  done = False

  # PPO rollout/update loop: collect up to T transitions, then train on them.
  while not done:
    # collect during T timesteps
    for t in range(T):
      # No gradients are needed while acting; only the sampled action and
      # its probability are stored.
      with torch.no_grad():
        prob = model.pi(torch.from_numpy(state).float())  # action probabilities from the policy
      m = Categorical(prob)  # wrap them in a categorical distribution
      a = m.sample().item()  # sample an action

      # gym >= 0.26 returns (obs, reward, terminated, truncated, info);
      # older releases return (obs, reward, done, info).
      step_result = env.step(a)
      if len(step_result) == 5:
        s_prime, r, terminated, truncated, _ = step_result
        done = terminated or truncated
      else:
        s_prime, r, done, _ = step_result

      # The reward is divided by 100 to keep it small. prob[a] is the
      # probability of the action actually taken; PPO needs it later for
      # the probability ratio.
      model.put_data((state, a, r/100.0, s_prime, prob[a].item(), done))
      state = s_prime

      score += r
      if done:
        break

    model.train()
  if episode%print_interval==0 and episode!=0:
    print("# of episode :{}, avg score : {:.1f}".format(episode, score/print_interval))
    score = 0.0

# of episode :20, avg score : 11.8
# of episode :40, avg score : 9.4
# of episode :60, avg score : 10.6
# of episode :80, avg score : 10.3
# of episode :100, avg score : 9.8
# of episode :120, avg score : 10.1
# of episode :140, avg score : 10.2
# of episode :160, avg score : 12.8
# of episode :180, avg score : 29.6
# of episode :200, avg score : 44.9
# of episode :220, avg score : 82.4
# of episode :240, avg score : 84.7
# of episode :260, avg score : 81.0
# of episode :280, avg score : 110.4
# of episode :300, avg score : 138.2
# of episode :320, avg score : 108.8
# of episode :340, avg score : 156.3
# of episode :360, avg score : 204.8
# of episode :380, avg score : 133.9
# of episode :400, avg score : 128.8
# of episode :420, avg score : 196.4
# of episode :440, avg score : 248.0
# of episode :460, avg score : 179.5
# of episode :480, avg score : 464.7
# of episode :500, avg score : 234.8
# of episode :520, avg score : 120.3
# of episode :540, avg score : 118.0
# of episode :560, a

KeyboardInterrupt: ignored

In [None]:
# Softmax demo: with dim=0 the normalisation runs down each column of a
# (2, 3) tensor, so every column of the result sums to 1.
# NOTE(review): `input` shadows the Python builtin of the same name; the name
# is kept because later cells may refer to it.
input = torch.randn(2, 3)
m = nn.Softmax(dim=0)
output = m(input)

In [None]:
# Display the dim=0 softmax result computed in the previous cell.
output

tensor([[0.0957, 0.1954, 0.8602],
        [0.9043, 0.8046, 0.1398]])

In [None]:
# import gym
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# import torch.optim as optim
# from torch.distributions import Categorical

# #Hyperparameters
# learning_rate = 0.0005
# gamma         = 0.98
# lmbda         = 0.95
# eps_clip      = 0.1
# K_epoch       = 3
# T_horizon     = 20

# class PPO(nn.Module):
#     def __init__(self):
#         super(PPO, self).__init__()
#         self.data = []
        
#         self.fc1   = nn.Linear(4,256)
#         self.fc_pi = nn.Linear(256,2)
#         self.fc_v  = nn.Linear(256,1)
#         self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

#     def pi(self, x, softmax_dim = 0):
#         x = F.relu(self.fc1(x))
#         x = self.fc_pi(x)
#         prob = F.softmax(x, dim=softmax_dim)
#         return prob
    
#     def v(self, x):
#         x = F.relu(self.fc1(x))
#         v = self.fc_v(x)
#         return v
      
#     def put_data(self, transition):
#         self.data.append(transition)
        
#     def make_batch(self):
#         s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
#         for transition in self.data:
#             s, a, r, s_prime, prob_a, done = transition
            
#             s_lst.append(s)
#             a_lst.append([a])
#             r_lst.append([r])
#             s_prime_lst.append(s_prime)
#             prob_a_lst.append([prob_a])
#             done_mask = 0 if done else 1
#             done_lst.append([done_mask])
            
#         s,a,r,s_prime,done_mask, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
#                                           torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
#                                           torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
#         self.data = []
#         return s, a, r, s_prime, done_mask, prob_a
        
#     def train_net(self):
#         s, a, r, s_prime, done_mask, prob_a = self.make_batch()

#         for i in range(K_epoch):
#             td_target = r + gamma * self.v(s_prime) * done_mask
#             delta = td_target - self.v(s)
#             delta = delta.detach().numpy()

#             advantage_lst = []
#             advantage = 0.0
#             for delta_t in delta[::-1]:
#                 advantage = gamma * lmbda * advantage + delta_t[0]
#                 advantage_lst.append([advantage])
#             advantage_lst.reverse()
#             advantage = torch.tensor(advantage_lst, dtype=torch.float)

#             pi = self.pi(s, softmax_dim=1)
#             pi_a = pi.gather(1,a)
#             ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))  # a/b == exp(log(a)-log(b))

#             surr1 = ratio * advantage
#             surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
#             loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s) , td_target.detach())

#             self.optimizer.zero_grad()
#             loss.mean().backward()
#             self.optimizer.step()
        
# env = gym.make('CartPole-v1')
# model = PPO()
# score = 0.0
# print_interval = 20

# for n_epi in range(10000):
#     s = env.reset()
#     done = False
#     while not done:
#         for t in range(T_horizon):
#             prob = model.pi(torch.from_numpy(s).float())
#             m = Categorical(prob)
#             a = m.sample().item()
#             s_prime, r, done, info = env.step(a)

#             model.put_data((s, a, r/100.0, s_prime, prob[a].item(), done))
#             s = s_prime

#             score += r
#             if done:
#                 break

#         model.train_net()

#     if n_epi%print_interval==0 and n_epi!=0:
#         print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
#         score = 0.0

# env.close()