In [0]:
%pip install python-docx PyPDF2 langchain-text-splitters



[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython

<bound method DBUtils.LibraryHandler.restartPython of Package 'dbutils.library'.>

In [0]:
import os
import hashlib
from datetime import datetime
from docx import Document as DocxDocument
from PyPDF2 import PdfReader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Location of your documents in Databricks Volume
VOLUME_PATH = "/Volumes/sco_dev/sarang84/pdc_app"

# Output Delta table
SCHEMA = "sco_dev"
USER = "sarang84"
TABLE = "pdc_RAG_Demo"
FULL_TABLE_NAME = f"{SCHEMA}.{USER}.{TABLE}"

# Chunk Config
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP
)

# ------------------------------
# READER FUNCTIONS
# ------------------------------
def read_txt(path):
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()

def read_docx(path):
    doc = DocxDocument(path)
    return "\n".join([p.text for p in doc.paragraphs])

def read_pdf(path):
    reader = PdfReader(path)
    text = ""
    for page in reader.pages:
        text += page.extract_text() or ""
    return text

# ------------------------------
# LOAD DOCUMENTS
# ------------------------------
all_docs = []

base_path = VOLUME_PATH  

print("🔍 Scanning directory:", base_path)

for root, dirs, files in os.walk(base_path):
    for file in files:
        full_path = os.path.join(root, file)
        ext = file.lower().split(".")[-1]

        print(f"\n📄 Found file: {file} ({ext})")

        try:
            if ext == "txt":
                text = read_txt(full_path)
            elif ext == "docx":
                text = read_docx(full_path)
            elif ext == "pdf":
                text = read_pdf(full_path)
            else:
                print(f"⏭️ Skipping unsupported file: {file}")
                continue

            # Store document entry
            cleaned_path = full_path.replace("/dbfs", "")
            all_docs.append({
                "source": cleaned_path,
                "text": text
            })

            # Print preview
            print(f"✅ Loaded: {cleaned_path}")
            print("   Preview:", text[:200].replace("\n", " "), "...\n")

        except Exception as e:
            print(f"❌ Error reading {file}: {e}")

# ------------------------------
# SUMMARY
# ------------------------------
print("\n==============================")
print("📦 DOCUMENT LOADING SUMMARY")
print("==============================")
print(f"Total files loaded: {len(all_docs)}")

for i, doc in enumerate(all_docs):
    print(f"{i+1}. {doc['source']} (length={len(doc['text'])} chars)")


🔍 Scanning directory: /Volumes/sco_dev/sarang84/pdc_app

📄 Found file: J2.05.P05.M02_Change Management Approver and Observer Matrix_R41 (1).docx (docx)
✅ Loaded: /Volumes/sco_dev/sarang84/pdc_app/J2.05.P05.M02_Change Management Approver and Observer Matrix_R41 (1).docx
   Preview:    J2.05.P05.M02  Global Change Management Approver and Observer Matrix               What is the goal of this work instruction.  Be as descriptive as possible.101.1. PURPOSE: The purpose of this docu ...


📄 Found file: J2.05.P05.W01 ECO_WI_R52.doc (doc)
⏭️ Skipping unsupported file: J2.05.P05.W01 ECO_WI_R52.doc

📄 Found file: J2.05.P07.W01_MCO_WI_R63.docx (docx)
✅ Loaded: /Volumes/sco_dev/sarang84/pdc_app/J2.05.P07.W01_MCO_WI_R63.docx
   Preview:        J2.05.P07.W01,  Business Process, MCO Work Instructions     PURPOSE:  The purpose of this document is to describe the process necessary to create, submit, route, review, approve and release a M ...


📄 Found file: J2.05.P07_R20__MCO.pdf (pdf)
✅ Loaded: /Volu

In [0]:
# ------------------------------
# CHUNK DOCUMENTS
# ------------------------------
all_chunks = []

for doc in all_docs:
    chunks = splitter.split_text(doc["text"])

    for chunk in chunks:
        all_chunks.append({
            "source": doc["source"],
            "chunk": chunk,
            "chunk_id": hashlib.md5(chunk.encode()).hexdigest(),
            "ingested_at": datetime.now()
        })

print("\n==============================")
print("🧩 CHUNKING SUMMARY")
print("==============================")
print(f"Total chunks generated: {len(all_chunks)}")



🧩 CHUNKING SUMMARY
Total chunks generated: 215


In [0]:
df_chunks = spark.createDataFrame(all_chunks)
df_chunks.show(5, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------+-------

In [0]:
df_chunks.write.mode("overwrite").format("delta").saveAsTable(FULL_TABLE_NAME)


In [0]:
spark.sql(f"SELECT COUNT(*) FROM {FULL_TABLE_NAME}").show()


+--------+
|COUNT(*)|
+--------+
|     215|
+--------+



In [0]:
pip install databricks-sdk --upgrade


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython()