# **BigQuery Q&A using LangChain & LLM (go/ask-bigquery)**


Demo: This notebook shows how to use an LLM to answer natural-language questions over a BigQuery table.
It has also been tested on large datasets (~2 million rows), where it responds with low latency.

Dataset: Fitbit public dataset from Kaggle (https://www.kaggle.com/datasets/arashnic/fitbit)

To use it on your own data, change the parameters in the "LLM Model Initialization & App parameters initialization" section.





|Author:       |Nikhil Rana (nikhilrana@)|
|--------------|---------|
|Last Updated: |5/30/2023|


Example input: What is the average number of steps taken by the user?

| Date     | Change Log  |
|----------|-------------|
|5/17/2023 |Added Gradio UI at the end |
|5/23/2023 |Changed setup to latest SDK|
|5/30/2023 |Modified Prompt template for metadata queries|
|5/31/2023 |Successfully tested on multiple SQL joins|
|5/31/2023 |Modified prompt template for casting data types for calculations|
|6/07/2023 |Updated SDKs to latest Langchain & SQLAlchemy|

### Install latest Vertex LLM SDK

In [1]:
# Authenticate with Google account
from google.colab import auth as google_auth
google_auth.authenticate_user()

In [None]:
#Check for latest available SDK
# !gsutil cp gs://vertex_sdk_llm_private_releases/SDK/google_cloud_aiplatform-1.23.0.llm.alpha.23.03.28-py2.py3-none-any.whl .
# !gsutil cp gs://vertex_sdk_llm_private_releases/SDK/google_cloud_aiplatform-1.25.dev20230502+language.models-py2.py3-none-any.whl .

!pip3 uninstall -y google-cloud-aiplatform
!pip install google-cloud-aiplatform --upgrade

In [3]:
# !pip install google_cloud_aiplatform-1.25.dev20230502+language.models-py2.py3-none-any.whl "shapely<2.0.0"

### Python libraries setup

In [None]:
# Install Python Libraries
!pip install chromadb --quiet
!pip install langchain==0.0.191 --quiet
!pip install google-cloud-core --quiet
!pip install gradio --quiet

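# Below libraries are required to build a SQL engine for BigQuery.
# SQLAlchemy is pinned below 2.0 because engine.execute() and MetaData(bind=...),
# both used later in this notebook, were removed in SQLAlchemy 2.0.
!pip install "SQLAlchemy<2.0" --quiet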
!pip install sqlalchemy-bigquery --quiet
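
Most of the packages above are installed without a pinned version, so it can help to record which versions were actually resolved before restarting the kernel. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is an assumption made for illustration and is not part of the original notebook.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names taken from the pip commands above.
PACKAGES = [
    "chromadb",
    "langchain",
    "google-cloud-core",
    "gradio",
    "SQLAlchemy",
    "sqlalchemy-bigquery",
]

report = installed_versions(PACKAGES)
for name, version in report.items():
    print(f"{name}: {version or 'not installed'}")
```

A `None` entry means pip did not install that distribution into the current environment.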

In [5]:
exit()

## **The `exit()` call above restarts the kernel, so run the cells below one at a time**

### LLM Model Initialization & App parameters initialization

In [4]:
# @title Specify Project details and location of the BQ table

project_id = "prueba"  # @param {type:"string"}
location = "us-central1"  # @param {type:"string"}
dataset_id = 'best' # @param {type:"string"}
table_name1 = 'best' # @param {type:"string"}
table_name2 = '' # @param {type:"string"}
table_name3 = '' # @param {type:"string"}

# table_names = (table_name1, table_name2, table_name3)
# The trailing comma is required: (table_name1) would be a plain string, not a tuple.
table_names = (table_name1,)
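
In Python, a parenthesized single value such as `("best")` is a plain string rather than a tuple, so iterating over it yields characters instead of table names. The sketch below shows one way to build the tuple while skipping form fields that were left empty; `collect_table_names` is a hypothetical helper and not part of the original notebook.

```python
from typing import Tuple


def collect_table_names(*names: str) -> Tuple[str, ...]:
    """Return the non-empty table names as a tuple, preserving their order."""
    return tuple(name for name in names if name)


# Same shape as the form fields above: one table set, two left empty.
example_names = collect_table_names("best", "", "")
print(example_names)

# A parenthesized single value is still a string, not a tuple.
print(type(("best")).__name__)
```

With this helper, adding a second or third table only requires filling in the corresponding form field.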

In [12]:
# @title Vertex AI LLM wrapper for using with Langchain
# Credits:
#  pmarlow@: go/vertex-on-langchain-source
# Note:
# - 04/19: Eventually this will be replaced by Langchain + Vertex AI integration

import vertexai
vertexai.init(project=project_id, location=location)

import time
from typing import Any, Mapping, List, Dict, Optional, Tuple

from pydantic import BaseModel, Extra, root_validator

from langchain.llms.base import LLM
from langchain.embeddings.base import Embeddings
from langchain.chat_models.base import BaseChatModel
from langchain.llms.utils import enforce_stop_tokens
from langchain.schema import Generation, LLMResult
from langchain.schema import AIMessage, BaseMessage, ChatGeneration, ChatResult, HumanMessage, SystemMessage

from vertexai.preview.language_models import TextGenerationModel, TextEmbeddingModel, ChatModel


def rate_limit(max_per_minute):
  period = 60 / max_per_minute
  while True:
    before = time.time()
    yield
    after = time.time()
    elapsed = after - before
    sleep_time = max(0, period - elapsed)
    if sleep_time > 0:
      print(f'Sleeping {sleep_time:.1f} seconds')
      time.sleep(sleep_time)


class _VertexCommon(BaseModel):
    """Wrapper around Vertex AI large language models.

    """
    client: Any = None #: :meta private:
    model_name: str = "text-bison@001"
    """Model name to use."""

    temperature: float = 0.2
    """What sampling temperature to use."""

    top_p: float = 0.8
    """Total probability mass of tokens to consider at each step."""

    top_k: int = 40
    """The number of highest probability tokens to keep for top-k filtering."""

    max_output_tokens: int = 200
    """The maximum number of tokens to generate in the completion."""

    @property
    def _default_params(self) -> Mapping[str, Any]:
        """Get the default parameters for calling Vertex AI API."""
        return {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
            "max_output_tokens": self.max_output_tokens
        }

    def _predict(self, prompt: str, stop: Optional[List[str]]) -> str:
        res = self.client.predict(prompt, **self._default_params)
        return self._enforce_stop_words(res.text, stop)

    def _enforce_stop_words(self, text: str, stop: Optional[List[str]]) -> str:
        if stop:
            return enforce_stop_tokens(text, stop)
        return text

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "vertex_ai"

class VertexLLM(_VertexCommon, LLM):
    model_name: str = "text-bison@001"

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the python package exists in environment."""

        try:
            values["client"] = TextGenerationModel.from_pretrained(values["model_name"])
        except AttributeError:
            raise ValueError(
                "Could not set Vertex Text Model client."
            )

        return values

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Call out to Vertex AI's create endpoint.

        Args:
            prompt: The prompt to pass into the model.

        Returns:
            The string generated by the model.
        """
        return self._predict(prompt, stop)


class _VertexChatCommon(_VertexCommon):
    """Wrapper around Vertex AI Chat large language models.


    """
    model_name: str = "chat-bison@001"
    """Model name to use."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the python package exists in environment."""


        try:
            values["client"] = ChatModel.from_pretrained(values["model_name"])
        except AttributeError:
            raise ValueError(
                "Could not set Vertex Chat Model client."
            )

        return values

    def _response_to_chat_results(
        self, response: object, stop: Optional[List[str]]
    ) -> ChatResult:
        text = self._enforce_stop_words(response.text, stop)
        return ChatResult(generations=[ChatGeneration(message=AIMessage(content=text))])

class VertexChat(_VertexChatCommon, BaseChatModel):
    """Wrapper around Vertex AI large language models."""

    def _generate(
        self, messages: List[BaseMessage], stop: Optional[List[str]] = None
    ) -> ChatResult:
        chat, prompt = self._start_chat(messages)
        response = chat.send_message(prompt)
        return self._response_to_chat_results(response, stop=stop)

    def _start_chat(
        self, messages: List[BaseMessage]
    ) -> Tuple[object, str]:
        """Start a chat.

        Expects either one HumanMessage, or two messages (a SystemMessage
        followed by a HumanMessage). If two messages are provided, the first
        one is used as the chat context.

        Args:
            messages: a list of BaseMessage.
        Returns:
            A tuple of the initialized Vertex AI chat session and the prompt to send to it.
        """
        if len(messages) == 1:
            message = messages[0]
            if not isinstance(message, HumanMessage):
                raise ValueError("Message should be from a human if it's the first one.")
            context, prompt = None, message.content
        elif len(messages) == 2:
            first_message, second_message = messages[0], messages[1]
            if not isinstance(first_message, SystemMessage):
                raise ValueError(
                    "The first message should be a system message if there are two of them."
                )
            if not isinstance(second_message, HumanMessage):
                raise ValueError("The second message should be from a human.")
            context, prompt = first_message.content, second_message.content
        else:
            raise ValueError(f"Chat model expects only one or two messages. Received {len(messages)}")
        chat = self.client.start_chat(context=context, **self._default_params)
        return chat, prompt

    async def _agenerate(
        self, messages: List[BaseMessage], stop: Optional[List[str]] = None
    ) -> ChatResult:
        raise NotImplementedError(
            """Vertex AI doesn't support async requests at the moment."""
        )


class VertexMultiTurnChat(_VertexChatCommon, BaseChatModel):
    """Wrapper around Vertex AI large language models."""

    chat: Optional[object] = None

    def clear_chat(self) -> None:
        self.chat = None

    def start_chat(self, message: Optional[SystemMessage] = None) -> None:
        if self.chat:
            raise ValueError("Chat has already been started. Please, clear it first.")
        if message and not isinstance(message, SystemMessage):
            raise ValueError("Context should be a system message")
        context = message.content if message else None
        self.chat = self.client.start_chat(context=context, **self._default_params)

    @property
    def history(self) -> List[Tuple[str]]:
        """Chat history."""
        if self.chat:
            return self.chat._history
        return []

    def _generate(
        self, messages: List[BaseMessage], stop: Optional[List[str]] = None
    ) -> ChatResult:
        if len(messages) != 1:
            raise ValueError(
                "You should send exactly one message to the chat each turn."
            )
        if not self.chat:
            raise ValueError("You should start_chat first!")
        response = self.chat.send_message(messages[0].content)
        return self._response_to_chat_results(response, stop=stop)

    async def _agenerate(
        self, messages: List[BaseMessage], stop: Optional[List[str]] = None
    ) -> ChatResult:
        raise NotImplementedError(
            """Vertex AI doesn't support async requests at the moment."""
        )

class VertexEmbeddings(Embeddings, BaseModel):
    """Wrapper around Vertex AI large language models embeddings API.
    """
    model_name: str = "textembedding-gecko@001"
    """Model name to use."""

    model: Any
    requests_per_minute: int = 15


    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the python package exists in environment."""

        try:
            values["model"] = TextEmbeddingModel

        except AttributeError:
            raise ValueError(
                "Could not set Vertex Text Embedding Model client."
            )

        return values

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
      """Call Vertex LLM embedding endpoint for embedding docs
      Args:
          texts: The list of texts to embed.
      Returns:
          List of embeddings, one for each text.
      """
      self.model = self.model.from_pretrained(self.model_name)

      limiter = rate_limit(self.requests_per_minute)
      results = []
      docs = list(texts)

      while docs:
        # Working in batches of 2 because the API apparently won't let
        # us send more than 2 documents per request to get embeddings.
        head, docs = docs[:2], docs[2:]
        # print(f'Sending embedding request for: {head!r}')
        chunk = self.model.get_embeddings(head)
        results.extend(chunk)
        next(limiter)

      return [r.values for r in results]

    def embed_query(self, text: str) -> List[float]:
      """Call Vertex LLM embedding endpoint for embedding query text.
      Args:
        text: The text to embed.
      Returns:
        Embedding for the text.
      """
      single_result = self.embed_documents([text])
      return single_result[0]

#Initialize a model
llm = VertexLLM(
    model_name='text-bison@001',
    max_output_tokens=256,
    temperature=0,
    top_p=0.8,
    top_k=40,
    verbose=True,
)
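
The embedding wrapper above sends texts in batches of two and paces its requests with the `rate_limit` generator. The sketch below isolates that loop so it can be run without Vertex AI. `embed_in_batches` and `fake_embed_batch` are hypothetical stand-ins introduced here for illustration; the fake simply returns a one-element vector holding each text's length.

```python
import time
from typing import Callable, Iterator, List, Sequence


def rate_limit(max_per_minute: float) -> Iterator[None]:
    """Generator that sleeps so consecutive resumptions are at least one period apart."""
    period = 60 / max_per_minute
    while True:
        before = time.time()
        yield
        elapsed = time.time() - before
        time.sleep(max(0.0, period - elapsed))


def embed_in_batches(
    texts: Sequence[str],
    embed_batch: Callable[[Sequence[str]], List[List[float]]],
    batch_size: int = 2,
    requests_per_minute: float = 15,
) -> List[List[float]]:
    """Call embed_batch on consecutive slices of texts, pacing the calls."""
    limiter = rate_limit(requests_per_minute)
    results: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        results.extend(embed_batch(texts[start:start + batch_size]))
        # The first next() only advances the generator to its first yield,
        # so pauses begin after the second request.
        next(limiter)
    return results


def fake_embed_batch(batch: Sequence[str]) -> List[List[float]]:
    """Hypothetical stand-in for the embedding API call."""
    return [[float(len(text))] for text in batch]


# A high request rate keeps the demo fast; the notebook uses 15 per minute.
vectors = embed_in_batches(["a", "bb", "ccc"], fake_embed_batch, requests_per_minute=6000)
print(vectors)
```

Passing the embedding call in as a function keeps the batching and pacing logic testable without network access.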

### Create SQL engine for BigQuery

In [1]:
from sqlalchemy import MetaData
from sqlalchemy.engine import create_engine
import pandas as pd

In [5]:
table_uri = f"bigquery://{project_id}/{dataset_id}"
engine = create_engine(table_uri)

In [None]:
#Smoke test: fetch a single row from the first table
query=f"""SELECT * FROM `{project_id}.{dataset_id}.{table_name1}` LIMIT 1"""
engine.execute(query).first()
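
The smoke-test query above is built by interpolating the project, dataset, and table names into a SQL string. The sketch below wraps that step in a function that quotes the full path in backticks and rejects unexpected characters. The function name `smoke_test_query` and the conservative identifier pattern are assumptions made for illustration; the pattern is deliberately stricter than BigQuery's actual naming rules.

```python
import re

# Conservative assumption: letters, digits, underscores and hyphens only.
_IDENTIFIER = re.compile(r"^[A-Za-z0-9_\-]+$")


def smoke_test_query(project_id: str, dataset_id: str, table_name: str) -> str:
    """Build a one-row SELECT against a fully qualified, backtick-quoted table."""
    for part in (project_id, dataset_id, table_name):
        if not _IDENTIFIER.match(part):
            raise ValueError(f"Unexpected characters in identifier: {part!r}")
    return f"SELECT * FROM `{project_id}.{dataset_id}.{table_name}` LIMIT 1"


print(smoke_test_query("prueba", "best", "best"))
```

Quoting the whole path in backticks keeps the query valid when a project ID contains characters such as hyphens.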

### SQL Chain setup for LLM

In [16]:
from langchain import SQLDatabase, SQLDatabaseChain
from langchain.prompts.prompt import PromptTemplate

def bq_qna(question):
  #create SQLDatabase instance from BQ engine
  # db = SQLDatabase(engine=engine,metadata=MetaData(bind=engine),include_tables=[x for x in table_names])
  db = SQLDatabase(engine=engine,metadata=MetaData(bind=engine),include_tables=[table_name1])
  #create SQL DB Chain with the initialized LLM and above SQLDB instance
  db_chain = SQLDatabaseChain.from_llm(llm, db, verbose=True, return_intermediate_steps=True)

  #Define prompt for BigQuery SQL
  _googlesql_prompt = """You are a GoogleSQL expert. Given an input question, first create a syntactically correct GoogleSQL query to run, then look at the results of the query and return the answer to the input question.
  Unless the user specifies in the question a specific number of examples to obtain, query for at most {top_k} results using the LIMIT clause as per GoogleSQL. You can order the results to return the most informative data in the database.
  Never query for all columns from a table. You must query only the columns that are needed to answer the question. Wrap each column name in backticks (`) to denote them as delimited identifiers.
  Pay attention to use only the column names you can see in the tables below. Be careful to not query for columns that do not exist. Also, pay attention to which column is in which table.
  Use the following format:
  Question: "Question here"
  SQLQuery: "SQL Query to run"
  SQLResult: "Result of the SQLQuery"
  Answer: "Final answer here"
  Only use the following tables:
  {table_info}

  If someone asks for aggregation on a STRING data type column, then CAST column as NUMERIC before you do the aggregation.

  If someone asks for specific month, use ActivityDate between current month's start date and current month's end date

  If someone asks for column names in the table, use the following format:
  SELECT column_name
  FROM `{project_id}.{dataset_id}`.INFORMATION_SCHEMA.COLUMNS
  WHERE table_name in {table_info}

  Question: {input}"""

  GOOGLESQL_PROMPT = PromptTemplate(
      input_variables=["input", "table_info", "top_k", "project_id", "dataset_id"],
      template=_googlesql_prompt,
  )

  #passing question to the prompt template
  final_prompt = GOOGLESQL_PROMPT.format(
      input=question,
      project_id=project_id,
      dataset_id=dataset_id,
      table_info=table_names,
      top_k=10000,
  )

  #pass final prompt to SQL Chain
  output = db_chain(final_prompt)


  # Return the final answer and the SQL query generated by the LLM
  return output['result'], output['intermediate_steps'][1]
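
`bq_qna` fills the prompt template with the question, the table names, and the row limit before handing it to the chain. The sketch below illustrates that substitution step with plain `str.format` standing in for `PromptTemplate.format`. The shortened template and the helper name `render_prompt` are assumptions made for illustration, not the notebook's actual prompt.

```python
# Abbreviated, hypothetical template using the same placeholder names as above.
TEMPLATE = (
    "Only use the following tables:\n"
    "{table_info}\n"
    "Query for at most {top_k} results.\n"
    "Question: {input}"
)


def render_prompt(template: str, **variables: object) -> str:
    """Fill the template; str.format raises KeyError if a placeholder is missing."""
    return template.format(**variables)


rendered = render_prompt(
    TEMPLATE,
    table_info="best",
    top_k=10,
    input="How many rows are there?",
)
print(rendered)
```

Printing the rendered prompt before sending it to the model is a quick way to confirm that every placeholder received the intended value.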


### Testing the setup

In [None]:
#Testing 1
bq_qna('Show me the top 10 products with category_2 equal to Clothing with the best ranking')

In [None]:
#Testing 2
bq_qna('Show me the top 10 products with category_2 equal to Clothing with the best ranking, include price_range and the image_url')

In [None]:
#Testing 3 - Highest-priced product
bq_qna('What is the name of the product with the highest price?')

In [None]:
#Testing 4 - Highest-priced product with price range
bq_qna('What is the name of the product with the highest price? Include the minimum and maximum price.')

In [None]:
#Testing 5 - Lookup by product name
bq_qna("""Show me the URL of the product named Velilla Mono""")

In [None]:
#Testing 6 - Lookup by brand and date
bq_qna("""Show me the URL of the products whose brand is exactly Velilla with date 8 Aug 2023""")
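
Each test cell above calls `bq_qna` once, and the function returns a pair: the final answer and the SQL generated by the LLM. The sketch below runs a list of questions through any such function and collects the results. `run_questions` and `fake_ask` are hypothetical names introduced for illustration; in the notebook, `bq_qna` would be passed as the `ask` argument.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def run_questions(
    ask: Callable[[str], Tuple[str, str]],
    questions: Iterable[str],
) -> List[Dict[str, str]]:
    """Ask each question and keep the answer together with the generated SQL."""
    rows = []
    for question in questions:
        answer, sql = ask(question)
        rows.append({"question": question, "answer": answer, "sql": sql})
    return rows


def fake_ask(question: str) -> Tuple[str, str]:
    """Hypothetical stand-in for bq_qna so the sketch runs without BigQuery."""
    return f"answer to: {question}", "SELECT 1"


for row in run_questions(fake_ask, ["first question", "second question"]):
    print(row["question"], "->", row["sql"])
```

Keeping the question, answer, and SQL together makes it easier to review which generated queries need prompt adjustments.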

### UI for Demo

In [None]:
import gradio as gr

with gr.Blocks() as demo:
    gr.Markdown(
    """
    ## Ask BigQuery

    This demo showcases answering questions over tabular data in BigQuery using the Vertex AI PaLM LLM and LangChain.

    This demo uses a sample public dataset from Kaggle (https://www.kaggle.com/datasets/arashnic/fitbit)

    ### Sample Inputs:
    1. Show me the top 10 products with category_2 equal to Clothing with the best ranking, include price_range and the image_url
    2. What is the name of the product with the highest price? Include the minimum and maximum price.
    3. Show me the URL of the products whose brand is exactly Velilla with date 8 Aug 2023

    ### Enter a search query...

    """)
    with gr.Row():
      with gr.Column():
        input_text = gr.Textbox(label="Question", placeholder="Show me the top 10 products with category_2 equal to Clothing with the best ranking")

    with gr.Row():
      generate = gr.Button("Ask BigQuery")

    with gr.Row():
      label2 = gr.Textbox(label="Output")
    with gr.Row():
      label3 = gr.Textbox(label="SQL query generated by LLM")

    generate.click(bq_qna,input_text, [label2, label3])
demo.launch(share=False, debug=False)
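
The button above calls `bq_qna` directly, so an empty question or a failed query surfaces as an error in the UI. The sketch below shows one possible wrapper that always returns two strings, one for each output textbox. `make_safe_handler` is a hypothetical helper introduced for illustration, and the demo uses a stand-in function instead of `bq_qna`.

```python
from typing import Callable, Tuple


def make_safe_handler(
    ask: Callable[[str], Tuple[str, str]],
) -> Callable[[str], Tuple[str, str]]:
    """Wrap a question-answering function so it always returns (answer, sql) strings."""

    def handler(question: str) -> Tuple[str, str]:
        if not question or not question.strip():
            return "Please enter a question.", ""
        try:
            answer, sql = ask(question.strip())
        except Exception as exc:  # show the failure in the output box instead of raising
            return f"Error: {exc}", ""
        return str(answer), str(sql)

    return handler


def failing_ask(question: str) -> Tuple[str, str]:
    """Hypothetical stand-in that simulates a failed query."""
    raise RuntimeError("query failed")


print(make_safe_handler(failing_ask)("How many rows are there?"))
```

In the notebook, the wrapper could be wired in with `generate.click(make_safe_handler(bq_qna), input_text, [label2, label3])`.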