In [None]:
# Move data from an external source into DKube storage, preprocess it, and train a model.

# Process:
#   data                   : lives in an external (remote) source and is downloaded into DKube
#   dkube_preprocessing_op : downloads the external data into DKube and preprocesses it
#   storage_op             : stores the downloaded data in DKube storage, mounted as a PV volume
#   dkube_training_op      : trains a model on the preprocessed data

In [None]:
import json
import os

import kfp
from dkube.sdk import *
from dkube.sdk import DkubeApi

In [None]:
# Directory holding the DKube KFP component specs; every component below is loaded from it.
components_url = "/mnt/dkube/pipeline/components/"
storage_op = kfp.components.load_component_from_file(components_url + "storage/component.yaml")
dkube_preprocessing_op = kfp.components.load_component_from_file(components_url + "preprocess/component.yaml")
# The pipeline also runs a training step, so its component must be loaded here too
# (previously `dkube_training_op` was never defined and the pipeline raised NameError).
# NOTE(review): assumes the training spec sits alongside the others at training/component.yaml — confirm.
dkube_training_op = kfp.components.load_component_from_file(components_url + "training/component.yaml")
# Access token of the current DKube user; os.getenv returns None if the variable is unset.
token = os.getenv("DKUBE_USER_ACCESS_TOKEN")

In [None]:
# --- Container image and DKube resource names -------------------------------
image = "ocdr/d3-datascience-sklearn:v0.23.2-1"
code_name = "external-data"
dataset = "heart-data"
ptrain_dataset = dataset  # dataset name handed to the preprocessing step (same as `dataset`)
train_fs_name = "heart-fs-train"
model_name = "heart-model"

# --- Commands run by each step -----------------------------------------------
preprocessing_script = f"python external_data/preprocessing.py --train_fs {train_fs_name}"
training_script = "python external_data/training.py"

# --- Mount points for step inputs / outputs ----------------------------------
dataset_mount_points = ["/opt/dkube/input"]
featureset_mount_points = ["/featureset/train"]
train_out_mount_points = ["/model"]



In [None]:
# Register the featureset that the preprocessing step will write into.
# NOTE(review): re-running this cell presumably fails if the featureset already
# exists in DKube — confirm, and guard or delete-then-create if Run-All must work.
api = DkubeApi(token=token)
train_featureset = DkubeFeatureSet(train_fs_name)
api.create_featureset(train_featureset)

In [None]:
@kfp.dsl.pipeline(
    name='external_data',
    description='utilise data from external and train'
)
def externaldata_pipeline(token, code_name, dataset):
    """Preprocess external data into a featureset, export it to storage, then train on it.

    Steps run in order: preprocessing -> storage export -> training. A storage
    "reclaim" op is registered as the exit handler so it runs when the enclosed
    steps finish.

    Args:
        token: DKube user access token, forwarded to every component.
        code_name: DKube code name passed as ``program`` to the preprocessing
            and training steps.
        dataset: Accepted as a run argument but not used in the body. The
            preprocessing step reads the module-level ``ptrain_dataset``
            instead, because a pipeline parameter cannot be serialized with
            ``json.dumps``.
    """
    # Volume spec for the training featureset, tagged with this run's workflow uid.
    # It is used as both the input and the output of the storage export.
    featureset_volume = json.dumps(["{{workflow.uid}}-featureset@featureset://" + train_fs_name])

    with kfp.dsl.ExitHandler(exit_op=storage_op("reclaim", token, namespace="kubeflow", uid="{{workflow.uid}}")):

        preprocessing = dkube_preprocessing_op(
            token, json.dumps({"image": image}),
            program=code_name, run_script=preprocessing_script,
            datasets=json.dumps([ptrain_dataset]),
            output_featuresets=json.dumps([train_fs_name]),
            input_dataset_mounts=json.dumps(dataset_mount_points),
            output_featureset_mounts=json.dumps(featureset_mount_points),
        )

        storage = storage_op(
            "export", token, namespace="kubeflow",
            input_volumes=featureset_volume,
            output_volumes=featureset_volume,
        ).after(preprocessing)

        dkube_training_op(
            token, json.dumps({"image": image}),
            framework="sklearn", version="0.23.2",
            program=code_name, run_script=training_script,
            featuresets=json.dumps([train_fs_name]), outputs=json.dumps([model_name]),
            # Previously this referenced `output_volumes`, which is not a variable
            # (only a keyword argument above), so compiling the pipeline raised NameError.
            # NOTE(review): confirm training.py reads the featureset from this mount path.
            input_featureset_mounts=json.dumps(featureset_mount_points),
            output_mounts=json.dumps(train_out_mount_points),
        ).after(storage)

In [None]:
# Submit the pipeline as a new run; the returned run result is shown as the cell output.
# NOTE(review): pipeline arguments are typically visible in the KFP run details,
# so passing the access token this way may expose it — confirm this is acceptable.
run_arguments = {"token": token, "code_name": code_name, "dataset": dataset}
client = kfp.Client(existing_token=token)
client.create_run_from_pipeline_func(externaldata_pipeline, arguments=run_arguments)

