In [47]:
from typing import Annotated, List, Optional, Tuple
from pydantic import BaseModel, Field
from typing_extensions import TypedDict
import os
import requests
import json
import re
import dotenv
import asyncio
from tavily import TavilyClient
from langchain_community.document_loaders import WebBaseLoader
from pydantic_ai import Agent, RunContext, ModelRetry
from dataclasses import dataclass
from markdown_pdf import MarkdownPdf, Section
from pydantic_ai.models import openai

# Load settings from a local .env file (if any) so the os.getenv(...) lookups
# in later cells can find the API keys and tokens they ask for.
dotenv.load_dotenv()



True

In [61]:
import logfire

# Configure Logfire with the token read from the LOGFIRE_TOKEN environment variable.
logfire.configure(token=os.getenv("LOGFIRE_TOKEN"))


<logfire._internal.main.Logfire at 0x1748b61f850>

Logfire project URL: https://logfire.pydantic.dev/kumarvipu1/personal-proj


In [62]:
# Model handle given to the Agent in a later cell; the API key is read from OPENAI_API_KEY.
model = openai.OpenAIModel('gpt-4o', api_key=os.getenv("OPENAI_API_KEY"))


In [63]:
# Structured result of a run: this model is passed to the Agent as its result_type.
# Documented with comments rather than a class docstring so the model definition
# itself is left exactly as it was.
class BlogOutput(BaseModel):
    # Headline of the generated post.
    title: str = Field(description="The title of the blog post.")
    # Body of the post (the system prompt asks for markdown).
    content: str = Field(description="The content of the blog post.")
    # References the post was written from.
    sources: List[str] = Field(description="The sources used to write the blog post.")
    
@dataclass
class BlogInput:
    """Per-run dependencies handed to the agent (its ``deps_type``).

    The fields are plain required dataclass fields. ``pydantic.Field(...)`` is
    not interpreted by the standard-library ``dataclass`` decorator: used as a
    default it simply becomes the attribute's value, so a forgotten argument
    would silently yield a ``FieldInfo`` object instead of raising.
    """

    # The subject of the blog post.
    subject: str
    # The length of the blog post in words.
    length: int
    # The websites to scrape for more information and data for writing the blog.
    websites: List[str]
    # The source of the image to use for the blog post.
    image_source: List[str]
    

def get_source_url(query: Annotated[str, "The query to search for"]) -> Tuple[List[str], List[str]]:
    """Search Tavily for ``query`` and collect source URLs and images.

    Parameters:
    - query: The query to search for.

    Returns:
    A ``(urls, images)`` pair: the URL of every search result (at most three
    are requested) and the leading entries of the response's ``images`` list,
    capped at one per URL.
    """
    client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    results = client.search(query=query, max_results=3, search_depth="advanced",
                            include_images=True, include_image_description=True)
    urls = [result['url'] for result in results['results']]
    # Never hand back more images than there are source URLs.
    images = results['images'][:len(urls)]
    return urls, images


In [64]:
# The agent runs on `model`, must answer with a BlogOutput, and receives a
# BlogInput as its per-run dependencies.
agent = Agent(model=model,
              result_type=BlogOutput,
              deps_type=BlogInput)


In [65]:
@agent.system_prompt
async def get_system_prompt(ctx: RunContext[BlogInput]) -> str:
    """Render the system prompt for one run.

    The subject, target length, source websites and image sources are read
    from ``ctx.deps`` and interpolated into a fixed instruction template.
    """
    # The template is returned as-is: its wording and whitespace are what the model receives.
    return f"""
    You are a helpful assistant taksed with researching and writing a blog post about {ctx.deps.subject}.
    The blog post should be {ctx.deps.length} words long.
    The blogpost can be in various formats like a news, research article, a tutorial, a long form article, short story, etc.
    Structure the blogpost in relevant sections and sub-sections depending on the format based on the user pompt.
    Think step by step.
     
     You have the following sources of websites to use for the blog post:
     {ctx.deps.websites}
     
     You have the following source of image to use for the blog post:
     {ctx.deps.image_source}
     
     You have the following tools at your disposal:
     - web_scraper: Use this tool to get the content of the urls.
     - download_image: Use this tool to download the image from the given source. Pass a list of names of the image as the argumenent same as the number of image urls.
     - write_markdown_to_file: Use this tool to write the markdown to a file.
     
     Give the final output in the markdown format and place the images in the markdown wherever necessary.
    """

def _clean_page_text(raw_text: str, max_words: int) -> str:
    """Reduce one scraped page to plain ASCII words, keeping at most ``max_words`` of them."""
    # Remove HTML/XML tags first
    text = re.sub(r'<[^>]+>', '', raw_text)

    kept_lines = []
    for line in text.split('\n'):
        cleaned = re.sub(r'[^\w\s]', '', line)  # Keep only alphanumeric and spaces
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()  # Normalize to single spaces
        cleaned = re.sub(r'[^a-zA-Z0-9\s]', '', cleaned)  # Remove non-English characters

        # Short lines (10 words or fewer) are mostly navigation or boilerplate; drop them
        if len(cleaned.split()) > 10:
            kept_lines.append(cleaned)

    # Join everything into one text and take exactly the number of words allowed
    return ' '.join(' '.join(kept_lines).split()[:max_words])


@agent.tool
async def web_scraper(ctx: RunContext[BlogInput]) -> str:
    """Tool to scrape the content of the websites."""
    # NOTE(review): pydantic-ai appears to derive the tool description from the
    # docstring above, so it is kept short and unchanged; details live in comments.
    #
    # Loads every URL in ctx.deps.websites, cleans each loaded document and returns
    # one "<title>\n<text>\n\n" section per document, concatenated.
    websites = ctx.deps.websites

    # An empty source list would otherwise raise ZeroDivisionError in the split below.
    if not websites:
        return "No websites were provided to scrape."

    words_per_url = ctx.deps.length // len(websites)  # Distribute words evenly across URLs
    sections = []

    for url in websites:
        try:
            documents = WebBaseLoader(url).load()
        except requests.exceptions.RequestException as e:
            # One unreachable source should not abort the whole tool call; report it and move on.
            sections.append(f"Error loading {url}: {str(e)}\n\n")
            continue

        for doc in documents:
            title = doc.metadata.get("title", "")
            final_content = _clean_page_text(doc.page_content, words_per_url)
            sections.append(f'{title}\n{final_content}\n\n')

    return ''.join(sections)

@agent.tool
async def download_image(ctx: RunContext[BlogInput], image_names: Annotated[List[str], "The names of the images to download"]) -> str:
    """Tool to download the image from the given source.
    Parameters:
    - image_names: The names of the images to download.
    """
    # NOTE(review): pydantic-ai appears to derive the tool description from the
    # docstring above, so it is left unchanged; details live in comments.
    #
    # Pairs each URL in ctx.deps.image_source with the name at the same position in
    # image_names, saves the image bytes under that name and returns one status line
    # per pair (success or error).

    # str.endswith accepts several suffixes only as a single tuple; passing them as
    # separate positional arguments raises TypeError.
    known_extensions = ('.jpg', '.png', '.jpeg', '.webp')
    status_lines = []

    for i, (image_url, image_name) in enumerate(zip(ctx.deps.image_source, image_names), start=1):
        try:
            # A timeout keeps one unresponsive host from hanging the whole tool call.
            response = requests.get(image_url, timeout=30)
            response.raise_for_status()  # Raise an exception for bad status codes

            # Default to .jpg when the name carries no recognised image extension
            if not image_name.lower().endswith(known_extensions):
                image_name += '.jpg'

            with open(image_name, 'wb') as f:
                f.write(response.content)

            status_lines.append(f"Image {i} successfully downloaded and saved as {image_name}\n")
        except (requests.exceptions.RequestException, OSError) as e:
            # OSError covers failures while writing the file, reported the same way as download errors.
            status_lines.append(f"Error downloading image {i}: {str(e)}\n")

    return ''.join(status_lines)

@agent.tool
async def write_markdown_to_file(ctx: RunContext[None], content: Annotated[str, "The markdown content to write"],
                           filename: Annotated[str, "The name of the file (with or without .md extension)"] = "blog.md") -> str:
    """
    Write markdown content to a file with .md extension.
    Parameters:
    - content: The markdown content to write.
    - filename: The name of the file (with or without .md extension).
    """
    # NOTE(review): pydantic-ai appears to derive the tool description from the
    # docstring above, so it is left unchanged; details live in comments.
    #
    # Besides the .md file, a PDF rendering with the same base name is written next
    # to it. The returned string confirms the file name and echoes the content.

    # Ensure filename has .md extension
    if not filename.endswith('.md'):
        filename += '.md'

    with open(filename, 'w', encoding='utf-8') as f:
        f.write(content)

    # Swap only the trailing extension: str.replace would also rewrite any earlier
    # ".md" inside the name (e.g. "notes.md.draft.md").
    pdf_filename = os.path.splitext(filename)[0] + '.pdf'

    pdf = MarkdownPdf()
    pdf.add_section(Section(content, toc=False))
    pdf.save(pdf_filename)

    return f"File {filename} has been created successfully. \n the content is:\n {content}"

        
    


In [66]:
async def run_agent(user_input: str, length: int = 4000) -> "BlogOutput":
    """Research ``user_input`` and have the agent write a blog post about it.

    Parameters:
    - user_input: The topic; used as the search query, the blog subject and the
      prompt passed to the agent.
    - length: Target length of the post in words (defaults to 4000).

    Returns:
    The agent's structured result (``result.data``), so the caller decides how
    to display it.
    """
    urls, images = get_source_url(user_input)
    deps = BlogInput(subject=user_input, length=length, websites=urls, image_source=images)
    # `run` is the awaitable entry point; `run_sync` hands back a finished result
    # object, which cannot be awaited.
    result = await agent.run(user_input, deps=deps)
    return result.data


In [67]:
import nest_asyncio
# Per nest_asyncio's documentation, apply() patches asyncio so that an event loop
# which is already running (as in a notebook kernel) can be re-entered.
nest_asyncio.apply()



In [None]:
# Run the whole pipeline for a sample topic and show whatever run_agent returns.
output = await run_agent("What is the latest news on the USA and Canada situation?")
print(output)



