
📘 **Introduction**  
This notebook demonstrates how to combine Databricks and LangChain into a retrieval-augmented generation (RAG) pipeline covering document ingestion, text chunking, embedding, vector search, and LLM-powered question answering.

🧩 **Objective**  
The primary objective of this notebook is to extract information from a PDF document, transform the content into searchable text chunks, and enable semantic search and LLM-driven question answering using Databricks-hosted large language models.

🛠️ **Problem Description**  
With the growing volume of unstructured data (e.g., PDFs), keyword search is often inadequate because a question may use different wording than the passage that answers it. This notebook addresses the problem by combining vector (semantic) search with LLM inference, so users can ask natural-language questions and receive answers grounded in the retrieved context.

📂 **Datasets and Sources**  
- **Input:** A PDF file stored in Unity Catalog Volumes  
- **Processing:** The PDF is parsed into raw text using PyMuPDF, then chunked for embedding using LangChain's `RecursiveCharacterTextSplitter`.  
- **Storage & Indexing:** The processed chunks are stored in a Delta Lake table and indexed using Databricks Mosaic AI Vector Search.

🧰 **Libraries & Tools**  
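- `databricks-sdk`, `databricks-langchain`, `databricks-vectorsearch`, `langchain`, `langchain_core`, `mlflow`, `PyMuPDF`
- Databricks-hosted LLM (Meta Llama 3.3 70B Instruct, endpoint `databricks-meta-llama-3-3-70b-instruct`) and embedding model (`databricks-gte-large-en`)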

📋 **Notebook Structure**  
1. **Environment Setup:** Install required packages and initialize services  
2. **LLM Initialization:** Configure and test the Databricks-hosted Llama 3.3 70B Instruct model  
3. **PDF Processing:** Load the PDF and extract its raw text using PyMuPDF  
4. **Text Chunking:** Split the extracted text into overlapping segments of bounded size  
5. **Vector Indexing:** Store and index chunks using Databricks Vector Search  
6. **Semantic Search:** Retrieve relevant chunks based on user queries  
7. **LLM Inference:** Pass the retrieved chunks to the LLM as context to generate grounded answers
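Steps 5 and 6 reduce to one idea: embed every chunk, embed the query, and return the chunks whose vectors are closest to the query vector. The plain-Python sketch below shows that shape under a loud assumption: `embed` is a hypothetical bag-of-words stand-in for a real embedding model such as `databricks-gte-large-en`, and the chunks are made-up examples. A production vector index typically uses dense embeddings and an approximate nearest-neighbour search instead of this brute-force scan.

```python
import math
import re
from collections import Counter


# Hypothetical stand-in for an embedding model: a sparse bag-of-words count vector.
# A real embedding endpoint returns dense learned vectors instead.
def embed(text: str) -> Counter:
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine_similarity(a: Counter, b: Counter) -> float:
    # Dot product over shared terms divided by the product of the vector norms
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_k(chunks: list[str], query: str, k: int) -> list[tuple[float, str]]:
    # Score every chunk against the query and keep the k highest-scoring ones
    q = embed(query)
    scored = [(cosine_similarity(embed(c), q), c) for c in chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


# Hypothetical chunks standing in for the output of the chunking step
chunks = [
    "A data fiduciary determines the purpose and means of processing personal data.",
    "The board may impose penalties for breaches.",
    "Consent must be free, specific and informed.",
]

for score, chunk in top_k(chunks, "Who is a data fiduciary?", k=2):
    print(f"{score:.3f}  {chunk}")
```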

In [0]:
# Install required libraries for Databricks, LangChain, MLflow, and related tools
%pip install -U --quiet databricks-sdk==0.49.0 "databricks-langchain>=0.4.0" databricks-agents "mlflow[databricks]" databricks-vectorsearch==0.55 langchain==0.3.25 langchain_core==0.3.59 bs4==0.0.2 markdownify==0.14.1 pydantic==2.10.1 openai PyMuPDF

In [0]:
# Restart the Python process to ensure all installed libraries are properly loaded
dbutils.library.restartPython()

In [0]:
from databricks_langchain import ChatDatabricks

# Initialize the ChatDatabricks model with specified parameters
chat_model = ChatDatabricks(
    endpoint="databricks-meta-llama-3-3-70b-instruct",  # Endpoint for the chat model
    temperature=0.1,  # Controls the randomness of the response
    max_tokens=250,  # Maximum number of tokens in the response
)

# Invoke the model with a test query (no retrieved context yet)
chat_model.invoke("Who is a data fiduciary?")
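The `temperature=0.1` setting above makes answers close to deterministic. The sketch below illustrates why, assuming the standard temperature-scaled softmax used for sampling (the serving endpoint's exact sampling procedure is not documented in this notebook): logits are divided by the temperature before the softmax, so a small temperature concentrates almost all probability on the top-scoring token. The logits are hypothetical.

```python
import math


def softmax_with_temperature(logits: list[float], temperature: float) -> list[float]:
    # Divide logits by the temperature, then apply a numerically stable softmax
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.0, 0.5]  # hypothetical scores for three candidate next tokens

for t in (1.0, 0.1):
    probs = softmax_with_temperature(logits, t)
    print(t, [round(p, 3) for p in probs])
```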

In [0]:
# Path to the PDF file to be processed
file_path = '/Volumes/workspace/default/raw_files/dpact.pdf'
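`file_path` follows the Unity Catalog volume layout `/Volumes/<catalog>/<schema>/<volume>/<path>`, the same pattern used in the commented Spark examples further down. The hypothetical helper below splits such a path into its parts, which can be handy for checking that the catalog and schema match the ones used for the Delta table and vector index (`workspace.default`).

```python
from pathlib import PurePosixPath


def parse_volume_path(path: str) -> dict:
    # Split /Volumes/<catalog>/<schema>/<volume>/<relative path> into its components
    parts = PurePosixPath(path).parts  # ('/', 'Volumes', catalog, schema, volume, ...)
    if len(parts) < 6 or parts[:2] != ("/", "Volumes"):
        raise ValueError(f"Not a Unity Catalog volume file path: {path}")
    return {
        "catalog": parts[2],
        "schema": parts[3],
        "volume": parts[4],
        "relative_path": "/".join(parts[5:]),
    }


print(parse_volume_path("/Volumes/workspace/default/raw_files/dpact.pdf"))
```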

In [0]:
import fitz  # PyMuPDF

# Read the PDF file as bytes from the Unity Catalog volume path
with open(file_path, "rb") as f:
    pdf_bytes = f.read()

# Open the PDF from the in-memory bytes and concatenate the text of every page;
# the context manager closes the document when done
with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
    text = "".join(page.get_text() for page in doc)

# Print the first 1000 characters of the extracted text for inspection
print(text[:1000])

# Example code for reading CSV, Parquet, or JSON files using Spark DataFrames:
# df = spark.read.option("header", "true").csv("/Volumes/<catalog>/<schema>/<volume>/filename.csv")
# df = spark.read.parquet("/Volumes/<catalog>/<schema>/<volume>/filename.parquet")
# df = spark.read.option("multiline", "true").json("/Volumes/<catalog>/<schema>/<volume>/filename.json")

In [0]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Use the text extracted from the PDF in the previous cell
raw_text = text

# Chunk documents for embedding
# Initialize the text splitter with a chunk size of 500 characters and an overlap of 100 characters
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)

# Create documents by splitting the raw text into chunks
docs = text_splitter.create_documents([raw_text])
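`RecursiveCharacterTextSplitter` tries a hierarchy of separators (by default paragraph breaks, then line breaks, then spaces, then individual characters), splitting on the coarsest one that keeps pieces under `chunk_size` and merging small pieces back together. The sketch below is a simplified re-implementation of that idea for illustration only, not LangChain's code: it omits `chunk_overlap`, which in the real splitter makes each chunk repeat up to roughly the last 100 characters of the previous one so that sentences cut at a boundary still appear whole in one chunk.

```python
def recursive_split(text: str, chunk_size: int,
                    separators: tuple[str, ...] = ("\n\n", "\n", " ", "")) -> list[str]:
    # Text that already fits is returned as a single chunk
    if len(text) <= chunk_size:
        return [text] if text else []
    sep, rest = separators[0], separators[1:]
    if sep == "":
        # Last resort: hard cut into chunk_size-character pieces
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    if sep not in text:
        # This separator does not occur; fall back to the next, finer one
        return recursive_split(text, chunk_size, rest)

    chunks, current = [], ""
    for piece in text.split(sep):
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate  # greedily merge pieces while they fit
        else:
            if current:
                chunks.append(current)
            if len(piece) <= chunk_size:
                current = piece
            else:
                # Piece is too large on its own: split it with the finer separators
                chunks.extend(recursive_split(piece, chunk_size, rest))
                current = ""
    if current:
        chunks.append(current)
    return chunks


sample = "First paragraph about consent.\n\nSecond paragraph, which is a little longer than the first one."
for chunk in recursive_split(sample, chunk_size=40):
    print(len(chunk), repr(chunk))
```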

In [0]:
# Calculate the number of document chunks created
len(docs)

In [0]:
# Inspect the first chunk (a LangChain Document with page_content and metadata)
docs[0]

In [0]:
import pandas as pd

# Convert the LangChain Document objects to a DataFrame (one row per chunk);
# model_dump() is the Pydantic v2 replacement for the deprecated .dict()
pd_docs = pd.DataFrame([doc.model_dump() for doc in docs])

In [0]:
# Add an 'id_pk' column as the first column with sequential IDs
pd_docs.insert(0, "id_pk", range(1, len(pd_docs) + 1))

In [0]:
display(pd_docs)  # Display the DataFrame

In [0]:
# Convert the Pandas DataFrame to a Spark DataFrame with only 'id_pk' and 'page_content' columns
spark_df = spark.createDataFrame(pd_docs[['id_pk','page_content']])

# Write the Spark DataFrame to a Delta table named 'dpact_chunks' in the 'workspace.default' schema
# Overwrite the table if it exists and update the schema if necessary
spark_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("workspace.default.dpact_chunks")

In [0]:
# Import the VectorSearchClient from the databricks.vector_search.client module
from databricks.vector_search.client import VectorSearchClient

# Instantiate the VectorSearchClient
client = VectorSearchClient()

In [0]:
# Enable change data feed for the source table to track row-level changes
spark.sql("""
    ALTER TABLE workspace.default.dpact_chunks
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

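# Delete the existing vector search endpoint, but only if it exists
# (delete_endpoint fails when the endpoint is missing)
existing_endpoints = [ep["name"] for ep in client.list_endpoints().get("endpoints", [])]
if "vs_endpoint" in existing_endpoints:
    client.delete_endpoint(name="vs_endpoint")

# Create a new vector search endpoint of type STANDARD and block until it is online,
# so the index below is not created against an endpoint that is still provisioning
client.create_endpoint_and_wait(
    name="vs_endpoint",
    endpoint_type="STANDARD"
)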

# Create a delta sync index for vector search:
# - Uses 'workspace.default.dpact_chunks' as the source table
# - Embeddings are generated from the 'page_content' column
# - 'id_pk' is used as the primary key
# - Index is created on the 'vs_endpoint'
# - Embedding model endpoint: 'databricks-gte-large-en'
# - Pipeline type is set to TRIGGERED (manual sync)
vs_index = client.create_delta_sync_index(
    endpoint_name="vs_endpoint",
    index_name="workspace.default.document_index",
    source_table_name="workspace.default.dpact_chunks",
    embedding_source_column="page_content",
    primary_key="id_pk",
    pipeline_type="TRIGGERED",
    embedding_model_endpoint_name="databricks-gte-large-en"
)
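Enabling the change data feed matters because a Delta Sync index only needs to re-embed rows that changed since the last sync. The sketch below is a conceptual illustration, not how Databricks implements the sync pipeline: it replays change-feed rows, using the real Delta CDF `_change_type` values (`insert`, `update_preimage`, `update_postimage`, `delete`), onto a hypothetical in-memory mirror keyed by `id_pk`. The sample change rows are made up.

```python
def apply_change_feed(index: dict, changes: list[dict], key: str = "id_pk") -> dict:
    # Replay change-feed rows, in commit order, onto a {primary key: row} mirror
    for change in changes:
        kind = change["_change_type"]
        # Drop CDF metadata columns (_change_type, _commit_version, ...) from the row
        row = {k: v for k, v in change.items() if not k.startswith("_")}
        if kind in ("insert", "update_postimage"):
            index[row[key]] = row  # new or updated row: (re)embed and upsert
        elif kind == "delete":
            index.pop(row[key], None)  # removed row: drop it from the index
        # "update_preimage" rows carry the old values and need no action here
    return index


# Hypothetical change-feed rows for the chunks table
changes = [
    {"id_pk": 1, "page_content": "first chunk", "_change_type": "insert", "_commit_version": 1},
    {"id_pk": 2, "page_content": "second chunk", "_change_type": "insert", "_commit_version": 1},
    {"id_pk": 1, "page_content": "first chunk", "_change_type": "update_preimage", "_commit_version": 2},
    {"id_pk": 1, "page_content": "first chunk, edited", "_change_type": "update_postimage", "_commit_version": 2},
    {"id_pk": 2, "page_content": "second chunk", "_change_type": "delete", "_commit_version": 3},
]

mirror = apply_change_feed({}, changes)
print(mirror)
```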

In [0]:
# List all existing vector search endpoints
client.list_endpoints()

In [0]:
# List all indexes on the 'vs_endpoint' vector search endpoint
client.list_indexes("vs_endpoint")

In [0]:
# Retrieve the index named 'workspace.default.document_index' from the vector search endpoint
index = client.get_index(index_name="workspace.default.document_index")

# Describe the retrieved index to get its metadata and details
index.describe()
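A newly created index needs time for its initial sync, so querying it immediately can fail or return nothing. The generic polling helper below is a sketch; the usage line in the comment assumes that `describe()` returns a dict with a `status` entry containing a boolean `ready` field, which you should confirm against the `describe()` output above before relying on it.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout_s: float = 600.0, interval_s: float = 10.0) -> bool:
    # Call check() every interval_s seconds until it returns True or timeout_s elapses
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


# Assumed usage (field names are an assumption, verify against index.describe()):
# wait_until(lambda: index.describe().get("status", {}).get("ready", False))
```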

In [0]:
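# Perform a similarity search on the vector index for the query "Who is a Data Fiduciary?"
# Retrieve the top 2 most similar results, selecting only the 'id_pk' and 'page_content' columns
results_dict = index.similarity_search(
    query_text="Who is a Data Fiduciary?",
    columns=["id_pk", "page_content"],
    num_results=2,
)

# similarity_search returns a plain dict, so convert its rows to a DataFrame before displaying;
# each row holds the requested columns followed by the similarity score
result_columns = [col["name"] for col in results_dict["manifest"]["columns"]]
display(pd.DataFrame(results_dict["result"].get("data_array", []), columns=result_columns))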
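The search response pairs a `manifest` (column names) with a `result.data_array` (one list per hit, with the similarity score appended as the last column). The hypothetical helper below turns that structure into one dict per hit, which is easier to work with than positional indexes; the response shape is assumed from the keys used above, and the sample values are made up.

```python
def rows_from_search_response(response: dict) -> list[dict]:
    # Pair each row in result.data_array with the column names listed in manifest.columns
    columns = [col["name"] for col in response["manifest"]["columns"]]
    data = response.get("result", {}).get("data_array") or []  # absent when there are no hits
    return [dict(zip(columns, row)) for row in data]


# Hypothetical response with the same shape as results_dict (values are made up)
sample = {
    "manifest": {
        "column_count": 3,
        "columns": [{"name": "id_pk"}, {"name": "page_content"}, {"name": "score"}],
    },
    "result": {"row_count": 1, "data_array": [[7, "Example chunk text", 0.82]]},
}

for row in rows_from_search_response(sample):
    print(row["id_pk"], round(row["score"], 2), row["page_content"])
```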

In [0]:
# Build the prompt: join the retrieved chunk texts into a context block, then append the question
# Each row in data_array is [id_pk, page_content, score], so index 1 is the chunk text
retrieved_chunks = [row[1] for row in results_dict["result"].get("data_array", [])]
context = "\n\n".join(retrieved_chunks)
prompt = (
    "Answer the question using only the context below.\n\n"
    f"Context:\n{context}\n\n"
    "Question: Who is a Data Fiduciary?"
)

In [0]:
# Import the ChatDatabricks class for interacting with Databricks-hosted LLMs
from databricks_langchain import ChatDatabricks

# Initialize the chat model with the specified endpoint and parameters
chat_model = ChatDatabricks(
    endpoint="databricks-meta-llama-3-3-70b-instruct",  # LLM endpoint name
    temperature=0.1,  # Controls randomness of responses
    max_tokens=250,   # Maximum tokens in the response
)

# Invoke the chat model with the prepared prompt and return the response
chat_model.invoke(prompt)
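With `num_results=2` the context stays small, but raising the number of retrieved chunks can push the prompt past the model's context window. The hypothetical helper below caps the context with a character budget, used here as a rough stand-in for a token count (an assumption; a tokenizer would give an exact figure), and adds chunks in rank order until the budget would be exceeded.

```python
def build_rag_prompt(question: str, chunks: list[str], max_context_chars: int = 4000) -> str:
    # Add retrieved chunks in rank order until the character budget would be exceeded
    selected, used = [], 0
    for chunk in chunks:
        if used + len(chunk) > max_context_chars:
            break
        selected.append(chunk)
        used += len(chunk)
    context = "\n\n---\n\n".join(selected)
    return (
        "Answer the question using only the context below. "
        "If the context does not contain the answer, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


# Assumed usage with the search results above:
# chat_model.invoke(build_rag_prompt("Who is a Data Fiduciary?",
#                                    [row[1] for row in results_dict["result"]["data_array"]]))
print(build_rag_prompt("Who is a Data Fiduciary?", ["chunk one", "chunk two"]))
```

Stopping at the first chunk that does not fit, rather than skipping it and trying smaller ones, keeps the highest-ranked chunks together and preserves the retrieval order.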