In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm

from fbm import FBM
from arch import arch_model
from scipy.linalg import cholesky
from scipy.optimize import minimize
from statsmodels.tsa.stattools import acf
from sklearn.metrics import mean_squared_error, mean_absolute_error


def load_and_clean_file(file_path):
    """
    Carica un file di dati finanziari in formato .txt o .csv, dove i valori
    sono separati da tabulatori.

    Args:
        file_path (str): Il percorso del file da caricare.

    Returns:
        pd.DataFrame: Un DataFrame pulito e pronto per l'analisi.
    """
    # Legge il file usando '\t' come delimitatore.
    # Combina le prime due colonne (indice 0 e 1) in un'unica colonna
    # chiamata 'datetime' e la imposta come indice.
    df = pd.read_csv(
        file_path,
        delimiter='\t',
        header=0,
        parse_dates={'datetime': [0]},
        index_col='datetime'
    )
    
    # Standardizza i nomi delle colonne in minuscolo
    df.columns = [col.lower() for col in df.columns]

    print(f"File caricato con successo: {file_path}")
    return df

file1 = "data/fake/AAPLUSUSD_M5.csv"
file2 = "data/fake/NFLXUSUSD_M5.csv"
file3 = "data/fake/TSLAUSUSD_M5.csv"

df_aapl = load_and_clean_file(file1)
df_nflx = load_and_clean_file(file2)
df_tsla = load_and_clean_file(file3)

df_aapl.to_csv('data/fake/df_aapl.csv', sep=',', index=True, header=True)
df_nflx.to_csv('data/fake/df_nflx.csv', sep=',', index=True, header=True)
df_tsla.to_csv('data/fake/df_tsla.csv', sep=',', index=True, header=True)

  df = pd.read_csv(
  df = pd.read_csv(
  df = pd.read_csv(


File caricato con successo: data/fake/AAPLUSUSD_M5.csv
File caricato con successo: data/fake/NFLXUSUSD_M5.csv
File caricato con successo: data/fake/TSLAUSUSD_M5.csv


In [34]:
def calculate_log_rv(df, price_col='Close', resample_freq='1D'):
    """
    Calcola la log realized volatility (log-RV) da dati ad alta frequenza.

    Args:
        df (pd.DataFrame): DataFrame con indice di tipo Datetime e colonna dei prezzi.
        price_col (str): Nome della colonna dei prezzi.
        resample_freq (str): Frequenza per il resampling (es. '1D', '4H', '1H').

    Returns:
        pd.Series: Serie della log realized volatility con timestamp.
    """

    log_returns = np.log(df[price_col]).diff().dropna()
    
    # 2. Eleva al quadrato i log-return
    squared_log_returns = log_returns**2
    
    # 3. Resample sommando i return al quadrato per ottenere la RV
    # Il timestamp viene mantenuto automaticamente come indice
    realized_variance = squared_log_returns.resample(resample_freq).sum()
    
    # 4. Calcola il logaritmo della RV, gestendo valori nulli o negativi
    log_realized_variance = np.log(realized_variance)
    
    # Rimuovi i valori infiniti o mancanti (es. weekend senza trading)
    log_realized_variance = log_realized_variance.replace([np.inf, -np.inf], np.nan).dropna()
    
    print(f"Calcolata log-RV per frequenza '{resample_freq}'. Trovate {len(log_realized_variance)} osservazioni.")
    return log_realized_variance


lrv_d_aapl = calculate_log_rv(df_aapl, price_col=['close'], resample_freq='1D')
lrv_h_aapl = calculate_log_rv(df_aapl, price_col=['close'], resample_freq='1h')
lrv_5m_aapl = calculate_log_rv(df_aapl, price_col=['close'], resample_freq='5min')

lrv_d_nflx = calculate_log_rv(df_nflx, price_col=['close'], resample_freq='1D')
lrv_h_nflx = calculate_log_rv(df_nflx, price_col=['close'], resample_freq='1h')
lrv_5m_nflx = calculate_log_rv(df_nflx, price_col=['close'], resample_freq='5min')

lrv_d_tsla = calculate_log_rv(df_tsla, price_col=['close'], resample_freq='1D')
lrv_h_tsla = calculate_log_rv(df_tsla, price_col=['close'], resample_freq='1h')
lrv_5m_tsla = calculate_log_rv(df_tsla, price_col=['close'], resample_freq='5min')

Calcolata log-RV per frequenza '1D'. Trovate 1289 osservazioni.
Calcolata log-RV per frequenza '1h'. Trovate 8986 osservazioni.
Calcolata log-RV per frequenza '5min'. Trovate 98891 osservazioni.
Calcolata log-RV per frequenza '1D'. Trovate 1289 osservazioni.
Calcolata log-RV per frequenza '1h'. Trovate 8984 osservazioni.
Calcolata log-RV per frequenza '5min'. Trovate 99588 osservazioni.
Calcolata log-RV per frequenza '1D'. Trovate 1289 osservazioni.
Calcolata log-RV per frequenza '1h'. Trovate 8984 osservazioni.
Calcolata log-RV per frequenza '5min'. Trovate 99676 osservazioni.


  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)
  result = func(self.values, **kwargs)


In [35]:
# RFSV

def theoretical_acf(h, n_lags):
    """Calcola l'ACF teorica per un processo fBm."""
    lags = np.arange(1, n_lags + 1)
    # Formula dell'autocovarianza per un fBm incrementale
    autocov = 0.5 * (np.abs(lags - 1)**(2*h) - 2 * np.abs(lags)**(2*h) + np.abs(lags + 1)**(2*h))
    return autocov

def est_parameters(log_rv_series, max_lags=100):
    """
    Stima l'esponente di Hurst H e la vol-of-vol nu.

    Args:
        log_rv_series (pandas.Series): Serie della log realized variance.
        max_lags (int): Numero di lag da usare per il matching dell'ACF.

    Returns:
        tuple: (h_estimated, nu_estimated)
    """
    # 1. Stima di H
    empirical_acf_vals = acf(log_rv_series, nlags=max_lags, fft=True)[1:]
    
    def objective_function(h):

        theoretical_acf_vals = theoretical_acf(h[0], max_lags)
        # Minimizza la somma dei quadrati degli errori
        return np.sum((empirical_acf_vals - theoretical_acf_vals)**2)

    # Esegui la minimizzazione per trovare H
    # Usiamo un valore iniziale ragionevole per H (es. 0.1)
    result = minimize(objective_function, x0=[0.1], bounds=[(0.01, 10)])
    h_estimated = result.x[0]

    # 2. Stima di nu
    # Var(log_sigma_t) = nu^2 * t^(2H)
    # Per t=1 (passo giornaliero), Var(log_RV) ≈ nu^2
    # Quindi, nu ≈ std(log_RV)
    nu_estimated = np.std(log_rv_series)
    
    return h_estimated, nu_estimated

In [36]:
def build_fbm_covariance_matrix(times, h):
    """
    Costruisce la matrice di covarianza per un fBm ai tempi specificati.

    Args:
        times (np.array): Array di istanti temporali.
        h (float): Esponente di Hurst.

    Returns:
        np.array: Matrice di covarianza.
    """
    n = len(times)
    cov_matrix = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            t_i = times[i]
            t_j = times[j]
            cov_matrix[i, j] = 0.5 * (
                np.power(t_i, 2 * h) + 
                np.power(t_j, 2 * h) - 
                np.power(np.abs(t_i - t_j), 2 * h)
            )
    return cov_matrix

def forecast_RFSV(past_log_rv_series, h, nu, horizon, n_sims=1):
    """
    Esegue una previsione condizionale per il modello RFSV.

    Args:
        past_log_rv_series (pd.Series): Serie storica di log-RV.
        h (float): Esponente di Hurst.
        nu (float): Vol-of-vol.
        horizon (int): Orizzonte di previsione.
        n_sims (int): Numero di percorsi futuri da simulare.

    Returns:
        np.array: Matrice di previsioni (n_sims, horizon). 
                  Se n_sims=1, restituisce la media condizionale.
    """
    n_past = len(past_log_rv_series)
    
    # Definisci gli istanti temporali per il passato e il futuro
    past_times = np.arange(1, n_past + 1)
    future_times = np.arange(n_past + 1, n_past + horizon + 1)
    all_times = np.concatenate([past_times, future_times])

    # 1. Costruisci la matrice di covarianza globale (per W, non per log_sigma)
    # la covarianza del processo log_sigma è nu^2 * Cov(W)
    cov_global = build_fbm_covariance_matrix(all_times, h)
    
    # 2. Partiziona la matrice
    sigma_pp = cov_global[:n_past, :n_past]
    sigma_ff = cov_global[n_past:, n_past:]
    sigma_fp = cov_global[n_past:, :n_past]
    sigma_pf = sigma_fp.T

    # 3. Calcola i componenti per la media e covarianza condizionali
    # È la parte computazionalmente più costosa: inversione di sigma_pp
    try:
        sigma_pp_inv = np.linalg.inv(sigma_pp)
    except np.linalg.LinAlgError:
        # Usa la pseudo-inversa se la matrice è singolare
        sigma_pp_inv = np.linalg.pinv(sigma_pp)

    # Il processo che osserviamo è log_sigma = nu * W
    # Quindi, il passato osservato di W è W_p = past_log_rv_series / nu
    past_W = past_log_rv_series.values / nu
    
    # Media condizionale (per il processo W)
    mean_cond_W = sigma_fp @ sigma_pp_inv @ past_W
    
    # Covarianza condizionale (per il processo W)
    cov_cond_W = sigma_ff - sigma_fp @ sigma_pp_inv @ sigma_pf
    
    # 4. Simula dal processo condizionale
    # Se n_sims > 1, campiona. Altrimenti, restituisci solo la media.
    if n_sims > 1:
        # Usa la decomposizione di Cholesky per simulare in modo efficiente
        # Aggiungiamo una piccola quantità alla diagonale per stabilità numerica
        L = cholesky(cov_cond_W + np.eye(horizon) * 1e-8, lower=True)
        # Genera campioni casuali normali
        z = np.random.normal(size=(horizon, n_sims))
        # Simula i percorsi
        simulated_paths_W = mean_cond_W[:, np.newaxis] + L @ z
        # Riconverti in percorsi di log_sigma
        simulated_paths_log_sigma = nu * simulated_paths_W.T
        return simulated_paths_log_sigma
    else:
        # Se vuoi solo il "best guess", restituisci la media condizionale
        mean_forecast_log_sigma = nu * mean_cond_W
        return mean_forecast_log_sigma.reshape(1, -1)

In [None]:
h_est_d_aapl, nu_est_d_aapl = est_parameters(lrv_d_aapl, max_lags=100)
h_est_h_aapl, nu_est_h_aapl = est_parameters(lrv_h_aapl, max_lags=100)
h_est_5m_aapl, nu_est_5m_aapl = est_parameters(lrv_5m_aapl, max_lags=100)

print("1 day:",h_est_d_aapl, nu_est_d_aapl)
print("1 hour:", h_est_h_aapl, nu_est_h_aapl)
print("5 minutes", h_est_5m_aapl, nu_est_5m_aapl)

h_est_d_nflx, nu_est_d_nflx = est_parameters(lrv_d_nflx, max_lags=100)
h_est_h_nflx, nu_est_h_nflx = est_parameters(lrv_h_nflx, max_lags=100)
h_est_5m_nflx, nu_est_5m_nflx = est_parameters(lrv_5m_nflx, max_lags=100)

print("1 day:",h_est_d_nflx, nu_est_d_nflx)
print("1 hour:", h_est_h_nflx, nu_est_h_nflx)
print("5 minutes", h_est_5m_nflx, nu_est_5m_nflx)

h_est_d_tsla, nu_est_d_tsla = est_parameters(lrv_d_tsla, max_lags=100)
h_est_h_tsla, nu_est_h_tsla = est_parameters(lrv_h_tsla, max_lags=100)
h_est_5m_tsla, nu_est_5m_tsla = est_parameters(lrv_5m_tsla, max_lags=100)

print("1 day:",h_est_d_tsla, nu_est_d_tsla)
print("1 hour:", h_est_h_tsla, nu_est_h_tsla)
print("5 minutes", h_est_5m_tsla, nu_est_5m_tsla)

In [38]:
def estimate_h_loglog(series, scales, q=1):
    """
    Stima l'esponente di Hurst H usando una regressione log-log sui momenti.

    Args:
        series (pd.Series): La serie temporale (preferibilmente detrendizzata).
        scales (list of int): Lista di scale temporali (lag) da analizzare.
        q (int): L'ordine del momento da usare (solitamente 1 o 2).

    Returns:
        tuple: (H_stimato, R_squared)
    """
    moments = []
    for tau in scales:
        # Calcola gli incrementi sulla scala 'tau' e rimuovi i NaN
        increments = series.diff(tau).dropna()
        # Calcola il momento di ordine q
        moment = np.mean(np.abs(increments)**q)
        moments.append(moment)
    
    # Prepara i dati per la regressione log-log
    log_tau = np.log(scales)
    log_moments = np.log(moments)
    
    # Aggiungi una costante per l'intercetta della regressione
    X = sm.add_constant(log_tau)
    y = log_moments
    
    # Esegui la regressione lineare (OLS)
    model = sm.OLS(y, X)
    results = model.fit()
    
    # La pendenza è il coefficiente di log_tau
    slope = results.params[1]
    
    # Ricava H dalla pendenza
    h_estimated = slope / q
    
    return h_estimated, results.rsquared

In [40]:

# Usiamo una delle tue serie detrendizzate, ad esempio quella giornaliera di AAPL
# Assumiamo che 'lvol_d_detrended_aapl' sia disponibile
# lvol_d_detrended_aapl = detrend_series(calculate_log_rv(df_aapl, resample_freq='1D'))

# Definiamo le scale temporali su cui analizzare il processo
# È bene scegliere scale sia piccole che grandi
time_scales = [1, 2, 5, 10, 20, 50, 100, 200]

# Stima H usando il primo momento (q=1)
H_loglog_q1, r2_q1 = estimate_h_loglog(lrv_d_aapl, time_scales, q=1)

# Stima H usando il secondo momento (q=2)
H_loglog_q2, r2_q2 = estimate_h_loglog(lrv_d_aapl, time_scales, q=2)


print(f"Stima di H (q=1): {H_loglog_q1:.4f}")
print(f"R-squared della regressione (q=1): {r2_q1:.4f}\n")

print(f"Stima di H (q=2): {H_loglog_q2:.4f}")
print(f"R-squared della regressione (q=2): {r2_q2:.4f}")

Stima di H (q=1): 0.1069
R-squared della regressione (q=1): 0.9869

Stima di H (q=2): 0.0946
R-squared della regressione (q=2): 0.9826
