In [None]:
from dotenv import load_dotenv

load_dotenv()

In [None]:
%set_env CHETANBASE_SECRET_KEY=secret

In [None]:
from chetan import SessionManager, ChetanbaseClient


mgr = SessionManager(ChetanbaseClient("<api_url>"))

### Language Model

In [None]:
# from chetan.lm.local import LMTransformers
from chetan.lm.openai import LMOpenAI
from openai import AzureOpenAI, OpenAI
from chetan.lm.groq import LMGroq
from chetan.lm.anthropic import LMAnthropic

import os

# # Load a local model for maximum control
# lm = LMTransformers("NousResearch/DeepHermes-3-Llama-3-8B-Preview")

# Supports OpenAI and Azure OpenAI
lm = LMOpenAI(
    client=AzureOpenAI(
        azure_endpoint=os.getenv("AZURE_ENDPOINT"),
        api_key=os.getenv("AZURE_KEY"),
        api_version=os.getenv("AZURE_API_VERSION"),
    ),
    model="gpt-4.1" 
    # model="gpt-4.1-nano", # * Works like a charm, despite being a nano model
    
    # # ! Local vLLM server, couldn't test due to GPU memory constraints
    # client=OpenAI(base_url="http://192.168.10.179:8000/v1", api_key="none"), 
    # model="NousResearch/Hermes-3-Llama-3.2-3B",
)

# # Supports Anthropic
# lm = LMAnthropic()

# # Supports Groq
# lm = LMGroq()

# Register the language model as default
mgr.lm["default"] = lm

### Agent Architecture
#### Modules


In [None]:
from llama_index.core import SimpleDirectoryReader
from chetan.modules import rag, memory, recommender
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

rag_module = rag.LlamaIndexRAGModule(
    lambda: SimpleDirectoryReader("/Users/arjo/Downloads/rag_data").load_data(),
    embed_model_fn=lambda: HuggingFaceEmbedding(model_name="all-MiniLM-L6-v2"),
)
memory_module = memory.SmritiMemoryModule()
tool_rec_module = recommender.ToolRecommenderModule(
    embedder="NovaSearch/stella_en_400M_v5"
)

### DONE ####

### Agent Loop

These are the stages of an agent loop:
- Prologue (every function run in parallel)
- Process (Agent text generation with tool calls)
- Epilogue (every function run in paralle)
- Retrigger (a condition that must be fulfilled to reiterate the loop, generally user approval)

In [None]:
from chetan.agent import AgentLoop
from chetan.agent.loop import ProcessFunctionContext

agentloop = AgentLoop(mgr).use(
    # rag_module,
    memory_module,
    tool_rec_module,
)


@agentloop.process
async def process_fn(ctx: ProcessFunctionContext):
    await ctx.generate_with_tool_call()
    await ctx.execute_tool_calls()


mgr.agentloop["default"] = agentloop

### Tools
#### Local tool
##### Code Executor

In [None]:
from chetan.tools import Tool, toolfn
from agentrun import AgentRun


class CodeExecutor(Tool):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.runner: AgentRun = AgentRun(container_name="agentrun-api-api-1")

    @toolfn
    def run_code(self, code: str) -> str:
        """
        Run the given code in the container.
        """
        # Use the agentrun container to run the code
        result = self.runner.execute_code_in_container(code)
        return result


##### Web Crawler

In [None]:
from crawl4ai import AsyncWebCrawler

class WebCrawler(Tool):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.crawler = AsyncWebCrawler()

    @toolfn
    async def crawl(self, url: str) -> str:
        """
        Crawl the given URL and return the content.
        """
        return (await self.crawler.arun(url)).markdown

#### MCP
Supports both local and remote.

In [None]:
# from chetan.tools import ToolNamespace
from chetan.tools.mcp import MCPLoader
from mcp import StdioServerParameters

mcp_tool_paths = {
    # ! US National Weather Service API
    "weather": "/Users/arjo/Work/self/mcp_test/weather.py",
    # "http://192.168.10.225:8000/sse", # * Same as the one above, but using a HTTP SSE endpoint
    # These two on the top are the same
    # ! Tavily Search
    "tavily": StdioServerParameters(
        command="python",
        args=["-m", "mcp_server_tavily"],
        env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY")},
    ),
    # # ! GitHub API
    # "github": StdioServerParameters(
    #     command="docker",
    #     args=[
    #         "run",
    #         "-i",
    #         "--rm",
    #         "-e",
    #         "GITHUB_PERSONAL_ACCESS_TOKEN",
    #         "ghcr.io/github/github-mcp-server",
    #     ],
    #     env={"GITHUB_PERSONAL_ACCESS_TOKEN": os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN")},
    # ),
    # # ! Notion API
    # "notion": StdioServerParameters(
    #     command="npx",
    #     args=["-y", "@notionhq/notion-mcp-server"],
    #     env={
    #         "OPENAPI_MCP_HEADERS": '{"Authorization": "Bearer '
    #         + os.getenv("NOTION_INTEGRATION_SECRET")
    #         + '", "Notion-Version": "2022-06-28" }'
    #     },
    # ),
}

mcp_tools = await MCPLoader.load_from_paths(
    mcp_tool_paths,
    # python_default="/Users/arjo/Work/self/mcp_test/.venv/bin/python",
)

##### Adding tools to the registry namespace

In [None]:
for name, tool in mcp_tools.items():
    mgr.tools.register("mcp." + name, tool)
    
# mgr.tools.register("code", CodeExecutor())
mgr.tools.register("web", WebCrawler())

In [None]:
mgr.tools.print_tree()

In [None]:
from chetan.agent import Agent
from chetan.entity.user import User

mgr.agents.add(
    Agent(
        id="support_agent",
        role="Support Agent",
        description="A customer support agent with supervision",
        # system_prompt="You are a support agent. Provide assistance to the user thereby. If clarification is needed, ask the supervisor.",
        system_prompt="""You are a helpful agent operating in a loop of observation and action.

You cannot use any tool without first informing the user of your intention. Only after writing to the user do you automatically invoke the tool in the same response output.

Always use <|stop|> when you have fully answered the user's query and do not need to call any tools or wait for more information. 
Do NOT use <|stop|> if you are about to call a tool or need to continue the loop.

After you have provided a direct answer to the user's query, always output <|stop|> unless the user has explicitly asked for further details, follow-up, or another action.

General guidelines for <|stop|>:
- Use it to end the loop when you have provided a complete answer to the user's query.
- Always use it to wait and ask for more information from the user when needed.
""",
    )
)
    

mgr.users.add(
    User(id="customer", description="A customer seeking support"),
    User(id="supervisor", description="A supervisor overseeing support"),
)


In [None]:
mgr.setup()

In [None]:
mgr.agents["support_agent"].context.clear(retain_system_prompt=True)
await mgr.agents["support_agent"].prompt(input())

In [None]:
mgr.agents["support_agent"].context.save_json("./ctx.json")

In [None]:
# * Trial if agent is exiting the loop without endless loop
for _ in range(3):
    mgr.agents["support_agent"].context.clear(retain_system_prompt=True)
    await mgr.agents["support_agent"].prompt("How are you doing?")
    print("-" * 100)

In [None]:
from pydantic import BaseModel

class FinancialReport(BaseModel):
    revenue: float = 0.0
    net_income: float = 0.0
    earnings_per_share: float = 0.0
    total_assets: float = 0.0
    total_liabilities: float = 0.0
    total_equity: float = 0.0
    operating_income: float = 0.0
    gross_profit: float = 0.0
    tax_expense: float = 0.0
    cash_flow_operations: float = 0.0
    cash_flow_investing: float = 0.0
    cash_flow_financing: float = 0.0
    fiscal_year: str = ""
    quarter: int = 0
    currency: str = "USD"
    company_name: str = ""
    debt_to_equity_ratio: float = 0.0
    current_ratio: float = 0.0
    return_on_assets: float = 0.0
    return_on_equity: float = 0.0
    profit_margin: float = 0.0
    segment_revenues: dict = {}
    tax_rate: float = 0.0
    dividend_payout: float = 0.0
    research_development_expense: float = 0.0

await mgr.agents["support_agent"].prompt(
    "Provide a report on Google finance statement regarding their income, sales, taxes in the last 2 years",
    FinancialReport
)

#### Manifesto

A rule can check on these:
- Agent Output (from LLM)
- Structured outputs (from LLM)
- Explicit communication messages (in a System)
- Structured protocols (in a System)
- Tool Calls

It can possible also check:
- User behavior

In [None]:
from chetan.manifesto import Manifesto, validator_on_message, ValidatorFeedback, Connection
from chetan.manifesto.rules import Rule
from chetan.types.context.agent import EntityMessage


class CustomerSupportSystemManifesto(Manifesto):
    def __init__(self):
        
        # These will be injected into the agent's system prompt
        self.purpose = "The customer support system is designed to assist users with their queries while maintaining a professional and respectful communication environment."
        self.rules.extend(
            {
                "no_cuss_word": Rule(
                    "Never use any cusswords or offensive language in any communication with the user.",
                    connections=[
                        Connection("support_agent", "*"),  # support agent to anyone
                    ],
                ),
            }
        )
        super().__init__("Customer Support System Manifesto")

    @validator_on_message("no_cuss_word")
    def validate_cuss_word(self, message: EntityMessage) -> bool:
        message_text = message.content.lower()
        cusswords = []  # some cusswords to check against
        for cussword in cusswords:
            if cussword in message_text:
                return ValidatorFeedback(
                    score=-2, message="Use of cussword is strictly prohibited"
                )


### System
```mermaid
graph LR
    customer <--> agent <--> supervisor
```

In [None]:
from chetan.system.sequential import SequentialSystem

mgr.system = SequentialSystem(mgr).create("customer", "support_agent", "supervisor")


In [None]:
from chetan.orchestra.chat import ChatOrchestrator

chat = ChatOrchestrator(mgr)


In [None]:
# mgr.setup()

In [None]:
@chat.user("customer").handle_incomeing_messages
def handle_incoming_messages(self, message):
    # Process the incoming message from the customer
    print(f"Customer message received: {message.content}")
    # Here you can add logic to handle the message, e.g., logging, processing, etc.
    return "Message received and being processed."


In [None]:
chat.user("customer").to("support_agent").message("Can you summarize your privacy policy?")