##### Copyright 2020 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# Apache Beam basics

In this first notebook you can explore basic operations within Apache Beam. After going through them, work through the exercises to test your knowledge.

The following concepts are used throughout the notebooks:
- **Element**: minimal unit of data.
- **`PCollection`**: represents a distributed data set made of elements; it can be *bounded* or *unbounded*.
    - A *bounded* `PCollection` holds data of a fixed size. Examples are text files, BigQuery tables, and Avro files.
    - An *unbounded* `PCollection` is potentially infinite in size and comes from a data stream. Examples are a Pub/Sub topic or subscription and a Kafka topic.
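
As a loose analogy in plain Python (this is not Beam code, and the variable names are only illustrative), a bounded data set behaves like a list whose size is known, while an unbounded one behaves like a generator that never ends, so you can only ever look at a finite slice of it:

```python
import itertools

# Bounded: a fixed, known number of elements (like a file or a table).
bounded = [1, 2, 3, 4]
bounded_size = len(bounded)

# Unbounded: elements keep arriving (like a message stream).
# itertools.count() never stops, so there is no total size to ask for.
unbounded = itertools.count(start=1)

# The only option is to consume a finite slice of the stream.
first_four = list(itertools.islice(unbounded, 4))

print(bounded_size, first_four)
```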

Before running any code, examine the basic structure for creating a pipeline:

- To define your pipeline, start with `p = beam.Pipeline()`.
- The pipe `|` separates steps within the pipeline. Every time you want to add a new step, you need a new pipe.
- To the right of the pipe, add the step you want to execute: ` | <STEP> `. You can optionally name the step by placing the name and `>>` between the pipe and the step: ` | "NAME" >> <STEP>`. Two steps in the same pipeline cannot have the same name.
- To the left of the pipe, there has to be a reference to a pipeline or to the output of a previous step: `p | <STEP>`, `p | <STEP1> | <STEP2>...` or `squares | <STEP>` (where `squares` is a variable holding a `PCollection`).
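
To see why this syntax is valid Python, here is a toy model of the two operators. It is a sketch only: `Step` and `Chain` are hypothetical classes written for this illustration, not Beam's classes, and the exception type is arbitrary. It shows how `"NAME" >> <STEP>` can attach a name through operator overloading and how `|` can append a step while rejecting duplicate names.

```python
class Step:
    """Toy stand-in for a transform: wraps a function and an optional name."""

    def __init__(self, fn, name=None):
        self.fn = fn
        self.name = name or fn.__name__

    def __rrshift__(self, name):
        # Called for `"NAME" >> step` because str does not define `>>`.
        return Step(self.fn, name)


class Chain:
    """Toy stand-in for a pipeline: `chain | step` records one more step."""

    def __init__(self, steps=()):
        self.steps = tuple(steps)

    def __or__(self, step):
        if step.name in [s.name for s in self.steps]:
            raise ValueError(f"Duplicate step name: {step.name!r}")
        return Chain(self.steps + (step,))


def double(x):
    return 2 * x


# `>>` binds tighter than `|`, so this parses as chain | ("Twice" >> Step(...)).
chain = Chain() | "Twice" >> Step(double) | "Again" >> Step(double)
step_names = [s.name for s in chain.steps]
print(step_names)  # ['Twice', 'Again']
```

Because `>>` has higher precedence than `|`, no parentheses are needed around `"NAME" >> <STEP>`.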
____________________

First, let's import the operations you need for this notebook.


In [None]:
import logging

import apache_beam as beam
from apache_beam import Create, Map, ParDo, Flatten, Partition
from apache_beam import pvalue

To reduce the amount of log output from your pipeline, log only warnings and above. For more detail, change `WARNING` to `INFO`.

In [None]:
logging.getLogger().setLevel(logging.WARNING)
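
The effect of the level can be checked with the standard `logging` module alone. The sketch below uses a separate logger (the name `beam_basics_demo` is made up for this example) so that it does not change the root logger configured above:

```python
import logging

logger = logging.getLogger("beam_basics_demo")  # hypothetical logger name

# At WARNING level, INFO messages are dropped and WARNING messages are kept.
logger.setLevel(logging.WARNING)
info_at_warning = logger.isEnabledFor(logging.INFO)
warning_at_warning = logger.isEnabledFor(logging.WARNING)

# At INFO level, INFO messages are kept as well.
logger.setLevel(logging.INFO)
info_at_info = logger.isEnabledFor(logging.INFO)

print(info_at_warning, warning_at_warning, info_at_info)  # False True True
```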

These tutorials use the `InteractiveRunner` in most pipelines, so that you can see their graphs and output. Apache Beam can use other runners such as `DirectRunner`, `DataflowRunner`, or `FlinkRunner`.

In [None]:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

The package `sympy` is not part of Apache Beam, but it will be used in some examples. The following cell installs `sympy`.

In [None]:
%pip install sympy

In [None]:
import sympy

## Basic Operations

**`Create`** is used to create elements.

**`Map`** operates at the element level: it applies a simple one-to-one mapping function to each element in the collection.

Most of the following examples use `Create` as the source, since it's more intuitive than reading CSV files or other sources.

The following pipeline returns the squares of the first N non-negative integers. For this first pipeline, we are going to use the default runner (`DirectRunner`):

In [None]:
p = beam.Pipeline()
N = 7
squares = (p | "Create Elements" >> Create(range(N))
             | "Squares" >> Map(lambda x: x**2)
             | Map(print))

At this point, the pipeline hasn't been executed; to execute it, call `p.run()`.

In [None]:
p.run()

If you use the `with` statement as a context manager, you don't need to call `p.run()`; the pipeline runs when the `with` block exits.

In [None]:
with beam.Pipeline() as p:
    squares = (p | "Create Elements" >> Create(range(N))
                 | "Squares" >> Map(lambda x: x**2)
                 | Map(print))
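
The two ways of running a pipeline can be pictured with a toy model in plain Python. This is only a sketch of the idea: `ToyPipeline` is a hypothetical class written for this illustration, not part of Beam. Adding a step only records it, `run()` executes the recorded steps, and leaving a `with` block calls `run()` for you.

```python
class ToyPipeline:
    """Toy model: steps are only recorded until run() is called."""

    def __init__(self):
        self.steps = []
        self.results = None

    def add(self, fn):
        self.steps.append(fn)  # record the step, do not execute it
        return self

    def run(self):
        data = range(4)
        for fn in self.steps:
            data = [fn(x) for x in data]
        self.results = list(data)
        return self.results

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Run only if the block finished without an exception.
        if exc_type is None:
            self.run()


p_toy = ToyPipeline().add(lambda x: x ** 2)
before_run = p_toy.results  # None: nothing has executed yet
p_toy.run()
after_run = p_toy.results   # [0, 1, 4, 9]

with ToyPipeline() as q_toy:
    q_toy.add(lambda x: x ** 2)
with_results = q_toy.results  # [0, 1, 4, 9]: run() happened on exit
```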

Changing the runner to `InteractiveRunner` is easy: pass it to `beam.Pipeline`. `InteractiveRunner` has built-in features such as showing the pipeline graph or displaying the output without `print`. You also don't need to call `p.run()`, since `ib.show()` runs the pipeline for you.

In [None]:
p = beam.Pipeline(InteractiveRunner())
squares = (p | "Create Elements" >> Create(range(N))
             | "Squares" >> Map(lambda x: x**2))

Let's see the pipeline graph:

In [None]:
ib.show_graph(p)

And now, let's see the results:

In [None]:
ib.show(squares)

From now on, the examples use `ib.show` with the `InteractiveRunner`.
_____________________

**`ParDo`** is a more general operation than `Map` and the lowest-level element-wise operation. It applies a given function to each element and outputs zero or more elements per input element.

In [None]:
p = beam.Pipeline(InteractiveRunner())

N = 8

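def find_divisors(element):
    divisor_list = sympy.divisors(element)
    # ParDo expects an iterable; each item becomes one output element
    return [(element, x) for x in divisor_list]

# The function has its own name so that the `divisors` variable does not shadow it
divisors = (p | "Create" >> Create(range(N))
              | "ParDo Divisors" >> ParDo(find_divisors))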

ib.show_graph(p)
ib.show(divisors)

Note that N input elements can produce more than N output elements (one-to-many). This operation could not be done with `Map`, since `Map` is one-to-one.
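
The difference can be modelled in plain Python with list comprehensions. This is a sketch of the semantics only, not Beam code, and `divisor_pairs` is a simple helper written for this illustration that avoids `sympy`:

```python
def divisor_pairs(n):
    """Return (n, d) for every positive divisor d of n; empty for n == 0."""
    return [(n, d) for d in range(1, n + 1) if n % d == 0]


elements = range(4)

# Map-like: exactly one output per input, so each output is a whole list.
mapped = [divisor_pairs(n) for n in elements]

# ParDo-like: the returned iterable is unpacked, so the outputs are the pairs.
flat_mapped = [pair for n in elements for pair in divisor_pairs(n)]

print(mapped)       # [[], [(1, 1)], [(2, 1), (2, 2)], [(3, 1), (3, 3)]]
print(flat_mapped)  # [(1, 1), (2, 1), (2, 2), (3, 1), (3, 3)]
```

Four inputs give four outputs in the `Map`-like version and five in the `ParDo`-like version, with the input `0` contributing nothing.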

### Branching Operations

**`Flatten`** combines two or more `PCollections` into one. It takes the elements from all input `PCollections` and outputs them as one `PCollection`. It's the equivalent of `UNION ALL` in SQL, since duplicates are kept.

In [None]:
p = beam.Pipeline(InteractiveRunner())

elements_1 = [
    {"country": "China", "population": 1389},
    {"country": "India", "population": 1311},
    {"country": "USA", "population": 331},
    {"country": "Ireland", "population": 5}
]

elements_2 = [
    {"country": "Indonesia", "population": 273},
    {"country": "Brazil", "population": 212},
    {"country": "Egypt", "population": 102},
    {"country": "Spain", "population": 47},
    {"country": "Ghana", "population": 31},
    {"country": "Australia", "population": 25},
]

create_1 = p | "Create 1" >> Create(elements_1)
create_2 = p | "Create 2" >> Create(elements_2)

# Left side of | has to be a tuple
flattened = (create_1, create_2) | Flatten()

ib.show_graph(p)
ib.show(flattened)
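
In plain Python, the closest analogy to `Flatten` is chaining iterables. The sketch below is not Beam code and uses made-up input lists; it shows that an element present in both inputs appears twice in the output, which is the behaviour of SQL's `UNION ALL`. Unlike a Python list, a `PCollection` has no guaranteed element order.

```python
import itertools
from collections import Counter

first = ["China", "India", "USA"]
second = ["USA", "Brazil"]

# Flatten-like: every element of every input appears in the output.
flattened_list = list(itertools.chain(first, second))

# Duplicates are kept; nothing is merged or removed.
counts = Counter(flattened_list)
print(len(flattened_list), counts["USA"])  # 5 2
```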

**Branching**: Just as we can join two or more `PCollections` into one, we can use the same `PCollection` as input for two or more `PTransforms` or sinks. It is as simple as referencing the output of a previous step of the pipeline.

In [None]:
p = beam.Pipeline(InteractiveRunner())

elements = [
    {"country": "China", "population": 1389},
    {"country": "India", "population": 1311},
    {"country": "USA", "population": 331},
    {"country": "Ireland", "population": 5}
]

create = p | "Create" >> Create(elements)

country = create | "country" >> Map(lambda x: x["country"])
population = create | "population" >> Map(lambda x: x["population"])

ib.show_graph(p)  

Since we have two outputs, we need to use `ib.show` with both of them:

In [None]:
ib.show(country, population)

*SUGGESTION*: In the previous example, try to modify it to use `ParDo` instead of `Map`.

There are other, more flexible ways to branch. Sometimes you want to split a `PCollection` into two or more `PCollections`:

**`Partition`** splits a `PCollection` into a fixed number of `PCollections` according to a given function. The function is applied to each element and returns the index of the partition that the element should go to.

In [None]:
p = beam.Pipeline(InteractiveRunner())

even, odd = (p | "Create Numbers" >> Create(range(10))
               | "Odd or Even" >> Partition(lambda n, partitions: n % 2, 2))
# The partition function takes (element, number of partitions) and returns the partition index
# Elements for which it returns 0 go to `even`; elements for which it returns 1 go to `odd`

ib.show(even, odd)
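
The routing rule can be modelled in a few lines of plain Python. This is a sketch of the semantics, not Beam's implementation, and the `partition` function below is a hypothetical helper written for this illustration:

```python
def partition(elements, partition_fn, num_partitions):
    """Toy model: route each element to the list whose index fn returns."""
    outputs = [[] for _ in range(num_partitions)]
    for element in elements:
        index = partition_fn(element, num_partitions)
        if not 0 <= index < num_partitions:
            raise ValueError(f"Index {index} is out of range")
        outputs[index].append(element)
    return outputs


even_list, odd_list = partition(range(10), lambda n, partitions: n % 2, 2)
print(even_list)  # [0, 2, 4, 6, 8]
print(odd_list)   # [1, 3, 5, 7, 9]
```

Each element lands in exactly one output, and the order of the unpacked variables follows the index that the function returns.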

_________________________________
This option still does not cover all possibilities. What if you wanted a particular element to fall into two or more different categories?

We can use `ParDo` with `TaggedOutput` to achieve this. This is the most flexible way to branch `PCollections`.

In [None]:
p = beam.Pipeline(InteractiveRunner())

class DifferentOutputsFn(beam.DoFn):
    def process(self, element, x, y):
        if element % x == 0:
            yield pvalue.TaggedOutput("x", element)

        if element % y == 0:
            yield pvalue.TaggedOutput("y", element)

        yield element


diff_outputs = (p | "Create" >> Create(range(8))
                  | "Split Outputs" >> ParDo(DifferentOutputsFn(), x=2, y=3).with_outputs("x", "y"))

multiple_x = diff_outputs.x
multiple_y = diff_outputs.y
all_outputs = diff_outputs[None] 

ib.show_graph(p)
ib.show(multiple_x, multiple_y, all_outputs)

Let's go through the code. 

We are sending the elements to three outputs:

- The multiples of `x` and/or `y`:
    * To send an element to one of these outputs, `yield` it wrapped in a tagged output: `yield pvalue.TaggedOutput("x", element)`
    * To retrieve the elements of one of these outputs, refer to the result `diff_outputs` with the tag *x* or *y*: `diff_outputs.x`


- All elements:
    * In this case, there is no need to specify a tagged output, so simply use `yield element`
    * To get this output, reference the elements without a tag by using `diff_outputs[None]`
    
Also, let's check how the `ParDo` is being used:

- `ParDo(DifferentOutputsFn(), x=2, y=3).with_outputs("x", "y")`
    * Use `with_outputs` to declare the names of the tagged outputs
    * Note how the parameters `x` and `y` are passed to the `process` method of the `DifferentOutputsFn` class
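
The routing described above can be traced with a plain-Python model. It is a sketch only: the `(tag, value)` tuples stand in for `pvalue.TaggedOutput`, and the tag `None` stands for the untagged main output. It shows that a single element can land in several outputs at once.

```python
from collections import defaultdict


def process(element, x, y):
    """Yield (tag, value) pairs; tag None stands for the main output."""
    if element % x == 0:
        yield ("x", element)
    if element % y == 0:
        yield ("y", element)
    yield (None, element)


outputs = defaultdict(list)
for element in range(8):
    for tag, value in process(element, x=2, y=3):
        outputs[tag].append(value)

print(outputs["x"])   # [0, 2, 4, 6]
print(outputs["y"])   # [0, 3, 6]
print(outputs[None])  # [0, 1, 2, 3, 4, 5, 6, 7]
```

The elements `0` and `6` are multiples of both 2 and 3, so they appear in all three outputs.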



Let's look at another example. In this case, the pipeline only processes words with more than five letters and discards the other words.

In [None]:
p = beam.Pipeline(InteractiveRunner())

class LengthStringFn(beam.DoFn):
    def process(self, element, max_len):
        if len(element) <= max_len:
            yield pvalue.TaggedOutput("smaller", element)
        else:
            yield element

elements = ["Beam", "Pipeline", "PCollection", "Map", "Notebook"]

string_length = (p | "Create" >> Create(elements)
                   | "Split Outputs" >> ParDo(LengthStringFn(), max_len=5).with_outputs("smaller"))

smaller = string_length.smaller | "Discarded" >> Map(lambda e: logging.warning(f"Discarded: {e}"))

bigger = string_length[None] | "Filtered" >> Map(lambda e: e.lower())

ib.show_graph(p)
ib.show(bigger)

## Exercise

Create elements (integers) coming from two sources, and categorize these elements as perfect squares or non-perfect squares. The output needs to be a tuple containing the original element and its square root. For example, for input `25`, the output needs to be `(25, 5.0)`.

There are hints below and the solution at the end.

Since we are going to test whether the pipeline is right, be sure to name the final `PCollections` `perfect_squares` and `not_perfect_squares`.

In [None]:
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import matches_all, equal_to
from utils.solutions import solutions

import math

In [None]:
p = beam.Pipeline(InteractiveRunner())

def is_perfect_square(e):
    boolean = e[1].is_integer()
    return int(boolean)

elements_1 = [1, 49]
elements_2 = [3, 1024, 1729]

# TODO: Finish the pipeline 
create_1 = p | "Create1" >> Create()

ib.show_graph(p)
ib.show(not_perfect_squares, perfect_squares)

# For testing the solution - Don't modify
# assert_that(pipeline, matcher, label(optional))
assert_that(not_perfect_squares, equal_to(solutions[1]["not_perfect_squares"]), label="not_perfect_squares")
assert_that(perfect_squares, equal_to(solutions[1]["perfect_squares"]), label="perfect_squares")
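
The helper `is_perfect_square` defined in the cell above can be tried on its own before it is used in a pipeline. The sketch below assumes, as in the example `(25, 5.0)`, that each element is an `(integer, square root)` tuple whose second item is a float:

```python
import math


def is_perfect_square(e):
    # e is an (element, square_root) tuple, for example (25, 5.0).
    boolean = e[1].is_integer()
    return int(boolean)


perfect = is_perfect_square((25, math.sqrt(25)))    # 1
not_perfect = is_perfect_square((3, math.sqrt(3)))  # 0
print(perfect, not_perfect)
```

The helper returns `0` or `1` rather than `False` or `True`, so its result can serve as an index.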

### Hints

**Create elements**
<details><summary>Hint</summary>
<p>
    
You need to use `Create` twice, since we are using two sources. Don't forget that the step names have to be different.
</p>
</details>


<details><summary>Code</summary>
<p>

```
create_1 = p | "Create_1" >> Create(elements_1)
create_2 = p | "Create_2" >> Create(elements_2)
```

</p>
</details>

**Process created elements**
<details><summary>Hint</summary>
<p>
    
Since the pipeline reads from two sources and performs the same operations on both, you need to join them using a `Flatten`. To know whether a number is a perfect square, you need to calculate its square root, which calls for a `Map`. Since the output has to be a tuple, the `Map` should generate it.
</p>
</details>


<details><summary>Code</summary>
<p>

```
flattened = ((create_1, create_2) | Flatten()
                                  | "Square root" >> Map(lambda x: (x, math.sqrt(x))))
```

</p>
</details>

**Split elements according to a rule**
<details><summary>Hint</summary>
<p>

You need to send elements to different steps according to a rule (perfect square or not). You can do so using `Partition` or the more general `ParDo` with `with_outputs`. This could also be done with `Filter` (next notebook), but you would need to process every element twice, so it's not recommended.
</p>
</details>


<details><summary>Code</summary>
<p>


```    
not_perfect_squares, perfect_squares = flattened | "Partition" >> Partition(lambda x, partition: int(x[1].is_integer()), 2)
```

</p>
</details>

**Full code**
<details><summary>Code</summary>
<p>

```
p = beam.Pipeline(InteractiveRunner())

elements_1 = [1, 49]
elements_2 = [3, 1024, 1729]

# Create the two sources
create_1 = p | "Create1" >> Create(elements_1)
create_2 = p | "Create2" >> Create(elements_2) 

flattened = ((create_1, create_2) | Flatten()
                                  | "Square root" >> Map(lambda x: (x, math.sqrt(x))))

not_perfect_squares, perfect_squares = flattened | "Partition" >> Partition(lambda x, partition: int(x[1].is_integer()), 2)

ib.show_graph(p)
ib.show(not_perfect_squares, perfect_squares)

# For testing the solution - Don't modify
# assert_that(pipeline, matcher, label(optional))
assert_that(not_perfect_squares, equal_to(solutions[1]["not_perfect_squares"]), label="not_perfect_squares")
assert_that(perfect_squares, equal_to(solutions[1]["perfect_squares"]), label="perfect_squares")
```
    

</p>
</details>
