# Planner agent

The Planner Agent acts as the strategic core of the RCA workflow. It receives all identified symptoms from the Triage Agent and performs a holistic analysis to identify correlations and potential causal links. Based on this analysis, it generates a unified, de-duplicated, and prioritized investigation plan, which is then distributed to parallel RCA Workers for execution. This centralized planning step transforms the diagnostic process from a reactive to a strategic operation, significantly improving efficiency and accuracy.

In [None]:
from dotenv import load_dotenv
import os
# Resolve the repository root as the directory two levels above the current
# working directory (the notebook is assumed to be run from its own folder).
root_dir = os.path.abspath(os.path.join(os.getcwd(), '../..'))

# Load environment variables from the .env file in the repository root.
# NOTE(review): verbose=True should make python-dotenv warn when the file is
# missing - confirm against the installed version.
load_dotenv(os.path.join(root_dir, '.env'), verbose=True)

In [None]:
import sys

# Make the MCP-server directory importable from this notebook.
# An existing entry is removed before inserting so that re-running the cell
# keeps exactly one entry, still at the front of sys.path, instead of adding a
# duplicate on every execution.
mcp_server_path = os.path.abspath(os.path.join(os.getcwd(), '../../MCP-server'))
if mcp_server_path in sys.path:
    sys.path.remove(mcp_server_path)
sys.path.insert(0, mcp_server_path)

## Build the agent

In [None]:
from pydantic import BaseModel, Field
from typing import List, Literal

# Input record for the planner: one observed problem on a single pod or service.
# NOTE(review): pydantic includes the class docstring and the Field descriptions
# in the generated JSON schema, so rewording them changes the schema text too.
class Symptom(BaseModel):
    """A symptom observed in the Kubernetes cluster"""
    # Short label of what was observed.
    potential_symptom: str = Field(..., description="Type of symptom observed")
    # Restricted to the two resource kinds handled by get_resource_dependencies.
    resource_type: Literal["pod", "service"] = Field(..., description="Type of resource experiencing the issue")
    # Name of the pod or service; used as the lookup key for dependencies.
    affected_resource: str = Field(..., description="Name of the resource experiencing the issue")
    # Free-text justification; planner_agent copies it into the prompt.
    evidence: str = Field(..., description="Evidence supporting this symptom identification")

In [None]:
# One unit of investigation work. Instances are created as items of
# RCATaskList, which is the schema bound to the LLM's structured output.
class RCATask(BaseModel):
    """A RCA task to be performed by the RCA agent"""
    # What to investigate and why.
    investigation_goal: str = Field(..., description="Goal of the investigation")
    # The planner prompt asks for the bare resource name, without namespace.
    target_resource: str = Field(..., description="Name of the resource to investigate")
    resource_type: Literal["pod", "service"] = Field(..., description="Type of resource being investigated")
    # Optional; defaults to an empty list when no tools are suggested.
    suggested_tools: List[str] = Field(default_factory=list, description="List of tools suggested for the investigation")

# Wrapper model used as the structured-output schema for the planner LLM.
# The docstring is part of the schema description the model receives, so the
# typo in it ("tasks o be") is fixed here.
class RCATaskList(BaseModel):
    """A list of RCA tasks to be performed by the RCA agent in parallel"""
    # Defaults to an empty list when no tasks are produced.
    rca_tasks: List[RCATask] = Field(default_factory=list, description="List of RCA tasks to be performed")

In [None]:
from typing import TypedDict, List, Literal, Annotated

class PlannerAgentState(TypedDict):
    """State schema of the planner graph."""
    # Application context; planner_agent interpolates these into the prompt.
    app_name: str
    app_summary: str
    target_namespace: str
    # Input: symptoms for which investigations are planned.
    symptoms: List[Symptom]
    # Output: tasks written back by planner_agent.
    rca_tasks: List[RCATask]

In [None]:
from api.k8s_api import K8sAPI
from api.datagraph import DataGraph

def _pod_names_for_service(k8s_api: K8sAPI, service_name: str) -> list:
    """Return the names of the pods reported for *service_name*."""
    pods = k8s_api.get_pods_from_service(service_name)
    return [pod["pod_name"] for pod in pods["pods"]]


def get_resource_dependencies(symptom: Symptom) -> dict:
    """Collect the data and infrastructure dependencies of a symptom's resource.

    For a pod, the first service reported for that pod is used as the lookup
    key in the data graph; for a service, the service name is used directly.

    Args:
        symptom: The symptom whose affected resource should be inspected.

    Returns:
        A dict that always contains ``resource_name`` and ``resource_type``.
        ``data_dependencies`` (entries with ``service`` and ``pods``) and
        ``infra_dependencies`` (entries with ``service``, ``dependency_type``
        and ``pods``) are present only when the corresponding lookup returned
        something. If a pod is not backed by any service, only the two base
        keys are returned.
    """
    result = {
        "resource_name": symptom.affected_resource,
        "resource_type": symptom.resource_type,
    }

    k8s_api = K8sAPI()

    if symptom.resource_type == "pod":
        lookup = k8s_api.get_services_from_pod(symptom.affected_resource)
        owning_services = lookup["services"]
        if not owning_services:
            # A pod without a service has no key for the data graph; return
            # the partial result instead of failing on an empty list.
            return result
        service = owning_services[0]["service_name"]
    else:
        service = symptom.affected_resource

    datagraph = DataGraph()

    data_dependencies = datagraph.get_services_used_by(service)
    infra_dependencies = datagraph.get_dependencies(service)

    # A plain string would be iterated character by character, which is never
    # a meaningful list of services, so it is treated as "no dependencies".
    # NOTE(review): assumes get_services_used_by yields service names - confirm.
    if data_dependencies and not isinstance(data_dependencies, str):
        result["data_dependencies"] = [
            {"service": dep, "pods": _pod_names_for_service(k8s_api, dep)}
            for dep in data_dependencies
        ]

    # Only a non-empty mapping of name -> type is treated as a result here.
    if isinstance(infra_dependencies, dict) and infra_dependencies:
        result["infra_dependencies"] = [
            {
                "service": dep_name,
                "dependency_type": dep_type,
                "pods": _pod_names_for_service(k8s_api, dep_name),
            }
            for dep_name, dep_type in infra_dependencies.items()
        ]

    return result

In [None]:
# Sample symptom used to exercise the planner further down in the notebook:
# a crashing container in a single pod.
symptom = Symptom(
    potential_symptom= "Container repeatedly crashing (terminated with error)",
    resource_type="pod",
    affected_resource="geo-6b4b89b5f5-rsrh7",
    evidence="Container 'hotel-reserv-geo' reported 'Terminated With Error' (reason: Error) with exit_code=2 and restart_count=2 (pod_phase: Running)."
)

In [None]:
from langchain_openai import ChatOpenAI

# Chat model used by the planner.
gpt5mini = ChatOpenAI(model="gpt-5-mini")

# Bind the RCATaskList schema so the chain yields a structured task list
# (planner_agent reads `.rca_tasks` from the result).
llm_for_tasks = gpt5mini.with_structured_output(RCATaskList)

In [None]:
import json
from langchain_core.prompts import ChatPromptTemplate

# Prompt for the planner: a fixed system message describing the available
# tools, planning guidelines and task format, followed by a human message whose
# only template variable is `human_input` (filled in by planner_agent).
planner_prompt_template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are an expert Site Reliability Engineer planning RCA investigations.

Your task is to analyze identified symptoms and create a list of RCA tasks for parallel execution.

**Available Tools:**

*Kubernetes Inspection:*
- kubectl_get: Get/list Kubernetes resources
- kubectl_describe: Describe resource details
- get_pods_from_service: Get pods belonging to a service
- get_cluster_pods_and_services: Get cluster overview

*Observability & Dependencies:*
- get_logs: Retrieve pod/service logs
- get_traces: Get traces with error filtering
- get_trace: Get detailed trace by ID
- get_metrics: Get current metrics (CPU, memory, network)
- get_metrics_range: Get historical metrics
- get_services_used_by: Get downstream service dependencies
- get_dependencies: Get infrastructure dependencies (databases, etc.)

**Guidelines:**
1. Each task should target ONE specific resource and investigation area
2. Suggest tools most likely to reveal the root cause based on symptom type
3. De-duplicate: if multiple symptoms share a resource, investigate that resource ONCE
4. Prioritize by likelihood of revealing root cause:
   - Pod crashes/errors → get_logs, kubectl_describe, get_metrics
   - High latency → get_traces, get_services_used_by, get_metrics
   - Connectivity issues → get_services_used_by, get_dependencies, kubectl_describe

**Task Format:**
- investigation_goal: Clear, specific goal (what to investigate and why)
- target_resource: The specific resource name (ONLY the exact name, no namespace or other prefixes)
- resource_type: "pod" or "service"
- suggested_tools: List of relevant tools (start with most impactful)

**IMPORTANT: Resource Names**
- Provide ONLY the exact resource name in `target_resource`
- Do NOT include namespace prefix (e.g., use "geo-6b4b89b5f5-rsrh7" NOT "test-hotel-reservation/geo-6b4b89b5f5-rsrh7")
- Do NOT include any other qualifiers or decorations
""",
        ),
        ("human", "{human_input}"),
    ]
)

In [None]:
def _render_symptom_section(index: int, symptom_dict: dict, deps: dict) -> str:
    """Render one symptom and its dependencies as a markdown section."""
    parts = [
        f"## Symptom {index}\n\n",
        f"**Type**: {symptom_dict['potential_symptom']}\n\n",
        f"**Resource**: `{symptom_dict['affected_resource']}` (`{symptom_dict['resource_type']}`)\n\n",
        f"**Evidence**:\n{symptom_dict['evidence']}\n\n",
    ]

    data_deps = deps.get("data_dependencies")
    infra_deps = deps.get("infra_dependencies")

    if data_deps:
        parts.append(f"**Data Dependencies**:\n```json\n{json.dumps(data_deps, indent=2)}\n```\n\n")

    if infra_deps:
        parts.append(f"**Infrastructure Dependencies**:\n```json\n{json.dumps(infra_deps, indent=2)}\n```\n\n")

    # Also covers a key that is present but empty, so the section always says
    # something about dependencies.
    if not data_deps and not infra_deps:
        parts.append("**Dependencies**: None found\n\n")

    parts.append("---\n\n")
    return "".join(parts)


def planner_agent(state: PlannerAgentState):
    """Create RCA investigation tasks from symptoms and their dependencies.

    Each symptom is enriched with the dependencies of its affected resource,
    everything is rendered into one markdown prompt, and the planner chain is
    asked for a structured task list.

    Args:
        state: Planner state; reads ``symptoms``, ``app_name``,
            ``target_namespace`` and ``app_summary``.

    Returns:
        A state update of the form ``{"rca_tasks": [...]}``. The list is empty
        when there are no symptoms, in which case the LLM is not called.
    """
    symptoms = state["symptoms"]

    if not symptoms:
        return {"rca_tasks": []}

    # Application context header of the human message.
    human_parts = [
        "# Application Context\n\n",
        f"- **Application**: {state['app_name']}\n",
        f"- **Namespace**: `{state['target_namespace']}`\n",
        f"- **Summary**: {state['app_summary']}\n\n",
        "---\n\n",
        "# Symptoms to Investigate\n\n",
    ]

    # The dependency lookup only uses the resource type and name, so symptoms
    # on the same resource share a single lookup instead of repeating the
    # cluster and data-graph queries.
    dependency_cache = {}
    for index, symptom in enumerate(symptoms, 1):
        cache_key = (symptom.resource_type, symptom.affected_resource)
        if cache_key not in dependency_cache:
            dependency_cache[cache_key] = get_resource_dependencies(symptom)
        human_parts.append(
            _render_symptom_section(index, symptom.model_dump(), dependency_cache[cache_key])
        )

    human_input = "".join(human_parts)

    # Create and invoke chain
    planner_chain = planner_prompt_template | llm_for_tasks
    task_list = planner_chain.invoke({"human_input": human_input})

    return {"rca_tasks": task_list.rca_tasks}  # type: ignore


In [None]:
from langgraph.graph import START, END, StateGraph
from IPython.display import Image, display

# Build the planner graph: a single "planner" node wired START -> planner -> END,
# with PlannerAgentState as the state schema.
builder = StateGraph(PlannerAgentState)
builder.add_node("planner", planner_agent)
builder.add_edge(START, "planner")
builder.add_edge("planner", END)

# Compile into the runnable graph that run_planner_agent receives below.
planner_graph = builder.compile()

# Visualize the compiled graph inline as a PNG.
# NOTE(review): PNG rendering may depend on an external Mermaid service and
# fail offline - confirm for the installed LangGraph version.
display(Image(planner_graph.get_graph(xray=True).draw_mermaid_png()))


In [None]:
import time
from IPython.display import Markdown

def run_planner_agent(graph, app_name: str, app_summary: str, target_namespace: str, symptoms: List[Symptom], trace_name: str | None = None):
    """Execute the planner graph once and time the run.

    Args:
        graph: Object exposing ``invoke(state, config)``, e.g. the compiled
            planner graph.
        app_name: Application name placed in the initial state.
        app_summary: Application description placed in the initial state.
        target_namespace: Namespace placed in the initial state.
        symptoms: Symptoms to plan investigations for.
        trace_name: Optional run name; when given it is passed to the graph as
            ``run_name`` in the config.

    Returns:
        A ``(result, execution_time)`` tuple: whatever ``graph.invoke``
        returned and the elapsed time in seconds.
    """
    initial_state = {
        "app_name": app_name,
        "app_summary": app_summary,
        "target_namespace": target_namespace,
        "symptoms": symptoms,
        "rca_tasks": [],
    }

    # Configuration for the graph execution
    config: dict = {"recursion_limit": 50}
    if trace_name:
        config["run_name"] = trace_name

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments during the run (unlike time.time()).
    started = time.perf_counter()
    result = graph.invoke(initial_state, config)
    execution_time = time.perf_counter() - started

    return result, execution_time


def display_rca_tasks(result):
    """Render the planned RCA tasks as a Markdown report.

    Args:
        result: Mapping holding the planned tasks under ``"rca_tasks"``; each
            task exposes ``investigation_goal``, ``target_resource``,
            ``resource_type`` and ``suggested_tools``.

    Returns:
        An IPython ``Markdown`` object with a header showing the task count,
        followed by one section per task.
    """
    planned = result["rca_tasks"]

    # Header first, then one chunk per task; everything is joined at the end.
    chunks = [
        f"\n# 📋 RCA Investigation Plan\n\n**Total Tasks**: {len(planned)}\n\n---\n\n"
    ]

    for number, item in enumerate(planned, start=1):
        tool_list = ", ".join(f"`{tool}`" for tool in item.suggested_tools)
        chunks.append(
            f"\n## Task {number}\n\n"
            f"**Goal**: {item.investigation_goal}\n\n"
            f"**Target**: `{item.target_resource}` ({item.resource_type})\n\n"
            f"**Tools**: {tool_list}\n\n"
            "---\n"
        )

    return Markdown("".join(chunks))

In [None]:

# Get experiment name for tracing (prompts interactively; blocks until answered)
experiment_name = input("Enter experiment name: ")

# Fall back to a default name when the answer is empty or only whitespace.
if experiment_name.strip() == "":
    experiment_name = "Planner agent"

# Application description passed to the planner as context for the prompt.
app_summary = """
The application implements a hotel reservation service, build with Go and gRPC. 
The initial project is extended in several ways, including adding back-end in-memory 
and persistent databases, adding a recommender system for obtaining hotel recommendations, 
and adding the functionality to place a hotel reservation.
"""

target_namespace = "test-hotel-reservation"

# Create symptoms list with the test symptom defined earlier in the notebook
test_symptoms = [symptom]

# Execute planner; the experiment name is forwarded as the trace/run name.
result, exec_time = run_planner_agent(
    graph=planner_graph,
    app_name="Hotel Reservation Service",
    app_summary=app_summary,
    target_namespace=target_namespace,
    symptoms=test_symptoms,
    trace_name=experiment_name
)

print(f"✅ Planning completed in {exec_time:.2f} seconds")

# Display the plan as rendered Markdown
display(display_rca_tasks(result))

In [None]:
# Show the raw task objects returned by the planner (notebook cell output).
result["rca_tasks"]