### Installation
Install the packages required for executing this notebook.

## Some of the source code is based on
https://towardsdatascience.com/how-to-set-up-custom-vertex-ai-pipelines-step-by-step-467487f81cad 

In [2]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 595.6/595.6 kB 30.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
  Preparing metadata (setup.py) ... done


## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [3]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

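Check the versions of the packages you installed. The KFP SDK version should be 2.x: the install cell requests `kfp>2`, and the components and pipelines below use the KFP v2 `dsl` API (`Input[...]`/`Output[...]` artifacts, `dsl.If`).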

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.7.0
google-cloud-aiplatform==1.70.0
google_cloud_pipeline_components version: 2.17.0
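Rather than reading the printed versions by eye, you can guard the notebook with a small version check. This is a minimal sketch using only the standard library; `meets_minimum` and `installed_version` are hypothetical helper names, and the parser only handles plain numeric versions such as `2.7.0`, not pre-release tags like `2.0.0b1`.

```python
from importlib import metadata


def meets_minimum(version: str, minimum: tuple) -> bool:
    """Return True if a numeric version string such as '2.7.0' is >= minimum."""
    # Compare only as many numeric components as `minimum` specifies.
    parts = tuple(int(p) for p in version.split(".")[: len(minimum)])
    return parts >= minimum


def installed_version(package: str):
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    kfp_version = installed_version("kfp")
    if kfp_version is None:
        print("kfp is not installed")
    else:
        status = "OK" if meets_minimum(kfp_version, (2, 0)) else "too old"
        print("kfp", kfp_version, status)
```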


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                     Dataset,
                     Input,
                     Model,
                     Output,
                     Metrics,
                     ClassificationMetrics,
                     component,
                     OutputPath,
                     InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [3]:
# The Google Cloud project that this pipeline runs in.
PROJECT_ID = "de2024-436414"
# The region that this pipeline runs in.
REGION = "us-central1"
# A Cloud Storage URI that your pipelines service account can access.
# The artifacts of your pipeline runs are stored under this pipeline root.
PIPELINE_ROOT = "gs://temp_de2024_17"   # e.g., gs://temp_de2024
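The configuration mixes two forms of Cloud Storage reference: full `gs://` URIs (`PIPELINE_ROOT`, and later `dataset_uri`) and a bare bucket name (`model_repo`, which `upload_model_to_gcs` passes to `client.bucket(...)`). A minimal sketch of a hypothetical helper, `split_gcs_uri`, that converts a URI into a bucket name and object path and rejects values without the `gs://` scheme:

```python
from urllib.parse import urlparse


def split_gcs_uri(uri: str) -> tuple:
    """Split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object')."""
    parsed = urlparse(uri)
    # A bare bucket name such as 'models_de2024_17' has no scheme and is rejected.
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a Cloud Storage URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


if __name__ == "__main__":
    print(split_gcs_uri("gs://data_de2024_17/training_set.csv"))
```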

#### Create Pipeline Components

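We can create a component from a Python function (a lightweight, inline component) or from a container image. We will start with lightweight Python components: each function must be self-contained, so its imports go inside the function body and its extra dependencies are listed in `packages_to_install`.
Refer to https://www.kubeflow.org/docs/components/pipelines/v2/components/lightweight-python-components/ for more information.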

In [None]:
from typing import NamedTuple
import kfp.dsl as dsl
# kfp.v2 is a deprecated alias in KFP 2.x; import the types from kfp.dsl directly.
from kfp.dsl import (
    Input,
    Output,
    Dataset,
    Model,
    ClassificationMetrics,
    Metrics
)

@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''Splits dataset into train and test sets'''
    import pandas as pd
    from sklearn.model_selection import train_test_split as tts

    df = pd.read_csv(dataset.path)
    train, test = tts(df, test_size=0.2, random_state=42)
    train.to_csv(dataset_train.path + ".csv", index=False)
    test.to_csv(dataset_test.path + ".csv", index=False)

@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_stacked_model(features: Input[Dataset], model: Output[Model]):
    '''Train a stacking classifier with SVC and LogisticRegression'''
    import pandas as pd
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import StackingClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.svm import SVC
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OneHotEncoder, StandardScaler
    import pickle

    # Load the dataset
    data = pd.read_csv(features.path + ".csv")
    categorical_columns = ['schoolsup', 'higher']
    numerical_columns = ['absences', 'failures', 'Medu', 'Fedu', 'Walc', 'Dalc', 'famrel', 'goout', 'freetime', 'studytime']

    # Preprocessing pipeline
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numerical_columns),
            ('cat', OneHotEncoder(drop='if_binary'), categorical_columns)
        ])

    # Define base models and stacking ensemble
    svm_linear = SVC(C=0.1, kernel='linear', probability=True)
    svm_rbf = SVC(C=0.1, kernel='rbf', probability=True)
    meta_model = LogisticRegression(C=10)

    # Stacking ensemble
    stack_clf = StackingClassifier(
        estimators=[('svm_linear', svm_linear), ('svm_rbf', svm_rbf)],
        final_estimator=meta_model,
        cv=10
    )

    # Full pipeline
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', stack_clf)
    ])

    # Train model
    X = data[numerical_columns + categorical_columns]
    y = data['class']
    pipeline.fit(X, y)

    # Save model
    with open(model.path + ".pkl", 'wb') as f:
        pickle.dump(pipeline, f)

@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2", "numpy"],
    base_image="python:3.10.7-slim"
)
def model_evaluation(test_set: Input[Dataset], model: Input[Model], thresholds_dict_str: str, metrics: Output[ClassificationMetrics], kpi: Output[Metrics]) -> NamedTuple('outputs', [('approval', bool)]):
    '''Evaluates the model and approves it if accuracy meets the threshold'''
    import pandas as pd
    import pickle
    from sklearn.metrics import accuracy_score, roc_curve, confusion_matrix
    import json
    from numpy import nan_to_num

    # Load test data
    data = pd.read_csv(test_set.path + ".csv")
    X_test = data.drop(columns=["class"])
    y_test = data["class"]

    # Load model
    with open(model.path + ".pkl", 'rb') as f:
        pipeline = pickle.load(f)

    # Make predictions and evaluate
    y_pred = pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

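    # Log ROC curve. With scikit-learn >= 1.3 the first threshold is +inf, which is not
    # valid JSON, so replace it with a finite value before logging.
    y_proba = pipeline.predict_proba(X_test)[:, 1]
    fpr, tpr, thresholds = roc_curve(y_test, y_proba)
    thresholds = nan_to_num(thresholds)
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    # Log Confusion Matrix
    metrics.log_confusion_matrix(['Negative', 'Positive'], confusion_matrix(y_test, y_pred).tolist())

    # Accuracy metric
    kpi.log_metric("accuracy", float(accuracy))
    # Note: the "roc" entry of the thresholds dict is used as the accuracy threshold.
    threshold = json.loads(thresholds_dict_str)["roc"]
    # Cast to a plain bool: comparisons involving NumPy floats return numpy.bool_.
    return (bool(accuracy >= threshold), )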

@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Uploads model to Google Cloud Storage'''
    from google.cloud import storage

    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)
    blob = bucket.blob('stacked_model.pkl')
    blob.upload_from_filename(model.path + '.pkl')

@dsl.component(
    packages_to_install=["google-cloud-build"],
    base_image="python:3.10.7-slim"
)
def run_build_trigger(project_id: str, trigger_id: str, region: str = "us-central1"):
    '''Runs the CI/CD trigger'''
    from google.cloud.devtools import cloudbuild_v1

    client = cloudbuild_v1.CloudBuildClient()
    # The location must match the region the Cloud Build trigger was created in.
    name = f"projects/{project_id}/locations/{region}/triggers/{trigger_id}"
    request = cloudbuild_v1.RunBuildTriggerRequest(project_id=project_id, trigger_id=trigger_id, name=name)
    client.run_build_trigger(request=request)

@kfp.dsl.pipeline(name="diabetes-predictor-stacked-training-pipeline")
def pipeline(project_id: str, dataset_uri: str, model_repo: str, thresholds_dict_str: str, trigger_id: str):
    '''Pipeline definition for training, evaluation, and deployment'''
    
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )
    
    # Train-test split
    split_op = train_test_split(dataset=dataset_op.output)
    
    # Model training
    training_op = train_stacked_model(features=split_op.outputs["dataset_train"])
    
    # Model evaluation
    evaluation_op = model_evaluation(
        test_set=split_op.outputs["dataset_test"],
        model=training_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str
    )
    
    # Conditional deployment
    with dsl.If(evaluation_op.outputs["approval"] == True):
        upload_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_op.outputs['model']
        )
        run_build_trigger(project_id=project_id, trigger_id=trigger_id).after(upload_op)
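The components above pass files to each other through artifact paths, and the writer and reader must agree on the suffix: `train_test_split` writes `dataset_train.path + ".csv"` and `train_stacked_model` reads `features.path + ".csv"`, while the model is saved and loaded as `model.path + ".pkl"`. Only the imported dataset is read from `dataset.path` as-is, because the importer points at an existing CSV file. The sketch below reproduces that convention locally with the standard library; `FakeArtifact` is a hypothetical stand-in for `kfp.dsl.Dataset`/`Model` that carries only a `.path`, and the rows are made up for illustration.

```python
import csv
import os
import pickle
import tempfile
from dataclasses import dataclass


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for kfp.dsl.Dataset / kfp.dsl.Model: only .path."""
    path: str


def write_rows(artifact: FakeArtifact, rows: list) -> str:
    # Writer side: append ".csv", as the split component does.
    out = artifact.path + ".csv"
    with open(out, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return out


def read_rows(artifact: FakeArtifact) -> list:
    # Reader side: must append the same suffix, otherwise the file is not found.
    with open(artifact.path + ".csv", newline="") as f:
        return list(csv.DictReader(f))


def save_model(artifact: FakeArtifact, model) -> None:
    with open(artifact.path + ".pkl", "wb") as f:
        pickle.dump(model, f)


def load_model(artifact: FakeArtifact):
    with open(artifact.path + ".pkl", "rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        train = FakeArtifact(os.path.join(tmp, "dataset_train"))
        write_rows(train, [{"absences": "4", "class": "1"}])
        print(read_rows(train))
```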


#### Pipeline Component : Train and Test Split

In [4]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''Splits the dataset into train (70%) and test (30%) sets'''
    import pandas as pd
    import logging 
    import sys
    from sklearn.model_selection import train_test_split as tts
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    alldata = pd.read_csv(dataset.path, index_col=None)
    train, test = tts(alldata, test_size=0.3)
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')
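This component calls `tts(alldata, test_size=0.3)` without `random_state`, so every pipeline run gets a different split (the combined cell earlier fixes `random_state=42`). A minimal standard-library sketch of a reproducible shuffle-and-cut split; `split_rows` is hypothetical, and although it sizes the test set as `ceil(test_size * n)` like scikit-learn, it does not reproduce scikit-learn's shuffling, so the rows it picks differ.

```python
import math
import random


def split_rows(rows: list, test_size: float = 0.3, seed: int = 42) -> tuple:
    """Shuffle with a fixed seed, then cut off ceil(test_size * n) rows as the test set."""
    n_test = math.ceil(test_size * len(rows))
    shuffled = rows[:]  # copy so the caller's list is not reordered
    random.Random(seed).shuffle(shuffled)
    return shuffled[n_test:], shuffled[:n_test]


if __name__ == "__main__":
    train, test = split_rows(list(range(10)))
    print(len(train), len(test))
```

Passing the same seed on every run makes the split, and therefore the evaluation metrics, comparable across pipeline runs.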

#### Pipeline Component : Training model

In [5]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_stacking_model(features: Input[Dataset], model: Output[Model]):
    '''Train a stacking classifier model'''
    import pandas as pd
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import StackingClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.svm import SVC
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    import pickle

    data = pd.read_csv(features.path + ".csv")
    
    # Define features and target variable
    categorical_columns = ['schoolsup', 'higher']
    numerical_columns = ['absences', 'failures', 'Medu', 'Fedu', 'Walc', 'Dalc', 'famrel', 'goout', 'freetime', 'studytime']
    X = data[numerical_columns + categorical_columns]
    y = data['class']

    # Preprocessing
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numerical_columns),
            ('cat', OneHotEncoder(drop='if_binary'), categorical_columns)
        ]
    )
    
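    # Base models (degree only affects the 'poly' kernel and gamma is ignored by the
    # linear kernel, so those arguments have no effect here)
    svm_linear_clf = SVC(C=0.1, kernel='linear', gamma='scale', class_weight='balanced', degree=2, probability=True)
    svm_rbf_clf = SVC(C=0.1, kernel='rbf', gamma='scale', class_weight='balanced', degree=2, probability=True)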
    
    # Meta-classifier
    logreg = LogisticRegression(C=10, solver='newton-cg')
    
    # Stacking ensemble
    stack_clf = StackingClassifier(
        estimators=[
            ('svm_linear_clf', svm_linear_clf),
            ('svm_rbf', svm_rbf_clf)
        ],
        final_estimator=logreg
    )
    
    # Create pipeline
    from sklearn.pipeline import Pipeline
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', stack_clf)
    ])
    
    # Train the model
    pipeline.fit(X, y)
    
    # Save the model
    model.metadata["framework"] = "StackingClassifier"
    model_file = model.path + ".pkl"
    with open(model_file, 'wb') as f:
        pickle.dump(pipeline, f)
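scikit-learn's `StackingClassifier` trains the final estimator (the `LogisticRegression` here) on cross-validated predictions of the base estimators (the two SVCs), and then refits the base estimators on the full training data. The `cv` argument sets the number of folds: `cv=10` in the combined cell's `train_stacked_model`, and the default 5-fold split in `train_stacking_model`. The sketch below shows only the out-of-fold idea in plain Python; it uses contiguous unshuffled folds and hard 0/1 predictions from a hypothetical toy model, whereas scikit-learn uses stratified folds and, for these SVCs, their `predict_proba` outputs.

```python
def kfold_indices(n: int, k: int) -> list:
    """Contiguous, unshuffled folds; the first n % k folds get one extra row."""
    sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in sizes:
        folds.append(list(range(start, start + size)))
        start += size
    return folds


def out_of_fold_predictions(X, y, fit, predict, k=5):
    """Predict each row with a model that never saw that row during fitting."""
    oof = [None] * len(X)
    for test_idx in kfold_indices(len(X), k):
        held_out = set(test_idx)
        train_idx = [i for i in range(len(X)) if i not in held_out]
        model = fit([X[i] for i in train_idx], [y[i] for i in train_idx])
        for i in test_idx:
            oof[i] = predict(model, X[i])
    return oof


# Hypothetical toy base model: predict 1 when x exceeds the mean x of its training fold.
def fit_mean_threshold(X, y):
    return sum(X) / len(X)


def predict_mean_threshold(threshold, x):
    return 1 if x > threshold else 0


if __name__ == "__main__":
    X = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    y = [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
    # These out-of-fold predictions are what a meta-model would be trained on.
    print(out_of_fold_predictions(X, y, fit_mean_threshold, predict_mean_threshold))
```

Training the meta-model on out-of-fold predictions, rather than on predictions for rows the base models were fitted on, keeps it from learning to trust overfitted base-model outputs.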

#### Pipeline Component : Model Evaluation

See https://www.kubeflow.org/docs/components/pipelines/v2/data-types/parameters/ for how a component returns named parameter outputs with a NamedTuple.
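The evaluation component returns a single named output, `approval`, which the pipeline later reads as `outputs["approval"]`. The sketch below exercises the same return contract in plain Python, without KFP; `approve` is a hypothetical helper. As in the notebook, the threshold is read from the `"roc"` key of the JSON string but compared with accuracy. The `bool(...)` cast is a precaution: KFP may reject a returned value whose type does not match the `bool` annotation, and comparisons involving NumPy floats return `numpy.bool_`.

```python
import json
from typing import NamedTuple

# Same output spec as the evaluation components: one boolean field named 'approval'.
Outputs = NamedTuple("outputs", [("approval", bool)])


def approve(accuracy: float, thresholds_dict_str: str) -> Outputs:
    """Approve the model when accuracy meets the threshold stored under 'roc'."""
    threshold = json.loads(thresholds_dict_str)["roc"]
    # Convert to a plain Python bool before returning it as a parameter output.
    return Outputs(approval=bool(accuracy >= threshold))


if __name__ == "__main__":
    print(approve(0.85, '{"roc": 0.8}'))
```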

In [6]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def stacking_model_evaluation(
    test_set: Input[Dataset],
    model: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple('outputs', [('approval', bool)]):
    '''Evaluate a stacking model'''
    import pandas as pd
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score
    import pickle
    import json
    from numpy import nan_to_num

    # Load the test data and model
    data = pd.read_csv(test_set.path + ".csv")
    model_path = model.path + ".pkl"
    with open(model_path, 'rb') as f:
        pipeline = pickle.load(f)

    # Evaluate model
    X_test = data.drop(columns=["class"])
    y_true = data['class']
    y_pred = pipeline.predict(X_test)
    y_scores = pipeline.predict_proba(X_test)[:, 1]

    # Compute metrics
    accuracy = accuracy_score(y_true, y_pred)
    fpr, tpr, thresholds = roc_curve(y_true, y_scores, pos_label=1)
    # Replace the +inf first threshold with a finite value so the metric can be serialized.
    thresholds = nan_to_num(thresholds)
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    metrics.log_confusion_matrix(['Negative', 'Positive'], confusion_matrix(y_true, y_pred).tolist())
    kpi.log_metric("accuracy", float(accuracy))

    # Approve the model if accuracy meets the threshold stored under the "roc" key
    thresholds_dict = json.loads(thresholds_dict_str)
    approval = bool(accuracy >= thresholds_dict['roc'])
    return (approval,)
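Why `nan_to_num` is needed: with scikit-learn 1.3 and later, `roc_curve` returns `+inf` as its first threshold, and infinity has no representation in strict JSON. The standard-library sketch below computes ROC points the straightforward way (one point per distinct score, plus a leading `+inf` threshold) and then maps non-finite thresholds the way `numpy.nan_to_num` does by default. Both function names are hypothetical, and unlike scikit-learn's default `drop_intermediate=True`, this sketch keeps every point.

```python
import math
import sys


def roc_points(y_true: list, scores: list) -> tuple:
    """Unthinned ROC points: a leading +inf threshold, then each distinct score, descending."""
    positives = sum(1 for y in y_true if y == 1)
    negatives = len(y_true) - positives
    thresholds = [math.inf] + sorted(set(scores), reverse=True)
    fpr, tpr = [], []
    for t in thresholds:
        # Predict positive for every score >= t and count hits among positives/negatives.
        tp = sum(1 for y, s in zip(y_true, scores) if s >= t and y == 1)
        fp = sum(1 for y, s in zip(y_true, scores) if s >= t and y != 1)
        tpr.append(tp / positives)
        fpr.append(fp / negatives)
    return fpr, tpr, thresholds


def finite_thresholds(thresholds: list) -> list:
    """Mimic numpy.nan_to_num: nan -> 0.0, +inf -> max float, -inf -> -max float."""
    out = []
    for t in thresholds:
        if math.isnan(t):
            out.append(0.0)
        elif math.isinf(t):
            out.append(sys.float_info.max if t > 0 else -sys.float_info.max)
        else:
            out.append(t)
    return out


if __name__ == "__main__":
    fpr, tpr, thr = roc_points([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
    print(fpr, tpr, finite_thresholds(thr))
```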

### Upload Model and Metrics to a Google Cloud Storage Bucket

In [7]:
@dsl.pipeline(name="performance-predictor-training-pipeline")
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, thresholds_dict_str: str, trigger_id: str):    
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )
    
    train_test_split_op = train_test_split(dataset=dataset_op.output)
    
    train_stacking_model_op = train_stacking_model(features=train_test_split_op.outputs["dataset_train"])
    
    model_evaluation_op = stacking_model_evaluation(
        test_set=train_test_split_op.outputs["dataset_test"],
        model=train_stacking_model_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str
    )
    
    with dsl.If(
        model_evaluation_op.outputs["approval"] == True,
        name="approve-model",
    ):
        upload_model_to_gcs_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=train_stacking_model_op.outputs["model"]
        )   
        
        run_build_trigger_op = run_build_trigger(
            project_id=project_id,
            trigger_id=trigger_id
        ).after(upload_model_to_gcs_op) 

### Trigger Another CI/CD Pipeline

In [8]:
@dsl.component(
    packages_to_install=["google-cloud-build"],
    base_image="python:3.10.7-slim"
)
def run_build_trigger(project_id: str, trigger_id: str):
    '''Runs the Cloud Build trigger that deploys the approved model'''
    import sys
    import logging
    from google.cloud.devtools import cloudbuild_v1

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Create a client
    client = cloudbuild_v1.CloudBuildClient()
    # Regional trigger: the location must match the region the trigger was created in.
    name = f"projects/{project_id}/locations/us-central1/triggers/{trigger_id}"
    # Initialize request argument(s) from the component parameters, not hard-coded values
    request = cloudbuild_v1.RunBuildTriggerRequest(
        project_id=project_id,
        trigger_id=trigger_id,
        name=name
    )

    # Make the request; this starts the build and returns a long-running operation
    operation = client.run_build_trigger(request=request)

    logging.info("Triggered the CI/CD pipeline: " + trigger_id)
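Cloud Build addresses a trigger by a resource name of the form `projects/{project_id}/locations/{location}/triggers/{trigger_id}`. The location segment has to match where the trigger was created: `us-central1` for the regional trigger implied by the `gcloud builds triggers describe ... --region=us-central1` hint in the submission cell, or `global` for a global trigger. A small sketch of a hypothetical helper that builds and sanity-checks that name:

```python
def trigger_resource_name(project_id: str, trigger_id: str, region: str) -> str:
    """Build 'projects/{project}/locations/{region}/triggers/{trigger_id}'."""
    for label, value in (("project_id", project_id), ("trigger_id", trigger_id), ("region", region)):
        # Empty values or embedded slashes would produce a malformed resource name.
        if not value or "/" in value:
            raise ValueError(f"invalid {label}: {value!r}")
    return f"projects/{project_id}/locations/{region}/triggers/{trigger_id}"


if __name__ == "__main__":
    print(trigger_resource_name("my-project", "my-trigger-id", "us-central1"))
```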

#### Define the Pipeline

In [9]:
@kfp.dsl.pipeline(name="performance-predictor-stacked-training-pipeline")
def pipeline(project_id: str, dataset_uri: str, model_repo: str, thresholds_dict_str: str, trigger_id: str):
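    '''Pipeline definition for training, evaluation, and deployment'''
    # train_stacked_model, model_evaluation and upload_model_to_gcs come from the
    # combined component cell above, so run that cell before this one.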
    
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )
    
    # Train-test split
    split_op = train_test_split(dataset=dataset_op.output)
    
    # Model training
    training_op = train_stacked_model(features=split_op.outputs["dataset_train"])
    
    # Model evaluation
    evaluation_op = model_evaluation(
        test_set=split_op.outputs["dataset_test"],
        model=training_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str
    )
    
    # Conditional deployment
    with dsl.If(evaluation_op.outputs["approval"] == True):
        upload_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_op.outputs['model']
        )
        run_build_trigger(project_id=project_id, trigger_id=trigger_id).after(upload_op)

#### Compile the pipeline into a YAML file

In [10]:
from kfp import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='performance_predictor_stacked_training_pipeline.yaml')

#### Submit the pipeline run

In [11]:
import google.cloud.aiplatform as aip

# Before initializing, make sure the GOOGLE_APPLICATION_CREDENTIALS
# environment variable points to the JSON key file of your service account.
aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="performance-predictor-stacked",
    enable_caching=False,  # Set to True to reuse the outputs of unchanged steps from earlier runs
    template_path="performance_predictor_stacked_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    # Keys must match the compiled pipeline's parameters:
    # project_id, dataset_uri, model_repo, thresholds_dict_str, trigger_id.
    parameter_values={
        'project_id': PROJECT_ID,  # make sure to use your project ID
        'dataset_uri': 'gs://data_de2024_17/training_set.csv',  # make sure to use your data bucket
        'model_repo': 'models_de2024_17',  # make sure to use your model bucket name
        'thresholds_dict_str': '{"roc":0.8}',
        # Make sure to use the ID of the Cloud Build trigger that deploys the model, for example the
        # "id" field returned by: gcloud builds triggers describe [your trigger name] --region=us-central1
        'trigger_id': '1d9db7f0-bd68-4559-8b42-7b6812646cd7'
    }
)

job.run()
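`parameter_values` has to line up with the compiled pipeline's inputs. The pipeline defined in In [9] declares `project_id`, `dataset_uri`, `model_repo`, `thresholds_dict_str` and `trigger_id`; the earlier In [7] version also declared `data_bucket`. Vertex AI validates the supplied values against the compiled pipeline's input definitions, so a stray key can make job creation fail. Because `@dsl.pipeline` wraps the function in a KFP object, the sketch below checks against a plain-Python stub with the same signature; `check_parameter_values` and `pipeline_signature` are hypothetical names.

```python
import inspect


def check_parameter_values(pipeline_func, parameter_values: dict) -> None:
    """Raise ValueError on unknown keys or missing required pipeline parameters."""
    params = inspect.signature(pipeline_func).parameters
    unknown = sorted(set(parameter_values) - set(params))
    missing = sorted(
        name for name, p in params.items()
        if p.default is inspect.Parameter.empty and name not in parameter_values
    )
    if unknown or missing:
        raise ValueError(f"unknown: {unknown}, missing: {missing}")


# Hypothetical plain-Python stub mirroring the signature of the pipeline in In [9].
def pipeline_signature(project_id: str, dataset_uri: str, model_repo: str,
                       thresholds_dict_str: str, trigger_id: str):
    pass
```

Running such a check before `job.run()` surfaces a mismatch locally instead of after a round trip to Vertex AI.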

Creating PipelineJob
PipelineJob created. Resource name: projects/966204339179/locations/us-central1/pipelineJobs/diabetes-predictor-training-pipeline-20241013122317
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/966204339179/locations/us-central1/pipelineJobs/diabetes-predictor-training-pipeline-20241013122317')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/diabetes-predictor-training-pipeline-20241013122317?project=966204339179
PipelineJob projects/966204339179/locations/us-central1/pipelineJobs/diabetes-predictor-training-pipeline-20241013122317 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/966204339179/locations/us-central1/pipelineJobs/diabetes-predictor-training-pipeline-20241013122317 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/966204339179/locations/us-central1/pipelineJobs/diabetes-predictor-training-pipeline-202410131223