In [2]:
from dotenv import load_dotenv
import os
from langchain_openai import ChatOpenAI

# Load variables from a local .env file into the process environment
# (presumably where the OpenAI API key lives -- confirm for your setup).
load_dotenv()

# Shared chat model, reused below by the planner and by the section-writing workers.
model = ChatOpenAI(model="gpt-4o")

# Smoke test: one throwaway call, displayed as the cell output.
result = model.invoke("hello")
result

AIMessage(content='Hello! How can I assist you today?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 9, 'prompt_tokens': 8, 'total_tokens': 17, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_a288987b44', 'id': 'chatcmpl-BrpM4XmmWEz5U3ZXLsjPk0AMYgnCy', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--23a78a4a-412c-4bda-84fe-065e0bde9608-0', usage_metadata={'input_tokens': 8, 'output_tokens': 9, 'total_tokens': 17, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [3]:
from typing import Annotated, List
import operator
from typing_extensions import Literal
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, SystemMessage
from typing_extensions import TypedDict

In [None]:
# A single planned report section: a title plus a short summary of what it covers.
# (Documented with comments rather than a class docstring on purpose: pydantic
# would fold a docstring into the model's JSON schema description.)
class Section(BaseModel):
    name: str = Field(
        description="Name for this section of the report",
    )
    description: str = Field(
        description="Brief overview of the main topics and concepts of the section",
    )

# Container for the whole plan: the list of Section entries.
# This is the schema handed to `model.with_structured_output(...)` below.
class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report")

# Planner: the chat model wrapped so that its reply is returned as a `Sections`
# object (the orchestrator reads `.sections` from the result).
planner = model.with_structured_output(Sections)


In [8]:
###Creating Workers Dynamically in LangGraph
from langgraph.constants import Send

# Shared graph state for the whole orchestrator -> workers -> synthesizer run.
class State(TypedDict):
    # Report topic
    topic: str
    # List of report sections
    sections: list[Section]
    # All workers write to this key in parallel; operator.add is attached so
    # their list outputs are combined rather than overwritten.
    completed_sections: Annotated[list, operator.add]
    # Final report
    final_report: str

# Per-worker state: the one section this worker must write, plus the shared
# accumulator key it reports back on.
class WorkerState(TypedDict):
    # The section assigned to this worker
    section: Section
    # Same key and operator.add annotation as State.completed_sections
    completed_sections: Annotated[list, operator.add]

In [9]:
def orchestrator(state: State):
    """Plan the report by asking the structured-output planner for its sections.

    Reads ``state["topic"]`` and returns a partial state update of the form
    ``{"sections": [...]}``. The raw plan is also printed for inspection.
    """
    planning_messages = [
        SystemMessage(content="Generate a plan for the report"),
        HumanMessage(content=f"Here is the report topic: {state['topic']}"),
    ]
    plan = planner.invoke(planning_messages)

    print("Report Sections: ", plan)
    return {"sections": plan.sections}

### LLM Call

In [12]:
def llm_call(state: WorkerState):
    """Worker writes a section of the report.

    Args:
        state: Worker input; ``state["section"]`` supplies the ``name`` and
            ``description`` of the section to write.

    Returns:
        A partial state update ``{"completed_sections": [<section text>]}``.
        The text is wrapped in a one-element list because the state field is
        annotated with ``operator.add``, so each worker contributes a list.
    """
    section = model.invoke(
        [
            SystemMessage(
                content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting"
            ),
            HumanMessage(
                content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"
            ),
        ]
    )

    # The key must be spelled exactly like the `completed_sections` field declared
    # on State / WorkerState; the previous "completed sections" (with a space)
    # did not match it, so the synthesizer never saw the written sections.
    return {"completed_sections": [section.content]}

def assign_workers(state: State):
    """Fan out the plan: build one ``Send`` to the "llm_call" node per section.

    Each ``Send`` carries a payload ``{"section": <Section>}`` taken from
    ``state["sections"]``; the list of sends is returned in plan order.
    """
    dispatches = []
    for planned_section in state["sections"]:
        dispatches.append(Send("llm_call", {"section": planned_section}))
    return dispatches

def synthesizer(state: State):
    """Stitch the completed sections into one report string.

    Joins ``state["completed_sections"]`` with a blank-line-padded ``---``
    divider and returns it as ``{"final_report": <str>}``.
    """
    divider = "\n\n---\n\n"
    return {"final_report": divider.join(state["completed_sections"])}

### Build Workflow

In [15]:
from langgraph.graph import StateGraph, START, END
# Graph builder whose shared state follows the `State` schema.
orchestrator_worker_builder = StateGraph(State)

# Register the three nodes under the names the edges refer to.
for node_name, node_fn in (
    ("orchestrator", orchestrator),
    ("llm_call", llm_call),
    ("synthesizer", synthesizer),
):
    orchestrator_worker_builder.add_node(node_name, node_fn)

# Wire the flow: START -> orchestrator -> (llm_call per Send) -> synthesizer -> END.
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator",
    assign_workers,  # returns the list of Send objects targeting "llm_call"
    ["llm_call"],
)
for edge_source, edge_target in (
    ("llm_call", "synthesizer"),
    ("synthesizer", END),
):
    orchestrator_worker_builder.add_edge(edge_source, edge_target)

# Compile the builder into the runnable workflow.
orchestrator_worker = orchestrator_worker_builder.compile()

In [17]:
from IPython.display import Markdown

# Run the whole workflow for one topic; the result is the final graph state.
state = orchestrator_worker.invoke({"topic":"Create a report on Agentic AI RAGs"})

# Render the stitched report as Markdown (last expression is the cell output).
Markdown(state["final_report"])

Report Sections:  sections=[Section(name='Introduction', description='This section will introduce the concept of Agentic AI RAGs (Retrieval-Augmented Generation systems), outlining their historical context, emergence, and relevance in contemporary AI developments. It will also briefly touch on the objective of the report to explore the potential applications and implications of these systems.'), Section(name='Understanding Agentic AI RAGs', description='This section will delve deeper into what Agentic AI RAGs are, providing a clear definition and explanation. It will cover the principles behind retrieval-augmented generation and how it integrates with agency – the ability of AI to perform autonomous tasks. Examples of current implementations and theoretical frameworks will be discussed.'), Section(name='Technical Architecture of Agentic AI RAGs', description='An exploration of the technical underpinnings of Agentic AI RAGs, detailing the architecture, components, and technologies invol

