# 110. Deep Neural Network을 이용한 함수 근사에서 필요한 torch basics

- Colab에서 실행

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import random
import gymnasium as gym
import collections

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# CartPole 환경 생성
env = gym.make('CartPole-v1')

# 환경에서 선택 가능한 행동(action)의 개수
action_size = env.action_space.n
action_size

cuda:0


np.int64(2)

## Experience Replay

예시 시나리오 (capacity = 4)

| step | position | memory                              |
| ---- | -------- | ----------------------------------- |
| 0    | 0        | \[exp0]                             |
| 1    | 1        | \[exp0, exp1]                       |
| 2    | 2        | \[exp0, exp1, exp2]                 |
| 3    | 3        | \[exp0, exp1, exp2, exp3]           |
| 4    | 0        | \[exp4, exp1, exp2, exp3] ← 덮어쓰기 시작 |
| 5    | 1        | \[exp4, exp5, exp2, exp3]           |
| ...  | ...      | 계속해서 순환 덮어쓰기                        |


In [3]:
class ExperienceReplay:
    def __init__(self, capacity):
        # 메모리의 최대 저장 용량 설정
        self.capacity = capacity
        self.memory = []         # transition들을 저장할 리스트
        self.position = 0        # 현재 저장 위치 인덱스 (순환 구조)

    def push(self, state, action, new_state, reward, done):
        # 하나의 transition을 메모리에 저장 (s, a, s', r, done)
        transition = (state, action, new_state, reward, done)

        # 메모리 용량이 아직 부족하면 append, 가득 찼으면 덮어쓰기
        if self.position >= len(self.memory):
            self.memory.append(transition)
        else:
            self.memory[self.position] = transition

        # 저장 위치를 순환시키기 위해 모듈로 연산 사용
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        # 메모리에서 무작위로 batch_size만큼 샘플링하고, 각 항목별로 묶어서 반환
        # zip(*list) 형식으로 반환하면 (states, actions, new_states, rewards, dones) 순으로 반환됨
        return zip(*random.sample(self.memory, batch_size))

    def __len__(self):
        # 현재 메모리에 저장된 transition의 수 반환
        return len(self.memory)

In [11]:
# 리플레이 메모리 D를 용량 10으로 초기화
D = ExperienceReplay(5)

# 환경을 초기화하고 상태 s를 얻음
s, _ = env.reset()

# 5번의 경험을 저장
for i in range(5):
    # 무작위로 액션 a를 선택
    a = env.action_space.sample()

    # 환경에 액션 a를 적용하고 다음 상태 s_, 보상 r 등을 얻음
    s_, r, truncated, terminated, _ = env.step(a)

    # 에피소드가 끝났는지 여부 확인
    done = truncated or terminated

    # 경험 (s, a, s_, r, done)을 메모리에 저장
    D.push(s, a, s_, r, done)

    # 다음 상태를 현재 상태로 업데이트
    s = s_

# 저장된 메모리 내용을 출력
for i, (s, a, s_, r, d) in enumerate(D.memory):
    print(f"{i}:")
    print(f"   state:      {s}")
    print(f"   action:     {a}")
    print(f"   next_state: {s_}")
    print(f"   reward:     {r}")
    print(f"   done:       {d}\n")


0:
   state:      [ 0.04782227  0.01752919 -0.00694257 -0.044804  ]
   action:     1
   next_state: [ 0.04817285  0.21275    -0.00783865 -0.33966926]
   reward:     1.0
   done:       False

1:
   state:      [ 0.04817285  0.21275    -0.00783865 -0.33966926]
   action:     1
   next_state: [ 0.05242785  0.40798262 -0.01463203 -0.6348137 ]
   reward:     1.0
   done:       False

2:
   state:      [ 0.05242785  0.40798262 -0.01463203 -0.6348137 ]
   action:     1
   next_state: [ 0.0605875   0.6033056  -0.02732831 -0.93206847]
   reward:     1.0
   done:       False

3:
   state:      [ 0.0605875   0.6033056  -0.02732831 -0.93206847]
   action:     1
   next_state: [ 0.07265361  0.7987854  -0.04596968 -1.2332122 ]
   reward:     1.0
   done:       False

4:
   state:      [ 0.07265361  0.7987854  -0.04596968 -1.2332122 ]
   action:     0
   next_state: [ 0.08862932  0.6042837  -0.07063392 -0.95527816]
   reward:     1.0
   done:       False



## Sample random minibatch

In [12]:
states, actions, rewards, dones, next_states = D.sample(5)

print("-------------states-------------------------")
print(states)
print("-------------actions----------------------")
print(actions)
print("------------rewards------------------------")
print(rewards)
print("------------next states--------------------")
print(next_states)
print("---------------dones-------------------------")
print(dones)

-------------states-------------------------
(array([ 0.05242785,  0.40798262, -0.01463203, -0.6348137 ], dtype=float32), array([ 0.04817285,  0.21275   , -0.00783865, -0.33966926], dtype=float32), array([ 0.07265361,  0.7987854 , -0.04596968, -1.2332122 ], dtype=float32), array([ 0.04782227,  0.01752919, -0.00694257, -0.044804  ], dtype=float32), array([ 0.0605875 ,  0.6033056 , -0.02732831, -0.93206847], dtype=float32))
-------------actions----------------------
(np.int64(1), np.int64(1), np.int64(0), np.int64(1), np.int64(1))
------------rewards------------------------
(array([ 0.0605875 ,  0.6033056 , -0.02732831, -0.93206847], dtype=float32), array([ 0.05242785,  0.40798262, -0.01463203, -0.6348137 ], dtype=float32), array([ 0.08862932,  0.6042837 , -0.07063392, -0.95527816], dtype=float32), array([ 0.04817285,  0.21275   , -0.00783865, -0.33966926], dtype=float32), array([ 0.07265361,  0.7987854 , -0.04596968, -1.2332122 ], dtype=float32))
------------next states-----------------

## Select Action

- 4개의 특성(feature)으로 구성된 상태에서, 각각의 상태에서 선택할 수 있는 행동이 2가지인 환경에서, 신경망으로 근사한 상태-행동 가치 함수

In [13]:
# 입력 차원, state feature
n_inputs = 4

# 출력 차원,  action space
n_outputs = 2

# 은닉층의 뉴런 수
hidden_layer = 64

# Q-함수 근사를 위한 신경망 정의
class NeuralNetwork(nn.Module):
    def __init__(self) -> None:
        super(NeuralNetwork, self).__init__()

        # 첫 번째 선형 계층: 입력 → 은닉층
        self.linear1 = nn.Linear(n_inputs, hidden_layer)

        # 두 번째 선형 계층: 은닉층 → 출력층 (Q값 출력)
        self.linear2 = nn.Linear(hidden_layer, n_outputs)

    def forward(self, x):
        # 첫 번째 은닉층에 ReLU 활성화 함수 적용
        a1 = torch.relu(self.linear1(x))

        # 최종 출력값 (각 행동에 대한 Q값) 반환
        output = self.linear2(a1)
        return output

# 네트워크를 생성하고, GPU 사용 가능 시 GPU에 올림
Q = NeuralNetwork().to(device)
Q

NeuralNetwork(
  (linear1): Linear(in_features=4, out_features=64, bias=True)
  (linear2): Linear(in_features=64, out_features=2, bias=True)
)

- 입력 : 4 개 feature 로 구성된 state
- 출력 : 2 개 action values  

- greedy action : $max_{a'}Q(s', a';\theta)$

In [14]:
# 환경을 초기화하고, 초기 상태(state) s를 얻음
s, _ = env.reset()


# Q 네트워크에 상태를 입력하여 각 행동(action)에 대한 Q값 계산
action_values = Q(torch.tensor(s).to(device))

# 계산된 Q값 출력 (예: [왼쪽으로 이동할 Q값, 오른쪽으로 이동할 Q값])
action_values

tensor([0.1032, 0.0316], device='cuda:0', grad_fn=<ViewBackward0>)

In [15]:
# greedy 방식으로 행동 선택 (가장 Q값이 높은 행동 선택)
action = torch.argmax(action_values).item()

# 선택된 행동 출력 (예: 0 또는 1 → CartPole에서는 왼쪽 또는 오른쪽)
action

0

## State-Action Value (q value) from DQN

Q-network 에서 입력으로 주어진 states 에 대응하는 action values 를 출력으로 얻어 greedy action 을 선택하는 code.  

함수 max()는 최대값과 해당 값의 인덱스를 모두 반환하므로 최대값과 argmax를 모두 계산합니다. 이 경우 값에만 관심이 있기 때문에 결과의 첫 번째 항목(values)을 사용합니다.

In [19]:
# numpy 형태의 상태들(states)을 텐서로 변환하고 GPU로 이동
states_v = torch.tensor(states).to(device)

# Q-네트워크에 상태들을 넣어 각 행동에 대한 Q값 예측
# detach(): 그래디언트 추적 중단
# cpu(): GPU에 있던 값을 CPU로 이동 (출력/시각화용)
action_values = Q(states_v).detach().cpu()

# 각 상태에 대한 모든 행동의 Q값 출력
print("action values:")
print(action_values)

# 각 상태별로 가장 큰 Q값과 그에 해당하는 행동 인덱스를 튜플로 출력
# 가장 큰 Q값들만 추출 (values: 최대 Q값, indices: 해당 행동 인덱스)
values, indices = torch.max(action_values, dim=1)

# 각 상태에 대해 선택된 행동의 Q값 (가장 큰 값)
print("Q values:")
print(values)

# 각 상태에 대해 Q값이 가장 큰 행동의 인덱스 (예: 0 또는 1)
print("actions:")
print(indices)

action values:
tensor([[-0.0870, -0.0902],
        [-0.0316, -0.0338],
        [-0.1147, -0.2716],
        [ 0.0700,  0.0196],
        [-0.1059, -0.1787]])
Q values:
tensor([-0.0870, -0.0316, -0.1147,  0.0700, -0.1059])
actions:
tensor([0, 0, 0, 0, 0])


## torch.gather

- torch.gather 함수 (또는 torch.Tensor.gather)는 다중 인덱스 선택 방법  

- 첫 번째 인수인 input은 요소를 선택하려는 소스 텐서. 두 번째 dim은 수집하려는 차원. 마지막으로 index는 입력을 인덱싱하는 인덱스.

4개의 항목과 4개의 작업으로 구성된 일괄 처리가 있는 간단한 예제 사례에서 gather가 수행하는 작업의 요약입니다.

```
state_action_values = net(states_v).gather(1, actions_v.unsqueeze(1))
```


<img src=https://miro.medium.com/max/1400/1*fS-9p5EBKVgl69Gy0gwjGQ.png width=400>

| 구분                    | 설명                                                 |
| --------------------- | -------------------------------------------------- |
| `Output of the model` | 신경망의 출력 → 각 상태(batch)에 대해 가능한 모든 action의 Q값들 (4개씩) |
| `Actions taken`       | 각 상태에서 실제로 취한 행동의 인덱스 (예: `[2, 3, 0, 2]`)          |
| `Result of gather`    | 각 상태에서 실제 취한 행동에 대한 Q값만 추출한 결과                     |


In [22]:
import torch

output = torch.tensor([[0, 1, 2, 3],
                       [4, 5, 6, 7],
                       [8, 9,10,11],
                       [12,13,14,15]])  # (4, 4)

actions = torch.tensor([2, 3, 0, 2]).unsqueeze(1)  # shape: (4, 1)

# gather: 각 row에서 지정한 index(actions) 위치의 값만 추출
result = output.gather(1, actions)

print(result.squeeze())  # tensor([2, 7, 8, 14])

tensor([ 2,  7,  8, 14])


In [20]:
states_v  # 4개의 feature

tensor([[ 0.0524,  0.4080, -0.0146, -0.6348],
        [ 0.0482,  0.2128, -0.0078, -0.3397],
        [ 0.0727,  0.7988, -0.0460, -1.2332],
        [ 0.0478,  0.0175, -0.0069, -0.0448],
        [ 0.0606,  0.6033, -0.0273, -0.9321]], device='cuda:0')

In [23]:
q_values = Q(states_v)
q_values  # 2 개의 action values

tensor([[-0.0870, -0.0902],
        [-0.0316, -0.0338],
        [-0.1147, -0.2716],
        [ 0.0700,  0.0196],
        [-0.1059, -0.1787]], device='cuda:0', grad_fn=<AddmmBackward0>)

In [28]:
# 실행한 action 들을 LongTensor 형태로 정의하고, (batch_size, 1) 형태로 reshape
# 예: batch 내 5개의 상태에서 취한 행동 = [1, 0, 1, 1, 0]
action = torch.LongTensor([1, 0, 1, 1, 0]).unsqueeze(1).to(device)
action

tensor([[1],
        [0],
        [1],
        [1],
        [0]], device='cuda:0')

In [30]:
# # gather를 통해 각 상태에서 취한 action의 Q값 추출
torch.gather(q_values, 1, action)

tensor([[-0.0902],
        [-0.0316],
        [-0.2716],
        [ 0.0196],
        [-0.1059]], device='cuda:0', grad_fn=<GatherBackward0>)

In [31]:
q_values.gather(1, action)   # 위와 동일 operation

tensor([[-0.0902],
        [-0.0316],
        [-0.2716],
        [ 0.0196],
        [-0.1059]], device='cuda:0', grad_fn=<GatherBackward0>)

## REINFORECE 알고리즘 지원을 위한 PROBABILITY DISTRIBUTIONS - TORCH.DISTRIBUTIONS

- distribution 패키지에는 매개변수화할 수 있는 확률 분포와 sampling 함수가 포함되어 있습니다. 이를 통해 최적화를 위한 확률적 계산 그래프 및 확률적 기울기 추정기를 구성할 수 있습니다.

- torch 는 다음과 같이 REINFORCE 알고리즘을 지원합니다.

```python
    probs = policy_network(state)
    m = Categorical(probs)
    action = m.sample()
    next_state, reward = env.step(action)
    loss = -m.log_prob(action) * reward
    loss.backward()
```

### 방법 1) Categorical(probs) 에서 sampling

'probs'가 길이가 'K'인 1차원 array인 경우, 각 element 는 해당 인덱스에서 클래스를 샘플링할 상대 확률입니다.

In [None]:
import torch
from torch.distributions import Categorical

logits = torch.rand(4)
probs = F.softmax(logits, dim=-1)
print(f"softmax 확률 분포 : {probs}, sum = {probs.sum()}")

# 각 class 를 sampling 할 상대 확률
m = Categorical(probs)
m

위의 m 에서 sampling 을 반복하면 softmax 확률 분포로 sampling 된다.

In [None]:
from collections import Counter
samples = []

for _ in range(30000):
    a = m.sample()
    samples.append(a.item())

[cnt/len(samples) for a, cnt in sorted(Counter(samples).items())]

### 방법 2) np.random.choice 에서 sampling

- np.random.choice 의 `parameter p`에 softmax 확률 분포 지정하여 sampling

In [None]:
samples = []

for _ in range(30000):
    a = np.random.choice(4, p=probs.numpy())
    samples.append(a)

[cnt/len(samples) for a, cnt in sorted(Counter(samples).items())]

### REINFORCE 구현을  위해  total expected return $G_t$ 를 estimate 하는 방법

In [None]:
import numpy as np

# 5 step 만에 spisode 종료 가정
rewards = [1, 2, 3, 4, 5]
gamma = 0.99

In [None]:
G_0 = 1 + 0.99**1 * 2 + 0.99**2 * 3 + 0.99**3 * 4 + 0.99**4 * 5
G_1 = 2 + 0.99**1 * 3 + 0.99**2 * 4 + 0.99**3 * 5
G_2 = 3 + 0.99**1 * 4 + 0.99**2 * 5
G_3 = 4 + 0.99**1 * 5
G_4 = 5
print(G_0, G_1, G_2, G_3, G_4)

In [None]:
r = np.array([gamma**i * rewards[i] for i in range(len(rewards))])
# Reverse the array direction for cumsum and then
# revert back to the original order
r = r[::-1].cumsum()[::-1]
# return r - r.mean()

In [None]:
# episodic task
Returns = []
G = 0
for r in rewards[::-1]:
    G = r + gamma * G
    Returns.append(G)

Returns = np.array(Returns[::-1], dtype=np.float64)
Returns

In [None]:
# continuing task
def discount_rewards(rewards):
    Returns = []
    G = 0
    for r in rewards[::-1]:
        G = r + gamma * G
        Returns.append(G)
    # cumsum의 배열 방향을 반대로 한 다음 원래 순서로 되돌립니다.
    Returns = np.array(Returns[::-1], dtype=np.float64)
    print(Returns)
    return Returns - Returns.mean()

discount_rewards(rewards)

### REINFORCE 구현을 위한 Score Function

- 확률 밀도 함수가 매개 변수와 관련하여 미분할 수있는 경우 REINFORCE를 구현하려면 sample () 및 log_prob () 만 필요

$$\Delta_{\theta} = \alpha r \frac{\partial log p(a | \pi^{\theta}(s))}{\partial\theta}$$  

$\alpha$ - learning rate, r - reward,  $p(a|\pi^\theta(s))$ - probability of taking action a  


- Network 출력에서 action을 샘플링하고 이 action을 environment에 적용한 다음 log_prob를 사용하여 동등한 손실 함수를 구성.   
- optimizer는 경사 하강법을 사용하기 때문에 음수를 사용하는 반면 위의 규칙은 경사 상승을 가정.   
- Categorical Policy를 사용하는 경우 REINFORCE를 구현하는 코드는 다음과 같다.

In [None]:
env = gym.make('CartPole-v1')
s = env.reset()

#probs = policy_network(state)
logits = torch.rand(2)
probs = torch.softmax(logits, dim=-1)

m = Categorical(probs)
action = m.sample()

next_state, reward, done, _, _ = env.step(action.item())

loss = -m.log_prob(action) * reward
#loss.backward()
print(loss)

## Huber Loss

- Actor-Critic 의 critic value function 의 loss 계산에 사용  
- Huber Loss는 L1과 L2의 장점을 취하면서 단점을 보완하기 위해서 제안된 것이 Huber Loss다.
    - 모든 지점에서 미분이 가능하다.  
    - Outlier에 상대적으로 Robust하다.
<img src=https://bekaykang.github.io/assets/img/post/201209-2.png width=300>

In [None]:
import torch
import torch.nn.functional as F

curr_q = torch.FloatTensor([10,11,12,10,9])
target_q = torch.FloatTensor([12,8,10,13,11])

loss = F.smooth_l1_loss(curr_q, target_q)
print(loss)

In [None]:
F.mse_loss(curr_q, target_q)