In [1]:
!pip install -q boto3
!pip install -q requests
!pip install -q requests-aws4auth
!pip install -q opensearch-py
!pip install -q tqdm

In [2]:
!pip install "faiss-cpu" --quiet
!pip install langchain --quiet
!pip install jq --quiet

In [3]:
import os
import sys
from langchain.document_loaders.json_loader import JSONLoader
from langchain.docstore.document import Document
import json
import re
from langchain.vectorstores import FAISS
from langchain.embeddings import BedrockEmbeddings
from functools import reduce
from langchain.prompts import PromptTemplate
from sqlalchemy import MetaData
from sqlalchemy import create_engine


import re
import pandas as pd
import numpy as np
import json
import sqlite3

data_path = './data'
with open(f'{data_path}/tables.json', 'rb') as ofp:
    meta = json.load(ofp)
data = meta[0]

data = [i for i in meta if i['db_id'] == 'department_store']

data  = data[0]    
columns = data["column_names_original"]
col_df = pd.DataFrame(columns).iloc[1:]
col_df.rename(columns={0: 'table_idx', 1: 'col_name'}, inplace=True)
col_df

types_df = pd.DataFrame(data["column_types"]).iloc[1:]
types_df.rename(columns={0: 'type'}, inplace=True)
types_df

merged_col = pd.concat([col_df, types_df], axis=1)

tables_df = pd.DataFrame(data["table_names_original"])
tables_df.reset_index(inplace=True)
tables_df.columns = ['table_idx', 'table_name']

meta = pd.merge(tables_df, merged_col, on=['table_idx'])
meta = meta.drop(columns=['table_idx'])

In [48]:
DB_NAME = "text2sql"
DB_FAISS_PATH = './vectorstore/db_faiss'

In [7]:
import boto3
from botocore.config import Config

bedrock_region = athena_region = boto3.session.Session().region_name
retry_config = Config(retries = {'max_attempts': 100})
session = boto3.Session(region_name=bedrock_region)
bedrock = session.client('bedrock-runtime', region_name=bedrock_region, config=retry_config)

In [68]:

def ask_llm(question):

    body = json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1024,
                "temperature" : 0.1,
                "top_p": 0.5,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": question},
                        ],
                    }
                ],
            }) 

    response = bedrock.invoke_model(
        body=body, 
        modelId=bedrock_model_id,
        accept='application/json',
        contentType='application/json') #payload를 Bedrock으로 전송

    response_body = json.loads(response.get("body").read())
    llm_output = response_body.get("content")[0].get("text")
    return llm_output


In [69]:
# Create new docs with the right metadata we need for indexing
def create_docs_with_correct_metadata(documents):
    # We are going to return a list of new documents
    new_docs = []

    # For each document
    for doc in documents:
        # Get it's metadata and contents
        metadata = doc.metadata
        contents = json.loads(doc.page_content)

        # Now calculate the new metadata that we want to add
        new_metadata = {
            "tableName": contents["tableName"],
            "question": contents["question"],
            "tableSchema": contents["tableSchema"],
        }

        # Print out the new metadata for our documents
        # print(new_metadata)

        new_docs.append(
            Document(page_content=new_metadata["question"], metadata=new_metadata)
        )

    return new_docs

def load_json_file(filename):
    loader = JSONLoader(file_path=filename, jq_schema=".[]", text_content=False)

    # This is our internal Langchain document data structure
    docs = loader.load()
    return docs


In [94]:
# This function asks the LLM to inspect a table schema, generate some questions which could be answered
# by that schema, and then it stores those questions to file, loads them all into a single vectorDB
def add_new_table(schema, table_name, is_incremental, bedrock_embeddings):
    """
    :schema         :   
    :table_name     :
    :model_id       :
    :is_incremental :
    """
    print(f"Adding table {table_name} with schema {schema}")
    
    question = f"""
    \n\nHuman: 
    only return the a bulleted numbered list of unique and detailed questions that could be answered by this table called {table_name} with schema:
    {schema}.
    Instructions:
        Use natural language descriptions only.
        Do not use SQL.
        Produced a varied list of questions, but the questions should be unique and detailed.
        The questions should be in a format that is easy to understand and answer.
        Ask about as much of the information in the table as possible.
        You can ask about more than one aspect of the data at a time.
        Qustions should begin with, 'What', 'Which', 'How', 'When' or 'Can'. Use variable names. 
        The questions should use relevant buisness vocabularly and terminology only. 
        Do not use column names in your output - use relevant natural language descriptions only. 
        Do not output any numeric values.
        Output questions starting with bulleted numbered list. 
         
        \n Questions: 1.
        \n Assistant:
        """
       
    answer = ask_llm(question)
    os.makedirs('./data/rag', exist_ok=True)
    question_list_filename = f"./data/rag/questionList{table_name}.json"

    # # Get rid of anything before the 1.
    # if re.match(r"^[^\d+]\. ", answer) and re.search(r"\d+\. ", answer):
    #     answer = "1. " + answer.split("1. ")[1]
    # else:
    #     answer = "1. " + answer

    print(
        f"Writing questions to {question_list_filename}, with schema {schema}, with table name {table_name} and answer {answer}.\n\n"
    )

    write_questions_to_file(question_list_filename, table_name, schema, answer)

    docs = load_json_file(question_list_filename)
    docs = create_docs_with_correct_metadata(docs)
    new_questions = FAISS.from_documents(docs, bedrock_embeddings)
    db_exists = True if os.path.exists(f"{DB_FAISS_PATH}/index.faiss") else False
    # Add new tables
    if is_incremental and db_exists:
            question_db = FAISS.load_local(DB_FAISS_PATH, bedrock_embeddings, allow_dangerous_deserialization=True)
            question_db.merge_from(new_questions)
            question_db.save_local(DB_FAISS_PATH)

    # Load for the first time
    else:
        print(f"is_incremental set to {str(is_incremental)} and/or no vector db found. Creating...")
        new_questions.save_local(DB_FAISS_PATH)
        


In [95]:
def write_questions_to_file(question_list_filename, table_name, table_schema, answer):
    data_list = []
    question_list_obj = answer
    questions_list = question_list_obj.splitlines()
    # Open the file in write mode
    with open(question_list_filename, mode="w", newline="") as file:
        for question in questions_list:

            # Skip if it doesn't really have a question
            if "?" not in question:
                continue

            questionSplit = re.split(r"\d{1,5}.||. ||- ", question, maxsplit=1)
            question = questionSplit[1]
            data = {
                "tableName": table_name,
                "question": question,
                "tableSchema": table_schema.lstrip(" "),
            }
            data_list.append(data)

        json.dump(data_list, file)

In [96]:

from langchain.vectorstores import FAISS
from langchain.embeddings import BedrockEmbeddings
from functools import reduce
bedrock_embeddings = BedrockEmbeddings(client=bedrock)


In [97]:
new_meta = meta.groupby(['table_name'])['col_name'].apply(list).reset_index()
new_meta = new_meta.set_index('table_name')

tpc_ds = []
for idx, row in new_meta.iterrows():
    v = (('|').join(row.values[0]), idx)
    print(v)
    tpc_ds.append(v)

('address_id|address_details', 'Addresses')
('customer_id|address_id|date_from|date_to', 'Customer_Addresses')
('order_id|customer_id|order_status_code|order_date', 'Customer_Orders')
('customer_id|payment_method_code|customer_code|customer_name|customer_address|customer_phone|customer_email', 'Customers')
('dept_store_chain_id|dept_store_chain_name', 'Department_Store_Chain')
('dept_store_id|dept_store_chain_id|store_name|store_address|store_phone|store_email', 'Department_Stores')
('department_id|dept_store_id|department_name', 'Departments')
('order_item_id|order_id|product_id', 'Order_Items')
('product_id|supplier_id|date_supplied_from|date_supplied_to|total_amount_purchased|total_value_purchased', 'Product_Suppliers')
('product_id|product_type_code|product_name|product_price', 'Products')
('staff_id|staff_gender|staff_name', 'Staff')
('staff_id|department_id|date_assigned_from|job_title_code|date_assigned_to', 'Staff_Department_Assignments')
('supplier_id|address_id|date_from|date

In [98]:
for x in tpc_ds:
    print(x)
    add_new_table(
        schema=x[0], 
        table_name=x[1],
        is_incremental=True, 
        bedrock_embeddings=bedrock_embeddings
    )
    print('-------------------')


('address_id|address_details', 'Addresses')
Adding table Addresses with schema address_id|address_details
Writing questions to ./data/rag/questionListAddresses.json, with schema address_id|address_details, with table name Addresses and answer • 1. What is the total number of unique addresses in the system?

• 2. Which addresses have specific details or characteristics (e.g., residential, commercial, located in a particular city or region)?

• 3. How many addresses are associated with a particular customer or entity?

• 4. Can you provide a list of addresses that meet certain criteria (e.g., within a specific zip code range, containing a specific keyword in the address details)?

• 5. What is the distribution of addresses across different geographic regions or areas?

• 6. Which addresses have been recently added or updated in the system?

• 7. How can I identify duplicate or potentially redundant address entries?

• 8. Can you provide a summary of address details for a specific subset 

### Search

In [83]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

def get_cfn_outputs(stackname, cfn):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)["Stacks"][0]["Outputs"]:
        outputs[output["OutputKey"]] = output["OutputValue"]
    return outputs

region_name = "us-west-2"

cfn = boto3.client("cloudformation", region_name)
kms = boto3.client("secretsmanager", region_name)

stackname = "opensearch-workshop"
cfn_outputs = get_cfn_outputs(stackname, cfn)

aos_credentials = json.loads(
    kms.get_secret_value(SecretId=cfn_outputs["OpenSearchSecret"])["SecretString"]
)

aos_host = cfn_outputs["OpenSearchDomainEndpoint"]
aos_host

auth = (aos_credentials["username"], aos_credentials["password"])

aos_client = OpenSearch(
    hosts=[{"host": aos_host, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

In [84]:
pipeline = {
    "description": "Text to Sql Task - OpenSearch-cohere-060124084807",
    "processors": [
        {
            "text_embedding": {
                "model_id": model_id,
                "field_map": {
                    "text": "vector_field",
                },
            }
        }
    ],
}

pipeline_id = "text2sql_meta_data"
# aos_client.ingest.delete_pipeline(id=pipeline_id)
aos_client.ingest.put_pipeline(id=pipeline_id, body=pipeline)

{'acknowledged': True}

In [85]:
import requests

search_model = {"query": {"match": {"name": "OpenSearch-Cohere"}}, "size": 10}

response = requests.get(
    "https://" + aos_host + "/_plugins/_ml/models/_search", auth=auth, json=search_model
)
model_info = json.loads(response.text)
model_id = model_info["hits"]["hits"][0]["_id"]
model_id

'DTDeTJAB3Hj2edbFglKU'

In [86]:
pipeline = {
    "description": "Text to Sql Task - OpenSearch-cohere-060124084807",
    "processors": [
        {
            "text_embedding": {
                "model_id": model_id,
                "field_map": {
                    "text": "vector_field",
                },
            }
        }
    ],
}

pipeline_id = "text2sql_meta_data"
# aos_client.ingest.delete_pipeline(id=pipeline_id)
aos_client.ingest.put_pipeline(id=pipeline_id, body=pipeline)

{'acknowledged': True}

In [117]:
index_name = "rag_semantic_ver4"

# aos_client.indices.delete(index=index_name)

rag_semantic = {
    "settings": {
        "max_result_window": 15000,
        "analysis": {"analyzer": {"analysis-nori": {"type": "nori", "stopwords": "_korean_"}}},
        "index.knn": True,
        "default_pipeline": pipeline_id,
        "index.knn.space_type": "l2",
    },
    "mappings": {
        "properties": {
            "tableName": {
                "type": "text",
            },
            "question": {
                "type": "text",
            },
            "tableSchema": {
                "type": "text",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
            },
            "vector_field": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {"name": "hnsw", "space_type": "l2", "engine": "faiss"},
                "store": True,
            },

        }
    },
}
aos_client.indices.create(index=index_name, body=rag_semantic)

{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'rag_semantic_ver4'}

In [118]:
from tqdm import tqdm
from opensearchpy import helpers

def _generate_data():
    for doc in docs:
        yield {"_index": index_name, "_source": doc}

succeeded = []
failed = []

json_files = os.listdir('./data/rag')
for p in json_files:
    with open(f"./data/rag/{p}", 'rb') as ofp:
        docs = json.load(ofp)

    for success, item in helpers.parallel_bulk(
        aos_client, actions=_generate_data(), chunk_size=10, thread_count=1, queue_size=1
    ):
        if success:
            succeeded.append(item)
        else:
            failed.append(item)

In [119]:
# Refresh the index to make the changes visible
aos_client.indices.refresh(index=index_name)

count = aos_client.count(index=index_name)
print(count)

{'count': 140, '_shards': {'total': 5, 'successful': 5, 'skipped': 0, 'failed': 0}}


In [120]:
def keyword_search(query_text):
    query = {
        "size": 10,
        "_source": {"excludes": ["text", "vector_field"]},
        "query": {
            "multi_match": {
                "query": query_text,
                "fields": ["tableName", "question", "tableSchema"],
            }
        },
    }

    res = aos_client.search(index=index_name, body=query)

    query_result = []
    for hit in res["hits"]["hits"]:
        row = [
            hit["_score"],
            hit["_source"]["tableName"],
            hit["_source"]["question"],
            hit["_source"]["tableSchema"],            
        ]
        query_result.append(row)

    query_result_df = pd.DataFrame(
        data=query_result, columns=["_score", "tableName", "question", "tableSchema"]
    )
    display(query_result_df)

In [121]:
def semantic_search(query_text):
    query = {
        "size": 10,
        "_source": {"excludes": ["text", "vector_field"]},
        "query": {
            "neural": {"vector_field": {"query_text": query_text, "model_id": model_id, "k": 10}},
        },
    }

    res = aos_client.search(index=index_name, body=query)

    query_result = []
    for hit in res["hits"]["hits"]:
        row = [
            hit["_score"],
            hit["_source"]["tableName"],
            hit["_source"]["question"],
            hit["_source"]["tableSchema"],            
        ]
        query_result.append(row)

    query_result_df = pd.DataFrame(
        data=query_result, columns=["_score", "tableName", "question", "tableSchema"]
    )
    display(query_result_df)

In [122]:
query_text =  "customer email"
keyword_search(query_text)

Unnamed: 0,_score,tableName,question,tableSchema
0,4.621939,Customers,What is the relationship between customer cod...,customer_id|payment_method_code|customer_code|...
1,2.99143,Department_Stores,Can customers reach out to a department store...,dept_store_id|dept_store_chain_id|store_name|s...
2,2.698285,Customers,What are the different types of customer code...,customer_id|payment_method_code|customer_code|...
3,2.661865,Customers,Which customers have provided both phone numb...,customer_id|payment_method_code|customer_code|...
4,2.353212,Customers,How can customers be categorized based on the...,customer_id|payment_method_code|customer_code|...
5,2.215492,Customer_Orders,What is the total number of orders placed by ...,order_id|customer_id|order_status_code|order_date
6,2.151422,Customer_Addresses,What is the distribution of address changes a...,customer_id|address_id|date_from|date_to
7,2.143315,Addresses,• 3. How many addresses are associated with a ...,address_id|address_details
8,2.143315,Department_Stores,How can a customer contact a specific departm...,dept_store_id|dept_store_chain_id|store_name|s...
9,1.896213,Customer_Orders,Can you determine the average time between or...,order_id|customer_id|order_status_code|order_date


In [123]:
semantic_search(query_text)
# print(index_name)

Unnamed: 0,_score,tableName,question,tableSchema


In [50]:
question_db = FAISS.load_local(DB_FAISS_PATH, bedrock_embeddings, allow_dangerous_deserialization=True)


In [58]:
query = "고객 이메일 주소가 적제된 테이블을 알려줘"


In [59]:
schema =  {}

results_with_scores = question_db.similarity_search_with_score(query)
for doc, score in results_with_scores:
    print(doc.metadata['question'])
    schema[doc.metadata['tableName']] = doc.metadata['tableSchema']

 Can customers reach out to a department store via email, and if so, what is the email address?
 How can customers be categorized based on their payment method and customer code?
 How can customers be grouped based on their location or address?
 Which customers have provided their contact information?


In [60]:
schema

{'Department_Stores': 'dept_store_id|dept_store_chain_id|store_name|store_address|store_phone|store_email',
 'Customers': 'customer_id|payment_method_code|customer_code|customer_name|customer_address|customer_phone|customer_email'}