##### Copyright 2020 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# How to run the examples on Google Cloud Dataflow

The guide at <TODO: URL> describes how to convert a pipeline assembled in the notebook environment into one that can be run on Dataflow. This notebook shows how the pipeline in the [First Word Count example](01-Word_Count.ipynb) can be run with the Dataflow Runner instead of the Interactive Runner.

Before you start, make sure the Dataflow API is enabled for your project in the [API Library](https://console.cloud.google.com/apis/library/dataflow.googleapis.com).

### Assembling the pipeline for Dataflow Runner

First, we remove all "interactive" imports and import the Dataflow runner:

In [None]:
import re
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
import google.auth

#### Setting up pipeline options

We now set up the pipeline options for running in Dataflow. For details on the pipeline options for Dataflow, please visit the [Cloud Dataflow documentation](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options).

In [None]:
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow will run.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Tells Dataflow that we are running the job from a notebook environment.
options.view_as(GoogleCloudOptions).labels = (
    ['goog-dataflow-notebook=' + beam.version.__version__.replace('.', '_')])

# Because this notebook comes with a locally built version of the Beam Python SDK, we will need to set
# the sdk_location option for the Dataflow Runner. You will not need to do this if you are using an
# officially released version of Apache Beam.
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
    beam.version.__version__)
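
To make the label and `sdk_location` strings concrete, the following sketch applies the same formatting to a made-up version string. The value `'2.25.0.dev'` and the two helper names are assumptions for illustration only; in the cell above the version comes from `beam.version.__version__`. The dots are replaced with underscores because Google Cloud label values may contain only lowercase letters, digits, underscores, and dashes.

```python
# Hypothetical version string; the notebook reads beam.version.__version__ instead.
example_version = '2.25.0.dev'


def notebook_label(version):
    """Builds the notebook label the same way the cell above does."""
    return 'goog-dataflow-notebook=' + version.replace('.', '_')


def sdk_tarball_path(version):
    """Fills the version into the sdk_location template used above."""
    return ('/root/apache-beam-custom/packages/beam/sdks/python/dist/'
            'apache-beam-%s0.tar.gz' % version)


print(notebook_label(example_version))
print(sdk_tarball_path(example_version))
```

Printing these two values before submitting a job is a cheap way to confirm that the tarball path matches a file that actually exists in your environment.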


*IMPORTANT*: Please adjust the following code to choose a Google Cloud Storage (GCS) location for Dataflow files.

In [None]:
# IMPORTANT! Please adjust the following to choose a GCS location.
dataflow_gcs_location = 'gs://<my_bucket>/dataflow'

The staging location is for storing files that will be copied to Dataflow workers, including code that will be executed by Dataflow workers. The temporary location is for storing temporary files generated by the Dataflow job.

In [None]:
# Dataflow Staging Location.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Dataflow Temp Location.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
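
A common mistake is to leave the `<my_bucket>` placeholder in place. The sketch below derives the staging and temp paths in the same way as the cell above and rejects a root that is not a usable GCS path. The helper name `dataflow_locations` and the bucket `example-bucket` are hypothetical and not part of Apache Beam.

```python
def dataflow_locations(gcs_root):
    """Returns the staging and temp paths derived from one GCS root."""
    # Reject roots that are not GCS paths or still contain the placeholder.
    if not gcs_root.startswith('gs://') or '<' in gcs_root:
        raise ValueError('Please set a real GCS path, got: %r' % gcs_root)
    root = gcs_root.rstrip('/')
    return {
        'staging_location': '%s/staging' % root,
        'temp_location': '%s/temp' % root,
    }


# Hypothetical bucket name, used only for illustration.
for name, path in dataflow_locations('gs://example-bucket/dataflow').items():
    print(name, '->', path)
```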

#### Adding PTransforms and PCollections to the pipeline

The following class definition is the same as the one in the [original Word Count](01-Word_Count.ipynb) example.

In [None]:
class ReadWordsFromText(beam.PTransform):
    
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern
    
    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
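
To see what the `FlatMap` step does to a single line, the same regular expression can be tried outside of Beam. This is a plain-Python sketch; the `tokenize` helper and the sample lines are assumptions chosen for illustration.

```python
import re


def tokenize(line):
    """Splits a line into words, keeping apostrophes inside words."""
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)


# Sample lines chosen for illustration.
sample_lines = [
    "  Pray you, let's hit together.  ",
    'KING LEAR: Nothing will come of nothing.',
]

for line in sample_lines:
    print(tokenize(line))
```

Punctuation and surrounding whitespace are dropped, while a contraction such as `let's` stays a single token because the character class includes the apostrophe.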

The following creates an Apache Beam pipeline that uses the *Dataflow Runner* with the options we just set up, instead of the Interactive Runner used in the original example.

In [None]:
p = beam.Pipeline(DataflowRunner(), options=options)

The following assembles the pipeline. It is the same as the original example, but with all the interactive calls removed:

In [None]:
words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
counts = words | 'count' >> beam.combiners.Count.PerElement()
lower_counts = (words
                | "lower" >> beam.Map(lambda word: word.lower())
                | "lower_count" >> beam.combiners.Count.PerElement())
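
The difference between the two counting branches is easiest to see on a tiny input. The following plain-Python sketch uses `collections.Counter` as a stand-in for `Count.PerElement()`; it is not Beam code, and the word list and variable names are made up for illustration.

```python
from collections import Counter

# Made-up word list standing in for the `words` PCollection.
sample_words = ['Nothing', 'will', 'come', 'of', 'nothing']

# Case-sensitive counts, analogous to the `count` step.
local_counts = Counter(sample_words)

# Case-insensitive counts, analogous to `lower` followed by `lower_count`.
local_lower_counts = Counter(word.lower() for word in sample_words)

print(local_counts['Nothing'], local_counts['nothing'])
print(local_lower_counts['nothing'])
```

In the case-sensitive branch `Nothing` and `nothing` are separate keys, while the lower-cased branch merges them into one.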

#### Writing the results

Now, we want to write the results, contained in the PCollections `counts` and `lower_counts`, to GCS files:

In [None]:
# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location

# Specifying the GCS location to write `counts` to,
# based on the `output_gcs_location` variable set earlier.
(counts | 'Write counts to GCS' 
 >> beam.io.WriteToText(output_gcs_location + '/wordcount-output.txt'))

# Specifying the GCS location to write `lower_counts` to,
# based on the `output_gcs_location` variable set earlier.
(lower_counts | 'Write lower counts to GCS' 
 >> beam.io.WriteToText(output_gcs_location + '/wordcount-lower-output.txt'))

### Running the pipeline

Now we are ready to run the pipeline on Dataflow. `p.run()` submits the pipeline as a Dataflow job and returns a pipeline result object. You can ignore any warnings it prints.

In [None]:
pipeline_result = p.run()

Using the `pipeline_result` handle, the following code builds a link to the Google Cloud Console web page that shows you details of the Dataflow job you just started:

In [None]:
# `display` and `HTML` are part of the public `IPython.display` module.
from IPython.display import display, HTML
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' % 
      (pipeline_result._job.location, pipeline_result._job.id, pipeline_result._job.projectId))
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))
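
The URL above is built from the job's location, ID, and project, in that order. The sketch below shows the same string formatting with placeholder values so the resulting shape is visible without a running job. The helper name and all three values are hypothetical.

```python
def dataflow_job_url(location, job_id, project_id):
    """Formats a Cloud Console URL for a Dataflow job, as in the cell above."""
    return ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s'
            % (location, job_id, project_id))


# Placeholder values, for illustration only.
print(dataflow_job_url('us-central1', 'example-job-id', 'example-project'))
```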

Let's wait for the job to finish. The following call will block until the job is finished. It will take a few minutes.

In [None]:
pipeline_result.wait_until_finish()

### Checking the results

Now that the job is finished, we can check the results in GCS using the [`gsutil`](https://cloud.google.com/storage/docs/gsutil) command-line tool. Note that `beam.io.WriteToText` writes the results in a sharded set of output files. For example, if the output is specified as `gs://my_bucket/output_directory/result.txt`, the results will be written in files with names like `gs://my_bucket/output_directory/result.txt-<shard>-of-<number-of-shards>`. Let's check that.

In [None]:
!gsutil ls {output_gcs_location}
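
The listing should show one file per shard. As a rough guide to what the names look like, the sketch below generates them for a given prefix, assuming the five-digit, zero-padded `-<shard>-of-<number-of-shards>` suffix that `WriteToText` uses when no custom shard name template is set. The `shard_names` helper is hypothetical, and the actual number of shards is chosen by the runner.

```python
def shard_names(prefix, num_shards):
    """Lists the expected shard file names for an output prefix."""
    return ['%s-%05d-of-%05d' % (prefix, shard, num_shards)
            for shard in range(num_shards)]


for name in shard_names('gs://my_bucket/output_directory/result.txt', 3):
    print(name)
```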

Now let's check the content of the output by looking at the first 10 lines of each result.

In [None]:
!gsutil cat {output_gcs_location}/wordcount-output.txt* | head -10

In [None]:
!gsutil cat {output_gcs_location}/wordcount-lower-output.txt* | head -10
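
If you want to process the output further, each line can be parsed back into a `(word, count)` pair. The sketch below assumes that every line is the Python representation of a tuple, such as `('nothing', 2)`, which is how the elements appear when they are written as text without a custom format. The sample lines, their counts, and the helper name are made up for illustration.

```python
import ast

# Made-up sample lines in the assumed "('word', count)" format.
sample_lines = ["('nothing', 2)", "('will', 1)", '("let\'s", 1)']


def parse_count_line(line):
    """Parses one output line back into a (word, count) tuple."""
    word, count = ast.literal_eval(line.strip())
    return word, int(count)


parsed = [parse_count_line(line) for line in sample_lines]

# Sort by count, highest first, to find the most frequent word.
top = sorted(parsed, key=lambda pair: pair[1], reverse=True)
print(top[0])
```

`ast.literal_eval` is used instead of `eval` because it only accepts Python literals, which is safer for text read from files.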

That's it! You can use the same technique to launch Dataflow jobs for the other examples.