In [None]:
import sys
!{sys.executable} -m pip install kfp >/dev/null

In [None]:
import os
import json
import kfp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kubernetes import client as k8s_client

In [None]:
# Build the three DKube pipeline component factories from their published
# component.yaml definitions. Each call fetches the YAML over the network.
_load_component = kfp.components.load_component_from_url

dkube_preprocessing_op = _load_component(
    "https://raw.githubusercontent.com/oneconvergence/dkube-examples/citiustech/components/preprocess/component.yaml"
)
dkube_training_op = _load_component(
    "https://raw.githubusercontent.com/oneconvergence/dkube-examples/citiustech/components/training/component.yaml"
)
dkube_serving_op = _load_component(
    "https://raw.githubusercontent.com/oneconvergence/dkube-examples/citiustech/components/serving/component.yaml"
)

In [None]:
# ---- Container images ------------------------------------------------------
# `image` is handed to the preprocessing, training and transformer steps;
# `serving_image` is handed to the serving step.
image = "docker.io/ocdr/dkube-datascience-tf-cpu:fs-v2.0.0"
serving_image = "ocdr/sklearnserver:0.23.2"

# ---- Names passed to the DKube components ----------------------------------
dataset = "insurance"
featureset = "insurance-fs"
training_program = "insurance"
model = "insurance"

# ---- Commands and code locations -------------------------------------------
# The preprocessing command embeds the featureset name, so it must be defined
# after `featureset`.
preprocessing_script = f"python preprocessing.py --fs {featureset}"
training_script = "python training.py"
transformer_code = "insurance/transformer.py"

# ---- Framework reported to the training component --------------------------
framework = "sklearn"
f_version = "0.23.2"

# ---- Mount points passed to the preprocessing and training steps -----------
output_mount_point = "/opt/dkube/output/"
input_mount_point = "/opt/dkube/input/"

# Value of the USERNAME environment variable, or None when it is unset.
# NOTE(review): not referenced by any other cell visible in this notebook.
user = os.environ.get("USERNAME")

In [None]:
# Pipeline definition: preprocessing -> training -> serving.
# `token` is the single pipeline parameter and is forwarded as the first
# positional argument of every component.
@kfp.dsl.pipeline(
    name='dkube-insurance-pl',
    description='sample insurance pipeline with featuresets'
)
def insurance_pipeline(token):
    # JSON payloads shared by more than one step, encoded once.
    job_image = json.dumps({"image": image})
    input_mounts = json.dumps([input_mount_point])
    output_mounts = json.dumps([output_mount_point])

    # Step 1: run the preprocessing script on the dataset and write the
    # featureset.
    preprocess_step = dkube_preprocessing_op(
        token,
        job_image,
        program=training_program,
        run_script=preprocessing_script,
        datasets=json.dumps([dataset]),
        output_featuresets=json.dumps([str(featureset)]),
        input_dataset_mounts=input_mounts,
        output_featureset_mounts=output_mounts,
    )

    # Step 2: train on the featureset. No output of step 1 is consumed here,
    # so the ordering is declared explicitly with .after().
    train_step = dkube_training_op(
        token,
        job_image,
        framework=framework,
        version=f_version,
        program=training_program,
        run_script=training_script,
        featuresets=json.dumps([featureset]),
        outputs=json.dumps([model]),
        input_featureset_mounts=input_mounts,
        output_mounts=output_mounts,
    ).after(preprocess_step)

    # Step 3: serve the artifact produced by training; the transformer uses
    # the same image payload as the two job steps.
    dkube_serving_op(
        token,
        train_step.outputs['artifact'],
        device='cpu',
        serving_image=json.dumps({"image": serving_image}),
        transformer_image=job_image,
        transformer_project=training_program,
        transformer_code=transformer_code,
    ).after(train_step)

# Compile and generate tarball

In [None]:
# Compile the pipeline function into the package file that the upload cell
# reads back by the same name.
compiler.Compiler().compile(
    pipeline_func=insurance_pipeline,
    package_path='dkube_insurance_fs.tar.gz',
)

# Upload pipeline

In [None]:
# Read the access token from the environment (None when the variable is
# unset) and hand it to the KFP client.
existing_token = os.getenv("DKUBE_USER_ACCESS_TOKEN")
client = kfp.Client(existing_token=existing_token)

# Best-effort upload: a failure is printed rather than raised so the cells
# below can still run (presumably to tolerate re-running the notebook when the
# pipeline is already uploaded; confirm against the server's behaviour).
try:
    client.upload_pipeline(
        pipeline_package_path='dkube_insurance_fs.tar.gz',
        pipeline_name='Insurance pipeline',
        description=None,
    )
except Exception as e:
    # Exception, not BaseException: a kernel interrupt (KeyboardInterrupt) or
    # SystemExit must propagate instead of being printed and swallowed.
    print(e)

# Create Insurance experiment

In [None]:
# Query the existing experiments. The result is not displayed because this is
# not the last expression of the cell; an error here is not caught.
client.list_experiments()

# Create the experiment used by the run cell. A failure is printed rather than
# raised; in that case `insurance_experiment` is not (re)assigned by this cell.
try:
    insurance_experiment = client.create_experiment(name='Dkube - Insurance')
except Exception as e:
    # Exception, not BaseException: a kernel interrupt (KeyboardInterrupt) or
    # SystemExit must propagate instead of being printed and swallowed.
    print(e)

# Create a run

In [None]:
# Look up the uploaded pipeline by name and start a run of it in the
# experiment created above, passing the access token as the pipeline's
# `token` parameter. Any failure is printed rather than raised.
try:
    pipeline_id = client.get_pipeline_id('Insurance pipeline')
    run = client.run_pipeline(
        experiment_id=insurance_experiment.id,
        job_name='insurance_fs_pipeline',
        pipeline_package_path=None,  # run the uploaded pipeline, not a local package
        pipeline_id=pipeline_id,
        params={"token": existing_token},
    )
except Exception as e:
    # Exception, not BaseException: a kernel interrupt (KeyboardInterrupt) or
    # SystemExit must propagate instead of being printed and swallowed.
    print(e)