In [52]:
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.core import Settings
from dotenv import load_dotenv
import os
load_dotenv()


aoai_api_key = os.getenv("AZURE_OPENAI_API_KEY") 
aoai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
aoai_api_version = os.getenv("AZURE_OPENAI_API_VERSION")

llm = AzureOpenAI(
    model="gpt-4o",
    deployment_name="gpt-4o",
    api_key=aoai_api_key,
    azure_endpoint=aoai_endpoint,
    api_version=aoai_api_version,
)


# You need to deploy your own embedding model as well as your own chat completion model
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-ada-002",
    deployment_name="text-embedding-ada-002",
    api_key=aoai_api_key,
    azure_endpoint=aoai_endpoint,
    api_version=aoai_api_version,
)
Settings.llm=llm
Settings.embed_model=embed_model

In [53]:
response = llm.complete("Paul Graham is ")
print(response)

Paul Graham is a British-born computer scientist, entrepreneur, venture capitalist, author, and essayist. He is best known for his work in the field of programming languages, particularly for developing the Lisp dialect known as "Arc." Graham is also a co-founder of Viaweb, one of the first web-based applications, which was later acquired by Yahoo! and became Yahoo! Store.

In addition to his technical contributions, Paul Graham is a prominent figure in the startup community. He co-founded Y Combinator, a highly influential startup accelerator that has funded and mentored numerous successful startups, including Dropbox, Airbnb, and Reddit. His essays on startups, technology, and programming have been widely read and have had a significant impact on the tech industry.

Graham's writings cover a range of topics, including entrepreneurship, innovation, and the philosophy of work and life. His insights and advice are often sought after by aspiring entrepreneurs and technologists.


In [54]:
import pandas as pd
from pathlib import Path

data_dir = Path(".\data\client")
csv_files = sorted([f for f in data_dir.glob("*.csv")])
dfs = []
for csv_file in csv_files:
    print(f"processing file: {csv_file}")
    try:
        df = pd.read_csv(csv_file)
        dfs.append(df)
    except Exception as e:
        print(f"Error parsing {csv_file}: {str(e)}")

processing file: data\client\LLM Data.csv
processing file: data\client\LLM Data_2.csv
processing file: data\client\product master.csv


  data_dir = Path(".\data\client")


### Extract Table Name and Summary from each Table¶
-Here we use gpt-3.5 to extract a table name (with underscores) and summary from each table with our Pydantic program.

In [55]:
tableinfo_dir = "WikiTableQuestions_TableInfo"
!mkdir {tableinfo_dir}

A subdirectory or file WikiTableQuestions_TableInfo already exists.


In [66]:
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.bridge.pydantic import BaseModel, Field



class TableInfo(BaseModel):
    """Information regarding a structured table."""

    table_name: str = Field(
        ..., description="table name (must be underscores and NO spaces)"
    )
    table_summary: str = Field(
        ..., description="short, concise summary/caption of the table"
    )


prompt_str = """\
Give me a summary of the table with the following JSON format.

- The table name must be unique to the table and describe it while being concise. 
- Do NOT output a generic table name (e.g. table, my_table).

Do NOT make the table name one of the following: {exclude_table_name_list}

Table:
{table_str}

Summary: """

program = LLMTextCompletionProgram.from_defaults(
    output_cls=TableInfo,
    llm=llm,
    prompt_template_str=prompt_str,
)

In [67]:
import json


def _get_tableinfo_with_index(idx: int) -> str:
    results_gen = Path(tableinfo_dir).glob(f"{idx}_*")
    results_list = list(results_gen)
    if len(results_list) == 0:
        return None
    elif len(results_list) == 1:
        path = results_list[0]
        return TableInfo.parse_file(path)
    else:
        raise ValueError(
            f"More than one file matching index: {list(results_gen)}"
        )


table_names = set()
table_infos = []
for idx, df in enumerate(dfs):
    table_info = _get_tableinfo_with_index(idx)
    if table_info:
        table_infos.append(table_info)
    else:
        while True:
            df_str = df.head(10).to_csv()
            table_info = program(
                table_str=df_str,
                exclude_table_name_list=str(list(table_names)),
            )
            table_name = table_info.table_name
            print(f"Processed table: {table_name}")
            if table_name not in table_names:
                table_names.add(table_name)
                break
            else:
                # try again
                print(f"Table name {table_name} already exists, trying again.")
                pass

        out_file = f"{tableinfo_dir}/{idx}_{table_name}.json"
        json.dump(table_info.dict(), open(out_file, "w"))
    table_infos.append(table_info)

Processed table: russia_product_sales_2024
Processed table: russia_ins_sales_2024
Processed table: russia_pharma_launch_2024


In [76]:
# type(table_names)
table_infos

[TableInfo(table_name='russia_product_sales_2024', table_summary='This table details the monthly actual sales, budget, and latest estimate of sales for product P#1 - INS in Russia for the fiscal year 2024.'),
 TableInfo(table_name='russia_ins_sales_2024', table_summary='This table details the monthly DRL and Market Sales for Product P#1 - INS in Russia for the fiscal year 2024.'),
 TableInfo(table_name='russia_pharma_launch_2024', table_summary='This table contains information about pharmaceutical product launches in Russia for the year 2024, including product codes, molecules, pack sizes, channels, launch dates, and brands.')]

In [82]:
table_infos_copy=table_infos.copy()
table_infos_copy[0].table_name= "Country_product_sales_yearly_data"
table_infos_copy[0].table_summary="This table details the monthly actual sales, budget, and latest estimate of sales for different Product in different countries for the different fiscal year."

table_infos_copy[1].table_name= "Country_ins_sales_yearly_data"
table_infos_copy[1].table_summary="This table details the monthly DRL and Market Sales for different in different countries for the different fiscal year."

table_infos_copy[2].table_name= "Product_infomration"
table_infos_copy[2].table_summary="This table contains information about pharmaceutical product launches , including product codes, molecules, pack sizes, channels, launch dates, and brands."

In [83]:
table_infos_copy

[TableInfo(table_name='Country_product_sales_yearly_data', table_summary='This table details the monthly actual sales, budget, and latest estimate of sales for different Product in different countries for the different fiscal year.'),
 TableInfo(table_name='Country_ins_sales_yearly_data', table_summary='This table details the monthly DRL and Market Sales for different in different countries for the different fiscal year.'),
 TableInfo(table_name='Product_infomration', table_summary='This table contains information about pharmaceutical product launches , including product codes, molecules, pack sizes, channels, launch dates, and brands.')]

In [84]:
table_info=table_infos_copy.copy()

## Put Data in SQL Database¶


In [85]:
# put data into sqlite db
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
)
import re


# Function to create a sanitized column name
def sanitize_column_name(col_name):
    # Remove special characters and replace spaces with underscores
    return re.sub(r"\W+", "_", col_name)


# Function to create a table from a DataFrame using SQLAlchemy
def create_table_from_dataframe(
    df: pd.DataFrame, table_name: str, engine, metadata_obj
):
    # Sanitize column names
    sanitized_columns = {col: sanitize_column_name(col) for col in df.columns}
    df = df.rename(columns=sanitized_columns)

    # Dynamically create columns based on DataFrame columns and data types
    columns = [
        Column(col, String if dtype == "object" else Integer)
        for col, dtype in zip(df.columns, df.dtypes)
    ]

    # Create a table with the defined columns
    table = Table(table_name, metadata_obj, *columns)

    # Create the table in the database
    metadata_obj.create_all(engine)

    # Insert data from DataFrame into the table
    with engine.connect() as conn:
        for _, row in df.iterrows():
            insert_stmt = table.insert().values(**row.to_dict())
            conn.execute(insert_stmt)
        conn.commit()


engine = create_engine("sqlite:///:memory:")
metadata_obj = MetaData()
for idx, df in enumerate(dfs):
    tableinfo = _get_tableinfo_with_index(idx)
    print(f"Creating table: {tableinfo.table_name}")
    create_table_from_dataframe(df, tableinfo.table_name, engine, metadata_obj)

Creating table: Country_product_sales_yearly_data
Creating table: Country_ins_sales_yearly_data
Creating table: Product_infomration


### Advanced Capability 1: Text-to-SQL with Query-Time Table Retrieval.

In [86]:
from llama_index.core.objects import (
    SQLTableNodeMapping,
    ObjectIndex,
    SQLTableSchema,
)
from llama_index.core import SQLDatabase, VectorStoreIndex

sql_database = SQLDatabase(engine)

table_node_mapping = SQLTableNodeMapping(sql_database)
table_schema_objs = [
    SQLTableSchema(table_name=t.table_name, context_str=t.table_summary)
    for t in table_infos
]  # add a SQLTableSchema for each table

obj_index = ObjectIndex.from_objects(
    table_schema_objs,
    table_node_mapping,
    VectorStoreIndex,
)
obj_retriever = obj_index.as_retriever(similarity_top_k=3)

SQLRetriever + Table Parser



In [87]:
from llama_index.core.retrievers import SQLRetriever
from typing import List
from llama_index.core.query_pipeline import FnComponent

sql_retriever = SQLRetriever(sql_database)


def get_table_context_str(table_schema_objs: List[SQLTableSchema]):
    """Get table context string."""
    context_strs = []
    for table_schema_obj in table_schema_objs:
        table_info = sql_database.get_single_table_info(
            table_schema_obj.table_name
        )
        if table_schema_obj.context_str:
            table_opt_context = " The table description is: "
            table_opt_context += table_schema_obj.context_str
            table_info += table_opt_context

        context_strs.append(table_info)
    return "\n\n".join(context_strs)


table_parser_component = FnComponent(fn=get_table_context_str)

Text-to-SQL Prompt + Output Parser



In [88]:
from llama_index.core.prompts.default_prompts import DEFAULT_TEXT_TO_SQL_PROMPT
from llama_index.core import PromptTemplate
from llama_index.core.query_pipeline import FnComponent
from llama_index.core.llms import ChatResponse


def parse_response_to_sql(response: ChatResponse) -> str:
    """Parse response to SQL."""
    response = response.message.content
    sql_query_start = response.find("SQLQuery:")
    if sql_query_start != -1:
        response = response[sql_query_start:]
        # TODO: move to removeprefix after Python 3.9+
        if response.startswith("SQLQuery:"):
            response = response[len("SQLQuery:") :]
    sql_result_start = response.find("SQLResult:")
    if sql_result_start != -1:
        response = response[:sql_result_start]
    return response.strip().strip("```").strip()


sql_parser_component = FnComponent(fn=parse_response_to_sql)

text2sql_prompt = DEFAULT_TEXT_TO_SQL_PROMPT.partial_format(
    dialect=engine.dialect.name
)
print(text2sql_prompt.template)

Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return the most interesting examples in the database.

Never query for all the columns from a specific table, only ask for a few relevant columns given the question.

Pay attention to use only the column names that you can see in the schema description. Be careful to not query for columns that do not exist. Pay attention to which column is in which table. Also, qualify column names with the table name when needed. You are required to use the following format, each taking one line:

Question: Question here
SQLQuery: SQL Query to run
SQLResult: Result of the SQLQuery
Answer: Final answer here

Only use tables listed below.
{schema}

Question: {query_str}
SQLQuery: 


In [89]:
response_synthesis_prompt_str = (
    "Given an input question, synthesize a response from the query results.\n"
    "Query: {query_str}\n"
    "SQL: {sql_query}\n"
    "SQL Response: {context_str}\n"
    "Response: "
)
response_synthesis_prompt = PromptTemplate(
    response_synthesis_prompt_str,
)

Define Query Pipeline¶


In [90]:
from llama_index.core.query_pipeline import (
    QueryPipeline as QP,
    Link,
    InputComponent,
    CustomQueryComponent,
)

qp = QP(
    modules={
        "input": InputComponent(),
        "table_retriever": obj_retriever,
        "table_output_parser": table_parser_component,
        "text2sql_prompt": text2sql_prompt,
        "text2sql_llm": llm,
        "sql_output_parser": sql_parser_component,
        "sql_retriever": sql_retriever,
        "response_synthesis_prompt": response_synthesis_prompt,
        "response_synthesis_llm": llm,
    },
    verbose=True,
)

In [91]:
qp.add_chain(["input", "table_retriever", "table_output_parser"])
qp.add_link("input", "text2sql_prompt", dest_key="query_str")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(
    ["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"]
)
qp.add_link(
    "sql_output_parser", "response_synthesis_prompt", dest_key="sql_query"
)
qp.add_link(
    "sql_retriever", "response_synthesis_prompt", dest_key="context_str"
)
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")

Visualize Query Pipeline¶


Run Some Queries!¶


In [92]:

response = qp.run(
    query="Show me Sales Performance by country"
)
print(str(response))

[1;3;38;2;155;135;227m> Running module input with input: 
query: Show me Sales Performance by country

[0m[1;3;38;2;155;135;227m> Running module table_retriever with input: 
input: Show me Sales Performance by country

[0m[1;3;38;2;155;135;227m> Running module table_output_parser with input: 
table_schema_objs: [SQLTableSchema(table_name='Country_ins_sales_yearly_data', context_str='This table details the monthly DRL and Market Sales for different in different countries for the different fiscal year.'), SQLT...

[0m[1;3;38;2;155;135;227m> Running module text2sql_prompt with input: 
query_str: Show me Sales Performance by country
schema: Table 'Country_ins_sales_yearly_data' has columns: ID (INTEGER), Country (VARCHAR), Product (VARCHAR), FY_Year (INTEGER), Month (INTEGER), DRL_Sales (INTEGER), Market_Sales (INTEGER), . The table desc...

[0m[1;3;38;2;155;135;227m> Running module text2sql_llm with input: 
messages: Given an input question, first create a syntactically correct s

NotImplementedError: Statement 'sql\nSELECT \n    Country_product_sales_yearly_data.Country, \n    SUM(Country_product_sales_yearly_data.Actual_Sales) AS Total_Actual_Sales, \n    SUM(Country_product_sales_yearly_data.Budget) AS Total_Budget, \n    SUM(Country_product_sales_yearly_data.LE_Latest_Estimate_Estimation_of_sales_for_running_12_months) AS Total_Latest_Estimate\nFROM \n    Country_product_sales_yearly_data\nGROUP BY \n    Country_product_sales_yearly_data.Country\nORDER BY \n    Total_Actual_Sales DESC;' is invalid SQL.

### Advanced Capability 2: Text-to-SQL with Query-Time Row Retrieval (along with Table Retrieval)

In [93]:
from llama_index.core import VectorStoreIndex, load_index_from_storage
from sqlalchemy import text
from llama_index.core.schema import TextNode
from llama_index.core import StorageContext
import os
from pathlib import Path
from typing import Dict


def index_all_tables(
    sql_database: SQLDatabase, table_index_dir: str = "table_index_dir"
) -> Dict[str, VectorStoreIndex]:
    """Index all tables."""
    if not Path(table_index_dir).exists():
        os.makedirs(table_index_dir)

    vector_index_dict = {}
    engine = sql_database.engine
    for table_name in sql_database.get_usable_table_names():
        print(f"Indexing rows in table: {table_name}")
        if not os.path.exists(f"{table_index_dir}/{table_name}"):
            # get all rows from table
            with engine.connect() as conn:
                cursor = conn.execute(text(f'SELECT * FROM "{table_name}"'))
                result = cursor.fetchall()
                row_tups = []
                for row in result:
                    row_tups.append(tuple(row))

            # index each row, put into vector store index
            nodes = [TextNode(text=str(t)) for t in row_tups]

            # put into vector store index (use OpenAIEmbeddings by default)
            index = VectorStoreIndex(nodes)

            # save index
            index.set_index_id("vector_index")
            index.storage_context.persist(f"{table_index_dir}/{table_name}")
        else:
            # rebuild storage context
            storage_context = StorageContext.from_defaults(
                persist_dir=f"{table_index_dir}/{table_name}"
            )
            # load index
            index = load_index_from_storage(
                storage_context, index_id="vector_index"
            )
        vector_index_dict[table_name] = index

    return vector_index_dict


vector_index_dict = index_all_tables(sql_database)

Indexing rows in table: Country_ins_sales_yearly_data
Indexing rows in table: Country_product_sales_yearly_data
Indexing rows in table: Product_infomration


Define Expanded Table Parser Component¶


In [96]:
from llama_index.core.retrievers import SQLRetriever
from typing import List
from llama_index.core.query_pipeline import FnComponent

sql_retriever = SQLRetriever(sql_database)


def get_table_context_and_rows_str(
    query_str: str, table_schema_objs: List[SQLTableSchema]
):
    """Get table context string."""
    context_strs = []
    for table_schema_obj in table_schema_objs:
        # first append table info + additional context
        table_info = sql_database.get_single_table_info(
            table_schema_obj.table_name
        )
        if table_schema_obj.context_str:
            table_opt_context = " The table description is: "
            table_opt_context += table_schema_obj.context_str
            table_info += table_opt_context

        # also lookup vector index to return relevant table rows
        vector_retriever = vector_index_dict[
            table_schema_obj.table_name
        ].as_retriever(similarity_top_k=2)
        relevant_nodes = vector_retriever.retrieve(query_str)
        if len(relevant_nodes) > 0:
            table_row_context = "\nHere are some relevant example rows (values in the same order as columns above)\n"
            for node in relevant_nodes:
                table_row_context += str(node.get_content()) + "\n"
            table_info += table_row_context

        context_strs.append(table_info)
    return "\n\n".join(context_strs)


table_parser_component = FnComponent(fn=get_table_context_and_rows_str)

Define Expanded Query Pipeline¶


In [97]:
from llama_index.core.query_pipeline import (
    QueryPipeline as QP,
    Link,
    InputComponent,
    CustomQueryComponent,
)

qp = QP(
    modules={
        "input": InputComponent(),
        "table_retriever": obj_retriever,
        "table_output_parser": table_parser_component,
        "text2sql_prompt": text2sql_prompt,
        "text2sql_llm": llm,
        "sql_output_parser": sql_parser_component,
        "sql_retriever": sql_retriever,
        "response_synthesis_prompt": response_synthesis_prompt,
        "response_synthesis_llm": llm,
    },
    verbose=True,
)

In [98]:
qp.add_link("input", "table_retriever")
qp.add_link("input", "table_output_parser", dest_key="query_str")
qp.add_link(
    "table_retriever", "table_output_parser", dest_key="table_schema_objs"
)
qp.add_link("input", "text2sql_prompt", dest_key="query_str")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(
    ["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"]
)
qp.add_link(
    "sql_output_parser", "response_synthesis_prompt", dest_key="sql_query"
)
qp.add_link(
    "sql_retriever", "response_synthesis_prompt", dest_key="context_str"
)
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")

In [113]:
response = qp.run(
    query="Show me sales by product for FY 2024"
)
print(str(response))

[1;3;38;2;155;135;227m> Running module input with input: 
query: Show me sales by product for FY 2024

[0m[1;3;38;2;155;135;227m> Running module table_retriever with input: 
input: Show me sales by product for FY 2024

[0m[1;3;38;2;155;135;227m> Running module table_output_parser with input: 
query_str: Show me sales by product for FY 2024
table_schema_objs: [SQLTableSchema(table_name='Country_product_sales_yearly_data', context_str='This table details the monthly actual sales, budget, and latest estimate of sales for different Product in different countr...

[0m[1;3;38;2;155;135;227m> Running module text2sql_prompt with input: 
query_str: Show me sales by product for FY 2024
schema: Table 'Country_product_sales_yearly_data' has columns: ID (INTEGER), Country (VARCHAR), Product (VARCHAR), FY_Year (INTEGER), Month (INTEGER), Actual_Sales (INTEGER), Budget (INTEGER), LE_Latest_Estim...

[0m[1;3;38;2;155;135;227m> Running module text2sql_llm with input: 
messages: Given an input 

In [112]:
from sqlalchemy import text

with engine.connect() as con:
    rows = con.execute(text("""SELECT Country_ins_sales_yearly_data.Country ,
                            SUM(Country_ins_sales_yearly_data.DRL_Sales) AS Total_DRL_Sales,
                            SUM(Country_ins_sales_yearly_data.Market_Sales) AS Total_Market_Sales
                            from Country_ins_sales_yearly_data
                            GROUP BY
                            Country_ins_sales_yearly_data.Country
                            ORDER BY
                            Total_DRL_Sales DESC"""))
    for row in rows:
        print(row)

('Russia', 119159978, 122619268)
('Brazil', 118458193, 122030149)


In [106]:
from sqlalchemy import create_engine, inspect, text
# Use the Inspector to get column names
inspector = inspect(engine)
columns = inspector.get_columns('Country_ins_sales_yearly_data')

# Print column names
column_names = [column['name'] for column in columns]
print("Column names:", column_names)

Column names: ['ID', 'Country', 'Product', 'FY_Year', 'Month', 'DRL_Sales', 'Market_Sales']


In [109]:
with engine.connect() as con:
    rows = con.execute(text("""SELECT count(Country_ins_sales_yearly_data.Country) AS Country 
                            
                            from Country_ins_sales_yearly_data where Country=="Brazil" """))
    for row in rows:
        print(row)

(48,)
