## Get envirionment variables

In [1358]:
import os
from dotenv import find_dotenv, dotenv_values

keys = list(dotenv_values(find_dotenv('.env')).items())
OPENAI_API_KEY = os.environ['OPENAI_API_KEY'] = keys[0][1]
LANGCHAIN_API_KEY = os.environ['LANGCHAIN_API_KEY'] = keys[1][1]
POLYGON_API_KEY = os.environ['POLYGON_API_KEY'] = keys[2][1]
EMAIL = os.environ['EMAIL'] = keys[3][1] #make this a user entry

## Install Required Libraries

In [1359]:
!pip install langchain_core langchain_openai langchain_community langsmith openai tiktoken cohere lxml polygon-api-client weasyprint html5lib pydyf CFFI tinycss2 cssselect2 Pyphen Pillow fontTools -qU


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Create Tools

The following tools were ripped directly from the langchain source code to remove the requirement for current data from the API.

In [1360]:
"""
Util that calls several of Polygon's stock market REST APIs.
Docs: https://polygon.io/docs/stocks/getting-started
"""

import json
from typing import Any, Dict, Optional

import requests
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.utils import get_from_dict_or_env

POLYGON_BASE_URL = "https://api.polygon.io/"

class PolygonAPIWrapper(BaseModel):
    """Wrapper for Polygon API."""

    polygon_api_key: Optional[str] = None

    @root_validator(pre=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key in environment."""
        polygon_api_key = get_from_dict_or_env(
            values, "polygon_api_key", "POLYGON_API_KEY"
        )
        values["polygon_api_key"] = polygon_api_key

        return values

    def get_financials(self, ticker: str) -> Optional[dict]:
        """
        Get fundamental financial data, which is found in balance sheets,
        income statements, and cash flow statements for a given ticker.

        /vX/reference/financials
        """
        url = (
            f"{POLYGON_BASE_URL}vX/reference/financials?"
            f"ticker={ticker}&"
            f"apiKey={self.polygon_api_key}"
        )
        response = requests.get(url)
        data = response.json()

        status = data.get("status", None)
        if status != "OK":
            raise ValueError(f"API Error: {data}")

        return data.get("results", None)

    def get_last_quote(self, ticker: str) -> Optional[dict]:
        """
        Get the most recent National Best Bid and Offer (Quote) for a ticker.

        /v2/last/nbbo/{ticker}
        """
        url = f"{POLYGON_BASE_URL}v2/last/nbbo/{ticker}?apiKey={self.polygon_api_key}"
        response = requests.get(url)
        data = response.json()

        status = data.get("status", None)
        if status != "OK":
            raise ValueError(f"API Error: {data}")

        return data.get("results", None)

    def get_ticker_news(self, ticker: str) -> Optional[dict]:
        """
        Get the most recent news articles relating to a stock ticker symbol,
        including a summary of the article and a link to the original source.

        /v2/reference/news
        """
        url = (
            f"{POLYGON_BASE_URL}v2/reference/news?"
            f"ticker={ticker}&"
            f"apiKey={self.polygon_api_key}"
        )
        response = requests.get(url)
        data = response.json()

        status = data.get("status", None)
        if status != "OK":
            raise ValueError(f"API Error: {data}")

        return data.get("results", None)

    def get_aggregates(self, ticker: str, **kwargs: Any) -> Optional[dict]:
        """
        Get aggregate bars for a stock over a given date range
        in custom time window sizes.

        /v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}
        """
        timespan = kwargs.get("timespan", "day")
        multiplier = kwargs.get("timespan_multiplier", 1)
        from_date = kwargs.get("from_date", None)
        to_date = kwargs.get("to_date", None)
        adjusted = kwargs.get("adjusted", True)
        sort = kwargs.get("sort", "asc")

        url = (
            f"{POLYGON_BASE_URL}v2/aggs"
            f"/ticker/{ticker}"
            f"/range/{multiplier}"
            f"/{timespan}"
            f"/{from_date}"
            f"/{to_date}"
            f"?apiKey={self.polygon_api_key}"
            f"&adjusted={adjusted}"
            f"&sort={sort}"
        )
        response = requests.get(url)
        data = response.json()

        status = data.get("status", None)
        if status != "DELAYED":
            raise ValueError(f"API Error: {data}")

        return data.get("results", None)

    def run(self, mode: str, ticker: str, **kwargs: Any) -> str:
        if mode == "get_financials":
            return json.dumps(self.get_financials(ticker))
        elif mode == "get_last_quote":
            return json.dumps(self.get_last_quote(ticker))
        elif mode == "get_ticker_news":
            return json.dumps(self.get_ticker_news(ticker))
        elif mode == "get_aggregates":
            return json.dumps(self.get_aggregates(ticker, **kwargs))
        else:
            raise ValueError(f"Invalid mode {mode} for Polygon API.")


In [1361]:
from typing import Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import BaseTool

class PolygonAggregatesSchema(BaseModel):
    """Input for PolygonAggregates."""

    ticker: str = Field(
        description="The ticker symbol to fetch aggregates for.",
    )
    timespan: str = Field(
        description="The size of the time window. "
        "Possible values are: "
        "second, minute, hour, day, week, month, quarter, year. "
        "Default is 'day'",
    )
    timespan_multiplier: int = Field(
        description="The number of timespans to aggregate. "
        "For example, if timespan is 'day' and "
        "timespan_multiplier is 1, the result will be daily bars. "
        "If timespan is 'day' and timespan_multiplier is 5, "
        "the result will be weekly bars.  "
        "Default is 1.",
    )
    from_date: str = Field(
        description="The start of the aggregate time window. "
        "Either a date with the format YYYY-MM-DD or "
        "a millisecond timestamp.",
    )
    to_date: str = Field(
        description="The end of the aggregate time window. "
        "Either a date with the format YYYY-MM-DD or "
        "a millisecond timestamp.",
    )


class PolygonAggregates(BaseTool):
    """
    Tool that gets aggregate bars (stock prices) over a
    given date range for a given ticker from Polygon.
    """

    mode: str = "get_aggregates"
    name: str = "polygon_aggregates"
    description: str = (
        "A wrapper around Polygon's Aggregates API. "
        "This tool is useful for fetching aggregate bars (stock prices) for a ticker. "
        "Input should be the ticker, date range, timespan, and timespan multiplier"
        " that you want to get the aggregate bars for."
    )
    args_schema: Type[PolygonAggregatesSchema] = PolygonAggregatesSchema

    api_wrapper: PolygonAPIWrapper

    def _run(
        self,
        ticker: str,
        timespan: str,
        timespan_multiplier: int,
        from_date: str,
        to_date: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the Polygon API tool."""
        return self.api_wrapper.run(
            mode=self.mode,
            ticker=ticker,
            timespan=timespan,
            timespan_multiplier=timespan_multiplier,
            from_date=from_date,
            to_date=to_date,
        )


In [1362]:
from typing import Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.tools import BaseTool

class Inputs(BaseModel):
    """Inputs for Polygon's Financials API"""

    query: str


class PolygonFinancials(BaseTool):
    """Tool that gets the financials of a ticker from Polygon"""

    mode: str = "get_financials"
    name: str = "polygon_financials"
    description: str = (
        "A wrapper around Polygon's Stock Financials API. "
        "This tool is useful for fetching fundamental financials from "
        "balance sheets, income statements, and cash flow statements "
        "for a stock ticker. The input should be the ticker that you want "
        "to get the latest fundamental financial data for."
    )
    args_schema: Type[BaseModel] = Inputs

    api_wrapper: PolygonAPIWrapper

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the Polygon API tool."""
        return self.api_wrapper.run(self.mode, ticker=query)


In [1363]:
from typing import Optional, Type

from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.tools import BaseTool

class Inputs(BaseModel):
    """Inputs for Polygon's Ticker News API"""

    query: str


class PolygonTickerNews(BaseTool):
    """Tool that gets the latest news for a given ticker from Polygon"""

    mode: str = "get_ticker_news"
    name: str = "polygon_ticker_news"
    description: str = (
        "A wrapper around Polygon's Ticker News API. "
        "This tool is useful for fetching the latest news for a stock. "
        "Input should be the ticker that you want to get the latest news for."
    )
    args_schema: Type[BaseModel] = Inputs

    api_wrapper: PolygonAPIWrapper

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the Polygon API tool."""
        return self.api_wrapper.run(self.mode, ticker=query)


## Set up vectorstore

In [1364]:
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader

"""

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

if os.path.exists("./data/vectorstore"):
    vectorstore = FAISS.load_local(
        "./data/vectorstore", 
        embeddings, 
        allow_dangerous_deserialization=True # this is necessary to load the vectorstore from disk as it's stored as a `.pkl` file.
    )
    retriever = vectorstore.as_retriever()
    print("Loaded Vectorstore")
else:
    print("Indexing Files")
    os.makedirs("./data/vectorstore", exist_ok=True)
    for i in range(0, len(split_documents), 32):
        if i == 0:
            vectorstore = FAISS.from_documents(split_documents[i:i+32], embeddings)
            continue
        vectorstore.add_documents(split_documents[i:i+32])
    vectorstore.save_local("./data/vectorstore")

"""

'\n\nembeddings = OpenAIEmbeddings(model="text-embedding-3-small")\n\nif os.path.exists("./data/vectorstore"):\n    vectorstore = FAISS.load_local(\n        "./data/vectorstore", \n        embeddings, \n        allow_dangerous_deserialization=True # this is necessary to load the vectorstore from disk as it\'s stored as a `.pkl` file.\n    )\n    retriever = vectorstore.as_retriever()\n    print("Loaded Vectorstore")\nelse:\n    print("Indexing Files")\n    os.makedirs("./data/vectorstore", exist_ok=True)\n    for i in range(0, len(split_documents), 32):\n        if i == 0:\n            vectorstore = FAISS.from_documents(split_documents[i:i+32], embeddings)\n            continue\n        vectorstore.add_documents(split_documents[i:i+32])\n    vectorstore.save_local("./data/vectorstore")\n\n'

In [1365]:
from langchain.tools import tool
import datetime
import requests
from weasyprint import HTML

@tool
def get_datetime() -> str:
    """Get the current date and time in YYYY-MM-DD HH:MM:SS format."""
    return str(datetime.datetime.now())

@tool
def get_date() -> str:
    """Get the current date in YYYY-MM-DD format. Also useful when determining the current quarter."""
    return str(datetime.datetime.now()).split(" ")[0]

@tool
def get_time() -> str:
    """Get the current time in HH:MM:SS format."""
    return str(datetime.datetime.now()).split(" ")[1]

@tool
def get_quarter(date:str) -> str:
    """This tool takes a date in YYYY-MM-DD format as an argument and returns the quarter and year in the format 'QQ YYYY'."""
    quarters = {
        "01" : "Q1",
        "02" : "Q1",
        "03" : "Q1",
        "04" : "Q2",
        "05" : "Q2",
        "06" : "Q2",
        "07" : "Q3",
        "08" : "Q3",
        "09" : "Q3",
        "10" : "Q4",
        "11" : "Q4",
        "12" : "Q4",
    }
    return quarters[date.split("-")[1]] + f" {date.split('-')[0]}"

@tool
def get_CIK(ticker) -> str:
    """This tool takes a company stock ticker as an argument and returns the CIK number. This is used when trying to query the EDGAR database for financial statements."""
    if ticker is not None:
        result = CIK_df[CIK_df["ticker"] == ticker]
        cik = result["cik"]
        return str(cik.item()).zfill(10)

@tool
def get_financial_report(cik, report_type, date=None):
    """This tool takes a company CIK number from the get_CIK tool, financial report type, and an optional date. 
    This information is used to retrieve requested document from the EDGAR database and save it to a path that can be returned to the user. 
    If a date is provided, it must be in YYYY-MM-DD format."""

    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    header = {
        "User-Agent" : EMAIL
    }
    company_filings = requests.get(url, headers=header).json()
    company_filings_df = pd.DataFrame(company_filings["filings"]["recent"])
    company_filings_df = company_filings_df[company_filings_df.form == report_type]
    
    if date:
        filing_dates = company_filings_df["filingDate"].values
        nearest_filing_date = get_nearest_filing_dates(date, filing_dates)
        company_filings_df = company_filings_df[company_filings_df.filingDate == nearest_filing_date]
        access_number = company_filings_df.accessionNumber.values[0].replace("-", "")
        file_name = company_filings_df.primaryDocument.values[0]
        url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{access_number}/{file_name}"
        # dowloading and saving requested document to working directory
        req_content = requests.get(url, headers=header).content.decode("utf-8")
        pdf_path = f'./data/{file_name}'+".pdf"
        HTML(string=req_content, base_url="").write_pdf(pdf_path)
        return pdf_path

    else:
        access_number = company_filings_df.accessionNumber.values[0].replace("-", "")
        file_name = company_filings_df.primaryDocument.values[0]
        url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{access_number}/{file_name}"
        # dowloading and saving requested document to working directory
        req_content = requests.get(url, headers=header).content.decode("utf-8")
        pdf_path = f'./data/{file_name}'+".pdf"
        HTML(string=req_content, base_url="").write_pdf(pdf_path)
        return pdf_path

def get_nearest_filing_dates(date, filing_dates):
    """This tool takes a date in YYYY-MM-DD format as arguments. It should be called 
    The tool is used to find the filing date closest to the user requested filing date. The user should be told what filing 
    data is the closest match to their query."""

    sums = []
    for filing_date in filing_dates:
        filing_date_year, filing_date_month, filing_date_day = filing_date.split("-")[0], filing_date.split("-")[1], filing_date.split("-")[2]
        date_year, date_month, date_day = date.split("-")[0], date.split("-")[1], date.split("-")[2]
        #Weighted sum of the difference of each component part. The year should take priority over the month and the month should take priority over the day.
        sum = abs(int(date_year)-int(filing_date_year))*0.5 + abs(int(date_month)-int(filing_date_month))*0.3 + abs(int(date_day)-int(filing_date_day))*0.2
        sums.append(sum)
    m = min(sums)
    idx = sums.index(m)
    return filing_dates[idx]

"""
@tool
def vectorize_data(url):
    This tool is used to embed html data into the locally cached vector database for Retrieval Augmented Generation. Any time a financial report is retrieved from the EDGAR database, the url should be passed to this function to ensure that the data is embedded and cached. """

'\n@tool\ndef vectorize_data(url):\n    This tool is used to embed html data into the locally cached vector database for Retrieval Augmented Generation. Any time a financial report is retrieved from the EDGAR database, the url should be passed to this function to ensure that the data is embedded and cached. '

## Get JSON CIK data

In [1366]:
import json
import pandas as pd

#Reference found here: https://www.kaggle.com/code/svendaj/extracting-data-from-sec-edgar-restful-apis

with open("./data/company_tickers_exchange.json", "r") as f:
    CIK_dict = json.load(f)

CIK_df = pd.DataFrame(CIK_dict["data"], columns=CIK_dict["fields"])

## Set up tool belt

In [1367]:
from langchain_community.tools.ddg_search import DuckDuckGoSearchRun

api_wrapper = PolygonAPIWrapper(polygon_api_key=POLYGON_API_KEY)

tool_belt = [
    get_datetime,
    get_date,
    get_time,
    get_quarter,
    get_CIK,
    get_financial_report,
    DuckDuckGoSearchRun(),
    PolygonAggregates(api_wrapper=api_wrapper),
    PolygonFinancials(api_wrapper=api_wrapper),
    PolygonTickerNews(api_wrapper=api_wrapper),
]

## Set up tool executor

In [1368]:
from langgraph.prebuilt import ToolExecutor

tool_executor = ToolExecutor(tool_belt)

## Set up model

In [1369]:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o", temperature=0)

## Set up function calling

In [1370]:
from langchain_core.utils.function_calling import convert_to_openai_function

functions = [convert_to_openai_function(t) for t in tool_belt]
model = model.bind_functions(functions)

## Set up agent state

In [1371]:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
import operator
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
  messages: Annotated[list, add_messages]

## Create nodes

In [1372]:
from langgraph.prebuilt import ToolInvocation
import json
from langchain_core.messages import FunctionMessage

def call_model(state):
  messages = state["messages"]
  response = model.invoke(messages)
  return {"messages" : [response]}

def call_tool(state):
  last_message = state["messages"][-1]

  action = ToolInvocation(
      tool=last_message.additional_kwargs["function_call"]["name"],
      tool_input=json.loads(
          last_message.additional_kwargs["function_call"]["arguments"]
      )
  )

  response = tool_executor.invoke(action)

  function_message = FunctionMessage(content=str(response), name=action.tool)

  return {"messages" : [function_message]}

In [1373]:
from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)

In [1374]:
workflow.set_entry_point("agent")

In [1375]:
def should_continue(state):
  last_message = state["messages"][-1]

  if "function_call" not in last_message.additional_kwargs:
    return "end"

  return "continue"

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue" : "action",
        "end" : END
    }
)

In [1376]:
workflow.add_edge("action", "agent")

In [1377]:
app = workflow.compile()

In [1378]:
def print_messages(messages):
  next_is_tool = False
  initial_query = True
  for message in messages["messages"]:
    if "function_call" in message.additional_kwargs:
      print()
      print(f'Tool Call - Name: {message.additional_kwargs["function_call"]["name"]} + Query: {message.additional_kwargs["function_call"]["arguments"]}')
      next_is_tool = True
      continue
    if next_is_tool:
      print(f"Tool Response: {message.content}")
      next_is_tool = False
      continue
    if initial_query:
      print(f"Initial Query: {message.content}")
      print()
      initial_query = False
      continue
    print()
    print(f"Agent Response: {message.content}")

In [1379]:
from langchain_core.messages import HumanMessage

inputs = {"messages" : [HumanMessage(content="Can you retreive Nvidia's 10-Q filings for April 1st, 2021?")]}

messages = app.invoke(inputs)

print_messages(messages)

Initial Query: Can you retreive Nvidia's 10-Q filings for April 1st, 2021?


Tool Call - Name: get_CIK + Query: {"ticker":"NVDA"}
Tool Response: 0001045810

Tool Call - Name: get_financial_report + Query: {"cik":"0001045810","report_type":"10-Q","date":"2021-04-01"}
Tool Response: ./data/nvda2020q110q.htm.pdf

Agent Response: I have retrieved Nvidia's 10-Q filing for April 1st, 2021. You can download it using the link below:

[Nvidia 10-Q Filing - April 1st, 2021](sandbox:/data/nvda2020q110q.htm.pdf)
