In [1]:
#Process.py #

import numpy as np 
import pandas as pd

def data_preprocess(data, split_ratio):
    _data = data
    _split_ratio = split_ratio
    n_rows = _data.shape[0] 	# 데이터의 행 수
    n_cols = _data.shape[1]   # 데이터의 열 수
    data_c_open = _data[['Open']] # 개시 가격 데이터 로드
    tradePrice = np.array(data_c_open[1:]) # 다음날의 개시가격 로드
    _data = _data[:-1]

    if len(tradePrice) == len(_data):   # 행수가 동일하면 데이터를 접합
        _data = _data.loc[:,['Open','High','Low','Close','Volume']] 
        _data['tradePrice'] = tradePrice # 당일의 개시일 및 거래가격 
        _data['cash'] = 100000.   # 초기 현금 100,000원으로 설정
        _data['stockValue'] = 0.   # 초기 주식량 0으로 설정
        _data = np.array(_data)
        n_train = int(np.round(_split_ratio * n_rows))   # 훈련 데이터의 수
        n_test = n_rows - n_train    # 검증 데이터의 수
        train = _data[:n_train]    # 훈련 데이터 셋
        test = _data[-n_test:]    # 검증 데이터 셋
        return train, test
    else:
        return None, None

In [3]:
#Environment.py #

from tensorforce.environments import Environment
import pandas as pd 
import numpy as np 

# 상태의 단계
N_TIMESTEPS = 5
# 매번 거래되는 주식 수
ORDER_SIZE = 10

class StockEnv(Environment):
    def __init__(self, data):
        self.xdata = data
        self.reset()  # 초기화 환경
        
    def __str__(self):
        return 'StockEnvironment'

    def close(self):
        print('stock over') # 환경을 닫음. 추후 다른 메소드 호출이 가능하지 않음

    def seed(self, seed):
        return None  # 환경의 난수를 지정된 값으로 설정

    def reset(self):  # 환경을 재설정하고 새 에피소드를 설정
        self.sample_size = len(self.xdata)    # 샘플수 설정
        self.step_counter = 0     # 훈련 초기화
        self.n_timesteps = N_TIMESTEPS   # 상태 단계 초기화
        self.order_size = ORDER_SIZE   # 단일 거래 번호 초기화
        self.n_actions = len(self.actions)    # 초기화 된 action 수
        self.reward = 0.    # 보상 초기화
        self.current_state = self.states     # 상태 초기화
        self.next_states = self.xdata[self.step_counter + 1:  				+ self.step_counter + self.n_timesteps + 1]
        self.tradePrice = self.states[-1, 5]   # 거래 당일 개시가격(21일)
        self.cal_value_price = self.states[-1, 3] # 기록 마감 가격(20일), 주식 가치 측정
        self.stock_amount = 0. # 보유한 주식 수량 초기화
        self.stock_value = 0. # 주식값 초기화
        self.cash_hold = 100000.   # 현재 현금
        self.current_value = self.cash_hold + self.stock_value   # 현재 재산초기화 
        self.past_value = 100000.   # 과거 재산의 총가치 초기화
        self.done = False   # 현재 최종 상태에 있는지 여부
        
    def execute(self, action):  # 행동을 실행하고 다음 상태 및 보상을 관찰
        self.past_value = self.current_value   # 과거 재산을 현재 재산으로 업데이트
        self.cal_value_price = self.states[-1, 3]   # 주식 가치 계산
        self.tradePrice = self.states[-1, 5]   # 거래 주문
        
        if action == 1: # 구매
            # 현금 총액 업데이트
            self.cash_hold = self.cash_hold - self.tradePrice * self.order_size 
            # 주식 수량 업데이트
            self.stock_amount = self.stock_amount + self.order_size
            
        elif action == 2: # 판매
            # 현금 총액 업데이트
            self.cash_hold = self.cash_hold + self.tradePrice * self.order_size
            # 주식 수량 업데이트
            self.stock_amount = self.stock_amount - self.order_size  
        # 돈의 가치를 고려하고 현금의 총가치를 업데이트
        self.cash_hold = 0.9997 * self.cash_hold 

        # 상태 업데이트
        if self.step_counter + self.n_timesteps + 1 < self.sample_size: 
            self.done = False
            self.next_states = self.xdata[self.step_counter + 1 : self.step_counter +  			    self.n_timesteps + 1]
            # 주식 가치를 계산하는데 사용된느 다음날의 종가를 계산
            self.stock_value = self.next_states[-1, 3] * self.stock_amount
            # 당일 현금 및 재고 값 업데이트
            self.xdata[self.step_counter + self.n_timesteps, 6] = self.cash_hold
            self.xdata[self.step_counter + self.n_timesteps, 7] = self.stock_value  
        else:
            self.done = True
            self.next_states = self.states
            # 당일의 종가가 계산되지 않았으므로, 거래 개시 시점의 주가를 계산함 
            self.stock_value = self.next_states[-1, 5] * self.stock_amount
            # 당일 현금 및 재고 값 업데이트
            self.xdata[self.step_counter + self.n_timesteps, 6] = self.cash_hold
            self.xdata[self.step_counter + self.n_timesteps, 7] = self.stock_value

        # 자본의 총액 계산(현금 + 주식 가치)
        self.current_value = self.cash_hold + self.stock_value
        
        # 보상 규칙
        # 다음과 같은 상황에서는 최대 패널티를 내며 주식 거래를 일찍 종료：
        #       1. 현금 < 0; 
        #       2. 자본금 < 70,000
        #       3. 현금이 총 자본금의 30% 미만으로 유지될 때
        #       4. 주식 구매 시 현금이 충분하지 않을 때
        
        if self.cash_hold <= 0 or self.current_value <= 70000. or 				(self.cash_hold <= (0.3 * self.current_value)):  
            self.reward = self.reward - 1.

        if self.stock_amount < 0 and ((-1 * self.stock_amount) > 				(0.7 * self.cash_hold / self.states[-1,5])):
            self.reward = self.reward - 1.

        # 보상 벌칙
        self.reward = self.reward + 1. * (self.current_value - self.past_value) /  				self.past_value

        self.step_counter = self.step_counter + 1 # 학습 시간 업데이트        
        return self.next_states, self.done, self.reward 

    @property
    def states(self):
        # 현재 시점에서 20개 이상 데이터를 취할수 있는 경우 현재 상태를 반환
        if self.step_counter + self.n_timesteps < self.sample_size:
            states = self.xdata[self.step_counter : self.step_counter +							 self.n_timesteps]
            return states
        # 현재 시점에서 20개 이상 데이터를 취할수 없는 경우 이전 상태를 반환
        else:
            # print("No More Data.")
            self.done = True
            states = self.xdata[self.step_counter-1 : self.step_counter + 							self.n_timesteps-1]
            return states

    @property
    def actions(self):   # action 공간을 반환
        actions = [0, 1, 2]  # 세 가지 행동( 0:보류, 1:구매, 2:판매 )
        return actions

    @staticmethod
    def from_spec(spec, kwargs):   # 환경을 기록
        env = tensorforce.util.get_object(
            obj=spec,
            predefined_objects=tensorforce.environments.environments,
            kwargs=kwargs)
        assert isinstance(env, Environment)
        return env

In [None]:
#DQN.py#

import numpy as np
import random
from collections import deque
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation
from keras.optimizers import Adam

# 파라미터 설정
BATCH_SIZE = 64
GAMMA = 0.9
EPSILON = 1.0
EPSILON_MIN = 0.01
EPSILON_DECAY = 0.995
LEARNING_RATE = 0.001
TAU = 0.05 

class DQN:
    def __init__(self, env):
        self.env = env   # 학습환경 설정
        self.memory  = deque(maxlen=2000)  # 초기화 매개변수 설정
        self.gamma = GAMMA  
        self.epsilon = EPSILON 
        self.epsilon_min = EPSILON_MIN  
        self.epsilon_decay = EPSILON_DECAY  
        self.learning_rate = LEARNING_RATE  
        self.tau = TAU 
        self.eval_model = self.create_model()   # 평가 네트워크 생성
        self.target_model = self.create_model()   # 타겟 네트워크 생성

    def create_model(self):
        model   = Sequential()   # 모델 생성
        state_shape  = self.env.states.shape

        # 첫번째 히든 레이어 설정
        model.add(Dense(32, input_dim=state_shape[1], activation="relu"))
        model.add(Dropout(0.5))

        # 두번째 히든 레이어 설정
        model.add(Dense(64, activation="relu"))
        model.add(Dropout(0.5))

        # 세번째 히든 레이어 설정
        model.add(Dense(32, activation="relu"))
        model.add(Dropout(0.5))

        # 출력 레이어 설정(soft-max)
        model.add(Dense(self.env.n_actions, activation="linear"))

        # 네트워크 모델 생성
        model.compile(loss="mean_squared_error", optimizer=Adam(lr=self.learning_rate))
        
        return model

    def act(self, state):
        self.epsilon *= self.epsilon_decay    # epsilon 없데이트
        self.epsilon = max(self.epsilon_min, self.epsilon)

        if np.random.random() < self.epsilon:
            return np.random.randint(0, self.env.n_actions)
        return np.argmax(self.eval_model.predict(state)[0])

    def remember(self, state, action, reward, new_state, done):
        self.memory.append([state, action, reward, new_state, done])  # 메모리 설정

    def replay(self):
        batch_size = BATCH_SIZE    # 배치 수 설정
        if len(self.memory) < batch_size: 
            return
        samples = random.sample(self.memory, batch_size)    # 랜덤 샘플링

        for sample in samples:
            state, action, reward, new_state, done = sample
            # 타겟 모델을 사용해 예측하고 예측된 결과를 대상에 저장
            target = self.target_model.predict(state)
            if done:
                target[0][action] = reward
            else:
                Q_future = max(self.target_model.predict(new_state)[0])
                target[0][action] = reward + Q_future * self.gamma
            self.eval_model.fit(state, target, epochs=1, verbose=0)

    def target_train(self):
        # 평가 네트워크의 가중치 매개변수를 가져옴
        eval_weights = self.eval_model.get_weights()
        # 타겟 네트워크의 가중치 매개변수를 가져옴
        target_weights = self.target_model.get_weights()
        # 타겟 네트워크의 가중치 매개변수를 하나씩 업데이트
        for i in range(len(target_weights)):
            # 평가 및 타겟 네트워크의 가중치 매개변수값을 tau로 조정하여 업데이트
            target_weights[i] = eval_weights[i] * self.tau + target_weights[i] * 							     (1 - self.tau)
        # 업데이트된 가중치 매개변수를 타겟 네트워크로 설정
        self.target_model.set_weights(target_weights)
        
    def save_model(self, fn):
        self.eval_model.save(fn)    # 모델 저장

In [None]:
#Trainer.py#

import Environment
import DQN
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.models import load_model

GAMMA = 0.9   # 보상 감소 비율
EPSILON = 1.0   # EPSILON 기본값 설정
EPSILON_MIN = 0.01   # 최소 EPSILON 설정
EPSILON_DECAY = 0.995   # EPSILON 감소 비율

class Runner:
    def __init__(self):
        # 모델을 사용하기 전 이전 모델이 차지한 메모리를 삭제
        K.clear_session()
        tf.reset_default_graph()
        
        # 보상 감소 비율 초기화
        self.gamma = GAMMA
        # epsilon 초기화
        self.epsilon = EPSILON
        self.epsilon_min = EPSILON_MIN  
        self.epsilon_decay = EPSILON_DECAY  
        self.train_success = False
        
        # 훈련을 마친 뒤, 테스트를 위해 최종 훈련된 모델의 이름을 반환
        self.model_name = ''
        
    def trainer(self, symbol, env, epochs):
        self.env = env   # 액세스 설정
        self.dqn_agent = DQN.DQN(env=self.env)   # DQN Agent 초기화
        self.epochs = epochs   # epoch 초기화
        
        self.epoch_len = self.env.sample_size – self.env.n_timesteps  # 훈련 단계 수 초기화
        
        for epoch in range(self.epochs):   # 훈련 시작
            self.env.reset()   # 훈련 환경 초기화
            cur_state = self.env.states   # 현재 상태 가져오기
            fortune = list()   # 자본 기록 초기화
            cash = list()   # 현금 기록 초기화
            act = list()   # action 기록 초기화
            re = list()   # 보상기록 초기화
            
            for step in range(self.epoch_len):
                action = self.dqn_agent.act(cur_state) # 현재 상태에 따라 action을 선택
                new_state, done, reward = self.env.execute(action) # 상태 반환
                act.append(action)   # 반환된 action을 action 기록에 추가
                re.append(reward)   # 반환된 보상을 보상 기록에 추가
                cash.append(new_state[-1,6])   # 반환된 현금을 현금 기록에 추가
                
                # 현재 상태의 총 자본 가치를 기록하고 자본 기록에 추가
                _fortune = new_state[-1,6] + new_state[-1,7]
                fortune.append(_fortune)

                # 기억 메모리에 저장
                self.dqn_agent.remember(cur_state, action, reward, new_state, done)
                self.dqn_agent.replay() 
                if step > 20:
                    self.dqn_agent.target_train()   # 모델 매개변수 업데이트
                cur_state = new_state            # 다음 상태 가져오기

                # 종료상태인 경우 훈련을 종류하고 그렇지 않으면 훈련을 계속
                if done:
                    # 모델 훈련 완료시 모델을 저장
                    if fortune[-1] >= 120000. and cash[-1] >= 0.:
                        self.train_success = True
                        self.model_name = "success-model-{}-{}.h5“
						.format(symbol, epoch)
                        self.dqn_agent.save_model(self.model_name)
                        
                    # 모델 훈련 미완료시 모델 제외
                    else:
                        self.train_success = False
                        self.model_name = "train-model-{}-{}.h5".
						   format(symbol, epoch)
                        self.dqn_agent.save_model(self.model_name)
                    break
            print("Epoch {}: Fortune-{}, Cash-{}, Reward-{}".format(str(epoch),
		fortune[-1], cash[-1], re[-1]))    
        return self.model_name   # 모든 훈련을 마친 후 모델명 변환   
                       
    def tester(self, env, model_name):
        self.env = env   # 검증 데이터를 기반으로 환경 초기화
        self.epoch_len = self.env.sample_size – self.env.n_timesteps   
        self.model_name = model_name   # 모델 이름 초기화
        self.test_model = load_model(self.model_name)   # 검증할 모델 불러오기
        self.env.reset()   # 검증 환경 초기화
        cur_state = self.env.states   # 현재상태 가져오기
        fortune = list()   # 자본 기록 초기화
        cash = list()   # 현금 기록 초기화
        act = list()   # action 기록 초기화
        re = list()   # 보상 기록 초기화
        
        for step in range(self.epoch_len):    # 검증 시작
            self.epsilon *= self.epsilon_decay   # epsilon 업데이트
            
            # epsilon과 epsilon_min 중 큰 값으로 epsilon 업데이트
            self.epsilon = max(self.epsilon_min, self.epsilon)
            
            # 발생시킨 난수가 epsilon보다 작으면, 무작위로 action을 선택
            if np.random.random() < self.epsilon:
                action = np.random.randint(0, self.env.n_actions)
                
            # 발생시킨 난수가 epsilon보다 크거나 같으면 검증 모델에 따라 action을 선택
            else:
                action = np.argmax(self.test_model.predict(cur_state)[0])

            # action 실행에 따른 상태 반환
            new_state, done, reward = self.env.execute(action)
            act.append(action)   # 반환된 action을 action 기록에 추가
            re.append(reward)   # 반환된 보상을 보상 기록에 추가
            cash.append(new_state[-1,6])  # 반환된 현금을 현금 기록에 추가  
            
            # 현재 상태의 총 자본 가치를 기록하고 자본 기록에 추가  
            _fortune = new_state[-1,6] + new_state[-1,7]
            fortune.append(_fortune)
            cur_state = new_state   # 다음 상태 가져오기
    
            if done:    # 검증 종료 
                break
        
        print("Test Result: Fortune-{}, Cash-{}, Reward-{}".format(fortune[-1], 	     cash[-1], re[-1]))          

        return fortune, act, re, cash # 결과 반환

In [None]:
#Main.py#

import os
import sys
import tensorflow as tf
import pandas as pd
import Preprocess
import Environment
import DQN
import Trainer
import numpy as np
import matplotlib.pyplot as plt

global _symbol
global _split_ratio
global _epochs

_symbol = "AAPL"
_split_ratio = 0.8
_epochs = 5

data_ex = pd.read_csv("apple.csv") #apple 주식데이터 2016년 3월 ~ 2018년 3월

# 훈련용, 검증용 데이터 분리
train, test = Preprocess.data_preprocess(data_ex, _split_ratio)

# 훈련 및 검증 환경 생성
env_train = Environment.StockEnv(train)
env_test = Environment.StockEnv(test)
runner = Trainer.Runner()

# DQN을 훈련시키고, 훈련된 모델로 돌아가 최종 결과를 업데이트
trained_model = runner.trainer(_symbol, env_train, _epochs)

# 훈련 된 trained_Q를 사용하여 테스트 데이터를 분석하고 예측 된 최종 거래 행동을 제공
fortune, act, reward, cash = runner.tester(env_test, trained_model)
print("fortune:{},act:{},reward:{},cash:{}".format(fortune[-1], act[-1], reward[-1], cash[-1]))

print(fortune)   # 설정한 기간(100일) 동안의 재산
print(act)       # 설정한 기간(100일) 동안의 행동(1= 매수, 2= 매도, 3 = 보류)
print(reward)   # 설정한 기간(100일) 동안의 보상
print(cash)     # 설정한 기간(100일) 동안의 현금
close_price = test[5:, 3] #검증용 데이터의 종가 정보를 가져옴

fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(np.arange(len(close_price)), close_price)
ax.set_xlim((0, len(close_price)))
ax.set_ylim((np.min(close_price), np.max(close_price)))
ax.set_xlabel("Steps")
ax.set_ylabel("Close Price")
ax.set_title('Trade Point predicted by DQN Trader')

# 검증용 데이터의 종가 정보에 따른 강화학습 모델의 매수/매도 행동을 확인
# 빨간색: 매수
# 초록색: 매도

for i in range(len(act)):
    if act[i] == 1:
        ax.scatter(x=i, y=close_price[i], c='r', marker='o', linewidths=0, label='Buy')
    if act[i] == 2:
        ax.scatter(x=i, y=close_price[i], c='g', marker='o', linewidths=0, label='Sell')
