# Build a Kubeflow pipeline with the Python SDK


* Use the Kubeflow Pipelines (KFP) Python SDK
* Create components and pipelines, and compile them to YAML

## Required installation

In [1]:
!pip install kfp --upgrade



## Import

In [4]:
import kfp
import kfp.components as comp

## 1 Data Acquisition: Download data

Download a data file from the web using a KFP component developed by the Kubeflow project.

In [8]:
# Web-download component published in the kubeflow/pipelines repository.
# NOTE(review): fetched from the unpinned `master` branch every time this cell
# runs; the pipelines in this notebook use `download_file_component` below,
# not this object.
web_downloader_op = kfp.components.load_component_from_url(
    url='https://raw.githubusercontent.com/kubeflow/pipelines/master/'
        'components/contrib/web/Download/component-sdk-v2.yaml',
)

# Load the component spec from a local YAML file next to this notebook
# (presumably a local copy of the spec above, given the same file name --
# confirm). This is the download component the pipelines below use.
download_file_component = kfp.components.load_component_from_file(
    "./component-sdk-v2.yaml"
)



### Pipeline: Download a single CSV

In [13]:
# Inputs: default dataset used by the example pipelines in this notebook.
url_for_data_download = (
    'https://www.openml.org/data/get_csv/3594/dataset_113_primary-tumor.arff'
)


def pl01_web_download(url):
    """Pipeline: download the single file at ``url`` with the download component."""
    # Creating the task inside the pipeline function is what registers the step.
    download_file_component(url)


kfp.compiler.Compiler().compile(
    pipeline_func=pl01_web_download,
    package_path='pl01_web_download.yaml',
)

## 1 Data Acquisition: Merge CSV

If a tarball containing multiple CSV files is downloaded, merge them into a single CSV.

### Component

In [14]:
def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
    """Merge the CSV files contained in a tarball into a single CSV.

    Every ``*.csv`` file at the top level of the archive is read without a
    header row, and the rows are concatenated in file-name order.

    Args:
        file_path: Local path to the tarball (compression is auto-detected,
            e.g. a ``.tar.gz``). In the pipelines this is the file fetched by
            the download step, e.g. from
            'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'.
        output_csv: Path the merged CSV is written to (no header, no index).

    Raises:
        ValueError: If the archive contains no top-level CSV files.
    """
    # All imports and helpers stay inside the function: KFP ships only this
    # function's source into the container.
    import glob
    import os
    import tarfile
    import tempfile

    import pandas as pd

    def _is_safe_member(member, dest_dir):
        """Return True for regular files/dirs that stay inside dest_dir."""
        # Links and device files are never needed to read CSVs; refusing them
        # also blocks symlink/hardlink escapes from the extraction directory.
        if not (member.isfile() or member.isdir()):
            return False
        target = os.path.realpath(os.path.join(dest_dir, member.name))
        return target == dest_dir or target.startswith(dest_dir + os.sep)

    # A fresh temporary directory ensures stale files from earlier runs can
    # never leak into the merge.
    with tempfile.TemporaryDirectory(prefix='merge_csv_') as tmp_dir:
        extract_dir = os.path.realpath(tmp_dir)
        # The archive comes from the web: only extract members that cannot
        # escape extract_dir (e.g. '../x' or absolute paths).
        with tarfile.open(name=file_path, mode='r:*') as tar:
            safe_members = [m for m in tar.getmembers()
                            if _is_safe_member(m, extract_dir)]
            extract_kwargs = {}
            if hasattr(tarfile, 'data_filter'):
                # Newer Pythons: also apply the stdlib 'data' extraction filter.
                extract_kwargs['filter'] = 'data'
            tar.extractall(path=extract_dir, members=safe_members,
                           **extract_kwargs)

        # Sort for a deterministic row order; glob order is filesystem-dependent.
        csv_files = sorted(glob.glob(os.path.join(extract_dir, '*.csv')))
        if not csv_files:
            raise ValueError(
                'No CSV files found at the top level of ' + str(file_path))

        df = pd.concat([pd.read_csv(csv_file, header=None)
                        for csv_file in csv_files])

    df.to_csv(output_csv, index=False, header=False)
    
    
# Wrap merge_csv as a containerized KFP component. The component spec is also
# written to component_merge_csv.yaml so it can be reloaded later.
create_step_merge_csv = kfp.components.create_component_from_func(
    merge_csv,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component_merge_csv.yaml',
)

### Pipeline: Download a tarball of multiple CSVs and merge them

In [15]:
# Define a pipeline and create a task from a component:
def pl01a_web_download_merge_csv_tarball(url):
    """Pipeline: Download and merge multiple CSV files inside a tarball.
        1. download the tarball
        2. unpack it and merge the CSVs it contains (assumes they share the
           same column layout)

    Args:
        url: URL of the tarball to download,
            e.g., 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
    """
    web_downloader_task = download_file_component(url)
    merge_csv_task = create_step_merge_csv(file=web_downloader_task.outputs['data'])
    # The merged CSV is available downstream as
    # merge_csv_task.outputs['output_csv'].


kfp.compiler.Compiler().compile(
    pipeline_func=pl01a_web_download_merge_csv_tarball,
    package_path='pl01a_web_download_merge_csv_tarball.yaml')

## 2 Data processing

### Component: get dummies

In [16]:
def get_dummies(file_path: comp.InputPath('CSV'),
                output_csv_features: comp.OutputPath('CSV'),
                output_csv_target: comp.OutputPath('CSV')):
    """Split a CSV into dummy-encoded features and a target column.

    The last column is treated as the target label and written out as-is.
    All other columns go through ``pd.get_dummies``: categorical (text)
    columns become 0/1 indicator columns, numeric columns are kept.

    Args:
        file_path: Path to the input CSV (first row is the header).
        output_csv_features: Path the encoded feature matrix is written to
            (with header, no index).
        output_csv_target: Path the target column is written to (with
            header, no index).
    """
    import pandas as pd

    df = pd.read_csv(filepath_or_buffer=file_path)

    # Assume the last column is the target; everything before it is a feature.
    df_target = df.iloc[:, -1]
    df_features = df.iloc[:, :-1]

    # Pin the 0/1 integer dtype pandas 1.x uses by default; pandas >= 2.0
    # would otherwise emit booleans, written to CSV as True/False.
    df_features_dummies = pd.get_dummies(df_features, dtype='uint8')

    # Write outputs.
    df_features_dummies.to_csv(output_csv_features, index=False, header=True)
    df_target.to_csv(output_csv_target, index=False, header=True)

# Wrap get_dummies as a KFP component; its spec is also saved to
# component_dp_get_dummies.yaml for reuse outside this notebook.
create_step_dp_get_dummies = kfp.components.create_component_from_func(
    get_dummies,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component_dp_get_dummies.yaml',
)

### Pipeline 02a

1. download file
2. data processing: a) get dummies


In [17]:
def pl02a_web_download_dp_dummies(url=url_for_data_download):
    """Pipeline: Download data, data processing
        1. download data
        2. data processing
            a. get dummies

    Args:
        url: URL of the CSV to download. Defaults to ``url_for_data_download``,
            e.g., 'https://www.openml.org/data/get_csv/3594/dataset_113_primary-tumor.arff'
    """
    # Use the pipeline parameter so the data URL can be overridden per run.
    web_downloader_task = download_file_component(url=url)
    dp_get_dummies_task = create_step_dp_get_dummies(file=web_downloader_task.outputs['data'])


kfp.compiler.Compiler().compile(
    pipeline_func=pl02a_web_download_dp_dummies,
    package_path='pl02a_web_download_dp_dummies.yaml')

### Component: Imputation

In [18]:
def impute_unknown(file_path: comp.InputPath('CSV'),
                   output_csv: comp.OutputPath('CSV')):
    """Imputation step (placeholder, currently a pass-through).

    No imputation is performed yet: the CSV is loaded with pandas and
    written back out (header row kept, no index column) with its values
    unchanged, so missing values pass straight through.

    Args:
        file_path: Path to the input CSV (first row is the header).
        output_csv: Path the output CSV is written to.
    """
    import pandas

    frame = pandas.read_csv(file_path)
    # TODO: fill missing values here.
    frame.to_csv(output_csv, header=True, index=False)

# Wrap impute_unknown as a KFP component; its spec is also saved to
# component_dp_impute_unknown.yaml for reuse outside this notebook.
create_step_dp_impute_unknown = kfp.components.create_component_from_func(
    impute_unknown,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component_dp_impute_unknown.yaml',
)

### Component: Scaler


In [20]:
def scale_df(file_path: comp.InputPath('CSV'),
             output_csv: comp.OutputPath('CSV')):
    """Scaling step (placeholder, currently a pass-through).

    No scaling is applied yet: the CSV is loaded with pandas and written
    back out (header row kept, no index column) with its values unchanged.

    Args:
        file_path: Path to the input CSV (first row is the header).
        output_csv: Path the output CSV is written to.
    """
    import pandas

    table = pandas.read_csv(file_path)
    # TODO: scale the feature columns here.
    table.to_csv(output_csv, header=True, index=False)

# Wrap scale_df as a KFP component; its spec is also saved to
# component_dp_scale_df.yaml for reuse outside this notebook.
create_step_dp_scale_df = kfp.components.create_component_from_func(
    scale_df,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component_dp_scale_df.yaml',
)

### Pipeline 02b

1. download file
2. data processing: 
    
    - a) get dummies
    - b) impute
    - c) scale

In [21]:
def pl02b_web_download_dp_dummies_impute_scale(url=url_for_data_download):
    """Pipeline: Download data, data processing
        1. download data
        2. data processing
            a. get dummies
            b. imputation
            c. scaling

    Args:
        url: URL of the CSV to download. Defaults to ``url_for_data_download``,
            e.g., 'https://www.openml.org/data/get_csv/3594/dataset_113_primary-tumor.arff'
    """
    # Use the pipeline parameter so the data URL can be overridden per run.
    web_downloader_task = download_file_component(url=url)
    dp_get_dummies_task = create_step_dp_get_dummies(file=web_downloader_task.outputs['data'])
    dp_impute_task = create_step_dp_impute_unknown(file=dp_get_dummies_task.outputs['output_csv_features'])
    dp_scale_task = create_step_dp_scale_df(file=dp_impute_task.outputs['output_csv'])


kfp.compiler.Compiler().compile(
    pipeline_func=pl02b_web_download_dp_dummies_impute_scale,
    package_path='pl02b_web_download_dp_dummies_impute_scale.yaml')

### Component: create model inputs - feature matrix, target, feature list

In [22]:
def create_model_inputs(file_path_features: comp.InputPath('CSV'),
                        file_path_target: comp.InputPath('CSV'),
                        output_json: comp.OutputPath('JSON')):
    """Build the model-input bundle (features, target, train/test split) as JSON.

    Reads the feature matrix and the one-column target, binarizes the target
    one-vs-all with ``LabelBinarizer``, makes a single 75/25 train/test split
    (``random_state=42``) shared by the label and binarized targets, and
    writes everything to a JSON file.

    Args:
        file_path_features: Path to the feature CSV (with header).
        file_path_target: Path to the one-column target CSV (with header).
        output_json: Path the JSON bundle is written to. Keys: 'X', 'y',
            'X_train', 'X_test', 'y_train_labels', 'y_test_labels',
            'y_train_one_hot', 'y_test_one_hot', 'X_feature_names',
            'y_target_class_names'.
    """
    import json
    from json import JSONEncoder

    import numpy as np
    import pandas as pd
    from sklearn import preprocessing
    from sklearn.model_selection import train_test_split

    class NumpyArrayEncoder(JSONEncoder):
        """JSON encoder that also serializes numpy arrays and numpy scalars."""
        def default(self, obj):
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, np.generic):
                return obj.item()
            return JSONEncoder.default(self, obj)

    # Feature matrix X and its column names.
    df_feature_matrix = pd.read_csv(filepath_or_buffer=file_path_features)
    X = np.array(df_feature_matrix)
    feature_names = np.array(df_feature_matrix.columns)

    # Target y, kept 2-D (n_samples, 1) exactly as read from the one-column CSV.
    y = np.array(pd.read_csv(filepath_or_buffer=file_path_target))

    # One-vs-all binarization
    # (https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.LabelBinarizer.html).
    # With more than two classes each row is one-hot; with exactly two classes
    # LabelBinarizer produces a single 0/1 column.
    lb = preprocessing.LabelBinarizer()
    y_binarized = lb.fit_transform(y)
    target_class_names = lb.classes_

    # One split for all targets, so the label and binarized targets are
    # guaranteed to select the same rows as X_train / X_test.
    (X_train, X_test,
     y_train_labels, y_test_labels,
     y_train_one_hot, y_test_one_hot) = train_test_split(
        X, y, y_binarized, test_size=0.25, random_state=42)

    d_output = {"X": X,
                "y": y,
                "X_train": X_train,
                "X_test": X_test,
                "y_train_labels": y_train_labels,
                "y_test_labels": y_test_labels,
                "y_train_one_hot": y_train_one_hot,
                "y_test_one_hot": y_test_one_hot,
                "X_feature_names": feature_names,
                "y_target_class_names": target_class_names}

    with open(output_json, 'w') as outfile:
        json.dump(d_output, outfile, cls=NumpyArrayEncoder)

# Wrap create_model_inputs as a KFP component (pandas + scikit-learn in the
# container); its spec is also saved to component_dp_create_model_inputs.yaml.
create_step_create_model_inputs = kfp.components.create_component_from_func(
    create_model_inputs,
    base_image='python:3.7',
    packages_to_install=[
        'pandas==1.1.4',
        'scikit-learn==1.0.2',
    ],
    output_component_file='component_dp_create_model_inputs.yaml',
)

### Pipeline 02c

1. download file
2. data processing: 
    
    - a) get dummies
    - b) impute
    - c) scale
    
    - Then, create model inputs


In [23]:
def pl02c_web_download_dp(url=url_for_data_download):
    """Pipeline: Download data, data processing
        1. download data
        2. data processing
            a. get dummies
            b. imputation
            c. scaling
            d. create model inputs

    Args:
        url: URL of the CSV to download. Defaults to ``url_for_data_download``,
            e.g., 'https://www.openml.org/data/get_csv/3594/dataset_113_primary-tumor.arff'
    """
    # Use the pipeline parameter so the data URL can be overridden per run.
    web_downloader_task = download_file_component(url=url)
    dp_get_dummies_task = create_step_dp_get_dummies(file=web_downloader_task.outputs['data'])
    dp_impute_task = create_step_dp_impute_unknown(file=dp_get_dummies_task.outputs['output_csv_features'])
    dp_scale_task = create_step_dp_scale_df(file=dp_impute_task.outputs['output_csv'])
    dp_create_model_inputs_task = create_step_create_model_inputs(file_path_features=dp_scale_task.outputs['output_csv'],
                                                                  file_path_target=dp_get_dummies_task.outputs['output_csv_target'])


kfp.compiler.Compiler().compile(
    pipeline_func=pl02c_web_download_dp,
    package_path='pl02c_web_download_dp.yaml')

## 3 Feature Selection

### Component: feature selection

In [24]:
def feature_selection(file_path: comp.InputPath('JSON'),
                      output_json: comp.OutputPath('JSON')):
    """Feature-selection step (placeholder, currently a pass-through).

    Reads the JSON bundle produced by the model-inputs step, checks that the
    keys a feature selector needs are present, and writes the bundle back
    out unchanged. No features are removed yet.

    Args:
        file_path: Path to the input JSON bundle.
        output_json: Path the (unchanged) JSON bundle is written to.

    Raises:
        KeyError: If 'X', 'y', 'X_feature_names' or 'y_target_class_names'
            is missing from the input bundle.
    """
    import json

    with open(file_path, "r") as read_file:
        json_input_data = json.load(read_file)

    # Fail early if the bundle lacks the fields a selector would work on.
    required_keys = ('X', 'y', 'X_feature_names', 'y_target_class_names')
    missing = [key for key in required_keys if key not in json_input_data]
    if missing:
        raise KeyError('Input JSON is missing required keys: {}'.format(missing))

    # TODO: select features here; the bundle is currently passed through as-is.
    with open(output_json, 'w') as outfile:
        json.dump(json_input_data, outfile)

# Wrap feature_selection as a KFP component; its spec is also saved to
# component_feature_selection.yaml.
# NOTE(review): feature_selection does not import scikit-learn at present;
# the pin below is presumably for a future selector implementation.
create_step_feature_selection = kfp.components.create_component_from_func(
    feature_selection,
    base_image='python:3.7',
    packages_to_install=[
        'pandas==1.1.4',
        'scikit-learn==1.0.2',
    ],
    output_component_file='component_feature_selection.yaml',
)

### Pipeline 03

1. download file
2. data processing: 
    
    - a) get dummies
    - b) impute
    - c) scale
    - d) create JSON object for inputs
3. feature selection


In [25]:
def pl03_webdl_dp_fs(url=url_for_data_download):
    """Pipeline: Download data, data processing
        1. download data
        2. data processing
            a. get dummies
            b. imputation
            c. scaling
            d. create model inputs
        3. feature selection

    Args:
        url: URL of the CSV to download. Defaults to ``url_for_data_download``,
            e.g., 'https://www.openml.org/data/get_csv/3594/dataset_113_primary-tumor.arff'
    """
    # Use the pipeline parameter so the data URL can be overridden per run.
    web_downloader_task = download_file_component(url=url)
    dp_get_dummies_task = create_step_dp_get_dummies(file=web_downloader_task.outputs['data'])
    dp_impute_task = create_step_dp_impute_unknown(file=dp_get_dummies_task.outputs['output_csv_features'])
    dp_scale_task = create_step_dp_scale_df(file=dp_impute_task.outputs['output_csv'])
    dp_create_model_inputs_task = create_step_create_model_inputs(file_path_features=dp_scale_task.outputs['output_csv'],
                                                                  file_path_target=dp_get_dummies_task.outputs['output_csv_target'])
    dp_feature_selection_task = create_step_feature_selection(file=dp_create_model_inputs_task.outputs['output_json'])


kfp.compiler.Compiler().compile(
    pipeline_func=pl03_webdl_dp_fs,
    package_path='pl03_webdl_dp_fs.yaml')

## 4 Classification

### Component: decision tree model

* source: https://towardsdatascience.com/kubeflow-pipelines-how-to-build-your-first-kubeflow-pipeline-from-scratch-2424227f7e5


In [26]:
# decision tree model: https://towardsdatascience.com/kubeflow-pipelines-how-to-build-your-first-kubeflow-pipeline-from-scratch-2424227f7e5


def decision_tree(file_path: comp.InputPath('JSON'),
                  output_json: comp.OutputPath('JSON')):
    """Train and evaluate depth-3 decision trees on the JSON model-input bundle.

    Two ``DecisionTreeClassifier(max_depth=3)`` models are fit on the same
    training rows: one on the class labels (used for accuracy and the
    confusion matrix) and one on the binarized targets (used for ROC AUC).
    The input bundle is written back out with an added 'model_results' entry.

    Args:
        file_path: Path to the input JSON bundle; must contain 'X_train',
            'X_test', 'y_train_labels', 'y_test_labels', 'y_train_one_hot',
            'y_test_one_hot' and 'y_target_class_names'.
        output_json: Path the bundle plus 'model_results' is written to.
            'model_results' holds 'y_pred_labels', 'y_pred_one_hot',
            'accuracy', 'auc_test' and 'confusion_matrix' (rows = true class,
            columns = predicted class, both in 'y_target_class_names' order).
    """
    import json
    from json import JSONEncoder

    import numpy as np
    from sklearn.metrics import accuracy_score
    from sklearn.metrics import confusion_matrix
    from sklearn.metrics import roc_auc_score
    from sklearn.tree import DecisionTreeClassifier

    class NumpyArrayEncoder(JSONEncoder):
        """JSON encoder that also serializes numpy arrays and numpy scalars."""
        def default(self, obj):
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, np.generic):
                return obj.item()
            return JSONEncoder.default(self, obj)

    with open(file_path, "r") as read_file:
        json_input_data = json.load(read_file)

    # JSON stores arrays as nested lists; convert them back to numpy arrays.
    X_train = np.array(json_input_data['X_train'])
    X_test = np.array(json_input_data['X_test'])
    y_train_labels = np.array(json_input_data['y_train_labels'])
    y_test_labels = np.array(json_input_data['y_test_labels'])
    y_train_one_hot = np.array(json_input_data['y_train_one_hot'])
    y_test_one_hot = np.array(json_input_data['y_test_one_hot'])
    y_target_class_names = json_input_data['y_target_class_names']

    # Model on class labels -> accuracy and confusion matrix.
    model_labels = DecisionTreeClassifier(max_depth=3)
    model_labels.fit(X_train, y_train_labels)
    y_pred_labels = model_labels.predict(X_test)

    # Model on binarized targets -> AUC.
    model_one_hot = DecisionTreeClassifier(max_depth=3)
    model_one_hot.fit(X_train, y_train_one_hot)
    y_pred_one_hot = model_one_hot.predict(X_test)
    if y_pred_one_hot.ndim == 1:
        # A single binarized column (two-class target) makes predict() return
        # a 1-D array; restore the column axis so it can be indexed like
        # y_test_one_hot below.
        y_pred_one_hot = y_pred_one_hot[:, np.newaxis]

    accuracy = accuracy_score(y_test_labels, y_pred_labels)

    # Rows = true class, columns = predicted class, in y_target_class_names order.
    cm = confusion_matrix(y_test_labels, y_pred_labels, labels=y_target_class_names)

    # AUC: only score classes that occur in the test set (ROC AUC is
    # undefined for a class with no positive samples).
    # NOTE(review): the scores are hard 0/1 predictions, not probabilities,
    # so this AUC is coarse; consider predict_proba.
    present_cols = np.where(np.sum(y_test_one_hot, axis=0) > 0)[0]
    auc_test = roc_auc_score(y_test_one_hot[:, present_cols],
                             y_pred_one_hot[:, present_cols],
                             multi_class='ovr')

    # Output: the input bundle plus the model results.
    d_output = json_input_data
    d_output['model_results'] = {
        'y_pred_labels': y_pred_labels,
        'y_pred_one_hot': y_pred_one_hot,
        'accuracy': accuracy,
        'auc_test': auc_test,
        'confusion_matrix': cm,
    }

    with open(output_json, 'w') as outfile:
        json.dump(d_output, outfile, cls=NumpyArrayEncoder)

# Wrap decision_tree as a KFP component (pandas + scikit-learn in the
# container); its spec is also saved to component_classif_decision_tree.yaml.
create_step_classif_decision_tree = kfp.components.create_component_from_func(
    decision_tree,
    base_image='python:3.7',
    packages_to_install=[
        'pandas==1.1.4',
        'scikit-learn==1.0.2',
    ],
    output_component_file='component_classif_decision_tree.yaml',
)

### Pipeline 04

1. download file
2. data processing: 
    
    - a) get dummies
    - b) impute
    - c) scale
    - d) create JSON object for inputs
3. feature selection
4. classification
    - decision tree

In [27]:
def pl04_webdl_dp_fs_classif_decisiontree(url=url_for_data_download):
    """Pipeline: Download data, data processing
        1. download data
        2. data processing
            a. get dummies
            b. imputation
            c. scaling
            d. create model inputs
        3. feature selection
        4. classification
            a. decision tree

    Args:
        url: URL of the CSV to download. Defaults to ``url_for_data_download``,
            e.g., 'https://www.openml.org/data/get_csv/3594/dataset_113_primary-tumor.arff'
    """
    # Use the pipeline parameter so the data URL can be overridden per run.
    web_downloader_task = download_file_component(url=url)
    dp_get_dummies_task = create_step_dp_get_dummies(file=web_downloader_task.outputs['data'])
    dp_impute_task = create_step_dp_impute_unknown(file=dp_get_dummies_task.outputs['output_csv_features'])
    dp_scale_task = create_step_dp_scale_df(file=dp_impute_task.outputs['output_csv'])
    dp_create_model_inputs_task = create_step_create_model_inputs(file_path_features=dp_scale_task.outputs['output_csv'],
                                                                  file_path_target=dp_get_dummies_task.outputs['output_csv_target'])
    dp_feature_selection_task = create_step_feature_selection(file=dp_create_model_inputs_task.outputs['output_json'])
    dp_classif_decision_tree_task = create_step_classif_decision_tree(file=dp_feature_selection_task.outputs['output_json'])


kfp.compiler.Compiler().compile(
    pipeline_func=pl04_webdl_dp_fs_classif_decisiontree,
    package_path='pl04_webdl_dp_fs_classif_dectree.yaml')