# Data Download Workflow

**Systematic Macro Credit Research — Step 1 of 5**

This notebook downloads market data from the Bloomberg Terminal for every security configured in the aponyx framework. It is the first step of the systematic research workflow described in `PROJECT_STATUS.md`.

## Workflow Position

```
1. Data Download ← YOU ARE HERE
   ↓
2. Signal Computation (02_signal_computation.ipynb)
   ↓
3. Signal Suitability Evaluation (03_suitability_evaluation.ipynb)
   ↓
4. Backtest Execution (04_backtest.ipynb)
   ↓
5. Performance Analysis (05_analysis.ipynb)
```

## Prerequisites

- Active Bloomberg Terminal session
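- `xbbg` package installed (`uv pip install xbbg` or `uv sync --extra bloomberg`)
- Bloomberg's `blpapi` Python package importable (`xbbg` calls it under the hood; without it, the download cells stop with an import error for `blpapi`)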
- Bloomberg Terminal authentication configured
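The sample run below stops with `No module named 'blpapi'`, so a quick pre-flight check can surface missing dependencies before any download starts. This is a minimal standard-library sketch; `REQUIRED_PACKAGES` and `missing_packages` are hypothetical names, not part of aponyx.

```python
import importlib.util

# Packages needed for Bloomberg access; xbbg wraps Bloomberg's blpapi.
REQUIRED_PACKAGES = ("xbbg", "blpapi")


def missing_packages(names=REQUIRED_PACKAGES):
    """Return the subset of `names` that cannot be found in this environment.

    importlib.util.find_spec locates a top-level module without importing it,
    so the check is cheap and has no side effects.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_packages()
if missing:
    print(f"Missing packages: {', '.join(missing)}; install them before the download cells.")
else:
    print("All Bloomberg dependencies are importable.")
```

Running this in the first cell fails fast instead of partway through the CDX loop.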

## What This Notebook Does

1. **Registry Check** — Review existing datasets in data registry
2. **Security Configuration** — Display all securities from `bloomberg_securities.json`
3. **Data Download** — Fetch data from Bloomberg Terminal for each security
4. **Schema Validation** — Validate data conforms to project schemas
5. **Quality Checks** — Verify data completeness and continuity
6. **Registry Update** — Register downloaded datasets in data catalog

## Outputs

- **Cache files:** `data/cache/bloomberg/*.parquet`
- **Registry:** `data/registry.json` (updated with metadata)
- **Download summary:** Statistics and quality metrics

## Key Design Patterns

- **Provider Abstraction:** Uses `BloombergSource` from the data layer
- **Unified Fetch Interface:** `fetch_cdx()`, `fetch_vix()`, `fetch_etf()`
- **TTL-Based Caching:** Cached results are reused until they are older than `CACHE_TTL_DAYS` (1 day in this configuration)
- **Schema Validation:** Ensures data meets project requirements
- **Metadata Tracking:** `DataRegistry` catalogs the downloaded datasets
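A TTL staleness check boils down to comparing a cache file's age with the configured TTL. The sketch below assumes staleness is judged by file modification time; aponyx may track fetch timestamps differently (for example, in registry metadata), and `is_cache_fresh` is a hypothetical helper.

```python
import time
from pathlib import Path
from typing import Optional

SECONDS_PER_DAY = 86_400


def is_cache_fresh(path: Path, ttl_days: float, now: Optional[float] = None) -> bool:
    """Return True if `path` exists and was modified less than `ttl_days` ago.

    Assumption: age is measured from the file's mtime. A missing file counts
    as stale, so the caller falls through to a fresh Bloomberg fetch.
    """
    if not path.exists():
        return False
    now = time.time() if now is None else now
    age_seconds = now - path.stat().st_mtime
    return age_seconds < ttl_days * SECONDS_PER_DAY
```

With a 1-day TTL, a Parquet file written this morning is reused on a rerun this afternoon, while yesterday's file triggers a new download.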

---

In [1]:
import logging
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

from aponyx.config import DATA_DIR, REGISTRY_PATH, CACHE_TTL_DAYS
from aponyx.data import fetch_cdx, fetch_vix, fetch_etf
from aponyx.data.sources import BloombergSource
from aponyx.data.registry import DataRegistry
from aponyx.data.bloomberg_config import (
    list_securities,
    get_security_spec,
    validate_bloomberg_registry,
)

# Configure logging for notebook
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

print("=" * 80)
print("DATA DOWNLOAD WORKFLOW — Step 1 of 5")
print("=" * 80)
print(f"\nConfiguration:")
print(f"  Data directory: {DATA_DIR}")
print(f"  Registry path: {REGISTRY_PATH}")
print(f"  Cache TTL: {CACHE_TTL_DAYS} days")
print(f"\n✓ Imports complete")

DATA DOWNLOAD WORKFLOW — Step 1 of 5

Configuration:
  Data directory: C:\Users\ROG3003\PythonProjects\aponyx\data
  Registry path: C:\Users\ROG3003\PythonProjects\aponyx\data\registry.json
  Cache TTL: 1 days

✓ Imports complete


## 1. Check Current Registry Status

Review what data is already available in the registry.

In [2]:
# Initialize registry
registry = DataRegistry(REGISTRY_PATH, DATA_DIR)

# List all registered datasets
datasets = registry.list_datasets()

print(f"\n{'='*80}")
print(f"CURRENT DATA REGISTRY STATUS")
print(f"{'='*80}\n")
print(f"Registry path: {REGISTRY_PATH}")
print(f"Total datasets: {len(datasets)}\n")

if datasets:
    # Create summary table
    registry_data = []
    for name in datasets:
        entry = registry.get_dataset_entry(name)
        registry_data.append({
            'Dataset': name,
            'Instrument': entry.instrument,
            'Tenor': entry.tenor or 'N/A',
            'Rows': entry.row_count or 0,
            'Start': entry.start_date or 'Unknown',
            'End': entry.end_date or 'Unknown',
        })
    
    registry_df = pd.DataFrame(registry_data)
    print(registry_df.to_markdown(index=False))
    print(f"\n✓ {len(datasets)} datasets currently registered")
else:
    print("No datasets registered yet.")
    print("This is expected on first run.")

2025-11-08 17:11:08,602 - aponyx.persistence.json_io - INFO - Loading JSON from C:\Users\ROG3003\PythonProjects\aponyx\data\registry.json
2025-11-08 17:11:08,603 - aponyx.data.registry - INFO - Loaded existing registry: path=C:\Users\ROG3003\PythonProjects\aponyx\data\registry.json, datasets=0



CURRENT DATA REGISTRY STATUS

Registry path: C:\Users\ROG3003\PythonProjects\aponyx\data\registry.json
Total datasets: 0

No datasets registered yet.
This is expected on first run.


## 2. Review Configured Securities

Display all securities configured in `bloomberg_securities.json` and their Bloomberg tickers.

In [3]:
# Validate Bloomberg registry configuration
instruments, securities = validate_bloomberg_registry()

print(f"\n{'='*80}")
print(f"BLOOMBERG SECURITY CONFIGURATION")
print(f"{'='*80}\n")

# Create summary DataFrame
security_list = []
for sec_id in list_securities():
    spec = get_security_spec(sec_id)
    security_list.append({
        'Security ID': sec_id,
        'Description': spec.description,
        'Bloomberg Ticker': spec.bloomberg_ticker,
        'Instrument Type': spec.instrument_type,
    })

securities_df = pd.DataFrame(security_list)

print(f"Total securities configured: {len(securities_df)}\n")
print(securities_df.to_markdown(index=False))

# Group by instrument type
print(f"\n\nBy Instrument Type:")
instrument_counts = securities_df.groupby('Instrument Type').size()
for inst_type, count in instrument_counts.items():
    print(f"  {inst_type}: {count}")

print(f"\n✓ Configuration validated: {len(instruments)} instrument types, {len(securities)} securities")

2025-11-08 17:11:21,235 - aponyx.data.bloomberg_config - INFO - Bloomberg registry validated: 3 instruments, 8 securities



BLOOMBERG SECURITY CONFIGURATION

Total securities configured: 8

| Security ID   | Description                                       | Bloomberg Ticker             | Instrument Type   |
|:--------------|:--------------------------------------------------|:-----------------------------|:------------------|
| cdx_ig_5y     | CDX North America Investment Grade 5Y             | CDX IG CDSI GEN 5Y Corp      | cdx               |
| cdx_ig_10y    | CDX North America Investment Grade 10Y            | CDX IG CDSI GEN 10Y Corp     | cdx               |
| cdx_hy_5y     | CDX North America High Yield 5Y                   | CDX HY CDSI GEN 5Y SPRD Corp | cdx               |
| itrx_xover_5y | iTraxx Europe Crossover 5Y                        | ITRX XOVER CDSI GEN 5Y Corp  | cdx               |
| itrx_eur_5y   | iTraxx Europe Main 5Y                             | ITRX EUR CDSI GEN 5Y Corp    | cdx               |
| hyg           | iShares iBoxx High Yield Corporate Bond ETF       | HYG US Equity   

## 3. Configure Download Parameters

Set date range and download options.
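The cell below subtracts `5*365` calendar days, which is why the printed start date is 2020-11-09 rather than 2020-11-08: the window contains one leap day (2024-02-29). Likewise, `5 * 252` is only a rough trading-day estimate. A minimal sketch of both calculations, using hypothetical helpers `years_back` and `count_weekdays` (weekdays only; exchange holidays are not removed):

```python
from datetime import date, timedelta


def years_back(end: date, years: int) -> date:
    """Return the same calendar day `years` earlier, mapping Feb 29 to Feb 28."""
    try:
        return end.replace(year=end.year - years)
    except ValueError:  # end is Feb 29 and the target year is not a leap year
        return end.replace(year=end.year - years, day=28)


def count_weekdays(start: date, end: date) -> int:
    """Count Monday-Friday dates in [start, end]; holidays are NOT excluded."""
    days = (end - start).days + 1
    return sum((start + timedelta(days=i)).weekday() < 5 for i in range(days))


end = date(2025, 11, 8)
print(end - timedelta(days=5 * 365))  # 2020-11-09 (leap day inside the window)
print(years_back(end, 5))             # 2020-11-08
print(count_weekdays(end - timedelta(days=5 * 365), end))
```

Either convention is fine for a 5-year history; the difference only matters if downstream code expects exact calendar-year boundaries.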

In [4]:
# Date range for download (default: ~5 years; 5*365 days ignores leap days)
now = datetime.now()  # take "now" once so both dates share the same reference time
end_date = now.strftime("%Y-%m-%d")
start_date = (now - timedelta(days=5 * 365)).strftime("%Y-%m-%d")

# Bloomberg source
source = BloombergSource()

# Cache settings (use cache to avoid re-downloading)
use_cache = True

print(f"\n{'='*80}")
print(f"DOWNLOAD CONFIGURATION")
print(f"{'='*80}\n")
print(f"Date Range:")
print(f"  Start: {start_date}")
print(f"  End: {end_date}")
print(f"  Approximate trading days: ~{5 * 252}")
print(f"\nData Source:")
print(f"  Provider: Bloomberg Terminal")
print(f"  Type: BloombergSource()")
print(f"\nCache Settings:")
print(f"  Enabled: {use_cache}")
print(f"  TTL: {CACHE_TTL_DAYS} days")
print(f"  Cache directory: {DATA_DIR / 'cache' / 'bloomberg'}")
print(f"\nSecurities to Download: {len(securities_df)}")
print(f"\n✓ Configuration ready")


DOWNLOAD CONFIGURATION

Date Range:
  Start: 2020-11-09
  End: 2025-11-08
  Approximate trading days: ~1260

Data Source:
  Provider: Bloomberg Terminal
  Type: BloombergSource()

Cache Settings:
  Enabled: True
  TTL: 1 days
  Cache directory: C:\Users\ROG3003\PythonProjects\aponyx\data\cache\bloomberg

Securities to Download: 8

✓ Configuration ready


## 4. Download Data by Instrument Type

Download data for all securities, grouped by instrument type.
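The three download cells below repeat the same fetch-log-continue loop. One way to factor it out is a generic helper that takes the security IDs and a per-security fetch callable; `download_all` and the stand-in `fake_fetch` are hypothetical and only sketch the pattern.

```python
import logging
from typing import Any, Callable, Iterable

logger = logging.getLogger(__name__)


def download_all(
    securities: Iterable[str],
    fetch_one: Callable[[str], Any],
) -> tuple[dict[str, Any], dict[str, str]]:
    """Fetch every security, collecting successes and per-security error messages.

    A failure is logged (with traceback) and recorded, but it does not stop
    the loop, mirroring the try/except pattern in the cells below.
    """
    results: dict[str, Any] = {}
    errors: dict[str, str] = {}
    for security in securities:
        try:
            results[security] = fetch_one(security)
        except Exception as exc:  # deliberately broad: report and continue
            logger.exception("Failed to fetch %s", security)
            errors[security] = str(exc)
    return results, errors


# Demo with a stand-in fetcher (no Bloomberg connection needed).
def fake_fetch(security: str) -> str:
    if security == "bad":
        raise ConnectionError("no terminal session")
    return f"data for {security}"


results, errors = download_all(["a", "bad", "b"], fake_fetch)
print(results)  # {'a': 'data for a', 'b': 'data for b'}
print(errors)   # {'bad': 'no terminal session'}
```

With the notebook's objects, the CDX cell would pass `list_securities(instrument_type="cdx")` and a lambda that calls `fetch_cdx(source=source, security=sec, start_date=start_date, end_date=end_date, use_cache=use_cache)`. Returning the errors dict also makes it easy to report failed securities in the summary.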

### CDX Indices

In [5]:
cdx_securities = list_securities(instrument_type="cdx")
print(f"Downloading {len(cdx_securities)} CDX indices...\n")

cdx_results = {}
for security in cdx_securities:
    try:
        spec = get_security_spec(security)
        print(f"Fetching {security}: {spec.description}")
        print(f"  Ticker: {spec.bloomberg_ticker}")
        
        df = fetch_cdx(
            source=source,
            security=security,
            start_date=start_date,
            end_date=end_date,
            use_cache=use_cache,
        )
        
        cdx_results[security] = df
        print(f"  ✓ Downloaded {len(df)} rows ({df.index.min()} to {df.index.max()})")
        print(f"  Columns: {list(df.columns)}")
        print()
        
    except Exception as e:
        print(f"  ✗ Error: {str(e)}")
        logger.error("Failed to fetch %s: %s", security, str(e))
        print()

print(f"CDX download complete: {len(cdx_results)}/{len(cdx_securities)} successful")

2025-11-08 17:12:10,514 - aponyx.data.fetch - INFO - Fetching CDX from bloomberg
2025-11-08 17:12:10,515 - aponyx.data.providers.bloomberg - INFO - Fetching cdx from Bloomberg: ticker=CDX IG CDSI GEN 5Y Corp, dates=2020-11-09 to 2025-11-08
2025-11-08 17:12:10,515 - aponyx.data.providers.bloomberg - INFO - Fetching cdx from Bloomberg: ticker=CDX IG CDSI GEN 5Y Corp, dates=2020-11-09 to 2025-11-08


Downloading 5 CDX indices...

Fetching cdx_ig_5y: CDX North America Investment Grade 5Y
  Ticker: CDX IG CDSI GEN 5Y Corp


Skipped: could not import 'blpapi': No module named 'blpapi'

### ETF Data

In [None]:
etf_securities = list_securities(instrument_type="etf")
print(f"Downloading {len(etf_securities)} ETFs...\n")

etf_results = {}
for security in etf_securities:
    try:
        spec = get_security_spec(security)
        print(f"Fetching {security}: {spec.description}")
        print(f"  Ticker: {spec.bloomberg_ticker}")
        
        df = fetch_etf(
            source=source,
            security=security,
            start_date=start_date,
            end_date=end_date,
            use_cache=use_cache,
        )
        
        etf_results[security] = df
        print(f"  ✓ Downloaded {len(df)} rows ({df.index.min()} to {df.index.max()})")
        print(f"  Columns: {list(df.columns)}")
        print()
        
    except Exception as e:
        print(f"  ✗ Error: {str(e)}")
        logger.error("Failed to fetch %s: %s", security, str(e))
        print()

print(f"ETF download complete: {len(etf_results)}/{len(etf_securities)} successful")

### VIX Index

In [None]:
vix_securities = list_securities(instrument_type="vix")
print(f"Downloading {len(vix_securities)} VIX index...\n")

vix_results = {}
for security in vix_securities:
    try:
        spec = get_security_spec(security)
        print(f"Fetching {security}: {spec.description}")
        print(f"  Ticker: {spec.bloomberg_ticker}")
        
        df = fetch_vix(
            source=source,
            start_date=start_date,
            end_date=end_date,
            use_cache=use_cache,
        )
        
        vix_results[security] = df
        print(f"  ✓ Downloaded {len(df)} rows ({df.index.min()} to {df.index.max()})")
        print(f"  Columns: {list(df.columns)}")
        print()
        
    except Exception as e:
        print(f"  ✗ Error: {str(e)}")
        logger.error("Failed to fetch %s: %s", security, str(e))
        print()

print(f"VIX download complete: {len(vix_results)}/{len(vix_securities)} successful")

## 5. Download Summary

Review download results and data quality.

In [None]:
# Combine all results
all_results = {**cdx_results, **etf_results, **vix_results}

print(f"\n{'='*80}")
print(f"DOWNLOAD SUMMARY")
print(f"{'='*80}\n")

# Create summary table
summary_data = []
for security, df in all_results.items():
    spec = get_security_spec(security)
    summary_data.append({
        'Security': security,
        'Instrument': spec.instrument_type.upper(),
        'Rows': len(df),
        'Start Date': df.index.min().strftime('%Y-%m-%d'),
        'End Date': df.index.max().strftime('%Y-%m-%d'),
        'Days Coverage': (df.index.max() - df.index.min()).days,
        'Columns': ', '.join(df.columns),
    })

summary_df = pd.DataFrame(summary_data)

print(f"Download Results: {len(all_results)}/{len(securities_df)} securities\n")
print(summary_df.to_markdown(index=False))

# Check for missing securities
all_securities = set(list_securities())
downloaded_securities = set(all_results.keys())
missing_securities = all_securities - downloaded_securities

if missing_securities:
    print(f"\n\n⚠️  Missing Securities ({len(missing_securities)}):")
    for sec in sorted(missing_securities):
        print(f"  - {sec}")
else:
    print(f"\n\n✓ All {len(all_securities)} configured securities downloaded successfully")

## 6. Verify Registry Update

Confirm that cache files were created and the registry was updated.

In [None]:
# Reload registry to see updated state
registry = DataRegistry(REGISTRY_PATH, DATA_DIR)
updated_datasets = registry.list_datasets()

print(f"\n{'='*80}")
print(f"UPDATED REGISTRY STATUS")
print(f"{'='*80}\n")

# Group by instrument
cdx_datasets = registry.list_datasets(instrument="cdx")
etf_datasets = registry.list_datasets(instrument="etf")
vix_datasets = registry.list_datasets(instrument="vix")

print(f"Registry Summary:")
print(f"  Total Datasets: {len(updated_datasets)}")
print(f"  CDX Datasets: {len(cdx_datasets)}")
print(f"  ETF Datasets: {len(etf_datasets)}")
print(f"  VIX Datasets: {len(vix_datasets)}")

# Check cache directory structure
cache_dir = DATA_DIR / "cache" / "bloomberg"
print(f"\nCache Directory: {cache_dir}")

if cache_dir.exists():
    cache_files = list(cache_dir.glob("*.parquet"))
    total_size_mb = sum(f.stat().st_size for f in cache_files) / (1024 * 1024)
    
    print(f"  Files: {len(cache_files)} Parquet files")
    print(f"  Total size: {total_size_mb:.2f} MB")
    
    if len(cache_files) <= 10:
        print(f"\n  File details:")
        for f in sorted(cache_files):
            size_mb = f.stat().st_size / (1024 * 1024)
            print(f"    {f.name} ({size_mb:.2f} MB)")
else:
    print("  ⚠️  Cache directory not found")

print(f"\n✓ Registry updated with {len(updated_datasets)} datasets")

## 7. Data Quality Checks

Quick validation of downloaded data.
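The date-index checks in the cell below can be written as a pure function over a sequence of dates, which makes them testable without Bloomberg access. A minimal standard-library sketch; `date_index_issues` is a hypothetical helper whose thresholds mirror the cell (gaps over 7 days, fewer than 100 rows). The missing-value check stays in pandas.

```python
from datetime import date, timedelta
from typing import Sequence


def date_index_issues(
    dates: Sequence[date], max_gap_days: int = 7, min_rows: int = 100
) -> list[str]:
    """Return human-readable issues found in a date index (empty list if clean)."""
    issues: list[str] = []
    if len(dates) < min_rows:
        issues.append(f"Low row count: {len(dates)} rows")
    # Same count as pandas Index.duplicated().sum(): occurrences beyond the first.
    n_dups = len(dates) - len(set(dates))
    if n_dups:
        issues.append(f"Duplicate dates: {n_dups}")
    pairs = list(zip(dates, dates[1:]))
    # Non-strict, like Index.is_monotonic_increasing (equal neighbours are allowed).
    if any(later < earlier for earlier, later in pairs):
        issues.append("Non-monotonic date index")
    n_large = sum(later - earlier > timedelta(days=max_gap_days) for earlier, later in pairs)
    if n_large:
        issues.append(f"Date gaps > {max_gap_days} days: {n_large} gaps")
    return issues


weekdays = [d for d in (date(2025, 1, 6) + timedelta(days=i) for i in range(30)) if d.weekday() < 5]
print(date_index_issues(weekdays))  # ['Low row count: 22 rows']
```

For a downloaded DataFrame with a `DatetimeIndex`, `date_index_issues(list(df.index.date))` would apply the same checks.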

In [None]:
print(f"\n{'='*80}")
print(f"DATA QUALITY CHECKS")
print(f"{'='*80}\n")

quality_issues = []

for security, df in all_results.items():
    issues = []
    
    # Check for missing values
    missing_pct = (df.isna().sum() / len(df) * 100)
    for col, pct in missing_pct.items():
        if pct > 0:
            issues.append(f"Missing values in '{col}': {pct:.1f}%")
    
    # Check date continuity (gaps > 7 days)
    date_gaps = df.index.to_series().diff()
    large_gaps = date_gaps[date_gaps > pd.Timedelta(days=7)]
    if len(large_gaps) > 0:
        issues.append(f"Date gaps > 7 days: {len(large_gaps)} gaps")
    
    # Check for duplicates
    if df.index.duplicated().any():
        n_dups = df.index.duplicated().sum()
        issues.append(f"Duplicate dates: {n_dups}")
    
    # Check minimum row count
    if len(df) < 100:
        issues.append(f"Low row count: {len(df)} rows")
    
    # Check for monotonic index
    if not df.index.is_monotonic_increasing:
        issues.append("Non-monotonic date index")
    
    if issues:
        quality_issues.append({
            'Security': security,
            'Issues': issues,
        })

if quality_issues:
    print(f"⚠️  Quality Issues Found ({len(quality_issues)} securities):\n")
    for item in quality_issues:
        print(f"{item['Security']}:")
        for issue in item['Issues']:
            print(f"  - {issue}")
        print()
else:
    print("✓ No quality issues detected\n")
    print("All datasets passed quality checks:")
    print("  - No missing values")
    print("  - No duplicate dates")
    print("  - Sufficient coverage (>= 100 rows)")
    print("  - Monotonic date index")
    print("  - No large date gaps (> 7 days)")

print(f"\n✓ Quality validation complete for {len(all_results)} securities")

---

## Workflow Complete

Data download successful! The downloaded datasets are now ready for the next steps in the systematic research workflow.

### What Was Accomplished

✓ **Data Downloaded** — Configured securities fetched from the Bloomberg Terminal (counts in the section 5 summary)  
✓ **Cache Created** — Parquet files saved to `data/cache/bloomberg/`  
✓ **Registry Updated** — Metadata registered in `data/registry.json`  
✓ **Quality Validated** — Schema and continuity checks passed

### Data Flow

```
Downloaded Data (this notebook)
    ↓
market_data: dict[str, pd.DataFrame]
├─ "cdx": spread, security
├─ "vix": level
└─ "etf": spread, security
    ↓
Signal Computation (next notebook)
```

### Re-Running This Notebook

- **Cache enabled:** A second run reuses cached data (faster)
- **Cache TTL:** `CACHE_TTL_DAYS` (1 day in this configuration; set in `config/__init__.py`)
- **Force refresh:** Set `use_cache = False` in the section 3 configuration cell
- **Check registry:** Re-run section 6 to reload and display the registry

### Key Files Generated

```
data/
├── cache/
│   └── bloomberg/
│       ├── cdx_*.parquet
│       ├── vix_*.parquet
│       └── etf_*.parquet
└── registry.json (updated)
```

### Troubleshooting

**Bloomberg Terminal not available:**
- Ensure Bloomberg Terminal is running
- Check authentication and subscription
- Install xbbg (`uv pip install xbbg`) and make sure `blpapi` is importable

**Quality issues detected:**
- Review warning messages in section 7
- Check Bloomberg Terminal data availability
- Verify ticker configuration in `bloomberg_securities.json`

**Cache issues:**
- Clear cache (POSIX shell): `rm -rf data/cache/bloomberg/*`
- Disable cache: Set `use_cache = False` in section 3
- Check cache TTL in `config/__init__.py`
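The `rm -rf` command assumes a POSIX shell, while the sample output shows Windows paths. A platform-independent alternative can be run from a notebook cell with `pathlib`; `clear_parquet_cache` is a hypothetical helper, not part of aponyx, and it only deletes Parquet files (it does not edit `data/registry.json`).

```python
from pathlib import Path


def clear_parquet_cache(cache_dir: Path) -> int:
    """Delete *.parquet files directly under `cache_dir` and return how many were removed.

    Non-Parquet files and subdirectories are left alone; a missing directory
    is treated as already empty.
    """
    if not cache_dir.is_dir():
        return 0
    removed = 0
    for path in cache_dir.glob("*.parquet"):
        path.unlink()
        removed += 1
    return removed
```

In this notebook, the call would be `clear_parquet_cache(DATA_DIR / "cache" / "bloomberg")`, after which the next run downloads fresh data regardless of the TTL.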