## Vertex AI Pipeline(Kubeflow) Training model pipeline

This sample kubeflow pipeline is for training the simple regression model.
</br>Newyork taxi data used. </br>
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page </br>

This pipeline generate the model to predict the driving duration, 
based on pickup location (PULocationID), dropoff location(DOLocationID) and trip distance.

Generated model will be registered to Model Registry in Vertex AI,
This model will be deployed to Online Prediction in Vertex AI.

For scheduing test purpose, I used just random month from 2022 to see the whole process works fine
running every minute.

This Kubeflow training pipeline consist of steps following.


1. Download the data
2. Preprocessing
3. Training
4. Evaluating
5. Register the model
6. Deploy the model (Depending on the evaluation result)
7. Get test prediction results

![img_info](./kubeflow-graph.png)


In [1]:
USER_FLAG = "--user"

In [2]:
%%capture
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components==0.1.1 --upgrade
!pip install -U google-cloud-aiplatform "shapely<2"

In [3]:
import os
PROJECT_ID = ""

if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  yorrr78-dev-111111


In [4]:
BUCKET_NAME="gs://" + PROJECT_ID + "-mlops-bucket"
!gsutil mb -l asia $BUCKET_NAME

Creating gs://yorrr78-dev-111111-mlops-bucket/...
ServiceException: 409 A Cloud Storage bucket named 'yorrr78-dev-111111-mlops-bucket' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [44]:
# Restart
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [5]:
from typing import NamedTuple
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
import pandas as pd
import requests
import os
from tqdm import tqdm
import pickle
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import make_pipeline
from google.cloud import storage
import logging

In [6]:
!python3 -c "import sklearn; print('Scikit-Learn version: {}'.format(sklearn.__version__))"
!python3 --version

Scikit-Learn version: 1.4.0
Python 3.10.13


In [7]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="asia-northeast3"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin:/home/jupyter/.local/bin


'gs://yorrr78-dev-111111-mlops-bucket/pipeline_root/'

In [11]:
# 1. Download the data
@component(
    base_image="python:3.10-slim", 
    output_component_file="./components/ingestion-component.yaml",
    packages_to_install=["google-cloud-storage", "tqdm"]
)
def download_taxi_data(
    bucket_name:str
) -> str:
    """
    Data will be downloaded from 'https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page'
    which is public dataset from New York.
    This downloaded parquet file will be located in 'data'folder of your GCS bucket.
    """
    
    from google.cloud import storage
    from tqdm import tqdm
    import requests
    import os
    import logging
    from datetime import datetime, timedelta
    import random 
    
    
    def get_file_name_by_date():
        """Get the file name from pipeline running date"""
        
        # # Get the previous month data file name 
        # current_time = datetime.now() - timedelta(days=31)
        # file_month = '%02d' % current_time.month
        # file = f'green_tripdata_2022-{file_month}.parquet'

        # Test purpose
        random_number = random.randint(1, 12)
        current_month = '%02d' % random_number

        return f'green_tripdata_2022-{current_month}.parquet'
    
    
    def download_to_local(download_url, file):
        """Download the target file from internet to local"""
        
        if os.path.isfile(f'./data/{file}'):
            logging.info('Already exist')
            pass
        
        else:
            if not os.path.exists('data'):
                os.mkdir('data')
            
            file_url = download_url + file
            response = requests.get(file_url, stream=True)
            logging.info(f'downloading.. {file}')
            
            with open(f'./data/{file}', 'wb') as f_in:
                for chunk in tqdm(response.iter_content()):
                    f_in.write(chunk)
            logging.info('Download finished!')

    
    def upload_to_blob(bucket_name, file):
        """Upload the temporary file to the GCS blob"""
        
        with open(f'./data/{file}', 'rb') as f_out:
            result = blob.upload_from_file(f_out)

        logging.info(f'Finished downloading {file} to GCS bucket {bucket_name}')
    
    
    download_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/'
    file = get_file_name_by_date()
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(f'data/{file}')
    
    if blob.exists():
        logging.info(f'Blob {file} already exists in bucket {bucket_name}')
        return f'gs://{bucket_name}/data/{file}'
    
    download_to_local(download_url, file)
    upload_to_blob(bucket_name, file)
    
    return f'gs://{bucket_name}/data/{file}'
    

In [12]:
# 2. Preprocess data
@component(
    base_image="python:3.10-slim", 
    output_component_file="./components/preprocessing-component.yaml",
    packages_to_install=["pandas", "tqdm", "fastparquet", "pyarrow", "numpy"]
)
def preprocess_taxi_data(
    input_data: str,
    train_size: float,
    valid_size: float,
    test_size: float,
    train_data: Output[Dataset],
    valid_data: Output[Dataset],
    test_data: Output[Dataset],
) -> str:
    """
    Data will be preprocessed this step.
    - Model type: regression model
    - Target column: duration (will be calculated with pickup time & dropoff time) 
    - Feature columns: pickup location (PULocationID), dropoff location(DOLocationID) and trip distance
    """
    
    import pandas as pd
    import numpy as np
    
    
    df = pd.read_parquet(input_data)
    columns = [
        'PULocationID',
        'DOLocationID',
        'lpep_dropoff_datetime',
        'lpep_pickup_datetime',
        'trip_distance',
    ]
    df = df[columns]

    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)] # Get rid of outliers

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    train_ds, valid_ds, test_ds = np.split(df.sample(frac=1, random_state=42), [int((train_size)*len(df)), int((1-test_size)*len(df))])
    
    # Use .path for passing data in Artifact
    train_ds.to_parquet(train_data.path, index=False) 
    valid_ds.to_parquet(valid_data.path, index=False)
    test_ds.to_parquet(test_data.path, index=False)


In [13]:
#3 Train data
@component(
    base_image="python:3.9-slim", 
    output_component_file="./components/training-component.yaml",
    packages_to_install=["pandas", "scikit-learn", "fastparquet", "pyarrow"]
)
def train_taxi_data(
    train_data: Input[Dataset], 
    params: dict,
    model: Output[Model],
)-> str:
    """
    This step will train the regression model using Scikit-learn randomforest regressor.
    - Model type: regression model
    - Target column: duration (will be calculated with pickup time & dropoff time) 
    - Feature columns: pickup location (PULocationID), dropoff location(DOLocationID) and trip distance
    """
    
    import pandas as pd
    import pickle
    import os
    from sklearn.feature_extraction import DictVectorizer
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.pipeline import make_pipeline
    import sklearn
    
    
    def prepare_dictionaries(df: pd.DataFrame):
        """Composite"""
        df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
        categorical = ['PU_DO']
        numerical = ['trip_distance']
        dicts = df[categorical + numerical].to_dict(orient='records')
        return dicts

    
    def train_model(X_train, y_train):
        """Model training"""
        pipeline = make_pipeline(
            DictVectorizer(),
            RandomForestRegressor(**params, n_jobs=-1)
        )
        pipeline.fit(X_train, y_train)
        return pipeline
        
    
    df = pd.read_parquet(train_data.path)
    target = 'duration'
    X_train = prepare_dictionaries(df)
    y_train = df[target].values
    
    my_model = train_model(X_train, y_train)
    
    model.metadata["model_name"] = "RandomForestRegressor"
    model.metadata["framework"] = "sklearn"
    model.metadata["framework_version"] = sklearn.__version__
    file_name = model.path + f".pkl"
    
    with open(file_name, 'wb') as f:
        pickle.dump(my_model, f)

In [14]:
# 4. Evaluate model
@component(
    base_image="python:3.10-slim", 
    output_component_file="./components/evaluating-component.yaml",
    packages_to_install=["pandas", "scikit-learn", "tqdm", "fastparquet", "pyarrow", "numpy"]
)
def evaluate_model(
    val_data: Input[Dataset],
    model: Input[Model],
    target_column_name: str,
    deployment_metric: str,
    deployment_metric_threshold: float,
    kpi: Output[Metrics],
) -> NamedTuple(
    "Outputs", [
        ("dep_decision", str),
    ]
):
    """
    Evaluating model good enough to deploy.
    
    Data from preprocessing step will be used for evalutaion (valid_data).
    given 'deployment_metric', 'deployment_metric_threshold' values are compared.
    if passing threshold, model will be registered aand deployed,
    if not, this is the pipeline end.
    """
    
    from sklearn.metrics import mean_absolute_error, r2_score, mean_squared_error, mean_absolute_percentage_error
    import pandas as pd
    from google.cloud import storage
    from tqdm import tqdm
    import requests
    import os
    import pickle
    import logging
    import numpy as np
    
    
    def prepare_dictionaries(df: pd.DataFrame):
        """Composite"""
        df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
        categorical = ['PU_DO']
        numerical = ['trip_distance']
        dicts = df[categorical + numerical].to_dict(orient='records')
        return dicts
    
    
    val_ds = pd.read_parquet(val_data.path)
    target = target_column_name
    
    X_test = prepare_dictionaries(val_ds)
    y_test = val_ds[target].values
    
    logging.info(f"model.path : {model.path}")
    file_name = model.path + f".pkl"
    logging.info(f"file_name : {file_name}")
    
    with open(file_name, 'rb') as f:  
        model = pickle.load(f)
    
    y_pred = model.predict(X_test)
    r2 = r2_score(y_true=y_test, y_pred=y_pred)
    mae = mean_absolute_error(y_true=y_test, y_pred=y_pred)
    mse = mean_squared_error(y_true=y_test, y_pred=y_pred)
    mape = mean_absolute_percentage_error(y_true=y_test, y_pred=y_pred)
    rmse = np.sqrt(mse)
    
    model_metrics = {
        "r2": r2, 
        "mae": mae, 
        "mape": mape, 
        "mse" : mse, 
        "rmse" : rmse,
    }

    logging.info(f"Adjusted_R2 : {r2}")
    logging.info(f"Mean Absolute Error : {mae}")
    logging.info(f"Mean Absolute Percentage Error : {round(mape,4)*100}%")
    logging.info(f"Mean Squared Error : {mse}")
    logging.info(f"Root Mean Squared Error : {rmse}")
    
    kpi.log_metric("Adjusted_R2", float(r2))
    kpi.log_metric("Mean Absolute Error", float(mae))
    kpi.log_metric("Mean Absolute Percentage Error", float(mape))
    kpi.log_metric("Mean Squared Error", float(mse))
    kpi.log_metric("Root Mean Squared Error", float(rmse))
    
    actual_metric_value = model_metrics.get(deployment_metric)
    
    if actual_metric_value >= deployment_metric_threshold:
        dep_decision = "true"
    else:
        dep_decision = "false"
        
    return (dep_decision,)

In [15]:
#5 Register model
@component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn" , "fsspec" , "gcsfs" , "google-cloud-aiplatform"], 
    base_image="python:3.9-slim", 
    output_component_file="./components/registering-component.yaml"
)
def register_model(
    serving_container_uri: str,
    project_id: str,
    region: str,
    model_name: str, 
    model: Input[Model], 
)-> NamedTuple(
    "Outputs",
    [
        ("model_resource_name", str),  # Return parameter.
    ],
):
    """Regster the model to Vertex AI model registry"""
    
    from google.cloud import aiplatform
    import logging
    
    
    logging.info(f"serving_container_uri: {serving_container_uri}")
    logging.info(f"project_id: {project_id}")
    logging.info(f"region: {region}")
    logging.info(f"model: {model}")
    logging.info(f"model.uri: {model.uri[:-5]}")
    
    # for artifact_uri arg,
    # The model name must be one of: saved_model.pb, model.pkl, model.joblib, or model.bst, depending on which library you used.
    aiplatform.init(project=project_id, location=region)
    model = aiplatform.Model.upload(
        display_name= model_name,
        # artifact_uri=model.uri,
        artifact_uri=model.uri[:-5],
        serving_container_image_uri=serving_container_uri
    )
    
    return (model.resource_name,)

In [16]:
#6 Deploy model
@component(
    packages_to_install=["pandas", "pyarrow",  "scikit-learn" , "fsspec" , "gcsfs", "google-cloud-aiplatform"], 
    base_image="python:3.9-slim", 
    output_component_file="./components/deploying-component.yaml"
)
def deploy_model(
    model_resource_name: str ,
    project_id: str ,
    region: str
)-> NamedTuple(
    "Outputs",
    [
        ("endpoint_resource_name", str),
    ],
):
    """Deploy the model to Vertex AI for online prediction"""
    from google.cloud import aiplatform
    import logging
    
    
    logging.info(f"model_resource_name : {model_resource_name}")
    logging.info(f"project_id : {project_id}")
    logging.info(f"region : {region}")
    
    aiplatform.init(project=project_id, location=region)
    
    model = aiplatform.Model(model_resource_name)
    endpoint = model.deploy(
        machine_type="n1-standard-2",
        min_replica_count=1,
        max_replica_count=1
    )
    
    return (endpoint.resource_name,)

In [17]:
#7 Test prediction
@component(
    packages_to_install=["pandas", "pyarrow", "google-cloud-aiplatform", "google-cloud-storage"], 
    base_image="python:3.9-slim", 
    output_component_file="./components/predicting-component.yaml"
)
def test_prediction(
    project_id: str ,
    region: str,
    test_ds: Input[Dataset],
    endpoint_resource_name: str,
    bucket_name: str,
    prediction_blob: str
) -> str:
    
    """Get predictions from model served with test data"""
    from google.cloud import aiplatform
    from google.cloud import storage
    import logging
    import pandas as pd
    import json
    
    
    logging.info(f"testds: {test_ds}")
    logging.info(f"test_ds: {test_ds.path}")
    logging.info(f"model_resource_name : {region}")

    def prepare_dictionaries(df: pd.DataFrame):
        """Composite"""
        df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
        categorical = ['PU_DO']
        numerical = ['trip_distance']
        dicts = df[categorical + numerical].to_dict(orient='records')
        return dicts
    
    
    def get_predictions(instances, region, endpoint_resource_name):
        """Get predictions from deployed model"""
        
        client_options = {"api_endpoint": f"{region}-aiplatform.googleapis.com"}
        client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

        response = client.predict(
            endpoint=endpoint_resource_name, 
            instances=instances
        )

        predictions = {
            "predictions": list(response.predictions)
        }

        # Write predictions to a JSON file
        output_file = "predictions.json"
        with open(output_file, "w") as f:
            json.dump(predictions, f)
        
        return output_file
            
        
    def upload_to_gcs(prediction_file, bucket_name, prediction_blob):
        """Upload the file to Google Cloud Storage"""
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(prediction_blob)
        blob.upload_from_filename(prediction_file)

        logging.info(f"Predictions uploaded to gs://{bucket_name}/{prediction_blob} successfully!")    
        
        
    test_ds = pd.read_parquet(test_ds.path)
    test_dict_ds = prepare_dictionaries(test_ds)
    
    prediction_file = get_predictions(
        instances=test_dict_ds,
        region=region,
        endpoint_resource_name=endpoint_resource_name,
    )
    
    upload_to_gcs(prediction_file, bucket_name, prediction_blob)
    
    return f'gs://{bucket_name}/{prediction_blob}' 

In [18]:
%%writefile config.json
{
    "project_id": "yorrr78-dev-111111",
    "region": "asia-northeast3",
    "pipeline_name": "taxi-data-model-kfp-test",
    "pipeline_package_path": "nytaxi_model_pipeline_job.json",
    "bucket_name": "yorrr78-dev-111111-mlops-bucket",
    "train_size": 0.8,
    "valid_size": 0.1,
    "test_size": 0.1,
    "deployment_metric": "r2",
    "deployment_metric_threshold": 0.7,
    "serving_container_uri": "asia-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
    "model_name": "sklearn-kubeflow-nytaxi-regression-model",
    "params": {
        "max_depth": 20,
        "n_estimators": 100,
        "min_samples_leaf": 10,
        "random_state": 0
    },
    "prediction_blob": "predictions/predictions.json"
}


Overwriting config.json


In [20]:
%%writefile kfp_build_training_pipeline.py

import json
import yaml

import kfp
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.components import load_component_from_file


download_taxi_data = load_component_from_file("./components/ingestion-component.yaml")
preprocess_taxi_data = load_component_from_file("./components/preprocessing-component.yaml")
train_taxi_data = load_component_from_file("./components/training-component.yaml")
evaluate_model = load_component_from_file("./components/evaluating-component.yaml")
register_model = load_component_from_file("./components/registering-component.yaml")
deploy_model = load_component_from_file("./components/deploying-component.yaml")
test_prediction = load_component_from_file("./components/predicting-component.yaml")


#read configuration from file
with open("config.json") as json_file:
    config = json.load(json_file)

PIPELINE_NAME = config.get("pipeline_name")
PACKAGE_PATH = config.get("pipeline_package_path")
BUCKET_NAME = config.get("bucket_name") 
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root/"
    
@dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT
)
def model_pipeline(
    project_id:str="",
    region:str="asia-northeast3",
    pipeline_name:str="",
    pipeline_package_path:str="",
    bucket_name:str="",
    train_size:float=0.8,
    valid_size:float=0.1,
    test_size:float=0.1,
    deployment_metric:str="r2",
    deployment_metric_threshold:float=0.7,
    serving_container_uri:str='asia-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest',
    model_name:str='sklearn-kubeflow-nytaxi-regression-model',
    params:dict={
        'max_depth': 20,
        'n_estimators': 100,
        'min_samples_leaf': 10,
        'random_state': 0
    },
    prediction_blob:str='predictions/predictions.json',
):
    ingestion_task = download_taxi_data(bucket_name)
    processing_task = preprocess_taxi_data(
        input_data=ingestion_task.output,
        train_size=train_size,
        valid_size=valid_size,
        test_size=test_size
    )
    training_task = train_taxi_data(
        train_data=processing_task.outputs["train_data"],
        params=params,
    )
    evaluating_task = evaluate_model(
        val_data=processing_task.outputs["valid_data"],
        model=training_task.outputs["model"],
        target_column_name='duration',
        deployment_metric=deployment_metric,
        deployment_metric_threshold=deployment_metric_threshold,
    )
    
    with dsl.Condition(
        evaluating_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        # check the container uri list here: https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#scikit-learn
        # deploy only if metric value exceeds deployment threshold
        registering_task = register_model(
            serving_container_uri=serving_container_uri,
            project_id=project_id,
            region=region,
            model_name=model_name,
            model=training_task.outputs["model"],
        )
        
        deploying_task = deploy_model(
            model_resource_name = registering_task.outputs["model_resource_name"],
            project_id=project_id,
            region=region
        )
        
        predicting_task = test_prediction(
            project_id=project_id,
            region=region,
            test_ds=processing_task.outputs["test_data"],
            endpoint_resource_name=deploying_task.outputs["endpoint_resource_name"],
            bucket_name=bucket_name,
            prediction_blob=prediction_blob,
        )

compiler.Compiler().compile(
    pipeline_func=model_pipeline, package_path=PACKAGE_PATH
)


Overwriting kfp_build_training_pipeline.py


In [21]:
%%writefile kfp_run_training_pipeline.py

from google.cloud import aiplatform
import yaml
import json
from datetime import datetime, timedelta

with open("config.json") as json_file:
    config = json.load(json_file)

DISPLAY_NAME = config.get("pipeline_name")
PACKAGE_PATH = config.get("pipeline_package_path")
BUCKET_NAME = config.get("bucket_name") 
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root/"

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=config,
)

# For test purpose, execute pipeline every min
job_schedule = job.create_schedule(
  display_name=DISPLAY_NAME,
  cron="* * * * *",
  max_concurrent_run_count=5,
  max_run_count=30,
)


Overwriting kfp_run_training_pipeline.py


In [24]:
!python3 kfp_build_training_pipeline.py



In [25]:
!python3 kfp_run_training_pipeline.py

Creating PipelineJobSchedule
PipelineJobSchedule created. Resource name: projects/616906371504/locations/us-central1/schedules/3306143503792209920
To use this PipelineJobSchedule in another session:
schedule = aiplatform.PipelineJobSchedule.get('projects/616906371504/locations/us-central1/schedules/3306143503792209920')
View Schedule:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/schedules/3306143503792209920?project=616906371504


In [90]:
! gsutil cp ./config.json $BUCKET_NAME
! gsutil cp ./nytaxi_model_pipeline_job.json $BUCKET_NAME

Copying file://./config.json [Content-Type=application/json]...
/ [1 files][  729.0 B/  729.0 B]                                                
Operation completed over 1 objects/729.0 B.                                      
Copying file://./nytaxi_model_pipeline_job.json [Content-Type=application/json]...
/ [1 files][ 39.7 KiB/ 39.7 KiB]                                                
Operation completed over 1 objects/39.7 KiB.                                     


### Create by api_client.create_run_from_job_spec

In [129]:
@dsl.pipeline(
    name="taxi-data-training",
    description="NYC taxi open data for testing VertexAI Kubeflow",
    pipeline_root=PIPELINE_ROOT
)
def model_pipeline(
    project_id:str='yorrr78-dev-111111',
    region:str='asia-northeast3',
    bucket_name:str='yorrr78-dev-111111-mlops-bucket',
    train_size:float=0.8,
    valid_size:float=0.1,
    test_size:float=0.1,
    deployment_metric:str="r2",
    deployment_metric_threshold:float=0.7,
    serving_container_uri:str='asia-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest',
    model_name:str='sklearn-kubeflow-nytaxi-regression-model',
    params:dict={
        'max_depth': 20,
        'n_estimators': 100,
        'min_samples_leaf': 10,
        'random_state': 0
    },
    prediction_blob:str='predictions/predictions.json',
):
    ingestion_task = download_taxi_data(bucket_name)
    processing_task = preprocess_taxi_data(
        input_data=ingestion_task.output,
        train_size=train_size,
        valid_size=valid_size,
        test_size=test_size
    )
    training_task = train_taxi_data(
        train_data=processing_task.outputs["train_data"],
        params=params,
    )
    evaluating_task = evaluate_model(
        val_data=processing_task.outputs["valid_data"],
        model=training_task.outputs["model"],
        target_column_name='duration',
        deployment_metric=deployment_metric,
        deployment_metric_threshold=deployment_metric_threshold,
    )
    
    with dsl.Condition(
        evaluating_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        # check the container uri list here: https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#scikit-learn
        # deploy only if metric value exceeds deployment threshold
        registering_task = register_model(
            serving_container_uri=serving_container_uri,
            project_id=project_id,
            region=region,
            model_name=model_name,
            model=training_task.outputs["model"],
        )
        
        deploying_task = deploy_model(
            model_resource_name = registering_task.outputs["model_resource_name"],
            project_id=project_id,
            region=region
        )
        
        predicting_task = test_prediction(
            project_id=project_id,
            region=region,
            test_ds=processing_task.outputs["test_data"],
            endpoint_resource_name=deploying_task.outputs["endpoint_resource_name"],
            bucket_name=bucket_name,
            prediction_blob=prediction_blob,
        )


In [107]:
compiler.Compiler().compile(
    pipeline_func=model_pipeline, package_path="nytaxi_model_pipeline_job.json"
)

Config loaded successfully: {'project_id': 'yorrr78-dev-111111', 'region': 'asia-northeast3', 'pipeline_name': 'taxi-data-model-kfp-test', 'pipeline_package_path': 'nytaxi_model_pipeline_job.json', 'bucket_name': 'yorrr78-dev-111111-mlops-bucket', 'train_size': 0.8, 'valid_size': 0.1, 'test_size': 0.1, 'deployment_metric': 'r2', 'deployment_metric_threshold': 0.7, 'serving_container_uri': 'asia-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest', 'model_name': 'sklearn-kubeflow-nytaxi-regression-model', 'params': {'max_depth': 20, 'n_estimators': 100, 'min_samples_leaf': 10, 'random_state': 0}, 'prediction_blob': 'predictions/predictions.json'}


In [127]:
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

'green_tripdata_2022-02.parquet'

In [None]:

response = api_client.create_run_from_job_spec(
    job_spec_path="nytaxi_model_pipeline_job.json",
)