In [None]:
# %% [markdown]
# # How to use PyTorch LSTMs for time series regression

# %% [markdown]
# # Data

# %% [markdown]
# 1. Download the Air Passengers data.
# 2. Load the Air Passengers data into a DataFrame.

# %%
import pandas as pd

df = pd.read_csv("data-sets/BTC-USD-2018-2023.csv", index_col="Date", parse_dates=True)
# df['Twitter'] = pd.read_csv("data-sets/twitter_sentiment_scores.csv", index_col="Date", parse_dates=True)
df['FearGreedIndex'] = pd.read_csv("data-sets/fear_greed_index_prepared.csv",index_col="Date",  parse_dates=True)
df['GoogleTrends'] = pd.read_csv("data-sets/bitcoin_google_trends.csv",index_col="Date",  parse_dates=True)

# %% [markdown]
# ## Create the target variable

target_sensor = "Close"
features = list(df.columns.difference([target_sensor]))

# %%
forecast_lead = 1
target = f"{target_sensor}_lead{forecast_lead}"

df[target] = df[target_sensor].shift(-forecast_lead)
df = df.iloc[:-forecast_lead]

# %% [markdown]
# ## Create a hold-out test set and preprocess the data

# %%
test_start = "2022-06-01"

df_train = df.loc[:test_start].copy()
df_test = df.loc[test_start:].copy()

print("Test set fraction:", len(df_test) / len(df))

# %% [markdown]
# ## Standardize the features and target, based on the training set

# %%
target_mean = df_train[target].mean()
target_stdev = df_train[target].std()

for c in df_train.columns:
    mean = df_train[c].mean()
    stdev = df_train[c].std()

    df_train[c] = (df_train[c] - mean) / stdev
    df_test[c] = (df_test[c] - mean) / stdev


print(df_train)

In [None]:
# %%
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
pio.templates.default = "plotly_white"

plot_template = dict(
    layout=go.Layout({
        "font_size": 18,
        "xaxis_title_font_size": 24,
        "yaxis_title_font_size": 24})
)

fig = px.line(df_train, labels=dict(index="Date", value="Close"))
fig.update_layout(
  template=plot_template, legend=dict(orientation='h', y=1.02, title_text="")
)
fig.show()

In [None]:

# %% [markdown]
# ## Create datasets that PyTorch `DataLoader` can work with

# %%
import torch
from torch.utils.data import Dataset

class SequenceDataset(Dataset):
    def __init__(self, dataframe, target, features, sequence_length=5):
        self.features = features
        self.target = target
        self.sequence_length = sequence_length
        self.y = torch.tensor(dataframe[target].values).float()
        self.X = torch.tensor(dataframe[features].values).float()

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, i): 
        if i >= self.sequence_length - 1:
            i_start = i - self.sequence_length + 1
            x = self.X[i_start:(i + 1), :]
        else:
            padding = self.X[0].repeat(self.sequence_length - i - 1, 1)
            x = self.X[0:(i + 1), :]
            x = torch.cat((padding, x), 0)

        return x, self.y[i]
    



In [None]:
# %% [markdown]
# ## Create the datasets and data loaders

# %%
from bayes_opt import BayesianOptimization, UtilityFunction
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
torch.manual_seed(101)

batch_size = 3
sequence_length = 3

train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_length=sequence_length
)
test_dataset = SequenceDataset(
    df_test,
    target=target,
    features=features,
    sequence_length=sequence_length
)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

X, y = next(iter(train_loader))
print("Features shape:", X.shape)
print("Target shape:", y.shape)

# %% [markdown]
# # The model and learning algorithm


In [None]:
print(X)

In [None]:

i = 5
print(df_train[features].iloc[(i - sequence_length + 1): (i + 1)])



In [None]:


# %%
from torch import nn

class ShallowRegressionLSTM(nn.Module):
    def __init__(self, num_sensors, hidden_size, num_layers):
        super().__init__()
        self.num_sensors = num_sensors  # this is the number of features
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=num_sensors,
            hidden_size=hidden_size,
            batch_first=True,
            num_layers=self.num_layers
        )

        self.linear = nn.Linear(in_features=self.hidden_size, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).requires_grad_()
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).requires_grad_()
        
        _, (hn, _) = self.lstm(x, (h0, c0))
        out = self.linear(hn[0]).flatten()

        return out

In [None]:
def train_model(data_loader, model, loss_function, optimizer):
    num_batches = len(data_loader)
    total_loss = 0
    model.train()
    
    for X, y in data_loader:
        output = model(X)
        loss = loss_function(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / num_batches
    print(f"Train loss: {avg_loss}")
    return avg_loss

def test_model(data_loader, model, loss_function):
    
    num_batches = len(data_loader)
    total_loss = 0

    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            output = model(X)
            total_loss += loss_function(output, y).item()

    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    return avg_loss

# Define the function to be optimized
def evaluate_model(learning_rate_log, num_hidden_size, num_layers, weight_decay, batch_size):
    learning_rate = 10 ** learning_rate_log
    num_hidden_size = int(num_hidden_size)
    num_layers = int(num_layers)
    weight_decay = 10 ** weight_decay
    batch_size = int(batch_size)

    # Reinitialize the model with new parameters
    model = ShallowRegressionLSTM(num_sensors=len(features),hidden_size=num_hidden_size, num_layers=num_layers)
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    
    # Train and test the model
    train_model(train_loader, model, loss_function, optimizer)
    test_loss = test_model(test_loader, model, loss_function)
    
    # Return the negative test loss because BayesianOptimization maximize the function
    return -test_loss


# Define the hyperparameters range
hyperparameters_range = {
    'learning_rate_log': (-5, -2),  # we optimize in log scale
    'num_hidden_size': (1, 64),  # assuming 50 is a sensible upper limit
    'num_layers': (2, 4),  # range of layers
    'weight_decay': (-5, -2),  # weight decay in log scale
    'batch_size': (1, len(df_train))
}
# Initialize the optimizer
bayesian_optimizer = BayesianOptimization(
    f=evaluate_model,
    pbounds=hyperparameters_range,
    verbose=2,
    random_state=1
)

# Maximize the evaluation function
bayesian_optimizer.maximize(init_points=5, n_iter=5)

# Print the best parameters
print(bayesian_optimizer.max)

best_params = bayesian_optimizer.max['params']

# Re-calculate the learning rate from its logarithm
best_params['learning_rate_log'] = 10 ** best_params['learning_rate_log']

# Ensure hidden_size and num_layers are integers
best_params['num_hidden_size'] = int(round(best_params['num_hidden_size']))
best_params['num_layers'] = int(round(best_params['num_layers']))


In [None]:

# Train a new model with the best parameters
model = ShallowRegressionLSTM(num_sensors=len(features), hidden_size=best_params['num_hidden_size'], num_layers=best_params['num_layers'])
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), best_params['learning_rate_log'])

In [None]:
# Store losses per epoch
train_losses = []
test_losses = []

print("Untrained test\n--------")
initial_test_loss = test_model(test_loader, model, loss_function)
test_losses.append(initial_test_loss)
print()


for ix_epoch in range(10):
    print(f"Epoch {ix_epoch}\n---------")
    train_loss = train_model(train_loader, model, loss_function, optimizer=optimizer)
    train_losses.append(train_loss)
    
    test_loss = test_model(test_loader, model, loss_function)
    test_losses.append(test_loss)
    print()

print("Hidden SİZE XD", model.hidden_size)

# Plot loss per epoch
plt.figure(figsize=(10,5))
plt.plot(train_losses, label='Training loss')
plt.plot(test_losses, label='Testing loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# %% [markdown]
# # Evaluation

# %%
def predict(data_loader, model):
    """Just like `test_loop` function but keep track of the outputs instead of the loss
    function.
    """
    output = torch.tensor([])
    model.eval()
    with torch.no_grad():
        for X, _ in data_loader:
            y_star = model(X)
            output = torch.cat((output, y_star), 0)
    
    return output

# %%
train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

ystar_col = "Model forecast"
df_train[ystar_col] = predict(train_eval_loader, model).numpy()
df_test[ystar_col] = predict(test_loader, model).numpy()

df_out = pd.concat((df_train, df_test))[[target, ystar_col]]

print(df_out)

for c in df_out.columns:
    df_out[c] = df_out[c] * stdev + mean

print(df_out)

# %%
fig = px.line(df_out, labels={'Price': "BTC", 'Date': 'Date'})
fig.add_vline(x=test_start, line_width=4, line_dash="dash")
fig.add_annotation(xref="paper", x=0.75, yref="paper", y=0.8, text="Test set start", showarrow=False)
fig.update_layout(
  template=plot_template, legend=dict(orientation='h', y=1.02, title_text="")
)
fig.show()
# fig.write_image("air_passengers_forecast.png", width=1200, height=600)


In [None]:
from sklearn.metrics import mean_absolute_percentage_error

mape = mean_absolute_percentage_error(df_out['Model forecast'], df_out['Close_lead1'])
mape_test = mean_absolute_percentage_error(df_out['Close_lead1'].iloc[118:144], df_out['Model forecast'].iloc[118:144])
print("MAPE", mape)
print("Test MAPE", mape_test)
