In [None]:
! pip install google-cloud-aiplatform langchain pandas datasets google-api-python-client chromadb faiss-cpu faiss-cpu transformers config google-cloud-documentai google-cloud-storage pip install google.ai.generativelanguage tiktoken --upgrade

In [None]:
#
# Forgive me Guido
# 
# This notebook is a demo of a simple Q&A fueled by PaLM and enriched with context through embeddings.
# 
# The embeddings are created by processing 3K+ old video game manual PDFs with DocAI and feeding them to Vertex AI embeddings.
# This specific version uses batch processing in DocAI and stores the PDF files on GCS.
# 
#
# Utils
import time
from typing import List
import json

# Langchain
import langchain
from pydantic import BaseModel
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Vertex AI
from google.cloud import aiplatform
from langchain.embeddings import VertexAIEmbeddings
from langchain.llms import VertexAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.vectorstores import Chroma
from google.ai.generativelanguage import HarmCategory
from google.ai.generativelanguage import SafetySetting

#Document AI
import re
from typing import Optional

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import storage

# The normal one didn't understand sharding. I ended up not using sharding but might as well keep using this one.
import google.cloud.documentai_v1beta3 as documentai

# ---------------------------------------------------------------------------
# Configuration: Document AI + Cloud Storage
# Replace the placeholder values below before running the notebook.
# ---------------------------------------------------------------------------

# Google Cloud project number that owns the processor.
project_number = "NUMBER"

# Processor location; the format is 'us' or 'eu'.
location = "eu"

# ID of the Document AI processor (create the processor before running the sample).
processor_id = "fe9efa246ac573db"

# Processor version to use.
processor_version_id = "pretrained-ocr-v1.0-2020-09-23"

# MIME type of the input files; see https://cloud.google.com/document-ai/docs/file-types
# for the supported file types.
input_mime_type = "application/pdf"

# Bucket name. The other GCS variables could be derived from this one, but they are
# kept separate on purpose - some people prefer it like this.
gcs_bucket_name = "BUCKETNAME"

# Input location. Format: `gs://bucket/directory/file.pdf` or `gs://bucket/directory/`.
gcs_input_uri = "BUCKET_DIR"

# Optional. The fields to return in the Document object.
field_mask = "text"

# Where the processed files get stored as JSON.
gcs_output_uri = "gs://OUTPUT_PATH"


In [None]:
# Stolen from: https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/examples/langchain-intro/intro_langchain_palm_api.ipynb
#
# Utility functions for Embeddings API with rate limiting
def rate_limit(max_per_minute):
    """Generator that paces its caller to at most `max_per_minute` steps per minute.

    Call `next()` on the generator once per unit of work. The time the caller spends
    between two `next()` calls is measured; if it was shorter than the minimum interval
    (60 / max_per_minute seconds), the generator sleeps for the remainder before
    yielding again. The first `next()` only prints "Waiting" and never sleeps.

    Args:
        max_per_minute: Allowed number of steps per minute. A value of 0 raises
            ZeroDivisionError on the first `next()`.

    Yields:
        None, each time the caller may proceed.
    """
    min_interval = 60 / max_per_minute
    print("Waiting")
    while True:
        handed_off_at = time.time()
        yield
        # Time still owed after subtracting what the caller already spent working.
        remaining = min_interval - (time.time() - handed_off_at)
        if remaining > 0:
            print(".", end="")
            time.sleep(remaining)

class CustomVertexAIEmbeddings(VertexAIEmbeddings, BaseModel):
    """VertexAIEmbeddings variant that embeds documents in small, rate-limited batches.

    `embed_documents` sends the texts to `self.client.get_embeddings` in slices of
    `num_instances_per_batch` and paces the requests with `rate_limit`.
    """

    # Maximum number of embedding requests per minute (handed to rate_limit()).
    requests_per_minute: int
    # Number of texts sent in a single get_embeddings() request.
    num_instances_per_batch: int

    # Overriding embed_documents method
    def embed_documents(self, texts: List[str]):
        """Embed `texts` batch by batch and return one vector per input text.

        Args:
            texts: The documents to embed.

        Returns:
            A list with the `.values` of every embedding result, in input order.

        Raises:
            ValueError: If `num_instances_per_batch` is smaller than 1. Without this
                check a non-positive batch size never consumes the input and the
                batching loop would run forever.
        """
        batch_size = self.num_instances_per_batch
        if batch_size < 1:
            raise ValueError(
                f"num_instances_per_batch must be >= 1, got {batch_size}"
            )

        limiter = rate_limit(self.requests_per_minute)
        pending = list(texts)
        results = []

        # Working in batches because (per the original sample) the API accepts a
        # maximum of 5 documents per request to get embeddings.
        for start in range(0, len(pending), batch_size):
            batch = pending[start : start + batch_size]
            results.extend(self.client.get_embeddings(batch))
            # Throttle after every request, exactly as before.
            next(limiter)

        return [r.values for r in results]

In [None]:
# Stolen from: https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/examples/langchain-intro/intro_langchain_palm_api.ipynb
# 
#  I increased the temp a little, haven't experimented with top k and p too much
#
# LLM model

# PaLM text model used to generate the answers.
# Sampling: temperature slightly raised; top_k / top_p left at the sample's values.
llm = VertexAI(
    model_name="text-bison@001",
    temperature=0.5,
    top_k=40,
    top_p=0.8,
    max_output_tokens=256,
    verbose=True,
)
# Disabled experiment, kept for reference - safety settings that could be passed above:
# safety_settings=[
#     {
#         "category": HarmCategory.HARM_CATEGORY_VIOLENCE,
#         "threshold": SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH,
#     }
# ]
# Embedding client
EMBEDDING_QPM = 60  # embedding requests allowed per minute
EMBEDDING_NUM_BATCH = 5  # texts sent per embedding request

# max_output_tokens was raised from the default by the original author to allow more
# output tokens for the embeddings.
# NOTE(review): this looks like a text-generation setting; confirm that it has any
# effect on embedding requests.
embeddings = CustomVertexAIEmbeddings(
    max_output_tokens=1024,
    num_instances_per_batch=EMBEDDING_NUM_BATCH,
    requests_per_minute=EMBEDDING_QPM,
)

In [None]:
# Straight out of our docs and some online examples. Some examples cheated by reading the results immediately and dumping
# it into some big string in the function but I'm not doing that
#
#
def batch_process_documents(
    project_number: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    processor_version_id: Optional[str] = None,
    input_mime_type: Optional[str] = None,
    field_mask: Optional[str] = None,
    timeout: int = 9999,  # deliberately very long: a lot of files are processed
):
    """Run a Document AI batch job on Cloud Storage input and wait for it to finish.

    Args:
        project_number: Project part of the processor resource name.
        location: Processor location; also used to build the API endpoint
            `{location}-documentai.googleapis.com`.
        processor_id: ID of the processor to call.
        gcs_input_uri: Either a single file (contains a "." and does not end with "/")
            or a prefix under which every file is processed.
        gcs_output_uri: Cloud Storage URI the results are written to.
        processor_version_id: If given, the request targets this processor version;
            otherwise it targets the processor itself.
        input_mime_type: MIME type attached to the document in single-file mode;
            not used in prefix mode.
        field_mask: Optional field mask passed to the output config.
        timeout: Passed to `operation.result(timeout=...)`.

    Returns:
        None. The output is written to `gcs_output_uri`.

    Raises:
        ValueError: If the operation metadata state is not SUCCEEDED afterwards.
    """
    # You must set the api_endpoint if you use a location other than "us".
    endpoint = f"{location}-documentai.googleapis.com"
    client = documentai.DocumentProcessorServiceClient(
        client_options=ClientOptions(api_endpoint=endpoint)
    )

    # --- Input: one explicit document, or everything under a prefix ---------------
    targets_single_file = "." in gcs_input_uri and not gcs_input_uri.endswith("/")
    if targets_single_file:
        single_document = documentai.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=input_mime_type
        )
        input_config = documentai.BatchDocumentsInputConfig(
            gcs_documents=documentai.GcsDocuments(documents=[single_document])
        )
    else:
        input_config = documentai.BatchDocumentsInputConfig(
            gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri)
        )

    # --- Output: Cloud Storage directory the results are written to ---------------
    output_config = documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=gcs_output_uri, field_mask=field_mask
        )
    )

    # --- Resource name -------------------------------------------------------------
    if not processor_version_id:
        # projects/{project_number}/locations/{location}/processors/{processor_id}
        name = client.processor_path(project_number, location, processor_id)
    else:
        # projects/{project_number}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
        name = client.processor_version_path(
            project_number, location, processor_id, processor_version_id
        )

    # BatchProcess returns a Long Running Operation (LRO).
    operation = client.batch_process_documents(
        documentai.BatchProcessRequest(
            name=name,
            input_documents=input_config,
            document_output_config=output_config,
        )
    )

    # Block until the operation completes; this can take a while for larger inputs.
    # Operation name format: projects/{project_number}/locations/{location}/operations/{operation_id}
    # RetryError / InternalServerError are only printed here; the state check below
    # decides whether the job counts as failed.
    # (Alternative: operation.add_done_callback(cb) for asynchronous processing.)
    try:
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    except (RetryError, InternalServerError) as wait_error:
        print(wait_error.message)

    # Once the operation is complete, read the outcome from the operation metadata.
    metadata = documentai.BatchProcessMetadata(operation.metadata)
    if metadata.state == documentai.BatchProcessMetadata.State.SUCCEEDED:
        return
    raise ValueError(f"Batch Process Failed: {metadata.state_message}")

In [None]:
# call the function
# Kick off the Document AI batch job with the settings from the configuration cell.
# The five required arguments are passed positionally, in signature order.
# NOTE(review): `processor_version_id` is defined in the configuration cell but is not
# passed here, so the request targets the processor rather than a specific version -
# confirm that this is intended.
batch_process_documents(
    project_number,
    location,
    processor_id,
    gcs_input_uri,
    gcs_output_uri,
    input_mime_type=input_mime_type,
    field_mask=field_mask,
)

In [None]:
# Reading the output files from DocAI from GCS. I'm using the document object from DocAI to easily load the JSON.
#
# Maybe it's the old school metadata fanboy in me, but I also store all the paths as a string in the docs list
#
# Download every JSON result that Document AI wrote to the bucket and collect its text.
# NOTE(review): the "output/" prefix is hard-coded and must match the location that
# gcs_output_uri points to - confirm when changing the configuration.
client = storage.Client()
output_blobs = client.list_blobs(gcs_bucket_name, prefix="output/")

docs = []
blob_paths = []
for blob in output_blobs:
    if blob.content_type != "application/json":
        print(f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}")
        continue
    blob_paths.append(blob.name)
    # Parse the stored JSON through the Document AI Document type to get at the text.
    document = documentai.Document.from_json(
        blob.download_as_bytes(), ignore_unknown_fields=True
    )
    docs.append(document.text)

# Store the processed blob paths as one extra "document", one path per line.
# (Joining with an empty string fused all paths into a single unreadable token run.)
paths = "\n".join(blob_paths)
docs.append(paths)

In [None]:
# The max token size for outputs from embeddings is 1024, same as the max input token size for Palm.
# That leaves no room for a prompt, so I'm using the recursive textsplitter to make smaller chunks. 
# Might be interesting to see the results with even smaller chunks
#
# Token-based splitter (tiktoken): chunks of 800 with an overlap of 50 between
# neighbouring chunks.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_overlap=50,
    chunk_size=800,
)
# Split every collected text into chunk documents for the vector store.
texts = text_splitter.create_documents(docs)

In [None]:
# Store docs in local vectorstore as index
# it may take a while since API is rate limited
# Also found this somewhere, added persistence for the db
# This takes a lotta lottta lotta time
# Embed all chunks into a Chroma index stored under ./index_batch, then write it to disk.
db = Chroma.from_documents(
    texts,
    embeddings,
    persist_directory="index_batch",
)
db.persist()

In [None]:
# Max k as a search argument gives us some room to experiment with what works best when using embeddings.
#
#
# Similarity search over the index; k=10 is passed through as a search argument.
retriever = db.as_retriever(
    search_type="similarity",
    search_kwargs=dict(k=10),
)

In [None]:
# Uses LLM to synthesize results from the search index.
# We use Vertex PaLM Text API for LLM
# Create three query types to be able to test the differences
#
# One RetrievalQA chain per chain type, built in this order:
#   qa1 -> "stuff", qa2 -> "map_reduce", qa3 -> "refine"
qa1, qa2, qa3 = (
    RetrievalQA.from_chain_type(llm=llm, chain_type=chain_type, retriever=retriever)
    for chain_type in ("stuff", "map_reduce", "refine")
)

In [104]:
# I'm sure I haven't mastered the art of prompt engineering just yet, but I like this prompt for now. I only replace the question
# at the end and pick qa1/2/3 
# The prompt: persona first, the actual question at the end.
query = "You are Mr. Robot. Which secret agent is the best swimmer?"

# Swap qa1 for qa2 / qa3 to compare the chain types.
result = qa1(dict(query=query))
print(result)

{'query': 'You are Mr. Robot. Which secret agent is the best swimmer?', 'result': 'The correct answer is James Pond.\nJames Pond is the best swimmer because he is a secret agent who is trained to swim. He is also a frogman, which means that he can breathe underwater.'}
