In [None]:
from litellm import completion
from typing import List, Dict, Any, Optional
import jinja2
import json
from abc import ABC, abstractmethod
from tavily import TavilyClient
import os
from pydantic import BaseModel
from copy import deepcopy


# Objective: Create a conversation object that can be used to send messages to the model and get responses and tools
class Conversation:
    def __init__(self, model: str = "gpt-4o"):  # gemini/gemini-2.0-flash
        self.model = model
        self.messages = []

    def add_message(self, role, content):
        self.messages.append({"role": role, "content": content})

    def get_response(
        self,
        tool_choice: str = "auto",
        tools: Optional[List] = None,
        response_format: BaseModel = None,
    ):
        if tools is None:
            tool_choice = None

        return completion(
            model=self.model,
            messages=self.messages,
            tools=tools,
            tool_choice=tool_choice,
            response_format=response_format,
        )


# Updated abstract Tool class that enforces `name` and `terminating` and accepts extra kwargs.
class Tool(ABC):
    def __init__(self, name: str, terminating: bool = False):
        self.name = name
        self.terminating = terminating

    @abstractmethod
    def run(self, conversation: Conversation) -> str:
        pass

    @abstractmethod
    def get_config(self) -> Dict[str, Any]:
        pass


# Reasoning tool (non‐terminating)
class Reasoning(Tool):
    def __init__(self, terminating: bool = False):
        super().__init__(name="reasoning", terminating=terminating)

    def run(self, conversation: Conversation) -> str:
        with open("templates/reasoning.jinja2", "r") as file:
            reasoning_template = jinja2.Template(file.read()).render()
        conversation.add_message("system", reasoning_template)
        response = conversation.get_response(tool_choice=None)
        conversation.messages.pop()  # remove the reasoning message
        return response.choices[0].message.content

    def get_config(self) -> Dict[str, Any]:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": "This tool provides reasoning for the given context.",
            },
        }


# Respond tool (terminating) – note the extra parameter `response_type`
class Respond(Tool):
    def __init__(self, response_types: List[str], terminating: bool = True):
        # Mark this tool as terminating since its response is meant to be final.
        super().__init__(name="respond", terminating=terminating)
        self.response_types = response_types

    def run(self, conversation: Conversation, response_type: str = "") -> str:
        if not response_type:
            response_type = "respond"

        with open(f"templates/{response_type}.jinja2", "r") as file:
            template_content = jinja2.Template(file.read()).render()
        conversation.add_message("system", template_content)
        response = conversation.get_response(tool_choice=None)
        conversation.messages.pop()
        return response.choices[0].message.content

    def get_config(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": "This tool provides responses for the given user message.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "response_type": {
                            "type": "string",
                            "description": "The type of response to be generated.",
                            "enum": self.response_types,
                        },
                    },
                    "required": ["response_type"],
                },
            },
        }


class Section(BaseModel):
    title: str
    description: str


class Outline(BaseModel):
    title: str
    sections: List[Section]


class Report(Tool):
    def __init__(self, terminating: bool = True):
        super().__init__(name="report", terminating=terminating)

    def run(self, conversation: Conversation) -> str:
        outline = self.generate_outline(conversation)
        return self.write_report(conversation, outline)

    def write_report(self, conversation: Conversation, outline: Outline) -> str:

        report_conversation = deepcopy(conversation)

        report_so_far = f"# {outline.title}\n\n"

        for section in outline.sections:
            print(f"Writing section: {section.title}")

            with open("templates/write_report.jinja2", "r") as file:
                template = jinja2.Template(file.read()).render(
                    {
                        "outline": outline,
                        "current_section": section,
                        "report_so_far": report_so_far,
                    }
                )

            report_conversation.add_message("user", template)
            response = report_conversation.get_response(tool_choice=None)
            report_so_far += response.choices[0].message.content + "\n\n"

        return report_so_far

    def generate_outline(self, conversation: Conversation) -> Outline:
        with open("templates/report_outline.jinja2", "r") as file:
            template = jinja2.Template(file.read()).render()
        conversation.add_message("user", template)
        response = conversation.get_response(tool_choice=None, response_format=Outline)
        conversation.messages.pop()
        return Outline(**json.loads(response.choices[0].message.content))

    def get_config(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": "Generate a comprehensive report based on the research.",
            },
        }


# Search tool (non‐terminating)
class Search(Tool):
    def __init__(self, terminating: bool = False):
        super().__init__(name="search", terminating=terminating)
        self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

    def run(self, conversation: Conversation, query: str) -> str:
        response = self.tavily_client.search(query)
        return response

    def get_config(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": "Execute a search query using Tavily Search",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The query to search for",
                        },
                    },
                    "required": ["query"],
                },
            },
        }


class Extract(Tool):
    def __init__(self, terminating: bool = False):
        super().__init__(name="extract", terminating=terminating)
        self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

    def run(self, conversation: Conversation, urls: list[str]) -> str:
        response = self.tavily_client.extract(urls)
        return response

    def get_config(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": "Extract web page content from one or more specified URLs using Tavily Extract",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "urls": {
                            "type": "string",
                            "description": "The urls to extract from",
                        },
                    },
                    "required": ["urls"],
                },
            },
        }


# An agent is a tool that can be used to interact with the model and get responses and use tools as required.
class Agent(Tool):
    def __init__(
        self,
        name: str,
        description: str,
        fallback_tool: Tool,
        max_turns: int = 3,
        terminating: bool = False,
        tools: List[Tool] = [],
    ):
        super().__init__(name=name, terminating=terminating)
        self.description = description
        self.max_turns = max_turns
        self.available_tools = {tool.name: tool for tool in tools}
        self.fallback_tool = fallback_tool

    def run(self, conversation: Conversation) -> str:
        for turn in range(self.max_turns):
            tool, function_args = self.get_next_action(conversation)
            print(f"Using the {tool.name} tool with arguments {function_args}")
            response = tool.run(conversation, **function_args)
            if tool.terminating:
                return response

            conversation.add_message(
                "user",
                f"The {tool.name} tool has been used with args {function_args} and it responded with {response}",
            )
        return self.fallback_tool.run(conversation)

    def get_next_action(self, conversation: Conversation) -> Tool:
        response = conversation.get_response(
            tool_choice="required",
            tools=[tool.get_config() for tool in self.available_tools.values()],
        )
        tool_calls = response.choices[0].message.tool_calls
        function_name = tool_calls[0].function.name
        function_args = json.loads(tool_calls[0].function.arguments)
        return self.available_tools[function_name], function_args

    def get_config(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
            },
        }

Testing

In [None]:
with open("templates/system_message.jinja2", "r") as file:
    system_message = jinja2.Template(file.read()).render(tools="")

conversation = Conversation()
conversation.add_message("system", system_message)

write_report = Report(terminating=True)
research_agent = Agent(
    name="research_agent",
    description="This agent is designed to help users with research.",
    tools=[Search(), Extract(), Reasoning(), write_report],
    max_turns=15,
    terminating=True,
    fallback_tool=write_report,
)

respond = Respond(response_types=["respond", "clarification"])

agent = Agent(
    name="main_agent",
    description="This agent is designed to help users with market research.",
    tools=[Reasoning(), respond, research_agent],
    fallback_tool=respond,
)

conversation.add_message("user", "I want to research retrieval augmented generation, with a focus on use of knowledge graphs and LLMs.")


response = agent.run(conversation)

print(response)

Using the reasoning tool with arguments {}
Using the respond tool with arguments {'response_type': 'clarification'}
From what you've mentioned, it seems your primary objectives are to:

1. **Understand Retrieval Augmented Generation (RAG):** Gain a comprehensive understanding of how RAG works, specifically how knowledge graphs and large language models (LLMs) are used within these systems.

2. **Explore the Role of Knowledge Graphs and LLMs:** Look into the specific functions and benefits of incorporating knowledge graphs and LLMs in RAG systems.

3. **Identify Applications and Industry Use Cases:** Discover current real-world applications, case studies, and emerging trends where RAG, knowledge graphs, and LLMs are being used today.

4. **Analyze the Competitive Landscape:** Learn about the key players and technological innovations in this space and understand how companies differentiate themselves.

5. **Investigate Market Opportunities and Trajectory:** Examine potential growth areas

In [5]:
conversation.add_message("assistant", response)

message = """Thank you for your detailed breakdown! You’ve captured the core aspects of my interests well.
To clarify:
I’m particularly interested in the technological innovations and practical implementations of RAG with knowledge graphs and LLMs.
My focus is primarily on technological insights and competitive landscape, though market opportunities are also relevant.
The scope is global, but insights on adoption trends in specific regions (e.g., US, EU, APAC) could be useful.
Regarding industries, I'm especially interested in legal tech, finance, and climate risk modeling, but I’m open to other high-impact use cases as well.
Looking forward to your insights!"""

conversation.add_message("user", message)


response = agent.run(conversation)

print(response)

Using the research_agent tool with arguments {}
Using the reasoning tool with arguments {}
Using the search tool with arguments {'query': 'Retrieval Augmented Generation (RAG) with knowledge graphs and LLMs technology insights'}
Using the extract tool with arguments {'urls': ['https://medium.com/@kudoysl/enhancing-retrieval-augmented-generation-with-knowledge-graphs-0350c823369d', 'https://www.mckinsey.com/featured-insights/mckinsey-explainers/what-is-retrieval-augmented-generation-rag', 'https://blog.adyog.com/2024/12/25/supercharging-retrieval-augmented-generation-rag-with-knowledge-graphs-a-deep-dive-into-graphrag/']}
Using the extract tool with arguments {'urls': 'https://dl.acm.org/doi/10.1007/978-981-97-5678-0_34'}
Using the extract tool with arguments {'urls': 'https://outshift.cisco.com/blog/combining-retrieval-augmented-generation-knowledge-graphs'}
Using the report tool with arguments {}
Writing section: Introduction
Writing section: Understanding Retrieval Augmented Generati

In [31]:
conversation.add_message("assistant", response)

conversation.add_message("user", "Global. No particular parties. No claims of interest, except ability to remove carbon from the atmosphere. Give me the recent developments. I want a high level overview and in-depth analysis. Please, now research.")


response = agent.run(conversation)

print(response)

Using the research_agent tool with arguments {}
Using the search tool with arguments {'query': 'global landscape of carbon capture technologies 2023'}
Using the reasoning tool with arguments {}
Using the search tool with arguments {'query': '5 key carbon capture technology trends for 2023 site:elsevier.com'}
Using the search tool with arguments {'query': '5 key carbon capture technology trends for 2023'}
Using the reasoning tool with arguments {}
Using the respond tool with arguments {'response_type': 'write_report'}
## Comprehensive Report on Carbon Capture Technologies (2023 Overview)

### Introduction

Carbon capture technologies have increasingly become a focal point in global efforts to mitigate climate change by reducing atmospheric CO2 levels. This report provides a comprehensive overview of the current landscape of carbon capture technologies, highlighting recent developments, emerging trends, technological innovations, and economic implications.

### Current Landscape

#### Ke

In [32]:
Search().run(None, "carbon capture technologies")

{'query': 'carbon capture technologies',
 'follow_up_questions': None,
 'answer': None,
 'images': [],
 'results': [{'title': 'Carbon capture, utilization, and storage (CCUS) technologies ...',
   'url': 'https://www.sciencedirect.com/science/article/pii/S2666845924002010',
   'content': 'Carbon Capture, Utilization, and Storage (CCUS) Technologies: Evaluating the Effectiveness of Advanced CCUS Solutions for Reducing CO2 Emissions - ScienceDirect Carbon Capture, Utilization, and Storage (CCUS) Technologies: Evaluating the Effectiveness of Advanced CCUS Solutions for Reducing CO2 Emissions This review provides a comprehensive examination of Carbon Capture, Utilization, and Storage (CCUS) technologies, focusing on their advancements, challenges, and future prospects. It begins with an overview of carbon capture methods, including pre-combustion, post-combustion, and oxy-fuel combustion techniques, highlighting recent technological improvements and associated challenges. The review then e