In [None]:
# 1. Install dependencies (Run once)
# !pip install --upgrade pip --quiet
# !pip install "mineru[core]" huggingface_hub sentence-transformers chromadb pdf2image PyMuPDF --quiet

# 2. Imports
import os
import re
import json
import time
from requests.exceptions import ConnectionError, HTTPError
from huggingface_hub import snapshot_download
from sentence_transformers import SentenceTransformer
import chromadb
from PIL import Image
import matplotlib.pyplot as plt
import io

# 3. Download PDF-Extract-Kit-1.0 model weights with retry/backoff
weights_dir = "/kaggle/working/pdf_extract_kit_models"
os.makedirs(weights_dir, exist_ok=True)

def download_with_retry(repo_id, local_dir, repo_type="model",
                        max_workers=1, resume_download=True,
                        retries=5, backoff_factor=1.0):
    """Download a Hugging Face repo snapshot, retrying on network errors.

    Args:
        repo_id: Repository identifier passed to ``snapshot_download``.
        local_dir: Directory the snapshot is written to.
        repo_type: Repository type passed through to ``snapshot_download``.
        max_workers: Download worker count passed through to ``snapshot_download``.
        resume_download: Passed through to ``snapshot_download``.
            NOTE(review): recent huggingface_hub releases may warn that this
            flag is deprecated — confirm against the installed version.
        retries: Maximum number of download attempts.
        backoff_factor: Base delay in seconds; the wait before attempt k+1 is
            ``backoff_factor * 2 ** (k - 1)``.

    Returns:
        The path returned by ``snapshot_download``.

    Raises:
        RuntimeError: If every attempt failed with ``ConnectionError`` or
            ``HTTPError`` (the names imported from ``requests.exceptions``);
            the last such error is attached as ``__cause__``. Any other
            exception propagates immediately without a retry.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            print(f"[DEBUG] Attempt {attempt}/{retries}: downloading {repo_id}…")
            path = snapshot_download(
                repo_id=repo_id,
                local_dir=local_dir,
                repo_type=repo_type,
                max_workers=max_workers,
                resume_download=resume_download
            )
            print(f"[DEBUG] Download succeeded: {path}")
            return path
        except (ConnectionError, HTTPError) as e:
            last_error = e
            if attempt >= retries:
                # No attempt left: sleeping here would only delay the failure.
                print(f"[DEBUG] Error on attempt {attempt}: {e}. No attempts left.")
                break
            wait = backoff_factor * (2 ** (attempt - 1))
            print(f"[DEBUG] Error on attempt {attempt}: {e}. Retrying in {wait:.1f}s…")
            time.sleep(wait)
    # Chain the last network error so the root cause stays in the traceback.
    raise RuntimeError(
        f"[ERROR] Failed to download {repo_id} after {retries} attempts"
    ) from last_error

# snapshot_path = download_with_retry(
#     repo_id="opendatalab/pdf-extract-kit-1.0",
#     local_dir=weights_dir
# )

# 4. Configure MinerU for local inference
# NOTE(review): the weights directory is written under three spellings
# ("weights_path", "models_dir", "models-dir"); this notebook does not show
# which one the installed MinerU version reads — confirm and drop the others.
config = {
    "weights_path": weights_dir,
    "device-mode": "cuda",        # or "cpu"
    "models_dir": weights_dir,
    "models-dir": weights_dir,
    "virtual-vram-size": 8,
    "method": "auto",
    "backend": "pipeline",
    "formula-enable": True,
    "table-enable": True,
    "start-page": 0,
    "end-page": None
}
conf_path = "/kaggle/working/magic-pdf.json"
home_conf_path = "/root/magic-pdf.json"

with open(conf_path, "w") as f:
    json.dump(config, f, indent=2)

# Mirror the config to the home-directory location from Python instead of
# shelling out to `cp`, so a failed copy is reported rather than passing
# unnoticed in the cell output. Kept best-effort like the original copy.
try:
    with open(home_conf_path, "w") as f:
        json.dump(config, f, indent=2)
except OSError as exc:
    print(f"[WARN] Could not write {home_conf_path}: {exc}")

print(f"[DEBUG] MinerU config written to {conf_path}")

# 5. Set PDF and output paths
pdf_path   = "/kaggle/input/test12/rondo_pro_manual_2015.pdf"  # update if needed
output_base = "/kaggle/working/mineru_output"
os.makedirs(output_base, exist_ok=True)
print(f"[DEBUG] PDF input: {pdf_path}")
print(f"[DEBUG] Output base directory: {output_base}")

# 6. Parse PDF with MinerU CLI (fully local inference)
print("[DEBUG] Running MinerU CLI for PDF parsing…")
!mineru -p {pdf_path} -o {output_base} -m auto

In [None]:
import json
import os
import re
import string

import chromadb
import fitz  # PyMuPDF
import matplotlib.pyplot as plt
from PIL import Image
from sentence_transformers import SentenceTransformer

# --- Locate the MinerU run directory and its Markdown export ---
base_out = "/kaggle/working/mineru_output"  # Your base directory

# 1. Keep every entry of base_out that contains an "auto" sub-directory
projects = []
for entry_name in os.listdir(base_out):
    if os.path.isdir(os.path.join(base_out, entry_name, "auto")):
        projects.append(entry_name)
if len(projects) == 0:
    raise FileNotFoundError(f"No run folder with 'auto' in {base_out}")

# The first matching project is used; os.listdir gives no ordering guarantee.
run_dir = os.path.join(base_out, projects[0], "auto")

print(run_dir)

# 2. Collect the Markdown files that sit directly inside run_dir
md_files = list(
    filter(lambda name: name.lower().endswith(".md"), os.listdir(run_dir))
)
if len(md_files) == 0:
    raise FileNotFoundError(f"No Markdown file (*.md) found in {run_dir}")

# The first Markdown file found is the one that gets chunked below
md_path = os.path.join(run_dir, md_files[0])
print(f"Using Markdown file: {md_path}")

# 3. Extract PDF ToC titles (original spelling is kept; normalization happens at match time)
pdf_path = "/kaggle/input/test12/rondo_pro_manual_2015.pdf"  # Set this appropriately!

# Open the document in a context manager so the file handle is released once
# the table of contents has been read.
with fitz.open(pdf_path) as doc:
    toc = doc.get_toc()

# Each ToC entry is indexed at position 1 for its title; blank titles are skipped.
pdf_titles = {title for title in (entry[1].strip() for entry in toc) if title}
print(f"[DEBUG] Extracted {len(pdf_titles)} PDF ToC titles for heading detection.")
if not pdf_titles:
    print("[INFO] PDF has no ToC titles; only numbered Markdown headings will be detected.")

# --- Normalizer for Markdown lines ---
def normalize_line(line):
    """Return a canonical form of ``line`` for fuzzy heading comparison.

    Tabs, carriage returns and newlines become spaces, the text is
    lower-cased, every character in ``string.punctuation`` is removed, runs
    of whitespace collapse to a single space, and the result is stripped.
    """
    # Mapping an ordinal to None makes str.translate delete that character.
    punctuation_table = dict.fromkeys(map(ord, string.punctuation))
    flattened = re.sub(r'[\r\n\t]', ' ', line).lower()
    without_punctuation = flattened.translate(punctuation_table)
    return re.sub(r'\s+', ' ', without_punctuation).strip()

# --- Chunk Markdown using normalized lines and PDF ToC titles ---
def chunk_markdown_with_pdf_titles(md_path, pdf_titles, verbose=True):
    """Split a Markdown file into heading-delimited chunks.

    A line starts a new chunk when it is either a numbered Markdown heading
    (``#`` characters followed by a dotted number such as ``# 3.1``) or, after
    ``normalize_line``, equal to one of the normalized ``pdf_titles``.

    Within a chunk each line is classified in this order, and is consumed
    entirely by the first category that matches:
      1. image reference(s) ``![](path)`` -> every path is added to ``images``
      2. inline ``<table>...</table>`` on one line -> added to ``tables``
      3. anything else -> appended to the chunk text

    Content that appears before the first detected heading is not part of any
    chunk and is dropped.

    Args:
        md_path: Path of the UTF-8 Markdown file to read.
        pdf_titles: Iterable of heading titles (e.g. from the PDF ToC).
        verbose: When True (default) print one debug line per processed line
            and per saved chunk; pass False to silence the per-line output.

    Returns:
        A list of dicts with keys ``heading`` (the raw heading line),
        ``text`` (stripped body text), ``images`` (list of referenced paths)
        and ``tables`` (list of HTML table strings).
    """
    heading_pattern = re.compile(r"^#+\s*\d+(?:\.\d+)*\b")
    image_pattern = re.compile(r"!\[\]\((.+?)\)")
    table_inline = re.compile(r"(<table>.*?</table>)", re.DOTALL)

    def log(message):
        if verbose:
            print(message)

    log("[DEBUG] Starting Markdown chunking with normalized line matching…")

    # Normalize the titles once instead of once per Markdown line. Titles that
    # normalize to "" (punctuation only) are discarded: otherwise every blank
    # line would compare equal and be mistaken for a heading.
    normalized_titles = {normalize_line(title) for title in pdf_titles}
    normalized_titles.discard("")

    chunks = []
    heading = None
    content_lines = []
    content_len = 0  # running length of content_lines, kept for debug output
    images = []
    tables = []

    def save_chunk(final=False):
        # Snapshot the current accumulators into a new chunk record.
        text = "".join(content_lines).strip()
        label = "FINAL chunk" if final else "chunk"
        log(
            f"[DEBUG]   Saving {label}: heading={repr(heading)}, "
            f"text_len={len(text)}, images={images}, tables={len(tables)}"
        )
        chunks.append({
            "heading": heading,
            "text": text,
            "images": images.copy(),
            "tables": tables.copy(),
        })
        counter = "Final" if final else "Current"
        log(f"[DEBUG]   {counter} chunk count: {len(chunks)}")

    with open(md_path, "r", encoding="utf-8") as f:
        for idx, raw in enumerate(f, start=1):
            line = raw.rstrip("\n")
            stripped = line.lstrip()
            norm_line = normalize_line(line)
            debug_tag = f"[DEBUG][Line {idx:03d}] {repr(line)}"

            # Heading detection: regex on the original line first, then the
            # normalized line against the normalized PDF ToC titles.
            reason = ""
            if heading_pattern.match(stripped):
                reason = "markdown numeric heading"
            elif norm_line in normalized_titles:
                reason = "normalized PDF TOC title match"

            if reason:
                log(f"{debug_tag} → DETECTED HEADING ({reason})")
                if heading is not None:
                    save_chunk()
                elif content_lines or images or tables:
                    log("[DEBUG]   Discarding content found before the first heading")
                heading = line
                content_lines.clear()
                content_len = 0
                images.clear()
                tables.clear()
                log(f"[DEBUG]   New heading set: {repr(heading)}")
                continue

            # Image detection: collect every reference on the line, not just
            # the first one.
            image_refs = image_pattern.findall(line)
            if image_refs:
                images.extend(image_refs)
                for img_path_ref in image_refs:
                    log(f"{debug_tag} → IMAGE REF: {img_path_ref}")
                continue

            # Inline table detection (tables that fit on a single line)
            inline_tables = table_inline.findall(line)
            if inline_tables:
                for tbl in inline_tables:
                    tables.append(tbl)
                    log(f"{debug_tag} → INLINE TABLE FOUND: {tbl[:60]}...")
                continue

            # Body text
            content_lines.append(line + "\n")
            content_len += len(line) + 1
            log(f"{debug_tag} → BODY TEXT (len now={content_len})")

    # Save the chunk that was still open when the file ended
    if heading is not None:
        save_chunk(final=True)

    log(f"[DEBUG] Completed chunking: total_chunks={len(chunks)}")
    return chunks

# --- Use the chunking function ---
md_chunks = chunk_markdown_with_pdf_titles(md_path, pdf_titles)

# 9. List all images in run_dir/images → image_collection
images_dir = os.path.join(run_dir, "images")
image_files = []
if os.path.isdir(images_dir):
    # Sort the directory listing: os.listdir returns entries in arbitrary
    # order, and the list position later becomes the "img-{i}" id, so a stable
    # order keeps ids stable across runs.
    image_files = [
        os.path.join(images_dir, fn)
        for fn in sorted(os.listdir(images_dir))
        if fn.lower().endswith((".png", ".jpg", ".jpeg"))
    ]
    print(f"[DEBUG] Found {len(image_files)} image files in {images_dir}")
else:
    print(f"[DEBUG] No images directory at {images_dir}; no images will be indexed")

# 10. Open the persistent ChromaDB store and load both embedding models
# The store lives in a directory named after the first discovered project.
chromadb_path = "./chroma_db_{}".format(projects[0])
client = chromadb.PersistentClient(path=chromadb_path)

# One SentenceTransformer for the Markdown chunks, one for the image files
text_embedder = SentenceTransformer("all-MiniLM-L6-v2")
image_embedder = SentenceTransformer("clip-ViT-B-32")

print("[DEBUG] ChromaDB initialized at {}".format(chromadb_path))

# 11. Store Markdown-heading chunks: embed heading + body text, keep
#     images and tables as metadata
md_col = client.get_or_create_collection("md_heading_chunks")
for i, chunk in enumerate(md_chunks):
    combined = f"{chunk['heading']}\n{chunk['text']}"
    emb = text_embedder.encode(combined).tolist()
    # upsert (not add) so that re-running this cell refreshes the existing
    # "md-{i}" rows instead of tripping over ids that are already stored.
    # NOTE: rows with a higher index left over from an earlier, longer run are
    # not removed by this.
    md_col.upsert(
        ids=[f"md-{i}"],
        embeddings=[emb],
        metadatas=[{
            "heading": chunk["heading"],
            # Metadata values are stored as scalars, so the lists are flattened:
            # image paths joined with ";" and tables serialized as a JSON string.
            "images": ";".join(chunk["images"]),
            "tables_count": len(chunk["tables"]),
            "tables": json.dumps(chunk["tables"]),
        }],
        documents=[chunk["text"]]
    )
    print(
        f"[DEBUG] Stored md-{i}: heading={repr(chunk['heading'])}, images={chunk['images']}, tables={len(chunk['tables'])}"
    )
print(f"[INFO] Inserted {len(md_chunks)} markdown-heading chunks into 'md_heading_chunks'")

# 12. Store extracted image files in 'image_collection'
img_col = client.get_or_create_collection("image_collection")
for i, img_path in enumerate(image_files):
    # Close the file handle as soon as the RGB copy exists; convert() returns
    # a new in-memory image, so it stays usable after the with-block.
    with Image.open(img_path) as source_image:
        img = source_image.convert("RGB")
    emb = image_embedder.encode(img).tolist()
    # upsert (not add) so that re-running this cell refreshes existing ids.
    img_col.upsert(
        ids=[f"img-{i}"],
        embeddings=[emb],
        metadatas=[{"file_path": img_path}],
        documents=[f"[IMAGE] {img_path}"]
    )
    print(f"[DEBUG] Inserted img-{i}: file_path={img_path}")
print(f"[INFO] Inserted {len(image_files)} images into 'image_collection'")

# 13. (Optional) Show the first PNG that sits directly in the run directory
layout_pngs = []
for entry_name in os.listdir(run_dir):
    if entry_name.lower().endswith(".png"):
        layout_pngs.append(entry_name)

if not layout_pngs:
    print("[INFO] No layout PNGs found in run directory.")
else:
    first_layout = layout_pngs[0]
    layout = Image.open(os.path.join(run_dir, first_layout))
    plt.figure(figsize=(8, 12))
    plt.imshow(layout)
    plt.axis("off")
    plt.title(f"Layout: {first_layout}")
    plt.show()


In [None]:
import os
import re
from PIL import Image
import matplotlib.pyplot as plt
import chromadb

# --- Paths and ChromaDB handle for the query cell ---
base_out = "/kaggle/working/mineru_output"  # Set as appropriate

# Keep every entry of base_out that contains an "auto" sub-directory
projects = []
for entry_name in os.listdir(base_out):
    if os.path.isdir(os.path.join(base_out, entry_name, "auto")):
        projects.append(entry_name)
if len(projects) == 0:
    raise FileNotFoundError(f"No run folder with 'auto' in {base_out}")

run_dir = os.path.join(base_out, projects[0], "auto")

# Re-open the persistent store that is named after the first project
chromadb_path = "./chroma_db_{}".format(projects[0])
client = chromadb.PersistentClient(path=chromadb_path)
md_col = client.get_or_create_collection("md_heading_chunks")

# --- USER QUERY SECTION ---
DEFAULT_QUERY = "How to correctly mount and tighten the conveyor belts?"

try:
    user_query = input("Enter your question: ").strip()
except (EOFError, NotImplementedError):
    # EOFError: stdin is closed. NotImplementedError: an IPython kernel whose
    # frontend cannot serve input requests (e.g. a non-interactive batch run)
    # raises StdinNotImplementedError, which derives from NotImplementedError.
    user_query = DEFAULT_QUERY

if not user_query:
    raise ValueError("No user query entered. Please provide a question.")

# --- Semantic search for top N relevant chunks ---
# NOTE(review): query_texts is embedded by the collection's own embedding
# function, while the stored vectors were produced with
# SentenceTransformer("all-MiniLM-L6-v2") in the indexing cell — confirm both
# sides use the same model.
n_results = 3
result = md_col.query(
    query_texts=[user_query],
    n_results=n_results,
    include=["metadatas", "documents"]
)

# --- Display the top chunks ---
PREVIEW_CHARS = 400  # maximum number of characters shown per excerpt
for idx, (excerpt_text, meta) in enumerate(
    zip(result["documents"][0], result["metadatas"][0]), 1
):
    excerpt_text = excerpt_text or ""
    meta = meta or {}
    print(f"\n[Excerpt {idx}] Heading: {meta.get('heading')}")
    # Show only a preview; the "..." marks text that was actually cut off.
    print(excerpt_text[:PREVIEW_CHARS] + ("..." if len(excerpt_text) > PREVIEW_CHARS else ""))
    imgs = [img.strip() for img in meta.get("images", "").split(";") if img.strip()]
    if imgs:
        print(f"Images found: {imgs}")
    else:
        print("No images listed for this chunk.")

# --- Choose best chunk: (automated as first, or prompt user) ---
best_idx = 0  # Or: int(input("Which excerpt to use (1/2/3)? ")) - 1

candidate_docs = result["documents"][0]
candidate_metas = result["metadatas"][0]
# Report an empty or out-of-range selection explicitly instead of failing with
# a bare "list index out of range".
if not candidate_docs:
    raise RuntimeError(
        "The query returned no chunks; is the 'md_heading_chunks' collection populated?"
    )
if not 0 <= best_idx < len(candidate_docs):
    raise IndexError(
        f"best_idx={best_idx} is outside the {len(candidate_docs)} returned excerpts"
    )

print("\n[INFO] Using Excerpt", best_idx + 1)
best_doc = candidate_docs[best_idx] or ""
best_meta = candidate_metas[best_idx] or {}
imgs = [img.strip() for img in best_meta.get("images", "").split(";") if img.strip()]

# --- Step-by-step extraction (split by numbered steps, fallback: splitlines) ---
# Split right after a "." or ":" wherever a number followed by "." comes next.
steps = re.split(r'(?<=\.|:)\s*(?=\d+\.)', best_doc)  # crude split for "1.", "2.", etc.
steps = [s.strip() for s in steps if s.strip()]
if len(steps) == 1:
    # No numbered steps were found: fall back to one step per non-empty line.
    steps = [s.strip() for s in best_doc.strip().split('\n') if s.strip()]

print("\nStep-by-step Instructions:")
for idx, step in enumerate(steps, 1):
    print(f"{idx}. {step}")

# --- Show every image referenced by the selected chunk ---
if not imgs:
    print("No images for this chunk.")
else:
    print("\nRelevant images for these instructions:")
    for i, img_path in enumerate(imgs, 1):
        print(f"Image {i}: {img_path}")
        # The stored reference may be absolute, relative to run_dir/images, or
        # relative to run_dir; the first location that exists is used.
        possible_paths = [
            img_path,
            os.path.join(run_dir, "images", img_path),
            os.path.join(run_dir, img_path)
        ]
        abs_img_path = None
        for candidate_path in possible_paths:
            if os.path.exists(candidate_path):
                abs_img_path = candidate_path
                break
        if abs_img_path is None:
            print(f"Image file not found for: {img_path}")
            continue
        img = Image.open(abs_img_path)
        plt.figure(figsize=(9, 8))
        plt.imshow(img)
        plt.axis("off")
        plt.title(f"Image {i}")
        plt.show()


In [None]:
import chromadb
import json

# Use the same ChromaDB path and collection name as in your pipeline
# (`projects` comes from the earlier setup cells)
chromadb_path = f"./chroma_db_{projects[0]}"
client = chromadb.PersistentClient(path=chromadb_path)
collection = client.get_or_create_collection("md_heading_chunks")

# Count first, then fetch the rows a single time
total_chunks = collection.count()
print(f"Total chunks in 'md_heading_chunks': {total_chunks}")

# N is the number of chunks to inspect; it defaults to all of them.
# Lower it (e.g. N = 5) for a quick look at a large collection.
N = total_chunks
if N > 0:
    results = collection.get(limit=N, include=["metadatas", "documents"])
else:
    # Nothing stored (or N set to 0): skip the fetch and use an empty result.
    results = {"ids": [], "metadatas": [], "documents": []}
chunk_ids = results['ids']

for idx, chunk_id in enumerate(results['ids']):
    meta = results['metadatas'][idx] or {}

    # Tables are expected as a JSON string under 'tables'. Entries written by
    # the original indexing cell carry that JSON under 'tables_count' instead,
    # so a string found there is treated as the table payload.
    stored_count = meta.get('tables_count', 0)
    tables_meta = meta.get('tables', None)
    if tables_meta is None and isinstance(stored_count, str):
        tables_meta = stored_count

    tables = []
    if tables_meta:
        try:
            decoded = json.loads(tables_meta)
        except (TypeError, json.JSONDecodeError):
            # Not JSON (or already decoded): keep the value as it is
            decoded = tables_meta
        if isinstance(decoded, list):
            tables = [str(table) for table in decoded]
        elif isinstance(decoded, str):
            # fallback: treat as a single table string
            tables = [decoded]

    print(f"\n--- Chunk {idx + 1} ---")
    print("ID:", chunk_id)
    print("Heading:", meta.get('heading', ''))
    print("Images:", meta.get('images', ''))
    # Prefer the number of decoded tables; use the stored count only when no
    # table payload is available.
    print("Tables count:", len(tables) if tables_meta is not None else stored_count)

    if tables:
        print(f"Tables ({len(tables)}):")
        for t_idx, table in enumerate(tables, 1):
            preview = table[:200] + ("..." if len(table) > 200 else "")
            print(f"  Table {t_idx}: {preview}")
    else:
        print("No tables in this chunk.")

    # Uncomment below to preview the text as well
    # print("Text preview:\n", results['documents'][idx][:300], "..." if len(results['documents'][idx]) > 300 else "")


In [None]:
# import os
# import re
# from PIL import Image
# import matplotlib.pyplot as plt
# import pandas as pd
# from IPython.display import display  # For Jupyter, use display() for DataFrames

# # --- PARAMETERS and setup ---
# base_out = "/kaggle/working/mineru_output"  # Adjust as needed
# projects = [d for d in os.listdir(base_out) if os.path.isdir(os.path.join(base_out, d, "auto"))]
# if not projects:
#     raise FileNotFoundError(f"No run folder with 'auto' in {base_out}")
# run_dir = os.path.join(base_out, projects[0], "auto")

# import chromadb
# chromadb_path = f"./chroma_db_{projects[0]}"
# client = chromadb.PersistentClient(path=chromadb_path)
# md_col = client.get_or_create_collection("md_heading_chunks")

# # --- Get user query ---
# try:
#     user_query = input("Enter your question: ").strip()
# except EOFError:
#     user_query = "How to correctly mount and tighten the conveyor belts?"
# if not user_query:
#     raise ValueError("No user query entered. Please provide a question.")

# # --- Semantic search for top N relevant chunks ---
# n_results = 3
# result = md_col.query(
#     query_texts=[user_query],
#     n_results=n_results,
#     include=["metadatas", "documents"]
# )

# # --- Display the top chunks ---
# for idx, (doc, meta) in enumerate(zip(result["documents"][0], result["metadatas"][0]), 1):
#     print(f"\n[Excerpt {idx}] Heading: {meta.get('heading')}")
#     print(doc[:400] + ("..." if len(doc) > 400 else ""))
#     imgs = [img.strip() for img in meta.get("images", "").split(";") if img.strip()]
#     tables = meta.get("tables", [])
#     if imgs:
#         print(f"Images found: {imgs}")
#     else:
#         print("No images listed for this chunk.")
#     print(f"Tables count: {len(tables)}")

# # --- Choose best chunk: automated here as first, or ask user ---
# best_idx = 0  # or int(input("Which excerpt to use (1/2/3)? ")) - 1
# print("\n[INFO] Using Excerpt", best_idx + 1)
# best_doc = result["documents"][0][best_idx]
# best_meta = result["metadatas"][0][best_idx]
# imgs = [img.strip() for img in best_meta.get("images", "").split(";") if img.strip()]
# tables = best_meta.get("tables", [])

# # --- Step-by-step extraction (split by numbered steps, fallback: splitlines) ---
# steps = re.split(r'(?<=\.|:)\s*(?=\d+\.)', best_doc)  # crude split for "1.", "2.", etc.
# if len(steps) == 1:
#     steps = [s.strip() for s in best_doc.strip().split('\n') if s.strip()]

# print("\nStep-by-step Instructions:")
# for idx, step in enumerate(steps, 1):
#     print(f"{idx}. {step}")

# # --- Display tables from metadata ---
# if tables:
#     print("\nRelevant tables found in this chunk:")
#     for i, table in enumerate(tables, 1):
#         print(f"\nTable {i}:")
#         # If table is stored as HTML string, you can print or parse it
#         if isinstance(table, str):
#             print(table)  # raw HTML or markdown snippet
            
#             # Optionally convert markdown tables to DataFrame for nicer display if you recognize markdown
#             try:
#                 from io import StringIO
#                 # crude conversion for markdown tables if applicable
#                 if table.strip().startswith("|"):
#                     csv_like = "\n".join(
#                         line.strip() for line in table.splitlines() if line.strip()
#                     )
#                     csv_like = re.sub(r'^\|', '', csv_like, flags=re.MULTILINE)
#                     csv_like = re.sub(r'\|$', '', csv_like, flags=re.MULTILINE)
#                     csv_like = re.sub(r'\|', ',', csv_like)
#                     df = pd.read_csv(StringIO(csv_like))
#                     print("Rendered as DataFrame:")
#                     display(df)
#             except Exception as e:
#                 print(f"Could not parse table to DataFrame: {e}")

#         # If table is structured data (e.g., list of rows), try render as DataFrame
#         elif isinstance(table, (list, tuple)):
#             try:
#                 df = pd.DataFrame(table)
#                 print("Rendered as DataFrame:")
#                 display(df)
#             except Exception as e:
#                 print(f"Could not convert table data to DataFrame: {e}")
#                 print(table)
#         else:
#             print(table)
# else:
#     print("No tables found in this chunk.")

# # --- Display all images associated with the chunk ---
# if imgs:
#     print("\nRelevant images for these instructions:")
#     for i, img_path in enumerate(imgs, 1):
#         print(f"Image {i}: {img_path}")
#         # Try resolve absolute path of the image
#         possible_paths = [
#             img_path,
#             os.path.join(run_dir, "images", img_path),
#             os.path.join(run_dir, img_path)
#         ]
#         abs_img_path = next((p for p in possible_paths if os.path.exists(p)), None)
#         if abs_img_path:
#             try:
#                 img = Image.open(abs_img_path)
#                 plt.figure(figsize=(6, 5))
#                 plt.imshow(img)
#                 plt.axis("off")
#                 plt.title(f"Image {i}")
#                 plt.show()
#             except Exception as e:
#                 print(f"Error opening image {img_path}: {e}")
#         else:
#             print(f"Image file not found for: {img_path}")
# else:
#     print("No images for this chunk.")


In [None]:
# # Retrieve all stored documents, metadatas, and ids
# results = collection.get(include=["documents", "metadatas"])

# all_ids = results["ids"]
# all_docs = results["documents"]
# all_metas = results["metadatas"]

# print(f"Total chunks stored: {len(all_ids)}")

# for idx in range(len(all_ids)):
#     print(f"ID: {all_ids[idx]}")
#     print(f"Heading: {all_metas[idx].get('heading', '')}")
#     print(f"Text excerpt: {all_docs[idx].replace(chr(10), ' ')}")
#     images = all_metas[idx].get("images", "")
#     if images:
#         print("Associated images:", images)
#     print("-" * 75)


In [None]:
# import fitz  # PyMuPDF

# # Open the PDF file
# doc = fitz.open(pdf_path)

# # Retrieve the Table of Contents (list of [level, title, page number])
# toc = doc.get_toc()

# # Extract just the titles (ensure they are exactly as in the PDF)
# titles = [entry[1].strip() for entry in toc]

# # Optional: print all titles to verify
# for title in titles:
#     print(title)
