In [1]:
%pip install google-cloud-discoveryengine --upgrade --user
%pip install --upgrade google-auth
!pip install --upgrade --quiet \
    google-cloud-aiplatform==1.51.0 \
    langchain==0.1.20 \
    langchain-google-vertexai==1.0.3 \
    cloudpickle==3.0.0 \
    pydantic==2.7.1 \
    langchain_google_community \
    google-cloud-discoveryengine \
    google-api-python-client \
    requests \
    python-dotenv



In [2]:
## All the Required imports
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine_v1 as discoveryengine
from dotenv import load_dotenv, find_dotenv
from vertexai.preview import reasoning_engines
from googleapiclient import discovery
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.memory import ChatMessageHistory
from operator import itemgetter
from typing import List
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import (
    RunnableLambda,
    ConfigurableFieldSpec,
    RunnablePassthrough,
)
from langchain_google_vertexai import HarmBlockThreshold, HarmCategory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core import prompts
from langchain_core import agents
from langchain.agents.format_scratchpad import (
    format_to_openai_function_messages
)

import google.auth
import json
import time
import os
import vertexai
import requests


In [19]:
## Environment variables must be set explicitly for the target environment (dev/SIT), e.g. in a .env file picked up by load_dotenv
# !export PROJECT_ID="enterprise-search-gen-ai"
# !export LOCATION="global"
# !export STAGING_BUCKET="gs://agent-test-srini"
# !export DATA_STORE_ID_PDF="tmf-metadata-layout-p_1715009821486"
# !export DATA_STORE_ID_WEB="tmf-public_1692445422672"

In [3]:
# Load environment variables from the nearest .env file (if one is found), then
# obtain Application Default Credentials and refresh them so that
# `credentials.token` is populated.

# `import google.auth` on its own does not guarantee that the
# `google.auth.transport.requests` submodule is loaded; import it explicitly so
# this cell does not depend on another package having pulled it in.
import google.auth.transport.requests

_ = load_dotenv(find_dotenv())
credentials, _ = google.auth.default()
request = google.auth.transport.requests.Request()
credentials.refresh(request)

# Deployment configuration; each value is None when the variable is not set.
PROJECT_ID = os.getenv('PROJECT_ID')
LOCATION = os.getenv('LOCATION')
STAGING_BUCKET = os.getenv('STAGING_BUCKET')
DATA_STORE_ID_PDF = os.getenv('DATA_STORE_ID_PDF')
DATA_STORE_ID_WEB = os.getenv('DATA_STORE_ID_WEB')

# Access token produced by the refresh above.
AUTH_TOKEN = credentials.token

In [4]:
# Blocking threshold to apply for each harm category.
safety_settings = {
    HarmCategory.HARM_CATEGORY_UNSPECIFIED: HarmBlockThreshold.BLOCK_NONE,
    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
}

# Model used by the agent (alternative tried earlier: "gemini-1.5-pro-latest").
model = "gemini-1.5-pro-preview-0409"

# Generation settings handed to the model:
#   temperature       (float) - sampling temperature; controls the degree of
#                               randomness in token selection.
#   max_output_tokens (int)   - upper bound on the amount of text produced for
#                               one prompt.
#   top_p             (float) - tokens are taken from most to least probable
#                               until their probabilities sum to this value.
#   top_k             (int)   - the next token is chosen among the k most
#                               probable tokens.
#   safety_settings   (Dict[HarmCategory, HarmBlockThreshold]) - the safety
#                               settings to use for generating content.
model_kwargs = dict(
    temperature=0.28,
    max_output_tokens=1000,
    top_p=0.95,
    top_k=40,
    safety_settings=safety_settings,
)

In [5]:
## This prompt template needs to be changed

# Agent prompt. The leading mapping pulls the chat history, the user input and
# the formatted tool-call scratchpad out of the incoming state; it is piped
# (`|`) into the chat template that consumes those three keys.
prompt = {
    "history": itemgetter("history"),
    "input": itemgetter("input"),
    "agent_scratchpad": (
        lambda state: format_to_openai_function_messages(state["intermediate_steps"])
    ),
} | prompts.ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """
    Please use the appropriate tool based on the user's input.
    Call search_web_and_pdf to answer all the user questions EXCEPT code generation requests.
    Call swagger_gen to generate code for the spec in the url and respond as in the example below.
    Example:
    I have generated server code in Python Flask for the spec in the url http://petstore.swagger.io/v2/swagger.json.
    You can download it from this link: 'https://generator.swagger.io/api/gen/download/4ad93713-be44-4514-8344-493bb44d5b97'
    """,
        ),
        ("placeholder", "{history}"),
        ("user", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)

In [6]:
# Initialise the Vertex AI SDK with the project and staging bucket read from the
# environment. The region is hard-coded to 'us-central1' here; the LOCATION
# variable is not used for this call.
# NOTE(review): LOCATION looks intended for the Discovery Engine endpoints
# (the sample value in the commented exports is "global") - confirm.
vertexai.init(project=PROJECT_ID, location='us-central1', staging_bucket=STAGING_BUCKET)

In [7]:
def gcp_websearch(
    query_input: str,
    auth_token: str,
    project_id: str,
    location_id: str,
    datastore_id: str,
    llm_model_version: str,
    summary_result_size: int,
    timeout: float = 60,
):
    """Ask the Discovery Engine ``converse`` endpoint of a data store a question.

    Sends ``query_input`` to the v1alpha ``conversations/-:converse`` endpoint of
    the given data store and requests a summary with citations. The original
    description of this helper is "looks up things on the web related to the
    TM Forum".

    Args:
        query_input: The user question.
        auth_token: OAuth access token placed in the ``Authorization`` header.
        project_id: Google Cloud project that owns the data store.
        location_id: Location segment of the resource path.
        datastore_id: Identifier of the data store to query.
        llm_model_version: Value sent as ``summarySpec.modelSpec.version``.
        summary_result_size: Value sent as ``summarySpec.summaryResultCount``.
        timeout: Seconds to wait for the HTTP call before ``requests`` raises a
            timeout error. Without a timeout the call could block forever.

    Returns:
        The decoded JSON body of the HTTP response (error bodies included; the
        HTTP status is not checked).
    """
    # Imported inside the function rather than relying on module globals.
    # NOTE(review): presumably so the helper stays self-contained when the tool
    # that calls it is serialized for deployment - confirm.
    import requests

    endpoint_url = (
        f'https://discoveryengine.googleapis.com/v1alpha/projects/{project_id}'
        f'/locations/{location_id}/collections/default_collection'
        f'/dataStores/{datastore_id}/conversations/-:converse'
    )
    headers = {
        'Content-type': 'application/json',
        'Accept': 'text/plain',
        'Authorization': f'Bearer {auth_token}',
    }
    payload = {
        "query": {
            "input": query_input,
        },
        "summarySpec": {
            "summaryResultCount": summary_result_size,
            # An optional "modelPromptSpec": {"preamble": ...} entry can be added
            # here to steer the tone and length of the generated summary.
            "modelSpec": {
                "version": llm_model_version,
            },
            "ignoreAdversarialQuery": True,
            "includeCitations": True,
        },
    }

    # `json=` serializes the payload; the explicit timeout bounds the wait.
    response = requests.post(endpoint_url, json=payload, headers=headers, timeout=timeout)
    return response.json()




In [8]:
def gcp_pdfsearch(
    query_input: str,
    auth_token: str,
    project_id: str,
    location_id: str,
    datastore_id: str,
    page_size: int,
    llm_model_version: str,
    timeout: float = 60,
) -> dict:
    """Run a search against the ``default_search`` serving config of a data store.

    Posts ``query_input`` to the Discovery Engine v1alpha ``:search`` endpoint
    with automatic query expansion and spell correction enabled. The original
    description of this helper is "looks up things in the stored PDF documents
    related to the TM Forum".

    Args:
        query_input: The search query.
        auth_token: OAuth access token placed in the ``Authorization`` header.
        project_id: Google Cloud project that owns the data store.
        location_id: Location segment of the resource path.
        datastore_id: Identifier of the data store to search.
        page_size: Value sent as ``pageSize``.
        llm_model_version: Accepted for signature compatibility with existing
            callers; it is not included in the request.
        timeout: Seconds to wait for the HTTP call before ``requests`` raises a
            timeout error. Without a timeout the call could block forever.

    Returns:
        The decoded JSON body of the HTTP response (not a string, as the
        previous annotation claimed). The HTTP status is not checked, so error
        bodies are returned as well.
    """
    # Imported inside the function rather than relying on module globals.
    # NOTE(review): presumably so the helper stays self-contained when the tool
    # that calls it is serialized for deployment - confirm.
    import requests

    endpoint_url = (
        f'https://discoveryengine.googleapis.com/v1alpha/projects/{project_id}'
        f'/locations/{location_id}/collections/default_collection'
        f'/dataStores/{datastore_id}/servingConfigs/default_search:search'
    )
    headers = {
        'Content-type': 'application/json',
        'Accept': 'text/plain',
        'Authorization': f'Bearer {auth_token}',
    }
    payload = {
        "query": query_input,
        "pageSize": page_size,
        "queryExpansionSpec": {
            "condition": "AUTO",
        },
        "spellCorrectionSpec": {
            "mode": "AUTO",
        },
    }

    # `json=` serializes the payload; the explicit timeout bounds the wait.
    response = requests.post(endpoint_url, json=payload, headers=headers, timeout=timeout)
    return response.json()



In [9]:
def search_web_and_pdf(
    query_str: str):

    """This is the function to call to answer all the user questions EXCEPT code generation requests."""

    # NOTE(review): the signature and the one-line docstring above are kept
    # verbatim. This function is registered in `tools=` when the agent is
    # created, so they presumably form the tool schema/description shown to the
    # model - confirm before rewording either.
    #
    # What it does: refreshes Application Default Credentials, runs the same
    # query against the PDF data store (search) and the web data store
    # (converse), and returns both raw JSON responses as
    # [pdf_response, web_response].

    import os

    import google.auth
    # Imported explicitly: `import google.auth` alone does not guarantee that
    # the transport submodule used below is loaded.
    import google.auth.transport.requests

    # Environment variables take precedence; the literals are the previously
    # hard-coded values and remain the fallback when a variable is unset/empty.
    project_id = os.getenv("PROJECT_ID") or "enterprise-search-gen-ai"
    location_id = os.getenv("LOCATION") or "global"
    pdf_datastore_id = os.getenv("DATA_STORE_ID_PDF") or "tmf-metadata-layout-p_1715009821486"  # PDF
    web_datastore_id = os.getenv("DATA_STORE_ID_WEB") or "tmf-public_1692445422672"  # WEB

    # Kept from the original implementation.
    # NOTE(review): nothing below uses the Vertex AI SDK any more (the unused
    # GenerativeModel instance was removed), so this call can probably go too.
    vertexai.init(project=project_id, location='us-central1')

    # Whatever identity Application Default Credentials resolves to (user or
    # service account) supplies the access token.
    credentials, _ = google.auth.default()
    credentials.refresh(google.auth.transport.requests.Request())

    pdf_response = gcp_pdfsearch(
        query_input=query_str,
        auth_token=credentials.token,
        project_id=project_id,
        location_id=location_id,
        datastore_id=pdf_datastore_id,
        page_size=10,
        llm_model_version="gemini-1.0-pro-002/answer_gen/v1",
    )

    web_response = gcp_websearch(
        query_input=query_str,
        auth_token=credentials.token,
        project_id=project_id,
        location_id=location_id,
        datastore_id=web_datastore_id,
        summary_result_size=5,
        llm_model_version="gemini-1.0-pro-002/answer_gen/v1",
    )

    # Both raw responses are returned as-is; no summarisation is done here.
    return [pdf_response, web_response]

In [10]:
def swagger_gen(
    text: str,
):


    """
    Generates Server/ Client code using Swagger Code Generator Cloud Function for the given text.

    Args:
    - text (str): The text to be sent in the request body.

    Returns:
    - dict: The JSON response from the server.
    """
    # NOTE(review): this function is registered in `tools=` when the agent is
    # created, so the docstring presumably doubles as the tool description shown
    # to the model - confirm. It previously listed a `url` argument that the
    # function does not accept; that entry was removed.
    import requests
    import google.auth.transport.requests
    import google.oauth2.id_token

    # Endpoint of the code-generator Cloud Function; it is also passed to
    # fetch_id_token as the second argument when requesting the ID token.
    url = "https://us-central1-enterprise-search-gen-ai.cloudfunctions.net/swagger-code-generator-1"

    auth_request = google.auth.transport.requests.Request()
    id_token = google.oauth2.id_token.fetch_id_token(auth_request, url)

    headers = {
        "Authorization": f"bearer {id_token}",
        "Content-Type": "application/json",
    }
    payload = {"text": text}

    # Code generation can be slow, hence the generous 70 second timeout.
    response = requests.post(url, headers=headers, json=payload, timeout=70)

    return response.json()




In [None]:
# Local smoke test: call the swagger_gen tool directly (outside the agent) and
# display the JSON it returns as the cell output.
res = swagger_gen("Can you give me the code for Product catalogue management version 4?")
res

In [11]:
class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """In memory implementation of chat message history.

    Messages are held in a plain list on the instance; nothing is written
    anywhere else by this class.
    """

    # The stored messages, oldest first; every instance starts with its own
    # empty list (default_factory=list).
    messages: List[BaseMessage] = Field(default_factory=list)

    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Append the given messages, in order, to the end of the history."""
        self.messages.extend(messages)

    def clear(self) -> None:
        """Drop all stored messages by rebinding ``messages`` to a new empty list."""
        self.messages = []

# Module-level registry of chat histories, keyed by session id. Keeping it
# global makes it easy to inspect the stored conversations afterwards.
store = {}

def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
    """Return the history registered for ``session_id``, creating an empty one on first use."""
    try:
        return store[session_id]
    except KeyError:
        # First time this session id is seen: register a fresh history.
        history = InMemoryHistory()
        store[session_id] = history
        return history


In [12]:
# Name given to the deployed Reasoning Engine resource.
DISPLAY_NAME = "TMFMultiChatApp"

# Package a LangchainAgent (model + the two tool functions + per-session chat
# history) and deploy it as a Reasoning Engine.
# NOTE(review): the `prompt` object built earlier in this notebook is not passed
# to LangchainAgent here, so it is not part of what gets deployed - confirm
# whether `prompt=prompt` was intended.
remote_app = reasoning_engines.ReasoningEngine.create(
    reasoning_engines.LangchainAgent(
        model=model,
        tools=[search_web_and_pdf,swagger_gen],
        model_kwargs=model_kwargs,
        # Include the tool calls and their outputs in every query response.
        agent_executor_kwargs={"return_intermediate_steps": True},
        # Factory that maps a session id to its chat history.
        chat_history=get_by_session_id
    ),
    # Packages listed for the deployed engine; the pinned versions match the
    # pins used in the install cell of this notebook.
    requirements=[
        "google-cloud-aiplatform==1.51.0",
        "langchain==0.1.20",
        "langchain-google-vertexai==1.0.3",
        "cloudpickle==3.0.0",
        "pydantic==2.7.1",
        "requests",
        "google-cloud-discoveryengine",
        "google-auth",
        "python-dotenv"
    ],
    display_name=DISPLAY_NAME,
)
# Show the created engine as the cell output.
remote_app

INFO:vertexai.reasoning_engines._reasoning_engines:Using bucket agent-test-srini
INFO:vertexai.reasoning_engines._reasoning_engines:Writing to gs://agent-test-srini/reasoning_engine/reasoning_engine.pkl
INFO:vertexai.reasoning_engines._reasoning_engines:Writing to gs://agent-test-srini/reasoning_engine/requirements.txt
INFO:vertexai.reasoning_engines._reasoning_engines:Creating in-memory tarfile of extra_packages
INFO:vertexai.reasoning_engines._reasoning_engines:Writing to gs://agent-test-srini/reasoning_engine/dependencies.tar.gz
INFO:vertexai.reasoning_engines._reasoning_engines:Creating ReasoningEngine
INFO:vertexai.reasoning_engines._reasoning_engines:Create ReasoningEngine backing LRO: projects/982845833565/locations/us-central1/reasoningEngines/6584192286471487488/operations/181261398146285568
INFO:vertexai.reasoning_engines._reasoning_engines:ReasoningEngine created. Resource name: projects/982845833565/locations/us-central1/reasoningEngines/6584192286471487488
INFO:vertexai.re

<vertexai.reasoning_engines._reasoning_engines.ReasoningEngine object at 0x7dbb71155990> 
resource name: projects/982845833565/locations/us-central1/reasoningEngines/6584192286471487488

In [13]:
import pprint

# Send a single question to the deployed engine under the session id "game" and
# pretty-print the response; any exception is pretty-printed instead of raised.
try:
    res = remote_app.query(
        input="Can you give me the code for Product catalogue management version 4?",
        config={"configurable": {"session_id": "game"}},
    )
    pprint.pprint(res)
except Exception as exc:
    pprint.pprint(exc)

{'history': [],
 'input': 'Can you give me the code for Product catalogue management version '
          '4?',
 'intermediate_steps': [[{'id': ['langchain',
                                 'schema',
                                 'agent',
                                 'ToolAgentAction'],
                          'kwargs': {'log': '\n'
                                            'Invoking: `swagger_gen` with '
                                            "`{'text': 'Product Catalog "
                                            "Management API V4'}`\n"
                                            '\n'
                                            '\n',
                                     'message_log': [{'id': ['langchain',
                                                             'schema',
                                                             'messages',
                                                             'AIMessageChunk'],
                                        

## Bulk Queries Testing

In [None]:
import pprint

import re
import time

from ratelimit import limits, sleep_and_retry

# Rate limit for calls to the reasoning engine: at most RATE_LIMIT calls per
# RATE_PERIOD seconds (with the values below: 5 calls per 60 seconds).
RATE_LIMIT = 5 # Number of requests
RATE_PERIOD = 60  # Time period in seconds

# Collects one duration (in seconds) per successfully answered query; appended
# to by answer_questions.
running_time = []
# Decorators from the `ratelimit` package that apply the limit defined above.
# NOTE(review): presumably `limits` rejects calls beyond the quota and
# `sleep_and_retry` waits and then retries them - confirm against the package docs.
@sleep_and_retry
@limits(calls=RATE_LIMIT, period=RATE_PERIOD)
def query_reasoning_engine(query,reasoning_engine):
    """Return ``reasoning_engine.query(input=query)``, subject to the rate limit above."""
    return reasoning_engine.query(input=query)


def extract_pdf_references(data):
    """Collect every ``references { title / document / uri }`` entry found in ``data``.

    Args:
        data: Text to scan for ``references {`` entries whose ``title``,
            ``document`` and ``uri`` values are double-quoted, in that order.

    Returns:
        A list of ``{"title", "document", "uri"}`` dicts in order of appearance.
        If the scan fails (for example ``data`` is not a string) the error is
        printed and an empty list is returned.
    """
    # Each match yields the quoted title, document and uri of one entry.
    reference_pattern = r'references {\s*title: "(.*?)"\s*document: "(.*?)"\s*uri: "(.*?)"'
    field_names = ("title", "document", "uri")

    try:
        found = re.findall(reference_pattern, data, re.DOTALL)
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

    return [dict(zip(field_names, groups)) for groups in found]


import re

def extract_web_references(data):
    """Collect the value of every ``link`` field found in ``data``.

    Args:
        data: Text to scan for ``fields { key: "link" value { string_value: "..." } }``
            entries laid out one item per line.

    Returns:
        A list of the matched link strings in order of appearance. If the scan
        fails (for example ``data`` is not a string) the error is printed and an
        empty list is returned.
    """
    # Captures the quoted string_value that follows a `key: "link"` line.
    pattern = r'fields {\n\s+key: \"link\"\n\s+value {\n\s+string_value: \"(.*?)\"'

    try:
        return list(re.findall(pattern, data, re.DOTALL))
    except Exception as e:
        print(f"An error occurred: {e}")
        return []



def answer_questions(
    row, reasoning_engine, project_id: str, location: str, top_n: int = 5,
    max_retries: int = 5, retry_delay: float = 60,
) -> None:
    """Query the reasoning engine for ``row["Query"]`` and record the results on ``row``.

    On success the following keys of ``row`` are set:
    ``"Top 5 Docs PDF"`` and ``"Top 5 Docs Web"`` (lists of the references
    extracted from the first intermediate step) and ``"Summary"`` (the
    response's ``'output'``). The elapsed time is appended to the module-level
    ``running_time`` list.

    Args:
        row: Mapping-like record with a ``"Query"`` entry; updated in place.
        reasoning_engine: Object passed through to ``query_reasoning_engine``.
        project_id: Unused; kept so existing callers keep working.
        location: Unused; kept so existing callers keep working.
        top_n: Unused; kept so existing callers keep working.
        max_retries: How many times a failed attempt is retried before the last
            exception is re-raised. Pass ``None`` to retry indefinitely (the
            previous behaviour, which never terminates on a deterministic error).
        retry_delay: Seconds to sleep between attempts.

    Raises:
        Exception: Whatever the final attempt raised, once ``max_retries``
            retries have been used up.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            start = time.time()
            response = query_reasoning_engine(row["Query"], reasoning_engine)

            # Only the first intermediate step is inspected, and of it only the
            # element at index 1. A response without any intermediate steps is
            # treated as "no references" instead of failing (and being retried)
            # with an IndexError.
            steps = response.get('intermediate_steps') or []
            intermediate_steps = steps[0][1] if steps else []

            top5docs_pdf, top5docs_web = [], []

            for pages in intermediate_steps:
                try:
                    references = extract_pdf_references(pages['repr'])
                    if len(references) == 0:
                        # No PDF-style references: try the web-link format.
                        references = extract_web_references(pages['repr'])
                        top5docs_web.append(references)
                    else:
                        top5docs_pdf.append(references)

                    print(references)

                except Exception:
                    # Best effort: entries without a usable 'repr' are skipped.
                    continue

            row["Top 5 Docs PDF"] = top5docs_pdf
            row["Top 5 Docs Web"] = top5docs_web
            row["Summary"] = response['output']

            end = time.time()
            running_time.append(end - start)
            print(f"Time taken for query {row['Query']} is {end - start}")

            break  # Exit the loop if successful
        except Exception as e:
            # Give up once the retry budget is spent so that a deterministic
            # failure cannot loop forever.
            if max_retries is not None and attempt > max_retries:
                raise
            print(f"An error occurred: {e}. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)  # Wait before retrying