In [1]:
import requests
import sys, os
import kfp
from kfp.components import load_component_from_file
from kfp.dsl import ExitHandler, RUN_ID_PLACEHOLDER
from kubernetes.client.models import V1Volume
from kfp import dsl
import argparse

sys.path.append("..")
from utils.kfp_utils import *
from config.config import *

In [2]:
# Uncomment to install the pinned KFP SDK into this kernel's environment:
# %pip install kfp==1.8.22

# Pipeline

In [3]:
# Load every pipeline component from its YAML definition under COMPONENTS_DIR.
def _load_component(component_name):
    """Load the component described by `<COMPONENTS_DIR>/<component_name>/component.yaml`."""
    return load_component_from_file(
        f"{COMPONENTS_DIR}/{component_name}/component.yaml"
    )


# One op per pipeline stage; the names below are what the pipeline function uses.
data_downloading_op = _load_component("data_downloading")
data_preprocessing_op = _load_component("data_preprocessing")
model_training_op = _load_component("model_training")
model_evaluation_op = _load_component("model_evaluation")
feature_store_op = _load_component("feature_store")


# Pipeline definition built from the components loaded above.
# Only the feature-store step is currently active; the
# download -> preprocess -> train -> evaluate chain is kept below, disabled.
@dsl.pipeline(name=PIPELINE_NAME, description=PIPELINE_DESCRIPTION)
def pipeline(url: str):
    # --- Disabled stages (re-enable as a group; each consumes the previous outputs) ---
    # data_downloading_task = data_downloading_op(url=url)
    # data_preprocessing_task = data_preprocessing_op(
    #     input_df=data_downloading_task.outputs["data"]
    # )
    # model_training_task = model_training_op(df=data_preprocessing_task.outputs["df"])
    # model_evaluation_task = model_evaluation_op(
    #     df=data_preprocessing_task.outputs["df"],
    #     matrix=model_training_task.outputs["matrix"],
    #     movie2idx=model_training_task.outputs["movie2idx"],
    # )

    # Active step: forward the pipeline's `url` argument to the feature-store component.
    feature_store_op(url=url)

In [4]:
# Runtime arguments passed to `pipeline(url=...)`.
pipeline_args = {"url": DATA_URL.format(DRIVER_ORDERS_CSV)}

# Project helper wrapping the Kubeflow connection; presumably authenticates
# against HOST with the given credentials (confirm in utils.kfp_utils).
kf = KubeFlow(
    namespace=NAMESPACE,
    host=HOST,
    username=KF_USERNAME,
    password=KF_PASSWORD,
)
client = kf.client

# NOTE(review): not referenced again in this cell; kept in case a later cell
# compiles the pipeline to this path.
pipeline_package_path = f"./pipeline_{PIPELINE_VERSION}.yaml"

# Pipeline-level configuration applied to the run.
pipeline_conf = kfp.dsl.PipelineConf()

# Environment variable specification handed to the project's PipelineConfig helper.
env = {"ENVIRONMENT": {"type": "string", "value": "staging"}}
_pipeline_conf = PipelineConfig(env)

# Register the helper's transformer so it is applied to every op
# (presumably injecting the variables in `env` -- confirm in utils.kfp_utils).
pipeline_conf.add_op_transformer(_pipeline_conf.pipeline_env_variables)
# Cap the number of pods that may run concurrently.
pipeline_conf.set_parallelism(NUM_PODS_PARALLEL)

# Reuse the experiment if it already exists, otherwise create it.
kfp_experiment = kf.get_or_create_experiment(EXPERIMENT_NAME)

# Compile the pipeline function and submit it as a run in that experiment.
run_result = client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments=pipeline_args,
    experiment_name=kfp_experiment.name,
    pipeline_conf=pipeline_conf,
)
print(run_result)

RunPipelineResult(run_id=836ee250-f52c-4a29-9db4-4a18f502158d)
