Copyright 2024 - Forusone : shins777@gmail.com

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# Advanced RAG with Vertex AI Search as a grounding service.

* This notebook shows how to use Vertex AI Search as a grounding source for Gemini.
* Grounding overview : https://cloud.google.com/vertex-ai/generative-ai/docs/grounding/overview
* Grounding on private data with Vertex AI Search :
  * https://cloud.google.com/vertex-ai/generative-ai/docs/grounding/overview#ground-private

# Configuration
## Install python packages
* Vertex AI SDK for Python
  * https://cloud.google.com/python/docs/reference/aiplatform/latest


In [1]:
%pip install --upgrade --quiet google-cloud-aiplatform


In [2]:
%pip install --upgrade --quiet google-cloud-discoveryengine


In [3]:
from IPython.display import display, Markdown

## Authenticate to GCP and Google Drive

* Use OAuth to access the GCP environment.
  * See the authentication methods in GCP : https://cloud.google.com/docs/authentication?hl=ko

In [4]:
# Colab only: authenticate to get access to GCP.
import sys

if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

* Mount Google Drive to access the .ipynb files in the repository.

In [5]:
# Colab only: mount Google Drive to access its contents.

if "google.colab" in sys.modules:
  from google.colab import drive
  drive.mount('/content/drive')

Mounted at /content/drive


# Execute the example
## Set the environment on GCP Project
* Configure project information
  * Model name : name of the LLM to use : https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models
  * Project Id : project ID in GCP
  * Region : region name in GCP

In [6]:
MODEL_NAME="gemini-1.5-flash"
PROJECT_ID="ai-hangsik"
REGION="asia-northeast3"

### Vertex AI initialization
Configure Vertex AI and get access to the foundation model.
* Vertex AI initialization : vertexai.init(..)
  * https://cloud.google.com/python/docs/reference/aiplatform/latest#initialization

In [7]:
import vertexai
from vertexai.generative_models import GenerativeModel, Part, Tool
import vertexai.generative_models as generative_models

# Grounding service is still in preview.
from vertexai.preview.generative_models import grounding

# Initialize the Vertex AI execution environment.
vertexai.init(project=PROJECT_ID, location=REGION)

# Load the generative model.
model = GenerativeModel(MODEL_NAME)

### Vertex AI Search endpoint URL

In [8]:
import os
import time
import ast
import requests
import json
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from operator import is_not
from functools import partial

import vertexai
import google
import google.oauth2.credentials
import google.auth.transport.requests
from google.oauth2 import service_account


# Constant - Configuration
SEARCH_URL = "https://discoveryengine.googleapis.com/v1alpha/projects/721521243942/locations/global/collections/default_collection/dataStores/it-laws-ds_1713063479348/servingConfigs/default_search:search"
NUM_SEARCH = 2

# Access token for REST calls to Vertex AI Search, obtained from the gcloud CLI.
stream = os.popen('gcloud auth print-access-token')
CREDENTIAL_TOKEN = stream.read().strip()
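
The endpoint above is hard-coded to one project and one data store. As a minimal sketch, the same URL and the chunk-mode request body could be assembled from their parts. `build_search_url` and `build_chunk_query` are hypothetical helper names, not part of any Google SDK, and the path layout simply mirrors `SEARCH_URL` above.

```python
import json


def build_search_url(project_number: str, data_store_id: str,
                     location: str = "global", api_version: str = "v1alpha") -> str:
    """Assemble a search endpoint URL with the same layout as SEARCH_URL (hypothetical helper)."""
    return (
        f"https://discoveryengine.googleapis.com/{api_version}"
        f"/projects/{project_number}/locations/{location}"
        f"/collections/default_collection/dataStores/{data_store_id}"
        f"/servingConfigs/default_search:search"
    )


def build_chunk_query(question: str, num_results: int) -> bytes:
    """Build the UTF-8 encoded JSON body that the notebook sends in search_chunks (hypothetical helper)."""
    body = {
        "query": question,
        "page_size": str(num_results),
        "offset": 0,
        "contentSearchSpec": {
            "searchResultMode": "CHUNKS",
            # Ask for one neighbouring chunk on each side of every hit.
            "chunkSpec": {"numPreviousChunks": 1, "numNextChunks": 1},
        },
    }
    return json.dumps(body).encode("utf8")


# The values below are the ones that appear in SEARCH_URL in this notebook.
url = build_search_url("721521243942", "it-laws-ds_1713063479348")
payload = build_chunk_query("example question", 2)
```

Building the URL from parts makes it easier to point the same notebook at another data store without editing a long string by hand.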




## Overall Process for RAG

 1. Split a complex question into several simpler questions.
 2. Search for each question with Vertex AI Search.
 3. Verify the search results and keep only the contexts that are relevant to their question.
 4. Combine the verified contexts into the final context and request the final answer from the LLM.
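
The four steps can be sketched with stand-in functions before looking at the full implementation. This is an illustration only: `run_rag_pipeline` and the `fake_*` helpers are hypothetical names, and the stubs replace the real Gemini and Vertex AI Search calls so the flow can be run offline.

```python
from concurrent.futures import ThreadPoolExecutor


def run_rag_pipeline(question, split, search, verify, answer, max_workers=4):
    """Run split -> search -> verify -> answer with caller-supplied callables."""
    # Step 1: split the question, and also keep the original question.
    sub_questions = split(question) + [question]

    # Step 2: search every question in parallel.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        contexts = list(pool.map(search, sub_questions))

    # Step 3: verify each context against the question that produced it.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        keep = list(pool.map(verify, contexts, sub_questions))
    verified = [ctx for ctx, ok in zip(contexts, keep) if ok]

    # Step 4: build the final context and ask for the final answer.
    final_context = "\n".join(verified)
    return answer(question, final_context)


# Offline stand-ins for the LLM and search calls (assumptions, not real APIs).
def fake_split(q):
    return [part.strip() + "?" for part in q.rstrip("?").split(" and ")]

def fake_search(q):
    return f"context for: {q}"

def fake_verify(ctx, q):
    return "role" in q

def fake_answer(q, ctx):
    return ctx


result = run_rag_pipeline(
    "What is the committee and what is its role?",
    fake_split, fake_search, fake_verify, fake_answer,
)
print(result)
```

Passing the four steps in as callables keeps the control flow separate from the service calls, which also makes each step easy to replace or test on its own.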


In [9]:

def response(question:str, condition:dict ):
    """
    Controller to execute the RAG processes.
    """

    print(f"[Controller][response] Start response : {condition}")

    splitted_questions, contexts_list, verified_context_list, final_contexts, execution_stat = search(question, condition)

    t1 = time.time()

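    # English gloss of the Korean prompt below:
    #   You are an AI assistant that searches knowledge and gives advice.
    #   Answer the <Question> using only the content inside <Context>: reason step by step, then summarize.
    #   State the conclusion first, then give the reasons in as much detail as possible, with supporting evidence.
    #   Answer format: 1. Answer summary  2. Reasons or supporting references  3. Other considerations
    final_prompt = f"""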

    당신은 지식을 검색해서 상담해주는 AI 어시스턴트입니다.
    아래 <Question> 에 대한 답변을 할 때 반드시 <Context> 내에 있는 내용만을 참고해서 단계적으로 추론 후 요약해서 답변해주세요.
    답변할 때 결론을 먼저 언급하고, 그 뒤에 이유를 최대한 상세하게 정리해서 근거를 가지고 답해주세요.
    답변 포맷은 아래와 같이 해주세요.
    1. 답변 요약
    2. 이유 또는 참조 근거
    3. 기타 고려사항

    <Context>
        {final_contexts}

    </Context>

    <Question>
        {question}
    </Question>

    """

    # Gemini tends to answer more concisely as of Mar 12, 2024.
    final_outcome = call_gemini(final_prompt, MODEL_NAME)

    # print(f"[Controller][response] final_prompt : {final_prompt}")

    t2 = time.time()

    execution_stat['llm_request'] = round((t2-t1), 3)

    # print(f"[Controller][response] Final_outcome : {final_outcome}")
    print(f"[Controller][response] Elapsed time : {execution_stat}")

    if condition['detailed_return']:
        return splitted_questions, contexts_list, verified_context_list, final_prompt, final_outcome, execution_stat
    else:
        return final_outcome

#----------------------------------------------------------------------------------------------------------------

def search(question:str, condition:dict ):
    """
    Search for contexts and verify them before the final LLM call.

    1. Call flow for a mixed question:
        question_splitter --> search_chunks --> context_verifier
    2. Call flow for a singular question:
        search_chunks

    - question : user query.
    - condition['mixed_question'] : True for complex, composite questions.

    """

    mixed_question = condition['mixed_question']

    splitted_questions = None
    final_contexts = None

    t1 = time.time()

    if mixed_question:

        print(f"[Controller][search] Mixed Question Processing Start! : {question}")

        # Split a composite question that contains several questions in one sentence.
        splitted_questions = question_splitter(question)
        # Add original question
        splitted_questions.append(question)

        t2 = time.time()

        # Parallel processing to reduce the latency for the Vertex AI Search.
        with ThreadPoolExecutor(max_workers=10) as executor:
            searched_contexts = executor.map(search_chunks, splitted_questions )

        searched_list = [context for context in searched_contexts]

        print(f"[Controller][search] len(searched_list) : {len(searched_list)}")
        print(f"[Controller][search] searched_list : {searched_list}")

    else:

        print(f"[Controller][search] Simple Question Processing Start! : {question}")

        t2 = time.time()

        # A simple question is searched directly, without splitting.
        contexts = search_chunks(question )
        searched_list = [contexts]

        print(f"[Controller][search] len(searched_list) : {len(searched_list)}")
        print(f"[Controller][search] searched_list : {searched_list}")

    t3 = time.time()

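    # Drop searches that returned no results; they have no 'result' key.
    searched_list = [context for context in searched_list if context.get('result')]

    question_list = [context['result']['question'] for context in searched_list]

    print(f"[Controller][search] question_list : {question_list}")
    print(f"[Controller][search] searched_list : {searched_list}")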

    # If the question is not a mixed question, use the search results as they are.
    if not mixed_question:
        verified_context_list = searched_list
    # For a mixed question, run the verification step.
    else:
        # Verify each context returned by Vertex AI Search.
        with ThreadPoolExecutor(max_workers=10) as executor:
            verified_contexts = executor.map(context_verifier, searched_list, question_list)

        verified_context_list_org = [context for context in verified_contexts]
        verified_context_list = list(filter(partial(is_not, None), verified_context_list_org))

    print(f"[Controller][search] verified_contexts : {verified_context_list}")

    # Build the final context consolidated from verified contexts.
    final_contexts = ""
    for context in verified_context_list:
        if context is not None:
            final_contexts = final_contexts + "\n[content] : " + context['result']['content']

    # print(f"[Controller][search] final_contexts : {final_contexts}")

    t4 = time.time()

    execution_stat = {}
    execution_stat['time_question_splitter'] = round((t2-t1), 3)
    execution_stat['time_ai_search'] = round((t3-t2), 3)
    execution_stat['time_context_verifier'] = round((t4-t3), 3)

    execution_stat['num_total_searched'] = len(searched_list)
    execution_stat['num_verified_contexts'] = len(verified_context_list)

    #elapsed_time = f"question_splitter[{t2-t1}] : ai_search {t3-t2}] : context_verifier [{t4-t3}] : Total search time : {t4-t1} "
    print(f"[Controller][search] Elapsed time : {execution_stat}")
    #print(f"[Controller][search] Final_outcome : {final_contexts}")

    return splitted_questions, searched_list, verified_context_list, final_contexts, execution_stat


#----------------------------------------------------------------------------------------------------------------

def question_splitter(question :str )->list:

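    # English gloss of the Korean prompt below:
    #   You are a question generator for accurate search.
    #   To retrieve the facts needed to answer [Question], create 3 questions based on [Question].
    #   Answer strictly in this list format: ["question 1", "question 2", "question 3"]
    prompt = f"""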
        당신을 정확한 검색을 위한 질문 생성기 입니다.
        아래 [Question]에 답하기 위한 사실을 검색할 목적으로, [Question]을 기반으로 3가지 질문을 만들어 주세요.
        답변 형태는 반드시 아래와 같은 리스트 포맷으로 답해주세요.

        [Question] : {question}
        답변 포맷 : ["질문1", "질문2", "질문3"]

    """
    questions = call_gemini(prompt, MODEL_NAME)

    # Number of copies of the original question used as a fallback when splitting fails
    num_q = 2

    print(f"questions : {questions}")

    try:
        q_list = ast.literal_eval(questions)

    # If the model output is not a valid Python list literal, fall back to the original question.
    except Exception as e:
        print(f"[Controller][question_splitter] Splitting failed : {e}")
        q_list = [question for _ in range(num_q)]

    print(f"[Controller][question_splitter] Generated Question List : {q_list}")

    return q_list



#----------------------------------------------------------------------------------------------------------------


# https://github.com/shins777/google_gen_ai_sample/blob/main/notebook/03.grounding/ai_search/Vertex%20AI%20Search%20-%20Chunk%20REST.ipynb
def search_chunks(question:str)->dict:

    print(f"[Controller][search_chunks] Search Start! : {question}")

    # request = google.auth.transport.requests.Request()
    # Controller.credentials.refresh(request)

    headers = {
        "Authorization": "Bearer "+ CREDENTIAL_TOKEN,
        "Content-Type": "application/json"
    }

    query_dic ={
        "query": question,
        "page_size": str(NUM_SEARCH),
        "offset": 0,
        "contentSearchSpec":{
                "searchResultMode" : "CHUNKS",
                "chunkSpec" : {
                    "numPreviousChunks" : 1,
                    "numNextChunks" : 1
                }
        },
    }

    data = json.dumps(query_dic)
    data=data.encode("utf8")
    response = requests.post(SEARCH_URL,headers=headers, data=data)

    print(f"[Controller][search_chunks] Search Response len : {len(response.text)}")
    #print(f"[Controller][search_chunks] Search Response chunks : {response.text}")
    print(f"[Controller][search_chunks] Search End! : {question}")

    # Start to parse the searched chunks
    dict_results = json.loads(response.text)

    search_results = {}

    if dict_results.get('results'):

        for result in dict_results['results']:

            item = {}

            chunk = result['chunk']

            item['title'] = chunk['documentMetadata']['title']
            item['uri'] = chunk['documentMetadata']['uri']
            item['pageSpan'] = f"{chunk['pageSpan']['pageStart']} ~ {chunk['pageSpan']['pageEnd']}"
            item['content'] = chunk['content']
            item['question'] = question

            if 'chunkMetadata' in chunk:

                add_chunks = chunk['chunkMetadata']
                if 'previousChunks' in add_chunks:
                    # Chunks are listed starting from the one closest to the current content.
                    p_chunks = chunk['chunkMetadata']['previousChunks']
                    if p_chunks:
                        for p_chunk in p_chunks:
                            item['content'] = p_chunk['content'] +"\n"+ item['content']

                if 'nextChunks' in add_chunks:
                    n_chunks = chunk['chunkMetadata']['nextChunks']
                    if n_chunks:
                        for n_chunk in n_chunks:
                            item['content'] = item['content'] +"\n"+ n_chunk['content']

            # Note: each iteration overwrites 'result', so only the last returned chunk is kept.
            search_results['result'] = item

    return search_results


#----------------------------------------------------------------------------------------------------------------

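def context_verifier(context : dict, question : str)->dict:

    """
    Check whether a searched context is relevant to its question so that irrelevant contexts can be dropped.
    A smaller final context helps to decrease the latency of the final LLM call.
    Returns the context if it is relevant, otherwise None.
    """

    # English gloss of the Korean prompt below:
    #   You are an AI agent that judges whether <Context> is related to <Question>.
    #   Answer "Yes" if the <Context> is related to the <Question>, otherwise answer "No".
    prompt = f"""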
        당신은 <Question>의 내용을 바탕으로 <Context>의 내용이 관련이 있는 여부를 판단하는 AI Agent 입니다.
        아래 <Context>의 내용이 아래 <Question>과 관련이 있으면 "Yes" 라고 답하세요.
        만일, 아래 <Context>의 내용이 아래 <Question>과 관련이 없으면 "No"라고 답하세요.

        <Context>
            {context['result']['content']}
        </Context>

        <Question>
            {question}
        </Question>

    """


    result = call_gemini(prompt, MODEL_NAME)
    print(f"[Controller][context_verifier] Question : {question}, Verification Result :{result}")

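    # If the context is relevant to the question, return the context; otherwise return None.
    # The check ignores case and trailing text, so an answer such as "Yes." is also accepted.
    if result.strip().lower().startswith("yes"):
        return context

    return None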


#----------------------------------------------------------------------------------------------------------------


def call_gemini(prompt, gemini_model):

    gemini_model = GenerativeModel(gemini_model)

    generation_config = {
        "candidate_count": 1,
        "max_output_tokens": 8092,
        "temperature": 0.5,
        "top_p": 1,
        "top_k": 40
    }
    responses = gemini_model.generate_content(
        [prompt],
        generation_config = generation_config
    )

    print(f"[Controller][call_gemini] Final response Len {len(responses.text)}")

    return responses.text
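
`question_splitter` relies on the model returning a bare Python list literal, which is not guaranteed: a model may wrap the list in extra text. A minimal sketch of a more tolerant parser is shown below. `parse_question_list` is a hypothetical helper, not part of the notebook or of any SDK, and it still falls back to the original question when no usable list is found.

```python
import ast
import re


def parse_question_list(raw: str, fallback: str, expected: int = 3) -> list:
    """Extract a list of strings from model output, or repeat the fallback question."""
    # Take everything from the first '[' to the last ']' so surrounding text is ignored.
    match = re.search(r"\[.*\]", raw, flags=re.DOTALL)
    if match:
        try:
            parsed = ast.literal_eval(match.group(0))
        except (ValueError, SyntaxError):
            parsed = None
        # Accept only a non-empty list whose items are all strings.
        if isinstance(parsed, list) and parsed and all(isinstance(q, str) for q in parsed):
            return parsed
    return [fallback] * expected


good = parse_question_list('Sure, here you go:\n["q1", "q2", "q3"]\n', "original")
bad = parse_question_list("I could not split the question.", "original", expected=2)
print(good)
print(bad)
```

One limitation of this sketch: if the surrounding text itself contains square brackets, the greedy match may capture too much, and the function then returns the fallback list.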

## Execute RAG

In [10]:

from time import perf_counter

t1_start = perf_counter()

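# English gloss: "Please explain the Personal Information Protection Act."
#question = "개인정보 보호법에 대해서 설명해주세요."
# English gloss: "What kind of organization is the Personal Information Dispute Mediation Committee, and what is its role?"
question = "개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?"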

condition = {
    "mixed_question" : True,
    "detailed_return" : False,
}

if condition['detailed_return']:
    splitted_questions, contexts_list, verified_context_list, final_prompt, final_outcome, execution_stat = response(question, condition)

    print(f"\n\n")
    print(f"splitted_questions {splitted_questions}")
    print(f"contexts_list {contexts_list}")
    print(f"verified_context_list {verified_context_list}")
    print(f"final_prompt {final_prompt}")
    print(f"final_outcome {final_outcome}")
    print(f"execution_stat {execution_stat}")

else:
    final_outcome = response(question, condition)

    print(f"\n\n Final_outcome")
    display(Markdown(final_outcome))

t1_end  = perf_counter()
print(f"\n Total latency : {t1_end - t1_start} seconds\n\n")

[Controller][response] Start response : {'mixed_question': True, 'detailed_return': False}
[Controller][search] Mixed Question Processing Start! : 개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?
[Controller][call_gemini] Final response Len 152
questions : ["개인정보 분쟁조정위원회는 어떤 기관에 속해 있으며, 어떤 법률에 근거하여 설립되었나요?", "개인정보 분쟁조정위원회는 어떤 유형의 개인정보 분쟁을 조정할 수 있나요?", "개인정보 분쟁조정위원회의 조정 절차는 어떻게 진행되며, 조정 결과는 어떤 효력을 가지나요?"] 

[Controller][question_verifier] Generated Question List : ['개인정보 분쟁조정위원회는 어떤 기관에 속해 있으며, 어떤 법률에 근거하여 설립되었나요?', '개인정보 분쟁조정위원회는 어떤 유형의 개인정보 분쟁을 조정할 수 있나요?', '개인정보 분쟁조정위원회의 조정 절차는 어떻게 진행되며, 조정 결과는 어떤 효력을 가지나요?']
[Controller][search_chunks] Search Start! : 개인정보 분쟁조정위원회는 어떤 기관에 속해 있으며, 어떤 법률에 근거하여 설립되었나요?
[Controller][search_chunks] Search Start! : 개인정보 분쟁조정위원회는 어떤 유형의 개인정보 분쟁을 조정할 수 있나요?
[Controller][search_chunks] Search Start! : 개인정보 분쟁조정위원회의 조정 절차는 어떻게 진행되며, 조정 결과는 어떤 효력을 가지나요?
[Controller][search_chunks] Search Start! : 개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?
[Controller][search_chunks] Search Response len

1. 답변 요약: 개인정보 분쟁조정위원회는 개인정보보호법 위반으로 발생한 분쟁을 조정하는 기관입니다. 분쟁 당사자 간의 합의를 이끌어내고, 개인정보 침해에 대한 구제 조치를 권고하는 역할을 수행합니다.
2. 이유 또는 참조 근거: 
    - 개인정보보호법 제7장 '개인정보 분쟁조정위원회'에서 분쟁조정위원회의 역할을 명시하고 있습니다.
    - 제45조는 분쟁조정위원회가 분쟁 조정을 위해 필요한 자료를 분쟁 당사자에게 요청할 수 있다고 규정합니다.
    - 제47조는 분쟁조정위원회가 침해 행위 중지, 원상회복, 손해배상 등의 구제 조치를 포함하는 조정안을 작성할 수 있다고 명시합니다.
3. 기타 고려사항: 개인정보 분쟁조정위원회는 개인정보 침해로 인한 피해를 구제하고, 개인정보 보호를 위한 사회적 합의를 도출하는 데 중요한 역할을 합니다. 



 Total latency : 9.806196107999995 seconds


