In [2]:
import pandas as pd
import numpy as np
import mlflow
import pickle
import kfp.dsl as dsl
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import kfp

In [3]:
from kfp.components import func_to_container_op


In [4]:
@func_to_container_op
def load_and_clean_data():
    data = pd.read_csv("https://raw.githubusercontent.com/TripathiAshutosh/dataset/main/banking.csv")
    print("Null/missingalues available in the data: \n", data.isna().sum())
    data = data.dropna()
    print("The data after dropping the NA values are: \n", data.isna().sum())
    print("--------data imported and cleaned----------")
    return data


In [11]:
data = load_and_clean_data()

In [5]:
@func_to_container_op
def preprocessing(data):
    data['education'] = np.where(data['education'].isin(['basic.9y', 'basic.6y', 'basic.4y']), 'Basic', data['education'])
    categorical_vars = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'poutcome']
    for var in categorical_vars:
        cat_list = pd.get_dummies(data[var], prefix=var) # one hot encoding
        data = data.join(cat_list)
    categorical_vars = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'day_of_week', 'poutcome']
    data_vars = data.columns.values.tolist()
    keeping = [i for i in data_vars if i not in categorical_vars]
    final_df = data[keeping]
    final_df.columns = final_df.columns.str.replace(".", "_").str.replace(" ", "_")
    print(final_df.head())
    print("Education column pre-processed, categorical variables one-hot encoded. Ready to input data to model")
    return final_df


In [6]:
@func_to_container_op
def train_test_split(final_df):
    X = final_df.loc[:, final_df.columns != 'y']
    y = final_df.loc[:, final_df.columns == 'y']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, stratify=y, random_state=47)
    print("\n---- X_train ----\n", X_train.head())
    print("\n---- X_test ----\n", X_test.head())
    print("\n---- y_test ----\n", y_test.head())
    return X_train, X_test, y_train, y_test

In [7]:
@func_to_container_op
def training_basic_classifier(X_train, y_train):
    model = RandomForestClassifier(n_estimators=150)
    model.fit(X_train, y_train.values.ravel())
    with mlflow.start_run():
        mlflow.log_param("n_estimators", 150)
        mlflow.sklearn.log_model(model, "model")
        with open('data/model.pkl', 'wb') as f:
            pickle.dump(model, f)
    print("\nRandomForest classifier is trained on banking data and saved to PV location /data/model.pkl ----")
    return model

In [8]:
@func_to_container_op
def predict_on_test_data(model, X_test):
    print("---- Inside predict_on_test_data component ----")
    y_pred = model.predict(X_test)
    np.save('data/y_pred.npy', y_pred)
    print("\n---- Predicted classes ----\n", y_pred)
    return y_pred


In [12]:
@dsl.pipeline(name='My ML Pipeline')
def my_pipeline():
    # Define the pipeline steps
    load_and_clean_dataa = load_and_clean_data()
    data_preprocessing = preprocessing(load_and_clean_dataa.outputs[data])
    data_splitting = train_test_split(data_preprocessing.output)
    model_training = training_basic_classifier(data_splitting.outputs['X_train'])
    prediction = predict_on_test_data(model_training.output, data_splitting.outputs['X_test'])


In [13]:
kfp.compiler.Compiler().compile(my_pipeline,'final.yaml')
kfp.Client().create_run_from_pipeline_func(my_pipeline)

KeyError: TaskSpec(component_ref=ComponentReference(name=None, digest=None, tag=None, url=None, spec=ComponentSpec(name='Load and clean data', description=None, metadata=None, inputs=None, outputs=None, implementation=ContainerImplementation(container=ContainerSpec(image='python:3.7', command=['sh', '-ec', 'program_path=$(mktemp)\nprintf "%s" "$0" > "$program_path"\npython3 -u "$program_path" "$@"\n', 'def load_and_clean_data():\n    data = pd.read_csv("https://raw.githubusercontent.com/TripathiAshutosh/dataset/main/banking.csv")\n    print("Null/missingalues available in the data: \\n", data.isna().sum())\n    data = data.dropna()\n    print("The data after dropping the NA values are: \\n", data.isna().sum())\n    print("--------data imported and cleaned----------")\n    return data\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=\'Load and clean data\', description=\'\')\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = load_and_clean_data(**_parsed_args)\n'], args=[], env=None, file_outputs=None)), version='google.com/cloud/pipelines/component/v1')), arguments={}, is_enabled=None, execution_options=None, annotations=None)