<a href="https://colab.research.google.com/github/nomadicsenseis/Real-time-forecasting-with-BQML-and-VertexAI-pipelines/blob/main/Real_time_forecasting_with_BQML_and_Vertex_AI_pipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Introduction
In this notebook I implement three forecasting pipelines using Vertex AI and BigQuery ML. The forecasts use the NYC bike rentals public dataset. The three forecasts are:

1) A simple rentals forecast across the whole city using ARIMA_PLUS with no ID column defined (just one model). Because only one model is trained, the auto_arima_max_order parameter is set to 5.

2) ARIMA_PLUS forecasting of multiple time series using time_series_id_col = 'start_station_name' (one model for each station in the city). To reduce costs (at the expense of some accuracy), the auto_arima_max_order parameter is set to 2 (or 3) this time.

3) K-means clustering to segment stations and users, and ARIMA_PLUS to forecast within the clusters.
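
The strategies above differ mainly in the `OPTIONS` clause passed to `CREATE MODEL`. As a rough sketch only (the helper `build_arima_plus_sql`, the column name `ts` and the `my_dataset.*` names are hypothetical and not part of this notebook), the statements for the first two strategies could be assembled like this:

```python
from typing import Optional


def build_arima_plus_sql(
    model: str,
    source_table: str,
    timestamp_col: str,
    data_col: str,
    id_col: Optional[str] = None,
    max_order: int = 5,
) -> str:
    """Assemble a BigQuery ML CREATE MODEL statement for ARIMA_PLUS.

    Hypothetical helper for illustration only.
    """
    options = [
        "MODEL_TYPE = 'ARIMA_PLUS'",
        f"TIME_SERIES_TIMESTAMP_COL = '{timestamp_col}'",
        f"TIME_SERIES_DATA_COL = '{data_col}'",
    ]
    columns = [timestamp_col, data_col]
    if id_col is not None:
        # One time series (and one fitted model) per distinct id value.
        options.append(f"TIME_SERIES_ID_COL = '{id_col}'")
        columns.insert(1, id_col)
    options.append(f"AUTO_ARIMA_MAX_ORDER = {max_order}")

    option_block = ",\n  ".join(options)
    select_list = ", ".join(columns)
    return (
        f"CREATE OR REPLACE MODEL `{model}`\n"
        f"OPTIONS(\n  {option_block}\n) AS\n"
        f"SELECT {select_list}\n"
        f"FROM `{source_table}`"
    )


# Strategy 1: a single city-wide model with a larger search space.
city_sql = build_arima_plus_sql(
    "my_dataset.city_model", "my_dataset.training", "ts", "rentals"
)
# Strategy 2: one model per station with a smaller, cheaper search space.
station_sql = build_arima_plus_sql(
    "my_dataset.station_model",
    "my_dataset.training",
    "ts",
    "rentals",
    id_col="start_station_name",
    max_order=2,
)
print(station_sql)
```

Under the same assumptions, the third strategy could pass a cluster label as `id_col` once the K-means step has assigned each station to a cluster.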

To train these models periodically, I use Vertex AI Pipelines, creating training, evaluation and plotting components (among others) for each.

#Before the fun

##Installations

In [19]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"
    
! pip3 install --upgrade "kfp" \
                         "google-cloud-aiplatform" \
                         "google_cloud_pipeline_components" \
                         "google-cloud-bigquery" {USER_FLAG} -q

! pip3 install --upgrade "tensorflow<2.8.0" {USER_FLAG} -q

In [20]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

##GCP connection/authentication

###Project

In [1]:
import os

PROJECT_ID = "incentro-spain-projects"  # @param {type:"string"}


In [2]:
! gcloud config set project $PROJECT_ID

Updated property [core/project].


###Region

1) Americas: "us-central1"
2) Europe: "europe-west4"
3) Asia Pacific: "asia-east1"

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services. Since my dataset will be hosted in the US, I will select the Americas region.

In [3]:
REGION = "us-central1"
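
The pipeline further down derives its BigQuery location with `REGION.split('-')[0]`, which yields `us` for `us-central1`. That shortcut would not carry over to Europe, where the BigQuery multi-region is named `EU` rather than `europe`. A minimal sketch of a more explicit mapping, assuming you want a multi-region where one exists (the names `BQ_MULTI_REGIONS` and `bigquery_location` are hypothetical):

```python
# Hypothetical mapping from a Vertex AI region prefix to a BigQuery multi-region.
# Only the two BigQuery multi-regions (US and EU) are listed; for any other
# prefix the sketch falls back to the regional location itself.
BQ_MULTI_REGIONS = {"us": "US", "europe": "EU"}


def bigquery_location(vertex_region: str) -> str:
    """Return a BigQuery location that matches a Vertex AI region."""
    prefix = vertex_region.split("-")[0]
    return BQ_MULTI_REGIONS.get(prefix, vertex_region)


print(bigquery_location("us-central1"))   # US
print(bigquery_location("europe-west4"))  # EU
print(bigquery_location("asia-east1"))    # asia-east1
```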

###Authenticate

In [17]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Vertex AI Workbench, then don't execute this code
IS_COLAB = "google.colab" in sys.modules
if not os.path.exists("/opt/deeplearning/metadata/env_version") and not os.getenv(
    "DL_ANACONDA_HOME"
):
    if IS_COLAB:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''
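
The cell above branches on three runtimes: Vertex AI Workbench, Colab, and a local session. As an illustrative sketch only (the function `detect_runtime` and its injectable arguments are assumptions added here so the logic can be exercised outside a notebook), the same checks can be expressed as one small function:

```python
import os
import sys
from typing import Callable, Mapping


def detect_runtime(
    modules: Mapping[str, object] = sys.modules,
    environ: Mapping[str, str] = os.environ,
    path_exists: Callable[[str], bool] = os.path.exists,
) -> str:
    """Classify the notebook runtime as 'workbench', 'colab' or 'local'."""
    # Same two Workbench markers the notebook checks.
    on_workbench = bool(environ.get("DL_ANACONDA_HOME")) or path_exists(
        "/opt/deeplearning/metadata/env_version"
    )
    if on_workbench:
        return "workbench"
    if "google.colab" in modules:
        return "colab"
    return "local"


print(detect_runtime())
```

Only the `colab` case needs `google.colab.auth.authenticate_user()`, and the `local` case is the one that needs a service account key path.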

##Cloud storage bucket

If you have already created a bucket for this project:

In [5]:
BUCKET_NAME = "looker-demo-nyc-bikes"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

##Imports

In [6]:
from pathlib import Path as path
from typing import NamedTuple
# General
from urllib.parse import urlparse

import google.cloud.aiplatform as vertex_ai
# Check components
import tensorflow as tf
# Simulate operations
from google.cloud import bigquery
# ML pipeline
from google_cloud_pipeline_components.v1.bigquery import (
    BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp,
    BigqueryExplainForecastModelJobOp, BigqueryForecastModelJobOp,
    BigqueryMLArimaEvaluateJobOp, BigqueryQueryJobOp)
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import HTML, Artifact, Condition, Input, Output, component

##Initialize BQ SDK for Python

In [7]:
bq_client = bigquery.Client(project=PROJECT_ID, location=REGION)

##Create local directories

In [8]:
DATA_PATH = "data"
KFP_COMPONENTS_PATH = "components"
PIPELINES_PATH = "pipelines"

! mkdir -m 777 -p {DATA_PATH}
! mkdir -m 777 -p {KFP_COMPONENTS_PATH}
! mkdir -m 777 -p {PIPELINES_PATH}

#BigQuery ML formalization

##Set up pipeline variables

In [9]:
# BQML pipeline job configuration
PIPELINE_NAME = "bqml-forecast-pipeline"
PIPELINE_ROOT = urlparse(BUCKET_URI)._replace(path="pipeline_root").geturl()
PIPELINE_PACKAGE = str(path(PIPELINES_PATH) / f"{PIPELINE_NAME}.json")

# BQML pipeline component configuration
BQ_DATASET = "Looker_demo_pipeline"
BQ_BIKES_TABLE_PREFIX = "citibike_trips"
BQ_TRAINING_TABLE_PREFIX = "bike_trips_training"
BQ_MODEL_TABLE_PREFIX = "bike_trips_forecast_arima"
BQ_EVALUATE_TS_TABLE_PREFIX = "bike_trips_arima_time_series_evaluate"
BQ_EVALUATE_MODEL_TABLE_PREFIX = "bike_trips_arima_model_evaluate"
BQ_FORECAST_TABLE_PREFIX = "bike_trips_arima_forecast"
BQ_EXPLAIN_FORECAST_TABLE_PREFIX = "bike_trips_arima_explain_forecast"
BQ_BIKES_TABLE = f"{BQ_BIKES_TABLE_PREFIX}"
BQ_TRAINING_TABLE = f"{BQ_TRAINING_TABLE_PREFIX}"
BQ_MODEL_TABLE = f"{BQ_MODEL_TABLE_PREFIX}"
BQ_EVALUATE_TS_TABLE = f"{BQ_EVALUATE_TS_TABLE_PREFIX}"
BQ_EVALUATE_MODEL_TABLE = f"{BQ_EVALUATE_MODEL_TABLE_PREFIX}"
BQ_FORECAST_TABLE = f"{BQ_FORECAST_TABLE_PREFIX}"
BQ_EXPLAIN_FORECAST_TABLE = f"{BQ_EXPLAIN_FORECAST_TABLE_PREFIX}"

BQ_TRAIN_CONFIGURATION = {
    "destinationTable": {
        "projectId": PROJECT_ID,
        "datasetId": BQ_DATASET,
        "tableId": BQ_TRAINING_TABLE,
    },
    "writeDisposition": "WRITE_TRUNCATE",
}

BQ_EVALUATE_TS_CONFIGURATION = {
    "destinationTable": {
        "projectId": PROJECT_ID,
        "datasetId": BQ_DATASET,
        "tableId": BQ_EVALUATE_TS_TABLE,
    },
    "writeDisposition": "WRITE_TRUNCATE",
}
BQ_EVALUATE_MODEL_CONFIGURATION = {
    "destinationTable": {
        "projectId": PROJECT_ID,
        "datasetId": BQ_DATASET,
        "tableId": BQ_EVALUATE_MODEL_TABLE,
    },
    "writeDisposition": "WRITE_TRUNCATE",
}
BQ_FORECAST_CONFIGURATION = {
    "destinationTable": {
        "projectId": PROJECT_ID,
        "datasetId": BQ_DATASET,
        "tableId": BQ_FORECAST_TABLE,
    },
    "writeDisposition": "WRITE_TRUNCATE",
}
BQ_EXPLAIN_FORECAST_CONFIGURATION = {
    "destinationTable": {
        "projectId": PROJECT_ID,
        "datasetId": BQ_DATASET,
        "tableId": BQ_EXPLAIN_FORECAST_TABLE,
    },
    "writeDisposition": "WRITE_TRUNCATE",
}
PERF_THRESHOLD = 3000
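
The five configuration dictionaries above share one shape and differ only in the destination table. A small sketch of how they could be generated, together with what the `urlparse(...)._replace(path=...)` expression used for `PIPELINE_ROOT` evaluates to (the helper `destination_config` and the `my-*` values are placeholders, not names from this notebook):

```python
from urllib.parse import urlparse


def destination_config(project: str, dataset: str, table: str) -> dict:
    """Build a query-job configuration that overwrites `table` on each run."""
    return {
        "destinationTable": {
            "projectId": project,
            "datasetId": dataset,
            "tableId": table,
        },
        "writeDisposition": "WRITE_TRUNCATE",
    }


# Placeholder values; substitute your own project, dataset and bucket.
config = destination_config("my-project", "my_dataset", "bike_trips_training")
pipeline_root = urlparse("gs://my-bucket")._replace(path="pipeline_root").geturl()
print(pipeline_root)  # gs://my-bucket/pipeline_root
```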

##Create location for storing YAML component definitions

In [10]:
! mkdir -m 777 -p {KFP_COMPONENTS_PATH}/custom_components

#Components

##Create custom component to read model evaluation metrics

Build a custom component that consumes the model evaluation metrics and renders them as an HTML report, using the Kubeflow Pipelines SDK visualization APIs. Vertex AI displays that HTML on an output page of the Vertex AI Pipelines UI, which is easily accessible from the Google Cloud console.

In [11]:
@component(
    base_image="python:3.8-slim",
    packages_to_install=["jinja2", "pandas", "matplotlib"],
    output_component_file=f"{KFP_COMPONENTS_PATH}/custom_components/build_bq_evaluate_metrics.yaml",
)
def get_model_evaluation_metrics(
    metrics_in: Input[Artifact], metrics_out: Output[HTML]
) -> NamedTuple("Outputs", [("avg_mean_absolute_error", float)]):
    """
    Get the average mean absolute error from the metrics
    Args:
        metrics_in: metrics artifact produced by the BigQuery evaluation step
        metrics_out: HTML artifact that receives the rendered metrics report
    Returns:
        avg_mean_absolute_error: average mean absolute error
    """

    import pandas as pd

    # Helpers
    def prettyfier(styler):
        """
        Helper function to prettify the metrics table.
        Args:
            styler: Styler object
        Returns:
            Styler object
        """
        caption = {
            "selector": "caption",
            "props": [
                ("caption-side", "top"),
                ("font-size", "150%"),
                ("font-weight", "bold"),
                ("font-family", "arial"),
            ],
        }
        headers = {
            "selector": "th",
            "props": [("color", "black"), ("font-family", "arial")],
        }
        rows = {
            "selector": "td",
            "props": [("text-align", "center"), ("font-family", "arial")],
        }
        styler.set_table_styles([caption, headers, rows])
        styler.set_caption("Forecasting accuracy report ")
        styler.hide(axis="index")
        styler.format(precision=2)
        styler.background_gradient(cmap="Blues")
        return styler

    def get_column_names(header):
        """
        Helper function to get the column names from the metrics table.
        Args:
            header: header
        Returns:
            column_names: column names
        """
        header_clean = header.replace("_", " ")
        header_abbrev = "".join([h[0].upper() for h in header_clean.split()])
        header_prettied = f"{header_clean} ({header_abbrev})"
        return header_prettied

    # Extract rows and schema from metrics artifact
    rows = metrics_in.metadata["rows"]
    schema = metrics_in.metadata["schema"]

    # Convert into a tabular format
    columns = [metrics["name"] for metrics in schema["fields"] if "name" in metrics]
    records = []
    for row in rows:
        records.append([dl["v"] for dl in row["f"]])
    metrics = (
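        # Index by the time series id column (the first column of the evaluation
        # output, here start_station_name) so only metric columns are cast to float
        pd.DataFrame.from_records(records, columns=columns, index=columns[0])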
        .astype(float)
        .round(3)
    )
    metrics = metrics.reset_index()

    # Create the HTML artifact for the metrics
    pretty_columns = list(
        map(
            lambda h: get_column_names(h)
            if h != columns[0]
            else h.replace("_", " ").capitalize(),
            columns,
        )
    )
    pretty_metrics = metrics.copy()
    pretty_metrics.columns = pretty_columns
    html_metrics = pretty_metrics.style.pipe(prettyfier).to_html()
    with open(metrics_out.path, "w") as f:
        f.write(html_metrics)

    # Create metrics dictionary for the model
    avg_mean_absolute_error = round(float(metrics.mean_absolute_error.mean()), 0)
    component_outputs = NamedTuple("Outputs", [("avg_mean_absolute_error", float)])

    return component_outputs(avg_mean_absolute_error)
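
The component reads `rows` and `schema` from the artifact metadata and assumes each row looks like `{"f": [{"v": ...}, ...]}`. To make that parsing step easier to follow, here is a dependency-free sketch on a made-up payload (the station names, values and the helper names `rows_to_records` and `pretty_header` are invented for illustration):

```python
def rows_to_records(rows, schema):
    """Flatten rows of {"f": [{"v": value}, ...]} into dicts keyed by column name."""
    columns = [field["name"] for field in schema["fields"] if "name" in field]
    return [dict(zip(columns, (cell["v"] for cell in row["f"]))) for row in rows]


def pretty_header(header: str) -> str:
    """Append an upper-case abbreviation to a snake_case metric name."""
    clean = header.replace("_", " ")
    abbrev = "".join(word[0].upper() for word in clean.split())
    return f"{clean} ({abbrev})"


# Made-up sample payload in the shape the component expects.
schema = {"fields": [{"name": "start_station_name"}, {"name": "mean_absolute_error"}]}
rows = [
    {"f": [{"v": "Station A"}, {"v": "12.5"}]},
    {"f": [{"v": "Station B"}, {"v": "7.5"}]},
]

records = rows_to_records(rows, schema)
# Values arrive as strings in this sample, hence the float conversion.
avg_mae = sum(float(r["mean_absolute_error"]) for r in records) / len(records)
print(records[0])
print(pretty_header("mean_absolute_error"), avg_mae)
```

The average computed at the end is the quantity the component returns as `avg_mean_absolute_error` and that the pipeline later compares against the threshold.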



#Create pipeline

In [47]:

@dsl.pipeline(
    name=PIPELINE_NAME,
    description="A batch pipeline to train ARIMA PLUS using BQML",
)
def pipeline(
    bq_dataset: str = BQ_DATASET,
    bq_bikes_table: str = BQ_BIKES_TABLE,
    bq_training_table: str = BQ_TRAINING_TABLE,
    bq_train_configuration: dict = BQ_TRAIN_CONFIGURATION,
    bq_model_table: str = BQ_MODEL_TABLE,
    bq_evaluate_time_series_configuration: dict = BQ_EVALUATE_TS_CONFIGURATION,
    bq_evaluate_model_configuration: dict = BQ_EVALUATE_MODEL_CONFIGURATION,
    performance_threshold: float = PERF_THRESHOLD,
    bq_forecast_configuration: dict = BQ_FORECAST_CONFIGURATION,
    bq_explain_forecast_configuration: dict = BQ_EXPLAIN_FORECAST_CONFIGURATION,
    project: str = PROJECT_ID,
    location: str = REGION.split('-')[0],
):

    # Create the training dataset
    create_training_dataset_op = BigqueryQueryJobOp(
        query=f"""
        -- create the training table
        WITH 
        -- get the 90th percentile date for time series splitting
        get_split AS (
          SELECT APPROX_QUANTILES(DATETIME_TRUNC(starttime, DAY), 100)[OFFSET(90)] as split
          FROM `{project}.{bq_dataset}.{bq_bikes_table}`
        ),
        -- get train table
        get_train AS (
          SELECT
            DATETIME_TRUNC(starttime, DAY) as hourly_timestamp,
            start_station_name,
            COUNT(*) AS rentals,
            FROM `{project}.{bq_dataset}.{bq_bikes_table}`
        GROUP BY hourly_timestamp, start_station_name
        )
        SELECT
          *,
          CASE WHEN hourly_timestamp < (SELECT split FROM get_split) THEN 'TRAIN' ELSE 'TEST' END AS split
        FROM get_train
        ORDER BY hourly_timestamp
        """,
        job_configuration_query=bq_train_configuration,
        project=project,
        location=location,
    ).set_display_name("get train data")

    # Run an ARIMA PLUS experiment
    bq_arima_model_exp_op = (
        BigqueryCreateModelJobOp(
            query=f"""
        -- create model table
        CREATE OR REPLACE MODEL `{project}.{bq_dataset}.{bq_model_table}`
        OPTIONS(
        MODEL_TYPE = \'ARIMA_PLUS\',
        TIME_SERIES_TIMESTAMP_COL = \'hourly_timestamp\',
        TIME_SERIES_DATA_COL = \'rentals\',
        TIME_SERIES_ID_COL = [\'start_station_name\'],
        AUTO_ARIMA_MAX_ORDER=2
        ) AS
        SELECT
          hourly_timestamp,
          start_station_name,
          rentals
        FROM `{project}.{bq_dataset}.{bq_training_table}`
        WHERE split='TRAIN';
        """,
            project=project,
            location=location,
        )
        .set_display_name("run arima+ model experiment")
        .after(create_training_dataset_op)
    )

    # Evaluate ARIMA PLUS time series
    _ = (
        BigqueryMLArimaEvaluateJobOp(
            project=project,
            location=location,
            model=bq_arima_model_exp_op.outputs["model"],
            show_all_candidate_models=False,
            job_configuration_query=bq_evaluate_time_series_configuration,
        )
        .set_display_name("evaluate arima plus time series")
        .after(bq_arima_model_exp_op)
    )

    # Evaluate ARIMA Plus model
    bq_arima_evaluate_model_op = (
        BigqueryEvaluateModelJobOp(
            project=project,
            location=location,
            model=bq_arima_model_exp_op.outputs["model"],
            query_statement=f"""SELECT * FROM `{project}.{bq_dataset}.{bq_training_table}` WHERE split='TEST'""",
            job_configuration_query=bq_evaluate_model_configuration,
        )
        .set_display_name("evaluate arima plus model")
        .after(bq_arima_model_exp_op)
    )

    # Plot model metrics
    get_evaluation_model_metrics_op = (
        get_model_evaluation_metrics(
            bq_arima_evaluate_model_op.outputs["evaluation_metrics"]
        )
        .after(bq_arima_evaluate_model_op)
        .set_display_name("plot evaluation metrics")
    )

    # Check the model performance: continue only if the ARIMA_PLUS average MAE is below the threshold
    with Condition(
        get_evaluation_model_metrics_op.outputs["avg_mean_absolute_error"]
        < performance_threshold,
        name="avg. mae good",
    ):
        # Train the ARIMA PLUS model
        bq_arima_model_op = (
            BigqueryCreateModelJobOp(
                query=f"""
        -- create model table
        CREATE OR REPLACE MODEL `{project}.{bq_dataset}.{bq_model_table}`
        OPTIONS(
        MODEL_TYPE = \'ARIMA_PLUS\',
        TIME_SERIES_TIMESTAMP_COL = \'hourly_timestamp\',
        TIME_SERIES_DATA_COL = \'rentals\',
        TIME_SERIES_ID_COL = [\'start_station_name\'],
        AUTO_ARIMA_MAX_ORDER=2,
        MODEL_REGISTRY = \'vertex_ai\',
        VERTEX_AI_MODEL_ID = \'rentals_demand_forecasting\',
        VERTEX_AI_MODEL_VERSION_ALIASES = [\'staging\']
        ) AS
        SELECT
          DATETIME_TRUNC(starttime, DAY) as hourly_timestamp,
          start_station_name,
          COUNT(*) AS rentals,
          FROM `{project}.{bq_dataset}.{bq_bikes_table}`
        GROUP BY hourly_timestamp, start_station_name;
        """,
                project=project,
                location=location,
            )
            .set_display_name("train arima+ model")
            .after(get_evaluation_model_metrics_op)
        )

        # Generate the ARIMA PLUS forecasts
        bq_arima_forecast_op = (
            BigqueryForecastModelJobOp(
                project=project,
                location=location,
                model=bq_arima_model_op.outputs["model"],
                horizon=1,  # 1 step ahead (1 day, since timestamps are truncated to DAY)
                confidence_level=0.9,
                job_configuration_query=bq_forecast_configuration,
            )
            .set_display_name("generate hourly forecasts")
            .after(get_evaluation_model_metrics_op)
        )

        # Generate the ARIMA PLUS forecast explanations
        _ = (
            BigqueryExplainForecastModelJobOp(
                project=project,
                location=location,
                model=bq_arima_model_op.outputs["model"],
                horizon=1,  # 1 step ahead (1 day, since timestamps are truncated to DAY)
                confidence_level=0.9,
                job_configuration_query=bq_explain_forecast_configuration,
            )
            .set_display_name("explain hourly forecasts")
            .after(bq_arima_forecast_op)
        )
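
The training query splits the series by time rather than at random: everything before the 90th percentile date is labelled `TRAIN` and the rest `TEST`. A plain-Python sketch of the same idea follows, with the caveat that `APPROX_QUANTILES` is approximate, so the exact cutoff in BigQuery may differ slightly (the function `time_split` and the sample dates are hypothetical):

```python
from datetime import date, timedelta


def time_split(days, train_fraction=0.9):
    """Label each distinct day TRAIN or TEST using a quantile cutoff.

    `days` holds one entry per trip (duplicates allowed), mirroring how the
    SQL takes the quantile over all rows rather than over distinct days.
    """
    ordered = sorted(days)
    # Nearest-rank style cutoff; an approximation of the SQL quantile.
    cutoff = ordered[int(train_fraction * (len(ordered) - 1))]
    labels = {d: ("TRAIN" if d < cutoff else "TEST") for d in set(ordered)}
    return cutoff, labels


# Ten consecutive days with one trip each, purely for illustration.
start = date(2023, 1, 1)
trip_days = [start + timedelta(days=i) for i in range(10)]
cutoff, labels = time_split(trip_days)
train_days = sum(label == "TRAIN" for label in labels.values())
print(cutoff, train_days)
```

Splitting on time keeps the test period strictly after the training period, which is what a forecast evaluation needs.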

In [48]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE)

In [49]:
bqml_pipeline = vertex_ai.PipelineJob(
    display_name=f"{PIPELINE_NAME}-job",
    template_path=PIPELINE_PACKAGE,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    project=PROJECT_ID,
    location=REGION,
)

bqml_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/126752948276/locations/us-central1/pipelineJobs/bqml-forecast-pipeline-20230227121854
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/126752948276/locations/us-central1/pipelineJobs/bqml-forecast-pipeline-20230227121854')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bqml-forecast-pipeline-20230227121854?project=126752948276
PipelineJob projects/126752948276/locations/us-central1/pipelineJobs/bqml-forecast-pipeline-20230227121854 current state:
PipelineState.PIPELINE_STATE_RUNNING

RuntimeError: ignored