# 🎯 Stage Testing Framework Demo: Stages 1-4 (Competitive Intelligence)

**Demo Purpose**: Emulate the stage testing framework by running Stages 1-4 one cell at a time, each stage consuming the previous stage's output:
- **Stage 1**: Discovery Engine (Competitor candidates)
- **Stage 2**: AI Curation (Validated competitors)  
- **Stage 3**: Meta Activity Ranking (Active competitors)
- **Stage 4**: Ad Ingestion (Raw ads with media)

**Key Features**:
- ✅ **Preserves the existing `ads_with_dates` table** (496 ads from earlier successful runs, at the time of writing)
- ✅ **Accumulation-friendly** - builds corpus progressively
- ✅ **Individual stage execution** with full traceability
- ✅ **No clean-persistent flag** to maintain existing data

## 📋 Setup & Imports

In [None]:
import os
import sys
import json
import time
from datetime import datetime
from pathlib import Path

# Make the project's `src` package importable by putting the current working
# directory first on sys.path. NOTE(review): this assumes the notebook is
# launched from the repository root — confirm if running from elsewhere.
# This must run before the `src.*` imports below.
project_root = Path.cwd()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Shared pipeline context plus the four stage classes exercised in this notebook.
from src.pipeline.core.base import PipelineContext
from src.pipeline.stages.discovery import DiscoveryStage
from src.pipeline.stages.curation import CurationStage
from src.pipeline.stages.ranking import RankingStage
from src.pipeline.stages.ingestion import IngestionStage

print("✅ All imports successful!")
print(f"📁 Working directory: {project_root}")

## ⚙️ Configuration & Context

In [None]:
# Run configuration: the target brand, its market vertical, and a
# timestamped run id so each execution of the notebook is distinguishable.
BRAND, VERTICAL = "Warby Parker", "eyewear"
TEST_ID = "notebook_demo_" + datetime.now().strftime("%Y%m%d_%H%M%S")

# Shared pipeline context for every stage below. No clean-persistent option
# is passed, which keeps existing data in place.
context = PipelineContext(BRAND, VERTICAL, TEST_ID, verbose=True)

for summary_line in (
    f"🎯 Brand: {BRAND}",
    f"🏷️ Vertical: {VERTICAL}",
    f"🆔 Test ID: {TEST_ID}",
    "📊 Context created - preserving existing ads_with_dates",
):
    print(summary_line)

## 🔍 Current Corpus Status

Check the existing `ads_with_dates` table to record a baseline before running any stages.

In [None]:
# Check current corpus status and record `initial_corpus_size`, the baseline
# the final summary uses to compute how many ads this run added.
#
# The query is defined outside the try so that it exists even when the
# BigQuery client import fails (the final-status cell reuses it).
status_query = """
    SELECT 
        COUNT(*) as total_ads,
        COUNT(CASE WHEN media_storage_path IS NOT NULL THEN 1 END) as ads_with_media,
        COUNT(DISTINCT brand) as unique_brands,
        STRING_AGG(DISTINCT brand ORDER BY brand) as brands
    FROM `bigquery-ai-kaggle-469620.ads_demo.ads_with_dates`
    """

initial_corpus_size = 0  # default when the table is empty or unreachable

try:
    from src.utils.bigquery_client import run_query

    result = run_query(status_query)
    # An aggregate query with no GROUP BY always returns exactly one row, so a
    # zero count (not just an empty frame) also means "no existing data".
    if result.empty or int(result.iloc[0]['total_ads']) == 0:
        print("📊 CURRENT CORPUS STATUS: No existing data")
    else:
        row = result.iloc[0]
        total_ads = int(row['total_ads'])
        ads_with_media = int(row['ads_with_media'])
        print("📊 CURRENT CORPUS STATUS:")
        print(f"   Total ads: {total_ads}")
        print(f"   Ads with media: {ads_with_media}")
        print(f"   Unique brands: {row['unique_brands']}")
        print(f"   Brands: {row['brands']}")
        # total_ads > 0 is guaranteed on this branch, so the division is safe.
        print(f"   Media coverage: {ads_with_media / total_ads * 100:.1f}%")

        initial_corpus_size = total_ads

except Exception as e:
    # Best-effort: the stages can still run without a baseline.
    print(f"⚠️ Could not check corpus status: {e}")
    initial_corpus_size = 0

## 🚀 Stage 1: Discovery Engine

**Purpose**: Find competitor candidates using algorithmic competitor discovery

**Output**: ~400-500 raw competitor candidates for AI validation

In [None]:
print("🔍 === STAGE 1: DISCOVERY ENGINE ===")
print(f"Target brand: {BRAND}")
print(f"Vertical: {VERTICAL}")

# Reset first so a failure below can never leave a stale result from an
# earlier run of this cell for Stage 2 to pick up.
stage1_results = None

try:
    # Construct inside the try so constructor errors are reported like
    # execution errors instead of aborting the cell.
    discovery_stage = DiscoveryStage(context, dry_run=False, verbose=True)

    # perf_counter is monotonic, so the duration is unaffected by clock changes.
    start_time = time.perf_counter()

    print("\n🚀 Executing competitor discovery...")
    discovery_results = discovery_stage.execute(None)  # Discovery takes no upstream input

    duration = time.perf_counter() - start_time

    print(f"\n✅ Stage 1 Complete in {duration:.1f}s!")
    print(f"📊 Found {discovery_results.total_candidates} competitor candidates")
    print("🎯 Ready for Stage 2 (AI Curation)")

    # Hand the candidates to Stage 2.
    stage1_results = discovery_results

except Exception as e:
    import traceback

    # stage1_results is still None here; report and let later cells see the failure.
    print(f"❌ Stage 1 Failed: {e}")
    traceback.print_exc()

## 🤖 Stage 2: AI Competitor Curation

**Purpose**: Use AI consensus to validate and curate high-quality competitors

**Input**: ~400-500 raw candidates from Stage 1

**Output**: ~7 validated, high-quality competitors

In [None]:
print("🤖 === STAGE 2: AI COMPETITOR CURATION ===")

# Bind the result up front. The original only assigned it inside the else
# branch, so a failed Stage 1 left it undefined (NameError in Stage 3) or
# stale from a previous run.
stage2_results = None

if stage1_results is None:
    print("❌ Cannot proceed - Stage 1 failed")
else:
    print(f"📥 Input: {stage1_results.total_candidates} candidates from Stage 1")

    try:
        # Construct inside the try so constructor errors are reported like
        # execution errors instead of aborting the cell.
        curation_stage = CurationStage(context, dry_run=False, verbose=True)

        # perf_counter is monotonic, so the duration is unaffected by clock changes.
        start_time = time.perf_counter()

        print("\n🧠 Executing AI competitor validation...")
        curation_results = curation_stage.execute(stage1_results)

        duration = time.perf_counter() - start_time

        print(f"\n✅ Stage 2 Complete in {duration:.1f}s!")
        print(f"📊 Curated {curation_results.curated_count} high-quality competitors")
        print("🎯 Ready for Stage 3 (Meta Activity Ranking)")

        # Hand the curated competitors to Stage 3.
        stage2_results = curation_results

    except Exception as e:
        import traceback

        # stage2_results is still None here; report and let later cells see the failure.
        print(f"❌ Stage 2 Failed: {e}")
        traceback.print_exc()

## 📊 Stage 3: Meta Ad Activity Ranking

**Purpose**: Rank competitors by their Meta advertising activity and budget

**Input**: ~7 validated competitors from Stage 2

**Output**: ~4 Meta-active competitors ranked by advertising volume

In [None]:
print("📊 === STAGE 3: META AD ACTIVITY RANKING ===")

# Bind the result up front. The original only assigned it inside the else
# branch, so a failed Stage 2 left it undefined (NameError in Stage 4) or
# stale from a previous run.
stage3_results = None

if stage2_results is None:
    print("❌ Cannot proceed - Stage 2 failed")
else:
    print(f"📥 Input: {stage2_results.curated_count} curated competitors from Stage 2")

    try:
        # Construct inside the try so constructor errors are reported like
        # execution errors instead of aborting the cell.
        ranking_stage = RankingStage(context, dry_run=False, verbose=True)

        # perf_counter is monotonic, so the duration is unaffected by clock changes.
        start_time = time.perf_counter()

        print("\n📈 Executing Meta advertising activity analysis...")
        ranking_results = ranking_stage.execute(stage2_results)

        duration = time.perf_counter() - start_time

        print(f"\n✅ Stage 3 Complete in {duration:.1f}s!")
        print(f"📊 Ranked {ranking_results.ranked_count} Meta-active competitors")
        print("🎯 Ready for Stage 4 (Ad Ingestion)")

        # Hand the ranked competitors to Stage 4.
        stage3_results = ranking_results

    except Exception as e:
        import traceback

        # stage3_results is still None here; report and let later cells see the failure.
        print(f"❌ Stage 3 Failed: {e}")
        traceback.print_exc()

## 📦 Stage 4: Ad Ingestion

**Purpose**: Fetch and store actual ads with media from Meta Ad Library

**Input**: ~4 Meta-active competitors from Stage 3

**Output**: Raw ads stored in BigQuery with media files in GCS

In [None]:
print("📦 === STAGE 4: AD INGESTION ===")

# Bind the result up front. The original only assigned it inside the else
# branch, so a failed Stage 3 left it undefined (NameError in the summary)
# or stale from a previous run.
stage4_results = None

if stage3_results is None:
    print("❌ Cannot proceed - Stage 3 failed")
else:
    print(f"📥 Input: {stage3_results.ranked_count} Meta-active competitors from Stage 3")

    try:
        # Construct inside the try so constructor errors are reported like
        # execution errors instead of aborting the cell.
        ingestion_stage = IngestionStage(context, dry_run=False, verbose=True)

        # perf_counter is monotonic, so the duration is unaffected by clock changes.
        start_time = time.perf_counter()

        print("\n📡 Executing Meta Ad Library ingestion...")
        ingestion_results = ingestion_stage.execute(stage3_results)

        duration = time.perf_counter() - start_time

        print(f"\n✅ Stage 4 Complete in {duration:.1f}s!")
        print(f"📊 Ingested {ingestion_results.total_ads} raw ads")
        print(f"💾 Stored in BigQuery table: {ingestion_results.ads_table_id}")
        print("🎯 Stages 1-4 Complete - Ready for Stage 5 (Strategic Labeling)")

        # Keep the ingestion result for the final summary cell.
        stage4_results = ingestion_results

    except Exception as e:
        import traceback

        # stage4_results is still None here; report and let the summary show the failure.
        print(f"❌ Stage 4 Failed: {e}")
        traceback.print_exc()

## 📈 Final Status & Next Steps

In [None]:
print("📈 === FINAL STAGE TESTING RESULTS ===")
print()

# Look results up defensively. A stage cell that was skipped or never reached
# leaves its variable undefined; that should read as "not completed" rather
# than crash the summary with a NameError.
stage1_results = globals().get("stage1_results")
stage2_results = globals().get("stage2_results")
stage3_results = globals().get("stage3_results")
stage4_results = globals().get("stage4_results")

# (label, result object, count attribute on the result, description)
stage_summary = (
    ("Stage 1 (Discovery)", stage1_results, "total_candidates", "competitor candidates found"),
    ("Stage 2 (Curation)", stage2_results, "curated_count", "competitors validated"),
    ("Stage 3 (Ranking)", stage3_results, "ranked_count", "Meta-active competitors ranked"),
    ("Stage 4 (Ingestion)", stage4_results, "total_ads", "ads ingested"),
)

print("🎯 STAGE COMPLETION SUMMARY:")
for label, results, count_attr, description in stage_summary:
    # The stage cells use None as the failure sentinel. Compare against it
    # explicitly instead of relying on the result object's truthiness.
    completed = results is not None
    print(f"   {label}: {'✅ Complete' if completed else '❌ Failed'}")
    if completed:
        print(f"     └── {getattr(results, count_attr)} {description}")

print()

# Re-run the baseline query only if ingestion actually happened.
if stage4_results is not None:
    try:
        final_result = run_query(status_query)
        if not final_result.empty:
            row = final_result.iloc[0]
            final_corpus_size = int(row['total_ads'])
            ads_with_media = int(row['ads_with_media'])
            new_ads = final_corpus_size - int(initial_corpus_size)
            # Guard the ratio: an empty table would otherwise divide by zero.
            coverage = (
                f"{ads_with_media / final_corpus_size * 100:.1f}%"
                if final_corpus_size
                else "n/a"
            )

            print("📊 FINAL CORPUS STATUS:")
            # `+d` renders the sign correctly for negative deltas (no "+-N").
            print(f"   Total ads: {final_corpus_size} ({new_ads:+d} new)")
            print(f"   Ads with media: {ads_with_media}")
            print(f"   Unique brands: {row['unique_brands']}")
            print(f"   Media coverage: {coverage}")
            print()

    except Exception as e:
        # Best-effort: a failed status query should not hide the summary above.
        print(f"⚠️ Could not check final status: {e}")

print("🚀 NEXT STEPS:")
if stage4_results is not None:
    print("   ✅ Stages 1-4 completed successfully")
    print("   🎯 Ready to proceed to Stage 5 (Strategic Labeling)")
    print("   📊 Corpus successfully expanded with new competitive intelligence")
    print("   💾 All data preserved and accumulated properly")
else:
    print("   ⚠️ Some stages failed - check error messages above")
    print("   🔧 Debug individual stages and retry")

print("\n🎉 Stage Testing Framework Demo Complete!")