In [0]:
%pip install dbdemos
dbutils.library.restartPython()
import dbdemos
dbdemos.create_cluster('llm-rag-chatbot')

In [0]:
%pip install --quiet -U transformers==4.41.1 pypdf==4.1.0 langchain-text-splitters==0.2.0 databricks-vectorsearch mlflow tiktoken==0.7.0 torch==2.3.0 llama-index==0.10.43 langchain==0.2.1 langchain_core==0.2.5 langchain_community==0.2.4 
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-init-advanced $reset_all_data=false

Prepare the data by copying the news PDF document into a Unity Catalog volume and creating the evaluation dataset.

In [0]:
# Fetch username dynamically
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get("user").get()

# Create the volume if it does not exist
try:
    spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{db}.pdfdata")
    print("Volume created or already exists.")
except Exception as e:
    print(f"Error creating volume: {e}")
    
# Define source and target paths
workspace_path = f"file:/Workspace/Users/{username}/Gen-AI-Hands-On-Workshop-Part-2/Data/news.pdf"
volumes_path = f"/Volumes/{catalog}/{db}/pdfdata/news.pdf"

# Verify source file existence
print(f"Checking existence of: {workspace_path}")
try:
    dbutils.fs.head(workspace_path, 1)
    print("File exists, proceeding with copy.")
    dbutils.fs.cp(workspace_path, volumes_path)
    print("File copied successfully.")
except Exception as e:
    print(f"Error: {e}")

In [0]:

workspace_file_path = f"file:/Workspace/Users/{username}/Gen-AI-Hands-On-Workshop-Part-2/Data/eval_dataset.csv"

# Load CSV into a DataFrame
df = spark.read.option("header", "true").csv(workspace_file_path)

# Check if the table exists
if not spark.catalog.tableExists(f"{catalog}.{db}.eval_dataset"):
    # Write the DataFrame to a managed Unity Catalog table
    df.write.format("delta").saveAsTable(f"{catalog}.{db}.eval_dataset")
else:
    print(f"Table {catalog}.{db}.eval_dataset already exists.")

%md-sandbox

# 1/ Ingesting and preparing PDF for LLM and Self Managed Vector Search Embeddings

## In this example, we will focus on ingesting PDF documents as the source for our retrieval process.

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-0.png?raw=true" style="float: right; width: 600px; margin-left: 10px">


For this example, we will add Databricks ebook PDFs from [Databricks resources page](https://www.databricks.com/resources) to our knowledge database.

**Note: This demo is advanced content, we strongly recommend going over the simple version first to learn the basics.**

Here are all the detailed steps:

- Use autoloader to load the binary PDFs into our first table. 
- Use the `pypdf` library to parse the text content of the PDFs.
- Use `llama_index` or `LangChain` to split the texts into chunks.
- Compute embeddings for the chunks.
- Save our text chunks + embeddings in a Delta Lake table, ready for Vector Search indexing.


Lakehouse AI not only provides state of the art solutions to accelerate your AI and LLM projects, but also to accelerate data ingestion and preparation at scale, including unstructured data like PDFs.


## Ingesting Databricks ebook PDFs and extracting their pages

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-1.png?raw=true" style="float: right" width="500px">

First, let's ingest our PDF news data as a Delta Lake table with path and content in binary format. 

We'll use [Databricks Autoloader](https://docs.databricks.com/en/ingestion/auto-loader/index.html) to incrementally ingest new files, which makes it easy to consume billions of files from the data lake in various data formats. Autoloader easily ingests our unstructured PDF data in binary format.


In [0]:
# List our raw PDF docs
volume_folder =  f"/Volumes/{catalog}/{db}"

print(f"Reading data from UC Volume: {volume_folder}")

raw_df = (
    spark.read.format("binaryFile")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*.pdf")
    .load(volume_folder+"/pdfdata")
)

display(raw_df)

Observe that in our raw_df Spark DataFrame above, we have 1 row (for our 1 document), where the contents of that file are in the content column. You may have seen other spark.read formats, but it is common to use the binaryFile format when working with documents (PDFs, DOCX, etc.) then convert the binary contents later.

We can now parse out the binary contents! The below code uses a Spark UDF to extract information. While this code could be written as a traditional Python function, adding the @F.udf() decorator allows it to be parallelized across many workers if we have dozens/hundreds/millions of documents to process.
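
The parsing UDF in the next cell returns a small struct with a `status` field instead of raising, so one unreadable file does not fail the whole Spark job. Here is a minimal sketch of that pattern in plain Python. `parse_with_status` and `toy_parser` are hypothetical names used only for illustration, and the toy parser (which treats form feeds as page breaks) is a stand-in, not a real PDF reader.

```python
from typing import Callable, Dict, List, Optional, Union

Result = Dict[str, Optional[Union[int, str]]]


def parse_with_status(raw_bytes: bytes, parser: Callable[[bytes], List[str]]) -> Result:
    """Run `parser` and report success or failure in the returned record."""
    try:
        pages = parser(raw_bytes)
        return {
            "number_pages": len(pages),
            "text": "\n\n".join(pages),
            "status": "SUCCESS",
        }
    except Exception as e:
        # Keep the row and record the error instead of raising
        return {"number_pages": None, "text": None, "status": f"ERROR: {e}"}


def toy_parser(raw_bytes: bytes) -> List[str]:
    """Hypothetical stand-in parser: a header followed by form-feed separated pages."""
    if not raw_bytes.startswith(b"%PDF"):
        raise ValueError("not a PDF")
    return raw_bytes.decode("latin-1").split("\f")[1:]


ok = parse_with_status(b"%PDF\fpage one\fpage two", toy_parser)
bad = parse_with_status(b"hello", toy_parser)
```

Rows whose `status` does not equal `SUCCESS` can then be filtered out and inspected separately.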

In [0]:
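# Imports (these may already be provided by the init notebook)
import io

from pypdf import PdfReader
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

@F.udf(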
    returnType=StructType(
        [
            StructField("number_pages", IntegerType(), nullable=True),
            StructField("text", StringType(), nullable=True),
            StructField("status", StringType(), nullable=False),
        ]
    ),
    # useArrow=True, # set globally
)
def parse_pdf(pdf_raw_bytes):
    try:
        pdf = io.BytesIO(pdf_raw_bytes)
        reader = PdfReader(pdf)
        output_text = ""
        for page in reader.pages:
            # pypdf docs: https://pypdf.readthedocs.io/en/stable/user/extract-text.html
            output_text += page.extract_text(
                extraction_mode="layout",
                layout_mode_space_vertically=False,
            ) + "\n\n"

        return {
            "number_pages": len(reader.pages),
            "text": output_text,
            "status": "SUCCESS",
        }
    except Exception as e:
        return {"number_pages": None, "text": None, "status": f"ERROR: {e}"}

In [0]:
# Apply UDF to the binary "content" column
parsed_df = (
  raw_df.withColumn("parsed_output", parse_pdf("content"))
        .drop("content") # For brevity
        .drop("length")  # For brevity
)

display(parsed_df)

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-2.png?raw=true" style="float: right" width="500px">

## Extracting our PDF content as text chunks

We need to convert the PDF documents bytes to text, and extract chunks from their content.

This part can be tricky because PDFs are hard to work with and can be saved as images, in which case OCR is needed to extract the text.

Using the `pypdf` library within a Spark UDF makes it easy to extract text.

*Note: Your cluster will need a few extra libraries that you would typically install with a cluster init script.*

<br style="clear: both">

### Splitting our big documentation page in smaller chunks

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/chunk-window-size.png?raw=true" style="float: right" width="700px">

In this demo, some PDFs are very large, with a lot of text.

We'll extract the content and then use LangChain's `CharacterTextSplitter` with a Hugging Face tokenizer, aiming to keep each chunk at or below 2000 tokens.


The chunk size and chunk overlap depend on the use case and the PDF files. 

Remember that your prompt+answer should stay below your model max window size (4096 for llama2). 
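
To make chunk size and overlap concrete, here is a minimal sketch of a sliding window over a list of tokens, plus the arithmetic for how many chunks fit in a context window. It is a simplification and not LangChain's implementation (which splits on a separator and then merges pieces). `chunk_with_overlap`, `max_chunks_in_prompt`, and the 500 reserved tokens are hypothetical values chosen for illustration.

```python
from typing import List


def chunk_with_overlap(tokens: List[str], chunk_size: int, overlap: int) -> List[List[str]]:
    """Split `tokens` into windows of `chunk_size` that share `overlap` tokens."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # the last window already reaches the end of the text
    return chunks


def max_chunks_in_prompt(context_window: int, chunk_size: int, reserved_tokens: int) -> int:
    """How many full chunks fit once room is reserved for the question and the answer."""
    return max(0, (context_window - reserved_tokens) // chunk_size)


tokens = [f"t{i}" for i in range(10)]
chunks = chunk_with_overlap(tokens, chunk_size=4, overlap=1)

# With a 4096-token window, 2000-token chunks and 500 tokens reserved (assumed),
# only one full chunk fits in the prompt.
n_fit = max_chunks_in_prompt(context_window=4096, chunk_size=2000, reserved_tokens=500)
```

Larger chunks carry more context per retrieved result but leave room for fewer of them, which is why these parameters depend on the model window.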

For more details, review the previous [../01-Data-Preparation](01-Data-Preparation) notebook. 

<br/>
<br style="clear: both">
<div style="background-color: #def2ff; padding: 15px;  border-radius: 30px; ">
  <strong>Information</strong><br/>
  Remember that the following steps are specific to your dataset. This is a critical part to building a successful RAG assistant.
  <br/> Always take time to review the chunks created and ensure they make sense and contain relevant information.
</div>

In [0]:
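# Imports (these may already be provided by the init notebook)
import re
from typing import List

from langchain_text_splitters import CharacterTextSplitter
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
from transformers import AutoTokenizer

# Set chunking params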
chunk_size_tokens = 2000
chunk_overlap_tokens = 200

# Instantiate tokenizer
## Read more here: https://huggingface.co/transformers/v3.0.2/model_doc/auto.html#autotokenizer
tokenizer = AutoTokenizer.from_pretrained('hf-internal-testing/llama-tokenizer')

# Create UDF to split text into token-bounded chunks
## For other splitting approaches, see accompanying notebook
@F.udf(returnType=ArrayType(StringType())
          # useArrow=True, # set globally
          )
def split_char_recursive(content: str) -> List[str]:
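    # Parsing may have failed upstream, leaving no text to split
    if content is None:
        return []
    # Remove runs of three or more dots (e.g. table-of-contents leaders)
    pattern = r'\.{3,}'
    cleaned_content = re.sub(pattern, '', content)
    # Use LangChain's CharacterTextSplitter, measuring chunk length with the Hugging Face tokenizer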
    text_splitter = CharacterTextSplitter.from_huggingface_tokenizer(
        tokenizer, 
        separator = " ",
        chunk_size=chunk_size_tokens, 
        chunk_overlap=chunk_overlap_tokens
    )
    chunks = text_splitter.split_text(cleaned_content)
    return chunks

# Apply Chunking
chunked_df = (
  parsed_df.select(
    "*", 
    F.explode(split_char_recursive("parsed_output.text")).alias("chunked_text")
  )
  .drop(F.col("parsed_output"))
  .withColumn("chunk_id", F.md5(F.col("chunked_text")))
)

# Printouts to review results
num_chunks = chunked_df.count()
print(f"Number of chunks: {num_chunks}")

avg_chunk_words = chunked_df.withColumn("word_count", F.size(F.split(F.col("chunked_text"), " "))).select(F.avg(F.col("word_count"))).first()[0]
print(f"Average words per chunk: {avg_chunk_words}")

display(chunked_df)
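
The cleaning regex used in the cell above is easy to check in isolation. This small sketch shows what `\.{3,}` removes and what it leaves alone; `strip_dot_leaders` and the sample string are made up for illustration.

```python
import re

# Three or more consecutive dots, as found in table-of-contents leaders
DOT_LEADER = re.compile(r"\.{3,}")


def strip_dot_leaders(text: str) -> str:
    """Remove every run of three or more dots from `text`."""
    return DOT_LEADER.sub("", text)


sample = "1. Introduction ........ 3\nWait... what? Version 1.2 is fine."
cleaned = strip_dot_leaders(sample)
print(cleaned)
```

Single dots (sentence ends, version numbers) are untouched, but genuine ellipses in prose are removed as well, which is worth keeping in mind when reviewing chunks.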

## 4. Write the final DataFrame to a Delta table

We will save our final results to a table that we will then use as the basis for a vector search index to perform similarity search. This step is pretty straightforward: we are persisting the results to a permanent location so we can re-use them later.

The code below overwrites the table, so you can run it more than once (in production, you would append new chunks and/or merge existing ones). We need to enable [Change Data Feed](https://docs.databricks.com/en/delta/delta-change-data-feed.html) to allow Vector Search to monitor changes to this table.
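
Because `chunk_id` is an MD5 hash of the chunk text, it can serve as a merge key: identical text always yields the same id. Below is a minimal in-memory sketch of that append-or-skip logic, assuming a plain dictionary stands in for the table. `merge_chunks` is a hypothetical helper, not a Delta Lake API. The hex digest of the UTF-8 bytes should correspond to what Spark's `md5` produces for a string column.

```python
import hashlib
from typing import Dict, Iterable


def chunk_id(text: str) -> str:
    """Deterministic id: hex MD5 digest of the UTF-8 encoded chunk text."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def merge_chunks(existing: Dict[str, str], new_texts: Iterable[str]) -> int:
    """Insert chunks whose id is not present yet; return how many were added."""
    inserted = 0
    for text in new_texts:
        key = chunk_id(text)
        if key not in existing:
            existing[key] = text
            inserted += 1
    return inserted


table: Dict[str, str] = {}
first = merge_chunks(table, ["alpha", "beta"])   # both chunks are new
second = merge_chunks(table, ["beta", "gamma"])  # "beta" is already stored
```

Re-running the ingestion on the same document therefore adds nothing, while new text produces new rows.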

In [0]:
chunked_table_name = "chunked_news"
full_table_location = f"{catalog}.{db}.{chunked_table_name}"

print(f"Saving data to UC table: {full_table_location}")

(
  chunked_df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(full_table_location)
)

# # We need to enable Change Data Feed on our Delta table to use it for Vector Search. If your table was already created, you can alter it:
# spark.sql(f"ALTER TABLE {full_table_location} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

## What's required for our Vector Search Index

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/databricks-vector-search-type.png?raw=true" style="float: right" width="800px">

Databricks provides multiple types of vector search indexes:

- **Managed embeddings**: you provide a text column and endpoint name and Databricks synchronizes the index with your Delta table 
- **Self Managed embeddings**: you compute the embeddings and save them as a field of your Delta Table, Databricks will then synchronize the index
- **Direct index**: when you want to use and update the index without having a Delta Table.

In this demo, we will show you how to set up a **Self-managed Embeddings** index.

To do so, we will first have to compute the embeddings of our chunks and save them as a Delta Lake table field of type `array<float>`.
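
When computing embeddings yourself, the chunks are usually sent to the embedding model in batches rather than one request per chunk. The following is a minimal sketch of that batching, assuming a caller-supplied `embed_fn` that maps a list of texts to a list of vectors. `embed_in_batches`, `fake_embed`, and the batch size are hypothetical, and `fake_embed` is a toy stand-in for a real model endpoint.

```python
from typing import Callable, List, Sequence

Vector = List[float]


def embed_in_batches(
    texts: Sequence[str],
    embed_fn: Callable[[List[str]], List[Vector]],
    batch_size: int = 32,
) -> List[Vector]:
    """Call `embed_fn` on slices of `texts` and return one vector per text, in order."""
    vectors: List[Vector] = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        batch_vectors = embed_fn(batch)
        if len(batch_vectors) != len(batch):
            raise ValueError("embedding function returned a wrong number of vectors")
        vectors.extend(batch_vectors)
    return vectors


def fake_embed(batch: List[str]) -> List[Vector]:
    """Toy stand-in for an embedding endpoint: [length, vowel count]."""
    return [[float(len(t)), float(sum(c in "aeiou" for c in t))] for t in batch]


vectors = embed_in_batches(["spark", "delta", "lake"], fake_embed, batch_size=2)
```

Each resulting vector would be stored next to its chunk in the `array<float>` column.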

## Introducing Databricks GTE Embeddings Foundation Model endpoints

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-4.png?raw=true" style="float: right; width: 600px; margin-left: 10px">

Foundation Models are provided by Databricks, and can be used out-of-the-box.

Databricks supports several endpoint types to compute embeddings or evaluate a model:
- DBRX Instruct, a **foundation model endpoint**, or another model served by Databricks (ex: llama2-70B, MPT...)
- An **external endpoint**, acting as a gateway to an external model (ex: Azure OpenAI)
- A **custom**, fine-tuned model hosted on Databricks Model Serving

Open the [Model Serving Endpoint page](/ml/endpoints) to explore and try the foundation models.

For this demo, we will use the foundation model `GTE` (embeddings) and `DBRX` (chat). <br/><br/>

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/databricks-foundation-models.png?raw=true" width="600px" >

In [0]:
from pprint import pprint

from mlflow.deployments import get_deploy_client

# gte-large-en Foundation models are available using the /serving-endpoints/databricks-gte-large-en/invocations api. 
deploy_client = get_deploy_client("databricks")

## NOTE: if you change your embedding model here, make sure you change it in the query step too
embeddings = deploy_client.predict(endpoint="databricks-gte-large-en", inputs={"input": ["What is Apache Spark?"]})
pprint(embeddings)

### Setup via UI
You can perform this step from the UI: 

1. Navigate on the left side to Catalog > {your workshop catalog+schema} > Tables

2. Click on the table you created in the last step (e.g. `chunked_news`)

3. Once on the table screen: at the top-right, click Create > Vector Search Index

4. Fill in these details:
  * Enter index name: `news_index`
  * Primary key: `chunk_id`
  * Endpoint: `vs_endpoint_1` <-- replace `1` with a different number if errors occur
    * If you get an error creating the index, the endpoint is probably full and you should use another one.
  * Embedding source: `Compute embeddings`
  * Embedding source column: `chunked_text`
  * Embedding model: `databricks-gte-large-en`
  * Sync mode: `Triggered`

For more details, see documentation: [Create index using the UI
](https://docs.databricks.com/en/generative-ai/create-query-vector-search.html#create-index-using-the-ui)

### Setup Vector Search Index programmatically.
*You can skip this step if you created the index in the UI.*

The below code completes two steps: 
1. Sets up a Vector Search **Endpoint**. This is the **compute** that hosts your index, and an endpoint can host multiple indices
2. Sets up a Vector Search **Index**. This is the **online replica** of your Delta table we will use in our RAG application

Full documentation: [Create index using the Python SDK](https://docs.databricks.com/en/generative-ai/create-query-vector-search.html#create-index-using-the-python-sdk)

**NOTE**: The cell below should run in 5-10 minutes, and will show as completed when the Endpoint is ready.
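
Waiting for an endpoint or index to become ready boils down to polling its state until it matches a target or a timeout expires. Here is a minimal sketch of such a loop in plain Python. It is not the code of the helper functions used in this notebook: `wait_until_ready`, the state names, and the timings are assumptions for illustration.

```python
import time
from typing import Callable


def wait_until_ready(
    get_state: Callable[[], str],
    ready_state: str = "ONLINE",      # assumed name of the ready state
    timeout_s: float = 600.0,
    poll_interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll `get_state` until it returns `ready_state` or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state == ready_state:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state!r} after {timeout_s}s")
        sleep(poll_interval_s)


# Simulated endpoint that becomes ready on the third poll
states = iter(["PROVISIONING", "PROVISIONING", "ONLINE"])
polls = []


def fake_state() -> str:
    state = next(states)
    polls.append(state)
    return state


final_state = wait_until_ready(fake_state, timeout_s=5, poll_interval_s=0, sleep=lambda _: None)
```

Passing `sleep` as a parameter keeps the loop easy to test without actually waiting.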


### Our dataset is now ready! Let's create our Self-Managed Vector Search Index.

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-3.png?raw=true" style="float: right; width: 600px; margin-left: 10px">

Our dataset is now ready. We chunked the documentation pages into small sections, computed the embeddings and saved them as a Delta Lake table.

Next, we'll configure Databricks Vector Search to ingest data from this table.

A Vector Search index uses a Vector Search endpoint to serve the embeddings (you can think of it as your Vector Search API endpoint). <br/>
Multiple indexes can use the same endpoint. Let's start by creating one.

If a Vector Search index is already available, use its name and set it in the config; otherwise, create a new Vector Search index.

In [0]:
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()

if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready.")


You can view your endpoint on the [Vector Search Endpoints UI](#/setting/clusters/vector-search). Click on the endpoint name to see all indexes that are served by the endpoint.

In [0]:
# Once the endpoint is ready, let's create the index
index_name = "news_index"
full_index_location = f"{catalog}.{db}.{index_name}"

# Check first to see if index already exists
if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, full_index_location):
  print(f"Creating index {full_index_location} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
  vsc.create_delta_sync_index(
    endpoint_name =                 VECTOR_SEARCH_ENDPOINT_NAME, # The endpoint where you want to host the index
    index_name =                    full_index_location, # Where in UC you want the index to be created
    source_table_name =             full_table_location, #The UC location of the offline source table
    pipeline_type =                 "TRIGGERED", # Set so we can manually refresh the index
    primary_key =                   "chunk_id", # The primary key of each chunk
    embedding_source_column =       "chunked_text", # The column containing chunked text
    embedding_model_endpoint_name = "databricks-gte-large-en" # The embedding model we want to use (GTE, as introduced above)
  )
  # Creating the index will take a few moments. Navigate to the Catalog UI to take a look!
  wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, full_index_location)
else:
  # If the index already exists, let's force a refresh using the .sync() method
  wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, full_index_location)
  vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, full_index_location).sync()

print(f"index {full_index_location} on table {full_table_location} is ready")

## Searching for similar content

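That's all we have to do. Because the index was created with the `TRIGGERED` pipeline type, new entries in your Delta Lake table are picked up each time a sync is triggered (for example with `.sync()`, as in the cell above).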

Note that depending on your dataset size and model size, index creation can take a few seconds to start and index your embeddings.

Let's give it a try and search for similar content.

*Note: `similarity_search` also supports a filters parameter. This is useful to add a security layer to your RAG system: you can filter out some sensitive content based on who is doing the call (for example filter on a specific department based on the user preference).*
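
To illustrate what filtering adds to a similarity search, here is a toy in-memory version: candidates are first restricted by metadata, then ranked by cosine similarity. This is only a conceptual sketch and not how Databricks Vector Search is implemented. The `search` function, the `department` field, and the two-dimensional vectors are invented for the example.

```python
import math
from typing import Any, Dict, List, Optional, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(
    query_vec: Sequence[float],
    rows: List[Dict[str, Any]],
    filters: Optional[Dict[str, Any]] = None,
    num_results: int = 2,
) -> List[Dict[str, Any]]:
    """Keep rows matching every filter, then return the most similar ones."""
    filters = filters or {}
    candidates = [r for r in rows if all(r.get(k) == v for k, v in filters.items())]
    ranked = sorted(candidates, key=lambda r: cosine(query_vec, r["embedding"]), reverse=True)
    return ranked[:num_results]


rows = [
    {"chunk_id": "a", "department": "finance", "embedding": [1.0, 0.0]},
    {"chunk_id": "b", "department": "hr", "embedding": [0.9, 0.1]},
    {"chunk_id": "c", "department": "finance", "embedding": [0.0, 1.0]},
]

# Row "b" is close to the query but is excluded by the department filter
hits = search([1.0, 0.1], rows, filters={"department": "finance"}, num_results=2)
```

Applying the filter before ranking means a caller never sees chunks outside the permitted subset, however similar they are.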

In [0]:
# If you have ingested a different PDF document, change this question to something the document mentions
question = "What happened with UnitedHealthcare?"

results = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, full_index_location).similarity_search(
  query_text=question,
  columns=["chunk_id", "chunked_text"],
  num_results=2)

docs = results.get('result', {}).get('data_array', [])
docs
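
Each entry of `data_array` is a positional list of values, which is awkward to pass downstream. The sketch below turns such rows into dictionaries using the column names from the response. The response shape shown here (a `manifest` with `columns` plus a trailing `score` column) is an assumption made for illustration, so check it against an actual response. `rows_as_dicts` and the sample values are hypothetical.

```python
from typing import Any, Dict, List

# Hand-written sample; the exact response shape is an assumption
sample_response = {
    "manifest": {
        "column_count": 3,
        "columns": [{"name": "chunk_id"}, {"name": "chunked_text"}, {"name": "score"}],
    },
    "result": {
        "row_count": 2,
        "data_array": [
            ["id-1", "first chunk", 0.71],
            ["id-2", "second chunk", 0.64],
        ],
    },
}


def rows_as_dicts(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pair each positional row with the column names listed in the manifest."""
    columns = [c["name"] for c in response.get("manifest", {}).get("columns", [])]
    data = response.get("result", {}).get("data_array", [])
    return [dict(zip(columns, row)) for row in data]


rows = rows_as_dicts(sample_response)
context = "\n\n".join(r["chunked_text"] for r in rows)
```

The joined `context` string is the kind of text that would be inserted into the chatbot prompt.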

## Next step: Deploy our chatbot model with RAG

We've seen how Databricks Lakehouse AI makes it easy to ingest and prepare your documents, and deploy a Self Managed Vector Search index on top of it with just a few lines of code and configuration.

This simplifies and accelerates your data projects so that you can focus on the next step: creating your realtime chatbot endpoint with well-crafted prompt augmentation.

Open the [02-Advanced-Chatbot-Chain]($./02-Advanced-Chatbot-Chain) notebook to create and deploy a chatbot endpoint.