In [2]:
from dataclasses import dataclass
from pathlib import Path

import logfire
import polars as pl
from logfire.experimental.query_client import AsyncLogfireQueryClient
from pydantic_ai import Agent, RunContext

from knd.memory import AgentMemories, memorize
from knd.utils import deindent

# Set up Logfire for this session; the agent runs and the validator's error logs below rely on it.
logfire.configure()

# Enable IPython's autoreload extension (mode 2) so edits to imported modules are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

Logfire project URL: https://logfire.pydantic.dev/HamzaFarhan/knd


In [3]:
# Directory (relative to the working directory) that the test-data CSVs are read from and written to.
TEST_DIR = Path("tests")

In [4]:
async def load_logfire_logs(
    query: str = "",
    agent_name: str = "agent",
    attributes: dict | None = None,
    read_token: str = "H0CTvcy0WCrl6xjxm8r8ZjWxP3LPSq5Mzdv81GvXXRPz",
) -> pl.DataFrame:
    """Fetch Logfire records for an agent's traces as a polars DataFrame.

    When ``query`` is empty, a default query is built. It selects every record
    that belongs to a trace in which at least one record has
    ``attributes->>'agent_name'`` equal to ``agent_name``, ordered by trace id
    and start timestamp.

    Args:
        query: Full SQL to run instead of the default query. When given,
            ``agent_name`` and ``attributes`` do not affect the SQL that is sent.
        agent_name: Value matched against the ``agent_name`` record attribute.
        attributes: Mapping of record-attribute name -> output column alias,
            appended to the fixed SELECT list. An ``agent_name`` column is
            always included. The mapping passed in is left unmodified.
        read_token: Logfire read token handed to the query client.

    Returns:
        The query result as a ``pl.DataFrame``.
    """
    # NOTE(review): the default `read_token` is a credential hard-coded in the
    # signature. It is kept only so existing calls keep working; rotate it and
    # pass the token explicitly (e.g. read from an environment variable).

    def sql_quote(value: object) -> str:
        # Double embedded single quotes so the value cannot terminate the SQL string literal it sits in.
        return str(value).replace("'", "''")

    # Build a fresh mapping instead of writing into the caller's dict.
    columns = {**(attributes or {}), "agent_name": "agent_name"}
    select_part = """
r.trace_id,
r.span_id,
r.span_name,
r.start_timestamp,
r.end_timestamp,
r.duration,
r.level,
r.message,
r.tags,
"""
    # `columns` always holds at least `agent_name`, so the trailing comma above is always followed by a column.
    select_part += ",\n".join(
        f"r.attributes->>'{sql_quote(attr_name)}' as {attr_col}" for attr_name, attr_col in columns.items()
    )
    query = (
        query
        or f"""
WITH agent_traces AS (
SELECT DISTINCT trace_id
FROM records
WHERE attributes->>'agent_name' = '{sql_quote(agent_name)}'
)
SELECT
{select_part.strip()}
FROM records r
JOIN agent_traces at ON r.trace_id = at.trace_id
ORDER BY r.trace_id, r.start_timestamp;
"""
    )
    async with AsyncLogfireQueryClient(read_token=read_token) as client:
        arrow_result = await client.query_arrow(sql=deindent(query))
        return pl.DataFrame(pl.from_arrow(arrow_result))

In [5]:
@dataclass
class IDCheckerDeps:
    """Dependencies supplied to each ID-checker agent run.

    ``label_id`` defaults to the string ``"no label"``, a sentinel distinct
    from ``None`` so that ``None`` can itself be an expected label.
    """

    # Memories that the agent's dynamic system prompt renders via `str(...)`.
    agent_memories: AgentMemories
    # Expected id for the prompt; "no label" means there is nothing to compare the result against.
    label_id: int | str | None = "no label"


# The same name is reused later when loading this agent's memories.
agent_name = "id_checker_agent"
id_checker_agent = Agent(
    name=agent_name,
    model="openai:gpt-4o-mini",
    deps_type=IDCheckerDeps,
    # The agent answers with the id as an int, or None.
    result_type=int | None,  # type: ignore
    system_prompt="You are a helpful assistant that checks if a user's text contains an id. Return None if no id is found.",
)


@id_checker_agent.system_prompt(dynamic=True)
def system_prompt(ctx: RunContext[IDCheckerDeps]) -> str:
    """Return the run's agent memories, rendered with ``str``, as a system prompt."""
    memories = ctx.deps.agent_memories
    return str(memories)


@id_checker_agent.result_validator  # type: ignore
def validate_id_checker_agent(ctx: RunContext[IDCheckerDeps], res: int | None) -> int | None:
    """Compare the agent's result with the expected label and log mismatches.

    When the deps carry the ``"no label"`` sentinel, ``res`` is returned
    untouched. Otherwise a mismatch between the result and ``label_id`` is
    reported through ``logfire.error`` with the ``id_checker_agent_failed``
    tag. The result is returned in both cases; it is never corrected to the
    label.
    """
    expected_id = ctx.deps.label_id
    if expected_id == "no label":
        # Nothing to compare against.
        return res

    user_text = ctx.prompt
    # NOTE(review): overrides the model's result with 123 whenever the prompt
    # mentions "Rafay" — looks like deliberate fault injection to produce a
    # logged failure; confirm before reusing this validator elsewhere.
    if "Rafay" in user_text:
        res = 123

    if res != expected_id:
        logfire.error(
            "ID checker agent failed",
            text=user_text,
            id=expected_id,
            generated_id=res,
            feedback=f"WRONG\nCorrect ID: {expected_id}\nID you generated: {res}",
            _tags=["id_checker_agent_failed"],
        )
    return res


# Separate agent that is handed to `memorize` as its `memory_agent`.
memory_agent = Agent(model="openai:gpt-4o-mini", name="memory_agent")

In [6]:
# Labelled examples: `text` is sent as the user prompt and `id` is the expected
# result (None when the text contains no id).
id_checker_data = [
    {"text": "My name is Rafay", "id": None},
    {"text": "Hello, user id ten-thousand-one", "id": 10001},
]

In [7]:
# Tabular view of the labelled examples; each row is unpacked as (text, id) by the run loop.
df = pl.DataFrame(id_checker_data)
print(df)

shape: (2, 2)
┌─────────────────────────────────┬───────┐
│ text                            ┆ id    │
│ ---                             ┆ ---   │
│ str                             ┆ i64   │
╞═════════════════════════════════╪═══════╡
│ My name is Rafay                ┆ null  │
│ Hello, user id ten-thousand-on… ┆ 10001 │
└─────────────────────────────────┴───────┘


In [7]:
# Run the ID checker on every labelled example, then pass the conversation to `memorize`.
for text, label_id in df.iter_rows():
    # Memories are loaded again on every iteration — presumably so that anything
    # written by the previous iteration's `memorize` call is visible; confirm.
    agent_memories = AgentMemories.load(agent_name=agent_name, user_id="test", include_profile=False)
    # `label_id` lets the result validator compare the agent's output with the expected id.
    agent_deps = IDCheckerDeps(agent_memories=agent_memories, label_id=label_id)
    res = await id_checker_agent.run(
        user_prompt=text,
        deps=agent_deps,  # message_history=agent_memories.message_history
    )
    print(res.all_messages())
    # Both the full history and only this run's new messages are handed over.
    await memorize(
        memory_agent=memory_agent,
        agent_memories=agent_memories,
        message_history=res.all_messages(),
        new_messages=res.new_messages(),
        user_id=agent_memories.user_id,
        include_profile=False,
    )

19:16:27.488 id_checker_agent run prompt=My name is Rafay
19:16:27.488   preparing model and tools run_step=1
19:16:27.489   model request


2025-01-30 00:16:29.415 | INFO     | knd.memory:create_user_specific_experience:410 - Creating user specific experience for User: agent_tester


19:16:29.412   handle model response
19:16:29.414     ID checker agent failed [id_checker_agent_failed]
[ModelRequest(parts=[SystemPromptPart(content="You are a helpful assistant that checks if a user's text contains an id. Return None if no id is found.", dynamic_ref=None, part_kind='system-prompt'), UserPromptPart(content='My name is Rafay', timestamp=datetime.datetime(2025, 1, 29, 19, 16, 27, 488571, tzinfo=datetime.timezone.utc), part_kind='user-prompt')], kind='request'), ModelResponse(parts=[ToolCallPart(tool_name='final_result_NoneType', args=ArgsJson(args_json='{"response":null}'), tool_call_id='call_1yI8QYSuhBfvrHdmqoItSZwk', part_kind='tool-call')], model_name='gpt-4o-mini', timestamp=datetime.datetime(2025, 1, 29, 19, 16, 28, tzinfo=datetime.timezone.utc), kind='response'), ModelRequest(parts=[ToolReturnPart(tool_name='final_result_NoneType', content='Final result processed.', tool_call_id='call_1yI8QYSuhBfvrHdmqoItSZwk', timestamp=datetime.datetime(2025, 1, 29, 19, 16, 29, 

2025-01-30 00:16:36.032 | INFO     | knd.memory:summarize:391 - Skipping summary because the `message_history` is too short


19:16:36.030   handle model response
19:16:36.033 memory_agent run prompt=[Scrubbed due to 'session']
19:16:36.034   preparing model and tools run_step=1
19:16:36.034   model request
19:16:40.581   handle model response
19:16:40.598 id_checker_agent run prompt=Hello, user id ten-thousand-one
19:16:40.599   preparing model and tools run_step=1
19:16:40.599   model request


2025-01-30 00:16:42.085 | INFO     | knd.memory:create_user_specific_experience:410 - Creating user specific experience for User: agent_tester


19:16:42.083   handle model response
[ModelRequest(parts=[SystemPromptPart(content="You are a helpful assistant that checks if a user's text contains an id. Return None if no id is found.", dynamic_ref=None, part_kind='system-prompt'), UserPromptPart(content='Hello, user id ten-thousand-one', timestamp=datetime.datetime(2025, 1, 29, 19, 16, 40, 599045, tzinfo=datetime.timezone.utc), part_kind='user-prompt')], kind='request'), ModelResponse(parts=[ToolCallPart(tool_name='final_result_int', args=ArgsJson(args_json='{"response":10001}'), tool_call_id='call_OjgJT4Gd1XFW9bmIO6UJe7jI', part_kind='tool-call')], model_name='gpt-4o-mini', timestamp=datetime.datetime(2025, 1, 29, 19, 16, 40, tzinfo=datetime.timezone.utc), kind='response'), ModelRequest(parts=[ToolReturnPart(tool_name='final_result_int', content='Final result processed.', tool_call_id='call_OjgJT4Gd1XFW9bmIO6UJe7jI', timestamp=datetime.datetime(2025, 1, 29, 19, 16, 42, 84474, tzinfo=datetime.timezone.utc), part_kind='tool-return'

2025-01-30 00:16:44.635 | INFO     | knd.memory:summarize:391 - Skipping summary because the `message_history` is too short


19:16:44.633   handle model response
19:16:44.636 memory_agent run prompt=[Scrubbed due to 'session']
19:16:44.637   preparing model and tools run_step=1
19:16:44.637   model request
19:16:51.307   handle model response


In [8]:
# Pull every record from the ID checker's traces. `attributes` maps each Logfire
# attribute name to the column name it gets in the result (e.g. `id` -> `label_id`).
runs_df = await load_logfire_logs(
    agent_name="id_checker_agent",
    attributes={"text": "text", "id": "label_id", "generated_id": "generated_id", "feedback": "feedback"},
)
print(runs_df)

AssertionError: b'{"detail":"internal error: error sending request for url (http://10.50.133.203:8011/query/historic/?organization_id=cd88f911-824c-4ca9-bdf8-2d04fa5ed009&project_id=b232aad1-c930-4868-9a73-88cb3616f764)"}'

In [None]:
# Keep only the records carrying the validator's failure tag and reshape them
# into (text, id, generated_id, feedback) rows, casting both id columns to Int64.
errors_df = runs_df.filter(pl.col("tags").list.contains("id_checker_agent_failed")).select(
    "text",
    id=pl.col("label_id").cast(pl.Int64),
    generated_id=pl.col("generated_id").cast(pl.Int64),
    feedback=pl.col("feedback"),
)
print(errors_df)

In [None]:
# Read the ID-checker test-data CSV from TEST_DIR.
test_df = pl.read_csv(TEST_DIR / "id_checker_test_data.csv")
print(test_df)

In [69]:
# Stack the logged failures on top of the loaded test data and write the result
# to a second file (`id_checker_test_data2.csv`), leaving the input CSV untouched.
# NOTE(review): presumably requires `errors_df` and `test_df` to share the same columns and dtypes — confirm.
pl.concat([errors_df, test_df]).write_csv(TEST_DIR / "id_checker_test_data2.csv")