In [None]:
import random

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

In [None]:
# Command-line style flags for Beam: request two workers via the
# `--direct_num_workers` option. Shared by every pipeline cell below.
direct_worker_flags = ['--direct_num_workers', '2']
pipeline_options = PipelineOptions(direct_worker_flags)

In [None]:
# Add up the integers 0..9 into a single value and print the result.
with beam.Pipeline(options=pipeline_options) as pipeline:
  numbers = pipeline | beam.Create(range(10))
  # CombineGlobally reduces the whole PCollection with the builtin `sum`.
  total = numbers | 'Number of words' >> beam.CombineGlobally(sum)
  total | 'Print words' >> beam.Map(print)

In [None]:
# Sum the values of each key and print one (key, total) pair per key.
# Keys are drawn at random from 0..10 (inclusive), so the grouping -- and
# therefore the printed output -- differs from run to run.
with beam.Pipeline(options=pipeline_options) as pipeline:
  (
      pipeline
      | beam.Create([(random.randint(0, 10), i) for i in range(10)])
      # Use the builtin `sum` as the per-key combiner; the `beam.combiners`
      # module has no `sum` attribute, so referencing it raises AttributeError.
      | 'Number of words' >> beam.CombinePerKey(sum)
      | 'Print words' >> beam.Map(print)
  )

### Custom combiner

In [None]:
class AverageFn(beam.CombineFn):
  """CombineFn that computes the arithmetic mean of its inputs.

  The accumulator is a ``(running_sum, element_count)`` tuple. The final
  result is ``running_sum / element_count``, or NaN when no element was seen.
  """

  def create_accumulator(self):
    """Returns the empty accumulator: a zero sum and a zero count."""
    return (0.0, 0)

  def add_input(self, sum_count, input):
    """Folds a single element into an accumulator.

    Args:
      sum_count: ``(running_sum, element_count)`` accumulator.
      input: the element to add. The name shadows the builtin ``input`` but
        is kept so the method signature stays unchanged.

    Returns:
      A new ``(running_sum, element_count)`` tuple that includes ``input``.
    """
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    """Merges several accumulators into one.

    Args:
      accumulators: iterable of ``(running_sum, element_count)`` tuples.

    Returns:
      A single ``(running_sum, element_count)`` tuple. An empty iterable
      yields the empty accumulator instead of raising ``ValueError``.
    """
    # Start from the identity accumulator so an empty iterable is handled.
    total, count = self.create_accumulator()
    for partial_total, partial_count in accumulators:
      total += partial_total
      count += partial_count
    return total, count

  def extract_output(self, sum_count):
    """Turns the final accumulator into the mean.

    Args:
      sum_count: ``(running_sum, element_count)`` accumulator.

    Returns:
      ``running_sum / element_count``, or ``float('NaN')`` when the count is
      zero (avoids a division by zero).
    """
    total, count = sum_count
    return total / count if count else float('NaN')

In [None]:
# Average the integers 0..9 with the custom AverageFn combiner and print it.
with beam.Pipeline(options=pipeline_options) as pipeline:
  numbers = pipeline | 'Range 10' >> beam.Create(range(10))
  # CombineGlobally applies AverageFn across the whole PCollection.
  average = numbers | 'Number of words' >> beam.CombineGlobally(AverageFn())
  average | 'Print words' >> beam.Map(print)