# 코인으로 마통갚기 강화학습 (Deep Q Learning)

- Motivation : 마이너스 통장에 대출이 있는데, 한달에 생활비를 제외하면 최대 갚을 수 있는 돈이 정해져 있음. 이중에 일부를 대출상환에 쓰고 일부는 주식 또는 코인에 투자하려고 함. 이 경우, 이자를 적게 내기 위한 가장 적합한 상환 전략은?

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import random
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch import optim

from itertools import product
from collections import deque
from tqdm import tqdm

State space : 대출 잔액, 코인 계좌 잔액, (time t)
- 최초 대출금액은 5천만원. 1만원 단위로 갚는 경우로 가정.
- 그리고 코인계좌 잔액 요렇게 2개가 될듯 함. 
- 이자랑 코인 수익 만원단위까지 계산 넣으려면 State Space 는 만원 단위로 설정해야함.

In [2]:
state_space = np.arange(0, 5000, 1)
print(len(state_space))

5000


Action space : 대출 상환금액, 코인 인출 금액
- 매달 생활비 빼고 이자 상환할 수 있는 금액은 60만원 인데, 이중에서 얼마는 코인에 투자하고 얼마는 대출을 갚으려고 함. 코인에 투자한 금액은 다음달에 확률적으로 오를수도 있고 떨어질 수도 있어.
- 코인 계좌에서 얼마를 인출할 것인가? (0%~100% 로 비율로 인출하는 것으로 가정)
- 만원단위로 설정할지, 10만원단위로 할지 -> 계산 편의상 10만원부터 해보고 나중에 1만원 하기

In [3]:
debt_repay = np.arange(0, 60, 1)
coin_withdrawal = np.arange(0, 100, 1)

# 2가지 액션의 조합으로 액션 설명하고자 함. - 얼마나 적급할거냐, coint에서 얼마나 뺄거냐
xx, yy = np.meshgrid(debt_repay, coin_withdrawal)
print(xx.shape)

action_space = list(product(debt_repay, coin_withdrawal))
print(len(action_space))

(100, 60)
6000


Reward space : 매달 내는 이자 상환액 (대출금리 * 대출잔액)
- 3.75 고정금리 가정
- 변동금리 가정 -> 현실하고 유사하게 만들려면 고민 필요함. 에피소드 생성시점에 에피소드 길이만큼 만들어서 reward 생성해야 할듯함.

In [4]:
# 고정금리 경우
reward_space = state_space * 3.75e-2 / 12
print(reward_space[:5], "...", reward_space[-5:])

[0.       0.003125 0.00625  0.009375 0.0125  ] ... [15.609375 15.6125   15.615625 15.61875  15.621875]


Transition Matrix : 다음달 남은 대출잔액으로 옮겨갈 확률(?)

- P 는 그리기 어려워서 아래 설명. 에피소드 발생 시점에 나와...
- 다음달 잔액 계산식:

\begin{align}
대출잔액_{t+1} &= 대출잔액_{t} - 대출원금상환액_{t} \nonumber \\
대출원금상환액_t &= 상환금액(0to60만원)_t + 코인수익_t * 코인인출비율(0to100) - 이번달이자_t \nonumber \\
코인수익_t &= (지난달코인잔액_{t-1} + 지난달코인납입액_{t-1}) * 지난달 코인변동_{t-1} \nonumber \\
\end{align}

Episode generator or Environment: 주어진 기간(10년=120t) 에피소드 동안 s, a, r, s+1 생성하는 environment

In [5]:
class DebtRepayer(object):
    def __init__(self, start_debt, duration=120, interest_rate=3.75, round=0) -> None:
        self.start_debt = start_debt
        self.current_debt = start_debt
        self.duration = duration
        self.t = 0
        self.coin_balance = 0
        self.interest_rate = interest_rate / 100 # 은행 대출 이자 - 고정금리 가정
        self.round = round

        # action_space grid
        self.repay_grid, self.withdraw_grid = np.meshgrid(np.arange(10, 70, 10), np.arange(0, 110, 10))

        # history
        self.history = []

    def reset(self):
        self.current_debt = self.start_debt
        self.coin_balance = 0
        self.history = []
        self.t = 0

    def get_one_episode(self, policy):
        for action in policy:
            sample = self.forward_one_month((action[0], action[1]))
            if sample[3][1] == 0: break
            if sample[3][0] > 120: break

        return self.history
            
    def forward_one_month(self, action: tuple):
        past_state = (self.t, self.current_debt, self.coin_balance)
        repay_amount = action[0]
        withdraw_pct = action[1] / 100
        interest_amount = round(self.current_debt * self.interest_rate / 12, self.round)

        repay_amount = min(60 - interest_amount, repay_amount) # 은행 이자 제외하고 남은 금액에서만 값을 수 있음.
        coin_input = 60 - repay_amount - interest_amount
        
        withdrawed_coin = self.get_coin_balance(coin_input, withdraw_pct)
        total_repay_amount = repay_amount + withdrawed_coin

        self.current_debt -= total_repay_amount
        if self.current_debt <= 0:
            self.current_debt = 0
            done = 0
        else:
            done = 1
        self.t += 1
        prime_state = (self.t, self.current_debt, self.coin_balance)

        sample = (past_state, action, interest_amount, prime_state, done)
        self.history.append(sample)
        
        return sample

    # 다음달 코인 잔액 및 인출액 생성기 - 
    def get_coin_balance(self, current_input, withdraw_pct):
        #상승 or 하락으로 정해서 beta 분포로 변동 설정함
        if np.random.rand() >= 0.5:
            change_rate = np.random.beta(a=1.5, b=4)
        else:
            change_rate = -(np.random.beta(a=1.25, b=8.7))
        # 코인 잔액 업데이트하고 인출액 반환 - FIXME 순서확인, 마이너스 안나게 input 값 고려 
        withdrawal_amount = round(self.coin_balance * withdraw_pct, self.round)
        temp_balance = self.coin_balance - withdrawal_amount   # 당월 인출
        temp_balance += current_input # 당월 입금
        self.coin_balance = round(temp_balance + (temp_balance * change_rate), self.round)
        if self.coin_balance < 0: self.coin_balance = 0

        return withdrawal_amount

In [6]:
d = DebtRepayer(start_debt=5000, round=4)
d.forward_one_month(action=(1, 4))
d.forward_one_month(action=(3, 4))
d.history

[((0, 5000, 0), (1, 4), 15.625, (1, 4999.0, 27.6795), 1),
 ((1, 4999.0, 27.6795), (3, 4), 15.6219, (2, 4994.8928, 74.465), 1)]

Deep Q-Learning

In [7]:
class ReplayBuffer(object):
    """A class for store episodes"""
    def __init__(self, capacity) -> None:
        self.buffer = deque(maxlen=capacity)

    def put(self, transition):
        self.buffer.append(transition)

    def __len__(self):
        return len(self.buffer)

    def get_sample(self, n):
        batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append(a)
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        sample = (
            torch.tensor(s_lst, dtype=torch.float),
            a_lst,
            torch.tensor(r_lst, dtype=torch.float),
            torch.tensor(s_prime_lst, dtype=torch.float),
            torch.tensor(done_mask_lst, dtype=torch.float),
        )
        return sample

In [8]:
class Qnet(nn.Module):
    def __init__(self) -> None:
        super().__init__()

        self.qnet = nn.Sequential(
            nn.Linear(3, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, 60),
        )

    def forward(self, x):
        return self.qnet(x)

    def sample_action(self, obs, eps):
        if not isinstance(obs, torch.Tensor):
            obs = torch.Tensor(obs)
        out = self.forward(obs)
        if random.random() < eps:
            return random.randint(0, 59)
        else:
            return out.argmax().item()
        

Trainer

In [10]:
lr = 1e-4
gamma = 1
buffer_limit = 5000
batch_size = 32

In [11]:
def train(q, q_target, buffer, opt, gamma, batch_size):
    s, a, r, s_prime, done_mask = buffer.get_sample(batch_size)

    q_out = q(s)
    action_idx = torch.tensor([action_space.index(i) for i in a], dtype=torch.int64).unsqueeze(1)
    q_a = q_out.gather(1, action_idx)
    max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)
    target = -r + gamma * max_q_prime * done_mask
    loss = F.smooth_l1_loss(q_a, target)

    opt.zero_grad()
    loss.backward()
    opt.step()

In [12]:
def main(epoch: int):
    # set envs
    env = DebtRepayer(start_debt=5000, round=4)
    buffer = ReplayBuffer(buffer_limit)

    # set model
    q = Qnet()
    q_target = Qnet()
    opt = optim.AdamW(q.parameters(), lr=lr)
    q_target.load_state_dict(q.state_dict())
    
    # set training loop
    G = 0
    eps = 1
    pbar = tqdm(range(epoch), desc="Deep Q net trainin : ", miniters=10)
    for epi in pbar:
        eps = max(eps * 0.9997, 0.001)
        env.reset()
        s = [env.t, env.start_debt, env.coin_balance]

        # put samples until episode ends
        flag = False
        while not flag: 
            a = q.sample_action(s, eps)
            sample = env.forward_one_month(action_space[a])
            buffer.put((sample))
            s = sample[3]
            G += sample[2]
            if s[1] == 0: break
            if s[0] >= 120: break

        # training when buffer got enough samples ten backward per one loop
        if len(buffer) > 2000:
            for _ in range(10):
                train(q, q_target, buffer, opt, gamma, batch_size)

        # q_target copy trained q every 20 loop
        if epi % 20 == 0 and epi != 0:
            q_target.load_state_dict(q.state_dict())

            q_1a = action_space[q(torch.Tensor((0, 5000, 0))).argmax().item()]
            pbar.set_postfix_str("n_episode : {}, Q1 : {}, interest_amount : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(epi, q_1a, G, len(buffer), eps))

            G = 0
    
    print("Training done!!")
    return q

In [14]:
model = main(10000)

Deep Q net trainin : 100%|█| 10000/10000 [03:02<00:00, 54.66it/s, n_episode : 9980, Q1 : (0, 16), interest_amount : 18060.9, n_buffer : 5000, eps : 0.

Training done!!



