- General Links
- What are Apache Beam and Google Dataflow? (the very basics)
- Apache Beam
- Dataflow
Note: References in [brackets] indicate PGB-specific examples of the indicated files/functions/etc. Most of these can be found at code path broker/beam/value_added/value_added.py or broker/beam/README.md.
- Apache Beam

  - `Apache Beam Overview/Tutorial <https://beam.apache.org/documentation/programming-guide/>`__ Extremely useful!
  - Apache Beam SDK (2.25.0) for Python

- Dataflow

  - Deploying a pipeline
  - `Apache Beam DataflowRunner <https://beam.apache.org/documentation/runners/dataflow/>`__
We use the Apache Beam Python SDK (v2.25 docs) to run data processing and storage pipelines. The Apache Beam Overview/Tutorial is very good.
Apache Beam pipelines can be run in a variety of environments; environment-specific "runners" handle the pipeline execution. We use the DataflowRunner to execute our pipelines in the Google Cloud using their Dataflow service (see also Deploying a pipeline). You can also use DirectRunner to execute the pipeline on a local machine (useful for testing).
Install the Beam Python SDK (including the Dataflow runner) with::

    pip install apache-beam[gcp]
Apache Beam runs data processing pipelines. Batch and streaming jobs are both supported under a single programming model. Which type of job is run depends only on the initial data source input to the pipeline (and we must set the pipeline's ``streaming`` option); the rest of the programming logic and syntax is agnostic.
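A minimal sketch of setting the streaming option (the option name is standard Beam; the flags our production jobs actually use are documented in broker/beam/README.md):

.. code-block:: python

    # Minimal sketch: enabling streaming mode on a pipeline's options.
    # The values here are illustrative, not our production settings.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()                         # could also parse sys.argv
    options.view_as(StandardOptions).streaming = True   # run as a streaming job

    with beam.Pipeline(options=options) as pipeline:
        ...  # add transforms here; the source determines batch vs. streaming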
Here we outline the essential concepts needed to program with Beam. Much of it is quoted directly from `Apache Beam Overview/Tutorial <https://beam.apache.org/documentation/programming-guide/>`__.
Programming guide: Create a pipeline
- The ``Pipeline`` abstraction encapsulates all the data and steps in the data processing task.
- The "driver program" [``value_added/value_added.py``] creates a ``Pipeline`` object [``pipeline``, within ``run()``] and uses it as the basis for creating the pipeline's data (PCollections) and its operations (PTransforms).
- The general form of a step in the pipeline is (here, brackets indicate general Beam objects, not PGB-specific objects)::

      [Output PCollection] = [Input PCollection] | [Transform]
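For concreteness, here is a minimal, runnable sketch of a driver program in that form; the data and transforms are toy stand-ins, not PGB code:

.. code-block:: python

    # Minimal sketch of a driver program: create a Pipeline, then build
    # PCollections by applying PTransforms with the pipe operator.
    # The data and transforms below are illustrative only.
    import apache_beam as beam

    def run():
        with beam.Pipeline() as pipeline:                   # the Pipeline abstraction
            # [Output PCollection] = [Input PCollection] | [Transform]
            numbers = pipeline | beam.Create([1, 2, 3])     # initial PCollection
            doubled = numbers | beam.Map(lambda x: x * 2)   # a new PCollection
            doubled | beam.Map(print)                       # consume the output

    if __name__ == "__main__":
        run()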
Programming guide: PCollections
- A ``PCollection`` represents a distributed data set that the Beam pipeline operates on. PCollections are the inputs and outputs for each step in the pipeline.

  - The exception is the first step of the pipeline, where we must pass the ``Pipeline`` object itself to a ``Read`` transform to create the initial collection [``PSin``].

- The elements of a ``PCollection`` may be of any type, but they must all be of the same type. Our current pipeline uses dictionaries for the alert data and most of its child/downstream collections.
- A ``PCollection`` is immutable. A Beam transform might process each element of a ``PCollection`` and generate new pipeline data (as a new ``PCollection``), but it does not consume or modify the original input collection.
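As a sketch of that first step, the exception noted in the list above: the ``Read`` transform is applied to the ``Pipeline`` object itself. The ``ReadFromPubSub`` source and subscription path here are illustrative placeholders, not necessarily how ``PSin`` is actually created:

.. code-block:: python

    # Sketch: the first step applies a Read transform to the Pipeline object
    # itself to create the initial PCollection. The subscription name is a
    # placeholder, not a real PGB resource.
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        raw_alerts = pipeline | "Read alerts" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/my-subscription"
        )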
Programming guide: Transforms

- A ``PTransform`` represents a data processing operation, or a step, in the pipeline. I/O (read/write) operations are special cases of PTransforms.
- Every ``PTransform`` takes one or more ``PCollection`` objects as input, performs some processing with/on the elements of that ``PCollection``, and produces zero or more output ``PCollection`` objects.
- To invoke a transform, we apply it (using the pipe operator ``|``) to the input ``PCollection``. This takes the general form (here, brackets indicate general Beam objects, not PGB-specific objects)::

      [Output PCollection] = [Input PCollection] | [Transform]

- We can decorate the transform (using the ``>>`` operator) with a name that will show up in Dataflow::

      "My Transform Name" >> [Transform]

- We can also chain transforms together without explicitly naming the output collections::

      [Final Output PCollection] = ([Initial Input PCollection] | [1st Transform] | [2nd Transform])

- How we apply the pipeline's transforms to its various collections determines the structure of our pipeline.

  - The best way to think of the pipeline is as a directed acyclic graph, where ``PTransform`` nodes are subroutines that accept ``PCollection`` nodes as inputs and emit ``PCollection`` nodes as outputs.
  - Dataflow provides a "job graph" visualization of the pipeline. See here for the job graph in production [defined in ``ztf-beam.py``] at the time of this writing (the interface is described in the Dataflow section below).

- Built-in transforms (complete lists):
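For concreteness, here is a minimal sketch of the naming (``>>``) and chaining syntax from the bullets above; the data are illustrative, and the built-in ``Filter`` and ``Map`` transforms come from the Beam SDK:

.. code-block:: python

    # Sketch: naming transforms with '>>' (the names appear in the Dataflow
    # job graph) and chaining transforms without naming the intermediate
    # collections. The data and logic are illustrative only.
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        squares_of_evens = (
            pipeline
            | "Create numbers" >> beam.Create([1, 2, 3, 4])
            | "Keep evens" >> beam.Filter(lambda x: x % 2 == 0)
            | "Square" >> beam.Map(lambda x: x * x)
        )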
Two important transforms in our pipeline are ``Filter`` and ``ParDo``.

``Filter``
Given a predicate, filter out all elements that don't satisfy the predicate.
- We write a function [``is_extragalactic_transient``] which operates on a single element of the input collection and returns ``True`` if the element meets our condition(s) and ``False`` otherwise.
- We apply our function as a filter on the pipeline by passing it to the ``Filter`` transform:

  - [``ExgalTrans = alertDicts | apache_beam.Filter(is_extragalactic_transient)``]
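A runnable sketch of this pattern, with a toy predicate and toy alert dictionaries standing in for ``is_extragalactic_transient`` and the real alert data:

.. code-block:: python

    # Sketch of the Filter pattern: a predicate function is applied to each
    # element; only elements returning True pass through. The predicate and
    # data are toy stand-ins, not PGB code.
    import apache_beam as beam

    def is_bright(alert):
        """Return True if the alert passes our (toy) condition."""
        return alert["mag"] < 18.0

    with beam.Pipeline() as pipeline:
        alert_dicts = pipeline | beam.Create(
            [{"id": 1, "mag": 17.2}, {"id": 2, "mag": 19.5}]
        )
        bright = alert_dicts | "Filter bright" >> beam.Filter(is_bright)
        bright | beam.Map(print)  # {'id': 1, 'mag': 17.2}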
``ParDo``
See also: Generic parallel processing.
- We write a function which:
- performs some data processing (e.g., fit the data using Salt2) on a single element of the input collection, and
- returns a list containing zero or more elements, each of which will become an element of the output collection.
- We name that function ``process`` and wrap it in an arbitrarily-named class [``fitSalt2``] (a subclass of ``DoFn``).
- We apply our function to each element of the step's input ``PCollection`` by passing the class to the ``ParDo`` transform:

  - [``salt2Dicts = ExgalTrans | apache_beam.ParDo(fitSalt2())``]
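A self-contained sketch of the same structure, with a toy ``DoFn`` standing in for ``fitSalt2``; only the shape (a ``process`` method passed to ``ParDo``) mirrors our code:

.. code-block:: python

    # Sketch of the ParDo pattern: a DoFn's process() method receives one
    # element and returns (or yields) zero or more output elements.
    # The class below is a toy stand-in, not the real Salt2 fit.
    import apache_beam as beam

    class AddAbsMag(beam.DoFn):
        """Toy DoFn: emit the input alert with one extra field."""

        def process(self, alert):
            alert = dict(alert)                # PCollections are immutable; copy
            alert["abs_mag"] = alert["mag"] - 5.0
            return [alert]                     # list of zero or more elements

    with beam.Pipeline() as pipeline:
        alerts = pipeline | beam.Create([{"id": 1, "mag": 17.2}])
        enriched = alerts | "Add abs mag" >> beam.ParDo(AddAbsMag())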
Programming guide: Additional outputs
See also:
``ParDo`` (or the ``DoFn`` passed to it) can produce more than one output ``PCollection``. The main output should be returned as normal (*); additional outputs should be tagged using ``apache_beam.pvalue.TaggedOutput('tag', element)``. See the examples in the links above, and [the ``FitSalt2`` (``DoFn``) class in beam_helpers/salt2_utils.py].

(*) We typically use ``return`` statements in our DoFns, but we also have the option of using ``yield`` statements (making the ``DoFn`` a generator). However, to return multiple outputs we must use ``yield`` statements.
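A hedged sketch of the multi-output pattern; the tag names and the success/failure logic are illustrative, not what ``FitSalt2`` actually does:

.. code-block:: python

    # Sketch of a DoFn with one main output and one tagged additional output.
    # Tag names and the branching logic are illustrative only.
    import apache_beam as beam
    from apache_beam import pvalue

    class FitOrFlag(beam.DoFn):
        def process(self, alert):
            if alert.get("mag") is not None:
                yield alert                                 # main output
            else:
                yield pvalue.TaggedOutput("errors", alert)  # additional output

    with beam.Pipeline() as pipeline:
        alerts = pipeline | beam.Create([{"id": 1, "mag": 17.2}, {"id": 2}])
        results = alerts | beam.ParDo(FitOrFlag()).with_outputs(
            "errors", main="fits"
        )
        fits, errors = results.fits, results.errors  # two PCollections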
Programming guide: Composite transforms
See also:
- Creating composite transforms
- Example in: ptransform_fn
To make the pipeline structure more clear and modular, we can group multiple transforms into a single composite transform. We do this by creating a subclass of the ``PTransform`` class and overriding the ``expand`` method to specify the actual processing logic. We can then use this transform just as we would a built-in transform from the Beam SDK. See the links above and [the ``Salt2`` composite transform at code path broker/beam/value_added/value_added.py].
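A minimal sketch of a composite transform; the grouped transforms are illustrative, not the actual ``Salt2`` logic:

.. code-block:: python

    # Sketch of a composite PTransform: subclass PTransform and override
    # expand() to group several transforms into a single named step.
    # The grouped logic is illustrative only.
    import apache_beam as beam

    class CleanAndTag(beam.PTransform):
        """Toy composite transform grouping a Filter and a Map."""

        def expand(self, pcoll):
            return (
                pcoll
                | "Drop empty" >> beam.Filter(lambda alert: bool(alert))
                | "Tag source" >> beam.Map(lambda alert: {**alert, "src": "ztf"})
            )

    with beam.Pipeline() as pipeline:
        alerts = pipeline | beam.Create([{"id": 1}, {}])
        cleaned = alerts | "Clean and tag" >> CleanAndTag()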
Dataflow is a Google service that runs Apache Beam pipelines on the Google Cloud Platform (GCP). Deploying a pipeline is a good place to start.
Dataflow handles the provisioning and management of all GCP resources (e.g., Compute Engine virtual machines or "workers"), and autoscales resources based on the (streaming) pipeline's current backlog and the workers' CPU usage over the last couple of minutes.
We tell the Beam pipeline to run on Dataflow by setting it as the "runner". The runner, and its configuration options, are set when creating the Beam ``Pipeline`` object. We pass them in as command-line arguments when starting the job [see the file at code path broker/beam/README.md].

- ``--runner=DataflowRunner`` runs the job in the Google Cloud via Dataflow.
- See Pipeline options for the Cloud Dataflow Runner for a complete list of Dataflow runner configuration options.
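A hedged sketch of how such options can be passed and consumed; all resource names below are placeholders, and the real invocation is documented in broker/beam/README.md:

.. code-block:: python

    # Sketch: Dataflow runner options passed as command-line arguments and
    # handed to the Pipeline. All resource names are placeholders, e.g.:
    #
    #   python my_pipeline.py \
    #       --runner=DataflowRunner \
    #       --project=my-gcp-project \
    #       --region=us-central1 \
    #       --temp_location=gs://my-bucket/temp \
    #       --streaming
    import sys
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(sys.argv[1:])   # picks up the flags shown above

    with beam.Pipeline(options=options) as pipeline:
        ...  # build the pipeline as usual; the runner choice does not change it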
Dataflow also provides us with a nice monitoring interface [see here for the job in production at the time of this writing]. There we can see:
- A graphical representation of the pipeline (the job graph).
- Details about the job's status and execution.
- Errors, warnings, and additional diagnostics. Links to the complete logs.
- Monitoring charts with job-level and step-level metrics.
GCP docs: Error and exception handling
Quoted directly from the link, with emphasis added:
"Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.
Dataflow processes elements in arbitrary bundles, and retries the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."