In [6]:
import os
import pandas as pd
from datetime import datetime, timedelta
from langfuse import Langfuse
from dotenv import load_dotenv

load_dotenv()

langfuse = Langfuse()

In [54]:
x = langfuse.api.trace.get("4132f6246668a058006e2fb58e47db2c")

In [55]:
x.dict()

{'id': '4132f6246668a058006e2fb58e47db2c',
 'timestamp': datetime.datetime(2025, 11, 20, 7, 4, 44, 24000, tzinfo=datetime.timezone.utc),
 'name': 'call_llm',
 'input': {'model': 'azure/gpt-4o-mini',
  'config': {'system_instruction': '\n        **Role:** Host agent for the agent-to-agent protocol; delegates queries to specialized remote agents with maximum efficiency.\n\n**Core Directives:**\n\n* **Task Delegation:** Use the `send_message` function to assign precise, actionable tasks to remote agents.\n* **Full Context Provision:** If an agent repeatedly asks for user confirmation, it likely lacks conversation history. Include all relevant context in the task to prevent this.\n* **Autonomous Multi-Agent Engagement:** Engage any required agents directly—never seek user permission or preference. If multiple agents are needed, orchestrate them seamlessly.\n* **Intelligent Inter-Agent Collaboration:** Instruct agents to determine if they need data from another agent. **If Agent A says "I n

In [50]:
def extract_spans(data):
    results = []

    observations = data.get("observations", [])

    # Each element inside the list is an observation
    for obs in observations:
        results.extend(find_spans(obs))

    return results


def find_spans(obj, results=None):
    if results is None:
        results = []

    if isinstance(obj, dict):

        # Check for span type with non-empty input
        if obj.get("type") == "span" and obj.get("input"):
            results.append({
                "id": obj.get("id"),
                "input": obj.get("input")
            })

        # Recurse deeper
        for v in obj.values():
            find_spans(v, results)

    elif isinstance(obj, list):
        for item in obj:
            find_spans(item, results)

    return results


# ---- USAGE ----
valid_spans = extract_spans(x)
print(valid_spans)


[]


In [None]:
# FINAL_WORKING_DEEPEVAL_NOV19_2025.py
import ast
from typing import Dict, List, Any

# DeepEval - latest 2025 API
from deepeval import evaluate
from deepeval.metrics import GEval, FaithfulnessMetric, ToxicityMetric
from deepeval.test_case import LLMTestCase
from deepeval.test_case import LLMTestCaseParams  # ← CRITICAL
import pprint
# Langfuse
from langfuse import Langfuse

# ===============================================================
# 1. YOUR TRACE (paste full string)
# ===============================================================
# raw_trace = '''{'id': '59b7f43597ffd7104da70b5ecd0b2d4b', ... 'externalId': None}'''
# trace_dict: Dict[str, Any] = ast.literal_eval(raw_trace)

# ===============================================================
# 2. Extract steps
# ===============================================================
def extract_steps(contents: List[Dict]) -> str:
    steps = []
    for part in contents:
        if part.get("role") == "user":
            for p in part["parts"]:
                if "text" in p:
                    steps.append(f"USER: {p['text']}")
                elif "function_response" in p:
                    try:
                        txt = p["function_response"]["response"]["result"]["artifacts"][0]["parts"][0]["text"]
                        steps.append(f"TOOL OUTPUT:\n{txt.strip()}")
                    except:
                        pass
        elif part.get("role") == "model":
            for p in part["parts"]:
                if "function_call" in p:
                    fc = p["function_call"]
                    agent = fc["args"]["agent_name"]
                    task = fc["args"]["task"]
                    steps.append(f"→ CALL: send_message('{agent}')\n    Task: {task}")
                elif "text" in p:
                    steps.append(f"FINAL ANSWER: {p['task']}")
    return "\n\n".join(steps)



# ===============================================================
# 3. Build test case
# ===============================================================
def build_test_case(trace: Dict) -> LLMTestCase:
    contents = trace["observations"]["input"]["contents"]
    # output = trace["output"]["content"]["parts"][0]["function_call"]["args"]["task"]
    output = trace["observations"]["input"]["parts"][0]["function_response"]["parts"]["text"]
    user_query = next((p["text"] for c in contents if c["role"] == "user" for p in c["parts"] if "text" in p), "")

    full_trace = "=== FULL AGENT ORCHESTRATION TRACE ===\n" + extract_steps(contents)

    return LLMTestCase(
        input=user_query,
        actual_output=output,
        context=[full_trace],
    )

# ===============================================================
# 4. METRICS — ONLY THIS WAY WORKS IN NOV 2025
# ===============================================================
def get_metrics():
    return [
        GEval(
            name="Multi-Agent Orchestration Quality",
            criteria="Rate how well the routing agent orchestrated other agents using the trace in retrieval_context.",
            evaluation_steps=[
                "Did it call Get Dept Number first?",
                "Then Get Department Manager with d001?",
                "Only 2 calls total?",
                "No loops, no user involvement?",
                "Final answer is direct and correct?"
            ],
            evaluation_params=[
                LLMTestCaseParams.INPUT,
                LLMTestCaseParams.ACTUAL_OUTPUT,
                LLMTestCaseParams.RETRIEVAL_CONTEXT,
            ],
            model="gpt-4o-mini",
            threshold=0.8
        ),
        GEval(
            name="Efficiency & Minimal Steps",
            criteria="Was the task solved in the absolute minimum number of steps?",
            evaluation_steps=[
                "Exactly 2 agent calls",
                "No redundancy",
                "Perfect dependency resolution"
            ],
            evaluation_params=[
                LLMTestCaseParams.ACTUAL_OUTPUT,
                LLMTestCaseParams.RETRIEVAL_CONTEXT,
            ],
            model="gpt-4o-mini",
            threshold=0.9
        ),
        FaithfulnessMetric(threshold=0.9),
        ToxicityMetric(threshold=0.1),
    ]

# ===============================================================
# 5. Evaluate + Log
# ===============================================================
def evaluate_and_log(trace: Dict):
    test_case = build_test_case(trace)
    metrics = get_metrics()

    print("Running DeepEval evaluation...")
    results = evaluate(test_cases=[test_case], metrics=metrics)  # ← Correct call
    result = results.test_results[0]
    pprint.pprint(result)

# ===============================================================
# 6. RUN
# ===============================================================
if __name__ == "__main__":
    evaluate_and_log(x)

KeyError: 13

In [None]:
# === ONE-TIME FIX: Turn whatever x is into the real dictionary ===
import json
import ast

if isinstance(x, str):
    # If you pasted the JSON as a string
    try:
        x = json.loads(x)
    except:
        x = ast.literal_eval(x)
elif isinstance(x, set):
    x = list(x)[0]
elif x is Ellipsis:
    # You probably typed x = ... by mistake
    raise ValueError("You set x = ..., replace it with the actual trace dictionary!")
# Now x is guaranteed to be the correct dict

ValueError: You set x = ..., replace it with the actual trace dictionary!

In [None]:
def extract_custom_logic(trace: dict):
    """
    Custom extraction logic for your multi-agent trace (Vertex AI Agent Builder / A2A style)
    Works reliably with the structure you posted.
    """
    result = {
        "user_query": None,
        "final_answer": None,      # What the user should see as the final response
        "full_prompt": None,       # The full input sent to the model (system + history)
        "agent_steps": [],         # All intermediate agent responses
        "capital_city": None
    }

    try:
        # 1. Extract original user query
        contents = trace["input"]["contents"]
        for msg in contents:
            if msg["role"] == "user" and "parts" in msg:
                for part in msg["parts"]:
                    if "text" in part:
                        result["user_query"] = part["text"]
                        break
                if result["user_query"]:
                    break

        # 2. Extract all function responses (agent answers)
        agent_responses = []
        for msg in contents:
            if "parts" in msg:
                for part in msg["parts"]:
                    if "function_response" in part:
                        resp = part["function_response"]["response"]
                        text = None
                        # Dig into the artifacts
                        if "result" in resp:
                            artifacts = resp["result"].get("artifacts", [])
                            for artifact in artifacts:
                                for p in artifact.get("parts", []):
                                    if p.get("kind") == "text":
                                        text = p["text"]
                                        break
                                if text:
                                    break
                        # Also check history (some agents put it there)
                        if not text and "history" in resp["result"]:
                            for h in resp["result"]["history"]:
                                for p in h.get("parts", []):
                                    if p.get("kind") == "text":
                                        text = p["text"]
                        if text:
                            agent_name = part["function_response"].get("name", "unknown_agent")
                            agent_responses.append({
                                "agent": agent_name,
                                "response": text.strip()
                            })

        result["agent_steps"] = agent_responses

        # 3. Determine final answer
        # In your flow: Capital Agent → General Q&A Agent
        # The last agent response is almost always the final detailed answer
        if agent_responses:
            # First response is usually just the capital name
            if len(agent_responses) >= 1:
                result["capital_city"] = agent_responses[0]["response"]

            # Last response is the detailed 10 lines → this is what user wants
            if len(agent_responses) >= 2:
                result["final_answer"] = agent_responses[-1]["response"]
            else:
                result["final_answer"] = agent_responses[0]["response"]

        # 4. Full prompt sent to the model (for debugging/cost analysis)
        # It's in observations → GENERATION → input.contents
        observations = trace.get("observations", [])
        for obs in observations:
            if obs.get("type") == "GENERATION" and "input" in obs:
                input_data = obs["input"]
                if "contents" in input_data:
                    # Reconstruct readable prompt
                    prompt_lines = []
                    if "config" in input_data and "system_instruction" in input_data["config"]:
                        prompt_lines.append("SYSTEM:")
                        prompt_lines.append(input_data["config"]["system_instruction"])
                        prompt_lines.append("\n" + "="*50 + "\n")

                    for msg in input_data["contents"]:
                        role = msg["role"].upper()
                        for part in msg["parts"]:
                            if "text" in part:
                                prompt_lines.append(f"{role}: {part['text']}")
                            elif "function_call" in part:
                                fc = part["function_call"]
                                prompt_lines.append(f"{role} → TOOL CALL: {fc['name']}({fc['args']})")
                            elif "function_response" in part:
                                fr = part["function_response"]
                                resp_text = "..."  # truncated
                                if "response" in fr and "result" in fr["response"]:
                                    artifacts = fr["response"]["result"].get("artifacts", [])
                                    for a in artifacts:
                                        for p in a.get("parts", []):
                                            if p.get("kind") == "text":
                                                resp_text = p["text"][:200] + "..." if len(p["text"]) > 200 else p["text"]
                                prompt_lines.append(f"TOOL RESPONSE: {resp_text}")

                    result["full_prompt"] = "\n".join(prompt_lines)

    except Exception as e:
        print(f"Error in extraction: {e}")

    return result


# ============= USAGE EXAMPLE =============
trace = {...}  # your huge trace dict here

extracted = extract_custom_logic(x)

print("User Query:", extracted["user_query"])
print("\nCapital City:", extracted["capital_city"])
print("\nFinal Answer to User:\n")
print(extracted["final_answer"])
print("\nAll Agent Steps:")
for step in extracted["agent_steps"]:
    print(f"- {step['agent']} → {step['response'][:100]}...")

In [None]:


# ------------------------------------------------------------------
# 1. Extract the user query (always the first plain text from user)
# ------------------------------------------------------------------
user_query = next(
    (p["text"] for c in contents 
     if c.get("role") == "user" 
     for p in c.get("parts", []) 
     if isinstance(p, dict) and p.get("text")),
    "Unknown query"
)

# ------------------------------------------------------------------
# 2. Extract the real final answer (the last agent response that has text)
# ------------------------------------------------------------------
actual_output = "No final answer found."

# Go backwards through the conversation – the last meaningful tool response is our answer
for message in reversed(contents):
    if message.get("role") == "user":                     # tool responses appear as "user" in Vertex traces
        for part in message.get("parts", []):
            if "function_response" in part:
                resp = part["function_response"]["response"]
                if "result" in resp:
                    artifacts = resp["result"].get("artifacts", [])
                    for artifact in artifacts:
                        for subpart in artifact.get("parts", []):
                            if subpart.get("kind") == "text" and subpart.get("text"):
                                actual_output = subpart["text"].strip()
                                break  # stop at the first (i.e. last in reverse) text we find
                    if actual_output != "No final answer found.":
                        break
    if actual_output != "No final answer found.":
        break

# ------------------------------------------------------------------
# Now you can use `user_query` and `actual_output` directly
# ------------------------------------------------------------------
print("User query   :", user_query)
print("Final answer :", actual_output)

TypeError: 'ellipsis' object is not subscriptable