# Trigger Vertex Pipelines with Cloud Functions and PubSub

> **TODO**

In [None]:
pipe-triggers/requirements.txt trigger-vertex-pipelines/

pipe-triggers/00-setup-env.ipynb trigger-vertex-pipelines/
pipe-triggers/01-pipeline-for-triggering.ipynb trigger-vertex-pipelines/
pipe-triggers/02-create-trigger.ipynb trigger-vertex-pipelines/
pipe-triggers/pipeline.yaml trigger-vertex-pipelines/

pipe-triggers/keep-local
pipe-triggers/cloud_function

## Set these variables

In [51]:
# naming convention for all cloud resources
VERSION        = "v1"                         # TODO - @ param {type:"string"}
PREFIX         = f'pipe-triggers-{VERSION}'   # TODO - @ param {type:"string"}

print(f"PREFIX = {PREFIX}")

PREFIX = pipe-triggers-v1


In [52]:
# staging GCS
GCP_PROJECTS             = !gcloud config get-value project
PROJECT_ID               = GCP_PROJECTS[0]

# GCS bucket and paths
BUCKET_NAME              = f'{PREFIX}-{PROJECT_ID}'
BUCKET_URI               = f'gs://{BUCKET_NAME}'

config = !gsutil cat {BUCKET_URI}/config/notebook_env.py
print(config.n)
exec(config.n)


PROJECT_ID             = "hybrid-vertex"
PROJECT_NUM            = "934903580331"

REGION                 = "us-central1"
BQ_REGION              = "US"
BQ_PUBLIC_DS_URI       = "bigquery-public-data.ml_datasets.census_adult_income"
DATASET_ID             = "census_pipe_triggers_v1"
VIEW_NAME              = "census_data"
BQ_LOG_DATA_URI        = "hybrid-vertex.census_pipe_triggers_v1.census_training_table"

VPC_NETWORK_NAME       = "ucaip-haystack-vpc-network"
VPC_NETWORK_FULL       = "projects/934903580331/global/networks/ucaip-haystack-vpc-network"

SERVICE_ACCOUNT        = "934903580331-compute@developer.gserviceaccount.com"

VERSION                = "v1"
PREFIX                 = "pipe-triggers-v1"

BUCKET_NAME            = "pipe-triggers-v1-hybrid-vertex"
BUCKET_URI             = "gs://pipe-triggers-v1-hybrid-vertex"

TOPIC_PATH             = "projects/hybrid-vertex/topics/pipe-triggers-v1-topic"
PUBSUB_TOPIC           = "projects/hybrid-vertex/topics/pipe-triggers-v1-topic"

PIPELI

## create Cloud Function

* gcloud [API reference](https://cloud.google.com/sdk/gcloud/reference/functions/deploy)

**source**
> For the `--source flag`, specify a local filesystem path to the root directory of the function source code - see [Source directory structure](https://cloud.google.com/functions/docs/writing#directory-structure). The current working directory is used if this flag is omitted.

```
.
├── main.py
└── requirements.txt
```

The `requirements.txt` file must include the Functions Framework for Python as a dependency:

```python
functions-framework==3.*
```

**Entry point**
* The source code must define an `entry-point` for your function, which is the particular code that is executed when the Cloud Function is invoked. * You specify this entry point when you deploy your function
* see [documentation](https://cloud.google.com/functions/docs/writing#entry-point) for details


**Event type**
* `google.cloud.pubsub.topic.v1.messagePublished` - Cloud Pub/Sub: A message is published to the specified Pub/Sub topic

#### write config to py file

In [11]:
from cloud_function import env_config as env_config

env_config.PIPELINE_DISPLAY_NAME

'census-pipe-triggers-v1'

Reference this PubSub topic in your Cloud Function

In [15]:
env_config.PUBSUB_TOPIC

'projects/hybrid-vertex/topics/pipe-triggers-v1-topic'

In [12]:
# CF_ENTRYPOINT = "check_table_size" # see function in $CF_DIR/main.py

## Deploy Cloud Function trigger

* Deploy a Cloud Function [basics](https://cloud.google.com/functions/docs/deploy)
* see the `gcloud function deploy` [API reference](https://cloud.google.com/sdk/gcloud/reference/functions/deploy)
* public example for [testing cloud functions locally](https://towardsdatascience.com/how-to-develop-and-test-your-google-cloud-function-locally-96a970da456f)

In [22]:
!tree cloud_function

[01;34mcloud_function[00m
├── [01;34m__pycache__[00m
│   └── env_config.cpython-37.pyc
├── env_config.py
├── main.py
└── requirements.txt

1 directory, 4 files


In [34]:
# ! gcloud functions runtimes list
# ! gcloud functions event-types list --gen2

In [31]:
# CLOUD_FUNCTION_NAME = f"cf-{PREFIX}"

# tmp - testing 
CLOUD_FUNCTION_NAME = f"cf-{PREFIX}-test-1"

# cloud function runtime vars
CF_MEMORY_ALLOCATION = 512 # 256
CF_RUNTIME           = 'python310' 
CF_SOURCE_DIR        = 'cloud_function'                  # see function in $CF_DIR/main.py
CF_ENTRYPOINT        = 'check_table_size'
CF_EVENT_TYPE        = 'google.cloud.pubsub.topic.v1.messagePublished'
CF_TRIGGER_TOPIC     = env_config.PUBSUB_TOPIC.split('/')[-1]

print(f"CLOUD_FUNCTION_NAME  : {CLOUD_FUNCTION_NAME}")
print(f"CF_MEMORY_ALLOCATION : {CF_MEMORY_ALLOCATION}")
print(f"CF_RUNTIME           : {CF_RUNTIME}")
print(f"CF_SOURCE_DIR        : {CF_SOURCE_DIR}")
print(f"CF_ENTRYPOINT        : {CF_ENTRYPOINT}")
print(f"CF_EVENT_TYPE        : {CF_EVENT_TYPE}")
print(f"CF_TRIGGER_TOPIC     : {CF_TRIGGER_TOPIC}")

CLOUD_FUNCTION_NAME  : cf-pipe-triggers-v1-test-1
CF_MEMORY_ALLOCATION : 512
CF_RUNTIME           : python310
CF_SOURCE_DIR        : cloud_function
CF_ENTRYPOINT        : check_table_size
CF_EVENT_TYPE        : google.cloud.pubsub.topic.v1.messagePublished
CF_TRIGGER_TOPIC     : pipe-triggers-v1-topic


In [35]:
! gcloud functions deploy --quiet $CLOUD_FUNCTION_NAME \
    --gen2 \
    --runtime=$CF_RUNTIME \
    --region=$REGION \
    --source=./$CF_SOURCE_DIR \
    --entry-point=$CF_ENTRYPOINT \
    --no-allow-unauthenticated \
    --memory=$CF_MEMORY_ALLOCATION \
    --trigger-topic=$CF_TRIGGER_TOPIC \
    --trigger-service-account=$SERVICE_ACCOUNT

Preparing function...done.                                                     
Deploying function...                                                          
  . [Build]                                                                    
  . [Service]                                                                  
  . [Trigger]                                                                  
  . [ArtifactRegistry]                                                         
  . [Healthcheck]                                                              
  . [Triggercheck]                                                             
  Deploying function...                                                        






⠛ Deploying function...                                                        
  ⠛ [Build]                                                                    





⠹ Deploying function...                                                        
  ⠹ [Build] Build in progress

> Note:  assign the Invoker role (roles/run.invoker) through Cloud Run for 2nd gen functions if you want to allow the function to receive requests from additional principals or other given authorities in IAM

In [None]:
# !gcloud functions add-invoker-policy-binding $TOPIC_ID \
#       --region=$REGION \
#       --member=serviceAccount:$SERVICE_ACCOUNT

### Inspect cloud function and trigger...

In [36]:
! gcloud functions list --regions=$REGION

NAME                               STATE   TRIGGER                        REGION       ENVIRONMENT
cf-pipe-triggers-v1-test-1         ACTIVE  topic: pipe-triggers-v1-topic  us-central1  2nd gen
logger-cloud-function-e2ev4        ACTIVE  Event Trigger                  us-central1  1st gen
logger-cloud-function-tabsimv1     ACTIVE  Event Trigger                  us-central1  1st gen
simulator-cloud-function           ACTIVE  Event Trigger                  us-central1  1st gen
simulator-cloud-function-tabsimv1  ACTIVE  Event Trigger                  us-central1  1st gen
trigger-8451                       ACTIVE  topic: pipeline-topic          us-central1  2nd gen


In [58]:
# ! gcloud functions describe $CLOUD_FUNCTION_NAME

### View initial BQ Table rows

In [38]:
from google.cloud import bigquery

bq_client = bigquery.Client()

In [39]:
data_table = bq_client.get_table(f"{BQ_LOG_DATA_URI}")

current_rows = data_table.num_rows
current_rows

1000

## Invoke Trigger - 1

In [43]:
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

print(f"TOPIC_PATH: {TOPIC_PATH}")

TOPIC_PATH: projects/hybrid-vertex/topics/pipe-triggers-v1-topic


In [44]:
data_str = f"{BQ_LOG_DATA_URI}"
data = data_str.encode("utf-8")
print(data)

b'hybrid-vertex.census_pipe_triggers_v1.census_training_table'


In [45]:
# When you publish a message, the client returns a future.
future = publisher.publish(TOPIC_PATH, data)

print(future.result())

8868976232872879


## Add data to BQ table to simulate trigger

In [53]:
query = f"""
CREATE OR REPLACE TABLE `{BQ_LOG_DATA_URI}` AS (
   SELECT * FROM `{BQ_PUBLIC_DS_URI}` 
   LIMIT 1500
)
"""
print(query)


CREATE OR REPLACE TABLE `hybrid-vertex.census_pipe_triggers_v1.census_training_table` AS (
   SELECT * FROM `bigquery-public-data.ml_datasets.census_adult_income` 
   LIMIT 1500
)



In [55]:
%%time
bq_client.query(query).result()

CPU times: user 11.3 ms, sys: 5.14 ms, total: 16.4 ms
Wall time: 2.29 s


<google.cloud.bigquery.table._EmptyRowIterator at 0x7f4bc56bae90>

This should add 500 new training data examples to the table being montirored for pipeline triggering

In [56]:
data_table = bq_client.get_table(f"{BQ_LOG_DATA_URI}")

current_rows = data_table.num_rows
current_rows

1500

## Invoke Trigger - 2

In [57]:
# When you publish a message, the client returns a future.
future = publisher.publish(TOPIC_PATH, data)

print(future.result())

8868939902098403
