In [64]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import mean_squared_error, mean_absolute_error

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error, mean_absolute_error

from dateutil.relativedelta import relativedelta
from pytimekr import pytimekr
from datetime import datetime
import random

In [65]:
# 시드 고정
def seed_everything(seed_value):
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    
    if torch.cuda.is_available(): 
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

In [66]:
# Sequence generation and multi-step target creation
def generate_sequences(data, seq_length, n_steps):
    sequences_x = []
    sequences_y = []
    
    for i in range(len(data) - seq_length - n_steps + 1):
        sequences_x.append(data[i:i+seq_length, :])
        sequences_y.append(data[i+seq_length:i+seq_length+n_steps, 0]) # We predict 'CNY_KRW_Close'
    
    return np.array(sequences_x), np.array(sequences_y)

# CNN model definition
class CNNModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, kernel_size=3):
        super(CNNModel, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv1d(input_dim, hidden_dim, kernel_size),
            nn.ReLU(),
            nn.Conv1d(hidden_dim, hidden_dim, kernel_size),
            nn.ReLU(),
        )
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.output_dim = output_dim

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.cnn(x)
        x = x.mean(2)
        x = self.fc(x)
        return x.view(-1, self.output_dim)

# Train function
def train(model, train_loader, loss_function, optimizer):
    model.train()
    train_loss = 0.0
    for inputs, targets in train_loader:
        outputs = model(inputs)
        loss = loss_function(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
    return train_loss / len(train_loader)

# MAPE 계산을 위한 함수 정의
def mean_absolute_percentage_error(y_true, y_pred): 
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    epsilon = 1e-7  # 아주 작은 상수
    return np.mean(np.abs((y_true - y_pred) / (y_true + epsilon))) * 100


def test(model, test_loader, loss_function):
    model.eval()
    test_loss = 0.0
    all_outputs = []
    all_targets = []
    with torch.no_grad():
        for inputs, targets in test_loader:
            outputs = model(inputs)
            test_loss += loss_function(outputs, targets).item()
            
            all_outputs.append(outputs.detach().numpy())
            all_targets.append(targets.detach().numpy())
            
    all_outputs = np.concatenate(all_outputs)
    all_targets = np.concatenate(all_targets)
    
    mse = mean_squared_error(all_targets, all_outputs)
    mae = mean_absolute_error(all_targets, all_outputs)
    mape = mean_absolute_percentage_error(all_targets, all_outputs)
    
    return test_loss / len(test_loader), mse, mae, mape

In [88]:
def prediction(country):
    data = pd.read_csv(r'C:\Users\whfhr\Desktop\신한 AI\사용data\Total.csv')
    data = data[sum(columns[country], [])]

    seq_length = 30 # 입력 시퀀스
    n_steps = 60 # 출력 시퀀스 n_steps의 스텝을 에측.

    # 원본 데이터에서 끝의 n_steps개 데이터를 제외한 데이터에 대한 작업(data leakage를 완전 방지기 위함)
    
    source_data = data
    data = source_data.head(len(data) - n_steps)

    # Data scaling
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data)

    X, y = generate_sequences(scaled_data, seq_length, n_steps)

    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    # PyTorch tensors and DataLoader
    train_dataset = TensorDataset(torch.Tensor(X_train), torch.Tensor(y_train))
    test_dataset = TensorDataset(torch.Tensor(X_test), torch.Tensor(y_test))

    train_loader = DataLoader(train_dataset, batch_size=128)
    test_loader = DataLoader(test_dataset, batch_size=128)
    
    seed_everything(42)
    
    input_dim = X_train.shape[2]
    hidden_dim = 64
    output_dim = y_train.shape[1]
    kernel_size = 3

    model = CNNModel(input_dim, hidden_dim, output_dim, kernel_size)

    # Loss function and optimizer
    loss_function = nn.MSELoss()
    optimizer = Adam(model.parameters(), lr=0.001)

    train_losses = []
    test_losses = []

    n_epochs = 20

    for epoch in range(n_epochs):
        train_loss = train(model, train_loader, loss_function, optimizer)
        test_metrics = test(model, test_loader, loss_function)  # test_metrics는 이제 튜플입니다.
        test_loss = test_metrics[0]  # test_loss는 튜플의 첫 번째 요소입니다.
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        
    test_loss, mse, mae, mape = test(model, test_loader, loss_function)

    # Prepare the last n_steps sequence from the test data
    last_sequence = pd.DataFrame(scaled_data).tail(seq_length).values

    # Predict the next n_steps
    next_n_steps_predictions = np.empty((n_steps, 1))
    for i in range(n_steps):
        next_step_prediction = model(torch.Tensor(last_sequence).unsqueeze(0)).detach().numpy()[:, 0]  # skip across feature dimensions
        next_n_steps_predictions[i, :] = next_step_prediction.reshape(-1, 1)

        # Change - Update last_sequence correctly considering dimensions
        last_sequence = np.roll(last_sequence, shift=-1, axis=0)
        last_sequence[-1, 0] = next_step_prediction

    # Inverse transformation
    next_n_steps_predictions_inverse = scaler.inverse_transform(
        np.hstack((next_n_steps_predictions, np.zeros((next_n_steps_predictions.shape[0], data.shape[1] - next_n_steps_predictions.shape[1]))))
    )[:, 0]
    return source_data[columns[country][0]].tail(n_steps+1).iloc[0], next_n_steps_predictions_inverse

def prediction_date_func():
    n_steps = 60
    start = datetime.today().replace(month = 1, day=1)
    end = start + relativedelta(years=1)
    date = pd.date_range(start, periods=(end-start).days+1)

    week = []
    for i in date:
        if i.weekday() == 6 or i.weekday() == 5:
            week.append(i)
        else: pass
    
    holidays = pytimekr.holidays()

    date = list(map(lambda x: x.strftime("%Y-%m-%d"), date))
    week = list(map(lambda x: x.strftime("%Y-%m-%d"), week))
    holidays = list(map(lambda x: x.strftime("%Y-%m-%d"), holidays))

    date = list(set(date) - set(week))
    date = list(set(date) - set(holidays))
    date.sort()

    today = datetime.today().strftime("%Y-%m-%d")

    if today in date:
        prediction_date = date[date.index(today)+1:date.index(today)+n_steps+1]
    else:
        for i in date:
            if i > today:
                prediction_date = date[date.index(i):date.index(i)+n_steps]
                break
            else: pass
    return prediction_date

In [81]:
def recommend_date(country):
    _, next_n_steps_predictions_inverse = prediction(country)
    prediction_date = prediction_date_func()
            
    next_n_steps_predictions_inverse = list(next_n_steps_predictions_inverse)
    minidx = next_n_steps_predictions_inverse.index(min(next_n_steps_predictions_inverse))
    
    return prediction_date[minidx]

In [84]:
columns = {"미국" : [['USD_KRW_Close'],['Korea_KOSDAQ_Close']],
          "일본" : [['JPY_KRW_Close'],['Korea_KOSDAQ_Close', 'Korea_KOSPI200_Close']],
          "영국" : [['GBP_KRW_Close'],["USA_DOW_Close"]],
          "유로존" : [['EUR_KRW_Close'],["USA_DOW_Close"]],
          "중국" : [['CNY_KRW_Close'],["USA_DOW_Close"]]}

In [92]:
def recommend_date_result():
    prediction_date = prediction_date_func()
    print("(오늘이 2022년 )")
    print("{} ~ {} 기간 내에 지정하신 국가의 환율이 낮은 날을 추천을 해드립니다.".format(prediction_date[0],prediction_date[-1]))
    print("미국, 일본, 영국, 유로존, 중국 중 여행을 떠날 나라를 지정해주세요.")
    country = input()
    
    if country in ["미국", "일본", "영국", "유로존", "중국"]:
        date = recommend_date(country)
        print("\n(개장일 기준) {} ~ {} 중 {} 환율이 낮은 날은 \"{}(+-2일)\"부근입니다.".format(prediction_date[0],prediction_date[-1],country,date))
    else:
        print("\n국가를 다시 확인해주세요.\n")
        recommend_date_result()
recommend_date_result()


(오늘이 2022년 )
2023-05-15 ~ 2023-08-08 기간 내에 지정하신 국가의 환율이 낮은 날을 추천을 해드립니다.
미국, 일본, 영국, 유로존, 중국 중 여행을 떠날 나라를 지정해주세요.
미국

(개장일 기준) 2023-05-15 ~ 2023-08-08 중 미국 환율이 낮은 날은 "2023-08-08(+-2일)"부근입니다.


In [107]:
def recommend_country():
    today_usd, USD = list(prediction("미국"))
    today_jpy, JPY = list(prediction("일본"))
    today_gbp, GBP = list(prediction("영국"))
    today_eur, EUR = list(prediction("유로존"))
    today_cny, CNY = list(prediction("중국"))
    
    prediction_date = prediction_date_func()
    
    enc_usd = (list(USD)/today_usd.values*100)
    enc_jpy = (list(JPY)/today_jpy.values*100)
    enc_gbp = (list(GBP)/today_gbp.values*100)
    enc_eur = (list(EUR)/today_eur.values*100)
    enc_cny = (list(CNY)/today_cny.values*100)
    
    min_value = [min(enc_usd),min(enc_jpy),min(enc_gbp),min(enc_eur),min(enc_cny)]
    min_idx = min_value.index(min(min_value))
    enchage = [enc_usd, enc_jpy, enc_gbp, enc_eur, enc_cny]
    
    idx = list(enchage[min_idx]).index(min(enchage[min_idx]))
    
    country = ["미국", "일본", "영국", "유로존", "중국"]
    
    return country[min_idx], prediction_date[idx]

In [108]:
print("미국, 일본, 영국, 유로존, 중국 중 추천을 해드립니다.")
def recommend_country_result():
    prediction_date = prediction_date_func()

    country, date = recommend_country()
    print("\n현재 대비 지정일에 환율이 가장 낮은 나라는 {} 이고, {} 날 입니다.".format(country, date))
    
recommend_country_result()


미국, 일본, 영국, 유로존, 중국 중 추천을 해드립니다.

현재 대비 지정일에 환율이 가장 낮은 나라는 미국 이고, 2023-08-08 날 입니다.
