# Notebook Application for Weekly Research on Daily Portfolio Companies using Tavily and LangGraph Agents

## Requirements

In [None]:
!pip install langchain-core langgraph langchain_core python-dotenv tavily-python

## Libraries

In [2]:
import os
import json
import asyncio
import operator
from typing import TypedDict, List, Annotated, Literal, Dict, Union, Optional 
from datetime import datetime

from tavily import AsyncTavilyClient, TavilyClient

from langchain_core.tools import tool
from langchain_core.messages import AnyMessage, AIMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_cohere.chat_models import ChatCohere
from langgraph.graph import StateGraph, START, END, add_messages

* 'allow_population_by_field_name' has been renamed to 'populate_by_name'
* 'smart_union' has been removed


## Set API KEYS

In [3]:
# Set Your API Keys
# Option 1: paste your keys below (never commit real keys). Untouched placeholders are ignored.
TAVILY_API_KEY="YOUR TAIVLY API KEY"
OPENAI_API_KEY="YOUR OPEN API KEY"

# The Tavily and OpenAI clients in this notebook are created without explicit keys,
# so a key pasted above only takes effect once it is exported to the environment.
for _key_name, _key_value in (("TAVILY_API_KEY", TAVILY_API_KEY), ("OPENAI_API_KEY", OPENAI_API_KEY)):
    if _key_value and not _key_value.startswith("YOUR "):
        os.environ[_key_name] = _key_value

# Option 2: use a .env file (put TAVILY_API_KEY=... and OPENAI_API_KEY=... in it).
# Variables already present in the environment are presumably left as-is by load_dotenv's
# default settings - verify against the python-dotenv docs if precedence matters.
from dotenv import load_dotenv
load_dotenv('.env')

True

## Code for Generating PDF Report

In [14]:
import re
from fpdf import FPDF

class PDF(FPDF):
    """FPDF document with an empty header band and a centred "Page N" footer."""

    def header(self):
        """Draw the page header: a blank, centred, 10-high cell followed by a line break."""
        header_font = ("Arial", "B", 12)
        self.set_font(*header_font)
        # The cell carries no text; it only reserves vertical space at the top of the page.
        self.cell(0, 10, "", 0, 1, "C")

    def footer(self):
        """Draw the page footer: the current page number, centred, in small italics."""
        footer_font = ("Arial", "I", 8)
        # A negative y is passed to set_y so the cursor is placed relative to the page bottom.
        self.set_y(-15)
        self.set_font(*footer_font)
        page_label = f"Page {self.page_no()}"
        self.cell(0, 10, page_label, 0, 0, "C")

def sanitize_content(content):
    """Return ``content`` with any characters that cannot be encoded as UTF-8 removed.

    In practice this only drops lone surrogate code points; every other character,
    including non-ASCII text, is returned unchanged.

    Args:
        content: The text to clean (``str``).

    Returns:
        A ``str`` that round-trips through UTF-8.
    """
    # With errors='ignore' the encode step never raises UnicodeEncodeError, so no
    # fallback branch is needed: unencodable characters are silently skipped.
    return content.encode('utf-8', 'ignore').decode('utf-8')

def replace_problematic_characters(content):
    """Swap common typographic Unicode characters for plain ASCII equivalents.

    Args:
        content: The text to normalise (``str``).

    Returns:
        ``content`` with dashes, curly quotes, ellipsis, bullet and trademark
        symbols replaced; all other characters are left untouched.
    """
    ascii_equivalents = str.maketrans({
        '\u2013': '-',    # en dash -> hyphen
        '\u2014': '--',   # em dash -> double hyphen
        '\u2018': "'",    # left single quotation mark -> apostrophe
        '\u2019': "'",    # right single quotation mark -> apostrophe
        '\u201c': '"',    # left double quotation mark -> double quote
        '\u201d': '"',    # right double quotation mark -> double quote
        '\u2026': '...',  # horizontal ellipsis -> three dots
        '\u2010': '-',    # Unicode hyphen -> ASCII hyphen
        '\u2022': '*',    # bullet -> asterisk
        '\u2122': 'TM',   # trademark symbol -> "TM"
    })
    # None of the replacement strings contains a key of the table, so a single
    # translate pass gives the same result as applying the replacements one by one.
    return content.translate(ascii_equivalents)

def generate_pdf_from_md(content, filename='output.pdf'):
    """Render a small subset of Markdown into a PDF file.

    Supported syntax: ``#`` headings (levels 1-4; deeper levels are rendered as
    level 4), ``***bold italic***``, ``**bold**``, ``*italic*`` and
    ``[text](url)`` links. Every non-heading input line is followed by a line break.

    Args:
        content: Markdown text to render.
        filename: Path of the PDF file to write.

    Returns:
        A status string: ``"PDF generated: <filename>"`` on success, or
        ``"Error generating PDF: <error>"`` if anything fails. The function does
        not raise; all exceptions are caught and reported in the return value.
    """
    # Splits a line into styled spans while keeping the delimiters (capturing group).
    inline_pattern = r'(\*\*\*.*?\*\*\*|\*\*.*?\*\*|\*.*?\*|\[.*?\]\(.*?\)|\([^ ]+?\))'
    # Captures link text and target together so parentheses inside the text are not mistaken for the URL.
    link_pattern = r'\[(.*?)\]\((.*?)\)'

    try:
        pdf = PDF()
        pdf.add_page()
        pdf.set_auto_page_break(auto=True, margin=15)
        pdf.set_font('Arial', '', 12)

        sanitized_content = sanitize_content(content)
        sanitized_content = replace_problematic_characters(sanitized_content)
        # The built-in (core) fonts such as Arial only cover Latin-1. Drop whatever is still
        # outside that range so a single stray character does not abort the whole report.
        sanitized_content = sanitized_content.encode('latin-1', 'ignore').decode('latin-1')

        for line in sanitized_content.split('\n'):
            if line.startswith('#'):
                # Only the leading run of '#' defines the level; a '#' inside the title (e.g. "C#") must not count.
                title = line.lstrip('#')
                header_level = min(len(line) - len(title), 4)
                # Remove an optional closing "###" sequence, which must be preceded by whitespace.
                title = re.sub(r'\s+#+\s*$', '', title.strip())
                header_text = re.sub(r'\*{2,}', '', title)
                pdf.set_font('Arial', 'B', 12 + (4 - header_level) * 2)
                pdf.multi_cell(0, 10, header_text)
                pdf.set_font('Arial', '', 12)
            else:
                for part in re.split(inline_pattern, line):
                    if not part:
                        # re.split yields empty strings around adjacent matches; nothing to draw.
                        continue
                    link = re.match(link_pattern, part)
                    if re.match(r'\*\*\*.*?\*\*\*', part):  # Bold Italic
                        pdf.set_font('Arial', 'BI', 12)
                        pdf.write(10, part.strip('*'))
                    elif re.match(r'\*\*.*?\*\*', part):  # Bold
                        pdf.set_font('Arial', 'B', 12)
                        pdf.write(10, part.strip('*'))
                    elif re.match(r'\*.*?\*', part):  # Italic
                        pdf.set_font('Arial', 'I', 12)
                        pdf.write(10, part.strip('*'))
                    elif link:  # Markdown-style link
                        display_text, url = link.group(1), link.group(2)
                        pdf.set_text_color(0, 0, 255)  # Blue, underlined link text
                        pdf.set_font('', 'U')
                        pdf.write(10, display_text, url)
                    else:
                        pdf.write(10, part)
                    # Restore the default colour and font before the next span.
                    pdf.set_text_color(0, 0, 0)
                    pdf.set_font('Arial', '', 12)

                pdf.ln(10)

        pdf.output(filename)
        return f"PDF generated: {filename}"

    except Exception as e:
        return f"Error generating PDF: {e}"

## Create an Agentic Workflow

This code defines a workflow for conducting research on a company, gathering information from various sources (using Tavily), and generating a detailed report. It starts by setting up a `ResearchState` data structure, which holds the company's name, the generated report, the documents retrieved during research, and the messages exchanged during the process. The `Citation` and `QuotedAnswer` classes define how citations and answers should be structured when generating the report.

The `TavilyQuery` and `TavilySearchInput` classes allow for multi-query searches using the Tavily search tool, enabling more precise and efficient information gathering. The `tavily_search` function is an asynchronous tool that performs these searches in parallel, collecting and consolidating the results.

The workflow involves several key functions. `tool_node` handles the execution of search tools and saves the results for later processing. The `call_model` function invokes the research model to gather relevant information about the company, based on the most recent week’s developments. The `should_continue` function decides whether to continue using research tools or to proceed with writing the report. The `write_report` function generates a detailed report based on the retrieved documents. Finally, the `generete_pdf` function saves that report as a PDF file in the `reports` directory.

The workflow is managed by a `StateGraph`, which orchestrates the sequence of operations, deciding which node (research, tools, report writing, or PDF generation) to execute next based on the current state. This setup enables a structured and systematic approach to researching a company and producing a well-informed report.

In [15]:
# Define the research state
class ResearchState(TypedDict):
    # Shared state passed between the nodes of the research graph defined in this notebook.

    # Name of the company being researched; interpolated into the prompts and the PDF file name.
    company: str
    # Final report text; written by `write_report` and read by `generete_pdf`.
    report: str
    # Declare a dictionary where:
    # - The outer dictionary has string keys.
    # - The inner dictionary can have keys of different types (e.g., str, int).
    # - The inner dictionary values can be of different types (e.g., str, float).
    # `tool_node` stores each retrieved search result here under its 'url'.
    documents: Dict[str, Dict[Union[str, int], Union[str, float]]]
    # Conversation history; annotated with the `add_messages` reducer imported from langgraph.
    messages: Annotated[list[AnyMessage], add_messages]

# Define the structure for the model's response, which includes citations.
class Citation(BaseModel):
    # One supporting citation inside a QuotedAnswer: a source URL plus a verbatim quote.
    # Both fields are required (`...` as the Field default).

    # URL of the source that backs the answer.
    source_id: str = Field(
        ...,
        description="The url of a SPECIFIC source which justifies the answer.",
    )
    # Exact text copied from that source.
    quote: str = Field(
        ...,
        description="The VERBATIM quote from the specified source that justifies the answer.",
    )


class QuotedAnswer(BaseModel):
    """Answer the user question based only on the given sources, and cite the sources used."""
    # Structured-output schema requested from the model in `write_report`.
    # NOTE(review): the docstring above and the field descriptions are presumably sent to the
    # model as part of the schema - treat them as prompt text when editing; confirm.

    # Report body; `write_report` stores this value as the state's `report`.
    answer: str = Field(
        ...,
        description="The answer to the user question, which is based only on the given sources. Include any relevant sources in the answer as markdown hyperlinks. For example: 'This is a sample text ([url website](url))'"
    )
    # Supporting citations; required, one Citation per quoted source.
    citations: List[Citation] = Field(
        ..., description="Citations from the given sources that justify the answer."
    )
    
# Add Tavily's arguments to enhance the web search tool's capabilities
class TavilyQuery(BaseModel):
    # One sub-query for the `tavily_search` tool; the model fills these fields in when it calls the tool.

    # Search text; `tavily_search` appends the current month and year before sending it.
    query: str = Field(description="sub query")
    # Forwarded to the Tavily client as `topic`.
    topic: str = Field(description="type of search, should be 'general' or 'news'")
    # Forwarded to the Tavily client as `days`.
    days: int = Field(description="number of days back to run 'news' search")
    # Forwarded to the Tavily client as `include_raw_content`.
    raw_content: bool = Field(description="include raw content from found sources, use it ONLY if you need more information besides the summary content provided")
    # Optional; defaults to None when the model does not supply any domains.
    domains: Optional[List[str]] = Field(default=None, description="list of domains to include in the research. Useful when trying to gather more detailed information.")
 

# Define the args_schema for the tavily_search tool using a multi-query approach, enabling more precise queries for Tavily.
class TavilySearchInput(BaseModel):
    # Argument schema of the `tavily_search` tool: a single list of independent sub-queries.
    sub_queries: List[TavilyQuery] = Field(description="set of sub-queries that can be answered in isolation")


@tool("tavily_search", args_schema=TavilySearchInput, return_direct=True)
async def tavily_search(sub_queries: List[TavilyQuery]):
    """Perform searches for each sub-query using the Tavily search tool concurrently."""
    # NOTE(review): the docstring above is presumably used as the tool description shown to the
    # model, so it is kept short and unchanged; the details live in these comments instead.
    #
    # Args:
    #     sub_queries: TavilyQuery items; each one is sent to Tavily as a separate search.
    # Returns:
    #     A flat list with the 'results' entries of all searches. A sub-query whose search
    #     fails contributes nothing (the error is printed, not raised).

    async def perform_search(itm):
        """Run one Tavily search and return its results, or [] if the search fails."""
        try:
            # Add date to the query as we need the most recent results
            query_with_date = f"{itm.query} {datetime.now().strftime('%m-%Y')}"
            search_kwargs = dict(
                query=query_with_date,
                topic=itm.topic,
                days=itm.days,  # per the field description, only relevant for 'news' searches
                include_raw_content=itm.raw_content,
                max_results=10,
            )
            # Honour the optional domain filter from the schema; when the model gives none,
            # the call is identical to a search without any domain restriction.
            if itm.domains:
                search_kwargs["include_domains"] = itm.domains
            response = await tavily_client.search(**search_kwargs)
            return response['results']
        except Exception as e:
            # Best effort: log the failure and let the remaining sub-queries proceed
            print(f"Error occurred during search for query '{itm.query}': {str(e)}")
            return []

    # Run all the search tasks in parallel
    search_responses = await asyncio.gather(*(perform_search(itm) for itm in sub_queries))

    # Combine the results from all the responses into one flat list
    return [result for response in search_responses for result in response]


# Tools offered to the model, plus a name -> tool lookup used by `tool_node` to dispatch calls.
tools = [tavily_search]
tools_by_name = {registered.name: registered for registered in tools}

# Async Tavily client used inside `tavily_search`; no key is passed explicitly here.
tavily_client = AsyncTavilyClient()

# Chat model with the tools bound so it can emit tool calls.
model = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(tools)

# Define an async custom tool node to store Tavily's search results for improved processing and filtering.
async def tool_node(state: ResearchState):
    """Execute the tool calls of the last message and record newly found documents.

    Args:
        state: Current graph state. The last entry of ``state["messages"]`` must carry
            ``tool_calls``; ``documents`` may be missing, None or a dict keyed by URL.

    Returns:
        A state update with one ToolMessage per tool call (listing only the documents that
        call newly contributed) and the merged ``documents`` dict.
    """
    # `documents` may be absent or None before the first search has run, so read it
    # defensively, and copy it so the incoming state object is not mutated in place.
    docs = dict(state.get('documents') or {})
    msgs = []
    for tool_call in state["messages"][-1].tool_calls:
        selected_tool = tools_by_name[tool_call["name"]]
        new_docs = await selected_tool.ainvoke(tool_call["args"])
        # Collected per tool call, so each ToolMessage reports only its own new documents.
        fresh_docs = []
        for doc in new_docs:
            # Make sure that this document was not retrieved before
            if doc['url'] not in docs:
                docs[doc['url']] = doc
                fresh_docs.append(json.dumps(doc))
        # One JSON object per line keeps the individual documents distinguishable.
        docs_str = "\n".join(fresh_docs)
        msgs.append(ToolMessage(content=f"Found the following new documents: {docs_str}", tool_call_id=tool_call["id"]))
    return {"messages": msgs, "documents": docs}
    
# Invoke the model with research tools to gather information about the company.     
def call_model(state: ResearchState):
    """Ask the tool-bound model to research the past week's events for the company.

    The conversation so far is sent followed by a system message that states today's
    date and the research objective. The model's reply is returned as a one-element
    list under ``messages``.
    """
    system_prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}.
    You are an expert researcher tasked with preparing a weekly report on recent developments in portfolio companies.
    Your current objective is to gather detailed information about any significant events that occurred in the past week for the following company: {state['company']}.\n
    """
    # The instruction message goes after the existing history.
    conversation = [*state['messages'], SystemMessage(content=system_prompt)]
    reply = model.invoke(conversation)
    # Wrapped in a list because it is added to the existing message list
    return {"messages": [reply]}
    

# Define the function that decides whether to continue research using tools or proceed to writing the report
def should_continue(state: ResearchState) -> Literal["tools", "write_report"]:
    """Pick the next node after research.

    Returns "tools" when the latest message contains tool calls, otherwise
    "write_report".
    """
    latest_message = state['messages'][-1]
    # Pending tool calls mean more searching; none means the research phase is over.
    return "tools" if latest_message.tool_calls else "write_report"

# Define the function to write the report based on the retrieved documents.
def write_report(state: ResearchState):
    """Generate the weekly report from the documents gathered so far.

    Args:
        state: Current graph state; reads ``company``, ``messages`` and, when present,
            ``documents``.

    Returns:
        A state update containing the report text under ``report`` and an AIMessage
        that echoes it under ``messages``.
    """
    # The model may go straight to writing without ever calling the search tool, in which
    # case `documents` can be missing or None; fall back to an empty dict instead of failing.
    documents = state.get('documents') or {}
    # Create the prompt
    prompt = f"""Today's date is {datetime.now().strftime('%d/%m/%Y')}\n.
    You are an expert researcher, writing a weekly report about recent events in portfolio companies.\n
    Your task is to write an in-depth, well-written, and detailed report on the following company: {state['company']}.\n
    Here are all the documents you gathered so far:\n{documents}\n
    Use only the relevant and most recent documents.""" 
    # Only the latest message is sent, followed by the writing instructions.
    messages = [state['messages'][-1]] + [SystemMessage(content=prompt)]
    # Request output that follows the QuotedAnswer schema (answer + citations).
    response = model.with_structured_output(QuotedAnswer).invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [AIMessage(content=f"Generated Report:\n{response.answer}")], "report": response.answer}

def generete_pdf(state: ResearchState):
    """Write the report stored in the state to a timestamped PDF in ``reports/``.

    Args:
        state: Current graph state; reads ``company`` and ``report``.

    Returns:
        A state update whose single AIMessage carries the status string returned by
        ``generate_pdf_from_md`` (success or error text).
    """
    directory = "reports"
    # Create the directory if needed; exist_ok avoids a separate existence check.
    os.makedirs(directory, exist_ok=True)

    # Characters that are not allowed in file names on common platforms (':' in particular
    # is rejected on Windows) are replaced, and the time is written with '-' separators.
    forbidden = '\\/:*?"<>|'
    safe_company = "".join("_" if ch in forbidden else ch for ch in state['company'])
    timestamp = datetime.now().strftime('%Y-%m-%d %H-%M-%S')
    file_name = f"{safe_company} Weekly Report {timestamp}"

    msg = generate_pdf_from_md(state['report'], filename=os.path.join(directory, f"{file_name}.pdf"))
    return {"messages": [AIMessage(content=msg)]}

# Define a graph
workflow = StateGraph(ResearchState)

# Register each node under the name the edges below refer to
for _node_name, _node_fn in (
    ("research", call_model),
    ("tools", tool_node),
    ("write_report", write_report),
    ("generate_pdf", generete_pdf),
):
    workflow.add_node(_node_name, _node_fn)

# The graph starts at the `research` node
workflow.set_entry_point("research")

# After `research`, `should_continue` decides which node is called next
# ("tools" or "write_report")
workflow.add_conditional_edges("research", should_continue)

# After `tools` is called, control goes back to the `research` node
workflow.add_edge("tools", "research")
# Option in the future: add another step between research and report writing that
# filters the retrieved documents with a reranker
workflow.add_edge("write_report", "generate_pdf")
# The PDF step is the last node of the graph
workflow.add_edge("generate_pdf", END)

app = workflow.compile()

## Run Research

In [16]:
# You may update the content of the human message with some guidlies of your own
company = "Tavily"
your_additional_guidelines=f"I would like a comprehensive and detailed report on the latest developments concerning the company {company}."
messages = [
    HumanMessage(content=your_additional_guidelines)
]
async for s in app.astream({"company": company, "messages":messages}, stream_mode="values"):
    message = s["messages"][-1]
    if isinstance(message, tuple):
        print(message)
    else:
        message.pretty_print()


I would like a comprehensive and detailed report on the latest developments concerning the company Tavily.
Tool Calls:
  tavily_search (call_uMym0dxM1WjDSP6DPcvL8V6c)
 Call ID: call_uMym0dxM1WjDSP6DPcvL8V6c
  Args:
    sub_queries: [{'query': 'Tavily latest news', 'topic': 'news', 'days': 7, 'raw_content': False}, {'query': 'Tavily company updates', 'topic': 'general', 'days': 7, 'raw_content': False}, {'query': 'Tavily press releases', 'topic': 'general', 'days': 7, 'raw_content': False}]

Found the following new documents: {"url": "https://titanswire.usatoday.com/2024/09/02/tennessee-titans-offseason-spending-spree-2024-nfl-season/", "title": "On paper, Titans' offseason 'spending spree' pays off - Titans Wire", "score": 0.89805347, "published_date": "Tue, 03 Sep 2024 02:28:00 GMT", "content": "Tennessee Titans: On paper, offseason \u2018spending spree\u2019 pays off https://titanswire.usatoday.com/2024/09/02/tennessee-titans-offseason-spending-spree-2024-nfl-season/ On paper, Titan