In [None]:
!pip install google-cloud-aiplatform google-cloud-pipeline-components kfp scikit-learn mlflow

In [None]:
PROJECT_ID = "YOUR_PROJECT_ID"
EMAIL_RECIPIENTS = [ "YOUR_EMAIL" ]
#Public data is stored in us region so we'll use that
REGION="us-central1"
BUCKET_NAME = "YOUR_BUCKET_NAME"
BUCKET_URI = f"gs://{BUCKET_NAME}"
SERVICE_ACCOUNT = "YOUR SERVICE ACCOUNT pipeline job to access resources like Cloud Storage"

import os
os.system(f"gcloud config set project {PROJECT_ID}")
os.mkdir("training_package")
os.mkdir("training_package/trainer")

In [None]:
#Set default compute engine account to owner for simple testing

In [None]:
#Chicago Taxi Trips dataset, for predicting the fare for a taxi ride based on features such as ride time, location and distance
#Add 'mlops' dataset under bigquery with US multi region and run following command
CREATE OR REPLACE TABLE `mlops.chicago`
AS (
    WITH
      taxitrips AS (
      SELECT
        trip_start_timestamp,
        trip_end_timestamp,
        trip_seconds,
        trip_miles,
        payment_type,
        pickup_longitude,
        pickup_latitude,
        dropoff_longitude,
        dropoff_latitude,
        tips,
        tolls,
        fare,
        pickup_community_area,
        dropoff_community_area,
        company,
        unique_key
      FROM
        `bigquery-public-data.chicago_taxi_trips.taxi_trips`
      WHERE pickup_longitude IS NOT NULL
      AND pickup_latitude IS NOT NULL
      AND dropoff_longitude IS NOT NULL
      AND dropoff_latitude IS NOT NULL
      AND trip_miles > 0
      AND trip_seconds > 0
      AND fare > 0
      AND EXTRACT(YEAR FROM trip_start_timestamp) = 2019
    )

    SELECT
      trip_start_timestamp,
      EXTRACT(MONTH from trip_start_timestamp) as trip_month,
      EXTRACT(DAY from trip_start_timestamp) as trip_day,
      EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
      EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
      trip_seconds,
      trip_miles,
      payment_type,
      ST_AsText(
          ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
      ) AS pickup_grid,
      ST_AsText(
          ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
      ) AS dropoff_grid,
      ST_Distance(
          ST_GeogPoint(pickup_longitude, pickup_latitude),
          ST_GeogPoint(dropoff_longitude, dropoff_latitude)
      ) AS euclidean,
      CONCAT(
          ST_AsText(ST_SnapToGrid(ST_GeogPoint(pickup_longitude,
              pickup_latitude), 0.1)),
          ST_AsText(ST_SnapToGrid(ST_GeogPoint(dropoff_longitude,
              dropoff_latitude), 0.1))
      ) AS loc_cross,
      IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
      tips,
      tolls,
      fare,
      pickup_longitude,
      pickup_latitude,
      dropoff_longitude,
      dropoff_latitude,
      pickup_community_area,
      dropoff_community_area,
      company,
      unique_key,
      trip_end_timestamp
    FROM
      taxitrips
    LIMIT 1000000
);

In [None]:
%%writefile training_package/trainer/task.py
#Create task.py as training code
# Import the libraries
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from google.cloud import bigquery, bigquery_storage
from sklearn.ensemble import RandomForestRegressor
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from google import auth
from scipy import stats
import numpy as np
import argparse
import joblib
import pickle
import csv
import os

# add parser arguments
parser = argparse.ArgumentParser()
parser.add_argument('--project-id', dest='project_id',  type=str, help='Project ID.')
parser.add_argument('--training-dir', dest='training_dir', default=os.getenv("AIP_MODEL_DIR"),
                    type=str, help='Dir to save the data and the trained model.')
parser.add_argument('--bq-source', dest='bq_source',  type=str, help='BigQuery data source for training data.')
args = parser.parse_args()

# data preparation code
BQ_QUERY = """
with tmp_table as (
SELECT trip_seconds, trip_miles, fare,
    tolls,  company,
    pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude,
    DATETIME(trip_start_timestamp, 'America/Chicago') trip_start_timestamp,
    DATETIME(trip_end_timestamp, 'America/Chicago') trip_end_timestamp,
    CASE WHEN (pickup_community_area IN (56, 64, 76)) OR (dropoff_community_area IN (56, 64, 76)) THEN 1 else 0 END is_airport,
FROM `{}`
WHERE
  dropoff_latitude IS NOT NULL and
  dropoff_longitude IS NOT NULL and
  pickup_latitude IS NOT NULL and
  pickup_longitude IS NOT NULL and
  fare > 0 and
  trip_miles > 0
  and MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) between 0 and 99
ORDER BY RAND()
LIMIT 10000)
SELECT *,
    EXTRACT(YEAR FROM trip_start_timestamp) trip_start_year,
    EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
    EXTRACT(DAY FROM trip_start_timestamp) trip_start_day,
    EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
    FORMAT_DATE('%a', DATE(trip_start_timestamp)) trip_start_day_of_week
FROM tmp_table
""".format(args.bq_source)
# Get default credentials
credentials, project = auth.default()
bqclient = bigquery.Client(credentials=credentials, project=args.project_id)
bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=credentials)
df = (
    bqclient.query(BQ_QUERY)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
# Add 'N/A' for missing 'Company'
df.fillna(value={'company':'N/A','tolls':0}, inplace=True)
# Drop rows containing null data.
df.dropna(how='any', axis='rows', inplace=True)
# Pickup and dropoff locations distance
df['abs_distance'] = (np.hypot(df['dropoff_latitude']-df['pickup_latitude'], df['dropoff_longitude']-df['pickup_longitude']))*100

# Remove extremes, outliers
possible_outliers_cols = ['trip_seconds', 'trip_miles', 'fare', 'abs_distance']
df=df[(np.abs(stats.zscore(df[possible_outliers_cols].astype(float))) < 3).all(axis=1)].copy()
# Reduce location accuracy
df=df.round({'pickup_latitude': 3, 'pickup_longitude': 3, 'dropoff_latitude':3, 'dropoff_longitude':3})

# Drop the timestamp col
X=df.drop(['trip_start_timestamp', 'trip_end_timestamp'],axis=1)

# Split the data into train and test
X_train, X_test = train_test_split(X, test_size=0.10, random_state=123)

## Format the data for batch predictions
# select string cols
string_cols = X_test.select_dtypes(include='object').columns
# Add quotes around string fields
X_test[string_cols] = X_test[string_cols].apply(lambda x: '\"' + x + '\"')
# Add quotes around column names
X_test.columns = ['\"' + col + '\"' for col in X_test.columns]
# Save DataFrame to csv
X_test.to_csv(os.path.join(args.training_dir,"test.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')
# Save test data without the target for batch predictions
X_test.drop('\"fare\"',axis=1,inplace=True)
X_test.to_csv(os.path.join(args.training_dir,"test_no_target.csv"),index=False,quoting=csv.QUOTE_NONE, escapechar=' ')

# Separate the target column
y_train=X_train.pop('fare')
# Get the column indexes
col_index_dict = {col: idx for idx, col in enumerate(X_train.columns)}
# Create a column transformer pipeline
ct_pipe = ColumnTransformer(transformers=[
    ('hourly_cat', OneHotEncoder(categories=[range(0,24)], sparse = False), [col_index_dict['trip_start_hour']]),
    ('dow', OneHotEncoder(categories=[['Mon', 'Tue', 'Sun', 'Wed', 'Sat', 'Fri', 'Thu']], sparse = False), [col_index_dict['trip_start_day_of_week']]),
    ('std_scaler', StandardScaler(), [
        col_index_dict['trip_start_year'],
        col_index_dict['abs_distance'],
        col_index_dict['pickup_longitude'],
        col_index_dict['pickup_latitude'],
        col_index_dict['dropoff_longitude'],
        col_index_dict['dropoff_latitude'],
        col_index_dict['trip_miles'],
        col_index_dict['trip_seconds']])
])
# Add the random-forest estimator to the pipeline
rfr_pipe = Pipeline([
    ('ct', ct_pipe),
    ('forest_reg', RandomForestRegressor(
        n_estimators = 20,
        max_features = 1.0,
        n_jobs = -1,
        random_state = 3,
        max_depth=None,
        max_leaf_nodes=None,
    ))
])

# train the model
rfr_score = cross_val_score(rfr_pipe, X_train, y_train, scoring = 'neg_mean_squared_error', cv = 5)
rfr_rmse = np.sqrt(-rfr_score)
print ("Crossvalidation RMSE:",rfr_rmse.mean())
final_model=rfr_pipe.fit(X_train, y_train)
# Save the model pipeline
with open(os.path.join(args.training_dir,"model.joblib"), 'wb') as model_file:
    pickle.dump(final_model, model_file)

In [None]:
%%writefile training_package/setup.py
#Definition of training package
from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES=["google-cloud-bigquery[pandas]","google-cloud-bigquery-storage"]
setup(
    name='trainer',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description='Training application package for chicago taxi trip fare prediction.'
)

In [None]:
! cd training_package && python setup.py sdist --formats=gztar && cd ..

In [None]:
# Create staging bucket
os.system(f"gcloud storage buckets create {BUCKET_URI} --location={REGION} --project={PROJECT_ID}")

In [None]:
# Copy the training package to the bucket
os.system(f"gcloud storage cp training_package/dist/trainer-0.1.tar.gz {BUCKET_URI}/")

In [None]:
PIPELINE_ROOT = "{}/pipeline_root/chicago-taxi-pipe".format(BUCKET_URI)
PIPELINE_NAME = "vertex-pipeline-datatrigger-tutorial"
WORKING_DIR = f"{PIPELINE_ROOT}/mlops-datatrigger-tutorial"
os.environ['AIP_MODEL_DIR'] = WORKING_DIR
EXPERIMENT_NAME = PIPELINE_NAME + "-experiment"
PIPELINE_FILE = PIPELINE_NAME + ".yaml"

In [None]:
from google.cloud import aiplatform

aiplatform.init(
    project=PROJECT_ID,
    staging_bucket=BUCKET_URI,
    location=REGION,
    experiment=EXPERIMENT_NAME)

aiplatform.autolog()

#Refer following link for local testing
https://docs.cloud.google.com/vertex-ai/docs/pipelines/build-pipeline#test_a_pipeline_locally_optional

In [None]:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform

# define the train-deploy pipeline
@dsl.pipeline(name="custom-model-training-evaluation-pipeline")
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False

):
    # Notification task
    notify_task = VertexNotificationEmailOp(
                    recipients= EMAIL_RECIPIENTS
                    )
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
                                    project=project,
                                    display_name=training_job_display_name,
                                    worker_pool_specs=worker_pool_specs,
                                    base_output_directory=base_output_dir,
                                    location=location
                            )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
                                        artifact_uri=base_output_dir,
                                        artifact_class=artifact_types.UnmanagedContainerModel,
                                        metadata={
                                            "containerSpec": {
                                                "imageUri": prediction_container_uri,
                                            },
                                        },
                                    ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                                        artifact_uri=parent_model_artifact_uri,
                                        artifact_class=artifact_types.VertexModel,
                                        metadata={
                                            "resourceName": parent_model_resource_name
                                        },
                                    ).after(import_unmanaged_model_task)
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    parent_model=import_registry_model_task.outputs["artifact"],
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                                    project=project,
                                    location=location,
                                    display_name=model_display_name,
                                    unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
                                )
        # Get the model (or model version)
        model_resource = OneOf(model_version_upload_op.outputs["model"], model_upload_op.outputs["model"])

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
                            project= project,
                            job_display_name= batch_prediction_job_display_name,
                            model= model_resource,
                            location= location,
                            instances_format= batch_predictions_input_format,
                            predictions_format= batch_predictions_output_format,
                            gcs_source_uris= test_data_gcs_uri,
                            gcs_destination_output_uri_prefix= batch_predictions_gcs_prefix,
                            machine_type= 'n1-standard-2'
                            )
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
                            project= project,
                            target_field_name= target_field_name,
                            location= location,
                            # model= model_resource,
                            predictions_format= batch_predictions_output_format,
                            predictions_gcs_source= batch_predict_task.outputs["gcs_output_directory"],
                            ground_truth_format= ground_truth_format,
                            ground_truth_gcs_source= ground_truth_gcs_source,
                            prediction_score_column="fare"
                            )
    return

In [None]:
from kfp import dsl
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path="{}.yaml".format(PIPELINE_NAME),
)

In [None]:
REPO_NAME = "mlops"
# Create a repo in the artifact registry
os.system(f"gcloud artifacts repositories create {REPO_NAME} --location={REGION} --repository-format=KFP")

In [None]:
from kfp.registry import RegistryClient

host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
client = RegistryClient(host=host)
TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
file_name=PIPELINE_FILE,
tags=["v1", "latest"],
extra_headers={"description":"This is an example pipeline template."})
TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"

In [None]:
DATASET_NAME = "mlops"
TABLE_NAME = "chicago"

worker_pool_specs = [{
                        "machine_spec": {"machine_type": "e2-highmem-2"},
                        "replica_count": 1,
                        "python_package_spec":{
                                "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
                                "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
                                "python_module": "trainer.task",
                                "args":["--project-id",PROJECT_ID, "--training-dir",f"/gcs/{BUCKET_NAME}","--bq-source",f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}"]
                        },
}]

parameters = {
    "project": PROJECT_ID,
    "location": REGION,
    "training_job_display_name": "taxifare-prediction-training-job",
    "worker_pool_specs": worker_pool_specs,
    "base_output_dir": BUCKET_URI,
    "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
    "model_display_name": "taxifare-prediction-model",
    "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
    "target_field_name": "fare",
    "test_data_gcs_uri": [f"{BUCKET_URI}/test_no_target.csv"],
    "ground_truth_gcs_source": [f"{BUCKET_URI}/test.csv"],
    "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
    "existing_model": False
}

In [None]:
#Manual run of pipeline

# Create a pipeline job
job = aiplatform.PipelineJob(
    display_name="triggered_custom_regression_evaluation",
    template_path=TEMPLATE_URI ,
    parameter_values=parameters,
    pipeline_root=BUCKET_URI,
    enable_caching=False
)
# Run the pipeline job
job.run(
    service_account=SERVICE_ACCOUNT
)

In [None]:
#add job to scheduler for automated running
job_schedule = job.create_schedule(
  display_name="mlops tutorial schedule",
  cron="0 0 1 * *", #
  max_concurrent_run_count=1,
  max_run_count=12,
)

In [None]:
#Run pipeline when there is new data
https://cloud.google.com/vertex-ai/docs/pipelines/continuous-training-tutorial#run_the_pipeline_when_there_is_new_data