## 5. gradients parallelism 
#### : 여러 자식 process가 각각 학습하고, 학습 결과(gradients)를 마스터 process에 전달 
- 여러 자식 process가 자신의 training data를 사용해 gradients 계산하고(학습), 그것을 마스터 process에 전달 
- 마스터 process는 gradients를 모두 더하고 SGD 업데이트 수행

<img src="./image/gradient.png">

- 여러 GPU들을 통해 수행할 수 있기 때문에, 더 확장성이 높음 



In [None]:
import gym
import ptan
import argparse
from tensorboardX import SummaryWriter

import torch
import torch.nn.utils as nn_utils
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp

from lib import common

In [1]:
GAMMA = 0.99
LEARNING_RATE = 0.001
ENTROPY_BETA = 0.01

REWARD_STEPS = 4
CLIP_GRAD = 0.1

PROCESSES_COUNT = 4 #자식 process개수
NUM_ENVS = 15

GRAD_BATCH = 64 #각 자식 process에서 loss, gradients를 계산할 batch 개수  
TRAIN_BATCH = 2 #SGD반복에 합쳐질 자식 process의 gradients batch 개수 

- 매 최적화 단계마다 TRAIN_BATCH * GRAD_BATCH (128)만큼의 학습 샘플이 사용됨 
    - loss 계산, backpropagation은 꽤 무거운 연산임으로 효율성을 위해 GRAD_BATCH사용 
    - 네트워크를 on-policy로 업데이트하는 것을 유지하기위해 TRAIN_BATCH를 작게 유지 

### grads_func(proc_name, net, device, train_queue):
#### - 자식 process에서 수행됨 
#### - 넘겨주는 인자 
- proc_name : process의 이름, Tensorboard writer 생성에 사용 
- net : 공유 네트워크 
- device : 연산을 수행할 device
- train_queue : 계산된 gradients를 중앙 process에 전달할 때 사용할 queue

In [None]:
def grads_func(proc_name, net, device, train_queue):
    
    """환경, 에이전트, 경험 생성"""
    envs = [make_env() for _ in range(NUM_ENVS)]

    agent = ptan.agent.PolicyAgent(lambda x: net(x)[0], device=device, apply_softmax=True)
    exp_source = ptan.experience.ExperienceSourceFirstLast(envs, agent, gamma=GAMMA, steps_count=REWARD_STEPS)

    batch = []
    frame_idx = 0
    writer = SummaryWriter(comment=proc_name)#

    
    with common.RewardTracker(writer, stop_reward=REWARD_BOUND) as tracker:
        with ptan.common.utils.TBMeanTracker(writer, batch_size=100) as tb_tracker:
            
            for exp in exp_source:
                frame_idx += 1
                new_rewards = exp_source.pop_total_rewards()
                
                if new_rewards and tracker.reward(new_rewards[0], frame_idx):
                    break

                batch.append(exp)
                
                if len(batch) < GRAD_BATCH:
                    continue

                states_v, actions_t, vals_ref_v = common.unpack_batch(batch, net, 
                                                                      last_val_gamma=GAMMA**REWARD_STEPS, 
                                                                      device=device)
                batch.clear()

                
                net.zero_grad()#optimizer가 아닌 network
                
                logits_v, value_v = net(states_v)
                loss_value_v = F.mse_loss(value_v.squeeze(-1), vals_ref_v)

                log_prob_v = F.log_softmax(logits_v, dim=1)
                adv_v = vals_ref_v - value_v.detach()
                log_prob_actions_v = adv_v * log_prob_v[range(GRAD_BATCH), actions_t]
                loss_policy_v = -log_prob_actions_v.mean()

                prob_v = F.softmax(logits_v, dim=1)
                entropy_loss_v = ENTROPY_BETA * (prob_v * log_prob_v).sum(dim=1).mean()

                loss_v = entropy_loss_v + loss_value_v + loss_policy_v
                loss_v.backward()

                tb_tracker.track("advantage", adv_v, frame_idx)
                tb_tracker.track("values", value_v, frame_idx)
                tb_tracker.track("batch_rewards", vals_ref_v, frame_idx)
                tb_tracker.track("loss_entropy", entropy_loss_v, frame_idx)
                tb_tracker.track("loss_policy", loss_policy_v, frame_idx)
                tb_tracker.track("loss_value", loss_value_v, frame_idx)
                tb_tracker.track("loss_total", loss_v, frame_idx)

                # gather gradients
                nn_utils.clip_grad_norm_(net.parameters(), CLIP_GRAD)
                
                grads = [param.grad.data.cpu().numpy() if param.grad is not None else None
                         for param in net.parameters()]
                #다음 반복에 변질(변형)될 것을 방지하기위해 개별 buffer에 gradients만 따로 저장 
                
                train_queue.put(grads) #train_queue에 저장되는 것은 gradients

    #자식 process의 게임이 solved되면 큐에 None추가 
    #reward가 REWARD_BOUND보다 크면 
    train_queue.put(None)

In [None]:
if __name__ == "__main__":
    mp.set_start_method('spawn')
    parser = argparse.ArgumentParser()
    parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda")
    parser.add_argument("-n", "--name", required=True, help="Name of the run")
    args = parser.parse_args()
    device = "cuda" if args.cuda else "cpu"
    
    """환경, 네트워크 설정"""
    env = make_env()
    net = common.AtariA2C(env.observation_space.shape, env.action_space.n).to(device)
    net.share_memory()

    
    optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE, eps=1e-3)

    """큐 생성, process생성"""
    train_queue = mp.Queue(maxsize=PROCESSES_COUNT)
    data_proc_list = []
    for proc_idx in range(PROCESSES_COUNT):
        proc_name = "-a3c-grad_" + NAME + "_" + args.name + "#%d" % proc_idx
        data_proc = mp.Process(target=grads_func, args=(proc_name, net, device, train_queue))
        data_proc.start()
        data_proc_list.append(data_proc)
        
        

    batch = []
    step_idx = 0
    grad_buffer = None

    try:
        while True:
            train_entry = train_queue.get() #gradients
            
            if train_entry is None: #게임이 solved된 것이면
                break

            
            #게임이 solved된 것이 아니면 
            step_idx += 1

            
            if grad_buffer is None:
                grad_buffer = train_entry
                
            else:
                for tgt_grad, grad in zip(grad_buffer, train_entry):
                    tgt_grad += grad #train_entry의 gradients를 모두 더함 
            
            #TRAIN_BATCH(2)마다 
            if step_idx % TRAIN_BATCH == 0:
                for param, grad in zip(net.parameters(), grad_buffer):
                    param.grad = torch.FloatTensor(grad).to(device) #더해진 gradients를 네트워크 파라미터의 grad로 배치 

                nn_utils.clip_grad_norm_(net.parameters(), CLIP_GRAD)
                optimizer.step() #배치된 gradient로 네트워크 업데이트 
                grad_buffer = None
                
    #GPU자원에서 차지하는 좀비 process를 방지하기위해            
    finally:
        for p in data_proc_list:
            p.terminate() #자식 process 종료 
            p.join()


- 표시되는 결과는 각 자식 process 의 local 값 (speed, 완료된 게임 개수)
    - 자식 process 개수 만큼을 곱해야 전체 process의 값이 나옴 
    

<img src = "./image/grad1.png">
<img src = "./image/grad2.png">
<img src = "./image/grad3.png">
<img src = "./image/grad4.png">
<img src = "./image/grad5.png">
<img src ="./image/grad_final.png">