In [1]:
!pip install -U xgboost kfp



In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
print(PROJECT_ID)

GS_PATH = "gs://bankapp_gs"
print(GS_PATH)

DATA_FILE_NAME = "bank_data.csv"
print(DATA_FILE_NAME)

RF_MODEL = "rf_model.joblib"
XGB_MODEL = "xgb_model.joblib"
BEST_MODEL = "model.joblib"

print(RF_MODEL)
print(XGB_MODEL)
print(BEST_MODEL)

mlbankapp
gs://bankapp_gs
bank_data.csv
rf_model.joblib
xgb_model.joblib
model.joblib


In [4]:
%%writefile ./Dockerfile
# Image used as the base_image of the pipeline components built in this notebook.
FROM gcr.io/deeplearning-platform-release/base-cpu
# Python packages added on top of the base image; -U takes the newest release of each (no version pins).
RUN pip install -U kfp fire pandas scikit-learn xgboost 

Writing ./Dockerfile


In [5]:
# Compose the Container Registry reference gcr.io/<project>/<image>:<tag>.
BANK_APP_IMAGE = 'bank_app'
TAG = 'latest'
BANK_APP_IMAGE_GCR = f'gcr.io/{PROJECT_ID}/{BANK_APP_IMAGE}:{TAG}'
BANK_APP_IMAGE_GCR

'gcr.io/mlbankapp/bank_app:latest'

In [6]:
# Submit the current directory (including the Dockerfile written above) to Cloud Build
# and tag the resulting image as BANK_APP_IMAGE_GCR; the build may run for up to 15 minutes.
!gcloud builds submit --timeout 15m --tag $BANK_APP_IMAGE_GCR .

Creating temporary tarball archive of 3 file(s) totalling 78.3 KiB before compression.
Uploading tarball of [.] to [gs://mlbankapp_cloudbuild/source/1641392730.711963-675a143d17a8410e8a30d6595d3bf3de.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/mlbankapp/locations/global/builds/1286a182-6611-4723-9f5a-ea2f63eeab85].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/1286a182-6611-4723-9f5a-ea2f63eeab85?project=248389392445].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "1286a182-6611-4723-9f5a-ea2f63eeab85"

FETCHSOURCE
Fetching storage object: gs://mlbankapp_cloudbuild/source/1641392730.711963-675a143d17a8410e8a30d6595d3bf3de.tgz#1641392730886224
Copying gs://mlbankapp_cloudbuild/source/1641392730.711963-675a143d17a8410e8a30d6595d3bf3de.tgz#1641392730886224...
/ [1 files][ 10.8 KiB/ 10.8 KiB]                                                
Operation completed over 1 objects/10.8 KiB.
BUILD
Alread

### Import required libraries

In [7]:
import kfp
from typing import NamedTuple

### Read and Split Data Pipeline Step

In [8]:
def read_split_data(
    gs_path: str,
    data_file_name: str,
) -> NamedTuple('Outputs', [('gs_path', str),
                            ('train_file_name', str),
                            ('test_file_name', str)]):
    """Split the source CSV into train and test files and copy both to the bucket.

    The function is turned into a Kubeflow component with
    ``create_component_from_func``, so every import it needs is done locally.

    Args:
        gs_path: Cloud Storage prefix holding the data file; the two split
            files are uploaded under the same prefix.
        data_file_name: Name of the CSV file under ``gs_path`` to split.

    Returns:
        Named tuple ``(gs_path, train_file_name, test_file_name)`` where the
        file names are ``train_data.csv`` and ``test_data.csv``.

    Raises:
        subprocess.CalledProcessError: If a ``gsutil cp`` upload fails.
    """
    import subprocess
    import sys
    from collections import namedtuple

    from pandas import read_csv
    from sklearn.model_selection import train_test_split

    train_name = "train_data.csv"
    test_name = "test_data.csv"

    frame = read_csv(f"{gs_path}/{data_file_name}")

    # 70/30 split with a fixed seed so reruns produce the same partition.
    train_part, test_part = train_test_split(frame, test_size=0.3, random_state=123)
    parts = ((train_name, train_part), (test_name, test_part))

    # Write both files locally first ...
    for file_name, part in parts:
        part.to_csv(file_name, index=False)

    # ... then upload them; gsutil's stderr is routed to stdout.
    for file_name, _ in parts:
        subprocess.check_call(
            ['gsutil', 'cp', file_name, f"{gs_path}/{file_name}"],
            stderr=sys.stdout,
        )

    Outputs = namedtuple('Outputs', ['gs_path', 'train_file_name', 'test_file_name'])
    return Outputs(gs_path, train_name, test_name)

Build a component from `read_split_data()` and run it in a one-step pipeline to test it.

In [9]:
# Wrap read_split_data as a pipeline component that runs in the custom image;
# the component definition is also saved to a YAML file.
read_split_data_op = kfp.components.create_component_from_func(
    func=read_split_data,
    output_component_file='read_split_data_component.yaml',
    base_image=BANK_APP_IMAGE_GCR,
)

In [10]:
@kfp.dsl.pipeline(
    name='ML BankApp',
    description='Kubeflow pipeline for ML BankApp.',
)
def bank_app_pipeline(gs_path, data_file_name):
    # One-step pipeline: only the read/split component.
    read_split_data_op(gs_path=gs_path, data_file_name=data_file_name)


# Client for the hosted Kubeflow Pipelines endpoint.
client = kfp.Client(host='https://646c93bca82df7a6-dot-us-central1.pipelines.googleusercontent.com')

# Values for the pipeline parameters of this run.
arguments = dict(gs_path=GS_PATH, data_file_name=DATA_FILE_NAME)

# Submit the pipeline function as a run through the client created above.
client.create_run_from_pipeline_func(bank_app_pipeline, arguments=arguments)

RunPipelineResult(run_id=d479c610-a2fd-4351-8c96-6bed7e1b6e7e)

### Train XGB Model

In [11]:
def train_xgb_model(gs_path:str, 
                    train_file_name:str, 
                    xgb_model: str)->NamedTuple('Outputs',
                                                [('gs_path', str), 
                                                 ('xgb_model', str)]):
    """Fit a preprocessing + XGBoost pipeline on a CSV and upload the fitted model.

    Reads ``{gs_path}/{train_file_name}``, uses column ``y`` as the target,
    the ten listed columns as categorical features and every other column as
    a numeric feature. Numeric features are median-imputed and standardized;
    categorical features are imputed with the most frequent value and one-hot
    encoded. The fitted pipeline is dumped with joblib to the local file
    ``xgb_model`` and copied to ``{gs_path}/{xgb_model}`` with ``gsutil``.

    The function is turned into a Kubeflow component with
    ``create_component_from_func``, so all imports are local and the
    preprocessing is defined here rather than shared with other components.

    Args:
        gs_path: Cloud Storage prefix for the training file and the model.
        train_file_name: Name of the training CSV under ``gs_path``.
        xgb_model: File name to store the fitted pipeline under.

    Returns:
        Named tuple ``(gs_path, xgb_model)`` echoing the inputs.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil cp`` upload fails.
    """
    import sys
    import subprocess
    from collections import namedtuple

    import pandas as pd
    from joblib import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.impute import SimpleImputer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from xgboost import XGBClassifier

    target = 'y'
    cat_features = ['job', 'marital', 'education', 'credit_default', 'housing', 'loan',
                    'contact', 'contacted_month', 'day_of_week', 'poutcome']

    train = pd.read_csv("{}/{}".format(gs_path, train_file_name))

    # Numeric features are whatever is left, kept in file order. A set
    # difference would give an order that changes between processes.
    non_numeric = set(cat_features) | {target}
    num_features = [col for col in train.columns if col not in non_numeric]

    # The target is cast to category together with the categorical features.
    dtypes = {col: 'category' for col in cat_features + [target]}
    dtypes.update({col: 'float64' for col in num_features})
    train = train.astype(dtypes)

    X_train = train.drop(columns=[target])
    y_train = train[target]

    numeric_transformer = Pipeline(steps=[
        ('num_imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    # handle_unknown='ignore' lets categories unseen during fit be encoded as all zeros.
    categorical_transformer = Pipeline(steps=[
        ('cat_imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocess = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, num_features),
            ('cat', categorical_transformer, cat_features)])

    xgb_pipeline = Pipeline([('preprocess', preprocess),
                             ('xgboost', XGBClassifier())])

    xgb_pipeline.fit(X_train, y_train)

    # Persist locally, then copy to the bucket; gsutil's stderr goes to stdout.
    dump(xgb_pipeline, xgb_model)
    xgb_model_gs_path = '{}/{}'.format(gs_path, xgb_model)
    subprocess.check_call(['gsutil', 'cp', xgb_model, xgb_model_gs_path], stderr=sys.stdout)

    train_xgb_model_output = namedtuple('Outputs', ['gs_path', 'xgb_model'])
    return train_xgb_model_output(gs_path, xgb_model)

Build a component from `train_xgb_model()` and run it after the split step to test it.

In [12]:
# Wrap train_xgb_model as a pipeline component that runs in the custom image;
# the component definition is also saved to a YAML file.
train_xgb_model_op = kfp.components.create_component_from_func(
    func=train_xgb_model,
    output_component_file='train_xgb_model_component.yaml',
    base_image=BANK_APP_IMAGE_GCR,
)

In [13]:
@kfp.dsl.pipeline(name='ML BankApp',
                  description='Kubeflow pipeline for ML BankApp .')
def bank_app_pipeline(gs_path, data_file_name):
    # Step 1: split the raw data into train/test files.
    split_step = read_split_data_op(gs_path=gs_path, data_file_name=data_file_name)

    # Step 2: train the XGBoost model on the training file from step 1.
    xgb_step = train_xgb_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        xgb_model=XGB_MODEL,
    )


# Client for the hosted Kubeflow Pipelines endpoint.
client = kfp.Client(host='https://646c93bca82df7a6-dot-us-central1.pipelines.googleusercontent.com')

# Values for the pipeline parameters of this run.
arguments = dict(gs_path=GS_PATH, data_file_name=DATA_FILE_NAME)

# Submit the pipeline function as a run through the client created above.
client.create_run_from_pipeline_func(bank_app_pipeline, arguments=arguments)

RunPipelineResult(run_id=ff4af724-0b33-48bd-9639-b98c80e36d25)

### Training Random Forest Model

In [14]:
def train_rf_model(gs_path:str, 
                   train_file_name:str,
                   rf_model:str)->NamedTuple('Outputs',
                                             [('gs_path', str), 
                                              ('rf_model', str)]):
    """Fit a preprocessing + random-forest pipeline on a CSV and upload the fitted model.

    Reads ``{gs_path}/{train_file_name}``, uses column ``y`` as the target,
    the ten listed columns as categorical features and every other column as
    a numeric feature. Numeric features are median-imputed and standardized;
    categorical features are imputed with the most frequent value and one-hot
    encoded. The fitted pipeline is dumped with joblib to the local file
    ``rf_model`` and copied to ``{gs_path}/{rf_model}`` with ``gsutil``.

    The function is turned into a Kubeflow component with
    ``create_component_from_func``, so all imports are local and the
    preprocessing is defined here rather than shared with other components.

    Args:
        gs_path: Cloud Storage prefix for the training file and the model.
        train_file_name: Name of the training CSV under ``gs_path``.
        rf_model: File name to store the fitted pipeline under.

    Returns:
        Named tuple ``(gs_path, rf_model)`` echoing the inputs.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil cp`` upload fails.
    """
    import sys
    import subprocess
    from collections import namedtuple

    import pandas as pd
    from joblib import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.impute import SimpleImputer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler, OneHotEncoder

    target = 'y'
    cat_features = ['job', 'marital', 'education', 'credit_default', 'housing', 'loan',
                    'contact', 'contacted_month', 'day_of_week', 'poutcome']

    train = pd.read_csv("{}/{}".format(gs_path, train_file_name))

    # Numeric features are whatever is left, kept in file order. A set
    # difference would give an order that changes between processes.
    non_numeric = set(cat_features) | {target}
    num_features = [col for col in train.columns if col not in non_numeric]

    # The target is cast to category together with the categorical features.
    dtypes = {col: 'category' for col in cat_features + [target]}
    dtypes.update({col: 'float64' for col in num_features})
    train = train.astype(dtypes)

    X_train = train.drop(columns=[target])
    y_train = train[target]

    numeric_transformer = Pipeline(steps=[
        ('num_imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    # handle_unknown='ignore' lets categories unseen during fit be encoded as all zeros.
    categorical_transformer = Pipeline(steps=[
        ('cat_imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocess = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, num_features),
            ('cat', categorical_transformer, cat_features)])

    # Seeded (same value as the train/test split) so that repeated runs build
    # the same forest and the accuracy comparison between models is reproducible.
    rf_pipeline = Pipeline([('preprocess', preprocess),
                            ('rf', RandomForestClassifier(random_state=123))])

    rf_pipeline.fit(X_train, y_train)

    # Persist locally, then copy to the bucket; gsutil's stderr goes to stdout.
    dump(rf_pipeline, rf_model)
    rf_model_gs_path = '{}/{}'.format(gs_path, rf_model)
    subprocess.check_call(['gsutil', 'cp', rf_model, rf_model_gs_path], stderr=sys.stdout)

    train_rf_model_output = namedtuple('Outputs', ['gs_path', 'rf_model'])
    return train_rf_model_output(gs_path, rf_model)

Build a component from `train_rf_model()` and run it alongside the XGBoost step to test it.

In [15]:
# Wrap train_rf_model as a pipeline component that runs in the custom image;
# the component definition is also saved to a YAML file.
train_rf_model_op = kfp.components.create_component_from_func(
    func=train_rf_model,
    output_component_file='train_rf_model_component.yaml',
    base_image=BANK_APP_IMAGE_GCR,
)

In [16]:
@kfp.dsl.pipeline(name='ML BankApp',
                  description='Kubeflow pipeline for ML BankApp .')
def bank_app_pipeline(gs_path, data_file_name):
    # Step 1: split the raw data into train/test files.
    split_step = read_split_data_op(gs_path=gs_path, data_file_name=data_file_name)

    # Step 2: train both candidate models on the training file from step 1.
    xgb_step = train_xgb_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        xgb_model=XGB_MODEL,
    )
    rf_step = train_rf_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        rf_model=RF_MODEL,
    )


# Client for the hosted Kubeflow Pipelines endpoint.
client = kfp.Client(host='https://646c93bca82df7a6-dot-us-central1.pipelines.googleusercontent.com')

# Values for the pipeline parameters of this run.
arguments = dict(gs_path=GS_PATH, data_file_name=DATA_FILE_NAME)

# Submit the pipeline function as a run through the client created above.
client.create_run_from_pipeline_func(bank_app_pipeline, arguments=arguments)

RunPipelineResult(run_id=c0143216-2b4b-4ee1-a6b9-be0ed5617132)

### Evaluate Models

In [17]:
def evaluate_model(gs_path:str, model_joblib:str, test_file_name:str) -> float:
    """Score a stored model on the test CSV and return the score.

    Reads ``{gs_path}/{test_file_name}``, applies the same dtype casts as the
    training components (listed columns and ``y`` to category, all others to
    float64), downloads ``{gs_path}/{model_joblib}`` with ``gsutil`` and calls
    the loaded model's ``score`` on the test features and target. For the
    classifier pipelines trained in this notebook, ``score`` is mean accuracy.

    The function is turned into a Kubeflow component with
    ``create_component_from_func``, so all imports are local.

    Args:
        gs_path: Cloud Storage prefix holding the test file and the model.
        model_joblib: File name of the joblib-serialized model under ``gs_path``.
        test_file_name: Name of the test CSV under ``gs_path``.

    Returns:
        The value of ``model.score(X_test, y_test)`` as a plain ``float``.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil cp`` download fails.
    """
    import sys
    import subprocess
    import pandas as pd
    from joblib import load

    target = 'y'
    cat_features = ['job', 'marital', 'education', 'credit_default', 'housing', 'loan',
                    'contact', 'contacted_month', 'day_of_week', 'poutcome']

    test = pd.read_csv('{}/{}'.format(gs_path, test_file_name))

    # Every column that is neither categorical nor the target is numeric.
    non_numeric = set(cat_features) | {target}
    num_features = [col for col in test.columns if col not in non_numeric]

    dtypes = {col: 'category' for col in cat_features + [target]}
    dtypes.update({col: 'float64' for col in num_features})
    test = test.astype(dtypes)

    X_test = test.drop(columns=[target])
    y_test = test[target]

    # Fetch the serialized model into the working directory.
    model_gs_path = '{}/{}'.format(gs_path, model_joblib)
    subprocess.check_call(['gsutil', 'cp', model_gs_path, model_joblib], stderr=sys.stdout)

    # NOTE: joblib.load is pickle-based and can execute arbitrary code while
    # loading; only point this component at a bucket whose contents are trusted.
    model = load(model_joblib)

    return float(model.score(X_test, y_test))

In [18]:
# Wrap evaluate_model as a pipeline component that runs in the custom image;
# the component definition is also saved to a YAML file.
evaluate_model_op = kfp.components.create_component_from_func(
    func=evaluate_model,
    output_component_file='evaluate_model_component.yaml',
    base_image=BANK_APP_IMAGE_GCR,
)

In [19]:
@kfp.dsl.pipeline(name='ML BankApp',
                  description='Kubeflow pipeline for ML BankApp .')
def bank_app_pipeline(gs_path, data_file_name):
    # Step 1: split the raw data into train/test files.
    split_step = read_split_data_op(gs_path=gs_path, data_file_name=data_file_name)

    # Step 2: train both candidate models on the training file from step 1.
    xgb_step = train_xgb_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        xgb_model=XGB_MODEL,
    )
    rf_step = train_rf_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        rf_model=RF_MODEL,
    )

    # Step 3: score each stored model on the test file from step 1.
    eval_xgb_step = evaluate_model_op(
        gs_path=xgb_step.outputs['gs_path'],
        model_joblib=xgb_step.outputs['xgb_model'],
        test_file_name=split_step.outputs['test_file_name'],
    )
    eval_rf_step = evaluate_model_op(
        gs_path=rf_step.outputs['gs_path'],
        model_joblib=rf_step.outputs['rf_model'],
        test_file_name=split_step.outputs['test_file_name'],
    )


# Client for the hosted Kubeflow Pipelines endpoint.
client = kfp.Client(host='https://646c93bca82df7a6-dot-us-central1.pipelines.googleusercontent.com')

# Values for the pipeline parameters of this run.
arguments = dict(gs_path=GS_PATH, data_file_name=DATA_FILE_NAME)

# Submit the pipeline function as a run through the client created above.
client.create_run_from_pipeline_func(bank_app_pipeline, arguments=arguments)

RunPipelineResult(run_id=654e48f0-2ea8-4863-8ee5-704c2315b1ae)

In [20]:
@kfp.dsl.pipeline(name='ML BankApp',
                  description='Kubeflow pipeline for ML BankApp .')
def bank_app_pipeline(gs_path, data_file_name):
    # Step 1: split the raw data into train/test files.
    split_step = read_split_data_op(gs_path=gs_path, data_file_name=data_file_name)

    # Step 2: train both candidate models on the training file from step 1.
    xgb_step = train_xgb_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        xgb_model=XGB_MODEL,
    )
    rf_step = train_rf_model_op(
        gs_path=split_step.outputs['gs_path'],
        train_file_name=split_step.outputs['train_file_name'],
        rf_model=RF_MODEL,
    )

    # Step 3: score each stored model on the test file from step 1.
    eval_xgb_step = evaluate_model_op(
        gs_path=xgb_step.outputs['gs_path'],
        model_joblib=xgb_step.outputs['xgb_model'],
        test_file_name=split_step.outputs['test_file_name'],
    )
    eval_rf_step = evaluate_model_op(
        gs_path=rf_step.outputs['gs_path'],
        model_joblib=rf_step.outputs['rf_model'],
        test_file_name=split_step.outputs['test_file_name'],
    )

    # Step 4: retrain whichever model scored higher (XGBoost wins ties) and
    # store it under BEST_MODEL.
    # NOTE(review): the retrain receives data_file_name (the unsplit dataset) as
    # train_file_name - presumably intentional, to fit the winner on all data; confirm.
    with kfp.dsl.Condition(eval_xgb_step.output >= eval_rf_step.output):
        train_xgb_model_op(gs_path=gs_path,
                           train_file_name=data_file_name,
                           xgb_model=BEST_MODEL)
    with kfp.dsl.Condition(eval_xgb_step.output < eval_rf_step.output):
        train_rf_model_op(gs_path=gs_path,
                          train_file_name=data_file_name,
                          rf_model=BEST_MODEL)


# Client for the hosted Kubeflow Pipelines endpoint.
client = kfp.Client(host='https://646c93bca82df7a6-dot-us-central1.pipelines.googleusercontent.com')

# Values for the pipeline parameters of this run.
arguments = dict(gs_path=GS_PATH, data_file_name=DATA_FILE_NAME)

# Submit the pipeline function as a run through the client created above.
client.create_run_from_pipeline_func(bank_app_pipeline, arguments=arguments)

RunPipelineResult(run_id=648a3949-e9b9-4afc-8905-95ce8208a239)