In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import oandapyV20
import oandapyV20.endpoints.instruments as instruments

# Define parameters for the OANDA API
access_token = "e11aadc917842adf254cd73c038c4e0a-321ea21ac5697ab46036807f5e5e943d"
instrument = "NAS100_USD"

# Fetch data from OANDA API
api = oandapyV20.API(access_token=access_token)

params = {
    "count": 5000,
    "granularity": "M30"
}

r = instruments.InstrumentsCandles(instrument=instrument, params=params)
data = []

while True:
    response = api.request(r)
    data.extend(response["candles"])
    if "next" not in response:
        break
    r.params["from"] = response["next"]

# Extracting close prices from candles
time_series_data = [float(candle["mid"]["c"]) for candle in data]

# Normalizing the data
mean = np.mean(time_series_data)
std = np.std(time_series_data)
normalized_data = (time_series_data - mean) / std

# Function to create dataset for training
def create_dataset(data, window_size):
    x, y = [], []
    for i in range(len(data) - window_size):
        x.append(data[i:i + window_size])
        y.append(data[i + window_size])
    x = np.array(x).reshape(-1, window_size, 1)
    y = np.array(y).reshape(-1, 1)
    return x, y

# Define window size and create dataset
window_size = 20  # Number of previous time steps to use as input
x_data, y_data = create_dataset(normalized_data, window_size)

# Convert to PyTorch tensors
x_train = torch.tensor(x_data, dtype=torch.float32)
y_train = torch.tensor(y_data, dtype=torch.float32)

# Define the model
class MonteCarloDropoutModel(nn.Module):
    def __init__(self, loss_type="heteroscedastic"):
        super(MonteCarloDropoutModel, self).__init__()
        self.loss_type = loss_type

        self.fc1 = nn.Linear(window_size, 100)
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(100, 100)
        self.dropout2 = nn.Dropout(p=0.5)

        self.mean_layer = nn.Linear(100, 1)
        if self.loss_type == "heteroscedastic":
            self.var_layer = nn.Linear(100, 1)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten the input
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)

        mean = self.mean_layer(x)
        if self.loss_type == "heteroscedastic":
            log_var = self.var_layer(x)
            return mean, log_var

        return mean

# Define the loss functions
def mse_loss(model, x, y_true):
    y_pred = model(x)
    mse = F.mse_loss(y_pred, y_true)
    return mse

def heteroscedastic_loss(model, x, y_true):
    mean, log_var = model(x)
    precision = torch.exp(-log_var)
    mse = torch.mean(0.5 * precision * (y_true - mean)**2 + log_var)
    return mse

# Training function
def fit_model(model, x_train, y_train, epochs, learning_rate, loss_type):
    optimizer = Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        if loss_type == "mse":
            loss = mse_loss(model, x_train, y_train)
        elif loss_type == "heteroscedastic":
            loss = heteroscedastic_loss(model, x_train, y_train)

        loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item()}")

# Define the model and train it
model = MonteCarloDropoutModel(loss_type="heteroscedastic")
fit_model(model, x_train, y_train, epochs=1000, learning_rate=0.001, loss_type="heteroscedastic")

# Function to perform multi-step forecasting
def multi_step_forecast(model, initial_data, steps, window_size):
    model.eval()
    predictions = []
    current_data = initial_data[-window_size:]

    with torch.no_grad():
        for _ in range(steps):
            input_tensor = torch.tensor(current_data.reshape(1, -1, 1), dtype=torch.float32)
            mean, _ = model(input_tensor)
            mean = mean.item()

            predictions.append(mean)
            current_data = np.append(current_data[1:], mean)
    
    return predictions

# Perform a multi-step forecast
steps = 10  # Number of steps to forecast
initial_data = normalized_data[-window_size:]  # Last `window_size` points of the normalized data
predictions = multi_step_forecast(model, initial_data, steps, window_size)

# Print the predictions
print("Normalized Predictions:", predictions)

# Optionally, denormalize the predictions
denormalized_predictions = [pred * std + mean for pred in predictions]
print("Denormalized Predictions:", denormalized_predictions)


Epoch 0, Loss: 0.5275275707244873
Epoch 100, Loss: -2.393179416656494
Epoch 200, Loss: -3.018993616104126
Epoch 300, Loss: -3.18143892288208
Epoch 400, Loss: -3.2241427898406982
Epoch 500, Loss: -3.361675262451172
Epoch 600, Loss: -3.3818888664245605
Epoch 700, Loss: -3.47339129447937
Epoch 800, Loss: -3.4801342487335205
Epoch 900, Loss: -3.4428200721740723
Normalized Predictions: [1.4267138242721558, 1.3200623989105225, 1.2083367109298706, 1.1118072271347046, 0.9836230278015137, 0.9049385786056519, 0.8061133623123169, 0.7096899747848511, 0.615459680557251, 0.5171676874160767]
Denormalized Predictions: [18532.845875828658, 18470.422082336365, 18405.028288951613, 18348.528935643255, 18273.501865343747, 18227.447330291805, 18169.604269378106, 18113.16701492574, 18058.0133926686, 18000.482430836768]
