In [2]:
from langchain.schema import SystemMessage
import os
import requests
import textwrap

from typing import Type
from langchain.chat_models import ChatOpenAI
from langchain.tools import BaseTool
from pydantic import BaseModel, Field
from langchain.agents import initialize_agent, AgentType
from langchain.document_loaders import WebBaseLoader
from langchain.tools import DuckDuckGoSearchResults
from langchain.tools import WikipediaQueryRun
from langchain.utilities import WikipediaAPIWrapper
from bs4 import BeautifulSoup

# Chat model handed to the agent below (low sampling temperature).
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.1)

# Read from the environment; None when the variable is unset.
alpha_vantage_api_key = os.getenv("ALPHA_VANTAGE_API_KEY")


class CustomWebBaseLoader(WebBaseLoader):
    """Fetch web pages and return their text without header/footer markup.

    ``load`` returns plain strings, one per URL, in the order the URLs were
    given.
    """

    def __init__(self, urls: list, timeout: float = 10):
        """Remember which pages to fetch.

        Args:
            urls: Page URLs to download when ``load`` is called.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        # The parent initializer is not invoked; only these attributes are set.
        self.urls = urls
        self.request_timeout = timeout

    def scrape(self, url: str):
        """Download ``url`` and return the page text.

        The first ``<header>`` and ``<footer>`` elements, when present, are
        removed before the text is extracted.

        Raises:
            requests.exceptions.RequestException: If the request fails or
                does not complete within the configured timeout.
        """
        # Without an explicit timeout, requests waits indefinitely on a
        # server that accepts the connection but never answers.
        response = requests.get(url, timeout=self.request_timeout)
        soup = BeautifulSoup(response.content, "html.parser")

        for boilerplate in (soup.header, soup.footer):
            if boilerplate:
                boilerplate.decompose()

        return soup.get_text()

    def load(self):
        """Scrape every stored URL and return the list of page texts."""
        return [self.scrape(url) for url in self.urls]

class SearchToolArgsSchema(BaseModel):
    # Argument schema shared by the search tools: a single free-text query.
    query: str = Field(
        description=(
            "The query you will search for."
            "Example query: Research about the XZ backdoor"
        )
    )

class DuckDuckGoResultsArgsSchema(BaseModel):
    # Argument schema for the URL-extraction tool: the raw search result text.
    data: str = Field(description="The result of DuckDuckGoSearch.")

class DuckDuckGoUrlArgsSchema(BaseModel):
    # Argument schema for the scrape tool: the list of pages to download.
    urls: list = Field(
        description=(
            "List of urls you will scrape. "
            "Example urls: ['first url', 'second url', 'third url']"
        )
    )



class DuckDuckGoSearchTool(BaseTool):
    """Agent tool that runs a DuckDuckGo search for a query string."""

    name = "DuckDuckGoSearch"
    description = """
    Use this tool to search by using DuckDuckGo.
    It takes a query as an argument.
    """
    args_schema: Type[SearchToolArgsSchema] = SearchToolArgsSchema

    def _run(self, query):
        """Search DuckDuckGo for ``query`` and return what the wrapper yields."""
        search = DuckDuckGoSearchResults()
        results = search.run(query)
        return results
        
class DuckDuckGoSearchGetUrlTool(BaseTool):
    """Agent tool that pulls the result links out of DuckDuckGo search text."""

    name = "DuckDuckGoSearchGetUrl"
    description = """
    Use this tool to get the url used when searching by DuckDuckGo.
    It takes a string as an argument.
    """
    args_schema: Type[DuckDuckGoResultsArgsSchema] = DuckDuckGoResultsArgsSchema

    def _run(self, data):
        """Return the http(s) links found in ``data``.

        ``data`` is expected to look like
        ``[snippet: ..., title: ..., link: URL], [snippet: ...]``; the link is
        taken as the text after the last ``": "`` of each bracketed result.

        Args:
            data: Search result text to parse.

        Returns:
            A list of URLs in the order they appear; empty when ``data`` is
            empty or contains no link.
        """
        if not data:
            return []

        urls = []
        for result in data.split("], ["):
            candidate = result.replace("[", "").replace("]", "").split(": ")[-1].strip()
            # Drop fragments that are not links (e.g. truncated or malformed
            # results) so the scraper is never handed a non-URL string.
            if candidate.startswith(("http://", "https://")):
                urls.append(candidate)

        return urls

class DuckDuckGoSearchScrapeTool(BaseTool):
    """Agent tool that downloads pages and stores their text in a file."""

    name = "DuckDuckGoSearchScrape"
    description = """
    Use this tool to scrape the given websites used when searching by DuckDuckGo.
    It takes a list of urls as an argument.
    """
    args_schema: Type[DuckDuckGoUrlArgsSchema] = DuckDuckGoUrlArgsSchema

    def _run(self, urls):
        """Scrape each URL and save its wrapped text under a source header.

        Every page is re-wrapped to 70 columns (long words are kept intact)
        and passed to ``save_file`` with the file name ``"source"``.
        """
        pages = CustomWebBaseLoader(urls).load()
        for source_url, page_text in zip(urls, pages):
            header = f"\t----- Source: {source_url} -----\n\n\n"
            wrapped_lines = textwrap.wrap(page_text, 70, break_long_words=False)
            save_file('\n'.join(wrapped_lines), header, "source")
        return "Finished Scraping and Saved file\n\n"

class WikipediaSearchTool(BaseTool):
    """Agent tool that looks a query up on Wikipedia."""

    name = "WikipediaSearch"
    description = """
    Use this tool to search by using Wikipedia.
    It takes a query as an argument.
    """
    args_schema: Type[SearchToolArgsSchema] = SearchToolArgsSchema

    def _run(self, query):
        """Query Wikipedia for ``query`` and return what the wrapper yields."""
        api = WikipediaAPIWrapper()
        lookup = WikipediaQueryRun(api_wrapper=api)
        return lookup.run(query)
        


def save_file(doc, comment, file_name):
    """Append ``comment`` followed by ``doc`` to ``./<file_name>.txt``.

    The file is created on first use. Every entry, including the first one,
    is terminated by a blank line so consecutive entries stay separated.

    Args:
        doc: Text body to store.
        comment: Header text written immediately before ``doc``.
        file_name: Base name of the target file, without the ``.txt`` suffix.
    """
    file_path = f"./{file_name}.txt"
    # Mode "a" creates a missing file, so no existence check is needed and
    # the first entry gets the same trailing separator as later ones.
    with open(file_path, "a", encoding="utf-8") as f:
        f.writelines((comment, doc, "\n\n"))

# Tools the agent may call, in the order they are registered.
research_tools = [
    WikipediaSearchTool(),
    DuckDuckGoSearchTool(),
    DuckDuckGoSearchScrapeTool(),
    DuckDuckGoSearchGetUrlTool(),
]

# System prompt describing the research workflow the agent should follow.
researcher_instructions = SystemMessage(
    content="""
            You are a professional researcher.
            
            You search about the keyword given by using DuckDuckGo or Wikipedia.

            If DuckDuckGo is used to search about the keyword, scrape the url used and save in a file.

            If DuckDuckGoSearchTool is used, use DuckDuckGoSearchGetUrlTool to get the urls and then use DuckDuckGoSearchScrapeTool to scrape.

            After searching, explain about the keyword using the results of research.
        """
)

# OpenAI-functions agent wired to the chat model and the tools above.
agent = initialize_agent(
    tools=research_tools,
    llm=llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    agent_kwargs={"system_message": researcher_instructions},
    handle_parsing_errors=True,
    verbose=True,
)

prompt = "Research about the XZ backdoor"

# Run the agent once; later cells read result['output'].
result = agent.invoke(prompt)




[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `DuckDuckGoSearch` with `{'query': 'XZ backdoor'}`


[0m[33;1m[1;3m[snippet: What Does the Backdoor Do? Malicious code added to xz Utils versions 5.6.0 and 5.6.1 modified the way the software functions. The backdoor manipulated sshd, the executable file used to make remote ..., title: The XZ Backdoor: Everything You Need to Know - WIRED, link: https://www.wired.com/story/xz-backdoor-everything-you-need-to-know/], [snippet: That operation matches the style of the XZ Utils backdoor far more than the cruder supply chain attacks of APT41 or Lazarus, by comparison. "It could very well be someone else," says Aitel., title: The Mystery of 'Jia Tan,' the XZ Backdoor Mastermind - WIRED, link: https://www.wired.com/story/jia-tan-xz-backdoor/], [snippet: A backdoor was planted in the XZ utils package, a popular open-source compression utility for Linux/UNIX systems, by a threat actor named JiaT75. The backdoor exploited th

In [3]:
# Persist the agent's final answer, labelled with the prompt that produced it.
comment = f"Result of: '{prompt}' \n\n"
save_file(doc=result["output"], comment=comment, file_name="result")
