In [None]:
from kfp.v2 import dsl
from kfp.v2.dsl import (Output, Metrics, component)
from google.cloud.aiplatform import pipeline_jobs
from typing import NamedTuple
from kfp.v2 import compiler

@component(
    packages_to_install=["gcsfs", "pandas", "google-cloud-storage", "kfp"]
)
def validate_input_ds(
    filename: str,
    expected_num_cols: int = 21,
) -> NamedTuple("output", [("input_validation", str)]):
    """Check that the input CSV has the expected number of columns.

    Args:
        filename: path/URI of the CSV to validate, read with ``pandas.read_csv``
            (``gcsfs`` is installed so ``gs://`` URIs can be read).
        expected_num_cols: column count the file must have; defaults to 21,
            the value that used to be hard-coded.

    Returns:
        ``("true",)`` if the column count matches, otherwise ``("false",)``.
        The flag is a string because the pipeline compares it against
        ``"true"`` inside a ``dsl.Condition``.
    """
    # KFP runs only this function's source in the container, so imports are local.
    import logging

    import pandas as pd

    logging.basicConfig(level=logging.INFO)

    logging.info(f"Reading file: {filename}")
    df = pd.read_csv(filename)
    num_cols = len(df.columns)
    logging.info(f"Number of columns: {num_cols}")

    if num_cols != expected_num_cols:
        # Make a rejected input visible in the step logs instead of failing silently.
        logging.warning(
            f"Expected {expected_num_cols} columns but found {num_cols}; input rejected."
        )
        return ("false",)
    return ("true",)

In [None]:
@component(
    # xgboost is pinned to match the xgboost-cpu.1-6 serving container that the
    # deployment step uses, so the saved model.bst can be loaded at serving time.
    packages_to_install=["google-cloud-aiplatform", "gcsfs", "xgboost==1.6.2", "category_encoders",
                         "imblearn", "pandas", "google-cloud-storage", "scikit-learn", "fsspec"]
)
def custom_training_job_component(
    max_depth:int,
    learning_rate:float,
    n_estimators:int,
    metrics: Output[Metrics]
) -> NamedTuple("output", [("model_validation", str)]):
    """Train the credit-scoring XGBoost model and decide whether it is deployable.

    Steps:
    1. Read the credit CSV from GCS.
    2. Integer-encode the categorical columns.
    3. Split the data 70/30 into train/test with a fixed seed.
    4. Fit an ``XGBClassifier`` with the supplied hyperparameters.
    5. Log accuracy/precision/recall to ``metrics``.
    6. Upload the model to
       ``gs://udemy-gcp-mlops/credit-scoring/artifacts/model.bst``, but only
       when both test accuracy and test precision exceed 0.5.

    Args:
        max_depth: XGBoost ``max_depth``.
        learning_rate: XGBoost ``learning_rate``.
        n_estimators: number of boosting rounds.
        metrics: KFP metrics artifact that receives the evaluation scores.

    Returns:
        ``("true",)`` if the model passed the quality gate and was uploaded,
        otherwise ``("false",)``.
    """
    # KFP serializes only this function's source, so every import and helper
    # must live inside it.
    import pandas as pd
    from google.cloud import storage
    from sklearn.metrics import accuracy_score, precision_score, recall_score
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier

    input_file = "gs://udemy-gcp-mlops/credit-scoring/credit_files.csv"
    bucket_name = "udemy-gcp-mlops"
    artifact_name = "model.bst"
    seed = 42
    target = "CREDIT_STANDING_CODE"

    # Categorical column -> {category value: integer code}.
    # - Values not listed (including missing values) encode to 0.
    # - Dict order fixes the order of the appended *_CODE columns, and
    #   therefore the feature order the model is trained on.
    category_codes = {
        "PURPOSE": {"Consumer Goods": 1, "Vehicle": 2, "Tuition": 3, "Business": 4, "Repairs": 5},
        "OTHER_PARTIES": {"Guarantor": 1, "Co-Applicant": 2},
        "QUALIFICATION": {"unskilled": 1, "skilled": 2, "highly skilled": 3},
        "CREDIT_STANDING": {"good": 1},
        "ASSETS": {"Vehicle": 1, "Investments": 2, "Home": 3},
        "HOUSING": {"rent": 1, "own": 2},
        "MARITAL_STATUS": {"Married": 1, "Single": 2},
        "OTHER_PAYMENT_PLANS": {"bank": 1, "stores": 2},
        "SEX": {"M": 1},
    }

    def preprocess_data(df):
        """Replace each categorical column with its integer-coded ``<COL>_CODE`` column."""
        for column, codes in category_codes.items():
            df[column + "_CODE"] = df[column].apply(lambda value, codes=codes: codes.get(value, 0))
        return df.drop(columns=list(category_codes))

    def save_model_artifact(model):
        """Save ``model`` to a local file and upload it to the GCS artifacts folder."""
        model.save_model(artifact_name)
        bucket = storage.Client().bucket(bucket_name)
        bucket.blob("credit-scoring/artifacts/" + artifact_name).upload_from_filename(artifact_name)

    credit_df = preprocess_data(pd.read_csv(input_file))

    # Seed the split as well as the model. Otherwise the test set, the logged
    # metrics and the deploy decision change from run to run.
    X_train, X_test, y_train, y_test = train_test_split(
        credit_df.drop(target, axis=1),
        credit_df[target],
        test_size=0.30,
        random_state=seed,
    )

    model = XGBClassifier(
        max_depth=max_depth,
        learning_rate=learning_rate,
        n_estimators=n_estimators,
        random_state=seed,
        use_label_encoder=False,
    )
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)

    metrics.log_metric("accuracy", float(accuracy))
    metrics.log_metric("precision", float(precision))
    metrics.log_metric("recall", float(recall))

    # Quality gate: only a model that beats 0.5 on both accuracy and precision
    # is uploaded and marked deployable.
    if accuracy > 0.5 and precision > 0.5:
        save_model_artifact(model)
        model_validation = "true"
    else:
        model_validation = "false"

    return (model_validation,)

In [None]:
@component(
    packages_to_install=["google-cloud-aiplatform"]
)
def model_deployment()-> NamedTuple("endpoint", [("endpoint", str)]):
    """Upload the trained model to Vertex AI and deploy it to a new endpoint.

    The model is read from ``gs://udemy-gcp-mlops/credit-scoring/artifacts/``
    and served by the prebuilt ``xgboost-cpu.1-6`` container. It is deployed
    on one ``n1-standard-4`` replica (min = max = 1) with 100% of traffic.

    Returns:
        ``(endpoint_resource_name,)``. This populates the declared ``endpoint``
        output, which was previously never written because the function
        returned ``None``.
    """
    # KFP runs only this function's source in the container, so imports are local.
    from google.cloud import aiplatform

    aiplatform.init(project="dataanalytics-347914", location="us-central1", staging_bucket="gs://udemy-gcp-mlops")

    # sync=False returns immediately; the deploy call below waits for the
    # upload to finish before it proceeds.
    model = aiplatform.Model.upload(
        display_name="credit-scoring-model",
        artifact_uri="gs://udemy-gcp-mlops/credit-scoring/artifacts/",
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest",
        sync=False,
    )

    DEPLOYED_NAME = "credit-scoring-model-endpoint"
    # "0" refers to the model being deployed in this call: send it all traffic.
    TRAFFIC_SPLIT = {"0": 100}
    MIN_NODES = 1
    MAX_NODES = 1

    endpoint = model.deploy(
        deployed_model_display_name=DEPLOYED_NAME,
        traffic_split=TRAFFIC_SPLIT,
        machine_type="n1-standard-4",
        min_replica_count=MIN_NODES,
        max_replica_count=MAX_NODES,
    )

    return (endpoint.resource_name,)

In [None]:
@dsl.pipeline(
    pipeline_root="gs://udemy-gcp-mlops/credit-pipeline-v1",
    name="credit-scoring-training-pipeline",
)
def pipeline(
    project: str = "dataanalytics-347914",
    region: str = "us-central1",
    max_depth: int = 5,
    learning_rate: float = 0.2,
    n_estimators: int = 40,
):
    """Credit-scoring pipeline: validate input, then train and gate, then deploy.

    - Training runs only if the input CSV passes ``validate_input_ds``.
    - Deployment runs only if training reports ``model_validation == "true"``.

    Args:
        project: GCP project id. NOTE(review): not forwarded to any component
            yet; ``model_deployment`` hard-codes its own project.
        region: GCP region. Same caveat as ``project``.
        max_depth: XGBoost ``max_depth``. Exposed as a pipeline parameter so
            a run can override it without recompiling.
        learning_rate: XGBoost ``learning_rate`` (overridable per run).
        n_estimators: number of boosting rounds (overridable per run).

    The hyperparameter defaults are the values that used to be hard-coded.
    """
    # Must stay in sync with the path custom_training_job_component reads;
    # that component does not take the file as an input.
    file_name = "gs://udemy-gcp-mlops/credit-scoring/credit_files.csv"
    input_validation_task = validate_input_ds(filename=file_name)

    with dsl.Condition(input_validation_task.outputs["input_validation"] == "true"):
        model_training = custom_training_job_component(
            max_depth=max_depth,
            learning_rate=learning_rate,
            n_estimators=n_estimators,
        ).after(input_validation_task)

        with dsl.Condition(model_training.outputs["model_validation"] == "true"):
            model_deployment().after(model_training)


# Compile the pipeline into the JSON spec that the PipelineJob submits.
compiler.Compiler().compile(pipeline_func=pipeline, package_path='credit-pipeline-deploy-v1.json')

In [None]:
# Submit the compiled pipeline spec to Vertex AI Pipelines.
# - Caching is disabled, so every step re-executes on each run.
# - run() is called with the SDK defaults.
PIPELINE_SPEC_PATH = "credit-pipeline-deploy-v1.json"

start_pipeline = pipeline_jobs.PipelineJob(
    template_path=PIPELINE_SPEC_PATH,
    display_name="model-deployment-credit-kubeflow-pipeline",
    location="us-central1",
    enable_caching=False,
)
start_pipeline.run()