### Retail demand forecasting with Vertex Forecast and Pipelines

This notebook demonstrates the use of Vertex Pipelines to orchestrate Vertex Forecast (VF) workflows. It makes use of three tables that define the retail demand schema for this example:

**Dataset**
* Uses the M5 public dataset


**Tables in the dataset**
- **activity**: activity table, containing time-variant information, e.g., sales, promo, etc.
- **product**: the product table containing metadata of items
- **plan table**: the table used to generate forecasts for future periods where actuals are unknown

**Modeling approach**
* Two models are trained, each with a different optimization objective: `minimize-rmse` & `minimize-mape`
* The predictions from both models are stacked (averaged) to produce the final prediction
* In the training job configuration, `additional_experiments` is used to specify two overrides:
> * `'forecasting_model_type_override': 'seq2seq'` - instructs VF to only train a sequence-to-sequence model
> * `'forecasting_hierarchical_group_column_names':'dept_id, cat_id'` - specifies hierarchical modeling, and which input features to aggregate
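
The stacking step described above amounts to a plain average of the two models' predictions for each time series and date. A minimal sketch in pure Python, assuming made-up prediction values; the helper name `stack_predictions` is hypothetical and is not part of the pipeline:

```python
def stack_predictions(pred_rmse, pred_mape, actual=None):
    """Average two model predictions and, if an actual is given, the absolute error."""
    average = (pred_rmse + pred_mape) / 2
    final_pred = round(average, 2)
    # The error is computed against the unrounded average, then rounded for display.
    error = None if actual is None else round(abs(actual - average), 2)
    return final_pred, error


# Illustrative values only (not taken from the M5 dataset).
final_pred, error = stack_predictions(pred_rmse=12.0, pred_mape=10.0, actual=10)
print(final_pred, error)
```

In the pipeline itself the same arithmetic is done in BigQuery SQL, so this sketch is only meant to make the calculation easy to check by hand.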

**Model Pipeline**
* This pipeline includes all steps needed to train and evaluate the models, as well as generate a forecast/prediction for the future
* It includes examples of custom pipeline components and pre-built components from Vertex AI


## Notebook Setup

In [None]:
PROJECT_ID = 'sk-ai-ml-poc'  # <--- TODO: CHANGE THIS
LOCATION = 'us-central1' 
!gcloud config set project {PROJECT_ID}

### Install KFP SDK and Vertex Pipelines client library

In [None]:
# New
! pip3 install -U google-cloud-storage $USER_FLAG
# ! pip3 install $USER kfp google-cloud-pipeline-components --upgrade
!git clone https://github.com/kubeflow/pipelines.git
!pip install pipelines/components/google-cloud/.
!pip install google-cloud-aiplatform

In [None]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)


In [None]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

In [None]:
# GCP Project Configuration:
# project where pipeline and vertex jobs are executed

PROJECT_ID = 'sk-ai-ml-poc' # {type: 'string'} <--- TODO: CHANGE THIS
PROJECT_NUMBER = '555195908111' # {type: 'string'} <--- TODO: CHANGE THIS
LOCATION = 'us-central1' # {type: 'string'} 
# BQ_LOCATION = 'US' # {type: 'string'}

SCOPES = (
  'https://www.googleapis.com/auth/cloud-platform',
)

assert LOCATION, 'the value for this variable must be set'
assert PROJECT_ID, 'the value for this variable must be set'
assert PROJECT_NUMBER, 'the value for this variable must be set'
%env GOOGLE_CLOUD_PROJECT={PROJECT_ID}

## Pipeline Configuration

### Set some variables

**Before you run the next cell**, **edit it** to set variables for your project.  
 
* For `BUCKET_NAME`, enter the name of a Cloud Storage (GCS) bucket in your project.  Don't include the `gs://` prefix.
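
Because the pipeline root is built by prepending `gs://` to the bucket name, an accidental prefix would produce a malformed URI. The sketch below shows one way to guard against that; the helper name `build_pipeline_root` and the sample values are assumptions for illustration:

```python
def build_pipeline_root(bucket_name: str, user: str) -> str:
    """Build the pipeline root URI, tolerating an accidental gs:// prefix."""
    prefix = "gs://"
    if bucket_name.startswith(prefix):
        bucket_name = bucket_name[len(prefix):]
    # Drop stray slashes so the resulting URI has exactly one separator per level.
    return f"gs://{bucket_name.strip('/')}/pipeline_root/{user}"


print(build_pipeline_root("my-bucket", "my-user"))
print(build_pipeline_root("gs://my-bucket/", "my-user"))
```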

In [None]:
# Required Pipeline Parameters
USER = 'skewalramani'  #  {type: 'string'} <--- TODO: CHANGE THIS
BUCKET_NAME = 'sk-forecasting'  #  {type: 'string'} <--- TODO: CHANGE THIS
GS_PIPELINE_ROOT_PATH = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)


print('GS_PIPELINE_ROOT_PATH: {}'.format(GS_PIPELINE_ROOT_PATH))

Packages

In [None]:
from datetime import datetime
import json
import os
import time
from typing import Any, Callable, Dict, NamedTuple, Optional
from IPython.display import clear_output

from google import auth
from google.api_core import exceptions as google_exceptions
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.experimental import forecasting as gcc_aip_forecasting
import google.cloud.aiplatform
from google.cloud import bigquery
from google.cloud import storage

#from google.colab import auth as colab_auth
#from google.colab import drive

import kfp
import kfp.v2.dsl
from kfp.v2.google import client as pipelines_client

from matplotlib import dates as mdates
from matplotlib import pyplot as plt

import pandas as pd
import seaborn as sns

from IPython.display import Image
from IPython.core.display import HTML 

In [None]:
print(f'aiplatform SDK version: {google.cloud.aiplatform.__version__}')

BQ and pipeline clients

In [None]:
# colab_auth.authenticate_user()
# Obtain application default credentials with the scopes and quota project set above
credentials, _ = auth.default(scopes=SCOPES, quota_project_id=PROJECT_ID)
bq_client = bigquery.Client(project=PROJECT_ID, credentials=credentials)
pipeline_client = pipelines_client.AIPlatformClient(
  project_id=PROJECT_ID,
  region=LOCATION,
)

# Training, Evaluation, and Forecast Pipeline

### Pipeline visualization in Vertex UI

In [None]:
PIPELINES = {}

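# NOTE: os.path.isfile() and open() only handle local paths, so with a gs:// URI
# the check below is always False and save_pipelines() would fail.
# Use a local path here if run names should be persisted.
PIPELINES_FILEPATH = 'gs://sk-forecasting/pipelines/pipelines.json'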

if os.path.isfile(PIPELINES_FILEPATH):
  with open(PIPELINES_FILEPATH) as f:
    PIPELINES = json.load(f)
else:
  PIPELINES = {}

def save_pipelines():
  with open(PIPELINES_FILEPATH, 'w') as f:
    json.dump(PIPELINES, f)

## Components

### Pipeline Component: **Create BQ dataset**

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-bigquery==2.18.0'],
)
def create_dataset(
    project: str, 
    vertex_dataset: str, 
    eval_dataset: str, 
    bq_location: str):
  from google.cloud import bigquery

  bq_client = bigquery.Client(project=project, location=bq_location)
  # (
  #     bq_client.query(f'CREATE SCHEMA IF NOT EXISTS `{project}.{vertex_dataset}`')
  #     .result()
  # ),
  (
      bq_client.query(f'CREATE SCHEMA IF NOT EXISTS `{project}.{eval_dataset}`')
      .result()
  )

### Pipeline component: **Input Table Specs**

In [None]:
@kfp.v2.dsl.component(base_image='python:3.9')
def create_input_table_specs(
  products_table_uri: str,
  activities_table_uri: str,
  locations_table_uri: str,
  time_granularity_unit: str,
  time_granularity_quantity: int,
) -> NamedTuple(
  'Output',
  [('input_table_specs', str), ('model_feature_columns', str)],
):
  import json

  products_table_specs = {
    'bigquery_uri': products_table_uri,
    'table_type': 'FORECASTING_ATTRIBUTE',
    'forecasting_attribute_table_metadata': {
      'primary_key_column': 'product_id'
    }
  }

  locations_table_specs = {
    'bigquery_uri': locations_table_uri,
    'table_type': 'FORECASTING_ATTRIBUTE',
    'forecasting_attribute_table_metadata': {
      'primary_key_column': 'location_id'
    }
  }

  activities_table_specs = {
    'bigquery_uri': activities_table_uri,
    'table_type': 'FORECASTING_PRIMARY',
    'forecasting_primary_table_metadata': {
        'time_column': 'date',
        'target_column': 'gross_quantity',
        'time_series_identifier_columns': ['product_id', 'location_id'],
        'unavailable_at_forecast_columns': [],
        'time_granularity': {
          'unit': time_granularity_unit,
          'quantity': time_granularity_quantity,
        },
        # 'predefined_splits_column': 'ml_use',
        # 'predefined_split_column': 'ml_use', # model_override
    }
  }

  model_feature_columns = [
    'product_id',
    'location_id',
    'gross_quantity',
    'date',
    'weekday',
    'wday',
    'month',
    'year',
    'event_name_1',
    'event_type_1',
    'event_name_2',
    'event_type_2',
    'snap_CA',
    'snap_TX',
    'snap_WI',
    'dept_id',
    'cat_id',
    'state_id',
  ]

  input_table_specs = [
    activities_table_specs,
    products_table_specs,
    locations_table_specs,
  ]

  return (
    json.dumps(input_table_specs),  # input_table_specs
    json.dumps(model_feature_columns),  # model_feature_columns
  )

### Pipeline Component: Get Eval Dataset uri

In [None]:
@kfp.v2.dsl.component(base_image='python:3.9')
def get_eval_dataset_path_uri(
    project: str,
    eval_bq_dataset: str,
    model_1_table: str,
    model_2_table: str,
) -> NamedTuple(
    'Outputs', 
    [
     ('model_1_bigquery_table_uri', str),
     ('model_2_bigquery_table_uri', str),
    ]
):

  # import json

  model_1_table_path_name = f'{project}:{eval_bq_dataset}:eval-{model_1_table}'
  model_2_table_path_name = f'{project}:{eval_bq_dataset}:eval-{model_2_table}'

  print(model_1_table_path_name)
  print(model_2_table_path_name)

  return (
      f'bq://{model_1_table_path_name}',
      f'bq://{model_2_table_path_name}'
  )

In [None]:
from typing import Dict, List, Optional, Sequence, Tuple, Union
from kfp.v2.dsl import Artifact
from kfp.v2.dsl import Input, Model

### Pipeline component: combine model predictions

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-bigquery==2.18.0', 'google-cloud-aiplatform==1.7.0'],
)
def create_combined_preds_table(
  project: str,
  dataset: str,
  bq_location: str,
  model_1_eval_table_uri: str,
  model_2_eval_table_uri: str,
  model_1_path: Input[Artifact],
  model_2_path: Input[Artifact],
  override: str = 'False',
) -> NamedTuple('Outputs', [('combined_preds_table_uri', str)]):
  from google.cloud import bigquery
 
  # bool('False') is True, so compare the string value explicitly
  override = override.lower() == 'true'
  bq_client = bigquery.Client(project=project, location=bq_location)
  combined_preds_table_name = f'{project}.{dataset}.combined_preds'
  

  def _sanitize_bq_uri(bq_uri):
    if bq_uri.startswith("bq://"):
      bq_uri = bq_uri[5:]
    return bq_uri.replace(":", ".")

  model_1_eval_table_uri = _sanitize_bq_uri(
      model_1_eval_table_uri
  )

  model_2_eval_table_uri = _sanitize_bq_uri(
      model_2_eval_table_uri
  )

  (
    bq_client.query(
      f"""
      CREATE {'OR REPLACE TABLE' if override else 'TABLE IF NOT EXISTS'} 
        `{combined_preds_table_name}`
      AS (
        SELECT * except(row_number) from
          (
            SELECT *,ROW_NUMBER() OVER (PARTITION BY datetime,vertex__timeseries__id order by predicted_on_date asc) row_number
            FROM
            (
              SELECT
              DATE(table_a.date) as datetime, 
              DATE(table_a.predicted_on_date) as predicted_on_date,
              CAST(table_a.gross_quantity as INTEGER) as gross_quantity, 
              table_a.vertex__timeseries__id,
              table_a.predicted_gross_quantity.value as predicted_gross_quantity_a, 
              table_b.predicted_gross_quantity.value as predicted_gross_quantity_b
              FROM
              `{model_1_eval_table_uri}` AS table_a
              INNER JOIN `{model_2_eval_table_uri}` AS table_b
              ON DATE(table_a.date) = DATE(table_b.date)
              and table_a.vertex__timeseries__id = table_b.vertex__timeseries__id
              and DATE(table_a.predicted_on_date) = DATE(table_b.predicted_on_date)
          ) a
          )m
          where row_number = 1
        );
          """
    )
    .result()
  )

  return (
    f'bq://{combined_preds_table_name}',
  )
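
The `ROW_NUMBER() ... where row_number = 1` pattern in the query above keeps, for each combination of date and time series, only the row with the earliest `predicted_on_date`. A minimal pure-Python sketch of the same deduplication, using made-up rows and a hypothetical helper name `keep_earliest_prediction`:

```python
def keep_earliest_prediction(rows):
    """For each (date, series_id), keep the row with the earliest predicted_on_date."""
    best = {}
    for row in rows:
        key = (row["date"], row["series_id"])
        # ISO-formatted date strings compare correctly as plain strings.
        if key not in best or row["predicted_on_date"] < best[key]["predicted_on_date"]:
            best[key] = row
    return list(best.values())


# Illustrative rows only; column names are simplified for the sketch.
rows = [
    {"date": "2016-05-01", "series_id": "A", "predicted_on_date": "2016-04-10", "pred": 3.0},
    {"date": "2016-05-01", "series_id": "A", "predicted_on_date": "2016-04-03", "pred": 2.5},
    {"date": "2016-05-02", "series_id": "A", "predicted_on_date": "2016-04-10", "pred": 4.0},
]
deduped = keep_earliest_prediction(rows)
print(deduped)
```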

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-bigquery==2.18.0'],
)
def create_final_pred_table(
  project: str,
  dataset: str,
  bq_location: str,
  combined_preds_table_uri: str,
  override: str = 'False',
) -> NamedTuple('Outputs', [('final_preds_table_uri', str)]):
  from google.cloud import bigquery
 
  # bool('False') is True, so compare the string value explicitly
  override = override.lower() == 'true'
  bq_client = bigquery.Client(project=project, location=bq_location)
  final_preds_table_name = f'{project}.{dataset}.final_preds'
  (
    bq_client.query(
        f"""
      CREATE {'OR REPLACE TABLE' if override else 'TABLE IF NOT EXISTS'} 
        `{final_preds_table_name}`
        AS (
          SELECT
              datetime,
              vertex__timeseries__id, 
              gross_quantity as gross_quantity_actual,
              ROUND(a.predicted_gross_quantity_a, 2) as model_a_pred,
              ROUND(a.predicted_gross_quantity_b, 2) as model_b_pred,
              ROUND((a.predicted_gross_quantity_a + a.predicted_gross_quantity_b)/2, 2) AS Final_Pred,
              ROUND(ABS(gross_quantity - ((a.predicted_gross_quantity_a + a.predicted_gross_quantity_b)/2)), 2) as Final_Pred_error,
          FROM
            `{combined_preds_table_uri[5:]}` AS a);
            """
    )
    .result()
  )

  return (
      f'bq://{final_preds_table_name}',
  )

### Pipeline component: Create Forecast input table

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-aiplatform==1.7.0'],
)
def create_forecast_input_table_specs(
  project: str,
  forecast_products_table_uri: str,
  forecast_activities_table_uri: str,
  forecast_locations_table_uri: str,
  forecast_plan_table_uri: str,
  time_granularity_unit: str,
  time_granularity_quantity: int,
) -> NamedTuple('Outputs', [('forecast_input_table_specs', str)]):
  import json
  import os

  forecast_input_table_specs = [
    {
      'bigquery_uri': forecast_plan_table_uri,
      'table_type': 'FORECASTING_PLAN',
    },
    {
      'bigquery_uri': forecast_activities_table_uri,
      'table_type': 'FORECASTING_PRIMARY',
      'forecasting_primary_table_metadata': {
          'time_column': 'date',
          'target_column': 'gross_quantity',
          'time_series_identifier_columns': ['product_id', 'location_id'],
          'unavailable_at_forecast_columns': [],
          'time_granularity': {
              'unit': time_granularity_unit,
              'quantity': time_granularity_quantity,
              },
          # 'predefined_splits_column': 'ml_use',
          # 'predefined_split_column': 'ml_use', # model_override
      }
    },
    {
      'bigquery_uri': forecast_products_table_uri,
      'table_type': 'FORECASTING_ATTRIBUTE',
      'forecasting_attribute_table_metadata': {
      'primary_key_column': 'product_id'
      }
     },
    {
      'bigquery_uri': forecast_locations_table_uri,
      'table_type': 'FORECASTING_ATTRIBUTE',
      'forecasting_attribute_table_metadata': {
      'primary_key_column': 'location_id'
      }
    },
  ]

  print(forecast_input_table_specs)
  
  return (json.dumps(forecast_input_table_specs),)

### Pipeline Component: Get predict table path

In [None]:
@kfp.v2.dsl.component(base_image='python:3.9')
def get_predict_table_path(
  predict_processed_table: str
) -> NamedTuple('Outputs', [('preprocess_bq_uri', str)]):
  import json

  preprocess_bq_uri = (
    json.loads(predict_processed_table)
    ['processed_bigquery_table_uri']
  )
  return (preprocess_bq_uri,)

### Pipeline Component: Model_1 Batch Prediction

In [None]:
from google_cloud_pipeline_components.types import artifact_types

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-aiplatform==1.7.0'],
)
def model_1_predict_job_v2(
    project: str,
    location: str,
    eval_bq_dataset: str,
    bigquery_source: str,
    model_1_path: Input[Artifact],
) -> NamedTuple('Outputs', [
                            ('batch_predict_output_bq_uri', str),
                            ('batch_predict_job_dict', dict)]):

  from google.cloud import aiplatform
  # from google_cloud_pipeline_components.types import artifact_types
  import json
  import logging

  aiplatform.init(
      project=project,
      location=location,
  )
  # print("model_1_path:", model_1_path)
  # model_aip_uri=model_1_path["uri"]
  # print("model_1_path[49:]:", model_1_path[49:])
  # model_aip_uri=model_1_path[49:]
  # print("model_aip_uri_2:", model_aip_uri)

  # model = artifact_types.VertexModel(uri='xxx')

  model_resource_path = model_1_path.metadata["resourceName"]
  logging.info("model path: %s", model_resource_path)

  model = aiplatform.Model(model_name=model_resource_path)
  logging.info("Model dict: %s", model.to_dict())

  batch_predict_job = model.batch_predict(
      bigquery_source=bigquery_source,
      instances_format="bigquery",
      bigquery_destination_prefix=f'bq://{project}.{eval_bq_dataset}',
      predictions_format="bigquery",
      job_display_name='batch-predict-job-1',
  )

  batch_predict_bq_output_uri = "{}.{}".format(
      batch_predict_job.output_info.bigquery_output_dataset,
      batch_predict_job.output_info.bigquery_output_table)
  
  # if batch_predict_bq_output_uri.startswith("bq://"):
  #   batch_predict_bq_output_uri = batch_predict_bq_output_uri[5:]

  # batch_predict_bq_output_uri.replace(":", ".")

  print(batch_predict_job.to_dict())
  return (batch_predict_bq_output_uri, 
          batch_predict_job.to_dict())

### Pipeline Component: Model_2 Batch Prediction

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-aiplatform==1.7.0'],
)
def model_2_predict_job_v2(
    project: str,
    location: str,
    eval_bq_dataset: str,
    bigquery_source: str,
    model_2_path: Input[Artifact],
) -> NamedTuple('Outputs', [
                            ('batch_predict_output_bq_uri', str),
                            ('batch_predict_job_dict', dict)]):

  from google.cloud import aiplatform
  # from google_cloud_pipeline_components.types import artifact_types
  import json
  import logging

  aiplatform.init(
      project=project,
      location=location,
  )
  # print("model_2_path:", model_2_path)
  # model_aip_uri=model_2_path["uri"]
  # print("model_2_path[49:]:", model_2_path[49:])
  # model_aip_uri=model_2_path[49:]
  # print("model_aip_uri_2:", model_aip_uri)

  # model = artifact_types.VertexModel(uri='xxx')

  model_resource_path = model_2_path.metadata["resourceName"]
  logging.info("model path: %s", model_resource_path)

  model = aiplatform.Model(model_name=model_resource_path)
  print("Model dict:", model.to_dict())

  batch_predict_job = model.batch_predict(
      bigquery_source=bigquery_source,
      instances_format="bigquery",
      bigquery_destination_prefix=f'bq://{project}.{eval_bq_dataset}',
      predictions_format="bigquery",
      job_display_name='batch-predict-job-2',
  )

  batch_predict_bq_output_uri = "{}.{}".format(
      batch_predict_job.output_info.bigquery_output_dataset,
      batch_predict_job.output_info.bigquery_output_table)
  
  # if batch_predict_bq_output_uri.startswith("bq://"):
  #   batch_predict_bq_output_uri = batch_predict_bq_output_uri[5:]

  # batch_predict_bq_output_uri.replace(":", ".")

  print(batch_predict_job.to_dict())
  return (batch_predict_bq_output_uri, 
          batch_predict_job.to_dict())

### Pipeline Component: Combine Plan Predictions

In [None]:
@kfp.v2.dsl.component(
  base_image='python:3.9',
  packages_to_install=['google-cloud-bigquery==2.18.0'],
)
def create_combined_preds_forecast_table(
  project: str,
  dataset: str,
  model_1_pred_table_uri: str,
  model_2_pred_table_uri: str,
  override: str = 'False',
) -> NamedTuple('Outputs', [('combined_preds_forecast_table_uri', str)]):
  from google.cloud import bigquery
 
  # bool('False') is True, so compare the string value explicitly
  override = override.lower() == 'true'
  bq_client = bigquery.Client(project=project)
  combined_preds_forecast_table_name = f'{project}.{dataset}.combined_preds_forecast'
  (
    bq_client.query(
      f"""
      CREATE {'OR REPLACE TABLE' if override else 'TABLE IF NOT EXISTS'} 
        `{combined_preds_forecast_table_name}`
      AS (SELECT
          table_a.date as date, 
          table_a.vertex__timeseries__id,
          ROUND(table_a.predicted_gross_quantity.value,2) as predicted_gross_quantity_a, 
          ROUND(table_b.predicted_gross_quantity.value, 2) as predicted_gross_quantity_b,
          ROUND((table_a.predicted_gross_quantity.value + table_b.predicted_gross_quantity.value)/2, 2) AS Final_Pred
          FROM
          `{model_1_pred_table_uri[5:]}` AS table_a
          INNER JOIN `{model_2_pred_table_uri[5:]}` AS table_b
              ON table_a.date = table_b.date
              and table_a.vertex__timeseries__id = table_b.vertex__timeseries__id
              );
          """
    )
    .result()
  )

  return (
    f'bq://{combined_preds_forecast_table_name}',
  )

## Compile and Run Train Pipeline

In [None]:
OVERRIDE = 'True' # replace BQ eval tables?
VERSION = 'm5_v8' # <--- TODO; Pipeline & model identifier;
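
`OVERRIDE` is passed to the pipeline as a string, so a component that consumes it has to parse it explicitly: any non-empty string, including `'False'`, is truthy in Python. A minimal sketch of such a parser follows; the helper name `parse_flag` and the accepted spellings are assumptions, not part of the pipeline:

```python
def parse_flag(value: str) -> bool:
    """Interpret a string pipeline parameter such as 'True' or 'False' as a boolean."""
    return value.strip().lower() in ("true", "1", "yes")


naive = bool("False")        # truthy, because the string is non-empty
parsed_false = parse_flag("False")
parsed_true = parse_flag("True")
print(naive, parsed_false, parsed_true)
```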

In [None]:
PIPELINE_TAG = 'train-m5-hier' # <--- TODO; optionally name pipeline
@kfp.v2.dsl.pipeline(
  name=f'{VERSION}-{PIPELINE_TAG}'.replace('_', '-')
)
def pipeline(
  vertex_project: str,
  location: str,
  version: str,
  data_source_dataset: str,
  eval_destination_dataset: str,
  preprocess_dataset_us: str,
  locations_table_uri: str,
  forecast_locations_table_uri: str,
  products_override: str,
  products_table_uri: str,
  forecast_products_table_uri: str,
  activities_override: str,
  activities_table_uri: str,
  forecast_activities_table_uri: str,
  # activities_expected_historical_last_date: str,
  forecast_plan_table_uri: str,
  time_granularity_unit: str,
  time_granularity_quantity: int,
  context_window: int,
  forecast_horizon: int,
  override: str,
  budget_milli_node_hours: int = 16000,
):

  create_dataset_op = create_dataset(
      project=vertex_project, 
      vertex_dataset=data_source_dataset, 
      eval_dataset=eval_destination_dataset, 
      bq_location=location
      )

  create_input_table_specs_op = create_input_table_specs(
    products_table_uri=products_table_uri,
    activities_table_uri=activities_table_uri,
    locations_table_uri=locations_table_uri,
    time_granularity_unit=time_granularity_unit,
    time_granularity_quantity=time_granularity_quantity,
  )
  create_input_table_specs_op.after(create_dataset_op)

  forecasting_validation_op = gcc_aip_forecasting.ForecastingValidationOp(
    input_tables=create_input_table_specs_op.outputs['input_table_specs'],
    validation_theme='FORECASTING_TRAINING',
  )

  forecasting_preprocessing_op = gcc_aip_forecasting.ForecastingPreprocessingOp(
    project=vertex_project,
    input_tables=create_input_table_specs_op.outputs['input_table_specs'],
    preprocessing_bigquery_dataset=data_source_dataset,
  )
  forecasting_preprocessing_op.after(forecasting_validation_op)
  
  prepare_data_for_train_op = gcc_aip_forecasting.ForecastingPrepareDataForTrainOp(
      input_tables=(
          create_input_table_specs_op.outputs['input_table_specs']
      ),
      preprocess_metadata=(
          forecasting_preprocessing_op.outputs['preprocess_metadata']
      ),
      model_feature_columns=(
          create_input_table_specs_op.outputs['model_feature_columns']
      ),
    )

  time_series_dataset_create_op = gcc_aip.TimeSeriesDatasetCreateOp(
    display_name='training_dataset_full_m5', 
    bq_source=prepare_data_for_train_op.outputs['preprocess_bq_uri'],
    project=vertex_project,
    location=location,
  )

  mape_model_version = f'{VERSION}-seq2seq-mape' # TODO: determines model display name and eval BQ table name # f'{VERSION}-l2l-mape'
  rmse_model_version = f'{VERSION}-seq2seq-rmse' # TODO: determines model display name and eval BQ table name

  get_eval_dataset_path_uri_op = get_eval_dataset_path_uri(
      project=vertex_project,
      eval_bq_dataset=eval_destination_dataset,
      model_1_table=mape_model_version,
      model_2_table=rmse_model_version,
  )

  mape_model_op = gcc_aip_forecasting.ForecastingTrainingWithExperimentsOp(
      display_name=f'train-{mape_model_version}',
      model_display_name=mape_model_version,
      model_labels={'model_override' : 'seq2seq-hier'}, # model_override : seq2seq-hier, tft
      # model_labels={'model_type' : 'l2l'},
      dataset=time_series_dataset_create_op.outputs['dataset'],
      context_window=context_window,
      forecast_horizon=forecast_horizon,
      budget_milli_node_hours=budget_milli_node_hours,
      project=vertex_project,
      location=location,
      export_evaluated_data_items=True,
      export_evaluated_data_items_bigquery_destination_uri=get_eval_dataset_path_uri_op.outputs['model_1_bigquery_table_uri'], # must be format:``bq://<project_id>:<dataset_id>:<table>``
      export_evaluated_data_items_override_destination=True,
      target_column=prepare_data_for_train_op.outputs['target_column'],
      time_column=prepare_data_for_train_op.outputs['time_column'],
      time_series_identifier_column=prepare_data_for_train_op.outputs['time_series_identifier_column'],
      time_series_attribute_columns=prepare_data_for_train_op.outputs['time_series_attribute_columns'],
      unavailable_at_forecast_columns=prepare_data_for_train_op.outputs['unavailable_at_forecast_columns'],
      available_at_forecast_columns=prepare_data_for_train_op.outputs['available_at_forecast_columns'],
      data_granularity_unit=prepare_data_for_train_op.outputs['data_granularity_unit'],
      data_granularity_count=prepare_data_for_train_op.outputs['data_granularity_count'],
      predefined_split_column_name= '', # prepare_data_for_train_op.outputs['predefined_split_column'],
      column_transformations=prepare_data_for_train_op.outputs['column_transformations'],
      weight_column=prepare_data_for_train_op.outputs['weight_column'],
      optimization_objective='minimize-mape',
      additional_experiments={
          'forecasting_model_type_override': 'seq2seq',
          'forecasting_hierarchical_group_column_names':'dept_id, cat_id'},
  )

  rmse_model_op = gcc_aip_forecasting.ForecastingTrainingWithExperimentsOp(
      display_name=f'train-{rmse_model_version}',
      model_display_name=rmse_model_version,
      model_labels={'model_override' : 'seq2seq-hier'}, # model_override : seq2seq-hier, tft
      # model_labels={'model_type' : 'l2l'},
      dataset=time_series_dataset_create_op.outputs['dataset'],
      context_window=context_window,
      forecast_horizon=forecast_horizon,
      budget_milli_node_hours=budget_milli_node_hours,
      project=vertex_project,
      location=location,
      export_evaluated_data_items=True,
      export_evaluated_data_items_bigquery_destination_uri=get_eval_dataset_path_uri_op.outputs['model_2_bigquery_table_uri'], # must be format: ``bq://<project_id>:<dataset_id>:<table>``
      export_evaluated_data_items_override_destination=True,
      target_column=prepare_data_for_train_op.outputs['target_column'],
      time_column=prepare_data_for_train_op.outputs['time_column'],
      time_series_identifier_column=prepare_data_for_train_op.outputs['time_series_identifier_column'],
      time_series_attribute_columns=prepare_data_for_train_op.outputs['time_series_attribute_columns'],
      unavailable_at_forecast_columns=prepare_data_for_train_op.outputs['unavailable_at_forecast_columns'],
      available_at_forecast_columns=prepare_data_for_train_op.outputs['available_at_forecast_columns'],
      data_granularity_unit=prepare_data_for_train_op.outputs['data_granularity_unit'],
      data_granularity_count=prepare_data_for_train_op.outputs['data_granularity_count'],
      predefined_split_column_name= '', # prepare_data_for_train_op.outputs['predefined_split_column'],
      column_transformations=prepare_data_for_train_op.outputs['column_transformations'],
      weight_column=prepare_data_for_train_op.outputs['weight_column'],
      optimization_objective='minimize-rmse',
      additional_experiments={
          'forecasting_model_type_override': 'seq2seq',
          'forecasting_hierarchical_group_column_names':'dept_id, cat_id'},
  )

  create_combined_preds_table_op = create_combined_preds_table(
      project=vertex_project,
      dataset=eval_destination_dataset,
      bq_location=location,
      model_1_eval_table_uri=get_eval_dataset_path_uri_op.outputs['model_1_bigquery_table_uri'],
      model_2_eval_table_uri=get_eval_dataset_path_uri_op.outputs['model_2_bigquery_table_uri'],
      model_1_path=mape_model_op.outputs['model'],
      model_2_path=rmse_model_op.outputs['model'],
      override=override,
  )

  create_final_preds_table_op = create_final_pred_table(
      project=vertex_project,
      dataset=eval_destination_dataset,
      bq_location=location,
      combined_preds_table_uri=create_combined_preds_table_op.outputs['combined_preds_table_uri'],
      override=override,
  )

  create_forecast_input_table_specs_op = create_forecast_input_table_specs(
      project=vertex_project,
      forecast_products_table_uri=forecast_products_table_uri,
      forecast_activities_table_uri=forecast_activities_table_uri,
      forecast_locations_table_uri=forecast_locations_table_uri,
      forecast_plan_table_uri=forecast_plan_table_uri,
      time_granularity_unit=time_granularity_unit,
      time_granularity_quantity=time_granularity_quantity,
  )

  forecast_validation_op = gcc_aip_forecasting.ForecastingValidationOp(
      input_tables=create_forecast_input_table_specs_op.outputs['forecast_input_table_specs'],
      validation_theme='FORECASTING_PREDICTION',
  )

  forecast_preprocess_op = gcc_aip_forecasting.ForecastingPreprocessingOp(
      project=vertex_project,
      input_tables=create_forecast_input_table_specs_op.outputs['forecast_input_table_specs'],
      preprocessing_bigquery_dataset=preprocess_dataset_us, # TODO: Table needs to be in 'US'
  )
  forecast_preprocess_op.after(forecast_validation_op)

  get_predict_table_path_op = get_predict_table_path(
      predict_processed_table=forecast_preprocess_op.outputs['preprocess_metadata'],
  )

  model_1_predict_job_op = model_1_predict_job_v2(
      project=vertex_project,
      location=location,
      eval_bq_dataset=eval_destination_dataset,
      bigquery_source=get_predict_table_path_op.outputs['preprocess_bq_uri'],
      model_1_path=mape_model_op.outputs['model'],
  )

  model_2_predict_job_op = model_2_predict_job_v2(
      project=vertex_project,
      location=location,
      eval_bq_dataset=eval_destination_dataset,
      bigquery_source=get_predict_table_path_op.outputs['preprocess_bq_uri'],
      model_2_path=rmse_model_op.outputs['model'],
  )

  create_combined_preds_forecast_table_op = create_combined_preds_forecast_table(
      project=vertex_project,
      dataset=eval_destination_dataset,
      model_1_pred_table_uri=model_1_predict_job_op.outputs['batch_predict_output_bq_uri'],
      model_2_pred_table_uri=model_2_predict_job_op.outputs['batch_predict_output_bq_uri'],
      override=override,
  )

In [None]:
import kfp.v2.compiler  # make sure the compiler submodule is loaded

kfp.v2.compiler.Compiler().compile(
  pipeline_func=pipeline, 
  package_path='custom_container_pipeline_spec.json',
)

Specify pipeline parameters

In [None]:
PROJECT_ID = 'sk-ai-ml-poc' # <--- TODO: If not set
LOCATION = 'us-central1' # <--- TODO: If not set
# SERVICE_ACCOUNT = 'jtotten-project-user@jtotten-project.iam.gserviceaccount.com', # <--- TODO: Change This if needed

# BQ dataset for source data
DATA_SOURCE_DATASET = 'm5_forecasting'

# BQ dataset for eval tables
EVAL_DESTINATION_DATASET = 'm5_eval_forecasting' # <--- TODO: Change this
PREPROCESS_DATASET_US = 'm5_preprocessing_us' # <--- TODO: create this in BQ region "US"; it can be reused across pipeline runs

# training BQ tables
PRODUCTS_TABLE = 'sk-ai-ml-poc.m5_forecasting.products'  #  {type: 'string'} <---TODO: CHANGE THIS
LOCATIONS_TABLE = 'sk-ai-ml-poc.m5_forecasting.locations'  #  {type: 'string'} <---TODO: CHANGE THIS
ACTIVITIES_TABLE = 'sk-ai-ml-poc.m5_forecasting.activity_all'  #  {type: 'string'} <---TODO: CHANGE THIS
# BATCH_PREDICT_TABLE = 'jtotten-project.m5_comp.test_split'  #  {type: 'string'} <---TODO: CHANGE THIS

assert PRODUCTS_TABLE, 'the value for this variable must be set'
assert LOCATIONS_TABLE, 'the value for this variable must be set'
assert ACTIVITIES_TABLE, 'the value for this variable must be set'
# assert BATCH_PREDICT_TABLE, 'the value for this variable must be set'

# forecast BQ tables
FORECAST_PRODUCTS_TABLE = 'sk-ai-ml-poc.m5_forecasting.products'
FORECAST_PLAN_TABLE = 'sk-ai-ml-poc.m5_forecasting.plan_table_all'
FORECAST_ACTIVITIES_TABLE = 'sk-ai-ml-poc.m5_forecasting.activity_all'
FORECAST_LOCATIONS_TABLE = 'sk-ai-ml-poc.m5_forecasting.locations'

assert FORECAST_PRODUCTS_TABLE, 'the value for this variable must be set'
assert FORECAST_PLAN_TABLE, 'the value for this variable must be set'
assert FORECAST_ACTIVITIES_TABLE, 'the value for this variable must be set'
assert FORECAST_LOCATIONS_TABLE, 'the value for this variable must be set'

# TODO: Forecasting Configuration:
HISTORY_WINDOW_n = 28 #  {type: 'integer'} # context_window
FORECAST_HORIZON = 28 #  {type: 'integer'} 
BUDGET_MILLI_NODE_HOURS = 10000

# Max date in the activities/sales table
# ACTIVITIES_EXPECTED_HISTORICAL_LAST_DATE = 'xxxx' #  {type: 'string'}
# PREDICTED_ON_DATETIME = 'xxx' # not used for POC, but likely needed for production

assert HISTORY_WINDOW_n, 'the value for this variable must be set'
assert FORECAST_HORIZON, 'the value for this variable must be set'
# assert ACTIVITIES_EXPECTED_HISTORICAL_LAST_DATE, 'the value for this variable must be set'
assert LOCATION, 'the value for this variable must be set'
assert PROJECT_ID, 'the value for this variable must be set'
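
The training budget is expressed in milli node hours, where a value of 1,000 corresponds to one node hour. The small conversion sketch below makes the configured budget easier to read; the helper name is hypothetical:

```python
def milli_node_hours_to_node_hours(milli_node_hours: int) -> float:
    """Convert a Vertex training budget from milli node hours to node hours."""
    return milli_node_hours / 1000


configured_budget = milli_node_hours_to_node_hours(10000)  # value set in the cell above
default_budget = milli_node_hours_to_node_hours(16000)     # pipeline default
print(configured_budget, default_budget)
```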

In [None]:
overwrite = True # True creates new pipeline instance for execution

Submit pipeline run

In [None]:
if not PIPELINES.get('train') or overwrite:
  response = pipeline_client.create_run_from_job_spec(
    job_spec_path='custom_container_pipeline_spec.json',
    # service_account=SERVICE_ACCOUNT, # <--- TODO: Uncomment if needed
    parameter_values={
      'vertex_project': PROJECT_ID,
      'location': LOCATION,
      'version': VERSION,
      'data_source_dataset': DATA_SOURCE_DATASET,
      'eval_destination_dataset': EVAL_DESTINATION_DATASET,
      'preprocess_dataset_us': PREPROCESS_DATASET_US,
      'products_override': 'False',
      'products_table_uri': f'bq://{PRODUCTS_TABLE}',
      'forecast_products_table_uri': f'bq://{FORECAST_PRODUCTS_TABLE}',
      'activities_override': 'False',
      'activities_table_uri': f'bq://{ACTIVITIES_TABLE}',
      'forecast_activities_table_uri': f'bq://{FORECAST_ACTIVITIES_TABLE}',
      # 'activities_expected_historical_last_date': ACTIVITIES_EXPECTED_HISTORICAL_LAST_DATE,
      # 'predicted_on_date': PREDICTED_ON_DATETIME,
      'forecast_plan_table_uri': f'bq://{FORECAST_PLAN_TABLE}',
      'locations_table_uri': f'bq://{LOCATIONS_TABLE}',
      'forecast_locations_table_uri': f'bq://{FORECAST_LOCATIONS_TABLE}',
      'time_granularity_unit': 'DAY',
      'time_granularity_quantity': 1,
      'context_window': HISTORY_WINDOW_n,
      'forecast_horizon': FORECAST_HORIZON,
      'override': OVERRIDE,
      'budget_milli_node_hours': BUDGET_MILLI_NODE_HOURS,
    },
    pipeline_root=f'{GS_PIPELINE_ROOT_PATH}/{VERSION}',
  )
  PIPELINES['train'] = response['name']
  # save_pipelines()