In [None]:
#!python -m pip install --user --upgrade pip
#!pip install kfp

In [1]:
import kfp
from kfp import dsl

In [2]:
def preprocess_op():

    return dsl.ContainerOp(
        name='Preprocess Data',
        image='kenechi/preprocess-component:v.0.1',
        arguments=[],
        file_outputs={
            'clean_data': '/data_preprocess/clean_data'
        }
    )

In [7]:
def train_op(clean_data):

    return dsl.ContainerOp(
        name='Train Model',
        image='kenechi/model-training:v.0.4',
        arguments=[
            '--clean_data',clean_data
        ],
        file_outputs={
            'x_test': '/model_training/x_test.npy',
            'y_test': '/model_training/y_test.npy',
            'model': '/model_training/model.pkl'
        }
    )

In [10]:
def test_op(x_test,y_test,model):

    return dsl.ContainerOp(
        name='Test Model',
        image='kenechi/model-testing:v.0.3',
        arguments=[
            '--x_test',x_test,
            '--y_test',y_test,
            '--model',model,
        ],
        file_outputs={
            'results': '/model_testing/results.txt'
        }
    )

In [11]:
@dsl.pipeline(
   name=' Road Safety Pipeline',
   description='Road Safety Reuseable Pipeline.'
)
def road_safety_useable_pipeline():
    _preprocess_op = preprocess_op()
    
    _train_op = train_op(
        dsl.InputArgumentPath(_preprocess_op.outputs['clean_data'])
    ).after(_preprocess_op)

    _test_op = test_op(
        dsl.InputArgumentPath( _train_op.outputs['x_test']),
        dsl.InputArgumentPath( _train_op.outputs['y_test']),
        dsl.InputArgumentPath(_train_op.outputs['model'])
    ).after(_train_op)


In [12]:
client = kfp.Client()
client.create_run_from_pipeline_func(road_safety_useable_pipeline, arguments={})

RunPipelineResult(run_id=f0a7ba8d-f89e-4989-96b9-1dbbe4e4b19a)