## Data Prep for Multi-Modal RAG (Structured and Unstructured Data)

![Data_Prep](./Assets/Data_Prep.png)

### Installing Libraries and Utilities

In [None]:
%pip install numpy==1.26.4 openai==1.69.0 langchain-community==0.4.1 PyPDF2==3.0.1

### Restarting Python

In [None]:
# Restart the notebook's Python process.
# NOTE(review): presumably needed so the packages installed by the %pip cell
# above are importable in later cells — confirm against the Databricks docs.
dbutils.library.restartPython()

### Saving Images and Docs in DBFS and Local File System

In [None]:
import os

# Local staging folder and the DBFS folder that mirrors it.
source_folder = "./knowledge"
dbfs_target_folder = "dbfs:/tmp/knowledge"

# Make sure both roots exist before touching the subfolders.
os.makedirs(source_folder, exist_ok=True)
dbutils.fs.mkdirs(dbfs_target_folder)

subfolders = ['docs', 'images']
for subfolder in subfolders:
    local_dir = os.path.join(source_folder, subfolder)
    dbfs_dir = f"{dbfs_target_folder}/{subfolder}"

    os.makedirs(local_dir, exist_ok=True)
    dbutils.fs.mkdirs(dbfs_dir)

    for entry in os.listdir(local_dir):
        local_path = os.path.join(local_dir, entry)

        # Only regular, non-empty files are copied; everything else is skipped.
        if not os.path.isfile(local_path) or os.path.getsize(local_path) <= 0:
            continue

        dbutils.fs.cp(
            "file:" + os.path.abspath(local_path),
            dbfs_dir + "/" + entry,
            recurse=False,
        )

### Verify Data is Stored in Local File System

In [None]:
# Print the contents of the local knowledge folder, then of each subfolder
# under its own heading.
print(os.listdir("./knowledge/"))
for heading, folder in (
    (" \n images: ", "./knowledge/images/"),
    (" \n docs: ", "./knowledge/docs/"),
):
    print(heading)
    print(os.listdir(folder))

### Verify Data is Stored in DBFS mounted by ADLS

In [None]:
# Print the contents of the DBFS knowledge folder, then of each subfolder
# under its own heading.
print(dbutils.fs.ls("dbfs:/tmp/knowledge/"))
for heading, folder in (
    (" \n images: ", "dbfs:/tmp/knowledge/images/"),
    (" \n docs: ", "dbfs:/tmp/knowledge/docs/"),
):
    print(heading)
    print(dbutils.fs.ls(folder))

### Creating the RAG Table Schema in Unity Catalog

In [None]:
%sql
-- Create the RAG schema in the named catalog unless it already exists.
-- NOTE(review): later cells refer to RAG.<table> without a catalog prefix, so
-- they resolve against the session's current catalog — confirm that it is the
-- same catalog used here.
CREATE SCHEMA IF NOT EXISTS YOUR_UNITY_CATALOG_NAME.RAG

### Creating an Image Table to Store Base64 Encoding of Images

In [None]:
# Read everything under the DBFS images folder with Spark's "binaryFile"
# format and register the result as the temp view used by the SQL below.
images_df = spark.read.format("binaryFile").load("dbfs:/tmp/knowledge/images/")
images_df.createOrReplaceTempView("images_temp")

# (Re)create RAG.images_metadata with two columns: the source path
# (content_path) and the base64-encoded `content` column (base64_content).
spark.sql("""
CREATE OR REPLACE TABLE RAG.images_metadata AS
SELECT
  path AS content_path,
  base64(content) AS base64_content
FROM images_temp
""")

# Show the table that was just written.
display(spark.table("RAG.images_metadata"))

### Creating a Table with LLM Image Verbalisation

In [None]:
# (Re)create RAG.images_verbalization: every column of RAG.images_metadata plus
# a `chunk` column holding the ai_query() result for the prompt
# 'what is this image about?'. The base64 text is decoded back with unbase64()
# and passed to the 'databricks-llama-4-maverick' endpoint via `files`.
spark.sql("""
CREATE OR REPLACE TABLE RAG.images_verbalization AS
SELECT
  *,
  ai_query(
    'databricks-llama-4-maverick',
    'what is this image about?', files => unbase64(base64_content)
  ) AS chunk
FROM RAG.images_metadata
""")

# Show the table that was just written.
display(spark.table("RAG.images_verbalization"))

### Extracting PDF Content and Storing as Table with Langchain Chunking Strategy

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from PyPDF2 import PdfReader

def perform_fixed_size_chunking(document, chunk_size=2000, chunk_overlap=500):
    """
    Split ``document`` into overlapping text chunks.

    A RecursiveCharacterTextSplitter is configured with the given
    ``chunk_size`` and ``chunk_overlap`` (both measured with ``len``) and a
    list of separators to try: blank line, newline, sentence end, space, and
    finally the empty string. The result of its ``split_text`` is returned.
    """
    separator_candidates = ["\n\n", "\n", ". ", " ", ""]
    splitter = RecursiveCharacterTextSplitter(
        separators=separator_candidates,
        length_function=len,
        chunk_overlap=chunk_overlap,
        chunk_size=chunk_size,
    )
    pieces = splitter.split_text(document)
    return pieces

import os

# PDFs are read from the local copy, but each row records the matching DBFS
# path so it lines up with the content_path used for the image rows.
docs_folder = "./knowledge/docs"
dbfs_docs_folder = "dbfs:/tmp/knowledge/docs"
all_docs = []

for filename in os.listdir(docs_folder):
    file_path = os.path.join(docs_folder, filename)
    dbfs_path = f"{dbfs_docs_folder}/{filename}"

    # Only regular files with a .pdf extension (any case) are processed.
    if not (os.path.isfile(file_path) and filename.lower().endswith(".pdf")):
        continue

    # Extract the text while the file is still open; the reader reads from it.
    with open(file_path, "rb") as f:
        reader = PdfReader(f)
        page_texts = [page.extract_text() for page in reader.pages]

    # Join pages with a newline so the last word of one page is not fused with
    # the first word of the next; pages that yielded no text are dropped.
    text = "\n".join(page_text for page_text in page_texts if page_text)
    if not text.strip():
        continue

    # One row per non-blank chunk.
    all_docs.extend(
        {"content_path": dbfs_path, "chunk": chunk}
        for chunk in perform_fixed_size_chunking(text)
        if chunk.strip()
    )

if all_docs:
    # `df` is only assigned on this branch; code that uses it afterwards
    # requires at least one chunk to have been extracted.
    df = spark.createDataFrame(all_docs)
    print(f"Total chunks created: {df.count()}")
    print("\nChunks per document:")
    df.groupBy("content_path").count().show(truncate=False)
    display(df)
else:
    print("No chunks extracted from documents.")

In [None]:
# Persist the PDF chunks as RAG.docs_chunks, replacing any existing table.
# NOTE(review): `df` is only assigned by the previous cell when at least one
# chunk was extracted; otherwise this line fails or writes a stale `df`.
df.write.mode("overwrite").saveAsTable("RAG.docs_chunks")

### Creating the Final Multi-Modal RAG Table

In [None]:
# (Re)create RAG.final_rag_dataset as the image descriptions followed by the
# PDF chunks, each row tagged with a generated id.
#
# The id is generated once, over the combined rows. Calling
# monotonically_increasing_id() inside each UNION ALL branch evaluates it per
# branch, and each branch numbers its partitions from 0, so an image row and a
# doc row could end up with the same id.
spark.sql("""
CREATE OR REPLACE TABLE RAG.final_rag_dataset AS
SELECT monotonically_increasing_id() AS id, content_path, chunk
FROM (
  SELECT content_path, chunk FROM RAG.images_verbalization
  UNION ALL
  SELECT content_path, chunk FROM RAG.docs_chunks
) AS combined
""")

# Show the table that was just written.
display(spark.table("RAG.final_rag_dataset"))