# Tool 1 - Entity to Candidate Mapper (Parallel Multi-Agent Pattern)

**Status:** ✅ Ready for Databricks | **LLM Cost:** ~$0.003 per run | **Performance:** ~10s

**Pattern:** 2 Pydantic AI agents running in parallel via `asyncio.gather`

**Showcase:** Parallel multi-agent execution: both agents run simultaneously, and their results are merged with a consistency validation.

**Key Features:**
- 2 specialized agents: `ranking_agent` (top 10 candidates) + `mapping_agent` (1:1 entity mappings)
- Parallel execution with `asyncio.gather` (both agents run simultaneously)
- Consistency check validates overlap between ranked and mapped candidates (target: >70%)
- Expected performance: ~10s for parallel execution
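The consistency check reduces to a set intersection between the ranked IDs and the mapped IDs. The sketch below assumes the >70% target from the list above as its default threshold. The notebook function further down only warns below 50%, so the threshold is a parameter here. The helper name `consistency_report` and the `"OK"`/`"REVIEW"` status labels are hypothetical.

```python
def consistency_report(
    top_ids: set[str],
    mapped_ids: set[str],
    threshold: float = 0.7,  # assumption: mirrors the ">70%" target above
) -> dict:
    """Compare mapped candidate IDs against the top-N ranked IDs (hypothetical helper)."""
    overlap = top_ids & mapped_ids
    # Guard against division by zero when the mapping agent returned nothing
    ratio = len(overlap) / len(mapped_ids) if mapped_ids else 0.0
    status = "OK" if ratio >= threshold else "REVIEW"  # hypothetical status labels
    return {
        "overlap_count": len(overlap),
        "total_mappings": len(mapped_ids),
        "consistency_ratio": ratio,
        "status": status,
    }


# 2 of the 3 mapped IDs appear in the ranking: ratio 2/3, below 0.7
report = consistency_report({"a", "b", "c"}, {"a", "b", "x"})
print(report)
```

This counts unique candidate IDs. If two entities map to the same candidate, that candidate is counted once.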

**TODO:**
- [ ] Add retry logic for LLM failures (timeout, rate limit)
- [ ] Cache ranking results for repeated entity sets
- [ ] Test with 20+ entities (current test: 5-10)
- [ ] Add metadata quality score (completeness, freshness)
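The retry TODO could be handled by wrapping each agent call in exponential backoff. This is a minimal sketch, not the notebook's implementation. `run_with_retry` is a hypothetical helper, and the default `retry_on` exception types are placeholders. The exceptions that your Pydantic AI / OpenAI client actually raises for timeouts and rate limits are an assumption you need to check.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retry(
    make_call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    # Placeholder exception types: replace with what your LLM client really raises
    retry_on: tuple[type[BaseException], ...] = (TimeoutError, ConnectionError),
) -> T:
    """Await make_call(), retrying with exponential backoff plus jitter (hypothetical helper)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            # make_call() creates a fresh coroutine each time; a coroutine cannot be awaited twice
            return await make_call()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: re-raise the last error
            delay = base_delay * 2 ** (attempt - 1)  # 1x, 2x, 4x, ...
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

Inside `map_entities_to_candidates`, each call would be wrapped in a factory, for example `run_with_retry(lambda: ranking_agent.run(user_prompt))`, so that every retry creates a new request.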

**IDEA:**
- Consider hybrid: embedding similarity for initial filtering + LLM for final ranking
- Add configurable top-N parameter (currently hardcoded to 10)
- Export consistency report as separate artifact

In [None]:
# Install dependencies (version specifiers are quoted so ">=" is not treated as a shell redirect)
%pip install "pydantic-ai>=0.0.49" "pydantic>=2.8.0"

In [None]:
# Restart Python kernel to use new packages
dbutils.library.restartPython()  # type: ignore

In [None]:
import asyncio
import json
import os
from pydantic import BaseModel, Field
from pydantic_ai import Agent  # type: ignore

In [None]:
# Configure Azure OpenAI from Databricks secrets
AZURE_ENDPOINT = dbutils.secrets.get(scope="mcop", key="azure-openai-endpoint").strip()  # type: ignore
AZURE_API_KEY = dbutils.secrets.get(scope="mcop", key="azure-openai-api-key").strip()  # type: ignore
DEPLOYMENT_NAME = dbutils.secrets.get(scope="mcop", key="azure-openai-deployment-name").strip()  # type: ignore

# Normalize the endpoint: strip any /openai/v1 suffix and trailing slash (the deployment path is appended below)
azure_endpoint_clean = AZURE_ENDPOINT.replace("/openai/v1/", "").replace("/openai/v1", "").rstrip("/")

# Set environment variables for Pydantic AI (Azure OpenAI compatible)
os.environ["OPENAI_BASE_URL"] = f"{azure_endpoint_clean}/openai/deployments/{DEPLOYMENT_NAME}"
os.environ["OPENAI_API_KEY"] = AZURE_API_KEY

MODEL_NAME = f"openai:{DEPLOYMENT_NAME}"
print(f"✅ Configured model: {MODEL_NAME}")
print(f"   Base URL: {os.environ['OPENAI_BASE_URL']}")
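The endpoint handling above is plain string manipulation. Moving it into a function lets you test it without Databricks secrets. This sketch reproduces only the notebook's string logic, and `build_base_url` is a hypothetical name. Whether Azure accepts the resulting URL may also depend on the `api-version` query parameter and the auth header, which the cell above does not set explicitly.

```python
def build_base_url(azure_endpoint: str, deployment_name: str) -> str:
    """Strip an /openai/v1 suffix and trailing slash, then append the deployment path."""
    cleaned = (
        azure_endpoint.strip()
        .replace("/openai/v1/", "")
        .replace("/openai/v1", "")
        .rstrip("/")
    )
    return f"{cleaned}/openai/deployments/{deployment_name}"


print(build_base_url("https://example.openai.azure.com/openai/v1/", "my-deployment"))
```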


In [None]:
# ==========================================
# PYDANTIC SCHEMAS - 4 models for 2 agents
# ==========================================
# Tool 1 uses 2 specialized LLM agents that run IN PARALLEL:
# - ranking_agent → returns CandidateRanking (top 10 candidates)
# - mapping_agent → returns MappingSuggestions (1:1 entity→candidate mappings)
#
# Each agent needs its own output schema.

class CandidateRank(BaseModel):
    """A single candidate in the ranking (one item of the top 10).

    Fields:
    - entity_id: ID from the metadata catalog (e.g. a Collibra UUID)
    - rank: Position in the ranking (1 = best, 10 = last place in the top 10)
    - relevance_score: Relevance score 0.0-1.0 (higher = more relevant)
    - justification: Text explaining why the candidate is relevant

    Example:
    {
      "entity_id": "c9c9fdf4-27ed-479d-b03b-f8ccec7a9f91",
      "rank": 1,
      "relevance_score": 0.95,
      "justification": "Perfect match: dimv_supplier table in dm_ba_purchase schema"
    }
    """
    entity_id: str = Field(description="Entity ID")
    rank: int = Field(description="Rank position (1=best)")
    relevance_score: float = Field(description="Relevance 0-1")
    justification: str = Field(description="Why this candidate is relevant")

class CandidateRanking(BaseModel):
    """Output of ranking_agent: the top 10 candidates sorted by relevance.

    This is the top-level output model for ranking_agent (result_type=CandidateRanking).
    The LLM MUST return an object whose 'top_candidates' field holds an array of 10 items.
    Note: the schema does not enforce the 10-item limit; only the system prompt asks for it.

    How it works:
    1. ranking_agent receives the business entities + the list of ALL candidates from the metadata
    2. The LLM scores the candidates by semantic similarity, quality, and metadata
    3. It returns the top 10 with rank, score, and justification
    """
    top_candidates: list[CandidateRank] = Field(description="Top 10 candidates")

class EntityMapping(BaseModel):
    """One mapping: business entity → best-matching candidate.

    Fields:
    - entity_name: Name of the business entity from Tool 0 (e.g. "Supplier Master")
    - matched_candidate_id: ID of the candidate from the metadata (UUID)
    - confidence: Mapping confidence 0.0-1.0 (>0.7 = high confidence)
    - reasoning: Text explaining why this candidate was chosen

    Example:
    {
      "entity_name": "Supplier Master",
      "matched_candidate_id": "c9c9fdf4-27ed-479d-b03b-f8ccec7a9f91",
      "confidence": 0.92,
      "reasoning": "Name match + description alignment + quality metadata present"
    }
    """
    entity_name: str = Field(description="Business entity name")
    matched_candidate_id: str = Field(description="Mapped candidate ID")
    confidence: float = Field(description="Mapping confidence 0-1")
    reasoning: str = Field(description="Mapping rationale")

class MappingSuggestions(BaseModel):
    """Output of mapping_agent: a 1:1 mapping for every entity.

    This is the top-level output model for mapping_agent (result_type=MappingSuggestions).
    The LLM MUST return an object with a 'mappings' field: an array of EntityMapping objects.

    How it works:
    1. mapping_agent receives the same input as ranking_agent
    2. For EACH business entity, the LLM picks ONE best candidate
    3. It returns an array of N mappings (N = number of business entities)

    Difference from ranking:
    - Ranking: "What are the top 10 candidates overall?"
    - Mapping: "For entity X, which candidate is best, and why?"
    """
    mappings: list[EntityMapping] = Field(description="Entity mappings")
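The models above declare field types only. The 0-1 score range and the 10-item cap are requested in the prompts but are not validated. The stdlib-only sketch below shows the wrapper shape the ranking agent must return (`{"top_candidates": [...]}`) and the extra checks one might apply. `RankedCandidate` and `parse_ranking` are hypothetical names, chosen so they do not shadow the notebook's Pydantic classes. In Pydantic v2 itself, the same constraints can be declared directly with `Field(ge=0, le=1)` on the score and `Field(max_length=10)` on the list.

```python
from dataclasses import dataclass


@dataclass
class RankedCandidate:
    entity_id: str
    rank: int
    relevance_score: float
    justification: str


def parse_ranking(payload: dict, top_n: int = 10) -> list[RankedCandidate]:
    """Parse a {'top_candidates': [...]} payload and enforce basic invariants (hypothetical)."""
    items = payload["top_candidates"]  # KeyError if the wrapper field is missing
    if len(items) > top_n:
        raise ValueError(f"expected at most {top_n} candidates, got {len(items)}")
    candidates = [RankedCandidate(**item) for item in items]  # TypeError on unknown keys
    for c in candidates:
        if not 0.0 <= c.relevance_score <= 1.0:
            raise ValueError(f"relevance_score out of range: {c.relevance_score}")
    return sorted(candidates, key=lambda c: c.rank)  # best rank first
```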


In [None]:
# ==========================================
# PYDANTIC AI AGENTS - 2 specialized LLM agents
# ==========================================
# Pydantic AI pattern:
# Agent(model_name, result_type=Schema, system_prompt="...")
#
# What it does:
# - Creates an LLM agent with a predefined output schema
# - result_type says: "the LLM MUST return data matching this Pydantic model"
#   (newer pydantic-ai releases use output_type and result.output instead of result_type and result.data)
# - Pydantic AI automatically:
#   1. Sends the schema to the LLM (as JSON Schema)
#   2. Calls the LLM with function calling / JSON mode
#   3. Validates the response with Pydantic
#   4. Returns a type-safe Pydantic object (not a raw JSON string)

# ==========================================
# AGENT 1: ranking_agent
# ==========================================
# Purpose: score all candidates and return the top 10
# Input: business_entities + technical_metadata
# Output: CandidateRanking (top 10 with rank, score, justification)
#
# The system prompt explains:
# - What the agent should do (rank candidates)
# - Which criteria to use (semantic similarity, data quality, metadata presence)
# - How to format the output (structured ranking)
ranking_agent = Agent(
    MODEL_NAME,                      # e.g. "openai:test-gpt-5-mini" (Pydantic AI format)
    result_type=CandidateRanking,    # the LLM MUST return a CandidateRanking object
    system_prompt="""You are a data catalog expert.

Rank candidates by relevance to business entities:
- Consider semantic similarity (name, description)
- Evaluate data quality signals (completeness, freshness)
- Prioritize candidates with metadata (owner, lineage, tags)
- Top 10 most relevant candidates

Return structured ranking with scores and justifications."""
)

# ==========================================
# AGENT 2: mapping_agent
# ==========================================
# Purpose: find the single best candidate for each entity
# Input: business_entities + technical_metadata (SAME as ranking_agent)
# Output: MappingSuggestions (N mappings, 1 per entity)
#
# Difference from ranking_agent:
# - Ranking: "Top 10 candidates overall" (aggregate view)
# - Mapping: "Best match PER entity" (detailed 1:1 mapping)
#
# The system prompt specifies:
# - 1:1 mapping (one entity → one candidate)
# - High confidence preferred (>0.7)
# - Clear reasoning required
mapping_agent = Agent(
    MODEL_NAME,
    result_type=MappingSuggestions,  # the LLM MUST return a MappingSuggestions object
    system_prompt="""You are a metadata mapping specialist.

Map each business entity to the BEST matching candidate:
- 1:1 mapping (one entity → one candidate)
- High confidence mappings (>0.7) preferred
- Consider semantic alignment and context
- Provide clear reasoning for each mapping

Return structured mappings with confidence scores."""
)

print("✅ Ranking and mapping agents created")
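The IDEA list at the top mentions a configurable top-N, but the ranking prompt above hardcodes 10. One possible approach, sketched here under that assumption, is to build the system prompt from a parameter. `build_ranking_prompt` is a hypothetical helper, not part of Pydantic AI.

```python
def build_ranking_prompt(top_n: int = 10) -> str:
    """Return the ranking system prompt with a configurable top-N (hypothetical helper)."""
    if top_n < 1:
        raise ValueError("top_n must be >= 1")
    return f"""You are a data catalog expert.

Rank candidates by relevance to business entities:
- Consider semantic similarity (name, description)
- Evaluate data quality signals (completeness, freshness)
- Prioritize candidates with metadata (owner, lineage, tags)
- Return at most the top {top_n} most relevant candidates

Return structured ranking with scores and justifications."""


print(build_ranking_prompt(5))
```

The result would be passed as `system_prompt=build_ranking_prompt(5)` when creating `ranking_agent`. The "top 10" wording in the `top_candidates` field description and in the display cells would need to follow the same parameter.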


In [None]:
# ==========================================
# PARALLEL EXECUTION OF 2 AGENTS - asyncio.gather()
# ==========================================
# Why the async pattern:
# - Both agents receive the SAME input (entities + metadata)
# - They have different tasks (ranking vs mapping)
# - They can run IN PARALLEL → saves time
# - asyncio.gather() waits for both results concurrently
#
# Classic sequential (illustrative timings):
# result1 = ranking_agent.run_sync(user_prompt)   # ⏱️ 5s
# result2 = mapping_agent.run_sync(user_prompt)   # ⏱️ 5s
# Total: 10s
#
# Parallel with asyncio.gather():
# results = await asyncio.gather(ranking_agent.run(user_prompt), mapping_agent.run(user_prompt))
# Total: ~5s (both run concurrently)

async def map_entities_to_candidates(
    business_entities: list[dict],     # From Tool 0 (BusinessRequest.entities)
    technical_metadata: list[dict]     # From the catalog (Collibra/Unity Catalog)
) -> dict:
    """
    Map business entities to technical candidates using 2 LLM agents.

    Args:
        business_entities: List of entities from the business request
            Format: [{"name": "Customer", "description": "...", "attributes": [...]}, ...]

        technical_metadata: List of technical candidates from the catalog
            Format: [{"name": "dim_customer", "schema": "...", "description": "...", ...}, ...]

    Returns:
        dict with keys:
        - "ranking": Top 10 candidates (CandidateRanking dumped to a dict, list under "top_candidates")
        - "mapping": 1:1 entity→candidate mappings (MappingSuggestions dumped to a dict, list under "mappings")
        - "consistency": Overlap analysis (how many mapped candidates are in the top 10)

    Pydantic AI workflow:
        1. Build the input prompt (entities + metadata JSON)
        2. Run BOTH agents in parallel (asyncio.gather)
        3. Await the results (result.data = Pydantic object)
        4. Validate consistency (the mapping should prefer top-10 candidates)
        5. Return the combined results
    """

    # ==========================================
    # STEP 1: Prepare the input data for the LLM
    # ==========================================
    # Pydantic AI supplies the output schema automatically,
    # but we must provide the business context (entities + metadata)
    user_prompt = f"""
Business Entities (from requirement):
{json.dumps(business_entities, indent=2, ensure_ascii=False)}

Technical Metadata Candidates (from catalog):
{json.dumps(technical_metadata, indent=2, ensure_ascii=False)}
"""

    # ==========================================
    # STEP 2: Run BOTH agents IN PARALLEL
    # ==========================================
    # asyncio.gather() pattern:
    # - Takes N awaitables (coroutines)
    # - Runs them concurrently on the event loop (non-blocking)
    # - Returns a list of results in the same order as the inputs
    #
    # agent.run_sync() vs agent.run():
    # - run_sync() = synchronous version (for non-async code)
    # - run() = async version (required for asyncio.gather)
    #
    # Result object:
    # - result.data = validated Pydantic object (CandidateRanking / MappingSuggestions)
    # - result.usage() = token usage for the run (useful for cost tracking)

    ranking_result, mapping_result = await asyncio.gather(
        ranking_agent.run(user_prompt),      # Async coroutine 1
        mapping_agent.run(user_prompt)       # Async coroutine 2
    )

    # ==========================================
    # STEP 3: Extract the Pydantic objects from the results
    # ==========================================
    # result.data is already a VALIDATED Pydantic object
    # (not a raw JSON string; Pydantic AI has already parsed it)

    ranking_data: CandidateRanking = ranking_result.data
    # Type: CandidateRanking(top_candidates=[CandidateRank(...), ...])
    # Access: ranking_data.top_candidates[0].entity_id, ranking_data.top_candidates[0].rank, ...

    mapping_data: MappingSuggestions = mapping_result.data
    # Type: MappingSuggestions(mappings=[EntityMapping(...), ...])
    # Access: mapping_data.mappings[0].entity_name, mapping_data.mappings[0].matched_candidate_id, ...

    # ==========================================
    # STEP 4: Validate consistency (optional check)
    # ==========================================
    # A good mapping should prefer candidates from the top-10 ranking.
    # If the mapping returns entirely different candidates, this may signal:
    # - Conflicting criteria between the agents
    # - A shortage of good-quality candidates
    # - A need for human review

    # The models expose their lists as top_candidates / mappings (they are not RootModels, so there is no .root)
    top_10_ids = {candidate.entity_id for candidate in ranking_data.top_candidates}
    mapped_ids = {mapping.matched_candidate_id for mapping in mapping_data.mappings}
    overlap = top_10_ids.intersection(mapped_ids)

    consistency_note = (
        f"Mapping consistency: {len(overlap)}/{len(mapped_ids)} mapped candidates "
        f"are in top 10 ranking."
    )

    if len(overlap) < len(mapped_ids) * 0.5:  # Less than 50% overlap
        consistency_note += " ⚠️ Low overlap - consider manual review."

    # ==========================================
    # STEP 5: Return the combined results
    # ==========================================
    return {
        "ranking": ranking_data.model_dump(),      # Convert Pydantic → dict
        "mapping": mapping_data.model_dump(),      # Convert Pydantic → dict
        "consistency": {
            "overlap_count": len(overlap),
            "total_mappings": len(mapped_ids),
            "overlap_percentage": len(overlap) / len(mapped_ids) if mapped_ids else 0,
            "note": consistency_note
        }
    }

print("✅ Entity mapping function defined (async with parallel agents)")
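To see why `asyncio.gather` roughly halves wall-clock time for two equally slow calls, the sketch below replaces the agents with `asyncio.sleep` stand-ins. `fake_agent`, `sequential`, and `parallel` are hypothetical names used only for this illustration. The gain comes from overlapping I/O waits; CPU-bound work would not speed up this way. The example is written as a script. In a notebook cell, where an event loop is already running, `asyncio.run()` raises `RuntimeError`, so use `await sequential(0.2)` and `await parallel(0.2)` there instead.

```python
import asyncio
import time


async def fake_agent(name: str, delay: float) -> str:
    """Stand-in for an LLM call: waits `delay` seconds, then returns its name."""
    await asyncio.sleep(delay)
    return name


async def sequential(delay: float) -> float:
    start = time.perf_counter()
    await fake_agent("ranking", delay)  # second call starts only after the first finishes
    await fake_agent("mapping", delay)
    return time.perf_counter() - start


async def parallel(delay: float) -> tuple[list[str], float]:
    start = time.perf_counter()
    # Both coroutines wait concurrently; results come back in argument order
    results = await asyncio.gather(fake_agent("ranking", delay), fake_agent("mapping", delay))
    return results, time.perf_counter() - start


seq_time = asyncio.run(sequential(0.2))
names, par_time = asyncio.run(parallel(0.2))
print(f"sequential: {seq_time:.2f}s, parallel: {par_time:.2f}s, results: {names}")
```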

In [None]:
# Load input data from DBFS
tool0_path = "/dbfs/FileStore/mcop/tool0_samples/sample_business_request.json"
metadata_path = "/dbfs/FileStore/mcop/metadata/BA-BS_Datamarts_metadata.json"

with open(tool0_path, "r", encoding="utf-8") as f:
    tool0_data = json.load(f)

with open(metadata_path, "r", encoding="utf-8") as f:
    metadata = json.load(f)

entities = tool0_data.get("entities", [])
print(f"✅ Loaded {len(entities)} entities from Tool 0")
print(f"✅ Loaded {len(metadata)} metadata items")

In [None]:
# Run parallel mapping
result = await map_entities_to_candidates(entities, metadata)

print("\n✅ Parallel mapping complete")
print(f"   Rankings: {len(result['ranking']['top_candidates'])}")
print(f"   Mappings: {len(result['mapping']['mappings'])}")
print(f"   Consistency: {result['consistency']['overlap_percentage']:.1%} ({result['consistency']['note']})")

In [None]:
# Save results to DBFS
output_path = "/dbfs/FileStore/mcop/tool1/filtered_dataset.json"
os.makedirs(os.path.dirname(output_path), exist_ok=True)

with open(output_path, "w", encoding="utf-8") as f:
    json.dump(result, f, indent=2, ensure_ascii=False)

print(f"✅ Results saved: {output_path}")

In [None]:
# Display sample results with consistency check
print("\n" + "="*80)
print("PARALLEL MULTI-AGENT MAPPING RESULTS")
print("="*80)

print(f"\nüèÜ Top 5 Ranked Candidates:")
for candidate in result['rankings'][:5]:
    print(f"   {candidate['rank']}. {candidate['entity_id']} (score: {candidate['relevance_score']:.2f})")
    print(f"      ‚Üí {candidate['justification'][:80]}...")

print(f"\nüîó Sample Mappings (top 5):")
for mapping in result['mappings'][:5]:
    print(f"   {mapping['entity_name']} ‚Üí {mapping['matched_candidate_id']}")
    print(f"      Confidence: {mapping['confidence']:.1%}")
    print(f"      Reasoning: {mapping['reasoning'][:80]}...")

print(f"\n‚úÖ Consistency Check:")
print(f"   Overlap: {result['consistency_check']['overlap_count']} / {result['consistency_check']['total_mappings']}")
print(f"   Ratio: {result['consistency_check']['consistency_ratio']:.1%}")
print(f"   Status: {result['consistency_check']['status']}")
print(f"   Expected: >70% for good quality (ranking and mapping agree)")

print("="*80)