# Library

In [1]:
import torch
import numpy as np
from matplotlib import pyplot as plt

# Set torch constants

In [2]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Floating-point precision applied to the tensors and models created below.
dtype = torch.double

# Load the stock data

In [4]:
# Read the comma-separated table and flip the row order in one step
# (the last row of the file becomes the first row of `stock_data`).
# NOTE(review): presumably the file is stored newest-first and this makes it
# chronological - confirm against the data source.
stock_data = np.loadtxt("stock.csv", delimiter=",")[::-1]

print("Show part of the data")
print(stock_data[:3, :])

Show part of the data
[[1.30528003e+03 1.31800000e+03 1.30436499e+03 1.25140000e+06
  1.31137000e+03]
 [1.29428003e+03 1.32373999e+03 1.29424500e+03 2.03000000e+06
  1.30885999e+03]
 [1.28945996e+03 1.29372998e+03 1.28250000e+03 1.15270000e+06
  1.29180005e+03]]


# Set hyperparameters

In [5]:
# --- Model dimensions ---
D_in = 5  # Dimension of x (in this case, 5 features per day)
H = 10  # Size of the recurrent hidden state
D_out = 1  # Size of the model output
Layer = 1  # Number of stacked recurrent layers

# --- Data windowing ---
seq_len = 2  # How many time steps (T) make up one input window

# --- Optimisation ---
learning_rate = 0.01
training_epochs = 10000
display_step = 1000  # The loss is printed once every `display_step` epochs

In [8]:
def split_dataset(data, test_ratio):
    """Split a 2-D array into leading (train) and trailing (test) rows.

    The row order is preserved: the first ``floor((1 - test_ratio) * len(data))``
    rows form the training part and the remaining rows form the test part.

    Args:
        data: 2-D array-like supporting ``len`` and ``data[a:b, :]`` slicing.
        test_ratio: Fraction of rows assigned to the test part.

    Returns:
        Tuple ``(train_data, test_data)`` of row slices of ``data``.
    """
    num_data = len(data)
    print(num_data)

    # Round away floating-point noise before truncating. Without it,
    # (1 - 0.9) * 10 evaluates to 0.9999999999999998 and int() yields 0,
    # which would leave the training part empty.
    split_index = int(round((1 - test_ratio) * num_data, 8))

    train_data = data[:split_index, :]
    test_data = data[split_index:, :]

    return train_data, test_data
# Hold out the trailing 20% of rows, then show the per-column minimum of the
# training rows.
train_data, test_data = split_dataset(stock_data, 0.2)
min_data = train_data.min(axis=0)
print(min_data)

250
[  1042.900024   1047.48999    1025.       710200.         1036.22998 ]


# Preprocess the stock data

In [None]:
def normalize(data, min_data=None, max_data=None):
    """Min-max scale every column of ``data``.

    By default the column-wise minimum and maximum are taken from ``data``
    itself, which maps each column into roughly ``[0, 1]``. Passing
    ``min_data`` / ``max_data`` applies statistics computed elsewhere (for
    example on a training split) so that several arrays share one scale.

    Args:
        data: 2-D array of shape (rows, columns).
        min_data: Optional per-column minimum; computed from ``data`` if None.
        max_data: Optional per-column maximum; computed from ``data`` if None.

    Returns:
        Array of the same shape as ``data`` holding the scaled values.
    """
    if min_data is None:
        min_data = np.min(data, 0)
    if max_data is None:
        max_data = np.max(data, 0)

    numerator = data - min_data
    # 1e-8 keeps the denominator non-zero for constant columns.
    denominator = max_data - min_data + 1e-8

    return numerator / denominator

def construct_dataset(data, seq_len):
    """Turn a 2-D series into sliding-window (input, target) examples.

    Example ``k`` pairs rows ``k .. k + seq_len - 1`` of ``data`` (all columns)
    with the last column of row ``k + seq_len``, i.e. the row right after the
    window.

    NOTE(review): the original (Korean) comment appears to say this relies on
    the close price being the last column - confirm against the data layout.

    Args:
        data: 2-D array of shape (rows, columns).
        seq_len: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, y)`` of numpy arrays with one entry per window.
    """
    print("Construct the data...")
    window_starts = range(len(data) - seq_len)

    X_data = [data[start:start + seq_len, :] for start in window_starts]
    # Indexing the column with the list [-1] keeps a length-1 axis per target.
    y_data = [data[start + seq_len, [-1]] for start in window_starts]
    print("Finish construciton")

    return np.array(X_data), np.array(y_data)

def split_dataset(data, test_ratio):
    """Split a 2-D array into leading (train) and trailing (test) rows.

    The row order is preserved: the first ``floor((1 - test_ratio) * len(data))``
    rows form the training part and the remaining rows form the test part.

    Args:
        data: 2-D array-like supporting ``len`` and ``data[a:b, :]`` slicing.
        test_ratio: Fraction of rows assigned to the test part.

    Returns:
        Tuple ``(train_data, test_data)`` of row slices of ``data``.
    """
    num_data = len(data)
    print(num_data)

    # Round away floating-point noise before truncating. Without it,
    # (1 - 0.9) * 10 evaluates to 0.9999999999999998 and int() yields 0,
    # which would leave the training part empty.
    split_index = int(round((1 - test_ratio) * num_data, 8))

    train_data = data[:split_index, :]
    test_data = data[split_index:, :]

    return train_data, test_data

In [None]:
# Split first, then scale. The scaling statistics come from the training rows
# only and are reused for the test rows: scaling the test split with its own
# min/max would leak information about the held-out period and would put the
# two splits on different scales.
train_data, test_data = split_dataset(stock_data, 0.2)

train_min = np.min(train_data, 0)
train_max = np.max(train_data, 0)

# Same formula as normalize(), but with the training statistics. Test values
# outside the training range therefore fall outside [0, 1].
test_data = (test_data - train_min) / (train_max - train_min + 1e-8)
train_data = normalize(train_data)

# Sliding windows: inputs are (num_windows, seq_len, features) and targets
# are (num_windows, 1).
_X_train, _y_train = construct_dataset(train_data, seq_len)
_X_test, _y_test = construct_dataset(test_data, seq_len)

# Move everything to the selected device / precision.
X_train = torch.tensor(_X_train).to(device, dtype)
y_train = torch.tensor(_y_train).to(device, dtype)
X_test = torch.tensor(_X_test).to(device, dtype)
y_test = torch.tensor(_y_test).to(device, dtype)

print("y_train: ", y_train.size())
print("y_test: ", y_test.size())

# Make RNN model from scratch

In [None]:
# A single recurrent neural network (RNN) cell built from plain linear layers
class RNN(torch.nn.Module):
    """One step of a hand-written tanh RNN.

    ``forward`` computes the new hidden state as
    ``tanh(i2h(input) + h2h(previous_hidden))`` and the output as
    ``tanh(h2o(new_hidden))``.
    """

    def __init__(self, D_in, H, D_out):
        super().__init__()
        # Kept so initHidden() can build a state of the right width.
        self.hidden_size = H

        # Input-to-hidden has no bias; hidden-to-hidden carries the bias term.
        self.i2h = torch.nn.Linear(D_in, H, bias=False)
        self.h2h = torch.nn.Linear(H, H, bias=True)
        self.h2o = torch.nn.Linear(H, D_out, bias=True)

        self.activation_h = torch.nn.Tanh()
        self.activation_o = torch.nn.Tanh()

    def forward(self, i, h):
        """Advance one step; returns ``(output, new_hidden)``."""
        h = self.activation_h(self.i2h(i) + self.h2h(h))
        o = self.activation_o(self.h2o(h))
        return o, h

    def initHidden(self):
        """Return an all-zero hidden state of shape (1, hidden_size)."""
        return torch.zeros(1, self.hidden_size, dtype=dtype, device=device)
    
# Instantiate the hand-written cell with a mean-squared-error loss and plain SGD.
# NOTE(review): `model`, `criterion` and `optimizer` are assigned again in the
# next model-construction cell, so this instance is replaced before training.
model = RNN(D_in, H, D_out)
model = model.to(dtype=dtype, device=device)
criterion = torch.nn.MSELoss(reduction="mean")
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

# Construct the model (Vanilla RNN)

In [None]:
# Construct a recurrent neural network (RNN)
class RNN(torch.nn.Module):
    """``torch.nn.RNN`` followed by a linear read-out of the last time step."""

    def __init__(self, D_in, H, D_out, Layer):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, time, features).
        self.rnn = torch.nn.RNN(D_in, H, num_layers=Layer, batch_first=True)
        self.fc = torch.nn.Linear(H, D_out, bias=True)

    def forward(self, x):
        """Map a batch of sequences to one prediction per sequence."""
        hidden_seq, _status = self.rnn(x)
        # Only the hidden state of the final time step feeds the read-out.
        last_step = hidden_seq[:, -1]
        return self.fc(last_step)
    
# Build the vanilla RNN on the selected device / precision.
model = RNN(D_in, H, D_out, Layer)
model = model.to(device, dtype)

# Mean-squared-error objective, optimised with plain SGD.
criterion = torch.nn.MSELoss(reduction="mean")
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

# Train the model (Vanilla RNN)

In [None]:
# Full-batch training: every epoch runs the model on all of X_train at once.
losses = []

for epoch in range(training_epochs):
    # Clear gradients left from the previous step before the new backward pass.
    optimizer.zero_grad()

    y_train_pred = model(X_train)
    loss = criterion(y_train_pred, y_train)

    loss.backward()
    optimizer.step()

    # Record the scalar loss of this epoch for the plot below.
    loss_value = loss.item()
    losses.append(loss_value)

    if (epoch + 1) % display_step == 0:
        print("Epoch {}\t Loss: ".format(epoch + 1), loss_value)

# Plot the loss function

In [None]:
# Training loss per epoch, drawn on an explicit Axes object.
fig, ax = plt.subplots()
ax.plot(losses)
ax.set_title("Loss function")
plt.show()

# Show the prediction

In [None]:
# Run the model on the test windows without tracking gradients.
with torch.no_grad():
    y_test_pred = model(X_test)

# Bring the predictions back to host memory as a numpy array for plotting.
prediction_np = y_test_pred.detach().cpu().numpy()

fig, ax = plt.subplots()
ax.plot(_y_test)
ax.plot(prediction_np)
ax.legend(['original', 'prediction'])
ax.set_title("Stock prediction")
plt.show()

# Construct the model (LSTM)

In [None]:
# Construct a recurrent neural network (LSTM)
class LSTM(torch.nn.Module):
    """``torch.nn.LSTM`` followed by a linear read-out of the last time step."""

    def __init__(self, D_in, H, D_out, Layer):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, time, features).
        self.lstm = torch.nn.LSTM(D_in, H, num_layers=Layer, batch_first=True)
        self.fc = torch.nn.Linear(H, D_out, bias=True)

    def forward(self, x):
        """Map a batch of sequences to one prediction per sequence."""
        hidden_seq, _status = self.lstm(x)
        # Only the hidden state of the final time step feeds the read-out.
        last_step = hidden_seq[:, -1]
        return self.fc(last_step)
    
# Build the LSTM on the selected device / precision.
model = LSTM(D_in, H, D_out, Layer)
model = model.to(device, dtype)

# Mean-squared-error objective, optimised with plain SGD.
criterion = torch.nn.MSELoss(reduction="mean")
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

# Train the model (LSTM)

In [None]:
# Full-batch training: every epoch runs the model on all of X_train at once.
losses = []

for epoch in range(training_epochs):
    # Clear gradients left from the previous step before the new backward pass.
    optimizer.zero_grad()

    y_train_pred = model(X_train)
    loss = criterion(y_train_pred, y_train)

    loss.backward()
    optimizer.step()

    # Record the scalar loss of this epoch for the plot below.
    loss_value = loss.item()
    losses.append(loss_value)

    if (epoch + 1) % display_step == 0:
        print("Epoch {}\t Loss: ".format(epoch + 1), loss_value)

# Plot the loss function

In [None]:
# Training loss per epoch, drawn on an explicit Axes object.
fig, ax = plt.subplots()
ax.plot(losses)
ax.set_title("Loss function")
plt.show()

# Show the prediction

In [None]:
# Run the model on the test windows without tracking gradients.
with torch.no_grad():
    y_test_pred = model(X_test)

# Bring the predictions back to host memory as a numpy array for plotting.
prediction_np = y_test_pred.detach().cpu().numpy()

fig, ax = plt.subplots()
ax.plot(_y_test)
ax.plot(prediction_np)
ax.legend(['original', 'prediction'])
ax.set_title("Stock prediction")
plt.show()