# Intelligent Real Estate Recommender Pipeline Proof of Concept

This notebook executes the **Intelligent Property Evaluation Pipeline** PoC. The primary goal is to demonstrate the **successful integration and sequential flow** of the multi-agent architecture and its dependencies without calling external data APIs or requiring a complex deployment. The agents themselves still call the Gemini model, so model credentials are needed for a live run.

The system uses **mock data and deterministic tool functions** to guarantee a successful end-to-end match.

#### PoC Execution Flow:

1.  **Configuration & Mocks:** Define mock tool functions that stand in for external APIs and real-world feature enrichment (e.g., walking distance, water proximity).
2.  **Requirements Agent:** Reads a **User Requirement Template** into a structured format (Session/Memory).
3.  **Enrichment Agent:** Executes multiple tools (simulating **parallel agents**) to gather external data points. The remote agent returns three (train walk time, water distance, garden area); the helper class defined later also wraps room count and nearby amenities, for five tools in total.
4.  **Evaluation Agent:** Compares the enriched data against the user requirements using a **weighted scoring system** (all weights are equal in this PoC).
5.  **Notification Agent:** If the final match score is $\ge 90\%$, a **notification is triggered** (simulated email alert).

**Expected Outcome:** The PoC is configured to run on a deliberately chosen property that yields a **$100.0\%$ match score**, proving the agent-to-agent communication and the final success condition.
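
The evaluation and notification steps can be checked deterministically, without an LLM. The following is a minimal sketch, not the notebook's own implementation: `match_score`, `CRITERIA`, and the optional `weights` argument are illustrative names, and the feature values are the ones the mock tools return for the demo property.

```python
from typing import Callable, Dict, Optional

# Thresholds mirror the notebook's USER_REQUIREMENTS.
REQUIREMENTS = {
    "min_rooms": 3,
    "max_walk_to_train_min": 15,
    "max_distance_to_water_m": 5000,
    "min_garden_area_sqm": 50.0,
    "match_threshold_percent": 90,
}

# Values produced by the mock tools for the demo property.
FEATURES = {
    "rooms": 5,
    "walk_to_train_min": 5,
    "distance_to_water_m": 1500.0,
    "garden_area_sqm": 290.0,  # 500 - 150 - 0.12 * 500
}

# One pass/fail check per criterion (hypothetical names).
CRITERIA: Dict[str, Callable[[dict, dict], bool]] = {
    "rooms": lambda f, r: f["rooms"] >= r["min_rooms"],
    "train": lambda f, r: f["walk_to_train_min"] <= r["max_walk_to_train_min"],
    "water": lambda f, r: f["distance_to_water_m"] <= r["max_distance_to_water_m"],
    "garden": lambda f, r: f["garden_area_sqm"] >= r["min_garden_area_sqm"],
}


def match_score(
    features: dict,
    requirements: dict,
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """Return the weighted share of satisfied criteria as a percentage."""
    # Equal weights unless the caller supplies its own mapping.
    weights = weights or {name: 1.0 for name in CRITERIA}
    total = sum(weights[name] for name in CRITERIA)
    earned = sum(
        weights[name]
        for name, check in CRITERIA.items()
        if check(features, requirements)
    )
    return round(100.0 * earned / total, 1)


score = match_score(FEATURES, REQUIREMENTS)
notify = score >= REQUIREMENTS["match_threshold_percent"]
print(f"Match score: {score}% | notification triggered: {notify}")
```

With equal weights, one failed criterion out of four drops the score to 75%, which falls below the 90% notification threshold.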

In [None]:
import os
import math
import json
import subprocess
import time
import uuid
from typing import Any, Dict, List, Optional
import requests
import warnings
from google.adk.agents import LlmAgent # Required Agent Types
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent # A2A client agent lives in its own module
from google.adk.a2a.utils.agent_to_a2a import to_a2a # For serving the remote agent
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService # Sessions & Memory
from google.genai import types

warnings.filterwarnings("ignore")

print("ADK components imported successfully.")

## Environment Variables

In [None]:
# load these from environment variables.
# --- Configuration & Retry Options ---
retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

# Placeholder for real API Key (ensure it's set in your environment if running live)
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY", "MOCK_KEY")

# --- Mock Data to Simulate Postgres/Raw Data ---
MOCK_PROPERTY_DATA = {
    "id": "prop-101",
    "user_id": "user-A",
    "address": "10 Downing St, London SW1A 2AA, UK",
    "lat": 51.5033,
    "lon": -0.1276,
    "description": "A historic property with five bedrooms, significant land area, and excellent city access.",
    "land_area_sqm": 500.0,
    "footprint_sqm": 150.0,
    "num_rooms_structured": 5
}
print("Mock data loaded.")

OVERPASS_URL = "https://overpass-api.de/api/interpreter"

## The Remote Enrichment Agent (A2A Server)

This agent bundles all the property tools and is exposed as a microservice over the A2A protocol. It demonstrates A2A serving and three tool styles (custom, OpenAPI, code execution), with the external lookups mocked in this PoC.

In [None]:
# --- Tool Functions (MOCK IMPLEMENTATIONS) ---

def calculate_walking_distance_to_poi(
    house_address: str,
    destination_keyword: str = "nearest train station"
) -> str:
    """MOCK: Returns distance/duration to train station in JSON string format."""
    return json.dumps({
        "distance_meters": 450,
        "duration_minutes": 5
    })

def get_distance_to_nearest_waterbody(lat: float, lon: float) -> str:
    """MOCK: Returns distance to waterbody in JSON string format."""
    return json.dumps({
        "distance_meters": 1500.0,
        "nearest_waterbody": "River Thames"
    })

def compute_gardenable_area(
    land_area: float,
    footprint_area: float,
) -> str:
    """CODE EXECUTION TOOL: Estimates garden area."""
    buffer_pct = 0.12
    gardenable = land_area - footprint_area - (buffer_pct * land_area)
    gardenable = max(gardenable, 0.0)
    return json.dumps({"gardenable_area_sqm": round(gardenable, 2)})

def enrich_property_data(
    address: str,
    lat: float,
    lon: float,
    land_area_sqm: float,
    footprint_sqm: float
) -> str:
    """
    Primary tool on the remote agent. Calls other tools sequentially/conceptually in parallel.
    This demonstrates the Tools concept (Code Execution, OpenAPI, Custom Geospatial).
    """
    enriched = {}

    # Tool 1: Walking Distance (Google Maps API Mock)
    enriched['train_station'] = json.loads(calculate_walking_distance_to_poi(address))

    # Tool 2: Waterbody Distance (Custom Geospatial Mock)
    enriched['waterbody'] = json.loads(get_distance_to_nearest_waterbody(lat, lon))

    # Tool 3: Gardenable Area (Code Execution Tool)
    enriched['garden'] = json.loads(compute_gardenable_area(land_area_sqm, footprint_sqm))

    return json.dumps(enriched)

# --- Define the Remote Enrichment Agent (LlmAgent) ---
enrichment_agent_code = f"""
import os, json
from google.adk.agents import LlmAgent
from google.adk.a2a.utils.agent_to_a2a import to_a2a
from google.adk.models.google_llm import Gemini
from google.genai import types

retry_config = types.HttpRetryOptions(attempts=5, exp_base=7, initial_delay=1, http_status_codes=[429, 500, 503, 504])

# Tool Mocks defined here (as they must be available in the server process)
def calculate_walking_distance_to_poi(address: str) -> str:
    return json.dumps({{"distance_meters": 450, "duration_minutes": 5}})

def get_distance_to_nearest_waterbody(lat: float, lon: float) -> str:
    return json.dumps({{"distance_meters": 1500.0, "nearest_waterbody": "River Thames"}})

def compute_gardenable_area(land_area: float, footprint_area: float) -> str:
    buffer_pct = 0.12
    gardenable = land_area - footprint_area - (buffer_pct * land_area)
    gardenable = max(gardenable, 0.0)
    return json.dumps({{"gardenable_area_sqm": round(gardenable, 2)}})

def enrich_property_data(address: str, lat: float, lon: float, land_area_sqm: float, footprint_sqm: float) -> str:
    # This simulates the parallel execution of multiple tools (conceptually, inside the remote agent)
    enriched = {{}}
    enriched['train_station'] = json.loads(calculate_walking_distance_to_poi(address))
    enriched['waterbody'] = json.loads(get_distance_to_nearest_waterbody(lat, lon))
    enriched['garden'] = json.loads(compute_gardenable_area(land_area_sqm, footprint_sqm))
    return json.dumps(enriched)

enrichment_agent = LlmAgent(
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    name="property_enrichment_agent",
    description="A specialist agent that enriches raw property data with external features like transit time, water distance, and calculated garden area.",
    instruction="You are the Property Enrichment Specialist. Your sole task is to take property coordinates and dimensions, and use the provided tool (enrich_property_data) to fetch all required external features. Always return the output of the tool directly without summarization.",
    tools=[enrich_property_data]
)

app = to_a2a(enrichment_agent, port=8002)
"""

# Write and Start the Server (A2A Protocol)
with open("/tmp/enrichment_server.py", "w") as f:
    f.write(enrichment_agent_code)

server_process = subprocess.Popen(
    [
        "uvicorn",
        "enrichment_server:app",
        "--host",
        "localhost",
        "--port",
        "8002",
    ],
    cwd="/tmp",
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    env={**os.environ},
)

print("🚀 Starting Remote Enrichment Agent server...")
for attempt in range(30):
    try:
        response = requests.get(
            "http://localhost:8002/.well-known/agent-card.json", timeout=1
        )
        if response.status_code == 200:
            print("\nEnrichment Agent server is running at: http://localhost:8002")
            break
    except requests.exceptions.RequestException:
        pass  # Server is not accepting connections yet
    # Wait before the next poll, whether the request failed or returned a non-200 status
    time.sleep(5)
    print(".", end="", flush=True)
else:
    print("\n⚠️  Server may not be ready yet.")

globals()["enrichment_server_process"] = server_process

## Tool Helper Function

The `PropertyEnrichmentTools` class wraps several kinds of tools, covering three tool styles:

1.  **Custom Tools:** `get_distance_to_water` (simulating a geospatial Overpass API lookup).
2.  **OpenAPI Tools:** `get_walking_distance_to_train` (simulating the Google Maps Distance Matrix API).
3.  **Built-in Tool (Code Execution):** `estimate_garden_area` performs a mathematical calculation on the input data using Python logic.
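
For orientation, here is a rough sketch of what a non-mocked water lookup might involve. It is an assumption about how such a tool could be built, not the notebook's code: `build_water_query`, `nearest_waterbody`, the chosen OSM tags, and the sample response (including its IDs) are all made up for illustration. Distances are measured to each feature's center point, which only approximates the distance to the shoreline.

```python
import math
from typing import Any, Dict, List, Optional


def haversine_meters(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two WGS84 points, in meters."""
    R = 6371000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def build_water_query(lat: float, lon: float, radius_m: int = 10000) -> str:
    """Build an Overpass QL query for water features near a point."""
    around = f"(around:{radius_m},{lat},{lon})"
    return (
        "[out:json][timeout:25];"
        "("
        f'way{around}["natural"="water"];'
        f'relation{around}["natural"="water"];'
        f'way{around}["waterway"="river"];'
        ");"
        "out center;"  # ask for one representative center point per feature
    )


def nearest_waterbody(
    lat: float, lon: float, elements: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Pick the element whose center is closest to (lat, lon)."""
    best: Optional[Dict[str, Any]] = None
    for el in elements:
        center = el.get("center")
        if not center:
            continue  # skip elements without a center point
        dist = haversine_meters(lat, lon, center["lat"], center["lon"])
        if best is None or dist < best["distance_meters"]:
            best = {
                "distance_meters": round(dist, 1),
                "nearest_waterbody": el.get("tags", {}).get("name", "unnamed"),
                "osm_type": el["type"],
                "osm_id": el["id"],
            }
    return best


query = build_water_query(51.5033, -0.1276)

# Hand-written stand-in for the "elements" list of an Overpass JSON response.
# A live version would POST `query` to the Overpass endpoint instead.
sample_elements = [
    {"type": "way", "id": 1, "center": {"lat": 51.5010, "lon": -0.1200},
     "tags": {"name": "River Thames"}},
    {"type": "way", "id": 2, "center": {"lat": 51.5080, "lon": -0.1650},
     "tags": {"name": "The Serpentine"}},
]
nearest = nearest_waterbody(51.5033, -0.1276, sample_elements)
print(nearest)
```

Keeping query construction and response parsing as separate pure functions lets both be tested offline, with the network call added only at the edge.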

In [None]:
def haversine_meters(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculates the distance between two WGS84 coordinates in meters."""
    R = 6371000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

def calculate_walking_distance_to_poi(
    house_address: str,
    destination_keyword: str = "nearest train station"
) -> Optional[Dict[str, Any]]:
    """
    TOOL 1 (MOCK): Returns a distance that meets the requirement (< 15 min walk).
    """
    # 5-minute, 450m walk to nearest simulated train station
    return {
        "distance_meters": 450,
        "distance_km": 0.45,
        "duration_minutes": 5
    }

def get_distance_to_nearest_waterbody(lat: float, lon: float, max_radius_m: int = 10000) -> Dict[str, Any]:
    """
    TOOL 2 (MOCK): Returns a distance that meets the requirement (< 5000m).
    Simulating River Thames proximity.
    """
    return {
        "distance_meters": 1500.0, # 1.5 km away
        "nearest_waterbody": "River Thames",
        "osm_type": "way",
        "osm_id": 12345,
        "feature_tags": {}
    }

def compute_gardenable_area(
    land_area: float,
    footprint_area: Optional[float] = None,
    supplemental_area: float = 0.0,
    buffer_pct: float = 0.12,
    min_value: float = 0.0,
) -> Dict[str, Any]:
    """
    TOOL 3 (CODE EXECUTION): Estimates the available area for gardening.
    This is the only tool function we use **without** modification.
    """
    if land_area is None:
        raise ValueError("land_area is required for computation")

    if footprint_area is None:
        footprint_area = land_area * 0.30

    gardenable = land_area - footprint_area - supplemental_area - (buffer_pct * land_area)
    gardenable = max(gardenable, min_value)
    return {"gardenable_area_sqm": round(gardenable, 2), "components": {"land_area": land_area, "footprint_area": footprint_area, "supplemental_area": supplemental_area, "buffer_pct": buffer_pct}}

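import re  # Required by the description fallback in extract_room_count

ROOM_PATTERNS = [
    r"(\d+)\s*[-/]?\s*rooms?\b",
    r"(\d+)\s*[-/]?\s*bedrooms?\b",
]

def extract_room_count(number_of_rooms: Optional[int] = None, description: Optional[str] = None) -> Dict[str, Any]:
    """
    TOOL 4: Prioritizes structured data (5), which meets the minimum (3).
    Falls back to ROOM_PATTERNS on the description; spelled-out numbers are not matched.
    """
    if number_of_rooms is not None:
        return {"rooms": int(number_of_rooms), "confidence": 0.99, "source": "schema"}
    for pattern in ROOM_PATTERNS:
        match = re.search(pattern, description or "", flags=re.IGNORECASE)
        if match:
            # 0.6 is an arbitrary placeholder confidence for text-derived counts
            return {"rooms": int(match.group(1)), "confidence": 0.6, "source": "description"}
    return {"rooms": None, "confidence": 0.0, "source": "unknown"}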

def search_nearby_amenities(lat: float, lon: float, radius_m: int = 2000, amenity_types: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    TOOL 5 (MOCK): Returns results that meet the 'must_have_park_nearby' requirement.
    """
    return {
        "center": {"lat": lat, "lon": lon},
        "radius_m": radius_m,
        "results": {
            "park": [
                {"name": "St. James's Park", "distance_m": 250.0}
            ],
            "school": [], # Other amenities can be empty
        }
    }


# --- AGENT TOOL ABSTRACTION ---
class PropertyEnrichmentTools:
    """Wraps mock tools for Agent use."""
    @staticmethod
    def get_walking_distance_to_train(address: str) -> Optional[Dict[str, Any]]:
        return calculate_walking_distance_to_poi(house_address=address)

    @staticmethod
    def get_distance_to_water(lat: float, lon: float) -> Dict[str, Any]:
        return get_distance_to_nearest_waterbody(lat=lat, lon=lon)

    @staticmethod
    def estimate_garden_area(land_area: float, footprint_area: Optional[float] = None) -> Dict[str, Any]:
        return compute_gardenable_area(land_area=land_area, footprint_area=footprint_area)

    @staticmethod
    def get_room_count(num_rooms: Optional[int], description: Optional[str]) -> Dict[str, Any]:
        return extract_room_count(number_of_rooms=num_rooms, description=description)

    @staticmethod
    def get_nearby_amenities(lat: float, lon: float) -> Dict[str, Any]:
        return search_nearby_amenities(lat=lat, lon=lon)

## The Orchestrating Agent (Sequencing)

This single `LlmAgent` uses the `RemoteA2aAgent` (the Enrichment Agent) as its tool and performs all other tasks through its instruction set.

It demonstrates a multi-agent system (`LlmAgent` + `RemoteA2aAgent`) and sequential execution via delegation.

In [None]:
# --- 3. Create the Orchestrating Agent Client and Service ---

# Define the Remote Enrichment Agent Client (RemoteA2aAgent)
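remote_enrichment_agent = RemoteA2aAgent(
    name="property_data_enricher",
    description="A specialized agent for fetching and enriching property data with external real-world features (transit time, water proximity, garden area).",
    # RemoteA2aAgent is pointed at the agent card rather than a bare base URL;
    # this is the same card URL that the readiness check polls.
    agent_card="http://localhost:8002/.well-known/agent-card.json",
)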

# Define the Requirements and Evaluation Logic
USER_REQUIREMENTS = {
    "min_rooms": 3,
    "max_walk_to_train_min": 15,
    "max_distance_to_water_m": 5000,
    "min_garden_area_sqm": 50.0,
    "match_threshold_percent": 90
}

# Define the Orchestrating LlmAgent
# AgentTool exposes another agent as a callable tool; a bare agent instance is not
# accepted in `tools`. Sessions are supplied to the Runner, not to the agent.
from google.adk.tools.agent_tool import AgentTool

orchestration_llm_agent = LlmAgent(
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    name="evaluation_orchestrator_agent",
    description="The primary agent for property evaluation, responsible for sequencing the requirements, enrichment, evaluation, and notification steps.",
    instruction=f"""
    You are the **Property Evaluation Orchestrator**. Your mission is to evaluate a property against the following structured user requirements.

    **User Requirements (JSON):** {json.dumps(USER_REQUIREMENTS)}

    **Steps (Sequential Agent Logic):**
    1. **Enrichment:** Use the `property_data_enricher` tool with the provided property data (address, lat, lon, areas) to get the enriched features.
    2. **Evaluation:** Use the enriched data to calculate a match score against the User Requirements.
       - A property passes if a feature meets the minimum/maximum criteria.
       - Assume all features have equal weight for simplicity.
       - **Match Score = (Number of matched criteria / Total criteria) * 100**
    3. **Notification:** If the Match Score is >= {USER_REQUIREMENTS['match_threshold_percent']}%, output a congratulatory notification.

    **Output Format:**
    Always clearly state the calculated **Match Score** and whether a **Notification** was triggered. List the matched and failed criteria.
    """,
    tools=[AgentTool(agent=remote_enrichment_agent)],
)

print("Orchestrating LlmAgent created successfully!")

## Run the Pipeline and Cleanup

In [None]:
# --- 4. Run the Pipeline and Cleanup ---

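# Assumes the ADK 1.x Runner API: the Runner is bound to an agent and an app name,
# the session must exist before the run, and results arrive as a stream of events.
APP_NAME = "property_evaluation_poc"
USER_ID = MOCK_PROPERTY_DATA["user_id"]
session_service = InMemorySessionService()
runner = Runner(
    agent=orchestration_llm_agent,
    app_name=APP_NAME,
    session_service=session_service,
)
property_data = MOCK_PROPERTY_DATA

# Prepare the prompt to trigger the tool use
prompt = f"""
Please evaluate the following property against the user requirements:
Address: {property_data['address']}
Latitude: {property_data['lat']}
Longitude: {property_data['lon']}
Land Area (sqm): {property_data['land_area_sqm']}
Footprint Area (sqm): {property_data['footprint_sqm']}
Number of Rooms: {property_data['num_rooms_structured']}

Begin the evaluation now.
"""

print("\n" + "="*80)
print("** STARTING SEQUENTIAL AGENT ORCHESTRATION (LlmAgent -> RemoteA2aAgent) **")
print("="*80)

# Top-level await works in notebook cells; in a plain script, wrap this in asyncio.run().
session_id = str(uuid.uuid4())
await session_service.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id=session_id
)

start_time = time.time()
final_text = ""
async for event in runner.run_async(
    user_id=USER_ID,
    session_id=session_id,
    new_message=types.Content(role="user", parts=[types.Part(text=prompt)]),
):
    # Keep only the text of the final response event
    if event.is_final_response() and event.content and event.content.parts:
        final_text = "".join(part.text or "" for part in event.content.parts)
duration = time.time() - start_time

print(f"\n🤖 **ORCHESTRATOR AGENT FINAL REPORT** (Duration: {duration:.2f}s)")
print("-" * 40)
print(final_text)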
print("-" * 40)

# --- Clean Up ---
server_process = globals().get("enrichment_server_process")
if server_process and server_process.poll() is None:
    server_process.terminate()
    server_process.wait(timeout=5)
    print("\nRemote Enrichment Agent server stopped.")

# Remove the temporary file
os.remove("/tmp/enrichment_server.py")
print("Temporary file removed.")
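
If any cell between server start-up and cleanup raises, the server process above keeps running. One way to guard against that, sketched here under the assumption that the server lifetime can be scoped to a single block, is a small context manager. `managed_process` is a hypothetical helper, and the sleeping child process merely stands in for the `uvicorn` command.

```python
import subprocess
import sys
from contextlib import contextmanager
from typing import Iterator, List


@contextmanager
def managed_process(cmd: List[str], stop_timeout: float = 5.0) -> Iterator[subprocess.Popen]:
    """Start a child process and guarantee it is stopped on exit."""
    proc = subprocess.Popen(cmd)
    try:
        yield proc
    finally:
        if proc.poll() is None:  # still running
            proc.terminate()
            try:
                proc.wait(timeout=stop_timeout)
            except subprocess.TimeoutExpired:
                proc.kill()  # escalate if terminate() was ignored
                proc.wait()


# Stand-in for the uvicorn command: a child Python process that just sleeps.
with managed_process([sys.executable, "-c", "import time; time.sleep(60)"]) as proc:
    was_running = proc.poll() is None

print("running inside block:", was_running)
print("stopped after block:", proc.poll() is not None)
```

The `finally` clause runs even when the body of the `with` block raises, so the child process is stopped on both the success and the failure path.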