In [177]:
import sys
sys.path.insert(1, '/home/jovyan/.local/lib/python3.6/site-packages')
#See https://www.kubeflow.org/docs/pipelines/sdk/component-development/
import kfp
from kfp import compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp import gcp

In [178]:
# Name of the Kubeflow experiment the pipeline runs are filed under (looked up, or created if missing, further down).
EXPERIMENT_NAME = 'activity_classification_test'

In [179]:
# Component definitions (see: https://www.kubeflow.org/docs/pipelines/sdk/build-component/)

# Tag suffix of the preprocessing image; the full tag is 'preprocess' + this value.
preprocess_version = 'v2'


def ingestion_op(input_bucket, output_bucket):
    """Build the preprocessing step of the pipeline.

    The step runs ``python3 /src/preprocess.py`` inside the
    ``rio05docker/activity_classification:preprocess<preprocess_version>`` image.

    Args:
        input_bucket: value forwarded to the script as ``--input-bucket``.
        output_bucket: value forwarded to the script as ``--output-bucket``.

    Returns:
        The ``dsl.ContainerOp`` for the step, with the ``user-gcp-sa`` GCP secret
        applied. Its ``data_file`` output is read from ``/src/final_df.csv``
        inside the container.
    """
    image = 'rio05docker/activity_classification:preprocess' + preprocess_version
    cli_args = ["/src/preprocess.py",
                '--input-bucket', input_bucket,
                '--output-bucket', output_bucket]
    step = dsl.ContainerOp(
        name='preprocess-op',
        image=image,
        command="python3",
        arguments=cli_args,
        file_outputs={'data_file': '/src/final_df.csv'},
    )
    return step.apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))



# Tag suffix of the training image. It is empty, so the image tag is plain 'train'.
train_version = ''


def training_op(input_bucket, output_bucket):
    """Build the GPU training step of the pipeline.

    The step runs ``python3 train.py`` inside the
    ``rio05docker/activity_classification:train<train_version>`` image and is
    limited to one GPU.

    Args:
        input_bucket: value forwarded to the script as ``--input-bucket``.
        output_bucket: value forwarded to the script as ``--output-bucket``.

    Returns:
        The ``dsl.ContainerOp`` for the step, with the ``user-gcp-sa`` GCP secret
        applied. Its file outputs are ``model`` (``/activity_classification.h5``),
        ``conf_matrix`` (``/conf_matrix.csv``) and ``metrics``
        (``/mlpipeline-metrics.json``).
    """
    # NOTE(review): unlike the preprocess step ("/src/preprocess.py"), the script
    # path is relative, so it depends on the image's working directory — confirm.
    return dsl.ContainerOp(
        name='gpu-op',
        image='rio05docker/activity_classification:train' + train_version,
        command="python3",
        arguments=[
            "train.py",
            '--input-bucket', input_bucket,
            # Must be '--output-bucket' (the flag name the sibling steps use);
            # repeating '--input-bucket' here would let the output bucket
            # silently override the input bucket.
            '--output-bucket', output_bucket,
        ],
        file_outputs={
            'model': '/activity_classification.h5',
            'conf_matrix': '/conf_matrix.csv',
            'metrics': '/mlpipeline-metrics.json',
        },
    ).set_gpu_limit(1).apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))



# Tag suffix of the conversion image; the full tag is 'convert' + this value.
convert_version = 'v3'


def convert_op(input_model, output_bucket):
    """Build the model-conversion step of the pipeline.

    The step runs ``python3 convert.py`` inside the
    ``rio05docker/activity_classification:convert<convert_version>`` image.

    Args:
        input_model: value forwarded to the script as ``--input-model``. The
            pipeline below passes a ``dsl.InputArgumentPath`` wrapping the
            training step's ``model`` output.
        output_bucket: value forwarded to the script as ``--output-bucket``.

    Returns:
        The ``dsl.ContainerOp`` for the step, with the ``user-gcp-sa`` GCP secret
        applied. Its ``model_file`` output is read from
        ``/activity_classification.tflite`` inside the container.
    """
    image = 'rio05docker/activity_classification:convert' + convert_version
    cli_args = ["convert.py",
                '--input-model', input_model,
                '--output-bucket', output_bucket]
    outputs = {'model_file': '/activity_classification.tflite'}
    step = dsl.ContainerOp(
        name='convert-op',
        image=image,
        command="python3",
        arguments=cli_args,
        file_outputs=outputs,
    )
    return step.apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))

In [180]:
# Pipeline definition
@dsl.pipeline(
    name='Kubeflow Test Pipeline',
    description='Performs preprocessing, training and deployment.',
)
def pipeline(
    input_bucket='ai-vqc',   # bucket the input data is read from
    output_bucket='ai-vqc',  # bucket the model is written to
):
    # Steps run in order: preprocess -> train -> convert.
    # NOTE(review): the description mentions "deployment", but the last step
    # only converts the model to a .tflite file — confirm the wording.
    # (Comments rather than a docstring, so nothing new is picked up by the
    # compiler's metadata extraction.)
    preprocess_step = ingestion_op(input_bucket, output_bucket)

    # Training does not consume the preprocess step's outputs, so the
    # ordering is declared explicitly with .after().
    training_step = training_op(input_bucket, output_bucket).after(preprocess_step)

    # The trained model is handed to the converter via dsl.InputArgumentPath.
    model_artifact = dsl.InputArgumentPath(training_step.outputs['model'])
    convert_op(model_artifact, output_bucket).after(training_step)

In [181]:
# Compile the pipeline: the compiler turns the Python DSL code into a single static
# configuration (YAML) that the Kubeflow Pipelines service can process, and writes
# it to '<pipeline function name>.pipeline.zip'.
# `compiler` is the module already imported in the first cell (`from kfp import compiler`),
# so it is not re-imported here.
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'

compiler.Compiler().compile(pipeline_func, pipeline_filename)

In [182]:
# Get the Kubeflow experiment by name, creating it if it does not exist yet.
client = kfp.Client()
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except ValueError:
    # kfp's Client.get_experiment raises ValueError when no experiment matches the
    # name (per the kfp SDK source — confirm for the installed version). Other
    # failures (connection, auth, Ctrl-C) now surface here instead of being masked
    # by a follow-up create_experiment call.
    experiment = client.create_experiment(EXPERIMENT_NAME)

print(experiment)

{'created_at': datetime.datetime(2020, 5, 18, 14, 36, 4, tzinfo=tzlocal()),
 'description': None,
 'id': 'f0ebef9a-9b8e-45c8-adea-84dc666f9735',
 'name': 'activity_classification_test',
 'resource_references': None}


In [183]:
# Submit one run of the compiled package to the experiment.
arguments = {}  # empty: no parameter overrides are passed to the run
run_name = '{} run'.format(pipeline_func.__name__)
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

# Echo what was submitted, one value per line.
print(experiment.id, run_name, pipeline_filename, arguments, sep='\n')

f0ebef9a-9b8e-45c8-adea-84dc666f9735
pipeline run
pipeline.pipeline.zip
{}
