In [1]:
%load_ext autoreload
%autoreload 2

In [120]:
import os
from dotenv import load_dotenv, find_dotenv
import nest_asyncio
import warnings

_ = load_dotenv(find_dotenv())
nest_asyncio.apply()
warnings.filterwarnings('ignore')

from llama_index.core import Settings
from llama_index.core.tools.tool_spec.base import BaseToolSpec
from llama_index.llms.bedrock_converse import BedrockConverse
from llama_index.embeddings.bedrock import BedrockEmbedding

import yfinance as yf
from datetime import datetime

import pandas as pd

from typing import Optional, Literal, List, Union, Tuple

In [129]:
## Utility functions ##
def rename_columns(ticker: str, df: pd.DataFrame) -> pd.DataFrame:
    """Helper function to post-process dataframe column names"""
    column_dict = {old_name: process_string(ticker = ticker,
                                            string_ = old_name) \
        for old_name in df.columns.values.tolist()}
    df.rename(columns=column_dict, inplace=True)
    return df

def process_string(ticker: str, 
                    string_: str) -> str:
    return f"{ticker}_{'_'.join(string_.lower().split(" "))}"

class FinanceDataTools(BaseToolSpec):
    spec_functions = [
        "get_stock_data",
        "get_min_or_max",
        "get_correlation_between_tickers",
        "get_rolling_average",
        "get_rolling_average_correl",
        "get_longest_uptrend",
        "get_longest_downtrend",
        "cagr",
        "get_statistics"
    ]
    
    def __init__(self) -> None:
        """Initializes the Yahoo finance tool spec"""
        
    def get_stock_data(self, 
                       ticker: str,
                       period: Optional[
                           Literal["1d",
                                   "5d",
                                   "1mo",
                                   "3mo",
                                   "6mo",
                                   "1y",
                                   "2y",
                                   "5y",
                                   "10y",
                                   "ytd",
                                   "max"]
                           ] = "10y") -> pd.DataFrame:
        """Gets the daily historical prices and volume for a ticker across a specified period"""
        return rename_columns(ticker = ticker, df = yf.Ticker(ticker).history(period=period))
    
    def get_min_or_max(self, 
                       ticker: str,
                       field: Literal["Open", 
                                      "High", 
                                      "Low",
                                      "Close",
                                      "Volume",
                                      "Dividends",
                                      "Stock Splits"],
                       period: Optional[
                           Literal["1d",
                                   "5d",
                                   "1mo",
                                   "3mo",
                                   "6mo",
                                   "1y",
                                   "2y",
                                   "5y",
                                   "10y",
                                   "ytd",
                                   "max"]
                           ] = "10y",
                       get_max: Optional[bool] = True):
        """Gets min or max value of the field of interest"""
        df = self.get_stock_data(ticker=ticker, period=period)
        field = process_string(ticker=ticker, string_=field)
        if get_max is True:
            value = df[field].max()
        else:
            value = df[field].min()
        return df[df[field] == value]
    
    def get_correlation(self, 
                        ticker: str,
                        period: Optional[
                           Literal["1d",
                                   "5d",
                                   "1mo",
                                   "3mo",
                                   "6mo",
                                   "1y",
                                   "2y",
                                   "5y",
                                   "10y",
                                   "ytd",
                                   "max"]
                           ] = "10y"):
        """Computes correlation between all metrics for a specific ticker"""
        return self.get_stock_data(ticker=ticker, period=period).corr()
    
    def get_correlation_between_tickers(
        self,
        tickers: List[str],
        period: Optional[
                        Literal["1d",
                                "5d",
                                "1mo",
                                "3mo",
                                "6mo",
                                "1y",
                                "2y",
                                "5y",
                                "10y",
                                "ytd",
                                "max"]
                        ] = "10y"
    ):
        """Computes correlation of all metrics for all tickers"""
        df_fin = None
        for ticker in tickers:
            df = self.get_stock_data(ticker=ticker, period=period)
            if df_fin is None:
                df_fin = df
            else:
                df_fin = pd.merge(df_fin, 
                                  df, 
                                  left_index = True,
                                  right_index = True)
        return df_fin.corr()
    
    def get_rolling_average(self, 
                            ticker: str,
                            field: Literal["Open", 
                                      "High", 
                                      "Low",
                                      "Close",
                                      "Volume",
                                      "Dividends",
                                      "Stock Splits"], 
                            n: Optional[int] = 30,
                            period: Optional[
                                        Literal["1d",
                                                "5d",
                                                "1mo",
                                                "3mo",
                                                "6mo",
                                                "1y",
                                                "2y",
                                                "5y",
                                                "10y",
                                                "ytd",
                                                "max"]
                                        ] = "10y"):
        """Computes moving average for field of interest across a specified period"""
        df = self.get_stock_data(ticker=ticker, period=period)
        field = process_string(ticker=ticker, string_=field)
        return pd.DataFrame(df[field].rolling(n).mean())
    
    def get_rolling_average_correl(
        self, 
        tickers: List[str], 
        field: Literal["Open", 
                        "High", 
                        "Low",
                        "Close",
                        "Volume",
                        "Dividends",
                        "Stock Splits"], 
        n: Optional[int] = 30,
        period: Optional[
                    Literal["1d",
                            "5d",
                            "1mo",
                            "3mo",
                            "6mo",
                            "1y",
                            "2y",
                            "5y",
                            "10y",
                            "ytd",
                            "max"]
                    ] = "10y"
    ):
        """Gets correlation of rolling average for a specified field across a list of tickers"""
        df_fin = None
        for ticker in tickers:
            df = self.get_rolling_average(
                ticker = ticker,
                field = field,
                n = n,
                period = period
            )
            if df_fin is None:
                df_fin = df
            else:
                df_fin = pd.merge(df_fin, 
                                  df,
                                  left_index = True,
                                  right_index = True)
        return df_fin.corr()
    
    def get_longest_uptrend(
        self,
        ticker: str
    ):
        """Computes longest stock price uptrend duration for ticker"""
        df = self.get_stock_data(ticker=ticker)
        df['uptrend_days'] = df[f'{ticker}_close'].diff().lt(0).cumsum()
        sizes = df.groupby('uptrend_days')[f'{ticker}_close'].transform('size')
        dates = df[sizes == sizes.max()].index 
        return dates, f"{(dates[-1] - dates[0]).days} days"
    
    def get_longest_downtrend(
        self,
        ticker: str
    ):
        """Computes longest stock price downtrend duration for ticker"""
        df = self.get_stock_data(ticker=ticker)
        df['downtrend_days'] = df[f'{ticker}_close'].diff().gt(0).cumsum()
        sizes=df.groupby('downtrend_days')[f'{ticker}_close'].transform('size')
        dates = df[sizes == sizes.max()].index 
        return dates, f"{(dates[-1] - dates[0]).days} days"
    
    def cagr(self,
             ticker: str,
             period: Optional[
                    Literal["1d",
                            "5d",
                            "1mo",
                            "3mo",
                            "6mo",
                            "1y",
                            "2y",
                            "5y",
                            "10y",
                            "ytd",
                            "max"]
                    ] = "10y"): 
        """Computes compounded annual growth rate of closing prices for specified ticker"""
        df = self.get_stock_data(ticker=ticker, period=period)
        df['year'] = df.index.year
        trimmed = df.iloc[[0, -1]][['year',f'{ticker}_close']]
        n = trimmed['year'].iloc[-1] - trimmed['year'].iloc[0]
        start, end = trimmed[f'{ticker}_close'].iloc[-1], trimmed[f'{ticker}_close'].iloc[0]
        return f"{round(100*((end/start)**(1/n)-1), 1)}%"
    
    def get_statistics(
        self,
        ticker: str,
        period: Optional[
                    Literal["1d",
                            "5d",
                            "1mo",
                            "3mo",
                            "6mo",
                            "1y",
                            "2y",
                            "5y",
                            "10y",
                            "ytd",
                            "max"]
                    ] = "10y",
        grouping: Optional[Literal["year",
                                   "quarter",
                                   "month",
                                   "week",
                                   None]] = None
    ) -> Union[Tuple[float, float], pd.DataFrame]:
        """Gets descriptive statistics of data for grouping of interest"""
        df = self.get_stock_data(ticker=ticker, period=period)
        if grouping is None:
            return df.describe()
        df['year'] = df.index.year
        if grouping == 'year':
            return df.groupby(grouping).mean(), df.groupby(grouping).std()
        if grouping == 'quarter':
            df['quarter'] = df.index.quarter
            return df.groupby(["year", "quarter"]).mean(), df.groupby(["year", "quarter"]).std() 
        if grouping == 'month':
            df['month'] = df.index.month
            return df.groupby(["year", "month"]).mean(), df.groupby(["year", "month"]).std() 

In [132]:
ft = FinanceDataTools()
ft_tool_list = ft.to_tool_list()

In [139]:
Settings.llm = BedrockConverse(
    model = "anthropic.claude-3-haiku-20240307-v1:0",
    aws_access_key_id = os.environ["AWS_ACCESS_KEY"],
    aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name = os.environ["AWS_DEFAULT_REGION"]
)
Settings.embed_model = BedrockEmbedding(
    model = "amazon.titan-embed-text-v1",
    aws_access_key_id = os.environ["AWS_ACCESS_KEY"],
    aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region_name = os.environ["AWS_DEFAULT_REGION"]
)

In [141]:
from llama_index.core.agent import (
    FunctionCallingAgentWorker,
    AgentRunner
)
from IPython.display import display, Markdown

agent_worker = FunctionCallingAgentWorker.from_tools(
    tools = ft_tool_list,
    llm = Settings.llm,
    verbose = True)
agent = AgentRunner(agent_worker=agent_worker)

In [142]:
query = "Is there any correlation between Illumina, Apple and NVIDIA closing prices?"
response = agent.chat(query)

display(Markdown(f"<b>{response}</b>"))

Added user message to memory: Is there any correlation between Illumina, Apple and NVIDIA closing prices?
=== LLM Response ===
Okay, let's check the correlation between the closing prices of Illumina, Apple, and NVIDIA over the past 10 years.
=== Calling Function ===
Calling function: get_correlation_between_tickers with args: {"period": "10y", "tickers": ["ILMN", "AAPL", "NVDA"]}
=== Function Output ===
                   ILMN_open  ILMN_high  ILMN_low  ILMN_close  ILMN_volume  \
ILMN_open           1.000000   0.999084  0.999184    0.998368    -0.237323   
ILMN_high           0.999084   1.000000  0.998830    0.999179    -0.228818   
ILMN_low            0.999184   0.998830  1.000000    0.999280    -0.244077   
ILMN_close          0.998368   0.999179  0.999280    1.000000    -0.235977   
ILMN_volume        -0.237323  -0.228818 -0.244077   -0.235977     1.000000   
ILMN_dividends           NaN        NaN       NaN         NaN          NaN   
ILMN_stock_splits  -0.028809  -0.028294 -0.028

<b>The results show that there is some correlation between the closing prices of Illumina (ILMN), Apple (AAPL), and NVIDIA (NVDA) over the past 10 years:

- The correlation between ILMN and AAPL closing prices is around 0.18, indicating a weak positive correlation.
- The correlation between ILMN and NVDA closing prices is around -0.18, indicating a weak negative correlation.
- The correlation between AAPL and NVDA closing prices is around 0.78, indicating a strong positive correlation.

So while there is some relationship between the closing prices of these three tickers, the correlations are not extremely strong, except for the correlation between AAPL and NVDA. This suggests that the closing prices of these stocks do not move in lockstep with each other, and there may be other factors driving the individual stock price movements.</b>

In [145]:
query = "What were the longest downtrend days for the Illumina, Apple and NVIDIA? Were there any overlaps in the dates?"
response = agent.chat(query)

display(Markdown(f"<b>{response}</b>"))

Added user message to memory: What were the longest downtrend days for the Illumina, Apple and NVIDIA? Were there any overlaps in the dates?
=== LLM Response ===
Okay, let's find the longest downtrends for Illumina, Apple, and NVIDIA:
=== Calling Function ===
Calling function: get_longest_downtrend with args: {"ticker": "ILMN"}
=== Function Output ===
(DatetimeIndex(['2021-11-04 00:00:00-04:00', '2021-11-05 00:00:00-04:00',
               '2021-11-08 00:00:00-05:00', '2021-11-09 00:00:00-05:00',
               '2021-11-10 00:00:00-05:00', '2021-11-11 00:00:00-05:00',
               '2021-11-12 00:00:00-05:00', '2021-11-15 00:00:00-05:00',
               '2021-11-16 00:00:00-05:00', '2021-11-17 00:00:00-05:00',
               '2021-11-18 00:00:00-05:00'],
              dtype='datetime64[ns, America/New_York]', name='Date', freq=None), '14 days')
=== LLM Response ===
The longest downtrend for Illumina (ticker ILMN) was 14 days.
=== Calling Function ===
Calling function: get_longest_downt

<b>The longest downtrend for NVIDIA (ticker NVDA) was 13 days.

Comparing the results, there does not appear to be any overlap in the dates of the longest downtrends for these three tickers. The longest downtrends occurred at different time periods for each company:

- Illumina: 14 days
- Apple: 2108 days 
- NVIDIA: 13 days

The dates of the longest downtrends did not overlap between the three tickers.</b>

In [146]:
query = "Is there any correlation between Illumina, Apple and NVIDIA 30 day moving average closing prices?"
response = agent.chat(query)

display(Markdown(f"<b>{response}</b>"))

Added user message to memory: Is there any correlation between Illumina, Apple and NVIDIA 30 day moving average closing prices?
=== LLM Response ===
Okay, let's check the correlation between the 30-day moving average of the closing prices for Illumina, Apple, and NVIDIA.
=== Calling Function ===
Calling function: get_rolling_average_correl with args: {"tickers": ["ILMN", "AAPL", "NVDA"], "field": "Close", "n": 30, "period": "10y"}
=== Function Output ===
            ILMN_close  AAPL_close  NVDA_close
ILMN_close    1.000000    0.193005   -0.152141
AAPL_close    0.193005    1.000000    0.792382
NVDA_close   -0.152141    0.792382    1.000000
=== LLM Response ===
The results show the following:

- The correlation between Illumina (ILMN) and Apple (AAPL) 30-day moving average closing prices is 0.193, which is a weak positive correlation.
- The correlation between Illumina (ILMN) and NVIDIA (NVDA) 30-day moving average closing prices is -0.152, which is a weak negative correlation.
- The cor

<b>The results show the following:

- The correlation between Illumina (ILMN) and Apple (AAPL) 30-day moving average closing prices is 0.193, which is a weak positive correlation.
- The correlation between Illumina (ILMN) and NVIDIA (NVDA) 30-day moving average closing prices is -0.152, which is a weak negative correlation.
- The correlation between Apple (AAPL) and NVIDIA (NVDA) 30-day moving average closing prices is 0.792, which is a strong positive correlation.

So there is a weak correlation between Illumina and the other two tickers, but a strong positive correlation between Apple and NVIDIA in terms of their 30-day moving average closing prices over the 10 year period.</b>

## Export

In [1]:
%%writefile ../tools/data_analysis_tools.py
#%%
import warnings
warnings.filterwarnings('ignore')

from llama_index.core.tools.tool_spec.base import BaseToolSpec
import yfinance as yf
from datetime import datetime
import pandas as pd
from typing import Optional, Literal, List, Union, Tuple

import os
import sys
__curdir__ = os.getcwd()

if "tools" in __curdir__:
    sys.path.append(os.path.join(
        __curdir__,
        "../src"
    ))
else:
    sys.path.append("./src")
from utils import rename_columns, process_string

#%%
class DataAnalysisTools(BaseToolSpec):
    """These tools are intended for general data analysis by agents. They allow for very general
    questions and specific questions by users."""
    
    spec_functions = [
        "get_stock_data",
        "get_min_or_max",
        "get_correlation_between_tickers",
        "get_rolling_average",
        "get_rolling_average_correl",
        "get_longest_uptrend",
        "get_longest_downtrend",
        "cagr",
        "get_statistics"
    ]
    
    def __init__(self) -> None:
        """Initializes the Yahoo finance tool spec"""
        
    def get_stock_data(
        self, 
        ticker: str,
        period: Optional[
            Literal["1d",
                    "5d",
                    "1mo",
                    "3mo",
                    "6mo",
                    "1y",
                    "2y",
                    "5y",
                    "10y",
                    "ytd",
                    "max"]
            ] = "10y") -> pd.DataFrame:
        """Gets the daily historical prices and volume for a ticker across a specified period"""
        return rename_columns(ticker = ticker, df = yf.Ticker(ticker).history(period=period))
    
    def get_min_or_max(
        self, 
        ticker: str,
        field: Literal["Open", 
                        "High", 
                        "Low",
                        "Close",
                        "Volume",
                        "Dividends",
                        "Stock Splits"],
        period: Optional[
            Literal["1d",
                    "5d",
                    "1mo",
                    "3mo",
                    "6mo",
                    "1y",
                    "2y",
                    "5y",
                    "10y",
                    "ytd",
                    "max"]
            ] = "10y",
        get_max: Optional[bool] = True):
        """Gets min or max value of the field of interest"""
        df = self.get_stock_data(ticker=ticker, period=period)
        field = process_string(ticker=ticker, string_=field)
        if get_max is True:
            value = df[field].max()
        else:
            value = df[field].min()
        return df[df[field] == value]
    
    def get_correlation(
        self, 
        ticker: str,
        period: Optional[
            Literal["1d",
                    "5d",
                    "1mo",
                    "3mo",
                    "6mo",
                    "1y",
                    "2y",
                    "5y",
                    "10y",
                    "ytd",
                    "max"]
            ] = "10y"):
        """Computes correlation between all metrics for a specific ticker"""
        return self.get_stock_data(ticker=ticker, period=period).corr()
    
    def get_correlation_between_tickers(
        self,
        tickers: List[str],
        period: Optional[
                        Literal["1d",
                                "5d",
                                "1mo",
                                "3mo",
                                "6mo",
                                "1y",
                                "2y",
                                "5y",
                                "10y",
                                "ytd",
                                "max"]
                        ] = "10y"
    ):
        """Computes correlation of all metrics for all tickers"""
        df_fin = None
        for ticker in tickers:
            df = self.get_stock_data(ticker=ticker, period=period)
            if df_fin is None:
                df_fin = df
            else:
                df_fin = pd.merge(
                    df_fin, 
                    df, 
                    left_index = True,
                    right_index = True)
        return df_fin.corr()
    
    def get_rolling_average(
        self, 
        ticker: str,
        field: Literal["Open", 
                    "High", 
                    "Low",
                    "Close",
                    "Volume",
                    "Dividends",
                    "Stock Splits"], 
        n: Optional[int] = 30,
        period: Optional[
                    Literal["1d",
                            "5d",
                            "1mo",
                            "3mo",
                            "6mo",
                            "1y",
                            "2y",
                            "5y",
                            "10y",
                            "ytd",
                            "max"]
                    ] = "10y"):
        """Computes moving average for field of interest across a specified period"""
        df = self.get_stock_data(ticker=ticker, period=period)
        field = process_string(ticker=ticker, string_=field)
        return pd.DataFrame(df[field].rolling(n).mean())
    
    def get_rolling_average_correl(
        self, 
        tickers: List[str], 
        field: Literal["Open", 
                        "High", 
                        "Low",
                        "Close",
                        "Volume",
                        "Dividends",
                        "Stock Splits"], 
        n: Optional[int] = 30,
        period: Optional[
                    Literal["1d",
                            "5d",
                            "1mo",
                            "3mo",
                            "6mo",
                            "1y",
                            "2y",
                            "5y",
                            "10y",
                            "ytd",
                            "max"]
                    ] = "10y"
    ):
        """Gets correlation of rolling average for a specified field across a list of tickers"""
        df_fin = None
        for ticker in tickers:
            df = self.get_rolling_average(
                ticker = ticker,
                field = field,
                n = n,
                period = period
            )
            if df_fin is None:
                df_fin = df
            else:
                df_fin = pd.merge(
                    df_fin, 
                    df,
                    left_index = True,
                    right_index = True)
        return df_fin.corr()
    
    def get_longest_uptrend(
        self,
        ticker: str
    ):
        """Computes longest stock price uptrend duration for ticker"""
        df = self.get_stock_data(ticker=ticker)
        df['uptrend_days'] = df[f'{ticker}_close'].diff().lt(0).cumsum()
        sizes = df.groupby('uptrend_days')[f'{ticker}_close'].transform('size')
        dates = df[sizes == sizes.max()].index 
        return dates, f"{(dates[-1] - dates[0]).days} days"
    
    def get_longest_downtrend(
        self,
        ticker: str
    ):
        """Computes longest stock price downtrend duration for ticker"""
        df = self.get_stock_data(ticker=ticker)
        df['downtrend_days'] = df[f'{ticker}_close'].diff().gt(0).cumsum()
        sizes=df.groupby('downtrend_days')[f'{ticker}_close'].transform('size')
        dates = df[sizes == sizes.max()].index 
        return dates, f"{(dates[-1] - dates[0]).days} days"
    
    def cagr(
        self,
        ticker: str,
        period: Optional[
            Literal["1d",
                    "5d",
                    "1mo",
                    "3mo",
                    "6mo",
                    "1y",
                    "2y",
                    "5y",
                    "10y",
                    "ytd",
                    "max"]
            ] = "10y"): 
        """Computes compounded annual growth rate of closing prices for specified ticker"""
        df = self.get_stock_data(ticker=ticker, period=period)
        df['year'] = df.index.year
        trimmed = df.iloc[[0, -1]][['year',f'{ticker}_close']]
        n = trimmed['year'].iloc[-1] - trimmed['year'].iloc[0]
        start, end = trimmed[f'{ticker}_close'].iloc[-1], trimmed[f'{ticker}_close'].iloc[0]
        return f"{round(100*((end/start)**(1/n)-1), 1)}%"
    
    def get_statistics(
        self,
        ticker: str,
        period: Optional[
                    Literal["1d",
                            "5d",
                            "1mo",
                            "3mo",
                            "6mo",
                            "1y",
                            "2y",
                            "5y",
                            "10y",
                            "ytd",
                            "max"]
                    ] = "10y",
        grouping: Optional[
            Literal["year",
                    "quarter",
                    "month",
                    "week",
                    None]] = None
    ) -> Union[Tuple[float, float], pd.DataFrame]:
        """Gets descriptive statistics of data for grouping of interest"""
        df = self.get_stock_data(ticker=ticker, period=period)
        if grouping is None:
            return df.describe()
        df['year'] = df.index.year
        if grouping == 'year':
            return df.groupby(grouping).mean(), df.groupby(grouping).std()
        if grouping == 'quarter':
            df['quarter'] = df.index.quarter
            return df.groupby(["year", "quarter"]).mean(), df.groupby(["year", "quarter"]).std() 
        if grouping == 'month':
            df['month'] = df.index.month
            return df.groupby(["year", "month"]).mean(), df.groupby(["year", "month"]).std()

def get_da_tools():
    da = DataAnalysisTools()
    return da.to_tool_list()  

Overwriting data_analysis_tools.py
