In [23]:
import tqdm
import torch
import numpy as np
import pandas as pd
from torch import nn
from torch.nn import RNN
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, normalize, MinMaxScaler

In [24]:
# Read the price history, keep only the first two CSV columns and index the
# frame by the "Fecha" (date) column. set_index returns a new frame, so the
# whole load is a single chained expression.
# NOTE(review): "Fecha" is not parsed here, so the index presumably stays as
# DD/MM/YYYY strings rather than timestamps — confirm if date arithmetic is needed.
data = (
    pd.read_csv("Datos históricos IBE 2010-2022.csv")
    .iloc[:, :2]
    .set_index("Fecha")
)
data

Unnamed: 0_level_0,Último
Fecha,Unnamed: 1_level_1
30/12/2022,10.751
29/12/2022,10.834
28/12/2022,10.746
27/12/2022,10.900
23/12/2022,10.880
...,...
08/01/2010,6.700
07/01/2010,6.690
06/01/2010,6.710
05/01/2010,6.740


In [25]:
# Number of shifted copies of the closing price to add as extra columns.
n_steps = 20

# Build every shifted column first, attach them in one go, then drop the rows
# that are incomplete because the shift ran off the top of the frame.
# NOTE: the frame is ordered newest-first (see the displayed table), so
# shift(i) pulls the value from i rows above, i.e. from i sessions *later*,
# even though the column label reads "t-i".
data = data.assign(
    **{f'Último(t-{i})': data['Último'].shift(i) for i in range(1, n_steps + 1)}
).dropna()

In [26]:
# Inspect the frame after adding the shifted columns and dropping incomplete rows.
data

Unnamed: 0_level_0,Último,Último(t-1),Último(t-2),Último(t-3),Último(t-4),Último(t-5),Último(t-6),Último(t-7),Último(t-8),Último(t-9),...,Último(t-11),Último(t-12),Último(t-13),Último(t-14),Último(t-15),Último(t-16),Último(t-17),Último(t-18),Último(t-19),Último(t-20)
Fecha,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
01/12/2022,10.915,10.850,10.895,10.905,10.920,10.910,10.995,10.995,10.995,11.055,...,10.715,10.780,10.810,10.930,10.930,10.88,10.90,10.746,10.834,10.751
30/11/2022,10.780,10.915,10.850,10.895,10.905,10.920,10.910,10.995,10.995,10.995,...,10.900,10.715,10.780,10.810,10.930,10.93,10.88,10.900,10.746,10.834
29/11/2022,10.655,10.780,10.915,10.850,10.895,10.905,10.920,10.910,10.995,10.995,...,11.055,10.900,10.715,10.780,10.810,10.93,10.93,10.880,10.900,10.746
28/11/2022,10.755,10.655,10.780,10.915,10.850,10.895,10.905,10.920,10.910,10.995,...,10.995,11.055,10.900,10.715,10.780,10.81,10.93,10.930,10.880,10.900
25/11/2022,10.820,10.755,10.655,10.780,10.915,10.850,10.895,10.905,10.920,10.910,...,10.995,10.995,11.055,10.900,10.715,10.78,10.81,10.930,10.930,10.880
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
08/01/2010,6.700,6.680,6.690,6.670,6.680,6.580,6.610,6.670,6.520,6.420,...,6.290,6.340,6.260,6.170,6.170,6.20,6.26,6.180,5.890,5.800
07/01/2010,6.690,6.700,6.680,6.690,6.670,6.680,6.580,6.610,6.670,6.520,...,6.350,6.290,6.340,6.260,6.170,6.17,6.20,6.260,6.180,5.890
06/01/2010,6.710,6.690,6.700,6.680,6.690,6.670,6.680,6.580,6.610,6.670,...,6.420,6.350,6.290,6.340,6.260,6.17,6.17,6.200,6.260,6.180
05/01/2010,6.740,6.710,6.690,6.700,6.680,6.690,6.670,6.680,6.580,6.610,...,6.520,6.420,6.350,6.290,6.340,6.26,6.17,6.170,6.200,6.260


In [27]:
# Rescale every column as value / 10 - 1.
# Open question from the author: why scale this way?
# NOTE(review): presumably chosen so the targets fall inside the (-1, 1) range
# of the tanh output used by the models below — confirm.
data = data.div(10).sub(1)

In [28]:
# Target vector: the last column of the frame, copied out as a 1-D NumPy array.
y = data.iloc[:, -1].to_numpy(copy=True)
y

array([ 0.0751,  0.0834,  0.0746, ..., -0.382 , -0.374 , -0.38  ])

In [29]:
# Input matrix: every column except the last one, one row per sample.
x = data.iloc[:, :-1].to_numpy(copy=True)
x

array([[ 0.0915,  0.085 ,  0.0895, ...,  0.09  ,  0.0746,  0.0834],
       [ 0.078 ,  0.0915,  0.085 , ...,  0.088 ,  0.09  ,  0.0746],
       [ 0.0655,  0.078 ,  0.0915, ...,  0.093 ,  0.088 ,  0.09  ],
       ...,
       [-0.329 , -0.331 , -0.33  , ..., -0.383 , -0.38  , -0.374 ],
       [-0.326 , -0.329 , -0.331 , ..., -0.383 , -0.383 , -0.38  ],
       [-0.329 , -0.326 , -0.329 , ..., -0.374 , -0.383 , -0.383 ]])

In [30]:
# Sanity check: inputs and targets must have the same number of rows.
x.shape, y.shape

((3302, 20), (3302,))

In [31]:
# Hold out 33% of the samples for testing; random_state pins the shuffle.
# NOTE(review): the rows are overlapping windows of one price series, so a
# shuffled split presumably puts near-identical windows on both sides —
# consider a chronological split and confirm the intended evaluation protocol.
x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.33,
    random_state=0,
)

In [None]:
#Tensores
x_train = torch.unsqueeze(torch.from_numpy(x_train).to(torch.float32),dim=-1)
x_test = torch.unsqueeze(torch.from_numpy(x_test).to(torch.float32),dim=-1)
y_train = torch.unsqueeze(torch.from_numpy(y_train).to(torch.float32),dim=-1)
y_test = torch.unsqueeze(torch.from_numpy(y_test).to(torch.float32),dim=-1)

In [None]:
# Confirm the tensor shapes after the trailing unit dimension was added.
x_train.shape, x_test.shape, y_train.shape, y_test.shape

In [None]:
from torch.utils.data import Dataset
class TimeSeriesDataset(Dataset):
    """Map-style dataset that pairs each input with its target by position."""

    def __init__(self, X, y):
        # Stored as given; len() and indexing are delegated to these containers.
        self.X, self.y = X, y

    def __len__(self):
        """Return the number of samples, taken from the inputs container."""
        return len(self.X)

    def __getitem__(self, i):
        """Return the ``(input, target)`` pair stored at position ``i``."""
        sample, target = self.X[i], self.y[i]
        return sample, target

In [None]:
# Wrap each (inputs, targets) split so a DataLoader can index it.
train_dataset, test_dataset = (
    TimeSeriesDataset(inputs, targets)
    for inputs, targets in ((x_train, y_train), (x_test, y_test))
)

In [None]:
from torch.utils.data import DataLoader
# Mini-batch size shared by both loaders.
batch_size = 16

# Only the training batches are reshuffled each epoch; the test loader keeps
# the dataset order.
train_loader, test_loader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=reshuffle)
    for dataset, reshuffle in ((train_dataset, True), (test_dataset, False))
)

In [None]:
class Elman(nn.Module):
    """Stacked Elman RNN (``nn.RNN``) with a linear read-out squashed by tanh.

    Args:
        input_size: number of features per time step.
        hidden_size: width of every recurrent layer.
        output_size: number of values predicted per sequence.
        num_layers: number of stacked recurrent layers (default 2, the value
            that was previously hard-coded).

    The tanh on the output bounds predictions to (-1, 1), so targets must be
    scaled into that range.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.hidden_layer = nn.RNN(
            input_size, self.hidden_size, num_layers=self.num_layers, batch_first=True
        )
        # Honour output_size instead of silently forcing a single output.
        self.output_layer = nn.Linear(self.hidden_size, self.output_size)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Map a batch-first input ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        batch_size = x.size(0)
        # Build the zero initial state on the input's device and dtype so the
        # model also works after .to(device) / .double().
        h0 = torch.zeros(
            self.num_layers, batch_size, self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.hidden_layer(x, h0)
        # Only the last time step of the top layer feeds the read-out.
        return self.tanh(self.output_layer(out[:, -1, :]))

In [None]:
# Seed PyTorch's global RNG before building the model so the random weight
# initialisation (and anything else drawing from that generator afterwards)
# is repeatable on Restart & Run All.
torch.manual_seed(0)

# Elman RNN: one feature per time step, 5 hidden units, one predicted value.
modelo = Elman(input_size=1, hidden_size=5, output_size=1)

In [None]:
def train_one_epoch():
    """Run one optimisation pass over ``train_loader`` and return the mean batch loss.

    Uses the notebook globals ``modelo``, ``train_loader``, ``loss_function``
    and ``optimizer``, and reads the loop variable ``epoch`` for the progress
    line, so it must be called from inside the epoch loop.
    """
    modelo.train()
    print(f'Epoch: {epoch + 1}')

    total_loss = 0.0
    for x_batch, y_batch in train_loader:
        # Clear stale gradients, then forward / backward / update.
        optimizer.zero_grad()
        loss = loss_function(modelo(x_batch), y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Average of the per-batch losses (not weighted by batch size).
    return total_loss / len(train_loader)



In [None]:
def validate_one_epoch():
    """Return the mean batch loss of ``modelo`` over ``test_loader``.

    Uses the notebook globals ``modelo``, ``test_loader`` and
    ``loss_function``. The model is switched to evaluation mode and no
    gradients are tracked.
    """
    modelo.eval()

    total_loss = 0.0
    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            total_loss += loss_function(modelo(x_batch), y_batch).item()

    # Average of the per-batch losses (not weighted by batch size).
    return total_loss / len(test_loader)

In [None]:
# Fresh per-epoch history for the Elman run: four independent lists.
train_loss, test_loss, test_accuracy, train_accuracy = ([] for _ in range(4))

In [None]:
# Optimisation settings for the Elman run.
learning_rate, num_epochs = 1e-3, 100

# Mean squared error on the scaled targets, minimised with Adam.
loss_function = nn.MSELoss()
optimizer = optim.Adam(modelo.parameters(), lr=learning_rate)

In [None]:
# Train the Elman model, recording loss and accuracy per epoch.
# "Accuracy" = share of samples whose relative error on the scaled target is below 5%.
# NOTE(review): targets are on the price/10 - 1 scale, so they approach 0 when
# the price is near 10 and this ratio becomes unstable there.
for epoch in range(num_epochs):
    # train_one_epoch() reads the global `epoch` for its progress line.
    train_loss.append(train_one_epoch())

    # Score the full splits in eval mode and without autograd bookkeeping.
    # The metric is computed on tensors; only the final results become NumPy.
    modelo.eval()
    with torch.no_grad():
        train_output = modelo(x_train)
        train_rel_error = torch.abs((y_train - train_output) / y_train)
    predicted_train = train_output.numpy()  # kept as NumPy for the plotting cells
    train_accuracy.append(np.mean(train_rel_error.numpy() < 0.05))
    print("Train Loss: {0:.5f}".format(train_loss[-1]), end="\t")

    with torch.no_grad():
        test_output = modelo(x_test)
        test_rel_error = torch.abs((y_test - test_output) / y_test)
    predicted_test = test_output.numpy()  # kept as NumPy for the plotting cells
    test_accuracy.append(np.mean(test_rel_error.numpy() < 0.05))

    test_loss.append(validate_one_epoch())
    print("Val Loss: {0:.5f}".format(test_loss[-1]))

In [None]:
fig, ax = plt.subplots(2, 2, figsize=(20, 20))

# Refresh the predictions from the current model weights.
with torch.no_grad():
    predicted_train = modelo(x_train).numpy()
    predicted_test = modelo(x_test).numpy()

# Top row: actual vs. predicted values for each split.
for panel, actual, predicted, y_label, title in (
    (ax[0, 0], y_train, predicted_train, 'Cierre', "TRAIN DATASET"),
    (ax[0, 1], y_test, predicted_test, 'cierre', "TEST DATASET"),
):
    panel.plot(actual, label='Actual Close')
    panel.plot(predicted, label="Predicted Close")
    panel.set_xlabel('Dia')
    panel.set_ylabel(y_label)
    panel.legend()
    panel.set_title(title)

# Bottom row: loss per epoch for each split.
for panel, losses, title in (
    (ax[1, 0], train_loss, "TRAIN DATASET"),
    (ax[1, 1], test_loss, "TEST DATASET"),
):
    panel.plot(losses)
    panel.set_xlabel('Epoch')
    panel.set_ylabel('Loss Function')
    panel.grid()
    panel.set_title(title)

plt.show()

In [None]:
# Share of test samples whose relative error (on the scaled target) is below 5%.
# The predictions are converted to a tensor explicitly so the arithmetic does
# not mix torch tensors with NumPy arrays.
test_rel_error = torch.abs((y_test - torch.as_tensor(predicted_test)) / y_test)
np.mean(test_rel_error.numpy() < 0.05)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(20, 10))

# Top panel: train vs. test loss per epoch.
ax[0].plot(train_loss, label='train loss')
ax[0].plot(test_loss, label="test loss")
ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('loss')
ax[0].legend()
ax[0].set_title("Loss FUNCTION")

# Bottom panel: train vs. test accuracy per epoch.
ax[1].plot(train_accuracy, label='train accuracy')
ax[1].plot(test_accuracy, label="test accuracy")
ax[1].set_xlabel('Epoch')
ax[1].set_ylabel('accuracy')  # this axis shows accuracy, not loss
ax[1].legend()
ax[1].set_title("accuracy evolution")

# Render the figure without echoing the repr of the last call.
plt.show()

## GRU

In [None]:
class GRU(nn.Module):
    """GRU (``nn.GRU``) with a linear read-out squashed by tanh.

    Args:
        input_size: number of features per time step.
        hidden_size: width of every recurrent layer.
        output_size: number of values predicted per sequence.
        num_layers: number of stacked recurrent layers (default 1, the value
            that was previously hard-coded).

    The tanh on the output bounds predictions to (-1, 1), so targets must be
    scaled into that range.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.hidden_layer = nn.GRU(
            input_size, self.hidden_size, num_layers=self.num_layers, batch_first=True
        )
        # Honour output_size instead of silently forcing a single output.
        self.output_layer = nn.Linear(self.hidden_size, self.output_size)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Map a batch-first input ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        batch_size = x.size(0)
        # Build the zero initial state on the input's device and dtype so the
        # model also works after .to(device) / .double().
        h0 = torch.zeros(
            self.num_layers, batch_size, self.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.hidden_layer(x, h0)
        # Only the last time step of the top layer feeds the read-out.
        return self.tanh(self.output_layer(out[:, -1, :]))

In [None]:
# Seed PyTorch's global RNG before building the model so the random weight
# initialisation (and anything else drawing from that generator afterwards)
# is repeatable on Restart & Run All.
torch.manual_seed(0)

# GRU: one feature per time step, 10 hidden units, one predicted value.
modelo = GRU(input_size=1, hidden_size=10, output_size=1)

In [None]:
# Reset the per-epoch history before the GRU run.
train_loss, test_loss = [], []
test_accuracy, train_accuracy = [], []

In [None]:
# Optimisation settings for the GRU run.
learning_rate, num_epochs = 1e-3, 100

# Mean squared error on the scaled targets, minimised with Adam bound to the
# parameters of the model currently held in `modelo`.
loss_function = nn.MSELoss()
optimizer = optim.Adam(modelo.parameters(), lr=learning_rate)

In [None]:
# Train the GRU model, recording loss and accuracy per epoch.
# "Accuracy" = share of samples whose relative error on the scaled target is below 5%.
# NOTE(review): targets are on the price/10 - 1 scale, so they approach 0 when
# the price is near 10 and this ratio becomes unstable there.
for epoch in range(num_epochs):
    # train_one_epoch() reads the global `epoch` for its progress line.
    train_loss.append(train_one_epoch())

    # Score the full splits in eval mode and without autograd bookkeeping.
    # The metric is computed on tensors; only the final results become NumPy.
    modelo.eval()
    with torch.no_grad():
        train_output = modelo(x_train)
        train_rel_error = torch.abs((y_train - train_output) / y_train)
    predicted_train = train_output.numpy()  # reused by the plotting cell below
    train_accuracy.append(np.mean(train_rel_error.numpy() < 0.05))
    print("Train Loss: {0:.5f}".format(train_loss[-1]), end="\t")

    with torch.no_grad():
        test_output = modelo(x_test)
        test_rel_error = torch.abs((y_test - test_output) / y_test)
    predicted_test = test_output.numpy()  # reused by the plotting cell below
    test_accuracy.append(np.mean(test_rel_error.numpy() < 0.05))

    test_loss.append(validate_one_epoch())
    print("Val Loss: {0:.5f}".format(test_loss[-1]))



In [None]:
fig, ax = plt.subplots(2, 2, figsize=(20, 20))

# Top-left: actual vs. predicted values on the training split
# (predictions come from the last epoch of the training loop).
ax[0, 0].plot(y_train, label='Actual Close')
ax[0, 0].plot(predicted_train, label="Predicted Close")
ax[0, 0].set(xlabel='Dia', ylabel='Cierre', title="TRAIN DATASET")
ax[0, 0].legend()

# Top-right: the same comparison on the test split.
ax[0, 1].plot(y_test, label='Actual Close')
ax[0, 1].plot(predicted_test, label="Predicted Close")
ax[0, 1].set(xlabel='Dia', ylabel='cierre', title="TEST DATASET")
ax[0, 1].legend()

# Bottom-left: training loss per epoch.
ax[1, 0].plot(train_loss)
ax[1, 0].set(xlabel='Epoch', ylabel='Loss Function', title="TRAIN DATASET")
ax[1, 0].grid()

# Bottom-right: test loss per epoch.
ax[1, 1].plot(test_loss)
ax[1, 1].set(xlabel='Epoch', ylabel='Loss Function', title="TEST DATASET")
ax[1, 1].grid()

plt.show()

In [None]:
# Share of test samples whose relative error (on the scaled target) is below 5%.
# The predictions are converted to a tensor explicitly so the arithmetic does
# not mix torch tensors with NumPy arrays.
test_rel_error = torch.abs((y_test - torch.as_tensor(predicted_test)) / y_test)
np.mean(test_rel_error.numpy() < 0.05)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(20, 10))

# Top panel: train vs. test loss per epoch.
ax[0].plot(train_loss, label='train loss')
ax[0].plot(test_loss, label="test loss")
ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('loss')
ax[0].legend()
ax[0].set_title("Loss FUNCTION")

# Bottom panel: train vs. test accuracy per epoch.
ax[1].plot(train_accuracy, label='train accuracy')
ax[1].plot(test_accuracy, label="test accuracy")
ax[1].set_xlabel('Epoch')
ax[1].set_ylabel('accuracy')  # this axis shows accuracy, not loss
ax[1].legend()
ax[1].set_title("accuracy evolution")

# Render the figure without echoing the repr of the last call.
plt.show()

## LSTM

In [None]:
class LSTM(nn.Module):
    """LSTM (``nn.LSTM``) with a linear read-out squashed by tanh.

    Args:
        input_size: number of features per time step.
        hidden_size: width of every recurrent layer.
        output_size: number of values predicted per sequence.
        num_layers: number of stacked recurrent layers (default 1, the value
            that was previously hard-coded).

    The tanh on the output bounds predictions to (-1, 1), so targets must be
    scaled into that range.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.hidden_layer = nn.LSTM(
            input_size, self.hidden_size, num_layers=self.num_layers, batch_first=True
        )
        self.output_layer = nn.Linear(self.hidden_size, self.output_size)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Map a batch-first input ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        batch_size = x.size(0)
        # Zero hidden and cell states, built on the input's device and dtype so
        # the model also works after .to(device) / .double().
        state_shape = (self.num_layers, batch_size, self.hidden_size)
        h0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        out, _ = self.hidden_layer(x, (h0, c0))
        # Only the last time step of the top layer feeds the read-out.
        return self.tanh(self.output_layer(out[:, -1, :]))

In [None]:
# Seed PyTorch's global RNG before building the model so the random weight
# initialisation (and anything else drawing from that generator afterwards)
# is repeatable on Restart & Run All.
torch.manual_seed(0)

# LSTM: one feature per time step, 10 hidden units, one predicted value.
modelo = LSTM(input_size=1, hidden_size=10, output_size=1)

In [None]:
# Reset the per-epoch history before the LSTM run.
train_loss = list()
test_loss = list()
test_accuracy = list()
train_accuracy = list()

In [None]:
# Optimisation settings for the LSTM run.
learning_rate, num_epochs = 1e-3, 100

# Mean squared error on the scaled targets, minimised with Adam bound to the
# parameters of the model currently held in `modelo`.
loss_function = nn.MSELoss()
optimizer = optim.Adam(modelo.parameters(), lr=learning_rate)

In [None]:
# Train the LSTM model, recording loss and accuracy per epoch.
# "Accuracy" = share of samples whose relative error on the scaled target is below 5%.
# NOTE(review): targets are on the price/10 - 1 scale, so they approach 0 when
# the price is near 10 and this ratio becomes unstable there.
for epoch in range(num_epochs):
    # train_one_epoch() reads the global `epoch` for its progress line.
    train_loss.append(train_one_epoch())

    # Score the full splits in eval mode and without autograd bookkeeping.
    # The metric is computed on tensors; only the final results become NumPy.
    modelo.eval()
    with torch.no_grad():
        train_output = modelo(x_train)
        train_rel_error = torch.abs((y_train - train_output) / y_train)
    predicted_train = train_output.numpy()  # reused by the plotting cell below
    train_accuracy.append(np.mean(train_rel_error.numpy() < 0.05))
    print("Train Loss: {0:.5f}".format(train_loss[-1]), end="\t")

    with torch.no_grad():
        test_output = modelo(x_test)
        test_rel_error = torch.abs((y_test - test_output) / y_test)
    predicted_test = test_output.numpy()  # reused by the plotting cell below
    test_accuracy.append(np.mean(test_rel_error.numpy() < 0.05))

    test_loss.append(validate_one_epoch())
    print("Val Loss: {0:.5f}".format(test_loss[-1]))

In [None]:
fig, ax = plt.subplots(2, 2, figsize=(20, 20))

# Row 0 compares actual and predicted values, row 1 shows the loss curves;
# column 0 is the training split, column 1 the test split.
series_row, loss_row = ax[0], ax[1]

comparison_panels = (
    (series_row[0], y_train, predicted_train, 'Cierre', "TRAIN DATASET"),
    (series_row[1], y_test, predicted_test, 'cierre', "TEST DATASET"),
)
for panel, actual, predicted, y_label, title in comparison_panels:
    panel.plot(actual, label='Actual Close')
    panel.plot(predicted, label="Predicted Close")
    panel.set_xlabel('Dia')
    panel.set_ylabel(y_label)
    panel.legend()
    panel.set_title(title)

loss_panels = (
    (loss_row[0], train_loss, "TRAIN DATASET"),
    (loss_row[1], test_loss, "TEST DATASET"),
)
for panel, losses, title in loss_panels:
    panel.plot(losses)
    panel.set_xlabel('Epoch')
    panel.set_ylabel('Loss Function')
    panel.grid()
    panel.set_title(title)

plt.show()

In [None]:
# Share of test samples whose relative error (on the scaled target) is below 5%.
# The predictions are converted to a tensor explicitly so the arithmetic does
# not mix torch tensors with NumPy arrays.
test_rel_error = torch.abs((y_test - torch.as_tensor(predicted_test)) / y_test)
np.mean(test_rel_error.numpy() < 0.05)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(20, 10))

# Top panel: train vs. test loss per epoch.
ax[0].plot(train_loss, label='train loss')
ax[0].plot(test_loss, label="test loss")
ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('loss')
ax[0].legend()
ax[0].set_title("Loss FUNCTION")

# Bottom panel: train vs. test accuracy per epoch.
ax[1].plot(train_accuracy, label='train accuracy')
ax[1].plot(test_accuracy, label="test accuracy")
ax[1].set_xlabel('Epoch')
ax[1].set_ylabel('accuracy')  # this axis shows accuracy, not loss
ax[1].legend()
ax[1].set_title("accuracy evolution")

# Render the figure without echoing the repr of the last call.
plt.show()