In [1]:
# GCP project ID and region for this notebook; the project is also made the
# gcloud CLI default by the shell command below.
PROJECT_ID = 'sk-ai-ml-poc'  # <--- TODO: CHANGE THIS
LOCATION = 'us-central1' 
# IPython substitutes {PROJECT_ID} with the Python value before running the shell command.
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [None]:
# New
! pip3 install -U google-cloud-storage $USER_FLAG
# ! pip3 install $USER kfp google-cloud-pipeline-components --upgrade
!git clone https://github.com/kubeflow/pipelines.git
!pip install pipelines/components/google-cloud/.
!pip install google-cloud-aiplatform

In [None]:
# Automatically restart kernel after installs
# (presumably so the freshly installed packages are importable in later cells).
import IPython
# Handle to the running IPython application, which owns the kernel object.
app = IPython.Application.instance()
# The positional True is the `restart` flag: shut the kernel down and start it again.
app.kernel.do_shutdown(True)


In [2]:
# Print the installed versions of the KFP SDK and of google_cloud_pipeline_components.
# Each check runs in its own `python3` subprocess rather than in the kernel.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.10
google_cloud_pipeline_components version: 0.2.3.dev


In [3]:
# GCP Project Configuration:
# project where pipeline and vertex jobs are executed

PROJECT_ID = 'sk-ai-ml-poc' # {type: 'string'} <--- TODO: CHANGE THIS
PROJECT_NUMBER = '555195908111' # {type: 'string'} <--- TODO: CHANGE THIS
LOCATION = 'us-central1' # {type: 'string'} 
BQ_LOCATION = 'us-central1' # {type: 'string'}

SCOPES = (
  'https://www.googleapis.com/auth/cloud-platform',
)

assert LOCATION, 'the value for this variable must be set'
assert PROJECT_ID, 'the value for this variable must be set'
assert PROJECT_NUMBER, 'the value for this variable must be set'
%env GOOGLE_CLOUD_PROJECT={PROJECT_ID}

env: GOOGLE_CLOUD_PROJECT=sk-ai-ml-poc


In [4]:
# Required pipeline parameters.
USER = 'skewalramani'  # {type: 'string'} <--- TODO: CHANGE THIS
BUCKET_NAME = 'sk-forecasting'  # {type: 'string'} <--- TODO: CHANGE THIS

# Per-user GCS prefix; a version sub-folder of it is later passed as the pipeline root.
GS_PIPELINE_ROOT_PATH = f'gs://{BUCKET_NAME}/pipeline_root/{USER}'

print(f'GS_PIPELINE_ROOT_PATH: {GS_PIPELINE_ROOT_PATH}')

GS_PIPELINE_ROOT_PATH: gs://sk-forecasting/pipeline_root/skewalramani


In [5]:
from datetime import datetime
import json
import os
import time
from typing import Any, Callable, Dict, NamedTuple, Optional
from IPython.display import clear_output

from google import auth
from google.api_core import exceptions as google_exceptions
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.experimental import forecasting as gcc_aip_forecasting
import google.cloud.aiplatform
from google.cloud import bigquery
from google.cloud import storage

#from google.colab import auth as colab_auth
#from google.colab import drive

import kfp
import kfp.v2.dsl
from kfp.v2.google import client as pipelines_client

from matplotlib import dates as mdates
from matplotlib import pyplot as plt

import pandas as pd
import seaborn as sns

from IPython.display import Image
from IPython.core.display import HTML 

In [6]:
# Report the version of the google-cloud-aiplatform SDK imported into this kernel.
print(f'aiplatform SDK version: {google.cloud.aiplatform.__version__}')

aiplatform SDK version: 1.8.1


In [24]:
# Obtain Application Default Credentials once, already restricted to SCOPES and
# with PROJECT_ID as the quota project. A separate unscoped `auth.default()`
# call is unnecessary: its result would be overwritten by this one.
credentials, _ = auth.default(scopes=SCOPES, quota_project_id=PROJECT_ID)

# BigQuery client bound to the project and using the scoped credentials.
bq_client = bigquery.Client(project=PROJECT_ID, credentials=credentials)

# Pipelines client used later to submit the compiled pipeline job spec.
# Note that it is not passed `credentials` explicitly.
pipeline_client = pipelines_client.AIPlatformClient(
  project_id=PROJECT_ID,
  region=LOCATION,
)

In [14]:
# Registry of submitted pipeline runs (e.g. PIPELINES['train'] = <run name>),
# persisted as a JSON document at PIPELINES_FILEPATH.
#
# The path is a Cloud Storage URI, which `os.path.isfile` / `open()` cannot
# handle (the registry would never load and saving would raise), so gs:// paths
# go through the google-cloud-storage client. Plain local paths still work.
PIPELINES_FILEPATH = 'gs://sk-forecasting/pipelines/pipelines.json'


def _gcs_blob(gs_uri):
  """Return the google-cloud-storage Blob handle for a `gs://bucket/object` URI."""
  bucket_name, _, object_name = gs_uri[len('gs://'):].partition('/')
  return storage.Client(project=PROJECT_ID).bucket(bucket_name).blob(object_name)


def load_pipelines():
  """Read the registry from PIPELINES_FILEPATH.

  Returns:
    The stored dict, or an empty dict when nothing has been saved yet.
  """
  if PIPELINES_FILEPATH.startswith('gs://'):
    blob = _gcs_blob(PIPELINES_FILEPATH)
    return json.loads(blob.download_as_text()) if blob.exists() else {}
  if os.path.isfile(PIPELINES_FILEPATH):
    with open(PIPELINES_FILEPATH) as f:
      return json.load(f)
  return {}


def save_pipelines():
  """Write the current PIPELINES registry to PIPELINES_FILEPATH as JSON."""
  payload = json.dumps(PIPELINES)
  if PIPELINES_FILEPATH.startswith('gs://'):
    _gcs_blob(PIPELINES_FILEPATH).upload_from_string(
      payload, content_type='application/json')
  else:
    with open(PIPELINES_FILEPATH, 'w') as f:
      f.write(payload)


PIPELINES = load_pipelines()

In [7]:
@kfp.v2.dsl.component(base_image='python:3.9')
def create_input_table_specs(
  sales_table_uri: str,
  time_granularity_unit: str,
  time_granularity_quantity: int,
) -> NamedTuple(
  'Output',
  [('input_table_specs', str), ('model_feature_columns', str)],
):
  """Build the JSON inputs consumed by the forecasting pipeline steps.

  Args:
    sales_table_uri: BigQuery URI of the sales table, stored under
      'bigquery_uri' of the primary table spec.
    time_granularity_unit: Unit of the time granularity.
    time_granularity_quantity: Number of units per time step.

  Returns:
    A tuple of two JSON strings: the list of input table specs (a single
    FORECASTING_PRIMARY table) and the list of model feature column names.
  """
  # Imported inside the function, as in the original component body.
  import json

  granularity = {
    'unit': time_granularity_unit,
    'quantity': time_granularity_quantity,
  }

  # Column roles of the primary table.
  # Left disabled on purpose: 'predefined_splits_column': 'ml_use'
  # (or 'predefined_split_column': 'ml_use' for model_override).
  primary_metadata = {
    'time_column': 'ds',
    'target_column': 'y',
    'time_series_identifier_columns': ['xsku', 'xstore'],
    'unavailable_at_forecast_columns': [],
    'time_granularity': granularity,
  }

  primary_table = {
    'bigquery_uri': sales_table_uri,
    'table_type': 'FORECASTING_PRIMARY',
    'forecasting_primary_table_metadata': primary_metadata,
  }

  feature_names = [
    'cal_name', 'cal_val', 'discount_amount', 'ds', 'oss_Days',
    'promo_type', 'wic', 'xclass', 'xcoupon_type', 'xdiscount_type',
    'xsku', 'xstore', 'xstore_type', 'xsubclass', 'y',
  ]

  specs_json = json.dumps([primary_table])  # input_table_specs
  features_json = json.dumps(feature_names)  # model_feature_columns
  return (specs_json, features_json)

In [16]:
# String flag that is passed as the pipeline's `override` parameter at submission.
OVERRIDE = 'True' # replace BQ eval tables?
# Identifier used in the pipeline name, the model display name and the
# pipeline-root sub-folder.
VERSION = 'sales_v1' # <--- TODO; Pipeline & model identifier;

In [17]:
PIPELINE_TAG = 'train-sales-ds' # <--- TODO; optionally name pipeline
@kfp.v2.dsl.pipeline(
  name=f'{VERSION}-{PIPELINE_TAG}'.replace('_', '-')
)
def pipeline(
  vertex_project: str,
  location: str,
  version: str,
  data_source_dataset: str,
  sales_table_uri: str,
  time_granularity_unit: str,
  time_granularity_quantity: int,
  context_window: int,
  forecast_horizon: int,
  override: str,
  budget_milli_node_hours: int = 16000,
):
  """Train a seq2seq forecasting model (RMSE objective) on the sales table.

  Steps: build the input table specs, validate them, preprocess the tables,
  prepare the training data, create a time-series dataset, then train the
  model.

  Args:
    vertex_project: Project passed to the preprocessing, dataset and training ops.
    location: Location passed to the dataset and training ops.
    version: Accepted for interface compatibility; currently not used in the
      body (model names are derived from the notebook-level VERSION constant
      when the pipeline is compiled).
    data_source_dataset: BigQuery dataset used by the preprocessing op.
    sales_table_uri: BigQuery URI of the primary sales table.
    time_granularity_unit: Time granularity unit of the sales table.
    time_granularity_quantity: Number of units per time step.
    context_window: Context window passed to the training op.
    forecast_horizon: Forecast horizon passed to the training op.
    override: Accepted for interface compatibility; currently not used in the body.
    budget_milli_node_hours: Training budget in milli node hours.
  """

  create_input_table_specs_op = create_input_table_specs(
    sales_table_uri=sales_table_uri,
    time_granularity_unit=time_granularity_unit,
    time_granularity_quantity=time_granularity_quantity
   )

  forecasting_validation_op = gcc_aip_forecasting.ForecastingValidationOp(
    input_tables=create_input_table_specs_op.outputs['input_table_specs'],
    validation_theme='FORECASTING_TRAINING',
  )

  forecasting_preprocessing_op = gcc_aip_forecasting.ForecastingPreprocessingOp(
    project=vertex_project,
    input_tables=create_input_table_specs_op.outputs['input_table_specs'],
    preprocessing_bigquery_dataset=data_source_dataset,
  )
  # There is no data dependency on the validation step, so order it explicitly.
  forecasting_preprocessing_op.after(forecasting_validation_op)

  prepare_data_for_train_op = gcc_aip_forecasting.ForecastingPrepareDataForTrainOp(
      input_tables=(
          create_input_table_specs_op.outputs['input_table_specs']
      ),
      preprocess_metadata=(
          forecasting_preprocessing_op.outputs['preprocess_metadata']
      ),
      model_feature_columns=(
          create_input_table_specs_op.outputs['model_feature_columns']
      ),
    )

  time_series_dataset_create_op = gcc_aip.TimeSeriesDatasetCreateOp(
    display_name='sales_training_dataset',
    bq_source=prepare_data_for_train_op.outputs['preprocess_bq_uri'],
    project=vertex_project,
    location=location,
  )

  # Determines the model display name. Uses the notebook-level VERSION, which is
  # evaluated when the pipeline is compiled, not the `version` runtime parameter.
  rmse_model_version = f'{VERSION}-seq2seq-rmse'

  rmse_model_op = gcc_aip_forecasting.ForecastingTrainingWithExperimentsOp(
      display_name=f'train-{rmse_model_version}',
      model_display_name=rmse_model_version,
      # NOTE(review): 'se2seq-hier' looks like a misspelling of 'seq2seq-hier';
      # left unchanged because label values are free-form -- confirm intent.
      model_labels={'model_override' : 'se2seq-hier'}, # model_override : se2seq-hier, tft
      dataset=time_series_dataset_create_op.outputs['dataset'],
      context_window=context_window,
      forecast_horizon=forecast_horizon,
      budget_milli_node_hours=budget_milli_node_hours,
      project=vertex_project,
      location=location,
      export_evaluated_data_items=True,
      # No explicit export_evaluated_data_items_bigquery_destination_uri is set;
      # if one is added it must have the format ``bq://<project_id>:<dataset_id>:<table>``.
      export_evaluated_data_items_override_destination=True,
      target_column=prepare_data_for_train_op.outputs['target_column'],
      time_column=prepare_data_for_train_op.outputs['time_column'],
      time_series_identifier_column=prepare_data_for_train_op.outputs['time_series_identifier_column'],
      time_series_attribute_columns=prepare_data_for_train_op.outputs['time_series_attribute_columns'],
      unavailable_at_forecast_columns=prepare_data_for_train_op.outputs['unavailable_at_forecast_columns'],
      available_at_forecast_columns=prepare_data_for_train_op.outputs['available_at_forecast_columns'],
      data_granularity_unit=prepare_data_for_train_op.outputs['data_granularity_unit'],
      data_granularity_count=prepare_data_for_train_op.outputs['data_granularity_count'],
      # Empty: no predefined split column is configured in the input table specs.
      predefined_split_column_name= '', # prepare_data_for_train_op.outputs['predefined_split_column'],
      column_transformations=prepare_data_for_train_op.outputs['column_transformations'],
      weight_column=prepare_data_for_train_op.outputs['weight_column'],
      optimization_objective='minimize-rmse',
      additional_experiments={
          'forecasting_model_type_override': 'seq2seq',
          # Must name real columns of the training data: the series identifier
          # columns are 'xsku' and 'xstore' ('xksu' does not exist).
          'forecasting_hierarchical_group_column_names':'xsku, xstore'},
  )



In [18]:
# Compile the `pipeline` function into a job spec file in the working directory;
# the submission cell later passes this same file name as `job_spec_path`.
kfp.v2.compiler.Compiler().compile(
  pipeline_func=pipeline, 
  package_path='sales_fsct_pipeline_spec.json',
)

In [19]:
# Project / region settings, repeated here in case the earlier cells were not run.
PROJECT_ID = 'sk-ai-ml-poc'  # <--- TODO: If not set
LOCATION = 'us-central1'  # <--- TODO: If not set
location = 'us-central1'  # <--- TODO: If not set
bq_location = 'us-central1'  # <--- TODO: If not set

# BigQuery dataset holding the source data.
DATA_SOURCE_DATASET = 'h_data_1'

# BigQuery table used for training.
SALES_TABLE = 'sk-ai-ml-poc.h_data_1.train_data_1'  # {type: 'string'} <---TODO: CHANGE THIS

# TODO: Forecasting configuration.
HISTORY_WINDOW_n = 26  # {type: 'integer'} passed to the pipeline as context_window
FORECAST_HORIZON = 26  # {type: 'integer'}
BUDGET_MILLI_NODE_HOURS = 1000

# Fail fast if any required setting is empty or zero.
assert all((HISTORY_WINDOW_n, FORECAST_HORIZON, LOCATION, PROJECT_ID)), 'the value for this variable must be set'

In [20]:
# When True, the submission cell creates a new run even if PIPELINES already
# holds an entry under 'train'.
overwrite = True # True creates new pipeline instance for execution

In [23]:
# Submit the compiled training pipeline unless a 'train' run is already
# recorded and `overwrite` is switched off.
if overwrite or not PIPELINES.get('train'):
  response = pipeline_client.create_run_from_job_spec(
    job_spec_path='sales_fsct_pipeline_spec.json',
    # service_account=SERVICE_ACCOUNT, # <--- TODO: Uncomment if needed
    pipeline_root=f'{GS_PIPELINE_ROOT_PATH}/{VERSION}',
    parameter_values=dict(
      vertex_project=PROJECT_ID,
      location=LOCATION,
      version=VERSION,
      data_source_dataset=DATA_SOURCE_DATASET,
      sales_table_uri=f'bq://{SALES_TABLE}',
      time_granularity_unit='WEEK',
      time_granularity_quantity=1,
      context_window=HISTORY_WINDOW_n,
      forecast_horizon=FORECAST_HORIZON,
      override=OVERRIDE,
      budget_milli_node_hours=BUDGET_MILLI_NODE_HOURS,
    ),
  )
  # Remember the run's name; persisting the registry is currently disabled.
  PIPELINES['train'] = response['name']
  # save_pipelines()