In [34]:
import pandas as pd
from arch import arch_model
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly_resampler import FigureResampler, FigureWidgetResampler

In [35]:
import pandas as pd
import requests

def get_imbalance_price(start_date, end_date, price_area="DK1", sort_by="TimeUTC ASC", imbalance_type="afrr"):
    
    COLS_SEL = ['TimeUTC', 'BalancingDemand','SpotPriceEUR','DominatingDirection']
    
    if isinstance(start_date, pd.Timestamp):
        start_date = start_date.strftime("%Y-%m-%dT%H:%M")
    if isinstance(end_date, pd.Timestamp):
        end_date = end_date.strftime("%Y-%m-%dT%H:%M")
    
    base_url = "https://api.energidataservice.dk/dataset/ImbalancePrice"
    
    url = f"{base_url}?offset=0&start={start_date}&end={end_date}&filter={{\"PriceArea\":[\"{price_area}\"]}}&sort={sort_by}"
    response = requests.get(url)
    
    if response.status_code == 200:
        data = response.json()
        df = pd.DataFrame(data.get("records", []))
        
        if imbalance_type == "afrr":
            COLS_SEL.extend(['aFRRUpMW', 'aFRRVWAUpEUR', 'aFRRDownMW', 'aFRRVWADownEUR'])
            return df[COLS_SEL].set_index('TimeUTC')
        elif imbalance_type == "mfrr":
            COLS_SEL.append(['mFRRVWAUpEUR', 'mFRRVWADownEUR'])
            return df[[COLS_SEL]].set_index('TimeUTC')
            
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None
    
    
start = pd.Timestamp('20240101', tz='Europe/Copenhagen')
end = pd.Timestamp('20250501', tz='Europe/Copenhagen')
df = get_imbalance_price(start_date=start, end_date=end, price_area="DK1")

In [36]:
import pandas as pd
import numpy as np 
from arch import arch_model

# Your existing code
df['res_aFRRVWAUp'] = df['SpotPriceEUR'] - df['aFRRVWAUpEUR'].rolling(24).mean()
df['res_aFRRVWADown'] = df['SpotPriceEUR'] - df['aFRRVWADownEUR'].rolling(24).mean()

df = df.dropna(subset=['res_aFRRVWAUp', 'res_aFRRVWADown'])


In [None]:
def fit_garch_model(residuals, p=1, q=1, lags=1, disp = "off"):
    model = arch_model(residuals, mean='AR', lags=lags, vol='GARCH', p=p, q=q)
    result = model.fit(disp=disp)
    return result, result.resid

def forecast_garch_variance(arch_model, custom_residual=None, horizon=1):

    omega = arch_model.params['omega']
    alpha = arch_model.params['alpha[1]']
    beta = arch_model.params['beta[1]']
    
    if custom_residual is not None:
        last_resid = custom_residual
    else:
        last_resid = arch_model.resid[-1]
    
    last_var = arch_model.conditional_volatility[-1]**2
    variance_forecast = np.zeros(horizon)
    for h in range(horizon):
        if h == 0:
            variance_forecast[h] = omega + alpha * last_resid**2 + beta * last_var
        else:
            variance_forecast[h] = omega + (alpha + beta) * variance_forecast[h-1]
    
    volatility_forecast = np.sqrt(variance_forecast)
    
    return volatility_forecast



def generate_copula_forecasts(df, residual_cols, quantiles=[0.1, 0.5, 0.9]):
    
    result_df = df.copy()
    residuals_data = df[residual_cols].dropna()
    
    copula = GaussianMultivariate()
    copula.fit(residuals_data)
    
    # Number of simulations per row
    n_sims = 1000
    
    # For each row in the dataset
    for i, _ in df.iterrows():
        # Generate samples for this row
        samples = copula.sample(n_sims)
        
        # Calculate quantiles for each residual column
        for col in residual_cols:
            for q in quantiles:
                q_str = f"{int(q*100)}"
                new_col_name = f"{col}_q{q_str}"
                
                # Calculate the quantile from the samples
                q_value = samples[col].quantile(q)
                
                # Assign the quantile value to this specific row
                result_df.loc[i, new_col_name] = q_value
    
    return result_df


def forecast_garch_column(df, column_name, arch_model):
    result_df = df.copy()
    
    for idx in result_df.index:
        custom_residual = result_df.loc[idx, column_name]
        
        volatility_forecast = forecast_garch_variance(
            arch_model,
            custom_residual=custom_residual,
            horizon=1
        )
        
        result_df.loc[idx, f"{column_name}_vol"] = volatility_forecast[0]
    
    return result_df

In [None]:
model_garch_res_aFRRVWAUp, df['res_aFRRVWAUp_cop'] = fit_garch_model(df['res_aFRRVWAUp'], p=1, q=1, lags=1)
model_garch_res_aFRRVWADown, df['res_aFRRVWADown_cop'] = fit_garch_model(df['res_aFRRVWADown'], p=1, q=1, lags=1)

xdf = df[['res_aFRRVWAUp', 'res_aFRRVWAUp_cop_q10', 'res_aFRRVWAUp_cop_q90']]
forecasted_df = forecast_garch_column(xdf, 'res_aFRRVWAUp_cop_q10', model_garch_res_aFRRVWAUp)


In [43]:
import warnings

warnings.filterwarnings('ignore')

from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split

X, y = load_diabetes(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y)