# Libraries

In [2]:
# standard
import pandas as pd
import numpy as np
from tqdm import tqdm
import math
from math import sqrt

# reading data
import os
import json
from collections import defaultdict

# machine learning
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.fft import rfft, irfft, fftn, ifftn

# visuals
import matplotlib.pyplot as plt
import seaborn as sns

# eFormer
from eFormer.embeddings import Encoding, ProbEncoding, PositionalEncoding
from eFormer.sparse_attention import ProbSparseAttentionModule, DetSparseAttentionModule
from eFormer.loss_function import crps

%store -r Kelmarsh_df Penmanshiel_df

# Architecture

## Hyperparameters

In [3]:
# Global hyperparameters shared by the embedding and attention cells below.

# Size of each embedding vector (passed as `out_features` to the encoder).
len_embedding_vector = 64
# Switch between the probabilistic and the deterministic model variants.
probabilistic_model = True
# Number of attention heads handed to the sparse attention module.
n_heads_global = 4

## Embedding

probabilistic embedding & positional encoding

In [4]:
# Take the last 1024 rows of the two relevant columns for turbine '1'.
# `.copy()` detaches the slice from Kelmarsh_df so the column assignments
# below act on an independent frame instead of a view of the stored data
# (avoids pandas' SettingWithCopyWarning / chained-assignment pitfalls).
test_df = (
    Kelmarsh_df['1'][['# Date and time', 'Energy Export (kWh)']]
    .iloc[-1024:]
    .copy()
)

# First, ensure that the column is in datetime format
test_df['# Date and time'] = pd.to_datetime(test_df['# Date and time'])

# Then convert it to POSIX timestamps (float seconds)
test_df['Timestamp'] = test_df['# Date and time'].apply(lambda x: x.timestamp())

# Linearly interpolate NaN values between valid neighbours
test_df = test_df.interpolate(method='linear')

# Feature matrix with one row per time step: [energy export, timestamp]
features_matrix = test_df[['Energy Export (kWh)', 'Timestamp']].values

In [5]:
# Turn the feature matrix into a float32 tensor and add a leading axis of size 1
feature_tensor = torch.tensor(features_matrix, dtype=torch.float32).unsqueeze(0)

# Fail fast: NaNs in the input would otherwise only surface after the encoder ran
if torch.isnan(feature_tensor).any():
    raise ValueError('NaN values detected in Input')

# Pick the encoder class from the global switch (plain truthiness instead of
# `== True`) and construct it once, so both variants share the same arguments.
encoder_cls = ProbEncoding if probabilistic_model else Encoding
encoding_model = encoder_cls(
    in_features=feature_tensor.shape[-1],
    out_features=len_embedding_vector,
)

# create embeddings
embeddings = encoding_model(feature_tensor)

# Check for NaN values after computation
if torch.isnan(embeddings).any():
    raise ValueError('NaN values detected in Embeddings')
print(f"Embedding shape: {embeddings.shape}")

Embedding shape: torch.Size([2, 1, 1024, 64])
Stored 'embeddings' (Tensor)


## Attention Mechanism

In [9]:
# Pick the attention module from the global switch (plain truthiness instead of
# `== True`) and construct it once, so both variants receive identical arguments.
attention_cls = ProbSparseAttentionModule if probabilistic_model else DetSparseAttentionModule
model = attention_cls(
    d_model=embeddings.shape[-1],
    n_heads=n_heads_global,
    prob_sparse_factor=5,
)

output = model(embeddings)

# Check for NaN values right after the attention pass. The message is kept
# model-agnostic because either attention variant may have produced `output`.
if torch.isnan(output).any():
    raise ValueError('NaN values detected in Sparse Attention Output')
print(f"Sparse Attention shape: {output.shape}")

Sparse Attention shape: torch.Size([2, 1, 1024, 64])


In [None]:
class ForecastDecoder(nn.Module):
    """Decoder that appends forecast slots to the input sequence and self-attends over it.

    The input sequence is extended by ``forecast_horizon`` positionally encoded
    slots, the extended sequence is passed through an attention module as
    query, key and value, and the outputs at the appended slots are projected
    to one value each.

    Args:
        d_model: Feature dimension of the input sequence.
        n_heads: Number of attention heads.
        max_len: Maximum length handed to ``PositionalEncoding``.
        forecast_horizon: Number of future steps to forecast.
        attention: Optional attention module. It is called as
            ``attention(query, key, value, None)`` and must return a tuple whose
            first element is the attention output. When omitted,
            ``SparseAttention(d_model, n_heads)`` is constructed.
    """

    def __init__(self, d_model, n_heads, max_len=500, forecast_horizon=12, attention=None):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.forecast_horizon = forecast_horizon
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        # NOTE(review): `SparseAttention` is not among the names imported at the
        # top of this notebook; it must be defined/imported before the default
        # path is used, or an attention module must be passed in explicitly.
        self.attention = attention if attention is not None else SparseAttention(d_model, n_heads)
        self.output_layer = nn.Linear(d_model, 1)  # Assuming a univariate forecast

    def forward(self, x):
        """Forecast ``forecast_horizon`` steps from ``x``.

        Args:
            x: Tensor laid out as [batch_size, sequence_length, d_model].

        Returns:
            Output of the final linear layer for the last ``forecast_horizon``
            positions of the attention output. Presumably
            [batch_size, forecast_horizon, 1], provided the attention module
            preserves the shape of its query -- TODO confirm.
        """
        # Zero slots for the forecast horizon, created on the same device and
        # dtype as the input so that the concatenation below cannot mix devices.
        # NOTE(review): DetSparseDecoder feeds a 3-D tensor to PositionalEncoding
        # while a 2-D tensor is used here -- confirm which layout it expects.
        horizon_slots = torch.zeros(
            self.forecast_horizon, self.d_model, device=x.device, dtype=x.dtype
        )
        pos_encodings = self.pos_encoder(horizon_slots).unsqueeze(0)

        # Broadcast the single set of encodings over the batch; torch.cat needs
        # every dimension except the concatenation axis to match.
        pos_encodings = pos_encodings.expand(x.size(0), -1, -1)

        # Concatenate x with positional encodings along the sequence axis
        x_with_pos = torch.cat([x, pos_encodings], dim=1)

        # Self-attention: the extended sequence is query, key and value; no mask
        attn_output, _ = self.attention(x_with_pos, x_with_pos, x_with_pos, None)

        # Take the last 'forecast_horizon' outputs for forecasting
        forecasts = attn_output[:, -self.forecast_horizon:]

        # Pass through the output layer
        return self.output_layer(forecasts)

In [None]:
class DetSparseDecoder(nn.Module):
    """Non-autoregressive decoder that queries encoder outputs with positional encodings.

    Positional encodings for the forecast horizon serve as queries, the encoder
    output serves as keys and values, and a linear layer maps each attended
    position to a single forecast value.

    Args:
        d_model: Feature dimension used by the attention and output layers.
        n_heads: Number of attention heads.
        encoder_output_dim: Accepted for interface compatibility; it is not
            used by this class. Optional, defaults to ``None``.
        forecast_horizon: Number of future steps to forecast.
        max_len: Maximum length handed to ``PositionalEncoding``; must cover
            ``forecast_horizon``.
        attention: Optional attention module. It is called as
            ``attention(query, key, value, None)`` and must return a tuple whose
            first element is the attention output. When omitted,
            ``SparseAttention(d_model, n_heads)`` is constructed.
    """

    def __init__(self, d_model, n_heads, encoder_output_dim=None, forecast_horizon=12,
                 max_len=500, attention=None):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.forecast_horizon = forecast_horizon
        # Initialize PositionalEncoding with a max_len that covers the forecast_horizon
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        # Attention over encoder outputs.
        # NOTE(review): `SparseAttention` is not among the names imported at the
        # top of this notebook; it must be defined/imported before the default
        # path is used, or an attention module must be passed in explicitly.
        self.encoder_decoder_attention = (
            attention if attention is not None else SparseAttention(d_model, n_heads)
        )
        self.output_layer = nn.Linear(d_model, 1)  # Assuming a univariate forecast

    def forward(self, encoder_output):
        """Produce forecasts from the encoder output.

        Args:
            encoder_output: Output from the encoder phase. Assumed to be laid out
                as [batch_size, sequence_length, d_model] -- TODO confirm.

        Returns:
            Output of the linear layer with its trailing size-1 axis squeezed.
            Presumably [batch_size, forecast_horizon], provided the attention
            module preserves the shape of its query -- TODO confirm.
        """
        batch_size = encoder_output.size(0)

        # Dummy zero input for positional encoding generation, created on the
        # same device and dtype as the encoder output.
        dummy_input = torch.zeros(
            1, self.forecast_horizon, self.d_model,
            device=encoder_output.device, dtype=encoder_output.dtype,
        )
        pos_encodings = self.pos_encoder(dummy_input)

        # One query set per batch element, so query and key/value batch sizes
        # agree; contiguous() turns the broadcast view into a real tensor for
        # attention implementations that reshape their inputs with view().
        queries = pos_encodings.expand(batch_size, -1, -1).contiguous()

        # Since this model is not autoregressive, the positional encodings are
        # used directly as queries; encoder outputs are keys and values.
        attn_output, _ = self.encoder_decoder_attention(
            queries, encoder_output, encoder_output, None
        )

        # Map every attended position to one value and drop the trailing
        # size-1 feature axis created by the output layer.
        forecasts = self.output_layer(attn_output).squeeze(-1)

        return forecasts

In [None]:
# Example usage
d_model = 512  # Embedding dimension
n_heads = 8  # Number of attention heads
# `encoder_output_dim` is a required constructor argument of DetSparseDecoder;
# leaving it out raises a TypeError. The encoder output is given the same
# width as the decoder here.
# NOTE(review): the constructor also instantiates `SparseAttention`, which is
# not imported at the top of this notebook -- define or import it first.
decoder = DetSparseDecoder(d_model, n_heads, encoder_output_dim=d_model)

# Transformer Model

# Test Area