
# 1/ Ingesting and preparing PDF for LLM and Pinecone Vector Database

## In this example, we will focus on ingesting PDF documents as the source for our retrieval process.

<img src="https://github.com/prasadkona/databricks_demos/blob/main/images/llm-rag-full-pinecone-1.png?raw=true" style="float: right; width: 600px; margin-left: 10px">


For this example, we will add Databricks ebook PDFs from [Databricks resources page](https://www.databricks.com/resources) to our knowledge database.

**Note: This demo covers advanced content. We strongly recommend going over the simple version first to learn the basics.**

Here are all the detailed steps:

- Use autoloader to load the binary PDF as our first table. 
- Use the `pypdf` library to parse the text content of the PDFs.
- Use `llama_index` or `Langchain` to split the text into chunks.
- Compute embeddings for the chunks
- Save our text chunks + embeddings in a Delta Lake table
- Write to Pinecone vector database.


Lakehouse AI not only provides state-of-the-art solutions to accelerate your AI and LLM projects, but also accelerates data ingestion and preparation at scale, including unstructured data like PDFs.


In [0]:
%pip install -U transformers==4.41.1 pypdf==4.1.0 langchain-text-splitters==0.2.0 mlflow==2.15.1 tiktoken==0.7.0 torch==2.3.0 llama-index==0.10.43 pinecone-client==5.0.1
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-init-advanced $reset_all_data=false

USE CATALOG `prasad_kona_dev`
using catalog.database `prasad_kona_dev`.`rag_chatbot_prasad_kona`


DataFrame[]

## Ingesting Databricks ebook PDFs and extracting their pages

<img src="https://github.com/prasadkona/databricks_demos/blob/main/images/llm-rag-full-pinecone-2.png?raw=true" style="float: right" width="500px">

First, let's ingest our PDFs as a Delta Lake table with path urls and content in binary format. 

We'll use [Databricks Autoloader](https://docs.databricks.com/en/ingestion/auto-loader/index.html) to incrementally ingest new files, making it easy to consume billions of files from the data lake in various data formats. Autoloader can easily ingest our unstructured PDF data in binary format.


In [0]:
%sql
CREATE VOLUME IF NOT EXISTS volume_databricks_documentation;

In [0]:
# List our raw PDF docs
volume_folder =  f"/Volumes/{catalog}/{db}/volume_databricks_documentation"
#Let's upload some pdf to our volume as example
upload_pdfs_to_volume(volume_folder+"/databricks-pdf")

display(dbutils.fs.ls(volume_folder+"/databricks-pdf"))

In [0]:
df = (spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'BINARYFILE')
        .load('dbfs:'+volume_folder+"/databricks-pdf"))

# Write the data as a Delta table
(df.writeStream
  .trigger(availableNow=True)
  .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/raw_docs')
  .table('pdf_raw').awaitTermination())

In [0]:
%sql SELECT * FROM pdf_raw LIMIT 2

<img src="https://github.com/prasadkona/databricks_demos/blob/main/images/llm-rag-full-pinecone-3.png?raw=true" style="float: right" width="600px">

## Extracting our PDF content as text chunks

We need to convert the bytes of each PDF document to text, and extract chunks from that content.

This part can be tricky: PDFs are hard to work with, and some are saved as images, in which case we need OCR to extract the text.

Using the `pypdf` library within a Spark UDF makes it easy to extract text.

*Note: Your cluster will need a few extra libraries that you would typically install with a cluster init script.*
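
Before reaching for OCR, it can help to detect which documents actually need it. The sketch below is a hypothetical heuristic, not part of the demo: the `needs_ocr` name and both thresholds are assumptions. It flags a PDF as likely scanned when most of its pages yield almost no extractable text.

```python
from typing import List, Optional


def needs_ocr(page_texts: List[Optional[str]], min_chars: int = 20,
              max_empty_ratio: float = 0.5) -> bool:
    """Return True when most pages carry (almost) no extractable text.

    page_texts holds one entry per page, for example the values returned by
    pypdf's page.extract_text(); None is treated as an empty page.
    min_chars and max_empty_ratio are illustrative thresholds.
    """
    if not page_texts:
        return True  # nothing could be extracted at all
    empty_pages = sum(
        1 for text in page_texts if len((text or "").strip()) < min_chars
    )
    return empty_pages / len(page_texts) > max_empty_ratio


print(needs_ocr(["", "  ", None]))
print(needs_ocr(["A long paragraph of real text extracted from the page."]))
```

Documents flagged this way could be routed to an OCR step, while the rest go through the plain text extraction shown below.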

<br style="clear: both">

### Splitting our big documentation pages into smaller chunks

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/chunk-window-size.png?raw=true" style="float: right" width="700px">

In this demo, some PDFs can be really big, with a lot of text.

We'll extract the content and then use the llama_index `SentenceSplitter` to ensure that each chunk isn't bigger than 500 tokens.


The chunk size and chunk overlap depend on the use case and the PDF files.

Remember that your prompt + answer should stay below your model's max window size (4096 tokens for llama2).

For more details, review the previous [../01-Data-Preparation](01-Data-Preparation) notebook. 

<br/>
<br style="clear: both">
<div style="background-color: #def2ff; padding: 15px;  border-radius: 30px; ">
  <strong>Information</strong><br/>
  Remember that the following steps are specific to your dataset. This is a critical part of building a successful RAG assistant.
  <br/> Always take time to review the chunks created and ensure they make sense and contain relevant information.
</div>
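
To build intuition for how chunk size and chunk overlap interact, here is a minimal sliding-window splitter in plain Python. It is a simplification: it counts whitespace-separated words rather than model tokens and ignores sentence boundaries, so it only approximates what `SentenceSplitter` does. The `chunk_words` helper is hypothetical.

```python
from typing import List


def chunk_words(text: str, chunk_size: int = 500, chunk_overlap: int = 10) -> List[str]:
    """Split text into windows of at most chunk_size words.

    Consecutive windows share chunk_overlap words.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    words = text.split()
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


sample = " ".join(f"w{i}" for i in range(12))
for chunk in chunk_words(sample, chunk_size=5, chunk_overlap=2):
    print(chunk)
```

As a rough budget check: if five retrieved chunks of up to 500 tokens each were all placed in the prompt, the context alone could take about 2,500 tokens, leaving roughly 1,600 tokens of a 4,096-token window for the instructions, the question, and the answer.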

In [0]:
import io
import warnings
from pypdf import PdfReader

def parse_bytes_pypdf(raw_doc_contents_bytes: bytes):
    try:
        pdf = io.BytesIO(raw_doc_contents_bytes)
        reader = PdfReader(pdf)
        parsed_content = [page_content.extract_text() for page_content in reader.pages]
        return "\n".join(parsed_content)
    except Exception as e:
        warnings.warn(f"Exception {e} has been thrown during parsing")
        return None

Let's start by extracting text from our PDF.

In [0]:
import io
import re
import requests
with requests.get('https://github.com/databricks-demos/dbdemos-dataset/blob/main/llm/databricks-pdf-documentation/Databricks-Customer-360-ebook-Final.pdf?raw=true') as pdf:
  doc = parse_bytes_pypdf(pdf.content)  
  print(doc)

This looks great. We'll now wrap it with a text splitter to avoid overly large chunks, and create a Pandas UDF to easily scale that across multiple nodes.

*Note that our PDF text isn't clean. To make it nicer, we could imagine a few extra LLM-based pre-processing steps, asking the model to remove irrelevant content like the list of chapters and keep only the meat of the text.*
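
As a cheaper first pass before any LLM-based cleaning, a rule-based cleanup can remove some obvious noise. Note that this swaps in a different technique (regex heuristics, not the LLM-based step suggested above). The `clean_pdf_text` helper and its rules are hypothetical and would need tuning on your own documents.

```python
import re


def clean_pdf_text(text: str) -> str:
    """Apply simple regex heuristics to text extracted from a PDF."""
    # Re-join words split by a hyphen at a line break: "stream-\ning" -> "streaming".
    # Caveat: this also merges genuine compounds that happen to break at the hyphen.
    text = re.sub(r"(\w)-\n(\w)", r"\1\2", text)
    # Drop lines that contain only a page number.
    text = re.sub(r"(?m)^[ \t]*\d+[ \t]*$", "", text)
    # Drop table-of-contents style lines such as "Chapter 1 ..... 12".
    text = re.sub(r"(?m)^.*\.{4,}[ \t]*\d+[ \t]*$", "", text)
    # Collapse runs of spaces or tabs, then runs of blank lines.
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n\s*\n+", "\n\n", text)
    return text.strip()


raw = "Intro ..... 3\n12\nData stream-\ning is   fun.\n\n\n\nNext paragraph."
print(clean_pdf_text(raw))
```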

In [0]:
import pandas as pd
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import Document, set_global_tokenizer
from pyspark.sql.functions import pandas_udf
from transformers import AutoTokenizer
from typing import Iterator

# Reduce the arrow batch size as our PDF can be big in memory
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10)

@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Use the llama2 tokenizer so that chunk sizes are measured in tokens of our chat model
    set_global_tokenizer(
      AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer")
    )
    #Sentence splitter from llama_index to split on sentences
    splitter = SentenceSplitter(chunk_size=500, chunk_overlap=10)
    def extract_and_split(b):
      txt = parse_bytes_pypdf(b)
      if txt is None:
        return []
      nodes = splitter.get_nodes_from_documents([Document(text=txt)])
      return [n.text for n in nodes]

    for x in batch_iter:
        yield x.apply(extract_and_split)

## What's required for Pinecone Vector Database


In this demo, we will show you how to use Pinecone as your vector database.

We will first compute the embeddings of our chunks and save them in a Delta Lake table column of type `array<float>`.

## Introducing Databricks BGE Embeddings Foundation Model endpoints

<img src="https://github.com/prasadkona/databricks_demos/blob/main/images/llm-rag-full-pinecone-5.png?raw=true" style="float: right; width: 600px; margin-left: 10px">

Foundation Models are provided by Databricks, and can be used out-of-the-box.

Databricks supports several endpoint types to compute embeddings or evaluate a model:
- A **foundation model endpoint**, provided by Databricks (ex: llama2-70B, MPT...)
- An **external endpoint**, acting as a gateway to an external model (ex: Azure OpenAI)
- A **custom**, fine-tuned model hosted on Databricks Model Serving

Open the [Model Serving Endpoint page](/ml/endpoints) to explore and try the foundation models.

For this demo, we will use the foundation model `BGE` (embeddings) and `llama2-70B` (chat). <br/><br/>

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/databricks-foundation-models.png?raw=true" width="600px" >

In [0]:
from mlflow.deployments import get_deploy_client

# bge-large-en Foundation models are available using the /serving-endpoints/databricks-bge-large-en/invocations api. 
deploy_client = get_deploy_client("databricks")

## NOTE: if you change your embedding model here, make sure you change it in the query step too
embeddings = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": ["What is Apache Spark?"]})
print(embeddings)

In [0]:
%sql
--Note that we need to enable Change Data Feed on the table to create the index
CREATE TABLE IF NOT EXISTS databricks_pdf_documentation (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  content STRING,
  metadata STRING,
  embedding ARRAY <FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true); 

### Computing the chunk embeddings and saving them to our Delta Table

The last step is to compute an embedding for each of our documentation chunks. Let's create a UDF to compute the embeddings using the foundation model endpoint.

*Note that this part would typically be set up as a production-grade job, running as soon as a new documentation page is updated. <br/> This could be set up as a Delta Live Table pipeline to incrementally consume updates.*

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import pandas_udf,PandasUDFType, udf

@pandas_udf("array<float>")
def get_embedding(contents: pd.Series) -> pd.Series:
    import mlflow.deployments
    deploy_client = mlflow.deployments.get_deploy_client("databricks")
    def get_embeddings(batch):
        # Note: this will fail if an exception is thrown during embedding creation (add try/except if needed)
        response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": batch})
        return [e['embedding'] for e in response.data]

    # Splitting the contents into batches of 150 items each, since the embedding model takes at most 150 inputs per request.
    max_batch_size = 150
    batches = [contents.iloc[i:i + max_batch_size] for i in range(0, len(contents), max_batch_size)]

    # Process each batch and collect the results
    all_embeddings = []
    for batch in batches:
        all_embeddings += get_embeddings(batch.tolist())

    return pd.Series(all_embeddings)
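
The batching logic above can be checked in isolation, without calling the embedding endpoint. The `batched` helper below is a hypothetical stand-in that mirrors the slicing used in `get_embedding`; only the 150-item limit comes from the notebook.

```python
from typing import Iterator, List, Sequence


def batched(items: Sequence[str], max_batch_size: int = 150) -> Iterator[List[str]]:
    """Yield consecutive slices of at most max_batch_size items."""
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")
    for start in range(0, len(items), max_batch_size):
        yield list(items[start:start + max_batch_size])


chunks = [f"chunk {i}" for i in range(320)]
print([len(batch) for batch in batched(chunks)])  # [150, 150, 20]
```

Because the batches are consecutive slices, concatenating the per-batch results preserves the input order, which a Pandas UDF relies on to align embeddings with their rows.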

In [0]:
import json
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

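@pandas_udf(StringType())
def create_metadata_json_string(original_col: pd.Series) -> pd.Series:
    # Build one JSON string per row; json.dumps cannot serialize a whole pd.Series,
    # and a Pandas UDF must return as many rows as it receives.
    return original_col.apply(lambda value: json.dumps({'original_doc': value}))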

#df = df.withColumn('json_col', create_metadata_json_string('input_col'))



In [0]:
# UDF for embedding
from pyspark.sql.types import *
def get_embedding_for_string(text):
    response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": text})
    e = response.data
    return e[0]['embedding']

get_embedding_for_string_udf = udf(get_embedding_for_string, ArrayType(FloatType()))
print(get_embedding_for_string("What is a lakehouse ?"))

In [0]:
# Delete checkpoint for the pdf_raw table streaming query
#dbutils.fs.rm(f'{folder}/checkpoints/pdf_chunks{catalog}_{db}', True)

# Delete checkpoint for the databricks_documentation table streaming query
#dbutils.fs.rm(f'{folder}/checkpoints/docs_chunks_{catalog}_{db}', True)

In [0]:
from pyspark.sql import functions as F

(spark.readStream.table('pdf_raw')
      .withColumn("content", F.explode(read_as_chunk("content")))
      .withColumn("embedding", get_embedding("content"))
      .withColumn("metadata", create_metadata_json_string("content") )
      #.selectExpr('path as url', 'content', 'embedding','metadata')
      .selectExpr('path as url', 'content', 'embedding')
  .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f'{folder}/checkpoints/pdf_chunks{catalog}_{db}')
    .table('databricks_pdf_documentation').awaitTermination())

#Let's also add our documentation web page from the simple demo (make sure you run the simple demo for it to work)
if spark.catalog.tableExists(f'{catalog}.{db}.databricks_documentation'):
  (spark.readStream.table('databricks_documentation')
      .withColumn('embedding', get_embedding("content"))
      .withColumn("metadata", create_metadata_json_string("content") )
      #.select('url', 'content', 'embedding','metadata')
      .select('url', 'content', 'embedding')
  .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f'{folder}/checkpoints/docs_chunks_{catalog}_{db}')
    .table('databricks_pdf_documentation').awaitTermination())

In [0]:
%sql
SELECT * FROM databricks_pdf_documentation WHERE url like '%.pdf' limit 10

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType  
schema = StructType([  
    StructField("id",StringType(),True),  
    StructField("values",ArrayType(FloatType()),True),  
    StructField("namespace",StringType(),True),  
    StructField("metadata", StringType(), True),  
    StructField("sparse_values", StructType([  
        StructField("indices", ArrayType(IntegerType(), False), False),  
        StructField("values", ArrayType(FloatType(), False), False)  
    ]), True)  
])  
#embeddings_df = spark.createDataFrame(data=embeddings,schema=schema)  


In [0]:
from pyspark.sql.functions import col, lit, struct, to_json
from pyspark.sql.functions import encode

df = spark.table('databricks_pdf_documentation')\
            .withColumn("metadata", to_json(struct(col("content"), col("url"), col("id"))))\
            .withColumn("namespace", lit("dbdemo-namespace")) \
            .withColumn("values", col("embedding")) \
            .withColumn("sparse_values", lit(None)) \
            .select("id", "values", "namespace", "metadata", "sparse_values")

display(df.count())

# Print the valid JSON
display(df.limit(2))

In [0]:
# If you don't know the embedding array size, use the code below to determine it.
# The size depends on the model used to convert a string into an embedding array.
# Note: log in to Pinecone and set the vector index dimension to this size.

from pyspark.sql.functions import size

df2 = df.withColumn('array_col_len', size('values'))
display(df2.limit(1))



In [0]:
# Initialize pinecone variables

api_key = dbutils.secrets.get("pinecone_secrets_scope", "PINECONE_API_KEY")
project_name = "Starter"
index_name = "dbdemo-index"



In [0]:
(  
    df.write  
    .option("pinecone.apiKey", api_key) 
    .option("pinecone.indexName", index_name)  
    .format("io.pinecone.spark.pinecone.Pinecone")  
    .mode("append")  
    .save()  
)  
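
For small datasets or quick tests, the same records could also be prepared for the Pinecone Python client instead of the Spark connector. The sketch below only builds and validates the payload locally. The `to_pinecone_vectors` helper and the row layout (`id`, `url`, `content`, `embedding`) are assumptions based on the table defined earlier, and the final `index.upsert` call is left commented out because it needs a live index.

```python
from typing import Any, Dict, Iterable, List


def to_pinecone_vectors(rows: Iterable[Dict[str, Any]], expected_dim: int) -> List[Dict[str, Any]]:
    """Convert rows (id, url, content, embedding) into Pinecone upsert records."""
    vectors = []
    for row in rows:
        values = [float(v) for v in row["embedding"]]
        if len(values) != expected_dim:
            raise ValueError(
                f"row {row['id']}: expected {expected_dim} values, got {len(values)}"
            )
        vectors.append({
            "id": str(row["id"]),  # Pinecone vector ids are strings
            "values": values,
            "metadata": {"url": row["url"], "content": row["content"]},
        })
    return vectors


rows = [{"id": 1, "url": "a.pdf", "content": "hello", "embedding": [0.1, 0.2, 0.3]}]
vectors = to_pinecone_vectors(rows, expected_dim=3)  # 3 is only for this toy example
print(vectors[0]["id"], len(vectors[0]["values"]))

# With a live index (assumption: created with the same dimension as the embeddings):
# index.upsert(vectors=vectors, namespace="dbdemo-namespace")
```

Validating the vector length before writing surfaces a dimension mismatch early, rather than as a rejected request from the index.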



### Our dataset is now ready and available to query via Pinecone

Our dataset is now ready. We split the documents into small chunks, computed their embeddings, saved them as a Delta Lake table, and ingested them into the Pinecone vector database.

## Searching for similar content

Let's give it a try and search for similar content by retrieving the top n results.
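
Conceptually, a similarity search ranks the stored vectors by how close they are to the query vector. The brute-force sketch below illustrates the idea, assuming a cosine metric (the actual metric depends on how the Pinecone index was created, and a vector database uses approximate indexes rather than a full scan). The `cosine_similarity` and `top_k` helpers are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float], vectors: Dict[str, Sequence[float]],
          k: int = 5) -> List[Tuple[str, float]]:
    """Return the k (id, score) pairs with the highest cosine similarity."""
    scored = [(vid, cosine_similarity(query, values)) for vid, values in vectors.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


store = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
print(top_k([1.0, 0.0], store, k=2))
```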



In [0]:

# connect to pinecone index
from pinecone import Pinecone

pc = Pinecone(api_key=api_key)
index = pc.Index(index_name)


In [0]:
question = "How can I track billing usage on my workspaces?"

# create the query embedding
xq = get_embedding_for_string(question)

# query Pinecone for the top 5 most similar results
query_response = index.query(
    namespace='dbdemo-namespace',
    top_k=5,
    include_values=True,
    include_metadata=True,
    vector=xq
)

#print(query_response)

query_response_docs = []
for match in query_response['matches']:
    query_response_docs.append([match['metadata']['url'],match['metadata']['content'],match['score']])

print(query_response_docs)

[['dbfs:/Volumes/prasad_kona_dev/rag_chatbot_prasad_kona/volume_databricks_documentation/databricks-pdf/big-book-of-data-engineering-2nd-edition-final.pdf', 'There is a Ganglia dashboard at the cluster level, integrated partner \napplications like Datadog  for monitoring streaming workloads, or even more open \nsource options you can build using tools like Prometheus and Grafana. Each \nhas advantages and disadvantages to consider around cost, performance, and \nmaintenance requirements.\nWhether you have low volumes of streaming workloads where interactions in the \nUI are sufficient or have decided to invest in a more robust monitoring platform, \nyou should know how to observe your production streaming workloads. Further \n“Monitoring and Alerting” posts later in this series will contain a more thorough \ndiscussion. In particular, we’ll see different measures on which to monitor \nstreaming applications and then later take a deeper look at some of the tools \nyou can leverage for o
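
The matches returned by the query can then be assembled into context for prompt augmentation. The sketch below is one possible approach, not the method used in the next notebook: the `build_context` helper, the `min_score` cutoff, and the `max_chars` budget are all assumptions. It reads the same `score`, `metadata.url`, and `metadata.content` fields as the loop above.

```python
from typing import Any, Dict, List


def build_context(matches: List[Dict[str, Any]], min_score: float = 0.5,
                  max_chars: int = 2000) -> str:
    """Join the content of sufficiently similar matches into one context string."""
    parts = []
    used = 0
    for match in sorted(matches, key=lambda m: m["score"], reverse=True):
        if match["score"] < min_score:
            continue  # skip weak matches
        content = match["metadata"]["content"].strip()
        if used + len(content) > max_chars:
            break  # stay within the character budget
        parts.append(f"Source: {match['metadata']['url']}\n{content}")
        used += len(content)
    return "\n\n".join(parts)


fake_matches = [
    {"score": 0.9, "metadata": {"url": "a.pdf", "content": "Alpha"}},
    {"score": 0.3, "metadata": {"url": "b.pdf", "content": "Beta"}},
    {"score": 0.7, "metadata": {"url": "c.pdf", "content": " Gamma "}},
]
print(build_context(fake_matches))
```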

## Next step: Deploy our chatbot model with RAG

We've seen how Databricks Lakehouse AI makes it easy to ingest and prepare your documents, and write them to the Pinecone vector database.

This simplifies and accelerates your data projects so that you can focus on the next step: creating your real-time chatbot endpoint with well-crafted prompt augmentation.

Open the [02-Advanced-Chatbot-Chain]($./02-Advanced-Chatbot-Chain) notebook to create and deploy a chatbot endpoint.