# Time series forecasting with Facebook Prophet & Vertex AI Managed Pipelines

This notebook demonstrates how to train and deploy a Facebook Prophet time-series forecasting model with Vertex AI Managed Pipelines, using components provided by Google.

Models will be created to predict the sales of various types of whisky using the public Iowa Liquor Sales Dataset.

## Configuration
Configure the following values before proceeding with this notebook:

In [None]:
# A directory in GCS to save artifacts from pipeline components
PIPELINE_ROOT = 'gs://...' # @param {type:'string'}
# A GCP project to run the pipeline in
GCP_PROJECT_ID = '' #@param {type:'string'}
# A friendly name for the forecasting model(s) created in this pipeline 
VERTEX_MODEL_DISPLAY_NAME = 'iowa-whisky-sales-forecasting' #@param {type:'string'}
# A directory in GCS to save the results of Vertex batch prediction
BATCH_PREDICT_GCS_PREFIX = 'gs://.../prophet_batch_prediction/' #@param {type:'string'}
# A directory in GCS to store model artifacts for Vertex batch prediction
GCS_MODEL_ARTIFACT_DIR = 'gs://.../prophet_models/' #@param {type:'string'}
# The path to a file in GCS that will be created by the pipeline to store the batch request payload
BATCH_PREDICT_GCS_SOURCE = 'gs://.../batch-request.jsonl' #@param {type:'string'}
# A friendly name for the batch request job
BATCH_PREDICT_DISPLAY_NAME = 'batch-prediction' #@param {type:'string'}
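
Every later cell depends on these values, so it can help to check them before compiling a pipeline. The following is a minimal sketch and not part of the original notebook: `find_config_problems` is a hypothetical helper, the bucket name `my-bucket` is made up, and the rules it applies (GCS locations start with `gs://` and no longer contain the `...` placeholder) are assumptions rather than Vertex AI requirements.

```python
def find_config_problems(gcs_paths: dict, project_id: str) -> list:
    """Return human-readable problems found in the configuration values."""
    problems = []
    if not project_id:
        problems.append('GCP_PROJECT_ID is empty')
    for name, uri in gcs_paths.items():
        if not uri.startswith('gs://'):
            problems.append('{} must start with gs://'.format(name))
        elif '...' in uri:
            problems.append('{} still contains the "..." placeholder'.format(name))
    return problems


# Illustrative values only: one unedited placeholder and one filled-in path
example_problems = find_config_problems(
    {
        'PIPELINE_ROOT': 'gs://...',
        'GCS_MODEL_ARTIFACT_DIR': 'gs://my-bucket/prophet_models/',
    },
    project_id='')
print(example_problems)
```

An empty list means none of the assumed checks failed.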

Install and import the prerequisite Python packages. **The notebook runtime must be restarted after installing packages**: under the 'Runtime' menu above, click 'Restart runtime'.

In [None]:
%%capture
!pip install google-cloud-core==1.7.1 kfp==1.6.4 google-cloud-aiplatform==1.1.1 google-cloud-storage==1.40.0 google_cloud_pipeline_components==0.1.3
import json
from typing import List

from google_cloud_pipeline_components.aiplatform import ModelBatchPredictOp
from google_cloud_pipeline_components.aiplatform import ModelUploadOp as VertexModelImportOp
#from google_cloud_pipeline_components.google.facebook_prophet import FitProphetModelOp, ProphetPredictOp
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.components import InputPath, OutputPath
from kfp.v2.dsl import component
from kfp.v2.dsl import ParallelFor
from kfp.v2.google.client import AIPlatformClient

Authenticate and configure the Google Cloud SDK

In [None]:
!gcloud config set project {GCP_PROJECT_ID}

from google.colab import auth
auth.authenticate_user()

In [None]:
# TODO: Remove this block when the components have been published.
# Replace with python imports
from kfp.v2.components import load_component_from_text
# Dataset is required by the dsl.importer calls in the wrappers below
from kfp.v2.dsl import Dataset

_FitProphetModelOp = load_component_from_text("""
name: Facebook Prophet Model Fit
description: Fit a time-series forecasting model with Facebook Prophet

inputs:
  - {name: Data Source, type: Dataset}
  - {name: Features, type: List, optional: True}
  - {name: Countries, type: List, optional: True}
  - {name: N Changepoints, type: Integer, optional: True}
  - {name: Changepoint Range, type: Float, optional: True}
  - {name: Yearly Seasonality, type: String, optional: True}
  - {name: Weekly Seasonality, type: String, optional: True}
  - {name: Daily Seasonality, type: String, optional: True}
  - {name: Seasonality Mode, type: String, optional: True}
  - {name: Seasonality Prior Scale, type: Float, optional: True}
  - {name: Holidays Prior Scale, type: Float, optional: True}
  - {name: Changepoint Prior Scale, type: Float, optional: True}
  - {name: Mcmc Samples, type: Integer, optional: True}
  - {name: Interval Width, type: Float, optional: True}
  - {name: Uncertainty Samples, type: Integer, optional: True}
  - {name: Stan Backend, type: String, optional: True}
  - {name: Cross Validation Horizon, type: String, optional: True}
outputs:
  - {name: Model, type: Model}
  - {name: Cross Validation Results, type: Dataset}

implementation:
  container:
    image: us-docker.pkg.dev/vertex-ai/prophet/prophet
    command: [
      python3,
      /pipelines/component/src/fit_model.py
      ]
    args:
      - '--data_source'
      - {inputPath: Data Source}
      - if:
          cond:
            isPresent: Features
          then:
            - '--features'
            - {inputValue: Features}
      - if:
          cond:
            isPresent: Countries
          then:
            - '--countries'
            - {inputValue: Countries}
      - if:
          cond:
            isPresent: N Changepoints
          then:
            - '--n_changepoints'
            - {inputValue: N Changepoints}
      - if:
          cond:
            isPresent: Changepoint Range
          then:
            - '--changepoint_range'
            - {inputValue: Changepoint Range}
      - if:
          cond:
            isPresent: Yearly Seasonality
          then:
            - '--yearly_seasonality'
            - {inputValue: Yearly Seasonality}
      - if:
          cond:
            isPresent: Weekly Seasonality
          then:
            - '--weekly_seasonality'
            - {inputValue: Weekly Seasonality}
      - if:
          cond:
            isPresent: Daily Seasonality
          then:
            - '--daily_seasonality'
            - {inputValue: Daily Seasonality}
      - if:
          cond:
            isPresent: Seasonality Mode
          then:
            - '--seasonality_mode'
            - {inputValue: Seasonality Mode}
      - if:
          cond:
            isPresent: Seasonality Prior Scale
          then:
            - '--seasonality_prior_scale'
            - {inputValue: Seasonality Prior Scale}
      - if:
          cond:
            isPresent: Holidays Prior Scale
          then:
            - '--holidays_prior_scale'
            - {inputValue: Holidays Prior Scale}
      - if:
          cond:
            isPresent: Changepoint Prior Scale
          then:
            - '--changepoint_prior_scale'
            - {inputValue: Changepoint Prior Scale}
      - if:
          cond:
            isPresent: Mcmc Samples
          then:
            - '--mcmc_samples'
            - {inputValue: Mcmc Samples}
      - if:
          cond:
            isPresent: Interval Width
          then:
            - '--interval_width'
            - {inputValue: Interval Width}
      - if:
          cond:
            isPresent: Uncertainty Samples
          then:
            - '--uncertainty_samples'
            - {inputValue: Uncertainty Samples}
      - if:
          cond:
            isPresent: Stan Backend
          then:
            - '--stan_backend'
            - {inputValue: Stan Backend}
      - if:
          cond:
            isPresent: Cross Validation Horizon
          then:
            - '--cross_validation_horizon'
            - {inputValue: Cross Validation Horizon}
      - '--model'
      - {outputPath: Model}
      - '--cross_validation_results'
      - {outputPath: Cross Validation Results}
""")

_ProphetPredictOp = load_component_from_text("""
name: Facebook Prophet Forecast
description: Perform a time-series forecast using a fitted Facebook Prophet Model

inputs:
  - {name: Model, type: Model}
  - {name: Future Data Source, type: Artifact, optional: True}
  - {name: Periods, type: Integer, optional: True}
outputs:
  - {name: Prediction, type: Dataset}

implementation:
  container:
    image: us-docker.pkg.dev/vertex-ai/prophet/prophet
    command: [
      python3,
      /pipelines/component/src/prediction.py
      ]
    args:
      - '--model'
      - {inputPath: Model}
      - if:
          cond:
            isPresent: Future Data Source
          then:
            - '--future_data_source'
            - {inputPath: Future Data Source}
      - if:
          cond:
            isPresent: Periods
          then:
            - '--periods'
            - {inputValue: Periods}
      - '--prediction'
      - {outputPath: Prediction}
""")

# Convert compile-time list arguments to JSON strings
def SerializeListArgs(args: tuple, kwargs: dict):
  processed_args = [json.dumps(arg) if type(arg) is list else arg for arg in args]
  for key, value in kwargs.items():
    if type(value) is list:
      kwargs[key] = json.dumps(value)
  return tuple(processed_args), kwargs


def FitProphetModelOp(data_source, *args, **kwargs):
  args, kwargs = SerializeListArgs(args, kwargs)
  if type(data_source) is str:
    data_source = dsl.importer(artifact_uri=data_source, artifact_class=Dataset).output
  return _FitProphetModelOp(data_source, *args, **kwargs)

def ProphetPredictOp(*args, **kwargs):
  args, kwargs = SerializeListArgs(args, kwargs)
  if 'future_data_source' in kwargs and type(kwargs['future_data_source']) is str:
    kwargs['future_data_source'] = dsl.importer(artifact_uri=kwargs['future_data_source'], artifact_class=Dataset).output
  return _ProphetPredictOp(*args, **kwargs)
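
The wrappers above convert Python lists to JSON strings before handing them to the underlying components. The sketch below restates that conversion in a standalone form so it can be run in isolation; the function name `serialize_list_args` and the argument values (`'US'`, `'CA'`, `'price'`) are illustrative assumptions, not values from this notebook.

```python
import json


def serialize_list_args(args: tuple, kwargs: dict):
    """Convert list arguments to JSON strings and leave all other values unchanged."""
    processed_args = tuple(
        json.dumps(arg) if isinstance(arg, list) else arg for arg in args)
    processed_kwargs = {
        key: json.dumps(value) if isinstance(value, list) else value
        for key, value in kwargs.items()}
    return processed_args, processed_kwargs


# Illustrative call: one positional list, one keyword list, one plain integer
demo_args, demo_kwargs = serialize_list_args(
    (['US', 'CA'],), {'features': ['price'], 'periods': 365})
print(demo_args)
print(demo_kwargs)
```

Only values that are lists are serialized; integers such as `periods` pass through untouched.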


Helper function to compile and run a pipeline on Vertex

In [None]:
# Helper function to compile and run a pipeline on Vertex managed pipelines
def run_pipeline(pipeline,
                 enable_caching=True):
  compiler.Compiler().compile(
      pipeline_func=pipeline,
      package_path='{}.json'.format(pipeline.__name__))

  api_client = AIPlatformClient(
      project_id=GCP_PROJECT_ID,
      region='us-central1',
  )

  return api_client.create_run_from_job_spec(
      job_spec_path='{}.json'.format(pipeline.__name__),
      pipeline_root=PIPELINE_ROOT,
      enable_caching=enable_caching)

# Pipeline Components

In [None]:
@component(packages_to_install=['google-cloud-bigquery', 'pandas', 'pyarrow'])
def download_data_op(products: List[str], project: str, output: OutputPath('CSV')):
  """Download the historical data from BigQuery."""
  from google.cloud import bigquery
  # Sum the number of bottles sold of each product for each day from Jan. 1st
  # 2012 to Jan. 1st 2017
  query = """
  SELECT date, LOWER(category_name) as category_name,
  SUM(bottles_sold) as bottles_sold
  FROM `bigquery-public-data.iowa_liquor_sales.sales`
  WHERE LOWER(category_name) IN ( "{}" )
  AND date BETWEEN '2012-01-01' AND '2017-01-01'
  GROUP BY date, category_name
  ORDER BY date""".format('", "'.join(products))
  bigquery.Client(project=project).query(query).to_dataframe().to_csv(output)


@component(packages_to_install=['pandas'])
def rename_and_filter_op(data: InputPath('CSV'), output: OutputPath()):
  """Rename the timestamp and target columns to 'ds' and 'y' respectively.
  Additionally, drop columns not needed for analysis.
  """
  import pandas as pd
  pd.read_csv(data).rename(columns={
      'date': 'ds',
      'bottles_sold': 'y',
      'category_name': 'product'
  })[['ds', 'y', 'product']].to_csv(output)


@component(packages_to_install=['pandas'])
def select_by_product_op(
    data: InputPath('CSV'), product: str, output: OutputPath()):
  """Drop rows that don't represent the sale of the specified product.
  Additionally, drop the column that identifies the product."""
  import pandas as pd
  df = pd.read_csv(data)
  df.loc[df['product'].str.lower() == product].drop(
      columns=['product']).to_csv(output)

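@component(base_image='google/cloud-sdk')
def upload_model_to_gcs_op(model_file: InputPath(), model_artifact_dir: str,
                           model_display_name: str):
  """Copy a fitted model file into the model artifact directory in GCS."""
  import subprocess
  # Remove the trailing slash on the model artifact dir if there is one
  if model_artifact_dir.endswith('/'):
    model_artifact_dir = model_artifact_dir[:-1]
  # model_artifact_dir is already a full gs:// URI, so the scheme is not
  # prepended again
  dest_uri = '{}/{}.model.json'.format(model_artifact_dir, model_display_name)
  # Pass the arguments as a list so names containing spaces are not split by
  # a shell; check=True fails the step if the copy fails
  subprocess.run(['gsutil', 'cp', model_file, dest_uri], check=True)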

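@component(base_image='google/cloud-sdk')
def create_batch_request_file_op(categories: List[str], periods: int,
                                 gcs_dest: str):
  """Write one JSON request per product (JSON Lines) and copy the file to GCS."""
  import json
  import subprocess
  import tempfile
  instances = [
      json.dumps({'time_series': product, 'periods': periods})
      for product in categories]
  with tempfile.NamedTemporaryFile('w') as f:
    # One instance per line; writelines() on its own does not add newlines
    f.write('\n'.join(instances) + '\n')
    # Flush so that gsutil reads the complete file before it is deleted
    f.flush()
    subprocess.run(['gsutil', 'cp', f.name, gcs_dest], check=True)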
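
The batch request file holds one instance per time series, with the `time_series` and `periods` keys used above. As a minimal sketch, assuming the file is meant to be JSON Lines (one JSON document per line), the payload for two categories can be built and inspected locally without touching GCS. The helper name `build_batch_request_lines` is hypothetical.

```python
import json


def build_batch_request_lines(categories, periods):
    """Return the request payload as JSON Lines text, one instance per line."""
    lines = [
        json.dumps({'time_series': category, 'periods': periods})
        for category in categories]
    return '\n'.join(lines) + '\n'


payload = build_batch_request_lines(['irish whiskies', 'scotch whiskies'], 365)
print(payload)

# Each line parses on its own, which is what makes the file valid JSON Lines
parsed = [json.loads(line) for line in payload.splitlines()]
```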

# Predicting the sale of Irish whiskies
The following pipeline demonstrates how to train and make predictions for a single time-series using the `FitProphetModelOp` and `ProphetPredictOp` components.

In [None]:
@dsl.pipeline(name='iowa-irish-whisky-sales-forecast')
def irish_whisky_sales_forecast():
  download_task = download_data_op('["irish whiskies"]', GCP_PROJECT_ID)
  rename_and_filter_task = rename_and_filter_op(download_task.output)
  fit_model_task = FitProphetModelOp(rename_and_filter_task.output)
  predict_task = ProphetPredictOp(fit_model_task.outputs['model'], periods=365)
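
The product list is passed to `download_data_op` as a JSON-encoded string. Assuming the component receives it as a Python list at run time, the `IN` clause of the query is produced by plain string formatting. The sketch below reproduces only that formatting step; the helper name `build_in_clause` is hypothetical.

```python
def build_in_clause(products):
    """Quote each product name and join them for use inside SQL IN ( ... )."""
    return '"{}"'.format('", "'.join(products))


clause = build_in_clause(['irish whiskies', 'scotch whiskies'])
print('WHERE LOWER(category_name) IN ( {} )'.format(clause))
```

String formatting like this is reasonable for a fixed list of trusted category names. For values that come from users, BigQuery query parameters are the safer choice.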

In [None]:
run = run_pipeline(irish_whisky_sales_forecast)

# Predicting the sale of various whiskies
The previous example fitted a single Facebook Prophet model to predict the future sales of one type of whisky. Multiple Prophet models can be trained in parallel using the `ParallelFor` construct. To make predictions against all of them, the fitted models can be imported together as a single Vertex AI model, which can then be used with Vertex Batch Prediction.

In [None]:
@dsl.pipeline(name='iowa-multiple-whiskies-sales-forecast')
def multiple_whiskies_sales_forecast():
  categories = [
      'blended whiskies',
      'canadian whiskies',
      'corn whiskies',
      'irish whiskies',
      'scotch whiskies',
      'straight rye whiskies',
  ]

  download_task = download_data_op(json.dumps(categories), GCP_PROJECT_ID)
  rename_and_filter_task = rename_and_filter_op(download_task.output)
  loop_task = ParallelFor(categories)
  with loop_task as product:
    # The following steps will run for each product
    select_task = select_by_product_op(rename_and_filter_task.output, product)
    fit_model_task = FitProphetModelOp(select_task.output)
    gcs_upload_task = upload_model_to_gcs_op(fit_model_task.outputs['model'],
                                             GCS_MODEL_ARTIFACT_DIR, product)
  model_import_op = VertexModelImportOp(
      project=GCP_PROJECT_ID,
      display_name=VERTEX_MODEL_DISPLAY_NAME,
      serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prophet/prophet',
      artifact_uri=GCS_MODEL_ARTIFACT_DIR,
      serving_container_predict_route='/predict',
      serving_container_health_route='/health',
      serving_container_command=['./online_prediction_start.sh'],
      serving_container_ports=[8080]).after(loop_task)

  create_br_file_task = create_batch_request_file_op(json.dumps(categories),
                                                     365,
                                                     BATCH_PREDICT_GCS_SOURCE)

  ModelBatchPredictOp(
      project=GCP_PROJECT_ID,
      model=model_import_op.outputs['model'],
      job_display_name=BATCH_PREDICT_DISPLAY_NAME,
      gcs_source=BATCH_PREDICT_GCS_SOURCE,
      gcs_destination_prefix=BATCH_PREDICT_GCS_PREFIX,
      machine_type='n1-standard-2').after(create_br_file_task)
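
Each iteration of the loop is intended to leave one model file in the artifact directory, and the import step then registers that directory as a single Vertex AI model. The sketch below shows the intended layout, assuming the artifact directory is a full `gs://` URI and that files use the `.model.json` suffix from `upload_model_to_gcs_op`. The helper name `model_artifact_uri` and the bucket `example-bucket` are hypothetical.

```python
def model_artifact_uri(model_artifact_dir: str, product: str) -> str:
    """Return the GCS URI where the fitted model for one product is stored."""
    return '{}/{}.model.json'.format(model_artifact_dir.rstrip('/'), product)


uris = [
    model_artifact_uri('gs://example-bucket/prophet_models/', product)
    for product in ['irish whiskies', 'scotch whiskies']]
for uri in uris:
    print(uri)
```

Because the file name is derived from the product, the `time_series` value in a batch request can be matched to a model file by name.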

In [None]:
run = run_pipeline(multiple_whiskies_sales_forecast)