# Extract and classify data from text

In [None]:
# Colab stuff

import sys, pathlib

if ('google.colab' in sys.modules) and (not pathlib.Path('repo').exists()):
    !git clone https://github.com/marcelpauly/scicar-agents.git
    %cd scicar-agents
    %pip install -q -r requirements.txt

In [None]:
import os
from typing import Literal

import pandas as pd
from pydantic import BaseModel, Field

from pydantic_ai import Agent, NativeOutput, PromptedOutput, WebSearchTool
from pydantic_ai.models.openai import OpenAIResponsesModel
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.models.google import GoogleModel

# Allow async code to run inside Jupyter Notebook's existing event loop
import nest_asyncio
nest_asyncio.apply()

# Progress bar for pandas
from tqdm import tqdm
tqdm.pandas()

In [None]:
# API credentials, one environment variable per provider.
# The values below are placeholders; put your own keys here.
#
# OpenAI
#   Generate API key: https://platform.openai.com/settings/organization/api-keys
#   Models: https://platform.openai.com/docs/models
#   Pricing: https://platform.openai.com/docs/pricing
# Anthropic
#   Generate API key: https://console.anthropic.com/settings/keys
#   Models: https://docs.anthropic.com/en/docs/about-claude/models/overview
#   Pricing: https://docs.anthropic.com/en/docs/about-claude/pricing
# Google
#   Generate API key: https://aistudio.google.com/apikey
#   Models: https://ai.google.dev/gemini-api/docs/models
#   Pricing: https://ai.google.dev/gemini-api/docs/pricing
os.environ.update({
    'OPENAI_API_KEY': 'YOUR_OPENAI_API_KEY',
    'ANTHROPIC_API_KEY': 'YOUR_ANTHROPIC_API_KEY',
    'GOOGLE_API_KEY': 'YOUR_GOOGLE_API_KEY',
})

# Registry of the available models, keyed by a short provider alias.
# The keys are set above first, so they are in the environment before
# any model object is constructed.
models = {
    'gpt': OpenAIResponsesModel('gpt-5-nano-2025-08-07'),
    'claude': AnthropicModel('claude-3-5-haiku-20241022'),
    'gemini': GoogleModel('gemini-2.5-flash-lite'),
}

In [None]:
# Choose a model
# The key must be one of those registered in `models`: 'gpt', 'claude' or 'gemini'.
# The selected model is used below to pick the output mode and to build the agent.
model = models['gpt']

In [None]:
# Load the MPs disclosure data
# The path is relative to the current working directory. Later cells read the
# columns 'info' (free-text disclosure), 'mdb_id' and 'type' from this frame.
df = pd.read_csv('data/mdb_activities.csv')
# Preview the first rows.
df.head()

In [None]:
# Prompts

# System prompt passed to the agent: fixes the model's role, asks for
# structured output only, and tells it to use the web search tool.
system_prompt = 'You are an expert on entity recognition, data extraction and classification. Return only the structured output requested. You use the tools provided to you: Use the web search (`WebSearchTool`) to find information on the internet.'

def get_prompt(activity: str) -> str:
    """Build the user prompt for extracting one activity from a disclosure.

    The returned text contains the extraction rules for every field of the
    `Activities` schema, followed by the disclosure itself inside a fenced
    block. Field names in the instructions are spelled exactly like the
    schema fields so the model can map rules to fields.

    Args:
        activity: German free-text disclosure to analyse. It is inserted
            verbatim at the end of the prompt.

    Returns:
        The complete prompt string.
    """
    return f'''Task: Convert the German free-text disclosure below into exactly ONE `Activities` object (as defined by the schema you were given).

You are extracting ONE activity only. If the input mentions multiple activities, pick the most prominent/first one with the clearest payment info.

Use the web search tool (`WebSearchTool`) when the organization/activity is unclear, to determine its sector and whether it is “self” or “organisation”. Prefer authoritative sources (official org sites, registries, reputable press). If still unclear after a brief search, use `unknown`.

Output rules — STRICT:
- Return only the structured data for a single `Activities` object. No prose, no code fences.
- Do not hallucinate amounts, dates, names, or sectors. Leave fields as specified below when uncertain.
- `is_paid` is `true` only if you can extract at least one numeric payment (→ at least one element in `amounts`). Otherwise `false` and `amounts` must be an empty list.

How to populate fields:

1) activity_type:
    - `"self"` for independent/self-employed work (e.g., Landwirt, Rechtsanwältin, Vorträge, Autor).
    - `"organisation"` when there is a role/connection to a company, association, federation, foundation, NGO, think tank, or other organization.

2) name:
    - For `"organisation"`: the organization’s proper name (as written in the disclosure; normalize whitespace; keep legal suffixes like GmbH/AG/e.V.).
    - For `"self"`: the activity label (e.g., "Rechtsanwalt", "Autor", "Vorträge").

3) location:
    Location of the organization or activity (e.g., city name), if explicitly mentioned. Otherwise `null`.

4) sector (choose ONE):
    `agriculture`, `forestry_fishing`, `mining_quarrying`, `energy_utilities`, `renewable_energy`, `manufacturing`, `chemicals`,
    `pharmaceuticals_biotech`, `automotive_mobility`, `aerospace_defense`, `construction_engineering`, `real_estate`,
    `finance_banking`, `insurance`, `asset_management`, `accounting_audit`, `legal_services`, `management_consulting`,
    `it_software`, `technology_hardware`, `telecommunications`, `media_publishing`, `advertising_marketing`,
    `retail_ecommerce`, `wholesale_distribution`, `transport_logistics`, `hospitality_tourism`, `food_beverage`,
    `healthcare_medical`, `education_research`, `sports_culture_arts`, `environment_waste`, `nonprofit_foundation`,
    `trade_association_chamber`, `public_sector_municipal`, `security_private`, `public_affairs_lobbying`, `other`, `unknown`.
    - If unsure after searching, use `unknown`. If clearly outside the list, use `other`.

5) is_paid:
    - `true` iff at least one numeric payment can be extracted.
    - Otherwise `false`.
    - If the text is exclusively honorary/ehrenamtlich and no numeric amount appears, `is_paid=false`.

6) amounts (list of `Payments`):
    - Create one element per distinct numeric payment found.
    - amount (float, EUR):
        • Accept German formats: convert "1.234,56 €" → 1234.56 (dot as decimal separator, no thousands separators).
        • Recognize units: "Tsd.", "T€", "tausend" = ×1,000; "Mio."/"Million(en)" = ×1,000,000.
    - frequency (enum):
        • `"monthly"` for "monatlich", "pro Monat", "mtl."
        • `"quarterly"` for "vierteljährlich", "quartalsweise"
        • `"annual"` for "jährlich", "pro Jahr", "p.a."
        • `"one-time"` for one-off items like "Honorar pro Vortrag", "einmalig", "Auftrittshonorar (single event)"
        • `"other"` for clear periodicity not covered above (e.g., per meeting/session)
        • `"unknown"` if frequency is not stated or ambiguous
    - date / start_date / end_date (strings or null):
        • Use ISO-like strings. Prefer `YYYY-MM` when month is known; else `YYYY`.
        • For one-time payments, set `date` (e.g., "2024-11" or "2024").
        • For durations, set `start_date` and `end_date` when present.
        • Handle German cues: "seit 2021" → start_date="2021"; open-ended → end_date=null. "2019–2022" → start_date="2019", end_date="2022".
        • Do not invent months/days if missing.

Input disclosure (German):
```
{activity}
```'''

In [None]:
# Data structure

class Payments(BaseModel):
    # Schema for a single extracted payment belonging to one activity.
    # Every field is declared without a default; the three date fields
    # additionally accept None.
    # NOTE(review): documented with `#` comments rather than a class
    # docstring, since a docstring may be copied into the schema that is
    # sent to the model — confirm before adding one.

    # Payment value as a plain float (EUR).
    amount: float = Field(
        description=(
            "Amount of money received (EUR). Parse German formats (e.g., '1.234,56 €' → 1234.56). "
            "Use a dot as decimal separator; do not use thousands separators."
        )
    )
    # Periodicity of the payment, restricted to a fixed set of labels.
    frequency: Literal['monthly', 'quarterly', 'annual', 'one-time', 'other', 'unknown'] = Field(
        description=(
            "Payment periodicity. Map German cues: monatlich→monthly; vierteljährlich/quartalsweise→quarterly; "
            "jährlich/p.a.→annual; einmalig/unregelmäßig/pro Vortrag→one-time; other for other regular patterns; "
            "unknown if ambiguous or unstated."
        )
    )
    # Date of a one-time payment, kept as a string (no date parsing here).
    date: str | None = Field(
        description=(
            "For one-time payments, the payment date as 'YYYY' or 'YYYY-MM' if the month is known or 'YYYY-MM-DD' for exact dates."
        )
    )
    # Beginning of a payment period, kept as a string.
    start_date: str | None = Field(
        description=(
            "Start of a period as 'YYYY' or 'YYYY-MM' or 'YYYY-MM-DD'. Handle 'seit <YYYY>' by setting start_date and leaving end_date null."
        )
    )
    # End of a payment period, kept as a string; None for open-ended periods.
    end_date: str | None = Field(
        description=(
            "End of a period as 'YYYY' or 'YYYY-MM' or 'YYYY-MM-DD'. Use null for open-ended ('seit')."
        )
    )

class Activities(BaseModel):
    # Schema for one activity extracted from a disclosure text, including
    # the list of payments found for it. This is the structured output type
    # requested from the agent.
    # NOTE(review): documented with `#` comments rather than a class
    # docstring, since a docstring may be copied into the schema that is
    # sent to the model — confirm before adding one.

    # Whether the activity is self-employed work or tied to an organisation.
    activity_type: Literal['self', 'organisation'] = Field(
        description=(
            "Self-employed/independent work ('self') vs. roles connected to an organization ('organisation'). "
            "Examples self: Landwirt, Rechtsanwältin, Vorträge, Autor. "
            "Examples organisation: positions in companies, associations, foundations, NGOs, think tanks, chambers."
        )
    )
    # Organisation name or a short label for the self-employed activity.
    name: str = Field(
        description=(
            "Organization name (for 'organisation', keep legal suffixes like GmbH/AG/e.V.) or a concise label of the self-employed activity "
            "(e.g., 'Rechtsanwalt', 'Autor', 'Vorträge'). Drop location information (unless it is part of the name)."
        )
    )
    # Optional location; None is accepted.
    location: str | None = Field(
        description=(
            "Location of the organization or activity (e.g., city name)."
        )
    )
    # Exactly one sector label out of the closed list below.
    sector: Literal[
        'agriculture','forestry_fishing','mining_quarrying','energy_utilities','renewable_energy',
        'manufacturing','chemicals','pharmaceuticals_biotech','automotive_mobility','aerospace_defense',
        'construction_engineering','real_estate','finance_banking','insurance','asset_management',
        'accounting_audit','legal_services','management_consulting','it_software','technology_hardware',
        'telecommunications','media_publishing','advertising_marketing','retail_ecommerce',
        'wholesale_distribution','transport_logistics','hospitality_tourism','food_beverage',
        'healthcare_medical','education_research','sports_culture_arts','environment_waste',
        'nonprofit_foundation','trade_association_chamber','public_sector_municipal','security_private',
        'public_affairs_lobbying','other','unknown'
    ] = Field(
        description=(
            "Economic sector for the activity. Choose the single best fit from the allowed values; "
            "use 'other' for out-of-scope sectors and 'unknown' if unclear after searching."
        )
    )
    # Flag that is meant to mirror whether `amounts` is non-empty; the schema
    # itself does not enforce that relationship.
    is_paid: bool = Field(
        description=(
            "True iff at least one numeric payment was extracted into 'amounts'; otherwise false."
        )
    )
    # Zero or more payments for this activity.
    amounts: list[Payments] = Field(
        description=(
            "List of extracted payments for this activity. Empty list if no numeric payments are present."
        )
    )

In [None]:
def get_output_type(model, output_schema):
    """
    Wrap `output_schema` in the structured-output mode matching `model`.

    An `OpenAIResponsesModel` gets `NativeOutput`; any other model gets
    `PromptedOutput`. The intent is that each model can both use tools and
    return structured output.
    """
    wrapper = NativeOutput if isinstance(model, OpenAIResponsesModel) else PromptedOutput
    return wrapper(output_schema)

# Output mode for the `Activities` schema, chosen for the selected model.
output_type = get_output_type(model, Activities)

# Agent used for every extraction: selected model, shared system prompt,
# structured `Activities` output, built-in web search, temperature 0, and
# two retries each for `retries` and `output_retries`.
agent = Agent(
    model=model,
    system_prompt=system_prompt,
    output_type=output_type,
    builtin_tools=[WebSearchTool()],
    model_settings={'temperature': 0},
    retries=2,
    output_retries=2,
)

def run_model(activity):
    """Run the agent on one disclosure text and return its output as a dict.

    The prompt is built with `get_prompt`, the agent is executed
    synchronously, and the structured output is converted with `model_dump()`.
    """
    run = agent.run_sync(get_prompt(activity))
    extracted = run.output
    return extracted.model_dump()

In [None]:
# Preview the first rows of the disclosure data again before testing.
df.head()

In [None]:
# Test
# Try the extraction on a single disclosure (third row, column 'info').
activity = df.iloc[2]['info']
print(activity)
print()

# NOTE(review): `run_model` builds its own prompt from `activity`, so this
# `prompt` variable is not passed to it; it is only available for inspection.
prompt = get_prompt(activity)
run_model(activity)

In [None]:
# Flat result records collected across all rows: one record per extracted
# payment, or a single record with empty payment fields for an activity
# without usable payments.
parsed_activities = []

def apply_model_to_all_activities(row):
    """Extract one activity from `row` and append flat records to `parsed_activities`.

    Args:
        row: Mapping-like row providing 'info' (disclosure text), 'mdb_id'
            and 'type'.

    Returns:
        None. Results are appended to the module-level `parsed_activities`.

    Behaviour:
        - If the extraction raises, the error is printed together with the
          row's 'mdb_id' and the row is skipped.
        - If the result is flagged as paid and has payments, one record per
          payment is appended.
        - Otherwise exactly one record with all payment fields set to None
          is appended. This includes the inconsistent case where the model
          reports `is_paid=True` with an empty `amounts` list, which used to
          produce no record at all; the reported flag is kept in that record.
    """
    try:
        result = run_model(row['info'])
    except Exception as e:
        # Best effort: report the failure and continue with the next row.
        print(f"Error processing row {row['mdb_id']}: {e}")
        return

    # Fields shared by every record produced for this activity.
    activity_fields = {
        'mdb_id': row['mdb_id'],
        'activity_category': row['type'],
        'activity_type': result['activity_type'],
        'activity_name': result['name'],
        'activity_location': result['location'],
        'activity_sector': result['sector'],
        'is_paid': bool(result['is_paid']),
    }

    # Payments are only used when the activity is flagged as paid.
    payments = result['amounts'] if result['is_paid'] else []
    if not payments:
        # Placeholder so that the activity still yields exactly one record.
        payments = [dict.fromkeys(('amount', 'frequency', 'date', 'start_date', 'end_date'))]

    for payment in payments:
        parsed_activities.append({
            **activity_fields,
            'payment_amount': payment['amount'],
            'payment_frequency': payment['frequency'],
            'payment_date': payment['date'],
            'payment_start_date': payment['start_date'],
            'payment_end_date': payment['end_date'],
        })

# Run the extraction for every row with a progress bar. The call is used only
# for its side effect of filling `parsed_activities`; its return value is
# discarded.
df.progress_apply(apply_model_to_all_activities, axis=1)

# Turn the collected records into a DataFrame and display it.
df_parsed = pd.DataFrame(parsed_activities)
df_parsed