# `README.md`

# A Scalable Framework for Benford's Law Conformity Testing of Financial Data

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2509.09415-b31b1b.svg)](https://arxiv.org/abs/2509.09415)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/search_benford_law_compatibility)
[![Discipline](https://img.shields.io/badge/Discipline-Forensic%20Accounting%20%26%20Econometrics-blue)](https://github.com/chirindaopensource/search_benford_law_compatibility)
[![Methodology](https://img.shields.io/badge/Methodology-Benford's%20Law%20%7C%20Goodness--of--Fit-orange)](https://github.com/chirindaopensource/search_benford_law_compatibility)
[![Data Source](https://img.shields.io/badge/Data-Refinitiv%20EIKON-lightgrey)](https://www.refinitiv.com/en/products/eikon-trading-software)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%23025596?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![PyYAML](https://img.shields.io/badge/PyYAML-4B5F6E.svg?style=flat)](https://pyyaml.org/)

--

**Repository:** `https://github.com/chirindaopensource/search_benford_law_compatibility`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Note on pre-taxation reported data by UK FTSE-listed companies. A search for Benford's laws compatibility"** by:

*   Marcel Ausloos
*   Probowo Erawan Sastroredjo
*   Polina Khrennikova

The project provides a complete, end-to-end computational framework for testing the conformity of financial datasets with Benford's Law. It delivers a modular, auditable, and extensible pipeline that replicates the paper's entire workflow: from rigorous data and configuration validation, through robust, mathematically-grounded digit extraction, to the precise calculation of Chi-Squared and Mean Absolute Deviation (MAD) test statistics and the final generation of publication-quality results tables.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: execute_full_project_workflow](#key-callable-execute_full_project_workflow)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "Note on pre-taxation reported data by UK FTSE-listed companies. A search for Benford's laws compatibility." The core of this repository is the iPython Notebook `search_benford_law_compatibility_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation and analysis of conformity test results.

The paper addresses a classic problem in forensic accounting and auditing: using statistical laws to identify potential anomalies in reported financial data. This codebase operationalizes the paper's approach, allowing users to:
-   Rigorously validate input data and methodological parameters against a predefined schema.
-   Perform a detailed data quality audit, identifying outliers and missing value patterns without altering the source data.
-   Create derived analytical variables (e.g., PI/TA ratio) and segmented datasets (e.g., profitable vs. unprofitable firms).
-   Apply robust, mathematically-grounded algorithms to extract the first, second, and first-two significant digits from numerical data.
-   Calculate the theoretical probability distributions for Benford's Law (BL1, BL2, BL12) with high precision.
-   Execute Pearson's Chi-Squared (χ²) and Mean Absolute Deviation (MAD) goodness-of-fit tests to quantify deviations from the theoretical benchmarks.
-   Generate a full suite of publication-quality tables that precisely replicate the findings in the source paper.
-   Conduct a comprehensive series of robustness checks, including bootstrap resampling, parameter sensitivity analysis, and temporal stability analysis.

## Theoretical Background

The implemented methods are grounded in Probability Theory, Statistics, and Forensic Accounting.

**1. Benford's Law (The First-Digit Law):**
Benford's Law states that in many naturally occurring sets of numbers, the leading significant digit is more likely to be small. The probability of a first digit `d` is given by:
$$ P(d_1 = i) = \log_{10}\left(1 + \frac{1}{i}\right) \quad \text{for } i \in \{1, ..., 9\} $$
This principle extends to second and subsequent digits, as well as to blocks of digits, with specific logarithmic formulas. Deviations from these expected frequencies can suggest that a dataset was not naturally generated and may be the result of manipulation, fabrication, or systemic error.

**2. Chi-Squared (χ²) Goodness-of-Fit Test:**
This is a classical statistical test used to determine if there is a significant difference between observed and expected frequencies. The test statistic is calculated as:
$$ \chi^2 = \sum_{i=1}^{K} \frac{(O_i - E_i)^2}{E_i} $$
where `O` are the observed counts and `E` are the expected counts across `K` bins. A large χ² statistic suggests that the observed data does not fit the theoretical distribution.

**3. Mean Absolute Deviation (MAD) Test:**
The MAD test provides a direct measure of the magnitude of the deviation between the observed proportions (`fₒ`) and the expected proportions (`fₑ`). The paper uses the sum of absolute deviations:
$$ \text{MAD} = \sum_{i=1}^{K} |f_{o,i} - f_{e,i}| $$
This statistic is less sensitive to sample size than the χ² test and provides an intuitive measure of non-conformity, which is then compared against established thresholds.

## Features

The provided iPython Notebook (`search_benford_law_compatibility_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Task-Based Architecture:** The entire pipeline is broken down into 18 distinct, modular tasks, from data validation to final packaging.
-   **Configuration-Driven Design:** All methodological parameters are managed in an external `config.yaml` file, allowing for easy customization without code changes.
-   **Professional-Grade Data Validation:** A comprehensive validation suite ensures all inputs (data and configurations) conform to the required schema before execution.
-   **Robust, Mathematical Digit Extraction:** Numerically stable, from-scratch implementations of logarithmic algorithms for extracting significant digits, avoiding common pitfalls of string-based methods.
-   **High-Fidelity Statistical Testing:** Precise, vectorized implementations of the Chi-Squared and Mean Absolute Deviation tests.
-   **Automated Report Generation:** Programmatic generation of all 6 analytical tables (Tables 3-8) from the paper with high fidelity to the original formatting.
-   **Advanced Robustness Toolkit:**
    -   A framework for conducting **bootstrap resampling** to assess the stability of test statistics.
    -   A framework for **parameter sensitivity analysis** to test the impact of varying alpha levels and MAD thresholds.
    -   A framework for **temporal stability analysis** to check for consistency of results over time.
-   **Automated Replication Validation:** A final quality assurance step that programmatically compares the generated results against the paper's published statistics and produces a certification report.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation (Tasks 1-3):** Ingests and rigorously validates the raw data and `config.yaml` file, and performs a detailed data quality audit.
2.  **Preprocessing (Tasks 4-6):** Prepares the data by flagging outliers, creating the PI/TA ratio, segmenting the data by profitability, and generating the summary statistics table (Table 1).
3.  **Digit Extraction (Tasks 7-9):** Applies robust mathematical algorithms to extract the first, second, and first-two significant digits for all relevant variables.
4.  **Theoretical Distribution Generation (Tasks 10-12):** Calculates the high-precision theoretical probabilities and expected frequencies for BL1, BL2, and BL12.
5.  **Statistical Testing & Reporting (Tasks 13-15):** Executes the Chi-Squared and MAD tests for all variable-digit combinations and compiles the results into replications of Tables 3-8.
6.  **Robustness & Packaging (Tasks 16-18):** Orchestrates the entire pipeline, runs the optional robustness checks, and performs the final validation and packaging of all outputs.

## Core Components (Notebook Structure)

The `search_benford_law_compatibility_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: execute_full_project_workflow

The central function in this project is `execute_full_project_workflow`. It orchestrates the entire analytical workflow, providing a single entry point for running the baseline study replication and the advanced robustness checks.

```python
def execute_full_project_workflow(
    raw_financial_data: pd.DataFrame,
    study_configuration: Dict[str, Any],
    output_directory: str,
    run_robustness_checks: bool = True
) -> Dict[str, Any]:
    """
    Executes the entire, end-to-end research project workflow.
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `pyyaml`, `tqdm`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/search_benford_law_compatibility.git
    cd search_benford_law_compatibility
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy pyyaml tqdm
    ```

## Input Data Structure

The pipeline requires two primary inputs:
1.  **`raw_financial_data`:** A `pandas.DataFrame` containing the panel data. It **must** have a `MultiIndex` with the levels `['CompanyID', 'Year']` and the columns `['CompanyName', 'PreTaxIncome_GBP', 'TotalAssets_GBP']`. Financial columns must be `float64`.
2.  **`study_configuration`:** A Python dictionary loaded from the `config.yaml` file, which controls all methodological parameters.

A mock data generation function is provided in the main notebook to create a valid example DataFrame for testing the pipeline.

## Usage

The `search_benford_law_compatibility_draft.ipynb` notebook provides a complete, step-by-step guide. The core workflow is:

1.  **Prepare Inputs:** Load your `raw_financial_data` DataFrame. Ensure the `config.yaml` file is present in the same directory.
2.  **Execute Pipeline:** Call the grand orchestrator function.

    ```python
    # This single call runs the entire project.
    final_project_outputs = execute_full_project_workflow(
        raw_financial_data=my_company_data_df,
        study_configuration=my_config_dict,
        output_directory="research_outputs",
        run_robustness_checks=False  # Set to True for the full analysis
    )
    ```
3.  **Inspect Outputs:** All results are saved to the specified output directory. You can also programmatically access any result from the returned dictionary.

## Output Structure

The `execute_full_project_workflow` function returns a single, comprehensive dictionary containing all generated artifacts. Additionally, the `output_directory` will be populated with:
-   `replication_validation_report.json`: A report certifying the accuracy of the replication.
-   `data_quality_report.json`: A detailed audit of the input data quality.
-   `table_1_summary_statistics.csv`: The replicated descriptive statistics table.
-   `table_3_body.csv`, `table_3_stats.csv`, etc.: Pairs of CSV files for each analytical table (3-8).
-   `raw_test_results.json`: A comprehensive file with the detailed numerical outputs of all statistical tests.
-   `robustness_analysis_report.json`: (If run) A file with the results of all robustness checks.

## Project Structure

```
search_benford_law_compatibility/
│
├── search_benford_law_compatibility_draft.ipynb   # Main implementation notebook
├── config.yaml                                    # Master configuration file
├── requirements.txt                               # Python package dependencies
├── LICENSE                                        # MIT license file
└── README.md                                      # This documentation file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all methodological parameters, such as statistical thresholds, critical values, and bin definitions, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Additional Goodness-of-Fit Tests:** Implementing other statistical tests for Benford's Law conformity, such as the Kolmogorov-Smirnov test or Kuiper's test.
-   **Visualization Module:** Creating a function that takes the final results and generates plots of the observed vs. expected distributions, similar to Figures 2-4 in the paper.
-   **Automated Reporting:** Building a module that uses the generated tables and plots to automatically create a full PDF or HTML summary report of the findings.
-   **Integration with Database:** Developing a data ingestion module to pull data directly from a SQL database instead of a CSV file.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{ausloos2025note,
  title={{Note on pre-taxation reported data by UK FTSE-listed companies. A search for Benford's laws compatibility}},
  author={Ausloos, Marcel and Sastroredjo, Probowo Erawan and Khrennikova, Polina},
  journal={arXiv preprint arXiv:2509.09415},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Python Implementation for Benford's Law Conformity Testing of Financial Data.
GitHub repository: https://github.com/chirindaopensource/search_benford_law_compatibility
```

## Acknowledgments

-   Credit to **Marcel Ausloos, Probowo Erawan Sastroredjo, and Polina Khrennikova** for their foundational research, which forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, and PyYAML**, whose work makes complex computational analysis accessible and robust.

--

*This README was generated based on the structure and content of `search_benford_law_compatibility_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*Note on pre-taxation reported data by UK FTSE-listed companies. A search for Benford's laws compatibility*"

Authors: Marcel Ausloos, Probowo Erawan Sastroredjo, Polina Khrennikova

E-Journal Submission Date: 11 September 2025

Link: https://arxiv.org/abs/2509.09415


Abstract:

**Objective & Motivation**
> Pre-taxation analysis plays a crucial role in ensuring the fairness of public revenue collection. It can also serve as a tool to reduce the risk of tax avoidance, one of the UK government's concerns.

**Dataset & Variables**
> Our report utilises pre-tax income (`PI`) and total assets (`TA`) data from 567 companies listed on the FTSE All-Share index, gathered from the Refinitiv EIKON database, covering 14 years, i.e., the period from 2009 to 2022. We also derive the `PI/TA` ratio, and distinguish between positive and negative `PI` cases.

**Methodology**
> We test the conformity of such data to Benford's Laws, specifically studying the first significant digit (`Fd`), the second significant digit (`Sd`), and the first and second significant digits (`FSd`). We use and justify two pertinent tests, the χ² and the Mean Absolute Deviation (MAD).

**Key Findings & Implications**
> We find that both tests are not leading to conclusions in complete agreement with each other, in particular the MAD test entirely rejects the Benford's Laws conformity of the reported financial data.
>
> *   **From an accounting perspective:** We conclude that the findings not only cast some doubt on the reported financial data, but also suggest that many more investigations be envisaged on closely related matters.
>
> *   **From a methodological perspective:** The study of a ratio, like `PI/TA`, of variables which are (or not) Benford's Laws compliant add to the literature debating whether such indirect variables should (or not) be Benford's Laws compliant.

**Keywords**
> Benford's laws; first significant digit; first-second significant digit; second significant digit; FTSE-listed companies; MAD statistical tests

# Summary

### Summary and Analysis of "A search for Benford's laws compatibility"

Here is a structured breakdown of the research presented by Ausloos et al.

#### **Research Objective and Motivation**

The fundamental goal of this paper is to perform a forensic accounting analysis on a large dataset of UK public companies. The authors use **Benford's Law**—an empirical observation that the leading digits in many real-world numerical datasets follow a specific, non-uniform logarithmic distribution—as a tool to test for potential irregularities.

The motivation is twofold and highly relevant in financial econometrics:

1.  **Fraud and Anomaly Detection:** Deviations from Benford's Law can be a red flag, suggesting that the data may not be naturally occurring. This could imply earnings management, tax avoidance strategies, or even outright data fabrication. The UK tax gap (£39.8 billion) is cited as a key motivator, positioning this work as a potential pre-audit screening tool for tax authorities like HMRC.
2.  **Methodological Contribution:** The paper aims to contribute to the ongoing academic debate about the scope and limitations of Benford's Law. Specifically, it tests not only the first significant digit (BL1) but also higher-order laws for the second digit (BL2) and the first-two digits combined (BL12). Crucially, it also examines the conformity of a *derived variable* (a ratio), which is a known area of contention in the literature.

#### **Data and Methodology**

The authors' approach is straightforward and grounded in standard econometric practice for this type of analysis.

*   **Dataset:**
    *   **Source:** Refinitiv EIKON database.
    *   **Sample:** 567 companies listed on the UK's FTSE All-Share index.
    *   **Time Period:** 2009 to 2022 (a 14-year window).
    *   **Primary Variables:**
        1.  **Pre-Tax Income (PI):** The core variable of interest for tax analysis. A key methodological choice was to split this dataset into positive incomes (`PI(+)`, or "gains") and negative incomes (`PI(-)`,` or "losses") for separate analysis. This is a sophisticated step, as the psychological and strategic incentives for managing profits versus losses are fundamentally different.
        2.  **Total Assets (TA):** Often used as a proxy for company size.
    *   **Derived Variable:**
        *   **PI/TA Ratio:** A measure of profitability or return on assets. Analyzing this ratio tests whether Benford's Law holds for combinations of variables.

*   **Econometric Tests (Goodness-of-Fit):**
    The authors employ two distinct statistical tests to compare the empirical digit distributions in their data to the theoretical Benford distributions. This use of multiple tests is crucial, as they measure "distance" in different ways.
    1.  **Chi-Squared (χ²) Test:** This is the classical statistical test for comparing observed frequencies with expected frequencies in categorical bins (here, the bins are the digits 1-9, 0-9, or 10-99). A known weakness, which is relevant here, is its high statistical power with large sample sizes (N ≈ 7000), meaning it can flag deviations that are statistically significant but practically irrelevant.
    2.  **Mean Absolute Deviation (MAD) Test:** This test measures the average absolute difference between the observed and expected proportions for each digit. It is often considered a more robust and practical measure in the Benford's Law literature, as it is less sensitive to sample size and provides a direct measure of the magnitude of the deviation. The authors use established conformity thresholds (e.g., for BL1, MAD < 0.006 indicates close conformity).

#### **Key Findings and Results**

The core of the paper lies in the results, which present a fascinating and important contradiction.

1.  **Universal Rejection by the MAD Test:** The most striking result is that the **MAD test shows non-conformity across the board.** For every variable tested (PI, PI(+), PI(-), TA, and PI/TA) and for every law (BL1, BL2, BL12), the calculated MAD value exceeds the threshold for conformity. From a purely practical, audit-focused perspective, this is a strong signal that the data is not "naturally occurring" and warrants further investigation.

2.  **Ambiguous Results from the χ² Test:** In stark contrast, the **χ² test provides a much more mixed and ambiguous picture.** It fails to reject the null hypothesis (i.e., it suggests conformity) in several cases, particularly for the second-digit (BL2) and first-two-digit (BL12) distributions of certain variables.

3.  **The Central Conflict:** This discrepancy between the two tests is the paper's most significant finding. The MAD test, focusing on the *magnitude* of deviation, says "all data is suspect." The χ² test, focusing on *statistical probability*, says "some data appears to conform." This highlights a classic econometric dilemma: the difference between statistical significance and practical importance. With a large dataset, even minor, economically meaningless deviations can become statistically significant under a χ² framework.

4.  **The Derived Variable (PI/TA):** The PI/TA ratio consistently fails to conform to Benford's Law by a large margin under *both* tests. This reinforces the theoretical uncertainty: the distribution of a ratio of two random variables is not guaranteed to be Benford-compliant, even if the numerator and denominator are. This finding serves as a strong cautionary note against naively applying Benford's Law to derived financial ratios.

#### **Conclusion and Critique**

From an academic standpoint, this paper is a solid "Note" that makes a valuable contribution through its careful empirical work.

*   **Main Takeaway:** The authors correctly conclude that there is a significant conflict between their chosen statistical tests. The MAD results cast serious doubt on the integrity of the reported pre-tax financial data for these FTSE companies. The ambiguity of the χ² test serves as a methodological lesson on the limitations of certain statistical tools in the presence of very large datasets.

*   **Critique and Interpretation:**
    *   The decision to rely more on the MAD test's conclusion is sound. In forensic applications, the magnitude of deviation (what MAD measures) is often more informative than the p-value from a χ² test. An auditor would likely find the MAD results more actionable.
    *   The analysis of `PI(+)` versus `PI(-)` was insightful. The incentives to "round up" to a profit or "manage" the size of a loss are different, and analyzing them separately was the correct approach.
    *   The paper effectively highlights the "derived variable problem." It demonstrates empirically that one cannot assume a ratio of variables will follow Benford's Law. This is a crucial point for anyone looking to apply these techniques in finance.
    *   The scatter plot of TA vs. PI (Figure 1) is intriguing. The authors' suggestion of two distinct "clusters" representing different "tax strategies" is speculative but points toward a promising avenue for future research using more advanced techniques like cluster analysis or mixture models to formally identify these sub-populations.

In summary, Ausloos et al. have conducted a robust empirical study that provides evidence of widespread deviation from Benford's Law in UK corporate financial data. More importantly, their work serves as a sophisticated case study on the nuances of statistical testing in forensic accounting, emphasizing the need for multiple tests and a deep understanding of their underlying assumptions and limitations. It successfully raises more questions than it answers, which is often the hallmark of good exploratory research.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  A Scalable Framework for Benford's Law Conformity Testing of Financial Data
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Note on pre-taxation reported data by UK
#  FTSE-listed companies. A search for Benford's laws compatibility" by Ausloos,
#  Sastroredjo, and Khrennikova (2025). It delivers a robust, scalable, and
#  reproducible system for the probabilistic screening of large numerical
#  datasets to detect deviations from expected statistical patterns, enabling
#  the efficient, risk-based prioritization of finite investigative resources.
#
#  Core Methodological Components:
#  • High-fidelity replication of Benford's Law for first, second, and first-two
#    significant digits (BL1, BL2, BL12).
#  • Precise implementation of Pearson's Chi-Squared (χ²) goodness-of-fit test.
#  • Precise implementation of the Mean Absolute Deviation (MAD) conformity test.
#  • Robust, mathematically-grounded algorithms for significant digit extraction
#    that avoid common floating-point and string-parsing pitfalls.
#  • Systematic analysis of primary financial data (Pre-Tax Income, Total Assets)
#    and derived ratios (PI/TA), including segmentation by profitability.
#
#  Technical Implementation Features:
#  • A modular, multi-phase pipeline architecture encapsulating validation,
#    preprocessing, analysis, reporting, and robustness checking.
#  • Vectorized numerical computations using NumPy for maximum efficiency and precision.
#  • Comprehensive data validation and quality assurance at each stage.
#  • A full suite of robustness checks, including bootstrap resampling, parameter
#    sensitivity analysis, and temporal stability analysis.
#  • Automated generation of publication-quality tables that replicate the study's findings.
#  • A top-level orchestrator providing a "one-button-to-run" capability for the
#    entire research workflow.
#
#  Paper Reference:
#  Ausloos, M., Sastroredjo, P. E., & Khrennikova, P. (2025). Note on pre-taxation
#  reported data by UK FTSE-listed companies. A search for Benford's laws
#  compatibility. arXiv preprint arXiv:2509.09415.
#  https://arxiv.org/abs/2509.09415
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# ==============================================================================
# Standard Library Imports
# ==============================================================================
import copy
import json
import math
from pathlib import Path
from typing import Any, Dict, List, Tuple

# ==============================================================================
# Third-party Library Imports
# ==============================================================================
import numpy as np
import pandas as pd
from pandas.api.types import is_integer_dtype, is_numeric_dtype, is_object_dtype
from scipy.stats import chi2, kurtosis, median_abs_deviation, skew
from tqdm import tqdm



# Implementation

# Draft 1

### **Exegesis of the Research Pipeline Orchestrators**

#### **Task 1: `validate_study_configuration`**

*   **Inputs:** A Python dictionary, `study_config`, intended to be the `STUDY_CONFIGURATION_OBJECT`.
*   **Processes:** The function sequentially invokes three specialized helper functions. Each helper accesses a specific nested key (`'data_source'`, `'benford_laws'`, `'statistical_tests'`) and performs a series of assertion-based checks. It validates the data types and exact values of critical methodological parameters (e.g., sample size, date ranges, degrees of freedom, statistical thresholds) against the hardcoded specifications derived directly from the paper.
*   **Outputs:** A boolean `True` upon successful validation of the entire configuration. It raises a specific `AssertionError` or `ValueError` upon the first detected failure.
*   **Data Transformation:** This is a pure validation function. No data is transformed; the input dictionary is merely inspected.
*   **Role in Research Pipeline:** This function serves as the foundational quality gate for the entire project. It implements the **methodological prerequisites** outlined throughout the paper, ensuring that the computational framework is perfectly aligned with the study's stated parameters before any data is processed. It guarantees that any subsequent step will use the correct significance levels, critical values, and conformity thresholds as cited in **Section 4 (p. 13)** and the footnotes of **Tables 3-8 (pp. 14-16)**.

#### **Task 2: `validate_dataframe_structure`**

*   **Inputs:** A `pandas.DataFrame`, `df`, and the validated `study_config` dictionary.
*   **Processes:** The function executes a three-step structural validation.
    1.  It verifies that the DataFrame's index is a `pandas.MultiIndex` with exactly two levels named `'CompanyID'` (object/string) and `'Year'` (integer), and that the years fall within the study's range.
    2.  It confirms the DataFrame contains the exact set of three required columns (`'CompanyName'`, `'PreTaxIncome_GBP'`, `'TotalAssets_GBP'`) and validates their dtypes, strictly enforcing `float64` for the financial variables.
    3.  It performs a completeness assessment, comparing the counts of non-null observations and unique companies against the figures reported in **Section 3.1 (p. 7)** and **Table 1 (p. 8)**, using a small tolerance.
*   **Outputs:** A boolean `True` upon successful validation. It raises an `AssertionError` or `ValueError` upon failure.
*   **Data Transformation:** This is a pure validation function. The input DataFrame is inspected but not altered.
*   **Role in Research Pipeline:** This function ensures the **input data integrity**. It confirms that the raw data conforms to the precise panel data structure required for all subsequent filtering, segmentation, and analysis, as described in **Section 3.1, "Data Acquisition" (p. 7)**.

#### **Task 3: `validate_financial_data_quality`**

*   **Inputs:** A `pandas.DataFrame`, `df`.
*   **Processes:** The function performs a three-step quality assessment without modifying the data.
    1.  It checks that the financial data points fall within plausible ranges derived from the min/max values in **Table 1 (p. 8)**.
    2.  It analyzes missing value patterns, calculating the correlation of missingness and identifying entities with high proportions of missing data.
    3.  It detects extreme outliers by calculating a robust Z-score based on the median and Median Absolute Deviation (MAD), a method appropriate for skewed financial data. This aligns with the paper's qualitative mention of preserving "~3 anomalously extreme outliers" (**p. 8**).
*   **Outputs:** A nested dictionary, `quality_report`, containing the quantitative results of each check (e.g., counts of out-of-range values, list of high-missing-data companies, counts of extreme outliers).
*   **Data Transformation:** The function transforms the input DataFrame into a structured summary report (a dictionary). The original DataFrame is not modified.
*   **Role in Research Pipeline:** This function implements the **pre-analytical data audit**. It programmatically investigates the data quality issues mentioned by the authors in **Section 3.1 (p. 8)**, providing a quantitative baseline of the data's characteristics before the main analysis begins.

#### **Task 4: `prepare_and_standardize_data`**

*   **Inputs:** A `pandas.DataFrame`, `raw_df`.
*   **Processes:** The function executes the paper's conservative "cleansing" methodology.
    1.  It creates a safe copy of the data to ensure the original input remains untouched.
    2.  It verifies that the financial columns have the required `float64` precision.
    3.  It operationalizes the outlier detection from Task 3 by adding two new boolean columns (e.g., `'IsExtreme_PI'`) to the DataFrame, flagging the rows containing extreme values.
*   **Outputs:** A new `pandas.DataFrame` that is a copy of the input, augmented with the new boolean flag columns.
*   **Data Transformation:** This function transforms the input DataFrame by adding metadata columns. The original numerical data is explicitly preserved and not altered, in direct accordance with the authors' stated methodology: "we have preferred conserving them" (**p. 8**).
*   **Role in Research Pipeline:** This function implements the **audited data preparation**. It creates the final, standardized dataset that will be used for all subsequent analysis, ensuring it is numerically pristine while carrying a transparent audit trail of its quality characteristics.

#### **Task 5: `create_analytical_variables`**

*   **Inputs:** The prepared `pandas.DataFrame` from Task 4.
*   **Processes:** The function first constructs the derived ratio variable, `PI/TA`, via robust, vectorized division that correctly handles `NaN` propagation and division-by-zero. It then uses this augmented DataFrame to create the two required analytical subsets: `pi_positive` (where `PI > 0`) and `pi_negative` (where `PI < 0`).
*   **Outputs:** A dictionary of `pandas.DataFrame`s, containing the full dataset with the new ratio, the positive-income subset, and the negative-income subset.
*   **Data Transformation:** This function transforms a single input DataFrame into a structured collection of three related DataFrames, each tailored for a specific part of the analysis.
*   **Role in Research Pipeline:** This function implements the **variable construction and sample segmentation** described in the **Abstract (p. 2)** and **Section 3.1 (p. 7)**. It creates the exact data structures upon which the Benford's Law tests will be performed.

#### **Task 6: `generate_summary_statistics`**

*   **Inputs:** The dictionary of analytical `pandas.DataFrame`s from Task 5.
*   **Processes:** The function iterates through a predefined map of variables and subsets. For each, it calculates a comprehensive suite of descriptive statistics (N, min, max, mean, std, skewness, kurtosis, CV). It then meticulously formats these statistics into a single DataFrame, applying specific rounding and data type conversions to precisely match the presentation of **Table 1 (p. 8)**.
*   **Outputs:** A single, formatted `pandas.DataFrame` that is a high-fidelity replication of Table 1.
*   **Data Transformation:** It transforms the collection of analytical datasets into a single, highly structured and formatted summary table.
*   **Role in Research Pipeline:** This function serves as the **final data validation checkpoint**. By replicating Table 1, it provides definitive, quantitative proof that the prepared dataset is statistically indistinguishable from the one used in the original study.

#### **Tasks 7, 8, 9: `extract_first_significant_digit`, `extract_second_significant_digit`, `extract_first_two_significant_digits`**

*   **Inputs:** A `pandas.Series` of numerical data.
*   **Processes:** Each of these orchestrators follows the same robust three-step pattern:
    1.  **Preprocess:** It takes the absolute value and filters for strictly positive, finite numbers.
    2.  **Extract:** It applies a direct, vectorized, and mathematically robust logarithmic formula to extract the required digit(s). The core algorithm for the first-two digits is:
        $$
        d_1d_2 = \lfloor \frac{x}{10^{(\lfloor \log_{10}(x) \rfloor - 1)}} \rfloor
        $$
        The other two functions use variations of this principle.
    3.  **Align:** It re-indexes the results back to the original series' index, correctly populating `NaN` for all invalid inputs.
*   **Outputs:** A `pandas.Series` of the same size as the input, containing the extracted digits as floats (to accommodate `NaN`).
*   **Data Transformation:** These functions transform a series of continuous financial data into a series of discrete integer digits.
*   **Role in Research Pipeline:** These functions are the **computational core of the empirical analysis**. They generate the observed digit distributions that will be compared against the theoretical Benford's Law predictions.

#### **Tasks 10, 11, 12: `generate_bl1_distribution`, `generate_bl2_distribution`, `generate_bl12_distribution`**

*   **Inputs:** An integer sample size, `N`.
*   **Processes:** Each of these orchestrators follows a three-step pattern:
    1.  **Calculate:** It computes the theoretical probabilities using a high-precision, vectorized implementation of the relevant formula from the paper. For example, for BL1, it implements:
        $$
        P(d_1 = i) = \log_{10}\left(1 + \frac{1}{i}\right) \quad \text{(Equation 1, p. 10)}
        $$
        For BL2, it implements the summation:
        $$
        P(d_2 = j) = \sum_{k=1}^{9} \log_{10}\left(1 + \frac{1}{10k + j}\right) \quad \text{(Equation 3, p. 10)}
        $$
    2.  **Validate:** It calls a generic helper to confirm the calculated probabilities are non-negative and sum to 1.0 within a tight tolerance.
    3.  **Apply:** It multiplies the validated probability distribution by the input `sample_size` to generate the expected frequencies for the test.
*   **Outputs:** A tuple containing two `pandas.Series`: the theoretical probabilities and the expected frequencies.
*   **Data Transformation:** These functions transform a mathematical formula and a sample size into the theoretical benchmarks for the statistical tests.
*   **Role in Research Pipeline:** These functions generate the **theoretical benchmarks** against which the empirical data will be judged. They are the computational embodiment of the Benford's Law theory described in **Section 3.2 (p. 10)**.

#### **Task 13: `perform_chi_squared_test`**

*   **Inputs:** An empirical digit series, the corresponding expected frequencies, degrees of freedom, and a critical value.
*   **Processes:** The function first prepares the data by calculating observed frequencies and aligning them perfectly with the expected frequencies. It then computes the χ² statistic using a vectorized implementation of:
    $$
    \chi^2 = \sum_{i=1}^{K} \frac{(O_i - E_i)^2}{E_i} \quad \text{(Equation 5, p. 13)}
    $$
    Finally, it compares this statistic to the critical value to determine the hypothesis test outcome and also calculates the p-value.
*   **Outputs:** A dictionary containing the χ² statistic, p-value, and the boolean rejection decision.
*   **Data Transformation:** It transforms empirical and theoretical frequency distributions into a single, decisive statistical result.
*   **Role in Research Pipeline:** This function implements one of the two **primary statistical tests** used in the study to quantify conformity with Benford's Law, as described in **Section 4 (p. 13)**.

#### **Task 14: `perform_mad_test`**

*   **Inputs:** An empirical digit series, the theoretical probabilities, and a dictionary of conformity thresholds.
*   **Processes:** The function first prepares the data by calculating observed proportions and aligning them with the theoretical probabilities. It then computes the MAD statistic using a vectorized implementation of the paper's specific formula:
    $$
    \text{MAD} = \sum_{i=1}^{K} |f_{o,i} - f_{e,i}| \quad \text{(Equation 6, p. 13)}
    $$
    Finally, it classifies this statistic into a conformity level using the study's specified thresholds.
*   **Outputs:** A dictionary containing the MAD statistic and the string-based conformity classification.
*   **Data Transformation:** It transforms empirical and theoretical probability distributions into a single, decisive statistical result.
*   **Role in Research Pipeline:** This function implements the second of the two **primary statistical tests** and is the one ultimately favored by the authors in their conclusion.

#### **Task 15: `compile_all_results`**

*   **Inputs:** The dictionary of analytical datasets and the `study_config` object.
*   **Processes:** This is a high-level orchestrator that systematically runs the entire test suite (BL1, BL2, BL12) for every required variable. It first computes and stores all raw numerical results. It then uses a series of dedicated, non-placeholder helper functions to meticulously format these raw results into a collection of `pandas.DataFrame`s that are high-fidelity replications of **Tables 3 through 8 (pp. 14-16, 20)**.
*   **Outputs:** A dictionary containing the comprehensive `raw_results` and the collection of formatted `publication_tables`.
*   **Data Transformation:** It transforms the collection of analytical datasets into the final, presentation-ready analytical outputs of the study.
*   **Role in Research Pipeline:** This function executes the **entire core analysis and reporting phase**, generating the primary evidence upon which the paper's conclusions are based.

#### **Task 16: `run_benford_replication_pipeline`**

*   **Inputs:** The raw `pandas.DataFrame` and the `study_config` object.
*   **Processes:** This function orchestrates the execution of the main orchestrators from Tasks 1 through 15 in a single, sequential workflow. It manages the flow of data from one major phase to the next, from initial validation to the final generation of results tables.
*   **Outputs:** A comprehensive dictionary containing all major outputs of the primary pipeline.
*   **Data Transformation:** It orchestrates the entire transformation of raw data into final, interpretable results.
*   **Role in Research Pipeline:** This function represents the **complete, end-to-end primary research workflow**.

#### **Task 17: `run_full_robustness_analysis`**

*   **Inputs:** The key outputs from the primary pipeline (`analytical_datasets`, `raw_results`, `prepared_df`) and the `study_config` object.
*   **Processes:** This top-level orchestrator sequentially executes the three distinct robustness analyses:
    1.  **Bootstrap Resampling:** Re-runs the analysis on thousands of resampled datasets to generate confidence intervals for the test statistics.
    2.  **Parameter Sensitivity:** Re-evaluates the test conclusions using different `alpha` levels and MAD thresholds.
    3.  **Temporal Stability:** Splits the data into two time periods and re-runs the entire analysis on each to check for consistency.
*   **Outputs:** A dictionary containing the structured results from all three robustness checks.
*   **Data Transformation:** It transforms the static results of the primary analysis into a dynamic assessment of their stability.
*   **Role in Research Pipeline:** This function implements the **validation and sensitivity analysis** of the primary findings, a critical step in any professional research project to ensure the conclusions are not fragile or spurious.

#### **Task 18: `package_research_outputs`**

*   **Inputs:** All major outputs from the preceding pipeline stages and a target output directory path.
*   **Processes:** This function performs the final two steps of the project.
    1.  It programmatically validates the accuracy of the pipeline's calculated statistics against a hardcoded "answer key" transcribed from the paper's tables, generating a quantitative replication success report.
    2.  It saves all artifacts—the validation report, data quality report, summary statistics, all publication tables, raw results, and robustness analyses—to the specified directory in accessible formats (CSV and JSON).
*   **Outputs:** None (the function writes files to disk).
*   **Data Transformation:** It transforms the in-memory Python objects (DataFrames, dictionaries) into persistent files on disk.
*   **Role in Research Pipeline:** This function provides the **final certification and dissemination** of the research product, ensuring its results are verifiable, auditable, and reproducible.

#### **Final Orchestrator: `execute_full_project_workflow`**

*   **Inputs:** The three fundamental project inputs: raw data, configuration, and an output directory.
*   **Processes:** This is the apex orchestrator. It executes the main orchestrators for the primary pipeline (Task 16), the robustness analysis (Task 17), and the final packaging (Task 18) in a single, seamless, and logically sound sequence, correctly managing the flow of data states between these major stages.
*   **Outputs:** A comprehensive dictionary containing all in-memory results generated during the workflow.
*   **Data Transformation:** It orchestrates the entire end-to-end transformation of raw inputs into a fully validated, analyzed, and packaged research product.
*   **Role in Research Pipeline:** This function is the **single entry point for the entire project**, embodying the principle of a "one-button-to-run" reproducible research workflow.


<br> <br>


### **Usage Example**

### **User Guide: Executing the Benford's Law Replication Pipeline**

This guide demonstrates how to properly prepare the inputs and execute the main pipeline orchestrator, `execute_full_project_workflow`, to replicate the findings of Ausloos et al. (2025).

#### **Step 1: Understanding the Input Parameters**

The main orchestrator function, `execute_full_project_workflow`, requires three primary inputs:

1.  **`study_configuration` (Type: `Dict[str, Any]`)**:
    *   **Description:** This is a Python dictionary containing all the methodological parameters of the study. It is the computational blueprint for the replication, defining everything from date ranges and statistical thresholds to the mathematical formulas being tested.
    *   **Source:** This dictionary is loaded from the `config.yaml` file we created. This practice decouples the configuration from the code, allowing for easy modification and auditing without touching the implementation.

2.  **`raw_financial_data` (Type: `pd.DataFrame`)**:
    *   **Description:** This is a pandas DataFrame containing the raw panel data for the companies under analysis. Its structure must be precise:
        *   **Index:** A `pandas.MultiIndex` with two levels: `'CompanyID'` (string) and `'Year'` (integer).
        *   **Columns:** Exactly three columns are required: `'CompanyName'` (string), `'PreTaxIncome_GBP'` (float64), and `'TotalAssets_GBP'` (float64). The `float64` dtype is non-negotiable for maintaining numerical precision.
    *   **Source:** In a real-world scenario, this DataFrame would be the result of a data acquisition script that queries a source like Refinitiv EIKON. For this example, we will create a synthetic, but structurally identical, DataFrame.

3.  **`output_directory` (Type: `str`)**:
    *   **Description:** A string representing the file path to a directory where all generated artifacts (reports, tables, logs) will be saved. The pipeline will create this directory if it does not exist.

#### **Step 2: Preparing the Environment and Inputs**

Before calling the main function, we must load the configuration from the YAML file and prepare the sample data.

**2.1. Loading the Configuration**

First, we need a function to load our `config.yaml` file. The `PyYAML` library is the standard for this.

```python
# Code Snippet: Loading the YAML configuration
import yaml
from typing import Dict, Any

def load_configuration(config_path: str) -> Dict[str, Any]:
    """Loads the study configuration from a YAML file."""
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        return config
    except FileNotFoundError:
        print(f"Error: Configuration file not found at '{config_path}'")
        raise
    except yaml.YAMLError as e:
        print(f"Error: Failed to parse YAML configuration file. {e}")
        raise
```

**2.2. Creating a Synthetic Dataset**

For this example to be self-contained, we will generate a small, synthetic DataFrame that perfectly matches the required input schema. This mock data will allow the pipeline to run and demonstrate its functionality.

```python
# Code Snippet: Creating a structurally-correct synthetic DataFrame
import pandas as pd
import numpy as np

def create_synthetic_data(num_companies: int = 50, years: range = range(2009, 2023)) -> pd.DataFrame:
    """Creates a synthetic DataFrame matching the required input schema."""
    # Create the MultiIndex
    company_ids = [f'COMP_{i:03d}' for i in range(num_companies)]
    index_tuples = [(cid, year) for cid in company_ids for year in years]
    multi_index = pd.MultiIndex.from_tuples(index_tuples, names=['CompanyID', 'Year'])
    
    # Create the data
    num_rows = len(multi_index)
    data = {
        'CompanyName': [f'Company {i:03d}' for i in range(num_companies) for _ in years],
        # Generate log-normally distributed data, which tends to follow Benford's Law
        'PreTaxIncome_GBP': np.random.lognormal(mean=15, sigma=2.5, size=num_rows) * np.random.choice([-1, 1], size=num_rows, p=[0.2, 0.8]),
        'TotalAssets_GBP': np.random.lognormal(mean=20, sigma=2, size=num_rows)
    }
    
    # Introduce some missing values, as expected in real data
    df = pd.DataFrame(data, index=multi_index)
    missing_indices_pi = df.sample(frac=0.05).index
    missing_indices_ta = df.sample(frac=0.04).index
    df.loc[missing_indices_pi, 'PreTaxIncome_GBP'] = np.nan
    df.loc[missing_indices_ta, 'TotalAssets_GBP'] = np.nan
    
    # Ensure dtypes are correct
    df['PreTaxIncome_GBP'] = df['PreTaxIncome_GBP'].astype(np.float64)
    df['TotalAssets_GBP'] = df['TotalAssets_GBP'].astype(np.float64)
    
    return df
```

#### **Step 3: Executing the Pipeline**

With the inputs prepared, we can now execute the main orchestrator function. The function will run the entire workflow and save the outputs to the specified directory.

```python
# Code Snippet: Executing the main pipeline orchestrator

# Assume all previously defined pipeline functions are in a module called 'benford_pipeline'
# from benford_pipeline import execute_full_project_workflow

# --- Main Execution Block ---
if __name__ == "__main__":
    
    # Define the path to the configuration file and the output directory.
    CONFIG_FILE_PATH = 'config.yaml'
    OUTPUT_DIRECTORY = 'research_outputs'

    # 1. Load the study configuration from the YAML file.
    print(f"Loading configuration from '{CONFIG_FILE_PATH}'...")
    study_config = load_configuration(config_path=CONFIG_FILE_PATH)

    # 2. Create the synthetic raw financial data.
    # In a real scenario, this would be replaced with loading your actual dataset.
    print("Creating synthetic financial data for demonstration...")
    # NOTE: We use a smaller synthetic dataset to ensure the example runs quickly.
    # The validation steps inside the pipeline that check for exact sample sizes
    # would need to be adjusted or disabled for this synthetic data. For this
    # example, we assume they are commented out for the demo run.
    raw_data = create_synthetic_data(num_companies=567, years=range(2009, 2023))
    print(f"Synthetic data created with {len(raw_data)} total observations.")

    # 3. Execute the full end-to-end project workflow.
    # This single function call runs all 18 tasks: validation, preprocessing,
    # analysis, robustness checks, and packaging.
    final_outputs = execute_full_project_workflow(
        raw_financial_data=raw_data,
        study_configuration=study_config,
        output_directory=OUTPUT_DIRECTORY,
        run_robustness_checks=False  # Set to False to speed up the example run
    )

    # 4. (Optional) Inspect the in-memory results.
    print("\n--- Example of Accessing In-Memory Outputs ---")
    # Access the replicated Table 3 from the results.
    if 'publication_tables' in final_outputs['primary_analysis']:
        table_3_body = final_outputs['primary_analysis']['publication_tables']['Table 3']['body']
        print("\nReplicated Table 3 (Body):")
        print(table_3_body.head())
```

#### **Step 4: Interpreting the Outputs**

Upon successful execution, the `output_directory` (in this case, a folder named `research_outputs`) will be populated with a series of files:

*   **`replication_validation_report.json`**: The final certification of the pipeline's accuracy against the paper's published results.
*   **`data_quality_report.json`**: A detailed report on the quality of the input data.
*   **`table_1_summary_statistics.csv`**: A CSV file containing the replicated descriptive statistics.
*   **`table_3_body.csv`, `table_3_stats.csv`, etc.**: A pair of CSV files for each of the analytical tables (3-8), separating the main data from the statistical test results for clarity.
*   **`raw_test_results.json`**: A comprehensive JSON file containing the detailed numerical outputs of every statistical test performed.
*   **`robustness_analysis_report.json`**: If `run_robustness_checks` was set to `True`, this file will contain the detailed results of the bootstrap, sensitivity, and temporal analyses.

This structured output provides a complete, auditable, and reproducible record of the entire research project.


In [None]:
# Task 1: Study Configuration Object Validation

# ==============================================================================
# Phase I, Task 1: Helper Function for Data Source Validation
# ==============================================================================
def _validate_data_source_config(config: Dict[str, Any]) -> None:
    """
    Validates the 'data_source' section of the study configuration object.

    This internal helper function performs rigorous checks on the data acquisition
    parameters defined in the configuration dictionary to ensure they align
    perfectly with the specifications from the source paper. It validates
    types, exact values, and presence of required keys.

    Args:
        config (Dict[str, Any]): The 'data_source' nested dictionary from the
                                 main study configuration object.

    Returns:
        None: This function returns nothing upon success.

    Raises:
        AssertionError: If any validation check fails, providing a specific
                        message about the discrepancy.
        KeyError: If the 'data_source' key is missing from the top-level config.
    """
    # Retrieve the 'data_source' sub-dictionary for validation.
    data_source_config = config['data_source']

    # Step 1.1: Verify company count. Must be an integer equal to 567.
    # Source: p. 7, "which comprises 567 companies"
    company_count = data_source_config.get('company_count')
    assert isinstance(company_count, int), \
        f"Config Error: 'company_count' must be an integer, not {type(company_count)}."
    assert company_count == 567, \
        f"Config Error: 'company_count' must be 567, not {company_count}."

    # Step 1.2: Confirm temporal start parameter. Must be an integer equal to 2009.
    # Source: Abstract, p. 2, "period from 2009 to 2022"
    start_year = data_source_config.get('time_period_start')
    assert isinstance(start_year, int), \
        f"Config Error: 'time_period_start' must be an integer, not {type(start_year)}."
    assert start_year == 2009, \
        f"Config Error: 'time_period_start' must be 2009, not {start_year}."

    # Step 1.3: Confirm temporal end parameter. Must be an integer equal to 2022.
    # Source: Abstract, p. 2, "period from 2009 to 2022"
    end_year = data_source_config.get('time_period_end')
    assert isinstance(end_year, int), \
        f"Config Error: 'time_period_end' must be an integer, not {type(end_year)}."
    assert end_year == 2022, \
        f"Config Error: 'time_period_end' must be 2022, not {end_year}."

    # Step 1.4: Validate currency field. Must be the string 'GBP' (case-sensitive).
    # Source: p. 7, "recorded in GBP currency"
    currency = data_source_config.get('currency')
    assert isinstance(currency, str), \
        f"Config Error: 'currency' must be a string, not {type(currency)}."
    assert currency == 'GBP', \
        f"Config Error: 'currency' must be 'GBP', not '{currency}'."

# ==============================================================================
# Phase I, Task 1: Helper Function for Benford's Law Validation
# ==============================================================================
def _validate_benford_laws_config(config: Dict[str, Any]) -> None:
    """
    Validates the 'benford_laws' section of the study configuration object.

    This internal helper function verifies the structural and numerical integrity
    of the Benford's Law definitions. It ensures the correct laws are present
    and that their parameters (degrees of freedom, bins) are specified exactly
    as required by statistical theory and the study's methodology.

    Args:
        config (Dict[str, Any]): The 'benford_laws' nested dictionary from the
                                 main study configuration object.

    Returns:
        None: This function returns nothing upon success.

    Raises:
        AssertionError: If any validation check fails.
        KeyError: If the 'benford_laws' key is missing.
    """
    # Retrieve the 'benford_laws' sub-dictionary for validation.
    benford_config = config['benford_laws']

    # Step 2.1: Verify that the configuration contains exactly three Benford's Laws.
    expected_laws = {'BL1', 'BL2', 'BL12'}
    actual_laws = set(benford_config.keys())
    assert actual_laws == expected_laws, \
        f"Config Error: 'benford_laws' must contain keys {expected_laws}, but found {actual_laws}."

    # Step 2.2: Define expected parameters for each law for systematic validation.
    expected_params = {
        'BL1': {'dof': 8, 'bins': list(range(1, 10))},
        'BL2': {'dof': 9, 'bins': list(range(0, 10))},
        'BL12': {'dof': 89, 'bins': list(range(10, 100))}
    }

    # Step 2.3: Iterate and validate each law's configuration.
    for law_key, params in expected_params.items():
        # Retrieve the specific law's configuration.
        law_config = benford_config.get(law_key, {})

        # Validate degrees of freedom (integer type and value).
        dof = law_config.get('degrees_of_freedom')
        assert isinstance(dof, int), \
            f"Config Error in {law_key}: 'degrees_of_freedom' must be an int, not {type(dof)}."
        assert dof == params['dof'], \
            f"Config Error in {law_key}: 'degrees_of_freedom' must be {params['dof']}, not {dof}."

        # Validate bin ranges (list type and exact sequence).
        bins = law_config.get('bins')
        assert isinstance(bins, list), \
            f"Config Error in {law_key}: 'bins' must be a list, not {type(bins)}."
        assert bins == params['bins'], \
            f"Config Error in {law_key}: 'bins' must be {params['bins']}, not {bins}."

# ==============================================================================
# Phase I, Task 1: Helper Function for Statistical Test Validation
# ==============================================================================
def _validate_statistical_tests_config(config: Dict[str, Any]) -> None:
    """
    Validates the 'statistical_tests' section of the study configuration object.

    This internal helper function checks the parameters for the statistical
    conformity tests (Chi-Squared and MAD). It pays special attention to the
    precision of floating-point values (alpha, critical values, thresholds)
    using tolerance-based comparisons.

    Args:
        config (Dict[str, Any]): The 'statistical_tests' nested dictionary from
                                 the main study configuration object.

    Returns:
        None: This function returns nothing upon success.

    Raises:
        AssertionError: If any validation check fails.
        KeyError: If the 'statistical_tests' key is missing.
    """
    # Retrieve the 'statistical_tests' sub-dictionary for validation.
    stats_config = config['statistical_tests']

    # --- Chi-Squared Test Validation ---
    chi_squared_config = stats_config.get('chi_squared', {})

    # Step 3.1: Verify alpha significance level. Must be a float equal to 0.05.
    # Source: Table 3 footnote, p. 14, "at 5%"
    alpha = chi_squared_config.get('alpha')
    assert isinstance(alpha, float), \
        f"Config Error: 'alpha' must be a float, not {type(alpha)}."
    assert math.isclose(alpha, 0.05, rel_tol=1e-9), \
        f"Config Error: 'alpha' must be 0.05, not {alpha}."

    # Step 3.2: Confirm critical values with high precision.
    # Source: Table footnotes on pages 14, 15, 16.
    critical_values = chi_squared_config.get('critical_values', {})
    expected_critical_values = {8: 15.507, 9: 16.919, 89: 113.145}

    # Ensure the set of degrees of freedom keys match.
    assert set(critical_values.keys()) == set(expected_critical_values.keys()), \
        "Config Error: Mismatch in degrees of freedom for critical values."

    # Check each critical value using a precision-aware comparison.
    for dof, expected_val in expected_critical_values.items():
        actual_val = critical_values.get(dof)
        assert actual_val is not None, f"Config Error: Critical value for DoF={dof} is missing."
        assert math.isclose(actual_val, expected_val, rel_tol=1e-9), \
            f"Config Error: Critical value for DoF={dof} must be {expected_val}, not {actual_val}."

    # --- MAD Test Validation ---
    mad_config = stats_config.get('mad', {})
    thresholds = mad_config.get('conformity_thresholds', {})

    # Step 3.3: Validate MAD conformity thresholds with high precision.
    # Source: p. 13 and table footnotes.
    expected_thresholds = {
        'BL1': {'close_conformity': 0.006},
        'BL2': {'close_conformity': 0.008},
        'BL12': {'close_conformity': 0.0012}
    }

    # Ensure the set of law keys match.
    assert set(thresholds.keys()) == set(expected_thresholds.keys()), \
        "Config Error: Mismatch in Benford's Law keys for MAD thresholds."

    # Check each threshold value using a precision-aware comparison.
    for law_key, expected_vals in expected_thresholds.items():
        actual_vals = thresholds.get(law_key, {})
        expected_thresh = expected_vals['close_conformity']
        actual_thresh = actual_vals.get('close_conformity')
        assert actual_thresh is not None, f"Config Error: MAD threshold for {law_key} is missing."
        assert math.isclose(actual_thresh, expected_thresh, rel_tol=1e-9), \
            f"Config Error: MAD threshold for {law_key} must be {expected_thresh}, not {actual_thresh}."

# ==============================================================================
# Phase I, Task 1: Main Orchestrator Function
# ==============================================================================
def validate_study_configuration(
    study_config: Dict[str, Any]
) -> bool:
    """
    Orchestrates the validation of the entire study configuration object.

    This function serves as the main entry point for validating the methodological
    parameters of the replication study. It sequentially invokes specialized
    helper functions to validate each major section of the configuration:
    data source, Benford's Law definitions, and statistical test parameters.
    The function ensures that the provided configuration is a high-fidelity
    representation of the methodology described in the source paper.

    Args:
        study_config (Dict[str, Any]): The complete study configuration
            dictionary, containing all non-data parameters for the analysis.

    Returns:
        bool: Returns True if the entire configuration object passes all
              validation checks successfully.

    Raises:
        ValueError: If the top-level configuration object is not a dictionary
                    or is missing essential top-level keys.
        AssertionError: Propagates specific assertion errors from helper
                        functions if any part of the configuration is invalid.
    """
    # Top-level input validation: ensure the config is a dictionary.
    if not isinstance(study_config, dict):
        raise ValueError("The study configuration object must be a dictionary.")

    # Check for the presence of essential top-level keys.
    required_keys = ['data_source', 'benford_laws', 'statistical_tests']
    for key in required_keys:
        if key not in study_config:
            raise ValueError(f"Missing required top-level key in configuration: '{key}'.")

    try:
        # Execute Step 1: Validate the 'data_source' parameters.
        _validate_data_source_config(config=study_config)

        # Execute Step 2: Validate the 'benford_laws' definitions.
        _validate_benford_laws_config(config=study_config)

        # Execute Step 3: Validate the 'statistical_tests' parameters.
        _validate_statistical_tests_config(config=study_config)

    except (AssertionError, KeyError) as e:
        # Catch specific validation errors and re-raise for clarity.
        print(f"Configuration validation failed: {e}")
        raise

    # If all checks pass without raising an exception, return True.
    return True


In [None]:
# Task 2: DataFrame Structure Validation

# ==============================================================================
# Phase I, Task 2: Helper for MultiIndex Structure Validation
# ==============================================================================
def _validate_multiindex_structure(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> None:
    """
    Validates the MultiIndex structure of the input DataFrame.

    This internal helper function performs a series of rigorous checks to ensure
    the DataFrame's index is a correctly structured pandas MultiIndex, which is
    a prerequisite for panel data analysis. It verifies the index type, number
    of levels, level names, and the data type of each level's values.

    Args:
        df (pd.DataFrame): The DataFrame to be validated.
        config (Dict[str, Any]): The study configuration object, used to
                                 retrieve the expected time period.

    Returns:
        None: Returns nothing upon successful validation.

    Raises:
        AssertionError: If any structural aspect of the MultiIndex is incorrect.
    """
    # Step 1.1: Confirm the index is a pandas MultiIndex.
    assert isinstance(df.index, pd.MultiIndex), \
        f"DataFrame index must be a pandas.MultiIndex, but found {type(df.index)}."

    # Step 1.2: Confirm the MultiIndex has exactly two levels.
    assert df.index.nlevels == 2, \
        f"DataFrame MultiIndex must have 2 levels, but found {df.index.nlevels}."

    # Step 1.3: Verify the names of the index levels are correct and in order.
    expected_names = ['CompanyID', 'Year']
    assert list(df.index.names) == expected_names, \
        f"DataFrame MultiIndex names must be {expected_names}, but found {list(df.index.names)}."

    # Step 1.4: Verify the data type of the 'CompanyID' level (level 0).
    company_id_level = df.index.get_level_values('CompanyID')
    assert is_object_dtype(company_id_level), \
        f"'CompanyID' index level must have object (string) dtype, but found {company_id_level.dtype}."

    # Step 1.5: Verify the data type of the 'Year' level (level 1).
    year_level = df.index.get_level_values('Year')
    assert is_integer_dtype(year_level), \
        f"'Year' index level must have integer dtype, but found {year_level.dtype}."

    # Step 1.6: Validate that the years in the index fall within the study's specified range.
    start_year = config['data_source']['time_period_start']
    end_year = config['data_source']['time_period_end']
    assert year_level.min() >= start_year and year_level.max() <= end_year, \
        f"Years in index must be within [{start_year}, {end_year}], but range is [{year_level.min()}, {year_level.max()}]."

# ==============================================================================
# Phase I, Task 2: Helper for Required Columns Validation
# ==============================================================================
def _validate_required_columns(df: pd.DataFrame) -> None:
    """
    Validates the presence and data types of required columns in the DataFrame.

    This function checks that the DataFrame contains the exact set of columns
    required for the analysis and that each of these columns has the correct,
    non-negotiable data type (e.g., float64 for high-precision financial data).

    Args:
        df (pd.DataFrame): The DataFrame to be validated.

    Returns:
        None: Returns nothing upon successful validation.

    Raises:
        AssertionError: If columns are missing, extra columns are present, or
                        any column has an incorrect data type.
    """
    # Step 2.1: Verify that the DataFrame contains exactly the three required columns.
    expected_columns = {'CompanyName', 'PreTaxIncome_GBP', 'TotalAssets_GBP'}
    actual_columns = set(df.columns)
    assert actual_columns == expected_columns, \
        f"DataFrame must contain exactly the columns {expected_columns}, but found {actual_columns}."

    # Step 2.2: Verify the data type of the 'CompanyName' column.
    assert is_object_dtype(df['CompanyName']), \
        f"'CompanyName' column must have object (string) dtype, but found {df.dtypes['CompanyName']}."

    # Step 2.3: Verify the data type of the 'PreTaxIncome_GBP' column is float64 for precision.
    assert df.dtypes['PreTaxIncome_GBP'] == np.float64, \
        f"'PreTaxIncome_GBP' column must have float64 dtype, but found {df.dtypes['PreTaxIncome_GBP']}."

    # Step 2.4: Verify the data type of the 'TotalAssets_GBP' column is float64 for precision.
    assert df.dtypes['TotalAssets_GBP'] == np.float64, \
        f"'TotalAssets_GBP' column must have float64 dtype, but found {df.dtypes['TotalAssets_GBP']}."

# ==============================================================================
# Phase I, Task 2: Helper for Data Completeness Assessment
# ==============================================================================
def _validate_data_completeness(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> None:
    """
    Performs a high-level assessment of the DataFrame's data completeness.

    This function checks if the number of observations and unique entities in
    the DataFrame are reasonably close to the figures reported in the source
    study. This serves as a sanity check to detect major data loading errors.

    Args:
        df (pd.DataFrame): The DataFrame to be assessed.
        config (Dict[str, Any]): The study configuration object.

    Returns:
        None: Returns nothing upon successful validation.

    Raises:
        AssertionError: If the data counts deviate significantly from the
                        study's reported figures.
    """
    # Define tolerance for count comparisons.
    tolerance = 0.05  # 5% tolerance

    # Step 3.1: Assess non-missing count for Pre-Tax Income.
    # Source: p. 8, "6768 data points" for PI.
    pi_count = df['PreTaxIncome_GBP'].count()
    expected_pi_count = 6768
    assert abs(pi_count - expected_pi_count) / expected_pi_count <= tolerance, \
        f"Expected ~{expected_pi_count} non-null PI observations, but found {pi_count}."

    # Step 3.2: Assess non-missing count for Total Assets.
    # Source: p. 8, "6811 data points" for TA.
    ta_count = df['TotalAssets_GBP'].count()
    expected_ta_count = 6811
    assert abs(ta_count - expected_ta_count) / expected_ta_count <= tolerance, \
        f"Expected ~{expected_ta_count} non-null TA observations, but found {ta_count}."

    # Step 3.3: Assess unique company count.
    # Source: p. 7, "567 companies".
    unique_companies = df.index.get_level_values('CompanyID').nunique()
    expected_companies = config['data_source']['company_count']
    assert abs(unique_companies - expected_companies) / expected_companies <= tolerance, \
        f"Expected ~{expected_companies} unique companies, but found {unique_companies}."

# ==============================================================================
# Phase I, Task 2: Main Orchestrator Function
# ==============================================================================
def validate_dataframe_structure(
    df: pd.DataFrame,
    study_config: Dict[str, Any]
) -> bool:
    """
    Orchestrates the validation of the input DataFrame's structure and content.

    This function serves as the main entry point for validating the primary
    dataset. It ensures the DataFrame adheres to the strict structural and
    content requirements of the study by invoking a series of specialized
    helper functions for:
    1. MultiIndex structure verification.
    2. Required columns and dtypes validation.
    3. Data completeness assessment against study benchmarks.

    Args:
        df (pd.DataFrame): The panel data DataFrame to be validated.
        study_config (Dict[str, Any]): The complete study configuration
            object, providing expected values for validation checks.

    Returns:
        bool: Returns True if the DataFrame passes all validation checks.

    Raises:
        ValueError: If the input `df` is not a pandas DataFrame.
        AssertionError: Propagates specific assertion errors from helpers
                        if any part of the DataFrame's structure is invalid.
    """
    # Top-level input validation.
    if not isinstance(df, pd.DataFrame):
        raise ValueError("Input 'df' must be a pandas DataFrame.")

    try:
        # Execute Step 1: Validate the MultiIndex structure.
        _validate_multiindex_structure(df=df, config=study_config)

        # Execute Step 2: Validate the required columns and their dtypes.
        _validate_required_columns(df=df)

        # Execute Step 3: Perform a data completeness assessment.
        _validate_data_completeness(df=df, config=study_config)

    except AssertionError as e:
        # Catch specific validation errors and re-raise for clear reporting.
        print(f"DataFrame structure validation failed: {e}")
        raise

    # If all checks pass, the DataFrame is structurally valid.
    return True


In [None]:
# Task 3: Financial Data Quality Validation

# ==============================================================================
# Phase I, Task 3: Helper for Value Range Validation
# ==============================================================================
def _validate_value_ranges(df: pd.DataFrame) -> Dict[str, int]:
    """
    Validates that financial data falls within plausible ranges from the study.

    This function checks the 'PreTaxIncome_GBP' and 'TotalAssets_GBP' columns
    against reasonable bounds derived from the descriptive statistics in the
    source paper (Table 1, p. 8). It flags and counts any values outside these
    ranges but does not modify the original data, adhering to the study's
    preservationist methodology.

    Args:
        df (pd.DataFrame): The DataFrame with financial data.

    Returns:
        Dict[str, int]: A dictionary containing the counts of values found
                        outside the defined plausible ranges for each variable.
    """
    # Define plausible value ranges based on study's min/max statistics.
    # Source: Table 1, p. 8. Using slightly wider bounds for robustness.
    ranges = {
        'PreTaxIncome_GBP': (-2.0e10, 6.0e10),
        'TotalAssets_GBP': (1.0e4, 3.0e12)
    }

    # Initialize a dictionary to store the counts of out-of-range values.
    out_of_range_counts = {}

    # Step 1.1: Iterate through the variables and their defined ranges.
    for col, (lower_bound, upper_bound) in ranges.items():
        # Create a boolean mask for values that are outside the bounds.
        # This correctly handles NaNs, which will result in False.
        mask = (df[col] < lower_bound) | (df[col] > upper_bound)

        # Count the number of True values in the mask, which corresponds to
        # the number of out-of-range data points.
        out_of_range_counts[col] = mask.sum()

    # Return the dictionary of counts.
    return out_of_range_counts

# ==============================================================================
# Phase I, Task 3: Helper for Missing Value Pattern Analysis
# ==============================================================================
def _analyze_missing_value_patterns(df: pd.DataFrame) -> Tuple[float, List[str]]:
    """
    Analyzes patterns of missing values in the financial data.

    This function performs two key analyses:
    1. Calculates the correlation of missingness between Pre-Tax Income and
       Total Assets to identify systematic data gaps.
    2. Identifies companies with an excessive percentage (>50%) of missing
       data points over the study period.

    Args:
        df (pd.DataFrame): The DataFrame with financial data.

    Returns:
        Tuple[float, List[str]]: A tuple containing:
            - The correlation coefficient of missingness between PI and TA.
            - A list of CompanyIDs with more than 50% missing data points.
    """
    # Step 2.1: Calculate the correlation of missingness.
    # Create a boolean DataFrame where True indicates a missing value.
    missing_df = df[['PreTaxIncome_GBP', 'TotalAssets_GBP']].isna()
    # Calculate the correlation matrix on this boolean DataFrame.
    missing_corr_matrix = missing_df.corr()
    # Extract the specific correlation between the two variables.
    missing_corr = missing_corr_matrix.loc['PreTaxIncome_GBP', 'TotalAssets_GBP']

    # Step 2.2: Identify companies with excessive missing data.
    # Group by company and calculate the mean of the boolean missingness indicator.
    # This gives the proportion of missing entries for each company.
    missing_rates = df.groupby(level='CompanyID')[['PreTaxIncome_GBP', 'TotalAssets_GBP']].apply(
        lambda x: x.isna().all(axis=1).mean()
    )
    # Filter for companies where this rate exceeds the 50% threshold.
    high_missing_companies = missing_rates[missing_rates > 0.5].index.tolist()

    # Return the calculated metrics.
    return missing_corr, high_missing_companies

# ==============================================================================
# Phase I, Task 3: Helper for Extreme Value Detection
# ==============================================================================
def _detect_extreme_values(df: pd.DataFrame) -> Dict[str, int]:
    """
    Detects extreme values using a robust Z-score methodology.

    This function identifies outliers based on the median and Median Absolute
    Deviation (MAD), which are robust to the influence of outliers themselves.
    It flags values with a robust Z-score > 5, aligning with the paper's
    mention of a few "anomalously extreme outliers" (p. 8).

    Args:
        df (pd.DataFrame): The DataFrame with financial data.

    Returns:
        Dict[str, int]: A dictionary containing the counts of extreme values
                        detected for each financial variable.
    """
    # Initialize a dictionary to store the counts of extreme values.
    extreme_value_counts = {}

    # Define the financial columns to check.
    financial_cols = ['PreTaxIncome_GBP', 'TotalAssets_GBP']

    # Step 3.1: Iterate through the financial columns to detect outliers.
    for col in financial_cols:
        # Extract the series and drop NaNs for statistical calculation.
        series = df[col].dropna()

        # Calculate the median of the series.
        series_median = series.median()

        # Calculate the Median Absolute Deviation (MAD).
        # `nan_policy='omit'` is the default and correct behavior.
        # The scale='normal' argument applies the 1/Φ⁻¹(3/4) ≈ 1.4826 factor.
        series_mad = median_abs_deviation(series, scale='normal')

        # Avoid division by zero if MAD is 0 (all values are the same).
        if series_mad == 0:
            extreme_value_counts[col] = 0
            continue

        # Step 3.2: Calculate the robust Z-score for the original series.
        # Robust Z-score = (value - median) / MAD
        robust_z_scores = (df[col] - series_median) / series_mad

        # Step 3.3: Count values where the absolute robust Z-score exceeds 5.
        extreme_mask = robust_z_scores.abs() > 5.0
        extreme_value_counts[col] = extreme_mask.sum()

    # Return the dictionary of extreme value counts.
    return extreme_value_counts

# ==============================================================================
# Phase I, Task 3: Main Orchestrator Function
# ==============================================================================
def validate_financial_data_quality(
    df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Orchestrates a comprehensive validation of financial data quality.

    This function serves as the main entry point for assessing the quality of
    the numerical data within the DataFrame. It does not alter the data, but
    instead generates a detailed report by invoking helper functions that:
    1. Check for values outside plausible ranges.
    2. Analyze patterns of missing data.
    3. Detect statistically extreme outliers using a robust methodology.

    The output is a structured dictionary that serves as a data quality audit log.

    Args:
        df (pd.DataFrame): The panel data DataFrame with financial columns to
                           be validated.

    Returns:
        Dict[str, Any]: A nested dictionary containing the results of all
                        data quality checks.
    """
    # Top-level input validation.
    if not isinstance(df, pd.DataFrame):
        raise ValueError("Input 'df' must be a pandas DataFrame.")

    # Initialize the results dictionary.
    quality_report = {}

    try:
        # Execute Step 1: Validate value ranges.
        quality_report['range_validation'] = _validate_value_ranges(df=df)

        # Execute Step 2: Analyze missing value patterns.
        missing_corr, high_missing_cos = _analyze_missing_value_patterns(df=df)
        quality_report['missing_value_analysis'] = {
            'missingness_correlation': missing_corr,
            'companies_with_high_missing_data': high_missing_cos
        }

        # Execute Step 3: Detect extreme values.
        quality_report['extreme_value_detection'] = _detect_extreme_values(df=df)

    except Exception as e:
        # Catch any unexpected errors during the quality check process.
        print(f"An error occurred during data quality validation: {e}")
        raise

    # Return the comprehensive quality report.
    return quality_report


In [None]:
# Task 4: Data Cleansing and Standardization

# ==============================================================================
# Phase II, Task 4: Helper for Missing Value Preservation
# ==============================================================================
def _preserve_missing_values(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Ensures missing values are preserved and explicitly documented.

    This function creates a safe copy of the input DataFrame to prevent
    unintended modifications. It then generates a boolean mask DataFrame that
    explicitly documents the location of every np.nan value. This adheres to

    the study's principle of not imputing or altering missing data.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing:
            - A safe copy of the original DataFrame.
            - A boolean DataFrame of the same shape indicating the locations
              of missing values.
    """
    # Step 1.1: Create a deep copy to ensure the original DataFrame is never modified.
    df_copy = df.copy()

    # Step 1.2: Document the locations of all missing values.
    # The .isna() method returns a boolean DataFrame of the same dimensions.
    missing_value_mask = df_copy.isna()

    # Step 1.3: Perform a verification check to ensure no NaNs were lost or added.
    assert df.isna().sum().equals(missing_value_mask.sum()), \
        "Mismatch in NaN counts after creating preservation mask. This indicates a copy error."

    # Return the safe copy and the documentation mask.
    return df_copy, missing_value_mask

# ==============================================================================
# Phase II, Task 4: Helper for Precision Standardization
# ==============================================================================
def _standardize_precision(df: pd.DataFrame) -> pd.DataFrame:
    """
    Verifies and enforces high-precision float64 dtype for financial columns.

    This function acts as a quality gate, ensuring that the key financial
    variables ('PreTaxIncome_GBP', 'TotalAssets_GBP') are represented using
    the float64 data type. This is non-negotiable for maintaining the
    numerical precision required for subsequent statistical and logarithmic
    calculations.

    Args:
        df (pd.DataFrame): The DataFrame to validate.

    Returns:
        pd.DataFrame: The validated DataFrame, passed through without
                      modification if checks are successful.

    Raises:
        TypeError: If any financial column does not have the float64 dtype.
    """
    # Step 2.1: Define the columns that require high-precision floating-point representation.
    financial_cols = ['PreTaxIncome_GBP', 'TotalAssets_GBP']

    # Step 2.2: Iterate through the columns and validate their dtype.
    for col in financial_cols:
        # This check is strict. It will not accept float32 or other numeric types.
        assert df[col].dtype == np.float64, \
            f"Column '{col}' must have dtype float64 for precision, but found {df[col].dtype}."

    # Step 2.3: As per the study, data is already in GBP. No currency conversion is needed.
    # In a production system, a currency check/conversion step would be here.

    # Return the DataFrame if all checks pass.
    return df

# ==============================================================================
# Phase II, Task 4: Helper for Flagging Extreme Values
# ==============================================================================
def _flag_extreme_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Adds boolean flags to the DataFrame to identify extreme values.

    This function operationalizes the extreme value detection from Task 3. It
    calculates the robust Z-score for each financial data point and adds new
    boolean columns to the DataFrame (e.g., 'IsExtreme_PI') that are True
    for any observation with an absolute robust Z-score greater than 5.0.
    This preserves the original data while creating a persistent audit trail.

    Args:
        df (pd.DataFrame): The DataFrame to which flags will be added.

    Returns:
        pd.DataFrame: The DataFrame augmented with new boolean flag columns.
    """
    # Step 3.1: Define the financial columns and their corresponding flag names.
    cols_to_flag = {
        'PreTaxIncome_GBP': 'IsExtreme_PI',
        'TotalAssets_GBP': 'IsExtreme_TA'
    }

    # Step 3.2: Iterate through the columns to calculate Z-scores and add flags.
    for col, flag_col in cols_to_flag.items():
        # Extract the series and drop NaNs for robust statistical calculation.
        series = df[col].dropna()

        # Calculate the median and Median Absolute Deviation (MAD).
        series_median = series.median()
        # The 'scale' argument applies the consistency constant (1.4826).
        series_mad = median_abs_deviation(series, scale='normal')

        # If MAD is zero, no value can be considered an outlier.
        if series_mad > 0:
            # Calculate the robust Z-score for every data point in the column.
            # Robust Z-score = (value - median) / MAD
            robust_z_scores = (df[col] - series_median) / series_mad
            # Create the boolean flag where the absolute Z-score exceeds 5.
            df[flag_col] = robust_z_scores.abs() > 5.0
        else:
            # If MAD is 0, all non-median values would have infinite Z-score.
            # The correct behavior is to flag nothing as extreme.
            df[flag_col] = False

    # Return the DataFrame with the new flag columns.
    return df

# ==============================================================================
# Phase II, Task 4: Main Orchestrator Function
# ==============================================================================
def prepare_and_standardize_data(
    raw_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrates the data preparation and standardization pipeline.

    This function executes the study's conservative data preparation methodology.
    It does not clean, impute, or alter the original financial data. Instead,
    it prepares the dataset for analysis by:
    1. Creating a safe copy and documenting missing values (preservation).
    2. Verifying high-precision dtypes for numerical accuracy (standardization).
    3. Flagging extreme statistical outliers as metadata (auditing).

    The output is a DataFrame that is structurally identical to the input but
    is verified, standardized, and augmented with auditable quality flags.

    Args:
        raw_df (pd.DataFrame): The raw input DataFrame after initial loading.

    Returns:
        pd.DataFrame: The prepared and standardized DataFrame, ready for
                      variable segmentation and analysis.
    """
    # Top-level input validation.
    if not isinstance(raw_df, pd.DataFrame):
        raise ValueError("Input 'raw_df' must be a pandas DataFrame.")

    try:
        # Step 1: Preserve missing values by creating a safe copy and a mask.
        # We will proceed with the copied DataFrame. The mask is for documentation.
        df, _ = _preserve_missing_values(df=raw_df)

        # Step 2: Standardize (verify) numerical precision.
        df = _standardize_precision(df=df)

        # Step 3: Add extreme value flags as metadata to the DataFrame.
        df = _flag_extreme_values(df=df)

    except Exception as e:
        # Catch any errors during the preparation process.
        print(f"An error occurred during data preparation and standardization: {e}")
        raise

    # Return the fully prepared and flagged DataFrame.
    return df


In [None]:
# Task 5: Variable Segmentation and Subset Creation

# ==============================================================================
# Phase II, Task 5: Helper for Pre-Tax Income Segmentation
# ==============================================================================
def _segment_pre_tax_income(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Segments the DataFrame into positive and negative Pre-Tax Income subsets.

    This function applies boolean masking to partition the input DataFrame into
    two distinct subsets based on the sign of the 'PreTaxIncome_GBP' column.
    It strictly uses '>' and '<' operators to correctly exclude zero-income
    observations from both subsets, as required for Benford's Law analysis.

    Args:
        df (pd.DataFrame): The input DataFrame, which should already include
                           the PI/TA ratio if needed in the subsets.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing:
            - A DataFrame with only positive Pre-Tax Income observations (PI+).
            - A DataFrame with only negative Pre-Tax Income observations (PI-).
    """
    # Step 1.1: Create the PI(+) subset by filtering for PreTaxIncome_GBP > 0.
    # This vectorized boolean indexing is highly efficient.
    # NaNs in the column will evaluate to False and be correctly excluded.
    pi_positive_df = df[df['PreTaxIncome_GBP'] > 0].copy()

    # Step 1.2: Create the PI(-) subset by filtering for PreTaxIncome_GBP < 0.
    pi_negative_df = df[df['PreTaxIncome_GBP'] < 0].copy()

    # Step 1.3: Verification of subset sizes against study expectations.
    # Source: p. 8, Table 1, N values. PI(+) ~5608, PI(-) ~1160.
    # Using a tolerance to account for minor data differences.
    assert 5500 < len(pi_positive_df) < 5700, \
        f"PI(+) subset has {len(pi_positive_df)} rows, expected ~5608."
    assert 1100 < len(pi_negative_df) < 1200, \
        f"PI(-) subset has {len(pi_negative_df)} rows, expected ~1160."

    # Return the two generated subsets.
    return pi_positive_df, pi_negative_df

# ==============================================================================
# Phase II, Task 5: Helper for Ratio Variable Construction
# ==============================================================================
def _construct_ratio_variable(df: pd.DataFrame) -> pd.DataFrame:
    """
    Constructs the Pre-Tax Income to Total Assets (PI/TA) ratio variable.

    This function calculates the PI/TA ratio in a robust manner. It uses
    vectorized operations for efficiency and includes explicit handling of
    division-by-zero edge cases by replacing resulting infinite values with NaN.
    NaN propagation is handled automatically by pandas.

    Args:
        df (pd.DataFrame): The DataFrame containing 'PreTaxIncome_GBP' and
                           'TotalAssets_GBP' columns.

    Returns:
        pd.DataFrame: The DataFrame with the new 'PI_TA_Ratio' column added.
    """
    # Step 2.1: Calculate the PI/TA ratio using vectorized division.
    # This operation correctly propagates NaNs.
    df['PI_TA_Ratio'] = df['PreTaxIncome_GBP'] / df['TotalAssets_GBP']

    # Step 2.2: Handle the edge case of division by zero.
    # Pandas/Numpy division by zero results in np.inf or -np.inf.
    # These must be replaced with np.nan to be excluded from digit analysis.
    df.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Step 2.3: Verification of the number of valid ratios.
    # Source: p. 8, Table 1, N value for PI/TA is 6765.
    valid_ratios = df['PI_TA_Ratio'].count()
    assert 6700 < valid_ratios < 6800, \
        f"Expected ~6765 valid PI/TA ratios, but calculated {valid_ratios}."

    # Return the DataFrame with the new column.
    return df

# ==============================================================================
# Phase II, Task 5: Main Orchestrator Function
# ==============================================================================
def create_analytical_variables(
    prepared_df: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the creation of all datasets required for the analysis.

    This function takes the prepared and standardized DataFrame and generates
    all necessary analytical variables and subsets. The process is:
    1. Construct the PI/TA ratio variable on the main DataFrame.
    2. Segment the augmented DataFrame into PI(+) and PI(-) subsets.
    3. Organize all resulting DataFrames (full, PI+, PI-) into a
       dictionary for clear, structured access in subsequent pipeline stages.

    Args:
        prepared_df (pd.DataFrame): The fully prepared, standardized, and
                                    flagged DataFrame from Task 4.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary where keys are descriptive names
            ('full_data', 'pi_positive', 'pi_negative') and values are the
            corresponding pandas DataFrames, ready for analysis.
    """
    # Top-level input validation.
    if not isinstance(prepared_df, pd.DataFrame):
        raise ValueError("Input 'prepared_df' must be a pandas DataFrame.")

    try:
        # Step 1: Construct the PI/TA ratio variable.
        # This modifies the DataFrame by adding a new column.
        df_with_ratio = _construct_ratio_variable(df=prepared_df)

        # Step 2: Segment the augmented DataFrame into PI(+) and PI(-) subsets.
        # These subsets will now correctly contain the 'PI_TA_Ratio' column.
        pi_positive_df, pi_negative_df = _segment_pre_tax_income(df=df_with_ratio)

        # Step 3: Organize the final DataFrames into a structured dictionary.
        # This provides a clean, modular output for the rest of the pipeline.
        analytical_datasets = {
            'full_data': df_with_ratio,
            'pi_positive': pi_positive_df,
            'pi_negative': pi_negative_df
        }

    except Exception as e:
        # Catch any errors during the variable creation process.
        print(f"An error occurred during analytical variable creation: {e}")
        raise

    # Return the dictionary of analytical datasets.
    return analytical_datasets


In [None]:
# Task 6: Data Quality Metrics and Summary Statistics

# ==============================================================================
# Phase II, Task 6: Helper for Descriptive Statistics Calculation
# ==============================================================================
def _calculate_descriptive_statistics(series: pd.Series) -> Dict[str, float]:
    """
    Calculates a comprehensive set of descriptive statistics for a data series.

    This function computes all statistical metrics required to replicate Table 1
    of the source paper. It uses pandas for basic, optimized calculations and
    scipy.stats for higher-order moments to ensure control over bias correction,
    aiming for the highest possible fidelity to standard statistical software.

    Args:
        series (pd.Series): The data series for which to calculate statistics.

    Returns:
        Dict[str, float]: A dictionary containing the calculated statistical
                          metrics, including N, min, max, mean, std, skewness,
                          kurtosis, and the coefficient of variation (CV).
    """
    # Drop NA values for accurate statistical computation.
    series_clean = series.dropna()

    # Handle the case of an empty series after dropping NaNs.
    if series_clean.empty:
        return {
            'N': 0, 'min': np.nan, 'max': np.nan, 'Mean': np.nan,
            'St. Dev.': np.nan, 'Skew.': np.nan, 'Kurt.': np.nan, 'CV': np.nan
        }

    # Step 1.1: Calculate basic statistics using pandas' optimized methods.
    stats = {
        'N': series_clean.count(),
        'min': series_clean.min(),
        'max': series_clean.max(),
        'Mean': series_clean.mean(),
        'St. Dev.': series_clean.std()
    }

    # Step 1.2: Calculate higher-order moments using scipy for precision.
    # `bias=False` provides the sample skewness and kurtosis.
    stats['Skew.'] = skew(series_clean, bias=False)
    # Fisher's definition of kurtosis (normal = 0) is used by default.
    stats['Kurt.'] = kurtosis(series_clean, fisher=True, bias=False)

    # Step 1.3: Calculate the Coefficient of Variation (CV).
    # CV = Standard Deviation / |Mean|
    # Using abs(mean) makes the CV a meaningful measure of relative variability
    # even for data with a negative mean (like PI-).
    mean_val = stats['Mean']
    std_val = stats['St. Dev.']
    if mean_val != 0:
        stats['CV'] = std_val / abs(mean_val)
    else:
        stats['CV'] = np.inf # Or np.nan, depending on convention for zero mean.

    return stats

# ==============================================================================
# Phase II, Task 6: Helper for Formatting the Summary Table
# ==============================================================================
def _format_summary_table(stats_dict: Dict[str, Dict[str, float]]) -> pd.DataFrame:
    """
    Formats the calculated statistics into a presentation-ready DataFrame.

    This function takes the raw dictionary of statistics and constructs a
    pandas DataFrame that meticulously replicates the structure, ordering, and
    numerical formatting of Table 1 from the source paper.

    Args:
        stats_dict (Dict[str, Dict[str, float]]): A nested dictionary where
            outer keys are variable names and inner keys are statistic names.

    Returns:
        pd.DataFrame: A formatted DataFrame ready for display or publication.
    """
    # Step 2.1: Convert the dictionary to a DataFrame.
    summary_df = pd.DataFrame.from_dict(stats_dict, orient='index')

    # Step 2.2: Define the exact column order as in Table 1.
    column_order = ['N', 'min', 'max', 'Mean', 'St. Dev.', 'Skew.', 'Kurt.', 'CV']
    summary_df = summary_df[column_order]

    # Step 2.3: Apply specific rounding rules to match the paper's presentation.
    # This is a critical step for high-fidelity replication.
    rounding_rules = {
        'N': 0,
        'min': 5,
        'max': 5,
        'Mean': 5,
        'St. Dev.': 5,
        'Skew.': 5,
        'Kurt.': 5,
        'CV': 5
    }

    # Apply special formatting for large integer values as seen in the paper.
    for col in ['Mean', 'St. Dev.']:
        # For PI and TA, round to integer.
        summary_df.loc[['PI', 'TA'], col] = summary_df.loc[['PI', 'TA'], col].round(0).astype(int)

    # Apply general rounding rules.
    summary_df = summary_df.round(rounding_rules)

    return summary_df

# ==============================================================================
# Phase II, Task 6: Main Orchestrator Function
# ==============================================================================
def generate_summary_statistics(
    analytical_datasets: Dict[str, pd.DataFrame]
) -> pd.DataFrame:
    """
    Orchestrates the calculation and formatting of summary statistics.

    This function serves as the main entry point for replicating Table 1 from
    the study. It processes the dictionary of analytical datasets, calculates
    descriptive statistics for each required variable and subset, and then
    formats these results into a single, publication-quality DataFrame that
    mirrors the source table.

    Args:
        analytical_datasets (Dict[str, pd.DataFrame]): A dictionary containing
            the 'full_data', 'pi_positive', and 'pi_negative' DataFrames.

    Returns:
        pd.DataFrame: A formatted DataFrame containing the descriptive
                      statistics, replicating Table 1 of the paper.
    """
    # Top-level input validation.
    if not isinstance(analytical_datasets, dict):
        raise ValueError("Input 'analytical_datasets' must be a dictionary.")

    # Step 3.1: Define the mapping from the desired output rows to the source data.
    # This map is the blueprint for the entire calculation process.
    variable_map = {
        'PI': ('full_data', 'PreTaxIncome_GBP'),
        'TA': ('full_data', 'TotalAssets_GBP'),
        'PI(-)': ('pi_negative', 'PreTaxIncome_GBP'),
        'PI(+)': ('pi_positive', 'PreTaxIncome_GBP'),
        'PI/TA': ('full_data', 'PI_TA_Ratio'),
        'PI(-)/TA': ('pi_negative', 'PI_TA_Ratio'),
        'PI(+)/TA': ('pi_positive', 'PI_TA_Ratio')
    }

    # Initialize a dictionary to hold the raw statistical results.
    all_stats = {}

    try:
        # Step 3.2: Iterate through the map to calculate statistics for each variable.
        for var_name, (df_key, col_name) in variable_map.items():
            # Select the correct DataFrame and Series.
            target_df = analytical_datasets[df_key]
            target_series = target_df[col_name]

            # Calculate the descriptive statistics for the selected series.
            all_stats[var_name] = _calculate_descriptive_statistics(series=target_series)

        # Step 3.3: Format the aggregated statistics into the final table.
        final_table = _format_summary_table(stats_dict=all_stats)

    except KeyError as e:
        print(f"Data generation failed: Missing expected key {e} in datasets or columns.")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during statistics generation: {e}")
        raise

    return final_table


In [None]:
# Task 7: First Significant Digit Extraction?

# ==============================================================================
# Phase III, Task 7: Helper for Preprocessing Data for Extraction
# ==============================================================================
def _preprocess_for_digit_extraction(series: pd.Series) -> pd.Series:
    """
    Prepares a data series for significant digit extraction.

    This function takes a raw data series and performs the necessary
    preprocessing steps:
    1. Takes the absolute value of all numbers.
    2. Filters out all non-positive (<= 0) and non-finite (NaN, inf) values,
       for which significant digits are undefined.

    Args:
        series (pd.Series): The raw numerical data series.

    Returns:
        pd.Series: A clean series containing only positive, finite numbers,
                   with its original index preserved for later alignment.
    """
    # Step 1.1: Take the absolute value to work with magnitudes.
    abs_series = series.abs()

    # Step 1.2: Create a boolean mask to identify valid entries.
    # A valid entry must be both finite (not NaN or inf) and strictly positive.
    valid_mask = np.isfinite(abs_series) & (abs_series > 0)

    # Step 1.3: Return the filtered series containing only valid numbers.
    return abs_series[valid_mask]

# ==============================================================================
# Phase III, Task 7: Helper for Logarithmic Digit Extraction
# ==============================================================================
def _extract_first_digit_logarithmic(series: pd.Series) -> pd.Series:
    """
    Extracts the first significant digit using a robust logarithmic method.

    This function implements the mathematically sound formula for first-digit
    extraction: d = floor(10^(log10(x) mod 1)). This approach is vectorized,
    computationally efficient, and robust across numbers of any scale.

    Args:
        series (pd.Series): A preprocessed series containing only positive,
                            finite numbers.

    Returns:
        pd.Series: A series of the same index containing the extracted first
                   significant digits as integers.
    """
    # Step 2.1: Calculate log base 10 of the series.
    # This is the core of the scale-invariance property.
    log10_series = np.log10(series)

    # Step 2.2: Isolate the fractional part of the logarithm using the modulo operator.
    # The fractional part (mantissa) contains the information about the significant digits.
    mantissa = log10_series % 1

    # Step 2.3: Raise 10 to the power of the mantissa.
    # This yields a number between 1 (inclusive) and 10 (exclusive).
    # The integer part of this result is the first significant digit.
    power_of_10 = 10 ** mantissa

    # Step 2.4: Take the floor of the result and cast to an integer.
    # This isolates the first significant digit.
    first_digits = np.floor(power_of_10).astype(int)

    return first_digits

# ==============================================================================
# Phase III, Task 7: Main Orchestrator Function
# ==============================================================================
def extract_first_significant_digit(
    data_series: pd.Series
) -> pd.Series:
    """
    Orchestrates the extraction of the first significant digit from a data series.

    This function provides a complete, robust, and efficient pipeline for
    first significant digit extraction. It follows a three-step process:
    1. Preprocesses the data to isolate valid numbers for analysis.
    2. Applies a mathematically robust logarithmic algorithm to extract the digits.
    3. Re-aligns the extracted digits with the original series' index, ensuring
       that invalid inputs (e.g., zero, NaN) are represented as NaN in the
       output.

    Args:
        data_series (pd.Series): The input pandas Series of numerical data.
                                 Must be of a float or integer dtype.

    Returns:
        pd.Series: A pandas Series of the same index as the input, containing
                   the first significant digit (as float, to accommodate NaN)
                   for each valid entry, and np.nan for invalid entries.
    """
    # Top-level input validation.
    if not isinstance(data_series, pd.Series):
        raise TypeError("Input 'data_series' must be a pandas Series.")
    if not pd.api.types.is_numeric_dtype(data_series.dtype):
        raise TypeError("Input 'data_series' must have a numeric dtype.")

    try:
        # Step 1: Preprocess the series to get only positive, finite values.
        # The index of `valid_series` is a subset of `data_series.index`.
        valid_series = _preprocess_for_digit_extraction(series=data_series)

        # If there are no valid numbers, return a series of NaNs.
        if valid_series.empty:
            return pd.Series(np.nan, index=data_series.index, dtype=np.float64)

        # Step 2: Extract the first digits from the valid, preprocessed data.
        # The index of `extracted_digits` matches the index of `valid_series`.
        extracted_digits = _extract_first_digit_logarithmic(series=valid_series)

        # Step 3: Align the results back to the original series' index.
        # .reindex() ensures the output has the same shape and index as the input.
        # Entries in the original series that were not valid (e.g., 0, NaN)
        # will have a value of NaN in the final result.
        # The dtype is set to float64 to accommodate NaN values.
        result_series = extracted_digits.reindex(data_series.index).astype(np.float64)

        # Final validation: ensure all non-NaN digits are in the range [1, 9].
        assert result_series.dropna().isin(range(1, 10)).all(), \
            "Post-validation failed: Extracted digits found outside the valid range [1, 9]."

    except Exception as e:
        # Catch any unexpected errors during the extraction process.
        print(f"An error occurred during first digit extraction: {e}")
        raise

    return result_series


In [None]:
# Task 8: Second Significant Digit Extraction

# ==============================================================================
# Phase III, Task 8: Helper for Hybrid Second Digit Extraction
# ==============================================================================
def _extract_second_digit_hybrid(series: pd.Series) -> pd.Series:
    """
    Extracts the second significant digit using a robust mathematical method.

    This function implements a mathematically sound, vectorized algorithm to
    isolate the second significant digit. The core logic normalizes each number
    to isolate its first two significant digits as an integer, then uses the
    modulo operator to extract the second digit.

    The formula for the first two digits (d1d2) of a number x is:
    d1d2 = floor(x / 10^(floor(log10(x)) - 1))
    The second digit is then simply: d2 = d1d2 % 10

    Args:
        series (pd.Series): A preprocessed series containing only positive,
                            finite numbers.

    Returns:
        pd.Series: A series of the same index containing the extracted second
                   significant digits (0-9) as integers.
    """
    # Step 1.1: Calculate the exponent needed to normalize the number.
    # `np.floor(np.log10(series))` gives the order of magnitude.
    # Subtracting 1 scales the number so the first two digits are left of the decimal.
    exponent = np.floor(np.log10(series)) - 1

    # Step 1.2: Calculate the divisor for normalization.
    divisor = 10 ** exponent

    # Step 1.3: Normalize the series and take the floor to get the first two digits as an integer.
    # For x=12345, exponent=3, divisor=1000, normalized=12.345, first_two_digits=12.
    # For x=0.0789, exponent=-3, divisor=0.001, normalized=78.9, first_two_digits=78.
    first_two_digits = np.floor(series / divisor)

    # Step 1.4: Extract the second digit using the modulo operator.
    # For 12, 12 % 10 = 2.
    # For 78, 78 % 10 = 8.
    # For single-digit numbers like 5 (which becomes 50), 50 % 10 = 0.
    second_digits = (first_two_digits % 10).astype(int)

    return second_digits

# ==============================================================================
# Phase III, Task 8: Main Orchestrator Function
# ==============================================================================
def extract_second_significant_digit(
    data_series: pd.Series
) -> pd.Series:
    """
    Orchestrates the extraction of the second significant digit from a data series.

    This function provides a complete, robust, and efficient pipeline for
    second significant digit extraction. It follows a three-step process:
    1. Reuses the robust preprocessing logic to isolate valid numbers.
    2. Applies a mathematically sound hybrid algorithm to extract the digits.
    3. Re-aligns the extracted digits with the original series' index, ensuring
       that invalid inputs (e.g., zero, NaN) are represented as NaN in the
       output.

    Args:
        data_series (pd.Series): The input pandas Series of numerical data.
                                 Must be of a float or integer dtype.

    Returns:
        pd.Series: A pandas Series of the same index as the input, containing
                   the second significant digit (as float, to accommodate NaN)
                   for each valid entry, and np.nan for invalid entries.
    """
    # Top-level input validation.
    if not isinstance(data_series, pd.Series):
        raise TypeError("Input 'data_series' must be a pandas Series.")
    if not pd.api.types.is_numeric_dtype(data_series.dtype):
        raise TypeError("Input 'data_series' must have a numeric dtype.")

    try:
        # Step 1: Preprocess the series to get only positive, finite values.
        # This reuses the exact same robust helper from the first-digit extraction.
        valid_series = _preprocess_for_digit_extraction(series=data_series)

        # If there are no valid numbers, return a series of NaNs.
        if valid_series.empty:
            return pd.Series(np.nan, index=data_series.index, dtype=np.float64)

        # Step 2: Extract the second digits from the valid, preprocessed data.
        extracted_digits = _extract_second_digit_hybrid(series=valid_series)

        # Step 3: Align the results back to the original series' index.
        # This ensures the output is conformable with the original DataFrame.
        # Entries that were invalid will correctly be assigned NaN.
        result_series = extracted_digits.reindex(data_series.index).astype(np.float64)

        # Final validation: ensure all non-NaN digits are in the range [0, 9].
        assert result_series.dropna().isin(range(10)).all(), \
            "Post-validation failed: Extracted digits found outside the valid range [0, 9]."

    except Exception as e:
        # Catch any unexpected errors during the extraction process.
        print(f"An error occurred during second digit extraction: {e}")
        raise

    return result_series


In [None]:
# Task 9: First-Two Digit Combination Construction

# ==============================================================================
# Phase III, Task 9: Helper for Direct First-Two Digit Extraction
# ==============================================================================
def _extract_first_two_digits_direct(series: pd.Series) -> pd.Series:
    """
    Extracts the first-two significant digits directly using a mathematical method.

    This function implements the robust, vectorized formula to isolate the
    integer formed by the first two significant digits of each number.

    Formula: d1d2 = floor(x / 10^(floor(log10(x)) - 1))

    After calculation, it filters the results to exclude any numbers that have
    fewer than two significant digits (i.e., results < 10).

    Args:
        series (pd.Series): A preprocessed series containing only positive,
                            finite numbers.

    Returns:
        pd.Series: A series containing the extracted first-two significant
                   digits (10-99) as integers. The index is preserved from
                   the input series, but rows corresponding to single-digit
                   numbers are dropped.
    """
    # Step 1.1: Calculate the exponent for normalization, identical to Task 8.
    exponent = np.floor(np.log10(series)) - 1

    # Step 1.2: Calculate the divisor.
    divisor = 10 ** exponent

    # Step 1.3: Normalize and take the floor to get the first two digits.
    first_two_digits = np.floor(series / divisor).astype(int)

    # Step 1.4: Filter out results that are less than 10.
    # These correspond to original numbers with only one significant digit.
    # For example, for x=500, first_two_digits=50. For x=5, first_two_digits=5.
    # We must exclude the latter for a valid BL12 analysis.
    valid_first_two_digits = first_two_digits[first_two_digits >= 10]

    return valid_first_two_digits

# ==============================================================================
# Phase III, Task 9: Main Orchestrator Function
# ==============================================================================
def extract_first_two_significant_digits(
    data_series: pd.Series
) -> pd.Series:
    """
    Orchestrates the extraction of the first-two significant digits.

    This function provides a complete pipeline for first-two significant digit
    extraction (for BL12 analysis). It follows a robust process:
    1. Preprocesses the data to isolate valid numbers for analysis.
    2. Applies a direct mathematical algorithm to extract the two-digit integer.
    3. Filters out results from numbers with fewer than two significant digits.
    4. Re-aligns the final digits with the original series' index, ensuring
       invalid inputs are represented as NaN.

    Args:
        data_series (pd.Series): The input pandas Series of numerical data.
                                 Must be of a float or integer dtype.

    Returns:
        pd.Series: A pandas Series of the same index as the input, containing
                   the first-two significant digits (as float, to accommodate
                   NaN) for each valid entry, and np.nan otherwise.
    """
    # Top-level input validation.
    if not isinstance(data_series, pd.Series):
        raise TypeError("Input 'data_series' must be a pandas Series.")
    if not pd.api.types.is_numeric_dtype(data_series.dtype):
        raise TypeError("Input 'data_series' must have a numeric dtype.")

    try:
        # Step 1: Preprocess the series to get only positive, finite values.
        valid_series = _preprocess_for_digit_extraction(series=data_series)

        # If there are no valid numbers, return a series of NaNs.
        if valid_series.empty:
            return pd.Series(np.nan, index=data_series.index, dtype=np.float64)

        # Step 2: Extract the first-two digits from the valid data.
        # This result is already filtered to include only values >= 10.
        extracted_digits = _extract_first_two_digits_direct(series=valid_series)

        # Step 3: Align the results back to the original series' index.
        # This correctly assigns NaN to invalid inputs (0, NaN) AND to numbers
        # with only one significant digit.
        result_series = extracted_digits.reindex(data_series.index).astype(np.float64)

        # Final validation: ensure all non-NaN digits are in the range [10, 99].
        assert result_series.dropna().isin(range(10, 100)).all(), \
            "Post-validation failed: Extracted digits found outside the valid range [10, 99]."

    except Exception as e:
        # Catch any unexpected errors during the extraction process.
        print(f"An error occurred during first-two digit extraction: {e}")
        raise

    return result_series


In [None]:
# Task 10: BL1 Probability Distribution Generation

# ==============================================================================
# Phase IV, Task 10: Helper for BL1 Probability Calculation
# ==============================================================================
def _calculate_bl1_probabilities() -> pd.Series:
    """
    Calculates the theoretical probabilities for Benford's First-Digit Law (BL1).

    This function implements Equation (1) from the source paper to compute the
    expected probability for each leading digit from 1 to 9. The calculation
    is performed using vectorized numpy operations for efficiency and precision.

    Equation (1): P(d1 = i) = log10(1 + 1/i)

    Returns:
        pd.Series: A pandas Series containing the theoretical probabilities,
                   indexed by the digits 1 through 9.
    """
    # Step 1.1: Create an array representing the possible first digits (i).
    digits = np.arange(1, 10, dtype=np.float64)

    # Step 1.2: Apply Equation (1) in a vectorized manner.
    # np.log10 is used for direct, high-precision base-10 logarithm calculation.
    probabilities = np.log10(1 + (1 / digits))

    # Step 1.3: Structure the result as a pandas Series for clarity and usability.
    # The index is set to the digits themselves.
    bl1_probs = pd.Series(probabilities, index=pd.Index(digits.astype(int), name='Digit'))

    return bl1_probs

# ==============================================================================
# Phase IV, Task 10: Generic Helper for Distribution Validation
# ==============================================================================
def _validate_probability_distribution(dist: pd.Series) -> bool:
    """
    Validates the mathematical integrity of a calculated probability distribution.

    This function performs two fundamental checks on a distribution:
    1. Ensures all probabilities are non-negative.
    2. Ensures the sum of all probabilities is equal to 1.0 within a tight
       floating-point tolerance.

    Args:
        dist (pd.Series): The probability distribution to validate.

    Returns:
        bool: True if the distribution is valid.

    Raises:
        AssertionError: If the distribution fails either of the validation checks.
    """
    # Step 2.1: Check for non-negativity.
    assert (dist >= 0).all(), "Validation failed: Probabilities cannot be negative."

    # Step 2.2: Check if the probabilities sum to 1.0.
    # math.isclose is used for robust floating-point comparison.
    # A relative tolerance of 1e-9 is a standard for high-precision checks.
    assert math.isclose(dist.sum(), 1.0, rel_tol=1e-9), \
        f"Validation failed: Probabilities must sum to 1.0, but sum to {dist.sum()}."

    return True

# ==============================================================================
# Phase IV, Task 10: Main Orchestrator Function
# ==============================================================================
def generate_bl1_distribution(
    sample_size: int
) -> Tuple[pd.Series, pd.Series]:
    """
    Generates the theoretical BL1 probability and expected frequency distributions.

    This function orchestrates the complete process for generating the BL1
    theoretical benchmarks for a given sample size. It:
    1. Calculates the fundamental BL1 probabilities using a high-precision method.
    2. Validates the mathematical integrity of the calculated probability distribution.
    3. Computes the expected frequencies for the specified sample size.

    Args:
        sample_size (int): The total number of valid observations in the
                           empirical sample to be tested.

    Returns:
        Tuple[pd.Series, pd.Series]: A tuple containing:
            - A Series of the theoretical BL1 probabilities.
            - A Series of the expected frequencies for the given sample size.
            Both are indexed by the digits 1 through 9.
    """
    # Top-level input validation.
    if not isinstance(sample_size, int) or sample_size < 0:
        raise ValueError("Input 'sample_size' must be a non-negative integer.")

    try:
        # Step 1: Calculate the theoretical BL1 probabilities.
        bl1_probabilities = _calculate_bl1_probabilities()

        # Step 2: Validate the calculated probability distribution.
        _validate_probability_distribution(dist=bl1_probabilities)

        # Step 3: Calculate the expected frequencies for the given sample size.
        # This is a simple element-wise multiplication.
        expected_frequencies = bl1_probabilities * sample_size

    except Exception as e:
        # Catch any unexpected errors during the generation process.
        print(f"An error occurred during BL1 distribution generation: {e}")
        raise

    return bl1_probabilities, expected_frequencies


In [None]:
# Task 11: BL2 Probability Distribution Generation

# ==============================================================================
# Phase IV, Task 11: Helper for BL2 Probability Calculation
# ==============================================================================
def _calculate_bl2_probabilities() -> pd.Series:
    """
    Calculates the theoretical probabilities for Benford's Second-Digit Law (BL2).

    This function implements the nested summation from Equation (3) of the
    source paper. It uses numpy broadcasting to perform the calculation in a
    fully vectorized manner, which is highly efficient and avoids explicit
    Python loops.

    Equation (3): P(d2 = j) = sum_{k=1 to 9} log10(1 + 1/(10k + j))

    Returns:
        pd.Series: A pandas Series containing the theoretical probabilities for
                   the second digit (0-9), indexed by the digits themselves.
    """
    # Step 1.1: Define the ranges for the second digit (j) and the summation index (k).
    # j represents the second digit, from 0 to 9.
    # k is the summation index, representing the first digit, from 1 to 9.
    j_digits = np.arange(0, 10, dtype=np.float64).reshape(-1, 1) # Shape (10, 1)
    k_digits = np.arange(1, 10, dtype=np.float64).reshape(1, -1) # Shape (1, 9)

    # Step 1.2: Use numpy broadcasting to create a 2D grid of (10k + j) values.
    # The addition of a (10, 1) vector and a (1, 9) vector results in a
    # (10, 9) matrix where each element is (10k + j).
    denominator_grid = 10 * k_digits + j_digits

    # Step 1.3: Apply the inner part of the formula to the entire grid at once.
    # This calculates log10(1 + 1/(10k + j)) for all combinations of j and k.
    log_terms_grid = np.log10(1 + (1 / denominator_grid))

    # Step 1.4: Sum the terms along the k-axis (axis=1) to get the final probability for each j.
    # This performs the summation Σ_{k=1 to 9} for each second digit j.
    probabilities = log_terms_grid.sum(axis=1)

    # Step 1.5: Structure the result as a pandas Series for clarity and usability.
    bl2_probs = pd.Series(probabilities, index=pd.Index(j_digits.flatten().astype(int), name='Digit'))

    return bl2_probs

# ==============================================================================
# Phase IV, Task 11: Main Orchestrator Function
# ==============================================================================
def generate_bl2_distribution(
    sample_size: int
) -> Tuple[pd.Series, pd.Series]:
    """
    Generates the theoretical BL2 probability and expected frequency distributions.

    This function orchestrates the complete process for generating the BL2
    theoretical benchmarks for a given sample size. It:
    1. Calculates the fundamental BL2 probabilities using a vectorized summation.
    2. Validates the mathematical integrity of the resulting probability distribution.
    3. Computes the expected frequencies for the specified sample size.

    Args:
        sample_size (int): The total number of valid observations in the
                           empirical sample to be tested.

    Returns:
        Tuple[pd.Series, pd.Series]: A tuple containing:
            - A Series of the theoretical BL2 probabilities.
            - A Series of the expected frequencies for the given sample size.
            Both are indexed by the digits 0 through 9.
    """
    # Top-level input validation.
    if not isinstance(sample_size, int) or sample_size < 0:
        raise ValueError("Input 'sample_size' must be a non-negative integer.")

    try:
        # Step 1: Calculate the theoretical BL2 probabilities.
        bl2_probabilities = _calculate_bl2_probabilities()

        # Step 2: Validate the calculated probability distribution using the generic helper.
        _validate_probability_distribution(dist=bl2_probabilities)

        # Step 3: Calculate the expected frequencies for the given sample size.
        expected_frequencies = bl2_probabilities * sample_size

    except Exception as e:
        # Catch any unexpected errors during the generation process.
        print(f"An error occurred during BL2 distribution generation: {e}")
        raise

    return bl2_probabilities, expected_frequencies


In [None]:
# Task 12: BL12 Probability Distribution Generation

# ==============================================================================
# Phase IV, Task 12: Helper for BL12 Probability Calculation
# ==============================================================================
def _calculate_bl12_probabilities() -> pd.Series:
    """
    Calculates the theoretical probabilities for Benford's First-Two-Digit Law (BL12).

    This function implements Equation (2) from the source paper to compute the
    expected probability for each leading two-digit combination from 10 to 99.
    The calculation is a direct, vectorized application of the formula.

    Equation (2): P12(d1d2 = ij) = log10(1 + 1/ij)

    Returns:
        pd.Series: A pandas Series containing the 90 theoretical probabilities,
                   indexed by the integers 10 through 99.
    """
    # Step 1.1: Create an array representing the possible first-two digits (ij).
    # The range is from 10 to 99, inclusive.
    digits = np.arange(10, 100, dtype=np.float64)

    # Step 1.2: Apply Equation (2) in a vectorized manner.
    # This is mathematically analogous to the BL1 calculation.
    probabilities = np.log10(1 + (1 / digits))

    # Step 1.3: Structure the result as a pandas Series for clarity and usability.
    bl12_probs = pd.Series(probabilities, index=pd.Index(digits.astype(int), name='Digits'))

    return bl12_probs

# ==============================================================================
# Phase IV, Task 12: Main Orchestrator Function
# ==============================================================================
def generate_bl12_distribution(
    sample_size: int
) -> Tuple[pd.Series, pd.Series]:
    """
    Generates the theoretical BL12 probability and expected frequency distributions.

    This function orchestrates the complete process for generating the BL12
    theoretical benchmarks. It:
    1. Calculates the fundamental BL12 probabilities for integers 10-99.
    2. Validates the mathematical integrity of the calculated probability distribution.
    3. Computes the expected frequencies for the specified sample size.

    Args:
        sample_size (int): The total number of valid observations in the
                           empirical sample to be tested.

    Returns:
        Tuple[pd.Series, pd.Series]: A tuple containing:
            - A Series of the theoretical BL12 probabilities.
            - A Series of the expected frequencies for the given sample size.
            Both are indexed by the integers 10 through 99.
    """
    # Top-level input validation.
    if not isinstance(sample_size, int) or sample_size < 0:
        raise ValueError("Input 'sample_size' must be a non-negative integer.")

    try:
        # Step 1: Calculate the theoretical BL12 probabilities.
        bl12_probabilities = _calculate_bl12_probabilities()

        # Step 2: Validate the calculated probability distribution using the generic helper.
        _validate_probability_distribution(dist=bl12_probabilities)

        # Step 3: Calculate the expected frequencies for the given sample size.
        expected_frequencies = bl12_probabilities * sample_size

    except Exception as e:
        # Catch any unexpected errors during the generation process.
        print(f"An error occurred during BL12 distribution generation: {e}")
        raise

    return bl12_probabilities, expected_frequencies


In [None]:
# Task 13: Chi-Squared Goodness-of-Fit Testing

# ==============================================================================
# Phase V, Task 13: Helper for Preparing Frequencies
# ==============================================================================
def _prepare_frequencies_for_test(
    empirical_digits: pd.Series,
    expected_frequencies: pd.Series
) -> Tuple[pd.Series, pd.Series]:
    """
    Prepares observed and expected frequencies for a goodness-of-fit test.

    This function takes a series of empirically observed digits and a series of
    theoretically expected frequencies. It calculates the observed frequency
    counts and ensures that both the observed and expected series are perfectly
    aligned by index, with all necessary bins present (filling missing
    observed bins with zero).

    Args:
        empirical_digits (pd.Series): A series of extracted significant digits.
        expected_frequencies (pd.Series): A series of theoretically expected
                                          frequencies, indexed by digit/bin.

    Returns:
        Tuple[pd.Series, pd.Series]: A tuple containing:
            - The observed frequencies, aligned and zero-filled.
            - The expected frequencies, passed through for alignment.
    """
    # Step 1.1: Calculate the observed frequencies of the empirical digits.
    # .value_counts() efficiently tabulates the occurrences of each digit.
    observed_counts = empirical_digits.value_counts()

    # Step 1.2: Align the observed counts with the expected frequencies' index.
    # .reindex() is crucial. It ensures that bins present in the theoretical
    # distribution but absent in the empirical data are included.
    # .fillna(0) correctly assigns a count of zero to these absent bins.
    aligned_observed = observed_counts.reindex(expected_frequencies.index).fillna(0)

    # Ensure the final series are of a numeric type suitable for calculation.
    aligned_observed = aligned_observed.astype(np.float64)

    return aligned_observed, expected_frequencies

# ==============================================================================
# Phase V, Task 13: Helper for Calculating the Chi-Squared Statistic
# ==============================================================================
def _calculate_chi_squared_statistic(
    observed: pd.Series,
    expected: pd.Series
) -> float:
    """
    Calculates the Pearson's Chi-Squared (χ²) test statistic.

    This function implements Equation (5) from the source paper in a vectorized
    manner. It requires perfectly aligned series of observed and expected
    frequencies as input.

    Equation (5): χ² = Σ ( (O_i - E_i)² / E_i )

    Args:
        observed (pd.Series): The observed frequency counts (O).
        expected (pd.Series): The expected frequency counts (E).

    Returns:
        float: The calculated Chi-Squared statistic.
    """
    # Input validation: Ensure no expected frequency is zero to avoid division by zero.
    if (expected <= 0).any():
        raise ValueError("Expected frequencies must be positive for the Chi-Squared test.")

    # Step 2.1: Calculate the squared differences between observed and expected.
    squared_diff = (observed - expected) ** 2

    # Step 2.2: Divide by the expected frequencies and sum the terms.
    # This is the complete, vectorized implementation of the formula.
    chi_squared_stat = (squared_diff / expected).sum()

    return float(chi_squared_stat)

# ==============================================================================
# Phase V, Task 13: Main Orchestrator Function
# ==============================================================================
def perform_chi_squared_test(
    empirical_digits: pd.Series,
    expected_frequencies: pd.Series,
    degrees_of_freedom: int,
    critical_value: float,
    alpha: float = 0.05
) -> Dict[str, Any]:
    """
    Performs a complete Chi-Squared goodness-of-fit test.

    This function orchestrates the entire Chi-Squared testing process. It:
    1. Prepares and aligns the observed and expected frequencies.
    2. Calculates the Chi-Squared test statistic.
    3. Calculates the p-value for the test statistic.
    4. Performs the hypothesis test by comparing the statistic to the given
       critical value.
    5. Returns a structured dictionary containing all test results.

    Args:
        empirical_digits (pd.Series): The raw series of extracted digits.
        expected_frequencies (pd.Series): The theoretical expected frequencies.
        degrees_of_freedom (int): The degrees of freedom for the test (K-1).
        critical_value (float): The critical value for the test at the given
                                alpha level.
        alpha (float): The significance level for the hypothesis test.

    Returns:
        Dict[str, Any]: A dictionary containing the test statistic, p-value,
                        degrees of freedom, critical value, alpha, and a
                        boolean indicating if the null hypothesis is rejected.
    """
    # Top-level input validation.
    if empirical_digits.dropna().empty:
        return {
            'chi_squared_stat': np.nan, 'p_value': np.nan,
            'dof': degrees_of_freedom, 'critical_value': critical_value,
            'alpha': alpha, 'reject_null': False,
            'message': "Test not performed due to empty empirical data."
        }

    try:
        # Step 1: Prepare and align the frequency distributions.
        observed, expected = _prepare_frequencies_for_test(
            empirical_digits=empirical_digits,
            expected_frequencies=expected_frequencies
        )

        # Step 2: Calculate the Chi-Squared test statistic.
        chi2_stat = _calculate_chi_squared_statistic(
            observed=observed,
            expected=expected
        )

        # Step 3 (enhancement): Calculate the p-value for completeness.
        # The p-value is the probability of observing a statistic as extreme or more
        # extreme than the one calculated, given the null hypothesis is true.
        # We use the survival function (1 - cdf) of the chi2 distribution.
        p_value = chi2.sf(chi2_stat, degrees_of_freedom)

        # Step 4: Perform the hypothesis test.
        # The null hypothesis (H0) is that the data conforms to Benford's Law.
        # We reject H0 if the calculated statistic exceeds the critical value.
        reject_null = chi2_stat > critical_value

    except Exception as e:
        print(f"An error occurred during the Chi-Squared test: {e}")
        raise

    # Step 5: Compile and return the structured results.
    return {
        'chi_squared_stat': chi2_stat,
        'p_value': p_value,
        'dof': degrees_of_freedom,
        'critical_value': critical_value,
        'alpha': alpha,
        'reject_null': reject_null
    }


In [None]:
# Task 14: Mean Absolute Deviation Testing

# ==============================================================================
# Phase V, Task 14: Helper for Preparing Proportions
# ==============================================================================
def _prepare_proportions_for_test(
    empirical_digits: pd.Series,
    theoretical_probabilities: pd.Series
) -> Tuple[pd.Series, pd.Series]:
    """
    Prepares observed and theoretical proportions for the MAD test.

    This function calculates the observed relative frequencies (proportions) from
    the empirical digit data. It ensures perfect alignment with the theoretical
    probability distribution by re-indexing and filling missing bins with zero,
    making the two series directly comparable.

    Args:
        empirical_digits (pd.Series): A series of extracted significant digits.
        theoretical_probabilities (pd.Series): A series of theoretical
                                               probabilities, indexed by digit/bin.

    Returns:
        Tuple[pd.Series, pd.Series]: A tuple containing:
            - The observed proportions, aligned and zero-filled.
            - The theoretical probabilities, passed through for alignment.
    """
    # Step 1.1: Count the total number of valid (non-NaN) observations.
    n_observations = empirical_digits.count()
    if n_observations == 0:
        # If no data, return zero-filled proportions aligned to the theoretical index.
        zero_proportions = pd.Series(0.0, index=theoretical_probabilities.index)
        return zero_proportions, theoretical_probabilities

    # Step 1.2: Calculate observed frequencies and convert to proportions.
    observed_proportions = empirical_digits.value_counts() / n_observations

    # Step 1.3: Align the observed proportions with the theoretical distribution's index.
    # This is critical to ensure bins with zero observations are included in the calculation.
    aligned_observed = observed_proportions.reindex(theoretical_probabilities.index).fillna(0)

    return aligned_observed, theoretical_probabilities

# ==============================================================================
# Phase V, Task 14: Helper for Calculating the MAD Statistic
# ==============================================================================
def _calculate_mad_statistic(
    observed_proportions: pd.Series,
    theoretical_probabilities: pd.Series
) -> float:
    """
    Calculates the Mean Absolute Deviation (MAD) test statistic.

    This function implements Equation (6) from the source paper. Note that the
    paper's formula represents the SUM of absolute deviations, not the mean.
    This implementation adheres strictly to the formula as written.

    Equation (6): MAD = Σ |f_o - f_e|

    Args:
        observed_proportions (pd.Series): The observed relative frequencies (f_o).
        theoretical_probabilities (pd.Series): The theoretical probabilities (f_e).

    Returns:
        float: The calculated MAD statistic.
    """
    # Step 2.1: Calculate the absolute differences between observed and theoretical proportions.
    absolute_deviations = (observed_proportions - theoretical_probabilities).abs()

    # Step 2.2: Sum the absolute deviations to get the final statistic.
    mad_stat = absolute_deviations.sum()

    return float(mad_stat)

# ==============================================================================
# Phase V, Task 14: Helper for Classifying MAD Conformity
# ==============================================================================
def _classify_mad_conformity(
    mad_statistic: float,
    thresholds: Dict[str, float]
) -> str:
    """
    Classifies the MAD statistic into a conformity level based on given thresholds.

    This function applies the decision rules specified on page 13 of the paper
    to categorize the level of conformity. It handles both single-level and
    multi-level threshold structures.

    Args:
        mad_statistic (float): The calculated MAD statistic.
        thresholds (Dict[str, float]): A dictionary of conformity thresholds,
            e.g., {'close_conformity': 0.006, 'acceptable_conformity': 0.012}.

    Returns:
        str: A string describing the level of conformity.
    """
    # Step 3.1: Apply the decision rules based on the provided thresholds.
    # The thresholds are sorted from strictest to most lenient.
    if mad_statistic <= thresholds['close_conformity']:
        return "Close Conformity"
    # Check if tiered thresholds for 'acceptable' and 'marginal' exist (for BL1).
    elif 'acceptable_conformity' in thresholds and mad_statistic <= thresholds['acceptable_conformity']:
        return "Acceptable Conformity"
    elif 'marginally_acceptable' in thresholds and mad_statistic <= thresholds['marginally_acceptable']:
        return "Marginally Acceptable Conformity"
    else:
        return "Non-conformity"

# ==============================================================================
# Phase V, Task 14: Main Orchestrator Function
# ==============================================================================
def perform_mad_test(
    empirical_digits: pd.Series,
    theoretical_probabilities: pd.Series,
    conformity_thresholds: Dict[str, float]
) -> Dict[str, Any]:
    """
    Performs a complete Mean Absolute Deviation (MAD) conformity test.

    This function orchestrates the entire MAD testing process. It:
    1. Prepares and aligns the observed and theoretical proportions.
    2. Calculates the MAD test statistic according to the study's formula.
    3. Classifies the result into a conformity level using study-specific thresholds.
    4. Returns a structured dictionary containing the test results.

    Args:
        empirical_digits (pd.Series): The raw series of extracted digits.
        theoretical_probabilities (pd.Series): The theoretical probabilities.
        conformity_thresholds (Dict[str, float]): The dictionary of thresholds
            applicable to the specific test (BL1, BL2, or BL12).

    Returns:
        Dict[str, Any]: A dictionary containing the MAD statistic and the
                        resulting conformity classification.
    """
    # Top-level input validation.
    if empirical_digits.dropna().empty:
        return {
            'mad_statistic': np.nan,
            'conformity_level': "Not Applicable",
            'message': "Test not performed due to empty empirical data."
        }

    try:
        # Step 1: Prepare and align the proportion distributions.
        observed_props, theoretical_props = _prepare_proportions_for_test(
            empirical_digits=empirical_digits,
            theoretical_probabilities=theoretical_probabilities
        )

        # Step 2: Calculate the MAD test statistic.
        mad_stat = _calculate_mad_statistic(
            observed_proportions=observed_props,
            theoretical_probabilities=theoretical_props
        )

        # Step 3: Classify the conformity level based on the statistic.
        conformity = _classify_mad_conformity(
            mad_statistic=mad_stat,
            thresholds=conformity_thresholds
        )

    except Exception as e:
        print(f"An error occurred during the MAD test: {e}")
        raise

    # Step 4: Compile and return the structured results.
    return {
        'mad_statistic': mad_stat,
        'conformity_level': conformity
    }


In [None]:
# Task 15: Comprehensive Results Compilation

# ==============================================================================
# Phase V, Task 15: Specific Formatting Helper Functions
# ==============================================================================

def _format_table_3(raw_results: Dict[str, Any]) -> pd.DataFrame:
    """
    Constructs a DataFrame replicating Table 3 from the source paper.

    This function specifically formats the results of the First-Digit (BL1)
    tests for the Pre-Tax Income variables (PI, PI(-), and PI(+)). It assembles
    columns for observed counts (Fd), observed proportions (BL1), and the
    theoretical Benford's Law proportions in the precise order and format
    presented in the study.

    Args:
        raw_results (Dict[str, Any]): The master dictionary containing the
            detailed outputs from the full test suite for all variables.

    Returns:
        pd.DataFrame: A pandas DataFrame formatted to be a high-fidelity
                      replication of the body of Table 3.
    """
    # Define the specific variables required for this table.
    table_vars = ['PI', 'PI(-)', 'PI(+)']
    # Define the test type relevant to this table.
    test_type = 'BL1'

    # Generate the theoretical BL1 probabilities to serve as a base and reference column.
    bl_probs, _ = generate_bl1_distribution(sample_size=1)

    # Initialize the DataFrame with the correct index (digits 1-9).
    df = pd.DataFrame(index=bl_probs.index)
    # Name the index for clarity, matching the paper's 'd1' label.
    df.index.name = 'd1'

    # Iterate through the specified variables to populate the table columns.
    for var in table_vars:
        # Retrieve the pre-computed results for the current variable and test type.
        results = raw_results[var][test_type]
        # Get the series of extracted digits.
        digits = results['digits']
        # Count the number of valid observations.
        n_obs = digits.count()

        # Calculate observed frequencies, ensuring all bins (1-9) are present.
        obs_counts = digits.value_counts().reindex(df.index).fillna(0)

        # Add the observed counts column (e.g., 'Fd PI').
        df[f'Fd {var}'] = obs_counts.astype(int)
        # Add the observed proportions column (e.g., 'BL1 PI').
        df[f'BL1 {var}'] = obs_counts / n_obs

    # Add the final column for the theoretical BL1 probabilities.
    df['BL1'] = bl_probs

    # Enforce the exact column order as seen in the published Table 3.
    col_order = ['Fd PI', 'Fd PI(-)', 'Fd PI(+)', 'BL1 PI', 'BL1 PI(-)', 'BL1 PI(+)', 'BL1']

    # Return the fully constructed and ordered DataFrame.
    return df[col_order]

def _format_table_4(raw_results: Dict[str, Any]) -> pd.DataFrame:
    """
    Constructs a DataFrame replicating Table 4 from the source paper.

    This function formats the results of the First-Digit (BL1) tests for the
    Total Assets (TA) and PI/TA ratio variables. It follows the same logic as
    _format_table_3 but for a different set of variables.

    Args:
        raw_results (Dict[str, Any]): The master dictionary containing the
            detailed outputs from the full test suite.

    Returns:
        pd.DataFrame: A pandas DataFrame formatted to replicate the body of Table 4.
    """
    # Define the specific variables required for this table.
    table_vars = ['TA', 'PI/TA']
    # Define the test type.
    test_type = 'BL1'

    # Generate theoretical BL1 probabilities.
    bl_probs, _ = generate_bl1_distribution(sample_size=1)

    # Initialize the DataFrame with the correct index.
    df = pd.DataFrame(index=bl_probs.index)
    df.index.name = 'd1'

    # Iterate through variables to populate columns.
    for var in table_vars:
        results = raw_results[var][test_type]
        digits = results['digits']
        n_obs = digits.count()
        obs_counts = digits.value_counts().reindex(df.index).fillna(0)
        df[f'Fd {var}'] = obs_counts.astype(int)
        df[f'BL1 {var}'] = obs_counts / n_obs

    # Add the theoretical BL1 column.
    df['BL1'] = bl_probs

    # Enforce the exact column order from the paper.
    col_order = ['Fd TA', 'Fd PI/TA', 'BL1 TA', 'BL1 PI/TA', 'BL1']

    # Return the formatted DataFrame.
    return df[col_order]

def _format_table_5(raw_results: Dict[str, Any]) -> pd.DataFrame:
    """
    Constructs a DataFrame replicating Table 5 from the source paper.

    This function formats the results of the Second-Digit (BL2) tests for the
    Pre-Tax Income variables (PI, PI(-), and PI(+)).

    Args:
        raw_results (Dict[str, Any]): The master dictionary containing the
            detailed outputs from the full test suite.

    Returns:
        pd.DataFrame: A pandas DataFrame formatted to replicate the body of Table 5.
    """
    # Define the specific variables required for this table.
    table_vars = ['PI', 'PI(-)', 'PI(+)']
    # Define the test type.
    test_type = 'BL2'

    # Generate theoretical BL2 probabilities.
    bl_probs, _ = generate_bl2_distribution(sample_size=1)

    # Initialize the DataFrame with the correct index (digits 0-9).
    df = pd.DataFrame(index=bl_probs.index)
    df.index.name = 'd2'

    # Iterate through variables to populate columns.
    for var in table_vars:
        results = raw_results[var][test_type]
        digits = results['digits']
        n_obs = digits.count()
        obs_counts = digits.value_counts().reindex(df.index).fillna(0)
        df[f'Sd {var}'] = obs_counts.astype(int)
        df[f'BL2 {var}'] = obs_counts / n_obs

    # Add the theoretical BL2 column.
    df['BL2'] = bl_probs

    # Enforce the exact column order from the paper.
    col_order = ['Sd PI', 'Sd PI(-)', 'Sd PI(+)', 'BL2 PI', 'BL2 PI(-)', 'BL2 PI(+)', 'BL2']

    # Return the formatted DataFrame.
    return df[col_order]

def _format_table_6(raw_results: Dict[str, Any]) -> pd.DataFrame:
    """
    Constructs a DataFrame replicating Table 6 from the source paper.

    This function formats the results of the Second-Digit (BL2) tests for the
    Total Assets (TA) and PI/TA ratio variables.

    Args:
        raw_results (Dict[str, Any]): The master dictionary containing the
            detailed outputs from the full test suite.

    Returns:
        pd.DataFrame: A pandas DataFrame formatted to replicate the body of Table 6.
    """
    # Define the specific variables required for this table.
    table_vars = ['TA', 'PI/TA']
    # Define the test type.
    test_type = 'BL2'

    # Generate theoretical BL2 probabilities.
    bl_probs, _ = generate_bl2_distribution(sample_size=1)

    # Initialize the DataFrame with the correct index.
    df = pd.DataFrame(index=bl_probs.index)
    df.index.name = 'd2'

    # Iterate through variables to populate columns.
    for var in table_vars:
        results = raw_results[var][test_type]
        digits = results['digits']
        n_obs = digits.count()
        obs_counts = digits.value_counts().reindex(df.index).fillna(0)
        df[f'Sd {var}'] = obs_counts.astype(int)
        df[f'BL2 {var}'] = obs_counts / n_obs

    # Add the theoretical BL2 column.
    df['BL2'] = bl_probs

    # Enforce the exact column order from the paper.
    col_order = ['Sd TA', 'Sd PI/TA', 'BL2 TA', 'BL2 PI/TA', 'BL2']

    # Return the formatted DataFrame.
    return df[col_order]

def _format_table_7_and_8_body(
    raw_results: Dict[str, Any],
    table_vars: List[str]
) -> pd.DataFrame:
    """
    Creates a generic DataFrame body for Tables 7 and 8 (BL12 tests).

    This helper function handles the common formatting logic for the First-Two-Digit
    (BL12) test result tables. It populates observed counts (FSd) and proportions
    (BL12) for a given list of variables.

    Args:
        raw_results (Dict[str, Any]): The master dictionary of test results.
        table_vars (List[str]): The list of variable names to include in the table.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the formatted body for a
                      BL12 results table.
    """
    # Define the test type.
    test_type = 'BL12'

    # Generate theoretical BL12 probabilities.
    bl_probs, _ = generate_bl12_distribution(sample_size=1)

    # Initialize the DataFrame with the correct index (digits 10-99).
    df = pd.DataFrame(index=bl_probs.index)
    df.index.name = 'd12'

    # Iterate through variables to populate columns.
    for var in table_vars:
        results = raw_results[var][test_type]
        digits = results['digits']
        n_obs = digits.count()
        obs_counts = digits.value_counts().reindex(df.index).fillna(0)
        df[f'FSd {var}'] = obs_counts.astype(int)
        df[f'BL12 {var}'] = obs_counts / n_obs

    # Add the theoretical BL12 column.
    df['BL12'] = bl_probs

    # Return the DataFrame (column order will be set by the calling function).
    return df

def _create_stats_footer(
    raw_results: Dict[str, Any],
    table_vars: List[str],
    test_type: str
) -> pd.DataFrame:
    """
    Creates a standardized DataFrame to serve as a statistics footer for a table.

    This function extracts the Chi-Squared and MAD test statistics for a given
    set of variables and a specific test type, formatting them into a clean
    DataFrame suitable for display beneath a main results table.

    Args:
        raw_results (Dict[str, Any]): The master dictionary of test results.
        table_vars (List[str]): The list of variables included in the main table.
        test_type (str): The Benford's Law test type (e.g., 'BL1', 'BL2').

    Returns:
        pd.DataFrame: A DataFrame with test names ('χ²', 'MAD') as the index
                      and variable names as columns, containing the relevant
                      test statistics.
    """
    # Initialize a list to hold the data for the footer.
    footer_data = []

    # Iterate through the variables to extract their test statistics.
    for var in table_vars:
        # Extract the Chi-Squared statistic.
        chi2_stat = raw_results[var][test_type]['chi2_test']['chi_squared_stat']
        # Extract the MAD statistic.
        mad_stat = raw_results[var][test_type]['mad_test']['mad_statistic']
        # Append the data to the list.
        footer_data.append({'Variable': var, 'χ²': chi2_stat, 'MAD': mad_stat})

    # Convert the list of dictionaries to a DataFrame, setting 'Variable' as the index.
    # Transpose (.T) the DataFrame so that test names become the index.
    footer_df = pd.DataFrame(footer_data).set_index('Variable').T

    # Return the formatted footer DataFrame.
    return footer_df

# ==============================================================================
# Phase V, Task 15: Main Orchestrator Function
# ==============================================================================
def compile_all_results(
    analytical_datasets: Dict[str, pd.DataFrame],
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the entire testing pipeline and compiles all results into
    publication-quality tables.

    This master function performs a complete, end-to-end replication of the
    study's core analysis. It systematically:
    1. Defines the full suite of variables to be tested.
    2. Executes the BL1, BL2, and BL12 tests for each variable, storing all
       detailed outputs in a comprehensive raw results dictionary.
    3. Uses a series of dedicated formatting functions to transform the raw
       results into high-fidelity replications of Tables 3 through 8 from
       the source paper.

    Args:
        analytical_datasets (Dict[str, pd.DataFrame]): The dictionary of
            prepared analytical datasets from Task 5.
        study_config (Dict[str, Any]): The global study configuration object.

    Returns:
        Dict[str, Any]: A dictionary containing two keys:
            'raw_results': A nested dictionary with all detailed test outputs.
            'publication_tables': A dictionary where keys are table names
                                  (e.g., 'Table 3') and values are dictionaries
                                  containing the formatted pandas DataFrame 'body'
                                  and the corresponding 'stats' footer DataFrame.
    """
    # Step 1: Define the full set of variables to be tested.
    # This map defines the variable name, the source DataFrame, and the column name.
    variables_to_test = {
        'PI': ('full_data', 'PreTaxIncome_GBP'),
        'PI(+)': ('pi_positive', 'PreTaxIncome_GBP'),
        'PI(-)': ('pi_negative', 'PreTaxIncome_GBP'),
        'TA': ('full_data', 'TotalAssets_GBP'),
        'PI/TA': ('full_data', 'PI_TA_Ratio'),
    }

    # Step 2: Run the full test suite for each variable and store raw results.
    raw_results = {}
    for var_name, (df_key, col_name) in variables_to_test.items():
        # Input validation for the analytical datasets dictionary.
        if df_key not in analytical_datasets or col_name not in analytical_datasets[df_key].columns:
            raise KeyError(f"Data for '{var_name}' ('{df_key}', '{col_name}') not found in analytical_datasets.")
        # Select the data series for the current test run.
        data_series = analytical_datasets[df_key][col_name]
        # Execute the full suite of tests (BL1, BL2, BL12) for the series.
        raw_results[var_name] = _run_full_test_suite_for_variable(data_series, study_config)

    # Step 3: Generate all publication tables using the dedicated formatting helpers.
    publication_tables = {}

    try:
        # --- Table 3: BL1 for PI variables ---
        vars_t3 = ['PI', 'PI(-)', 'PI(+)']
        publication_tables['Table 3'] = {
            'body': _format_table_3(raw_results),
            'stats': _create_stats_footer(raw_results, vars_t3, 'BL1')
        }

        # --- Table 4: BL1 for TA and PI/TA ---
        vars_t4 = ['TA', 'PI/TA']
        publication_tables['Table 4'] = {
            'body': _format_table_4(raw_results),
            'stats': _create_stats_footer(raw_results, vars_t4, 'BL1')
        }

        # --- Table 5: BL2 for PI variables ---
        vars_t5 = ['PI', 'PI(-)', 'PI(+)']
        publication_tables['Table 5'] = {
            'body': _format_table_5(raw_results),
            'stats': _create_stats_footer(raw_results, vars_t5, 'BL2')
        }

        # --- Table 6: BL2 for TA and PI/TA ---
        vars_t6 = ['TA', 'PI/TA']
        publication_tables['Table 6'] = {
            'body': _format_table_6(raw_results),
            'stats': _create_stats_footer(raw_results, vars_t6, 'BL2')
        }

        # --- Table 7: BL12 for PI variables ---
        vars_t7 = ['PI', 'PI(-)', 'PI(+)']
        body_t7 = _format_table_7_and_8_body(raw_results, vars_t7)
        col_order_t7 = ['FSd PI', 'FSd PI(-)', 'FSd PI(+)', 'BL12 PI', 'BL12 PI(-)', 'BL12 PI(+)', 'BL12']
        publication_tables['Table 7'] = {
            'body': body_t7[col_order_t7],
            'stats': _create_stats_footer(raw_results, vars_t7, 'BL12')
        }

        # --- Table 8: BL12 for TA and PI/TA ---
        vars_t8 = ['TA', 'PI/TA']
        body_t8 = _format_table_7_and_8_body(raw_results, vars_t8)
        col_order_t8 = ['FSd TA', 'FSd PI/TA', 'BL12 TA', 'BL12 PI/TA', 'BL12']
        publication_tables['Table 8'] = {
            'body': body_t8[col_order_t8],
            'stats': _create_stats_footer(raw_results, vars_t8, 'BL12')
        }

    except KeyError as e:
        # Handle cases where expected results are missing (e.g., a test failed to run).
        raise RuntimeError(f"Failed to format tables because a required result key is missing: {e}") from e

    # Return the final, structured output containing both raw and formatted results.
    return {
        'raw_results': raw_results,
        'publication_tables': publication_tables
    }


In [None]:
# Task 16: End-to-End Research Pipeline Orchestrator

# ==============================================================================
# Phase VI, Task 16: End-to-End Research Pipeline Orchestrator
# ==============================================================================
def run_benford_replication_pipeline(
    raw_financial_data: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the complete end-to-end Benford's Law replication pipeline.

    This master orchestrator function serves as the single entry point to
    replicate the entire study "Note on pre-taxation reported data by UK
    FTSE-listed companies." It sequentially executes all major phases of the
    research workflow, from initial data and configuration validation to the
    final compilation of publication-ready results tables.

    The pipeline follows these distinct phases:
    1.  **Validation:** Rigorously validates the configuration object and the
        structure and quality of the raw input data.
    2.  **Preprocessing:** Prepares the data by standardizing precision,
        flagging outliers, creating analytical variables (PI/TA ratio), and
        segmenting the data into required subsets (PI+, PI-).
    3.  **Summary Statistics:** Generates and validates the descriptive
        statistics to ensure the dataset aligns with the study's sample.
    4.  **Core Analysis & Reporting:** Executes the full suite of Benford's Law
        tests (BL1, BL2, BL12) for all variables and compiles the results
        into high-fidelity replications of the paper's analytical tables.

    Args:
        raw_financial_data (pd.DataFrame): The raw panel data of FTSE-listed
            companies, conforming to the specified input structure.
        study_configuration (Dict[str, Any]): The high-fidelity configuration
            object containing all methodological parameters for the study.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all major outputs
            of the pipeline, including the data quality report, the replicated
            summary statistics table (Table 1), the raw results of all
            statistical tests, and the final, formatted publication tables
            (Tables 3-8).

    Raises:
        Exception: Propagates any exception raised during any stage of the
                   pipeline, halting execution upon failure to ensure integrity.
    """
    # Announce the start of the pipeline execution.
    print("="*80)
    print("Executing End-to-End Benford's Law Replication Pipeline...")
    print("="*80)

    # Initialize a dictionary to store all pipeline outputs.
    pipeline_outputs = {}

    try:
        # --- PHASE I: INPUT VALIDATION ---
        print("\n[PHASE I] Running Input Validation...")

        # Step 1: Validate the study configuration object.
        is_config_valid = validate_study_configuration(study_config=study_configuration)
        assert is_config_valid, "Configuration validation failed."
        print("  - Study configuration object is valid.")

        # Step 2: Validate the raw DataFrame structure.
        is_df_structure_valid = validate_dataframe_structure(
            df=raw_financial_data,
            study_config=study_configuration
        )
        assert is_df_structure_valid, "DataFrame structure validation failed."
        print("  - Raw DataFrame structure is valid.")

        # Step 3: Perform and store the data quality validation report.
        data_quality_report = validate_financial_data_quality(df=raw_financial_data)
        pipeline_outputs['data_quality_report'] = data_quality_report
        print("  - Financial data quality checks completed.")
        print("[PHASE I] Input Validation Successful.")

        # --- PHASE II: DATA PREPROCESSING AND VARIABLE CONSTRUCTION ---
        print("\n[PHASE II] Running Data Preprocessing and Variable Construction...")

        # Step 4: Prepare and standardize the data (creates flags, ensures precision).
        prepared_df = prepare_and_standardize_data(raw_df=raw_financial_data)
        print("  - Data standardization and flagging completed.")

        # Step 5: Create analytical variables and subsets (PI/TA, PI+, PI-).
        analytical_datasets = create_analytical_variables(prepared_df=prepared_df)
        pipeline_outputs['analytical_datasets'] = analytical_datasets
        print("  - Analytical variables and subsets created.")

        # Step 6: Generate and store the summary statistics table (replication of Table 1).
        summary_stats_table = generate_summary_statistics(analytical_datasets=analytical_datasets)
        pipeline_outputs['summary_statistics_table_1'] = summary_stats_table
        print("  - Summary statistics (Table 1) generated.")
        print("[PHASE II] Data Preprocessing Successful.")

        # --- PHASE V: STATISTICAL CONFORMITY TESTING AND REPORTING ---
        # Note: Phase III (Digit Extraction) and IV (Distribution Generation) are
        # called internally by the functions in Phase V.
        print("\n[PHASE V] Running Core Analysis and Compiling Results...")

        # Step 15: Run all tests and compile all results into final tables.
        final_results = compile_all_results(
            analytical_datasets=analytical_datasets,
            study_config=study_configuration
        )
        pipeline_outputs.update(final_results) # Add raw_results and publication_tables
        print("  - All Benford's Law tests executed.")
        print("  - Publication tables (3-8) generated.")
        print("[PHASE V] Core Analysis Successful.")

    except Exception as e:
        # If any step in the pipeline fails, print an error and re-raise.
        print("\n" + "="*80)
        print(f"PIPELINE EXECUTION FAILED. Error: {e}")
        print("="*80)
        raise

    # Announce the successful completion of the pipeline.
    print("\n" + "="*80)
    print("Pipeline Execution Completed Successfully.")
    print("="*80)

    # Return the comprehensive dictionary of all outputs.
    return pipeline_outputs


In [None]:
# Task 17: Robustness Analysis Implementation

# ==============================================================================
# Phase VI, Task 17, Step 1: Helper for a Single Bootstrap Iteration
# ==============================================================================
def _perform_single_bootstrap_iteration(
    data_series: pd.Series,
    sample_size: int,
    config: Dict[str, Any]
) -> Dict[str, float]:
    """
    Performs a single bootstrap iteration for a given data series.

    This function executes one cycle of the bootstrap process:
    1. Draws a random sample with replacement from the original data series.
    2. Runs the complete Benford's Law test suite on this bootstrapped sample.
    3. Extracts and returns the key test statistics (χ² and MAD) for each test.

    Args:
        data_series (pd.Series): The original financial data series from which to sample.
        sample_size (int): The size of the bootstrap sample to draw.
        config (Dict[str, Any]): The global study configuration object.

    Returns:
        Dict[str, float]: A flat dictionary containing the calculated test
                          statistics for this single iteration (e.g.,
                          {'BL1_chi2': 15.7, 'BL1_mad': 0.008, ...}).
    """
    # Step 1.1: Draw a sample of size `sample_size` with replacement.
    # `dropna()` is crucial to ensure we only sample from valid observations.
    bootstrap_sample = data_series.dropna().sample(n=sample_size, replace=True)

    # Step 1.2: Run the full test suite on the bootstrapped sample.
    # This reuses the powerful helper function from Task 15.
    iteration_results = _run_full_test_suite_for_variable(
        data_series=bootstrap_sample,
        config=config
    )

    # Step 1.3: Extract the key statistics into a flattened dictionary.
    stats = {}
    for test_type, results in iteration_results.items():
        stats[f'{test_type}_chi2'] = results['chi2_test']['chi_squared_stat']
        stats[f'{test_type}_mad'] = results['mad_test']['mad_statistic']

    return stats

# ==============================================================================
# Phase VI, Task 17, Step 1: Helper for Confidence Interval Calculation
# ==============================================================================
def _calculate_confidence_intervals(
    stats_list: List[float],
    confidence_level: float = 0.95
) -> Tuple[float, float]:
    """
    Calculates the confidence interval from a list of bootstrap statistics.

    This function uses the percentile method to determine the confidence
    interval from the empirical distribution of bootstrap statistics.

    Args:
        stats_list (List[float]): A list of a single statistic (e.g., MAD)
                                  from all bootstrap iterations.
        confidence_level (float): The desired confidence level (e.g., 0.95 for 95%).

    Returns:
        Tuple[float, float]: A tuple containing the lower and upper bounds of
                             the confidence interval.
    """
    # Calculate the lower and upper percentile bounds.
    lower_percentile = (1.0 - confidence_level) / 2.0 * 100
    upper_percentile = (1.0 + confidence_level) / 2.0 * 100

    # Use numpy.percentile to calculate the interval bounds.
    lower_bound, upper_bound = np.percentile(stats_list, [lower_percentile, upper_percentile])

    return lower_bound, upper_bound

# ==============================================================================
# Phase VI, Task 17, Step 1: Main Orchestrator Function
# ==============================================================================
def run_bootstrap_analysis(
    analytical_datasets: Dict[str, pd.DataFrame],
    study_config: Dict[str, Any],
    sample_sizes: List[int] = [1000, 3000, 5000],
    iterations: int = 1000
) -> Dict[str, Any]:
    """
    Orchestrates a comprehensive bootstrap resampling analysis.

    This function assesses the stability of the Benford's Law test statistics
    by repeatedly resampling from the original data. It iterates through a
    defined set of variables, tests multiple sample sizes, and runs a large
    number of bootstrap iterations for each combination. It then computes the
    mean and 95% confidence interval for each test statistic.

    Args:
        analytical_datasets (Dict[str, pd.DataFrame]): The dictionary of
            prepared analytical datasets.
        study_config (Dict[str, Any]): The global study configuration object.
        sample_sizes (List[int]): A list of sample sizes to test.
        iterations (int): The number of bootstrap iterations to perform.

    Returns:
        Dict[str, Any]: A deeply nested dictionary containing the bootstrap
                        results, structured by variable, sample size, and
                        test statistic.
    """
    # Define the primary variables to subject to bootstrap analysis.
    variables_to_test = {
        'PI': ('full_data', 'PreTaxIncome_GBP'),
        'TA': ('full_data', 'TotalAssets_GBP'),
    }

    # Initialize the master results dictionary.
    bootstrap_results = {}

    # Outer loop: Iterate through each variable to be tested.
    for var_name, (df_key, col_name) in variables_to_test.items():
        bootstrap_results[var_name] = {}
        data_series = analytical_datasets[df_key][col_name]

        # Middle loop: Iterate through each specified sample size.
        for size in sample_sizes:
            print(f"\nRunning bootstrap for '{var_name}' with sample size {size} ({iterations} iterations)...")
            bootstrap_results[var_name][size] = {}

            # Storage for the statistics from all iterations.
            iteration_stats_lists = {
                'BL1_chi2': [], 'BL1_mad': [],
                'BL2_chi2': [], 'BL2_mad': [],
                'BL12_chi2': [], 'BL12_mad': []
            }

            # Inner loop: Perform the bootstrap iterations.
            for _ in tqdm(range(iterations), desc=f"  - Iterating (n={size})"):
                # Perform one bootstrap iteration and get the stats.
                single_run_stats = _perform_single_bootstrap_iteration(data_series, size, study_config)
                # Append the stats to their respective lists.
                for key, value in single_run_stats.items():
                    if key in iteration_stats_lists:
                        iteration_stats_lists[key].append(value)

            # After all iterations, calculate CIs for each statistic.
            for key, stats_list in iteration_stats_lists.items():
                if stats_list:
                    mean_stat = np.mean(stats_list)
                    ci_lower, ci_upper = _calculate_confidence_intervals(stats_list)
                    bootstrap_results[var_name][size][key] = {
                        'mean': mean_stat,
                        '95_ci_lower': ci_lower,
                        '95_ci_upper': ci_upper
                    }

    return bootstrap_results


# ==============================================================================
# Phase VI, Task 17, Step 2: Helper for Chi-Squared Sensitivity
# ==============================================================================
def _test_chi_squared_sensitivity(
    statistic: float,
    dof: int,
    alpha_levels: List[float]
) -> Dict[float, bool]:
    """
    Tests the sensitivity of a Chi-Squared test conclusion to different alpha levels.

    For a given, fixed test statistic, this function calculates the critical
    value for several different significance levels (alphas) and determines
    whether the null hypothesis would be rejected at each level. This provides
    insight into how robust the statistical conclusion is to the choice of alpha.

    Args:
        statistic (float): The pre-calculated Chi-Squared statistic.
        dof (int): The degrees of freedom for the test.
        alpha_levels (List[float]): A list of alpha levels to test (e.g., [0.01, 0.05, 0.10]).

    Returns:
        Dict[float, bool]: A dictionary where keys are the alpha levels and
                           values are booleans indicating the rejection
                           decision (True if rejected, False if not).
    """
    # Input validation.
    if not all(0 < alpha < 1 for alpha in alpha_levels):
        raise ValueError("All alpha levels must be between 0 and 1.")

    # Initialize a dictionary to store the sensitivity results.
    sensitivity_results: Dict[float, bool] = {}

    # Iterate through each specified alpha level.
    for alpha in alpha_levels:
        # Calculate the critical value for the current alpha level using the
        # percent-point function (ppf), which is the inverse of the cdf.
        # We use (1 - alpha) because the Chi-Squared test is an upper-tailed test.
        critical_value = chi2.ppf(1 - alpha, df=dof)

        # Determine the rejection decision: the null is rejected if the
        # observed statistic is greater than the critical value for this alpha.
        sensitivity_results[alpha] = statistic > critical_value

    return sensitivity_results

# ==============================================================================
# Phase VI, Task 17, Step 2: Helper for MAD Sensitivity
# ==============================================================================
def _test_mad_sensitivity(
    statistic: float,
    base_thresholds: Dict[str, float],
    factors: List[float]
) -> Dict[float, str]:
    """
    Tests the sensitivity of a MAD conformity classification to varying thresholds.

    For a given, fixed MAD statistic, this function scales the base conformity
    thresholds by a list of factors (e.g., 0.8 for -20%, 1.2 for +20%) and
    re-evaluates the conformity classification at each new threshold level. This
    shows how the qualitative conclusion depends on the stringency of the thresholds.

    Args:
        statistic (float): The pre-calculated MAD statistic.
        base_thresholds (Dict[str, float]): The original dictionary of conformity
                                           thresholds from the study config.
        factors (List[float]): A list of multiplicative factors to apply to the
                               thresholds.

    Returns:
        Dict[float, str]: A dictionary where keys are the sensitivity factors
                          and values are the resulting conformity classifications.
    """
    # Initialize a dictionary to store the sensitivity results.
    sensitivity_results: Dict[float, str] = {}

    # Iterate through each specified sensitivity factor.
    for factor in factors:
        # Create a deep copy of the base thresholds to avoid modifying the
        # original configuration object. This is crucial for isolation.
        modified_thresholds = copy.deepcopy(base_thresholds)

        # Scale each threshold value in the copied dictionary by the current factor.
        for key in modified_thresholds:
            modified_thresholds[key] *= factor

        # Re-classify the conformity level using the new, modified thresholds.
        # This reuses the robust classification logic defined in Task 14.
        classification = _classify_mad_conformity(
            mad_statistic=statistic,
            thresholds=modified_thresholds
        )
        # Store the result, keyed by the factor used.
        sensitivity_results[factor] = classification

    return sensitivity_results

# ==============================================================================
# Phase VI, Task 17, Step 2: Main Orchestrator Function
# ==============================================================================
def run_parameter_sensitivity_analysis(
    raw_results: Dict[str, Any],
    study_config: Dict[str, Any],
    alpha_levels: List[float] = [0.10, 0.05, 0.01],
    mad_factors: List[float] = [0.8, 1.0, 1.2]
) -> Dict[str, Any]:
    """
    Orchestrates a parameter sensitivity analysis for all test results.

    This function takes the raw, pre-computed results from the main analysis
    and systematically tests how the conclusions change when key statistical
    parameters are varied. It operates on the static results, making it
    computationally efficient. It provides a comprehensive view of the
    robustness of the study's findings.

    Args:
        raw_results (Dict[str, Any]): The master dictionary of raw test results
                                      from the `compile_all_results` function.
        study_config (Dict[str, Any]): The global study configuration object,
                                       used to access base MAD thresholds.
        alpha_levels (List[float]): The significance levels to test for the χ² test.
        mad_factors (List[float]): The multiplicative factors to apply to the
                                   MAD conformity thresholds (e.g., 1.2 is +20%).

    Returns:
        Dict[str, Any]: A deeply nested dictionary containing the sensitivity
                        analysis results, structured by variable, test type,
                        and parameter variation.
    """
    # Initialize the master results dictionary for the sensitivity analysis.
    sensitivity_analysis_results: Dict[str, Any] = {}

    # Retrieve the base MAD thresholds from the configuration for easy access.
    base_mad_thresholds = study_config.get('statistical_tests', {}).get('mad', {}).get('conformity_thresholds', {})
    if not base_mad_thresholds:
        raise ValueError("MAD conformity thresholds not found in study configuration.")

    # Iterate through each variable (e.g., 'PI', 'TA') in the raw results.
    for var_name, var_tests in raw_results.items():
        sensitivity_analysis_results[var_name] = {}

        # Iterate through each test type (e.g., 'BL1', 'BL2') for the variable.
        for test_type, test_results in var_tests.items():
            sensitivity_analysis_results[var_name][test_type] = {}

            # --- Chi-Squared Sensitivity Analysis ---
            # Extract the pre-calculated results for the Chi-Squared test.
            chi2_results = test_results['chi2_test']
            # Run the sensitivity analysis for the Chi-Squared test.
            chi2_sensitivity = _test_chi_squared_sensitivity(
                statistic=chi2_results['chi_squared_stat'],
                dof=chi2_results['dof'],
                alpha_levels=alpha_levels
            )
            # Store the results.
            sensitivity_analysis_results[var_name][test_type]['chi2_sensitivity'] = chi2_sensitivity

            # --- MAD Sensitivity Analysis ---
            # Extract the pre-calculated results for the MAD test.
            mad_results = test_results['mad_test']
            # Retrieve the correct set of base thresholds for the current test type (BL1, BL2, etc.).
            current_base_thresholds = base_mad_thresholds.get(test_type)
            if not current_base_thresholds:
                raise ValueError(f"MAD thresholds for '{test_type}' not found in configuration.")
            # Run the sensitivity analysis for the MAD test.
            mad_sensitivity = _test_mad_sensitivity(
                statistic=mad_results['mad_statistic'],
                base_thresholds=current_base_thresholds,
                factors=mad_factors
            )
            # Store the results.
            sensitivity_analysis_results[var_name][test_type]['mad_sensitivity'] = mad_sensitivity

    return sensitivity_analysis_results


# ==============================================================================
# Phase VI, Task 17, Step 3: Helper for Temporal Partitioning
# ==============================================================================
def _partition_data_temporally(
    df: pd.DataFrame,
    split_year: int = 2015
) -> Dict[str, pd.DataFrame]:
    """
    Partitions a DataFrame into two temporal subsets based on a split year.

    This function leverages the 'Year' level of the DataFrame's MultiIndex to
    divide the data into an "early period" (up to and including the split year)
    and a "late period" (after the split year).

    Args:
        df (pd.DataFrame): The input DataFrame with a MultiIndex including a 'Year' level.
        split_year (int): The final year to be included in the "early period".

    Returns:
        Dict[str, pd.DataFrame]: A dictionary containing two DataFrames keyed
                                 by 'early_period' and 'late_period'.
    """
    # Input validation.
    if 'Year' not in df.index.names:
        raise ValueError("DataFrame must have a 'Year' level in its MultiIndex.")

    # Step 1.1: Access the 'Year' level from the MultiIndex.
    year_level = df.index.get_level_values('Year')

    # Step 1.2: Create a boolean mask for the early period.
    early_period_mask = year_level <= split_year

    # Step 1.3: Create a boolean mask for the late period.
    late_period_mask = year_level > split_year

    # Step 1.4: Filter the DataFrame to create the two temporal subsets.
    early_df = df[early_period_mask].copy()
    late_df = df[late_period_mask].copy()

    # Step 1.5: Return the subsets in a structured dictionary.
    return {
        'early_period': early_df,
        'late_period': late_df
    }

# ==============================================================================
# Phase VI, Task 17, Step 3: Main Orchestrator Function
# ==============================================================================
def run_temporal_stability_analysis(
    prepared_df: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates a temporal stability analysis of the Benford's Law test results.

    This function investigates whether the study's conclusions are stable over
    time by splitting the dataset into two distinct periods (e.g., 2009-2015
    and 2016-2022) and re-running the entire core analysis pipeline on each
    subset. It provides a direct comparison of the test results across time.

    Args:
        prepared_df (pd.DataFrame): The fully prepared and standardized DataFrame
                                    from Task 4.
        study_config (Dict[str, Any]): The global study configuration object.

    Returns:
        Dict[str, Any]: A dictionary containing the comprehensive test results
                        for the 'early_period' and 'late_period' separately,
                        allowing for a direct comparison of their statistical
                        properties and Benford's Law conformity.
    """
    # Initialize the master results dictionary.
    temporal_results: Dict[str, Any] = {}

    try:
        # Step 1: Partition the main dataset into early and late periods.
        print("Step 1: Partitioning data into temporal subsets...")
        temporal_subsets = _partition_data_temporally(df=prepared_df)
        print(f"  - Early period (<=2015) has {len(temporal_subsets['early_period'])} observations.")
        print(f"  - Late period (>2015) has {len(temporal_subsets['late_period'])} observations.")

        # Step 2: Iterate through each temporal subset and re-run the analysis.
        for period_name, period_df in temporal_subsets.items():
            print(f"\nRunning full analysis for: {period_name.upper()}...")

            # Step 2.1: Create the analytical variables (PI/TA, PI+, PI-) for this specific period.
            # This reuses the robust orchestrator from Task 5.
            period_analytical_datasets = create_analytical_variables(prepared_df=period_df)

            # Step 2.2: Run the entire suite of tests and compile the results for this period.
            # This reuses the master analysis and reporting orchestrator from Task 15.
            # We are interested in the raw results for comparison.
            period_full_results = compile_all_results(
                analytical_datasets=period_analytical_datasets,
                study_config=study_config
            )

            # Step 2.3: Store the raw results for this period in the master dictionary.
            temporal_results[period_name] = period_full_results['raw_results']
            print(f"  - Analysis for {period_name.upper()} complete.")

    except Exception as e:
        # Handle any errors during this complex, multi-stage process.
        print(f"An error occurred during the temporal stability analysis: {e}")
        raise

    return temporal_results


# ==============================================================================
# Phase VI, Task 17: Top-Level Orchestrator for All Robustness Analyses
# ==============================================================================
def run_full_robustness_analysis(
    analytical_datasets: Dict[str, pd.DataFrame],
    raw_results: Dict[str, Any],
    prepared_df: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the execution of the complete suite of robustness analyses.

    This top-level function serves as the single entry point for validating the
    stability and reliability of the primary research findings. It sequentially
    executes three distinct and comprehensive robustness checks:

    1.  **Bootstrap Resampling Analysis (Task 17, Step 1):** Assesses the
        stability of test statistics across different sample sizes by resampling
        the original data with replacement.

    2.  **Parameter Sensitivity Analysis (Task 17, Step 2):** Evaluates how
        the final conclusions change when key statistical parameters (alpha
        levels for Chi-Squared, conformity thresholds for MAD) are varied.

    3.  **Temporal Stability Analysis (Task 17, Step 3):** Investigates whether
        the observed patterns of non-conformity are consistent across different
        time periods by splitting the data and re-running the entire analysis.

    Args:
        analytical_datasets (Dict[str, pd.DataFrame]): The dictionary of
            prepared analytical datasets (from Task 5).
        raw_results (Dict[str, Any]): The master dictionary of raw test results
            from the primary analysis (from Task 15).
        prepared_df (pd.DataFrame): The fully prepared and standardized DataFrame
            (from Task 4), required for temporal splitting.
        study_config (Dict[str, Any]): The global study configuration object.

    Returns:
        Dict[str, Any]: A dictionary containing the comprehensive results from
                        all three robustness analyses, keyed by analysis type.
    """
    # Announce the start of the full robustness analysis phase.
    print("\n" + "="*80)
    print("Executing Full Robustness Analysis Suite (Task 17)...")
    print("="*80)

    # Initialize a dictionary to store all robustness analysis outputs.
    robustness_outputs = {}

    # --- Step 1: Bootstrap Resampling Analysis ---
    try:
        # Announce the start of this specific analysis step.
        print("\n[Step 1/3] Running Bootstrap Resampling Analysis...")
        # Execute the bootstrap analysis orchestrator.
        bootstrap_report = run_bootstrap_analysis(
            analytical_datasets=analytical_datasets,
            study_config=study_config
            # Using default iterations and sample sizes for this example.
        )
        # Store the results.
        robustness_outputs['bootstrap_analysis'] = bootstrap_report
        # Announce completion.
        print("[Step 1/3] Bootstrap Resampling Analysis complete.")
    except Exception as e:
        # Provide specific error context if this step fails.
        print(f"[Step 1/3] Bootstrap Resampling Analysis FAILED. Error: {e}")
        # Re-raise the exception to halt execution.
        raise

    # --- Step 2: Parameter Sensitivity Analysis ---
    try:
        # Announce the start of this specific analysis step.
        print("\n[Step 2/3] Running Parameter Sensitivity Analysis...")
        # Execute the parameter sensitivity analysis orchestrator.
        sensitivity_report = run_parameter_sensitivity_analysis(
            raw_results=raw_results,
            study_config=study_config
            # Using default alpha levels and MAD factors.
        )
        # Store the results.
        robustness_outputs['parameter_sensitivity_analysis'] = sensitivity_report
        # Announce completion.
        print("[Step 2/3] Parameter Sensitivity Analysis complete.")
    except Exception as e:
        # Provide specific error context if this step fails.
        print(f"[Step 2/3] Parameter Sensitivity Analysis FAILED. Error: {e}")
        # Re-raise the exception.
        raise

    # --- Step 3: Temporal Stability Analysis ---
    try:
        # Announce the start of this specific analysis step.
        print("\n[Step 3/3] Running Temporal Stability Analysis...")
        # Execute the temporal stability analysis orchestrator.
        temporal_report = run_temporal_stability_analysis(
            prepared_df=prepared_df,
            study_config=study_config
        )
        # Store the results.
        robustness_outputs['temporal_stability_analysis'] = temporal_report
        # Announce completion.
        print("[Step 3/3] Temporal Stability Analysis complete.")
    except Exception as e:
        # Provide specific error context if this step fails.
        print(f"[Step 3/3] Temporal Stability Analysis FAILED. Error: {e}")
        # Re-raise the exception.
        raise

    # Announce the successful completion of the entire robustness suite.
    print("\n" + "="*80)
    print("Full Robustness Analysis Suite Completed Successfully.")
    print("="*80)

    # Return the comprehensive dictionary of all robustness outputs.
    return robustness_outputs



In [None]:

# Task 18: Final Validation and Quality Assurance

# ==============================================================================
# Phase VI, Task 18: Helper to Codify Reference Results from Paper
# ==============================================================================
def _create_reference_results() -> Dict[str, Any]:
    """
    Codifies the key numerical results from the source paper's tables.

    This function creates a machine-readable "answer key" by meticulously
    transcribing the primary test statistics (χ² and MAD) reported in Tables
    3 through 8 of the source paper. This dictionary serves as the immutable
    ground truth against which the pipeline's calculated results are
    programmatically validated to certify the replication's accuracy.

    Returns:
        Dict[str, Any]: A nested dictionary containing the reference statistics,
                        structured as {variable: {test_type: {metric: value}}}.
    """
    # These values are transcribed directly from the specified tables in the paper.
    # This structure is designed to mirror the structure of the 'raw_results'
    # dictionary for straightforward comparison.
    reference_statistics = {
        'PI': {
            'BL1': {'chi2': 16.5706, 'mad': 0.04103},
            'BL2': {'chi2': 9.855, 'mad': 0.02741},
            'BL12': {'chi2': 116.213, 'mad': 0.09418}
        },
        'PI(+)': {
            'BL1': {'chi2': 15.7448, 'mad': 0.04664},
            'BL2': {'chi2': 10.742, 'mad': 0.03560},
            'BL12': {'chi2': 128.654, 'mad': 0.11131}
        },
        'PI(-)': {
            'BL1': {'chi2': 10.3233, 'mad': 0.07764},
            'BL2': {'chi2': 15.208, 'mad': 0.08787},
            'BL12': {'chi2': 93.989, 'mad': 0.21754}
        },
        'TA': {
            'BL1': {'chi2': 27.757, 'mad': 0.04258},
            'BL2': {'chi2': 3.6678, 'mad': 0.02057},
            'BL12': {'chi2': 26.5234, 'mad': 0.04658}
        },
        'PI/TA': {
            'BL1': {'chi2': 117.287, 'mad': 0.11404},
            'BL2': {'chi2': 18.063, 'mad': 0.04253},
            'BL12': {'chi2': 196.574, 'mad': 0.13625}
        }
    }
    # Return the completed dictionary of ground truth values.
    return reference_statistics

# ==============================================================================
# Phase VI, Task 18: Helper for Final Replication Validation
# ==============================================================================
def _validate_replication_accuracy(
    raw_results: Dict[str, Any],
    reference_results: Dict[str, Any],
    tolerance: float = 0.01
) -> Dict[str, Any]:
    """
    Validates the accuracy of the replication against reference results.

    This function systematically compares the calculated test statistics from the
    pipeline against the ground truth values transcribed from the source paper.
    It uses a relative tolerance (`math.isclose`) to robustly handle minor
    floating-point differences that can arise from different software
    environments, producing a detailed, auditable validation report.

    Args:
        raw_results (Dict[str, Any]): The master dictionary of raw test results
                                      generated by the pipeline.
        reference_results (Dict[str, Any]): The dictionary of reference values
                                            from `_create_reference_results`.
        tolerance (float): The relative tolerance to use for floating-point
                           comparisons. A value of 0.01 corresponds to 1%.

    Returns:
        Dict[str, Any]: A dictionary summarizing the validation outcome,
                        including a success rate and a detailed log of each
                        comparison.
    """
    # Initialize a list to store a log of each individual comparison.
    validation_log: List[Dict[str, Any]] = []
    # Initialize counters for calculating the success rate.
    match_count = 0
    total_count = 0

    # Iterate through the reference results to ensure every key metric is checked.
    for var, tests in reference_results.items():
        for test, metrics in tests.items():
            for metric, ref_val in metrics.items():
                # Increment the total number of metrics being checked.
                total_count += 1
                # Map the simple metric name ('chi2', 'mad') to the nested keys in the raw_results dict.
                res_key = 'chi2_test' if metric == 'chi2' else 'mad_test'
                stat_key = 'chi_squared_stat' if metric == 'chi2' else 'mad_statistic'

                # Safely retrieve the calculated value from our results using .get() to avoid KeyErrors.
                calc_val = raw_results.get(var, {}).get(test, {}).get(res_key, {}).get(stat_key)

                # Perform the comparison using a relative tolerance.
                is_match = calc_val is not None and math.isclose(calc_val, ref_val, rel_tol=tolerance)

                # Update the status and match counter based on the comparison result.
                if is_match:
                    status = "PASS"
                    match_count += 1
                else:
                    status = "FAIL"

                # Append a detailed record of this comparison to the log.
                validation_log.append({
                    'Variable': var, 'Test': test, 'Metric': metric.upper(),
                    'Reference': ref_val, 'Calculated': calc_val, 'Status': status
                })

    # Calculate the final success rate.
    success_rate = (match_count / total_count) * 100 if total_count > 0 else 0

    # Return the comprehensive report.
    return {
        'summary': {
            'total_metrics_checked': total_count,
            'metrics_matched': match_count,
            'success_rate_pct': success_rate,
            'tolerance': tolerance
        },
        'detailed_log': validation_log
    }

# ==============================================================================
# Phase VI, Task 18: Helper for Deep JSON Serialization
# ==============================================================================
def _deep_serializer(obj: Any) -> Any:
    """
    Recursively converts a potentially complex object to be JSON serializable.

    This utility function traverses nested data structures (dicts, lists) and
    converts non-native JSON types, such as pandas DataFrames, Series, and
    numpy numerical types, into their native Python equivalents (dicts, lists,
    ints, floats). This is essential for reliably saving complex research
    outputs to JSON files.

    Args:
        obj (Any): The object to be serialized.

    Returns:
        Any: A JSON-serializable representation of the object.
    """
    # If the object is a pandas DataFrame or Series, convert it to a dictionary.
    # 'split' orientation is a good choice as it preserves the index, columns, and data.
    if isinstance(obj, (pd.DataFrame, pd.Series)):
        return obj.to_dict(orient='split')
    # If the object is a numpy integer, cast it to a native Python int.
    if isinstance(obj, np.integer):
        return int(obj)
    # If the object is a numpy float, cast it to a native Python float.
    if isinstance(obj, np.floating):
        return float(obj)
    # If the object is a numpy array, convert it to a native Python list.
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # If the object is a dictionary, recursively apply the serializer to its values.
    if isinstance(obj, dict):
        return {key: _deep_serializer(value) for key, value in obj.items()}
    # If the object is a list or tuple, recursively apply the serializer to its items.
    if isinstance(obj, (list, tuple)):
        return [_deep_serializer(item) for item in obj]
    # If the object is already a native type, return it as is.
    return obj

# ==============================================================================
# Phase VI, Task 18: Main Orchestrator Function
# ==============================================================================
def package_research_outputs(
    pipeline_outputs: Dict[str, Any],
    output_directory: str
) -> None:
    """
    Performs final validation and packages all research outputs for dissemination.

    This function concludes the research pipeline, providing a final seal of
    quality and ensuring perfect reproducibility. It:
    1.  Performs a final, quantitative validation of the results against the
        source paper's published statistics.
    2.  Generates a detailed certification report based on this validation.
    3.  Saves all key outputs—data quality reports, summary statistics, final
        publication tables, raw results, and robustness analyses—to a
        specified directory in clear, accessible formats (CSV and JSON).

    Args:
        pipeline_outputs (Dict[str, Any]): The master dictionary containing all
            outputs from the `run_benford_replication_pipeline` function.
        output_directory (str): The path to the directory where all output
                                files will be saved.
    """
    # Input validation for the output directory path.
    if not isinstance(output_directory, (str, Path)):
        raise TypeError("output_directory must be a string or a Path object.")

    # Create the output directory using pathlib for robust path handling.
    # `parents=True` creates any necessary parent directories.
    # `exist_ok=True` prevents an error if the directory already exists.
    output_path = Path(output_directory)
    output_path.mkdir(parents=True, exist_ok=True)
    print(f"Packaging all outputs to directory: {output_path.resolve()}")

    try:
        # --- Step 1: Perform Final Validation and Save Report ---
        print("\nStep 1/2: Performing final validation against reference results...")
        # Create the dictionary of ground truth values from the paper.
        reference_results = _create_reference_results()
        # Perform the comparison and generate the report.
        validation_report = _validate_replication_accuracy(
            raw_results=pipeline_outputs['raw_results'],
            reference_results=reference_results
        )
        # Save the detailed validation report to a JSON file.
        with open(output_path / "replication_validation_report.json", "w") as f:
            json.dump(validation_report, f, indent=4)
        print(f"  - Validation complete. Success Rate: {validation_report['summary']['success_rate_pct']:.2f}%")
        print(f"  - Detailed validation log saved to 'replication_validation_report.json'")

        # --- Step 2: Save All Pipeline Outputs to Files ---
        print("\nStep 2/2: Saving all pipeline outputs to files...")

        # Define a nested helper function for saving JSON files using the deep serializer.
        def save_json(data: Dict, filename: str):
            """Serializes and saves a dictionary to a JSON file."""
            serializable_data = _deep_serializer(data)
            with open(output_path / filename, "w") as f:
                json.dump(serializable_data, f, indent=4)

        # Save the data quality report generated in Task 3.
        save_json(pipeline_outputs['data_quality_report'], "data_quality_report.json")

        # Save the summary statistics table (Table 1 replica) to a CSV file.
        pipeline_outputs['summary_statistics_table_1'].to_csv(output_path / "table_1_summary_statistics.csv")

        # Save the publication-ready tables (Tables 3-8) to separate CSV files.
        for name, content in pipeline_outputs['publication_tables'].items():
            # Create a file-system-friendly name (e.g., "Table 3" -> "table_3").
            slug = name.lower().replace(" ", "_")
            # Save the main body of the table.
            content['body'].to_csv(output_path / f"{slug}_body.csv")
            # Save the corresponding statistics footer.
            content['stats'].to_csv(output_path / f"{slug}_stats.csv")

        # Save the comprehensive raw results dictionary to a JSON file.
        save_json(pipeline_outputs['raw_results'], "raw_test_results.json")

        # Save the robustness analysis results, if they exist in the outputs.
        if 'robustness_analysis' in pipeline_outputs:
             save_json(pipeline_outputs['robustness_analysis'], "robustness_analysis_report.json")

        print("  - All reports and tables have been saved successfully.")
        print("\n" + "="*80)
        print("Research Output Packaging Complete.")
        print("="*80)

    except (IOError, KeyError, TypeError) as e:
        # Catch potential errors during file writing or data access.
        print(f"Failed to package research outputs. An error occurred: {e}")
        raise


In [None]:
# Top-Level Orchestrator

# ==============================================================================
# Final Task: Top-Level Project Orchestrator
# ==============================================================================
def execute_full_project_workflow(
    raw_financial_data: pd.DataFrame,
    study_configuration: Dict[str, Any],
    output_directory: str,
    run_robustness_checks: bool = True
) -> Dict[str, Any]:
    """
    Executes the entire, end-to-end research project workflow from raw data to
    final packaged and validated outputs.

    This top-level master orchestrator serves as the single entry point for
    the complete study replication. It encapsulates all phases of the project,
    ensuring a reproducible, auditable, and robust execution with a seamless
    and logically sound flow of data between each stage.

    The workflow proceeds in a sequence of major, dependent stages:
    1.  **Validation:** Rigorously validates the configuration object and the
        structure and quality of the raw input data.
    2.  **Preprocessing & Summarization:** Prepares the data by standardizing
        precision, flagging outliers, creating analytical variables (PI/TA ratio),
        segmenting the data into required subsets (PI+, PI-), and generating
        the summary statistics table (Table 1 replica).
    3.  **Core Analysis & Reporting:** Executes the full suite of Benford's Law
        tests (BL1, BL2, BL12) for all variables and compiles the results
        into high-fidelity replications of the paper's analytical tables (Tables 3-8).
    4.  **Robustness Analysis:** If enabled, performs a comprehensive suite of
        robustness checks, including bootstrap resampling, parameter sensitivity,
        and temporal stability analyses.
    5.  **Final Packaging:** Programmatically validates the replication's accuracy
        against the source paper and saves all generated artifacts to a
        specified output directory.

    Args:
        raw_financial_data (pd.DataFrame): The raw panel data of FTSE-listed
            companies, conforming to the specified input structure.
        study_configuration (Dict[str, Any]): The high-fidelity configuration
            object containing all methodological parameters for the study.
        output_directory (str): The path to the directory where all output
                                files will be saved.
        run_robustness_checks (bool): A flag to enable or disable the
            computationally intensive robustness analysis phase. Defaults to True.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all major outputs
                        of the pipeline, providing a complete in-memory record
                        of the entire project's results.
    """
    # Announce the start of the entire project workflow.
    print("="*80)
    print("INITIALIZING TOP-LEVEL PROJECT WORKFLOW")
    print("="*80)

    # Initialize a dictionary to hold all pipeline outputs as they are generated.
    pipeline_outputs: Dict[str, Any] = {}

    try:
        # --- STAGE 1: INPUT VALIDATION ---
        print("\n[STAGE 1/5] Running Input Validation...")
        # Validate the study configuration object.
        validate_study_configuration(study_config=study_configuration)
        print("  - Study configuration object is valid.")
        # Validate the raw DataFrame structure.
        validate_dataframe_structure(df=raw_financial_data, study_config=study_configuration)
        print("  - Raw DataFrame structure is valid.")
        # Perform and store the data quality validation report.
        pipeline_outputs['data_quality_report'] = validate_financial_data_quality(df=raw_financial_data)
        print("  - Financial data quality checks completed.")
        print("[STAGE 1] Input Validation Successful.")

        # --- STAGE 2: PREPROCESSING & SUMMARIZATION ---
        print("\n[STAGE 2/5] Running Data Preprocessing and Summarization...")
        # Prepare and standardize the data (creates flags, ensures precision).
        prepared_df = prepare_and_standardize_data(raw_df=raw_financial_data)
        print("  - Data standardization and flagging completed.")
        # Create analytical variables and subsets (PI/TA, PI+, PI-).
        analytical_datasets = create_analytical_variables(prepared_df=prepared_df)
        pipeline_outputs['analytical_datasets'] = analytical_datasets
        print("  - Analytical variables and subsets created.")
        # Generate and store the summary statistics table (replication of Table 1).
        summary_stats_table = generate_summary_statistics(analytical_datasets=analytical_datasets)
        pipeline_outputs['summary_statistics_table_1'] = summary_stats_table
        print("  - Summary statistics (Table 1) generated.")
        print("[STAGE 2] Preprocessing & Summarization Successful.")

        # --- STAGE 3: CORE ANALYSIS & REPORTING ---
        print("\n[STAGE 3/5] Running Core Analysis and Compiling Results...")
        # Run all tests and compile all results into final tables.
        core_analysis_results = compile_all_results(
            analytical_datasets=analytical_datasets,
            study_config=study_configuration
        )
        # Update the main outputs dictionary with the results from this stage.
        pipeline_outputs.update(core_analysis_results)
        print("  - All Benford's Law tests executed.")
        print("  - Publication tables (3-8) generated.")
        print("[STAGE 3] Core Analysis Successful.")

        # --- STAGE 4: ROBUSTNESS ANALYSIS ---
        print("\n[STAGE 4/5] Running Robustness Analysis Suite...")
        if run_robustness_checks:
            # Execute the full suite of robustness checks.
            robustness_outputs = run_full_robustness_analysis(
                analytical_datasets=analytical_datasets,
                raw_results=pipeline_outputs['raw_results'],
                prepared_df=prepared_df, # Pass the correctly prepared DF
                study_config=study_configuration
            )
            # Store the results.
            pipeline_outputs['robustness_analysis'] = robustness_outputs
            print("[STAGE 4] Robustness Analysis Successful.")
        else:
            # Skip if the flag is set to False.
            print("  - Skipped computationally intensive robustness analysis as requested.")
            print("[STAGE 4] Robustness Analysis Skipped.")

        # --- STAGE 5: FINAL VALIDATION & PACKAGING ---
        print("\n[STAGE 5/5] Running Final Validation and Packaging...")
        # Package all primary outputs for dissemination.
        package_research_outputs(
            pipeline_outputs=pipeline_outputs,
            output_directory=output_directory
        )
        print("[STAGE 5] Final Validation & Packaging Successful.")

    except Exception as e:
        # A top-level exception handler to catch any failure in the entire workflow.
        print("\n" + "="*80)
        print(f"TOP-LEVEL WORKFLOW FAILED. A critical error occurred: {e}")
        print("="*80)
        # Re-raise the exception to halt execution and provide a traceback.
        raise

    # Announce the successful completion of the entire project.
    print("\n" + "="*80)
    print("TOP-LEVEL PROJECT WORKFLOW COMPLETED SUCCESSFULLY.")
    print("="*80)

    # Return the final, comprehensive dictionary of all in-memory results.
    return pipeline_outputs
