In [None]:
import os
import ast
import sqlite3
import openai
import pandas as pd
from pathlib import Path, PosixPath
from typing import NamedTuple
from langchain import OpenAI, SQLDatabase, PromptTemplate, LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough  # Try without .passthrough
from langchain_experimental.sql import SQLDatabaseChain
# from langchain.utilities import SQLDatabase

In [None]:
# NamedTuple type hint
class ParametersType(NamedTuple):
    """Typed bundle of the notebook's file locations, database handle and API key.

    Path fields are annotated with ``pathlib.Path`` rather than ``PosixPath``:
    ``Path(...)`` instantiates the flavour of the running platform (``WindowsPath``
    on Windows), so ``Path`` is the annotation that is correct everywhere.
    """
    data_dir: Path  # Directory that holds the data files
    acs_path: Path  # Location of the ACS CSV extract
    db_path: Path  # Location of the SQLite3 database file
    db_connection: sqlite3.Connection  # Open SQLite3 database connection
    openai_api_key: str  # OpenAI API key

In [None]:
# Derive every location from a single data directory so that the recorded
# db_path and the connection actually opened can never point at different files.
_data_dir = Path.cwd() / "Data"
_db_path = _data_dir / "data.sqlite3"  # other targets tried: ":memory:", "Data/acs.sqlite3"

Parameters: ParametersType = ParametersType(
    data_dir=_data_dir,
    acs_path=_data_dir / "ACS_2012_21.csv",
    db_path=_db_path,
    # sqlite3.connect creates the file if it does not exist yet.
    db_connection=sqlite3.connect(_db_path),
    # Read from the environment so the key never lives in the notebook;
    # raises KeyError when OPENAI_API_KEY is not set.
    openai_api_key=os.environ["OPENAI_API_KEY"],
)

# List all tables in SQLite database

In [None]:
# Ask SQLite's catalog table (sqlite_master) for the name of every table
# in the connected database; each row comes back as a 1-tuple.
cursor = Parameters.db_connection.cursor()
tables: list[tuple] = cursor.execute(
    "SELECT name FROM sqlite_master WHERE type='table';"
).fetchall()
tables

# ACS Data

In [None]:
# Load the ACS extract and discard the "Unnamed: 0" column
# (presumably a row index written out when the CSV was saved — confirm).
acs_df: pd.DataFrame = pd.read_csv(Parameters.acs_path).drop(columns=["Unnamed: 0"])
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in display() only added a stray "None" to the cell output.
acs_df.info()
display(acs_df.head())

# Test OpenAI API Key

In [None]:
"""
Please provide python code to ask openai a question and retrieve the answer

To ask OpenAI a question and retrieve the answer, you can use the OpenAI API.
Here's an example Python code that demonstrates how to do this using the openai package:

In this example, you need to replace "YOUR_API_KEY" with your actual API key,
which you can obtain by signing up for OpenAI's API at https://beta.openai.com/signup/.

The ask_openai function takes three parameters: the question you want to ask,
the name of the OpenAI model you want to use (e.g., "davinci" or "curie"),
and the max_length of the generated answer (in number of tokens).

The function sends a request to the OpenAI API using the openai.Completion.create method,
which takes the engine, prompt, and max_tokens as parameters. The response is a list of
completions (i.e., possible answers), and we take the first one (which is usually the most likely answer)
and return it as a string.
"""

# Hand the key to the openai module. It comes from Parameters, which read it
# from the OPENAI_API_KEY environment variable, so no literal key is stored here.
openai.api_key = Parameters.openai_api_key

def ask_openai(question, model, max_length):
    """Send one question to the OpenAI completion endpoint and return the reply text.

    Args:
        question: Text of the question; it is wrapped as "Q: <question>\\nA:".
        model: Engine name passed to ``openai.Completion.create`` as ``engine``.
        max_length: Upper bound on generated tokens (``max_tokens``).

    Returns:
        The text of the first returned choice with surrounding whitespace removed.
    """
    response = openai.Completion.create(
        engine=model,
        prompt=f"Q: {question}\nA:",
        max_tokens=max_length,
    )
    first_choice = response.choices[0]
    return first_choice.text.strip()

# Smoke test: one trivial question is enough to confirm the key is accepted.
question, model, max_length = "What is the capital of France?", "davinci", 100
answer = ask_openai(question=question, model=model, max_length=max_length)
print(answer)

# [LangChain](https://pypi.org/project/langchain/)
https://coinsbench.com/chat-with-your-databases-using-langchain-bb7d31ed2e76  
https://medium.com/@hannanmentor/python-custom-chatgpt-with-your-own-data-f307635dd5bd  

## Check that LangChain works

In [None]:
# Sanity check: push a single prompt straight through LangChain's OpenAI wrapper.
# NOTE(review): "text-davinci-003" may no longer be served by OpenAI — confirm
# and switch to a current completion model if the call is rejected.
question = "Which language is used to create chatgpt ?"
llm = OpenAI(model_name="text-davinci-003", openai_api_key=Parameters.openai_api_key)
completion = llm(question)
print(question, completion)

## Prompt template

In [None]:
# Build a two-variable prompt, wrap it with the LLM in an LLMChain,
# then fill in the variables and print the model's reply.
template = "What are the top {n} resources to learn {language} programming?"
prompt = PromptTemplate(input_variables=['n', 'language'], template=template)
chain = LLMChain(prompt=prompt, llm=llm)
input_ = dict(n=3, language='Python')
print(chain.run(input_))

In [None]:
# Inspect which class the prompt object built in the previous cell belongs to.
type(prompt)

In [None]:
# SQLDatabase.create_table_from_df(df=acs_df, table_name="acs", db_connection=Parameters.db_connection)  # Create table from dataframe

In [None]:
# Use LangChain to answer questions using a SQLite3 database
llm = OpenAI(model_name="text-davinci-003", openai_api_key=Parameters.openai_api_key)
dburi = "sqlite:///Data/acs.sqlite3"
db = SQLDatabase.from_uri(dburi)
question = "What is the population of Kentucky?"
# Calling the LLM as llm(question, db) hands the database to the LLM's second
# positional parameter (its stop sequences), so the database is never queried.
# SQLDatabaseChain is the component that writes SQL, runs it on `db`, and
# phrases the answer.
print(question, SQLDatabaseChain.from_llm(llm=llm, db=db).run(question))

In [None]:
# Text-to-SQL prompt: the chain fills {schema} with the table definitions and
# {question} with the user's request, leaving the model to complete the query.
template = (
    "Based on the table schema below, write a SQL query that would answer the user's question:\n"
    "{schema}\n"
    "\n"
    "Question: {question}\n"
    "SQL Query:"
)
prompt = ChatPromptTemplate.from_template(template)

In [None]:
# Confirm the class of the prompt object produced by ChatPromptTemplate.from_template.
type(prompt)

In [None]:
# Print the prompt object's text representation to see how the template was parsed.
print(prompt)

In [None]:
# Alternative target kept for reference: the general-purpose data.sqlite3 file.
# db = SQLDatabase.from_uri("sqlite:///Data/data.sqlite3")
# Connect LangChain to the ACS database.
# NOTE(review): the URI is relative, so this presumably resolves against the
# current working directory — confirm the notebook is started from the project root.
db = SQLDatabase.from_uri("sqlite:///Data/acs.sqlite3")

In [None]:
# Pull the first five rows of the acs table through LangChain's database wrapper
# and show the raw return value as the cell output.
result = db.run("SELECT * FROM acs LIMIT 5")
result

In [None]:
# Same query, but the return value is parsed with ast.literal_eval into Python objects.
# NOTE(review): this assumes db.run returns the rows as the text of a Python
# literal (a list of tuples) — confirm for the installed LangChain version.
result: list[tuple] = ast.literal_eval(db.run("SELECT * FROM acs LIMIT 5"))
print(result)

In [None]:
def get_schema(_):
    """Return the table info reported by the module-level ``db``.

    The single positional argument is accepted and ignored, so the function can
    be plugged in wherever a one-argument callable is expected.
    """
    schema = db.get_table_info()
    return schema

In [None]:
def run_query(query):
    """Execute ``query`` on the module-level ``db`` and return whatever ``db.run`` gives back."""
    outcome = db.run(query)
    return outcome

In [None]:
model = ChatOpenAI()

# Pipeline stages, named individually:
#   1. add a "schema" entry (from get_schema) to the incoming dict,
#   2. render the text-to-SQL prompt,
#   3. call the chat model, stopping before it starts inventing a "SQLResult:" section,
#   4. reduce the model message to a plain string.
with_schema = RunnablePassthrough.assign(schema=get_schema)
sql_model = model.bind(stop=["\nSQLResult:"])
sql_response = with_schema | prompt | sql_model | StrOutputParser()

In [None]:
# Run the pipeline end to end; the cell output is the string the model returned,
# which should be the SQL it proposes for this question.
sql_response.invoke({"question": "How many geographic areas are in the acs table?"})

In [None]:
# get_schema ignores its argument, so pass an explicit None instead of IPython's
# `_` (last cell output), which is undefined outside IPython and changes with
# whatever cell happened to run last.
print(get_schema(None))

In [None]:
# Inspect the class of the object produced by piping the stages together.
type(sql_response)

In [None]:
# Display the composed pipeline's repr to see its individual stages.
sql_response

In [None]:
# Set the key on the openai module again so this cell works even when the
# earlier API-key test cell was not run.
openai.api_key = Parameters.openai_api_key

def count_tokens(prompt):
    """Count the tokens in ``prompt`` as tokenized by the "davinci" engine.

    The request generates nothing (``max_tokens=0``) and asks the API to echo
    the prompt back with log-probability data (``echo=True``, ``logprobs=0``),
    which carries the prompt split into tokens.

    Args:
        prompt: Text to measure.

    Returns:
        The number of entries in the echoed token list. If the response carries
        no token list, falls back to the number of whitespace-separated words
        in the echoed text (an approximation only).
    """
    response = openai.Completion.create(
        engine="davinci",
        prompt=prompt,
        max_tokens=0,
        n=1,
        stop=None,
        temperature=0.0,
        logprobs=0,
        echo=True,
    )
    choice = response.choices[0]
    # Splitting the echoed text on whitespace counts words, not tokens; the
    # token list requested via logprobs/echo is the actual tokenization.
    logprobs = getattr(choice, "logprobs", None)
    tokens = getattr(logprobs, "tokens", None)
    if tokens is not None:
        return len(tokens)
    return len(choice.text.split())

# Use a dedicated name: `prompt` holds the ChatPromptTemplate that the SQL
# pipeline was built from, and overwriting it with a string breaks any re-run
# of the cell that builds sql_response.
sample_text = "This is a prompt to count tokens."
print(count_tokens(sample_text))  # exact number depends on how count_tokens measures the text

In [None]:
# Measure how much of the context window the table schema consumes.
# get_schema ignores its argument, so pass None rather than IPython's `_`,
# and keep the text under its own name so `prompt` (the ChatPromptTemplate)
# is not overwritten.
schema_text = get_schema(None)
print(count_tokens(schema_text))

In [None]:

# Point LangChain at the ACS SQLite database.
dburi = "sqlite:///Data/acs.sqlite3"
db = SQLDatabase.from_uri(dburi)

# Chat model with library defaults. Completion-style alternative kept for reference:
# OpenAI(temperature=0, openai_api_key=Parameters.openai_api_key)
llm = ChatOpenAI()

# Instruction wrapper placed around every user question before it reaches the chain.
QUERY = """
Given an input question: first create a syntactically correct SQLite query to run, then look at the results of the query and return the answer.

Use the following format:
Question: Question here
SQLQuery: SQL Query to run
SQLResult: Result of the SQLQuery
Answer: Final answer here

{question}
"""

# Chain that turns a question into SQL, runs it on `db`, and reports the answer;
# verbose=True is passed so the chain reports its intermediate steps.
db_chain = SQLDatabaseChain.from_llm(verbose=True, db=db, llm=llm)


def get_prompt():
    """Run an interactive question loop against the SQL database chain.

    Reads prompts from standard input until the user types 'exit'
    (case-insensitive, surrounding whitespace ignored) or input ends
    (EOF or keyboard interrupt). Each non-empty prompt is inserted into the
    module-level ``QUERY`` template and passed to ``db_chain.run``; the answer
    is printed. Errors raised while answering are printed and the loop continues.

    Returns:
        None.
    """
    print("Type 'exit' to quit")

    while True:
        try:
            prompt = input("Enter a prompt: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted input box: leave the loop cleanly
            # instead of surfacing a traceback.
            print('Exiting...')
            break

        if prompt.lower() == 'exit':
            print('Exiting...')
            break
        if not prompt:
            # Nothing was asked; don't spend an API call on an empty question.
            continue

        try:
            question = QUERY.format(question=prompt)
            print(db_chain.run(question))
        except Exception as e:
            # Best-effort REPL: report the failure and keep the session alive.
            print(e)

# Start the interactive session. This cell waits for keyboard input and keeps
# looping until 'exit' is entered, so it will pause a "Run All".
get_prompt()

In [None]:
# Rebuild the text-to-SQL prompt: earlier cells rebound `prompt` to plain
# strings, so the ChatPromptTemplate has to be created again before the
# pipeline below is reassembled.
template = (
    "Based on the table schema below, write a SQL query that would answer the user's question:\n"
    "{schema}\n"
    "\n"
    "Question: {question}\n"
    "SQL Query:"
)
prompt = ChatPromptTemplate.from_template(template)

In [None]:
model = ChatOpenAI()

# Reassemble the schema -> prompt -> model -> string pipeline, one named stage at a time.
with_schema = RunnablePassthrough.assign(schema=get_schema)
# Stop sequence keeps the model from continuing into a made-up "SQLResult:" section.
sql_model = model.bind(stop=["\nSQLResult:"])
sql_response = with_schema | prompt | sql_model | StrOutputParser()

In [None]:
# Check the rebuilt pipeline with the same sample question used earlier;
# the cell output is the string returned by the model.
sql_response.invoke({"question": "How many geographic areas are in the acs table?"})

In [None]:
# Point LangChain at the ACS SQLite database.
dburi = "sqlite:///Data/acs.sqlite3"
db = SQLDatabase.from_uri(dburi)

# Chat model with library defaults. Completion-style alternative kept for reference:
# OpenAI(temperature=0, openai_api_key=Parameters.openai_api_key)
llm = ChatOpenAI()

# Instruction wrapper that can be placed around a user question for the chain.
QUERY = """
Given an input question: first create a syntactically correct SQLite query to run, then look at the results of the query and return the answer.

Use the following format:
Question: Question here
SQLQuery: SQL Query to run
SQLResult: Result of the SQLQuery
Answer: Final answer here

{question}
"""

# Chain that turns a question into SQL, runs it on `db`, and reports the answer;
# verbose=True is passed so the chain reports its intermediate steps.
db_chain = SQLDatabaseChain.from_llm(verbose=True, db=db, llm=llm)

print("Type 'exit' to quit")

# Keep the typed text under its own name: `prompt` still holds the
# ChatPromptTemplate that sql_response was built from.
user_prompt = input("Enter a prompt: ").strip()

if user_prompt.lower() == 'exit':
    print('Exiting...')
else:
    try:
        # Show the SQL proposed by the schema-aware pipeline instead of
        # discarding the result of the call.
        print(sql_response.invoke({"question": user_prompt}))
        # db_chain.run needs the question text; previously the sql_response
        # pipeline object itself was passed in place of a question.
        question = QUERY.format(question=user_prompt)
        print(db_chain.run(question))
    except Exception as e:
        # Best-effort: report the failure rather than aborting the notebook run.
        print(e)

In [None]:
# Example question to try: How many nine-year-olds were in Kentucky in 2020?

In [None]:
# Dump every field of the prompt configuration that the database chain uses
# internally: field name, then its value, then a blank separator line.
prompt_config = db_chain.dict()["llm_chain"]["prompt"]
for field_name, field_value in prompt_config.items():
    print(field_name, field_value, "", sep="\n")