# Install dependencies

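1. Grant the service account the following roles: \
AI Platform Developer \
Cloud Translation API User

2. Create a key for the service account and save it as `keys.json` in the notebook's working directory (the Authentication cell reads it from there).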

In [18]:
# Third-party dependencies for this notebook; every line below shells out to pip.
# NOTE(review): the captured output of this cell reports resolver conflicts
# (langchain 0.0.340 vs. langsmith / langchain-core / langchain-google-vertexai),
# so the pinned and unpinned packages below do not form a consistent set — confirm
# the intended versions before relying on a fresh install.
! pip install langchain==0.0.340 --quiet # Agent
# ! pip install langsmith==0.1.84 --quiet
!pip install -qU langchain-google-vertexai # gemini pro

! pip install sentence-transformers --quiet # cross encoder

! pip install numexpr --quiet # Math tool

! pip install chromadb==0.4.13 --quiet # chroma vectore store

! pip install google-cloud-bigquery[pandas] --quiet # BQ
! pip install --upgrade google-cloud-aiplatform --quiet # vertex AI
! pip install google-cloud-translate==2.0.1 --quiet # Translation
! pip install google-auth==2.15.0 --quiet
! pip install google-cloud-core==1.7.3 --quiet

! pip install fuzzywuzzy --quiet # Fuzzy search for translator

! pip install nltk --quiet # clean data for entity recognition

! pip install sqlalchemy psycopg2-binary pandas --quiet # Postgres

!pip install pyjwt # decode user token

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
langchain-core 0.3.5 requires langsmith<0.2.0,>=0.1.125, but you have langsmith 0.0.92 which is incompatible.
langchain-google-vertexai 1.0.10 requires langchain-core<0.3,>=0.2.33, but you have langchain-core 0.3.5 which is incompatible.[0m[31m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-api-python-client 1.8.0 requires google-api-core<2dev,>=1.13.0, but you have google-api-core 2.19.2 which is incompatible.
langchain 0.0.340 requires langsmith<0.1.0,>=0.0.63, but you have langsmith 0.1.126 which is incompatible.[0m[31m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the sourc

# Logging

In [None]:
import logging
# Configure the root logger for the whole notebook: INFO and above, with timestamp and level.
# The logging.debug(...) calls in the helpers below are therefore not shown at this level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Vertex configuration

In [2]:
# Google Cloud project and region handed to vertexai.init() in the Authentication cell.
# NOTE(review): "project-name" looks like a placeholder — replace with the real project id.
VERTEX_PROJECT = "project-name"
VERTEX_REGION = "us-central1"

# Postgres configuration

In [3]:
import psycopg2
from psycopg2 import sql, OperationalError, DatabaseError

import os


def load_postgres_conn_params(env=None):
    """Build the Postgres connection settings used throughout the notebook.

    Each setting can be overridden through an environment variable so real
    credentials do not have to be written into the notebook; when a variable is
    absent the original in-notebook value is used.

    Args:
        env: mapping to read overrides from. Defaults to ``os.environ``.

    Returns:
        dict with the keys ``database``, ``user``, ``password``, ``host``,
        ``port`` (int) and ``schema``.

    Raises:
        ValueError: if ``POSTGRES_PORT`` is set to something that is not an integer.
    """
    env = os.environ if env is None else env
    return {
        "database": env.get("POSTGRES_DB", 'databasename'),
        "user": env.get("POSTGRES_USER", 'username'),
        "password": env.get("POSTGRES_PASSWORD", 'pa55w0rd'),
        "host": env.get("POSTGRES_HOST", '00.000.00.000'),
        "port": int(env.get("POSTGRES_PORT", 0)),
        "schema": env.get("POSTGRES_SCHEMA", 'service'),
    }


POSTGRES_CONN_PARAMS = load_postgres_conn_params()

def connect_db(database = POSTGRES_CONN_PARAMS['database'], user = POSTGRES_CONN_PARAMS['user'], password = POSTGRES_CONN_PARAMS['password'], host = POSTGRES_CONN_PARAMS['host'], port = POSTGRES_CONN_PARAMS['port']):
    """Open a new psycopg2 connection to the Postgres database.

    Every parameter defaults to the matching entry of ``POSTGRES_CONN_PARAMS`` as it
    was when this cell ran (Python evaluates default values at definition time).

    Returns:
        An open psycopg2 connection; the caller is responsible for closing it.

    Raises:
        psycopg2.DatabaseError: the connection attempt failed on the database side.
        Exception: any other failure while connecting; logged and re-raised.
    """
    logging.info(f"connecting to the database: {database}")
    logging.debug(f"database: {database}")
    logging.debug(f"host: {host}")
    logging.debug(f"port: {port}")

    try:
        # Pass the settings as keywords instead of interpolating them into a DSN
        # string: a password containing spaces, quotes or backslashes would be
        # mis-parsed in the hand-built "key=value ..." form.
        conn = psycopg2.connect(
            dbname=database,
            user=user,
            password=password,
            host=host,
            port=port,
        )
    except psycopg2.DatabaseError as e:
        logging.error(f"Error connecting to database: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise

    logging.info(f"Successfully connected to the database: {database}")
    return conn

# Authentication

In [4]:
import vertexai
import os

# Point the Google client libraries at the service-account key file (a path relative
# to the notebook's working directory), then initialise the Vertex AI SDK with the
# project/region configured above.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "keys.json"
vertexai.init(project=VERTEX_PROJECT, location=VERTEX_REGION)


# Models

In [5]:
from langchain.chat_models import ChatVertexAI
from langchain.llms import VertexAI

# Chat-style model handles, all capped at 1024 output tokens.
# INTERPRETOR_MODEL and AGENT_MODEL pass no model_name, so they use whatever
# default model the installed ChatVertexAI wrapper selects.
CODE_BISON = ChatVertexAI(model_name="codechat-bison", max_output_tokens=1024)
INTERPRETOR_MODEL = ChatVertexAI(max_output_tokens=1024)
AGENT_MODEL = ChatVertexAI(max_output_tokens=1024)
CHAT_BISON = ChatVertexAI(model_name="chat-bison@002", max_output_tokens=1024)

# Text-completion model; passed to get_all_noun_data() below to pick proper-noun columns.
TEXT_BISON = VertexAI( model_name="text-bison@002", max_output_tokens=1024)

In [None]:
# NOTE(review): this import rebinds the name `VertexAI`, which the Models cell above
# imported from langchain.llms. Re-running that cell after this one would build
# TEXT_BISON with this class instead — consider importing under an alias.
from langchain_google_vertexai import VertexAI
# Gemini Pro handle created through the langchain_google_vertexai wrapper.
GEMINI_PRO = VertexAI(model_name="gemini-pro")

# from langchain_google_vertexai import ChatVertexAI as GoogleChatVertexAI
# gemini_model = GoogleChatVertexAI(model_name="gemini-1.5-pro-001", max_output_tokens=1024) 


# Docs

### Descriptions

In [None]:
# Human-written description per table, keyed by table name. Looked up by
# get_parent_child_table_schemas(); tables missing here get an empty description.
TABLE_DESCRIPTIONS = {
    "table_name1": "description",
    "table_name2": "description"
    }

In [None]:

# Human-written description per column, keyed first by table name and then by
# column name. Looked up by get_parent_child_table_schemas() when building the
# schema documents.
COLUMN_DESCRIPTIONS = {"table_name1":
    {"column1": "Unique identifier for the Aramco station.",
    "column2": "Identifier for the tenant, referencing the tenants table.",
    },
    "table_name2":
    {"column1": "Unique identifier for the bill.",
    "column2": "Identifier for the type of bill.",
    },
    }

## Table schema docs 

In [None]:

def fetch_table_names(cursor, schema):
    """List the tables that information_schema reports for ``schema``.

    Args:
        cursor: open DB-API cursor used to run the lookup.
        schema: schema name, bound as a query parameter.

    Returns:
        The rows from ``cursor.fetchall()``: one single-element row per table name.

    Raises:
        psycopg2.Error: logged and re-raised if the query or fetch fails.
    """
    logging.info(f"Fetching table names from {schema}.")

    names_sql = """
        SELECT table_name 
        FROM information_schema.tables 
        WHERE table_schema = %s
    """
    bound_values = (schema,)

    try:
        cursor.execute(names_sql, bound_values)
        logging.info(f"Fetched table names from {schema}.")
        rows = cursor.fetchall()
    except psycopg2.Error as exc:
        logging.error(f"Error fetching table names: {exc}")
        raise

    return rows

In [None]:
def fetch_column_details(cursor, schema, table_name):
    """Fetch details of all columns in the specified table.

    Args:
        cursor: open DB-API cursor used to run the lookup.
        schema: schema that contains the table.
        table_name: table whose columns are described.

    Returns:
        Rows of ``(column_name, data_type, constraint_type, foreign_table_name,
        foreign_column_name)`` ordered by column position. The last three are
        NULL/None when the LEFT JOINs find no matching constraint; a column that
        takes part in several constraints yields one row per constraint.

    Raises:
        psycopg2.Error: logged and re-raised if the query or fetch fails.
    """
    logging.info(f"Fetching column deatils from {schema}, {table_name}.")

    # Constraint names are not unique across schemas, so the joins through
    # referential_constraints and constraint_column_usage are qualified by
    # constraint schema as well as by name; otherwise a same-named constraint in
    # another schema could attach the wrong foreign table/column.
    column_query = """
    SELECT
        c.column_name,
        c.data_type,
        tc.constraint_type,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM
        information_schema.columns AS c
    LEFT JOIN
        information_schema.key_column_usage AS kcu
        ON c.table_name = kcu.table_name
        AND c.column_name = kcu.column_name
        AND c.table_schema = kcu.table_schema
    LEFT JOIN
        information_schema.table_constraints AS tc
        ON kcu.constraint_name = tc.constraint_name
        AND kcu.table_name = tc.table_name
        AND kcu.table_schema = tc.table_schema
    LEFT JOIN
        information_schema.referential_constraints AS rc
        ON kcu.constraint_name = rc.constraint_name
        AND kcu.constraint_schema = rc.constraint_schema
    LEFT JOIN
        information_schema.constraint_column_usage AS ccu
        ON rc.unique_constraint_name = ccu.constraint_name
        AND rc.unique_constraint_schema = ccu.constraint_schema
    WHERE
        c.table_schema = %s
        AND c.table_name = %s
    ORDER BY
        c.ordinal_position;
    """
    try:
        cursor.execute(column_query, (schema, table_name))
        return cursor.fetchall()
    except psycopg2.Error as e:
        logging.error(f"Error fetching column details for table {table_name}: {e}")
        raise

In [None]:
# Only tables listed here are turned into schema documents.
TABLES_USED = ["table_name1", "table_name2"]

def get_parent_child_table_schemas(database, user, password, host, port, schema):
    """Fetch and organize parent/child schema documents.

    parent - one document per table: name, schema, description and full column details
    child  - one document per column row: table name, schema, column name and description

    Args:
        database, user, password, host, port: forwarded to ``connect_db``.
        schema: schema whose tables are inspected.

    Returns:
        ``(parent_docs, child_docs)`` — two lists of dicts as described above. Only
        tables named in ``TABLES_USED`` are included.

    Raises:
        Exception: any failure while connecting or querying is logged and re-raised;
        the connection is closed either way.
    """
    logging.info(f"Fetching table schema from databse {database}.")
    logging.debug(f"database: {database}")
    logging.debug(f"host: {host}")
    logging.debug(f"port: {port}")

    child_docs = []
    parent_docs = []

    # Initialize connection to the database
    conn = None
    try:
        conn = connect_db(database, user, password, host, port)
        cursor = conn.cursor()

        # Fetch all table names in the schema
        tables = fetch_table_names(cursor, schema)

        for table in tables:
            table_name = table[0]
            if table_name not in TABLES_USED:
                continue

            # Fetch column details for each table
            columns = fetch_column_details(cursor, schema, table_name)
            known_descriptions = COLUMN_DESCRIPTIONS.get(table_name, {})

            schema_fields = []
            for col in columns:
                # Look the description up per column. Resetting to "" for every
                # column matters: a column without a description must not inherit
                # the description of the column processed just before it.
                column_description = known_descriptions.get(col[0], "")

                schema_fields.append({
                    "column_name": col[0],
                    "data_type": col[1],
                    "constraint_type": col[2],
                    "foreign_table_name": col[3],
                    "foreign_column_name": col[4],
                    "column_description": column_description
                })

                child_docs.append({
                    "table_name": table_name,
                    "schema": schema,
                    "column_details": {
                        "column_name": col[0],
                        "column_description": column_description
                    }
                })

            parent_docs.append({
                "table_name": table_name,
                "schema": schema,
                # Tables without an entry get an empty description.
                "table_description": TABLE_DESCRIPTIONS.get(table_name, ""),
                "table_details": schema_fields
            })

    except Exception as e:
        logging.error(f"An error occurred in get_parent_child_table_schemas : {e}")
        raise
    finally:
        # Ensure that the connection is closed
        if conn:
            conn.close()
            logging.info("Database connection closed.")

    return parent_docs, child_docs

In [None]:
# Build the table-level (parent) and column-level (child) schema documents from the
# configured Postgres database; child_docs feed the schema vector store further down.
parent_docs, child_docs = get_parent_child_table_schemas(
    database=POSTGRES_CONN_PARAMS['database'],
    user=POSTGRES_CONN_PARAMS['user'],
    password=POSTGRES_CONN_PARAMS['password'],
    host=POSTGRES_CONN_PARAMS['host'],
    port=POSTGRES_CONN_PARAMS['port'],
    schema=POSTGRES_CONN_PARAMS['schema'],
)


## Entity Docs

In [None]:
###########################################################################
# Get all columns of type string and 3 sample data as context
###########################################################################
def get_string_column_with_sample(conn, table_name, schema=None):
    """Collect the text-typed columns of a table together with up to 3 sample values.

    Args:
        conn: open psycopg2 connection.
        table_name: table to inspect.
        schema: schema that contains the table. Defaults to
            ``POSTGRES_CONN_PARAMS['schema']``. Qualifying by schema keeps a
            same-named table in another schema from contributing columns and makes
            the sample query independent of the connection's search_path.

    Returns:
        A list of ``{"table", "column", "samples"}`` dicts, one per column whose data
        type is ``character varying`` or ``text``. On error the failure is logged,
        the transaction is rolled back so ``conn`` stays usable, and whatever was
        collected so far is returned.
    """
    if schema is None:
        schema = POSTGRES_CONN_PARAMS['schema']

    string_columns_with_samples = []

    try:
        with conn.cursor() as cursor:
            # Get the columns of the specified table
            cursor.execute(
                "SELECT column_name, data_type FROM information_schema.columns "
                "WHERE table_schema = %s AND table_name = %s",
                [schema, table_name],
            )
            columns = cursor.fetchall()

            for column_name, data_type in columns:
                if data_type not in ("character varying", "text"):
                    continue

                # Query to get 3 samples from the column
                query = sql.SQL("SELECT {column} FROM {table} LIMIT 3").format(
                    column=sql.Identifier(column_name),
                    table=sql.Identifier(schema, table_name)
                )
                cursor.execute(query)
                samples = [row[0] for row in cursor.fetchall()]

                string_columns_with_samples.append({
                    "table": table_name,
                    "column": column_name,
                    "samples": samples
                })
    except Exception as e:
        logging.error(f"Error retrieving string columns from table {table_name}: {e}")
        # A failed statement leaves the transaction aborted; roll back so the
        # caller can keep using the same connection for the next table.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logging.error(f"Rollback failed after error on table {table_name}: {rollback_error}")

    return string_columns_with_samples

def identify_proper_noun_columns(column_samples, llm):
    """Ask the LLM which of the given columns likely hold proper nouns.

    Args:
        column_samples: list of ``{"column": name, "samples": [...]}`` dicts, as
            produced by ``get_string_column_with_sample``.
        llm: LangChain LLM used to run the prompt.

    Returns:
        Column names picked by the LLM, restricted to names that actually appear in
        ``column_samples`` (order preserved, duplicates removed). An empty input
        returns ``[]`` without calling the LLM.
    """
    # Nothing to classify: skip the LLM round-trip instead of prompting with an
    # empty column list.
    if not column_samples:
        return []

    # Imported here because no earlier cell of the notebook brings these names in.
    from langchain.chains import LLMChain
    from langchain.output_parsers import CommaSeparatedListOutputParser
    from langchain.prompts import PromptTemplate

    formatted_columns = []
    for col in column_samples:
        # Filter out None values and convert the rest to strings
        samples = col['samples'] or []
        samples_str = ', '.join(str(s) for s in samples if s is not None)
        formatted_columns.append(f"Column: {col['column']}, Samples: {samples_str}")

    formatted_columns_str = "\n".join(formatted_columns)

    prompt = PromptTemplate.from_template(
        """Given the list of columns in a table with their sample data, return all of the columns that likely contain proper nouns (names, locations, etc.).
        columns and samples:
        {columns}
        result (just the column names, separated by commas):"""
    )

    output_parser = CommaSeparatedListOutputParser()
    chain = LLMChain(prompt=prompt, llm=llm, output_parser=output_parser)

    noun_columns = chain.invoke({"columns": formatted_columns_str})

    # With an output parser attached, the chain's 'text' value may already be a
    # list; calling .strip() on it would fail. Accept both shapes.
    raw_result = noun_columns['text']
    if isinstance(raw_result, str):
        candidates = raw_result.split(",")
    else:
        candidates = list(raw_result or [])

    # Keep only names that are real columns of this table, so a made-up or
    # oddly-quoted name from the LLM never reaches a SQL statement downstream.
    known_columns = {col['column'] for col in column_samples}
    noun_columns_list = []
    for candidate in candidates:
        name = str(candidate).strip().strip("\"'`")
        if name in known_columns and name not in noun_columns_list:
            noun_columns_list.append(name)

    return noun_columns_list

###########################################################################
# Get all noun data that the LLM can refer to for making better decisions
###########################################################################
def get_all_noun_data(llm):
    """Collect every proper-noun value stored in the configured schema.

    For each table of ``POSTGRES_CONN_PARAMS['schema']`` the string columns are
    sampled, the LLM picks the ones that look like proper nouns, and all non-NULL
    values of those columns are read.

    Args:
        llm: LangChain LLM forwarded to ``identify_proper_noun_columns``.

    Returns:
        ``(values, noun_map)`` where ``values`` is a list of the distinct values and
        ``noun_map`` is a list of ``{value: {"schema", "table", "column"}}`` dicts,
        one per row read. Failures are logged; a failing table is skipped and the
        remaining tables are still processed.
    """
    schema = POSTGRES_CONN_PARAMS['schema']
    conn = connect_db()  # Connect to the PostgreSQL database
    all_data = set()
    all_noun_map = []  # Initialize as a list

    try:
        with conn.cursor() as cursor:
            tables = fetch_table_names(cursor, schema)

            for table in tables:
                table_name = table[0]
                try:
                    string_columns_with_samples = get_string_column_with_sample(conn, table_name)
                    noun_columns_list = identify_proper_noun_columns(string_columns_with_samples, llm)

                    for noun_column in noun_columns_list:
                        # Retrieve actual noun data from the table (schema-qualified so
                        # the query does not depend on the connection's search_path)
                        query = sql.SQL("SELECT {column} FROM {table}").format(
                            column=sql.Identifier(noun_column),
                            table=sql.Identifier(schema, table_name)
                        )
                        cursor.execute(query)

                        for row in cursor.fetchall():
                            column_value = row[0]
                            if column_value is not None:
                                all_data.add(column_value)
                                # 'schema' is recorded because get_entity_schema reads it
                                # when it builds the lookup query for an entity.
                                all_noun_map.append({
                                    column_value: {
                                        "schema": schema,
                                        "table": table_name,
                                        "column": noun_column
                                    }
                                })
                except Exception as e:
                    logging.error(f"Error retrieving data from table {table_name}: {e}")
                    # Clear the aborted transaction so the next table can be queried.
                    conn.rollback()

    except Exception as e:
        logging.error(f"Error retrieving data: {e}")

    finally:
        conn.close()  # Close the database connection

    return list(all_data), all_noun_map

In [None]:
# Collect the distinct proper-noun values from the database plus a value -> location map;
# TEXT_BISON decides which string columns hold proper nouns.
all_noun_data, noun_maps = get_all_noun_data(TEXT_BISON)

## Translated Docs

In [None]:
from google.cloud import translate_v2 as translate

######################
# Basic Translator
# 1. For getting initial translation of the user query
# 2. For getting Entity translations for fuzzy and vector search # NEEDS human verification
# above will be provided as context to LLM in advanced translator
######################

# Translation client shared by every translate_text() call; created on first use.
_TRANSLATE_CLIENT = None


def _get_translate_client():
    """Return the shared Translation client, creating it the first time it is needed."""
    global _TRANSLATE_CLIENT
    if _TRANSLATE_CLIENT is None:
        _TRANSLATE_CLIENT = translate.Client()
    return _TRANSLATE_CLIENT


def translate_text(text: str, tgt: str) -> str:
    """Translate ``text`` into the target language with Google Cloud Translation.

    Args:
        text: text to translate; ``bytes`` are decoded as UTF-8 first.
        tgt: target language code to translate to.

    Returns:
        The translated text, or ``""`` if anything goes wrong (the error is logged).
    """
    try:
        # Ensure text is in string format
        if isinstance(text, bytes):
            text = text.decode("utf-8")

        logging.debug(f"translating text: {text[:50]}... to {tgt}")

        # Reuse one client instead of building a new one per call (this function is
        # invoked once per entity). format_="text" asks for plain-text output so
        # characters such as apostrophes are not returned as HTML entities.
        result = _get_translate_client().translate(text, target_language=tgt, format_="text")
        translated_text = result.get("translatedText", "")

        # Log successful translation
        logging.info(f"Successfully translated text to: {translated_text[:50]}...")

        return translated_text
    except Exception as e:
        # str() keeps the log line itself from failing on a non-sliceable input
        logging.error(f"Error translating text: {str(text)[:50]}... to {tgt} - {e}")
        return ""
######################
# Translate Entity (SRF)
# Produces translations of entities so that translated entities can be mapped
# back to the (English) entities stored in the database.
######################

def get_entity_translations(datas, tgt_lang):
    """Translate each entity and pair it with its translation.

    Args:
        datas: iterable of entities to translate; falsy items are skipped.
        tgt_lang: target language code handed to ``translate_text``.

    Returns:
        ``(pairs, translations)`` — a list of single-entry ``{entity: translation}``
        dicts and the list of translated strings, in input order. Entities whose
        translation comes back empty are logged and left out.
    """
    logging.info("Creating entity : translations map")
    logging.debug(f"Target language : {tgt_lang}")

    pairs = []
    translations = []

    for entity in datas:
        if not entity:
            continue

        try:
            translated = translate_text(entity, tgt_lang)
            if not translated:
                logging.warning(f"Translation failed for text: {entity[:50]}...")
                continue

            pair = {entity: translated}
            translations.append(translated)
            pairs.append(pair)

        except Exception as err:
            # Any failure for one entity is logged; the loop moves on to the next.
            logging.error(f"Error processing translation for {entity[:50]}...: {err}")

    logging.info("Completed entity : translations map")
    return pairs, translations



In [None]:
# Translate every collected entity to Arabic ("ar"): a list of {entity: translation}
# dicts plus the flat list of translated strings.
entity_translate_map, translated_texts_list = get_entity_translations(all_noun_data, "ar")

## Entity to split words docs

In [None]:
######################
# Words-to-entity map: split every entity into its words.
######################

def get_entity_words_map(entities_list):
    """Split each entity on whitespace and record the words per entity.

    Args:
        entities_list: iterable of entity strings; falsy items are skipped with a
            warning, items that cannot be split are logged as errors and skipped.

    Returns:
        ``(entity_to_words, vocabulary)`` — a dict mapping each entity to the list of
        its words, and the set of all words seen across every entity.
    """
    logging.info("Splittig entity and mapping each word to its Entity")

    entity_to_words = {}
    vocabulary = set()

    for entity in entities_list:
        # Empty strings (and other falsy items) carry no words
        if not entity:
            logging.warning("Encountered empty string in entities_list.")
            continue

        try:
            words = entity.split()
            vocabulary.update(words)
            entity_to_words[entity] = words
            logging.info(f"Processed entity: '{entity}' -> Words: {words}")
        except Exception as err:
            logging.error(f"Error processing entity '{entity}': {err}")

    return entity_to_words, vocabulary

In [None]:
##################################
# Entity context for sql query:
# map each untranslated entity to its whitespace-separated words.
##################################
en_word_entity_map, en_all_split_words = get_entity_words_map(all_noun_data)

In [None]:
# Extract name from email (NOT USED)
def extract_name_from_email(email):
    """Return the local part of ``email`` reduced to ASCII letters and digits.

    Everything from the first ``@`` onwards is dropped, then every character that is
    not ``a-z``, ``A-Z`` or ``0-9`` is removed.
    """
    # Imported locally: no earlier cell of the notebook imports `re`.
    import re

    name = re.sub(r'@.*', '', email)
    name = re.sub(r'[^a-zA-Z0-9]', '', name)
    return name

In [None]:
#################################
# Entity context for translation:
# map each translated entity to its whitespace-separated words.
#################################
translated_texts_dict, all_translated_words = get_entity_words_map(translated_texts_list)

# Vector Store 

## Embedding model

In [None]:
##############################################
# Vector stores - Table Schema and Entities 
##############################################

#################################
"""
Create custom embeding finction using multi-lingual model
"""
#################################
from chromadb import Documents, EmbeddingFunction, Embeddings
from vertexai.language_models import TextEmbeddingModel, TextEmbeddingInput
from langchain.vectorstores import Chroma

class MyEmbeddingFunction(EmbeddingFunction):
    """Embedding adapter around a Vertex AI ``TextEmbeddingModel``.

    Provides ``embed_documents`` and ``embed_query``; instances are handed to the
    notebook's ``Chroma`` vector stores as their embedding object.
    """

    # Number of texts sent per get_embeddings() call.
    MAX_BATCH_SIZE = 250

    def __init__(self, model_id, task, query_task=None):
        """Load the embedding model.

        Args:
            model_id: model name passed to ``TextEmbeddingModel.from_pretrained``.
            task: task type attached to every text embedded by ``embed_documents``.
            query_task: task type used by ``embed_query``. When omitted it is
                ``"RETRIEVAL_QUERY"`` if ``task`` is ``"RETRIEVAL_DOCUMENT"`` (queries
                and corpus documents are meant to be embedded with their matching
                task types), otherwise the same as ``task``.

        Raises:
            Exception: whatever ``from_pretrained`` raises, after logging it.
        """
        try:
            self.model = TextEmbeddingModel.from_pretrained(model_id)
        except Exception as e:
            logging.error(f"Error initializing embedding model '{model_id}': {e}")
            raise  # Reraise the exception to avoid using an uninitialized model

        self.task = task
        if query_task is None:
            query_task = "RETRIEVAL_QUERY" if task == "RETRIEVAL_DOCUMENT" else task
        self.query_task = query_task
        logging.info(f"Initialized embedding model: {model_id} for task: {task}")

    def embed_documents(self, input: Documents) -> Embeddings:
        """Embed a list of texts, in batches, with the document task type.

        Returns:
            One embedding (list of floats) per input text, in input order.

        Raises:
            Exception: any embedding failure is logged and re-raised. Returning an
            empty list instead would hand the vector store fewer embeddings than
            texts and hide the real cause.
        """
        embeddings = []

        try:
            for start in range(0, len(input), self.MAX_BATCH_SIZE):
                batch = input[start:start + self.MAX_BATCH_SIZE]
                inputs = [TextEmbeddingInput(text, self.task) for text in batch]

                # Get embeddings for the batch
                embedding_result = self.model.get_embeddings(inputs)
                embeddings.extend(embedding.values for embedding in embedding_result)

                logging.info(f"Processed batch {start // self.MAX_BATCH_SIZE + 1}: {len(batch)} documents embedded.")
        except Exception as e:
            logging.error(f"Error embedding documents: {e}")
            raise

        return embeddings

    def embed_query(self, query: str) -> list:
        """Embed a single query string with the query task type.

        Returns:
            The embedding of ``query`` as a list of floats.

        Raises:
            Exception: any embedding failure is logged and re-raised, including the
            model returning no embedding at all.
        """
        try:
            inputs = [TextEmbeddingInput(query, self.query_task)]
            embedding_result = self.model.get_embeddings(inputs)
            embeddings = [embedding.values for embedding in embedding_result]
            if not embeddings:
                raise ValueError("embedding model returned no embedding for the query")
            logging.info(f"Processed query: '{query}' -> Embedding obtained.")
            return embeddings[0]
        except Exception as e:
            logging.error(f"Error embedding query '{query}': {e}")
            raise

##############################
# Model, task type and creating embedding function
##############################
EMBEDDING_MODEL = "text-multilingual-embedding-002"
EMBEDDING_TASK = "RETRIEVAL_DOCUMENT"

try:
    embedding_function = MyEmbeddingFunction(EMBEDDING_MODEL, EMBEDDING_TASK)
except Exception as e:
    logging.critical(f"Failed to create embedding function: {e}")
    # Stop here: every vector-store cell below needs `embedding_function`, and
    # carrying on would fail later with a NameError (or silently reuse a stale
    # object left over from an earlier run of this cell).
    raise



## Translated entity vector store for Translator

In [None]:
##############################
# Create the vector store used to retrieve the relevant {en:ar} entity map
# for a user query.
##############################
# Convert entity translate map to string representation
entity_translate_map_str = [str(doc) for doc in entity_translate_map]

# Drop whatever collection is already persisted under this name. Opening the
# persisted collection by name (instead of relying on a variable left over from a
# previous run of this cell) also works on a fresh kernel, so re-running the
# notebook does not append a second copy of every document.
try:
    Chroma(
        collection_name="entity_translate_map_collection",
        persist_directory="entity_translate_map_collection",
        embedding_function=embedding_function,
    ).delete_collection()
    logging.info("Deleted existing collection in vector store.")
except Exception as e:
    logging.warning(f"No need to clean the vector store or error occurred: {e}")

# Create a new vector store from the texts
try:
    entity_translate_vector_store = Chroma.from_texts(
        entity_translate_map_str,
        embedding=embedding_function,
        collection_name="entity_translate_map_collection",
        persist_directory="entity_translate_map_collection",
    )
    logging.info("Successfully created a new vector store from texts.")
except Exception as e:
    logging.error(f"Error creating vector store: {e}")
    raise  # Reraise the exception for further handling if necessary

# Attempt to persist the collection to disk
try:
    entity_translate_vector_store.persist()
    logging.info("Persisted the collection to disk successfully.")
except Exception as e:
    logging.error(f"Error persisting the collection: {e}")


In [None]:

def get_entity_translate_retriever(count=8):
    """Build a retriever over ``entity_translate_vector_store``.

    Args:
        count: number of documents the retriever returns per query (``k``).

    Returns:
        The retriever, or ``None`` if it could not be created (the error is logged).
    """
    search_options = {'k': count}
    try:
        retriever = entity_translate_vector_store.as_retriever(search_kwargs=search_options)
        logging.info(f"Created entity translate retriever with retrive count: {count}")
    except Exception as err:
        logging.error(f"Error creating entity translate retriever: {err}")
        retriever = None
    return retriever

# Directory holding the persisted entity-translation collection
translation_persist_directory = 'entity_translate_map_collection'

# Re-open the collection from disk; a failure here is logged and aborts the cell
try:
    entity_translate_vector_store = Chroma(
        persist_directory=translation_persist_directory,
        collection_name="entity_translate_map_collection",
        embedding_function=embedding_function,
    )
    logging.info("Successfully loaded the vector store from persisted data.")
except Exception as load_error:
    logging.error(f"Error loading vector store: {load_error}")
    raise

# Retriever backed by the store that was just loaded (None when creation failed)
entity_translate_retriever = get_entity_translate_retriever()

if entity_translate_retriever is None:
    logging.critical("Failed to create entity translate retriever. Please check the logs for details.")

## Table vector for SQL query generator

In [None]:
##############################################
# Table Schema vector stores
##############################################
import json

# Drop whatever schema collection is already persisted. Opening the persisted
# collection by name (instead of relying on a variable left over from a previous
# run of this cell) also works on a fresh kernel, so re-running the notebook does
# not append a second copy of every document.
try:
    Chroma(
        collection_name="schema_collection",
        persist_directory="schema_collection",
        embedding_function=embedding_function,
    ).delete_collection()
    logging.info("Deleted existing schema collection in vector store.")
except Exception as e:
    logging.warning(f"No need to clean the vector store or error occurred: {e}")

# Convert child documents to string representation
try:
    child_docs_as_strings = [json.dumps(doc) for doc in child_docs]
    logging.info("Converted child documents to string format.")
except Exception as e:
    logging.error(f"Error converting child documents to strings: {e}")
    raise  # Reraise the exception for further handling if necessary

# Attempt to create a new vector store from the texts
try:
    schema_vector_store = Chroma.from_texts(
        child_docs_as_strings,
        embedding=embedding_function,
        collection_name="schema_collection",
        persist_directory="schema_collection"
    )
    logging.info("Successfully created schema vector store from texts.")
except Exception as e:
    logging.error(f"Error creating schema vector store: {e}")
    raise  # Reraise the exception for further handling if necessary

# Attempt to persist the collection to disk
try:
    schema_vector_store.persist()
    logging.info("Persisted the schema collection to disk successfully.")
except Exception as e:
    logging.error(f"Error persisting the schema collection: {e}")

In [None]:
def get_schema_retriever(count=10):
    """Build a retriever over ``schema_vector_store``.

    Args:
        count: number of documents the retriever returns per query (``k``).

    Returns:
        The retriever, or ``None`` if it could not be created (the error is logged).
    """
    search_options = {'k': count}
    try:
        retriever = schema_vector_store.as_retriever(search_kwargs=search_options)
        logging.info(f"Created schema retriever with retrive count: {count}")
    except Exception as err:
        logging.error(f"Error creating schema retriever: {err}")
        retriever = None
    return retriever

# Directory holding the persisted schema collection
schema_persist_directory = 'schema_collection'

# Re-open the collection from disk; a failure here is logged and aborts the cell
try:
    schema_vector_store = Chroma(
        persist_directory=schema_persist_directory,
        collection_name="schema_collection",
        embedding_function=embedding_function,
    )
    logging.info("Successfully loaded the schema vector store from persisted data.")
except Exception as load_error:
    logging.error(f"Error loading schema vector store: {load_error}")
    raise

# Retriever backed by the store that was just loaded (None when creation failed)
schema_retriever = get_schema_retriever()

if schema_retriever is None:
    logging.critical("Failed to create schema retriever. Please check the logs for details.")


## Entity vector store for SQL query generator

In [None]:

##############################################
# Entities vector stores
##############################################

# Drop whatever noun collection is already persisted. Opening the persisted
# collection by name (instead of relying on a variable left over from a previous
# run of this cell) also works on a fresh kernel, so re-running the notebook does
# not append a second copy of every entity.
try:
    Chroma(
        collection_name="noun_collection",
        persist_directory="noun_collection",
        embedding_function=embedding_function,
    ).delete_collection()
    logging.info("Deleted existing noun collection in vector store.")
except Exception as e:
    logging.warning(f"No need to clean the vector store or an error occurred: {e}")

# Attempt to create a new noun vector store from the texts
try:
    noun_vector_store = Chroma.from_texts(
        all_noun_data,
        embedding=embedding_function,
        collection_name="noun_collection",
        persist_directory="noun_collection"
    )
    logging.info("Successfully created noun vector store from texts.")
except Exception as e:
    logging.error(f"Error creating noun vector store: {e}")
    raise  # Reraise the exception for further handling if necessary

# Attempt to persist the collection to disk
try:
    noun_vector_store.persist()
    logging.info("Persisted the noun collection to disk successfully.")
except Exception as e:
    logging.error(f"Error persisting the noun collection: {e}")

In [None]:
def get_noun_retriever(count=10):
    """Build a retriever over ``noun_vector_store``.

    Args:
        count: number of documents the retriever returns per query (``k``).

    Returns:
        The retriever.

    Raises:
        Exception: any failure while creating the retriever is logged and re-raised.
    """
    search_options = {'k': count}
    try:
        retriever = noun_vector_store.as_retriever(search_kwargs=search_options)
        logging.info(f"Noun retriever created with retrive cpount {count}.")
    except Exception as err:
        logging.error(f"Error creating noun retriever: {err}")
        raise
    return retriever

# Directory holding the persisted noun collection
noun_persist_directory = 'noun_collection'

# Re-open the collection from disk; a failure here is logged and aborts the cell
try:
    noun_vector_store = Chroma(
        persist_directory=noun_persist_directory,
        collection_name="noun_collection",
        embedding_function=embedding_function,
    )
    logging.info("Successfully loaded noun vector store from persisted data.")
except Exception as load_error:
    logging.error(f"Error loading noun vector store: {load_error}")
    raise

# Retriever backed by the store that was just loaded; a failure is only logged
try:
    noun_retriever = get_noun_retriever()
    logging.info("Successfully retrieved noun retriever.")
except Exception as retriever_error:
    logging.error(f"Error retrieving noun retriever: {retriever_error}")

# Retriever

In [None]:

def get_entity_schema(entities):
    """Fetch the database rows in which each entity occurs.

    For every entity, each entry of ``noun_maps`` that mentions it names the table
    and column the value was read from; all rows of that table whose column equals
    the entity are fetched.

    Args:
        entities: iterable of entity values to look up.

    Returns:
        A list of ``{"query": <SQL text>, "query_result": {column: value, ...}}``
        dicts, one per matching row.

    Raises:
        Exception: any connection or query failure is logged and re-raised; cursor
        and connection are closed either way.
    """
    noun_rows = []
    # Bound before the try block so the cleanup in `finally` never trips over an
    # unassigned name when connect_db() itself fails.
    conn = None
    cursor = None

    try:
        conn = connect_db()
        cursor = conn.cursor()
        logging.info("Database connection established successfully.")

        for entity in entities:
            for noun_map in noun_maps:
                if entity not in noun_map:
                    continue

                location = noun_map[entity]
                # Map entries may carry no 'schema' key; fall back to the
                # configured schema instead of failing with a KeyError.
                schema = location.get('schema', POSTGRES_CONN_PARAMS['schema'])

                # Identifiers are quoted by psycopg2 rather than by hand; the entity
                # value itself is passed as a bound parameter.
                query = sql.SQL("SELECT * FROM {schema}.{table} WHERE {column} = %s").format(
                    schema=sql.Identifier(schema),
                    table=sql.Identifier(location['table']),
                    column=sql.Identifier(location['column'])
                )
                sql_query = query.as_string(conn)
                cursor.execute(query, (entity,))

                columns = [desc[0] for desc in cursor.description]  # Get column names from cursor
                for row in cursor.fetchall():
                    noun_rows.append({
                        "query": sql_query,
                        "query_result": dict(zip(columns, row))  # Map column names to row values
                    })

        logging.info(f"Processed {len(entities)} entities and retrieved their schemas successfully.")

    except Exception as e:
        logging.error(f"Error while retrieving entity schema: {e}")
        raise  # Reraise the exception for further handling if necessary

    finally:
        # Ensure cursor and connection are closed properly
        if cursor is not None:
            cursor.close()
            logging.info("Cursor closed.")
        if conn is not None:
            conn.close()
            logging.info("Database connection closed.")

    return noun_rows

## Entity retriever

In [None]:
##############################################
# Retriver for Entities 
##############################################

def get_noun_results(question):
    """
    Look up database rows for the entities that the noun retriever finds in a question.

    Parameters:
    - question: Text passed to ``noun_retriever.get_relevant_documents``.

    Returns:
    - The list produced by ``get_entity_schema`` for the distinct ``page_content``
      values of the retrieved documents, or an empty list when nothing is retrieved.

    Raises:
    - Any exception from retrieval or the schema lookup is logged and re-raised.
    """
    try:
        logging.info(f"Retrieving relevant documents for question: {question}")
        matched_docs = noun_retriever.get_relevant_documents(question)

        if matched_docs:
            # Distinct entity names carried by the retrieved documents
            entities = set(doc.page_content for doc in matched_docs)
            logging.info(f"Identified entities: {entities}")

            rows = get_entity_schema(entities)
            logging.info(f"Retrieved schema for {len(entities)} entities.")
            return rows

        logging.warning("No relevant documents found.")
        return []

    except Exception as err:
        logging.error(f"Error while retrieving noun results: {err}")
        raise  # Propagate so the caller can react

## Entity retriever for translated entity

In [None]:

def get_schema(entities_list):
    """
    Look up database rows for an explicit list of entity names.

    Parameters:
    - entities_list: Iterable of entity names; duplicates are collapsed first.

    Returns:
    - The list produced by ``get_entity_schema`` for the distinct entities.

    Raises:
    - Any exception from the lookup is logged and re-raised.
    """
    try:
        logging.info("Processing entities to retrieve schema.")

        # De-duplicate before hitting the database
        entities = {*entities_list}
        logging.info(f"Identified unique entities: {entities}")

        rows = get_entity_schema(entities)
        logging.info(f"Retrieved schema for {len(entities)} entities.")
        return rows

    except Exception as err:
        logging.error(f"Error while retrieving schema: {err}")
        raise  # Propagate so the caller can react

## Table schema retriever

In [None]:
##############################################
# Retriever for table schema - Auto merging retrieval
##############################################

# Lookup: JSON-serialised column details of each child doc -> name of the table it belongs to
column_details_to_table_name_lookup = dict(
    (json.dumps(child['column_details']), child['table_name'])
    for child in child_docs
)

def get_relevant_parent_docs(question):
    """
    Return the parent (table-level) documents whose child documents were retrieved.

    The schema retriever is queried with the question. A table is selected when the
    JSON-serialised column details of one of its child docs appear as a substring
    of a retrieved document's ``page_content``.

    Parameters:
    - question: Text passed to ``schema_retriever.get_relevant_documents``.

    Returns:
    - List of entries from ``parent_docs`` whose 'table_name' was selected.

    Raises:
    - Any exception is logged and re-raised.
    """
    try:
        logging.info("Retrieving relevant parent documents based on the provided question.")

        results = schema_retriever.get_relevant_documents(question)
        logging.info(f"Retrieved {len(results)} documents from the schema retriever.")

        # Collect the table behind every child-doc payload found inside a retrieved document
        retrieved_table_ids = {
            table_name
            for result in results
            for column_details_json, table_name in column_details_to_table_name_lookup.items()
            if column_details_json in result.page_content
        }
        logging.info(f"Identified table names: {retrieved_table_ids}")

        # Keep parent docs in their original order
        retrieved_tables = list(
            filter(lambda parent: parent['table_name'] in retrieved_table_ids, parent_docs)
        )
        logging.info(f"Retrieved {len(retrieved_tables)} relevant parent documents.")
        return retrieved_tables

    except Exception as err:
        logging.error(f"Error while retrieving relevant parent documents: {err}")
        raise  # Propagate so the caller can react


# Summarizer (Not Used)

In [None]:
# from transformers import pipeline

# summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

In [None]:
# summary = summarizer(res_str, max_length=100, min_length=30, do_sample=False,length_penalty=2.0, num_beams=4)

"""
length_penalty: A value >1.0 encourages the model to generate longer sequences, which can result in more detailed summaries.
num_beams: Beam search parameter that can improve the quality of the summary. More beams can lead to a more thorough exploration of possible summaries.
"""

In [None]:
# summary

# Re-Ranker


In [None]:
###################################
# Cross Encoder Model for re-ranking
###################################
from sentence_transformers import CrossEncoder

CROSS_ENCODER_MODEL = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

In [None]:
###################################
# ReRanker for retrieved docs
###################################

def re_rank_doc(query, docs, cross_encoder, count=3):
    """
    Order documents by cross-encoder score against the query and keep the best ones.

    Parameters:
    - query: The query each document is scored against.
    - docs: Sequence of documents to rank.
    - cross_encoder: Object whose ``predict`` accepts a list of [query, doc] pairs
      and returns one score per pair.
    - count: Maximum number of documents to return.

    Returns:
    - Up to ``count`` documents, highest score first (ties keep their input order);
      an empty list when ``docs`` is empty.

    Raises:
    - Any exception raised while scoring or ranking is logged and re-raised.
    """
    if not docs:
        logging.warning("No documents provided for re-ranking.")
        return []

    try:
        logging.info("Starting the re-ranking process.")

        # One [query, document] pair per candidate
        pairs = []
        for doc in docs:
            pairs.append([query, doc])
        logging.info(f"Generated {len(pairs)} pairs for prediction.")

        scores = cross_encoder.predict(pairs)
        logging.info("Scores have been predicted successfully.")

        # Positions ordered from best to worst score
        ranking = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
        logging.info("Sorted document indices based on scores.")

        top_docs = []
        for position in ranking[:count]:
            top_docs.append(docs[position])
        logging.info(f"Selected top {count} documents for the final output.")
        return top_docs

    except Exception as err:
        logging.error(f"Error during the re-ranking process: {err}")
        raise  # Propagate so the caller can react

# Fuzzy search

In [None]:
from fuzzywuzzy import process

#############################
"""
Perform fuzzy search

IP:
1. Word to be searched
2. list of words to be searched from
3. count of how many relevant search to retrieve
4. score threshold

OP:
Top matches with score higher than the score threshold (default 90)
"""
#############################
def fuzzy_search(search_word, search_from, limit=3, score=90):
    """
    Fuzzy-match one word against a collection of candidates.

    Parameters:
    - search_word: The word to look for.
    - search_from: The candidates handed to ``process.extract``.
    - limit: Maximum number of matches requested from ``process.extract``.
    - score: Threshold; only matches scoring strictly above it are kept.

    Returns:
    - List of the matched candidates (first element of each match) that passed
      the threshold.

    Raises:
    - Any exception is logged and re-raised.
    """
    try:
        logging.info(f"Starting fuzzy search for '{search_word}' with limit {limit} and score threshold {score}.")
        fuzzy_matches = process.extract(search_word, search_from, limit=limit)

        # match[0] is the candidate, match[1] its similarity score
        top_matches = []
        for match in fuzzy_matches:
            if match[1] > score:
                top_matches.append(match[0])
        logging.info(f"Found {len(fuzzy_matches)} matches. Filtered down to {len(top_matches)} based on score threshold.")
        return top_matches

    except Exception as err:
        logging.error(f"Error during fuzzy search: {err}")
        raise  # Propagate so the caller can react


#############################
"""
Map fuzzy search results to entities in DB

IP:
1. List of word to be searched
2. list of words to be searched from
3. count of how many relevant search to retrieve
4. score threshold

OP:
1. list of fuzzy search results to all input words
"""
#############################
def get_matches(word_list, search_from, limit=3, score=90):
    """
    Run ``fuzzy_search`` for every word and merge the results without duplicates.

    Parameters:
    - word_list: Words to search for.
    - search_from: Candidates each word is matched against.
    - limit: Per-word match limit forwarded to ``fuzzy_search``.
    - score: Score threshold forwarded to ``fuzzy_search``.

    Returns:
    - List of the distinct matches found across all words (built from a set, so
      the order is not meaningful).

    Raises:
    - Any exception is logged and re-raised.
    """
    unique_matches = set()

    try:
        logging.info(f"Starting to get matches for {len(word_list)} words.")
        for word in word_list:
            logging.info(f"Searching matches for word: '{word}'")
            for match in fuzzy_search(word, search_from, limit, score):
                unique_matches.add(match)

        top_matches = [*unique_matches]
        logging.info(f"Found a total of {len(top_matches)} unique matches across all words.")
        return top_matches

    except Exception as err:
        logging.error(f"Error while getting matches: {err}")
        raise  # Propagate so the caller can react


# Context 

## Context from entity for SQL query generator 

In [None]:
#############################
"""
Map fuzzy search results to entities

IP:
1. List of words to map
2. Dict containing all entity to word maps {Entity: [words, in, entity], ... }

OP:
1. fuzzy search results mapped to noun maps - [entity1, entity2, ....]
"""
#############################
from collections import defaultdict

def build_reverse_lookup(entity_words_map):
    """
    Invert an entity -> words mapping into a word -> entities mapping.

    Parameters:
    - entity_words_map: Dict whose keys are entities and whose values are iterables
      of words belonging to that entity.

    Returns:
    - ``defaultdict(set)`` mapping each word to the set of entities that list it.

    Raises:
    - Any exception is logged and re-raised.
    """
    word_to_entities = defaultdict(set)
    try:
        logging.info("Building reverse lookup map.")
        # Flatten to (word, entity) pairs, then group the entities by word
        pairs = (
            (word, entity)
            for entity, words in entity_words_map.items()
            for word in words
        )
        for word, entity in pairs:
            word_to_entities[word].add(entity)
        logging.info("Successfully built the reverse lookup map.")

    except Exception as err:
        logging.error(f"Error while building reverse lookup map: {err}")
        raise  # Propagate so the caller can react

    return word_to_entities



def map_word_to_entity(word_list, reverse_map):
    """
    Collect every entity associated with any of the given words.

    Parameters:
    - word_list: Words to look up.
    - reverse_map: Mapping from a word to an iterable of entities.

    Returns:
    - Set of all entities found for the words; words missing from the map are skipped.

    Raises:
    - Any exception is logged and re-raised.
    """
    matched = set()
    try:
        logging.info(f"Mapping words to entities for word list of size: {len(word_list)}.")
        for word in word_list:
            if word not in reverse_map:
                continue  # Unknown word: nothing to add
            matched.update(reverse_map[word])
        logging.info(f"Mapping complete. Found {len(matched)} unique entities.")

    except Exception as err:
        logging.error(f"Error while mapping words to entities: {err}")
        raise  # Propagate so the caller can react

    return matched

reverse_map = build_reverse_lookup(en_word_entity_map)


## Context from entity for translator

In [None]:
#############################
"""
Map fuzzy search results to entities in DB

IP:
1. List of words to map
2. Dict containing all word to entity maps {Entity: [words, in, entity], ... }
3. List of dicts containing [{en:ar},..] maps

OP:
1. fuzzy search results mapped to {en:ar}
"""
#############################
def build_translation_lookup(translate_map):
    """
    Invert a list of key -> value dicts into a single value -> key dict.

    Parameters:
    - translate_map: Iterable of dicts. Every (key, value) pair of every dict is
      flipped; when a value occurs more than once, the last pair wins.

    Returns:
    - Dict mapping each value to its key.

    Raises:
    - Any exception is logged and re-raised.
    """
    try:
        logging.info("Building translation lookup.")
        inverted = {}
        for pair_map in translate_map:
            for source_term, translated_term in pair_map.items():
                inverted[translated_term] = source_term
        logging.info("Successfully built the translation lookup.")
    except Exception as err:
        logging.error(f"Error while building translation lookup: {err}")
        raise  # Propagate so the caller can react

    return inverted

translation_lookup = build_translation_lookup(entity_translate_map)


In [None]:

def map_fuzzy_to_entity_translator_context(word_list, entity_words_map, translation_lookup=translation_lookup):
    """
    Turn fuzzy-matched words into single-pair translation dicts for the translator.

    Parameters:
    - word_list: Words (fuzzy search results) to map.
    - entity_words_map: Map handed to ``map_word_to_entity`` as its lookup table.
      NOTE(review): that helper looks words up as keys, so this is presumably the
      word -> entities map rather than an entity -> words map; confirm with callers.
    - translation_lookup: Dict used to translate each entity. Defaults to the
      module-level lookup captured when this function is defined; ``None`` is
      treated as an empty lookup.

    Returns:
    - List of ``{translation_lookup[entity]: entity}`` dicts, one per entity that
      has an entry in the lookup.

    Raises:
    - Any exception is logged and re-raised.
    """
    if translation_lookup is None:
        logging.warning("No translation lookup provided; proceeding without it.")
        translation_lookup = {}

    try:
        logging.info("Mapping fuzzy words to entities.")

        # Step 1: entities associated with the supplied words
        entities = map_word_to_entity(word_list, entity_words_map)

        # Step 2: keep only the entities the lookup can translate
        word_entity_map = []
        for entity in entities:
            if entity in translation_lookup:
                word_entity_map.append({translation_lookup[entity]: entity})

        logging.info(f"Mapping complete. Found {len(word_entity_map)} translation mappings.")
        return word_entity_map

    except Exception as err:
        logging.error(f"Error while mapping fuzzy words to entities: {err}")
        raise  # Propagate so the caller can react


## Context from query for sql query generator 

In [None]:
import json


def get_documents(question, entities_list):
    """
    Assemble the context string used when generating a SQL query.

    Two sections are built and concatenated:
    1. Table schemas retrieved for the question, re-ranked (top 2) and filtered by
       the LLM; when the filter keeps nothing, the re-ranked schemas are used.
    2. Entity rows retrieved for the question, merged with rows for
       ``entities_list`` when it is non-empty, re-ranked (top 2) and filtered by
       the LLM; this section is omitted when nothing survives the filter.

    Parameters:
    - question: The user question.
    - entities_list: Extra entity names to look up (may be empty/None).

    Returns:
    - The combined context string. On any exception the error is logged and the
      fixed text "An error occurred while processing your request." is returned.
    """
    try:
        ################################# RAG table schema from query #############################
        logging.info("Retrieving relevant parent documents for the table schema.")
        parent_docs_found = get_relevant_parent_docs(question)
        schema_candidates = list(map(json.dumps, parent_docs_found))
        logging.info(f"Retrieved {len(parent_docs_found)} parent documents.")

        ############################# Re-Rank & filter #############################
        schema_section = ""
        if schema_candidates:
            logging.info("Re-ranking table schema documents.")
            ranked_schemas = re_rank_doc(question, schema_candidates, CROSS_ENCODER_MODEL, 2)
            logging.debug(f"Length of table schema documents after Re-ranking is {len(ranked_schemas)}")
            logging.debug(f"Table schema documents after Re-ranking is {ranked_schemas}")

            logging.info("Filtering table schema documents.")
            kept_schemas = filter_context(question, ranked_schemas, CODE_BISON)
            logging.debug(f"Length of table schema documents after filtering is {len(kept_schemas)}")
            logging.debug(f"Table schema documents after filtering is {kept_schemas}")

            # Fall back to the re-ranked schemas when the filter rejects all of them
            schema_section = '\n\n'.join(map(str, kept_schemas or ranked_schemas))

        ################################# RAG entity from query #############################
        logging.info("Retrieving noun documents from PostgreSQL.")
        noun_rows = get_noun_results(question)
        noun_candidates = [str(row) for row in noun_rows]
        logging.info(f"Retrieved {len(noun_rows)} noun documents.")

        if entities_list:
            logging.info("Getting schema for entities list.")
            entity_rows = get_schema(entities_list)
            entity_candidates = [str(row) for row in entity_rows]
            # Merge both sources and drop duplicate rows
            noun_candidates = list(set(entity_candidates + noun_candidates))

        ############################# Re-Rank & filter #############################
        noun_section = ""
        if noun_candidates:
            logging.info("Re-ranking noun documents.")
            top_entity_count = 2
            logging.info(f"Max Re-ranked noun documents len {top_entity_count}.")
            ranked_nouns = re_rank_doc(question, noun_candidates, CROSS_ENCODER_MODEL, top_entity_count)
            logging.info(f"Noun documents after Re-ranking is {ranked_nouns}")

            logging.info("Filtering noun documents.")
            kept_nouns = filter_context(question, ranked_nouns, CODE_BISON, True)
            logging.info(f"Length of noun documents after filtering is {len(kept_nouns)}")
            logging.info(f"Noun documents after filtering is {kept_nouns}")

            if kept_nouns:
                noun_section = '\n\n'.join(map(str, kept_nouns))

        # The entity section only appears, with its heading, when there is content
        noun_prompt = ""
        if noun_section:
            noun_prompt = ''.join([
                "\n\n",
                "Below is the possible relevant SQL query and query result for the user's question: ",
                "\n\n",
                noun_section,
            ])

        return schema_section + noun_prompt

    except Exception as err:
        logging.error(f"Error occurred while getting documents: {err}")
        return "An error occurred while processing your request."

## NLP

In [None]:
########################################
# Arabic NLP
########################################
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Download the stopwords and punkt tokenizer models if not already downloaded.
# nltk.download() reports a failed download through its boolean return value
# rather than by raising, so every result is checked; otherwise a failed download
# would still be logged as a success.
try:
    failed_resources = [
        resource
        for resource in ('stopwords', 'punkt', 'punkt_tab')
        if not nltk.download(resource, quiet=True)
    ]
    if failed_resources:
        logging.error(f"Error downloading NLTK resources: {failed_resources}")
    else:
        logging.info("NLTK stopwords and tokenizer models downloaded successfully.")
except Exception as e:
    logging.error(f"Error downloading NLTK resources: {e}")

# Define the list of Arabic stop words
arabic_stopwords = set(stopwords.words('arabic'))

# Function to remove stop words from a given text
def remove_stopwords(text):
    """
    Strip symbols and Arabic stop words from a piece of text.

    Characters other than Arabic letters (U+0600-U+06FF), Latin letters, digits and
    whitespace are removed, the text is tokenized with NLTK, and tokens found in
    ``arabic_stopwords`` are dropped.

    Parameters:
    - text: The text to clean.

    Returns:
    - The remaining tokens joined by single spaces. If any step fails, the error is
      logged and the original, unmodified ``text`` is returned.
    """
    try:
        # Work on a separate name so the fallback below can still return the
        # untouched input when a later step (e.g. tokenization) fails.
        cleaned_text = re.sub(r'[^\u0600-\u06FFa-zA-Z0-9\s]', '', text)
        logging.info("Text cleaned of non-Arabic characters.")

        words = nltk.word_tokenize(cleaned_text)
        logging.info(f"Tokenized text into {len(words)} words.")

        filtered_words = [word for word in words if word not in arabic_stopwords]
        logging.info(f"Removed stopwords, resulting in {len(filtered_words)} words.")

        return ' '.join(filtered_words)

    except Exception as e:
        logging.error(f"Error occurred while removing stopwords: {e}")
        return text  # Return the original text if an error occurs



# Parsers

In [None]:
from langchain.schema import BaseOutputParser

#############################
"""
String parser for new translated query chain
"""
#############################

class CustomOutputParser(BaseOutputParser):
    """Parser for responses shaped like 'Translated Text: ... Entities used: a, b'."""

    def parse(self, text):
        """
        Split an LLM response into the translated text and the entities it used.

        Parameters:
        - text: Raw LLM output.

        Returns:
        - Dict with "translated_text" (the part before the first "Entities used:"
          marker, with any "Translated Text:" label removed and whitespace
          stripped) and "entities" (the comma-separated values after the marker;
          an empty list when the marker is absent). If ``text`` cannot be
          processed at all, it is returned unchanged with no entities.
        """
        try:
            # partition() copes with a missing marker, so a response that has no
            # entity section is a normal case rather than an exception.
            translated_text_part, marker, entities_part = text.partition("Entities used:")
            translated_text = translated_text_part.replace("Translated Text:", "").strip()

            if not marker:
                logging.warning("No 'Entities used:' section found; returning translated text without entities.")
                return {"translated_text": translated_text, "entities": []}

            entities = [entity.strip() for entity in entities_part.split(",") if entity.strip()]
            logging.info("Parsed translated text and entities successfully.")
            return {"translated_text": translated_text, "entities": entities}
        except Exception as e:
            logging.error(f"Error in CustomOutputParser: {e}")
            return {"translated_text": text, "entities": []}

#############################
"""
Dict parser for NER chain 
"""
#############################

class DictOutputParser(BaseOutputParser):
    """Parser that extracts a dictionary embedded in free-form LLM output."""

    def parse(self, output):
        """
        Extract the dictionary found between the first '{' and the last '}'.

        The span is read as a Python literal first and, if that fails, as JSON
        (which covers ``true``/``false``/``null``).

        Parameters:
        - output: Raw LLM output.

        Returns:
        - The parsed dict, or an empty dict when ``output`` is not a string, holds
          no braces, cannot be parsed, or parses to something other than a dict.
        """
        if not isinstance(output, str):
            return {}

        match = re.search(r'{.*}', output, re.DOTALL)
        if not match:
            return {}

        dictionary_str = match.group(0)
        try:
            dictionary = ast.literal_eval(dictionary_str)
        except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError) as literal_error:
            try:
                dictionary = json.loads(dictionary_str)
            except (ValueError, RecursionError):
                logging.error(f"Error parsing dictionary: {literal_error}")
                return {}

        # '{...}' can also be a set literal; only a real dict is a valid result
        if not isinstance(dictionary, dict):
            logging.error(f"Error parsing dictionary: expected a dict, got {type(dictionary).__name__}")
            return {}

        logging.info("Parsed dictionary successfully.")
        return dictionary

    def parse_result(self, result):
        """
        Parse a chain result.

        Parameters:
        - result: Either a dict carrying the LLM output under 'text', or a
          non-empty list/tuple whose first item exposes the output as ``.text``
          (the form in which LangChain passes generations to ``parse_result``).

        Returns:
        - The parsed dict, or an empty dict when no text can be found.
        """
        if isinstance(result, dict):
            return self.parse(result['text']) if 'text' in result else {}
        if isinstance(result, (list, tuple)) and result:
            return self.parse(getattr(result[0], 'text', None))
        return {}

# Helper Chains

## RAG Filter Chain

In [None]:
#######################################
# Filter RAG documents
######################################
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import PromptTemplate
# LLMChain is provided by the `langchain` package; `langchain_core` has no `chains` module.
from langchain.chains import LLMChain

def filter_context(question, context, llm, entity_query=False, entity=False, fuzzy_entity=False):
    """
    Filters relevant contexts based on the given question using a specified LLM.

    Each context is sent to the LLM together with the question, and it is kept when
    the answer contains 'yes' (case-insensitive).

    Parameters:
    - question: The question to evaluate relevance against.
    - context: A list of context strings to filter.
    - llm: The language model to use for processing.
    - entity_query: Flag to indicate if relevance is for SQL queries.
    - entity: Flag to indicate if relevance is for entity presence in context.
    - fuzzy_entity: Flag to indicate if relevance is for entity presence in a question.

    When several flags are set, the first one in the order entity_query, entity,
    fuzzy_entity decides the prompt; with no flag set the table-schema prompt is used.

    Returns:
    - List of contexts deemed relevant to the question, in input order. If the LLM
      call fails, the error is logged and an empty list is returned.
    """

    # Define prompt templates for different scenarios
    prompt_templates = {
        'entity_query': "You are an assistant that determines the relevance of the SQL query and its result to a given question.\n\nQuestion: {question}\n\n{context}\n\nIs this query relevant to the question above? Answer 'yes' or 'no'.",
        'entity': "You are an assistant that determines the presence of the Entity in a given Context.\n\nEntity: {question}\n\nContext: {context}\n\nIs this Entity present in the context above? Answer 'yes' or 'no'.",
        'fuzzy_entity': "You are an assistant that determines the presence of the Entity in a given question.\n\nQuestion: {question}\n\nEntity: {context}\n\nIs this Entity present in the question above? Answer 'yes' or 'no'.",
        'default': "You are an assistant that determines the relevance of table schema to a given question.\n\nQuestion: {question}\n\nTable: {context}\n\nIs this table relevant to the question above? Answer 'yes' or 'no'."
    }

    # First flag that is set wins
    if entity_query:
        prompt_key = 'entity_query'
    elif entity:
        prompt_key = 'entity'
    elif fuzzy_entity:
        prompt_key = 'fuzzy_entity'
    else:
        prompt_key = 'default'

    prompt_template = PromptTemplate(
        input_variables=["question", "context"],
        template=prompt_templates[prompt_key]
    )

    filter_chain = LLMChain(llm=llm, prompt=prompt_template)

    try:
        inputs = [{"question": question, "context": ctx} for ctx in context]
        results = filter_chain.apply(inputs)

        # Keep every context whose answer contains 'yes'
        relevant_contexts = [ctx["context"] for ctx, res in zip(inputs, results) if 'yes' in res["text"].lower()]
        logging.info(f"Filtered relevant contexts: {len(relevant_contexts)} found.")
        return relevant_contexts

    except Exception as e:
        logging.error(f"Error in filtering context: {e}")
        return []


## Adv Translator chain

In [None]:
def translator_with_rag(llm, text, tgt, rag_result, translated_query):
    """
    Refine a rough translation of the user query with an LLM, using candidate entities as hints.

    Parameters:
    - llm: The language model used for translation.
    - text: The original user query.
    - tgt: The target language for translation.
    - rag_result: Candidate entities (and their translations) offered to the model.
    - translated_query: The initial rough translation of the query.

    Returns:
    - The value produced by CustomOutputParser for the model's answer, or
      {"error": "..."} when the chain raises.

    NOTE(review): CustomOutputParser looks for the markers "Translated Text:" and
    "Entities used:", which this prompt does not ask the model to emit - confirm
    that the intended output format is covered elsewhere.
    """

    logging.info("Starting the translation process with RAG")
    for label, value in (
        ("Input text", text),
        ("Target language", tgt),
        ("RAG result", rag_result),
        ("Rough translation", translated_query),
    ):
        logging.debug(f"{label}: {value}")

    try:
        # Prompt that presents the query, its rough translation and the candidate entities
        translation_prompt = PromptTemplate.from_template(
            """You are an expert Translator especially in English and Arabic.
            The query can be related to Bills, Daily Sales, Dispenser Configurations, Fuel Requests, Fuel Types, Locations, Nozzles, Ports, Regions, Shifts, Fuel Stations, Tenants, Transactions, Users.
            
            Below provided are the original user query, query translated roughly to {tgt}, possible entities in the query and its translations in {tgt}. 
            Return more accurate translated query.
            
            Instructions:
            - Use the translation of an entity only if there is a good match. 
            - Do not alter or combine entities. Use them as they are.
            - Do not use entities if there is no good match.

            Original user query:
            {text}
            
            Roughly translated query:
            {translated_query}

            Possible entities:
            {rag_result}

            Result:
            """
        )

        response_parser = CustomOutputParser()
        translation_chain = LLMChain(prompt=translation_prompt, llm=llm, output_parser=response_parser)

        chain_inputs = {
            "tgt": tgt,
            "text": text,
            "rag_result": rag_result,
            "translated_query": translated_query
        }
        result = translation_chain.run(chain_inputs)

        logging.info("Translation process completed successfully")
        logging.debug(f"Translation result: {result}")

        return result

    except Exception as err:
        logging.error(f"An error occurred during translation: {err}")
        return {"error": "Failed to translate query: " + str(err)}


## NER chain

In [None]:
##################################
# Identify entities in the user query 
##################################

def ner_identifier(llm, query):
    """
    Ask a Language Model (LLM) to list the named entities that appear in a user query.

    Parameters:
    - llm: The language model used for named entity recognition (NER).
    - query: The user query where named entities are to be identified.

    Returns:
    - The value stored under 'text' in the chain result after
      CommaSeparatedListOutputParser has processed the answer (presumably a list
      of entity strings - confirm against the parser), or {"error": "..."} when
      the chain raises.
    """

    logging.info("Starting NER identification process")
    logging.debug(f"User query: {query}")

    try:
        # Prompt asking for a comma-separated list of entities
        entity_prompt = PromptTemplate.from_template(
            """You are an expert at identifying Entities.
            Given the original user query, identify the possible named entities in the query and return all entities (names, locations, addresses, etc.).

            Original user query:
            {text}

            Result (just the entities separated by commas):
            """
        )

        list_parser = CommaSeparatedListOutputParser()
        ner_chain = LLMChain(prompt=entity_prompt, llm=llm, output_parser=list_parser)

        chain_output = ner_chain.invoke({"text": query})
        entity_list = chain_output['text']

        logging.info("NER identification completed successfully")
        logging.debug(f"Identified entities: {entity_list}")
        return entity_list

    except Exception as err:
        logging.error(f"An error occurred during NER identification: {err}")
        return {"error": "Failed to identify entities: " + str(err)}


In [None]:
##################################
# Translate an Arabic entity name to English (by pronunciation)
##################################

def ner_translator(llm, query):
    """
    Ask the LLM to render an Arabic entity name in English by pronunciation, not meaning.

    Parameters:
    - llm: The language model used for translation.
    - query: The Arabic entity name to be translated.

    Returns:
    - The value stored under 'text' in the chain result after
      CommaSeparatedListOutputParser has processed the answer (presumably a list
      rather than a plain string - confirm against the parser), or the string
      "None" when the chain raises.
    """

    logging.info("Starting Arabic-to-English entity translation")
    logging.debug(f"Arabic entity provided for translation: {query}")

    try:
        # Prompt asking for a pronunciation-based rendering of the entity
        transliteration_prompt = PromptTemplate.from_template(
            """Given an entity name in Arabic, translate it to English. Do not translate its meaning; translate its pronunciation.
            If you can't, just return 'None'.

            Arabic entity:
            {text}

            Result (just the translated Arabic entity):
            """
        )

        list_parser = CommaSeparatedListOutputParser()
        translation_chain = LLMChain(prompt=transliteration_prompt, llm=llm, output_parser=list_parser)

        chain_output = translation_chain.invoke({"text": query})
        entity_list = chain_output['text']

        logging.info("Translation process completed successfully")
        logging.debug(f"Translated entity: {entity_list}")
        return entity_list

    except Exception as err:
        logging.error(f"An error occurred during entity translation: {err}")
        return "None"

## Get SQL query chain

In [None]:
# Render a dictionary as indented JSON wrapped in a ``` code fence
def _dict_to_json(x: dict) -> str:
    """Return ``x`` serialised as 4-space-indented JSON between two ``` fence lines."""
    body = json.dumps(x, indent=4)
    return f"```\n{body}\n```"

# Strip markdown code-fence remnants from a generated SQL query
def clean_query(query: str) -> str:
    """
    Remove code-fence markers from ``query``.

    The markers are removed one after another in a fixed order; single backticks
    and surrounding whitespace are left untouched.
    """
    # Order matters: each removal works on the result of the previous one
    for marker in ("`sql\n", "```sql", "```", "``"):
        query = query.replace(marker, "")
    return query

# Fill a prompt template with whichever fields are available
def format_prompt(template, context=None, chat_history=None, question=None, error=None, result=None, query=None, entities=None, answer=None):
    """
    Format a prompt template, substituting an empty string for every missing field.

    Parameters:
    - template: A ``str.format`` template; it may use any of the placeholders
      {context}, {chat_history}, {question}, {error}, {query}, {result},
      {entities} and {answer}.
    - context, chat_history, question, error, result, query, entities, answer:
      Values for the placeholders; any falsy value is replaced by ''.

    Returns:
    - The formatted prompt string.
    """
    supplied = {
        "context": context,
        "chat_history": chat_history,
        "question": question,
        "error": error,
        "query": query,
        "result": result,
        "entities": entities,
        "answer": answer,
    }
    fields = {name: value or '' for name, value in supplied.items()}
    return template.format(**fields)

In [None]:
# Execute SQL query
def execute_query(query, fetch_all=True):
    """
    Run a SQL statement on a fresh database connection and return its rows.

    Parameters:
    - query: SQL text, executed as given (no parameters are bound).
      NOTE(review): sql_query_generator passes model-generated SQL to this
      function - consider a read-only role or statement allow-list.
    - fetch_all: When True return ``cursor.fetchall()``, otherwise
      ``cursor.fetchone()``.

    Returns:
    - The fetched rows (or single row).

    Raises:
    - Any exception from connecting, executing or fetching is logged and re-raised.
    """
    # Both handles are bound before the try block so the cleanup in `finally`
    # never touches an unbound name (which would mask the real error) when
    # connect_db() fails.
    connection = None
    cursor = None
    result = None
    try:
        logging.info(f"Executing query: {query}")
        connection = connect_db()
        cursor = connection.cursor()
        cursor.execute(query)

        result = cursor.fetchall() if fetch_all else cursor.fetchone()

        logging.info("Query executed successfully.")
    except Exception as e:
        logging.error(f"Error executing query: {e}")
        raise
    finally:
        # Release whatever was actually opened
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

    return result

In [None]:
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from langchain.schema.runnable import RunnableLambda
from operator import itemgetter
from langchain.schema import StrOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.output_parsers import CommaSeparatedListOutputParser

# Prompt used to request a PostgreSQL query from the model.
# Placeholders: {context}, {chat_history}, {question}.
SQL_PROMPT = """You are a PostgreSQL expert.

Create a PostgreSQL query based on the following schema and user input.

Schema (JSON format):
{context}

Conversation so far:
{chat_history}

User query:
{question}

Instructions:
- Return only the PostgreSQL code.
- Use standard PostgreSQL syntax.
- Do not use backticks or any markup.
- Use double quotes for schema names and table names.
- Use double quotes for column names.
- Use single quotes for values.

SQL query:
"""

#######################################################################
# Prompt that asks the model to correct queries that produced errors.
# Placeholders: {error}, {context}, {chat_history}, {question}.
ERROR_PROMPT = """Here are the PostgreSQL queries and their errors:

{error}

Modify the query to fix the errors using the schema and context below.

Schema (JSON format):
{context}

Conversation so far:
{chat_history}

User query:
{question}

Instructions:
- Return only the PostgreSQL code.
- Use standard PostgreSQL syntax.
- Do not use backticks or any markup.
- Use double quotes for schema names and table names.
- Use double quotes for column names.
- Use single quotes for values.

Corrected SQL query:
"""

# Generate SQL query using LLM and retry if errors occur
def _llm_text(model, prompt):
    """Run `prompt` through `model` and return the generated text as a plain string.

    Chat models return a message object (text in `.content`), completion-style
    models return a string; both shapes are accepted. Objects without an
    `invoke` method are called directly.
    """
    invoke = getattr(model, "invoke", None)
    response = invoke(prompt) if callable(invoke) else model(prompt)
    return getattr(response, "content", response)


def sql_query_generator(input, config, palm_max_attempts=3, gemini_max_attempts=2):
    """
    Generate a PostgreSQL query with an LLM, execute it, and retry on failure.

    The first `palm_max_attempts` attempts use CODE_BISON; the remaining
    `gemini_max_attempts` attempts use GEMINI_PRO. After a failed execution the
    next prompt is ERROR_PROMPT (carrying every query/error pair so far), except
    for the first attempt on the fallback model, which starts again from the
    plain SQL_PROMPT.

    Parameters:
    - input: Dictionary with the keys "input" (user question), "entities" and
      "chat_history".
    - config: Configuration parameters (not used by this function).
    - palm_max_attempts: Max attempts using the first model.
    - gemini_max_attempts: Max attempts using the fallback model.

    Returns:
    - On success: {"query", "result", "context", "question"}.
    - On failure: a dictionary containing an "error" key and "query_result": None
      (plus "query" and "error_messages" when every attempt failed to execute).
      This function does not raise.
    """
    try:
        question = input["input"]
        chat_history = input["chat_history"]
        docs = get_documents(question, input["entities"])

        initial_prompt = format_prompt(template=SQL_PROMPT, context=docs, chat_history=chat_history, question=question)
        formatted_prompt = initial_prompt

        max_attempts = palm_max_attempts + gemini_max_attempts
        attempts = 0
        error_list = []

        # Strictly "<": the total number of attempts is exactly
        # palm_max_attempts + gemini_max_attempts.
        while attempts < max_attempts:
            logging.info(f"Attempt {attempts+1} of {max_attempts}")

            if attempts < palm_max_attempts:
                logging.debug(f"Using Chat Code Bison for sql query generation")
                logging.debug(f"Prompt: {formatted_prompt}")
                query = _llm_text(CODE_BISON, formatted_prompt)
            else:
                logging.debug(f"Using Gemini Pro for sql query generation")
                logging.debug(f"Prompt: {formatted_prompt}")
                query = _llm_text(GEMINI_PRO, formatted_prompt)

            cleaned_query = clean_query(query)
            logging.debug(f"Cleaned query: {cleaned_query}")

            try:
                # Attempt to execute the query
                query_result = execute_query(cleaned_query)
                logging.info(f"Query executed successfully: {cleaned_query}")
                return {"query": cleaned_query, "result": query_result, "context": docs, "question": question}

            except Exception as e:
                logging.error(f"Error executing query: {e}")
                error_list.append(f"Query: {cleaned_query}\nError: {str(e)}")
                attempts += 1

                if attempts >= max_attempts:
                    logging.error("Failed to generate a correct query after multiple attempts.")
                    return {
                        "error": "Failed to generate a correct query after multiple attempts.",
                        "query": cleaned_query,
                        "error_messages": error_list,
                        "query_result": None
                    }

                if attempts == palm_max_attempts:
                    # Switching to the fallback model: retry with the initial prompt
                    formatted_prompt = initial_prompt
                else:
                    # Ask the current model to correct the query using all errors so far
                    formatted_prompt = format_prompt(
                        template=ERROR_PROMPT,
                        context=docs,
                        chat_history=chat_history,
                        question=question,
                        error="\n".join(error_list),
                    )

        # Only reached when the attempt budget is zero (or negative).
        return {"error": "Max attempts reached, no valid query.", "query_result": None}

    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return {"error": "An unexpected error occurred.", "query_result": None}


## Interpret the result

In [None]:
# Prompt that turns a query result into a short natural-language answer.
# interpret_data fills {question}, {query} and {result} through format_prompt.
INTERPRET_PROMPT = """You are a PostgreSQL expert and skilled in extracting data from CSV.

A user asked:
{question}

The SQL query run was:
{query}

The query result in CSV format:
{result}

Based on this result, provide a brief answer to the user's question.

Instructions:
- Provide only the answer, without explaining how it was obtained.
- Extract the answer solely from the query result.
- Do not include the user question in your response.
- If the answer cannot be found, respond with "I cannot find the answer" and include a brief explanation with relevant details.
"""

def interpret_data(input, config):
    """
    Ask INTERPRETOR_MODEL for a short answer based on an executed SQL query.

    Parameters:
    - input: Dictionary that must contain "query", "context", "question" and
      "result" (the shape returned by sql_query_generator on success).
    - config: Configuration parameters (not used by this function).

    Returns:
    - The model's answer text on success.
    - {"error": "Failed to interpret data", "details": <message>} when anything
      fails, including a missing key in `input`.
    """
    try:
        logging.info("Interpreting data...")

        # One lookup for all required keys; a missing key raises KeyError here.
        query, context, question, result = itemgetter("query", "context", "question", "result")(input)

        logging.info(f"Extracted input values: Query: {query}, Question: {question}")
        for label, value in (("Query", query), ("Question", question), ("Context ", context)):
            logging.debug(f"{label}: {value}")

        # Build the interpretation prompt; the schema context is logged but not sent.
        prompt_template = format_prompt(
            template=INTERPRET_PROMPT,
            question=question,
            result=result,
            query=query,
        )
        logging.debug(f"Prompt: {prompt_template}")

        model_reply = INTERPRETOR_MODEL.invoke(prompt_template)
        logging.info("Interpreted result retrieved successfully.")
        return model_reply.content

    except Exception as e:
        logging.error(f"Error interpreting data: {e}")
        return {"error": "Failed to interpret data", "details": str(e)}



## Translator English to Arabic Chain

In [None]:
def translate_en_to_ar(query, query_response, entities_list, config):
    """
    Translates an English answer into Arabic, considering entity mappings.

    Parameters:
    - query: A dictionary containing the question ("question"), the SQL query
      ("query") and the query result ("result"). Missing keys default to "".
    - query_response: The English answer to be translated.
    - entities_list: English entities that may appear in the answer; each one is
      looked up in the module-level `entity_translate_map` to offer the model a
      fixed Arabic rendering. May be empty or None.
    - config: Configuration object (not used by this function).

    Returns:
    - The translated answer text on success.
    - {"error": "Failed to translate the query result.", "details": <message>}
      when anything fails.
    """
    try:
        # Define the translation prompt template
        TRANSLATE_PROMPT = """You are an expert English to Arabic Translator.

        A user asked this question:
        {question}

        To find the answer, the following SQL query was run in BigQuery:
        ```
        {query}
        ```

        The result of that query was the following table in CSV format:
        ```
        {result}
        ```

        {entities}

        This is the brief answer to the user question:
        ```
        {answer}
        ```

        Using all the above information, translate the answer from English to Arabic.

        Follow these restrictions strictly:
        - Use only the given information to translate.
        - If the entities in the answer are present in the given possible English : Arabic entity translation, use the Arabic entity as is.
        - Just write the answer, omit the question from your answer, this is a chat, just provide the answer.
        """

        logging.info("Starting translate_en_to_ar function.")

        # Extract relevant fields from the query
        question = query.get("question", "")
        sql_query = query.get("query", "")
        query_result = query.get("result", "")
        answer = query_response

        # One "english: arabic" line per (entity, map) pair that has a translation.
        entity_lines = [
            f"{entity}: {entity_map[entity]}"
            for entity in (entities_list or [])
            for entity_map in entity_translate_map
            if entity in entity_map
        ]

        # Default to an empty string so the {entities} slot disappears from the
        # prompt when there is nothing to offer (None would be rendered as the
        # literal text "None").
        entities = ""
        if entity_lines:
            entity_map_str = '\n'.join(entity_lines)
            entities = f"\n\nHere are some entities in English and their Arabic translations that might exist in the answer:\n```\n{entity_map_str}\n```"

        logging.debug(f"Entity mappings: {entities}")

        # Format the prompt for translation
        prompt_template = format_prompt(
            template=TRANSLATE_PROMPT,
            question=question,
            result=query_result,
            query=sql_query,
            entities=entities,
            answer=answer
        )
        logging.debug(f"Formatted prompt: {prompt_template}")

        # Invoke the translation model
        translation_result = CHAT_BISON.invoke(prompt_template)
        logging.info("Translation result successfully retrieved.")

        return translation_result.content

    except Exception as e:
        logging.error(f"Error during translation: {e}", exc_info=True)
        return {"error": "Failed to translate the query result.", "details": str(e)}


# Tools

## Math tool

In [None]:
from langchain import LLMMathChain
from langchain.tools import Tool

# Math chain driven by the agent's own model.
math_chain = LLMMathChain.from_llm(llm=AGENT_MODEL)

# Expose the chain to the agent as the "Calculator" tool, with both the
# synchronous and the asynchronous entry point registered.
math_tool = Tool(
    name="Calculator",
    func=math_chain.run,
    coroutine=math_chain.arun,
    description="Useful for when you need to answer questions about math.",
)

# Advanced translator

In [None]:
def replace_words(text, replacements):
    """Swap whole whitespace-separated words of `text` using `replacements`.

    Words are matched exactly (case- and punctuation-sensitive); words without
    an entry are kept. The result is re-joined with single spaces, so runs of
    whitespace in the input collapse to one space.
    """
    swapped = []
    for token in text.split():
        swapped.append(replacements.get(token, token))
    return ' '.join(swapped)

# Define the dictionary for replacements
# Whole-word substitutions applied by replace_words to the translated question
# in translate_text_v2: both "operations" and "sessions" become "transactions".
replacements = {
    "operations": "transactions",
    "sessions": "transactions"
}

In [None]:
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
import re
from concurrent.futures import ThreadPoolExecutor

#############################
# Translator with RAG
#############################

do_print = False
def translate_text_v2(llm, text: str, tgt: str):
    """
    Translates a given text into the target language, with entity recognition and RAG (Retrieve and Generate) support.

    Pipeline: detect the source language, produce a basic translation, collect
    candidate entity translations (retrieval on the whole text, fuzzy matching
    on its words, retrieval and fuzzy matching on the recognised named
    entities), then ask `translator_with_rag` for the final translation and
    apply the `replacements` word substitutions.

    Parameters:
    - llm: Large language model argument kept for interface compatibility; the
      NER and translation steps below use the module-level CHAT_BISON.
    - text: The input text to be translated (str, or UTF-8 encoded bytes).
    - tgt: The target language code.

    Returns:
    - (new_query, translated_entities_used): the translated text and the list of
      entities reported by `translator_with_rag`. When the detected language
      already equals `tgt`, returns (text, []).
    - On failure: {"error": "Translation failed.", "details": <message>}. This
      is a dict, not a tuple, so callers must check before unpacking.
    """
    try:
        translate_client = translate.Client()

        if isinstance(text, bytes):
            text = text.decode("utf-8")

        detected_source_language = translate_client.detect_language(text)["language"]
        logging.info(f"Detected source language: {detected_source_language}")

        logging.debug(f"Source languages: {detected_source_language}.")
        logging.debug(f"Target languages: {tgt}.")
        logging.debug(f"Text: {text}.")

        translated_entities_used = []
        if detected_source_language == tgt:
            logging.info("Source and target languages are the same. No translation needed.")
            return text, translated_entities_used

        # Translate the text if necessary
        translated_query = translate_text(text, tgt)
        logging.info("Basic translation sucessful.")
        logging.debug(f"Basic translated text: {translated_query}.")

        ############################# Query RAG and Filtering ##############################

        # Stays empty when retrieval finds nothing, so the final merge never
        # sees an unbound name.
        query_filter = []
        query_rag = entity_translate_retriever.get_relevant_documents(text)
        query_rag_list = list({doc.page_content for doc in query_rag})

        if query_rag_list:
            query_re_rank = re_rank_doc(text, query_rag_list, CROSS_ENCODER_MODEL, 2)
            query_filter = filter_context(text, query_rag_list, INTERPRETOR_MODEL)

            logging.debug(f"Re-ranked query: {query_re_rank}")
            logging.debug(f"Filtered query: {query_filter}")

        ############################# Fuzzy Search for Entities ############################

        query_match_entities_list = []
        query_cleaned = remove_stopwords(text)
        query_top_match = get_matches(query_cleaned.split(), all_translated_words)

        if query_top_match:
            query_filtered_match = filter_context(text, query_top_match, INTERPRETOR_MODEL, fuzzy_entity=True)
            query_match_entities = map_fuzzy_to_entity_translator_context(query_filtered_match, translated_texts_dict, entity_translate_map)
            query_match_entities_list = [str(entity) for entity in query_match_entities]
            logging.debug(f"Fuzzy matched entities: {query_match_entities_list}")

        ############################# Named Entity Recognition (NER) ########################

        # Normalise "no entities" to an empty list so the steps below can iterate safely.
        ar_ner = ner_identifier(CHAT_BISON, text) or []
        ar_ner_res_list, en_ner_res_list = [], []

        if ar_ner:
            en_ner = [translated for ner in ar_ner for translated in ner_translator(CHAT_BISON, ner)]
            logging.debug(f"Arabic NER: {ar_ner}")
            logging.debug(f"English NER: {en_ner}")

            # Look up both entity lists concurrently. .result() waits for each
            # worker and re-raises any exception it hit.
            with ThreadPoolExecutor() as executor:
                ar_future = executor.submit(process_ner, ar_ner, 'ar')
                en_future = executor.submit(process_ner, en_ner, 'en')

                ar_ner_res_list = ar_future.result()
                en_ner_res_list = en_future.result()

            logging.debug(f"Filtered Arabic NER: {ar_ner_res_list}")
            logging.debug(f"Filtered English NER: {en_ner_res_list}")

        ############################# Fuzzy Search for NER Entities #########################

        # Split the entities into words, dropping every occurrence of the
        # generic word 'محطة' before fuzzy matching.
        ner_split_flattened = [
            word
            for ele in ar_ner
            for word in remove_stopwords(ele).split()
            if word != 'محطة'
        ]

        ner_top_matches = get_matches(ner_split_flattened, all_translated_words, 5)
        if ner_top_matches:
            # Appends its findings to the two result lists in place.
            process_fuzzy_matches(ner_top_matches, ar_ner, en_ner_res_list, ar_ner_res_list)

        ############################# Final RAG and Fuzzy Result ############################

        # Merge all candidates as strings, dropping duplicates while keeping
        # first-seen order so the resulting prompt is deterministic.
        candidates = [*query_filter, *query_match_entities_list, *en_ner_res_list, *ar_ner_res_list]
        entities_list = list(dict.fromkeys(str(candidate) for candidate in candidates))
        logging.info(f"Final list of entities: {entities_list}")

        ############################# LLM Chain for Translation #############################

        result = translator_with_rag(CHAT_BISON, text, tgt, entities_list, translated_query)
        new_query = replace_words(result['translated_text'], replacements)
        logging.debug(f"Translation with adv translator with out replacer: {result}")
        logging.debug(f"Translation with replacer: {new_query}")

        translated_entities_used = result['entities']

        logging.info("Translation completed successfully.")
        return new_query, translated_entities_used

    except Exception as e:
        logging.error(f"An error occurred during translation: {e}", exc_info=True)
        return {"error": "Translation failed.", "details": str(e)}


def process_ner(ner, language):
    """
    Retrieve and filter entity-translation context for each named entity.

    Parameters:
    - ner: Iterable of entity strings.
    - language: Language tag supplied by the caller ('ar' or 'en'); not used by
      the lookup itself.

    Returns:
    - A flat list with the filtered retrieval results of all entities, in input
      order. Entities with no retrieval hit contribute nothing.
    """
    ner_res_list = []

    for ele in ner:
        ############# RAG #############
        ner_rag = entity_translate_retriever.get_relevant_documents(ele)
        ner_rag_list = list({doc.page_content for doc in ner_rag})  # Unique results

        ############# Filter #############
        if ner_rag_list:
            ner_filter = filter_context(ele, ner_rag_list, INTERPRETOR_MODEL, entity_query=False, entity=True)
            ner_res_list.extend(ner_filter)

    return ner_res_list


def process_fuzzy_matches(ner_top_matches, ar_ner, en_ner_res_list, ar_ner_res_list):
    """
    Map fuzzy-search hits for NER entities to translator entities.

    The hits are processed twice: once re-ranked with the cross encoder (keeping
    len(ar_ner) * 2 results) and once filtered with the interpreter model. Both
    result sets are mapped with map_fuzzy_to_entity_translator_context and
    appended, in place, to the two output lists. Any exception is logged and
    swallowed, in which case neither list is modified.

    Parameters:
    - ner_top_matches: The top matches from the fuzzy search.
    - ar_ner: Arabic NER results.
    - en_ner_res_list: List extended with the entities of the re-ranked matches.
    - ar_ner_res_list: List extended with the entities of the filtered matches.
    """
    try:
        reranked = re_rank_doc(ar_ner, ner_top_matches, CROSS_ENCODER_MODEL, len(ar_ner) * 2)
        reranked_entities = map_fuzzy_to_entity_translator_context(
            reranked, translated_texts_dict, entity_translate_map
        )

        filtered = filter_context(ar_ner, ner_top_matches, INTERPRETOR_MODEL, fuzzy_entity=True)
        filtered_entities = map_fuzzy_to_entity_translator_context(
            filtered, translated_texts_dict, entity_translate_map
        )

        # Mutate the caller's lists only after both lookups succeeded.
        en_ner_res_list.extend(reranked_entities)
        ar_ner_res_list.extend(filtered_entities)

        logging.debug("Fuzzy NER matches processed successfully.")

    except Exception as e:
        logging.error(f"Error processing fuzzy NER matches: {e}", exc_info=True)

# Agent

In [None]:
# Plain-text system instructions for the agent.
# NOTE(review): AGENT_PROMPT is rebound to a ChatPromptTemplate in a later cell
# whose system message does not contain this text, and the tool is registered
# as "user-question-tool" rather than "user_question_tool" — confirm which
# prompt is meant to reach the model.
AGENT_PROMPT = """You are a very powerful assistant that can answer questions. 

ALWAYS USE user_question_tool to answer fuel, fuel stations, station locations, transcations and bills related questions.

You can invoke the tool Calculator if you need to do mathematical operations.

Always use the tools to try to answer the questions. Use the chat history for context. Never try to use any other external information.

Assume that the user may write with misspellings, fix the spelling of the user before passing the question to any tool.

Don't mention what tool you have used in your answer.
"""

In [None]:
from langchain.tools import tool
from langchain.callbacks.tracers import ConsoleCallbackHandler
from pydantic import BaseModel, Field


class UserQuestionToolInput(BaseModel):
    """Argument schema for `user_question_tool`."""

    # Both fields are strings, matching the tool's own signature: `input`
    # carries free-text questions and user ids are string keys such as "user12".
    input: str = Field(description="User natural language questions to be answered.")
    user_id: str = Field(description="User Id")


def _raise_if_failed(stage_result):
    """Raise RuntimeError if a pipeline stage returned its {"error": ...} failure dict.

    translate_text_v2, sql_query_generator, interpret_data and translate_en_to_ar
    all report failure by returning such a dict instead of raising. Any other
    value is returned unchanged.
    """
    if isinstance(stage_result, dict) and "error" in stage_result:
        raise RuntimeError(str(stage_result.get("details") or stage_result["error"]))
    return stage_result


@tool("user-question-tool", args_schema= UserQuestionToolInput)
def user_question_tool(input: str, user_id: str ) -> str:
    """Answers natural language questions related to fuel, fuel stations, station locations, transcations and bills using data from database"""
    # NOTE: the docstring above is what the @tool decorator exposes as the tool
    # description, so it is kept verbatim; implementation notes live in comments.
    #
    # Pipeline: translate the question to English -> generate and run SQL ->
    # interpret the result -> translate the answer to Arabic -> store the
    # exchange in the user's memory. Every failure is logged and collapsed into
    # one generic error string so the agent always receives text.

    config = {'callbacks': [ConsoleCallbackHandler()]}

    try:
        logger.info(f"Using user_question_tool for user: {user_id}")
        logger.info(f"Using user_question_tool for user question: {input}")

        # Fetch user memory. user_memories is a defaultdict, so it must be
        # indexed: .get() bypasses the default factory and would return None
        # for every first-time user.
        user_memory = user_memories[user_id]
        memory = user_memory.buffer_as_str.strip()

        # Translate input to English
        try:
            # Check before unpacking: a two-key error dict would otherwise
            # unpack silently into its keys.
            translated_text, entities_list = _raise_if_failed(translate_text_v2(CHAT_BISON, input, "en"))
            logger.info(f"Translated text: {translated_text}, Entities: {entities_list}")
        except Exception as e:
            logger.error(f"Error during translation: {str(e)}")
            raise

        # Prepare query
        question = {"input": translated_text, "chat_history": memory, "entities": entities_list}

        # Generate SQL query
        try:
            query = _raise_if_failed(sql_query_generator(question, config))
            logger.info(f"Generated SQL query: {query}")
        except Exception as e:
            logger.error(f"Error in SQL query generation: {str(e)}")
            raise

        # Interpret query response
        try:
            query_response = _raise_if_failed(interpret_data(query, config))
            logger.info(f"Query response: {query_response}")
        except Exception as e:
            logger.error(f"Error interpreting query response: {str(e)}")
            raise

        # Translate query response back to Arabic
        try:
            translated_result = _raise_if_failed(translate_en_to_ar(query, query_response, entities_list, config))
            logger.info(f"Translated result: {translated_result}")
        except Exception as e:
            logger.error(f"Error translating result to Arabic: {str(e)}")
            raise

        answer = translated_result.strip()

        # Save conversation context
        try:
            user_memory.save_context({"input": translated_text}, {"output": answer})
        except Exception as e:
            logger.error(f"Error saving user memory: {str(e)}")
            raise

        return answer

    except Exception as e:
        logger.error(f"An error occurred in user_question_tool: {str(e)}")
        return "An error occurred while processing your request."



In [None]:
from collections import defaultdict
from langchain.memory import ConversationBufferWindowMemory

def _new_user_memory():
    """Create the conversation memory given to a user on first access (window of 50 exchanges)."""
    return ConversationBufferWindowMemory(memory_key="chat_history", k=50, return_messages=True)


# Per-user conversation memory, created lazily when a user id is first indexed.
user_memories = defaultdict(_new_user_memory)
# Per-user agent executors, filled by get_or_create_agent.
user_agents = dict()

In [None]:
from langchain_core.prompts import ChatPromptTemplate 
from langchain.agents import AgentExecutor, create_tool_calling_agent, tool


# Define the agent's system prompt
# Message order: system instructions, prior conversation ({chat_history}, the
# same key the per-user memories use as memory_key), the user's {input}, and
# the {agent_scratchpad} placeholder (presumably filled by the executor with
# intermediate tool calls — verify against the agent implementation).
# NOTE: this rebinds AGENT_PROMPT, replacing the plain-string prompt assigned
# to the same name in an earlier cell.
AGENT_PROMPT = ChatPromptTemplate.from_messages([
    ("system", "You are a very powerful assistant that can answer questions using postgres."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}")
])

# List of tools available for the agent
agent_tools = [math_tool, user_question_tool]

def get_or_create_agent(user_id):
    """
    Return the cached agent executor for `user_id`, creating it on first use.

    A new executor is built from AGENT_MODEL, agent_tools and AGENT_PROMPT,
    bound to the user's conversation memory, and stored in `user_agents`.

    Parameters:
    - user_id: Key identifying the user in `user_agents` and `user_memories`.

    Returns:
    - The AgentExecutor for this user.

    Raises:
    - Any exception raised while building the agent, after logging it.
    """
    try:
        # Check if the user agent already exists
        if user_id in user_agents:
            logging.info(f"Reusing existing agent for user_id: {user_id}")
            return user_agents[user_id]

        logging.info(f"Creating new agent for user_id: {user_id}")
        # Index (rather than .get) so the defaultdict creates the memory for a
        # first-time user; .get() bypasses the default factory and returns None.
        agent_memory = user_memories[user_id]

        # Create a new agent using create_tool_calling_agent
        agent = create_tool_calling_agent(
            llm=AGENT_MODEL,
            tools=agent_tools,
            prompt=AGENT_PROMPT
        )

        # Wrap the agent in an executor with settings like max iterations and verbose logging.
        # 'force' is the only early-stopping method supported by runnable-based
        # agents such as the tool-calling agent: with 'generate' the executor
        # raises ValueError once max_iterations is reached.
        agent_executor = AgentExecutor(
            agent=agent,
            tools=agent_tools,
            memory=agent_memory,
            max_iterations=5,
            early_stopping_method='force',
            verbose=True
        )

        # Store the agent executor for future use
        user_agents[user_id] = agent_executor
        logging.info(f"Agent successfully created for user_id: {user_id}")

        return agent_executor

    except Exception as e:
        logging.error(f"Failed to create or retrieve agent for user_id: {user_id}. Error: {e}")
        # Bare raise keeps the original traceback intact.
        raise


In [None]:
import logging

def process_user_query(user_id: str, query: str) -> str:
    """Process the user query and return the response.

    Parameters:
    - user_id: Identifier used to look up (or create) the user's agent.
    - query: The user's question, passed to the agent as "input".

    Returns:
    - The agent's "output" text; "No output returned." when the response has no
      "output" key; "No output found in response." when that key is None; or a
      generic error message when processing fails.

    Raises:
    - ValueError: if `user_id` or `query` is empty.
    """

    # Validate inputs
    if not user_id or not query:
        logging.error("Invalid input: user_id and query cannot be empty.")
        raise ValueError("Both user_id and query must be provided.")

    try:
        # Get or create an agent for the user
        agent = get_or_create_agent(user_id)
        logging.info(f"Agent created for user_id: {user_id}")

        # Process the user query (invoke replaces the deprecated direct call)
        response = agent.invoke({"input": query})
        logging.info(f"Query processed for user_id: {user_id}, query: {query}")

        # Extract the output from the response
        output = response.get('output', "No output returned.")

        if output is None:
            logging.warning(f"No output found in response for user_id: {user_id}.")
            return "No output found in response."

        return output

    except KeyError as e:
        logging.error(f"Key error while processing query: {str(e)}")
        return "Error processing your query. Please try again."

    except Exception as e:
        logging.error(f"An unexpected error occurred for user_id: {user_id}, query: {query}: {str(e)}")
        return "An unexpected error occurred while processing your query."


In [None]:
# Reset all per-user state so the next query starts with a fresh memory and a
# fresh agent. Only the last bare expression of a cell is displayed, so the
# output shows user_memories; the user_agents line produces no output.
user_memories.clear()
user_agents.clear()
user_agents
user_memories

# Query

In [None]:
user_id = "user12"  # Example user ID
query_list = ["كم مبيعات الموظفين في يوم 18 مايو سنة 2024", "ما هي مبيعات الموظفين بتاريخ 18 مايو 2024 ؟", "كم كانت مبيعات الموظفين في تاريخ 18/05/2024", "كم عدد الوقود الذي تم طلبه في اول أسبوع من شهر مايو", "ما مقدار الوقود المطلوب في الأسبوع الأول من مايو 2024 ؟", "كم عدد طلبات الوقود في الأسبوع الأول من شهر مايو", "ماهي مبيعات الموظفين في اول أسبوعين من شهر مايو سنة 2024", "ما هي مبيعات الموظفين في الأسابيع الاثنين الأولى من مايو 2024 ؟", "كم كانت مبيعات الموظفين في الأسبوعين الأولى من شهر مايو ", "عطني ملخص العمليات التي تمت في شهر مايو سنة 2024", "أعطني ملخصًا موجزًا للمعاملات التي تمت في مايو 2024", "اعطيني ملخص العمليات اللي صارت في شهر مايو", "اعطيني معلومات عن الخليج", "زودني بمعلومات عن الخليج", "عطني معلومات عن الخليج ", "اعطيني بعض المعلومات عن محطة سمرة", "هل يمكنك أن تعطيني بعض المعلومات عن محطة سمرة ؟", "تقدر تعطيني بعض الملعومات عن محطة سمرة", "ما هي المحطات التي تحتوي على دورة مياه ومحطة شحن وغسيل سيارات ؟", "ايش هي المحطات التي تحتوي على دورة مياه ومحطة شحن وغسيل سيارات ؟", "وش هي المحطات اللي فيها دورات مياه وشاحن كهربائي ومغسلة سيارات ؟", "مين وافق على طلب الوقود في يوم 19 شهر 5 سنة 2024", "من وافق على الوقود المطلوب في 19 مايو 2024 ؟", "من اعتمد طلبات الوقود في 19 مايو 2024 ؟", "كم عدد مبيعات الوقود في شهر 5 سنة 2024؟ اعطيني تراكمي المبيعات ", "ما هو إجمالي مبيعات الوقود لشهر مايو 2024 ؟ مبلغ المبيعات التراكمي.", "كم كان اجمالي مبيعات الوقود في شهر مايو 2024 ؟ عطني اجمالي قيمة المبيعات ", "كم عدد الحوالات المكتملة في يوم 25 شهر 5 سنة 2024؟", "كم عدد المعاملات التي تم الانتهاء منها في 25 مايو 2024 ؟", "كم كانت العمليات اللي تمت في 25 مايو سنة 2024", "اعطيني اجمالي الحولات في هذاك اليوم", "هل يمكنك إعطاء إجمالي عدد المعاملات لذلك اليوم ؟", "تقدر تعطيني اجمالي العمليات في ذاك اليوم ", "من هم أفضل ثلاثة موظفين من حيث المبيعات لشهر 5 سنة 2024 ؟ يرجى ذكر أسمائهم ومبالغ مبيعاتهم.", "مين افضل ثلاثة موظفين من حيث المبيعات لشهر مايو سنة 2024؟ اذكر أسمائهم ومبالغ مبيعاتهم", "مين اعلى ثلاثة عمال في المبيعات لشهر مايو سنة 2024؟ عطني أسمائهم مع المبيعات ", 
"ما مقدار الإيرادات التي تم تحقيقها من مبيعات الوقود في الأسبوع الثاني من مايو 2024 ؟ توفير إجمالي الإيرادات لتلك الفترة.", "كم عدد الإيرادات من مبيعات الوقود في الأسبوع الثاني من شهر خمسة سنة 2024؟ اعطيني اجمالي الإيرادات في ذيك الفترة", "كم كان دخل مبيعات الوقود في الأسبوع الثاني من شهر 5 سنة 2024؟ عطني اجمالي الدخل في هذيك الفترة ", "ماهو متوسط عدد العمليات في شهر خمسة سنة 2024؟ متوسط قيمة جميع العمليات للشهر.", "ما هو متوسط مبلغ الصفقة لشهر مايو 2024 ؟ تضمين متوسط قيمة جميع المعاملات للشهر.", "وش متوسط عدد العمليات في مايو 2024؟ اضف متوسط قيمة كل العمليات للشهر", "هل يمكنك تقديم العدد الإجمالي لطلبات الوقود المقدمة في مايو 2024 ؟ أحتاج إلى إحصاء جميع طلبات الوقود المقدمة في ذلك الشهر.", "اعطيني مجموع طلبات الوقود في شهر 5 سنة 2024؟ احتاج جميع طلبات الوقود في ذلك الشهر", "كم مجموع طلبات الوقود في شهر مايو 2024؟ ابي عدد كل الطلبات المعتمدة في هذا الشهر"]

# query = "Which stations have charging?"
# query = "ما هي المحطات التي لديها الشحن؟"
# query = "Which stations have restrooms?"
# query = "Can you give me name of 2 stations?"
# query = "what were the employee sales for the date 18th of May 2024?"
# query = "ما هي مبيعات الموظفين بتاريخ 18 مايو 2024؟"
# query = "what is the employee sales of the 1st 2 week of may 2024"
# query = "How much fuel was requested on 1st week of may 2024"
# query = "give me a brief summary on the transactions done in may 2024"
# query = "Who approved the fuel requested on 19th may 2024"
# query = "Tell me about the bill processed by Amal"
# query = "أخبريني عن الفاتورة التي عالجتها أمل"
# query = "What were the employee sales for the date 18th of May 2024?"
# query = "عطني ملخص العمليات التي تمت في شهر مايو سنة 2024"
# query = "كم مبيعات الموظفين في يوم 18 مايو سنة 2024"
# query = "ماهو متوسط عدد العمليات في شهر خمسة سنة 2024؟ متوسط قيمة جميع العمليات للشهر." # What is the average transaction amount for May 2024? Include the average value of all transactions for the month.
# query = "هل يمكنك أن تعطيني بعض المعلومات عن محطة سمرة ؟" # Give me information about samara station
# query =  "كم عدد المعاملات التي تم الانتهاء منها في 23 مايو 2024 ؟" # How many transactions were completed on May 25, 2024? 
# query = "2024 كم عدد طلبات الوقود في الأسبوع الأول من شهر مايو" # How much fuel was requested on the 1st week of May 2024?
# query = "no, give me average value of all transactions for the whole month, not only 18th"
# Active test question; the commented-out assignments above are alternatives.
# The reply is kept in `response` and is not displayed by this cell.
query = "ما هي مبيعات الموظفين بتاريخ 18 مايو 2024؟"  # "what were the employee sales for the date 18th of May 2024?"
response = process_user_query(user_id, query)

# Test

## LangChain set_llm_cache

In [None]:
# Instantiate the Vertex AI chat models used by the pipeline, each capped at
# 1024 output tokens. INTERPRETOR_MODEL and AGENT_MODEL pass no model_name and
# therefore use ChatVertexAI's default model.
CODE_BISON = ChatVertexAI(model_name="codechat-bison", max_output_tokens=1024)
INTERPRETOR_MODEL = ChatVertexAI(max_output_tokens=1024)
AGENT_MODEL = ChatVertexAI(max_output_tokens=1024)
CHAT_BISON = ChatVertexAI(model_name="chat-bison@002", max_output_tokens=1024)

In [7]:
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# Enable a process-wide, in-memory LLM response cache; the timing cells below
# compare a first call against a repeated call with the same prompt.
set_llm_cache(InMemoryCache())

In [13]:
%%time
# NOTE(review): the recorded wall time is sub-millisecond, so this prompt was
# presumably already cached by an earlier execution of this cell — confirm.
res = INTERPRETOR_MODEL.invoke("what is ML")
res

CPU times: user 727 µs, sys: 115 µs, total: 842 µs
Wall time: 729 µs


AIMessage(content=' Machine learning (ML) is a subfield of artificial intelligence (AI) that gives computers the ability to learn without being explicitly programmed. ML algorithms are able to learn from and make decisions or predictions based on data. These algorithms are typically trained on large datasets and can be used for a variety of tasks, such as image recognition, natural language processing, and predictive analytics.')

In [12]:
%%time
# Recorded output for this execution (count 12): 2.6 s wall time.
res = INTERPRETOR_MODEL.invoke("Tell me about ML")
res

CPU times: user 9.71 ms, sys: 856 µs, total: 10.6 ms
Wall time: 2.6 s


AIMessage(content=' Machine learning (ML) is a subfield of artificial intelligence (AI) that gives computers the ability to learn without being explicitly programmed. ML algorithms are able to learn from and make decisions or predictions based on data.\n\nThere are many different types of ML algorithms, each with its own strengths and weaknesses. Some of the most common types of ML algorithms include:\n\n* **Supervised learning:** In supervised learning, the ML algorithm is trained on a dataset of labeled data. This means that each data point in the dataset is associated with a known output. The ML algorithm learns to map input data to output data by finding patterns in the labeled data.\n* **Unsupervised learning:** In unsupervised learning, the ML algorithm is trained on a dataset of unlabeled data. This means that each data point in the dataset does not have a known output. The ML algorithm learns to find patterns in the unlabeled data without being explicitly told what to look for.

In [14]:
%%time
# Same prompt again (execution count 14): the recorded wall time is 869 µs and
# the returned content is identical to the earlier call.
res = INTERPRETOR_MODEL.invoke("Tell me about ML")
res

CPU times: user 847 µs, sys: 135 µs, total: 982 µs
Wall time: 869 µs


AIMessage(content=' Machine learning (ML) is a subfield of artificial intelligence (AI) that gives computers the ability to learn without being explicitly programmed. ML algorithms are able to learn from and make decisions or predictions based on data.\n\nThere are many different types of ML algorithms, each with its own strengths and weaknesses. Some of the most common types of ML algorithms include:\n\n* **Supervised learning:** In supervised learning, the ML algorithm is trained on a dataset of labeled data. This means that each data point in the dataset is associated with a known output. The ML algorithm learns to map input data to output data by finding patterns in the labeled data.\n* **Unsupervised learning:** In unsupervised learning, the ML algorithm is trained on a dataset of unlabeled data. This means that each data point in the dataset does not have a known output. The ML algorithm learns to find patterns in the unlabeled data without being explicitly told what to look for.

In [24]:
!pip install langchainhub
from langchain import hub

Collecting langchainhub
  Downloading langchainhub-0.1.21-py3-none-any.whl.metadata (659 bytes)
Collecting types-requests<3.0.0.0,>=2.31.0.2 (from langchainhub)
  Downloading types_requests-2.32.0.20240914-py3-none-any.whl.metadata (1.9 kB)
Collecting urllib3<3,>=1.21.1 (from requests<3,>=2->langchainhub)
  Downloading urllib3-2.2.3-py3-none-any.whl.metadata (6.5 kB)
Downloading langchainhub-0.1.21-py3-none-any.whl (5.2 kB)
Downloading types_requests-2.32.0.20240914-py3-none-any.whl (15 kB)
Downloading urllib3-2.2.3-py3-none-any.whl (126 kB)
Installing collected packages: urllib3, types-requests, langchainhub
  Attempting uninstall: urllib3
    Found existing installation: urllib3 1.26.19
    Uninstalling urllib3-1.26.19:
      Successfully uninstalled urllib3-1.26.19
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 2.5.0 requires urllib3<2.0.0, but you

In [25]:
# Pull the "hwchase17/react" prompt template from the LangChain Hub.
# NOTE(review): this call emits notices recommending the langsmith SDK's `pull_prompt`
# method instead; whether that method exists in the langsmith version pinned by this
# notebook needs to be confirmed before switching.
prompt = hub.pull("hwchase17/react")

Please use the `langsmith sdk` instead:
  pip install langsmith
Use the `pull_prompt` method.
  resp: str = client.pull(owner_repo_commit)
Please use the `langsmith sdk` instead:
  pip install langsmith
Use the `pull_prompt` method.
  res_dict = self.pull_repo(owner_repo_commit)


In [26]:
# Display the pulled PromptTemplate to inspect its text and input variables.
prompt

PromptTemplate(input_variables=['agent_scratchpad', 'input', 'tool_names', 'tools'], template='Answer the following questions as best you can. You have access to the following tools:\n\n{tools}\n\nUse the following format:\n\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n... (this Thought/Action/Action Input/Observation can repeat N times)\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n\nBegin!\n\nQuestion: {input}\nThought:{agent_scratchpad}')

## CONTEXT_PROMPT

In [None]:
# Prompt template that asks the model to rewrite the current question as a
# standalone question when it is a follow-up to the chat history.
# Placeholders: {question} (current user question) and {chat_history}.
CONTEXT_PROMPT = """You are a helpful assistant. Your task is to determine if the current user question is a follow-up to any previous questions.

Current question:
{question}

Chat history:
{chat_history}

If the current question is indeed a follow-up, please formulate a new question that highlights the current question while incorporating relevant context from the related question and its answer.
If the current question is not a  follow-up, return the current question as is.

New question:
"""

# The phrase " and its answer" in the prompt above is optional.

# Question adjusted for a possible follow-up.
# NOTE(review): CONTEXT_PROMPT is passed to the model as-is, so the {question} and
# {chat_history} placeholders are not substituted here — presumably the template
# should be filled (e.g. CONTEXT_PROMPT.format(question=..., chat_history=...)) first; confirm.
# NOTE(review): `adujusted_question` looks like a typo of `adjusted_question`; the name is
# kept because other cells may reference it.
adujusted_question = GEMINI_PRO(CONTEXT_PROMPT)

# Guard Rails

In [None]:
import jwt
import json
from jwt import DecodeError, ExpiredSignatureError

# Decoding JWT (without verification)
def decode_jwt(token):
    """Return the decoded contents of a JWT without verifying its signature.

    Args:
        token: The encoded JWT.

    Returns:
        Whatever ``jwt.decode`` yields for the token when signature
        verification is switched off, or — if decoding raises ``DecodeError`` —
        the string ``"Failed to decode token: <error>"``.
    """
    # Signature verification is deliberately disabled: this only inspects the token.
    unverified_options = {"verify_signature": False}
    try:
        return jwt.decode(token, options=unverified_options)
    except DecodeError as err:
        return f"Failed to decode token: {err}"


# Verifying JWT Signature
def verify_jwt(token, pub_key):
    """Verify an RS256-signed JWT and return its payload.

    Args:
        token: The encoded JWT.
        pub_key: Public key used to verify the RS256 signature.

    Returns:
        The verified payload on success; ``"Token has expired!"`` if the token
        is expired; ``"Invalid token signature!"`` if decoding/verification
        raises ``DecodeError``.
    """
    try:
        # Verifying the token with the public key (for RS256)
        verified = jwt.decode(token, pub_key, algorithms=["RS256"])
        # jwt.decode returns only the payload (a plain dict), which has no
        # `.header` attribute; the header must be read from the token itself.
        # The signature has already been checked above, so the header is trustworthy.
        header = jwt.get_unverified_header(token)
    except ExpiredSignatureError:
        return "Token has expired!"
    except DecodeError:
        return "Invalid token signature!"

    print("Verified JWT Payload: ", verified)
    print("Verified JWT Header: ", header)
    return verified

def process_user_query(token: str, query: str) -> str:
    """Process the user query and return the response.

    Args:
        token: Encoded JWT identifying the caller; must carry ``TenantID`` and
            ``user_id`` claims.
        query: The user's question, forwarded to the agent as ``{"input": query}``.

    Returns:
        The ``'output'`` entry of the agent's response.

    Raises:
        ValueError: If the token or query is empty, the decoded token is not a
            mapping containing ``TenantID`` and ``user_id``, or the agent
            response has no ``'output'`` key.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        if not token or not query:
            logging.error("Token or query is missing.")
            raise ValueError("Token and query are required.")

        # Decode JWT to get user data.
        # NOTE(review): decode_jwt does not verify the signature, so these claims
        # are unauthenticated — verify the token before using them for access control.
        user_data = decode_jwt(token)

        # decode_jwt returns an error *string* on failure; without the type check
        # the `in` tests below would silently become substring checks.
        if (
            not isinstance(user_data, dict)
            or 'TenantID' not in user_data
            or 'user_id' not in user_data
        ):
            logging.error("Invalid user data received from JWT.")
            raise ValueError("Invalid user data.")

        tenant_id = user_data['TenantID']
        user_id = user_data['user_id']

        logging.info(f"Processing query for user_id: {user_id} with tenant_id: {tenant_id}")

        # Get or create the agent for the user
        agent = get_or_create_agent(user_id)

        # Process the query using the agent
        response = agent({"input": query})

        if 'output' not in response:
            logging.error(f"No output found in response for user_id: {user_id}")
            raise ValueError("No output found in the response.")

        output = response['output']
        logging.info(f"Query processed successfully for user_id: {user_id}")

        return output

    except Exception as e:
        # The raw token is a credential and must never be written to the logs.
        logging.error(f"Error processing user query. Error: {e}")
        raise  # Re-raise the exception after logging
    

In [None]:
def calling_dummy_api(user_id):
    """Stub standing in for an API call; ignores ``user_id`` and returns None."""
    return None
    
def table_access_check(user_id=None):
    """Look up the tables a user is permitted to access.

    Args:
        user_id: Identifier passed to ``calling_dummy_api``. It is an explicit
            parameter because the function previously read a ``user_id`` name
            that was neither a parameter nor a local.

    Returns:
        Whatever ``calling_dummy_api(user_id)`` returns (currently None, since
        that function is a stub).
    """
    # call API
    permitted_tables = calling_dummy_api(user_id)

    # TODO: from RAG, get the table ids; if restricted ids are present, remove those tables.
    # TODO: in the generated SQL query, check whether a restricted table_id appears;
    #       if it does, regenerate the query with restricted_prompt.
    # TODO: get the id of the user from the tenant_id table and add a filter so that only
    #       the selected tenant and stations are selected.
    #       Maybe use a function that appends to the WHERE clause if one exists and
    #       otherwise adds a WHERE clause.
    # TODO: get the SQL result and make sure the labels are there (won't work if
    #       aggregations are performed), turn it into a DataFrame,
    #       and use it to filter out the ID.
    return permitted_tables
    

## Timer

In [None]:
import time

# Use perf_counter for elapsed-time measurement: it is monotonic and high-resolution,
# whereas time.time() is a wall clock that can jump if the system clock is adjusted.
start_time = time.perf_counter()

# Somefunc()
end_time = time.perf_counter()
print(f"Time taken by Somefunc : {end_time - start_time} seconds")
