In [None]:
!pip3 install apache_beam



In [None]:
import apache_beam as beam

## **GroupBy**:

*   Takes a collection of elements and produces a collection grouped by properties of those elements.
*   Unlike GroupByKey, the key is dynamically created from the elements themselves.





In [None]:
# Group fruit names by their first letter. The grouping key is computed from
# each element by the lambda passed to GroupBy; every resulting group is
# printed.
with beam.Pipeline() as p:
  fruit_names = p | beam.Create(
      ['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana'])
  by_first_letter = fruit_names | beam.GroupBy(lambda name: name[0])
  grouped = by_first_letter | beam.Map(print)





('s', ['strawberry'])
('r', ['raspberry'])
('b', ['blueberry', 'blackberry', 'banana'])


## **Aggregation:**

In [None]:
# Shopping list used by the aggregation examples: one beam.Row per
# (recipe, fruit) pair, carrying the quantity needed and the unit price.
GROCERY_LIST = [
    beam.Row(recipe=recipe, fruit=fruit, quantity=quantity, unit_price=unit_price)
    for recipe, fruit, quantity, unit_price in [
        ('pie', 'strawberry', 3, 1.50),
        ('pie', 'raspberry', 1, 3.50),
        ('pie', 'blackberry', 1, 4.00),
        ('pie', 'blueberry', 1, 2.00),
        ('muffin', 'blueberry', 2, 2.00),
        ('muffin', 'banana', 3, 1.00),
    ]
]

In [None]:
# Group the grocery rows by their `recipe` field and print each group, so the
# cell shows its result like the other examples in this notebook.
with beam.Pipeline() as p:
  grouped = (
      p
      | beam.Create(GROCERY_LIST)
      | beam.GroupBy('recipe')
      | beam.Map(print))

In [None]:
# Total quantity of fruit needed per recipe.
# The aggregated field must be numeric: summing the string field `recipe`
# raises a TypeError when the pipeline runs, so `quantity` is summed instead
# and `recipe` is used as the grouping key.
# With GROCERY_LIST above the totals are 6 for 'pie' and 5 for 'muffin'.
with beam.Pipeline() as p:
  grouped = (
      p
      | beam.Create(GROCERY_LIST)
      | beam.GroupBy('recipe')
          .aggregate_field('quantity', sum, 'total_quantity')
      | beam.Map(print))

In [None]:
# Total quantity needed of each fruit across all recipes: group by `fruit`,
# sum the `quantity` field into `total_quantity`, and print every result so
# the cell shows its output like the other examples in this notebook.
with beam.Pipeline() as p:
  grouped = (
      p
      | beam.Create(GROCERY_LIST)
      | beam.GroupBy('fruit')
          .aggregate_field('quantity', sum, 'total_quantity')
      | beam.Map(print))

## **GroupByKey**:

*   Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.



In [None]:
# Keyed input for the GroupByKey example: (name, value) pairs.
# Each name appears twice, once with an [age, job title] list and once with an
# employment-status string, so grouping by key yields two values per name.
records = [
    # name -> [age, job title]
    ("vignesh", [27, "engineer"]),
    ("neethu", [27, "developer"]),
    ("farooqui", [26, "data analyst"]),
    ("sai", [29, "web developer"]),
    ("tinkle", [28, "fullstack developer"]),
    # name -> employment status
    ("neethu", "Employed"),
    ("sai", "Unemployed"),
    ("tinkle", "Employed"),
    ("farooqui", "Employed"),
    ("vignesh", "Unemployed"),
]

In [None]:
# Collect every value that shares a key: GroupByKey turns the (name, value)
# pairs in `records` into one (name, values) element per name, which is then
# printed.
with beam.Pipeline() as pipeline:
  keyed_records = pipeline | 'Create produce counts' >> beam.Create(records)
  values_per_key = keyed_records | 'Group counts per produce' >> beam.GroupByKey()
  produce_counts = values_per_key | beam.Map(print)



('vignesh', [[27, 'engineer'], 'Unemployed'])
('neethu', [[27, 'developer'], 'Employed'])
('farooqui', [[26, 'data analyst'], 'Employed'])
('sai', [[29, 'web developer'], 'Unemployed'])
('tinkle', [[28, 'fullstack developer'], 'Employed'])


## **CoGroupByKey**:

*   Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key. 
*   GroupByKey performs this operation over a single input collection, and thus over a single type of input values.
*   CoGroupByKey, in contrast, operates over multiple input collections. As a result, the result for each key holds the values associated with that key in each input collection (in the Python examples below, a dict keyed by the name given to each input).



In [None]:
# Join two keyed collections on the student name with CoGroupByKey.
# The dict keys ('icons', 'durations') tag each input and reappear as the keys
# of the per-student result dict that is printed.
with beam.Pipeline() as pipeline:
  # Student name -> city.
  student_pairs = (
      pipeline
      | 'Create icons' >> beam.Create([
          ('vignesh', 'bangalore'),
          ('khaula', 'hyderabad'),
          ('neethu', 'malapur'),
          ('sai', 'chennai'),
      ]))

  # Student name -> [marks, pass/fail outcome].
  student_result = (
      pipeline
      | 'Create durations' >> beam.Create([
          ('vignesh', [15, "FAIL"]),
          ('khaula', [99, "PASS"]),
          ('neethu', [100, "PASS"]),
          ('sai', [37, "FAIL"]),
      ]))

  tagged_inputs = {'icons': student_pairs, 'durations': student_result}
  merged = tagged_inputs | 'Merge' >> beam.CoGroupByKey()
  plants = merged | beam.Map(print)



('vignesh', {'icons': ['bangalore'], 'durations': [[15, 'FAIL']]})
('khaula', {'icons': ['hyderabad'], 'durations': [[99, 'PASS']]})
('neethu', {'icons': ['malapur'], 'durations': [[100, 'PASS']]})
('sai', {'icons': ['chennai'], 'durations': [[37, 'FAIL']]})


In [None]:
# Same CoGroupByKey join as above, but with scalar values and with input tags
# ('Marks', 'Result') that describe the data they carry.
with beam.Pipeline() as pipeline:
  # Student name -> marks.
  student_pairs = (
      pipeline
      | 'Create icons' >> beam.Create([
          ('vignesh', 15),
          ('khaula', 99),
          ('neethu', 100),
          ('sai', 37),
      ]))

  # Student name -> pass/fail outcome.
  student_result = (
      pipeline
      | 'Create durations' >> beam.Create([
          ('vignesh', "FAIL"),
          ('khaula', "PASS"),
          ('neethu', "PASS"),
          ('sai', "FAIL"),
      ]))

  tagged_inputs = {'Marks': student_pairs, 'Result': student_result}
  merged = tagged_inputs | 'Merge' >> beam.CoGroupByKey()
  plants = merged | beam.Map(print)



('vignesh', {'Marks': [15], 'Result': ['FAIL']})
('khaula', {'Marks': [99], 'Result': ['PASS']})
('neethu', {'Marks': [100], 'Result': ['PASS']})
('sai', {'Marks': [37], 'Result': ['FAIL']})


## **GroupIntoBatches**:



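*   Batches the values of a keyed input into lists of at most the desired batch size. Batching is done per key, so a key with fewer remaining values produces a smaller batch.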



In [None]:
# Batch the produce of each season with GroupIntoBatches and print every
# (season, batch) element.
with beam.Pipeline() as pipeline:
  produce = (
      pipeline
      | 'Create produce' >> beam.Create([
          ('spring', '🍓'),
          ('spring', '🥕'),
          ('spring', '🍆'),
          ('spring', '🍅'),
          ('summer', '🥕'),
          ('summer', '🍅'),
          ('summer', '🌽'),
          ('fall', '🥕'),
          ('fall', '🍅'),
          ('winter', '🍆'),
      ]))
  # Batch size is 4 here; try 3 or 2 to see how the batches change.
  batched = produce | 'Group into batches' >> beam.GroupIntoBatches(4)
  batches_with_keys = batched | beam.Map(print)

('spring', ['🍓', '🥕', '🍆', '🍅'])
('summer', ['🥕', '🍅', '🌽'])
('fall', ['🥕', '🍅'])
('winter', ['🍆'])


