# Upload Dialog (Pre-defined scripts) to Google Cloud Vector DB

Vector Search + Bigtable

# Install libraries, then restart the runtime

In [None]:
# Only google-cloud-aiplatform is pinned (1.25.0); the install output shows it
# replacing a newer preinstalled version, so restart the runtime afterwards for
# the pinned version to be the one imported.
! pip install google-cloud-aiplatform==1.25.0
! pip install langchain
! pip install google-cloud-bigtable

Collecting google-cloud-aiplatform==1.25.0
  Downloading google_cloud_aiplatform-1.25.0-py2.py3-none-any.whl (2.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m24.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: google-cloud-aiplatform
  Attempting uninstall: google-cloud-aiplatform
    Found existing installation: google-cloud-aiplatform 1.30.1
    Uninstalling google-cloud-aiplatform-1.30.1:
      Successfully uninstalled google-cloud-aiplatform-1.30.1
Successfully installed google-cloud-aiplatform-1.25.0
[0m

Collecting langchain
  Obtaining dependency information for langchain from https://files.pythonhosted.org/packages/74/34/e1a55be448d4fd1866516b9612097c76c7d384d64490a37677c667de58bc/langchain-0.0.297-py3-none-any.whl.metadata
  Downloading langchain-0.0.297-py3-none-any.whl.metadata (14 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain)
  Obtaining dependency information for dataclasses-json<0.7,>=0.5.7 from https://files.pythonhosted.org/packages/13/75/82ce74880711ced796fd5a32e4d40c5a32dbea3f1c5e219a8b0544b7bd8c/dataclasses_json-0.6.0-py3-none-any.whl.metadata
  Downloading dataclasses_json-0.6.0-py3-none-any.whl.metadata (24 kB)
Collecting langsmith<0.1.0,>=0.0.38 (from langchain)
  Obtaining dependency information for langsmith<0.1.0,>=0.0.38 from https://files.pythonhosted.org/packages/fb/58/89739e01b4ef4018efc4a1a107fbba4af8215b59b5bd8caac6cb2910bd7e/langsmith-0.0.39-py3-none-any.whl.metadata
  Downloading langsmith-0.0.39-py3-none-any.whl.metadata (10 kB)
Collecting mar

# Authenticate to Google Cloud

In [None]:
# Authenticate with Google Cloud credentials
# Colab-only: runs Colab's interactive user authentication for this runtime.
from google.colab import auth as google_auth
google_auth.authenticate_user()

# Utility functions to create the Vector Search index and endpoint

Uses the `aiplatform_v1` SDK directly, independent of LangChain.

In [None]:
%%writefile matching_engine_utils.py
from datetime import datetime
import time
import logging

from google.cloud import aiplatform_v1 as aipv1
from google.protobuf import struct_pb2

# Make INFO-level progress messages from this module visible. Note that
# basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(level = logging.INFO)
# Root logger (no name), shared with any other code that logs to root.
logger = logging.getLogger()

class MatchingEngineUtils:
    """Create, deploy and delete a Vertex AI Vector Search index and endpoint.

    Resources are looked up by display name: the index is ``index_name`` and
    its endpoint is ``f"{index_name}-endpoint"``. All calls go through the
    regional ``aiplatform_v1`` GAPIC clients created in ``__init__``.
    """

    def __init__(self,
                 project_id: str,
                 region: str,
                 index_name: str):
        """
        Args:
            project_id: Google Cloud project id.
            region: Region hosting the index, e.g. ``"us-central1"``.
            index_name: Display name of the index; the endpoint display name
                is derived from it.
        """
        self.project_id = project_id
        self.region = region
        self.index_name = index_name
        self.index_endpoint_name = f"{self.index_name}-endpoint"
        self.PARENT = f"projects/{self.project_id}/locations/{self.region}"

        # Both clients must talk to the regional API endpoint.
        ENDPOINT = f"{self.region}-aiplatform.googleapis.com"
        self.index_client = aipv1.IndexServiceClient(
            client_options=dict(api_endpoint=ENDPOINT)
        )
        self.index_endpoint_client = aipv1.IndexEndpointServiceClient(
            client_options=dict(api_endpoint=ENDPOINT)
        )

    @staticmethod
    def _wait_for_operation(operation, poll_seconds: int = 60):
        """Block until a long-running operation is done and return its result.

        Prints one dot per poll so progress is visible in the notebook.
        ``operation.result()`` re-raises the operation's error, so a failed
        create/deploy is surfaced instead of being reported as a success.
        """
        while not operation.done():
            time.sleep(poll_seconds)
            print('.', end='')
        return operation.result()

    def get_index(self):
        """Return the index whose display name is ``index_name``, or ``None``.

        If several indexes share that display name, the first one listed wins.
        """
        request = aipv1.ListIndexesRequest(parent=self.PARENT)
        page_result = self.index_client.list_indexes(request=request)
        index_id = next((response.name for response in page_result
                         if response.display_name == self.index_name), None)
        if index_id is None:
            return None
        request = aipv1.GetIndexRequest(name=index_id)
        return self.index_client.get_index(request=request)

    def get_index_endpoint(self):
        """Return the endpoint named ``index_endpoint_name``, or ``None``.

        If several endpoints share that display name, the first one listed wins.
        """
        request = aipv1.ListIndexEndpointsRequest(parent=self.PARENT)
        page_result = self.index_endpoint_client.list_index_endpoints(request=request)
        index_endpoint_id = next((response.name for response in page_result
                                  if response.display_name == self.index_endpoint_name), None)
        if index_endpoint_id is None:
            return None
        request = aipv1.GetIndexEndpointRequest(name=index_endpoint_id)
        return self.index_endpoint_client.get_index_endpoint(request=request)

    def create_index(self,
                     embedding_gcs_uri: str,
                     dimensions: int
                     ):
        """Create the index unless one named ``index_name`` already exists.

        The index uses the tree-AH algorithm, dot-product distance, small
        shards and stream updates, and is seeded from ``embedding_gcs_uri``.

        Args:
            embedding_gcs_uri: GCS directory with the initial embeddings
                (sent as ``contentsDeltaUri``).
            dimensions: Length of the embedding vectors.

        Returns:
            The existing index, or the newly created one.
        """
        index = self.get_index()
        if index:
            logger.info(f"Index {self.index_name} already exists with id {index.name}")
            return index

        logger.info(f"Index {self.index_name} does not exists. Creating index ...")

        treeAhConfig = struct_pb2.Struct(
            fields={
                "leafNodeEmbeddingCount": struct_pb2.Value(number_value=500),
                "leafNodesToSearchPercent": struct_pb2.Value(number_value=7),
            }
        )
        algorithmConfig = struct_pb2.Struct(
            fields={"treeAhConfig": struct_pb2.Value(struct_value=treeAhConfig)}
        )
        config = struct_pb2.Struct(
            fields={
                "dimensions": struct_pb2.Value(number_value=dimensions),
                "approximateNeighborsCount": struct_pb2.Value(number_value=150),
                "distanceMeasureType": struct_pb2.Value(string_value="DOT_PRODUCT_DISTANCE"),
                "algorithmConfig": struct_pb2.Value(struct_value=algorithmConfig),
                "shardSize": struct_pb2.Value(string_value="SHARD_SIZE_SMALL"),
            }
        )
        metadata = struct_pb2.Struct(
            fields={
                "config": struct_pb2.Value(struct_value=config),
                "contentsDeltaUri": struct_pb2.Value(string_value=embedding_gcs_uri),
            }
        )

        index_request = {
            "display_name": self.index_name,
            "description": "Index for LangChain demo",
            "metadata": struct_pb2.Value(struct_value=metadata),
            # Stream updates allow upsert/remove of individual datapoints.
            "index_update_method": aipv1.Index.IndexUpdateMethod.STREAM_UPDATE,
        }

        r = self.index_client.create_index(parent=self.PARENT,
                                           index=index_request)

        logger.info("Poll the operation to create index ...")
        index = self._wait_for_operation(r)
        logger.info(f"Index {self.index_name} created with resource name as {index.name}")
        return index

    def deploy_index(self,
                     machine_type: str = "e2-standard-2",
                     min_replica_count: int = 2,
                     max_replica_count: int = 10,
                     network: str = None):
        """Ensure the endpoint exists and the index is deployed to it.

        Args:
            machine_type: Machine type of the dedicated deployment resources.
            min_replica_count: Minimum number of replicas.
            max_replica_count: Maximum number of replicas.
            network: VPC network for a private endpoint; when omitted, a
                public endpoint is created.

        Returns:
            The index endpoint. After a new deployment it is re-read so that
            ``deployed_indexes`` and the public domain name are current.

        Raises:
            Exception: if the index does not exist, or if endpoint creation
                or deployment fails.
        """
        try:
            index = self.get_index()
            if not index:
                raise Exception(f"Index {self.index_name} does not exists. Please create index before deploying.")

            index_endpoint = self.get_index_endpoint()
            if index_endpoint:
                logger.info(f"Index endpoint {self.index_endpoint_name} already exists with resource " +
                            f"name as {index_endpoint.name} and endpoint domain name as " +
                            f"{index_endpoint.public_endpoint_domain_name}")
            else:
                logger.info(f"Index endpoint {self.index_endpoint_name} does not exists. Creating index endpoint...")
                index_endpoint_request = {"display_name": self.index_endpoint_name}

                if network:
                    index_endpoint_request["network"] = network
                else:
                    index_endpoint_request["public_endpoint_enabled"] = True

                r = self.index_endpoint_client.create_index_endpoint(
                    parent=self.PARENT,
                    index_endpoint=index_endpoint_request)

                logger.info("Poll the operation to create index endpoint ...")
                index_endpoint = self._wait_for_operation(r)
                logger.info(f"Index endpoint {self.index_endpoint_name} created with resource " +
                            f"name as {index_endpoint.name} and endpoint domain name as " +
                            f"{index_endpoint.public_endpoint_domain_name}")
        except Exception:
            logger.error(f"Failed to create index endpoint {self.index_endpoint_name}")
            raise

        try:
            for d_index in index_endpoint.deployed_indexes:
                if d_index.index == index.name:
                    logger.info(f"Skipping deploying Index. Index {self.index_name} " +
                                f"already deployed with id {index.name} to the index endpoint {self.index_endpoint_name}")
                    return index_endpoint

            # Deployed index ids may not contain '-', hence the replacement.
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            deployed_index_id = f"{self.index_name.replace('-', '_')}_{timestamp}"
            deploy_index = {
                "id": deployed_index_id,
                "display_name": deployed_index_id,
                "index": index.name,
                "dedicated_resources": {
                    "machine_spec": {
                        "machine_type": machine_type,
                    },
                    "min_replica_count": min_replica_count,
                    "max_replica_count": max_replica_count,
                },
            }
            logger.info(f"Deploying index with request = {deploy_index}")
            r = self.index_endpoint_client.deploy_index(
                index_endpoint=index_endpoint.name,
                deployed_index=deploy_index
            )

            logger.info("Poll the operation to deploy index ...")
            # result() raises if the deployment failed.
            self._wait_for_operation(r)
            logger.info(f"Deployed index {self.index_name} to endpoint {self.index_endpoint_name}")
        except Exception:
            logger.error(f"Failed to deploy index {self.index_name} to the index endpoint {self.index_endpoint_name}")
            raise

        # The object above predates the deployment; re-read it so callers see
        # the new deployed index and the endpoint's current domain name.
        return self.index_endpoint_client.get_index_endpoint(name=index_endpoint.name)

    def get_index_and_endpoint(self):
        """Return ``(index_resource_name, endpoint_resource_name)``; ``''`` if missing."""
        index = self.get_index()
        index_id = index.name if index else ''

        index_endpoint = self.get_index_endpoint()
        index_endpoint_id = index_endpoint.name if index_endpoint else ''

        return index_id, index_endpoint_id

    def delete_index(self):
        """Request deletion of the index (the returned operation is not awaited).

        Raises:
            Exception: if no index named ``index_name`` exists.
        """
        index = self.get_index()

        if index:
            index_id = index.name
            logger.info(f"Deleting Index {self.index_name} with id {index_id}")
            self.index_client.delete_index(name=index_id)
        else:
            raise Exception(f"Index {self.index_name} does not exists.")

    def delete_index_endpoint(self):
        """Undeploy every index from the endpoint, then request its deletion.

        Each undeploy is awaited; the endpoint deletion operation is not.

        Raises:
            Exception: if no endpoint named ``index_endpoint_name`` exists.
        """
        index_endpoint = self.get_index_endpoint()

        if index_endpoint:
            logger.info(f"Index endpoint {self.index_endpoint_name}  exists with resource " +
                        f"name as {index_endpoint.name} and endpoint domain name as " +
                        f"{index_endpoint.public_endpoint_domain_name}")

            index_endpoint_id = index_endpoint.name
            index_endpoint = self.index_endpoint_client.get_index_endpoint(name=index_endpoint_id)
            # An endpoint with deployed indexes cannot be deleted; undeploy first.
            for d_index in index_endpoint.deployed_indexes:
                logger.info(f"Undeploying index with id {d_index.id} from Index endpoint {self.index_endpoint_name}")
                request = aipv1.UndeployIndexRequest(
                    index_endpoint=index_endpoint_id,
                    deployed_index_id=d_index.id)
                r = self.index_endpoint_client.undeploy_index(request=request)
                response = r.result()
                logger.info(response)

            logger.info(f"Deleting Index endpoint {self.index_endpoint_name} with id {index_endpoint_id}")
            self.index_endpoint_client.delete_index_endpoint(name=index_endpoint_id)
        else:
            raise Exception(f"Index endpoint {self.index_endpoint_name} does not exists.")

Writing matching_engine_utils.py


# Create Vector Search

In [None]:
# @title parameters
PROJECT_ID = "argolis-lsj-test" # @param {type: "string"}
# Location used when creating the GCS bucket below.
LOCATION = "us-central1"
# Region of the Vector Search index/endpoint (presumably kept equal to LOCATION).
ME_REGION          = "us-central1"
# Display name of the index; its endpoint is named f"{ME_INDEX_NAME}-endpoint".
ME_INDEX_NAME      = "langchain-vs-tencent-colabEnterprise" # @param {type: "string"}
ME_DIMENSIONS      = 768 # when using Vertex PaLM Embedding
# Bucket that holds the seed embeddings file the index is created from.
ME_EMBEDDING_DIR   = "gs://langchain-vs-tencent-bucket" # @param {type: "string"}

In [None]:
# @title create a gcs bucket for vector search initialization
# IPython expands $LOCATION, $PROJECT_ID and $ME_EMBEDDING_DIR from the notebook.
# NOTE: re-running this cell will presumably fail once the bucket exists.
! gsutil mb -l $LOCATION -p $PROJECT_ID $ME_EMBEDDING_DIR

Creating gs://langchain-vs-tencent-bucket/...


In [None]:
# @title init matching engine
# NOTE(review): `storage` is not used in this cell.
from google.cloud import storage
import uuid
import numpy as np
import json


# dummy embedding
# One all-zero vector of ME_DIMENSIONS values with a random id. It only seeds
# the index, which is created from the init_index/ directory written below.
# (np.float64 subclasses float, so json can serialize the list.)
init_embedding = {"id": str(uuid.uuid4()),
                  "embedding": list(np.zeros(ME_DIMENSIONS))}

# dump embedding to a local file
with open("embeddings_0.json", "w") as f:
    json.dump(init_embedding, f)

# write embedding to Cloud Storage
! gsutil cp embeddings_0.json {ME_EMBEDDING_DIR}/init_index/embeddings_0.json

Copying file://embeddings_0.json [Content-Type=application/json]...
/ [1 files][  3.8 KiB/  3.8 KiB]                                                
Operation completed over 1 objects/3.8 KiB.                                      


In [None]:
# @title create index, wait for 30 mins
import matching_engine_utils
mengine = matching_engine_utils.MatchingEngineUtils(PROJECT_ID, ME_REGION, ME_INDEX_NAME)
# Reuses an index with the same display name if one exists; otherwise creates
# it from the seed directory and blocks, polling every 60 s, until it is ready.
index = mengine.create_index(f"{ME_EMBEDDING_DIR}/init_index", ME_DIMENSIONS)
if index:
  print(index.name)

.........................................................projects/703099487153/locations/us-central1/indexes/7197024883421741056


In [None]:
# @title deploy index to endpoint, wait for 30 mins
# Reuses `mengine` from the previous cell; uncomment the two lines below to run
# this cell on its own.
#import matching_engine_utils
#mengine = matching_engine_utils.MatchingEngineUtils(PROJECT_ID, ME_REGION, ME_INDEX_NAME)
# Creates the endpoint if needed, then deploys on n1-standard-16 with at most
# 2 replicas (min_replica_count keeps its default).
index_endpoint = mengine.deploy_index(machine_type="n1-standard-16", max_replica_count=2)
# If the domain name or deployed-index list prints empty, re-read the endpoint
# with mengine.get_index_endpoint() to see its current state.
if index_endpoint:
  print(f"Index endpoint resource name: {index_endpoint.name}")
  print(f"Index endpoint public domain name: {index_endpoint.public_endpoint_domain_name}")
  print(f"Deployed indexes on the index endpoint:")
  for d in index_endpoint.deployed_indexes:
    print(f"    {d.id}")

....................................Index endpoint resource name: projects/703099487153/locations/us-central1/indexEndpoints/2049410509337264128
Index endpoint public domain name: 
Deployed indexes on the index endpoint:


# Vector Search implementation of the LangChain VectorStore


In [None]:
%%writefile matching_engine_bigtable.py
"""Vertex Matching Engine implementation of the vector store."""
from __future__ import annotations

import json
import logging
import time
import uuid, hashlib
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Type

from langchain.docstore.document import Document
from langchain.embeddings import TensorflowHubEmbeddings
from langchain.embeddings.base import Embeddings
from langchain.vectorstores.base import VectorStore

from google.cloud import storage
from google.cloud import bigtable
import google.cloud.bigtable.row_filters as row_filters
from google.cloud import aiplatform_v1beta1
from google.cloud.aiplatform import MatchingEngineIndex, MatchingEngineIndexEndpoint
from google.cloud.aiplatform_v1beta1 import FindNeighborsResponse
from google.oauth2.service_account import Credentials
from google.cloud import aiplatform_v1

# Root logger (no name), shared with any other code that logs to root.
logger = logging.getLogger()
# Bigtable table and column family that hold each datapoint's text and collection.
bigtable_table_id = "vectordb"
bigtable_column_family_id = "vector_text"
# In-memory registry of collection names. It starts as ["default"], is
# initialised when this module is imported, and is never persisted.
collection_list = ["default"]
# NOTE(review): region is hard-coded here and the visible code never reads this
# constant; confirm whether it is still needed.
ENDPOINT = "{}-aiplatform.googleapis.com".format("us-central1")

class MatchingEngine(VectorStore):
    """Vertex Matching Engine implementation of the vector store.

    While the embeddings are stored in the Matching Engine, the embedded
    documents will be stored in GCS.

    An existing Index and corresponding Endpoint are preconditions for
    using this module.

    See usage in docs/modules/indexes/vectorstores/examples/matchingengine.ipynb

    Note that this implementation is mostly meant for reading if you are
    planning to do a real time implementation. While reading is a real time
    operation, updating the index takes close to one hour."""

    def __init__(
        self,
        project_id: str,
        index: MatchingEngineIndex,
        endpoint: MatchingEngineIndexEndpoint,
        embedding: Embeddings,
        gcs_client: storage.Client,
        gcs_bucket_name: str,
        index_client: aiplatform_v1.IndexServiceClient,
        index_endpoint_client: aiplatform_v1.IndexEndpointServiceClient,
        bigtable_client: bigtable.Client,
        bigtable_instance_id: str,
        credentials: Optional[Credentials] = None,
    ):
        """Vertex AI Vector Search (Matching Engine) vector store backed by Bigtable.

        Embeddings are stream-upserted into the Vector Search index. The text
        of each document and its collection name are written to Bigtable
        (table ``bigtable_table_id``, column family
        ``bigtable_column_family_id``), keyed by the datapoint id.

        An existing index, deployed to an existing index endpoint, is a
        precondition for using this class.

        Args:
            project_id: The Google Cloud project id.
            index: The index that datapoints are upserted into / removed from.
            endpoint: The index endpoint used for queries; its public domain
                name and deployed indexes are read by ``get_matches``.
            embedding: The :class:`Embeddings` used to embed documents and
                queries. It is required; no default model is substituted.
            gcs_client: The GCS client, used by the GCS helper methods.
            gcs_bucket_name: The GCS bucket name used by those helpers.
            index_client: GAPIC index client used for datapoint upsert/remove.
            index_endpoint_client: GAPIC index endpoint client (stored on the
                instance).
            bigtable_client: Bigtable client used to store and read document text.
            bigtable_instance_id: Id of the Bigtable instance holding the table.
            credentials: Optional GCP credentials (stored on the instance).
        """
        super().__init__()
        self._validate_google_libraries_installation()

        self.project_id = project_id
        self.index = index
        self.endpoint = endpoint
        self.embedding = embedding
        self.bigtable_client = bigtable_client
        self.credentials = credentials
        self.bigtable_instance_id = bigtable_instance_id
        self.gcs_client = gcs_client
        self.gcs_bucket_name = gcs_bucket_name
        self.index_client = index_client
        self.index_endpoint_client = index_endpoint_client

    def _validate_google_libraries_installation(self) -> None:
        """Raise ImportError with install instructions if a Google library is missing."""
        try:
            from google.cloud import aiplatform, bigtable, storage  # noqa: F401
            from google.oauth2 import service_account  # noqa: F401
        except ImportError as err:
            # Chain the original error so the missing module name is kept.
            raise ImportError(
                "You must run `pip install --upgrade "
                "google-cloud-aiplatform google-cloud-storage google-cloud-bigtable` "
                "to use the MatchingEngine Vectorstore."
            ) from err

    def list_collection(
        self,
        **kwargs: Any,
    ) -> List[str]:
        """Return the collection names registered in this process.

        This is the module-level ``collection_list`` object itself, not a
        copy. It starts as ``["default"]`` and is kept only in memory.
        """
        registered = collection_list
        return registered

    def add_texts(
        self,
        texts: Iterable[str],
        collection: str,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed ``texts`` and stream-upsert them into the index and Bigtable.

        Each datapoint id is the SHA-256 hex digest of ``text + collection``,
        so adding the same text to the same collection again overwrites the
        existing datapoint instead of duplicating it.

        Args:
            texts: Strings to add. They are materialized once, so one-shot
                iterables such as generators work.
            collection: Collection used as the ``collection`` restrict; ``""``
                upserts without a collection and the rows are recorded under
                ``"default"``.
            metadatas: Accepted for ``VectorStore`` compatibility; ignored.
            kwargs: Ignored.

        Returns:
            The datapoint ids, in the same order as ``texts``.
        """
        # ``texts`` is needed twice (embedding call and the loop below); a
        # generator would be exhausted after the first use.
        text_list = list(texts)
        logger.debug("Embedding documents.")
        embeddings = self.embedding.embed_documents(texts=text_list, batch_size=5)

        # Register the name _stream_upsert_vme actually records in Bigtable
        # ("" is stored as "default"), so list/delete stay consistent.
        collection_name = collection if collection != "" else "default"
        if collection_name not in collection_list:
            collection_list.append(collection_name)

        ids = []
        jsons = []
        for embedding, text in zip(embeddings, text_list):
            doc_id = hashlib.sha256((str(text) + str(collection)).encode('utf-8')).hexdigest()
            ids.append(doc_id)
            jsons.append({"id": doc_id, "embedding": embedding, "text": text})

        self._stream_upsert_vme(data=jsons, collection=collection)
        logger.debug("Updated index with new configuration.")

        return ids

    def delete_texts(
        self,
        texts: Iterable[str],
        collection: str,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Remove ``texts`` of ``collection`` from the index and Bigtable.

        Ids are recomputed the same way ``add_texts`` builds them (SHA-256 of
        ``text + collection``), so no embedding call is needed.

        Args:
            texts: Strings to remove.
            collection: Collection the texts were added to.
            metadatas: Ignored.
            kwargs: Ignored.

        Returns:
            The ids that were sent for deletion, in input order.
        """
        logger.debug("Delete embedding documents.")
        doc_ids = [
            hashlib.sha256((str(text) + str(collection)).encode('utf-8')).hexdigest()
            for text in texts
        ]
        payload = [{"id": doc_id} for doc_id in doc_ids]

        self._stream_delete_vme(data=payload, collection=collection)
        logger.debug("Delete index with new configuration.")
        return doc_ids

    def delete_collection(
        self,
        collection: str,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Delete every document whose Bigtable ``collection`` cell equals ``collection``.

        Only the ``collection`` column is read and compared exactly. An
        unanchored value regex over all cells would also match rows whose
        *text* matched the name. Rows written by ``_stream_upsert_vme`` never
        have an empty collection, so ``""`` matches nothing; use
        ``"default"`` for texts that were added with ``""``.

        Args:
            collection: Name of the collection to delete.
            metadatas: Ignored.
            kwargs: Ignored.

        Returns:
            The ids of the rows that were deleted.

        Raises:
            Exception: whatever the Bigtable scan raises (logged first).
        """
        logger.debug("Delete embedding documents by collection")

        instance_client = self.bigtable_client.instance(self.bigtable_instance_id)
        table_client = instance_client.table(bigtable_table_id)
        collection_name = collection
        wanted = collection_name.encode("utf-8")
        collection_column = "collection".encode("utf-8")

        ids = []
        try:
            # Fetch only the latest ``collection`` cell of each row and compare
            # it client-side, avoiding regex-escaping issues with the name.
            rows = table_client.read_rows(
                filter_=row_filters.RowFilterChain(
                    filters=[
                        row_filters.ColumnQualifierRegexFilter(collection_column),
                        row_filters.CellsColumnLimitFilter(1),
                    ]
                )
            )
            for row in rows:
                cells = row.cells.get(bigtable_column_family_id, {}).get(collection_column, [])
                if cells and cells[0].value == wanted:
                    ids.append(row.row_key.decode("utf-8"))
        except Exception as error:
            logger.error(f"Fail to fetech collection data {collection_name} with error {error}")
            raise

        if ids:
            self._stream_delete_vme(data=[{"id": doc_id} for doc_id in ids], collection=collection_name)
            logger.debug("Delete index with new configuration.")

        return ids

    def _read_text(self, row_key_text: str) -> Optional[str]:
        """Return the stored text for a datapoint id, or ``None`` if unavailable.

        ``None`` is returned when the row does not exist or has no ``text``
        cell, rather than raising ``AttributeError``/``KeyError``. The index
        and Bigtable are written separately, so they can disagree.
        """
        instance_client = self.bigtable_client.instance(self.bigtable_instance_id)
        table_client = instance_client.table(bigtable_table_id)

        row = table_client.read_row(row_key_text)
        if row is None:
            return None

        try:
            buf = row.cell_value(column_family_id=bigtable_column_family_id, column="text".encode('utf-8'))
        except KeyError:
            # The row exists but has no text cell in this column family.
            return None

        return None if buf is None else buf.decode("utf-8")

    def _upload_to_gcs(self, data: str, gcs_location: str) -> None:
        """Write ``data`` to the object ``gcs_location`` in ``self.gcs_bucket_name``.

        Args:
            data: Content to store.
            gcs_location: Object path inside the bucket.
        """
        target_bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
        target_bucket.blob(gcs_location).upload_from_string(data)

    def _stream_upsert_vme(self, data: List[dict], collection: str) -> None:
        """Stream-upsert datapoints into the index and store their text in Bigtable.

        Args:
            data: ``[{"id": id, "embedding": embedding, "text": text}, ...]``.
            collection: Value of the ``collection`` restrict; ``""`` upserts
                with an empty restrict and records the row under ``"default"``.

        Index upsert errors propagate and stop the loop. Bigtable write errors
        are logged and that item's text is skipped.
        """
        import datetime

        instance_client = self.bigtable_client.instance(self.bigtable_instance_id)
        table_client = instance_client.table(bigtable_table_id)
        column_family_id = bigtable_column_family_id

        if collection != "":
            restriction = aiplatform_v1.IndexDatapoint.Restriction(
                {
                    "namespace": "collection",
                    "allow_list": [collection],
                }
            )
            collection_name = collection
        else:
            restriction = aiplatform_v1.IndexDatapoint.Restriction({})
            collection_name = "default"

        for embedding_data in data:
            datapoint = aiplatform_v1.IndexDatapoint(
                datapoint_id=embedding_data["id"],
                feature_vector=embedding_data["embedding"],
                restricts=[restriction],
            )
            upsert_datapoint_request = aiplatform_v1.UpsertDatapointsRequest(
                index=self.index.name,
                datapoints=[datapoint],
            )
            # Failures surface as exceptions. The response is not an error
            # indicator (comparing it with {} was always "not equal").
            self.index_client.upsert_datapoints(request=upsert_datapoint_request)

            try:
                timestamp = datetime.datetime.now(datetime.timezone.utc)
                row = table_client.direct_row(embedding_data["id"])
                row.set_cell(column_family_id, "text".encode("utf-8"), embedding_data["text"].encode("utf-8"), timestamp)
                # Encode explicitly as UTF-8, like the text cell, instead of
                # relying on the client's implicit str conversion.
                row.set_cell(column_family_id, "collection".encode("utf-8"), collection_name.encode("utf-8"), timestamp)
                row.commit()
            except Exception as error:
                logger.warning(f"Fail to upsert {embedding_data['id']} with error {error}")

    def _stream_delete_vme(self, data: List[dict], collection: str) -> None:
        """Stream-remove datapoints from the index and delete their Bigtable rows.

        Afterwards, the collection is dropped from the in-memory
        ``collection_list`` if no Bigtable row still references it.

        Args:
            data: Ids to delete, as ``[{"id": id}, ...]``.
            collection: Collection the ids belong to; ``""`` is treated as
                ``"default"``, matching how ``_stream_upsert_vme`` records rows.

        Index removal errors propagate. Bigtable delete errors are logged and
        skipped.
        """
        instance_client = self.bigtable_client.instance(self.bigtable_instance_id)
        table_client = instance_client.table(bigtable_table_id)

        for item in data:
            doc_id = item['id']
            delete_datapoint_request = aiplatform_v1.RemoveDatapointsRequest(
                index=self.index.name,
                datapoint_ids=[doc_id],
            )
            # Failures surface as exceptions. The response is never None, so
            # the old `!= None` check logged a failure for every item.
            self.index_client.remove_datapoints(request=delete_datapoint_request)

            try:
                row = table_client.row(doc_id)
                row.delete()
                row.commit()
            except Exception as error:
                logger.warning(f"Fail to delete {doc_id} with error {error}")

        collection_name = collection if collection != "" else "default"

        # The registry is process-local (e.g. reset after a restart); only
        # names it actually contains can be removed without ValueError.
        if collection_name not in collection_list:
            return

        rows = table_client.read_rows(
            filter_=row_filters.RowFilterChain(
                filters=[
                    row_filters.ColumnQualifierRegexFilter("collection".encode("utf-8")),
                    row_filters.ValueRegexFilter(collection_name.encode("utf-8")),
                    row_filters.CellsColumnLimitFilter(1),
                    row_filters.StripValueTransformerFilter(True),
                ]
            )
        )
        rows.consume_all()
        if len(rows.rows) == 0:
            collection_list.remove(collection_name)

    def get_matches(
            self,
            embeddings: List[List[float]],
            n_matches: int,
            collection: str) -> FindNeighborsResponse:
        """Run a nearest-neighbor query for each embedding on the public endpoint.

        The request goes to ``self.endpoint.public_endpoint_domain_name`` and
        targets the first deployed index on the endpoint.

        Args:
            embeddings: One feature vector per query; each query's datapoint
                id is its position in this list, as a string.
            n_matches: Neighbor count requested per query.
            collection: If non-empty, every query carries a ``collection``
                restrict with this value in its allow list.

        Returns:
            The raw ``FindNeighborsResponse`` (presumably one
            ``nearest_neighbors`` entry per query, in query order).
        """
        vertex_ai_client = aiplatform_v1beta1.MatchServiceClient(
            client_options={"api_endpoint": self.endpoint.public_endpoint_domain_name},
        )

        request = aiplatform_v1beta1.FindNeighborsRequest(
            index_endpoint=self.endpoint.resource_name,
            deployed_index_id=self.endpoint.deployed_indexes[0].id,
        )

        if collection != "":
            restriction = aiplatform_v1beta1.IndexDatapoint.Restriction(
                {
                    "namespace": "collection",
                    "allow_list": [collection],
                }
            )
        else:
            restriction = aiplatform_v1beta1.IndexDatapoint.Restriction({})

        for i, embedding in enumerate(embeddings):
            query = aiplatform_v1beta1.FindNeighborsRequest.Query(
                datapoint=aiplatform_v1beta1.IndexDatapoint(
                    datapoint_id=str(i),
                    feature_vector=embedding,
                    restricts=[restriction],
                ),
                neighbor_count=n_matches,
            )
            request.queries.append(query)

        return vertex_ai_client.find_neighbors(request)

    def similarity_search(
        self, query: str, k: int = 4, collection: str = "", **kwargs: Any
    ) -> List[Document]:
        """Return the documents most similar to ``query``.

        Args:
            query: Text to search for.
            k: Number of neighbors to request.
            collection: Restrict the search to this collection; ``""`` means
                no collection restrict.

        Returns:
            Up to ``k`` documents. Neighbors whose text cannot be read from
            Bigtable are skipped, so fewer than ``k`` may be returned.
        """
        logger.debug(f"Embedding query {query}.")
        embedding_query = self.embedding.embed_documents([query])

        response = self.get_matches(embedding_query, k, collection)
        if response is None:
            raise Exception(f"Failed to query index {str(response)}")

        # Exactly one query was sent, so only the first result set matters.
        if not response.nearest_neighbors:
            return []
        neighbors = response.nearest_neighbors[0].neighbors
        if len(neighbors) == 0:
            return []

        logger.debug(f"Found {len(neighbors)} matches for the query {query}.")

        results = []
        for neighbor in neighbors:
            datapoint_id = neighbor.datapoint.datapoint_id
            page_content = self._read_text(datapoint_id)
            if page_content is None:
                # The index and Bigtable are written separately; a datapoint
                # may have no stored text. Skip it instead of building an
                # invalid Document.
                logger.warning(f"No stored text for datapoint {datapoint_id}; skipping.")
                continue
            results.append(Document(page_content=page_content))

        logger.debug("Downloaded documents for query.")

        return results

    def _get_index_id(self) -> str:
        """Gets the correct index id for the endpoint.

        Returns:
            The index id if found (which should be found) or throws
            ValueError otherwise.
        """
        for index in self.endpoint.deployed_indexes:
            if index.index == self.index.resource_name:
                return index.id

        raise ValueError(
            f"No index with id {self.index.resource_name} "
            f"deployed on endpoint "
            f"{self.endpoint.display_name}."
        )

    def _download_from_gcs(self, gcs_location: str) -> str:
        """Downloads from GCS in text format.

        Args:
            gcs_location: The location where the file is located.

        Returns:
            The string contents of the file.
        """
        bucket = self.gcs_client.get_bucket(self.gcs_bucket_name)
        blob = bucket.blob(gcs_location)
        return blob.download_as_string()

    @classmethod
    def from_texts(
        cls: Type["MatchingEngine"],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "MatchingEngine":
        """Unsupported constructor; use ``from_components`` plus ``add_texts``.

        Raises:
            NotImplementedError: Always.
        """
        message = (
            "This method is not implemented. Instead, you should initialize the class"
            " with `MatchingEngine.from_components(...)` and then call "
            "`add_texts`"
        )
        raise NotImplementedError(message)

    @classmethod
    def from_components(
        cls: Type["MatchingEngine"],
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        index_id: str,
        endpoint_id: str,
        instance_id: str,
        credentials_path: Optional[str] = None,
        embedding: Optional[Embeddings] = None,
    ) -> "MatchingEngine":
        """Build a ``MatchingEngine`` from the ids of existing GCP resources.

        All remote handles and service clients are created here, so the
        constructor only receives ready-made objects.

        Args:
            project_id: The GCP project id.
            region: Location used for the Vertex AI clients and for
                ``aiplatform.init``. It must have the same location as the GCS
                bucket and must be regional.
            gcs_bucket_name: Bucket name (a ``gs://`` prefix is accepted) used
                as the aiplatform staging bucket.
            index_id: Id of the existing Vector Search index.
            endpoint_id: Id of the existing index endpoint.
            instance_id: Bigtable instance id, passed to the constructor as
                ``bigtable_instance_id``.
            credentials_path: (Optional) Path to a service-account JSON key on
                the local file system; when omitted, ``None`` credentials are
                handed to every client.
            embedding: The :class:`Embeddings` used to embed texts and
                queries; defaults to ``_get_default_embeddings()``.

        Returns:
            A configured ``MatchingEngine``. No texts are added here; call
            ``add_texts`` afterwards.
        """
        bucket_name = cls._validate_gcs_bucket(gcs_bucket_name)
        creds = cls._create_credentials_from_file(credentials_path)

        # Remote index / endpoint handles.
        index = cls._create_index_by_id(index_id, project_id, region, creds)
        endpoint = cls._create_endpoint_by_id(endpoint_id, project_id, region, creds)

        # Service clients, then the global aiplatform configuration.
        gcs_client = cls._get_gcs_client(creds, project_id)
        bigtable_client = cls._get_bigtable_client(creds, project_id)
        index_client = cls._get_index_client(project_id, region, creds)
        index_endpoint_client = cls._get_index_endpoint_client(
            project_id, region, creds
        )
        cls._init_aiplatform(project_id, region, bucket_name, creds)

        chosen_embedding = embedding or cls._get_default_embeddings()

        return cls(
            project_id=project_id,
            index=index,
            endpoint=endpoint,
            embedding=chosen_embedding,
            gcs_client=gcs_client,
            index_client=index_client,
            index_endpoint_client=index_endpoint_client,
            credentials=creds,
            gcs_bucket_name=bucket_name,
            bigtable_client=bigtable_client,
            bigtable_instance_id=instance_id,
        )

    @classmethod
    def _validate_gcs_bucket(cls, gcs_bucket_name: str) -> str:
        """Validates the gcs_bucket_name as a bucket name.

        Args:
              gcs_bucket_name: The received bucket uri.

        Returns:
              A valid gcs_bucket_name or throws ValueError if full path is
              provided.
        """
        gcs_bucket_name = gcs_bucket_name.replace("gs://", "")
        if "/" in gcs_bucket_name:
            raise ValueError(
                f"The argument gcs_bucket_name should only be "
                f"the bucket name. Received {gcs_bucket_name}"
            )
        return gcs_bucket_name

    @classmethod
    def _create_credentials_from_file(
        cls, json_credentials_path: Optional[str]
    ) -> Optional[Credentials]:
        """Load service-account credentials from a JSON key file, if one is given.

        Args:
            json_credentials_path: Path to a service-account JSON key file on
                the local file system, or ``None``.

        Returns:
            The loaded credentials, or ``None`` when no path is given; callers
            pass that ``None`` straight on to the client factories.
        """
        from google.oauth2 import service_account

        if json_credentials_path is None:
            return None
        return service_account.Credentials.from_service_account_file(
            json_credentials_path
        )

    @classmethod
    def _create_index_by_id(
        cls, index_id: str, project_id: str, region: str, credentials: "Credentials"
    ) -> MatchingEngineIndex:
        """Creates a MatchingEngineIndex object by id.

        Args:
            index_id: The created index id.
            project_id: The project to retrieve index from.
            region: Location to retrieve index from.
            credentials: GCS credentials.

        Returns:
            A configured MatchingEngineIndex.
        """

        from google.cloud import aiplatform_v1


        logger.debug(f"Creating matching engine index with id {index_id}.")
        index_client = cls._get_index_client(project_id, region, credentials)
        request = aiplatform_v1.GetIndexRequest(name=index_id)
        return index_client.get_index(request=request)

    @classmethod
    def _create_endpoint_by_id(
        cls, endpoint_id: str, project_id: str, region: str, credentials: "Credentials"
    ) -> MatchingEngineIndexEndpoint:
        """Wrap an existing index endpoint in an ``aiplatform`` SDK object.

        Args:
            endpoint_id: Id of the existing endpoint, passed to the SDK as
                ``index_endpoint_name``.
            project_id: Project that owns the endpoint.
            region: Location of the endpoint.
            credentials: GCP credentials (may be ``None``).

        Returns:
            An ``aiplatform.MatchingEngineIndexEndpoint`` for that endpoint.
        """
        from google.cloud import aiplatform

        logger.debug(f"Creating endpoint with id {endpoint_id}.")
        endpoint_kwargs = dict(
            index_endpoint_name=endpoint_id,
            project=project_id,
            location=region,
            credentials=credentials,
        )
        return aiplatform.MatchingEngineIndexEndpoint(**endpoint_kwargs)

    @classmethod
    def _get_gcs_client(
        cls, credentials: "Credentials", project_id: str
    ) -> "storage.Client":
        """Build a Cloud Storage client for ``project_id``.

        ``google.cloud.storage`` is only imported when this is called.

        Returns:
            A ``storage.Client`` using ``credentials``.
        """
        from google.cloud import storage as gcs

        client = gcs.Client(project=project_id, credentials=credentials)
        return client

    @classmethod
    def _get_bigtable_client(
        cls, credentials: "Credentials", project_id: str
    ) -> "bigtable.Client":
        """Build a Cloud Bigtable client for ``project_id``.

        ``google.cloud.bigtable`` is only imported when this is called.

        Returns:
            A ``bigtable.Client`` created with ``admin=True``.
        """
        from google.cloud import bigtable

        # admin=True lets the client also call the instance/table admin APIs,
        # not just read and write data.
        client_kwargs = {"credentials": credentials, "project": project_id, "admin": True}
        return bigtable.Client(**client_kwargs)

    @classmethod
    def _get_index_client(
        cls, project_id: str, region: str, credentials: "Credentials"
    ) -> "aiplatform_v1.IndexServiceClient":
        """Create a Vertex AI Index service client bound to ``region``.

        Args:
            project_id: Kept for signature parity with the other client
                factories; the client is not project-scoped.
            region: Region whose endpoint
                (``{region}-aiplatform.googleapis.com``) the client talks to.
            credentials: Credentials to use; may be ``None`` (as returned by
                ``_create_credentials_from_file`` when no key file is given).

        Returns:
            A configured ``aiplatform_v1.IndexServiceClient``.
        """

        from google.cloud import aiplatform_v1

        api_endpoint = f"{region}-aiplatform.googleapis.com"
        return aiplatform_v1.IndexServiceClient(
            client_options={"api_endpoint": api_endpoint},
            credentials=credentials,
        )

    @classmethod
    def _get_index_endpoint_client(
        cls, project_id: str, region: str, credentials: "Credentials"
    ) -> "aiplatform_v1.IndexEndpointServiceClient":
        """Create a Vertex AI IndexEndpoint service client bound to ``region``.

        Args:
            project_id: Kept for signature parity with the other client
                factories; the client is not project-scoped.
            region: Region whose endpoint
                (``{region}-aiplatform.googleapis.com``) the client talks to.
            credentials: Credentials to use; may be ``None`` (as returned by
                ``_create_credentials_from_file`` when no key file is given).

        Returns:
            A configured ``aiplatform_v1.IndexEndpointServiceClient``.
        """

        from google.cloud import aiplatform_v1

        api_endpoint = f"{region}-aiplatform.googleapis.com"
        return aiplatform_v1.IndexEndpointServiceClient(
            client_options={"api_endpoint": api_endpoint},
            credentials=credentials,
        )

    @classmethod
    def _init_aiplatform(
        cls,
        project_id: str,
        region: str,
        gcs_bucket_name: str,
        credentials: "Credentials",
    ) -> None:
        """Set the ``aiplatform`` SDK defaults via ``aiplatform.init``.

        Args:
            project_id: Default project for later aiplatform calls.
            region: Default location; it must have the same location as the
                GCS bucket and must be regional.
            gcs_bucket_name: Passed to ``aiplatform.init`` as ``staging_bucket``.
            credentials: Default credentials for the SDK (may be ``None``).
        """
        from google.cloud import aiplatform

        logger.debug(
            f"Initializing AI Platform for project {project_id} on "
            f"{region} and for {gcs_bucket_name}."
        )
        init_settings = {
            "project": project_id,
            "location": region,
            "staging_bucket": gcs_bucket_name,
            "credentials": credentials,
        }
        aiplatform.init(**init_settings)

    @classmethod
    def _get_default_embeddings(cls) -> TensorflowHubEmbeddings:
        """Build the fallback embeddings used when the caller supplies none.

        Returns:
            A ``TensorflowHubEmbeddings`` instance with its default settings.
        """
        fallback = TensorflowHubEmbeddings()
        return fallback

Writing matching_engine_bigtable.py


# Create Bigtable

https://cloud.google.com/bigtable/docs/creating-instance#console

Use either the Cloud console or gcloud. I used the console to quickly create a Bigtable instance; the equivalent gcloud commands are shown below.

Then create a table with:
- table name: `vectordb`
- column family ID: `vector_text`

In [None]:
# Optional CLI alternative to the console: create the Bigtable instance.
# INSTANCE_ID and DISPLAY_NAME are literal placeholders (IPython does not
# substitute Python variables here) - replace them with real values. The
# instance id is what BIGTABLE_INSTANCE_ID must be set to further down.
# NOTE(review): depending on the gcloud version a cluster may also have to be
# specified (e.g. --cluster-config=id=...,zone=...) - confirm in the gcloud docs.
! gcloud bigtable instances create INSTANCE_ID \
    --display-name=DISPLAY_NAME

In [None]:
# Create the table described above: table "vectordb" with the single column
# family "vector_text". INSTANCE_ID and PROJECT_ID are literal placeholders,
# not the notebook's Python variables - replace them before running.
! gcloud bigtable instances tables create vectordb \
    --instance=INSTANCE_ID \
    --project=PROJECT_ID \
    --column-families="vector_text"

# Add texts

In [None]:
# @title in case you need the project information
#PROJECT_ID = "argolis-lsj-test" # @param {type: "string"}
#LOCATION = "us-central1"
#ME_REGION          = "us-central1"
#ME_INDEX_NAME      = "langchain-vs-tencent" # @param {type: "string"}
#ME_DIMENSIONS      = 768 # when using Vertex PaLM Embedding
#ME_EMBEDDING_DIR   = "gs://langchain-vs-tencent-bucket" # @param {type: "string"}

In [None]:
# @title get matching engine index information
# Look up the ids of the Vector Search index named ME_INDEX_NAME and of its
# endpoint using the project helper module matching_engine_utils.
# PROJECT_ID, ME_REGION and ME_INDEX_NAME must already be defined.
import matching_engine_utils
mengine = matching_engine_utils.MatchingEngineUtils(PROJECT_ID, ME_REGION, ME_INDEX_NAME)
ME_INDEX_ID, ME_INDEX_ENDPOINT_ID = mengine.get_index_and_endpoint()

In [None]:
# @title bigtable parameters
# Id of the Bigtable instance created above; passed to
# MatchingEngine.from_components as instance_id.
BIGTABLE_INSTANCE_ID: str = "me0920" # @param {type: "string"}
#request_per_minute = 300

In [None]:
# @title import libraries
from langchain.embeddings import VertexAIEmbeddings
import matching_engine_bigtable
from matching_engine_bigtable import MatchingEngine
import vertexai

In [None]:
# @title init vertexai and embedding function
# Initialise the Vertex AI SDK and create the embedding model that is passed
# to MatchingEngine below (it is used to embed the query in similarity_search).
# NOTE: the index's dimensions must match this model's output size
# (ME_DIMENSIONS = 768 in the project-information cell assumes a PaLM embedding).
vertexai.init(project=PROJECT_ID, location=LOCATION)
embedding = VertexAIEmbeddings()
# embedding.model_name = "textembedding-gecko@latest" # for enhanced quality of embedding
# embedding.model_name = "textembedding-gecko-multilingual@latest" # for multilingual embedding

In [None]:
# @title get vector search instance
# Build the vector store from the existing index, endpoint, bucket and
# Bigtable instance.
# NOTE(review): region uses LOCATION, while the index/endpoint ids were looked
# up in ME_REGION; both must name the same region.
me_instance = MatchingEngine.from_components(
    project_id=PROJECT_ID,
    region=LOCATION,
    gcs_bucket_name=ME_EMBEDDING_DIR,
    index_id=ME_INDEX_ID,
    endpoint_id=ME_INDEX_ENDPOINT_ID,
    instance_id=BIGTABLE_INSTANCE_ID,
    embedding=embedding)

In [None]:
# @title split file
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import TextLoader
def build_me_file(file_path=None, chunck_size=1000, seperator="\n\n", chunk_overlap=0):
    """Load a text file and split it into chunks for the vector index.

    Args:
        file_path: Path of the text file to load. Required; the ``None``
            default is kept only for backward compatibility.
        chunck_size: Target chunk size in characters (measured with ``len``).
        seperator: String the splitter splits the text on.
        chunk_overlap: Number of characters shared by consecutive chunks.

    Returns:
        The list of LangChain ``Document`` chunks.

    Raises:
        ValueError: If ``file_path`` is not given.
    """
    if file_path is None:
        raise ValueError("build_me_file requires a file_path")
    documents = TextLoader(file_path).load()
    text_splitter = CharacterTextSplitter(
        separator=seperator,
        chunk_size=chunck_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return text_splitter.split_documents(documents)

# Split the story into ~800-character chunks and keep only their text.
# Loading the file once avoids indexing (and paying to embed) every chunk
# several times. To index multiple files (e.g. one per chapter), call
# build_me_file once per path and concatenate the returned lists.
texts = build_me_file(file_path="/content/story.txt", chunck_size=800)
result = [doc.page_content for doc in texts]



In [None]:
# @title add text arrays, collection is supported in Vector Search
# Add every chunk under the "wow" collection; pass the same name as
# similarity_search(collection="wow") to restrict queries to it.
# The cell output is the list of ids returned by add_texts.
me_instance.add_texts(texts=result, collection="wow")

['c994557e05938b6882ffa8805bcfed642c8a2e555e47600229b51c497e6cc409',
 'd9459dc07d5c4ecf349a59ebbe881c84a424758ddbeb9b1f97a2e011da7a8970',
 '2c3ae81472ca401a685a0de41d89af7a4e2e7e1301d3e63aefb6dd0b9c885ace',
 '91a725c48659e7175f4f9b311e248df978a783a4e5ab551a48d00affcda40290',
 'f66dbea6708b869d7cabf29959a040f702c5604870cf6003d642af07d595e2f3',
 '3ed84033f23f72ebe855a53fa67b5741700ddaa1d4e25827ba7b03e043384b23',
 '9c36b362f47826254a597deeb2f363883e3f8d1b343f816f59c42d0de31e2cd8',
 '6cf6806265757e9ac790feb41d0eacffdd95dfaff1d300a7a3ee7847ad9730bd',
 'eec241697c8e5e94b43c12bd348a3e476e032017ee5444d1da9d6d144f2aa378',
 '14663cce15b212a95ca2196de26b7fc33c4089aa443ba3c5ccd044f7ac7dbd01',
 '4674fc6bcc30a29602c0a4cb1fa7daa7e7a42fe4ed94bb131d0cbaaaf9088423',
 '321414203d6e3f63059e018b41f0eb785a89f17c68e2498a7f305514623a9fce',
 '05d2ebfcf0a2dea28ae803aad8b630d6895e442eb656f80a07f5620638f1bd85',
 '0954b3a087b47b84324238e56931ed1bfa5f50c309caf6352091ae1d93aa791b',
 '4b8b71b22f34587a39626987be6212d0

# Test

In [None]:
# @title similarity search demo
# Return the k=2 stored chunks nearest to the question. No collection is
# given, so no collection restriction is applied to the query.
me_instance.similarity_search("Who's Thrall's father?", k=2)

[Document(page_content='sorry. Please, join me. We still have things to settle before I leave,\ndon\'t we?"\n"First thing is food," Greymane said, and he pulled up a chair and\nspeared some chicken.\n"You and Wyll are colluding against me,” Anduin sighed. “The sad\nthing is, I\'m glad of it."\nGenn grunted in amusement as Anduin filled his own plate. "I\'ve got\nthe papers drawn up," Genn said.\n"Thank you for handling that. I\'ll sign them right away."\n"Read them first. Doesn\'t matter who wrote it. There\'s a free piece of\nadvice for you.”\nAnduin smiled tiredly. "You\'ve given me quite a bit of free advice."\n"And some you\'re even grateful for, I imagine,” Genn said.\n"All of it. Even what I disagree with and choose to ignore."\n"Ah, now there speaks a wise king." Greymane reached for the bottle\nof wine on the table and filled his glass.\n"No coup planned, then?” Anduin found himself reaching for\nanother helping of chicken. His body was hungry, it would seem, even\nif his mind 

In [None]:
# @title in case you want to delete the vector search collection
#me_instance.delete_collection(collection="wow")

[]

In [None]:
# @title [NOT USED] langchain RetrievalQA demo
# Optional demo: wraps me_instance as a LangChain retriever inside a
# RetrievalQA chain, exposes that chain as a single "Vector Search" tool, and
# runs a zero-shot agent that follows the Thought/Action/Observation format
# defined in FORMAT_INSTRUCTIONS below.
from langchain.chains import RetrievalQA
from langchain.chains import LLMChain
from langchain.memory import ConversationBufferMemory
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain.llms import VertexAI

# Same project/location as the earlier vertexai.init call (presumably repeated
# so this cell can be run on its own).
import vertexai
vertexai.init(project=PROJECT_ID, location=LOCATION)

LLM_MODEL = "text-bison@latest" #@param {type: "string"}
MAX_OUTPUT_TOKENS = 512 #@param {type: "integer"}
TEMPERATURE = 0.2 #@param {type: "number"}
TOP_P = 0.8 #@param {type: "number"}
TOP_K = 40 #@param {type: "number"}
VERBOSE = True #@param {type: "boolean"}

# One LLM shared by the QA chain and by the agent's reasoning chain.
llm_palm = VertexAI(model_name=LLM_MODEL,
      max_output_tokens=MAX_OUTPUT_TOKENS,
      temperature=TEMPERATURE,
      top_p=TOP_P,
      top_k=TOP_K,
      verbose=VERBOSE)

# Despite its name, state_of_union is the QA chain over me_instance; it uses
# the "refine" chain type and the retriever's default search settings.
state_of_union = RetrievalQA.from_chain_type(llm=llm_palm, chain_type="refine", retriever=me_instance.as_retriever(), verbose=True, return_source_documents=False)
# The agent's only tool: answer a query with the QA chain above.
tools = [
          Tool(
          name="Vector Search",
          func=state_of_union.run,
          description="search for a query",
          ),
]
# Prompt pieces for ZeroShotAgent.create_prompt. SUFFIX includes
# {chat_history}, which the ConversationBufferMemory below fills in.
PREFIX = """Answer the following questions as best you can. The answer should be as detailed as possible. You have access to the following tools:"""
FORMAT_INSTRUCTIONS = """Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times. )
Thought: I now know the final answer
Final Answer: the final answer to the original input question. The final answer should be very detailed."""
SUFFIX = """Begin!

Previous conversation history:
{chat_history}

Question: {input}
Thought:{agent_scratchpad}"""

prompt = ZeroShotAgent.create_prompt(
    tools,
    PREFIX,
    SUFFIX,
    FORMAT_INSTRUCTIONS,
    input_variables=["input", "chat_history", "agent_scratchpad"],
)
# memory_key must match the {chat_history} placeholder in SUFFIX.
memory = ConversationBufferMemory(memory_key="chat_history")
llm_chain = LLMChain(llm=llm_palm, prompt=prompt, verbose=True)

# The executor is given the memory, so earlier turns are injected into the
# prompt on later run() calls of zs_agent_instance.
zs_agent = ZeroShotAgent(llm_chain=llm_chain, tools=tools, verbose=True)
zs_agent_instance = AgentExecutor.from_agent_and_tools(
    agent=zs_agent, tools=tools, verbose=True, memory=memory
)
COMPLEX_QUERY = "Who's Thrall's father?"
zs_agent_instance.run(COMPLEX_QUERY)



[1m> Entering new AgentExecutor chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mAnswer the following questions as best you can. The answer should be as detailed as possible. You have access to the following tools:

Vector Search: search for a query

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [Vector Search]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times. )
Thought: I now know the final answer
Final Answer: the final answer to the original input question. The final answer should be very detailed.

Begin!

Previous conversation history:


Question: Who's Thrall's father?
Thought:[0m

[1m> Finished chain.[0m
[32;1m[1;3m Durotan is Thrall's father
Action: Vector Search
Action Input: father of thrall[0m

[1m> Entering

'Durotan'