Aufgaben zur Vertiefung AI-Engineer, Modul 5 - Time-Series-Data

Aufgabe 1: Time-Series-Data

Identifizieren Sie im UCI Repository (oder an anderer Stelle) einen Datensatz mit temporaler 
Dynamik. Implementieren Sie ein Neuronales Netz mit dem “naiven Ansatz”, mehrere aufeinanderfolgende 
Instanzen als gleitendes Fenster (“nachrutschend”) in die Input-Schicht zu geben. Evaluieren Sie 
diesen naiven Ansatz gegen eine Implementierung mittels rekurrenter Layer.

https://archive.ics.uci.edu/datasets.php

Dow Jones Index
Donated on 10/22/2014

https://archive.ics.uci.edu/dataset/312/dow+jones+index

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
from sklearn import metrics


import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
import pandas as pd
import numpy as npw
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

# Load the Dow Jones index records from the CSV file
data = pd.read_csv("../data/data/dow_jones_index.csv")


def _strip_dollar(text):
    """Return *text* with every "$" character removed."""
    return text.replace("$", "")


# Remove the "$" symbol from the price columns so they can be cast to float
for price_column in ("close", "open", "high", "low", "next_weeks_open", "next_weeks_close"):
    data[price_column] = data[price_column].apply(_strip_dollar)

# Cast every column that is used as a feature or as the target to float
for numeric_column in (
    "close",
    "open",
    "high",
    "low",
    "volume",
    "percent_change_price",
    "next_weeks_open",
    "next_weeks_close",
    "percent_change_next_weeks_price",
    "days_to_next_dividend",
    "percent_return_next_dividend",
):
    data[numeric_column] = data[numeric_column].astype(float)

# Feature matrix (10 columns) and regression target ("close")
# NOTE(review): the feature list contains the "next_weeks_*" columns, which look
# like information from the following week - confirm this is intended.
feature_columns = [
    'open', 'high', 'low', 'volume', 'percent_change_price',
    'next_weeks_open', 'next_weeks_close', 'percent_change_next_weeks_price',
    'days_to_next_dividend', 'percent_return_next_dividend',
]
features = data[feature_columns].to_numpy()
targets = data["close"].to_numpy()

# Hold out 33 % of the rows for testing
# NOTE(review): train_test_split shuffles rows by default, so the temporal order
# of the records is not preserved in this split - confirm this is intended.
x_train, x_test, y_train, y_test = train_test_split(
    features, targets, test_size=0.33, random_state=45
)

# Min-max scaling: the scaler is fitted on the training rows only and then
# applied unchanged to the test rows
scaler = MinMaxScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_test_scaled = scaler.transform(x_test)

# float32 PyTorch tensors; the targets are left unscaled
x_train_tensor = torch.tensor(x_train_scaled, dtype=torch.float32)
x_test_tensor = torch.tensor(x_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Define the LSTM model class
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        output_size: Number of values produced by the linear head.
    """

    def __init__(self, input_size, hidden_dim, n_layers, output_size):
        super().__init__()

        # Kept as attributes because forward() needs them for reshaping and
        # for building the initial states
        self.input_size = input_size
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.lstm = nn.LSTM(input_size, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """Return the linear head's output for the final time step of *x*.

        *x* is reshaped to ``[batch, -1, input_size]`` with the batch size
        taken from its first dimension, so a 2-D ``[batch, input_size]``
        input becomes a sequence of length 1.
        """
        n_samples = x.size(0)
        state_shape = (self.n_layers, n_samples, self.hidden_dim)

        # Zero-initialised hidden and cell state on the same device as the input
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        sequence = x.view(n_samples, -1, self.input_size)
        lstm_out, _ = self.lstm(sequence, (initial_hidden, initial_cell))

        # Only the last time step feeds the linear layer
        last_step = lstm_out[:, -1, :]
        return self.fc(last_step)

# Seed the NumPy and PyTorch RNGs and force deterministic cuDNN kernels so that
# weight initialisation and batch shuffling are repeatable between runs
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Define hyperparameters
input_size = 10   # number of feature columns fed to the model
output_size = 1   # single regression target
hidden_dim = 32
n_layers = 2
sequence_length = 1  # Keep this as 1 for your input data
batch_size = 8

# Create a DataLoader for shuffled mini-batch training
train_data = torch.utils.data.TensorDataset(x_train_tensor, y_train_tensor)
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)

# Create an instance of the LSTM model on the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMModel(input_size, hidden_dim, n_layers, output_size).to(device)

# Define loss function and optimizer
loss_function = nn.MSELoss()
learning_rate = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
num_epochs = 2000

# The model lives on `device`, so every tensor passed to it must live there as
# well. The evaluation tensors are moved once here; the targets are reshaped
# to [N, 1] to match the model output.
x_test_device = x_test_tensor.to(device)
y_test_device = y_test_tensor.to(device).view(-1, 1)

for epoch in range(num_epochs):
    model.train()
    for batch_x, batch_y in train_loader:
        # Move the mini-batch to the model's device (a no-op on CPU-only runs)
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(batch_x)

        # Calculate loss; the targets are reshaped to [batch, 1] to match the outputs
        loss = loss_function(outputs, batch_y.view(-1, 1))

        # Backpropagation
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 100 == 0:
        # Evaluate on the testing set every 100 epochs
        model.eval()
        with torch.no_grad():
            val_outputs = model(x_test_device)
            val_loss = loss_function(val_outputs, y_test_device)
        # "Loss" is the loss of the last mini-batch of this epoch, not an epoch average
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Val Loss: {val_loss.item():.4f}')

# Code for making predictions
# One hand-written feature row; it must follow the same column order as the
# features the scaler was fitted on.
input_data = np.array([15.82, 16.72, 15.78, 239655616.0, 3.79267, 16.71, 15.97, -4.428490, 26.0, 0.182704])
input_data_scaled = scaler.transform(input_data.reshape(1, -1))  # Scale the input data
input_tensor = torch.tensor(input_data_scaled, dtype=torch.float32).to(device)

# Ensure the input data has the same sequence length as the model's input
# If sequence_length != 1, you need to repeat the input data to match the sequence length
if sequence_length != 1:
    input_tensor = input_tensor.repeat(1, sequence_length, 1)

model.eval()
with torch.no_grad():
    prediction_tensor = model(input_tensor)
predicted_value = prediction_tensor[0][0].item()

print("Predicted Value:", predicted_value)


: 