# pre-process data

In [None]:
#1st component
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split

def _preprocess_data():
     X, y = datasets.load_boston(return_X_y=True)
     X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
     np.save('x_train.npy', X_train)
     np.save('x_test.npy', X_test)
     np.save('y_train.npy', y_train)
     np.save('y_test.npy', y_test)
     
if __name__ == '__main__':
     print('Preprocessing data...')
     _preprocess_data()

In [None]:
import kfp
from kfp import dsl

@dsl.pipeline(
   name='Boston Housing Pipeline',
   description='An example pipeline.'
)
def boston_pipeline():
    _preprocess_op = preprocess_op()

client = kfp.Client()
client.create_run_from_pipeline_func(boston_pipeline, arguments={})

In [None]:
import argparse
import joblib
import numpy as np
from sklearn.linear_model import SGDRegressor

def train_model(x_train, y_train):
    x_train_data = np.load(x_train)
    y_train_data = np.load(y_train)

    model = SGDRegressor(verbose=1)
    model.fit(x_train_data, y_train_data)
    
    joblib.dump(model, 'model.pkl')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--x_train')
    parser.add_argument('--y_train')
    args = parser.parse_args()
    train_model(args.x_train, args.y_train)

In [None]:
FROM python:3.7-slim

WORKDIR /app

RUN pip install -U scikit-learn numpy

COPY train.py ./train.py

ENTRYPOINT [ "python", "train.py" ]

In [None]:
def train_op(x_train, y_train):

    return dsl.ContainerOp(
        name='Train Model',
        image='gnovack/boston_pipeline_train:latest',
        arguments=[
            '--x_train', x_train,
            '--y_train', y_train
        ],
        file_outputs={
            'model': '/app/model.pkl'
        }
    )

In [None]:
def test_op(x_test, y_test, model):

    return dsl.ContainerOp(
        name='Test Model',
        image='gnovack/boston_pipeline_test:latest',
        arguments=[
            '--x_test', x_test,
            '--y_test', y_test,
            '--model', model
        ],
        file_outputs={
            'mean_squared_error': '/app/output.txt'
        }
    )

In [None]:
def deploy_model_op(model):

    return dsl.ContainerOp(
        name='Deploy Model',
        image='gnovack/boston_pipeline_deploy_model:latest',
        arguments=[
            '--model', model
        ]
    )

In [None]:
@dsl.pipeline(
   name='Boston Housing Pipeline',
   description='An example pipeline that trains and logs a regression model.'
)
def boston_pipeline():
    _preprocess_op = preprocess_op()
    
    _train_op = train_op(
        dsl.InputArgumentPath(_preprocess_op.outputs['x_train']),
        dsl.InputArgumentPath(_preprocess_op.outputs['y_train'])
    ).after(_preprocess_op)

    _test_op = test_op(
        dsl.InputArgumentPath(_preprocess_op.outputs['x_test']),
        dsl.InputArgumentPath(_preprocess_op.outputs['y_test']),
        dsl.InputArgumentPath(_train_op.outputs['model'])
    ).after(_train_op)

    deploy_model_op(
        dsl.InputArgumentPath(_train_op.outputs['model'])
    ).after(_test_op)