# 110. Deep Neural Network을 이용한 함수 근사에서 필요한 torch basics

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import random
import gym
from collections import deque, namedtuple

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

env = gym.make('CartPole-v1')  
action_size = env.action_space.n
action_size

cuda:0


2

## Experience Replay

In [3]:
class ReplayBuffer:
    """Fixed-size buffer to store experience tuples."""

    def __init__(self, action_size, buffer_size, batch_size, seed):
        """Initialize a ReplayBuffer object.

        Params
        ======
            action_size (int): dimension of each action
            buffer_size (int): maximum size of buffer
            batch_size (int): size of each training batch
            seed (int): random seed
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)  
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", 
                                     field_names=["state", "action", "reward", "next_state", "done"])
        self.seed = random.seed(seed)
    
    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)
    
    def sample(self):
        """Randomly sample a batch of experiences from memory."""
        experiences = random.sample(self.memory, k=self.batch_size)

        states = torch.from_numpy(
                        np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(
                        np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(
                        np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(
                        np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        dones = torch.from_numpy(
                        np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)

In [4]:
#Initialize replay memory D to capacity N
BUFFER_SIZE = 500
BATCH_SIZE = 3
seed=0

replay_memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE, seed)

s = env.reset()
for i in range(10):
    a = env.action_space.sample()
    s_, r, done, _ = env.step(a)
    replay_memory.add(s, a, r, s_, done)
    s = s_
    
replay_memory.memory

deque([Experience(state=array([-0.03634721, -0.0365727 , -0.0331441 ,  0.03930578]), action=0, reward=1.0, next_state=array([-0.03707866, -0.23120407, -0.03235799,  0.32134992]), done=False),
       Experience(state=array([-0.03707866, -0.23120407, -0.03235799,  0.32134992]), action=0, reward=1.0, next_state=array([-0.04170274, -0.42585064, -0.02593099,  0.60365534]), done=False),
       Experience(state=array([-0.04170274, -0.42585064, -0.02593099,  0.60365534]), action=1, reward=1.0, next_state=array([-0.05021975, -0.2303758 , -0.01385788,  0.3029188 ]), done=False),
       Experience(state=array([-0.05021975, -0.2303758 , -0.01385788,  0.3029188 ]), action=0, reward=1.0, next_state=array([-0.05482727, -0.42529754, -0.00779951,  0.59119925]), done=False),
       Experience(state=array([-0.05482727, -0.42529754, -0.00779951,  0.59119925]), action=0, reward=1.0, next_state=array([-0.06333322, -0.62030943,  0.00402448,  0.88141515]), done=False),
       Experience(state=array([-0.063333

## Sample random minibatch

In [5]:
states, actions, rewards, next_states, dones = replay_memory.sample()

states, actions, rewards, next_states, dones

(tensor([[-0.0757, -0.4252,  0.0217,  0.5900],
         [-0.1052, -0.6218,  0.0634,  0.9158],
         [-0.0363, -0.0366, -0.0331,  0.0393]], device='cuda:0'), tensor([[0],
         [1],
         [0]], device='cuda:0'), tensor([[1.],
         [1.],
         [1.]], device='cuda:0'), tensor([[-0.0842, -0.6207,  0.0335,  0.8894],
         [-0.1176, -0.4276,  0.0817,  0.6437],
         [-0.0371, -0.2312, -0.0324,  0.3213]], device='cuda:0'), tensor([[0.],
         [0.],
         [0.]], device='cuda:0'))

## Select Action 

- state가 4 개의 feature로 구성되고 각 state에서의 action이 2 가지인 MDP의 parameter화 된 state action value function

In [11]:
n_inputs = 4  # state feature
n_outputs = 2  # action space
hidden_layer = 64

class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super(NeuralNetwork, self).__init__()
        self.linear1 = nn.Linear(n_inputs, hidden_layer)
        self.linear2 = nn.Linear(hidden_layer, n_outputs)

    def forward(self, x):
        a1 = torch.relu(self.linear1(x))
        output = self.linear2(a1)
        return output

Q = NeuralNetwork().to(device)

- 입력 : 4 개 feature 로 구성된 state 
- 출력 : 2 개 action values  

- greedy action : $max_{a'}Q(s', a';\theta)$

In [14]:
input = [0.1, 0.2, 0.3, 0.4]

action_values = Q(torch.tensor(input).to(device))
action_values

tensor([-0.0737, -0.0007], device='cuda:0', grad_fn=<AddBackward0>)

In [15]:
# greedy action
action = torch.argmax(action_values).item() 
action

1

## State-Action Value (q value) from DQN 

Q-network 에서 입력으로 주어진 states 에 대응하는 action values 를 출력으로 얻어 greedy action 을 선택하는 code.  

함수 max()는 최대값과 해당 값의 인덱스를 모두 반환하므로 최대값과 argmax를 모두 계산합니다. 이 경우 값에만 관심이 있기 때문에 결과의 첫 번째 항목(values)을 사용합니다.

In [16]:
action_values = Q(states).cpu().detach()

print(action_values)
print(torch.max(action_values, dim=1))
print()

values, indices = torch.max(action_values, dim=1)

print(values)
print(indices)

tensor([[ 0.1251, -0.0336],
        [ 0.2277, -0.0451],
        [-0.0629, -0.0509]])
torch.return_types.max(
values=tensor([ 0.1251,  0.2277, -0.0509]),
indices=tensor([0, 0, 1]))

tensor([ 0.1251,  0.2277, -0.0509])
tensor([0, 0, 1])


## torch.gather

- torch.gather 함수 (또는 torch.Tensor.gather)는 다중 인덱스 선택 방법  

- 첫 번째 인수 인 input은 요소를 선택하려는 소스 텐서. 두 번째 dim은 수집하려는 차원. 마지막으로 index는 입력을 인덱싱하는 인덱스.

4개의 항목과 4개의 작업으로 구성된 일괄 처리가 있는 간단한 예제 사례에서 gather가 수행하는 작업의 요약입니다.

```
state_action_values = net(states_v).gather(1, actions_v.unsqueeze(1))
```


<img src=https://miro.medium.com/max/1400/1*fS-9p5EBKVgl69Gy0gwjGQ.png width=400>

In [17]:
states  # 4개의 feature

tensor([[-0.0757, -0.4252,  0.0217,  0.5900],
        [-0.1052, -0.6218,  0.0634,  0.9158],
        [-0.0363, -0.0366, -0.0331,  0.0393]], device='cuda:0')

In [18]:
q_values = Q(states)
q_values  # 2 개의 action values

tensor([[ 0.1251, -0.0336],
        [ 0.2277, -0.0451],
        [-0.0629, -0.0509]], device='cuda:0', grad_fn=<AddmmBackward0>)

In [21]:
action = torch.LongTensor([1, 0, 1]).unsqueeze(1).to(device)
action

tensor([[1],
        [0],
        [1]], device='cuda:0')

In [22]:
torch.gather(q_values, 1, action)  #q_value의 axis=1에서 action index 수집

tensor([[-0.0336],
        [ 0.2277],
        [-0.0509]], device='cuda:0', grad_fn=<GatherBackward0>)

In [23]:
q_values.gather(1, action)   # 위와 동일 operation

tensor([[-0.0336],
        [ 0.2277],
        [-0.0509]], device='cuda:0', grad_fn=<GatherBackward0>)

## REINFORECE 알고리즘 지원을 위한 PROBABILITY DISTRIBUTIONS - TORCH.DISTRIBUTIONS

- distribution 패키지에는 매개변수화할 수 있는 확률 분포와 sampling 함수가 포함되어 있습니다. 이를 통해 최적화를 위한 확률적 계산 그래프 및 확률적 기울기 추정기를 구성할 수 있습니다. 

- torch 는 다음과 같이 REINFORCE 알고리즘을 지원합니다.

```python
    probs = policy_network(state)
    m = Categorical(probs)
    action = m.sample()
    next_state, reward = env.step(action)
    loss = -m.log_prob(action) * reward
    loss.backward()
```

### 방법 1) Categorical(probs) 에서 sampling

'probs'가 길이가 'K'인 1차원 array인 경우, 각 element 는 해당 인덱스에서 클래스를 샘플링할 상대 확률입니다.

In [24]:
import torch
from torch.distributions import Categorical

logits = torch.rand(4)
probs = F.softmax(logits, dim=-1)
print(f"softmax 확률 분포 : {probs}, sum = {probs.sum()}")

# 각 class 를 sampling 할 상대 확률
m = Categorical(probs)
m

softmax 확률 분포 : tensor([0.1533, 0.3354, 0.2068, 0.3045]), sum = 0.9999998807907104


Categorical(probs: torch.Size([4]))

위의 m 에서 sampling 을 반복하면 softmax 확률 분포로 sampling 된다.

In [25]:
from collections import Counter
samples = []

for _ in range(30000):
    a = m.sample()
    samples.append(a.item())

sorted(Counter(samples).items())

[(0, 4607), (1, 9959), (2, 6245), (3, 9189)]

### 방법 2) np.random.choice 에서 sampling

- np.random.choice 의 `parameter p`에 softmax 확률 분포 지정하여 sampling

In [26]:
samples = []

for _ in range(30000):
    a = np.random.choice(4, p=probs.numpy())
    samples.append(a)
    
print(sorted(Counter(samples).items()))

[(0, 4601), (1, 10057), (2, 6273), (3, 9069)]


### REINFORCE 구현을  위해  total expected return $G_t$ 를 estimate 하는 방법

In [27]:
import numpy as np

rewards = [1, 2, 3, 4, 5]
gamma = 0.99

In [28]:
G_0 = 1 + 0.99 * 2 + 0.99**2*3 + 0.99**3*4 + 0.99**4*5
G_1 = 2 + 0.99**1*3 + 0.99**2*4 + 0.99**3*5
G_2 = 3 + 0.99**1*4 + 0.99**2*5
G_3 = 4 + 0.99**1*5
G_4 = 5
print(G_0, G_1, G_2, G_3, G_4)

14.604476049999999 13.741895 11.8605 8.95 5


In [29]:
r = np.array([gamma**i * rewards[i]
              for i in range(len(rewards))])
# Reverse the array direction for cumsum and then
# revert back to the original order
r = r[::-1].cumsum()[::-1]
# return r - r.mean()

In [30]:
# episodic task
Returns = []
G = 0
for r in rewards[::-1]:
    G = r + gamma * G
    Returns.append(G)
    
Returns = np.array(Returns[::-1], dtype=np.float64)
Returns

array([14.60447605, 13.741895  , 11.8605    ,  8.95      ,  5.        ])

In [31]:
# continuing task
def discount_rewards(rewards):
    Returns = []
    G = 0
    for r in rewards[::-1]:
        G = r + gamma * G
        Returns.append(G)
    # cumsum의 배열 방향을 반대로 한 다음 원래 순서로 되돌립니다.
    Returns = np.array(Returns[::-1], dtype=np.float64)
    print(Returns)
    return Returns - Returns.mean()

discount_rewards(rewards)

[14.60447605 13.741895   11.8605      8.95        5.        ]


array([ 3.77310184,  2.91052079,  1.02912579, -1.88137421, -5.83137421])

### REINFORCE 구현을 위한 Score Function

- 확률 밀도 함수가 매개 변수와 관련하여 미분할 수있는 경우 REINFORCE를 구현하려면 sample () 및 log_prob () 만 필요

$$\Delta_{\theta} = \alpha r \frac{\partial log p(a | \pi^{\theta}(s))}{\partial\theta}$$  

$\alpha$ - learning rate, r - reward,  $p(a|\pi^\theta(s))$ - probability of taking action a  


- Network 출력에서 action을 샘플링하고 이 action을 environment에 적용한 다음 log_prob를 사용하여 동등한 손실 함수를 구성.   
- optimizer는 경사 하강법을 사용하기 때문에 음수를 사용하는 반면 위의 규칙은 경사 상승을 가정.   
- Categorical Policy를 사용하는 경우 REINFORCE를 구현하는 코드는 다음과 같다.

In [32]:
env = gym.make('CartPole-v1')  
s = env.reset()

#probs = policy_network(state)
logits = torch.rand(2)
probs = torch.softmax(logits, dim=-1)

m = Categorical(probs)
action = m.sample()

next_state, reward, done, _ = env.step(action.item())

loss = -m.log_prob(action) * reward
#loss.backward()
print(loss)

tensor(0.8201)


## Huber Loss

- Actor-Critic 의 critic value function 의 loss 계산에 사용  
- Huber Loss는 L1과 L2의 장점을 취하면서 단점을 보완하기 위해서 제안된 것이 Huber Loss다.
    - 모든 지점에서 미분이 가능하다.  
    - Outlier에 상대적으로 Robust하다.
<img src=https://bekaykang.github.io/assets/img/post/201209-2.png width=300>

In [33]:
import torch
import torch.nn.functional as F

curr_q = torch.FloatTensor([10,11,12,10,9])
target_q = torch.FloatTensor([12,8,10,13,11])
loss = F.smooth_l1_loss(curr_q, target_q)
print(loss)

tensor(1.9000)
