
> **NOTE: DEMO ONLY**  
> This notebook is a simplified proof-of-concept.  
> - Catalog/AuthZ/Monitoring are mocked.  
> - LLM calls are simulated with a local `fake_chatgpt_call()` function for **planning** and **discovery** prompts.  
> - Do **not** use for production or compliance decisions.



# Agentic Data Mesh LLM — KPI Example

This notebook demonstrates step-by-step how an **Agentic Mesh** system can:
1. Discover datasets
2. Plan a query
3. Execute the KPI calculation
4. Return results with explainability & reusability

**Business Question:**  
*“What % of customers cancelled their orders because the delivery was late?”*

**Datasets:**
- `orders_dataset` (Sales)
- `shipment_tracking` (Logistics)


In [None]:

# --- LLM Simulation Utilities (DEMO) ---
import json
from datetime import datetime

def fake_chatgpt_call(system_prompt: str, user_prompt: str) -> dict:
    """Return a canned, mock response standing in for an LLM call (DEMO only).

    The payload is chosen by case-insensitive keyword routing on
    ``user_prompt``: a prompt containing "discovery" gets the discovery
    payload; otherwise one containing "planning" gets the planning payload;
    anything else gets an "unknown" stub. "discovery" is checked first, so it
    wins when both keywords appear.

    Args:
        system_prompt: Accepted for signature parity with a chat-style API;
            this mock never reads it.
        user_prompt: Text inspected for the routing keywords.

    Returns:
        A dict with "model", "ts" and "stage" keys plus stage-specific
        content. Every value is fixed except "ts", which is the current UTC
        time as a naive ISO-8601 string.
    """
    # Local import: the notebook's import cell only brings in `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the "ts" string keeps its original format.
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    prompt = user_prompt.lower()

    if "discovery" in prompt:
        return {
            "model": "gpt-5-demo",
            "ts": now,
            "stage": "discovery",
            "entities": ["orders", "shipment", "delivery_delay"],
            "suggested_sources": [
                {"name": "orders_dataset", "domain": "Sales", "join_key": "order_id", "pii": ["customer_id"]},
                {"name": "shipment_tracking", "domain": "Logistics", "join_key": "order_id", "pii": []},
            ],
            "draft_plan": {
                "join_keys": ["order_id"],
                "filters": ["status='cancelled'", "delivery_delay_bucket IN ('>24h','>48h')"],
            },
        }

    if "planning" in prompt:
        return {
            "model": "gpt-5-demo",
            "ts": now,
            "stage": "planning",
            "kpi_spec": {
                "name": "cancel_due_to_delay_rate",
                "numerator": "COUNT(cancelled AND delay_bucket in {>24h, >48h})",
                "denominator": "COUNT(all orders)",
                "window": "ALL",
            },
            "plan": {
                "sources": ["orders_dataset", "shipment_tracking"],
                "join": "orders_dataset.order_id = shipment_tracking.order_id",
                "filters": ["status='cancelled'", "delivery_delay_bucket IN ('>24h','>48h')"],
                "validations": ["schema check", "freshness SLA"],
            },
        }

    # No keyword matched: return a stub so callers can detect the miss.
    return {"model": "gpt-5-demo", "ts": now, "stage": "unknown", "message": "No demo route."}

print("fake_chatgpt_call() ready — DEMO only.")


In [None]:

import pandas as pd
import numpy as np

# --- Sample data ---
# Orders: one row per order with the customer, final status and creation time.
_order_columns = ["order_id", "customer_id", "status", "created_at"]
_order_rows = [
    (1, "C001", "delivered", "2025-08-10 10:12:00"),
    (2, "C002", "cancelled", "2025-08-10 11:20:00"),
    (3, "C003", "cancelled", "2025-08-11 09:02:00"),
    (4, "C004", "delivered", "2025-08-11 11:55:00"),
    (5, "C005", "delivered", "2025-08-11 12:32:00"),
    (6, "C006", "cancelled", "2025-08-12 08:10:00"),
    (7, "C007", "delivered", "2025-08-12 15:37:00"),
    (8, "C008", "cancelled", "2025-08-12 16:05:00"),
]
orders_df = pd.DataFrame(data=_order_rows, columns=_order_columns)

# Shipments: delivery delay in hours, keyed by order_id.
_shipment_columns = ["order_id", "delivery_delay_hours"]
_shipment_rows = [
    (1, 1.5),
    (2, 36.0),
    (3, 50.0),
    (4, 0.5),
    (5, 26.0),
    (6, 60.0),
    (7, 1.0),
    (8, 28.0),
]
ship_df = pd.DataFrame(data=_shipment_rows, columns=_shipment_columns)

def bucket(h):
    """Map a delivery delay in hours to its reporting bucket label.

    Args:
        h: Delay in hours. ``None`` or NaN means the delay is not known.

    Returns:
        "≤1h" when h <= 1, "1–24h" when 1 < h <= 24, ">24h" when
        24 < h <= 48, ">48h" when h > 48, and "unknown" when the delay is
        missing.
    """
    import math

    # NaN fails every `<=` comparison, so without this guard a missing delay
    # would fall through to ">48h" and be counted as a late delivery.
    if h is None or math.isnan(h):
        return "unknown"
    if h <= 1:
        return "≤1h"
    if h <= 24:
        return "1–24h"
    if h <= 48:
        return ">24h"
    return ">48h"

# Label every shipment with its delay bucket, element by element.
ship_df["delivery_delay_bucket"] = ship_df["delivery_delay_hours"].map(bucket)

# Notebook display: the full orders table and the first shipment rows.
orders_df, ship_df.head()


In [None]:

# --- Step 1: Data Discovery ---
# (DEMO) Have the mock LLM suggest sources and a join/filter strategy.
discovery_request = {
    "system_prompt": 'You are a data agent that performs discovery.',
    "user_prompt": 'Discovery: identify sources and join/filter strategy for late-delivery cancellations.',
}
llm_discovery = fake_chatgpt_call(**discovery_request)

# Pretty-print the suggestion so the cell output is readable.
print('LLM Discovery Suggestion:\n', json.dumps(llm_discovery, indent=2))


# The business question exactly as the user phrased it.
user_query = "What % of customers cancelled their orders because the delivery was late?"

# KPI definition: the entities to look up in the catalog, the counting rules
# for numerator and denominator, and a visual spec for the result.
kpi = dict(
    name="cancel_due_to_delay_rate",
    required_entities=["orders", "shipment", "delivery_delay"],
    numerator_rule="status='cancelled' AND delay_bucket IN ('>24h','>48h')",
    denominator_rule="all_orders",
    visual_spec=dict(type="bar"),
)

class Catalog:
    """Mock data catalog (DEMO): fixed search results and in-notebook contracts."""

    @staticmethod
    def search(required_entities):
        """Return the names of candidate datasets for the requested entities.

        The mock always returns the same two datasets; ``required_entities``
        is accepted but not inspected.
        """
        return ["orders_dataset", "shipment_tracking"]

    @staticmethod
    def get_contracts(datasets):
        """Return the data contract for each requested dataset.

        Args:
            datasets: Iterable of dataset names to look up.

        Returns:
            A dict mapping each known, requested dataset name to its contract
            ("schema" as a set of column names, "freshness_sla_min", "pii").
            Names the catalog does not know are omitted.
        """
        # Contracts are built lazily so a schema is only read from the
        # notebook's DataFrames for datasets that were actually requested.
        contract_builders = {
            "orders_dataset": lambda: {
                "schema": set(orders_df.columns),
                "freshness_sla_min": 60,
                "pii": ["customer_id"],
            },
            "shipment_tracking": lambda: {
                "schema": set(ship_df.columns),
                "freshness_sla_min": 15,
                "pii": [],
            },
        }
        # Honour the `datasets` argument instead of returning every contract.
        return {
            name: contract_builders[name]()
            for name in datasets
            if name in contract_builders
        }

class Authz:
    """Mock authorization service (DEMO): grants read on every requested dataset."""

    @staticmethod
    def get_effective_permissions(identity, datasets):
        """Return the effective permissions for the requested datasets.

        Args:
            identity: Caller identity; accepted but not inspected by the mock.
            datasets: Iterable of dataset names to evaluate.

        Returns:
            A dict with three keys:
            "allow_read" maps each requested dataset to True;
            "field_masks" maps each requested dataset to the list of columns
            to mask (empty when none apply);
            "row_filters" is always an empty dict.
        """
        # Demo policy: columns that must be masked, per dataset.
        demo_masks = {"orders_dataset": ["customer_id"]}
        return {
            "allow_read": {d: True for d in datasets},
            # Key the masks by the requested datasets so this section always
            # lines up with "allow_read".
            "field_masks": {d: list(demo_masks.get(d, [])) for d in datasets},
            "row_filters": {},
        }

# Resolve candidate datasets, their contracts, and the caller's permissions.
candidates = Catalog.search(kpi["required_entities"])
contracts = Catalog.get_contracts(candidates)
permissions = Authz.get_effective_permissions(
    {"user_id": "alice@sales.company", "roles": ["SalesAnalyst"]},
    candidates,
)

# Keep only the datasets this identity is allowed to read.
accessible = [d for d in candidates if permissions["allow_read"].get(d, False)]

# Build the draft plan from the readable datasets. `accessible` used to be
# computed and then ignored in favour of a hard-coded source list, so a denied
# dataset would still have appeared in the plan.
draft_plan = {
    "sources": accessible,
    "join_keys": ["order_id"],
    "filters": ["status='cancelled'", "delivery_delay_bucket IN ('>24h','>48h')"],
    "masked_fields": permissions["field_masks"],
}

draft_plan


In [None]:

# --- Step 2: Query Planning ---
# (DEMO) Have the mock LLM suggest a KPI spec and a query plan.
planning_request = {
    "system_prompt": 'You are a data agent that plans queries.',
    "user_prompt": 'Planning: define KPI spec and SQL plan for late-delivery cancellations.',
}
llm_planning = fake_chatgpt_call(**planning_request)

# Pretty-print the suggestion so the cell output is readable.
print('LLM Planning Suggestion:\n', json.dumps(llm_planning, indent=2))


# Final query plan: sources, filters and masks carried over from the draft plan,
# plus the join condition and the SQL that computes the KPI.
#
# The SQL uses a LEFT JOIN so that orders without a shipment row still count
# toward the denominator. This matches the KPI's "all_orders" denominator rule
# and the pandas execution step, which divides by len(orders_df). Unmatched
# orders have a NULL delay bucket and therefore never reach the numerator.
plan = {
    "sources": draft_plan["sources"],
    "joins": ["orders_dataset.order_id = shipment_tracking.order_id"],
    "filters": draft_plan["filters"],
    "masked_fields": draft_plan["masked_fields"],
    "sql": """
        WITH joined AS (
          SELECT o.order_id,
                 o.status,
                 s.delivery_delay_bucket
          FROM orders_dataset AS o
          LEFT JOIN shipment_tracking AS s
            ON o.order_id = s.order_id
        ),
        agg AS (
          SELECT
            SUM(CASE WHEN status='cancelled'
                       AND delivery_delay_bucket IN ('>24h','>48h') THEN 1 ELSE 0 END) AS numerator,
            COUNT(*) AS denominator
          FROM joined
        )
        SELECT numerator, denominator,
               CASE WHEN denominator=0 THEN 0
                    ELSE 100.0 * numerator/denominator END AS rate_percent
        FROM agg;
    """.strip()
}

plan


In [None]:

# --- Step 3: Execution ---

# Pair each order's status with its shipment's delay bucket. The join is inner,
# so only orders that have a shipment row appear in `joined`.
order_status = orders_df[["order_id", "status"]]
shipment_delay = ship_df[["order_id", "delivery_delay_bucket"]]
joined = order_status.merge(shipment_delay, how="inner", on="order_id")

# Flag (1/0) orders that were cancelled and fell in a ">24h" or ">48h" bucket.
is_cancelled = joined["status"] == "cancelled"
is_late = joined["delivery_delay_bucket"].isin([">24h", ">48h"])
joined["cancel_due_to_delay"] = np.where(is_cancelled & is_late, 1, 0)

# The numerator counts flagged orders; the denominator is every order in
# orders_df, not just the joined ones.
numerator = int(joined["cancel_due_to_delay"].sum())
denominator = int(len(orders_df))
if denominator:
    rate_pct = round(100.0 * numerator / denominator, 2)
else:
    # No orders at all: report 0.0 rather than dividing by zero.
    rate_pct = 0.0

joined, numerator, denominator, rate_pct


In [None]:

# --- Step 4: Explainability & Reuse ---

# Freshness SLA for each source, copied from the catalog contracts.
freshness_by_source = {
    name: {"freshness_sla_min": contracts[name]["freshness_sla_min"]}
    for name in ("orders_dataset", "shipment_tracking")
}

# The computed KPI values from the execution step.
kpi_result = {
    "name": "cancel_due_to_delay_rate",
    "numerator": numerator,
    "denominator": denominator,
    "rate_percent": rate_pct,
}

# Explanation record: how the number was produced (plan details and contracts)
# followed by the number itself.
explanation = {
    **{key: plan[key] for key in ("sources", "joins", "filters", "masked_fields")},
    "contracts": freshness_by_source,
    "kpi": kpi_result,
}

explanation
