# USDA Ingestion Pipeline - Complete Testing

This notebook tests the complete USDA ETL pipeline end to end:
1. **Environment Setup**: Configure PYTHONPATH and imports
2. **Database Connection**: Verify connectivity
3. **Commodity Mapper**: Test USDA code lookups
4. **Extract**: Fetch data from USDA NASS API
5. **Transform**: Clean and normalize data
6. **Load**: Insert into database
7. **Verification**: Query and confirm results

**Goal**: Demonstrate a fully working USDA ingestion pipeline, with output shown at every step ✓

## Step 1: Environment Setup

In [1]:
import os
import sys
from pathlib import Path
import pandas as pd
from datetime import datetime

# Configure PYTHONPATH for namespace packages
# Adjust workspace_root to point at your local ca-biositing checkout
workspace_root = Path(r'c:\Users\meili\forked\ca-biositing')
sys.path.insert(0, str(workspace_root / 'src' / 'ca_biositing' / 'pipeline'))
sys.path.insert(0, str(workspace_root / 'src' / 'ca_biositing' / 'datamodels'))
sys.path.insert(0, str(workspace_root / 'src' / 'ca_biositing' / 'webservice'))
os.chdir(str(workspace_root))

# Load environment variables
from dotenv import load_dotenv
load_dotenv(workspace_root / '.env')

print("✓ Environment configured")
print(f"✓ Working directory: {os.getcwd()}")
print(f"✓ DATABASE_URL loaded: {bool(os.getenv('DATABASE_URL'))}")
print(f"✓ USDA_NASS_API_KEY loaded: {bool(os.getenv('USDA_NASS_API_KEY'))}")

✓ Environment configured
✓ Working directory: c:\Users\meili\forked\ca-biositing
✓ DATABASE_URL loaded: True
✓ USDA_NASS_API_KEY loaded: True
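The setup cell above hard-codes one machine's checkout path. A minimal, more portable sketch follows; it assumes a hypothetical `CA_BIOSITING_ROOT` environment variable (falling back to the current working directory) and reuses the three sub-package directories from the cell above. The helper names are illustrative, not part of the project.

```python
import os
import sys
from pathlib import Path

# Sub-package source directories that the setup cell adds to sys.path
SUBPACKAGES = ("pipeline", "datamodels", "webservice")


def resolve_workspace_root(env_var="CA_BIOSITING_ROOT"):
    """Return the checkout root from a (hypothetical) env var, else the cwd."""
    value = os.environ.get(env_var)
    return Path(value).expanduser().resolve() if value else Path.cwd()


def subpackage_paths(root):
    """Build the src/ca_biositing/<name> directory for each sub-package."""
    return [root / "src" / "ca_biositing" / name for name in SUBPACKAGES]


def configure_sys_path(root):
    """Prepend sub-package paths to sys.path in SUBPACKAGES order, skipping duplicates."""
    for path in reversed(subpackage_paths(root)):
        entry = str(path)
        if entry not in sys.path:
            sys.path.insert(0, entry)
```

In the notebook, the hard-coded assignment would become `workspace_root = resolve_workspace_root()` followed by `configure_sys_path(workspace_root)`.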


## Step 2: Test Database Connection

In [2]:
from sqlalchemy import create_engine, text

engine = create_engine(os.getenv('DATABASE_URL'))

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version();"))
        version = result.fetchone()[0]
        print("✓ Database connected")
        print(f"  PostgreSQL version: {version[:60]}...")
except Exception as e:
    print(f"✗ Database connection failed: {e}")
    raise

✓ Database connected
  PostgreSQL version: PostgreSQL 13.5 (Debian 13.5-1.pgdg110+1) on x86_64-pc-linux...


## Step 3: Test Commodity Mapper

In [3]:
from ca_biositing.pipeline.utils.commodity_mapper import get_mapped_commodity_ids

print("Testing Commodity Mapper:")
print("="*50)

try:
    commodity_codes = get_mapped_commodity_ids()
    print(f"✓ Retrieved {len(commodity_codes)} commodity codes:")
    for idx, code in enumerate(commodity_codes[:5]):
        print(f"  - Code {idx + 1}: {code}")
except Exception as e:
    print(f"✗ Commodity mapper failed: {e}")
    raise

Testing Commodity Mapper:
✓ Retrieved 4 commodity codes:
  - Code 1: 1
  - Code 2: 2
  - Code 3: 3
  - Code 4: 4


## Step 4: Test USDA Extract (Fetch from API)

In [4]:

import requests
import time
from ca_biositing.pipeline.utils.nass_config import PRIORITY_COUNTIES

print("Testing USDA API - North San Joaquin Valley County-Level Data:")
print("="*60)

api_key = os.getenv('USDA_NASS_API_KEY')

# Map FIPS codes to 3-digit county codes (API requires separate state + county)
fips_to_county_code = {
    "06077": "077",  # San Joaquin
    "06099": "099",  # Stanislaus  
    "06047": "047",  # Merced
}

results_by_county = {}

for county_name, fips_code in PRIORITY_COUNTIES.items():
    county_code = fips_to_county_code[fips_code]
    print(f"\n[{county_name}] FIPS: {fips_code} → County Code: {county_code}")
    
    # Use state_alpha + county_code (confirmed working from R package docs)
    params = {
        "key": api_key,
        "state_alpha": "CA",
        "county_code": county_code,  # 3-digit county code (077, 099, 047)
        "format": "JSON",
        "year": 2022  # Using 2022 since 2023 may not have complete data yet
    }
    
    try:
        resp = requests.get("https://quickstats.nass.usda.gov/api/api_GET", params=params, timeout=30)
        print(f"  Status: {resp.status_code}")
        
        data = resp.json()
        if isinstance(data, dict) and "data" in data:
            records = data["data"]
            print(f"  Records: {len(records)}")
            
            if len(records) > 0:
                results_by_county[county_name] = records
                commodities = set([r.get('commodity_desc') for r in records if r.get('commodity_desc')])
                print(f"  Commodities available: {', '.join(sorted(commodities)[:5])}...")
                
                # Show a sample
                sample = records[0]
                print(f"  Sample: {sample.get('commodity_desc')} - {(sample.get('short_desc') or '')[:50]}...")
        elif "error" in data:
            print(f"  Error: {data['error']}")
        else:
            print(f"  No data returned")
    except Exception as e:
        print(f"  Exception: {e}")
    
    time.sleep(1)

print(f"\n{'='*60}")
print("✓ County-level exploration complete!")
print(f"  Counties with data: {len(results_by_county)}")

# Combine all results into a single DataFrame
if results_by_county:
    all_records = []
    for county_name, records in results_by_county.items():
        all_records.extend(records)
    
    raw_data = pd.DataFrame(all_records)
    print(f"  Total records: {len(raw_data)}")
    print(f"  Unique commodities: {raw_data['commodity_desc'].nunique()}")
    
    print(f"\n  Sample:")
    print(raw_data[['year', 'county_name', 'commodity_desc', 'short_desc']].drop_duplicates().head(3).to_string(index=False))
else:
    print("  ⚠ No data found in any county")
    raw_data = pd.DataFrame()


Testing USDA API - North San Joaquin Valley County-Level Data:

[San Joaquin] FIPS: 06077 → County Code: 077
  Status: 200
  Records: 2233
  Commodities available: AG LAND, AG SERVICES, ALMONDS, ALPACAS, ANIMAL TOTALS...
  Sample: ANIMAL TOTALS - ANIMAL TOTALS, INCL PRODUCTS - SALES, MEASURED IN ...

[Stanislaus] FIPS: 06099 → County Code: 099
  Status: 200
  Records: 2102
  Commodities available: AG LAND, AG SERVICES, ALMONDS, ALPACAS, ANIMAL TOTALS...
  Sample: ANIMAL TOTALS - ANIMAL TOTALS, INCL PRODUCTS - SALES, MEASURED IN ...

[Merced] FIPS: 06047 → County Code: 047
  Status: 200
  Records: 2229
  Commodities available: AG LAND, AG SERVICES, ALMONDS, ALPACAS, ANIMAL TOTALS...
  Sample: ANIMAL TOTALS - ANIMAL TOTALS, INCL PRODUCTS - SALES, MEASURED IN ...

✓ County-level exploration complete!
  Counties with data: 3
  Total records: 6564
  Unique commodities: 191

  Sample:
 year county_name     commodity_desc                                               short_desc
 2022 
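Because a 5-digit county FIPS code is the 2-digit state code followed by the 3-digit county code, the hard-coded `fips_to_county_code` dictionary could instead be derived by slicing. The sketch below uses hypothetical helper names: it splits a FIPS code into the same `state_alpha` + `county_code` parameters the extract cell sends, and rebuilds the `geoid` the transform step constructs later. `STATE_ALPHA_BY_FIPS` covers only California, as the notebook does.

```python
from urllib.parse import urlencode

QUICKSTATS_URL = "https://quickstats.nass.usda.gov/api/api_GET"
# Only California is needed here; extend if other states are ingested
STATE_ALPHA_BY_FIPS = {"06": "CA"}


def split_fips(fips):
    """Split a 5-digit county FIPS code into (state_fips, county_code)."""
    fips = str(fips).strip().zfill(5)
    if len(fips) != 5 or not fips.isdigit():
        raise ValueError(f"not a 5-digit FIPS code: {fips!r}")
    return fips[:2], fips[2:]


def make_geoid(state_fips, county_code):
    """Inverse of split_fips: zero-pad both parts and concatenate them."""
    return str(state_fips).zfill(2) + str(county_code).zfill(3)


def county_params(api_key, fips, year):
    """Build query parameters for one county, mirroring the extract cell."""
    state_fips, county_code = split_fips(fips)
    return {
        "key": api_key,
        "state_alpha": STATE_ALPHA_BY_FIPS[state_fips],
        "county_code": county_code,
        "format": "JSON",
        "year": year,
    }
```

For inspection, `f"{QUICKSTATS_URL}?{urlencode(county_params(api_key, '06077', 2022))}"` shows roughly the request URL that `requests` builds from the same `params`.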

### Inspect raw data from API

In [5]:
print("="*80)
print("Inspecting Raw Data from USDA API")
print("="*80)

if 'raw_data' in locals() and len(raw_data) > 0:
    # CRITICAL: Filter to only the counties we requested
    # NOTE: API returns uppercase county names, so we need case-insensitive comparison
    priority_county_names = [name.upper() for name in PRIORITY_COUNTIES.keys()]
    print(f"\nüîç Filtering to priority counties (case-insensitive): {priority_county_names}")
    print(f"   Before filter: {len(raw_data)} records from counties: {raw_data['county_name'].unique().tolist()}")
    
    # Convert county_name to uppercase for comparison, then filter
    raw_data = raw_data[raw_data['county_name'].str.upper().isin(priority_county_names)].copy()
    print(f"   After filter: {len(raw_data)} records from counties: {raw_data['county_name'].unique().tolist()}")
    
    if len(raw_data) == 0:
        print("\n⚠️ WARNING: No records found for priority counties after filtering!")
        print("   This means the API returned data for different counties than requested.")
        print("   The NASS API state_alpha + county_code parameters may not be working as expected.")
    
    print(f"\nüìä DataFrame Shape: {raw_data.shape}")
    print(f"   Rows: {len(raw_data)}, Columns: {len(raw_data.columns)}")
    
    print(f"\nüìã Column Information:")
    print(raw_data.info())
    
    print(f"\nüîç First 5 Rows:")
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', None)
    pd.set_option('display.max_colwidth', 50)
    print(raw_data.head())
    
    print(f"\nüìà Data Types:")
    print(raw_data.dtypes)
    
    print(f"\n❌ Missing Values:")
    missing = raw_data.isnull().sum()
    print(missing[missing > 0] if missing.sum() > 0 else "No missing values")
    
    print(f"\nüè∑Ô∏è Unique Values (key columns):")
    key_cols = ['commodity_desc', 'county_name', 'year', 'short_desc']
    for col in key_cols:
        if col in raw_data.columns:
            unique_count = raw_data[col].nunique()
            print(f"   {col}: {unique_count} unique values")
            if unique_count <= 10:
                print(f"      Values: {raw_data[col].unique().tolist()}")
    
    print(f"\nüìä Sample Value Ranges:")
    numeric_cols = raw_data.select_dtypes(include=['number']).columns
    for col in numeric_cols:
        print(f"   {col}: min={raw_data[col].min()}, max={raw_data[col].max()}")
    
    print(f"\n✅ Sample Full Record (first row, all columns):")
    print(raw_data.iloc[0].to_string())
    
else:
    print("⚠️ No raw_data available to inspect")


Inspecting Raw Data from USDA API

🔍 Filtering to priority counties (case-insensitive): ['SAN JOAQUIN', 'STANISLAUS', 'MERCED']
   Before filter: 6564 records from counties: ['SAN JOAQUIN', 'STANISLAUS', 'MERCED']
   After filter: 6564 records from counties: ['SAN JOAQUIN', 'STANISLAUS', 'MERCED']

📊 DataFrame Shape: (6564, 39)
   Rows: 6564, Columns: 39

📋 Column Information:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6564 entries, 0 to 6563
Data columns (total 39 columns):
 #   Column                 Non-Null Count  Dtype 
---  ------                 --------------  ----- 
 0   agg_level_desc         6564 non-null   object
 1   county_code            6564 non-null   object
 2   load_time              6564 non-null   object
 3   source_desc            6564 non-null   object
 4   reference_period_desc  6564 non-null   object
 5   domain_desc            6564 non-null   object
 6   week_ending            6564 non-null   object
 7   country_name           6564 non-null   o
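The inspection cell filters counties after the records are already in a DataFrame. The same case-insensitive check can be applied to the raw JSON records before building the DataFrame, which also makes it easy to report what was dropped. A minimal stdlib sketch; `filter_records_by_county` is a hypothetical helper, and each record is assumed to be a dict carrying the `county_name` key seen in the API output.

```python
def filter_records_by_county(records, county_names):
    """Keep records whose county_name matches one of county_names, ignoring case.

    Returns (kept, dropped_counts), where dropped_counts maps each unexpected
    county name (as returned by the API) to the number of records it had.
    """
    wanted = {name.strip().upper() for name in county_names}
    kept, dropped = [], {}
    for record in records:
        county = str(record.get("county_name") or "").strip()
        if county.upper() in wanted:
            kept.append(record)
        else:
            dropped[county] = dropped.get(county, 0) + 1
    return kept, dropped
```

A non-empty `dropped` would surface the mismatch that the inspection cell's warning describes, before any DataFrame is built.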

In [6]:
# Verify raw_data is ready for transform
print("Data ready for transform:")
print(f"  Rows: {len(raw_data)}")
print(f"  Columns: {list(raw_data.columns)}")
print(f"  Counties: {raw_data['county_name'].unique().tolist() if 'county_name' in raw_data.columns else 'N/A'}")

# The Data Wrangler will be opened with the variable below
raw_data

Data ready for transform:
  Rows: 6564
  Columns: ['agg_level_desc', 'county_code', 'load_time', 'source_desc', 'reference_period_desc', 'domain_desc', 'week_ending', 'country_name', 'watershed_code', 'begin_code', 'commodity_desc', 'statisticcat_desc', 'country_code', 'short_desc', 'unit_desc', 'group_desc', 'state_alpha', 'asd_code', 'congr_district_code', 'freq_desc', 'state_fips_code', 'Value', 'watershed_desc', 'state_name', 'end_code', 'asd_desc', 'util_practice_desc', 'state_ansi', 'location_desc', 'sector_desc', 'year', 'CV (%)', 'region_desc', 'domaincat_desc', 'county_name', 'county_ansi', 'zip_5', 'prodn_practice_desc', 'class_desc']
  Counties: ['SAN JOAQUIN', 'STANISLAUS', 'MERCED']


,agg_level_desc,county_code,load_time,source_desc,reference_period_desc,domain_desc,week_ending,country_name,watershed_code,begin_code,commodity_desc,statisticcat_desc,country_code,short_desc,unit_desc,group_desc,state_alpha,asd_code,congr_district_code,freq_desc,state_fips_code,Value,watershed_desc,state_name,end_code,asd_desc,util_practice_desc,state_ansi,location_desc,sector_desc,year,CV (%),region_desc,domaincat_desc,county_name,county_ansi,zip_5,prodn_practice_desc,class_desc
0,COUNTY,077,2024-07-02 12:00:00.000,CENSUS,YEAR,TOTAL,,UNITED STATES,00000000,00,ANIMAL TOTALS,SALES,9000,"ANIMAL TOTALS, INCL PRODUCTS - SALES, MEASURED...",$,ANIMAL TOTALS,CA,51,,ANNUAL,06,910695000,,CALIFORNIA,00,SAN JOAQUIN VALLEY,ALL UTILIZATION PRACTICES,06,"CALIFORNIA, SAN JOAQUIN VALLEY, SAN JOAQUIN",ANIMALS & PRODUCTS,2022,(L),,NOT SPECIFIED,SAN JOAQUIN,077,,ALL PRODUCTION PRACTICES,INCL PRODUCTS
1,COUNTY,077,2024-07-02 12:00:00.000,CENSUS,YEAR,TOTAL,,UNITED STATES,00000000,00,ANIMAL TOTALS,SALES,9000,"ANIMAL TOTALS, INCL PRODUCTS - OPERATIONS WITH...",OPERATIONS,ANIMAL TOTALS,CA,51,,ANNUAL,06,560,,CALIFORNIA,00,SAN JOAQUIN VALLEY,ALL UTILIZATION PRACTICES,06,"CALIFORNIA, SAN JOAQUIN VALLEY, SAN JOAQUIN",ANIMALS & PRODUCTS,2022,14.7,,NOT SPECIFIED,SAN JOAQUIN,077,,ALL PRODUCTION PRACTICES,INCL PRODUCTS
2,COUNTY,077,2024-07-02 12:00:00.000,CENSUS,YEAR,TOTAL,,UNITED STATES,00000000,00,AQUACULTURE TOTALS,SALES & DISTRIBUTION,9000,"AQUACULTURE TOTALS - SALES & DISTRIBUTION, MEA...",$,AQUACULTURE,CA,51,,ANNUAL,06,(D),,CALIFORNIA,00,SAN JOAQUIN VALLEY,ALL UTILIZATION PRACTICES,06,"CALIFORNIA, SAN JOAQUIN VALLEY, SAN JOAQUIN",ANIMALS & PRODUCTS,2022,(D),,NOT SPECIFIED,SAN JOAQUIN,077,,ALL PRODUCTION PRACTICES,ALL CLASSES
3,COUNTY,077,2024-07-02 12:00:00.000,CENSUS,YEAR,TOTAL,,UNITED STATES,00000000,00,AQUACULTURE TOTALS,SALES & DISTRIBUTION,9000,AQUACULTURE TOTALS - OPERATIONS WITH SALES & D...,OPERATIONS,AQUACULTURE,CA,51,,ANNUAL,06,2,,CALIFORNIA,00,SAN JOAQUIN VALLEY,ALL UTILIZATION PRACTICES,06,"CALIFORNIA, SAN JOAQUIN VALLEY, SAN JOAQUIN",ANIMALS & PRODUCTS,2022,(L),,NOT SPECIFIED,SAN JOAQUIN,077,,ALL PRODUCTION PRACTICES,ALL CLASSES
4,COUNTY,077,2024-07-02 12:00:00.000,CENSUS,YEAR,TOTAL,,UNITED STATES,00000000,00,FOOD FISH,SALES & DISTRIBUTION,9000,"FOOD FISH, CATFISH - SALES & DISTRIBUTION, MEA...",$,AQUACULTURE,CA,51,,ANNUAL,06,(D),,CALIFORNIA,00,SAN JOAQUIN VALLEY,ALL UTILIZATION PRACTICES,06,"CALIFORNIA, SAN JOAQUIN VALLEY, SAN JOAQUIN",ANIMALS & PRODUCTS,2022,(D),,NOT SPECIFIED,SAN JOAQUIN,077,,ALL PRODUCTION PRACTICES,CATFISH
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6559,COUNTY,047,2024-03-08 15:00:00.000,SURVEY,YEAR,TOTAL,,UNITED STATES,00000000,00,TOMATOES,AREA PLANTED,9000,"TOMATOES, IN THE OPEN, PROCESSING - ACRES PLANTED",ACRES,VEGETABLES,CA,51,,ANNUAL,06,24700,,CALIFORNIA,00,SAN JOAQUIN VALLEY,PROCESSING,06,"CALIFORNIA, SAN JOAQUIN VALLEY, MERCED",CROPS,2022,,,NOT SPECIFIED,MERCED,047,,IN THE OPEN,ALL CLASSES
6560,COUNTY,047,2024-03-08 15:00:00.000,SURVEY,YEAR,TOTAL,,UNITED STATES,00000000,00,TOMATOES,YIELD,9000,"TOMATOES, IN THE OPEN, PROCESSING - YIELD, MEA...",TONS / ACRE,VEGETABLES,CA,51,,ANNUAL,06,44.15,,CALIFORNIA,00,SAN JOAQUIN VALLEY,PROCESSING,06,"CALIFORNIA, SAN JOAQUIN VALLEY, MERCED",CROPS,2022,,,NOT SPECIFIED,MERCED,047,,IN THE OPEN,ALL CLASSES
6561,COUNTY,047,2024-03-08 15:00:00.000,SURVEY,YEAR,TOTAL,,UNITED STATES,00000000,00,TOMATOES,PRODUCTION,9000,"TOMATOES, IN THE OPEN, PROCESSING, UTILIZED - ...",TONS,VEGETABLES,CA,51,,ANNUAL,06,1086000,,CALIFORNIA,00,SAN JOAQUIN VALLEY,"PROCESSING, UTILIZED",06,"CALIFORNIA, SAN JOAQUIN VALLEY, MERCED",CROPS,2022,,,NOT SPECIFIED,MERCED,047,,IN THE OPEN,ALL CLASSES
6562,COUNTY,047,2022-08-26 15:00:22.000,SURVEY,YEAR,TOTAL,,UNITED STATES,00000000,00,RENT,EXPENSE,9000,"RENT, CASH, CROPLAND, IRRIGATED - EXPENSE, MEA...",$ / ACRE,EXPENSES,CA,51,,ANNUAL,06,325,,CALIFORNIA,00,SAN JOAQUIN VALLEY,ALL UTILIZATION PRACTICES,06,"CALIFORNIA, SAN JOAQUIN VALLEY, MERCED",ECONOMICS,2022,7.6,,NOT SPECIFIED,MERCED,047,,IRRIGATED,"CASH, CROPLAND"
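Before transforming, it can help to fail fast if the API response lacks a column the transform depends on. The sketch below checks the core source columns from the transform's `column_mapping`, plus `county_code` for the geoid; the helper names and the exact column list are assumptions drawn from the transform cell, not an existing project utility.

```python
# Source columns the transform cell reads (from its column_mapping and geoid logic)
REQUIRED_SOURCE_COLUMNS = (
    "commodity_desc", "statisticcat_desc", "unit_desc", "Value",
    "county_name", "short_desc", "year", "county_code",
)


def missing_columns(columns, required=REQUIRED_SOURCE_COLUMNS):
    """Return the required column names absent from `columns`, in required order."""
    present = set(columns)
    return [col for col in required if col not in present]


def assert_ready_for_transform(columns):
    """Raise KeyError listing every missing column; return None when all exist."""
    missing = missing_columns(columns)
    if missing:
        raise KeyError(f"raw_data is missing columns: {missing}")
```

Called as `assert_ready_for_transform(raw_data.columns)`, this would stop the notebook before the transform silently produces empty `commodity_code` or `geoid` values.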


## Step 5: Test USDA Transform (Clean Data)

### Transform Step: Map API data to database format

**What this does:**
1. Maps commodity names → `commodity_code` IDs (from the `usda_commodity` table)
2. Creates `Parameter` records if they don't exist (YIELD, PRODUCTION, etc.)
3. Creates `Unit` records if they don't exist (BUSHELS, TONS, etc.)
4. Builds a single transformed DataFrame with all columns needed by the load step
5. The load step then routes each row to a record table and to the `Observation` table

**Output:** A single `transformed_data` DataFrame that the load step uses to populate:
   - `UsdaCensusRecord` or `UsdaSurveyRecord` (one row per geoid + year + commodity; `record_type`, derived from `source_type`, selects the table)
   - `Observation` (one row per measurement)
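The parent/child split described above can be illustrated in plain Python. This is a sketch under assumptions, not the project's load step: rows are dicts with the transform's output columns, and the record key is assumed to include `record_type` so census and survey rows stay in separate tables. The sequential `record_key_id` is a hypothetical stand-in for the foreign key a real load would fill after inserting the parent rows.

```python
# Columns that identify one parent record (record_type keeps census and survey apart)
RECORD_KEY = ("record_type", "geoid", "year", "commodity_code")
# Columns copied onto each child observation
OBSERVATION_FIELDS = ("parameter_id", "value_numeric", "value_text", "cv_pct", "unit_id", "note")


def split_records_and_observations(rows):
    """Split transformed rows into parent record keys and child observations.

    Returns (records, observations). `records` maps each distinct record key
    to a local sequential id (dicts keep insertion order); each observation
    carries that id in `record_key_id`.
    """
    records = {}
    observations = []
    for row in rows:
        key = tuple(row[field] for field in RECORD_KEY)
        if key not in records:
            records[key] = len(records) + 1
        observation = {field: row.get(field) for field in OBSERVATION_FIELDS}
        observation["record_key_id"] = records[key]
        observations.append(observation)
    return records, observations
```

With the notebook's DataFrame, the input would be `transformed_data.to_dict("records")`; several measurements for the same county, year, and commodity then share one parent record.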

In [7]:
from sqlalchemy import text
import pandas as pd
import numpy as np
from sqlmodel import Session, select
from ca_biositing.datamodels.database import engine
from ca_biositing.datamodels.schemas.generated.ca_biositing import Parameter, Unit

print("Transform Step: Mapping API data to database schema")
print("="*70)

if 'raw_data' not in locals() or len(raw_data) == 0:
    print("⚠ No raw_data - run API extraction first")
else:
    # Print actual columns to debug
    print(f"Debug: Available columns in raw_data: {list(raw_data.columns)[:10]}...")
    
    # Define parameter/unit configurations (will be keyed by name for DB inserts)
    # Note: Keys are in CAPS for config readability, but will be lowercased when stored in DB
    PARAMETER_CONFIGS = {
        'YIELD': 'Yield per unit area',
        'PRODUCTION': 'Total production quantity',
        'AREA HARVESTED': 'Area harvested',
        'AREA PLANTED': 'Area planted',
        'PRICE RECEIVED': 'Price received by farmer',
        'PRICE PAID': 'Price paid by farmer',
    }
    
    UNIT_CONFIGS = {
        'BUSHELS': 'US bushels',
        'TONS': 'Short tons (US)',
        'ACRES': 'US acres',
        'DOLLARS': 'US dollars',
        'DOLLARS PER BUSHEL': 'US dollars per bushel',
        'DOLLARS PER TON': 'US dollars per ton',
    }
    
    # Step 1: Ensure Parameter/Unit records exist (following coworker's pattern)
    print("Step 1: Creating Parameter/Unit records if needed...")
    with Session(engine) as session:
        # Get existing parameters
        existing_params = session.exec(select(Parameter.name)).all()
        existing_param_names = set(existing_params)
        
        # Add only new parameters (lowercase names for consistency)
        params_to_add = []
        for param_name, param_desc in PARAMETER_CONFIGS.items():
            param_name_lower = param_name.lower()
            if param_name_lower not in existing_param_names:
                param = Parameter(name=param_name_lower, description=param_desc, calculated=False)
                params_to_add.append(param)
                existing_param_names.add(param_name_lower)
        
        if params_to_add:
            session.add_all(params_to_add)
            print(f"  Adding {len(params_to_add)} new parameters")
        else:
            print(f"  All {len(PARAMETER_CONFIGS)} parameters already exist")
        
        # Get existing units
        existing_units = session.exec(select(Unit.name)).all()
        existing_unit_names = set(existing_units)
        
        # Add only new units (lowercase names for consistency)
        units_to_add = []
        for unit_name, unit_desc in UNIT_CONFIGS.items():
            unit_name_lower = unit_name.lower()
            if unit_name_lower not in existing_unit_names:
                unit = Unit(name=unit_name_lower, description=unit_desc)
                units_to_add.append(unit)
                existing_unit_names.add(unit_name_lower)
        
        if units_to_add:
            session.add_all(units_to_add)
            print(f"  Adding {len(units_to_add)} new units")
        else:
            print(f"  All {len(UNIT_CONFIGS)} units already exist")
        
        # Commit only if we added anything
        if params_to_add or units_to_add:
            session.commit()
            print(f"  ✓ Committed {len(params_to_add)} parameters, {len(units_to_add)} units")
    
    # Step 2: Map commodity names to IDs from database
    print("\nStep 2: Mapping commodity names to database IDs...")
    commodity_map = {}
    with engine.connect() as conn:
        result = conn.execute(text("SELECT id, name FROM usda_commodity"))
        for row in result:
            commodity_map[row.name.upper()] = row.id
    print(f"  Found {len(commodity_map)} commodities in database")
    
    # Step 3: Look up parameter_id and unit_id from database (by name, lowercased)
    print("\nStep 3: Looking up parameter and unit IDs...")
    parameter_id_map = {}
    unit_id_map = {}
    # Bind the name lists as expanding parameters instead of formatting them into the SQL string
    from sqlalchemy import bindparam
    param_query = text("SELECT id, name FROM parameter WHERE name IN :names").bindparams(
        bindparam("names", expanding=True)
    )
    unit_query = text("SELECT id, name FROM unit WHERE name IN :names").bindparams(
        bindparam("names", expanding=True)
    )
    with engine.connect() as conn:
        # Query for lowercase parameter names
        param_names_lower = [p.lower() for p in PARAMETER_CONFIGS.keys()]
        for row in conn.execute(param_query, {"names": param_names_lower}):
            parameter_id_map[row.name.upper()] = row.id
        
        # Query for lowercase unit names
        unit_names_lower = [u.lower() for u in UNIT_CONFIGS.keys()]
        for row in conn.execute(unit_query, {"names": unit_names_lower}):
            unit_id_map[row.name.upper()] = row.id
    print(f"  Found {len(parameter_id_map)} parameters, {len(unit_id_map)} units")
    
    # Step 4: Create single transformed dataframe
    print("\nStep 4: Creating transformed dataframe...")
    
    transformed_data = raw_data.copy()
    
    # Map NASS API columns to our schema
    column_mapping = {
        'commodity_desc': 'commodity',
        'statisticcat_desc': 'statistic',
        'unit_desc': 'unit',
        'Value': 'observation',
        'county_name': 'county',
        'short_desc': 'description',
        'year': 'year',
        # Survey-specific fields
        'freq_desc': 'survey_period',           # ANNUAL, MONTHLY, etc.
        'reference_period_desc': 'reference_month',  # MAY, END OF DEC, etc.
        'begin_code': 'begin_code',
        'end_code': 'end_code'
    }
    
    # Rename columns that exist
    rename_dict = {k: v for k, v in column_mapping.items() if k in transformed_data.columns}
    transformed_data = transformed_data.rename(columns=rename_dict)
    
    print(f"  Renamed columns: {list(rename_dict.keys())}")
    
    # Construct 5-digit FIPS geoid from state + county codes (keep as string)
    state_fips_default = '06'  # California
    if 'state_fips_code' in transformed_data.columns and 'county_code' in transformed_data.columns:
        transformed_data['geoid'] = transformed_data['state_fips_code'].astype(str).str.zfill(2) + \
                                    transformed_data['county_code'].astype(str).str.zfill(3)
    elif 'state_alpha' in transformed_data.columns and 'county_code' in transformed_data.columns:
        state_alpha_to_fips = {'CA': '06'}
        transformed_data['geoid'] = transformed_data['state_alpha'].map(
            lambda x: state_alpha_to_fips.get(str(x).upper(), state_fips_default)
        ).astype(str) + transformed_data['county_code'].astype(str).str.zfill(3)
    elif 'county_code' in transformed_data.columns:
        transformed_data['geoid'] = state_fips_default + transformed_data['county_code'].astype(str).str.zfill(3)
    else:
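        print("  ⚠ Warning: 'county_code' not found; cannot construct geoid")
        transformed_data['geoid'] = None
    
    # 'string' dtype keeps a missing geoid as <NA> (so dropna removes it) instead of the text 'None'
    transformed_data['geoid'] = transformed_data['geoid'].astype('string').str.zfill(5)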
    
    # Map commodity names to IDs → store as commodity_code for database consistency
    def get_commodity_id(name):
        if pd.isna(name):
            return None
        if name.upper() in commodity_map:
            return commodity_map[name.upper()]
        # Fallback: loose substring match in either direction (e.g. 'CORN' also matches 'POPCORN'); first hit wins
        for db_name, db_id in commodity_map.items():
            if name.upper() in db_name or db_name in name.upper():
                return db_id
        return None
    
    if 'commodity' in transformed_data.columns:
        transformed_data['commodity_code'] = transformed_data['commodity'].apply(get_commodity_id)
    else:
        print("  ⚠ Warning: 'commodity' column not found")
        transformed_data['commodity_code'] = None
    
    # Map to parameter_id and unit_id from database (by name lookup)
    if 'statistic' in transformed_data.columns:
        transformed_data['parameter_id'] = transformed_data['statistic'].map(
            lambda x: parameter_id_map.get(x.upper()) if pd.notna(x) else None
        )
    
    if 'unit' in transformed_data.columns:
        transformed_data['unit_id'] = transformed_data['unit'].map(
            lambda x: unit_id_map.get(x.upper()) if pd.notna(x) else None
        )
    
    # Add metadata columns
    transformed_data['source_reference'] = 'USDA NASS QuickStats API'
    
    # Capture source type (CENSUS vs SURVEY) for routing to correct table
    if 'source_desc' in transformed_data.columns:
        transformed_data['source_type'] = transformed_data['source_desc']
        print(f"  Captured source_type: {transformed_data['source_type'].value_counts().to_dict()}")
    else:
        print("  ⚠ Warning: source_desc not found - defaulting to CENSUS")
        transformed_data['source_type'] = 'CENSUS'
    
    # Set record_type for polymorphic relationship (table name for discriminator)
    transformed_data['record_type'] = transformed_data['source_type'].map({
        'CENSUS': 'usda_census_record',
        'SURVEY': 'usda_survey_record'
    })
    print(f"  Set record_type: {transformed_data['record_type'].value_counts().to_dict()}")
    
    # ====================================================================
    # APPLY COWORKER CLEANING FUNCTIONS (from cleaning_functions.ipynb)
    # ====================================================================
    print("\nüßπ Step 4b: Apply cleaning functions from coworker pattern...")
    
    # Import cleaning functions (functions are embedded here for notebook portability)
    # In production, these would be imported from ca_biositing.pipeline.utils.cleaning_functions
    
    def replace_empty_with_na(df, columns=None, regex=r'^\s*$'):
        """Replace empty/whitespace-only strings with NaN"""
        if columns is None:
            return df.replace(regex, np.nan, regex=True)
        df = df.copy()
        cols = [c for c in columns if c in df.columns]
        if cols:
            df[cols] = df[cols].replace(regex, np.nan, regex=True)
        return df
    
    def to_lowercase_df(df, columns=None):
        """Lowercase string columns to reduce variations (e.g., 'Corn' vs 'corn')"""
        df = df.copy()
        if columns is None:
            str_cols = df.select_dtypes(include=['object', 'string']).columns
        else:
            str_cols = [c for c in columns if c in df.columns]
        for c in str_cols:
            df[c] = df[c].astype('string').str.lower().where(df[c].notna(), df[c])
        return df
    
    # Apply cleaning: replace empty strings with NaN
    string_cols = ['commodity', 'statistic', 'unit', 'county', 'description', 'survey_period', 'reference_month']
    string_cols = [c for c in string_cols if c in transformed_data.columns]
    transformed_data = replace_empty_with_na(transformed_data, columns=string_cols)
    print(f"  ✓ Replaced empty strings with NaN in {len(string_cols)} columns")
    
    # Apply cleaning: lowercase all string columns for consistency
    transformed_data = to_lowercase_df(transformed_data, columns=string_cols)
    print(f"  ✓ Lowercased {len(string_cols)} string columns for consistency")
    
    # Convert observation strings (with commas/decimals) to numeric float
    if 'observation' in transformed_data.columns:
        transformed_data['value_numeric'] = transformed_data['observation'].astype(str).str.replace(',', '')
        transformed_data['value_numeric'] = pd.to_numeric(transformed_data['value_numeric'], errors='coerce')
        transformed_data['value_text'] = transformed_data['observation'].astype(str)
    
    # Handle CV% field
    if 'CV (%)' in transformed_data.columns:
        transformed_data['cv_pct'] = pd.to_numeric(transformed_data['CV (%)'], errors='coerce')
    else:
        transformed_data['cv_pct'] = None
    
    # Coerce all ID columns to integers (nullable Int64 type)
    id_columns = ['commodity_code', 'parameter_id', 'unit_id']
    for col in id_columns:
        if col in transformed_data.columns:
            transformed_data[col] = pd.to_numeric(transformed_data[col], errors='coerce').astype('Int64')
    
    # Create note field
    transformed_data['note'] = transformed_data.apply(
        lambda row: f"{row.get('statistic', 'N/A')} in {row.get('unit', 'N/A')} for {row.get('commodity', 'N/A')} in {row.get('county', 'N/A')}", 
        axis=1
    )
    
    # Keep relevant columns (load step will create record_id FK)
    final_columns = [
        # Record fields (for UsdaCensusRecord/UsdaSurveyRecord)
        'geoid', 'year', 'commodity_code', 'source_reference', 'source_type', 'record_type',
        # Survey-specific fields
        'survey_period', 'reference_month', 'begin_code', 'end_code',
        # Observation fields
        'parameter_id', 'value_numeric', 'value_text', 'cv_pct', 'unit_id', 'note',
        # Original for reference
        'commodity', 'statistic', 'unit', 'county', 'description'
    ]
    
    # Only include columns that exist
    final_columns = [col for col in final_columns if col in transformed_data.columns]
    transformed_data = transformed_data[final_columns]
    
    # Drop rows with missing required fields
    required_fields = ['geoid', 'year', 'commodity_code', 'parameter_id', 'unit_id', 'value_numeric']
    required_fields = [col for col in required_fields if col in transformed_data.columns]
    transformed_data = transformed_data.dropna(subset=required_fields)
    
    print(f"\n✓ Transform complete!")
    print(f"  Total rows: {len(transformed_data)}")
    print(f"  Columns: {list(transformed_data.columns)}")
    
    # Show data types for ID columns
    print(f"\nData types for ID columns:")
    for col in ['commodity_code', 'parameter_id', 'unit_id', 'value_numeric']:
        if col in transformed_data.columns:
            print(f"  {col}: {transformed_data[col].dtype}")
    
    # Show survey-specific fields captured
    print(f"\nSurvey-specific fields captured:")
    for col in ['survey_period', 'reference_month']:
        if col in transformed_data.columns:
            unique_vals = transformed_data[col].dropna().unique()
            print(f"  {col}: {len(unique_vals)} unique values - {unique_vals[:5].tolist()}")
    
    print(f"\nSample record:")
    if len(transformed_data) > 0:
        sample = transformed_data.head(1).to_dict('records')[0]
        for key, val in sample.items():
            print(f"  {key}: {val} (type: {type(val).__name__})")
    else:
        print("  ⚠ No valid records after transformation")

Transform Step: Mapping API data to database schema
Debug: Available columns in raw_data: ['agg_level_desc', 'county_code', 'load_time', 'source_desc', 'reference_period_desc', 'domain_desc', 'week_ending', 'country_name', 'watershed_code', 'begin_code']...
Step 1: Creating Parameter/Unit records if needed...
  Adding 6 new parameters
  Adding 6 new units
  ‚úì Committed 6 parameters, 6 units

Step 2: Mapping commodity names to database IDs...
  Found 4 commodities in database

Step 3: Looking up parameter and unit IDs...
  Found 6 parameters, 6 units

Step 4: Creating transformed dataframe...
  Renamed columns: ['commodity_desc', 'statisticcat_desc', 'unit_desc', 'Value', 'county_name', 'short_desc', 'year', 'freq_desc', 'reference_period_desc', 'begin_code', 'end_code']
  Captured source_type: {'CENSUS': 6496, 'SURVEY': 68}
  Set record_type: {'usda_census_record': 6496, 'usda_survey_record': 68}

🧹 Step 4b: Apply cleaning functions from coworker pattern...
  ✓ Replaced empty st
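The transform coerces `Value` and `CV (%)` with `pd.to_numeric(errors='coerce')`, which turns coded entries such as `(D)` and `(L)` (visible in the raw rows above) into NaN. A stdlib sketch of the same parsing for a single value, using a hypothetical helper name, keeps the original text next to the number so coded entries remain visible:

```python
def parse_nass_value(raw):
    """Parse a QuickStats value string into (number_or_None, original_text).

    Thousands separators are stripped ("1,086,000" -> 1086000.0); anything
    that still fails float(), such as "(D)", "(L)" or "", yields None.
    """
    text = "" if raw is None else str(raw).strip()
    try:
        number = float(text.replace(",", ""))
    except ValueError:
        number = None
    return number, text
```

Values that parse to None correspond to the rows the transform later removes with `dropna` on `value_numeric`.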

In [31]:
# Display transformed_data in Data Wrangler
print("Preparing to display transformed_data in Data Wrangler...")
print(f"Shape: {transformed_data.shape}")
print(f"\nPreview (first 5 rows):")
print(transformed_data.head().to_string())

# The Data Wrangler will be opened with the variable below
transformed_data

Preparing to display transformed_data in Data Wrangler...
Shape: (69, 21)

Preview (first 5 rows):
     geoid  year  commodity_code          source_reference source_type         record_type survey_period reference_month begin_code end_code  parameter_id  value_numeric value_text  cv_pct  unit_id                                             note commodity       statistic   unit       county                                  description
310  06077  2022               1  USDA NASS QuickStats API      CENSUS  usda_census_record        annual            year         00       00            19        14503.0     14,503    62.8        6  area harvested in acres for corn in san joaquin      corn  area harvested  acres  san joaquin                corn, grain - acres harvested
318  06077  2022               1  USDA NASS QuickStats API      CENSUS  usda_census_record        annual            year         00       00            19        51836.0     51,836    22.0        6  area harvested in acres fo

index,geoid,year,commodity_code,source_reference,source_type,record_type,survey_period,reference_month,begin_code,end_code,parameter_id,value_numeric,value_text,cv_pct,unit_id,note,commodity,statistic,unit,county,description
310,06077,2022,1,USDA NASS QuickStats API,CENSUS,usda_census_record,annual,year,00,00,19,14503.0,14503,62.8,6,area harvested in acres for corn in san joaquin,corn,area harvested,acres,san joaquin,"corn, grain - acres harvested"
318,06077,2022,1,USDA NASS QuickStats API,CENSUS,usda_census_record,annual,year,00,00,19,51836.0,51836,22.0,6,area harvested in acres for corn in san joaquin,corn,area harvested,acres,san joaquin,"corn, silage - acres harvested"
326,06077,2022,1,USDA NASS QuickStats API,CENSUS,usda_census_record,annual,year,00,00,18,1345187.0,1345187,21.1,5,production in tons for corn in san joaquin,corn,production,tons,san joaquin,"corn, silage - production, measured in tons"
327,06077,2022,1,USDA NASS QuickStats API,CENSUS,usda_census_record,annual,year,00,00,19,14503.0,14503,63.5,6,area harvested in acres for corn in san joaquin,corn,area harvested,acres,san joaquin,"corn, grain, irrigated - acres harvested"
329,06077,2022,1,USDA NASS QuickStats API,CENSUS,usda_census_record,annual,year,00,00,19,51644.0,51644,21.8,6,area harvested in acres for corn in san joaquin,corn,area harvested,acres,san joaquin,"corn, silage, irrigated - acres harvested"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6525,06047,2022,1,USDA NASS QuickStats API,SURVEY,usda_survey_record,annual,year,00,00,19,63200.0,63200,11.2,6,area harvested in acres for corn in merced,corn,area harvested,acres,merced,"corn, silage - acres harvested"
6526,06047,2022,1,USDA NASS QuickStats API,SURVEY,usda_survey_record,annual,year,00,00,18,1612000.0,1612000,12.9,5,production in tons for corn in merced,corn,production,tons,merced,"corn, silage - production, measured in tons"
6558,06047,2022,4,USDA NASS QuickStats API,SURVEY,usda_survey_record,annual,year,00,00,19,24600.0,24600,,6,area harvested in acres for tomatoes in merced,tomatoes,area harvested,acres,merced,"tomatoes, in the open, processing - acres harv..."
6559,06047,2022,4,USDA NASS QuickStats API,SURVEY,usda_survey_record,annual,year,00,00,20,24700.0,24700,,6,area planted in acres for tomatoes in merced,tomatoes,area planted,acres,merced,"tomatoes, in the open, processing - acres planted"


In [9]:

print("="*80)
print("üîç DIAGNOSTIC: Data Reduction Breakdown")
print("="*80)

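if 'raw_data' in locals() and 'transformed_data' in locals():
    # NOTE: transformed_data appears to be the transform output *after* its own
    # filtering, so the null counts below only describe surviving rows and cannot
    # explain the raw_data -> transformed_data reduction.
    print(f"\nStarting point:")
    print(f"  raw_data rows: {len(raw_data)}")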
    
    # Check commodity mapping
    commodity_valid = transformed_data['commodity_code'].notna().sum()
    commodity_invalid = transformed_data['commodity_code'].isna().sum()
    print(f"\n1️⃣  After commodity mapping:")
    print(f"  Valid commodity_code: {commodity_valid} ({100*commodity_valid/len(transformed_data):.1f}%)")
    print(f"  Invalid/NULL commodity_code: {commodity_invalid} ({100*commodity_invalid/len(transformed_data):.1f}%)")
    
    # Check parameter mapping
    parameter_valid = transformed_data['parameter_id'].notna().sum()
    parameter_invalid = transformed_data['parameter_id'].isna().sum()
    print(f"\n2️⃣  After parameter mapping:")
    print(f"  Valid parameter_id: {parameter_valid} ({100*parameter_valid/len(transformed_data):.1f}%)")
    print(f"  Invalid/NULL parameter_id: {parameter_invalid} ({100*parameter_invalid/len(transformed_data):.1f}%)")
    
    # Check unit mapping
    unit_valid = transformed_data['unit_id'].notna().sum()
    unit_invalid = transformed_data['unit_id'].isna().sum()
    print(f"\n3️⃣  After unit mapping:")
    print(f"  Valid unit_id: {unit_valid} ({100*unit_valid/len(transformed_data):.1f}%)")
    print(f"  Invalid/NULL unit_id: {unit_invalid} ({100*unit_invalid/len(transformed_data):.1f}%)")
    
    # Check value conversion
    value_valid = transformed_data['value_numeric'].notna().sum()
    value_invalid = transformed_data['value_numeric'].isna().sum()
    print(f"\n4️⃣  After value_numeric conversion:")
    print(f"  Valid value_numeric: {value_valid} ({100*value_valid/len(transformed_data):.1f}%)")
    print(f"  Invalid/NULL value_numeric: {value_invalid} ({100*value_invalid/len(transformed_data):.1f}%)")
    
    # Check all required fields
    required_fields = ['geoid', 'year', 'commodity_code', 'parameter_id', 'unit_id', 'value_numeric']
    all_valid = transformed_data.dropna(subset=required_fields)
    print(f"\n5️⃣  After dropna (ALL required fields non-NULL):")
    print(f"  Records with all fields: {len(all_valid)} ({100*len(all_valid)/len(transformed_data):.1f}%)")
    print(f"  Records dropped: {len(transformed_data) - len(all_valid)}")
    
    # Show which single filter is most aggressive
    # (counts come from transformed_data; percentages use len(raw_data) as the denominator)
    print(f"\n📊 Most aggressive single filters:")
    print(f"  Commodity mapping loses: {commodity_invalid} records ({100*commodity_invalid/len(raw_data):.1f}% of original)")
    print(f"  Parameter mapping loses: {parameter_invalid} records ({100*parameter_invalid/len(raw_data):.1f}% of original)")
    print(f"  Unit mapping loses: {unit_invalid} records ({100*unit_invalid/len(raw_data):.1f}% of original)")
    print(f"  Value conversion loses: {value_invalid} records ({100*value_invalid/len(raw_data):.1f}% of original)")
    
    # Show some examples of dropped records
    print(f"\nüìã Sample of DROPPED records (missing required fields):")
    dropped = transformed_data[transformed_data[required_fields].isna().any(axis=1)]
    print(f"  Total dropped: {len(dropped)}")
    if len(dropped) > 0:
        print(f"\n  Reasons for dropping (top 5):")
        for _, row in dropped.head(5).iterrows():
            # Check every field used by the dropna above, including geoid and year
            missing = [field for field in required_fields if pd.isna(row[field])]
            print(f"    - {row.get('commodity', 'N/A')} / {row.get('statistic', 'N/A')} / {row.get('unit', 'N/A')}: Missing {', '.join(missing)}")
    
    print(f"\n{'='*80}")
else:
    print("⚠️  raw_data or transformed_data not available - run Extract and Transform first")


🔍 DIAGNOSTIC: Data Reduction Breakdown

Starting point:
  raw_data rows: 6564

1️⃣  After commodity mapping:
  Valid commodity_code: 69 (100.0%)
  Invalid/NULL commodity_code: 0 (0.0%)

2️⃣  After parameter mapping:
  Valid parameter_id: 69 (100.0%)
  Invalid/NULL parameter_id: 0 (0.0%)

3️⃣  After unit mapping:
  Valid unit_id: 69 (100.0%)
  Invalid/NULL unit_id: 0 (0.0%)

4️⃣  After value_numeric conversion:
  Valid value_numeric: 69 (100.0%)
  Invalid/NULL value_numeric: 0 (0.0%)

5️⃣  After dropna (ALL required fields non-NULL):
  Records with all fields: 69 (100.0%)
  Records dropped: 0

📊 Most aggressive single filters:
  Commodity mapping loses: 0 records (0.0% of original)
  Parameter mapping loses: 0 records (0.0% of original)
  Unit mapping loses: 0 records (0.0% of original)
  Value conversion loses: 0 records (0.0% of original)

📋 Sample of DROPPED records (missing required fields):
  Total dropped: 0
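Every check above reports 100% because `transformed_data` already appears to be filtered (69 rows, all valid), so the 6,564 → 69 reduction stays unexplained. A stage-by-stage funnel has to run on a frame that still contains the failing rows. A minimal sketch, assuming a hypothetical `pre_filter` frame (the transform output before its own filtering) with the same column names; the toy values are illustrative only:

```python
import pandas as pd


def null_funnel(df, stages):
    """Apply non-null filters cumulatively; report (label, before, after, lost) per stage."""
    report = []
    remaining = df
    for label, column in stages:
        before = len(remaining)
        remaining = remaining[remaining[column].notna()]
        report.append((label, before, len(remaining), before - len(remaining)))
    return remaining, report


# Hypothetical pre-filter frame: NaN marks a failed mapping or conversion
pre_filter = pd.DataFrame({
    "commodity_code": [1, 1, None, 4, 4],
    "parameter_id": [19, None, 19, 20, 18],
    "unit_id": [6, 6, 6, None, 5],
    "value_numeric": [14503.0, 0.0, 120.0, 24700.0, None],
})

stages = [
    ("commodity mapping", "commodity_code"),
    ("parameter mapping", "parameter_id"),
    ("unit mapping", "unit_id"),
    ("value conversion", "value_numeric"),
]
kept, report = null_funnel(pre_filter, stages)
for label, before, after, lost in report:
    pct = 100 * lost / len(pre_filter)
    print(f"{label:<18} {before:>4} -> {after:>4}  lost {lost} ({pct:.1f}% of original)")
```

Because the filters are applied in sequence, each row is charged to the first stage it fails, so the `lost` column sums to the total reduction instead of double-counting rows that fail several checks.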



In [10]:
# Check actual data types from NASS API for Value column
print("Investigating NASS API data types:")
print("="*60)

# 1) Raw Value dtype and sample values
print("\n1. Raw 'Value' column dtype and samples:")
try:
    raw_dtype = raw_data['Value'].dtype
    print(f"  Raw dtype: {raw_dtype}")
    print(f"  Sample values (first 10):")
    for idx, val in enumerate(raw_data['Value'].head(10)):
        print(f"    [{idx}] {repr(val)} (type: {type(val).__name__})")
except Exception as e:
    print(f"  ⚠ Unable to inspect raw_data['Value']: {e}")

# 2) String formatting patterns: commas, decimals, whitespace
print("\n2. String formatting patterns in 'Value':")
try:
    value_str = raw_data['Value'].astype(str)
    has_commas = value_str.str.contains(',').sum()
    has_decimals = value_str.str.contains(r'\.').sum()
    has_whitespace = value_str.str.contains(r'\s').sum()
    total = len(value_str)
    print(f"  With commas: {has_commas}/{total}")
    print(f"  With decimal point: {has_decimals}/{total}")
    print(f"  With whitespace: {has_whitespace}/{total}")
except Exception as e:
    print(f"  ⚠ Unable to analyze string patterns: {e}")

# 3) Coerce to numeric: remove commas, convert to float
print("\n3. Coercion to numeric float (remove commas, handle decimals):")
try:
    value_num = pd.to_numeric(value_str.str.replace(',', ''), errors='coerce')
    non_null = value_num.notna().sum()
    nulls = value_num.isna().sum()
    pct_numeric = round(100 * non_null / (non_null + nulls), 2) if (non_null + nulls) > 0 else 0.0
    print(f"  Converted dtype: {value_num.dtype}")
    print(f"  Numeric rows: {non_null}, Non-numeric (NaN): {nulls}, % numeric: {pct_numeric}%")
    if non_null > 0:
        print(f"  Range: min={value_num.min()}, max={value_num.max()}")
    # Show a few rows that failed conversion, if any
    if nulls > 0:
        failed_samples = value_str[value_num.isna()].head(5).tolist()
        print(f"  Samples that failed conversion: {failed_samples}")
except Exception as e:
    print(f"  ⚠ Unable to convert 'Value' to numeric: {e}")


Investigating NASS API data types:

1. Raw 'Value' column dtype and samples:
  Raw dtype: object
  Sample values (first 10):
    [0] '910,695,000' (type: str)
    [1] '560' (type: str)
    [2] '                 (D)' (type: str)
    [3] '2' (type: str)
    [4] '                 (D)' (type: str)
    [5] '1' (type: str)
    [6] '                 (D)' (type: str)
    [7] '1' (type: str)
    [8] '                 (D)' (type: str)
    [9] '1' (type: str)

2. String formatting patterns in 'Value':
  With commas: 2043/6564
  With decimal point: 30/6564
  With whitespace: 785/6564

3. Coercion to numeric float (remove commas, handle decimals):
  Converted dtype: float64
  Numeric rows: 5779, Non-numeric (NaN): 785, % numeric: 88.04%
  Range: min=-999000.0, max=17806949000.0
  Samples that failed conversion: ['                 (D)', '                 (D)', '                 (D)', '                 (D)', '                 (D)']
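The counts line up: the 785 values containing whitespace match the 785 NaNs, and every failed sample is a left-padded `(D)`, the marker NASS uses for values withheld to avoid disclosing data for individual operations. `pd.to_numeric(..., errors='coerce')` turns these into NaN and discards the code. A minimal stdlib sketch that keeps the status code next to the number; the function name and the rule "any parenthesized uppercase token is a status code" are assumptions for illustration:

```python
import math
import re

# Parenthesized uppercase tokens such as "(D)" are treated as status codes.
# Only "(D)" appears in the output above; other codes are an assumption.
_CODE_RE = re.compile(r"^\(([A-Z]+)\)$")


def parse_nass_value(raw):
    """Return (number, code): number is a float or None, code is e.g. 'D' or None."""
    if raw is None or (isinstance(raw, float) and math.isnan(raw)):
        return None, None
    text = str(raw).strip()  # "(D)" arrives left-padded with spaces
    match = _CODE_RE.match(text)
    if match:
        return None, match.group(1)  # suppressed: keep the code, no number
    try:
        # Thousands separators: "910,695,000" -> 910695000.0
        return float(text.replace(",", "")), None
    except ValueError:
        return None, None  # anything else is left unparsed


samples = ["910,695,000", "560", "                 (D)", "3.5", ""]
for s in samples:
    print(repr(s), "->", parse_nass_value(s))
```

Storing the code (for example in a text column) lets downstream queries distinguish "withheld" from "not reported", which a bare NaN cannot.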


## Step 6: Test USDA Load (Insert to Database)

### Reset database to test load code (optional)


In [None]:
print("="*80)
print("⚠️  CLEANUP: Delete USDA data from database (for fresh testing)")
print("="*80)

from sqlalchemy import text

# First, check how many USDA observations exist
with engine.connect() as conn:
    usda_obs_count = conn.execute(text("""
        SELECT COUNT(*) FROM observation 
        WHERE record_type IN ('usda_census_record', 'usda_survey_record')
    """)).scalar()
    
    census_count = conn.execute(text("SELECT COUNT(*) FROM usda_census_record")).scalar()
    survey_count = conn.execute(text("SELECT COUNT(*) FROM usda_survey_record")).scalar()

print(f"\nüìä USDA data in database:")
print(f"  - USDA observations (record_type='usda_census_record'/'usda_survey_record'): {usda_obs_count}")
print(f"  - Census records: {census_count}")
print(f"  - Survey records: {survey_count}")

if usda_obs_count == 0 and census_count == 0 and survey_count == 0:
    print("\n✓ No USDA data to delete - database is already clean")
else:
    confirm = input(f"\n⚠️  Delete the {usda_obs_count} USDA observations, {census_count} census records, and {survey_count} survey records?\nType 'YES' to confirm, anything else to cancel: ").strip()
    
    if confirm == 'YES':
        with engine.begin() as conn:
            print("\n🗑️  Deleting observations linked to USDA records...")
            # Filter on the record_type discriminator: record_id alone is ambiguous,
            # since other parent tables can reuse the same id values
            result = conn.execute(text("""
                DELETE FROM observation 
                WHERE record_type IN ('usda_census_record', 'usda_survey_record')
            """))
            print(f"  ✓ Deleted {result.rowcount} observations")
            
            # RESTART IDENTITY resets the id sequences (PostgreSQL defaults to
            # CONTINUE IDENTITY). CASCADE also truncates any table that holds a
            # foreign key to the truncated table.
            print("🗑️  Truncating survey records (resets auto-increment)...")
            conn.execute(text("TRUNCATE TABLE usda_survey_record RESTART IDENTITY CASCADE"))
            print(f"  ✓ Truncated usda_survey_record and reset sequence")
            
            print("🗑️  Truncating census records (resets auto-increment)...")
            conn.execute(text("TRUNCATE TABLE usda_census_record RESTART IDENTITY CASCADE"))
            print(f"  ✓ Truncated usda_census_record and reset sequence")
        
        print("\n✅ CLEANUP COMPLETE - USDA data removed, ID sequences reset")
    else:
        print("\n❌ Cleanup cancelled")


⚠️  CLEANUP: Delete USDA data from database (for fresh testing)

📊 USDA data in database:
  - USDA observations (record_type='usda_census_record'/'usda_survey_record'): 0
  - Census records: 0
  - Survey records: 0

✓ No USDA data to delete - database is already clean
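Observations point at their parent through the pair `(record_id, record_type)`, so an id on its own is ambiguous: USDA census record 1 and another parent type's record 1 share the same `record_id`. A small sqlite3 stand-in (tables simplified, `some_other_record` is a hypothetical parent type) shows why the cleanup delete should filter on `record_type` rather than match `record_id` against the parent tables' ids:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE usda_census_record (id INTEGER PRIMARY KEY);
    CREATE TABLE observation (
        id INTEGER PRIMARY KEY,
        record_id INTEGER,
        record_type TEXT
    );
    INSERT INTO usda_census_record (id) VALUES (1), (2);
    -- record_id 1 is used by two different parent types
    INSERT INTO observation (record_id, record_type) VALUES
        (1, 'usda_census_record'),
        (2, 'usda_census_record'),
        (1, 'some_other_record');
""")


def count_by_type(conn):
    """Return {record_type: row_count} for the observation table."""
    rows = conn.execute(
        "SELECT record_type, COUNT(*) FROM observation GROUP BY record_type"
    )
    return dict(rows.fetchall())


# Id-only match: also catches the unrelated 'some_other_record' row
unscoped = conn.execute(
    "SELECT COUNT(*) FROM observation WHERE record_id IN (SELECT id FROM usda_census_record)"
).fetchone()[0]

# Discriminator match: touches only USDA-owned observations
conn.execute("DELETE FROM observation WHERE record_type = 'usda_census_record'")
print("id-only match would hit:", unscoped)
print("remaining after scoped delete:", count_by_type(conn))
```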


### Load to database

In [21]:
# ============================================================================
# STEP 0: Create/Link DataSource and USDA datasets
# ============================================================================
print("\nüîç STEP 0: Create DataSource (USDA NASS API) and datasets...")

from ca_biositing.datamodels.schemas.generated.ca_biositing import (
    DataSource,
    Dataset,
)
from sqlalchemy import insert
from datetime import timezone  # needed for the UTC timestamp below

now = datetime.now(timezone.utc)

# First, ensure DataSource record exists for "USDA NASS API"
usda_data_source_id = None
with engine.connect() as conn:
    result = conn.execute(text("SELECT id FROM data_source WHERE name = 'USDA NASS API'"))
    row = result.fetchone()
    if row:
        usda_data_source_id = row[0]
        print(f"  ✓ DataSource 'USDA NASS API' already exists (id={usda_data_source_id})")
    else:
        # Create DataSource record
        datasource_record = {
            'name': 'USDA NASS API',
            'description': 'United States Department of Agriculture National Agricultural Statistics Service QuickStats API',
            'biocirv': False,  # External data
            'created_at': now,
            'updated_at': now
        }
        datasource_table = DataSource.__table__
        with engine.begin() as conn2:
            result = conn2.execute(insert(datasource_table), [datasource_record])
        print(f"  ✅ Created DataSource: 'USDA NASS API'")
        
        # Query to get the ID we just created
        with engine.connect() as conn2:
            result = conn2.execute(text("SELECT id FROM data_source WHERE name = 'USDA NASS API'"))
            usda_data_source_id = result.fetchone()[0]
            print(f"     (id={usda_data_source_id})")

# Now create USDA datasets, each referencing this data source
print("\nüîç STEP 0b: Query available datasets and create missing ones...")

# Determine which years we have in the transformed data
years_in_data = sorted(transformed_data['year'].unique())
print(f"  Years in API data: {years_in_data}")

# Get existing datasets
existing_datasets = {}
with engine.connect() as conn:
    # '!' escapes '_', which LIKE otherwise treats as a single-character wildcard
    result = conn.execute(text("SELECT id, name FROM dataset WHERE name LIKE 'USDA!_%' ESCAPE '!'"))
    for dataset_id, name in result:
        existing_datasets[name] = dataset_id

print(f"  Existing USDA datasets: {len(existing_datasets)}")

# Create missing datasets
new_datasets = []
for year in sorted(years_in_data):
    for source_type in ['CENSUS', 'SURVEY']:
        dataset_name = f"USDA_{source_type}_{year}"
        
        if dataset_name not in existing_datasets:
            new_datasets.append({
                'name': dataset_name,
                'description': f"USDA {source_type} data for {year}",
                'record_type': f"usda_{source_type.lower()}_record",
                'source_id': usda_data_source_id,
                'start_date': f"{year}-01-01",
                'end_date': f"{year}-12-31",
                'created_at': now,
                'updated_at': now
            })
        else:
            print(f"  ✓ {dataset_name} already exists (id={existing_datasets[dataset_name]})")

if new_datasets:
    dataset_table = Dataset.__table__
    with engine.begin() as conn:
        result = conn.execute(insert(dataset_table), new_datasets)
        print(f"  ✅ Created {result.rowcount} new datasets:")
        for ds in new_datasets:
            print(f"     - {ds['name']}")

# ============================================================================
# STEP 0c: Query available datasets (by year and source type)
# ============================================================================
print("\nüîç STEP 0c: Build dataset_id mapping...")
dataset_map = {}  # (year, source_type) -> dataset_id
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT id, name FROM dataset 
        WHERE name LIKE 'USDA!_%' ESCAPE '!'  -- '!' makes '_' literal
        ORDER BY id
    """))
    for row in result:
        dataset_id, name = row
        try:
            year = int(name.split('_')[-1])
            source_type = 'CENSUS' if 'CENSUS' in name else 'SURVEY'
            key = (year, source_type)
            dataset_map[key] = dataset_id
            print(f"  Mapped: {name} (id={dataset_id}) → year={year}, source={source_type}")
        except (ValueError, IndexError):
            pass

if not dataset_map:
    print("  ⚠️  WARNING: No USDA datasets found in database!")
else:
    print(f"  ✓ Total USDA datasets: {len(dataset_map)}")


🔍 STEP 0: Create DataSource (USDA NASS API) and datasets...
  ✓ DataSource 'USDA NASS API' already exists (id=1)

🔍 STEP 0b: Query available datasets and create missing ones...
  Years in API data: [np.int64(2022)]
  Existing USDA datasets: 0
  ✅ Created 2 new datasets:
     - USDA_CENSUS_2022
     - USDA_SURVEY_2022

🔍 STEP 0c: Build dataset_id mapping...
  Mapped: USDA_CENSUS_2022 (id=2) → year=2022, source=CENSUS
  Mapped: USDA_SURVEY_2022 (id=3) → year=2022, source=SURVEY
  ✓ Total USDA datasets: 2
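STEP 0 looks up the `USDA NASS API` data source, inserts it on a second connection if it is missing, and then queries again for its id. If `data_source.name` carries a UNIQUE constraint (an assumption; the schema is not shown here), the same get-or-create fits on one connection. A sqlite3 sketch with a simplified, hypothetical table; the PostgreSQL counterpart of `INSERT OR IGNORE` would be `INSERT ... ON CONFLICT (name) DO NOTHING`:

```python
import sqlite3


def get_or_create_source(conn, name, description):
    """Return the id of the data_source row called `name`, inserting it if absent."""
    # INSERT OR IGNORE is a no-op when UNIQUE(name) would be violated
    conn.execute(
        "INSERT OR IGNORE INTO data_source (name, description) VALUES (?, ?)",
        (name, description),
    )
    # Either way, exactly one row with this name now exists
    return conn.execute(
        "SELECT id FROM data_source WHERE name = ?", (name,)
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE data_source (id INTEGER PRIMARY KEY, name TEXT UNIQUE, description TEXT)"
)
first = get_or_create_source(conn, "USDA NASS API", "QuickStats API")
second = get_or_create_source(conn, "USDA NASS API", "QuickStats API")
print(first, second)  # the second call reuses the existing row
```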

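STEP 0c recovers `(year, source_type)` from names like `USDA_CENSUS_2022`. Two details are easy to miss: in SQL `LIKE`, `_` matches any single character, so an unescaped pattern such as `'USDA_%'` also matches names like `USDAX_2022`; and the name check in STEP 0c maps any USDA name without `CENSUS` to `SURVEY`. A stdlib sketch of a stricter parser (the helper name is hypothetical), plus a sqlite3 check of the escaped pattern:

```python
import re
import sqlite3

# Accept only USDA_<CENSUS|SURVEY>_<4-digit year>
_DATASET_RE = re.compile(r"^USDA_(CENSUS|SURVEY)_(\d{4})$")


def parse_dataset_name(name):
    """Return (year, source_type) for a well-formed USDA dataset name, else None."""
    match = _DATASET_RE.match(name)
    if match is None:
        return None
    return int(match.group(2)), match.group(1)


names = ["USDA_CENSUS_2022", "USDA_SURVEY_2022", "USDA_PRICES_2022", "USDAX_2022"]
print({n: parse_dataset_name(n) for n in names})

# '_' is a single-character wildcard in LIKE; ESCAPE makes it literal
conn = sqlite3.connect(":memory:")
unescaped = conn.execute("SELECT 'USDAX_2022' LIKE 'USDA_%'").fetchone()[0]
escaped = conn.execute("SELECT 'USDAX_2022' LIKE 'USDA!_%' ESCAPE '!'").fetchone()[0]
print(unescaped, escaped)  # 1 0
```

Names that do not parse are skipped instead of being silently filed under `SURVEY`.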

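The next cell deduplicates in three levels. Levels 1 and 2 come down to set membership on a normalized key: keys already in the database, and keys already queued in the current batch. A minimal stdlib sketch of those two levels, assuming hypothetical row dicts shaped like `transformed_data` rows (the function name is illustrative). It also shows why the required-field check is safer as an explicit `is None` test:

```python
def plan_inserts(rows, existing_keys, key_fields, required_fields):
    """Apply Level 1 (database) and Level 2 (batch) dedup to candidate rows.

    Returns (rows_to_insert, skip_counts).
    """
    seen = set()  # Level 2: keys already queued in this batch
    to_insert = []
    skipped = {"missing": 0, "in_db": 0, "in_batch": 0}
    for row in rows:
        # `is None` keeps legitimate zeros; all([...]) would treat 0.0 as missing
        if any(row.get(field) is None for field in required_fields):
            skipped["missing"] += 1
            continue
        key = tuple(row[field] for field in key_fields)
        if key in existing_keys:  # Level 1: already stored in the database
            skipped["in_db"] += 1
            continue
        if key in seen:
            skipped["in_batch"] += 1
            continue
        seen.add(key)
        to_insert.append(row)
    return to_insert, skipped


# Hypothetical candidate rows keyed like census/survey parents
rows = [
    {"geoid": "06077", "year": 2022, "commodity_code": 1, "value": 0.0},
    {"geoid": "06077", "year": 2022, "commodity_code": 1, "value": 5.0},      # batch repeat
    {"geoid": "06047", "year": 2022, "commodity_code": 4, "value": 24600.0},  # already in DB
    {"geoid": "06047", "year": 2022, "commodity_code": None, "value": 1.0},   # unmapped
]
existing = {("06047", 2022, 4)}
new, skipped = plan_inserts(
    rows,
    existing,
    key_fields=("geoid", "year", "commodity_code"),
    required_fields=("commodity_code", "value"),
)
print(len(new), skipped)
print(all([1, 19, 6, 0.0]))  # False: a zero value reads as "missing"
```

Level 3 (`ON CONFLICT DO NOTHING`) has no in-memory counterpart here; it only matters when another writer inserts the same key between the Level 1 query and the insert.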
In [None]:
# ============================================================================
# LOAD PIPELINE: STEPS 1-6
# ============================================================================
# 
# OVERALL PHILOSOPHY:
# The load process follows a hierarchical, idempotent pattern:
#   1. Parent records (census/survey) link to datasets for lineage
#   2. Observations reference parent records via (record_id, record_type) composite key
#   3. All steps deduplicate at multiple levels to ensure idempotency
#   4. Each step is atomic (all-or-nothing transactions)
#
# DEDUPLICATION STRATEGY (3-level defense):
#   - Level 1: Existing DB check - Skip if key already exists in database
#   - Level 2: Batch dedup - Skip if key seen earlier in same load batch
#   - Level 3: Final insert - PostgreSQL ON CONFLICT for observations only
#     (safety net for rows written by another session after the Level 1 query;
#     in-batch duplicates are already removed by Level 2)
#
# DATASET LINKAGE:
#   - All records MUST reference a dataset (created in STEP 0)
#   - Dataset provides: (source_id, start_date, end_date, record_type)
#   - Lookup uses (year, source_type) as key into dataset_map
#   - A NULL dataset_id breaks lineage: the record can no longer be traced to its source
#
# ============================================================================

# ============================================================================
# STEP 1: Load Census Records (Create Parent Fact Records)
# ============================================================================
# 
# PURPOSE:
#   Populate UsdaCensusRecord table with one row per unique
#   (geoid, year, commodity_code) combination from API data.
#   These are the PRIMARY parent records for observations.
#
# KEY DECISIONS:
#   - One census record per county-year-commodity (not per measurement)
#   - id auto-generated by database (identity column); observations reference it as record_id
#   - dataset_id links to the USDA_CENSUS_YYYY dataset created in STEP 0
#   - Timestamps audit when record was loaded
#
# IDEMPOTENCY:
#   - If run multiple times, no duplicate (geoid, year, commodity_code) tuples
#   - Achieved via Level 1 & Level 2 deduplication
#   - Safe to re-run without manual cleanup
#
print("\nüîç STEP 1: Load Census Records (Parent Fact Records)...")

from ca_biositing.datamodels.schemas.generated.ca_biositing import UsdaCensusRecord

# Level 1 Dedup: Query existing census records to build exclusion set
existing_census_keys = set()
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT geoid, year, commodity_code FROM usda_census_record
    """))
    for row in result:
        existing_census_keys.add((row[0], row[1], row[2]))

print(f"  Existing census records in DB: {len(existing_census_keys)}")

# Build new census records with Level 1 + Level 2 deduplication
new_census = []
seen_keys = set()  # Level 2: Track keys seen in this batch
for _, row in transformed_data[transformed_data['source_type'] == 'CENSUS'].iterrows():
    geoid = str(row['geoid']).zfill(5)
    year = int(row['year'])
    commodity_code = int(row['commodity_code']) if pd.notna(row['commodity_code']) else None
    
    if commodity_code is None:
        continue  # Skip rows without commodity mapping
    
    key = (geoid, year, commodity_code)
    if key in existing_census_keys or key in seen_keys:
        continue  # Skip: exists in DB or already queued in this batch
    
    seen_keys.add(key)
    
    # Look up which dataset this record belongs to
    dataset_id = dataset_map.get((year, 'CENSUS'))
    if not dataset_id:
        print(f"    ⚠️  Warning: No USDA_CENSUS_{year} dataset found - skipping record")
        continue
    
    new_census.append({
        'geoid': geoid,
        'year': year,
        'commodity_code': commodity_code,
        'source_reference': row.get('source_reference'),
        'dataset_id': dataset_id,  # Critical: Links to dataset for lineage
        'created_at': now,
        'updated_at': now
    })

if new_census:
    census_table = UsdaCensusRecord.__table__
    with engine.begin() as conn:
        result = conn.execute(insert(census_table), new_census)
        inserted_census = result.rowcount
    print(f"  ✅ Inserted {inserted_census} census records")
    print(f"     Batch had {len(new_census)} unique keys, inserted {inserted_census}")
else:
    inserted_census = 0
    print(f"  ℹ️  No new census records to insert (all deduplicated)")

# ============================================================================
# STEP 2: Load Survey Records (Create Secondary Parent Fact Records)
# ============================================================================
#
# PURPOSE:
#   Populate UsdaSurveyRecord table with one row per unique
#   (geoid, year, commodity_code) from API survey data.
#   These are ALTERNATE parent records (polymorphic) for observations.
#
# KEY DIFFERENCES FROM CENSUS:
#   - Includes survey-specific fields: survey_period, reference_month, begin_code, end_code
#   - Links to USDA_SURVEY_YYYY dataset (different from USDA_CENSUS_YYYY)
#   - record_type = 'usda_survey_record' (vs 'usda_census_record' for census)
#   - Allows same commodity to have both census AND survey measurements
#
# POLYMORPHIC PATTERN:
#   - Observation.record_type discriminates which parent table to join to
#   - Observation can reference census_record #5 OR survey_record #5
#   - (record_id=5, record_type='usda_census_record') != (record_id=5, record_type='usda_survey_record')
#
print("\nüîç STEP 2: Load Survey Records (Secondary Parent Fact Records)...")

from ca_biositing.datamodels.schemas.generated.ca_biositing import UsdaSurveyRecord

# Level 1 Dedup: Query existing survey records
existing_survey_keys = set()
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT geoid, year, commodity_code FROM usda_survey_record
    """))
    for row in result:
        existing_survey_keys.add((row[0], row[1], row[2]))

print(f"  Existing survey records in DB: {len(existing_survey_keys)}")

# Build new survey records with Level 1 + Level 2 deduplication
new_survey = []
seen_keys = set()
for _, row in transformed_data[transformed_data['source_type'] == 'SURVEY'].iterrows():
    geoid = str(row['geoid']).zfill(5)
    year = int(row['year'])
    commodity_code = int(row['commodity_code']) if pd.notna(row['commodity_code']) else None
    
    if commodity_code is None:
        continue  # Skip rows without commodity mapping
    
    key = (geoid, year, commodity_code)
    if key in existing_survey_keys or key in seen_keys:
        continue
    
    seen_keys.add(key)
    
    # Look up dataset for this survey record
    dataset_id = dataset_map.get((year, 'SURVEY'))
    if not dataset_id:
        print(f"    ⚠️  Warning: No USDA_SURVEY_{year} dataset found - skipping record")
        continue
    
    new_survey.append({
        'geoid': geoid,
        'year': year,
        'commodity_code': commodity_code,
        'source_reference': row.get('source_reference'),
        # Survey-specific fields (null if not present)
        'survey_period': row.get('survey_period') if pd.notna(row.get('survey_period')) else None,
        'reference_month': row.get('reference_month') if pd.notna(row.get('reference_month')) else None,
        'begin_code': row.get('begin_code') if pd.notna(row.get('begin_code')) else None,
        'end_code': row.get('end_code') if pd.notna(row.get('end_code')) else None,
        'dataset_id': dataset_id,
        'created_at': now,
        'updated_at': now
    })

if new_survey:
    survey_table = UsdaSurveyRecord.__table__
    with engine.begin() as conn:
        result = conn.execute(insert(survey_table), new_survey)
        inserted_survey = result.rowcount
    print(f"  ✅ Inserted {inserted_survey} survey records")
else:
    inserted_survey = 0
    print(f"  ℹ️  No new survey records to insert (all deduplicated)")

# ============================================================================
# STEP 3: Load Observations (Create Measurement Child Records)
# ============================================================================
#
# PURPOSE:
#   Populate Observation table with actual measurements. Each observation
#   REFERENCES a census or survey record via (record_id, record_type) pair.
#   One parent record can have MANY observations (e.g., yield, production, price).
#
# KEY RELATIONSHIPS:
#   - observation.record_id -> usda_census_record.id OR usda_survey_record.id
#   - observation.record_type = 'usda_census_record' OR 'usda_survey_record'
#   - Together, (record_id, record_type) uniquely identify the parent
#   - One observation per unique (record_id, record_type, parameter_id, unit_id)
#
# DEDUPLICATION CRITICAL:
#   - Level 1: Skip if (record_id, record_type, param_id, unit_id) in DB
#   - Level 2: Skip if seen earlier in batch
#   - Level 3: PostgreSQL ON CONFLICT DO NOTHING for final insert
#     (safety net for concurrent loads; keys repeated within the API data are already caught by Level 2)
#
print("\nüîç STEP 3: Load Observations (Measurement Child Records)...")

# First, build a map: (geoid, year, commodity, source_type) -> record_id
# This allows us to look up which parent record each observation belongs to
record_id_map = {}
with engine.connect() as conn:
    # Map from census records
    result = conn.execute(text("""
        SELECT id, geoid, year, commodity_code FROM usda_census_record
    """))
    for record_id, geoid, year, commodity_code in result:
        record_id_map[(geoid, year, commodity_code, 'CENSUS')] = record_id
    
    # Map from survey records
    result = conn.execute(text("""
        SELECT id, geoid, year, commodity_code FROM usda_survey_record
    """))
    for record_id, geoid, year, commodity_code in result:
        record_id_map[(geoid, year, commodity_code, 'SURVEY')] = record_id

print(f"  Built parent record lookup map: {len(record_id_map)} entries")

# Level 1 Dedup: Query existing observations (using unique constraint key)
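existing_obs_keys = set()
with engine.connect() as conn:
    # Only USDA observations can collide with this load. int() normalizes record_id
    # (the integrity check in STEP 4 casts it with ::integer, so it may be stored as
    # text) to match the integer parent ids used to build obs_key below.
    result = conn.execute(text("""
        SELECT record_id, record_type, parameter_id, unit_id FROM observation
        WHERE record_type IN ('usda_census_record', 'usda_survey_record')
    """))
    for row in result:
        existing_obs_keys.add((int(row[0]), row[1], row[2], row[3]))

print(f"  Existing USDA observations in DB: {len(existing_obs_keys)}")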

# Build observation records with all 3 dedup levels
obs_records = []
obs_skipped_no_parent = 0
obs_skipped_exists = 0
obs_skipped_dupe_in_load = 0
seen_obs_keys = set()  # Level 2

debug_count = 0  # Debug: Track first few rows
for _, row in transformed_data.iterrows():
    # Extract and validate required fields
    geoid = str(row['geoid']).zfill(5)
    year = int(row['year'])
    commodity_code = int(row['commodity_code']) if pd.notna(row['commodity_code']) else None
    parameter_id = int(row['parameter_id']) if pd.notna(row['parameter_id']) else None
    unit_id = int(row['unit_id']) if pd.notna(row['unit_id']) else None
    value_numeric = float(row['value_numeric']) if pd.notna(row['value_numeric']) else None
    
    # DEBUG: Print first 3 rows
    if debug_count < 3:
        print(f"  DEBUG row {debug_count}: value_numeric={value_numeric}, row['value_numeric']={row['value_numeric']}, pd.notna={pd.notna(row['value_numeric'])}")
        debug_count += 1
    
    # Skip if any required field is null
    # Test for None explicitly: all([...]) would also drop legitimate zero values
    if any(v is None for v in (commodity_code, parameter_id, unit_id, value_numeric)):
        continue
    
    # Find the parent record (census or survey) this observation belongs to
    source_type = 'CENSUS' if row['source_type'] == 'CENSUS' else 'SURVEY'
    record_key = (geoid, year, commodity_code, source_type)
    parent_record_id = record_id_map.get(record_key)
    
    if not parent_record_id:
        obs_skipped_no_parent += 1
        continue  # Parent not found - may have been filtered out
    
    # Level 1 Dedup: Check if observation already in DB
    obs_key = (parent_record_id, row['record_type'], parameter_id, unit_id)
    if obs_key in existing_obs_keys:
        obs_skipped_exists += 1
        continue
    
    # Level 2 Dedup: Check if already queued in this batch
    if obs_key in seen_obs_keys:
        obs_skipped_dupe_in_load += 1
        continue
    
    seen_obs_keys.add(obs_key)
    
    # Get dataset_id from parent's dataset (for lineage tracking)
    dataset_id = dataset_map.get((year, source_type))
    
    obs_records.append({
        'record_id': parent_record_id,
        'record_type': row['record_type'],  # 'usda_census_record' or 'usda_survey_record'
        'parameter_id': parameter_id,
        'unit_id': unit_id,
        'value': value_numeric,
        'dataset_id': dataset_id,
        'created_at': now,
        'updated_at': now
    })

print(f"  Built observation batch:")
print(f"    New observations to insert: {len(obs_records)}")
print(f"    Skipped (no parent record): {obs_skipped_no_parent}")
print(f"    Skipped (already exists in DB): {obs_skipped_exists}")
print(f"    Skipped (duplicate in load batch): {obs_skipped_dupe_in_load}")

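# Level 3 Dedup: Use PostgreSQL ON CONFLICT DO NOTHING
# Safety net for rows another session inserts after the Level 1 query
if obs_records:
    from sqlalchemy.dialects.postgresql import insert as pg_insert
    # Assumes Observation is generated alongside the other models imported above
    from ca_biositing.datamodels.schemas.generated.ca_biositing import Observation
    observation_table = Observation.__table__
    with engine.begin() as conn:
        # index_elements must match a unique index or constraint on observation;
        # otherwise PostgreSQL rejects the ON CONFLICT clause
        stmt = pg_insert(observation_table).values(obs_records).on_conflict_do_nothing(
            index_elements=['record_id', 'record_type', 'parameter_id', 'unit_id']
        )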
        result = conn.execute(stmt)
        obs_inserted = result.rowcount
    print(f"  ✅ Inserted {obs_inserted} observations (Level 3 dedup may have skipped some)")
else:
    obs_inserted = 0
    print(f"  ℹ️  No new observations to insert")

# ============================================================================
# STEP 4: Verify Foreign Key Relationships (Referential Integrity Check)
# ============================================================================
#
# PURPOSE:
#   Check that all observations reference valid parent records (census or survey).
#   Also verify all parent records link to valid datasets.
#   
# WHAT THIS CHECKS:
#   - Census records: Do they all link to a valid dataset?
#   - Survey records: Do they all link to a valid dataset?
#   - Observations: Do they all have valid (record_id, record_type) pairs?
#
print("\nüîç STEP 4: Verify Foreign Key Relationships (Referential Integrity)...")

with engine.connect() as conn:
    # Check: Census records linking to valid datasets
    census_no_dataset = conn.execute(text("""
        SELECT COUNT(*) FROM usda_census_record WHERE dataset_id IS NULL
    """)).scalar()
    
    # Check: Survey records linking to valid datasets
    survey_no_dataset = conn.execute(text("""
        SELECT COUNT(*) FROM usda_survey_record WHERE dataset_id IS NULL
    """)).scalar()
    
    # Check: Observations that reference census records
    census_obs_orphans = conn.execute(text("""
        SELECT COUNT(*) FROM observation o
        WHERE o.record_type = 'usda_census_record'
        AND NOT EXISTS (
            SELECT 1 FROM usda_census_record c WHERE c.id = o.record_id::integer
        )
    """)).scalar()
    
    # Check: Observations that reference survey records
    survey_obs_orphans = conn.execute(text("""
        SELECT COUNT(*) FROM observation o
        WHERE o.record_type = 'usda_survey_record'
        AND NOT EXISTS (
            SELECT 1 FROM usda_survey_record s WHERE s.id = o.record_id::integer
        )
    """)).scalar()

print(f"  Census records without dataset_id: {census_no_dataset}")
print(f"  Survey records without dataset_id: {survey_no_dataset}")
print(f"  Orphan observations (referencing missing census): {census_obs_orphans}")
print(f"  Orphan observations (referencing missing survey): {survey_obs_orphans}")

all_valid = (census_no_dataset == 0 and survey_no_dataset == 0 and 
             census_obs_orphans == 0 and survey_obs_orphans == 0)

if all_valid:
    print(f"  ✅ All relationships valid!")
else:
    print(f"  ⚠️  Referential integrity issues detected - investigation needed")

# ============================================================================
# STEP 5: Verify Non-NULL dataset_id (Lineage Completeness)
# ============================================================================
#
# PURPOSE:
#   Ensure ALL records have a dataset linkage. This is critical because
#   dataset_id enables filtering by source and time period; a record with a
#   NULL dataset_id leaves "where did this data come from?" unanswerable.
#
print("\n🔍 STEP 5: Verify Lineage Completeness (Non-NULL dataset_id)...")

with engine.connect() as conn:
    # Check dataset linkage by type
    census_with_dataset = conn.execute(text("""
        SELECT COUNT(*) FROM usda_census_record WHERE dataset_id IS NOT NULL
    """)).scalar()
    census_total = conn.execute(text("""
        SELECT COUNT(*) FROM usda_census_record
    """)).scalar()
    
    survey_with_dataset = conn.execute(text("""
        SELECT COUNT(*) FROM usda_survey_record WHERE dataset_id IS NOT NULL
    """)).scalar()
    survey_total = conn.execute(text("""
        SELECT COUNT(*) FROM usda_survey_record
    """)).scalar()
    
    obs_with_dataset = conn.execute(text("""
        SELECT COUNT(*) FROM observation 
        WHERE record_type IN ('usda_census_record', 'usda_survey_record')
        AND dataset_id IS NOT NULL
    """)).scalar()
    obs_total = conn.execute(text("""
        SELECT COUNT(*) FROM observation 
        WHERE record_type IN ('usda_census_record', 'usda_survey_record')
    """)).scalar()

print(f"  Census records: {census_with_dataset}/{census_total} have dataset_id")
print(f"  Survey records: {survey_with_dataset}/{survey_total} have dataset_id")
print(f"  Observations:   {obs_with_dataset}/{obs_total} have dataset_id")

census_pct = round(100 * census_with_dataset / census_total, 1) if census_total > 0 else 0
survey_pct = round(100 * survey_with_dataset / survey_total, 1) if survey_total > 0 else 0
obs_pct = round(100 * obs_with_dataset / obs_total, 1) if obs_total > 0 else 0

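# Compare raw counts: a percentage rounded to one decimal can read 100.0 while rows are still unlinked
if (census_with_dataset == census_total and survey_with_dataset == survey_total
        and obs_with_dataset == obs_total):
    print(f"  ✅ LINEAGE COMPLETE - All records linked to datasets ({census_pct}%, {survey_pct}%, {obs_pct}%)")
else:
    print(f"  ⚠️  LINEAGE INCOMPLETE - {census_pct}% census, {survey_pct}% survey, {obs_pct}% observations")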

# ============================================================================
# STEP 6: Generate Load Summary Report
# ============================================================================
#
# PURPOSE:
#   Provide visibility into what was loaded, what was skipped, and final counts.
#   This helps verify the load behaved as expected.
#
print("\nüîç STEP 6: Load Summary Report...")
print("\n" + "="*80)
print("LOAD SUMMARY")
print("="*80)

with engine.connect() as conn:
    total_census = conn.execute(text("SELECT COUNT(*) FROM usda_census_record")).scalar()
    total_survey = conn.execute(text("SELECT COUNT(*) FROM usda_survey_record")).scalar()
    total_obs = conn.execute(text("""
        SELECT COUNT(*) FROM observation 
        WHERE record_type IN ('usda_census_record', 'usda_survey_record')
    """)).scalar()

print(f"\nüìä Records Loaded This Session:")
print(f"  Census records inserted:    {inserted_census}")
print(f"  Survey records inserted:    {inserted_survey}")
print(f"  Observations inserted:      {obs_inserted}")
print(f"  Total inserted:             {inserted_census + inserted_survey + obs_inserted}")

print(f"\nüìä Total Records Now in Database:")
print(f"  Census records:   {total_census:,}")
print(f"  Survey records:   {total_survey:,}")
print(f"  Observations:     {total_obs:,}")
print(f"  Total USDA data:  {total_census + total_survey + total_obs:,}")

print(f"\n✅ Load Complete!")
print("="*80)


🔍 STEP 1: Load Census Records (Parent Fact Records)...
  Existing census records in DB: 9
  ℹ️  No new census records to insert (all deduplicated)

🔍 STEP 2: Load Survey Records (Secondary Parent Fact Records)...
  Existing survey records in DB: 7
  ℹ️  No new survey records to insert (all deduplicated)

🔍 STEP 3: Load Observations (Measurement Child Records)...
  Built parent record lookup map: 16 entries
  Existing observations in DB: 1779
  DEBUG row 0: value_numeric=14503.0, row['value_numeric']=14503.0, pd.notna=True
  DEBUG row 1: value_numeric=51836.0, row['value_numeric']=51836.0, pd.notna=True
  DEBUG row 2: value_numeric=1345187.0, row['value_numeric']=1345187.0, pd.notna=True
  Built observation batch:
    New observations to insert: 29
    Skipped (no parent record): 0
    Skipped (already exists in DB): 0
    Skipped (duplicate in load batch): 40
  ✅ Inserted 0 observations (Level 3 dedup may have skipped some)

🔍 STEP 4: Verify Foreign Key Relations
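A truthiness test such as `all([commodity_code, parameter_id, unit_id, value_numeric])` is an easy way to get a required-field check wrong: it treats a legitimate measurement of `0.0` as missing, while `float('nan')` is truthy and slips through. The stdlib sketch below shows a stricter check; the helper names `is_missing` and `has_required_values` are hypothetical, not part of the pipeline. Inside the notebook, `pd.isna` does the same job and also recognizes `pd.NA` and `NaT`.

```python
import math
from typing import Any


def is_missing(value: Any) -> bool:
    """True for None and float NaN; 0 and 0.0 count as real values."""
    if value is None:
        return True
    # numpy.float64 subclasses float, so NaN from pandas columns is caught too
    return isinstance(value, float) and math.isnan(value)


def has_required_values(*values: Any) -> bool:
    """True only if none of the values is missing."""
    return not any(is_missing(v) for v in values)


# Truthiness gets both cases wrong:
print(all([101, 7, 3, 0.0]))            # False: a real zero is dropped
print(all([101, 7, 3, float("nan")]))   # True: NaN is kept
# The explicit check gets them right:
print(has_required_values(101, 7, 3, 0.0))           # True
print(has_required_values(101, 7, 3, float("nan")))  # False
```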

### As-needed backfill: linking existing records to datasets

In [28]:

# ============================================================================
# CLEANUP: Link existing records to datasets
# ============================================================================
print("\nüîß CLEANUP: Linking existing records to datasets...")

with engine.begin() as conn:
    # Get all USDA datasets
    datasets = {}
    result = conn.execute(text("""
        SELECT id, name FROM dataset WHERE name LIKE 'USDA_%'
    """))
    for ds_id, ds_name in result:
        datasets[ds_name] = ds_id
    
    print(f"  Available datasets: {list(datasets.keys())}")
    
    # Update census records that don't have dataset_id
    for year in [2022]:  # Years in our data
        ds_name = f"USDA_CENSUS_{year}"
        ds_id = datasets.get(ds_name)
        
        if ds_id:
            # Bound parameters instead of f-string interpolation
            result = conn.execute(text("""
                UPDATE usda_census_record 
                SET dataset_id = :ds_id, updated_at = NOW()
                WHERE dataset_id IS NULL AND year = :year
            """), {"ds_id": ds_id, "year": year})
            print(f"  ✓ Updated {result.rowcount} census records for {year} → dataset_id={ds_id}")
    
    # Update survey records that don't have dataset_id
    for year in [2022]:
        ds_name = f"USDA_SURVEY_{year}"
        ds_id = datasets.get(ds_name)
        
        if ds_id:
            result = conn.execute(text("""
                UPDATE usda_survey_record 
                SET dataset_id = :ds_id, updated_at = NOW()
                WHERE dataset_id IS NULL AND year = :year
            """), {"ds_id": ds_id, "year": year})
            print(f"  ✓ Updated {result.rowcount} survey records for {year} → dataset_id={ds_id}")
    
    # Update observations that don't have dataset_id (all assigned to the 2022 datasets)
    # Print the looked-up IDs instead of hard-coding them
    census_obs_ds = datasets.get('USDA_CENSUS_2022')
    result = conn.execute(text("""
        UPDATE observation 
        SET dataset_id = :ds_id
        WHERE dataset_id IS NULL 
        AND record_type = 'usda_census_record'
    """), {"ds_id": census_obs_ds})
    print(f"  ✓ Updated {result.rowcount} census observations → dataset_id={census_obs_ds}")
    
    survey_obs_ds = datasets.get('USDA_SURVEY_2022')
    result = conn.execute(text("""
        UPDATE observation 
        SET dataset_id = :ds_id
        WHERE dataset_id IS NULL 
        AND record_type = 'usda_survey_record'
    """), {"ds_id": survey_obs_ds})
    print(f"  ✓ Updated {result.rowcount} survey observations → dataset_id={survey_obs_ds}")

print("  ✅ Cleanup complete!")



🔧 CLEANUP: Linking existing records to datasets...
  Available datasets: ['USDA_CENSUS_2022', 'USDA_SURVEY_2022']
  ✓ Updated 52 census records for 2022 → dataset_id=2
  ✓ Updated 17 survey records for 2022 → dataset_id=3
  ✓ Updated 12 census observations → dataset_id=2
  ✓ Updated 17 survey observations → dataset_id=3
  ✅ Cleanup complete!
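Interpolating values into SQL with f-strings works here only because the values come from the database and a hard-coded year list; bound parameters are the safer habit, and table names (which cannot be bound) can be checked against an allowlist. A minimal sketch using the standard-library `sqlite3` module and a toy schema that borrows the notebook's table and column names (the function `backfill_dataset_id` is hypothetical, and the toy schema omits `updated_at`):

```python
import sqlite3

# Table names cannot be bound as parameters, so restrict them to a known set
ALLOWED_TABLES = {"usda_census_record", "usda_survey_record"}


def backfill_dataset_id(conn: sqlite3.Connection, table: str,
                        dataset_name: str, year: int) -> tuple:
    """Link rows of `table` for `year` that lack a dataset_id to the named dataset.

    Returns (dataset_id, rows_updated); dataset_id is None if the dataset is missing.
    """
    if table not in ALLOWED_TABLES:
        raise ValueError(f"unexpected table: {table}")
    row = conn.execute("SELECT id FROM dataset WHERE name = ?", (dataset_name,)).fetchone()
    if row is None:
        return None, 0
    dataset_id = row[0]
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            f"UPDATE {table} SET dataset_id = ? WHERE dataset_id IS NULL AND year = ?",
            (dataset_id, year),
        )
    return dataset_id, cur.rowcount


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dataset (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE usda_census_record (id INTEGER PRIMARY KEY, year INTEGER, dataset_id INTEGER);
    INSERT INTO dataset (id, name) VALUES (2, 'USDA_CENSUS_2022');
    INSERT INTO usda_census_record (year, dataset_id) VALUES (2022, NULL), (2022, NULL), (2017, NULL);
""")
print(backfill_dataset_id(conn, "usda_census_record", "USDA_CENSUS_2022", 2022))  # (2, 2)
```

Because the `UPDATE` only touches rows where `dataset_id IS NULL`, rerunning the backfill is harmless: a second call reports zero updated rows.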


## Step 7: Verification

In [36]:
print("="*80)
print("✅ VERIFICATION: Check data in database")
print("="*80)

with engine.connect() as conn:
    # Total counts
    census_count = conn.execute(text("SELECT COUNT(*) FROM usda_census_record")).scalar()
    survey_count = conn.execute(text("SELECT COUNT(*) FROM usda_survey_record")).scalar()
    obs_count = conn.execute(text("SELECT COUNT(*) FROM observation")).scalar()
    
    print(f"\nüìä Total records in database:")
    print(f"  Census records: {census_count}")
    print(f"  Survey records: {survey_count}")
    print(f"  Observations:   {obs_count}")
    
    # Check timestamp coverage
    obs_with_timestamps = conn.execute(text("""
        SELECT COUNT(created_at), COUNT(updated_at) 
        FROM observation
    """)).fetchone()
    
    print(f"\n⏱️  Observation timestamps:")
    print(f"  With created_at: {obs_with_timestamps[0]}")
    print(f"  With updated_at: {obs_with_timestamps[1]}")
    
    # Show sample of newest observations with timestamps
    print(f"\nüìã Sample of newest observations (with timestamps):")
    result = conn.execute(text("""
        SELECT id, record_id, created_at, updated_at
        FROM observation
        WHERE created_at IS NOT NULL
        ORDER BY id DESC LIMIT 3
    """))
    for row in result:
        print(f"  ID {row[0]}: created={row[2]}, updated={row[3]}")

✅ VERIFICATION: Check data in database

📊 Total records in database:
  Census records: 52
  Survey records: 17
  Observations:   1779

⏱️  Observation timestamps:
  With created_at: 1779
  With updated_at: 1779

📋 Sample of newest observations (with timestamps):
  ID 1933: created=2026-01-29 17:44:30.800078, updated=2026-01-29 17:44:30.800078
  ID 1932: created=2026-01-29 17:44:30.800078, updated=2026-01-29 17:44:30.800078
  ID 1931: created=2026-01-29 17:44:30.800078, updated=2026-01-29 17:44:30.800078
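The timestamp check above relies on `COUNT(column)` counting only non-NULL values while `COUNT(*)` counts every row. The same idea lets a lineage-coverage check fetch both numbers in a single query per table and compare them as integers, which avoids the rounding a percentage comparison introduces. A sketch against an in-memory SQLite table (the function name `dataset_coverage` is hypothetical):

```python
import sqlite3


def dataset_coverage(conn: sqlite3.Connection, table: str) -> dict:
    """Return total rows, rows with a dataset_id, and whether every row is linked."""
    # `table` is interpolated, so only call this with trusted, hard-coded names
    total, linked = conn.execute(
        f"SELECT COUNT(*), COUNT(dataset_id) FROM {table}"
    ).fetchone()
    return {"total": total, "linked": linked, "complete": linked == total}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE observation (id INTEGER PRIMARY KEY, dataset_id INTEGER)")
conn.executemany("INSERT INTO observation (dataset_id) VALUES (?)",
                 [(2,)] * 9999 + [(None,)])
report = dataset_coverage(conn, "observation")
print(report)  # {'total': 10000, 'linked': 9999, 'complete': False}
# A percentage rounded to one decimal hides the single unlinked row:
print(round(100 * report["linked"] / report["total"], 1))  # 100.0
```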



# Production Refactoring Plan: USDA ETL

## Overview
This guide shows how to transition the working notebook code (STEPS 1-6) into production Prefect flows, following your coworker's template patterns.

## Key Questions Answered

### 1. Print Statements in Production Code

**Best practice: keep the messages, but emit them through `logger` instead of `print`**

- ✅ **USE**: `logger.info()`, `logger.warning()`, `logger.error()`
  - Integrates with Prefect's logging system
  - Appears in the Prefect UI for monitoring
  - Can be filtered and searched by log level
  - Carries structured metadata (level, logger name, flow/task run) for production systems

- ❌ **AVOID**: `print()` statements
  - Not captured by Prefect unless `log_prints=True` is set on the flow or task, and then always logged at INFO level
  - Cannot be filtered by severity in monitoring systems
  - Harder to audit, since no log level or source is attached

**Pattern from your coworker (landiq flow):**
```python
logger.info(f"Processing chunk: {skip} to {min(skip + chunk_size, total_features)}")
logger.warning(f"⚠ WARNING: {census_missing} census records missing dataset_id")
logger.error(f"✗ Load failed: {e}")
```
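`get_run_logger()` only works inside a running flow or task; called from plain Python (for example, when a unit test calls a helper such as `_build_lookup_maps` directly) it raises an error. A small wrapper that falls back to the standard `logging` module keeps helpers usable in both settings. This is a sketch: the name `get_logger` is hypothetical, and it assumes Prefect signals the missing run context with `prefect.exceptions.MissingContextError`, as Prefect 2.x and 3.x do.

```python
import logging
from typing import Union


def get_logger(name: str = "usda_etl") -> Union[logging.Logger, logging.LoggerAdapter]:
    """Return Prefect's run logger inside a flow/task run, else a stdlib logger."""
    try:
        from prefect import get_run_logger
        from prefect.exceptions import MissingContextError
    except ImportError:  # Prefect not installed (e.g., a lightweight test environment)
        return logging.getLogger(name)
    try:
        return get_run_logger()
    except MissingContextError:  # called outside a flow or task run
        return logging.getLogger(name)


logging.basicConfig(level=logging.INFO)
logger = get_logger()
# %-style arguments defer formatting until the record is actually emitted
logger.info("Processing chunk: %d to %d", 0, 500)
```

f-strings, as in the Land IQ snippet, work equally well; the %-style form simply skips the formatting cost for messages below the active log level.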

---

### 2. Do We Still Need `link_dataset_ids`?

**SHORT ANSWER: YES, but we can integrate it INTO the load step**

**Current Architecture (in existing USDA flow):**
```
Load → (separate step) → Link Datasets → Verify Lineage
```

**BETTER Architecture (what your notebook showed):**
```
Load (which creates datasets + links them in same transaction)
```

**Why this is better:**
- **Single atomic operation**: No gap where records exist but aren't linked
- **Simpler**: No separate task/step needed
- **Fewer round-trips**: Records get their `dataset_id` at insert time, so no follow-up UPDATE pass is needed
- **Your notebook already does this in STEP 0!**

**How to integrate:**
1. Move `create_usda_datasets()` logic into LOAD task (as STEP 0)
2. Move `link_census_records_to_datasets()` into LOAD task (part of building records)
3. Keep `verify_dataset_linkage()` as optional verification in flow

---

### 3. ETL Run ID and Lineage Group Tracking

**Pattern from coworker (in `lineage.py`):**

```python
# In flow
etl_run_id = create_etl_run_record.fn(pipeline_name="Land IQ ETL")
lineage_group_id = create_lineage_group.fn(
    etl_run_id=etl_run_id,
    note="Land IQ 2023 Crop Mapping"
)

# In tasks
def transform(..., etl_run_id: int, lineage_group_id: int):
    # Add these to every record
    transformed_data['etl_run_id'] = etl_run_id
    transformed_data['lineage_group_id'] = lineage_group_id
```

**For USDA, replicate this pattern:**
1. In `usda_etl_flow()`: Create etl_run and lineage_group at start
2. Pass to `extract()`, `transform()`, `load()` tasks
3. Add to all records before inserting

---

## File Structure for Production

```
src/ca_biositing/pipeline/
├── etl/
│   ├── extract/
│   │   └── usda_census_survey.py          ✓ (already exists)
│   │
│   ├── transform/
│   │   └── usda/
│   │       └── usda_census_survey.py      ← CREATE THIS
│   │
│   └── load/
│       └── usda/
│           └── usda_census_survey.py      ← UPDATE THIS
│
└── flows/
    └── usda_etl.py                        ← UPDATE THIS (orchestration)
```

---

## Implementation Guide

### STEP 1: Create Transform Task
**File**: `src/ca_biositing/pipeline/etl/transform/usda/usda_census_survey.py`

Key sections from notebook:
- ✅ Commodity/parameter/unit mapping
- ✅ Value conversion (remove commas, coerce to float)
- ✅ Record type discrimination (CENSUS vs SURVEY)
- ✅ Survey-specific field handling
- ✅ Cleaning functions (lowercase, empty string handling)

**Use `logger.info()` for:**
- Rows processed
- Rows filtered out (reason)
- Data type confirmations
- Mapping results

---

### STEP 2: Update Load Task
**File**: `src/ca_biositing/pipeline/etl/load/usda/usda_census_survey.py`

Key changes:
1. ✅ STEP 0: Create datasets + build dataset_map (from notebook)
2. ✅ STEP 1-3: Load census → survey → observations (from notebook)
   - **Include all 3 dedup levels** (existing DB, batch, ON CONFLICT)
   - Use `logger.info()` for counts
3. Remove separate `link_dataset_ids` calls (now integrated)
4. Add `etl_run_id` and `lineage_group_id` to all records

**Critical: Lazy imports inside task**
```python
@task
def load(...):
    from ca_biositing.datamodels.schemas.generated.ca_biositing import (
        UsdaCensusRecord, UsdaSurveyRecord, Observation
    )
    # ... rest of code
```

---

### STEP 3: Update Flow Orchestration
**File**: `src/ca_biositing/pipeline/flows/usda_etl.py`

**New structure:**
```python
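from prefect import flow, get_run_logger

# Module paths follow the production file layout above
from ca_biositing.pipeline.etl.extract.usda_census_survey import extract
from ca_biositing.pipeline.etl.transform.usda.usda_census_survey import transform
from ca_biositing.pipeline.etl.load.usda.usda_census_survey import load
# create_etl_run_record and create_lineage_group come from the existing lineage.py
# module (its import path is not shown in this notebook)


@flow(name="USDA Census Survey ETL", log_prints=True)
def usda_etl_flow():
    logger = get_run_logger()
    
    # 0. Create lineage tracking
    etl_run_id = create_etl_run_record.fn(pipeline_name="USDA Census Survey ETL")
    lineage_group_id = create_lineage_group.fn(
        etl_run_id=etl_run_id,
        note="USDA agricultural data"
    )
    
    # 1. Extract
    raw_data = extract()
    
    # 2. Transform (pass IDs)
    cleaned_data = transform(
        data_sources={"usda": raw_data},
        etl_run_id=etl_run_id,
        lineage_group_id=lineage_group_id
    )
    
    # 3. Load (handles dataset creation + linking)
    success = load(
        transformed_df=cleaned_data,
        etl_run_id=etl_run_id,
        lineage_group_id=lineage_group_id
    )
    
    # 4. Verify (optional - can be separate or in load)
    if not success:
        # load() catches exceptions and returns False; raise so the flow run is marked failed
        raise RuntimeError("USDA load failed; see the load task logs")
    logger.info("✓ USDA ETL Complete")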
```

---

## Code Snippets to Reuse from Notebook

### Deduplication Pattern (STEP 3 from notebook)
```python
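# Assumes engine, Observation, and transformed_data from the surrounding load task
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert as pg_insert

observation_table = Observation.__table__

# Level 1: Existing DB check
# record_id is normalized to str on both sides; the integrity checks cast
# observation.record_id with ::integer, which suggests it is stored as text
existing_obs_keys = set()
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT record_id, record_type, parameter_id, unit_id FROM observation
    """))
    for row in result:
        existing_obs_keys.add((str(row[0]), row[1], row[2], row[3]))

# Level 2: Batch dedup
seen_obs_keys = set()
obs_records = []
for _, row in transformed_data.iterrows():
    # ... extraction ...
    obs_key = (str(record_id), record_type, parameter_id, unit_id)
    if obs_key in existing_obs_keys or obs_key in seen_obs_keys:
        continue
    seen_obs_keys.add(obs_key)
    obs_records.append({...})

# Level 3: PostgreSQL ON CONFLICT (needs a unique index on these four columns)
if obs_records:  # skip the round-trip when there is nothing to insert
    with engine.begin() as conn:
        stmt = pg_insert(observation_table).values(obs_records).on_conflict_do_nothing(
            index_elements=['record_id', 'record_type', 'parameter_id', 'unit_id']
        )
        result = conn.execute(stmt)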
```
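The three levels can be exercised end to end without PostgreSQL. The sketch below uses `sqlite3`, whose `INSERT ... ON CONFLICT (...) DO NOTHING` clause (SQLite 3.24+) behaves like the PostgreSQL clause for this purpose, and a toy `observation` table with a unique constraint on the four key columns, mirroring the `index_elements` above (an assumption about the real schema). It counts inserted rows as the table-size difference rather than relying on `rowcount`. The function name `load_observations` is hypothetical.

```python
import sqlite3
from typing import Dict, Iterable, Tuple

Row = Tuple[int, str, int, int, float]  # (record_id, record_type, parameter_id, unit_id, value)


def load_observations(conn: sqlite3.Connection, rows: Iterable[Row]) -> Dict[str, int]:
    """Insert observation rows, skipping duplicates at three levels."""
    # Level 1: keys already in the database (record_id normalized to text)
    existing = {
        (str(r), t, p, u)
        for r, t, p, u in conn.execute(
            "SELECT record_id, record_type, parameter_id, unit_id FROM observation"
        )
    }
    seen = set()
    batch = []
    stats = {"exists": 0, "dupe_in_batch": 0}
    for record_id, record_type, parameter_id, unit_id, value in rows:
        key = (str(record_id), record_type, parameter_id, unit_id)
        if key in existing:
            stats["exists"] += 1
            continue
        # Level 2: keys already queued in this batch
        if key in seen:
            stats["dupe_in_batch"] += 1
            continue
        seen.add(key)
        batch.append((*key, value))

    before = conn.execute("SELECT COUNT(*) FROM observation").fetchone()[0]
    with conn:
        # Level 3: the unique constraint is the final safety net
        conn.executemany(
            "INSERT INTO observation (record_id, record_type, parameter_id, unit_id, value) "
            "VALUES (?, ?, ?, ?, ?) "
            "ON CONFLICT (record_id, record_type, parameter_id, unit_id) DO NOTHING",
            batch,
        )
    after = conn.execute("SELECT COUNT(*) FROM observation").fetchone()[0]
    stats["queued"] = len(batch)
    stats["inserted"] = after - before
    return stats


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE observation (
        id INTEGER PRIMARY KEY, record_id TEXT, record_type TEXT,
        parameter_id INTEGER, unit_id INTEGER, value REAL,
        UNIQUE (record_id, record_type, parameter_id, unit_id));
    INSERT INTO observation (record_id, record_type, parameter_id, unit_id, value)
    VALUES ('1', 'usda_census_record', 10, 20, 5.0);
""")
rows = [
    (1, "usda_census_record", 10, 20, 5.0),   # already in the DB (Level 1)
    (2, "usda_census_record", 10, 20, 0.0),   # new
    (2, "usda_census_record", 10, 20, 0.0),   # repeated in this batch (Level 2)
    (3, "usda_survey_record", 11, 20, 7.5),   # new
]
print(load_observations(conn, rows))
# {'exists': 1, 'dupe_in_batch': 1, 'queued': 2, 'inserted': 2}
```

Rerunning the same batch is a no-op: every key is now caught at Level 1, so nothing reaches the `INSERT`.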

### Dataset Linking Pattern (STEP 0 from notebook)
```python
# Create dataset if missing
dataset_name = f"USDA_CENSUS_{year}"
if dataset_name not in existing_datasets:
    new_datasets.append({
        'name': dataset_name,
        'record_type': 'usda_census_record',
        'source_id': usda_data_source_id,
        'start_date': f"{year}-01-01",
        'end_date': f"{year}-12-31",
        'created_at': now,
        'updated_at': now
    })

# Link parent records to dataset
dataset_id = dataset_map.get((year, 'CENSUS'))
new_census.append({
    'geoid': geoid,
    'year': year,
    'commodity_code': commodity_code,
    'dataset_id': dataset_id,  # ← CRITICAL: Include in insert
    'created_at': now,
    'updated_at': now
})
```

---

## Testing the Production Code

1. **Unit test**: Each task independently with mock data
2. **Integration test**: Run flow locally with `pixi run python -m pytest`
3. **Prefect UI test**: Deploy and trigger in Prefect UI
4. **Verify in database**:
   ```sql
   SELECT COUNT(*), COUNT(DISTINCT etl_run_id) FROM observation 
   WHERE record_type = 'usda_census_record';
   ```

---

## Next Steps

1. Create `etl/transform/usda/usda_census_survey.py` (copy notebook STEPS 0-2 logic)
2. Update `etl/load/usda/usda_census_survey.py` (copy notebook STEPS 0-3, integrate linking)
3. Update `flows/usda_etl.py` (add etl_run_id tracking, simplify flow)
4. Test: `pixi run python -m pytest src/ca_biositing/pipeline/tests/...`
5. Deploy: `pixi run prefect deployment build ...` (a Prefect 2.x command; Prefect 3 replaced it with `prefect deploy`)




# Code Templates for Production Implementation

## Transform Task Template

```python
# File: src/ca_biositing/pipeline/etl/transform/usda/usda_census_survey.py
"""
USDA Census/Survey Data Transform
---
Transforms raw USDA API data into normalized records ready for loading.
"""

import pandas as pd
import numpy as np
from typing import Dict, Optional
from prefect import task, get_run_logger


@task
def transform(
    data_sources: Dict[str, pd.DataFrame],
    etl_run_id: int,
    lineage_group_id: int
) -> Optional[pd.DataFrame]:
    """
    Transform raw USDA data into database-ready format.
    
    Args:
        data_sources: {"usda": raw_dataframe}
        etl_run_id: For tracking this ETL run
        lineage_group_id: For grouping related records
    
    Returns:
        Transformed DataFrame ready for load task
    """
    logger = get_run_logger()
    
    if "usda" not in data_sources:
        logger.error("Missing 'usda' in data_sources")
        return None
    
    raw_data = data_sources["usda"]
    
    if raw_data is None or len(raw_data) == 0:
        logger.warning("No raw data to transform")
        return None
    
    logger.info(f"Transforming {len(raw_data)} raw records...")
    
    # --- LAZY IMPORT (critical for Docker) ---
    from ca_biositing.datamodels.schemas.generated.ca_biositing import (
        Parameter, Unit
    )
    
    # 1. Map commodities, parameters, units
    logger.info("Step 1: Building lookup maps...")
    commodity_map, parameter_id_map, unit_id_map = _build_lookup_maps()
    logger.info(f"  - {len(commodity_map)} commodities")
    logger.info(f"  - {len(parameter_id_map)} parameters")
    logger.info(f"  - {len(unit_id_map)} units")
    
    # 2. Copy and rename
    logger.info("Step 2: Normalizing columns...")
    transformed_data = _normalize_columns(raw_data)
    
    # 3. Clean strings
    logger.info("Step 3: Cleaning string fields...")
    transformed_data = _clean_strings(transformed_data)
    
    # 4. Convert values to numeric
    logger.info("Step 4: Converting values to numeric...")
    transformed_data['value_numeric'] = _convert_to_numeric(
        transformed_data['observation']
    )
    
    # 5. Map IDs
    logger.info("Step 5: Mapping IDs...")
    transformed_data['commodity_code'] = transformed_data['commodity'].apply(
        lambda x: commodity_map.get(x.upper()) if pd.notna(x) else None
    )
    transformed_data['parameter_id'] = transformed_data['statistic'].apply(
        lambda x: parameter_id_map.get(x.upper()) if pd.notna(x) else None
    )
    transformed_data['unit_id'] = transformed_data['unit'].apply(
        lambda x: unit_id_map.get(x.upper()) if pd.notna(x) else None
    )
    
    # 6. Add metadata
    logger.info("Step 6: Adding metadata...")
    transformed_data['record_type'] = transformed_data['source_type'].map({
        'CENSUS': 'usda_census_record',
        'SURVEY': 'usda_survey_record'
    })
    transformed_data['etl_run_id'] = etl_run_id
    transformed_data['lineage_group_id'] = lineage_group_id
    
    # 7. Filter required fields
    logger.info("Step 7: Filtering required fields...")
    required = ['geoid', 'year', 'commodity_code', 'parameter_id', 'unit_id', 'value_numeric']
    transformed_data = transformed_data.dropna(subset=required)
    
    logger.info(f"Transform complete: {len(transformed_data)} records ready for load")
    return transformed_data


def _build_lookup_maps():
    """Build commodity, parameter, unit maps from database"""
    from sqlalchemy import create_engine, text
    import os
    
    db_url = os.getenv('DATABASE_URL')
    engine = create_engine(db_url)
    
    commodity_map = {}
    parameter_map = {}
    unit_map = {}
    
    with engine.connect() as conn:
        # Commodities
        result = conn.execute(text("SELECT id, name FROM usda_commodity"))
        for row in result:
            commodity_map[row[1].upper()] = row[0]
        
        # Parameters
        result = conn.execute(text("SELECT id, name FROM parameter"))
        for row in result:
            parameter_map[row[1].upper()] = row[0]
        
        # Units
        result = conn.execute(text("SELECT id, name FROM unit"))
        for row in result:
            unit_map[row[1].upper()] = row[0]
    
    return commodity_map, parameter_map, unit_map


def _normalize_columns(df):
    """Rename USDA columns to match schema"""
    mapping = {
        'commodity_desc': 'commodity',
        'statisticcat_desc': 'statistic',
        'unit_desc': 'unit',
        'Value': 'observation',
        'county_name': 'county',
        'year': 'year'
    }
    return df.rename(columns={k: v for k, v in mapping.items() if k in df.columns})


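def _clean_strings(df):
    """Strip and lowercase string columns, keeping missing values as <NA>"""
    str_cols = ['commodity', 'statistic', 'unit']
    for col in str_cols:
        if col in df.columns:
            # The "string" dtype keeps NaN as pd.NA; astype(str) would turn it into the text "nan"
            df[col] = df[col].astype("string").str.strip().str.lower()
    return df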


def _convert_to_numeric(series):
    """Convert to numeric, handling commas and decimals"""
    return pd.to_numeric(
        series.astype(str).str.replace(',', ''),
        errors='coerce'
    )
```

---

## Load Task Template

```python
# File: src/ca_biositing/pipeline/etl/load/usda/usda_census_survey.py
"""
USDA Census/Survey Data Load
---
Loads transformed USDA data into database with atomic dataset creation + linking.
"""

from typing import Optional
import pandas as pd
from datetime import datetime, timezone
from prefect import task, get_run_logger
from sqlalchemy import create_engine, text, insert
from sqlalchemy.dialects.postgresql import insert as pg_insert


@task
def load(
    transformed_df: Optional[pd.DataFrame],
    etl_run_id: Optional[int] = None,
    lineage_group_id: Optional[int] = None
) -> bool:
    """
    Load transformed USDA data with integrated dataset creation and linking.
    
    Implements 3-level deduplication:
    - Level 1: Skip if exists in database
    - Level 2: Skip if seen earlier in this batch
    - Level 3: PostgreSQL ON CONFLICT for final safety
    """
    logger = get_run_logger()
    
    if transformed_df is None or len(transformed_df) == 0:
        logger.warning("No data to load")
        return True
    
    logger.info(f"Starting load of {len(transformed_df)} records...")
    
    try:
        # --- LAZY IMPORT ---
        from ca_biositing.datamodels.schemas.generated.ca_biositing import (
            DataSource, Dataset, UsdaCensusRecord, UsdaSurveyRecord, Observation
        )
        
        import os
        db_url = os.getenv('DATABASE_URL')
        if not db_url:
            raise RuntimeError("DATABASE_URL is not set")
        engine = create_engine(db_url)
        now = datetime.now(timezone.utc)
        
        # STEP 0: Create datasets + build map
        logger.info("\nSTEP 0: Creating datasets...")
        dataset_map = _create_and_map_datasets(engine, transformed_df, now)
        
        # STEP 1: Load census records
        logger.info("\nSTEP 1: Loading census records...")
        census_inserted = _load_census_records(
            engine, transformed_df, dataset_map, etl_run_id, 
            lineage_group_id, now
        )
        
        # STEP 2: Load survey records
        logger.info("\nSTEP 2: Loading survey records...")
        survey_inserted = _load_survey_records(
            engine, transformed_df, dataset_map, etl_run_id,
            lineage_group_id, now
        )
        
        # STEP 3: Load observations
        logger.info("\nSTEP 3: Loading observations...")
        obs_inserted = _load_observations(
            engine, transformed_df, dataset_map, etl_run_id,
            lineage_group_id, now
        )
        
        logger.info(f"\nLoad complete:")
        logger.info(f"  Census: {census_inserted}")
        logger.info(f"  Survey: {survey_inserted}")
        logger.info(f"  Observations: {obs_inserted}")
        
        return True
        
    except Exception as e:
        logger.error(f"Load failed: {e}", exc_info=True)
        return False


def _create_and_map_datasets(engine, transformed_df, now):
    """STEP 0: Create USDA datasets if needed, return mapping"""
    logger = get_run_logger()
    from ca_biositing.datamodels.schemas.generated.ca_biositing import (
        DataSource, Dataset
    )
    
    dataset_map = {}
    years = sorted(transformed_df['year'].unique())
    
    with engine.begin() as conn:
        # Ensure DataSource exists
        result = conn.execute(
            text("SELECT id FROM data_source WHERE name = 'USDA NASS API'")
        )
        ds_row = result.fetchone()
        if not ds_row:
            conn.execute(
                text("""
                    INSERT INTO data_source (name, description, created_at, updated_at)
                    VALUES ('USDA NASS API', 'USDA NASS QuickStats API', :now, :now)
                """),
                {"now": now}
            )
            result = conn.execute(
                text("SELECT id FROM data_source WHERE name = 'USDA NASS API'")
            )
            ds_row = result.fetchone()
        
        ds_id = ds_row[0]
        
        # Create datasets for each year
        for year in years:
            for source in ['CENSUS', 'SURVEY']:
                ds_name = f"USDA_{source}_{year}"
                result = conn.execute(
                    text("SELECT id FROM dataset WHERE name = :name"),
                    {"name": ds_name}
                )
                row = result.fetchone()
                
                if not row:
                    conn.execute(
                        text("""
                            INSERT INTO dataset 
                            (name, record_type, source_id, start_date, end_date, 
                             created_at, updated_at)
                            VALUES (:name, :rtype, :sid, :start, :end, :now, :now)
                        """),
                        {
                            "name": ds_name,
                            "rtype": f"usda_{source.lower()}_record",
                            "sid": ds_id,
                            "start": f"{year}-01-01",
                            "end": f"{year}-12-31",
                            "now": now
                        }
                    )
                    result = conn.execute(
                        text("SELECT id FROM dataset WHERE name = :name"),
                        {"name": ds_name}
                    )
                    row = result.fetchone()
                
                dataset_map[(year, source)] = row[0]
                logger.info(f"  Dataset: {ds_name} (id={row[0]})")
    
    return dataset_map


def _load_census_records(engine, transformed_df, dataset_map, etl_run_id, 
                        lineage_group_id, now):
    """STEP 1: Load census records with dedup"""
    logger = get_run_logger()
    from ca_biositing.datamodels.schemas.generated.ca_biositing import UsdaCensusRecord
    
    # Level 1: Query existing
    existing_keys = set()
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT geoid, year, commodity_code FROM usda_census_record")
        )
        for row in result:
            existing_keys.add((row[0], row[1], row[2]))
    
    # Build new records with Level 2 dedup
    new_records = []
    seen_keys = set()
    
    for _, row in transformed_df[transformed_df['source_type'] == 'CENSUS'].iterrows():
        key = (str(row['geoid']).zfill(5), int(row['year']), 
               int(row['commodity_code']) if pd.notna(row['commodity_code']) else None)
        
        if key in existing_keys or key in seen_keys:
            continue
        
        seen_keys.add(key)
        year = int(row['year'])
        ds_id = dataset_map.get((year, 'CENSUS'))
        
        new_records.append({
            'geoid': key[0],
            'year': key[1],
            'commodity_code': key[2],
            'source_reference': 'USDA NASS QuickStats API',
            'dataset_id': ds_id,
            'etl_run_id': etl_run_id,
            'lineage_group_id': lineage_group_id,
            'created_at': now,
            'updated_at': now
        })
    
    if new_records:
        with engine.begin() as conn:
            conn.execute(
                insert(UsdaCensusRecord.__table__),
                new_records
            )
        logger.info(f"  Inserted {len(new_records)} census records")
    
    return len(new_records)


def _load_survey_records(engine, transformed_df, dataset_map, etl_run_id,
                        lineage_group_id, now):
    """STEP 2: Load survey records (same pattern as census)"""
    # Similar to _load_census_records but for SURVEY
    # Includes survey-specific fields: survey_period, reference_month
    logger = get_run_logger()
    logger.info("  (survey loading - similar to census pattern)")
    return 0  # Placeholder


def _load_observations(engine, transformed_df, dataset_map, etl_run_id,
                      lineage_group_id, now):
    """STEP 3: Load observations with 3-level dedup"""
    logger = get_run_logger()
    from ca_biositing.datamodels.schemas.generated.ca_biositing import Observation
    
    # Build parent record map
    record_id_map = {}
    with engine.connect() as conn:
        # Census records
        result = conn.execute(
            text("""
                SELECT id, geoid, year, commodity_code 
                FROM usda_census_record
            """)
        )
        for record_id, geoid, year, commodity_code in result:
            record_id_map[(geoid, year, commodity_code, 'CENSUS')] = record_id
        
        # Survey records
        result = conn.execute(
            text("""
                SELECT id, geoid, year, commodity_code 
                FROM usda_survey_record
            """)
        )
        for record_id, geoid, year, commodity_code in result:
            record_id_map[(geoid, year, commodity_code, 'SURVEY')] = record_id
    
    # Level 1: Query existing observations
    existing_obs_keys = set()
    with engine.connect() as conn:
        result = conn.execute(
            text("""
                SELECT record_id, record_type, parameter_id, unit_id 
                FROM observation
            """)
        )
        for row in result:
            # Normalize record_id to str so DB keys and batch keys compare equal
            # whether the column is stored as text or as integer
            existing_obs_keys.add((str(row[0]), row[1], row[2], row[3]))
    
    # Build obs records with Level 2 dedup
    obs_records = []
    seen_obs_keys = set()
    
    for _, row in transformed_df.iterrows():
        geoid = str(row['geoid']).zfill(5)
        year = int(row['year'])
        commodity_code = int(row['commodity_code']) if pd.notna(row['commodity_code']) else None
        parameter_id = int(row['parameter_id']) if pd.notna(row['parameter_id']) else None
        unit_id = int(row['unit_id']) if pd.notna(row['unit_id']) else None
        value_numeric = float(row['value_numeric']) if pd.notna(row['value_numeric']) else None
        
        # Check for None explicitly: all([...]) would also drop legitimate 0 values
        if None in (commodity_code, parameter_id, unit_id, value_numeric):
            continue
        
        source_type = 'CENSUS' if row['source_type'] == 'CENSUS' else 'SURVEY'
        record_key = (geoid, year, commodity_code, source_type)
        parent_record_id = record_id_map.get(record_key)
        
        if parent_record_id is None:
            continue
        
        obs_key = (str(parent_record_id), row['record_type'], parameter_id, unit_id)
        if obs_key in existing_obs_keys or obs_key in seen_obs_keys:
            continue
        
        seen_obs_keys.add(obs_key)
        obs_records.append({
            'record_id': parent_record_id,
            'record_type': row['record_type'],
            'parameter_id': parameter_id,
            'unit_id': unit_id,
            'value': value_numeric,
            'dataset_id': dataset_map.get((year, source_type)),
            'etl_run_id': etl_run_id,
            'lineage_group_id': lineage_group_id,
            'created_at': now,
            'updated_at': now
        })
    
    # Level 3: PostgreSQL ON CONFLICT
    if obs_records:
        with engine.begin() as conn:
            stmt = pg_insert(Observation.__table__).values(obs_records).on_conflict_do_nothing(
                index_elements=['record_id', 'record_type', 'parameter_id', 'unit_id']
            )
            result = conn.execute(stmt)
            logger.info(f"  Inserted {result.rowcount} observations")
            return result.rowcount
    
    return 0


def __get_db_url():
    """Get database URL, rewriting the Docker host 'db' to 'localhost' for local runs"""
    import os
    db_url = os.getenv('DATABASE_URL')
    if not db_url:
        raise RuntimeError("DATABASE_URL is not set")
    if '@db:' in db_url:
        db_url = db_url.replace('@db:', '@localhost:')
    return db_url
```
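`_load_survey_records` above is still a placeholder. A minimal sketch of its record-building half, assuming it mirrors `_load_census_records` and dedups on the same `(geoid, year, commodity_code)` key that `_load_observations` uses to find survey parents. `build_survey_records` is a hypothetical name; `survey_period` and `reference_month` are the survey-specific fields named in the placeholder comment and appear among the transformed columns. The returned list would then be inserted the same way as the census records.

```python
import pandas as pd


def build_survey_records(transformed_df, existing_keys, dataset_map,
                         etl_run_id, lineage_group_id, now):
    """Hypothetical sketch of STEP 2: build new survey record dicts.

    Level 1 dedup: skip keys already in usda_survey_record (existing_keys).
    Level 2 dedup: skip keys already seen earlier in this batch.
    Assumption: the dedup key is (geoid, year, commodity_code), as in census.
    """
    new_records, seen_keys = [], set()
    survey_rows = transformed_df[transformed_df['source_type'] == 'SURVEY']
    for _, row in survey_rows.iterrows():
        key = (str(row['geoid']).zfill(5), int(row['year']),
               int(row['commodity_code']) if pd.notna(row['commodity_code']) else None)
        if key in existing_keys or key in seen_keys:
            continue
        seen_keys.add(key)
        new_records.append({
            'geoid': key[0],
            'year': key[1],
            'commodity_code': key[2],
            # Survey-specific fields (None if the column is absent)
            'survey_period': row.get('survey_period'),
            'reference_month': row.get('reference_month'),
            'source_reference': 'USDA NASS QuickStats API',
            'dataset_id': dataset_map.get((key[1], 'SURVEY')),
            'etl_run_id': etl_run_id,
            'lineage_group_id': lineage_group_id,
            'created_at': now,
            'updated_at': now,
        })
    return new_records
```

If survey data can carry several reference periods for the same county, year, and commodity, this key keeps only the first one; whether `survey_period` should join the key depends on the real table's unique constraint.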

---

## Updated Flow Template

```python
# File: src/ca_biositing/pipeline/flows/usda_etl.py
from prefect import flow, get_run_logger
from ca_biositing.pipeline.etl.extract.usda_census_survey import extract
from ca_biositing.pipeline.etl.transform.usda.usda_census_survey import transform
from ca_biositing.pipeline.etl.load.usda.usda_census_survey import load
from ca_biositing.pipeline.utils.lineage import create_etl_run_record, create_lineage_group


@flow(name="USDA Census Survey ETL", log_prints=True)
def usda_etl_flow():
    """
    Orchestrates ETL for USDA agricultural data.
    
    Now with:
    - Integrated dataset creation in load task
    - Automatic etl_run_id and lineage_group_id tracking
    - Cleaner orchestration (no separate linking step)
    """
    logger = get_run_logger()
    
    logger.info("=" * 70)
    logger.info("USDA ETL Flow Started")
    logger.info("=" * 70)
    
    # Step 0: Create lineage tracking
    logger.info("\n[Step 0] Creating lineage tracking...")
    etl_run_id = create_etl_run_record.fn(pipeline_name="USDA ETL")
    lineage_group_id = create_lineage_group.fn(
        etl_run_id=etl_run_id,
        note="USDA Census/Survey agricultural data"
    )
    logger.info(f"✓ etl_run_id={etl_run_id}, lineage_group_id={lineage_group_id}")
    
    # Step 1: Extract
    logger.info("\n[Step 1] Extracting USDA data...")
    raw_data = extract()
    if raw_data is None:
        logger.error("✗ Extract failed")
        return False
    logger.info(f"✓ Extracted {len(raw_data)} records")
    
    # Step 2: Transform
    logger.info("\n[Step 2] Transforming data...")
    cleaned_data = transform(
        data_sources={"usda": raw_data},
        etl_run_id=etl_run_id,
        lineage_group_id=lineage_group_id
    )
    if cleaned_data is None:
        logger.error("✗ Transform failed")
        return False
    logger.info(f"✓ Transformed {len(cleaned_data)} records")
    
    # Step 3: Load (now includes dataset creation + linking)
    logger.info("\n[Step 3] Loading data...")
    success = load(
        transformed_df=cleaned_data,
        etl_run_id=etl_run_id,
        lineage_group_id=lineage_group_id
    )
    if not success:
        logger.error("✗ Load failed")
        return False
    logger.info("✓ Load complete")
    
    logger.info("\n" + "=" * 70)
    logger.info("✓ USDA ETL Flow Completed Successfully")
    logger.info("=" * 70)
    
    return True


if __name__ == "__main__":
    usda_etl_flow()
```
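One design note on the template: in Prefect, a flow that returns `False` still ends in a Completed state, so the failure paths above are easy to miss in the UI. Raising an exception marks the run Failed instead. A minimal sketch of the same three steps with fail-fast exceptions; the step functions are passed in so the logic can be exercised without Prefect, the API, or a database (`run_usda_steps` and `StepFailed` are hypothetical names):

```python
class StepFailed(RuntimeError):
    """Raised when an ETL step returns no data or reports failure."""


def run_usda_steps(extract, transform, load, etl_run_id, lineage_group_id):
    """Run extract -> transform -> load, raising instead of returning False."""
    raw = extract()
    if raw is None:
        raise StepFailed("extract returned None")

    cleaned = transform(data_sources={"usda": raw},
                        etl_run_id=etl_run_id,
                        lineage_group_id=lineage_group_id)
    if cleaned is None:
        raise StepFailed("transform returned None")

    if not load(transformed_df=cleaned,
                etl_run_id=etl_run_id,
                lineage_group_id=lineage_group_id):
        raise StepFailed("load reported failure")

    # Number of transformed rows handed to load
    return len(cleaned)
```

Inside `usda_etl_flow`, everything after Step 0 could then reduce to `run_usda_steps(extract, transform, load, etl_run_id, lineage_group_id)`, and any `StepFailed` would mark the flow run Failed.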



## Testing Production Code Directly (Local Flows, No Deployment)

Run the production extract and transform code inside small local Prefect flows (so `get_run_logger()` has a run context) to get full error visibility and debug without rebuilding or deploying.


In [4]:
import sys
import os
import traceback
from prefect import flow

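# Setup paths for production code imports (Windows path, or the WSL mount elsewhere)
if os.name == 'nt':
    pipeline_src = r'c:\Users\meili\forked\ca-biositing\src\ca_biositing\pipeline'
else:
    pipeline_src = '/mnt/c/Users/meili/forked/ca-biositing/src/ca_biositing/pipeline'
sys.path.insert(0, pipeline_src)

# Credentials come from .env (loaded in Step 1); never hard-code secrets in the notebook.
# Point DATABASE_URL at localhost if .env uses the Docker service name "db".
os.environ['DATABASE_URL'] = os.environ['DATABASE_URL'].replace('@db:', '@localhost:')
if not os.getenv('USDA_NASS_API_KEY'):
    raise RuntimeError("USDA_NASS_API_KEY is not set; add it to .env")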

print("[Step 1] Importing production extract code...")
try:
    from ca_biositing.pipeline.etl.extract.usda_census_survey import extract
    print("✓ Extract imported")
except Exception as e:
    print(f"✗ Failed to import extract: {e}")
    traceback.print_exc()

print("\n[Step 2] Running extract directly in local Prefect flow context...")

@flow(name="Local Test Extract")
def test_extract():
    """Run extract in a Prefect context so get_run_logger() works"""
    return extract()

try:
    raw_data = test_extract()
    print(f"✓ Extract returned: {type(raw_data)}")
    if raw_data is not None:
        print(f"  Rows: {len(raw_data)}")
        print(f"  Columns: {list(raw_data.columns)}")
        print(f"  Sample:\n{raw_data.head(2)}")
    else:
        print("  Result: None")
except Exception as e:
    print(f"✗ Extract failed: {e}")
    traceback.print_exc()


[Step 1] Importing production extract code...
✓ Extract imported

[Step 2] Running extract directly in local Prefect flow context...


Querying USDA API for 4 commodities...
  [1/4] Fetching commodity: ALMONDS...
    [OK] Retrieved 30 records for commodity ALMONDS
  [2/4] Fetching commodity: CORN...
    [OK] Retrieved 447 records for commodity CORN
  [3/4] Fetching commodity: TOMATOES...
    [OK] Retrieved 114 records for commodity TOMATOES
  [4/4] Fetching commodity: WHEAT...
    [OK] Retrieved 1756 records for commodity WHEAT
✓ Combined 4 queries into 2347 total records

IMPORT SUMMARY
Total Records Imported: 2347
Parameters Used:
  - State: CA
  - Year: All
  - Aggregation Level: COUNTY
  - Domain: TOTAL
  - County Code: 047



Querying USDA API for 4 commodities...
  [1/4] Fetching commodity: ALMONDS...
    [OK] Retrieved 30 records for commodity ALMONDS
  [2/4] Fetching commodity: CORN...
    [OK] Retrieved 470 records for commodity CORN
  [3/4] Fetching commodity: TOMATOES...
    [OK] Retrieved 110 records for commodity TOMATOES
  [4/4] Fetching commodity: WHEAT...
    [OK] Retrieved 1883 records for commodity WHEAT
✓ Combined 4 queries into 2493 total records

IMPORT SUMMARY
Total Records Imported: 2493
Parameters Used:
  - State: CA
  - Year: All
  - Aggregation Level: COUNTY
  - Domain: TOTAL
  - County Code: 077



Querying USDA API for 4 commodities...
  [1/4] Fetching commodity: ALMONDS...
    [OK] Retrieved 30 records for commodity ALMONDS
  [2/4] Fetching commodity: CORN...
    [OK] Retrieved 441 records for commodity CORN
  [3/4] Fetching commodity: TOMATOES...
    [OK] Retrieved 102 records for commodity TOMATOES
  [4/4] Fetching commodity: WHEAT...
    [OK] Retrieved 1495 records for commodity WHEAT
✓ Combined 4 queries into 2068 total records

IMPORT SUMMARY
Total Records Imported: 2068
Parameters Used:
  - State: CA
  - Year: All
  - Aggregation Level: COUNTY
  - Domain: TOTAL
  - County Code: 099



✓ Extract returned: <class 'pandas.core.frame.DataFrame'>
  Rows: 6908
  Columns: ['domain_desc', 'state_fips_code', 'zip_5', 'county_ansi', 'CV (%)', 'country_code', 'util_practice_desc', 'statisticcat_desc', 'country_name', 'source_desc', 'watershed_code', 'congr_district_code', 'begin_code', 'region_desc', 'state_name', 'group_desc', 'location_desc', 'unit_desc', 'reference_period_desc', 'county_name', 'year', 'prodn_practice_desc', 'class_desc', 'watershed_desc', 'agg_level_desc', 'Value', 'state_ansi', 'freq_desc', 'asd_code', 'county_code', 'end_code', 'asd_desc', 'sector_desc', 'state_alpha', 'week_ending', 'short_desc', 'commodity_desc', 'domaincat_desc', 'load_time']
  Sample:
  domain_desc state_fips_code zip_5 county_ansi CV (%) country_code  \
0       TOTAL              06               047   10.4         9000   
1       TOTAL              06               047   11.7         9000   

          util_practice_desc statisticcat_desc   country_name source_desc  \
0  ALL UTILI

In [6]:
print("[Step 3] Importing production transform code...")
try:
    from ca_biositing.pipeline.etl.transform.usda.usda_census_survey import transform
    print("✓ Transform imported")
except Exception as e:
    print(f"✗ Failed to import transform: {e}")
    traceback.print_exc()

print("\n[Step 4] Running transform directly on extracted data...")
if raw_data is not None:
    @flow(name="Local Test Transform")
    def test_transform():
        """Run transform in a Prefect context"""
        return transform(
            data_sources={"usda": raw_data},
            etl_run_id=999,        # placeholder IDs for local testing only
            lineage_group_id=999
        )
    
    try:
        print(f"Calling transform with {len(raw_data)} rows...")
        transformed = test_transform()
        
        print(f"✓ Transform returned: {type(transformed)}")
        if transformed is not None:
            print(f"  Rows: {len(transformed)}")
            print(f"  Columns: {list(transformed.columns)}")
            print(f"  Sample:\n{transformed.head(2)}")
        else:
            print("  Result: None")
    except Exception as e:
        print(f"✗ Transform failed: {e}")
        print("\nFull traceback:")
        traceback.print_exc()
else:
    print("Skipping transform - extract returned None")


[Step 3] Importing production transform code...
✓ Transform imported

[Step 4] Running transform directly on extracted data...
Calling transform with 6908 rows...


✓ Transform returned: <class 'pandas.core.frame.DataFrame'>
  Rows: 3169
  Columns: ['domain_desc', 'state_fips_code', 'zip_5', 'county_ansi', 'CV (%)', 'country_code', 'util_practice_desc', 'statistic', 'country_name', 'source_type', 'watershed_code', 'congr_district_code', 'begin_code', 'region_desc', 'state_name', 'group_desc', 'location_desc', 'unit', 'reference_month', 'county', 'year', 'prodn_practice_desc', 'class_desc', 'watershed_desc', 'agg_level_desc', 'observation', 'state_ansi', 'survey_period', 'asd_code', 'county_code', 'end_code', 'asd_desc', 'sector_desc', 'state_alpha', 'week_ending', 'short_desc', 'commodity', 'domaincat_desc', 'load_time', 'geoid', 'value_numeric', 'value_text', 'commodity_code', 'parameter_id', 'unit_id', 'record_type', 'cv_pct', 'note', 'etl_run_id', 'lineage_group_id']
  Sample:
   domain_desc state_fips_code zip_5 county_ansi CV (%) country_code  \
38       TOTAL              06               047   62.8         9000   
39       TOTAL          


# Quick Reference: Notebook → Production Checklist

## Questions Summary

| Question | Answer | Where? |
|----------|--------|--------|
| **Print statements in prod?** | Use `logger.info/warning/error()` not `print()` | `utils/lineage.py` shows pattern |
| **Still need `link_dataset_ids`?** | ✅ Yes, but integrate it INTO the load task (atomic) | Notebook STEP 0 in load task |
| **How to track etl_run_id?** | Create at flow start, pass to all tasks | `flows/landiq_etl.py` shows pattern |
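`get_run_logger()` only works inside an active flow or task run, which is why the notebook wraps `extract()` in a throwaway `@flow`. For helpers that should also run in plain unit tests, a small fallback can return Prefect's run logger when a run is active and a standard-library logger otherwise. This is a hypothetical helper (`get_logger`), not part of the codebase; it relies on `prefect.exceptions.MissingContextError`, which `get_run_logger()` raises outside a run.

```python
import logging


def get_logger(name="usda_etl"):
    """Return Prefect's run logger inside a flow/task run, else a stdlib logger."""
    try:
        from prefect import get_run_logger
        from prefect.exceptions import MissingContextError
    except ImportError:
        # Prefect not installed (e.g. a bare unit-test environment)
        return logging.getLogger(name)
    try:
        return get_run_logger()
    except MissingContextError:
        # Called outside any flow or task run
        return logging.getLogger(name)
```

Inside a flow run the returned adapter sends messages to the Prefect UI; outside one they go to whatever handlers the standard `logging` configuration provides.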

---

## Key Design Decision

### OLD (current USDA flow):
```
EXTRACT → TRANSFORM → LOAD → [separate] Link Datasets → Verify
```
❌ Gap where records exist but aren't linked

### NEW (from your working notebook):
```
EXTRACT → TRANSFORM → LOAD (includes: create datasets, then insert records with dataset_id already set)
```
✅ No gap: every record is inserted already linked to its dataset
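`_load_census_records` and `_load_observations` each open their own `engine.begin()` block, so dataset creation and record inserts commit separately. Setting `dataset_id` in the INSERT closes the unlinked-record gap; a fully all-or-nothing load additionally needs every step to share one transaction (with SQLAlchemy, a single `engine.begin()` connection passed to each helper). The sketch below shows that property using stdlib `sqlite3` as a stand-in for PostgreSQL; the two-table schema, the dataset name, and `load_atomically` are simplified, hypothetical examples, not the project's models.

```python
import sqlite3

# In-memory stand-in for the PostgreSQL tables (simplified, hypothetical schema)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dataset (id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL);
CREATE TABLE census_record (
    id INTEGER PRIMARY KEY,
    geoid TEXT NOT NULL,
    year INTEGER NOT NULL,
    dataset_id INTEGER NOT NULL REFERENCES dataset(id)
);
""")


def load_atomically(conn, dataset_name, rows, fail_at=None):
    """Create a dataset and insert its records in ONE transaction.

    fail_at simulates a crash just before inserting the row at that index.
    """
    with conn:  # sqlite3: commit on success, roll back on any exception
        cur = conn.execute("INSERT INTO dataset (name) VALUES (?)", (dataset_name,))
        dataset_id = cur.lastrowid
        for i, (geoid, year) in enumerate(rows):
            if i == fail_at:
                raise RuntimeError("simulated failure mid-load")
            # dataset_id is part of the INSERT itself, never patched in later
            conn.execute(
                "INSERT INTO census_record (geoid, year, dataset_id) VALUES (?, ?, ?)",
                (geoid, year, dataset_id),
            )
    return dataset_id


rows = [("06047", 2022), ("06077", 2022)]
try:
    load_atomically(conn, "usda_census_2022", rows, fail_at=1)
except RuntimeError:
    pass
# The failed run left nothing behind: no orphan dataset, no half-loaded records
print(conn.execute("SELECT COUNT(*) FROM dataset").fetchone()[0])        # 0
print(conn.execute("SELECT COUNT(*) FROM census_record").fetchone()[0])  # 0

ds_id = load_atomically(conn, "usda_census_2022", rows)
print(conn.execute(
    "SELECT COUNT(*) FROM census_record WHERE dataset_id = ?", (ds_id,)
).fetchone()[0])  # 2
```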

---

## Mapping: Notebook STEPS ‚Üí Production Tasks

| Notebook STEP | Description | Production Task | File |
|---------------|-------------|-----------------|------|
| Setup | Configure API key, load libraries | N/A (in extract) | extract/usda_census_survey.py |
| Extract | Query USDA API | `extract()` | extract/usda_census_survey.py ✓ |
| Transform | Clean + normalize | `transform()` | transform/usda/usda_census_survey.py (NEW) |
| **STEP 0** | **Create datasets** | **`_create_and_map_datasets()`** | **load/usda/usda_census_survey.py** |
| **STEP 1** | **Load census** | **`_load_census_records()`** | **load/usda/usda_census_survey.py** |
| **STEP 2** | **Load survey** | **`_load_survey_records()`** | **load/usda/usda_census_survey.py** |
| **STEP 3** | **Load observations** | **`_load_observations()`** | **load/usda/usda_census_survey.py** |
| STEP 4 | Verify FK relationships | (optional, can keep) | flow orchestration |
| STEP 5 | Verify lineage | (optional, can keep) | flow orchestration |
| STEP 6 | Summary report | (optional, can keep) | flow orchestration |

---

## Critical Code Patterns to Copy from Notebook

### 1. Deduplication (Level 1-3)
```python
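from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert as pg_insert

KEY_COLS = ('record_id', 'record_type', 'parameter_id', 'unit_id')

# Level 1: keys already in the DB
existing_keys = set()
with engine.connect() as conn:
    result = conn.execute(text(
        "SELECT record_id, record_type, parameter_id, unit_id FROM observation"
    ))
    for row in result:
        existing_keys.add(tuple(row))

# Level 2: keys already seen in this batch (batch = list of observation dicts)
records, seen_keys = [], set()
for obs in batch:
    obs_key = tuple(obs[c] for c in KEY_COLS)
    if obs_key in existing_keys or obs_key in seen_keys:
        continue
    seen_keys.add(obs_key)
    records.append(obs)

# Level 3: PostgreSQL skips anything that still conflicts (requires a unique
# constraint on KEY_COLS); `table` is the target, e.g. Observation.__table__
if records:
    stmt = pg_insert(table).values(records).on_conflict_do_nothing(
        index_elements=list(KEY_COLS)
    )
    with engine.begin() as conn:
        conn.execute(stmt)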
```
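To see Levels 1 and 2 on concrete data, here is the same filtering as a standalone function (`dedup_batch` is a hypothetical name) applied to three observation dicts: one already in the database, one new, and a repeat of the new key with a different value.

```python
def dedup_batch(batch, existing_keys, key_fields):
    """Level 1 + Level 2 dedup: drop rows already in the DB or already seen."""
    seen_keys = set()
    kept = []
    for obs in batch:
        obs_key = tuple(obs[f] for f in key_fields)
        if obs_key in existing_keys or obs_key in seen_keys:
            continue
        seen_keys.add(obs_key)
        kept.append(obs)
    return kept


KEY = ('record_id', 'record_type', 'parameter_id', 'unit_id')
existing = {(1, 'usda_census_record', 10, 3)}  # Level 1: already in the DB
batch = [
    {'record_id': 1, 'record_type': 'usda_census_record', 'parameter_id': 10, 'unit_id': 3, 'value': 5.0},
    {'record_id': 2, 'record_type': 'usda_census_record', 'parameter_id': 10, 'unit_id': 3, 'value': 7.0},
    {'record_id': 2, 'record_type': 'usda_census_record', 'parameter_id': 10, 'unit_id': 3, 'value': 7.5},
]
kept = dedup_batch(batch, existing, KEY)
print([o['value'] for o in kept])  # [7.0]
```

The first occurrence wins: the 7.5 value is dropped without warning. Level 3's `ON CONFLICT DO NOTHING` likewise keeps the existing row, so if newer values should replace older ones, `on_conflict_do_update` is the relevant SQLAlchemy construct instead.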

### 2. Type Casting for PostgreSQL
```python
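from sqlalchemy import text

# Fix operator mismatch (integer vs text): observation.record_id is text while
# usda_census_record.id is an integer, so PostgreSQL raises
# "operator does not exist: integer = text" unless one side is cast.
# Counts census observations whose parent record is missing (expect 0).
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT COUNT(*) FROM observation o
        WHERE o.record_type = 'usda_census_record'
        AND NOT EXISTS (
            SELECT 1 FROM usda_census_record c
            WHERE c.id = o.record_id::integer  -- cast here
        )
    """))
    print(result.scalar())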
```
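The same text-vs-integer mismatch can quietly weaken the Python-side Level 1 dedup: a key read back as `('42', ...)` never equals a batch key built as `(42, ...)`, so every row falls through to Level 3. A minimal sketch, assuming `observation.record_id` is a text column as the cast above implies, normalizes the key before comparing; `make_obs_key` is a hypothetical helper.

```python
def make_obs_key(record_id, record_type, parameter_id, unit_id):
    """Build an observation dedup key; record_id is normalized to str so keys
    read from a text column compare equal to keys built from integer ids."""
    return (str(record_id), record_type, int(parameter_id), int(unit_id))


# Key as read back from observation (record_id stored as text)
db_key = make_obs_key('42', 'usda_census_record', 10, 3)
# Key built in Python from an integer parent id
batch_key = make_obs_key(42, 'usda_census_record', 10, 3)

print(('42', 'usda_census_record', 10, 3) == (42, 'usda_census_record', 10, 3))  # False
print(db_key == batch_key)  # True
```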

### 3. Dataset Linking (ATOMIC)
```python
# Add dataset_id in the INSERT, not separate UPDATE
new_census.append({
    'geoid': geoid,
    'year': year,
    'commodity_code': commodity_code,
    'dataset_id': dataset_map.get((year, 'CENSUS')),  # <- include it in the INSERT
    'created_at': now,
    'updated_at': now
})
```
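`dataset_map.get(...)` returns `None` for a missing `(year, source)` pair, and the record is then inserted with a NULL `dataset_id`, which is exactly the unlinked state this pattern is meant to prevent. A stricter lookup, sketched here as a hypothetical helper `lookup_dataset_id`, fails loudly instead:

```python
def lookup_dataset_id(dataset_map, year, source):
    """Return the dataset id for (year, source), or raise if STEP 0 missed it."""
    try:
        return dataset_map[(year, source)]
    except KeyError:
        raise KeyError(
            f"No dataset for year={year}, source={source!r}; "
            "was it created in STEP 0?"
        ) from None


dataset_map = {(2022, 'CENSUS'): 11, (2022, 'SURVEY'): 12}
print(lookup_dataset_id(dataset_map, 2022, 'CENSUS'))  # 11
```

Raising versus skipping the row is a policy choice. In `_load_census_records` the record dicts are built before the single batch insert, so raising during the build means nothing from that batch is inserted.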

---

## Testing in Production

### 1. Smoke-test the transform import
```bash
cd src/ca_biositing/pipeline
python -c "from ca_biositing.pipeline.etl.transform.usda.usda_census_survey import transform; print('Import OK')"
```

### 2. Run flow locally
```bash
pixi run python src/ca_biositing/pipeline/flows/usda_etl.py
```

### 3. Verify database
```sql
-- Check records were loaded
SELECT COUNT(*), COUNT(DISTINCT etl_run_id) FROM observation 
WHERE record_type IN ('usda_census_record', 'usda_survey_record');

-- Check all have dataset_id
SELECT COUNT(*) FROM usda_census_record WHERE dataset_id IS NULL;
SELECT COUNT(*) FROM observation WHERE dataset_id IS NULL 
AND record_type IN ('usda_census_record', 'usda_survey_record');
```
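The same checks can run from Python after a flow run. A minimal sketch using only the DB-API cursor interface, so it accepts a `psycopg2` connection (for example from SQLAlchemy's `engine.raw_connection()`) or, for a quick local test, a `sqlite3` connection; `count_unlinked` is a hypothetical helper.

```python
def count_unlinked(conn):
    """Run the NULL-dataset_id checks and return {check_name: count}."""
    checks = {
        "census_records_without_dataset":
            "SELECT COUNT(*) FROM usda_census_record WHERE dataset_id IS NULL",
        "usda_observations_without_dataset":
            "SELECT COUNT(*) FROM observation WHERE dataset_id IS NULL "
            "AND record_type IN ('usda_census_record', 'usda_survey_record')",
    }
    results = {}
    cur = conn.cursor()
    try:
        for name, sql in checks.items():
            cur.execute(sql)
            results[name] = cur.fetchone()[0]
    finally:
        cur.close()
    return results
```

After a successful run every count should be 0, e.g. `assert all(n == 0 for n in count_unlinked(conn).values())`.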

---

## Common Pitfalls to Avoid

| Pitfall | Solution |
|---------|----------|
| Import at module level hangs Docker | Use lazy imports inside @task functions |
| `print()` doesn't appear in the Prefect UI (unless `log_prints=True` is set on the flow or task) | Use `logger.info()` instead |
| Records loaded but not linked | Include dataset_id in INSERT statement |
| Type mismatch errors in PostgreSQL | Use explicit casts (e.g., `::integer`) |
| Duplicate observations inserted | Implement all 3 dedup levels |
| etl_run_id not tracked | Create record at flow start, pass to all tasks |

---

## Files to Create/Update

### CREATE (NEW):
- ✅ `src/ca_biositing/pipeline/etl/transform/usda/usda_census_survey.py`
- ✅ `src/ca_biositing/pipeline/etl/transform/usda/__init__.py`

### UPDATE:
- 🔄 `src/ca_biositing/pipeline/etl/load/usda/usda_census_survey.py` (integrate STEPS 0-3)
- 🔄 `src/ca_biositing/pipeline/flows/usda_etl.py` (simplify, add etl_run tracking)

### DELETE (after refactoring):
- ❌ `src/ca_biositing/pipeline/tasks/link_dataset_ids.py` (merge into load task)

---

## Next Action Items

```
[ ] 1. Create transform task file
    - Copy structure from templates
    - Implement _build_lookup_maps()
    - Implement _normalize_columns()
    
[ ] 2. Update load task file
    - Add _create_and_map_datasets()
    - Add _load_census_records()
    - Add _load_survey_records()
    - Add _load_observations()
    - Remove old code that doesn't have dedup
    
[ ] 3. Update flow orchestration
    - Add etl_run_id creation
    - Pass to extract/transform/load
    - Remove separate linking step
    
[ ] 4. Test locally
    - Run flow: pixi run python flows/usda_etl.py
    - Check database: SELECT COUNT(*) FROM observation...
    
[ ] 5. Deploy to Prefect (Prefect 2.x CLI; Prefect 3 removed `deployment build` in favor of `prefect deploy`)
    - pixi run prefect deployment build flows/usda_etl.py:usda_etl_flow
    - pixi run prefect deployment apply usda_etl_flow-deployment.yaml
    
[ ] 6. Remove old link_dataset_ids task (after confirming works)
```

---

## Final Notes

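Your working notebook logic is **largely production-ready**. The production refactoring is mainly about:
1. ✅ Splitting into tasks/flows (structure)
2. ✅ Adding logging instead of print (monitoring)
3. ✅ Passing etl_run_id through (tracking)
4. ✅ Following your coworker's patterns (consistency)

The core logic (STEPS 0, 1, and 3, all dedup patterns, type casting) should copy directly. STEP 2 (`_load_survey_records`) is still a placeholder in the load template and must be implemented; until then, `_load_observations` finds no survey parents and skips every survey observation.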

