# 110. Deep Neural Network을 이용한 함수 근사에서 필요한 torch basics

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import random
import gymnasium as gym
import collections

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

env = gym.make('CartPole-v1')  
action_size = env.action_space.n
action_size

cpu


2

## Experience Replay

In [2]:
class ExperienceReplay:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, state, action, new_state, reward, done):
        transition = (state, action, new_state, reward, done)

        if self.position >= len(self.memory):
            self.memory.append(transition)
        else:
            self.memory[self.position] = transition

        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return zip(*random.sample(self.memory, batch_size))

    def __len__(self):
        return len(self.memory)

In [4]:
#Initialize replay memory D to capacity N
D = ExperienceReplay(10)

s, _ = env.reset()
for i in range(10):
    a = env.action_space.sample()
    s_, r, truncated, terminated, _ = env.step(a)
    done = truncated or terminated
    D.push(s, a, s_, r, done)
    s = s_
    
D.memory

[(array([ 0.03732503, -0.03825209, -0.03765024, -0.00319418], dtype=float32),
  1,
  array([ 0.03655998,  0.15738903, -0.03771412, -0.30751443], dtype=float32),
  1.0,
  False),
 (array([ 0.03655998,  0.15738903, -0.03771412, -0.30751443], dtype=float32),
  0,
  array([ 0.03970776, -0.0371758 , -0.04386441, -0.02696005], dtype=float32),
  1.0,
  False),
 (array([ 0.03970776, -0.0371758 , -0.04386441, -0.02696005], dtype=float32),
  1,
  array([ 0.03896425,  0.15854685, -0.04440361, -0.33315364], dtype=float32),
  1.0,
  False),
 (array([ 0.03896425,  0.15854685, -0.04440361, -0.33315364], dtype=float32),
  0,
  array([ 0.04213519, -0.03591589, -0.05106669, -0.05479741], dtype=float32),
  1.0,
  False),
 (array([ 0.04213519, -0.03591589, -0.05106669, -0.05479741], dtype=float32),
  1,
  array([ 0.04141687,  0.15989968, -0.05216263, -0.36314493], dtype=float32),
  1.0,
  False),
 (array([ 0.04141687,  0.15989968, -0.05216263, -0.36314493], dtype=float32),
  1,
  array([ 0.04461486,  0.35

## Sample random minibatch

In [5]:
states, actions, rewards, dones, next_states = D.sample(5)

print("-------------states-------------------------")
print(states)
print("-------------actions----------------------")
print(actions)
print("------------rewards------------------------")
print(rewards)
print("------------next states--------------------")
print(next_states)
print("---------------dones-------------------------")
print(dones)

-------------states-------------------------
(array([ 0.04213519, -0.03591589, -0.05106669, -0.05479741], dtype=float32), array([ 0.05172932,  0.16147497, -0.07286171, -0.39841238], dtype=float32), array([ 0.04141687,  0.15989968, -0.05216263, -0.36314493], dtype=float32), array([ 0.03970776, -0.0371758 , -0.04386441, -0.02696005], dtype=float32), array([ 0.03732503, -0.03825209, -0.03765024, -0.00319418], dtype=float32))
-------------actions----------------------
(1, 1, 1, 1, 1)
------------rewards------------------------
(array([ 0.04141687,  0.15989968, -0.05216263, -0.36314493], dtype=float32), array([ 0.05495882,  0.35755086, -0.08082996, -0.71314824], dtype=float32), array([ 0.04461486,  0.3557227 , -0.05942553, -0.6718088 ], dtype=float32), array([ 0.03896425,  0.15854685, -0.04440361, -0.33315364], dtype=float32), array([ 0.03655998,  0.15738903, -0.03771412, -0.30751443], dtype=float32))
------------next states--------------------
(False, False, False, False, False)
----------

## Select Action 

- state가 4 개의 feature로 구성되고 각 state에서의 action이 2 가지인 MDP의 parameter화 된 state action value function

In [6]:
n_inputs = 4  # state feature
n_outputs = 2  # action space
hidden_layer = 64

class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super(NeuralNetwork, self).__init__()
        self.linear1 = nn.Linear(n_inputs, hidden_layer)
        self.linear2 = nn.Linear(hidden_layer, n_outputs)

    def forward(self, x):
        a1 = torch.relu(self.linear1(x))
        output = self.linear2(a1)
        return output

Q = NeuralNetwork().to(device)

- 입력 : 4 개 feature 로 구성된 state 
- 출력 : 2 개 action values  

- greedy action : $max_{a'}Q(s', a';\theta)$

In [7]:
s, _ = env.reset()

action_values = Q(torch.tensor(s).to(device))
action_values

tensor([ 0.1466, -0.1327], grad_fn=<AddBackward0>)

In [8]:
# greedy action
action = torch.argmax(action_values).item() 
action

0

## State-Action Value (q value) from DQN 

Q-network 에서 입력으로 주어진 states 에 대응하는 action values 를 출력으로 얻어 greedy action 을 선택하는 code.  

함수 max()는 최대값과 해당 값의 인덱스를 모두 반환하므로 최대값과 argmax를 모두 계산합니다. 이 경우 값에만 관심이 있기 때문에 결과의 첫 번째 항목(values)을 사용합니다.

In [16]:
states, actions, new_states, rewards, dones = D.sample(5)

states = torch.Tensor(states).to(device)
actions = torch.LongTensor(actions).to(device)
new_states = torch.Tensor(new_states).to(device)
rewards = torch.Tensor([rewards]).to(device)
dones = torch.Tensor(dones).to(device)

new_action_values = Q(new_states).detach()

In [19]:
GAMMA = 1.0
y_target = rewards + \
                (1 - dones) * GAMMA * torch.max(new_action_values, 1)[0]
y_pred = Q(states).gather(1, actions.unsqueeze(1))

y_target, y_pred

(tensor([[1.1617, 1.1517, 1.1485, 1.1598, 1.1576]]),
 tensor([[ 0.1640],
         [-0.1431],
         [ 0.1517],
         [ 0.1573],
         [-0.1087]], grad_fn=<GatherBackward0>))

## torch.gather

- torch.gather 함수 (또는 torch.Tensor.gather)는 다중 인덱스 선택 방법  

- 첫 번째 인수인 input은 요소를 선택하려는 소스 텐서. 두 번째 dim은 수집하려는 차원. 마지막으로 index는 입력을 인덱싱하는 인덱스.

4개의 항목과 4개의 작업으로 구성된 일괄 처리가 있는 간단한 예제 사례에서 gather가 수행하는 작업의 요약입니다.

```
state_action_values = net(states_v).gather(1, actions_v.unsqueeze(1))
```


<img src=https://miro.medium.com/max/1400/1*fS-9p5EBKVgl69Gy0gwjGQ.png width=400>

In [28]:
class PolicyNetwork(nn.Module):
    def __init__(self, input_dims, n_actions):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(*input_dims, 128)
        self.fc2 = nn.Linear(128, n_actions)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.softmax(self.fc2(x), dim=-1)
        return x

In [29]:
pi = PolicyNetwork(input_dims=env.observation_space.shape,
                   n_actions=env.action_space.n).to(device)

In [30]:
states_v  # 4개의 feature

tensor([[ 0.0517,  0.1615, -0.0729, -0.3984],
        [ 0.0414,  0.1599, -0.0522, -0.3631],
        [ 0.0621,  0.1636, -0.0951, -0.4470],
        [ 0.0550,  0.3576, -0.0808, -0.7131],
        [ 0.0390,  0.1585, -0.0444, -0.3332]])

In [40]:
log_prob = torch.log(pi(states_v))
log_prob

tensor([[-0.6098, -0.7841],
        [-0.6090, -0.7851],
        [-0.6127, -0.7806],
        [-0.6284, -0.7624],
        [-0.6078, -0.7864]], grad_fn=<LogBackward0>)

In [45]:
actions.unsqueeze(1)

tensor([[0],
        [1],
        [0],
        [0],
        [1]])

In [44]:
selected_log_probs = rewards * \
                        torch.gather(log_prob, 1, actions.unsqueeze(1)).squeeze()
selected_log_probs

tensor([[-0.6098, -0.7851, -0.6127, -0.6284, -0.7864]], grad_fn=<MulBackward0>)

## REINFORECE 알고리즘 지원을 위한 PROBABILITY DISTRIBUTIONS - TORCH.DISTRIBUTIONS

- distribution 패키지에는 매개변수화할 수 있는 확률 분포와 sampling 함수가 포함되어 있습니다. 이를 통해 최적화를 위한 확률적 계산 그래프 및 확률적 기울기 추정기를 구성할 수 있습니다. 

- torch 는 다음과 같이 REINFORCE 알고리즘을 지원합니다.

```python
    probs = policy_network(state)
    m = Categorical(probs)
    action = m.sample()
    next_state, reward = env.step(action)
    loss = -m.log_prob(action) * reward
    loss.backward()
```

### 방법 1) Categorical(probs) 에서 sampling

'probs'가 길이가 'K'인 1차원 array인 경우, 각 element 는 해당 인덱스에서 클래스를 샘플링할 상대 확률입니다.

In [None]:
import torch
from torch.distributions import Categorical

logits = torch.rand(4)
probs = F.softmax(logits, dim=-1)
print(f"softmax 확률 분포 : {probs}, sum = {probs.sum()}")

# 각 class 를 sampling 할 상대 확률
m = Categorical(probs)
m

위의 m 에서 sampling 을 반복하면 softmax 확률 분포로 sampling 된다.

In [None]:
from collections import Counter
samples = []

for _ in range(30000):
    a = m.sample()
    samples.append(a.item())

[cnt/len(samples) for a, cnt in sorted(Counter(samples).items())]

### 방법 2) np.random.choice 에서 sampling

- np.random.choice 의 `parameter p`에 softmax 확률 분포 지정하여 sampling

In [None]:
samples = []

for _ in range(30000):
    a = np.random.choice(4, p=probs.numpy())
    samples.append(a)
    
[cnt/len(samples) for a, cnt in sorted(Counter(samples).items())]

### REINFORCE 구현을  위해  total expected return $G_t$ 를 estimate 하는 방법

In [None]:
import numpy as np

# 5 step 만에 spisode 종료 가정
rewards = [1, 2, 3, 4, 5]
gamma = 0.99

In [None]:
G_0 = 1 + 0.99**1 * 2 + 0.99**2 * 3 + 0.99**3 * 4 + 0.99**4 * 5
G_1 = 2 + 0.99**1 * 3 + 0.99**2 * 4 + 0.99**3 * 5
G_2 = 3 + 0.99**1 * 4 + 0.99**2 * 5
G_3 = 4 + 0.99**1 * 5
G_4 = 5
print(G_0, G_1, G_2, G_3, G_4)

In [None]:
r = np.array([gamma**i * rewards[i] for i in range(len(rewards))])
# Reverse the array direction for cumsum and then
# revert back to the original order
r = r[::-1].cumsum()[::-1]
# return r - r.mean()

In [None]:
# episodic task
Returns = []
G = 0
for r in rewards[::-1]:
    G = r + gamma * G
    Returns.append(G)
    
Returns = np.array(Returns[::-1], dtype=np.float64)
Returns

In [None]:
# continuing task
def discount_rewards(rewards):
    Returns = []
    G = 0
    for r in rewards[::-1]:
        G = r + gamma * G
        Returns.append(G)
    # cumsum의 배열 방향을 반대로 한 다음 원래 순서로 되돌립니다.
    Returns = np.array(Returns[::-1], dtype=np.float64)
    print(Returns)
    return Returns - Returns.mean()

discount_rewards(rewards)

### REINFORCE 구현을 위한 Score Function

- 확률 밀도 함수가 매개 변수와 관련하여 미분할 수있는 경우 REINFORCE를 구현하려면 sample () 및 log_prob () 만 필요

$$\Delta_{\theta} = \alpha r \frac{\partial log p(a | \pi^{\theta}(s))}{\partial\theta}$$  

$\alpha$ - learning rate, r - reward,  $p(a|\pi^\theta(s))$ - probability of taking action a  


- Network 출력에서 action을 샘플링하고 이 action을 environment에 적용한 다음 log_prob를 사용하여 동등한 손실 함수를 구성.   
- optimizer는 경사 하강법을 사용하기 때문에 음수를 사용하는 반면 위의 규칙은 경사 상승을 가정.   
- Categorical Policy를 사용하는 경우 REINFORCE를 구현하는 코드는 다음과 같다.

In [None]:
env = gym.make('CartPole-v1')  
s = env.reset()

#probs = policy_network(state)
logits = torch.rand(2)
probs = torch.softmax(logits, dim=-1)

m = Categorical(probs)
action = m.sample()

next_state, reward, done, _ = env.step(action.item())

loss = -m.log_prob(action) * reward
#loss.backward()
print(loss)

## Huber Loss

- Actor-Critic 의 critic value function 의 loss 계산에 사용  
- Huber Loss는 L1과 L2의 장점을 취하면서 단점을 보완하기 위해서 제안된 것이 Huber Loss다.
    - 모든 지점에서 미분이 가능하다.  
    - Outlier에 상대적으로 Robust하다.
<img src=https://bekaykang.github.io/assets/img/post/201209-2.png width=300>

In [None]:
import torch
import torch.nn.functional as F

curr_q = torch.FloatTensor([10,11,12,10,9])
target_q = torch.FloatTensor([12,8,10,13,11])

loss = F.smooth_l1_loss(curr_q, target_q)
print(loss)

In [None]:
F.mse_loss(curr_q, target_q)