<a href="https://colab.research.google.com/github/rsrini7/Colabs/blob/main/PydanticAI_Graph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install pydantic-graph pydantic-ai nest-asyncio -q

In [15]:
import nest_asyncio
nest_asyncio.apply()

from dataclasses import dataclass
from pydantic_graph import GraphRunContext, BaseNode, Graph, End

@dataclass
class NodeA(BaseNode[int]):
    track_number: int
    async def run(self, ctx: GraphRunContext) -> BaseNode:
        print(f'Calling Node A')
        return NodeB(self.track_number)

@dataclass
class NodeB(BaseNode[int]):
    track_number: int
    async def run(self, ctx: GraphRunContext) -> BaseNode | End:
        print(f'Calling Node B')
        if self.track_number == 1:
            return End(f'Stop at Node B with value --> {self.track_number}')
        else:
            return NodeC(self.track_number)

@dataclass
class NodeC(BaseNode[int]):
    track_number: int
    async def run(self, ctx: GraphRunContext) -> End:
      print(f'Calling Node C')
      return End(f'Value to be returned at Node C: {self.track_number}')


graph = Graph(nodes=[NodeA, NodeB, NodeC])

result = graph.run_sync(start_node=NodeA(track_number=4))

print('-' * 40)
print(f'Result: {result}')
print(f'Result Persistence: {result.persistence}')

Calling Node A
Calling Node B
Calling Node C
----------------------------------------
Result: GraphRunResult(output='Value to be returned at Node C: 4', state=None)
Result Persistence: SimpleStatePersistence(last_snapshot=EndSnapshot(state=None, result=End(data='Value to be returned at Node C: 4'), ts=datetime.datetime(2025, 5, 11, 3, 39, 55, 334144, tzinfo=datetime.timezone.utc), kind='end', id='end:a7bda02c74d345158aae2e480bbb00ca'))


In [29]:
import nest_asyncio
nest_asyncio.apply()

from __future__ import annotations as _annotations
from dataclasses import dataclass, field

from pydantic import BaseModel, EmailStr
from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
from google.colab import userdata


@dataclass
class User:
  name: str
  email: EmailStr
  interests: list[str]

@dataclass
class State:
  user: User
  write_agent_messages: list[ModelMessage] = field(default_factory=list)

@dataclass
class Email:
  subject: str
  body: str

class EmailRequiresWrite(BaseModel):
  feedback: str

class EmailOK(BaseModel):
  pass

openrouter_api_key = userdata.get('OPENROUTER_API_KEY')
openai_model = OpenAIModel('openai/gpt-3.5-turbo',
    provider=OpenAIProvider(api_key=openrouter_api_key,
                    base_url="https://openrouter.ai/api/v1",))


email_writer_agent = Agent(
    model=openai_model,
    result_type=Email,
    system_prompt='Write a welcome email for people who subscribe to my tech blog.'
)

feedback_agent = Agent(
    model=openai_model,
    result_type=EmailRequiresWrite | EmailOK,
    system_prompt=(
        'Review the email and provide feedback, email must reference the users specific interests.'
    )
)


@dataclass
class WriteEmail(BaseNode[State]):
    email_feedback: str | None = None

    async def run(self, ctx: GraphRunContext[State]) -> Feedback:
        print(f"{'-' * 50}\nWriteEmail call fired. Email feedback: {self.email_feedback}")
        print()
        if self.email_feedback:
            prompt = (
                f'Rewrite the email for the user:\n'
                f'{format_as_xml(ctx.state.user)}\n'
                f'Feedback: {self.email_feedback}'
            )
        else:
            user_xml = f"""
<examples>
    <name>John Doe</name>
    <email>john.joe@example.com</email>
</examples>
            """ # Note: The video shows example like this, but it should ideally use format_as_xml(ctx.state.user)
            prompt = (
                f'Write a welcome email for the user:\n'
                f'{format_as_xml(ctx.state.user)}' # Corrected to use actual user data
            )
        import asyncio
        await asyncio.sleep(2)
        result = await email_writer_agent.run(
            prompt,
            message_history=ctx.state.write_agent_messages,
        )
        print(f"WriteEmail result Received. Result: {result.output}")
        print(f"{'-' * 50}")
        ctx.state.write_agent_messages += result.all_messages()
        return Feedback(result.output)

from typing import Union

@dataclass
class Feedback(BaseNode[State, None, Email]): # State, Dependencies (None), Output (Email)
    email: Email

    async def run(self, ctx: GraphRunContext[State]) -> Union[WriteEmail, End]: # Changed return type annotation
        print(f"Feedback call fired. Email object received: {self.email}")
        print()

        prompt = format_as_xml({'user': ctx.state.user, 'email': self.email})
        import asyncio
        await asyncio.sleep(2)
        result = await feedback_agent.run(prompt)
        print(f"Feedback result received. Feedback result: {result.output}")

        if isinstance(result.output, EmailRequiresWrite):
            return WriteEmail(email_feedback=result.output.feedback)
        else: # EmailOK
            # Return an End node containing the email content
            return End(self.email.subject + '\n' + self.email.body)


feedback_graph = Graph(nodes=[WriteEmail, Feedback])

user = User(
    name='Jay',
    email='jay@example.com',
    interests=['AI Agent', 'Photography', 'Automation'],
)

state = State(user)
try:
  email = feedback_graph.run_sync(WriteEmail(), state=state)
except TypeError as e:
        print(f"An error occurred: {e}")
print(email)

--------------------------------------------------
WriteEmail call fired. Email feedback: None



  f'{format_as_xml(ctx.state.user)}' # Corrected to use actual user data


An error occurred: 'NoneType' object cannot be interpreted as an integer
GraphRunResult(output="Welcome to Our Tech Blog, Jay!\nHi Jay, \n\nWelcome to our tech blog! We are excited to have you join our community of tech enthusiasts. Your interests in AI Agent, Photography, and Automation resonate with the diverse topics we cover. \n\nPrepare to explore cutting-edge technologies, thoughtful articles, and interactive discussions. We look forward to sharing our latest updates and industry insights with you. \n\nIf you have any questions or ideas, please don't hesitate to reach out. \n\nStay tuned for captivating content, innovation, and tech news on our blog. Let's embark on an exciting journey through the world of technology together! \n\nWarm regards, \nThe Tech Blog Team", state=State(user=User(name='Jay', email='jay@example.com', interests=['AI Agent', 'Photography', 'Automation']), write_agent_messages=[ModelRequest(parts=[SystemPromptPart(content='Write a welcome email for people wh