### **irpf.ipynb** ###
### **Pipeline for converting an IRPF PDF to XML** ###

* ##### 01 - Install packages
* ##### 02 - Import packages
* ##### 03 - Create tasks
* ##### 04 - Create pipeline
* ##### 05 - Create pipeline yaml
* ##### 06 - Create pipeline run

### 01 - Install packages

In [None]:
!pip install --upgrade kfp[kubernetes]

### 02 - Import packages

In [None]:
import os
import sys
sys.path.append(os.path.dirname(os.getcwd()))

import kfp
import kfp.kubernetes as kubernetes

from components.download_document import download_document
from components.extract_text      import extract_text
from components.index_document    import index_document
from components.llm               import llm
from components.remove_watermark  import remove_watermark
from components.upload_document   import upload_document

### 03 - Create tasks

In [None]:
# Base images for the pipeline components, listed in the order the pipeline
# uses them. The values look like placeholders to be replaced with real image
# references before the pipeline is compiled.
image_boto3 = '<image_boto3>'          # download_document / upload_document
image_fitz = '<image_fitz>'            # remove_watermark
image_tesseract = '<image_tesseract>'  # extract_text
image_milvus = '<image_milvus>'        # index_document
image_llm = '<image_llm>'              # llm

In [None]:
# Wrap download_document as a KFP component built on image_boto3.
download_document_op = kfp.dsl.component(
    download_document,
    base_image = image_boto3
)

In [None]:
# Wrap remove_watermark as a KFP component built on image_fitz.
remove_watermark_op = kfp.dsl.component(
    remove_watermark,
    base_image = image_fitz
)

In [None]:
# Wrap extract_text as a KFP component built on image_tesseract.
extract_text_op = kfp.dsl.component(
    extract_text,
    base_image = image_tesseract
)

In [None]:
# Wrap index_document as a KFP component built on image_milvus.
index_document_op = kfp.dsl.component(
    index_document,
    base_image = image_milvus
)

In [None]:
# Wrap llm as a KFP component built on image_llm.
llm_op = kfp.dsl.component(
    llm,
    base_image = image_llm
)

In [None]:
# Wrap upload_document as a KFP component built on image_boto3
# (the same base image as the download component).
upload_document_op = kfp.dsl.component(
    upload_document,
    base_image = image_boto3
)

### 04 - Create pipeline

In [None]:
# Description and name handed to the @kfp.dsl.pipeline decorator below.
# pipeline_name is also reused for the PVC mount directory and for the
# file name of the compiled YAML package.
pipeline_description = 'Convert IRPF .pdf to .xml'
pipeline_name = 'irpf'

In [None]:
@kfp.dsl.pipeline(
    name        = pipeline_name,
    description = pipeline_description
)
def pipeline(
    s3_service_name      : str,
    s3_endpoint_url      : str,
    s3_access_key_id     : str,
    s3_secret_access_key : str,
    s3_region            : str,
    s3_bucket            : str,
    s3_filename          : str,
    milvus_uri           : str,
    milvus_username      : str,
    milvus_password      : str,
    milvus_collection    : str,
    inference_server     : str,
    model_name           : str,
    remove_watermark     : bool,
    storage_class_name   : str
):
    """Define the IRPF pipeline graph.

    The tasks run strictly one after another and share a single PVC:
    create PVC, download document, remove watermark, extract text,
    index document, LLM, upload document, delete PVC.

    Args:
        s3_service_name: Forwarded to the download and upload components.
        s3_endpoint_url: Forwarded to the download and upload components.
        s3_access_key_id: Forwarded to the download and upload components.
        s3_secret_access_key: Forwarded to the download and upload components.
        s3_region: Forwarded to the download and upload components.
        s3_bucket: Forwarded to the download and upload components.
        s3_filename: File name given to the download component; every later
            component receives the same value as its pvc_filename.
        milvus_uri: Forwarded to the index document and LLM components.
        milvus_username: Forwarded to the index document and LLM components.
        milvus_password: Forwarded to the index document and LLM components.
        milvus_collection: Forwarded to the index document and LLM components.
        inference_server: Forwarded to the LLM component.
        model_name: Forwarded to the LLM component.
        remove_watermark: Flag forwarded to the remove watermark component.
        storage_class_name: Storage class of the PVC shared by all tasks.
    """

    # NOTE(review): the S3 and Milvus credentials are ordinary pipeline
    # parameters here; presumably they end up stored with the run arguments.
    # Consider supplying them through Kubernetes secrets - confirm with the
    # platform owners.

    # NOTE: inside this function `remove_watermark` is the bool parameter,
    # not the component function of the same name imported by the notebook.

    # Imports

    # posixpath instead of os.path: the directory is used as the mount path
    # inside the task pods, so it must use '/' separators even when the
    # notebook is compiled on Windows.
    import posixpath

    # Create PVC task

    create_pvc_task = kubernetes.CreatePVC(
        pvc_name_suffix    = '-pipeline-pvc',
        size               = '1Gi',
        access_modes       = ['ReadWriteOnce'],
        storage_class_name = storage_class_name
    )

    pvc_directory = posixpath.join('/', 'pipeline', pipeline_name)
    pvc_name      = create_pvc_task.outputs['name']

    def _run_on_pvc_after(task, upstream_task):
        """Mount the shared PVC on task, disable caching, run it after upstream_task."""

        kubernetes.mount_pvc(
            task       = task,
            pvc_name   = pvc_name,
            mount_path = pvc_directory
        )

        task.set_caching_options(False)
        task.after(upstream_task)

        return task

    # Download document task

    download_document_task = _run_on_pvc_after(
        task = download_document_op(
            s3_service_name      = s3_service_name,
            s3_endpoint_url      = s3_endpoint_url,
            s3_access_key_id     = s3_access_key_id,
            s3_secret_access_key = s3_secret_access_key,
            s3_region            = s3_region,
            s3_bucket            = s3_bucket,
            s3_filename          = s3_filename,
            pvc_directory        = pvc_directory
        ),
        upstream_task = create_pvc_task
    )

    # Remove watermark task

    remove_watermark_task = _run_on_pvc_after(
        task = remove_watermark_op(
            pvc_directory    = pvc_directory,
            pvc_filename     = s3_filename,
            remove_watermark = remove_watermark
        ),
        upstream_task = download_document_task
    )

    # Extract text task

    extract_text_task = _run_on_pvc_after(
        task = extract_text_op(
            pvc_directory = pvc_directory,
            pvc_filename  = s3_filename
        ),
        upstream_task = remove_watermark_task
    )

    # Index document task

    index_document_task = _run_on_pvc_after(
        task = index_document_op(
            milvus_uri        = milvus_uri,
            milvus_username   = milvus_username,
            milvus_password   = milvus_password,
            milvus_collection = milvus_collection,
            pvc_directory     = pvc_directory,
            pvc_filename      = s3_filename
        ),
        upstream_task = extract_text_task
    )

    # LLM task

    llm_task = _run_on_pvc_after(
        task = llm_op(
            milvus_uri        = milvus_uri,
            milvus_username   = milvus_username,
            milvus_password   = milvus_password,
            milvus_collection = milvus_collection,
            inference_server  = inference_server,
            model_name        = model_name,
            pvc_directory     = pvc_directory,
            pvc_filename      = s3_filename
        ),
        upstream_task = index_document_task
    )

    # Upload document task

    upload_document_task = _run_on_pvc_after(
        task = upload_document_op(
            s3_service_name      = s3_service_name,
            s3_endpoint_url      = s3_endpoint_url,
            s3_access_key_id     = s3_access_key_id,
            s3_secret_access_key = s3_secret_access_key,
            s3_region            = s3_region,
            s3_bucket            = s3_bucket,
            pvc_directory        = pvc_directory,
            pvc_filename         = s3_filename
        ),
        upstream_task = llm_task
    )

    # Delete PVC task

    # NOTE(review): the PVC is deleted only after the upload task; if an
    # earlier task fails the PVC is presumably left behind - confirm whether
    # cleanup on failure is required.
    delete_pvc_task = kubernetes.DeletePVC(pvc_name = pvc_name)
    delete_pvc_task.after(upload_document_task)

### 05 - Create pipeline yaml

In [None]:
# Location of the compiled pipeline package: yaml/<pipeline_name>.yaml,
# relative to the notebook's working directory.
pipeline_package_file = f'{ pipeline_name }.yaml'
pipeline_package_path = os.path.join('yaml', pipeline_package_file)

In [None]:
# Create the output directory first so the compilation does not depend on the
# 'yaml' directory already existing. The guard skips the call when the package
# path has no directory part, because os.makedirs('') raises.
pipeline_package_directory = os.path.dirname(pipeline_package_path)

if pipeline_package_directory:
    os.makedirs(pipeline_package_directory, exist_ok = True)

# Compile the pipeline function into the YAML package at pipeline_package_path.
kfp.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path  = pipeline_package_path
)

### 06 - Create pipeline run

In [None]:
# Host passed to kfp.client.Client when the run is created. The value looks
# like a placeholder to be replaced with the real address.
kubeflow_host = "<kubeflow_host>"

In [None]:
# Run arguments, one entry per parameter of the pipeline function. Apart from
# 's3' and True, the values look like placeholders to be filled in.
# NOTE(review): real credentials would be stored in the notebook in clear
# text here - consider loading them from the environment instead.
pipeline_arguments = dict(
    # S3 access
    s3_service_name = 's3',
    s3_endpoint_url = '<s3_endpoint_url>',
    s3_access_key_id = '<s3_access_key_id>',
    s3_secret_access_key = '<s3_secret_access_key>',
    s3_region = '<s3_region>',
    s3_bucket = '<s3_bucket>',
    s3_filename = '<s3_filename>',
    # Milvus access
    milvus_uri = '<milvus_uri>',
    milvus_username = '<milvus_username>',
    milvus_password = '<milvus_password>',
    milvus_collection = '<milvus_collection>',
    # LLM inference
    inference_server = '<inference_server>',
    model_name = '<model_name>',
    # Processing options
    remove_watermark = True,
    storage_class_name = '<storage_class_name>',
)

In [None]:
# Connect to the Kubeflow Pipelines host, then submit the compiled package as
# a new run. The call stays the last expression of the cell so the notebook
# still displays its result.
kfp_client = kfp.client.Client(host = kubeflow_host)

kfp_client.create_run_from_pipeline_package(
    pipeline_file = pipeline_package_path,
    arguments     = pipeline_arguments
)