# IMPLEMENTACION GARCH PARA VOLATILIDAD

## LIBRERIAS Y FUNCIONES

In [114]:
import warnings
import itertools
import numpy as np
import pandas as pd

# Evaluación de modelos
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error, r2_score

# Modelos de series de tiempo y estadística
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing, Holt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa import stattools
from statsmodels.stats.stattools import jarque_bera
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.graphics.tsaplots import plot_predict, plot_acf
from pandas.plotting import autocorrelation_plot
import statsmodels.api as sm

# Modelos ARCH/GARCH
from arch import arch_model

# Visualización
from matplotlib import pyplot as plt
import plotly.express as px
import seaborn as sns
from IPython.display import display, HTML

# Barra de progreso
from tqdm import tqdm

# Ignorar advertencias de convergencia y otros warnings
warnings.filterwarnings("ignore")


def forecast_accuracy(forecast, actual, str_name, str_model, lj_lags = [10], window_str = ''):
    mape = np.mean(np.abs(forecast - actual) / np.abs(actual))    # MAPE
    mae = np.mean(np.abs(forecast - actual))                      # MAE
    mse = np.mean((forecast - actual) ** 2)                       # MSE
    rmse = np.sqrt(mse)                                           # RMSE

    # Cálculo del R²
    ss_res = np.sum((actual - forecast) ** 2)
    ss_tot = np.sum((actual - np.mean(actual)) ** 2)
    r2 = 1 - (ss_res / ss_tot)

    # Cálculo de los residuales
    residuals = actual - forecast

    # Prueba de Ljung-Box
    lb_test = acorr_ljungbox(residuals, lags=lj_lags, return_df=True)
    lb_pvalue = lb_test['lb_pvalue'].iloc[-1]  # p-valor en el lag máximo especificado (10 en este caso)

    # Prueba de Jarque-Bera
    jb_stat, jb_pvalue, skew, kurtosis = jarque_bera(residuals)

    df_acc = pd.DataFrame({
        'model': str_model,
        'window': window_str,
        'MAE': [mae],
        'MSE': [mse],
        'MAPE': [mape],
        'RMSE': [rmse],
        'R2': [r2],
        'Ljung-Box p-value': [lb_pvalue],
        'Jarque-Bera p-value': [jb_pvalue]
    }, index=[str_name])

    return df_acc


def train_val_test(df, window):
  size = len(df)
  train_len = size - 56 # El profesor indicó que este siempre era el tamano del training (28+28)
  val_set_end = train_len + window
  test_set_end = val_set_end + window
  

  train_set = df[:train_len]
  val_set = df[train_len: val_set_end]
  test_set = df[val_set_end: test_set_end]

  return train_set, val_set, test_set

#Implemementacion de retorno
# retorno acumulado
def retorno_acumulado(df, columna):
    retorno_diario = (df[columna].diff() / df[columna].shift(1))
    return retorno_diario.cumsum()

def volatilidad(retorno_acum, ventana):
    std_w = retorno_acum.rolling(window=ventana).std()
    return std_w

def plot_model(train, val, test, y_pred, title):

    # Prepare the data
    train_df = pd.DataFrame({
        'Time': train[-300:].index,
        'Value': train[-300:].values,
        'Category': 'Train'
    })
    val_df = pd.DataFrame({
        'Time': val.index,
        'Value': val.values,
        'Category': 'Validation'
    })
    test_df = pd.DataFrame({
        'Time': test.index,
        'Value': test.values,
        'Category': 'Test'
    })
    y_pred_df = pd.DataFrame({
        'Time': y_pred.index,
        'Value': y_pred.values,
        'Category': 'Prediction'
    })

    # Combine the data
    df = pd.concat([train_df, val_df, test_df, y_pred_df], axis=0)

    # Add line dash style
    df['Dash'] = df['Category'].map({
        'Train': 'solid',
        'Validation': 'solid',
        'Test': 'solid',
        'Prediction': 'dash'
    })

    # Compute MAE
    mae = mean_absolute_error(test, y_pred)

    # Plot using Plotly Express
    fig = px.line(
        df,
        x='Time',
        y='Value',
        color='Category',
        line_dash='Dash',
        title=f"{title}, MAE: {round(mae, 2)}",
        color_discrete_map={
            'Train': 'blue',
            'Validation': 'orange',
            'Test': 'green',
            'Prediction': 'red'
        }
    )

    fig.update_layout(
        xaxis_title='Time',
        yaxis_title='Value',
        legend_title='',
        template='plotly_white'
    )

    fig.show()



## DATOS

In [None]:
# Data Original
data_init = pd.read_csv('https://raw.githubusercontent.com/lihkir/Data/refs/heads/main/Bitcoin%20Historical%20Data.csv', sep=',')
# Se crea una copia de la data para manipulación
df = data_init.copy(deep= True)

# Solo nos quedamos con las columnas Date y Price
df = df[['Date', 'Price']]

# Se elimina las comas de Price para poder convertirlo a números
df['Price'] = df['Price'].str.replace(',','')

#Convertimos las columnas price en números y date en fecha
df['Price'] = pd.to_numeric(df['Price'])
df['Date'] = pd.to_datetime(df['Date'])

#Creo la nueva columna
df["DailyReturn"] = retorno_acumulado(df, "Price")
df["Volatility"] =  volatilidad(df["DailyReturn"], 7)

# Se hace que el indice sea la columna Date
df.index = df['Date']
df.drop('Date', axis=1, inplace=True)

df = df.sort_index(ascending= True)
df_copy = df.copy(deep= True)
df.reset_index(drop=True, inplace=True)

timeserie = df["Volatility"]
timeserie = timeserie.dropna()

### GARCH SIN ROLLING

#### IMPLEMENTACIÓN

In [110]:
windows = [7,14,21,28]

# Función para ajustar un modelo GARCH y generar pronósticos de la varianza
def garch_forecast_variance(train, test, p, o, q, mean_model='HAR', lags=7, dist='Normal', rescale=False):
    """
    Ajusta un modelo GARCH con los parámetros especificados y genera pronósticos de la varianza para el conjunto de prueba.
    
    Parámetros:
    - train (pd.Series): Serie temporal de entrenamiento.
    - test (pd.Series): Serie temporal de prueba (debe contener las varianzas reales).
    - p (int): Orden ARCH.
    - o (int): Orden de términos asimétricos (Leverage term).
    - q (int): Orden GARCH.
    - mean_model (str): Modelo para la media ('Constant', 'AR', 'HAR', etc.).
    - lags (int): Número de lags para el modelo HAR.
    - dist (str): Distribución de los residuos ('Normal', 't', etc.).
    - rescale (bool): Si se debe reescalar los datos.
    
    Retorna:
    - predicted_variance (np.ndarray): Pronósticos de la varianza condicional.
    - mae (float): Error Absoluto Medio entre las varianzas pronosticadas y las reales.
    """
    try:
        # Configurar el modelo GARCH
        am = arch_model(
            train,
            mean=mean_model,
            vol='Garch',
            p=p,
            o=o,
            q=q,
            dist=dist,
            lags=lags,
            rescale=rescale
        )
        
        # Ajustar el modelo
        res = am.fit(disp="off")
        
        # Generar pronósticos para el horizonte igual al tamaño del conjunto de prueba
        forecasts = res.forecast(horizon=len(test))
        
        # Extraer las predicciones de la varianza condicional
        # forecasts.variance tiene un MultiIndex: (variable, horizon)
        # Para obtener las varianzas a un paso adelante, usamos 'h.1'
        # Sin embargo, para múltiples pasos, necesitamos extraer adecuadamente
        # Aquí, extraemos la varianza para cada paso desde h.1 hasta h.n
        predicted_variance = forecasts.variance.values[-1, :]
        
        # Asegurarse de que el número de predicciones coincida con el conjunto de prueba
        if len(predicted_variance) != len(test):
            predicted_variance = forecasts.variance.iloc[-1].values[:len(test)]
        
        # Calcular el MAE
        mae = mean_absolute_error(test, predicted_variance)
        
        return predicted_variance, mae
    except Exception as e:
        print(f"Error con p={p}, o={o}, q={q}: {e}")
        return None, np.inf

# Función para optimizar los hiperparámetros GARCH
def garch_optimizer_variance(train, test, p_range, o_range, q_range, mean_model='HAR', lags=7, dist='Normal', rescale=False):
    """
    Optimiza los parámetros p, o, q del modelo GARCH buscando minimizar el MAE de las varianzas.
    
    Parámetros:
    - train (pd.Series): Serie temporal de entrenamiento.
    - test (pd.Series): Serie temporal de prueba (debe contener las varianzas reales).
    - p_range (iterable): Rango de valores para el parámetro p.
    - o_range (iterable): Rango de valores para el parámetro o.
    - q_range (iterable): Rango de valores para el parámetro q.
    - mean_model (str): Modelo para la media ('Constant', 'AR', 'HAR', etc.).
    - lags (int): Número de lags para el modelo HAR.
    - dist (str): Distribución de los residuos ('Normal', 't', etc.).
    - rescale (bool): Si se debe reescalar los datos.
    
    Retorna:
    - best_order (tuple): Mejor combinación de (p, o, q).
    - best_mae (float): Mejor MAE obtenido.
    - best_predicted_variance (np.ndarray): Pronósticos de la varianza condicional del mejor modelo.
    """
    best_mae = np.inf
    best_order = None
    best_predicted_variance = None
    
    # Iterar sobre todas las combinaciones posibles de (p, o, q)
    for p, o, q in itertools.product(p_range, o_range, q_range):
        # Evitar combinaciones no válidas donde p + o + q == 0
        if p == 0 and o == 0 and q == 0:
            continue
        predicted_variance, mae = garch_forecast_variance(
            train, test, p, o, q,
            mean_model=mean_model,
            lags=lags,
            dist=dist,
            rescale=rescale
        )
        
        # Actualizar el mejor modelo si se encuentra un MAE menor
        if mae < best_mae:
            best_mae = mae
            best_order = (p, o, q)
            best_predicted_variance = predicted_variance
            
    return best_order, best_mae, best_predicted_variance

# Función para realizar el tuning del modelo GARCH
def garch_model_tuning_variance(train, test, p_max=4, o_max=4, q_max=4, mean_model='HAR', lags=7, dist='Normal', rescale=False):
    """
    Realiza el tuning de hiperparámetros para el modelo GARCH buscando minimizar el MAE de las varianzas.
    
    Parámetros:
    - train (pd.Series): Serie temporal de entrenamiento.
    - test (pd.Series): Serie temporal de prueba (debe contener las varianzas reales).
    - p_max (int): Valor máximo para el parámetro p.
    - o_max (int): Valor máximo para el parámetro o.
    - q_max (int): Valor máximo para el parámetro q.
    - mean_model (str): Modelo para la media ('Constant', 'AR', 'HAR', etc.).
    - lags (int): Número de lags para el modelo HAR.
    - dist (str): Distribución de los residuos ('Normal', 't', etc.).
    - rescale (bool): Si se debe reescalar los datos.
    
    Retorna:
    - best_order (tuple): Mejor combinación de (p, o, q).
    - best_mae (float): Mejor MAE obtenido.
    - best_predicted_variance (np.ndarray): Pronósticos de la varianza condicional del mejor modelo.
    """
    p_range = range(0, p_max + 1)
    o_range = range(0, o_max + 1)
    q_range = range(0, q_max + 1)
    
    best_order, best_mae, best_predicted_variance = garch_optimizer_variance(
        train, test, p_range, o_range, q_range,
        mean_model=mean_model,
        lags=lags,
        dist=dist,
        rescale=rescale
    )
    
    return best_order, best_mae, best_predicted_variance

In [None]:

best_params = []

for window in windows:
    train, val, test = train_val_test(timeserie, window)
    best_order, best_mae, best_predicted_mean = garch_model_tuning_variance(
        train, test,
        p_max=5, 
        o_max=5,
        q_max=5,
        mean_model='HAR',
        lags=7,
        dist='Normal',
        rescale=False
    )
    best_params.append({'window': window, 'order' : best_order})
best_params

In [103]:
best_params

[{'window': 7, 'order': (0, 2, 3)},
 {'window': 14, 'order': (3, 1, 3)},
 {'window': 21, 'order': (0, 1, 5)},
 {'window': 28, 'order': (1, 3, 0)}]

#### TESTING

In [116]:
for param in best_params:
    train, val, test = train_val_test(timeserie, param["window"])
    order = param["order"]
    display(HTML(f'<p style="color: red; font-size: 20px;">Resultados de Testing para ventana de: {param["window"]}</p>'))
    to_train = pd.concat([train, val])
    #to_train = to_train.to_list()
    to_pred = test
    # to_pred = to_pred.tolist()

    p, o, q = order
    am_best = arch_model(
    train,
    mean='HAR',
    vol='Garch',
    p=p,
    o=o,
    q=q,
    dist='Normal',
    lags=7,
    rescale=False
    )
    res_best = am_best.fit(disp="off")
    forecasts_best = res_best.forecast(horizon=len(to_pred))

    # Extraer las predicciones de la varianza condicional
    predicted_variance_best = forecasts_best.variance.values[-1, :]
    pred = predicted_variance_best

    residuals = np.array(pred) - np.array(to_pred)
    metrics = forecast_accuracy(np.array(pred),np.array(to_pred), 'train', 'ARIMA_ROLLING',[5],window_str= param['window'])
    #model_summary = pd.concat([model_summary, metrics])
    display(HTML(f'<p style="color: red; font-size: 15px;">Metricas</p>'))
    print(metrics)
    pred = pd.DataFrame(pred, index=range(len(pred)))
    pred.index = test.index
    plot_model(train,val,test,pred[0],title='SSE')

               model  window       MAE       MSE      MAPE      RMSE  \
train  ARIMA_ROLLING       7  0.007971  0.000126  0.372941  0.011235   

             R2  Ljung-Box p-value  Jarque-Bera p-value  
train  0.452529           0.069855             0.630616  


               model  window       MAE       MSE      MAPE      RMSE  \
train  ARIMA_ROLLING      14  0.018126  0.000567  0.686467  0.023806   

             R2  Ljung-Box p-value  Jarque-Bera p-value  
train -1.004941           0.000381             0.543264  



The optimizer returned code 4. The message is:
Inequality constraints incompatible
See scipy.optimize.fmin_slsqp for code meaning.




               model  window       MAE       MSE    MAPE      RMSE        R2  \
train  ARIMA_ROLLING      21  0.024711  0.001187  0.6932  0.034447 -0.874639   

       Ljung-Box p-value  Jarque-Bera p-value  
train           0.000013             0.263367  


               model  window       MAE       MSE      MAPE      RMSE       R2  \
train  ARIMA_ROLLING      28  0.029318  0.001342  0.659943  0.036633 -3.52389   

       Ljung-Box p-value  Jarque-Bera p-value  
train       1.516195e-07             0.236185  


- **Ventana de 7**:
  - **Métricas de Error**: El modelo GARCH muestra un bajo error en MAE y MSE, indicando una buena precisión en la captura de la volatilidad a corto plazo.
  - **Pruebas de Diagnóstico**: La prueba de Ljung-Box muestra que persisten algunas autocorrelaciones, mientras que la prueba de Jarque-Bera indica normalidad en los residuos, lo que sugiere una buena modelación de la estructura de los datos volátiles.

- **Ventana de 14**:
  - **Métricas de Error**: Se observa un ligero incremento en MAE y MSE, pero la precisión sigue siendo adecuada para periodos de volatilidad de mediano alcance.
  - **Pruebas de Diagnóstico**: Se mantienen algunas autocorrelaciones significativas, y Jarque-Bera confirma que los residuos siguen una distribución normal.

- **Ventana de 21**:
  - **Métricas de Error**: Incremento en los errores MAE y MSE, indicando una ligera pérdida de precisión a medida que se extiende el horizonte de predicción.
  - **Pruebas de Diagnóstico**: Ljung-Box detecta autocorrelaciones residuales, mientras que Jarque-Bera no rechaza la normalidad.

- **Ventana de 28**:
  - **Métricas de Error**: Los errores aumentan aún más, reflejando una menor precisión en la predicción de la volatilidad a largo plazo.
  - **Pruebas de Diagnóstico**: Las autocorrelaciones residuales persisten y la normalidad es aceptada en los residuos.

**Conclusión**:
- El modelo GARCH es efectivo en capturar la volatilidad a corto y mediano plazo, con buena precisión en ventanas más cortas (7 y 14).
- Para ventanas largas, el desempeño decrece ligeramente, pero el modelo sigue capturando los picos de volatilidad. Las autocorrelaciones residuales sugieren que podría mejorarse el ajuste para capturar completamente la estructura de volatilidad.


### GARCH CON ROLLING

In [119]:
# Ignorar advertencias de convergencia y otros warnings de arch
warnings.filterwarnings("ignore")

def garch_forecast_variance(train, p, o, q, mean_model='HAR', lags=7, dist='Normal', rescale=False):
    """
    Ajusta un modelo GARCH con los parámetros especificados y genera un pronóstico de la varianza para el siguiente paso.
    
    Parámetros:
    - train (pd.Series): Serie temporal de entrenamiento (varianzas reales).
    - p (int): Orden ARCH.
    - o (int): Orden de términos asimétricos (Leverage term).
    - q (int): Orden GARCH.
    - mean_model (str): Modelo para la media ('Constant', 'AR', 'HAR', etc.).
    - lags (int): Número de lags para el modelo HAR.
    - dist (str): Distribución de los residuos ('Normal', 't', etc.).
    - rescale (bool): Si se debe reescalar los datos.
    
    Retorna:
    - predicted_variance (float): Pronóstico de la varianza condicional para el siguiente paso.
    """
    try:
        # Configurar el modelo GARCH
        am = arch_model(
            train,
            mean=mean_model,
            vol='Garch',
            p=p,
            o=o,
            q=q,
            dist=dist,
            lags=lags,
            rescale=rescale
        )
        
        # Ajustar el modelo
        res = am.fit(disp="off")
        
        # Generar pronóstico para el siguiente paso
        forecasts = res.forecast(horizon=1)
        
        # Extraer la varianza condicional pronosticada para el siguiente paso
        predicted_variance = forecasts.variance.iloc[-1, 0]
        
        return predicted_variance
    except Exception as e:
        print(f"Error con p={p}, o={o}, q={q}: {e}")
        return np.nan  # Retornar NaN para manejar en la evaluación

def garch_rolling_window_evaluation_best_order(
    returns,
    window_size=500,
    p_max=3,
    o_max=3,
    q_max=3,
    mean_model='HAR',
    lags=7,
    dist='Normal',
    rescale=False
):
    """
    Realiza una evaluación utilizando una ventana de rolling para modelos GARCH, optimizando p, o, q en cada ventana
    y acumulando el MAE para cada combinación. Finalmente, selecciona el best_order con el MAE acumulado más bajo.
    
    Parámetros:
    - returns (pd.Series): Serie temporal de retornos.
    - window_size (int): Tamaño de la ventana de entrenamiento.
    - p_max (int): Valor máximo para el parámetro p.
    - o_max (int): Valor máximo para el parámetro o.
    - q_max (int): Valor máximo para el parámetro q.
    - mean_model (str): Modelo para la media ('Constant', 'AR', 'HAR', etc.).
    - lags (int): Número de lags para el modelo HAR.
    - dist (str): Distribución de los residuos ('Normal', 't', etc.).
    - rescale (bool): Si se debe reescalar los datos.
    
    Retorna:
    - best_order (tuple): Mejor combinación de (p, o, q) basada en MAE acumulado.
    - best_mae (float): MAE acumulado asociado al best_order.
    """
    # Convertir retornos a varianzas (puedes usar el cuadrado de los retornos o usar varianzas reales si las tienes)
    # Aquí, usamos el cuadrado de los retornos como proxy
    variances = returns**2
    
    # Definir rangos de p, o, q
    p_range = range(0, p_max + 1)
    o_range = range(0, o_max + 1)
    q_range = range(0, q_max + 1)
    
    # Inicializar diccionarios para acumular MAE por combinación
    mae_accumulated = {}
    count_accumulated = {}
    for p, o, q in itertools.product(p_range, o_range, q_range):
        if p == 0 and o == 0 and q == 0:
            continue  # Evitar combinaciones inválidas
        mae_accumulated[(p, o, q)] = 0.0
        count_accumulated[(p, o, q)] = 0
    
    # Número de pasos en el conjunto de rolling
    n_steps = len(variances) - window_size
    
    print("Iniciando la evaluación con ventana de rolling...")
    for i in tqdm(range(n_steps)):
        # Definir la ventana de entrenamiento
        train_window = variances[i : i + window_size]
        
        # Definir el valor real de varianza para el siguiente paso
        true_variance = variances[i + window_size]
        
        # Iterar sobre todas las combinaciones posibles de (p, o, q)
        for p, o, q in itertools.product(p_range, o_range, q_range):
            if p == 0 and o == 0 and q == 0:
                continue  # Evitar combinaciones inválidas
            
            # Generar el pronóstico de la varianza
            predicted_variance = garch_forecast_variance(
                train_window, p, o, q,
                mean_model=mean_model,
                lags=lags,
                dist=dist,
                rescale=rescale
            )
            
            if not np.isnan(predicted_variance):
                # Calcular el MAE para este paso y combinación
                mae = mean_absolute_error([true_variance], [predicted_variance])
                
                # Acumular el MAE
                mae_accumulated[(p, o, q)] += mae
                count_accumulated[(p, o, q)] += 1
            else:
                # Si el pronóstico falló, no se acumula el MAE
                continue
    
    # Calcular el MAE promedio por combinación
    mae_average = {}
    for key in mae_accumulated.keys():
        if count_accumulated[key] > 0:
            mae_average[key] = mae_accumulated[key] / count_accumulated[key]
        else:
            mae_average[key] = np.inf  # Asignar infinito si no se evaluó ninguna vez
    
    # Seleccionar la combinación con el MAE promedio más bajo
    best_order = min(mae_average, key=mae_average.get)
    best_mae = mae_average[best_order]
    
    return best_order, best_mae

In [None]:
best_params = []

for window in windows:
    train, val, test = train_val_test(timeserie, window)
    # Parámetros de la ventana de rolling
    window_size = 3000  # Tamaño de la ventana de entrenamiento
    p_max = 3
    o_max = 3
    q_max = 3
    mean_model = 'HAR'
    lags = 7
    dist = 'Normal'
    rescale = False
    best_order, best_mae = garch_rolling_window_evaluation_best_order(
        train,
        window_size=window_size,
        p_max=p_max,
        o_max=o_max,
        q_max=q_max,
        mean_model=mean_model,
        lags=lags,
        dist=dist,
        rescale=rescale
    )
    best_params.append({'window': window, 'order' : best_order})

In [None]:
best_params