In [None]:
import copy
import os
import time
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd

from tqdm import tqdm
import cv2
import glob
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# --- Windowing -------------------------------------------------------------
WINDOW = 12    # number of past values fed to the network per sample
W_STRIDE = 1   # step between the starts of consecutive windows

# --- Optimisation ----------------------------------------------------------
EPOCHS = 16
BATCH_SIZE = 16
LR = 0.001

# Device string used for models and tensors: GPU when CUDA is usable, else CPU.
TARGET = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Read the whole CSV as text. The context manager guarantees the file handle
# is closed even if reading fails (the bare open() previously leaked it).
with open('AAPL.csv', 'r') as file:
    lines = file.read()

# Keep the plain '\n' split: a trailing newline yields a final empty string,
# which the parsing code further down accounts for.
lines = lines.split('\n')
del lines[0]  # drop the first line (presumably the CSV header row)

def min_max_normilize(data):
    """Rescale ``data`` linearly so its minimum maps to 0 and its maximum to 1.

    Args:
        data: array-like of numbers (a NumPy array in this notebook).

    Returns:
        A new ``np.float32`` array with the same shape as ``data``.
        An empty input yields an empty array; an input whose values are all
        equal yields all zeros (instead of NaNs from a 0/0 division).
    """
    values = np.asarray(data, dtype=np.float32)

    # min()/max() of an empty array raise, so handle that case up front.
    if values.size == 0:
        return np.zeros(shape=values.shape, dtype=np.float32)

    data_min = values.min()
    value_range = values.max() - data_min

    # A constant series has no spread to normalise against.
    if value_range == 0:
        return np.zeros(shape=values.shape, dtype=np.float32)

    # Vectorised form of the original per-element loop.
    return ((values - data_min) / value_range).astype(np.float32)

# Extract column index 5 of every CSV row as a float
# (presumably the adjusted close price -- confirm against the file's header).
data = []

for line in lines:
    # Skip blank lines (e.g. the empty string left by a trailing newline).
    # The previous code dropped the last line unconditionally, which lost a
    # real data row whenever the file did not end with a newline.
    if not line.strip():
        continue
    data.append(float(line.split(',')[5]))

# Scale the series to [0, 1] before windowing.
data_original = np.array(data, dtype=np.float32)
data_original = min_max_normilize(data_original)

In [None]:
def generate_data(f_data, window_size, stride):
    """Cut a 1-D sequence into overlapping fixed-length windows.

    Args:
        f_data: sequence to slice (NumPy array or list).
        window_size: number of consecutive elements per window (>= 1).
        stride: distance between the starts of consecutive windows (>= 1).

    Returns:
        A list of windows, each a list of ``window_size`` elements taken from
        ``f_data``. Every window that fits completely is returned, including
        the one ending on the last element. The list is empty when
        ``f_data`` is shorter than ``window_size``.

    Raises:
        ValueError: if ``window_size`` or ``stride`` is smaller than 1
            (a non-positive stride previously caused an infinite loop).
    """
    if window_size < 1 or stride < 1:
        raise ValueError('window_size and stride must both be >= 1')

    # Last index at which a full window still fits. The original loop used a
    # strict '<' comparison and therefore skipped this final window.
    last_start = len(f_data) - window_size

    return [
        list(f_data[pos:pos + window_size])
        for pos in range(0, last_start + 1, stride)
    ]

def split_data(f_data, answer):
    """Turn each window into an ``[input, target]`` pair of float32 tensors.

    For every window in ``f_data`` the trailing ``answer`` elements become the
    target tensor; the elements before them become the input tensor, with an
    extra axis inserted at position 1 (so a flat window of length L gives an
    input of shape ``(L - answer, 1)``).

    Args:
        f_data: iterable of windows (sequences supporting ``len`` and slicing).
        answer: number of trailing elements per window used as the target.

    Returns:
        A list with one ``[x_train, y_train]`` list per window.
    """
    def _to_pair(window):
        cut = len(window) - answer
        features = np.expand_dims(np.array(window[:cut]), 1)
        return [
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(window[cut:], dtype=torch.float32),
        ]

    return [_to_pair(window) for window in f_data]


# Each window holds WINDOW inputs plus one extra value, which split_data
# peels off as the prediction target.
data = split_data(
    generate_data(data_original, WINDOW + 1, W_STRIDE),
    1,
)

In [None]:
class RNN_Net(nn.Module):
    """Three stacked single-layer ReLU RNNs followed by a linear read-out.

    The first two RNNs process the whole input sequence. Only the last time
    step of the second RNN's output is kept; it is fed to the third RNN as a
    length-1 sequence and projected to a single value per sample.
    """

    def __init__(self):
        super().__init__()

        self.rnn1 = nn.RNN(input_size=1, hidden_size=32, batch_first=True, nonlinearity='relu')
        self.rnn2 = nn.RNN(input_size=32, hidden_size=32, batch_first=True, nonlinearity='relu')
        self.rnn3 = nn.RNN(input_size=32, hidden_size=32, batch_first=True, nonlinearity='relu')
        self.lr1 = nn.Linear(32, 1)

    def forward(self, x, rnn1_h=None, rnn2_h=None, rnn3_h=None):
        """Run the network on a batch of sequences.

        Args:
            x: input of shape ``(batch, seq_len, 1)`` (the RNNs are
                ``batch_first`` with ``input_size=1``).
            rnn1_h, rnn2_h, rnn3_h: optional initial hidden states of shape
                ``(1, batch, 32)`` for the three RNNs. When omitted, ``None``
                is passed through to ``nn.RNN``, which then starts from a
                zero hidden state.

        Returns:
            Tensor of shape ``(batch, 1, 1)``.
        """
        # NOTE: the RNNs already apply ReLU, so their outputs are non-negative
        # and the extra activations below leave the values unchanged; they are
        # kept so the computation matches the original definition.
        x, _ = self.rnn1(x, rnn1_h)
        x = F.leaky_relu(x)

        x, _ = self.rnn2(x, rnn2_h)
        x = F.leaky_relu(x)

        # Keep only the final time step -> (batch, 32). Indexing with -1
        # instead of the former hard-coded 11 makes this work for any
        # sequence length, not just 12.
        x = x[:, -1, ...]

        # Re-introduce a length-1 sequence axis for the third RNN.
        x = self.repeat_tensor(x, 1)

        x, _ = self.rnn3(x, rnn3_h)
        x = F.relu(x)

        x = self.lr1(x)

        return x

    def repeat_tensor(self, tensor, num):
        """Repeat a ``(batch, dim)`` tensor ``num`` times along a new axis 1.

        Returns a tensor of shape ``(batch, num, dim)`` in which every slice
        along axis 1 is a copy of ``tensor``.
        """
        return tensor.repeat(1, num).view(tensor.shape[0], num, tensor.shape[1])

# Print the layer structure of a freshly constructed network.
print(RNN_Net())

In [None]:
class RNN_Dataset(torch.utils.data.Dataset):
    """Map-style dataset over a pre-built sequence of ``[input, target]`` pairs."""

    def __init__(self, data):
        # Indexable collection whose items carry the input at position 0 and
        # the target at position 1.
        self._samples = data

    def __len__(self):
        """Number of samples available."""
        return len(self._samples)

    def __getitem__(self, index):
        """Return the ``(input, target)`` tuple stored at ``index``."""
        sample = self._samples[index]
        return (sample[0], sample[1])

# Wrap the prepared samples and serve them in shuffled mini-batches,
# loaded by four worker processes.
data_set = RNN_Dataset(data)
data_loader = torch.utils.data.DataLoader(
    dataset=data_set,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
)

In [None]:
net = RNN_Net().to(TARGET)
# NOTE(review): the config cell defines LR = 0.001, but this optimizer has
# always been created with 1e-4. Kept unchanged to preserve training
# behaviour -- confirm which learning rate is intended.
optimizer = optim.Adam(net.parameters(), lr=0.0001)
loss_function = nn.SmoothL1Loss()

# Snapshot of the best-performing weights seen so far.
best_net = RNN_Net().to(TARGET)
best_loss = float('inf')

for e in range(EPOCHS):
    # Short pause before each epoch (presumably so earlier output is flushed
    # before the next progress bar is drawn).
    time.sleep(1)

    epoch_loss_sum = 0.0
    epoch_samples = 0

    for data_l in tqdm(data_loader, desc="Epoch %d" % (e + 1)):
        x, y = data_l

        # The notebook defines TARGET (not `device`) as the device string.
        x = x.to(TARGET)
        y = y.to(TARGET)

        net.zero_grad()

        # NOTE(review): training starts from random hidden states while the
        # evaluation cell uses zeros -- confirm this mismatch is intended.
        rnn1_h = torch.randn(size=(1, x.shape[0], 32)).to(TARGET)
        rnn2_h = torch.randn(size=(1, x.shape[0], 32)).to(TARGET)
        rnn3_h = torch.randn(size=(1, x.shape[0], 32)).to(TARGET)

        output = net(x, rnn1_h, rnn2_h, rnn3_h)

        # The network returns (batch, 1, 1) while y is (batch, 1). Without
        # this reshape the loss broadcasts both to (batch, batch, 1) and
        # compares every prediction with every target in the batch.
        output = output.reshape(y.shape)

        loss = loss_function(output, y)
        loss.backward()
        optimizer.step()

        # Accumulate a sample-weighted sum so the epoch loss is a true mean
        # even when the last batch is smaller.
        batch_size = x.shape[0]
        epoch_loss_sum += loss.item() * batch_size
        epoch_samples += batch_size

    if epoch_samples == 0:
        raise RuntimeError('data_loader produced no batches')

    # Judge the epoch by its mean loss rather than by whichever batch
    # happened to come last.
    epoch_loss = epoch_loss_sum / epoch_samples
    print('Loss', epoch_loss)

    if epoch_loss < best_loss:
        best_loss = epoch_loss
        # deepcopy is required: copy.copy shares the parameter tensors, so
        # the "best" model would keep changing as training continues.
        best_net = copy.deepcopy(net)

time.sleep(0.1)
print('Best loss', best_loss)

In [None]:
# Pick a random sample that is guaranteed to exist (the former fixed upper
# bound of 1000 could exceed the number of available samples).
i = random.randrange(len(data))

# data[i][0] is already a float32 tensor; add a batch axis and move it to the
# device configured in TARGET (the name `device` is never defined here).
test = data[i][0].unsqueeze(0)
test = test.to(TARGET)

print(test.shape)
rnn1_h = torch.zeros(size=(1, 1, 32)).to(TARGET)
rnn2_h = torch.zeros(size=(1, 1, 32)).to(TARGET)
rnn3_h = torch.zeros(size=(1, 1, 32)).to(TARGET)

# Inference only: no gradients need to be tracked.
with torch.no_grad():
    print(best_net(test, rnn1_h, rnn2_h, rnn3_h))
print(data[i][1])