# In [None]:
"""
================================================================================
Phase 01 - Data Ingestion: Underlying Asset Daily Aggregates
================================================================================

PIPELINE POSITION:
    Phase 01 (Data Ingestion) → Component B: Underlying Asset Data
    Part of the UOA Research Pipeline for institutional options flow analysis.

PURPOSE:
    Downloads daily stock aggregates (OHLCV) from Massive S3 and converts to
    Parquet format. This data provides the underlying price context needed for
    options trade analysis in subsequent pipeline phases.

DATA SOURCE:
    Massive S3: s3://flatfiles/us_stocks_sip/day_aggs_v1/{YYYY}/{MM}/{YYYY-MM-DD}.csv.gz

INPUT:
    - Gzipped CSV files from Massive S3 (one per trading day)
    - Date range: START_DATE to END_DATE (configurable)

OUTPUT:
    - Directory: ./US_STOCKS_DAY_AGGS/
    - Format: {YYYY-MM-DD}.parquet (Snappy compression)
    - One file per trading day, all tickers combined

OUTPUT SCHEMA:
    ticker        : str   - Exchange symbol (e.g., "AAPL")
    volume        : int   - Trading volume for the day
    open          : float - Open price
    close         : float - Close price (UNADJUSTED)
    high          : float - High price
    low           : float - Low price
    window_start  : int   - Unix nanosecond timestamp (market open)
    transactions  : int   - Number of transactions

NOTES:
    - Close prices are UNADJUSTED (no split/dividend adjustments)
    - Skips weekends; returns "no file" for market holidays
    - Idempotent: skips dates with existing .parquet files
    - Massive returns 403 for missing files (treated as holiday/unavailable)

================================================================================
"""

import os
import pandas as pd
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor, as_completed
import boto3
from botocore.config import Config
from tqdm import tqdm
import tempfile

# ==================== CONFIGURATION ====================

# Massive S3-compatible endpoint and the bucket that hosts the flat files.
S3_ENDPOINT = "https://files.massive.com"
BUCKET_NAME = "flatfiles"

# Credentials for the endpoint above. Left blank in the source; fill them in
# before running and avoid committing real keys.
ACCESS_KEY = ""
SECRET_KEY = ""

# Inclusive date range to ingest, formatted YYYY-MM-DD.
START_DATE = "2020-01-01"
END_DATE = "2025-12-31"

# Destination folder for the per-day Parquet files; created as soon as this
# module is executed so later writes can assume it exists.
OUTPUT_DIR = "./US_STOCKS_DAY_AGGS"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Worker count shown in the run header.
MAX_WORKERS = 10

# ======================================================


def get_s3_client():
    """Build a boto3 S3 client pointed at the Massive flat-file endpoint.

    Returns:
        A new boto3 S3 client configured with the module-level endpoint and
        credentials, SigV4 signing, path-style addressing and up to three
        retry attempts.
    """
    client_config = Config(
        signature_version="s3v4",
        retries={"max_attempts": 3},
        s3={"addressing_style": "path"},
    )
    client_kwargs = {
        "endpoint_url": S3_ENDPOINT,
        "region_name": "us-east-1",
        "aws_access_key_id": ACCESS_KEY,
        "aws_secret_access_key": SECRET_KEY,
        "config": client_config,
    }
    return boto3.client("s3", **client_kwargs)


def daterange(start_date, end_date):
    """Generate every weekday in the inclusive range as a YYYY-MM-DD string.

    Args:
        start_date: First calendar day to consider, formatted YYYY-MM-DD.
        end_date: Last calendar day to consider, formatted YYYY-MM-DD.

    Yields:
        Each Monday-to-Friday date in ascending order. Nothing is yielded
        when ``start_date`` is later than ``end_date``.
    """
    date_format = "%Y-%m-%d"
    cursor = datetime.strptime(start_date, date_format)
    last_day = datetime.strptime(end_date, date_format)
    one_day = timedelta(days=1)

    while cursor <= last_day:
        # weekday() numbers Saturday as 5 and Sunday as 6
        if cursor.weekday() not in (5, 6):
            yield cursor.strftime(date_format)
        cursor += one_day


def download_day(date: str) -> str:
    """Download one day's stock aggregates from S3 and store them as Parquet.

    The gzipped CSV for ``date`` is fetched into a private scratch directory,
    loaded with pandas, and written to ``OUTPUT_DIR/{date}.parquet`` with
    Snappy compression. The Parquet file is first written under a staging
    name and then renamed into place, so an interrupted or failed write can
    never leave a partial ``{date}.parquet`` behind that a later run would
    mistake for a completed day.

    Args:
        date: Trading day to fetch, formatted YYYY-MM-DD.

    Returns:
        A one-line, human-readable status string. It starts with "✓" when the
        day is (or already was) stored, "−" when there is no file or no rows
        for that day, and "✗" followed by the truncated error text otherwise.
        Exceptions are reported through this string rather than raised.
    """
    # Early skip: if the daily file already exists, don't download
    outpath = os.path.join(OUTPUT_DIR, f"{date}.parquet")
    if os.path.exists(outpath):
        return f"✓ {date}: already processed"

    filename = f"{date}.csv.gz"
    year = date[:4]
    month = date[5:7]
    s3_key = f"us_stocks_sip/day_aggs_v1/{year}/{month}/{filename}"

    # The staging name deliberately does not end in ".parquet", so neither the
    # skip check above nor a listing of finished files can pick it up.
    staged_path = f"{outpath}.{os.getpid()}.tmp"

    try:
        # A private scratch directory avoids clashing with another run that
        # downloads the same date, and is removed even when an error occurs.
        with tempfile.TemporaryDirectory(prefix="day_aggs_") as scratch_dir:
            temp_path = os.path.join(scratch_dir, filename)

            s3 = get_s3_client()
            s3.download_file(BUCKET_NAME, s3_key, temp_path)

            # Read the CSV with actual column names
            df = pd.read_csv(
                temp_path,
                compression="gzip",
                low_memory=False,
            )

        if df.empty:
            return f"− {date}: no data"

        # Save the full DataFrame, then publish it with an atomic rename.
        df.to_parquet(staged_path, compression="snappy")
        os.replace(staged_path, outpath)

        size_mb = os.path.getsize(outpath) / 1e6
        return f"✓ {date}: {len(df):,} rows ({size_mb:.2f} MB)"

    except Exception as e:
        msg = str(e)
        # Normal behavior: Massive returns 403 for "file not found"
        if "Forbidden" in msg or "404" in msg or "NoSuchKey" in msg:
            return f"− {date}: no file (holiday or unavailable)"
        return f"✗ {date}: {msg[:200]}"

    finally:
        # Best-effort removal of a staged file left by a failed write; after
        # a successful rename there is nothing to remove.
        try:
            os.remove(staged_path)
        except OSError:
            pass


# ==================== MAIN ====================

if __name__ == "__main__":
    # Materialise the weekday list up front so the header can report its size.
    dates = list(daterange(START_DATE, END_DATE))

    print("Downloading US stocks day aggregates")
    print(f"Date range : {START_DATE} → {END_DATE}")
    print(f"Days       : {len(dates)} business days")
    # The loop below runs one date at a time, so say so instead of implying
    # that MAX_WORKERS parallel workers are in use.
    print(f"Workers    : 1 (sequential; MAX_WORKERS={MAX_WORKERS} unused)")
    print(f"Output     : {OUTPUT_DIR}\n")

    # Process dates sequentially for easier debugging
    for date in tqdm(dates, desc="Progress"):
        result = download_day(date)
        # tqdm.write emits the line without breaking up the progress bar,
        # which a plain print() inside the loop would do.
        tqdm.write(result)

    # Summarise every Parquet file now present in the output directory,
    # including files produced by earlier runs.
    print("\nSummary:")
    print("=" * 60)
    files = [f for f in os.listdir(OUTPUT_DIR) if f.endswith(".parquet")]
    total_gb = sum(os.path.getsize(os.path.join(OUTPUT_DIR, f)) for f in files) / 1e9
    print(f"  {len(files)} files, {total_gb:.2f} GB")
    print("=" * 60)
    print("Done!")