## Notebook using kfp.create_component_from_func function

### Telco Churn Pipeline

In [38]:
## Import Required Libraries

import kfp
import typing

In [39]:
## Read Data

from typing import NamedTuple
from kfp.components import *
import pandas as pd 

def read_data(file_name: str) -> str: 
        
    ## Import Required Libraries
    import pandas as pd
    import numpy as np
    
    #This line may cause problems as file is on the system and not inside container
    #Importing directly from Github Raw Content
    df_churn = pd.read_csv(file_name)
    df_churn = df_churn.astype(str)
    
    def remove_spaces(s):
        return s.replace(' ','_')
    
    df_churn = df_churn.applymap(remove_spaces)
    
    #returning df as to_csv without passing path converts it to string
    df_str = df_churn.to_string(index=False)
    return df_str #to_dict() #to_csv(index=False)


In [40]:
kfp_read_data = kfp.components.func_to_container_op(func = read_data, 
                                                          output_component_file = './read-data-func.yaml',
                                                          packages_to_install = ['numpy','pandas'])



In [41]:
## One-Hot-Encode

from typing import NamedTuple
from kfp.components import *

def one_hot_encode(input_df: str) -> str: 

    ## Import Required Libraries
    import pandas as pd
    import numpy as np
    
    lines = input_df.splitlines()
    l_df = []
    for l in lines:
        l_df.append(l.split())
        
    cols = l_df[0]
    l_df = l_df[1:]
    df_churn = pd.DataFrame(l_df, columns=cols) 

    empty_cols=['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents',
           'tenure', 'PhoneService', 'MultipleLines', 'InternetService',
           'OnlineSecurity', 'OnlineBackup', 'DeviceProtection','TechSupport',
           'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling',
           'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']

    for i in empty_cols:
        df_churn[i]=df_churn[i].replace(" ",np.nan)

    df_churn.drop(['customerID','cluster_number'], axis=1, inplace=True)
    df_churn = df_churn.dropna()
    binary_cols = ['Partner','Dependents','PhoneService','PaperlessBilling']

    for i in binary_cols:
        df_churn[i] = df_churn[i].replace({"Yes":1,"No":0})

    #Encoding column 'gender'
    df_churn['gender'] = df_churn['gender'].replace({"Male":1,"Female":0})


    category_cols = ['PaymentMethod','MultipleLines','InternetService','OnlineSecurity',
                   'OnlineBackup','DeviceProtection',
                   'TechSupport','StreamingTV','StreamingMovies','Contract']

    for cc in category_cols:
        dummies = pd.get_dummies(df_churn[cc], drop_first=False)
        dummies = dummies.add_prefix("{}#".format(cc))
        df_churn.drop(cc, axis=1, inplace=True)
        df_churn = df_churn.join(dummies)

    df_churn['Churn'] = df_churn['Churn'].replace({"Yes":1,"No":0})

    #saving files may need a PV allocation to container
    #output of files as Named tuple may cause problems 
    
    df_str = df_churn.to_string(index=False)
    return df_str #to_dict() #to_csv(index=False)

In [42]:
kfp_one_hot_encode = kfp.components.func_to_container_op(func = one_hot_encode, 
                                                          output_component_file = './one-hot-encode-func.yaml',
                                                          packages_to_install = ['numpy','pandas'])

In [43]:
## Random Forest Model
from typing import NamedTuple
def rf_model(input_df: dict, n_estimators: int = 100) -> NamedTuple('Outputs', [('Cf1', int), ('Cf2', int),
                                                                                     ('Cf3', int), ('Cf4', int)]):

    from sklearn.ensemble import RandomForestClassifier
    from imblearn.over_sampling import SMOTE
    from sklearn.model_selection import GridSearchCV
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import confusion_matrix
    
    
    lines = input_df.splitlines()
    l_df = []
    for l in lines:
        l_df.append(l.split())
        
    cols = l_df[0]
    l_df = l_df[1:]
    df_churn = pd.DataFrame(l_df, columns=cols) 
    
    n_estimators = 100
    n_est = n_estimators

    y1 = df_churn['Churn']
    X1 = df_churn.drop(['Churn'],axis=1)

    X_train, X_test, y_train, y_test = train_test_split(X1, y1, random_state=0)

    sm = SMOTE(random_state=0)
    X_train_res, y_train_res = sm.fit_sample(X_train, y_train)

    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_features': ['auto', 'sqrt', 'log2'],
        'max_depth' : [2,4,5,6,7,8],
        'criterion' :['gini', 'entropy']
    }


    rfc=RandomForestClassifier(random_state=42,n_estimators=n_est)
    gsv_rfc = GridSearchCV(estimator=rfc, param_grid=param_grid, cv= 5)
    rfc.fit(X_train_res, y_train_res)

    rfc_best=RandomForestClassifier(random_state=42, max_features='auto', n_estimators= 50, max_depth=8,
                                    criterion='gini')

    rfc_best.fit(X_train_res, y_train_res)
    X_test_res, y_test_res = sm.fit_sample(X_test, y_test)
    y_test_pred = rfc_best.predict(X_test_res)
    rf_score = rfc_best.score(X_test_res, y_test_res)
    conf = confusion_matrix(y_test_res, y_test_pred)
    
    return (conf[0][0],conf[0][1],conf[1][0],conf[1][1])

In [44]:
kfp_rf_model = kfp.components.func_to_container_op(func = rf_model, 
                                                          output_component_file = './rf-model-func.yaml',
                                                          packages_to_install = ['scikit-learn==0.19.1','numpy','pandas','imbalanced-learn==0.6.2'])

In [45]:
#Compile the components into pipeline function

In [46]:
import kfp.dsl as dsl

@dsl.pipeline(name='Merchant-Churn-Pipeline',description='A pipeline that processes and performs ML-Predictions using Random Forest Algorithm')
def Merch_Churn(file_name = "https://raw.githubusercontent.com/rujual/telco_churn/master/Data.csv", 
                n_estimators = 100):
    
    #Passing pipeline parameter and a constant value as operation arguments
    #Passing a task output reference as operation arguments
    
    read_data_task = read_data(file_name)    #Returns a dsl.ContainerOp class instance. 
    one_hot_encode_task = one_hot_encode(read_data_task.output) 
    rf_model_task = srf_model(one_hot_encode_task.output, n_estimators = 100)
    

#For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
#For an operation with a multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax

In [47]:
pipeline_func = Merch_Churn
pipeline_filename = pipeline_func.__name__+'.pipeline.tar.gz'

import kfp.compiler as comp
comp.Compiler().compile(pipeline_func, pipeline_filename)

ValueError: Invalid file path or buffer object type: <class 'kfp.dsl._pipeline_param.PipelineParam'>