# REINFORCE

가장 간단한 Policy gradient algorithm

In [39]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

In [40]:
# Hyperparameters
learning_rate = 2e-4  # step size handed to the Adam optimizer
gamma = 0.98  # discount factor applied when accumulating returns

In [41]:
class Policy(nn.Module):
    """Two-layer softmax policy trained with the REINFORCE update.

    The network maps a 4-dimensional state to a probability distribution
    over 2 actions. The instance also owns the Adam optimizer for its own
    parameters and a buffer of per-step training data for the episode in
    progress.
    """

    def __init__(self):
        super().__init__()
        # (reward, probability of the chosen action) pairs, one per step.
        self.data = []

        self.fc1 = nn.Linear(4, 128)  # 4 inputs: state
        self.fc2 = nn.Linear(128, 2)  # 2 outputs: action (left or right)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        """Return action probabilities for a state or a batch of states.

        Args:
            x: Tensor whose last axis holds the 4 state features.

        Returns:
            Tensor of the same leading shape with 2 probabilities on the
            last axis.
        """
        x = F.relu(self.fc1(x))
        # Normalize over the last (action) axis. For a single 1-D state this
        # is the same as dim=0, and it stays correct for batched input where
        # dim=0 would normalize across the batch instead.
        return F.softmax(self.fc2(x), dim=-1)

    def put_data(self, item):
        """Append one (reward, action probability) pair to the buffer."""
        self.data.append(item)

    def train_net(self):
        """Apply one policy-gradient update from the buffered episode.

        Walks the episode backwards so the discounted return R can be
        accumulated incrementally, then clears the buffer.
        """
        if not self.data:
            # Nothing was collected, so there is no gradient to apply.
            return

        R = 0
        self.optimizer.zero_grad()
        for r, prob in reversed(self.data):
            R = r + gamma * R
            loss = -torch.log(prob) * R  # policy gradient
            # Gradients of every step accumulate until the single step below.
            loss.backward()
        self.optimizer.step()  # update parameters
        self.data = []

In [42]:
def main():
    """Train a REINFORCE policy on CartPole-v1 and print average scores.

    Runs 2001 episodes. After each episode the policy is updated from the
    data gathered during that episode, and every `print_interval` episodes
    the accumulated score divided by `print_interval` is printed.
    """
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 20

    for n_epi in range(2001):
        s, _ = env.reset()
        done = False

        while not done:
            prob = pi(torch.from_numpy(s).float())
            # Categorical wraps the discrete distribution so an action can be
            # sampled from it.
            m = Categorical(prob)
            a = m.sample()  # sample a single action
            s_prime, r, terminated, truncated, info = env.step(a.item())
            # The five-value step API reports a time-limit cut-off through
            # `truncated`, not `terminated`; looping on `terminated` alone
            # keeps stepping an episode that has already been cut off.
            done = terminated or truncated
            # Store the reward and the probability of the chosen action for
            # the network update.
            pi.put_data((r, prob[a]))
            s = s_prime
            score += r

        pi.train_net()  # update the policy network at the end of each episode

        # NOTE: the first report covers episodes 0..print_interval inclusive,
        # i.e. one more episode than the divisor used below.
        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()

In [43]:
# Start training only when this module is the program entry point.
if __name__ == "__main__":
    main()

# of episode :20, avg score : 21.3
# of episode :40, avg score : 20.35
# of episode :60, avg score : 25.0
# of episode :80, avg score : 25.6
# of episode :100, avg score : 26.5
# of episode :120, avg score : 24.3
# of episode :140, avg score : 25.05
# of episode :160, avg score : 24.3
# of episode :180, avg score : 31.0
# of episode :200, avg score : 32.2
# of episode :220, avg score : 31.3
# of episode :240, avg score : 27.0
# of episode :260, avg score : 29.75
# of episode :280, avg score : 40.4
# of episode :300, avg score : 37.65
# of episode :320, avg score : 34.0
# of episode :340, avg score : 39.5
# of episode :360, avg score : 38.8
# of episode :380, avg score : 36.45
# of episode :400, avg score : 34.4
# of episode :420, avg score : 39.2
# of episode :440, avg score : 53.25
# of episode :460, avg score : 45.95
# of episode :480, avg score : 47.55
# of episode :500, avg score : 43.55
# of episode :520, avg score : 44.2
# of episode :540, avg score : 41.65
# of episode :560, avg

# ActorCritic

정책 네트워크와 밸류 네트워크를 함께 학습하는 방법론

* Q AC 
* Advantage AC
* TD AC

In [44]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

In [45]:
# Hyperparameters
learning_rate = 2e-4  # step size handed to the Adam optimizer
gamma = 0.98  # discount factor used in the TD target
# How many ticks of data to accumulate before an update: 10 state
# transitions are gathered per update.
n_rollout = 10

In [46]:
class TDActorCritic(nn.Module):
    """Actor-critic model whose policy and value heads share one hidden layer.

    Transitions are buffered with `put_data`; `train_net` turns the buffer
    into a mini-batch and applies one update that combines a policy-gradient
    term with a TD value loss.
    """

    # Rewards are divided by this factor when a mini-batch is built.
    REWARD_SCALE = 100.0

    def __init__(self):
        super().__init__()
        # Buffered (s, a, r, s_prime, done) transitions awaiting an update.
        self.data = []

        # 4 inputs: state. Both heads share this layer.
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)  # 2 outputs: action (left or right)
        self.fc_v = nn.Linear(256, 1)  # 1 output: value
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim=0):
        """Policy head: return action probabilities for `x`.

        Args:
            x: State tensor, or a batch of states.
            softmax_dim: Axis to normalize over; pass 1 for a batch whose
                rows are states.
        """
        hidden = F.relu(self.fc1(x))
        return F.softmax(self.fc_pi(hidden), dim=softmax_dim)

    def v(self, x):
        """Value head: return the value estimate for `x`."""
        hidden = F.relu(self.fc1(x))
        return self.fc_v(hidden)

    def put_data(self, transition):
        """Append one (s, a, r, s_prime, done) transition to the buffer."""
        self.data.append(transition)

    @staticmethod
    def _stack_states(states):
        """Stack a list of per-step states into one float tensor."""
        if not states:
            return torch.tensor([], dtype=torch.float)
        # Converting each state on its own and stacking avoids building a
        # tensor directly from a list of numpy arrays, which PyTorch warns
        # is extremely slow.
        return torch.stack(
            [torch.as_tensor(state, dtype=torch.float) for state in states]
        )

    def make_batch(self):
        """Convert the buffered transitions into tensors and clear the buffer.

        Returns:
            Tuple `(s, a, r, s_prime, done_mask)`. Actions, rewards and masks
            each get a trailing axis of size 1; rewards are divided by
            `REWARD_SCALE`; the mask is 0.0 where `done` was true and 1.0
            otherwise.
        """
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
        for s, a, r, s_prime, done in self.data:
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r / self.REWARD_SCALE])
            s_prime_lst.append(s_prime)
            done_lst.append([0.0 if done else 1.0])

        s_batch = self._stack_states(s_lst)
        a_batch = torch.tensor(a_lst)
        r_batch = torch.tensor(r_lst, dtype=torch.float)
        s_prime_batch = self._stack_states(s_prime_lst)
        done_batch = torch.tensor(done_lst, dtype=torch.float)

        self.data = []
        return s_batch, a_batch, r_batch, s_prime_batch, done_batch

    def train_net(self):
        """Run one optimizer step on the buffered transitions.

        Does nothing when the buffer is empty.
        """
        if not self.data:
            # An empty batch cannot be pushed through the linear layers.
            return

        s, a, r, s_prime, done = self.make_batch()

        # Value network. The target is detached so it is treated as a
        # constant: the prediction moves towards the target, not vice versa.
        v_s = self.v(s)
        td_target = (r + gamma * self.v(s_prime) * done).detach()
        delta = (td_target - v_s).detach()  # TD error of v(s)

        # Policy network. The batch is 2-D here, so the softmax has to run
        # over axis 1 instead of the default axis 0.
        pi = self.pi(s, softmax_dim=1)
        pi_a = pi.gather(1, a)  # probability of the action actually taken

        # Total loss: mean policy-gradient term plus the TD value loss.
        policy_loss = (-torch.log(pi_a) * delta).mean()
        value_loss = F.smooth_l1_loss(v_s, td_target)
        loss = policy_loss + value_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
      

In [47]:
def main():
    """Train a TDActorCritic model on CartPole-v1 and print average scores.

    Runs 1001 episodes. Within an episode the model is updated after every
    `n_rollout` collected transitions, or earlier when the episode ends.
    Every `print_interval` episodes the accumulated score divided by
    `print_interval` is printed.
    """
    env = gym.make('CartPole-v1')
    model = TDActorCritic()
    print_interval = 20
    score = 0.0

    for n_epi in range(1001):
        episode_over = False
        s, _ = env.reset()
        while not episode_over:
            for _ in range(n_rollout):
                prob = model.pi(torch.from_numpy(s).float())  # softmax_dim=0
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, terminated, truncated, info = env.step(a)
                # Only `terminated` is stored: the stored flag masks out the
                # bootstrapped value of s_prime, which should happen at a
                # true terminal state but not at a time-limit cut-off.
                model.put_data((s, a, r, s_prime, terminated))

                s = s_prime
                score += r

                # The five-value step API reports a time-limit cut-off
                # through `truncated`; the episode is over in either case.
                episode_over = terminated or truncated
                if episode_over:
                    break

            # Train once up to n_rollout transitions have been collected.
            model.train_net()

        # NOTE: the first report covers episodes 0..print_interval inclusive,
        # i.e. one more episode than the divisor used below.
        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
            score = 0.0
    env.close()

In [48]:
# Start training only when this module is the program entry point.
if __name__ == "__main__":
    main()

# of episode :20, avg score : 18.5
# of episode :40, avg score : 17.8
# of episode :60, avg score : 18.1
# of episode :80, avg score : 21.1
# of episode :100, avg score : 18.8
# of episode :120, avg score : 21.1
# of episode :140, avg score : 20.1
# of episode :160, avg score : 20.6
# of episode :180, avg score : 30.1
# of episode :200, avg score : 33.2
# of episode :220, avg score : 35.0
# of episode :240, avg score : 40.7
# of episode :260, avg score : 46.5
# of episode :280, avg score : 50.8
# of episode :300, avg score : 48.1
# of episode :320, avg score : 52.7
# of episode :340, avg score : 60.2
# of episode :360, avg score : 52.0
# of episode :380, avg score : 51.4
# of episode :400, avg score : 72.5
# of episode :420, avg score : 68.5
# of episode :440, avg score : 62.2
# of episode :460, avg score : 85.3
# of episode :480, avg score : 86.2
# of episode :500, avg score : 97.6
# of episode :520, avg score : 111.8
# of episode :540, avg score : 128.8
# of episode :560, avg score :