# Introduction to Python Dataflow
This notebook walks step by step through constructing a Dataflow pipeline that reads JSON-formatted sales records from a text file and writes per-product sales totals to a BigQuery table.

## Prerequisites

This notebook assumes that you have the latest Python Dataflow SDK installed from https://github.com/GoogleCloudPlatform/DataflowPythonSDK/releases

For instance, to install the v0.2.2 release (currently the latest), use the download link associated with the release:
`pip install https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive/v0.2.2.zip`


We read data from an `input_file` and write results to an `output_file`. Later, we will read from a set of files and write to a BigQuery table.

In [1]:
input_file = './datain'
output_file = './dataout'

To keep the notebook self-contained we generate our own input data. 

In [2]:
def generate_data(outfile, size):
    import json
    import time
    with open(outfile, 'wt') as f:  
        for _ in xrange(size):
            for ix in xrange(10):
                f.write('%s\n' % json.dumps(
                        {'ProductID': 1 + ix, 'ProductName': 'Product(%s)' % ix, 
                         'Price': 1000, 'Timestamp': time.time()}))

generate_data(input_file, 100)
!head -3 $input_file    

{"Timestamp": 1460157827.874169, "Price": 1000, "ProductName": "Product(0)", "ProductID": 1}
{"Timestamp": 1460157827.87439, "Price": 1000, "ProductName": "Product(1)", "ProductID": 2}
{"Timestamp": 1460157827.874413, "Price": 1000, "ProductName": "Product(2)", "ProductID": 3}
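
The generator above uses `xrange`, which exists only in Python 2. As a hedged aside, a variant that also runs on Python 3 might look like the sketch below. The name `generate_data_py3` and the use of a temporary directory are choices made here for illustration; they are not part of the notebook's pipeline.

```python
import json
import os
import tempfile
import time


def generate_data_py3(outfile, size):
    """Write `size` rounds of 10 product records, one JSON object per line."""
    with open(outfile, 'w') as f:
        for _ in range(size):
            for ix in range(10):
                record = {'ProductID': 1 + ix,
                          'ProductName': 'Product(%s)' % ix,
                          'Price': 1000,
                          'Timestamp': time.time()}
                f.write(json.dumps(record) + '\n')


# Write to a throwaway location so the notebook's own files are untouched.
sample_path = os.path.join(tempfile.mkdtemp(), 'datain')
generate_data_py3(sample_path, 100)

with open(sample_path) as f:
    records = [json.loads(line) for line in f]
```

With `size=100` the file holds 1000 records, 100 per product, so each product's prices add up to 100000. That is the per-product total the aggregation cells later in the notebook report.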


First, a standard Dataflow import statement. Most objects are in the `df` or `df.io` namespace.

In [3]:
import google.cloud.dataflow as df

There are three steps involved in creating and running a pipeline:
* Create the `Pipeline` object
* Create the graph of data transforms
* Run the pipeline graph

The graph of transforms is sometimes called a workflow, so running a workflow is the same thing as running a pipeline.
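
To make the three steps concrete without depending on the SDK, here is a minimal plain-Python sketch. `ToyPipeline`, `apply`, and `run` are hypothetical names invented for this illustration and are not the Dataflow API. The only point is that building the graph records transforms, and nothing executes until the pipeline is run.

```python
class ToyPipeline(object):
    """Hypothetical stand-in for a pipeline: records steps, runs them later."""

    def __init__(self, source):
        self.source = source      # any iterable of input elements
        self.steps = []           # a linear "graph" of transforms

    def apply(self, fn):
        # Graph construction: only record the transform, do not execute it.
        self.steps.append(fn)
        return self

    def run(self):
        # Execution: push the source through every recorded transform.
        data = list(self.source)
        for fn in self.steps:
            data = [fn(x) for x in data]
        return data


calls = []


def double(x):
    calls.append(x)               # lets us observe when execution happens
    return 2 * x


toy = ToyPipeline([1, 2, 3]).apply(double)   # create the pipeline, build the graph
calls_before_run = list(calls)               # still empty: nothing has run yet
result = toy.run()                           # run the graph
```

Here `calls_before_run` stays empty and `result` is `[2, 4, 6]`, which mirrors why a separate run step is needed after the graph is built.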

__Step 1. Create a `Pipeline` object__

In [4]:
p = df.Pipeline('DirectPipelineRunner')

__Step 2. Build the graph of transforms__

The following workflow is essentially a file copy. A `Read` transform reads a text file source line by line, and the resulting PCollection is written to a text file sink using a `Write` transform.

In [5]:
(p 
   | df.io.Read(df.io.TextFileSource(input_file))
   | df.io.Write(df.io.TextFileSink(output_file)))

<PValue transform=<_NativeWrite(PTransform) label=[native_write]> at 0x10bdd0450>

__Step 3. Run the pipeline__

In [6]:
p.run()

<google.cloud.dataflow.runners.runner.PipelineResult at 0x1039b4c50>

As expected the output file contains the same JSON-formatted sales records.

In [7]:
!head -3 $output_file

{"Timestamp": 1460157827.874169, "Price": 1000, "ProductName": "Product(0)", "ProductID": 1}
{"Timestamp": 1460157827.87439, "Price": 1000, "ProductName": "Product(1)", "ProductID": 2}
{"Timestamp": 1460157827.874413, "Price": 1000, "ProductName": "Product(2)", "ProductID": 3}


It is possible to build a graph that is not associated with a pipeline. Later you can run it using a pipeline instance.

In [8]:
graph = (
    df.io.Read(df.io.TextFileSource(input_file))
    | df.io.Write(df.io.TextFileSink(output_file)))

This is followed by the code needed to run the graph in a pipeline and view the result.

In [9]:
p = df.Pipeline('DirectPipelineRunner')
p | graph
p.run()
!head -3 $output_file

{"Timestamp": 1460157827.874169, "Price": 1000, "ProductName": "Product(0)", "ProductID": 1}
{"Timestamp": 1460157827.87439, "Price": 1000, "ProductName": "Product(1)", "ProductID": 2}
{"Timestamp": 1460157827.874413, "Price": 1000, "ProductName": "Product(2)", "ProductID": 3}
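
The `|` syntax is ordinary Python operator overloading. The sketch below is a hypothetical illustration of how transforms can be chained with `|` into a detached graph and applied to data later. The classes `ToyTransform` and `ToyChain` are made up here and are far simpler than the SDK's `PTransform`.

```python
class ToyTransform(object):
    """Hypothetical transform wrapping a function over a whole list."""

    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        # transform | transform -> a chain that applies both in order
        return ToyChain([self, other])

    def __ror__(self, data):
        # data | transform -> apply the transform to the data
        return self.fn(data)


class ToyChain(ToyTransform):
    """A sequence of transforms that behaves like a single transform."""

    def __init__(self, parts):
        self.parts = parts

    def __or__(self, other):
        return ToyChain(self.parts + [other])

    def __ror__(self, data):
        for part in self.parts:
            data = data | part
        return data


strip = ToyTransform(lambda lines: [line.strip() for line in lines])
upper = ToyTransform(lambda lines: [line.upper() for line in lines])

toy_graph = strip | upper                  # built with no data attached
toy_output = [' a ', ' b '] | toy_graph    # applied later
```

Because a plain list does not define `|` itself, Python falls back to the transform's `__ror__`, which is what makes the `data | transform` spelling work in this sketch.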


Since the code above for running the workflow will be repeated a lot, we define an IPython cell magic to use instead.

__NOTE__ Not sure this magic adds enough value to keep it in. __TBD__

In [10]:
from IPython.core.magic import register_cell_magic
@register_cell_magic
def dataflow_run(line, cell):
    p = df.Pipeline('DirectPipelineRunner')
    p | eval(cell)
    p.run()
    !head -3 $output_file

Now we can simply prefix the graph construction code with `%%dataflow_run`.

In [11]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.io.Write(df.io.TextFileSink(output_file)))

{"Timestamp": 1460157827.874169, "Price": 1000, "ProductName": "Product(0)", "ProductID": 1}
{"Timestamp": 1460157827.87439, "Price": 1000, "ProductName": "Product(1)", "ProductID": 2}
{"Timestamp": 1460157827.874413, "Price": 1000, "ProductName": "Product(2)", "ProductID": 3}


Let's parse the JSON-formatted input records. A `Read` transform using a `TextFileSource` returns a PCollection of the lines in the input file. The `parse_record` function below returns a 2-tuple (product ID, price) for each record.

In [12]:
def parse_record(e):
    import json
    r = json.loads(e)
    return r['ProductID'], r['Price']

We use the function above in a ParDo transform, which applies a custom element-wise computation to a PCollection. There are three flavors of ParDo: Map, FlatMap, and ParDo itself. Map and FlatMap are simplified versions that take a function whose first argument is the element to be processed. A Map function is a one-to-one mapping: its return value becomes the single output element. A FlatMap function is a one-to-many mapping (including zero): it must return an iterable containing the resulting elements. A full ParDo example follows a little later.

In [13]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.Map(parse_record)
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 1000)
(2, 1000)
(3, 1000)


If we want to use FlatMap instead of Map in the workflow above, we have to modify the `parse_record` function to return an iterable instead of a single element, since the result of a FlatMap function is iterated to produce zero or more output elements.

In [14]:
def parse_record(e):
    import json
    r = json.loads(e)
    yield r['ProductID'], r['Price']

In [15]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record)
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 1000)
(2, 1000)
(3, 1000)
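
The difference between the two flavors can be mimicked in plain Python. This is only a sketch of the semantics described above. The helpers `toy_map` and `toy_flat_map` and the sample lines are invented for illustration and do not use the SDK.

```python
import json


def toy_map(fn, elements):
    # One output per input: the return value is the output element.
    return [fn(e) for e in elements]


def toy_flat_map(fn, elements):
    # Zero or more outputs per input: the return value is iterated.
    return [out for e in elements for out in fn(e)]


def parse_one(line):
    r = json.loads(line)
    return r['ProductID'], r['Price']


def parse_many(line):
    r = json.loads(line)
    if r['Price'] > 0:            # yielding nothing drops the element
        yield r['ProductID'], r['Price']


lines = ['{"ProductID": 1, "Price": 1000}', '{"ProductID": 2, "Price": 0}']

mapped = toy_map(parse_one, lines)
flat_mapped = toy_flat_map(parse_many, lines)

# Pitfall: a function that returns a bare tuple, used with flat-map
# semantics, has that tuple iterated and its members emitted one by one.
flattened_by_mistake = toy_flat_map(parse_one, lines)
```

`mapped` keeps both records, `flat_mapped` keeps only the first, and `flattened_by_mistake` comes out as `[1, 1000, 2, 0]`. This is why the FlatMap version of `parse_record` uses `yield` rather than `return`.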


Using a `ParDo` transform requires a bit more code but is essential if the element-wise processing does more than just manipulate data. We will not cover these use cases in this notebook. We just show the basic steps of defining a `DoFn` and using it in a `ParDo` transform.

In [16]:
class ParseRecordDoFn(df.DoFn):
    def process(self, context):
        import json
        r = json.loads(context.element)
        yield r['ProductID'], r['Price']       

In [17]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.ParDo(ParseRecordDoFn())
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 1000)
(2, 1000)
(3, 1000)
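
For readers who want to see the calling convention in isolation, here is a plain-Python sketch of how a `process` method of this shape could be driven. `ToyContext`, `ToyParseRecordDoFn`, and `toy_par_do` are hypothetical stand-ins. The real SDK's context object and execution machinery are more involved.

```python
import json


class ToyContext(object):
    """Hypothetical stand-in for the context passed to `process`."""

    def __init__(self, element):
        self.element = element


class ToyParseRecordDoFn(object):
    def process(self, context):
        r = json.loads(context.element)
        yield r['ProductID'], r['Price']


def toy_par_do(do_fn, elements):
    # Call process() once per element and collect everything it yields.
    outputs = []
    for e in elements:
        outputs.extend(do_fn.process(ToyContext(e)))
    return outputs


toy_records = ['{"ProductID": 1, "Price": 1000}',
               '{"ProductID": 2, "Price": 1000}']
parsed = toy_par_do(ToyParseRecordDoFn(), toy_records)
```

As with FlatMap, whatever `process` yields becomes zero or more output elements for that input.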


Now let's aggregate the sales for each product ID. Dataflow provides many combiner functions in the [combiners.py](https://github.com/GoogleCloudPlatform/DataflowPythonSDK/blob/master/google/cloud/dataflow/transforms/combiners.py) module. As with ParDo transforms, plain Python functions can be used as combiner functions as long as they are commutative and associative.
The grouping and combining transforms typically expect key-value pairs. In Python Dataflow these are simply represented as 2-tuples, e.g. `('a key', 'a value')` or `(1234, 'value')`.

In [18]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record)
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

(7, 100000)
(8, 100000)
(5, 100000)
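
The following plain-Python sketch shows what a per-key combine computes and why the combiner has to be commutative and associative. `toy_combine_per_key` and `mean` are hypothetical helpers written for this illustration and are not SDK functions.

```python
from collections import defaultdict


def toy_combine_per_key(combine_fn, pairs):
    # Group values by key, then reduce each group with combine_fn.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: combine_fn(values) for key, values in groups.items()}


pairs = [(1, 1000), (2, 1000), (1, 1000), (2, 1000), (1, 1000)]
totals = toy_combine_per_key(sum, pairs)

# A runner may combine partial results in any grouping and order.
# For sum, combining partial sums matches a single pass over all values.
values = [1, 2, 3]
sum_one_pass = sum(values)
sum_of_partials = sum([sum(values[2:]), sum(values[:2])])


# A naive mean does not have this property, so it is not a safe combiner.
def mean(vs):
    return sum(vs) / float(len(vs))


mean_one_pass = mean(values)
mean_of_partials = mean([mean(values[:2]), mean(values[2:])])
```

Both sums are 6, while the two means differ (2.0 versus 2.25). A function like this `mean` would give results that depend on how the runner happens to split the work.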


The functions used in Map and FlatMap are not limited to just one parameter. They can have an arbitrary number of positional and keyword arguments evaluated when the transform gets created. For example, let's say the `parse_record` function has a `filtered` argument for products that should be skipped.

__Note__. Each argument expression is evaluated _once_, at pipeline construction time. Do not write code that expects the argument expression to be evaluated per function call or, for cloud execution, per worker startup. For example, if the argument expression is a call such as `random.randint(0, 9)`, then the same random number will be used for all calls.

In [19]:
filtered = [2, 3, 4, 5, 6, 7, 8, 9, 10]

def parse_record(e, filtered):
    import json
    r = json.loads(e)
    if int(r['ProductID']) not in filtered:
        yield r['ProductID'], r['Price']

In [20]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record, filtered)
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 100000)
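
The note about construction-time evaluation is plain Python argument semantics, and it can be reproduced without the SDK. In this sketch `toy_map_with_args` and `add_offset` are hypothetical helpers.

```python
import random


def add_offset(element, offset):
    return element + offset


def toy_map_with_args(fn, elements, *args):
    # args were already evaluated by the caller, before any element is seen.
    return [fn(e, *args) for e in elements]


random.seed(0)

# random.randint(...) runs exactly once, here, when the "transform" is built.
shifted = toy_map_with_args(add_offset, [0, 0, 0, 0],
                            random.randint(1, 1000000))


# To get a fresh number per element, the call has to live inside the function.
def add_random_offset(element):
    return element + random.randint(1, 1000000)


per_element = [add_random_offset(0) for _ in range(4)]
```

All four entries of `shifted` are the same number, whereas `per_element` will typically hold four different ones.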


The extra arguments to Map/FlatMap functions can also be _deferred_. Such arguments are specified by applying a _view_ operation to a PCollection. The available view operations are AsSingleton, AsIter, AsList, and AsDict. They are all defined in [pvalue.py](https://github.com/GoogleCloudPlatform/DataflowPythonSDK/blob/master/google/cloud/dataflow/pvalue.py).

In [21]:
from google.cloud.dataflow import pvalue

In [22]:
p = df.Pipeline('DirectPipelineRunner')
filtered_pcoll = p | df.Create(filtered)
(p 
 | df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record, pvalue.AsIter(filtered_pcoll))
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

p.run()
!head -3 $output_file

(1, 100000)
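
As a rough mental model only, the four views can be thought of as different ways of handing a whole collection to the function. The sketch below assumes the views mean what their names suggest. The helpers `as_iter`, `as_list`, `as_dict`, and `as_singleton` are hypothetical plain-Python functions, and the SDK's actual materialization works differently.

```python
class ToyIterable(object):
    """Supports iteration only (no len(), no indexing)."""

    def __init__(self, items):
        self._items = list(items)

    def __iter__(self):
        return iter(self._items)


def as_iter(pcoll):
    return ToyIterable(pcoll)


def as_list(pcoll):
    return list(pcoll)


def as_dict(pcoll):
    return dict(pcoll)            # elements must be (key, value) 2-tuples


def as_singleton(pcoll):
    items = list(pcoll)
    if len(items) != 1:
        raise ValueError('expected exactly one element, got %d' % len(items))
    return items[0]


skip = as_iter([2, 3])
kept = [pid for pid in [1, 2, 3, 4] if pid not in skip]

names = as_dict([(1, 'Product(0)'), (2, 'Product(1)')])
threshold = as_singleton([500])
```

The `not in skip` test works on an iteration-only object because Python falls back to iterating when `__contains__` is missing. `parse_record` relies on the same behavior when `filtered` arrives as an `AsIter` view.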


Processing graphs do not have to be linear; they are directed acyclic graphs (DAGs). Below is a workflow that reads the input file twice and then aggregates the results. The transform instances applied in a workflow must have unique labels. The framework generates labels automatically, but in some cases, like the two `Read` transforms below, a label must be passed in explicitly as an additional string argument. All `PTransform` constructors take an optional label as their first argument.

In [23]:
p = df.Pipeline('DirectPipelineRunner')
pc1 = p | df.io.Read('Read once', df.io.TextFileSource(input_file)) 
pc2 = p | df.io.Read('Read twice', df.io.TextFileSource(input_file)) 

((pc1, pc2) 
 | df.Flatten()
 | df.FlatMap(parse_record, filtered=[])
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

p.run()
!head -3 $output_file

(7, 200000)
(8, 200000)
(5, 200000)
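
The doubling of the totals follows directly from what flattening does. Here is a small plain-Python sketch of that step, with the hypothetical helper `toy_flatten` and made-up sample pairs.

```python
import itertools
from collections import Counter


def toy_flatten(*pcolls):
    # Merge several collections into one; no deduplication takes place.
    return list(itertools.chain(*pcolls))


branch1 = [(7, 1000), (8, 1000)]
branch2 = [(7, 1000), (8, 1000)]   # the same source read a second time

merged = toy_flatten(branch1, branch2)

flatten_totals = Counter()
for key, value in merged:
    flatten_totals[key] += value
```

Every record appears once per branch in `merged`, so each per-key sum is twice what a single read would give.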


Now let's write the resulting aggregation to a BigQuery table. You can read more about BigQuery here: https://cloud.google.com/bigquery/what-is-bigquery

In [24]:
output_table = 'silviuc-dataflow:demo.silviuc_demo'

We need to change two things. First, we replace the sink used in the `Write` transform with a `BigQuerySink`. Second, we insert a Map transform before the write to convert the resulting tuples to the dictionary format expected by the BigQuery sink. The link to the resulting table is printed when the workflow finishes.

In [25]:
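p = df.Pipeline('DirectPipelineRunner')
# Recreate the side input here: the earlier filtered_pcoll belongs to the previous pipeline.
filtered_pcoll = p | df.Create(filtered)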
(p 
 | df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record, pvalue.AsIter(filtered_pcoll))
 | df.CombinePerKey(sum)
 | df.Map(lambda (pr, v): {'ProductID': pr, 'Value': v}) 
 | df.io.Write(df.io.BigQuerySink(
            output_table,
            schema='ProductID:INTEGER, Value:FLOAT', 
            create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))) 

p.run()
print 'Resulting BigQuery table:'
print 'https://bigquery.cloud.google.com/table/%s?pli=1' % output_table

Resulting BigQuery table:
https://bigquery.cloud.google.com/table/silviuc-dataflow:demo.silviuc_demo?pli=1
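
The `lambda (pr, v): ...` form relies on tuple parameter unpacking, which exists only in Python 2 (Python 3 removed it with PEP 3113). If this conversion step ever needs to run on Python 3, a named function along these lines would do the same job. `to_row` is a hypothetical name chosen here.

```python
def to_row(kv):
    """Convert a (product_id, total) pair into a row dictionary."""
    product_id, total = kv        # explicit unpacking works on Python 2 and 3
    return {'ProductID': product_id, 'Value': total}


rows = [to_row(kv) for kv in [(1, 100000), (7, 100000)]]
```

The keys of each dictionary match the column names given in the sink's `schema` string.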


Now (finally!) let's run the same workflow in the Google Cloud using the Dataflow [service](https://cloud.google.com/dataflow/). We will need to do a few more preparations. First, we need to use a Google Cloud project to submit the workflow. Second, we need to have the input files staged somewhere in Google Cloud Storage. Jobs running in the Google Cloud will not be able to access files stored locally. All the data must be accessible from the cloud.

In [26]:
job_name = 'silviuc-demo'
project = 'silviuc-dataflow'
input_files = 'gs://silviuc-dataflow/demo/events*'
staging_location = 'gs://silviuc-dataflow/demo/staging'
temp_location = 'gs://silviuc-dataflow/demo/temp'

For cloud execution, we create a `Pipeline` object with the following arguments:
* `--job_name`: Arbitrary name for the job executing the workflow.
* `--project`: Name of the Google Cloud project used to run the job.
* `--staging_location`: A Google Cloud Storage location for staging files required to execute the workflow.
* `--temp_location`: A Google Cloud Storage location for temporary files created during workflow execution.
* `--no_save_main_session`: Required when running inside an IPython notebook to avoid pickling errors.
* `--runner`: The Google Cloud runner for the job. 
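
To see how a flat `argv` list like this maps onto named options, here is a sketch using the standard library's `argparse`. It is not the SDK's own option parser. The function `parse_toy_options`, the default runner value, and the way `--no_save_main_session` is modelled as a flag that switches off a `save_main_session` option are all assumptions made for illustration, as are the placeholder project and bucket names.

```python
import argparse


def parse_toy_options(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('--job_name')
    parser.add_argument('--project')
    parser.add_argument('--staging_location')
    parser.add_argument('--temp_location')
    # Assumed default, for this sketch only.
    parser.add_argument('--runner', default='DirectPipelineRunner')
    # Assumed modelling: the "--no_..." flag stores False into the option.
    parser.add_argument('--no_save_main_session', dest='save_main_session',
                        action='store_false', default=True)
    return parser.parse_args(argv)


opts = parse_toy_options([
    '--job_name', 'my-job',
    '--project', 'my-project',
    '--staging_location', 'gs://my-bucket/staging',
    '--temp_location', 'gs://my-bucket/temp',
    '--no_save_main_session',
    '--runner', 'BlockingDataflowPipelineRunner'])
```

Value-taking options consume the following list item, while the bare `--no_save_main_session` flag takes none. That is why it appears alone in the `argv` list.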


In [27]:
p = df.Pipeline(argv=[
        '--job_name', job_name,
        '--project', project, 
        '--staging_location', staging_location, 
        '--temp_location', temp_location,
        '--no_save_main_session',
        '--runner', 'BlockingDataflowPipelineRunner',])

The workflow code is the same for a local run and a cloud run; only the input location differs. Note that the source uses the variable `input_files` to reflect that a glob pattern selects multiple input files. Glob patterns can also be used for local files.
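
For local files, the effect of a pattern such as `events*` can be previewed with the standard library. This is only an approximation: the SDK performs its own pattern expansion, which may differ in details, and `glob` does not understand `gs://` paths. The file names below are made up.

```python
import glob
import os
import tempfile

# Create a few throwaway files to match against.
tmp_dir = tempfile.mkdtemp()
for name in ['events-000', 'events-001', 'other.txt']:
    with open(os.path.join(tmp_dir, name), 'w') as f:
        f.write('{}\n')

matches = sorted(glob.glob(os.path.join(tmp_dir, 'events*')))
matched_names = [os.path.basename(m) for m in matches]
```

Only the two `events-` files match, and `other.txt` is left out.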

In [28]:
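# Recreate the side input in the cloud pipeline; the earlier one belongs to a local pipeline.
filtered_pcoll = p | df.Create(filtered)
(p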
 | df.io.Read(df.io.TextFileSource(input_files))
 | df.FlatMap(parse_record, pvalue.AsIter(filtered_pcoll))
 | df.CombinePerKey(sum)
 | df.Map(lambda (pr, v): {'ProductID': pr, 'Value': v}) 
 | df.io.Write(df.io.BigQuerySink(
            output_table,
            schema='ProductID:INTEGER, Value:FLOAT', 
            create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))) 

print 'Follow workflow execution at: %s' % (
    'https://console.cloud.google.com/dataflow?project=%s' % project)
p.run()
print 'Resulting BigQuery table:'
print 'https://bigquery.cloud.google.com/table/%s?pli=1' % output_table

Follow workflow execution at: https://console.cloud.google.com/dataflow?project=silviuc-dataflow
Resulting BigQuery table:
https://bigquery.cloud.google.com/table/silviuc-dataflow:demo.silviuc_demo?pli=1
