# Heart Disease Pipeline Model

## Setup Environment

In [1]:
USER_FLAG = "--user"

In [2]:
!gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Operation "operations/acat.p2-316990051574-2d85e193-c2a5-41f0-9fc3-16566293a8c6" finished successfully.


In [19]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.11.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0
!pip3 install fsspec
!pip3 install gcsfs



In [3]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [3]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


### Setting Bucket name 

In [4]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="europe-west1"

# Get projet name
shell_output=!gcloud config get-value project 2> /dev/null
PROJECT_ID=shell_output[0]

# Set bucket name
BUCKET_NAME="gs://"+PROJECT_ID+"-bucket"

# Create bucket
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root_heartds/"
PIPELINE_ROOT

USER_FLAG = "--user"

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


In [5]:
BUCKET_NAME

'gs://mle-rimac-heartdisease-bucket'

### Import Libraries

In [6]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics, Dataset, OutputPath, InputPath

from kfp.v2 import compiler
from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud import storage
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple
import pandas as pd

## Create Pipeline

### Load and Process Data
Componente para carga y procesamiento de datos

In [7]:
@component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn==1.0.0","fsspec","gcsfs"],
    base_image="python:3.9",
    output_component_file="load_data.yaml"
)

def load_data(
    url: str,
    ds_train: Output[Dataset],
    ds_test: Output[Dataset],
):
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    
    df = pd.read_csv(url)   
    
    df['target'] = df['HeartDisease']
    df = df.drop(['HeartDisease'], axis=1)
    
    train, test = train_test_split(df, test_size=0.3, random_state=42)
    train.to_csv(ds_train.path + ".csv" , index=False)
    test.to_csv(ds_test.path + ".csv" , index=False)

In [16]:
@component(
    packages_to_install = ["pandas","scikit-learn",
                           "lightgbm","joblib"],
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="train_model.yaml",
)
def train_model(
    dataset:  Input[Dataset],
    model: Output[Model], 
):
    
    import joblib
    from sklearn.compose import make_column_transformer
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import make_pipeline
    from lightgbm import LGBMClassifier
    import pandas as pd

    df = pd.read_csv(dataset.path+".csv")
    #getting numerical and categorical variables
    numerical= df.drop(['target'], axis=1).select_dtypes('number').columns
    categorical = df.select_dtypes('object').columns
    
    #One-Hot encoding on categorical data
    ohe = OneHotEncoder()
    ct = make_column_transformer((ohe, categorical),remainder='passthrough')
    
    X_train = df.drop(columns=["target"])
    y_train = df.target
    
    #LightGBM classifier object
    lgbmc = LGBMClassifier(random_state=0)
    
    #define pipe to avoid data leakage
    pipe = make_pipeline(ct, lgbmc)
    pipe.fit(X_train, y_train)

    model.metadata["framework"] = "LGBM"
    #save model into pickle file
    joblib.dump(pipe, model.path + f".pkl")

### Evaluate Model
Evaluación y métricas del modelo

In [24]:
@component(
    packages_to_install=["pandas", "scikit-learn","lightgbm"],
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="eval_model.yaml"
)
def eval_model(
    test_set:  Input[Dataset],
    lgbm_heartds_model: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple("output", [("deploy", str)]):

    from joblib import load
    from lightgbm import LGBMClassifier
    import pandas as pd
    import logging
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score
    import json
    import typing

    
    def threshold_check(val1, val2):
        cond = "false"
        if val1 >= val2 :
            cond = "true"
        return cond

    df = pd.read_csv(test_set.path+".csv")
    model = load(open(lgbm_heartds_model.path + ".pkl",'rb'))
    
    x_test = df.drop(columns=["target"])
    y_test = df.target
    y_pred = model.predict(x_test)
    
    probs =  model.predict_proba(df.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=df.target.to_numpy(), y_score=probs, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())  
    
    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           df.target, y_pred
       ).tolist(), 
    )
    
    accuracy = accuracy_score(df.target, y_pred.round())
    thresholds_dict = json.loads(thresholds_dict_str)
    lgbm_heartds_model.metadata["accuracy"] = float(accuracy)
    kpi.log_metric("accuracy", float(accuracy))
    deploy = threshold_check(float(accuracy), int(thresholds_dict['roc']))
    return (deploy,)
    


In [18]:
@component(
    packages_to_install=["google-cloud-aiplatform", "sklearn",  "kfp"],
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="heartds_endpoint.yml"
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    #serving_container_image_uri : str, 
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    DISPLAY_NAME  = "heartds"
    MODEL_NAME = "heartds-lgbm"
    ENDPOINT_NAME = "heartds-predict"
    
    def create_endpoint():
        endpoints = aiplatform.Endpoint.list(
        filter='display_name="{}"'.format(ENDPOINT_NAME),
        order_by='create_time desc',
        project=project, 
        location=region,
        )
        if len(endpoints) > 0:
            endpoint = endpoints[0]  # most recently created
        else:
            endpoint = aiplatform.Endpoint.create(
            display_name=ENDPOINT_NAME, project=project, location=region
        )
    endpoint = create_endpoint()   
    
    
    #Import a model programmatically
    '''
    model_upload = aiplatform.Model.upload(
        display_name = DISPLAY_NAME, 
        artifact_uri = model.uri.replace("model", ""),
        serving_container_image_uri =  serving_container_image_uri,
        serving_container_health_route=f"/v1/models/{MODEL_NAME}",
        serving_container_predict_route=f"/v1/models/{MODEL_NAME}:predict",
        serving_container_environment_variables={
        "MODEL_NAME": MODEL_NAME,
    },       
    )
    model_deploy = model_upload.deploy(
        machine_type="n1-standard-4", 
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
    )
    '''
    model_deploy = model.deploy(
        machine_type="n1-standard-4", 
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
    )

    # Save data to the output params
    vertex_model.uri = model_deploy.resource_name

In [19]:
from datetime import datetime

TIMESTAMP =datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-heartds-job{}'.format(TIMESTAMP)

In [25]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-heartds",
    
)
def pipeline(
    url: str = "gs://mle-rimac-heartdisease-bucket/heart.csv",
    project: str = PROJECT_ID,
    region: str = REGION, 
    display_name: str = DISPLAY_NAME,
    api_endpoint: str = REGION+"-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"roc":0.8}',
    serving_container_image_uri: str = "europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
    ):
    
    data_op = load_data(url)
    train_model_op = train_model(data_op.outputs["ds_train"])
    model_evaluation_op = eval_model(
        test_set=data_op.outputs["ds_test"],
        lgbm_heartds_model=train_model_op.outputs["model"],
        thresholds_dict_str = thresholds_dict_str, # I deploy the model anly if the model performance is above the threshold
    )
    
    with dsl.Condition(
        model_evaluation_op.outputs["deploy"]=="true",
        name="deploy-heartds",
    ):
           
        deploy_model_op = deploy_model(
        model=train_model_op.outputs['model'],
        project=project,
        region=region, 
        #serving_container_image_uri = serving_container_image_uri,
        )

In [26]:
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='ml_heartds.json')

In [27]:
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="rimac_chall_heartds_training",
    template_path="ml_heartds.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

In [28]:
ml_pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/316990051574/locations/us-central1/pipelineJobs/pipeline-heartds-20220917002714
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/316990051574/locations/us-central1/pipelineJobs/pipeline-heartds-20220917002714')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-heartds-20220917002714?project=316990051574
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/316990051574/locations/us-central1/pipelineJobs/pipeline-heartds-20220917002714 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/316990051574/locations/us-central1/pipelineJobs/p

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [condition-deploy-heartds-1].; Job (project_id = mle-rimac-heartdisease, job_id = 6923405917291544576) is failed due to the above error.; Failed to handle the job: {project_number = 316990051574, job_id = 6923405917291544576}"
