In [1]:
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.pipeline import Pipeline
from apache_beam.io import ReadFromBigQuery
from apache_beam.io.gcp.internal.clients import bigquery
from typing import List, Tuple, Any

In [2]:
import warnings
warnings.filterwarnings(action='ignore')

In [14]:
def get_common_items(sets):
  # set.intersection() takes multiple sets as separete arguments.
  # We unpack the `sets` list into multiple arguments with the * operator.
  # The combine transform might give us an empty list of `sets`,
  # so we use a list with an empty set as a default value.
  return set.intersection(*(sets or [set()]))

with beam.Pipeline() as pipeline:
  common_items = (
      pipeline
      | 'Create produce' >> beam.Create([
          {'🍓', '🥕', '🍌', '🍅', '🌶️'},
          {'🍇', '🥕', '🥝', '🍅', '🥔'},
          {'🍉', '🥕', '🍆', '🍅', '🍍'},
          {'🥑', '🥕', '🌽', '🍅', '🥥'},
      ])
      | 'Get common items' >> beam.CombineGlobally(get_common_items)
      | beam.Map(print))



{'🍅', '🥕'}


In [5]:
class CollapsePivoted(beam.CombineFn):
    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, input):
        for row in input:
            for k,v in row.items():
                if type(v) != str:
                    if v is None:
                        v = 0
                    if k not in accumulator:
                        accumulator[k] = v
                    else:
                        accumulator[k] += v
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = {}
        for accum in accumulators:
            for k, v in accum.items():
                if type(v) != str:
                    if v is None:
                        v = 0
                    if k not in merged:
                        merged[k] = v
                        merged[k] += v
        return merged

    def extract_output(self, accumulator):
        # # accumulator ex: ('Casey', {'RI_number': 16, 'F_number': 440, ...})
        # name, counts = accumulator
        # # total = counts['F_number'] + counts['M_number']
        # acc = {'name' : name}
        # counts = sorted(counts, key=lambda x: counts.get(x), reverse=True)
        # for k,v in counts.items():
        #     acc.update({k:v})
        
        return accumulator#sorted(accumulator.items(), key=lambda x: accumulator.__getitem__(x))

# with beam.Pipeline() as pipeline:
#     percentages = (
#         pipeline
#         | 'Create produce' >> beam.Create(
#             ['🥕', '🍅', '🍅', '🥕', '🍆', '🍅', '🍅', '🍅', '🥕', '🍅'])
#         | 'Get percentages' >> beam.CombineGlobally(PercentagesFn())
#         | beam.Map(print))

# {'🥕': 0.3, '🍅': 0.6, '🍆': 0.1}

In [28]:
p = Pipeline(InteractiveRunner())
pcoll = p | beam.Create([{'a':'bogus','b':1,'c':3,'d':6}, {'a':'bogus','b':7,'c':None,'d':3}, {'a':'hola','b':6,'c':8,'d':2}])

keyfields = ['a']

result = (
    pcoll 
    | beam.GroupBy(lambda rowdict: rowdict.get('a')) 
    | beam.CombinePerKey(CollapsePivoted())
    | beam.Map(lambda tup: [{keyfields[0]:tup[0]}, tup[1]] )
    | beam.Map(lambda x: {k:v for d in x for k,v in d.items()} )
    | beam.Map(print)
    )
p.run()

# ('bogus', {'b': 16, 'c': 14, 'd': 18})
# ('hola', {'b': 12, 'c': 16, 'd': 4})
# ...
# {'a': 'bogus', 'b': 16, 'c': 6, 'd': 18}
# {'a': 'hola', 'b': 12, 'c': 16, 'd': 4}



{'a': 'bogus', 'b': 16, 'c': 6, 'd': 18}
{'a': 'hola', 'b': 12, 'c': 16, 'd': 4}


<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x248ccc237f0>

In [19]:
tup = ('bogus', {'b': 16, 'c': 14, 'd': 18})
{'a':tup[0]}, tup[1].items()

({'a': 'bogus'}, dict_items([('b', 16), ('c', 14), ('d', 18)]))

In [18]:
{'a': 'bogus'}.update({'b': 16, 'c': 14, 'd': 18}.items())

In [27]:
p.run()
# ib.show(result)

  and should_run_async(code)


{'a': 18, 'b': 28, 'c': 30, 'd': 22}


<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x19d2c813640>

In [60]:
d = dict([('a',3),('b','s')])

In [107]:
d.get('a')

3

In [67]:
kf = ['a']

  and should_run_async(code)


In [69]:
d.__getitem__(*kf)

3