##### Copyright 2020 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# Using Apache Beam on Google Cloud Dataflow to process streaming data

This notebook example demonstrates how to set up a streaming pipeline that processes a stream that contains NYC taxi ride data.

It is a modified version of an example from Google's examples repository. It reads data from a public Google Cloud Pub/Sub topic that Google provides for testing streaming data processing use cases.

Each element in the stream is a JSON object that contains the location of the taxi, a timestamp, the meter reading, the meter increment, the passenger count, and the ride status.

We will use Apache Beam to define the pipeline, run it on Google Cloud Dataflow, and store the results in Google Cloud BigQuery.
To get a better understanding of the Apache Beam constructs we use in this example, see [Apache Beam Basics](https://beam.apache.org/documentation/basics/), and the [Apache Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/).


**Attention:** The code in this notebook creates Google Cloud resources that can incur costs.

Refer to the Google Cloud pricing documentation for details.

For example:

* [Vertex AI Pricing](https://cloud.google.com/vertex-ai/pricing)
* [Google Cloud Storage Pricing](https://cloud.google.com/storage/pricing)
* [BigQuery Pricing](https://cloud.google.com/bigquery/pricing)
* [Dataflow Pricing](https://cloud.google.com/dataflow/pricing)


Start with the necessary imports:

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
import json

## Set Google Cloud resource variables

The following code will set variables specific to your Google Cloud resources that will be used in this notebook, such as the Project ID, Region, and GCS Bucket.

**Note: This notebook is intended to execute in a Vertex AI Workbench Notebook, in which case the API calls issued in this notebook are authenticated according to the permissions (e.g., service account) assigned to the Vertex AI Workbench Notebook.**

We use the `gcloud` command to read the project ID from the active gcloud configuration and assign it to the `PROJECT_ID` variable. If `PROJECT_ID` is not set for any reason, or if you prefer to use a different project, you can set it manually.

We also use a default bucket name for most of the examples and activities in this book, which has the format: `{PROJECT_ID}-aiml-sa-bucket`. You can change the bucket name if preferred.

Also, we're defaulting to the **us-central1** region, but you can optionally replace this with your [preferred region](https://cloud.google.com/about/locations).

In [None]:
PROJECT_ID_DETAILS = !gcloud config get-value project
PROJECT_ID = PROJECT_ID_DETAILS[0]  # The project ID is item 0 in the list returned by the gcloud command
BUCKET = f"{PROJECT_ID}-aiml-sa-bucket"  # Optional: replace with your preferred bucket name, which must be globally unique.
REGION = "us-central1"  # Optional: replace with your preferred region (See: https://cloud.google.com/about/locations)
print(f"Project ID: {PROJECT_ID}")
print(f"Bucket Name: {BUCKET}")
print(f"Region: {REGION}")

## Create bucket

The following code creates the bucket in your chosen region if it doesn't already exist.

If you get an error saying that the bucket already exists, you can ignore it and continue with the rest of the steps, unless you want to use a different bucket.

In [None]:
!gsutil mb -l {REGION} gs://{BUCKET}

## Begin implementation

Now that we have performed the prerequisite steps for this activity, it's time to implement the activity.

Next, set additional variables: the BigQuery dataset and table, the Google Cloud Pub/Sub topic that our pipeline reads from, and the Cloud Storage path that Dataflow uses for staging and temporary files.


In [None]:
# Google Cloud Storage location for Dataflow artifacts.
DATAFLOW_GCS_PATH = f'gs://{BUCKET}/dataflow'

# BigQuery dataset ID and table
DATASET_ID = "taxirides"
TABLE_ID = "run_rates"

# Pub/Sub source topic
topic = "projects/pubsub-public-data/topics/taxirides-realtime"
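The `CREATE_IF_NEEDED` disposition used later in this notebook applies to the table; the `taxirides` dataset itself must already exist, and this chapter creates it outside the notebook. If you would rather create it from here, the following is a minimal sketch that assumes the `google-cloud-bigquery` client library is installed. `ensure_dataset` is a hypothetical helper, and the dataset is created in the client's default location unless you configure otherwise.

```python
def ensure_dataset(client, project_id, dataset_id):
    """Create `project_id.dataset_id` if it does not exist yet (hypothetical helper).

    `client` is expected to be a google.cloud.bigquery.Client.
    """
    # create_dataset accepts a "project.dataset" string; exists_ok=True turns
    # the call into a no-op when the dataset is already there.
    return client.create_dataset(f"{project_id}.{dataset_id}", exists_ok=True)

# Usage in this notebook (requires google-cloud-bigquery):
# from google.cloud import bigquery
# ensure_dataset(bigquery.Client(project=PROJECT_ID), PROJECT_ID, DATASET_ID)
```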

Specify the options for creating the streaming pipeline.
For more information, see [Apache Beam Pipeline Options](https://beam.apache.org/releases/pydoc/current/apache_beam.options.pipeline_options.html).

In [None]:
# Set up the Beam pipeline options. Passing an empty flag list stops Beam from
# reading sys.argv, which in a notebook holds the kernel's own arguments.
options = beam.options.pipeline_options.PipelineOptions(flags=[])

# Set the pipeline mode to streaming, so we can stream the data from Pub/Sub.
options.view_as(beam.options.pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

Create a pipeline with the options we just created:

In [None]:
p = beam.Pipeline(options=options)

The following code applies `ReadFromPubSub`, which creates a subscription to the given Pub/Sub topic and reads messages from it.
Each message payload is JSON, so we then apply a `Map` transform with `json.loads` to parse it into a Python dictionary.

In [None]:
data = p | "read" >> beam.io.ReadFromPubSub(topic=topic) | beam.Map(json.loads)
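Each Pub/Sub message arrives as a `bytes` payload, and `json.loads` accepts `bytes` directly (Python 3.6 and later), which is why `beam.Map(json.loads)` is enough to turn every message into a dictionary. The payload below is hypothetical: its fields follow the description at the top of this notebook, but the exact field names and values are illustrative, not copied from the live topic. Only `meter_increment` is used by this pipeline.

```python
import json

# Hypothetical message payload; field names and values are illustrative only.
payload = (
    b'{"ride_id": "example-ride", "point_idx": 42, '
    b'"latitude": 40.75, "longitude": -73.98, '
    b'"timestamp": "2020-01-01T12:00:00.00000-05:00", '
    b'"meter_reading": 12.5, "meter_increment": 0.03, '
    b'"ride_status": "enroute", "passenger_count": 1}'
)

# The same call the pipeline applies to every element via beam.Map(json.loads).
ride = json.loads(payload)
print(type(ride).__name__)      # dict
print(ride["meter_increment"])  # 0.03
```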

Because we are reading from an unbounded source, we need a windowing scheme that divides the stream into finite chunks that can be aggregated.
We will use sliding windows that are 10 seconds long, with a new window starting every second (a period of one second).
For more information about windowing in Apache Beam, see [Windowing Basics](https://beam.apache.org/documentation/programming-guide/#windowing-basics).


In [None]:
windowed_data = (data | "window" >> beam.WindowInto(beam.window.SlidingWindows(10, 1)))

Note that sliding windows overlap, so each element is assigned to more than one window: with a 10-second window size and a 1-second period, every element appears in 10 windows.
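To see where that duplication comes from, here is a plain-Python model of how a sliding-window assigner maps one timestamp to windows. It follows the same arithmetic as Beam's `SlidingWindows` (window starts aligned to multiples of the period, plus an optional offset), but it is a simplified sketch, not Beam's implementation; `sliding_windows` is a hypothetical helper and times are plain seconds.

```python
def sliding_windows(timestamp, size=10, period=1, offset=0):
    """Return every [start, end) sliding window that contains `timestamp`.

    Hypothetical helper: a plain-Python model of sliding-window assignment.
    """
    # Latest window start at or before the timestamp, aligned to the period.
    start = timestamp - ((timestamp - offset) % period)
    windows = []
    # Step back one period at a time while the window still covers the timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)

windows = sliding_windows(12.5)
print(len(windows))  # 10
print(windows[0])    # (3.0, 13.0)
print(windows[-1])   # (12.0, 22.0)
```

The number of windows per element is `size / period`, so a 2-second period would halve the duplication: `sliding_windows(12.5, period=2)` returns 5 windows.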

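Next, we calculate the dollar run rate over each 10-second window, updated every second, by summing the `meter_increment` JSON field within each window.

First, extract the `meter_increment` field from each parsed message. Supplying a default of `0` prevents a message that lacks the field from making the sum in the next step fail with a `TypeError`.

In [None]:
meter_increments = windowed_data | beam.Map(lambda e: e.get('meter_increment', 0))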
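`dict.get` returns `None` when a key is missing, and Python's built-in `sum` raises a `TypeError` as soon as it reaches a `None`. The plain-Python check below contrasts a bare `.get('meter_increment')` with a default of `0`; the sample dictionaries are hypothetical.

```python
# Hypothetical parsed messages; the second one lacks meter_increment.
messages = [{"meter_increment": 0.5}, {"ride_status": "pickup"}, {"meter_increment": 0.25}]

without_default = [m.get("meter_increment") for m in messages]
print(without_default)  # [0.5, None, 0.25]
try:
    sum(without_default)
except TypeError as err:
    print("sum failed:", err)

with_default = [m.get("meter_increment", 0) for m in messages]
print(sum(with_default))  # 0.75
```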

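Now sum the increments within each window. Because the data is windowed, Beam requires `.without_defaults()` here: `CombineGlobally` can only emit a default value (such as 0 for an empty input) in the global window, so with `.without_defaults()` a window that receives no elements simply produces no output: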

In [None]:
run_rates = meter_increments | beam.CombineGlobally(sum).without_defaults()
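As a rough plain-Python model of what the windowing and `CombineGlobally(sum)` steps compute together, the sketch below assigns each `(timestamp, meter_increment)` pair to every sliding window that contains it and sums per window. Windows that receive no elements never appear in the result, which mirrors `.without_defaults()`. The events and the `run_rates_by_window` name are hypothetical; the name is chosen so it doesn't shadow the notebook's `run_rates` variable.

```python
from collections import defaultdict

def run_rates_by_window(events, size=10, period=1):
    """Sum meter increments per sliding window (hypothetical helper, plain Python)."""
    totals = defaultdict(float)
    for timestamp, increment in events:
        # Latest window start at or before the timestamp, aligned to the period.
        start = timestamp - (timestamp % period)
        # Add the increment to every overlapping window that contains the timestamp.
        while start > timestamp - size:
            totals[(start, start + size)] += increment
            start -= period
    return dict(sorted(totals.items()))

# Hypothetical (event_time_seconds, meter_increment) pairs.
events = [(100.5, 0.25), (103.2, 0.5), (111.0, 1.0)]
rates = run_rates_by_window(events)
print(rates[(100.0, 110.0)])  # 0.75  (first two events)
print(rates[(102.0, 112.0)])  # 1.5   (last two events)
print(len(rates))             # 21 non-empty windows
```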

The core processing steps of the pipeline are now defined. Next, we specify some additional options that are required to run the pipeline on Google Cloud Dataflow.

In [None]:
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = REGION

# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{DATAFLOW_GCS_PATH}/staging'

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{DATAFLOW_GCS_PATH}/temp'


Next, we specify the schema for the BigQuery table. Because the write step uses the `CREATE_IF_NEEDED` create disposition, Dataflow creates the table with this schema if it doesn't already exist. The output contains only the aggregated run rates, so the schema has a single `run_rates` field.

In [None]:
table_schema = {
    'fields': [{
        'name': 'run_rates', 'type': 'NUMERIC', 'mode': 'NULLABLE'
    }]
}

The final step is to write the results (that is, the contents of the `run_rates` PCollection created above) to BigQuery. However, `run_rates` contains plain numbers, whereas the `WriteToBigQuery` transform expects each element to be a dictionary whose keys match the field names in the schema. We also round each rate to 5 decimal places: BigQuery's NUMERIC type stores at most 9 digits after the decimal point, while a sum of floating-point values can carry many more (for example, `0.1 + 0.2` evaluates to `0.30000000000000004`), so unrounded values risk being rejected. The following code performs the necessary conversion:

In [None]:
# Convert run_rates to a dictionary
def to_dict(rate):
    return {'run_rates': round(rate, 5)}

run_rates_dict = run_rates | 'ConvertToDict' >> beam.Map(to_dict)
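To see why the rounding matters, the snippet below counts the digits after the decimal point in a typical floating-point sum before and after `round(..., 5)`, and compares them with NUMERIC's limit of 9 fractional digits. `fractional_digits` is a hypothetical helper, and measuring digits on the float's shortest `repr` is an assumption made for illustration, not a description of how Beam serializes the value.

```python
from decimal import Decimal

NUMERIC_MAX_SCALE = 9  # BigQuery NUMERIC: at most 9 digits after the decimal point

def fractional_digits(value):
    """Digits after the decimal point in the shortest repr of a float (hypothetical helper)."""
    exponent = Decimal(repr(value)).as_tuple().exponent
    return max(0, -exponent)

raw_rate = 0.1 + 0.2
rounded_rate = round(raw_rate, 5)
print(raw_rate, fractional_digits(raw_rate))          # 0.30000000000000004 17
print(rounded_rate, fractional_digits(rounded_rate))  # 0.3 1
print({'run_rates': rounded_rate})                    # {'run_rates': 0.3}
```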

Now the data is ready to be written to BigQuery, and the following code will create the BigQuery sink for that purpose.

In [None]:
# Write results to BigQuery
run_rates_dict | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
    table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}',
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

All the steps in the data processing pipeline are now defined, so we can run the pipeline on Google Cloud Dataflow. The next two cells submit the job and print its job ID. You can then go to the [Dataflow Jobs Console](https://console.cloud.google.com/dataflow/jobs/) to view additional job details and the execution status.

In [None]:
runner = beam.runners.DataflowRunner()
result = runner.run_pipeline(p, options=options)

In [None]:
job_id = result.job_id()
print(f"Job ID: {job_id}")

## Stop the Dataflow Job after verifying

After verifying the data processing as described in the chapter, you can stop the Dataflow job as follows.

In [None]:
! gcloud dataflow jobs cancel $job_id --region=$REGION

You can verify the job status either in the [Dataflow Jobs Console](https://console.cloud.google.com/dataflow/jobs/) or with the following command:

In [None]:
! gcloud dataflow jobs list 

When you have finished this chapter, repeat the same `gcloud dataflow jobs cancel` command for each job that is still running, replacing `job_id` each time. **Otherwise, you will continue to be charged for those jobs.**

**You can also stop jobs (by cancelling or draining them) from the [Dataflow Jobs Console](https://console.cloud.google.com/dataflow/jobs/).**
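If several jobs are still running, a sketch like the following cancels them in one step. It assumes the `gcloud` CLI is installed and authenticated, collects the IDs with `gcloud dataflow jobs list --status=active --format=value(id)`, and passes them all to a single `gcloud dataflow jobs cancel` call. `cancel_active_dataflow_jobs` is a hypothetical helper; note that it cancels **every** active job in the region, including jobs that were not started from this notebook.

```python
import subprocess

def cancel_active_dataflow_jobs(region, run=subprocess.run):
    """Cancel every active Dataflow job in `region` (hypothetical helper).

    `run` defaults to subprocess.run; it is a parameter so the helper can be
    exercised without calling the real gcloud CLI.
    """
    listing = run(
        ["gcloud", "dataflow", "jobs", "list",
         f"--region={region}", "--status=active", "--format=value(id)"],
        capture_output=True, text=True, check=True,
    )
    job_ids = listing.stdout.split()
    if job_ids:
        # `jobs cancel` accepts several job IDs in one invocation.
        run(["gcloud", "dataflow", "jobs", "cancel", *job_ids, f"--region={region}"],
            check=True)
    return job_ids

# Usage in this notebook:
# cancel_active_dataflow_jobs(REGION)
```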

# Clean up

When you no longer need the resources created by this notebook, you can delete them as follows.

**Note: if you do not delete the resources, you will continue to pay for them.**

## Delete BigQuery datasets

In this chapter, we created a BigQuery dataset outside this notebook, so it's best to delete it from the [BigQuery console](https://console.cloud.google.com/bigquery). In the console, click on the name of your project to see the datasets in that project. Then, click the three vertical dots next to the name of the dataset, and click `Delete`.
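If you prefer to delete the dataset programmatically instead, the following is a minimal sketch that assumes the `google-cloud-bigquery` client library is installed. `delete_bq_dataset` is a hypothetical helper; it permanently deletes the dataset together with every table in it, so double-check the names before running it.

```python
def delete_bq_dataset(client, project_id, dataset_id):
    """Delete a BigQuery dataset and all of its tables (hypothetical helper).

    `client` is expected to be a google.cloud.bigquery.Client.
    """
    # delete_contents=True also removes the tables inside the dataset;
    # not_found_ok=True makes the call safe to repeat.
    client.delete_dataset(
        f"{project_id}.{dataset_id}", delete_contents=True, not_found_ok=True
    )

# Usage in this notebook (requires google-cloud-bigquery):
# from google.cloud import bigquery
# delete_bq_dataset(bigquery.Client(project=PROJECT_ID), PROJECT_ID, DATASET_ID)
```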

## Delete GCS Bucket

The bucket can be reused throughout multiple activities in the book. Sometimes, activities in certain chapters make use of artifacts from previous chapters that are stored in the GCS bucket. 

I highly recommend **not deleting the bucket** unless you will be performing no further activities in the book. For this reason, there's a separate `delete_bucket` variable to specify if you want to delete the bucket.

If you want to delete the bucket, set the `delete_bucket` variable to `True` in the next cell.

In [None]:
delete_bucket = False

In [None]:
if delete_bucket:
    # Delete the bucket
    ! gcloud storage rm --recursive gs://$BUCKET
else:
    print("delete_bucket parameter is set to False")