# 필요한 변수 크롤링 및 병합

In [1]:
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from urllib.request import Request, urlopen
import requests
from datetime import datetime
from functools import reduce

## USD KRW 크롤링

In [2]:
# Scrape the USD/KRW historical-price table from investing.com into df1
# (columns: 'date', 'price'), sorted by date string.
url = Request('https://kr.investing.com/currencies/usd-krw-historical-data', headers={'user-agent': 'Mozilla/5.0 (Windows NT 11.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5414.76 Safari/537.36 NetHelper70'})

# Close the HTTP response as soon as the page has been parsed.
with urlopen(url) as res:
    bs = BeautifulSoup(res, 'html.parser')

# Raw string: the selector contains CSS-escaped colons (`\:`), which are not
# valid Python escape sequences in a normal string literal.
table_selector = r'#__next > div.desktop\:relative.desktop\:bg-background-default > div > div > div.grid.gap-4.tablet\:gap-6.grid-cols-4.tablet\:grid-cols-8.desktop\:grid-cols-12.grid-container--fixed-desktop.general-layout_main__lRLYJ > main > div > div:nth-child(4) > div > div:nth-child(1) > div > div.border.border-main > div > table'

table = bs.select_one(table_selector)
if table is None:
    # Fail with a clear message instead of an AttributeError on None.
    raise RuntimeError('USD/KRW price table not found; the page layout may have changed')

rows = table.select(table_selector + ' > tbody > tr')

# Date text has its spaces removed; price text has thousands separators removed
# so it can be parsed as a number later.
a_list = [row.find('time').text.replace(' ', '') for row in rows]
b_list = [row.find('td', attrs={'dir': 'ltr'}).text.replace(',', '') for row in rows]

data = {'date': a_list, 'price': b_list}
df1 = pd.DataFrame(data)
df1 = df1.sort_values(by='date')

## 달러지수

In [3]:
# Scrape the US dollar index historical-price table into df2
# (columns: 'date', 'price'), with dates normalized to 'YYYY-MM-DD' strings.
url = Request('https://kr.investing.com/currencies/us-dollar-index-historical-data', headers={'user-agent': 'Mozilla/5.0 (Windows NT 11.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5414.76 Safari/537.36 NetHelper70'})

# Close the HTTP response as soon as the page has been parsed.
with urlopen(url) as res:
    bs = BeautifulSoup(res, 'html.parser')

table = bs.find('table', {'id': 'curr_table'})
if table is None:
    # Fail with a clear message instead of an AttributeError on None.
    raise RuntimeError('Dollar index price table not found; the page layout may have changed')

# Keep only the first two <td> cells of every row (date and price).
data = [
    [col.text.strip() for col in row.find_all('td')[:2]]
    for row in table.find_all('tr')
]

columns = ['date', 'price']

# The first collected row is skipped before building the frame.
df2 = pd.DataFrame(data[1:], columns=columns)

# Parse the Korean-formatted dates, sort chronologically, then convert back to
# ISO-style strings.
df2['date'] = pd.to_datetime(df2['date'], format='%Y년 %m월 %d일')
df2 = df2.sort_values(by='date')
df2['date'] = df2['date'].dt.strftime('%Y-%m-%d')

## CRB 지수

In [4]:
# Scrape the CRB index historical-price table from investing.com into df3
# (columns: 'date', 'price'), sorted by date string.
url = Request('https://kr.investing.com/indices/thomson-reuters---jefferies-crb-historical-data', headers={'user-agent': 'Mozilla/5.0 (Windows NT 11.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5414.76 Safari/537.36 NetHelper70'})

# Close the HTTP response as soon as the page has been parsed.
with urlopen(url) as res:
    bs = BeautifulSoup(res, 'html.parser')

# Raw strings: the selectors contain CSS-escaped colons (`\:`), which are not
# valid Python escape sequences in a normal string literal.
table = bs.select_one(r'#__next > div.desktop\:relative.desktop\:bg-background-default > div > div > div.grid.gap-4.tablet\:gap-6.grid-cols-4.tablet\:grid-cols-8.desktop\:grid-cols-12.grid-container--fixed-desktop.general-layout_main__lRLYJ > main > div > div:nth-child(4) > div > div > div.border.border-main > div > table')
if table is None:
    # Fail with a clear message instead of an AttributeError on None.
    raise RuntimeError('CRB price table not found; the page layout may have changed')

rows = table.select(r'#__next > div.desktop\:relative.desktop\:bg-background-default > div > div > div.grid.gap-4.tablet\:gap-6.grid-cols-4.tablet\:grid-cols-8.desktop\:grid-cols-12.grid-container--fixed-desktop.general-layout_main__lRLYJ > main > div > div > div > div > div.border.border-main > div > table > tbody > tr')

# Date text has its spaces removed. Thousands separators are stripped from the
# price, as in the USD/KRW cell, so a value such as "1,023.4" is not coerced
# to NaN when the column is converted to numbers later.
a_list = [row.find('time').text.replace(' ', '') for row in rows]
b_list = [row.find('td', attrs={'dir': 'ltr'}).text.replace(',', '') for row in rows]

data = {'date': a_list, 'price': b_list}
df3 = pd.DataFrame(data)
df3 = df3.sort_values(by='date')

## 병합

In [5]:
# Outer-join the three scraped frames on 'date' so no date from any source is lost.
merged = df1.merge(df2, on='date', how='outer').merge(df3, on='date', how='outer')
merged.columns = ['date', 'USD/KRW', '달러지수', 'crb']

# Convert the scraped strings to proper dtypes; unparsable prices become NaN.
merged['date'] = pd.to_datetime(merged['date'])
for column in ['USD/KRW', '달러지수', 'crb']:
    merged[column] = pd.to_numeric(merged[column], errors='coerce')

# Append the fresh rows to the stored history. For a date present in both,
# the freshly scraped row wins (keep='last').
# NOTE(review): a freshly scraped row that has NaN in some column replaces the
# stored row for that date as a whole — confirm this is acceptable.
existing_data = pd.read_excel('./최종지수.xlsx')
combined_data = pd.concat([existing_data, merged])
combined_data = combined_data.drop_duplicates(subset=['date'], keep='last')
combined_data = combined_data.sort_values('date')

combined_data.to_excel('./최종지수.xlsx', index=False)

# 저장한 자료 불러오기

## 자료불러오기

In [6]:
# Reload the saved history, index it by date and forward-fill missing values
# from the previous row. `.ffill()` replaces the deprecated
# `fillna(method='ffill')` spelling and gives the same result.
최종지수 = pd.read_excel('./최종지수.xlsx').set_index('date').ffill()

In [7]:
# Keep only the rows from 1998-03-23 onwards (label slice on the date index).
최종지수 = 최종지수['1998-03-23':]

## 결측치 및 자료 길이 확인

In [8]:
# Number of missing values per column. The DataFrame method is used instead of
# np.sum(...) so the per-column reduction is explicit and does not rely on how
# pandas handles an axis=None reduction.
최종지수.isna().sum()

USD/KRW    0
달러지수       0
crb        0
dtype: int64

In [9]:
# Number of rows remaining after the date filter.
len(최종지수)

9149

In [10]:
# Alias used by the modelling cells below (same object, not a copy).
data = 최종지수

# 모델

## 파라미터 조정

In [11]:
import tensorflow as tf
import matplotlib as mpl
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping

In [12]:
# Mini-batch size passed to Dataset.batch().
BATCH_SIZE = 32
# Row index separating the training rows (first 90%) from the validation rows.
TRAIN_SPLIT = int(len(data)*0.9)
# Number of batches per training epoch (used as steps_per_epoch).
EVALUATION_INTERVAL = int(TRAIN_SPLIT/BATCH_SIZE)
# Maximum number of training epochs.
EPOCHS = 100
# Number of batches per validation pass (used as validation_steps).
VALIDATION_EVALU_INTERVAL = int((len(data)-TRAIN_SPLIT)/BATCH_SIZE)
# Shuffle buffer size passed to Dataset.shuffle().
BUFFER_SIZE = 64
# Number of past rows fed to the model for each sample.
past_history = 10
# Number of future steps to predict, read interactively (the prompt recommends 1).
future_target = int(input('향후 몇일의 예측값을 보고 싶으신가요 (권장 1일) :'))
# Stride between rows inside each input window.
STEP = 1
#drop_rate = 0.1
# Early-stopping patience, in epochs without val_loss improvement.
patience = 40

향후 몇일의 예측값을 보고 싶으신가요 (권장 1일) :1


## 데이터 표준화 

In [13]:
# Z-score every column using statistics taken over ALL rows (training and
# validation rows alike).
dataset = data.to_numpy()
data_mean = np.mean(dataset, axis=0)
data_std = np.std(dataset, axis=0)
dataset = (dataset - data_mean) / data_std

for item in (dataset.shape, dataset):
    print(item)

(9149, 3)
[[ 2.06145235  0.71783269 -0.95960735]
 [ 2.15002108  0.72416426 -0.99916922]
 [ 2.25723586  0.6997425  -0.97938829]
 ...
 [ 1.47755134  0.8168765   0.53435843]
 [ 1.47764457  0.8168765   0.53435843]
 [ 1.54374481  0.82818287  0.53435843]]


## 학습과 검증데이터 분리

In [14]:
def split_data(dataset, target, start_index, end_index, history_size, target_size, step):
    """Build sliding-window samples and their multi-step labels.

    For every position ``i`` in ``[start_index + history_size, end_index)``
    one sample is produced: the rows of ``dataset`` at indices
    ``range(i - history_size, i, step)`` as input and
    ``target[i:i + target_size]`` as label.

    Args:
        dataset: Array of feature rows (indexed along its first axis).
        target: 1-D array of values to predict, aligned with ``dataset``.
        start_index: First row that may appear in an input window.
        end_index: Exclusive upper bound for ``i``. ``None`` means "as far as
            the data allows". A value that would leave fewer than
            ``target_size`` labels is clamped to the last valid position.
        history_size: Number of rows spanned by each input window.
        target_size: Number of future values in each label.
        step: Stride between rows inside an input window.

    Returns:
        Tuple ``(data, labels)`` of NumPy arrays.
    """
    first = start_index + history_size
    # Last exclusive position that still has a full label window available.
    # Going past it would yield truncated labels of uneven length.
    last_valid = len(dataset) - target_size + 1
    if end_index is None or end_index > last_valid:
        end_index = last_valid

    positions = range(first, end_index)
    windows = [dataset[range(i - history_size, i, step)] for i in positions]
    labels = [target[i:i + target_size] for i in positions]

    return np.array(windows), np.array(labels)

In [15]:

# The first column of the standardized data is the prediction target.
fx_target = dataset[:, 0]

# Rows before TRAIN_SPLIT feed training; the remaining rows feed validation.
x_train, y_train = split_data(dataset, fx_target, 0, TRAIN_SPLIT, past_history, future_target, STEP)
x_val, y_val = split_data(dataset, fx_target, TRAIN_SPLIT, None, past_history, future_target, STEP)


def _make_pipeline(features, labels):
    """Wrap arrays in a cached, shuffled, batched and endlessly repeating dataset."""
    pipeline = tf.data.Dataset.from_tensor_slices((features, labels))
    pipeline = pipeline.cache()
    pipeline = pipeline.shuffle(BUFFER_SIZE)
    pipeline = pipeline.batch(BATCH_SIZE)
    return pipeline.repeat()


train_data = _make_pipeline(x_train, y_train)
val_data = _make_pipeline(x_val, y_val)

## 모델 구축 & 학습

In [None]:
# Single LSTM layer followed by a dense head that emits one value per
# forecast step.
model = tf.keras.models.Sequential([
    tf.keras.layers.LSTM(200, activation='tanh', input_shape=x_train.shape[-2:]),
    tf.keras.layers.Dense(future_target),
])
model.compile(loss='mae', optimizer=tf.keras.optimizers.RMSprop())

# Stop after `patience` epochs without val_loss improvement and keep the
# best-scoring weights on disk.
earlystopping = EarlyStopping(monitor='val_loss', patience=patience, mode='auto')
checkpoint = ModelCheckpoint(
    'model_weight.ckpt',
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=True,
)

# Both datasets repeat forever, so the step counts bound each epoch.
fit_options = dict(
    epochs=EPOCHS,
    steps_per_epoch=EVALUATION_INTERVAL,
    validation_data=val_data,
    validation_steps=VALIDATION_EVALU_INTERVAL,
    callbacks=[earlystopping, checkpoint],
    verbose=1,
)
model.fit(train_data, **fit_options)


Epoch 1/100
Epoch 1: val_loss improved from inf to 0.15087, saving model to model_weight.ckpt
Epoch 2/100
Epoch 2: val_loss improved from 0.15087 to 0.08064, saving model to model_weight.ckpt
Epoch 3/100
Epoch 3: val_loss improved from 0.08064 to 0.06820, saving model to model_weight.ckpt
Epoch 4/100
Epoch 4: val_loss improved from 0.06820 to 0.05854, saving model to model_weight.ckpt
Epoch 5/100
Epoch 5: val_loss did not improve from 0.05854
Epoch 6/100
Epoch 6: val_loss improved from 0.05854 to 0.04913, saving model to model_weight.ckpt
Epoch 7/100
Epoch 7: val_loss did not improve from 0.04913
Epoch 8/100
Epoch 8: val_loss did not improve from 0.04913
Epoch 9/100
Epoch 9: val_loss improved from 0.04913 to 0.03963, saving model to model_weight.ckpt
Epoch 10/100
Epoch 10: val_loss did not improve from 0.03963
Epoch 11/100
Epoch 11: val_loss did not improve from 0.03963
Epoch 12/100
Epoch 12: val_loss did not improve from 0.03963
Epoch 13/100
Epoch 13: val_loss did not improve from 0.0

Epoch 38/100
Epoch 38: val_loss did not improve from 0.03633
Epoch 39/100
Epoch 39: val_loss did not improve from 0.03633
Epoch 40/100
Epoch 40: val_loss did not improve from 0.03633
Epoch 41/100
 38/257 [===>..........................] - ETA: 1s - loss: 0.0438

In [None]:
# Restore the weights written by the ModelCheckpoint callback (best val_loss).
model.load_weights('model_weight.ckpt')

# Loss  시각화

In [None]:
# Loss 함수
def plot_train_history(model, title):
    """Plot the per-epoch training and validation loss stored on ``model``.

    Args:
        model: Object whose ``history.history`` mapping holds the ``'loss'``
            and ``'val_loss'`` sequences.
        title: Title shown above the figure.
    """
    train_curve = model.history.history['loss']
    val_curve = model.history.history['val_loss']
    epoch_axis = range(len(train_curve))

    plt.figure()

    curves = (
        (train_curve, 'b', 'Training loss'),
        (val_curve, 'r', 'Validation loss'),
    )
    for values, fmt, label in curves:
        plt.plot(epoch_axis, values, fmt, label=label)
    plt.title(title)
    plt.legend()

    plt.show()

In [None]:
# Draw the training/validation loss curves recorded on the model.
plot_train_history(model, 'Loss')

# 모델 검증 (MAE , R-square)

## 환율의 평균과 표준편차 구하기

In [None]:
# Mean and (population) standard deviation of the first column, used to map
# standardized values back to the original scale.
fx_values = data.values[:, 0]
FX_Mean = np.mean(fx_values)
FX_St = np.std(fx_values)

## 예측치와 실제값

In [None]:
# Run the model on the validation windows, then undo the standardization for
# both the predictions and the true labels.
scaled_predict = model.predict(x_val)
predict = scaled_predict * FX_St + FX_Mean
Y = y_val * FX_St + FX_Mean

## 검증데이터의 MAE와 R2

In [None]:
from sklearn.metrics import mean_absolute_error, r2_score

# Score the de-normalized predictions against the de-normalized labels.
mae = mean_absolute_error(Y, predict)
r2 = r2_score(Y, predict)
for metric_name, metric_value in (('mae', mae), ('r2', r2)):
    print(f'{metric_name}: {metric_value:.5f}')


## 가장 큰 오차, 가장 작은 오차 살피기

In [None]:
np.set_printoptions(suppress=True, precision=5)

# Absolute error of every validation prediction.
abs_error = np.abs(Y - predict)
print(f'최대 오차: {np.max(abs_error):.5f}원')
print(f'최소 오차: {np.min(abs_error):.5f}원')

# Scatter of the absolute errors in sample order.
plt.plot(abs_error, 'o')
plt.yticks(rotation='vertical')
plt.xticks(rotation='vertical')
plt.xlabel('Date')
plt.ylabel('Error(won)')
plt.grid()
plt.show()

## 예측 시각화 - 그래프 그리기

In [None]:
def create_time_steps(length):
    """Return the negative offsets ``-length, ..., -1`` as a list.

    An empty list is returned when ``length`` is zero or negative.
    """
    offsets = range(-length, 0)
    return [*offsets]

def multi_step_plot(history, true_future, prediction):
    """Plot one input window together with its true and predicted future values.

    Args:
        history: 2-D window of past rows; only its first column is drawn.
        true_future: Actual values following the window.
        prediction: Predicted values for the same future steps. They are drawn
            only when at least one of them is non-zero.
    """
    plt.figure(figsize=(12, 6))

    # Past rows sit at negative x positions, future steps start at 0.
    past_axis = create_time_steps(len(history))
    future_axis = np.arange(len(true_future)) / STEP

    # Column 0 of the window is the series plotted as history.
    plt.plot(past_axis, np.array(history[:, 0]), label='History')
    plt.plot(future_axis, np.array(true_future), 'bo', label='True Exchange Rate')
    if prediction.any():
        plt.plot(future_axis, np.array(prediction), 'ro', label='Predicted Future')
    plt.legend()
    plt.show()
    


In [None]:
def _to_won(scaled):
    """Map standardized values back to the original exchange-rate scale."""
    return scaled*FX_St+FX_Mean


# Plot the first sample of each of five validation batches.
for batch_x, batch_y in val_data.take(5):
    multi_step_plot(
        _to_won(batch_x[0]),
        _to_won(batch_y[0]),
        _to_won(model.predict(batch_x)[0]),
    )

# 다음날 예측

## 최근 10일(past_history) 독립변수 데이터의 표준화

In [None]:
# Standardize the most recent `past_history` rows for inference.
# The model was trained on data scaled with the mean/std of the full dataset,
# so the inference window must use those same statistics. Scaling the window
# by its own 10-row mean/std puts the inputs on a different scale and divides
# by zero whenever a column is constant over the window.
pred = data[-past_history:].values
pred_mean = data.values.mean(axis=0)
pred_std = data.values.std(axis=0)
pred = (pred - pred_mean) / pred_std

## 다음날 환율 예측 값

In [None]:
# Run the model once on the latest window (reshaped to a batch of one) and map
# the output back to the original exchange-rate scale.
forecast = model.predict(pred.reshape(-1, pred.shape[-2], pred.shape[-1])) * FX_St + FX_Mean
forecast = forecast.flatten()

# `tmr` always holds the next-day forecast as a one-element array, because the
# moving-average cell further down reads it regardless of `future_target`.
tmr = forecast[:1]

if future_target == 1:
    print(f'내일 환율은 {tmr} 원으로 예상됩니다')
else:
    multiple = forecast
    for num, value in enumerate(multiple, start=1):
        print(f'{num}일 후의 환율은 {value} 원으로 예상됩니다')

## 실제값과 예측값의 데이터 프레임 만들기

In [None]:
# Attach the first-step predictions to the last len(predict) dates of the data
# and keep only the actual USD/KRW column next to them.
# NOTE(review): the date alignment presumably only holds for future_target == 1 — confirm.
n_pred = len(predict)
predict = pd.DataFrame(predict[:, 0], index=data.index[-n_pred:], columns=['predict'])

actual_tail = data[-n_pred:]
gather_all = pd.concat([actual_tail, predict], axis=1)
gather_all = gather_all.drop(['달러지수', 'crb'], axis=1)

## 환율 예측 시각화

In [None]:
import matplotlib.pyplot as plt
# Figure 1: the full USD/KRW history with the validation-period predictions
# overlaid as a red dotted line.
plt.figure(figsize=(16,8))
plt.plot(data['USD/KRW'], label ='real')
plt.plot(gather_all['predict'],'r:', label ='predict')
plt.title('Total Data',fontsize =20)
plt.xticks(fontsize =16)
plt.yticks(fontsize =16)
plt.legend(fontsize =16)
plt.show()

# Figure 2: validation period only, actual values (Y) against predictions.
plt.plot(gather_all.index,  Y, 'r--', label ='real')
plt.plot(gather_all.index, predict, 'b:', label ='predict')
plt.xticks(rotation = 'vertical')
plt.xlabel('Date',fontsize =12)
plt.ylabel('USD/KRW',fontsize =12)
plt.legend(fontsize =12)
plt.title('Validation Data')
plt.show()

In [None]:
# Ask how many of the most recent rows to show, then plot actual and predicted
# values for that tail of gather_all.
날짜 = int(input('과거 몇일의 데이터를 시각화 할 것인지 입력하세요:'))
plt.plot(gather_all.index[-날짜:],  gather_all['USD/KRW'][-날짜:], 'r--', label ='real')
plt.plot(gather_all.index[-날짜:],gather_all['predict'][-날짜:], 'b:', label ='predict')

# One x tick per plotted date.
plt.xticks(gather_all.index[-날짜:], rotation='vertical')
plt.legend()
plt.grid()
plt.show()

# 환율 추세 맞추기 _ 이동평균선

In [None]:
import matplotlib.pyplot as plt
# Number of most recent rows to show in the plot.
날짜 = int(input('과거 몇일의 데이터를 시각화 할 것인지 입력하세요:'))


# Start from the actual USD/KRW values over the validation dates.
MA = pd.DataFrame(index = gather_all.index)

MA['real'] = gather_all['USD/KRW']
from datetime import timedelta
today = MA.index[-1]
# NOTE(review): this is the next calendar day, which may fall on a weekend —
# confirm whether the next business day is intended.
tomorrow = today + timedelta(days=1)

# Append the forecast `tmr` as one extra row dated `tomorrow`, then compute a
# 2-row moving average over actual values plus that forecast.
MA = pd.concat([MA, pd.DataFrame(tmr, index=[tomorrow], columns=['real'])])
MA['ma2'] = MA['real'].rolling(window=2).mean()


plt.plot(MA['real'][-날짜:],'r:', label='Real')
plt.plot(MA['ma2'][-날짜:],'y:', label='ma2')


# One x tick per plotted date.
plt.xticks(MA[-날짜:].index,rotation='vertical')


plt.legend(loc="best")
plt.grid()
plt.show()

In [None]:
# Gap between each value and its 2-row moving average. The last row belongs to
# the forecast appended to MA; the rows before it are actual values.
MA['real-ma2'] = MA['real'] - MA['ma2']


def _trend_message(gaps):
    """Return the trend message for a sequence of value-minus-average gaps.

    The last element is the forecast gap. It is compared with the most recent
    non-zero gap among the three rows before it: a sign flip from positive
    means a likely fall, a sign flip from negative means a likely rise, and
    anything else is reported as unknown.

    Previously, when the latest actual gap was exactly zero, the older gap was
    multiplied by that zero, so the sign test could never succeed and the
    fallback branches always reported "unknown". The older gap is now compared
    with the forecast gap instead.
    """
    unknown = '내일 환율이 어떻게 될지 모른다'
    if len(gaps) == 0:
        return unknown

    latest = gaps[-1]
    # Walk back from the row before the forecast, at most three rows.
    for reference in reversed(gaps[-4:-1]):
        if reference > 0:
            return '내일 환율이 떨어질 가능성이 크다' if reference * latest < 0 else unknown
        if reference < 0:
            return '내일 환율이 오를 가능성이 크다' if reference * latest < 0 else unknown
        # Zero or NaN gap: carries no direction, so look one row further back.
    return unknown


print(_trend_message(MA['real-ma2'].to_numpy()))
        