In [1]:
import sys

In [2]:
# !{sys.executable} -m pip install --upgrade --user kfp==1.8.22

In [3]:
import kfp
from kfp import dsl
from functools import partial
from kfp.dsl import (
    pipeline,
    ContainerOp
)
from kfp.components import (
    InputPath,
    OutputPath,
    create_component_from_func
)
# Experiment under which runs are grouped in the Kubeflow web UI, and its description.
EXPERIMENT_NAME = 'llm'
EXPERIMENT_DESC = 'llm experiment'

# Kubeflow Pipelines API client; the user's namespace is reused later when submitting the run.
client = kfp.Client()
NAMESPACE = client.get_user_namespace()
print(NAMESPACE)

kubeflow-kindfor


In [4]:
from dataclasses import dataclass

@dataclass
class Settings:
    """Container images and pinned package versions used to build the pipeline components."""

    # Image tagged as the CUDA "devel" build of PyTorch 2.2.0.
    llm_base_image: str = 'pytorch/pytorch:2.2.0-cuda11.8-cudnn8-devel'
    # Image for the S3 components. A PyTorch runtime image is used instead of
    # 'python:3.10.13-slim-bullseye' to speed up the pip install step, because
    # applyllm pulls in many dependencies.
    # TODO: separate applyllm-io and applyllm into distinct packages
    s3_base_image: str = 'pytorch/pytorch:2.2.0-cuda11.8-cudnn8-runtime'

    # Package versions, interpolated into "name==version" requirement strings.
    applyllm_version: str = '0.0.3'
    pypdf_version: str = '3.15.5'
    accelerate_version: str = '0.26.1'
    unstructured_version: str = '0.11.0'
    sentence_transformers_version: str = '2.2.2'
    docarray_version: str = '0.39.1'
    pydantic_version: str = '1.10.13'
    boto3_version: str = '1.34.14'
    pandas_version: str = '2.2.1'
    tabula_py_version: str = '2.9.0'


# Shared instance read by the component definitions below.
settings = Settings()

In [5]:
import os
# Directory that receives the generated component YAML files.
PIPELINE_PATH_DIR = "./compiled"
# exist_ok=True keeps the cell idempotent on re-run and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(PIPELINE_PATH_DIR, exist_ok=True)

## Check for Tabula-detectable tables in S3 PDF files

In [6]:
from functools import partial

@partial(
    create_component_from_func,
    output_component_file=f"{PIPELINE_PATH_DIR}/s3_pdf_tabula_check_component.yaml",
    base_image=settings.s3_base_image,
    packages_to_install=[
        f"applyllm=={settings.applyllm_version}",
        f"boto3=={settings.boto3_version}",
        f"pandas=={settings.pandas_version}",
        f"tabula-py=={settings.tabula_py_version}"
    ],
)
def file_comp(
        bucket_name: str,
        verify_host: bool,
        file_prefix: str,
        limit_count: int,
        output_path: OutputPath("CSV"),
    ):
    """Report which PDF objects in an S3 bucket yield tables when read with tabula.

    Each object key returned by ``S3BucketHelper.get_object_keys`` is downloaded,
    passed to ``tabula.read_pdf`` and recorded in a CSV file with the columns
    ``key`` and ``contains_table``.

    Credentials and endpoint are read from the environment variables
    ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and ``S3_ENDPOINT``.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        verify_host: Passed to ``S3AccessConf`` and to the boto3 resource as ``verify``.
        file_prefix: Passed to ``S3BucketHelper`` as ``file_prefix``.
        limit_count: Passed to ``get_object_keys`` as ``limit_count``.
        output_path: File path the CSV result is written to.
    """
    # Imports stay inside the function: the component is built from this
    # function alone, so module-level imports of the notebook are not available.
    # NOTE(review): tabula-py needs a Java runtime; confirm the base image provides one.
    import os
    import warnings
    from io import BytesIO

    import boto3
    import pandas as pd
    import tabula
    from applyllm.io import (
        S3AccessConf,
        S3BucketHelper,
    )
    from applyllm.utils import time_func

    warnings.filterwarnings("ignore", message="ICC profile")
    warnings.filterwarnings("ignore", message="org.apache.pdfbox")

    # Read the connection settings once and share them between both S3 clients.
    access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    endpoint = os.environ.get('S3_ENDPOINT')

    # applyllm helper: only used to obtain the object keys.
    s3_conf = S3AccessConf(
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        endpoint=endpoint,
        bucket_name=bucket_name,
        verify_host=verify_host,
    )
    s3_pdf_reports_helper = S3BucketHelper(conf=s3_conf, file_prefix=file_prefix)

    # Plain boto3 resource: used to download the object bodies.
    session = boto3.session.Session(
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )
    s3 = session.resource('s3', endpoint_url=endpoint, verify=verify_host)

    def get_s3_pdf_bytes(key: str):
        """Download the object stored under `key` and return its raw bytes."""
        return s3.Object(bucket_name, key).get()['Body'].read()

    def extract_tables(pdf_bytesio, page_numbers='all', stream=True):
        """
        Extract the tables found in a PDF.

        Parameters:
        - pdf_bytesio: File-like object holding the PDF content.
        - page_numbers: Pages to extract tables from ('all' for all pages).
        - stream: Passed to tabula.read_pdf as `stream`.

        Returns:
        - The result of tabula.read_pdf (a list of pandas DataFrames, one per table).
        """
        return tabula.read_pdf(
            pdf_bytesio,
            pages=page_numbers,
            multiple_tables=True,
            pandas_options={'header': None},
            stream=stream,
        )

    def contains_table(pdf_bytesio):
        """Return True when tabula extracts at least one table from the PDF."""
        df_list = extract_tables(pdf_bytesio, page_numbers='all')
        return df_list is not None and len(df_list) > 0

    object_keys = s3_pdf_reports_helper.get_object_keys(limit_count=limit_count)

    @time_func
    def get_results():
        # Download and parsing happen here, inside the time_func-wrapped call.
        return [
            {
                "key": str(key),
                "contains_table": contains_table(BytesIO(get_s3_pdf_bytes(key))),
            }
            for key in object_keys
        ]

    records = get_results()

    # Explicit columns keep the CSV header stable even when no keys were returned.
    result_df = pd.DataFrame(records, columns=["key", "contains_table"])

    # newline="" lets pandas control the line terminator; the encoding is set on
    # the handle because to_csv does not support `encoding` for text handles.
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        result_df.to_csv(f, index=False, header=True)

    

In [7]:
def pod_resource_transformer(task: ContainerOp, mem_req="200Mi", cpu_req="2000m", mem_lim="4000Mi", cpu_lim='4000m'):
    """
    Set memory and CPU requests and limits on a container operator.

    The four setters are applied in the order memory request, memory limit,
    CPU request, CPU limit; the result of the last call is returned.

    Examples of the value format:
    '1000Mi' is roughly 1 GB of memory
    '1000m' is 1 CPU core
    """
    with_mem_request = task.set_memory_request(mem_req)
    with_mem_limit = with_mem_request.set_memory_limit(mem_lim)
    with_cpu_request = with_mem_limit.set_cpu_request(cpu_req)
    return with_cpu_request.set_cpu_limit(cpu_lim)

In [8]:
@pipeline(
    name = EXPERIMENT_NAME,
    description = EXPERIMENT_DESC
)
def custom_pipeline(
        bucket_name: str = "scivias-medreports",
        verify_host: bool = True,
        file_prefix: str = "KK-SCIVIAS",
        limit_count: int = 3,
        s3_secrets: str="add-scivias-medreport-secret",
    ):
    """Single-step pipeline that runs the tabula table check on PDF files in S3.

    Args:
        bucket_name: Forwarded to ``file_comp`` as ``bucket_name``.
        verify_host: Forwarded to ``file_comp`` as ``verify_host``.
        file_prefix: Forwarded to ``file_comp`` as ``file_prefix``.
        limit_count: Forwarded to ``file_comp`` as ``limit_count``.
        s3_secrets: Key of the pod label (value ``"true"``) added to the check step.
    """
    # Values for max_cache_staleness: "P0D" disables caching, "P1D" caches for one day.
    no_artifact_cache = "P0D"
    artifact_cache_today = "P1D"  # currently unused; assign to cache_setting to enable caching
    cache_setting = no_artifact_cache

    check_task = file_comp(
        bucket_name=bucket_name,
        verify_host=verify_host,
        file_prefix=file_prefix,
        limit_count=limit_count,
    )
    # Requests: 1Gi memory / 2 CPU cores. Limits: 5Gi memory / 10 CPU cores.
    check_task = pod_resource_transformer(check_task, mem_req="1Gi", mem_lim="5Gi", cpu_req="2000m", cpu_lim="10000m")
    check_task.execution_options.caching_strategy.max_cache_staleness = cache_setting
    check_task.set_display_name("check pdf tabula")
    # NOTE(review): presumably this label is what gets the S3 credentials
    # injected into the pod (file_comp reads them from the environment) - confirm.
    check_task.add_pod_label(s3_secrets, "true")

In [9]:
# Base name (without extension) of the compiled pipeline definition.
PIPE_LINE_FILE_NAME = "med_report_check_tabula_pipeline"
# The compiled YAML is written relative to the current working directory.
kfp.compiler.Compiler().compile(
    pipeline_func=custom_pipeline,
    package_path=f"{PIPE_LINE_FILE_NAME}.yaml",
)

In [10]:
from datetime import datetime
from pytz import timezone as ptimezone

def get_local_time_str(target_tz_str: str = "Europe/Berlin", format_str: str = "%Y-%m-%d %H-%M-%S") -> str:
    """
    Return the current time in the given timezone as a formatted string.

    This helper exists because the local timezone is misconfigured on the server.
    @param target_tz_str: name of the target timezone, default "Europe/Berlin"
    @param format_str: strftime format; "%Y-%m-%d %H-%M-%S" gives e.g. 2022-07-07 12-08-45
    """
    # pytz timezone object; on Python 3.9+ the standard library ZoneInfo could be used instead
    now_in_target_tz = datetime.now(ptimezone(target_tz_str))
    return now_in_target_tz.strftime(format_str)

In [11]:
# Pipeline-wide configuration handed to the run submission.
pipeline_config = dsl.PipelineConf()

# Optional, currently disabled: pull images with a registry secret.
# from kubernetes import client as k8s_client
# pipeline_config.set_image_pull_secrets([k8s_client.V1ObjectReference(name=K8_GIT_SECRET_NAME, namespace=NAME_SPACE)])

# Alternative policy: pipeline_config.set_image_pull_policy("Always")
pipeline_config.set_image_pull_policy("IfNotPresent")

# Run arguments; the keys are the parameter names of custom_pipeline.
pipeline_args = dict(
    bucket_name="scivias-medreports",
    verify_host=True,
    file_prefix="KK-SCIVIAS",
    limit_count=3,
    s3_secrets="add-scivias-medreport-secret",
)

In [12]:
# The timestamp suffix makes each submission's run name distinct.
RUN_NAME = f"medreport check tabula pipeline {get_local_time_str()}"

# Uses the `client` created in the setup cell (no new kfp.Client() here).
run = client.create_run_from_pipeline_func(
    pipeline_func=custom_pipeline,
    arguments=pipeline_args,
    run_name=RUN_NAME,
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
    pipeline_conf=pipeline_config,
)

# Display the submission result as the cell output.
run

RunPipelineResult(run_id=d7bca3b0-2a6e-4b60-95af-5ce6c6452d6e)