 ## Import the necessary components

In [70]:
import kfp
import os
import requests

from kfp.dsl import Input, Model, component, Dataset, Output
from kfp.dsl import InputPath, OutputPath, pipeline, component, PipelineTask

## Ingest your files
Create a component that downloads the files and then saves them at a specified path.

In [71]:
@component(
    base_image='python:3.9',
    packages_to_install=["gitpython"],
)
def download_files_from_github(output_dir: Output[Dataset]):
    """Clone the demo repository and copy its sample files into the output artifact.

    Args:
        output_dir: Output dataset artifact. Its path is created as a directory
            and receives a copy of every regular file found directly inside
            the repository's ``examples/rag-ingest/files`` folder
            (sub-directories are not traversed).
    """
    import shutil
    import tempfile
    from pathlib import Path

    import git

    repo_url = "https://github.com/ravikiranvs/kf-demo.git"
    target_subdir = "examples/rag-ingest/files"

    dest_dir = Path(output_dir.path)
    dest_dir.mkdir(parents=True, exist_ok=True)

    # Clone into a fresh temporary directory: no stale checkout to clean up
    # beforehand and nothing left behind once the files have been copied.
    with tempfile.TemporaryDirectory() as repo_path:
        # Only the latest snapshot is needed, so skip the commit history.
        git.Repo.clone_from(repo_url, repo_path, depth=1)

        # Copy the desired files to the output directory
        source_dir = Path(repo_path) / target_subdir
        for file_path in source_dir.iterdir():
            if file_path.is_file():
                shutil.copy(file_path, dest_dir / file_path.name)

## Chunking

In [72]:
@component(
    base_image='downloads.unstructured.io/unstructured-io/unstructured:latest',
)
def process_pdfs_with_unstructured(input_dir: Input[Dataset], output_structured_data: Output[Dataset]):
    """Partition every PDF in the input directory and store the elements as JSON.

    Args:
        input_dir: Dataset artifact whose path is a directory to scan for
            files ending in ``.pdf`` (case-insensitive).
        output_structured_data: Output dataset artifact. Its path is created
            as a directory and receives one ``<pdf stem>.json`` per PDF,
            holding the list of partitioned elements as dictionaries.
    """
    import json
    import os
    from unstructured.partition.pdf import partition_pdf

    source_root = input_dir.path
    target_root = output_structured_data.path
    os.makedirs(target_root, exist_ok=True)

    for entry in os.listdir(source_root):
        # Anything that is not a PDF is ignored.
        if not entry.lower().endswith('.pdf'):
            continue

        parsed_elements = partition_pdf(filename=os.path.join(source_root, entry))

        stem, _extension = os.path.splitext(entry)
        json_path = os.path.join(target_root, f"{stem}.json")
        with open(json_path, 'w') as sink:
            json.dump([element.to_dict() for element in parsed_elements], sink)


## Create Embeddings

In [73]:
@component(
    base_image='huggingface/transformers-pytorch-gpu:latest',
    packages_to_install=["sentence-transformers"],
)
def generate_embeddings(
    input_structured_data: Input[Dataset],
    output_embeddings: Output[Dataset]
):
    """Embed the text of every structured-data JSON file with ``BAAI/llm-embedder``.

    Args:
        input_structured_data: Dataset artifact whose path is a directory of
            ``*.json`` files, each a list of element dictionaries. Elements
            with a missing or empty ``"text"`` value are skipped.
        output_embeddings: Output dataset artifact. Its path is created as a
            directory and receives one ``<stem>_embeddings.json`` per input
            file. Each record holds ``chunk_id`` (position within that file),
            ``text``, ``embedding`` and ``source_file``.
    """
    import os
    import json
    from pathlib import Path
    from sentence_transformers import SentenceTransformer

    # Load the model once and reuse it for every file.
    encoder = SentenceTransformer("BAAI/llm-embedder")

    source_dir = Path(input_structured_data.path)
    target_dir = Path(output_embeddings.path)
    target_dir.mkdir(parents=True, exist_ok=True)

    for json_file in source_dir.glob("*.json"):
        with open(json_file, 'r') as source:
            elements = json.load(source)

        # Keep only the elements that actually carry text.
        texts = [element["text"] for element in elements if element.get("text")]

        vectors = encoder.encode(texts)

        # One record per chunk: the vector plus the metadata needed downstream.
        records = [
            {
                "chunk_id": position,
                "text": chunk_text,
                "embedding": vector.tolist(),
                "source_file": json_file.name,
            }
            for position, (chunk_text, vector) in enumerate(zip(texts, vectors))
        ]

        target_file = target_dir / f"{json_file.stem}_embeddings.json"
        with open(target_file, 'w') as sink:
            json.dump(records, sink, indent=2)


## Insert to Milvus

In [74]:
@component(
    base_image='python:3.9',
    packages_to_install=["pymilvus"],
)
def ingest_embeddings_to_milvus(
    input_embeddings: Input[Dataset],
    milvus_host: str = 'standalone-milvus.milvus.svc.cluster.local',
    milvus_port: int = 19530,
    collection_name: str = 'rag_embeddings'
):
    """Recreate a Milvus collection and load every embeddings file into it.

    Any existing collection named ``collection_name`` is dropped first. After
    the inserts the collection is flushed, an IVF_FLAT/L2 index is built on
    the ``embedding`` field, and the collection is loaded.

    Args:
        input_embeddings: Dataset artifact whose path is a directory of
            ``*_embeddings.json`` files. Each file is a list of records with
            ``text``, ``embedding`` and ``source_file`` keys.
        milvus_host: Hostname of the Milvus server.
        milvus_port: Port of the Milvus server.
        collection_name: Name of the collection to (re)create.
    """
    from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
    import json
    from pathlib import Path

    # Length of the vectors stored in the collection. It has to equal the size
    # of the vectors in the input files -- confirm this if the embedding model
    # used upstream changes.
    embedding_dim = 768

    # Connect to Milvus
    connections.connect(alias="default", host=milvus_host, port=milvus_port)
    print(f"Connected to Milvus at {milvus_host}:{milvus_port}")

    # Start from a clean collection on every run
    if utility.has_collection(collection_name):
        print(f"Collection '{collection_name}' exists. Dropping it.")
        collection = Collection(name=collection_name)
        collection.drop()
        print(f"Collection '{collection_name}' dropped.")

    # Define schema
    fields = [
        FieldSchema(name="chunk_id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=embedding_dim),
        FieldSchema(name="source_file", dtype=DataType.VARCHAR, max_length=65535),
    ]
    schema = CollectionSchema(fields=fields, description="RAG Embeddings Collection")

    # Create collection
    collection = Collection(name=collection_name, schema=schema)
    print(f"Collection '{collection_name}' created.")

    # Insert data
    input_path = Path(input_embeddings.path)
    # The "chunk_id" stored in each file restarts at 0, so using it directly
    # as the primary key would collide as soon as there is more than one file.
    # Hand out collection-wide ids from a running counter instead.
    next_chunk_id = 0
    # Sorted so that the id assignment is reproducible between runs.
    for file in sorted(input_path.glob("*_embeddings.json")):
        with open(file, 'r') as f:
            data = json.load(f)

        if not data:
            print(f"No data found in {file.name}. Skipping.")
            continue

        chunk_ids = list(range(next_chunk_id, next_chunk_id + len(data)))
        next_chunk_id += len(data)
        texts = [item["text"] for item in data]
        embeddings = [item["embedding"] for item in data]
        source_files = [item["source_file"] for item in data]

        # Column-wise payload, in the same order as the schema fields.
        entities = [chunk_ids, texts, embeddings, source_files]
        collection.insert(entities)
        print(f"Inserted {len(chunk_ids)} records from {file.name} into '{collection_name}'.")

    # Flush the collection to ensure data is persisted
    collection.flush()
    print(f"Collection '{collection_name}' flushed.")

    # Create index on the 'embedding' field
    index_params = {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 128}
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    print(f"Index created on 'embedding' field of collection '{collection_name}'.")

    # Load collection into memory
    collection.load()
    print(f"Collection '{collection_name}' is loaded and ready for querying.")


## Create a pipeline
Create a pipeline that combines all the components you defined in the previous sections.

In [75]:
def add_gpu_request(task: PipelineTask) -> PipelineTask:
    """Request one NVIDIA GPU for the container created by ``task``.

    Args:
        task: The pipeline task to configure.

    Returns:
        The value returned by the chained setter calls on ``task``, so the
        call can be used inline when building a pipeline.
    """
    # ``set_accelerator_type`` replaces the deprecated
    # ``add_node_selector_constraint`` alias in the KFP v2 SDK.
    return task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(limit=1)

@pipeline(name='rag-ingest-pipeline')
def rag_ingest_pipeline():
    """RAG ingestion: download the files, parse the PDFs, embed the text, load Milvus."""
    download_task = download_files_from_github()
    parse_task = process_pdfs_with_unstructured(input_dir=download_task.output)

    # Only the embedding step is given a GPU.
    embed_task = generate_embeddings(input_structured_data=parse_task.output)
    embed_task = add_gpu_request(embed_task)

    ingest_embeddings_to_milvus(input_embeddings=embed_task.output)

## Execute the pipeline

In [76]:
# Connect to the Kubeflow Pipelines API using the default client configuration.
client = kfp.Client()

# Make sure the target directory exists so that writing the compiled YAML
# does not fail on a fresh checkout.
os.makedirs('./pipelines-yaml', exist_ok=True)
kfp.compiler.Compiler().compile(rag_ingest_pipeline, './pipelines-yaml/rag_ingest_pipeline.yaml')

# Submit a run with caching disabled so that every step is executed again.
run = client.create_run_from_pipeline_func(rag_ingest_pipeline, arguments={}, enable_caching=False)