### Joel Markapudi. 
- 2-Nov Start
- Model dev work, experiments, brainstorming on design, etc.
- Will work here first, move onto clean py files later.

### Why so much cost tracking and token-wise API tracking analysis? Costs.

- Console downloads: the S3 console runs inside an AWS-managed web app, and the browser fetches a pre-signed HTTPS URL generated by the console service. In-region or intra-AWS traffic is often free, but not everything stays inside AWS.
- Personal downloads may also route through CloudFront or AWS's internal edge acceleration, which may absorb small transfer fees.
- Where it adds up: batch ETL pulling hundreds of GB per day from S3 to a local machine, or repeated egress from multiple regions. Hence the awareness and tracking.
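To put the "hundreds of GB per day" case in numbers, here is a minimal sketch assuming a flat $0.09/GB internet egress rate (the same reference rate the `_egress_cost_usd` helper in the data-prep cell uses; real S3 pricing is tiered and varies by region). The function names are hypothetical.

```python
# Assumption: flat $0.09/GB reference rate; actual S3 egress pricing is tiered and region-dependent.
EGRESS_USD_PER_GB = 0.09


def egress_cost_usd(gigabytes: float, rate_per_gb: float = EGRESS_USD_PER_GB) -> float:
    """Estimated internet egress cost for `gigabytes` of transfer."""
    return gigabytes * rate_per_gb


def monthly_etl_egress_usd(gb_per_day: float, days: int = 30) -> float:
    """Estimated monthly cost of a daily S3 -> local batch pull."""
    return egress_cost_usd(gb_per_day * days)


# One ~23 MB parquet pull is a fraction of a cent; a 200 GB/day ETL job is not.
small_pull = egress_cost_usd(23.1 / 1024)
etl_month = monthly_etl_egress_usd(200)
print(f"single 23.1 MB pull: ${small_pull:.4f}")
print(f"200 GB/day for 30 days: ${etl_month:,.2f}")
```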


### Potential Structure for S3.
```
├── DATA_MERGE_ASSETS/                 # existing structure
│   ├── FINRAG_FACT_SENTENCES/
│   └── FINRAG_FACT_METRICS/
│
└── ML_EMBED_ASSETS/                        
    ├── EMBED_META_FACT/
    │   └── finrag_fact_sentences_final.parquet
    │
    └── EMBED_VECTORS/
        ├── cohere_v3_768d/
        │   ├── finrag_embeddings_cohere_v3.parquet
        │   ├── metadata.json
        │   └── validation_report.json
        │
        └── titan_v2_1024d/            # Future
            └── ...
```
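A minimal sketch of how the proposed layout could map to object keys. The helper names and the `my-bucket` bucket are hypothetical; only the folder names come from the tree above.

```python
# Hypothetical helpers turning the proposed S3 layout into object keys.
ML_ROOT = "ML_EMBED_ASSETS"


def meta_key(filename: str) -> str:
    """Key for a file under ML_EMBED_ASSETS/EMBED_META_FACT/."""
    return f"{ML_ROOT}/EMBED_META_FACT/{filename}"


def vectors_key(provider_dir: str, filename: str) -> str:
    """Key for a file inside one provider's EMBED_VECTORS/<provider_dir>/ folder."""
    return f"{ML_ROOT}/EMBED_VECTORS/{provider_dir}/{filename}"


def s3_uri(bucket: str, key: str) -> str:
    """Full s3:// URI, e.g. for pl.read_parquet(..., storage_options=...)."""
    return f"s3://{bucket}/{key}"


print(s3_uri("my-bucket", vectors_key("cohere_v3_768d", "metadata.json")))
```

Keeping one provider per folder means a new model (for example the future `titan_v2_1024d`) only adds a folder, without touching the meta table's location.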


### Tests 1 and 2: create a boto3 client, then check model access through the AWS org account credentials. Works!

In [1]:
# Zero-cost test (no API call)
import boto3

try:
    bedrock = boto3.client(
        service_name='bedrock-runtime',
        region_name='us-east-1'
    )
    print("✓ Bedrock client created successfully")
    print(f"  Region: {bedrock.meta.region_name}")
except Exception as e:
    print(f"✗ Failed: {e}")

✓ Bedrock client created successfully
  Region: us-east-1


In [3]:
# Cell 1: Setup
import sys
from pathlib import Path

# Add loaders to path
sys.path.append(str(Path.cwd().parent / 'loaders'))

from ml_config_loader import MLConfig

# Initialize config (loads AWS credentials automatically)
config = MLConfig()

print("✓ Config loaded")
print(f"  Bucket: {config.bucket}")
print(f"  Region: {config.region}")
print(f"  Model: {config.bedrock_model_id}")



# Cell 2: Test Bedrock API
import json

# Get Bedrock client (uses config credentials)
bedrock = config.get_bedrock_client()

# Test embedding with v4
body = json.dumps({
    "texts": ["Revenue increased significantly."],
    "input_type": config.bedrock_input_type,
    "embedding_types": ["float"],
    "output_dimension": config.bedrock_dimensions
})

response = bedrock.invoke_model(
    body=body,
    modelId=config.bedrock_model_id,
    accept='*/*',
    contentType='application/json'
)

result = json.loads(response['body'].read())
embeddings = result['embeddings']['float']

print(f"✓ Bedrock API works!")
print(f"  Model: {config.bedrock_model_id}")
print(f"  Dimensions: {len(embeddings[0])}")
print(f"  First 5 values: {embeddings[0][:5]}")
# print(f"  Cost: ~$0.0000005")



[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
✓ Config loaded
  Bucket: sentence-data-ingestion
  Region: us-east-1
  Model: cohere.embed-v4:0
✓ Bedrock API works!
  Model: cohere.embed-v4:0
  Dimensions: 1024
  First 5 values: [0.048339844, 0.040039062, -0.020874023, -0.012573242, -0.044921875]
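Keeping payload construction and response parsing separate from `invoke_model` lets them be checked without spending an API call. A sketch using the same request fields and the same `result['embeddings']['float']` access as the cell above; `search_document` is an assumed `input_type` value (the notebook reads it from `config.bedrock_input_type`), and the response bytes are a hand-made stub, not real model output. The helper names are hypothetical.

```python
import json


def build_cohere_embed_body(texts, input_type, output_dimension):
    """JSON request body with the same fields as the Bedrock test cell."""
    return json.dumps({
        "texts": list(texts),
        "input_type": input_type,
        "embedding_types": ["float"],
        "output_dimension": output_dimension,
    })


def parse_float_embeddings(raw_body: bytes):
    """Extract result['embeddings']['float'] from a response body, as the test cell does."""
    result = json.loads(raw_body)
    return result["embeddings"]["float"]


body = build_cohere_embed_body(["Revenue increased significantly."], "search_document", 1024)

# Stub response (not real model output), shaped like the one parsed above.
fake_response = json.dumps({"embeddings": {"float": [[0.1, -0.2, 0.3]]}}).encode()
vectors = parse_float_embeddings(fake_response)
print(len(vectors), len(vectors[0]))
```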


### Prep work 01: Load S3 fact sentences, add new columns, and save back to S3 as finrag_fact_sentences_meta_embeds.parquet



### Analysis and patterns.
- Apple 2016 ITEM_1:
  first: 0000320193_10-K_2016_section_1_0
  last:  0000320193_10-K_2016_section_1_99
- Pattern: `{CIK}_{filing}_{year}_section_{section_ID}_{sequence}`
- We'll shift the sentence ID by one in each direction to derive previous and next sentence IDs. This is a rough scheme: we may not rely on it heavily later, because it breaks if the clustered or unique key from other sources does not follow exactly the same pattern.
- Local file downloaded at: ModelPipeline\finrag_ml_tg1\data_cache\fact_sentences\finrag_fact_sentences.parquet

- Token estimate for "Revenue increased 15% to $274.5 billion." Rule of thumb for English text: 1 word ≈ 1.33 tokens.
  ```
  - Word count: 6 words
  - Token count: ~9 to 11 tokens, depending on the tokenizer; an illustrative split (11 pieces):
    - ['Revenue', 'increased', '15', '%', 'to', '$', '274', '.', '5', 'billion', '.']
  - Approximation: 6 × 1.33 = 7.98 ≈ 8 tokens (the heuristic undercounts number-heavy sentences)
  ```
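The word-based heuristic can be checked on the example sentence. A minimal sketch of the same idea as the `count_matches(' ') + 1` × 1.33 expression in `add_ml_columns` (`approx_token_count` is a hypothetical name). Note that a plain integer cast truncates 7.98 to 7; rounding gives the 8 quoted above.

```python
def approx_token_count(sentence: str, tokens_per_word: float = 1.33) -> int:
    """Words (space count + 1) times ~1.33 tokens per word, rounded to the nearest integer."""
    words = sentence.count(" ") + 1
    return round(words * tokens_per_word)


s = "Revenue increased 15% to $274.5 billion."
words = s.count(" ") + 1
# Compare: word count, rounded estimate, truncated estimate
print(words, approx_token_count(s), int(words * 1.33))
```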


In [2]:
import polars as pl
from pathlib import Path
import re

# Load from local cache
cache_file = Path.cwd().parent / 'data_cache' / 'fact_sentences' / 'finrag_fact_sentences.parquet'
print(f"Loading: {cache_file}")

df = pl.read_parquet(cache_file)
print(f"✓ Loaded: {len(df):,} rows\n")

# Define expected pattern
# Format: {CIK}_{filing}_{year}_section_{section_ID}_{sequence}
# Example: 0000320193_10-K_2016_section_1_42
pattern = r'^(\d{10})_(10-[KQ]|8-K)_(\d{4})_section_(\w+)_(\d+)$'

# Validate pattern
df_validated = df.with_columns([
    # Check if matches full pattern
    pl.col('sentenceID').str.contains(pattern).alias('_matches_full_pattern'),
    
    # Extract components
    pl.col('sentenceID').str.extract(pattern, 1).alias('_cik_part'),
    pl.col('sentenceID').str.extract(pattern, 2).alias('_filing_part'),
    pl.col('sentenceID').str.extract(pattern, 3).alias('_year_part'),
    pl.col('sentenceID').str.extract(pattern, 4).alias('_section_part'),
    pl.col('sentenceID').str.extract(pattern, 5).alias('_sequence_part'),
    
    # Check numeric sequence specifically
    pl.col('sentenceID').str.split('_').list.last()
        .cast(pl.Int32, strict=False)
        .is_not_null()
        .alias('_has_numeric_suffix')
])

# Calculate statistics
total_rows = len(df_validated)
full_pattern_valid = df_validated.filter(pl.col('_matches_full_pattern')).shape[0]
numeric_suffix_valid = df_validated.filter(pl.col('_has_numeric_suffix')).shape[0]

print("="*70)
print("SENTENCEID PATTERN VALIDATION")
print("="*70)
print(f"\nTotal rows: {total_rows:,}")
print(f"\n[Full Pattern: CIK_filing_year_section_sectionID_sequence]")
print(f"  Valid: {full_pattern_valid:,} ({full_pattern_valid/total_rows*100:.2f}%)")
print(f"  Invalid: {total_rows - full_pattern_valid:,} ({(1-full_pattern_valid/total_rows)*100:.2f}%)")

print(f"\n[Numeric Suffix Only: ends with number]")
print(f"  Valid: {numeric_suffix_valid:,} ({numeric_suffix_valid/total_rows*100:.2f}%)")
print(f"  Invalid: {total_rows - numeric_suffix_valid:,} ({(1-numeric_suffix_valid/total_rows)*100:.2f}%)")

# Show invalid examples
invalid_full = df_validated.filter(~pl.col('_matches_full_pattern'))
if len(invalid_full) > 0:
    print(f"\nExamples of INVALID sentenceIDs (full pattern):")
    for row in invalid_full.select('sentenceID').head(10).iter_rows():
        print(f"  - {row[0]}")

# Show valid examples with parsed components
valid_samples = df_validated.filter(pl.col('_matches_full_pattern')).head(5)
print(f"\n✓ Examples of VALID sentenceIDs (parsed):")
for row in valid_samples.select(['sentenceID', '_cik_part', '_year_part', '_section_part', '_sequence_part']).iter_rows():
    print(f"  {row[0]}")
    print(f"    → CIK: {row[1]}, Year: {row[2]}, Section: {row[3]}, Seq: {row[4]}")

# Component-level validation
print(f"\n[Component Validation]")
print(f"  CIK extracted: {df_validated.filter(pl.col('_cik_part').is_not_null()).shape[0]:,} rows")
print(f"  Filing extracted: {df_validated.filter(pl.col('_filing_part').is_not_null()).shape[0]:,} rows")
print(f"  Year extracted: {df_validated.filter(pl.col('_year_part').is_not_null()).shape[0]:,} rows")
print(f"  Section extracted: {df_validated.filter(pl.col('_section_part').is_not_null()).shape[0]:,} rows")
print(f"  Sequence extracted: {df_validated.filter(pl.col('_sequence_part').is_not_null()).shape[0]:,} rows")

# Final recommendation
print(f"\n{'='*70}")
if full_pattern_valid / total_rows >= 0.95:
    print("✅ RECOMMENDATION: Pattern highly reliable (≥95%)")
    print("   → Safe to use shift() for prev/next_sentenceID")
    print("   → Sequence numbers are trustworthy for ordering")
elif numeric_suffix_valid / total_rows >= 0.95:
    print("RECOMMENDATION: Full pattern has issues, but numeric suffix reliable")
    print("   → Can use shift() but validate sorting carefully")
else:
    print("❌ RECOMMENDATION: Pattern unreliable (<95%)")
    print("   → Skip prev/next_sentenceID columns")
    print("   → Use runtime neighbor lookups instead")
print("="*70)


# Show sentenceIDs from most recent year
latest_year = df['report_year'].max()
sample = df.filter(pl.col('report_year') == latest_year).select('sentenceID').head(10)

print(f"Latest year: {latest_year}\nSample sentenceIDs:")
for sid in sample['sentenceID']:
    print(f"  {sid}")

Loading: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\fact_sentences\finrag_fact_sentences.parquet
✓ Loaded: 469,252 rows

SENTENCEID PATTERN VALIDATION

Total rows: 469,252

[Full Pattern: CIK_filing_year_section_sectionID_sequence]
  Valid: 469,252 (100.00%)
  Invalid: 0 (0.00%)

[Numeric Suffix Only: ends with number]
  Valid: 469,252 (100.00%)
  Invalid: 0 (0.00%)

✓ Examples of VALID sentenceIDs (parsed):
  0000034088_10-K_2006_section_10_1
    → CIK: 0000034088, Year: 2006, Section: 10, Seq: 1
  0000034088_10-K_2006_section_11_2
    → CIK: 0000034088, Year: 2006, Section: 11, Seq: 2
  0000034088_10-K_2006_section_12_10
    → CIK: 0000034088, Year: 2006, Section: 12, Seq: 10
  0000034088_10-K_2006_section_12_11
    → CIK: 0000034088, Year: 2006, Section: 12, Seq: 11
  0000034088_10-K_2006_section_12_13
    → CIK: 0000034088, Year: 2006, Section: 12, Seq: 13

[Component Validation]
  CIK extracted: 469,252 rows
  Filing 
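The prev/next idea only works if sentences are ordered by the numeric sequence, not by the raw ID string. A small pure-Python sketch using the validation cell's pattern (`doc_prefix_and_seq` is a hypothetical helper); the toy IDs all belong to one section, whereas the pipeline computes neighbors per section prefix.

```python
import re

# Same pattern as the validation cell: {CIK}_{filing}_{year}_section_{section_ID}_{sequence}
PATTERN = re.compile(r"^(\d{10})_(10-[KQ]|8-K)_(\d{4})_section_(\w+)_(\d+)$")


def doc_prefix_and_seq(sentence_id: str):
    """Split an ID into (prefix without sequence, integer sequence)."""
    m = PATTERN.match(sentence_id)
    if m is None:
        raise ValueError(f"unexpected sentenceID: {sentence_id}")
    prefix = sentence_id[: m.start(5) - 1]  # drop the trailing "_<sequence>"
    return prefix, int(m.group(5))


ids = [
    "0000320193_10-K_2016_section_1_10",
    "0000320193_10-K_2016_section_1_2",
    "0000320193_10-K_2016_section_1_1",
]
print(sorted(ids))                             # string order: ..._1, ..._10, ..._2
ordered = sorted(ids, key=doc_prefix_and_seq)  # numeric order: ..._1, ..._2, ..._10

# Neighbor pointers within this single toy section
prev_next = {
    sid: (ordered[i - 1] if i > 0 else None, ordered[i + 1] if i + 1 < len(ordered) else None)
    for i, sid in enumerate(ordered)
}
```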

In [None]:
# ============================================================================
# DATA PREPARATION PIPELINE
# Creates Stage 2 meta table and initializes empty vectors table
# ============================================================================
# ============================================================================
# PARAMETERS - Execution Control
# INIT_*: Creates on S3 (one-time)
# FORCE_REINIT_*: Recreates (destructive)
# CACHE_*: Downloads locally
# FORCE_RECACHE_*: Re-downloads
# ============================================================================

# S3 Table Initialization (One-time setup)
INIT_META_TABLE = False           # Create Stage 2 meta table (24→35 cols)
INIT_VECTORS_TABLE = False        # Create empty vectors table

FORCE_REINIT_META = False        # Delete and recreate meta table
FORCE_REINIT_VECTORS = False     # Delete and recreate vectors table

# Local Caching (Development optimization)
CACHE_STAGE1_LOCALLY = True      # Download Stage 1 fact table
FORCE_RECACHE_STAGE1 = True     # Re-download even if cached

CACHE_STAGE2_LOCALLY = True       # Download Stage 2 (meta_embeds) table
FORCE_RECACHE_STAGE2 = True      # Re-download Stage 2 even if cached

CACHE_EMBEDS_LOCALLY = True       # Download embeddings fact table(s)
FORCE_RECACHE_EMBEDS = True      # Re-download embeddings even if cached

EMBEDS_PROVIDER = "cohere_1024d"            # e.g. "cohere_1024d" (None → try sensible default or all)


# ============================================================================
# IMPORTS
# ============================================================================

import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent / 'loaders'))

from ml_config_loader import MLConfig
import polars as pl
from botocore.exceptions import ClientError

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def check_s3_exists(s3_client, bucket, s3_key):
    """Check if S3 object exists (no download, no cost)"""
    try:
        s3_client.head_object(Bucket=bucket, Key=s3_key)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        else:
            raise

def _egress_cost_usd(bytes_count: int) -> float:
    # Rough public egress reference: $0.09 per GB
    gb = bytes_count / (1024 * 1024 * 1024)
    return gb * 0.09

def _download_parquet_from_s3_to_local(config, s3_key: str, local_path: Path) -> pl.DataFrame:
    """Read a parquet from S3 via Polars and cache locally (zstd)."""
    s3_uri = f"s3://{config.bucket}/{s3_key}"
    s3_client = config.get_s3_client()
    head = s3_client.head_object(Bucket=config.bucket, Key=s3_key)
    size_mb = head['ContentLength'] / 1024 / 1024
    cost = _egress_cost_usd(head['ContentLength'])

    print(f"  Source: {s3_uri}")
    print(f"  Size: {size_mb:.1f} MB")
    df = pl.read_parquet(s3_uri, storage_options=config.get_storage_options())
    print(f"  Downloaded: {len(df):,} rows (Cost: ${cost:.4f} egress)")

    local_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(local_path, compression='zstd')
    print(f"  ✓ Cached to: {local_path}")
    return df


# ========================================================================================================================
# ========================================================================================================================


def cache_stage1_table(config, force_recache=False):
    """
    Download and cache Stage 1 fact table (original 24 columns)
    Returns: DataFrame loaded from cache or S3
    """
    
    cache_file = Path.cwd().parent / 'data_cache' / 'stage1_facts' / 'finrag_fact_sentences.parquet'
    
    # Check local cache first
    if not force_recache and cache_file.exists():
        print(f"\n[Stage 1 Table - Using Cache]")
        print(f"  Location: {cache_file.name}")
        df = pl.read_parquet(cache_file)
        print(f"  Loaded: {len(df):,} rows × {len(df.columns)} columns (Cost: $0.00)")
        return df
    
    # Download from S3 and cache locally (shared helper: prints size and egress estimate, writes zstd)
    print(f"\n[Stage 1 Table - Downloading from S3]")
    return _download_parquet_from_s3_to_local(config, config.input_sentences_path, cache_file)



def cache_stage2_meta_table(config, force_recache=False):
    """
    Download and cache Stage-2 meta table (35 columns with ML metadata).
    Returns: DataFrame loaded from cache or S3.
    """
    # Use the final filename from S3 key for local cache filename
    meta_key = config.meta_embeds_path                          # e.g., "ML_EMBED_ASSETS/EMBED_META_FACT/finrag_fact_sentences_meta_embeds.parquet"
    meta_filename = Path(meta_key).name
    cache_file = Path.cwd().parent / 'data_cache' / 'meta_embeds' / meta_filename

    if not force_recache and cache_file.exists():
        print(f"\n[Stage 2 Meta Table - Using Cache]")
        print(f"  Location: {cache_file.name}")
        df = pl.read_parquet(cache_file)
        print(f"  Loaded: {len(df):,} rows × {len(df.columns)} columns (Cost: $0.00)")
        return df

    print(f"\n[Stage 2 Meta Table - Downloading from S3]")
    return _download_parquet_from_s3_to_local(config, meta_key, cache_file)




def _resolve_embed_providers(config, explicit: str | None):
    """
    Try to resolve provider list:
    - If explicit provided → [explicit]
    - Else, try config.providers if exposed
    - Else, fall back to default model key (single)
    """
    if explicit:
        return [explicit]

    # If MLConfig exposes a list of providers (preferred)
    if hasattr(config, "embedding_providers") and isinstance(config.embedding_providers, (list, tuple)) and config.embedding_providers:
        return list(config.embedding_providers)

    # Fallback: try default model key
    if hasattr(config, "bedrock_default_model_key") and config.bedrock_default_model_key:
        return [config.bedrock_default_model_key]

    # As a last resort, try a few common keys the YAML showed (only if present in config)
    candidates = [k for k in ("cohere_768d", "cohere_1024d", "titan_1024d") if hasattr(config, "embeddings_path") and config.embeddings_path(provider=k)]
    return candidates or []


def cache_embeddings_tables(config, providers=None, force_recache=False):
    """
    Download and cache embeddings parquet for one or many providers.
    Returns: dict {provider: DataFrame}
    """
    if providers is None:
        providers = _resolve_embed_providers(config, explicit=EMBEDS_PROVIDER)

    results = {}
    if not providers:
        print("\n[Embeddings Fact - No providers resolved] Skipping.")
        return results

    for prov in providers:
        # Expect MLConfig to resolve full S3 key for the provider's embeddings fact parquet
        # e.g., "ML_EMBED_ASSETS/EMBED_VECTORS/cohere_1024d/finrag_embeddings_cohere_1024d.parquet"
        try:
            emb_key = config.embeddings_path(provider=prov)
        except TypeError:
            # embeddings_path may be a plain string attribute rather than a method
            emb_key = getattr(config, "embeddings_path", None)
            if not isinstance(emb_key, str):
                emb_key = None
        if not emb_key:
            print(f"\n[Embeddings Fact - {prov}] Cannot resolve S3 key via config.embeddings_path(provider=...). Skipping.")
            continue

        emb_filename = Path(emb_key).name
        cache_file = Path.cwd().parent / 'data_cache' / 'embeddings' / prov / emb_filename

        if not force_recache and cache_file.exists():
            print(f"\n[Embeddings Fact - Using Cache] Provider: {prov}")
            print(f"  Location: {emb_filename}")
            df = pl.read_parquet(cache_file)
            print(f"  Loaded: {len(df):,} rows × {len(df.columns)} columns (Cost: $0.00)")
            results[prov] = df
            continue

        print(f"\n[Embeddings Fact - Downloading from S3] Provider: {prov}")
        df = _download_parquet_from_s3_to_local(config, emb_key, cache_file)
        results[prov] = df

    return results

# ========================================================================================================================
# ========================================================================================================================




def add_ml_columns(df):
    """Transform Stage 1 (24 cols) → Stage 2 (35 cols) with ML metadata"""
    
    # Extract sequence for sorting
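    df = df.with_columns([
        # Prefix = sentenceID minus its trailing "_<sequence>" (one group per filing section)
        pl.col('sentenceID').str.replace(r'_\d+$', '').alias('_doc_prefix'),
        # Numeric sequence so that 10 sorts after 2
        pl.col('sentenceID').str.extract(r'_(\d+)$', 1).cast(pl.Int32).alias('_sequence_num')
    ])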
    
    df = df.sort(['_doc_prefix', '_sequence_num'])
    
    # Neighbor pointers
    df = df.with_columns([
        pl.col('sentenceID').shift(1).over('_doc_prefix').alias('prev_sentenceID'),
        pl.col('sentenceID').shift(-1).over('_doc_prefix').alias('next_sentenceID')
    ])
    
    # Content metadata
    df = df.with_columns([
        pl.col('sentence').str.len_chars().alias('sentence_char_length'),
        ((pl.col('sentence').str.count_matches(' ') + 1) * 1.33).round(0).cast(pl.Int16).alias('sentence_token_count')
    ])
    
    # Section metadata
    section_counts = df.group_by(['docID', 'section_ID']).agg(pl.len().alias('section_sentence_count'))
    df = df.join(section_counts, on=['docID', 'section_ID'], how='left')
    
    # ML metadata (NULL initially)
    df = df.with_columns([
        pl.lit(None).cast(pl.Utf8).alias('embedding_id'),
        pl.lit(None).cast(pl.Utf8).alias('embedding_model'),
        pl.lit(None).cast(pl.Int16).alias('embedding_dims'),
        pl.lit(None).cast(pl.Datetime).alias('embedding_date'),
        pl.lit(None).cast(pl.Utf8).alias('embedding_ref')
    ])
    
    # Cleanup
    df = df.drop(['_doc_prefix', '_sequence_num'])
    
    print(f"  ✓ Transformed: 24 cols → {len(df.columns)} cols")
    
    return df

# ========================================================================================================================
# ========================================================================================================================


def initialize_meta_table(config, df_stage1, force_reinit=False):
    """Create Stage 2 meta table on S3 (35 columns)"""
    
    s3_client = config.get_s3_client()
    meta_s3_key = config.meta_embeds_path
    meta_exists = check_s3_exists(s3_client, config.bucket, meta_s3_key)
    
    # Handle existing table
    if meta_exists and not force_reinit:
        print(f"\n[Stage 2 Meta Table - Already Exists]")
        print(f"  Location: s3://{config.bucket}/{meta_s3_key}")
        print(f"  Set FORCE_REINIT_META=True to recreate")
        return
    
    elif meta_exists and force_reinit:
        print(f"\n[Stage 2 Meta Table - Recreating]")
        s3_client.delete_object(Bucket=config.bucket, Key=meta_s3_key)
        print(f"  ✓ Deleted existing")
    
    # Create new table
    print(f"\n[Stage 2 Meta Table - Creating]")
    df_stage2 = add_ml_columns(df_stage1)
    
    meta_uri = f"s3://{config.bucket}/{meta_s3_key}"
    print(f"  Saving to: {meta_uri}")
    df_stage2.write_parquet(meta_uri, storage_options=config.get_storage_options(), compression='zstd')
    print(f"  ✓ Created: {len(df_stage2):,} rows × {len(df_stage2.columns)} cols (Cost: $0.00 ingress)")


def initialize_vectors_table(config, force_reinit=False):
    """Create empty vectors table on S3"""
    
    s3_client = config.get_s3_client()
    vectors_s3_key = config.embeddings_path(provider=None)
    vectors_exists = check_s3_exists(s3_client, config.bucket, vectors_s3_key)
    
    # Handle existing table
    if vectors_exists and not force_reinit:
        print(f"\n[Vectors Table - Already Exists]")
        print(f"  Location: s3://{config.bucket}/{vectors_s3_key}")
        print(f"  Provider: {config.bedrock_default_model_key} ({config.bedrock_dimensions}d)")
        print(f"  Set FORCE_REINIT_VECTORS=True to recreate")
        return
    
    elif vectors_exists and force_reinit:
        print(f"\n[Vectors Table - Recreating]")
        s3_client.delete_object(Bucket=config.bucket, Key=vectors_s3_key)
        print(f"  ✓ Deleted existing")
    
    # Create empty table
    print(f"\n[Vectors Table - Creating]")
    empty_vectors = pl.DataFrame({
        'sentenceID': pl.Series([], dtype=pl.Utf8),
        'embedding_id': pl.Series([], dtype=pl.Utf8),
        'embedding': pl.Series([], dtype=pl.List(pl.Float32))
    })
    
    vectors_uri = f"s3://{config.bucket}/{vectors_s3_key}"
    empty_vectors.write_parquet(vectors_uri, storage_options=config.get_storage_options(), compression='zstd')
    print(f"  ✓ Created: {vectors_uri}")
    print(f"  Schema: [sentenceID, embedding_id, embedding (List[Float32])]")

# ========================================================================================================================
# ========================================================================================================================



# ============================================================================
# MAIN EXECUTION
# ============================================================================

config = MLConfig()

print("="*70)
print("DATA PREPARATION PIPELINE")
print("="*70)
print(f"Model: {config.bedrock_model_id} ({config.bedrock_dimensions}d)")



# ============================================================================
# TASK 1a: Cache Stage 1 Table Locally
# ============================================================================

df_stage1 = None

if CACHE_STAGE1_LOCALLY:
    df_stage1 = cache_stage1_table(config, force_recache=FORCE_RECACHE_STAGE1)

# ============================================================================
# TASK 1b: Cache Stage 2 Meta Table Locally
# ============================================================================

df_stage2_meta = None
if CACHE_STAGE2_LOCALLY:
    df_stage2_meta = cache_stage2_meta_table(config, force_recache=FORCE_RECACHE_STAGE2)

# ============================================================================
# TASK 1c: Cache Embeddings Fact Table(s) Locally
# ============================================================================

embeds_cached = {}
if CACHE_EMBEDS_LOCALLY:
    # If EMBEDS_PROVIDER is None, resolver will try providers list or default model
    embeds_cached = cache_embeddings_tables(config, providers=None, force_recache=FORCE_RECACHE_EMBEDS)

# ============================================================================
# TASK 2: Initialize Meta Table (Stage 2) on S3
# ============================================================================

if INIT_META_TABLE:
    if df_stage1 is None:
        print(f"\n  Loading Stage 1 for transformation...")
        df_stage1 = cache_stage1_table(config, force_recache=False)
    initialize_meta_table(config, df_stage1, force_reinit=FORCE_REINIT_META)

# ============================================================================
# TASK 3: Initialize Empty Vectors Table on S3
# ============================================================================

if INIT_VECTORS_TABLE:
    initialize_vectors_table(config, force_reinit=FORCE_REINIT_VECTORS)

# ============================================================================
# SUMMARY
# ============================================================================

print(f"\n{'='*70}")
print(f"✓ DATA PREPARATION COMPLETE")
print(f"{'='*70}")
print(f"\nTables initialized / cached:")
if CACHE_STAGE1_LOCALLY:
    print(f"  ✓ Stage 1 cached locally")
if CACHE_STAGE2_LOCALLY:
    print(f"  ✓ Stage 2 meta cached locally")
if CACHE_EMBEDS_LOCALLY:
    if embeds_cached:
        prows = ", ".join([f"{k}: {len(v)} rows" for k, v in embeds_cached.items()])
        print(f"  ✓ Embeddings cached locally → {{ {prows} }}")
    else:
        print(f"  ✓ Embeddings cache attempted (no providers resolved)")
if INIT_META_TABLE:
    print(f"  ✓ Stage 2 meta table ready (S3)")
if INIT_VECTORS_TABLE:
    print(f"  ✓ Vectors table ready (S3)")
print("="*70)





[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
DATA PREPARATION PIPELINE
Model: cohere.embed-v4:0 (1024d)

[Stage 1 Table - Downloading from S3]
  Source: s3://sentence-data-ingestion/DATA_MERGE_ASSETS/FINRAG_FACT_SENTENCES/finrag_fact_sentences.parquet
  Size: 23.1 MB
  Downloaded: 469,252 rows (Cost: $0.0020 egress)
  ✓ Cached to: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\stage1_facts\finrag_fact_sentences.parquet

[Stage 2 Meta Table - Downloading from S3]
  Source: s3://sentence-data-ingestion/ML_EMBED_ASSETS/EMBED_META_FACT/finrag_fact_sentences_meta_embeds.parquet
  Size: 33.6 MB
  Downloaded: 469,252 rows (Cost: $0.0030 egress)
  ✓ Cached to: d:\JoelDesktop folds_24\NEU FALL2025\MLops IE7374 Project\FinSights\ModelPipeline\finrag_ml_tg1\data_cache\meta_embeds\finrag_fact_sentences_meta_embeds.parquet

[Embeddings Fact - Downloading from S3] Provider: cohere_1024d
  Source: s3://sentence-data-ingestion/ML

### Embedding Generation, Storage and Push, and Embedding Metadata Update in the Main fact_sentences_meta_embed Table

- At the filtering step:
   - `filtered_sentence_ids = df_filtered['sentenceID'].to_list()`
- This list is the anchor throughout the pipeline:
   - It determines which sentences to embed and which meta rows to update, and it drives progress tracking and cost estimation.
- Merge.
  - `merged = pl.concat([existing_df, new_df])` 
  - `merged = merged.unique(subset=[key], keep='last') `

- p50 (median): 33 tokens
- p75: 48 tokens
- p95: 77 tokens
- p99: 117 tokens
- max: 2,281 tokens (outlier - likely table)
- 99.7% of sentences: <500 tokens
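Percentiles like these can be computed from a list of per-sentence token counts, such as the `sentence_token_count` column. A pure-Python sketch using the nearest-rank definition (other libraries interpolate, so numbers can differ slightly); the input list is toy data, not the real distribution, and the function names are hypothetical.

```python
import math


def nearest_rank_percentile(values, p):
    """Nearest-rank percentile (p in 0..100) of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]


def token_profile(token_counts, cap=500):
    """p50/p75/p95/p99/max plus the share of sentences under `cap` tokens."""
    profile = {f"p{p}": nearest_rank_percentile(token_counts, p) for p in (50, 75, 95, 99)}
    profile["max"] = max(token_counts)
    profile[f"share_under_{cap}"] = sum(c < cap for c in token_counts) / len(token_counts)
    return profile


print(token_profile([12, 20, 33, 33, 48, 77, 117, 2281]))
```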

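- Bedrock Cohere v4 limits (verify against the current Bedrock model documentation):
   - Max tokens per text: 512 tokens (this matches the Embed v3 limit; v4 accepts much longer inputs, and the pipeline constant below uses 128K)
   - Max texts per batch: 96
   - Max total request tokens: ~50K (undocumented; a conservative estimate)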
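The batching rule stated in the generation cell's docstring (a batch closes when it hits 96 texts OR the token budget) can be sketched as a greedy packer. Assumptions: token counts are per-text estimates, and the defaults mirror the `MAX_TEXTS_PER_BATCH` / `MAX_TOKENS_PER_BATCH` constants; `make_batches` is a hypothetical name. A single text larger than the budget still gets its own batch, which is one reason to filter outliers first (`MAX_TOKENS_PER_SENTENCE`).

```python
from typing import List, Sequence, Tuple


def make_batches(
    items: Sequence[Tuple[str, int]],
    max_texts: int = 96,
    max_tokens: int = 128_000,
) -> List[List[str]]:
    """Greedy batching: close the current batch when adding the next text would
    exceed `max_texts` texts or `max_tokens` estimated tokens."""
    batches: List[List[str]] = []
    current: List[str] = []
    current_tokens = 0
    for text, tokens in items:
        would_overflow = len(current) + 1 > max_texts or current_tokens + tokens > max_tokens
        if current and would_overflow:
            batches.append(current)
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += tokens
    if current:
        batches.append(current)
    return batches


# 200 short texts -> limited by the 96-text cap
print([len(b) for b in make_batches([(f"t{i}", 10) for i in range(200)])])
```

With the ~50K conservative request estimate from the list above, the same function can be called with `max_tokens=50_000`.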

### From the AWS Cohere page: "Smaller chunks improve retrieval and cost"
- Long chunks: more tokens, higher embedding cost
- Short chunks: fewer tokens, lower cost
  - 1,000 chunks × 500 tokens each = 500K tokens → $0.05
  - 1,000 chunks × 50 tokens each = 50K tokens → $0.005
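The two examples imply a rate of $0.10 per 1M input tokens ($0.05 for 500K tokens). A sketch that makes the arithmetic explicit, assuming that rate; check current Bedrock pricing for the model and region before relying on it. At the same rate, 47,755 tokens comes to about $0.0048. The function names are hypothetical.

```python
# Assumption: $0.10 per 1M input tokens, the rate implied by the chunking examples.
USD_PER_MILLION_TOKENS = 0.10


def embedding_cost_usd(total_tokens: int, usd_per_million: float = USD_PER_MILLION_TOKENS) -> float:
    """Estimated embedding cost for a total token count."""
    return total_tokens / 1_000_000 * usd_per_million


def chunking_cost(n_chunks: int, tokens_per_chunk: int) -> float:
    """Estimated cost of embedding n_chunks chunks of a fixed size."""
    return embedding_cost_usd(n_chunks * tokens_per_chunk)


print(f"${chunking_cost(1_000, 500):.3f}  ${chunking_cost(1_000, 50):.4f}")
```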



```python
# Outline of the embedding cell (not runnable as-is: URIs, storage_options, model_info are placeholders)

# TOP: Constants & Path Resolution
config = MLConfig()
VECTORS_URI = ...
META_URI = ...

# STEP 1: Load meta table (helper)
df_meta = load_meta_table_with_cache(config)

# STEP 2: Filter (returns anchor)
df_filtered, filtered_ids = filter_sentences(df_meta, config)

# STEP 3: Generate embeddings
df_vectors, embedding_id, skipped_ids = generate_embeddings_batch(...)

# STEP 4: Merge vectors (simplified - no existence checks)
merged_vectors = merge_vectors_table(df_vectors, VECTORS_URI, storage_options)

# STEP 5: Update meta (use anchor)
updated_meta = update_meta_table(df_meta, filtered_ids, skipped_ids, model_info)

# STEP 6: Save both
# save merged_vectors -> VECTORS_URI
# save updated_meta -> META_URI
```

- Embeddings, Apple (2016?): 47,755 tokens processed → cost $0.0048.
- Actual file on S3: 21 MB (ZSTD-compressed).
- Egress: the code overstated the cost by roughly 10x; 21 MB / 1024 × $0.09 = $0.0018, not $0.0246.
- Local cache layout:
  - `data_cache/stage1_facts/` → Stage-1 parquet
  - `data_cache/meta_embeds/` → Stage-2 meta (35 cols)
  - `data_cache/embeddings/<provider>/` → vectors per provider (e.g., `cohere_1024d/`)

In [None]:
# ============================================================================
# EMBEDDING GENERATION PIPELINE
# Generates embeddings for filtered sentences and merges with existing data
# ============================================================================

"""
COHERE SPECIFIC:
    Batch fills when EITHER condition met:
    1. 96 texts reached, OR
    2. Token budget (MAX_TOKENS_PER_BATCH) reached
"""

# ============================================================================

# PIPELINE CONSTANTS
MAX_TOKENS_PER_SENTENCE = 1000     # Filter extreme outliers

MAX_TEXTS_PER_BATCH = 96             # API limit: texts per request
MAX_TOKENS_PER_BATCH = 128000        # Cohere v4's maximum capacity (128K tokens); previously 15000


import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent / 'loaders'))

from ml_config_loader import MLConfig   # 'loaders' dir is on sys.path, as in the earlier cells
import polars as pl
import json
from datetime import datetime
from botocore.exceptions import ClientError
import time

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def load_meta_table_with_cache(config):
    """Load meta table (cache first, S3 fallback, error if missing)"""
    
    cache_file = Path.cwd().parent / 'data_cache' / 'meta_embeds' / 'finrag_fact_sentences_meta_embeds.parquet'
    
    if cache_file.exists():
        print(f"✓ Using cached meta table")
        df = pl.read_parquet(cache_file)
        print(f"  Loaded: {len(df):,} rows × {len(df.columns)} columns (Cost: $0.00)")
        return df
    
    # Try S3: HEAD first, so only a missing/inaccessible object maps to FileNotFoundError
    meta_uri = f"s3://{config.bucket}/{config.meta_embeds_path}"
    s3_client = config.get_s3_client()

    try:
        response = s3_client.head_object(Bucket=config.bucket, Key=config.meta_embeds_path)
    except ClientError as e:
        raise FileNotFoundError(
            f"Meta table not found!\n"
            f"  Cache: {cache_file}\n"
            f"  S3: {meta_uri}\n"
            f"  → Run '01_dataprep.ipynb' first to create meta table"
        ) from e

    file_size_mb = response['ContentLength'] / 1024 / 1024
    egress_cost = file_size_mb / 1024 * 0.09      # ~$0.09/GB internet egress

    print(f"⬇️  Loading meta table from S3")
    df = pl.read_parquet(meta_uri, storage_options=config.get_storage_options())

    print(f"  Downloaded: {len(df):,} rows × {len(df.columns)} columns")
    print(f"  Cost: ${egress_cost:.4f} egress")

    # Cache for next time
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(cache_file)
    print(f"✓ Cached for future use")

    return df


def filter_sentences(df_meta, config):
    """Filter sentences with multi-value support for CIK and year"""
    
    mode = config.embedding_mode
    
    if mode == "full":
        print(f"\n[Filtering: FULL MODE]")
        df_filtered = df_meta
    
    elif mode == "parameterized":
        cik_filter = config.filter_cik
        year_filter = config.filter_year
        
        # Validate filters are not null
        if cik_filter is None or year_filter is None:
            raise ValueError(
                "PARAMETERIZED mode requires filters!\n"
                "  Set 'cik_int' and 'year' in ml_config.yaml\n"
                "  Example: cik_int: [320193] or cik_int: [320193, 789019]\n"
                "  Example: year: [2022] or year: [2020, 2021, 2022]"
            )
        
        # Ensure filters are lists
        cik_list = cik_filter if isinstance(cik_filter, list) else [cik_filter]
        year_list = year_filter if isinstance(year_filter, list) else [year_filter]
        
        print(f"\n[Filtering: PARAMETERIZED MODE]")
        print(f"  CIKs: {cik_list}")
        print(f"  Years: {year_list}")
        
        # Apply filters
        df_filtered = df_meta.filter(
            pl.col('cik_int').is_in(cik_list) &
            pl.col('report_year').is_in(year_list)
        )
        
        if len(df_filtered) == 0:
            raise ValueError(f"No sentences found for cik={cik_list}, year={year_list}")
        
        # Show selected companies
        companies = df_filtered.select(['cik_int', 'name']).unique().sort('cik_int')
        print(f"  Companies selected:")
        for row in companies.iter_rows(named=True):
            print(f"    - {row['name']} (CIK: {row['cik_int']})")
        
        print(f"  Total sentences: {len(df_filtered):,}")
    
    else:
        raise ValueError(f"Unknown mode: {mode}")
    
    filtered_ids = df_filtered['sentenceID'].to_list()
    
    return df_filtered, filtered_ids





def generate_embeddings_batch(df_sentences, config):
    """
    Generate embeddings with intelligent batching and outlier handling
    
    Returns:
      - df_vectors: DataFrame with [sentenceID, embedding_id, embedding]
      - embedding_id: Unique ID for this embedding run
      - skipped_ids: List of sentenceIDs that were filtered out
    """
    
    bedrock = config.get_bedrock_client()
    model_id = config.bedrock_model_id
    input_type = config.bedrock_input_type
    dimensions = config.bedrock_dimensions
    
    print(f"\n[Generating Embeddings]")
    print(f"  Model: {model_id}")
    print(f"  Dimensions: {dimensions}")
    print(f"  Input sentences: {len(df_sentences):,}")
    
    # Pre-filter: Remove extreme outliers
    df_valid = df_sentences.filter(
        pl.col('sentence_token_count') <= MAX_TOKENS_PER_SENTENCE
    )
    
    df_skipped = df_sentences.filter(
        pl.col('sentence_token_count') > MAX_TOKENS_PER_SENTENCE
    )
    
    if len(df_skipped) > 0:
        skipped_pct = len(df_skipped) / len(df_sentences) * 100
        print(f"  ⚠️  Skipped {len(df_skipped):,} outliers (>{MAX_TOKENS_PER_SENTENCE} tokens, {skipped_pct:.2f}%)")
    
    print(f"  Embedding: {len(df_valid):,} sentences")
    
    # Prepare for batching
    all_embeddings = []
    all_sentence_ids = []
    sentences_data = df_valid.select(['sentenceID', 'sentence', 'sentence_token_count']).to_dicts()
    
    current_batch = []
    current_batch_tokens = 0
    batches_processed = 0
    total_tokens = 0
    # timing trackers
    t0 = time.perf_counter()        
    batch_times = []
    batch_log_interval = 40                

    # Token-aware batching
    for idx, row in enumerate(sentences_data):
        sent_id = row['sentenceID']
        sent_text = row['sentence']
        sent_tokens = row['sentence_token_count']
        
        # Check if batch is full
        would_exceed_tokens = (current_batch_tokens + sent_tokens) > MAX_TOKENS_PER_BATCH
        would_exceed_size = len(current_batch) >= MAX_TEXTS_PER_BATCH
        
        if would_exceed_tokens or would_exceed_size:
            # Process current batch
            if current_batch:
                b_start = time.perf_counter()
                batch_embeddings = _call_bedrock_api(
                    bedrock, model_id, current_batch, input_type, dimensions
                )
                batch_time = time.perf_counter() - b_start
                batch_times.append(batch_time)

                all_embeddings.extend(batch_embeddings)
                batches_processed += 1
                total_tokens += current_batch_tokens

                # Progress every n batches; ETA = avg time/batch × estimated batches remaining
                if batches_processed % batch_log_interval == 0:
                    avg = sum(batch_times) / len(batch_times)
                    avg_batch_size = len(all_embeddings) / batches_processed
                    eta = avg * (len(df_valid) - len(all_embeddings)) / avg_batch_size
                    print(f"    Batch Number: {batches_processed} | Progress: {len(all_embeddings):,}/{len(df_valid):,} "
                          f"({len(all_embeddings)/len(df_valid)*100:.1f}%) "
                          f"| last {batch_time:.2f}s | avg/batch {avg:.2f}s | ETA {eta:.0f}s")
            
            # Reset batch
            current_batch = []
            current_batch_tokens = 0
        
        # Add to batch
        current_batch.append({'id': sent_id, 'text': sent_text})
        current_batch_tokens += sent_tokens
        all_sentence_ids.append(sent_id)
    
    # Final batch
    if current_batch:
        b_start = time.perf_counter()
        batch_embeddings = _call_bedrock_api(
            bedrock, model_id, current_batch, input_type, dimensions
        )
        batch_time = time.perf_counter() - b_start
        batch_times.append(batch_time)

        all_embeddings.extend(batch_embeddings)
        batches_processed += 1
        total_tokens += current_batch_tokens

    elapsed = time.perf_counter() - t0
    print(f"  ✓ Completed: {len(all_embeddings):,} embeddings in {batches_processed} batches "
          f"| time {elapsed:.1f}s | avg/batch {elapsed/max(1,batches_processed):.2f}s")

    embedding_id = f"bedrock_cohere_v4_{dimensions}d_{datetime.now().strftime('%Y%m%d_%H%M')}"
    df_vectors = pl.DataFrame({
        'sentenceID': all_sentence_ids,
        'embedding_id': [embedding_id] * len(all_embeddings),
        'embedding': pl.Series(all_embeddings, dtype=pl.List(pl.Float32))
    })

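    # Estimated cost from the precomputed sentence_token_count; Bedrock's billed token count may differ
    cost = total_tokens / 1000 * config.get_cost_per_1k()
    print(f"  Tokens: {total_tokens:,} | Est. cost: ${cost:.4f}")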

    skipped_ids = df_skipped['sentenceID'].to_list() if len(df_skipped) > 0 else []
    return df_vectors, embedding_id, skipped_ids



def _call_bedrock_api(bedrock, model_id, batch, input_type, dimensions):
    """Single Bedrock API call (internal helper)"""
    
    texts = [item['text'] for item in batch]
    
    body = json.dumps({
        "texts": texts,
        "input_type": input_type,
        "embedding_types": ["float"],
        "output_dimension": dimensions,
        "max_tokens": 128000,      #: Per-input token budget
        "truncate": "RIGHT"        #: Safety for edge cases
    })
    
    response = bedrock.invoke_model(
        body=body,
        modelId=model_id,
        accept='*/*',
        contentType='application/json'
    )
    
    result = json.loads(response['body'].read())
    return result['embeddings']['float']


def merge_vectors_table(new_vectors, vectors_uri, storage_options):
    """Merge new vectors with existing (table exists from data prep)"""
    
    # Load existing (guaranteed to exist)
    existing_vectors = pl.read_parquet(vectors_uri, storage_options=storage_options)
    
    print(f"\n[Merging Vectors]")
    print(f"  Existing: {len(existing_vectors):,} rows")
    print(f"  New: {len(new_vectors):,} rows")
    
    # Ensure column order matches before concat
    new_vectors = new_vectors.select(existing_vectors.columns)

    # ETL pattern: concat + unique
    merged = pl.concat([existing_vectors, new_vectors])
    merged = merged.unique(subset=['sentenceID'], keep='last')
    
    duplicates = len(existing_vectors) + len(new_vectors) - len(merged)
    print(f"  ✓ Merged: {len(merged):,} rows (replaced {duplicates:,})")
    
    return merged


def update_meta_table(df_meta_full, filtered_ids, skipped_ids, model_info, vectors_s3_uri):
    """Update metadata columns for embedded sentences (ETL pattern)"""
    
    # Successfully embedded = filtered - skipped (set lookup avoids O(n × m) list scans)
    skipped_set = set(skipped_ids)
    embedded_ids = [sid for sid in filtered_ids if sid not in skipped_set]
    
    print(f"\n[Updating Meta Table]")
    print(f"  Total rows: {len(df_meta_full):,}")
    print(f"  Successfully embedded: {len(embedded_ids):,}")
    if skipped_ids:
        print(f"  Skipped (outliers): {len(skipped_ids):,}")
    
    # Create updated rows for embedded sentences
    df_updated_rows = df_meta_full.filter(
        pl.col('sentenceID').is_in(embedded_ids)
    ).with_columns([
        pl.lit(model_info['embedding_id']).alias('embedding_id'),
        pl.lit(model_info['model']).alias('embedding_model'),
        pl.lit(model_info['dimensions']).cast(pl.Int16).alias('embedding_dims'),  # cast to Int16
        pl.lit(model_info['timestamp']).alias('embedding_date'),
        pl.lit(vectors_s3_uri).alias('embedding_ref')
    ])
    
    # Rows not embedded (keep unchanged)
    df_unchanged_rows = df_meta_full.filter(
        ~pl.col('sentenceID').is_in(embedded_ids)
    )
    
    # ETL pattern: concat + unique
    merged_meta = pl.concat([df_unchanged_rows, df_updated_rows])
    merged_meta = merged_meta.unique(subset=['sentenceID'], keep='last')
    
    # Verify row count
    assert len(merged_meta) == len(df_meta_full), "Row count mismatch after meta update!"
    
    total_embedded = merged_meta.filter(pl.col('embedding_id').is_not_null()).shape[0]
    print(f"  ✓ Updated: {total_embedded:,} total rows now have embeddings")
    
    return merged_meta


# ============================================================================
# INITIALIZATION & PROVIDER RESOLUTION
# ============================================================================

config = MLConfig()

# Resolve paths once at start
VECTORS_S3_KEY = config.embeddings_path(provider=None)
META_S3_KEY = config.meta_embeds_path
VECTORS_URI = f"s3://{config.bucket}/{VECTORS_S3_KEY}"
META_URI = f"s3://{config.bucket}/{META_S3_KEY}"

print("="*70)
print("EMBEDDING GENERATION PIPELINE")
print("="*70)
print(f"Mode: {config.embedding_mode}")
print(f"Model: {config.bedrock_model_id} ({config.bedrock_dimensions}d)")
print(f"\n[Resolved Paths]")
print(f"  Vectors: {VECTORS_S3_KEY}")
print(f"  Meta: {META_S3_KEY}")

# ============================================================================
# STEP 1: LOAD META TABLE
# ============================================================================

df_meta = load_meta_table_with_cache(config)

# ============================================================================
# STEP 2: FILTER SENTENCES
# ============================================================================

df_filtered, filtered_ids = filter_sentences(df_meta, config)

# ============================================================================
# STEP 3: GENERATE EMBEDDINGS
# ============================================================================

df_new_vectors, embedding_id, skipped_ids = generate_embeddings_batch(df_filtered, config)

# ============================================================================
# STEP 4: MERGE VECTORS TABLE
# ============================================================================

df_merged_vectors = merge_vectors_table(
    new_vectors=df_new_vectors,
    vectors_uri=VECTORS_URI,
    storage_options=config.get_storage_options()
)

# ============================================================================
# STEP 5: UPDATE META TABLE
# ============================================================================

model_info = {
    'embedding_id': embedding_id,
    'model': config.bedrock_model_id,
    'dimensions': config.bedrock_dimensions,
    'timestamp': datetime.now()
}

df_updated_meta = update_meta_table(
    df_meta_full=df_meta,
    filtered_ids=filtered_ids,
    skipped_ids=skipped_ids,
    model_info=model_info, 
    vectors_s3_uri=VECTORS_URI
)

# ============================================================================
# STEP 6: SAVE TO S3
# S3 is source of truth, local cache for fast iteration
# ============================================================================

print(f"\n[Saving Results]")

# Derive filenames/folders from resolved S3 keys
vectors_filename = Path(VECTORS_S3_KEY).name                      # e.g., finrag_embeddings_cohere_1024d.parquet
vectors_provider = Path(VECTORS_S3_KEY).parent.name               # e.g., cohere_1024d
meta_filename    = Path(META_S3_KEY).name                         # e.g., finrag_fact_sentences_meta_embeds.parquet

# Canonical local cache locations (align with prep/cache cells)
meta_cache_dir    = Path.cwd().parent / 'data_cache' / 'meta_embeds'
vectors_cache_dir = Path.cwd().parent / 'data_cache' / 'embeddings' / vectors_provider

meta_cache_dir.mkdir(parents=True, exist_ok=True)
vectors_cache_dir.mkdir(parents=True, exist_ok=True)

meta_cache_path    = meta_cache_dir / meta_filename
vectors_cache_path = vectors_cache_dir / vectors_filename

# Save vectors to S3
print(f"  Vectors → S3: {VECTORS_URI}")
df_merged_vectors.write_parquet(
    VECTORS_URI,
    storage_options=config.get_storage_options(),
    compression='zstd'
)
print(f"  ✓ S3 saved: {len(df_merged_vectors):,} rows (Cost: $0.00 - ingress)")

# Save vectors to local cache (provider-scoped folder)
print(f"  Vectors → Local: {vectors_cache_path}")
df_merged_vectors.write_parquet(vectors_cache_path, compression='zstd')
print(f"  ✓ Cached locally")

# Save meta to S3
print(f"  Meta → S3: {META_URI}")
df_updated_meta.write_parquet(
    META_URI,
    storage_options=config.get_storage_options(),
    compression='zstd'
)
print(f"  ✓ S3 saved: {len(df_updated_meta):,} rows (Cost: $0.00 - ingress)")

# Save meta to local cache
print(f"  Meta → Local: {meta_cache_path}")
df_updated_meta.write_parquet(meta_cache_path, compression='zstd')
print(f"  ✓ Cached locally")

print(f"\n  Local cache locations:")
print(f"    - {vectors_cache_path}")
print(f"    - {meta_cache_path}")


# ============================================================================
# SUMMARY
# ============================================================================

print(f"\n{'='*70}")
print(f"✓ EMBEDDING PIPELINE COMPLETE")
print(f"{'='*70}")
print(f"  Mode: {config.embedding_mode}")
print(f"  Sentences embedded: {len(df_new_vectors):,}")
if skipped_ids:
    print(f"  Sentences skipped: {len(skipped_ids):,}")
print(f"  Total vectors in storage: {len(df_merged_vectors):,}")
print(f"  Total meta rows with embeddings: {df_updated_meta.filter(pl.col('embedding_id').is_not_null()).shape[0]:,}")
print("="*70)


[DEBUG] ✓ AWS credentials loaded from aws_credentials.env
EMBEDDING GENERATION PIPELINE
Mode: parameterized
Model: cohere.embed-v4:0 (1024d)

[Resolved Paths]
  Vectors: ML_EMBED_ASSETS/EMBED_VECTORS/cohere_1024d/finrag_embeddings_cohere_1024d.parquet
  Meta: ML_EMBED_ASSETS/EMBED_META_FACT/finrag_fact_sentences_meta_embeds.parquet
✓ Using cached meta table
  Loaded: 469,252 rows × 34 columns (Cost: $0.00)

[Filtering: PARAMETERIZED MODE]
  CIKs: [1276520, 1318605, 1326801, 1341439, 1403161, 1652044]
  Years: [2015, 2016, 2017, 2018, 2019, 2020]
  Companies selected:
    - GENWORTH FINANCIAL INC (CIK: 1276520)
    - Tesla, Inc. (CIK: 1318605)
    - Meta Platforms, Inc. (CIK: 1326801)
    - Facebook Inc (CIK: 1326801)
    - ORACLE CORP (CIK: 1341439)
    - VISA INC. (CIK: 1403161)
    - Alphabet Inc. (CIK: 1652044)
  Total sentences: 69,512

[Generating Embeddings]
  Model: cohere.embed-v4:0
  Dimensions: 1024
  Input sentences: 69,512
  ⚠️  Skipped 5 outliers (>1000 tokens, 0.01%)
  Em
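
The flush rule from the `COHERE SPECIFIC` note (96 texts OR the token budget, whichever comes first) can be pulled out of `generate_embeddings_batch` into a pure generator, which makes it testable without Bedrock credentials. A minimal sketch: `pack_batches` is a hypothetical helper, not part of the pipeline, and it assumes each row carries the precomputed `sentence_token_count` (sentences above `MAX_TOKENS_PER_SENTENCE` are already filtered out in the real cell).

```python
from typing import Iterator

# Limits mirroring MAX_TEXTS_PER_BATCH / MAX_TOKENS_PER_BATCH above (assumed values)
MAX_TEXTS = 96
MAX_TOKENS = 128_000


def pack_batches(
    rows: list[dict], max_texts: int = MAX_TEXTS, max_tokens: int = MAX_TOKENS
) -> Iterator[list[dict]]:
    """Greedily pack rows into batches, flushing when EITHER limit would be hit.

    Each row needs a 'sentence_token_count'. Input order is preserved, and a
    single row larger than max_tokens ends up alone in its own batch.
    """
    batch: list[dict] = []
    batch_tokens = 0
    for row in rows:
        tokens = row["sentence_token_count"]
        full = len(batch) >= max_texts or batch_tokens + tokens > max_tokens
        if batch and full:
            yield batch
            batch, batch_tokens = [], 0
        batch.append(row)
        batch_tokens += tokens
    if batch:
        yield batch


rows = [{"sentenceID": f"s{i}", "sentence_token_count": 10} for i in range(200)]
print([len(b) for b in pack_batches(rows)])  # size cap: 96 + 96 + 8
```

With short sentences the 96-text cap dominates; the token budget only kicks in for batches of long sentences.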

### Order Preservation Proof: Important.
```

# Building batch (order maintained)
current_batch = []  # List preserves insertion order
for idx, row in enumerate(sentences_data):
    sent_id, sent_text = row['sentenceID'], row['sentence']
    current_batch.append({'id': sent_id, 'text': sent_text})
    all_sentence_ids.append(sent_id)  # Same order as batch

# API call
texts = [item['text'] for item in current_batch]  # Preserves order
batch_embeddings = _call_bedrock_api(...)  # One embedding per text, in input order

# Collection
all_embeddings.extend(batch_embeddings)  # Appends in order


df_vectors = pl.DataFrame({
    'sentenceID': all_sentence_ids,  # [id_0, id_1, id_2, ...]
    'embedding': all_embeddings       # [emb_0, emb_1, emb_2, ...]
})

# Row 0: sentenceID[0] → embedding[0]
# Row 1: sentenceID[1] → embedding[1]
# 1-to-1 mapping, provided every call returns exactly len(texts) embeddings

```
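
The proof rests on one assumption: each API call returns exactly `len(texts)` embeddings, in input order. A per-batch length check turns a violation into an immediate error that names the offending batch, instead of a shifted ID → vector mapping. A minimal runnable sketch: `embed_in_batches` and `stub_embed` are hypothetical stand-ins for `generate_embeddings_batch` and `_call_bedrock_api`, and the batch size of 3 is arbitrary.

```python
from typing import Callable

Vector = list[float]


def embed_in_batches(
    rows: list[dict],
    embed_fn: Callable[[list[str]], list[Vector]],
    batch_size: int = 3,
) -> tuple[list[str], list[Vector]]:
    """Embed rows batch by batch, keeping IDs and vectors aligned by position."""
    ids: list[str] = []
    vectors: list[Vector] = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        texts = [r["text"] for r in batch]       # list comprehension keeps input order
        result = embed_fn(texts)
        if len(result) != len(batch):            # guard the 1-to-1 assumption per batch
            raise RuntimeError(
                f"batch at offset {start}: sent {len(batch)} texts, got {len(result)} vectors"
            )
        ids.extend(r["id"] for r in batch)
        vectors.extend(result)
    return ids, vectors


def stub_embed(texts: list[str]) -> list[Vector]:
    """Stand-in for the Bedrock call: the 'vector' encodes the text length."""
    return [[float(len(t))] for t in texts]


rows = [{"id": f"s{i}", "text": "x" * (i + 1)} for i in range(7)]
ids, vectors = embed_in_batches(rows, stub_embed)
for sid, vec in zip(ids, vectors):
    print(sid, vec)  # each vector encodes the length of its own sentence
```

Because the stub's output is derived from each text, any reordering or dropped item would show up as a vector that no longer matches its ID.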

### True cost analysis:
```
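1. Polars: Serialize 469,252 rows → 281MB Parquet file (in memory; Polars' native writer, PyArrow is not used by default)
2. Upload: PUT request (or multipart upload) to S3
3. S3: Receives 281MB upload
4. S3: Atomically replaces old object (readers see either the old or the new file, never a partial one)
5. Old version: Deleted (or kept as a noncurrent version, still billed as storage, if versioning is enabled)

Network transfer: 281MB upload (ingress = $0.00)
S3 requests: PutObject ≈ $0.005 per 1,000 requests (S3 Standard) → effectively $0.00
Storage: 281MB ≈ 0.274GB × $0.023/GB/month ≈ $0.006/month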
```
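
The same arithmetic, plus the download-side egress estimate that `load_meta_table_with_cache` prints, fits in a few lines. A sketch under stated assumptions: the prices are the ones this notebook already uses ($0.023/GB-month storage, $0.09/GB egress) plus the S3 Standard PUT list price of $0.005 per 1,000 requests; free-tier allowances and volume tiers are ignored, and `s3_costs` is a hypothetical helper.

```python
MB = 1024 ** 2
GB = 1024 ** 3  # binary units, matching the notebook's MB / 1024 conversions

# Assumed list prices (S3 Standard, us-east-1, first tiers, no free tier)
STORAGE_PER_GB_MONTH = 0.023
EGRESS_PER_GB = 0.09
PUT_PER_1000_REQUESTS = 0.005


def s3_costs(size_bytes: int, put_requests: int = 1) -> dict[str, float]:
    """Rough USD costs for writing, storing, and downloading one object."""
    size_gb = size_bytes / GB
    return {
        "upload_transfer": 0.0,  # data transfer into S3 is free
        "put_requests": put_requests / 1000 * PUT_PER_1000_REQUESTS,
        "storage_per_month": size_gb * STORAGE_PER_GB_MONTH,
        "download_egress": size_gb * EGRESS_PER_GB,  # cost of a non-cached download
    }


for item, usd in s3_costs(281 * MB).items():
    print(f"{item:>18}: ${usd:.6f}")
```

`put_requests` can be raised to account for multipart uploads, which send one request per part.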