In [15]:
import kfp
from kfp import dsl
from kubernetes.client.models import V1EnvVar

In [16]:
#Setting up all the necessary steps

#Setting up the volume for the pipeline
def setup_volume_op():
    """Create the persistent volume claim shared by the pipeline steps.

    Returns:
        dsl.VolumeOp: a 1Gi, ReadWriteOnce volume op (resource name
        ``dataset_pvc``). The pipeline passes its ``.volume`` to the download
        step, which mounts it at ``/mnt``.
    """
    pvc_spec = dict(
        name="Volume Creation",
        resource_name="dataset_pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="1Gi",
    )
    return dsl.VolumeOp(**pvc_spec)

# Download the training dataset from a Google Cloud Storage bucket
def dataset_download_op(url, volume, data_path):
    """Copy the dataset from Google Cloud Storage onto the shared volume.

    Args:
        url: ``gs://`` URL of the dataset to download.
        volume: volume to mount at ``/mnt`` inside the container.
        data_path: destination file path inside the container. It should live
            under ``/mnt`` so that later steps mounting the same volume can
            read it.

    Returns:
        dsl.ContainerOp: the download step. The pipeline hands its
        ``.pvolume`` to the training step.
    """
    op = dsl.ContainerOp(
        name='Dataset Download',
        image='google/cloud-sdk:272.0.0',
        command=['sh', '-c'],
        # `gsutil cp` exits non-zero when the download fails. A pipeline such as
        # `gsutil cat $0 | tee $1` reports only tee's exit status, so a failed
        # download would still "succeed" and leave an empty file. It would also
        # dump the whole dataset into the step log.
        # "$0"/"$1" are quoted so paths with spaces or glob characters stay intact.
        arguments=['gsutil cp "$0" "$1"', url, data_path],
        pvolumes={"/mnt": volume}
    )
    op.container.set_memory_limit('2G')
    op.container.set_memory_request('1G')
    op.container.set_cpu_limit('1')
    op.container.set_cpu_request('0.5')
    return op

#Training the model with the linear regression algorithm
def training_op(volume, trained_path, data_path,
                memory_limit='4G', memory_request='2G',
                cpu_limit='10', cpu_request='5',
                gpu_limit=None):
    """Run ``python3 main.py`` from the ``styl3/ai-devops:minikf-1`` image.

    Paths are handed to the container as environment variables, not as
    command-line arguments. This assumes main.py reads ``TRAINED_MODEL_PATH``
    and ``DATA_PATH``; TODO confirm against the image.

    Args:
        volume: volume to mount at ``/mnt`` (the pipeline passes the download
            step's ``.pvolume``).
        trained_path: exported as ``TRAINED_MODEL_PATH``.
        data_path: exported as ``DATA_PATH``.
        memory_limit, memory_request: container memory limit and request.
            The defaults keep the original '4G' and '2G'.
        cpu_limit, cpu_request: container CPU limit and request. The defaults
            keep the original '10' and '5'; lower them if the cluster cannot
            schedule that many CPUs.
        gpu_limit: optional GPU limit. It is unset by default because setting
            a GPU limit crashed the pipeline on a GCP free account, which has
            no GPU quota. Pass e.g. '2' where GPUs are available.

    Returns:
        dsl.ContainerOp: the configured training step.
    """
    op = dsl.ContainerOp(
        name='ML Training',
        image='styl3/ai-devops:minikf-1',
        command=['sh', '-c'],
        arguments=['python3 main.py'],
        pvolumes={'/mnt': volume}
    )
    op.container.add_env_variable(V1EnvVar('TRAINED_MODEL_PATH', trained_path))
    op.container.add_env_variable(V1EnvVar('DATA_PATH', data_path))
    if gpu_limit is not None:
        op.container.set_gpu_limit(gpu_limit)
    op.container.set_memory_limit(memory_limit)
    op.container.set_memory_request(memory_request)
    op.container.set_cpu_limit(cpu_limit)
    op.container.set_cpu_request(cpu_request)
    return op
# Check that training was successful: it should have produced an output at trained_path
def check_op(volume, trained_path):
    """Verify that training left a non-empty output at ``trained_path``.

    Args:
        volume: volume to mount at ``/mnt`` (the pipeline passes the training
            step's ``.pvolume``).
        trained_path: path of the trained model inside the container.

    Returns:
        dsl.ContainerOp: a step that fails if ``trained_path`` is missing or
        empty. Otherwise it runs ``wc`` on the path and reports that result.
    """
    op = dsl.ContainerOp(
        name='Training Check',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        # `wc` alone succeeds on an empty file, so an empty model would pass.
        # `test -s` fails the step unless the path exists with a size > 0.
        arguments=['test -s "$0" && wc "$0"', trained_path],
        pvolumes={"/mnt": volume}
    )
    op.container.set_memory_limit('2G')
    op.container.set_memory_request('1G')
    op.container.set_cpu_limit('1')
    op.container.set_cpu_request('0.5')
    return op

In [17]:
#Pipeline definition
@dsl.pipeline(
    name='Linear Regression',
    description='Linear Regression pipeline with four sequential steps: '
                'volume creation, dataset download, training and training check.'
)
def sequential_pipeline(url='gs://iamachine/kc_house_data.csv',
                        trained_path='/mnt/trained_model',
                        data_path='/mnt/dataset.csv'):
    """Linear Regression pipeline with four sequential steps.

    1. Create a shared volume.
    2. Download the dataset at ``url`` to ``data_path`` on that volume.
    3. Train the model, writing it to ``trained_path``.
    4. Check the trained output.

    Each step receives the previous step's ``.pvolume``, which chains the steps
    in order.
    """
    volume_task = setup_volume_op()
    dataset_task = dataset_download_op(url, volume_task.volume, data_path)
    train_task = training_op(dataset_task.pvolume, trained_path, data_path)
    # The op registers with the pipeline when constructed; its return value is unused.
    check_op(train_task.pvolume, trained_path)

In [18]:
#Pipeline Yaml output
if __name__ == '__main__':
    # In a notebook kernel __name__ is '__main__', so running this cell compiles
    # the pipeline into a YAML package next to the notebook.
    pipeline_compiler = kfp.compiler.Compiler()
    pipeline_compiler.compile(sequential_pipeline, 'linear-regression_pipeline.yaml')