In [None]:
!pip3 install apache_beam

**This example performs ETL on a CSV file of synthetic sales data (500,000 rows) using Apache Beam and its Python SDK.**

**Headers**
Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit


input file - SalesRecords.csv
**Case 1**
Get the total profit made by each of the *Item Types* [Clothes, Meat, Snacks, Fruits]

output file - case1-00000-of-00001end.txt

**Case 2**
Get the minimum and maximum order value (Total Revenue) placed by each Country in 2011 via the Online sales channel

output file - case2-00000-of-00001end.txt




In [22]:
%%time
#importing apache beam module
import apache_beam as beam
import datetime
import sys 

#create the Beam pipeline object that the read step and both cases below are attached to;
#no options are passed here, so Beam's defaults apply
p1 = beam.Pipeline()

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

#Item Types whose rows are kept by FilterItems for Case 1
selectedItemTypes = [
    "Clothes",
    "Meat",
    "Snacks",
    "Fruits",
]

#DoFn used with beam.ParDo to filter the records; beam.Filter() with a predicate would also work
class FilterItems(beam.DoFn):
  """Emit a row only when its field at index 2 (Item Type) is in selectedItemTypes."""
  def process(self,element):
    item_type = element[2]
    #rows of any other Item Type produce no output at all
    if item_type not in selectedItemTypes:
      return
    yield element

#add up the grouped values that belong to one key
def SumFn(element):
  """Collapse a (key, values) pair into (key, total).

  element: 2-tuple of a key and an iterable of numbers.
  Returns a 2-tuple of the key and the sum of the values.
  """
  group_key, amounts = element
  total = sum(amounts)
  return group_key, total

#smallest and largest of the grouped values that belong to one key
def MinMaxFn(element):
  """Collapse a (key, values) pair into (key, minimum, maximum).

  element: 2-tuple of a key and a collection of comparable values.
  Returns a 3-tuple of the key, the smallest value and the largest value.
  """
  group_key, amounts = element
  smallest = min(amounts)
  largest = max(amounts)
  return group_key, smallest, largest

#read the csv (skipping its header line), then split every line on "," into a list of fields
raw_lines = p1 | "Read csv file" >> beam.io.ReadFromText('SalesRecords.csv',skip_header_lines=1)
inp = raw_lines | "Convert the rows into arrays" >> beam.Map(lambda line: line.split(","))

#Case 1: total profit (field 13) per selected Item Type (field 2)
case1_totals = (
    inp
    | "Filter on the givem Item Types" >> beam.ParDo(FilterItems())
    | "Create a tuple of (Item Type, Total Profit)" >> beam.Map(lambda record: (record[2],float(record[13])))
    | "collect up the profits" >> beam.GroupByKey()
    | "sum the profits" >> beam.Map(SumFn)
    )

#print on a side branch: print() returns None, so chaining Map(print) in front of the write
#step would pass None elements to WriteToText instead of the (Item Type, profit) tuples
case1_printed = case1_totals | "print on terminal" >> beam.Map(print)

#WriteToText documents header as a string, so the tuple of column names is stringified explicitly
case1 = (
    case1_totals
    | "writing op to text file" >> beam.io.WriteToText('case1','end.txt',True,header=str(("Item Type","Total Profits")))
    )

#Case 2: min and max Total Revenue (field 11) per Country (field 1) for rows whose
#Sales Channel (field 3) is 'Online' and whose Order Date (field 5) ends in '2011'
case2 = (
    inp
    | "filter transactions which occured online and in 2011" >>
             beam.Filter(lambda row : (row[3]=='Online') and row[5][-4:]=='2011')
    | "create a pcollection of country and total revenue" >> beam.Map(lambda row: (row[1],float(row[11])))
    | "group by country" >> beam.GroupByKey()
    | 'Determine Min Max' >> beam.Map(MinMaxFn)
    #WriteToText documents header as a string, so the tuple of column names is stringified explicitly
    | "writing op2 to text file" >> beam.io.WriteToText('case2','end.txt',True,header=str(("Country","Min Order(Rs)","Max Order(Rs)")))
    #prints whatever the write step emits; in the recorded run this was the written file name
    | "print written file names" >> beam.Map(print)
    )

#run the pipeline and block until it has finished, so the output files are complete
#(and %%time covers the whole run) even if the runner executes asynchronously
p1.run().wait_until_finish()

case2-00000-of-00001end.txt
('Fruits', 499972690.6800023)
('Clothes', 15332166989.279985)
('Meat', 11933838487.999947)
('Snacks', 11505215029.979986)
CPU times: user 7.87 s, sys: 108 ms, total: 7.98 s
Wall time: 8.3 s
