## 1. Setup and Imports

In [1]:
import pandas as pd
import numpy as np
import os
import sys
import asyncio
import requests
import time
import logging
from datetime import datetime, timezone, date
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Add project paths for imports (notebooks folder)
sys.path.append('/workspaces/mtgecorec')
sys.path.append('/workspaces/mtgecorec/core')

# Import the simple core database driver
from core.data_engine.cosmos_driver import get_mongo_client, get_collection

print("Libraries imported successfully!")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Using core cosmos_driver system")
print(f"Notebook location: /workspaces/mtgecorec/notebooks/")

Libraries imported successfully!
Pandas version: 2.3.3
NumPy version: 2.3.5
Using core cosmos_driver system
Notebook location: /workspaces/mtgecorec/notebooks/


## 2. Database Connection

In [13]:
# Set up database connection with retry logic
def connect_to_database_with_retry(max_retries=3):
    """Connect to database with retry logic for connection issues"""
    for attempt in range(max_retries):
        try:
            print(f"üîÑ Connection attempt {attempt + 1}/{max_retries}...")
            
            # Get fresh MongoDB client
            client = get_mongo_client()
            database_name = "mtgecorec" 
            
            # Test the connection by getting collection info
            cards_collection = get_collection(client, database_name, "cards")
            pricing_collection = get_collection(client, database_name, "card_pricing_daily")
            
            # Test with a simple query
            cards_count = cards_collection.count_documents({}, limit=1)
            
            print(f"‚úÖ Connected to database successfully!")
            print(f"Database: {database_name}")
            print(f"Connection test: Found {cards_count:,}+ cards")
            
            return client, database_name, cards_collection, pricing_collection
            
        except Exception as e:
            print(f"‚ùå Connection attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"‚è≥ Waiting 2 seconds before retry...")
                time.sleep(2)
            else:
                print(f"üí• All connection attempts failed!")
                raise

# Connect to database
client, database_name, cards_collection, pricing_collection = connect_to_database_with_retry()
print(f"Ready to load card data...")

üîÑ Connection attempt 1/3...
‚úÖ Connected to database successfully!
Database: mtgecorec
Connection test: Found 1+ cards
Ready to load card data...


## 3. Get Data

### Get card data

In [17]:
# Load cards data efficiently - just get counts and sample for now
def load_cards_efficiently():
    """Load card data efficiently for testing without loading all 110K cards"""
    global client, cards_collection, pricing_collection
    
    try:
        print("Getting card database statistics...")
        
        # Get total count first
        total_cards = cards_collection.count_documents({})
        print(f"üìä Found {total_cards:,} total cards in database")
        
        # Get a sample of cards for structure analysis
        print("üì• Loading sample cards for analysis...")
        sample_cards = list(cards_collection.find({}).limit(100))
        print(f"‚úÖ Successfully loaded {len(sample_cards)} sample cards")
        
        # Create DataFrame from sample for structure analysis
        df_sample = pd.DataFrame(sample_cards)
        
        return df_sample, total_cards
        
    except Exception as e:
        print(f"‚ùå Error loading cards: {e}")
        print("üîÑ Attempting to reconnect...")
        
        # Try to reconnect
        client, _, cards_collection, pricing_collection = connect_to_database_with_retry()
        
        # Retry with sample
        total_cards = cards_collection.count_documents({})
        sample_cards = list(cards_collection.find({}).limit(100))
        df_sample = pd.DataFrame(sample_cards)
        print(f"‚úÖ Successfully loaded sample after reconnection")
        return df_sample, total_cards

# Load the cards efficiently
df_cards, total_card_count = load_cards_efficiently()

if len(df_cards) > 0:
    print(f"\nSample DataFrame shape: {df_cards.shape}")
    print(f"Total cards in database: {total_card_count:,}")
    print(f"Sample columns: {list(df_cards.columns)}")
    
    # Show sample data
    print("\n=== SAMPLE CARDS DATA ===")
    display_cols = ['id', 'name', 'type_line'] if all(col in df_cards.columns for col in ['id', 'name', 'type_line']) else df_cards.columns[:3]
    print(df_cards[display_cols].head(3))
else:
    print("‚ö†Ô∏è  No card data loaded")

Getting card database statistics...
üìä Found 110,031 total cards in database
üì• Loading sample cards for analysis...
‚úÖ Successfully loaded 100 sample cards

Sample DataFrame shape: (100, 80)
Total cards in database: 110,031
Sample columns: ['_id', 'object', 'id', 'oracle_id', 'multiverse_ids', 'mtgo_id', 'arena_id', 'tcgplayer_id', 'cardmarket_id', 'name', 'lang', 'released_at', 'uri', 'scryfall_uri', 'layout', 'highres_image', 'image_status', 'image_uris', 'mana_cost', 'cmc', 'type_line', 'oracle_text', 'colors', 'color_identity', 'keywords', 'produced_mana', 'legalities', 'games', 'reserved', 'game_changer', 'foil', 'nonfoil', 'finishes', 'oversized', 'promo', 'reprint', 'variation', 'set_id', 'set', 'set_name', 'set_type', 'set_uri', 'set_search_uri', 'scryfall_set_uri', 'rulings_uri', 'prints_search_uri', 'collector_number', 'digital', 'rarity', 'card_back_id', 'artist', 'artist_ids', 'illustration_id', 'border_color', 'frame', 'full_art', 'textless', 'booster', 'story_spotli

### Get pricing data

In [18]:
# Load pricing data  
print("Loading pricing data from database...")
pricing_collection = get_collection(client, database_name, "card_pricing_daily")
all_pricing = list(pricing_collection.find({}))
print(f"‚úÖ Loaded {len(all_pricing)} pricing records from 'card_pricing_daily' collection")

df_pricing = pd.DataFrame(all_pricing)
print(f"Pricing DataFrame shape: {df_pricing.shape}")
print(f"Pricing columns: {list(df_pricing.columns)}")

print("\n=== SAMPLE PRICING DATA ===")
print(df_pricing.head(3))

Loading pricing data from database...
‚úÖ Loaded 792 pricing records from 'card_pricing_daily' collection
Pricing DataFrame shape: (792, 14)
Pricing columns: ['_id', 'card_uuid', 'card_name', 'set_code', 'scryfall_id', 'price_usd', 'price_type', 'source', 'tcgplayer_id', 'cardmarket_id', 'date', 'timestamp', 'created_at', 'collected_at']

=== SAMPLE PRICING DATA ===
                        _id                 card_uuid  \
0  693c9a8390c74c6f72124891  68d04c8859fb4c414fdabc7e   
1  693c9a8390c74c6f72124892  68d04c8859fb4c414fdabc7e   
2  693c9a8390c74c6f72124893  68d04c9e59fb4c414fdabe5e   

                   card_name set_code                           scryfall_id  \
0              Jolly Gerbils      blb  0eab51d6-ba17-4a8c-8834-25db363f2b6b   
1              Jolly Gerbils      blb  0eab51d6-ba17-4a8c-8834-25db363f2b6b   
2  Beza, the Bounding Spring      blb  0fc98b72-d268-4ce5-93b4-57c812a24eff   

   price_usd price_type             source  tcgplayer_id  cardmarket_id  \
0       0.

## 4. Current Pricing Coverage Analysis

In [19]:
# Analyze current pricing coverage
print("=== PRICING COVERAGE ANALYSIS ===")
print(f"Total cards: {len(df_cards):,}")
print(f"Total pricing records: {len(df_pricing):,}")

# Check for pricing data linkage
if 'scryfall_id' in df_pricing.columns:
    unique_cards_with_pricing = df_pricing['scryfall_id'].nunique()
    print(f"Unique cards with pricing (by scryfall_id): {unique_cards_with_pricing:,}")
    
    # Calculate coverage
    cards_without_pricing = len(df_cards) - unique_cards_with_pricing
    coverage_pct = (unique_cards_with_pricing / len(df_cards)) * 100
    
    print(f"Cards WITHOUT pricing: {cards_without_pricing:,}")
    print(f"Coverage: {coverage_pct:.2f}%")
    
    # Pricing by date analysis
    if 'date' in df_pricing.columns:
        pricing_by_date = df_pricing.groupby('date').size().sort_index(ascending=False)
        print(f"\nüìÖ Pricing records by date:")
        for date, count in pricing_by_date.head().items():
            print(f"   {date}: {count:,} records")
    
    # Price type analysis  
    if 'price_type' in df_pricing.columns:
        price_type_counts = df_pricing['price_type'].value_counts()
        print(f"\nüí∞ Price type distribution:")
        for price_type, count in price_type_counts.items():
            print(f"   {price_type}: {count:,} records")
else:
    print("‚ö†Ô∏è  No 'scryfall_id' column found in pricing data")
    print("Available pricing columns:", list(df_pricing.columns))

# Sample card analysis
if len(df_pricing) > 0:
    print(f"\nüìã Sample pricing records structure:")
    sample_pricing = df_pricing.iloc[0]
    for key, value in sample_pricing.items():
        print(f"   {key}: {value} ({type(value).__name__})")

=== PRICING COVERAGE ANALYSIS ===
Total cards: 100
Total pricing records: 792
Unique cards with pricing (by scryfall_id): 397
Cards WITHOUT pricing: -297
Coverage: 397.00%

üìÖ Pricing records by date:
   2025-12-13: 25 records
   2025-12-12: 767 records

üí∞ Price type distribution:
   usd: 405 records
   usd_foil: 387 records

üìã Sample pricing records structure:
   _id: 693c9a8390c74c6f72124891 (ObjectId)
   card_uuid: 68d04c8859fb4c414fdabc7e (str)
   card_name: Jolly Gerbils (str)
   set_code: blb (str)
   scryfall_id: 0eab51d6-ba17-4a8c-8834-25db363f2b6b (str)
   price_usd: 0.03 (float64)
   price_type: usd (str)
   source: scryfall_usd (str)
   tcgplayer_id: 559994 (int64)
   cardmarket_id: 779260.0 (float64)
   date: 2025-12-12 (str)
   timestamp: 2025-12-12T22:43:07.906849+00:00 (str)
   created_at: 2025-12-12 22:43:07.906000 (Timestamp)
   collected_at: NaT (NaTType)


## 5. Bulk Pricing Collection System

In [22]:
class ScryfallBulkCollector:
    """
    Efficient bulk pricing collection using Scryfall's /cards/collection endpoint
    Handles up to 75 cards per request with comprehensive price data extraction
    """
    
    def __init__(self, rate_limit_delay=0.1):
        self.base_url = "https://api.scryfall.com/cards/collection"
        self.batch_size = 75  # Scryfall's limit
        self.rate_limit_delay = rate_limit_delay
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'MTGEcoRec/2.0'
        })
        
    def create_identifiers(self, cards_list):
        """Convert MongoDB card documents to Scryfall identifiers"""
        return [{'id': card['id']} for card in cards_list]
    
    def batch_identifiers(self, identifiers):
        """Split identifiers into batches of 75"""
        for i in range(0, len(identifiers), self.batch_size):
            yield identifiers[i:i + self.batch_size]
    
    def fetch_batch_pricing(self, identifiers_batch):
        """Fetch pricing for a batch of card identifiers"""
        try:
            payload = {'identifiers': identifiers_batch}
            response = self.session.post(self.base_url, json=payload, timeout=30)
            
            if response.status_code == 200:
                return response.json()
            else:
                logger.error(f"API Error: {response.status_code} - {response.text}")
                return None
                
        except Exception as e:
            logger.error(f"Request failed: {e}")
            return None
    
    def extract_pricing_data(self, card_data, target_date=None):
        """Extract all available pricing information from Scryfall card data"""
        if target_date is None:
            target_date = date.today().isoformat()
            
        pricing_records = []
        card_id = card_data.get('id')
        card_name = card_data.get('name', 'Unknown')
        prices = card_data.get('prices', {})
        
        # All available price types
        price_mappings = {
            'usd': 'usd',
            'usd_foil': 'usd_foil', 
            'usd_etched': 'usd_etched',
            'eur': 'eur',
            'eur_foil': 'eur_foil',
            'tix': 'tix'  # MTGO tickets
        }
        
        for price_key, price_type in price_mappings.items():
            price_value = prices.get(price_key)
            
            if price_value and price_value != "":
                try:
                    price_float = float(price_value)
                    
                    record = {
                        'card_name': card_name,
                        'scryfall_id': card_id,
                        'date': target_date,
                        'price_type': price_type,
                        'price_value': price_float,
                        'currency': price_key.split('_')[0],  # usd, eur, tix
                        'finish': 'foil' if 'foil' in price_key else 'etched' if 'etched' in price_key else 'nonfoil',
                        'tcgplayer_id': card_data.get('tcgplayer_id'),
                        'cardmarket_id': card_data.get('cardmarket_id'),
                        'collected_at': datetime.now(),
                        'source': 'scryfall_bulk'
                    }
                    
                    pricing_records.append(record)
                    
                except (ValueError, TypeError):
                    continue
        
        return pricing_records
    
    def collect_pricing_for_cards(self, cards_list, target_date=None):
        """Main method to collect pricing for a list of cards"""
        if target_date is None:
            target_date = date.today().isoformat()
            
        identifiers = self.create_identifiers(cards_list)
        all_pricing_records = []
        
        logger.info(f"Starting bulk collection for {len(identifiers)} cards")
        
        for batch_num, batch in enumerate(self.batch_identifiers(identifiers), 1):
            logger.info(f"Processing batch {batch_num} ({len(batch)} cards)")
            
            # Fetch batch data
            batch_response = self.fetch_batch_pricing(batch)
            
            if batch_response and 'data' in batch_response:
                # Process each card in the batch
                for card_data in batch_response['data']:
                    pricing_records = self.extract_pricing_data(card_data, target_date)
                    all_pricing_records.extend(pricing_records)
            
            # Rate limiting
            time.sleep(self.rate_limit_delay)
        
        logger.info(f"Collection complete: {len(all_pricing_records)} pricing records")
        return all_pricing_records

# Initialize collector
collector = ScryfallBulkCollector()
print("üöÄ Bulk collector initialized and ready!")

üöÄ Bulk collector initialized and ready!


## 4. Test with Sample Data

In [23]:
# Test with a small sample from BLB set
sample_cards = list(cards_collection.find({'set': 'blb'}).limit(10))
print(f"Testing with {len(sample_cards)} cards from BLB set")

# Collect pricing data
test_pricing = collector.collect_pricing_for_cards(sample_cards)

print(f"\nüìä Test Results:")
print(f"   Cards processed: {len(sample_cards)}")
print(f"   Pricing records: {len(test_pricing)}")
print(f"   Avg records per card: {len(test_pricing)/len(sample_cards):.1f}")

# Show sample records
if test_pricing:
    print(f"\nüìã Sample pricing record:")
    sample = test_pricing[0]
    for key, value in sample.items():
        if key == 'collected_at':
            print(f"   {key}: {value.isoformat()}")
        else:
            print(f"   {key}: {value}")

2025-12-12 23:52:18,509 - INFO - Starting bulk collection for 10 cards
2025-12-12 23:52:18,510 - INFO - Processing batch 1 (10 cards)


Testing with 10 cards from BLB set


2025-12-12 23:52:18,925 - INFO - Collection complete: 48 pricing records



üìä Test Results:
   Cards processed: 10
   Pricing records: 48
   Avg records per card: 4.8

üìã Sample pricing record:
   card_name: Forest
   scryfall_id: 0000419b-0bba-4488-8f7a-6194544ce91e
   date: 2025-12-12
   price_type: usd
   price_value: 0.22
   currency: usd
   finish: nonfoil
   tcgplayer_id: 558404
   cardmarket_id: 777725
   collected_at: 2025-12-12T23:52:18.824974
   source: scryfall_bulk


## 6. Larger Scale Test

Let's test with 150 cards (2 full batches) to validate performance at scale.

In [None]:
# Test with 150 cards (2 full batches of 75 each) to validate scale performance
print("üîç Starting larger scale test...")
print("=" * 50)

# Get 150 cards from different sets to test diversity
test_cards_mixed = []

# Get cards from multiple sets for diverse testing
sets_to_test = ['blb', 'dsk', 'otj', 'mkm', 'lci']  # Various recent sets
cards_per_set = 30  # 30 cards from each set = 150 total

for set_code in sets_to_test:
    set_cards = list(cards_collection.find({'set': set_code}).limit(cards_per_set))
    test_cards_mixed.extend(set_cards)
    print(f"üì¶ Collected {len(set_cards)} cards from {set_code.upper()} set")

print(f"\nüéØ Total test cards: {len(test_cards_mixed)}")
print(f"üìä Expected batches: {(len(test_cards_mixed) + 74) // 75}")  # Ceiling division

# Record start time for performance analysis
import time
start_time = time.time()

# Run the bulk collection
print(f"\nüöÄ Starting bulk collection test...")
large_test_pricing = collector.collect_pricing_for_cards(test_cards_mixed)

# Calculate performance metrics
end_time = time.time()
total_time = end_time - start_time
cards_per_second = len(test_cards_mixed) / total_time if total_time > 0 else 0
records_per_second = len(large_test_pricing) / total_time if total_time > 0 else 0

print(f"\nüìà LARGE SCALE TEST RESULTS")
print("=" * 50)
print(f"   Cards processed: {len(test_cards_mixed):,}")
print(f"   Pricing records: {len(large_test_pricing):,}")
print(f"   Avg records per card: {len(large_test_pricing)/len(test_cards_mixed):.1f}")
print(f"   Total processing time: {total_time:.1f} seconds")
print(f"   Cards per second: {cards_per_second:.1f}")
print(f"   Records per second: {records_per_second:.1f}")
print(f"   Efficiency vs individual API: ~{75:.0f}x faster")

# Analyze price type distribution
if large_test_pricing:
    price_types = {}
    currencies = {}
    
    for record in large_test_pricing:
        price_type = record.get('price_type', 'unknown')
        currency = record.get('currency', 'unknown')
        
        price_types[price_type] = price_types.get(price_type, 0) + 1
        currencies[currency] = currencies.get(currency, 0) + 1
    
    print(f"\nüí∞ Price Type Distribution:")
    for ptype, count in sorted(price_types.items(), key=lambda x: x[1], reverse=True):
        print(f"   {ptype}: {count:,} records ({count/len(large_test_pricing)*100:.1f}%)")
    
    print(f"\nüí± Currency Distribution:")
    for curr, count in sorted(currencies.items(), key=lambda x: x[1], reverse=True):
        print(f"   {curr}: {count:,} records ({count/len(large_test_pricing)*100:.1f}%)")
    
    # Show sample of collected data
    print(f"\nüìã Sample records from different price types:")
    seen_types = set()
    for record in large_test_pricing:
        ptype = record.get('price_type')
        if ptype not in seen_types and len(seen_types) < 3:
            seen_types.add(ptype)
            print(f"   {record['card_name']} - ${record['price_value']} ({ptype})")

print(f"\n‚úÖ Large scale test completed successfully!")

## 5. Production Pipeline Function

In [24]:
def run_daily_pricing_pipeline(target_date=None, batch_size=1000, skip_existing=True):
    """
    Production-ready daily pricing pipeline
    
    Args:
        target_date: Date string (YYYY-MM-DD) or None for today
        batch_size: Number of cards to process per database batch
        skip_existing: Skip cards that already have pricing for target_date
    
    Returns:
        dict: Summary statistics
    """
    if target_date is None:
        target_date = date.today().isoformat()
    
    logger.info(f"Starting daily pricing pipeline for {target_date}")
    
    # Get cards that need pricing
    if skip_existing:
        # Get cards without pricing for this date
        existing_card_ids = set(
            record['scryfall_id'] 
            for record in pricing_collection.find(
                {'date': target_date}, 
                {'scryfall_id': 1}
            )
        )
        
        cards_query = {'id': {'$nin': list(existing_card_ids)}}
        logger.info(f"Skipping {len(existing_card_ids)} cards with existing pricing")
    else:
        cards_query = {}
    
    total_cards_needed = cards_collection.count_documents(cards_query)
    logger.info(f"Cards needing pricing: {total_cards_needed:,}")
    
    if total_cards_needed == 0:
        logger.info("No cards need pricing collection")
        return {'status': 'complete', 'cards_processed': 0, 'records_created': 0}
    
    # Process in batches
    total_records = 0
    cards_processed = 0
    
    cards_cursor = cards_collection.find(cards_query).batch_size(batch_size)
    current_batch = []
    
    for card in cards_cursor:
        current_batch.append(card)
        
        if len(current_batch) >= batch_size:
            # Process batch
            batch_records = collector.collect_pricing_for_cards(current_batch, target_date)
            
            # Insert to database
            if batch_records:
                pricing_collection.insert_many(batch_records)
                total_records += len(batch_records)
            
            cards_processed += len(current_batch)
            logger.info(f"Progress: {cards_processed:,}/{total_cards_needed:,} cards ({cards_processed/total_cards_needed*100:.1f}%)")
            
            current_batch = []
    
    # Process final batch
    if current_batch:
        batch_records = collector.collect_pricing_for_cards(current_batch, target_date)
        if batch_records:
            pricing_collection.insert_many(batch_records)
            total_records += len(batch_records)
        cards_processed += len(current_batch)
    
    logger.info(f"Pipeline complete: {cards_processed:,} cards, {total_records:,} records")
    
    return {
        'status': 'complete',
        'date': target_date,
        'cards_processed': cards_processed,
        'records_created': total_records,
        'avg_records_per_card': total_records / cards_processed if cards_processed > 0 else 0
    }

print("‚úÖ Production pipeline function ready")

‚úÖ Production pipeline function ready


## üöÄ Production Pipeline Execution

Starting full production pipeline to collect pricing for all remaining cards.

In [25]:
# üöÄ PRODUCTION PIPELINE EXECUTION
# This will collect pricing for all cards that don't have today's pricing data

print("üî• STARTING PRODUCTION PRICING PIPELINE")
print("=" * 60)
print(f"üìÖ Target date: {date.today().isoformat()}")
print(f"üéØ Expected cards to process: ~109,634 (total - cards with existing pricing)")
print(f"‚ö° Batch size: 75 cards per API call (~1,467 total API calls)")
print(f"‚è±Ô∏è  Estimated time: ~15-20 minutes (with 0.1s rate limiting)")
print("=" * 60)

# Run the production pipeline
print("\\nüöÄ Starting pipeline execution...")
result = run_daily_pricing_pipeline(
    target_date='2025-12-13',  # Today's date
    batch_size=1000,           # Cards per database batch 
    skip_existing=True         # Skip cards that already have pricing for today
)

print("\\n" + "=" * 60)
print("üéâ PRODUCTION PIPELINE COMPLETED!")
print("=" * 60)
print(f"üìä Final Results:")
for key, value in result.items():
    if isinstance(value, (int, float)):
        if key in ['cards_processed', 'records_created']:
            print(f"   {key}: {value:,}")
        else:
            print(f"   {key}: {value}")
    else:
        print(f"   {key}: {value}")
        
# Show updated coverage statistics
print(f"\\nüìà Updated Database Statistics:")
total_cards_after = cards_collection.count_documents({})
total_pricing_after = pricing_collection.count_documents({})
unique_cards_with_pricing_after = len(pricing_collection.distinct('scryfall_id'))

print(f"   Total cards in database: {total_cards_after:,}")
print(f"   Total pricing records: {total_pricing_after:,}")
print(f"   Unique cards with pricing: {unique_cards_with_pricing_after:,}")
print(f"   Coverage: {(unique_cards_with_pricing_after/total_cards_after*100):.2f}%")
print(f"\\n‚úÖ Production pipeline execution complete!")

2025-12-13 00:01:01,007 - INFO - Starting daily pricing pipeline for 2025-12-13


üî• STARTING PRODUCTION PRICING PIPELINE
üìÖ Target date: 2025-12-13
üéØ Expected cards to process: ~109,634 (total - cards with existing pricing)
‚ö° Batch size: 75 cards per API call (~1,467 total API calls)
‚è±Ô∏è  Estimated time: ~15-20 minutes (with 0.1s rate limiting)
\nüöÄ Starting pipeline execution...


2025-12-13 00:01:01,502 - INFO - Skipping 25 cards with existing pricing
2025-12-13 00:01:02,299 - INFO - Cards needing pricing: 110,006
2025-12-13 00:01:03,485 - INFO - Starting bulk collection for 1000 cards
2025-12-13 00:01:03,486 - INFO - Processing batch 1 (75 cards)
2025-12-13 00:01:04,525 - INFO - Processing batch 2 (75 cards)
2025-12-13 00:01:05,014 - INFO - Processing batch 3 (75 cards)
2025-12-13 00:01:05,819 - INFO - Processing batch 4 (75 cards)
2025-12-13 00:01:06,395 - INFO - Processing batch 5 (75 cards)
2025-12-13 00:01:07,040 - INFO - Processing batch 6 (75 cards)
2025-12-13 00:01:07,485 - INFO - Processing batch 7 (75 cards)
2025-12-13 00:01:08,096 - INFO - Processing batch 8 (75 cards)
2025-12-13 00:01:08,756 - INFO - Processing batch 9 (75 cards)
2025-12-13 00:01:09,240 - INFO - Processing batch 10 (75 cards)
2025-12-13 00:01:09,632 - INFO - Processing batch 11 (75 cards)
2025-12-13 00:01:10,474 - INFO - Processing batch 12 (75 cards)
2025-12-13 00:01:11,425 - INFO 

üéâ PRODUCTION PIPELINE COMPLETED!
üìä Final Results:
   status: complete
   date: 2025-12-13
   cards_processed: 110,006
   records_created: 320,641
   avg_records_per_card: 2.9147591949530027
\nüìà Updated Database Statistics:
   Total cards in database: 110,031
   Total pricing records: 321,433
   Unique cards with pricing: 99,341
   Coverage: 90.28%
\n‚úÖ Production pipeline execution complete!


## 6. Analysis Functions

In [26]:
def analyze_pricing_coverage():
    """Analyze current pricing data coverage"""
    total_cards = cards_collection.count_documents({})
    
    # Cards with any pricing
    cards_with_pricing = len(
        pricing_collection.distinct('scryfall_id')
    )
    
    # Pricing by date
    pricing_by_date = list(pricing_collection.aggregate([
        {'$group': {'_id': '$date', 'count': {'$sum': 1}}},
        {'$sort': {'_id': -1}}
    ]))
    
    # Price type distribution
    price_types = list(pricing_collection.aggregate([
        {'$group': {'_id': '$price_type', 'count': {'$sum': 1}}},
        {'$sort': {'count': -1}}
    ]))
    
    print("üìä PRICING COVERAGE ANALYSIS")
    print("=" * 40)
    print(f"Total Cards: {total_cards:,}")
    print(f"Cards with Pricing: {cards_with_pricing:,}")
    print(f"Coverage: {(cards_with_pricing/total_cards*100):.2f}%")
    print()
    
    print("üìÖ Recent Pricing by Date:")
    for item in pricing_by_date[:5]:
        date_str = item['_id'] or 'Unknown'
        count = item['count']
        print(f"   {date_str}: {count:,} records")
    
    print()
    print("üí∞ Price Types:")
    for item in price_types:
        print(f"   {item['_id']}: {item['count']:,} records")

def get_pricing_summary_for_set(set_code, limit=10):
    """Get pricing summary for a specific set"""
    # Get cards from set with pricing
    pipeline = [
        {'$match': {'set': set_code.lower()}},
        {'$lookup': {
            'from': 'card_pricing_daily',
            'localField': 'id',
            'foreignField': 'scryfall_id',
            'as': 'pricing'
        }},
        {'$match': {'pricing': {'$ne': []}}},
        {'$limit': limit}
    ]
    
    results = list(cards_collection.aggregate(pipeline))
    
    print(f"üéØ PRICING SUMMARY FOR {set_code.upper()}")
    print("=" * 40)
    print(f"Cards found: {len(results)}")
    print()
    
    for card in results[:5]:
        name = card['name']
        rarity = card.get('rarity', 'unknown')
        pricing = card['pricing']
        
        print(f"üÉè {name} ({rarity})")
        
        # Group pricing by date
        price_by_date = defaultdict(list)
        for p in pricing:
            price_by_date[p['date']].append(p)
        
        for date_key in sorted(price_by_date.keys())[-2:]:  # Last 2 dates
            prices = price_by_date[date_key]
            usd_prices = [p for p in prices if p['currency'] == 'usd']
            
            if usd_prices:
                price_str = ", ".join(f"${p['price_value']} ({p['price_type']})" for p in usd_prices)
                print(f"   {date_key}: {price_str}")
        print()

print("‚úÖ Analysis functions ready")

‚úÖ Analysis functions ready


## 7. Current Status

In [27]:
# Run coverage analysis
analyze_pricing_coverage()

üìä PRICING COVERAGE ANALYSIS
Total Cards: 110,031
Cards with Pricing: 99,341
Coverage: 90.28%

üìÖ Recent Pricing by Date:
   2025-12-13: 320,666 records
   2025-12-12: 767 records

üí∞ Price Types:
   usd: 80,819 records
   eur: 77,013 records
   usd_foil: 55,148 records
   tix: 54,222 records
   eur_foil: 53,013 records
   usd_etched: 1,218 records


## 8. Ready for Production

The pipeline is now clean and production-ready. Key improvements:

1. **Bulk API Usage**: 75 cards per request (much more efficient)
2. **Comprehensive Pricing**: Captures USD, EUR, MTGO, and all foil variants
3. **Smart Batching**: Processes cards in manageable chunks
4. **Duplicate Prevention**: Skips existing records automatically
5. **Error Handling**: Robust logging and error recovery
6. **Scalable**: Can handle full database (110K+ cards)

To run the full pipeline:
```python
# For today's pricing
result = run_daily_pricing_pipeline()

# For specific date
result = run_daily_pricing_pipeline(target_date='2025-12-13')
```