In [None]:
#==========================================================================
# Hello. In this notebook, I will be providing Map Reduce 
# examples in Python, which will work as Hadoop Streaming jobs.
# @author: Souradeep Sinha
#==========================================================================

In [None]:
#==========================================================================
# This example is from Tanmay Deshpande's list of sample MapReduce
# problems (originally written in Java), available
# at http://hadooptutorials.co.in/tutorials/mapreduce/advanced-map-reduce-examples-1.html#
# 
# Problem Statement:
# XYZ.com is an online music website where users listen to
# various tracks, and the listening data is collected as shown below.
# Write a MapReduce program to compute the following stats for each track:
# 
# Number of unique listeners
# Number of times the track was shared with others
# Number of times the track was listened to on the radio
# Number of times the track was listened to in total
# Number of times the track was skipped on the radio
# The data arrives in pipe-delimited log files and looks as shown below.
# 
# UserId|TrackId|Shared|Radio|Skip
# 111115|222|0|1|0
# 111113|225|1|0|0
# 111117|223|0|1|1
# 111115|225|1|0|0
#==========================================================================

In [None]:
# Problem 1: Number of unique listeners

In [None]:
# Mapper Function. File saved as mapper1.py
# Emits one "trackId<TAB>userId" record per well-formed log line, so the
# reducer can count distinct listeners per track.

from sys import stdin


def map_unique_listeners(lines):
    """Turn raw log lines into "trackId<TAB>userId" records.

    Each log line is expected as UserId|TrackId|Shared|Radio|Skip.
    Lines that do not split into exactly five fields are ignored.

    :param lines: iterable of text lines (e.g. sys.stdin).
    :returns: generator of output records, without trailing newlines.
    """
    for line in lines:
        # Drop only the line terminator; field contents are kept as-is.
        fields = line.rstrip("\r\n").split("|")
        if len(fields) != 5:
            continue  # malformed line: skip it
        user_id, track_id = fields[0], fields[1]
        yield "{0}\t{1}".format(track_id, user_id)


if __name__ == "__main__":
    # print(...) with a single argument behaves the same in Python 2 and 3.
    for record in map_unique_listeners(stdin):
        print(record)

In [None]:
# Reduce Function. File saved as reducer1.py
# Input must be sorted by key (Hadoop's shuffle, or `sort` locally) so that
# all records of one track arrive consecutively.

from sys import stdin


def reduce_unique_listeners(lines):
    """Count distinct user ids per track from sorted "trackId<TAB>userId" lines.

    :param lines: iterable of text lines, grouped (sorted) by track id.
    :returns: generator of "trackId<TAB>count" records, one per track, in
        input order. Yields nothing for empty input.

    Lines that do not contain exactly one tab are skipped.
    """
    current_key = None
    listeners = set()
    for line in lines:
        # Strip the terminator so "u1\n" and a final unterminated "u1" count once.
        fields = line.rstrip("\r\n").split("\t")
        if len(fields) != 2:
            continue  # malformed line: skip it
        key, user_id = fields
        # `is not None` (not truthiness) so an empty-string key is still flushed.
        if current_key is not None and key != current_key:
            yield "{0}\t{1}".format(current_key, len(listeners))
            listeners = set()
        current_key = key
        listeners.add(user_id)
    # Flush the last group; skipped entirely when no valid record was seen.
    if current_key is not None:
        yield "{0}\t{1}".format(current_key, len(listeners))


if __name__ == "__main__":
    # print(...) with a single argument behaves the same in Python 2 and 3.
    for record in reduce_unique_listeners(stdin):
        print(record)

In [None]:
# The mapper and the reducer can be passed to the Hadoop
# Streaming framework along with the data file, the output folder (in
# HDFS) and the mapper-reducer file combination. Please
# ensure that the output folder does not already exist.

# For a sanity check, the mapper and reducer can be tested
# on the local machine terminal using the first 1000 lines
# of the log file (log.dat): pipe them through the mapper,
# then `sort` (standing in for Hadoop's shuffle/sort phase),
# then the reducer, as follows:

In [None]:
# $> head -n 1000 log.dat | python mapper1.py \
#      | sort | python reducer1.py

In [None]:
# Problem 2: Number of times the track was shared with others

In [None]:
# Mapper Function. File saved as mapper2.py
# Same parsing as mapper1.py; emits the Shared flag instead of the user id.

from sys import stdin


def map_shares(lines):
    """Turn raw log lines into "trackId<TAB>shared" records.

    Each log line is expected as UserId|TrackId|Shared|Radio|Skip. The
    Shared value is emitted as-is, so a summing reducer (reducer2.py) can
    total it per track. Lines that do not split into exactly five fields
    are ignored.

    :param lines: iterable of text lines (e.g. sys.stdin).
    :returns: generator of output records, without trailing newlines.
    """
    for line in lines:
        # Drop only the line terminator; field contents are kept as-is.
        fields = line.rstrip("\r\n").split("|")
        if len(fields) != 5:
            continue  # malformed line: skip it
        track_id, shared = fields[1], fields[2]
        yield "{0}\t{1}".format(track_id, shared)


if __name__ == "__main__":
    # Read from the imported `stdin` name; `sys` itself is not imported here.
    for record in map_shares(stdin):
        print(record)

In [None]:
# Reducer Function. File saved as reducer2.py
# Same group-by-key structure as reducer1.py, but it sums integer values
# instead of counting distinct ones. Input must be sorted by key.

from sys import stdin


def reduce_sum(lines):
    """Sum the integer values of each key in sorted "key<TAB>value" lines.

    :param lines: iterable of text lines, grouped (sorted) by key.
    :returns: generator of "key<TAB>total" records, one per key, in input
        order. Yields nothing for empty input.

    Lines without exactly one tab, or whose value is not an integer, are
    skipped.
    """
    current_key = None
    total = 0
    for line in lines:
        fields = line.rstrip("\r\n").split("\t")
        if len(fields) != 2:
            continue  # malformed line: skip it
        key, raw_value = fields
        try:
            value = int(raw_value)  # mapper output is text; convert before summing
        except ValueError:
            continue  # non-numeric value: skip like other malformed lines
        # `is not None` (not truthiness) so an empty-string key is still flushed.
        if current_key is not None and key != current_key:
            # Emit the finished group under its own key, not the incoming one.
            yield "{0}\t{1}".format(current_key, total)
            total = 0
        current_key = key
        total += value
    # Flush the last group; skipped entirely when no valid record was seen.
    if current_key is not None:
        yield "{0}\t{1}".format(current_key, total)


if __name__ == "__main__":
    # print(...) with a single argument behaves the same in Python 2 and 3.
    for record in reduce_sum(stdin):
        print(record)

In [None]:
# Problem 3: Number of times the track was listened to on the radio

In [None]:
# Mapper Function. File saved as mapper3.py
# For the sake of argument, we treat songs with skip = "1" as not listened.

from sys import stdin

SKIP_VALUE_TO_DROP = "1"  # Change to "0" for problem 5


def map_radio_listens(lines, skip_value_to_drop=SKIP_VALUE_TO_DROP):
    """Turn raw log lines into "trackId<TAB>radio" records.

    Lines whose Skip field equals ``skip_value_to_drop`` are dropped. The
    Radio flag of every remaining line is emitted, so a summing reducer
    (reducer2.py) can total it per track. With "1" this counts radio
    listens that were not skipped; with "0" it counts radio plays that
    were skipped.

    :param lines: iterable of UserId|TrackId|Shared|Radio|Skip text lines.
    :param skip_value_to_drop: Skip value whose lines are ignored.
    :returns: generator of output records, without trailing newlines.
    """
    for line in lines:
        # Strip the terminator so the last field compares as "1", not "1\n".
        fields = line.rstrip("\r\n").split("|")
        if len(fields) != 5:
            continue  # malformed line: skip it
        track_id, radio, skip = fields[1], fields[3], fields[4]
        if skip == skip_value_to_drop:
            continue
        yield "{0}\t{1}".format(track_id, radio)


if __name__ == "__main__":
    # print(...) with a single argument behaves the same in Python 2 and 3.
    for record in map_radio_listens(stdin):
        print(record)

In [None]:
# Reducer function reducer2.py can be used as the reducer for Problem 3

In [None]:
# Problem 4: Number of times the track was listened to in total

In [None]:
# Mapper Function. File saved as mapper4.py
# Logic similar to mapper3.py, but it emits a constant 1 per listen.

from sys import stdin


def map_total_listens(lines):
    """Turn raw log lines into "trackId<TAB>1" records, one per listen.

    Lines whose Skip field is "1" are treated as not listened (as in
    mapper3.py) and dropped. A summing reducer (reducer2.py) then totals
    the 1s per track. Lines that do not split into exactly five fields are
    ignored.

    :param lines: iterable of UserId|TrackId|Shared|Radio|Skip text lines.
    :returns: generator of output records, without trailing newlines.
    """
    for line in lines:
        # Strip the terminator so the last field compares as "1", not "1\n".
        fields = line.rstrip("\r\n").split("|")
        if len(fields) != 5:
            continue  # malformed line: skip it
        track_id, skip = fields[1], fields[4]
        if skip == "1":
            continue
        yield "{0}\t1".format(track_id)


if __name__ == "__main__":
    # print(...) with a single argument behaves the same in Python 2 and 3.
    for record in map_total_listens(stdin):
        print(record)

In [None]:
# Reducer function reducer2.py can be used as the reducer for Problem 4

In [None]:
# Problem 5: Number of times the track was skipped on the radio
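# Mapper to be used: mapper3.py, with its dropped skip value changed from
# "1" to "0" (see its "Change to "0" for problem 5" comment), so that only
# skipped plays are emitted.
# Reducer to be used: reducer2.py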