## Dataflow with Python

GCP Dataflow is built on the open-source Apache Beam project, which provides a unified model for data processing. As with Spark's batch processing and Structured Streaming, the same tool handles both batch and streaming workloads.

To use Dataflow in a real project, we therefore need to learn Apache Beam. Its transformation logic closely resembles Spark RDD transformations.

This tutorial uses Python.

In [1]:
# First, install apache-beam with the GCP extras
! pip install apache-beam[gcp] --quiet


In [2]:
# Once the installation has finished, we can verify it by running the bundled
# wordcount example from the command line.
! python -m apache_beam.examples.wordcount --output outputs

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
Connecting anonymously.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f411302a198> for environment urn: "beam:env:embedded_python:v1"

INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((ref_AppliedPTransform_read/Read/_SDFBoundedSourceWrapper/Impulse_5)+(read/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(read/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_1_split/Write)
INFO:apache_beam.runners.p

In [3]:
# The result is written to a file whose name starts with `outputs`
import os

os.listdir('.')

['.config',
 'outputs-00000-of-00001',
 'CloudTutorial-e8d69ceedc4d.json',
 'sample_data']

In [4]:
# Let's read a few sample lines to show the result
file_name = [x for x in os.listdir('.') if x.startswith('outputs')][0]
with open(file_name) as f:
  data = f.readlines()

# As we can see, the result file holds one word count per line.
data[:10]

['KING: 243\n',
 'LEAR: 236\n',
 'DRAMATIS: 1\n',
 'PERSONAE: 1\n',
 'king: 65\n',
 'of: 447\n',
 'Britain: 2\n',
 'OF: 15\n',
 'FRANCE: 10\n',
 'DUKE: 3\n']

In [5]:
print("How many words:", len(data))

How many words: 4784
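
Since every line of the result has the form `word: count`, it is easy to turn the raw lines into a dictionary and rank the words. The sketch below uses only the standard library. The helper names `parse_counts` and `top_words` are made up for illustration, and the sample list copies a few lines from the output shown above.

```python
def parse_counts(lines):
    """Turn 'word: count' lines into a {word: count} dictionary."""
    counts = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # Split on the last ': ' so the word part is kept whole.
        word, count = line.rsplit(': ', 1)
        counts[word] = int(count)
    return counts


def top_words(counts, n=3):
    """Return the n most frequent (word, count) pairs, highest count first."""
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:n]


if __name__ == '__main__':
    sample = ['KING: 243\n', 'LEAR: 236\n', 'DRAMATIS: 1\n', 'of: 447\n']
    print(top_words(parse_counts(sample)))
```

In the notebook, `parse_counts(data)` would give the full dictionary for the file read in the previous cell.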


In [6]:
%%writefile wordcount.py

# This is the main word-count logic with Beam.
from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""
  def __init__(self):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super(WordExtractingDoFn, self).__init__()
    beam.DoFn.__init__(self)
    self.words_counter = Metrics.counter(self.__class__, 'words')
    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
    self.word_lengths_dist = Metrics.distribution(
        self.__class__, 'word_len_dist')
    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')

  def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text.  If the line is blank, note that, too.

    Args:
      element: the element being processed

    Returns:
      The processed element.
    """
    text_line = element.strip()
    if not text_line:
      self.empty_line_counter.inc(1)
    words = re.findall(r'[\w\']+', text_line, re.UNICODE)
    for w in words:
      self.words_counter.inc()
      self.word_lengths_counter.inc(len(w))
      self.word_lengths_dist.update(len(w))
    return words


def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  # Read the text file[pattern] into a PCollection.
  lines = p | 'read' >> ReadFromText(known_args.input)

  # Count the occurrences of each word.
  def count_ones(word_ones):
    (word, ones) = word_ones
    return (word, sum(ones))

  # Here is where we can define our processing logic.
  counts = (
      lines
      | 'split' >>
      (beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))
      | 'lowercase' >> beam.Map(lambda x: str(x).lower())
      | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
      | 'group' >> beam.GroupByKey()
      | 'count' >> beam.Map(count_ones))

  # Format the counts into a PCollection of strings.
  def format_result(word_count):
    (word, count) = word_count
    return '%s: %d' % (word, count)

  output = counts | 'format' >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  # pylint: disable=expression-not-assigned
  output | 'write' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()

  # Do not query metrics when creating a template which doesn't run
  if (not hasattr(result, 'has_job')  # direct runner
      or result.has_job):  # not just a template creation
    empty_lines_filter = MetricsFilter().with_name('empty_lines')
    query_result = result.metrics().query(empty_lines_filter)
    if query_result['counters']:
      empty_lines_counter = query_result['counters'][0]
      logging.info('number of empty lines: %d', empty_lines_counter.result)

    word_lengths_filter = MetricsFilter().with_name('word_len_dist')
    query_result = result.metrics().query(word_lengths_filter)
    if query_result['distributions']:
      word_lengths_dist = query_result['distributions'][0]
      logging.info('average word length: %d', word_lengths_dist.result.mean)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Writing wordcount.py
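
The tokenization inside `WordExtractingDoFn.process` is a plain regular expression, so it can be tried outside of any pipeline. The sketch below repeats the same pattern in a standalone helper. The name `extract_words` is hypothetical, and the metrics counters of the real `DoFn` are left out.

```python
import re


def extract_words(line):
    """Split a line into words made of word characters and apostrophes."""
    # Same pattern as in wordcount.py; on Python 3, str patterns are
    # Unicode-aware by default, so re.UNICODE is not needed here.
    return re.findall(r"[\w']+", line.strip())


if __name__ == '__main__':
    print(extract_words("use the training package's data-loading code"))
```

Because a hyphen is not part of the pattern, a term such as `data-loading` is split into two words, while the apostrophe keeps `package's` together.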


In [7]:
# Now we can run our own file. Here I changed the logic to convert each word to lowercase.
# Let's try the logic locally again.
! python wordcount.py --output  outputs_new


INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
Connecting anonymously.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f300f365198> for environment urn: "beam:env:embedded_python:v1"

INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((ref_AppliedPTransform_read/Read/_SDFBoundedSourceWrapper/Impulse_5)+(read/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(read/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_1_split/Write)
INFO:apache_beam.runners.p

In [8]:
# Let's read a few sample lines to show the result
file_name = [x for x in os.listdir('.') if x.startswith('outputs_new')][0]
with open(file_name) as f:
  data_new = f.readlines()

# As we can see, the result file holds one word count per line,
# and every word is now lowercase.
data_new[:10]

['king: 311\n',
 'lear: 253\n',
 'dramatis: 1\n',
 'personae: 1\n',
 'of: 483\n',
 'britain: 2\n',
 'france: 32\n',
 'duke: 26\n',
 'burgundy: 20\n',
 'cornwall: 75\n']
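
The effect of the `lowercase` step is that entries differing only in case collapse into one. The sketch below shows that merge on a plain dictionary, using the standard library only. The helper name `merge_case` is made up for illustration. The first run listed `KING: 243` and `king: 65`, which sum to 308; the 311 reported above presumably also includes other capitalisations that are not among the first ten lines shown.

```python
from collections import Counter


def merge_case(counts):
    """Merge word counts whose keys differ only by letter case."""
    merged = Counter()
    for word, count in counts.items():
        merged[word.lower()] += count
    return dict(merged)


if __name__ == '__main__':
    print(merge_case({'KING': 243, 'king': 65, 'OF': 15, 'of': 447}))
```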

## Run the logic on GCP Dataflow

Now that we have tested our logic locally, we can run it on Dataflow. Note that for a file this small, the cloud run takes longer than the local one, because the job has to start workers first; with much more data, the cloud run pays off.

Because Dataflow is serverless, we can use as many resources as we need.

In [0]:
# Project configuration. The bucket must be created before running the job.
PROJECT_ID = "cloudtutorial-279003"
BUCKET = "dataflow_tutorial_bucket"
REGION = 'us-central1'

In [10]:
# Once the bucket exists, Dataflow reads its input from it, so
# let's create a file to put in the bucket.
text = """
Prepare input for prediction
To receive valid and useful predictions, you must preprocess input for prediction in the same way that training data was preprocessed. In a production system, you may want to create a preprocessing pipeline that can be used identically at training time and prediction time.

For this exercise, use the training package's data-loading code to select a random sample from the evaluation data. This data is in the form that was used to evaluate accuracy after each epoch of training, so it can be used to send test predictions without further preprocessing.

Open the Python interpreter (python) from your current working directory in order to run the next several snippets of code:
"""

file_name = 'sample.txt'

with open(file_name, 'w') as f:
  f.write(text)

print("Current folder file list:", os.listdir('.'))


Current folder file list: ['.config', 'outputs-00000-of-00001', 'wordcount.py', 'outputs_new-00000-of-00001', 'sample.txt', 'CloudTutorial-e8d69ceedc4d.json', 'sample_data']


In [0]:
# Before doing anything with the client, we have to set the credentials.
credentials_file = [x for x in os.listdir('.') if x.endswith('json')][0]

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_file

In [23]:
# Then let's upload the file to our bucket
from google.cloud import storage

client = storage.Client(project=PROJECT_ID)

bucket = client.bucket(BUCKET)

# where to store our file
des_file = 'data_flow_inputs/{}'.format(file_name)

# Let's upload
blob = bucket.blob(des_file)
try:
  # The destination is already set by bucket.blob(des_file); the second positional
  # argument of upload_from_filename is the content type, not a path.
  blob.upload_from_filename(file_name)
  print('The file has been uploaded to the bucket')
except Exception as e:
  raise RuntimeError("Error while uploading the file.") from e

The file has been uploaded to the bucket


In [24]:
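# Now that the data file is in the bucket, we can run the processing logic in the cloud.
# Note: this command runs the bundled example module; to run our own file instead,
# use `python wordcount.py` with the same flags.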
! python -m apache_beam.examples.wordcount \
--region $REGION \
--input gs://$BUCKET/data_flow_inputs/sample.txt \
--output gs://$BUCKET/data_flow_output/outputs \
--runner DataflowRunner \
--project $PROJECT_ID \
--temp_location gs://$BUCKET/tmp

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.21.0. If the image is not available at local, we will try to pull from hub.docker.com
INFO:apache_beam.runners.dataflow.internal.apiclient:Defaulting to the temp_location as staging_location: gs://dataflow_tutorial_bucket/tmp
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://dataflow_tutorial_bucket/tmp/beamapp-root-0602002917-401233.1591057757.402644/pipeline.pb...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://dataflow_tutorial_bucket/tmp/beamapp-root-0602002917-401233.1591057757.402644/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.portability.stager:Downloading source
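
The same flags can also be assembled in Python, which is convenient when the job is launched from code rather than from the shell. The sketch below only builds the argument list and does not submit anything. The helper name `build_dataflow_args` and its default paths are illustrative assumptions that mirror the command above.

```python
def build_dataflow_args(project, bucket, region,
                        input_path='data_flow_inputs/sample.txt',
                        output_prefix='data_flow_output/outputs'):
    """Build the command-line flags for a Dataflow wordcount run."""
    base = 'gs://{}'.format(bucket)
    return [
        '--runner', 'DataflowRunner',
        '--project', project,
        '--region', region,
        '--input', '{}/{}'.format(base, input_path),
        '--output', '{}/{}'.format(base, output_prefix),
        # Dataflow stages the pipeline and temporary files here.
        '--temp_location', '{}/tmp'.format(base),
    ]


if __name__ == '__main__':
    args = build_dataflow_args('cloudtutorial-279003',
                               'dataflow_tutorial_bucket', 'us-central1')
    print(' '.join(args))
```

Since `run(argv)` in `wordcount.py` hands `argv` to `parse_known_args`, passing this list to it should submit our own version of the pipeline, provided the credentials are set as in the earlier cell.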

In [27]:
# After the job has finished, we can check that the result was written
# to our bucket
output_folder = 'data_flow_output'

def list_bucket_files(prefix):
  blobs = client.list_blobs(BUCKET, prefix=prefix)

  print('Get file list:')
  for blob in blobs:
    print(blob.name, end='\t')
  print()

# We can see that the output files are in the bucket.
list_bucket_files(output_folder)

Get file list:
data_flow_output/outputs-00000-of-00003	data_flow_output/outputs-00001-of-00003	data_flow_output/outputs-00002-of-00003	
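
The job wrote its result as three shards, named with the `-SSSSS-of-NNNNN` suffix that also appeared in the local runs. A small check that no shard is missing can be sketched with a regular expression over the listed names. The helper name `missing_shards` is hypothetical, and the pattern assumes the five-digit suffix seen in the listing above.

```python
import re

# Matches names such as 'data_flow_output/outputs-00000-of-00003'.
SHARD_RE = re.compile(r'^(?P<prefix>.*)-(?P<index>\d{5})-of-(?P<total>\d{5})$')


def missing_shards(names):
    """Return the sorted shard indices absent from a list of shard names."""
    seen = set()
    total = None
    for name in names:
        match = SHARD_RE.match(name)
        if not match:
            continue  # ignore files that are not shards
        seen.add(int(match.group('index')))
        total = int(match.group('total'))
    if total is None:
        raise ValueError('no shard file names found')
    return sorted(set(range(total)) - seen)


if __name__ == '__main__':
    names = ['data_flow_output/outputs-00000-of-00003',
             'data_flow_output/outputs-00002-of-00003']
    print(missing_shards(names))
```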


In [0]:
# To run queries from the command line, we first have to authenticate the notebook
from google.colab import auth
auth.authenticate_user()

## Check result

Now that all the steps have finished, let's check our result.

In [35]:
# Let's print the contents of these files.
! gsutil cat gs://$BUCKET/data_flow_output/outputs*

preprocessed: 1
without: 1
predictions: 2
test: 1
may: 1
form: 1
order: 1
each: 1
data: 4
This: 1
so: 1
system: 1
used: 3
further: 1
way: 1
pipeline: 1
receive: 1
For: 1
python: 1
exercise: 1
of: 2
the: 6
To: 1
run: 1
next: 1
working: 1
epoch: 1
is: 1
same: 1
was: 2
be: 2
training: 4
time: 2
select: 1
input: 2
that: 3
use: 1
directory: 1
after: 1
preprocessing: 2
at: 1
to: 5
package's: 1
snippets: 1
can: 2
In: 1
loading: 1
it: 1
identically: 1
this: 1
evaluation: 1
your: 1
want: 1
accuracy: 1
code: 2
interpreter: 1
from: 2
production: 1
prediction: 3
valid: 1
Open: 1
current: 1
several: 1
in: 3
sample: 1
Prepare: 1
preprocess: 1
create: 1
and: 2
must: 1
evaluate: 1
send: 1
a: 3
random: 1
useful: 1
for: 2
you: 2
Python: 1
