# Apache Beam

**Apache Beam** is a unified programming model for defining both **batch** and **streaming** data-parallel processing pipelines.

<img src='imgs/beam_pipeline.svg' alt='beam_pipeline.svg' width=800 height=600>

* A **pipeline** is a user-constructed graph of transformations that defines the desired data processing operations.


* A **PCollection** is a data set or data stream. The data that a pipeline processes is part of a PCollection.


* A **PTransform** (or **transform**) represents a data processing operation, or a step, in our pipeline. A transform is applied to zero or more PCollection objects, and produces zero or more PCollection objects.


* A **runner** runs a Beam pipeline using the capabilities of our chosen data processing engine. It is also known as an **execution engine**.


* An **SDK** is a language-specific library that lets pipeline authors build transforms, construct their pipelines, and submit them to a runner. Apache Beam follows the philosophy **"Write once, run everywhere"** and has SDKs in multiple languages, including Python, Java and Go.

## Pipeline

A Beam pipeline is a graph (specifically, a **directed acyclic graph**) of all the data and computations in our data processing task. This includes reading input data, transforming that data, and writing output data. 
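
To make the graph structure concrete, here is a minimal sketch that models a branching pipeline as a plain dependency graph and derives a valid execution order from it. It uses only the Python standard library, not the Beam SDK, and the step names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical branching pipeline: each step maps to the steps it depends on.
pipeline_graph = {
    "ReadInput": set(),
    "ParseLines": {"ReadInput"},
    "FilterA": {"ParseLines"},  # first branch
    "FilterB": {"ParseLines"},  # second branch
    "WriteA": {"FilterA"},
    "WriteB": {"FilterB"},
}

# A valid order exists only because the graph is acyclic;
# TopologicalSorter raises CycleError if a cycle is present.
execution_order = list(TopologicalSorter(pipeline_graph).static_order())
print(execution_order)
```

The two branches do not depend on each other, so a runner is free to execute them in parallel once `ParseLines` has produced its output.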

A pipeline is constructed by a user in their **SDK** of choice. Then, the pipeline makes its way to the **runner** either through the SDK directly or through the Runner API’s RPC interface. For example, this diagram shows a branching pipeline:

<img src='imgs/pipeline.svg' alt='pipeline.svg' width=800 height=600>

## PCollection

A PCollection generally contains **big data**: it is an unordered bag of elements that might be a **bounded**, stored dataset or an **unbounded stream** of data. PCollections flow between transforms. In Beam, most transforms apply equally to bounded and unbounded data.

A Beam driver program typically starts by creating a Pipeline object, and then uses that object as the basis for creating the pipeline’s data sets and its transforms. Each PCollection is owned by the specific Pipeline object for which it is created and multiple pipelines cannot share a PCollection. 

Beam pipelines process PCollections, and the runner is responsible for storing these elements. Beam’s computational patterns and transforms are focused on situations where distributed data-parallel computation is required. Therefore, the elements of a PCollection cannot be processed individually, and are instead processed uniformly in parallel.


### Bounded PCollection

A **bounded PCollection** is a dataset of a known, fixed size (alternatively, a dataset that is not growing over time). Bounded data can be processed by **batch pipelines**.

### Unbounded PCollection

An **unbounded PCollection** is a dataset that grows over time, and the elements are processed as they arrive. Unbounded data must be processed by **streaming pipelines**.

Both bounded and unbounded PCollections can coexist in the same pipeline. If our runner can only support bounded PCollections, we must reject pipelines that contain unbounded PCollections. If our runner is only targeting streams, there are adapters in Beam’s support code to convert everything to APIs that target unbounded data.

<img src='imgs/batch-stream.png' alt='batch-stream.png' width=600 height=600>

## PTransform

A **PTransform** (or **transform**) represents a data processing operation, or a step, in our pipeline. A transform is usually applied to one or more input PCollection objects. Transforms that read input are an exception; these transforms might not have an input PCollection.

We provide transform processing **logic** in the form of a function object (referred to as **user code**), and our user code is applied to each element of the input PCollection (or more than one PCollection). Depending on the pipeline runner and backend that we choose, many different workers across a cluster might execute instances of our user code in parallel. The user code that runs on each worker generates the output elements that are added to zero or more output PCollection objects.

The Beam SDKs contain a number of different transforms that we can apply to our pipeline’s PCollections. We can also define our own more complex composite transforms to fit our pipeline’s exact use case. The following list has some common transform types:

* Source transforms such as **TextIO.Read** and **Create**. A source transform conceptually has no input.
* Processing and conversion operations such as **ParDo**, **GroupByKey**, **CoGroupByKey**, **Combine**, and **Count**.
* Outputting transforms such as **TextIO.Write**.
* **User-defined**, application-specific composite transforms.
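
The element-wise pattern behind **ParDo** can be sketched in plain Python. This is only a single-process model of the semantics (user code applied to each element, producing zero or more outputs), not the Beam API; `par_do` and `split_words` are hypothetical names.

```python
from typing import Callable, Iterable, Iterator

def par_do(elements: Iterable, fn: Callable[[object], Iterable]) -> Iterator:
    """Apply fn to every element and flatten the zero or more outputs of each call."""
    for element in elements:
        yield from fn(element)

def split_words(line: str) -> list:
    # User code: one input line may yield zero, one, or many words.
    return line.split()

lines = ["the quick brown fox", "", "jumps over"]
words = list(par_do(lines, split_words))
print(words)  # ['the', 'quick', 'brown', 'fox', 'jumps', 'over']
```

Because `split_words` looks at one element at a time and keeps no state, a runner could hand different lines to different workers without changing the result (apart from ordering, which a PCollection does not guarantee).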

## Runner

A **Beam runner** runs a Beam pipeline on a specific platform. Most runners are translators or adapters to massively parallel big data processing systems, such as **Apache Flink**, **Apache Spark**, **Google Cloud Dataflow**, and more. For example, the Flink runner translates a Beam pipeline into a Flink job.

The **Direct Runner** runs pipelines locally so we can test, debug, and validate that our pipeline adheres to the Apache Beam model as closely as possible.


## Aggregation

**Aggregation** is computing a value from multiple (1 or more) input elements. In Beam, the primary computational pattern for aggregation is to group all elements with a common key and window, then combine each group of elements using an associative and commutative operation. This is similar to the **Reduce** operation in the MapReduce model, though it is enhanced to work with unbounded input streams as well as bounded data sets.

In the following figure, elements with the same color share a common key and window.

<img src='imgs/aggregation.png' alt='aggregation.png' width=100 height=200>


Some simple aggregation transforms include **Count** (computes the count of all elements in the aggregation), **Max** (computes the maximum element in the aggregation), and **Sum** (computes the sum of all elements in the aggregation).

When elements are grouped and emitted as a bag, the aggregation is known as **GroupByKey** (the associative/commutative operation is bag union). In this case, the output is no smaller than the input. Often, we will apply an operation such as summation, called a **CombineFn**, in which the output is significantly smaller than the input. In this case the aggregation is called **CombinePerKey**.
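
The difference between the two aggregations can be sketched in plain Python. This is a single-process model that ignores windows and assumes the whole input fits in memory; `group_by_key` and `combine_per_key` are hypothetical helper names, not Beam calls.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values that share a key into one bag (here, a list)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)

def combine_per_key(pairs, combine_fn):
    """Reduce each key's values to a single result with combine_fn."""
    return {key: combine_fn(values) for key, values in group_by_key(pairs).items()}

pairs = [("red", 1), ("blue", 4), ("red", 2), ("blue", 6), ("red", 3)]

grouped = group_by_key(pairs)
print(grouped)  # {'red': [1, 2, 3], 'blue': [4, 6]}

summed = combine_per_key(pairs, sum)
print(summed)  # {'red': 6, 'blue': 10}
```

The grouped output still holds all five values, whereas the combined output holds one value per key, which is why combining is usually much cheaper to shuffle between workers.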


In a real application, we might have millions of keys and/or windows; that is why this is still an “embarrassingly parallel” computational pattern. In those cases where we have fewer keys, we can add parallelism by adding a supplementary key, splitting each of our problem’s natural keys into many sub-keys. After these sub-keys are aggregated, the results can be further combined into a result for the original natural key for our problem. The associativity of our aggregation function ensures that this yields the same answer, but with more parallelism.
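
The sub-key technique can be checked with a small plain-Python sketch. It assumes summation as the aggregation and a hypothetical fan-out of four sub-keys per natural key; none of the names below come from the Beam SDK.

```python
from collections import defaultdict

def sum_per_key(pairs):
    """Sum the values of all pairs that share a key."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

NUM_SHARDS = 4  # hypothetical fan-out factor

# One "hot" natural key with 100 values.
events = [("hot_key", n) for n in range(1, 101)]

# Step 1: attach a supplementary sub-key to spread the hot key across shards.
sharded = [((key, i % NUM_SHARDS), value) for i, (key, value) in enumerate(events)]

# Step 2: aggregate each (key, shard) pair independently (parallelizable).
partial = sum_per_key(sharded)

# Step 3: drop the shard and combine the partial results per natural key.
final = sum_per_key((key, subtotal) for (key, _shard), subtotal in partial.items())

print(final)  # {'hot_key': 5050}
```

Step 2 now has four independent groups instead of one, and because addition is associative and commutative the final result matches a direct `sum_per_key(events)`.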

When our input is unbounded, the computational pattern of grouping elements by key and window is roughly the same, but governing when and how to emit the results of aggregation involves three concepts:

* **Windowing**, which partitions our input into bounded subsets that can be complete.


* **Watermarks**, which estimate the completeness of our input.


* **Triggers**, which govern when and how to emit aggregated results.

## User-defined function (UDF)

Some Beam operations allow us to run user-defined code as a way to configure the transform. For example, when using **ParDo**, user-defined code specifies what operation to apply to every element. For Combine, it specifies how values should be combined. By using cross-language transforms, a Beam pipeline can contain **UDFs** written in a different language, or even multiple languages in the same pipeline.

Beam has several varieties of UDFs:

* **DoFn** - per-element processing function (used in **ParDo**)


* **WindowFn** - places elements in windows and merges windows (used in Window and GroupByKey)


* **ViewFn** - adapts a materialized PCollection to a particular interface (used in side inputs)


* **WindowMappingFn** - maps one element’s window to another, and specifies bounds on how far in the past the result window will be (used in side inputs)


* **CombineFn** - associative and commutative aggregation (used in Combine and state)


* **Coder** - encodes user data; some coders have standard formats and are not really UDFs


Each language SDK has its own idiomatic way of expressing the user-defined functions in Beam, but there are common requirements. When we build user code for a Beam transform, we should keep in mind the distributed nature of execution. For example, there might be many copies of our function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Each copy of our user code function might be retried or run multiple times, depending on the pipeline runner and the processing backend that we choose for our pipeline. 
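
As an illustration of how independent copies can still produce one answer, here is a sketch of a mean aggregation written in the style of a **CombineFn**. The method names mirror those of the Beam Python SDK's `CombineFn`, but the class is plain Python and does not subclass it; `run_on_worker` is a hypothetical stand-in for what a runner would do.

```python
class MeanFn:
    """Computes a mean via (sum, count) accumulators that can be merged in any order."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        accumulators = list(accumulators)
        return (sum(a[0] for a in accumulators), sum(a[1] for a in accumulators))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")

def run_on_worker(fn, values):
    # Each worker builds its own accumulator without seeing the others.
    accumulator = fn.create_accumulator()
    for value in values:
        accumulator = fn.add_input(accumulator, value)
    return accumulator

fn = MeanFn()
worker_a = run_on_worker(fn, [1, 2, 3])
worker_b = run_on_worker(fn, [4, 5])
mean = fn.extract_output(fn.merge_accumulators([worker_a, worker_b]))
print(mean)  # 3.0
```

Averaging the two per-worker means directly would give 3.25, which is wrong; carrying `(sum, count)` is what makes the merge associative and commutative.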

Beam also supports stateful processing through the stateful processing API.

## Schema

A **schema** is a language-independent type definition for a PCollection. The schema for a PCollection defines elements of that PCollection as an ordered list of named fields. Each field has a name, a type, and possibly a set of user options.

The element type in a PCollection has a structure that can be introspected. For example, formats like **JSON**, **Protocol Buffer**, **Avro**, and **database row objects** can be converted to Beam Schemas. By understanding the structure of a pipeline’s records, we can define APIs for data processing.

Beam provides a collection of transforms that operate natively on schemas. For example, **Beam SQL** is a common transform that operates on schemas. These transforms allow selections and aggregations in terms of named **schema fields**. Another advantage of schemas is that they allow referencing of element fields by name. Beam provides a selection syntax for referencing fields, including nested and repeated fields.
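
The idea of an introspectable element type with named fields can be modeled with a `typing.NamedTuple`. This is a plain-Python sketch; the `Purchase` type and the `select` helper are hypothetical and stand in for Beam's schema-aware transforms.

```python
from typing import NamedTuple, get_type_hints

class Purchase(NamedTuple):
    # An ordered list of named, typed fields.
    user_id: str
    item: str
    cost: float

def select(rows, *field_names):
    """Project each row onto the requested fields, referenced by name."""
    return [tuple(getattr(row, name) for name in field_names) for row in rows]

rows = [Purchase("u1", "book", 12.5), Purchase("u2", "pen", 1.2)]

# The structure can be introspected without looking at any data.
print(Purchase._fields)           # ('user_id', 'item', 'cost')
print(get_type_hints(Purchase))

selected = select(rows, "user_id", "cost")
print(selected)  # [('u1', 12.5), ('u2', 1.2)]
```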

## Timestamps

Every element in a PCollection has a **timestamp** associated with it. When we execute a primitive connector to a storage system, that connector is responsible for providing initial timestamps. 

The runner must propagate and aggregate timestamps. If the timestamp is not important, such as with certain batch processing jobs where elements do not denote events, the timestamp will be the minimum representable timestamp, often referred to colloquially as **negative infinity**.

## Window

A PCollection can be subdivided into **windows** based on the **timestamps** of the individual elements. Windows enable grouping operations over unbounded collections that grow over time by dividing the collection into windows of finite collections.

A windowing function tells the runner how to assign elements to one or more initial windows, and how to merge windows of grouped elements. Each element in a PCollection can only be in one window, so if a windowing function specifies multiple windows for an element, the element is conceptually duplicated into each of the windows and each element is identical except for its window.
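
The conceptual duplication can be sketched for sliding windows in plain Python. The sketch assumes integer timestamps in seconds, half-open windows `[start, end)`, and window starts aligned to multiples of the period; `sliding_windows` is a hypothetical helper, not a Beam function.

```python
def sliding_windows(timestamp, size, period):
    """Return every [start, end) window of length `size`, starting at a
    multiple of `period`, that contains `timestamp`."""
    last_start = timestamp - timestamp % period
    # Walk backwards while the window still covers the timestamp.
    starts = range(last_start, timestamp - size, -period)
    return sorted((start, start + size) for start in starts)

element = "click"
timestamp = 65  # seconds

# 60-second windows that start every 30 seconds.
windows = sliding_windows(timestamp, size=60, period=30)
print(windows)  # [(30, 90), (60, 120)]

# The element is conceptually duplicated, once per window.
windowed_elements = [(element, window) for window in windows]
print(windowed_elements)
```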

Transforms that aggregate multiple elements, such as **GroupByKey** and **Combine**, work implicitly on a per-window basis; they process each PCollection as a succession of multiple, finite windows, though the entire collection itself may be of unbounded size.

Beam provides several windowing functions:

* **Fixed time windows** (also known as **tumbling windows**) represent consistent-duration, non-overlapping time intervals in the data stream.


* **Sliding time windows** (also known as **hopping windows**) also represent time intervals in the data stream; however, sliding time windows can overlap.


* **Per-session windows** define windows that contain elements that are within a certain gap duration of another element.


* **Single global window**: by default, all data in a PCollection is assigned to the single global window, and late data is discarded.


* **Calendar-based windows** (not supported by the Beam SDK for Python)


* **Customized windows** are user-defined windows, based on complex requirements.
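
Of the windowing functions above, per-session windows are the least obvious, because the windows depend on the data. A minimal sketch, assuming integer timestamps for a single key and that each element starts a proto-window `[timestamp, timestamp + gap)` which is merged with any window it overlaps (`session_windows` is a hypothetical helper):

```python
def session_windows(timestamps, gap):
    """Merge per-element windows [ts, ts + gap) that overlap into sessions."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # The element falls inside the current session: extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            # The gap was exceeded: start a new session.
            sessions.append((ts, ts + gap))
    return sessions

timestamps = [0, 20, 45, 200, 210]
sessions = session_windows(timestamps, gap=30)
print(sessions)  # [(0, 75), (200, 240)]
```

The first three elements are each within 30 seconds of a neighbor, so they form one session; the long pause before 200 starts a second one.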

For example, let’s say we have a PCollection that uses **fixed-time windowing**, with windows that are five minutes long. For each window, Beam must collect all the data with an event time timestamp in the given window range (between 0:00 and 4:59 in the first window, for instance). Data with timestamps outside that range (data from 5:00 or later) belongs to a different window.
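
The assignment in this example comes down to simple arithmetic. A plain-Python sketch, assuming timestamps in whole seconds and half-open windows `[start, end)` (`fixed_window` is a hypothetical helper):

```python
WINDOW_SIZE = 5 * 60  # five minutes, in seconds

def fixed_window(timestamp, size=WINDOW_SIZE):
    """Return the [start, end) fixed window that contains the timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)

print(fixed_window(0))             # (0, 300)
print(fixed_window(4 * 60 + 59))   # (0, 300)   -> 4:59 is in the first window
print(fixed_window(5 * 60))        # (300, 600) -> 5:00 starts the second window
```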


## Watermark

A **watermark** is a guess as to when all data in a certain window is expected to have arrived. This is needed because data isn’t always guaranteed to arrive in a pipeline in time order, or to always arrive at predictable intervals.

In any data processing system, there is a certain amount of lag between the time a data event occurs (the **event time**, determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in our pipeline (the **processing time**, determined by the clock on the system processing the element). Arrival order and timing are also unpredictable: for example, we might have intermediate systems that don’t preserve order, or we might have two servers that timestamp data but one has a better network connection.

**Data sources are responsible for producing a watermark**, and **every PCollection must have a watermark** that estimates how complete the PCollection is. The contents of a PCollection are complete when a watermark advances to **infinity**. In this manner, we might discover that an unbounded PCollection is finite. After the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered **late data**.
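
The late-data rule can be sketched as a comparison between the watermark and the end of the element's window. This plain-Python model assumes five-minute fixed windows, timestamps in seconds, and treats a watermark equal to the window end as already past it; the watermark values are supplied by hand rather than estimated by a source.

```python
WINDOW_SIZE = 300  # five-minute fixed windows, in seconds

def window_end(event_time, size=WINDOW_SIZE):
    """End (exclusive) of the fixed window that contains event_time."""
    return event_time - event_time % size + size

def is_late(event_time, watermark, size=WINDOW_SIZE):
    """An element is late if the watermark already passed its window's end."""
    return watermark >= window_end(event_time, size)

# (event time, watermark at the moment the element arrives); hypothetical values.
arrivals = [(120, 100), (290, 280), (310, 305), (250, 305)]

labels = ["late" if is_late(ts, wm) else "on time" for ts, wm in arrivals]
print(labels)  # ['on time', 'on time', 'on time', 'late']
```

The last element has an event time inside the first window, but it arrives after the watermark has moved to 305, so it counts as late.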

## Trigger

**Triggers** are a related concept that allow us to modify and refine the **windowing strategy** for a PCollection. We can use triggers to decide when each individual window aggregates and reports its results, including how the window emits late elements.

When collecting and grouping data into windows, Beam uses triggers to determine when to emit the aggregated results of each **window** (referred to as a **pane**). If we use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window.

Triggers also provide two additional capabilities that allow us to control the flow of our data and also balance between **data completeness**, **latency**, and **cost**:

* Triggers allow Beam to **emit early results**, before all the data in a given window has arrived. For example, emitting after a certain amount of time elapses, or after a certain number of elements arrive.


* Triggers allow **processing of late data** by triggering after the event time watermark passes the end of the window.

Beam provides a number of pre-built triggers that we can set:

* **Event time triggers**: These triggers operate on the event time, as indicated by the timestamp on each data element. Beam’s default trigger is event time-based.


* **Processing time triggers**: These triggers operate on the processing time, which is the time when the data element is processed at any given stage in the pipeline.


* **Data-driven triggers**: These triggers operate by examining the data as it arrives in each window, and firing when that data meets a certain property. Currently, data-driven triggers only support firing after a certain number of data elements.


* **Composite triggers**: These triggers combine multiple triggers in various ways. For example, we might want one trigger for early data and a different trigger for late data.
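
To illustrate how a data-driven trigger splits one window's output into panes, here is a plain-Python sketch. It assumes a trigger that fires repeatedly after every `count` elements, that each pane contains only the elements received since the previous firing, and that any remainder is emitted when the window closes; `element_count_panes` is a hypothetical helper, not part of the Beam SDK.

```python
def element_count_panes(values, count):
    """Yield a pane each time `count` elements have been buffered,
    plus a final pane with whatever is left when the window closes."""
    pane = []
    for value in values:
        pane.append(value)
        if len(pane) == count:
            yield pane
            pane = []
    if pane:
        yield pane

window_values = [1, 2, 3, 4, 5, 6, 7]  # elements of a single window, in arrival order
panes = list(element_count_panes(window_values, count=3))
print(panes)                          # [[1, 2, 3], [4, 5, 6], [7]]
print([sum(pane) for pane in panes])  # [6, 15, 7]
```

Downstream consumers see three partial sums instead of one final result, which trades completeness of each emitted value for lower latency.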