In [1]:
! pip install vnstock llama_index pandas plotly sqlite-vec tabulate llama-index llama-index-experimental  llama-index-callbacks-langfuse "nbformat>=4.2.0"



# Import packages

In [1]:
from dotenv import load_dotenv
import pandas as pd
from typing import Dict, Optional, Literal

load_dotenv()
from datetime import date
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent
from functools import lru_cache
from llama_index.experimental.query_engine import PandasQueryEngine
from llama_index.core.agent.runner.base import AgentRunner
import sqlite3
from pandas.io.json import build_table_schema
from vnstock import Listing, Quote
from llama_index.core.prompts import PromptTemplate

import plotly.io as pio

pio.renderers.default = "vscode"  # Change to "vscode" for VSCode, "browser" for browser, or "notebook" for Jupyter Notebook
pd.options.plotting.backend = "plotly"


import plotly.graph_objects as go



  import pkg_resources


# Create class to save data frames into SQLite database

In [3]:
class SqliteStorage:
    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)

    def save_dataframe(
        self, df: pd.DataFrame, table_name: str, description: Optional[str] = None
    ):
        df.to_sql(table_name, self.conn, if_exists="fail", index=False)
        if description:
            self._save_table_description(table_name, description)

    def get_dataframe(self, table_name: str) -> pd.DataFrame:
        return pd.read_sql_query(f"SELECT * FROM {table_name}", self.conn)

    def remove_table(self, table_name: str):
        cursor = self.conn.cursor()
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")

        cursor.execute(
            "DELETE FROM table_descriptions WHERE table_name = ?",
            (table_name,),
        )
        self.conn.commit()

    def _save_table_description(self, table_name: str, description: str):
        cursor = self.conn.cursor()
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS table_descriptions (table_name TEXT, description TEXT)"
        )
        cursor.execute(
            "INSERT INTO table_descriptions (table_name, description) VALUES (?, ?)",
            (table_name, description),
        )
        self.conn.commit()

    def get_table_description(self, table_name: str) -> Optional[str]:
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT description FROM table_descriptions WHERE table_name = ?",
            (table_name,),
        )
        row = cursor.fetchone()
        return row[0] if row else None

    def get_all_table_descriptions(self) -> Dict[str, str]:
        cursor = self.conn.cursor()
        cursor.execute("SELECT table_name, description FROM table_descriptions")
        rows = cursor.fetchall()
        return {row[0]: row[1] for row in rows}

    def check_table_exists(self, table_name: str) -> bool:
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        return cursor.fetchone() is not None

In [4]:
db_storage = SqliteStorage()


In [5]:
df = pd.DataFrame.from_records(
    [
        {
            "symbol": "VIC",
            "name": "Vingroup JSC",
            "industry": "Real Estate",
            "sector": "Consumer Discretionary",
            "market_cap": 100000000000,
            "ipo_date": date(2006, 5, 8),
        },
        {
            "symbol": "VNM",
            "name": "Vinamilk JSC",
            "industry": "Food & Beverage",
            "sector": "Consumer Staples",
            "market_cap": 50000000000,
            "ipo_date": date(2006, 11, 19),
        },
    ]
)

db_storage.save_dataframe(
    df=df, table_name="test_listings", description="Test table for listings"
)


In [6]:

db_storage.get_dataframe(table_name="test_listings" ).head()

Unnamed: 0,symbol,name,industry,sector,market_cap,ipo_date
0,VIC,Vingroup JSC,Real Estate,Consumer Discretionary,100000000000,2006-05-08
1,VNM,Vinamilk JSC,Food & Beverage,Consumer Staples,50000000000,2006-11-19


In [7]:
db_storage.get_table_description("test_listings")

'Test table for listings'

In [8]:
db_storage.get_all_table_descriptions()

{'test_listings': 'Test table for listings'}

# Create class to handle the data

In [None]:
class StockDataSource:
    """
    A data source for stock listings and quotes.
    This class interacts with the Vnstock API to retrieve stock listings and quotes.
    It uses a SqliteStorage instance to store and retrieve data and save them in a SQLite database.
    """

    def __init__(self, db_storage: SqliteStorage):
        self.db_storage = db_storage
        self._listing = Listing()

    def _describe_dataframe(
        self, df: pd.DataFrame, additional_metadata: dict[str, str]
    ) -> str:
        """
        Returns a DataFrame with the description of the specified table.
        """

        return "\n".join(
            [
                f"This table contains: {df.columns.tolist()}",
                f"Its statistics:\n {df.describe().to_string()}",
                f"Data types:\n {build_table_schema(df)['fields']}",
                f"Queried with these parameters: {additional_metadata}",
            ]
        )

    def pull_symbols(self) -> str:
        """
        Pull all stock listings from the Vnstock API and save them to the database.
        Returned a string indicating the table name.
        """
        if self.db_storage.check_table_exists("symbols"):
            return "symbols"

        df = self._listing.symbols_by_industries()

        industry_code_cols = [
            column for column in df.columns if "icb_code" in column.lower()
        ]

        industry_name_cols = [
            column for column in df.columns if "icb_name" in column.lower()
        ]

        df["industry_codes"] = df[industry_code_cols].apply(
            lambda row: row[industry_code_cols].dropna().tolist(), axis=1
        )
        df["industry_names"] = df[industry_name_cols].apply(
            lambda row: row[industry_name_cols].dropna().tolist(), axis=1
        )
        df = df.drop(columns=industry_code_cols + industry_name_cols)

        self.db_storage.save_dataframe(
            df=df,
            table_name="symbols",
            description=self._describe_dataframe(
                df,
                additional_metadata=dict(
                    source="Vnstock API",
                ),
            ),
        )

        return "symbols"

    def pull_industries(self) -> str:
        """Pulls all industries from the Vnstock API and saves them to the database.
        Returns a string indicating the table name.
        """
        if self.db_storage.check_table_exists("industries"):
            return "industries"

        df = self._listing.industries_icb()

        self.db_storage.save_dataframe(
            df=df,
            table_name="industries",
            description=self._describe_dataframe(
                df,
                additional_metadata=dict(
                    source="Vnstock API",
                ),
            ),
        )

        return "industries"

    def pull_quotes(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        interval: str = "1D",
    ) -> str:
        """
        Pulls stock quotes for a given symbol and date range from the Vnstock API and saves them to the database.
        Start and end dates should be in the format YYYY-MM-DD.
        Interval can be one of ["1m", "5m", "15m", "30m", "1H", "1D", "1W", "1M"]
        Returns a string indicating the table name.
        """
        table_name = f"{symbol}_quotes_{start_date}__{end_date}__{interval}".replace(
            "-", "_"
        )
        if self.db_storage.check_table_exists(table_name):
            return table_name

        quote = Quote(symbol=symbol, source="VCI")

        df = quote.history(
            start=start_date,
            end=end_date,
            interval="1D",
        )

        self.db_storage.save_dataframe(
            df=df,
            table_name=table_name,
            description=self._describe_dataframe(
                df,
                additional_metadata=dict(
                    symbol=symbol,
                    start_date=start_date,
                    end_date=end_date,
                    interval=interval,
                ),
            ),
        )
        return table_name

    def get_all_table_descriptions(self) -> str:
        """
        Returns all table descriptions from the database.
        """
        text = ""
        for table, metadata in self.db_storage.get_all_table_descriptions().items():
            text += f"Table: {table}\nDescription:\n{metadata}\n\n"
        return text

    def clear_storage(self):
        """
        Clears all tables in the database.
        """
        cursor = self.db_storage.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = cursor.fetchall()
        for table in tables:
            self.db_storage.remove_table(table[0])
        self.db_storage.conn.commit()

    @property
    def all_tools(self):
        return [
            self.pull_symbols,
            self.pull_quotes,
            self.get_all_table_descriptions,
            self.clear_storage,
        ]

In [None]:
data_source = StockDataSource(db_storage=db_storage)

data_source.pull_symbols()
test_table = data_source.pull_quotes(
    symbol="ACB",
    start_date="2024-01-01",
    end_date="2025-03-10",
    interval="1D",
)

In [19]:
print(data_source.get_all_table_descriptions())

Table: test_listings
Description:
Test table for listings

Table: listings
Description:
This table contains: ['symbol', 'organ_name']
Its statistics:
        symbol                organ_name
count    1715                      1688
unique   1715                      1687
top       YTC  Công ty Cổ phần Nam Việt
freq        1                         2
Data types:
 [{'name': 'index', 'type': 'integer'}, {'name': 'symbol', 'type': 'string'}, {'name': 'organ_name', 'type': 'string'}]
Queried with these parameters: {'source': 'Vnstock API', 'date': datetime.date(2025, 6, 22)}

Table: ACB_quotes_2024-01-01__2025-03-10__1D
Description:
This table contains: ['time', 'open', 'high', 'low', 'close', 'volume']
Its statistics:
                                 time        open        high         low       close        volume
count                            293  293.000000  293.000000  293.000000  293.000000  2.930000e+02
mean   2024-08-05 12:51:36.245733888   20.236792   20.391160   20.079966   20.

# Create class to manipulate the dataframe

In [20]:
DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL = (
    "Given an input question, generate Plotly JSON could be used to render this result\n"
    "Based on the query and the Pandas DataFrame output, you must choose the appropriate visualization type, ensure that the visualization has a clear title, and that the axes are labeled correctly. You must also ensure that the visualization is clear and easy to understand.\n"
    "Title and axes labels should be same language as the query.\n"
    "Query: {query_str}\n\n"
    "Pandas Instructions (optional):\n{pandas_instructions}\n\n"
    "Pandas Output: {pandas_output}\n\n"
    "Response: "
)
DEFAULT_RESPONSE_SYNTHESIS_PROMPT = PromptTemplate(
    DEFAULT_RESPONSE_SYNTHESIS_PROMPT_TMPL,
)


class PandasQueryRunner:
    """
    A class to run queries on Pandas DataFrames.
    It uses duckdb to execute SQL queries on DataFrames stored in the SqliteStorage.
    """

    def __init__(self, db_storage: SqliteStorage):
        self.db_storage = db_storage
        self.get_pandas_query_engine = lru_cache(maxsize=128)(
            self._get_pandas_query_engine
        )

    def _get_pandas_query_engine(self, table_name: str) -> PandasQueryEngine:
        """
        Returns a PandasQueryEngine for the specified table name.
        """
        if not self.db_storage.check_table_exists(table_name):
            raise ValueError(f"Table '{table_name}' does not exist in the database.")

        df = self.db_storage.get_dataframe(table_name)
        return PandasQueryEngine(
            df,
            synthesize_response=True,
            response_synthesis_prompt=DEFAULT_RESPONSE_SYNTHESIS_PROMPT,
            verbose=True,
        )

    def run_query(self, table_name: str, query: str) -> str:
        """
        Runs a SQL query on the specified table and returns the result as a DataFrame.
        """
        query_engine = self.get_pandas_query_engine(table_name)

        response = query_engine.query(query)

        return str(response) if response is not None else "No results found."


pandas_query_runner = PandasQueryRunner(db_storage=db_storage)

output = pandas_query_runner.run_query(
    test_table,
    "Từ ngày 2024-01-01 đến 2025-03-19, giá đóng cửa cao nhất và thấp nhất của cổ phiếu ACB là bao nhiêu?",
)

print(output)

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
> Pandas Instructions:
```
df[(df['time'] >= '2024-01-01') & (df['time'] <= '2025-03-19')].agg({'high': 'max', 'low': 'min'})
```
> Pandas Output: high    22.39
low     16.81
dtype: float64
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
{
    "data": [
        {
            "type": "bar",
            "x": ["High Price", "Low Price"],
            "y": [22.39, 16.81]
        }
    ],
    "layout": {
        "title": "Giá đóng cửa cao nhất và thấp nhất của cổ phiếu ACB từ 2024-01-01 đến 2025-03-19",
        "xaxis": {
            "title": "Loại giá"
        },
        "yaxis": {
            "title": "Giá"
        }
    }
}


# Simple tool to plot Plotly graphs

In [21]:
def show_plotly_chart(chart_json: str):
    """
    Displays a Plotly chart from a JSON string.
    """
    import plotly.io as pio
    chart = pio.from_json(chart_json)
    chart.show()

show_plotly_chart(output)

# Some time related functions


In [22]:
def get_current_date() -> str:
    """
    Returns the current date in YYYY-MM-DD format.
    """
    return date.today().strftime("%Y-%m-%d")


In [23]:
all_tools = data_source.all_tools + [
    pandas_query_runner.run_query,
    show_plotly_chart,
    get_current_date
]

In [24]:
import llama_index.core

llama_index.core.set_global_handler("simple")

In [25]:

workflow = FunctionAgent(
    tools=all_tools,
    llm=OpenAI(
        model="gpt-4o",
    ),
    system_prompt=(
        "You are a helpful assistant that can answer questions about stock listings and quotes. "
        "You can pull stock listings and quotes from the Vnstock API, run SQL queries on Pandas DataFrames, "
        "and provide information about the data stored in the database."
        "You can also clear the storage and get all table descriptions if user want to create a conversation with you."
        "Note that data is not available at start, so you need to pull listings and quotes first before querying them."
        "You can also visualize the data using Plotly charts but only for the final result of the query, not intermediate steps."
    ),
    verbose=True,
)

from llama_index.core.memory import ChatMemoryBuffer

memory = ChatMemoryBuffer.from_defaults(token_limit=40000)


In [26]:
response = await workflow.run(
    "Mã chứng khoán của Công ty Cổ phần Xi măng và Khoáng sản Yên Bái là gì?",
)

print(str(response))


Running step init_run
Step init_run produced event AgentInput
Running step setup_agent
Step setup_agent produced event AgentSetup
Running step run_agent_step
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
** Messages: **
system: You are a helpful assistant that can answer questions about stock listings and quotes. You can pull stock listings and quotes from the Vnstock API, run SQL queries on Pandas DataFrames, and provide information about the data stored in the database.You can also clear the storage and get all table descriptions if user want to create a conversation with you.Note that data is not available at start, so you need to pull listings and quotes first before querying them.You can also visualize the data using Plotly charts but only for the final result of the query, not intermediate steps.
user: Mã chứng khoán của Công ty Cổ phần Xi măng và Khoáng sả

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
> Pandas Instructions:
```
df.agg({'high': 'max', 'low': 'min'})
```
> Pandas Output: high    25.0
low      5.1
dtype: float64
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
> Pandas Instructions:
```
df[['time', 'close']].sort_values('time')
```
> Pandas Output:                   time  close
0  2025-05-23 00:00:00   12.5
1  2025-05-26 00:00:00   12.5
2  2025-05-28 00:00:00   11.0
3  2025-06-02 00:00:00   10.2
4  2025-06-03 00:00:00   10.7
5  2025-06-09 00:00:00   10.7
6  2025-06-12 00:00:00   10.7
7  2025-06-16 00:00:00   10.7


In [None]:
response = await workflow.run(
    "Từ ngày 2024-10-01 đến 2025-03-31, giá cao nhất và thấp nhất của cổ phiếu YBC là bao nhiêu?",
)

print(str(response))

Running step init_run
Step init_run produced event AgentInput
Running step setup_agent
Step setup_agent produced event AgentSetup
Running step run_agent_step
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
** Messages: **
system: You are a helpful assistant that can answer questions about stock listings and quotes. You can pull stock listings and quotes from the Vnstock API, run SQL queries on Pandas DataFrames, and provide information about the data stored in the database.You can also clear the storage and get all table descriptions if user want to create a conversation with you.Note that data is not available at start, so you need to pull listings and quotes first before querying them.You can also visualize the data using Plotly charts but only for the final result of the query, not intermediate steps.
user: Mã chứng khoán của Công ty Cổ phần Xi măng và Khoáng sả

Step call_tool produced event ToolCallResult
Running step aggregate_tool_results
Step aggregate_tool_results produced event AgentInput
Running step setup_agent
Step setup_agent produced event AgentSetup
Running step run_agent_step
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
** Messages: **
system: You are a helpful assistant that can answer questions about stock listings and quotes. You can pull stock listings and quotes from the Vnstock API, run SQL queries on Pandas DataFrames, and provide information about the data stored in the database.You can also clear the storage and get all table descriptions if user want to create a conversation with you.Note that data is not available at start, so you need to pull listings and quotes first before querying them.You can also visualize the data using Plotly charts but only for the final result of the query, not intermed

In [29]:
response = await workflow.run(
    "Trong một tháng qua, cổ phiếu YBC có xu hướng tăng hay giảm?",
)

print(str(response))

Running step init_run
Step init_run produced event AgentInput
Running step setup_agent
Step setup_agent produced event AgentSetup
Running step run_agent_step
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
** Messages: **
system: You are a helpful assistant that can answer questions about stock listings and quotes. You can pull stock listings and quotes from the Vnstock API, run SQL queries on Pandas DataFrames, and provide information about the data stored in the database.You can also clear the storage and get all table descriptions if user want to create a conversation with you.Note that data is not available at start, so you need to pull listings and quotes first before querying them.You can also visualize the data using Plotly charts but only for the final result of the query, not intermediate steps.
user: Trong một tháng qua, cổ phiếu YBC có xu hướng tăng hay 

Step call_tool produced event ToolCallResult
Running step aggregate_tool_results
Step aggregate_tool_results produced event AgentInput
Running step setup_agent
Step setup_agent produced event AgentSetup
Running step run_agent_step
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
** Messages: **
system: You are a helpful assistant that can answer questions about stock listings and quotes. You can pull stock listings and quotes from the Vnstock API, run SQL queries on Pandas DataFrames, and provide information about the data stored in the database.You can also clear the storage and get all table descriptions if user want to create a conversation with you.Note that data is not available at start, so you need to pull listings and quotes first before querying them.You can also visualize the data using Plotly charts but only for the final result of the query, not intermed