Copyright 2024 - Forusone : shins777@gmail.com

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# Advanced RAG with Vertex AI Search as a grounding service.

* This notebook explains how to use grounding service in Gemini Pro.
* Refer to https://cloud.google.com/vertex-ai/generative-ai/docs/grounding/overview
* Using Vertex AI Search :
  * https://cloud.google.com/vertex-ai/generative-ai/docs/grounding/overview#ground-private

# Configuration
## Install python packages
* Vertex AI SDK for Python
  * https://cloud.google.com/python/docs/reference/aiplatform/latest


In [1]:
%pip install --upgrade --quiet google-cloud-aiplatform google-cloud-discoveryengine

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from IPython.display import display, Markdown

## Authentication to access to the GCP & Google drive

* Use OAuth to access the GCP environment.
 * Refer to the authentication methods in GCP : https://cloud.google.com/docs/authentication?hl=ko

In [66]:
#  For only colab to authenticate to get an access to the GCP.
import sys

if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

    # To access contents in Google drive
    # from google.colab import drive
    # drive.mount('/content/drive')

## Set the environment on GCP Project

In [67]:
MODEL_NAME="gemini-1.5-flash"
PROJECT_ID="ai-hangsik"
REGION="asia-northeast3"



### Vertex AI initialization
Configure Vertex AI and access to the foundation model.
* Vertex AI initialization : aiplatform.init(..)
  * https://cloud.google.com/python/docs/reference/aiplatform/latest#initialization

In [68]:
import os
import time
import ast
import requests
import json
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from operator import is_not
from functools import partial

import google
import google.oauth2.credentials
import google.auth.transport.requests
from google.oauth2 import service_account

import vertexai
from vertexai.generative_models import (
    GenerationConfig,
    GenerativeModel,
    HarmBlockThreshold,
    HarmCategory,
    Part,
)

# Initalizate the current vertex AI execution environment.
vertexai.init(project=PROJECT_ID, location=REGION)


### Ranking service

In [110]:
#--------[ Ranking service ]---------------

from google.cloud import discoveryengine_v1alpha as discoveryengine
re_rank_client = discoveryengine.RankServiceClient()

ranking_config = re_rank_client.ranking_config_path(
    project=PROJECT_ID,
    location="global",
    ranking_config="default_ranking_config",
)

RANKER_MODEL = f"semantic-ranker-512@latest"
RANK_SCORE = 0.3

### Vertex AI Search end point URL

In [103]:


# Constant - Configuration
SEARCH_URL = "https://discoveryengine.googleapis.com/v1alpha/projects/721521243942/locations/global/collections/default_collection/dataStores/it-laws-ds_1713063479348/servingConfigs/default_search:search"
NUM_SEARCH = 2
NUM_QUESTION = 3

# Credential token for REST call to Vertex AI
stream = os.popen('gcloud auth print-access-token')
CREDENTIAL_TOKEN = stream.read().strip()




## Process for RAG

 1. Classify complex questions into several.
 2. Search each question through Vertex AI Search.
 3. Only the necessary context is organized through the verification process of the search results.
 4. Configure only the verified context as the final context and request the final result from LLM.


In [111]:

def rag_process(question:str, condition:dict, top_n: int = 5 ):
    """
    Controller to execute the RAG processes.
    """

    print(f"[rag_process] start rag_process : {question}")

    t1 = time.perf_counter()

    rewrited_questions = rewrite_question(question)
    rewrited_questions.append(question) # Add original question
    print(f"[rag_process] rewrited_questions : {rewrited_questions}")

    t2 = time.perf_counter()

    searched_list = search_contexts(rewrited_questions)
    print(f"[rag_process] searched_list : {searched_list}")

    t3 = time.perf_counter()

    ranked_results = ranking_results(question, searched_list, top_n)
    print(f"[rag_process] ranked_results : {ranked_results}")

    t4 = time.perf_counter()

    final_contexts = "\n Searched Context : ".join(ranked_results)

    final_prompt = f"""
      You are an AI assistant that searches for knowledge and provides some advice.
      When answering the <Question> below, please refer only to the contents within <Context>, infer step by step, summarize, and answer.

      <Context>{final_contexts}</Context>
      <Question>{question}</Question>
    """

    final_outcome = gemini_response(final_prompt)
    t5 = time.perf_counter()

    execution_stat['query_rewrite'] = round((t2-t1), 3)
    execution_stat['vertex_ai_search'] = round((t3-t2), 3)
    execution_stat['ranked_results'] = round((t4-t3), 3)
    execution_stat['llm_request'] = round((t5-t4), 3)

    return final_outcome, execution_stat

#----------------------------------------------------------------------------------------------------------------

def rewrite_question(question:str )->list:

  prompt = f"""
    This is a question generator for your precise search.
    In order to search for facts to answer the [Question] below, please create 3 questions based on [Question].
    Your answers must be in the list format below.

      [Question] : {question}
      Format : ["Question 1", "Question 2", "Question 3"]

  """
  questions = gemini_response(prompt)
  print(f"[Controller][rewrite_question] questions : {questions}")

  q_list = []

  try:
      q_list = ast.literal_eval(questions)

  # Handling for exception when splitting mixed question.
  except Exception as e:
      print(f"[Controller][rewrite_question] Query rewrite failed")
      for i in range(NUM_QUESTION):
          q_list.append(question)

  print(f"[Controller][rewrite_question] Generated Question List : {q_list}")

  return q_list


#----------------------------------------------------------------------------------------------------------------

def search_contexts(rewrited_questions):
    """
    Controller to execute the RAG processes.

    1. Call flow for mixed question:
        question_splitter --> search_chunks
    2. Call flow for singuar question:
        search_chunks
    """

    # t1 = time.perf_counter()

    # Parallel processing to reduce the latency for the Vertex AI Search.
    with ThreadPoolExecutor(max_workers=10) as executor:
        searched_contexts = executor.map(search_parsing, rewrited_questions )

    searched_list = [context for context in searched_contexts]

    print(f"[Controller][search] len(searched_list) : {len(searched_list)}")
    print(f"[Controller][search] searched_list : {searched_list}")

    return searched_list



#----------------------------------------------------------------------------------------------

def search_parsing(question:str)->str:
    print(f"[Controller][search_parsing] search_parsing Start! : {question}")

    #------- Searching --------------------------------------------------------------------
    # request = google.auth.transport.requests.Request()
    # Controller.credentials.refresh(request)

    headers = {
        "Authorization": "Bearer "+ CREDENTIAL_TOKEN,
        "Content-Type": "application/json"
    }

    query_dic ={
        "query": question,
        "page_size": str(NUM_SEARCH),
        "offset": 0,
        "contentSearchSpec":{
                "searchResultMode" : "CHUNKS",
                "chunkSpec" : {
                    "numPreviousChunks" : 1,
                    "numNextChunks" : 1
                }
        },
    }

    data = json.dumps(query_dic)
    data=data.encode("utf8")
    response = requests.post(SEARCH_URL,headers=headers, data=data)

    print(f"[Controller][search_parsing] Search Response len : {len(response.text)}")
    #print(f"[Controller][search_parsing] Search Response chunks : {response.text}")
    print(f"[Controller][search_parsing] Search End! : {question}")

    # Start to parse the searched chunks
    dict_results = json.loads(response.text)

    #------- Parsing --------------------------------------------------------------------

    search_results = {}

    if dict_results.get('results'):
        for result in dict_results['results']:
            item = {}
            chunk = result['chunk']
            item['title'] = chunk['documentMetadata']['title']
            item['uri'] = chunk['documentMetadata']['uri']
            item['pageSpan'] = f"{chunk['pageSpan']['pageStart']} ~ {chunk['pageSpan']['pageEnd']}"
            item['content'] = chunk['content']
            item['question'] = question

            if 'chunkMetadata' in chunk:
                add_chunks = chunk['chunkMetadata']
                if 'previousChunks' in add_chunks:
                    # Chunks appearing from those closest to the current Contents.
                    p_chunks = chunk['chunkMetadata']['previousChunks']
                    if p_chunks:
                        for p_chunk in p_chunks:
                            item['content'] = p_chunk['content'] +"\n"+ item['content']

                if 'nextChunks' in add_chunks:
                    n_chunks = chunk['chunkMetadata']['nextChunks']
                    if n_chunks:
                        for n_chunk in n_chunks:
                            item['content'] = item['content'] +"\n"+ n_chunk['content']

            search_results['result'] = item

    return search_results

#----------------------------------------------------------------------------------------------------------------
def ranking_results(query, search_results, top_n):

    records = []

    for index, response in enumerate(search_results):

      title = response['result']['title']
      content = response['result']['content']
      records.append(discoveryengine.RankingRecord(id=str(index), title=title, content=content))

    request = discoveryengine.RankRequest(
        ranking_config = ranking_config,
        model = RANKER_MODEL,
        top_n = top_n,
        query = query,
        records = records
      )

    ranked_response = re_rank_client.rank(request=request,)

    ranked_res_list = []

    for record in ranked_response.records:  # https://cloud.google.com/generative-ai-app-builder/docs/reference/rpc/google.cloud.discoveryengine.v1alpha#rankresponse

      if record.score > RANK_SCORE:
        print(f"ranking score > {RANK_SCORE} : {record.score}")
        print(f"Ranked result : [{record.score}] : {record.content}")
        ranked_res_list.append(record.content)
      else:
        print(f"ranking score < {RANK_SCORE} : {record.score}")
    return ranked_res_list


#----------------------------------------------------------------------------------------------------------------

def gemini_response(prompt:str,
                response_schema:dict = None):

    model = GenerativeModel(model_name=MODEL_NAME)

    generation_config = GenerationConfig(
        temperature=0.5,
        top_p=1.0,
        top_k=32,
        candidate_count=1,
        max_output_tokens=8192,
        )

    responses = model.generate_content(
        [prompt],
        generation_config = generation_config)

    print(f"[Controller][call_gemini] Final response Len {len(responses.text)}")

    return responses.text

## Execute RAG

In [112]:

from time import perf_counter

t1 = perf_counter()

question = "개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?"
final_outcome, execution_stat = rag_process(question, 5)

display(Markdown(final_outcome))

t2  = perf_counter()

print(f"\n Total latency : {t2 - t1} seconds\n\n")
print(f"\n Detailed latency : {execution_stat} \n\n")


[rag_process] start rag_process : 개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?
[Controller][call_gemini] Final response Len 113
[Controller][rewrite_question] questions : ["개인정보 분쟁조정위원회는 어떤 기관에 속해 있나요?", "개인정보 분쟁조정위원회는 어떤 분쟁을 조정할 수 있나요?", "개인정보 분쟁조정위원회에 조정 신청을 하려면 어떤 절차를 거쳐야 하나요?"] 

[Controller][rewrite_question] Generated Question List : ['개인정보 분쟁조정위원회는 어떤 기관에 속해 있나요?', '개인정보 분쟁조정위원회는 어떤 분쟁을 조정할 수 있나요?', '개인정보 분쟁조정위원회에 조정 신청을 하려면 어떤 절차를 거쳐야 하나요?']
[rag_process] rewrited_questions : ['개인정보 분쟁조정위원회는 어떤 기관에 속해 있나요?', '개인정보 분쟁조정위원회는 어떤 분쟁을 조정할 수 있나요?', '개인정보 분쟁조정위원회에 조정 신청을 하려면 어떤 절차를 거쳐야 하나요?', '개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?']
[Controller][search_parsing] search_parsing Start! : 개인정보 분쟁조정위원회는 어떤 기관에 속해 있나요?
[Controller][search_parsing] search_parsing Start! : 개인정보 분쟁조정위원회는 어떤 분쟁을 조정할 수 있나요?
[Controller][search_parsing] search_parsing Start! : 개인정보 분쟁조정위원회에 조정 신청을 하려면 어떤 절차를 거쳐야 하나요?
[Controller][search_parsing] search_parsing Start! : 개인정보 분쟁조정위원회는 어떤 조직이고 역할은 무엇인가요?
[Controller][search_parsing]

## 개인정보 분쟁조정위원회는 개인정보와 관련된 분쟁을 조정하는 기관입니다. 

**구성:**

* 위원장 1명을 포함하여 30명 이내의 위원으로 구성됩니다.
* 위원은 당연직위원과 위촉위원으로 구성됩니다.
* 당연직위원은 대통령령으로 정하는 국가기관 소속 공무원입니다.
* 위촉위원은 개인정보 보호업무 경험이 있는 사람, 대학교수, 판사/검사/변호사, 시민사회단체/소비자단체 추천 인사, 개인정보처리자 사업자단체 임원 등으로 구성됩니다.

**역할:**

* 개인정보 관련 분쟁 조정을 담당합니다.
* 분쟁 당사자에게 필요한 자료를 요청하고, 사실 확인을 위해 관련 장소에 출입하여 자료를 조사하거나 열람할 수 있습니다.
* 분쟁 당사자나 참고인을 위원회에 출석시켜 의견을 들을 수 있습니다.
* 조정 전 합의를 권고할 수 있습니다.
* 조정안을 작성하고, 당사자의 수락을 받아 조정서를 작성하여 송달합니다.
* 조정 내용은 재판상 화해와 동일한 효력을 갖습니다.
* 분쟁의 성질상 조정이 적합하지 않거나 부정한 목적으로 조정이 신청된 경우에는 조정을 거부할 수 있습니다.
* 한 쪽 당사자가 소를 제기하면 조정 처리를 중지합니다.

**추가 정보:**

* 분쟁조정위원회는 분쟁조정 업무를 효율적으로 수행하기 위해 필요하다면 조정 사건의 분야별로 5명 이내의 위원으로 구성되는 조정부를 둘 수 있습니다.
* 위원은 자격정지 이상의 형을 선고받거나 심신상의 장애로 직무를 수행할 수 없는 경우를 제외하고는 그의 의사에 반하여 면직되거나 해촉되지 않습니다.
* 위원은 특정 사건에 대해 제척, 기피, 회피를 할 수 있습니다.
* 분쟁조정 신청은 개인정보와 관련된 분쟁의 조정을 원하는 자 누구나 할 수 있습니다.
* 분쟁조정위원회는 분쟁조정 신청을 받은 날부터 60일 이내에 조정안을 작성해야 합니다.
* 집단분쟁조정도 가능합니다.

**요약:** 개인정보 분쟁조정위원회는 개인정보 관련 분쟁을 조정하는 기관으로, 분쟁 당사자 간의 합의를 이끌어내고 조정 내용을 법적으로 효력을 갖도록 하는 역할을 수행합니다. 



 Total latency : 8.839952626000013 seconds



 Detailed latency : {'time_question_splitter': 2.565, 'time_ai_search': 0.274, 'time_context_verifier': 0.053, 'num_total_searched': 4, 'num_ranked_results': 1, 'llm_request': 5.728, 'query_rewrite': 2.529, 'vertex_ai_search': 0.259, 'ranked_results': 0.32} 


