In [35]:
from kfp.dsl import (
    pipeline,
    Input,
    Output,
    Dataset,
    component,
    Metrics,
    Model,
    OutputPath,
    Condition,
    OutputPath,
)
from typing import NamedTuple
from kfp import compiler

In [36]:
@component(base_image='python:3.10', packages_to_install=['hopsworks', 'pandas'])
def data_extraction(features: Output[Dataset], labels: Output[Dataset]):
    """Export the training data of the ``bank_fv`` feature view as two CSV artifacts.

    Logs in to Hopsworks, fetches version 1 of the ``bank_fv`` feature view and
    reads training dataset version 1 from it.

    Args:
        features: Output artifact; the feature frame is written to ``features.path`` as CSV.
        labels: Output artifact; the label frame is written to ``labels.path`` as CSV.

    Raises:
        RuntimeError: If the ``HOPSWORKS_API_KEY`` environment variable is missing
            or empty inside the component's container.
    """
    import os

    import hopsworks

    # Credentials must not live in source control (or in the compiled pipeline
    # YAML, which embeds this function's source). The key is expected in the
    # container environment instead, e.g. injected on the task by the pipeline.
    # NOTE(review): the key that used to be hard-coded here has been exposed and
    # should be revoked in Hopsworks.
    api_key = os.environ.get("HOPSWORKS_API_KEY")
    if not api_key:
        raise RuntimeError(
            "HOPSWORKS_API_KEY is not set; provide the Hopsworks API key through "
            "the component's environment instead of hard-coding it."
        )

    project = hopsworks.login(api_key_value=api_key)
    fs = project.get_feature_store()
    bank_fv = fs.get_feature_view("bank_fv", 1)
    features_df, labels_df = bank_fv.get_training_data(training_dataset_version=1)

    # index=False keeps the pandas index out of the files read by downstream steps.
    features_df.to_csv(features.path, index=False)
    labels_df.to_csv(labels.path, index=False)

In [37]:
# This step is not required because the transformation is handled entirely by the feature store.
# @component(base_image='python:3.10')
# def data_transformation():
#     print("Data extraction")

In [38]:
# This also can be applied at feature store level, but we will keep it here for now

@component(base_image='python:3.10', packages_to_install=['pandas', 'evidently'])
def data_validation(features: Input[Dataset], metrics: Output[Metrics]) -> NamedTuple('outputs', [('all_passed', bool)]):
    """Run an Evidently test suite over the extracted features and report the verdict.

    The suite contains ``TestNumberOfColumnsWithMissingValues``,
    ``TestNumberOfRowsWithMissingValues``, ``TestNumberOfConstantColumns``,
    ``TestNumberOfDuplicatedRows`` and ``TestNumberOfDuplicatedColumns``, each
    with its default configuration and no reference data.

    Args:
        features: Input artifact; a CSV file is read from ``features.path``.
        metrics: Output artifact; receives the ``all_passed`` metric.

    Returns:
        A named tuple with one field, ``all_passed``, taken from the suite
        summary and cast to ``bool``.
    """
    import pandas as pd
    # NOTE(review): 'evidently' is installed unpinned; confirm that the release
    # being installed still ships `evidently.test_suite` / `evidently.tests`.
    from evidently.test_suite import TestSuite
    from evidently.tests import (
        TestNumberOfColumnsWithMissingValues,
        TestNumberOfConstantColumns,
        TestNumberOfDuplicatedColumns,
        TestNumberOfDuplicatedRows,
        TestNumberOfRowsWithMissingValues,
    )

    tests = TestSuite(tests=[
        TestNumberOfColumnsWithMissingValues(),
        TestNumberOfRowsWithMissingValues(),
        TestNumberOfConstantColumns(),
        TestNumberOfDuplicatedRows(),
        TestNumberOfDuplicatedColumns(),
    ])

    df = pd.read_csv(features.path)
    tests.run(reference_data=None, current_data=df)

    # Cast to a plain bool so the logged metric and the returned output have
    # the type that the signature declares.
    all_passed = bool(tests.as_dict()["summary"]["all_passed"])
    metrics.log_metric("all_passed", all_passed)

    # The field type must match the return annotation (bool); it was previously
    # declared as str here, contradicting the signature.
    outputs = NamedTuple('outputs', [('all_passed', bool)])
    return outputs(all_passed)

In [39]:
@component(base_image='python:3.10', packages_to_install=['pandas', 'scikit-learn'])
def data_preparation(
    features: Input[Dataset],
    labels: Input[Dataset],
    X_train_dataset: Output[Dataset],
    X_test_dataset: Output[Dataset],
    y_train_dataset: Output[Dataset],
    y_test_dataset: Output[Dataset],
):
    """Split the features and labels into train and test CSV artifacts.

    Both inputs are read as CSV and split together with ``test_size=0.2`` and
    ``random_state=42``; each of the four resulting frames is written, without
    its index, to the path of the matching output artifact.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    feature_frame = pd.read_csv(features.path)
    label_frame = pd.read_csv(labels.path)

    # train_test_split returns (X_train, X_test, y_train, y_test); the artifact
    # tuple below is listed in that same order.
    split_frames = train_test_split(
        feature_frame, label_frame, test_size=0.2, random_state=42
    )
    target_artifacts = (X_train_dataset, X_test_dataset, y_train_dataset, y_test_dataset)

    for frame, artifact in zip(split_frames, target_artifacts):
        frame.to_csv(artifact.path, index=False)
    

In [40]:
@component(base_image='python:3.10', packages_to_install=['pandas', 'scikit-learn', 'joblib'])
def model_train(X_train_dataset:Input[Dataset], y_train_dataset:Input[Dataset], model_output:Output[Model]):
    """Fit a decision tree on the training split and store it as a model artifact.

    Args:
        X_train_dataset: Input artifact; training features are read as CSV from its path.
        y_train_dataset: Input artifact; training labels are read as CSV from its path.
        model_output: Output artifact; the fitted model is dumped with joblib to
            ``model_output.path`` and its metadata receives ``framework`` and
            ``time_to_train_in_seconds``.
    """
    import time

    import joblib
    import pandas as pd
    from sklearn.tree import DecisionTreeClassifier

    X_train = pd.read_csv(X_train_dataset.path)
    y_train = pd.read_csv(y_train_dataset.path)

    model = DecisionTreeClassifier(
        class_weight='balanced',
        criterion='gini',
        max_depth=5,
        min_samples_leaf=2,
        # Fixed seed so retraining on the same split yields the same tree; the
        # value matches the seed used for the train/test split.
        random_state=42,
    )

    # Time only the fit call: the reported figure previously also included the
    # library imports and the CSV reads, overstating the training time.
    fit_started = time.perf_counter()
    model.fit(X_train, y_train)
    fit_seconds = time.perf_counter() - fit_started

    model_output.metadata["framework"] = "Scikit-learn"
    model_output.metadata["time_to_train_in_seconds"] = fit_seconds
    joblib.dump(model, model_output.path)
    

In [41]:
@component(base_image='python:3.10', packages_to_install=['pandas', 'scikit-learn', 'joblib'])
def model_evaluation(
    metrics: Output[Metrics],
    model_input: Input[Model],
    X_test_dataset: Input[Dataset],
    y_test_dataset: Input[Dataset],
):
    """Score the trained model on the test split and log its accuracy.

    The model is loaded with joblib from ``model_input.path``, the test features
    and labels are read as CSV, and the accuracy of the predictions is logged to
    ``metrics`` under the key ``accuracy``.
    """
    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score

    # joblib.load unpickles the file; the artifact here is the one produced by
    # the training step of this pipeline.
    classifier = joblib.load(model_input.path)

    test_features = pd.read_csv(X_test_dataset.path)
    test_labels = pd.read_csv(y_test_dataset.path)

    predictions = classifier.predict(test_features)
    metrics.log_metric("accuracy", accuracy_score(test_labels, predictions))

#### TODO: A model registry component can be added here, optionally gated on the evaluation metrics.

In [42]:
# Storage bucket URI and, beneath it, the location passed to the pipeline as
# its pipeline_root.
BUCKET_URI = "gs://leap-vertex-ai-im"
PIPELINE_ROOT = f"{BUCKET_URI}/test-runs"

# Pipeline wiring: extraction -> validation -> (only when validation passes)
# preparation -> training -> evaluation.
@pipeline(
    name="bank-model-pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def bank_pipeline():
    # TODO: use dsl.Condition for model evaluation then create model registry
    extraction = data_extraction()
    raw_features = extraction.outputs["features"]
    raw_labels = extraction.outputs["labels"]

    validation = data_validation(features=raw_features)

    # Everything downstream runs only if the validation step reported all_passed.
    with Condition(validation.outputs["all_passed"] == True):
        preparation = data_preparation(features=raw_features, labels=raw_labels)
        training = model_train(
            X_train_dataset=preparation.outputs["X_train_dataset"],
            y_train_dataset=preparation.outputs["y_train_dataset"],
        )
        model_evaluation(
            model_input=training.outputs["model_output"],
            X_test_dataset=preparation.outputs["X_test_dataset"],
            y_test_dataset=preparation.outputs["y_test_dataset"],
        )
    

In [43]:
# Compile the pipeline function into a YAML package file.
compiler.Compiler().compile(
    pipeline_func=bank_pipeline,
    package_path='bank_pipeline.yaml',
)