In [None]:
!pip install --upgrade pip
!pip install boto3
!pip install kfp
!pip install llama-stack-client==0.2.17
!pip install fire
!pip install requests
!pip install docling
!pip install docling-core

## Ingestion Pipeline
This pipeline executes the following steps:
- Fetches PDF documents from MinIO
- Converts and chunks them with Docling
- Embeds the chunks and stores them in a PostgreSQL vector database (pgvector) through Llama Stack

In [None]:
from kfp.dsl import component, pipeline
from kfp.v2 import compiler
from kfp import Client
from kfp.v2 import compiler
from kfp.dsl import pipeline

@component(
    base_image="python:3.10",
    packages_to_install=[
        "boto3",
        # NOTE(review): the notebook's install cell pins
        # llama-stack-client==0.2.17 while this component pins 0.2.9 --
        # confirm which version matches the Llama Stack server.
        "llama-stack-client==0.2.9",
        "fire",
        "requests",
        "docling",
        "docling-core"
    ])
def fetch_from_minio_docling_process_store(bucket_name: str, minio_endpoint: str, minio_access_key: str, minio_secret_key: str, llamastack_base_url: str):
    """Download PDFs from a MinIO bucket, chunk them with docling and store them.

    The component runs three steps:

    1. Download every object of ``bucket_name`` into a temporary directory.
    2. Convert each PDF with docling and split it with ``HybridChunker``;
       chunks containing at least one TEXT or PARAGRAPH item become
       Llama Stack documents tagged with the source file name.
    3. Register the ``"test"`` vector DB (best effort) and insert the
       documents through the Llama Stack RAG tool.

    Everything the component needs is imported or defined inside this
    function body so that it stays self-contained.

    Args:
        bucket_name: Name of the bucket to read from.
        minio_endpoint: URL of the S3-compatible MinIO endpoint.
        minio_access_key: Access key used to authenticate against MinIO.
        minio_secret_key: Secret key used to authenticate against MinIO.
        llamastack_base_url: Base URL of the Llama Stack server.

    Raises:
        Exception: Whatever the download step raises, or whatever the Llama
            Stack client raises when inserting the documents fails. A failed
            insert is re-raised so that the pipeline task is reported as
            failed instead of silently succeeding with nothing stored.
    """
    import os
    import boto3
    import tempfile
    from llama_stack_client import LlamaStackClient
    from llama_stack_client.types import Document as LlamaStackDocument

    # Import docling libraries
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import PdfPipelineOptions
    from docling_core.transforms.chunker.hybrid_chunker import HybridChunker
    from docling_core.types.doc.labels import DocItemLabel

    vector_db_id = "test"
    # A chunk is kept when at least one of its items carries one of these labels.
    text_labels = (DocItemLabel.TEXT, DocItemLabel.PARAGRAPH)
    llama_documents = []

    # The temporary directory (and every downloaded file) is removed as soon
    # as chunking is done; only the in-memory chunks are needed afterwards.
    with tempfile.TemporaryDirectory() as temp_dir:
        # Step 1: Download files from MinIO
        download_dir = os.path.join(temp_dir, "source_repo")
        os.makedirs(download_dir, exist_ok=True)

        # Connect to MinIO
        print(f"Connecting to MinIO at {minio_endpoint}")
        s3 = boto3.client(
            "s3",
            endpoint_url=minio_endpoint,
            aws_access_key_id=minio_access_key,
            aws_secret_access_key=minio_secret_key,
            # NOTE(review): TLS certificate verification is disabled here;
            # confirm this is acceptable for the target environment.
            verify=False
        )

        # List and download objects
        paginator = s3.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=bucket_name)

        print(f"Downloading files from bucket: {bucket_name}")
        downloaded_files = []
        for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                file_name = os.path.basename(key)
                # Keys ending in "/" (directory markers) have no file name and
                # cannot be written to disk as a file.
                if file_name in ("", ".", ".."):
                    print(f"Skipping non-file object: {key}")
                    continue

                # One sub-directory per object: objects that share a base name
                # under different prefixes must not overwrite each other.
                object_dir = os.path.join(download_dir, str(len(downloaded_files)))
                os.makedirs(object_dir, exist_ok=True)
                file_path = os.path.join(object_dir, file_name)

                print(f"Downloading: {key} -> {file_path}")
                s3.download_file(bucket_name, key, file_path)
                downloaded_files.append(file_path)

        print(f"Downloaded {len(downloaded_files)} files to {download_dir}")

        # Step 2: Process the PDFs with docling
        # Setup docling components
        pipeline_options = PdfPipelineOptions()
        pipeline_options.generate_picture_images = True
        converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )
        chunker = HybridChunker()

        # Process each file with docling (chunking)
        for file_path in downloaded_files:
            # Case-insensitive so that "REPORT.PDF" is not skipped.
            if not file_path.lower().endswith(".pdf"):
                print(f"Skipping non-PDF file: {file_path}")
                continue

            print(f"Processing {file_path} with docling...")
            try:
                docling_doc = converter.convert(source=file_path).document
                chunk_count = 0

                for chunk in chunker.chunk(docling_doc):
                    if not any(
                        item.label in text_labels
                        for item in chunk.meta.doc_items
                    ):
                        continue

                    chunk_count += 1
                    llama_documents.append(
                        LlamaStackDocument(
                            # Ids are numbered consecutively across all files.
                            document_id=f"doc-{len(llama_documents) + 1}",
                            content=chunk.text,
                            mime_type="text/plain",
                            metadata={"source": file_name_of(file_path) if False else os.path.basename(file_path)},
                        )
                    )

                print(f"Created {chunk_count} chunks from {file_path}")

            except Exception as e:
                # Best effort per file: one unreadable PDF must not abort the
                # ingestion of the remaining documents.
                print(f"Error processing {file_path}: {e}")

    total_chunks = len(llama_documents)
    print(f"Total valid chunks prepared: {total_chunks}")

    # Step 3: Register vector database and store chunks with embeddings
    client = LlamaStackClient(base_url=llamastack_base_url)
    print("Registering db")
    try:
        client.vector_dbs.register(
            vector_db_id=vector_db_id,
            embedding_model="all-MiniLM-L6-v2",
            embedding_dimension=384,
            provider_id="pgvector",
        )
        print("Vector DB registered successfully")
    except Exception as e:
        # Registration stays best effort: insertion is attempted regardless.
        print(f"Failed to register vector DB: {e}")
        print("Continuing with insertion...")

    if not llama_documents:
        print("No chunks to insert; skipping vector DB insertion")
        return

    try:
        print(f"Inserting {total_chunks} chunks into vector database")
        client.tool_runtime.rag_tool.insert(
            documents=llama_documents,
            vector_db_id=vector_db_id,
            chunk_size_in_tokens=512,
        )
        print("Documents successfully inserted into the vector DB")
    except Exception as e:
        print("Embedding insert failed:", e)
        # Storing the chunks is the purpose of this task; surface the failure
        # so the pipeline run is not reported as successful.
        raise


@pipeline(name="fetch-docling-process-store-pipeline")
def full_pipeline():
    """Single-task pipeline that ingests the ``llama`` bucket.

    The MinIO endpoint and credentials and the Llama Stack base URL are read
    from the environment variables ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY``,
    ``MINIO_SECRET_KEY`` and ``LLAMASTACK_BASE_URL``; a missing variable
    raises ``KeyError``.

    NOTE(review): the variables are read when this function body executes,
    which presumably happens in the environment that compiles the pipeline
    rather than in the running task -- confirm how the secret values end up
    in the compiled package.
    """
    import os

    env = os.environ
    # Looked up in the same order as the task arguments below.
    endpoint = env["MINIO_ENDPOINT"]
    access_key = env["MINIO_ACCESS_KEY"]
    secret_key = env["MINIO_SECRET_KEY"]
    base_url = env["LLAMASTACK_BASE_URL"]

    fetch_from_minio_docling_process_store(
        bucket_name="llama",
        minio_endpoint=endpoint,
        minio_access_key=access_key,
        minio_secret_key=secret_key,
        llamastack_base_url=base_url,
    )

# Driver script: compile the pipeline, upload it and start one run.
# The steps below depend on each other and must run in this order.

# 1. Compile pipeline to a file
# The package written here is reused by both the upload and the run below.
pipeline_yaml = "fetch_docling_process_pipeline.yaml"
compiler.Compiler().compile(
    pipeline_func=full_pipeline,
    package_path=pipeline_yaml
)

# 2. Connect to KFP
# NOTE(review): verify_ssl=False turns off TLS certificate verification for
# the pipeline server connection -- confirm this is intended.
client = Client(
    host="https://ds-pipeline-dspa:8888",
    verify_ssl=False
)

# 3. Upload pipeline
# `uploaded_pipeline` is not referenced again in the code shown here.
# NOTE(review): re-running this cell may fail if a pipeline with this name
# is already registered on the server -- confirm.
uploaded_pipeline = client.upload_pipeline(
    pipeline_package_path=pipeline_yaml,
    pipeline_name="fetch-docling-process-store-pipeline"
)

# 4. Run the pipeline
# The run is created from the local package file, not from the pipeline
# uploaded in step 3; no run-time arguments are passed.
run = client.create_run_from_pipeline_package(
    pipeline_file=pipeline_yaml,
    arguments={},
    run_name="fetch-docling-process-store-run"
)

print(f"Pipeline submitted! Run ID: {run.run_id}")


Pipeline submitted! Run ID: 751e0d06-cd93-4e71-99e0-1ea3d04cda09
