<a href="https://colab.research.google.com/github/kairamilanifitria/PurpleBox-Intern/blob/main/RAG/1_PARSING.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import logging
import re
import subprocess
import sys
import time
import warnings
from pathlib import Path

import torch
import yaml

# Add a blanket "ignore" filter so warnings are not shown from this point on.
warnings.filterwarnings("ignore")

# Install dependencies.
# pip is launched through the running interpreter (`sys.executable -m pip`) so
# the packages are installed into the environment this code is executing in,
# not into whichever `pip` executable happens to come first on PATH.
# check=True raises CalledProcessError if the installation fails.
subprocess.run([
    sys.executable, "-m", "pip", "install",
    "llama-index>=0.12.8", "llama-index-core>=0.12.8",
    "llama-index-node-parser-docling>=0.3.0", "llama-index-readers-docling>=0.3.0",
    "pypdf2>=3.0.1", "easyocr>=1.7.2"
], check=True)

# Pick the accelerator: CUDA is preferred, Apple MPS is the fallback, and the
# run is aborted when neither backend is available.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"CUDA GPU is enabled: {torch.cuda.get_device_name(0)}")
else:
    if not torch.backends.mps.is_available():
        raise EnvironmentError("No GPU or MPS device found.")
    device = torch.device("mps")
    print("MPS GPU is enabled.")

# Import installed libraries after installation
from docling.datamodel.base_models import InputFormat
from docling_core.types.doc import ImageRefMode
from docling.document_converter import DocumentConverter, PdfFormatOption, WordFormatOption
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.datamodel.settings import settings

# Logging setup: configure the root logger at INFO level and create this
# module's logger, which convert_and_save uses to report saved Markdown files.
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

In [4]:
# Value passed as `images_scale` to PdfPipelineOptions in create_pipeline_options.
IMAGE_RESOLUTION_SCALE = 2.0

def create_pipeline_options(input_format):
    """Return the converter format option that matches ``input_format``.

    * ``InputFormat.PDF``  -> ``PdfFormatOption`` whose pipeline options enable
      ``do_table_structure``, ``generate_page_images`` and
      ``generate_picture_images``, with ``images_scale`` set to
      ``IMAGE_RESOLUTION_SCALE``.
    * ``InputFormat.DOCX`` -> ``WordFormatOption`` using ``SimplePipeline``.
    * anything else        -> ``None`` (format not supported here).
    """
    if input_format == InputFormat.DOCX:
        return WordFormatOption(pipeline_cls=SimplePipeline)

    if input_format != InputFormat.PDF:
        # Other formats not supported
        return None

    pdf_settings = PdfPipelineOptions(
        do_table_structure=True,
        generate_page_images=True,
        generate_picture_images=True,
        images_scale=IMAGE_RESOLUTION_SCALE,
    )
    return PdfFormatOption(pipeline_options=pdf_settings)

def initialize_converter():
    """Create a ``DocumentConverter`` restricted to PDF and DOCX inputs.

    Returns:
        A ``DocumentConverter`` whose ``allowed_formats`` are PDF and DOCX and
        whose ``format_options`` hold the option object produced by
        ``create_pipeline_options`` for every format that has one.
    """
    allowed_formats = [InputFormat.PDF, InputFormat.DOCX]

    format_options = {}
    for fmt in allowed_formats:
        # Build the option exactly once per format; formats for which
        # create_pipeline_options returns nothing are simply left out.
        option = create_pipeline_options(fmt)
        if option:
            format_options[fmt] = option

    return DocumentConverter(allowed_formats=allowed_formats, format_options=format_options)

def convert_and_save(input_paths, output_dir, image_mode=ImageRefMode.REFERENCED):
    """Convert each input document and write it as ``<stem>.md`` in ``output_dir``.

    Args:
        input_paths: Documents handed to the converter's ``convert_all``.
        output_dir: Destination directory; created (with parents) if missing.
        image_mode: Forwarded to ``save_as_markdown`` to control how images are
            represented in the Markdown output.

    Note:
        The output name is derived from the input file's stem only, so two
        inputs sharing a stem write to the same Markdown path.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    converter = initialize_converter()
    for result in converter.convert_all(input_paths):
        markdown_path = target_dir / (result.input.file.stem + ".md")
        result.document.save_as_markdown(markdown_path, image_mode=image_mode)
        _log.info(f"Markdown content saved to {markdown_path}")

# Inline Markdown image: ![alt text](target). The alt text may be empty; the
# target is everything up to the first closing parenthesis.
_MD_IMAGE_RE = re.compile(r"!\[([^\]]*)\]\(([^)]*)\)")


def _split_markdown_nodes(markdown_content):
    """Split Markdown text into an ordered list of text and image nodes."""
    nodes = []
    pending_lines = []

    def flush_text():
        # Emit the accumulated text as one node, skipping whitespace-only runs.
        text = "\n".join(pending_lines).strip()
        pending_lines.clear()
        if text:
            nodes.append({"index": len(nodes) + 1, "text": text, "image_path": None})

    for line in markdown_content.split("\n"):
        cursor = 0
        for match in _MD_IMAGE_RE.finditer(line):
            # Text preceding an image on the same line belongs to the text
            # block that the image terminates.
            leading = line[cursor:match.start()]
            if leading.strip():
                pending_lines.append(leading)
            flush_text()
            nodes.append({
                "index": len(nodes) + 1,
                "text": match.group(1),
                "image_path": match.group(2),
            })
            cursor = match.end()

        # cursor == 0 means the line held no image, so keep it verbatim (blank
        # lines included). After an image, keep only a non-empty tail.
        remainder = line[cursor:]
        if cursor == 0 or remainder.strip():
            pending_lines.append(remainder)

    flush_text()
    return nodes


def extract_all_nodes_with_image_refs(md_file_path, output_dir):
    """Extract text and image nodes from a Markdown file into a JSON file.

    The file is split into consecutive nodes: every Markdown image
    (``![alt](target)``) becomes a node with ``text`` set to the alt text and
    ``image_path`` set to the target, and each run of text between images
    becomes a node with ``image_path`` set to ``None``. Text sharing a line
    with an image is kept, and several images on one line each get a node.
    Nodes carry a 1-based ``index`` in document order.

    The result is written to ``<output_dir>/<stem>_nodes.json`` as
    ``{"file_name": ..., "nodes": [...]}``.

    Args:
        md_file_path: Path (``str`` or ``Path``) of the Markdown file to read.
        output_dir: Directory for the JSON output; created if missing.

    Returns:
        None. If the file does not exist or is not valid UTF-8, an error is
        printed and no JSON file is written.
    """
    md_file_path = Path(md_file_path)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{md_file_path.stem}_nodes.json"

    try:
        markdown_content = md_file_path.read_text(encoding="utf-8")
    except (UnicodeDecodeError, FileNotFoundError):
        print(f"Error: Could not read file {md_file_path}")
        return

    all_nodes = _split_markdown_nodes(markdown_content)

    # Explicit encoding keeps the output independent of the platform default.
    with output_path.open("w", encoding="utf-8") as fp:
        json.dump({"file_name": md_file_path.name, "nodes": all_nodes}, fp, indent=4)
    print(f"Extracted {len(all_nodes)} nodes from {md_file_path.name} to {output_path}")


In [None]:
def main():
    """Convert the configured documents to Markdown, then extract their nodes.

    The input path and output directory below are placeholders that must be
    filled in before running. After conversion, every ``*.md`` file found in
    the output directory is passed to ``extract_all_nodes_with_image_refs``.
    """
    output_dir = "______"
    input_paths = [Path("_____")]

    # Turn on docling's pipeline timing profiling before converting.
    settings.debug.profile_pipeline_timings = True
    convert_and_save(input_paths, output_dir)

    markdown_files = Path(output_dir).glob("*.md")
    for markdown_file in markdown_files:
        extract_all_nodes_with_image_refs(markdown_file, output_dir)


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()