In [0]:
%pip install sentence-transformers

Collecting sentence-transformers
  Downloading sentence_transformers-5.1.2-py3-none-any.whl.metadata (16 kB)
Collecting transformers<5.0.0,>=4.41.0 (from sentence-transformers)
  Downloading transformers-4.57.3-py3-none-any.whl.metadata (43 kB)
Collecting tqdm (from sentence-transformers)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting torch>=1.11.0 (from sentence-transformers)
  Downloading torch-2.9.1-cp312-cp312-manylinux_2_28_aarch64.whl.metadata (30 kB)
Collecting huggingface-hub>=0.20.0 (from sentence-transformers)
  Downloading huggingface_hub-1.1.5-py3-none-any.whl.metadata (13 kB)
Collecting hf-xet<2.0.0,>=1.2.0 (from huggingface-hub>=0.20.0->sentence-transformers)
  Downloading hf_xet-1.2.0-cp37-abi3-manylinux_2_28_aarch64.whl.metadata (4.9 kB)
Collecting shellingham (from huggingface-hub>=0.20.0->sentence-transformers)
  Downloading shellingham-1.5.4-py2.py3-none-any.whl.metadata (3.5 kB)
Collecting typer-slim (from huggingface-hub>=0.20.0->sentence-tr

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyarrow as pa
from sentence_transformers import SentenceTransformer
import numpy as np
import time

config = {
    "chunk_size": 250,
    "chunk_overlap": 50,
    "embedding_model": "all-MiniLM-L6-v2",
    "k_retrieval": 3,
    "logging_enabled": True
}

CHUNK_SIZE     = config["chunk_size"]
CHUNK_OVERLAP  = config["chunk_overlap"]
MODEL_NAME     = config["embedding_model"]
K_RETRIEVAL    = config["k_retrieval"]

print("=== DeltaRAG TABLE-ONLY CONFIG ===")
for k, v in config.items():
    print(f"{k}: {v}")


=== DeltaRAG TABLE-ONLY CONFIG ===
chunk_size: 250
chunk_overlap: 50
embedding_model: all-MiniLM-L6-v2
k_retrieval: 3
logging_enabled: True


In [0]:
# Our "raw docs" – no filesystem, just a Python list
docs = [
    ("doc1.txt", "Delta Lake provides ACID transactions, time travel, and schema enforcement."),
    ("doc2.txt", "Databricks lets data engineers build scalable ETL, ML, and RAG pipelines."),
    ("doc3.txt", "RAG retrieves relevant chunks before sending to an LLM for answer generation."),
    ("doc4.txt", "PyArrow and Parquet improve analytics and ML workloads with columnar formats.")
]

bronze_df = (
    spark.createDataFrame(docs, ["file_name", "raw_text"])
         .withColumn("raw_id", monotonically_increasing_id())
         .withColumn("load_ts", current_timestamp())
         .withColumn("content_hash", sha2(col("raw_text"), 256))
)

bronze_df.write.format("delta").mode("overwrite").saveAsTable("delta_rag_bronze")

spark.table("delta_rag_bronze").show(truncate=False)


+---------+-----------------------------------------------------------------------------+------+--------------------------+----------------------------------------------------------------+
|file_name|raw_text                                                                     |raw_id|load_ts                   |content_hash                                                    |
+---------+-----------------------------------------------------------------------------+------+--------------------------+----------------------------------------------------------------+
|doc1.txt |Delta Lake provides ACID transactions, time travel, and schema enforcement.  |0     |2025-11-27 22:18:56.088695|9b3967813593ed033ef84b6c9d52a3b755ce68ae9569159022df4e698750e8ce|
|doc2.txt |Databricks lets data engineers build scalable ETL, ML, and RAG pipelines.    |1     |2025-11-27 22:18:56.088695|cf946a9f8c04296e0f1f60279ad67f57e3a9f8c47a4e4f4522d6be031947cc94|
|doc3.txt |RAG retrieves relevant chunks before sending

In [0]:
silver_df = (
    spark.table("delta_rag_bronze")
         .select(
             "raw_id",
             "file_name",
             trim(lower(col("raw_text"))).alias("clean_text"),
             "load_ts",
             "content_hash"
         )
         .filter(length("clean_text") > 0)
)

silver_df.write.format("delta").mode("overwrite").saveAsTable("delta_rag_silver")

spark.table("delta_rag_silver").show(truncate=False)


+------+---------+-----------------------------------------------------------------------------+--------------------------+----------------------------------------------------------------+
|raw_id|file_name|clean_text                                                                   |load_ts                   |content_hash                                                    |
+------+---------+-----------------------------------------------------------------------------+--------------------------+----------------------------------------------------------------+
|0     |doc1.txt |delta lake provides acid transactions, time travel, and schema enforcement.  |2025-11-27 22:18:56.088695|9b3967813593ed033ef84b6c9d52a3b755ce68ae9569159022df4e698750e8ce|
|1     |doc2.txt |databricks lets data engineers build scalable etl, ml, and rag pipelines.    |2025-11-27 22:18:56.088695|cf946a9f8c04296e0f1f60279ad67f57e3a9f8c47a4e4f4522d6be031947cc94|
|2     |doc3.txt |rag retrieves relevant chunks before 

In [0]:
silver_df = (
    spark.table("delta_rag_bronze")
         .select(
             "raw_id",
             "file_name",
             trim(lower(col("raw_text"))).alias("clean_text"),
             "load_ts",
             "content_hash"
         )
         .filter(length("clean_text") > 0)
)

silver_df.write.format("delta").mode("overwrite").saveAsTable("delta_rag_silver")

spark.table("delta_rag_silver").show(truncate=False)


+------+---------+-----------------------------------------------------------------------------+--------------------------+----------------------------------------------------------------+
|raw_id|file_name|clean_text                                                                   |load_ts                   |content_hash                                                    |
+------+---------+-----------------------------------------------------------------------------+--------------------------+----------------------------------------------------------------+
|0     |doc1.txt |delta lake provides acid transactions, time travel, and schema enforcement.  |2025-11-27 22:18:56.088695|9b3967813593ed033ef84b6c9d52a3b755ce68ae9569159022df4e698750e8ce|
|1     |doc2.txt |databricks lets data engineers build scalable etl, ml, and rag pipelines.    |2025-11-27 22:18:56.088695|cf946a9f8c04296e0f1f60279ad67f57e3a9f8c47a4e4f4522d6be031947cc94|
|2     |doc3.txt |rag retrieves relevant chunks before 

In [0]:
silver_pdf = spark.table("delta_rag_silver").toPandas()

arrow_table = pa.Table.from_pandas(silver_pdf)
print(arrow_table)  # just to prove it's created

# In a real production env you would write arrow_table to cloud storage as Parquet.
# Here we keep it in memory because filesystem writes are restricted.


pyarrow.Table
raw_id: int64
file_name: string
clean_text: string
load_ts: timestamp[ns]
content_hash: string
----
raw_id: [[0,1,2,3]]
file_name: [["doc1.txt","doc2.txt","doc3.txt","doc4.txt"]]
clean_text: [["delta lake provides acid transactions, time travel, and schema enforcement.","databricks lets data engineers build scalable etl, ml, and rag pipelines.","rag retrieves relevant chunks before sending to an llm for answer generation.","pyarrow and parquet improve analytics and ml workloads with columnar formats."]]
load_ts: [[2025-11-27 22:18:56.088695000,2025-11-27 22:18:56.088695000,2025-11-27 22:18:56.088695000,2025-11-27 22:18:56.088695000]]
content_hash: [["9b3967813593ed033ef84b6c9d52a3b755ce68ae9569159022df4e698750e8ce","cf946a9f8c04296e0f1f60279ad67f57e3a9f8c47a4e4f4522d6be031947cc94","cdc6df301aa2629905b5f13e8637f739db897576bdf09b4f571e4d2109880cc7","42de3a7c269a8537bc3bf69b407b86fa12864009e6d436762a165f451d823107"]]


In [0]:
import builtins  # to call built-in min safely
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf, posexplode, col

CHUNK_SIZE = 1000
CHUNK_OVERLAP = 100

def smart_chunk(text: str, max_chars: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP):
    # Use builtin min explicitly
    py_min = builtins.min

    if text is None:
        return []

    text = text.strip()
    if not text:
        return []

    chunks = []
    start = 0
    while start < len(text):
        end = py_min(start + max_chars, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break
        start = py_min(end - overlap, len(text))
    return chunks

smart_chunk_udf = udf(smart_chunk, ArrayType(StringType()))

silver_for_chunks = spark.table("delta_rag_silver")

with_chunks_df = silver_for_chunks.withColumn("chunks", smart_chunk_udf(col("clean_text")))

chunked_df = (
    with_chunks_df
    .select(
        "raw_id",
        "file_name",
        posexplode("chunks").alias("chunk_id", "chunk_text"),
        "load_ts",
        "content_hash"
    )
)

chunked_df.write.format("delta").mode("overwrite").saveAsTable("delta_rag_gold_chunks")

display(spark.table("delta_rag_gold_chunks"))


raw_id,file_name,chunk_id,chunk_text,load_ts,content_hash
0,doc1.txt,0,"delta lake provides acid transactions, time travel, and schema enforcement.",2025-11-27T22:18:56.088Z,9b3967813593ed033ef84b6c9d52a3b755ce68ae9569159022df4e698750e8ce
1,doc2.txt,0,"databricks lets data engineers build scalable etl, ml, and rag pipelines.",2025-11-27T22:18:56.088Z,cf946a9f8c04296e0f1f60279ad67f57e3a9f8c47a4e4f4522d6be031947cc94
2,doc3.txt,0,rag retrieves relevant chunks before sending to an llm for answer generation.,2025-11-27T22:18:56.088Z,cdc6df301aa2629905b5f13e8637f739db897576bdf09b4f571e4d2109880cc7
3,doc4.txt,0,pyarrow and parquet improve analytics and ml workloads with columnar formats.,2025-11-27T22:18:56.088Z,42de3a7c269a8537bc3bf69b407b86fa12864009e6d436762a165f451d823107


In [0]:
print(f"Loading embedding model: {MODEL_NAME}")
model = SentenceTransformer(MODEL_NAME)

gold_chunks_df = spark.table("delta_rag_gold_chunks")
chunks_pdf = gold_chunks_df.toPandas()

print(f"Embedding {len(chunks_pdf)} chunks...")
embeddings = model.encode(
    chunks_pdf["chunk_text"].tolist(),
    normalize_embeddings=True,
    show_progress_bar=True
)

chunks_pdf["embedding"] = embeddings.tolist()

emb_sdf = spark.createDataFrame(chunks_pdf)

emb_sdf.write.format("delta").mode("overwrite").saveAsTable("delta_rag_gold_embeddings")

spark.table("delta_rag_gold_embeddings").show(truncate=False)


Loading embedding model: all-MiniLM-L6-v2


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Embedding 4 chunks...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

+------+---------+--------+-----------------------------------------------------------------------------+--------------------------+----------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
def retrieve_top_k(query: str, k: int = K_RETRIEVAL):
    emb_df = spark.table("delta_rag_gold_embeddings").select(
        "raw_id", "file_name", "chunk_id", "chunk_text", "embedding"
    )
    pdf = emb_df.toPandas()
    if pdf.empty:
        raise ValueError("No embeddings found.")

    matrix = np.vstack(pdf["embedding"].values)
    q_vec = model.encode([query], normalize_embeddings=True)[0]

    sims = matrix @ q_vec
    pdf["score"] = sims

    top_pdf = pdf.sort_values("score", ascending=False).head(k).copy()

    # Optional: log queries to a table
    ts = time.time()
    log_rows = [
        (query, float(row["score"]), row["file_name"], int(row["chunk_id"]), row["chunk_text"], ts)
        for _, row in top_pdf.iterrows()
    ]
    log_schema = StructType([
        StructField("query", StringType(), False),
        StructField("score", DoubleType(), False),
        StructField("file_name", StringType(), False),
        StructField("chunk_id", IntegerType(), False),
        StructField("chunk_text", StringType(), False),
        StructField("logged_at_unix", DoubleType(), False),
    ])
    log_df = spark.createDataFrame(log_rows, schema=log_schema)
    log_df.write.format("delta").mode("append").saveAsTable("delta_rag_query_logs")

    return top_pdf

# Quick sanity check
retrieve_top_k("What is RAG?", k=3)[["file_name", "chunk_text", "score"]]


Unnamed: 0,file_name,chunk_text,score
2,doc3.txt,rag retrieves relevant chunks before sending t...,0.461451
1,doc2.txt,databricks lets data engineers build scalable ...,0.146141
3,doc4.txt,pyarrow and parquet improve analytics and ml w...,0.104667


In [0]:
def build_rag_context(query: str, k: int = K_RETRIEVAL) -> str:
    top_pdf = retrieve_top_k(query, k=k)
    blocks = []
    for i, row in enumerate(top_pdf.itertuples(), start=1):
        blocks.append(f"[Chunk {i} | file={row.file_name} | score={row.score:.3f}]\n{row.chunk_text}")
    return "\n\n".join(blocks)

def delta_rag_prompt(query: str, k: int = K_RETRIEVAL) -> str:
    context = build_rag_context(query, k=k)
    prompt = f"""
You are an assistant specialized in data engineering, Databricks, Delta Lake, PyArrow, and RAG.

Context:
{context}

Question:
{query}

Using only the context above, answer in 4–6 sentences.
If the context is not sufficient, say you do not know based on the provided context.
""".strip()
    return prompt

q = "What is Delta Lake and how does it relate to RAG?"
print(delta_rag_prompt(q))


You are an assistant specialized in data engineering, Databricks, Delta Lake, PyArrow, and RAG.

Context:
[Chunk 1 | file=doc1.txt | score=0.602]
delta lake provides acid transactions, time travel, and schema enforcement.

[Chunk 2 | file=doc3.txt | score=0.229]
rag retrieves relevant chunks before sending to an llm for answer generation.

[Chunk 3 | file=doc4.txt | score=0.123]
pyarrow and parquet improve analytics and ml workloads with columnar formats.

Question:
What is Delta Lake and how does it relate to RAG?

Using only the context above, answer in 4–6 sentences.
If the context is not sufficient, say you do not know based on the provided context.


In [0]:
retrieve_top_k("What is RAG and how does Databricks help?", k=3)[["file_name", "chunk_text", "score"]]


Unnamed: 0,file_name,chunk_text,score
1,doc2.txt,databricks lets data engineers build scalable ...,0.613401
2,doc3.txt,rag retrieves relevant chunks before sending t...,0.364262
3,doc4.txt,pyarrow and parquet improve analytics and ml w...,0.321104


In [0]:
query = "What is Delta Lake and why is it useful?"
print(delta_rag_prompt(query))


You are an assistant specialized in data engineering, Databricks, Delta Lake, PyArrow, and RAG.

Context:
[Chunk 1 | file=doc1.txt | score=0.721]
delta lake provides acid transactions, time travel, and schema enforcement.

[Chunk 2 | file=doc4.txt | score=0.154]
pyarrow and parquet improve analytics and ml workloads with columnar formats.

[Chunk 3 | file=doc2.txt | score=0.145]
databricks lets data engineers build scalable etl, ml, and rag pipelines.

Question:
What is Delta Lake and why is it useful?

Using only the context above, answer in 4–6 sentences.
If the context is not sufficient, say you do not know based on the provided context.
