In [0]:
!pip install docling
!pip install -qU pip docling transformers

In [0]:
from pathlib import Path
from pyspark.dbutils import DBUtils
import re, os

def get_repo_root() -> Path:
    """Return the repo checkout directory of the running notebook.

    The notebook path is read from the Databricks notebook context. If it
    starts with ``/Workspace/Repos/<user>/<repo>``, that prefix is returned;
    otherwise the current working directory is used as a fallback.

    Returns:
        Path: the matched repo prefix, or ``Path(os.getcwd())``.
    """
    context = DBUtils(spark).notebook.entry_point.getDbutils().notebook().getContext()
    current_notebook = context.notebookPath().get()

    # Capture the /Workspace/Repos/<user>/<repo> prefix.
    repo_prefix = re.match(r"^(\/Workspace\/Repos\/[^\/]+\/[^\/]+)", current_notebook)
    return Path(repo_prefix.group(1)) if repo_prefix else Path(os.getcwd())

repo_root = get_repo_root()
print("📁 Repo root:", repo_root)

# One level up (like "cd ..")
parent_path = repo_root.parent

# Two levels up
two_up = repo_root.parent.parent

print("⬆️ Eine Ebene höher:", parent_path)
print("⬆️ root:", parent_path)

# Input documents and generated output both live next to the repo root.
doc_root = parent_path / "documents"
out_dir = parent_path / "out"
product_name = "Nano 33 BLE"
product_category = "Nano Family"
file_name  = "Nano_33_BLE_datasheet.pdf"

# Build the sample PDF path from `file_name` instead of repeating the literal,
# so changing the sample document only needs one edit.
pdf_path = doc_root / product_category / product_name / file_name
print("⬆️ pdf pfad:",pdf_path)

out_path_docling = out_dir / product_category / product_name / "docling_chunks.jsonl"
print("⬆️ out_path_docling:",out_path_docling)

out_path_langchain = out_dir / product_category / product_name / "langchain_chunks.jsonl"
print("⬆️ out_path_langchain:",out_path_langchain)

code_path_langchain = parent_path / "langchain_chunking.py"
print("⬆️code_path_langchain:",code_path_langchain)

# The label now names the variable that is actually printed.
code_path_docling= parent_path / "docling_chunking.py"
print("⬆️code_path_docling:",code_path_docling)




In [0]:
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
# PDF pipeline options: every OCR / image-generation switch is turned off.
pdf_options = PdfPipelineOptions()
pdf_options.do_ocr = False                   # no OCR, text extraction only
pdf_options.generate_page_images = False     # no page images
pdf_options.generate_picture_images = False  # ignore pictures completely
pdf_options.generate_table_images = False    # keep tables as text/markdown, not images

# Apply the options to PDF inputs.
format_options = {InputFormat.PDF: PdfFormatOption(pipeline_options=pdf_options)}

# Build the converter and convert the sample PDF.
converter = DocumentConverter(format_options=format_options)
result = converter.convert(pdf_path)
doc = result.document

In [0]:
# Sanity check of the conversion: page info plus the first 1000 characters of
# the markdown export. Uses `doc`, the document produced by the conversion
# cell (`doc_2` is not defined anywhere in this notebook).
print(doc.pages)
print(doc.export_to_markdown()[:1000])

In [0]:
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from transformers import AutoTokenizer

from docling.chunking import HybridChunker

EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
# Token budget handed to the tokenizer wrapper.
# NOTE(review): confirm 800 fits the input limit of EMBED_MODEL_ID.
MAX_TOKENS = 800

# Hugging Face tokenizer of the embedding model, wrapped for Docling.
tokenizer = HuggingFaceTokenizer(
    tokenizer=AutoTokenizer.from_pretrained(EMBED_MODEL_ID),
    max_tokens=MAX_TOKENS,
)

# Hybrid chunker using that tokenizer; peer merging is enabled explicitly.
chunker = HybridChunker(tokenizer=tokenizer, merge_peers=True)

In [0]:
# Cleaning-Funktion
# ---------------------------
def clean_text(t: str) -> str:
    """Normalize extracted PDF text and strip layout noise.

    Steps, in order:
      1. Drop carriage returns (before de-hyphenation, so CRLF line breaks
         are handled like LF ones).
      2. Join words hyphenated across a line break (``exam-\\nple``).
      3. Collapse blank lines and runs of spaces/tabs.
      4. Drop lines that are empty or more than 60 % non-alphabetic.
      5. Drop single-line ``Table N:`` / ``Figure N.`` captions, pipe-table
         rows and ``---`` / ``===`` rules.
      6. Collapse the blank lines left behind by step 5.

    Args:
        t: raw text; ``None`` or ``""`` is accepted.

    Returns:
        The cleaned text, stripped of leading/trailing whitespace
        (``""`` for empty input).
    """
    if not t:
        return ""
    # Strip "\r" first: otherwise "-\r\n" never matches the de-hyphenation
    # pattern and Windows-style hyphen breaks stay split.
    t = t.replace("\r", "")
    t = re.sub(r"(\w)-\n(\w)", r"\1\2", t)
    t = re.sub(r"\n{2,}", "\n", t)
    t = re.sub(r"[ \t]{2,}", " ", t)

    def noisy(line: str) -> bool:
        # A line is noise when it is blank or mostly non-letters
        # (digits, punctuation and spaces all count as non-letters).
        s = line.strip()
        if not s:
            return True
        non_alpha = sum(1 for ch in s if not ch.isalpha())
        return (non_alpha / max(1, len(s))) > 0.6

    lines = [ln for ln in t.split("\n") if not noisy(ln)]
    t = "\n".join(lines)
    # Use [ \t] rather than \s so a bare "Table 1:" line cannot make the
    # pattern run across the newline and delete the following line.
    t = re.sub(r"^(Table|Figure)[ \t]*\d+[:.\-][ \t].*$", "", t, flags=re.IGNORECASE | re.MULTILINE)
    t = re.sub(r"^\s*\|.*\|\s*$", "", t, flags=re.MULTILINE)
    t = re.sub(r"^\s*[-=]{3,}\s*$", "", t, flags=re.MULTILINE)
    # The three removals above leave empty lines behind; collapse them again
    # so the output has the same "no blank lines" shape as after step 3.
    t = re.sub(r"\n{2,}", "\n", t)
    return t.strip()




In [0]:
import re

def normalize_heading(title: str) -> str:
    """Reduce a heading to a lowercase form suitable for comparison.

    Removes, in order: a leading "section/chapter/kapitel/abschnitt <n>"
    prefix, a leading numbering such as "9.", "9.1.2" or "10)", trailing
    decoration characters (space, ':', '.', '-'), and finally collapses
    whitespace runs to a single space.

    Args:
        title: heading text; falsy values yield ``""``.

    Returns:
        The normalized heading.
    """
    if not title:
        return ""

    heading = title.strip().lower()

    # (pattern, flags) pairs applied one after another to the heading start.
    leading_patterns = (
        # "section 9", "chapter 3", "kapitel 4", "abschnitt 2"
        (r'^\s*(section|chapter|kapitel|abschnitt)\s*\d+[:.)-]*\s*', re.I),
        # plain numbering: "9.", "9.1.2", "10)"
        (r'^\s*\d+(?:\.\d+)*\s*[:.)-]*\s*', 0),
    )
    for pattern, flags in leading_patterns:
        heading = re.sub(pattern, '', heading, flags=flags)

    # Trailing decoration goes first, then whitespace is squeezed.
    heading = heading.rstrip(' :.-')
    return re.sub(r'\s+', ' ', heading)
# Headings that mark boilerplate sections. title_matches_blacklist() compares
# these against the output of normalize_heading(), so entries are lowercase
# and carry no numbering.
HEADINGS_BLACKLIST_EQ = {
    # exact matches after normalization
    "contents", "table of contents", "toc",
    "index", "references", "reference documentation",
    "company information", "company info",
    "revision history", "document history",
    "legal notice", "trademarks", "acknowledgements",
    "glossary", "contacts", "contact",
    # German
    "inhalt", "inhaltsverzeichnis", "verzeichnis",
    "referenzen", "referenzdokumentation",
    "unternehmensinformationen", "revision", "versionsverlauf",
    "rechtliche hinweise", "marken", "danksagungen", "glossar", "kontakt",
}

# Substring variants: a normalized heading containing any of these is
# blacklisted too (more robust than exact matching).
HEADINGS_BLACKLIST_CONTAINS = {
    "reference documentation",
    "referenzdokumentation",
    "table of contents",
    "inhaltsverzeichnis",
    "revision history",
    "document history",
    "company information",
}

def title_matches_blacklist(title: str) -> bool:
    """Tell whether a heading is blacklisted.

    The heading is normalized with normalize_heading(); it matches when the
    result equals an entry of HEADINGS_BLACKLIST_EQ or contains an entry of
    HEADINGS_BLACKLIST_CONTAINS.
    """
    normalized = normalize_heading(title)
    exact_hit = normalized in HEADINGS_BLACKLIST_EQ
    return exact_hit or any(
        fragment in normalized for fragment in HEADINGS_BLACKLIST_CONTAINS
    )
# Matches the start of a URL. The optional "www." after the scheme keeps
# "https://www.example.com" from being counted twice (once for the scheme,
# once for "www.").
URL_RE = re.compile(r'https?://(?:www\.)?|www\.', re.I)

def url_ratio(text: str) -> float:
    """Return the number of URL starts per whitespace-separated word.

    Args:
        text: text to inspect; falsy values yield ``0.0``.

    Returns:
        ``matches / words``, where ``words`` is at least 1 so the division
        is always defined.
    """
    if not text:
        return 0.0
    urls = len(URL_RE.findall(text))
    words = max(1, len(text.split()))
    return urls / words

def looks_like_link_table(text: str) -> bool:
    """Heuristic for link listings and markdown-style tables.

    Returns True when at least two lines contain both "link"
    (case-insensitive) and "=", or when at least six lines contain two or
    more "|" characters.
    """
    link_assignments = 0
    piped_rows = 0
    for row in text.splitlines():
        if "=" in row and "link" in row.lower():
            link_assignments += 1
        if row.count("|") >= 2:
            piped_rows += 1
    return link_assignments >= 2 or piped_rows >= 6
def get_section_title_from_chunk(ch):
    """Return the innermost section title of a chunk, or ``""``.

    Two sources are tried:
      1. ``ch.hierarchy_path``: a non-empty list whose last element is a
         dict with a ``"title"`` key.
      2. ``ch.meta.headings``: a non-empty list/tuple of heading strings;
         the last one is used. This is where Docling chunks keep their
         heading trail, so without this fallback such chunks never yield a
         title.

    Args:
        ch: chunk-like object; missing attributes are tolerated.

    Returns:
        The title string, or ``""`` when neither source provides one.
    """
    hp = getattr(ch, "hierarchy_path", None)
    if isinstance(hp, list) and hp:
        last = hp[-1]
        if isinstance(last, dict):
            title = last.get("title")
            if title:
                return title

    # Fallback: heading trail stored on the chunk metadata.
    headings = getattr(getattr(ch, "meta", None), "headings", None)
    if isinstance(headings, (list, tuple)) and headings:
        innermost = headings[-1]
        if isinstance(innermost, str):
            return innermost
    return ""

def first_line(text: str) -> str:
    """Return the text before the first newline, stripped ("" for falsy input)."""
    head, _, _ = (text or "").partition("\n")
    return head.strip()

def should_drop_chunk(ch, ctx_text: str) -> bool:
    """Decide whether a chunk is boilerplate and should be skipped.

    A chunk is dropped when
      * its hierarchy title or the first line of ``ctx_text`` is
        blacklisted, or
      * the text is link-heavy (URL ratio above 0.04 or it looks like a
        link table) AND one of those two titles mentions "reference",
        "referenz", "link" or "documentation".

    Args:
        ch: chunk object passed on to get_section_title_from_chunk().
        ctx_text: contextualized chunk text.
    """
    hierarchy_title = get_section_title_from_chunk(ch)
    leading_line = first_line(ctx_text)

    # 1) Title check (hierarchy title or first line).
    if title_matches_blacklist(hierarchy_title) or title_matches_blacklist(leading_line):
        return True

    # 2) Content heuristic: only relevant for link-heavy text.
    link_heavy = url_ratio(ctx_text) > 0.04 or looks_like_link_table(ctx_text)
    if not link_heavy:
        return False

    # Drop only when a title also points towards a reference/link section.
    hints = ("reference", "referenz", "link", "documentation")
    return any(
        hint in normalize_heading(candidate)
        for candidate in (leading_line, hierarchy_title)
        for hint in hints
    )


In [0]:
# Display the currently configured Docling output path.
out_path_docling

In [0]:
# Output file for the LangChain chunks of the sample product.
out_path_langchain = out_dir / product_category / product_name / "langchain_chunks.jsonl"
print("⬆️ out_path_langchain:",out_path_langchain)

code_path_langchain = parent_path / "langchain_chunking.py"
print("⬆️code_path_langchain:",code_path_langchain)

# The label now names the variable that is actually printed.
code_path_docling= parent_path / "docling_chunking.py"
print("⬆️code_path_docling:",code_path_docling)



In [0]:
from pathlib import Path
import json

def process_pdf(pdf_path: Path, out_dir: Path):
    """Convert one PDF, chunk it and append the chunks to a JSONL file.

    The category and product names are taken from the two directories above
    the file (``.../<category>/<product>/<file>.pdf``). Records are written
    to ``<out_dir>/<category>/docling_chunks.jsonl``. The file is opened in
    append mode, so running this twice for the same PDF writes its records
    twice.

    Relies on the notebook-level ``converter``, ``chunker`` and
    ``tokenizer`` objects.

    Args:
        pdf_path: path of the PDF to process.
        out_dir: root directory for the JSONL output.
    """
    category = pdf_path.parent.parent.name
    product  = pdf_path.parent.name

    out_path = out_dir / category / "docling_chunks.jsonl"
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Convert the PDF that was passed in. Previously the conversion was
    # commented out and an unrelated global document was chunked instead,
    # so `pdf_path` had no influence on the chunks.
    dl_doc = converter.convert(pdf_path).document
    raw_chunks = list(chunker.chunk(dl_doc=dl_doc))
    total_chunks = len(raw_chunks)

    records = []
    for i, ch in enumerate(raw_chunks):
        # Skip chunks whose own cleaned text is too short to be useful.
        text_raw = clean_text(ch.text or "")
        if len(text_raw) < 30:
            continue

        # The contextualized text is what gets stored; require >= 25 words.
        context = clean_text(chunker.contextualize(chunk=ch))
        if len(context.split()) < 25:
            continue

        if should_drop_chunk(ch, context):  # optional boilerplate filter
            continue

        # Section title via the shared helper; "" is stored as None.
        section = get_section_title_from_chunk(ch) or None

        n_tokens = tokenizer.count_tokens(context)
        # Tokens per character of the contextualized text.
        semantic_density = round(n_tokens / max(1, len(context)), 4)

        rec = {
            "category": category,
            "chunk_id": f"{pdf_path.stem}::c{i}",
            "chunk_size": n_tokens,
            "chunk_type": "contextualized",
            "product": product,
            "section": section,
            "semantic_density": semantic_density,
            "text": f"[Product: {product}] [Category: {category}]\n\n{context}",
            "total_chunks": total_chunks,
        }
        records.append(rec)

    # One JSON object per line; all products of a category share this file.
    with open(out_path, "a", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

    print(f"[OK] {len(records)} Chunks hinzugefügt zu: {out_path}")

def iterate_product_docs(doc_root: Path, out_dir: Path):
    """Run process_pdf() for every PDF stored as <category>/<product>/<file>.pdf.

    PDFs are visited in sorted order so repeated runs append records in the
    same sequence. The depth check is made on the path relative to
    ``doc_root``; checking the full path would accept every file under an
    absolute root, including PDFs that are not inside a product folder.

    Args:
        doc_root: directory searched recursively for ``*.pdf``.
        out_dir: output root handed to process_pdf().
    """
    for pdf_path in sorted(doc_root.rglob("*.pdf")):
        # expected structure: <root>/<category>/<product>/<file.pdf>
        if len(pdf_path.relative_to(doc_root).parts) < 3:
            print(f"⏭️ Skipping (not <category>/<product>/<file>): {pdf_path}")
            continue
        print(f"📂 {pdf_path.parent.parent.name} / {pdf_path.parent.name}")
        process_pdf(pdf_path, out_dir)



In [0]:
from pathlib import Path
import json


def get_repo_root() -> Path:
    """Locate the repo directory that contains the running notebook.

    Reads the notebook path from the Databricks notebook context and returns
    its ``/Workspace/Repos/<user>/<repo>`` prefix. When the path does not
    start with such a prefix, the current working directory is returned.
    """
    notebook_ctx = DBUtils(spark).notebook.entry_point.getDbutils().notebook().getContext()
    nb_path = notebook_ctx.notebookPath().get()

    # Extract /Workspace/Repos/<user>/<repo>
    found = re.match(r"^(\/Workspace\/Repos\/[^\/]+\/[^\/]+)", nb_path)
    if not found:
        return Path(os.getcwd())
    return Path(found.group(1))

repo_root = get_repo_root()
print("📁 Repo root:", repo_root)

# One level up (like "cd ..")
parent_path = repo_root.parent

doc_root = parent_path / "documents"
out_dir = parent_path / "out"
# product_name = "Nano 33 BLE"
# product_category = "Nano Family"
# file_name  = "Nano_33_BLE_datasheet.pdf"

print("⬆️ doc_root:",doc_root)

print("⬆️ out_dir:",out_dir)

# NOTE(review): product_category / product_name are only assigned in an
# earlier cell (the assignments above are commented out).
pdf_path = doc_root / product_category / product_name / "Nano_33_BLE_datasheet.pdf"
print("⬆️ pdf pfad:",pdf_path)

out_path_docling = out_dir / product_category / product_name / "docling_chunks.jsonl"
print("⬆️ out_path_docling:",out_path_docling)

# Process every PDF below doc_root.
iterate_product_docs(doc_root, out_dir)


In [0]:
# Read back the "Nano Family" chunks from the same out_dir the pipeline wrote
# to, instead of a hardcoded user-specific workspace path.
out_path_docling = f"file:{out_dir / 'Nano Family' / 'docling_chunks.jsonl'}"
df = spark.read.json(out_path_docling)
display(df)

In [0]:
# Read back the "UNO Family" chunks from the same out_dir the pipeline wrote
# to, instead of a hardcoded user-specific workspace path.
out_path_docling = f"file:{out_dir / 'UNO Family' / 'docling_chunks.jsonl'}"
df = spark.read.json(out_path_docling)
display(df)

In [0]:
# Read back the "Education" chunks from the same out_dir the pipeline wrote
# to, instead of a hardcoded user-specific workspace path.
out_path_docling = f"file:{out_dir / 'Education' / 'docling_chunks.jsonl'}"
df = spark.read.json(out_path_docling)
display(df)

In [0]:
%pip install docling
%pip install -qU pip docling transformers
# NOTE(review): the same packages are already installed by the first cell of
# this notebook; confirm whether this second install is still needed.

In [0]:
# Clear the Spark catalog cache for this session.
spark.catalog.clearCache()


In [0]:
import importlib
import process_document

importlib.reload(process_document)
from process_document import get_repo_root, iterate_product_docs


In [0]:
# Release `df` (the DataFrame loaded by the last read cell) from the cache.
df.unpersist()


In [0]:
# Resolve and show the repo root.
root = get_repo_root()
print(root)
# or with a custom start path / markers:
# root = get_repo_root(start_path=Path(__file__).parent, markers=['.git', '.hg'])


In [0]:
# NOTE(review): called without arguments. The iterate_product_docs defined in
# this notebook requires (doc_root, out_dir); presumably the version imported
# from process_document is meant here. Confirm its signature.
iterate_product_docs()

In [0]:
# The import list was empty, which is a SyntaxError. Import the two names
# this notebook is known to use from process_document.
from process_document import get_repo_root, iterate_product_docs

In [0]:
# Derive the documents/ and out/ directories, which sit next to the repo root.
root = get_repo_root()
parent_path = root.parent
doc_root = parent_path / "documents"
out_dir = parent_path / "out"
print("⬆️ doc_root:", doc_root)
print("⬆️ out_dir:",out_dir)

In [0]:
# Dry run: list the PDFs that would be processed, without converting anything.
root = get_repo_root()
parent_path = root.parent
doc_root = parent_path / "documents"
out_dir = parent_path / "out"
#tokenizer = return_tokenizer()
##doc=convert_documents_into_docling_doc(doc_root)
#chunker = chunk_documents_with_docling(doc)

for pdf_path in doc_root.rglob("*.pdf"):
    # expected structure: <root>/<category>/<product>/<file.pdf>
    # Measure the depth relative to doc_root; the full path of a file under
    # an absolute root always has three or more parts.
    if len(pdf_path.relative_to(doc_root).parts) >= 3:
        print(f"Processing {pdf_path}")
        print(f"Processing {pdf_path.parent.parent.name} / {pdf_path.parent.name} / {pdf_path.name}")
        print(f"Writing {pdf_path.parent.parent.name} / {pdf_path.parent.name} / {pdf_path.name}")
        #process_pdf(pdf_path, out_dir, doc, chunker, tokenizer)

In [0]:
from pathlib import Path
# Build the sample path from doc_root. A "file:/..." string is a URI, not a
# filesystem path, so it must not be wrapped in Path(); deriving the path
# also avoids a hardcoded user-specific location.
pdf_path = doc_root / "MKR Family" / "MKR IoT Carrier Rev2" / "ABX00073-datasheet.pdf"

# Category and product are the two directory names above the file.
category = pdf_path.parent.parent.name
product  = pdf_path.parent.name

# Same output location that process_pdf() uses for this category.
out_path = out_dir / category / "docling_chunks.jsonl"
out_path.parent.mkdir(parents=True, exist_ok=True)

print(out_path)