In [0]:
%pip install Pdfplumber langchain pymupdf
# Restart the Python process right after the install; the cells below import these packages.
dbutils.library.restartPython()

In [0]:
import fitz, statistics
from langchain.schema import Document

def load_twocol_pdf_auto(path: str) -> list:
    """Extract page text from a PDF, reading two-column pages left column first.

    For every page the horizontal midpoints of its text blocks are sorted and
    the widest gap between neighbouring midpoints is taken as the column
    boundary. Text left of the boundary is emitted first, then text right of
    it, joined by a newline.

    Pages whose text blocks show no positive gap (a single block, or blocks
    that all share one midpoint) have no detectable column boundary; they are
    extracted whole instead of being cut in two, and their ``split_x`` is
    ``None``.

    Args:
        path: Location of the PDF file, passed unchanged to ``fitz.open``.

    Returns:
        A list with one ``Document`` per page, in page order. ``metadata``
        holds ``source`` (the given path) and ``page`` (1-based). Pages that
        contain text additionally carry ``split_x``: the x coordinate used as
        column boundary, or ``None`` when the page was not split. Pages
        without any text block get an empty ``page_content``.

    Raises:
        Whatever ``fitz`` raises while opening or reading the file; the
        document is closed before the exception propagates.
    """
    docs = []
    # The context manager closes the file even when a page fails mid-way.
    with fitz.open(path) as pdf:
        for pno in range(pdf.page_count):
            page = pdf.load_page(pno)
            metadata = {"source": path, "page": pno + 1}

            # Block tuples are (x0, y0, x1, y1, text, block_no, block_type).
            # Image blocks (block_type 1) carry a placeholder string in the
            # text slot, so the type has to be checked as well as the text.
            mids = sorted(
                (blk[0] + blk[2]) / 2
                for blk in page.get_text("blocks")
                if isinstance(blk[4], str)
                and blk[4].strip()
                and (len(blk) < 7 or blk[6] == 0)
            )
            if not mids:
                docs.append(Document(page_content="", metadata=metadata))
                continue

            # Widest gap between neighbouring midpoints marks the gutter.
            # The strict comparison keeps the first of several equal gaps.
            split_x = None
            widest = 0
            for lo, hi in zip(mids, mids[1:]):
                if hi - lo > widest:
                    widest = hi - lo
                    split_x = (lo + hi) / 2

            if split_x is None:
                # No column structure detected: keep the page's own text order.
                combined = (page.get_text("text") or "").strip()
            else:
                bounds = page.rect
                left_rect = fitz.Rect(bounds.x0, bounds.y0, split_x, bounds.y1)
                right_rect = fitz.Rect(split_x, bounds.y0, bounds.x1, bounds.y1)
                left_text = page.get_text("text", clip=left_rect)
                right_text = page.get_text("text", clip=right_rect)
                combined = (left_text or "").strip() + "\n" + (right_text or "").strip()

            metadata["split_x"] = split_x
            docs.append(Document(page_content=combined, metadata=metadata))
    return docs


In [0]:
from pyspark.sql.functions import substring_index

# Volume folder that holds the source documents.
directory_path = "/Volumes/llmdb2/rag/docloader"

# Full path of every entry returned by the folder listing.
files_paths = [entry.path for entry in dbutils.fs.ls(directory_path)]

# One row per path; file_name is whatever follows the last "/" of the path.
df = (
    spark.createDataFrame(files_paths, "string")
    .select(substring_index("value", "/", -1).alias("file_name"))
)

df.display()

In [0]:
import os
import pdfplumber
from langchain.text_splitter import RecursiveCharacterTextSplitter

# pdf_volume_path = "/Volumes/llmdb2/rag/docloader"

# # get the list of already processed pdf files

# processed_files = spark.sql(f"SELECT DISTINCT file_name FROM llmdb2.rag.docs_load_tracker").collect()

# processed_files = set(row["file_name"] for row in processed_files)

# # process new pdf files only

# new_files = [file for file in os.listdir(pdf_volume_path) if file not in processed_files]

# all_text = '' # initialize all_text to store text from new pdf files

# for file_name in new_files:
#     pdf_path = os.path.join(pdf_volume_path, file_name)

#     with pdfplumber.open(pdf_path) as pdf:
#         for pdf_page in pdf.pages:
#             single_page_text = pdf_page.extract_text()
#             all_text = all_text + '\n' + single_page_text




# # #split the text into chunks

# # from langchain.text_splitter import RecursiveCharacterTextSplitter

# # length_function = len

# # splitter = RecursiveCharacterTextSplitter(
# #     separators=["\n\n", "\n", " ", ""],
# #     chunk_size=1000,
# #     chunk_overlap=200,
# #     length_function=length_function
# # )

# # chunks = splitter.split_text(all_text)

# # print(chunks)




In [0]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Splitter settings: size 1200 with overlap 150, trying paragraph breaks first,
# then line breaks, then spaces, and finally arbitrary positions.
splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=1200,
    chunk_overlap=150,
)

# Load the code book page by page (left column before right column) and split
# the resulting page documents into chunks.
chunks = splitter.split_documents(
    load_twocol_pdf_auto('/Volumes/llmdb2/rag/docloader/db2z_11_codesbook.pdf')
)


In [0]:
# from pyspark.sql.functions import pandas_udf
# from pyspark.sql.types import ArrayType, StringType
# import pandas as pd

# @pandas_udf("array<string>")
# def get_chunks(dummy):
#     return pd.Series([chunks])

# # Register the UDF
# spark.udf.register("get_chunks_udf", get_chunks)

In [0]:
# `chunks` comes from splitter.split_documents(), which yields LangChain Document
# objects rather than strings. Only their text belongs in the single string
# column, so pull page_content out before building the DataFrame.
df_chunks = spark.createDataFrame(
    [chunk.page_content for chunk in chunks], "string"
).toDF("text")

# Append mode adds rows to the existing table contents on every run of this cell.
df_chunks.write.mode("append").saveAsTable("llmdb2.rag.docs_text")

In [0]:
%sql
-- Inspect the rows currently stored in the chunk text table.
SELECT * FROM llmdb2.rag.docs_text

In [0]:
# Record file names in the load tracker table, skipping names it already contains.
# `df` is the single-column (file_name) DataFrame built from the folder listing.
df.createOrReplaceTempView("temp_table")  # Expose the DataFrame to SQL as the temporary view "temp_table"

# Insert only the rows that do not exist in the target table.
# Each inserted row is the view's columns followed by the current timestamp.
# NOTE(review): `df` lists every entry in the folder, while the chunking cell reads one
# hard-coded PDF — confirm that marking all listed files as loaded is intended.
spark.sql("""
    INSERT INTO llmdb2.rag.docs_load_tracker
    SELECT * , current_timestamp FROM temp_table
    WHERE NOT EXISTS (
        SELECT 1 FROM llmdb2.rag.docs_load_tracker
        WHERE temp_table.file_name = llmdb2.rag.docs_load_tracker.file_name
    )
""")