# Hierarchical Dynamic Factor Model for GDP Forecasting: Implementation Plan  

## 1. Architecture Overview  

The architecture implements a hierarchical forecasting system that integrates information from multiple time frequencies to predict quarterly GDP growth. The system processes data through a cascade of Dynamic Factor Models (DFMs) that successively integrate information from higher to lower frequencies:  
  
- Daily Data → Weekly Data → Monthly Data → Quarterly GDP  
  
We will implement two versions of the model:  
  
1. Pure Hierarchical: Using direct factor weighting for the final GDP prediction  
2. Hierarchical + MIDAS: Using Mixed-Data Sampling for optimal temporal weighting at the final stage  
  
### Key Components  
  
#### 1. Multi-frequency Data Processing Pipeline  
  
- Specialized preprocessing for daily, weekly, monthly, and quarterly data  
- Feature engineering with technical indicators (SMA, RSI, ROC)  
- Temporal alignment mechanism between frequency domains  
  
#### 2. Hierarchical Dynamic Factor Model System  
  
- Series of DFMs that extract factors at each frequency level  
- Autoregressive modeling of factors at each stage  
- Information flow from higher to lower frequencies  
  
#### 3. GDP Prediction Module  
  
- Pure hierarchical version with direct factor weighting  
- Hybrid version with MIDAS-style weighting  
- Autoregressive modeling of GDP growth  
  
#### 4. Evaluation and Diagnostic Framework  
  
- Out-of-sample forecasting performance evaluation  
- Comparative analysis of hierarchical vs. hybrid approaches  
- Economic interpretation of factor contributions  



## 2. Data Processing Framework  
  
### Daily Data Processing  
  
- Preprocessing of 7 daily time series (Oil, Gold, Copper/Gold, Lumber/Gold, S&P 500, 10Y-03M, 10Y-02Y)  
- Technical indicators: SMA(5,20,60,200), RSI(14,21), ROC(1,5,20,60)  
- Enhanced metrics for each technical indicator (e.g., trend direction and crossover signals for SMA)  
- Dimension reduction through dynamic factor model  
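
As a rough illustration of the indicators listed above, the sketch below computes SMA, RSI, and ROC with pandas. The helper names (`sma`, `rsi`, `roc`, `add_indicators`) are hypothetical and not part of the modules defined later. The RSI here uses simple rolling means of gains and losses rather than Wilder's smoothing, which is one of several common variants, and the price path is synthetic.

```python
import numpy as np
import pandas as pd


def sma(series: pd.Series, window: int) -> pd.Series:
    """Simple moving average over `window` observations."""
    return series.rolling(window=window, min_periods=window).mean()


def roc(series: pd.Series, periods: int) -> pd.Series:
    """Rate of change in percent over `periods` observations."""
    return (series / series.shift(periods) - 1.0) * 100.0


def rsi(series: pd.Series, window: int) -> pd.Series:
    """Relative Strength Index from simple rolling means of gains and losses."""
    delta = series.diff()
    gain = delta.clip(lower=0.0)
    loss = (-delta).clip(lower=0.0)
    avg_gain = gain.rolling(window=window, min_periods=window).mean()
    avg_loss = loss.rolling(window=window, min_periods=window).mean()
    rs = avg_gain / avg_loss            # inf when the window contains no losses
    return 100.0 - 100.0 / (1.0 + rs)   # inf maps to an RSI of 100


def add_indicators(df, column, sma_windows, rsi_windows, roc_periods):
    """Build one feature column per indicator/parameter combination."""
    out = pd.DataFrame(index=df.index)
    for w in sma_windows:
        out[f"{column}_SMA_{w}"] = sma(df[column], w)
    for w in rsi_windows:
        out[f"{column}_RSI_{w}"] = rsi(df[column], w)
    for p in roc_periods:
        out[f"{column}_ROC_{p}"] = roc(df[column], p)
    return out


# Synthetic daily price path (illustrative data only)
rng = np.random.default_rng(0)
prices = pd.DataFrame(
    {"price": 100.0 + rng.normal(0.0, 1.0, 300).cumsum()},
    index=pd.bdate_range("2020-01-01", periods=300),
)
# Daily parameter set from the plan: SMA(5,20,60,200), RSI(14,21), ROC(1,5,20,60)
features = add_indicators(prices, "price", [5, 20, 60, 200], [14, 21], [1, 5, 20, 60])
```

Each indicator is undefined until its window is full, so the first rows of `features` contain NaNs that a downstream factor model would have to handle.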
  
### Weekly Data Processing  
  
- Preprocessing of 2 weekly series (Unemployment Claims, Financial Conditions)  
- Technical indicators: SMA(4,12,26,52), RSI(8,12), ROC(1,4,13,26)  
- Integration with daily factors  
- Dimension reduction through dynamic factor model  
  
### Monthly Data Processing  
  
- Preprocessing of 7 monthly series (CPI, Unemployment, Interest Rates, Housing, etc.)  
- Technical indicators: SMA(3,6,12,24), RSI(6,9), ROC(1,3,6,12)  
- Integration with weekly factors  
- Dimension reduction through dynamic factor model
  
### Quarterly Data Processing  
  
- Preprocessing of GDP growth series  
- Autoregressive modeling  
- Integration with monthly factors  
- Final prediction model  
  
## 3. Temporal Alignment Strategy  
The system uses a state-space alignment strategy to connect factors across frequencies:  
  
1. Factor Projection Method: Uses the DFM's state evolution equations to project factors to required observation dates  
2. End-of-Period Anchoring: Anchors alignment to release dates of economic data  
3. Information Flow Control: Ensures no future information leaks into the model  
  
This approach is based on research by Bańbura et al. (2011) showing 15-25% improvement in forecast accuracy compared to simple aggregation methods.  
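
The information-flow constraint in item 3 can be illustrated with a much simpler stand-in than the state-space projection: for each lower-frequency date, take the last higher-frequency observation dated on or before it, and never backfill. The function name `align_last_available` is hypothetical, the data are synthetic, and the sketch ignores publication lags, which a release-date-anchored alignment would need to model explicitly.

```python
import numpy as np
import pandas as pd


def align_last_available(source: pd.DataFrame, target_dates: pd.DatetimeIndex) -> pd.DataFrame:
    """For each target date, return the last source row dated on or before it.

    Target dates that precede the first source observation stay NaN, so no
    later observation is pulled backwards in time.
    """
    source = source.sort_index()
    return source.reindex(target_dates, method="ffill")


# Illustrative daily factor: values 0, 1, 2, ... on consecutive calendar days
daily = pd.DataFrame(
    {"factor": np.arange(75, dtype=float)},
    index=pd.date_range("2024-01-01", periods=75, freq="D"),
)
month_ends = pd.to_datetime(["2023-12-31", "2024-01-31", "2024-02-29", "2024-03-31"])
aligned = align_last_available(daily, month_ends)
# 2023-12-31 has no earlier observation and stays NaN;
# 2024-03-31 picks up the last available value (dated 2024-03-15).
```

Leaving the leading NaN in place is deliberate: filling it from later rows would be exactly the kind of look-ahead the strategy is meant to rule out.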

## 4. Dynamic Factor Model Implementation
Each DFM will:  
  
1. Extract common factors from input data  
2. Model the autoregressive structure of factors  
3. Pass both factor values and their dynamics to the next level  
  
The DFM implementation follows Doz, Giannone, and Reichlin (2011), using:  
  
- EM algorithm for parameter estimation  
- Kalman filtering for optimal factor extraction  
- State space representation for dynamic modeling  
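
To make the Kalman filtering step concrete, here is a minimal sketch for a single AR(1) factor with known parameters. It is not the EM-estimated model described above: the loadings, the AR coefficient, and the noise variances are assumed given, and the function name `kalman_filter_one_factor` and the simulated data are purely illustrative. Missing observations (NaN) are skipped in the update, which is how a ragged edge would be handled.

```python
import numpy as np


def kalman_filter_one_factor(X, loadings, phi, q, r):
    """Filtered estimates of a single AR(1) factor.

    Model (all parameters assumed known for this sketch):
        x_t = loadings * f_t + e_t,   e_t ~ N(0, diag(r))
        f_t = phi * f_{t-1} + u_t,    u_t ~ N(0, q)
    X has shape (T, N); NaN entries are treated as missing.
    """
    T = X.shape[0]
    f_filt = np.zeros(T)
    p_filt = np.zeros(T)
    f_prev = 0.0
    p_prev = q / (1.0 - phi**2)  # stationary variance as the initial state variance
    for t in range(T):
        # Prediction step
        f_pred = phi * f_prev
        p_pred = phi**2 * p_prev + q
        # Update step in information form (valid because diag(r) is diagonal)
        obs = ~np.isnan(X[t])
        lam, var = loadings[obs], r[obs]
        precision = 1.0 / p_pred + np.sum(lam**2 / var)
        p_upd = 1.0 / precision
        f_upd = p_upd * (f_pred / p_pred + np.sum(lam * X[t, obs] / var))
        f_filt[t], p_filt[t] = f_upd, p_upd
        f_prev, p_prev = f_upd, p_upd
    return f_filt, p_filt


# Simulate a factor and ten noisy indicators (illustrative data only)
rng = np.random.default_rng(42)
T, N, phi, q = 200, 10, 0.8, 1.0
loadings = np.ones(N)
r = np.full(N, 0.5)
f_true = np.zeros(T)
for t in range(1, T):
    f_true[t] = phi * f_true[t - 1] + rng.normal(0.0, np.sqrt(q))
X = f_true[:, None] * loadings + rng.normal(0.0, np.sqrt(0.5), size=(T, N))
X[-1, 5:] = np.nan  # ragged edge: half the series not yet released
f_hat, p_hat = kalman_filter_one_factor(X, loadings, phi, q, r)
```

In this setup the filtered variance `p_hat` rises in the last period because fewer indicators are observed. In a full implementation, an EM loop would alternate between a filter/smoother pass like this one and re-estimation of the parameters.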
  
## 5. MIDAS Implementation (Hybrid Version)  
The MIDAS component:  
  
1. Optimally weights monthly factors for quarterly GDP prediction  
2. Uses exponential Almon lag polynomial weighting functions  
3. Combines with autoregressive GDP components  
  
This approach is based on Ghysels et al. (2004) showing superior performance for mixed-frequency forecasting.  
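
The exponential Almon lag polynomial assigns lag k a weight proportional to exp(θ₁k + θ₂k²), normalized so the weights sum to one. The sketch below computes these weights and applies them to a monthly factor to build one regressor per quarter. The function names and the convention that lag 1 is the final month of the quarter are assumptions made for illustration; in practice θ₁ and θ₂ would be estimated jointly with the regression coefficients.

```python
import numpy as np


def exp_almon_weights(n_lags: int, theta1: float, theta2: float) -> np.ndarray:
    """Weights w_k proportional to exp(theta1*k + theta2*k**2) for k = 1..n_lags."""
    k = np.arange(1, n_lags + 1, dtype=float)
    z = theta1 * k + theta2 * k**2
    w = np.exp(z - z.max())  # subtract the max for numerical stability
    return w / w.sum()


def midas_regressor(monthly_factor, n_lags, theta1, theta2):
    """Weighted sum of the last `n_lags` monthly values for each quarter.

    Assumes `monthly_factor` starts in the first month of a quarter and that
    lag k = 1 is the final month of the quarter being predicted.
    """
    monthly_factor = np.asarray(monthly_factor, dtype=float)
    w = exp_almon_weights(n_lags, theta1, theta2)
    n_quarters = len(monthly_factor) // 3
    out = np.full(n_quarters, np.nan)
    for q in range(n_quarters):
        end = 3 * (q + 1)  # exclusive index just past the quarter's last month
        if end >= n_lags:
            window = monthly_factor[end - n_lags:end][::-1]  # most recent first
            out[q] = window @ w
    return out


flat = exp_almon_weights(3, 0.0, 0.0)        # theta = 0 gives equal weights
decaying = exp_almon_weights(6, 0.0, -0.1)   # negative theta2 down-weights older lags
x_q = midas_regressor(np.arange(12.0), n_lags=6, theta1=0.0, theta2=-0.1)
```

With both parameters at zero the scheme collapses to a simple average, so the equal-weight aggregation is nested as a special case.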
  
## 6. Measurement and Evaluation  
The system evaluation includes:  
  
1. Out-of-sample forecast accuracy metrics (RMSE, MAE)  
2. Rolling window forecast evaluation  
3. Comparative analysis of both model versions  
4. Statistical significance testing of forecast improvements    
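
A minimal sketch of items 1 and 2, assuming a simple AR(1) benchmark re-estimated by OLS on a rolling window stands in for the actual forecasting model. The function names and the simulated growth series are illustrative only, and significance testing (item 4) is not shown.

```python
import numpy as np


def rmse(y_true, y_pred):
    """Root mean squared error."""
    err = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean(err**2)))


def mae(y_true, y_pred):
    """Mean absolute error."""
    err = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    return float(np.mean(np.abs(err)))


def rolling_ar1_forecasts(y, window):
    """One-step-ahead AR(1) forecasts, re-estimated on each rolling window."""
    y = np.asarray(y, dtype=float)
    actuals, preds = [], []
    for t in range(window, len(y)):
        train = y[t - window:t]  # only data available before period t
        X = np.column_stack([np.ones(window - 1), train[:-1]])
        beta, *_ = np.linalg.lstsq(X, train[1:], rcond=None)
        preds.append(beta[0] + beta[1] * train[-1])
        actuals.append(y[t])
    return np.array(actuals), np.array(preds)


# Simulated quarterly growth series (illustrative data only)
rng = np.random.default_rng(1)
growth = np.zeros(120)
for t in range(1, 120):
    growth[t] = 0.5 + 0.4 * growth[t - 1] + rng.normal(0.0, 0.5)

actual, forecast = rolling_ar1_forecasts(growth, window=40)
print(f"RMSE: {rmse(actual, forecast):.3f}, MAE: {mae(actual, forecast):.3f}")
```

Running both model versions through the same loop yields paired forecast errors, which is the input a forecast-comparison test needs.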

# Module 1: Enhanced Data Preprocessor for Multi-Frequency Data

In [2]:
import numpy as np
import pandas as pd
import os
import glob
import re
import copy
import warnings
import matplotlib.pyplot as plt
from tqdm import tqdm
from datetime import datetime, timedelta
from statsmodels.tsa.stattools import acf, pacf

class MultiFrequencyPreprocessor:
    """
    Enhanced data preprocessor for multi-frequency economic data.
    
    This class handles different time frequencies (daily, weekly, monthly, quarterly)
    and ensures proper alignment and processing for hierarchical modeling.
    
    Attributes:
        data_folder (str): Path to the folder containing CSV files
        available_files (list): List of available CSV files
        data_config (dict): Configuration for data loading and preprocessing
        frequency_data (dict): Dictionary containing data for each frequency
    """
    
    def __init__(self, data_folder):
        """
        Initialize the MultiFrequencyPreprocessor with the folder containing CSV files.
        
        Parameters:
        -----------
        data_folder: str
            Path to the folder containing CSV files
        """
        self.data_folder = data_folder
        self.available_files = self._get_available_files()
        self.data_config = {}
        self.frequency_data = {
            'daily': None,
            'weekly': None,
            'monthly': None, 
            'quarterly': None
        }
        self.start_date = None
        self.end_date = None
        
        # Dictionaries to store processed data and factors
        self.processed_data = {}
        self.factors = {}
        
        print(f"Found {len(self.available_files)} files in {data_folder}")
        
    def _get_available_files(self):
        """List all CSV files in the data folder."""
        # Normalize path to handle both forward and backward slashes
        norm_path = os.path.normpath(self.data_folder)
        files = glob.glob(os.path.join(norm_path, '*.csv'))
        return [os.path.basename(f) for f in files]
    
    def set_config(self, data_config):
        """
        Set the configuration for data loading and preprocessing.
        
        Parameters:
        -----------
        data_config: dict
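            Configuration dictionary with the following structure
            (as read by process_frequency_data):
            {
                'daily': {
                    'files': {
                        'file_name.csv': {
                            'columns': ['column1'],
                            'transformations': {'column1': 'pct_change'},
                            'start_date': '1980-01-01'  # Optional
                        },
                        ...
                    },
                    'ratios': {  # Optional
                        'ratio_name': {
                            'numerator': 'file_a',    # File name without extension
                            'denominator': 'file_b',  # File name without extension
                            'transformations': ['raw', 'pct_change']
                        },
                        ...
                    }
                },
                'weekly': {...},
                'monthly': {...},
                'quarterly': {...}
            }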
        """
        self.data_config = data_config
        
    def set_date_range(self, start_date=None, end_date=None):
        """
        Set the global date range for data processing.
        
        Parameters:
        -----------
        start_date: str or datetime
            Start date for data processing (format: 'YYYY-MM-DD')
        end_date: str or datetime
            End date for data processing (format: 'YYYY-MM-DD')
        """
        if start_date:
            self.start_date = pd.to_datetime(start_date) if isinstance(start_date, str) else start_date
        if end_date:
            self.end_date = pd.to_datetime(end_date) if isinstance(end_date, str) else end_date
    
    def _load_csv(self, file_name, frequency):
        """
        Load a CSV file and parse the date column.
        
        Parameters:
        -----------
        file_name: str
            Name of the CSV file
        frequency: str
            Data frequency ('daily', 'weekly', 'monthly', 'quarterly')
        
        Returns:
        --------
        pd.DataFrame
            Loaded dataframe with date index
        """
        # Normalize path
        norm_path = os.path.normpath(self.data_folder)
        file_path = os.path.join(norm_path, file_name)
        
        try:
            # First try standard CSV loading
            df = pd.read_csv(file_path, parse_dates=[0], index_col=0)
            
            # Check if index is datetime
            if not pd.api.types.is_datetime64_any_dtype(df.index):
                # Convert index to datetime
                df.index = pd.to_datetime(df.index)
                
            # Apply frequency-specific processing
            if frequency == 'daily':
                # For daily data, ensure the index is business days
                df = df.asfreq('B', method='ffill')
            elif frequency == 'weekly':
                # For weekly data, use end of week
                df = df.asfreq('W-FRI', method='ffill')
            elif frequency == 'monthly':
                # For monthly data, use end of month
                df = df.asfreq('M', method='ffill')
            elif frequency == 'quarterly':
                # For quarterly data, use end of quarter
                df = df.asfreq('Q', method='ffill')
            
            return df
            
        except Exception as e:
            print(f"Error loading {file_name}: {e}")
            
            # Try alternative approach
            try:
                df = pd.read_csv(file_path)
                date_col = df.columns[0]
                
                # Try different date formats
                try:
                    df[date_col] = pd.to_datetime(df[date_col])
                except Exception:
                    for date_format in ['%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%Y/%m/%d']:
                        try:
                            df[date_col] = pd.to_datetime(df[date_col], format=date_format)
                            break
                        except ValueError:
                            continue
                
                df.set_index(date_col, inplace=True)
                
                # Apply frequency-specific processing
                if frequency == 'daily':
                    df = df.asfreq('B', method='ffill')
                elif frequency == 'weekly':
                    df = df.asfreq('W-FRI', method='ffill')
                elif frequency == 'monthly':
                    df = df.asfreq('M', method='ffill')
                elif frequency == 'quarterly':
                    df = df.asfreq('Q', method='ffill')
                
                return df
                
            except Exception as nested_e:
                print(f"Failed to load {file_name} after multiple attempts: {nested_e}")
                raise
    
    def _apply_transformation(self, df, column, transformation):
        """
        Apply the specified transformation to a column.
        
        Parameters:
        -----------
        df: pd.DataFrame
            DataFrame containing the column
        column: str
            Column name to transform
        transformation: str or list
            Transformation type ('raw', 'pct_change', 'log_return', 'diff', 'yoy') or list of these
            
        Returns:
        --------
        list of tuples
            List of (column_name, transformed_series) tuples
        """
        if column not in df.columns:
            print(f"Warning: Column {column} not found in DataFrame")
            return []
        
        # Handle list of transformations
        if isinstance(transformation, list):
            result = []
            for t in transformation:
                column_name = f"{column}_{t}"
                series = self._apply_single_transformation(df, column, t)
                result.append((column_name, series))
            return result
        else:
            # Handle single transformation
            column_name = f"{column}_{transformation}" if transformation != 'raw' else column
            series = self._apply_single_transformation(df, column, transformation)
            return [(column_name, series)]
    
    def _apply_single_transformation(self, df, column, transformation):
        """
        Apply a single transformation to a column with robust handling of edge cases.
        
        Parameters:
        -----------
        df: pd.DataFrame
            DataFrame containing the column
        column: str
            Column name to transform
        transformation: str
            Transformation type ('raw', 'pct_change', 'log_return', 'diff', 'yoy')
            
        Returns:
        --------
        pd.Series
            Transformed series
        
        References:
        -----------
        - Stock, J. H., & Watson, M. W. (2002). Forecasting using principal components from a
        large number of predictors. Journal of the American Statistical Association, 97, 1167-1179.
        - Marcellino, M., Stock, J. H., & Watson, M. W. (2003). Macroeconomic forecasting in the
        euro area: Country specific versus area-wide information. European Economic Review, 47, 1-18.
        """
        if transformation == 'raw':
            return df[column]
        elif transformation == 'pct_change':
            # Calculate percentage change with correct usage
            # First ffill to avoid warning about fill_method
            pct = df[column].ffill().pct_change() * 100
            # Fill first value with 0 for continuity
            if len(pct) > 0:
                pct.iloc[0] = 0
            return pct
        elif transformation == 'log_return':
            # Calculate log return (continuously compounded return)
            # Log returns provide better statistical properties for economic forecasting
            # Reference: Campbell, J. Y., Lo, A. W., & MacKinlay, A. C. (1997). The Econometrics of Financial Markets.
            log_ret = (np.log(df[column]) - np.log(df[column].shift(1))) * 100
            # Fill first value with 0 for continuity
            if len(log_ret) > 0:
                log_ret.iloc[0] = 0
            return log_ret
        elif transformation == 'diff':
            # Calculate first difference
            diff = df[column].diff()
            # Fill first value with 0 for continuity
            if len(diff) > 0:
                diff.iloc[0] = 0
            return diff
        elif transformation == 'yoy':
            # Calculate year-over-year percentage change
            # NOTE: periods=12 assumes monthly data; other frequencies need a different lag
            # Reference: Zarnowitz, V., & Ozyildirim, A. (2006). Time series decomposition and measurement
            # of business cycles, trends and growth cycles. Journal of Monetary Economics, 53, 1717-1739.
            yoy = df[column].ffill().pct_change(periods=12) * 100
            # Forward fill NaN values
            yoy = yoy.ffill()
            return yoy
        else:
            raise ValueError(f"Unknown transformation: {transformation}")

    def _calculate_ratios(self, data_dict, ratio_config):
        """
        Calculate financial ratios from base time series.
        
        Parameters:
        -----------
        data_dict: dict
            Dictionary of DataFrames
        ratio_config: dict
            Configuration for ratio calculation
            
        Returns:
        --------
        dict
            Dictionary with ratio DataFrames added
        """
        result_dict = data_dict.copy()
        
        for ratio_name, config in ratio_config.items():
            try:
                numerator_key = config['numerator']
                denominator_key = config['denominator']
                transformations = config.get('transformations', ['raw'])
                
                # Get the component series
                if numerator_key in data_dict and denominator_key in data_dict:
                    numerator = data_dict[numerator_key].iloc[:, 0]  # Assume first column
                    denominator = data_dict[denominator_key].iloc[:, 0]  # Assume first column
                    
                    # Calculate the ratio
                    ratio = numerator / denominator
                    ratio_df = pd.DataFrame({f"{ratio_name}_raw": ratio})
                    
                    # Apply transformations
                    for transform in transformations:
                        if transform != 'raw':
                            transformed_series = self._apply_single_transformation(ratio_df, f"{ratio_name}_raw", transform)
                            ratio_df[f"{ratio_name}_{transform}"] = transformed_series
                    
                    # Add to result
                    result_dict[ratio_name] = ratio_df
                    print(f"Created ratio: {ratio_name} with {len(ratio_df)} observations")
                else:
                    print(f"Warning: Could not create ratio {ratio_name}. Missing component series.")
            except Exception as e:
                print(f"Error calculating ratio {ratio_name}: {e}")
                
        return result_dict
                
    def process_frequency_data(self, frequency):
        """
        Process data for a specific frequency.
        
        Parameters:
        -----------
        frequency: str
            Data frequency ('daily', 'weekly', 'monthly', 'quarterly')
        
        Returns:
        --------
        pd.DataFrame
            Processed DataFrame for the specified frequency
        """
        if frequency not in self.data_config:
            print(f"No configuration found for {frequency} data")
            return None
        
        print(f"Processing {frequency} data...")
        freq_config = self.data_config[frequency]
        
        # Load and transform individual files
        data_dict = {}
        for file_name, config in freq_config.get('files', {}).items():
            if file_name not in self.available_files:
                print(f"Warning: {file_name} not found, skipping")
                continue
            
            try:
                # Load CSV file
                df = self._load_csv(file_name, frequency)
                
                # Apply date filtering if specified
                if 'start_date' in config:
                    df = df[df.index >= pd.to_datetime(config['start_date'])]
                elif self.start_date:
                    df = df[df.index >= self.start_date]
                
                if self.end_date:
                    df = df[df.index <= self.end_date]
                
                # Apply transformations
                transformed_columns = []
                for column in config['columns']:
                    # Get transformation type
                    transformation = config['transformations'].get(column, 'raw')
                    # Apply transformation
                    results = self._apply_transformation(df, column, transformation)
                    # Store results
                    for col_name, series in results:
                        # Create descriptive name: filename_column_transformation
                        file_prefix = os.path.splitext(file_name)[0]  # Remove extension
                        prefixed_name = f"{file_prefix}_{col_name}"
                        transformed_columns.append((prefixed_name, series))
                
                # Create DataFrame from transformed columns
                if transformed_columns:
                    processed_df = pd.DataFrame({name: series for name, series in transformed_columns})
                    processed_df.index = df.index
                    # Store in data dictionary
                    key = os.path.splitext(file_name)[0]  # Use filename without extension
                    data_dict[key] = processed_df
                    print(f"Processed {file_name}: {len(processed_df)} observations, {len(processed_df.columns)} features")
            except Exception as e:
                print(f"Error processing {file_name}: {e}")
                import traceback
                traceback.print_exc()
        
        # Calculate ratios if configured
        if 'ratios' in freq_config:
            data_dict = self._calculate_ratios(data_dict, freq_config['ratios'])
        
        # Merge all DataFrames
        if data_dict:
            merged_df = None
            for _, df in data_dict.items():
                if merged_df is None:
                    merged_df = df.copy()
                else:
                    merged_df = merged_df.join(df, how='outer')
            
            # Handle missing values
            if merged_df is not None:
                # Forward fill for continuity (using proper method)
                merged_df = merged_df.ffill()
                # Then backward fill any remaining NaNs at the beginning
                # (note: this fills early rows with later observations)
                merged_df = merged_df.bfill()
                
                # Store in processed data dictionary
                self.processed_data[frequency] = merged_df
                print(f"Final {frequency} dataset: {len(merged_df)} observations, {len(merged_df.columns)} features")
                return merged_df
            else:
                print(f"No valid data found for {frequency} frequency")
                return None
        else:
            print(f"No data processed for {frequency} frequency")
            return None
    
    def process_all_frequencies(self):
        """
        Process data for all configured frequencies.
        
        Returns:
        --------
        dict
            Dictionary of processed DataFrames for each frequency
        """
        for frequency in self.data_config.keys():
            self.process_frequency_data(frequency)
        
        return self.processed_data
    
    def align_to_dates(self, source_df, target_dates, method='last'):
        """
        Align source DataFrame to target dates using specified method.
        
        Parameters:
        -----------
        source_df: pd.DataFrame
            Source DataFrame to align
        target_dates: pd.DatetimeIndex
            Target dates to align to
        method: str
            Method for alignment ('last', 'nearest', 'linear')
            
        Returns:
        --------
        pd.DataFrame
            Aligned DataFrame
        
        References:
        -----------
        - Ghysels, E., Santa-Clara, P., & Valkanov, R. (2004). The MIDAS touch: Mixed data 
          sampling regression models. Working paper, University of North Carolina.
        - Marcellino, M., & Schumacher, C. (2010). Factor MIDAS for nowcasting and forecasting 
          with ragged-edge data: A model comparison for German GDP. Oxford Bulletin of Economics 
          and Statistics, 72, 518-550.
        """
        # Initialize aligned DataFrame with the same columns as source_df
        # (dtype=float avoids an object-dtype frame after row-by-row assignment)
        aligned_df = pd.DataFrame(index=target_dates, columns=source_df.columns, dtype=float)
        
        if method == 'last':
            # For each target date, find the last available observation
            for date in target_dates:
                prev_data = source_df[source_df.index <= date]
                if not prev_data.empty:
                    # Get the last row as a Series and assign values column by column
                    last_row = prev_data.iloc[-1]
                    for col in source_df.columns:
                        aligned_df.loc[date, col] = last_row[col]
        
        elif method == 'nearest':
            # For each target date, find the nearest observation
            for date in target_dates:
                # Calculate absolute difference in days
                source_dates = source_df.index
                if len(source_dates) > 0:
                    # Convert to numpy arrays for vectorized operations
                    days_diff = np.abs((source_dates - date).days.values)
                    nearest_idx = np.argmin(days_diff)
                    # Assign values column by column
                    nearest_row = source_df.iloc[nearest_idx]
                    for col in source_df.columns:
                        aligned_df.loc[date, col] = nearest_row[col]
        
        elif method == 'linear':
            # Reindex onto the union of source and target dates. Index.union
            # de-duplicates, so dates present in both do not break the final reindex
            combined_index = source_df.index.union(target_dates)
            aligned_df = source_df.reindex(combined_index)
            # Interpolate linearly in time (accounts for uneven gaps between dates)
            aligned_df = aligned_df.interpolate(method='time')
            # Extract only the target dates
            aligned_df = aligned_df.reindex(target_dates)
        
        else:
            raise ValueError(f"Unknown alignment method: {method}")
        
        # Handle any remaining NaNs by forward filling, then backward filling
        aligned_df = aligned_df.ffill().bfill()
        
        return aligned_df
    
    def generate_hierarchical_dataset(self, target_frequency='quarterly'):
        """
        Generate hierarchical dataset with higher-frequency data aligned to lower frequency.
        
        Parameters:
        -----------
        target_frequency: str
            Target frequency for alignment ('quarterly', 'monthly', 'weekly')
            
        Returns:
        --------
        dict
            Dictionary of aligned datasets for hierarchical modeling
        """
        hierarchical_data = {}
        
        # Define frequency hierarchy
        freq_hierarchy = {
            'quarterly': ['monthly', 'weekly', 'daily'],
            'monthly': ['weekly', 'daily'],
            'weekly': ['daily']
        }
        
        # Get target dates
        if target_frequency not in self.processed_data:
            raise ValueError(f"No processed data found for {target_frequency} frequency")
            
        target_dates = self.processed_data[target_frequency].index
        hierarchical_data[target_frequency] = self.processed_data[target_frequency]
        
        # Align higher frequency data to target dates
        for higher_freq in freq_hierarchy.get(target_frequency, []):
            if higher_freq in self.processed_data:
                aligned_df = self.align_to_dates(
                    self.processed_data[higher_freq], 
                    target_dates,
                    method='last'  # Use last available observation
                )
                hierarchical_data[f"{higher_freq}_aligned"] = aligned_df
        
        return hierarchical_data
    
    def plot_data_overview(self, frequency=None):
        """
        Plot an overview of the processed data to help with visualization.
        
        Parameters:
        -----------
        frequency: str or None
            Frequency to plot, or None to plot all
            
        Returns:
        --------
        matplotlib.figure.Figure
            Matplotlib figure object
        """
        if frequency:
            if frequency not in self.processed_data:
                raise ValueError(f"No processed data found for {frequency} frequency")
            frequencies = [frequency]
        else:
            frequencies = list(self.processed_data.keys())
        
        n_freqs = len(frequencies)
        fig, axes = plt.subplots(n_freqs, 1, figsize=(15, 6*n_freqs))
        
        if n_freqs == 1:
            axes = [axes]
        
        for i, freq in enumerate(frequencies):
            df = self.processed_data[freq]
            
            # Select a subset of columns if there are too many
            max_cols = 10
            if len(df.columns) > max_cols:
                # Choose evenly spaced columns
                indices = np.linspace(0, len(df.columns)-1, max_cols, dtype=int)
                plot_cols = [df.columns[j] for j in indices]
            else:
                plot_cols = df.columns
            
            # Plot each column
            for col in plot_cols:
                axes[i].plot(df.index, df[col], label=col)
            
            axes[i].set_title(f"{freq.capitalize()} Data Overview")
            axes[i].set_xlabel('Date')
            axes[i].set_ylabel('Value')
            axes[i].grid(True, alpha=0.3)
            axes[i].legend(loc='upper left', bbox_to_anchor=(1, 1))
        
        plt.tight_layout()
        return fig

# Module 2: Technical Indicators for Multi-Frequency Data

In [3]:
class MultiFrequencyTechnicalIndicators:
    """
    Technical indicators calculation for multi-frequency economic data.
    
    This class implements SMA, RSI, and ROC with frequency-appropriate parameters
    and enhanced metrics for economic time series.
    
    References:
    -----------
    - Neely, C. J., Rapach, D. E., Tu, J., & Zhou, G. (2014). Forecasting the equity risk premium: 
      The role of technical indicators. Management Science, 60(7), 1772-1791.
    - Brock, W., Lakonishok, J., & LeBaron, B. (1992). Simple Technical Trading Rules and the 
      Stochastic Properties of Stock Returns. Journal of Finance, 47(5), 1731-1764.
    """
    
    @staticmethod
    def get_frequency_params(frequency):
        """
        Get appropriate technical indicator parameters for each frequency.
        
        Parameters:
        -----------
        frequency: str
            Data frequency ('daily', 'weekly', 'monthly', 'quarterly')
            
        Returns:
        --------
        dict
            Dictionary of parameters for each indicator type
            
        References:
        -----------
        - Neely, C. J., Rapach, D. E., Tu, J., & Zhou, G. (2014). Forecasting the equity risk premium: 
          The role of technical indicators. Management Science, 60(7), 1772-1791.
        - Brock, W., Lakonishok, J., & LeBaron, B. (1992). Simple Technical Trading Rules and the 
          Stochastic Properties of Stock Returns. Journal of Finance, 47(5), 1731-1764.
        """
        if frequency == 'daily':
            # For daily data - standard financial parameters
            # Reference: Brock et al. (1992), Neely et al. (2014)
            return {
                'sma': [5, 20, 60, 200],  # Short, medium, quarter, year
                'rsi': [14, 21],          # Standard and extended
                'roc': [1, 5, 20, 60]     # Daily, weekly, monthly, quarter
            }
        elif frequency == 'weekly':
            # For weekly data - adjusted to weekly scale
            # Reference: Marshall et al. (2008)
            return {
                'sma': [4, 12, 26, 52],   # Month, quarter, half-year, year
                'rsi': [8, 12],           # ~2 and ~3 months
                'roc': [1, 4, 13, 26]     # Week, month, quarter, half-year
            }
        elif frequency == 'monthly':
            # For monthly data - adjusted to monthly scale
            # Reference: Fama & French (1988)
            return {
                'sma': [3, 6, 12, 24],    # Quarter, half-year, year, two years
                'rsi': [6, 9],            # Half-year, three quarters
                'roc': [1, 3, 6, 12]      # Month, quarter, half-year, year
            }
        elif frequency == 'quarterly':
            # For quarterly data - adjusted to quarterly scale
            # Reference: Stock & Watson (2002)
            return {
                'sma': [2, 4, 8, 12],     # Half-year, year, two years, three years
                'rsi': [4, 6],            # Year, year and half
                'roc': [1, 2, 4, 8]       # Quarter, half-year, year, two years
            }
        else:
            raise ValueError(f"Unknown frequency: {frequency}")
    
    @staticmethod
    def _calculate_trend_direction(series, periods=1):
        """
        Calculate trend direction for a series with proper handling of zeros and NaNs.
        
        Parameters:
        -----------
        series: pandas.Series
            Series to calculate trend direction for
        periods: int
            Number of periods to look back
            
        Returns:
        --------
        pandas.Series
            Series containing trend direction values:
            1 for rising, -1 for falling, 0 for no change
        """
        # Calculate direction safely
        diff = series.diff(periods)
        
        # Initialize direction series as float so that NaN placeholders can be used below
        direction = pd.Series(0.0, index=series.index)
        
        # Positive direction
        direction[diff > 0] = 1.0
        
        # Negative direction
        direction[diff < 0] = -1.0
        
        # For zero-diff values, carry forward previous direction to avoid flicker
        # but only where series values are valid
        zero_mask = (diff == 0) & series.notna()
        if zero_mask.any():
            # Blank out the zero-diff positions, forward-fill them from the last
            # known direction, and fall back to 0 when there is nothing to carry forward
            direction_filled = direction.mask(zero_mask).ffill().fillna(0.0)
            
            # Update direction where diff was zero
            direction[zero_mask] = direction_filled[zero_mask]
            
        return direction.astype(int)
    
    @staticmethod
    def calculate_sma(df, column, windows=None, include_trend=True, include_crossovers=True):
        """
        Calculate Simple Moving Averages with enhanced metrics.
        
        Parameters:
        -----------
        df: pandas.DataFrame
            DataFrame containing the data
        column: str
            Column name to calculate SMA for
        windows: list
            List of window sizes for SMA calculation
        include_trend: bool
            Whether to include trend direction
        include_crossovers: bool
            Whether to include crossover signals
            
        Returns:
        --------
        pandas.DataFrame
            DataFrame with SMA values and enhanced metrics
            
        References:
        -----------
        - Brock, W., Lakonishok, J., & LeBaron, B. (1992). Simple Technical Trading Rules and 
          the Stochastic Properties of Stock Returns. Journal of Finance, 47(5), 1731-1764.
        - Neely, C. J., Rapach, D. E., Tu, J., & Zhou, G. (2014). Forecasting the equity risk 
          premium: The role of technical indicators. Management Science, 60(7), 1772-1791.
        """
        if windows is None:
            # Default parameters - will be overridden by frequency-specific ones
            windows = [5, 20, 60, 200]
        
        result_df = pd.DataFrame(index=df.index)
        
        # Calculate SMAs for each window
        for window in windows:
            # Calculate SMA with proper min_periods
            min_periods = max(1, window // 4)
            sma = df[column].rolling(window=window, min_periods=min_periods).mean()
            sma_name = f"{column}_SMA_{window}"
            result_df[sma_name] = sma
            
            # Calculate percentage difference from SMA
            valid_mask = (sma != 0) & sma.notna() & df[column].notna()
            pct_diff = pd.Series(index=df.index, dtype=float)
            pct_diff[valid_mask] = (df[column][valid_mask] - sma[valid_mask]) / sma[valid_mask] * 100
            result_df[f"{sma_name}_pct_diff"] = pct_diff
            
            # Calculate trend if requested
            if include_trend:
                trend = MultiFrequencyTechnicalIndicators._calculate_trend_direction(sma)
                result_df[f"{sma_name}_trend"] = trend
        
        # Calculate crossovers if requested and we have at least two windows
        if include_crossovers and len(windows) >= 2:
            # Sort windows to ensure correct fast/slow designation
            sorted_windows = sorted(windows)
            
            # Calculate crossovers between adjacent SMAs
            for i in range(len(sorted_windows) - 1):
                fast_window = sorted_windows[i]
                slow_window = sorted_windows[i+1]
                
                fast_sma = result_df[f"{column}_SMA_{fast_window}"]
                slow_sma = result_df[f"{column}_SMA_{slow_window}"]
                
                # Calculate difference between fast and slow SMAs
                diff = fast_sma - slow_sma
                
                # Calculate crossover signal
                crossover = pd.Series(0, index=df.index)
                
                # Find where diff changes sign
                diff_sign = np.sign(diff)
                sign_change = diff_sign.diff().fillna(0)
                
                # 1 for bullish crossover (fast crosses above slow)
                crossover[sign_change > 0] = 1
                
                # -1 for bearish crossover (fast crosses below slow)
                crossover[sign_change < 0] = -1
                
                crossover_name = f"{column}_SMA_{fast_window}_{slow_window}_crossover"
                result_df[crossover_name] = crossover
        
        return result_df
    
    @staticmethod
    def calculate_rsi(df, column, windows=None, include_trend=True, include_zones=True):
        """
        Calculate Relative Strength Index with enhanced metrics.
        
        Parameters:
        -----------
        df: pandas.DataFrame
            DataFrame containing the data
        column: str
            Column name to calculate RSI for
        windows: list
            List of window sizes for RSI calculation
        include_trend: bool
            Whether to include trend direction
        include_zones: bool
            Whether to include overbought/oversold zone indicators
            
        Returns:
        --------
        pandas.DataFrame
            DataFrame with RSI values and enhanced metrics
            
        References:
        -----------
        - Wilder, J. W. (1978). New Concepts in Technical Trading Systems. 
          Trend Research, Greensboro, NC.
        - Chong, T. T. L., & Ng, W. K. (2008). Technical analysis and the London stock exchange: 
          Testing the MACD and RSI rules using the FT30. Applied Economics Letters, 15(14), 1111-1114.
        """
        if windows is None:
            # Default parameters - will be overridden by frequency-specific ones
            windows = [14, 21]
        
        result_df = pd.DataFrame(index=df.index)
        
        for window in windows:
            # Calculate price changes
            delta = df[column].diff()
            
            # Create separate gain and loss series with proper dtype
            gain = pd.Series(0.0, index=delta.index)  # Use float dtype
            loss = pd.Series(0.0, index=delta.index)  # Use float dtype
            
            # Set values for gain and loss series using .loc for proper assignment
            gain.loc[delta > 0] = delta[delta > 0]
            loss.loc[delta < 0] = -delta[delta < 0]  # Make losses positive
            
            # First values are NaN
            gain.iloc[0] = 0.0
            loss.iloc[0] = 0.0
            
            # Calculate RSI using Wilder's method
            # First calculate simple averages for initial periods
            avg_gain = gain.rolling(window=window, min_periods=1).mean()
            avg_loss = loss.rolling(window=window, min_periods=1).mean()
            
            # Then use the Wilder's smoothing method
            for i in range(window, len(gain)):
                avg_gain.iloc[i] = (avg_gain.iloc[i-1] * (window-1) + gain.iloc[i]) / window
                avg_loss.iloc[i] = (avg_loss.iloc[i-1] * (window-1) + loss.iloc[i]) / window
            
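            # Calculate RS and RSI
            # Avoid division by zero with epsilon
            epsilon = np.finfo(float).eps
            rs = avg_gain / avg_loss.replace(0, epsilon)
            rsi = 100 - (100 / (1 + rs))
            
            # With neither gains nor losses in the window (always true for the first row),
            # RSI is undefined; use the neutral value 50 instead of 0, which would
            # otherwise be flagged as oversold
            rsi = rsi.where((avg_gain > 0) | (avg_loss > 0), 50.0)
            
            # Ensure RSI is within [0, 100] bounds
            rsi = rsi.clip(0, 100)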
            rsi_name = f"{column}_RSI_{window}"
            result_df[rsi_name] = rsi
            
            # Calculate trend if requested
            if include_trend:
                trend = MultiFrequencyTechnicalIndicators._calculate_trend_direction(rsi)
                result_df[f"{rsi_name}_trend"] = trend
            
            # Add overbought/oversold indicators if requested
            if include_zones:
                # Overbought zone (RSI > 70)
                result_df[f"{rsi_name}_overbought"] = (rsi > 70).astype(int)
                # Oversold zone (RSI < 30)
                result_df[f"{rsi_name}_oversold"] = (rsi < 30).astype(int)
                
                # Initialize divergence column
                result_df[f"{rsi_name}_divergence"] = 0
                
                # Calculate divergence between price and RSI
                # Instead of using chained assignment, we'll create and assign a complete array
                divergence_window = max(5, window // 3)
                divergence_values = np.zeros(len(df))
                
                # Process in batches to improve performance
                batch_size = 1000  # Process in batches
                
                for start_idx in range(divergence_window, len(df), batch_size):
                    end_idx = min(start_idx + batch_size, len(df))
                    
                    for i in range(start_idx, end_idx):
                        # Get windows for analysis
                        price_window = df[column].iloc[i-divergence_window:i+1]
                        rsi_window = rsi.iloc[i-divergence_window:i+1]
                        
                        # Skip if windows contain NaN
                        if price_window.isna().any() or rsi_window.isna().any():
                            continue
                        
                        # Check for bearish divergence
                        # Price higher high but RSI lower high
                        if (price_window.iloc[-1] > price_window.iloc[:-1].max() and 
                            rsi_window.iloc[-1] < rsi_window.iloc[:-1].max()):
                            divergence_values[i] = -1  # Bearish
                            
                        # Check for bullish divergence
                        # Price lower low but RSI higher low
                        elif (price_window.iloc[-1] < price_window.iloc[:-1].min() and 
                              rsi_window.iloc[-1] > rsi_window.iloc[:-1].min()):
                            divergence_values[i] = 1  # Bullish
                
                # Assign the complete divergence array at once (avoids chained assignment)
                result_df.loc[:, f"{rsi_name}_divergence"] = divergence_values
        
        return result_df
    
    @staticmethod
    def calculate_roc(df, column, windows=None, include_trend=True, include_signal=True):
        """
        Calculate Rate of Change with enhanced metrics.
        
        Parameters:
        -----------
        df: pandas.DataFrame
            DataFrame containing the data
        column: str
            Column name to calculate ROC for
        windows: list
            List of window sizes for ROC calculation
        include_trend: bool
            Whether to include trend direction
        include_signal: bool
            Whether to include signal line
            
        Returns:
        --------
        pandas.DataFrame
            DataFrame with ROC values and enhanced metrics
            
        References:
        -----------
        - Moskowitz, T. J., Ooi, Y. H., & Pedersen, L. H. (2012). Time series momentum. 
          Journal of Financial Economics, 104(2), 228-250.
        - Menkhoff, L., Sarno, L., Schmeling, M., & Schrimpf, A. (2012). Currency momentum 
          strategies. Journal of Financial Economics, 106(3), 660-684.
        """
        if windows is None:
            # Default parameters - will be overridden by frequency-specific ones
            windows = [1, 5, 20, 60]
        
        result_df = pd.DataFrame(index=df.index)
        
        for window in windows:
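            # Calculate ROC (percentage change over the specified window)
            roc = df[column].pct_change(periods=window) * 100
            
            # A zero base value (e.g. a yield spread crossing zero) yields +/-inf;
            # treat those points as missing so they do not contaminate later steps
            roc = roc.replace([np.inf, -np.inf], np.nan)
            
            # The first `window` values have no base observation; set them to 0 for continuity
            roc.iloc[:window] = 0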
            
            roc_name = f"{column}_ROC_{window}"
            result_df[roc_name] = roc
            
            # Calculate trend if requested
            if include_trend:
                trend = MultiFrequencyTechnicalIndicators._calculate_trend_direction(roc)
                result_df[f"{roc_name}_trend"] = trend
            
            # Calculate signal line if requested
            if include_signal:
                # Signal line is typically a moving average of the ROC
                signal_window = max(5, window // 4)
                signal = roc.rolling(window=signal_window, min_periods=1).mean()
                result_df[f"{roc_name}_signal"] = signal
                
                # Calculate crossover signal
                crossover = pd.Series(0, index=df.index)
                
                # ROC crossing above signal line = bullish
                crossover[(roc.shift(1) <= signal.shift(1)) & (roc > signal)] = 1
                
                # ROC crossing below signal line = bearish
                crossover[(roc.shift(1) >= signal.shift(1)) & (roc < signal)] = -1
                
                result_df[f"{roc_name}_crossover"] = crossover
                
                # Calculate histogram (difference between ROC and signal)
                histogram = roc - signal
                result_df[f"{roc_name}_histogram"] = histogram
        
        return result_df
    
    @staticmethod
    def apply_indicators(df, frequency='daily'):
        """
        Apply all technical indicators with frequency-appropriate parameters.
        Parameters:
        -----------
        df: pandas.DataFrame
            DataFrame containing the data
        frequency: str
            Data frequency ('daily', 'weekly', 'monthly', 'quarterly')
        Returns:
        --------
        pandas.DataFrame
            DataFrame with all technical indicators
        """
        # Get frequency-specific parameters
        params = MultiFrequencyTechnicalIndicators.get_frequency_params(frequency)
        
        # Store all indicators in a dictionary first to avoid DataFrame fragmentation
        all_indicators = {}
        
        # Process each column in the DataFrame
        for column in df.columns:
            try:
                # Calculate SMA
                sma_df = MultiFrequencyTechnicalIndicators.calculate_sma(
                    df, column, windows=params['sma'],
                    include_trend=True, include_crossovers=True
                )
                
                # Calculate RSI
                rsi_df = MultiFrequencyTechnicalIndicators.calculate_rsi(
                    df, column, windows=params['rsi'],
                    include_trend=True, include_zones=True
                )
                
                # Calculate ROC
                roc_df = MultiFrequencyTechnicalIndicators.calculate_roc(
                    df, column, windows=params['roc'],
                    include_trend=True, include_signal=True
                )
                
                # Combine all indicators into the dictionary.
                # The calculate_* methods already prefix each name with the source column,
                # so the names are used as-is (avoids "Oil_Oil_SMA_5"-style duplicates).
                for indicator_df in (sma_df, rsi_df, roc_df):
                    for col in indicator_df.columns:
                        all_indicators[col] = indicator_df[col]
                    
                print(f"Applied indicators to {column}: {len(sma_df.columns) + len(rsi_df.columns) + len(roc_df.columns)} new features")
            except Exception as e:
                print(f"Error applying indicators to {column}: {e}")
                import traceback
                traceback.print_exc()
        
        # Create the result DataFrame in one go to avoid fragmentation
        result_df = pd.DataFrame(all_indicators, index=df.index)
        
        # Handle NaN values properly
        if result_df.isna().any().any():
            # Use proper forward fill and backward fill
            result_df = result_df.ffill().bfill()
            # If still have NaNs, fill with zeros
            result_df = result_df.fillna(0)
        
        return result_df

# Module 3: Dynamic Factor Model Implementation

In [4]:
from statsmodels.base.model import Model
from statsmodels.tools.tools import add_constant
from statsmodels.tsa.statespace.tools import (
    constrain_stationary_univariate, unconstrain_stationary_univariate
)
from statsmodels.tsa.tsatools import lagmat
from scipy.optimize import minimize
from numpy.linalg import pinv
import numpy as np
import pandas as pd
import warnings

class DynamicFactorModel:
    """
    Implementation of Dynamic Factor Model with EM algorithm for parameter estimation.
    
    This implementation follows the methodology of:
    Doz, C., Giannone, D., & Reichlin, L. (2011). A two-step estimator for large 
    approximate dynamic factor models based on Kalman filtering. Journal of 
    Econometrics, 164(1), 188-205.
    
    Attributes:
        n_factors (int): Number of factors to extract
        max_iter (int): Maximum number of EM iterations
        tol (float): Tolerance for convergence
        ar_lags (int): Number of autoregressive lags for factors
    """
    
    def __init__(self, n_factors=5, ar_lags=1, max_iter=100, tol=1e-4, random_state=None):
        """
        Initialize the Dynamic Factor Model.
        
        Parameters:
        -----------
        n_factors: int
            Number of factors to extract
        ar_lags: int
            Number of autoregressive lags for factors
        max_iter: int
            Maximum number of EM iterations
        tol: float
            Tolerance for convergence
        random_state: int or None
            Random state for initialization
        """
        self.n_factors = n_factors
        self.ar_lags = ar_lags
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
        
        # Model parameters to be estimated
        self.loadings = None      # Factor loadings (Lambda)
        self.ar_coefs = None      # Autoregressive coefficients (A)
        self.Q = None             # State innovations covariance
        self.R = None             # Observation innovations covariance
        
        # Kalman filter/smoother outputs
        self.factors = None       # Smoothed factor estimates
        self.factor_cov = None    # Factor covariance
        
        # Fit statistics
        self.log_likelihood = None
        self.n_obs = None
        self.n_vars = None
        
        # Initialize random number generator
        self.rng = np.random.RandomState(random_state)
    
    def _init_parameters(self, X):
        """
        Initialize model parameters.
        
        Parameters:
        -----------
        X: ndarray
            Data matrix (time × variables)
            
        Returns:
        --------
        tuple
            Initial parameter estimates
            
        References:
        -----------
        - Bańbura, M., & Modugno, M. (2014). Maximum likelihood estimation of factor models on 
          datasets with arbitrary pattern of missing data. Journal of Applied Econometrics, 29(1), 
          133-160.
        """
        T, n_vars = X.shape
        n_factors = self.n_factors
        ar_lags = self.ar_lags
        
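        # Standardize data (guard against constant or all-missing columns,
        # whose standard deviation is zero or NaN)
        X_scale = np.nanstd(X, axis=0)
        X_scale[~(X_scale > 0)] = 1.0
        X_std = (X - np.nanmean(X, axis=0)) / X_scale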
        
        # Handle missing values
        X_filled = X_std.copy()
        for j in range(n_vars):
            mask = np.isnan(X_filled[:, j])
            if mask.any():
                # Fill each gap with the nearest valid observation (ties go to the earlier one)
                valid_indices = np.where(~mask)[0]
                if len(valid_indices) > 0:
                    for i in range(T):
                        if mask[i]:
                            # Find nearest valid index
                            dists = np.abs(valid_indices - i)
                            nearest_idx = valid_indices[np.argmin(dists)]
                            X_filled[i, j] = X_filled[nearest_idx, j]
                else:
                    # If all values are missing, fill with zeros
                    X_filled[:, j] = 0
        
        # 1. Initialize loadings using Principal Component Analysis
        U, s, Vt = np.linalg.svd(X_filled, full_matrices=False)
        loadings = Vt[:n_factors].T * np.sqrt(n_vars)
        
        # 2. Initialize factors
        factors = U[:, :n_factors] * s[:n_factors] / np.sqrt(n_vars)
        
        # 3. Initialize AR coefficients for factors
        ar_coefs = np.zeros((n_factors, n_factors * ar_lags))
        
        if T > ar_lags:
            # For each factor, estimate an AR model
            for i in range(n_factors):
                factor_series = factors[:, i]
                # Create lagged matrix
                y = factor_series[ar_lags:]
                X_ar = np.column_stack([factor_series[ar_lags-j-1:-j-1] for j in range(ar_lags)])
                
                # OLS estimation
                beta = np.linalg.lstsq(X_ar, y, rcond=None)[0]
                
                # Store coefficients
                ar_coefs[i, i::n_factors] = beta
        
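        # Ensure each factor's AR polynomial is stationary. Stationary OLS estimates are
        # kept unchanged. constrain_stationary_univariate is not applied here: it maps
        # unconstrained optimizer parameters to AR coefficients, so feeding it estimates
        # that are already on the AR scale would distort them.
        for i in range(n_factors):
            coefs = ar_coefs[i, i::n_factors]
            if len(coefs) > 0:  # Check if we have any AR coefficients
                # Largest root modulus from the companion matrix of the AR polynomial
                companion = np.zeros((ar_lags, ar_lags))
                companion[0, :] = coefs
                companion[1:, :-1] = np.eye(ar_lags - 1)
                max_root = np.max(np.abs(np.linalg.eigvals(companion)))
                if max_root >= 1.0:
                    # Scaling the lag-k coefficient by s**k scales every root by s.
                    # The 0.98 target is an arbitrary safety margin.
                    s = 0.98 / max_root
                    ar_coefs[i, i::n_factors] = coefs * s ** np.arange(1, ar_lags + 1)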
        
        # 4. Initialize state and observation innovations covariance
        # Fit factor model and get residuals
        fitted = factors @ loadings.T
        residuals = X_filled - fitted
        
        # R: Observation innovations covariance (diagonal)
        R = np.diag(np.nanvar(residuals, axis=0))
        
        # Q: State innovations covariance (diagonal)
        Q = np.eye(n_factors) * 0.1
        
        return loadings, ar_coefs, Q, R
    
    def _kalman_filter(self, Y, loadings, ar_coefs, Q, R):
        """
        Run Kalman filter for the dynamic factor model.
        
        Parameters:
        -----------
        Y: ndarray
            Data matrix (time × variables)
        loadings: ndarray
            Factor loadings
        ar_coefs: ndarray
            Autoregressive coefficients
        Q: ndarray
            State innovations covariance
        R: ndarray
            Observation innovations covariance
            
        Returns:
        --------
        tuple
            Filtered states, log-likelihood, and related quantities
            
        References:
        -----------
        - Durbin, J., & Koopman, S. J. (2012). Time Series Analysis by State Space Methods: 
          Second Edition. Oxford University Press.
        """
        T, n_vars = Y.shape
        n_factors = self.n_factors
        ar_lags = self.ar_lags
        
        # State dimension
        n_states = n_factors * ar_lags
        
        # Initialize the running filtered state and its covariance
        alpha_curr = np.zeros(n_states)  # Current filtered state
        P_curr = np.eye(n_states) * 10   # Current filtered state covariance (loose prior)
        
        # For smoothing, store the filtered state and covariance at every time step.
        # Index 0 holds the prior; index t+1 holds the state after observing Y[t].
        alpha = np.zeros((T+1, n_states))  # Filtered states
        P = np.zeros((T+1, n_states, n_states))  # Filtered state covariance
        alpha[0] = alpha_curr
        P[0] = P_curr
        
        # For speed, pre-allocate and reuse memory for these matrices
        v_full = np.zeros(n_vars)  # Full prediction error vector
        F_full = np.zeros((n_vars, n_vars))  # Full prediction error covariance
        K_full = np.zeros((n_states, n_vars))  # Full Kalman gain
        
        # Innovation vectors and covariance matrices - only store what's needed for smoothing
        v = np.zeros((T, n_vars))  # Prediction errors
        F = np.zeros((T, n_vars, n_vars))  # Prediction error covariance
        K = np.zeros((T, n_states, n_vars))  # Kalman gain
        
        # Log-likelihood components
        loglik = 0
        
        # Create transition matrix (companion form for VAR)
        transition = np.zeros((n_states, n_states))
        transition[:n_factors, :] = ar_coefs
        for i in range(1, ar_lags):
            transition[i*n_factors:(i+1)*n_factors, (i-1)*n_factors:i*n_factors] = np.eye(n_factors)
        
        # Create selection matrix for innovations
        selection = np.zeros((n_states, n_factors))
        selection[:n_factors, :] = np.eye(n_factors)
        
        # Create design matrix for observation equation
        design = np.zeros((n_vars, n_states))
        design[:, :n_factors] = loadings
        
        # Calculate state innovation covariance matrix once
        Q_state = selection @ Q @ selection.T
        
        # Run the Kalman filter
        for t in range(T):
            # Forecast step
            alpha_pred = transition @ alpha_curr
            P_pred = transition @ P_curr @ transition.T + Q_state
            
            # Get observation for current time step
            y_t = Y[t]
            
            # Handle missing data
            mask = ~np.isnan(y_t)
            if np.any(mask):
                # Adapt matrices for missing data
                y_observed = y_t[mask]
                design_observed = design[mask]
                R_observed = R[mask][:, mask]
                
                # Calculate prediction error
                v_t = y_observed - design_observed @ alpha_pred
                
                # Calculate prediction error covariance
                F_t = design_observed @ P_pred @ design_observed.T + R_observed
                
                # Symmetrize F_t for numerical stability
                F_t = (F_t + F_t.T) / 2
                
                # Calculate Kalman gain
                try:
                    # Try Cholesky decomposition first (faster, more stable)
                    L = np.linalg.cholesky(F_t)
                    L_inv = np.linalg.inv(L)
                    F_inv = L_inv.T @ L_inv
                except np.linalg.LinAlgError:
                    # Fall back to pseudo-inverse if Cholesky fails
                    # Use more stable SVD-based pseudoinverse
                    F_inv = np.linalg.pinv(F_t, rcond=1e-12)
                
                K_t = P_pred @ design_observed.T @ F_inv
                
                # Update step
                alpha_curr = alpha_pred + K_t @ v_t
                P_curr = P_pred - K_t @ design_observed @ P_pred
                
                # Store for smoothing
                v_full[:] = np.nan  # Reset to NaN
                v_full[mask] = v_t
                v[t] = v_full
                
                F_full[:] = 0  # Reset to zeros
                # np.ix_ is required here: F_full[mask][:, mask] would assign into a temporary copy
                F_full[np.ix_(mask, mask)] = F_t
                F[t] = F_full
                
                K_full[:] = 0  # Reset to zeros
                K_full[:, mask] = K_t
                K[t] = K_full
                
                # Update log-likelihood if we have valid observation
                # log L = -0.5 * (k*log(2π) + log|F_t| + v_t'F_t^(-1)v_t)
                sign, logdet = np.linalg.slogdet(F_t)
                if sign > 0:  # Ensure positive determinant
                    loglik -= 0.5 * (len(y_observed) * np.log(2 * np.pi) + logdet + v_t.T @ F_inv @ v_t)
            else:
                # If all data is missing, just use predicted values
                alpha_curr = alpha_pred
                P_curr = P_pred
            
            # Store current state and covariance for smoothing
            alpha[t+1] = alpha_curr
            P[t+1] = P_curr
        
        return alpha, P, v, F, K, loglik
    
    def _kalman_smoother(self, alpha_filtered, P_filtered, v, F, K, transition, Q_state):
        """
        Run the Kalman (Rauch-Tung-Striebel) smoother for the dynamic factor model.
        
        Parameters:
        -----------
        alpha_filtered: ndarray
            Filtered states (index 0 is the prior, index t+1 the state after Y[t])
        P_filtered: ndarray
            Filtered state covariance
        v: ndarray
            Prediction errors (not used by this recursion; kept for interface symmetry)
        F: ndarray
            Prediction error covariance (not used by this recursion)
        K: ndarray
            Kalman gain (not used by this recursion)
        transition: ndarray
            Transition matrix
        Q_state: ndarray
            State innovation covariance in state dimension (selection @ Q @ selection.T)
            
        Returns:
        --------
        tuple
            Smoothed states and covariance
            
        References:
        -----------
        - Durbin, J., & Koopman, S. J. (2012). Time Series Analysis by State Space Methods: 
          Second Edition. Oxford University Press.
        """
        T = len(alpha_filtered) - 1
        n_states = alpha_filtered.shape[1]
        
        # Smoothed states and covariance
        alpha_smoothed = np.zeros_like(alpha_filtered)
        P_smoothed = np.zeros_like(P_filtered)
        
        # Initialize with final filtered values
        alpha_smoothed[T] = alpha_filtered[T]
        P_smoothed[T] = P_filtered[T]
        
        # Run the Kalman smoother backwards
        for t in range(T-1, -1, -1):
            # One-step-ahead predicted covariance for index t+1
            P_pred = transition @ P_filtered[t] @ transition.T + Q_state
            P_pred = (P_pred + P_pred.T) / 2
            
            # Smoother gain J_t = P_{t|t} T' P_{t+1|t}^{-1}
            # (pseudo-inverse in case P_pred is singular)
            J = P_filtered[t] @ transition.T @ np.linalg.pinv(P_pred)
            
            # Smoothing recursion
            alpha_smoothed[t] = alpha_filtered[t] + J @ (alpha_smoothed[t+1] - transition @ alpha_filtered[t])
            P_smoothed[t] = P_filtered[t] + J @ (P_smoothed[t+1] - P_pred) @ J.T
        
        return alpha_smoothed, P_smoothed
    
    def _em_step(self, Y, loadings, ar_coefs, Q, R):
        """
        Perform one step of the EM algorithm.
        
        Parameters:
        -----------
        Y: ndarray
            Data matrix (time × variables)
        loadings: ndarray
            Factor loadings
        ar_coefs: ndarray
            Autoregressive coefficients
        Q: ndarray
            State innovations covariance
        R: ndarray
            Observation innovations covariance
            
        Returns:
        --------
        tuple
            Updated parameters and log-likelihood
            
        References:
        -----------
        - Shumway, R. H., & Stoffer, D. S. (1982). An approach to time series smoothing and 
          forecasting using the EM algorithm. Journal of Time Series Analysis, 3(4), 253-264.
        - Doz, C., Giannone, D., & Reichlin, L. (2011). A two-step estimator for large 
          approximate dynamic factor models based on Kalman filtering. Journal of 
          Econometrics, 164(1), 188-205.
        """
        T, n_vars = Y.shape
        n_factors = self.n_factors
        ar_lags = self.ar_lags
        n_states = n_factors * ar_lags
        
        # Create transition matrix
        transition = np.zeros((n_states, n_states))
        transition[:n_factors, :] = ar_coefs
        for i in range(1, ar_lags):
            transition[i*n_factors:(i+1)*n_factors, (i-1)*n_factors:i*n_factors] = np.eye(n_factors)
        
        # Create design matrix
        design = np.zeros((n_vars, n_states))
        design[:, :n_factors] = loadings
        
        # Create selection matrix
        selection = np.zeros((n_states, n_factors))
        selection[:n_factors, :] = np.eye(n_factors)
        
        # 1. E-step: Run Kalman filter and smoother
        alpha_filtered, P_filtered, v, F, K, loglik = self._kalman_filter(Y, loadings, ar_coefs, Q, R)
        Q_state = selection @ Q @ selection.T
        alpha_smoothed, P_smoothed = self._kalman_smoother(
            alpha_filtered, P_filtered, v, F, K, transition, Q_state
        )
        
        # Process in batches to reduce memory usage
        batch_size = max(1, min(100, T // 4))  # At least 1 so range() gets a valid step
        
        # Initialize parameter update accumulators
        loadings_numerator = np.zeros((n_vars, n_factors))
        loadings_denominator = np.zeros((n_factors, n_factors))
        ar_numerator = np.zeros((n_factors, n_factors * ar_lags))
        ar_denominator = np.zeros((n_factors * ar_lags, n_factors * ar_lags))
        Q_new = np.zeros((n_factors, n_factors))
        R_new = np.zeros((n_vars, n_vars))
        obs_count = np.zeros(n_vars)
        
        # Process data in batches to update parameters
        for batch_start in range(0, T, batch_size):
            batch_end = min(batch_start + batch_size, T)
            
            # Update loadings (Lambda) batch
            for t in range(batch_start, batch_end):
                y_t = Y[t]
                mask = ~np.isnan(y_t)
                if np.any(mask):
                    alpha_t = alpha_smoothed[t][:n_factors]
                    P_t = P_smoothed[t][:n_factors, :n_factors]
                    loadings_numerator[mask] += np.outer(y_t[mask], alpha_t)
                    loadings_denominator += P_t + np.outer(alpha_t, alpha_t)
            
            # Update AR coefficients (A) batch
            # Start at max(batch_start, 1) so transitions across batch boundaries are kept
            for t in range(max(batch_start, 1), batch_end):
                alpha_t = alpha_smoothed[t][:n_factors]
                alpha_lag = alpha_smoothed[t-1][:n_factors*ar_lags]
                P_lag = P_smoothed[t-1][:n_factors*ar_lags, :n_factors*ar_lags]
                
                # Approximate the smoothed lag-one cross-covariance
                # Cov(alpha_t, alpha_{t-1} | Y) by A @ P_{t-1|T}. This avoids storing
                # the exact matrices from a lag-one covariance smoother
                # (Shumway & Stoffer, 1982), at the cost of an inexact M-step.
                P_cross = transition[:n_factors, :] @ P_lag
                
                ar_numerator += np.outer(alpha_t, alpha_lag) + P_cross
                ar_denominator += P_lag + np.outer(alpha_lag, alpha_lag)
            
            # Update Q batch (start at max(batch_start, 1) so Q_new sums T - 1 terms)
            for t in range(max(batch_start, 1), batch_end):
                alpha_t = alpha_smoothed[t][:n_factors]
                alpha_pred = transition @ alpha_smoothed[t-1][:n_states]
                alpha_pred = alpha_pred[:n_factors]  # Only first n_factors states are directly affected
                
                P_t = P_smoothed[t][:n_factors, :n_factors]
                P_pred = transition @ P_smoothed[t-1] @ transition.T
                P_pred = P_pred[:n_factors, :n_factors]  # Only use factor block
                
                # Error in state prediction
                pred_error = alpha_t - alpha_pred
                
                # With the cross-covariance approximated by A @ P_{t-1|T}, the terms
                # P_t - P_cross A' - A P_cross' + A P_{t-1|T} A' collapse to P_t - P_pred
                Q_new += P_t - P_pred + np.outer(pred_error, pred_error)
            
            # Update R batch
            for t in range(batch_start, batch_end):
                y_t = Y[t]
                mask = ~np.isnan(y_t)
                obs_count[mask] += 1
                
                if np.any(mask):
                    alpha_t = alpha_smoothed[t][:n_factors]
                    P_t = P_smoothed[t][:n_factors, :n_factors]
                    
                    # Predicted observation
                    y_pred = loadings[mask] @ alpha_t
                    
                    # Observation prediction error
                    obs_error = y_t[mask] - y_pred
                    
                    # Update R (diagonal elements only)
                    for i, idx in enumerate(np.where(mask)[0]):
                        R_new[idx, idx] += obs_error[i]**2 + loadings[idx] @ P_t @ loadings[idx].T
        
        # Finalize parameter updates
        # Update loadings
        new_loadings = loadings_numerator @ np.linalg.pinv(loadings_denominator)
        
        # Update AR coefficients
        new_ar_coefs = ar_numerator @ np.linalg.pinv(ar_denominator)
        
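        # Constrain each factor's own-lag AR coefficients.
        # Note: if this is statsmodels' constrain_stationary_univariate, it is a
        # reparameterization, so it also alters coefficients that are already
        # stationary. It ignores cross-factor terms and therefore does not
        # guarantee stationarity of the full VAR.
        for i in range(n_factors):
            coefs = new_ar_coefs[i, i::n_factors]
            if len(coefs) > 0:  # Check if we have any AR coefficients
                try:
                    stable_coefs = constrain_stationary_univariate(coefs)
                    new_ar_coefs[i, i::n_factors] = stable_coefs
                except Exception:
                    # If the constraint fails, keep the unconstrained estimates
                    pass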
        
        # Update Q
        Q_new /= (T - 1)
        
        # Ensure Q is positive definite
        Q_new = (Q_new + Q_new.T) / 2  # Make symmetric
        eigvals, eigvecs = np.linalg.eigh(Q_new)
        eigvals = np.maximum(eigvals, 1e-6)  # Ensure positive eigenvalues
        Q_new = eigvecs @ np.diag(eigvals) @ eigvecs.T
        
        # Update R
        for i in range(n_vars):
            if obs_count[i] > 0:
                R_new[i, i] /= obs_count[i]
            else:
                # Keep old value if no observations
                R_new[i, i] = R[i, i]
        
        # Ensure R is positive definite
        R_diag = np.diag(R_new)
        R_diag = np.maximum(R_diag, 1e-6)  # Ensure positive values
        R_new = np.diag(R_diag)
        
        return new_loadings, new_ar_coefs, Q_new, R_new, loglik, alpha_smoothed, P_smoothed
    
    def fit(self, X):
        """
        Fit the Dynamic Factor Model using EM algorithm.
        
        Parameters:
        -----------
        X: DataFrame or ndarray
            Data matrix (time × variables)
            
        Returns:
        --------
        self
            Fitted model instance
        """
        # Convert to numpy array if DataFrame
        if isinstance(X, pd.DataFrame):
            self.column_names = X.columns
            self.index = X.index
            X = X.values
        else:
            self.column_names = [f"Var{i}" for i in range(X.shape[1])]
            self.index = np.arange(X.shape[0])
        
        # Store dimensions
        self.n_obs, self.n_vars = X.shape
        
        # Initialize parameters
        loadings, ar_coefs, Q, R = self._init_parameters(X)
        
        # EM iterations
        prev_loglik = -np.inf
        
        for iteration in range(self.max_iter):
            # Perform one EM step
            loadings, ar_coefs, Q, R, loglik, factors, factor_cov = self._em_step(X, loadings, ar_coefs, Q, R)
            
            # Check for convergence
            if np.abs(loglik - prev_loglik) < self.tol:
                print(f"EM algorithm converged after {iteration+1} iterations")
                break
            
            prev_loglik = loglik
            
            if (iteration + 1) % 10 == 0:
                print(f"Iteration {iteration+1}, Log-likelihood: {loglik:.4f}")
        
        # Store final parameters
        self.loadings = loadings
        self.ar_coefs = ar_coefs
        self.Q = Q
        self.R = R
        self.log_likelihood = loglik
        
        # Extract factors (first n_factors states from smoothed estimates)
        self.factors = factors[:, :self.n_factors]
        self.factor_cov = factor_cov[:, :self.n_factors, :self.n_factors]
        
        return self
    
    def transform(self, X=None):
        """
        Extract factors from data.
        
        Parameters:
        -----------
        X: DataFrame or ndarray, optional
            New data to transform. If None, use the data used for fitting.
            
        Returns:
        --------
        ndarray
            Extracted factors
        """
        if X is None:
            # Return factors estimated during fitting
            return self.factors
        
        # Convert to numpy array if DataFrame
        if isinstance(X, pd.DataFrame):
            X = X.values
        
        # Check if X has the same number of variables
        if X.shape[1] != self.n_vars:
            raise ValueError(f"X has {X.shape[1]} variables, but the model was "
                            f"fitted with {self.n_vars} variables")
        
        # Run Kalman filter on new data
        alpha, P, _, _, _, _ = self._kalman_filter(X, self.loadings, self.ar_coefs, self.Q, self.R)
        
        # Extract factors (first n_factors states)
        return alpha[1:, :self.n_factors]  # Use filtered estimates (shifted by 1)
    
    def get_factor_loadings(self):
        """
        Get factor loadings as a DataFrame.
        
        Returns:
        --------
        pandas.DataFrame
            Factor loadings
        """
        if self.loadings is None:
            raise ValueError("Model has not been fitted yet")
        
        factor_names = [f"Factor{i+1}" for i in range(self.n_factors)]
        return pd.DataFrame(self.loadings, index=self.column_names, columns=factor_names)
    
    def get_factors(self):
        """
        Get extracted factors as a DataFrame.
        
        Returns:
        --------
        pandas.DataFrame
            Extracted factors
        """
        if self.factors is None:
            raise ValueError("Model has not been fitted yet")
        
        factor_names = [f"Factor{i+1}" for i in range(self.n_factors)]
        return pd.DataFrame(self.factors, index=self.index, columns=factor_names)
    
    def forecast(self, steps=1):
        """
        Forecast factors for future time periods.
        
        Parameters:
        -----------
        steps: int
            Number of steps ahead to forecast
            
        Returns:
        --------
        pandas.DataFrame
            Forecasted factors
        """
        if self.factors is None:
            raise ValueError("Model has not been fitted yet")
        
        # Get the last state
        n_states = self.n_factors * self.ar_lags
        last_state = np.zeros(n_states)
        
        # Fill in the most recent factor values, newest first
        for i in range(min(self.ar_lags, len(self.factors))):
            last_state[i*self.n_factors:(i+1)*self.n_factors] = self.factors[-i-1]
        
        # Create transition matrix
        transition = np.zeros((n_states, n_states))
        transition[:self.n_factors, :] = self.ar_coefs
        for i in range(1, self.ar_lags):
            transition[i*self.n_factors:(i+1)*self.n_factors, (i-1)*self.n_factors:i*self.n_factors] = np.eye(self.n_factors)
        
        # Iterate through forecast horizon
        forecasts = np.zeros((steps, self.n_factors))
        current_state = last_state
        
        for t in range(steps):
            # Forecast next state
            next_state = transition @ current_state
            
            # Store the forecast
            forecasts[t] = next_state[:self.n_factors]
            
            # Update current state
            current_state = next_state
        
        # Create forecast index
        last_date = self.index[-1]
        if isinstance(last_date, pd.Timestamp):
            # Infer frequency
            freq = pd.infer_freq(self.index)
            if freq is None:
                # Try to infer from last few observations
                freq = pd.infer_freq(self.index[-5:])
            
            if freq is None:
                # Fallback to day frequency
                freq = 'D'
            
            forecast_index = pd.date_range(start=last_date, periods=steps+1, freq=freq)[1:]
        else:
            forecast_index = np.arange(self.index[-1] + 1, self.index[-1] + steps + 1)
        
        # Create factor names
        factor_names = [f"Factor{i+1}" for i in range(self.n_factors)]
        
        return pd.DataFrame(forecasts, index=forecast_index, columns=factor_names)
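
The `forecast` method above stacks the factor VAR into companion form and iterates the transition matrix. The following is a minimal standalone sketch of that idea, not the class's actual code path: `companion_matrix`, `spectral_radius`, and `iterate_forecast` are hypothetical helper names, and the AR(2) coefficients are made up for illustration. It also shows an eigenvalue-based stationarity check, which looks at the full transition matrix, unlike the per-factor constraint used in `_em_step`.

```python
import numpy as np

def companion_matrix(ar_coefs, n_factors, ar_lags):
    """Stack VAR(p) coefficients (n_factors x n_factors*ar_lags) into companion form."""
    n_states = n_factors * ar_lags
    transition = np.zeros((n_states, n_states))
    transition[:n_factors, :] = ar_coefs
    for i in range(1, ar_lags):
        # Identity blocks shift each lag down by one position in the state vector
        rows = slice(i * n_factors, (i + 1) * n_factors)
        cols = slice((i - 1) * n_factors, i * n_factors)
        transition[rows, cols] = np.eye(n_factors)
    return transition

def spectral_radius(transition):
    """Largest eigenvalue modulus; the VAR is stationary when this is below 1."""
    return np.max(np.abs(np.linalg.eigvals(transition)))

def iterate_forecast(ar_coefs, recent_factors, steps):
    """Iterate the companion form; recent_factors has shape (>= ar_lags, n_factors), oldest first."""
    n_factors = ar_coefs.shape[0]
    ar_lags = ar_coefs.shape[1] // n_factors
    transition = companion_matrix(ar_coefs, n_factors, ar_lags)
    # State vector holds the newest factor values first, then older lags
    state = np.concatenate([recent_factors[-1 - i] for i in range(ar_lags)])
    forecasts = np.zeros((steps, n_factors))
    for h in range(steps):
        state = transition @ state
        forecasts[h] = state[:n_factors]
    return forecasts

# Toy example (assumed values): one factor with f_t = 0.5 f_{t-1} + 0.2 f_{t-2}
ar_toy = np.array([[0.5, 0.2]])
history = np.array([[1.0], [2.0]])  # f_{T-1} = 1.0, f_T = 2.0
fc = iterate_forecast(ar_toy, history, steps=2)
radius = spectral_radius(companion_matrix(ar_toy, 1, 2))
print(fc.ravel(), radius)
```

Checking the spectral radius after each M-step is one possible way to detect a non-stationary update without altering coefficients that are already stable.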

# Module 4: MIDAS Implementation for GDP Forecasting

In [5]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from sklearn.linear_model import Ridge
import warnings

class MIDASRegressor:
    """
    Mixed-Data Sampling (MIDAS) regression for mixed-frequency time series.
    
    This implementation allows for flexible weighting functions to combine 
    high-frequency data for predicting low-frequency targets.
    
    References:
    -----------
    - Ghysels, E., Santa-Clara, P., & Valkanov, R. (2004). The MIDAS touch: Mixed data 
      sampling regression models. Working paper, University of North Carolina.
    - Ghysels, E., Sinko, A., & Valkanov, R. (2007). MIDAS regressions: Further results 
      and new directions. Econometric Reviews, 26(1), 53-90.
    """
    
    def __init__(self, weight_function='exponential_almon', max_lags=12, 
                 n_weight_params=2, ar_lags=4, regularization=0.0, 
                 max_iter=1000, tol=1e-6, random_state=None):
        """
        Initialize MIDAS regressor.
        
        Parameters:
        -----------
        weight_function: str or callable
            Weighting function. Options are:
            - 'exponential_almon': Exponential Almon lag polynomial
            - 'beta': Beta function
            - A custom callable function with signature w(lag, params)
        max_lags: int
            Maximum number of lags for high-frequency variables
        n_weight_params: int
            Number of parameters in the weighting function
        ar_lags: int
            Number of autoregressive lags for the target variable (stored for
            reference only; AR features are passed to fit/predict via X_ar)
        regularization: float
            Ridge regularization parameter
        max_iter: int
            Maximum number of iterations for optimization
        tol: float
            Tolerance for optimization convergence
        random_state: int or None
            Random state for initialization
        """
        self.max_lags = max_lags
        self.n_weight_params = n_weight_params
        self.ar_lags = ar_lags
        self.regularization = regularization
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
        
        # Set weighting function
        if weight_function == 'exponential_almon':
            self.weight_function = self._exponential_almon_weights
        elif weight_function == 'beta':
            self.weight_function = self._beta_weights
        elif callable(weight_function):
            self.weight_function = weight_function
        else:
            raise ValueError("weight_function must be 'exponential_almon', 'beta', or a callable")
        
        # Initialize parameters
        self.weight_params = None
        self.coef_ = None
        self.intercept_ = None
        self.fit_intercept = True
        
        # Initialize random number generator
        self.rng = np.random.RandomState(random_state)
    
    def _exponential_almon_weights(self, lag, params):
        """
        Exponential Almon lag polynomial weighting function.
        
        Parameters:
        -----------
        lag: ndarray
            Lag indices (0, 1, ..., max_lags-1)
        params: ndarray
            Parameters for the weighting function
            
        Returns:
        --------
        ndarray
            Weights for each lag
            
        References:
        -----------
        - Ghysels, E., Sinko, A., & Valkanov, R. (2007). MIDAS regressions: Further results 
          and new directions. Econometric Reviews, 26(1), 53-90.
        """
        if len(params) < 2:
            # Need at least two parameters
            params = np.array([params[0], 0.0])
        
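        # Normalize lags to [0, 1]; keep array shape when there is a single lag
        lag = np.asarray(lag, dtype=float)
        x = lag / (self.max_lags - 1) if self.max_lags > 1 else np.zeros_like(lag)
        
        # Calculate weights (subtract the max exponent for numerical stability)
        exponent = params[0] * x + params[1] * x**2
        weights = np.exp(exponent - exponent.max())
        
        # Normalize weights to sum to 1
        weights = weights / weights.sum()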
        
        return weights
    
    def _beta_weights(self, lag, params):
        """
        Beta function weighting.
        
        Parameters:
        -----------
        lag: ndarray
            Lag indices (0, 1, ..., max_lags-1)
        params: ndarray
            Parameters for the weighting function
            
        Returns:
        --------
        ndarray
            Weights for each lag
            
        References:
        -----------
        - Ghysels, E., Sinko, A., & Valkanov, R. (2007). MIDAS regressions: Further results 
          and new directions. Econometric Reviews, 26(1), 53-90.
        """
        if len(params) < 2:
            # Need at least two parameters
            params = np.array([1.0, 5.0])
        
        # Ensure parameters are positive
        a = np.abs(params[0])
        b = np.abs(params[1])
        
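        # Normalize lags to [0, 1]; keep array shape when there is a single lag
        lag = np.asarray(lag, dtype=float)
        x = lag / (self.max_lags - 1) if self.max_lags > 1 else np.zeros_like(lag)
        
        # Calculate weights using the beta density kernel
        weights = x**(a-1) * (1-x)**(b-1)
        
        # Endpoint weights can be infinite or NaN when a < 1 or b < 1; zero them out
        weights = np.nan_to_num(weights, nan=0.0, posinf=0.0, neginf=0.0)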
        
        # Normalize weights to sum to 1
        if weights.sum() > 0:
            weights = weights / weights.sum()
        else:
            # If all weights are zero, use uniform weights
            weights = np.ones_like(weights) / len(weights)
        
        return weights
    
    def _aggregate_high_frequency(self, X_hf, weight_params):
        """
        Aggregate high-frequency variables using weighting function.
        
        Parameters:
        -----------
        X_hf: list of ndarrays
            List of high-frequency variables, each with shape (n_samples, max_lags)
        weight_params: ndarray
            Parameters for the weighting function
            
        Returns:
        --------
        ndarray
            Aggregated high-frequency variables, shape (n_samples, n_hf_variables)
        """
        n_samples = X_hf[0].shape[0]
        n_hf_vars = len(X_hf)
        
        # Initialize aggregated variables
        X_aggregated = np.zeros((n_samples, n_hf_vars))
        
        # Calculate weights
        lags = np.arange(self.max_lags)
        weights = self.weight_function(lags, weight_params)
        
        # Apply weights to each high-frequency variable
        for i, X_var in enumerate(X_hf):
            # Weighted sum across lags
            X_aggregated[:, i] = np.sum(X_var * weights, axis=1)
        
        return X_aggregated
    
    def _objective_function(self, weight_params, X_hf, X_ar, y):
        """
        Objective function for MIDAS parameter optimization.
        
        Parameters:
        -----------
        weight_params: ndarray
            Parameters for the weighting function
        X_hf: list of ndarrays
            List of high-frequency variables
        X_ar: ndarray
            Autoregressive features
        y: ndarray
            Target variable
            
        Returns:
        --------
        float
            In-sample mean squared error of the ridge fit (penalty term not included)
        """
        # Aggregate high-frequency variables
        X_midas = self._aggregate_high_frequency(X_hf, weight_params)
        
        # Combine with autoregressive features
        if X_ar is not None:
            X = np.column_stack([X_ar, X_midas])
        else:
            X = X_midas
        
        # Add intercept
        if self.fit_intercept:
            X = np.column_stack([np.ones(X.shape[0]), X])
        
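        # Compute ridge coefficients from the normal equations, leaving the
        # intercept unpenalized to match the final sklearn Ridge fit
        penalty = self.regularization * np.eye(X.shape[1])
        if self.fit_intercept:
            penalty[0, 0] = 0.0
        coef = np.linalg.lstsq(
            X.T @ X + penalty,
            X.T @ y,
            rcond=None
        )[0]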
        
        # Calculate predictions
        y_pred = X @ coef
        
        # Calculate MSE
        mse = np.mean((y - y_pred) ** 2)
        
        return mse
    
    def fit(self, X_hf, y, X_ar=None):
        """
        Fit MIDAS regression model.
        
        Parameters:
        -----------
        X_hf: list of DataFrames or ndarrays
            List of high-frequency variables, each with shape (n_samples, max_lags)
        y: DataFrame or ndarray
            Target variable
        X_ar: DataFrame or ndarray, optional
            Autoregressive features
            
        Returns:
        --------
        self
            Fitted model instance
        """
        # Convert inputs to numpy arrays if needed
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values.flatten()
        else:
            y = np.asarray(y).flatten()
        
        # Process high-frequency variables
        X_hf_arrays = []
        for X in X_hf:
            if isinstance(X, pd.DataFrame):
                X_hf_arrays.append(X.values)
            else:
                X_hf_arrays.append(np.asarray(X))
        
        # Process autoregressive features
        if X_ar is not None:
            if isinstance(X_ar, pd.DataFrame):
                X_ar = X_ar.values
            else:
                X_ar = np.asarray(X_ar)
        
        # Initialize weight parameters
        init_params = self.rng.normal(0, 0.1, self.n_weight_params)
        
        # Use bounded optimization for better stability
        bounds = [(-10, 10)] * self.n_weight_params
        
        # Optimize weight parameters
        result = minimize(
            self._objective_function,
            init_params,
            args=(X_hf_arrays, X_ar, y),
            method='L-BFGS-B',
            bounds=bounds,
            options={'maxiter': self.max_iter, 'gtol': self.tol}
        )
        
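        if not result.success:
            warnings.warn(f"MIDAS weight optimization did not converge: {result.message}")
        
        self.weight_params = result.x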
        
        # Calculate final weights
        lags = np.arange(self.max_lags)
        self.weights_ = self.weight_function(lags, self.weight_params)
        
        # Aggregate high-frequency variables with optimized weights
        X_midas = self._aggregate_high_frequency(X_hf_arrays, self.weight_params)
        
        # Combine with autoregressive features
        if X_ar is not None:
            X = np.column_stack([X_ar, X_midas])
        else:
            X = X_midas
        
        # Fit Ridge regression
        ridge = Ridge(alpha=self.regularization, fit_intercept=self.fit_intercept)
        ridge.fit(X, y)
        
        # Store coefficients
        self.intercept_ = ridge.intercept_ if self.fit_intercept else 0.0
        self.coef_ = ridge.coef_
        
        return self
    
    def predict(self, X_hf, X_ar=None):
        """
        Make predictions with fitted model.
        
        Parameters:
        -----------
        X_hf: list of DataFrames or ndarrays
            List of high-frequency variables
        X_ar: DataFrame or ndarray, optional
            Autoregressive features
            
        Returns:
        --------
        ndarray
            Predictions
        """
        if self.weight_params is None:
            raise ValueError("Model has not been fitted yet")
        
        # Convert inputs to numpy arrays if needed
        X_hf_arrays = []
        for X in X_hf:
            if isinstance(X, pd.DataFrame):
                X_hf_arrays.append(X.values)
            else:
                X_hf_arrays.append(np.asarray(X))
        
        if X_ar is not None:
            if isinstance(X_ar, pd.DataFrame):
                X_ar = X_ar.values
            else:
                X_ar = np.asarray(X_ar)
        
        # Aggregate high-frequency variables with fitted weights
        X_midas = self._aggregate_high_frequency(X_hf_arrays, self.weight_params)
        
        # Combine with autoregressive features
        if X_ar is not None:
            X = np.column_stack([X_ar, X_midas])
        else:
            X = X_midas
        
        # Make predictions
        if self.fit_intercept:
            y_pred = self.intercept_ + X @ self.coef_
        else:
            y_pred = X @ self.coef_
        
        return y_pred
    
    def get_midas_weights(self):
        """
        Get the MIDAS weighting function parameters and weights.
        
        Returns:
        --------
        dict
            Dictionary with parameters and weights
        """
        if self.weight_params is None:
            raise ValueError("Model has not been fitted yet")
        
        lags = np.arange(self.max_lags)
        weights = self.weight_function(lags, self.weight_params)
        
        return {
            'parameters': self.weight_params,
            'weights': weights,
            'lags': lags
        }
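
To make the weighting step concrete, here is a small standalone sketch of the exponential Almon scheme used by `MIDASRegressor`. It is illustrative only: `exp_almon_weights` is a hypothetical free function that mirrors `_exponential_almon_weights`, the parameter values are arbitrary, and the lag matrix is random data rather than real factors.

```python
import numpy as np

def exp_almon_weights(n_lags, theta1, theta2):
    """Exponential Almon weights over lags 0..n_lags-1, normalized to sum to 1."""
    lags = np.arange(n_lags, dtype=float)
    x = lags / (n_lags - 1) if n_lags > 1 else np.zeros_like(lags)
    exponent = theta1 * x + theta2 * x**2
    # Subtracting the max exponent leaves the normalized weights unchanged
    w = np.exp(exponent - exponent.max())
    return w / w.sum()

# theta = (0, 0) gives equal weights; a negative theta1 favors recent lags
flat = exp_almon_weights(6, 0.0, 0.0)
decay = exp_almon_weights(6, -3.0, 0.0)

# Aggregating one high-frequency variable: each row holds lags 0..5 for one
# low-frequency period, and the weighted sum yields a single regressor column
rng = np.random.default_rng(0)
lag_matrix = rng.normal(size=(8, 6))
aggregated = lag_matrix @ decay

print(np.round(flat, 3))
print(np.round(decay, 3))
print(aggregated.shape)
```

Because the weights sum to one, the regression coefficient on the aggregated column carries the overall scale of the effect, while the two weight parameters only control how that effect is distributed across lags.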

# Module 5: Hierarchical GDP Prediction System

In [6]:
class HierarchicalGDPPredictor:
    """
    Hierarchical system for GDP prediction using multi-frequency data.
    
    This class orchestrates the hierarchical prediction process:
    1. Daily factors extraction
    2. Weekly factors extraction with daily inputs
    3. Monthly factors extraction with weekly inputs
    4. Quarterly GDP prediction with monthly inputs
    
    Attributes:
        daily_model: DynamicFactorModel for daily data
        weekly_model: DynamicFactorModel for weekly data
        monthly_model: DynamicFactorModel for monthly data
        gdp_model: MIDASRegressor or scikit-learn regressor for GDP prediction
    """
    
    def __init__(self, 
                 daily_factors=5, weekly_factors=3, monthly_factors=3,
                 daily_ar_lags=2, weekly_ar_lags=2, monthly_ar_lags=2, gdp_ar_lags=4,
                 use_midas=True, midas_max_lags=6, random_state=None):
        """
        Initialize the hierarchical GDP prediction system.
        
        Parameters:
        -----------
        daily_factors: int
            Number of factors to extract from daily data
        weekly_factors: int
            Number of factors to extract from weekly data
        monthly_factors: int
            Number of factors to extract from monthly data
        daily_ar_lags: int
            Autoregressive lags for daily factors
        weekly_ar_lags: int
            Autoregressive lags for weekly factors
        monthly_ar_lags: int
            Autoregressive lags for monthly factors
        gdp_ar_lags: int
            Autoregressive lags for GDP
        use_midas: bool
            Whether to use MIDAS for the final GDP prediction
        midas_max_lags: int
            Maximum number of lags for MIDAS
        random_state: int or None
            Random state for reproducibility
        """
        self.daily_factors = daily_factors
        self.weekly_factors = weekly_factors
        self.monthly_factors = monthly_factors
        
        self.daily_ar_lags = daily_ar_lags
        self.weekly_ar_lags = weekly_ar_lags
        self.monthly_ar_lags = monthly_ar_lags
        self.gdp_ar_lags = gdp_ar_lags
        
        self.use_midas = use_midas
        self.midas_max_lags = midas_max_lags
        self.random_state = random_state
        
        # Initialize models
        self.daily_model = DynamicFactorModel(
            n_factors=daily_factors,
            ar_lags=daily_ar_lags,
            random_state=random_state
        )
        
        self.weekly_model = DynamicFactorModel(
            n_factors=weekly_factors,
            ar_lags=weekly_ar_lags,
            random_state=random_state
        )
        
        self.monthly_model = DynamicFactorModel(
            n_factors=monthly_factors,
            ar_lags=monthly_ar_lags,
            random_state=random_state
        )
        
        # Initialize GDP model based on configuration
        if use_midas:
            self.gdp_model = MIDASRegressor(
                weight_function='exponential_almon',
                max_lags=midas_max_lags,
                n_weight_params=2,
                ar_lags=gdp_ar_lags,
                regularization=0.01,
                random_state=random_state
            )
        else:
            # Use Ridge regression as fallback
            from sklearn.linear_model import Ridge
            self.gdp_model = Ridge(alpha=0.01)
        
        # Storage for fitted factors
        self.daily_factors_df = None
        self.weekly_factors_df = None
        self.monthly_factors_df = None
        self.is_fitted = False
    
    def fit_daily_model(self, daily_df):
        """
        Fit the daily DFM model.
        
        Parameters:
        -----------
        daily_df: pandas.DataFrame
            Daily data with technical indicators
            
        Returns:
        --------
        pandas.DataFrame
            Extracted daily factors
        """
        print(f"Fitting daily model with {daily_df.shape[1]} features")
        self.daily_model.fit(daily_df)
        self.daily_factors_df = self.daily_model.get_factors()
        
        print(f"Extracted {self.daily_factors_df.shape[1]} daily factors")
        return self.daily_factors_df
    
    def fit_weekly_model(self, weekly_df, daily_factors=None):
        """
        Fit the weekly DFM model, incorporating daily factors if provided.
        
        Parameters:
        -----------
        weekly_df: pandas.DataFrame
            Weekly data with technical indicators
        daily_factors: pandas.DataFrame, optional
            Daily factors aligned to weekly dates
            
        Returns:
        --------
        pandas.DataFrame
            Extracted weekly factors
        """
        if daily_factors is not None:
            # Ensure indices align
            common_index = weekly_df.index.intersection(daily_factors.index)
            weekly_df = weekly_df.loc[common_index]
            daily_factors = daily_factors.loc[common_index]
            
            # Combine weekly data with daily factors
            combined_df = pd.concat([weekly_df, daily_factors], axis=1)
            print(f"Fitting weekly model with {weekly_df.shape[1]} weekly features and {daily_factors.shape[1]} daily factors")
        else:
            combined_df = weekly_df
            print(f"Fitting weekly model with {weekly_df.shape[1]} features (no daily factors)")
        
        self.weekly_model.fit(combined_df)
        self.weekly_factors_df = self.weekly_model.get_factors()
        
        print(f"Extracted {self.weekly_factors_df.shape[1]} weekly factors")
        return self.weekly_factors_df
    
    def fit_monthly_model(self, monthly_df, weekly_factors=None):
        """
        Fit the monthly DFM model, incorporating weekly factors if provided.
        
        Parameters:
        -----------
        monthly_df: pandas.DataFrame
            Monthly data with technical indicators
        weekly_factors: pandas.DataFrame, optional
            Weekly factors aligned to monthly dates
            
        Returns:
        --------
        pandas.DataFrame
            Extracted monthly factors
        """
        if weekly_factors is not None:
            # Ensure indices align
            common_index = monthly_df.index.intersection(weekly_factors.index)
            monthly_df = monthly_df.loc[common_index]
            weekly_factors = weekly_factors.loc[common_index]
            
            # Combine monthly data with weekly factors
            combined_df = pd.concat([monthly_df, weekly_factors], axis=1)
            print(f"Fitting monthly model with {monthly_df.shape[1]} monthly features and {weekly_factors.shape[1]} weekly factors")
        else:
            combined_df = monthly_df
            print(f"Fitting monthly model with {monthly_df.shape[1]} features (no weekly factors)")
        
        self.monthly_model.fit(combined_df)
        self.monthly_factors_df = self.monthly_model.get_factors()
        
        print(f"Extracted {self.monthly_factors_df.shape[1]} monthly factors")
        return self.monthly_factors_df
    
    def fit_gdp_model(self, gdp_series, monthly_factors, use_ar=True):
        """
        Fit the GDP prediction model.
        
        Parameters:
        -----------
        gdp_series: pandas.Series
            GDP growth rates
        monthly_factors: pandas.DataFrame
            Monthly factors aligned to quarterly dates
        use_ar: bool
            Whether to use autoregressive components
            
        Returns:
        --------
        self
            Fitted instance
        """
        # Align indices
        common_index = gdp_series.index.intersection(monthly_factors.index)
        y = gdp_series.loc[common_index]
        X_monthly = monthly_factors.loc[common_index]
        
        if self.use_midas:
            # Prepare data for MIDAS model
            # We need to create lag structure for monthly factors
            X_lags = []
            for col in X_monthly.columns:
                # Create lag matrix for each factor
                lag_matrix = pd.DataFrame(index=X_monthly.index)
                for lag in range(self.midas_max_lags):
                    lag_matrix[f"{col}_lag{lag}"] = X_monthly[col].shift(lag)
                
                # Forward fill interior gaps; the leading NaNs created by shift()
                # have nothing to fill from and are dropped by the mask below
                lag_matrix = lag_matrix.ffill()
                X_lags.append(lag_matrix.values)
            
            # Keep only rows where the target and every factor lag are observed
            # (NumPy mask, so it applies to both the arrays and the Series)
            mask = ~pd.isna(np.column_stack(X_lags)).any(axis=1) & y.notna().values
            
            # Prepare autoregressive features
            if use_ar and self.gdp_ar_lags > 0:
                X_ar = pd.DataFrame(index=y.index)
                for lag in range(1, self.gdp_ar_lags + 1):
                    X_ar[f"GDP_lag{lag}"] = y.shift(lag)
                
                # Forward fill interior gaps in the GDP lags
                X_ar = X_ar.ffill()
                
                # Also remove the initial rows where GDP lags aren't available
                mask &= X_ar.notna().all(axis=1).values
                X_ar = X_ar[mask]
                y = y[mask]
                X_lags = [X[mask] for X in X_lags]
                
                print(f"Fitting MIDAS model with {len(X_lags)} monthly factors, {X_ar.shape[1]} GDP lags, {len(y)} observations")
                self.gdp_model.fit(X_lags, y, X_ar)
            else:
                # No autoregressive features
                # Remove rows with NaNs in the factors or the target
                y = y[mask]
                X_lags = [X[mask] for X in X_lags]
                
                print(f"Fitting MIDAS model with {len(X_lags)} monthly factors, {len(y)} observations, no autoregressive terms")
                self.gdp_model.fit(X_lags, y)
        else:
            # Using standard Ridge regression
            # Prepare autoregressive features
            if use_ar and self.gdp_ar_lags > 0:
                X_ar = pd.DataFrame(index=y.index)
                for lag in range(1, self.gdp_ar_lags + 1):
                    X_ar[f"GDP_lag{lag}"] = y.shift(lag)
                
                # Combine with monthly factors
                X_combined = pd.concat([X_monthly, X_ar], axis=1)
            else:
                X_combined = X_monthly
            
            # Handle NaNs
            X_combined = X_combined.ffill()
            mask = ~X_combined.isna().any(axis=1) & ~y.isna()
            X_combined = X_combined[mask]
            y = y[mask]
            
            print(f"Fitting Ridge regression with {X_combined.shape[1]} features, {len(y)} observations")
            self.gdp_model.fit(X_combined, y)
        
        self.is_fitted = True
        return self
    
    def fit(self, daily_df, weekly_df, monthly_df, gdp_series, 
            align_dates=True, use_ar=True):
        """
        Fit the complete hierarchical model.
        
        Parameters:
        -----------
        daily_df: pandas.DataFrame
            Daily data with technical indicators
        weekly_df: pandas.DataFrame
            Weekly data with technical indicators
        monthly_df: pandas.DataFrame
            Monthly data with technical indicators
        gdp_series: pandas.Series
            GDP growth rates
        align_dates: bool
            Whether to automatically align dates across frequencies
        use_ar: bool
            Whether to include autoregressive GDP lags in the final GDP model
            
        Returns:
        --------
        self
            Fitted instance
        """
        # 1. Fit daily model and get factors
        daily_factors = self.fit_daily_model(daily_df)
        
        # 2. Align daily factors to weekly dates
        if align_dates:
            # For each weekly date, take the last daily factor row dated on or
            # before it; weekly dates before the first daily row stay NaN
            aligned_daily_factors = daily_factors.sort_index().reindex(
                weekly_df.index, method='ffill'
            )
        else:
            aligned_daily_factors = daily_factors
        
        # 3. Fit weekly model with daily factors
        weekly_factors = self.fit_weekly_model(weekly_df, aligned_daily_factors)
        
        # 4. Align weekly factors to monthly dates
        if align_dates:
            # For each monthly date, take the last weekly factor row dated on or
            # before it; monthly dates before the first weekly row stay NaN
            aligned_weekly_factors = weekly_factors.sort_index().reindex(
                monthly_df.index, method='ffill'
            )
        else:
            aligned_weekly_factors = weekly_factors
        
        # 5. Fit monthly model with weekly factors
        monthly_factors = self.fit_monthly_model(monthly_df, aligned_weekly_factors)
        
        # 6. Align monthly factors to quarterly GDP dates
        if align_dates:
            # For each quarterly date, take the last monthly factor row dated on or
            # before it; quarterly dates before the first monthly row stay NaN
            aligned_monthly_factors = monthly_factors.sort_index().reindex(
                gdp_series.index, method='ffill'
            )
        else:
            aligned_monthly_factors = monthly_factors
        
        # 7. Fit GDP model with monthly factors
        self.fit_gdp_model(gdp_series, aligned_monthly_factors, use_ar)
        
        return self
    
    def predict(self, daily_df=None, weekly_df=None, monthly_df=None, gdp_history=None,
                predict_date=None, steps_ahead=1):
        """
        Generate GDP predictions using the hierarchical model.
        
        Parameters:
        -----------
        daily_df: pandas.DataFrame, optional
            Daily data for prediction period
        weekly_df: pandas.DataFrame, optional
            Weekly data for prediction period
        monthly_df: pandas.DataFrame, optional
            Monthly data for prediction period
        gdp_history: pandas.Series, optional
            Historical GDP data for autoregressive features
        predict_date: datetime or str, optional
            Date for which to generate prediction
        steps_ahead: int
            Number of steps ahead to forecast (accepted but not yet used by this method)
            
        Returns:
        --------
        pandas.Series
            GDP growth predictions
        """
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet. Call fit() first.")
        
        # 1. Process daily data and extract factors
        if daily_df is not None:
            # Transform daily data to factors
            daily_factors = pd.DataFrame(
                self.daily_model.transform(daily_df),
                index=daily_df.index,
                columns=[f"DailyFactor{i+1}" for i in range(self.daily_factors)]
            )
        else:
            # Use existing daily factors
            daily_factors = self.daily_factors_df
        
        # 2. Process weekly data and extract factors
        if weekly_df is not None:
            # Align daily factors to weekly dates if needed
            if daily_factors is not None:
                # For each weekly date, take the last daily factor row dated on or
                # before it; weekly dates before the first daily row stay NaN
                aligned_daily_factors = daily_factors.sort_index().reindex(
                    weekly_df.index, method='ffill'
                )
                
                # Combine weekly data with aligned daily factors
                combined_weekly = pd.concat([weekly_df, aligned_daily_factors], axis=1)
            else:
                combined_weekly = weekly_df
            
            # Transform to weekly factors
            weekly_factors = pd.DataFrame(
                self.weekly_model.transform(combined_weekly),
                index=weekly_df.index,
                columns=[f"WeeklyFactor{i+1}" for i in range(self.weekly_factors)]
            )
        else:
            # Use existing weekly factors
            weekly_factors = self.weekly_factors_df
        
        # 3. Process monthly data and extract factors
        if monthly_df is not None:
            # Align weekly factors to monthly dates if needed
            if weekly_factors is not None:
                # For each monthly date, take the last weekly factor row dated on or
                # before it; monthly dates before the first weekly row stay NaN
                aligned_weekly_factors = weekly_factors.sort_index().reindex(
                    monthly_df.index, method='ffill'
                )
                
                # Combine monthly data with aligned weekly factors
                combined_monthly = pd.concat([monthly_df, aligned_weekly_factors], axis=1)
            else:
                combined_monthly = monthly_df
            
            # Transform to monthly factors
            monthly_factors = pd.DataFrame(
                self.monthly_model.transform(combined_monthly),
                index=monthly_df.index,
                columns=[f"MonthlyFactor{i+1}" for i in range(self.monthly_factors)]
            )
        else:
            # Use existing monthly factors
            monthly_factors = self.monthly_factors_df
        
        # 4. Use monthly factors to predict GDP
        if predict_date is not None:
            # Filter data up to predict_date
            monthly_factors = monthly_factors[monthly_factors.index <= predict_date]
        
        # If we have a GDP model using MIDAS
        if self.use_midas:
            # Prepare data for MIDAS model
            # We need to create lag structure for monthly factors
            X_lags = []
            for col in monthly_factors.columns:
                # Create lag matrix for each factor
                lag_matrix = pd.DataFrame(index=monthly_factors.index)
                for lag in range(self.midas_max_lags):
                    lag_matrix[f"{col}_lag{lag}"] = monthly_factors[col].shift(lag)
                
                # Forward fill interior gaps (leading NaNs from shift() remain)
                lag_matrix = lag_matrix.ffill()
                X_lags.append(lag_matrix.values)
            
            # Prepare autoregressive features if needed
            if gdp_history is not None and self.gdp_ar_lags > 0:
                X_ar = pd.DataFrame(index=monthly_factors.index)
                for lag in range(1, self.gdp_ar_lags + 1):
                    X_ar[f"GDP_lag{lag}"] = gdp_history.shift(lag)
                
                # Forward fill interior gaps (leading NaNs from shift() remain)
                X_ar = X_ar.ffill()
                
                # Make prediction
                gdp_pred = self.gdp_model.predict(X_lags, X_ar)
            else:
                # No autoregressive features
                gdp_pred = self.gdp_model.predict(X_lags)
        else:
            # Using standard Ridge regression
            # Prepare autoregressive features if needed
            if gdp_history is not None and self.gdp_ar_lags > 0:
                X_ar = pd.DataFrame(index=monthly_factors.index)
                for lag in range(1, self.gdp_ar_lags + 1):
                    X_ar[f"GDP_lag{lag}"] = gdp_history.shift(lag)
                
                # Combine with monthly factors
                X_combined = pd.concat([monthly_factors, X_ar], axis=1)
            else:
                X_combined = monthly_factors
            
            # Handle NaNs
            X_combined = X_combined.ffill()
            
            # Make prediction
            gdp_pred = self.gdp_model.predict(X_combined)
        
        # Convert to pandas Series
        gdp_predictions = pd.Series(gdp_pred, index=monthly_factors.index, name="GDP_prediction")
        
        return gdp_predictions
    
    def get_factor_loadings(self):
        """
        Get factor loadings for all levels.
        
        Returns:
        --------
        dict
            Dictionary of factor loadings
        """
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet")
        
        loadings = {
            'daily': self.daily_model.get_factor_loadings(),
            'weekly': self.weekly_model.get_factor_loadings(),
            'monthly': self.monthly_model.get_factor_loadings()
        }
        
        return loadings
    
    def get_factors(self):
        """
        Get extracted factors for all levels.
        
        Returns:
        --------
        dict
            Dictionary of factors
        """
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet")
        
        factors = {
            'daily': self.daily_factors_df,
            'weekly': self.weekly_factors_df,
            'monthly': self.monthly_factors_df
        }
        
        return factors
    
    def get_midas_weights(self):
        """
        Get MIDAS weights if using MIDAS model.
        
        Returns:
        --------
        dict or None
            Dictionary of MIDAS weights, or None if the model does not use MIDAS
        """
        if not self.is_fitted:
            raise ValueError("Model has not been fitted yet")
        
        if not self.use_midas:
            return None
        
        return self.gdp_model.get_midas_weights()
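
The hierarchy above repeatedly maps higher-frequency factors onto lower-frequency dates by taking the last value available on or before each target date. A minimal sketch of that as-of alignment follows, assuming the factor index is a unique `DatetimeIndex`; the helper name `align_to_lower_frequency` and the toy data are hypothetical and not part of the class above.

```python
import numpy as np
import pandas as pd


def align_to_lower_frequency(high_freq_factors, low_freq_dates):
    """Hypothetical helper: for each low-frequency date, return the most
    recent high-frequency row dated on or before it (NaN if none exists)."""
    ordered = high_freq_factors.sort_index()  # reindex(method=...) needs a monotonic index
    return ordered.reindex(pd.DatetimeIndex(low_freq_dates), method="ffill")


# Toy daily factor: business days from Mon 1 Jan to Fri 19 Jan 2024, values 0..14
daily_index = pd.bdate_range("2024-01-01", "2024-01-19")
daily_factors = pd.DataFrame(
    {"DailyFactor1": np.arange(len(daily_index), dtype=float)},
    index=daily_index,
)

# Weekly observations stamped on Sundays pick up the preceding Friday's value
weekly_dates = pd.to_datetime(["2024-01-07", "2024-01-14", "2024-01-21"])
aligned = align_to_lower_frequency(daily_factors, weekly_dates)
print(aligned)  # DailyFactor1 -> 4.0, 9.0, 14.0
```

Because the lookup never reads past the target date, the aligned factors carry no look-ahead information, which matters for out-of-sample evaluation.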

# Module 6: Evaluation Framework
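
One comparison this module supports is the Diebold-Mariano test for equal predictive accuracy. The sketch below shows the statistic in isolation under squared-error loss, using a rectangular-kernel long-run variance with `horizon - 1` autocovariance lags. The function name `dm_statistic`, its `horizon` parameter, and the simulated data are illustrative assumptions rather than the evaluator's own API.

```python
import numpy as np
from scipy.stats import norm


def dm_statistic(actual, pred_a, pred_b, horizon=1):
    """Hypothetical helper: two-sided Diebold-Mariano test under squared-error
    loss. A negative statistic means pred_a has the lower average loss."""
    actual, pred_a, pred_b = (
        np.asarray(v, dtype=float) for v in (actual, pred_a, pred_b)
    )
    d = (actual - pred_a) ** 2 - (actual - pred_b) ** 2  # loss differential
    n = d.size
    d_centered = d - d.mean()

    # Sample autocovariances gamma_0 .. gamma_{horizon-1}, each divided by n
    gammas = [
        np.dot(d_centered[k:], d_centered[: n - k]) / n for k in range(horizon)
    ]
    long_run_var = gammas[0] + 2.0 * sum(gammas[1:])

    # The rectangular kernel can yield a non-positive variance estimate
    if long_run_var <= 0:
        return np.nan, np.nan

    stat = d.mean() / np.sqrt(long_run_var / n)
    p_value = 2.0 * norm.sf(abs(stat))  # asymptotic N(0, 1) reference
    return stat, p_value


# Simulated example: forecast A is deliberately less noisy than forecast B
rng = np.random.default_rng(0)
actual = rng.normal(size=80)
pred_a = actual + rng.normal(scale=0.5, size=80)
pred_b = actual + rng.normal(scale=1.5, size=80)

stat, p_value = dm_statistic(actual, pred_a, pred_b)
print(f"DM statistic: {stat:.2f}, p-value: {p_value:.4f}")
```

With quarterly GDP the evaluation sample is typically short, so the asymptotic normal p-value should be read as approximate.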

class GDPForecastEvaluator:
    """
    Evaluation framework for GDP forecasting models.
    
    This class provides comprehensive evaluation metrics and visualizations
    for GDP forecasting performance.
    
    References:
    -----------
    - Clements, M. P., & Hendry, D. F. (1993). On the limitations of comparing mean 
      square forecast errors. Journal of Forecasting, 12(8), 617-637.
    - Diebold, F. X., & Mariano, R. S. (1995). Comparing predictive accuracy. 
      Journal of Business & Economic Statistics, 13(3), 253-263.
    """
    
    def __init__(self):
        """Initialize the evaluator."""
        self.results = {}
        self.models = {}
        self.actual = None
    
    def add_model(self, name, predictions, actual=None):
        """
        Add a model's predictions for evaluation.
        
        Parameters:
        -----------
        name: str
            Model name
        predictions: pandas.Series
            Predicted GDP values
        actual: pandas.Series, optional
            Actual GDP values (if not already set)
        """
        self.models[name] = predictions
        
        if actual is not None and self.actual is None:
            self.actual = actual
    
    def calculate_metrics(self, rolling_window=None):
        """
        Calculate evaluation metrics for all models.
        
        Parameters:
        -----------
        rolling_window: int, optional
            Window size for rolling metrics calculation
            
        Returns:
        --------
        dict
            Dictionary of evaluation metrics
        """
        if self.actual is None:
            raise ValueError("Actual values not set. Provide actual values when adding a model.")
        
        results = {}
        
        for model_name, predictions in self.models.items():
            # Align predictions with actual values
            common_index = self.actual.index.intersection(predictions.index)
            y_true = self.actual.loc[common_index]
            y_pred = predictions.loc[common_index]
            
            # Calculate metrics
            metrics = self._calculate_model_metrics(y_true, y_pred, model_name)
            
            # Add rolling metrics if requested
            if rolling_window is not None and len(y_true) > rolling_window:
                rolling_metrics = self._calculate_rolling_metrics(y_true, y_pred, rolling_window)
                metrics.update(rolling_metrics)
            
            results[model_name] = metrics
        
        self.results = results
        return results
    
    def _calculate_model_metrics(self, y_true, y_pred, model_name):
        """
        Calculate comprehensive evaluation metrics for a model.
        
        Parameters:
        -----------
        y_true: pandas.Series
            Actual values
        y_pred: pandas.Series
            Predicted values
        model_name: str
            Model name
            
        Returns:
        --------
        dict
            Dictionary of evaluation metrics
        """
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
        
        # Calculate basic error metrics
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        
        # Calculate directional accuracy
        direction_true = np.sign(y_true.diff().fillna(0))
        direction_pred = np.sign(y_pred.diff().fillna(0))
        
        # Ignore zero changes
        nonzero_mask = direction_true != 0
        if nonzero_mask.any():
            direction_accuracy = np.mean(direction_true[nonzero_mask] == direction_pred[nonzero_mask])
        else:
            direction_accuracy = np.nan
        
        # Calculate mean absolute percentage error
        # Use a safe version to handle zeros
        nonzero_mask = y_true != 0
        if nonzero_mask.any():
            mape = np.mean(np.abs((y_true[nonzero_mask] - y_pred[nonzero_mask]) / y_true[nonzero_mask])) * 100
        else:
            mape = np.nan
        
        # Calculate Theil's U statistic
        # U = sqrt(MSE(model)) / sqrt(MSE(naive))
        # Naive forecast is previous value (no change); the first observation
        # has no previous value, so it is excluded from the naive MSE
        naive_pred = y_true.shift(1)
        naive_mse = mean_squared_error(y_true.iloc[1:], naive_pred.iloc[1:])
        
        if naive_mse > 0:
            theils_u = np.sqrt(mse) / np.sqrt(naive_mse)
        else:
            theils_u = np.nan
        
        # Calculate advanced forecast accuracy metrics
        
        # Mean Directional Accuracy (MDA)
        actual_changes = y_true.diff().fillna(0)
        predicted_changes = y_pred.diff().fillna(0)
        mda = np.mean((actual_changes * predicted_changes) > 0)
        
        # Confusion matrix for directional forecasts
        direction_true_binary = (actual_changes > 0).astype(int)
        direction_pred_binary = (predicted_changes > 0).astype(int)
        
        true_pos = np.sum((direction_true_binary == 1) & (direction_pred_binary == 1))
        false_pos = np.sum((direction_true_binary == 0) & (direction_pred_binary == 1))
        true_neg = np.sum((direction_true_binary == 0) & (direction_pred_binary == 0))
        false_neg = np.sum((direction_true_binary == 1) & (direction_pred_binary == 0))
        
        # Hit rate (% of positive changes correctly predicted)
        if (true_pos + false_neg) > 0:
            hit_rate = true_pos / (true_pos + false_neg)
        else:
            hit_rate = np.nan
        
        # False alarm rate (% of negative changes incorrectly predicted as positive)
        if (false_pos + true_neg) > 0:
            false_alarm_rate = false_pos / (false_pos + true_neg)
        else:
            false_alarm_rate = np.nan
        
        # Calculate over/underprediction bias
        bias = np.mean(y_pred - y_true)
        
        # Create results dictionary
        metrics = {
            'rmse': rmse,
            'mae': mae,
            'mape': mape,
            'r2': r2,
            'direction_accuracy': direction_accuracy,
            'theils_u': theils_u,
            'mean_directional_accuracy': mda,
            'hit_rate': hit_rate,
            'false_alarm_rate': false_alarm_rate,
            'bias': bias,
            'confusion_matrix': {
                'true_pos': true_pos,
                'false_pos': false_pos,
                'true_neg': true_neg,
                'false_neg': false_neg
            },
            'forecast_errors': y_pred - y_true
        }
        
        return metrics
    
    def _calculate_rolling_metrics(self, y_true, y_pred, window):
        """
        Calculate rolling evaluation metrics.
        
        Parameters:
        -----------
        y_true: pandas.Series
            Actual values
        y_pred: pandas.Series
            Predicted values
        window: int
            Rolling window size
            
        Returns:
        --------
        dict
            Dictionary of rolling metrics
        """
        # Initialize rolling metrics
        rolling_rmse = []
        rolling_mae = []
        rolling_direction_accuracy = []
        
        # Loop through rolling windows
        for i in range(len(y_true) - window + 1):
            window_true = y_true.iloc[i:i+window]
            window_pred = y_pred.iloc[i:i+window]
            
            # Calculate metrics for this window
            mse = np.mean((window_true - window_pred) ** 2)
            rmse = np.sqrt(mse)
            mae = np.mean(np.abs(window_true - window_pred))
            
            # Calculate directional accuracy
            direction_true = np.sign(window_true.diff().fillna(0))
            direction_pred = np.sign(window_pred.diff().fillna(0))
            
            # Ignore zero changes
            nonzero_mask = direction_true != 0
            if nonzero_mask.any():
                direction_accuracy = np.mean(direction_true[nonzero_mask] == direction_pred[nonzero_mask])
            else:
                direction_accuracy = np.nan
            
            # Add to lists
            rolling_rmse.append(rmse)
            rolling_mae.append(mae)
            rolling_direction_accuracy.append(direction_accuracy)
        
        # Convert to pandas Series with appropriate index
        index = y_true.index[window-1:]
        rolling_metrics = {
            'rolling_rmse': pd.Series(rolling_rmse, index=index[:len(rolling_rmse)]),
            'rolling_mae': pd.Series(rolling_mae, index=index[:len(rolling_mae)]),
            'rolling_direction_accuracy': pd.Series(rolling_direction_accuracy, index=index[:len(rolling_direction_accuracy)])
        }
        
        return rolling_metrics
    
    def diebold_mariano_test(self, model1, model2, alternative='two-sided'):
        """
        Perform Diebold-Mariano test to compare forecast accuracy.
        
        Parameters:
        -----------
        model1: str
            First model name
        model2: str
            Second model name
        alternative: str
            Alternative hypothesis ('two-sided', 'less', 'greater')
            
        Returns:
        --------
        tuple
            DM statistic and p-value
            
        References:
        -----------
        - Diebold, F. X., & Mariano, R. S. (1995). Comparing predictive accuracy. 
          Journal of Business & Economic Statistics, 13(3), 253-263.
        """
        import statsmodels.api as sm
        from scipy.stats import norm
        
        if model1 not in self.models or model2 not in self.models:
            raise ValueError(f"Models {model1} and/or {model2} not found")
        
        # Get predictions
        pred1 = self.models[model1]
        pred2 = self.models[model2]
        
        # Align predictions with actual values
        common_index = self.actual.index.intersection(pred1.index).intersection(pred2.index)
        y_true = self.actual.loc[common_index]
        y_pred1 = pred1.loc[common_index]
        y_pred2 = pred2.loc[common_index]
        
        # Calculate squared errors
        error1 = (y_true - y_pred1) ** 2
        error2 = (y_true - y_pred2) ** 2
        
        # Calculate loss differential
        d = error1 - error2
        
        # Calculate DM statistic
        n = len(d)
        if n <= 1:
            return np.nan, np.nan
        
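        # Estimate lag-1 autocorrelation of loss differential
        acf_result = sm.tsa.acf(d, nlags=1, fft=False)
        gamma_0 = d.var(ddof=0)  # Variance of d (acf_result[0] is always 1)
        gamma_1 = acf_result[1] * gamma_0  # Autocovariance at lag 1
        
        # Long-run variance with an unweighted (rectangular) kernel at lag 1,
        # as in Diebold & Mariano (1995); this is not the Bartlett-weighted
        # Newey-West estimator
        lrvar = gamma_0 + 2 * gamma_1
        
        # The rectangular kernel does not guarantee a positive estimate
        if lrvar <= 0:
            return np.nan, np.nan
        
        # Calculate DM statistic
        dm_stat = d.mean() / np.sqrt(lrvar / n)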
        
        # Calculate p-value based on alternative hypothesis
        if alternative == 'two-sided':
            p_value = 2 * (1 - norm.cdf(np.abs(dm_stat)))
        elif alternative == 'less':
            p_value = norm.cdf(dm_stat)
        elif alternative == 'greater':
            p_value = 1 - norm.cdf(dm_stat)
        else:
            raise ValueError("alternative must be 'two-sided', 'less', or 'greater'")
        
        return dm_stat, p_value
    
    def plot_forecasts(self, start_date=None, end_date=None, figsize=(12, 6)):
        """
        Plot actual vs predicted GDP.
        
        Parameters:
        -----------
        start_date: str or datetime, optional
            Start date for plot
        end_date: str or datetime, optional
            End date for plot
        figsize: tuple
            Figure size
            
        Returns:
        --------
        matplotlib.figure.Figure
            Figure object
        """
        if self.actual is None:
            raise ValueError("Actual values not set")
        
        # Filter by date range if provided
        actual = self.actual
        if start_date is not None:
            actual = actual[actual.index >= pd.to_datetime(start_date)]
        if end_date is not None:
            actual = actual[actual.index <= pd.to_datetime(end_date)]
        
        # Create plot
        fig, ax = plt.subplots(figsize=figsize)
        
        # Plot actual values
        ax.plot(actual.index, actual, 'k-', linewidth=2, label='Actual GDP')
        
        # Plot predictions for each model
        colors = plt.cm.tab10.colors
        for i, (model_name, predictions) in enumerate(self.models.items()):
            # Filter predictions by date range
            pred = predictions
            if start_date is not None:
                pred = pred[pred.index >= pd.to_datetime(start_date)]
            if end_date is not None:
                pred = pred[pred.index <= pd.to_datetime(end_date)]
            
            # Only use shared dates
            common_index = actual.index.intersection(pred.index)
            pred = pred.loc[common_index]
            
            color = colors[i % len(colors)]
            ax.plot(pred.index, pred, 'o-', color=color, linewidth=1.5, label=f'{model_name}')
        
        # Add recession shading if available
        try:
            from pandas_datareader.data import DataReader
            from pandas_datareader._utils import RemoteDataError
            
            try:
                # Get US recession data from FRED
                recession = DataReader('USREC', 'fred', start=actual.index[0], end=actual.index[-1])
                
                # Create shaded regions for recessions
                last_date = None
                for date, value in recession.itertuples():
                    if value == 1.0:  # Recession period
                        if last_date is None:
                            last_date = date
                    elif last_date is not None:
                        # End of recession period
                        ax.axvspan(last_date, date, alpha=0.2, color='gray')
                        last_date = None
                
                # Handle case where we're still in a recession at the end of the data
                if last_date is not None:
                    ax.axvspan(last_date, actual.index[-1], alpha=0.2, color='gray')
            except RemoteDataError:
                print("Could not retrieve recession data from FRED")
        except ImportError:
            print("pandas_datareader not available for recession shading")
        
        # Add legend, grid, labels, etc.
        ax.set_xlabel('Date')
        ax.set_ylabel('GDP Growth (%)')
        ax.set_title('GDP Growth: Actual vs Predicted')
        ax.legend(loc='best')
        ax.grid(True, alpha=0.3)
        
        # Format y-axis to show percentage
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: f'{x:.1f}%'))
        
        plt.tight_layout()
        return fig
    
    def plot_error_distribution(self, figsize=(12, 8)):
        """
        Plot error distributions for all models.
        
        Parameters:
        -----------
        figsize: tuple
            Figure size
            
        Returns:
        --------
        matplotlib.figure.Figure
            Figure object
        """
        if not self.results:
            self.calculate_metrics()
        
        n_models = len(self.models)
        # squeeze=False keeps axes two-dimensional even with a single model
        fig, axes = plt.subplots(n_models, 2, figsize=figsize, squeeze=False)
        
        # Iterate through models
        for i, (model_name, metrics) in enumerate(self.results.items()):
            errors = metrics['forecast_errors']
            
            # Histogram of errors
            bins = min(20, max(5, int(np.sqrt(len(errors)))))
            axes[i, 0].hist(errors, bins=bins, alpha=0.7, edgecolor='black')
            axes[i, 0].axvline(x=0, color='r', linestyle='--')
            axes[i, 0].set_title(f'{model_name}: Error Distribution')
            axes[i, 0].set_xlabel('Forecast Error (Predicted - Actual)')
            axes[i, 0].set_ylabel('Frequency')
            
            # Add metrics to plot
            metrics_text = (
                f"RMSE: {metrics['rmse']:.4f}\n"
                f"MAE: {metrics['mae']:.4f}\n"
                f"Bias: {metrics['bias']:.4f}\n"
                f"Dir. Acc: {metrics['direction_accuracy']:.2f}"
            )
            axes[i, 0].annotate(
                metrics_text, xy=(0.05, 0.95), xycoords='axes fraction',
                va='top', ha='left', bbox=dict(boxstyle='round', fc='white', alpha=0.7)
            )
            
            # Q-Q plot
            from scipy import stats
            
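            # Standardize errors; skip scaling when the errors have zero variance
            # (e.g. a perfect-forecast reference series) to avoid division by zero
            err_std = errors.std()
            z = (errors - errors.mean()) / err_std if err_std > 0 else errors - errors.mean()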
            
            # Create Q-Q plot
            stats.probplot(z, dist="norm", plot=axes[i, 1])
            axes[i, 1].set_title(f'{model_name}: Q-Q Plot')
        
        plt.tight_layout()
        return fig
    
    def plot_rolling_metrics(self, window=8, figsize=(12, 15)):
        """
        Plot rolling metrics for all models.
        
        Parameters:
        -----------
        window: int
            Rolling window size
        figsize: tuple
            Figure size
            
        Returns:
        --------
        matplotlib.figure.Figure
            Figure object
        """
        # Ensure we have rolling metrics
        if not self.results or 'rolling_rmse' not in next(iter(self.results.values())):
            self.calculate_metrics(rolling_window=window)
        
        fig, axes = plt.subplots(3, 1, figsize=figsize)
        
        # Plot rolling RMSE
        for model_name, metrics in self.results.items():
            axes[0].plot(
                metrics['rolling_rmse'].index,
                metrics['rolling_rmse'],
                'o-',
                label=model_name
            )
        
        axes[0].set_title(f'Rolling RMSE ({window}-quarter window)')
        axes[0].set_ylabel('RMSE')
        axes[0].grid(True, alpha=0.3)
        axes[0].legend(loc='best')
        
        # Plot rolling MAE
        for model_name, metrics in self.results.items():
            axes[1].plot(
                metrics['rolling_mae'].index,
                metrics['rolling_mae'],
                'o-',
                label=model_name
            )
        
        axes[1].set_title(f'Rolling MAE ({window}-quarter window)')
        axes[1].set_ylabel('MAE')
        axes[1].grid(True, alpha=0.3)
        axes[1].legend(loc='best')
        
        # Plot rolling direction accuracy
        for model_name, metrics in self.results.items():
            axes[2].plot(
                metrics['rolling_direction_accuracy'].index,
                metrics['rolling_direction_accuracy'],
                'o-',
                label=model_name
            )
        
        axes[2].set_title(f'Rolling Direction Accuracy ({window}-quarter window)')
        axes[2].set_ylabel('Direction Accuracy')
        axes[2].set_ylim(0, 1)
        axes[2].grid(True, alpha=0.3)
        axes[2].legend(loc='best')
        
        plt.tight_layout()
        return fig
    
    def generate_report(self, output_file=None, include_plots=True):
        """
        Generate comprehensive evaluation report.
        
        Parameters:
        -----------
        output_file: str, optional
            Path to save report (HTML or markdown)
        include_plots: bool
            Whether to include plots in the report
            
        Returns:
        --------
        str
            Report content
        """
        if not self.results:
            self.calculate_metrics()
        
        # Start building report
        report = "# GDP Forecasting Model Evaluation Report\n\n"
        report += f"Generated on: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')}\n\n"
        
        # Add model summary
        report += "## Models Evaluated\n\n"
        report += f"Number of models: {len(self.models)}\n"
        report += f"Evaluation period: {self.actual.index[0]} to {self.actual.index[-1]}\n"
        report += f"Number of observations: {len(self.actual)}\n\n"
        
        # Add performance metrics table
        report += "## Performance Metrics\n\n"
        report += "| Model | RMSE | MAE | MAPE | R² | Direction Accuracy | Theil's U | Bias |\n"
        report += "|-------|------|-----|------|----|--------------------|-----------|------|\n"
        
        for model_name, metrics in self.results.items():
            report += (
                f"| {model_name} | "
                f"{metrics['rmse']:.4f} | "
                f"{metrics['mae']:.4f} | "
                f"{metrics['mape']:.2f}% | "
                f"{metrics['r2']:.4f} | "
                f"{metrics['direction_accuracy']:.2f} | "
                f"{metrics['theils_u']:.4f} | "
                f"{metrics['bias']:.4f} |\n"
            )
        
        report += "\n"
        
        # Add detailed analysis for each model
        report += "## Detailed Model Analysis\n\n"
        
        for model_name, metrics in self.results.items():
            report += f"### {model_name}\n\n"
            
            # Confusion matrix
            cm = metrics['confusion_matrix']
            report += "#### Directional Forecast Confusion Matrix\n\n"
            report += "| | Predicted Up | Predicted Down |\n"
            report += "|------------|--------------|----------------|\n"
            report += f"| **Actual Up** | {cm['true_pos']} | {cm['false_neg']} |\n"
            report += f"| **Actual Down** | {cm['false_pos']} | {cm['true_neg']} |\n\n"
            
            # Additional metrics
            report += "#### Additional Metrics\n\n"
            report += f"* Mean Directional Accuracy: {metrics['mean_directional_accuracy']:.4f}\n"
            report += f"* Hit Rate (% of Up movements correctly predicted): {metrics['hit_rate']:.4f}\n"
            report += f"* False Alarm Rate: {metrics['false_alarm_rate']:.4f}\n"
            report += f"* Bias (Average overestimation): {metrics['bias']:.4f}\n\n"
        
        # Add model comparison using Diebold-Mariano test
        if len(self.models) > 1:
            report += "## Model Comparison: Diebold-Mariano Test\n\n"
            report += "| Model 1 | Model 2 | DM Statistic | p-value | Conclusion |\n"
            report += "|---------|---------|--------------|---------|------------|\n"
            
            models = list(self.models.keys())
            for i in range(len(models)):
                for j in range(i+1, len(models)):
                    dm_stat, p_value = self.diebold_mariano_test(models[i], models[j])
                    
                    # Determine conclusion
                    if p_value < 0.01:
                        significance = "***"
                    elif p_value < 0.05:
                        significance = "**"
                    elif p_value < 0.1:
                        significance = "*"
                    else:
                        significance = ""
                    
                    if np.isnan(dm_stat) or np.isnan(p_value):
                        conclusion = "Insufficient data"
                    elif p_value < 0.05:
                        if dm_stat > 0:
                            conclusion = f"Model 2 is more accurate {significance}"
                        else:
                            conclusion = f"Model 1 is more accurate {significance}"
                    else:
                        conclusion = "No significant difference"
                    
                    report += (
                        f"| {models[i]} | {models[j]} | "
                        f"{dm_stat:.4f} | {p_value:.4f} | {conclusion} |\n"
                    )
            
            report += "\nSignificance levels: `***` = 1%, `**` = 5%, `*` = 10%\n\n"
        
        # Add conclusion
        report += "## Conclusion\n\n"
        
        # Determine best model based on metrics. A perfect-forecast reference series
        # registered under the name 'Actual' (as the main workflow does) is excluded,
        # since it would otherwise always win with zero error.
        candidates = {
            model: metrics for model, metrics in self.results.items() if model != 'Actual'
        } or self.results
        best_rmse = min(candidates, key=lambda m: candidates[m]['rmse'])
        best_dir_acc = max(candidates, key=lambda m: candidates[m]['direction_accuracy'])
        
        report += f"Based on RMSE, the best performing model is **{best_rmse}**.\n\n"
        report += f"Based on directional accuracy, the best performing model is **{best_dir_acc}**.\n\n"
        
        # If plots are included and an output file is specified
        if include_plots and output_file:
            # Save plots to files
            import os
            output_dir = os.path.dirname(output_file)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            
            # Base file name without extension
            base_name = os.path.splitext(output_file)[0]
            
            # Forecast plot
            forecast_plot_path = f"{base_name}_forecasts.png"
            fig = self.plot_forecasts()
            fig.savefig(forecast_plot_path)
            plt.close(fig)
            
            # Error distribution plot
            error_plot_path = f"{base_name}_errors.png"
            fig = self.plot_error_distribution()
            fig.savefig(error_plot_path)
            plt.close(fig)
            
            # Rolling metrics plot
            rolling_plot_path = f"{base_name}_rolling.png"
            fig = self.plot_rolling_metrics()
            fig.savefig(rolling_plot_path)
            plt.close(fig)
            
            # Add images to report
            report += "## Visualizations\n\n"
            report += "### Forecast Comparison\n\n"
            report += f"![Forecast Comparison]({os.path.basename(forecast_plot_path)})\n\n"
            
            report += "### Error Distribution\n\n"
            report += f"![Error Distribution]({os.path.basename(error_plot_path)})\n\n"
            
            report += "### Rolling Metrics\n\n"
            report += f"![Rolling Metrics]({os.path.basename(rolling_plot_path)})\n\n"
        
        # Save report to file if specified
        if output_file:
            # The report contains non-ASCII characters (e.g. "R²"), so fix the encoding
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
        
        return report
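
`generate_report` relies on `self.diebold_mariano_test` and reads a positive statistic as "Model 2 is more accurate". The block below is a minimal sketch of one common form of the test that is consistent with that reading, assuming squared-error loss, one-step-ahead forecasts, and a normal approximation. The function name `dm_test_sketch` and its details are illustrative assumptions, not the evaluator's actual implementation.

```python
import math


def dm_test_sketch(errors_1, errors_2):
    """Hypothetical one-step Diebold-Mariano test with squared-error loss.

    Returns (dm_stat, p_value); both are NaN when the test is undefined.
    """
    if len(errors_1) != len(errors_2):
        raise ValueError("error series must have equal length")

    # Loss differential: positive values mean model 1 had the larger squared error
    d = [e1 ** 2 - e2 ** 2 for e1, e2 in zip(errors_1, errors_2)]
    n = len(d)
    if n < 2:
        return float("nan"), float("nan")

    mean_d = sum(d) / n
    var_d = sum((x - mean_d) ** 2 for x in d) / (n - 1)
    if var_d == 0:
        return float("nan"), float("nan")

    # Assumption: no autocorrelation correction, which is only reasonable for h = 1
    dm_stat = mean_d / math.sqrt(var_d / n)

    # Two-sided p-value under the standard normal approximation
    p_value = math.erfc(abs(dm_stat) / math.sqrt(2))
    return dm_stat, p_value
```

With this sign convention, `dm_stat > 0` means the second error series has the smaller average squared error. For multi-step forecasts the variance would normally need a long-run (HAC) estimate, which this sketch omits.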

# Module 7: Main Workflow Implementation
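
The workflow in this module looks up the GDP target as `GDP_quaterly_Value_pct_change`, which suggests that processed columns are named `<file stem>_<column>_<transformation>`. The sketch below spells out that assumed convention so the configuration dictionaries are easier to read. `expected_columns` is a hypothetical helper and not part of `MultiFrequencyPreprocessor`; in particular, the name used for the `raw` transformation is a guess.

```python
import os


def expected_columns(files_config):
    """Hypothetical helper: list column names implied by a 'files' config block."""
    names = []
    for filename, spec in files_config.items():
        stem = os.path.splitext(filename)[0]  # drop the .csv extension
        for column in spec["columns"]:
            for transformation in spec["transformations"].get(column, []):
                # Assumed pattern: <file stem>_<column>_<transformation>
                names.append(f"{stem}_{column}_{transformation}")
    return names


quarterly_files = {
    "GDP_quaterly.csv": {
        "columns": ["Value"],
        "transformations": {"Value": ["raw", "pct_change"]},
    }
}

print(expected_columns(quarterly_files))
# ['GDP_quaterly_Value_raw', 'GDP_quaterly_Value_pct_change']
```

A check of this kind can be run against the processed DataFrame's columns before indexing into it, so a renamed file or transformation fails early with a clear message.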

In [8]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import pickle
import time
import warnings
from tqdm import tqdm
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, r2_score
from scipy.stats import norm

def run_gdp_forecast_workflow(
    data_folder,
    output_folder='./output',
    start_date=None,
    end_date=None,
    train_test_split=0.8,
    use_midas=True,
    use_pure_hierarchical=True,
    daily_factors=5,
    weekly_factors=3,
    monthly_factors=3,
    gdp_ar_lags=4,
    random_state=42,
    save_models=True
):
    """
    Run the complete GDP forecasting workflow.
    
    Parameters:
    -----------
    data_folder: str
        Path to the data folder
    output_folder: str
        Path to the output folder
    start_date: str or None
        Start date for analysis
    end_date: str or None
        End date for analysis
    train_test_split: float
        Proportion of data to use for training
    use_midas: bool
        Whether to use MIDAS for GDP prediction
    use_pure_hierarchical: bool
        Whether to create a pure hierarchical model version
    daily_factors: int
        Number of factors to extract from daily data
    weekly_factors: int
        Number of factors to extract from weekly data
    monthly_factors: int
        Number of factors to extract from monthly data
    gdp_ar_lags: int
        Number of autoregressive lags for GDP
    random_state: int
        Random seed for reproducibility
    save_models: bool
        Whether to save the models
        
    Returns:
    --------
    tuple
        (evaluator, models, preprocessor)
    """
    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)
    
    # Set up logging
    log_file = os.path.join(output_folder, 'workflow_log.txt')
    
    def log(message):
        """Log message to file and print to console."""
        with open(log_file, 'a') as f:
            f.write(f"{pd.Timestamp.now()}: {message}\n")
        print(message)
    
    log("=" * 80)
    log(f"Starting GDP Forecasting Workflow at {pd.Timestamp.now()}")
    log("=" * 80)
    
    # 1. Configuration
    log("\n1. Setting up configuration...")
    
    # Daily data configuration
    daily_config = {
        'daily': {
            'files': {
                'Oil_Investing_Fixed.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'pct_change']},
                    'start_date': '1983-01-01'
                },
                'Gold_Investing_fixed.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'pct_change']},
                    'start_date': '1975-01-01'
                },
                'SPX.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'pct_change']},
                    'start_date': '1927-01-01'
                },
                '10Y-03M_YieldCurve_daily.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'diff']},
                    'start_date': '1982-01-01'
                },
                '10Y-02Y_YieldCurve_daily.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'diff']},
                    'start_date': '1976-01-01'
                },
                'COPPER_Macrotrends_1959.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'pct_change']},
                    'start_date': '1959-01-01'
                },
                'Lumber_daily_macrotrends.csv': {
                    'columns': ['Close'],
                    'transformations': {'Close': ['raw', 'pct_change']},
                    'start_date': '1972-01-01'
                }
            },
            'ratios': {
                'Copper_Gold_Ratio': {
                    'numerator': 'COPPER_Macrotrends_1959',
                    'denominator': 'Gold_Investing_fixed',
                    'transformations': ['raw', 'pct_change']
                },
                'Lumber_Gold_Ratio': {
                    'numerator': 'Lumber_daily_macrotrends',
                    'denominator': 'Gold_Investing_fixed',
                    'transformations': ['raw', 'pct_change']
                }
            }
        }
    }
    
    # Weekly data configuration
    weekly_config = {
        'weekly': {
            'files': {
                'InitialUnemploymentClaims_weekly.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'pct_change']},
                    'start_date': '1967-01-01'
                },
                'Fiancial_conditions.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'diff']},
                    'start_date': '1971-01-01'
                }
            }
        }
    }
    
    # Monthly data configuration
    monthly_config = {
        'monthly': {
            'files': {
                'CPI_mon_monthly.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'pct_change']},
                    'start_date': '1955-01-01'
                },
                'Unemployment_monthly.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'diff']},
                    'start_date': '1948-01-01'
                },
                'InterestRate_monthly.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'diff']},
                    'start_date': '1954-01-01'
                },
                'HousingStarts_monthly.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'pct_change']},
                    'start_date': '1959-01-01'
                },
                'Heavy_Truck_Sales.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'pct_change']},
                    'start_date': '1967-01-01'
                },
                'Manufacturing_Production_Motor_and_Vehicle_Parts.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'pct_change']},
                    'start_date': '1972-01-01'
                },
                'Consumer_Confidence.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'diff']},
                    'start_date': '1960-01-01'
                }
            }
        }
    }
    
    # Quarterly data configuration
    quarterly_config = {
        'quarterly': {
            'files': {
                'GDP_quaterly.csv': {
                    'columns': ['Value'],
                    'transformations': {'Value': ['raw', 'pct_change']},
                    'start_date': '1947-01-01'
                }
            }
        }
    }
    
    # Combine all configurations
    data_config = {}
    data_config.update(daily_config)
    data_config.update(weekly_config)
    data_config.update(monthly_config)
    data_config.update(quarterly_config)
    
    log(f"Configuration set up with {len(daily_config['daily']['files'])} daily files, " +
        f"{len(weekly_config['weekly']['files'])} weekly files, " +
        f"{len(monthly_config['monthly']['files'])} monthly files, " +
        f"{len(quarterly_config['quarterly']['files'])} quarterly files")
    
    # 2. Data Preprocessing
    log("\n2. Data Preprocessing...")
    
    # Initialize preprocessor
    preprocessor = MultiFrequencyPreprocessor(data_folder)
    preprocessor.set_config(data_config)
    
    # Set date range if provided (a single call, so one bound cannot reset the other)
    if start_date is not None or end_date is not None:
        preprocessor.set_date_range(start_date=start_date, end_date=end_date)
    
    # Process data for each frequency
    daily_df = preprocessor.process_frequency_data('daily')
    weekly_df = preprocessor.process_frequency_data('weekly')
    monthly_df = preprocessor.process_frequency_data('monthly')
    quarterly_df = preprocessor.process_frequency_data('quarterly')
    
    log(f"Processed data: daily={daily_df.shape}, weekly={weekly_df.shape}, " +
        f"monthly={monthly_df.shape}, quarterly={quarterly_df.shape}")
    
    # Plot data overview
    try:
        fig = preprocessor.plot_data_overview()
        fig.savefig(os.path.join(output_folder, 'data_overview.png'))
        plt.close(fig)
        log(f"Data overview saved to {os.path.join(output_folder, 'data_overview.png')}")
    except Exception as e:
        log(f"Warning: Could not create data overview plot: {e}")
    
    # 3. Technical Indicators
    log("\n3. Calculating Technical Indicators...")
    
    # Initialize technical indicators calculator
    tech_indicators = MultiFrequencyTechnicalIndicators()
    
    # Calculate technical indicators for each frequency
    daily_indicators = tech_indicators.apply_indicators(daily_df, frequency='daily')
    weekly_indicators = tech_indicators.apply_indicators(weekly_df, frequency='weekly')
    monthly_indicators = tech_indicators.apply_indicators(monthly_df, frequency='monthly')
    quarterly_indicators = tech_indicators.apply_indicators(quarterly_df, frequency='quarterly')
    
    log(f"Calculated technical indicators: daily={daily_indicators.shape}, " +
        f"weekly={weekly_indicators.shape}, monthly={monthly_indicators.shape}, " +
        f"quarterly={quarterly_indicators.shape}")
    
    # 4. Data Alignment for Hierarchical Model
    log("\n4. Aligning Data for Hierarchical Model...")
    
    # Get GDP target series
    gdp_target = quarterly_df['GDP_quaterly_Value_pct_change']
    
    # Align daily data to weekly dates
    daily_to_weekly = preprocessor.align_to_dates(daily_indicators, weekly_indicators.index, method='last')
    log(f"Aligned daily to weekly: {daily_to_weekly.shape}")
    
    # Align weekly data to monthly dates
    weekly_to_monthly = preprocessor.align_to_dates(weekly_indicators, monthly_indicators.index, method='last')
    log(f"Aligned weekly to monthly: {weekly_to_monthly.shape}")
    
    # Align monthly data to quarterly dates
    monthly_to_quarterly = preprocessor.align_to_dates(monthly_indicators, gdp_target.index, method='last')
    log(f"Aligned monthly to quarterly: {monthly_to_quarterly.shape}")
    
    # 5. Train-Test Split
    log("\n5. Creating Train-Test Split...")
    
    # Determine split point
    n_quarters = len(gdp_target)
    n_train = int(n_quarters * train_test_split)
    split_date = gdp_target.index[n_train]
    
    # Split GDP data
    train_gdp = gdp_target.iloc[:n_train]
    test_gdp = gdp_target.iloc[n_train:]
    
    # Split aligned data
    train_monthly_aligned = monthly_to_quarterly.loc[train_gdp.index]
    test_monthly_aligned = monthly_to_quarterly.loc[test_gdp.index]
    
    log(f"Train-test split at {split_date}: train={len(train_gdp)}, test={len(test_gdp)}")
    
    # 6. Model Building
    log("\n6. Building Models...")
    
    # Initialize models dictionary
    models = {}
    
    # 6.1. Pure Hierarchical Model
    if use_pure_hierarchical:
        log("Building Pure Hierarchical Model...")
        try:
            # Initialize hierarchical predictor with standard settings
            pure_model = HierarchicalGDPPredictor(
                daily_factors=daily_factors,
                weekly_factors=weekly_factors,
                monthly_factors=monthly_factors,
                gdp_ar_lags=gdp_ar_lags,
                use_midas=False,  # Pure hierarchical uses direct factor weighting
                random_state=random_state
            )
            
            # Fit the model with all frequency data
            pure_model.fit(
                daily_df=daily_indicators,
                weekly_df=weekly_indicators,
                monthly_df=monthly_indicators,
                gdp_series=train_gdp,
                align_dates=True,
                use_ar=True
            )
            
            # Store in models dictionary
            models['Pure_Hierarchical'] = pure_model
            log("Pure Hierarchical Model successfully built")
            
            # Save model if requested
            if save_models:
                model_path = os.path.join(output_folder, 'pure_hierarchical_model.pkl')
                with open(model_path, 'wb') as f:
                    pickle.dump(pure_model, f)
                log(f"Pure Hierarchical Model saved to {model_path}")
                
        except Exception as e:
            log(f"Error building Pure Hierarchical Model: {e}")
            import traceback
            traceback.print_exc()
    
    # 6.2. Hierarchical + MIDAS Model
    if use_midas:
        log("Building Hierarchical + MIDAS Model...")
        try:
            # Initialize hierarchical predictor with MIDAS
            hybrid_model = HierarchicalGDPPredictor(
                daily_factors=daily_factors,
                weekly_factors=weekly_factors,
                monthly_factors=monthly_factors,
                gdp_ar_lags=gdp_ar_lags,
                use_midas=True,  # Use MIDAS for final GDP prediction
                midas_max_lags=6,  # Use up to 6 quarters of monthly factors
                random_state=random_state
            )
            
            # Fit the model with all frequency data
            hybrid_model.fit(
                daily_df=daily_indicators,
                weekly_df=weekly_indicators,
                monthly_df=monthly_indicators,
                gdp_series=train_gdp,
                align_dates=True,
                use_ar=True
            )
            
            # Store in models dictionary
            models['Hierarchical_MIDAS'] = hybrid_model
            log("Hierarchical + MIDAS Model successfully built")
            
            # Save model if requested
            if save_models:
                model_path = os.path.join(output_folder, 'hybrid_midas_model.pkl')
                with open(model_path, 'wb') as f:
                    pickle.dump(hybrid_model, f)
                log(f"Hierarchical + MIDAS Model saved to {model_path}")
                
        except Exception as e:
            log(f"Error building Hierarchical + MIDAS Model: {e}")
            import traceback
            traceback.print_exc()
    
    # 6.3. Baseline Models
    log("Building Baseline Models...")
    
    # 6.3.1. AR Model (autoregressive)
    try:
        # Create lag features
        X_ar = pd.DataFrame(index=train_gdp.index)
        for lag in range(1, gdp_ar_lags + 1):
            X_ar[f'lag_{lag}'] = train_gdp.shift(lag)
        
        # Drop rows with NaN values
        valid_rows = ~X_ar.isna().any(axis=1)
        X_ar_valid = X_ar[valid_rows]
        y_ar_valid = train_gdp[valid_rows]
        
        # Fit AR model
        ar_model = Ridge(alpha=0.1, random_state=random_state)
        ar_model.fit(X_ar_valid, y_ar_valid)
        
        # Store model
        models['AR_Baseline'] = ar_model
        log("AR Baseline Model successfully built")
        
        # Save lag columns for prediction
        models['AR_lag_columns'] = X_ar.columns.tolist()
        
    except Exception as e:
        log(f"Error building AR Baseline Model: {e}")
    
    # 6.3.2. MA Model (moving average of previous quarters)
    try:
        # Create different MA versions
        ma_windows = [4]  # 1-year moving average
        
        for window in ma_windows:
            ma_model = {'window': window}
            models[f'MA_{window}_Baseline'] = ma_model
            log(f"MA-{window} Baseline Model defined")
            
    except Exception as e:
        log(f"Error defining MA Baseline Models: {e}")
    
    # 7. Model Evaluation
    log("\n7. Evaluating Models...")
    
    # Create evaluator
    evaluator = GDPForecastEvaluator()
    
    # Set actual values
    evaluator.add_model('Actual', test_gdp, test_gdp)
    
    # Generate predictions for each model
    for model_name, model in models.items():
        try:
            if model_name == 'Pure_Hierarchical':
                # Generate predictions using the hierarchical model
                predictions = model.predict(
                    monthly_df=monthly_indicators,
                    gdp_history=gdp_target,
                    predict_date=None  # Use all data
                )
                
                # Filter to test period
                test_predictions = predictions.loc[test_gdp.index]
                evaluator.add_model(model_name, test_predictions)
                log(f"Generated predictions for {model_name}: {len(test_predictions)} quarters")
                
            elif model_name == 'Hierarchical_MIDAS':
                # Generate predictions using the hybrid model
                predictions = model.predict(
                    monthly_df=monthly_indicators,
                    gdp_history=gdp_target,
                    predict_date=None  # Use all data
                )
                
                # Filter to test period
                test_predictions = predictions.loc[test_gdp.index]
                evaluator.add_model(model_name, test_predictions)
                log(f"Generated predictions for {model_name}: {len(test_predictions)} quarters")
                
            elif model_name == 'AR_Baseline':
                # Create features for test period
                X_ar_test = pd.DataFrame(index=test_gdp.index)
                for lag, col in enumerate(models['AR_lag_columns'], 1):
                    X_ar_test[col] = gdp_target.shift(lag).loc[test_gdp.index]
                
                # Make predictions
                ar_predictions = pd.Series(
                    model.predict(X_ar_test),
                    index=test_gdp.index,
                    name=model_name
                )
                
                evaluator.add_model(model_name, ar_predictions)
                log(f"Generated predictions for {model_name}: {len(ar_predictions)} quarters")
                
            elif 'MA_' in model_name:
                # Get window size from model
                window = model['window']
                
                # Calculate moving average for each test point
                ma_predictions = pd.Series(index=test_gdp.index, dtype=float)
                
                for date in test_gdp.index:
                    # Use only observations strictly before the forecast date
                    hist_data = gdp_target[gdp_target.index < date]
                    if len(hist_data) >= window:
                        ma_predictions[date] = hist_data.iloc[-window:].mean()
                    else:
                        # Use all available data if less than window
                        ma_predictions[date] = hist_data.mean() if len(hist_data) > 0 else np.nan
                
                # Fill any missing values (Series.ffill replaces the deprecated fillna(method=...))
                ma_predictions = ma_predictions.ffill().fillna(0)
                
                evaluator.add_model(model_name, ma_predictions)
                log(f"Generated predictions for {model_name}: {len(ma_predictions)} quarters")
                
        except Exception as e:
            log(f"Error generating predictions for {model_name}: {e}")
            import traceback
            traceback.print_exc()
    
    # Calculate metrics
    log("Calculating evaluation metrics...")
    metrics = evaluator.calculate_metrics(rolling_window=8)
    
    # Output key metrics
    log("\nKey Performance Metrics:")
    log("-" * 80)
    log(f"{'Model':<25} {'RMSE':>10} {'MAE':>10} {'Dir Acc':>10}")
    log("-" * 80)
    
    for model_name, model_metrics in metrics.items():
        if model_name != 'Actual':
            log(f"{model_name:<25} {model_metrics['rmse']:>10.4f} {model_metrics['mae']:>10.4f} {model_metrics['direction_accuracy']:>10.4f}")
    
    # Create plots
    log("\nGenerating evaluation plots...")
    
    try:
        # Forecasts plot
        fig = evaluator.plot_forecasts()
        fig.savefig(os.path.join(output_folder, 'gdp_forecasts.png'))
        plt.close(fig)
        log(f"Forecasts plot saved to {os.path.join(output_folder, 'gdp_forecasts.png')}")
        
        # Error distribution plot
        fig = evaluator.plot_error_distribution()
        fig.savefig(os.path.join(output_folder, 'error_distribution.png'))
        plt.close(fig)
        log(f"Error distribution plot saved to {os.path.join(output_folder, 'error_distribution.png')}")
        
        # Rolling metrics plot
        fig = evaluator.plot_rolling_metrics()
        fig.savefig(os.path.join(output_folder, 'rolling_metrics.png'))
        plt.close(fig)
        log(f"Rolling metrics plot saved to {os.path.join(output_folder, 'rolling_metrics.png')}")
        
    except Exception as e:
        log(f"Error generating evaluation plots: {e}")
    
    # 8. Generate comprehensive report
    log("\n8. Generating Final Report...")
    
    try:
        report_path = os.path.join(output_folder, 'gdp_forecast_evaluation.md')
        report_content = evaluator.generate_report(report_path, include_plots=True)
        log(f"Comprehensive evaluation report saved to {report_path}")
        
    except Exception as e:
        log(f"Error generating evaluation report: {e}")
    
    # 9. Conclusion
    log("\n9. Workflow Completed")
    log("=" * 80)
    log(f"GDP Forecasting Workflow completed at {pd.Timestamp.now()}")
    log("=" * 80)
    
    return evaluator, models, preprocessor

if __name__ == "__main__":
    # Set parameters
    DATA_FOLDER = "./Project_Data"
    OUTPUT_FOLDER = "./output"
    
    # Run the workflow
    evaluator, models, preprocessor = run_gdp_forecast_workflow(
        data_folder=DATA_FOLDER,
        output_folder=OUTPUT_FOLDER,
        start_date='1980-01-01',  # Start date for analysis
        end_date=None,           # End date (use None for all available data)
        train_test_split=0.8,    # Use 80% of data for training
        use_midas=True,          # Use MIDAS for final GDP prediction
        use_pure_hierarchical=True,  # Also build pure hierarchical model
        daily_factors=5,         # Number of daily factors
        weekly_factors=3,        # Number of weekly factors
        monthly_factors=3,       # Number of monthly factors
        gdp_ar_lags=4,           # Number of AR lags for GDP
        random_state=42,         # For reproducibility
        save_models=True         # Save models to files
    )
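
For reference, the three columns in the metrics table logged by the workflow (RMSE, MAE, Dir Acc) could be computed along the following lines. This is a minimal sketch, not the actual `evaluator.calculate_metrics` implementation: `forecast_metrics` is a hypothetical helper, the sample numbers are made up for illustration, and treating "direction" as the sign of the growth rate itself (rather than the sign of its quarter-on-quarter change) is an assumption.

```python
import numpy as np


def forecast_metrics(actual, predicted):
    """Hypothetical helper: RMSE, MAE and directional accuracy for one model."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)

    # Drop periods where either the actual value or the forecast is missing
    mask = ~(np.isnan(actual) | np.isnan(predicted))
    actual, predicted = actual[mask], predicted[mask]

    errors = predicted - actual
    return {
        "rmse": float(np.sqrt(np.mean(errors ** 2))),
        "mae": float(np.mean(np.abs(errors))),
        # Assumption: a forecast is directionally correct when its sign
        # matches the sign of the realized growth rate
        "direction_accuracy": float(np.mean(np.sign(predicted) == np.sign(actual))),
    }


# Illustrative values only (not taken from the GDP data)
example = forecast_metrics([2.0, -1.0, 3.0, 0.5], [1.5, 0.5, 2.0, 1.0])
print(f"{'Example':<25} {example['rmse']:>10.4f} {example['mae']:>10.4f} "
      f"{example['direction_accuracy']:>10.4f}")
```

The dictionary keys mirror the ones read in the reporting loop (`'rmse'`, `'mae'`, `'direction_accuracy'`), so a per-model result of this shape would slot into the same table format.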

Starting GDP Forecasting Workflow at 2025-06-04 17:51:27.404505

1. Setting up configuration...
Configuration set up with 7 daily files, 2 weekly files, 7 monthly files, 1 quarterly files

2. Data Preprocessing...
Found 18 files in ./Project_Data
Processing daily data...
Processed Oil_Investing_Fixed.csv: 10950 observations, 2 features
Processed Gold_Investing_fixed.csv: 13099 observations, 2 features
Processed SPX.csv: 25362 observations, 2 features
Processed 10Y-03M_YieldCurve_daily.csv: 11271 observations, 2 features
Processed 10Y-02Y_YieldCurve_daily.csv: 12730 observations, 2 features
Processed COPPER_Macrotrends_1959.csv: 17144 observations, 2 features
Processed Lumber_daily_macrotrends.csv: 13672 observations, 2 features
Created ratio: Copper_Gold_Ratio with 17145 observations
Created ratio: Lumber_Gold_Ratio with 13672 observations
Final daily dataset: 25381 observations, 18 features
Processing weekly data...
Processed InitialUnemploymentClaims_weekly.csv: 3035 observations, 2 

  df = df.asfreq('M', method='ffill')


Processed CPI_mon_monthly.csv: 829 observations, 2 features
Processed Unemployment_monthly.csv: 925 observations, 2 features
Processed InterestRate_monthly.csv: 847 observations, 2 features
Processed HousingStarts_monthly.csv: 792 observations, 2 features
Processed Heavy_Truck_Sales.csv: 698 observations, 2 features
Processed Manufacturing_Production_Motor_and_Vehicle_Parts.csv: 637 observations, 2 features
Processed Consumer_Confidence.csv: 768 observations, 2 features
Final monthly dataset: 926 observations, 14 features
Processing quarterly data...


  df = df.asfreq('Q', method='ffill')


Processed GDP_quaterly.csv: 311 observations, 2 features
Final quarterly dataset: 311 observations, 2 features
Processed data: daily=(25381, 18), weekly=(3038, 4), monthly=(926, 14), quarterly=(311, 2)
Data overview saved to ./output/data_overview.png

3. Calculating Technical Indicators...
Applied indicators to Oil_Investing_Fixed_Close_raw: 45 new features
Applied indicators to Oil_Investing_Fixed_Close_pct_change: 45 new features
Applied indicators to Gold_Investing_fixed_Close_raw: 45 new features
Applied indicators to Gold_Investing_fixed_Close_pct_change: 45 new features
Applied indicators to SPX_Close_raw: 45 new features
Applied indicators to SPX_Close_pct_change: 45 new features
Applied indicators to 10Y-03M_YieldCurve_daily_Close_raw: 45 new features
Applied indicators to 10Y-03M_YieldCurve_daily_Close_diff: 45 new features
Applied indicators to 10Y-02Y_YieldCurve_daily_Close_raw: 45 new features
Applied indicators to 10Y-02Y_YieldCurve_daily_Close_diff: 45 new features
Appli

  aligned_df = aligned_df.ffill().bfill()


Aligned daily to weekly: (3038, 810)
Aligned weekly to monthly: (926, 180)

  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)


Aligned monthly to quarterly: (311, 630)

5. Creating Train-Test Split...
Train-test split at 2009-03-31 00:00:00: train=248, test=63

6. Building Models...
Building Pure Hierarchical Model...
Fitting daily model with 810 features


  X_std = (X - np.nanmean(X, axis=0)) / np.nanstd(X, axis=0)
  alpha_curr = alpha_pred + K_t @ v_t
  alpha_pred = transition @ alpha_curr


: 