In [2]:
!pip install numpy pandas matplotlib scipy scikit-learn pytest -q
!pip freeze > requirements.txt

In [None]:
# Standard Libraries
import numpy as np  # Numerical operations
import pandas as pd  # Data handling and processing
from datetime import datetime  # Handling date and time-related operations

# Visualization
import matplotlib.pyplot as plt  # Plotting and visualization

# Numerical Computation and Linear Algebra
from scipy.linalg import toeplitz, solve, inv, pinv  # Linear algebra tools for matrix operations
from scipy.optimize import minimize, minimize_scalar  # Optimization functions for parameter estimation

import warnings

from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import GridSearchCV

In [None]:
class TimeSeriesPreprocessor:
    """
    Clase para procesar series de tiempo a partir de un conjunto de valores numéricos,
    generando un rango de fechas y aplicando interpolación para valores faltantes.

    Atributos:
        num_array (list o np.ndarray): Lista o array de valores numéricos.
        start_date (str o datetime): Fecha de inicio de la serie de tiempo.
        end_date (str o datetime): Fecha de fin de la serie de tiempo.
        freq (str): Frecuencia de la serie de tiempo (ej. 'D' para diario, 'H' para horas, etc.).
        interp_method (str): Método de interpolación para manejar valores faltantes (por defecto 'linear').
    """

    def __init__(self, num_array, start_date, end_date, freq, interp_method='linear'):
        """
        Inicializa la clase con los parámetros requeridos.

        :param num_array: Lista o array de valores numéricos.
        :param start_date: Fecha de inicio de la serie de tiempo (str o datetime).
        :param end_date: Fecha de fin de la serie de tiempo (str o datetime).
        :param freq: Frecuencia de la serie de tiempo (ej. 'D' para diario, 'H' para horas, etc.).
        :param interp_method: Método de interpolación para manejar valores faltantes (por defecto 'linear').
        """
        self.num_array = np.array(num_array, dtype=np.float64)  # Convierte a array NumPy para manejo eficiente
        self.start_date = pd.to_datetime(start_date)  # Convierte la fecha de inicio a formato datetime
        self.end_date = pd.to_datetime(end_date)  # Convierte la fecha de fin a formato datetime
        self.freq = freq  # Define la frecuencia de la serie de tiempo
        self.interp_method = interp_method  # Define el método de interpolación para valores faltantes

        # Validaciones básicas
        if len(self.num_array) == 0:
            raise ValueError("El array de valores numéricos no puede estar vacío.")

        if self.start_date >= self.end_date:
            raise ValueError("La fecha de inicio debe ser anterior a la fecha de fin.")

        if not isinstance(self.freq, str):
            raise TypeError("La frecuencia debe ser una cadena de caracteres (ej. 'D', 'H').")


    def _convert_to_pd_series(self):
        """
        Convierte la lista de valores numéricos en una serie de Pandas.

        :return: Serie de Pandas con valores numéricos o None en caso de error.
        """
        try:
            return pd.Series(self.num_array, dtype=float)
        except Exception as e:
            print(f"🚨 Error al convertir los valores numéricos en una serie de Pandas: {e}")
            return None

    def _create_pd_datetime(self):
        """
        Genera un rango de fechas con la frecuencia especificada.

        :return: Serie de Pandas con fechas o None en caso de error.
        """
        try:
            return pd.date_range(start=self.start_date, end=self.end_date, freq=self.freq)
        except Exception as e:
            print(f"📅 Error al generar la serie de fechas: {e}")
            return None

    def create_time_series(self, col_name):
        """
        Crea una serie de tiempo combinando valores numéricos con su correspondiente rango de fechas.

        :return: DataFrame de Pandas con dos columnas: 'Date' y 'Value', o None en caso de error.
        """
        try:
            dates = self._create_pd_datetime()
            values = self._convert_to_pd_series()

            if dates is None or values is None:
                print("❌ Error: No se pudo generar la serie de tiempo debido a errores previos.")
                return None

            if len(dates) != len(values):
                print("⚠️ Error: La cantidad de valores numéricos no coincide con la cantidad de fechas generadas.")
                return None

            return pd.DataFrame({'Date': dates, col_name: values})

        except Exception as e:
            print(f"⚠️ Error en la creación de la serie de tiempo: {e}")
            return None

    def interpolate_and_fill(self, series):
        """
        Aplica interpolación y relleno de valores faltantes en una serie de Pandas.

        :param series: Serie de Pandas con posibles valores NaN.
        :return: Serie de Pandas con valores interpolados o None en caso de error.
        """
        try:
            series = series.copy()  # Evita modificar el original
            series = series.interpolate(method=self.interp_method)  # Interpolación
            series = series.ffill().bfill()  # Rellena hacia adelante y hacia atrás
            return series

        except Exception as e:
            print(f"⚠️ Error en la interpolación y relleno de valores: {e}")
        
        
        return None


# 🔹 Definimos parámetros para la serie de tiempo
start_date = "2024-01-01"
end_date = "2024-01-10"
freq = "D"  # Diaria

# 🔹 Creamos un array de valores numéricos con algunos NaN simulando datos faltantes
num_values = np.array([10, np.nan, 15, np.nan, 20, 22, np.nan, 30, np.nan, 40])

# 🔹 Instanciamos la clase
preprocessor = TimeSeriesPreprocessor(num_values, start_date, end_date, freq)

# 🔹 Creamos la serie de tiempo
time_series_df = preprocessor.create_time_series(col_name = "hf_y")

time_series_df

Unnamed: 0,Date,hf_y
0,2024-01-01,10.0
1,2024-01-02,
2,2024-01-03,15.0
3,2024-01-04,
4,2024-01-05,20.0
5,2024-01-06,22.0
6,2024-01-07,
7,2024-01-08,30.0
8,2024-01-09,
9,2024-01-10,40.0


In [4]:
class TimeSeriesUnifier:
    """
    Clase para procesar y unir series de tiempo de alta y baja frecuencia.
    Se preparan adecuadamente para su posterior uso en desagregación.
    """

    def __init__(self, ts_high_freq, start_date_hf, end_date_hf, freq_hf,
                 ts_low_freq, start_date_lf, end_date_lf, freq_lf):
        """
        Inicializa la clase con series de alta y baja frecuencia.

        :param ts_high_freq: Serie de tiempo de alta frecuencia (pandas Series o DataFrame).
        :param start_date_hf: Fecha de inicio de la serie de alta frecuencia (str o datetime).
        :param end_date_hf: Fecha de fin de la serie de alta frecuencia (str o datetime).
        :param freq_hf: Frecuencia de la serie de alta frecuencia (str, ej. 'D', 'H').
        :param ts_low_freq: Serie de tiempo de baja frecuencia (pandas Series o DataFrame).
        :param start_date_lf: Fecha de inicio de la serie de baja frecuencia (str o datetime).
        :param end_date_lf: Fecha de fin de la serie de baja frecuencia (str o datetime).
        :param freq_lf: Frecuencia de la serie de baja frecuencia (str, ej. 'M', 'Q').
        """

        # Convertir fechas a datetime
        self.start_date_hf = pd.to_datetime(start_date_hf)
        self.end_date_hf = pd.to_datetime(end_date_hf)
        self.start_date_lf = pd.to_datetime(start_date_lf)
        self.end_date_lf = pd.to_datetime(end_date_lf)

        # Validar rangos de fechas
        if self.start_date_hf > self.end_date_hf:
            raise ValueError("La fecha de inicio de alta frecuencia no puede ser mayor que la de fin.")
        if self.start_date_lf > self.end_date_lf:
            raise ValueError("La fecha de inicio de baja frecuencia no puede ser mayor que la de fin.")

        # Validar tipos de series de tiempo
        if not isinstance(ts_high_freq, (pd.Series, pd.DataFrame)):
            raise TypeError("La serie de alta frecuencia debe ser un pandas Series o DataFrame.")
        if not isinstance(ts_low_freq, (pd.Series, pd.DataFrame)):
            raise TypeError("La serie de baja frecuencia debe ser un pandas Series o DataFrame.")

        # Guardar series de tiempo
        self.ts_high_freq = ts_high_freq
        self.ts_low_freq = ts_low_freq
        self.freq_hf = freq_hf
        self.freq_lf = freq_lf

    def _process_time_series(self):
        """
        Procesa y unifica las series de alta y baja frecuencia.
        """

        # Crear la serie de alta frecuencia
        preprocessor_hf = TimeSeriesPreprocessor(self.ts_high_freq, self.start_date_hf, self.end_date_hf, self.freq_hf)
        time_series_hf_df = preprocessor_hf.create_time_series(col_name = "High_freq_y")

        # Crear la serie de baja frecuencia
        preprocessor_lf = TimeSeriesPreprocessor(self.ts_low_freq, self.start_date_lf, self.end_date_lf, self.freq_lf)
        time_series_lf_df = preprocessor_lf.create_time_series(col_name = "Low_freq_y")

        # Validar que ambas series fueron creadas correctamente
        if time_series_hf_df is None or time_series_lf_df is None:
            raise ValueError("No se pudieron procesar correctamente una o ambas series de tiempo.")

        # Unir las series por la columna de fecha
        time_series_merged = pd.merge(left=time_series_hf_df, right=time_series_lf_df, on="Date", how="left")

        return time_series_merged

    def _infer_start_value(self, date_index: pd.DatetimeIndex, periods_per_year: int) -> int:
        """
        Infers the start_value based on the position of the first date within the available data range.

        Args:
            date_index (pd.DatetimeIndex): List of generated dates.
            periods_per_year (int): Expected number of periods per year.

        Returns:
            int: Inferred start value.
        """
        try:
            if len(date_index) == 0:
                raise ValueError("El índice de fechas está vacío.")

            first_date = date_index[0]  # Primera fecha de la serie
            last_date = date_index[-1]  # Última fecha de la serie

            # Determinar en qué parte del ciclo anual cae
            if periods_per_year == 365:  # Diario
                return first_date.timetuple().tm_yday  # Día del año (1-365)
            elif periods_per_year == 52:  # Semanal
                return first_date.isocalendar()[1]  # Semana del año (1-52)
            elif periods_per_year == 12:  # Mensual
                return first_date.month  # Mes del año (1-12)
            elif periods_per_year == 4:  # Trimestral
                num_trimestres_disponibles = ((last_date.month - first_date.month) // 3) + 1
                return min((first_date.month - 1) // 3 + 1, num_trimestres_disponibles)  # Evita ciclos incorrectos
            elif periods_per_year == 1:  # Anual
                return 1  # Siempre comienza en 1
            else:
                raise ValueError(f"Frecuencia no reconocida: {periods_per_year}")

        except Exception as e:
            print(f"⚠️ Error en _infer_start_value: {e}")
            return 1  # Valor por defecto para evitar fallos
        

    def _generate_indicator(self, num_obs: int, periods_per_year: int, start_value: int) -> np.array:
        """
        Generates a cyclic indicator based on the number of periods per year.

        Args:
            num_obs (int): Total number of observations.
            periods_per_year (int): Number of periods in a year.
            start_value (int): Initial value of the indicator.

        Returns:
            np.array: Vector with the cyclic indicator.
        """
        try:
            if num_obs <= 0:
                raise ValueError("El número de observaciones debe ser mayor que cero.")
            if periods_per_year <= 0:
                raise ValueError("El número de periodos por año debe ser mayor que cero.")

            # Ajustar start_value para estar dentro del rango disponible
            start_value = (start_value - 1) % periods_per_year + 1

            # Generar la secuencia cíclica dentro del rango observado
            indicator = (np.arange(num_obs) + start_value - 1) % min(num_obs, periods_per_year) + 1
            return indicator

        except Exception as e:
            print(f"⚠️ Error en _generate_indicator: {e}")
            return np.array([])
        
    def _infer_frequency(self, freq: str) -> int:
        """
        Infers the annual frequency based on the given frequency string.

        Args:
            freq (str): Frequency of the time series (e.g., 'D', 'W', 'M', 'Q', 'A').

        Returns:
            int: Estimated annual frequency (365 for daily, 52 for weekly, 12 for monthly, etc.).
        """
        # Convertimos la frecuencia a mayúsculas para evitar errores por diferencia de capitalización
        freq = freq.upper()

        # Verificamos si contiene la letra clave para cada periodo
        if "D" in freq:
            return 365  # Diario
        elif "W" in freq:
            return 52  # Semanal
        elif "M" in freq:
            return 12  # Mensual
        elif "Q" in freq:
            return 4   # Trimestral
        elif "A" in freq or "Y" in freq:  # "A" para anual, "Y" si usa "Yearly"
            return 1   # Anual
        else:
            return 1   # Valor por defecto: Anual si no se reconoce

        
    def generate_ts_combined(self):
    
        time_series_merged = self._process_time_series()
        time_series_merged = time_series_merged.set_index("Date")

        freq = self._infer_frequency(self.freq_hf)
        start_value = self._infer_start_value(time_series_merged.index, freq)

        # Generar el indicador cíclico
        time_series_merged["Indicator"] = self._generate_indicator(len(time_series_merged), freq, start_value)

        # Interpolación de valores faltantes (sin inplace)
        time_series_merged["Low_freq_y"] = time_series_merged["Low_freq_y"].interpolate(method="linear")

        # Rellenar valores NaN restantes (sin inplace)
        time_series_merged["Low_freq_y"] = time_series_merged["Low_freq_y"].ffill()
        time_series_merged["Low_freq_y"] = time_series_merged["Low_freq_y"].bfill()

        return time_series_merged


# 📆 Corregir la cantidad de valores para que coincidan con las fechas

start_date_hf = "2001-01-01"
end_date_hf = "2024-01-01"

start_date_lf = "2001-01-01"
end_date_lf = "2024-01-01"

freq_hf = "ME"
freq_lf = "YE"

# Generar correctamente los valores de alta frecuencia (mensual)
num_months = pd.date_range(start_date_hf, end_date_hf, freq=freq_hf).shape[0]
num_values_hf = np.linspace(100, 350, num_months)  # Generar valores progresivos

# Generar correctamente los valores de baja frecuencia (anual)
num_years = pd.date_range(start_date_lf, end_date_lf, freq=freq_lf).shape[0]
num_values_lf = np.linspace(150, 500, num_years)  # Valores anuales

# 📊 Convertir a pandas Series
ts_high_freq = pd.Series(num_values_hf)
ts_low_freq = pd.Series(num_values_lf)

# 🔹 Crear instancia de TimeSeriesUnifier con las series corregidas
unifier = TimeSeriesUnifier(ts_high_freq, start_date_hf, end_date_hf, freq_hf,
                            ts_low_freq, start_date_lf, end_date_lf, freq_lf)

# Procesar la serie unificada
time_series_merged = unifier.generate_ts_combined()

In [5]:
time_series_merged 

Unnamed: 0_level_0,High_freq_y,Low_freq_y,Indicator
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2001-01-31,100.000000,150.000000,1
2001-02-28,100.909091,150.000000,2
2001-03-31,101.818182,150.000000,3
2001-04-30,102.727273,150.000000,4
2001-05-31,103.636364,150.000000,5
...,...,...,...
2023-08-31,346.363636,494.696970,8
2023-09-30,347.272727,496.022727,9
2023-10-31,348.181818,497.348485,10
2023-11-30,349.090909,498.674242,11


In [165]:
import pandas as pd
import numpy as np


class TimeSeriesProcessor:
    """
    Class to process, interpolate, and merge time series of different frequencies.
    """

    def __init__(self, ts_high_freq, start_date_hf, end_date_hf, freq_hf,
                 ts_low_freq, start_date_lf, end_date_lf, freq_lf, interp_method='nearest'):
        """
        Initializes the class with high- and low-frequency time series.

        :param ts_high_freq: High-frequency time series (array or list).
        :param start_date_hf: Start date of the high-frequency series (str or datetime).
        :param end_date_hf: End date of the high-frequency series (str or datetime).
        :param freq_hf: Frequency of the high-frequency series (e.g., 'D' for daily, 'H' for hourly).
        :param ts_low_freq: Low-frequency time series (array or list).
        :param start_date_lf: Start date of the low-frequency series (str or datetime).
        :param end_date_lf: End date of the low-frequency series (str or datetime).
        :param freq_lf: Frequency of the low-frequency series (e.g., 'M' for monthly, 'Q' for quarterly).
        :param interp_method: Interpolation method for missing values (default is 'nearest').
        """
        self.freq_hf = freq_hf  # Stores the frequency of the high-frequency series
        self.freq_lf = freq_lf  # Stores the frequency of the low-frequency series
        self.interp_method = interp_method  # Defines the interpolation method

        # Create DataFrames for time series using a helper method
        self.ts_high_freq = self._create_time_series(ts_high_freq, start_date_hf, end_date_hf, freq_hf, "High_freq")
        self.ts_low_freq = self._create_time_series(ts_low_freq, start_date_lf, end_date_lf, freq_lf, "Low_freq")

        # Ensure both time series were successfully created
        if self.ts_high_freq is None or self.ts_low_freq is None:
            raise ValueError("Error in creating time series.")

    def _create_time_series(self, values, start_date, end_date, freq, col_name):
        """
        Creates a pandas DataFrame with a time series.

        :param values: List of numerical values.
        :param start_date: Start date of the time series.
        :param end_date: End date of the time series.
        :param freq: Frequency of the time series.
        :param col_name: Name of the values column.
        :return: Pandas DataFrame with 'Date' and the time series.
        """
        try:
            # Generate date range based on start, end, and frequency
            dates = pd.date_range(start=start_date, end=end_date, freq=freq)

            # Ensure the number of generated dates matches the number of values
            if len(dates) != len(values):
                raise ValueError(f"Error: Number of values does not match generated dates for {col_name}.")

            # Create DataFrame with dates and values
            df = pd.DataFrame({'Date': dates, col_name: values})
            return df
        except Exception as e:
            print(f"Error creating time series {col_name}: {e}")
            return None

    def _interpolate_and_fill(self, series):
        """
        Applies interpolation and fills missing values.

        :param series: Pandas Series with possible NaN values.
        :return: Interpolated Series without NaN values.
        """
        try:
            # Apply interpolation using the defined method and fill missing values forward and backward
            series = series.interpolate(method=self.interp_method).ffill().bfill()
            return series
        except Exception as e:
            print(f"Error in interpolation and filling: {e}")
            return None

    def _infer_frequency(self, freq):
        """
        Infers the number of periods per year based on the series frequency.

        :param freq: Frequency of the time series (e.g., 'D', 'W', 'M', 'Q', 'A').
        :return: Number of periods per year.
        """
        # Mapping different time frequencies to their corresponding periods per year
        freq_map = {"D": 365, "W": 52, "M": 12, "Q": 4, "A": 1}
        return freq_map.get(freq[0].upper(), 1)  # Default to 1 if not found

    def _infer_start_value(self, date_index, periods_per_year):
        """
        Infers the initial value of the cyclic indicator based on the date.

        :param date_index: Date index of the time series.
        :param periods_per_year: Number of periods per year.
        :return: Initial value of the cyclic indicator.
        """
        try:
            first_date = date_index[0]  # Retrieve the first date of the series

            # Assign the starting value based on the frequency
            if periods_per_year == 365:
                return first_date.timetuple().tm_yday  # Day of the year
            elif periods_per_year == 52:
                return first_date.isocalendar()[1]  # ISO week number
            elif periods_per_year == 12:
                return first_date.month  # Month number
            elif periods_per_year == 4:
                return (first_date.month - 1) // 3 + 1  # Quarter of the year
            else:
                return 1  # Default value if none match
        except Exception as e:
            print(f"Error in _infer_start_value: {e}")
            return 1  # Return a default value in case of failure

    def _generate_indicator(self, num_obs, periods_per_year, start_value):
        """
        Generates a cyclic indicator based on the number of periods per year.

        :param num_obs: Total number of observations.
        :param periods_per_year: Expected periods per year.
        :param start_value: Initial value of the indicator.
        :return: Vector with the cyclic indicator.
        """
        try:
            # Adjust the start value to ensure it fits within the cycle
            start_value = (start_value - 1) % periods_per_year + 1

            # Create a cyclic pattern by repeating values in the expected period
            return (np.arange(num_obs) + start_value - 1) % periods_per_year + 1
        except Exception as e:
            print(f"Error in _generate_indicator: {e}")
            return np.array([])  # Return an empty array in case of failure

    def process_and_merge_series(self):
        """
        Processes and integrates high- and low-frequency time series.

        :return: Pandas DataFrame with the combined series.
        """
        # Merge high- and low-frequency series using a left join on the date
        merged_df = pd.merge(self.ts_high_freq, self.ts_low_freq, on="Date", how="left")

        # Interpolate and fill missing values in the low-frequency series
        merged_df["Low_freq"] = self._interpolate_and_fill(merged_df["Low_freq"])

        # Infer the number of periods per year based on high-frequency data
        periods_per_year = self._infer_frequency(self.freq_hf)

        # Determine the starting value of the cyclic indicator
        start_value = self._infer_start_value(merged_df["Date"], periods_per_year)

        # Generate the cyclic indicator based on inferred parameters
        merged_df["Indicator"] = self._generate_indicator(len(merged_df), periods_per_year, start_value)

        # Extract year from the date column and rename it as 'index'
        merged_df["Year"] = pd.to_numeric(merged_df["Date"].dt.year).astype(int)

        # Rename columns to match expected output format
        merged_df = merged_df.rename(columns={"Year": "Index",
                                              "High_freq": "X",
                                              "Low_freq": "y",
                                              "Indicator": "Grain"})

        # Return only the required columns
        return merged_df[['Index', 'Grain', 'X', 'y']]


# 📆 Definir fechas y frecuencias
start_date_hf = "2001-01-01"
end_date_hf = "2024-01-01"
start_date_lf = "2001-01-01"
end_date_lf = "2024-01-01"
freq_hf = "ME"  # Mensual
freq_lf = "YE"  # Anual

# Generar valores de alta y baja frecuencia
num_months = pd.date_range(start_date_hf, end_date_hf, freq=freq_hf).shape[0]
num_years = pd.date_range(start_date_lf, end_date_lf, freq=freq_lf).shape[0]
values_hf = np.linspace(100, 350, num_months)
values_lf = np.linspace(150, 500, num_years)

# Instanciar y procesar la serie unificada
processor = TimeSeriesProcessor(values_hf, start_date_hf, end_date_hf, freq_hf,
                                values_lf, start_date_lf, end_date_lf, freq_lf)
merged_series = processor.process_and_merge_series()

In [166]:
merged_series

Unnamed: 0,Index,Grain,X,y
0,2001,1,100.000000,150.0
1,2001,2,100.909091,150.0
2,2001,3,101.818182,150.0
3,2001,4,102.727273,150.0
4,2001,5,103.636364,150.0
...,...,...,...,...
271,2023,8,346.363636,500.0
272,2023,9,347.272727,500.0
273,2023,10,348.181818,500.0
274,2023,11,349.090909,500.0


In [None]:
class TemporalDisaggregation:
    """
    Class for temporal disaggregation of time series data using various statistical methods.
    """

    def __init__(self, conversion="sum", min_rho_boundarie=-0.9, max_rho_boundarie=0.99, apply_adjustment = True):
        """
        Initializes the TemporalDisaggregation class with parameters for disaggregation.

        Parameters:
            conversion (str): Specifies the type of aggregation method to ensure consistency during disaggregation.
                             Options include:
                                - "sum": Ensures that the sum of the disaggregated values matches the low-frequency series.
                                - "average": Ensures the average value remains consistent.
                                - "first": Preserves the first observed value in each aggregated period.
                                - "last": Maintains the last observed value in each aggregated period.

            min_rho_boundarie (float): The minimum allowed value for the autoregressive parameter (rho).
                                       This is used to constrain the estimation process to avoid instability.

            max_rho_boundarie (float): The maximum allowed value for the autoregressive parameter (rho).
                                       It prevents the estimation from diverging or producing unreliable results.
            
            apply_adjustment (bool): The bool value that reflects whether the series must be corrected or not.
                                       Negative values must be transformed.

        Attributes:
            self.conversion (str): Stores the specified conversion method for future computations.
            self.min_rho_boundarie (float): Lower bound for rho to ensure a stable disaggregation process.
            self.max_rho_boundarie (float): Upper bound for rho to prevent extreme values.
            self.apply_adjustment (bool): Boolean for negative values adjustment
        """

        # Store the chosen conversion method for disaggregation
        self.conversion = conversion  

        # Set the lower boundary for the autoregressive parameter (rho)
        self.min_rho_boundarie = min_rho_boundarie  

        # Set the upper boundary for the autoregressive parameter (rho)
        self.max_rho_boundarie = max_rho_boundarie 

        # Set the negative values adjustment
        self.apply_adjustment = apply_adjustment


    def build_conversion_matrix(self, df):
        """
        Constructs a conversion matrix to map high-frequency data to low-frequency data.

        This matrix ensures that the disaggregated series maintains consistency with the 
        specified aggregation method.

        Parameters:
            df (pd.DataFrame): A DataFrame containing time series data with "Index" and "Grain" columns.

        Returns:
            np.ndarray: Conversion matrix for temporal disaggregation.
        """

        def get_conversion_vector(size, conversion):
            """
            Generates a conversion vector based on the specified aggregation method.

            Parameters:
                size (int): The number of high-frequency observations corresponding to a single low-frequency period.
                conversion (str): The method of aggregation ('sum', 'average', 'first', 'last').

            Returns:
                np.ndarray: A vector that defines how high-frequency data should be aggregated.
            """
            if conversion == "sum":
                return np.ones(size)  # Assigns equal weight to all values to preserve the sum.
            elif conversion == "average":
                return np.ones(size) / size  # Distributes weights equally to maintain the average.
            elif conversion == "first":
                vec = np.zeros(size)
                vec[0] = 1  # Assigns weight only to the first observation.
                return vec
            elif conversion == "last":
                vec = np.zeros(size)
                vec[-1] = 1  # Assigns weight only to the last observation.
                return vec
            raise ValueError("Invalid method in conversion.")  # Ensures an error is raised for unsupported methods.

        # Extract unique (index, grain) combinations, ensuring order consistency
        unique_combinations = df[["Index", "Grain"]].drop_duplicates().sort_values(["Index", "Grain"])

        # Get unique low-frequency index values
        unique_indexes = unique_combinations["Index"].unique()
        n_l = len(unique_indexes)  # Number of unique low-frequency periods

        # Initialize an empty conversion matrix with dimensions (low-frequency periods x total observations)
        C = np.zeros((n_l, len(df)))

        # Populate the conversion matrix by iterating through unique low-frequency indices
        for i, idx in enumerate(unique_indexes):
            mask = (df["Index"] == idx).values  # Boolean mask for high-frequency observations corresponding to idx
            num_valid = np.sum(mask)  # Count the number of high-frequency observations in the current period

            # Assign appropriate conversion weights using the selected aggregation method
            C[i, mask] = get_conversion_vector(num_valid, self.conversion)

        return C  # Returns the constructed conversion matrix

    def denton_estimation(self, y_l, X, C, h=1):
        """
        Performs Denton temporal disaggregation.

        This method minimizes distortions by preserving the movement of the 
        high-frequency indicator while ensuring consistency with the low-frequency data.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            h (int, optional): Degree of differencing (0 for levels, 1 for first differences, etc.).

        Returns:
            np.ndarray: The estimated high-frequency series.
        """

        try:
            n = len(X)  # Number of high-frequency observations

            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Construct the differencing matrix (D) to compute differences in time series
            D = np.eye(n) - np.diag(np.ones(n - 1), -1)  # First-order difference matrix (D)

            # Apply differencing according to the specified degree (h)
            # If h = 0, no differencing is applied (identity matrix is used)
            D_h = np.linalg.matrix_power(D, h) if h > 0 else np.eye(n)

            # Compute the inverse covariance matrix (Σ_D) using the pseudoinverse
            # This helps in controlling the smoothness of the high-frequency series
            Sigma_D = pinv(D_h.T @ D_h)

            # Compute the Denton adjustment matrix (D_matrix) 
            # This maps residual adjustments from low-frequency to high-frequency
            D_matrix = Sigma_D @ C.T @ pinv(C @ Sigma_D @ C.T)

            # Compute residuals (discrepancies between actual low-frequency values and aggregated high-frequency data)
            u_l = y_l - C @ X

            # Adjust the high-frequency series using the computed transformation matrix
            return X + D_matrix @ u_l

        except:
            print(f"Error in Denton estimation")
            return None  # Return None in case of an error


    def chow_lin_estimation(self, y_l, X, C, rho=0.5):
        """
        Performs Chow-Lin temporal disaggregation.

        This method estimates high-frequency values based on a regression approach
        with an autoregressive process.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            rho (float, optional): Autoregressive parameter for residuals.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            n = len(X)  # Number of high-frequency observations

            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Ensure the autoregressive parameter rho is within the allowed boundaries
            rho = np.clip(rho, self.min_rho_boundarie, self.max_rho_boundarie)

            # Construct the covariance matrix (Σ_CL) for the autoregressive process
            # This models the dependency structure of the high-frequency residuals
            Sigma_CL = (1 / (1 - rho**2)) * toeplitz((rho ** np.arange(n)).ravel())

            # Compute the variance-covariance matrix of the aggregated series
            Q = C @ Sigma_CL @ C.T  # Q = C * Σ_CL * C'

            # Compute the pseudo-inverse of Q to handle potential singularity issues
            inv_Q = pinv(Q)

            # Estimate the regression coefficients (β) using Generalized Least Squares (GLS)
            # β = (X' C' Q^-1 C X)^-1 * (X' C' Q^-1 y_l)
            beta = solve(X.T @ C.T @ inv_Q @ C @ X, X.T @ C.T @ inv_Q @ y_l).reshape(-1, 1)

            # Reshape X to ensure matrix compatibility
            X = X.reshape(-1, 1)

            # Compute the preliminary high-frequency estimate (p = X * β)
            p = X @ beta

            # Compute the Denton-like distribution matrix (D) that adjusts for residuals
            # D = Σ_CL * C' * Q^-1
            D = Sigma_CL @ C.T @ inv_Q

            # Compute the low-frequency residuals (u_l = y_l - C * p)
            u_l = y_l.reshape(-1, 1) - C @ p

            # Final high-frequency estimate by adjusting the preliminary estimate with residuals
            return p + D @ u_l

        except:
            print(f"Error in Chow Lin estimation")
            return None  # Return None in case of an error


    def litterman_estimation(self, y_l, X, C, rho=0.5):
        """
        Implements the Litterman method for temporal disaggregation.

        This approach extends the Chow-Lin method by incorporating a random-walk structure 
        in the residuals, allowing for better handling of non-stationary series.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            rho (float, optional): Autoregressive parameter.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            n = len(X)  # Number of high-frequency observations

            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Ensure the autoregressive parameter rho is within the allowed range
            rho = np.clip(rho, self.min_rho_boundarie, self.max_rho_boundarie)

            # Construct the Litterman transformation matrix (H)
            # This incorporates the random-walk structure into the residuals
            H = np.eye(n) - np.diag(np.ones(n - 1), -1) * rho

            # Compute the inverse covariance matrix (Σ_L), ensuring numerical stability using pinv()
            # Σ_L = (H' H)^-1
            Sigma_L = pinv(H.T @ H)

            # Compute the variance-covariance matrix for the aggregated series
            Q = C @ Sigma_L @ C.T  # Q = C * Σ_L * C'

            # Compute the pseudo-inverse of Q to handle singular matrices
            inv_Q = pinv(Q)

            # Estimate the regression coefficients (β) using Generalized Least Squares (GLS)
            # β = (X' C' Q^-1 C X)^-1 * (X' C' Q^-1 y_l)
            beta = solve(X.T @ C.T @ inv_Q @ C @ X, X.T @ C.T @ inv_Q @ y_l).reshape(-1, 1)

            # Reshape X to ensure matrix compatibility
            X = X.reshape(-1, 1)

            # Compute the preliminary high-frequency estimate (p = X * β)
            p = X @ beta

            # Compute the adjustment matrix (D) to refine the high-frequency series
            # D = Σ_L * C' * Q^-1
            D = Sigma_L @ C.T @ inv_Q

            # Compute the low-frequency residuals (u_l = y_l - C * p)
            u_l = y_l.reshape(-1, 1) - C @ p

            # Final high-frequency estimate by adjusting the preliminary estimate with residuals
            return p + D @ u_l

        except:
            print(f"Error in Litterman estimation")
            return None  # Return None in case of an error


    def fernandez_estimation(self, y_l, X, C):
        """
        Uses the Fernandez method for temporal disaggregation.

        This method is a special case of the Litterman approach where 
        the autoregressive parameter is set to zero, modeling residuals 
        as a simple random walk.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            n = len(X)  # Number of high-frequency observations

            # Construct the first-difference operator matrix (Δ)
            # Δ transforms the series into first differences, enforcing smoothness
            Delta = np.eye(n) - np.diag(np.ones(n - 1), -1)

            # Compute the covariance matrix (Σ_F) for a random walk process
            # Σ_F = (Δ' Δ)^-1 ensures residuals follow a simple random walk
            Sigma_F = np.linalg.inv(Delta.T @ Delta)

            # Compute the variance-covariance matrix for the aggregated series
            Q = C @ Sigma_F @ C.T  # Q = C * Σ_F * C'

            # Compute the inverse of Q to ensure numerical stability
            inv_Q = np.linalg.inv(Q)

            # Estimate the regression coefficients (β) using Generalized Least Squares (GLS)
            # β = (X' C' Q^-1 C X)^-1 * (X' C' Q^-1 y_l)
            beta = solve(X.T @ C.T @ inv_Q @ C @ X, X.T @ C.T @ inv_Q @ y_l).reshape(-1, 1)

            # Compute the preliminary high-frequency estimate (p = X * β)
            p = X.reshape(-1, 1) @ beta

            # Compute the adjustment matrix (D) to refine the high-frequency series
            # D = Σ_F * C' * Q^-1
            D = Sigma_F @ C.T @ inv_Q

            # Compute the low-frequency residuals (u_l = y_l - C * p)
            u_l = y_l.reshape(-1, 1) - C @ p

            # Final high-frequency estimate by adjusting the preliminary estimate with residuals
            return (p + D @ u_l).flatten()

        except:
            print(f"Error in Fernandez estimation")
            return None  # Return None in case of an error


    def ols_estimation(self, y_l, X, C):
        """
        Applies Ordinary Least Squares (OLS) regression for temporal disaggregation.

        This method assumes a simple linear relationship between the low-frequency 
        data and the high-frequency indicators without considering autocorrelation.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            # Ensure y_l and X are treated as 2D column vectors
            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Aggregate the high-frequency indicator using the conversion matrix C
            X_l = np.atleast_2d(C @ X)  # X_l represents the aggregated high-frequency data

            # Compute the OLS regression coefficients (β) using the pseudo-inverse
            # β = (X_l' X_l)^-1 * X_l' y_l
            beta = pinv(X_l.T @ X_l) @ X_l.T @ y_l

            # Estimate the high-frequency values (ŷ = X * β)
            y_hat = X @ beta

            # Flatten the output to return a 1D array
            return y_hat.flatten()

        except:
            print(f"Error in OLS estimation")
            return None  # Return None in case of an error
       
        
    def fast_estimation(self, y_l, X, C):
        """
        Provides a fast approximation of Chow-Lin estimation.

        This method uses a fixed high autoregressive parameter and is computationally 
        efficient, closely replicating Denton-Cholette smoothing.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            rho = 0.9  # Fixed high autoregressive parameter for efficient computation
            n = len(X)  # Number of high-frequency observations

            # Ensure y_l and X are treated as column vectors
            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Construct the transformation matrix (H)
            # This matrix incorporates a high degree of autoregression (ρ = 0.9)
            H = np.eye(n) - np.diag(np.ones(n - 1), -1) * rho

            # Compute the covariance matrix (Σ_F) for the process
            # Σ_F = (H' H)^-1 ensures smooth transition in the estimated series
            Sigma_F = pinv(H.T @ H)

            # Compute the variance-covariance matrix for the aggregated series
            Q = C @ Sigma_F @ C.T  # Q = C * Σ_F * C'

            # Compute the pseudo-inverse of Q to ensure numerical stability
            inv_Q = pinv(Q)

            # Estimate the regression coefficients (β) using Generalized Least Squares (GLS)
            # β = (X' C' Q^-1 C X)^-1 * (X' C' Q^-1 y_l)
            beta = solve(X.T @ C.T @ inv_Q @ C @ X, X.T @ C.T @ inv_Q @ y_l)

            # Compute the preliminary high-frequency estimate (p = X * β)
            p = X @ beta

            # Compute the adjustment matrix (D) to refine the high-frequency series
            # D = Σ_F * C' * Q^-1
            D = Sigma_F @ C.T @ inv_Q

            # Compute the low-frequency residuals (u_l = y_l - C * p)
            u_l = y_l - C @ p

            # Final high-frequency estimate by adjusting the preliminary estimate with residuals
            y_hat = p + D @ u_l

            # Flatten the output to return a 1D array
            return y_hat.flatten()

        except:
            print(f"Error in Fast estimation")
            return None  # Return None in case of an error


    def power_matrix_calculation(self, n):
        """
        Computes a power matrix used in autoregressive modeling.

        This matrix captures dependencies between different time periods 
        to model the persistence of the series.

        Parameters:
            n (int): Number of time periods.

        Returns:
            np.ndarray: Power matrix for autoregressive modeling.
        """
        # Generate a matrix where each entry (i, j) represents |i - j|
        # This encodes the absolute distance between time periods
        # and is useful in modeling autoregressive dependencies.
        return np.abs(np.subtract.outer(np.arange(n), np.arange(n)))


    def q_calculation(self, rho, pm):
        """
        Computes the covariance matrix for an autoregressive process.

        This matrix is used in regression-based disaggregation methods 
        to model the correlation between observations.

        Parameters:
            rho (float): Autoregressive parameter.
            pm (np.ndarray): Power matrix representing time dependencies.

        Returns:
            np.ndarray: Covariance matrix for the autoregressive process.
        """
        epsilon = 1e-6  # Small constant to prevent division by zero or instability

        # Ensure rho is within the valid range to maintain numerical stability
        rho = np.clip(rho, self.min_rho_boundarie, self.max_rho_boundarie)

        # Compute the scaling factor for the covariance matrix
        # This ensures proper normalization to prevent over-scaling of the model
        factor = 1 / (1 - rho**2 + epsilon)

        # Compute the covariance matrix using the power matrix (pm)
        # Each entry represents rho raised to the power of the absolute difference in time steps
        Q = factor * (rho ** pm)

        return Q

        
    def q_lit_calculation(self, X, rho=0):
        """
        Computes the pseudo-variance-covariance matrix for the Litterman method.

        This matrix incorporates an autoregressive structure if specified.

        Parameters:
            X (np.ndarray): High-frequency indicator series.
            rho (float, optional): Autoregressive parameter. Defaults to 0 (no autoregression).

        Returns:
            np.ndarray: Pseudo-variance-covariance matrix.
        """
        n = X.shape[0]  # Number of high-frequency observations
        epsilon = 1e-8  # Small constant to improve numerical stability

        # Construct the transformation matrix (H) incorporating the autoregressive parameter (ρ)
        # H introduces the autoregressive component in the variance structure
        H = np.eye(n) - np.diag(np.ones(n - 1), -1) * rho

        # Construct the first-difference operator matrix (D)
        # D applies a first-difference transformation, ensuring smoothness in estimates
        D = np.eye(n) - np.diag(np.ones(n - 1), -1)

        # Compute the pseudo-variance-covariance matrix (Q_Lit)
        # This matrix captures dependencies in the autoregressive process
        Q_Lit = D.T @ H.T @ H @ D

        try:
            # Compute the inverse of Q_Lit with regularization (adds ε * I to avoid singularity)
            Q_Lit_inv = np.linalg.inv(Q_Lit + np.eye(n) * epsilon)
        except np.linalg.LinAlgError:
            # If the matrix is singular, use the Moore-Penrose pseudo-inverse
            Q_Lit_inv = np.linalg.pinv(Q_Lit)

        return Q_Lit_inv


    def rho_optimization(self, y_l, X, C, method="maxlog"):
        """
        Finds the optimal autoregressive parameter (rho).

        This is done by maximizing the likelihood function or minimizing 
        the residual sum of squares, which is crucial for Chow-Lin and Litterman methods.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            method (str, optional): Optimization criterion. Options: "maxlog" (maximize log-likelihood) 
                                    or "minrss" (minimize residual sum of squares).

        Returns:
            float: Optimal autoregressive parameter rho.
        """
        # Compute the aggregated high-frequency series using the conversion matrix
        X_l = np.atleast_2d(C @ X)

        # Compute the power matrix, which captures time dependencies
        pm = self.power_matrix_calculation(X.shape[0])

        # Ensure input dimensions are properly formatted before optimization
        y_l, X, C = self.preprocess_inputs(y_l, X, C)

        def objective(rho):
            """
            Defines the optimization objective function for rho.

            The function either maximizes the log-likelihood (maxlog) 
            or minimizes the residual sum of squares (minrss).
            """
            # Ensure rho is within the specified boundaries
            if not (self.min_rho_boundarie < rho < self.max_rho_boundarie):
                return np.inf  # Return infinity to prevent invalid rho values

            # Compute the covariance matrix Q for the autoregressive process
            Q = self.q_calculation(rho, pm)

            # Compute the variance-covariance matrix for the aggregated series
            vcov = C @ Q @ C.T

            # Compute the inverse of vcov, adding a small regularization term for stability
            inv_vcov = pinv(vcov + np.eye(vcov.shape[0]) * 1e-8)

            # Ensure that the dimensions match for matrix operations
            if X_l.shape[0] != inv_vcov.shape[0]:
                return np.inf  # Return infinity to prevent errors

            # Compute (X' C' inv_vcov C X), ensuring it's a square matrix
            XTX = X_l.T @ inv_vcov @ X_l
            if XTX.shape[0] != XTX.shape[1]:
                return np.inf  # Return infinity if matrix inversion is not feasible

            # Estimate regression coefficients (β) using the Generalized Least Squares (GLS) formula
            beta = pinv(XTX) @ X_l.T @ inv_vcov @ y_l

            # Compute residuals (difference between observed and estimated values)
            u_l = y_l - X_l @ beta

            # Choose the optimization method: log-likelihood maximization or residual minimization
            if method == "maxlog":
                # Maximize log-likelihood: minimize its negative
                return -(-0.5 * (np.log(np.abs(np.linalg.det(vcov)) + 1e-8) + u_l.T @ inv_vcov @ u_l))
            elif method == "minrss":
                # Minimize the residual sum of squares
                return u_l.T @ inv_vcov @ u_l
            else:
                raise ValueError("Invalid method for rho calculation")

        # Perform bounded scalar optimization within the defined range of rho
        opt_result = minimize_scalar(objective, bounds=(self.min_rho_boundarie, self.max_rho_boundarie), method="bounded")

        # Return the optimal rho value
        return opt_result.x

    
    def litterman_opt_estimation(self, y_l, X, C):
        """
        Implements the optimized Litterman method.

        This method estimates the best autoregressive parameter before performing 
        disaggregation, refining the standard Litterman approach for better accuracy.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Validate that the number of columns in C matches the number of rows in X
            if C.shape[1] != X.shape[0]:
                return None  # Return None if dimensions are incompatible

            # Compute the aggregated high-frequency series using the conversion matrix
            X_l = np.atleast_2d(C @ X)

            # Optimize the autoregressive parameter (rho) by minimizing residual sum of squares (minrss)
            rho_opt = self.rho_optimization(y_l, X, C, method="minrss")

            # Compute the variance-covariance matrix (Q_Lit) for the optimized Litterman method
            Q_Lit = self.q_lit_calculation(X, rho_opt)

            # Compute the variance-covariance matrix for the aggregated series
            vcov = C @ Q_Lit @ C.T

            # Compute the pseudo-inverse of vcov, adding a small regularization term for numerical stability
            inv_vcov = pinv(vcov + np.eye(vcov.shape[0]) * 1e-8)

            # Validate dimensions to prevent computational errors
            if X_l.shape[0] != inv_vcov.shape[0]:
                return None  # Return None if dimensions mismatch

            # Compute (X' C' inv_vcov C X), ensuring it is a square matrix before inversion
            XTX = X_l.T @ inv_vcov @ X_l
            if XTX.shape[0] != XTX.shape[1]:
                return None  # Return None if matrix is not square

            # Estimate regression coefficients (β) using Generalized Least Squares (GLS)
            # β = (X' C' Q^-1 C X)^-1 * (X' C' Q^-1 y_l)
            beta = pinv(XTX) @ X_l.T @ inv_vcov @ y_l

            # Compute the preliminary high-frequency estimate (p = X * β)
            p = X @ beta

            # Compute the adjustment matrix (D) to refine the high-frequency series
            # D = Q_Lit * C' * Q^-1
            D = Q_Lit @ C.T @ inv_vcov

            # Compute the low-frequency residuals (u_l = y_l - C * p)
            u_l = y_l - C @ p

            # Final high-frequency estimate by adjusting the preliminary estimate with residuals
            y_hat = p + D @ u_l

            # Flatten the output to return a 1D array
            return y_hat.flatten()
        except:
            print("Error in optimized Litterman")


    def chow_lin_opt_estimation(self, y_l, X, C):
        """
        Implements the optimized Chow-Lin method.

        This method estimates the best autoregressive parameter before performing 
        disaggregation, improving accuracy by tuning the autoregressive component.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        try:
            # Preprocess inputs to ensure proper formatting and dimensions
            y_l, X, C = self.preprocess_inputs(y_l, X, C)

            # Validate that the number of columns in C matches the number of rows in X
            if C.shape[1] != X.shape[0]:
                return None  # Return None if dimensions are incompatible

            # Compute the aggregated high-frequency series using the conversion matrix
            X_l = np.atleast_2d(C @ X)

            # Optimize the autoregressive parameter (rho) by maximizing the log-likelihood (maxlog)
            rho_opt = self.rho_optimization(y_l, X, C, method="maxlog")

            # Number of high-frequency observations
            n = X.shape[0]

            # Construct the autoregressive covariance matrix (Σ_CL)
            # This models the dependency structure of the high-frequency residuals
            Sigma_CL = (1 / (1 - rho_opt**2)) * toeplitz(np.ravel(rho_opt ** np.arange(n)))

            # Compute the variance-covariance matrix for the aggregated series
            Q = C @ Sigma_CL @ C.T

            # Ensure Q is a square matrix before inversion
            if Q.shape[0] != Q.shape[1]:
                return None  # Return None if dimensions are incorrect

            # Compute the pseudo-inverse of Q, adding a small regularization term for numerical stability
            inv_Q = pinv(Q + np.eye(Q.shape[0]) * 1e-8)

            # Validate dimensions to prevent computational errors
            if X_l.shape[0] != inv_Q.shape[0]:
                return None  # Return None if dimensions mismatch

            # Compute (X' C' inv_Q C X), ensuring it is a square matrix before inversion
            XTX = X_l.T @ inv_Q @ X_l
            if XTX.shape[0] != XTX.shape[1]:
                return None  # Return None if matrix is not square

            # Estimate regression coefficients (β) using Generalized Least Squares (GLS)
            # β = (X' C' Q^-1 C X)^-1 * (X' C' Q^-1 y_l)
            beta = pinv(XTX) @ X_l.T @ inv_Q @ y_l

            # Compute the preliminary high-frequency estimate (p = X * β)
            p = X @ beta

            # Compute the adjustment matrix (D) to refine the high-frequency series
            # D = Σ_CL * C' * Q^-1
            D = Sigma_CL @ C.T @ inv_Q

            # Compute the low-frequency residuals (u_l = y_l - C * p)
            u_l = y_l - C @ p

            # Final high-frequency estimate by adjusting the preliminary estimate with residuals
            y_hat = p + D @ u_l

            # Flatten the output to return a 1D array
            return y_hat.flatten()
        except:
            print("Error in optimized Litterman")

    
    def preprocess_inputs(self, y_l, X, C):
        """
        Preprocesses inputs for temporal disaggregation methods.

        This function ensures the correct shape and format of the input data.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.

        Returns:
            tuple: Processed (y_l, X, C) as numpy arrays with the correct dimensions.
        """
        # Ensure y_l and X are treated as 2D column vectors
        y_l = np.atleast_2d(y_l).reshape(-1, 1)
        X = np.atleast_2d(X).reshape(-1, 1)

        # Validate that the number of rows in C matches the number of observations in y_l
        if C.shape[0] != y_l.shape[0]:
            raise ValueError(f"Shape mismatch: C.shape[0] ({C.shape[0]}) != y_l.shape[0] ({y_l.shape[0]})")

        # Validate that the number of columns in C matches the number of rows in X
        if C.shape[1] != X.shape[0]:
            raise ValueError(f"Shape mismatch: C.shape[1] ({C.shape[1]}) != X.shape[0] ({X.shape[0]})")

        # Return the processed inputs with corrected dimensions
        return y_l, X, C
    

    def chow_lin_minrss_ecotrim(self, y_l, X, C, rho=0.75):
        """
        Implements the Chow-Lin method with RSS minimization (Ecotrim).

        This method estimates high-frequency values by minimizing the 
        residual sum of squares (RSS) while preserving correlation structure.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            rho (float): Autoregressive parameter.

        Returns:
            np.ndarray: Estimated high-frequency series.
        """
        # Preprocess inputs to ensure proper formatting and dimensions
        y_l, X, C = self.preprocess_inputs(y_l, X, C)
                                               
        n = X.shape[0]  # Number of high-frequency observations

        # Ensure rho is within valid boundaries
        rho = np.clip(rho, self.min_rho_boundarie, self.max_rho_boundarie)

        # Compute the correlation matrix R (instead of covariance matrix)
        # Toeplitz structure models time dependency with autoregressive correlation
        R = toeplitz(rho ** np.arange(n))

        # Compute the aggregated variance-covariance matrix
        Q = C @ R @ C.T  # Q = C * R * C'

        # Compute the pseudo-inverse for numerical stability
        inv_Q = pinv(Q + np.eye(Q.shape[0]) * 1e-8)

        # Generalized Least Squares (GLS) estimation of beta coefficients
        beta = pinv(X.T @ C.T @ inv_Q @ C @ X) @ X.T @ C.T @ inv_Q @ y_l

        # Compute the preliminary high-frequency estimate (p = X * β)
        p = X @ beta

        # Compute the adjustment matrix (D) to refine the high-frequency series
        # D = R * C' * Q^-1
        D = R @ C.T @ inv_Q

        # Compute the low-frequency residuals (u_l = y_l - C * p)
        u_l = y_l - C @ p

        # Final high-frequency estimate by adjusting the preliminary estimate with residuals
        return p + D @ u_l
    
    def chow_lin_minrss_quilis(self, y_l, X, C, rho=0.15):
        """
        Implements the Chow-Lin method with RSS minimization (Quilis approach).

        This method estimates high-frequency values by minimizing the 
        residual sum of squares (RSS) while scaling the correlation matrix.

        Parameters:
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            rho (float): Autoregressive parameter.

        Returns:
            np.ndarray: Estimated high-frequency series.
        """
        # Preprocess inputs to ensure proper formatting and dimensions
        y_l, X, C = self.preprocess_inputs(y_l, X, C)

        n = X.shape[0]  # Number of high-frequency observations

        # Ensure rho is within valid boundaries
        rho = np.clip(rho, self.min_rho_boundarie, self.max_rho_boundarie)

        # Compute the scaled correlation matrix R
        # Unlike Ecotrim, Quilis scales the matrix with (1 / (1 - rho^2))
        epsilon = 1e-6 
        R = (1 / (1 - (rho + epsilon)**2)) * toeplitz(rho ** np.arange(n))

        # Compute the aggregated variance-covariance matrix
        Q = C @ R @ C.T  # Q = C * R * C'

        # Compute the pseudo-inverse for numerical stability
        inv_Q = pinv(Q + np.eye(Q.shape[0]) * 1e-8)

        # Generalized Least Squares (GLS) estimation of beta coefficients
        beta = pinv(X.T @ C.T @ inv_Q @ C @ X) @ X.T @ C.T @ inv_Q @ y_l

        # Compute the preliminary high-frequency estimate (p = X * β)
        p = X @ beta

        # Compute the adjustment matrix (D) to refine the high-frequency series
        # D = R * C' * Q^-1
        D = R @ C.T @ inv_Q

        # Compute the low-frequency residuals (u_l = y_l - C * p)
        u_l = y_l - C @ p

        # Final high-frequency estimate by adjusting the preliminary estimate with residuals
        return p + D @ u_l

    def predict(self, df, method, **kwargs):
        """
        General interface for performing temporal disaggregation.

        Selects and applies the appropriate estimation method.

        Parameters:
            method (str): The disaggregation method to use.
            y_l (np.ndarray): Low-frequency time series.
            X (np.ndarray): High-frequency indicator series.
            C (np.ndarray): Conversion matrix.
            **kwargs: Additional parameters for specific methods.

        Returns:
            np.ndarray: The estimated high-frequency series.
        """
        # Dictionary of available disaggregation methods
        all_methods = {
            "ols": self.ols_estimation,
            "denton": self.denton_estimation,
            "chow-lin": self.chow_lin_estimation,
            "litterman": self.litterman_estimation,
            "fernandez": self.fernandez_estimation,
            "fast": self.fast_estimation,
            "chow-lin-opt": self.chow_lin_opt_estimation,
            "litterman-opt": self.litterman_opt_estimation,
            "chow-lin-ecotrim": self.chow_lin_minrss_ecotrim,
            "chow-lin-quilis": self.chow_lin_minrss_quilis,
        }

        df_predicted = df.copy()

        # Build the conversion matrix
        C = self.build_conversion_matrix(df_predicted)

        # Extract low-frequency series (one observation per low-frequency period)
        y_l = df_predicted.groupby("Index")["y"].first().values

        # Extract high-frequency indicator series
        X = df_predicted["X"].values

        # Ensure the method is valid
        if method not in all_methods:
            raise ValueError(f"Method '{method}' is not supported. Available methods: {list(all_methods.keys())}")

        # Ensure input arrays are column vectors
        y_l = np.atleast_2d(y_l).reshape(-1, 1)
        X = np.atleast_2d(X).reshape(-1, 1)

        # Validate matrix dimensions before proceeding
        if C.shape[1] != X.shape[0]:
            raise ValueError(f"Shape mismatch: C.shape[1] ({C.shape[1]}) != X.shape[0] ({X.shape[0]})")
        if C.shape[0] != y_l.shape[0]:
            raise ValueError(f"Shape mismatch: C.shape[0] ({C.shape[0]}) != y_l.shape[0] ({y_l.shape[0]})")

        # Define the list of valid keyword arguments that methods can accept
        valid_args = ["h", "rho", "alpha", "weights"]

        # Filter only the valid arguments to pass to the method
        filtered_kwargs = {k: v for k, v in kwargs.items() if k in valid_args}

        try:
            # Execute the selected disaggregation method and return the estimated series
            y_hat = all_methods[method](y_l, X, C, **filtered_kwargs)
    
        except:
            try:
                # Execute the selected disaggregation method (first backup)
                warnings.warn("Error in disaggregation. FAST used as default method", UserWarning)
                y_hat = all_methods["fast"](y_l, X, C, **filtered_kwargs)
             
            except:
                # Execute the selected disaggregation method (second backup)
                warnings.warn("Error in disaggregation. OLS used as default method", UserWarning)
                y_hat = all_methods["ols"](y_l, X, C, **filtered_kwargs)

        df_predicted["y_hat"] = y_hat

        if self.apply_adjustment:
            df_predicted = self.adjust_negative_values(df_predicted)

        return df_predicted
    

    def adjust_negative_values(self, df):
        """
        Adjusts negative values in the predicted high-frequency series while preserving aggregation constraints.

        Parameters:
            df (pd.DataFrame): The dataframe containing the following columns:
                            ['Index', 'Grain', 'X', 'y', 'y_hat'].

        Returns:
            pd.DataFrame: The adjusted dataframe with non-negative values in 'y_hat'.
        """
        df_adjusted = df.copy()
        df_adjusted["y_hat_adj"] = df_adjusted["y_hat"].copy()
        
        # Identificar los índices con valores negativos en y_hat
        negative_indexes = df_adjusted[df_adjusted["y_hat"] < 0]["Index"].unique()
        
        for index in negative_indexes:
            group = df_adjusted[df_adjusted["Index"] == index].reset_index(drop=True)
            y_hat = group["y_hat"].values

            # Si no hay valores negativos, continuar con el siguiente grupo
            if (y_hat >= 0).all():
                continue
            
            if self.conversion in ["sum", "average"]:
                # Sumar todas las diferencias negativas para crear un factor de ajuste
                negative_sum = np.abs(y_hat[y_hat < 0].sum())
                
                # Obtener valores positivos y su suma
                positive_values = y_hat[y_hat > 0]
                positive_sum = positive_values.sum()

                if positive_sum > 0:
                    # Calcular las participaciones relativas de los valores positivos
                    weights = positive_values / positive_sum

                    # Aplicar ajuste proporcionalmente
                    y_hat[y_hat > 0] -= negative_sum * weights
                    
                    # Ajustar los valores negativos a un pequeño valor positivo si se requiere
                    y_hat[y_hat < 0] = 0

                else:
                    # Si no hay valores positivos, distribuir de manera uniforme
                    y_hat[:] = negative_sum / len(y_hat)

            elif self.conversion == "first":
                # Mantener el primer valor fijo
                first_value = y_hat[0]
                remaining_values = y_hat[1:]

                if remaining_values.sum() < 0:
                    remaining_values[:] = 0
                else:
                    negative_sum = np.abs(remaining_values[remaining_values < 0].sum())
                    positive_values = remaining_values[remaining_values > 0]
                    positive_sum = positive_values.sum()

                    if positive_sum > 0:
                        weights = positive_values / positive_sum
                        remaining_values[remaining_values > 0] -= negative_sum * weights
                    
                    remaining_values[remaining_values < 0] = 0
                
                y_hat[1:] = remaining_values
                y_hat[0] = first_value

            elif self.conversion == "last":
                # Mantener el último valor fijo
                last_value = y_hat[-1]
                remaining_values = y_hat[:-1]

                if remaining_values.sum() < 0:
                    remaining_values[:] = 0
                else:
                    negative_sum = np.abs(remaining_values[remaining_values < 0].sum())
                    positive_values = remaining_values[remaining_values > 0]
                    positive_sum = positive_values.sum()

                    if positive_sum > 0:
                        weights = positive_values / positive_sum
                        remaining_values[remaining_values > 0] -= negative_sum * weights
                    
                    remaining_values[remaining_values < 0] = 0
                
                y_hat[:-1] = remaining_values
                y_hat[-1] = last_value
            
            # Reasignar los valores ajustados
            df_adjusted.loc[df_adjusted["Index"] == index, "y_hat_adj"] = y_hat

        return df_adjusted


In [467]:
# Crear instancia de la clase con conversión por promedio
td = TemporalDisaggregation(conversion="average", apply_adjustment = False)

# Realizar la predicción en un solo paso
df_res = td.predict(merged_series, method="chow-lin-opt")

In [470]:
df_res

Unnamed: 0,Index,Grain,X,y,y_hat
0,2001,1,100.000000,150.0,146.773631
1,2001,2,100.909091,150.0,147.965087
2,2001,3,101.818182,150.0,148.975339
3,2001,4,102.727273,150.0,149.804334
4,2001,5,103.636364,150.0,150.451985
...,...,...,...,...,...
271,2023,8,346.363636,500.0,486.074653
272,2023,9,347.272727,500.0,487.332949
273,2023,10,348.181818,500.0,488.578955
274,2023,11,349.090909,500.0,489.812668


In [471]:
# Generate test data
np.random.seed(1)  # Set a random seed for reproducibility
df_test = pd.DataFrame({
    "Index": np.repeat(np.arange(2000, 2010), 4),  # Repeats each year 4 times (quarterly data)
    "Grain": np.tile(np.arange(1, 5), 10),  # Cycles through 1 to 4 for each year
    "X": np.random.rand(40) * 100,  # High-frequency indicator series (random values scaled to 100)
    "y": np.repeat(np.random.rand(10) * 1000, 4)  # Low-frequency series (constant per year, scaled to 1000)
})


# Crear instancia de la clase con conversión por promedio
td = TemporalDisaggregation(conversion="average")

# Realizar la predicción en un solo paso
td.predict(df_test, method="chow-lin-opt")

Unnamed: 0,Index,Grain,X,y,y_hat,y_hat_adj
0,2000,1,41.7022,988.861089,1003.046507,1003.046507
1,2000,2,72.032449,988.861089,1034.655128,1034.655128
2,2000,3,0.011437,988.861089,952.22009,952.22009
3,2000,4,30.233257,988.861089,965.52263,965.52263
4,2001,1,14.675589,748.165654,922.785151,922.785151
5,2001,2,9.233859,748.165654,834.76269,834.76269
6,2001,3,18.626021,748.165654,706.284504,706.284504
7,2001,4,34.556073,748.165654,528.830263,528.830263
8,2002,1,39.676747,280.443992,284.593228,284.593228
9,2002,2,53.881673,280.443992,193.063419,193.063419


In [472]:
import pandas as pd
import numpy as np

class TemporalAggregation:
    """
    Class for temporal aggregation of high-frequency time series into low-frequency series.
    """

    def __init__(self, conversion="sum"):
        """
        Initializes the TemporalAggregation class.

        Parameters:
            conversion (str): Specifies the aggregation method:
                - "sum": Sum of the high-frequency values.
                - "average": Mean of the high-frequency values.
                - "first": First observed value.
                - "last": Last observed value.
        """
        if conversion not in ["sum", "average", "first", "last"]:
            raise ValueError("Invalid conversion method. Choose from 'sum', 'average', 'first', 'last'.")
        
        self.conversion = conversion

    def aggregate(self, df, time_col, value_col, freq):
        """
        Aggregates high-frequency data into low-frequency data.

        Parameters:
            df (pd.DataFrame): DataFrame containing time series data.
            time_col (str): Column representing the time index.
            value_col (str): Column with values to aggregate.
            freq (str): Target frequency ('M' for monthly, 'Q' for quarterly, 'A' for annual).

        Returns:
            pd.DataFrame: Aggregated time series.
        """
        df = df.copy()
        df[time_col] = pd.to_datetime(df[time_col])
        df.set_index(time_col, inplace=True)
        
        if self.conversion == "sum":
            aggregated = df[value_col].resample(freq).sum()
        elif self.conversion == "average":
            aggregated = df[value_col].resample(freq).mean()
        elif self.conversion == "first":
            aggregated = df[value_col].resample(freq).first()
        elif self.conversion == "last":
            aggregated = df[value_col].resample(freq).last()
        
        return aggregated.reset_index()

In [None]:
class Retropolation:
    """
    Class for extending time series backwards using various statistical and machine learning methods.
    """

    def __init__(self, df, new_col, old_col):
        """
        Initializes the Retropolation class.

        Parameters:
            df (pd.DataFrame): DataFrame containing the time series.
            new_col (str): Column with the new methodology series.
            old_col (str): Column with the old methodology series.
        """
        self.df = df.copy()
        self.new_col = new_col
        self.old_col = old_col

        # Fill missing values using linear interpolation
        self.df[self.new_col] = self.df[self.new_col].interpolate(method='linear')
        self.df[self.old_col] = self.df[self.old_col].interpolate(method='linear')

    def _proportion(self, mask):
        """Applies proportion-based retropolation."""
        proportion = self.df[self.new_col].dropna() / self.df[self.old_col].dropna()
        mean_prop = np.mean(proportion)
        self.df.loc[mask, self.new_col] = mean_prop * self.df.loc[mask, self.old_col]

    def _linear_regression(self, mask):
        """Applies linear regression for retropolation."""
        valid_data = self.df.dropna(subset=[self.new_col, self.old_col])
        if valid_data.empty:
            raise ValueError("Not enough data for linear regression.")
        
        model = LinearRegression().fit(valid_data[[self.old_col]], valid_data[self.new_col])
        self.df.loc[mask, self.new_col] = model.predict(self.df.loc[mask, self.old_col].values.reshape(-1, 1))

    def _polynomial_regression(self, mask):
        """Applies polynomial regression for retropolation."""
        valid_data = self.df.dropna(subset=[self.new_col, self.old_col])
        if valid_data.empty:
            raise ValueError("Not enough data for polynomial regression.")
        
        poly_model = make_pipeline(PolynomialFeatures(), LinearRegression())
        params = {'polynomialfeatures__degree': range(1, 6)}
        grid = GridSearchCV(poly_model, params, cv=5)
        grid.fit(valid_data[[self.old_col]], valid_data[self.new_col])
        self.df.loc[mask, self.new_col] = grid.predict(self.df.loc[mask, self.old_col].values.reshape(-1, 1))

    def _exponential_smoothing(self, mask, alpha=0.5):
        """Applies exponential smoothing for retropolation."""
        smoothed = self.df[self.new_col].ewm(alpha=alpha, adjust=False).mean()
        self.df.loc[mask, self.new_col] = smoothed.iloc[-1]

    def _mlp_regression(self, mask):
        """Applies MLP regression for retropolation."""
        valid_data = self.df.dropna(subset=[self.new_col, self.old_col])
        if valid_data.empty:
            raise ValueError("Not enough data for MLP regression.")

        model = MLPRegressor(hidden_layer_sizes=(100,), max_iter=5000, activation="relu", alpha=0.001, random_state=42)
        model.fit(valid_data[[self.old_col]], valid_data[self.new_col])
        self.df.loc[mask, self.new_col] = model.predict(self.df.loc[mask, self.old_col].values.reshape(-1, 1))

    def retropolate(self, method="proportion"):
        """
        Performs retropolation using the specified method.

        Parameters:
            method (str): Retropolation method. Options:
                - "proportion"
                - "linear_regression"
                - "polynomial_regression"
                - "exponential_smoothing"
                - "mlp_regression"

        Returns:
            pd.Series: Completed series with retropolated values.
        """
        mask = self.df[self.new_col].isna() & self.df[self.old_col].notna()

        if method == "proportion":
            self._proportion(mask)
        elif method == "linear_regression":
            self._linear_regression(mask)
        elif method == "polynomial_regression":
            self._polynomial_regression(mask)
        elif method == "exponential_smoothing":
            self._exponential_smoothing(mask)
        elif method == "mlp_regression":
            self._mlp_regression(mask)
        else:
            raise ValueError("Invalid method. Choose from the available options.")

        return self.df[self.new_col]