# BigQuery + Cloud Functions: how to run your queries as soon as a new Google Analytics table is available

https://towardsdatascience.com/bigquery-cloud-functions-how-to-run-your-queries-as-soon-as-a-new-google-analytics-table-is-17fbb62f8aaa



## 0 - setup

1. Set up GCP - run `00_setup_env.sh` - enable APIs, create GCS bucket
2. Set up BQ - run `01_setup_bq.sh` - ingest sample data to GCS bucket, create target BQ dat

In [None]:
# TODO @justinjm - add creation and setup of service account 

# # TODO - Set the name of your project
# PROJECT_ID="your-project-id" 
# # TODO - Set the name of your service account
# SA_NAME="bq-scheduler" 
# SA_EMAIL="${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" 

# # TODO - set target user that will schedule BQ queries
# USER_EMAIL="email@company.com" # can also be group 

# # Create the service account
# gcloud iam service-accounts create $SA_NAME --project $PROJECT_ID

# ## service account access --------------------------------------------
# ## Grant the service account BigQuery admin permissions
# ## (or use `roles/bigquery.jobUser` if minimal access is all that is required)
# gcloud projects add-iam-policy-binding $PROJECT_ID \
#   --member "serviceAccount:${SA_EMAIL}" \
#   --role "roles/bigquery.admin" \
#   --condition="None"

# ## user group access --------------------------------------------
# gcloud projects add-iam-policy-binding $PROJECT_ID \
#   --member="user:${USER_EMAIL}" \
#   --role="roles/bigquery.user"  \
#   --condition="None"

# gcloud projects add-iam-policy-binding $PROJECT_ID \
#   --member="user:${USER_EMAIL}" \
#   --role="roles/iam.serviceAccountViewer" \
#   --condition="None"

# ## give users/groups access to the service account
# ## https://cloud.google.com/iam/docs/service-account-permissions
# gcloud iam service-accounts add-iam-policy-binding "${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
#   --member="user:${USER_EMAIL}" \
#   --role="roles/iam.serviceAccountUser" \
#   --condition="None"


In [1]:
#TODO @justinjm - create scheduled query as part of setup BQ
# https://cloud.google.com/bigquery/docs/scheduling-queries#python_1

In [None]:
# from google.cloud import bigquery_datatransfer

# transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# # The project where the query job runs is the same as the project
# # containing the destination dataset.
# project_id = "your-project-id"
# dataset_id = "your_dataset_id"

# # This service account will be used to execute the scheduled queries. Omit
# # this request parameter to run the query as the user with the credentials
# # associated with this client.
# service_account_name = "abcdef-test-sa@abcdef-test.iam.gserviceaccount.com"

# # Use standard SQL syntax for the query.
# query_string = """
# SELECT * FROM `demos-vertex-ai.bq_eventarc_queries_demo.loan_201` LIMIT 10
# """

# parent = transfer_client.common_project_path(project_id)

# transfer_config = bigquery_datatransfer.TransferConfig(
#     destination_dataset_id=dataset_id,
#     display_name="Your Scheduled Query Name",
#     data_source_id="scheduled_query",
#     params={
#         "query": query_string,
#         "destination_table_name_template": "your_table_{run_date}",
#         "write_disposition": "WRITE_TRUNCATE",
#         "partitioning_field": "",
#     },
#     schedule="every 24 hours",
# )

# transfer_config = transfer_client.create_transfer_config(
#     bigquery_datatransfer.CreateTransferConfigRequest(
#         parent=parent,
#         transfer_config=transfer_config,
#         service_account_name=service_account_name,
#     )
# )

# print("Created scheduled query '{}'".format(transfer_config.name))
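
The Cloud Function later in this notebook needs the transfer ID of the scheduled query. As a minimal sketch, assuming `transfer_config.name` follows the usual `projects/.../transferConfigs/<id>` resource-name pattern, the ID can be pulled out of the printed name as shown here. The helper name `transfer_config_id` and the sample resource name are hypothetical and only serve the illustration.

```python
def transfer_config_id(resource_name):
    """Return the ID that follows the 'transferConfigs' segment of a resource name."""
    parts = resource_name.split("/")
    # list.index raises ValueError if the segment is missing
    idx = parts.index("transferConfigs")
    return parts[idx + 1]


# Hypothetical resource name, in the shape printed by the cell above
sample_name = "projects/123456/locations/us/transferConfigs/abc-123"
print(transfer_config_id(sample_name))
```

The returned value is what would go into the `transferid` placeholder in `functions/main.py`.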

## 1 - cloud logging filter 


Demo Version:

```txt
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="bq_eventarc_queries_demo"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.projectId="demos-vertex-ai"
protoPayload.methodName="jobservice.jobcompleted"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"loan_201"
```


Google Analytics Version: 

```txt
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="REPLACE_WITH_YOUR_DATASET_ID"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.projectId="REPLACE_WITH_YOUR_PROJECT_ID"
protoPayload.authenticationInfo.principalEmail="analytics-processing-dev@system.gserviceaccount.com"
protoPayload.methodName="jobservice.jobcompleted"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"ga_sessions"
NOT protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"ga_sessions_intraday"
```
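
Both filters share the same structure: one clause per line (Cloud Logging joins lines with an implicit AND), `=` for exact matches, and `:` for "has" (substring) matches on the table ID. The following is a minimal sketch, not part of the original setup, of assembling such a filter as a single string with explicit `AND`s, which is the form a `--log-filter` flag expects. The helper name `build_load_filter` and its parameters are hypothetical.

```python
def build_load_filter(project_id, dataset_id, table_prefix, exclude_prefix=None):
    """Return a Cloud Logging filter for completed BigQuery load jobs."""
    base = ("protoPayload.serviceData.jobCompletedEvent.job."
            "jobConfiguration.load.destinationTable")
    clauses = [
        f'{base}.datasetId="{dataset_id}"',
        f'{base}.projectId="{project_id}"',
        'protoPayload.methodName="jobservice.jobcompleted"',
        # ':' is the "has" operator, so this matches any table ID containing the prefix
        f'{base}.tableId:"{table_prefix}"',
    ]
    if exclude_prefix:
        # e.g. skip intraday tables that also contain the main prefix
        clauses.append(f'NOT {base}.tableId:"{exclude_prefix}"')
    return " AND ".join(clauses)


demo_filter = build_load_filter("demos-vertex-ai", "bq_eventarc_queries_demo", "loan_201")
print(demo_filter)
```

For the Google Analytics variant, the same helper could be called with `table_prefix="ga_sessions"` and `exclude_prefix="ga_sessions_intraday"`; the `principalEmail` clause would still need to be added separately.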

In [2]:
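import os, subprocess
# Each `!` line runs in its own subshell, so shell assignments are lost; export the values through os.environ instead.
os.environ["PROJECT_ID"] = subprocess.check_output(["gcloud", "config", "get-value", "project"], text=True).strip()
os.environ["PROJECT_NUMBER"] = subprocess.check_output(["gcloud", "projects", "describe", os.environ["PROJECT_ID"], "--format=value(projectNumber)"], text=True).strip()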

In [None]:
## Create Pub/Sub topic
!gcloud pubsub topics create bq-load-events-topic

In [None]:
## Create the log sink using the demo filter above (inside single quotes the double quotes need no escaping)
!gcloud logging sinks create bq-load-events-sink "pubsub.googleapis.com/projects/${PROJECT_ID}/topics/bq-load-events-topic" \
    --log-filter='protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="bq_eventarc_queries_demo" AND protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.projectId="demos-vertex-ai" AND protoPayload.methodName="jobservice.jobcompleted" AND protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"loan_201"'

In [None]:
# Grant `serviceAccount:service-PROJECT_NUMBER@gcp-sa-logging.iam.gserviceaccount.com` the Pub/Sub Publisher role
# so the sink can publish to the topic (the role is granted project-wide here).
# More information about sinks can be found at https://cloud.google.com/logging/docs/export/configure_export
!gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-logging.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher" \
  --project=$PROJECT_ID \
  --condition=None

## 2 - Create Cloud Function

Create and deploy a Cloud Function from the source code in the [functions](functions/) directory.

### Create CF files

First, we create the necessary files.

In [4]:
# ! rm -rf functions/
!mkdir functions

In [7]:
%%writefile functions/main.py
import time
import functions_framework
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud import bigquery_datatransfer_v1

# The function name must match --entry-point=run in the deploy command.
# A gen2 Pub/Sub-triggered function receives a single CloudEvent argument.
@functions_framework.cloud_event
def run(cloud_event):
    client = bigquery_datatransfer_v1.DataTransferServiceClient()
    projectid = '[enter your projectId here]'  # Enter your projectId here
    transferid = '[enter your transferId here]'  # Enter your transferId here
    parent = client.project_transfer_config_path(projectid, transferid)
    # Request a manual run of the scheduled query 10 seconds from now
    start_time = Timestamp(seconds=int(time.time() + 10))
    response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
    print(response)

# do not forget to put google-cloud-bigquery-datatransfer==1 in the requirements.txt

Overwriting functions/main.py


In [6]:
%%writefile functions/requirements.txt
google-cloud-bigquery-datatransfer==1

Writing functions/requirements.txt
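
The function above starts the scheduled query for every message the sink publishes. If the function should also know which table was loaded, the log entry can be decoded from the Pub/Sub payload. The sketch below assumes the sink delivers the log entry as base64-encoded JSON (for a gen2 function this would typically be found under `cloud_event.data["message"]["data"]`, which is an assumption here). The helper name `destination_table` and the sample entry are hypothetical.

```python
import base64
import json

# Path to the destination table inside the audit log entry, mirroring the sink filter
TABLE_PATH = ("protoPayload", "serviceData", "jobCompletedEvent", "job",
              "jobConfiguration", "load", "destinationTable")


def destination_table(message_data_b64):
    """Decode a base64 Pub/Sub payload and return the destinationTable dict, or None."""
    node = json.loads(base64.b64decode(message_data_b64).decode("utf-8"))
    for key in TABLE_PATH:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# Hypothetical log entry, trimmed to the fields the filter inspects
sample_entry = {"protoPayload": {"serviceData": {"jobCompletedEvent": {"job": {
    "jobConfiguration": {"load": {"destinationTable": {
        "projectId": "demos-vertex-ai",
        "datasetId": "bq_eventarc_queries_demo",
        "tableId": "loan_201",
    }}}}}}}}
sample_payload = base64.b64encode(json.dumps(sample_entry).encode("utf-8")).decode("ascii")

table = destination_table(sample_payload)
print(table["tableId"])
```

Returning `None` for entries without a destination table lets the caller skip unexpected messages instead of raising.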


In [None]:
# !gcloud functions deploy bq-eventarc-driven-queries-demo \
#   --gen2 \
#   --region=us-central1 \
#   --runtime=python311 \
#   --source=functions/ \
#   --entry-point=run \
#   --trigger-http \
#   --timeout=3600 \
#   --no-allow-unauthenticated

In [None]:
! gcloud functions deploy bq-eventarc-driven-queries-demo \
  --gen2 \
  --region=us-central1 \
  --runtime=python311 \
  --source=functions/ \
  --entry-point=run \
  --trigger-topic=bq-load-events-topic \
  --timeout=540 \
  --no-allow-unauthenticated