# Thai Election Form Extractor - BigQuery + Google Drive Edition

Extract structured data from election form PDFs stored in **Google Drive** using **BigQuery** to find files.

**Key Advantages:**
- 🔍 Query BigQuery to find PDF files
- 📁 Direct access to Google Drive files (no download needed!)
- 🚀 Uses Gemini's External URLs file input method
- 🤖 Structured output with Pydantic schema validation

**Reference:** [Gemini File Input Methods](https://ai.google.dev/gemini-api/docs/file-input-methods)

## 1. Setup and Dependencies

In [None]:
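# NOTE: `!source ../.env` runs in a throwaway subshell, so nothing it exports reaches
# this kernel. Export the variables before starting Jupyter (or load them in Python).
%pip install --upgrade pip
%pip install -q google-cloud-bigquery google-genai pydantic pandas ddtrace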

In [87]:
import json
import os
from typing import Optional

from google.cloud import bigquery
from google import genai
from google.genai import types
from pydantic import BaseModel, Field
from IPython.display import display, HTML
import pandas as pd

## Experiment Configuration

In [88]:
from dataclasses import dataclass
from typing import Optional, Literal

@dataclass
class ExperimentConfig:
    """Configuration for extraction experiments."""
    model: str
    temperature: float = 0.0
    max_tokens: int = 8192
    thinking_mode: Optional[Literal["LOW", "HIGH"]] = None
    
    def __post_init__(self):
        # Validate thinking_mode only for supported models
        if self.thinking_mode:
            if 'gemini-3' not in self.model.lower():
                raise ValueError(
                    f"thinking_mode is only supported for gemini-3-pro-preview "
                    f"and gemini-3-flash-preview. Got: {self.model}"
                )
    
    def to_dict(self):
        return {
            'model': self.model,
            'temperature': self.temperature,
            'max_tokens': self.max_tokens,
            'thinking_mode': self.thinking_mode,
        }

# Define experiment configurations
EXPERIMENT_CONFIGS = [
    # Temperature 0.0 (deterministic)
    ExperimentConfig(model="gemini-3-flash-preview", temperature=0.0, thinking_mode="LOW"),
    ExperimentConfig(model="gemini-3-flash-preview", temperature=0.0, thinking_mode="HIGH"),
    ExperimentConfig(model="gemini-3-pro-preview", temperature=0.0, thinking_mode="LOW"),
    ExperimentConfig(model="gemini-3-pro-preview", temperature=0.0, thinking_mode="HIGH"),
    ExperimentConfig(model="gemini-2.5-flash", temperature=0.0),  # No thinking mode
    
    # Temperature 0.5 (more creative)
    ExperimentConfig(model="gemini-3-flash-preview", temperature=0.5, thinking_mode="LOW"),
    ExperimentConfig(model="gemini-3-flash-preview", temperature=0.5, thinking_mode="HIGH"),
    ExperimentConfig(model="gemini-3-pro-preview", temperature=0.5, thinking_mode="LOW"),
    ExperimentConfig(model="gemini-3-pro-preview", temperature=0.5, thinking_mode="HIGH"),
    ExperimentConfig(model="gemini-2.5-flash", temperature=0.5),  # No thinking mode
]

print("✅ Experiment configurations defined:")
for i, config in enumerate(EXPERIMENT_CONFIGS, 1):
    thinking = f" (thinking: {config.thinking_mode})" if config.thinking_mode else ""
    temp_str = f"temp: {config.temperature}"
    print(f"   {i}. {config.model:<30} {temp_str:<10} {thinking}")

✅ Experiment configurations defined:
   1. gemini-3-flash-preview         temp: 0.0   (thinking: LOW)
   2. gemini-3-flash-preview         temp: 0.0   (thinking: HIGH)
   3. gemini-3-pro-preview           temp: 0.0   (thinking: LOW)
   4. gemini-3-pro-preview           temp: 0.0   (thinking: HIGH)
   5. gemini-2.5-flash               temp: 0.0  
   6. gemini-3-flash-preview         temp: 0.5   (thinking: LOW)
   7. gemini-3-flash-preview         temp: 0.5   (thinking: HIGH)
   8. gemini-3-pro-preview           temp: 0.5   (thinking: LOW)
   9. gemini-3-pro-preview           temp: 0.5   (thinking: HIGH)
   10. gemini-2.5-flash               temp: 0.5  
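
The ten configurations listed above are the cross product of two temperatures and five model/thinking-mode pairs. As a minimal sketch, the same grid could be generated with `itertools.product` instead of being written out by hand. `GridConfig` is a hypothetical stand-in for `ExperimentConfig`, used only so the snippet runs on its own.

```python
from dataclasses import dataclass
from itertools import product
from typing import Optional


@dataclass(frozen=True)
class GridConfig:
    """Stand-in for ExperimentConfig (hypothetical, for illustration only)."""
    model: str
    temperature: float = 0.0
    thinking_mode: Optional[str] = None


# (model, thinking_mode) pairs in the same order as EXPERIMENT_CONFIGS
MODEL_VARIANTS = [
    ("gemini-3-flash-preview", "LOW"),
    ("gemini-3-flash-preview", "HIGH"),
    ("gemini-3-pro-preview", "LOW"),
    ("gemini-3-pro-preview", "HIGH"),
    ("gemini-2.5-flash", None),
]
TEMPERATURES = [0.0, 0.5]

# product() varies its last argument fastest, so temperature acts as the outer loop
grid = [
    GridConfig(model=model, temperature=temp, thinking_mode=mode)
    for temp, (model, mode) in product(TEMPERATURES, MODEL_VARIANTS)
]

print(len(grid))
```

Adding a third temperature or another model then becomes a one-line change to `TEMPERATURES` or `MODEL_VARIANTS`.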


## 2. Configuration

In [89]:
# Google Cloud Configuration

# Set Gemini API key (falls back to the placeholder below when the env var is missing)
if 'GEMINI_API_KEY' not in os.environ:
    print("⚠️  GEMINI_API_KEY not found in environment variables.")
    os.environ['GEMINI_API_KEY'] = 'AIzaSyC...' #@param {type:"string"}

GEMINI_API_KEY = os.environ['GEMINI_API_KEY']

# Set Google Cloud Project ID
if 'GOOGLE_CLOUD_PROJECT' not in os.environ:
    print("⚠️  GOOGLE_CLOUD_PROJECT not found in environment variables.")
    os.environ['GOOGLE_CLOUD_PROJECT'] = 'YOUR_GCP_PROJECT_ID' #@param {type:"string"}

GOOGLE_CLOUD_PROJECT = os.environ['GOOGLE_CLOUD_PROJECT']

print("✅ Environment variables set!")

# Model Configuration
MODEL_NAME = "gemini-3-flash-preview"  # or "gemini-3-pro-preview", "gemini-2.5-flash"

# BigQuery Configuration
BQ_TABLE = "sourceinth.vote69_ect.raw_files"

# Warn if API key is not set
if not GEMINI_API_KEY:
    print("\n⚠️  WARNING: GEMINI_API_KEY is not set!")
    print("   Set it with: export GEMINI_API_KEY='your-key-here'")
    print("   Or create a .env file with GEMINI_API_KEY=your-key-here")

# Verify configuration
print("✅ Configuration")
print(f"   Project: {GOOGLE_CLOUD_PROJECT}")
print(f"   API Key: {'*' * 20 + GEMINI_API_KEY[-8:] if GEMINI_API_KEY and len(GEMINI_API_KEY) > 8 else 'NOT SET ⚠️'}")
print(f"   Model: {MODEL_NAME}")
print(f"   BigQuery Table: {BQ_TABLE}")


✅ Environment variables set!
✅ Configuration
   Project: datadog-ese-sandbox
   API Key: ********************jezISbnw
   Model: gemini-3-flash-preview
   BigQuery Table: sourceinth.vote69_ect.raw_files


In [90]:
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs.decorators import workflow, task
from typing import Dict, Any, Optional, List

ML_APP = "gemini-ss5_18" #@param {type:"string"}
LLMOBS_PROJECT_NAME = "vote-extraction-project" #@param {type:"string"}

LLMObs.enable(
  ml_app=ML_APP,
  api_key=os.environ['DD_API_KEY'],
  app_key=os.environ['DD_APP_KEY'],
  project_name=LLMOBS_PROJECT_NAME,
  site="us3.datadoghq.com",
  agentless_enabled=True,
)

In [91]:
# Pull dataset from LLMObs for experiments
dataset = LLMObs.pull_dataset(
    dataset_name="ss5_18_nuttee",
    project_name=LLMOBS_PROJECT_NAME,
)

print(f"✅ Dataset loaded: {len(dataset)} items")
print(f"   Dataset name: ss5_18_nuttee")
print(f"   Project: {LLMOBS_PROJECT_NAME}")

✅ Dataset loaded: 5 items
   Dataset name: ss5_18_nuttee
   Project: vote-extraction-project


## 3. Pydantic Schema

In [92]:
class NumberTextPair(BaseModel):
    """Thai document number representation (both Arabic numeral and Thai text)."""
    arabic: int = Field(..., description="Arabic numeral (e.g., 120)")
    thai_text: Optional[str] = Field(None, description="Thai text (e.g., 'หนึ่งร้อยยี่สิบ')")


class FormInfo(BaseModel):
    """Header information identifying the polling station."""
    form_type: Optional[str] = Field(None, description="Constituency or PartyList")
    set_number: Optional[str] = Field(None, description="Set number (ชุดที่)")  # NEW
    date: Optional[str] = Field(None, description="Date of election")
    province: Optional[str] = Field(None, description="Province name")
    constituency_number: Optional[str] = Field(None, description="Constituency number")
    district: str = Field(..., description="District name")
    sub_district: Optional[str] = Field(None, description="Sub-district name")
    polling_station_number: str = Field(..., description="Polling station number")
    village_moo: Optional[str] = Field(None, description="Village number (หมู่ที่)")  # NEW


class VoterStatistics(BaseModel):
    """Voter statistics (Section 1)."""
    eligible_voters: Optional[NumberTextPair] = Field(None, description="Total eligible voters")
    present_voters: Optional[NumberTextPair] = Field(None, description="Voters who showed up")


class BallotStatistics(BaseModel):
    """Ballot accounting statistics (Section 2)."""
    ballots_allocated: Optional[NumberTextPair] = Field(None, description="Allocated ballots")
    ballots_used: Optional[NumberTextPair] = Field(None, description="Used ballots")
    good_ballots: Optional[NumberTextPair] = Field(None, description="Valid ballots")
    bad_ballots: Optional[NumberTextPair] = Field(None, description="Invalid ballots")
    no_vote_ballots: Optional[NumberTextPair] = Field(None, description="No vote ballots")
    ballots_remaining: Optional[NumberTextPair] = Field(None, description="Remaining ballots")


class VoteResult(BaseModel):
    """Individual vote result."""
    number: int = Field(..., description="Candidate/Party number")
    candidate_name: Optional[str] = Field(None, description="Candidate name (Constituency only)")
    party_name: Optional[str] = Field(None, description="Party name")
    vote_count: NumberTextPair = Field(..., description="Vote count (number + text)")


class Official(BaseModel):
    """Committee member/official."""
    name: str = Field(..., description="Full name of official")
    position: str = Field(..., description="Position/role (e.g., ประธาน, กรรมการ)")


class ElectionFormData(BaseModel):
    """Complete election form extraction result."""
    form_info: FormInfo
    voter_statistics: Optional[VoterStatistics] = None
    ballot_statistics: Optional[BallotStatistics] = None
    vote_results: list[VoteResult] = Field(default_factory=list)
    total_votes_recorded: Optional[NumberTextPair] = Field(
        None, 
        description="Total vote count from table footer"
    )  # NEW
    officials: Optional[list[Official]] = Field(
        None,
        description="Committee members who signed the form"
    )  # NEW

## Evaluator Functions for LLMObs Experiments

In [93]:
from typing import Dict, Any, List

def evaluate_ballot_statistics(input_data, output_data, expected_output) -> float:
    """
    Evaluate ballot statistics correctness.
    
    Returns float score (0.0 to 1.0) based on correctness of:
    - ballots_allocated, ballots_used
    - good_ballots, bad_ballots, no_vote_ballots
    - ballots_remaining
    - Validation: ballots_used = good + bad + no_vote
    
    Args:
        input_data: File metadata (not used in evaluation)
        output_data: Extracted report data (list of dicts)
        expected_output: Expected values (dict with ballot_statistics)
    
    Returns:
        float: Score from 0.0 to 1.0
    """
    if not output_data or len(output_data) == 0:
        return 0.0
    
    report = output_data[0]  # Assume first report
    ballot_stats = report.get('ballot_statistics', {})
    expected_stats = expected_output.get('ballot_statistics', {})
    
    if not ballot_stats:
        return 0.0
    
    # Helper to get the Arabic-numeral value; missing or null values count as 0
    def get_val(obj):
        if isinstance(obj, dict):
            return obj.get('arabic') or 0
        return obj or 0
    
    # Extract values
    allocated = get_val(ballot_stats.get('ballots_allocated'))
    used = get_val(ballot_stats.get('ballots_used'))
    good = get_val(ballot_stats.get('good_ballots'))
    bad = get_val(ballot_stats.get('bad_ballots'))
    no_vote = get_val(ballot_stats.get('no_vote_ballots'))
    remaining = get_val(ballot_stats.get('ballots_remaining'))
    
    # Expected values
    exp_allocated = get_val(expected_stats.get('ballots_allocated'))
    exp_used = get_val(expected_stats.get('ballots_used'))
    exp_good = get_val(expected_stats.get('good_ballots'))
    exp_bad = get_val(expected_stats.get('bad_ballots'))
    exp_no_vote = get_val(expected_stats.get('no_vote_ballots'))
    exp_remaining = get_val(expected_stats.get('ballots_remaining'))
    
    # Calculate accuracy
    checks = [
        allocated == exp_allocated,
        used == exp_used,
        good == exp_good,
        bad == exp_bad,
        no_vote == exp_no_vote,
        remaining == exp_remaining,
    ]
    
    # Validation check
    validation_pass = (used == good + bad + no_vote)
    checks.append(validation_pass)
    
    correct = sum(checks)
    total = len(checks)
    score = correct / total
    
    return score


def evaluate_voter_statistics(input_data, output_data, expected_output) -> float:
    """
    Evaluate voter statistics correctness.
    
    Returns float score (0.0 to 1.0) based on:
    - eligible_voters
    - present_voters
    
    Args:
        input_data: File metadata (not used in evaluation)
        output_data: Extracted report data (list of dicts)
        expected_output: Expected values (dict with voter_statistics)
    
    Returns:
        float: Score from 0.0 to 1.0
    """
    if not output_data or len(output_data) == 0:
        return 0.0
    
    report = output_data[0]
    voter_stats = report.get('voter_statistics', {})
    expected_stats = expected_output.get('voter_statistics', {})
    
    if not voter_stats:
        return 0.0
    
    # Missing or null values count as 0
    def get_val(obj):
        if isinstance(obj, dict):
            return obj.get('arabic') or 0
        return obj or 0
    
    eligible = get_val(voter_stats.get('eligible_voters'))
    present = get_val(voter_stats.get('present_voters'))
    
    exp_eligible = get_val(expected_stats.get('eligible_voters'))
    exp_present = get_val(expected_stats.get('present_voters'))
    
    checks = [
        eligible == exp_eligible,
        present == exp_present,
    ]
    
    correct = sum(checks)
    total = len(checks)
    score = correct / total
    
    return score


def evaluate_total_votes(input_data, output_data, expected_output) -> float:
    """
    Evaluate total votes correctness.
    
    Returns float score (0.0 to 1.0) based on:
    - Sum of all vote counts matches total_votes_recorded
    - Total matches expected value
    
    Args:
        input_data: File metadata (not used in evaluation)
        output_data: Extracted report data (list of dicts)
        expected_output: Expected values (dict with total_votes_recorded)
    
    Returns:
        float: Score from 0.0 to 1.0
    """
    if not output_data or len(output_data) == 0:
        return 0.0
    
    report = output_data[0]
    vote_results = report.get('vote_results', [])
    total_recorded = report.get('total_votes_recorded')
    
    if not vote_results:
        return 0.0
    
    # Missing or null values count as 0 so the sum below never sees None
    def get_val(obj):
        if isinstance(obj, dict):
            return obj.get('arabic') or 0
        return obj or 0
    
    # Calculate sum
    calculated_total = sum(get_val(v.get('vote_count')) for v in vote_results)
    recorded_total = get_val(total_recorded)
    expected_total = get_val(expected_output.get('total_votes_recorded'))
    
    # Checks
    internal_match = calculated_total == recorded_total
    expected_match = recorded_total == expected_total
    
    checks = [internal_match, expected_match]
    correct = sum(checks)
    score = correct / len(checks)
    
    return score


# Register evaluators for LLMObs
EVALUATORS = {
    "ballot_statistics": evaluate_ballot_statistics,
    "voter_statistics": evaluate_voter_statistics,
    "total_votes": evaluate_total_votes,
}

print("✅ Evaluator functions defined:")
for name in EVALUATORS.keys():
    print(f"   - {name}")
print("\nEvaluators return float (0.0 to 1.0) for LLMObs experiments")

✅ Evaluator functions defined:
   - ballot_statistics
   - voter_statistics
   - total_votes

Evaluators return float (0.0 to 1.0) for LLMObs experiments
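
To make the scoring concrete: `evaluate_ballot_statistics` runs seven checks (six field comparisons plus the internal `used = good + bad + no_vote` consistency check) and returns the fraction that pass. The sketch below re-implements that arithmetic on made-up numbers, which are illustrative and not taken from the dataset, to show how a single misread field can fail two checks at once. `ballot_score` and `arabic` are hypothetical names.

```python
def arabic(value):
    """Return the integer from a NumberTextPair-style dict, or 0 if absent."""
    if isinstance(value, dict):
        return value.get("arabic") or 0
    return value or 0


FIELDS = [
    "ballots_allocated", "ballots_used", "good_ballots",
    "bad_ballots", "no_vote_ballots", "ballots_remaining",
]


def ballot_score(extracted: dict, expected: dict) -> float:
    # Six field-by-field comparisons against the expected values
    checks = [arabic(extracted.get(f)) == arabic(expected.get(f)) for f in FIELDS]
    # One internal consistency check on the extracted values alone
    used = arabic(extracted.get("ballots_used"))
    parts = sum(
        arabic(extracted.get(f))
        for f in ("good_ballots", "bad_ballots", "no_vote_ballots")
    )
    checks.append(used == parts)
    return sum(checks) / len(checks)


# Illustrative values only
expected = {
    "ballots_allocated": {"arabic": 500}, "ballots_used": {"arabic": 320},
    "good_ballots": {"arabic": 300}, "bad_ballots": {"arabic": 15},
    "no_vote_ballots": {"arabic": 5}, "ballots_remaining": {"arabic": 180},
}
# good_ballots misread as 800: the field check and the sum check both fail
extracted = dict(expected, good_ballots={"arabic": 800})

score = ballot_score(extracted, expected)
print(f"{score:.3f}")  # 5 of 7 checks pass
```

Because the consistency check depends only on the extracted values, it penalizes a misread digit even when the expected output is incomplete.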


## Run Experiments with Dataset

In [104]:
def run_single_experiment(
    config: ExperimentConfig,
    dataset,
    experiment_name: Optional[str] = None,
    description: Optional[str] = None,
) -> Any:
    """
    Run a single experiment with LLMObs.experiment API.
    
    Args:
        config: ExperimentConfig to test
        dataset: LLMObs dataset
        experiment_name: Optional experiment name
        description: Optional description
    
    Returns:
        Experiment object with results
    """
    # Generate experiment name if not provided
    if experiment_name is None:
        thinking = f"_thinking_{config.thinking_mode}" if config.thinking_mode else ""
        experiment_name = f"{config.model.replace('.', '_').replace('-', '_')}{thinking}_temp{config.temperature}"
    
    # Generate description
    if description is None:
        thinking_str = f" with thinking mode {config.thinking_mode}" if config.thinking_mode else ""
        description = f"Testing {config.model}{thinking_str} (temp={config.temperature})"
    
    print(f"\n🚀 Running experiment: {experiment_name}")
    print(f"   Model: {config.model}")
    print(f"   Temperature: {config.temperature}")
    print(f"   Thinking mode: {config.thinking_mode or 'N/A'}")
    print(f"   Dataset items: {len(list(dataset))}")
    print()
    
    # Create task wrapper that passes config
    # Dataset items have structure: {'input_data': file_info, 'expected_output': ...}
    def task_with_config(input_data, config) -> list[dict]:
        """Task function for LLMObs - must accept input_data and config."""
        return extraction_task(input_data, config)
    
    # Run experiment using LLMObs.experiment API
    experiment = LLMObs.experiment(
        name=experiment_name,
        task=task_with_config,
        dataset=dataset,
        evaluators=list(EVALUATORS.values()),
        description=description,
        config=config.to_dict(),
    )
    
    print(f"\n✅ Experiment completed: {experiment_name}")
    print(f"   Results available in Datadog LLMObs dashboard")
    
    return experiment


def run_all_experiments(
    dataset,
    configs: list[ExperimentConfig],
) -> list:
    """
    Run experiments for all configurations.
    
    Args:
        dataset: LLMObs dataset
        configs: List of ExperimentConfig to test
    
    Returns:
        List of experiment objects
    """
    experiments = []
    
    print("="*80)
    print("🚀 Running Multiple Experiments")
    print("="*80)
    print(f"\nTotal configurations: {len(configs)}")
    print(f"Dataset items: {len(list(dataset))}")
    print(f"Total experiments: {len(configs)}\n")
    
    for i, config in enumerate(configs, 1):
        print(f"\n{'='*80}")
        print(f"Experiment {i}/{len(configs)}")
        print(f"{'='*80}")
        
        try:
            experiment = run_single_experiment(config, dataset)
            experiments.append(experiment)
        except Exception as e:
            print(f"\n❌ Error running experiment {i}: {e}")
            import traceback
            traceback.print_exc()
            continue
    
    print(f"\n\n{'='*80}")
    print(f"✅ All experiments completed: {len(experiments)}/{len(configs)} successful")
    print(f"{'='*80}")
    print(f"\n💡 View results in Datadog LLMObs dashboard")
    print(f"   Project: {LLMOBS_PROJECT_NAME}")
    print(f"   ML App: {ML_APP}")
    
    return experiments


print("✅ Experiment functions defined")
print("\nUsage:")
print("  # Run single experiment:")
print("  experiment = run_single_experiment(EXPERIMENT_CONFIGS[0], dataset)")
print("\n  # Run all experiments:")
print("  experiments = run_all_experiments(dataset, EXPERIMENT_CONFIGS)")

✅ Experiment functions defined

Usage:
  # Run single experiment:
  experiment = run_single_experiment(EXPERIMENT_CONFIGS[0], dataset)

  # Run all experiments:
  experiments = run_all_experiments(dataset, EXPERIMENT_CONFIGS)
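
For reference, the default naming rule inside `run_single_experiment` can be isolated into a small pure function. This is only a sketch: `default_experiment_name` is a hypothetical helper, not part of the notebook. It shows that only the model string is sanitized, so the temperature still contributes a `.` to the generated name.

```python
from typing import Optional


def default_experiment_name(
    model: str,
    temperature: float,
    thinking_mode: Optional[str] = None,
) -> str:
    """Mirror the notebook's rule: sanitized model + optional thinking mode + temperature."""
    safe_model = model.replace(".", "_").replace("-", "_")
    thinking = f"_thinking_{thinking_mode}" if thinking_mode else ""
    return f"{safe_model}{thinking}_temp{temperature}"


print(default_experiment_name("gemini-3-flash-preview", 0.0, "LOW"))
print(default_experiment_name("gemini-2.5-flash", 0.5))
```

If experiment names must be free of dots, the temperature would need the same `replace` treatment as the model name.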


## 4. Gemini Schema for Structured Output

In [95]:
# Enhanced schema for Gemini structured output with NumberTextPair
ELECTION_DATA_SCHEMA = {
    "type": "ARRAY",
    "description": "List of election reports found in the PDF",
    "items": {
        "type": "OBJECT",
        "properties": {
            "form_info": {
                "type": "OBJECT",
                "description": "Header information",
                "properties": {
                    "form_type": {
                        "type": "STRING",
                        "enum": ["Constituency", "PartyList"],
                        "description": "Form type: Constituency (candidates) or PartyList (parties only)"
                    },
                    "set_number": {"type": "STRING", "description": "Set number (ชุดที่)"},
                    "date": {"type": "STRING", "description": "Date of election"},
                    "province": {"type": "STRING", "description": "Province name"},
                    "constituency_number": {"type": "STRING", "description": "Constituency number"},
                    "district": {"type": "STRING", "description": "District name"},
                    "sub_district": {"type": "STRING", "description": "Sub-district name"},
                    "polling_station_number": {"type": "STRING", "description": "Polling station number"},
                    "village_moo": {"type": "STRING", "description": "Village number (หมู่ที่)"},
                },
                "required": ["form_type", "province", "district", "polling_station_number"],
            },
            "voter_statistics": {
                "type": "OBJECT",
                "description": "Section 1: Voter statistics",
                "properties": {
                    "eligible_voters": {
                        "type": "OBJECT",
                        "description": "1.1 Total eligible voters",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        }
                    },
                    "present_voters": {
                        "type": "OBJECT",
                        "description": "1.2 Voters who showed up",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        }
                    }
                }
            },
            "ballot_statistics": {
                "type": "OBJECT",
                "description": "Section 2: Ballot accounting",
                "properties": {
                    "ballots_allocated": {
                        "type": "OBJECT",
                        "description": "2.1 Allocated ballots",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        }
                    },
                    "ballots_used": {
                        "type": "OBJECT",
                        "description": "2.2 Used ballots",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        },
                        "required": ["arabic"]
                    },
                    "good_ballots": {
                        "type": "OBJECT",
                        "description": "2.2.1 Valid ballots",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        },
                        "required": ["arabic"]
                    },
                    "bad_ballots": {
                        "type": "OBJECT",
                        "description": "2.2.2 Invalid ballots",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        },
                        "required": ["arabic"]
                    },
                    "no_vote_ballots": {
                        "type": "OBJECT",
                        "description": "2.2.3 No vote ballots",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        },
                        "required": ["arabic"]
                    },
                    "ballots_remaining": {
                        "type": "OBJECT",
                        "description": "2.3 Remaining ballots",
                        "properties": {
                            "arabic": {"type": "INTEGER"},
                            "thai_text": {"type": "STRING"}
                        }
                    }
                }
            },
            "vote_results": {
                "type": "ARRAY",
                "description": "Section 3: Vote counts for all candidates/parties",
                "items": {
                    "type": "OBJECT",
                    "properties": {
                        "number": {"type": "INTEGER", "description": "Candidate/Party number"},
                        "candidate_name": {
                            "type": "STRING",
                            "description": "Candidate name (for Constituency forms only)"
                        },
                        "party_name": {"type": "STRING", "description": "Party name"},
                        "vote_count": {
                            "type": "OBJECT",
                            "description": "Vote count (both number and Thai text)",
                            "properties": {
                                "arabic": {"type": "INTEGER"},
                                "thai_text": {"type": "STRING"}
                            },
                            "required": ["arabic"]
                        }
                    },
                    "required": ["number", "vote_count"]
                },
            },
            "total_votes_recorded": {
                "type": "OBJECT",
                "description": "Total vote count from bottom of table (for validation)",
                "properties": {
                    "arabic": {"type": "INTEGER"},
                    "thai_text": {"type": "STRING"}
                }
            },
            "officials": {
                "type": "ARRAY",
                "description": "Committee members who signed the form",
                "items": {
                    "type": "OBJECT",
                    "properties": {
                        "name": {"type": "STRING", "description": "Full name"},
                        "position": {"type": "STRING", "description": "Position (ประธาน, กรรมการ, etc.)"}
                    },
                    "required": ["name", "position"]
                }
            }
        },
        "required": ["form_info", "vote_results"],
    },
}
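
The schema above uses upper-case type names (`OBJECT`, `ARRAY`, `INTEGER`, `STRING`) with a `required` list per object. As a sketch of what those constraints mean for a parsed response, here is a small stdlib-only checker. `check_against_schema` is a hypothetical helper, not part of the Gemini SDK, and it handles only the four types plus the `enum` and `required` keywords that appear in this notebook. `MINI_SCHEMA` is a trimmed-down illustration, not the full `ELECTION_DATA_SCHEMA`.

```python
def check_against_schema(value, schema, path="$"):
    """Return a list of human-readable violations; an empty list means the value conforms."""
    errors = []
    kind = schema.get("type")
    if kind == "OBJECT":
        if not isinstance(value, dict):
            return [f"{path}: expected object"]
        for key in schema.get("required", []):
            if key not in value:
                errors.append(f"{path}.{key}: missing required field")
        for key, sub_schema in schema.get("properties", {}).items():
            if key in value:
                errors += check_against_schema(value[key], sub_schema, f"{path}.{key}")
    elif kind == "ARRAY":
        if not isinstance(value, list):
            return [f"{path}: expected array"]
        for i, item in enumerate(value):
            errors += check_against_schema(item, schema.get("items", {}), f"{path}[{i}]")
    elif kind == "INTEGER":
        # bool is a subclass of int in Python, so exclude it explicitly
        if not isinstance(value, int) or isinstance(value, bool):
            errors.append(f"{path}: expected integer")
    elif kind == "STRING":
        if not isinstance(value, str):
            errors.append(f"{path}: expected string")
        elif "enum" in schema and value not in schema["enum"]:
            errors.append(f"{path}: {value!r} not in {schema['enum']}")
    return errors


# A trimmed-down schema in the same style as ELECTION_DATA_SCHEMA
MINI_SCHEMA = {
    "type": "ARRAY",
    "items": {
        "type": "OBJECT",
        "properties": {
            "form_info": {
                "type": "OBJECT",
                "properties": {
                    "form_type": {"type": "STRING", "enum": ["Constituency", "PartyList"]},
                    "district": {"type": "STRING"},
                },
                "required": ["form_type", "district"],
            },
            "vote_results": {
                "type": "ARRAY",
                "items": {
                    "type": "OBJECT",
                    "properties": {"number": {"type": "INTEGER"}},
                    "required": ["number"],
                },
            },
        },
        "required": ["form_info", "vote_results"],
    },
}

good = [{"form_info": {"form_type": "PartyList", "district": "X"}, "vote_results": [{"number": 1}]}]
bad = [{"form_info": {"form_type": "Other"}, "vote_results": [{"number": "1"}]}]

print(check_against_schema(good, MINI_SCHEMA))
for problem in check_against_schema(bad, MINI_SCHEMA):
    print(problem)
```

A check like this can run on the decoded JSON before handing it to the Pydantic models, so that schema violations are reported with a path rather than surfacing later as evaluator mismatches.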

## 5. Initialize Clients

In [96]:
# Initialize BigQuery client
bq_client = bigquery.Client(project=GOOGLE_CLOUD_PROJECT)
print("✅ BigQuery client initialized")

# Initialize Gemini client with API key
if not GEMINI_API_KEY:
    raise ValueError(
        "GEMINI_API_KEY is required! Set it with:\n"
        "  export GEMINI_API_KEY='your-key-here'\n"
        "  or create a .env file"
    )

gemini_client = genai.Client(
    api_key=GEMINI_API_KEY,
    vertexai=False,
)
print("✅ Gemini client initialized (using API key)")
print(f"   Ready to use {MODEL_NAME}")

✅ BigQuery client initialized
✅ Gemini client initialized (using API key)
   Ready to use gemini-3-flash-preview


## 6. Query BigQuery for PDF Files

In [97]:
def query_pdf_files(
    limit: int = 10,
    province: Optional[str] = None,
    min_size_kb: float = 50.0,
    max_size_mb: Optional[float] = 50.0
) -> list[dict]:
    """
    Query BigQuery for PDF files.
    
    Args:
        limit: Maximum number of files to return
        province: Filter by province name (optional)
        min_size_kb: Minimum file size in KB (default: 50 KB to exclude corrupted files)
        max_size_mb: Maximum file size in MB (optional)
    
    Returns:
        List of file metadata dicts
    """
    # Build query
    conditions = ["mime_type = 'application/pdf'"]
    
    # Add minimum size filter (exclude very small/corrupted files)
    min_bytes = int(min_size_kb * 1024)
    conditions.append(f"size >= {min_bytes}")
    
    if province:
        conditions.append(f"province_name = '{province}'")
    
    if max_size_mb:
        max_bytes = int(max_size_mb * 1024 * 1024)
        conditions.append(f"size <= {max_bytes}")
    
    where_clause = " AND ".join(conditions)
    
    query = f"""
    SELECT 
        file_id, 
        path,
        mime_type, 
        folder_id, 
        province_name,
        size,
        mod_time
    FROM `{BQ_TABLE}`
    WHERE {where_clause}
    ORDER BY size ASC
    LIMIT {limit}
    """
    
    print("🔍 Querying BigQuery...")
    print(f"   Filters: {where_clause}")
    
    # Execute query
    query_job = bq_client.query(query)
    results = query_job.result()
    
    # Convert to list
    files = []
    for row in results:
        files.append({
            "file_id": row.file_id,
            "path": row.path,
            "mime_type": row.mime_type,
            "folder_id": row.folder_id,
            "province_name": row.province_name,
            "size": row.size,
            "size_mb": row.size / (1024 * 1024) if row.size else 0,
            "size_kb": row.size / 1024 if row.size else 0,
            "mod_time": row.mod_time,
        })
    
    print(f"✅ Found {len(files)} file(s)")
    if files:
        print(f"   Size range: {files[0]['size_kb']:.1f} KB - {files[-1]['size_kb']:.1f} KB")
    return files


# Query for files (min 50 KB, max 50 MB)
pdf_files = query_pdf_files(limit=10, min_size_kb=50.0, max_size_mb=50.0)

# Display as DataFrame
if pdf_files:
    df = pd.DataFrame(pdf_files)
    display(df[['province_name', 'path', 'size_mb', 'file_id']].head())

🔍 Querying BigQuery...
   Filters: mime_type = 'application/pdf' AND size >= 51200 AND size <= 52428800
✅ Found 10 file(s)
   Size range: 50.0 KB - 50.4 KB


|   | province_name | path | size_mb | file_id |
|---|---|---|---|---|
| 0 | พิจิตร | เขตเลือกตั้งที่ 3/อำเภอโพธิ์ประทับช้าง/ทต.โพธิ... | 0.048851 | 1_j0DNaqCXIkEk0MK3y0J1eCN3hUOCXeF |
| 1 | พิจิตร | เขตเลือกตั้งที่ 3/อำเภอโพธิ์ประทับช้าง/ตำบลไผ่... | 0.048915 | 1gDxp58u2W14uhdb6NpRDqxl1d7aa2WFy |
| 2 | พิจิตร | เขตเลือกตั้งที่ 3/อำเภอโพธิ์ประทับช้าง/ตำบลไผ่... | 0.048917 | 1a5jF1Oyv3UEatBq1MT1ga8kS10uR19-c |
| 3 | พิจิตร | เขตเลือกตั้งที่ 3/อำเภอโพธิ์ประทับช้าง/ตำบลดงเ... | 0.048927 | 1tzz6gMXk1n3pQtreQWMIU2xlkncg-g_r |
| 4 | พิจิตร | เขตเลือกตั้งที่ 3/อำเภอโพธิ์ประทับช้าง/ทต.โพธิ... | 0.048993 | 1-MsML3nSXUrscvmzdZb7R4yTkZcuTc5m |


failed to send, dropping 2 traces to intake at http://datadog-agent:8126/v0.5/traces: client error (Connect) [1 skipped]
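
`query_pdf_files` interpolates `province` directly into the SQL string, which breaks on names containing a quote and leaves the query open to injection. One alternative is to build the `WHERE` clause with named `@placeholders` and pass the values separately. The sketch below does this with the standard library only. `build_pdf_filter` is a hypothetical helper, and the returned `(name, type, value)` tuples are intended to be wrapped in `bigquery.ScalarQueryParameter` and supplied through `bigquery.QueryJobConfig(query_parameters=...)` when the query is actually run.

```python
from typing import Optional


def build_pdf_filter(
    province: Optional[str] = None,
    min_size_kb: float = 50.0,
    max_size_mb: Optional[float] = 50.0,
):
    """Return (where_clause, params) using @named placeholders instead of inlined values."""
    conditions = ["mime_type = 'application/pdf'", "size >= @min_bytes"]
    params = [("min_bytes", "INT64", int(min_size_kb * 1024))]

    if province:
        conditions.append("province_name = @province")
        params.append(("province", "STRING", province))

    if max_size_mb:
        conditions.append("size <= @max_bytes")
        params.append(("max_bytes", "INT64", int(max_size_mb * 1024 * 1024)))

    return " AND ".join(conditions), params


# A hostile-looking value stays in the parameter list and never touches the SQL text
where_clause, params = build_pdf_filter(province="x' OR '1'='1")
print(where_clause)
print(params)
```

With the real client, each tuple could be unpacked as `bigquery.ScalarQueryParameter(*p)`, and the resulting list passed to `bq_client.query(query, job_config=...)`.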


## 7. Select Test File

In [98]:
# Select first file for testing
test_file = pdf_files[0]

print("📄 Selected Test File:")
print("=" * 80)
print(f"Province: {test_file['province_name']}")
print(f"Path: {test_file['path']}")
print(f"File ID: {test_file['file_id']}")
print(f"Size: {test_file['size_mb']:.2f} MB")
print(f"Modified: {test_file['mod_time']}")
print()

📄 Selected Test File:
Province: พิจิตร
Path: เขตเลือกตั้งที่ 3/อำเภอโพธิ์ประทับช้าง/ทต.โพธิ์ประทับช้าง/หน่วยเลือกตั้งที่ 9/สส5ทับ18 น_09.pdf
File ID: 1_j0DNaqCXIkEk0MK3y0J1eCN3hUOCXeF
Size: 0.05 MB
Modified: 2026-02-10T03:26:51.000Z



In [99]:
@task
def extract_from_drive_url(
    file_info: dict,
    config = None,
) -> list[dict]:
    """
    Extract vote data from a PDF file stored in Google Drive.
    
    Args:
        file_info: Dictionary containing file metadata from BigQuery with keys:
            - file_id: Google Drive file ID
            - province_name: Province name
            - path: File path
            - size_mb: File size in MB
            - folder_id: Google Drive folder ID
        config: ExperimentConfig or dict with model, temperature, max_tokens, thinking_mode.
                If None, uses defaults (MODEL_NAME, temp=0.0, max_tokens=8192)
    
    Returns:
        List of extracted election form data with file_info embedded
    """
    # Use config or defaults
    if config is None:
        config = ExperimentConfig(model=MODEL_NAME, temperature=0.0, max_tokens=8192)
    elif isinstance(config, dict):
        # Convert dict to ExperimentConfig
        config = ExperimentConfig(**config)
    
    # Extract file_id and construct drive_uri
    file_id = file_info['file_id']
    drive_uri = f"https://drive.google.com/uc?export=download&id={file_id}"
    
    print(f"🤖 Extracting with {config.model}...")
    print(f"   Temperature: {config.temperature}, Max tokens: {config.max_tokens}")
    if config.thinking_mode:
        print(f"   Thinking mode: {config.thinking_mode}")
    print(f"   Province: {file_info.get('province_name', 'N/A')}")
    print(f"   File: {file_info.get('path', 'N/A')}")
    print(f"   Size: {file_info.get('size_mb', 0):.2f} MB")
    print(f"   Using Google Drive URI (External URL method)")
    
    # Create file part from URI
    file_part = types.Part.from_uri(
        file_uri=drive_uri,
        mime_type="application/pdf"
    )
    
    # Enhanced extraction prompt
    prompt = """
    You are an expert data entry assistant for Thai Election documents (Form S.S. 5/18).
    
    CRITICAL INSTRUCTIONS:
    
    1. **Analyze all pages** of this PDF document carefully.
    
    2. **Extract BOTH number formats** for all numerical values:
       - Arabic numerals (e.g., 120)
       - Thai text (e.g., "หนึ่งร้อยยี่สิบ")
       This applies to: voter statistics, ballot statistics, vote counts, and total votes.
    
    3. **Header Information** (usually on first page):
       - Form type: "Constituency" (แบบแบ่งเขต) or "PartyList" (บัญชีรายชื่อ)
       - Set number (ชุดที่) if present
       - Date, Province, District, Sub-district
       - Polling station number (หน่วยเลือกตั้งที่)
       - Village number (หมู่ที่) if present
    
    4. **Section 1 - Voter Statistics:**
       - 1.1 Eligible voters (ผู้มีสิทธิเลือกตั้งตามบัญชี)
       - 1.2 Present voters (ผู้มาแสดงตน)
       Extract both arabic and thai_text for each.
    
    5. **Section 2 - Ballot Statistics:**
       - 2.1 Allocated ballots (บัตรที่ได้รับจัดสรร)
       - 2.2 Used ballots (บัตรที่ใช้)
       - 2.2.1 Valid ballots (บัตรดี)
       - 2.2.2 Invalid ballots (บัตรเสีย)
       - 2.2.3 No vote ballots (ไม่เลือก)
       - 2.3 Remaining ballots (บัตรเหลือ)
       Extract both arabic and thai_text for each.
    
    6. **Section 3 - Vote Results Table:**
       - Consolidate all pages (table often spans multiple pages)
       - For each entry: number, candidate name (if Constituency), party name, vote count
       - Extract vote_count as {arabic: int, thai_text: str}
    
    7. **Total Votes Recorded:**
       - Look for "รวม" (total) at the bottom of the vote results table
       - Extract both arabic and thai_text
    
    8. **Officials (Committee Members):**
       - Extract names and positions from signature section
       - Common positions: ประธาน (Chair), กรรมการ (Member), เลขานุการ (Secretary)
    
    9. **Validation:**
       - ballots_used.arabic = good_ballots.arabic + bad_ballots.arabic + no_vote_ballots.arabic
       - total_votes_recorded.arabic = sum of all vote_count.arabic
    """
    
    # Configure generation
    gen_config_params = {
        "response_mime_type": "application/json",
        "response_schema": ELECTION_DATA_SCHEMA,
        "temperature": config.temperature,
        "max_output_tokens": config.max_tokens,
        "top_p": 0.95,
        "top_k": 40,
    }
    
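    # Add the thinking level if configured. GenerateContentConfig has no
    # `thinking_mode` field; Gemini 3 models take the level via thinking_config.
    if config.thinking_mode:
        gen_config_params["thinking_config"] = types.ThinkingConfig(
            thinking_level=config.thinking_mode
        )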
    
    generation_config = types.GenerateContentConfig(**gen_config_params)
    
    # Generate content (ddtrace will automatically capture this as LLM trace)
    print("   Sending request to Gemini...")
    response = gemini_client.models.generate_content(
        model=config.model,
        contents=[file_part, prompt],
        config=generation_config,
    )
    
    # Parse response
    result = json.loads(response.text)
    
    print(f"\n✅ Extraction complete!")
    print(f"   Extracted {len(result)} report(s)")
    
    # Add file_info to each report in result
    for report in result:
        report['file_info'] = file_info
    
    return result


print("✅ Updated extraction function - uses ddtrace for LLM observability")

✅ Updated extraction function - uses ddtrace for LLM observability


In [103]:
@task
def extraction_task(input_data, config=None) -> list[dict]:
    """
    Task wrapper for LLMObs experiments.
    
    Args:
        input_data: File metadata from dataset (contains file_info)
        config: Optional experiment configuration (ExperimentConfig or None)
    
    Returns:
        List of extracted reports with file_info
    """
    # Use config or create default
    if config is None:
        config = ExperimentConfig(model=MODEL_NAME, temperature=0.0, max_tokens=8192)
    elif isinstance(config, dict):
        # Convert dict to ExperimentConfig
        config = ExperimentConfig(**config)
    
    return extract_from_drive_url(input_data, config=config)


## Run Experiments with Dataset

## 12. LLMObs Experiments

In [None]:
# Run experiments using LLMObs.experiment() API

print("🚀 LLMObs Experiments Setup\n")
print(f"Dataset: {len(list(dataset))} items")
print(f"Configurations: {len(EXPERIMENT_CONFIGS)}")
print(f"ML App: {ML_APP}")
print(f"Project: {LLMOBS_PROJECT_NAME}\n")

print("Available configurations:")
for i, config in enumerate(EXPERIMENT_CONFIGS, 1):
    thinking = f" (thinking: {config.thinking_mode})" if config.thinking_mode else ""
    print(f"  {i}. {config.model}{thinking}")

print("\n" + "="*80)
print("CHOOSE AN OPTION:")
print("="*80)

print("\nOption 1: Run single experiment (recommended for testing)")
print("  experiment = run_single_experiment(EXPERIMENT_CONFIGS[0], dataset)")

print("\nOption 2: Run specific configuration")
print("  # Example: Test gemini-3-flash-preview with thinking LOW")
print("  config = EXPERIMENT_CONFIGS[0]")
print("  experiment = run_single_experiment(config, dataset)")

print("\nOption 3: Run all experiments (takes longer)")
print("  experiments = run_all_experiments(dataset, EXPERIMENT_CONFIGS)")

print("\nOption 4: Run subset of experiments")
print("  # Test only gemini-3 models")
print("  gemini3_configs = [c for c in EXPERIMENT_CONFIGS if 'gemini-3' in c.model]")
print("  experiments = run_all_experiments(dataset, gemini3_configs)")

print("\n" + "="*80)
print("💡 Uncomment one of the options below to run:")
print("="*80 + "\n")

# OPTION 1: Single experiment (quick test)
# experiment = run_single_experiment(EXPERIMENT_CONFIGS[0], dataset)

# OPTION 2: Specific config
# config = EXPERIMENT_CONFIGS[1]  # gemini-3-flash-preview (HIGH)
# experiment = run_single_experiment(config, dataset)

# OPTION 3: All experiments
# experiments = run_all_experiments(dataset, EXPERIMENT_CONFIGS)

# OPTION 4: Subset (gemini-3 only)
# gemini3_configs = [c for c in EXPERIMENT_CONFIGS if 'gemini-3' in c.model]
# experiments = run_all_experiments(dataset, gemini3_configs)

print("Select and uncomment one option above, then run this cell.")

🚀 LLMObs Experiments Setup

Dataset: 5 items
Configurations: 10
ML App: gemini-ss5_18
Project: vote-extraction-project

Available configurations:
  1. gemini-3-flash-preview (thinking: LOW)
  2. gemini-3-flash-preview (thinking: HIGH)
  3. gemini-3-pro-preview (thinking: LOW)
  4. gemini-3-pro-preview (thinking: HIGH)
  5. gemini-2.5-flash
  6. gemini-3-flash-preview (thinking: LOW)
  7. gemini-3-flash-preview (thinking: HIGH)
  8. gemini-3-pro-preview (thinking: LOW)
  9. gemini-3-pro-preview (thinking: HIGH)
  10. gemini-2.5-flash

CHOOSE AN OPTION:

Option 1: Run single experiment (recommended for testing)
  experiment = run_single_experiment(EXPERIMENT_CONFIGS[0], dataset)

Option 2: Run specific configuration
  # Example: Test gemini-3-flash-preview with thinking LOW
  config = EXPERIMENT_CONFIGS[0]
  experiment = run_single_experiment(config, dataset)

Option 3: Run all experiments (takes longer)
  experiments = run_all_experiments(dataset, EXPERIMENT_CONFIGS)

Option 4: Run s

In [105]:
experiments = run_all_experiments(dataset, EXPERIMENT_CONFIGS)

🚀 Running Multiple Experiments

Total configurations: 10
Dataset items: 5
Total experiments: 10


Experiment 1/10

🚀 Running experiment: gemini_3_flash_preview_thinking_LOW_temp0.0
   Model: gemini-3-flash-preview
   Temperature: 0.0
   Thinking mode: LOW
   Dataset items: 5


✅ Experiment completed: gemini_3_flash_preview_thinking_LOW_temp0.0
   Results available in Datadog LLMObs dashboard

Experiment 2/10

🚀 Running experiment: gemini_3_flash_preview_thinking_HIGH_temp0.0
   Model: gemini-3-flash-preview
   Temperature: 0.0
   Thinking mode: HIGH
   Dataset items: 5


✅ Experiment completed: gemini_3_flash_preview_thinking_HIGH_temp0.0
   Results available in Datadog LLMObs dashboard

Experiment 3/10

🚀 Running experiment: gemini_3_pro_preview_thinking_LOW_temp0.0
   Model: gemini-3-pro-preview
   Temperature: 0.0
   Thinking mode: LOW
   Dataset items: 5


✅ Experiment completed: gemini_3_pro_preview_thinking_LOW_temp0.0
   Results available in Datadog LLMObs dash

In [None]:
# View experiment results summary
# Run this after experiments complete

if 'experiment' in locals():
    print("📊 Single Experiment Results\n")
    print(f"Experiment: {experiment}")
    print(f"\n💡 View detailed results in Datadog LLMObs dashboard:")
    print(f"   Project: {LLMOBS_PROJECT_NAME}")
    print(f"   ML App: {ML_APP}")
    print(f"\nThe experiment has been submitted to Datadog LLMObs.")
    print(f"Results will be available in the dashboard shortly.")
    
elif 'experiments' in locals() and experiments:
    print("📊 Multiple Experiments Results\n")
    print(f"Total experiments completed: {len(experiments)}\n")
    
    print("Experiments:")
    for i, exp in enumerate(experiments, 1):
        print(f"  {i}. {exp}")
    
    print(f"\n💡 View detailed results in Datadog LLMObs dashboard:")
    print(f"   Project: {LLMOBS_PROJECT_NAME}")
    print(f"   ML App: {ML_APP}")
    
    print(f"\nAll {len(experiments)} experiments have been submitted to Datadog LLMObs.")
    print(f"Results and comparisons will be available in the dashboard shortly.")
    
    print("\n" + "="*80)
    print("📈 What to check in Datadog LLMObs:")
    print("="*80)
    print("  1. Evaluation scores for each configuration")
    print("  2. Pass/fail rates per evaluator")
    print("  3. Token usage and costs")
    print("  4. Thinking mode performance comparison")
    print("  5. Model-by-model accuracy breakdown")
    print("  6. Individual trace details for debugging")
    
else:
    print("❌ No experiments found.")
    print("\nRun the cell above to start experiments first.")
    print("\nAfter experiments complete, this cell will show a summary.")

In [None]:
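result = extract_from_drive_url(
    file_info=test_file,
    # The function takes a single config object, not separate keyword arguments.
    config=ExperimentConfig(model=MODEL_NAME, temperature=0.0, max_tokens=32000),
)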

### 📊 Viewing Results in Datadog

After running experiments, view comprehensive results in **Datadog LLMObs Dashboard**:

**What you'll see:**
- ✅ **Experiment Comparison**: Side-by-side performance metrics
- ✅ **Evaluation Scores**: Ballot, voter, and total votes accuracy
- ✅ **Pass/Fail Rates**: For each evaluator across configs
- ✅ **Token Usage**: Cost analysis per configuration
- ✅ **Thinking Mode Impact**: LOW vs HIGH performance
- ✅ **Individual Traces**: Debug specific extractions

**Key Metrics:**
1. **ballot_statistics**: Correctness of ballot counts (≥85% = pass)
2. **voter_statistics**: Accuracy of voter numbers (100% = pass)
3. **total_votes**: Sum validation and expected match (100% = pass)
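
The pass thresholds in the list above can be written as a small check. This is a minimal sketch, not the evaluators registered with LLMObs: `PASS_THRESHOLDS` and `passes` are hypothetical names, and scores are assumed to be fractions between 0 and 1.

```python
# Hypothetical thresholds mirroring the "Key Metrics" list (fractions, not percents).
PASS_THRESHOLDS = {
    "ballot_statistics": 0.85,
    "voter_statistics": 1.0,
    "total_votes": 1.0,
}


def passes(metric: str, score: float) -> bool:
    """Return True when a score meets the pass threshold for its metric."""
    return score >= PASS_THRESHOLDS[metric]


# Illustrative scores only; these are not results from the experiments.
scores = {"ballot_statistics": 0.9, "voter_statistics": 1.0, "total_votes": 0.8}
verdicts = {name: passes(name, value) for name, value in scores.items()}
print(verdicts)
```

Keeping the thresholds in one mapping makes it easy to see why a configuration that is mostly right on ballots can pass while a single wrong voter count fails.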

**Dashboard URL**: Check your Datadog account → LLM Observability → Experiments

**Filter by:**
- Project: `vote-extraction-project` (the value of `LLMOBS_PROJECT_NAME`)
- ML App: `gemini-ss5_18` (the value of `ML_APP`)
- Dataset: `ss5_18_nuttee`

## 10. Display Results

In [None]:
def get_number_value(num_obj) -> int:
    """Extract arabic number from NumberTextPair or plain int."""
    if isinstance(num_obj, dict):
        return num_obj.get('arabic') or 0  # treat a missing or null value as 0
    elif isinstance(num_obj, int):
        return num_obj
    return 0


def get_thai_text(num_obj) -> str:
    """Extract Thai text from NumberTextPair or return empty string."""
    if isinstance(num_obj, dict):
        return num_obj.get('thai_text', '')
    return ''


def display_results(result: list[dict]):
    """Display enhanced extraction results with NumberTextPair support."""
    if not result:
        print("❌ No data extracted")
        return
    
    for idx, report in enumerate(result, 1):
        print(f"\n{'='*80}")
        print(f"REPORT #{idx}")
        print(f"{'='*80}")
        
        # Form Info
        form_info = report.get("form_info", {})
        print(f"\n📋 FORM INFORMATION")
        print(f"   Form Type: {form_info.get('form_type', 'N/A')}")
        
        # Show set_number if available
        if form_info.get('set_number'):
            print(f"   Set Number: {form_info.get('set_number')}")
        
        print(f"   Province: {form_info.get('province', 'N/A')}")
        print(f"   District: {form_info.get('district', 'N/A')}")
        
        if form_info.get('sub_district'):
            print(f"   Sub-district: {form_info.get('sub_district')}")
        
        print(f"   Station: {form_info.get('polling_station_number', 'N/A')}")
        
        # Show village_moo if available
        if form_info.get('village_moo'):
            print(f"   Village (หมู่): {form_info.get('village_moo')}")
        
        if form_info.get('date'):
            print(f"   Date: {form_info.get('date')}")
        
        # Voter Statistics
        voter_stats = report.get("voter_statistics")
        if voter_stats and (voter_stats.get("eligible_voters") or voter_stats.get("present_voters")):
            print(f"\n👥 VOTER STATISTICS")
            
            eligible = voter_stats.get("eligible_voters")
            if eligible:
                arabic = get_number_value(eligible)
                thai = get_thai_text(eligible)
                if thai:
                    print(f"   Eligible: {arabic:,} ({thai})")
                else:
                    print(f"   Eligible: {arabic:,}")
            
            present = voter_stats.get("present_voters")
            if present:
                arabic = get_number_value(present)
                thai = get_thai_text(present)
                if thai:
                    print(f"   Present: {arabic:,} ({thai})")
                else:
                    print(f"   Present: {arabic:,}")
        
        # Ballot Statistics
        ballot_stats = report.get("ballot_statistics")
        if ballot_stats:
            print(f"\n📦 BALLOT STATISTICS")
            
            # Extract values safely
            used = get_number_value(ballot_stats.get('ballots_used'))
            good = get_number_value(ballot_stats.get('good_ballots'))
            bad = get_number_value(ballot_stats.get('bad_ballots'))
            no_vote = get_number_value(ballot_stats.get('no_vote_ballots'))
            allocated = get_number_value(ballot_stats.get('ballots_allocated'))
            remaining = get_number_value(ballot_stats.get('ballots_remaining'))
            
            if allocated > 0:
                print(f"   Allocated: {allocated:,}")
            if used > 0:
                print(f"   Used: {used:,}")
            if good > 0:
                print(f"   - Good: {good:,}")
            if bad > 0:
                print(f"   - Bad: {bad:,}")
            if no_vote > 0:
                print(f"   - No Vote: {no_vote:,}")
            if remaining > 0:
                print(f"   Remaining: {remaining:,}")
            
            # Validation
            if used > 0 and (good > 0 or bad > 0 or no_vote > 0):
                expected = good + bad + no_vote
                if used == expected:
                    print(f"   ✅ Validation: PASSED ({used:,} = {expected:,})")
                else:
                    print(f"   ⚠️  Validation: FAILED ({used:,} ≠ {expected:,})")
        
        # Vote Results
        vote_results = report.get("vote_results", [])
        if vote_results:
            print(f"\n📊 VOTE RESULTS ({len(vote_results)} entries)")
            
            # Create DataFrame
            df_data = []
            for v in vote_results:
                vote_count_obj = v.get("vote_count")
                vote_arabic = get_number_value(vote_count_obj)
                vote_thai = get_thai_text(vote_count_obj)
                
                row = {
                    "#": v.get("number"),
                    "Candidate": v.get("candidate_name") or "-",
                    "Party": v.get("party_name") or "-",
                    "Votes": vote_arabic,
                }
                
                # Add Thai text column if any results have it
                if vote_thai:
                    row["Votes (Thai)"] = vote_thai[:30] + "..." if len(vote_thai) > 30 else vote_thai
                
                df_data.append(row)
            
            df = pd.DataFrame(df_data)
            display(df)
            
            # Calculate total
            total = df["Votes"].sum()
            print(f"\n   Calculated Total: {total:,}")
            
            # Show recorded total if available
            total_recorded = report.get("total_votes_recorded")
            if total_recorded:
                recorded_arabic = get_number_value(total_recorded)
                recorded_thai = get_thai_text(total_recorded)
                
                if recorded_thai:
                    print(f"   Recorded Total: {recorded_arabic:,} ({recorded_thai})")
                else:
                    print(f"   Recorded Total: {recorded_arabic:,}")
                
                # Validation
                if total == recorded_arabic:
                    print(f"   ✅ Total validation: PASSED")
                else:
                    print(f"   ⚠️  Total validation: FAILED ({total:,} ≠ {recorded_arabic:,})")
        
        # Officials
        officials = report.get("officials")
        if officials and len(officials) > 0:
            print(f"\n👔 COMMITTEE MEMBERS ({len(officials)} members)")
            for i, official in enumerate(officials[:10], 1):  # Show max 10
                name = official.get('name', 'N/A')
                position = official.get('position', 'N/A')
                print(f"   {i}. {name} - {position}")
            
            if len(officials) > 10:
                print(f"   ... and {len(officials) - 10} more")


# Display results
try:
    display_results(result)
except Exception as e:
    print(f"❌ Error displaying results: {e}")
    import traceback
    traceback.print_exc()
    
    # Show raw result for debugging
    print("\n🔍 Raw result (first 500 chars):")
    print(json.dumps(result, ensure_ascii=False, indent=2)[:500])

In [None]:
def validate_extraction(data: dict) -> tuple[bool, list[str]]:
    """
    Enhanced validation with NumberTextPair support.
    
    Args:
        data: Extracted form data
    
    Returns:
        Tuple of (is_valid, list of error messages)
    """
    errors = []
    warnings = []
    
    # 1. Ballot statistics validation
    ballot_stats = data.get("ballot_statistics")
    if ballot_stats:
        used = get_number_value(ballot_stats.get("ballots_used"))
        good = get_number_value(ballot_stats.get("good_ballots"))
        bad = get_number_value(ballot_stats.get("bad_ballots"))
        no_vote = get_number_value(ballot_stats.get("no_vote_ballots"))
        
        expected_total = good + bad + no_vote
        
        if used != expected_total:
            errors.append(
                f"Ballot mismatch: ballots_used ({used:,}) != "
                f"good+bad+no_vote ({expected_total:,})"
            )
    
    # 2. Total votes validation (NEW!)
    vote_results = data.get("vote_results", [])
    total_recorded = data.get("total_votes_recorded")
    
    if vote_results and total_recorded:
        # Sum up all vote counts
        calculated_total = sum(get_number_value(v.get("vote_count")) for v in vote_results)
        recorded_total = get_number_value(total_recorded)
        
        if calculated_total != recorded_total:
            errors.append(
                f"Vote total mismatch: sum of votes ({calculated_total:,}) != "
                f"recorded total ({recorded_total:,})"
            )
    
    # 3. Voter statistics vs ballot statistics (NEW!)
    voter_stats = data.get("voter_statistics")
    if voter_stats and ballot_stats:
        present = get_number_value(voter_stats.get("present_voters"))
        used = get_number_value(ballot_stats.get("ballots_used"))
        
        # Present voters should roughly match ballots used (allow small discrepancy)
        discrepancy = abs(present - used)
        if discrepancy > 5:
            warnings.append(
                f"Voter count ({present:,}) differs from ballots used ({used:,}) by {discrepancy}"
            )
    
    # 4. Vote count non-negative check
    for i, result in enumerate(vote_results, 1):
        vote_count = get_number_value(result.get("vote_count"))
        if vote_count < 0:
            name = result.get("candidate_name") or result.get("party_name") or f"Entry #{i}"
            errors.append(f"Negative vote count for {name}: {vote_count}")
    
    # 5. Check for empty vote results
    if not vote_results:
        errors.append("No vote results extracted")
    
    # Display results
    print(f"\n{'='*80}")
    print(f"VALIDATION RESULTS")
    print(f"{'='*80}")
    
    if errors:
        print(f"\n❌ ERRORS ({len(errors)}):")
        for error in errors:
            print(f"   - {error}")
    
    if warnings:
        print(f"\n⚠️  WARNINGS ({len(warnings)}):")
        for warning in warnings:
            print(f"   - {warning}")
    
    if not errors and not warnings:
        print(f"\n✅ All validation checks PASSED!")
    elif not errors:
        print(f"\n✅ No errors, but {len(warnings)} warning(s)")
    
    return len(errors) == 0, errors


# Run validation on extracted data
is_valid, errors = validate_extraction(result[0] if result else {})

print(f"\n{'='*80}")
print(f"Overall: {'✅ VALID' if is_valid else '❌ INVALID'}")

In [None]:
# Validate each report with enhanced Pydantic models
print("\n" + "="*80)
print("PYDANTIC MODEL VALIDATION")
print("="*80)

for idx, report_data in enumerate(result, 1):
    try:
        # Parse into Pydantic model
        form_data = ElectionFormData(**report_data)
        
        print(f"\n✅ Report #{idx} - Pydantic validation PASSED")
        print(f"   Form Type: {form_data.form_info.form_type}")
        print(f"   District: {form_data.form_info.district}")
        print(f"   Set Number: {form_data.form_info.set_number or 'N/A'}")
        print(f"   Village: {form_data.form_info.village_moo or 'N/A'}")
        
        # Show voter statistics if available
        if form_data.voter_statistics:
            if form_data.voter_statistics.eligible_voters:
                print(f"   Eligible Voters: {form_data.voter_statistics.eligible_voters.arabic:,}")
            if form_data.voter_statistics.present_voters:
                print(f"   Present Voters: {form_data.voter_statistics.present_voters.arabic:,}")
        
        # Show ballot statistics
        if form_data.ballot_statistics and form_data.ballot_statistics.ballots_used:
            print(f"   Ballots Used: {form_data.ballot_statistics.ballots_used.arabic:,}")
        
        # Show vote results count
        print(f"   Vote Results: {len(form_data.vote_results)} entries")
        
        # Show total votes if available
        if form_data.total_votes_recorded:
            print(f"   Total Votes: {form_data.total_votes_recorded.arabic:,}")
        
        # Show officials count if available
        if form_data.officials:
            print(f"   Officials: {len(form_data.officials)} members")
        
    except Exception as e:
        print(f"\n❌ Report #{idx} - Pydantic validation FAILED")
        print(f"   Error: {e}")
        
        # Show which field caused the error
        import traceback
        error_details = traceback.format_exc()
        if "Field required" in str(e):
            print(f"   Hint: Missing required field")
        elif "validation error" in str(e).lower():
            print(f"   Hint: Data type mismatch")
        
        # Show first few lines of error for debugging
        error_lines = error_details.split('\n')
        relevant_lines = [line for line in error_lines if 'Field' in line or 'validation' in line.lower()]
        if relevant_lines:
            print(f"   Details: {relevant_lines[0][:100]}")

## 11. Validate with Pydantic

In [None]:
# Validate each report
for idx, report_data in enumerate(result, 1):
    try:
        form_data = ElectionFormData(**report_data)
        print(f"✅ Report #{idx} - Pydantic validation PASSED")
        print(f"   Form Type: {form_data.form_info.form_type}")
        print(f"   District: {form_data.form_info.district}")
        print(f"   Vote Results: {len(form_data.vote_results)} entries")
        print()
    except Exception as e:
        print(f"❌ Report #{idx} - Pydantic validation FAILED")
        print(f"   Error: {e}")
        print()

## 12. LLMObs Experiments

In [None]:
dataset = LLMObs.create_dataset(
    dataset_name="ss5_18_nuttee",
    project_name=LLMOBS_PROJECT_NAME, # optional, defaults to project_name used in LLMObs.enable
)

In [None]:
dataset = LLMObs.pull_dataset(
    dataset_name="ss5_18_nuttee",
    project_name=LLMOBS_PROJECT_NAME,
)

# Get dataset length
print(len(dataset))

# Convert dataset to pandas DataFrame
df = dataset.as_dataframe()
print(df.head())

## Summary

This notebook demonstrates an **efficient workflow** for vote extraction that avoids local file handling:

✅ **BigQuery Integration** - Query metadata to find files  
✅ **Google Drive Direct Access** - No local downloads needed!  
✅ **External URLs Method** - Gemini fetches files directly  
✅ **Structured Output** - JSON constrained to the response schema  
✅ **Pydantic Validation** - Type-safe data models  
✅ **Datadog LLMObs** - Automatic LLM trace collection  

## Key Advantages Over Local PDF Processing:

1. **No Local Storage** - Files stay in Google Drive
2. **No PDF Conversion** - Gemini handles PDF directly
3. **Faster** - No download/upload overhead
4. **Scalable** - Easy to process thousands of files
5. **Cost Effective** - No egress charges for data transfer
6. **Full Observability** - ddtrace captures all LLM interactions

## Next Steps:

1. **Process by Province** - Filter BigQuery by province
2. **Save to BigQuery** - Store results back in BigQuery
3. **Error Handling** - Add retry logic for failed extractions
4. **Monitoring** - Track processing status via Datadog LLMObs
5. **Automation** - Schedule regular processing with Cloud Functions
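
The retry logic mentioned under Error Handling could start from something like the following. It is a minimal sketch under assumptions, not part of the notebook: `with_retries` is a hypothetical helper, it retries on any exception, and it uses simple exponential backoff. In practice you would likely narrow the exception types to transient API errors.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying with exponential backoff; re-raise after the last attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


# Demo with a stand-in function that fails twice, then succeeds.
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


delays: list[float] = []
outcome = with_retries(flaky, sleep=delays.append)  # record delays instead of sleeping
print(outcome, delays)
```

In the notebook, the wrapped call would be something like `with_retries(lambda: extract_from_drive_url(test_file))`. The `sleep` parameter is injectable so the backoff schedule can be tested without waiting.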