# Common Transforms
## Summary of common transforms

*Filter*
- Filter all elements in a PCollection with `beam.Filter(<predicate-fn>)`
    - `<predicate-fn>` can be pre-defined or lambda form
    - Extra positional or keyword arguments given to `beam.Filter()` are forwarded to `<predicate-fn>` after the element, which is always the first argument
        - This also works for side inputs: wrap another PCollection as a *singleton*, an iterator, or a dictionary, and the predicate can use it to filter your PCollection
- Take the `<n>` smallest elements of a PCollection with `beam.combiners.Top.Smallest(<n>)`; the result is a single list, and `<n>` is a count, not a threshold
- Take the `<n>` largest elements with `beam.combiners.Top.Largest(<n>)`

*Aggregations - Count*
- `beam.combiners.Count.Globally()` counts all elements in a PCollection
- `beam.combiners.Count.PerKey()` counts elements for each unique key in a PCollection of key-values
- `beam.combiners.Count.PerElement()` counts the occurrences of each unique element in a PCollection

*Aggregations - Sum*
- `beam.CombineGlobally(sum)` finds the global sum of a PCollection
- `beam.CombinePerKey(sum)` sums the values per key in a PCollection of key-value pairs

*Aggregations - Moments*
- Mean: `beam.combiners.Mean.Globally()`; `beam.combiners.Mean.PerKey()`
- Min: `beam.CombineGlobally(lambda elements: min(elements or [-1]))`; `beam.CombinePerKey(min)`
- Max: `beam.CombineGlobally(lambda elements: max(elements or [None]))`; `beam.CombinePerKey(max)`

*Generating key-value pairs*
- Generate key-value pairs from a value-only PCollection using `beam.WithKeys(lambda x: x[0])`

## Filter

`PCollection`s can be filtered using the `Filter` transform. Create a filter by supplying a predicate, and `Filter` will remove all elements of the `PCollection` that don't satisfy the predicate.

e.g.:

In [4]:
import apache_beam as beam

def is_perennial(plant):
  return plant['duration'] == 'perennial'

with beam.Pipeline() as p:
    perennials = (
        p 
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        # | 'Filter perennials' >> beam.Filter(is_perennial)
        # Alternatively:
        | 'Filter perennials' >> beam.Filter(lambda plant: plant['duration'] == 'perennial')
        | beam.Map(print)
    )

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


### Filtering with multiple arguments
You can also pass extra arguments to `Filter`. They are forwarded to the predicate as additional positional or keyword arguments, after the element itself.

In [5]:
def has_duration(plant, duration):
    return plant['duration'] == duration

with beam.Pipeline() as p:
    perennials = (
        p
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        | 'Filter perennials' >> beam.Filter(has_duration, 'perennial') # First arg is each element of the piped-in PCollection
        | beam.Map(print)
    )

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
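
The forwarding rule can be modelled in a few lines of plain Python. This is only a sketch of the behaviour described above, not Beam's implementation; `apply_filter` is a hypothetical helper introduced for illustration, and the plant list is shortened.

```python
# Plain-Python model of how beam.Filter forwards extra arguments (not Beam itself).
# `apply_filter` is a hypothetical helper used only for illustration.
def apply_filter(elements, predicate, *args, **kwargs):
    """Keep the elements for which predicate(element, *args, **kwargs) is truthy."""
    return [e for e in elements if predicate(e, *args, **kwargs)]

def has_duration(plant, duration):
    return plant['duration'] == duration

plants = [
    {'name': 'Strawberry', 'duration': 'perennial'},
    {'name': 'Carrot', 'duration': 'biennial'},
    {'name': 'Tomato', 'duration': 'annual'},
]

# The element always fills the first parameter; the extra argument can be
# given positionally or by keyword with the same result.
positional = apply_filter(plants, has_duration, 'perennial')
keyword = apply_filter(plants, has_duration, duration='perennial')
print([p['name'] for p in positional])
```

Passing the value by keyword tends to read better once a predicate takes more than one extra argument.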


### Filtering with side inputs as singletons

A *singleton* is a `PCollection` with only one element. We can use these to filter our input `PCollection`.

In [6]:
with beam.Pipeline() as p:

    perennial = p | 'Perennial' >> beam.Create(['perennial'])

    perennials = (
        p 
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        | 'Filter perennials' >> beam.Filter(
            lambda plant, duration: plant['duration'] == duration,
            duration = beam.pvalue.AsSingleton(perennial),
        )
        | beam.Map(print)
    )

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


### Filtering with side inputs as iterators

If a `PCollection` has multiple values, you can pass it in as an iterable with `AsIter`. Elements are accessed lazily as needed, so this works even for `PCollection`s that don't fit in memory.

In [8]:
with beam.Pipeline() as p:

    valid_durations = p | 'Valid durations' >> beam.Create([
        'annual',
        'biennial',
        'perennial',
    ])

    perennials = (
        p 
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'}, # note this is in all caps now
        ])
        | 'Filter perennials' >> beam.Filter(
            lambda plant, valid_durations: plant['duration'] in valid_durations,
            valid_durations = beam.pvalue.AsIter(valid_durations),
        )
        | beam.Map(print)
    )

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
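
Potato is missing from the output because the membership test is an exact, case-sensitive string comparison. The plain-Python sketch below models just that test (it is not Beam code); the tuple stands in for the `AsIter` side input, and lowercasing the value is one possible fix, shown here as an assumption about what the pipeline should accept.

```python
# Plain-Python model of the membership test in the Filter above (not Beam itself).
valid_durations = ('annual', 'biennial', 'perennial')  # stand-in for the AsIter side input

plants = [
    {'name': 'Eggplant', 'duration': 'perennial'},
    {'name': 'Potato', 'duration': 'PERENNIAL'},
]

# Exact match: 'PERENNIAL' is not among the valid durations, so Potato is dropped.
exact = [p['name'] for p in plants if p['duration'] in valid_durations]

# Normalising the case before the test keeps Potato.
normalised = [p['name'] for p in plants if p['duration'].lower() in valid_durations]

print(exact)
print(normalised)
```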


### Filtering with side inputs as dictionaries

If a `PCollection` is small enough to fit in memory, it can be passed as a dictionary into `Filter`. Each element must be a `(key, value)` pair. If the elements won't fit in memory, use `AsIter()` instead.

In [9]:
with beam.Pipeline() as p:

    keep_duration = p | 'Duration filters' >> beam.Create([
        ('annual', False),
        ('biennial', False),
        ('perennial', True),
    ])

    perennials = (
        p 
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        | 'Filter perennials' >> beam.Filter(
            lambda plant, keep_duration: keep_duration[plant['duration']],
            keep_duration = beam.pvalue.AsDict(keep_duration),
        )
        | beam.Map(print)
    )

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
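
One thing to watch with the dictionary lookup: indexing with `keep_duration[...]` raises a `KeyError` for any duration that is not a key. The sketch below uses an ordinary `dict` as a stand-in for the `AsDict` side input to compare the strict lookup with a `.get()` fallback; treating unknown durations as "drop" is an assumption made for the example.

```python
# An ordinary dict standing in for the AsDict side input (not Beam itself).
keep_duration = {'annual': False, 'biennial': False, 'perennial': True}

def strict(plant):
    # Same lookup as the lambda above: fails on an unknown duration.
    return keep_duration[plant['duration']]

def lenient(plant):
    # Assumption for this sketch: unknown durations are simply dropped.
    return keep_duration.get(plant['duration'], False)

potato = {'name': 'Potato', 'duration': 'PERENNIAL'}

try:
    strict(potato)
    strict_result = 'no error'
except KeyError as err:
    strict_result = f'KeyError: {err}'

lenient_result = lenient(potato)
print(strict_result, lenient_result)
```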


### Chaining `Filter` statements

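We can also chain `Filter` statements. Each `Filter` runs on the output of the previous one, so an element must satisfy every predicate to reach the end.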

In [10]:
# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

with beam.Pipeline() as p:

    string = "To be, or not to be: that is the question: Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune, Or to take arms against a sea of troubles, And by opposing end them. To die: to sleep"
  
    (
        p  
        | 'Convert to words' >> 
            beam.Create(string.split(' '))
        | 'Filter for words starting with a' >>
            beam.Filter(lambda word: word[0] == 'a')
        | 'Filter for words that have more than three chars' >>
            beam.Filter(lambda word: len(word) > 3)          
        | 'Output' >>
            Output(prefix='PCollection filtered value: ')                    
    )

PCollection filtered value: arrows
PCollection filtered value: arms
PCollection filtered value: against
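
Chaining two filters keeps the same elements as one filter whose predicate joins the conditions with `and`. The plain-Python sketch below checks this on a made-up word list (not the text above). It also shows why `word.startswith('a')` can be a safer test than `word[0] == 'a'`: splitting on a single space yields an empty string wherever two spaces are adjacent, and indexing an empty string raises an `IndexError`.

```python
# Made-up sample with a double space, so split(' ') produces an empty string.
words = "take arms against a sea  of arrows".split(' ')

# Two filters applied one after the other...
starts_with_a = [w for w in words if w.startswith('a')]
chained = [w for w in starts_with_a if len(w) > 3]

# ...keep the same words as a single combined predicate.
combined = [w for w in words if w.startswith('a') and len(w) > 3]

# Indexing fails on the empty string; startswith() does not.
try:
    [w for w in words if w[0] == 'a']
    indexing_ok = True
except IndexError:
    indexing_ok = False

print(chained, indexing_ok)
```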


## Aggregations
### Count

`Count` provides transformations for counting elements; either globally within the `PCollection`, or by each key.

#### Count all elements

Use `Count.Globally()` to count all elements in a `PCollection`, even duplicated elements.

In [11]:
with beam.Pipeline() as p:
    total_elements = (
        p 
        | 'Create plants' >> beam.Create(['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
        | 'Count all elements' >> beam.combiners.Count.Globally()
        | beam.Map(print)
    )

10


#### Counting per key

Use `Count.PerKey()` to count the elements for each unique key in a `PCollection` of key-values.

In [12]:
with beam.Pipeline() as p:
    total_elements_per_keys = (
        p 
        | 'Create plants' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('summer', '🥕'),
            ('fall', '🥕'),
            ('spring', '🍆'),
            ('winter', '🍆'),
            ('spring', '🍅'),
            ('summer', '🍅'),
            ('fall', '🍅'),
            ('summer', '🌽'),
        ])
        | 'Count elements per key' >> beam.combiners.Count.PerKey()
        | beam.Map(print)
    )

('spring', 4)
('summer', 3)
('fall', 2)
('winter', 1)


In [16]:
# Counts per key and ignores value
with beam.Pipeline() as p:
    (
        p 
        | beam.Create([(1, 36), (2, 91), (3, 33), (3, 11), (4, 67),])
        | beam.combiners.Count.PerKey() 
        | beam.Map(print)
    )

# Counts elements in PCollection, which is 5 tuples
with beam.Pipeline() as p:
    (
        p 
        | beam.Create([(1, 36), (2, 91), (3, 33), (3, 11), (4, 67),])
        | beam.combiners.Count.Globally() 
        | beam.Map(print)
    )

(1, 1)
(2, 1)
(3, 2)
(4, 1)
5


#### Counting unique elements

Use `Count.PerElement()` to count how many times each unique element occurs in a `PCollection`.

In [13]:
with beam.Pipeline() as p:
    total_unique_elements = (
        p 
        | 'Create produce' >> beam.Create(['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
        | 'Count unique elements' >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )

('🍓', 1)
('🥕', 3)
('🍆', 2)
('🍅', 3)
('🌽', 1)
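
As a rough mental model, the three `Count` variants correspond to familiar standard-library operations. The sketch below is plain Python rather than Beam, and the `pairs` list is a shortened sample; note also that a real pipeline does not promise any particular output order.

```python
from collections import Counter

produce = ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽']
pairs = [('spring', '🍓'), ('spring', '🥕'), ('summer', '🥕'), ('fall', '🥕')]

# Count.Globally(): one number for the whole collection, duplicates included.
globally = len(produce)

# Count.PerElement(): occurrences of each distinct element.
per_element = Counter(produce)

# Count.PerKey(): occurrences of each key; the values are ignored.
per_key = Counter(key for key, _ in pairs)

print(globally, dict(per_element), dict(per_key))
```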


#### Counting strings

In [19]:
class SplitWords(beam.DoFn):
    def __init__(self, delimiter=' '):
        self.delimiter = delimiter

    def process(self, text):
        for word in text.split(self.delimiter):
            yield word

with beam.Pipeline() as p:
    (
        p 
        | beam.Create(["To be, or not to be: that is the question: Whether 'tis nobler in the mind to suffer, the slings and arrows of outrageous fortune, or to take arms against a sea of troubles, and by opposing end them. To die: to sleep"])
        | 'Split sentence into words' >> 
            beam.ParDo(SplitWords()) 
        | 'Count number of times each word appears' >> 
            beam.combiners.Count.PerElement()
        | 'Filter for those that appear > 1 times' >> 
            beam.Filter(lambda x: x[1] > 1)
        | Output(prefix='PCollection filtered value: ')
    )

PCollection filtered value: ('To', 2)
PCollection filtered value: ('or', 2)
PCollection filtered value: ('to', 4)
PCollection filtered value: ('the', 3)
PCollection filtered value: ('and', 2)
PCollection filtered value: ('of', 2)


### Sum
#### Sum of all elements in `PCollection`

Find the global sum of a `PCollection` by running `CombineGlobally(sum)`.

In [20]:
with beam.Pipeline() as p:
  total = (
    p 
    | 'Create numbers' >> beam.Create([3, 4, 1, 2])
    | 'Sum values' >> beam.CombineGlobally(sum)
    | beam.Map(print)
  )

10


In [22]:
with beam.Pipeline() as p:
  total = (
    p 
    | 'Create numbers' >> beam.Create([(3, 4), (1, 2)])
    # sum() cannot add tuples, so flatten each (a, b) element into
    # individual numbers first; otherwise the combine raises a TypeError
    | 'Flatten' >> beam.FlatMap(lambda x: x)
    | 'Sum values' >> beam.CombineGlobally(sum)
    | beam.Map(print)
  )

10
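
The error mentioned in the comment comes from Python's built-in `sum`, which starts from `0` and cannot add a tuple to an integer. A short plain-Python check, using `itertools.chain.from_iterable` as the stand-in for the `FlatMap` step:

```python
from itertools import chain

pairs = [(3, 4), (1, 2)]

# sum() starts at 0, and 0 + (3, 4) is not defined.
try:
    sum(pairs)
    error = None
except TypeError as exc:
    error = type(exc).__name__

# Flattening first gives sum() plain numbers to add.
flat_total = sum(chain.from_iterable(pairs))

print(error, flat_total)
```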


#### Sum of elements by each key

Use `CombinePerKey(sum)` to sum the values for each unique key in a `PCollection` of key-value pairs.

In [23]:
with beam.Pipeline() as p:
    totals_per_key = (
        p 
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),
        ])
        | 'Sum values per key' >> beam.CombinePerKey(sum)
        | beam.Map(print)
    )

('🥕', 5)
('🍆', 1)
('🍅', 12)
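
Conceptually, a per-key combine groups the values by key and then reduces each group with the given function. The sketch below models that in plain Python; `combine_per_key` and `naive_mean` are hypothetical helpers, not Beam APIs. Because Beam may apply the function to partial results rather than to all values at once, the function should be associative and commutative. `sum`, `min` and `max` qualify, while a naive mean does not, as the last lines show.

```python
from collections import defaultdict

# Hypothetical helper: group values by key, then reduce each group with fn.
def combine_per_key(pairs, fn):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return [(key, fn(values)) for key, values in grouped.items()]

produce = [('🥕', 3), ('🥕', 2), ('🍆', 1), ('🍅', 4), ('🍅', 5), ('🍅', 3)]

sums = combine_per_key(produce, sum)
mins = combine_per_key(produce, min)

# A mean of partial means is not the mean of all the values.
def naive_mean(values):
    return sum(values) / len(values)

whole = naive_mean([4, 5, 3])
partial = naive_mean([naive_mean([4, 5]), 3])

print(sums, mins, whole, partial)
```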


### Mean

`Mean` computes the arithmetic mean of the elements in a collection, or the mean of the values associated with each key in a collection of key-value pairs.

In [24]:
with beam.Pipeline() as p:
  (
    p 
    | beam.Create(range(1, 11))
    | beam.combiners.Mean.Globally()
    | Output(prefix='PCollection mean value: ')
  )

PCollection mean value: 5.5


In [26]:
with beam.Pipeline() as p:
  (
    p 
    | beam.Create([(1, 23), (2, 234), (1, 339), (5, 32)])
    | beam.combiners.Mean.PerKey()
    | Output(prefix='PCollection mean value: ')
  )

PCollection mean value: (1, 181.0)
PCollection mean value: (2, 234.0)
PCollection mean value: (5, 32.0)


### Min

Find the minimum value globally with `CombineGlobally`, or per key with `CombinePerKey`, by passing Python's built-in `min`.

In [27]:
with beam.Pipeline() as p:
    min_element = (
        p 
        | 'Create numbers' >> beam.Create([3, 4, 1, 2])
        | 'Get min value' >> beam.CombineGlobally(lambda elements: min(elements or [-1]))
        | beam.Map(print)
    )

1
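
The `elements or [-1]` part of the lambda is a guard for empty input: `min()` raises a `ValueError` on an empty sequence, so the expression substitutes `[-1]` when there is nothing to compare. A plain-Python check of that idiom follows; `safe_min` is a name introduced here for illustration.

```python
# Same expression as the lambda above, given a name for testing.
def safe_min(elements):
    return min(elements or [-1])

normal = safe_min([3, 4, 1, 2])
empty = safe_min([])  # falls back to the sentinel

# Without the guard, an empty input is an error.
try:
    min([])
    unguarded = 'no error'
except ValueError:
    unguarded = 'ValueError'

print(normal, empty, unguarded)
```

A sentinel of `-1` cannot be told apart from a genuine minimum of `-1`, so a value such as `None` may be a clearer choice when negative numbers can occur in the data.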


In [28]:
with beam.Pipeline() as p:
    elements_with_min_value_per_key = (
        p 
        | 'Create produce' >> beam.Create([
            ('🥕', 3),
            ('🥕', 2),
            ('🍆', 1),
            ('🍅', 4),
            ('🍅', 5),
            ('🍅', 3),])
        | 'Get min value per key' >> beam.CombinePerKey(min)
        | beam.Map(print)
    )

('🥕', 2)
('🍆', 1)
('🍅', 3)


`Top.Smallest(n)` and `Top.Largest(n)` return the `n` smallest or largest elements of the `PCollection` as a single list. The argument is a count, not a threshold, so these are not `Filter` wrappers.

In [30]:
with beam.Pipeline() as p:
    (
        p | beam.Create(range(1, 11))
        # Top.Smallest(5) returns the 5 smallest elements as a single list.
        | beam.combiners.Top.Smallest(5)
        | Output(prefix='PCollection smaller than 5 values: ')
    )

PCollection smaller than 5 values: [1, 2, 3, 4, 5]
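
The argument to `Top.Smallest` and `Top.Largest` is a count of elements to return. The standard library's `heapq.nsmallest` and `heapq.nlargest` behave the same way on a plain sequence, which makes them a convenient model; this is a comparison of semantics, not a claim about how Beam computes the result.

```python
import heapq

numbers = range(1, 11)

smallest_5 = heapq.nsmallest(5, numbers)  # the five smallest, ascending
largest_5 = heapq.nlargest(5, numbers)    # the five largest, descending

# Changing the argument changes how many elements come back,
# not a cut-off value.
smallest_3 = heapq.nsmallest(3, numbers)

print(smallest_5, largest_5, smallest_3)
```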


### Max

In [31]:
with beam.Pipeline() as p:
    max_element = (
        p 
        | 'Create numbers' >> beam.Create([3, 4, 1, 2])
        | 'Get max value' >> beam.CombineGlobally(lambda elements: max(elements or [None]))
        | beam.Map(print)
    )

4


In [32]:

with beam.Pipeline() as p:
    (
        p 
        | beam.Create(range(1, 11))
        # Top.Largest(5) returns the 5 largest elements as a single list, in descending order.
        # The argument is a count, not a threshold, which is why 5 itself is absent from the result.
        | beam.combiners.Top.Largest(5)
        | Output(prefix='PCollection maximum value: ')
    )

PCollection maximum value: [10, 9, 8, 7, 6]


## WithKeys

We can generate key-value pairs from a `PCollection` of individual elements (i.e. `PCollection<V>` $\rightarrow$ `PCollection<KV<K, V>>`) using `beam.WithKeys()`.

In [33]:
with beam.Pipeline() as p:
    (
        p 
        | beam.Create(['apple', 'banana', 'cherry', 'durian', 'guava', 'melon'])
        | beam.WithKeys(lambda word: word[0:1])
        | Output()
    )

('a', 'apple')
('b', 'banana')
('c', 'cherry')
('d', 'durian')
('g', 'guava')
('m', 'melon')
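
In plain Python the same pairing is a one-line comprehension. The sketch below models the behaviour with a hypothetical `with_keys` helper (not a Beam function); it also covers the case where the key is a constant rather than a function, which `beam.WithKeys` accepts as well.

```python
# Hypothetical plain-Python model of beam.WithKeys (not Beam itself).
def with_keys(values, key):
    if callable(key):
        # Derive each key from its value.
        return [(key(v), v) for v in values]
    # Otherwise attach the same constant key to every value.
    return [(key, v) for v in values]

fruits = ['apple', 'banana', 'cherry']

by_initial = with_keys(fruits, lambda word: word[0:1])
constant = with_keys(fruits, 'fruit')

print(by_initial)
print(constant)
```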


## Challenge

The challenge: write a pipeline that sums taxi ride costs in two groups, rides under $15 and rides of $15 or more. One solution:

```python
# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

class ExtractTaxiRideCostFn(beam.DoFn):

    def process(self, element):
        line = element.split(',')
        return tryParseTaxiRideCost(line,16)

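def tryParseTaxiRideCost(line, index):
    # Yield the cost at `index` as a float; fall back to 0.0 when the
    # column is missing or is not a number.
    if len(line) > index:
        try:
            yield float(line[index])
        except ValueError:
            yield float(0)
    else:
        yield float(0)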

with beam.Pipeline() as p:
    input = (
        p
        | 'Log lines' >> 
            beam.io.ReadFromText('gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv')
        | 'Read CSV - Cost column' >>
            beam.ParDo(ExtractTaxiRideCostFn())
    )

    grouped = (
        input
        | 'Group by whether < 15' >> 
            beam.WithKeys(lambda cost: "< 15" if cost < 15 else ">= 15")
    )                  

    summed = (
        grouped
        | 'Aggregate by key' >> 
            beam.CombinePerKey(sum)
    )    

    summed | Output()

#> ('< 15', 5678.690000000039)
#> ('>= 15', 10184.639999999994)
```
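
To check the parse, bucket and sum logic without reading from Cloud Storage, the same steps can be run on a few made-up rows in plain Python. This is a sketch only: `parse_cost` mirrors the parsing rule above under a new, hypothetical name, and the sample lines and costs are invented for the test.

```python
from collections import defaultdict

# Hypothetical stand-in for the parsing step: column 16 as a float, else 0.0.
def parse_cost(line, index=16):
    fields = line.split(',')
    try:
        return float(fields[index])
    except (IndexError, ValueError):
        return 0.0

# Invented sample rows: a valid 17-column line, a short line, a non-numeric cost.
good_line = ','.join(['x'] * 16 + ['12.5'])
short_line = 'a,b,c'
bad_line = ','.join(['x'] * 16 + ['total_amount'])
parsed = [parse_cost(good_line), parse_cost(short_line), parse_cost(bad_line)]

# Same keying rule as the WithKeys step, then a per-key sum.
costs = [4.5, 20.0, 14.75, 15.0, 0.0]
totals = defaultdict(float)
for cost in costs:
    totals['< 15' if cost < 15 else '>= 15'] += cost

print(parsed, dict(totals))
```

Note that rows which fail to parse count as `0.0` and therefore land in the `< 15` group without changing its total.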