# Using Google Cloud Functions to support event-based triggering of Kubeflow Pipelines

This notebook shows how you can run a Kubeflow Pipeline from a [Google Cloud Function](https://cloud.google.com/functions/docs/), thus providing a way for Pipeline runs to be triggered by events (in the interim before this is supported by Pipelines itself).  

In this example, the function is triggered by the addition of or update to a file in a [Google Cloud Storage](https://cloud.google.com/storage/) (GCS) bucket, but Cloud Functions can have other triggers too (including [Pub/Sub](https://cloud.google.com/pubsub/docs/)-based triggers).

The example is Google Cloud Platform (GCP)-specific, and requires an [IAP](https://cloud.google.com/iap/)-enabled Kubeflow install on GKE.


## Deploy a Kubeflow cluster on GKE using IAP

Deploy an **IAP-enabled** Kubeflow cluster on GKE. The [launcher web app](https://deploy.kubeflow.cloud/#/deploy) is recommended. Follow [the instructions](https://www.kubeflow.org/docs/gke/deploy/oauth-setup/) to create an OAuth client ID and secret.

Note the client ID, which you'll need later in the notebook to create a Pipelines SDK client. Note the IAP endpoint too, which should be of the form: `https://<deployment-name>.endpoints.<project>.cloud.goog`.
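
As a small illustration of the endpoint form above, the following sketch builds the IAP endpoint, and the Pipelines host (the endpoint with `/pipeline` appended) that is used later in the notebook. The helper names `iap_endpoint` and `pipelines_host`, and the example deployment and project names, are hypothetical.

```python
def iap_endpoint(deployment_name, project):
    """Build the IAP endpoint URL for a Kubeflow deployment on GKE."""
    return 'https://{}.endpoints.{}.cloud.goog'.format(deployment_name, project)


def pipelines_host(deployment_name, project):
    """Build the Pipelines host URL: the IAP endpoint with '/pipeline' appended."""
    return iap_endpoint(deployment_name, project) + '/pipeline'


# Hypothetical deployment and project names, for illustration only.
print(iap_endpoint('kubeflow', 'my-project'))
print(pipelines_host('kubeflow', 'my-project'))
```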


## Create a simple GCF function to test your configuration

First we'll generate and deploy a simple GCF function, to test that the basics are properly configured.  

In [None]:
%%bash
mkdir -p functions

Next, create a `requirements.txt` file in the `functions` directory, telling GCF that it needs to install the Kubeflow Pipelines SDK as part of the deployment. This will be required for the second function we'll define.

In [None]:
%%writefile functions/requirements.txt
kfp

**Before executing the next cell**, edit it to set the `TRIGGER_BUCKET` environment variable to a Google Cloud Storage bucket ([create a bucket first](https://console.cloud.google.com/storage/browser) if necessary). Do *not* include the `gs://` prefix in the bucket name.

We'll deploy the GCF function so that it will trigger on new and updated files (blobs) in this bucket.

In [None]:
%env TRIGGER_BUCKET=REPLACE_WITH_YOUR_GCS_BUCKET_NAME
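
If you'd like to guard against accidentally including the `gs://` prefix, a check along the following lines may help. This is only a sketch: `normalize_bucket_name` is a hypothetical helper, not part of any Google Cloud library, and it does not verify that the bucket exists.

```python
def normalize_bucket_name(name):
    """Return a bare bucket name, dropping a 'gs://' prefix and any trailing slash."""
    name = name.strip()
    if name.startswith('gs://'):
        name = name[len('gs://'):]
    name = name.rstrip('/')
    # Anything that is empty or still contains a slash is an object path, not a bucket.
    if not name or '/' in name:
        raise ValueError('expected a bucket name, got: %r' % name)
    return name


# Hypothetical bucket name, for illustration only.
print(normalize_bucket_name('gs://my-trigger-bucket/'))
```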

Next, we'll create a simple GCF function in the `functions/main.py` file:

In [None]:
%%writefile functions/main.py
import logging

def gcs_test(data, context):
  """Background Cloud Function to be triggered by Cloud Storage.
     This generic function logs relevant data when a file is changed.

  Args:
      data (dict): The Cloud Functions event payload.
      context (google.cloud.functions.Context): Metadata of triggering event.
  Returns:
      None; the output is written to Stackdriver Logging
  """

  logging.info('Event ID: {}'.format(context.event_id))
  logging.info('Event type: {}'.format(context.event_type))
  logging.info('Data: {}'.format(data))
  logging.info('Bucket: {}'.format(data['bucket']))
  logging.info('File: {}'.format(data['name']))
  file_uri = 'gs://%s/%s' % (data['bucket'], data['name'])
  logging.info('Using file uri: %s', file_uri)

  logging.info('Metageneration: {}'.format(data['metageneration']))
  logging.info('Created: {}'.format(data['timeCreated']))
  logging.info('Updated: {}'.format(data['updated']))

Deploy the GCF function as follows. (You'll need to wait a moment or two for the deployment output to display in the notebook.) You can also run this command from a notebook terminal window in the `functions` subdirectory.

In [None]:
%%bash
cd functions
gcloud functions deploy gcs_test --runtime python37 --trigger-resource ${TRIGGER_BUCKET} --trigger-event google.storage.object.finalize

After you've deployed, test your deployment by adding a file to the specified `TRIGGER_BUCKET`. Then check the [logs viewer panel](https://console.cloud.google.com/logs/viewer) to confirm that the GCF function was triggered and ran correctly.
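
It can also be useful to exercise the function logic locally with a stand-in event. The sketch below builds a fake `data` payload and `context` object containing only the fields that the function above reads. The helper names and all field values are made up for illustration; a real event carries additional fields.

```python
import logging
from types import SimpleNamespace


def gcs_file_uri(data):
    """Build the gs:// URI for the object described by a GCS event payload."""
    return 'gs://%s/%s' % (data['bucket'], data['name'])


def make_fake_event(bucket, name):
    """Return (data, context) stand-ins with the fields the function reads."""
    data = {
        'bucket': bucket,
        'name': name,
        'metageneration': '1',
        'timeCreated': '2019-03-10T12:00:00.000Z',
        'updated': '2019-03-10T12:00:00.000Z',
    }
    context = SimpleNamespace(
        event_id='0000000000',
        event_type='google.storage.object.finalize')
    return data, context


# Hypothetical bucket and file names, for illustration only.
data, context = make_fake_event('my-trigger-bucket', 'test.txt')
logging.basicConfig(level=logging.INFO)
logging.info('Event type: %s', context.event_type)
logging.info('Using file uri: %s', gcs_file_uri(data))
```

With `functions/main.py` importable, you could pass the same `data` and `context` objects to `gcs_test(data, context)` to check its log output before deploying.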


## Deploy a Pipeline from a GCF function

Next, we'll create a GCF function that deploys a Kubeflow Pipeline when triggered. For this to work, we first need to do two additional things:

- give the `<your-project>@appspot.gserviceaccount.com` service account additional permissions,
- then add that account as an `IAP-secured Web App User`.


### Update the `<your-project>@appspot.gserviceaccount.com` account permissions

When you deployed the GCF function above, a service account of the following form was automatically created (if it didn't already exist):
`<your-project>@appspot.gserviceaccount.com`. Find this service account in the
[IAM panel of the cloud console](https://console.cloud.google.com/iam-admin/iam).

Then, grant the service account the 'Service Account Token Creator' role.

![give the service account service account token creator permissions](https://storage.googleapis.com/amy-jo/images/kfp-deploy/Screenshot_2019-03-10_12_32_54.png)

![give the service account service account token creator permissions](https://storage.googleapis.com/amy-jo/images/kfp-deploy/Screenshot_2019-03-10_12_32_32.png)

### Add the new service account as an IAP-secured web app user

Next, add the new service account as an IAP-secured Web App User. Visit [https://console.cloud.google.com/security/iap](https://console.cloud.google.com/security/iap).
Click on `istio-system/istio-ingressgateway` (which was set up as part of your Kubeflow installation), then click on **ADD MEMBER**.

![](https://storage.googleapis.com/amy-jo/images/kf-pls/IAP_webappuser_setup1.png)

In the panel that opens, enter the service account and give it the `IAP-secured Web App User` role.

![](https://storage.googleapis.com/amy-jo/images/kfp-deploy/Screenshot_2019-03-09_11_30_24x-2.png)

Now we're ready to create a version of the GCF function that deploys a pipeline.  First, preserve your existing `main.py` in a backup file:

In [None]:
%%bash
cd functions
mv main.py main.py.bak

Then, **before executing the next cell**, edit the `HOST` and `CLIENT_ID` variables in the code below. The client ID is the same OAuth client ID you generated for the IAP-based deployment of your Kubeflow cluster. The `HOST` is your IAP endpoint with `/pipeline` appended.

In [None]:
%%writefile functions/main.py
import datetime
import logging

import kfp
import kfp.compiler as compiler
import kfp.dsl as dsl

# Not used by the code below; needed if you later read the trigger file's contents.
from google.cloud import storage

# To deploy this function:
# gcloud functions deploy gcs_deploy_pipeline --runtime python37 \
#   --trigger-resource <your_trigger_bucket> \
#   --trigger-event google.storage.object.finalize

EXPERIMENT_NAME = 'e1'

# EDIT THE NEXT TWO LINES for your installation
HOST = 'https://<deployment-name>.endpoints.<project>.cloud.goog/pipeline'
CLIENT_ID = '<YOUR_IAP_CLIENT_ID>'


@dsl.pipeline(
  name='Sequential',
  description='A pipeline with two sequential steps.'
)
def sequential_pipeline(filename='gs://ml-pipeline-playground/shakespeare1.txt'):
  """A pipeline with two sequential steps."""

  op1 = dsl.ContainerOp(
     name='filechange',
     image='library/bash:4.4.23',
     command=['sh', '-c'],
     arguments=['echo "%s" > /tmp/results.txt' % filename],
     file_outputs={'newfile': '/tmp/results.txt'})
  op2 = dsl.ContainerOp(
     name='echo',
     image='library/bash:4.4.23',
     command=['sh', '-c'],
     arguments=['echo "%s"' % op1.outputs['newfile']]
     )


def gcs_deploy_pipeline(data, context):
  """Background Cloud Function to be triggered by Cloud Storage.
     This function logs relevant data when a file is changed, then launches
     a pipeline run with the changed file's URI as a parameter.

  Args:
      data (dict): The Cloud Functions event payload.
      context (google.cloud.functions.Context): Metadata of triggering event.
  Returns:
      None; the output is written to Stackdriver Logging
  """

  logging.info('Event ID: {}'.format(context.event_id))
  logging.info('Event type: {}'.format(context.event_type))
  logging.info('Data: {}'.format(data))
  logging.info('Bucket: {}'.format(data['bucket']))
  logging.info('File: {}'.format(data['name']))
  file_uri = 'gs://%s/%s' % (data['bucket'], data['name'])
  logging.info('Using file uri: %s', file_uri)

  logging.info('Metageneration: {}'.format(data['metageneration']))
  logging.info('Created: {}'.format(data['timeCreated']))
  logging.info('Updated: {}'.format(data['updated']))


  logging.info('attempting to launch pipeline run.')
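  # Use a timezone-aware UTC time; timestamp() would read a naive utcnow() as local time.
  ts = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 100000)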
  client = kfp.Client(
      host=HOST, client_id=CLIENT_ID)
  compiler.Compiler().compile(sequential_pipeline, '/tmp/sequential.tar.gz')
  exp = client.create_experiment(name=EXPERIMENT_NAME)  # this is a 'get or create' op
  res = client.run_pipeline(exp.id, 'sequential_' + str(ts), '/tmp/sequential.tar.gz',
                              params={'filename': file_uri})
  # alternately, if you want to launch a run using an already-existing pipeline, you can pass
  # the pipeline ID as follows, replacing 'your-pipeline-id' with the actual ID.
  # res = client.run_pipeline(exp.id, 'your-pipeline-name',
  #                           params={...}, pipeline_id='your-pipeline-id')
  logging.info(res)


You can see that we're passing the name of the added or updated GCS file (the file that triggered the GCF function) to our (very simple) example pipeline as an input parameter.
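
To make that step easier to see in isolation, here is a sketch of how the run name and parameters are derived from the event payload. `build_run_args` is a hypothetical helper, not part of the Pipelines SDK; it mirrors the logic in `main.py` and uses a timezone-aware UTC timestamp to keep run names unique.

```python
import datetime


def build_run_args(data, prefix='sequential_'):
    """Return (run_name, params) for a pipeline run triggered by a GCS event."""
    file_uri = 'gs://%s/%s' % (data['bucket'], data['name'])
    # Same scaling as in main.py; the value only needs to differ between runs.
    now = datetime.datetime.now(datetime.timezone.utc)
    ts = int(now.timestamp() * 100000)
    return prefix + str(ts), {'filename': file_uri}


# Hypothetical bucket and file names, for illustration only.
run_name, params = build_run_args(
    {'bucket': 'my-trigger-bucket', 'name': 'data/new.txt'})
print(run_name, params)
```

The returned values correspond to the job-name and `params` arguments passed to `client.run_pipeline` in the function above.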

Next, deploy the GCF function that you just created. As before, it will take a moment or two for the results of the deployment to display in the notebook.

In [None]:
%%bash
cd functions
gcloud functions deploy gcs_deploy_pipeline --runtime python37 --trigger-resource ${TRIGGER_BUCKET} --trigger-event google.storage.object.finalize

Add another file to your `TRIGGER_BUCKET`. This time you should see both GCF functions triggered. The `gcs_deploy_pipeline` function will launch a run of the pipeline. You'll be able to see it running at your Kubeflow cluster's IAP endpoint, `https://<deployment-name>.endpoints.<project>.cloud.goog/pipeline`, under the given Pipelines Experiment (`e1` by default).

### Finding and using the ID of an existing pipeline to launch a run

In the scenario above, we ran a pipeline by uploading its compiled archive.
Sometimes, you might want to trigger a run of a pipeline that has already been uploaded (e.g., via the `upload_pipeline` method or the web UI).

For this, you need the ID of the pipeline.  You can get it via the `list_pipelines` method, filtering on the name of the pipeline that you want.

See one of the other notebooks in this directory, e.g. [kfp_remote_deploy-IAP.ipynb](kfp_remote_deploy-IAP.ipynb), for an example of defining and using a utility function that uses `list_pipelines` to find a pipeline's ID given its name. You would add that code to your `functions/main.py` file, then re-deploy it to GCF.
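
As a rough sketch of what such a utility might look like (this is not the code from that notebook), the function below pages through `list_pipelines` results until it finds a matching name. It assumes that the response object exposes `pipelines` and `next_page_token`, and that each pipeline has `name` and `id` attributes; check these against the SDK version you have installed. `FakeClient` is a made-up stand-in so that the example runs without a cluster.

```python
from types import SimpleNamespace


def find_pipeline_id(client, pipeline_name, page_size=100):
    """Return the ID of the first pipeline named `pipeline_name`, or None."""
    page_token = ''
    while True:
        response = client.list_pipelines(page_token=page_token, page_size=page_size)
        # `pipelines` may be None when a page is empty.
        for pipeline in response.pipelines or []:
            if pipeline.name == pipeline_name:
                return pipeline.id
        page_token = response.next_page_token
        if not page_token:
            return None


class FakeClient:
    """Stand-in for kfp.Client that serves two pages of made-up pipelines."""

    def list_pipelines(self, page_token='', page_size=10):
        if not page_token:
            return SimpleNamespace(
                pipelines=[SimpleNamespace(name='first', id='id-1')],
                next_page_token='page-2')
        return SimpleNamespace(
            pipelines=[SimpleNamespace(name='Sequential', id='id-2')],
            next_page_token=None)


print(find_pipeline_id(FakeClient(), 'Sequential'))  # prints id-2
```

In the GCF function, you would pass the real `kfp.Client` instance instead of `FakeClient()`, then supply the returned ID as the `pipeline_id` argument to `run_pipeline`.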

## Communicating pipeline parameters via a GCF trigger file

In the simple example above, we showed how to pass the name of the added or updated GCS file, obtained from the GCF function arguments, as a pipeline parameter.
More realistically, you may often want to communicate information via the GCS file *contents*, which can then be extracted and passed as pipeline parameter(s).

For example, suppose you are [exporting data labeled by the AI Platform Data Labeling Service](https://cloud.google.com/data-labeling/docs/export#datalabel-example-python). As part of the function you write to **export the labeled data to GCS**, you could include a call to the `write_export_info` utility function below (or something similar), where you write the location of the exported data to the `$TRIGGER_BUCKET` you set up for your GCF function.

In [None]:
def write_export_info(bucket_name, file_name, export_info_string):
  """Utility function to write a string to a GCS file."""
  from google.cloud import storage

  storage_client = storage.Client()
  bucket = storage_client.get_bucket(bucket_name)  # e.g., a GCF 'trigger' bucket. Don't include the 'gs://'-- just the name
  blob = bucket.blob(file_name)  # don't include a leading /
  blob.upload_from_string(export_info_string)

Then, in your GCF function, you can add code to read the contents of the GCS trigger file. For example, you could add a function similar to this to your GCF `main.py` file:

In [None]:
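def read_export_info(bucket_name, file_name):
  """Return the contents of a GCS file as a str (uses the `storage` import from main.py)."""
  storage_client = storage.Client()
  bucket = storage_client.get_bucket(bucket_name)
  blob = bucket.blob(file_name)
  # download_as_string() returns bytes, so decode before using the value as a parameter.
  return blob.download_as_string().decode('utf-8')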

Then, do any necessary parsing of that string to extract parameter information, and make a call to `run_pipeline` using those extracted parameters.  That might look something like this snippet, if you were editing the `main.py` we created above:

In [None]:
def gcs_deploy_pipeline(data, context):
  ...
  export_info = read_export_info(data['bucket'], data['name'])
  ...
  client.run_pipeline(exp.id, 'sequential_' + str(ts), '/tmp/sequential.tar.gz',
                            params={'filename': str(export_info)})
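
If the trigger file needs to carry more than a single value, one option is to write it as JSON and parse it in the GCF function. The sketch below assumes a JSON layout with hypothetical `export_uri` and `dataset` keys; neither the keys nor the helper names come from the Data Labeling Service, so adapt them to whatever your export step actually writes.

```python
import json


def make_export_info(export_uri, dataset_name):
    """Serialize export details to a JSON string suitable for the trigger file."""
    return json.dumps({'export_uri': export_uri, 'dataset': dataset_name})


def parse_export_info(contents):
    """Turn trigger-file contents (bytes or str) into a pipeline params dict."""
    if isinstance(contents, bytes):
        contents = contents.decode('utf-8')
    info = json.loads(contents)
    # Map the hypothetical 'export_uri' key onto the pipeline's 'filename' parameter.
    return {'filename': info['export_uri']}


# Hypothetical export location and dataset name, for illustration only.
info_string = make_export_info('gs://my-export-bucket/labels.csv', 'my-dataset')
print(parse_export_info(info_string.encode('utf-8')))
```

You could pass the output of `make_export_info` to `write_export_info` as the `export_info_string` argument, and use the result of `parse_export_info` as the `params` argument to `run_pipeline`.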
  
  

------------------------------------------
Copyright 2019, Google, LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.