In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# **Initialize BQML Models and Vector Search**

## Overview

This notebook performs two initializations:
1. Trains BQML models to predict expenses and detect unusual expenditures for Cymbal Bank customers
2. Indexes the Cymbal Bank website in Vector Search for Retrieval-Augmented Generation (RAG)

### Install Vertex AI SDK and other required packages

In [None]:
!pip install --upgrade google-cloud-bigquery
!pip install --upgrade langchain nest_asyncio google-cloud-aiplatform
!pip install --upgrade google-cloud-aiplatform
!pip install tensorflow_hub==0.13.0 tensorflow_text==2.12.1

### Restart runtime
To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.

The restart might take a minute or longer. After it's restarted, continue to the next step.

In [None]:
import IPython

# Shut down the running kernel and request a restart (the True argument) so the
# packages installed above are picked up by the imports in the following cells.
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

### Authenticate your notebook environment (Colab only)
If you are running this notebook on Google Colab, run the following cell to authenticate your environment. This step is not required if you are using Vertex AI Workbench.

In [None]:
import sys

# Additional authentication is required for Google Colab
# The check on sys.modules makes this cell a no-op outside Colab, where the
# google.colab package has not been loaded.
if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud
    from google.colab import auth

    auth.authenticate_user()

### Setup Project ID and Region

In [None]:
# Google Cloud project and region used by the BigQuery ML and Vertex AI cells
# below. Replace the placeholder project ID before running the notebook.
PROJECT_ID = "your-gcp-project-id"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

### Import libraries


In [None]:
import os

# Utils
import time
import urllib.request
from typing import List

# Third-party helpers: event-loop patching, HTTP, Vertex AI SDK, HTML parsing
import nest_asyncio
import requests
import vertexai
from bs4 import BeautifulSoup

# BigQuery client and LangChain components
from google.cloud import bigquery
from langchain.document_loaders import WebBaseLoader
from langchain.embeddings import VertexAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

## 1. **BQML Model Initialization**

There are two features for which BQML models are used in CymBuddy assistant:
1. **Expense Prediction** uses the `ARIMA_PLUS` time-series forecasting model
   - The model is trained to forecast the monthly transaction amount per user for each of the following category/subcategory pairs:
   
      | Category | Subcategory|
      |----------|------------|
      | Earning | Passive |
      | Earning | Active |
      | Needs | Transportation |
      | Needs | Housing |
      | Needs | Food and Groceries |
      | Needs | Healthcare |
      | Needs | Education |
      | Wants | Travel |
      | Wants | Entertainment |
      | Miscellaneous | Miscellaneous |

2. **Unusual Expenditure Detection** uses an autoencoder model

In [2]:
def create_expense_prediction_model(project_id):
    """Train (or retrain) the ARIMA_PLUS expense forecasting model in BigQuery ML.

    Runs a ``CREATE OR REPLACE MODEL`` statement that builds
    ``<project_id>.ExpensePrediction.expense_prediction_model`` from the
    ``ExpensePrediction.training_data`` table, with one time series per
    (``ac_id``, ``category``, ``sub_category``) combination. The call blocks
    until the training job has finished.

    Args:
        project_id: ID of the Google Cloud project that holds the
            ``ExpensePrediction`` dataset. The query job is also run in
            this project.

    Raises:
        Whatever the BigQuery client raises when the job fails.
    """
    # Bind the client to the requested project instead of relying on whatever
    # default project the environment happens to provide (there may be none).
    client = bigquery.Client(project=project_id)
    query_train_arima = f"""
  CREATE OR REPLACE MODEL
  `{project_id}.ExpensePrediction.expense_prediction_model` OPTIONS(MODEL_TYPE='ARIMA_PLUS',
  TIME_SERIES_TIMESTAMP_COL='month_year',
  TIME_SERIES_DATA_COL='transaction_amount',
  TIME_SERIES_ID_COL=['ac_id','category','sub_category'],
  HOLIDAY_REGION='in') AS
  SELECT
    month_year, transaction_amount, ac_id, category,sub_category
  FROM
  `{project_id}.ExpensePrediction.training_data`
  """
    job = client.query(query_train_arima)

    # wait for the model to complete training
    job.result()

In [3]:
def create_unusual_spends_model(project_id):
    """Train (or retrain) the autoencoder used to flag unusual expenditures.

    Runs a ``CREATE OR REPLACE MODEL`` statement that builds
    ``<project_id>.ExpensePrediction.unusual_spend3`` from the
    ``transaction_amount`` and ``country`` columns of
    ``DummyBankDataset.AccountTransactions``. The call blocks until the
    training job has finished.

    Args:
        project_id: ID of the Google Cloud project that holds the
            ``ExpensePrediction`` and ``DummyBankDataset`` datasets. The
            query job is also run in this project.

    Raises:
        Whatever the BigQuery client raises when the job fails.
    """
    # Bind the client to the requested project instead of relying on whatever
    # default project the environment happens to provide (there may be none).
    client = bigquery.Client(project=project_id)
    # The statement must read CREATE OR REPLACE MODEL; the earlier
    # "CREATE MODEL OR REPLACE MODEL" spelling is not valid BigQuery ML syntax.
    query_train_autoencoder = f"""
  CREATE OR REPLACE MODEL `{project_id}.ExpensePrediction.unusual_spend3`
  OPTIONS(
    model_type='autoencoder',
    activation_fn='relu',
    batch_size=8,
    dropout=0.2,
    hidden_units=[32, 16, 4, 16, 32],
    learn_rate=0.001,
    l1_reg_activation=0.0001,
    max_iterations=10,
    optimizer='adam'
  ) AS
  SELECT
    transaction_amount, country
  FROM
    `{project_id}.DummyBankDataset.AccountTransactions`;
  """
    job = client.query(query_train_autoencoder)

    # wait for the model to complete training
    job.result()

In [None]:
# Train both BigQuery ML models. Each call waits for its training job to
# finish before returning, so the second model is trained after the first.
create_expense_prediction_model(PROJECT_ID)
create_unusual_spends_model(PROJECT_ID)

## 2. **Initialize Vector Store for Retrieval-Augmented Generation (RAG)**

RAG is a technique that helps large language models (LLMs) access and use facts from external sources, for example, websites. This makes their answers more accurate, reliable, and up-to-date. RAG works by finding relevant information based on a question and then giving this information to the language model to help it generate a better answer.

Following is the RAG architecture used for question answering:

![image.png](images/RAG_architecture.png) 

This notebook builds the index in Vector Search using the following steps:
- Read the pages of the Cymbal Bank website
- Split each page into chunks. Page metadata, e.g. the page URL, is stored with each chunk
- Create embeddings for those chunks
    - Store the chunks in a GCS bucket
- Build the index in Vector Search using the embeddings

### Initialize Vertex AI

In [None]:
# Point the Vertex AI SDK at the project and region configured above.
vertexai.init(project=PROJECT_ID, location=REGION)

### Build the index

In [4]:
def init_me_libs():
    """Download the Matching Engine helper modules into a local ``utils`` package.

    Creates the ``utils`` directory in the current working directory if it is
    missing, then fetches ``__init__.py``, ``matching_engine.py`` and
    ``matching_engine_utils.py`` from the GoogleCloudPlatform/generative-ai
    repository, overwriting any local copies.

    Raises:
        urllib.error.URLError: If a file cannot be downloaded.
    """
    # exist_ok replaces the separate exists() check, so rerunning the cell is
    # safe and there is no gap between checking for and creating the directory.
    os.makedirs("utils", exist_ok=True)

    url_prefix = "https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/language/use-cases/document-qa/utils"
    files = ["__init__.py", "matching_engine.py", "matching_engine_utils.py"]

    for fname in files:
        urllib.request.urlretrieve(f"{url_prefix}/{fname}", filename=f"utils/{fname}")


# Download the helper modules first: the two imports below can only be
# resolved once the files exist in the local utils/ directory.
init_me_libs()

from utils.matching_engine import MatchingEngine
from utils.matching_engine_utils import MatchingEngineUtils


# helper function to list all the urls present on a particular web page
def get_urls(url):
    """Return the distinct HTTP(S) links found in the anchors of a web page.

    Args:
        url: Address of the page to scan. Relative links are resolved
            against it.

    Returns:
        A list of absolute URLs in the order they first appear on the page.
        Anchors without an ``href``, links with a non-HTTP scheme (for example
        ``mailto:``) and duplicates are left out. URL fragments are dropped so
        that ``/page`` and ``/page#section`` count as the same page.

    Raises:
        requests.RequestException: If the page cannot be fetched or the
            server answers with an error status.
    """
    from urllib.parse import urldefrag, urljoin, urlparse

    # A timeout keeps an unresponsive server from hanging the notebook.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    urls = []
    seen = set()
    for link in soup.find_all("a"):
        href = link.get("href")
        if not href:
            # <a> tags used as named anchors or JS hooks carry no target
            continue
        absolute, _fragment = urldefrag(urljoin(url, href))
        if urlparse(absolute).scheme not in ("http", "https"):
            continue
        if absolute not in seen:
            seen.add(absolute)
            urls.append(absolute)

    # The collected list was previously never returned, so callers got None.
    return urls


def load_website_content():
    """Fetch every page linked from the website homepage as LangChain documents.

    The homepage address is the placeholder assigned below and has to be
    replaced with the real site URL before the function is useful.

    Returns:
        Whatever ``WebBaseLoader.aload()`` returns for the discovered URLs.
    """
    # Presumably needed so the loader's async fetching can run inside the
    # notebook's already-running event loop -- TODO confirm.
    nest_asyncio.apply()
    website_homepage = "add-url-of-cloud-run-website"  # @param {type:"string"}

    # Links found on the homepage; this is sufficient when every page to be
    # indexed is reachable from there. Pages that are not linked on the
    # homepage would have to be added to this list by hand.
    page_urls = get_urls(website_homepage)

    site_loader = WebBaseLoader(page_urls)
    site_loader.requests_per_second = 1

    return site_loader.aload()


def chunk_documents(documents):
    """Split documents into overlapping text chunks and number the chunks.

    Args:
        documents: Documents accepted by
            ``RecursiveCharacterTextSplitter.split_documents``.

    Returns:
        The chunks produced by the splitter (chunk size 1000, overlap 50).
        Each chunk's ``metadata["chunk"]`` holds its zero-based position in
        the returned sequence. The number of chunks is also printed.
    """
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        chunk_overlap=50,
        chunk_size=1000,
    )
    chunks = splitter.split_documents(documents)

    # Record where each chunk sits in the overall sequence
    for position, chunk in enumerate(chunks):
        chunk.metadata["chunk"] = position

    chunk_total = len(chunks)
    print(f"# of documents = {chunk_total}")
    return chunks


# Utility functions for Embeddings API with rate limiting
def rate_limit(max_per_minute):
    """Generator that spaces out successive ``next()`` calls.

    The first ``next()`` prints "Waiting" and starts the clock. Every later
    ``next()`` measures the time since the previous one returned and, if that
    is shorter than ``60 / max_per_minute`` seconds, prints a dot and sleeps
    for the remainder before starting the clock again.

    Args:
        max_per_minute: Maximum number of ``next()`` calls allowed per minute.

    Yields:
        None. The generator never finishes on its own.
    """
    min_interval = 60 / max_per_minute
    print("Waiting")
    while True:
        started = time.time()
        yield
        remaining = min_interval - (time.time() - started)
        if remaining > 0:
            print(".", end="")
            time.sleep(remaining)


class BaseModelMixin:
    """Mixin providing a batched, rate-limited ``embed_documents``.

    The host class is expected to supply ``self.client`` with a
    ``get_embeddings(list_of_texts)`` method, plus the two settings below.
    The method only takes effect if this mixin comes before any other base
    class that defines ``embed_documents`` in the method resolution order.
    """

    # Upper bound on embedding requests sent per minute
    requests_per_minute: int
    # Number of texts sent in a single embedding request
    num_instances_per_batch: int

    def embed_documents(self, texts: List[str]):
        """Embed ``texts`` batch by batch, throttling between requests.

        Args:
            texts: The texts to embed.

        Returns:
            One entry per text, in input order, holding the ``values``
            attribute of the corresponding embedding result.
        """
        throttle = rate_limit(self.requests_per_minute)
        batch_size = self.num_instances_per_batch
        pending = list(texts)
        embedded = []

        # The embeddings API caps the number of texts per request, so the
        # input is sent in slices of batch_size.
        while pending:
            batch = pending[:batch_size]
            pending = pending[batch_size:]
            embedded.extend(self.client.get_embeddings(batch))
            next(throttle)

        return [item.values for item in embedded]


class CustomVertexAIEmbeddings(BaseModelMixin, VertexAIEmbeddings):
    """Vertex AI embeddings whose ``embed_documents`` is batched and rate limited.

    ``BaseModelMixin`` is listed first on purpose: Python resolves methods
    left to right across the bases, so with ``VertexAIEmbeddings`` first its
    own ``embed_documents`` would win and the mixin's version would never run.
    """

    # Declared on the model class itself so they are accepted as constructor
    # arguments; annotations on the plain mixin are presumably not picked up
    # as model fields -- TODO confirm against the installed pydantic version.
    requests_per_minute: int
    num_instances_per_batch: int

In [None]:
# Embeddings API integrated with LangChain
# EMBEDDING_QPM caps the embedding requests per minute; EMBEDDING_NUM_BATCH is
# the number of texts sent per request.
EMBEDDING_QPM = 100
EMBEDDING_NUM_BATCH = 5
embeddings = CustomVertexAIEmbeddings(
    requests_per_minute=EMBEDDING_QPM,
    num_instances_per_batch=EMBEDDING_NUM_BATCH,
)

# Matching Engine (Vector Search) settings, all derived from the project ID
ME_REGION = REGION
ME_INDEX_NAME = f"{PROJECT_ID}-me-index"
ME_EMBEDDING_DIR = f"{PROJECT_ID}-me-bucket"
ME_DIMENSIONS = 768  # when using Vertex PaLM Embedding

m_engine = MatchingEngineUtils(PROJECT_ID, ME_REGION, ME_INDEX_NAME)

# NOTE(review): no cell visible in this notebook creates or deploys the index,
# so this lookup presumably expects it to exist already -- confirm.
ME_INDEX_ID, ME_INDEX_ENDPOINT_ID = m_engine.get_index_and_endpoint()
print(f"ME_INDEX_ID={ME_INDEX_ID}")
print(f"ME_INDEX_ENDPOINT_ID={ME_INDEX_ENDPOINT_ID}")

# initialize vector store
# The bucket-name expression builds "gs://<bucket>" and takes the third
# "/"-separated piece, i.e. the bucket name without the scheme.
me = MatchingEngine.from_components(
    project_id=PROJECT_ID,
    region=ME_REGION,
    gcs_bucket_name=f"gs://{ME_EMBEDDING_DIR}".split("/")[2],
    embedding=embeddings,
    index_id=ME_INDEX_ID,
    endpoint_id=ME_INDEX_ENDPOINT_ID,
)

# Crawl the site, split the pages into chunks and add them to the vector store.
documents = load_website_content()

doc_splits = chunk_documents(documents)
# Store docs as embeddings in Matching Engine index
# It may take a while since API is rate limited
texts = [doc.page_content for doc in doc_splits]
# Each chunk is tagged with its page URL, page title and chunk number.
# A page without a <title> element presumably has no "title" metadata entry,
# so fall back to the page URL instead of failing with a KeyError midway.
metadatas = [
    [
        {"namespace": "source", "allow_list": [doc.metadata["source"]]},
        {
            "namespace": "title",
            "allow_list": [doc.metadata.get("title") or doc.metadata["source"]],
        },
        {"namespace": "chunk", "allow_list": [str(doc.metadata["chunk"])]},
    ]
    for doc in doc_splits
]

doc_ids = me.add_texts(texts=texts, metadatas=metadatas)