In [2]:
import os 
from kfp import dsl
from kfp.dsl import Artifact, Dataset, Input, Output, Model, Metrics, Markdown, HTML, component
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dev_mlops.json"
from kfp import compiler
from google.cloud import aiplatform as vertex
from google.cloud.aiplatform import pipeline_jobs
from datetime import datetime
import pandas as pd
from typing import NamedTuple

import numpy as np


In [57]:
project_id = "mlops-workshop-420523"
region = "us-central1"
bucket = "gs://ml_gcp_bucket"
pipeline_root_path = f"{bucket}/stroke/pipelines"
base_image = "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest"

In [58]:
# Read dataset
@component(base_image=base_image)
def get_stroke_data(filepath: str, dataset_train: Output[Dataset]):
    import pandas as pd
    df_train = pd.read_csv(filepath)
    df_train.to_csv(dataset_train.path, index=False)

In [59]:
# data preprocesssing
@component(base_image=base_image)
def preprocess_stroke_data(train_df: Input[Dataset], dataset_train_preprocessed: Output[Dataset]):
    import pandas as pd

    categorical_columns=["gender", "ever_married", "work_type", "Residence_type", "smoking_status"]
    target_column="stroke"

    dataset_df = pd.read_csv(train_df.path)
    
    # one hot encoding categorical columns
    dataset_df = pd.get_dummies(dataset_df, columns=categorical_columns)

    # fill null values
    dataset_df = dataset_df.fillna(dataset_df.mean())

    dataset_df.to_csv(dataset_train_preprocessed.path, index=False)




In [60]:
# Train test split
@component(base_image=base_image)
def train_test_split(
    dataset_in: Input[Dataset],
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    # split features and target
    df = pd.read_csv(dataset_in.path)

    # # split train and test sets
    df_train, df_test = train_test_split(df, test_size=test_size, random_state=0)
    
    # dataset_train = Dataset()
    # dataset_test = Dataset()

    df_train.to_csv(dataset_train.path, index=False)
    df_test.to_csv(dataset_test.path, index=False)

    # return dataset_train, dataset_test

In [61]:
# train
@component(base_image=base_image)
def train_stroke(
    dataset_train: Input[Dataset],
    model: Output[Model],
):
    import pandas as pd
    import pickle
    import joblib
    from sklearn.tree import DecisionTreeClassifier

    TARGET = "stroke"

    # read train and test data
    train_data = pd.read_csv(dataset_train.path)

    X_train = train_data.drop(TARGET, axis=1)
    y_train = train_data[TARGET].values

    dt_model = DecisionTreeClassifier()
    dt_model.fit(X_train, y_train)

    joblib.dump(dt_model, model.path)


    

In [62]:
# evaluate model
@component(base_image=base_image)
def evaluate_stroke(
    stroke_model: Input[Model],
    dataset_test: Input[Dataset],
    metrics_test: Output[Metrics],
):
    import pandas as pd
    import joblib
    test_data = pd.read_csv(dataset_test.path)

    TARGET = "stroke"
    X_test = test_data.drop(TARGET, axis=1)
    y_test = test_data[TARGET].values

    model = joblib.load(stroke_model.path)

    mean_acc = model.score(X_test, y_test)

    metrics_test.log_metric("mean_accuracy", mean_acc)


In [63]:
# pipeline
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-stroke-job{}'.format(TIMESTAMP)

In [64]:
@dsl.pipeline(
    pipeline_root=pipeline_root_path,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-houseprice"   
)

def pipeline(
    data_filepath: str = "gs://ml_gcp_bucket/stroke/datasets/healthcare-dataset-stroke-data.csv",
    project: str = project_id,
    region: str = region, 
    display_name: str = DISPLAY_NAME,        
):

    data_op = get_stroke_data(filepath=data_filepath)
    data_preprocess_op = preprocess_stroke_data(train_df=data_op.outputs["dataset_train"])
    train_test_split_op = train_test_split(dataset_in=data_preprocess_op.outputs["dataset_train_preprocessed"])
    train_model_op = train_stroke(dataset_train=train_test_split_op.outputs["dataset_train"])
    model_evaluation_op = evaluate_stroke(stroke_model=train_model_op.outputs["model"], dataset_test=train_test_split_op.outputs["dataset_test"])
           


In [65]:

compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='ml_stroke.json')

In [66]:
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="stroke-pipeline",
    template_path="ml_stroke.json",
    enable_caching=False,
    location=region,
)

In [67]:
start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/667868132446/locations/us-central1/pipelineJobs/pipeline-houseprice-20240417160333
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/667868132446/locations/us-central1/pipelineJobs/pipeline-houseprice-20240417160333')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-houseprice-20240417160333?project=667868132446
PipelineJob projects/667868132446/locations/us-central1/pipelineJobs/pipeline-houseprice-20240417160333 current state:
3
PipelineJob projects/667868132446/locations/us-central1/pipelineJobs/pipeline-houseprice-20240417160333 current state:
3
PipelineJob projects/667868132446/locations/us-central1/pipelineJobs/pipeline-houseprice-20240417160333 current state:
3
PipelineJob projects/667868132446/locations/us-central1/pipelineJobs/pipeline-houseprice-20240417160333 current state:
3
PipelineJob projects/6678681324