CombineFn and Overriding Methods

For more complex combine functions, you can define a subclass of CombineFn. You should use CombineFn if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.

A general combining operation consists of four steps. When you create a subclass of CombineFn, you provide each step by overriding the corresponding method:

Create Accumulator:

    Create Accumulator creates a new “local” accumulator. In the example case, taking a mean average, a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.

Add Input:

    Add Input adds an input element to an accumulator and returns the updated accumulator. In our example, it would add the element to the sum and increment the count. It may also be invoked in parallel.

Merge Accumulators:

    Merge Accumulators merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.

Extract Output:

    Extract Output performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.
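
The four steps above can be traced by hand without a Beam runner. The following is only a sketch in plain Python: the split of the input into three bundles is a hypothetical choice made for illustration (a real runner decides how elements are distributed), and the standalone function names simply mirror the CombineFn method names.

```python
# Plain-Python trace of the four combine steps for a mean.
# No Beam runner is involved; the bundle split below is hypothetical.

def create_accumulator():
    # (running sum, number of values seen so far)
    return (0.0, 0)

def add_input(accumulator, value):
    total, count = accumulator
    return total + value, count + 1

def merge_accumulators(accumulators):
    totals, counts = zip(*accumulators)
    return sum(totals), sum(counts)

def extract_output(accumulator):
    total, count = accumulator
    return total / count if count else float('nan')

# Pretend three workers each received one bundle of the input.
bundles = [[15, 5, 7], [7, 9, 23], [13, 5]]

local_accumulators = []
for bundle in bundles:
    acc = create_accumulator()
    for value in bundle:
        acc = add_input(acc, value)
    local_accumulators.append(acc)

merged = merge_accumulators(local_accumulators)
result = extract_output(merged)

print(local_accumulators)  # [(27.0, 3), (39.0, 3), (18.0, 2)]
print(merged)              # (84.0, 8)
print(result)              # 10.5
```

Each local accumulator only ever sees its own bundle; the full picture exists only after the merge step, which is why the division is deferred to Extract Output.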


In [1]:
import apache_beam as beam

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


In [9]:
# Pass a simple function (the built-in sum) to CombineGlobally()
p = beam.Pipeline()

small_sum =(
    p
    | beam.Create([15,5,7,7,9,23,13,5])
    | beam.CombineGlobally(sum)
    | 'write result'>> beam.io.WriteToText('/home/james/beam/comb_out')
)
p.run()



<apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7fad88940c50>

In [10]:
! cat beam/comb_out-00000-of-00001

84
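
A plain function such as sum works here because summing is commutative and associative: adding up partial sums gives the same total as adding up all the values at once. A mean does not have this property, which is the motivation for using a CombineFn with a (sum, count) accumulator. The sketch below illustrates the difference in plain Python; the three-bundle split and the helper name mean are assumptions made for illustration only.

```python
# Why sum can be passed directly, while a mean needs an accumulator.
# The bundle split is hypothetical; a real runner chooses its own.

values = [15, 5, 7, 7, 9, 23, 13, 5]
bundles = [[15, 5, 7], [7, 9, 23], [13, 5]]

def mean(xs):
    # Hypothetical helper, not part of Beam.
    return sum(xs) / len(xs)

# Summing partial sums matches summing everything directly.
total_direct = sum(values)
total_from_partials = sum(sum(bundle) for bundle in bundles)

# Averaging partial means does NOT match the true mean,
# because the bundles have different sizes.
mean_direct = mean(values)
mean_of_means = mean([mean(bundle) for bundle in bundles])

print(total_direct, total_from_partials)  # 84 84
print(mean_direct)                        # 10.5
print(mean_of_means == mean_direct)       # False
```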


In [20]:
class averageFn(beam.CombineFn):
    """Computes the arithmetic mean using a (sum, count) accumulator."""

    def create_accumulator(self):
        # Runs locally on each worker: start with sum = 0.0 and count = 0.
        return (0.0, 0)

    def add_input(self, sum_count, input):
        # Runs locally on each worker: fold one element into the accumulator.
        (total, count) = sum_count
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        # Combines the accumulators produced on different workers, e.g.
        # zip(*[(27, 3), (39, 3), (18, 2)]) --> (27, 39, 18), (3, 3, 2)
        ind_sum, ind_count = zip(*accumulators)
        return sum(ind_sum), sum(ind_count)  # (84, 8)

    def extract_output(self, sum_count):
        # Final step: divide the merged sum by the merged count.
        total, count = sum_count
        return total / count if count else float('NaN')
        
        

In [21]:
p = beam.Pipeline()

average = (
    p
    | beam.Create([15,5,7,7,9,23,13,5])
    | beam.CombineGlobally(averageFn())
    | 'write result'>> beam.io.WriteToText('/home/james/beam/comb_out')
)
p.run()



<apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7fad889405c0>

In [22]:
! cat beam/comb_out-00000-of-00001

10.5
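
As a sanity check on this result, the final division can be reproduced in plain Python, along with the empty-input case that the `if count else float('NaN')` guard handles. This is a sketch only: extract_output below is a standalone copy of the method's logic, not a call into Beam.

```python
import math

def extract_output(sum_count):
    # Standalone copy of the final step, for illustration only.
    total, count = sum_count
    return total / count if count else float('NaN')

values = [15, 5, 7, 7, 9, 23, 13, 5]

# The fully merged accumulator for the whole input.
final_accumulator = (float(sum(values)), len(values))
mean = extract_output(final_accumulator)

# With no input, the accumulator is still (0.0, 0); the guard
# returns NaN instead of raising ZeroDivisionError.
empty = extract_output((0.0, 0))

print(final_accumulator)  # (84.0, 8)
print(mean)               # 10.5
print(math.isnan(empty))  # True
```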
