In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DirectRunner

## Pipeline

In [76]:
with beam.Pipeline() as pipeline:
    pass

## PCollections

In [81]:
## from file
with beam.Pipeline() as pipeline:
    pipeline | 'ReadMyFile' >> beam.io.ReadFromText(
    'gs://some/inputData.txt')
    
## from in-memory data
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([
        'To be, or not to be: that is the question: ',
        "Whether 'tis nobler in the mind to suffer ",
        'The slings and arrows of outrageous fortune, ',
        'Or to take arms against a sea of troubles, ',
      ]) \
    | 'output' >> beam.Map(print)

To be, or not to be: that is the question: 
Whether 'tis nobler in the mind to suffer 
The slings and arrows of outrageous fortune, 
Or to take arms against a sea of troubles, 


## Transforms

### ParDo

In [89]:
# NOTE: ParDo takes a function which returns 
# 0/more/iterable(e.g. instead return, use yield) elements

## ParDo with DoFn
print('ParDo with DoFn')
class ComputeWordLengthFn(beam.DoFn):
    # implement `process` function
    def process(self, element):
        return [len(element)]

with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([
        'To be, or not to be: that is the question: ',
        "Whether 'tis nobler in the mind to suffer ",
        'The slings and arrows of outrageous fortune, ',
        'Or to take arms against a sea of troubles, ',
      ]) \
    | 'ApplyParDo' >> beam.ParDo(ComputeWordLengthFn()) \
    | 'output' >> beam.Map(print)

## ParDo with lambda
print('ParDo with lambda')
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([
        'To be, or not to be: that is the question: ',
        "Whether 'tis nobler in the mind to suffer ",
        'The slings and arrows of outrageous fortune, ',
        'Or to take arms against a sea of troubles, ',
      ]) \
    | 'ApplyParDo' >> beam.ParDo(lambda e: [len(e)]) \
    | 'output' >> beam.Map(print)
    
## replace ParDo with Map
## if only output ONE element
print('replace ParDo with Map')
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([
        'To be, or not to be: that is the question: ',
        "Whether 'tis nobler in the mind to suffer ",
        'The slings and arrows of outrageous fortune, ',
        'Or to take arms against a sea of troubles, ',
      ]) \
    | 'ApplyMap' >> beam.Map(len) \
    | 'output' >> beam.Map(print)

ParDo with DoFn
43
42
45
43
ParDo with lambda
43
42
45
43
replace ParDo with Map
43
42
45
43


### GroupByKey

In [93]:
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([
        ('a', 1),
        ('b', 2),
        ('c', 10),
        ('b', 15),
        ('a', 9)
      ]) \
    | 'GroupByKey' >> beam.GroupByKey() \
    | 'output' >> beam.Map(print)

('a', [1, 9])
('b', [2, 15])
('c', [10])


### CoGroupByKey

In [96]:
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

with beam.Pipeline() as pipeline:
    emails = pipeline | 'CreateEmails' >> beam.Create(emails_list)
    phones = pipeline | 'CreatePhones' >> beam.Create(phones_list)
    emails_phones = {'emails': emails, 'phones': phones}
    results = (emails_phones | beam.CoGroupByKey())
    results | 'output' >> beam.Map(print)

('amy', {'emails': ['amy@example.com'], 'phones': ['111-222-3333', '333-444-5555']})
('james', {'emails': [], 'phones': ['222-333-4444']})
('carl', {'emails': ['carl@example.com', 'carl@email.com'], 'phones': ['444-555-6666']})
('julia', {'emails': ['julia@example.com'], 'phones': []})


### Combine

In [None]:
## CombineGlobally takes all elements
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([1,2,3,4,5,6,7,8,9]) \
    | 'CombineGlobally' >> beam.CombineGlobally(sum) \
    | 'output' >> beam.Map(print)

#### CombineFn

In [105]:
class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, sum_count, input):
        (sum, count) = sum_count
        return sum + input, count + 1

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, sum_count):
        (sum, count) = sum_count
        return sum / count if count else float('NaN')

## NOTE: To have Combine instead return an empty PCollection if the input is empty,
## specify .withoutDefaults when you apply your Combine transform
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([1,2,3,4,5,6,7,8,9]) \
    | 'CustomizedCombineFn' >> beam.CombineGlobally(AverageFn()).without_defaults() \
    | 'output' >> beam.Map(print)

5.0


#### CombinePerKey

In [None]:
with beam.Pipeline() as pipeline:
    pipeline \
    | 'CreateFromMemory' >> beam.Create([1,2,3,4,5,6,7,8,9]) \
    | 'CombinePerKey' >> beam.CombinePerKey(beam.combiners.MeanCombineFn()) \
    | 'output' >> beam.Map(print)

### Flatten

In [110]:
with beam.Pipeline() as pipeline:
    numbers1 = pipeline | 'CreateNumbers1' >> beam.Create([1,2,3,4,5])
    numbers2 = pipeline | 'CreateNumbers2' >> beam.Create([12,91,100])
    (numbers1, numbers2) | 'Flatten' >> beam.Flatten() \
    | 'output' >> beam.Map(print)

1
2
3
4
5
12
91
100


### Partition

In [127]:
## Partition is a Beam transform for PCollection objects that store the same data type. 
## Partition splits a single PCollection into a fixed number of smaller collections.

## NOTE: Partition takes in a function which returns a index/number 
## to identify which partition the element should belongs to

durations = ['annual', 'biennial', 'perennial']

def by_duration(plant, num_partitions):
#     print(durations.index(plant['duration']))
    return durations.index(plant['duration'])

with beam.Pipeline() as pipeline:
    annuals, biennials, perennials = (
        pipeline \
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ]) \
        | 'Partition' >> beam.Partition(by_duration, len(durations))
    )

    annuals | 'Annuals' >> beam.Map(lambda x: print('annual: {}'.format(x)))
    biennials | 'Biennials' >> beam.Map(lambda x: print('biennial: {}'.format(x)))
    perennials | 'Perennials' >> beam.Map(lambda x: print('perennial: {}'.format(x)))

2
perennial: {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
1
biennial: {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}
2
perennial: {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
0
annual: {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
2
perennial: {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


### Side inputs

In [36]:
# Side inputs are available as extra arguments in the DoFn's process method or Map / FlatMap's callable.
# Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their
# actual values. For example, using pvalue.AsIteor(pcoll) at pipeline construction time results in an iterable
# of the actual elements of pcoll being passed into each process invocation. In this example, side inputs are
# passed to a FlatMap transform as extra arguments and consumed by filter_using_length.

def filter_using_length(word, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(word) <= upper_bound:
        yield word

class FilterUsingLength(beam.DoFn):
    def process(self, element, lower_bound, upper_bound=float('inf')):
        if lower_bound <= len(element) <= upper_bound:
            # use yield to produce iterable returns
            yield len(element)

with beam.Pipeline() as pipeline:

    # construct a deferred side input
    words = pipeline \
    | 'CreateFromMemory' >> beam.Create([
        'To be, or not to be: that is the question: ',
        "Whether 'tis nobler in the mind to suffer ",
        'The slings and arrows of outrageous fortune, ',
        'Or to take arms against a sea of troubles, ',
      ])
    avg_len = words \
    | beam.Map(len) \
    | beam.CombineGlobally(beam.combiners.MeanCombineFn())

    # call with explicit side inputs in FlatMap
    small_words = (
        words \
        | 'small' >> beam.FlatMap(filter_using_length, lower_bound=40, upper_bound=42) \
        | 'p1' >> beam.Map(print)
    )
    
    # call with deferred side input in FlatMap
    larger_than_avg = (
        words \
        | 'larger' >> beam.FlatMap(filter_using_length, lower_bound=beam.pvalue.AsSingleton(avg_len)) \
        | 'p2' >> beam.Map(print)
    )

    # call with explicit side inputs in ParDo
    small_words_2 = (
        words \
        | 'small2' >> beam.ParDo(FilterUsingLength(), 40, 42) \
        | 'p3' >> beam.Map(print)
    )

Whether 'tis nobler in the mind to suffer 
42
The slings and arrows of outrageous fortune, 


### Additional outputs

In [42]:
from apache_beam import pvalue

## use pvalue.TaggedOutput to define the name of specific output
## then the name can be used in with_outputs() method

class ProcessWords(beam.DoFn):
    def process(self, element, cutoff_length, marker):
        if len(element) <= cutoff_length:
            # Emit this short word to the main output.
            yield element
        else:
            # Emit this word's long length to the 'above_cutoff_lengths' output.
            yield pvalue.TaggedOutput('above_cutoff_lengths', len(element))
        if element.startswith(marker):
            # Emit this word to a different output with the 'marked strings' tag.
            yield pvalue.TaggedOutput('marked strings', element)

words = ['aa', 'bbb', 'c', 'x5']
with beam.Pipeline() as pipeline:
    results = (
        words \
        | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x').with_outputs(
            'above_cutoff_lengths',
            'marked strings',
            main='below_cutoff_strings')
    )
    below = results.below_cutoff_strings | 'p1' >> beam.Map(print)
    above = results.above_cutoff_lengths | 'p2' >> beam.Map(print)
    marked = results['marked strings'] | 'p3' >> beam.Map(print) # indexing works as well
    
    ## unzipping also works
    below, above, marked = results

aa
c
x5
3
x5


In [47]:
## Emitting to multiple outputs in your DoFn
# NOTE: if with_outputs() `main` argument is not assigned, then the main output is `None` - results[None]
def even_odd(x):
    yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
    if x % 10 == 0:
        yield x

numbers = [1,2,3,4,11,10]
with beam.Pipeline() as pipeline:
    evens, odds, tens = (
        numbers \
        | beam.ParDo(even_odd).with_outputs('odd', 'even', main='tens')
    )
    evens | 'p1' >> beam.Map(print)
    odds | 'p2' >> beam.Map(print)
    tens | 'p3' >> beam.Map(print)

10
1
3
11
2
4
10


In [None]:
## access TimeStamp of an input element, add a keyword parameter default to DoFn.TimeStampParam
class ProcessRecord(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # access timestamp of element.
        pass

In [None]:
## To access the window an input element falls into, add a keyword parameter default to DoFn.WindowParam
class ProcessRecord(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        pass

In [None]:
## When triggers are used, Beam provides a DoFn.PaneInfoParam object that contains information about the current firing.
## Using DoFn.PaneInfoParam you can determine whether this is an early or a late firing, 
## and how many times this window has already fired for this key
class ProcessRecord(beam.DoFn):
    def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
        pass

### Composite transforms

In [67]:
## subclass PTransform to create composite transforms
## NOTE: Within your PTransform subclass, you’ll need to override the expand method. 
## The expand method is where you add the processing logic for the PTransform

words = ['aa', 'bbb', 'c', 'x5', 'Test', 'awesome', 'aa', 'c']

class ComputeWordLength(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll \
            | beam.Map(lambda x: len(x)) \
            | beam.Map(print)
        )

class CountWords(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll \
            | beam.combiners.Count.PerElement() \
            | beam.combiners.Top.Of(3) \
            | beam.Map(print)
        )

# use ComputeWordLength composite PTransform
with beam.Pipeline() as pipeline:
    words | ComputeWordLength()
with beam.Pipeline() as pipeline:
    words | CountWords()

2
3
1
2
4
7
2
1
[('x5', 1), ('c', 2), ('bbb', 1)]


---