# 1. Creating a pipeline

In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Transforms applied to `p` inside this block run when the block exits.
with beam.Pipeline(options=PipelineOptions()) as p:
  ...  # add transforms to `p` here


In [7]:
import apache_beam.runners.interactive.interactive_beam as ib

# 2. Configuring pipeline options

# 3. PCollections

## 3.1 Reading from an external source

```python
lines = (
    p
    | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt'))
```

## 3.2 Creating a PCollection from in-memory data

In [14]:
import apache_beam as beam

def print_row(element):
  """Print a single pipeline element to stdout."""
  print(element)

with beam.Pipeline() as pipeline:
  # Four lines of text become the elements of an in-memory PCollection;
  # each one is printed as it flows through the labelled Map step.
  lines = (
      pipeline
      | beam.Create(
          [
              'To be, or not to be: that is the question: ',
              "Whether 'tis nobler in the mind to suffer ",
              'The slings and arrows of outrageous fortune, ',
              'Or to take arms against a sea of troubles, ',
          ]
      )
      | 'Print result' >> beam.Map(print_row))


To be, or not to be: that is the question: 
Whether 'tis nobler in the mind to suffer 
The slings and arrows of outrageous fortune, 
Or to take arms against a sea of troubles, 


# 4. Transforms

## 4.2. Core Beam transforms

### Map (a one-to-one special case of ParDo)

In [19]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

pipeline = beam.Pipeline()
words = pipeline | beam.Create(['Cat', 'Dog', 'Bat', 'Rabbit'])

# beam.Map applies `len` to every word (one output per input); the labelled
# step then prints each resulting length.
word_lengths = (
    words
    | beam.Map(len)
    | 'Print result' >> beam.Map(print_row))

pipeline.run()

3
3
3
6


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe759a3a780>

### FlatMap (a one-to-many special case of ParDo)

In [21]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

pipeline = beam.Pipeline()
words = pipeline | beam.Create(['Cat', 'Dog', 'Bat', 'Rabbit'])

# beam.FlatMap emits every item of the iterable each call returns; wrapping
# the length in a one-item list yields exactly one output per word.
word_lengths = (
    words
    | beam.FlatMap(lambda word: [len(word)])
    | 'Print result' >> beam.Map(print_row))

pipeline.run()

3
3
3
6


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe7599916a0>

### ParDo

In [20]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

pipeline = beam.Pipeline()
words = pipeline | beam.Create(['Cat', 'Dog', 'Bat', 'Rabbit'])


# A DoFn whose process() is called once per input element; each item of the
# returned list becomes an element of the output PCollection.
class ComputeWordLengthFn(beam.DoFn):
  """Emit the length of each incoming word."""

  def process(self, element):
    word_length = len(element)
    return [word_length]


# Run ComputeWordLengthFn over every word via ParDo, then print each length.
word_lengths = (
    words
    | beam.ParDo(ComputeWordLengthFn())
    | 'Print result' >> beam.Map(print_row))

pipeline.run()

3
3
3
6


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe759a6e5c0>

### 4.2.2. GroupByKey

In [24]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

pipeline = beam.Pipeline()
# (key, value) pairs; several keys occur more than once.
pairs = (
    pipeline
    | beam.Create([
        ('cat', 1),
        ('dog', 5),
        ('and', 1),
        ('jump', 3),
        ('tree', 2),
        ('cat', 5),
        ('dog', 2),
        ('and', 2),
        ('cat', 9),
        ('and', 6),
    ])
)

# GroupByKey collects every value that shares a key, producing one
# (key, values) element per distinct key, e.g. ('cat', [1, 5, 9]).
grouped = pairs | beam.GroupByKey()
grouped | 'Print result' >> beam.Map(print_row)

pipeline.run()

('cat', [1, 5, 9])
('dog', [5, 2])
('and', [1, 2, 6])
('jump', [3])
('tree', [2])


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe7599bba20>

### 4.2.3. CoGroupByKey

In [4]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

p = beam.Pipeline()

# Two keyed datasets; a name may appear in either, both, or more than once.
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

emails = p | 'CreateEmails' >> beam.Create(emails_list)
phones = p | 'CreatePhones' >> beam.Create(phones_list)

# CoGroupByKey joins the tagged inputs on their key: each output is
# (key, {'emails': [...], 'phones': [...]}), with an empty list for a tag
# that has no value under that key.
results = (
    {'emails': emails, 'phones': phones}
    | beam.CoGroupByKey()
    | 'Print result' >> beam.Map(print_row))


p.run()

('amy', {'emails': ['amy@example.com'], 'phones': ['111-222-3333', '333-444-5555']})
('james', {'emails': [], 'phones': ['222-333-4444']})
('carl', {'emails': ['carl@example.com', 'carl@email.com'], 'phones': ['444-555-6666']})
('julia', {'emails': ['julia@example.com'], 'phones': []})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f85d7442f98>

### 4.2.4. Combine

In [23]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

pipeline = beam.Pipeline()
pc = pipeline | beam.Create([1, 10, 100, 1000])

# CombineGlobally reduces the whole PCollection to a single value, using the
# builtin `sum` as the combining function.
result_sum = (
    pc
    | beam.CombineGlobally(sum)
    | 'Print result' >> beam.Map(print_row))

pipeline.run()

1111


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f85d7b44780>

In [26]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

def bounded_sum(values, bound=500):
  """Return the sum of ``values``, but never more than ``bound``."""
  total = sum(values)
  # Same selection rule as min(total, bound): take `bound` only when it is
  # strictly smaller than the total.
  return bound if bound < total else total

pipeline = beam.Pipeline()
pc = pipeline | beam.Create([1, 10, 100, 1000])

# Default bound (500): the full total of 1111 is capped, so 500 is printed.
small_sum = (
    pc
    | 'Small Sum' >> beam.CombineGlobally(bounded_sum)
    | 'Print Small Sum' >> beam.Map(print_row))

# Here bounded_sum receives bound=5000, so the total 1111 is printed unchanged.
large_sum = (
    pc
    | 'Large Sum' >> beam.CombineGlobally(bounded_sum, bound=5000)
    | 'Print Large Sum' >> beam.Map(print_row))


pipeline.run()

500
1111


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f85d7e9b198>

In [29]:
import apache_beam as beam

def print_row(element):
  """Echo one element of a PCollection to stdout."""
  print(element)

# UDF
class AverageFn(beam.CombineFn):
  """CombineFn that computes the arithmetic mean of numeric inputs.

  The accumulator is a ``(running_total, count)`` tuple. If no inputs were
  seen, ``extract_output`` returns NaN instead of dividing by zero.
  """

  def create_accumulator(self):
    # Empty accumulator: running total 0.0 and no elements counted.
    return (0.0, 0)

  def add_input(self, sum_count, input):
    # The parameter name `input` is kept for interface compatibility; the
    # unpacked locals avoid shadowing the builtin `sum`.
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    # Fold the partial (total, count) pairs in a single pass. Unlike
    # `zip(*accumulators)` + tuple unpacking, this also accepts an empty
    # iterable and returns an empty accumulator instead of raising ValueError.
    merged_total, merged_count = 0.0, 0
    for total, count in accumulators:
      merged_total += total
      merged_count += count
    return merged_total, merged_count

  def extract_output(self, sum_count):
    total, count = sum_count
    # The mean of zero elements is undefined; report NaN rather than raising.
    return total / count if count else float('NaN')


pipeline = beam.Pipeline()
pc = pipeline | beam.Create([1, 10, 100, 1000])

# Average all elements with the custom CombineFn: (1 + 10 + 100 + 1000) / 4.
res = (
    pc
    | beam.CombineGlobally(AverageFn())
    | 'Print result' >> beam.Map(print_row))


pipeline.run()

277.75


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f85d7f8a278>

## CombinePerKey

https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/

### Example 1
We use the built-in function `sum`, which takes an iterable of numbers and adds them together.

In [36]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  # Per-key total: `sum` is applied to all counts that share a plant key.
  total = (
      pipeline
      | 'Create plant counts' >> beam.Create(
          [('🥕', 3), ('🥕', 2), ('🍆', 1), ('🍅', 4), ('🍅', 5), ('🍅', 3)])
      | 'Sum' >> beam.CombinePerKey(sum)
      | beam.Map(print)
  )

('🥕', 5)
('🍆', 1)
('🍅', 12)


### Example 2

We define a function `saturated_sum`, which takes an iterable of numbers and adds them together, capping the total at a predefined maximum value.

In [37]:
import apache_beam as beam

def saturated_sum(values, max_value=8):
  """Sum ``values``, saturating (capping) the result at ``max_value``.

  Args:
    values: iterable of numbers to add together.
    max_value: upper limit on the returned total. Defaults to 8, the cap this
      example has always used, so calling ``saturated_sum(values)`` behaves
      exactly as before.

  Returns:
    ``min(sum(values), max_value)``.
  """
  return min(sum(values), max_value)

with beam.Pipeline() as pipeline:
  # Per-key total, but never larger than saturated_sum's cap.
  saturated_total = (
      pipeline
      | 'Create plant counts' >> beam.Create(
          [('🥕', 3), ('🥕', 2), ('🍆', 1), ('🍅', 4), ('🍅', 5), ('🍅', 3)])
      | 'Saturated sum' >> beam.CombinePerKey(saturated_sum)
      | beam.Map(print)
  )

('🥕', 5)
('🍆', 1)
('🍅', 8)
