In [1]:
import csv
import os

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

In [2]:
def parseorders(order_dict):
    """Drop the leading and trailing field of every order and clean its items.

    The first field of each order is treated as the order number and the last
    field as the date. Both are removed *by position* (slicing), so a middle
    field that happens to contain the same text is never removed by mistake,
    and the caller's lists are left untouched.

    Args:
        order_dict: Iterable of orders; each order is a sequence of strings.
            (Despite the name, it is iterated like a list, not used as a dict.)

    Returns:
        A new list with one list per order, holding the remaining fields with
        every double quote and every space character removed.
    """
    return [
        [field.replace('"', '').replace(' ', '') for field in order[1:-1]]
        for order in order_dict
    ]

In [3]:
def myprint(x):
    """Print ``x`` using its default (empty format spec) formatting."""
    text = format(x, '')
    print(text)

In [4]:
class SplitUsers(beam.DoFn):
    """DoFn that keys a comma-separated users line by its first field."""

    def process(self, element):
        """Split ``element`` on commas and key it by the first field.

        Args:
            element: One text line; it is split with ``str.split(",")``.

        Returns:
            A one-item list holding ``(user_id, remaining_fields)``, where
            ``user_id`` is the first field and ``remaining_fields`` is the
            list of all fields after it.
        """
        user_id, *remaining_fields = element.split(",")
        return [(user_id, remaining_fields)]

In [5]:
class SplitOrders(beam.DoFn):
    """DoFn that keys a comma-separated orders line by its second field."""

    def process(self, element):
        """Split ``element`` on commas and key it by the second field.

        Args:
            element: One text line; it is split with ``str.split(",")``.

        Returns:
            A one-item list holding ``(user_id, other_fields)``, where
            ``user_id`` is the second field and ``other_fields`` is every
            remaining field in its original order.

        Raises:
            IndexError: If the line has fewer than two fields.
        """
        parts = element.split(",")
        key = parts[1]
        # Keep the first field and everything after the key.
        others = parts[:1] + parts[2:]
        return [(key, others)]

In [6]:
class Separate(beam.DoFn):
    """DoFn that turns one grouped ``(user_id, {...})`` pair into a summary dict."""

    def process(self, element):
        """Build a per-user summary from grouped user and order rows.

        Args:
            element: Pair whose first item is the user id and whose second
                item is a mapping with a ``'users'`` and an ``'orders'``
                entry. Each user row is indexed positionally: index 1 is
                read as the gender and index 2 as the age.

        Returns:
            A one-item list with a dict holding ``user_id``, ``Gender``,
            ``Age``, ``num_orders`` and the cleaned ``orders``; or an empty
            list when the key has no user row (nothing to attribute the
            orders to).
        """
        user_id = element[0]
        grouped = element[1]

        user_rows = grouped['users']
        if not user_rows:
            # Orders without a matching user row used to raise IndexError and
            # abort the whole pipeline; skip them instead.
            return []
        user = user_rows[0]

        order_rows = grouped['orders']
        # Hand copies to parseorders so the grouped input lists are never
        # modified; Beam requires DoFns to leave their input elements intact.
        cleaned_orders = parseorders([list(row) for row in order_rows])

        return [
            {'user_id': user_id,
             'Gender': user[1],
             'Age': user[2],
             'num_orders': len(order_rows),
             'orders': cleaned_orders}
        ]

In [7]:
class AvgOrder(beam.DoFn):
    """DoFn that reduces a user summary to a ``(Gender, num_orders)`` pair."""

    def process(self, element):
        """Return a one-item list with the element's gender and order count.

        Args:
            element: Mapping with a ``'Gender'`` and a ``'num_orders'`` entry.
        """
        gender = element['Gender']
        order_count = element['num_orders']
        return [(gender, order_count)]


In [8]:
class AgeGroup(beam.DoFn):
    """DoFn that maps a user summary to ``(age_group, num_orders)``."""

    def process(self, element):
        """Bucket the user's age and pair the bucket with the order count.

        Buckets use an exclusive upper bound: '16-26' covers ages 16..25,
        '26-36' covers 26..35, and so on. Ages below 16 and ages of 56 or
        more are labelled 'other'.

        Args:
            element: Mapping with an ``'Age'`` entry convertible by ``int``
                and a ``'num_orders'`` entry.

        Returns:
            A one-item list holding ``(age_group, num_orders)``.
        """
        age = int(element['Age'])
        if age < 16:
            # Previously these ages fell through to the '26-36' branch.
            age_group = 'other'
        elif age < 26:
            age_group = '16-26'
        elif age < 36:
            age_group = '26-36'
        elif age < 46:
            age_group = '36-46'
        elif age < 56:
            age_group = '46-56'
        else:
            age_group = 'other'
        return [(age_group, element['num_orders'])]

In [9]:
class SpinachPurchase(beam.DoFn):
    """DoFn that maps a user summary to ``(age_group, spinach_order_count)``."""

    def process(self, element):
        """Bucket the user's age and count the orders that contain 'Spinach'.

        Buckets use an exclusive upper bound: '16-26' covers ages 16..25,
        '26-36' covers 26..35, and so on. Ages below 16 and ages of 56 or
        more are labelled 'other'.

        Args:
            element: Mapping with an ``'Age'`` entry convertible by ``int``
                and an ``'orders'`` entry that is an iterable of orders.

        Returns:
            A one-item list holding ``(age_group, count)``, where ``count``
            is the number of orders for which ``'Spinach' in order`` is true.
        """
        age = int(element['Age'])
        if age < 16:
            # Previously these ages fell through to the '26-36' branch.
            age_group = 'other'
        elif age < 26:
            age_group = '16-26'
        elif age < 36:
            age_group = '26-36'
        elif age < 46:
            age_group = '36-46'
        elif age < 56:
            age_group = '46-56'
        else:
            age_group = 'other'

        # An order counts once, no matter how often the item appears in it.
        spinach = sum(1 for order in element['orders'] if 'Spinach' in order)
        return [(age_group, spinach)]

In [10]:
# Directory holding the input CSV files. The default is the folder this
# notebook was written against; set BIG_DATA_DIR to run it elsewhere.
data_dir = os.environ.get("BIG_DATA_DIR", "/Users/jeandre/Desktop/Big Data")
users_csv = os.path.join(data_dir, "users_v.csv")
orders_csv = os.path.join(data_dir, "orders_v.csv")

with beam.Pipeline(InteractiveRunner()) as pipeline:
    # (user_id, remaining user fields) for every line after the header.
    users = (
        pipeline
        | 'Read users' >> beam.io.ReadFromText(users_csv, skip_header_lines=1)
        | 'Separate user' >> beam.ParDo(SplitUsers())
    )

    # (user_id, remaining order fields) for every line after the header.
    orders = (
        pipeline
        | 'Read orders' >> beam.io.ReadFromText(orders_csv, skip_header_lines=1)
        | 'Separate order' >> beam.ParDo(SplitOrders())
    )

    # Join both collections on user_id; Separate reads the 'users' and
    # 'orders' entries of each grouped value.
    combined = (
        {'users': users, 'orders': orders}
        | 'Merge orders and users' >> beam.CoGroupByKey()
    )

    # One summary dict per user id.
    separated = (
        combined
        | 'Separate elements' >> beam.ParDo(Separate())
    )

    # Mean number of orders per gender.
    average = (
        separated
        | 'Foramt averages' >> beam.ParDo(AvgOrder())
        | 'Calc means' >> beam.combiners.Mean.PerKey()
        | 'print averages' >> beam.Map(print)
    )

    # Total number of orders per age group.
    age_group = (
        separated
        | 'Foramt ages' >> beam.ParDo(AgeGroup())
        | 'Calc totals' >> beam.CombinePerKey(sum)
        | 'print ages' >> beam.Map(print)
    )

    # Number of orders containing 'Spinach' per age group.
    spinach_group = (
        separated
        | 'Foramt spinach count' >> beam.ParDo(SpinachPurchase())
        | 'Calc total spinach' >> beam.CombinePerKey(sum)
        | 'print spinach' >> beam.Map(print)
    )

    # Render the pipeline graph while the pipeline object is still in scope.
    ib.show_graph(pipeline)




/usr/local/bin/dot




('other', 13901)
('26-36', 5949)
('46-56', 5082)
('36-46', 5624)
('16-26', 4038)
('female', 342.1982608695652)
('male', 341.89312344656173)
('other', 323193)
('26-36', 137232)
('46-56', 117701)
('36-46', 132048)
('16-26', 96019)


Order of results in the output above:  
Spinach orders per age group  
Average number of orders per gender  
Total orders per age group
