In [1]:
# Install pinned versions of the Vertex AI and Document AI client libraries (plus backoff) for the current user.
# NOTE(review): the import cell below also imports `tenacity` and `fitz` (PyMuPDF), which this command
# does not install — confirm they are already available in the target environment.
%pip install --upgrade google-cloud-aiplatform==1.35.0 google-cloud-documentai==2.20.1 backoff==2.2.1 --user

Note: you may need to restart the kernel to use updated packages.


In [2]:
import sys

# Restart the kernel only when the `google.colab` module has been loaded (presumably: running inside Colab).
if "google.colab" in sys.modules:
    # Automatically restart kernel after installs so that your environment can access the new packages
    import IPython

    app = IPython.Application.instance()
    # The True argument asks the kernel to restart instead of just shutting down.
    app.kernel.do_shutdown(True)
else:
    # Otherwise, attempt to discover local credentials as described on https://cloud.google.com/docs/authentication/application-default-credentials
    # Nothing is executed in this branch; the kernel is left running.
    pass

In [40]:
from __future__ import annotations
import backoff
from tenacity import retry, stop_after_attempt, wait_random_exponential
from google.api_core.exceptions import ResourceExhausted
from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import AlreadyExists
from google.cloud import documentai
import numpy as np
import glob
import os
from typing import Dict, List
import pandas as pd
from logging import error
import re
import textwrap
from typing import Tuple, List
import vertexai
from vertexai.language_models import TextEmbeddingModel, TextGenerationModel
import fitz 
import json
import time
import numpy as np


## Pre-configuration of GCP project

In [27]:
#Once the project is created in the console, extract the parameters here
# The `!` shell capture returns the command output as an IPython list of lines.
PROJECT_ID = !gcloud config get project
# `.n` turns the captured lines into one newline-joined string, i.e. the plain project id.
PROJECT_ID = PROJECT_ID.n
# Region used for the Document AI endpoint further down.
LOCATION = "europe-west2"
LOCATION_DEPLOY = "europe-west2" #Location to deploy GCP resources

# Enable the Document AI, Cloud Storage and Vertex AI APIs on the active gcloud project.
!gcloud services enable documentai.googleapis.com storage.googleapis.com aiplatform.googleapis.com

Operation "operations/acat.p2-268250494950-ee35f94b-df81-4663-9a97-f3d26a27c6ae" finished successfully.


In [13]:
# Display the Document AI client options for inspection.
# NOTE(review): `client_options` is only assigned in the "Create the processor" cell further down, so
# running the notebook top to bottom on a fresh kernel raises NameError here — move this cell below
# that assignment or remove it.
client_options

ClientOptions: {'api_endpoint': 'europe-west2-documentai.googleapis.com', 'client_cert_source': None, 'client_encrypted_cert_source': None, 'quota_project_id': None, 'credentials_file': None, 'scopes': None, 'api_key': None, 'api_audience': None}

The processor can now be defined by calling the Document AI API. Code source: https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/use-cases/document-qa/question_answering_documentai_vector_store_palm.ipynb.


## 1. Create the processor


"There are two types of Document AI processors:

Pre-trained processors: These processors are pre-trained on a large dataset of documents and can be used to perform common document processing tasks, such as Optical Character Recognition (OCR), form parsing, and entity extraction.
Custom processors: These processors can be trained on your own dataset of documents to perform specific tasks that are not covered by the pre-trained processors.
Refer to Full processor and detail list for all supported processors.

Processors take a PDF or image file as input and output the data in the Document format."

In [29]:
# Edit these variables before running the code.
# Lower-case aliases of the notebook-level constants; they are passed to create_processor() below.
project_id = PROJECT_ID

# See https://cloud.google.com/document-ai/docs/regions for all options.
location = LOCATION

# Must be unique per project, e.g.: "My Processor"
processor_display_name = "my_processor1"

# You must set the `api_endpoint` if you use a location other than "us".
# This object is read as a global by both create_processor() and process_document().
client_options = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")


#1. Create the processor: you can not create multiple processors with the same display name
def create_processor(
    project_id: str, location: str, processor_display_name: str
) -> documentai.Processor:
    """Create a pre-trained OCR processor in Document AI.

    Args:
        project_id: Google Cloud project that will own the processor.
        location: Document AI region in which the processor is created.
        processor_display_name: Display name given to the new processor.

    Returns:
        The processor returned by the Document AI ``create_processor`` call.

    The client is built from the notebook-level ``client_options`` global.
    """
    docai_client = documentai.DocumentProcessorServiceClient(
        client_options=client_options
    )

    # Parent resource of the new processor, e.g. projects/project_id/locations/location
    location_path = docai_client.common_location_path(project_id, location)

    # We rely on the pre-trained OCR processor type.
    ocr_processor = documentai.Processor(
        display_name=processor_display_name,
        type_="OCR_PROCESSOR",
    )

    return docai_client.create_processor(
        parent=location_path,
        processor=ocr_processor,
    )


# Create the OCR processor, or reuse the one that already has this display name, so that
# `processor` is bound for the following cells even when the notebook is re-run.
try:
    processor = create_processor(project_id, location, processor_display_name)
    print(f"Created Processor {processor.name}")
except AlreadyExists as e:
    # A processor with this display name already exists: look it up instead of leaving
    # `processor` undefined (which would make `processor.name` fail later on).
    _lookup_client = documentai.DocumentProcessorServiceClient(
        client_options=client_options
    )
    _parent = _lookup_client.common_location_path(project_id, location)
    _existing = next(
        (
            candidate
            for candidate in _lookup_client.list_processors(parent=_parent)
            if candidate.display_name == processor_display_name
        ),
        None,
    )
    if _existing is not None:
        processor = _existing
        print(f"Reusing existing Processor {processor.name}")
    else:
        # Lookup failed: fall back to the original guidance for the user.
        print(
            f"Processor already exists, change the processor name and rerun this code. {e.message}"
        )

    

#2. Define process document function which takes the processor name and file path of the document and extracts the text from the document.  
def process_document(
    processor_name: str,
    file_path: str,
    mime_type: str = "application/pdf",
) -> documentai.Document:
    """Send a local file to a Document AI processor and return the parsed document.

    Args:
        processor_name: Full resource name of the processor to call.
        file_path: Path of the local file to process.
        mime_type: MIME type declared for the file content. Defaults to
            ``"application/pdf"``, which is what this function always sent before
            the parameter was introduced.

    Returns:
        The ``document`` field of the processing response.

    The client is built from the notebook-level ``client_options`` global.
    """
    client = documentai.DocumentProcessorServiceClient(client_options=client_options)

    # Read the whole file into memory
    with open(file_path, "rb") as document_file:
        document_content = document_file.read()

    # Wrap the binary content in a Document AI RawDocument
    raw_document = documentai.RawDocument(
        content=document_content, mime_type=mime_type
    )

    # Configure and send the process request
    request = documentai.ProcessRequest(name=processor_name, raw_document=raw_document)
    result = client.process_document(request=request)

    return result.document



Created Processor projects/268250494950/locations/europe-west2/processors/1553b92dcd6e0f0b


The Document AI processor limits the number of PDF pages that can be processed at once to 15. The following function splits the initial PDF into PDFs of at most 15 pages each. The resulting PDFs are saved in a new folder, pdf_chunks, as pdf_1.pdf to pdf_n.pdf (n = number of PDF chunks).

In [10]:
def split_and_save_pdf(input_pdf_path: str, max_pages_per_file: int) -> List[str]:
    """Split a PDF into consecutive chunks of at most ``max_pages_per_file`` pages.

    The chunks are written to a ``pdf_chunks`` folder created next to the input
    file and are named ``pdf_1.pdf``, ``pdf_2.pdf``, ... in page order.

    Args:
        input_pdf_path: Path of the PDF to split.
        max_pages_per_file: Maximum number of pages per chunk; must be at least 1.

    Returns:
        The paths of the written chunk files, in page order.

    Raises:
        ValueError: If ``max_pages_per_file`` is smaller than 1.
    """
    # Validate before touching the file system: 0 would divide by zero and a negative
    # value would silently produce no chunks.
    if max_pages_per_file < 1:
        raise ValueError(
            f"max_pages_per_file must be at least 1, got {max_pages_per_file}"
        )

    # Create a folder to store the split PDFs
    output_folder = os.path.join(os.path.dirname(input_pdf_path), 'pdf_chunks')
    os.makedirs(output_folder, exist_ok=True)

    pdf_paths = []

    # Open the input PDF
    with fitz.open(input_pdf_path) as pdf_document:
        num_pages = pdf_document.page_count

        # One chunk per block of `max_pages_per_file` pages; the last one may be shorter.
        chunk_starts = range(0, num_pages, max_pages_per_file)
        for chunk_number, start_page in enumerate(chunk_starts, start=1):
            # `to_page` is inclusive, hence the -1.
            last_page = min(start_page + max_pages_per_file, num_pages) - 1
            output_pdf_path = os.path.join(output_folder, f'pdf_{chunk_number}.pdf')

            # The context manager closes the temporary document once it is saved,
            # so writers do not pile up while looping over a large PDF.
            with fitz.open() as pdf_writer:
                pdf_writer.insert_pdf(
                    pdf_document, from_page=start_page, to_page=last_page
                )
                pdf_writer.save(output_pdf_path)

            print(f'Saved: {output_pdf_path}')
            pdf_paths.append(output_pdf_path)

    return pdf_paths

We are now ready to extract the text from our pdf. 

## 2. Text extraction from PDF

In [32]:
# Set the desired parameters
input_pdf_path = "SC145746_aa_2021-10-29.pdf" # Replace with your actual input PDF path
max_pages_per_file = 15 # Set the desired maximum number of pages per file
# Requires `processor` to have been bound by the processor-creation cell above.
processor_name = processor.name # Assign the created processor name


# 1. Split the PDF into pdf chunks of 15 pages max and save their paths 
pdf_paths = split_and_save_pdf(input_pdf_path, max_pages_per_file)


# 2. Iterate through the pdf chunks and extract and join their text
# One Document AI request is sent per chunk, in chunk order.
texts = []
for pdf_path in pdf_paths:   
    document = process_document(processor_name, file_path=pdf_path)
    texts.append(document.text)
    
# The chunk texts are concatenated without any separator between chunks.
text = ''.join(texts)

Saved: pdf_chunks/pdf_1.pdf
Saved: pdf_chunks/pdf_2.pdf
Saved: pdf_chunks/pdf_3.pdf


## 3. Text chunking

The extracted text from the PDF is chunked following these steps: 

    - Split into sentences (here, the text is split on newline characters)
    - Create paragraphs by grouping a fixed number of sentences per chunk; this number (`chunk_size`) can be set manually
    - Create DataFrame where each row is a paragraph

In [37]:
#split into sentences
def text_to_sentences(text):
    """Break ``text`` at every newline character and return the resulting list of pieces."""
    return text.split('\n')

def create_sentence_chunks(sentences, chunk_size):
    """Group ``sentences`` into consecutive slices of at most ``chunk_size`` items.

    The last slice holds the remainder and may therefore be shorter.
    """
    grouped = []
    for start in range(0, len(sentences), chunk_size):
        stop = start + chunk_size
        grouped.append(sentences[start:stop])
    return grouped

def text_to_paragraph(text, chunk_size):
    """Split ``text`` with ``text_to_sentences`` and group the pieces ``chunk_size`` at a time."""
    pieces = text_to_sentences(text)
    paragraphs = create_sentence_chunks(pieces, chunk_size)
    return paragraphs

def paragraphs_to_df(paragraphs):
    """Build a DataFrame with one row per paragraph.

    Args:
        paragraphs: Iterable of paragraphs, each one an iterable of strings.

    Returns:
        A DataFrame with the columns ``paragraph_number`` (1-based position of the
        paragraph) and ``text`` (the paragraph's strings concatenated). The columns
        are present even when ``paragraphs`` is empty.
    """
    # Read from the `paragraphs` argument; the previous version iterated the global
    # `sentence_chunks` and therefore ignored whatever the caller passed in.
    records = [
        {
            'paragraph_number': number,
            # Strings are concatenated without a separator, as before.
            'text': ''.join(chunk),
        }
        for number, chunk in enumerate(paragraphs, start=1)
    ]
    # Explicit columns keep the schema stable for an empty input.
    return pd.DataFrame(records, columns=['paragraph_number', 'text'])
    
# Number of newline-separated pieces grouped into one paragraph.
chunk_size = 40
# `text` is the concatenated OCR output produced by the text-extraction cell.
sentence_chunks = text_to_paragraph(text, chunk_size)
# One DataFrame row per paragraph; this frame is used as the vector store below.
df = paragraphs_to_df(sentence_chunks)

## 4. Create the embeddings

For each paragraph in the DataFrame, the gecko embedding model from GCP is applied to create the corresponding embedding vector.

In [38]:
# Call the GCP models
# NOTE(review): no `vertexai.init(...)` call is visible in this notebook; presumably the project and
# location are picked up from the environment defaults — confirm.
# Text generation model used to answer the question.
generation_model = TextGenerationModel.from_pretrained("text-bison@002")
# Embedding model used for both the paragraphs and the question.
embedding_model = TextEmbeddingModel.from_pretrained("textembedding-gecko@002")


# Retry wrapper around the text generation model: at most 5 attempts, waiting a randomised,
# exponentially growing interval (capped at 60 seconds) between attempts.
# No `retry=` predicate is given, so the retry is not restricted to ResourceExhausted errors.
@retry(stop=stop_after_attempt(5), wait=wait_random_exponential(min=1, max=60))
def text_generation_model_with_backoff(**kwargs):
    """Forward ``kwargs`` to ``generation_model.predict`` and return the response text."""
    prediction = generation_model.predict(**kwargs)
    return prediction.text


def get_embedding(text):
    """Return the embedding vector of ``text`` from the notebook-level ``embedding_model``.

    The number of calls is tracked on the ``get_embedding.counter`` function attribute
    (started at 0 if it has not been set yet). Every 100th call pauses for 3 seconds
    before the request is sent. If the request fails, the function waits 60 seconds
    and tries exactly once more; an error on that second attempt propagates.

    Args:
        text: The string to embed.

    Returns:
        The ``values`` of the first embedding returned by the model.
    """
    # getattr keeps the function usable even when the caller has not initialised the counter.
    get_embedding.counter = getattr(get_embedding, "counter", 0) + 1

    # Light throttling between bursts of requests.
    if get_embedding.counter % 100 == 0:
        time.sleep(3)

    try:
        return embedding_model.get_embeddings([text])[0].values #Send request to embedding model
    except Exception:
        # `Exception` rather than a bare `except:` so that KeyboardInterrupt / SystemExit
        # still stop a long-running embedding loop immediately.
        print('waiting for 60 secs')
        time.sleep(60)
        return embedding_model.get_embeddings([text])[0].values #Send request to embedding model


In [41]:
# Reset the call counter that get_embedding() uses for its periodic pause.
get_embedding.counter = 0

# This may take several minutes to complete.
# One embedding request is sent per row; the result is stored in a new "embedding" column of `df`.
df["embedding"] = df["text"].apply(lambda x: get_embedding(x))

The same embedding model is applied to the user's question.

In [43]:
def get_context_from_question(
    question: str, vector_store: pd.DataFrame, sort_index_value: int = 3
) -> Tuple[str, pd.DataFrame]:
    """Select the paragraphs whose embeddings best match the question.

    Args:
        question: The user question; it is embedded with ``get_embedding``.
        vector_store: DataFrame with ``embedding``, ``paragraph_number`` and ``text``
            columns. A ``dot_product`` column is written to this frame.
        sort_index_value: Number of best-scoring rows to keep.

    Returns:
        A tuple of the selected texts joined with newlines, and a DataFrame holding
        the ``paragraph_number`` and ``text`` of the selected rows.
    """
    question_embedding = np.array(get_embedding(question))

    def _score(paragraph_embedding):
        # Similarity matching by dot product
        return np.dot(paragraph_embedding, question_embedding)

    # The scores are stored on the frame passed in by the caller.
    vector_store["dot_product"] = vector_store["embedding"].apply(_score)

    # Highest score first, then keep the labels of the leading rows.
    ranked = vector_store.sort_values(by="dot_product", ascending=False)
    best_labels = ranked.iloc[:sort_index_value].index

    top_matched_df = vector_store.loc[best_labels, ["paragraph_number", "text"]]
    matched_texts = top_matched_df["text"].values
    context = "\n".join(matched_texts)

    return context, top_matched_df

## Usage

In [45]:
%%time
# your question for the documents
question = "Give me the scope 1 emissions in 2021?"

# get the custom relevant chunks from all the chunks in vector store.
context, top_matched_df = get_context_from_question(
    question,
    vector_store=df,
    sort_index_value=4,  # Top N results to pick from embedding vector search
)
# Build the prompt from the retrieved context and the question.
# NOTE(review): the template appends a literal "?" right after {context} — looks unintended, confirm.
prompt = f""" Answer the question as precise as possible using the provided context. \n\n
            Context: \n {context}?\n
            Question: \n {question} \n
            Answer:
  
  """

# Call the PaLM API on the prompt.
print(question)
print("PaLM Predicted:", text_generation_model_with_backoff(prompt=prompt), "\n\n")
# The best-matching paragraphs (4 here, see sort_index_value) selected for the question. These form the context.
print(top_matched_df)


Give me the scope 1 emissions in 2021?
PaLM Predicted:  The scope 1 emissions in 2021 were 240.00 metric tonnes. 


    paragraph_number                                               text
30                31  3,573,204758,517383,67220,828,484Depreciation ...
26                27  20212020££Interest on bank overdrafts and loan...
5                  6  - Gas combustion740,145- Fuel consumed for tra...
0                  1  Company Registration No. SC145746 (Scotland)TH...
CPU times: user 18.5 ms, sys: 10.2 ms, total: 28.6 ms
Wall time: 723 ms
