In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount and echo the full path of every file found,
# in the same order os.walk yields them.
input_file_paths = (
    os.path.join(folder, name)
    for folder, _, names in os.walk('/kaggle/input')
    for name in names
)
for file_path in input_file_paths:
    print(file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session


In [None]:
!pip install finance-datareader

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import FinanceDataReader as fdr
from sklearn import preprocessing
import matplotlib.pyplot as plt
import datetime

# Start of the download window: 20 * 365 days before now
# (leap days are not accounted for, so this is slightly less than 20 years).
start_date = datetime.datetime.now() - datetime.timedelta(days=365*20)

# Price history for ticker 037440 from FinanceDataReader (network call).
df_heelim = fdr.DataReader('037440', start_date)
normalizer = preprocessing.MinMaxScaler()

# Mark zero-volume rows as missing and drop them.
# NOTE: the original code did this through an alias (`prep_heelim = df_heelim`),
# so the NaNs were written into df_heelim itself. That side effect is kept and
# made explicit here, because a later cell rebuilds its array from df_heelim
# and its dropna() then removes the same rows.
df_heelim['Volume'] = df_heelim['Volume'].replace(0, np.nan)
prep_heelim = df_heelim.dropna()

# Closing price of the cleaned frame. The figure is titled 'prep_heelim', so
# it plots prep_heelim rather than the frame that still holds the NaN rows.
plt.figure(figsize=(7,4))
plt.title('prep_heelim')
plt.ylabel('price (won)')
plt.xlabel('period (day)')
plt.grid()
plt.plot(prep_heelim['Close'], label='Close', color='r')
plt.legend(loc='best')
plt.show()

# Min-max scale the three columns of interest.
norm_cols = ['Open', 'Close', 'Volume']
norm_heelim_np = normalizer.fit_transform(prep_heelim[norm_cols])

# Re-attach the original index so the scaled frame stays aligned with
# prep_heelim and the plot below shares its x-axis.
norm_heelim = pd.DataFrame(norm_heelim_np, columns=norm_cols, index=prep_heelim.index)

plt.figure(figsize=(7,4))
plt.title('norm_heelim')
plt.ylabel('Close (min-max scaled)')
plt.xlabel('period (day)')
plt.plot(norm_heelim['Close'], label='Close', color='purple')
plt.show()

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import FinanceDataReader as fdr
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

In [None]:
# Only the last expression of a cell is displayed, so the type has to be
# printed explicitly; the first row is then shown through the rich repr.
print(type(df_heelim))
df_heelim.head(1)

In [None]:
# Build the float64 feature matrix (columns: Open, Close, Volume).
feature_cols = ["Open", "Close", "Volume"]

# Coerce every entry to a number; anything non-numeric becomes NaN.
features = df_heelim[feature_cols].apply(pd.to_numeric, errors='coerce')

# Treat zero-volume rows as missing here as well, so this cell no longer
# depends on an earlier cell having already written NaNs into df_heelim.
# (When that has already happened this line is a no-op.)
features['Volume'] = features['Volume'].replace(0, np.nan)

# A single dropna after coercion removes both originally-missing and coerced rows.
data = np.array(features.dropna().values, dtype='float64')


In [None]:
# Number of most-recent rows held out for testing.
n_test = 10

# Normalisation (per-column z-score).
# The statistics are taken from the training rows only, so the held-out rows
# at the end of the series do not leak into the scaling.
mean = np.mean(data[:-n_test], axis=0)
std = np.std(data[:-n_test], axis=0)
std = np.where(std == 0, 1.0, std)  # a constant column would otherwise divide by zero
data = (data - mean) / std

# Train / test split: everything but the last n_test rows vs. the last n_test rows.
train_data = data[:-n_test]
test_data = data[-n_test:]

# Hyperparameters
input_size = 3        # features per time step (Open, Close, Volume)
hidden_size = 64      # width of each recurrent layer
num_layers = 2        # stacked recurrent layers
output_size = 13      # size of the model's final linear layer
seq_length = 40       # time steps per input window
learning_rate = 0.01
num_epochs = 100

# RNN model definition
class RNN(nn.Module):
    """Stacked ``nn.RNN`` followed by a linear layer on the last time step.

    Args:
        input_size: number of features per time step.
        hidden_size: width of each recurrent layer.
        num_layers: number of stacked recurrent layers.
        output_size: number of values produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a ``(batch, time, input_size)`` tensor to ``(batch, output_size)``."""
        # Build the zero initial state on the input's own device/dtype instead
        # of reading a global `device` that is only defined in a later cell.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            device=x.device, dtype=x.dtype,
        )
        out, _ = self.rnn(x, h0)
        # Only the last time step feeds the linear head.
        return self.fc(out[:, -1, :])

In [None]:
# Seed PyTorch's RNG so the weight initialisation below is the same on every
# Restart & Run All.
torch.manual_seed(42)

# Model initialisation: run on the GPU when one is available, otherwise on CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = RNN(input_size, hidden_size, num_layers, output_size).to(device)

# Loss function and optimisation algorithm.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
# Sanity check: shape of the training array.
train_data.shape

In [None]:
# Preview the first rows only; displaying the whole array just floods the output.
train_data[:5]

In [None]:
# Training
# Each sample is a window of `seq_length` consecutive rows; the target is the
# bucket of the normalised Close value (column 1) on the row right after it.
num_classes = model.fc.out_features

# num_classes - 1 evenly spaced edges over [-1, 1] yield exactly num_classes
# buckets, so np.digitize returns indices 0 .. num_classes - 1 and every label
# is a valid class for the model's output layer. (The previous hand-written
# edge list produced labels from -1 to 19, which do not fit the model's output.)
bin_edges = np.linspace(-1.0, 1.0, num_classes - 1)

if train_data.shape[0] <= seq_length:
    raise ValueError("train_data must contain more than seq_length rows")

model.train()
for epoch in range(num_epochs):
    # Non-overlapping windows, stepping by seq_length.
    for i in range(0, train_data.shape[0]-seq_length, seq_length):
        seq = torch.from_numpy(train_data[i:i+seq_length]).float().to(device)
        next_close = train_data[i+seq_length][1]
        label = int(np.digitize(next_close, bin_edges))
        target = torch.tensor([label], dtype=torch.long, device=device)

        optimizer.zero_grad()

        # Keep the raw (1, num_classes) logits: wrapping a Python float in a new
        # tensor detaches it from the graph, so no gradient could flow back.
        logits = model(seq.unsqueeze(0))
        loss = criterion(logits, target)
        loss.backward()
        optimizer.step()

    # Report the loss of the last window every 10 epochs.
    if (epoch+1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


In [None]:

# Test
# One-step-ahead evaluation: for every held-out row, feed the rows that come
# right before it (up to seq_length of them, taken from the training rows and
# the test rows already passed) and keep the highest-scoring class.
# The model emits class scores, not feature rows, so its output cannot be
# appended to the input window for an autoregressive roll-out.
history = np.concatenate([train_data, test_data], axis=0)
first_test_row = train_data.shape[0]

model.eval()
pred = []
with torch.no_grad():
    for i in range(test_data.shape[0]):
        end = first_test_row + i
        window = history[max(0, end - seq_length):end]
        seq = torch.from_numpy(window).float().to(device)
        logits = model(seq.unsqueeze(0))
        pred.append(int(logits.argmax(dim=1).item()))
pred = np.array(pred)

# Result output
# NOTE(review): the label strings are kept exactly as before. They describe
# daily percentage ranges, while the training cell buckets the normalised
# Close value, and the middle formula skips [-9%, -8%] and everything between
# +3% and +9%. TODO confirm the intended class -> label mapping.
result = []
for i in range(13):
    if i == 0:
        result.append("[-10%, -9%]")
    elif i == 12:
        result.append("[+9%, +10%]")
    else:
        result.append(f"[{i-9}%, {i-8}%]")

print("Predictions:")
for p in pred:
    print(result[p])
