In [1]:
import sys
from pathlib import Path
import pandas as pd
import os
from IPython.display import display, Markdown  # Assuming you use these for display


# Set pandas display options to show more columns and rows
pd.set_option('display.max_columns', None)  # Show all columns
# pd.set_option('display.max_rows', 10)       # Limit to 10 rows for readability
pd.set_option('display.width', 1500)        # Let the display adjust to the window


# Notebook cell
%load_ext autoreload
%autoreload 2

# Get root directory (assuming notebook is in root/notebooks/)
NOTEBOOK_DIR = Path.cwd()
ROOT_DIR = NOTEBOOK_DIR.parent if NOTEBOOK_DIR.name == 'notebooks' else NOTEBOOK_DIR

# Add src directory to Python path
sys.path.append(str(ROOT_DIR / 'src'))

# Verify path
print(f"Python will look in these locations:\n{sys.path}")


# --- Execute the processor ---
import utils




Python will look in these locations:
['C:\\Users\\ping\\.pyenv\\pyenv-win\\versions\\3.11.9\\python311.zip', 'C:\\Users\\ping\\.pyenv\\pyenv-win\\versions\\3.11.9\\DLLs', 'C:\\Users\\ping\\.pyenv\\pyenv-win\\versions\\3.11.9\\Lib', 'C:\\Users\\ping\\.pyenv\\pyenv-win\\versions\\3.11.9', 'c:\\Users\\ping\\Files_win10\\python\\py311\\.venv', '', 'c:\\Users\\ping\\Files_win10\\python\\py311\\.venv\\Lib\\site-packages', 'c:\\Users\\ping\\Files_win10\\python\\py311\\.venv\\Lib\\site-packages\\win32', 'c:\\Users\\ping\\Files_win10\\python\\py311\\.venv\\Lib\\site-packages\\win32\\lib', 'c:\\Users\\ping\\Files_win10\\python\\py311\\.venv\\Lib\\site-packages\\Pythonwin', 'c:\\Users\\ping\\Files_win10\\python\\py311\\stocks\\src']


In [2]:
# Alternative (disabled): locate the OHLCV file via utils.main_processor
# instead of using the fixed path below.
# path_OHLCV, _, _ = utils.main_processor(
#     data_dir=r'..\data',  # project data directory to search
#     # data_dir=r'output\selection_results',  # alternative directory to search
#     downloads_dir='',  # None searches the Downloads dir, '' omits that search
#     downloads_limit=60,  # limit on the number of Downloads files searched
#     clean_name_override=None,  # override filename
#     start_file_pattern='df_OHLCV',  # search for files starting with 'df_OHLCV'
#     contains_pattern='clean'  # search for files containing 'clean'
# )
path_OHLCV = ROOT_DIR / 'data' / 'df_OHLCV_clean_stocks_etfs.parquet'

print(f'path_OHLCV: {path_OHLCV}')
df_OHLCV = pd.read_parquet(path_OHLCV)
print(f'df_OHLCV:\n{df_OHLCV.head()}\n')
# DataFrame.info() prints its report itself and returns None, so it must not be
# interpolated into an f-string (that prints the report first and then a stray
# "None" after the label).
print('df_OHLCV.info():')
df_OHLCV.info()

path_OHLCV: c:\Users\ping\Files_win10\python\py311\stocks\data\df_OHLCV_clean_stocks_etfs.parquet
df_OHLCV:
                   Adj Open  Adj High  Adj Low  Adj Close   Volume
Symbol Date                                                       
A      2025-05-20    113.45    114.14   112.66     113.48  2245772
       2025-05-19    112.50    113.47   112.03     113.44  2086300
       2025-05-16    112.28    113.83   110.82     113.77  1812000
       2025-05-15    111.28    112.40   108.93     112.40  1873900
       2025-05-14    114.95    115.50   111.28     111.52  2563400

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 510190 entries, ('A', Timestamp('2025-05-20 00:00:00')) to ('ZWS', Timestamp('2024-02-01 00:00:00'))
Data columns (total 5 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   Adj Open   510190 non-null  float64
 1   Adj High   510190 non-null  float64
 2   Adj Low    510190 non-null  float64
 3   Adj Close  510190 non-null  f

In [3]:
import pandas as pd
import io
import pprint # For cleaner dictionary printing
import numpy as np
import traceback # Added for detailed error logging
import json
import os # For creating directories and checking file existence
import logging # For logging instead of print
import datetime # For timestamping runs
import sys # Added for interpreter/path logging

from typing import List, Dict, Tuple, Set, Any, Callable, Optional # Import types for hinting


# --- Verify that the project-local 'utils' module and its date helper are available ---
try:
    import utils  # project helper module; this cell requires utils.extract_date_from_string
except ImportError:
    # Always print, so the failure is visible even when no log handler exists yet.
    print("ERROR: Failed to import the 'utils' module. Make sure utils.py exists and is in the Python path.")
    # Mirror the failure into the log only when logging has already been configured.
    if logging.getLogger().hasHandlers():
        logging.error("ERROR: Failed to import the 'utils' module.", exc_info=True)
    sys.exit("Missing required 'utils' module.")  # Stop: nothing below can work without utils
except Exception as import_failure:
    # The module was found but raised while being executed.
    print(f"ERROR: An unexpected error occurred during 'utils' import: {import_failure}")
    if logging.getLogger().hasHandlers():
        logging.error(f"ERROR: An unexpected error occurred during 'utils' import: {import_failure}", exc_info=True)
    sys.exit("Error during module import.")
else:
    # Import succeeded: confirm the one function this cell checks for is present.
    if not hasattr(utils, 'extract_date_from_string'):
        logging.error("ERROR: 'utils' module imported but 'extract_date_from_string' function not found!")
        sys.exit("Critical function missing from utils module.")


# --- Constants ---
# Per-period risk-free rate: 0.04 divided by 252
# (presumably a 4% annual rate over 252 trading days -- TODO confirm).
RISK_FREE_RATE_DAILY = 0.04 / 252
LOG_DIR = 'logs'
RESULTS_DIR = 'output/backtest_results'
# Result files kept inside RESULTS_DIR: the CSV table and a parquet DataFrame file.
RESULTS_CSV_PATH = os.path.join(RESULTS_DIR, 'backtest_parameter_performance.csv')
RESULTS_DF_PATH = os.path.join(RESULTS_DIR, 'df_backtest_parameter_performance.parquet')

# Run parameters that, together with selection_date and scheme, identify a
# unique run when deciding which CSV rows to overwrite.
# The weighting scheme itself is not listed here; it is tracked via the 'scheme' column.
PARAMS_TO_TRACK = [
    'n_select_requested',
    'inv_vol_col_name',
    'filter_min_price',
    'filter_min_avg_volume_m',
    'filter_min_roe_pct',
    'filter_max_debt_eq',
    'score_weight_rsi',
    'score_weight_change',
    'score_weight_rel_volume',
    'score_weight_volatility',
]

# Column order of the results CSV. The tracked parameters are spliced in from
# PARAMS_TO_TRACK so the two lists cannot drift apart.
CSV_COLUMN_ORDER = [
    # Run identification
    'run_timestamp', 'log_file', 'selection_date', 'actual_selection_date_used', 'scheme',
    # Parameters from PARAMS_TO_TRACK
    *PARAMS_TO_TRACK,
    # Other parameters/results
    'n_select_actual',
    'portfolio_return', 'portfolio_return_normalized',
    'num_attempted_trades',
    'num_successful_trades', 'num_failed_or_skipped_trades',
    'total_weight_traded',
    'win_rate', 'average_return', 'std_dev_return', 'sharpe_ratio_period',
]

# Startup sanity check: every tracked parameter must have a CSV column.
if set(PARAMS_TO_TRACK).difference(CSV_COLUMN_ORDER):
    logging.error("CRITICAL: Not all PARAMS_TO_TRACK are present in CSV_COLUMN_ORDER!")
    sys.exit("Configuration error: PARAMS_TO_TRACK mismatch with CSV_COLUMN_ORDER.")


# Columns that together identify a row for overwriting:
# 'selection_date', 'scheme', and every parameter in PARAMS_TO_TRACK.
UNIQUE_KEY_COLUMNS_FOR_CSV = ['selection_date', 'scheme', *PARAMS_TO_TRACK]

# Path to adjusted close prices
ADJ_CLOSE_PATH = '../data/df_adj_close.parquet'

# Path to selection results
OUTPUT_DIR = 'output/selection_results/'



# --- 1. Setup Logging ---
def setup_logging(log_dir: str = LOG_DIR):
    """
    Point the root logger at a fresh, timestamped log file plus the console.

    Any handlers already attached to the root logger are closed and removed
    first, so calling this repeatedly does not produce duplicated log lines.

    Args:
        log_dir: Directory for the log file; created if it does not exist.

    Returns:
        Path of the log file created for this run.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_filepath = os.path.join(
        log_dir,
        datetime.datetime.now().strftime("backtest_run_%Y%m%d_%H%M%S.log"),
    )

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Iterate over a snapshot because handlers are removed while looping.
    for stale_handler in list(root_logger.handlers):
        stale_handler.close()
        root_logger.removeHandler(stale_handler)

    # File handler first, then console handler; both share level and format.
    shared_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    for fresh_handler in (logging.FileHandler(log_filepath), logging.StreamHandler()):
        fresh_handler.setLevel(logging.INFO)
        fresh_handler.setFormatter(shared_formatter)
        root_logger.addHandler(fresh_handler)

    logging.info(f"Logging initialized. Log file: {log_filepath}")
    return log_filepath


def extract_backtest_setups(
    dataframe: pd.DataFrame,
    weight_column_names: List[str],
    date_str: str,
    scheme_separator: str = '_'
    ) -> Dict[str, Dict[str, Dict[str, float]]]:
    """
    Build ``{date_str: {scheme: {index_label: weight}}}`` from weight columns.

    The scheme name is the part of a column name after the last
    ``scheme_separator`` (or the whole name when the separator is absent).
    Values that cannot be read as numbers, or are missing, are dropped.

    Args:
        dataframe: Frame whose index labels become the keys of each weight
            mapping (presumably tickers -- verify against the caller).
        weight_column_names: Columns to read; names not in the frame are
            skipped with a warning.
        date_str: Key under which all extracted schemes are returned.
        scheme_separator: Delimiter used to derive the scheme name.

    Returns:
        ``{date_str: {...}}``; the inner dict is empty when no column yielded
        weights. An empty dict is returned when ``dataframe`` is None or empty.

    Raises:
        ValueError: If ``date_str`` is None or empty.
    """
    if not date_str:
        logging.error("The 'date_str' argument cannot be None or empty.")
        raise ValueError("The 'date_str' argument cannot be None or empty.")

    if dataframe is None or dataframe.empty:
        logging.warning("Input DataFrame is None or empty. Cannot extract setups.")
        return {}

    weights_by_scheme: Dict[str, Dict[str, float]] = {}

    for column in weight_column_names:
        if column not in dataframe.columns:
            logging.warning(f"Column '{column}' not found in the DataFrame.")
            continue

        try:
            name_pieces = column.split(scheme_separator)
            scheme = column if len(name_pieces) <= 1 else name_pieces[-1]

            # Non-numeric entries become NaN and are then discarded.
            as_numbers = pd.to_numeric(dataframe[column], errors='coerce')
            weights = as_numbers.dropna().astype(float).to_dict()

            if not weights:
                logging.warning(f"No valid (non-NaN, numeric) weights found for column '{column}'. "
                                f"Skipping scheme '{scheme}' for date '{date_str}'.")
                continue

            if scheme in weights_by_scheme:
                logging.warning(f"Duplicate scheme name '{scheme}' derived. "
                                f"Weights from column '{column}' might overwrite previous ones.")
            weights_by_scheme[scheme] = weights
            logging.info(f"Successfully extracted weights for scheme: {scheme} "
                         f"({len(weights)} tickers) for date {date_str}")
        except Exception as column_error:
            # A failing column is logged and skipped; the others are still processed.
            logging.error(f"Error processing column '{column}': {column_error}", exc_info=True)

    setups_for_date = {date_str: weights_by_scheme}

    if not weights_by_scheme:
        logging.warning(f"No valid backtest setups generated for date {date_str}.")
    return setups_for_date


def run_single_backtest(
    selection_date: str,
    scheme_name: str,
    ticker_weights: Dict[str, float],
    df_adj_close: pd.DataFrame,
    risk_free_rate_daily: float = RISK_FREE_RATE_DAILY,
    ) -> Optional[Dict[str, Any]]:
    """
    Runs a simple backtest for a given selection date and ticker weights.

    Each ticker is "bought" at its price on the first row of the price index
    after the selection date and "sold" at its price on the row after that.
    The per-ticker returns are combined into a weighted portfolio return and
    summary statistics over the individual ticker returns.

    Args:
        selection_date: Date of the selection; parsed with ``pd.Timestamp``.
            If it is not in the price index, the closest earlier row is used,
            or the closest later row when no earlier one exists.
        scheme_name: Label of the weighting scheme; used in log messages and
            echoed back in the result.
        ticker_weights: Mapping of ticker -> weight. Tickers that are not
            columns of ``df_adj_close`` are skipped with a warning.
        df_adj_close: Price table with dates as index and one column per
            ticker. The index is converted to a ``DatetimeIndex`` and sorted
            when necessary; the caller's frame is never modified.
        risk_free_rate_daily: Rate subtracted from the mean ticker return when
            computing ``sharpe_ratio_period``.

    Returns:
        A dict with the keys ``run_inputs``, ``metrics`` and ``trades``, or
        ``None`` when the run could not be carried out (date not locatable,
        no buy/sell row after the selection date, or an unexpected error).
        ``metrics['portfolio_return_normalized']`` is present whenever at
        least one trade succeeded and the traded weights do not sum to zero.
    """
    logging.info("-" * 30)
    logging.info("Initiating Backtest Run...")
    logging.info(f"  Date          : {selection_date}")
    logging.info(f"  Scheme        : {scheme_name}")
    logging.info(f"  Num Tickers   : {len(ticker_weights)}")
    sample_weights_str = io.StringIO()
    pprint.pprint(dict(list(ticker_weights.items())[:3]), stream=sample_weights_str)
    if len(ticker_weights) > 3:
        sample_weights_str.write("    ...\n")
    logging.debug(f"  Sample Weights:\n{sample_weights_str.getvalue()}")

    try:
        # The price table is only read below, so it is not copied up front.
        # A copy is made solely when the index has to be replaced, which keeps
        # the caller's frame untouched without duplicating it on every run.
        df_prices = df_adj_close
        if not isinstance(df_prices.index, pd.DatetimeIndex):
            try:
                df_prices = df_prices.copy()
                df_prices.index = pd.to_datetime(df_prices.index)
                logging.info("  Info: Converted DataFrame index to DatetimeIndex.")
            except Exception as e:
                logging.error(f"  Error: Failed to convert DataFrame index to DatetimeIndex: {e}", exc_info=True)
                logging.info("-" * 30)
                return None

        if not df_prices.index.is_monotonic_increasing:
            logging.info("  Info: Sorting DataFrame index by date...")
            df_prices = df_prices.sort_index()  # returns a new frame; input stays as is
            logging.info("  Info: DataFrame index sorted.")

        all_trading_dates = df_prices.index
        selection_timestamp = pd.Timestamp(selection_date)
    except Exception as e:
        logging.error(f"  Error during initial data preparation: {e}", exc_info=True)
        logging.info("-" * 30)
        return None

    try:
        # --- Locate the selection date in the price index ---
        try:
            selection_idx = all_trading_dates.get_loc(selection_timestamp)
        except KeyError:
            # Exact date absent: fall back to the previous row, then to the next one.
            indexer = all_trading_dates.get_indexer([selection_timestamp], method='ffill')
            if indexer[0] == -1:
                indexer_bfill = all_trading_dates.get_indexer([selection_timestamp], method='bfill')
                if indexer_bfill[0] == -1:
                    logging.error(f"  Error: Selection date {selection_date} or a nearby trading date not found in price data index.")
                    logging.info("-" * 30)
                    return None
                selection_idx = indexer_bfill[0]
                actual_selection_date_used = all_trading_dates[selection_idx]
                logging.warning(f"  Warning: Exact selection date {selection_date} not found. Using next available date: {actual_selection_date_used.strftime('%Y-%m-%d')}")
            else:
                selection_idx = indexer[0]
                actual_selection_date_used = all_trading_dates[selection_idx]
                if actual_selection_date_used != selection_timestamp:
                    logging.warning(f"  Warning: Exact selection date {selection_date} not found. Using previous available date: {actual_selection_date_used.strftime('%Y-%m-%d')}")

        # --- Buy on the next row, sell on the row after that ---
        if selection_idx + 1 >= len(all_trading_dates):
            logging.error(f"  Error: No trading date found after selection date index {selection_idx} ({all_trading_dates[selection_idx].strftime('%Y-%m-%d')}).")
            logging.info("-" * 30)
            return None
        buy_date = all_trading_dates[selection_idx + 1]

        if selection_idx + 2 >= len(all_trading_dates):
            logging.error(f"  Error: No trading date found after buy date {buy_date.strftime('%Y-%m-%d')}.")
            logging.info("-" * 30)
            return None
        sell_date = all_trading_dates[selection_idx + 2]

        logging.info(f"  Selection Date Used: {all_trading_dates[selection_idx].strftime('%Y-%m-%d')}")
        logging.info(f"  Buy Date           : {buy_date.strftime('%Y-%m-%d')}")
        logging.info(f"  Sell Date          : {sell_date.strftime('%Y-%m-%d')}")

        trades = []
        returns = []
        portfolio_return = 0.0
        total_weight_traded = 0.0
        valid_tickers_count = 0
        missing_price_count = 0

        # Restrict the price table to the two dates and the known tickers.
        relevant_tickers = [t for t in ticker_weights.keys() if t in df_prices.columns]
        relevant_dates = [buy_date, sell_date]
        try:
            price_subset = df_prices.loc[relevant_dates, relevant_tickers]
        except KeyError as e:
            logging.error(f"  Error selecting price subset for dates {relevant_dates} and tickers. Missing columns?: {e}", exc_info=True)
            logging.info("-" * 30)  # close the log section like every other exit path
            return None

        # --- Simulate one trade per ticker ---
        for ticker in ticker_weights.keys():
            if ticker not in price_subset.columns:
                logging.warning(f"    Warning: Ticker {ticker} not found in price data columns. Skipping.")
                continue

            valid_tickers_count += 1
            trade_info = { "ticker": ticker, "weight": ticker_weights[ticker],
                          "buy_date": buy_date.strftime('%Y-%m-%d'), "sell_date": sell_date.strftime('%Y-%m-%d'),
                          "buy_price": None, "sell_price": None, "return": None, "status": "Pending" }

            try:
                buy_price = price_subset.at[buy_date, ticker]
                if pd.isna(buy_price) or buy_price <= 0:
                    raise ValueError(f"Invalid buy price ({buy_price})")
                sell_price = price_subset.at[sell_date, ticker]
                if pd.isna(sell_price):
                    raise ValueError(f"Invalid sell price ({sell_price})")

                trade_return = (sell_price - buy_price) / buy_price
                trade_info.update({"buy_price": buy_price, "sell_price": sell_price, "return": trade_return, "status": "Success"})
                trades.append(trade_info)
                returns.append(trade_return)

                current_weight = ticker_weights[ticker]
                portfolio_return += trade_return * current_weight
                total_weight_traded += current_weight
            except KeyError as e:
                logging.warning(f"    Error accessing price for {ticker} on {e}. Skipping trade.")
                trade_info["status"] = f"Error: Price data missing ({e})"
                trades.append(trade_info)
                missing_price_count += 1
            except ValueError as e:
                logging.warning(f"    Warning: Invalid price data for {ticker} between {buy_date.strftime('%Y-%m-%d')} and {sell_date.strftime('%Y-%m-%d')} ({e}). Skipping trade.")
                trade_info["status"] = f"Skipped: Invalid price ({e})"
                # Best effort: record whatever prices can be read for diagnostics.
                # Only ordinary exceptions are suppressed, so KeyboardInterrupt and
                # SystemExit still propagate (a bare `except:` would swallow them).
                for price_key, price_date in (("buy_price", buy_date), ("sell_price", sell_date)):
                    try:
                        trade_info[price_key] = price_subset.at[price_date, ticker]
                    except Exception:
                        pass  # price stays None in the trade record
                trades.append(trade_info)
                missing_price_count += 1
            except Exception as e:
                logging.error(f"    Unexpected error processing trade for {ticker}: {e}", exc_info=True)
                trade_info["status"] = f"Error: Unexpected ({type(e).__name__})"
                trades.append(trade_info)
                missing_price_count += 1

        # --- Aggregate metrics ---
        num_attempted_trades = valid_tickers_count
        num_successful_trades = len(returns)
        weight_sum_is_nonzero = abs(total_weight_traded) > 1e-9
        metrics = {
            'num_selected_tickers': len(ticker_weights),
            'num_valid_tickers_in_data': valid_tickers_count,
            'num_attempted_trades': num_attempted_trades,
            'num_successful_trades': num_successful_trades,
            'num_failed_or_skipped_trades': num_attempted_trades - num_successful_trades,
            'portfolio_return': portfolio_return if num_successful_trades > 0 and weight_sum_is_nonzero else 0.0,
            'total_weight_traded': total_weight_traded,
            'win_rate': None, 'average_return': None, 'std_dev_return': None, 'sharpe_ratio_period': None,
        }

        if num_successful_trades > 0:
            returns_array = np.array(returns)
            metrics['win_rate'] = np.sum(returns_array > 0) / num_successful_trades
            metrics['average_return'] = np.mean(returns_array)
            # Sample standard deviation needs at least two observations.
            metrics['std_dev_return'] = np.std(returns_array, ddof=1) if num_successful_trades > 1 else 0.0
            std_dev = metrics['std_dev_return']
            excess_return = metrics['average_return'] - risk_free_rate_daily

            if std_dev > 1e-9:
                metrics['sharpe_ratio_period'] = excess_return / std_dev
            elif abs(excess_return) < 1e-9:
                # No dispersion and no excess return.
                metrics['sharpe_ratio_period'] = 0.0
            else:
                # No dispersion but non-zero excess return: signed infinity.
                metrics['sharpe_ratio_period'] = np.inf * np.sign(excess_return)

            logging.info(f"  Trades Executed: {num_successful_trades}/{num_attempted_trades}")

            # Always record the weight-normalised return when it is defined, so the
            # metric is not missing for runs whose weights already sum to 1.
            if weight_sum_is_nonzero:
                normalized_portfolio_return = portfolio_return / total_weight_traded
                metrics['portfolio_return_normalized'] = normalized_portfolio_return

            if weight_sum_is_nonzero and abs(total_weight_traded - 1.0) > 1e-6:
                logging.info(f"  Portfolio Return (Raw)    : {portfolio_return:.4f} (Based on Weight Sum: {total_weight_traded:.4f})")
                logging.info(f"  Portfolio Return (Norm'd) : {normalized_portfolio_return:.4f}")
            else:
                logging.info(f"  Portfolio Return          : {portfolio_return:.4f} (Based on Weight Sum: {total_weight_traded:.4f})")

            logging.info(f"  Win Rate (Individual)   : {metrics['win_rate']:.2%}")
            logging.info(f"  Avg Ticker Return       : {metrics['average_return']:.4f}")
            logging.info(f"  Std Dev Ticker Return   : {metrics['std_dev_return']:.4f}")
            logging.info(f"  Period Sharpe (Indiv)   : {metrics['sharpe_ratio_period']:.4f}")
        else:
            logging.warning(f"  No successful trades executed out of {num_attempted_trades} attempted.")
            logging.info(f"  Portfolio Return          : {metrics['portfolio_return']:.4f}")

        backtest_results = {
            "run_inputs": {
                "selection_date": selection_date,
                "actual_selection_date_used": all_trading_dates[selection_idx].strftime('%Y-%m-%d'),
                "scheme_name": scheme_name,
                "num_tickers_input": len(ticker_weights),
                "risk_free_rate_daily": risk_free_rate_daily,
                "buy_date": buy_date.strftime('%Y-%m-%d'),
                "sell_date": sell_date.strftime('%Y-%m-%d'),
            },
            "metrics": metrics,
            "trades": trades
        }
        logging.info(f"Backtest simulation for '{scheme_name}' on {selection_date} completed.")
        logging.info("-" * 30)
        return backtest_results
    except Exception as e:
        logging.critical(f"  FATAL ERROR during backtest run for {selection_date}, {scheme_name}: {e}", exc_info=True)
        logging.info("-" * 30)
        return None


def process_all_backtests(
    nested_setups: Dict[str, Dict[str, Dict[str, float]]],
    df_adj_close: pd.DataFrame
    ) -> Dict[str, Dict[str, Optional[Dict[str, Any]]]]:
    """
    Run one backtest per (date, scheme) pair found in ``nested_setups``.

    The price table is copied, given a ``DatetimeIndex`` and sorted once, and
    that prepared copy is handed to every ``run_single_backtest`` call.

    Args:
        nested_setups: ``{date: {scheme: {ticker: weight}}}``.
        df_adj_close: Price table passed on to ``run_single_backtest``; the
            caller's frame is not modified.

    Returns:
        ``{date: {scheme: result}}`` where ``result`` is whatever
        ``run_single_backtest`` returned, or ``None`` when the scheme had no
        weights or the run raised. Dates without schemes map to ``{}``. The
        result is empty when there are no setups or price preparation fails.
    """
    batch_results: Dict[str, Dict[str, Optional[Dict[str, Any]]]] = {}

    if not nested_setups:
        logging.warning("Received empty setup dictionary. No backtests to run.")
        return batch_results

    logging.info("\n===== Starting Batch Backtest Processing =====")
    try:
        shared_prices = df_adj_close.copy()
        index_is_datetime = isinstance(shared_prices.index, pd.DatetimeIndex)
        if not index_is_datetime:
            shared_prices.index = pd.to_datetime(shared_prices.index)
        already_sorted = shared_prices.index.is_monotonic_increasing
        if not already_sorted:
            shared_prices = shared_prices.sort_index()
        logging.info("Prepared global price data copy for backtests.")
    except Exception as prep_error:
        logging.critical(f"Failed to prepare global price data copy: {prep_error}", exc_info=True)
        return batch_results

    for date_str, schemes_for_date in nested_setups.items():
        logging.info(f"\nProcessing date: {date_str}")
        if not schemes_for_date:
            logging.warning("  No schemes found for this date. Skipping.")
            batch_results[date_str] = {}
            continue

        outcomes_by_scheme: Dict[str, Optional[Dict[str, Any]]] = {}
        for scheme_name, ticker_weights in schemes_for_date.items():
            # Stays None unless a backtest actually runs and returns a result.
            outcome = None
            if not ticker_weights:
                logging.warning(f"  Skipping scheme '{scheme_name}': No ticker weights provided.")
            else:
                try:
                    outcome = run_single_backtest(
                        selection_date=date_str,
                        scheme_name=scheme_name,
                        ticker_weights=ticker_weights,
                        df_adj_close=shared_prices,
                        risk_free_rate_daily=RISK_FREE_RATE_DAILY,
                    )
                except Exception as run_error:
                    # One failing scheme must not abort the remaining ones.
                    logging.error(f"!! UNEXPECTED Error running backtest for {scheme_name} on {date_str} in outer loop: {run_error}", exc_info=True)
            outcomes_by_scheme[scheme_name] = outcome

        batch_results[date_str] = outcomes_by_scheme

    logging.info("\n===== Batch Backtest Processing Finished =====")
    return batch_results

# --- 3. Function to Extract Parameters and Results for Storage ---
def extract_params_and_results(
    params: Dict[str, Any],
    backtest_results_summary: Dict[str, Dict[str, Optional[Dict[str, Any]]]],
    run_timestamp: str,
    log_filepath: str
    ) -> List[Dict[str, Any]]:
    """
    Flatten backtest results into one record per (date, scheme) pair.

    Every record carries the run identification, the value of each parameter
    named in ``PARAMS_TO_TRACK`` (``None`` when absent from ``params``),
    ``n_select_actual`` and the result metrics.

    Args:
        params: Parameters of the run; looked up by key.
        backtest_results_summary: ``{date: {scheme: result-or-None}}``.
        run_timestamp: Stored unchanged in every record.
        log_filepath: Path of the run's log file; only its base name is
            stored. A falsy value is recorded as ``"unknown_log.log"``.

    Returns:
        List of record dicts. Dates with no scheme results contribute nothing;
        schemes without a usable result get placeholder metric values.
    """
    if log_filepath:
        log_filename = os.path.basename(log_filepath)
    else:
        log_filename = "unknown_log.log"
        logging.error("Log filepath was not set correctly during parameter extraction.")

    # (metric key, default when the key is absent from a usable result,
    #  value recorded when there is no usable result at all)
    metric_fallbacks = (
        ('portfolio_return', np.nan, np.nan),
        ('portfolio_return_normalized', np.nan, np.nan),
        ('num_successful_trades', 0, 0),
        ('total_weight_traded', 0.0, 0.0),
        ('win_rate', np.nan, np.nan),
        ('average_return', np.nan, np.nan),
        ('std_dev_return', np.nan, np.nan),
        ('sharpe_ratio_period', np.nan, np.nan),
        ('num_attempted_trades', 0, np.nan),
        ('num_failed_or_skipped_trades', 0, np.nan),
    )

    rows = []
    for date_str, scheme_results in backtest_results_summary.items():
        if not scheme_results:
            logging.warning(f"No scheme results found for date {date_str} during param extraction.")
            continue

        for scheme_name, result in scheme_results.items():
            row = {
                'run_timestamp': run_timestamp,
                'log_file': log_filename,
                'selection_date': date_str,
                'scheme': scheme_name,
            }
            row.update((tracked, params.get(tracked, None)) for tracked in PARAMS_TO_TRACK)
            row['n_select_actual'] = params.get('n_select_actual', np.nan)

            # Usable means: a non-empty dict holding a dict under 'metrics'.
            usable = bool(result) and isinstance(result, dict) and isinstance(result.get('metrics'), dict)
            if usable:
                metrics = result['metrics']
                for metric_key, absent_default, _ in metric_fallbacks:
                    row[metric_key] = metrics.get(metric_key, absent_default)
                row['actual_selection_date_used'] = result.get('run_inputs', {}).get('actual_selection_date_used', None)
            else:
                for metric_key, _, failed_value in metric_fallbacks:
                    row[metric_key] = failed_value
                row['actual_selection_date_used'] = None
                if result is None:
                    logging.warning(f"Extracting params: Backtest result was None for {date_str} / {scheme_name}.")
                else:
                    logging.warning(f"Extracting params: Unexpected result format for {date_str} / {scheme_name}: {type(result)}")
            rows.append(row)
    return rows

# --- 4. Function to Write Results to CSV (overwrites rows with matching keys) ---
def write_results_to_csv(records: List[Dict[str, Any]], filepath: str = RESULTS_CSV_PATH):
    """
    Merges a list of result records into the results CSV file.

    Existing rows are loaded, the new records are appended, and rows sharing
    the same values in ``UNIQUE_KEY_COLUMNS_FOR_CSV`` are collapsed so that
    only the last one (i.e. the one from ``records``) survives. The file is
    then rewritten entirely, sorted by run timestamp (newest first), selection
    date (newest first) and scheme.

    If an existing file cannot be read it is moved aside to
    ``<filepath>.unreadable_<timestamp>`` before the new file is written, so
    earlier results are never silently destroyed. If it cannot be moved
    either, nothing is written.

    Args:
        records: Result records; keys missing from ``CSV_COLUMN_ORDER`` are
            dropped and missing columns are filled with NaN.
        filepath: Destination CSV path. Its directory is created when needed.

    Returns:
        None. Failures are logged rather than raised.
    """
    if not records:
        logging.info("No records to write to CSV.")
        return

    try:
        df_new_records = pd.DataFrame(records)
    except Exception as e:
        logging.error(f"Failed to create DataFrame from new records: {e}", exc_info=True)
        return

    # Ensure df_new_records has all columns from CSV_COLUMN_ORDER, adding NaNs for missing ones
    for col in CSV_COLUMN_ORDER:
        if col not in df_new_records.columns:
            df_new_records[col] = np.nan
    # Select and reorder columns to match CSV_COLUMN_ORDER
    df_new_records = df_new_records[CSV_COLUMN_ORDER]

    # Ensure directory for the CSV file exists. A bare file name has an empty
    # directory part, and os.makedirs('') would raise.
    target_dir = os.path.dirname(filepath)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    df_existing_records = pd.DataFrame(columns=CSV_COLUMN_ORDER)  # Default to empty DF with correct columns
    if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
        try:
            df_existing_records = pd.read_csv(filepath, na_filter=True, keep_default_na=True)

            # Align columns of df_existing_records with CSV_COLUMN_ORDER
            for col in CSV_COLUMN_ORDER:
                if col not in df_existing_records.columns:
                    logging.warning(f"Column '{col}' from CSV_COLUMN_ORDER not found in existing CSV '{filepath}'. Adding it with NaNs.")
                    df_existing_records[col] = np.nan
            df_existing_records = df_existing_records[CSV_COLUMN_ORDER]

        except pd.errors.EmptyDataError:
            logging.info(f"Existing CSV file '{filepath}' is empty. Will create a new one.")
        except Exception as e:
            logging.error(f"Error reading existing CSV file '{filepath}': {e}.", exc_info=True)
            # The file is about to be rewritten from scratch, so move the
            # unreadable original aside first instead of overwriting it.
            backup_path = f"{filepath}.unreadable_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
            try:
                os.replace(filepath, backup_path)
            except OSError as move_error:
                logging.error(f"Could not move unreadable CSV '{filepath}' to '{backup_path}': {move_error}. "
                              "Aborting write so the existing file is not overwritten.", exc_info=True)
                return
            logging.warning(f"Unreadable CSV preserved as '{backup_path}'. A new file will be created.")
            df_existing_records = pd.DataFrame(columns=CSV_COLUMN_ORDER)  # Reset to empty on error

    # Combine existing data with the new records; new records go last.
    # With nothing to merge, skip concat: concatenating an empty frame relies
    # on dtype handling that pandas has deprecated.
    if df_existing_records.empty:
        df_combined = df_new_records
    else:
        df_combined = pd.concat([df_existing_records, df_new_records], ignore_index=True)

    # Deduplicate based on the unique key columns. 'keep="last"' ensures that
    # if duplicates exist (i.e., an old record and a new record for the same key set),
    # the one from df_new_records (which is last in df_combined) is kept.
    df_final = df_combined.drop_duplicates(subset=UNIQUE_KEY_COLUMNS_FOR_CSV, keep='last').copy()

    # Sort the final DataFrame for consistent ordering in the CSV.
    if 'run_timestamp' in df_final.columns:  # Should always be true
        df_final['run_timestamp'] = pd.to_datetime(df_final['run_timestamp'], errors='coerce')

    if 'selection_date' in df_final.columns:  # Should always be true
        # Sort on a temporary datetime version of selection_date. Values are
        # converted to str first so mixed types are parsed uniformly;
        # unparseable values become NaT.
        try:
            df_final['selection_date_dt_sort'] = pd.to_datetime(df_final['selection_date'].astype(str), errors='coerce')
        except Exception as e:  # Broad exception for parsing issues
            logging.warning(f"Could not reliably parse 'selection_date' for sorting: {e}. Sorting may be string-based for this column.")
            df_final['selection_date_dt_sort'] = df_final['selection_date']  # Fallback to original for sort

        df_final = df_final.sort_values(
            by=['run_timestamp', 'selection_date_dt_sort', 'scheme'],
            ascending=[False, False, True]  # Latest runs, latest selection dates, then by scheme
        ).drop(columns=['selection_date_dt_sort'], errors='ignore')  # Drop temporary sort column

    try:
        # Write the consolidated DataFrame to the CSV file, overwriting it.
        df_final.to_csv(filepath, mode='w', header=True, index=False, float_format='%.8f')

        num_input_records = len(df_new_records)
        num_final_records = len(df_final)
        num_existing_before_op = len(df_existing_records)  # Count before concat
        net_change = num_final_records - num_existing_before_op

        logging.info(f"Processed {num_input_records} new records. "
                     f"CSV '{filepath}' now contains {num_final_records} records (net change: {net_change:+}).")
    except Exception as e:
        logging.error(f"Error writing consolidated data to CSV file '{filepath}': {e}", exc_info=True)

def setup_script_logging():
    """
    Initialise logging for this run and record basic environment details.

    Calls ``setup_logging()`` and then logs the start time, the Python
    interpreter path, the current working directory and the pandas / numpy
    versions at INFO level.

    Returns:
        A ``(log_filepath, run_timestamp)`` tuple. ``log_filepath`` is whatever
        ``setup_logging()`` returned; ``run_timestamp`` is the start time
        formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    log_filepath = setup_logging()
    run_timestamp = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
    logging.info(f"Script execution started at: {run_timestamp}")

    # Environment details that help reproduce or debug a run later.
    environment_report = (
        f"Python Interpreter: {sys.executable}",
        f"Current Working Directory: {os.getcwd()}",
        f"Pandas Version: {pd.__version__}",
        f"Numpy Version: {np.__version__}",
    )
    for report_line in environment_report:
        logging.info(report_line)

    return log_filepath, run_timestamp

# --- New Function for Loading and Preparing Price Data ---
def load_and_prepare_price_data(file_path: str) -> pd.DataFrame:
    """
    Loads price data from a parquet file, validates and prepares its index.

    The index is converted with ``pd.to_datetime`` when it is not already a
    ``DatetimeIndex``, and the frame is sorted by index when the index is not
    monotonically increasing.

    Args:
        file_path: The path to the price data parquet file.

    Returns:
        A pandas DataFrame with a DatetimeIndex, sorted chronologically.

    Raises:
        FileNotFoundError: If the specified file_path does not exist.
        Exception: Any error raised while reading the file or converting the
            index is logged once (with traceback) and re-raised unchanged.
    """
    resolved_path = os.path.abspath(file_path)
    logging.info(f"Attempting to load price data from: {resolved_path}")

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Price data file not found: {resolved_path}")

    try:
        df_adj_close = pd.read_parquet(file_path)

        if not isinstance(df_adj_close.index, pd.DatetimeIndex):
            logging.info("Converting price data index to DatetimeIndex...")
            # A failed conversion propagates to the handler below, which logs it
            # exactly once; a nested handler here would duplicate the traceback.
            df_adj_close.index = pd.to_datetime(df_adj_close.index)

        if not df_adj_close.index.is_monotonic_increasing:
            logging.info("Sorting price data index...")
            df_adj_close = df_adj_close.sort_index()

        logging.info(f"Successfully loaded and prepared price data from {file_path}")
        logging.info(f"Price data shape: {df_adj_close.shape}, Date range: {df_adj_close.index.min()} to {df_adj_close.index.max()}")
        logging.info("Price data loading completed.")

        return df_adj_close

    except Exception as e:
        logging.error(f"An error occurred while loading/preparing price data from {file_path}: {e}", exc_info=True)
        raise  # Let the caller decide how to handle the failure

# --- New Function for Finding and Mapping Files ---
def find_and_map_param_files(directory_path: str) -> Tuple[List[str], List[str], Dict[str, str], Set[str]]:
    """
    Discovers .parquet and .json files in a directory that start with '20', and creates a map
    of parameter files by extracted date.

    Date keys are produced by ``utils.extract_date_from_string`` (the ``utils``
    module must be importable at module level). When several parameter files
    yield the same date key, the file that sorts last by name wins and a
    warning is logged.

    Args:
        directory_path: The path to the directory containing the files.

    Returns:
        A 4-tuple containing:
        - A sorted list of discovered selection file names (.parquet).
        - A sorted list of discovered parameter file names (.json).
        - A dictionary mapping extracted date keys to parameter file names.
        - The set of date keys extracted from the parameter files.

    Raises:
        FileNotFoundError: If the specified directory_path is not an existing directory.
    """
    logging.info(f"Attempting to find files in: {os.path.abspath(directory_path)}")

    if not os.path.isdir(directory_path):
        raise FileNotFoundError(f"Data directory not found: {os.path.abspath(directory_path)}")

    all_files = os.listdir(directory_path)
    logging.debug(f"Files found in directory: {all_files}")

    # Only files starting with '20' and carrying the expected extension are considered.
    selection_files = sorted(f for f in all_files if f.startswith('20') and f.endswith('.parquet'))
    param_files = sorted(f for f in all_files if f.startswith('20') and f.endswith('.json'))
    logging.info(f"Found {len(selection_files)} potential selection files (.parquet)")
    logging.info(f"Found {len(param_files)} potential parameter files (.json)")

    param_map: Dict[str, str] = {}

    for pf in param_files:
        try:
            date_key = utils.extract_date_from_string(pf)
            if date_key:
                if date_key in param_map:
                    # The newer entry replaces the older one; say so accurately.
                    logging.warning(f"Duplicate date key '{date_key}' found for param file '{pf}'. Replacing previously mapped file '{param_map[date_key]}'.")
                param_map[date_key] = pf
            else:
                logging.warning(f"Could not extract date from param file: {pf}. Skipping.")
        except Exception as e:
            # One unreadable file name must not abort discovery of the others.
            logging.error(f"Error extracting date from param file '{pf}': {e}", exc_info=True)

    # Every successfully extracted key is a key of param_map, so the set of
    # parameter dates is exactly the map's key set.
    extracted_dates_params = set(param_map)

    logging.debug(f"Parameter map created: {param_map}")
    logging.info("File discovery and parameter mapping completed.")

    return selection_files, param_files, param_map, extracted_dates_params

# --- New Function for Pairing Files ---
def pair_data_and_param_files(
    selection_files: List[str],
    param_files: List[str],
    param_map: Dict[str, str],
    extracted_dates_params: Set[str], # Now takes this as input
    utils_module # Pass utility module
) -> List[Tuple[str, str]]:
    """
    Match each selection file to the parameter file sharing its date key.

    The date key of a selection file is obtained from
    ``utils_module.extract_date_from_string``. Files without a key, without a
    partner in ``param_map``, or whose extraction raises are logged and
    skipped. When fewer pairs than selection or parameter files result, the
    date sets seen on both sides are logged as a warning.

    Args:
        selection_files: List of discovered selection file names.
        param_files: List of discovered parameter file names (only its length
            is used, for mismatch reporting).
        param_map: Dictionary mapping date strings to parameter file names.
        extracted_dates_params: Set of dates extracted from parameter files
            (only used in the mismatch message).
        utils_module: Object exposing ``extract_date_from_string``.

    Returns:
        A list of (selection_file, param_file) tuples, in the order of
        ``selection_files``. The list may be empty; deciding what to do in
        that case is left to the caller.
    """
    matched_pairs = []
    selection_dates_seen = set()  # Only needed for the mismatch report below

    logging.info("Attempting to pair selection files with parameter files...")

    for selection_name in selection_files:
        try:
            date_key = utils_module.extract_date_from_string(selection_name)

            if not date_key:
                logging.warning(f"Could not extract valid date from selection file: {selection_name}. Skipping.")
                continue

            selection_dates_seen.add(date_key)
            logging.debug(f"Extracted date '{date_key}' from selection file: {selection_name}")

            if date_key not in param_map:
                logging.warning(f"Could not find matching param file for data file: {selection_name} (extracted date: {date_key})")
                continue

            partner_name = param_map[date_key]
            matched_pairs.append((selection_name, partner_name))
            logging.debug(f"  Matched pair: ({selection_name}, {partner_name})")
        except Exception as e:
            logging.error(f"Error extracting date from selection file '{selection_name}': {e}", exc_info=True)

    pair_count = len(matched_pairs)
    logging.info(f"\n--- Found {pair_count} Paired Data and Parameter Files ---")

    # Some file on either side was left without a partner.
    if pair_count < max(len(selection_files), len(param_files)):
        logging.warning(f"Mismatch details: Selection dates={selection_dates_seen}, Param dates={extracted_dates_params}")

    logging.info("File pairing completed.")

    return matched_pairs

# --- New Function to Process a Single Pair ---
def process_single_pair(
    data_file: str,
    param_file_name: str,
    output_dir: str, # Use output_dir consistently
    df_adj_close: pd.DataFrame,
    run_timestamp: str,
    log_filepath: str,
    utils_module: Any,
    extract_backtest_setups_func: Callable,
    process_all_backtests_func: Callable,
    extract_params_and_results_func: Callable
) -> List[Dict[str, Any]]:
    """
    Processes a single data/parameter file pair, runs backtests, and extracts results.

    Both files are read from ``output_dir`` (not from any module-level
    constant), so the function works for whichever directory the caller passes.
    Every failure is logged and converted into an empty result so that one bad
    pair does not abort the caller's loop.

    Args:
        data_file: Name of the selection data file (.parquet).
        param_file_name: Name of the parameter file (.json).
        output_dir: Directory containing the data and parameter files.
        df_adj_close: DataFrame containing the main price data.
        run_timestamp: Timestamp for the current run.
        log_filepath: Path to the log file for the current run.
        utils_module: Module containing utility functions like extract_date_from_string.
        extract_backtest_setups_func: The function to extract backtest setups.
            Called with keyword arguments ``dataframe``, ``weight_column_names``
            and ``date_str``.
        process_all_backtests_func: The function to run backtests. Called with
            the extracted setups and ``df_adj_close``.
        extract_params_and_results_func: The function to extract performance
            records. Called with keyword arguments ``params``,
            ``backtest_results_summary``, ``run_timestamp`` and ``log_filepath``.

    Returns:
        A list of performance records (dictionaries) for this pair, or an empty list if
        processing failed or no valid setups were found.
    """
    current_date_str = None
    try:
        # Date re-extraction (kept here as in original logic, though could be passed in file_pairs)
        current_date_str = utils_module.extract_date_from_string(data_file)
        if not current_date_str:
            logging.error(f"Skipping pair - Failed to re-extract valid date from {data_file}")
            return []

        logging.info(f"Processing for extracted date: {current_date_str}")

        # Ensure price data is available before proceeding
        if df_adj_close is None or df_adj_close.empty:
            logging.critical(f"Price data (df_adj_close) is not loaded or is empty. Skipping pair for {current_date_str}.")
            return []

        # --- Load parameters (from the directory supplied by the caller) ---
        param_path = os.path.join(output_dir, param_file_name)
        logging.debug(f"Reading parameters from: {param_path}")
        with open(param_path, 'r', encoding='utf-8') as f:
            params = json.load(f)
        logging.info(f"Parameters loaded from {param_file_name}:")
        params_str_io = io.StringIO()
        pprint.pprint(params, stream=params_str_io, width=100)
        logging.info("\n" + params_str_io.getvalue())

        # --- Load selection data (same directory) ---
        selection_path = os.path.join(output_dir, data_file)
        logging.debug(f"Reading selection data from: {selection_path}")
        selection_df = pd.read_parquet(selection_path)
        logging.debug(f'Loaded selection_df. Shape: {selection_df.shape}, Index type: {type(selection_df.index)}, Columns: {selection_df.columns.tolist()[:10]}...')

        # --- Build backtest setups ---
        logging.debug(f"Extracting backtest setups for date: {current_date_str}")
        backtest_setups = extract_backtest_setups_func(
            dataframe=selection_df,
            weight_column_names=['Weight_EW', 'Weight_IV', 'Weight_SW'],
            date_str=current_date_str,
        )

        if not backtest_setups or not backtest_setups.get(current_date_str):
            logging.warning(f"No valid backtest setups extracted for {current_date_str}. Skipping backtest run for this pair.")
            return []

        logging.info(f"Successfully extracted {len(backtest_setups.get(current_date_str, {}))} setup(s) for {current_date_str}.")
        setups_str_io = io.StringIO()
        pprint.pprint(backtest_setups, stream=setups_str_io, width=120, depth=3)
        logging.debug("Extracted Backtest Setups (preview):\n" + setups_str_io.getvalue())

        # --- Run backtests ---
        logging.info(f"Running backtests for date: {current_date_str}")
        backtest_results_summary = process_all_backtests_func(backtest_setups, df_adj_close)

        # Only the scheme names per date are logged, not the full results.
        summary_str_io = io.StringIO()
        for res_date, res_schemes in backtest_results_summary.items():
            pprint.pprint({res_date: list(res_schemes.keys())}, stream=summary_str_io)
        logging.debug("\n--- Backtest Results Summary (Schemes Processed) ---\n" + summary_str_io.getvalue())

        # --- Flatten parameters + results into records ---
        logging.debug(f"Extracting parameters and results for date: {current_date_str}")
        run_records = extract_params_and_results_func(
            params=params,
            backtest_results_summary=backtest_results_summary,
            run_timestamp=run_timestamp,
            log_filepath=log_filepath
        )
        logging.info(f"Extracted {len(run_records)} performance records for this pair.")

        logging.info(f"--- Finished processing pair for {current_date_str}. ---")

        return run_records

    except FileNotFoundError as e:
        logging.error(f"FILE NOT FOUND Error processing pair ({data_file}, {param_file_name}): {e}", exc_info=True)
        return []
    except KeyError as e:
        logging.error(f"KEY Error processing pair ({data_file}, {param_file_name}) - often related to missing columns/dates: {e}", exc_info=True)
        return []
    except Exception as e:
        # exc_info=True already attaches the traceback; logging it a second
        # time via traceback.format_exc() only duplicated the output.
        logging.error(f"UNHANDLED Error processing pair ({data_file}, {param_file_name}) for date {current_date_str}: {e}", exc_info=True)
        return []

# --- (Existing functions like write_results_to_csv) ---

# --- New Function to Update or Create a DataFrame with Records ---
def _align_frame_to_columns(frame, column_order, missing_label=None):
    """Return a copy of ``frame`` limited to ``column_order``, adding absent columns filled with NaN."""
    aligned = frame.copy()
    for col in column_order:
        if col not in aligned.columns:
            if missing_label is not None:
                logging.debug(f"DataFrame Update: Column '{col}' not in {missing_label}. Adding with NaNs.")
            aligned[col] = np.nan
    return aligned[column_order].copy()


def update_or_create_dataframe_with_records(
    new_records: List[Dict[str, Any]],
    existing_df: Optional[pd.DataFrame] = None,
    column_order: Optional[List[str]] = None,
    unique_key_columns: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Updates an existing DataFrame with new records or creates a new DataFrame.

    This function aligns columns, combines new records with an optional existing
    DataFrame, deduplicates based on unique keys (keeping the latest), and sorts
    the results.

    Args:
        new_records: A list of new records (dictionaries) to be added or to update
                     existing entries. ``None`` or an empty list means "no new
                     records".
        existing_df: An optional existing pandas DataFrame. If None or empty,
                     processing starts as if with a new DataFrame.
        column_order: The definitive list and order of columns for the resulting
                      DataFrame. When omitted, the module-level CSV_COLUMN_ORDER
                      is looked up at call time.
        unique_key_columns: A list of column names that define a unique record.
                            Used for deduplication, keeping the 'last' entry in
                            case of duplicates. When omitted, the module-level
                            UNIQUE_KEY_COLUMNS_FOR_CSV is looked up at call time.

    Returns:
        A pandas DataFrame containing the consolidated, deduplicated, and sorted records.
        Rows are ordered by 'run_timestamp' (newest first), then 'selection_date'
        (newest first), then 'scheme' (ascending), using whichever of these
        columns are present. 'run_timestamp', when present, is converted to
        datetime (unparseable values become NaT).
        Returns an empty DataFrame (with specified columns) if both new_records
        and existing_df are empty or None.
    """
    # Resolve defaults at call time so that re-running the constants cell is
    # picked up without having to re-run this definition.
    if column_order is None:
        column_order = CSV_COLUMN_ORDER
    if unique_key_columns is None:
        unique_key_columns = UNIQUE_KEY_COLUMNS_FOR_CSV

    num_new_input = len(new_records) if new_records else 0
    num_existing_initial = len(existing_df) if existing_df is not None else 0
    logging.debug(f"DataFrame Update: {num_new_input} new records. Existing DF has "
                  f"{num_existing_initial} rows.")

    # 1. Prepare the existing DataFrame
    if existing_df is not None and not existing_df.empty:
        df_processed_existing = _align_frame_to_columns(existing_df, column_order, missing_label="existing_df")
    else:
        df_processed_existing = pd.DataFrame(columns=column_order)

    # 2. Prepare the new records DataFrame
    df_new_records = pd.DataFrame(columns=column_order)
    if new_records:
        try:
            df_new_records = _align_frame_to_columns(pd.DataFrame(new_records), column_order)
        except Exception as e:
            # If new records fail, proceed with only the (processed) existing DataFrame
            logging.error(f"DataFrame Update: Failed to create DataFrame from new records: {e}", exc_info=True)
            df_new_records = pd.DataFrame(columns=column_order)

    # 3. Combine DataFrames. Empty frames are left out of the concat: they add
    #    no rows, and including them triggers a pandas FutureWarning.
    frames_to_combine = [frame for frame in (df_processed_existing, df_new_records) if not frame.empty]
    if not frames_to_combine:
        logging.info("DataFrame Update: No existing data and no new records. Returning empty DataFrame.")
        return pd.DataFrame(columns=column_order)

    df_combined = pd.concat(frames_to_combine, ignore_index=True)

    # 4. Deduplicate (new records come last, so keep='last' lets them win)
    missing_keys = [key for key in unique_key_columns if key not in df_combined.columns]
    if missing_keys:
        logging.warning(f"DataFrame Update: Unique key columns {missing_keys} not found in combined DataFrame. Skipping deduplication.")
        df_final = df_combined.copy()
    else:
        df_final = df_combined.drop_duplicates(subset=unique_key_columns, keep='last').copy()

    logging.info(
        f"DataFrame Update: Started with {num_existing_initial} existing records, processed {len(df_new_records)} new records. "
        f"Resulting DataFrame has {len(df_final)} records after consolidation and deduplication."
    )

    # 5. Sort the final DataFrame using whichever sort columns are available
    has_run_timestamp = 'run_timestamp' in df_final.columns
    has_selection_date = 'selection_date' in df_final.columns
    has_scheme = 'scheme' in df_final.columns
    temp_sort_column = 'selection_date_dt_sort'

    sort_columns: List[str] = []
    sort_ascending: List[bool] = []

    if has_run_timestamp:
        # Plain column assignment replaces the column, so the result has a real
        # datetime dtype; `.loc[:, col] = ...` would write the values into the
        # existing object-dtype column in place under pandas 2.x.
        df_final['run_timestamp'] = pd.to_datetime(df_final['run_timestamp'], errors='coerce')
        sort_columns.append('run_timestamp')
        sort_ascending.append(False)

    if has_selection_date:
        try:
            # Cast to str first so mixed input types are parsed consistently.
            df_final[temp_sort_column] = pd.to_datetime(df_final['selection_date'].astype(str), errors='coerce')
        except Exception as e:
            logging.warning(f"DataFrame Update: Could not parse 'selection_date' for sorting: {e}. Sorting may be string-based.")
            df_final[temp_sort_column] = df_final['selection_date']  # Fallback
        sort_columns.append(temp_sort_column)
        sort_ascending.append(False)

        if has_scheme:
            sort_columns.append('scheme')
            sort_ascending.append(True)

        absent_sort_columns = [col for col in ('run_timestamp', 'scheme') if col not in df_final.columns]
        if absent_sort_columns:
            logging.warning(f"DataFrame Update: Sort columns {absent_sort_columns} not found. Sorting by the remaining columns.")
    elif has_run_timestamp and has_scheme:
        logging.warning("DataFrame Update: 'selection_date' not found for sorting. Sorting by 'run_timestamp' and 'scheme'.")
        sort_columns.append('scheme')
        sort_ascending.append(True)
    elif has_run_timestamp:
        logging.warning("DataFrame Update: Only 'run_timestamp' found for sorting.")
    else:
        logging.warning("DataFrame Update: Key sorting columns ('run_timestamp', 'selection_date') not found. DataFrame may not be optimally sorted.")
        return df_final

    # Latest runs first, then latest selection dates, then scheme name.
    return df_final.sort_values(
        by=sort_columns,
        ascending=sort_ascending
    ).drop(columns=[temp_sort_column], errors='ignore')



In [4]:
# --- Main Execution Block (Updated with process_single_pair) ---
# Orchestrates one full run:
#   1. set up logging and load the price data,
#   2. discover and pair selection/parameter files in OUTPUT_DIR,
#   3. run the backtests for every pair via process_single_pair,
#   4. write this run's records to the results CSV,
#   5. merge them into the results DataFrame store and save it as Parquet.
# Any exception is reported (to the log when available, otherwise to stdout)
# and logging is shut down in the `finally` clause.

log_filepath = None
run_timestamp = None
df_adj_close = None
all_performance_records = []
file_pairs = []

try:
    log_filepath, run_timestamp = setup_script_logging()

    # --- Step 1: Load and Prepare Price Data ---
    df_adj_close = load_and_prepare_price_data(ADJ_CLOSE_PATH)

    # --- Step 2: Discover and Map Input Files ---
    selection_files, param_files, param_map, extracted_dates_params = find_and_map_param_files(OUTPUT_DIR)

    # --- Step 3: Pair Selection Files with Parameter Files ---
    file_pairs = pair_data_and_param_files(
        selection_files=selection_files,
        param_files=param_files,
        param_map=param_map,
        extracted_dates_params=extracted_dates_params,
        utils_module=utils
    )

    # --- Step 4: Check if pairs were found and log before proceeding to loop ---
    if not file_pairs:
        logging.warning("No file pairs found to process. Skipping backtest loop.")
    else:
        logging.info(f"Starting processing for {len(file_pairs)} file pairs...")

        # --- Step 5: Process Paired Files (Loop calls process_single_pair) ---
        for processed_pair_count, (data_file, param_file_name) in enumerate(file_pairs, start=1):
            logging.info(f"\n--- Processing Pair {processed_pair_count}/{len(file_pairs)}: Data='{data_file}', Params='{param_file_name}' ---")

            pair_records = process_single_pair(
                data_file=data_file,
                param_file_name=param_file_name,
                output_dir=OUTPUT_DIR, # Directory where the pair files live
                df_adj_close=df_adj_close, # Pass the main price data
                run_timestamp=run_timestamp, # Pass run metadata
                log_filepath=log_filepath,
                utils_module=utils, # Pass utilities module
                extract_backtest_setups_func=extract_backtest_setups, # Pass function references
                process_all_backtests_func=process_all_backtests,
                extract_params_and_results_func=extract_params_and_results
            )

            # process_single_pair returns [] on error/skip, so extend is always safe.
            all_performance_records.extend(pair_records)

        logging.info("--- File processing loop finished. ---")

    # --- Step 6: Save Accumulated Results ---

    # --- CSV Handling (primarily for new records from this run) ---
    if all_performance_records:
        logging.info(f"\n--- Attempting to Save/Update {len(all_performance_records)} Performance Records to CSV ---")
        write_results_to_csv(all_performance_records, RESULTS_CSV_PATH) # This updates/overwrites CSV based on its own logic
    else:
        logging.info("\n--- No new performance records from this run to add to CSV. ---")

    # --- DataFrame Store Handling (e.g., Parquet) ---
    logging.info("\n--- Processing All Performance Records for DataFrame Store (e.g., Parquet) ---")

    # Load existing DataFrame from Parquet, if any
    current_results_df = None
    if os.path.exists(RESULTS_DF_PATH):
        try:
            current_results_df = pd.read_parquet(RESULTS_DF_PATH)
            logging.info(f"Loaded existing results DataFrame from {RESULTS_DF_PATH} ({len(current_results_df)} records).")
        except Exception as e:
            logging.error(f"Error loading existing results DataFrame from {RESULTS_DF_PATH}: {e}. Will proceed as if creating a new DataFrame.", exc_info=True)
            current_results_df = None # Reset if loading fails

    # Consolidate this run's records with whatever was loaded from Parquet
    # (defaults: column_order=CSV_COLUMN_ORDER, unique_key_columns=UNIQUE_KEY_COLUMNS_FOR_CSV).
    final_results_df = update_or_create_dataframe_with_records(
        new_records=all_performance_records, # Records from the current run
        existing_df=current_results_df      # DataFrame loaded from storage (or None)
    )

    # Save the final consolidated DataFrame to Parquet
    # This overwrites the Parquet file with the complete, updated dataset.
    if final_results_df is not None and not final_results_df.empty:
        try:
            # os.makedirs('') raises, so only create the parent directory when
            # RESULTS_DF_PATH actually has a directory component.
            results_parent_dir = os.path.dirname(RESULTS_DF_PATH)
            if results_parent_dir:
                os.makedirs(results_parent_dir, exist_ok=True)
            final_results_df.to_parquet(RESULTS_DF_PATH, index=False)
            logging.info(f"Final results DataFrame successfully saved to {RESULTS_DF_PATH} ({len(final_results_df)} records).")
        except Exception as e:
            logging.error(f"Error saving final results DataFrame to {RESULTS_DF_PATH}: {e}", exc_info=True)
    elif final_results_df is not None and final_results_df.empty:
        # An existing Parquet file is deliberately left untouched in this case.
        logging.info("Final results DataFrame is empty. Not saving to Parquet. If an old Parquet file exists, it may remain or you might want to delete it.")
    else: # Should ideally not happen if the function guarantees a DataFrame return
        logging.error("The function to update/create DataFrame returned None. This is unexpected. Parquet file not saved.")

except FileNotFoundError as e:
    print(f"FATAL ERROR: Required file or directory not found: {e}")
    if log_filepath and logging.getLogger().hasHandlers():
        logging.critical(f"FATAL FileNotFoundError: {e}", exc_info=True)
    else:
        print(f"Logging not initialized. Error: {e}")
except Exception as e:
    print(f"FATAL ERROR in main execution block: {e}")
    if log_filepath and logging.getLogger().hasHandlers():
        logging.critical(f"CRITICAL ERROR in main execution block: {e}", exc_info=True)
    else:
        print(f"Logging not initialized. Error: {e}")
        traceback.print_exc()
finally:
    final_message = "=== Script Execution Finished (with errors if reported above) ==="
    print(final_message)
    if log_filepath and logging.getLogger().hasHandlers():
        logging.info(final_message)
        logging.shutdown()
        print("Logging shutdown complete.")
    else:
        print("Logging was not fully initialized or already shut down.")

2025-05-20 15:33:34,924 - INFO - Logging initialized. Log file: logs\backtest_run_20250520_153334.log
2025-05-20 15:33:34,924 - INFO - Script execution started at: 2025-05-20 15:33:34
2025-05-20 15:33:34,924 - INFO - Python Interpreter: c:\Users\ping\Files_win10\python\py311\.venv\Scripts\python.exe
2025-05-20 15:33:34,940 - INFO - Current Working Directory: c:\Users\ping\Files_win10\python\py311\stocks\notebooks
2025-05-20 15:33:34,940 - INFO - Pandas Version: 2.2.3
2025-05-20 15:33:34,949 - INFO - Numpy Version: 1.26.4
2025-05-20 15:33:34,951 - INFO - Attempting to load price data from: c:\Users\ping\Files_win10\python\py311\stocks\data\df_adj_close.parquet
2025-05-20 15:33:35,360 - INFO - Successfully loaded and prepared price data from ../data/df_adj_close.parquet
2025-05-20 15:33:35,377 - INFO - Price data shape: (326, 1565), Date range: 2024-02-01 00:00:00 to 2025-05-20 00:00:00
2025-05-20 15:33:35,377 - INFO - Price data loading completed.
2025-05-20 15:33:35,377 - INFO - Attemp

=== Script Execution Finished (with errors if reported above) ===
Logging shutdown complete.
