
<div style="text-align: center; line-height: 0; padding-top: 9px;">  
  <img src="/Volumes/cmidev/default/preventech/DBX_hack/compnovai_logo.PNG" alt="Image">  
</div> 


# Preparing Data for Model Building and Prediction
This notebook covers the following steps:

* Split the document data into chunks that are no larger than the maximum context window of the LLM to be used later.

* Choose an embedding model.

* Compute embeddings for each of the chunks using a Databricks-managed embedding model.

* Use the chunking strategy to divide up the context information to be provided to a model.

* Store the embeddings in a table for further use.



## Requirements

Please review the following requirements before starting the lesson:

* To run this notebook, you need to use one of the following Databricks runtime(s): **14.3.x-cpu-ml-scala2.12** or **14.3.x-scala2.12**



<div style="text-align: center; line-height: 0; padding-top: 9px;">  
  <img src="/Volumes/cmidev/default/preventech/DBX_hack/embeddings_Creation.PNG" alt="Image">  
</div> 


## Setup Installation

Install required libraries.

In [0]:
%pip install --quiet PyMuPDF mlflow==2.14.3 transformers==4.44.0 "unstructured[pdf,docx]==0.14.10" llama-index==0.10.62 pydantic==2.8.2 accelerate
dbutils.library.restartPython()

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
databricks-feature-engineering 0.2.1 requires pyspark<4,>=3.1.2, which is not installed.
ydata-profiling 4.2.0 requires pydantic<2,>=1.8.1, but you have pydantic 2.8.2 which is incompatible.
databricks-sdk 0.1.6 requires requests<2.29.0,>=2.28.1, but you have requests 2.32.3 which is incompatible.


In [0]:
%pip install --upgrade urllib3 transformers
%pip uninstall -y transformers
%pip install transformers

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Collecting urllib3
  Using cached urllib3-2.2.3-py3-none-any.whl (126 kB)
Collecting transformers
  Downloading transformers-4.44.2-py3-none-any.whl (9.5 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.5/9.5 MB 26.2 MB/s eta 0:00:00
Installing collected packages: urllib3, transformers
  Attempting uninstall: urllib3
    Found existing installation: urllib3 1.26.20
    Uninstalling urllib3-1.26.20:
      Successfully uninstalled urllib3-1.26.20
  Attempting uninstall: transformers
    Found existing installation: transformers 4.44.0
    Uninstalling transformers-4.44.0:
      Successfully uninstalled transformers-4.44.0
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
databricks-feature-engineering 0.2.1 requires pyspark<4,>=3.1.2, which is not ins

## Extract PDF Content as Text Chunks

As the first step, we need to ingest PDF files and divide the content into chunks. PDF files are already downloaded during the course step and stored in **datasets path**.

In [0]:
# Reduce the arrow batch size as our PDF can be big in memory
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10)

In [0]:
#cmidev.cbu_ccims_poc.ccims_claim_inspection_feature

# Read feature table

In [0]:
table_name = "cmidev.cbu_ccims_poc.ccims_claim_inspection_feature"
# Read the feature table as a stream so that new rows are processed incrementally
df = spark.readStream.table(table_name)

In [0]:
# Function to clean up the extracted text (optional)
import re
def clean_extracted_text(text):
    text = re.sub(r'\n', '', text)
    return re.sub(r' ?\.', '.', text)


def pprint(obj):
  import pprint
  pprint.pprint(obj, compact=True, indent=1, width=100)
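
The effect of `clean_extracted_text` is easiest to see on a small input. The sketch below repeats the function and compares it with a hypothetical variant, `clean_extracted_text_spaced` (not part of the original notebook), which assumes that line breaks separate words and therefore replaces them with a single space instead of removing them.

```python
import re


def clean_extracted_text(text):
    # Same logic as the notebook helper: drop newlines, then remove a space before a period
    text = re.sub(r'\n', '', text)
    return re.sub(r' ?\.', '.', text)


def clean_extracted_text_spaced(text):
    # Hypothetical variant: assumes a line break separates two words,
    # so it is replaced by one space rather than deleted
    text = re.sub(r'\s*\n\s*', ' ', text)
    return re.sub(r' ?\.', '.', text).strip()


sample = "Engine failed\nto start ."
original_result = clean_extracted_text(sample)        # "Engine failedto start."
spaced_result = clean_extracted_text_spaced(sample)   # "Engine failed to start."
print(original_result)
print(spaced_result)
```

Which behavior is preferable depends on the source text: if lines are broken mid-word, removing the newline is correct; if they are broken between words, the spaced variant avoids fusing words together.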

In [0]:
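def extract_doc_text(df, row_id):
    # Debug helper: returns the value at column index 20 of the given row.
    # NOTE: collect() only works on a batch DataFrame (e.g. spark.read.table(...)),
    # not on the streaming DataFrame created with spark.readStream above.
    txt = df.collect()[row_id][20]
    return txt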

In [0]:
#extract_doc_text(df,row_id=5)

# Preprocess and chunk text using pretrained model tokenizer

In [0]:
import io
import os
import pandas as pd 

from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.core.utils import set_global_tokenizer
from transformers import AutoTokenizer
from typing import Iterator
from pyspark.sql.functions import col, udf, length, pandas_udf, explode
from unstructured.partition.auto import partition


@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Use the Llama 2 tokenizer so that chunk sizes are measured in model tokens
    set_global_tokenizer(
        AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer")
    )
    # Sentence splitter from llama_index: chunks of up to 100 tokens with a 20-token overlap
    splitter = SentenceSplitter(chunk_size=100, chunk_overlap=20)

    def extract_and_split(txt):
        # Wrap the raw text in a Document and return the text of each resulting node
        nodes = splitter.get_nodes_from_documents([Document(text=txt)])
        return [n.text for n in nodes]

    for x in batch_iter:
        yield x.apply(extract_and_split)

2024-09-18 02:06:39.873851: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
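
To make the `chunk_size=100, chunk_overlap=20` settings concrete, here is a minimal sketch of a fixed-size sliding window over a token list. It is a simplification and not how `SentenceSplitter` works internally: the real splitter also respects sentence boundaries and counts tokens with the configured tokenizer. The function name `chunk_tokens` and the synthetic tokens are illustrative assumptions.

```python
def chunk_tokens(tokens, chunk_size=100, chunk_overlap=20):
    """Split a token list into windows of chunk_size that share chunk_overlap tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # each new window starts this many tokens later
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # the last window already reached the end of the input
    return chunks


# Synthetic input: 250 placeholder tokens
tokens = [f"tok{i}" for i in range(250)]
chunks = chunk_tokens(tokens)
print([len(c) for c in chunks])           # [100, 100, 90]
print(chunks[0][-20:] == chunks[1][:20])  # True: neighbouring chunks share 20 tokens
```

The overlap means that text near a chunk boundary appears in both neighbouring chunks, which reduces the chance that a relevant passage is cut in half at retrieval time.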


# Perform chunking of each field in document array for each claim

In [0]:
from mlflow.deployments import get_deploy_client


# gte-large-en Foundation models are available using the /serving-endpoints/databricks-gte-large-en/invocations api. 
deploy_client = get_deploy_client("databricks")

# NOTE: if you change your embedding model here, make sure you change it in the query step too
embeddings = deploy_client.predict(endpoint="databricks-gte-large-en", inputs={"input": ["What is Apache Spark?"]})
pprint(embeddings)

{'data': [{'embedding': [1.0458984375, -0.052520751953125, -0.281982421875, -0.0909423828125,
                         0.095458984375, -0.1343994140625, 0.1583251953125, 0.281982421875,
                         0.763671875, 0.415771484375, 0.60791015625, -0.06201171875,
                         0.10443115234375, 0.58984375, -0.40087890625, 0.35498046875, 0.58837890625,
                         0.4326171875, 0.06085205078125, -0.2039794921875, -0.59326171875,
                         0.099853515625, -0.33447265625, -2.966796875, 0.3046875, 0.422607421875,
                         0.0034198760986328125, -0.09210205078125, -0.8515625, 0.03375244140625,
                         0.489990234375, 0.00843048095703125, -1.41015625, 0.417724609375,
                         0.63525390625, -0.272705078125, -0.3408203125, 1.3095703125,
                         -0.280517578125, -0.716796875, -0.223876953125, -0.1217041015625,
                         -0.7685546875, 0.09429931640625, 1.384765625, -0.
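
The response printed above has the shape `{'data': [{'embedding': [...]}, ...]}`, with one entry per input string. The sketch below uses a small mock response of that shape (the vectors are made up and far shorter than real embeddings) to show how the vectors can be extracted and compared. It also illustrates why the same embedding model must be used at query time: similarity is only meaningful between vectors of the same dimension that come from the same model. The helper names are illustrative assumptions.

```python
import math


def extract_embeddings(response):
    # One embedding per input string, in the same order as the inputs
    return [item["embedding"] for item in response["data"]]


def cosine_similarity(a, b):
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Mock response with made-up 3-dimensional vectors, for illustration only
mock_response = {
    "data": [
        {"embedding": [1.0, 0.0, 0.0]},
        {"embedding": [0.5, 0.5, 0.0]},
    ]
}

vectors = extract_embeddings(mock_response)
similarity = cosine_similarity(vectors[0], vectors[1])
print(len(vectors), len(vectors[0]))  # 2 3
print(round(similarity, 4))           # 0.7071
```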

# Compute Chunking Embeddings

In [0]:
from mlflow.exceptions import MlflowException
import logging

# Function to create embeddings using the gte-large-en endpoint

In [0]:
import mlflow.deployments
deploy_client = mlflow.deployments.get_deploy_client("databricks")

@pandas_udf("array<float>")
def get_embedding(contents: pd.Series) -> pd.Series:
    
    def get_embeddings(batch):
        # A failed request is logged and yields None for every item in the batch,
        # so the output stays aligned with the input rows.
        try:
            # Convert the batch to a list of strings
            string_batch = [str(item) for item in batch]  # Ensure all items are strings
            emb_endpoint_name = "databricks-gte-large-en"  # or "databricks-bge-large-en"
            response = deploy_client.predict(endpoint=emb_endpoint_name, inputs={"input": string_batch})
            return [e["embedding"] for e in response.data]
        except MlflowException as e:
            logging.error(f"Error occurred while generating embeddings: {e}")
            return [None] * len(batch)  # Return None for failed embeddings
    # splitting the contents into batches of 150 items each, since the embedding model takes at most 150 inputs per request.
    max_batch_size = 150
    batches = [contents.iloc[i:i + max_batch_size] for i in range(0, len(contents), max_batch_size)]

    # process each batch and collect the results
    all_embeddings = []
    for batch in batches:
        all_embeddings += get_embeddings(batch.tolist())

    return pd.Series(all_embeddings)
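
The batching and error-handling pattern used in `get_embedding` can be tried without Spark or a serving endpoint. The sketch below is a plain-Python analogue: `embed_in_batches` and `fake_embed` are hypothetical stand-ins (the latter replaces the `deploy_client.predict` call), and a batch size of 2 is used only to keep the example small.

```python
def embed_in_batches(texts, embed_fn, max_batch_size=150):
    """Call embed_fn on slices of at most max_batch_size items and keep results aligned."""
    results = []
    for start in range(0, len(texts), max_batch_size):
        batch = texts[start:start + max_batch_size]
        try:
            results.extend(embed_fn(batch))
        except Exception:
            # Keep one placeholder per input so rows and embeddings stay aligned
            results.extend([None] * len(batch))
    return results


def fake_embed(batch):
    # Stand-in for the endpoint call: fails when the batch contains "bad"
    if any(text == "bad" for text in batch):
        raise RuntimeError("simulated endpoint failure")
    return [[float(len(text))] for text in batch]


texts = ["a", "bb", "bad", "dddd", "eeeee"]
vectors = embed_in_batches(texts, fake_embed, max_batch_size=2)
print(vectors)  # [[1.0], [2.0], None, None, [5.0]]
```

Note that one failing item invalidates its whole batch: `"dddd"` receives `None` because it shares a batch with `"bad"`. Rows with a `None` embedding can be filtered out or retried before they are written to the table.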

## Save Embeddings to a Delta Table

Now that the embeddings are ready, let's create a Delta table and store the embeddings in this table.

In [0]:
# %sql
# DROP TABLE IF EXISTS cmidev.cbu_ccims_poc.claim_text_embeddings

In [0]:
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS cmidev.cbu_ccims_poc.claim_text_embeddings_gte (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  CLAIM_ID_SEQ STRING,
  DOCUMENT_CHUNK STRING,
  embedding ARRAY <FLOAT>
  -- NOTE: Change Data Feed must be enabled because Vector Search uses DLT, which requires the change feed
  )
  USING DELTA
  LOCATION '<your_path_here>'
  TBLPROPERTIES (delta.enableChangeDataFeed = true);

# Batch operation to write data from df_chunk_emd into the embeddings delta table

In [0]:
target_tbl = "cmidev.cbu_ccims_poc.claim_text_embeddings"

# define a function to append each batch of data to the Delta table  
def append_to_delta_table(batch_df, batch_id):  
    from pyspark.sql.functions import array, explode

    # Assuming df has a column named "DOCUMENT" of type STRUCT
    # and you want to create an array of some specific fields within this STRUCT
    df_transformed = batch_df.withColumn("DOCUMENT_ARRAY", array("DOCUMENT.OEM_CODE",
                                                        "DOCUMENT.DEALER",
                                                        "DOCUMENT.DISTR","DOCUMENT.FC",
                                                        "DOCUMENT.ENGINE_NAME_DESC","DOCUMENT.FAILCODE",
                                                        "DOCUMENT.SHOPORDERNUM",
                                                        "DOCUMENT.FAILURE_MODE_BUCKET",
                                                        "DOCUMENT.CLEANED_CORRECTION"))

    # Explode the array so that each field value becomes its own row (one chunk per row)
    df_chunks = (df_transformed
                    .withColumn("DOCUMENT_CHUNK", explode(df_transformed["DOCUMENT_ARRAY"]))
                    .selectExpr('CLAIM_ID_SEQ', 'DOCUMENT_CHUNK')
                )

    df_chunk_emd = (df_chunks 
                .withColumn("embedding", get_embedding("DOCUMENT_CHUNK"))
                .selectExpr("CLAIM_ID_SEQ", "DOCUMENT_CHUNK", "embedding")
                )

    df_chunk_emd.write.mode("append").option("path","abfss://cbu-ccims-poc@cmidevengineraw.dfs.core.windows.net/claim_text_embeddings")\
        .saveAsTable(target_tbl)
  
# write the PySpark DataFrame to the Delta table in batches using foreachBatch()  
df.writeStream \
    .foreachBatch(append_to_delta_table) \
    .option("checkpointLocation", "abfss://cbu-ccims-poc@cmidevengineraw.dfs.core.windows.net/checkpoint/claim_text_embeddings") \
    .trigger(availableNow=True) \
    .start()
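
The `array(...)` plus `explode(...)` step in `append_to_delta_table` turns one claim row into one row per selected field. The plain-Python sketch below mirrors that reshaping on a single made-up record, so it can be run without Spark. The `explode_document` helper, its `skip_empty` option, and the sample values are illustrative assumptions; the field names are the ones listed in the notebook. Skipping empty fields is an optional addition: in the notebook as written, a null field may still produce a row whose chunk is null, which `str(item)` in `get_embedding` would turn into the literal string `"None"` before embedding.

```python
# Field names taken from the DOCUMENT struct used in the notebook
DOCUMENT_FIELDS = [
    "OEM_CODE", "DEALER", "DISTR", "FC", "ENGINE_NAME_DESC",
    "FAILCODE", "SHOPORDERNUM", "FAILURE_MODE_BUCKET", "CLEANED_CORRECTION",
]


def explode_document(record, fields=DOCUMENT_FIELDS, skip_empty=True):
    """Return one (CLAIM_ID_SEQ, DOCUMENT_CHUNK) tuple per field of the record."""
    rows = []
    document = record["DOCUMENT"]
    for name in fields:
        value = document.get(name)
        is_empty = value is None or not str(value).strip()
        if skip_empty and is_empty:
            continue  # optional: avoid embedding null or blank chunks
        rows.append((record["CLAIM_ID_SEQ"], None if value is None else str(value)))
    return rows


# Made-up record for illustration only
record = {
    "CLAIM_ID_SEQ": "C-001",
    "DOCUMENT": {
        "OEM_CODE": "OEM1",
        "DEALER": None,
        "FAILCODE": "  ",
        "CLEANED_CORRECTION": "replaced injector",
    },
}

rows = explode_document(record)
print(rows)  # [('C-001', 'OEM1'), ('C-001', 'replaced injector')]
```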


## Conclusion

This notebook processed documents by extracting text chunks from the feature table and creating embeddings with a foundation model. The process included calling a Databricks-managed embedding endpoint to compute embeddings for the chunks. In the final step, we stored the computed embeddings in a Delta table.