In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only Kaggle input directory and print the full path of every file in it.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        full_path = os.path.join(root_dir, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
!pip install langchain[all]

Collecting langchain[all]
  Downloading langchain-0.0.222-py3-none-any.whl (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
Collecting langchainplus-sdk>=0.0.17 (from langchain[all])
  Downloading langchainplus_sdk-0.0.20-py3-none-any.whl (25 kB)
Collecting openapi-schema-pydantic<2.0,>=1.2 (from langchain[all])
  Downloading openapi_schema_pydantic-1.2.4-py3-none-any.whl (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.0/90.0 kB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
Collecting O365<3.0.0,>=2.0.26 (from langchain[all])
  Downloading O365-2.0.27-py3-none-any.whl (164 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m164.2/164.2 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting aleph-alpha-client<3.0.0,>=2.15.0 (from langchain[all])
  Downloading aleph_alpha_client-2.17.0-py3-none-any.whl (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━

In [3]:
!pip install clickhouse-connect

[0m

In [7]:
import clickhouse_connect

In [102]:
class ClickhouseConnector:
    """
    Keeps the connection settings for a ClickHouse server and
    opens a client with them on request.
    """

    def __init__(self, host: str, username: str, password: str, port: int = 8443) -> None:
        """
        Remember the connection settings; no connection is opened here.

        host: server host name
        username: user to connect as
        password: password of that user
        port: server port (8443 unless overridden)
        """
        self.__host, self.__port = host, port
        self.__username, self.__password = username, password

    def create_connection(self):
        """
        Open a new ClickHouse client with the stored settings and return it.
        """
        connection_settings = {
            'host': self.__host,
            'port': self.__port,
            'username': self.__username,
            'password': self.__password,
        }
        return clickhouse_connect.get_client(**connection_settings)

In [106]:
class PromptSchemaTransformer:
    """
    Reads the table/column layout of a ClickHouse database and renders it
    as plain text that can be embedded into an LLM prompt.
    """

    def __init__(self, client) -> None:
        """
        client: object exposing `query_df(sql)` that returns a pandas DataFrame.
        """
        self.__client = client

    def __get_database_schema(self) -> pd.DataFrame:
        """
        Get all tables, their columns and column data types in the database.

        Returns a DataFrame with the columns TABLE_NAME, column_name and data_type.
        """

        sql_query_db_schema = """
        SELECT DISTINCT 
            TABLE_NAME, 
            column_name, 
            data_type
        FROM 
            INFORMATION_SCHEMA.COLUMNS
        WHERE 
            table_catalog = 'default'
            AND TABLE_NAME != 'statistics'
        """

        return self.__client.query_df(sql_query_db_schema)

    def __create_db_schema_for_prompt(self, df_db_schema: pd.DataFrame) -> str:
        """
        Create DB schema description in text format for prompt engineering.
        The DB schema description is created in the following format:
        Table: {TABLE_NAME}
        Columns and data types:
        {Column_Name1}: {Data_Type1}
        {Column_Name2}: {Data_Type2}

        Tables appear in the order they first occur in `df_db_schema`, and every
        table section ends with a newline. An empty frame yields an empty string.
        """

        if df_db_schema.empty:
            return ''

        # Keep each column name together with its own data type. De-duplicating names and
        # types independently (and zipping the results) mis-pairs them and silently drops
        # columns as soon as two columns of one table share a data type.
        column_rows = df_db_schema[['TABLE_NAME', 'column_name', 'data_type']].drop_duplicates()

        table_sections = []
        # sort=False keeps the tables in order of first appearance.
        for table, table_rows in column_rows.groupby('TABLE_NAME', sort=False):
            section_lines = [f'Table: {table}', 'Columns and data types:']
            section_lines.extend(
                f'{column}: {data_type}'
                for column, data_type in zip(table_rows['column_name'], table_rows['data_type'])
            )
            section_lines.append('')  # produces the trailing newline of the section
            table_sections.append('\n'.join(section_lines))

        return ''.join(table_sections)

    def transform_db_schema_for_prompt(self) -> str:
        """
        Transform the Clickhouse DB schema to a view
        suitable for the prompt.
        """

        df_db_schema = self.__get_database_schema()
        return self.__create_db_schema_for_prompt(df_db_schema)

In [107]:
class Text2Sql:
    """
    Turns a natural-language question plus a textual DB schema description
    into a SQL query by prompting the LLM passed to the constructor.
    """

    def __init__(self, model) -> None:
        """
        model: initialized LLM object that is handed to `LLMChain` as `llm`.
        """
        self.__llm = model

    def __create_prompt_template(self) -> "PromptTemplate":
        """
        Create a prompt template
        according to which we will conduct the prompt engineering.

        The template has two input variables: `question` and `db_schema`.
        """

        # Imported here so that defining the class does not depend on an import cell
        # having been executed earlier in the kernel session.
        from langchain.prompts import PromptTemplate

        template = """Create a SQL query based on the following question:  {question}

        Database schema:
        {db_schema}

        Answer: Provide only the query as a multiline string as the output."""

        return PromptTemplate(template=template, input_variables=["question", "db_schema"])

    def __create_llm_chain(self, prompt_template: "PromptTemplate", llm) -> "LLMChain":
        """
        Create LLM chain taking pre-created prompt template and initialized LLM as inputs.
        """

        # Local import for the same reason as in __create_prompt_template.
        from langchain.chains import LLMChain

        return LLMChain(prompt=prompt_template, llm=llm)

    def __create_sql_by_question(self, llm_chain: "LLMChain", question: str, db_schema: str) -> str:
        """
        Internal method creating a SQL by natural-language question and DB schema definition.

        Returns whatever `llm_chain.run` returns for the filled-in prompt.
        """

        return llm_chain.run({'question': question, 'db_schema': db_schema})

    def get_sql_by_question_and_schema(self, question: str, db_schema: str) -> str:
        """
        This function should be called externally to create an SQL query
        based on natural-language question and pre-defined DB schema.

        question: the question in natural language
        db_schema: textual description of the database schema to put into the prompt
        """

        prompt_template = self.__create_prompt_template()
        llm_chain = self.__create_llm_chain(prompt_template, self.__llm)
        return self.__create_sql_by_question(llm_chain, question=question, db_schema=db_schema)

In [109]:
# Secrets must never be stored in the notebook itself: they are read from environment
# variables and, when a variable is missing, requested interactively without echoing.
# NOTE(review): earlier revisions of this cell contained a literal OpenAI API key and a
# ClickHouse password; both should be treated as leaked and rotated.
from getpass import getpass

from langchain.llms import OpenAI

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")

# The OpenAI wrapper picks the key up from the environment.
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

host = os.environ.get("CLICKHOUSE_HOST") or input("ClickHouse host: ")
username = os.environ.get("CLICKHOUSE_USERNAME", "default")
password = os.environ.get("CLICKHOUSE_PASSWORD") or getpass("ClickHouse password: ")
port = int(os.environ.get("CLICKHOUSE_PORT", "8443"))

# Pass the port explicitly so that changing it above actually takes effect.
connector = ClickhouseConnector(host, username, password, port)
client = connector.create_connection()

# Render the database schema as text for the prompt.
pst = PromptSchemaTransformer(client)
db_schema = pst.transform_db_schema_for_prompt()

params = {'temperature': 0}
model = OpenAI(**params)
text2sql = Text2Sql(model)

question = 'When did we get the highest number of users per day in Q1 2023?'

sql = text2sql.get_sql_by_question_and_schema(question, db_schema)

In [None]:
# `sql` is text produced by the LLM, so treat it as untrusted input: only read-only
# statements are sent to the database, anything else is rejected before execution.
sql_to_run = sql.strip()
if not sql_to_run.lower().startswith(('select', 'with')):
    raise ValueError(f'Refusing to execute a non-SELECT statement generated by the LLM: {sql_to_run!r}')

df = client.query_df(sql_to_run)