# *MapReduce* <br> Stromverbrauch am Tag pro Kunde

In [None]:
%%writefile mapper.py
#!/usr/bin/env python3
"""Hadoop Streaming mapper for electricity consumption per customer and day.

Reads comma-separated records from STDIN and writes, for every record, one
line to STDOUT: the key ``<customerID>_<date>`` and the interval consumption,
separated by a tab. Hadoop sorts these lines by key before passing them to
the reducer, so all readings of one customer on one day arrive together.
"""
import sys

# 0-based column positions in the input CSV. The customer ID is taken from
# the last column (words[-1]).
DATE_COL = 3          # date of the reading, second half of the key
CONSUMPTION_COL = 6   # consumption of one time interval of that day
MIN_FIELDS = CONSUMPTION_COL + 1


def map_line(line):
    """Turn one CSV record into a ``key<TAB>value`` output line.

    Args:
        line: one raw input line; surrounding whitespace is ignored.

    Returns:
        The last column and column 3 joined by ``_`` as key, a tab, and
        column 6 as value -- or None when the line is blank or has too few
        columns. Such lines previously crashed the whole map task with an
        IndexError. A header row, if present, is still emitted; the reducer
        drops it because its value is not an integer.
    """
    words = line.strip().split(",")
    if len(words) < MIN_FIELDS:
        return None
    customer_id_date = words[-1] + "_" + words[DATE_COL]
    interval_consumption = words[CONSUMPTION_COL]
    return "{}\t{}".format(customer_id_date, interval_consumption)


def main():
    """Map every STDIN line and print the results for Hadoop to sort."""
    for line in sys.stdin:
        mapped = map_line(line)
        if mapped is not None:
            print(mapped)


if __name__ == "__main__":
    main()

In [13]:
%%writefile reducer.py
#!/usr/bin/env python3
"""Hadoop Streaming reducer: total consumption per ``customerID_date`` key.

Reads the key-sorted ``key<TAB>value`` lines produced by mapper.py from STDIN
and prints one ``key<TAB>total`` line per distinct key to STDOUT.
"""
from operator import itemgetter  # NOTE(review): currently unused
import sys


def parse_line(line):
    """Split one mapper output line into ``(key, int(value))``.

    Returns None for lines that contain no tab, or whose value is not an
    integer (e.g. a CSV header row passed through by the mapper), so the
    caller can skip them instead of crashing.
    """
    key, sep, value = line.strip().partition("\t")
    if not sep:
        return None
    try:
        # NOTE(review): non-integer readings such as "1.5" are dropped here;
        # confirm the consumption column only holds integers.
        return key, int(value)
    except ValueError:
        return None


def reduce_lines(lines):
    """Yield one ``key<TAB>total`` string per run of equal keys in *lines*.

    This grouping only works because Hadoop sorts the map output by key
    before it reaches the reducer, so all values of one key are adjacent.
    Malformed lines are skipped.
    """
    current_key = None
    current_total = 0
    for line in lines:
        parsed = parse_line(line)
        if parsed is None:
            continue
        key, value = parsed
        if key == current_key:
            current_total += value
            continue
        # Test the key, not the total: a customer-day whose consumption sums
        # to 0 must still be reported.
        if current_key is not None:
            yield "{}\t{}".format(current_key, current_total)
        current_key = key
        current_total = value
    # Flush the last key unconditionally. Trailing skipped lines (such as a
    # header that sorts last) or empty input must not suppress it or raise.
    if current_key is not None:
        yield "{}\t{}".format(current_key, current_total)


def main():
    """Reduce STDIN and print one line per key to STDOUT."""
    for result in reduce_lines(sys.stdin):
        print(result)


if __name__ == "__main__":
    main()

Overwriting reducer.py


In [None]:
# Run the streaming job. Hadoop refuses to write into an existing output
# directory, so remove the previous run's output first (-f: no error if absent).
!hdfs dfs -rm -r -f output
# Invoke the scripts through python3 explicitly so the job does not depend on
# the executable bit or the shebang line of the localized script files.
!mapred streaming -files mapper.py,reducer.py -mapper "python3 mapper.py" -reducer "python3 reducer.py" -input /data/consumer_data.csv -output output
# Preview all reducer output files. A trailing "Unable to write to output
# stream" message only means `head` closed the pipe after ten lines.
!hdfs dfs -cat 'output/part-*' | head

In [19]:
# Preview the result from the same directory the job writes to (`output`);
# the quoted glob is expanded by HDFS and covers every reducer's part file.
!hdfs dfs -cat 'output/part-*' | head

1018508_2016-10-19	12
1018508_2016-10-20	14
1018508_2016-10-21	14
1018508_2016-10-23	4
1018508_2016-10-25	16
1018508_2016-10-30	4
1018508_2016-11-03	11
1018508_2016-11-05	36
1018508_2016-11-06	4
1018508_2016-11-08	25
cat: Unable to write to output stream.
