In [None]:
!pip install --upgrade kfp[kubernetes]

In [None]:
from kfp          import dsl, kubernetes
from kfp.client   import Client
from kfp.compiler import Compiler
from kfp.dsl      import component, pipeline

In [None]:
# Container images handed to the @component decorators as base_image.
# NOTE(review): the angle-bracket values look like placeholders to be filled in
# before compiling - confirm. TASK_IMAGE_LANGCHAIN is not referenced in the
# cells visible here.
TASK_BASE_IMAGE = 'registry.access.redhat.com/ubi9/python-311'
TASK_IMAGE_BOTO3 = '<task_image_boto3>'
TASK_IMAGE_DOCLING = '<task_image_docling>'
TASK_IMAGE_LANGCHAIN = '<task_image_langchain>'

In [None]:
@component(
    base_image = TASK_IMAGE_BOTO3
)
def download_json(
    s3_endpoint_url      : str,
    s3_access_key_id     : str,
    s3_secret_access_key : str,
    s3_region_name       : str,
    s3_bucket            : str,
    s3_filename          : str,
    pvc_filename         : str
):
    """Download one object from an S3 endpoint to a local file.

    Args:
        s3_endpoint_url: Endpoint URL given to the boto3 S3 client.
        s3_access_key_id: Access key id given to the boto3 S3 client.
        s3_secret_access_key: Secret access key given to the boto3 S3 client.
        s3_region_name: Region name given to the boto3 S3 client.
        s3_bucket: Bucket that holds the object.
        s3_filename: Key of the object to download.
        pvc_filename: Local destination path; its parent directory is created
            when it does not exist yet.
    """

    # Imports are kept inside the function, as in the original component.
    from boto3   import client
    from os      import makedirs
    from os.path import dirname

    # A bare file name has no directory part: dirname() returns '' and
    # makedirs('') raises FileNotFoundError, so only create a real parent.
    target_directory = dirname(pvc_filename)

    if target_directory:

        makedirs(target_directory, exist_ok = True)

    s3_client = client(
        service_name          = 's3',
        endpoint_url          = s3_endpoint_url,
        aws_access_key_id     = s3_access_key_id,
        aws_secret_access_key = s3_secret_access_key,
        region_name           = s3_region_name
    )

    s3_client.download_file(s3_bucket, s3_filename, pvc_filename)

In [None]:
@component(
    base_image = TASK_IMAGE_DOCLING
)
def extract_content(
    pvc_filename : str
):
    """Convert the PDF attachments embedded in a JSON file into one Markdown file.

    Loads the JSON document at ``pvc_filename`` and walks every entry of
    ``PRODETALHEs`` and, inside it, every entry of ``ANEXOs``. Each selected
    attachment is base64-decoded into a PDF written next to the JSON file,
    converted to Markdown with docling, and the concatenated Markdown is
    written to the input path with its extension replaced by ``.md``.

    Args:
        pvc_filename: Path of the JSON file to process.
    """

    # Imports are kept inside the function, as in the original component.
    from base64                     import b64decode
    from docling.document_converter import DocumentConverter
    from json                       import load
    from os.path                    import basename, dirname, join, splitext

    # Attachments whose idTipoDoc equals this value are skipped.
    # NOTE(review): what document type 437 stands for is not visible here - confirm.
    skipped_doc_type = 437

    # Input path without its extension: the prefix of every file written below.
    output_prefix   = join(dirname(pvc_filename), splitext(basename(pvc_filename))[0])
    result_filename = output_prefix + '.md'

    # Binary mode lets the json module detect the encoding itself instead of
    # depending on the locale of the container.
    with open(pvc_filename, 'rb') as json_file:

        data = load(json_file)

    sections  = []
    converter = None

    for index0, detalhe in enumerate(data['PRODETALHEs'], start = 1):

        for index1, anexo in enumerate(detalhe['ANEXOs'], start = 1):

            if anexo['extArquivo'] != '.pdf':

                continue

            if anexo['idTipoDoc'] == skipped_doc_type:

                continue

            filename = f"{output_prefix}_{index0}_{index1}_{anexo['idTipoDoc']}.pdf"
            print(f'Processando arquivo {filename}')

            with open(filename, 'wb') as pdf_file:

                pdf_file.write(b64decode(anexo['CONTEUDO']))

            # Build the converter once, and only when there is a PDF to
            # convert, instead of once per attachment.
            if converter is None:

                converter = DocumentConverter()

            sections.append(converter.convert(filename).document.export_to_markdown() + '\n\n')

    result = ''.join(sections)

    print(f'Resultado:\n\n{result}')

    # Explicit UTF-8 so non-ASCII text is written the same way in any locale.
    with open(result_filename, 'w', encoding = 'utf-8') as markdown_file:

        markdown_file.write(result)

In [None]:
@component(base_image = TASK_BASE_IMAGE)
def generate_summary():
    """Placeholder task: takes no input and does nothing."""
    return

In [None]:
@component(base_image = TASK_BASE_IMAGE)
def similarity_search():
    """Placeholder task: takes no input and does nothing."""
    return

In [None]:
@component(base_image = TASK_BASE_IMAGE)
def generate_output():
    """Placeholder task: takes no input and does nothing."""
    return

In [None]:
# Name and description given to the pipeline decorator.
PIPELINE_NAME = 'Gerar Parecer'
PIPELINE_DESCRIPTION = 'Utilizando IA para gerar parecer jurídico de infrações ambientais'

# File the compiled pipeline is written to and later submitted from.
PIPELINE_YAML = 'pipeline.yaml'

# NOTE(review): looks like a placeholder; it is not referenced in the cells
# visible here - confirm whether it is still needed.
PIPELINE_PVC_STORAGE_CLASS = '<pipeline_pvc_storage_class>'

In [None]:
# The decorator is referenced through the dsl module on purpose: the function
# below is itself named `pipeline`, so once this cell has run the bare name no
# longer points at the decorator and re-running the cell would fail.
@dsl.pipeline(
    name        = PIPELINE_NAME,
    description = PIPELINE_DESCRIPTION
)
def pipeline(
    s3_endpoint_url      : str,
    s3_access_key_id     : str,
    s3_secret_access_key : str,
    s3_region_name       : str,
    s3_bucket            : str,
    s3_filename          : str,
    pvc_filename         : str
):
    """Chain the five tasks: download, extract, summarize, search, output.

    Every task mounts the same PVC at the same path, has caching disabled and
    runs after the previous one.

    NOTE(review): the S3 credentials travel as plain pipeline parameters;
    consider supplying them from a Kubernetes secret instead - confirm with
    the cluster owners.

    Args:
        s3_endpoint_url: Forwarded to the download task.
        s3_access_key_id: Forwarded to the download task.
        s3_secret_access_key: Forwarded to the download task.
        s3_region_name: Forwarded to the download task.
        s3_bucket: Forwarded to the download task.
        s3_filename: Forwarded to the download task.
        pvc_filename: Forwarded to the download and extraction tasks.
    """

    from os.path import join

    # PVC
    # The claim is only mounted here, never created; presumably it has to
    # exist in the cluster beforehand - confirm.

    pvc_name      = 'demo-pipeline'
    pvc_directory = join('/', 'pipeline')

    def configure_task(task, display_name, previous_task = None):
        """Mount the PVC, set the display name, disable caching and order the task."""

        kubernetes.mount_pvc(
            task       = task,
            pvc_name   = pvc_name,
            mount_path = pvc_directory
        )

        task.set_display_name(display_name)
        task.set_caching_options(False)

        if previous_task is not None:

            task.after(previous_task)

        return task

    # Task Download JSON

    download_json_task = configure_task(
        task         = download_json(
            s3_endpoint_url      = s3_endpoint_url,
            s3_access_key_id     = s3_access_key_id,
            s3_secret_access_key = s3_secret_access_key,
            s3_region_name       = s3_region_name,
            s3_bucket            = s3_bucket,
            s3_filename          = s3_filename,
            pvc_filename         = pvc_filename
        ),
        display_name = 'Carregando JSON'
    )

    # Task Extract Content

    extract_content_task = configure_task(
        task          = extract_content(
            pvc_filename = pvc_filename
        ),
        display_name  = 'Extraindo Conteúdo',
        previous_task = download_json_task
    )

    # Task Generate Summary

    generate_summary_task = configure_task(
        task          = generate_summary(),
        display_name  = 'Criando Resumo',
        previous_task = extract_content_task
    )

    # Task Similarity Search

    similarity_search_task = configure_task(
        task          = similarity_search(),
        display_name  = 'Busca por Similaridade',
        previous_task = generate_summary_task
    )

    # Task Generate Output

    generate_output_task = configure_task(
        task          = generate_output(),
        display_name  = 'Criando Parecer',
        previous_task = similarity_search_task
    )

In [None]:
# Compile the pipeline function into the package file named by PIPELINE_YAML.
Compiler().compile(
    pipeline_func = pipeline,
    package_path  = PIPELINE_YAML
)

In [None]:
# Host given to the Kubeflow Pipelines client.
# NOTE(review): looks like a placeholder to be replaced before running - confirm.
KUBEFLOW_HOST = '<kubeflow_host>'

# Arguments of the run, one entry per pipeline parameter.
# NOTE(review): avoid committing real credentials in place of these
# placeholders; consider reading them from the environment instead.
PIPELINE_ARGUMENTS = dict(
    s3_endpoint_url = '<s3_endpoint_url>',
    s3_access_key_id = '<s3_access_key_id>',
    s3_secret_access_key = '<s3_secret_access_key>',
    s3_region_name = '<s3_region_name>',
    s3_bucket = '<s3_bucket>',
    s3_filename = '<s3_filename>',
    pvc_filename = '<pvc_filename>',
)

In [None]:
# Submit a run of the compiled package; kept as a single expression so the
# returned run object stays the output of the cell.
Client(host=KUBEFLOW_HOST).create_run_from_pipeline_package(
    pipeline_file=PIPELINE_YAML, arguments=PIPELINE_ARGUMENTS,
)