In [1]:
import kfp
import kfp.components as comp
from kfp import dsl
from kfp import onprem
from kubernetes import client as k8s_client
import os
import time
import requests
from dotenv import load_dotenv
from random import randrange
from datetime import datetime
import yaml
import pprint
import logging

# Load variables from a local .env file into the process environment; the
# KubeFlow connection settings (USERNAME, PASSWORD, NAMESPACE, HOST) are read
# from os.environ in the submission cell.
load_dotenv()

# Root logger (getLogger with no name). No handler or level is configured here.
logger = logging.getLogger(None)

In [2]:
def create_persistent_volume_func(pvol_name="keti-shared-volume"):
    """Create a KFP ``VolumeOp`` requesting a 10Gi volume.

    The op name and the volume resource name are both ``pvol_name``. Access
    mode, explicit volume name and unique-name generation are not passed, so
    they stay at the kfp defaults.

    Args:
        pvol_name: Name used for the op and for its volume resource.

    Returns:
        The ``kfp.dsl.VolumeOp`` with its display name set.
    """
    volume_op = kfp.dsl.VolumeOp(
        name=pvol_name,
        resource_name=pvol_name,
        size='10Gi',
    )
    return volume_op.set_display_name("[0] Import Creating persistent volume")

def data_selection_func(pvc_args, load_data_args):
    """Build the "[1] Load ESS battery data" step.

    Runs ``load_data.py`` from the ``keti.select_dataset-0.2`` image with the
    dataset-selection options taken from ``load_data_args``. The script's output
    path is the PVC mount path.

    Args:
        pvc_args: dict with ``pvc_name``, ``volume_name`` and
            ``volume_mount_path`` of the shared PVC to mount.
        load_data_args: dict; every key in ``flag_to_key`` below is required
            (looked up with ``[]``, so a missing key raises ``KeyError``).

    Returns:
        The configured ``dsl.ContainerOp``.
    """
    mount_path = pvc_args.get("volume_mount_path")

    # (CLI flag, load_data_args key) pairs, in the order the script receives them.
    flag_to_key = (
        ('--selected_file_name', "selected_file_name"),
        ('--type', "ess_type"),
        ('--start_date', "start_date"),
        ('--end_date', "end_date"),
        ('--Bank', "Bank"),
        ('--Rack', "Rack"),
        ('--Bank_num', "Bank_num"),
        ('--Rack_num', "Rack_num"),
        ('--Bank_columns', "Bank_columns"),
        ('--Rack_columns', "Rack_columns"),
    )
    cli_args = []
    for flag, key in flag_to_key:
        cli_args += [flag, load_data_args[key]]
    cli_args += ['--save_data_path', mount_path]

    op = dsl.ContainerOp(
        name="data_selection",
        image="ghcr.io/keti-dp/openess-public:keti.select_dataset-0.2",
        arguments=cli_args,
        command=['python', 'load_data.py'],
    )
    op = op.set_display_name("[1] Load ESS battery data")
    return op.apply(onprem.mount_pvc(pvc_args.get("pvc_name"),
                                     volume_name=pvc_args.get("volume_name"),
                                     volume_mount_path=mount_path))

def split_train_test_func(prev_cont, pvc_args, split_train_test_args, label_column):
    """Build the "[2] Split raw data to train them" step (train/test split).

    Runs ``split_data.py`` after ``prev_cont``. It reads the file named by
    ``split_train_test_args["load_file_name"]`` from the PVC mount path and
    writes its results back to that same path.

    Args:
        prev_cont: op that must complete before this one (wired with ``.after``).
        pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
        split_train_test_args: dict holding at least ``load_file_name``.
        label_column: value forwarded as ``--label_column``.

    Returns:
        The configured ``dsl.ContainerOp``.
    """
    shared_dir = pvc_args.get("volume_mount_path")
    cli_args = [
        '--load_file_name', split_train_test_args.get("load_file_name"),
        '--load_data_path', shared_dir,
        '--save_data_path', shared_dir,
        # NOTE(review): the whole split_train_test_args mapping is forwarded here,
        # not a single method value; presumably split_data.py parses it - confirm.
        '--split_method', split_train_test_args,
        '--label_column', label_column,
    ]

    split_op = dsl.ContainerOp(
        name="split train test data",
        image="ghcr.io/keti-dp/openess-public:keti.split_train_and_test_from_dataset-0.2",
        arguments=cli_args,
        command=['python', 'split_data.py'],
    )
    split_op = split_op.set_display_name("[2] Split raw data to train them").after(prev_cont)
    pvc_mount = onprem.mount_pvc(pvc_args.get("pvc_name"),
                                 volume_name=pvc_args.get("volume_name"),
                                 volume_mount_path=shared_dir)
    return split_op.apply(pvc_mount)

class PreProcessing:
    """Factory functions for the preprocessing steps of the sample pipelines.

    Each step mounts the shared PVC described by ``pvc_args`` and reads/writes
    its CSV files directly under ``pvc_args["volume_mount_path"]``. The
    factories are static methods: call them as ``PreProcessing.anomaly_func(...)``
    (calling through an instance works the same way).
    """

    @staticmethod
    def _mount_shared_pvc(pvc_args):
        """Return the ``onprem.mount_pvc`` op modifier for the PVC in ``pvc_args``.

        Args:
            pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
        """
        return onprem.mount_pvc(pvc_args.get("pvc_name"),
                                volume_name=pvc_args.get("volume_name"),
                                volume_mount_path=pvc_args.get("volume_mount_path"))

    @staticmethod
    def anomaly_func(prev_cont, pvc_args, anomaly_detection_args):
        """Build the "[3-1] Preprocessing : Anomaly detection" step.

        Runs ``preprocessing_anomaly_detection.py`` on X_train/Y_train/X_test/Y_test
        CSVs under the PVC mount path. The script's output path is that same
        directory.

        Args:
            prev_cont: op that must complete before this one (wired with ``.after``).
            pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
            anomaly_detection_args: value forwarded as ``--anomaly_args``.

        Returns:
            The configured ``dsl.ContainerOp``.
        """
        data_dir = pvc_args.get("volume_mount_path")
        return (
            dsl.ContainerOp(
                name="preprocessing-anomaly",
                image="ghcr.io/keti-dp/openess-public:keti.preproc_anomaly_detection-0.1",
                arguments=[
                    '--split_X_train', data_dir + '/X_train.csv',
                    '--split_Y_train', data_dir + '/Y_train.csv',
                    '--split_X_test', data_dir + '/X_test.csv',
                    '--split_Y_test', data_dir + '/Y_test.csv',
                    '--anomaly_args', anomaly_detection_args,
                    '--save_data_path', data_dir,
                ],
                command=['python', 'preprocessing_anomaly_detection.py'],
            )
            .set_display_name("[3-1] Preprocessing : Anomaly detection")
            .after(prev_cont)
            .apply(PreProcessing._mount_shared_pvc(pvc_args))
        )

    @staticmethod
    def scaler_func(prev_cont, pvc_args, scaler_method, scaler_args):
        """Build the "[3-2] Preprocessing : Scale Up & Down" step.

        Runs ``preprocessing_scaler.py`` on the X_train/X_test CSVs under the PVC
        mount path. The script's output path is that same directory.

        Args:
            prev_cont: op that must complete before this one (wired with ``.after``).
            pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
            scaler_method: value forwarded as ``--prep_method``.
            scaler_args: value forwarded as ``--prep_args``.

        Returns:
            The configured ``dsl.ContainerOp``.
        """
        data_dir = pvc_args.get("volume_mount_path")
        return (
            dsl.ContainerOp(
                name="preprocessing-scaler",
                image="ghcr.io/keti-dp/openess-public:keti.preproc_scaler_models-0.1",
                arguments=[
                    '--split_X_train', data_dir + '/X_train.csv',
                    '--split_X_test', data_dir + '/X_test.csv',
                    '--prep_method', scaler_method,
                    '--prep_args', scaler_args,
                    '--save_data_path', data_dir,
                ],
                command=['python', 'preprocessing_scaler.py'],
            )
            .set_display_name("[3-2] Preprocessing : Scale Up & Down")
            .after(prev_cont)
            .apply(PreProcessing._mount_shared_pvc(pvc_args))
        )

class MachineLearning:
    """Factory functions for the model-training steps of the sample pipelines.

    Each step mounts the shared PVC described by ``pvc_args`` and reads the
    X_train/Y_train/X_test/Y_test CSVs directly under
    ``pvc_args["volume_mount_path"]``. The factories are static methods: call
    them as ``MachineLearning.regression_func(...)`` (calling through an instance
    works the same way).
    """

    @staticmethod
    def _mount_shared_pvc(pvc_args):
        """Return the ``onprem.mount_pvc`` op modifier for the PVC in ``pvc_args``.

        Args:
            pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
        """
        return onprem.mount_pvc(pvc_args.get("pvc_name"),
                                volume_name=pvc_args.get("volume_name"),
                                volume_mount_path=pvc_args.get("volume_mount_path"))

    @staticmethod
    def regression_func(prev_cont, pvc_args, reg_method, reg_args):
        """Build the "[4] MachineLearning : Regression method" step.

        Runs the regression script with loss function ``'mse'``. The script's
        output path is the PVC mount path.

        Args:
            prev_cont: op that must complete before this one (wired with ``.after``).
            pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
            reg_method: value forwarded as ``--regression_method``.
            reg_args: value forwarded as ``--regression_args``.

        Returns:
            The configured ``dsl.ContainerOp``.
        """
        data_dir = pvc_args.get("volume_mount_path")
        regression_cont = (
            dsl.ContainerOp(
                # "regresstion" spelling matches the script filename invoked below;
                # do not change it without changing the container image.
                name="ml_model_regresstion",
                image="ghcr.io/keti-dp/openess-public:keti.ai_regression_models-0.2",
                arguments=[
                    '--X_train', data_dir + '/X_train.csv',
                    '--Y_train', data_dir + '/Y_train.csv',
                    '--X_test', data_dir + '/X_test.csv',
                    '--Y_test', data_dir + '/Y_test.csv',
                    '--regression_method', reg_method,
                    '--regression_args', reg_args,
                    '--loss_function', 'mse',
                    '--save_data_path', data_dir,
                ],
                command=['python', 'ml_regresstion.py'],
            )
            .set_display_name("[4] MachineLearning : Regression method")
            .after(prev_cont)
            .apply(MachineLearning._mount_shared_pvc(pvc_args))
        )
        # GPU scheduling is intentionally not enabled. To enable it, call
        # regression_cont.set_gpu_limit(1) and
        # regression_cont.add_node_selector_constraint('nvidia') before returning.
        return regression_cont

    @staticmethod
    def classification_func(prev_cont, pvc_args, cls_method, cls_args):
        """Build the "[4] MachineLearning : Classification method" step.

        Runs ``ml_classification.py``. The script's output path is the PVC
        mount path.

        Args:
            prev_cont: op that must complete before this one (wired with ``.after``).
            pvc_args: dict with ``pvc_name``, ``volume_name`` and ``volume_mount_path``.
            cls_method: value forwarded as ``--classification_method``.
            cls_args: value forwarded as ``--classification_args``.

        Returns:
            The configured ``dsl.ContainerOp``.
        """
        data_dir = pvc_args.get("volume_mount_path")
        return (
            dsl.ContainerOp(
                name="ml_model_classification",
                image="ghcr.io/keti-dp/openess-public:keti.ai_classification_models-0.3",
                arguments=[
                    '--X_train', data_dir + '/X_train.csv',
                    '--Y_train', data_dir + '/Y_train.csv',
                    '--X_test', data_dir + '/X_test.csv',
                    '--Y_test', data_dir + '/Y_test.csv',
                    '--classification_method', cls_method,
                    '--classification_args', cls_args,
                    '--save_data_path', data_dir,
                ],
                command=['python', 'ml_classification.py'],
            )
            .set_display_name("[4] MachineLearning : Classification method")
            .after(prev_cont)
            .apply(MachineLearning._mount_shared_pvc(pvc_args))
        )

In [3]:
############################## SAMPLE PIPELINES ############################## 
@kfp.dsl.pipeline(
    name = 'Sample Pipeline 1',
    description = 'sample pipeline case 1 (Regression model)'
)
# Sample pipeline case #1:
#   load data -> train/test split -> anomaly detection -> scaling -> regression.
# Step settings come from kf_params_dict['target_pl']; its 'scaler_args' and
# 'reg_args' entries are keys naming top-level entries of kf_params_dict that
# hold the actual arguments. kf_params_dict is a module-level global that must
# be loaded before this pipeline is compiled.
def sample_pipeline_1():
    cfg = kf_params_dict.get('target_pl')
    pvc = cfg.get("pvc_args")

    load_op = data_selection_func(pvc_args=pvc,
                                  load_data_args=cfg.get('load_data_args'))
    split_op = split_train_test_func(prev_cont=load_op,
                                     pvc_args=pvc,
                                     split_train_test_args=cfg.get('split_train_test_args'),
                                     label_column=cfg.get('label_column'))
    anomaly_op = PreProcessing.anomaly_func(prev_cont=split_op,
                                            pvc_args=pvc,
                                            anomaly_detection_args=cfg.get('anomaly_detection_args'))
    scaler_op = PreProcessing.scaler_func(prev_cont=anomaly_op,
                                          pvc_args=pvc,
                                          scaler_method=cfg.get('scaler_method'),
                                          scaler_args=kf_params_dict.get(cfg.get('scaler_args')))
    model_op = MachineLearning.regression_func(prev_cont=scaler_op,
                                               pvc_args=pvc,
                                               reg_method=cfg.get('reg_method'),
                                               reg_args=kf_params_dict.get(cfg.get('reg_args')))

    # Disable KFP result caching for every step so each run re-executes them.
    for op in (load_op, split_op, anomaly_op, scaler_op, model_op):
        op.execution_options.caching_strategy.max_cache_staleness = 'P0D'


@kfp.dsl.pipeline(
    name = 'Sample Pipeline 2',
    description = 'sample pipeline case 2 (Classification model)'
)
# Sample pipeline case #2:
#   load data -> train/test split -> anomaly detection -> scaling -> classification.
# Step settings come from kf_params_dict['target_pl']; its 'scaler_args' and
# 'cls_args' entries are keys naming top-level entries of kf_params_dict that
# hold the actual arguments. kf_params_dict is a module-level global that must
# be loaded before this pipeline is compiled.
def sample_pipeline_2():
    cfg = kf_params_dict.get('target_pl')
    pvc = cfg.get("pvc_args")

    load_op = data_selection_func(pvc_args=pvc,
                                  load_data_args=cfg.get('load_data_args'))
    split_op = split_train_test_func(prev_cont=load_op,
                                     pvc_args=pvc,
                                     split_train_test_args=cfg.get('split_train_test_args'),
                                     label_column=cfg.get('label_column'))
    anomaly_op = PreProcessing.anomaly_func(prev_cont=split_op,
                                            pvc_args=pvc,
                                            anomaly_detection_args=cfg.get('anomaly_detection_args'))
    scaler_op = PreProcessing.scaler_func(prev_cont=anomaly_op,
                                          pvc_args=pvc,
                                          scaler_method=cfg.get('scaler_method'),
                                          scaler_args=kf_params_dict.get(cfg.get('scaler_args')))
    model_op = MachineLearning.classification_func(prev_cont=scaler_op,
                                                   pvc_args=pvc,
                                                   cls_method=cfg.get('cls_method'),
                                                   cls_args=kf_params_dict.get(cfg.get('cls_args')))

    # Disable KFP result caching for every step so each run re-executes them.
    for op in (load_op, split_op, anomaly_op, scaler_op, model_op):
        op.execution_options.caching_strategy.max_cache_staleness = 'P0D'

In [4]:
if __name__ == "__main__":

    # Load My KubeFlow Client info
    # File path : /home/keti_iisrc/ESS_Github/ESS/.env
    USERNAME = os.environ.get("USERNAME")
    PASSWORD = os.environ.get("PASSWORD")
    NAMESPACE = os.environ.get("NAMESPACE")
    HOST = os.environ.get("HOST") # istio-ingressgateway's Node Port IP:PORT
    
    session = requests.Session()
    response = session.get(HOST)
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }
    user_data = {"login": USERNAME, "password": PASSWORD}
    session.post(response.url, headers=headers, data=user_data)
    session_cookie = session.cookies.get_dict()["authservice_session"]
    
    # Connect My KubeFlow Client
    client = kfp.Client(host=f"{HOST}/pipeline",
                        namespace=f"{NAMESPACE}",
                        cookies=f"authservice_session={session_cookie}",
    )

    # User's KF pileline file written by YAML
    kf_params_file_dir = "./kf_params_v2.yaml"
    try:
        with open(kf_params_file_dir) as f:
            kf_params_dict = yaml.load(f, Loader=yaml.FullLoader) #yaml.safe_load(f)
            logger.info("Load KubeFlow Pipeline values from {}", kf_params_file_dir.split("./")[-1])
            # pprint.pprint(kf_params_v1)
    except FileNotFoundError as e:
            logger.error("Failed to load KubeFlow Pipeline values from {0}", kf_params_file_dir.split("./")[-1])
            raise e

    # (example) pipeline selection
    # load_data + split_train + anomaly_detection + reg(xgboost) + result
    pipeline_func_name =  kf_params_dict.get('user_pipeline_func')
    pipeline_func = eval(pipeline_func_name)
    logger.info("User pipeline fuction name : " , pipeline_func_name)

    now = datetime.now()
    dt_string = now.strftime("%d/%m/%Y_%H:%M:%S")
    print("[KF_pipeline_log] Start time : ", dt_string)

    #run_name = pipeline_func.__name__ + '-run-' + dt_string
    #run_name = 'ML_Regression-'+pipeline_func.__name__ + '-run-' + dt_string
    run_name = 'ML_Classification-'+pipeline_func.__name__ + '-run-' + dt_string
    

    # Compile this pipeline : pipeline_func
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline_func, 'Onprem_pipeline_test_v5_cwkim_test_making_pipeline.yaml')
    
    # Submit pipeline directly from pipeline function
    #arguments = {}
    run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                      run_name=run_name,
                                                      namespace=NAMESPACE,
                                                      arguments={})

[KF_pipeline_log] Start time :  01/06/2023_04:18:29




In [5]:
# Display the RunPipelineResult (run id) returned by create_run_from_pipeline_func in the submission cell.
run_result

RunPipelineResult(run_id=7a51f592-2477-4d6e-bb2c-f3933d4a9854)