# Creating a Coding AI Agent using MCP

In this notebook we will build an AI agent that writes code automatically and corrects itself.
The idea is that we can give our agent tasks such as "evaluate data xy", and it will automatically write, execute, and analyze the code.
In later steps we will extend the agent with more functionality.

> **Note:** This notebook has a more complex setup and uses some advanced Python features.
> You don't need to understand every detail here; what matters is understanding the overall steps.

We are using the **Model Context Protocol (MCP) Framework**, introduced by Anthropic in late 2024, for agentic AI. It is based on a client-server architecture: the client (here in the notebook) sends requests to the servers (running in the background).
Key components of this framework are:
- **Resources** → external files and data the agent can use (e.g., CSV files in `resources/data/`).  
- **Prompts** → standardized prompt/text templates that guide the AI when planning, executing tasks, or summarizing results (in `prompts/` folder).  
- **Tools** → the actual functions exposed by servers (e.g., `code.run`, `code.inspect_csv`, `fridge.add`). These are what the agent calls to get work done.
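
Under the hood, the client and the servers exchange JSON-RPC 2.0 messages; the MCP SDK builds and sends them for you. As a rough sketch of what a tool call looks like on the wire (the helper name `make_tool_call` is hypothetical, and the real SDK may add further fields), a request to the `code.run` tool could be built like this:

```python
import json


def make_tool_call(request_id: int, tool_name: str, arguments: dict) -> str:
    """Build one JSON-RPC 2.0 'tools/call' request (hypothetical helper)."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,            # the server's response carries the same id
        "method": "tools/call",      # MCP method for invoking a tool
        "params": {"name": tool_name, "arguments": arguments},
    }
    # Over the stdio transport, each message is sent as a single line of JSON
    return json.dumps(message) + "\n"


line = make_tool_call(1, "code.run", {"code": "print(21 + 12)"})
print(line, end="")
```

Because `json.dumps` escapes any newlines inside strings, the code argument never breaks the one-message-per-line framing.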

Feel free to take a look at the folders to get a feel for the content of the files!


**Folder Structure:**

```text
├─ agent.ipynb                  # ← You are here
├─ servers/
│  ├─ code_server.py            # Tools the agent can call (e.g., run code, inspect CSVs)
│  └─ ...                       # (optional) more servers, e.g., search/fridge
├─ resources/
│  ├─ data/
│  │  ├─ supermarket_sales.csv  # Example CSVs to analyze
│  │  └─ ...
│  └─ runs/                     # Agent outputs: code, logs, plots, result tables
│     └─ ...
├─ prompts/                     # Lightweight prompt templates (planning, repair, etc.)
│  ├─ error_repair.md
│  └─ ...
└─ utils/
   ├─ agent_support.py          # Small helpers to keep the notebook clean (mostly technical details - not important for general understanding)
   └─ multiprocess_utils.py     # Further helpers to manage multiprocessing  (mostly technical details - not important for general understanding)

```


## Installations and Imports

In [None]:
!pip install -U "mcp>=1.13.1" "openai>=1.0.0" nest-asyncio ipywidgets matplotlib pandas ddgs numpy

import atexit
import asyncio
import contextlib
from datetime import datetime, timedelta
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List
import nest_asyncio; nest_asyncio.apply()
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from utils.multiprocess_utils import sweep_stale_servers_in_dir
import pandas as pd

# Setup for OpenAI API
os.environ.setdefault("AZURE_OPENAI_API_KEY", "<YOUR_AZURE_OPENAI_API_KEY>") # set your own key here or as an environment variable
os.environ.setdefault("AZURE_OPENAI_ENDPOINT", "https://cas-dml-llm.openai.azure.com/")
os.environ.setdefault("AZURE_OPENAI_DEPLOYMENT", "gpt-4o") # Available models: "gpt-35-turbo" or "gpt-4o"
os.environ.setdefault("AZURE_OPENAI_API_VERSION", "2024-10-21")


from utils.agent_support import * # all helper functions from agent_support.py file

ROOT = Path().resolve()
SERVERS_DIR = ROOT / "servers"
os.environ["ROOT"] = str(ROOT)

# (IGNORE) manages processes in the background
atexit.register(lambda: asyncio.get_event_loop().run_until_complete(teardown()))
sweep_stale_servers_in_dir(SERVERS_DIR)

## Overview and Core Function `run_task`
Before we dive into implementation details, let's look at the overview of the agent we want to implement, shown in this graphic.


![agent_overview](resources/agent_overview.png)

The process of answering a request is managed by the `run_task` function, which first makes a plan and then executes it step by step.
It is implemented in the cell below. For now, ignore the details of the execution step; they are explained in the following sections.
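
To see the core idea without MCP or an LLM, here is a stripped-down sketch of the same plan-then-execute loop. The handlers `toy_compute` and `toy_summarize` and the registry `TOY_REGISTRY` are hypothetical stand-ins for the real handlers defined later in this notebook. The sketch runs as a plain script; inside the notebook you would write `await toy_run(plan)` instead of `asyncio.run(...)`.

```python
import asyncio
from typing import Any, Dict, List


# Hypothetical stand-ins for the real handlers defined later
async def toy_compute(step: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {"stdout": str(17 * 23)}


async def toy_summarize(step: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {"answer_text": f"The result is {ctx['stdout']}."}


TOY_REGISTRY = {"compute": toy_compute, "summarize": toy_summarize}


async def toy_run(plan: List[Dict[str, Any]]) -> Dict[str, Any]:
    ctx: Dict[str, Any] = {}  # shared context, filled step by step
    for step in plan:
        # Like run_task: stop if a step needs a key no earlier step produced
        missing = [k for k in step.get("use", []) if k not in ctx]
        if missing:
            raise KeyError(f"missing inputs: {missing}")
        out = await TOY_REGISTRY[step["intent"]](step, ctx)
        ctx.update(out)  # later steps can read earlier outputs (no "save" renaming here)
    return ctx


plan = [
    {"intent": "compute", "save": ["stdout"]},
    {"intent": "summarize", "use": ["stdout"], "save": ["answer_text"]},
]
result = asyncio.run(toy_run(plan))
print(result["answer_text"])  # The result is 391.
```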



In [None]:
async def run_task(task_text: str, max_attempts: int = 5):

    # (0) Received task
    print_step("START Task")
    print(f"User task: '{task_text}'")

    # (1) Make plan how to solve task
    plan = await plan_task(task_text)
    print_plan(plan)

    # Is the task doable with our available functionalities?
    if plan[0]["intent"] == "out_of_scope":
        print("Requested task is considered as 'out-of-scope' for the agent.")
        return

    # (2) Execute the plan step by step
    ctx = {"user_task": task_text, "plan": plan} # context in which all steps can write intermediate results
    for i, step in enumerate(plan, 1):
        intent = step["intent"]
        print_step(f"STEP {i}: {intent}")

        # Check if the collected ctx contains everything needed for next step
        needed = step.get("use") or []
        missing = [k for k in needed if k not in ctx]
        if missing:
            print(f"Missing inputs {missing}. Available: {list(ctx.keys())}")
            return

        # Get handler for current step and execute it
        handler = INTENT_REGISTRY[intent]['fn']
        out = await handler(step=step, ctx=ctx, max_attempts=max_attempts)
        out = out or {}

        # Save intermediate outputs under requested names if provided
        save_keys = step.get("save") or list(out.keys())
        if save_keys and out:
            for new_name, (k, v) in zip(save_keys, out.items()):
                ctx[new_name] = v
        ctx.update(out) # also adding raw out keys

        compact_preview(out)

    # (3) Print final output/answer
    print_step("DONE with Task ✅")
    show_final_outputs({k: ctx[k] for k in step["output"] if k in ctx})

## Task Planner
As seen above, the first step relies on a central component: the **planner**. Given the initial request and the set of available tools (intents), the planner uses an LLM to decide which steps must be carried out, one after another, to complete the task. For example, the plan for a standard CSV-analysis request could look like this:

```json
[
  {"intent": "csv_analytics", "params": {"wants_plot": true}, "save": ["result_csv", "plot_png"]},
  {"intent": "summarize", "use": ["result_csv", "plot_png"], "save": ["answer_text"], "output": ["answer_text", "plot_png"]}
]
```
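
Because the plan is plain JSON, its wiring can be checked mechanically: every key a step lists under `use` should have been listed under `save` by an earlier step. The sketch below (a hypothetical `check_plan` helper, stricter than the notebook itself, which only checks the context at execution time) parses a plan and enforces that rule:

```python
import json
from typing import Any, Dict, List


def check_plan(plan_json: str, allowed_intents: List[str]) -> List[Dict[str, Any]]:
    """Parse a plan and verify intents and the use/save wiring between steps."""
    plan = json.loads(plan_json)
    if isinstance(plan, dict):  # a single step may come back without a list
        plan = [plan]
    saved: set = set()  # keys saved by the steps checked so far
    for i, step in enumerate(plan, 1):
        if not isinstance(step, dict) or step.get("intent") not in allowed_intents:
            raise ValueError(f"step {i}: unknown or missing intent")
        missing = [k for k in step.get("use", []) if k not in saved]
        if missing:
            raise ValueError(f"step {i} uses {missing} before any step saves them")
        saved.update(step.get("save", []))
    return plan


example = """[
  {"intent": "csv_analytics", "params": {"wants_plot": true}, "save": ["result_csv", "plot_png"]},
  {"intent": "summarize", "use": ["result_csv", "plot_png"], "save": ["answer_text"], "output": ["answer_text", "plot_png"]}
]"""
plan = check_plan(example, ["csv_analytics", "summarize"])
print(len(plan), "steps OK")  # 2 steps OK
```

The example plan passes because `result_csv` and `plot_png` are saved in step 1 before step 2 uses them.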

The following function implements this.

In [None]:
async def plan_task(task_text: str) -> List[Dict[str, Any]]:
    """
    Creating a plan via asking the LLM to propose a 1–5 step plan using registered intents.
    Returns a LIST of 1-5 steps. Each step:
      {
        "intent": "code_general" | "csv_analytics" | ... ,
        "params": {"...": "..." },      # step-specific params
        "use": ["..."],                 # optional: ctx keys this step expects
        "save": ["..."],                # optional: keys to store outputs under
        "output" : ["..."]              # optional: for the last step, specify the overall final output
      }
    """
    print_step("PLANNING Task")

    # Collect additional infos we need for the prompt
    allowed_intents = list(INTENT_REGISTRY.keys())
    csvs = await available_csvs()

    # Creating the prompt using the user's request, the available intents, and csvs
    planning_prompt = f"""
You are a concise planner for an agentic notebook.
Create a SHORT plan (1–5 steps) to solve the USER_TASK.
Keep it simple; many tasks are solvable with 1-3 steps.
Use ONLY these intents for the planning steps (including potential use and save values they have):
{intent_specs_text(INTENT_REGISTRY)}
If totally unrelated to these, return a SINGLE STEP with intent "out_of_scope".

Step schema:
[
  {{
    "intent": "...",                 // one of: {allowed_intents} or 'out_of_scope'
    "params": {{ "...": "..." }},    // OPTIONAL step-specific params
    "use": ["..."],                  // OPTIONAL list of ctx keys this step expects
    "save": ["..."],                 // OPTIONAL list of keys to save outputs under
    "output" : ["..."]               // ONLY for the last step, specify the overall final output
  }},
  ...
]

For "params", "use", and "save" you can ONLY use the ones that are specified before for each intent !!!
All other parameters (like csv names) are managed in the functions! Don't introduce new params/use/save!

To help with your csv_analytics decision, these CSV files are available: {csvs}

Conventions:
- In the LAST step, add a key "output" with all displayable results (only keys that appear in the "save" values of the steps).
- Use "summarize" to create a friendly final explanation from "stdout", "answer_text",  "result_csv", or "plot_png" to check if there are plots.
- Make sure if steps require ctx keys, that previous steps save those
- DO NOT add IDs; order is the execution order.
- Keep each step small and understandable.

USER_TASK: {task_text}
""".strip()

    # Query the LLM with the prompt
    content = await llm_chat(
        [
            {"role": "system", "content": load_prompts()['system_return_json']},
            {"role": "user",   "content": planning_prompt}
        ]
    )

    try:
        # Parse LLM output
        plan = json.loads(content)
        if isinstance(plan, dict):
            plan = [plan]
    
        # Check if all steps (intents) are valid
        final_plan = []
        for step in plan:
            if not isinstance(step, dict) or step.get("intent") not in (allowed_intents + ['out_of_scope']):
                raise ValueError(f"Invalid step: {step}")
            intent = step["intent"]
            final_plan.append({
                "intent": intent,
                "params": step.get("params") or {},
                "use": step.get("use") or [],
                "save": step.get("save") or [],
                "output": step.get("output") or []
            })
    except Exception:
        raise ValueError(f"The LLM returned an invalid planning output, got: {content}")
            
    return final_plan

## MCP Client

We now implement the functions and objects needed for the execution step. First of all, we need a **client** that communicates with the servers (see the graphic above).
We use an `MCPClient` wrapper that connects to the servers.
We can then add servers to this client.

Each server exposes a few **tools** (e.g., `code.run`, `code.inspect_csv`) the agent can call.
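
The `call` method of the client below does two small things for every tool call: it splits the selector string at the first colon, and it turns the tool's text reply into a dict (JSON if possible, otherwise `{"text": ...}`). Isolated as plain functions (hypothetical names, not part of the MCP SDK), that logic looks roughly like this:

```python
import json
from typing import Any, Tuple


def split_selector(selector: str) -> Tuple[str, str]:
    """Split 'server:tool' at the FIRST colon, so tool names like 'code.run' stay intact."""
    server, sep, tool = selector.partition(":")
    if not sep or not server or not tool:
        raise ValueError("Use 'server:tool', e.g. 'code:code.run'.")
    return server, tool


def decode_text_result(text: str) -> Any:
    """Parse a tool's text reply as JSON, or wrap plain text as {'text': ...}."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"text": text}


print(split_selector("code:code.run"))              # ('code', 'code.run')
print(decode_text_result('{"stdout_tail": "33"}'))  # {'stdout_tail': '33'}
print(decode_text_result("plain text reply"))       # {'text': 'plain text reply'}
```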


> **Quick note on asynchronous functions**
>
> Some functions in this notebook are **asynchronous**: while they wait (e.g., for a server or the LLM to respond), other work can continue.
> They are defined with the keyword `async`. Calling one does not run it right away but returns a coroutine, which you run with `await`.
>
> In Jupyter you can use `await` directly in a cell, as shown in the following cells.
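
A minimal, self-contained example of these rules (the function names are made up for illustration): `asyncio.sleep` stands in for waiting on a server or the LLM, and `asyncio.gather` awaits several coroutines concurrently while keeping their results in order.

```python
import asyncio


async def slow_square(x: int, delay: float = 0.1) -> int:
    await asyncio.sleep(delay)  # stands in for waiting on a server or an LLM
    return x * x


async def main() -> list:
    # Calling slow_square(3) only creates a coroutine; await actually runs it
    single = await slow_square(3)
    # gather runs several coroutines concurrently and returns results in call order
    many = await asyncio.gather(slow_square(1), slow_square(2), slow_square(4))
    return [single, *many]


results = asyncio.run(main())  # in a notebook cell: results = await main()
print(results)  # [9, 1, 4, 16]
```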


In [None]:
class MCPClient:
    def __init__(self, default_workspace_dir=ROOT, default_env=None):
        self.default_workspace_dir = default_workspace_dir
        self.default_env = default_env or {}
        self._cfg = {}
        self._state = {}
        self._stack = None

    # (IGNORE)
    async def __aenter__(self):
        if self._stack is None:
            self._stack = contextlib.AsyncExitStack()
            await self._stack.__aenter__()
        return self
    # (IGNORE)
    async def __aexit__(self, *exc):
        await self.aclose()
    # (IGNORE)
    async def aclose(self):
        if self._stack:
            await self._stack.aclose()
            self._stack = None
            self._state.clear()

    async def add_server(self, name, server_py, *, workspace_dir=None, env=None, print_tools=True):
        """Add a new MCP server to the agent with optional environment and workspace."""
        if self._stack is None:
            self._stack = contextlib.AsyncExitStack()
            await self._stack.__aenter__()

        if name in self._state:
            print(f"Server '{name}' already exists, no new server added.")
            return

        ws = str(workspace_dir or self.default_workspace_dir or "")
        self._cfg[name] = {"server_py": str(server_py), "workspace_dir": ws, "env": env or {}}

        env_all = {
            **os.environ,
            **self.default_env,
            "WORKSPACE_DIR": ws,
            **(env or {}),
        }
        params = StdioServerParameters(
            command=sys.executable,
            args=["-u", str(server_py)],
            env=env_all
        )

        stdio_r, write = await self._stack.enter_async_context(stdio_client(params))
        session = await self._stack.enter_async_context(ClientSession(stdio_r, write))
        await session.initialize()
        tools = [t.name for t in (await session.list_tools()).tools]

        self._state[name] = {"session": session, "stdio": stdio_r, "write": write, "tools": tools}

        if print_tools:
            print(f"Connected to server [{name}]. Available tools: {tools}")

    async def tools(self, name: str) -> list[str]:
        """Return available tools for a given server."""
        return list(self._state[name]["tools"])

    def servers(self) -> dict[str, list[str]]:
        """Return all available servers with their tools."""
        return {name: list(st.get("tools") or []) for name, st in self._state.items()}

    async def call(self, selector: str, args: dict):
        """Call a specific tool with given arguments. The selector format is 'server:tool' (e.g. 'code:code.run')"""
        try:
            server, tool = selector.split(":", 1)
        except ValueError:
            raise ValueError("Use 'server:tool', e.g. 'code:code.run'.")

        session = self._state[server]["session"]
        res = await session.call_tool(tool, args)

        for item in res.content:
            typ = getattr(item, "type", None)
            if typ == "json":
                return item.data
            if typ == "text":
                try:
                    return json.loads(item.text)
                except Exception:
                    return {"text": item.text}
        return {}

# Instantiating our client
client = MCPClient()

# Function to conveniently call the servers of our client
async def mcp(selector: str, **kwargs):
    return await client.call(selector, kwargs)

Having instantiated our client, we can now add our first server. This is how we add a server:

In [None]:
# This is how we add a new server to our client. Here we add the code_server:
await client.add_server("code", SERVERS_DIR / "code_server.py")

With the following call, we can see which servers we are connected to and which tools are available. So far there should be only one server called 'code':

In [None]:
# Inspecting all our available servers and the tools per server
client.servers()

Now we can call a server tool directly with some input (this is not our agent yet, just a plain server call to demonstrate the functionality).
In this case, the input is a short code snippet:

In [None]:
# Testing the server's coding functionality
test_code = """
a = 21
b = 12
print(a+b)
"""
# Selector format is 'server:tool'; here the server is 'code' and the tool is 'code.run':
out = await mcp("code:code.run", code=test_code)
print("Raw 'code:code.run' output:", out)
print("\nCode output:", out['stdout_tail'])

## Available resources (CSV files)

Next, let's explore the available data (in our case, CSV files). The files are located in `resources/data/`.
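
The notebook reads these files through the code server's `code.list_csvs` and `code.inspect_csv` tools, whose implementation is not shown here. Judging only from the fields the notebook uses (`filename`, `size_kb`, `path`, `n_rows`, `columns`, `head`), a local stand-in could look roughly like the sketch below. The function names are hypothetical, the sketch omits `dtypes`, and the real server may well use pandas instead of the `csv` module.

```python
import csv
from pathlib import Path
from typing import Any, Dict, List


def list_csvs(data_dir: Path) -> List[Dict[str, Any]]:
    """Return filename, size in KB and path for every CSV file in data_dir."""
    return [
        {"filename": p.name, "size_kb": round(p.stat().st_size / 1024, 1), "path": str(p)}
        for p in sorted(data_dir.glob("*.csv"))
    ]


def inspect_csv(path: str, n_head: int = 5) -> Dict[str, Any]:
    """Return the column names, the row count and the first n_head rows of a CSV file."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        columns = list(reader.fieldnames or [])  # reads the header row
        head: List[Dict[str, str]] = []
        n_rows = 0
        for row in reader:
            if n_rows < n_head:
                head.append(row)
            n_rows += 1
    return {"path": path, "n_rows": n_rows, "columns": columns, "head": head}
```

For example, `list_csvs(Path("resources/data"))` would list the example files such as `supermarket_sales.csv`, and each entry's `path` can be passed to `inspect_csv`.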

In [None]:
# Display our available data sources (csv files only in our case)
async def available_csvs():
    """List available CSV files in the workspace."""
    res = await mcp("code:code.list_csvs")
    csvs = res.get("csvs", [])
    return [csv['filename'] for csv in csvs]
    

# Function to peek into all available CSV files
async def peek_data(n_head: int = 5):
    """Preview all available CSV files"""
    print_step("Listing CSVs")
    res = await mcp("code:code.list_csvs")
    csvs = res.get("csvs", [])
    for i, it in enumerate(csvs, 1):
        print(f"{i}. {it['filename']}  ({it['size_kb']} KB)  @ {it['path']}")
    print_step("Inspecting CSVs")
    for csv in csvs:
        meta = await mcp("code:code.inspect_csv", path=csv["path"], n_head=n_head)
        print(f"path: {meta['path']}")
        print(f"rows: {meta['n_rows']}")
        print("columns:", meta["columns"])
        compact_df_preview(meta.get("head", []))
    return

await peek_data()
await available_csvs()

## Adding the coding functionality to our agent

The last missing components are the **handlers**.
Here we implement the coding handlers.
Before doing so, we define some helpers for the coding functionality.

These helpers let the agent:
- choose the best fitting CSV (with the help of an LLM),
- generate Python code for a given task (with the help of an LLM),
- run it, and if it fails, repair and retry a few times,
- save any plot (e.g., `plot.png`) and table results (e.g., `result.csv`),
- optionally validate that certain result columns exist.
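
The generate-run-repair idea from the list above can be sketched without an LLM or the MCP server. In this sketch, `run_snippet` (a hypothetical helper) runs code in a fresh Python interpreter via `subprocess`, and two stub callables play the roles of `_generate_code` and `_repair_code`. As in the notebook's handlers, only the last 200 characters of the error are passed on to the repair step.

```python
import subprocess
import sys
from typing import Any, Callable, Dict, Optional


def run_snippet(code: str, timeout: float = 30.0) -> Dict[str, Any]:
    """Run Python code in a fresh interpreter and capture its output (hypothetical helper)."""
    proc = subprocess.run([sys.executable, "-c", code],
                          capture_output=True, text=True, timeout=timeout)
    return {"success": proc.returncode == 0, "stdout": proc.stdout, "stderr": proc.stderr}


def generate_and_repair(generate: Callable[[], str],
                        repair: Callable[[str, str], str],
                        max_attempts: int = 3) -> Optional[Dict[str, Any]]:
    """Generate code once, then repair it after every failed run, up to max_attempts runs."""
    code, last_error = generate(), ""
    for attempt in range(1, max_attempts + 1):
        if attempt > 1:
            code = repair(code, last_error)  # in the notebook: an LLM call
        out = run_snippet(code)
        if out["success"]:
            return {"attempt": attempt, **out}
        last_error = out["stderr"][-200:]  # keep only the error tail
    return None  # all attempts failed


# Stubs standing in for the LLM: the first draft is broken, the "repair" fixes it
broken, fixed = "print(undefined_name)", "print(17 * 23)"
result = generate_and_repair(lambda: broken,
                             lambda prev, err: fixed if "NameError" in err else prev)
print(result["attempt"], result["stdout"].strip())  # 2 391
```

The first attempt fails with a `NameError`, the stub repair swaps in working code, and the second attempt succeeds.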

In [None]:
async def _choose_csv(task_text, csvs, max_cols=20, n_head=2):
    """Helper function to pick the best fitting CSV for a prompt"""

    # Collecting some data for each available CSV file
    inspected, candidates = [], []
    for i, it in enumerate(csvs, 1):
        m = await mcp("code:code.inspect_csv", path=it["path"], n_head=n_head)
        inspected.append((it, m))
        cols = m.get("columns", []) or []
        candidates.append({
            "index": i,
            "filename": it.get("filename"),
            "n_rows": m.get("n_rows", 0),
            "n_cols": len(cols),
            "columns": cols[:max_cols],
            "sample": (m.get("head", []) or [])[:1],
        })

    # Formulate prompt
    prompt = f"""
You are selecting the best CSV for this task.

TASK: {task_text}

Choose EXACTLY ONE candidate that best supports this task based on filename and basic metadata.
Return STRICT JSON ONLY (no prose, no wrapping in code etc., just plain json):
{{
  "index": <int from the list below>,
  "reason": "<one-line reason>"
}}

Candidates:
{json.dumps(candidates, indent=2)}
""".strip()
    # Ask the LLM for the best fitting CSV
    resp = await llm_chat(
        [{"role":"system","content":load_prompts()['system_return_json']},
         {"role":"user","content":prompt}]
    )
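    # Extract and validate the LLM's choice, then return the best match
    data = json.loads(resp)
    i = data.get("index")
    if not isinstance(i, int) or not 1 <= i <= len(inspected):
        raise ValueError(f"LLM returned an invalid CSV index: {resp}")
    chosen, meta = inspected[i-1]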
    print_step("Selecting CSV", level=2)
    print(f"LLM chose: {chosen['filename']} — {data.get('reason','')}")
    return chosen, meta

def _compose_user_prompt(task_text: str, plan: Dict[str,Any], csv_meta: Dict[str,Any] | None) -> str:
    """Helper function to compose a coding prompt"""
    if plan["intent"] == "csv_analytics" and csv_meta:
        return f"""
USER TASK:
{task_text}

INTENT: {plan['intent']}
OUTPUT PREFERENCE: {plan.get('result_preference','text')}
WANTS_PLOT: {plan.get('wants_plot', False)}

CSV CONTEXT:
path: {csv_meta.get('path')}
columns: {json.dumps(csv_meta.get('columns', []), indent=2)}
dtypes: {json.dumps(csv_meta.get('dtypes', {}), indent=2)}
head (sample rows):
{json.dumps(csv_meta.get('head', [])[:5], indent=2)}

Produce ONLY the Python code (no explanations, no backticks).
Follow the intent's rules. If table-like, save 'result.csv'. If a chart, save 'plot.png'.
""".strip()
    else:
        return f"""
USER TASK:
{task_text}

INTENT: {plan['intent']}
OUTPUT PREFERENCE: {plan.get('result_preference','text')}
WANTS_PLOT: {plan.get('wants_plot', False)}

Produce ONLY the Python code (no explanations, no backticks).
- If scalar or short text, print as: print("ANSWER:", value_or_text)
- If table-like, save 'result.csv'
- If a chart is appropriate, save 'plot.png'
""".strip()


async def _generate_code(task_text: str, plan: Dict[str,Any], csv_meta: Dict[str,Any] | None) -> str:
    """Helper function to generate code given the task and metadata"""
    print_step("Generating code", level=2)

    resp = await llm_chat(
        [{"role":"system","content": load_prompts().get(f"system_{plan['intent']}","")},
         {"role":"user","content": _compose_user_prompt(task_text, plan, csv_meta)}]
    )
    code = extract_code(resp)
    return code

async def _repair_code(prev_code: str, stdout_tail: str, stderr_tail: str, task_text: str, plan: Dict[str,Any], csv_meta: Dict[str,Any] | None) -> str:
    """Helper function to regenerate code given the task, metadata, and the previous code, output, and errors"""
    print_step("Repairing code", level=2)
    original_prompt = _compose_user_prompt(task_text, plan, csv_meta)
    csv_meta = csv_meta or {}  # handler_code_general passes csv_meta=None
    prompt = f"""
Prior attempt failed. Error tail:
{stderr_tail}

Prior Output tail:
{stdout_tail}

This was the original prompt (reminder):
{original_prompt}

CSV columns/dtypes:
{json.dumps(csv_meta.get('columns', []))} / {json.dumps(csv_meta.get('dtypes', {}))}

PREVIOUS CODE:
{prev_code}

""".strip()
    resp = await llm_chat(
        [{"role":"system","content": load_prompts()["system_code_csv_analytics"] + load_prompts()["error_repair"]},
         {"role":"user","content": prompt}]
    )
    return extract_code(resp)

### Coding handler functions

With these helpers we can now implement the **handlers**. The handlers wrap all the helper functions and server calls into one central function that fully executes one step (e.g. creating and verifying code).
Each handler receives the current **step** + shared **context**, runs the right tool(s),
and returns compact outputs (stdout, file paths, etc.). The agent stitches those together.

For now we define two coding handlers:

1. `code_general`, which is suited for any general coding task
2. `csv_analytics`, which is tailored for analyzing CSV files
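
Both handlers below accept `result_columns` either as a list or as a comma-separated string and, once the code runs successfully, ask the server's `code.validate` tool to check the result table. That tool's implementation is not shown in this notebook; a plausible local version (hypothetical helper names, assuming the check only looks at the CSV header) could look like this:

```python
import csv
from typing import Any, Dict, List, Optional, Union


def normalize_columns(value: Union[str, List[str], None]) -> List[str]:
    """Accept 'a, b' or ['a', 'b'] (as the handlers do) and return a clean list."""
    if isinstance(value, str):
        return [c.strip() for c in value.split(",") if c.strip()]
    return list(value or [])


def validate_result_csv(path: str, required: List[str]) -> Dict[str, Any]:
    """Check that the CSV header contains all required columns."""
    with open(path, newline="", encoding="utf-8") as f:
        header: Optional[List[str]] = next(csv.reader(f), None)
    header = header or []
    missing = [c for c in required if c not in header]
    return {"passed": not missing,
            "messages": [f"missing column: {c}" for c in missing]}
```

On failure, the `messages` would be fed back into the repair prompt, just as the handlers do with `val.get("messages", "")`.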

In [None]:
async def handler_code_general(step, ctx, max_attempts=5) -> Dict[str, Any]:
    """Iteratively generate/repair user-task code, run it in a persistent work dir, collect artifacts, and optionally validate outputs."""
    # Reading in relevant context
    params = step.get("params") or {}
    required_cols = params.get("result_columns") or []
    if isinstance(required_cols, str):
        required_cols = [c.strip() for c in required_cols.split(",") if c.strip()]
    task_text = ctx.get("user_task", "").strip()

    # Loop until success
    plan = {"intent":"code_general", **params}
    persistent_run_dir = None
    for attempt in range(1, max_attempts + 1):
        # Generating new code
        code = await _generate_code(task_text, plan, csv_meta=None) if attempt == 1 else \
               await _repair_code(last_code, last_out, last_errors, task_text, plan, csv_meta=None)
        last_code = code

        # Running the new code
        print_step(f"Attempt {attempt}: run code", level=2)
        last_run_out = await mcp("code:code.run", code=code, attempt=attempt, run_dir=persistent_run_dir)
        last_out = last_run_out.get("stdout_tail")

        # Directory where all files (python, png, outputs) of the code are stored
        if persistent_run_dir is None:
            persistent_run_dir = last_run_out.get("run_dir")
        print(f"Created and ran code for attempt {attempt} in folder {persistent_run_dir}")

        # Writing some relevant infos into the results
        result = {
            "stdout": last_out,
            "run_dir": persistent_run_dir
        }
        result.update(discover_run_artifacts(persistent_run_dir))

        # Check if code output is valid
        if last_run_out.get("success"):
            if required_cols and result.get("result_csv"):
                val = await mcp("code:code.validate", run_dir=persistent_run_dir, required_columns=required_cols)
                if val.get("passed"):
                    print("Validation OK.")
                    return result
                else:
                    last_errors = val.get("messages", "")
            else:
                return result
        else:
            last_errors = last_run_out.get("stderr_tail") or ""
        print("Got error: ", str(last_errors)[-200:])

    # If didn't succeed after all attempts, add error
    print_step("FAILED (code_general) ❌", level=2)
    result["error"] = "Could not successfully complete the coding step"
    return result

async def handler_csv_analytics(step, ctx, max_attempts=5) -> Dict[str, Any]:
    """Same as handler_code_general, but optimized for tasks that involve CSV analytics"""
    # Reading in relevant context
    params = step.get("params") or {}
    required_cols = params.get("result_columns") or []
    if isinstance(required_cols, str):
        required_cols = [c.strip() for c in required_cols.split(",") if c.strip()]
    task_text = ctx.get("user_task","").strip()

    # Choosing the best fitting CSV
    res = await mcp("code:code.list_csvs")
    csvs = res.get("csvs", [])
    chosen, csv_meta = await _choose_csv(task_text, csvs)
    os.environ['CSV_PATH'] = csv_meta["path"] # backup for more robust results

    # Loop until success
    plan = {"intent":"csv_analytics", **params}
    persistent_run_dir = None
    for attempt in range(1, max_attempts + 1):
        # Generating new code
        code = await _generate_code(task_text, plan, csv_meta) if attempt == 1 else \
               await _repair_code(last_code, last_out, last_errors, task_text, plan, csv_meta)
        last_code = code

        # Running the new code
        print_step(f"Attempt {attempt}: run CSV analytics", level=2)
        last_run_out = await mcp("code:code.run", code=code, attempt=attempt, run_dir=persistent_run_dir)
        last_out = last_run_out.get("stdout_tail")

        # Directory where all files (python, png, outputs) of the code are stored
        if persistent_run_dir is None:
            persistent_run_dir = last_run_out.get("run_dir")
        print(f"Created and ran code for attempt {attempt} in folder {persistent_run_dir}")

        # Writing some relevant infos into the results
        result = {
            "stdout": last_out,
            "run_dir": persistent_run_dir
        }
        result.update(discover_run_artifacts(persistent_run_dir))

        # Check if code output is valid
        if last_run_out.get("success"):
            if required_cols and result.get("result_csv"):
                val = await mcp("code:code.validate", run_dir=persistent_run_dir, required_columns=required_cols)
                if val.get("passed"):
                    print("Validation OK.")
                    return result
                else:
                    last_errors = val.get("messages", "")
            else:
                return result
        else:
            last_errors = last_run_out.get("stderr_tail") or ""
        print("Got error: ", str(last_errors)[-200:])

    # If didn't succeed after all attempts, add error
    print_step("FAILED (csv_analytics) ❌", level=2)
    result["error"] = "Could not successfully complete the CSV analytics step"
    return result

We add one more useful handler:

`summarize`, which takes any kind of output and summarizes it as text. It is also well suited for producing final outputs and answers.
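
The core of `summarize` is collecting the requested context entries into one labeled text blob that is then sent to the LLM. A simplified sketch of just that step (the name `collect_parts` is hypothetical; unlike the real handler below, it does not read the head of `result_csv` or check for `plot_png`):

```python
from typing import Any, Dict, List


def collect_parts(wants: List[str], ctx: Dict[str, Any]) -> str:
    """Join requested, non-empty context entries into one labeled text blob."""
    parts = [f"[{key}]\n{ctx[key]}" for key in wants if key != "error" and ctx.get(key)]
    if ctx.get("error"):  # earlier failures are always reported
        parts.append(f"[error before]\n{ctx['error']}")
    return "\n\n".join(parts) if parts else "(no inputs to summarize)"


blob = collect_parts(["stdout", "answer_text"], {"stdout": "391", "run_dir": "resources/runs/x"})
print(blob)  # prints "[stdout]" followed by "391" on the next line
```

Keys that are missing or empty in the context (here `answer_text`) are simply skipped, so the LLM only sees what earlier steps actually produced.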

In [None]:
async def handler_summarize(step, ctx, **kw) -> Dict[str, Any]:
    """Summarize prior outputs (e.g., answer_text / stdout / result_csv head)"""

    # Check what we want to consider and collect it in 'parts'
    wants = step.get("use") or []
    parts = []
    if "answer_text" in wants and ctx.get("answer_text"):
        parts.append(f"[answer_text]\n{ctx['answer_text']}")
    if "stdout" in wants and ctx.get("stdout"):
        parts.append(f"[stdout]\n{ctx['stdout']}")
    if "result_csv" in wants and ctx.get("result_csv"):
        df = pd.read_csv(ctx["result_csv"]).head(10)
        parts.append(f"[result_csv head]\n{df.to_csv(index=False)}")
    if "plot_png" in wants:
        parts.append(f"[plot_png] exists:\n{'plot_png' in ctx}")
    if "error" in ctx:
        parts.append(f"[error before]\n{ctx['error']}")

    # Formulate prompt and integrating the original user's request and the collected outputs
    blob = "\n\n".join(parts) if parts else "(no inputs to summarize)"
    prompt = f"""
    You are part of an agentic task chain. 
    The original user prompt was:\n{ctx.get("user_task")}\n
    This was the full derived plan to solve this (in order of the list): \n{ctx.get("plan")}\n
    Summarize these results:\n{blob}\n
    Give a short explanation and possibly some key takeaway if the output needs to be interpreted.
    If the input is trivial and/or unambiguous, just return it in a compact way and skip the takeaways.
    If there is mainly a plot as result, simply refer to it.
    It's possible that there have been errors before; keep that in mind and consider this in your response.
    Your output will be printed in the console, so keep that in mind for the formatting.
    """ 
    # Let LLM answer this prompt and return the result
    msg = await llm_chat(
        [{"role":"system","content":"Explain clearly and briefly. Avoid jargon."},
         {"role":"user","content": prompt}]
    )
    return {"answer_text": msg.strip()}

### Intent registry

To manage all functionalities and handlers, we introduce a global registry called `INTENT_REGISTRY`.
For each intent it specifies the handler function (`fn`), the accepted `params`, and which context keys the intent reads ("uses") and writes ("saves").
This makes it easy to select the right handler during execution and to give the planner all relevant information about each intent in the first place.
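
The planner receives this registry as text through `intent_specs_text`, a helper from `utils/agent_support.py` whose exact output format is not shown here. A plausible sketch of such a renderer (hypothetical name `describe_intents`) skips the `fn` entry, since a Python function object means nothing to the LLM, and lists description, params, uses, and saves:

```python
from typing import Any, Dict


def describe_intents(registry: Dict[str, Dict[str, Any]]) -> str:
    """Render each intent as planner-readable text; the 'fn' entry is deliberately skipped."""
    lines = []
    for name, spec in registry.items():
        lines.append(f"- {name}: {spec.get('description', '')}")
        for param, doc in (spec.get("params") or {}).items():
            lines.append(f"    param {param}: {doc}")
        lines.append(f"    uses: {spec.get('uses', [])}")
        lines.append(f"    saves: {spec.get('saves', [])}")
    return "\n".join(lines)


demo_registry = {
    "summarize": {"params": {}, "uses": ["stdout"], "saves": ["answer_text"],
                  "description": "summarize/clarify results from earlier steps"},
}
print(describe_intents(demo_registry))
```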

In [None]:
INTENT_REGISTRY = {}
INTENT_REGISTRY.update({
    "code_general": {
        "params": {
            "wants_plot": "Optional bool. If True, prefer producing a chart (save as plot.png).",
            "result_columns": "list[str]. (e.g. ['abc', 'def']) If provided, validate result_csv has these columns. Do a PROPER LIST here as indicated or don't provide this param at all!"
        },
        "uses": [],
        "saves": ["stdout", "run_dir", "result_csv", "plot_png", "error"],
        "fn": handler_code_general,
        "description": "write & run small Python code (no specific CSV expected and can't read from the provided CSVs)",
    },
    "csv_analytics": {
        "params": {
            "wants_plot": "Optional bool. If True, prefer producing a chart (save as plot.png).",
            "result_columns": "Optional list[str]. If provided, validate result_csv has these columns."
        },
        "uses": [],
        "saves": ["stdout", "run_dir", "result_csv", "plot_png", "error"],
        "fn": handler_csv_analytics,
        "description": "analyze a CSV and optionally produce a table (result_csv) and/or a plot (plot_png). If any CSV reading is required, use this.",
    },
    "summarize": {
        "params": {},
        "uses": ["answer_text", "stdout", "result_csv", "error"],
        "saves": ["answer_text"],
        "fn": handler_summarize,
        "description": "summarize/clarify results from earlier steps",
    },
})

This completes all the functionality our agent needs. You may now want to revisit the `run_task` function and look at the execution step in detail; you should now be able to follow its details!

## Running the agent

Let's now try the agent with some requests!

When the agent creates and runs code, you can inspect the generated files under `resources/runs/`.

> **Note:** At several points we parse LLM outputs into specific formats. Since LLM outputs can be arbitrary, this parsing is not guaranteed to always succeed. Moreover, this notebook is not designed to catch every possible *error*.
> Hence, it is likely that on certain requests the pipeline will crash.
>
> Usually **retrying or changing the prompt** solves this.
> 
> However, it is also interesting to analyze these crashes to understand why it fails! You can even try to **make it crash** with your prompts!
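
Many of these crashes come from `json.loads` failing because the model wrapped its answer in a Markdown code fence or added a sentence around it. A small defensive parser (a hypothetical helper, not used by the notebook as written) could be swapped in for the plain `json.loads` calls in `plan_task` or `_choose_csv`:

```python
import json
import re
from typing import Any

# Matches a Markdown code fence (three backticks), optionally tagged "json"
_FENCE = re.compile(r"`{3}(?:json)?\s*(.*?)\s*`{3}", re.DOTALL)


def parse_llm_json(text: str) -> Any:
    """Parse JSON from an LLM reply, tolerating a surrounding code fence."""
    match = _FENCE.search(text)
    candidate = match.group(1) if match else text.strip()
    return json.loads(candidate)  # still raises json.JSONDecodeError if invalid


reply = "Here is the plan:\n" + "`" * 3 + 'json\n[{"intent": "summarize"}]\n' + "`" * 3
print(parse_llm_json(reply))  # [{'intent': 'summarize'}]
```

It still raises on genuinely invalid output, so the retry-or-rephrase advice above continues to apply.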

In [None]:
# 1.1) A simple coding task
user_task = "calculate 17 times 23"
await run_task(user_task)

In [None]:
# 1.2) A more advanced analytics task
user_task = "use the supermarket csv data and figure out what's the most bought product. No plots needed, just the results"
await run_task(user_task)

In [None]:
# 1.3) Another analytics task with visualization
user_task = "check given the data, which swiss train station is the busiest one on average per year and find a proper plot to visualize this"
await run_task(user_task)

In [None]:
# 1.4) A challenging ML task. We didn't even design the agent for this, but it might be able to do it. However, this request might also fail.
user_task = "Use the bahnhof data and build a simple machine learning model that predicts the number of passengers in the future. Give me your prediction for Zurich HB 2025 and visualize the outcome. Do this in max two steps."
await run_task(user_task)

In [None]:
# 1.5) Ideas for further tasks the agent might be able to solve
#      (each assignment overwrites the previous one: keep the one you want and run `await run_task(user_task)`)
user_task = "plot the mandelbrot set"
user_task = "visualize all Swiss train stations on a map. make their size according to their average passenger number per year"
user_task = "draw an image with a house, a tree, and a sun"

**(TO-DO)** Try out your own task requests that might be solvable via coding! Be creative and explore the limits of this agent!

## Adding more functionality: Fridge management

Now that the coding functionality is in place, we can add further functionality to our agent!
In the next cells we will add (roughly) the same fridge-management logic as in the previous notebook.

As before, we define handler functions, register them in the `INTENT_REGISTRY`, add the corresponding server and try out new prompts!
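The pattern behind this is simple: each intent name maps to a dictionary whose `"fn"` entry is an async handler, and the agent looks up the handler for each planned step and awaits it. The self-contained toy version below illustrates the idea. It assumes a plan step is a dict with an `"intent"` key and a `"params"` dict, and that only the keys listed under `"saves"` are written back into a shared context; `TOY_REGISTRY`, `handler_echo` and `dispatch` are hypothetical names, and this is not the notebook's actual `run_task`.

```python
from typing import Any, Dict


async def handler_echo(step: Dict[str, Any], ctx: Dict[str, Any], **kw) -> Dict[str, Any]:
    """Toy handler with the same signature as the notebook's handlers."""
    text = (step.get("params") or {}).get("text", "")
    return {"answer_text": text, "stdout": text}


# Toy registry using the same layout as INTENT_REGISTRY
TOY_REGISTRY: Dict[str, Dict[str, Any]] = {
    "echo": {
        "params": {"text": "str. Required. Text to echo."},
        "uses": [],
        "saves": ["answer_text", "stdout"],
        "fn": handler_echo,
        "description": "Echo the given text.",
    },
}


async def dispatch(step: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
    """Look up the step's intent, run its handler and store the declared outputs in ctx."""
    entry = TOY_REGISTRY.get(step.get("intent", ""))
    if entry is None:
        return {"error": f"unknown intent: {step.get('intent')!r}"}
    out = await entry["fn"](step, ctx)
    # Assumption: only keys declared under "saves" are copied into the shared context
    for key in entry["saves"]:
        if key in out:
            ctx[key] = out[key]
    return out


# In a notebook cell (an event loop is already running there):
# ctx = {}
# await dispatch({"intent": "echo", "params": {"text": "hi"}}, ctx)
```

Keeping the handler signature identical for every intent is what lets new functionality, such as the fridge handlers below, be added by a registry entry alone.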

In [None]:
async def handler_fridge_add(step: Dict[str, Any], ctx: Dict[str, Any], **kw) -> Dict[str, Any]:
    """Adding items to the fridge (the fridge CSV file)"""
    # Reading in relevant context
    print_step("Fridge: add item", level=2)
    p = step.get("params") or {}
    item = (p.get("item") or "").strip()
    qty  = int(p.get("quantity") or 1) # default 1
    exp = str(p.get("expiry_date") or (datetime.today() + timedelta(days=14)).strftime("%Y-%m-%d")) # default 2 weeks from now
    if not item:
        msg = "Missing 'item' in params."
    else:
        # Execute addition functionality on the server
        out = await mcp("fridge:fridge.add", item=item, quantity=qty, expiry_date=exp)
        # Check whether the operation was successful and format the output message accordingly
        if out["ok"]:
            msg = f"Added {qty} × {item}" + (f" (exp {exp})" if exp else "")
        else:
            msg = f"error: {out['error']}, message: {out['message']}"
    print(msg)
    return {"answer_text": msg, "stdout": msg}

async def handler_fridge_remove(step: Dict[str, Any], ctx: Dict[str, Any], **kw) -> Dict[str, Any]:
    """Removing items from the fridge (the fridge CSV file)"""
    # Reading in relevant context
    print_step("Fridge: remove item", level=2)
    p = step.get("params") or {}
    item = (p.get("item") or "").strip()
    qty  = int(p.get("quantity") or 1) # default 1
    if not item:
        msg = "Missing 'item' in params."
    else:
        # Execute removal functionality on the server
        out = await mcp("fridge:fridge.remove", item=item, quantity=qty)
        # Check whether the operation was successful and format the output message accordingly
        if out["ok"]:
            msg = f"Removed {qty} '{item}'"
        else:
            msg = f"error: {out['error']}, message: {out['message']}"
            
    print(msg)
    return {"answer_text": msg, "stdout": msg}

async def handler_fridge_cost(step: Dict[str, Any], ctx: Dict[str, Any], **kw) -> Dict[str, Any]:
    """Estimating the total costs of the current fridge content using an LLM"""
    # Reading in relevant context
    print_step("Fridge: estimate total cost", level=2)
    inv = await mcp("fridge:fridge.list")
    rows = (inv or {}).get("rows", [])
    names = sorted({r.get("Item","") for r in rows if r.get("Item")})

    price_map_raw = {}
    if names:
        # Formulating prompt
        pricing_prompt = (
            "You estimate typical Swiss supermarket prices in CHF for everyday groceries.\n"
            "Return ONLY valid JSON: an object that maps each item name to a positive float (unit price in CHF).\n"
            "Use the EXACT keys I provide (same casing) and assume standard package sizes when relevant.\n\n"
            "Items to price:\n" + "\n".join(f"- {it}" for it in names)
        )
        # Getting price estimations from LLM
        resp = await llm_chat(
            [{"role":"system","content": load_prompts()["system_return_json"]},
             {"role":"user","content": pricing_prompt}]
        )
        print(resp)
        try:
            price_map_raw = json.loads(resp)
        except Exception:
            price_map_raw = {}

    # Create a proper price map
    price_map: Dict[str, float] = {}
    for k, v in (price_map_raw.items() if isinstance(price_map_raw, dict) else []):
        try:
            price_map[str(k)] = float(v)
        except Exception:
            continue

    # Execute cost calculation functionality on the server
    cost = await mcp("fridge:fridge.cost", prices=price_map)

    # Returning result (guard against an empty server response)
    cost = cost or {}
    total = cost.get("total_cost")
    cur = cost.get("currency", "CHF")
    missing = cost.get("missing_prices", [])
    msg = f"Estimated total: {total} {cur} (pricing_mode={cost.get('pricing_mode','unknown')})"
    if missing:
        msg += " | Missing: " + ", ".join(missing)
    print(msg)

    # Return compact text only (the per-item breakdown in cost["items"] is not passed on)
    return {"answer_text": msg, "stdout": msg}
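The handler above only collects and sanitizes the LLM's price guesses; the actual summation happens on the server in `fridge.cost`, whose code is not shown here. As a rough mental model, that computation could look like the sketch below. It assumes the fridge rows carry `Item` and `Quantity` columns and that unpriced items are reported rather than guessed; `estimate_total_cost` is a hypothetical name, and the real server may differ (for example in rounding, or in how it sets `pricing_mode`, which this sketch does not produce).

```python
from typing import Any, Dict, List


def estimate_total_cost(rows: List[Dict[str, Any]], prices: Dict[str, float]) -> Dict[str, Any]:
    """Sum quantity * unit price over fridge rows (hypothetical server-side logic)."""
    total = 0.0
    items: List[Dict[str, Any]] = []
    missing: List[str] = []
    for row in rows:
        name = str(row.get("Item", "")).strip()
        if not name:
            continue
        qty = int(row.get("Quantity") or 1)  # assumed column name; default 1
        price = prices.get(name)
        if price is None:
            # Track items the LLM did not price instead of inventing a value
            if name not in missing:
                missing.append(name)
            continue
        line_total = qty * price
        items.append({"Item": name, "Quantity": qty, "unit_price": price, "line_total": line_total})
        total += line_total
    return {"items": items, "total_cost": round(total, 2), "currency": "CHF", "missing_prices": missing}
```

Because the price map uses exact item names as keys, the pricing prompt's instruction to "use the EXACT keys I provide" matters: a key like `"tomato"` would not match a fridge row named `"Tomato"` and the item would end up in `missing_prices`.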

In [None]:
await client.add_server("fridge", SERVERS_DIR / "fridge_server.py")

INTENT_REGISTRY.update({
    "fridge_add": {
        "params": {
            "item": "str. Required. Name of the item to add.",
            "quantity": "int. Optional; default 1.",
            "expiry_date": "str. Optional ISO date (YYYY-MM-DD); default two weeks from today."
        },
        "uses": [],
        "saves": ["answer_text", "stdout"],
        "fn": handler_fridge_add,
        "description": "Add an item to the fridge with optional quantity and expiry date."
    },
    "fridge_remove": {
        "params": {
            "item": "str. Required. Name of the item to remove.",
            "quantity": "int. Optional; default 1.",
        },
        "uses": [],
        "saves": ["answer_text", "stdout"],
        "fn": handler_fridge_remove,
        "description": "Remove an item from the fridge by name."
    },
    "fridge_cost": {
        "params": {},
        # If your handler reads a prices table from a previous step, leave 'result_csv' here.
        # If not, you can set [].
        "uses": ["result_csv"],
        "saves": ["answer_text", "stdout"],
        "fn": handler_fridge_cost,
        "description": "Estimate total fridge cost using built-in or provided price guesses."
    },
})
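The planner LLM fills in these parameters as free text, so values like `"two"` for `quantity` or `"next Friday"` for `expiry_date` can occur. `handler_fridge_add` would then raise on `int(...)` or forward an invalid date to the server. The sketch below shows a stricter normalization step that applies only the defaults documented in the registry (quantity 1, expiry two weeks from today); `normalize_fridge_params` and its small number-word table are hypothetical additions, not part of the notebook.

```python
from datetime import date, timedelta
from typing import Any, Dict, Optional

# Small lookup for spelled-out quantities (illustrative, not exhaustive)
_NUMBER_WORDS = {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
                 "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10}


def normalize_fridge_params(params: Optional[Dict[str, Any]], today: Optional[date] = None) -> Dict[str, Any]:
    """Validate item/quantity/expiry_date and apply the registry defaults (hypothetical helper)."""
    p = params or {}
    today = today or date.today()
    item = str(p.get("item") or "").strip()
    if not item:
        raise ValueError("Missing 'item' in params.")

    raw_qty = p.get("quantity")
    if raw_qty is None or raw_qty == "":
        qty = 1  # registry default
    elif isinstance(raw_qty, str) and raw_qty.strip().lower() in _NUMBER_WORDS:
        qty = _NUMBER_WORDS[raw_qty.strip().lower()]
    else:
        qty = int(raw_qty)  # raises ValueError for non-numeric text
    if qty < 1:
        raise ValueError(f"quantity must be positive, got {qty}")

    raw_exp = p.get("expiry_date")
    if raw_exp:
        # date.fromisoformat accepts ISO dates such as YYYY-MM-DD and raises ValueError otherwise
        exp = date.fromisoformat(str(raw_exp).strip())
    else:
        exp = today + timedelta(days=14)  # registry default: two weeks from today
    return {"item": item, "quantity": qty, "expiry_date": exp.isoformat()}
```

Wrapping a call to this helper in `try/except ValueError` inside a handler would let it return a readable message instead of crashing the whole pipeline.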

In [None]:
# 2.1) Simple fridge task
user_task = "Show me what's in the fridge!"
await run_task(user_task)

In [None]:
# 2.2) More complex fridge task
user_task = "add two Tomatoes to the fridge, then look again into the fridge data and show me what's actually in the fridge"
await run_task(user_task)

In [None]:
# 2.3) (Advanced) Analyze the output: why might the summary fail here?
user_task = "calculate the fridge costs, then remove one Tomato from the fridge and calculate the costs again"
await run_task(user_task)

**(TO-DO)** Try out additional prompts. In particular, look for tasks that require combining different functionalities, or tasks for which it is not trivial to come up with a good plan for the agent!

## (TO-DO) Adding more functionality: Web Search

Now we want to add a web search functionality. It enables our agent to do things similar to, for example, ChatGPT's web search.

We already provide the handler function below. Your task is to "plug it into" our agent and test it with some requests!

> **Note:** We are using a very simple, open web search. Depending on the search request, the results can be, let's say, creative (some might say very poor). Keep that in mind for your evaluation.

In [None]:
async def handler_search_web(step: Dict[str, Any], ctx: Dict[str, Any], **kw) -> Dict[str, Any]:
    """
    LLM-in-the-loop search:
      1) LLM proposes 1-3 concise queries (starting from the user question).
      2) For each query, run search.query and (optionally) search.fetch_text.
      3) Ask LLM if the gathered evidence is sufficient; if not, refine and loop (max ~2-3 rounds).
      4) Print a compact final answer + top sources.

    Returns:
      {"answer_text": str, "sources": List[str]}
    """
    print_step("Running web search (LLM-assisted)")

    # Use the step's 'query' param if given, otherwise fall back to the original user task
    question = ((step.get("params") or {}).get("query") or ctx.get("user_task") or "").strip()

    # 1. Ask LLM for a few smart queries
    search_planner_prompt = f"""
You are a precise search strategist. Given the user question below,
return STRICT JSON with up to 3 short queries that would likely find the answer quickly.

Return JSON ONLY:
{{
  "queries": ["<short query 1>", "<short query 2>", "<short query 3>"],
  "answer_style": "one-liner" | "bullets" | "short-paragraph"
}}

Question: {question}
""".strip()

    plan_json = json.loads(await llm_chat(
        [{"role":"system","content": load_prompts()["system_return_json"]},
         {"role":"user","content": search_planner_prompt}]
    ))

    queries = [q for q in (plan_json.get("queries") or []) if isinstance(q, str) and q.strip()] or [question]
    answer_style = plan_json.get("answer_style") or "bullets"

    # 2. Loop: search → (optional) fetch → judge sufficiency
    evidence = []  # list of {url,title,snippet,text?}
    seen_urls = set()
    max_rounds = min(3, len(queries) + 1)  # allow a refined query to be added

    for round_idx in range(max_rounds):
        q = queries[round_idx] if round_idx < len(queries) else queries[-1]
        out = await mcp("search:search.query", q=q, top_k=5)
        results = (out or {}).get("results", []) or []

        print(f"Search {round_idx+1}: “{(out or {}).get('query', q)}” → {len(results)} results")

        # Pull top 3 as evidence
        top = results[:3]
        for r in top:
            url = r.get("url", "")
            if not url or url in seen_urls:
                continue
            item = {
                "url": url,
                "title": (r.get("title") or "")[:200],
                "snippet": (r.get("snippet") or "")[:400],
                "text": ""
            }
            evidence.append(item)
            seen_urls.add(url)

        # 3. Ask LLM: do we have enough? If yes, produce the final compact answer
        judge_prompt = f"""
You are a careful answerer.

User question:
{question}

Evidence (array of objects with title, snippet, optional extracted text, url):
{json.dumps(evidence)[:8000]}

Decide if there is enough evidence to answer confidently. Then output STRICT JSON:

{{
  "sufficient": true | false,
  "refined_query": "<few-keyword refined query if not sufficient, else empty>",
  "answer": "<a {answer_style} answer, extremely concise; if bullets, max 5 bullets; if one-liner, keep to one sentence>",
  "sources": ["<up to 3 most relevant urls>"]
}}
""".strip()

        judge = json.loads(await llm_chat(
            [{"role": "system", "content": load_prompts()["system_return_json"]},
             {"role": "user", "content": judge_prompt}]
        ))

        if judge.get("sufficient"):
            # 4. Final compact answer + 2–3 sources 
            ans = (judge.get("answer") or "").strip()
            if not ans:
                ans = "No concise answer could be generated."
            print("\n" + ans)
            srcs = [u for u in (judge.get("sources") or []) if isinstance(u, str) and u.strip()][:3]
            if srcs:
                print("\nSources:")
                for u in srcs:
                    print("-", u)
            return {"answer_text": ans, "sources": srcs}

        # Not sufficient → add refined query once if present and new
        rq = (judge.get("refined_query") or "").strip()
        if rq and rq not in queries:
            queries.append(rq)

    # Fallback if loop ends without sufficient answer
    print("Couldn’t confidently answer from snippets. Consider rephrasing or enabling `search.fetch_text` for deeper context.")
    return {"answer_text": "", "sources": []}
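One detail in the judge prompt is worth a closer look: `json.dumps(evidence)[:8000]` cuts the serialized string at a fixed character count, which can hand the model a truncated, invalid JSON array. An alternative is to add whole evidence items until a character budget is reached, so the prompt always contains valid JSON. The helper name `pack_evidence` is hypothetical; the default budget of 8000 characters simply mirrors the handler.

```python
import json
from typing import Any, Dict, List


def pack_evidence(evidence: List[Dict[str, Any]], budget: int = 8000) -> str:
    """Serialize as many whole evidence items as fit into `budget` characters.

    Unlike slicing json.dumps(...)[:budget], the result is always valid JSON.
    If even the first item is too large, an empty array "[]" is returned.
    """
    packed: List[Dict[str, Any]] = []
    for item in evidence:
        candidate = packed + [item]
        if len(json.dumps(candidate)) > budget:
            break  # the next item would exceed the budget; stop here
        packed = candidate
    return json.dumps(packed)
```

Inside the handler's f-string, `{json.dumps(evidence)[:8000]}` would then become `{pack_evidence(evidence)}`. Since evidence is appended in search order, this keeps the earliest (usually most relevant) results and drops the rest.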

Before actually adding the web search functionality, let's first try the following request. Try it again after adding the functionality.

In [None]:
user_task = "What were the main events in 2025?"
await run_task(user_task)

In [None]:
# (TO-DO) Add the web search server to the client
# ...
# (TO-DO) Update the registry
# ...

Now let's try the same request again with the functionality added:

In [None]:
user_task = "What were the main events in 2025?"
await run_task(user_task)

**(TO-DO)** Feel free to try out new requests that might require web search combined with some of the functionalities from before, and check whether the agent can solve them!