In [18]:
# --- Cell 1: Setup and Initialization ---
import os
import sys
import logging
import yaml
import pandas as pd
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any, Union
from datetime import datetime
import pytz  # We'll need to install this

# Add project root to Python path
project_root = str(Path(os.getcwd()).parent)
if project_root not in sys.path:
    sys.path.append(project_root)

# Import project modules
from src.data.data_processor import DataProcessor
from src.data.efficient_data_storage import EfficientDataStorage, DataType, DataStage
from src.features.feature_selector import FeatureSelector
from src.data.feature_generator import FeatureGenerator

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)

# Load configuration
config_path = os.path.join(project_root, 'config', 'config.yaml')
with open(config_path, 'r') as f:
    config = yaml.safe_load(f)

# Initialize classes
processor = DataProcessor(config, logging.getLogger('Processor'))
storage = EfficientDataStorage(config, logging.getLogger('Storage'))
feature_generator = FeatureGenerator(config, logging.getLogger('FeatureGenerator'))

# Initialize FeatureSelector with correct config structure
feature_selection_config = {
    'data': {
        'features': {
            'base_dir': config['data']['base_dir'],
            'fundamental': config['data']['features']['fundamentals']
        }
    },
    'feature_selection': config['feature_selection'],
    'output': {
        'results_dir': os.path.join(config['data']['base_dir'], 'results', 'feature_selection')
    }
}

# Generate descriptive run name with Sydney time if no custom name provided
def generate_run_name(config: dict) -> str:
    """Generate a descriptive run name with Sydney timestamp."""
    # Get Sydney time
    sydney_tz = pytz.timezone('Australia/Sydney')
    sydney_time = datetime.now(sydney_tz)
    
    # Extract feature selection parameters
    method = config['feature_selection'].get('method', 'shap_threshold')[:4]
    min_thresh = f"t{int(config['feature_selection'].get('min_threshold', 0.01) * 100)}"
    min_feat = f"min{config['feature_selection'].get('min_features', 10)}"
    max_feat = f"max{config['feature_selection'].get('max_features', 40)}"
    cumul = "cum" if config['feature_selection'].get('use_cumulative', True) else "nocum"
    cumul_thresh = f"c{int(config['feature_selection'].get('cumulative_threshold', 0.95) * 100)}" if cumul == "cum" else ""
    
    # Format timestamp
    timestamp = sydney_time.strftime("%Y%m%d_%H%M_SYD")
    
    # Combine into run name
    run_name = f"{method}_{min_thresh}_{min_feat}_{max_feat}_{cumul}{cumul_thresh}_{timestamp}"
    return run_name

# You can specify a custom run_id here, or leave it as None for automatic generation
run_id = None  # Change this to your desired run name if needed
if run_id is None:
    run_id = generate_run_name(feature_selection_config)
    logging.info(f"Generated run name: {run_id}")

feature_selector = FeatureSelector(feature_selection_config, run_id=run_id, logger=logging.getLogger('FeatureSelector'))

2025-04-03 01:37:40,509 - INFO - Validating data structure
2025-04-03 01:37:40,510 - INFO - Data structure validation completed successfully
2025-04-03 01:37:40,511 - INFO - Validating data structure
2025-04-03 01:37:40,512 - INFO - Data structure validation completed successfully
2025-04-03 01:37:40,530 - INFO - Generated run name: shap_t1_min10_max40_cumc95_20250403_1237_SYD
2025-04-03 01:37:40,531 - INFO - Saved run configuration to /home/siddharth.johri/DECOHERE/data/results/feature_selection/run_shap_t1_min10_max40_cumc95_20250403_1237_SYD/run_config.json


In [21]:
# --- Cell 2: Processing Function (with Enhanced Features) ---
def run_pipeline_for_date(date_str: str, processor: DataProcessor, storage: EfficientDataStorage, 
                         feature_generator: Optional[FeatureGenerator], config: dict):
    """Runs the full data pipeline for a single date, including enhanced features if available."""
    
    if not processor or not storage:
        logging.error(f"[{date_str}] Processor or Storage not initialized. Aborting.")
        return False, None, None, None

    logging.info(f"--- Starting Pipeline for Date: {date_str} ---")
    processed_file_path = None
    pre_feature_file_path = None
    enhanced_feature_file_path = None
    success = False

    try:
        # 1. Load and process raw data
        raw_data = processor.load_raw_data(date_str)
        if raw_data.empty:
            logging.warning(f"[{date_str}] No raw data found. Skipping remaining steps.")
            return True, None, None, None

        # 2. Process data
        transformed_data = processor.transform_raw_data(raw_data)
        filled_data = processor.fill_missing_values(transformed_data)

        # 3. Store processed data
        processed_file_path = storage.store_data(
            df=filled_data, 
            data_type=DataType.FUNDAMENTALS,
            stage=DataStage.PROCESSED, 
            date=date_str
        )

        # 4. Generate and store pre-feature data
        pre_feature_df = storage.processed_data_feat_gen(filled_data)
        if not pre_feature_df.empty:
            pre_feature_file_path = storage.store_data(
                df=pre_feature_df, 
                data_type=DataType.FUNDAMENTALS,
                stage=DataStage.FEATURES, 
                date=date_str, 
                sub_type='pre_feature_set'
            )

            # 5. Generate enhanced features if generator available
            if feature_generator:
                # Get parameters from config
                feature_cfg = config.get('features', {})
                hist_w = feature_cfg.get('hist_window', 6)
                fwd_w = feature_cfg.get('fwd_window', 6)
                target_m = feature_cfg.get('target_metric', 'PE_RATIO_RATIO_SIGNED_LOG')
                sector_map_rel_path = feature_cfg.get('sector_mapping_path', None)
                sector_map_abs_path = os.path.join(project_root, sector_map_rel_path) if sector_map_rel_path else None
                sector_levels = feature_cfg.get('sector_levels_to_include', ['sector_1'])
                include_sectors = feature_cfg.get('include_sector_features', True)

                enhanced_feature_df = feature_generator.generate_enhanced_features(
                    df=pre_feature_df,
                    hist_window=hist_w,
                    fwd_window=fwd_w,
                    target_metric=target_m,
                    sector_mapping_path=sector_map_abs_path,
                    sector_levels_to_include=sector_levels,
                    include_sector_features=include_sectors
                )
                
                if not enhanced_feature_df.empty:
                    enhanced_feature_file_path = storage.store_data(
                        df=enhanced_feature_df, 
                        data_type=DataType.FUNDAMENTALS,
                        stage=DataStage.FEATURES, 
                        date=date_str, 
                        sub_type='enhanced_features'
                    )
            else:
                logging.info(f"[{date_str}] FeatureGenerator not available. Skipping enhanced features.")

        success = True

    except Exception as e:
        logging.error(f"[{date_str}] Pipeline error: {e}", exc_info=True)
        success = False

    finally:
        logging.info(f"--- Finished Pipeline for Date: {date_str} (Success: {success}) ---")

    return success, processed_file_path, pre_feature_file_path, enhanced_feature_file_path

print("Pipeline function defined.")

Pipeline function defined.


In [22]:
# --- new Cell 3: Execution Loop and Verification ---
dates_to_process = ['2024-09-02', '2024-09-03', '2024-09-04']
results = {}

# Process each date
for date_str in dates_to_process:
    logging.info(f"--- Starting Pipeline Run for Date: {date_str} ---")
    success, proc_path, pre_feat_path, enh_feat_path = run_pipeline_for_date(
        date_str=date_str,
        processor=processor,
        storage=storage,
        feature_generator=feature_generator,
        config=config
    )
    results[date_str] = (success, proc_path, pre_feat_path, enh_feat_path)
    logging.info(f"--- Completed Pipeline Run for Date: {date_str} ---")

# Verify results
def verify_pipeline_output(date_str: str, result: tuple, expect_files: bool = True) -> bool:
    """Verify the pipeline output for a specific date."""
    success, proc_path, pre_feat_path, enh_feat_path = result
    
    if not success:
        logging.error(f"[{date_str}] Pipeline run failed")
        return False
        
    if expect_files:
        # Check if all expected files exist
        for path, file_type in [
            (proc_path, "processed data"),
            (pre_feat_path, "pre-feature data"),
            (enh_feat_path, "enhanced features")
        ]:
            if path and not os.path.exists(path):
                logging.error(f"[{date_str}] Missing {file_type} file: {path}")
                return False
            elif path:
                logging.info(f"[{date_str}] Verified {file_type} file: {path}")
    
    return True

# Verify all results
all_success = True
for date_str, result in results.items():
    if not verify_pipeline_output(date_str, result):
        all_success = False
        logging.error(f"[{date_str}] Verification failed")
    else:
        logging.info(f"[{date_str}] Verification successful")

if all_success:
    logging.info("All pipeline runs completed and verified successfully")
else:
    logging.error("Some pipeline runs failed verification")

print("Pipeline execution and verification complete.")

2025-04-03 01:46:20,400 - INFO - --- Starting Pipeline Run for Date: 2024-09-02 ---
2025-04-03 01:46:20,401 - INFO - --- Starting Pipeline for Date: 2024-09-02 ---
2025-04-03 01:46:20,402 - INFO - Loading raw data from /home/siddharth.johri/DECOHERE/data/raw/fundamentals/financials_2024_09.pq
2025-04-03 01:46:20,421 - INFO - Filtering data for date: 2024-09-02
2025-04-03 01:46:20,429 - INFO - Loaded raw data with shape: (5223, 34)
2025-04-03 01:46:20,430 - INFO - Calculating periods for each ticker using COHERE logic
2025-04-03 01:46:20,442 - INFO - Number of unique fiscal months per ID: fiscal_month
1    457
2     43
Name: count, dtype: int64
2025-04-03 01:46:20,443 - INFO - Calculating periods by ID using id column: 'ID'
2025-04-03 01:46:22,311 - INFO - Period counts: {-6: np.int64(1), -5: np.int64(476), -4: np.int64(486), -3: np.int64(492), -2: np.int64(497), -1: np.int64(499), 0: np.int64(500), 1: np.int64(451), 2: np.int64(450), 3: np.int64(424), 4: np.int64(346), 5: np.int64(292)

Processing groups:   0%|          | 0/500 [00:00<?, ?it/s]

2025-04-03 01:47:51,883 - INFO - Identified 786 potential numerical feature columns generated.
2025-04-03 01:47:51,883 - INFO - Sector features requested but no path provided. Skipping.
2025-04-03 01:47:51,889 - INFO - Ranking 786 numerical features cross-sectionally (by PIT_DATE)...
2025-04-03 01:47:52,196 - INFO - Numerical feature ranking complete.
2025-04-03 01:47:52,197 - INFO - Merging target variable: PE_RATIO_RATIO_SIGNED_LOG
2025-04-03 01:47:52,207 - INFO - Feature generation pipeline complete.
2025-04-03 01:47:52,213 - INFO - Final DataFrame shape: (500, 789)
2025-04-03 01:47:52,233 - INFO - Storing fundamentals data for date: 2024-09-02 in features stage
2025-04-03 01:47:52,358 - INFO - Saved data to: /home/siddharth.johri/DECOHERE/data/features/fundamentals/enhanced_features/year=2024/month=09/data_2024-09-02.pq
2025-04-03 01:47:52,359 - INFO - File size: 2577107 bytes
2025-04-03 01:47:52,359 - INFO - --- Finished Pipeline for Date: 2024-09-02 (Success: True) ---
2025-04-03

Processing groups:   0%|          | 0/500 [00:00<?, ?it/s]

2025-04-03 01:49:25,199 - INFO - Identified 786 potential numerical feature columns generated.
2025-04-03 01:49:25,200 - INFO - Sector features requested but no path provided. Skipping.
2025-04-03 01:49:25,205 - INFO - Ranking 786 numerical features cross-sectionally (by PIT_DATE)...
2025-04-03 01:49:25,368 - INFO - Numerical feature ranking complete.
2025-04-03 01:49:25,369 - INFO - Merging target variable: PE_RATIO_RATIO_SIGNED_LOG
2025-04-03 01:49:25,377 - INFO - Feature generation pipeline complete.
2025-04-03 01:49:25,382 - INFO - Final DataFrame shape: (500, 789)
2025-04-03 01:49:25,401 - INFO - Storing fundamentals data for date: 2024-09-03 in features stage
2025-04-03 01:49:25,547 - INFO - Saved data to: /home/siddharth.johri/DECOHERE/data/features/fundamentals/enhanced_features/year=2024/month=09/data_2024-09-03.pq
2025-04-03 01:49:25,548 - INFO - File size: 2741297 bytes
2025-04-03 01:49:25,548 - INFO - --- Finished Pipeline for Date: 2024-09-03 (Success: True) ---
2025-04-03

Processing groups:   0%|          | 0/500 [00:00<?, ?it/s]

2025-04-03 01:50:56,831 - INFO - Identified 786 potential numerical feature columns generated.
2025-04-03 01:50:56,832 - INFO - Sector features requested but no path provided. Skipping.
2025-04-03 01:50:56,837 - INFO - Ranking 786 numerical features cross-sectionally (by PIT_DATE)...
2025-04-03 01:50:57,151 - INFO - Numerical feature ranking complete.
2025-04-03 01:50:57,152 - INFO - Merging target variable: PE_RATIO_RATIO_SIGNED_LOG
2025-04-03 01:50:57,159 - INFO - Feature generation pipeline complete.
2025-04-03 01:50:57,164 - INFO - Final DataFrame shape: (500, 789)
2025-04-03 01:50:57,184 - INFO - Storing fundamentals data for date: 2024-09-04 in features stage
2025-04-03 01:50:57,311 - INFO - Saved data to: /home/siddharth.johri/DECOHERE/data/features/fundamentals/enhanced_features/year=2024/month=09/data_2024-09-04.pq
2025-04-03 01:50:57,312 - INFO - File size: 2756397 bytes
2025-04-03 01:50:57,313 - INFO - --- Finished Pipeline for Date: 2024-09-04 (Success: True) ---
2025-04-03

Pipeline execution and verification complete.


In [41]:
#Comparison block

# s =pd.read_parquet('/home/siddharth.johri/DECOHERE/data/raw/fundamentals/financials_2024_09.pq')
# a = pd.read_parquet('/home/siddharth.johri/DECOHERE/data/processed/fundamentals/year=2024/month=09/data_2024-09-04.pq')
# b = pd.read_parquet('/home/siddharth.johri/DECOHERE/data/features/fundamentals/pre_feature_set/year=2024/month=09/data_2024-09-04.pq')
# c = pd.read_parquet('/home/siddharth.johri/DECOHERE/data/features/fundamentals/enhanced_features/year=2024/month=09/data_2024-09-04.pq')
# a.query('ID == "INFO IB Equity" & PERIOD_END_DATE == "2024-03-31"')['PE_RATIO_RATIO']
# c.query('ID == "INFO IB Equity"')['PE_RATIO_RATIO_SIGNED_LOG']


In [44]:
# --- Cell 4: Feature Selection and Stability Analysis ---
target_date = '2024-09-02'
start_date, end_date = '2024-09-02', '2024-09-04'

# Select features
selected_features = feature_selector.select_features_daily(target_date)

# Analyze stability
stability_results = feature_selector.analyze_feature_stability(start_date, end_date)

# Display results
print(f"Selected {len(selected_features['common'])} features for {target_date}")
print(f"Stability score: {stability_results['stability_score']:.2%}")
print(f"Top 5 stable: {stability_results['most_stable_features'][:5]}")
print(f"Top 5 unstable: {stability_results['least_stable_features'][:5]}")

2025-04-03 03:35:35,762 - INFO - Starting feature selection for date: 2024-09-02
2025-04-03 03:35:35,764 - ERROR - Data file not found: /home/siddharth.johri/DECOHERE/data/features/fundamentals/year=2024/month=09/data_2024-09-02.pq
2025-04-03 03:35:35,765 - INFO - Analyzing feature stability from 2024-09-02 to 2024-09-04
2025-04-03 03:35:35,766 - INFO - Saved feature stability analysis to /home/siddharth.johri/DECOHERE/data/results/feature_selection/run_shap_t1_min10_max40_cumc95_20250403_1237_SYD/feature_stability_shap_t1_min10_max40_cumc95_20250403_1237_SYD.csv


TypeError: 'NoneType' object is not subscriptable

In [None]:
def run_feature_selection(date: str, run_id: Optional[str] = None) -> Dict[str, List[str]]:
    """
    Run feature selection for a specific date.
    
    Args:
        date: Date in YYYY-MM-DD format
        run_id: Optional custom run identifier. If None, an intuitive name will be generated
        
    Returns:
        Dictionary containing selected features for each method
    """
    try:
        logger.info(f"Starting feature selection for date: {date}")
        
        # Initialize feature selector
        feature_selector = FeatureSelector(config, run_id=run_id, logger=logger)
        
        # Select features for the date
        results = feature_selector.select_features_daily(date)
        
        if results:
            logger.info(f"Feature selection completed successfully for {date}")
            logger.info(f"Selected features saved in run directory: {feature_selector.results_dir}")
        else:
            logger.warning(f"No features selected for {date}")
            
        return results
        
    except Exception as e:
        logger.error(f"Error in feature selection for {date}: {str(e)}")
        return None

In [12]:
def analyze_feature_stability(start_date: str, end_date: str, run_id: str) -> Dict[str, Any]:
    """
    Analyze feature stability across a date range.
    
    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        run_id: Run identifier to analyze
        
    Returns:
        Dictionary containing stability analysis results
    """
    try:
        logger.info(f"Analyzing feature stability from {start_date} to {end_date}")
        
        # Initialize feature selector with the same run_id
        feature_selector = FeatureSelector(config, run_id=run_id, logger=logger)
        
        # Analyze stability
        stability_metrics = feature_selector.analyze_feature_stability(start_date, end_date)
        
        logger.info(f"Stability analysis completed. Results saved in run directory: {feature_selector.results_dir}")
        
        return stability_metrics
        
    except Exception as e:
        logger.error(f"Error in stability analysis: {str(e)}")
        return None

NameError: name 'Dict' is not defined

In [None]:
# Example: Run feature selection for a specific date
date = "2024-03-15"
results = run_feature_selection(date)

# Example: Run feature selection with custom run ID
custom_run_id = "my_experiment_1"
results = run_feature_selection(date, run_id=custom_run_id)

# Example: Analyze feature stability
start_date = "2024-03-01"
end_date = "2024-03-15"
stability = analyze_feature_stability(start_date, end_date, custom_run_id)