In [None]:
!pip3 install --upgrade google-cloud-core==1.7.1 kfp==1.8.2 google-cloud-aiplatform==1.4.3 google-cloud-storage==1.40.0

In [None]:
# Restart the kernel right after the installs above so that later cells import
# the freshly installed package versions instead of any already-loaded ones.
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

In [None]:

!pip install -U google-cloud-pipeline-components

In [None]:
# Report the KFP version seen by this kernel; a `!python3` subprocess may resolve
# to a different interpreter than the one the notebook actually runs in.
import kfp
print('KFP version: {}'.format(kfp.__version__))

## Set configuration variables

In [None]:
# Capture the current PATH into a Python variable via the IPython `%env` magic,
# then append the Jupyter user's local bin directory to it.
# NOTE(review): presumably this is where `pip install --user` places console
# scripts on this image — confirm for your environment.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Required Parameters
USER = '<username>'  # @param {type: 'string'} <---CHANGE THIS
PROJECT_ID = '<project_id>'  # @param {type: 'string'} <---CHANGE THIS
LOCATION = 'us-central1'

# Cloud Storage bucket for pipeline artifacts.
# BUCKET_NAME is the bare bucket name (no scheme); BUCKET_URI adds the gs:// scheme.
# Keeping the scheme out of BUCKET_NAME avoids a doubled "gs://gs://" prefix.
BUCKET_NAME = PROJECT_ID + "-bucket"
BUCKET_URI = f"gs://{BUCKET_NAME}"

# Root path under which pipeline runs write their artifacts, namespaced per user.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/{USER}"

# Optional: run the pipeline as a specific service account (also uncomment the
# `service_account=` argument where the pipeline job is submitted).
#SERVICE_ACCOUNT = '<service_account>'  # @param {type: 'string'} <---CHANGE THIS

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

## Define a KFP pipeline that uses the forecasting components


In [None]:
import time
from kfp.v2 import components
from kfp.v2 import dsl
from kfp.v2 import compiler
from typing import NamedTuple

### Define a Python function-based component

`get_predict_table_path` extracts the processed BigQuery table URI from the metadata output of `ForecastingPreprocessingOp`, so that it can be passed to `ModelBatchPredictOp` as its BigQuery input.

In [None]:
@dsl.component(base_image='python:3.9')
def get_predict_table_path(
  predict_processed_table: dict
) -> NamedTuple('Outputs', [('preprocess_bq_uri', str)]):
  """Pull the processed BigQuery table URI out of preprocessing metadata.

  Args:
    predict_processed_table: metadata dict produced by the prediction-side
      ForecastingPreprocessingOp; must contain 'processed_bigquery_table_uri'.

  Returns:
    A one-element tuple whose only field is exposed as the
    'preprocess_bq_uri' component output.
  """
  processed_uri = predict_processed_table['processed_bigquery_table_uri']
  return (processed_uri,)

### Define a pipeline:

In [None]:
from google_cloud_pipeline_components.experimental import forecasting
from google_cloud_pipeline_components import aiplatform as gcc_aip


# The pipeline name embeds USER and the Unix timestamp taken when this cell runs
# (the decorator argument is evaluated at definition time), so every re-run of
# this cell yields a different pipeline name.
@dsl.pipeline(name='forecasting-pipeline-{}-{}'.format(USER, str(int(time.time()))))
def pipeline(project: str,
             location: str,
             training_input_table_specs: list,
             prediction_input_table_specs: list = [],
             preprocessing_bigquery_dataset: str = '',
             batch_predict_bigquery_dataset: str = '',
             model_feature_columns: list = [],
             optimization_objective: str = 'minimize-rmse',
             forecast_horizon: int = 1,
             budget_milli_node_hours: int = 1000,
             ):
  """Train an AutoML forecasting model and run batch prediction with it.

  Two branches are wired into one pipeline:
    * training: validate -> preprocess -> prepare training metadata ->
      create a time-series dataset -> AutoML forecasting training job;
    * prediction: validate -> preprocess -> extract the processed BigQuery
      table URI -> batch prediction with the trained model.
  Batch prediction consumes the trained model output, so it runs after training.

  Args:
    project: project ID passed to the preprocessing and Vertex AI ops.
    location: region passed to dataset creation, training and batch prediction.
    training_input_table_specs: table spec dicts for the training branch.
    prediction_input_table_specs: table spec dicts for the prediction branch.
    preprocessing_bigquery_dataset: BigQuery dataset passed to both
      preprocessing ops for their output tables.
    batch_predict_bigquery_dataset: BigQuery dataset (in `project`) that
      receives the batch prediction output.
    model_feature_columns: forwarded only to ForecastingPrepareDataForTrainOp.
    optimization_objective: objective for the AutoML forecasting training job.
    forecast_horizon: forecast horizon for the training job.
    budget_milli_node_hours: training budget in milli node hours.
  """
  # NOTE(review): the `[]` defaults in the signature are mutable defaults; they
  # are never mutated in this body and appear to act only as pipeline-parameter
  # defaults — confirm KFP does not mutate them either.

  # training workflow:
  training_validation = forecasting.ForecastingValidationOp(
      input_tables=training_input_table_specs,
      validation_theme='FORECASTING_TRAINING')
  training_preprocess = forecasting.ForecastingPreprocessingOp(
      project=project,
      input_tables=training_input_table_specs,
      preprocessing_bigquery_dataset=preprocessing_bigquery_dataset)
  # Preprocessing consumes no validation output, so the ordering dependency
  # on validation must be declared explicitly.
  training_preprocess.after(training_validation)
  # Produces the column roles, granularity, transformations and BigQuery source
  # consumed by the dataset-creation and training ops below.
  prepare_data_for_train_op = forecasting.ForecastingPrepareDataForTrainOp(
    input_tables=training_input_table_specs,
    preprocess_metadata=training_preprocess.outputs['preprocess_metadata'],
    model_feature_columns=model_feature_columns,
  )
  dataset_create_op = gcc_aip.TimeSeriesDatasetCreateOp(
    display_name='training_dataset',
    bq_source=prepare_data_for_train_op.outputs['preprocess_bq_uri'],
    project=project,
    location=location,
  )

  # Column-role and data-granularity arguments are wired from
  # prepare_data_for_train_op outputs; the dataset comes from dataset_create_op.
  train_op = gcc_aip.AutoMLForecastingTrainingJobRunOp(
      display_name='train-forecasting-model',
      time_series_identifier_column=(
        prepare_data_for_train_op.outputs['time_series_identifier_column']
      ),
      time_series_attribute_columns=(
        prepare_data_for_train_op.outputs['time_series_attribute_columns']
      ),
      available_at_forecast_columns=(
        prepare_data_for_train_op.outputs['available_at_forecast_columns']
      ),
      unavailable_at_forecast_columns=(
        prepare_data_for_train_op.outputs['unavailable_at_forecast_columns']
      ),
      column_transformations=(
        prepare_data_for_train_op.outputs['column_transformations']
      ),
      dataset=dataset_create_op.outputs['dataset'],
      target_column=prepare_data_for_train_op.outputs['target_column'],
      time_column=prepare_data_for_train_op.outputs['time_column'],
      forecast_horizon=forecast_horizon,
      data_granularity_unit=(
          prepare_data_for_train_op.outputs['data_granularity_unit']
      ),
      data_granularity_count=(
          prepare_data_for_train_op.outputs['data_granularity_count']
      ),
      budget_milli_node_hours=budget_milli_node_hours,
      project=project,
      location=location,
      optimization_objective=optimization_objective,
  )


  # prediction workflow:
  prediction_validation = forecasting.ForecastingValidationOp(
      input_tables=prediction_input_table_specs,
      validation_theme='FORECASTING_PREDICTION')
  prediction_preprocess = forecasting.ForecastingPreprocessingOp(
      project=project,
      input_tables=prediction_input_table_specs,
      preprocessing_bigquery_dataset=preprocessing_bigquery_dataset)
  # As in training: explicit ordering, since no data flows from validation.
  prediction_preprocess.after(prediction_validation)
  # Convert the preprocessing metadata dict into the plain table URI that
  # batch prediction expects as its BigQuery source.
  predict_table_path_op = get_predict_table_path(
      prediction_preprocess.outputs['preprocess_metadata'])
  model_batch_predict_op = gcc_aip.ModelBatchPredictOp(
      # Consuming the trained model makes batch prediction wait for training.
      model=train_op.outputs['model'],
      job_display_name='prediction',
      bigquery_source_input_uri=predict_table_path_op.outputs['preprocess_bq_uri'],
      instances_format='bigquery',
      predictions_format='bigquery',
      project=project,
      location=location,
      # NOTE(review): `project` and `batch_predict_bigquery_dataset` are pipeline
      # parameters here, so this f-string relies on KFP substituting their
      # placeholders at runtime — confirm with the pinned KFP version.
      bigquery_destination_output_uri=f'bq://{project}.{batch_predict_bigquery_dataset}',
    )

### Compile the pipeline:

In [None]:
# Compile the pipeline function into a JSON pipeline spec; the submission cell
# reads this same file path.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='forecasting_pipeline_spec.json',
)

In [None]:
# Primary table is required by training and prediction workflow
primary_table_uri = 'bq://<project-id>.iowa_liquor_sales_dataset.sales_table'  # @param {type: 'string'} <---CHANGE THIS
primary_table_specs = {  # <---CHANGE THIS
    'bigquery_uri': primary_table_uri,
    'table_type': 'FORECASTING_PRIMARY',
    'forecasting_primary_table_metadata': {
        'time_column': 'datetime',
        'target_column': 'gross_quantity',
        'time_series_identifier_columns': ['product_id', 'location_id'],
        'unavailable_at_forecast_columns': ['sale_dollars', 'state_bottle_cost', 'state_bottle_retail', 'pack', 'bottle_volume_ml', 'volume_sold_liters', 'volume_sold_gallons'],
        'time_granularity': {'unit': 'DAY', 'quantity': 1 },
        'predefined_splits_column': 'ml_use'
    }
}

# Attribute tables are optional
attribute_table_specs_list = []  # <---CHANGE THIS
attribute_table_uri1 = 'bq://<project-id>.iowa_liquor_sales_dataset.product_table'  # @param {type: 'string'} <---CHANGE THIS
attribute_table_specs_list.append({
    'bigquery_uri': attribute_table_uri1,
    'table_type': 'FORECASTING_ATTRIBUTE',
    'forecasting_attribute_table_metadata': {
        'primary_key_column': 'product_id'
    }
})

attribute_table_uri2 = 'bq://<project-id>.iowa_liquor_sales_dataset.location_table'  # @param {type: 'string'} <---CHANGE THIS
attribute_table_specs_list.append({
    'bigquery_uri': attribute_table_uri2,
    'table_type': 'FORECASTING_ATTRIBUTE',
    'forecasting_attribute_table_metadata': {
        'primary_key_column': 'location_id'
    }
})

# Plan table is required by prediction workflow
plan_table_uri = 'bq://<project-id>.iowa_liquor_sales_dataset.plan_table'  # @param {type: 'string'} <---CHANGE THIS
plan_table_specs = {  # <---CHANGE THIS
    'bigquery_uri': plan_table_uri,
    'table_type': 'FORECASTING_PLAN'
}

# Optional BigQuery dataset for saving the preprocessing output tables.
# If empty, a new dataset will be created by the preprocessing component.
PREPROCESS_OUTPUT_DATASET = 'iowa_liquor_sales_output'  # @param {type: 'string'} <---CHANGE THIS

# Optional Bigquery dataset for saving the batch predcition output table.
BATCH_PREDICT_OUTPUT_DATASET = 'iowa_liquor_sales_output'  # @param {type: 'string'} <---CHANGE THIS

TRAINING_INPUT = [primary_table_specs]
TRAINING_INPUT.extend(attribute_table_specs_list)

PREDICTION_INPUT = TRAINING_INPUT.copy()
PREDICTION_INPUT.append(plan_table_specs)

FORECAST_HORIZON = 7  # @param {type: 'int'} <---CHANGE THIS
TRAINING_BUDGET_MILLI_NODE_HOURS = 1000  # @param {type: 'int'} <---CHANGE THIS

### Submit the pipeline job
Here, we create an `AIPlatformClient` for the project and region configured earlier in this notebook (no API key is used).

Then, we submit the pipeline job by passing the compiled spec to the `create_run_from_job_spec()` method.



In [None]:
from kfp.v2.google.client import AIPlatformClient


api_client = AIPlatformClient(project_id=PROJECT_ID, region=LOCATION)

# Runtime values for the pipeline parameters. Parameters not listed here
# (e.g. model_feature_columns) fall back to the defaults in the pipeline definition.
parameter_values = dict(
    project=PROJECT_ID,
    location=LOCATION,
    training_input_table_specs=TRAINING_INPUT,
    prediction_input_table_specs=PREDICTION_INPUT,
    preprocessing_bigquery_dataset=PREPROCESS_OUTPUT_DATASET,
    batch_predict_bigquery_dataset=BATCH_PREDICT_OUTPUT_DATASET,
    forecast_horizon=FORECAST_HORIZON,
    budget_milli_node_hours=TRAINING_BUDGET_MILLI_NODE_HOURS,
)

# Submit the compiled spec; artifacts are written under PIPELINE_ROOT.
response = api_client.create_run_from_job_spec(
    job_spec_path='forecasting_pipeline_spec.json',
    pipeline_root=PIPELINE_ROOT,
    parameter_values=parameter_values,
    # service_account=SERVICE_ACCOUNT,  # uncomment to run as a specific service account
)