# **`README.md`**

# **Verbal Technical Analysis: A Production-Grade Implementation**

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2511.08616-b31b1b.svg)](https://arxiv.org/abs/2511.08616)
[![Conference](https://img.shields.io/badge/Conference-ICAIF%202025-9cf)](https://ai-finance.org/)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis)
[![Discipline](https://img.shields.io/badge/Discipline-Quantitative%20Finance-00529B)](https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis)
[![Data Source](https://img.shields.io/badge/Data%20Source-StockNet-003299)](https://github.com/yumoxu/stocknet-dataset)
[![Core Method](https://img.shields.io/badge/Method-Reinforcement%20Learning-orange)](https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis)
[![Analysis](https://img.shields.io/badge/Analysis-Time--Series%20Forecasting-red)](https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![PyTorch](https://img.shields.io/badge/PyTorch-%23EE4C2C.svg?style=flat&logo=PyTorch&logoColor=white)](https://pytorch.org/)
[![Hugging Face](https://img.shields.io/badge/🤗%20Hugging%20Face-Transformers-yellow)](https://huggingface.co/transformers)
[![PEFT](https://img.shields.io/badge/PEFT-LoRA-green)](https://github.com/huggingface/peft)
[![CVXPY](https://img.shields.io/badge/CVXPY-F4B841-blue)](https://www.cvxpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Reasoning on Time-Series for Financial Technical Analysis"** by:

*   Kelvin J.L. Koa
*   Jan Chen
*   Yunshan Ma
*   Huanhuan Zheng
*   Tat-Seng Chua

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and cleansing to multi-stage model training, baseline comparison, and final evaluation of both forecasting accuracy and portfolio utility.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_vta_pipeline`](#key-callable-run_vta_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the Verbal Technical Analysis (VTA) framework presented in Koa et al. (2025). The core of this repository is the Jupyter notebook `reasoning_time_series_financial_technical_analysis_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed as a robust and scalable system for generating dual-output stock forecasts that are both numerically accurate and accompanied by a human-readable analytical narrative.

The paper's central contribution is a novel, multi-stage training methodology that teaches a Large Language Model (LLM) to perform financial technical analysis and fuses its reasoning with a dedicated time-series forecasting model. This codebase operationalizes the paper's experimental design, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Execute a multi-stage data preparation pipeline to cleanse, window, and annotate time-series data with a full suite of technical indicators.
-   Train a reasoning LLM (`πθ`) using a three-stage process: cold-start Reinforcement Learning (RL), Rejection Sampling with Supervised Fine-Tuning (SFT), and final performance-tuning RL.
-   Train a bespoke, dual-branch time-series backbone (`φ`) using a cross-modal alignment objective.
-   Train a conditional fusion model (`ψ`) that learns to guide the backbone's forecast using attributes derived from the LLM's reasoning.
-   Run a complete, end-to-end inference pipeline to generate the final dual output (forecast + narrative).
-   Train and evaluate strong baseline models (DLinear, TSMixer) under identical conditions for fair comparison.
-   Perform a comprehensive evaluation of all models on both statistical error metrics (MSE, MAE) and a realistic portfolio backtest using Markowitz optimization.
-   Conduct a full suite of sensitivity analyses to test the robustness of the results to key hyperparameter choices.

## Theoretical Background

The implemented methods are grounded in principles from deep learning, reinforcement learning, and modern portfolio theory.

**1. Time-GRPO for Reasoning:**
The reasoning LLM is trained using a novel objective called Time-Series Group Relative Policy Optimization (Time-GRPO), a variant of PPO. The policy `πθ` is optimized to maximize a reward signal derived from the accuracy of its generated forecast. The core reward is the inverse Mean Squared Error:
$$
r_{\mathrm{MSE}}(\theta) = \frac{1}{\lambda \cdot \lVert \hat{y}_\theta - y \rVert^2 + \epsilon}
$$
The policy is updated using a clipped surrogate objective with a KL penalty to maintain language fluency, based on the group-relative advantage \( A_i = (r_i - \mathrm{mean}(\{r_j\})) / (\mathrm{std}(\{r_j\}) + \epsilon) \).
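
The reward and advantage formulas above can be sketched in a few lines. This is a minimal illustration, not the notebook's implementation: the function names, the default `lam` and `eps` values, and the use of the population standard deviation are all assumptions made for the example.

```python
import statistics

def inverse_mse_reward(y_pred, y_true, lam=1.0, eps=1e-8):
    """r = 1 / (lam * ||y_pred - y_true||^2 + eps); lam and eps defaults are assumed."""
    sq_err = sum((p - t) ** 2 for p, t in zip(y_pred, y_true))
    return 1.0 / (lam * sq_err + eps)

def group_relative_advantages(rewards, eps=1e-8):
    """A_i = (r_i - mean(r)) / (std(r) + eps), computed within one sampled group."""
    mean_r = statistics.fmean(rewards)
    std_r = statistics.pstdev(rewards)  # population std; an assumption of this sketch
    return [(r - mean_r) / (std_r + eps) for r in rewards]

# One prompt, three sampled forecasts (toy numbers).
y_true = [101.0, 102.0, 103.0]
group = [
    [101.0, 102.0, 104.0],  # squared error 1.0
    [100.0, 100.0, 100.0],  # squared error 14.0
    [101.5, 102.5, 103.5],  # squared error 0.75
]
rewards = [inverse_mse_reward(y, y_true) for y in group]
advantages = group_relative_advantages(rewards)
```

Because the advantage is normalized within the group, the most accurate sample receives the largest positive advantage regardless of the absolute reward scale.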

**2. Cross-Modal Alignment:**
The time-series backbone `φ` is trained to align the numerical time-series domain with a latent language space. This is achieved via a cross-attention mechanism where the time-series embeddings `X_time` act as the query and a set of "language prototypes" `D` (derived from a base LLM's vocabulary via PCA) act as the key and value.
$$
X_{\text{text}} = \mathrm{Softmax}\left( \frac{(X_{\text{time}} W_Q) (D W_K)^T}{\sqrt{C}} \right) (D W_V)
$$
The model is trained with a dual-loss objective that encourages consistency between the temporal and aligned-textual branches.
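
As a rough illustration of the attention formula above, the following NumPy sketch maps time-series embeddings onto a small set of prototypes. The shapes, the random inputs, and the reading of `C` as the projected feature dimension are assumptions for the example; the notebook's backbone is a PyTorch module and may differ in detail.

```python
import numpy as np

def cross_modal_align(x_time, prototypes, w_q, w_k, w_v):
    """Softmax((X_time W_Q)(D W_K)^T / sqrt(C)) (D W_V), with C the projected dimension."""
    q = x_time @ w_q        # queries from the time-series embeddings, shape (N, C)
    k = prototypes @ w_k    # keys from the language prototypes, shape (P, C)
    v = prototypes @ w_v    # values from the language prototypes, shape (P, C)
    scores = q @ k.T / np.sqrt(q.shape[-1])
    scores -= scores.max(axis=-1, keepdims=True)  # subtract row max for numerical stability
    weights = np.exp(scores)
    weights /= weights.sum(axis=-1, keepdims=True)
    return weights @ v, weights

rng = np.random.default_rng(0)
N, P, C = 4, 6, 8  # tokens, prototypes, feature dimension (toy sizes)
x_time = rng.normal(size=(N, C))
prototypes = rng.normal(size=(P, C))
w_q, w_k, w_v = (rng.normal(size=(C, C)) / np.sqrt(C) for _ in range(3))
x_text, attn = cross_modal_align(x_time, prototypes, w_q, w_k, w_v)
```

Each row of `attn` is a probability distribution over prototypes, so every aligned embedding in `x_text` is a convex combination of the projected prototypes.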

**3. Classifier-Free Guidance for Fusion:**
The final forecast is a blend of the unconditional prediction from the backbone `ŷ_φ(X)` and a conditional prediction `ŷ_ψ(X, c)` that is guided by attributes `c` from the LLM's reasoning. The fusion model `ψ` is trained with random dropping of the conditioning vector `c`. At inference, two forward passes are performed (one with `c`, one without) and the results are blended:
$$
\hat{y} = \hat{y}_\phi(X) + s \cdot \big( \hat{y}_\psi(X, c) - \hat{y}_\phi(X) \big)
$$
where `s` is the guidance scale.
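
The blending rule is a one-line interpolation. The sketch below applies it to toy forecasts; the function name `cfg_blend` and the numbers are illustrative only.

```python
def cfg_blend(y_uncond, y_cond, guidance_scale):
    """y = y_uncond + s * (y_cond - y_uncond), applied element-wise over the horizon."""
    return [u + guidance_scale * (c - u) for u, c in zip(y_uncond, y_cond)]

y_uncond = [100.0, 101.0, 102.0]  # forecast without the conditioning vector c
y_cond = [100.0, 102.0, 104.0]    # forecast guided by the reasoning attributes c

blended_half = cfg_blend(y_uncond, y_cond, 0.5)
blended_zero = cfg_blend(y_uncond, y_cond, 0.0)  # s = 0 recovers the unconditional forecast
blended_one = cfg_blend(y_uncond, y_cond, 1.0)   # s = 1 recovers the conditional forecast
```

Values of `s` above 1 extrapolate beyond the conditional forecast, which is how guidance can amplify the influence of the reasoning attributes.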

**4. Markowitz Portfolio Optimization:**
To assess financial utility, a daily-rebalanced portfolio is constructed by solving the Markowitz mean-variance optimization problem. The model's multi-step price forecasts are used to derive the expected returns `μ` and the covariance matrix `Σ` of the assets. The optimizer finds the weights `w` that maximize the risk-adjusted return:
$$
\max_{w} \; \mu_t^\top w - \frac{\gamma}{2} w^\top \Sigma_t w \quad \text{subject to} \quad w \succeq 0, \; \mathbf{1}^\top w = 1
$$
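
To make the optimization concrete without extra dependencies, the sketch below solves the same long-only, fully-invested problem with projected gradient ascent in plain NumPy. This is a substitute technique chosen for the illustration, not the `cvxpy` solver the repository uses, and the inputs, `gamma`, and iteration count are assumed toy values.

```python
import numpy as np

def project_to_simplex(v):
    """Euclidean projection of v onto {w : w >= 0, sum(w) = 1}."""
    u = np.sort(v)[::-1]
    css = np.cumsum(u)
    k = np.arange(1, len(v) + 1)
    rho = np.nonzero(u * k > css - 1.0)[0][-1]
    theta = (css[rho] - 1.0) / (rho + 1)
    return np.maximum(v - theta, 0.0)

def utility(w, mu, sigma, gamma):
    """Mean-variance objective: mu^T w - (gamma / 2) w^T Sigma w."""
    return float(mu @ w - 0.5 * gamma * w @ sigma @ w)

def markowitz_weights(mu, sigma, gamma=5.0, n_iter=2000):
    """Projected gradient ascent with step 1/L, where L = gamma * lambda_max(Sigma)."""
    w = np.full(len(mu), 1.0 / len(mu))  # start from equal weights
    step = 1.0 / (gamma * np.linalg.eigvalsh(sigma).max())
    for _ in range(n_iter):
        w = project_to_simplex(w + step * (mu - gamma * sigma @ w))
    return w

mu = np.array([0.0010, 0.0004, 0.0007])  # toy expected daily returns
sigma = np.array([[4e-4, 1e-4, 0.0],
                  [1e-4, 2e-4, 0.0],
                  [0.0, 0.0, 3e-4]])     # toy covariance matrix
w_opt = markowitz_weights(mu, sigma, gamma=5.0)
w_equal = np.full(3, 1.0 / 3.0)
```

A dedicated convex solver is the better choice in practice, since it handles additional constraints and reports solver status; the loop here only shows what the objective and constraints mean.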

## Features

The provided Jupyter notebook (`reasoning_time_series_financial_technical_analysis_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 15 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All hyperparameters and settings are managed in an external `config.yaml` file.
-   **High-Performance Feature Engineering:** Includes a complete, from-scratch implementation of 10 technical indicators using vectorized `numpy` and JIT-compiled `numba` for C-level speed.
-   **Resumable Pipeline:** The master orchestrator implements atomic artifact management, allowing the long-running pipeline to be stopped and resumed without re-running expensive completed stages.
-   **Production-Grade Training:** Implements best practices for RL and deep learning, including a robust PPO-style loop, SFT with the `transformers` Trainer, validation-based checkpointing, and gradient clipping.
-   **Rigorous Financial Backtesting:** Implements a daily rebalancing portfolio backtest with a professional-grade `cvxpy` optimizer and covariance matrix regularization.
-   **Complete Replication and Robustness:** A single top-level function call can execute the entire study, including a comprehensive suite of sensitivity analyses.
-   **Full Provenance:** The pipeline generates a unique run directory for each experiment, containing a detailed log file, a copy of the exact configuration used, and all generated artifacts for full reproducibility.
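
The resumable-pipeline idea can be sketched with the standard library alone: write each artifact to a temporary file, rename it into place atomically, and skip any stage whose artifact already exists. The helper names `save_artifact_atomically` and `run_or_load`, and the artifact file name, are hypothetical; the notebook's orchestrator may be organized differently.

```python
import os
import pickle
import tempfile

def save_artifact_atomically(obj, path):
    """Write to a temp file in the target directory, then rename it over `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(obj, f)
        os.replace(tmp_path, path)  # atomic rename: readers never see a partial file
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

def run_or_load(path, compute):
    """Reuse a finished stage's artifact if present; otherwise compute and save it."""
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = compute()
    save_artifact_atomically(result, path)
    return result

calls = []

def expensive_task():
    calls.append(1)  # track how often the stage actually runs
    return {"windows": 3}

with tempfile.TemporaryDirectory() as run_dir:
    artifact = os.path.join(run_dir, "artifacts", "task_3_outputs.pkl")
    first = run_or_load(artifact, expensive_task)   # computes and saves
    second = run_or_load(artifact, expensive_task)  # loads from disk
```

Writing to a temporary file first means an interrupted run leaves either the previous complete artifact or none at all, so a resumed run never loads a truncated pickle.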

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Preparation (Tasks 1-5):** Ingests and validates all raw inputs, cleanses the market data, creates memory-efficient sliding windows, computes a full suite of technical indicators, and assembles the final prompts.
2.  **Reasoning Model Training (Tasks 6-8):** Executes the three-stage RL and SFT pipeline to train the reasoning model `πθ`.
3.  **Backbone Model Training (Task 9):** Trains the dual-branch forecasting backbone `φ` with the cross-modal alignment objective.
4.  **Fusion Model Training (Task 10):** Trains the conditional fusion model `ψ` using classifier-free guidance.
5.  **Inference (Task 11):** Runs the complete, three-model pipeline to generate the final dual outputs (forecast + narrative).
6.  **Baseline Evaluation (Task 12):** Trains and evaluates DLinear and TSMixer under identical conditions.
7.  **Final Evaluation (Task 13):** Computes all final error metrics and portfolio performance metrics for all models and generates comparison tables.
8.  **Robustness Analysis (Task 15):** Systematically re-runs the pipeline with varied hyperparameters to test for sensitivity.

## Core Components (Notebook Structure)

The `reasoning_time_series_financial_technical_analysis_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 15 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_vta_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`run_vta_pipeline`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline end to end. A single call to this function reproduces the entire computational portion of the project.

## Prerequisites

-   Python 3.9+
-   A CUDA-enabled GPU is highly recommended for all model training stages.
-   Core dependencies: `pandas`, `numpy`, `pyyaml`, `torch`, `transformers`, `peft`, `numba`, `scikit-learn`, `cvxpy`, `exchange_calendars`, `tqdm`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis.git
    cd reasoning_time_series_financial_technical_analysis
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install -r requirements.txt
    ```

## Input Data Structure

The pipeline requires a primary `market_data_df` with a specific schema, which is rigorously validated. A synthetic data generator is included in the notebook for a self-contained demonstration.

-   **`market_data_df`**: A `pandas.DataFrame` with a `MultiIndex` of `['date', 'ticker']`.
    -   **Index:**
        -   `date`: `datetime64[ns]`
        -   `ticker`: `object` (string)
    -   **Columns:**
        -   `Open`, `High`, `Low`, `Close`, `Adj Close`: `float64`
        -   `Volume`: `int64` or `float64`

All other parameters are controlled by the `config.yaml` file.
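
For orientation, a frame matching the schema above can be built as follows. This is only a toy construction with made-up prices, not the notebook's synthetic data generator; the ticker names are borrowed from the usage example and the date range is arbitrary.

```python
import numpy as np
import pandas as pd

dates = pd.bdate_range("2024-01-02", periods=5)  # five business days
tickers = ["SYNTH_A", "SYNTH_B"]
index = pd.MultiIndex.from_product([dates, tickers], names=["date", "ticker"])

rng = np.random.default_rng(0)
close = 100.0 + rng.normal(0.0, 1.0, size=len(index)).cumsum()  # toy price path

market_data_df = pd.DataFrame(
    {
        "Open": close - 0.5,
        "High": close + 1.0,
        "Low": close - 1.0,
        "Close": close,
        "Adj Close": close,
        "Volume": rng.integers(1_000, 10_000, size=len(index)),
    },
    index=index,
)
```

Selecting one ticker with `market_data_df.xs("SYNTH_A", level="ticker")` returns a date-indexed OHLCV frame, which is the per-asset view that the windowing steps operate on.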

## Usage

The `reasoning_time_series_financial_technical_analysis_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `main` orchestrator:

```python
# Final cell of the notebook

import yaml  # PyYAML; required for yaml.safe_load below

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Define paths and parameters.
    CONFIG_PATH = "./config.yaml"
    DATA_PATH = "./synthetic_market_data.csv"
    
    # 2. Load configuration from the YAML file.
    with open(CONFIG_PATH, 'r') as f:
        config = yaml.safe_load(f)
    
    # 3. Define necessary data mappings, one entry per ticker.
    #    The remaining tickers are elided in this listing (...).
    TICKER_TO_MARKET_MAP = {'SYNTH_A': 'US', 'SYNTH_B': 'US'}
    TICKER_TO_DATASET_MAP = {'SYNTH_A': 'StockNet', 'SYNTH_B': 'StockNet'}
    
    # 4. Execute the entire replication study in dry-run mode for a quick test.
    #    Set dry_run=False for a full run.
    final_results = main(
        market_data_path=DATA_PATH,
        config=config,
        ticker_to_market_map=TICKER_TO_MARKET_MAP,
        ticker_to_dataset_map=TICKER_TO_DATASET_MAP,
        dataset_name="StockNet",
        base_run_id="vta_replication",
        run_sensitivity=False,
        dry_run=True
    )
    
    # 5. Inspect final results.
    print("--- PIPELINE EXECUTION SUCCEEDED ---")
```

## Output Structure

The pipeline generates a structured `results/` directory. Each call to the master orchestrator creates a unique run directory:
-   **`results/<run_id>/`**: Contains all artifacts for a specific run.
    -   `artifacts/`: Pickled Python objects for each major task's output (e.g., `task_6_outputs.pkl`).
    -   `models/`: Saved model checkpoints for each training stage (e.g., `stage3_final_lora/`).
    -   `config.yaml`: An exact copy of the configuration used for this run.
    -   `pipeline.log`: A detailed log file for the run.
-   **`results/<base_run_id>_sensitivity_analysis_summary.csv`**: If sensitivity analysis is run, this master table summarizes the results.

## Project Structure

```
reasoning_time_series_financial_technical_analysis/
│
├── reasoning_time_series_financial_technical_analysis_draft.ipynb
├── config.yaml
├── requirements.txt
│
├── results/
│   ├── vta_replication_baseline_dry_run/
│   │   ├── artifacts/
│   │   ├── models/
│   │   ├── config.yaml
│   │   └── pipeline.log
│   └── ...
│
├── LICENSE
└── README.md
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify all study parameters, including model identifiers, learning rates, architectural details, and technical indicator settings, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

An ablation study was proposed but not implemented. A key extension would be to implement this analysis to quantify the contribution of each component of the VTA framework. This would involve:
-   Creating a meta-orchestrator similar to the sensitivity analysis.
-   Programmatically creating modified configurations for each ablation scenario (e.g., setting `guidance_scale = 0`, using a simplified `c` vector).
-   Running the pipeline for each ablation and comparing the final performance metrics against the full VTA model.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{koa2025reasoning,
  title={Reasoning on Time-Series for Financial Technical Analysis},
  author={Koa, Kelvin J.L. and Chen, Jan and Ma, Yunshan and Zheng, Huanhuan and Chua, Tat-Seng},
  journal={arXiv preprint arXiv:2511.08616},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Production-Grade Implementation of "Reasoning on Time-Series for Financial Technical Analysis".
GitHub repository: https://github.com/chirindaopensource/reasoning_time_series_financial_technical_analysis
```

## Acknowledgments

-   Credit to **Kelvin J.L. Koa, Jan Chen, Yunshan Ma, Huanhuan Zheng, and Tat-Seng Chua** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **PyTorch, Hugging Face (Transformers, PEFT), Pandas, NumPy, Numba, CVXPY, and Scikit-learn**.

---

*This README was generated based on the structure and content of the `reasoning_time_series_financial_technical_analysis_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*Reasoning on Time-Series for Financial Technical Analysis*"

Authors: Kelvin J.L. Koa, Jan Chen, Yunshan Ma, Huanhuan Zheng, Tat-Seng Chua

E-Journal Submission Date: 6 November 2025

Conference Affiliation: International Conference on AI in Finance (ICAIF) 2025 - Best Paper

Link: https://arxiv.org/abs/2511.08616

Abstract:

While Large Language Models have been used to produce interpretable stock forecasts, they mainly focus on analyzing textual reports but not historical price data, also known as Technical Analysis. This task is challenging as it switches between domains: the stock price inputs and outputs lie in the time-series domain, while the reasoning step should be in natural language. In this work, we introduce Verbal Technical Analysis (VTA), a novel framework that combines verbal and latent reasoning to produce stock time-series forecasts that are both accurate and interpretable. To reason over time-series, we convert stock price data into textual annotations and optimize the reasoning trace using an inverse Mean Squared Error (MSE) reward objective. To produce time-series outputs from textual reasoning, we condition the outputs of a time-series backbone model on the reasoning-based attributes. Experiments on stock datasets across U.S., Chinese, and European markets show that VTA achieves state-of-the-art forecasting accuracy, while the reasoning traces also perform well on evaluation by industry experts.

# Summary

### **Professor's Summary: Koa et al., "Reasoning on Time-Series for Financial Technical Analysis"**

#### **The Core Problem and Research Gap**

The authors begin by correctly identifying a significant gap in the application of Large Language Models (LLMs) to finance. While LLMs like BloombergGPT have proven adept at processing textual data (such as news, sentiment, and financial reports), they have largely neglected the domain of **Technical Analysis**.

Technical Analysis is the art and science of forecasting future price movements based on historical price and volume data. This is a fundamentally different task from text processing. It involves a challenging **domain switch**:

1.  **Input:** Numerical time-series data (stock prices, volume, etc.).
2.  **Process:** Human-like reasoning based on established technical indicators (e.g., Moving Averages, RSI, Bollinger Bands). This reasoning is expressed in natural language.
3.  **Output:** A numerical time-series forecast.

The paper posits that existing models fail here. Traditional time-series models are often "black boxes" that lack interpretability, while direct application of LLMs to numerical forecasting is suboptimal and loses the model's core language capabilities.

#### **The Proposed Solution: Verbal Technical Analysis (VTA)**

To address this, the authors introduce a novel framework called **Verbal Technical Analysis (VTA)**. The core idea is to create a hybrid, dual-process system that mimics a human analyst working with a quantitative model. It elegantly separates the task into two components:

1.  **Verbal Reasoning ("The Analyst"):** An LLM is trained to analyze the time-series data and produce an interpretable, natural language reasoning trace that explains its forecast.
2.  **Latent Thinking ("The Quant"):** A powerful, LLM-based time-series forecasting model generates the actual numerical predictions.

The key innovation is not just having these two components, but making them work synergistically. The verbal reasoning from the "analyst" is used to guide and improve the numerical output of the "quant," making the final forecast both more accurate and fully explainable.

#### **The Methodological Breakdown**

The VTA framework is implemented through a sophisticated three-part methodology.

**Part A: Teaching the LLM to Reason (Verbal Reasoning)**

This is perhaps the most novel part of the paper. How do you teach an LLM to perform technical analysis without a massive dataset of human analysts' thoughts?

1.  **Input Annotation:** The raw time-series data (price, volume) is first converted into a textual format the LLM can understand. This involves calculating key statistics (min, max, mean) and a suite of standard **technical indicators** (SMA, EMA, Momentum, etc.). This annotated text serves as the prompt for the LLM.
2.  **Reinforcement Learning for Optimization:** The LLM is then fine-tuned using a Reinforcement Learning (RL) objective called **Time-GRPO**. The crucial element here is the **reward function**. Instead of rewarding for stylistic correctness, the model is rewarded based on the **accuracy of the forecast it implies**. Specifically, the reward is the *inverse Mean Squared Error (MSE)*. This brilliantly incentivizes the LLM to generate reasoning that leads to quantitatively accurate predictions.
3.  **Multi-Stage Training:** This RL process is conducted in stages (cold-start, rejection sampling, and final optimization) to ensure the model learns effectively and produces high-quality reasoning.
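
The input-annotation step can be pictured as turning a price window into a short text block of statistics and indicators. The sketch below is a simplified stand-in: the prompt wording, the helper names, and the choice of three indicators with a window of 5 are assumptions, whereas the notebook computes a larger indicator suite and uses its own prompt template.

```python
def sma(values, window):
    """Simple moving average over the last `window` observations."""
    return sum(values[-window:]) / window

def ema(values, span):
    """Exponential moving average with smoothing factor 2 / (span + 1)."""
    alpha = 2.0 / (span + 1.0)
    level = values[0]
    for v in values[1:]:
        level = alpha * v + (1.0 - alpha) * level
    return level

def momentum(values, lag):
    """Price change over the last `lag` steps."""
    return values[-1] - values[-1 - lag]

def annotate_window(ticker, closes, window=5):
    """Render summary statistics and indicators as plain text for an LLM prompt."""
    stats = {
        "min": min(closes),
        "max": max(closes),
        "mean": sum(closes) / len(closes),
        f"SMA({window})": sma(closes, window),
        f"EMA({window})": ema(closes, window),
        f"Momentum({window})": momentum(closes, window),
    }
    lines = [
        f"Ticker: {ticker}",
        "Closing prices: " + ", ".join(f"{c:.2f}" for c in closes),
    ]
    lines += [f"{name}: {value:.2f}" for name, value in stats.items()]
    return "\n".join(lines)

closes = [100.0, 101.0, 102.0, 101.5, 103.0, 104.0, 103.5, 105.0]
prompt = annotate_window("SYNTH_A", closes)
```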

**Part B: The Backbone Forecasting Model (Latent Thinking)**

For the numerical forecasting, the authors employ a state-of-the-art LLM-based time-series model (specifically, one similar to CALF). This model works by aligning the embedding space of the time-series data with the LLM's internal word embedding space. This allows the powerful transformer architecture, originally designed for language, to effectively capture complex temporal patterns in the numerical data. On its own, this model is powerful but not interpretable.

**Part C: Fusing Reasoning and Forecasting (Joint Conditional Training)**

This is the final, critical step where the two components are integrated.

1.  **Attribute Extraction:** The system takes the verbal reasoning trace generated by the LLM in Part A and extracts key descriptive attributes (e.g., predicted max value, min value, mean value).
2.  **Conditional Forecasting:** The numerical forecasting model from Part B is then conditioned on these attributes. In essence, the model is asked to produce a forecast *given that the reasoning trace is true*.
3.  **Classifier-Free Guidance:** Drawing inspiration from image generation models, the final forecast is a weighted average of the *conditional* forecast (guided by the reasoning) and an *unconditional* forecast (the model's raw prediction). A "guidance scale" parameter controls how much influence the verbal reasoning has on the final output. This joint training preserves both the raw accuracy of the forecasting model and the interpretability from the reasoning model.
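
Attribute extraction, the first step above, amounts to parsing the forecast values out of the reasoning trace and summarizing them. The following sketch assumes a hypothetical trace format in which the forecast appears on a line starting with `Prediction:`; the actual parsing regex is defined in the project's prompt-template configuration and may differ.

```python
import re

NUMBER = re.compile(r"-?\d+(?:\.\d+)?")

def extract_attributes(trace, tag="Prediction:"):
    """Return max/min/mean of the forecast values on the tagged line, or None."""
    for line in trace.splitlines():
        stripped = line.strip()
        if stripped.startswith(tag):
            values = [float(m) for m in NUMBER.findall(stripped[len(tag):])]
            if values:
                return {
                    "max": max(values),
                    "min": min(values),
                    "mean": sum(values) / len(values),
                }
    return None

trace = (
    "RSI is rising and price holds above the 5-day SMA.\n"
    "Prediction: 104.0, 105.5, 106.0"
)
attributes = extract_attributes(trace)
missing = extract_attributes("No forecast was produced.")
```

Only the tagged line is parsed, so numbers that appear in the narrative itself (such as "5-day") do not leak into the conditioning vector. Returning `None` for unparseable traces lets the caller fall back to the unconditional forecast.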

#### **Experimental Validation and Results**

The authors conduct a rigorous and comprehensive evaluation across multiple dimensions.

1.  **Forecasting Accuracy:** On several stock market datasets (StockNet, Dow Jones, China A50, etc.), VTA achieves **state-of-the-art performance**, outperforming 14 other models, including traditional time-series transformers and other time-series LLMs. It consistently yields lower MSE and MAE.
2.  **Reasoning Quality:** This is a crucial test of their "interpretability" claim. They surveyed 25 financial industry experts (from firms like JPMorgan and UBS), who blindly rated the reasoning traces from VTA against those from other powerful LLMs. VTA's outputs were rated significantly higher across all criteria, especially in **Depth, Accuracy, and Relevance**, the metrics most tied to genuine technical expertise.
3.  **Practical Utility (Portfolio Optimization):** To demonstrate real-world value, the forecasts from VTA were used to construct **Markowitz-optimized portfolios**. The resulting portfolio achieved the **highest Sharpe Ratio** among all tested models, indicating superior risk-adjusted returns. This is a strong validation from a practical, econometric perspective.

#### **Conclusion and Professor's Critique**

**Conclusion:** The paper successfully demonstrates that a hybrid framework combining verbal reasoning and latent time-series forecasting can achieve superior results in both accuracy and interpretability for financial technical analysis. The Time-GRPO objective with an inverse MSE reward is a clever mechanism for training quantitative reasoning without direct supervision.

**Critique and Outlook:**

*   **Strengths:** The dual-process "Analyst + Quant" paradigm is conceptually elegant and powerful. The methodology is technically sophisticated, particularly the use of a forecast-accuracy-based reward signal in the RL loop and the classifier-free guidance for conditioning. The three-pronged evaluation (accuracy, expert review, portfolio performance) is exceptionally thorough.
*   **Potential Limitations:** The model's reasoning is constrained by the initial set of technical indicators provided in the annotation step. It can reason about them, but it cannot invent new ones. Furthermore, the system's performance during unprecedented market events or "black swans" remains an open question, as its knowledge is derived from historical patterns.
*   **Future Directions:** This work opens up exciting avenues. The framework could be extended to incorporate multi-modal data, such as news text and sentiment, into the reasoning process. It could also be applied to other domains where numerical data requires interpretable reasoning, such as medical chart analysis or economic forecasting.

In summary, Koa et al. have produced a first-rate piece of research that represents a significant step towards creating true hybrid intelligence in quantitative finance: systems that not only predict accurately but can also explain *why* they are making those predictions in a manner that a human expert can trust and verify.

# Import Essential Modules


#!/usr/bin/env python3
# ==============================================================================
#
#  Verbal Technical Analysis (VTA): A Framework for Self-Explaining Financial Forecasting
#
#  This module provides a complete, production-grade implementation of the
#  Verbal Technical Analysis (VTA) framework presented in "Reasoning on Time-Series
#  for Financial Technical Analysis" by Koa et al. (2025). It delivers a novel,
#  hybrid system that fuses the pattern recognition capabilities of deep learning
#  time-series models with the reasoning and natural language generation abilities
#  of Large Language Models (LLMs). The result is a dual-output model that produces
#  both a state-of-the-art numerical forecast and a coherent, data-driven analytical
#  narrative explaining the forecast's rationale.
#
#  Core Methodological Components:
#  • A three-stage Reinforcement Learning (RL) pipeline (Time-GRPO) to train an
#    LLM to perform financial technical analysis from numerical data.
#  • A dual-branch, cross-modal transformer backbone for deep time-series feature extraction.
#  • A Classifier-Free Guidance (CFG) mechanism to fuse the LLM's high-level
#    reasoning with the backbone's numerical prediction.
#  • A complete, end-to-end experimental harness for data processing, multi-stage
#    model training, baseline comparison, and evaluation.
#
#  Technical Implementation Features:
#  • High-performance, vectorized, and JIT-compiled technical indicator calculations.
#  • Parameter-Efficient Fine-Tuning (PEFT) using Low-Rank Adaptation (LoRA).
#  • A robust, resumable pipeline with atomic artifact management for long-running experiments.
#  • A professional-grade backtesting engine using Markowitz portfolio optimization
#    to assess the practical financial utility of model forecasts.
#  • Comprehensive hyperparameter sensitivity analysis framework (model ablation
#    analysis is proposed but not yet implemented).
#
#  Paper Reference:
#  Koa, K. J. L., Chen, J., Ma, Y., Zheng, H., & Chua, T.-S. (2025).
#  Reasoning on Time-Series for Financial Technical Analysis.
#  arXiv preprint arXiv:2511.08616. https://arxiv.org/abs/2511.08616
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================
# ==============================================================================
# Fused Imports for the Complete VTA Pipeline
# ==============================================================================

# ------------------------------------------------------------------------------
# Standard Library Imports
# ------------------------------------------------------------------------------

import argparse
import copy
import logging
import os
import pickle
import random
import re
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import datetime
from typing import (Any, Dict, List, Literal, NamedTuple, Optional, Tuple,
                    Union)

# ------------------------------------------------------------------------------
# Third-Party Imports
# ------------------------------------------------------------------------------

import cvxpy as cp
import numba
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.decomposition import PCA
from torch.utils.data import DataLoader, Dataset, TensorDataset
from tqdm import tqdm
from transformers import (AutoModel, AutoModelForCausalLM, AutoTokenizer,
                          DataCollatorForLanguageModeling, GenerationConfig,
                          PreTrainedModel, PreTrainedTokenizer, Trainer,
                          TrainingArguments)

# `exchange_calendars` is an optional dependency for data cleansing.
try:
    import exchange_calendars as xcals
except ImportError:
    xcals = None

# `peft` is a core dependency for LoRA.
try:
    from peft import LoraConfig, PeftModel, get_peft_model
except ImportError:
    LoraConfig, PeftModel, get_peft_model = None, None, None


# Implementation

## Draft 1

### **Analysis of Final Orchestrator Callables**

#### **Callable: `validate_inputs_and_config` (Task 1)**

*   **Inputs:**
    1.  `market_data_df`: A raw `pandas.DataFrame` with a potential `MultiIndex` and OHLCV columns.
    2.  `config`: The master `VTA_MASTER_CONFIG` dictionary containing all project hyperparameters and schemas.

*   **Process:**
    1.  The function sequentially invokes a series of specialized validation helpers.
    2.  It first validates the structural integrity of `market_data_df`: presence and correctness of the `['date', 'ticker']` `MultiIndex`, column names, and data types.
    3.  It then validates the numerical and chronological integrity: checking for non-positive/non-finite values and ensuring dates are monotonically increasing for each ticker.
    4.  It recursively traverses the `config` dictionary to ensure no `"REQUIRED_"` placeholders remain, guaranteeing the experiment is fully specified.
    5.  It performs cross-consistency checks on key parameters within the `config` (e.g., ensuring `T` and `T'` are consistent across different sections).
    6.  It validates the `prompt_template` configuration, ensuring all necessary placeholders exist and the regex for parsing predictions is valid.
    7.  If any validation check fails, it aggregates all errors and raises a single, comprehensive `ValueError`.

*   **Outputs:**
    *   `None`. The function returns nothing upon success. Its purpose is to act as a gatekeeper, halting execution via an exception if inputs are invalid.

*   **Data Transformation:** This function performs no data transformation. It is a pure validation and assertion function that inspects the inputs without modifying them.

*   **Role in Research Pipeline:** This callable is the foundational **Pre-flight Check** of the entire experiment. It ensures that the raw data and the experimental configuration are perfectly aligned with the assumptions and requirements of all subsequent stages. It programmatically enforces the structural integrity required to instantiate the mathematical objects of the study, such as the time-series window \(X\).
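
The recursive placeholder check in step 4 can be sketched as follows. This is a minimal illustration, not the project's actual helper: the function name `find_required_placeholders`, the example config keys, and the assumption that a placeholder is any string value starting with `"REQUIRED_"` are all hypothetical.

```python
from collections.abc import Mapping
from typing import Any, List


def find_required_placeholders(node: Any, path: str = "config") -> List[str]:
    """Return the paths of all string values that still start with 'REQUIRED_'."""
    hits: List[str] = []
    if isinstance(node, Mapping):
        # Recurse into nested dictionaries, extending the dotted path.
        for key, value in node.items():
            hits.extend(find_required_placeholders(value, f"{path}.{key}"))
    elif isinstance(node, (list, tuple)):
        # Recurse into sequences, recording the element index.
        for i, value in enumerate(node):
            hits.extend(find_required_placeholders(value, f"{path}[{i}]"))
    elif isinstance(node, str) and node.startswith("REQUIRED_"):
        hits.append(path)
    return hits


# Hypothetical config fragment, for illustration only.
example_config = {
    "data": {"T": 10, "T_prime": 10},
    "model": {"base_llm": "REQUIRED_MODEL_NAME", "lora": {"r": 8}},
    "splits": [{"name": "StockNet", "cutoff": "REQUIRED_DATE"}],
}

unresolved = find_required_placeholders(example_config)
if unresolved:
    # Aggregate every problem into one message, in the spirit of the single
    # ValueError described in step 7.
    print("Unresolved placeholders: " + ", ".join(unresolved))
```

Collecting paths rather than raising on the first hit lets the caller report every unresolved field in one pass.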



#### **Callable: `cleanse_and_prepare_data` (Task 2)**

*   **Inputs:**
    1.  `raw_market_data_df`: The structurally validated `DataFrame` from Task 1.
    2.  `ticker_to_market_map`: A dictionary mapping tickers to their exchange market (e.g., `'AAPL': 'US'`).

*   **Process:**
    1.  The function executes a three-step cleansing pipeline in a strict sequence.
    2.  **Invalid Data Removal:** It first creates boolean masks to identify and remove any rows containing non-finite, non-positive prices or invalid volume figures.
    3.  **Consistency Enforcement:** It then applies another set of masks to remove rows with logically inconsistent OHLC data (e.g., `Low > High`).
    4.  **Calendar Alignment:** Finally, it uses the `exchange_calendars` library to identify and remove any rows corresponding to non-trading days (weekends, market holidays) for each ticker's specific market.

*   **Outputs:**
    1.  `cleansed_df`: A new `pandas.DataFrame` containing only valid, consistent, chronologically sorted data on official trading days.
    2.  `reports`: A dictionary of `DataFrames`, providing a detailed audit trail of how many rows were dropped at each stage and for what reason.

*   **Data Transformation:** This function transforms the raw, potentially noisy `DataFrame` into a pristine, analysis-ready `DataFrame`. The transformation is subtractive; it filters out invalid rows, reducing the number of samples while increasing the quality and reliability of the dataset.

*   **Role in Research Pipeline:** This callable implements the **Data Sanitization** phase. Its purpose is to ensure the time-series data is free from corruption and temporal inconsistencies. This is a critical prerequisite for the windowing process in Task 3, guaranteeing that every constructed window \(X\) consists of a clean, unbroken sequence of valid trading days.
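
The first two cleansing steps can be illustrated with plain boolean masks. This is a sketch under stated assumptions: the toy values, the report keys, and the exact validity rules (strictly positive finite prices, non-negative volume) are illustrative and may differ from the project's helpers. Calendar alignment is omitted.

```python
import numpy as np
import pandas as pd

# Toy frame with three deliberately corrupted rows (hypothetical values).
df = pd.DataFrame(
    {
        "Open": [10.0, 11.0, 12.0, 13.0],
        "High": [10.5, 11.5, 11.0, 13.5],      # row 2: High below Open
        "Low": [9.5, -1.0, 10.5, 12.5],        # row 1: non-positive price
        "Close": [10.2, 11.2, 10.8, np.inf],   # row 3: non-finite price
        "Volume": [1000, 2000, 3000, 4000],
    }
)
price_cols = ["Open", "High", "Low", "Close"]

# Mask 1: every price is finite and strictly positive; volume is non-negative.
valid_values = (
    np.isfinite(df[price_cols]).all(axis=1)
    & (df[price_cols] > 0).all(axis=1)
    & (df["Volume"] >= 0)
)

# Mask 2: Low <= min(Open, Close) and max(Open, Close) <= High.
consistent = (df["Low"] <= df[["Open", "Close"]].min(axis=1)) & (
    df[["Open", "Close"]].max(axis=1) <= df["High"]
)

cleansed = df[valid_values & consistent]

# Audit trail: count each dropped row under the first rule it violates.
report = {
    "dropped_invalid_values": int((~valid_values).sum()),
    "dropped_inconsistent_ohlc": int((valid_values & ~consistent).sum()),
}
```

Counting inconsistent rows only among rows that passed the first mask keeps the audit totals additive.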



#### **Callable: `construct_windows_and_split_data` (Task 3)**

*   **Inputs:**
    1.  `cleansed_df`: The sanitized `DataFrame` from Task 2.
    2.  `config`: The master configuration dictionary.
    3.  `dataset_name`: A string specifying the dataset to determine the splitting strategy.

*   **Process:**
    1.  **Windowing:** The function iterates through the `cleansed_df` on a per-ticker basis. Using `numpy.lib.stride_tricks.as_strided` for memory efficiency, it creates overlapping views of the data to construct the input windows `X` and target windows `y`.
    2.  **Metadata Generation:** For each window, it generates a rich metadata record, including the `ticker`, `window_start_date`, `window_end_date`, and the actual series of dates within the window.
    3.  **Splitting:** It then partitions the complete set of windows into `train`, `val`, and `test` sets based on the `window_end_date` and the specified `dataset_name` strategy (chronological percentage or fixed date cutoff).
    4.  **Reporting:** Finally, it generates a summary report detailing the number of windows in each split for each ticker.

*   **Outputs:**
    1.  `data_splits`: A dictionary containing the `train`, `val`, and `test` data, where each is a sub-dictionary holding the `X` windows, `y` targets, and `metadata`.
    2.  `summary_report`: A `DataFrame` summarizing the window counts.
    3.  `excluded_tickers`: A list of tickers with insufficient data for training.

*   **Data Transformation:** This is the primary data structuring step of the pipeline. It transforms the 2D `(time, features)` `DataFrame` into a 3D `(samples, time_steps, features)` `numpy` array format required by deep learning models.

*   **Role in Research Pipeline:** This callable is the implementation of the **Supervised Learning Problem Formulation**. It directly instantiates the core mathematical objects of the study:
    *   The historical input window: \(X = \{x_{t-T+1}, \dots, x_t\}\) where \(x_\tau \in \mathbb{R}^6\).
    *   The future price trajectory target: \(y = \{p_{t+1}, \dots, p_{t+T'}\}\).
    It creates the complete dataset of `(X, y)` pairs that will be used in all subsequent modeling tasks.
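
The windowing step can be sketched with `numpy.lib.stride_tricks.sliding_window_view`, a bounds-checked wrapper around the same striding idea as `as_strided` (this sketch swaps it in for safety; it is not the project's implementation). The values of `T` and `T_prime` and the choice of column 3 as the target price are assumptions for illustration.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

T, T_prime = 4, 2            # hypothetical lookback and horizon lengths
n_days, n_features = 10, 6

# Toy per-ticker feature matrix of shape (time, features).
features = np.arange(n_days * n_features, dtype=np.float64).reshape(n_days, n_features)
prices = features[:, 3]      # assumption: column 3 holds the target price series

# Slide a window of length T + T' along the time axis. The window axis is
# appended last, giving (n_windows, features, T + T'), so move it to axis 1.
full = sliding_window_view(features, window_shape=T + T_prime, axis=0)
full = np.moveaxis(full, -1, 1)                        # (n_windows, T + T', features)

X = full[:, :T, :]                                     # (n_windows, T, features)
y = sliding_window_view(prices, T + T_prime)[:, T:]    # (n_windows, T')

print(X.shape, y.shape)
```

Both `X` and `y` are read-only views onto the original buffer, so no data is copied until a downstream step (for example, conversion to a tensor) requires it.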



#### **Callable: `compute_technical_annotations` (Task 4)**

*   **Inputs:**
    1.  `X_windows`: A 3D `numpy` array of input windows.
    2.  `config`: The master configuration dictionary.

*   **Process:**
    1.  **Statistics Calculation:** It first computes basic descriptive statistics (mean, min, max) for each of the 6 features across the time dimension of every window in a vectorized manner.
    2.  **Indicator Calculation:** It then invokes a series of specialized, high-performance helper functions to calculate the full suite of 10 financial technical indicators (SMA, EMA, RSI, ADX, etc.) for each window, as defined in Appendix A, Table 6. These helpers are vectorized with `numpy` and JIT-compiled with `numba` for maximum performance.
    3.  **Serialization:** Finally, it takes the numerical results from the previous two steps and serializes them into formatted, human-readable strings (`statistics_blocks` and `indicators_blocks`), handling `NaN` values gracefully.

*   **Outputs:**
    1.  `annotations`: A dictionary containing two lists of strings: `statistics_blocks` and `indicators_blocks`, perfectly aligned with the input `X_windows`.

*   **Data Transformation:** This function transforms the numerical `X_windows` array into a structured textual representation. It is a feature engineering step that converts raw price-volume data into the domain-specific language of technical analysis.

*   **Role in Research Pipeline:** This callable is the direct implementation of the **Textual Annotation Mapping**, a core novelty of the VTA framework. It instantiates the function \(f\) from the equation:
    \[ X' = f(X) \]
    where \(X\) is the numerical time-series window and \(X'\) is its textual, annotated representation. This \(X'\) is the primary input for the reasoning LLM.
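
A toy version of the mapping \(f\) is sketched below for two indicators. The helper names, the output format, and the use of simple averages for RSI (Cutler's variant rather than Wilder's smoothing) are assumptions; the definitions in the paper's Appendix A may differ.

```python
import math
from typing import Sequence


def sma(values: Sequence[float], period: int) -> float:
    """Simple moving average over the last `period` values; NaN if too short."""
    if len(values) < period:
        return math.nan
    return sum(values[-period:]) / period


def rsi(values: Sequence[float], period: int) -> float:
    """RSI from simple average gains and losses over the last `period` changes."""
    if len(values) < period + 1:
        return math.nan
    changes = [b - a for a, b in zip(values[-period - 1:-1], values[-period:])]
    avg_gain = sum(c for c in changes if c > 0) / period
    avg_loss = sum(-c for c in changes if c < 0) / period
    if avg_loss == 0:
        return 100.0
    return 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)


def fmt(name: str, value: float) -> str:
    """Serialize one indicator, rendering NaN as a readable placeholder."""
    return f"{name}: N/A" if math.isnan(value) else f"{name}: {value:.2f}"


closes = [10.0, 11.0, 10.0, 12.0, 11.0]
indicators_block = "\n".join(
    [
        fmt("SMA(3)", sma(closes, 3)),
        fmt("RSI(4)", rsi(closes, 4)),
        fmt("SMA(10)", sma(closes, 10)),  # window too short, serialized as N/A
    ]
)
print(indicators_block)
```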



#### **Callable: `assemble_and_validate_prompts` (Task 5)**

*   **Inputs:**
    1.  `X_windows`: The 3D `numpy` array of input windows.
    2.  `metadata`: The rich metadata `DataFrame` from Task 3.
    3.  `annotations`: The dictionary of annotation strings from Task 4.
    4.  `config`: The master configuration dictionary.

*   **Process:**
    1.  **Prompt Assembly:** The function iterates through each window. For each window, it populates the `prompt_template` string from the `config` with the corresponding data: ticker, dates from `metadata`; the formatted `ohlcv_table` (created from `X_windows` and `metadata`); and the `statistics_block` and `indicators_block` from `annotations`.
    2.  **Tokenization & Validation:** It then tokenizes every generated prompt string using the specified LLM's tokenizer and verifies that the token count does not exceed the model's context limit.
    3.  **Parser Instantiation:** It instantiates and returns an `LLMOutputParser` object, which is configured with the parsing rules from the `config`.

*   **Outputs:**
    1.  `prompts`: A list of fully-formed, validated prompt strings `q`.
    2.  `parser`: An `LLMOutputParser` instance ready for use in the training loop.

*   **Data Transformation:** This function performs the final transformation of all prepared data artifacts (`X`, `X'`, metadata) into the exact textual format `q` that the reasoning LLM will receive as input.

*   **Role in Research Pipeline:** This callable implements the **Prompt Engineering** stage. It creates the final input `q` for the reasoning policy `πθ(o|q)`. The quality and consistency of these prompts are critical for the success of the subsequent RL training stages.
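
Prompt assembly and output parsing can be sketched with `str.format` and a regular expression. The template text, the `<answer>` tag convention, and the `parse_forecast` helper are hypothetical stand-ins; the real template and regex live in the `config`, and the real parser is the `LLMOutputParser` class. Token-limit validation is omitted because it depends on the chosen tokenizer.

```python
import re
from typing import List, Optional

# Hypothetical template and answer format, for illustration only.
PROMPT_TEMPLATE = (
    "Ticker: {ticker}\n"
    "Window: {start_date} to {end_date}\n"
    "{ohlcv_table}\n"
    "{statistics_block}\n"
    "{indicators_block}\n"
    "Predict the next {horizon} closing prices inside <answer></answer> tags."
)
ANSWER_PATTERN = re.compile(r"<answer>(.*?)</answer>", re.DOTALL)


def parse_forecast(output: str, horizon: int) -> Optional[List[float]]:
    """Return the predicted prices, or None if the output violates the format."""
    match = ANSWER_PATTERN.search(output)
    if match is None:
        return None
    try:
        values = [float(tok) for tok in match.group(1).replace(",", " ").split()]
    except ValueError:
        return None
    return values if len(values) == horizon else None


prompt = PROMPT_TEMPLATE.format(
    ticker="SYNTH_A",
    start_date="2024-01-02",
    end_date="2024-01-12",
    ohlcv_table="2024-01-12 | 100.0 | 101.0 | 99.5 | 100.8 | 100.8 | 1200000",
    statistics_block="close_mean: 100.20",
    indicators_block="SMA(5): 100.10",
    horizon=2,
)

forecast = parse_forecast("Momentum is rising. <answer>101.5, 102.0</answer>", horizon=2)
```

Returning `None` on any format violation gives the training loop a single, unambiguous signal for assigning a zero format reward or triggering a fallback.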



#### **Callables: `train_reasoning_llm_stage1`, `train_reasoning_llm_stage2`, `train_reasoning_llm_stage3` (Tasks 6, 7, 8)**

*   **Inputs:**
    *   Stage 1: Training prompts `q` and targets `y`.
    *   Stage 2: Artifacts from Stage 1 (experiences DataFrame, model path).
    *   Stage 3: Best model checkpoint from Stage 2, training prompts `q`, and targets `y`.

*   **Process:** These three orchestrators collectively implement the paper's three-stage training pipeline for the reasoning model `πθ`.
    1.  **Stage 1 (Cold-Start RL):** Initializes a base LLM with LoRA adapters and trains it using the Time-GRPO objective function. It repeatedly samples groups of `G` responses, calculates a combined format and inverse-MSE reward, computes group-relative advantages, and updates the policy using the PPO-style clipped surrogate objective with a KL penalty. It saves all generated experiences.
    2.  **Stage 2 (Rejection Sampling & SFT):** Filters the experiences from Stage 1, keeping only the top-performing samples (bottom 10% MSE) within each `(ticker, time_period)` bucket. It then performs standard Supervised Fine-Tuning (SFT) on this "golden" dataset to distill high-quality reasoning patterns into the model.
    3.  **Stage 3 (Performance RL):** Initializes the model from the best SFT checkpoint and resumes RL training with the same Time-GRPO objective as Stage 1. This final phase fine-tunes the policy for maximum forecast accuracy.

*   **Outputs:**
    *   Stage 1: A dictionary containing the trained LoRA model path and a `DataFrame` of all collected experiences.
    *   Stage 2: The file path to the best SFT model checkpoint.
    *   Stage 3: The file path to the final, best-performing reasoning model `πθ`.

*   **Data Transformation:** These functions transform a dataset of prompts into a fully trained, parameter-efficient reasoning model. The data transformation is the learning process itself, where the model's weights (specifically, the LoRA adapters) are optimized.

*   **Role in Research Pipeline:** These callables are the implementation of the **Verbal Reasoner Training**. They are the heart of the paper's methodology for teaching an LLM to perform technical analysis. They directly implement the core optimization algorithms:
    *   The inverse-MSE reward: \( r_{\mathrm{MSE}}(\theta) = 1 / (\lambda \cdot \lVert \hat{y}_\theta - y \rVert^2 + \epsilon) \)
    *   The group-relative advantage: \( A_i = (r_i - \mathrm{mean}(\{r_j\})) / (\mathrm{std}(\{r_j\}) + \epsilon) \)
    *   The Time-GRPO loss function (a variant of Equation 2): \( \mathcal{L}_{\text{total}} = \mathcal{L}_{\text{CLIP}} + \mathcal{L}_{\text{VF}} + \beta D_{\mathrm{KL}}(\pi_\theta || \pi_{\mathrm{ref}}) \)
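
The first two formulas above can be worked through numerically. This is a minimal sketch: the default values of `lam` and `eps`, and the use of the population standard deviation within a group, are assumptions rather than settings confirmed by the source.

```python
import statistics
from typing import List, Sequence


def inverse_mse_reward(
    y_hat: Sequence[float], y: Sequence[float], lam: float = 1.0, eps: float = 1e-6
) -> float:
    """r = 1 / (lam * ||y_hat - y||^2 + eps), using the squared L2 norm."""
    squared_error = sum((a - b) ** 2 for a, b in zip(y_hat, y))
    return 1.0 / (lam * squared_error + eps)


def group_relative_advantages(rewards: Sequence[float], eps: float = 1e-6) -> List[float]:
    """A_i = (r_i - mean(r)) / (std(r) + eps), computed within one sampled group."""
    mean = statistics.fmean(rewards)
    std = statistics.pstdev(rewards)  # assumption: population standard deviation
    return [(r - mean) / (std + eps) for r in rewards]


# One prompt, G = 3 sampled forecasts for a horizon of two steps (toy numbers).
target = [100.0, 101.0]
group = [[100.5, 101.0], [101.0, 102.0], [98.0, 103.0]]

rewards = [inverse_mse_reward(y_hat, target) for y_hat in group]
advantages = group_relative_advantages(rewards)
```

Because advantages are normalized within the group, a response is rewarded only for being better than its siblings for the same prompt, which removes the need for a separate baseline per prompt.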



#### **Callable: `train_forecasting_backbone` (Task 9)**

*   **Inputs:** The `data_splits` dictionary containing `X_train` and `X_val`.
*   **Process:**
    1.  Initializes the `ForecastingBackbone` model `φ`, which includes performing PCA on a base LLM's embeddings to create the language prototypes `D`.
    2.  Runs a standard supervised training loop. In each step, it performs a forward pass to get outputs from both the temporal and textual branches.
    3.  It computes a combined loss consisting of the Feature Regularization Loss and the Output Alignment Loss.
    4.  It uses a validation set to monitor performance and saves the best model checkpoint.

*   **Outputs:** The file path to the best trained `ForecastingBackbone` model `φ`.

*   **Data Transformation:** This function transforms the training dataset into a trained `ForecastingBackbone` model.

*   **Role in Research Pipeline:** This callable implements the training of the **Latent Thinker** (`φ`). It directly implements the novel training objective designed to align the time-series domain with the language domain:
    *   Cross-Modal Alignment via cross-attention (Equation 4): \( X_{\text{text}} = \text{Softmax}(\frac{QK^T}{\sqrt{C}})V \)
    *   Feature Regularization Loss (Equation 5): \( \mathcal{L}_{\mathrm{feature}} = \sum \gamma^{(N-n)} \mathrm{sim}(\phi_{\text{text}}(F_{\text{text}}^n), \phi_{\text{time}}(F_{\text{time}}^n)) \)
    *   Output Alignment Loss (Equation 6): \( \mathcal{L}_{\mathrm{output}} = \mathrm{sim}(\hat{y}_{\mathrm{time}}, \hat{y}_{\mathrm{text}}) \)
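
The cross-attention formula in Equation 4 can be sketched in NumPy. This illustration assumes that queries come from time-series tokens and that keys and values both come from the language prototypes `D`; it also omits the learned projection matrices, so it shows the shape of the computation rather than the trained module.

```python
import numpy as np


def cross_attention(Q: np.ndarray, K: np.ndarray, V: np.ndarray):
    """Compute Softmax(Q K^T / sqrt(C)) V and return (output, attention weights)."""
    C = Q.shape[-1]
    scores = Q @ K.T / np.sqrt(C)
    # Subtract the row-wise maximum before exponentiating for numerical stability.
    scores = scores - scores.max(axis=-1, keepdims=True)
    weights = np.exp(scores)
    weights = weights / weights.sum(axis=-1, keepdims=True)
    return weights @ V, weights


rng = np.random.default_rng(0)
time_tokens = rng.normal(size=(5, 8))   # 5 time-series tokens, C = 8 (toy sizes)
D = rng.normal(size=(3, 8))             # 3 language prototypes (toy stand-in for PCA output)

X_text, weights = cross_attention(time_tokens, D, D)
print(X_text.shape)
```

Each row of `X_text` is a convex combination of prototype rows, which is what places the time-series representation in the span of the language embedding space.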



#### **Callable: `train_conditional_forecaster` (Task 10)**

*   **Inputs:** Data splits, prompts, and paths to the trained reasoning model `πθ` and backbone model `φ`.
*   **Process:**
    1.  **Attribute Derivation:** It first uses the trained `πθ` to generate forecasts for all training/validation samples and derives the conditioning attributes `c = [min, mean, max]` for each.
    2.  **Model Initialization:** It initializes the `ConditionalFusionModel` `ψ`, loading the pre-trained backbone `φ` into it.
    3.  **Training:** It runs a supervised training loop. In each forward pass, it randomly "drops" the conditioning vector `c` with probability `p_uncond`, replacing it with a null token. The loss is the MSE between the model's output and the ground truth `y`.
    4.  **Validation:** The validation loop is crucial. It performs the full, two-pass guided inference procedure to calculate the final forecast `ŷ` and computes the validation loss on this guided output. It saves the best model based on this metric.

*   **Outputs:** The file path to the best trained `ConditionalFusionModel` `ψ`.

*   **Data Transformation:** This function transforms the trained `πθ` and `φ` models, along with the training data, into the final, trained fusion model `ψ`.

*   **Role in Research Pipeline:** This callable implements the **Joint Conditional Training** stage (Section 3.4). This is the final fusion step that teaches the system how to blend the latent and verbal reasoning pathways. It directly implements:
    *   The classifier-free guidance training objective (Equation 7): \( \mathcal{L}_{\mathrm{forecast}}(\psi) = \mathbb{E} \big[ \lVert \hat{y}_\psi(X, \tilde{c}) - y \rVert^2 \big] \), where `c̃` denotes the conditioning vector `c` after random dropping.
    *   The validation process uses the guided inference equation (Equation 9): \( \hat{y} = \hat{y}_\phi(X) + s \cdot \big( \hat{y}_\psi(X, c) - \hat{y}_\phi(X) \big) \)
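
The condition dropout and the guided inference blend can be sketched without any model code. The function names are hypothetical, `None` stands in for the null token, and the two forecasts passed to `guided_forecast` are toy numbers standing in for the two forward passes.

```python
import random
from typing import List, Optional, Sequence


def derive_attributes(y_theta: Sequence[float]) -> List[float]:
    """c = [min, mean, max] of the reasoning model's forecast."""
    return [min(y_theta), sum(y_theta) / len(y_theta), max(y_theta)]


def maybe_drop_condition(
    c: List[float], p_uncond: float, rng: random.Random
) -> Optional[List[float]]:
    """Replace c with the null condition (None here) with probability p_uncond."""
    return None if rng.random() < p_uncond else c


def guided_forecast(
    y_uncond: Sequence[float], y_cond: Sequence[float], s: float
) -> List[float]:
    """Equation 9: y = y_uncond + s * (y_cond - y_uncond), element-wise."""
    return [u + s * (k - u) for u, k in zip(y_uncond, y_cond)]


c = derive_attributes([101.0, 103.0, 102.0])

# Toy outputs of the unconditional and conditional passes.
y_uncond, y_cond = [100.0, 102.0], [104.0, 106.0]
blended = guided_forecast(y_uncond, y_cond, s=1.5)
```

With `s = 0` the blend reduces to the unconditional forecast and with `s = 1` to the conditional one; values above 1 extrapolate past the conditional forecast in the direction the condition suggests.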



#### **Callable: `run_end_to_end_inference` (Task 11)**

*   **Inputs:** Test data, test prompts, and paths to the final trained `πθ` and `ψ` models.
*   **Process:** For each sample in the test set:
    1.  It generates the narrative `v_θ` and the initial forecast `ŷ_θ` using the reasoning model `πθ`.
    2.  It derives the conditioning attributes `c` from `ŷ_θ`.
    3.  It performs the two-pass guided inference using the fusion model `ψ` to get the final forecast `ŷ`.
    4.  It handles cases where `πθ` fails to generate a valid output by using a fallback strategy (using the unconditional forecast).

*   **Outputs:** A `pandas.DataFrame` containing the final dual output for each test sample: the guided numerical forecast `ŷ` and the textual narrative `v_θ`.

*   **Data Transformation:** This function transforms the test set inputs into the final predictions and explanations.

*   **Role in Research Pipeline:** This is the **Inference Engine**. It executes the complete, trained VTA framework to produce the final results that are then evaluated in the subsequent tasks.



#### **Callable: `train_and_evaluate_baselines` (Task 12)**

*   **Inputs:** The `data_splits`.
*   **Process:** It uses a generic, rigorous training harness to train and evaluate the DLinear and TSMixer models. For each baseline, it runs a full training loop with validation-based checkpointing and then evaluates the best checkpoint on the test set to get MSE and MAE.
*   **Outputs:** A dictionary containing the final test metrics for each baseline model.

*   **Role in Research Pipeline:** This callable implements the **Baseline Model Evaluation**. Its purpose is to provide a set of strong, standard benchmarks against which the performance of the VTA model can be fairly compared.



#### **Callable: `evaluate_performance_and_utility` (Task 13)**

*   **Inputs:** Inference results for all models (VTA and baselines), ground truth data, and the `cleansed_df`.
*   **Process:**
    1.  **Error Metrics:** It computes and aggregates the MSE and MAE for all models, creating a comparison table.
    2.  **Portfolio Backtest:** For each model, it runs a daily rebalancing backtest. On each day, it uses the model's forecasts to solve a Markowitz mean-variance optimization problem to get portfolio weights. It then calculates the realized return using actual market data.
    3.  **Performance Metrics:** From the resulting daily return series of the backtest, it calculates annualized return, volatility, Sharpe ratio, and maximum drawdown.

*   **Outputs:** A dictionary containing two `DataFrames`: one for the error metrics and one for the portfolio performance metrics.

*   **Role in Research Pipeline:** This is the final **Results Analysis** stage. It translates the raw model predictions into the final, interpretable tables (Table 2 and Table 5 from the paper) that quantify the statistical accuracy and financial utility of the VTA framework relative to the baselines.
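
The performance metrics in step 3 can be computed from a daily return series as sketched below. The conventions here are assumptions: 252 trading days per year, a zero risk-free rate, geometric annualization of return, and the sample standard deviation. The Markowitz optimization step is not shown.

```python
import math
from typing import Dict, Sequence

TRADING_DAYS = 252  # assumption: annualization convention for daily data


def performance_metrics(daily_returns: Sequence[float]) -> Dict[str, float]:
    """Annualized return, volatility, Sharpe ratio (zero risk-free rate), max drawdown."""
    n = len(daily_returns)
    if n < 2:
        raise ValueError("Need at least two daily returns.")

    # Track the wealth curve and its running peak to get the maximum drawdown.
    wealth, peak, max_drawdown = 1.0, 1.0, 0.0
    for r in daily_returns:
        wealth *= 1.0 + r
        peak = max(peak, wealth)
        max_drawdown = max(max_drawdown, 1.0 - wealth / peak)

    mean = sum(daily_returns) / n
    variance = sum((r - mean) ** 2 for r in daily_returns) / (n - 1)
    ann_vol = math.sqrt(variance * TRADING_DAYS)
    ann_return = wealth ** (TRADING_DAYS / n) - 1.0
    sharpe = mean * TRADING_DAYS / ann_vol if ann_vol > 0 else math.nan

    return {
        "annualized_return": ann_return,
        "annualized_volatility": ann_vol,
        "sharpe": sharpe,
        "max_drawdown": max_drawdown,
    }


metrics = performance_metrics([0.10, -0.50, 0.20])
print(metrics)
```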



#### **Callable: `run_vta_pipeline` (Task 14)**

*   **Inputs:** Raw data, master config, and data mappings.
*   **Process:** This is the master orchestrator. It initializes the experimental environment (setting seeds, configuring logging, creating a run directory), then sequentially calls the data preparation sub-orchestrator (`_run_data_preparation_pipeline`) and the modeling sub-orchestrator (`_run_modeling_pipeline`). It manages the overall workflow, including the `dry_run` mode.
*   **Outputs:** A comprehensive dictionary containing all artifacts from the entire run.

*   **Role in Research Pipeline:** This is the **Experiment Execution Engine**. It provides the single, top-level entry point to run the entire research project from start to finish in a reproducible, auditable, and resumable manner.

<br><br>

### **Usage Examples**

Here is a complete example of how to use the end-to-end pipeline, starting from data generation and configuration loading, through to the final execution call.

### **Pre-Execution Discussion**

The goal is to create a self-contained, executable example that demonstrates the use of the `main` orchestrator function. This requires four steps:

1.  **Synthetic Data Generation:** We cannot run the pipeline without realistic input data. We will create a helper function, `_generate_synthetic_market_data`, that uses `pandas` and `numpy` to generate a `market_data_df` DataFrame that perfectly matches the required schema. It will simulate realistic price movements (a random walk with drift) and ensure the OHLCV constraints (`Low <= Open/Close <= High`) are respected. This data will be saved to a local CSV file, `synthetic_market_data.csv`, mimicking a real-world scenario where data is loaded from disk.

2.  **Configuration Loading:** The `config.yaml` file is the master configuration. We will use the `PyYAML` library to load this file into a Python dictionary named `config`. This is the standard, robust way to manage configurations in a Python application.

3.  **Input Parameter Definition:** We will explicitly define the other necessary inputs for the `main` function: the `ticker_to_market_map`, `ticker_to_dataset_map`, `dataset_name`, and a `base_run_id`. These will be defined directly in the script for clarity.

4.  **Execution Call:** Finally, we will show the exact call to the `main` function, passing all the prepared inputs. The example will demonstrate how to run a `dry_run` for quick testing, which is a critical feature of the pipeline we designed.

This step-by-step process ensures that the example is not just a code snippet, but a complete, runnable demonstration of the project's entry point.
<br>

### **Implementation: End-to-End Pipeline Execution Example**

```python
# ==============================================================================
# Example Usage: Running the Complete VTA Pipeline
# ==============================================================================
# This script demonstrates how to use the top-level orchestrator (`main` function)
# to run the entire VTA research pipeline from start to finish.

# ------------------------------------------------------------------------------
# Preamble: Import necessary libraries for this example script
# ------------------------------------------------------------------------------
import yaml
import pandas as pd
import numpy as np
from typing import List, Dict

# Assume all previously defined orchestrators and helpers (Tasks 1-15) are
# available in the execution scope (e.g., in a single Jupyter notebook or imported).

# ------------------------------------------------------------------------------
# Step 1: Generate High-Fidelity Synthetic Input Data
# ------------------------------------------------------------------------------
# In a real-world scenario, this data would be sourced from a financial data
# provider. For this example, we generate a realistic, synthetic dataset that
# adheres to the required schema.

def _generate_synthetic_market_data(
    tickers: List[str],
    start_date: str,
    end_date: str,
    output_path: str
) -> None:
    """
    Generates a realistic, synthetic OHLCV dataset and saves it to a CSV file.

    Args:
        tickers: A list of ticker symbols to generate data for.
        start_date: The start date for the data range in 'YYYY-MM-DD' format.
        end_date: The end date for the data range.
        output_path: The file path to save the generated CSV.
    """
    print(f"Generating synthetic market data for tickers: {tickers}...")
    
    # Create a date range of business days.
    dates = pd.bdate_range(start=start_date, end=end_date)
    
    all_ticker_data = []
    
    for ticker in tickers:
        # Simulate a price series using a geometric random walk with a small drift.
        num_days = len(dates)
        returns = np.random.normal(loc=0.0005, scale=0.02, size=num_days)
        # Start the price at a random value between 50 and 200.
        initial_price = np.random.uniform(50, 200)
        price_series = initial_price * (1 + returns).cumprod()
        
        # Create realistic OHLC values around the simulated price series.
        # Ensure that Low <= Open/Close <= High.
        open_prices = price_series * np.random.uniform(0.995, 1.005, size=num_days)
        close_prices = price_series * np.random.uniform(0.995, 1.005, size=num_days)
        
        low_prices = np.minimum(open_prices, close_prices) * np.random.uniform(0.98, 1.0, size=num_days)
        high_prices = np.maximum(open_prices, close_prices) * np.random.uniform(1.0, 1.02, size=num_days)
        
        # Assume Adj Close is the same as Close for simplicity in this synthetic data.
        adj_close_prices = close_prices
        
        # Simulate trading volume.
        volume = np.random.randint(1_000_000, 50_000_000, size=num_days)
        
        # Create a DataFrame for the current ticker.
        ticker_df = pd.DataFrame({
            'Open': open_prices,
            'High': high_prices,
            'Low': low_prices,
            'Close': close_prices,
            'Adj Close': adj_close_prices,
            'Volume': volume
        }, index=dates)
        
        ticker_df['ticker'] = ticker
        all_ticker_data.append(ticker_df)
        
    # Concatenate data for all tickers and set the MultiIndex.
    market_data_df = pd.concat(all_ticker_data)
    market_data_df = market_data_df.reset_index().rename(columns={'index': 'date'})
    market_data_df = market_data_df.set_index(['date', 'ticker'])
    
    # Save the final DataFrame to the specified path.
    market_data_df.to_csv(output_path)
    print(f"Synthetic data saved to: {output_path}")

# --- Generate the data for our example run ---
# Define the universe of tickers for our synthetic dataset.
synthetic_tickers = ['SYNTH_A', 'SYNTH_B', 'SYNTH_C', 'SYNTH_D']
# Define the path where the data will be saved.
market_data_filepath = "./synthetic_market_data.csv"

# Execute the data generation function.
_generate_synthetic_market_data(
    tickers=synthetic_tickers,
    start_date='2020-01-01',
    end_date='2024-12-31',
    output_path=market_data_filepath
)

# ------------------------------------------------------------------------------
# Step 2: Load the Master Configuration File
# ------------------------------------------------------------------------------
# The `config.yaml` file, as defined previously, contains all hyperparameters.
# We load it into a Python dictionary.

config_filepath = "./config.yaml"

print(f"\nLoading master configuration from: {config_filepath}")
with open(config_filepath, 'r') as f:
    # Use the safe_load function to prevent arbitrary code execution.
    config = yaml.safe_load(f)
print("Configuration loaded successfully.")

# ------------------------------------------------------------------------------
# Step 3: Define Other Input Parameters for the Pipeline
# ------------------------------------------------------------------------------
# These are the data mappings required by the pipeline orchestrators. In a real
# project, these might also be loaded from a file.

# Map each ticker to its geographical market for calendar alignment.
ticker_to_market_map = {
    'SYNTH_A': 'US',
    'SYNTH_B': 'US',
    'SYNTH_C': 'US',
    'SYNTH_D': 'US'
}

# Map each ticker to its source dataset for correct metric aggregation.
# For this example, we'll assign them all to a mock "StockNet" dataset.
ticker_to_dataset_map = {
    'SYNTH_A': 'StockNet',
    'SYNTH_B': 'StockNet',
    'SYNTH_C': 'StockNet',
    'SYNTH_D': 'StockNet'
}

# Define the primary dataset name to guide the data splitting strategy.
# This must match a key in the `splits` section of the config.
dataset_name = "StockNet"

# Define a base identifier for this experimental run. All artifacts will be
# grouped under this name.
base_run_id = "vta_official_run"

# ------------------------------------------------------------------------------
# Step 4: Execute the End-to-End Pipeline
# ------------------------------------------------------------------------------
# This is the final step where we call the top-level `main` orchestrator.
# We will run it in `dry_run=True` mode first to demonstrate a quick test of
# the entire pipeline's integrity.

print(f"\n{'='*80}\nEXECUTING THE VTA PIPELINE\n{'='*80}")

try:
    # The main execution call.
    # We set `dry_run=True` for a quick, end-to-end validation of the code.
    # To run the full experiment, set `dry_run=False`.
    # To skip the sensitivity analysis, set `run_sensitivity=False`.
    final_artifacts = main(
        market_data_path=market_data_filepath,
        config=config,
        ticker_to_market_map=ticker_to_market_map,
        ticker_to_dataset_map=ticker_to_dataset_map,
        dataset_name=dataset_name,
        base_run_id=base_run_id,
        run_sensitivity=False, # Set to False for a quicker example run
        dry_run=True
    )
    
    print("\n--- PIPELINE EXECUTION SUCCEEDED ---")
    # You can now inspect the `final_artifacts` dictionary for all results.
    # For example, to see the final evaluation tables:
    # final_evals = final_artifacts['baseline_run_artifacts']['modeling_and_evaluation']['final_evaluation_results']
    # print("\nFinal Error Metrics:")
    # print(final_evals['error_metrics'])
    # print("\nFinal Portfolio Metrics:")
    # print(final_evals['portfolio_metrics'])
    
except Exception as e:
    print(f"\n--- PIPELINE EXECUTION FAILED ---")
    print(f"An error occurred: {e}")
    # In a production script, you would log the full traceback here.

```

<br>


# ==============================================================================
# Task 1: Validate Input Schemas and Parameter Completeness
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 1, Step 1: Validate market_data_df structure and integrity
# ------------------------------------------------------------------------------

def _validate_market_data_df_structure(
    market_data_df: pd.DataFrame
) -> List[str]:
    """
    Validates the structure, index, and column properties of the market_data_df.

    Args:
        market_data_df: The DataFrame containing market data.

    Returns:
        A list of error messages. An empty list indicates successful validation.
    """
    # Initialize a list to aggregate validation error messages.
    errors = []

    # Check 1: Ensure the input is a pandas DataFrame.
    if not isinstance(market_data_df, pd.DataFrame):
        # If not a DataFrame, no other checks can be performed. Return immediately.
        return ["'market_data_df' must be a pandas DataFrame."]

    # Check 2: Validate the MultiIndex structure.
    if not isinstance(market_data_df.index, pd.MultiIndex):
        # The index must be a MultiIndex for subsequent checks.
        errors.append("DataFrame index must be a pandas MultiIndex.")
    elif market_data_df.index.nlevels != 2:
        # The MultiIndex must have exactly two levels.
        errors.append(f"MultiIndex must have 2 levels, but found {market_data_df.index.nlevels}.")
    elif list(market_data_df.index.names) != ["date", "ticker"]:
        # The index levels must be named 'date' and 'ticker' in that specific order.
        errors.append(f"Index names must be ['date', 'ticker'], but found {market_data_df.index.names}.")
    else:
        # Check 2a: Validate the dtypes of the index levels.
        if not pd.api.types.is_datetime64_any_dtype(market_data_df.index.get_level_values('date')):
            errors.append("Index level 'date' must have a datetime64 dtype.")
        if not pd.api.types.is_string_dtype(market_data_df.index.get_level_values('ticker')):
            errors.append("Index level 'ticker' must have a string/object dtype.")

    # Check 3: Validate the uniqueness of the (date, ticker) index pairs.
    if not market_data_df.index.is_unique:
        # Duplicate index entries can corrupt time-series analysis.
        errors.append("DataFrame has duplicate (date, ticker) index pairs.")

    # Check 4: Validate the required columns and their dtypes.
    required_columns = {
        "Open": "float64", "High": "float64", "Low": "float64",
        "Close": "float64", "Adj Close": "float64", "Volume": ["int64", "float64"]
    }
    missing_columns = set(required_columns.keys()) - set(market_data_df.columns)
    if missing_columns:
        # All required columns must be present for the model to function.
        errors.append(f"Missing required columns: {sorted(list(missing_columns))}.")

    for col, expected_dtype in required_columns.items():
        # Check dtypes for each required column.
        if col in market_data_df.columns:
            actual_dtype = str(market_data_df[col].dtype)
            # Volume can be int64 or float64, others must be float64.
            is_valid_dtype = (
                actual_dtype in expected_dtype
                if isinstance(expected_dtype, list)
                else actual_dtype == expected_dtype
            )
            if not is_valid_dtype:
                errors.append(f"Column '{col}' has dtype '{actual_dtype}', but expected {expected_dtype}.")

    # Return the aggregated list of errors.
    return errors
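
For orientation, the sketch below builds a small frame that should satisfy the structural checks above. The tickers, dates, and prices are invented for illustration; only the index names, column names, and dtypes mirror what the validator inspects.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data: two tickers over three business days.
dates = pd.bdate_range("2015-01-05", periods=3)
index = pd.MultiIndex.from_product(
    [dates, ["AAPL", "MSFT"]], names=["date", "ticker"]
)

rng = np.random.default_rng(0)
close = 100.0 + rng.random(len(index))
toy_df = pd.DataFrame(
    {
        "Open": close - 0.5,
        "High": close + 1.0,
        "Low": close - 1.0,
        "Close": close,
        "Adj Close": close,
        "Volume": np.full(len(index), 1_000, dtype="int64"),
    },
    index=index,
)

# The same properties the structural validator looks at.
print(list(toy_df.index.names))
print(toy_df.index.is_unique)
print(toy_df.dtypes.astype(str).to_dict())
```

A frame built this way has a two-level `('date', 'ticker')` index, unique index pairs, `float64` prices, and an `int64` volume column.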

def _validate_market_data_df_integrity(
    market_data_df: pd.DataFrame,
    min_history: int
) -> List[str]:
    """
    Validates the numerical and chronological integrity of the market_data_df.

    Args:
        market_data_df: The DataFrame containing market data.
        min_history: The minimum number of trading days required per ticker (T + T').

    Returns:
        A list of error messages. An empty list indicates successful validation.
    """
    # Initialize a list to aggregate validation error messages.
    errors = []

    # Check 5: Validate numerical integrity of price and volume data.
    price_cols = ["Open", "High", "Low", "Close", "Adj Close"]
    for col in price_cols:
        # Prices must be positive and finite.
        if col in market_data_df.columns:
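            # Use le(0).any() so NaN is not misreported as non-positive;
            # NaN and Inf are caught by the finiteness check below.
            if market_data_df[col].le(0).any():
                errors.append(f"Column '{col}' contains non-positive values.")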
            if not np.isfinite(market_data_df[col]).all():
                errors.append(f"Column '{col}' contains non-finite (NaN or Inf) values.")

    if "Volume" in market_data_df.columns:
        # Volume must be non-negative and finite.
        # Use lt(0).any() so NaN is not misreported as negative.
        if market_data_df["Volume"].lt(0).any():
            errors.append("Column 'Volume' contains negative values.")
        if not np.isfinite(market_data_df["Volume"]).all():
            errors.append("Column 'Volume' contains non-finite (NaN or Inf) values.")

    # Check 6: Validate chronological order and minimum history per ticker.
    if 'ticker' in market_data_df.index.names:
        # Group data by ticker to perform per-asset checks.
        grouped_by_ticker = market_data_df.groupby(level='ticker', sort=False)

        # Check 6a: Chronological order (monotonic increasing dates).
        non_monotonic_tickers = [
            ticker for ticker, group in grouped_by_ticker
            if not group.index.get_level_values('date').is_monotonic_increasing
        ]
        if non_monotonic_tickers:
            errors.append(f"Dates are not in chronological order for tickers: {non_monotonic_tickers[:5]}.")

        # Check 6b: Minimum history requirement.
        ticker_counts = grouped_by_ticker.size()
        insufficient_history_tickers = ticker_counts[ticker_counts < min_history].index.tolist()
        if insufficient_history_tickers:
            errors.append(
                f"Tickers with insufficient history (< {min_history} days): "
                f"{insufficient_history_tickers[:5]}."
            )

    # Return the aggregated list of errors.
    return errors
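
The per-ticker checks above can be tried on a toy frame. This is a minimal sketch with invented tickers and dates; `min_history = 3` is an arbitrary stand-in for T + T'.

```python
import pandas as pd

# Hypothetical frame: 'AAA' has 3 ordered rows, 'BBB' has 2 rows out of order.
index = pd.MultiIndex.from_tuples(
    [
        (pd.Timestamp("2015-01-05"), "AAA"),
        (pd.Timestamp("2015-01-06"), "AAA"),
        (pd.Timestamp("2015-01-07"), "AAA"),
        (pd.Timestamp("2015-01-07"), "BBB"),
        (pd.Timestamp("2015-01-06"), "BBB"),
    ],
    names=["date", "ticker"],
)
toy_df = pd.DataFrame({"Close": [1.0, 2.0, 3.0, 4.0, 5.0]}, index=index)

min_history = 3  # arbitrary stand-in for T + T'
grouped = toy_df.groupby(level="ticker", sort=False)

# Tickers whose dates do not appear in ascending order.
out_of_order = [
    ticker
    for ticker, group in grouped
    if not group.index.get_level_values("date").is_monotonic_increasing
]

# Tickers with fewer rows than the minimum history.
counts = grouped.size()
too_short = counts[counts < min_history].index.tolist()

print(out_of_order)
print(too_short)
```

Note that `is_monotonic_increasing` tolerates repeated dates; strictness comes from the separate uniqueness check on the `(date, ticker)` index.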

# ------------------------------------------------------------------------------
# Task 1, Step 2: Validate VTA_MASTER_CONFIG for completeness and consistency
# ------------------------------------------------------------------------------

def _find_required_keys(
    config: Dict[str, Any],
    path: str = ""
) -> List[str]:
    """
    Recursively traverses a dictionary to find placeholder values starting with "REQUIRED_".

    Args:
        config: The dictionary or sub-dictionary to scan.
        path: The current path for tracking nested keys (used internally).

    Returns:
        A list of full key paths that still contain "REQUIRED_" placeholders.
    """
    # Initialize a list to store paths of keys with placeholder values.
    unfilled_keys = []
    # Iterate over each key-value pair in the current dictionary level.
    for key, value in config.items():
        # Construct the full path for the current key.
        current_path = f"{path}.{key}" if path else key
        # If the value is another dictionary, recurse into it.
        if isinstance(value, Mapping):
            unfilled_keys.extend(_find_required_keys(value, current_path))
        # If the value is a string that starts with the placeholder prefix...
        elif isinstance(value, str) and value.startswith("REQUIRED_"):
            # ...add its full path to the list of unfilled keys.
            unfilled_keys.append(current_path)
    # Return the list of all found placeholder paths.
    return unfilled_keys
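
To illustrate the dotted-path output of the placeholder scan, here is a standalone sketch of the same traversal run on a made-up configuration. The function name `find_placeholders` and every key in `toy_config` are hypothetical.

```python
from collections.abc import Mapping
from typing import List


def find_placeholders(config: Mapping, path: str = "") -> List[str]:
    """Return dotted paths whose string value starts with 'REQUIRED_'."""
    found: List[str] = []
    for key, value in config.items():
        current = f"{path}.{key}" if path else str(key)
        if isinstance(value, Mapping):
            # Recurse into nested mappings, carrying the path along.
            found.extend(find_placeholders(value, current))
        elif isinstance(value, str) and value.startswith("REQUIRED_"):
            found.append(current)
    return found


# Hypothetical config fragment; the key names are illustrative only.
toy_config = {
    "data_and_task_spec": {"input_window_size": 10, "stride": 1},
    "model": {
        "checkpoint_path": "REQUIRED_CHECKPOINT",
        "lora": {"rank": "REQUIRED_RANK"},
    },
}

unfilled = find_placeholders(toy_config)
print(unfilled)
```

As in the function above, only nested mappings are traversed, so a placeholder stored inside a list would not be reported.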

def _validate_config_consistency(config: Dict[str, Any]) -> List[str]:
    """
    Performs consistency checks across different sections of the configuration.

    Args:
        config: The master configuration dictionary.

    Returns:
        A list of consistency error messages.
    """
    # Initialize a list to aggregate consistency error messages.
    errors = []

    # Attempt to access nested keys with error handling.
    try:
        # Consistency Check 1: Prediction horizons must match.
        # The target specification horizon (T') must be identical to the task's prediction horizon.
        target_horizon = config["raw_data_schemas"]["market_data_df"]["target_spec"]["horizon"]
        pred_horizon = config["data_and_task_spec"]["prediction_horizon"]
        if target_horizon != pred_horizon:
            errors.append(
                f"Mismatch: raw_data_schemas.target_spec.horizon ({target_horizon}) != "
                f"data_and_task_spec.prediction_horizon ({pred_horizon})."
            )

        # Consistency Check 2: Fixed window size and stride must be as specified.
        # The paper defines a fixed input window size (T) of 10.
        input_window = config["data_and_task_spec"]["input_window_size"]
        if input_window != 10:
            errors.append(f"data_and_task_spec.input_window_size must be 10, but found {input_window}.")

        # The paper implies a stride of 1 for dense windowing.
        stride = config["data_and_task_spec"]["stride"]
        if stride != 1:
            errors.append(f"data_and_task_spec.stride must be 1, but found {stride}.")

    except KeyError as e:
        # If a required key for a consistency check is missing, report it.
        errors.append(f"Missing key for consistency check: {e}.")

    # Return the aggregated list of errors.
    return errors
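
A configuration fragment that would pass these consistency checks might look like the sketch below. Only the keys read by the checks are shown, and the horizon value of 5 is an arbitrary placeholder, not a value taken from the project.

```python
# Hypothetical minimal configuration: only the keys the consistency checks read.
toy_config = {
    "raw_data_schemas": {
        "market_data_df": {"target_spec": {"horizon": 5}},  # arbitrary value
    },
    "data_and_task_spec": {
        "prediction_horizon": 5,   # must equal target_spec.horizon
        "input_window_size": 10,   # fixed at 10 by the check
        "stride": 1,               # fixed at 1 by the check
    },
}

spec = toy_config["data_and_task_spec"]
target_horizon = toy_config["raw_data_schemas"]["market_data_df"]["target_spec"]["horizon"]

# Re-state the three conditions as named booleans.
checks = {
    "horizons_match": target_horizon == spec["prediction_horizon"],
    "window_is_10": spec["input_window_size"] == 10,
    "stride_is_1": spec["stride"] == 1,
}
print(checks)
```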

# ------------------------------------------------------------------------------
# Task 1, Step 3: Validate prompt template and output format constraints
# ------------------------------------------------------------------------------

def _validate_prompt_config(config: Dict[str, Any]) -> List[str]:
    """
    Validates the reasoning model's prompt template and format enforcement rules.

    Args:
        config: The master configuration dictionary.

    Returns:
        A list of prompt-related validation error messages.
    """
    # Initialize a list to aggregate validation error messages.
    errors = []

    # Attempt to access nested keys with error handling.
    try:
        # Extract the prompt configuration section for easier access.
        prompt_config = config["reasoning_model_config"]["prompt_template"]
        template_str = prompt_config["template_string"]

        # Check 1: Ensure required placeholders are present in the template string.
        # The prompt must be able to display raw data, statistics, and indicators.
        required_placeholders = ["{ohlcv_table}", "{statistics_block}", "{indicators_block}"]
        for placeholder in required_placeholders:
            if placeholder not in template_str:
                errors.append(f"Prompt template is missing required placeholder: {placeholder}.")

        # Check 2: Validate format enforcement rules.
        # The model's output must be strictly structured for reliable parsing and reward calculation.
        enforcement_rules = prompt_config["format_enforcement"]
        if not enforcement_rules.get("require_think_tags"):
            errors.append("format_enforcement.require_think_tags must be True.")
        if not enforcement_rules.get("require_prediction_line"):
            errors.append("format_enforcement.require_prediction_line must be True.")

        # Check 3: Validate the prediction parsing regex.
        # The regex must be a valid pattern to be used for parsing.
        prediction_pattern = enforcement_rules.get("prediction_pattern")
        if not prediction_pattern or not isinstance(prediction_pattern, str):
            errors.append("format_enforcement.prediction_pattern must be a non-empty string.")
        else:
            try:
                # Attempt to compile the regex to check for syntax errors.
                re.compile(prediction_pattern)
            except re.error as e:
                errors.append(f"Invalid regex for prediction_pattern: {e}.")

        # Check 4: Validate stop sequences.
        # Stop sequences help prevent the model from generating extraneous text.
        stop_sequences = prompt_config.get("stop_sequences")
        if not isinstance(stop_sequences, list) or not all(isinstance(s, str) for s in stop_sequences):
            errors.append("stop_sequences must be a list of strings.")

    except KeyError as e:
        # If a required key for prompt validation is missing, report it.
        errors.append(f"Missing key for prompt validation: {e}.")

    # Return the aggregated list of errors.
    return errors
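
The format rules are easier to picture with a concrete output. The sketch below assumes a hypothetical `prediction_pattern` and a made-up model response; the project's actual pattern and output format are defined in its configuration and may differ.

```python
import re

# Hypothetical pattern and output format, for illustration only.
prediction_pattern = r"^Prediction:\s*(?P<values>[-+0-9.,\s]+)$"
compiled = re.compile(prediction_pattern, flags=re.MULTILINE)

sample_output = (
    "<think>Price closed above its 5-day average on rising volume.</think>\n"
    "Prediction: 101.2, 101.9, 102.4"
)

# Both tags must be present for the reasoning block to count as well-formed.
has_think_tags = "<think>" in sample_output and "</think>" in sample_output

# Extract the comma-separated forecast values from the prediction line.
match = compiled.search(sample_output)
values = [float(v) for v in match.group("values").split(",")] if match else []

print(has_think_tags, values)
```

Compiling the pattern up front, as the validator does, surfaces regex syntax errors before any model output is parsed.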

# ------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_inputs_and_config(
    market_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> None:
    """
    Orchestrates the validation of input data and the master configuration dictionary.

    This function serves as the main entry point for Task 1, sequentially calling
    all validation helpers. It aggregates any errors found and raises a single,
    comprehensive ValueError if any validation check fails.

    Args:
        market_data_df: The primary DataFrame containing historical market data,
                        expected to have a MultiIndex of ('date', 'ticker').
        config: The master configuration dictionary (VTA_MASTER_CONFIG) that
                governs the entire pipeline.

    Raises:
        ValueError: If any validation check fails, containing a detailed list
                    of all identified issues.
        TypeError: If inputs are of the wrong type.
    """
    # Perform initial type checking on the main inputs.
    if not isinstance(market_data_df, pd.DataFrame):
        raise TypeError("'market_data_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("'config' must be a dictionary.")

    # Initialize a list to aggregate all error messages from all validation steps.
    all_errors = []

    # --- Step 1: Validate market_data_df ---
    # First, validate the fundamental structure (index, columns, dtypes).
    structural_errors = _validate_market_data_df_structure(market_data_df)
    all_errors.extend(structural_errors)

    # Only proceed to integrity checks if the structure is fundamentally sound.
    if not structural_errors:
        # Calculate the minimum history required based on the config.
        try:
            min_history = (
                config["data_and_task_spec"]["input_window_size"] +
                config["data_and_task_spec"]["prediction_horizon"]
            )
            # Validate numerical and chronological integrity.
            integrity_errors = _validate_market_data_df_integrity(market_data_df, min_history)
            all_errors.extend(integrity_errors)
        except KeyError as e:
            all_errors.append(f"Cannot check data integrity due to missing config key: {e}")

    # --- Step 2: Validate VTA_MASTER_CONFIG ---
    # Check for any "REQUIRED_" placeholders that have not been filled.
    unfilled_keys = _find_required_keys(config)
    if unfilled_keys:
        all_errors.append(f"Configuration has unfilled 'REQUIRED_' placeholders at: {unfilled_keys}")

    # Check for consistency between different parts of the configuration.
    consistency_errors = _validate_config_consistency(config)
    all_errors.extend(consistency_errors)

    # --- Step 3: Validate prompt configuration ---
    # Check the prompt template and format enforcement rules.
    prompt_errors = _validate_prompt_config(config)
    all_errors.extend(prompt_errors)

    # --- Final Check ---
    # If any errors were found during any validation step...
    if all_errors:
        # ...compile them into a single, formatted error message and raise a ValueError.
        error_message = "Input validation failed with the following issues:\n" + \
                        "\n".join(f"- {error}" for error in all_errors)
        raise ValueError(error_message)

    # No errors were found: report success.
    print("Task 1: All input schemas and parameters validated successfully.")



# ==============================================================================
# Task 2: Cleanse and Prepare Raw Market Data
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 2, Step 1: Remove rows with missing or invalid data
# ------------------------------------------------------------------------------

def _remove_invalid_data(
    market_data_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Removes rows with missing, non-finite, or invalid numerical data.

    This function performs the first stage of cleansing by targeting data points
    that are fundamentally unusable for quantitative analysis. This includes:
    - Any row with null or non-finite values in price columns.
    - Any row with non-positive prices.
    - Any row with null, non-finite, or negative volume.

    Args:
        market_data_df: The raw DataFrame, assumed to have passed initial
                        structural validation from Task 1.

    Returns:
        A tuple containing:
        - A new DataFrame with invalid rows removed.
        - A summary DataFrame reporting the count of rows dropped per ticker
          for each specific reason.
    """
    # Create a copy to avoid modifying the original DataFrame.
    df = market_data_df.copy()

    # Initialize a DataFrame to log the reasons for row removal.
    # This provides a detailed audit trail of the cleansing process.
    report = pd.DataFrame(index=df.index)

    # Define the columns to check for price and volume integrity.
    price_cols = ["Open", "High", "Low", "Close", "Adj Close"]

    # --- Identify Invalid Data Points ---

    # Condition 1: Check for null or non-finite values in price columns.
    # np.isfinite checks for both NaN and infinity.
    report['is_non_finite_price'] = ~df[price_cols].apply(np.isfinite).all(axis=1)

    # Condition 2: Check for non-positive values in price columns.
    # Prices must be strictly positive.
    report['is_non_positive_price'] = df[price_cols].le(0).any(axis=1)

    # Condition 3: Check for null, non-finite, or negative volume.
    # Volume must be a non-negative, finite number.
    report['is_non_finite_volume'] = ~np.isfinite(df['Volume'])
    report['is_negative_volume'] = df['Volume'] < 0

    # Combine all invalid volume conditions into a single flag.
    report['is_invalid_volume'] = report['is_non_finite_volume'] | report['is_negative_volume']

    # --- Filter Data and Generate Report ---

    # Create a final mask to identify any row that violates at least one condition.
    rows_to_drop_mask = (
        report['is_non_finite_price'] |
        report['is_non_positive_price'] |
        report['is_invalid_volume']
    )

    # Extract the tickers and reasons for the rows that will be dropped.
    dropped_report_data = report.loc[rows_to_drop_mask]

    # Generate a summary report by ticker and reason.
    # This aggregates the boolean flags to count violations.
    summary_report = (
        dropped_report_data
        .reset_index()
        .groupby('ticker')[['is_non_finite_price', 'is_non_positive_price', 'is_invalid_volume']]
        .sum()
        .astype(int)
    )
    summary_report = summary_report[summary_report.sum(axis=1) > 0]

    # Drop the identified invalid rows from the DataFrame.
    # We use the inverted mask to keep only the valid rows.
    cleaned_df = df[~rows_to_drop_mask].copy()

    # Re-sort the index to ensure chronological order is maintained after deletions.
    # This is critical for subsequent time-series operations.
    cleaned_df.sort_index(inplace=True)

    return cleaned_df, summary_report
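
The masking logic can be seen on a four-row toy frame. This is a sketch with invented values and a single price column; it also shows that one row can trip several flags, so the flag total can exceed the number of rows dropped.

```python
import numpy as np
import pandas as pd

index = pd.MultiIndex.from_product(
    [pd.bdate_range("2015-01-05", periods=2), ["AAA", "BBB"]],
    names=["date", "ticker"],
)
# Hypothetical values: row 1 has a NaN price; row 2 has a zero price
# and a negative volume.
toy_df = pd.DataFrame(
    {
        "Close": [10.0, np.nan, 0.0, 12.0],
        "Volume": [100.0, 200.0, -5.0, 300.0],
    },
    index=index,
)

non_finite_price = ~np.isfinite(toy_df["Close"])
non_positive_price = toy_df["Close"].le(0)
invalid_volume = ~np.isfinite(toy_df["Volume"]) | toy_df["Volume"].lt(0)

# A row is dropped if it violates at least one condition.
drop_mask = non_finite_price | non_positive_price | invalid_volume
cleaned = toy_df[~drop_mask]

rows_dropped = int(drop_mask.sum())
flag_total = int(
    non_finite_price.sum() + non_positive_price.sum() + invalid_volume.sum()
)
print(rows_dropped, flag_total, len(cleaned))
```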

# ------------------------------------------------------------------------------
# Task 2, Step 2: Enforce intra-day price consistency
# ------------------------------------------------------------------------------

def _enforce_price_consistency(
    market_data_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Removes rows where intra-day OHLC prices are logically inconsistent.

    This function enforces the fundamental logic of an OHLC bar:
    - The 'Low' must be the minimum price.
    - The 'High' must be the maximum price.
    Any row violating these rules represents corrupt data and is removed. The
    default outlier policy is "no winsorization", as specified.

    Args:
        market_data_df: The DataFrame after initial invalid data removal.

    Returns:
        A tuple containing:
        - A new DataFrame with inconsistent rows removed.
        - A summary DataFrame reporting the count of rows dropped per ticker
          due to consistency violations.
    """
    # Create a copy to ensure the original DataFrame is not modified.
    df = market_data_df.copy()

    # --- Identify Inconsistent Rows ---

    # Define the logical consistency constraints for an OHLC bar.
    # Using vectorized operations for maximum performance.
    inconsistency_mask = (
        (df['Low'] > df['Open']) |
        (df['Low'] > df['High']) |
        (df['Low'] > df['Close']) |
        (df['High'] < df['Open']) |
        (df['High'] < df['Close'])
    )

    # --- Filter Data and Generate Report ---

    # Generate a summary report of dropped rows before filtering.
    # This captures which tickers had consistency issues.
    dropped_tickers = df.loc[inconsistency_mask].index.get_level_values('ticker')
    summary_report = (
        dropped_tickers.value_counts()
        .to_frame('consistency_violations')
        .sort_index()
    )

    # Use the inverted mask to select only the rows that are consistent.
    cleaned_df = df[~inconsistency_mask].copy()

    # The index is already sorted from the previous step, so no re-sort is needed.

    return cleaned_df, summary_report
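
As a small illustration of the OHLC rule, the sketch below flags bars whose `Low` sits above the lower of Open/Close, or whose `High` sits below the higher of Open/Close. For finite prices this should be equivalent to the five-term mask above; the bar values are invented.

```python
import pandas as pd

# Hypothetical bars: the second has Low above Open,
# the third has High below Close.
bars = pd.DataFrame(
    {
        "Open": [10.0, 10.0, 10.0],
        "High": [11.0, 11.0, 10.5],
        "Low": [9.5, 10.2, 9.5],
        "Close": [10.5, 10.8, 10.8],
    }
)

# The candle body spans min(Open, Close) to max(Open, Close).
body_low = bars[["Open", "Close"]].min(axis=1)
body_high = bars[["Open", "Close"]].max(axis=1)

# A bar is inconsistent if its range does not contain its body.
inconsistent = (bars["Low"] > body_low) | (bars["High"] < body_high)

print(inconsistent.tolist())
```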

# ------------------------------------------------------------------------------
# Task 2, Step 3: Align trading calendars
# ------------------------------------------------------------------------------

def _align_to_trading_calendars(
    market_data_df: pd.DataFrame,
    ticker_to_market_map: Dict[str, str]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Filters the DataFrame to include only rows on valid trading days.

    This function uses the `exchange_calendars` library to ensure that every
    data point corresponds to an actual trading day for its respective market,
    removing any data from weekends or market holidays. It does not forward-fill,
    as per the specified constraints.

    Args:
        market_data_df: The DataFrame after consistency checks.
        ticker_to_market_map: A dictionary mapping each ticker to its market
                                identifier (e.g., 'US', 'CN', 'EU').

    Returns:
        A tuple containing:
        - A new DataFrame with non-trading days removed.
        - A summary DataFrame reporting the count of rows dropped per ticker
          due to calendar misalignment.

    Raises:
        ImportError: If `exchange_calendars` is not installed.
        ValueError: If a ticker in the data is not in the map, or if a market
                    in the map is unsupported.
    """
    # --- Pre-computation and Validation ---
    try:
        # Dynamically import the required library.
        import exchange_calendars as xcals
    except ImportError as exc:
        raise ImportError(
            "The 'exchange_calendars' library is required; install it with "
            "'pip install exchange_calendars'."
        ) from exc

    # Create a copy to avoid modifying the original DataFrame.
    df = market_data_df.copy()

    # Map the high-level market identifiers from the config to specific
    # exchange codes recognized by the `exchange_calendars` library.
    market_to_exchange_code = {
        'US': 'XNYS',  # New York Stock Exchange
        'CN': 'XSHG',  # Shanghai Stock Exchange
        'EU': 'XFRA',  # Frankfurt Stock Exchange
    }

    # Validate that all markets in the map are supported.
    unsupported_markets = set(ticker_to_market_map.values()) - set(market_to_exchange_code.keys())
    if unsupported_markets:
        raise ValueError(f"Unsupported markets found in ticker_to_market_map: {unsupported_markets}")

    # --- Calendar Alignment ---

    # Get the full date range of the dataset to pre-calculate all valid trading days.
    # Normalize to midnight so the bounds match the normalized per-ticker dates below.
    all_dates = df.index.get_level_values('date').normalize()
    min_date, max_date = all_dates.min(), all_dates.max()

    # Pre-calculate valid trading days for each required calendar.
    # Storing them in a set provides O(1) average time complexity for lookups.
    valid_days_by_calendar = {
        market: set(xcals.get_calendar(code).sessions_in_range(min_date, max_date))
        for market, code in market_to_exchange_code.items()
        if market in set(ticker_to_market_map.values())
    }

    # Create a boolean mask to identify rows that are on valid trading days.
    # The mask is filled ticker by ticker and applied to the DataFrame once at the end.
    is_valid_trading_day = pd.Series(False, index=df.index)

    # Iterate through each ticker to apply its specific calendar rules.
    for ticker, group in df.groupby(level='ticker', sort=False):
        market = ticker_to_market_map.get(ticker)
        if market is None:
            raise ValueError(f"Ticker '{ticker}' not found in ticker_to_market_map.")

        # Get the pre-calculated set of valid days for the ticker's market.
        valid_days = valid_days_by_calendar[market]

        # Get the dates for the current ticker group.
        ticker_dates = group.index.get_level_values('date').normalize()

        # Update the mask for the current ticker's indices.
        # A date is valid if it exists in the pre-calculated set.
        is_valid_trading_day.loc[group.index] = ticker_dates.isin(valid_days)

    # --- Filter Data and Generate Report ---

    # Generate a summary report of rows dropped due to calendar misalignment.
    dropped_tickers = df.loc[~is_valid_trading_day].index.get_level_values('ticker')
    summary_report = (
        dropped_tickers.value_counts()
        .to_frame('calendar_violations')
        .sort_index()
    )

    # Apply the final mask to keep only the valid trading days.
    cleaned_df = df[is_valid_trading_day].copy()

    return cleaned_df, summary_report
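
The filtering step can be sketched without the `exchange_calendars` dependency. The example below assumes a weekday-only calendar built with `pd.bdate_range` as a stand-in for a real exchange calendar, so public holidays are not excluded; the dates and ticker are invented.

```python
import pandas as pd

# Assumption: weekdays stand in for exchange sessions (holidays NOT excluded).
valid_days = pd.bdate_range("2015-01-01", "2015-01-31")

index = pd.MultiIndex.from_tuples(
    [
        (pd.Timestamp("2015-01-09 00:00"), "AAA"),  # Friday
        (pd.Timestamp("2015-01-10 00:00"), "AAA"),  # Saturday
        (pd.Timestamp("2015-01-12 16:00"), "AAA"),  # Monday, with a time component
    ],
    names=["date", "ticker"],
)
toy_df = pd.DataFrame({"Close": [1.0, 2.0, 3.0]}, index=index)

# Normalize to midnight so timestamps with a time component still match.
session_dates = toy_df.index.get_level_values("date").normalize()
is_trading_day = session_dates.isin(valid_days)

# Keep only rows that fall on a valid session; nothing is forward-filled.
aligned = toy_df[is_trading_day]
print(is_trading_day.tolist())
```

The Saturday row is dropped, while the Monday row survives because its timestamp is normalized before the lookup.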

# ------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# ------------------------------------------------------------------------------

def cleanse_and_prepare_data(
    raw_market_data_df: pd.DataFrame,
    ticker_to_market_map: Dict[str, str]
) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """
    Orchestrates the end-to-end data cleansing and preparation pipeline.

    This function executes the three core cleansing steps in sequence:
    1. Removes rows with fundamentally invalid data (nulls, non-positives).
    2. Enforces logical consistency of OHLC prices.
    3. Aligns the data to official exchange trading calendars.

    It returns a fully cleansed DataFrame ready for the next stage of processing
    (windowing and feature engineering), along with a comprehensive report
    detailing all modifications made to the data.

    Args:
        raw_market_data_df: The raw DataFrame of market data.
        ticker_to_market_map: A dictionary mapping each ticker to its market
                                identifier (e.g., 'US', 'CN', 'EU').

    Returns:
        A tuple containing:
        - A new, fully cleansed pandas DataFrame.
        - A dictionary of reports, where each key corresponds to a cleansing
          step and the value is a DataFrame summarizing the changes made.
    """
    # Initialize a dictionary to store reports from each cleansing step.
    reports = {}

    # --- Step 1: Remove rows with missing or invalid data ---
    # This is the first and most critical cleansing step.
    df_after_step1, report1 = _remove_invalid_data(raw_market_data_df)
    reports['invalid_data_removal'] = report1
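    # Count rows from the length difference: report1 counts flags, and a single
    # row can trip more than one flag.
    rows_removed_step1 = len(raw_market_data_df) - len(df_after_step1)
    print(f"Step 1: Removed {rows_removed_step1} rows with invalid numerical data.")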

    # --- Step 2: Enforce intra-day price consistency ---
    # This step operates on the output of the previous step.
    df_after_step2, report2 = _enforce_price_consistency(df_after_step1)
    reports['price_consistency_enforcement'] = report2
    print(f"Step 2: Removed {report2.sum().sum()} rows with inconsistent OHLC prices.")

    # --- Step 3: Align trading calendars ---
    # The final cleansing step ensures temporal integrity.
    df_after_step3, report3 = _align_to_trading_calendars(df_after_step2, ticker_to_market_map)
    reports['calendar_alignment'] = report3
    print(f"Step 3: Removed {report3.sum().sum()} rows corresponding to non-trading days.")

    # The final DataFrame is the result of all sequential cleansing operations.
    final_cleaned_df = df_after_step3

    # Calculate and print the total reduction in data size.
    initial_rows = len(raw_market_data_df)
    final_rows = len(final_cleaned_df)
    reduction_pct = (1 - final_rows / initial_rows) * 100 if initial_rows > 0 else 0
    print(f"\nData cleansing complete. Total rows reduced from {initial_rows} to {final_rows} ({reduction_pct:.2f}% reduction).")

    return final_cleaned_df, reports



# ==============================================================================
# Task 3: Construct Sliding Windows and Supervised Targets
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 3, Step 1: Define historical windows X and future targets y
# ------------------------------------------------------------------------------

def _create_sliding_windows(
    cleansed_df: pd.DataFrame,
    input_window_size: int,
    prediction_horizon: int,
) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
    """
    Converts a time-series DataFrame into sliding windows and rich metadata.

    This function implements the core data transformation from a 2D time-series
    format to 3D windowed arrays (X) and 2D target arrays (y). It also builds a
    metadata DataFrame holding the start date, end date, and full date series of
    each window, so downstream steps do not need the original DataFrame. Windows
    are created with numpy's stride tricks, which produce views rather than
    per-window copies.

    Args:
        cleansed_df: The fully cleansed market data DataFrame from Task 2.
        input_window_size: The number of time steps in each input window (T).
        prediction_horizon: The number of time steps to predict (T').

    Returns:
        A tuple containing:
        - X_windows: A 3D numpy array of shape (num_windows, T, num_features).
        - y_targets: A 2D numpy array of shape (num_windows, T').
        - metadata: A DataFrame with (ticker, window_start_date, window_end_date,
          window_dates) for each window.
    """
    # Define the exact feature set and order for the model input.
    feature_cols = ['Open', 'High', 'Low', 'Volume', 'Close', 'Adj Close']
    target_col = 'Adj Close'

    # Lists to store the results from each ticker before final concatenation.
    all_x, all_y, all_meta = [], [], []

    # Group by ticker to process each time-series independently, preventing data leakage.
    for ticker, group in cleansed_df.groupby(level='ticker', sort=False):
        # Extract data as numpy arrays for performance.
        features = group[feature_cols].values
        targets = group[target_col].values
        dates = group.index.get_level_values('date').values

        # Define the total length required to form one complete (X, y) sample.
        total_len = input_window_size + prediction_horizon

        # Skip tickers with insufficient data.
        if len(group) < total_len:
            continue

        # Determine the number of sliding windows that can be created.
        num_windows = len(group) - total_len + 1

        # --- Create X_windows using memory-efficient stride tricks ---
        # writeable=False guards against accidental writes through the overlapping views.
        shape = (num_windows, input_window_size, features.shape[1])
        strides = (features.strides[0], features.strides[0], features.strides[1])
        x_ticker = np.lib.stride_tricks.as_strided(
            features, shape=shape, strides=strides, writeable=False
        )

        # --- Create y_targets using memory-efficient stride tricks ---
        target_shape = (num_windows, prediction_horizon)
        target_strides = (targets.strides[0], targets.strides[0])
        y_ticker = np.lib.stride_tricks.as_strided(
            targets[input_window_size:], shape=target_shape, strides=target_strides, writeable=False
        )

        # --- Create Comprehensive Metadata ---
        # Pre-calculate all metadata fields for the current ticker.
        meta_list = []
        for i in range(num_windows):
            # The start index of the window in the original group's array.
            start_idx = i
            # The end index of the window.
            end_idx = i + input_window_size
            meta_list.append({
                'ticker': ticker,
                'window_start_date': dates[start_idx],
                'window_end_date': dates[end_idx - 1],
                # Store the actual series of dates for this window.
                'window_dates': dates[start_idx:end_idx]
            })
        meta_ticker = pd.DataFrame(meta_list)

        # Append the results for the current ticker to the master lists.
        all_x.append(x_ticker)
        all_y.append(y_ticker)
        all_meta.append(meta_ticker)

    # Handle the edge case where no valid windows could be created across all tickers.
    if not all_x:
        # Return empty, correctly-shaped artifacts.
        empty_x = np.empty((0, input_window_size, len(feature_cols)), dtype=np.float64)
        empty_y = np.empty((0, prediction_horizon), dtype=np.float64)
        empty_meta = pd.DataFrame(columns=['ticker', 'window_start_date', 'window_end_date', 'window_dates'])
        return empty_x, empty_y, empty_meta

    # Concatenate results from all tickers into the final arrays and DataFrame.
    X_windows = np.concatenate(all_x, axis=0)
    y_targets = np.concatenate(all_y, axis=0)
    metadata = pd.concat(all_meta, axis=0).reset_index(drop=True)

    return X_windows, y_targets, metadata
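
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): bounds-checked sliding windows
# ------------------------------------------------------------------------------
# The stride-trick windowing above could also be written with NumPy's
# `sliding_window_view` (NumPy >= 1.20), which checks bounds and returns
# read-only views. The helper name `_demo_sliding_windows` is hypothetical, and
# the sketch assumes total_len == input_window_size + prediction_horizon, which
# is defined outside the code shown here.

import numpy as np  # Re-imported so the sketch is self-contained.


def _demo_sliding_windows(features, targets, input_window_size, prediction_horizon):
    """Builds (X, y) windows for one ticker from 2D features and 1D targets."""
    features = np.asarray(features)  # shape (n_rows, n_features)
    targets = np.asarray(targets)    # shape (n_rows,)
    total_len = input_window_size + prediction_horizon
    num_windows = len(features) - total_len + 1
    if num_windows <= 0:
        # Not enough rows for a single window: return empty, correctly-shaped arrays.
        return (
            np.empty((0, input_window_size, features.shape[1])),
            np.empty((0, prediction_horizon)),
        )
    # sliding_window_view appends the window axis last: (n_rows - w + 1, n_features, w).
    x_view = np.lib.stride_tricks.sliding_window_view(
        features, input_window_size, axis=0
    )
    # Move the window axis to position 1 and keep only windows with a full target.
    x = x_view.transpose(0, 2, 1)[:num_windows]
    # Targets start right after each input window ends.
    y = np.lib.stride_tricks.sliding_window_view(
        targets[input_window_size:], prediction_horizon
    )[:num_windows]
    return x, y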

# ------------------------------------------------------------------------------
# Task 3, Step 2: Split data chronologically into train/val/test
# ------------------------------------------------------------------------------

def _split_data_chronologically(
    X_windows: np.ndarray,
    y_targets: np.ndarray,
    metadata: pd.DataFrame,
    dataset_name: Literal["StockNet", "Indices_2024_2025"],
) -> Dict[str, Dict[str, Union[np.ndarray, pd.DataFrame]]]:
    """
    Splits the windowed data into training, validation, and test sets chronologically.

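    Windows are assigned to a split by their `window_end_date`, so every
    training window ends on or before every validation window, and every
    validation window ends on or before every test window. Targets extend
    `prediction_horizon` steps past the window end, so the targets of the last
    training windows fall inside the validation period; drop those windows if
    strict separation of targets is required. The splitting strategy depends
    on the dataset name.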

    Args:
        X_windows: The 3D numpy array of input windows.
        y_targets: The 2D numpy array of target values.
        metadata: The DataFrame containing metadata for each window.
        dataset_name: The name of the dataset to determine the split strategy.

    Returns:
        A dictionary containing the data splits. For example:
        {
            'train': {'X': X_train, 'y': y_train, 'meta': meta_train},
            'val':   {'X': X_val,   'y': y_val,   'meta': meta_val},
            'test':  {'X': X_test,  'y': y_test,  'meta': meta_test}
        }
    """
    # Work on a copy so the caller's DataFrame is not mutated, then ensure the
    # window end dates are in datetime format for comparison.
    metadata = metadata.copy()
    metadata['window_end_date'] = pd.to_datetime(metadata['window_end_date'])

    # Get the unique, sorted list of all window end dates in the dataset.
    unique_dates = np.sort(metadata['window_end_date'].unique())

    if dataset_name == "StockNet":
        # Strategy for StockNet: 70% train, 15% validation, 15% test.
        # Calculate the split indices based on quantiles of the unique dates.
        n_dates = len(unique_dates)
        train_end_idx = int(n_dates * 0.7)
        val_end_idx = int(n_dates * 0.85)

        # Determine the cutoff dates.
        train_end_date = unique_dates[train_end_idx]
        val_end_date = unique_dates[val_end_idx]

        # Create boolean masks based on the cutoff dates.
        train_mask = metadata['window_end_date'] <= train_end_date
        val_mask = (metadata['window_end_date'] > train_end_date) & (metadata['window_end_date'] <= val_end_date)
        test_mask = metadata['window_end_date'] > val_end_date

    elif dataset_name == "Indices_2024_2025":
        # Strategy for Indices: test on windows ending in [2024-01-01, 2025-01-01).
        # Windows ending before 2024-01-01 are used for training, and windows ending
        # on or after 2025-01-01 belong to no split. No validation set is defined.
        test_start_date = pd.Timestamp('2024-01-01')
        test_end_date = pd.Timestamp('2025-01-01')

        # Create boolean masks.
        test_mask = (metadata['window_end_date'] >= test_start_date) & (metadata['window_end_date'] < test_end_date)
        train_mask = metadata['window_end_date'] < test_start_date
        # Validation set is empty under this strategy.
        val_mask = pd.Series(False, index=metadata.index)

    else:
        raise ValueError(f"Unknown dataset_name for splitting: {dataset_name}")

    # Apply the masks to create the data splits.
    splits = {
        'train': {
            'X': X_windows[train_mask],
            'y': y_targets[train_mask],
            'meta': metadata[train_mask].reset_index(drop=True)
        },
        'val': {
            'X': X_windows[val_mask],
            'y': y_targets[val_mask],
            'meta': metadata[val_mask].reset_index(drop=True)
        },
        'test': {
            'X': X_windows[test_mask],
            'y': y_targets[test_mask],
            'meta': metadata[test_mask].reset_index(drop=True)
        }
    }

    return splits
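
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): purging the training boundary
# ------------------------------------------------------------------------------
# The split above assigns windows by their end date. One possible way to also
# keep training *targets* out of the validation period is to drop the last
# `prediction_horizon` end dates before the cutoff. The helper name
# `_demo_purged_train_mask` is hypothetical, and the sketch assumes that each
# unique window end date corresponds to one trading step.

import numpy as np   # Re-imported so the sketch is self-contained.
import pandas as pd


def _demo_purged_train_mask(window_end_dates, train_end_date, prediction_horizon):
    """Returns a boolean mask of training windows whose targets end by the cutoff."""
    end_dates = pd.to_datetime(pd.Series(list(window_end_dates))).to_numpy()
    # np.unique returns the distinct end dates in sorted order.
    unique_dates = np.unique(end_dates)
    # Rank of each window's end date among the sorted unique dates.
    date_rank = np.searchsorted(unique_dates, end_dates)
    # Rank of the last end date that is on or before the training cutoff.
    cutoff = pd.Timestamp(train_end_date).to_datetime64()
    cutoff_rank = np.searchsorted(unique_dates, cutoff, side="right") - 1
    # Keep windows whose final target step is still on or before the cutoff.
    return date_rank <= cutoff_rank - prediction_horizon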

# ------------------------------------------------------------------------------
# Task 3, Step 3: Validate and log window counts
# ------------------------------------------------------------------------------

def _log_window_counts(
    data_splits: Dict[str, Dict[str, Union[np.ndarray, pd.DataFrame]]],
    min_windows_per_ticker: int = 50
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Generates a summary report of window counts per ticker and per split.

    Args:
        data_splits: The dictionary of data splits from the previous step.
        min_windows_per_ticker: The minimum number of training windows a ticker
                                must have to be considered for robust training.

    Returns:
        A tuple containing:
        - A summary DataFrame with window counts.
        - A list of tickers recommended for exclusion due to insufficient data.
    """
    # List to store summary information from each split.
    summary_list = []

    # Iterate over each split (train, val, test) to generate counts.
    for split_name, split_data in data_splits.items():
        meta = split_data['meta']
        if meta.empty:
            continue

        # Count windows per ticker within the current split.
        counts = meta['ticker'].value_counts().reset_index()
        counts.columns = ['ticker', 'window_count']
        counts['split'] = split_name
        summary_list.append(counts)

    # Combine all summaries into a single DataFrame.
    if not summary_list:
        return pd.DataFrame(), []

    summary_df = pd.concat(summary_list, ignore_index=True)
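    # Pivot for a more readable, wide-format report. Summing (rather than the
    # default mean) and casting keeps the window counts as integers.
    summary_pivot = summary_df.pivot_table(
        index='ticker', columns='split', values='window_count',
        aggfunc='sum', fill_value=0
    ).astype(int)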

    # Identify tickers with insufficient training data.
    if 'train' in summary_pivot.columns:
        train_counts = summary_pivot['train']
        tickers_to_exclude = train_counts[train_counts < min_windows_per_ticker].index.tolist()
    else:
        tickers_to_exclude = summary_pivot.index.tolist() # Exclude all if no train data

    return summary_pivot, tickers_to_exclude

# ------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# ------------------------------------------------------------------------------

def construct_windows_and_split_data(
    cleansed_df: pd.DataFrame,
    config: Dict[str, Any],
    dataset_name: Literal["StockNet", "Indices_2024_2025"],
) -> Dict[str, Any]:
    """
    Orchestrates the creation of sliding windows and their chronological split.

    This function executes the full workflow for Task 3:
    1. Creates memory-efficient sliding windows (X, y) and metadata.
    2. Splits the data into train, validation, and test sets chronologically.
    3. Generates a detailed report of window counts for validation.

    Args:
        cleansed_df: The fully cleansed market data from Task 2.
        config: The master configuration dictionary.
        dataset_name: The name of the dataset to guide the splitting strategy.

    Returns:
        A dictionary containing the complete results:
        {
            'data_splits': The train/val/test data splits.
            'summary_report': A DataFrame with window counts.
            'excluded_tickers': A list of tickers with insufficient data.
        }
    """
    # Extract necessary parameters from the configuration.
    try:
        input_window_size = config['data_and_task_spec']['input_window_size']
        prediction_horizon = config['data_and_task_spec']['prediction_horizon']
    except KeyError as e:
        # Chain the original exception so the traceback shows which lookup failed.
        raise ValueError(f"Configuration is missing a required key for windowing: {e}") from e

    # --- Step 1: Create sliding windows ---
    print("Step 1: Creating sliding windows from time-series data...")
    X_windows, y_targets, metadata = _create_sliding_windows(
        cleansed_df, input_window_size, prediction_horizon
    )
    print(f"Created {len(metadata)} total windows.")
    if len(metadata) == 0:
        print("Warning: No valid windows could be created from the provided data.")
        return {
            'data_splits': {},
            'summary_report': pd.DataFrame(),
            'excluded_tickers': []
        }

    # --- Step 2: Split data chronologically ---
    print(f"Step 2: Splitting data chronologically using '{dataset_name}' strategy...")
    data_splits = _split_data_chronologically(
        X_windows, y_targets, metadata, dataset_name
    )
    print(f"Train windows: {len(data_splits['train']['meta'])}")
    print(f"Validation windows: {len(data_splits['val']['meta'])}")
    print(f"Test windows: {len(data_splits['test']['meta'])}")

    # --- Step 3: Validate and log window counts ---
    print("Step 3: Generating window count summary report...")
    summary_report, excluded_tickers = _log_window_counts(data_splits)
    print(f"Identified {len(excluded_tickers)} tickers with insufficient training data.")

    # Assemble the final output artifact.
    result = {
        'data_splits': data_splits,
        'summary_report': summary_report,
        'excluded_tickers': excluded_tickers
    }

    print("\nTask 3: Window construction and data splitting complete.")
    return result



# ==============================================================================
# Task 4: Compute Technical Annotations f(X) for Each Window
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 4, Step 1: Compute summary statistics over the window
# ------------------------------------------------------------------------------

def _compute_summary_statistics(
    X_windows: np.ndarray
) -> List[Dict[str, float]]:
    """
    Computes vectorized summary statistics (mean, min, max) for all windows.

    Args:
        X_windows: A 3D numpy array of shape (num_windows, T, 6), where the
                   6 features are [Open, High, Low, Volume, Close, Adj Close].

    Returns:
        A list of dictionaries, where each dictionary contains the statistics
        for one window.
    """
    # Define the feature names in the exact order they appear in the array.
    feature_names = ['Open', 'High', 'Low', 'Volume', 'Close', 'Adj_Close']

    # Perform vectorized calculations across all windows simultaneously.
    # axis=1 specifies aggregation over the time dimension (T).
    means = np.mean(X_windows, axis=1)
    mins = np.min(X_windows, axis=1)
    maxs = np.max(X_windows, axis=1)

    # Structure the results into a list of dictionaries for easy serialization.
    num_windows = X_windows.shape[0]
    stats_list = [{} for _ in range(num_windows)]

    # Iterate through each feature to populate the dictionaries.
    for i, name in enumerate(feature_names):
        for j in range(num_windows):
            stats_list[j][f"Mean_{name}"] = means[j, i]
            stats_list[j][f"Min_{name}"] = mins[j, i]
            stats_list[j][f"Max_{name}"] = maxs[j, i]

    return stats_list

# ------------------------------------------------------------------------------
# Task 4, Step 2: Compute financial technical indicators using Appendix formulas
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
# Numba JIT-compiled Helper Functions
# ------------------------------------------------------------------------------

@numba.njit(parallel=True, cache=True)
def _numba_ema_vectorized(data: np.ndarray, window: int) -> np.ndarray:
    """
    Calculates the Exponential Moving Average (EMA) for a batch of windows.

    This function is Just-In-Time (JIT) compiled with Numba and parallelized
    across windows, so the sequential EMA recursion runs as compiled machine
    code instead of in the Python interpreter.

    Process:
    1.  For each window (row) in the input `data` array:
    2.  Initializes the EMA with the Simple Moving Average (SMA) of the first
        `window` data points.
    3.  Iteratively applies the EMA formula for the remaining points in the series.
    4.  The outer loop over windows is parallelized by Numba's `prange`.

    Equation:
        EMA_t = Price_t * alpha + EMA_{t-1} * (1 - alpha)
        where alpha = 2 / (window + 1)

    Args:
        data: A 2D numpy array of shape (num_windows, T_period) containing the
              time-series data (e.g., closing prices) for each window.
        window: The lookback period (integer) for the EMA calculation.

    Returns:
        A 1D numpy array of shape (num_windows,) where each element is the
        final EMA value for the corresponding window.
    """
    # Get the number of windows to process from the shape of the input array.
    num_windows = data.shape[0]

    # Pre-allocate a numpy array to store the final EMA value for each window.
    ema_array = np.empty(num_windows, dtype=np.float64)

    # Calculate the smoothing factor 'alpha' based on the lookback window.
    alpha = 2.0 / (window + 1.0)

    # Use Numba's prange for parallel execution across the independent windows.
    for i in numba.prange(num_windows):
        # Extract the time-series for the current window.
        series = data[i]

        # Initialize the EMA with the Simple Moving Average of the first 'window' points.
        # This is a standard and robust initialization technique.
        ema = np.mean(series[:window])

        # Iteratively apply the EMA formula for the remaining points in the series.
        for j in range(window, len(series)):
            ema = alpha * series[j] + (1.0 - alpha) * ema

        # Store the final calculated EMA value for the current window.
        ema_array[i] = ema

    # Return the array of final EMA values.
    return ema_array
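

# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): pure-NumPy EMA reference
# ------------------------------------------------------------------------------
# A small single-series reference can help sanity-check the JIT-compiled helper
# above. It follows the same recipe (SMA seed over the first `window` points,
# then the EMA recursion). The name `_demo_ema_reference` is hypothetical.

import numpy as np  # Re-imported so the sketch is self-contained.


def _demo_ema_reference(series, window):
    """Returns the final EMA of one series, or NaN if it is shorter than `window`."""
    series = np.asarray(series, dtype=np.float64)
    if len(series) < window:
        return float("nan")
    # Smoothing factor, identical to the batch helper.
    alpha = 2.0 / (window + 1.0)
    # Seed with the SMA of the first `window` points.
    ema = series[:window].mean()
    # Apply the recursion to the remaining points.
    for price in series[window:]:
        ema = alpha * price + (1.0 - alpha) * ema
    return float(ema)


# Worked example: for [1, 2, 3, 4, 5] with window=3, alpha = 0.5 and the seed is 2.0,
# so the EMA moves 2.0 -> 3.0 -> 4.0.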


@numba.njit(parallel=True, cache=True)
def _numba_wilder_smooth_vectorized(data: np.ndarray, window: int) -> np.ndarray:
    """
    Calculates Wilder's Smoothing for a batch of windows.

    Wilder's Smoothing is a specific type of Exponential Moving Average used in
    indicators like RSI and ADX. This function is JIT-compiled with Numba and
    parallelized for maximum performance.

    Process:
    1.  For each window (row) in the input `data` array:
    2.  Initializes the smoothed value with the SMA of the first `window` points.
    3.  Iteratively applies the Wilder's smoothing formula.
    4.  The outer loop is parallelized by Numba's `prange`.

    Equation:
        SmoothedValue_t = (SmoothedValue_{t-1} * (n - 1) + NewValue_t) / n
        where n is the window period.

    Args:
        data: A 2D numpy array of shape (num_windows, T_period) containing the
              time-series data to be smoothed.
        window: The lookback period (integer) for the smoothing calculation.

    Returns:
        A 1D numpy array of shape (num_windows,) where each element is the
        final smoothed value for the corresponding window.
    """
    # Get the number of windows to process from the shape of the input array.
    num_windows = data.shape[0]

    # Pre-allocate a numpy array to store the final smoothed value for each window.
    smoothed_array = np.empty(num_windows, dtype=np.float64)

    # Use Numba's prange for parallel execution across the independent windows.
    for i in numba.prange(num_windows):
        # Extract the time-series for the current window.
        series = data[i]

        # Initialize the smoothed value with the Simple Moving Average.
        val = np.mean(series[:window])

        # Iteratively apply the Wilder's smoothing formula for subsequent points.
        for j in range(window, len(series)):
            val = (val * (window - 1) + series[j]) / window

        # Store the final calculated smoothed value for the current window.
        smoothed_array[i] = val

    # Return the array of final smoothed values.
    return smoothed_array

# ------------------------------------------------------------------------------
# Individual Indicator Implementations
# ------------------------------------------------------------------------------

def _calculate_sma(price_data: np.ndarray, window: int) -> np.ndarray:
    """
    Calculates the Simple Moving Average (SMA) for a batch of windows.

    This function performs a fully vectorized calculation of the SMA for the
    last `window` periods of each time-series in the input array.

    Equation:
        SMA = (1/n) * sum(Price_i for i=1 to n)

    Args:
        price_data: A 2D numpy array of shape (num_windows, T) of prices.
        window: The lookback period for the SMA.

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated SMA values.
        Returns NaN for windows with insufficient data.
    """
    # Input validation: Ensure the lookback period is not larger than the time series length.
    if price_data.shape[1] < window:
        # If there's not enough data, return an array of NaNs.
        return np.full(price_data.shape[0], np.nan, dtype=np.float64)

    # Calculate the mean of the last 'window' elements along axis 1 (the time dimension).
    return np.mean(price_data[:, -window:], axis=1)


def _calculate_ema(price_data: np.ndarray, window: int) -> np.ndarray:
    """
    Calculates the Exponential Moving Average (EMA) via a Numba helper.

    This function serves as a wrapper, handling input validation before passing
    the data to the high-performance, JIT-compiled Numba function.

    Equation:
        EMA_t = Price_t * alpha + EMA_{t-1} * (1 - alpha)

    Args:
        price_data: A 2D numpy array of shape (num_windows, T) of prices.
        window: The lookback period for the EMA.

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated EMA values.
        Returns NaN for windows with insufficient data.
    """
    # Input validation: Ensure there is enough data for at least one EMA calculation.
    if price_data.shape[1] < window:
        # Return an array of NaNs if the condition is not met.
        return np.full(price_data.shape[0], np.nan, dtype=np.float64)

    # Delegate the computationally intensive calculation to the JIT-compiled helper.
    return _numba_ema_vectorized(price_data, window)


def _calculate_momentum(price_data: np.ndarray, lag: int) -> np.ndarray:
    """
    Calculates the Momentum for a batch of windows.

    This is a fully vectorized implementation that measures the absolute price
    change (not a percentage rate of change) over a specified lag period.

    Equation:
        Momentum = Price_t - Price_{t-n}

    Args:
        price_data: A 2D numpy array of shape (num_windows, T) of prices.
        lag: The lookback period (n) for the momentum calculation.

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated momentum values.
        Returns NaN for windows with insufficient data.
    """
    # Input validation: Ensure the time series is long enough for the lag.
    if price_data.shape[1] < lag + 1:
        # Return an array of NaNs if there isn't enough data.
        return np.full(price_data.shape[0], np.nan, dtype=np.float64)

    # Subtract the price 'lag' periods ago from the most recent price.
    return price_data[:, -1] - price_data[:, -1 - lag]


def _calculate_rsi(price_data: np.ndarray, window: int) -> np.ndarray:
    """
    Calculates the Relative Strength Index (RSI) for a batch of windows.

    This implementation correctly uses Wilder's Smoothing for averaging gains
    and losses, which is the standard for RSI calculation. The heavy lifting
    is done by a JIT-compiled Numba helper for performance.

    Equation:
        RSI = 100 - (100 / (1 + RS))
        where RS = WilderSmooth(Gains) / WilderSmooth(Losses)

    Args:
        price_data: A 2D numpy array of shape (num_windows, T) of prices.
        window: The lookback period for the RSI.

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated RSI values.
        Returns NaN for windows with insufficient data.
    """
    # Input validation: 'window' + 1 prices are needed to produce the 'window'
    # price changes that seed the smoothed average gain and loss.
    if price_data.shape[1] < window + 1:
        return np.full(price_data.shape[0], np.nan, dtype=np.float64)

    # Step 1: Calculate price changes (deltas) between consecutive periods.
    deltas = np.diff(price_data, axis=1)

    # Step 2: Separate deltas into gains and losses.
    gains = np.where(deltas > 0, deltas, 0)
    losses = np.where(deltas < 0, -deltas, 0)

    # Step 3: Use Wilder's smoothing to get the average gain and average loss.
    avg_gain = _numba_wilder_smooth_vectorized(gains, window)
    avg_loss = _numba_wilder_smooth_vectorized(losses, window)

    # Step 4: Calculate the Relative Strength (RS).
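    # Initialize RS to infinity so that windows with avg_loss == 0 map to RSI = 100.
    # Note that this also covers a completely flat window (avg_gain == avg_loss == 0),
    # which is reported as 100 under this convention.
    rs = np.full_like(avg_gain, np.inf, dtype=np.float64)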
    # Create a mask for non-zero average losses to avoid division by zero.
    mask = avg_loss != 0
    # Calculate RS only where avg_loss is not zero.
    rs[mask] = avg_gain[mask] / avg_loss[mask]

    # Step 5: Calculate the final RSI value.
    rsi = 100.0 - (100.0 / (1.0 + rs))
    return rsi
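

# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): single-series RSI reference
# ------------------------------------------------------------------------------
# The RSI steps above can be traced on one short series. This sketch mirrors
# the batch logic (SMA seed, Wilder recursion, RSI = 100 when the average loss
# is zero). The name `_demo_rsi_reference` is hypothetical.

import numpy as np  # Re-imported so the sketch is self-contained.


def _demo_rsi_reference(prices, window):
    """Returns the final RSI of one price series, or NaN if it is too short."""
    prices = np.asarray(prices, dtype=np.float64)
    if len(prices) < window + 1:
        return float("nan")
    deltas = np.diff(prices)
    gains = np.clip(deltas, 0.0, None)
    losses = np.clip(-deltas, 0.0, None)
    # Seed both averages with the SMA of the first `window` changes.
    avg_gain = gains[:window].mean()
    avg_loss = losses[:window].mean()
    # Wilder's smoothing over the remaining changes.
    for gain, loss in zip(gains[window:], losses[window:]):
        avg_gain = (avg_gain * (window - 1) + gain) / window
        avg_loss = (avg_loss * (window - 1) + loss) / window
    if avg_loss == 0:
        # Same convention as the batch function: no losses maps to RSI = 100.
        return 100.0
    rs = avg_gain / avg_loss
    return float(100.0 - 100.0 / (1.0 + rs))


# Worked example: [10, 11, 10, 11, 10] with window=4 has equal average gain and
# loss (0.5 each), so RS = 1 and RSI = 50.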


def _calculate_macd(
    price_data: np.ndarray,
    fast: int,
    slow: int,
    signal: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Calculates the Moving Average Convergence Divergence (MACD) and its signal line.

    This function computes the MACD line as the difference between a fast and
    a slow EMA. It then computes the signal line as an EMA of the MACD line itself.
    The calculation of the signal line requires computing the historical MACD
    values within each window, making it computationally intensive but correct.

    Equations:
        MACD_Line = EMA(price, fast_period) - EMA(price, slow_period)
        Signal_Line = EMA(MACD_Line, signal_period)

    Args:
        price_data: A 2D numpy array of shape (num_windows, T) of prices.
        fast: The lookback period for the fast EMA.
        slow: The lookback period for the slow EMA.
        signal: The lookback period for the signal line's EMA.

    Returns:
        A tuple of two 1D numpy arrays: (macd_line, signal_line).
        Returns NaNs for windows with insufficient data.
    """
    # Input validation: The time series must be at least as long as the slow EMA period.
    if price_data.shape[1] < slow:
        nan_array = np.full(price_data.shape[0], np.nan, dtype=np.float64)
        # Return two independent arrays so mutating one does not affect the other.
        return nan_array, nan_array.copy()

    # Calculate the final MACD line value for each window.
    ema_fast_final = _numba_ema_vectorized(price_data, fast)
    ema_slow_final = _numba_ema_vectorized(price_data, slow)
    macd_line_final = ema_fast_final - ema_slow_final

    # To calculate the signal line, we need the history of the MACD line.
    num_windows, T = price_data.shape
    # The MACD history starts after the 'slow' period warmup.
    macd_history_len = T - slow + 1
    macd_history = np.empty((num_windows, macd_history_len), dtype=np.float64)

    # Iteratively compute the MACD value for each possible sub-window.
    for i in range(macd_history_len):
        # Define the slice of data needed for this historical MACD calculation.
        end_idx = i + slow
        window_slice = price_data[:, :end_idx]

        # Calculate historical fast and slow EMAs.
        ema_f = _numba_ema_vectorized(window_slice, fast)
        ema_s = _numba_ema_vectorized(window_slice, slow)

        # Store the historical MACD value.
        macd_history[:, i] = ema_f - ema_s

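    # Calculate the signal line by taking an EMA of the MACD history. If the
    # history is shorter than the signal period, the EMA cannot be seeded with
    # a full SMA, so NaN is returned for the signal line instead.
    if macd_history_len < signal:
        signal_line = np.full(num_windows, np.nan, dtype=np.float64)
    else:
        signal_line = _numba_ema_vectorized(macd_history, signal)

    return macd_line_final, signal_line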


def _calculate_williams_r(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    lookback: int
) -> np.ndarray:
    """
    Calculates Williams %R for a batch of windows.

    This is a fully vectorized momentum indicator that measures overbought and
    oversold levels.

    Equation:
        Williams %R = ((HighestHigh_n - Close_t) / (HighestHigh_n - LowestLow_n)) * -100

    Args:
        high: 2D numpy array of high prices (num_windows, T).
        low: 2D numpy array of low prices (num_windows, T).
        close: 2D numpy array of close prices (num_windows, T).
        lookback: The lookback period (n).

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated Williams %R values.
    """
    # Input validation: Ensure enough data for the lookback period.
    if high.shape[1] < lookback:
        return np.full(high.shape[0], np.nan, dtype=np.float64)

    # Slice the data to the lookback period.
    high_lb = high[:, -lookback:]
    low_lb = low[:, -lookback:]

    # Find the highest high and lowest low over the period for each window.
    highest_high = np.max(high_lb, axis=1)
    lowest_low = np.min(low_lb, axis=1)

    # Get the most recent closing price for each window.
    close_last = close[:, -1]

    # Calculate the numerator and denominator of the formula.
    numerator = highest_high - close_last
    denominator = highest_high - lowest_low

    # Handle the division-by-zero edge case (when highest_high == lowest_low).
    # Initialize the result array with NaNs.
    williams_r = np.full_like(denominator, np.nan, dtype=np.float64)
    # Create a mask to identify valid denominators.
    mask = denominator != 0
    # Perform the calculation only on the valid elements.
    williams_r[mask] = -100.0 * (numerator[mask] / denominator[mask])

    return williams_r


def _calculate_cci(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    period: int
) -> np.ndarray:
    """
    Calculates the Commodity Channel Index (CCI) for a batch of windows.

    This is a fully vectorized implementation of the CCI, which measures the
    current price level relative to an average price level over a given period.

    Equation:
        CCI = (TypicalPrice - SMA(TP)) / (0.015 * MeanDeviation)

    Args:
        high: 2D numpy array of high prices (num_windows, T).
        low: 2D numpy array of low prices (num_windows, T).
        close: 2D numpy array of close prices (num_windows, T).
        period: The lookback period for the calculation.

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated CCI values.
    """
    # Input validation: Ensure enough data for the lookback period.
    if high.shape[1] < period:
        return np.full(high.shape[0], np.nan, dtype=np.float64)

    # Slice the data to the relevant lookback period.
    high_lb = high[:, -period:]
    low_lb = low[:, -period:]
    close_lb = close[:, -period:]

    # Step 1: Calculate the Typical Price (TP) for each day in the period.
    tp = (high_lb + low_lb + close_lb) / 3.0

    # Step 2: Calculate the Simple Moving Average of the Typical Price.
    sma_tp = np.mean(tp, axis=1)

    # Step 3: Calculate the Mean Absolute Deviation of TP from its SMA.
    mean_dev = np.mean(np.abs(tp - sma_tp[:, np.newaxis]), axis=1)

    # Step 4: Calculate the final CCI value.
    numerator = tp[:, -1] - sma_tp
    denominator = 0.015 * mean_dev

    # Handle the division-by-zero edge case (when mean_dev is zero).
    cci = np.full_like(denominator, np.nan, dtype=np.float64)
    mask = denominator != 0
    cci[mask] = numerator[mask] / denominator[mask]

    return cci


def _calculate_bollinger_bands(
    price_data: np.ndarray,
    window: int,
    k: float
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Calculates the Bollinger Bands (Upper and Lower) for a batch of windows.

    This is a fully vectorized implementation.

    Equations:
        UpperBand = SMA(price, n) + k * StdDev(price, n)
        LowerBand = SMA(price, n) - k * StdDev(price, n)

    Args:
        price_data: A 2D numpy array of shape (num_windows, T) of prices.
        window: The lookback period (n) for the moving average and standard deviation.
        k: The number of standard deviations to shift the bands.

    Returns:
        A tuple of two 1D numpy arrays: (upper_band, lower_band).
    """
    # Input validation: Ensure enough data for the lookback period.
    if price_data.shape[1] < window:
        nan_array = np.full(price_data.shape[0], np.nan, dtype=np.float64)
        # Return two independent arrays so mutating one does not affect the other.
        return nan_array, nan_array.copy()

    # Slice the data to the relevant lookback period.
    price_lb = price_data[:, -window:]

    # Calculate the Simple Moving Average (the middle band).
    middle_band = np.mean(price_lb, axis=1)

    # Calculate the population standard deviation (NumPy's default, ddof=0) over the period.
    std_dev = np.std(price_lb, axis=1)

    # Calculate the upper and lower bands.
    upper_band = middle_band + k * std_dev
    lower_band = middle_band - k * std_dev

    return upper_band, lower_band


def _calculate_stochastic_oscillator(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    k_lookback: int,
    d_smoothing: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Calculates the Stochastic Oscillator (%K and %D) for a batch of windows.

    This implementation correctly calculates the final %K value and then computes
    the historical %K values needed to calculate the %D line (SMA of %K).

    Equations:
        %K = 100 * ((Close_t - LowestLow_n) / (HighestHigh_n - LowestLow_n))
        %D = SMA(%K, d_period)

    Args:
        high: 2D numpy array of high prices (num_windows, T).
        low: 2D numpy array of low prices (num_windows, T).
        close: 2D numpy array of close prices (num_windows, T).
        k_lookback: The lookback period (n) for %K.
        d_smoothing: The smoothing period for %D.

    Returns:
        A tuple of two 1D numpy arrays: (percent_k, percent_d).
    """
    # Input validation: Ensure enough data for the initial %K calculation.
    if high.shape[1] < k_lookback:
        nan_array = np.full(high.shape[0], np.nan, dtype=np.float64)
        # Return two independent arrays so mutating one does not affect the other.
        return nan_array, nan_array.copy()

    # --- Calculate the final %K value for each window ---
    highest_high = np.max(high[:, -k_lookback:], axis=1)
    lowest_low = np.min(low[:, -k_lookback:], axis=1)
    close_last = close[:, -1]

    numerator = close_last - lowest_low
    denominator = highest_high - lowest_low

    percent_k_final = np.full_like(denominator, np.nan, dtype=np.float64)
    mask = denominator != 0
    percent_k_final[mask] = 100.0 * (numerator[mask] / denominator[mask])

    # --- Calculate %D by smoothing the history of %K ---
    num_windows, T = high.shape
    # The number of historical %K values we can compute within the window T.
    k_history_len = T - k_lookback + 1

    # Input validation for %D smoothing.
    if k_history_len < d_smoothing:
        percent_d_final = np.full(num_windows, np.nan, dtype=np.float64)
        return percent_k_final, percent_d_final

    k_history = np.empty((num_windows, k_history_len), dtype=np.float64)

    # Loop to compute the historical %K values needed for the %D SMA.
    for i in range(k_history_len):
        # Define the slice for this historical %K calculation.
        start_idx = i
        end_idx = i + k_lookback

        h_slice = high[:, start_idx:end_idx]
        l_slice = low[:, start_idx:end_idx]
        c_slice = close[:, end_idx - 1] # The close of the k_lookback period

        # Vectorized calculation for the historical %K value.
        hh = np.max(h_slice, axis=1)
        ll = np.min(l_slice, axis=1)
        num = c_slice - ll
        den = hh - ll

        k_vals = np.full_like(den, np.nan, dtype=np.float64)
        m = den != 0
        k_vals[m] = 100.0 * (num[m] / den[m])
        k_history[:, i] = k_vals

    # %D is the SMA of the last 'd_smoothing' values of %K.
    percent_d_final = np.mean(k_history[:, -d_smoothing:], axis=1)

    return percent_k_final, percent_d_final


@numba.njit(cache=True)
def _numba_adx_single_window(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    period: int
) -> float:
    """
    Calculates the true ADX for a single time-series window using Numba.

    This function implements the complete, multi-step ADX calculation with
    high fidelity, including the final smoothing of the DX history. It is
    JIT-compiled for performance.

    Args:
        high: 1D numpy array of high prices for one window.
        low: 1D numpy array of low prices for one window.
        close: 1D numpy array of close prices for one window.
        period: The lookback period for the ADX calculation.

    Returns:
        The final scalar ADX value for the window, or np.nan if not computable.
    """
    # --- Input Validation ---
    # A true ADX requires an initial smoothing period for DI components, and then
    # another smoothing period for the resulting DX. A safe minimum length is required.
    if len(high) < 2 * period:
        return np.nan

    # --- Step 1: Calculate TR, +DM, -DM ---
    # Pre-allocate arrays for the history of these components.
    tr = np.zeros_like(high)
    plus_dm = np.zeros_like(high)
    minus_dm = np.zeros_like(high)

    # Loop from the second day onwards to calculate differences.
    for i in range(1, len(high)):
        # Calculate the three components of True Range.
        tr1 = high[i] - low[i]
        tr2 = np.abs(high[i] - close[i-1])
        tr3 = np.abs(low[i] - close[i-1])
        # True Range is the maximum of the three.
        tr[i] = np.maximum(tr1, np.maximum(tr2, tr3))

        # Calculate directional movements.
        up_move = high[i] - high[i-1]
        down_move = low[i-1] - low[i]

        # Store +DM if it's positive and the dominant move.
        if up_move > down_move and up_move > 0:
            plus_dm[i] = up_move
        # Store -DM if it's positive and the dominant move.
        if down_move > up_move and down_move > 0:
            minus_dm[i] = down_move

    # --- Step 2: Smooth TR, +DM, -DM using Wilder's Smoothing ---
    # Initialize smoothed values with a simple sum over the first period.
    # Note: Standard ADX uses a simple sum for the first value, not an SMA.
    # Index 0 holds zeros (there is no prior bar), so this sum covers `period - 1` observed values.
    smoothed_tr = np.zeros_like(tr)
    smoothed_plus_dm = np.zeros_like(plus_dm)
    smoothed_minus_dm = np.zeros_like(minus_dm)

    smoothed_tr[period-1] = np.sum(tr[0:period])
    smoothed_plus_dm[period-1] = np.sum(plus_dm[0:period])
    smoothed_minus_dm[period-1] = np.sum(minus_dm[0:period])

    # Apply iterative Wilder's smoothing for the rest of the series.
    for i in range(period, len(high)):
        smoothed_tr[i] = smoothed_tr[i-1] - (smoothed_tr[i-1] / period) + tr[i]
        smoothed_plus_dm[i] = smoothed_plus_dm[i-1] - (smoothed_plus_dm[i-1] / period) + plus_dm[i]
        smoothed_minus_dm[i] = smoothed_minus_dm[i-1] - (smoothed_minus_dm[i-1] / period) + minus_dm[i]

    # --- Step 3: Calculate +DI, -DI ---
    plus_di = np.zeros_like(high)
    minus_di = np.zeros_like(high)

    # Calculate DI values only where smoothed TR is non-zero.
    for i in range(period - 1, len(high)):
        if smoothed_tr[i] > 0:
            plus_di[i] = 100.0 * (smoothed_plus_dm[i] / smoothed_tr[i])
            minus_di[i] = 100.0 * (smoothed_minus_dm[i] / smoothed_tr[i])

    # --- Step 4: Calculate DX History ---
    dx = np.zeros_like(high)
    for i in range(period - 1, len(high)):
        di_sum = plus_di[i] + minus_di[i]
        if di_sum > 0:
            # Equation: DX = 100 * (|(+DI) - (-DI)| / |(+DI) + (-DI)|)
            dx[i] = 100.0 * (np.abs(plus_di[i] - minus_di[i]) / di_sum)

    # DX is defined from index `period - 1` onward (the first index with valid DI values),
    # so the DX history used for ADX smoothing starts there. The first ADX value is the
    # mean of the first `period` DX values, which the length check above guarantees exist.
    first_dx_idx = period - 1
    dx_history = dx[first_dx_idx:]

    if len(dx_history) < period:
        return np.nan

    # --- Step 5: Calculate ADX by smoothing DX ---
    # Initialize ADX with the SMA of the first 'period' DX values.
    adx = np.mean(dx_history[:period])

    # Apply Wilder's smoothing to the rest of the DX history.
    for i in range(period, len(dx_history)):
        adx = (adx * (period - 1) + dx_history[i]) / period

    return adx


def _calculate_adx(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    period: int
) -> np.ndarray:
    """
    Calculates the Average Directional Index (ADX) for a batch of windows.

    This function is a batch wrapper around a Numba-jitted helper that
    calculates the ADX for a single window. The helper performs the full
    multi-step calculation, including the final Wilder smoothing of the DX
    history, so the value returned is ADX rather than raw DX.

    Args:
        high: 2D numpy array of high prices (num_windows, T).
        low: 2D numpy array of low prices (num_windows, T).
        close: 2D numpy array of close prices (num_windows, T).
        period: The lookback period for the ADX calculation.

    Returns:
        A 1D numpy array of shape (num_windows,) with the calculated ADX values.
        Returns NaN for windows with insufficient data.
    """
    # --- Input Validation ---
    # Check if the input arrays have the expected 2D shape.
    if high.ndim != 2 or low.ndim != 2 or close.ndim != 2:
        raise ValueError("Input arrays (high, low, close) must be 2-dimensional.")

    # Get the number of windows to process.
    num_windows = high.shape[0]

    # Pre-allocate the output array for the results.
    adx_values = np.empty(num_windows, dtype=np.float64)

    # --- Iterative Calculation ---
    # Loop through each window and apply the JIT-compiled helper function.
    # While this is a Python loop, the heavy computation inside is compiled to
    # machine code by Numba, making it highly performant.
    for i in range(num_windows):
        adx_values[i] = _numba_adx_single_window(
            high[i], low[i], close[i], period
        )

    return adx_values


# ------------------------------------------------------------------------------
# Task 4, Step 3: Define and serialize the annotation mapping
# ------------------------------------------------------------------------------

def _serialize_annotations(
    stats_list: List[Dict[str, float]],
    indicators: Dict[str, np.ndarray]
) -> Tuple[List[str], List[str]]:
    """
    Serializes calculated statistics and indicators into formatted text blocks.

    Args:
        stats_list: The list of summary statistics dictionaries.
        indicators: The dictionary of technical indicator arrays.

    Returns:
        A tuple containing:
        - A list of formatted statistics strings, one for each window.
        - A list of formatted indicator strings, one for each window.
    """
    num_windows = len(stats_list)
    stats_blocks = []
    inds_blocks = []

    # Iterate through each window to create its formatted text blocks.
    for i in range(num_windows):
        # Format statistics block.
        stats_str = ", ".join([f"{key}: {value:.4f}" for key, value in stats_list[i].items()])
        stats_blocks.append(stats_str)

        # Format indicators block.
        inds_parts = []
        # Sort indicator names for consistent output order.
        for name in sorted(indicators.keys()):
            values = indicators[name]
            value = values[i]
            # Handle NaN values from calculations gracefully.
            formatted_value = f"{value:.4f}" if not np.isnan(value) else "N/A"
            inds_parts.append(f"{name}: {formatted_value}")
        inds_blocks.append(", ".join(inds_parts))

    return stats_blocks, inds_blocks

# ------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# ------------------------------------------------------------------------------

def compute_technical_annotations(
    X_windows: np.ndarray,
    config: Dict[str, Any]
) -> Dict[str, List[str]]:
    """
    Orchestrates the computation and serialization of all technical annotations.

    This function implements the mapping X' = f(X) by:
    1. Computing summary statistics for each window.
    2. Computing a complete suite of financial technical indicators.
    3. Serializing these results into human-readable text blocks for an LLM.

    Args:
        X_windows: The 3D numpy array of input windows from Task 3.
        config: The master configuration dictionary.

    Returns:
        A dictionary containing the annotation results:
        {
            'statistics_blocks': A list of formatted statistics strings.
            'indicators_blocks': A list of formatted indicator strings.
        }
    """
    # Validate that the required configuration section is present.
    if 'annotation_indicator_hyperparams' not in config.get('data_and_task_spec', {}):
        raise ValueError("Config missing 'data_and_task_spec.annotation_indicator_hyperparams'")

    annotation_config = config['data_and_task_spec']['annotation_indicator_hyperparams']

    # --- Step 1: Compute summary statistics ---
    print("Step 1: Computing summary statistics for all windows...")
    stats_list = _compute_summary_statistics(X_windows)
    print(f"Computed statistics for {len(stats_list)} windows.")

    # --- Step 2: Compute financial technical indicators (Complete Suite) ---
    print("Step 2: Computing complete suite of financial technical indicators...")
    indicators = _compute_technical_indicators(X_windows, annotation_config)
    print(f"Computed {len(indicators)} types of indicators.")

    # --- Step 3: Serialize annotations into text blocks ---
    print("Step 3: Serializing annotations into text blocks...")
    statistics_blocks, indicators_blocks = _serialize_annotations(stats_list, indicators)
    print("Serialization complete.")

    # Assemble the final output artifact.
    result = {
        'statistics_blocks': statistics_blocks,
        'indicators_blocks': indicators_blocks
    }

    print("\nTask 4: Technical annotation computation complete.")
    return result
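
The serialization step in Task 4 can be illustrated with a small, self-contained sketch. It mirrors the formatting logic of `_serialize_annotations` for the indicator block only: names are sorted for a stable order, values are printed to four decimals, and NaN becomes `N/A`. The helper name `serialize_indicators` and the indicator names `ADX_14` and `RSI_14` are hypothetical and chosen only for illustration; the real names come from `_compute_technical_indicators`.

```python
import numpy as np


def serialize_indicators(indicators, i):
    """Format the indicator values of window `i` as one comma-separated line."""
    parts = []
    # Sort names so every window lists its indicators in the same order.
    for name in sorted(indicators):
        value = indicators[name][i]
        # NaN marks an indicator that could not be computed for this window.
        formatted = f"{value:.4f}" if not np.isnan(value) else "N/A"
        parts.append(f"{name}: {formatted}")
    return ", ".join(parts)


# Hypothetical indicator arrays covering two windows.
indicators = {
    "RSI_14": np.array([55.1234567, np.nan]),
    "ADX_14": np.array([23.5, 18.25]),
}

block_0 = serialize_indicators(indicators, 0)
block_1 = serialize_indicators(indicators, 1)
print(block_0)  # ADX_14: 23.5000, RSI_14: 55.1235
print(block_1)  # ADX_14: 18.2500, RSI_14: N/A
```

Rendering NaN as `N/A` keeps the text block well-formed for the LLM even when a window is too short for a given indicator.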


In [None]:
# Task 5: Assemble Reasoning Prompts q with Strict Formatting

# ==============================================================================
# Task 5: Assemble Reasoning Prompts q with Strict Formatting
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 5, Step 1: Populate the prompt template with window-specific data
# ------------------------------------------------------------------------------

def _format_ohlcv_table(
    window_data: np.ndarray,
    window_dates: np.ndarray
) -> str:
    """
    Formats a single window of numerical data into a human-readable table string.

    This is a pure formatting function: it receives all necessary data as
    arguments and performs no date generation of its own.

    Args:
        window_data: A 2D numpy array of shape (T, 6) for a single window.
        window_dates: A 1D numpy array of shape (T,) containing the actual
                      datetime objects for the window.

    Returns:
        A formatted multi-line string representing the OHLCV table.
    """
    # Input validation.
    if not isinstance(window_data, np.ndarray) or window_data.ndim != 2:
        raise TypeError("`window_data` must be a 2D numpy array.")
    if not isinstance(window_dates, np.ndarray) or window_dates.ndim != 1:
        raise TypeError("`window_dates` must be a 1D numpy array.")
    if window_data.shape[0] != window_dates.shape[0]:
        raise ValueError("Shape mismatch between `window_data` and `window_dates`.")

    # Create a pandas DataFrame for easy, robust string formatting.
    df = pd.DataFrame(
        window_data,
        columns=['Open', 'High', 'Low', 'Volume', 'Close', 'Adj Close'],
        index=pd.to_datetime(window_dates) # Ensure index is datetime
    )

    # Apply consistent formatting to each column for professional presentation.
    df['Open'] = df['Open'].map('{:.4f}'.format)
    df['High'] = df['High'].map('{:.4f}'.format)
    df['Low'] = df['Low'].map('{:.4f}'.format)
    df['Close'] = df['Close'].map('{:.4f}'.format)
    df['Adj Close'] = df['Adj Close'].map('{:.4f}'.format)

    # Format volume with commas for readability.
    df['Volume'] = df['Volume'].map('{:,.0f}'.format)

    # Convert the fully formatted DataFrame to a string, including the date index.
    return df.to_string()


def _populate_prompts(
    X_windows: np.ndarray,
    metadata: pd.DataFrame,
    annotations: Dict[str, List[str]],
    config: Dict[str, Any]
) -> List[str]:
    """
    Assembles the final prompt strings by populating a template.

    This is a pure function that relies solely on its inputs. It uses the
    per-window metadata produced by the windowing step, so it has no external
    dependencies and makes no assumptions about date continuity.

    Args:
        X_windows: The 3D numpy array of input windows.
        metadata: The comprehensive DataFrame with metadata for each window,
                  including start/end dates and the date series.
        annotations: The dictionary of formatted statistics and indicator blocks.
        config: The master configuration dictionary.

    Returns:
        A list of fully-formed prompt strings, one for each window.

    Raises:
        ValueError: If required columns are missing from the metadata.
    """
    # --- Input Validation ---
    required_meta_cols = {'ticker', 'window_start_date', 'window_end_date', 'window_dates'}
    missing_cols = required_meta_cols - set(metadata.columns)
    if missing_cols:
        raise ValueError(
            f"Metadata is missing required columns: {sorted(missing_cols)}. "
            f"Found: {list(metadata.columns)}."
        )

    # Extract the prompt template string from the configuration.
    prompt_template = config['reasoning_model_config']['prompt_template']['template_string']

    # Extract parameters for easy access.
    input_window_size = config['data_and_task_spec']['input_window_size']
    prediction_horizon = config['data_and_task_spec']['prediction_horizon']

    # Initialize a list to hold the final prompts.
    prompts = []

    # Iterate through each window to construct its specific prompt.
    for i in range(len(metadata)):
        # Get the metadata for the current window directly from the metadata DataFrame.
        meta_row = metadata.iloc[i]

        # Format the OHLCV table using the dedicated helper function.
        # We pass the raw numerical data and the actual date series from metadata.
        ohlcv_table = _format_ohlcv_table(X_windows[i], meta_row['window_dates'])

        # Assemble all placeholder values required by the template.
        prompt_data = {
            "ticker": meta_row['ticker'],
            "input_window_size": input_window_size,
            "prediction_horizon": prediction_horizon,
            "window_start": pd.to_datetime(meta_row['window_start_date']).strftime('%Y-%m-%d'),
            "window_end": pd.to_datetime(meta_row['window_end_date']).strftime('%Y-%m-%d'),
            "ohlcv_table": ohlcv_table,
            "statistics_block": annotations['statistics_blocks'][i],
            "indicators_block": annotations['indicators_blocks'][i],
            # These are placeholders within the template string itself, not for formatting here.
            "reasoning": "{reasoning}",
            "p1": "{p1}", "p2": "{p2}", "pN": "{pN}"
        }

        # Populate the template with the data for the current window.
        # The `.format()` method will ignore the extra keys not present in the template.
        prompts.append(prompt_template.format(**prompt_data))

    return prompts

# ------------------------------------------------------------------------------
# Task 5, Step 2: Define strict output format and parsing rules
# ------------------------------------------------------------------------------

class ParsedLLMOutput(NamedTuple):
    """
    A structured data container for the results of parsing raw LLM output.

    This NamedTuple provides a standardized, immutable object for returning
    the outcome of the parsing process. It clearly separates the validity status,
    the extracted components, and any error information, facilitating clean
    downstream logic in reward functions and evaluation scripts.

    Attributes:
        is_valid: A boolean flag indicating if the raw output string adhered to
                  all formatting rules and was successfully parsed.
        reasoning: The extracted textual reasoning trace from between the
                   `<think>...</think>` tags. `None` if parsing failed at or
                   before this step.
        prediction: A 1D numpy array of shape (T',) containing the parsed
                    numerical forecast. `None` if parsing failed.
        error_message: A string describing the specific reason for a parsing
                       failure. `None` if `is_valid` is `True`.
    """
    is_valid: bool
    reasoning: Optional[str]
    prediction: Optional[np.ndarray]
    error_message: Optional[str]


class LLMOutputParser:
    """
    A robust parser for the structured output of the reasoning LLM.

    This class encapsulates the strict formatting and parsing rules required
    to reliably extract the reasoning trace and numerical prediction from the
    LLM's raw text output. It is a critical utility designed for repeated use
    within the reward function of the Reinforcement Learning training loop,
    where performance and robustness to malformed inputs are paramount.

    The parser is configured once upon initialization and provides a single
    `parse` method that implements a multi-stage validation and extraction
    process.
    """
    def __init__(self, config: Dict[str, Any]):
        """
        Initializes the parser with rules from the master configuration.

        This constructor sets up the parser by extracting necessary parameters
        and pre-compiling the regular expression for prediction line matching,
        which is a key performance optimization for repeated use.

        Args:
            config: The master configuration dictionary for the VTA framework.
                    It must contain the necessary keys under
                    `reasoning_model_config` and `data_and_task_spec`.

        Raises:
            KeyError: If a required configuration key is missing.
            ValueError: If the regex pattern in the configuration is invalid.
        """
        try:
            # --- Input Validation and Configuration Extraction ---
            # Extract the specific sub-dictionary for prompt template rules.
            prompt_config = config['reasoning_model_config']['prompt_template']

            # Extract the enforcement rules dictionary.
            rules = prompt_config['format_enforcement']

            # Store the prediction horizon (T') for validating the output length.
            self.prediction_horizon: int = config['data_and_task_spec']['prediction_horizon']

            # Extract the regex pattern for finding the prediction line.
            pattern_str = rules['prediction_pattern']

        except KeyError as e:
            # Raise a specific error if the configuration is incomplete.
            raise KeyError(f"LLMOutputParser initialization failed. Missing required key in config: {e}") from e

        # --- Pre-compile Regex for Performance ---
        try:
            # Compiling the regex pattern once during initialization is significantly
            # more efficient than compiling it on every call to `parse`.
            # The MULTILINE flag allows `^` to match the start of each line.
            self.prediction_pattern: re.Pattern = re.compile(pattern_str, re.MULTILINE)

        except re.error as e:
            # Raise an error if the provided regex is syntactically invalid.
            raise ValueError(f"Invalid regex in config 'prediction_pattern': {e}") from e

    def parse(self, raw_output: str) -> ParsedLLMOutput:
        """
        Parses a raw text string from the LLM according to strict format rules.

        This method executes a sequence of validation and extraction steps. It is
        designed to fail gracefully, returning a structured result with a clear
        error message upon the first violation encountered.

        Args:
            raw_output: The raw string generated by the LLM.

        Returns:
            A ParsedLLMOutput named tuple containing the structured parsing result.
        """
        # --- Validation Step 1: Extract reasoning from <think>...</think> tags ---
        # Use a non-greedy search (.*?) with the DOTALL flag to match across newlines.
        think_match = re.search(r'<think>(.*?)</think>', raw_output, re.DOTALL)

        # `re.search` returns None when no match is found.
        if think_match is None:
            return ParsedLLMOutput(
                is_valid=False,
                reasoning=None,
                prediction=None,
                error_message="Format Error: Could not find valid <think>...</think> tags."
            )

        # .group(1) extracts the content *inside* the tags.
        reasoning = think_match.group(1).strip()

        # --- Validation Step 2: Find the prediction line using the pre-compiled regex ---
        # Search the entire string for a line matching the required prediction format.
        match = self.prediction_pattern.search(raw_output)

        # If no match is found, the format is invalid.
        if not match:
            return ParsedLLMOutput(
                is_valid=False,
                reasoning=reasoning,
                prediction=None,
                error_message="Format Error: Prediction line not found or malformed."
            )

        # Extract the full matched prediction line (e.g., "PREDICTION = [1.0, 2.0]").
        prediction_line = match.group(0)

        # --- Validation Step 3: Check for extraneous text after the prediction line ---
        # Extract any text that appears after the end of the matched prediction line.
        text_after_prediction = raw_output[match.end():].strip()

        # The format requires no additional text after the prediction.
        if text_after_prediction:
            return ParsedLLMOutput(
                is_valid=False,
                reasoning=reasoning,
                prediction=None,
                error_message="Format Error: Extraneous text found after prediction line."
            )

        # --- Validation Step 4: Parse the numerical values from the prediction line ---
        try:
            # Extract the string content between the square brackets.
            list_str = prediction_line.split('[', 1)[1].rsplit(']', 1)[0]

            # Use a list comprehension to convert each comma-separated value to a float.
            # This will raise a ValueError if any element is not a valid number.
            prediction_list = [float(x) for x in list_str.split(',')]

            # Convert the Python list to a numpy array with a specific dtype for consistency.
            prediction = np.array(prediction_list, dtype=np.float64)

        except (ValueError, IndexError) as e:
            # Catch errors from splitting (IndexError) or float conversion (ValueError).
            return ParsedLLMOutput(
                is_valid=False,
                reasoning=reasoning,
                prediction=None,
                error_message=f"Parsing Error: Failed to parse numbers from prediction list. Details: {e}"
            )

        # --- Validation Step 5: Validate the number of predictions ---
        # The length of the parsed array must match the required prediction horizon (T').
        if len(prediction) != self.prediction_horizon:
            return ParsedLLMOutput(
                is_valid=False,
                reasoning=reasoning,
                prediction=prediction, # Return the malformed prediction for debugging.
                error_message=f"Length Error: Expected {self.prediction_horizon} predictions, but found {len(prediction)}."
            )

        # --- Success ---
        # If all checks pass, the output is considered valid.
        return ParsedLLMOutput(
            is_valid=True,
            reasoning=reasoning,
            prediction=prediction,
            error_message=None
        )

# ------------------------------------------------------------------------------
# Task 5, Step 3: Validate prompt length and tokenization
# ------------------------------------------------------------------------------

def _validate_prompt_tokenization(
    prompts: List[str],
    config: Dict[str, Any]
) -> None:
    """
    Validates that all generated prompts are within the LLM's context limit.

    Args:
        prompts: A list of fully-formed prompt strings.
        config: The master configuration dictionary.

    Raises:
        IOError: If the tokenizer for the configured model cannot be loaded.
        ValueError: If any prompt exceeds the specified context length.
    """
    # Extract model and context length information from the config.
    model_identifier = config['reasoning_model_config']['llm_settings']['base_model_identifier']
    context_limit = config['reasoning_model_config']['llm_settings']['context_length_tokens']

    # Load the specified tokenizer from Hugging Face.
    try:
        tokenizer: PreTrainedTokenizer = AutoTokenizer.from_pretrained(model_identifier)
    except Exception as e:
        raise IOError(f"Failed to load tokenizer for '{model_identifier}'. Error: {e}") from e

    # Use the tokenizer's batch processing for efficiency.
    tokenized_outputs = tokenizer(prompts, add_special_tokens=True)

    # Check the length of each tokenized prompt.
    oversized_prompts = []
    for i, token_ids in enumerate(tokenized_outputs['input_ids']):
        token_count = len(token_ids)
        if token_count > context_limit:
            oversized_prompts.append((i, token_count))

    # If any prompts were oversized, raise a single, detailed error.
    if oversized_prompts:
        error_message = (
            f"{len(oversized_prompts)} prompts exceed the context limit of {context_limit} tokens.\n"
            "Details (prompt_index, token_count):\n"
        )
        for i, count in oversized_prompts[:5]: # Show first 5 examples
            error_message += f"- Prompt {i}: {count} tokens\n"
        raise ValueError(error_message)

# ------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# ------------------------------------------------------------------------------

def assemble_and_validate_prompts(
    X_windows: np.ndarray,
    metadata: pd.DataFrame,
    annotations: Dict[str, List[str]],
    config: Dict[str, Any]
) -> Tuple[List[str], LLMOutputParser]:
    """
    Orchestrates the assembly and validation of prompts for the reasoning LLM.

    This function executes the full workflow for Task 5:
    1. Populates the prompt template with window-specific data.
    2. Validates that all generated prompts are within the LLM's token limit.
    3. Instantiates and returns a robust parser for handling LLM outputs.

    Args:
        X_windows: The 3D numpy array of input windows.
        metadata: The comprehensive DataFrame with metadata for each window,
                  including start/end dates and the date series.
        annotations: The dictionary of formatted annotation strings from Task 4.
        config: The master configuration dictionary.

    Returns:
        A tuple containing:
        - A list of validated, ready-to-use prompt strings.
        - An instance of the LLMOutputParser for use in the training loop.
    """
    # --- Step 1: Populate the prompt template ---
    # `_populate_prompts` needs only the windows, metadata, annotations, and config.
    print("Step 1: Assembling prompts from templates and data...")
    prompts = _populate_prompts(X_windows, metadata, annotations, config)
    print(f"Assembled {len(prompts)} prompts.")

    # Handle the edge case of no prompts being generated.
    if not prompts:
        print("Warning: No prompts were generated.")
        # Still return a valid parser instance for downstream consistency.
        return [], LLMOutputParser(config)

    # --- Step 2: Validate prompt tokenization ---
    # This step is critical to prevent runtime errors during training.
    print("Step 2: Validating prompt lengths against context limit...")
    _validate_prompt_tokenization(prompts, config)
    print("All prompts are within the context length limit.")

    # --- Step 3: Instantiate the output parser ---
    # The parser is an essential utility for the subsequent RL training task.
    print("Step 3: Instantiating LLM output parser...")
    parser = LLMOutputParser(config)

    print("\nTask 5: Prompt assembly and validation complete.")
    return prompts, parser
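
The output format enforced by `LLMOutputParser.parse` can be sketched in a few lines. The example below is a simplified stand-in, not the class itself: the function name `check_output` and the regex `PREDICTION_PATTERN` are assumptions made for illustration. The real pattern is read from the `prediction_pattern` key in the configuration and is not shown in this section. The sketch follows the same order of checks: `<think>` tags, prediction line, no trailing text, numeric values, and expected length.

```python
import re

import numpy as np

# Assumed pattern for illustration only; the real one comes from the config
# key `prediction_pattern` under `format_enforcement`.
PREDICTION_PATTERN = re.compile(r"^PREDICTION = \[[^\]]*\]$", re.MULTILINE)


def check_output(raw_output, horizon):
    """Return (is_valid, message) using the same check order as the parser."""
    # Check 1: the reasoning must sit inside <think>...</think> tags.
    if re.search(r"<think>(.*?)</think>", raw_output, re.DOTALL) is None:
        return False, "missing <think> tags"
    # Check 2: a line matching the prediction pattern must exist.
    match = PREDICTION_PATTERN.search(raw_output)
    if match is None:
        return False, "prediction line not found"
    # Check 3: nothing but whitespace may follow the prediction line.
    if raw_output[match.end():].strip():
        return False, "extraneous text after prediction"
    # Check 4: every comma-separated item must parse as a float.
    inner = match.group(0).split("[", 1)[1].rsplit("]", 1)[0]
    try:
        values = np.array([float(x) for x in inner.split(",")], dtype=np.float64)
    except ValueError:
        return False, "non-numeric prediction"
    # Check 5: the forecast length must equal the prediction horizon.
    if len(values) != horizon:
        return False, "wrong number of predictions"
    return True, "ok"


good = "<think>Momentum is fading.</think>\nPREDICTION = [101.5, 102.0, 101.8]"
bad_trailing = good + "\nHope this helps!"
bad_length = "<think>Flat.</think>\nPREDICTION = [101.5]"

result_good = check_output(good, horizon=3)
result_trailing = check_output(bad_trailing, horizon=3)
result_length = check_output(bad_length, horizon=3)
print(result_good)      # (True, 'ok')
print(result_trailing)  # (False, 'extraneous text after prediction')
print(result_length)    # (False, 'wrong number of predictions')
```

Failing on the first violated rule, as the parser does, gives the reward function a single unambiguous reason for each rejected completion.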


In [None]:
# Task 6: Train Reasoning LLM πθ – Stage 1 (Cold-Start RL with Time-GRPO)

# ==============================================================================
# Task 6: Train Reasoning LLM πθ – Stage 1 (Cold-Start RL with Time-GRPO)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 6, Data Structures for Clarity and Type Safety
# ------------------------------------------------------------------------------

@dataclass
class ExperienceBuffer:
    """
    A structured container for experiences collected during the RL sampling phase.

    This dataclass ensures type safety and clarity when passing complex data
    between the sampling and optimization stages of the PPO-style algorithm.
    All tensors are expected to be on the same device and are detached from the
    computation graph created during sampling.

    Attributes:
        prompt_tokens: Tensor of shape (batch_size * group_size, prompt_len)
                       containing the tokenized prompts.
        prompt_attention_mask: Tensor of shape (batch_size * group_size, prompt_len)
                               for attending to the prompt tokens.
        generated_tokens: Tensor of shape (batch_size * group_size, full_seq_len)
                          containing the full generated sequences (prompt + completion).
        log_probs_old: Tensor of shape (batch_size * group_size,) containing the
                       log probability of each generated sequence under the policy
                       that generated it (π_θ_old).
        rewards: Tensor of shape (batch_size * group_size,) containing the final
                 scalar reward for each generated sequence.
        advantages: Tensor of shape (batch_size * group_size,) containing the
                    calculated advantage for each sequence, normalized within its group.
        values_old: Tensor of shape (batch_size * group_size,) containing the
                    value function's estimate of the expected reward for each prompt state.
    """
    prompt_tokens: torch.Tensor
    prompt_attention_mask: torch.Tensor
    generated_tokens: torch.Tensor
    log_probs_old: torch.Tensor
    rewards: torch.Tensor
    advantages: torch.Tensor
    values_old: torch.Tensor

# ------------------------------------------------------------------------------
# Task 6, Step 1: Initialize the base model and LoRA adapters
# ------------------------------------------------------------------------------

class ValueHead(torch.nn.Module):
    """
    A standalone value head module for PPO.

    This module is placed on top of the LLM's transformer backbone to predict
    the value function (the expected future reward) from a given state, which
    is represented by the LLM's hidden states. Its parameters are trained
    concurrently with the policy's LoRA adapters.
    """
    def __init__(self, hidden_size: int, dropout_prob: float = 0.1):
        """
        Initializes the value head.

        Args:
            hidden_size: The dimensionality of the LLM's hidden states, which
                         serves as the input dimension for this head.
            dropout_prob: The dropout probability for regularization before the
                          final linear layer.
        """
        # Initialize the parent torch.nn.Module.
        super().__init__()

        # A dropout layer for regularization to prevent overfitting.
        self.dropout = torch.nn.Dropout(dropout_prob)

        # A single linear layer that maps the hidden state to a scalar value.
        self.linear = torch.nn.Linear(hidden_size, 1)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """
        Performs a forward pass to predict the state value.

        Args:
            hidden_states: A tensor of shape (batch_size, seq_len, hidden_size)
                           from the LLM's backbone.

        Returns:
            A tensor of shape (batch_size,) containing the predicted scalar value
            for each state in the batch.
        """
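        # Use the hidden state at the last position as the representation of the
        # sequence state. This assumes the last position holds a real token, which
        # is the case for left-padded prompts. The cast matches the head's
        # parameter dtype, since the backbone may run in bfloat16.
        last_hidden_state = hidden_states[:, -1, :].to(self.linear.weight.dtype)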

        # Apply dropout for regularization.
        last_hidden_state = self.dropout(last_hidden_state)

        # Pass through the linear layer to get the scalar value prediction.
        # .squeeze(-1) removes the trailing dimension of size 1.
        return self.linear(last_hidden_state).squeeze(-1)


def _initialize_rl_models(
    config: Dict[str, Any],
    device: torch.device
) -> Dict[str, Any]:
    """
    Initializes and configures all models required for RL training.

    This function performs the critical setup for the RL agent:
    1. Loads the base LLM and its tokenizer from Hugging Face.
    2. Applies LoRA adapters to the base model to create the trainable `policy_model`.
    3. Creates a frozen, deep copy of the base model as the `reference_model`.
    4. Instantiates a trainable `value_model` head.

    Args:
        config: The master configuration dictionary.
        device: The torch device (e.g., 'cuda' or 'cpu') for model placement.

    Returns:
        A dictionary containing all initialized components: 'policy_model',
        'reference_model', 'value_model', and 'tokenizer'.

    Raises:
        ValueError: If a specified LoRA target module does not exist in the model.
        IOError: If the model or tokenizer cannot be loaded from Hugging Face.
    """
    # Extract the model identifier before the try block so that it is always
    # defined when the error message below is built.
    model_id = config['reasoning_model_config']['llm_settings']['base_model_identifier']

    try:
        # --- Load Base Model and Tokenizer ---
        print(f"Loading base model '{model_id}' onto device '{device}'...")

        # Load the base causal language model using bfloat16 for performance on compatible GPUs.
        base_model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True
        ).to(device)

        # Load the tokenizer associated with the base model.
        tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

    except Exception as e:
        # Handle potential errors during model loading (e.g., network issues, invalid ID).
        # Chain the original exception so the full traceback is preserved.
        raise IOError(f"Failed to load base model or tokenizer for '{model_id}'. Error: {e}") from e

    # Standard practice for causal LMs: set pad token to EOS token if not defined.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Set padding side to 'left' for correct batch generation with autoregressive models.
    tokenizer.padding_side = 'left'

    # --- Create Policy Model (πθ) with LoRA ---
    # Extract the LoRA configuration dictionary.
    lora_config_dict = config['reasoning_model_config']['lora_config']

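    # Input validation: ensure each LoRA target matches at least one module.
    # PEFT matches a target against the full dotted module name or its suffix
    # (e.g., 'q_proj' matches 'model.layers.0.self_attn.q_proj'), so an exact
    # membership test against the full names would reject valid configurations.
    model_modules = [name for name, _ in base_model.named_modules()]
    for module in lora_config_dict['target_modules']:
        if not any(name == module or name.endswith('.' + module) for name in model_modules):
            # Raise a descriptive error to help the user correct the configuration.
            raise ValueError(
                f"LoRA target module '{module}' not found in model. "
                f"Available modules include (first 10): {model_modules[:10]}..."
            )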

    # Define the PEFT configuration for LoRA from the master config.
    peft_config = LoraConfig(
        r=lora_config_dict['rank_r'],
        lora_alpha=lora_config_dict['alpha'],
        lora_dropout=lora_config_dict['dropout'],
        target_modules=lora_config_dict['target_modules'],
        bias="none",
        task_type="CAUSAL_LM",
    )

    # --- Create Reference Model (πref) ---
    # The reference model is a frozen copy used for the KL divergence penalty.
    # It is copied before LoRA is applied, because get_peft_model injects the
    # adapter layers into base_model in place.
    reference_model = copy.deepcopy(base_model)
    # Set the model to evaluation mode to disable dropout, etc.
    reference_model.eval()
    # Explicitly disable gradient calculations for all parameters.
    for param in reference_model.parameters():
        param.requires_grad = False
    print("\n--- Reference Model (πref) created with all parameters frozen. ---")

    # Apply the LoRA config to the base model to create the policy model.
    # This freezes all base model weights and inserts trainable LoRA adapters.
    policy_model = get_peft_model(base_model, peft_config)
    print("\n--- Policy Model (πθ) Trainable Parameters ---")
    # Print a summary of trainable vs. total parameters for verification.
    policy_model.print_trainable_parameters()

    # --- Create Value Model ---
    # The value model is a separate head to predict the expected reward.
    value_model = ValueHead(base_model.config.hidden_size).to(device)
    # Set the value model to training mode.
    value_model.train()
    print(f"\n--- Value Head created with trainable parameters. ---")

    # Return all components in a structured dictionary.
    return {
        'policy_model': policy_model,
        'reference_model': reference_model,
        'value_model': value_model,
        'tokenizer': tokenizer
    }

# ------------------------------------------------------------------------------
# Task 6, Step 2: Sampling protocol and reward computation
# ------------------------------------------------------------------------------

def _get_sequence_log_probs(
    model: PreTrainedModel,
    full_tokens: torch.Tensor,
    attention_mask: torch.Tensor,
    prompt_len: int
) -> torch.Tensor:
    """
    Calculates the log probability of the generated portion of a sequence.

    This function is a critical component of PPO, used to calculate the log
    probabilities of actions (tokens) under a given policy (model). It carefully
    handles padding and separates the prompt from the generation.

    Args:
        model: The model (policy or reference) to use for the calculation.
        full_tokens: A tensor of shape (batch_size, seq_len) containing the
                     full tokenized sequences (prompt + generation).
        attention_mask: The attention mask for `full_tokens`.
        prompt_len: The length of the prompt portion of the sequences.

    Returns:
        A 1D tensor of shape (batch_size,) with the summed log probability
        for the generated part of each sequence.
    """
    # Perform a forward pass to get the logits for the entire sequence.
    # `torch.no_grad()` is used for efficiency as we don't need gradients here.
    with torch.no_grad():
        outputs = model(full_tokens, attention_mask=attention_mask)
    logits = outputs.logits

    # Convert the raw logits to log probabilities using log_softmax.
    log_probs_all = torch.log_softmax(logits, dim=-1)

    # Isolate the tokens that were generated (i.e., the "actions").
    gen_tokens = full_tokens[:, prompt_len:]

    # Gather the log probabilities of the specific tokens that were generated.
    # The logits at time step `t-1` predict the token at time step `t`.
    # So, we align `logits[:, prompt_len-1:-1]` with `gen_tokens`.
    gathered_log_probs = torch.gather(
        log_probs_all[:, prompt_len - 1:-1, :],
        dim=2,
        index=gen_tokens.unsqueeze(-1)
    ).squeeze(-1)

    # Build the mask for the generated portion from the supplied attention mask.
    # This avoids relying on `model.config.pad_token_id`, which may be unset
    # when the pad token was only assigned on the tokenizer.
    gen_attention_mask = attention_mask[:, prompt_len:].to(gathered_log_probs.dtype)

    # Element-wise multiply by the mask to zero out log probs of padding tokens,
    # then sum along the sequence dimension to get the total log prob for each sequence.
    sequence_log_probs = (gathered_log_probs * gen_attention_mask).sum(dim=1)

    return sequence_log_probs
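

# Illustrative sketch (not part of the pipeline): the shift-by-one alignment used
# in the function above, written with plain Python lists so it can be checked by
# hand. The helper name and its inputs are hypothetical. `step_log_probs[t]`
# stands in for the log-softmax output at position t, which scores the token at
# position t + 1.
def _sketch_sum_generated_log_probs(step_log_probs, tokens, prompt_len, pad_id):
    """Sums the log-probs of the non-pad generated tokens of one sequence."""
    total = 0.0
    for t in range(prompt_len, len(tokens)):
        if tokens[t] == pad_id:
            # Padding contributes nothing to the sequence log-prob.
            continue
        # The distribution at position t - 1 predicts the token at position t.
        total += step_log_probs[t - 1][tokens[t]]
    return total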


def _compute_rewards(
    parsed_outputs: List[ParsedLLMOutput],
    ground_truth_targets: torch.Tensor,
    config: Dict[str, Any],
    device: torch.device
) -> torch.Tensor:
    """
    Computes the combined reward for a batch of generated and parsed outputs.

    The reward function combines a binary format reward with a continuous,
    performance-based reward derived from the inverse Mean Squared Error.

    Args:
        parsed_outputs: A list of ParsedLLMOutput objects from the parser.
        ground_truth_targets: A tensor of shape (batch_size, T') with the true
                              future prices, aligned with `parsed_outputs`.
        config: The master configuration dictionary.
        device: The torch device where tensors should reside.

    Returns:
        A 1D tensor of shape (batch_size,) containing the final reward for each output.
    """
    # Extract relevant hyperparameters from the configuration.
    rl_config = config['reasoning_model_config']['rl_training_params']
    w_format = rl_config['format_reward_weight']
    lambda_mse = rl_config['reward_scale_lambda']

    # Pre-allocate a tensor to store the rewards for the batch.
    rewards = torch.zeros(len(parsed_outputs), device=device, dtype=torch.float32)

    # Iterate through each parsed output to calculate its reward.
    for i, parsed in enumerate(parsed_outputs):
        # If the output format is invalid, the reward is zero. The format reward
        # component is implicitly zero, and the MSE component is not calculated.
        if not parsed.is_valid:
            rewards[i] = 0.0
            continue

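        # Convert the parsed numpy prediction to a torch tensor with the same
        # device and dtype as the targets (numpy arrays default to float64).
        prediction_tensor = torch.from_numpy(parsed.prediction).to(
            device=device, dtype=ground_truth_targets.dtype
        )

        # Equation: r_MSE = 1 / (λ * ||ŷ - y||^2 + ε)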
        # Calculate the Mean Squared Error between the prediction and the ground truth.
        mse = torch.nn.functional.mse_loss(prediction_tensor, ground_truth_targets[i])
        # Calculate the inverse MSE reward, adding a small epsilon for numerical stability.
        inverse_mse_reward = 1.0 / (lambda_mse * mse + 1e-8)

        # Equation: r_total = w_format * r_format + r_MSE
        # Since the format is valid, r_format is 1.0.
        total_reward = w_format * 1.0 + inverse_mse_reward
        rewards[i] = total_reward

    return rewards
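

# Illustrative sketch (not part of the pipeline): the reward rule above for a
# single sample, in plain Python. The function name and the default weights are
# hypothetical; the real values come from `format_reward_weight` and
# `reward_scale_lambda` in the configuration.
def _sketch_total_reward(prediction, target, is_valid,
                         w_format=1.0, lambda_mse=1.0, eps=1e-8):
    """Returns w_format + 1 / (lambda_mse * MSE + eps), or 0.0 if invalid."""
    if not is_valid:
        # An invalid format earns neither the format nor the MSE component.
        return 0.0
    mse = sum((p - y) ** 2 for p, y in zip(prediction, target)) / len(target)
    return w_format + 1.0 / (lambda_mse * mse + eps)

# Note on scale: with eps = 1e-8, an exact match yields an MSE term of about
# 1e8, so near-perfect predictions can dominate the format component.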


def _sample_and_collect_experiences(
    prompts: List[str],
    targets: torch.Tensor,
    models: Dict[str, Any],
    parser: LLMOutputParser,
    config: Dict[str, Any],
    device: torch.device
) -> ExperienceBuffer:
    """
    Performs the sampling phase of the RL loop to generate experiences for a batch.

    This function orchestrates the interaction between the policy model and the
    environment (i.e., the reward function) to collect a buffer of experiences
    that will be used for the optimization step.

    Args:
        prompts: A batch of prompt strings.
        targets: The ground truth target tensors for the batch.
        models: The dictionary of initialized models.
        parser: The LLM output parser.
        config: The master configuration dictionary.
        device: The torch device.

    Returns:
        An `ExperienceBuffer` dataclass instance containing all collected data.
    """
    # Unpack the required models and configuration settings.
    policy_model = models['policy_model']
    value_model = models['value_model']
    tokenizer = models['tokenizer']

    rl_config = config['reasoning_model_config']['rl_training_params']
    group_size = rl_config['group_size_G']

    # --- Step 1: Generate G responses for each prompt in the batch ---
    # Repeat each prompt G times to facilitate group-based advantage normalization.
    batch_prompts_repeated = [p for p in prompts for _ in range(group_size)]

    # Tokenize the repeated prompts, padding and truncating as necessary.
    inputs = tokenizer(
        batch_prompts_repeated,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=tokenizer.model_max_length
    ).to(device)

    # Create a GenerationConfig object from the decoding settings in the master config.
    gen_config = GenerationConfig(
        **rl_config['decoding_settings_for_sampling'],
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id
    )

    # Generate sequences from the policy model without tracking gradients.
    with torch.no_grad():
        generated_outputs = policy_model.generate(**inputs, generation_config=gen_config)

    # Decode the generated token sequences back into human-readable text.
    decoded_texts = tokenizer.batch_decode(generated_outputs, skip_special_tokens=True)

    # Parse each generated text to validate its format and extract its contents.
    parsed_outputs = [parser.parse(text) for text in decoded_texts]

    # --- Step 2: Compute Rewards ---
    # Repeat the ground truth targets to align with the G generated responses per prompt.
    repeated_targets = targets.repeat_interleave(group_size, dim=0)
    rewards = _compute_rewards(parsed_outputs, repeated_targets, config, device)

    # --- Step 3: Compute Old Log Probs and Values ---
    # The length of the prompt portion of the tokenized sequences.
    prompt_len = inputs['input_ids'].shape[1]

    # Get the log probabilities of the generated sequences under the policy that generated them (π_θ_old).
    log_probs_old = _get_sequence_log_probs(
        policy_model,
        generated_outputs,
        generated_outputs != tokenizer.pad_token_id,
        prompt_len
    )

    # Get the value estimates for the initial state (the prompt).
    with torch.no_grad():
        # Request the final-layer hidden states through the model's own forward
        # pass, which avoids architecture-specific attributes such as `.transformer`.
        prompt_outputs = policy_model(**inputs, output_hidden_states=True)
        prompt_hidden_states = prompt_outputs.hidden_states[-1]
        # Predict the value from these hidden states.
        values_old = value_model(prompt_hidden_states)

    # --- Step 4: Compute Group-Relative Advantages ---
    # Equation: A_i = (r_i - mean(r_group)) / (std(r_group) + ε)
    # Reshape rewards to (num_prompts, group_size) to operate on groups.
    rewards_grouped = rewards.view(-1, group_size)
    # Calculate mean and standard deviation for each group.
    mean_rewards = rewards_grouped.mean(dim=1, keepdim=True)
    std_rewards = rewards_grouped.std(dim=1, keepdim=True)
    # Normalize rewards to get advantages, adding epsilon for stability.
    advantages = (rewards_grouped - mean_rewards) / (std_rewards + 1e-8)

    # Create and return the experience buffer, detaching all tensors from the computation graph.
    return ExperienceBuffer(
        prompt_tokens=inputs['input_ids'],
        prompt_attention_mask=inputs['attention_mask'],
        generated_tokens=generated_outputs,
        log_probs_old=log_probs_old.detach(),
        rewards=rewards.detach(),
        advantages=advantages.view(-1).detach(), # Flatten advantages back to a 1D tensor.
        values_old=values_old.detach()
    )
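

# Illustrative sketch (not part of the pipeline): group-relative advantage
# normalization on a flat list of rewards, in plain Python. The helper name is
# hypothetical. `statistics.stdev` is the sample standard deviation (n - 1 in the
# denominator), which matches the default of `torch.Tensor.std`. It requires at
# least two rewards per group, so a group size of 1 is not supported here.
import statistics


def _sketch_group_advantages(rewards, group_size, eps=1e-8):
    """Normalizes each reward against the mean and std of its own group."""
    advantages = []
    for start in range(0, len(rewards), group_size):
        group = rewards[start:start + group_size]
        mean = statistics.fmean(group)
        std = statistics.stdev(group)
        advantages.extend((r - mean) / (std + eps) for r in group)
    return advantages

# When every reward in a group is identical, the numerator is zero for each
# sample, so the whole group receives zero advantage and no policy gradient.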

# ------------------------------------------------------------------------------
# Task 6, Step 3: Optimize with the Time-GRPO objective and KL penalty
# ------------------------------------------------------------------------------

def _compute_and_optimize_loss(
    experience: ExperienceBuffer,
    models: Dict[str, Any],
    optimizer: torch.optim.Optimizer,
    config: Dict[str, Any]
) -> Dict[str, float]:
    """
    Computes the full Time-GRPO loss with a KL penalty and performs one optimization step.

    This function is the core of the learning process in the RL loop.

    Process:
    1.  Re-computes log probabilities and value estimates under the current policy (π_θ).
    2.  Calculates the PPO Clipped Surrogate Objective for the policy loss.
    3.  Calculates the Mean Squared Error loss for the value function.
    4.  Calculates the forward KL divergence penalty, D_KL(π_θ || π_ref), as specified.
    5.  Combines these components into a total loss and performs a single
        gradient-based optimization step.

    Args:
        experience: The `ExperienceBuffer` dataclass instance collected from the
                    sampling phase.
        models: The dictionary of initialized models ('policy_model', 'value_model',
                'reference_model').
        optimizer: The PyTorch optimizer for the trainable parameters.
        config: The master configuration dictionary.

    Returns:
        A dictionary of metrics for monitoring training progress, including all
        individual loss components and other diagnostics.
    """
    # --- Input Validation ---
    # Ensure all required models are present.
    required_models = {'policy_model', 'value_model', 'reference_model', 'tokenizer'}
    if not required_models.issubset(models.keys()):
        raise ValueError(f"Models dictionary is missing one of the required keys: {required_models}")

    # Unpack models and configuration for clarity.
    policy_model = models['policy_model']
    value_model = models['value_model']
    reference_model = models['reference_model']
    tokenizer = models['tokenizer']

    rl_config = config['reasoning_model_config']['rl_training_params']
    ppo_clip_eps = rl_config['ppo_clip_epsilon']
    kl_beta = rl_config['kl_weight_beta']

    # --- Re-computation of Log Probs and Values under Current Policy (π_θ) ---
    # This step is necessary to get gradients for the current policy parameters.
    full_sequences = experience.generated_tokens
    attention_mask = (full_sequences != tokenizer.pad_token_id)
    prompt_len = experience.prompt_tokens.shape[1]

    # Perform a single forward pass through the policy model, requesting the
    # hidden states so that the policy and value heads share one pass. Going
    # through the model's own forward avoids architecture-specific attribute
    # names such as `.transformer` or `.lm_head`.
    policy_outputs = policy_model(
        input_ids=full_sequences,
        attention_mask=attention_mask,
        output_hidden_states=True
    )
    hidden_states = policy_outputs.hidden_states[-1]

    # Get new logits from the policy model's language modeling head.
    logits_new = policy_outputs.logits

    # Get new value predictions from the value head.
    values_new = value_model(hidden_states)

    # Calculate log probabilities of the generated tokens under the new policy.
    log_probs_new_all = torch.log_softmax(logits_new, dim=-1)
    gen_tokens = full_sequences[:, prompt_len:]
    gen_log_probs_new = torch.gather(log_probs_new_all[:, prompt_len - 1:-1, :], 2, gen_tokens.unsqueeze(-1)).squeeze(-1)
    # Mask out padding tokens before summing.
    sum_log_probs_new = (gen_log_probs_new * (gen_tokens != tokenizer.pad_token_id)).sum(dim=1)

    # --- 1. Clipped Surrogate Objective (Policy Loss) ---
    # Equation: r_t(θ) = π_θ(a|s) / π_θ_old(a|s) = exp(log_prob_new - log_prob_old)
    # Calculate the probability ratio between the new and old policies in log space for stability.
    ratio = torch.exp(sum_log_probs_new - experience.log_probs_old)

    # Retrieve the pre-calculated advantages from the experience buffer.
    advantages = experience.advantages

    # Calculate the unclipped policy objective.
    loss_clip1 = ratio * advantages

    # Calculate the clipped policy objective.
    loss_clip2 = torch.clamp(ratio, 1 - ppo_clip_eps, 1 + ppo_clip_eps) * advantages

    # The policy loss is the negative of the minimum of the clipped and unclipped objectives.
    # We take the negative because optimizers perform minimization, but PPO maximizes the objective.
    policy_loss = -torch.min(loss_clip1, loss_clip2).mean()

    # --- 2. Value Function Loss ---
    # Standard Mean Squared Error loss between the predicted values and the actual rewards.
    # The 0.5 scaling factor is a conventional choice in PPO implementations.
    value_loss = 0.5 * torch.nn.functional.mse_loss(values_new, experience.rewards)

    # --- 3. KL Divergence Penalty ---
    # Get log probabilities of the generated sequences from the frozen reference model (π_ref).
    # This is done within a no_grad context to ensure no gradients are computed for the reference model.
    with torch.no_grad():
        log_probs_ref = _get_sequence_log_probs(
            reference_model,
            full_sequences,
            attention_mask,
            prompt_len
        )

    # Equation: D_KL(π_θ || π_ref) ≈ E_samples[log(π_θ) - log(π_ref)]
    # This is the forward KL divergence, as specified in the paper's loss function.
    # It is estimated from the sampled sequences in the batch, so a single-batch
    # estimate can be negative even though the true divergence is not.
    kl_div = (sum_log_probs_new - log_probs_ref).mean()

    # --- 4. Total Loss ---
    # Combine the three loss components into a single scalar loss.
    # Equation: L_total = L_policy + L_value + β * D_KL(π_θ || π_ref)
    total_loss = policy_loss + value_loss + kl_beta * kl_div

    # --- Optimization Step ---
    # Zero out gradients from the previous optimization step.
    optimizer.zero_grad()

    # Perform backpropagation to compute gradients of the total loss
    # with respect to the trainable parameters (LoRA adapters and value head).
    total_loss.backward()

    # Apply gradient clipping to prevent exploding gradients, a crucial stability measure in RL.
    if rl_config.get('grad_clip_norm'):
        # Get all parameters that require gradients.
        trainable_params = [p for p in policy_model.parameters() if p.requires_grad] + \
                           list(value_model.parameters())
        # Clip the norm of the gradients.
        torch.nn.utils.clip_grad_norm_(trainable_params, rl_config['grad_clip_norm'])

    # Update the model weights using the computed gradients.
    optimizer.step()

    # --- Logging ---
    # Return a dictionary of metrics for monitoring training progress.
    # All metrics are detached and moved to CPU for logging to avoid holding onto the graph.
    with torch.no_grad():
        metrics = {
            'loss/policy': policy_loss.item(),
            'loss/value': value_loss.item(),
            'loss/kl_forward': kl_div.item(), # Log the KL penalty term
            'loss/total': total_loss.item(),
            'reward/mean': experience.rewards.mean().item(),
            'policy/ratio_mean': ratio.mean().item(),
            'policy/adv_mean': advantages.mean().item(),
            'log_probs/new_mean': sum_log_probs_new.mean().item(),
            'log_probs/ref_mean': log_probs_ref.mean().item(),
        }

    return metrics
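

# Illustrative sketch (not part of the pipeline): the clipped surrogate policy
# loss from the function above, computed on plain Python floats so the effect of
# clipping can be checked by hand. The helper name and the default `clip_eps`
# are hypothetical; the pipeline reads `ppo_clip_epsilon` from the configuration.
import math


def _sketch_clipped_policy_loss(log_probs_new, log_probs_old, advantages, clip_eps=0.2):
    """Returns -mean(min(ratio * A, clip(ratio, 1 - eps, 1 + eps) * A))."""
    terms = []
    for lp_new, lp_old, adv in zip(log_probs_new, log_probs_old, advantages):
        # Probability ratio computed in log space.
        ratio = math.exp(lp_new - lp_old)
        clipped = min(max(ratio, 1.0 - clip_eps), 1.0 + clip_eps)
        # The minimum keeps the more pessimistic of the two objectives.
        terms.append(min(ratio * adv, clipped * adv))
    return -sum(terms) / len(terms)

# With ratio 1.5 and clip_eps 0.2, a positive advantage is capped at 1.2 * A,
# while a negative advantage keeps the unclipped (more negative) 1.5 * A.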

# ------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# ------------------------------------------------------------------------------

def train_reasoning_llm_stage1(
    prompts: List[str],
    y_targets: np.ndarray,
    metadata: pd.DataFrame,
    parser: LLMOutputParser,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the Stage 1 "Cold-Start" RL training process.

    This function manages the entire end-to-end workflow for Stage 1:
    1. Initializes all required models (policy, reference, value) and tokenizer.
    2. Sets up the optimizer and data loader.
    3. Runs the main training loop for a specified number of steps.
    4. In each step, it samples experiences and performs multiple optimization updates.
    5. Logs detailed metrics for monitoring.
    6. Saves the final trained model adapters and collected experiences.

    Args:
        prompts: A list of all training prompts.
        y_targets: A numpy array of all ground truth targets.
        metadata: The DataFrame with metadata for each window.
        parser: An instance of the LLMOutputParser.
        config: The master configuration dictionary.

    Returns:
        A dictionary containing the path to the saved LoRA model and a DataFrame
        of all collected experiences for use in Stage 2.
    """
    # --- Setup ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    rl_config = config['reasoning_model_config']['rl_training_params']

    print("--- Stage 1: Cold-Start RL Training ---")
    print(f"Using device: {device}")

    # Step 1: Initialize models
    models = _initialize_rl_models(config, device)

    # Setup optimizer for the trainable parameters (LoRA adapters + value head).
    trainable_params = [p for p in models['policy_model'].parameters() if p.requires_grad] + \
                       list(models['value_model'].parameters())
    optimizer = torch.optim.AdamW(trainable_params, lr=rl_config['learning_rate'])

    # Setup DataLoader to handle batching of prompts and targets.
    dataset = TensorDataset(torch.arange(len(prompts)), torch.from_numpy(y_targets).float())
    dataloader = DataLoader(dataset, batch_size=rl_config['batch_size'], shuffle=True)

    # List to store all generated experiences for Stage 2.
    all_experiences_data = []

    # --- Main Training Loop ---
    print(f"Starting training for {rl_config['max_steps']} steps...")
    step = 0
    training_complete = False
    while not training_complete:
        for batch_indices, batch_targets in dataloader:
            if step >= rl_config['max_steps']:
                training_complete = True
                break

            # Prepare batch data.
            batch_prompts = [prompts[i] for i in batch_indices.tolist()]
            batch_metadata = metadata.iloc[batch_indices.tolist()]
            batch_targets = batch_targets.to(device)

            # Step 2: Sample and collect experiences for the current batch.
            experience = _sample_and_collect_experiences(
                batch_prompts, batch_targets, models, parser, config, device
            )

            # Store experiences for Stage 2.
            # This is a critical step for the next stage of the pipeline.
            # We need to decode the generated text and store it with metadata.
            decoded_texts = models['tokenizer'].batch_decode(experience.generated_tokens, skip_special_tokens=True)

            # Reshape rewards and other data to match the group structure.
            group_size = rl_config['group_size_G']
            num_prompts_in_batch = len(batch_prompts)

            for i in range(num_prompts_in_batch):
                for j in range(group_size):
                    idx = i * group_size + j
                    meta = batch_metadata.iloc[i]
                    all_experiences_data.append({
                        'prompt': batch_prompts[i],
                        'generation': decoded_texts[idx],
                        'reward': experience.rewards[idx].item(),
                        'ticker': meta['ticker'],
                        'window_end_date': meta['window_end_date'],
                        'ground_truth': y_targets[batch_indices[i]]
                    })

            # Step 3: Perform multiple optimization updates on the collected experiences.
            for _ in range(rl_config['updates_per_batch']):
                metrics = _compute_and_optimize_loss(experience, models, optimizer, config)

            # Log metrics periodically.
            if step % 10 == 0:
                log_str = f"Step {step}/{rl_config['max_steps']} | "
                log_str += " | ".join([f"{k}: {v:.4f}" for k, v in metrics.items()])
                print(log_str)

            step += 1

    print("\n--- Stage 1 Training Complete ---")

    # --- Save Artifacts ---
    # Save the trained LoRA adapters for the policy model.
    output_dir = "./models/stage1_lora"
    os.makedirs(output_dir, exist_ok=True)
    models['policy_model'].save_pretrained(output_dir)
    print(f"Trained LoRA adapters saved to {output_dir}")

    # Convert the collected experiences into a structured DataFrame.
    experiences_df = pd.DataFrame(all_experiences_data)

    return {
        "lora_model_path": output_dir,
        "collected_experiences_df": experiences_df
    }


# Task 7: Train Reasoning LLM πθ – Stage 2 (Rejection Sampling + SFT)

# ==============================================================================
# Task 7: Train Reasoning LLM πθ – Stage 2 (Rejection Sampling + SFT)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 7, Step 1: Filter high-quality samples via rejection sampling
# ------------------------------------------------------------------------------

def _filter_high_quality_samples(
    experiences_df: pd.DataFrame,
    parser: LLMOutputParser,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Filters the collected experiences to keep only the highest-performing samples.

    This function implements the rejection sampling stage. It calculates the MSE
    for each valid generation, buckets the samples by ticker and time period,
    and keeps only the top performers (bottom 10th percentile of MSE) within
    each bucket.

    Args:
        experiences_df: A DataFrame containing all experiences from Stage 1.
        parser: An instance of the LLMOutputParser to parse generations.
        config: The master configuration dictionary.

    Returns:
        A tuple containing:
        - A DataFrame of the filtered, high-quality "golden" samples.
        - A summary DataFrame reporting the filtering statistics for each bucket.
    """
    # --- Data Preparation and MSE Calculation ---
    # Create a working copy of the DataFrame.
    df = experiences_df.copy()

    # Parse all generations to extract predictions.
    parsed_results = [parser.parse(gen) for gen in df['generation']]

    # Add parsed predictions and validity flags to the DataFrame.
    df['is_valid'] = [p.is_valid for p in parsed_results]
    df['prediction'] = [p.prediction for p in parsed_results]

    # Filter out any samples that were not validly parsed.
    valid_df = df[df['is_valid']].copy()

    # Calculate the Mean Squared Error for each valid prediction.
    # Equation: MSE = ||ŷ_θ - y||^2
    mses = valid_df.apply(
        lambda row: np.mean((row['prediction'] - row['ground_truth'])**2),
        axis=1
    )
    valid_df['mse'] = mses

    # --- Bucketing and Filtering ---
    # Extract rejection sampling configuration.
    rs_config = config['reasoning_model_config']['rl_training_params']['rejection_sampling']
    percentile_to_keep = rs_config['percentile_keep'] / 100.0 # Convert to 0.10

    # Create the 'time_period' bucket key from the window end date (e.g., '2023-01').
    valid_df['time_period'] = pd.to_datetime(valid_df['window_end_date']).dt.to_period('M')

    # Define the keys for grouping samples into buckets.
    bucket_keys = rs_config['bucket_keys']

    # Calculate the MSE threshold (e.g., 10th percentile) for each bucket.
    # `transform` broadcasts the result back to the original DataFrame's shape.
    valid_df['mse_threshold'] = valid_df.groupby(bucket_keys)['mse'].transform(
        lambda x: x.quantile(percentile_to_keep)
    )

    # Keep only the samples whose MSE is at or below the threshold for their bucket.
    golden_samples_df = valid_df[valid_df['mse'] <= valid_df['mse_threshold']].copy()

    # --- Reporting ---
    # Create a summary report of the filtering process.
    original_counts = valid_df.groupby(bucket_keys).size().rename('original_count')
    filtered_counts = golden_samples_df.groupby(bucket_keys).size().rename('filtered_count')

    summary_report = pd.concat([original_counts, filtered_counts], axis=1).fillna(0).astype(int)
    summary_report['percent_kept'] = (summary_report['filtered_count'] / summary_report['original_count'] * 100).round(2)

    # Clean up temporary columns before returning.
    golden_samples_df = golden_samples_df.drop(columns=['is_valid', 'prediction', 'mse', 'mse_threshold', 'time_period'])

    return golden_samples_df, summary_report
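

# Illustrative sketch (not part of the pipeline): per-bucket percentile filtering
# on a toy DataFrame. The helper name, column names, and values are hypothetical;
# they only mirror the `groupby(...).transform(quantile)` pattern used above.
import pandas as pd


def _demo_bucket_percentile_filter(keep_fraction: float = 0.5) -> pd.DataFrame:
    """Keeps rows whose MSE is at or below their bucket's `keep_fraction` quantile."""
    toy = pd.DataFrame({
        'ticker': ['AAA', 'AAA', 'AAA', 'AAA', 'BBB', 'BBB'],
        'mse': [0.1, 0.2, 0.3, 0.4, 1.0, 3.0],
    })
    # Broadcast each bucket's threshold back onto the rows of that bucket.
    threshold = toy.groupby('ticker')['mse'].transform(lambda s: s.quantile(keep_fraction))
    # Keep the rows at or below their own bucket's threshold.
    return toy[toy['mse'] <= threshold].reset_index(drop=True)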

# ------------------------------------------------------------------------------
# Task 7, Step 2: Supervised fine-tuning (SFT) on filtered samples
# ------------------------------------------------------------------------------

class SFTDataset(Dataset):
    """
    A PyTorch Dataset for Supervised Fine-Tuning (SFT) of a causal language model.

    This class is specifically designed to format data for a standard SFT task
    where the model is trained to predict a `generation` sequence given a `prompt`.
    It handles the crucial logic of tokenizing the combined text and creating
    labels where the prompt portion is masked out, ensuring that the loss is
    calculated only on the tokens the model is supposed to generate.

    Instances can be passed to the Hugging Face `Trainer` as `train_dataset` or
    `eval_dataset`; padding is left to the data collator.
    """
    def __init__(
        self,
        prompts: List[str],
        generations: List[str],
        tokenizer: PreTrainedTokenizer
    ):
        """
        Initializes the SFTDataset.

        Args:
            prompts: A list of the input prompt strings.
            generations: A list of the target generation strings, corresponding
                         one-to-one with the prompts.
            tokenizer: The pre-trained tokenizer to be used for encoding the text.

        Raises:
            ValueError: If the number of prompts does not match the number of
                        generations.
        """
        # --- Input Validation ---
        # Ensure that the prompts and generations lists are of the same length.
        if len(prompts) != len(generations):
            raise ValueError(
                f"Mismatch in dataset size: "
                f"Got {len(prompts)} prompts and {len(generations)} generations."
            )

        # Store the dataset components.
        self.prompts = prompts
        self.generations = generations
        self.tokenizer = tokenizer

    def __len__(self) -> int:
        """
        Returns the total number of samples in the dataset.

        Returns:
            The integer count of prompt-generation pairs.
        """
        return len(self.prompts)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Retrieves and processes a single sample from the dataset.

        This method performs the following steps for a given index `idx`:
        1. Concatenates the prompt and generation into a single text sequence.
        2. Tokenizes the full sequence to get `input_ids` and `attention_mask`.
        3. Creates a `labels` tensor, which is initially a copy of `input_ids`.
        4. Masks the prompt portion of the `labels` tensor by setting the
           corresponding token IDs to -100, the standard ignore index for
           PyTorch cross-entropy loss.

        Args:
            idx: The index of the sample to retrieve.

        Returns:
            A dictionary containing the processed tensors for a single sample,
            typically including 'input_ids', 'attention_mask', and 'labels'.
        """
        # --- Input Validation ---
        # Ensure the requested index is within the valid range.
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} is out of bounds for a dataset of size {len(self)}.")

        # --- Data Processing ---
        # Retrieve the prompt and generation for the specified index.
        prompt = self.prompts[idx]
        generation = self.generations[idx]

        # Combine the prompt and generation to form the full text sequence
        # that the model will see as input.
        full_text = prompt + generation

        # Tokenize the full text. We do not pad here; padding will be handled
        # dynamically by the DataCollator at the batch level.
        tokenized_full = self.tokenizer(
            full_text,
            truncation=True,
            padding=False,
            return_tensors=None # Return lists of IDs for now
        )

        # Tokenize the prompt separately to determine its length in tokens.
        # This length is needed for masking the labels.
        tokenized_prompt = self.tokenizer(
            prompt,
            truncation=True,
            padding=False,
            return_tensors=None
        )
        prompt_len = len(tokenized_prompt['input_ids'])

        # Create the labels for the language modeling task.
        # The labels are a copy of the input_ids, as the model is trained
        # to predict the next token in the sequence.
        labels = list(tokenized_full['input_ids'])

        # **Crucial Step**: Mask out the prompt portion in the labels.
        # By setting the label tokens corresponding to the prompt to -100, we instruct
        # the PyTorch loss function (CrossEntropyLoss) to ignore these tokens
        # when calculating the loss. This ensures that the model is only trained
        # to predict the `generation` part of the sequence.
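        # Clamp the prompt length so a truncated sequence cannot raise an IndexError.
        prompt_len = min(prompt_len, len(labels))
        labels[:prompt_len] = [-100] * prompt_len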

        # Add the processed labels to the dictionary of tokenized outputs.
        tokenized_full['labels'] = labels

        # Convert all lists of token IDs into PyTorch tensors before returning.
        # This is the format expected by the `Trainer` and `DataLoader`.
        return {key: torch.tensor(value) for key, value in tokenized_full.items()}
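

# Illustrative sketch (not part of the pipeline): the prompt-masking rule from
# `SFTDataset.__getitem__`, applied to plain lists of token IDs so it can be
# checked without a tokenizer. The helper name and token IDs are hypothetical.
from typing import List


def _demo_mask_prompt_labels(
    input_ids: List[int],
    prompt_len: int,
    ignore_index: int = -100
) -> List[int]:
    """Copies `input_ids` into labels and masks the first `prompt_len` positions."""
    labels = list(input_ids)
    # Never mask past the end of the (possibly truncated) sequence.
    n_masked = min(prompt_len, len(labels))
    labels[:n_masked] = [ignore_index] * n_masked
    return labels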


def _run_sft(
    golden_samples_df: pd.DataFrame,
    stage1_model_path: str,
    config: Dict[str, Any],
    output_dir: str
) -> None:
    """
    Runs the Supervised Fine-Tuning process on the high-quality samples.

    Args:
        golden_samples_df: The DataFrame of filtered "golden" samples.
        stage1_model_path: The path to the LoRA adapters from Stage 1.
        config: The master configuration dictionary.
        output_dir: The directory to save the SFT checkpoints and final model.
    """
    # --- Model and Tokenizer Initialization ---
    # Load the base model and tokenizer.
    model_id = config['reasoning_model_config']['llm_settings']['base_model_identifier']
    base_model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.bfloat16,
        trust_remote_code=True
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = 'right' # Use right padding for SFT

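    # Load the LoRA adapters from Stage 1 onto the base model.
    # `is_trainable=True` is required: by default PEFT loads adapters frozen, for inference only.
    model = PeftModel.from_pretrained(base_model, stage1_model_path, is_trainable=True)
    model.train() # Set the model to training mode.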

    # --- Dataset Preparation ---
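    # Split the golden samples into training and validation sets.
    # Reset the index first so that `drop(train_df.index)` removes exactly the
    # sampled rows even if the incoming index contains duplicates.
    golden_samples_df = golden_samples_df.reset_index(drop=True)
    train_df = golden_samples_df.sample(frac=0.9, random_state=42)
    val_df = golden_samples_df.drop(train_df.index)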

    # Create PyTorch Dataset objects.
    train_dataset = SFTDataset(train_df['prompt'].tolist(), train_df['generation'].tolist(), tokenizer)
    val_dataset = SFTDataset(val_df['prompt'].tolist(), val_df['generation'].tolist(), tokenizer)

    # --- Trainer Configuration ---
    sft_config = config['reasoning_model_config']['sft_training_params']

    # Define the training arguments for the Hugging Face Trainer.
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=sft_config['epochs'],
        per_device_train_batch_size=sft_config['batch_size'],
        per_device_eval_batch_size=sft_config['batch_size'],
        learning_rate=sft_config['learning_rate'],
        warmup_steps=50,
        weight_decay=0.01,
        logging_dir=f"{output_dir}/logs",
        logging_steps=10,
        evaluation_strategy="steps", # Renamed to `eval_strategy` in recent transformers releases
        eval_steps=50,
        save_strategy="steps",
        save_steps=50,
        load_best_model_at_end=True, # Critical for getting the best checkpoint
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        report_to="none" # Disable wandb/tensorboard reporting for simplicity
    )

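    # Pad `input_ids`, `attention_mask`, and `labels` within each batch.
    # `DataCollatorForSeq2Seq` pads labels with -100, so it preserves the prompt
    # masking from `SFTDataset`. `DataCollatorForLanguageModeling(mlm=False)` would
    # rebuild the labels from `input_ids` and discard that masking.
    from transformers import DataCollatorForSeq2Seq
    data_collator = DataCollatorForSeq2Seq(
        tokenizer=tokenizer,
        padding=True,
        label_pad_token_id=-100
    )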

    # Initialize the Trainer.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        data_collator=data_collator,
    )

    # --- Run Training ---
    print("Starting Supervised Fine-Tuning (SFT)...")
    trainer.train()

    # Save the best model checkpoint.
    trainer.save_model(f"{output_dir}/best_checkpoint")
    print(f"SFT complete. Best model saved to {output_dir}/best_checkpoint")

# ------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# ------------------------------------------------------------------------------

def train_reasoning_llm_stage2(
    stage1_artifacts: Dict[str, Any],
    parser: LLMOutputParser,
    config: Dict[str, Any]
) -> str:
    """
    Orchestrates the Stage 2 "Rejection Sampling + SFT" process.

    This function distills the knowledge from the noisy RL exploration by:
    1. Filtering the experiences from Stage 1 to create a high-quality dataset.
    2. Fine-tuning the Stage 1 model on this "golden" dataset using standard
       supervised learning techniques.

    Args:
        stage1_artifacts: The dictionary of artifacts from the Stage 1 run,
                          containing the model path and experiences DataFrame.
        parser: An instance of the LLMOutputParser.
        config: The master configuration dictionary.

    Returns:
        The file path to the best model checkpoint from the SFT process, which
        will be the starting point for Stage 3.
    """
    print("\n--- Stage 2: Rejection Sampling and SFT ---")

    # --- Input Validation ---
    if 'lora_model_path' not in stage1_artifacts or 'collected_experiences_df' not in stage1_artifacts:
        raise ValueError("`stage1_artifacts` is missing required keys: 'lora_model_path' or 'collected_experiences_df'.")

    experiences_df = stage1_artifacts['collected_experiences_df']
    stage1_model_path = stage1_artifacts['lora_model_path']

    # --- Step 1: Filter high-quality samples ---
    print("Step 1: Filtering high-quality samples via rejection sampling...")
    golden_samples_df, summary_report = _filter_high_quality_samples(
        experiences_df, parser, config
    )
    print(f"Filtered {len(experiences_df)} experiences down to {len(golden_samples_df)} golden samples.")
    print("Filtering summary per bucket:")
    print(summary_report.head())

    if golden_samples_df.empty:
        raise RuntimeError("Rejection sampling resulted in an empty dataset. Cannot proceed with SFT.")

    # --- Step 2: Run Supervised Fine-Tuning ---
    print("\nStep 2: Running Supervised Fine-Tuning on golden samples...")
    sft_output_dir = "./models/stage2_sft"
    _run_sft(golden_samples_df, stage1_model_path, config, sft_output_dir)

    best_checkpoint_path = f"{sft_output_dir}/best_checkpoint"

    print(f"\n--- Stage 2 Training Complete. Best model is at: {best_checkpoint_path} ---")

    return best_checkpoint_path


# Task 8: Train Reasoning LLM πθ – Stage 3 (RL for Reasoning with Time-GRPO)

# ==============================================================================
# Task 8: Train Reasoning LLM πθ – Stage 3 (RL for Reasoning with Time-GRPO)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 8, Step 1: Initialize from SFT and resume RL training
# ------------------------------------------------------------------------------

def _initialize_stage3_models(
    sft_checkpoint_path: str,
    config: Dict[str, Any],
    device: torch.device
) -> Dict[str, Any]:
    """
    Initializes all models for Stage 3, loading the policy from an SFT checkpoint.

    This function is the entry point for Stage 3 model setup. It loads the base
    LLM, then applies the fine-tuned LoRA adapters from the SFT stage to create
    the initial policy model. It also re-initializes a frozen reference model
    and a trainable value head, ensuring a clean start for the final RL phase.

    Args:
        sft_checkpoint_path: The file path to the best model checkpoint from Stage 2.
        config: The master configuration dictionary.
        device: The torch device for model placement.

    Returns:
        A dictionary containing all initialized components for Stage 3 training.

    Raises:
        FileNotFoundError: If the `sft_checkpoint_path` does not exist.
        ValueError: If essential configuration keys are missing.
    """
    # --- Input Validation ---
    # Verify that the provided checkpoint path is valid.
    if not os.path.isdir(sft_checkpoint_path):
        raise FileNotFoundError(f"SFT checkpoint path not found: {sft_checkpoint_path}")

    try:
        # --- Load Base Model and Tokenizer ---
        # Extract the model identifier from the configuration.
        model_id = config['reasoning_model_config']['llm_settings']['base_model_identifier']
        print(f"Loading base model '{model_id}' for Stage 3 initialization...")

        # Load the base causal language model using bfloat16 for performance.
        base_model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True
        ).to(device)

        # Load the tokenizer associated with the base model.
        tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

        # Configure tokenizer padding for autoregressive models.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = 'left'

        # --- Create Reference Model (πref), re-initialized ---
        # The reference model MUST be the original, unmodified base model, so it is
        # copied BEFORE the adapters are attached: `PeftModel.from_pretrained`
        # injects the LoRA layers into `base_model` in place.
        reference_model = copy.deepcopy(base_model)
        # Set the model to evaluation mode to disable dropout, etc.
        reference_model.eval()
        # Explicitly disable gradient calculations for all parameters.
        for param in reference_model.parameters():
            param.requires_grad = False
        print("\n--- Reference Model (πref) re-initialized and frozen. ---")

        # --- Create Policy Model (πθ) by loading SFT checkpoint ---
        print(f"Loading SFT-trained LoRA adapters from: {sft_checkpoint_path}")

        # Load the LoRA adapters from the specified path and apply them to the base model.
        # This creates the policy model initialized with the distilled knowledge from SFT.
        # `is_trainable=True` keeps the adapters trainable; PEFT loads them frozen by default.
        policy_model = PeftModel.from_pretrained(
            base_model, sft_checkpoint_path, is_trainable=True
        )

        # Ensure the model is in training mode for the upcoming RL phase.
        policy_model.train()

        print("\n--- Stage 3 Policy Model (πθ) Trainable Parameters ---")
        # Print a summary of trainable vs. total parameters for explicit verification.
        policy_model.print_trainable_parameters()

        # The value model is trained alongside the policy, so it's re-initialized for this stage.
        value_model = ValueHead(base_model.config.hidden_size).to(device)
        value_model.train()
        print(f"\n--- Value Head re-initialized for Stage 3. ---")

    except KeyError as e:
        # Catch missing keys in the configuration dictionary.
        raise ValueError(f"Configuration is missing a required key for model initialization: {e}") from e
    except Exception as e:
        # Catch other potential errors during model loading, keeping the original traceback.
        raise RuntimeError(f"An unexpected error occurred during model initialization: {e}") from e

    # Return all components in a structured dictionary.
    return {
        'policy_model': policy_model,
        'reference_model': reference_model,
        'value_model': value_model,
        'tokenizer': tokenizer
    }

# ------------------------------------------------------------------------------
# Task 8, Step 2 & 3: Optimize with Time-GRPO and Persist Final Policy
# ------------------------------------------------------------------------------

def _run_validation(
    models: Dict[str, Any],
    val_dataloader: DataLoader,
    val_prompts: List[str],
    parser: LLMOutputParser,
    config: Dict[str, Any],
    device: torch.device
) -> float:
    """
    Runs the model on a validation set and computes the average reward.

    Args:
        models: The dictionary of initialized models.
        val_dataloader: DataLoader for the validation set.
        val_prompts: The list of all validation prompts.
        parser: An instance of the LLMOutputParser.
        config: The master configuration dictionary.
        device: The torch device.

    Returns:
        The average reward achieved on the validation set.
    """
    # Set models to evaluation mode to disable dropout and other training-specific layers.
    models['policy_model'].eval()
    models['value_model'].eval()

    total_val_reward = 0.0

    # Disable gradient calculations for efficiency during validation.
    with torch.no_grad():
        # Iterate through the validation set with a progress bar.
        for val_batch_indices, val_batch_targets in tqdm(val_dataloader, desc="Validation"):
            # Prepare validation batch data.
            val_batch_prompts = [val_prompts[i] for i in val_batch_indices.tolist()]
            val_batch_targets = val_batch_targets.to(device)

            # Reuse the experience collection function for validation. It receives the
            # same `config` as training, so the group size and generation settings are
            # whatever the configuration specifies (no G=1 override is applied here).
            # A deterministic generation config could be used here for perfect reproducibility.
            val_experience = _sample_and_collect_experiences(
                val_batch_prompts, val_batch_targets, models, parser, config, device
            )

            # Accumulate the mean reward for the batch.
            total_val_reward += val_experience.rewards.mean().item()

    # Calculate the average reward across all validation batches.
    avg_val_reward = total_val_reward / len(val_dataloader)

    return avg_val_reward


def train_reasoning_llm_stage3(
    sft_checkpoint_path: str,
    prompts: List[str],
    y_targets: np.ndarray,
    metadata: pd.DataFrame,
    parser: LLMOutputParser,
    config: Dict[str, Any]
) -> str:
    """
    Orchestrates the Stage 3 final RL fine-tuning process.

    This function takes the SFT-distilled model and fine-tunes it using the
    Time-GRPO objective. It includes a validation loop to save the best-performing
    model checkpoint based on validation set reward.

    Args:
        sft_checkpoint_path: Path to the best model from Stage 2.
        prompts: A list of all training prompts.
        y_targets: A numpy array of all ground truth targets.
        metadata: The DataFrame with metadata for each window.
        parser: An instance of the LLMOutputParser.
        config: The master configuration dictionary.

    Returns:
        The file path to the best and final reasoning policy checkpoint.
    """
    # --- Setup ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    rl_config = config['reasoning_model_config']['rl_training_params']
    output_dir = "./models/stage3_final_lora"
    os.makedirs(output_dir, exist_ok=True)

    print("\n--- Stage 3: Final RL Fine-Tuning ---")
    print(f"Using device: {device}")

    # Step 1: Initialize models from the SFT checkpoint.
    models = _initialize_stage3_models(sft_checkpoint_path, config, device)

    # Setup a new optimizer for this final training stage.
    trainable_params = [p for p in models['policy_model'].parameters() if p.requires_grad] + \
                       list(models['value_model'].parameters())
    optimizer = torch.optim.AdamW(trainable_params, lr=rl_config['learning_rate'])

    # --- Data Splitting for Train/Validation ---
    # Create a combined DataFrame for easy splitting.
    full_dataset_df = metadata.copy()
    full_dataset_df['prompt'] = prompts
    # This is a safe way to handle numpy arrays in pandas cells.
    full_dataset_df['y_target'] = list(y_targets)

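    # Perform a simple chronological split for validation:
    # windows ending on or after the 90th-percentile date are held out.
    # Convert to datetime first so the quantile also works when dates are stored as strings.
    full_dataset_df['window_end_date'] = pd.to_datetime(full_dataset_df['window_end_date'])
    split_date = full_dataset_df['window_end_date'].quantile(0.9, interpolation='nearest')
    train_df = full_dataset_df[full_dataset_df['window_end_date'] < split_date]
    val_df = full_dataset_df[full_dataset_df['window_end_date'] >= split_date]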

    # Prepare data for DataLoader.
    train_prompts = train_df['prompt'].tolist()
    train_y_targets = np.array(train_df['y_target'].tolist())
    val_prompts = val_df['prompt'].tolist()
    val_y_targets = np.array(val_df['y_target'].tolist())

    train_dataset = TensorDataset(torch.arange(len(train_prompts)), torch.from_numpy(train_y_targets).float())
    train_dataloader = DataLoader(train_dataset, batch_size=rl_config['batch_size'], shuffle=True)

    val_dataset = TensorDataset(torch.arange(len(val_prompts)), torch.from_numpy(val_y_targets).float())
    val_dataloader = DataLoader(val_dataset, batch_size=rl_config['batch_size'])

    # --- Training and Validation Loop ---
    best_val_reward = -float('inf')
    best_model_path = ""

    print(f"Starting final fine-tuning for {rl_config['max_steps']} steps...")
    pbar = tqdm(total=rl_config['max_steps'], desc="Training Step")

    step = 0
    training_complete = False
    while not training_complete:
        for batch_indices, batch_targets in train_dataloader:
            if step >= rl_config['max_steps']:
                training_complete = True
                break

            # Set models to training mode for this step.
            models['policy_model'].train()
            models['value_model'].train()

            # Prepare batch data.
            batch_prompts = [train_prompts[i] for i in batch_indices.tolist()]
            batch_targets = batch_targets.to(device)

            # Step 2 (reuse): Sample experiences using the current policy.
            experience = _sample_and_collect_experiences(
                batch_prompts, batch_targets, models, parser, config, device
            )

            # Step 2 (reuse): Perform multiple optimization updates.
            for _ in range(rl_config['updates_per_batch']):
                metrics = _compute_and_optimize_loss(experience, models, optimizer, config)

            # --- Logging and Validation ---
            if step % 10 == 0:
                pbar.set_postfix(metrics)

            # Periodic validation to save the best model.
            if step > 0 and step % 50 == 0:
                avg_val_reward = _run_validation(
                    models, val_dataloader, val_prompts, parser, config, device
                )
                print(f"\nValidation at Step {step}: Avg Reward = {avg_val_reward:.4f} | Best = {best_val_reward:.4f}")

                # Step 3: Persist the best policy.
                if avg_val_reward > best_val_reward:
                    best_val_reward = avg_val_reward
                    best_model_path = f"{output_dir}/best_checkpoint"
                    models['policy_model'].save_pretrained(best_model_path)
                    print(f"New best model saved to {best_model_path}\n")

            step += 1
            pbar.update(1)

    pbar.close()
    print("\n--- Stage 3 Training Complete ---")

    # --- Finalization ---
    # `best_val_reward` starts at -inf, so the first validation pass always saves a
    # checkpoint. This fallback therefore only triggers when validation never ran
    # (e.g., `max_steps` <= 50) or never produced a comparable (non-NaN) reward.
    if not best_model_path:
        best_model_path = f"{output_dir}/final_checkpoint"
        models['policy_model'].save_pretrained(best_model_path)
        print(f"No validated checkpoint was saved. Final model saved to {best_model_path}")

    return best_model_path


# Task 9: Train Time-Series Forecasting Backbone φ (Cross-Modal Alignment)

# ==============================================================================
# Task 9: Train Time-Series Forecasting Backbone φ (Cross-Modal Alignment)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 9, Step 1: Define the dual-branch architecture
# ------------------------------------------------------------------------------

def _get_language_prototypes(
    model_identifier: str,
    n_prototypes: int,
    device: torch.device
) -> torch.Tensor:
    """
    Extracts language prototypes from a pretrained LLM's embedding matrix via PCA.

    Args:
        model_identifier: The Hugging Face identifier for the base LLM (e.g., 'gpt2').
        n_prototypes: The number of principal components to extract.
        device: The torch device.

    Returns:
        A tensor of shape (n_prototypes, embedding_dim) containing the language prototypes.
    """
    print(f"Extracting language prototypes from '{model_identifier}'...")
    # Load the pretrained model.
    with torch.no_grad():
        base_llm = AutoModel.from_pretrained(model_identifier).to(device)
        # Get the token embedding matrix.
        embedding_matrix = base_llm.get_input_embeddings().weight.data.cpu().numpy()

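    # Perform PCA to find the most significant "concept" directions.
    # `components_` holds the principal axes with shape (n_prototypes, embedding_dim).
    # `fit_transform` would instead return the projected vocabulary, with shape
    # (vocab_size, n_prototypes), which contradicts the documented return shape.
    pca = PCA(n_components=n_prototypes)
    pca.fit(embedding_matrix)
    prototypes = pca.components_

    # Return the prototypes as a float32 tensor on the specified device.
    return torch.tensor(prototypes, dtype=torch.float32).to(device)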
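

# Illustrative sketch (not part of the pipeline): principal axes of an embedding
# matrix have shape (n_prototypes, embedding_dim). A random matrix stands in for a
# real LLM embedding table, and a plain NumPy SVD stands in for scikit-learn's PCA;
# both are assumptions made to keep the example light. The helper name is hypothetical.
import numpy as np


def _demo_principal_axes(
    n_prototypes: int = 4,
    vocab_size: int = 50,
    embedding_dim: int = 8
) -> np.ndarray:
    """Returns the top `n_prototypes` principal axes of a random embedding matrix."""
    rng = np.random.default_rng(0)
    embedding_matrix = rng.normal(size=(vocab_size, embedding_dim))
    # PCA operates on mean-centered data.
    centered = embedding_matrix - embedding_matrix.mean(axis=0, keepdims=True)
    # Rows of `vt` are orthonormal directions ordered by explained variance.
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return vt[:n_prototypes]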


class ForecastingBackbone(nn.Module):
    """
    The dual-branch time-series forecasting backbone (φ).

    This model implements the "Latent Thinking" component of the VTA framework,
    as described in Section 3.3 of the paper. It is a bespoke architecture
    designed to process time-series data through two parallel transformer-based
    branches, encouraging alignment between the raw temporal patterns and a
    latent language space.

    Architecture:
    1.  **Input Projection**: A linear layer embeds the input time-series features
        (e.g., the 6 OHLCVA features: open, high, low, close, volume, adjusted close)
        into the model's hidden dimension (`d_model`).
    2.  **Positional Encoding**: A learnable positional encoding is added to the
        embedded sequence to provide temporal information.
    3.  **Cross-Modal Alignment**: A cross-attention mechanism aligns the temporal
        sequence with a set of pre-computed "language prototypes" (derived from
        a base LLM like GPT-2 via PCA). This produces a "textual" representation
        of the time-series.
    4.  **Dual Transformer Branches**: Two parallel transformer encoders process
        the original temporal sequence (`x_time`) and the aligned textual
        sequence (`x_text`) independently.
    5.  **Output Heads**: Two separate linear heads project the final hidden state
        of each branch to produce two distinct forecasts.

    The model's forward pass returns all necessary outputs for computing the
    specialized training losses (feature regularization and output alignment).
    """
    def __init__(self, config: Dict[str, Any], language_prototypes: torch.Tensor):
        """
        Initializes the ForecastingBackbone model.

        Args:
            config: The `forecasting_model_config` section of the master
                    configuration dictionary.
            language_prototypes: A pre-computed tensor of shape
                                 (n_prototypes, embedding_dim) containing the
                                 principal components of a base LLM's embedding
                                 matrix.

        Raises:
            KeyError: If required parameters are missing from the configuration.
        """
        # Initialize the parent torch.nn.Module.
        super().__init__()

        # --- Configuration and Parameter Extraction ---
        try:
            # Extract the architecture parameters sub-dictionary.
            arch_params = config['architecture_params']

            # The model's main hidden dimension.
            self.d_model: int = arch_params['d_model']

            # The number of heads for multi-head attention mechanisms.
            self.n_head: int = arch_params['n_heads']

            # The number of future time steps to predict (T').
            self.prediction_horizon: int = config['prediction_horizon']

            # The number of features in the input time-series (e.g., 6 for OHLCVA).
            self.input_features: int = 6

            # The length of the input time-series window (T).
            self.input_window_size: int = 10

        except KeyError as e:
            # Raise an error if the configuration is incomplete.
            raise KeyError(f"ForecastingBackbone config is missing required key: {e}")

        # --- 1. Input Embedding Layers ---
        # A linear layer to project the input features into the model's hidden dimension.
        self.input_projection = nn.Linear(self.input_features, self.d_model)

        # A learnable parameter for positional encoding. This allows the model to learn
        # the importance of the position of each time step in the input window.
        self.pos_encoder = nn.Parameter(torch.zeros(1, self.input_window_size, self.d_model), requires_grad=True)

        # --- 2. Cross-Modal Alignment Mechanism ---
        # Register the language prototypes as a non-trainable buffer. This ensures they
        # are moved to the correct device with the model but are not considered parameters.
        self.register_buffer('language_prototypes', language_prototypes)

        # A standard MultiheadAttention layer configured for cross-attention, where the
        # query comes from the time-series and the key/value come from the prototypes.
        self.cross_attention = nn.MultiheadAttention(
            embed_dim=self.d_model,
            num_heads=self.n_head,
            dropout=arch_params['dropout'],
            batch_first=True
        )

        # --- 3. Dual Transformer Branches ---
        # Define a template for a single transformer encoder layer.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.d_model,
            nhead=self.n_head,
            dim_feedforward=arch_params['d_ff'],
            dropout=arch_params['dropout'],
            batch_first=True
        )

        # Create the temporal branch as a list of transformer layers.
        self.temporal_branch = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(arch_params['n_layers_temporal'])]
        )

        # Create the textual branch as a separate list of transformer layers.
        self.textual_branch = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(arch_params['n_layers_textual'])]
        )

        # --- 4. Output Heads ---
        # A linear layer to project the final hidden state of the temporal branch to the forecast horizon.
        self.temporal_head = nn.Linear(self.d_model, self.prediction_horizon)

        # A separate linear layer for the textual branch's forecast.
        self.textual_head = nn.Linear(self.d_model, self.prediction_horizon)

        # --- 5. Feature Regularization Projection Layers ---
        # A list of linear layers used to project the intermediate features from each
        # transformer layer before calculating the feature regularization loss.
        self.feature_projections = nn.ModuleList(
            [nn.Linear(self.d_model, self.d_model) for _ in range(len(self.temporal_branch))]
        )

    def forward(self, x: torch.Tensor) -> Dict[str, Any]:
        """
        Performs a forward pass through the dual-branch architecture.

        Args:
            x: Input tensor of shape (batch_size, T, num_features), where T is
               the input window size and num_features is 6.

        Returns:
            A dictionary containing all outputs required for training and inference:
            - 'y_hat_time': The forecast from the temporal branch, shape (batch_size, T').
            - 'y_hat_text': The forecast from the textual branch, shape (batch_size, T').
            - 'intermediate_features_time': A list of hidden state tensors from each
              layer of the temporal branch.
            - 'intermediate_features_text': A list of hidden state tensors from each
              layer of the textual branch.
        """
        # --- Input Validation ---
        if x.ndim != 3 or x.shape[2] != self.input_features or x.shape[1] != self.input_window_size:
            raise ValueError(
                f"Input tensor has incorrect shape. Expected (batch, {self.input_window_size}, {self.input_features}), "
                f"but got {x.shape}."
            )

        # --- 1. Input Projection and Positional Encoding ---
        # Project input features to the model's hidden dimension.
        # Shape: (batch, T, 6) -> (batch, T, d_model)
        x_time = self.input_projection(x)

        # Add the learnable positional encodings.
        x_time = x_time + self.pos_encoder

        # --- 2. Cross-Modal Alignment ---
        # Equation 4: X_text = Softmax(QK^T/√C)V
        # Here, Query = x_time, and Key/Value = language_prototypes.
        # The prototypes tensor needs to be expanded to match the batch size for the attention mechanism.
        prototypes_expanded = self.language_prototypes.unsqueeze(0).expand(x.shape[0], -1, -1)

        # Perform cross-attention. The output `x_text` is the aligned representation.
        # Shape: (batch, T, d_model)
        x_text, _ = self.cross_attention(
            query=x_time,
            key=prototypes_expanded,
            value=prototypes_expanded
        )

        # --- 3. Dual-Branch Transformer Processing ---
        # Lists to store the intermediate hidden states from each layer for the regularization loss.
        intermediate_features_time: List[torch.Tensor] = []
        intermediate_features_text: List[torch.Tensor] = []

        # Pass the temporal sequence through its dedicated transformer branch.
        temp_out = x_time
        for layer in self.temporal_branch:
            temp_out = layer(temp_out)
            intermediate_features_time.append(temp_out)

        # Pass the aligned textual sequence through its dedicated transformer branch.
        text_out = x_text
        for layer in self.textual_branch:
            text_out = layer(text_out)
            intermediate_features_text.append(text_out)

        # --- 4. Output Generation ---
        # The final representation from the last time step of the last layer is used for forecasting.
        # Shape: (batch, d_model) -> (batch, T')
        y_hat_time = self.temporal_head(intermediate_features_time[-1][:, -1, :])
        y_hat_text = self.textual_head(intermediate_features_text[-1][:, -1, :])

        # Return all computed tensors in a structured dictionary.
        return {
            'y_hat_time': y_hat_time,
            'y_hat_text': y_hat_text,
            'intermediate_features_time': intermediate_features_time,
            'intermediate_features_text': intermediate_features_text
        }

# ------------------------------------------------------------------------------
# Task 9, Step 2: Define feature regularization and output alignment losses
# ------------------------------------------------------------------------------

def _compute_backbone_loss(
    model_output: Dict[str, Any],
    model: ForecastingBackbone,
    config: Dict[str, Any]
) -> torch.Tensor:
    """
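    Computes the combined alignment loss for training the forecasting backbone.

    The loss sums the feature regularization term (Equation 5) and the weighted
    output alignment term (Equation 6). As written, it contains no supervised
    term against the ground-truth targets.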

    Args:
        model_output: The dictionary returned by the model's forward pass.
        model: The ForecastingBackbone model instance (to access projection layers).
        config: The master configuration dictionary.

    Returns:
        The total scalar loss for the batch.
    """
    loss_config = config['losses']

    # --- Output Alignment Loss (Equation 6) ---
    # L_output = sim(ŷ_time, ŷ_text)
    # The paper uses L1 loss for similarity.
    output_loss = F.l1_loss(model_output['y_hat_time'], model_output['y_hat_text'])

    # --- Feature Regularization Loss (Equation 5) ---
    # L_feature = Σ γ^(N-n) * sim(φ_text(F_text^n), φ_time(F_time^n))
    feature_loss = 0.0

    # Get the gamma schedule (decay factors).
    gamma_schedule = loss_config['feature_regularization']['gamma_schedule']

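    # Iterate only over layers available in both branches, the projections, and the
    # schedule, so branches of different depth cannot raise an IndexError.
    num_layers = min(
        len(model_output['intermediate_features_time']),
        len(model_output['intermediate_features_text']),
        len(model.feature_projections),
        len(gamma_schedule)
    )
    for i in range(num_layers):
        # Project the features from both branches (the same projection layer is shared here).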
        proj_time = model.feature_projections[i](model_output['intermediate_features_time'][i])
        proj_text = model.feature_projections[i](model_output['intermediate_features_text'][i])

        # Calculate the L1 loss between the projected features.
        layer_loss = F.l1_loss(proj_time, proj_text)

        # Apply the scheduled weight and add to the total feature loss.
        feature_loss += gamma_schedule[i] * layer_loss

    # --- Total Loss ---
    # L_total = L_feature + w_output * L_output
    total_loss = feature_loss + loss_config['output_alignment']['weight'] * output_loss

    return total_loss

# ------------------------------------------------------------------------------
# Task 9, Step 3: Train the backbone and extract the unconditional forecaster
# ------------------------------------------------------------------------------

def train_forecasting_backbone(
    data_splits: Dict[str, Dict[str, np.ndarray]],
    config: Dict[str, Any]
) -> str:
    """
    Orchestrates the training of the ForecastingBackbone model.

    Args:
        data_splits: The dictionary of train/val/test data splits.
        config: The master configuration dictionary.

    Returns:
        The file path to the best trained model checkpoint.
    """
    # --- Setup ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    forecasting_config = config['forecasting_model_config']
    train_config = forecasting_config['training_params']
    output_dir = "./models/forecasting_backbone"
    os.makedirs(output_dir, exist_ok=True)

    print("\n--- Training Forecasting Backbone (φ) ---")
    print(f"Using device: {device}")

    # --- DataLoaders ---
    train_dataset = TensorDataset(
        torch.from_numpy(data_splits['train']['X']).float(),
        torch.from_numpy(data_splits['train']['y']).float()  # Targets are loaded but not consumed by the alignment loss below
    )
    train_loader = DataLoader(train_dataset, batch_size=train_config['batch_size'], shuffle=True)

    val_dataset = TensorDataset(
        torch.from_numpy(data_splits['val']['X']).float(),
        torch.from_numpy(data_splits['val']['y']).float()
    )
    val_loader = DataLoader(val_dataset, batch_size=train_config['batch_size'])

    # --- Model and Optimizer ---
    # Step 1a: Get language prototypes.
    prototypes = _get_language_prototypes(
        forecasting_config['architecture_params']['base_model_identifier'],
        forecasting_config['architecture_params']['pca_n_prototypes_D'],
        device
    )

    # Step 1b: Instantiate the model.
    model = ForecastingBackbone(forecasting_config, prototypes).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=train_config['learning_rate'])

    # --- Training Loop ---
    best_val_loss = float('inf')

    for epoch in range(train_config['epochs']):
        print(f"\nEpoch {epoch+1}/{train_config['epochs']}")

        # Training phase
        model.train()
        train_loss = 0.0
        for x_batch, _ in tqdm(train_loader, desc="Training"):
            x_batch = x_batch.to(device)

            optimizer.zero_grad()
            model_output = model(x_batch)
            loss = _compute_backbone_loss(model_output, model, forecasting_config)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)

        # Validation phase
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for x_batch, _ in tqdm(val_loader, desc="Validation"):
                x_batch = x_batch.to(device)
                model_output = model(x_batch)
                loss = _compute_backbone_loss(model_output, model, forecasting_config)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)
        print(f"Avg Train Loss: {avg_train_loss:.6f} | Avg Val Loss: {avg_val_loss:.6f}")

        # Save the best model checkpoint.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_model_path = f"{output_dir}/best_checkpoint.pth"
            torch.save(model.state_dict(), best_model_path)
            print(f"New best model saved to {best_model_path}")

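    print("\n--- Backbone Training Complete ---")

    # If validation never improved (e.g., the loss was NaN in every epoch), no checkpoint
    # was written and `best_model_path` is unbound, so save the final state instead.
    if best_val_loss == float('inf'):
        best_model_path = f"{output_dir}/final_checkpoint.pth"
        torch.save(model.state_dict(), best_model_path)
        print(f"Validation did not improve. Final model saved to {best_model_path}")

    return best_model_path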


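The feature regularization term in Equation 5 weights layer n by γ^(N-n), so with γ < 1 the deepest layer contributes most. The sketch below illustrates that weighting without any framework dependency. It assumes the configured `gamma_schedule` follows this γ^(N-n) form, which the code above does not confirm; `make_gamma_schedule`, `mean_abs_diff`, and `weighted_feature_loss` are hypothetical names, and the mean absolute difference stands in for the L1 similarity used in `_compute_backbone_loss`.

```python
from typing import List, Sequence


def make_gamma_schedule(gamma: float, n_layers: int) -> List[float]:
    """Return the weights gamma^(N-n) for layers n = 1..N (hypothetical helper)."""
    if n_layers < 0:
        raise ValueError("n_layers must be non-negative")
    return [gamma ** (n_layers - n) for n in range(1, n_layers + 1)]


def mean_abs_diff(a: Sequence[float], b: Sequence[float]) -> float:
    """Mean absolute difference between two equal-length vectors (L1-style distance)."""
    if len(a) != len(b) or len(a) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum(abs(x - y) for x, y in zip(a, b)) / len(a)


def weighted_feature_loss(
    feats_time: Sequence[Sequence[float]],
    feats_text: Sequence[Sequence[float]],
    schedule: Sequence[float],
) -> float:
    """Sum the per-layer distances, each scaled by its schedule weight."""
    # Only layers present in both branches and in the schedule are compared.
    n = min(len(feats_time), len(feats_text), len(schedule))
    return sum(
        (schedule[i] * mean_abs_diff(feats_time[i], feats_text[i]) for i in range(n)),
        0.0,
    )


# Toy example: three layers, two features per layer.
schedule = make_gamma_schedule(gamma=0.5, n_layers=3)
feats_time = [[1.0, 2.0], [0.0, 0.0], [1.0, 1.0]]
feats_text = [[0.0, 2.0], [0.0, 4.0], [1.0, 3.0]]
loss = weighted_feature_loss(feats_time, feats_text, schedule)
```

In this toy case the first layer's mismatch is scaled by 0.25 and the last layer's by 1.0, which is the intended effect of the decay.
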
# ==============================================================================
# Task 10: Train Joint Conditional Forecaster ψ (Classifier-Free Conditioning)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 10, Step 1: Derive conditioning attributes c from the reasoning forecast
# ------------------------------------------------------------------------------

def _derive_conditioning_attributes(
    prompts: List[str],
    reasoning_model_path: str,
    parser: LLMOutputParser,
    config: Dict[str, Any],
    device: torch.device
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates conditioning attributes `c` using the final trained reasoning model.

    This function loads the fully trained reasoning policy (πθ), generates a
    forecast for each input prompt, parses the output, and extracts summary
    statistics ([min, mean, max]) from each valid forecast. These statistics
    serve as the conditioning signal for the fusion model.

    Args:
        prompts: The list of prompts for the entire dataset (train + val).
        reasoning_model_path: Path to the trained Stage 3 reasoning model adapters.
        parser: An instance of the LLMOutputParser.
        config: The master configuration dictionary.
        device: The torch device.

    Returns:
        A tuple containing:
        - A numpy array of shape (num_samples, 3) with the [min, mean, max] attributes.
        - A boolean numpy array of shape (num_samples,) indicating which samples
          were successfully processed and yielded a valid forecast.
    """
    print("--- Deriving Conditioning Attributes `c` ---")

    # --- Load Trained Reasoning Model (πθ) ---
    try:
        # Load the base model architecture.
        model_id = config['reasoning_model_config']['llm_settings']['base_model_identifier']
        base_model = AutoModelForCausalLM.from_pretrained(
            model_id, torch_dtype=torch.bfloat16, trust_remote_code=True
        )
        # Apply the trained LoRA adapters from the specified path.
        model = PeftModel.from_pretrained(base_model, reasoning_model_path).to(device)
        model.eval() # Set model to evaluation mode.

        # Load the corresponding tokenizer.
        tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = 'left'
    except Exception as e:
        # Chain the original exception so the underlying traceback is preserved.
        raise IOError(f"Failed to load reasoning model from path '{reasoning_model_path}'. Error: {e}") from e

    # --- Generate Forecasts and Extract Attributes in Batches ---
    attributes = []
    valid_mask = []
    # Use a reasonable batch size for inference.
    batch_size = config['conditional_fusion_config']['training_params']['batch_size']

    # Process all prompts in batches for efficiency.
    for i in tqdm(range(0, len(prompts), batch_size), desc="Generating Attributes"):
        # Slice the current batch of prompts.
        batch_prompts = prompts[i:i+batch_size]

        # Tokenize the batch.
        inputs = tokenizer(
            batch_prompts, return_tensors="pt", padding=True, truncation=True
        ).to(device)

        # Generate outputs using a deterministic strategy (greedy search).
        with torch.no_grad():
            outputs = model.generate(**inputs, max_new_tokens=128, do_sample=False)

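        # Decode only the newly generated tokens. For decoder-only models, `generate`
        # returns the prompt followed by the continuation, so the prompt is sliced off
        # to keep the parser from reading numbers or templates in the prompt itself.
        prompt_length = inputs['input_ids'].shape[1]
        decoded_texts = tokenizer.batch_decode(outputs[:, prompt_length:], skip_special_tokens=True)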

        # Parse each output in the batch.
        for text in decoded_texts:
            parsed = parser.parse(text)
            if parsed.is_valid:
                # Equation: c = g(ŷ_θ), where g computes [min, mean, max]
                pred = parsed.prediction
                attributes.append([np.min(pred), np.mean(pred), np.max(pred)])
                valid_mask.append(True)
            else:
                # If parsing fails, append a placeholder and mark as invalid.
                attributes.append([0.0, 0.0, 0.0])
                valid_mask.append(False)

    print(f"Successfully derived attributes for {sum(valid_mask)}/{len(prompts)} samples.")

    # Convert lists to numpy arrays for efficient indexing.
    return np.array(attributes, dtype=np.float32), np.array(valid_mask, dtype=bool)


# ------------------------------------------------------------------------------
# Task 10, Step 2: Define the ψ architecture with classifier-free training
# ------------------------------------------------------------------------------

class ConditionalFusionModel(nn.Module):
    """
    The joint conditional forecaster (ψ), implementing classifier-free guidance.

    This model fuses the unconditional forecast from the backbone (φ) with
    the conditioning attributes (c) from the reasoning model (πθ). It is
    trained with random dropping of the conditioning signal, which allows it
    to perform guided inference by interpolating between its conditional and
    unconditional predictions, as described in Section 3.4 of the paper.

    Architecture:
    1.  **Backbone (φ)**: Contains a pre-trained `ForecastingBackbone` which
        generates an unconditional forecast `ŷ_φ(X)`.
    2.  **Attribute Encoder**: A small Multi-Layer Perceptron (MLP) that encodes
        the conditioning vector `c` into a higher-dimensional representation.
    3.  **Projection Head**: A final MLP that takes the concatenation of `ŷ_φ(X)`
        and the encoded `c` as input and learns to predict an `adjustment`.
    4.  **Residual Connection**: The final output is the sum of the unconditional
        forecast and the learned adjustment, i.e., `ŷ_ψ = ŷ_φ + adjustment`.
        This structure is key to the guided inference mechanism.
    """
    def __init__(self, backbone: ForecastingBackbone, config: Dict[str, Any]):
        """
        Initializes the ConditionalFusionModel.

        Args:
            backbone: An instance of the pre-trained `ForecastingBackbone` (φ).
            config: The `conditional_fusion_config` section of the master
                    configuration dictionary.

        Raises:
            KeyError: If required parameters are missing from the configuration.
        """
        # Initialize the parent torch.nn.Module.
        super().__init__()

        # Store the backbone model as a submodule.
        self.backbone = backbone

        try:
            # Extract the architecture configuration for the fusion model.
            psi_config = config['psi_architecture']

            # The number of conditioning attributes (e.g., 3 for min, mean, max).
            num_attributes = len(config['attributes_c_definition']['components'])

            # An MLP to encode the conditioning attributes.
            self.attribute_encoder = nn.Sequential(
                nn.Linear(num_attributes, psi_config['per_attribute_linear_out_dim']),
                nn.GELU(), # Using GELU as a modern activation function.
                nn.Linear(psi_config['per_attribute_linear_out_dim'], psi_config['per_attribute_linear_out_dim'])
            )

            # The final projection head that learns the adjustment to the unconditional forecast.
            # Its input size is the sum of the forecast horizon and the encoded attribute dimension.
            self.projection_head = nn.Sequential(
                nn.Linear(backbone.prediction_horizon + psi_config['per_attribute_linear_out_dim'], psi_config['projection_mlp_width']),
                nn.GELU(),
                nn.Dropout(psi_config.get('dropout', 0.1)),
                nn.Linear(psi_config['projection_mlp_width'], backbone.prediction_horizon)
            )

            # Handle the freezing of the backbone model's weights based on the config.
            if config['training_params']['freeze_backbone_phi']:
                print("Freezing forecasting backbone (φ) weights.")
                # Iterate through all parameters of the backbone and disable gradients.
                for param in self.backbone.parameters():
                    param.requires_grad = False

        except KeyError as e:
            # Raise a specific error if the configuration is incomplete.
            raise KeyError(f"ConditionalFusionModel config is missing required key: {e}")

    def forward(
        self,
        x: torch.Tensor,
        c: torch.Tensor,
        p_uncond: float = 0.0,
        force_unconditional: bool = False
    ) -> torch.Tensor:
        """
        Performs a forward pass with classifier-free guidance dropping.

        This method can operate in three modes:
        1.  **Training Mode**: Randomly replaces `c` with a null token (zeros)
            with probability `p_uncond`.
        2.  **Conditional Inference**: Uses the provided `c` as is.
        3.  **Unconditional Inference**: If `force_unconditional` is True,
            replaces `c` with the null token.

        Args:
            x: Input time-series tensor of shape (batch_size, T, 6).
            c: Conditioning attribute tensor of shape (batch_size, num_attributes).
            p_uncond: The probability of dropping the conditioning signal during training.
            force_unconditional: If True, forces the model to run in unconditional mode.

        Returns:
            The final forecast tensor of shape (batch_size, T').
        """
        # --- Input Validation ---
        if x.ndim != 3 or c.ndim != 2 or x.shape[0] != c.shape[0]:
            raise ValueError(
                f"Shape mismatch in inputs. Got x: {x.shape}, c: {c.shape}. "
                "Batch sizes must match and dimensions must be correct."
            )

        # --- 1. Get Unconditional Forecast ŷ_φ(X) ---
        # The backbone is run with gradients disabled if it's frozen.
        # `torch.set_grad_enabled` is a context manager for this.
        is_backbone_trainable = any(p.requires_grad for p in self.backbone.parameters())
        with torch.set_grad_enabled(is_backbone_trainable and self.training):
            backbone_output = self.backbone(x)

        # As per the paper, the unconditional forecast is the output of the temporal branch.
        y_hat_phi = backbone_output['y_hat_time']

        # --- 2. Implement Classifier-Free Guidance Dropping for `c` ---
        if force_unconditional:
            # For guided inference's unconditional pass, force `c` to be the null token (zeros).
            c = torch.zeros_like(c)
        elif self.training and p_uncond > 0:
            # During training, randomly drop the conditioning for a subset of the batch.
            # This teaches the model to function even when `c` is absent. `masked_fill`
            # is out-of-place, so the caller's tensor is not mutated.
            uncond_mask = torch.rand(c.shape[0], device=c.device) < p_uncond
            c = c.masked_fill(uncond_mask.unsqueeze(1), 0.0)

        # --- 3. Encode Attributes and Fuse with Unconditional Forecast ---
        # Pass the (potentially dropped) conditioning vector through its encoder.
        encoded_c = self.attribute_encoder(c)

        # Concatenate the unconditional forecast and the encoded attributes.
        combined_features = torch.cat([y_hat_phi, encoded_c], dim=1)

        # --- 4. Generate Final Forecast using a Residual Connection ---
        # The projection head learns the *adjustment* based on the conditioning signal.
        adjustment = self.projection_head(combined_features)

        # The final output is the sum of the unconditional base and the learned adjustment.
        # This structure directly mirrors the guided inference equation, making training stable.
        y_hat_psi = y_hat_phi + adjustment

        return y_hat_psi

# ------------------------------------------------------------------------------
# Task 10, Step 3: Train ψ with MSE loss and define guided inference
# ------------------------------------------------------------------------------

def _run_guided_validation(
    model: ConditionalFusionModel,
    val_loader: DataLoader,
    config: Dict[str, Any],
    device: torch.device
) -> float:
    """
    Runs validation using the full guided inference procedure.

    This function evaluates the model's performance as it would be used in
    production, by generating both a conditional and an unconditional forecast
    and blending them using the guidance scale.

    Args:
        model: The `ConditionalFusionModel` instance.
        val_loader: DataLoader for the validation set.
        config: The `conditional_fusion_config` section of the master config.
        device: The torch device.

    Returns:
        The average Mean Squared Error loss on the validation set.
    """
    # Set the model to evaluation mode to disable dropout and other training behaviors.
    model.eval()

    # Initialize total validation loss.
    total_val_loss = 0.0

    # Extract the guidance scale `s` from the configuration.
    guidance_scale = config['inference_params']['guidance_scale']

    # Disable gradient calculations for efficiency during validation.
    with torch.no_grad():
        # Iterate through the validation set.
        for x_batch, c_batch, y_batch in val_loader:
            # Move data to the specified device.
            x_batch, c_batch, y_batch = x_batch.to(device), c_batch.to(device), y_batch.to(device)

            # --- Guided Inference (Equation 9) ---
            # 1. Get the conditional forecast by passing the true attributes `c`.
            y_hat_cond = model(x_batch, c_batch, force_unconditional=False)

            # 2. Get the unconditional forecast by forcing the null attribute token `ø`.
            y_hat_uncond = model(x_batch, c_batch, force_unconditional=True)

            # 3. Combine them using the guidance scale `s`.
            # Equation: ŷ = ŷ_uncond + s * (ŷ_cond - ŷ_uncond)
            y_hat_final = y_hat_uncond + guidance_scale * (y_hat_cond - y_hat_uncond)

            # Calculate the MSE loss between the final, guided prediction and the ground truth.
            loss = F.mse_loss(y_hat_final, y_batch)

            # Accumulate the loss for the batch.
            total_val_loss += loss.item()

    # Return the average loss across all validation batches.
    return total_val_loss / len(val_loader)


def train_conditional_forecaster(
    data_splits: Dict[str, Dict[str, Any]],
    prompts: List[str],
    reasoning_model_path: str,
    backbone_path: str,
    parser: LLMOutputParser,
    config: Dict[str, Any]
) -> str:
    """
    Orchestrates the training of the ConditionalFusionModel (ψ).

    This function manages the entire end-to-end workflow for Task 10:
    1.  Derives the conditioning attributes `c` for the dataset using the
        final reasoning model.
    2.  Filters the dataset to include only samples with valid attributes.
    3.  Initializes the `ConditionalFusionModel`, loading the pre-trained
        backbone `φ`.
    4.  Runs a training loop using MSE loss and classifier-free dropping.
    5.  Periodically evaluates the model using a full guided inference validation
        loop to find and save the best-performing checkpoint.

    Args:
        data_splits: The dictionary of train/val/test data splits.
        prompts: A list of all prompts corresponding to the data splits.
        reasoning_model_path: Path to the best model checkpoint from Stage 3 (πθ).
        backbone_path: Path to the best model checkpoint from Task 9 (φ).
        parser: An instance of the `LLMOutputParser`.
        config: The master configuration dictionary.

    Returns:
        The file path to the best trained `ConditionalFusionModel` checkpoint.
    """
    # --- Setup ---
    # Determine the execution device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Extract relevant configuration sections.
    fusion_config = config['conditional_fusion_config']
    train_config = fusion_config['training_params']

    # Define and create the output directory for model checkpoints.
    output_dir = "./models/conditional_fusion_model"
    os.makedirs(output_dir, exist_ok=True)

    print("\n--- Training Conditional Fusion Model (ψ) ---")
    print(f"Using device: {device}")

    # --- Step 1: Derive Conditioning Attributes ---
    # This is a potentially long-running preprocessing step.
    attributes, valid_mask = _derive_conditioning_attributes(
        prompts, reasoning_model_path, parser, config, device
    )

    # --- DataLoaders (filtered for samples with valid attributes) ---
    # It is critical to filter all data splits consistently using the valid_mask.
    # We get the original indices from the metadata and then filter by the mask.
    train_original_indices = data_splits['train']['meta'].index
    valid_train_mask = valid_mask[train_original_indices]

    val_original_indices = data_splits['val']['meta'].index
    valid_val_mask = valid_mask[val_original_indices]

    # Create the training dataset using only the valid samples.
    train_dataset = TensorDataset(
        torch.from_numpy(data_splits['train']['X'][valid_train_mask]).float(),
        torch.from_numpy(attributes[train_original_indices][valid_train_mask]).float(),
        torch.from_numpy(data_splits['train']['y'][valid_train_mask]).float()
    )
    train_loader = DataLoader(train_dataset, batch_size=train_config['batch_size'], shuffle=True)

    # Create the validation dataset using only the valid samples.
    val_dataset = TensorDataset(
        torch.from_numpy(data_splits['val']['X'][valid_val_mask]).float(),
        torch.from_numpy(attributes[val_original_indices][valid_val_mask]).float(),
        torch.from_numpy(data_splits['val']['y'][valid_val_mask]).float()
    )
    val_loader = DataLoader(val_dataset, batch_size=train_config['batch_size'])

    # --- Model and Optimizer ---
    # Load the pretrained backbone φ.
    backbone_config = config['forecasting_model_config']
    prototypes = _get_language_prototypes(
        backbone_config['architecture_params']['base_model_identifier'],
        backbone_config['architecture_params']['pca_n_prototypes_D'],
        device
    )
    backbone = ForecastingBackbone(backbone_config, prototypes)
    backbone.load_state_dict(torch.load(backbone_path, map_location=device))

    # Instantiate the fusion model ψ, passing the backbone and config.
    model = ConditionalFusionModel(backbone, fusion_config).to(device)

    # Create an optimizer that only targets the trainable parameters.
    optimizer = torch.optim.AdamW(
        [p for p in model.parameters() if p.requires_grad],
        lr=train_config['learning_rate']
    )

    # --- Training Loop ---
    best_val_loss = float('inf')
    best_model_path = ""
    p_uncond = train_config['unconditional_probability']

    # Loop for the specified number of epochs.
    for epoch in range(train_config['epochs']):
        print(f"\nEpoch {epoch+1}/{train_config['epochs']}")
        # Set the model to training mode.
        model.train()

        # Use tqdm for a progress bar over the training batches.
        for x_batch, c_batch, y_batch in tqdm(train_loader, desc="Training"):
            # Move batch data to the device.
            x_batch, c_batch, y_batch = x_batch.to(device), c_batch.to(device), y_batch.to(device)

            # Reset gradients.
            optimizer.zero_grad()

            # Equation 7: L_forecast = E[||ŷ_ψ(X, č) - y||^2]
            # The model's forward pass handles the random dropping of `c` to create `č`.
            y_hat = model(x_batch, c_batch, p_uncond=p_uncond)

            # Calculate the Mean Squared Error loss.
            loss = F.mse_loss(y_hat, y_batch)

            # Perform backpropagation.
            loss.backward()

            # Update the model weights.
            optimizer.step()

        # --- Validation ---
        # Run the guided validation procedure at the end of each epoch.
        avg_val_loss = _run_guided_validation(model, val_loader, fusion_config, device)
        print(f"Avg Guided Validation Loss: {avg_val_loss:.6f}")

        # Save the best model checkpoint based on validation performance.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_model_path = f"{output_dir}/best_checkpoint.pth"
            torch.save(model.state_dict(), best_model_path)
            print(f"New best model saved to {best_model_path}")

    print("\n--- Conditional Fusion Model Training Complete ---")

    # If no best model was saved (e.g., validation never improved), save the final state.
    if not best_model_path:
        best_model_path = f"{output_dir}/final_checkpoint.pth"
        torch.save(model.state_dict(), best_model_path)
        print(f"Validation did not improve. Final model saved to {best_model_path}")

    return best_model_path


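Two numerical pieces of Task 10 are small enough to check by hand: the attribute map c = g(ŷ_θ) = [min, mean, max] and the guided blend ŷ = ŷ_uncond + s · (ŷ_cond − ŷ_uncond). The following is a minimal sketch using plain Python lists instead of tensors. `summarize_forecast` and `guided_blend` are illustrative names that do not appear in the code above, and the input values are made up for the example.

```python
from typing import List, Sequence


def summarize_forecast(pred: Sequence[float]) -> List[float]:
    """Map a forecast to the conditioning attributes [min, mean, max]."""
    if len(pred) == 0:
        raise ValueError("forecast must contain at least one value")
    return [min(pred), sum(pred) / len(pred), max(pred)]


def guided_blend(
    y_uncond: Sequence[float], y_cond: Sequence[float], s: float
) -> List[float]:
    """Blend forecasts element-wise as y_uncond + s * (y_cond - y_uncond)."""
    if len(y_uncond) != len(y_cond):
        raise ValueError("forecasts must have the same horizon")
    return [u + s * (c - u) for u, c in zip(y_uncond, y_cond)]


# Attributes derived from a toy three-step forecast.
c = summarize_forecast([1.0, 3.0, 2.0])

# The same pair of forecasts blended at three guidance scales.
y_uncond = [1.0, 2.0]
y_cond = [2.0, 4.0]
blend_s0 = guided_blend(y_uncond, y_cond, 0.0)
blend_s1 = guided_blend(y_uncond, y_cond, 1.0)
blend_s2 = guided_blend(y_uncond, y_cond, 2.0)
```

With s = 0 the blend reduces to the unconditional forecast, with s = 1 to the conditional one, and s > 1 extrapolates past the conditional forecast in the direction of the conditioning signal.
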
# ==============================================================================
# Task 11: Perform End-to-End Inference (Forecast + Narrative)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 11, Step 1: Load all trained models for inference
# ------------------------------------------------------------------------------

class InferenceModels(NamedTuple):
    """
    A structured data container for all trained models required for inference.

    This `NamedTuple` provides a standardized, immutable, and type-safe object
    for holding the fully trained and configured models needed for the end-to-end
    inference pipeline. Encapsulating these components together simplifies passing
    them between functions and improves code readability and maintainability.

    Attributes:
        reasoning_model: The fully trained reasoning policy model (πθ), composed
                         of the base LLM with the final Stage 3 LoRA adapters
                         applied. This model is responsible for generating the
                         textual narrative and the initial forecast used to
                         derive conditioning attributes. It is an instance of
                         `peft.PeftModel`.
        fusion_model: The fully trained conditional fusion model (ψ), which
                      internally contains the trained forecasting backbone (φ).
                      This model is responsible for taking the time-series data
                      and conditioning attributes to produce the final guided
                      numerical forecast. It is an instance of
                      `ConditionalFusionModel`.
        tokenizer: The tokenizer associated with the base LLM, required for both
                   encoding prompts for the `reasoning_model` and decoding its
                   generated outputs. It is an instance of a Hugging Face
                   `PreTrainedTokenizer`.
    """
    reasoning_model: PeftModel
    fusion_model: "ConditionalFusionModel" # Use string forward reference
    tokenizer: AutoTokenizer

def _load_inference_models(
    reasoning_model_path: str,
    conditional_model_path: str,
    config: Dict[str, Any],
    device: torch.device
) -> InferenceModels:
    """
    Loads all trained model artifacts required for end-to-end inference.

    Args:
        reasoning_model_path: Path to the trained Stage 3 reasoning model adapters (πθ).
        conditional_model_path: Path to the trained ConditionalFusionModel state dict (ψ).
        config: The master configuration dictionary.
        device: The torch device for model placement.

    Returns:
        An `InferenceModels` named tuple containing the loaded and configured models.
    """
    print("--- Loading All Models for Inference ---")

    # --- 1. Load Reasoning Model (πθ) ---
    try:
        model_id = config['reasoning_model_config']['llm_settings']['base_model_identifier']
        print(f"Loading reasoning model: base '{model_id}' + LoRA '{reasoning_model_path}'")
        base_model_pi = AutoModelForCausalLM.from_pretrained(
            model_id, torch_dtype=torch.bfloat16, trust_remote_code=True
        )
        reasoning_model = PeftModel.from_pretrained(base_model_pi, reasoning_model_path).to(device)
        reasoning_model.eval()

        tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = 'left'
    except Exception as e:
        # Chain the original exception so the full traceback is preserved.
        raise IOError(f"Failed to load reasoning model. Error: {e}") from e

    # --- 2. Load Conditional Fusion Model (ψ) ---
    try:
        print(f"Loading conditional fusion model from: {conditional_model_path}")
        # First, instantiate the architecture, including the backbone φ.
        backbone_config = config['forecasting_model_config']
        prototypes = _get_language_prototypes(
            backbone_config['architecture_params']['base_model_identifier'],
            backbone_config['architecture_params']['pca_n_prototypes_D'],
            device
        )
        backbone = ForecastingBackbone(backbone_config, prototypes)

        # Instantiate the parent fusion model.
        fusion_config = config['conditional_fusion_config']
        fusion_model = ConditionalFusionModel(backbone, fusion_config).to(device)

        # Load the trained state dictionary.
        fusion_model.load_state_dict(torch.load(conditional_model_path, map_location=device))
        fusion_model.eval()
    except Exception as e:
        # Chain the original exception so the full traceback is preserved.
        raise IOError(f"Failed to load conditional fusion model. Error: {e}") from e

    print("All models loaded successfully.")
    return InferenceModels(reasoning_model, fusion_model, tokenizer)

# ------------------------------------------------------------------------------
# Task 11, Step 2 & 3: Generate, Blend, and Validate Outputs
# ------------------------------------------------------------------------------

def run_end_to_end_inference(
    data_splits: Dict[str, Dict[str, Any]],
    prompts: List[str],
    reasoning_model_path: str,
    conditional_model_path: str,
    parser: LLMOutputParser,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the full end-to-end inference pipeline for the VTA framework.

    This function produces the final dual output (guided forecast and narrative)
    for the test set by coordinating all trained model components.

    Args:
        data_splits: The dictionary of data splits, requiring the 'test' key.
        prompts: A list of all prompts for the entire dataset.
        reasoning_model_path: Path to the final trained reasoning model (πθ).
        conditional_model_path: Path to the final trained fusion model (ψ).
        parser: An instance of the LLMOutputParser.
        config: The master configuration dictionary.

    Returns:
        A pandas DataFrame containing the inference results, with columns for
        metadata, the final numerical forecast, and the textual narrative.
    """
    # --- Setup ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\n--- Running End-to-End Inference on device '{device}' ---")

    # --- Step 1: Load all models ---
    models = _load_inference_models(reasoning_model_path, conditional_model_path, config, device)

    # --- Prepare Test Data ---
    test_meta = data_splits['test']['meta']
    test_indices = test_meta.index
    test_prompts = [prompts[i] for i in test_indices]
    test_X = data_splits['test']['X']

    test_dataset = TensorDataset(
        torch.arange(len(test_prompts)), # Use indices to map back to prompts
        torch.from_numpy(test_X).float()
    )
    test_loader = DataLoader(test_dataset, batch_size=config['conditional_fusion_config']['training_params']['batch_size'])

    # --- Inference Loop ---
    results = []
    guidance_scale = config['conditional_fusion_config']['inference_params']['guidance_scale']
    num_successful_generations = 0

    # Disable all gradient calculations for maximum performance.
    with torch.no_grad():
        for batch_prompt_indices, x_batch in tqdm(test_loader, desc="Inference"):
            x_batch = x_batch.to(device)
            batch_prompts = [test_prompts[i] for i in batch_prompt_indices.tolist()]

            # --- Part A: Generate Narrative and Attributes with Reasoning Model (πθ) ---
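            inputs = models.tokenizer(batch_prompts, return_tensors="pt", padding=True, truncation=True).to(device)
            gen_outputs = models.reasoning_model.generate(**inputs, max_new_tokens=128, do_sample=False)
            # For decoder-only models, `generate` returns prompt + completion. With left
            # padding, slicing off the prompt length leaves only the newly generated tokens,
            # so the parser never sees the prompt text.
            new_tokens = gen_outputs[:, inputs["input_ids"].shape[1]:]
            decoded_texts = models.tokenizer.batch_decode(new_tokens, skip_special_tokens=True)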

            # Parse outputs and derive conditioning attributes `c`.
            c_batch_list = []
            narratives = []
            is_valid_mask = []
            for text in decoded_texts:
                parsed = parser.parse(text)
                if parsed.is_valid:
                    pred = parsed.prediction
                    c_batch_list.append([np.min(pred), np.mean(pred), np.max(pred)])
                    narratives.append(parsed.reasoning)
                    is_valid_mask.append(True)
                    num_successful_generations += 1
                else:
                    # Handle failed generation: use null attributes and store error message.
                    c_batch_list.append([0.0, 0.0, 0.0])
                    narratives.append(f"ERROR: {parsed.error_message}")
                    is_valid_mask.append(False)

            c_batch = torch.tensor(c_batch_list, dtype=torch.float32).to(device)

            # --- Part B: Perform Guided Inference with Fusion Model (ψ) ---
            # 1. Get the conditional forecast.
            y_hat_cond = models.fusion_model(x_batch, c_batch, force_unconditional=False)

            # 2. Get the unconditional forecast.
            y_hat_uncond = models.fusion_model(x_batch, c_batch, force_unconditional=True)

            # 3. Blend them using the guidance scale `s`.
            # Equation 9: ŷ = ŷ_uncond + s * (ŷ_cond - ŷ_uncond)
            final_forecasts = y_hat_uncond + guidance_scale * (y_hat_cond - y_hat_uncond)

            # Fallback for failed generations: use the unconditional forecast.
            is_valid_mask_tensor = torch.tensor(is_valid_mask, device=device).unsqueeze(1)
            final_forecasts = torch.where(is_valid_mask_tensor, final_forecasts, y_hat_uncond)

            # --- Store Results ---
            # Move results to CPU and convert to numpy/python types for storage.
            final_forecasts_cpu = final_forecasts.cpu().numpy()

            for i in range(len(batch_prompts)):
                original_index = test_indices[batch_prompt_indices[i].item()]
                meta_row = data_splits['test']['meta'].loc[original_index]
                results.append({
                    'ticker': meta_row['ticker'],
                    'window_end_date': meta_row['window_end_date'],
                    'final_forecast': final_forecasts_cpu[i],
                    'narrative': narratives[i]
                })

    # --- Finalization and Reporting ---
    # Step 3: Validate output format and log anomalies.
    total_samples = len(test_prompts)
    success_rate = (num_successful_generations / total_samples) * 100 if total_samples > 0 else 0
    print(f"\n--- Inference Complete ---")
    print(f"Reasoning Generation Success Rate: {num_successful_generations}/{total_samples} ({success_rate:.2f}%)")

    # Convert the list of result dictionaries into a final DataFrame.
    results_df = pd.DataFrame(results)

    return results_df
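
The guided blending step (Equation 9) and the fallback for failed generations can be illustrated in isolation. What follows is a minimal NumPy sketch, not the pipeline's actual code: `blend_guided_forecast` is a hypothetical helper, and the toy arrays merely stand in for the conditional and unconditional outputs of the fusion model.

```python
import numpy as np


def blend_guided_forecast(y_uncond, y_cond, guidance_scale, is_valid):
    """Hypothetical helper: y = y_uncond + s * (y_cond - y_uncond), with fallback.

    Rows where `is_valid` is False keep the unconditional forecast.
    """
    y_uncond = np.asarray(y_uncond, dtype=float)
    y_cond = np.asarray(y_cond, dtype=float)
    blended = y_uncond + guidance_scale * (y_cond - y_uncond)
    # Broadcast the per-sample mask (B,) across the horizon dimension (B, T').
    mask = np.asarray(is_valid, dtype=bool)[:, None]
    return np.where(mask, blended, y_uncond)


# Toy batch: 2 samples, horizon of 3 steps (illustrative values only).
y_uncond = np.array([[1.0, 1.0, 1.0], [2.0, 2.0, 2.0]])
y_cond = np.array([[2.0, 3.0, 4.0], [5.0, 5.0, 5.0]])

# The second sample is marked invalid, so it falls back to y_uncond.
out = blend_guided_forecast(y_uncond, y_cond, guidance_scale=2.0, is_valid=[True, False])
print(out)
```

In this formulation a scale of 0 returns the unconditional forecast, a scale of 1 returns the conditional forecast, and values above 1 extrapolate past the conditional forecast, away from the unconditional one.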


# ==============================================================================
# Task 12: Implement Two Baselines (DLinear and TSMixer)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 12, Generic Baseline Training Framework
# ------------------------------------------------------------------------------

def _train_and_evaluate_baseline(
    model: nn.Module,
    model_name: str,
    data_splits: Dict[str, Dict[str, np.ndarray]],
    config: Dict[str, Any],
    device: torch.device
) -> Dict[str, float]:
    """
    A generic, rigorous training and evaluation framework for baseline models.

    This function provides a standardized harness to ensure that all baseline
    models are trained and evaluated under identical, reproducible conditions.

    Process:
    1.  Initializes DataLoaders for training, validation, and testing.
    2.  Initializes the AdamW optimizer.
    3.  Runs a training loop for a specified number of epochs.
    4.  After each epoch, it evaluates the model on the validation set.
    5.  If the validation loss improves, it saves a checkpoint of the model's state.
    6.  After training, it loads the best checkpoint and performs a final,
        definitive evaluation on the held-out test set.

    Args:
        model: An instantiated PyTorch nn.Module for the baseline.
        model_name: The name of the model (e.g., 'DLinear') for logging and saving.
        data_splits: The dictionary of train/val/test data splits.
        config: A dictionary of training hyperparameters (epochs, lr, batch_size, etc.).
        device: The torch device on which to run training and evaluation.

    Returns:
        A dictionary containing the final test set performance metrics: {'MSE': ..., 'MAE': ...}.
    """
    print(f"\n--- Training and Evaluating Baseline: {model_name} ---")

    # --- 1. Setup DataLoaders ---
    # Create TensorDatasets from the numpy arrays.
    train_dataset = TensorDataset(torch.from_numpy(data_splits['train']['X']).float(), torch.from_numpy(data_splits['train']['y']).float())
    val_dataset = TensorDataset(torch.from_numpy(data_splits['val']['X']).float(), torch.from_numpy(data_splits['val']['y']).float())
    test_dataset = TensorDataset(torch.from_numpy(data_splits['test']['X']).float(), torch.from_numpy(data_splits['test']['y']).float())

    # Create DataLoader instances for batching and shuffling.
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'])
    test_loader = DataLoader(test_dataset, batch_size=config['batch_size'])

    # --- 2. Model and Optimizer Setup ---
    # Move the model to the specified device.
    model.to(device)
    # Initialize the AdamW optimizer with the specified learning rate.
    optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])

    # --- 3. Training and Validation Loop ---
    # Initialize tracking variables for the best model checkpoint.
    best_val_loss = float('inf')
    output_dir = f"./models/baselines/{model_name}"
    os.makedirs(output_dir, exist_ok=True)
    best_model_path = f"{output_dir}/best_checkpoint.pth"

    # Loop for the specified number of epochs.
    for epoch in range(config['epochs']):
        # --- Training Phase ---
        model.train()
        train_pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{config['epochs']} [Train]")
        for x_batch, y_batch in train_pbar:
            # Move batch to device.
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            # Reset gradients.
            optimizer.zero_grad()
            # Forward pass.
            y_hat = model(x_batch)
            # Compute loss.
            loss = F.mse_loss(y_hat, y_batch)
            # Backward pass.
            loss.backward()
            # Update weights.
            optimizer.step()
            train_pbar.set_postfix(loss=loss.item())

        # --- Validation Phase ---
        model.eval()
        val_loss = 0.0
        # Disable gradient computation for validation.
        with torch.no_grad():
            for x_batch, y_batch in val_loader:
                x_batch, y_batch = x_batch.to(device), y_batch.to(device)
                y_hat = model(x_batch)
                loss = F.mse_loss(y_hat, y_batch)
                # Weight by batch size so a smaller final batch does not skew the mean.
                val_loss += loss.item() * x_batch.size(0)

        # Calculate the per-sample average validation loss for the epoch.
        avg_val_loss = val_loss / len(val_dataset)
        print(f"Epoch {epoch+1}/{config['epochs']} | Avg Val Loss: {avg_val_loss:.6f}")

        # Save the model if it has the best validation loss so far.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), best_model_path)
            print(f"Validation loss improved. New best model saved to {best_model_path}")

    # --- 4. Final Evaluation on Test Set ---
    print(f"\nEvaluating best {model_name} model on the test set...")
    # Load the state of the best performing model.
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    model.eval()

    test_mse = 0.0
    test_mae = 0.0
    # Disable gradient computation for final evaluation.
    with torch.no_grad():
        for x_batch, y_batch in tqdm(test_loader, desc="Testing"):
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            y_hat = model(x_batch)
            # Accumulate MSE and MAE, weighted by batch size so that a smaller
            # final batch does not bias the averages.
            test_mse += F.mse_loss(y_hat, y_batch).item() * x_batch.size(0)
            test_mae += F.l1_loss(y_hat, y_batch).item() * x_batch.size(0)

    # Calculate per-sample average metrics over the test set.
    avg_test_mse = test_mse / len(test_dataset)
    avg_test_mae = test_mae / len(test_dataset)

    print(f"Final Test Results for {model_name} - MSE: {avg_test_mse:.6f}, MAE: {avg_test_mae:.6f}")

    # Return the final performance metrics.
    return {'MSE': avg_test_mse, 'MAE': avg_test_mae}
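
When a loss is accumulated batch by batch, the mean of the per-batch means equals the per-sample mean only if every batch has the same size. The short sketch below uses made-up per-sample losses (not results from this project) to show how the two averages diverge when the last batch is smaller.

```python
# Illustrative per-sample squared errors for 5 samples (made-up numbers).
per_sample_losses = [1.0, 1.0, 1.0, 1.0, 6.0]
batch_size = 4

# Split the way a DataLoader without `drop_last` would: one batch of 4, one of 1.
batches = [
    per_sample_losses[i:i + batch_size]
    for i in range(0, len(per_sample_losses), batch_size)
]
batch_means = [sum(b) / len(b) for b in batches]

# Unweighted: every batch counts equally, regardless of its size.
mean_of_batch_means = sum(batch_means) / len(batch_means)

# Weighted: each batch mean is scaled by its size, then divided by the sample count.
weighted_mean = sum(m * len(b) for m, b in zip(batch_means, batches)) / len(per_sample_losses)

print(mean_of_batch_means)  # 3.5
print(weighted_mean)        # 2.0
```

The weighted value matches a mean taken over individual windows, which is the aggregation used when per-window errors are averaged directly.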

# ------------------------------------------------------------------------------
# Task 12, Step 1: Implement the DLinear baseline
# ------------------------------------------------------------------------------

class DLinear(nn.Module):
    """
    A configurable PyTorch implementation of the DLinear model.

    DLinear is a simple yet surprisingly effective model for time-series
    forecasting. Its core principle is to decompose the input time series into
    two components: a long-term trend and a shorter-term seasonal (or residual)
    pattern. Each component is then modeled by a separate linear layer, and their
    outputs are summed to produce the final forecast.

    Architecture:
    1.  **Decomposition**: A moving average filter (`nn.AvgPool1d`) is applied to
        the input sequence to extract the trend component. The seasonal component
        is simply the residual (input - trend).
    2.  **Forecasting**: The flattened trend and seasonal sequences are each passed
        through their own dedicated `nn.Linear` layer, which directly maps the
        input sequence to the output forecast horizon.
    3.  **Combination**: The final forecast is the sum of the outputs from the
        trend and seasonal linear layers.
    """
    def __init__(self, input_len: int, output_len: int, num_features: int, kernel_size: int = 25):
        """
        Initializes the DLinear model.

        Args:
            input_len: The length of the input time-series sequence (T).
            output_len: The length of the output forecast horizon (T').
            num_features: The number of features in the input time series (e.g., 6).
            kernel_size: The window size for the moving average filter used for
                         trend decomposition.
        """
        # Initialize the parent torch.nn.Module.
        super().__init__()

        # Store sequence lengths for internal calculations.
        self.input_len = input_len
        self.output_len = output_len

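        # A 1D average pooling layer serves as an efficient moving average filter.
        # Padding of `(kernel_size - 1) // 2` preserves the sequence length only
        # for odd kernel sizes, so even values are rejected up front.
        if kernel_size < 1 or kernel_size % 2 == 0:
            raise ValueError(f"kernel_size must be a positive odd integer, got {kernel_size}")
        # Note: `nn.AvgPool1d` zero-pads and, by default (`count_include_pad=True`),
        # counts the padded zeros in each average, which pulls the trend toward
        # zero near the edges of the sequence.
        self.decomposer = nn.AvgPool1d(
            kernel_size=kernel_size,
            stride=1,
            padding=(kernel_size - 1) // 2
        )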

        # A linear layer dedicated to learning patterns from the seasonal component.
        # It maps the flattened input sequence to the flattened output sequence.
        self.linear_seasonal = nn.Linear(input_len * num_features, output_len)

        # A separate linear layer dedicated to learning patterns from the trend component.
        self.linear_trend = nn.Linear(input_len * num_features, output_len)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Performs a forward pass through the DLinear model.

        Args:
            x: The input tensor of shape (batch_size, input_len, num_features).

        Returns:
            The output forecast tensor of shape (batch_size, output_len).
        """
        # --- Input Validation ---
        if x.ndim != 3 or x.shape[1] != self.input_len:
            raise ValueError(f"Input tensor has incorrect shape. Expected (B, {self.input_len}, F), got {x.shape}")

        # --- Decomposition ---
        # Permute the tensor for nn.AvgPool1d, which expects (batch, features, length).
        # Current shape: (batch_size, input_len, num_features)
        # Target shape: (batch_size, num_features, input_len)
        permuted_x = x.permute(0, 2, 1)

        # Apply the moving average filter to the permuted tensor to extract the trend.
        trend_init = self.decomposer(permuted_x)

        # Permute the result back to the original dimension order.
        # Shape: (batch_size, num_features, input_len) -> (batch_size, input_len, num_features)
        trend_init = trend_init.permute(0, 2, 1)

        # The seasonal component is the residual left after subtracting the trend.
        seasonal_init = x - trend_init

        # Flatten the 2D time-series components into 1D vectors for the linear layers.
        # Shape: (batch_size, input_len, num_features) -> (batch_size, input_len * num_features)
        seasonal_flat = seasonal_init.reshape(seasonal_init.shape[0], -1)
        trend_flat = trend_init.reshape(trend_init.shape[0], -1)

        # --- Forecasting ---
        # Pass the flattened seasonal component through its dedicated linear layer.
        seasonal_output = self.linear_seasonal(seasonal_flat)

        # Pass the flattened trend component through its dedicated linear layer.
        trend_output = self.linear_trend(trend_flat)

        # The final forecast is the sum of the two component forecasts.
        return seasonal_output + trend_output
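
The trend/seasonal decomposition can be checked on a single series without PyTorch. The following is a NumPy sketch under stated assumptions: `moving_average_decompose` is a hypothetical helper that imitates stride-1 average pooling with zero padding of `(kernel_size - 1) // 2` on each side, with the padded zeros counted in every window.

```python
import numpy as np


def moving_average_decompose(x, kernel_size):
    """Split a 1D series into (trend, seasonal) with a zero-padded moving average."""
    if kernel_size % 2 == 0:
        raise ValueError("kernel_size must be odd to preserve the sequence length")
    x = np.asarray(x, dtype=float)
    pad = (kernel_size - 1) // 2
    padded = np.pad(x, pad)  # constant (zero) padding on both sides
    kernel = np.full(kernel_size, 1.0 / kernel_size)
    # 'valid' convolution over the padded series returns len(x) values.
    trend = np.convolve(padded, kernel, mode="valid")
    seasonal = x - trend
    return trend, seasonal


# A constant series makes the edge effect of zero padding easy to see.
x = np.ones(10)
trend, seasonal = moving_average_decompose(x, kernel_size=5)
print(trend.shape)
print(trend[:3])
```

For this constant series the interior trend equals the series itself, while the first values are damped toward zero because the window overlaps the padding. Padding with repeated end values instead of zeros is one way to avoid that damping.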

# ------------------------------------------------------------------------------
# Task 12, Step 2: Implement the TSMixer baseline
# ------------------------------------------------------------------------------

class MixerBlock(nn.Module):
    """
    A single, complete block of the TSMixer architecture.

    Each MixerBlock performs two distinct mixing operations with residual
    connections and layer normalization, following the standard "pre-norm"
    transformer block structure: `x + SubLayer(LayerNorm(x))`.

    1.  **Time-Mixing**: An MLP is applied across the time dimension for each
        feature independently, allowing information to flow between time steps.
    2.  **Feature-Mixing**: An MLP is applied across the feature dimension for
        each time step independently, allowing information to flow between features.
    """
    def __init__(self, input_len: int, d_model: int, dropout: float, ff_dim: int):
        """
        Initializes the MixerBlock.

        Args:
            input_len: The length of the input time-series sequence (T).
            d_model: The hidden dimension of the features.
            dropout: The dropout probability.
            ff_dim: The hidden dimension of the feed-forward networks within the MLPs.
        """
        super().__init__()

        # Layer normalization applied before the time-mixing MLP.
        self.norm1 = nn.LayerNorm(d_model)

        # Layer normalization applied before the feature-mixing MLP.
        self.norm2 = nn.LayerNorm(d_model)

        # The Time-Mixing MLP, which operates on the time dimension.
        self.time_mlp = nn.Sequential(
            nn.Linear(input_len, ff_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, input_len),
            nn.Dropout(dropout)
        )

        # The Feature-Mixing MLP, which operates on the feature dimension.
        self.feature_mlp = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, d_model),
            nn.Dropout(dropout)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Performs a forward pass through the MixerBlock.

        Args:
            x: The input tensor of shape (batch_size, input_len, d_model).

        Returns:
            The output tensor of the same shape.
        """
        # --- Time-Mixing with Residual Connection ---
        # Store the input for the first residual connection.
        residual = x

        # Apply pre-normalization.
        x = self.norm1(x)

        # Permute for time-mixing: (B, T, F) -> (B, F, T).
        x = x.permute(0, 2, 1)

        # Apply the time-domain MLP.
        x = self.time_mlp(x)

        # Permute back to the original dimension order: (B, F, T) -> (B, T, F).
        x = x.permute(0, 2, 1)

        # Add the residual connection.
        x = x + residual

        # --- Feature-Mixing with Residual Connection ---
        # Store the input for the second residual connection.
        residual = x

        # Apply pre-normalization.
        x = self.norm2(x)

        # Apply the feature-domain MLP.
        x = self.feature_mlp(x)

        # Add the residual connection.
        x = x + residual

        return x
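
The difference between the two mixing operations can be seen with plain matrices. The sketch below is a simplification, assuming a single linear map in place of each MLP and omitting normalization, dropout, and residuals; the names `time_mix`, `feature_mix`, `W_time`, and `W_feat` are illustrative and not part of the code above.

```python
import numpy as np

rng = np.random.default_rng(0)
n_steps, n_feat = 4, 3  # toy sizes for time steps and features
x = rng.normal(size=(n_steps, n_feat))

W_time = rng.normal(size=(n_steps, n_steps))  # stands in for the time-mixing MLP
W_feat = rng.normal(size=(n_feat, n_feat))    # stands in for the feature-mixing MLP


def time_mix(x):
    # Transpose to (features, time), map along the time axis, transpose back.
    return (x.T @ W_time.T).T


def feature_mix(x):
    # Map along the last (feature) axis directly.
    return x @ W_feat.T


# Perturb one feature column: time-mixing should change only that column.
x_col = x.copy()
x_col[:, 0] += 1.0
changed_cols = np.any(~np.isclose(time_mix(x_col), time_mix(x)), axis=0)

# Perturb one time step: feature-mixing should change only that row.
x_row = x.copy()
x_row[2, :] += 1.0
changed_rows = np.any(~np.isclose(feature_mix(x_row), feature_mix(x)), axis=1)

print(changed_cols)
print(changed_rows)
```

Only the perturbed column is flagged after time-mixing and only the perturbed row after feature-mixing, which is why a block needs both operations for information to spread across the whole (time, feature) grid.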


class TSMixer(nn.Module):
    """
    A configurable PyTorch implementation of the TSMixer model.

    TSMixer is an "All-MLP" architecture for time-series forecasting. It avoids
    complex attention mechanisms and instead relies on a stack of `MixerBlock`s
    that repeatedly mix information across the time and feature dimensions. This
    design makes it computationally efficient and effective for many time-series tasks.

    Architecture:
    1.  **Input Projection**: A linear layer projects the raw input features into
        the model's hidden dimension (`d_model`).
    2.  **Mixer Blocks**: The core of the model is a sequence of `MixerBlock`s.
        Each block performs both time-mixing and feature-mixing with residual
        connections and layer normalization.
    3.  **Final Head**: After the final mixer block, the output sequence is
        flattened and passed through a final linear layer to produce the forecast
        of the desired horizon length.
    """
    def __init__(
        self,
        input_len: int,
        output_len: int,
        num_features: int,
        num_blocks: int,
        d_model: int,
        ff_dim: int,
        dropout: float
    ):
        """
        Initializes the TSMixer model.

        Args:
            input_len: The length of the input time-series sequence (T).
            output_len: The length of the output forecast horizon (T').
            num_features: The number of features in the raw input time series.
            num_blocks: The number of `MixerBlock`s to stack.
            d_model: The hidden dimension used throughout the mixer blocks.
            ff_dim: The hidden dimension of the feed-forward networks within the MLPs.
            dropout: The dropout probability used in the mixer blocks.
        """
        # Initialize the parent torch.nn.Module.
        super().__init__()

        # --- Input Validation ---
        if not all(isinstance(arg, int) and arg > 0 for arg in [input_len, output_len, num_features, num_blocks, d_model, ff_dim]):
            raise ValueError("All integer arguments must be positive.")
        if not 0.0 <= dropout < 1.0:
            raise ValueError("Dropout must be a float in the range [0.0, 1.0).")

        # An initial linear layer to project the input features to the model's
        # hidden dimension (`d_model`). This allows the model to work with a
        # different internal dimension than the number of input features.
        self.input_projection = nn.Linear(num_features, d_model)

        # A sequential container stacking the specified number of MixerBlocks.
        # The `*` operator unpacks the list of MixerBlock instances.
        self.mixer_blocks = nn.Sequential(
            *[MixerBlock(input_len, d_model, dropout, ff_dim) for _ in range(num_blocks)]
        )

        # A final linear layer to project the flattened output of the mixer blocks
        # to the desired forecast horizon (`output_len`).
        self.final_head = nn.Linear(input_len * d_model, output_len)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Performs a forward pass through the TSMixer model.

        Args:
            x: The input tensor of shape (batch_size, input_len, num_features).

        Returns:
            The output forecast tensor of shape (batch_size, output_len).
        """
        # --- Input Validation ---
        # Ensure the input tensor has the correct number of dimensions.
        if x.ndim != 3:
            raise ValueError(f"Input tensor must be 3-dimensional (B, T, F), but got {x.ndim} dimensions.")
        # Ensure the time and feature dimensions match the model's configuration.
        expected_len = self.mixer_blocks[0].time_mlp[0].in_features
        expected_features = self.input_projection.in_features
        if x.shape[1] != expected_len or x.shape[2] != expected_features:
            raise ValueError(
                f"Input tensor has incorrect shape. Expected (B, {expected_len}, {expected_features}), got {tuple(x.shape)}"
            )

        # 1. Project input features to the model's hidden dimension.
        # Shape: (batch_size, input_len, num_features) -> (batch_size, input_len, d_model)
        x = self.input_projection(x)

        # 2. Pass the sequence through the stack of mixer blocks.
        # The shape remains (batch_size, input_len, d_model).
        x = self.mixer_blocks(x)

        # 3. Flatten the output and pass through the final head to get the forecast.
        # Reshape from (batch_size, input_len, d_model) to (batch_size, input_len * d_model).
        x_flat = x.reshape(x.shape[0], -1)

        # Project to the final output dimension.
        # Shape: (batch_size, input_len * d_model) -> (batch_size, output_len)
        return self.final_head(x_flat)

# ------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# ------------------------------------------------------------------------------

def train_and_evaluate_baselines(
    data_splits: Dict[str, Dict[str, np.ndarray]],
    config: Dict[str, Any]
) -> Dict[str, Dict[str, float]]:
    """
    Orchestrates the training and evaluation of all specified baseline models.

    Args:
        data_splits: The dictionary of train/val/test data splits.
        config: The master configuration dictionary (for extracting parameters).

    Returns:
        A nested dictionary containing the final performance metrics for each baseline.
    """
    # Determine the execution device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Extract common data shape parameters.
    input_len = data_splits['train']['X'].shape[1]
    output_len = data_splits['train']['y'].shape[1]
    num_features = data_splits['train']['X'].shape[2]

    # Define baseline-specific configurations. In a real system, this would
    # be part of the master config file.
    baseline_configs = {
        'DLinear': {
            'model_params': {'kernel_size': 25},
            'train_params': {'epochs': 50, 'learning_rate': 0.001, 'batch_size': 32}
        },
        'TSMixer': {
            'model_params': {'num_blocks': 2, 'd_model': 32, 'ff_dim': 64, 'dropout': 0.1},
            'train_params': {'epochs': 50, 'learning_rate': 0.001, 'batch_size': 32}
        }
    }

    # Dictionary to store the final results.
    results = {}

    # --- DLinear ---
    # Instantiate the DLinear model with its specific parameters.
    dlinear_model = DLinear(
        input_len, output_len, num_features,
        **baseline_configs['DLinear']['model_params']
    )
    # Run the generic training and evaluation harness.
    results['DLinear'] = _train_and_evaluate_baseline(
        dlinear_model, 'DLinear', data_splits,
        baseline_configs['DLinear']['train_params'], device
    )

    # --- TSMixer ---
    # Instantiate the TSMixer model with its specific parameters.
    tsmixer_model = TSMixer(
        input_len, output_len, num_features,
        **baseline_configs['TSMixer']['model_params']
    )
    # Run the generic training and evaluation harness.
    results['TSMixer'] = _train_and_evaluate_baseline(
        tsmixer_model, 'TSMixer', data_splits,
        baseline_configs['TSMixer']['train_params'], device
    )

    return results


# ==============================================================================
# Task 13: Evaluate Forecasting Performance and Portfolio Utility
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 13, Step 1: Compute MSE and MAE with consistent aggregation
# ------------------------------------------------------------------------------

def _calculate_error_metrics(
    model_name: str,
    predictions: np.ndarray,
    ground_truth: np.ndarray,
    metadata: pd.DataFrame,
    ticker_to_dataset_map: Dict[str, str]
) -> pd.DataFrame:
    """
    Calculates and aggregates MSE and MAE for a given set of predictions.

    This function provides a single, canonical method for calculating error
    metrics, ensuring all models are evaluated under identical conditions.

    Args:
        model_name: The name of the model being evaluated.
        predictions: A 2D numpy array of shape (num_samples, T') of model forecasts.
        ground_truth: A 2D numpy array of shape (num_samples, T') of true values.
        metadata: The metadata DataFrame corresponding to the samples.
        ticker_to_dataset_map: A map from ticker to dataset name (e.g., 'StockNet').

    Returns:
        A pandas DataFrame summarizing the metrics for the model.
    """
    # --- Per-Window Error Calculation ---
    # Equation: MSE_i = (1/T') * Σ(ŷ_i,k - y_i,k)^2
    per_window_mse = np.mean((predictions - ground_truth)**2, axis=1)
    # Equation: MAE_i = (1/T') * Σ|ŷ_i,k - y_i,k|
    per_window_mae = np.mean(np.abs(predictions - ground_truth), axis=1)

    # --- Aggregation ---
    results_df = metadata.copy()
    results_df['mse'] = per_window_mse
    results_df['mae'] = per_window_mae
    results_df['dataset'] = results_df['ticker'].map(ticker_to_dataset_map)

    # Group by dataset and calculate the mean of the per-window errors.
    dataset_metrics = results_df.groupby('dataset')[['mse', 'mae']].mean()

    # --- Formatting Output ---
    final_metrics = {}
    for dataset, row in dataset_metrics.iterrows():
        final_metrics[f"{dataset}_MSE"] = row['mse']
        final_metrics[f"{dataset}_MAE"] = row['mae']

    # Calculate "All Data" metrics by averaging across all windows.
    final_metrics['All_Data_MSE'] = results_df['mse'].mean()
    final_metrics['All_Data_MAE'] = results_df['mae'].mean()

    # Return as a DataFrame with the model name as the index.
    return pd.DataFrame(final_metrics, index=[model_name])
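

# Illustrative sketch (not part of the pipeline): the per-window averaging used
# above, worked on toy numbers in plain Python so the arithmetic can be checked
# by hand. The helper name `_demo_per_window_errors` and the toy values are
# assumptions made for this example only.
def _demo_per_window_errors():
    # Two forecast windows, each with T' = 2 steps.
    preds = [[1.0, 2.0], [3.0, 5.0]]
    truth = [[1.0, 4.0], [2.0, 5.0]]

    # Average the squared / absolute errors within each window first.
    per_window_mse = [
        sum((p - y) ** 2 for p, y in zip(pw, tw)) / len(pw)
        for pw, tw in zip(preds, truth)
    ]
    per_window_mae = [
        sum(abs(p - y) for p, y in zip(pw, tw)) / len(pw)
        for pw, tw in zip(preds, truth)
    ]

    # Then average across windows, so every window carries equal weight.
    overall_mse = sum(per_window_mse) / len(per_window_mse)
    overall_mae = sum(per_window_mae) / len(per_window_mae)
    return per_window_mse, per_window_mae, overall_mse, overall_mae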

# ------------------------------------------------------------------------------
# Task 13, Step 2: Construct daily Markowitz portfolios
# ------------------------------------------------------------------------------

def _solve_markowitz_optimization(
    exp_returns: np.ndarray,
    cov_matrix: np.ndarray,
    risk_aversion: float
) -> np.ndarray:
    """
    Solves the classic Markowitz mean-variance optimization problem.

    This function finds the optimal portfolio weights `w` that maximize the
    risk-adjusted expected return, subject to standard constraints (long-only,
    fully invested). It uses `cvxpy` for robust and efficient convex optimization.

    Equation:
        maximize(w):  μ'w - (γ/2) * w'Σw
        subject to:   w >= 0, sum(w) = 1

    Args:
        exp_returns: A 1D numpy array of expected returns for each asset (μ).
        cov_matrix: A 2D numpy array representing the covariance matrix of
                    asset returns (Σ).
        risk_aversion: The risk aversion parameter (γ).

    Returns:
        A 1D numpy array of optimal asset weights `w`. If the solver fails,
        it returns an equal-weight portfolio as a robust fallback.
    """
    # --- Input Validation ---
    num_assets = len(exp_returns)
    if cov_matrix.shape != (num_assets, num_assets):
        raise ValueError("Shape mismatch between expected returns and covariance matrix.")

    # --- Optimization Problem Definition ---
    # Define the portfolio weights as a cvxpy Variable.
    w = cp.Variable(num_assets)

    # Define the objective function to be maximized.
    objective = cp.Maximize(exp_returns.T @ w - (risk_aversion / 2) * cp.quad_form(w, cov_matrix))

    # Define the constraints: weights must sum to 1 and be non-negative.
    constraints = [cp.sum(w) == 1, w >= 0]

    # Formulate the optimization problem.
    problem = cp.Problem(objective, constraints)

    # --- Solve and Handle Failures ---
    # Equal-weight portfolio used whenever the optimizer cannot deliver a solution.
    equal_weights = np.full(num_assets, 1.0 / num_assets)

    try:
        # Solve the quadratic program with OSQP.
        problem.solve(solver=cp.OSQP, verbose=False)
    except Exception:
        # Covers cp.error.SolverError and any other solver-side failure.
        return equal_weights

    # Accept only (near-)optimal solutions. Infeasible, unbounded, or otherwise
    # unsolved problems fall back to the equal-weight portfolio.
    if w.value is None or problem.status not in (cp.OPTIMAL, cp.OPTIMAL_INACCURATE):
        return equal_weights

    # Return the optimal weights.
    return w.value


def _calculate_portfolio_metrics(daily_returns: pd.Series) -> Dict[str, float]:
    """
    Calculates standard portfolio performance metrics from a daily return series.

    Args:
        daily_returns: A pandas Series of daily portfolio returns, indexed by date.

    Returns:
        A dictionary containing the key performance metrics: 'Return' (annualized),
        'Volatility' (annualized), 'Sharpe' ratio, and 'MaxDrawdown'.
    """
    # --- Input Validation ---
    # If the return series is empty, return zero for all metrics.
    if daily_returns.empty:
        return {'Return': 0.0, 'Volatility': 0.0, 'Sharpe': 0.0, 'MaxDrawdown': 0.0}

    # --- Metric Calculation ---
    # Assume 252 trading days in a year for annualization.
    trading_days = 252

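    # Calculate the cumulative growth factor (1 + total return) over the period.
    total_return = (1 + daily_returns).prod()

    # Annualize the compounded return. A non-positive growth factor means the
    # portfolio was wiped out, so report -100% rather than letting the power produce NaN.
    num_years = len(daily_returns) / trading_days
    annualized_return = total_return**(1 / num_years) - 1 if total_return > 0 else -1.0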

    # Calculate annualized volatility (standard deviation of daily returns).
    annualized_volatility = daily_returns.std() * np.sqrt(trading_days)

    # Calculate the Sharpe Ratio, assuming a risk-free rate of 0.
    # Handle the case of zero volatility to avoid division by zero.
    sharpe_ratio = annualized_return / annualized_volatility if annualized_volatility > 1e-6 else 0.0

    # Calculate the Maximum Drawdown.
    # 1. Calculate the cumulative return series (equity curve).
    cumulative_returns = (1 + daily_returns).cumprod()
    # 2. Calculate the running maximum (high-water mark).
    peak = cumulative_returns.expanding(min_periods=1).max()
    # 3. Calculate the drawdown from the peak.
    drawdown = (cumulative_returns - peak) / peak
    # 4. The maximum drawdown is the minimum value in the drawdown series.
    max_drawdown = drawdown.min()

    # Return all metrics in a structured dictionary.
    return {
        'Return': annualized_return,
        'Volatility': annualized_volatility,
        'Sharpe': sharpe_ratio,
        'MaxDrawdown': max_drawdown
    }
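

# Illustrative sketch (not part of the pipeline): the maximum-drawdown steps
# above (equity curve, running peak, drawdown from peak), written in plain Python
# so a small example can be traced by hand. The helper name `_demo_max_drawdown`
# is an assumption made for this example; it expects a non-empty list of returns.
import operator
from itertools import accumulate


def _demo_max_drawdown(daily_returns):
    # 1. Equity curve: running product of (1 + r).
    equity = list(accumulate((1.0 + r for r in daily_returns), operator.mul))
    # 2. High-water mark: running maximum of the equity curve.
    peaks = list(accumulate(equity, max))
    # 3. Drawdown relative to the peak; the most negative value is the max drawdown.
    return min((e - p) / p for e, p in zip(equity, peaks))

# Example: returns of +10%, -50%, +20% give an equity curve of 1.10, 0.55, 0.66.
# The peak stays at 1.10, so the drawdowns are 0, -0.5, -0.4 and the maximum
# drawdown is -0.5.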


def _run_portfolio_backtest(
    model_name: str,
    predictions_pivot: pd.DataFrame,
    cleansed_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Runs a daily rebalancing backtest using Markowitz portfolio optimization.

    This function simulates a trading strategy where a portfolio is rebalanced
    daily based on the model's forecasts for the next `T'` days.

    Args:
        model_name: The name of the model being backtested.
        predictions_pivot: A DataFrame of forecasts, indexed by date, with
                           tickers as columns.
        cleansed_df: The full cleansed market data DataFrame containing actual prices.
        config: The master configuration dictionary.

    Returns:
        A pandas DataFrame containing the final portfolio performance metrics,
        indexed by the model name.
    """
    print(f"Running portfolio backtest for {model_name}...")

    # Extract portfolio-specific configuration.
    portfolio_config = config['evaluation_and_portfolio_config']['portfolio']

    # --- Data Preparation ---
    # Get actual historical prices and calculate daily returns.
    actual_prices = cleansed_df['Adj Close'].unstack()
    actual_returns = actual_prices.pct_change().fillna(0)

    # Find the common dates between available forecasts and actual returns.
    common_dates = predictions_pivot.index.intersection(actual_returns.index)

    # List to store the calculated daily returns of the strategy.
    portfolio_returns = []

    # --- Daily Rebalancing Loop ---
    # Iterate through each day for which we have a forecast to make a decision.
    # We stop one day early because we need the next day's return to calculate performance.
    for t in tqdm(common_dates[:-1], desc=f"Backtesting {model_name}"):
        # On day `t`, we use the forecast made at the end of day `t` to decide
        # our portfolio for day `t+1`.

        # Get the forecasts for all available assets on day `t`.
        daily_forecasts = predictions_pivot.loc[t].dropna()

        # If no assets have forecasts for this day, hold cash (0% return).
        if daily_forecasts.empty:
            portfolio_returns.append(0.0)
            continue

        # Get the list of assets to include in the portfolio.
        assets = daily_forecasts.index
        price_forecasts = np.array(daily_forecasts.tolist())

        # Ensure forecasts are 2D for consistent processing.
        if price_forecasts.ndim == 1:
            price_forecasts = price_forecasts.reshape(1, -1)

        # --- Prepare Inputs for Optimizer ---
        # 1. Expected Returns (μ): Convert T'-step price forecasts to (T'-1) return forecasts
        #    and take the mean as the expected return for each asset.
        return_forecasts = (price_forecasts[:, 1:] - price_forecasts[:, :-1]) / price_forecasts[:, :-1]
        exp_returns = np.mean(return_forecasts, axis=1)

        # 2. Covariance Matrix (Σ): Use the model's forecasted return paths to estimate covariance.
        if len(assets) > 1 and return_forecasts.shape[1] > 1:
            cov_matrix = np.cov(return_forecasts)
            # Add a small identity matrix (regularization) for numerical stability.
            cov_matrix += np.eye(len(assets)) * 1e-6
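        else:
            # Single asset, or too few forecast steps to estimate a covariance:
            # use a diagonal matrix of per-asset variances so the shape always
            # matches the number of assets (a 1x1 matrix would fail validation).
            if return_forecasts.shape[1] > 0:
                variances = np.var(return_forecasts, axis=1)
            else:
                variances = np.zeros(len(assets))
            cov_matrix = np.diag(variances + 1e-6)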

        # --- Solve for Optimal Weights ---
        weights = _solve_markowitz_optimization(
            exp_returns, cov_matrix, portfolio_config['risk_aversion_gamma']
        )

        # --- Calculate Realized Return for day t+1 ---
        # Look up the next *trading* day in the return index. Adding one calendar
        # day would land on weekends and holidays and silently record a 0% return.
        t_pos = actual_returns.index.get_loc(t)
        next_day = actual_returns.index[t_pos + 1] if t_pos + 1 < len(actual_returns.index) else None
        # Assume a 0% return if there is no next trading day or an asset is missing.
        if next_day is None or not all(asset in actual_returns.columns for asset in assets):
            portfolio_returns.append(0.0)
            continue

        realized_returns = actual_returns.loc[next_day, assets].values

        # The portfolio's return is the dot product of the weights and the realized returns.
        daily_return = np.dot(weights, realized_returns)
        portfolio_returns.append(daily_return)

    # --- Calculate Final Performance Metrics ---
    # Create a pandas Series from the list of daily returns.
    portfolio_returns_series = pd.Series(portfolio_returns, index=common_dates[1:])

    # Calculate the final summary metrics from the daily return series.
    metrics = _calculate_portfolio_metrics(portfolio_returns_series)

    # Return the metrics as a DataFrame, indexed by the model name.
    return pd.DataFrame(metrics, index=[model_name])
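

# Illustrative sketch (not part of the pipeline): the backtest realizes each
# day's decision on the next trading day, which is not always the next calendar
# day. This plain-Python helper shows one way to look that day up in a sorted
# trading calendar. The name `_demo_next_trading_day` and the sample dates are
# assumptions made for this example only.
import bisect
from datetime import date


def _demo_next_trading_day(trading_days, t):
    # `trading_days` must be sorted ascending. bisect_right returns the position
    # of the first entry strictly greater than `t`.
    pos = bisect.bisect_right(trading_days, t)
    # Return None when `t` is on or after the last trading day.
    return trading_days[pos] if pos < len(trading_days) else None

# Example: with a calendar of Thu 2024-01-04, Fri 2024-01-05, Mon 2024-01-08,
# the trading day after Friday is Monday, not Saturday.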

# ------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# ------------------------------------------------------------------------------

def evaluate_performance_and_utility(
    vta_inference_results: pd.DataFrame,
    baseline_model_paths: Dict[str, str],
    data_splits: Dict[str, Dict[str, Any]],
    cleansed_df: pd.DataFrame,
    ticker_to_dataset_map: Dict[str, str],
    config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the full evaluation of forecasting and portfolio performance for all models.

    This function provides a complete, end-to-end evaluation pipeline that
    compares the main VTA model against all specified baselines on both
    statistical error metrics (MSE, MAE) and practical financial utility
    (portfolio performance).

    Args:
        vta_inference_results: The DataFrame of results from the main VTA model.
        baseline_model_paths: A dictionary mapping baseline model names to the
                              paths of their trained checkpoints.
        data_splits: The dictionary of data splits (train/val/test).
        cleansed_df: The full cleansed market data DataFrame.
        ticker_to_dataset_map: A map from ticker to dataset name.
        config: The master configuration dictionary.

    Returns:
        A dictionary containing two DataFrames:
        - 'error_metrics': A summary of MSE and MAE for all models.
        - 'portfolio_metrics': A summary of portfolio performance for all models.
    """
    print("\n--- Task 13: Evaluating Performance and Utility ---")

    # --- Setup ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    test_meta = data_splits['test']['meta']
    test_X = data_splits['test']['X']
    test_y = data_splits['test']['y']

    # Lists to store the results from each model.
    all_error_metrics = []
    all_portfolio_metrics = []

    # --- 1. Evaluate VTA Model ---
    print("Evaluating VTA model...")
    # Align VTA predictions with the test set's ground truth and metadata.
    vta_preds_df = vta_inference_results.set_index(['ticker', 'window_end_date'])
    aligned_meta = test_meta.set_index(['ticker', 'window_end_date'])
    common_index = vta_preds_df.index.intersection(aligned_meta.index)

    # Extract the aligned predictions.
    vta_preds = np.array(vta_preds_df.loc[common_index]['final_forecast'].tolist())

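    # Map each aligned (ticker, window_end_date) key to its row position in the test set.
    # `aligned_meta` keeps the row order of `test_meta`, so positions found in it are also
    # valid for `test_meta` and `test_y`. Assumes the key pairs are unique.
    gt_indices = aligned_meta.index.get_indexer(common_index)
    vta_gt = test_y[gt_indices]
    vta_meta = test_meta.iloc[gt_indices]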

    # Calculate error metrics for the VTA model.
    all_error_metrics.append(_calculate_error_metrics(
        'VTA', vta_preds, vta_gt, vta_meta, ticker_to_dataset_map
    ))

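    # Run portfolio backtest for the VTA model, restricted to the test-set windows so the
    # backtest covers the same samples as the baselines.
    vta_test_df = vta_preds_df.loc[common_index].reset_index()
    vta_pivot = vta_test_df.pivot(index='window_end_date', columns='ticker', values='final_forecast')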
    all_portfolio_metrics.append(_run_portfolio_backtest(
        'VTA', vta_pivot, cleansed_df, config
    ))

    # --- 2. Evaluate Baseline Models ---
    for model_name, model_path in baseline_model_paths.items():
        print(f"\nEvaluating baseline model: {model_name}...")

        # Instantiate the correct model architecture from the config.
        model_params = config['baseline_configs'][model_name]['model_params']
        if model_name == 'DLinear':
            model = DLinear(test_X.shape[1], test_y.shape[1], test_X.shape[2], **model_params)
        elif model_name == 'TSMixer':
            model = TSMixer(test_X.shape[1], test_y.shape[1], test_X.shape[2], **model_params)
        else:
            raise ValueError(f"Unknown baseline model name: {model_name}")

        # Load the best checkpoint from the baseline training phase.
        model.load_state_dict(torch.load(model_path, map_location=device))
        model.to(device)
        model.eval()

        # Generate predictions for the entire test set.
        with torch.no_grad():
            baseline_preds = model(torch.from_numpy(test_X).float().to(device)).cpu().numpy()

        # Calculate error metrics for the baseline model.
        all_error_metrics.append(_calculate_error_metrics(
            model_name, baseline_preds, test_y, test_meta, ticker_to_dataset_map
        ))

        # Prepare the predictions pivot table for the backtester.
        baseline_preds_df = test_meta.copy()
        baseline_preds_df['prediction'] = list(baseline_preds)
        baseline_pivot = baseline_preds_df.pivot(index='window_end_date', columns='ticker', values='prediction')

        # Run the portfolio backtest for the baseline model.
        all_portfolio_metrics.append(_run_portfolio_backtest(
            model_name, baseline_pivot, cleansed_df, config
        ))

    # --- Finalization ---
    # Combine all results into final summary DataFrames.
    error_summary_df = pd.concat(all_error_metrics)
    portfolio_summary_df = pd.concat(all_portfolio_metrics)

    print("\n--- Evaluation Complete ---")
    print("\nError Metrics Summary (MSE/MAE):")
    print(error_summary_df)
    print("\nPortfolio Performance Summary:")
    print(portfolio_summary_df)

    # Return the final, formatted summary tables.
    return {
        'error_metrics': error_summary_df,
        'portfolio_metrics': portfolio_summary_df
    }


In [None]:
# Task 14: Create an Orchestrator Function for End-to-End Pipeline

# ==============================================================================
# Task 14: Create an Orchestrator Function for End-to-End Pipeline
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 14, Step 1: Data Preparation Sub-Orchestrator
# ------------------------------------------------------------------------------

def _run_data_preparation_pipeline(
    raw_market_data_df: pd.DataFrame,
    config: Dict[str, Any],
    ticker_to_market_map: Dict[str, str],
    dataset_name: str
) -> Dict[str, Any]:
    """
    Orchestrates the complete data preparation pipeline (Tasks 1-5).

    This function executes all data validation, cleansing, windowing, annotation,
    and prompt assembly steps in sequence. After the windowing stage, each data
    split ('train', 'val', 'test') is processed independently, so annotations
    and prompts from one split never contaminate another.

    Args:
        raw_market_data_df: The raw input DataFrame of market data.
        config: The master configuration dictionary.
        ticker_to_market_map: A map from ticker to market (e.g., 'US').
        dataset_name: The name of the dataset to guide the splitting logic
                      (e.g., "StockNet").

    Returns:
        A dictionary containing all data artifacts required for the modeling stages,
        cleanly separated by data split where appropriate.
    """
    # --- Initialization ---
    # Announce the start of the pipeline for clear logging.
    print("--- Starting Data Preparation Pipeline (Tasks 1-5) ---")

    # Initialize the dictionary that will hold all generated artifacts.
    artifacts: Dict[str, Any] = {}

    # --- Task 1: Validate Inputs and Configuration ---
    # This is the first gatekeeper step to ensure all inputs are sound.
    print("\n[Task 1] Validating inputs and master configuration...")
    validate_inputs_and_config(raw_market_data_df, config)
    print("Validation successful.")

    # --- Task 2: Cleanse and Prepare Raw Market Data ---
    # This step removes invalid data and aligns the timeline to trading calendars.
    print("\n[Task 2] Cleansing and preparing raw market data...")
    cleansed_df, cleansing_reports = cleanse_and_prepare_data(raw_market_data_df, ticker_to_market_map)
    artifacts['cleansed_df'] = cleansed_df
    artifacts['cleansing_reports'] = cleansing_reports
    print("Data cleansing complete.")

    # --- Task 3: Construct Sliding Windows and Split Data ---
    # This step transforms the flat time-series into windowed samples for the models.
    print("\n[Task 3] Constructing sliding windows and splitting data...")
    window_artifacts = construct_windows_and_split_data(cleansed_df, config, dataset_name)
    # Update the main artifacts dictionary with the results (data_splits, summary_report, etc.).
    artifacts.update(window_artifacts)
    print("Windowing and splitting complete.")

    # --- Task 4 & 5: Split-wise Annotation and Prompt Assembly ---
    # Each data split is processed independently to avoid cross-contamination.
    print("\n[Tasks 4 & 5] Computing annotations and assembling prompts for each data split...")

    # Initialize nested dictionaries to store the split-specific artifacts.
    annotation_artifacts: Dict[str, Dict[str, Any]] = {'train': {}, 'val': {}, 'test': {}}
    prompt_artifacts: Dict[str, List[str]] = {'train': [], 'val': [], 'test': []}
    parser_instance = None

    # Loop through each data split ('train', 'val', 'test').
    for split_name in artifacts['data_splits'].keys():
        print(f"\nProcessing split: '{split_name}'...")

        # Extract the data for the current split.
        split_data = artifacts['data_splits'][split_name]

        # Check if the split is empty. If so, skip processing.
        if split_data['X'].shape[0] == 0:
            print(f"Split '{split_name}' is empty. Skipping.")
            continue

        # --- Task 4: Compute Technical Annotations for the current split ---
        # Call the Task 4 orchestrator on the split's `X` data.
        annotations = compute_technical_annotations(split_data['X'], config)
        annotation_artifacts[split_name] = annotations
        print(f"Computed annotations for {split_data['X'].shape[0]} samples in '{split_name}' split.")

        # --- Task 5: Assemble and Validate Prompts for the current split ---
        # Call the Task 5 orchestrator on the split's data and its newly created annotations.
        # Note that `assemble_and_validate_prompts` does not take `cleansed_df`.
        prompts, parser = assemble_and_validate_prompts(
            split_data['X'],
            split_data['meta'],
            annotations,
            config
        )
        prompt_artifacts[split_name] = prompts

        # The parser is the same for all splits, so we only need to store it once.
        if parser_instance is None:
            parser_instance = parser

        print(f"Assembled and validated {len(prompts)} prompts for '{split_name}' split.")

    # Store the final, split-wise artifacts.
    artifacts['annotations'] = annotation_artifacts
    artifacts['prompts'] = prompt_artifacts
    artifacts['parser'] = parser_instance

    # --- Finalization ---
    print("\n--- Data Preparation Pipeline Complete ---")

    # Return the comprehensive dictionary of all generated data artifacts.
    return artifacts

# ------------------------------------------------------------------------------
# Task 14, Step 2: Artifact Persistence Utilities (Save/Load)
# ------------------------------------------------------------------------------

def _save_artifact(data: Any, path: str) -> None:
    """
    Saves a Python object to a specified file path using pickle serialization.

    This utility function is a core component of a resumable pipeline. It ensures
    that intermediate and final results (artifacts) are reliably persisted to
    disk. It automatically creates the necessary directory structure if it does
    not exist.

    Args:
        data: The Python object to be saved (e.g., DataFrame, dict, model state).
        path: The full file path where the artifact will be saved.

    Raises:
        IOError: If the file cannot be written due to permissions or other
                 file system issues.
        pickle.PicklingError: If the provided object cannot be serialized by pickle.
    """
    try:
        # Get the directory part of the specified path.
        dir_path = os.path.dirname(path)

        # Create the directory structure if it doesn't already exist. A bare filename
        # has an empty directory part, which `os.makedirs` would reject, so skip it.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

        # Open the file in binary write mode ('wb') and serialize the object.
        with open(path, 'wb') as f:
            pickle.dump(data, f)

        # Log a confirmation message indicating successful save.
        logging.info(f"Artifact saved successfully to: {path}")

    except (IOError, pickle.PicklingError) as e:
        # Log a critical error if saving fails and re-raise to halt the pipeline.
        logging.error(f"Failed to save artifact to {path}. Error: {e}")
        raise


def _load_artifact(path: str) -> Any:
    """
    Loads a Python object from a specified file path using pickle deserialization.

    This utility is the counterpart to `_save_artifact` and is essential for
    reloading previously computed results, enabling the pipeline to be resumed.

    Args:
        path: The full file path of the artifact to be loaded.

    Returns:
        The deserialized Python object.

    Raises:
        FileNotFoundError: If no file exists at the specified path.
        IOError: If the file cannot be read.
        pickle.UnpicklingError: If the file content is not a valid pickle stream.
    """
    # --- Input Validation ---
    # First, check if the file exists to provide a clear error message.
    if not os.path.exists(path):
        # Raise FileNotFoundError, which is the most specific and appropriate error.
        raise FileNotFoundError(f"Artifact not found at path: {path}")

    try:
        # Open the file in binary read mode ('rb') and deserialize the object.
        with open(path, 'rb') as f:
            data = pickle.load(f)

        # Log a confirmation message indicating successful load.
        logging.info(f"Artifact loaded successfully from: {path}")

        # Return the loaded object.
        return data

    except (IOError, pickle.UnpicklingError) as e:
        # Log a critical error if loading fails and re-raise to halt the pipeline.
        logging.error(f"Failed to load artifact from {path}. The file may be corrupted. Error: {e}")
        raise
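

# Illustrative sketch (not part of the pipeline): the "load if present, otherwise
# compute and save" pattern that the two helpers above enable, reduced to the
# standard library. The name `_demo_resumable_step` and its `compute_fn` argument
# are assumptions made for this example only.
import os
import pickle


def _demo_resumable_step(artifact_path, compute_fn):
    # Resume: reuse the artifact if an earlier run already produced it.
    if os.path.exists(artifact_path):
        with open(artifact_path, 'rb') as f:
            return pickle.load(f), True

    # Otherwise run the (expensive) step and persist its output.
    result = compute_fn()
    dir_path = os.path.dirname(artifact_path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)
    with open(artifact_path, 'wb') as f:
        pickle.dump(result, f)
    return result, False

# The second element of the returned tuple reports whether the result came from
# disk, which makes it easy to confirm that a rerun skipped the computation.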

# ------------------------------------------------------------------------------
# Task 14, Step 3: Modeling Sub-Orchestrator
# ------------------------------------------------------------------------------

def _run_modeling_pipeline(
    data_artifacts: Dict[str, Any],
    config: Dict[str, Any],
    run_dir: str
) -> Dict[str, Any]:
    """
    Orchestrates the complete modeling and evaluation pipeline (Tasks 6-13).

    This function is designed to be robust and resumable. For each major,
    time-consuming task, it checks if a corresponding output artifact already
    exists in the specified `run_dir`. If so, it skips the task and loads the
    artifact. Otherwise, it runs the task, saves the output artifact, and proceeds.
    This ensures that the pipeline can be restarted from the point of failure
    without re-computing completed stages.

    Args:
        data_artifacts: The dictionary of artifacts from the data preparation pipeline.
        config: The master configuration dictionary.
        run_dir: The root directory for saving all artifacts for this specific run.

    Returns:
        A dictionary containing all artifacts and results from the modeling and
        evaluation stages.
    """
    # --- Initialization ---
    # Log the start of this major pipeline stage.
    logging.info("--- Starting Modeling & Evaluation Pipeline (Tasks 6-13) ---")

    # Initialize a dictionary to hold all generated modeling artifacts.
    modeling_artifacts: Dict[str, Any] = {}

    # --- Task 6: Train Reasoning LLM - Stage 1 (Cold-Start RL) ---
    # Define the single, atomic artifact path for this task's complete output.
    stage1_artifact_path = os.path.join(run_dir, "artifacts", "task_6_stage1_outputs.pkl")

    # Resumability Check: If the artifact exists, load it; otherwise, run the task.
    if os.path.exists(stage1_artifact_path):
        logging.info("[Task 6] Artifact found. Skipping Stage 1 RL training and loading from disk.")
        stage1_artifacts = _load_artifact(stage1_artifact_path)
    else:
        logging.info("[Task 6] Artifact not found. Running Stage 1 RL training...")
        # Execute the Task 6 orchestrator.
        stage1_artifacts = train_reasoning_llm_stage1(
            prompts=data_artifacts['prompts']['train'],
            y_targets=data_artifacts['data_splits']['train']['y'],
            metadata=data_artifacts['data_splits']['train']['meta'],
            parser=data_artifacts['parser'],
            config=config,
            # Pass the run-specific directory for saving model checkpoints.
            output_dir=os.path.join(run_dir, "models", "stage1_lora")
        )
        # Save the complete output of the task as a single atomic artifact.
        _save_artifact(stage1_artifacts, stage1_artifact_path)
    # Store the loaded or computed artifact.
    modeling_artifacts['stage1_artifacts'] = stage1_artifacts

    # --- Task 7: Train Reasoning LLM - Stage 2 (SFT) ---
    # The primary artifact of this stage is the path to the best model checkpoint.
    stage2_model_path = os.path.join(run_dir, "models", "stage2_sft", "best_checkpoint")
    if os.path.exists(stage2_model_path):
        logging.info("[Task 7] SFT model checkpoint found. Skipping Stage 2 SFT.")
    else:
        logging.info("[Task 7] SFT model not found. Running Stage 2 SFT...")
        # The SFT trainer saves its own model, so we don't need to save its return value.
        # We just need to ensure it runs and creates the expected directory.
        _ = train_reasoning_llm_stage2(
            stage1_artifacts=stage1_artifacts,
            parser=data_artifacts['parser'],
            config=config,
            output_dir=os.path.join(run_dir, "models", "stage2_sft")
        )
    # Store the path to the artifact for the next stage.
    modeling_artifacts['stage2_reasoning_model_path'] = stage2_model_path

    # --- Task 8: Train Reasoning LLM - Stage 3 (Final RL) ---
    stage3_model_path = os.path.join(run_dir, "models", "stage3_final_lora", "best_checkpoint")
    if os.path.exists(stage3_model_path):
        logging.info("[Task 8] Final reasoning model found. Skipping Stage 3 RL training.")
    else:
        logging.info("[Task 8] Final reasoning model not found. Running Stage 3 RL training...")
        _ = train_reasoning_llm_stage3(
            sft_checkpoint_path=stage2_model_path,
            prompts=data_artifacts['prompts']['train'],
            y_targets=data_artifacts['data_splits']['train']['y'],
            metadata=data_artifacts['data_splits']['train']['meta'],
            parser=data_artifacts['parser'],
            config=config,
            output_dir=os.path.join(run_dir, "models", "stage3_final_lora")
        )
    modeling_artifacts['stage3_reasoning_model_path'] = stage3_model_path

    # --- Task 9: Train Forecasting Backbone (φ) ---
    backbone_path = os.path.join(run_dir, "models", "forecasting_backbone", "best_checkpoint.pth")
    if os.path.exists(backbone_path):
        logging.info("[Task 9] Forecasting backbone model found. Skipping training.")
    else:
        logging.info("[Task 9] Forecasting backbone not found. Running training...")
        _ = train_forecasting_backbone(
            data_splits=data_artifacts['data_splits'],
            config=config,
            output_dir=os.path.join(run_dir, "models", "forecasting_backbone")
        )
    modeling_artifacts['backbone_model_path'] = backbone_path

    # --- Task 10: Train Conditional Forecaster (ψ) ---
    fusion_model_path = os.path.join(run_dir, "models", "conditional_fusion_model", "best_checkpoint.pth")
    if os.path.exists(fusion_model_path):
        logging.info("[Task 10] Conditional fusion model found. Skipping training.")
    else:
        logging.info("[Task 10] Conditional fusion model not found. Running training...")
        # Combine the train and validation prompts for the attribute derivation step.
        all_prompts = data_artifacts['prompts']['train'] + data_artifacts['prompts']['val']
        _ = train_conditional_forecaster(
            data_splits=data_artifacts['data_splits'],
            prompts=all_prompts,
            reasoning_model_path=stage3_model_path,
            backbone_path=backbone_path,
            parser=data_artifacts['parser'],
            config=config,
            output_dir=os.path.join(run_dir, "models", "conditional_fusion_model")
        )
    modeling_artifacts['conditional_model_path'] = fusion_model_path

    # --- Task 11: Perform End-to-End Inference ---
    inference_artifact_path = os.path.join(run_dir, "artifacts", "task_11_inference_results.pkl")
    if os.path.exists(inference_artifact_path):
        logging.info("[Task 11] Inference results found. Skipping inference.")
        vta_inference_results = _load_artifact(inference_artifact_path)
    else:
        logging.info("[Task 11] Inference results not found. Running end-to-end inference...")
        # Combine all prompts from all splits for the inference step.
        all_prompts = data_artifacts['prompts']['train'] + data_artifacts['prompts']['val'] + data_artifacts['prompts']['test']
        vta_inference_results = run_end_to_end_inference(
            data_splits=data_artifacts['data_splits'],
            prompts=all_prompts,
            reasoning_model_path=stage3_model_path,
            conditional_model_path=fusion_model_path,
            parser=data_artifacts['parser'],
            config=config
        )
        _save_artifact(vta_inference_results, inference_artifact_path)
    modeling_artifacts['vta_inference_results'] = vta_inference_results

    # --- Task 12: Train and Evaluate Baselines ---
    baselines_artifact_path = os.path.join(run_dir, "artifacts", "task_12_baseline_artifacts.pkl")
    if os.path.exists(baselines_artifact_path):
        logging.info("[Task 12] Baseline artifacts found. Skipping training.")
        baseline_artifacts = _load_artifact(baselines_artifact_path)
    else:
        logging.info("[Task 12] Baseline artifacts not found. Running baseline training...")
        baseline_artifacts = train_and_evaluate_baselines(
            data_splits=data_artifacts['data_splits'],
            config=config,
            run_dir=run_dir
        )
        _save_artifact(baseline_artifacts, baselines_artifact_path)
    modeling_artifacts.update(baseline_artifacts)

    # --- Task 13: Evaluate Final Performance ---
    # This is a fast, analytical step, so we run it every time without caching.
    logging.info("[Task 13] Evaluating final performance and utility of all models...")
    final_evaluation_results = evaluate_performance_and_utility(
        vta_inference_results=vta_inference_results,
        baseline_artifacts=baseline_artifacts,
        data_splits=data_artifacts['data_splits'],
        cleansed_df=data_artifacts['cleansed_df'],
        config=config
    )
    modeling_artifacts['final_evaluation_results'] = final_evaluation_results

    # --- Finalization ---
    logging.info("--- Modeling & Evaluation Pipeline Complete ---")

    # Return the comprehensive dictionary of all modeling and evaluation artifacts.
    return modeling_artifacts

# ------------------------------------------------------------------------------
# Task 14, Master Orchestrator
# ------------------------------------------------------------------------------

def _configure_for_dry_run(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Modifies a configuration dictionary for a fast, low-resource dry run.

    This helper function takes a deep copy of the main configuration and
    systematically reduces computational parameters (e.g., epochs, steps)
    across all training stages to enable a quick, end-to-end test of the
    pipeline's integrity.

    Args:
        config: The original master configuration dictionary.

    Returns:
        A new configuration dictionary modified for the dry run.
    """
    # Create a deep copy to avoid modifying the original configuration object.
    dry_run_config = copy.deepcopy(config)

    logging.warning("!!! CONFIGURING FOR DRY-RUN MODE !!!")
    logging.warning("Training steps and epochs will be reduced to minimal values.")

    # Modify RL training parameters for all stages.
    dry_run_config['reasoning_model_config']['rl_training_params']['max_steps'] = 2
    dry_run_config['reasoning_model_config']['rl_training_params']['updates_per_batch'] = 1
    dry_run_config['reasoning_model_config']['rl_training_params']['batch_size'] = 2 # Smaller batch size

    # Modify SFT training parameters.
    dry_run_config['reasoning_model_config']['sft_training_params']['epochs'] = 1
    dry_run_config['reasoning_model_config']['sft_training_params']['batch_size'] = 2

    # Modify forecasting backbone training parameters.
    dry_run_config['forecasting_model_config']['training_params']['epochs'] = 1
    dry_run_config['forecasting_model_config']['training_params']['batch_size'] = 4

    # Modify conditional forecaster training parameters.
    dry_run_config['conditional_fusion_config']['training_params']['epochs'] = 1
    dry_run_config['conditional_fusion_config']['training_params']['batch_size'] = 4

    # Modify baseline training parameters.
    if 'baseline_configs' in dry_run_config:
        for model_name in dry_run_config['baseline_configs']:
            dry_run_config['baseline_configs'][model_name]['train_params']['epochs'] = 1
            dry_run_config['baseline_configs'][model_name]['train_params']['batch_size'] = 4

    return dry_run_config


def run_vta_pipeline(
    raw_market_data_df: pd.DataFrame,
    config: Dict[str, Any],
    ticker_to_market_map: Dict[str, str],
    ticker_to_dataset_map: Dict[str, str],
    dataset_name: str,
    run_id: str = None,
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    Executes the complete, end-to-end VTA research pipeline.

    This master orchestrator manages the entire workflow from raw data to final
    evaluation. It establishes a reproducible environment by setting random seeds,
    configuring file-based logging, and creating a unique directory for all
    experimental artifacts. It also includes a `dry_run` mode for rapid,
    end-to-end testing of the entire codebase.

    Args:
        raw_market_data_df: The raw input DataFrame of market data.
        config: The master configuration dictionary.
        ticker_to_market_map: A map from ticker to market (e.g., 'US').
        ticker_to_dataset_map: A map from ticker to dataset name (e.g., 'StockNet').
        dataset_name: The name of the dataset to guide splitting logic.
        run_id: An optional identifier for the run. If None, a timestamp-based
                ID is generated.
        dry_run: If True, runs the pipeline on a small subset of data with minimal
                 training settings for quick validation of the code.

    Returns:
        A comprehensive dictionary containing all artifacts generated throughout
        the entire pipeline, organized by stage.
    """
    # --- 1. Reproducibility and Environment Setup ---
    # Generate a unique run ID using the current timestamp if one is not provided.
    if run_id is None:
        run_id = datetime.now().strftime('%Y%m%d_%H%M%S')
    if dry_run:
        run_id += "_dry_run"

    # Define the root directory for all artifacts of this run.
    run_dir = os.path.join("./results", run_id)
    os.makedirs(run_dir, exist_ok=True)

    # Configure centralized logging to both a file and the console for this run.
    log_file = os.path.join(run_dir, "pipeline.log")
    # Remove any existing handlers to ensure a clean logger setup.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )

    logging.info("--- Initializing VTA End-to-End Pipeline ---")
    logging.info(f"Run ID: {run_id}")
    logging.info(f"Artifacts will be saved in: {run_dir}")

    # Make a deep copy of the config to avoid modifying the original object.
    run_config = copy.deepcopy(config)

    # Set global random seeds for full reproducibility.
    try:
        seeds = run_config['reproducibility']['seeds']
        random.seed(seeds['python'])
        np.random.seed(seeds['numpy'])
        torch.manual_seed(seeds['framework'])
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seeds['framework'])
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
        logging.info(f"Random seeds set: {seeds}")
    except KeyError as e:
        raise ValueError(f"Configuration missing 'reproducibility.seeds' key: {e}") from e

    # --- 2. Handle Dry-Run Mode ---
    if dry_run:
        # Modify the configuration for a fast, low-resource run.
        run_config = _configure_for_dry_run(run_config)

        # Slice the input data to a small subset.
        all_tickers = raw_market_data_df.index.get_level_values('ticker').unique()
        if len(all_tickers) > 2:
            dry_run_tickers = all_tickers[:2]
            raw_market_data_df = raw_market_data_df[
                raw_market_data_df.index.get_level_values('ticker').isin(dry_run_tickers)
            ]
            logging.info(f"Data sliced to {len(dry_run_tickers)} tickers for dry run: {dry_run_tickers.tolist()}")

    # Save the exact configuration used for this run for a complete audit trail.
    _save_artifact(run_config, os.path.join(run_dir, "run_config.pkl"))

    # --- 3. Execute Sub-Pipelines ---
    try:
        # Execute the data preparation pipeline (Tasks 1-5).
        data_artifacts = _run_data_preparation_pipeline(
            raw_market_data_df=raw_market_data_df,
            config=run_config,
            ticker_to_market_map=ticker_to_market_map,
            dataset_name=dataset_name
        )

        # Execute the modeling and evaluation pipeline (Tasks 6-13).
        modeling_artifacts = _run_modeling_pipeline(
            data_artifacts=data_artifacts,
            config=run_config,
            run_dir=run_dir
        )
    except Exception:
        # Log any exception raised during pipeline execution, then re-raise it unchanged.
        logging.error("!!! PIPELINE EXECUTION FAILED !!!", exc_info=True)
        raise

    # --- 4. Finalization ---
    # Combine all artifacts into a single, comprehensive output dictionary.
    final_artifacts = {
        "run_id": run_id,
        "run_dir": run_dir,
        "data_preparation_artifacts": data_artifacts,
        "modeling_and_evaluation_artifacts": modeling_artifacts
    }

    logging.info(f"--- VTA End-to-End Pipeline Finished Successfully for run {run_id} ---")

    # Close the logger handlers.
    logging.shutdown()

    return final_artifacts
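
The skip-or-run blocks for Tasks 11 and 12 in `_run_modeling_pipeline` share one shape: load the pickled artifact if it exists, otherwise compute it and save it. A minimal sketch of that pattern as a single helper follows. The name `load_or_compute` is hypothetical and not part of this codebase, and plain `pickle` stands in for `_save_artifact` / `_load_artifact`, whose implementations are not shown in this section.

```python
import os
import pickle
import tempfile
from typing import Any, Callable


def load_or_compute(artifact_path: str, compute_fn: Callable[[], Any]) -> Any:
    """Return the cached artifact at `artifact_path`, computing it on a miss.

    Hypothetical helper: assumes artifacts are picklable and that a file's
    existence means the stage finished successfully.
    """
    if os.path.exists(artifact_path):
        with open(artifact_path, "rb") as fh:
            return pickle.load(fh)

    result = compute_fn()
    os.makedirs(os.path.dirname(artifact_path) or ".", exist_ok=True)
    # Write to a temporary file first, then rename, so an interrupted run
    # never leaves a truncated pickle that a later run would treat as valid.
    tmp_path = artifact_path + ".tmp"
    with open(tmp_path, "wb") as fh:
        pickle.dump(result, fh)
    os.replace(tmp_path, artifact_path)
    return result


# Usage: the expensive function runs once; the second call hits the cache.
calls = []


def expensive_stage() -> dict:
    calls.append(1)
    return {"mse": 0.0}  # placeholder value, not a real result


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "artifacts", "task_11_inference_results.pkl")
    first = load_or_compute(path, expensive_stage)
    second = load_or_compute(path, expensive_stage)

print(len(calls), first == second)
```

The write-then-rename step is a design choice of this sketch; whether `_save_artifact` does the same is not shown here.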


In [None]:
# Task 15: Conduct Robustness Analyses (Sensitivity Analyses)

# ==============================================================================
# Task 15: Conduct Robustness Analyses (Sensitivity Analyses)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 15, Step 1: Sensitivity to key hyperparameters
# ------------------------------------------------------------------------------

def _deep_set_value(d: Dict[str, Any], key_path: str, value: Any) -> None:
    """
    Safely sets a value in a deeply nested dictionary using a dot-separated key path.

    Args:
        d: The dictionary to modify.
        key_path: A string representing the path (e.g., 'a.b.c').
        value: The value to set at the specified path.

    Raises:
        KeyError: If an intermediate key in the path does not exist.
    """
    # Split the key path into individual keys.
    keys = key_path.split('.')
    # Traverse the dictionary to the second-to-last key.
    for key in keys[:-1]:
        d = d[key]
    # Set the value on the final key.
    d[keys[-1]] = value


# ------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# ------------------------------------------------------------------------------

def run_sensitivity_analysis(
    base_config: Dict[str, Any],
    # Pass through all required arguments for the main pipeline
    raw_market_data_df: pd.DataFrame,
    ticker_to_market_map: Dict[str, str],
    ticker_to_dataset_map: Dict[str, str],
    dataset_name: str,
    base_run_id: str
) -> pd.DataFrame:
    """
    Orchestrates a sensitivity analysis by systematically varying key hyperparameters.

    This function runs the full end-to-end VTA pipeline multiple times, each
    time modifying a single hyperparameter to observe its effect on the final
    performance metrics. Each experiment uses its own run ID, and therefore its
    own artifact directory, so the main pipeline's resumability only helps when
    the same experiment is re-run; completed stages are not shared across
    experiments.

    Args:
        base_config: The baseline master configuration dictionary.
        raw_market_data_df: The raw input DataFrame of market data.
        ticker_to_market_map: A map from ticker to market (e.g., 'US').
        ticker_to_dataset_map: A map from ticker to dataset name (e.g., 'StockNet').
        dataset_name: The name of the dataset to guide splitting logic.
        base_run_id: A base identifier for this suite of experiments.

    Returns:
        A pandas DataFrame summarizing the performance metrics (MSE, MAE, Sharpe)
        for each tested hyperparameter value.
    """
    # --- Define the Hyperparameter Space for Sensitivity Analysis ---
    # This dictionary maps a human-readable name to the config path and values to test.
    sensitivity_params = {
        'p_uncond': {
            'path': 'conditional_fusion_config.training_params.unconditional_probability',
            'values': [0.1, 0.3, 0.5]
        },
        'guidance_scale_s': {
            'path': 'conditional_fusion_config.inference_params.guidance_scale',
            'values': [0.0, 0.1, 0.3]
        },
        'reward_scale_lambda': {
            'path': 'reasoning_model_config.rl_training_params.reward_scale_lambda',
            'values': [0.5, 1.0, 2.0]
        },
        'group_size_G': {
            'path': 'reasoning_model_config.rl_training_params.group_size_G',
            'values': [4, 8, 16]
        },
        'pca_prototypes_D': {
            'path': 'forecasting_model_config.architecture_params.pca_n_prototypes_D',
            'values': [50, 100, 200]
        }
    }

    # List to store the results of each experimental run.
    all_results = []

    # --- Main Experiment Loop ---
    # Iterate through each hyperparameter to be tested.
    for param_name, details in sensitivity_params.items():
        print(f"\n{'='*80}\nSENSITIVITY ANALYSIS FOR: {param_name}\n{'='*80}")

        # Iterate through each value for the current hyperparameter.
        for value in details['values']:
            # --- 1. Create Modified Configuration ---
            # Create a deep copy of the base config to avoid side effects.
            experiment_config = copy.deepcopy(base_config)

            # Use the helper function to safely set the nested value.
            _deep_set_value(experiment_config, details['path'], value)

            # --- 2. Define Unique Run ID ---
            # Create a unique ID for this specific experiment run.
            # This ensures that artifacts are stored separately for each configuration.
            experiment_run_id = f"{base_run_id}_{param_name}_{value}"
            print(f"\n--- Running experiment with {param_name} = {value} (Run ID: {experiment_run_id}) ---")

            # --- 3. Run the Full Pipeline ---
            # Call the master orchestrator from Task 14. Because the run ID is
            # unique, cached stages are reused only if this exact experiment is re-run.
            final_artifacts = run_vta_pipeline(
                raw_market_data_df=raw_market_data_df,
                config=experiment_config,
                ticker_to_market_map=ticker_to_market_map,
                ticker_to_dataset_map=ticker_to_dataset_map,
                dataset_name=dataset_name,
                run_id=experiment_run_id,
                dry_run=False # Always run full experiments for analysis
            )

            # --- 4. Record Results ---
            # Extract the key performance metrics from the final evaluation results.
            eval_results = final_artifacts['modeling_and_evaluation_artifacts']['final_evaluation_results']
            error_metrics = eval_results['error_metrics']
            portfolio_metrics = eval_results['portfolio_metrics']

            # Store the results in a structured dictionary.
            result_record = {
                'parameter': param_name,
                'value': value,
                'MSE': error_metrics.loc['VTA', 'All_Data_MSE'],
                'MAE': error_metrics.loc['VTA', 'All_Data_MAE'],
                'Sharpe': portfolio_metrics.loc['VTA', 'Sharpe']
            }
            all_results.append(result_record)

            print(f"--- Completed experiment with {param_name} = {value}. Results: {result_record} ---")

    # --- 5. Finalization ---
    # Convert the list of result dictionaries into a final summary DataFrame.
    results_df = pd.DataFrame(all_results)

    # Set a multi-index for easy analysis and plotting.
    results_df = results_df.set_index(['parameter', 'value'])

    print("\n\n--- Sensitivity Analysis Complete ---")
    print("Summary of Results:")
    print(results_df)

    # Save the summary table under ./results, alongside the per-experiment run directories.
    results_df.to_csv(f"./results/{base_run_id}_sensitivity_analysis_summary.csv")

    return results_df
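
`_deep_set_value` raises `KeyError` for a missing intermediate key but will silently create the final key, so a typo in the last path segment would add an unused setting and leave the real one at its baseline value. The sketch below shows a stricter variant together with the copy-then-override step from the experiment loop. `set_existing_path` is a hypothetical name, and the toy config holds only the one key it needs.

```python
import copy
from typing import Any, Dict


def set_existing_path(d: Dict[str, Any], key_path: str, value: Any) -> None:
    """Set `value` at a dot-separated path, requiring every key to exist."""
    keys = key_path.split(".")
    node = d
    for key in keys[:-1]:
        node = node[key]  # KeyError if an intermediate key is missing
    if keys[-1] not in node:
        raise KeyError(f"'{keys[-1]}' not found while resolving '{key_path}'")
    node[keys[-1]] = value


# Toy baseline config (illustrative value only).
base_config = {
    "conditional_fusion_config": {
        "inference_params": {"guidance_scale": 0.1},
    },
}
path = "conditional_fusion_config.inference_params.guidance_scale"

experiment_configs = []
for value in [0.0, 0.1, 0.3]:
    cfg = copy.deepcopy(base_config)  # keep the baseline untouched
    set_existing_path(cfg, path, value)
    experiment_configs.append(cfg)

# A misspelled leaf is rejected instead of silently creating a new key.
try:
    set_existing_path(copy.deepcopy(base_config), path + "_typo", 1.0)
    typo_rejected = False
except KeyError:
    typo_rejected = True
```

Failing fast on an unknown leaf matters most in a sweep, where a silently ignored override would produce identical metrics for every value and look like a genuine insensitivity result.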


In [None]:
# Top-Level Orchestrator

# ==============================================================================
# Final Task: Create a Top-Level Orchestrator for the Entire Project
# ==============================================================================

def main(
    market_data_path: str,
    config: Dict[str, Any],
    ticker_to_market_map: Dict[str, str],
    ticker_to_dataset_map: Dict[str, str],
    dataset_name: str,
    base_run_id: str,
    run_sensitivity: bool = True,
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    The master orchestrator for the entire VTA research project.

    This function serves as the main entry point for running the complete
    experimental pipeline. It executes two major phases:
    1.  A baseline run of the full VTA pipeline (Tasks 1-14) to establish the
        primary results.
    2.  An optional, comprehensive sensitivity analysis (Task 15) to evaluate
        the model's robustness to key hyperparameter changes.

    It leverages the robust, resumable sub-orchestrators for each phase and
    manages the overall experimental setup, including data loading, configuration,
    and artifact aggregation.

    Args:
        market_data_path: The file path to the raw market data (CSV only at present).
        config: The master configuration dictionary for the entire project.
        ticker_to_market_map: A map from ticker to market (e.g., 'US').
        ticker_to_dataset_map: A map from ticker to dataset name (e.g., 'StockNet').
        dataset_name: The name of the primary dataset to guide splitting logic.
        base_run_id: A base identifier for this suite of experiments, used to
                     group related runs.
        run_sensitivity: If True, the sensitivity analysis will be run after the
                         baseline pipeline completes.
        dry_run: If True, the baseline pipeline is run in dry-run mode on a small
                 subset of data for rapid testing. The sensitivity analysis, if
                 enabled, always runs full experiments regardless of this flag.

    Returns:
        A comprehensive dictionary containing the artifacts from the baseline run
        and, if executed, the results of the sensitivity analysis.

    Raises:
        FileNotFoundError: If the `market_data_path` is invalid.
    """
    # --- 1. Initial Setup and Data Loading ---
    print(f"{'='*80}\nSTARTING VTA PROJECT EXECUTION\n{'='*80}")

    # --- Input Validation ---
    # Verify that the input data file exists before proceeding.
    if not os.path.exists(market_data_path):
        raise FileNotFoundError(f"Market data file not found at: {market_data_path}")

    # Load the raw market data from the specified path.
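    # This assumes a CSV whose first two columns form the MultiIndex: a date
    # column (parsed as datetimes) followed by a column named 'ticker', which
    # dry-run mode looks up by level name.
    # A production system might support multiple formats like Parquet.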
    print(f"Loading raw market data from: {market_data_path}")
    raw_market_data_df = pd.read_csv(market_data_path, index_col=[0, 1], parse_dates=[0])

    # --- 2. Execute the Baseline VTA Pipeline ---
    # This run establishes the main results and creates all necessary trained models.
    print("\n--- Phase 1: Executing Baseline VTA Pipeline ---")

    # Define a specific run ID for the baseline experiment.
    baseline_run_id = f"{base_run_id}_baseline"

    # Call the master orchestrator from Task 14.
    baseline_artifacts = run_vta_pipeline(
        raw_market_data_df=raw_market_data_df,
        config=config,
        ticker_to_market_map=ticker_to_market_map,
        ticker_to_dataset_map=ticker_to_dataset_map,
        dataset_name=dataset_name,
        run_id=baseline_run_id,
        dry_run=dry_run
    )

    # Initialize the final results dictionary with the baseline artifacts.
    final_project_artifacts = {
        "baseline_run_artifacts": baseline_artifacts
    }

    # --- 3. Optionally Execute the Sensitivity Analysis ---
    if run_sensitivity:
        print("\n--- Phase 2: Executing Sensitivity Analysis ---")

        # Call the sensitivity analysis orchestrator from Task 15.
        # It uses the same base data and config as the baseline run.
        sensitivity_results_df = run_sensitivity_analysis(
            base_config=config,
            raw_market_data_df=raw_market_data_df,
            ticker_to_market_map=ticker_to_market_map,
            ticker_to_dataset_map=ticker_to_dataset_map,
            dataset_name=dataset_name,
            base_run_id=base_run_id # The function will append param-specific suffixes
        )

        # Add the sensitivity results to the final artifacts.
        final_project_artifacts["sensitivity_analysis_results"] = sensitivity_results_df

    else:
        # Log that the sensitivity analysis was skipped.
        print("\n--- Phase 2: Skipping Sensitivity Analysis as per configuration. ---")
        final_project_artifacts["sensitivity_analysis_results"] = None

    # --- 4. Final Summary ---
    print(f"\n{'='*80}\nVTA PROJECT EXECUTION COMPLETE\n{'='*80}")

    # Print a summary of the key results from the baseline run.
    try:
        final_evals = baseline_artifacts['modeling_and_evaluation_artifacts']['final_evaluation_results']
        print("\nBaseline VTA Model Performance Summary:")
        print(final_evals['error_metrics'].loc[['VTA']])
        print("\nBaseline VTA Portfolio Utility Summary:")
        print(final_evals['portfolio_metrics'].loc[['VTA']])
    except KeyError:
        print("\nCould not display final summary as evaluation artifacts were not found.")

    # Return the complete set of artifacts from the entire project execution.
    return final_project_artifacts
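
With `run_sensitivity=True`, `main` launches one baseline run plus one full pipeline run per value in the `sensitivity_params` grid of `run_sensitivity_analysis`. The sketch below enumerates the resulting run IDs using the same f-string patterns as the code above, which can help when estimating compute or locating result directories. `planned_run_ids` is a hypothetical helper, and the grid is copied by hand from Task 15, so it must be kept in sync manually. It does not add the `_dry_run` suffix that `run_vta_pipeline` appends in dry-run mode.

```python
import os
from typing import Any, Dict, List

# Grid copied from `sensitivity_params` in `run_sensitivity_analysis`.
SENSITIVITY_GRID: Dict[str, List[Any]] = {
    "p_uncond": [0.1, 0.3, 0.5],
    "guidance_scale_s": [0.0, 0.1, 0.3],
    "reward_scale_lambda": [0.5, 1.0, 2.0],
    "group_size_G": [4, 8, 16],
    "pca_prototypes_D": [50, 100, 200],
}


def planned_run_ids(base_run_id: str, run_sensitivity: bool = True) -> List[str]:
    """List the run IDs a call to `main` would produce (hypothetical helper)."""
    run_ids = [f"{base_run_id}_baseline"]
    if run_sensitivity:
        for param_name, values in SENSITIVITY_GRID.items():
            for value in values:
                run_ids.append(f"{base_run_id}_{param_name}_{value}")
    return run_ids


run_ids = planned_run_ids("exp01")
run_dirs = [os.path.join("./results", run_id) for run_id in run_ids]
print(len(run_ids))  # 1 baseline + 5 parameters x 3 values = 16
print(run_ids[0], run_ids[1])
```

Each of these run IDs maps to its own directory under `./results`, so none of the 16 runs reuses another run's checkpoints.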
