In [1]:
import apache_beam as beam

# Using beam.CombineGlobally(function):
#   beam.CombineGlobally hands the elements of the whole PCollection to
#   `function` (possibly in partial batches whose results are combined again)
#   and produces a single combined value.
# Example function get_common_items:
#   intersects a list of sets, treating an empty list as one empty set.
def get_common_items(sets):
  """Return the items that appear in every set of `sets`.

  Args:
    sets: a collection of `set` objects provided by the combine transform.

  Returns:
    A new set holding the intersection of all input sets, or an empty set
    when `sets` is empty.
  """
  if not sets:
    # The combine transform may hand us no sets at all; report that as an
    # empty set rather than failing.
    return set()
  # set.intersection() takes the sets as separate positional arguments,
  # so the collection is unpacked with the * operator.
  return set.intersection(*sets)

with beam.Pipeline() as pipeline:
  # Step 1: turn the in-memory list of produce sets into a PCollection.
  common_items = pipeline | 'Create produce' >> beam.Create([
      {'🍓', '🥕', '🍌', '🍅', '🌶️'},
      {'🍇', '🥕', '🥝', '🍅', '🥔'},
      {'🍉', '🥕', '🍆', '🍅', '🍍'},
      {'🥑', '🥕', '🌽', '🍅', '🥥'},
  ])
  # Step 2: reduce all sets to their intersection with get_common_items.
  common_items = (
      common_items
      | 'Get common items' >> beam.CombineGlobally(get_common_items))
  # Step 3: print the single combined result.
  common_items = common_items | beam.Map(print)



{'🥕', '🍅'}


In [2]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  # Same as the previous cell, but the combining function is an inline lambda.
  common_items = pipeline | 'Create produce' >> beam.Create([
      {'🍓', '🥕', '🍌', '🍅', '🌶️'},
      {'🍇', '🥕', '🥝', '🍅', '🥔'},
      {'🍉', '🥕', '🍆', '🍅', '🍍'},
      {'🥑', '🥕', '🌽', '🍅', '🥥'},
  ])
  common_items = (
      common_items
      | 'Get common items' >>
      beam.CombineGlobally(lambda sets: set.intersection(*(sets or [set()]))))
  common_items = common_items | beam.Map(print)

{'🥕', '🍅'}


In [None]:
import apache_beam as beam

# Example of beam.CombineGlobally with a function that takes extra arguments:
# keyword arguments given to CombineGlobally (here `exclude`) are passed
# through to the combining function.
with beam.Pipeline() as pipeline:
  common_items_with_exceptions = pipeline | 'Create produce' >> beam.Create([
      {'🍓', '🥕', '🍌', '🍅', '🌶️'},
      {'🍇', '🥕', '🥝', '🍅', '🥔'},
      {'🍉', '🥕', '🍆', '🍅', '🍍'},
      {'🥑', '🥕', '🌽', '🍅', '🥥'},
  ])
  # Intersect all sets, then drop the excluded items from the result.
  common_items_with_exceptions = (
      common_items_with_exceptions
      | 'Get common items with exceptions' >> beam.CombineGlobally(
          lambda sets, exclude: set.intersection(*(sets or [set()])) - exclude,
          exclude={'🥕'}))
  common_items_with_exceptions = (
      common_items_with_exceptions | beam.Map(print))

In [3]:
import apache_beam as beam

# Example of defining your own beam.CombineFn
class PercentagesFn(beam.CombineFn):
  """Combine single items into the fraction of the total each item represents.

  The accumulator is a dict mapping item -> number of occurrences.
  """

  def create_accumulator(self):
    # Nothing has been counted yet.
    return {}

  def add_input(self, accumulator, input):
    # Bump the count for `input`, e.g. {} + '🥕' -> {'🥕': 1}.
    accumulator[input] = accumulator.get(input, 0) + 1
    return accumulator

  def merge_accumulators(self, accumulators):
    # Add up the per-item counts of several partial accumulators, e.g.
    #   [{'🥕': 1, '🍅': 2}, {'🥕': 1, '🍅': 1, '🍆': 1}, {'🥕': 1, '🍅': 3}]
    #   -> {'🥕': 3, '🍅': 6, '🍆': 1}
    totals = {}
    for partial in accumulators:
      for item, count in partial.items():
        totals[item] = totals.get(item, 0) + count
    return totals

  def extract_output(self, accumulator):
    # Turn counts into fractions of the grand total, e.g.
    #   {'🥕': 3, '🍅': 6, '🍆': 1} -> {'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}
    grand_total = sum(accumulator.values())
    return {item: count / grand_total for item, count in accumulator.items()}

with beam.Pipeline() as pipeline:
  # Feed individual produce items into PercentagesFn and print the resulting
  # item -> fraction dict.
  percentages = pipeline | 'Create produce' >> beam.Create(
      ['🥕', '🍅', '🍅', '🥕', '🍆', '🍅', '🍅', '🍅', '🥕', '🍅'])
  percentages = (
      percentages
      | 'Get percentages' >> beam.CombineGlobally(PercentagesFn()))
  percentages = percentages | beam.Map(print)

{'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}


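**Summary of beam.CombineGlobally**
- `beam.CombineGlobally` combines all elements of the input PCollection into a single value using the given combine function: a plain function, a lambda, or a `beam.CombineFn`. Beam may apply the function to partial groups first and then combine those results, so the function should be commutative and associative.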

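**Discussion of beam.CombinePerKey**
- `beam.CombinePerKey` takes a PCollection of `(key, value)` pairs, groups the values by key, and combines each key's values, producing one `(key, combined_value)` pair per key.
- Reference: https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/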

In [4]:
import apache_beam as beam

# Using beam.CombinePerKey(func_name):
#   elements are (key, value) pairs; the values are grouped by key and the
#   function is applied to each key's values.
# Example: group the plant counts by plant and add them up with sum.
with beam.Pipeline() as pipeline:
  total = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', 3),
      ('🥕', 2),
      ('🍆', 1),
      ('🍅', 4),
      ('🍅', 5),
      ('🍅', 3),
  ])
  total = total | 'Sum' >> beam.CombinePerKey(sum)
  total = total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 12)


In [5]:
import apache_beam as beam

# Example of using beam.CombinePerKey with a custom function:
#   the function returns min(sum(values), max_value).
def saturated_sum(values, max_value=8):
  """Return the sum of `values`, capped at `max_value`.

  Beam may call a combining function on partial groups of a key's values and
  then again on those partial results, so the function should be commutative
  and associative. A capped sum satisfies that only for non-negative values.

  Args:
    values: iterable of numbers for one key.
    max_value: upper bound of the result; defaults to 8.

  Returns:
    min(sum(values), max_value)
  """
  return min(sum(values), max_value)

with beam.Pipeline() as pipeline:
  # Sum each plant's counts, capping every per-key total with saturated_sum.
  saturated_total = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', 3),
      ('🥕', 2),
      ('🍆', 1),
      ('🍅', 4),
      ('🍅', 5),
      ('🍅', 3),
  ])
  saturated_total = (
      saturated_total
      | 'Saturated sum' >> beam.CombinePerKey(saturated_sum))
  saturated_total = saturated_total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 8)


In [6]:
import apache_beam as beam

# Example of using beam.CombinePerKey() with a function that takes an extra
# argument besides the values; `max_value` is passed as a keyword argument.
with beam.Pipeline() as pipeline:
  saturated_total = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', 3),
      ('🥕', 2),
      ('🍆', 1),
      ('🍅', 4),
      ('🍅', 5),
      ('🍅', 3),
  ])
  saturated_total = (
      saturated_total
      | 'Saturated sum' >> beam.CombinePerKey(
          lambda values, max_value: min(sum(values), max_value), max_value=8))
  saturated_total = saturated_total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 8)


In [None]:
import apache_beam as beam

# Example of using beam.CombinePerKey with your own beam.CombineFn
class AverageFn(beam.CombineFn):
  """Compute the arithmetic mean of numeric inputs.

  The accumulator is a `(total, count)` tuple: the running sum of the inputs
  and how many inputs have been seen.
  """

  def create_accumulator(self):
    # Start with a zero total and no inputs seen.
    return 0.0, 0

  def add_input(self, accumulator, input):
    total, count = accumulator
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    # Add up the totals and the counts of all partial accumulators.
    # Folding from (0.0, 0) keeps this well-defined when `accumulators` is
    # empty, where `zip(*accumulators)` could not be unpacked into two names.
    total, count = 0.0, 0
    for partial_total, partial_count in accumulators:
      total += partial_total
      count += partial_count
    return total, count

  def extract_output(self, accumulator):
    total, count = accumulator
    if count == 0:
      # No inputs: the mean is undefined.
      return float('NaN')
    return total / count

with beam.Pipeline() as pipeline:
  # Average each plant's counts with the custom AverageFn.
  average = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', 3),
      ('🥕', 2),
      ('🍆', 1),
      ('🍅', 4),
      ('🍅', 5),
      ('🍅', 3),
  ])
  average = average | 'Average' >> beam.CombinePerKey(AverageFn())
  average = average | beam.Map(print)

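**Discussion of beam.CombineValues**
- `beam.CombineValues` takes a PCollection of `(key, values)` pairs, where `values` is an iterable, and combines each element's `values`, producing one `(key, combined_value)` pair per element.
- Reference: https://beam.apache.org/documentation/transforms/python/aggregation/combinevalues/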


In [7]:
import apache_beam as beam

# Using beam.CombineValues:
#   each element is a (key, values) tuple whose `values` is an iterable;
#   the combine function reduces `values` to one value per element.
# Example: for each element, compute the sum of its values.

with beam.Pipeline() as pipeline:
  total = pipeline | 'Create produce counts' >> beam.Create([
      ('🥕', [3, 2]),
      ('🍆', [1]),
      ('🍅', [4, 5, 3]),
  ])
  total = total | 'Sum' >> beam.CombineValues(sum)
  total = total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 12)


In [8]:
import apache_beam as beam

def saturated_sum(values, max_value=8):
  """Return the sum of `values`, capped at `max_value`.

  Beam may call a combining function on partial results and combine them
  again, so the function should be commutative and associative. A capped sum
  satisfies that only for non-negative values.

  Args:
    values: iterable of numbers for one element.
    max_value: upper bound of the result; defaults to 8.

  Returns:
    min(sum(values), max_value)
  """
  return min(sum(values), max_value)

with beam.Pipeline() as pipeline:
  # Reduce each element's list of counts with saturated_sum.
  saturated_total = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', [3, 2]),
      ('🍆', [1]),
      ('🍅', [4, 5, 3]),
  ])
  saturated_total = (
      saturated_total
      | 'Saturated sum' >> beam.CombineValues(saturated_sum))
  saturated_total = saturated_total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 8)


In [9]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  # Same capped sum as before, written inline as a lambda.
  saturated_total = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', [3, 2]),
      ('🍆', [1]),
      ('🍅', [4, 5, 3]),
  ])
  saturated_total = (
      saturated_total
      | 'Saturated sum' >>
      beam.CombineValues(lambda values: min(sum(values), 8)))
  saturated_total = saturated_total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 8)


In [10]:
import apache_beam as beam

# Example of using beam.CombineValues with a function that takes an extra
# argument besides the values; `max_value` is passed as a keyword argument.
with beam.Pipeline() as pipeline:
  saturated_total = pipeline | 'Create plant counts' >> beam.Create([
      ('🥕', [3, 2]),
      ('🍆', [1]),
      ('🍅', [4, 5, 3]),
  ])
  saturated_total = (
      saturated_total
      | 'Saturated sum' >> beam.CombineValues(
          lambda values, max_value: min(sum(values), max_value), max_value=8))
  saturated_total = saturated_total | beam.Map(print)

('🥕', 5)
('🍆', 1)
('🍅', 8)


In [None]:
import apache_beam as beam

# Example of using beam.CombineValues() with your own beam.CombineFn
class AverageFn(beam.CombineFn):
  """Turn a group of items into the fraction of the group each item makes up.

  NOTE(review): despite its name (and the 'Average' step label used with it),
  this computes per-item percentages, like PercentagesFn earlier in the
  notebook. Defining it here also replaces the averaging AverageFn from an
  earlier cell. Consider renaming it.

  The accumulator is a dict mapping item -> number of occurrences.
  """

  def create_accumulator(self):
    # Nothing has been counted yet.
    return {}

  def add_input(self, accumulator, input):
    # Count one more occurrence of `input`, e.g. {} + '🥕' -> {'🥕': 1}.
    accumulator.setdefault(input, 0)
    accumulator[input] += 1
    return accumulator

  def merge_accumulators(self, accumulators):
    # Add up per-item counts, e.g.
    #   [{'🥕': 1, '🍅': 1}, {'🥕': 1, '🍅': 1, '🍆': 1}]
    #   -> {'🥕': 2, '🍅': 2, '🍆': 1}
    merged = {}
    for partial in accumulators:
      for item, count in partial.items():
        merged[item] = merged.get(item, 0) + count
    return merged

  def extract_output(self, accumulator):
    # Divide each count by the total, e.g.
    #   {'🥕': 2, '🍅': 2, '🍆': 1} -> {'🥕': 0.4, '🍅': 0.4, '🍆': 0.2}
    total = sum(accumulator.values())
    percentages = {}
    for item, count in accumulator.items():
      percentages[item] = count / total
    return percentages

with beam.Pipeline() as pipeline:
  # Each element is (season, list of produce); CombineValues reduces every
  # list independently, giving one (season, result) pair per season.
  percentages_per_season = pipeline | 'Create produce' >> beam.Create([
      ('spring', ['🥕', '🍅', '🥕', '🍅', '🍆']),
      ('summer', ['🥕', '🍅', '🌽', '🍅', '🍅']),
      ('fall', ['🥕', '🥕', '🍅', '🍅']),
      ('winter', ['🍆', '🍆']),
  ])
  percentages_per_season = (
      percentages_per_season
      | 'Average' >> beam.CombineValues(AverageFn()))
  percentages_per_season = percentages_per_season | beam.Map(print)