<a href="https://colab.research.google.com/github/rahiakela/building-machine-learning-pipelines/blob/main/02-introduction-to-tensorflow-extended/basic_data_pipeline_using_apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Basic Data Pipeline using Apache Beam

Apache Beam can be used to describe batch processes, streaming operations, and data pipelines. TFX relies on it under the hood: a variety of TFX components and libraries (e.g., TensorFlow Transform and TensorFlow Data Validation) use Apache Beam to process pipeline data efficiently.

Because Apache Beam abstracts the data processing logic away from the runtime that executes it, the same pipeline can be executed on multiple distributed processing runtime environments.
This means that you can run the same data pipeline on Apache Spark or Google
Cloud Dataflow without a single change in the pipeline description. Apache
Beam was also designed to support streaming operations as seamlessly as batch processes.

Apache Beam’s abstraction is based on two concepts: collections and transformations.

On the one hand, Beam’s collections hold the data in a pipeline, which is typically read from or written to a given file or stream.

On the other hand, Beam’s transformations describe ways to manipulate the data in those collections.

When we define the collections and transformations in the examples that follow, no data is actually loaded or transformed. That only happens
when the pipeline is executed in the context of a runtime environment (e.g., Apache Beam’s DirectRunner, Apache Spark, Apache Flink, or Google Cloud Dataflow).

Data pipelines usually start and end with data being read or written, which is handled in Apache Beam through collections, often called PCollections. The collections are then transformed, and the final result can be expressed as a collection again and written to a filesystem.

The following example shows how to read a text file and return all lines:

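```python
import apache_beam as beam

# Sample input used later in this notebook; any readable text file works.
input_file = "gs://dataflow-samples/shakespeare/kinglear.txt"

with beam.Pipeline() as p:
  # Read each line of the file into a PCollection of strings.
  lines = p | beam.io.ReadFromText(input_file)
```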

Similar to the ReadFromText operation, Apache Beam provides functions to write collections to a text file (e.g., WriteToText). The write operation is usually performed after all transformations have been executed:

```python
with beam.Pipeline() as p:
  ...
  output | beam.io.WriteToText(output_file)
```


In Apache Beam, data is manipulated through transformations. The transformations can be chained by using the pipe operator `|`. If you chain multiple transformations of the same type, you have to provide a name for each operation: a string identifier placed between the pipe operator and the `>>` operator.

In the following example, we apply all transformations sequentially on our lines extracted from the text file:

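```python
import re

counts = (
  lines
  # Split each line into words (letters and apostrophes).
  | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
  # Pair each word with an initial count of 1.
  | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
  # Sum the counts for each word.
  | 'GroupAndSum' >> beam.CombinePerKey(sum))
```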

Let’s walk through this code in detail. As an example, we’ll take the phrases “Hello, how do you do?” and “I am well, thank you.”

The Split transformation uses re.findall to split each line into a list of tokens, giving the result:

```
["Hello", "how", "do", "you", "do"]
["I", "am", "well", "thank", "you"]
```

beam.FlatMap then flattens these lists into a single PCollection of tokens:

```
"Hello" "how" "do" "you" "do" "I" "am" "well" "thank" "you"
```

Next, the PairWithOne transformation uses beam.Map to create a tuple out of every token and the count (1 for each result):

```
("Hello", 1) ("how", 1) ("do", 1) ("you", 1) ("do", 1) ("I", 1) ("am", 1)
("well", 1) ("thank", 1) ("you", 1)
```

Finally, the GroupAndSum transformation groups the tuples by token and sums the counts for each token:

```
("Hello", 1) ("how", 1) ("do", 2) ("you", 2) ("I", 1) ("am", 1) ("well", 1)
("thank", 1)
```
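The walkthrough above can be reproduced in plain Python, without Beam, to check the intermediate results. This is only an illustration of what each step computes, not how Beam executes it.

```python
import re
from collections import Counter

lines = ["Hello, how do you do?", "I am well, thank you."]

# Split: re.findall returns one list of tokens per line.
token_lists = [re.findall(r"[A-Za-z']+", line) for line in lines]

# FlatMap: flatten the per-line lists into one sequence of tokens.
tokens = [token for token_list in token_lists for token in token_list]

# PairWithOne: attach a count of 1 to every token.
pairs = [(token, 1) for token in tokens]

# GroupAndSum: add up the counts for each distinct token.
counts = Counter()
for token, one in pairs:
    counts[token] += one

print(dict(counts))
```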

Apache Beam provides a variety of predefined transformations. However, if your preferred operation isn’t available, you can write your own transformations by using the Map operators.

## Putting it all together

```python
!pip install apache-beam[gcp]
```

```python
%%writefile basic_pipeline.py
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

input_file = "gs://dataflow-samples/shakespeare/kinglear.txt"
output_file = "output.txt"

# Define pipeline options object.
pipeline_options = PipelineOptions()

# Set up the Apache Beam pipeline.
with beam.Pipeline(options=pipeline_options) as p:
  # Read the text file or file pattern into a PCollection.
  lines = p | ReadFromText(input_file)

  # Perform the transformations on the collection: Count the occurrences of each word.
  counts = (
      lines
      | "Split" >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x))
      | "PairWithOne" >> beam.Map(lambda x: (x, 1))
      | "GroupAndSum" >> beam.CombinePerKey(sum)
  )

  # Format the counts into a PCollection of strings.
  def format_result(word_count):
    (word, count) = word_count
    return "{}: {}".format(word, count)

  output = counts | "Format" >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  output | WriteToText(output_file)
```

```
Writing basic_pipeline.py
```


## Executing Your Basic Pipeline

As an example, you can run the pipeline with Apache Beam’s DirectRunner by executing the following command (assuming that the previous example code was saved as `basic_pipeline.py`).

```python
!python basic_pipeline.py
```

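The results of the transformations can be found in the designated text file. WriteToText treats `output.txt` as a prefix and appends a shard suffix to the file name, which is why the command below uses a wildcard: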

```python
!head output.txt*
```

```
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
```
