In [1]:
import json
import openai
import requests
from tenacity import retry, wait_random_exponential, stop_after_attempt
from termcolor import colored

import constants
import os
import sqlite3

from db import DBA

# conn = sqlite3.connect("data/chinook.db")

print("Opened database successfully")

os.environ["OPENAI_API_KEY"] = constants.APIKEY
GPT_MODEL = "gpt-3.5-turbo-0613"

Opened database successfully


@retry(wait=wait_random_exponential(multiplier=1, max=40), stop=stop_after_attempt(3))
def chat_completion_request(messages, functions=None, function_call=None, model=GPT_MODEL):
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + os.environ["OPENAI_API_KEY"],
    }
    json_data = {"model": model, "messages": messages}
    if functions is not None:
        json_data.update({"functions": functions})
    if function_call is not None:
        json_data.update({"function_call": function_call})
    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=json_data,
        )
        return response
    except Exception as e:
        print("Unable to generate ChatCompletion response")
        print(f"Exception: {e}")
        return e
    
def pretty_print_conversation(messages):
    role_to_color = {
        "system": "red",
        "user": "green",
        "assistant": "blue",
        "function": "magenta",
    }
    
    for message in messages:
        if message["role"] == "system":
            print(colored(f"system: {message['content']}\n", role_to_color[message["role"]]))
        elif message["role"] == "user":
            print(colored(f"user: {message['content']}\n", role_to_color[message["role"]]))
        elif message["role"] == "assistant" and message.get("function_call"):
            print(colored(f"assistant: {message['function_call']}\n", role_to_color[message["role"]]))
        elif message["role"] == "assistant" and not message.get("function_call"):
            print(colored(f"assistant: {message['content']}\n", role_to_color[message["role"]]))
        elif message["role"] == "function":
            print(colored(f"function ({message['name']}): {message['content']}\n", role_to_color[message["role"]]))

In [2]:
def get_table_names(conn):
    """Return a list of table names."""
    table_names = []
    ret, tables = conn.execute(query="SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';")

    if not ret:
        raise Exception("Unable to get table names.")

    for table in tables:
        table_names.append(table['table_name'])
    return table_names


def get_column_names(conn, table_name):
    """Return a list of column names."""
    column_names = []
    ret, columns = conn.execute(query=f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}';")

    if not ret:
        return []
        # raise Exception(f"Unable to get column names for table {table_name}.")

    for col in columns:
        column_names.append(col['column_name'])
    return column_names

def get_table_comments(conn, table_name):
    """Retrieve table comments"""
    ret, comments = conn.execute(query=f"SELECT obj_description('{table_name}'::regclass);")

    if not ret:
        return ""
        # raise Exception(f"Unable to get column names for table {table_name}.")

    return comments[0]['obj_description']

def get_column_comments(conn, table_name):
    """Retrieve column comments"""
    ret, comments = conn.execute(query=f"""
                                 SELECT column_name, col_description('{table_name}'::regclass, ordinal_position) as column_comment
                                 FROM information_schema.columns
                                 WHERE table_name = '{table_name}'
                                 """)

    if not ret:
        return ""
        # raise Exception(f"Unable to get column names for table {table_name}.")

    return comments

def get_database_info(_):
    with DBA() as conn:
        """Return a list of dicts containing the table name and columns for each table in the database."""
        table_dicts = []
        for table_name in get_table_names(conn):
            columns_names = get_column_comments(conn, table_name)
            table_description = get_table_comments(conn, table_name)

            table_dicts.append({"table_name": table_name, "table_description": table_description, "columns": columns_names})
        return table_dicts
    
def database_run(query):
    with DBA() as conn:
        ret, results = conn.execute(query=query)
        if not ret:
            return "Error: " + results
        return results

In [3]:
with DBA() as conn:
    print(get_table_comments(conn, 'cdr'))
    print(get_column_comments(conn, 'cdr'))

Tabla que almacena información de transacciones de llamadas
[{'column_name': 'cdr_msisdn', 'column_comment': 'Número de teléfono del usuario'}, {'column_name': 'cdr_imsi', 'column_comment': 'Número de identificación internacional de abonado móvil'}, {'column_name': 'cdr_imei', 'column_comment': 'Número de identificación internacional de equipo móvil'}, {'column_name': 'cdr_start_time', 'column_comment': 'Fecha y hora de inicio de la transacción'}, {'column_name': 'cdr_end_time', 'column_comment': 'Fecha y hora de finalización de la transacción'}, {'column_name': 'cdr_interface', 'column_comment': 'Interfaz utilizada para la transacción'}, {'column_name': 'cdr_transaction_type', 'column_comment': 'Tipo de transacción realizada'}, {'column_name': 'cdr_transaction_status', 'column_comment': 'Estado de la transacción'}]


In [10]:
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough

template = """Basado en el siguiente esquema de base de datos PostrgeSQL, escriba una consulta PostrgeSQL para responder a la siguiente pregunta:
{schema}
Pregunta: {question}
Consulta SQL:"""
prompt_query = ChatPromptTemplate.from_template(template)
model = ChatOpenAI()
output_parser = StrOutputParser()

runnable = RunnablePassthrough().assign(schema=get_database_info)
sql_query = (runnable | prompt_query | model | output_parser)

# question = """
#     Genera atributo descriptivo tipo comentario para la tabla cdr y también sus columnas. Corresponde a un problema de saturación de la red, las columnas son: 
#     MSISDN (numero teléfono), IMSI, IMEI, comienzo transacción, término transacción, interfaz, tipo de transacción, estado transacción. La base de datos es PostgreSQL.
#     """

question = """
    Calcula la diferencia de tiempo entre el inicio y término de cada transacción, para el problema de transacciones de llamadas
    """
sql_query.invoke({"question": question})

'SELECT cdr_start_time, cdr_end_time, cdr_end_time - cdr_start_time AS tiempo_transaccion\nFROM cdr'

In [12]:
template = """Basado en el siguiente esquema de base de datos PostrgeSQL, escriba una consulta PostrgeSQL para responder a la siguiente pregunta:
{schema}

Pregunta: {question}
Consulta SQL: {query}
Respuesta: {response}"""
prompt_response = ChatPromptTemplate.from_template(template)

full_chain = (
    RunnablePassthrough.assign(query=sql_query)
    | RunnablePassthrough.assign(
        schema=get_database_info,
        response=lambda x: database_run(x["query"]),
    )
    | prompt_response
    | model
    | output_parser
)

question = """
    Calcula la diferencia de tiempo entre el inicio y término de cada transacción, para el problema de transacciones de llamadas
    """
full_chain.invoke({"question": question})

'La consulta SQL que has escrito presenta un error debido a que estás intentando restar dos valores de tipo character varying (cdr_end_time y cdr_start_time), lo cual no está permitido en PostgreSQL.\n\nPara calcular la diferencia de tiempo entre el inicio y término de cada transacción, debes convertir los valores de cdr_end_time y cdr_start_time a un tipo de dato compatible para realizar la resta. Suponiendo que estos campos son de tipo timestamp, puedes utilizar la función de PostgreSQL llamada extract para obtener la diferencia en segundos entre ambos tiempos. Aquí te dejo la consulta corregida:\n\nSELECT cdr_msisdn, cdr_start_time, cdr_end_time, EXTRACT(EPOCH FROM cdr_end_time - cdr_start_time) AS transaction_duration\nFROM cdr'

In [19]:
full_chain