In [None]:
from dkube.sdk import *
import os

In [None]:
# Identity and credentials come from the notebook's environment.
# existing_token is None when DKUBE_USER_ACCESS_TOKEN is not set.
existing_token = os.environ.get("DKUBE_USER_ACCESS_TOKEN")
# NOTE(review): falls back to a specific username when USERNAME is unset — confirm intended.
user = os.environ.get('USERNAME', 'songole')

# Names of the DKube code repo, datasets and featuresets this notebook creates and uses.
code_name = "tmdb"
merge_ds, clean_ds = "tmdb-merged", "tmdb-cleaned"
train_fs, test_fs = "tmdb-train-fs", "tmdb-test-fs"

In [None]:
# DKube SDK client used to register the code repo, datasets and featuresets.
# The token is read from DKUBE_USER_ACCESS_TOKEN in the config cell (may be None if unset).
api = DkubeApi(token=existing_token)

In [None]:
# Register the git code repo plus the DVS-backed datasets and the featuresets
# that the pipeline steps below read from and write to.
code_git_url = "https://github.com/riteshkarvaloc/pipelines.git"

print(f"Adding code {code_name}")
code = DkubeCode(user, name=code_name)
code.update_git_details(code_git_url)
api.create_code(code)
print(f"Code {code_name} added")

# Both datasets are configured identically; only the name differs.
for ds_name in (merge_ds, clean_ds):
    print(f"Adding dataset {ds_name}")
    dataset = DkubeDataset(user, name=ds_name)
    dataset.update_dataset_source(source='dvs')
    api.create_dataset(dataset)
    print(f"Dataset {ds_name} added")

# Featuresets written by the feature-engineering step.
for fs_name in (train_fs, test_fs):
    print(f"Adding featureset {fs_name}")
    featureset = DkubeFeatureSet(name=fs_name)
    api.create_featureset(featureset)
    print(f"Featureset {fs_name} added")

In [None]:
import os, json
import kfp
import kfp.compiler as compiler
import random
import string

def generate(hint, length=4):
    """Return ``hint`` followed by a dash and ``length`` random decimal digits.

    Example: ``generate("run")`` -> ``"run-0472"``. Uses the non-cryptographic
    ``random`` module, so it is only suitable for making names less likely to collide.

    Args:
        hint: Prefix for the generated name.
        length: Number of random digits to append (default 4).
    """
    return "{}-{}".format(hint, "".join(random.choice(string.digits) for _ in range(length)))

In [None]:
# Kubeflow Pipelines client, given the same DKube access token as the DKube SDK client above.
client = kfp.Client(existing_token=existing_token)

In [None]:
# Container image and in-container mount points shared by the pipeline steps.
image = "docker.io/ocdr/dkube-datascience-tf-cpu:v2.0.0-3"
merge_ds_path = "/data/merge"
clean_ds_path = "/data/clean"
test_fs_path = "/data/test_fs"
train_fs_path = "/data/train_fs"

# Shell commands run by each step; each first changes into the data-engineering directory.
_de_prefix = "cd data-engineering; "
merge_script = _de_prefix + "python merging.py"
clean_script = _de_prefix + "python cleaning.py"
feature_script = (_de_prefix + "python feature-engineering.py"
                  f" --train_fs {train_fs} --test_fs {test_fs}")

In [None]:
# Directory containing the pipeline component definitions.
components_url = "/mnt/dkube/pipeline/components/"
# Preprocessing component used for every step of the pipeline below.
dkube_preprocessing_op = kfp.components.load_component_from_file(
    os.path.join(components_url, "preprocess/component.yaml")
)

In [None]:
@kfp.dsl.pipeline(
    name='dkube-DE-pl',
    description='TMDB data engineering pipeline: merge, clean and feature-engineer the dataset'
)
def data_engineering_pipeline(token):
    """Three-step data engineering pipeline: merge -> clean -> feature engineering.

    Every step runs ``dkube_preprocessing_op`` with the same container image and the
    ``code_name`` code repo. The steps differ only in the script they run and in the
    datasets/featuresets they mount.

    Args:
        token: DKube access token, passed as the first argument of each step.
    """
    # Container spec shared by all three steps.
    container = json.dumps({"image": image})

    # Step 1: write the merged data into the `merge_ds` dataset.
    merge = dkube_preprocessing_op(token, container,
                                   program=code_name, run_script=merge_script,
                                   outputs=json.dumps([str(merge_ds)]),
                                   output_mounts=json.dumps([merge_ds_path])).set_display_name("Merging")

    # Step 2: read `merge_ds`, write `clean_ds`. Datasets are passed by name, not as
    # KFP step outputs, so the ordering must be declared explicitly with .after().
    clean = dkube_preprocessing_op(token, container,
                                   program=code_name, run_script=clean_script,
                                   datasets=json.dumps([str(merge_ds)]),
                                   input_dataset_mounts=json.dumps([merge_ds_path]),
                                   outputs=json.dumps([str(clean_ds)]),
                                   output_mounts=json.dumps([clean_ds_path])).after(merge).set_display_name("cleaning")

    # Step 3: read `clean_ds`, write the train/test featuresets.
    f_eng = dkube_preprocessing_op(token, container,
                                   program=code_name, run_script=feature_script,
                                   datasets=json.dumps([str(clean_ds)]),
                                   output_featuresets=json.dumps([train_fs, test_fs]),
                                   input_dataset_mounts=json.dumps([clean_ds_path]),
                                   output_featureset_mounts=json.dumps([train_fs_path, test_fs_path])
                                   ).after(clean).set_display_name("Feature-Engineering")

In [None]:
# KFP experiment under which the pipeline runs below are submitted.
# NOTE(review): the name contains a stray space ("Dkube- DE"); left as-is in case existing
# experiments are keyed on it.
experiment_name = 'Dkube- DE pl'
de_experiment = client.create_experiment(name=experiment_name)

In [None]:
def _find_pipeline_by_name(kfp_client, name):
    """Return the first uploaded KFP pipeline whose name equals ``name``, or None.

    Walks every page returned by ``kfp_client.list_pipelines``.
    """
    page_token = ''
    while True:
        response = kfp_client.list_pipelines(page_token=page_token, page_size=100)
        for candidate in response.pipelines or []:
            if candidate.name == name:
                return candidate
        page_token = response.next_page_token
        if not page_token:
            return None


# Compile the pipeline to a package and upload it, so the next cell always has a
# bound `pipeline` to run (the previous version left it unbound if the upload failed).
pipeline_name = "DE-pipeline"
pipeline_package = "de-pipeline.zip"
arguments = {"token": existing_token}
compiler.Compiler().compile(data_engineering_pipeline, pipeline_package)
try:
    pipeline = client.upload_pipeline(pipeline_package, pipeline_name=pipeline_name)
except Exception as e:  # not BaseException: let KeyboardInterrupt/SystemExit through
    # NOTE(review): presumably the upload failed because a pipeline with this name already
    # exists (e.g. on a re-run). Reuse it as-is; the freshly compiled package is NOT uploaded.
    print(e)
    pipeline = _find_pipeline_by_name(client, pipeline_name)
    if pipeline is None:
        raise
runid = 1

In [24]:
# Submit one run of the uploaded pipeline under the experiment; `runid` numbers
# successive submissions made from this kernel session.
job_name = f"[{pipeline.name}] Run{runid}"
run = client.run_pipeline(
    de_experiment.id,
    job_name=job_name,
    pipeline_id=pipeline.id,
    params=arguments,
)
runid += 1