In [None]:
!pip install --upgrade pip
!pip install boto3 pandas
!pip install docstring-parser==0.7.3 kfp-pipeline-spec==0.6.0 kfp-server-api==2.1.0 kubernetes==8.0.0 protobuf==4.21.1 requests-toolbelt==0.8.0
!pip install llama-stack
!pip install sentence-transformers
!pip install llama-stack-client==0.1.9
!pip install huggingface_hub==0.14.1
!pip install pymupdf
!pip install numpy
!pip install pdfplumber

## Fetch PDFs from MinIO and store them in pgvector

In [None]:
from kfp.dsl import component, pipeline
from kfp.v2 import compiler
from kfp import Client
from kfp.v2 import compiler
from kfp.dsl import pipeline

@component(
    base_image="python:3.10",
    packages_to_install=[
        "boto3",
        "langchain-community",
        "pdfminer.six",
        "pymupdf",
        "pypdf",
        "tqdm",
        "sentence-transformers",
        "huggingface-hub",
        "llama-stack-client==0.1.9",
        "numpy",
        "pdfplumber"
    ])
def fetch_from_minio_store_pgvector(bucket_name: str, minio_endpoint: str, minio_access_key: str, minio_secret_key: str, llamastack_base_url: str):
    """Download the PDFs of a MinIO bucket, extract their text and insert it into a vector DB.

    Every object in the bucket whose key ends in ``.pdf`` (case-insensitive) is downloaded
    into a temporary directory, its text is extracted page by page with pdfplumber, and one
    ``RAGDocument`` per non-empty PDF is built. The vector DB ``test`` is then registered
    with the ``pgvector`` provider and the documents are inserted through the RAG tool.

    Args:
        bucket_name: Name of the bucket to list and download from.
        minio_endpoint: Endpoint URL passed to the S3 client.
        minio_access_key: Access key passed to the S3 client.
        minio_secret_key: Secret key passed to the S3 client.
        llamastack_base_url: Base URL used to build the ``LlamaStackClient``.

    Raises:
        Exception: Any error raised by the document insert is logged and re-raised so the
            pipeline step is reported as failed instead of silently succeeding.
    """
    # All imports live inside the function: the component body runs on its own in the
    # component container, so it cannot rely on notebook-level imports.
    import os
    import tempfile
    import uuid

    import boto3
    import pdfplumber
    from llama_stack_client import LlamaStackClient, RAGDocument

    s3 = boto3.client(
        "s3",
        endpoint_url=minio_endpoint,
        aws_access_key_id=minio_access_key,
        aws_secret_access_key=minio_secret_key
    )

    rag_documents = []
    # TemporaryDirectory removes the downloaded files when the block exits, even on error.
    with tempfile.TemporaryDirectory() as download_dir:
        paginator = s3.get_paginator("list_objects_v2")
        for listing_page in paginator.paginate(Bucket=bucket_name):
            for obj in listing_page.get("Contents", []):
                key = obj["Key"]
                filename = os.path.basename(key)
                # Skips non-PDF objects and "folder" marker keys ending in "/" (their
                # basename is empty, which previously made the download target a directory).
                if not filename.lower().endswith(".pdf"):
                    continue

                # A unique local name keeps objects that share a basename under different
                # prefixes from overwriting each other.
                local_path = os.path.join(download_dir, f"{uuid.uuid4().hex}.pdf")
                s3.download_file(bucket_name, key, local_path)

                with pdfplumber.open(local_path) as pdf:
                    page_texts = [pdf_page.extract_text() for pdf_page in pdf.pages]
                # Join with a newline so the last word of one page is not fused with the
                # first word of the next; pages without extractable text are dropped.
                full_text = "\n".join(text for text in page_texts if text)
                # Drop characters that cannot be encoded as UTF-8 as well as NUL bytes.
                full_text = full_text.encode("utf-8", "ignore").decode("utf-8").replace("\x00", "")
                if not full_text.strip():
                    continue

                rag_documents.append(
                    RAGDocument(
                        # uuid4 instead of a 4-digit random number: the previous ~9000-value
                        # ID space made collisions between documents likely.
                        document_id=f"pdf-{uuid.uuid4().hex}",
                        content=full_text,
                        # NOTE(review): content is already-extracted plain text while the
                        # mime type says PDF - confirm this is what the RAG tool expects.
                        mime_type="application/pdf",
                        metadata={"source": "rag-pipeline", "filename": filename}
                    )
                )

    client = LlamaStackClient(base_url=llamastack_base_url)
    print("Registering db")
    client.vector_dbs.register(
        vector_db_id="test",
        embedding_model="all-MiniLM-L6-v2",
        embedding_dimension=384,
        provider_id="pgvector",
    )

    if not rag_documents:
        # Nothing to embed; avoid calling the insert endpoint with an empty payload.
        print(f"No PDF text found in bucket '{bucket_name}'; nothing to insert")
        return

    try:
        client.tool_runtime.rag_tool.insert(
            documents=rag_documents,
            vector_db_id="test",
            chunk_size_in_tokens=1000,
        )
    except Exception as e:
        print("Embedding insert failed:", e)
        # Re-raise so the step fails visibly instead of reporting success with no data stored.
        raise


@pipeline(name="fetch-and-store-pipeline")
def full_pipeline():
    # Single-step pipeline: runs fetch_from_minio_store_pgvector against the "llama" bucket,
    # taking the MinIO and Llama Stack connection settings from environment variables.
    #
    # NOTE(review): these lookups run wherever this function body is executed (the process
    # that builds the pipeline), not inside the component container - so the values,
    # credentials included, presumably end up as literals in the compiled pipeline.
    # Confirm, and consider injecting the secrets at runtime instead.
    import os

    env = os.environ
    # A missing variable raises KeyError here, before the component call is made.
    endpoint = env["MINIO_ENDPOINT"]
    access_key = env["MINIO_ACCESS_KEY"]
    secret_key = env["MINIO_SECRET_KEY"]
    base_url = env["LLAMASTACK_BASE_URL"]

    fetch_from_minio_store_pgvector(
        bucket_name="llama",
        minio_endpoint=endpoint,
        minio_access_key=access_key,
        minio_secret_key=secret_key,
        llamastack_base_url=base_url,
    )

# 1. Compile pipeline to a file
pipeline_yaml = "fetch_chunk_embed_pipeline.yaml"
compiler.Compiler().compile(
    pipeline_func=full_pipeline,
    package_path=pipeline_yaml
)

# 2. Connect to KFP
# NOTE(review): verify_ssl=False turns off TLS certificate verification for this client.
# Prefer pointing the client at the cluster CA bundle if one is available.
client = Client(
    host="https://ds-pipeline-dspa.llama-stack-rag-2.svc.cluster.local:8888",
    namespace="llama-stack-rag-2",
    verify_ssl=False
)

# 3. Upload pipeline
# Uploading under a name that already exists is rejected by the server, which made this
# cell fail on every run after the first. Look the pipeline up first and reuse it if present.
pipeline_name = "fetch-and-store-pipeline"
existing_pipeline_id = client.get_pipeline_id(pipeline_name)
if existing_pipeline_id is None:
    uploaded_pipeline = client.upload_pipeline(
        pipeline_package_path=pipeline_yaml,
        pipeline_name=pipeline_name
    )
else:
    print(f"Pipeline '{pipeline_name}' already exists (ID: {existing_pipeline_id}); skipping upload")
    uploaded_pipeline = client.get_pipeline(existing_pipeline_id)

# 4. Run the pipeline
# The run is created from the freshly compiled package, so it always reflects the current
# pipeline definition regardless of what is stored under the uploaded pipeline.
run = client.create_run_from_pipeline_package(
    pipeline_file=pipeline_yaml,
    arguments={},
    run_name="fetch-store-run"
)

print(f"Pipeline submitted! Run ID: {run.run_id}")


Pipeline submitted! Run ID: 751e0d06-cd93-4e71-99e0-1ea3d04cda09
