In [45]:
# Import libraries
from abc import ABC, abstractmethod
import os
from typing import Any, Dict, List, Literal, TypedDict, Optional
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

In [46]:
# constant settings
load_dotenv(".env")

BASE_URL_GEMINI = "https://generativelanguage.googleapis.com/v1beta/openai/"

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("Missing GEMINI_API_KEY in env.")

In [None]:
# abstact agent class as base
class BaseAgent(ABC):
    """Hold base capabilities that must be owned by every agent."""
    def __init__(
        self,
        id: str,
        llm: ChatOpenAI,
        system_prompt: str="",
        name: str="My AI"
    ):
        """
        Constructor of the agent.

        Args:
            id: Unique ID represents the identity of the agent.
            llm: A langchain ChatOpenAI class to handle the request
            name: Name of the agent to identify them.
            system_prompt: Prompt to set up the AI.
        """
        self.id = id
        self.name = name
        self.llm = llm
        self.system_prompt = system_prompt

    @abstractmethod
    async def ainvoke_llm(
        self,
        messages: List[Any],
        additional_context: Dict[Any, Any]={}
    ):
        """
        Invoking the llm to get the answer from the AI.

        Args:
            messages: A list of messages sent to the llm.
            additional_context: Inject any additional context to teh LLM when needed.
        """
        ...

In [35]:
class Agent(BaseAgent):
    """Implementation of the BaseAgent class"""
    def __init__(
        self, id: str, llm: ChatOpenAI, system_prompt: str="", name: str="My AI"
    ):
        super().__init__(id, llm, system_prompt, name)

    async def ainvoke_llm(
        self,
        messages: List[Any],
        additional_context: Dict[Any, Any]={}
    ):
        messages[0:0] = [SystemMessage(self.system_prompt)]
        return await self.llm.ainvoke(messages)

In [59]:
gemini_2_5_flash = ChatOpenAI(
    model="gemini-2.5-flash",
    base_url=BASE_URL_GEMINI,
    api_key=GEMINI_API_KEY
)

We are making a supervisor that can understand user input and its intention. We want to detect whether users want to ask something (q and a), doing some research, doing an analysis or any other specific task that we want our agent do.

In [None]:
class TaskTypes(BaseModel):
    task_type: Literal[
        "question_answering",
        "budget_analysis"
    ]
    # reasoning: str = Field(
    #     description="Brief explanation why you choose the type of the task."
    # )

main_supervisor = gemini_2_5_flash.with_structured_output(TaskTypes, include_raw=True)

Since we are using OpenAI protocol, we will always get the same response format. We can catch them using specific format as follow.

In [63]:
# format to catch OpenAI response
class ResponseMetadata(BaseModel):
    token_usage: Dict[str, Any]
    model_provider: Optional[str]
    model_name: Optional[str]
    system_fingerprint: Optional[str]
    id: Optional[str]
    finish_reason: Optional[str]
    logprobs: Optional[List[Any]] = []

class UsageMetadata(BaseModel):
    input_tokens: Optional[int]
    output_tokens: Optional[int]
    total_tokens: Optional[int]
    input_token_details: Optional[dict]
    output_token_details: Optional[dict]

class AgentResponse(BaseModel):
    """Standard response format for agents"""
    id: str
    content: str
    additional_kwargs: Optional[Dict[str, Any]]
    response_metadata: ResponseMetadata
    type: str
    name: Optional[str]
    tool_calls: Optional[List[str]]
    invalid_tool_calls: list
    usage_metadata: UsageMetadata

Another important thing that we need to prepare is the state. State is a collection of data that make the between agent communication becomes possible. Imagine, you are working in group, A will write down part 5 only after B wrote part 3. In between agent communication, this is possible to be achieved using State.

In [64]:
class TaskState(TypedDict):
    user_input: List[Any]
    task_type: str
    result: str
    total_token: float = 0.0

In [65]:
# Supervisor agent. Agent that will manage another agent
# In this case, this agent will accept user input and distribute the task to
# specific agent
supervisor = Agent(
    id="Supervisor",
    llm=main_supervisor,
    system_prompt= (
        "You are  Queen! AI that responsible to route user's request"
        "to specific AI agent."
        "Your job is to classify users request into specific task!"
    )
)

In [66]:
# Worker agent. Worker agent is the specialist agent that will work by executing
# specific tasks.
question_answer = Agent(
    id="QnA",
    llm=gemini_2_5_flash,
    system_prompt="You are a chatbot agent that will answer users question."
)

budget_analyst = Agent(
    id="BudgetAnalyzer",
    llm=gemini_2_5_flash,
    system_prompt="You are a professional budget analyst. Your task is to help users give advice about budget and transactions."
)

# Building the Workflow

Now, the last step is building the workflow of our agents. Here, we will define how our workflow will go through.

In [67]:
task = await supervisor.ainvoke_llm(
    messages = [SystemMessage(content='You are a helpful assistant', additional_kwargs={}, response_metadata={}),
  HumanMessage(content='Apa itu langit', additional_kwargs={}, response_metadata={})]
)

In [106]:
# this will be first flow
async def task_type_decided(state: TaskState):
    task = await supervisor.ainvoke_llm(
        messages = [{'role': "user", 'content': state['user_input']}]
    )
    print(task)
    return {
        'task_type': task.get("parsed").task_type,
        'total_token': task.get('raw').usage_metadata.get('total_tokens')
    }

# second flow, after we know the type of the task
async def qna(state: TaskState):
    res = await question_answer.ainvoke_llm(
        messages = [{'role': "user", 'content': state['user_input']}]
    )
    return {
        'result': res.content,
        'total_token': state['total_token'] + res.usage_metadata.get('total_tokens')
    }

async def budget_analysis(state: TaskState):
    res = await budget_analyst.ainvoke_llm(
        messages =  [{'role': "user", 'content': state['user_input']}]
    )
    return {
        'result': res.content,
        'total_token': state['total_token'] + res.usage_metadata.get('total_tokens')
    }

In [None]:
async def decision(state: TaskState):
    if state['task_type'] == "question_answering":
        return "qna"
    else:
        return "budget_analysis"

In [107]:
workflow = StateGraph(TaskState)

workflow.add_node("initial_decision", task_type_decided)
workflow.add_node("qna", qna)
workflow.add_node("budget_analysis", budget_analysis)

workflow.add_edge(START, "initial_decision")
workflow.add_conditional_edges(
    "initial_decision",
    decision,
    {
        "qna": "qna",
        "budget_analysis": "budget_analysis"
    }
)
workflow.add_edge("qna", END)
workflow.add_edge("budget_analysis", END)

compiled_workflow = workflow.compile()

In [108]:
task_qna = await compiled_workflow.ainvoke({'user_input': "Apa itu langit?"})

{'raw': AIMessage(content='{"task_type": "question_answering", "reasoning": "The user is asking a direct question \'What is the sky?\', which requires an informative answer."}', additional_kwargs={'parsed': TaskTypes(task_type='question_answering', reasoning="The user is asking a direct question 'What is the sky?', which requires an informative answer."), 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 36, 'prompt_tokens': 36, 'total_tokens': 126, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_provider': 'openai', 'model_name': 'gemini-2.5-flash', 'system_fingerprint': None, 'id': 'Wm4Yaa69J5icjuMPyZSIwAo', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--57c03776-85bd-44f3-b15e-a8a764866db5-0', usage_metadata={'input_tokens': 36, 'output_tokens': 36, 'total_tokens': 126, 'input_token_details': {}, 'output_token_details': {}}), 'parsed': TaskTypes(task_type='question_answering', reasoning="The user is asking a direct question 

In [109]:
task_qna

{'user_input': 'Apa itu langit?',
 'task_type': 'question_answering',
 'result': 'Langit adalah **segala sesuatu yang membentang di atas kepala kita ketika kita melihat ke atas dari permukaan Bumi**. Ini adalah pemandangan yang selalu berubah, penuh dengan fenomena alam dan objek-objek langit.\n\nSecara ilmiah, langit yang kita lihat sebenarnya adalah **atmosfer Bumi**, yaitu lapisan gas yang menyelimuti planet kita. Atmosfer ini terdiri dari berbagai gas seperti nitrogen, oksigen, argon, dan karbon dioksida.\n\nBerikut adalah beberapa aspek utama tentang langit:\n\n1.  **Warna Langit:**\n    *   **Siang hari:** Langit tampak biru karena fenomena **hamburan Rayleigh**. Sinar matahari yang masuk ke atmosfer dihamburkan oleh molekul-molekul gas. Cahaya biru memiliki gelombang yang lebih pendek dan lebih mudah dihamburkan ke segala arah, sehingga kita melihat langit berwarna biru.\n    *   **Matahari Terbit/Terbenam:** Langit sering berwarna merah, oranye, atau merah muda. Ini terjadi kar

In [110]:
task_budgeting = await compiled_workflow.ainvoke({'user_input': (
    "Kalau pengeluaran ku setiap bulan adalah sebagai berikut: beli makan pokok:"
    " 1 juta, beli jajan: 300 ribu, uang kos: 450 ribu, kirim ortu: 450 ribu"
    "Dengan income 4.5 juta, kira kira gimana budgeting yang bagus ya?"
)})

{'raw': AIMessage(content='{"task_type": "budget_analysis", "reasoning": "The user provided details of their monthly income and expenses and is asking for advice on how to create a good budget. This involves analyzing financial figures to provide budgeting recommendations."}', additional_kwargs={'parsed': TaskTypes(task_type='budget_analysis', reasoning='The user provided details of their monthly income and expenses and is asking for advice on how to create a good budget. This involves analyzing financial figures to provide budgeting recommendations.'), 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 49, 'prompt_tokens': 96, 'total_tokens': 195, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_provider': 'openai', 'model_name': 'gemini-2.5-flash', 'system_fingerprint': None, 'id': 'zm8YaYPSL4_dqfkPnYzgyAo', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--6f53c433-877d-466b-af34-c4bc804d3f4e-0', usage_metadata={'input_tokens': 9

In [113]:
task_budgeting

{'user_input': 'Kalau pengeluaran ku setiap bulan adalah sebagai berikut: beli makan pokok: 1 juta, beli jajan: 300 ribu, uang kos: 450 ribu, kirim ortu: 450 ribuDengan income 4.5 juta, kira kira gimana budgeting yang bagus ya?',
 'task_type': 'budget_analysis',
 'result': 'Halo! Selamat, dengan income Rp 4.5 juta dan pengeluaran saat ini, Anda memiliki potensi besar untuk menabung dan mengatur keuangan dengan sangat baik. Mari kita bedah dan susun budgeting yang bagus untuk Anda.\n\n**1. Analisis Situasi Saat Ini:**\n\n*   **Income Bulanan:** Rp 4.500.000\n*   **Pengeluaran Rutin Anda:**\n    *   Beli makan pokok: Rp 1.000.000\n    *   Beli jajan: Rp 300.000\n    *   Uang kos: Rp 450.000\n    *   Kirim ortu: Rp 450.000\n*   **Total Pengeluaran Saat Ini:** Rp 1.000.000 + Rp 300.000 + Rp 450.000 + Rp 450.000 = **Rp 2.200.000**\n\n*   **Sisa Dana Setelah Pengeluaran Rutin:** Rp 4.500.000 - Rp 2.200.000 = **Rp 2.300.000**\n\nIni adalah kabar baik! Anda memiliki sisa dana yang cukup besar 