## Notebook using the `kfp.components.create_component_from_func` function

### Telco Churn Pipeline

In [1]:
## Import Required Libraries

import kfp
import typing

In [2]:
## Read Data

from typing import NamedTuple
from kfp.components import *

def read_data(file_name: str) -> 'pd.DataFrame':
    """Load the churn CSV, turn single-space cells into NaN, and drop incomplete rows.

    Runs inside a KFP lightweight component, so every import lives in the
    function body.

    Args:
        file_name: Location handed to ``pd.read_csv``. A file on the notebook
            host is not visible inside the component container, so the callers
            in this notebook pass a GitHub raw-content URL.

    Returns:
        The frame without the ``customerID`` and ``cluster number`` columns and
        without any row that had a missing value (a single space in one of the
        listed columns counts as missing).

    NOTE(review): 'pd.DataFrame' is not a type KFP has a registered serializer
    for; between pipeline steps this value is presumably passed as text rather
    than a DataFrame. Consider OutputPath('CSV') / InputPath('CSV') - verify.
    """
    import pandas as pd
    import numpy as np

    # pandas 1.3 deprecated `error_bad_lines` (removed in 2.0) in favour of
    # `on_bad_lines`; try the new keyword first and fall back for older pandas.
    try:
        df_churn = pd.read_csv(file_name, on_bad_lines='skip')
    except TypeError:
        df_churn = pd.read_csv(file_name, error_bad_lines=False)

    # Columns in which a single space stands for a missing value.
    cols_with_blanks = ['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
                        'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
                        'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
                        'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
                        'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']
    for col in cols_with_blanks:
        df_churn[col] = df_churn[col].replace(" ", np.nan)

    df_churn = df_churn.drop(columns=['customerID', 'cluster number'])
    return df_churn.dropna()


In [3]:
# Turn `read_data` into a reusable KFP component and save its spec as YAML.
kfp_read_data = kfp.components.create_component_from_func(
    read_data,
    output_component_file='./read-data-func.yaml',
    packages_to_install=['numpy', 'pandas'],
)

# Standalone task built outside any pipeline context; the pipeline function
# creates its own read-data task.
read_data_task = kfp_read_data(
    file_name='https://raw.githubusercontent.com/rujual/telco_churn/master/Data.csv'
)

In [4]:
## One-Hot-Encode

from typing import NamedTuple
from kfp.components import *

def one_hot_encode(input_df: 'pd.DataFrame') -> 'pd.DataFrame':
    """Encode the churn features numerically.

    Yes/No columns and ``Churn`` become 1/0, ``gender`` becomes Male=1 /
    Female=0, and each multi-valued category column is replaced by one
    indicator column per value, named ``"<column>#<value>"``.

    Runs inside a KFP lightweight component, so every import lives in the
    function body.

    Args:
        input_df: Frame produced by ``read_data``; it is not modified.

    Returns:
        A new encoded frame: the non-category columns in their original order,
        followed by the indicator columns grouped by source column.

    NOTE(review): 'pd.DataFrame' is not a KFP-registered type; inside the
    container ``input_df`` may arrive as text rather than a DataFrame - verify.
    """
    import pandas as pd

    yes_no = {"Yes": 1, "No": 0}
    binary_cols = ['Partner', 'Dependents', 'PhoneService', 'PaperlessBilling']
    category_cols = ['PaymentMethod', 'MultipleLines', 'InternetService', 'OnlineSecurity',
                     'OnlineBackup', 'DeviceProtection',
                     'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract']

    # Copy first: assigning columns on `input_df` itself would mutate the caller's frame.
    df_churn = input_df.copy()
    for col in binary_cols:
        df_churn[col] = df_churn[col].replace(yes_no)
    df_churn['gender'] = df_churn['gender'].replace({"Male": 1, "Female": 0})

    # One call with the same result as dropping each category column and
    # appending its "<column>#"-prefixed dummies in list order.
    df_churn = pd.get_dummies(df_churn, columns=category_cols, prefix_sep='#')

    df_churn['Churn'] = df_churn['Churn'].replace(yes_no)
    return df_churn

In [5]:
kfp_one_hot_encode = kfp.components.create_component_from_func(func = one_hot_encode, 
                                                          output_component_file = './one-hot-encode-func.yaml',
                                                          packages_to_install = ['numpy','pandas'])
# `.output` is the reference to read_data_task's single output; `.outputs` is a
# dict keyed by output name, not a value usable as an input argument.
one_hot_encode_task = kfp_one_hot_encode(read_data_task.output)

  serialized_value),


In [6]:
## Random Forest Model
from typing import NamedTuple
def rf_model(input_df: 'pd.DataFrame', n_estimators: int = 100) -> NamedTuple('Outputs', [('Cf1', int), ('Cf2', int),
                                                                                     ('Cf3', int), ('Cf4', int)]):
    """Train a random forest on SMOTE-balanced training data and report its confusion matrix.

    Runs inside a KFP lightweight component, so every import lives in the
    function body.

    Args:
        input_df: Encoded frame with a ``Churn`` target column (assumed 0/1, as
            produced by ``one_hot_encode``); every other column is a feature.
        n_estimators: Number of trees in the forest.

    Returns:
        ``(Cf1, Cf2, Cf3, Cf4)`` = (true negatives, false positives,
        false negatives, true positives) on the held-out split, as built-in ints.

    NOTE(review): 'pd.DataFrame' is not a KFP-registered type; inside the
    container ``input_df`` may arrive as text rather than a DataFrame - verify.
    """
    from sklearn.ensemble import RandomForestClassifier
    from imblearn.over_sampling import SMOTE
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import confusion_matrix

    y = input_df['Churn']
    X = input_df.drop(['Churn'], axis=1)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    # Balance only the training split (`fit_resample` replaces the deprecated
    # `fit_sample`). The test split stays untouched so the metrics describe
    # real rows rather than synthetic SMOTE samples.
    X_train_res, y_train_res = SMOTE(random_state=0).fit_resample(X_train, y_train)

    # Fixed hyper-parameters (presumably from an earlier grid search); the tree
    # count comes from `n_estimators`. 'sqrt' is what 'auto' meant for
    # classifiers, and it is accepted by every scikit-learn version.
    model = RandomForestClassifier(random_state=42, n_estimators=n_estimators,
                                   max_features='sqrt', max_depth=8, criterion='gini')
    model.fit(X_train_res, y_train_res)
    y_test_pred = model.predict(X_test)

    # labels=[0, 1] guarantees a 2x2 matrix even if one class is absent.
    (tn, fp), (fn, tp) = confusion_matrix(y_test, y_test_pred, labels=[0, 1])
    # NumPy integer scalars are not `int` instances; cast for the declared int outputs.
    return (int(tn), int(fp), int(fn), int(tp))

In [7]:
# scikit-learn is pinned to 0.22.x because imbalanced-learn 0.6.x requires
# scikit-learn >= 0.22; the earlier 0.19.1 pin could not be satisfied with it.
kfp_rf_model = kfp.components.create_component_from_func(func = rf_model, 
                                                          output_component_file = './rf-model-func.yaml',
                                                          packages_to_install = ['scikit-learn==0.22.1','numpy','pandas','imbalanced-learn==0.6.2'])
# `.output` is the reference to one_hot_encode_task's single output; `.outputs`
# is a dict keyed by output name, not a value usable as an input argument.
rf_model_task = kfp_rf_model(one_hot_encode_task.output, 100)

  serialized_value),


In [8]:
# Compile the components into a pipeline function

In [9]:
import kfp.dsl as dsl

@dsl.pipeline(name='Merchant-Churn-Pipeline',description='A pipeline that processes and performs ML-Predictions using Random Forest Algorithm')
def Merch_Churn(file_name = "https://raw.githubusercontent.com/rujual/telco_churn/master/Data.csv",
                n_estimators = 100):
    """Chain the read-data -> one-hot-encode -> random-forest components.

    Args:
        file_name: CSV location handed to the read-data step.
        n_estimators: Tree count forwarded to the random-forest step.
    """
    # Pipeline parameters and constants can be passed as task arguments;
    # each kfp_* call returns a dsl.ContainerOp.
    read_data_task = kfp_read_data(file_name)
    # `.output` is the reference to a task's single output.
    one_hot_encode_task = kfp_one_hot_encode(read_data_task.output)
    # Forward the pipeline parameter so a run-time override of
    # `n_estimators` actually reaches the model step.
    kfp_rf_model(one_hot_encode_task.output, n_estimators=n_estimators)
    

# For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
# For an operation with multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax

In [10]:
import kfp.compiler as comp

# Compile the pipeline into an archive named after the pipeline function.
pipeline_func = Merch_Churn
pipeline_filename = '{}.pipeline.tar.gz'.format(pipeline_func.__name__)
comp.Compiler().compile(pipeline_func, pipeline_filename)

