In [1]:
import os
import pandas as pd
from dotenv import load_dotenv
import snowflake.connector

load_dotenv()

conn = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASSWORD'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    warehouse=os.getenv('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH'),
    database='INCREMENTALITY',
    schema='INCREMENTALITY_RESEARCH'
)

print("Connected")

Connected


In [4]:

query = """
SELECT *
FROM CATALOG
LIMIT 10
"""

cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()

columns = ['PRODUCT_ID', 'NAME', 'ACTIVE', 'CATEGORIES', 'DESCRIPTION', 'PRICE', 'VENDORS', 'IS_DELETED']
df = pd.DataFrame(results, columns=columns)

print(f"Got {len(df)} rows from PRODUCT_CATALOG")
print(df.to_string(index=False))
print("\nData types:")
print(df.dtypes)


Got 10 rows from PRODUCT_CATALOG
              PRODUCT_ID                                                                         NAME  ACTIVE                                                                                                                                                                                                                                                                                                                                        CATEGORIES                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        

In [2]:
import pandas as pd
from datetime import date, timedelta
from pathlib import Path
import snowflake.connector
from dotenv import load_dotenv
import json
import time
from tqdm import tqdm

ANALYSIS_START = date(2025, 3, 14)
ANALYSIS_END = date(2025, 9, 7)
BASE_PATH = Path("/Users/pranjal/Code/marketplace-incrementality/daily_summaries/data")
OUTPUT_DIR = BASE_PATH / "product_catalog_processed"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print(f"--- Product Catalog Processing Configuration ---")
print(f"Analysis Period: {ANALYSIS_START} to {ANALYSIS_END}")
print(f"Output Directory: {OUTPUT_DIR}")

def get_daily_catalog_query(date_str: str, next_date_str: str) -> str:
    """
    SQL query that:
    1. Gets product IDs from purchases and clicks for the day
    2. Joins with catalog to get product data
    3. Processes/cleans categories in SQL
    4. Returns clean, processed product catalog data
    """
    return f"""
    WITH daily_products AS (
        -- Get unique product IDs from purchases
        SELECT DISTINCT PRODUCT_ID
        FROM PURCHASES
        WHERE PURCHASED_AT >= '{date_str}'::TIMESTAMP_NTZ 
        AND PURCHASED_AT < '{next_date_str}'::TIMESTAMP_NTZ
        AND PRODUCT_ID IS NOT NULL
        
        UNION
        
        -- Get unique product IDs from clicks
        SELECT DISTINCT PRODUCT_ID
        FROM CLICKS
        WHERE OCCURRED_AT >= '{date_str}'::TIMESTAMP_NTZ 
        AND OCCURRED_AT < '{next_date_str}'::TIMESTAMP_NTZ
        AND PRODUCT_ID IS NOT NULL
    ),
    processed_catalog AS (
        SELECT 
            c.PRODUCT_ID,
            c.NAME,
            c.PRICE,
            c.ACTIVE,
            c.IS_DELETED,
            c.DESCRIPTION,
            -- Extract and clean category data using higher-order functions
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'brand#%'), ''), '#', 2) AS brand,
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'department#%'), ''), '#', 2) AS department_id,
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'category#%'), ''), '#', 2) AS category_id,
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'category_feature#%'), ''), '#', 2) AS category_feature_id,
            ARRAY_TO_STRING(TRANSFORM(FILTER(c.CATEGORIES, x -> x LIKE 'color#%'), y -> SPLIT_PART(y, '#', 2)), '|') AS colors,
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'size#%'), ''), '#', 2) AS size_info,
            ARRAY_TO_STRING(TRANSFORM(FILTER(c.CATEGORIES, x -> x LIKE 'style_tag#%'), y -> SPLIT_PART(y, '#', 2)), '|') AS style_tags,
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'domain#%'), ''), '#', 2) AS domain,
            -- Extract created_at from categories
            SPLIT_PART(ARRAY_TO_STRING(FILTER(c.CATEGORIES, x -> x LIKE 'created_at#%'), ''), '#', 2) AS product_created_at,
            -- Category counts
            ARRAY_SIZE(c.CATEGORIES) AS total_categories,
            ARRAY_SIZE(FILTER(c.CATEGORIES, x -> x LIKE 'color#%')) AS color_count,
            ARRAY_SIZE(FILTER(c.CATEGORIES, x -> x LIKE 'style_tag#%')) AS style_tag_count,
            -- Data quality flags
            CASE WHEN c.PRICE <= 0 OR c.PRICE > 10000 THEN TRUE ELSE FALSE END AS price_outlier,
            CASE WHEN LENGTH(c.NAME) < 5 THEN TRUE ELSE FALSE END AS short_name,
            CASE WHEN c.DESCRIPTION IS NULL OR LENGTH(c.DESCRIPTION) < 10 THEN TRUE ELSE FALSE END AS poor_description,
            -- Extract vendors (assuming it's an array)
            c.VENDORS,
            -- Add extraction metadata
            '{date_str}'::DATE AS extraction_date,
            CURRENT_TIMESTAMP() AS processed_at
        FROM CATALOG c
        INNER JOIN daily_products dp ON c.PRODUCT_ID = dp.PRODUCT_ID
    )
    SELECT * FROM processed_catalog
    """

def process_daily_catalog_data():
    """Process catalog data day by day, merging SQL operations"""

    date_list = pd.date_range(start=ANALYSIS_START, end=ANALYSIS_END, freq='D')
    all_processed_data = []
    processed_products = set()
    daily_stats = []

    # Check for existing processed data
    checkpoint_file = OUTPUT_DIR / "processed_products_checkpoint.json"
    if checkpoint_file.exists():
        with open(checkpoint_file, 'r') as f:
            checkpoint_data = json.load(f)
            processed_products = set(checkpoint_data.get('processed_products', []))
            print(f"Resuming from checkpoint: {len(processed_products):,} products already processed")

    for current_date in tqdm(date_list, desc="Processing Daily Catalog Data"):
        date_str = current_date.strftime('%Y-%m-%d')
        next_date_str = (current_date + timedelta(days=1)).strftime('%Y-%m-%d')

        # Check if this date was already processed
        daily_output_file = OUTPUT_DIR / f"catalog_processed_{date_str}.parquet"
        if daily_output_file.exists():
            print(f"Skipping {date_str} - already processed")
            continue

        try:
            print(f"Processing {date_str}...")

            # Execute the combined SQL query
            query = get_daily_catalog_query(date_str, next_date_str)
            cursor = conn.cursor()
            cursor.execute(query)
            results = cursor.fetchall()

            if results:
                columns = [desc[0] for desc in cursor.description]
                df_day = pd.DataFrame(results, columns=columns)

                # Filter out products we've already processed
                new_products = df_day[~df_day['PRODUCT_ID'].isin(processed_products)]

                if not new_products.empty:
                    # Save daily processed data
                    new_products.to_parquet(daily_output_file, index=False, engine='pyarrow', compression='snappy')
                    all_processed_data.append(new_products)

                    # Update processed products set
                    new_product_ids = set(new_products['PRODUCT_ID'].unique())
                    processed_products.update(new_product_ids)

                    # Track daily statistics
                    daily_stats.append({
                        'date': date_str,
                        'products_found': len(df_day),
                        'new_products': len(new_products),
                        'cumulative_processed': len(processed_products),
                        'has_purchases': len(df_day) > 0,  # Indicates if day had purchase/click data
                        'avg_price': float(new_products['PRICE'].mean()) if 'PRICE' in new_products.columns else None,
                        'active_products': int(new_products['ACTIVE'].sum()) if 'ACTIVE' in new_products.columns else 0
                    })

                    print(f"  Found {len(df_day):,} products, {len(new_products):,} new")
                else:
                    print(f"  No new products found for {date_str}")
            else:
                print(f"  No data found for {date_str}")

        except Exception as e:
            print(f"Error processing {date_str}: {e}")
            continue

        # Save checkpoint every 7 days
        if len(daily_stats) % 7 == 0:
            checkpoint_data = {
                'processed_products': list(processed_products),
                'last_processed_date': date_str,
                'daily_stats': daily_stats
            }
            with open(checkpoint_file, 'w') as f:
                json.dump(checkpoint_data, f, default=str)
            print(f"Checkpoint saved: {len(processed_products):,} products processed")

    return all_processed_data, daily_stats, processed_products

def create_final_dataset(all_processed_data, daily_stats, processed_products):
    """Combine all daily processed data into final dataset"""

    if all_processed_data:
        print("\n=== Creating Final Dataset ===")

        # Combine all daily data
        df_final = pd.concat(all_processed_data, ignore_index=True)

        # Remove duplicates (keep latest extraction_date for each product)
        df_final = df_final.sort_values('extraction_date').drop_duplicates(subset=['PRODUCT_ID'], keep='last')

        # Save final combined dataset
        final_file = OUTPUT_DIR / "product_catalog_final_processed.parquet"
        df_final.to_parquet(final_file, index=False, engine='pyarrow', compression='snappy')

        # Create summary statistics
        summary_stats = {
            'total_unique_products': len(df_final),
            'total_processed_products': len(processed_products),
            'active_products': int(df_final['ACTIVE'].sum()),
            'deleted_products': int(df_final['IS_DELETED'].sum()),
            'price_statistics': df_final['PRICE'].describe().to_dict(),
            'coverage_by_category': {
                'with_brand': int(df_final['BRAND'].notna().sum()),
                'with_colors': int(df_final['COLORS'].notna().sum()),
                'with_style_tags': int(df_final['STYLE_TAGS'].notna().sum()),
                'with_size': int(df_final['SIZE_INFO'].notna().sum())
            },
            'data_quality': {
                'price_outliers': int(df_final['PRICE_OUTLIER'].sum()),
                'short_names': int(df_final['SHORT_NAME'].sum()),
                'poor_descriptions': int(df_final['POOR_DESCRIPTION'].sum())
            },
            'daily_processing_stats': daily_stats,
            'processing_period': f"{ANALYSIS_START} to {ANALYSIS_END}",
            'final_processing_timestamp': time.time()
        }

        # Save summary
        with open(OUTPUT_DIR / "processing_summary.json", 'w') as f:
            json.dump(summary_stats, f, indent=2, default=str)

        print(f"Final dataset created: {df_final.shape[0]:,} products")
        print(f"Saved to: {final_file}")

        # Display key statistics
        print(f"\n=== FINAL STATISTICS ===")
        print(f"Total unique products: {len(df_final):,}")
        print(f"Active products: {df_final['ACTIVE'].sum():,}")
        print(f"Products with brands: {df_final['BRAND'].notna().sum():,}")
        print(f"Products with colors: {df_final['COLORS'].notna().sum():,}")
        print(f"Average price: ${df_final['PRICE'].mean():.2f}")
        print(f"Price outliers: {df_final['PRICE_OUTLIER'].sum():,}")

        return df_final, summary_stats

    else:
        print("No data to process")
        return None, None

def main():
    """Main execution function"""

    print("=== PROCESSING PRODUCT CATALOG DATA ===")
    print("Using SQL-based approach: Purchases + Clicks -> Catalog Join -> Process -> Save")

    # Process data day by day
    all_data, daily_stats, processed_products = process_daily_catalog_data()

    # Create final combined dataset
    final_df, summary = create_final_dataset(all_data, daily_stats, processed_products)

    print(f"\n=== PROCESSING COMPLETE ===")
    print(f"All output saved to: {OUTPUT_DIR}")

# For incremental updates (new data)
def process_incremental_update(start_date: date, end_date: date):
    """Process only new dates that haven't been processed before"""

    # Load existing processed products
    checkpoint_file = OUTPUT_DIR / "processed_products_checkpoint.json"
    if checkpoint_file.exists():
        with open(checkpoint_file, 'r') as f:
            checkpoint_data = json.load(f)
            existing_products = set(checkpoint_data.get('processed_products', []))
    else:
        existing_products = set()

    print(f"Incremental update: {start_date} to {end_date}")
    print(f"Starting with {len(existing_products):,} existing products")

if __name__ == "__main__":
    main()

--- Product Catalog Processing Configuration ---
Analysis Period: 2025-03-14 to 2025-09-07
Output Directory: /Users/pranjal/Code/marketplace-incrementality/daily_summaries/data/product_catalog_processed
=== PROCESSING PRODUCT CATALOG DATA ===
Using SQL-based approach: Purchases + Clicks -> Catalog Join -> Process -> Save
Resuming from checkpoint: 12,252,147 products already processed


Processing Daily Catalog Data:   0%|          | 0/178 [00:00<?, ?it/s]

Skipping 2025-03-14 - already processed
Skipping 2025-03-15 - already processed
Skipping 2025-03-16 - already processed
Skipping 2025-03-17 - already processed
Skipping 2025-03-18 - already processed
Skipping 2025-03-19 - already processed
Skipping 2025-03-20 - already processed
Skipping 2025-03-21 - already processed
Skipping 2025-03-22 - already processed
Skipping 2025-03-23 - already processed
Skipping 2025-03-24 - already processed
Skipping 2025-03-25 - already processed
Skipping 2025-03-26 - already processed
Skipping 2025-03-27 - already processed
Skipping 2025-03-28 - already processed
Skipping 2025-03-29 - already processed
Skipping 2025-03-30 - already processed
Skipping 2025-03-31 - already processed
Skipping 2025-04-01 - already processed
Skipping 2025-04-02 - already processed
Skipping 2025-04-03 - already processed
Skipping 2025-04-04 - already processed
Skipping 2025-04-05 - already processed
Skipping 2025-04-06 - already processed
Skipping 2025-04-07 - already processed


Processing Daily Catalog Data:  84%|████████▎ | 149/178 [01:43<00:20,  1.44it/s]

  Found 541,888 products, 91,295 new
Processing 2025-08-10...


Processing Daily Catalog Data:  84%|████████▍ | 150/178 [03:26<00:46,  1.66s/it]

  Found 570,136 products, 87,237 new
Processing 2025-08-11...


Processing Daily Catalog Data:  85%|████████▍ | 151/178 [05:04<01:19,  2.96s/it]

  Found 521,520 products, 74,963 new
Processing 2025-08-12...


Processing Daily Catalog Data:  85%|████████▌ | 152/178 [06:38<02:01,  4.66s/it]

  Found 515,236 products, 74,333 new
Processing 2025-08-13...


Processing Daily Catalog Data:  86%|████████▌ | 153/178 [08:05<02:50,  6.81s/it]

  Found 522,194 products, 73,307 new
Processing 2025-08-14...


Processing Daily Catalog Data:  87%|████████▋ | 154/178 [09:26<03:47,  9.46s/it]

  Found 522,600 products, 72,662 new
Processing 2025-08-15...
  Found 528,320 products, 73,416 new


Processing Daily Catalog Data:  87%|████████▋ | 155/178 [10:44<04:54, 12.82s/it]

Checkpoint saved: 12,799,360 products processed
Processing 2025-08-16...


Processing Daily Catalog Data:  88%|████████▊ | 156/178 [12:10<06:27, 17.60s/it]

  Found 560,067 products, 77,963 new
Processing 2025-08-17...


Processing Daily Catalog Data:  88%|████████▊ | 157/178 [14:06<09:06, 26.04s/it]

  Found 582,709 products, 79,648 new
Processing 2025-08-18...


Processing Daily Catalog Data:  89%|████████▉ | 158/178 [18:41<17:39, 52.98s/it]

  Found 526,908 products, 70,271 new
Processing 2025-08-19...


Processing Daily Catalog Data:  89%|████████▉ | 159/178 [20:07<18:10, 57.41s/it]

  Found 535,157 products, 72,720 new
Processing 2025-08-20...


Processing Daily Catalog Data:  90%|████████▉ | 160/178 [21:37<18:48, 62.71s/it]

  Found 520,498 products, 70,121 new
Processing 2025-08-21...


Processing Daily Catalog Data:  90%|█████████ | 161/178 [23:06<19:08, 67.57s/it]

  Found 514,541 products, 68,864 new
Processing 2025-08-22...
  Found 516,580 products, 68,767 new


Processing Daily Catalog Data:  91%|█████████ | 162/178 [24:41<19:35, 73.45s/it]

Checkpoint saved: 13,307,714 products processed
Processing 2025-08-23...


Processing Daily Catalog Data:  92%|█████████▏| 163/178 [27:46<24:47, 99.18s/it]

  Found 548,485 products, 75,061 new
Processing 2025-08-24...


Processing Daily Catalog Data:  92%|█████████▏| 164/178 [30:44<27:44, 118.93s/it]

  Found 577,124 products, 79,140 new
Processing 2025-08-25...


Processing Daily Catalog Data:  93%|█████████▎| 165/178 [33:47<29:23, 135.63s/it]

  Found 521,781 products, 69,546 new
Processing 2025-08-26...


Processing Daily Catalog Data:  93%|█████████▎| 166/178 [35:22<24:54, 124.51s/it]

  Found 519,026 products, 70,372 new
Processing 2025-08-27...


Processing Daily Catalog Data:  94%|█████████▍| 167/178 [36:54<21:09, 115.39s/it]

  Found 519,433 products, 69,801 new
Processing 2025-08-28...


Processing Daily Catalog Data:  94%|█████████▍| 168/178 [38:22<17:57, 107.71s/it]

  Found 511,034 products, 68,688 new
Processing 2025-08-29...
  Found 534,340 products, 72,244 new


Processing Daily Catalog Data:  95%|█████████▍| 169/178 [39:59<15:40, 104.49s/it]

Checkpoint saved: 13,812,566 products processed
Processing 2025-08-30...


Processing Daily Catalog Data:  96%|█████████▌| 170/178 [41:38<13:42, 102.81s/it]

  Found 568,998 products, 77,691 new
Processing 2025-08-31...


Processing Daily Catalog Data:  96%|█████████▌| 171/178 [43:12<11:42, 100.32s/it]

  Found 588,197 products, 79,572 new
Processing 2025-09-01...


Processing Daily Catalog Data:  97%|█████████▋| 172/178 [44:48<09:54, 99.02s/it] 

  Found 588,693 products, 80,236 new
Processing 2025-09-02...


Processing Daily Catalog Data:  97%|█████████▋| 173/178 [46:19<08:03, 96.75s/it]

  Found 527,059 products, 69,985 new
Processing 2025-09-03...


Processing Daily Catalog Data:  98%|█████████▊| 174/178 [47:54<06:24, 96.04s/it]

  Found 533,723 products, 73,545 new
Processing 2025-09-04...


Processing Daily Catalog Data:  98%|█████████▊| 175/178 [49:25<04:43, 94.48s/it]

  Found 517,107 products, 73,173 new
Processing 2025-09-05...
  Found 531,857 products, 77,036 new


Processing Daily Catalog Data:  99%|█████████▉| 176/178 [51:01<03:10, 95.15s/it]

Checkpoint saved: 14,343,804 products processed
Processing 2025-09-06...


Processing Daily Catalog Data:  99%|█████████▉| 177/178 [52:37<01:35, 95.30s/it]

  Found 607,262 products, 88,407 new
Processing 2025-09-07...


Processing Daily Catalog Data: 100%|██████████| 178/178 [54:14<00:00, 18.28s/it]

  Found 635,765 products, 92,794 new

=== Creating Final Dataset ===





KeyError: 'extraction_date'

In [None]:

# Complete the incomplete process_incremental_update function
def process_incremental_update(start_date: date, end_date: date):
    """Process only new dates that haven't been processed before"""

    # Load existing processed products from checkpoint
    checkpoint_file = OUTPUT_DIR / "processed_products_checkpoint.json"
    if checkpoint_file.exists():
        with open(checkpoint_file, 'r') as f:
            checkpoint_data = json.load(f)
            existing_products = set(checkpoint_data.get('processed_products', []))
            existing_daily_stats = checkpoint_data.get('daily_stats', [])
    else:
        existing_products = set()
        existing_daily_stats = []

    print(f"Incremental update: {start_date} to {end_date}")
    print(f"Starting with {len(existing_products):,} existing products")

    # Process only the new date range
    date_list = pd.date_range(start=start_date, end=end_date, freq='D')
    new_processed_data = []
    updated_stats = existing_daily_stats.copy()

    for current_date in tqdm(date_list, desc="Processing Remaining Days"):
        date_str = current_date.strftime('%Y-%m-%d')
        next_date_str = (current_date + timedelta(days=1)).strftime('%Y-%m-%d')

        # Check if this date was already processed
        daily_output_file = OUTPUT_DIR / f"catalog_processed_{date_str}.parquet"
        if daily_output_file.exists():
            print(f"Skipping {date_str} - already processed")
            continue

        try:
            print(f"Processing {date_str}...")

            # Execute the combined SQL query
            query = get_daily_catalog_query(date_str, next_date_str)
            cursor = conn.cursor()
            cursor.execute(query)
            results = cursor.fetchall()

            if results:
                columns = [desc[0] for desc in cursor.description]
                df_day = pd.DataFrame(results, columns=columns)

                # Filter out products we've already processed
                new_products = df_day[~df_day['PRODUCT_ID'].isin(existing_products)]

                if not new_products.empty:
                    # Save daily processed data
                    new_products.to_parquet(daily_output_file, index=False, engine='pyarrow', compression='snappy')
                    new_processed_data.append(new_products)

                    # Update processed products set
                    new_product_ids = set(new_products['PRODUCT_ID'].unique())
                    existing_products.update(new_product_ids)

                    # Track daily statistics
                    updated_stats.append({
                        'date': date_str,
                        'products_found': len(df_day),
                        'new_products': len(new_products),
                        'cumulative_processed': len(existing_products),
                        'has_purchases': len(df_day) > 0,
                        'avg_price': float(new_products['PRICE'].mean()) if 'PRICE' in new_products.columns else None,
                        'active_products': int(new_products['ACTIVE'].sum()) if 'ACTIVE' in new_products.columns else 0
                    })

                    print(f"  Found {len(df_day):,} products, {len(new_products):,} new")
                else:
                    print(f"  No new products found for {date_str}")
            else:
                print(f"  No data found for {date_str}")

        except Exception as e:
            print(f"Error processing {date_str}: {e}")
            continue

        # Save checkpoint every 7 days
        if len([s for s in updated_stats if s['date'] >= start_date.strftime('%Y-%m-%d')]) % 7 == 0:
            checkpoint_data = {
                'processed_products': list(existing_products),
                'last_processed_date': date_str,
                'daily_stats': updated_stats
            }
            with open(checkpoint_file, 'w') as f:
                json.dump(checkpoint_data, f, default=str)
            print(f"Checkpoint saved: {len(existing_products):,} products processed")

    # Final checkpoint update
    checkpoint_data = {
        'processed_products': list(existing_products),
        'last_processed_date': end_date.strftime('%Y-%m-%d'),
        'daily_stats': updated_stats
    }
    with open(checkpoint_file, 'w') as f:
        json.dump(checkpoint_data, f, default=str)

    print(f"\nIncremental update complete!")
    print(f"Total products now: {len(existing_products):,}")
    print(f"New products added: {sum(len(df) for df in new_processed_data):,}")

    # Create updated final dataset if we processed new data
    if new_processed_data:
        create_final_dataset(new_processed_data, updated_stats, existing_products)

    return new_processed_data, updated_stats, existing_products

# Resume processing from where you left off (2025-08-09 to 2025-09-07)
process_incremental_update(date(2025, 8, 9), date(2025, 9, 7))
