In [None]:
# Restart kernel after complete
!pip install --upgrade google-cloud-aiplatform
!pip install pydantic 

In [None]:
from google.cloud import aiplatform

# Sanity check: after the kernel restart this should report the freshly installed SDK version.
sdk_version = aiplatform.__version__
print(sdk_version)

In [None]:
PROJECT = !(gcloud config get-value core/project)
PROJECT = PROJECT[0]
REGION = "us-central1"
DATASET_FILE_BUCKET = f"{PROJECT}-dataset-files-test"
VECTOR_SEARCH_BUCKET = f"{PROJECT}-vector-search-test"
DOCUMENT_BUCKET = f"{PROJECT}-documents-test"
DEPLOYED_INDEX_ID = "my_test_index_deployed" 

In [None]:
!gsutil mb -l us-central1 gs://{DATASET_FILE_BUCKET}
!gsutil mb -l us-central1 gs://{VECTOR_SEARCH_BUCKET}
!gsutil mb -l us-central1 gs://{DOCUMENT_BUCKET}

In [None]:
%%writefile test.txt
gs://cloud-samples-data/gen-app-builder/search/arxiv/1012.0841v1.Automated_Query_Learning_with_Wikipedia_and_Genetic_Programming.pdf
gs://cloud-samples-data/gen-app-builder/search/arxiv/1406.2538v1.FrameNet_CNL_a_Knowledge_Representation_and_Information_Extraction_Language.pdf
gs://cloud-samples-data/gen-app-builder/search/arxiv/1409.2944v2.Collaborative_Deep_Learning_for_Recommender_Systems.pdf
gs://cloud-samples-data/gen-app-builder/search/arxiv/1412.3714v2.Feature_Weight_Tuning_for_Recursive_Neural_Networks.pdf
gs://cloud-samples-data/gen-app-builder/search/arxiv/1412.5335v7.Ensemble_of_Generative_and_Discriminative_Techniques_for_Sentiment_Analysis_of_Movie_Reviews.pdf

In [None]:
# Put together a dataset file with a list of PDF paths 
DATASET_FILE = f"gs://{DATASET_FILE_BUCKET}/test.txt"
!gsutil cp test.txt {DATASET_FILE}

In [None]:
# Take a look at the dataset file in GCS. This is the format of the txt file the Dataflow pipeline expects
!gsutil cat {DATASET_FILE}

In [None]:
# Create the Vector Search index. Its contents bucket was just created, and updates
# are streamed in (STREAM_UPDATE) by the ingestion pipeline.
# `dimensions` must equal the embedding size of the model used for both ingestion
# and querying (Universal Sentence Encoder v4 -> 512).
index_config = dict(
    display_name="test-index",
    contents_delta_uri=f"gs://{VECTOR_SEARCH_BUCKET}",
    dimensions=512,
    approximate_neighbors_count=150,
    index_update_method="STREAM_UPDATE",
    shard_size="SHARD_SIZE_SMALL",
)
index = aiplatform.MatchingEngineIndex.create_tree_ah_index(**index_config)
INDEX_ID = index.resource_name

In [None]:
# Chunking parameters for the Dataflow pipeline -- feel free to change them.
# They are defined as variables so the gcloud command below can interpolate them.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 250

print(
    f"Google Cloud Project ID: {PROJECT}\n"
    f"Dataset file:            {DATASET_FILE}\n"
    f"Vector Search Index:     {INDEX_ID}\n"
    f"Document Bucket:         {DOCUMENT_BUCKET}\n"
    f"Chunk Size:              {CHUNK_SIZE}\n"
    f"Chunk Overlap:           {CHUNK_OVERLAP}\n"
)

## Executing the Pipeline
Using the above runtime parameters, launch an execution of the pipeline. 

#### Through the UI
 1. Navigate to Dataflow in the GCP Console 
 1. Select 'Create job from template'
 1. Specify a unique job name (e.g. pdf-to-vector-pipeline-run-{timestamp})
 1. In the template selector, scroll to the bottom and select 'Custom Template'
 1. Browse to and select the template path (you can find this as an output of the final code cell in setup.ipynb)
 1. Input the above pipeline parameters 
 1. Select 'Run Job'
 
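#### Using gcloud CLI
Every `{...}` placeholder in the command must exist as a Python variable in this notebook. In particular, define `TEMPLATE_FILE` (the template path output by setup.ipynb), `NUM_WORKERS`, `MAX_WORKERS`, `MACHINE_TYPE`, `CHUNK_SIZE`, and `CHUNK_OVERLAP` before running it.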
```python
!gcloud dataflow flex-template run "pdf-to-vector-search-`date +%Y%m%d-%H%M%S`" \
 --template-file-gcs-location={TEMPLATE_FILE} \
 --region={REGION} \
 --num-workers={NUM_WORKERS} \
 --max-workers={MAX_WORKERS} \
 --worker-machine-type={MACHINE_TYPE} \
 --parameters project_id={PROJECT} \
 --parameters dataset_file={DATASET_FILE} \
 --parameters index_id={INDEX_ID} \
 --parameters document_bucket={DOCUMENT_BUCKET} \
 --parameters chunk_size={CHUNK_SIZE} \
 --parameters chunk_overlap={CHUNK_OVERLAP}
```

While the pipeline that embeds the PDFs and ingests them into Vector Search is running, create an endpoint to host the index and deploy the index to it, so we can serve nearest-neighbor queries for retrieval.

In [None]:
# Create a public Vector Search endpoint. Name it after the index's display name
# ("test-index-endpoint") rather than its full resource path.
endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=f"{index.display_name}-endpoint",
    public_endpoint_enabled=True
)
ENDPOINT_ID = endpoint.resource_name
print(f"ENDPOINT_ID: {ENDPOINT_ID}")

# Deploy the index to the endpoint under DEPLOYED_INDEX_ID; queries later address the
# index through this ID. Deployment is a long-running operation.
deployed_index = endpoint.deploy_index(
    index=index,
    deployed_index_id=DEPLOYED_INDEX_ID
)

## End to End Retrieval System

**NOTE:** Wait for the pipeline to finish successfully and for the index to be deployed to the endpoint before moving on.

Create a helper class that implements the retrieval chain for a research-paper chat agent.

In [None]:
import tensorflow as tf 
import tensorflow_hub as hub 
import json 
from vertexai.generative_models import (
    GenerativeModel, 
    GenerationConfig,
    GenerationResponse
)
from google.cloud import storage
from pydantic import BaseModel
from typing import List

class DocumentChunk(BaseModel):
    """One retrieved chunk of a source PDF, loaded from a documents/{id}.json blob."""
    source_file: str  # source PDF the chunk came from (printed as "Source PDF")
    page_number: int  # page of source_file the chunk was taken from
    content: str  # chunk text that is inserted into the prompt as context
    
class Response(BaseModel):
    """Result of one ResearchPaperChat.send_message call."""
    text: str | None = None  # model answer; None when the model response had no readable text
    full_response: dict  # raw model response, converted with to_dict()
    citations: List[DocumentChunk] | None = None  # chunks retrieved and placed in the prompt
    
class ResearchPaperChat:
    """Retrieval-augmented chat agent over chunks of research papers.

    For every user message the agent:
      1. embeds the message with the Universal Sentence Encoder model,
      2. asks the deployed Vector Search index for the ``k`` nearest chunk IDs,
      3. loads each chunk from ``documents/{id}.json`` in the document bucket,
      4. sends the question plus the chunks to a Gemini chat session.

    All turns go through the same ``start_chat()`` session, so earlier turns
    (including their retrieved context) are presumably kept in the chat history,
    which is what lets follow-up questions refer back to previous answers.
    """

    def __init__(
        self,
        endpoint_id: str,
        deployed_index_id: str,
        document_bucket: str,
        k: int,
        model_id: str = "gemini-1.5-pro-001",
        temperature: float = 0.2,
        max_output_tokens: int = 2500,
        sys_msg: str = """
        You are a friendly research assistant with access to many research papers. 
        You only answer questions about research. You do not make up any new facts.
        """,
    ):
        """
        endpoint_id: str. Vertex AI Vector Search index endpoint ID or resource name.
        deployed_index_id: str. Deployed Index ID on Vector Search endpoint.
        document_bucket: str. GCS bucket name where documents/{vector_id}.json files are stored.
                    Each file needs to have keys: {source_file, page_number, content}
        k: int. Number of nearest neighbors to include in retrieval.
        model_id: str. Underlying LLM to use. Must be Gemini variant.
        temperature: float. Sampling temperature for the LLM.
        max_output_tokens: int. Maximum number of tokens the LLM may generate per answer.
        sys_msg: str. System message to initialize model with.
        """
        self.k = k
        # Must be the same embedding model that was used to build the vector database,
        # otherwise query vectors and indexed vectors are not comparable.
        self.embedding_model = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
        self.endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_id)
        self.bucket = storage.Client().get_bucket(document_bucket)
        self.deployed_index_id = deployed_index_id

        model = GenerativeModel(
            model_id,
            system_instruction=sys_msg,
            generation_config=GenerationConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens
            )
        )
        self.chat_session = model.start_chat()

    def send_message(self, message: str) -> Response:
        """Answer ``message`` using retrieved research-paper context.

        Args:
            message: The user's question.

        Returns:
            A ``Response`` with the model's text (``None`` if the response had no
            readable text), the raw model response as a dict, and the retrieved
            chunks used as context (``citations``).
        """
        query_embedding = self._get_embedding(message)
        neighbor_ids = self._find_neighbors(query_embedding)
        neighbor_docs = [self._get_document(doc_id) for doc_id in neighbor_ids]

        full_message = self._get_prompt(query=message, documents=neighbor_docs)
        response = self.chat_session.send_message(full_message)
        try:
            response_text = response.text
        except (ValueError, AttributeError):
            # `.text` raises (rather than returning None) when the response has no
            # single text part, e.g. when generation was blocked. Keep the raw
            # response for inspection instead of failing; a bare `except:` here would
            # also have swallowed KeyboardInterrupt and genuine bugs.
            response_text = None

        return Response(
            text=response_text,
            full_response=response.to_dict(),
            citations=neighbor_docs
        )

    def _get_embedding(self, text: str) -> List[float]:
        """Embed ``text`` and return the vector as a flat list of floats."""
        output = self.embedding_model([text])
        # The model is called on a batch of one; squeeze drops the batch axis.
        return tf.squeeze(output).numpy().tolist()

    def _find_neighbors(self, vector: List[float]) -> List[str]:
        """Return the IDs of the ``self.k`` nearest neighbors of ``vector``."""
        neighbors = self.endpoint.find_neighbors(
            deployed_index_id=self.deployed_index_id,
            queries=[vector],
            num_neighbors=self.k
        )

        # Don't fail if no neighbors were found; simply return no context data points.
        if not neighbors:
            return []

        # One query was sent, so only the first result list is relevant.
        return [x.id for x in neighbors[0]]

    def _get_document(self, doc_id: str) -> DocumentChunk:
        """Load the chunk stored at ``documents/{doc_id}.json`` in the document bucket."""
        blob = self.bucket.blob(f'documents/{doc_id}.json')
        # download_as_string() is a deprecated alias of download_as_bytes();
        # json.loads accepts bytes directly.
        data = json.loads(blob.download_as_bytes())
        return DocumentChunk(**data)

    def _get_prompt(self, query: str, documents: List[DocumentChunk] | None = None) -> str:
        """Build the grounded prompt: instructions, the question, then each chunk.

        ``documents=None`` is treated the same as an empty list (no context).
        """
        content_list = [
            f"\nSource: {d.source_file}\nPage Num: {d.page_number}\nContent: {d.content}\n\n"
            for d in documents or []
        ]
        content_str = '\n'.join(content_list)

        prompt = f"""
        Using only the provided context from research papers, answer the question.
        If you cannot answer the question using only the provided context, 
        respond that you do not have the context needed to answer the question.
        
        Question: {query}.
        
        Context:
        {content_str}
        """
        return prompt
    
def pretty_print_response(response: Response, include_content: bool = False):
    """Print a chat ``Response`` followed by the chunks it cited.

    Args:
        response: Result of ``ResearchPaperChat.send_message``.
        include_content: If True, also print the text of each cited chunk.
    """
    if response.text is None:
        # send_message stores None when the model returned no readable text.
        print("[The model returned no text; inspect response.full_response]")
    else:
        print(response.text)
    print("\nCitations:\n")
    # `citations` is optional on Response, so treat None as "no citations"
    # instead of raising TypeError while iterating.
    for c in response.citations or []:
        print(f"\nSource PDF: {c.source_file}\nPage Number: {c.page_number}\n")
        if include_content:
            print(f"{c.content} \n")

In [None]:
# If the session was interrupted (e.g. the kernel restarted), re-create these variables first:
# ENDPOINT_ID = "{YOUR VECTOR SEARCH ENDPOINT ID}"
# DEPLOYED_INDEX_ID = "my_test_index_deployed"
# PROJECT = !(gcloud config get-value core/project)
# PROJECT = PROJECT[0]
# DOCUMENT_BUCKET = f"{PROJECT}-documents-test"

# Number of nearest-neighbor chunks retrieved and added to each prompt.
NUM_NEIGHBORS = 5

chat = ResearchPaperChat(
    endpoint_id=ENDPOINT_ID,
    deployed_index_id=DEPLOYED_INDEX_ID,
    document_bucket=DOCUMENT_BUCKET,
    k=NUM_NEIGHBORS,
)

In [None]:
# First turn: retrieval + answer for a fresh question.
question = "What is sentence pair scoring?"
response = chat.send_message(question)
pretty_print_response(response)

In [None]:
# Follow-up turn in the same chat session.
question = "Interesting. What is batch softmax contrastive loss?"
response = chat.send_message(question)
pretty_print_response(response)