## **Apache Beam**

Installing Apache beam interactive

In [1]:
!pip install apache-beam[interactive]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting apache-beam[interactive]
  Downloading apache_beam-2.41.0-cp37-cp37m-manylinux2010_x86_64.whl (10.9 MB)
[K     |████████████████████████████████| 10.9 MB 5.1 MB/s 
[?25hCollecting fastavro<2,>=0.23.6
  Downloading fastavro-1.6.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.4 MB)
[K     |████████████████████████████████| 2.4 MB 35.2 MB/s 
[?25hCollecting hdfs<3.0.0,>=2.1.0
  Downloading hdfs-2.7.0-py3-none-any.whl (34 kB)
Collecting proto-plus<2,>=1.7.1
  Downloading proto_plus-1.22.1-py3-none-any.whl (47 kB)
[K     |████████████████████████████████| 47 kB 4.9 MB/s 
Collecting cloudpickle<3,>=2.1.0
  Downloading cloudpickle-2.2.0-py3-none-any.whl (25 kB)
Collecting orjson<4.0
  Downloading orjson-3.8.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (270 kB)
[K     |████████████████████████████████| 270 kB 59.7 MB/s 
[?25hCollecting pymongo<4.0.0

Installing apache beam dataframe

In [1]:
!pip install apache_beam[dataframe]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
!pip install --quiet apache-beam

Importing all the necessary libraries

In [3]:
import apache_beam as beam
import pandas as pd
from apache_beam.dataframe.io import read_csv
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

Read the csv data and filter the customers that purchased chocolate and put it into an output file

In [23]:
p1 = beam.Pipeline()
visit_count = (
  
  p1
  |beam.dataframe.io.read_csv('/content/grocery.csv')
  #|beam.io.Read('/content/grocery.txt')

  |beam.Map(lambda record: record.split(','))
  |beam.Filter(lambda record: record[3] == 'chocolate')
  |beam.Map(lambda record: (record[1], 1))
  |beam.CombinePerKey(sum)
  
  |beam.io.WriteToText('out_data.txt')
)


In [24]:
p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fc7bb38c650>

calculating the customers who brought both choclotates and yogurt and pushing into 

In [34]:
import apache_beam as beam


p2 = beam.Pipeline()


input_collection = (

    p2
    | "Read input data" >> beam.io.ReadFromText('/content/grocery.txt')
    | "Split rows into columns" >> beam.Map(lambda record: record.split(','))
                  )

chocolate_buyers_count = (
    input_collection
    | 'Filter Chocolate buyers' >> beam.Filter(lambda record: record[3] == 'chocolate')
    | 'Pair each chocolate buyer with 1' >> beam.Map(lambda record: ("chocolate, " +record[1], 1))
    | 'Aggregate all chocolate buyers' >> beam.CombinePerKey(sum)
    )

yogurt_buyers_count = (
    input_collection
    | 'Filter yogurt buyers' >> beam.Filter(lambda record: record[3] == 'yogurt')
    | 'Pair each buyer with 1' >> beam.Map(lambda record: ("yogurt, " +record[1], 1))
    | 'Aggregate all yogurt buyers' >> beam.CombinePerKey(sum)
    )

output =(
        (chocolate_buyers_count,yogurt_buyers_count)
  | beam.Flatten()
  | beam.io.WriteToText('data/both')
)


# **Composite transforms**

group all the purchases with chocolates and yogurt in the grocery dataset

In [32]:
import apache_beam as beam

class CustomTransform(beam.PTransform):
  
  def expand(self, input_coll):
    
    a = ( 
        input_coll
                       | 'Group and sum' >> beam.CombinePerKey(sum)
                       | 'Grocery sales' >> beam.Map(format_output)
              
    )
    return a

def SplitRow(element):
    return element.split(',')
  
def format_output(element):
  name, count = element
  return ', '.join((name,str(count)))

p4 = beam.Pipeline()

input_collection = ( 
                      p4
                      | "Read from text file" >> beam.dataframe.io.read_csv('/content/grocery.csv')
                      | "Split rows" >> beam.Map(SplitRow)
                   )

chocolate_count = (
                      input_collection
                      | 'Filter chocolate buyers' >> beam.Filter(lambda record: record[2] == 'chocolate')
                      | 'Pair each chocolate buyer with 1' >> beam.Map(lambda record: ("chocolate, " +record[1], 1))
                      | 'composite chocolate buyers' >> CustomTransform()
                      | 'Write results for chocolate' >> beam.io.WriteToText('chocolate_output.txt')
                 )

yogurt_count = (
                input_collection
                | 'Filter yogurt buyers' >> beam.Filter(lambda record: record[2] == 'Apples')
                | 'Pair each yogurt buyer with 1' >> beam.Map(lambda record: ("Apples, " +record[1], 1))
                | 'composite yogurt buyers' >> CustomTransform()
                | 'Write results for yogurt' >> beam.io.WriteToText('yogurt_output.txt')
           )


p4.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fc7bb74d850>