# DATASCI W261: Machine Learning at Scale
## Assignment Week 3
Jackson Lane (jelane@berkeley.edu) <br>
W261-3 <br>

## HW3.0.  
##### How do you merge two sorted lists/arrays of records of the form [key, value]? Where is this used in Hadoop MapReduce? [Hint: within the shuffle]

If you start with two sorted lists, merging them is straightforward. First, initialize a new empty list to hold all the elements from the two input lists. Then, out of all the elements in the two input lists, remove the element with the smallest key and put it at the beginning of the new list. Next, out of the remaining elements in the two input lists, take the element with the smallest key and put it in the second position of the new list. Repeat this step until no elements are left in either input list. The new list is now sorted.

Because the two input lists are already sorted, the smallest remaining key is always at the front of one of them, so each step needs only one comparison. As a result, the algorithm runs in O(n+m) time, where n and m are the sizes of the first and second input lists, respectively.
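The steps above can be sketched as a small Python 3 function. This is an illustration of the two-pointer merge only, not Hadoop's Java code; records are assumed to be `[key, value]` lists already sorted by key.

```python
def merge_sorted(left, right):
    """Merge two lists of [key, value] records that are already sorted by key.

    Runs in O(n + m): each loop iteration makes one key comparison and
    moves exactly one record into the output list.
    """
    merged = []
    i = j = 0
    while i < len(left) and j < len(right):
        # Take the record with the smaller key; on ties prefer `left`,
        # which keeps the merge stable.
        if left[i][0] <= right[j][0]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # One input is exhausted; whatever is left in the other is already sorted.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged


if __name__ == "__main__":
    a = [["apple", 1], ["cherry", 1], ["pear", 1]]
    b = [["banana", 1], ["cherry", 1], ["fig", 1]]
    print(merge_sorted(a, b))
```

Appending to the end of the output list is equivalent to "placing the next-smallest element in the next position" in the description above.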

The interactive HTML below does a better job of explaining the process than I do.

In [None]:
%%HTML
<div style="width:700px; height:400px; overflow:hidden;  position:relative; margin: 0px auto; background-color: #FFF">
<iframe src="http://cs.armstrong.edu/liang/animation/web/MergeList.html" scrolling="no" style="overflow:hidden; border:0px; position:absolute; top:-100px; left:0px; width: 700px; height:450px;"></iframe>
</div>

In Hadoop, this kind of merge is applied after the combining phase of the shuffle. If there is more than one mapper, Hadoop merges the outputs of the mappers (and/or combiners) on the reducer before executing the actual reducer code. Hadoop can do this because each of these outputs has already been sorted by key during the shuffle.
Merging also happens multiple times during the Hadoop shuffle as the core step of "merge sort", which takes <b>unsorted</b> lists and returns a sorted list. Merge sorting happens during the shuffle phase, after partitioning but before combining. It works by breaking the lists down into single elements and then merging these single elements back into larger and larger sorted lists.
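Both uses of merging described above can be sketched in Python 3 with the standard library's `heapq.merge` (its `key=` argument needs Python 3.5+). This illustrates the idea only; it is not how Hadoop's Java shuffle is implemented, and the record format is the same assumed `[key, value]` list as before.

```python
import heapq


def merge_sort(records):
    """Sort [key, value] records by key with top-down merge sort."""
    if len(records) <= 1:
        return list(records)  # zero or one element is already sorted
    mid = len(records) // 2
    left = merge_sort(records[:mid])
    right = merge_sort(records[mid:])
    # heapq.merge lazily merges iterables that are each already sorted.
    return list(heapq.merge(left, right, key=lambda r: r[0]))


def merge_mapper_outputs(*sorted_outputs):
    """Merge any number of key-sorted mapper outputs into one sorted stream,
    roughly what must happen before a reducer can group records by key."""
    return list(heapq.merge(*sorted_outputs, key=lambda r: r[0]))


if __name__ == "__main__":
    print(merge_sort([["pear", 1], ["apple", 1], ["fig", 1], ["banana", 1]]))
    print(merge_mapper_outputs([["a", 1], ["c", 1]], [["b", 1], ["c", 1]]))
```

`heapq.merge` is stable: among equal keys, records from earlier inputs come first.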

##### What is a combiner function in the context of Hadoop? Give an example where it can be used and justify why it should be used in the context of this problem.


A combiner function is like a reducer that runs on the mapper side. It runs during the shuffle phase on records with the same key, and it is used to reduce the volume of data sent from the mappers to the reducers over the network. A good example is the classic word count problem. Instead of sending 50 instances of a count of 1 for the same word to the reducer, you can use a combiner to aggregate those 50 instances into a single record with a count of 50. This is safe because addition is associative and commutative, so the reducer computes the same totals whether or not the combiner ran. In Java, the combiner may be applied once per unique key emitted by the mappers. However, in Hadoop Streaming, it appears that a combiner can sometimes run on multiple keys at a time. In both cases, Hadoop does not guarantee that it will run the combiner, nor how many times it will run it: it could run 0, 1, or many times.
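The word count example above can be simulated locally in Python 3. This is a sketch of the idea, not Hadoop code; the function names are hypothetical. It counts how many records would cross the network with and without a combiner, and checks that the final totals do not depend on whether the combiner ran.

```python
from collections import Counter


def map_words(lines):
    """Mapper: emit (word, 1) for every word."""
    for line in lines:
        for word in line.split():
            yield (word, 1)


def combine(pairs):
    """Combiner: pre-aggregate one mapper's (word, count) pairs locally."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


def reduce_counts(pairs):
    """Reducer: sum counts per word; accepts raw or combined pairs."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


if __name__ == "__main__":
    raw = list(map_words(["foo foo quux labs foo bar quux"]))
    combined = combine(raw)
    print(len(raw), "records without a combiner,", len(combined), "with one")
    # Same answer whether the combiner ran zero, one, or two times.
    assert reduce_counts(raw) == reduce_counts(combined) == reduce_counts(combine(combined))
```

Because summing is associative and commutative, running the combiner 0, 1, or many times changes only the number of records shipped, never the result.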

##### What is the Hadoop shuffle?

The Hadoop shuffle is the process Hadoop uses to transfer the outputs from the mappers to the reducers. The shuffle consists of three parts: partitioning, sorting, and combining. The first two phases always happen, in that order, in each shuffle, but the third phase may or may not happen, which is why Hadoop does not guarantee it will run combiners. Partitioning divides the mapper outputs among the reducer tasks. Typically this is done using a hash of the key, but you can specify other partitioners in the job configuration. Once partitioning has completed, the sorting phase sorts each partition using merge sort; again, this behavior is configurable in the job configuration. Then the combining phase may execute a combiner function on the sorted partitions. Note that the combiner cannot change which reducer a record gets sent to, as that is determined in the partitioning phase. However, a combiner can change the order in which the reducer receives the records, by emitting them in a different order than it received them from the sort phase.
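The three shuffle parts described above can be simulated for a single mapper's output in Python 3. This is a local sketch under stated assumptions, not Hadoop's implementation: Hadoop's default hash partitioner uses the Java key's `hashCode()`, whereas here `zlib.crc32` stands in for it because Python's built-in `hash()` of strings is randomized between runs.

```python
import zlib
from collections import defaultdict
from itertools import groupby


def partition(key, num_reducers):
    """Stand-in hash partitioner (crc32 instead of Java's hashCode())."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def sum_combiner(sorted_records):
    """Sum values of adjacent records that share a key."""
    return [(key, sum(v for _, v in group))
            for key, group in groupby(sorted_records, key=lambda kv: kv[0])]


def shuffle(map_output, num_reducers, combiner=None):
    """Simulate partition -> sort -> (optional) combine for one mapper."""
    partitions = defaultdict(list)
    for key, value in map_output:                      # 1. partition
        partitions[partition(key, num_reducers)].append((key, value))
    result = {}
    for p, records in partitions.items():
        records.sort(key=lambda kv: kv[0])             # 2. sort each partition
        if combiner is not None:                       # 3. combine (optional)
            records = combiner(records)
        result[p] = records                            # combiner cannot move p
    return result


if __name__ == "__main__":
    output = [("foo", 1), ("bar", 1), ("foo", 1), ("quux", 1)]
    print(shuffle(output, 2))
    print(shuffle(output, 2, sum_combiner))
```

The combiner only sees records that already belong to one partition, which is why it cannot change which reducer a record goes to.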

## HW3.1 

Use Counters to do EDA (exploratory data analysis) and to monitor progress
Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc. 

While processing information/data using a MapReduce job, it is a challenge to monitor the progress of parallel threads running across the nodes of a distributed cluster. Moreover, it is also complicated to distinguish between the data that has been processed and the data that is yet to be processed. The MapReduce framework offers user-defined Counters, which can be used effectively to monitor the progress of data across the nodes of a distributed cluster.

Use the Consumer Complaints Dataset provided here to complete this question:

     https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0

The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:

Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?

User-defined Counters

Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).

Hadoop offers the Job Tracker, a UI tool for determining the status and statistics of all jobs. Using the Job Tracker UI, developers can view the Counters that have been created. Screenshot your Job Tracker UI as your job completes and include it here. Make sure that your user-defined counters are visible.
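The three-bucket counter task above can be checked locally before running the job. This Python 3 sketch is an assumption-laden helper, not part of the submitted mapper: it uses the standard `csv` module (which keeps quoted fields containing commas in one piece, unlike `line.split(",")`), assumes the header row starts with `Complaint ID` and skips it, and formats updates in the Hadoop Streaming `reporter:counter:<group>,<counter>,<amount>` form that the mapper writes to stderr. The sample rows in the test are made up.

```python
import csv
from collections import Counter

# Products that get their own counter; everything else is lumped into "Other".
TRACKED = {"Debt collection", "Mortgage"}


def product_bucket(product):
    """Collapse every product except the tracked ones into 'Other'."""
    return product if product in TRACKED else "Other"


def counter_line(group, counter, amount=1):
    """Format one Hadoop Streaming counter update (to be written to stderr)."""
    return "reporter:counter:{},{},{}\n".format(group, counter, amount)


def count_products(lines):
    """Tally product buckets from CSV lines, skipping the assumed header row."""
    tally = Counter()
    for row in csv.reader(lines):
        if len(row) < 2 or row[0] == "Complaint ID":
            continue
        tally[product_bucket(row[1])] += 1
    return tally


def counter_lines(tally, group="3.1"):
    """One counter update per bucket, rather than one per input record."""
    return [counter_line(group, bucket, n) for bucket, n in sorted(tally.items())]
```

Inside a streaming mapper this could be used as `sys.stderr.writelines(counter_lines(count_products(sys.stdin)))`, which reports each bucket once with its total amount instead of once per line.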

In [None]:
%%writefile mapper.py
#!/usr/bin/python
# mapper.py
# Author:Jackson Lane
# Description: mapper code for HW3.1

from __future__ import print_function
import sys

for line in sys.stdin:
    fields = line.split(",")
    product = fields[1]
    #Group all other product fields into just "other"
    if product != "Debt collection" and product != "Mortgage": product = "Other"
    #Increment the user-defined counter for this product by 1
    sys.stderr.write("reporter:counter:3.1,"+product+",1\n")
    #Emit "product,1" without padding spaces so the reducer can split on ","
    print(product, 1, sep=",")

In [None]:
%%writefile reducer.py
#!/usr/bin/python 
# reducer.py
# Author: Jackson Lane
# Description: reducer code for HW3.1

from __future__ import print_function
from operator import itemgetter
import sys

word = ""
count = 0

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    newword, newcount = line.split(",")
    # Use the count emitted by the mapper instead of assuming it is 1
    newcount = int(newcount)
    if (newword == word): count += newcount
    else:
        # We have finished with all instances of the current word.  
        # Print total count and move on to next word
        if (count > 0): print (word, count ,sep=',')
        word = newword
        count = newcount
if (count > 0): print (word, count , sep=',')
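The reducer above relies on the shuffle delivering all lines for a key consecutively. The same logic can be sketched in Python 3 with `itertools.groupby`; this is an alternative formulation for illustration, not the code that was submitted, and it assumes mapper lines of the form `key,count`.

```python
from itertools import groupby


def parse(line):
    """Split a 'key,count' line from the mapper into (key, int(count))."""
    key, count = line.rstrip("\n").rsplit(",", 1)
    return key, int(count)


def reduce_sorted(lines):
    """Sum counts per key, assuming lines arrive grouped (sorted) by key,
    which is what the shuffle guarantees for a streaming reducer."""
    pairs = (parse(line) for line in lines if line.strip())
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, sum(count for _, count in group)
```

As a streaming script, this would be driven by something like `for key, total in reduce_sorted(sys.stdin): print(key, total, sep=",")`. `groupby` only merges adjacent equal keys, so unsorted input would produce repeated keys, exactly as the hand-written loop would.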


Run the mapreduce job

In [None]:
!hdfs dfs -rm -r results/3.1
!hdfs dfs -put Consumer_Complaints.csv 
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options=-n \
-D mapreduce.output.key.field.separator="," \
-file mapper.py \
-file reducer.py \
-mapper "mapper.py" \
-reducer "reducer.py" \
-input Consumer_Complaints.csv \
-output results/3.1

In [None]:
!hdfs dfs -cat results/3.1/part-00000

In [None]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://dl.dropboxusercontent.com/u/43045211/stats/HW3/HW3.1.png.PNG")

## HW 3.2 
Analyze the performance of your Mappers, Combiners and Reducers using Counters

### HW 3.2.0.1
Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here) using user defined Counters to count up how many times the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing this word count job? The answer should be 1 and 4 respectively. Please explain.

In [None]:
%%writefile mapper.py
#!/usr/bin/python
# mapper.py
# Author:Jackson Lane
# Description: mapper code for HW3.2.0.1

from __future__ import print_function
import sys
sys.stderr.write("reporter:counter:3.2,MapperCount,1\n")

for line in sys.stdin:
    line = line.split()
    for word in line:
        #Emit each word with a count of 1 as "word,1"
        print(word, 1, sep=",")

In [None]:
%%writefile reducer.py
#!/usr/bin/python 
# reducer.py
# Author: Jackson Lane
# Description: reducer code for 3.2.0.1
# Same as reducer code for 3.1, but with a counter
from __future__ import print_function
from operator import itemgetter
import sys

word = ""
count = 0
sys.stderr.write("reporter:counter:3.2,ReducerCount,1\n")

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    newword, newcount = line.split(",")
    newcount = int(newcount)
    if (newword == word): count += newcount
    else:
        # We have finished with all instances of the current word.  
        # Print total count and move on to next word
        if (count > 0): print (word, count, sep=',')
        word = newword
        count = newcount
if (count > 0): print (word,  count, sep=',')


Make the input file

In [None]:
%%writefile ffqlfbq.txt
foo foo quux labs foo bar quux

Run the mapreduce job

In [None]:
!hdfs dfs -rm -r results/3.2
!hdfs dfs -put -p -f ffqlfbq.txt
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=4 \
-file mapper.py \
-file reducer.py \
-mapper "mapper.py" \
-reducer "reducer.py" \
-input ffqlfbq.txt \
-output results/3.2

In [None]:
!hdfs dfs -cat results/3.2/part-00000

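The mapper and reducer counters are equal to the number of map and reduce tasks, respectively, because each task starts the Python script once and the counter is incremented once when the script starts. The default here seems to be 2 map tasks and 1 reduce task. However, I can explicitly set the number of map tasks to 1 and the number of reduce tasks to 4 in the run command. In the Java API, by contrast, a reducer's reduce() method is called once per unique key, so a counter incremented inside reduce() would read 4 for this input (foo, quux, labs, bar) even with a single reduce task. Hadoop Streaming does not behave this way: a streaming reducer script is started once per reduce task and reads all of that task's keys from stdin.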
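The expected answer of 4 reducer calls can be reproduced locally by counting distinct keys after sorting, which is how many times a Java-style per-key `reduce()` would be invoked. This Python 3 sketch uses a hypothetical helper name and only models the per-key call count, not task scheduling.

```python
from itertools import groupby


def count_reduce_calls(text):
    """How many times a per-key reduce() would run for this text's word count:
    once per distinct key, after the shuffle sorts the map output."""
    keys = sorted(text.split())
    return sum(1 for _key, _group in groupby(keys))


if __name__ == "__main__":
    print(count_reduce_calls("foo foo quux labs foo bar quux"))
```

With one input split there is one map task, hence a mapper counter of 1, while the four distinct keys (bar, foo, labs, quux) account for the 4.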

### HW 3.2.0.2
Please use multiple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).
Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere) using user defined Counters to count up how many times the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job?

In [20]:
%%writefile mapper.py
#!/usr/bin/python
# mapper.py
# Author:Jackson Lane
# Description: mapper code for 3.2.0.2 and 3.2.0.3
from __future__ import print_function

import sys,re
sys.stderr.write("reporter:counter:3.2,MapperCount,1\n")

for line in sys.stdin:
    fields = line.split(",")
    #Get the issue field
    issue = fields[3]
    #split the issue field into individual words
    words = re.findall("[\w']+",issue)
    for word in words:
        word = word.lower()
        # Even though we have two reducers, we still want to print out 
        # an alphabetically sorted list of word counts.  So we send every
        # word that sorts at or before "m" (a-l, plus the word "m" itself)
        # to the first partition and everything else to the second partition
        partitionkey = int(word > "m")
        #Emit each word with a count of 1
        print (partitionkey,word,1,sep=",")


Overwriting mapper.py
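The mapper's `int(word > "m")` rule is a simple range partition. The Python 3 sketch below (hypothetical helper names) checks why it keeps the concatenated output globally sorted: every partition-0 word sorts at or before `"m"`, and every partition-1 word sorts after it. It assumes partition 0 ends up in `part-00000` and partition 1 in `part-00001`.

```python
def range_partition(word):
    """Same rule as the mapper: 0 if word sorts at or before "m", else 1."""
    return int(word > "m")


def concatenated_output(words):
    """Sort each partition on its own, then concatenate partition 0 and 1,
    mimicking `hdfs dfs -cat part-0000*` (assumes partition 0 -> part-00000)."""
    parts = {0: [], 1: []}
    for word in words:
        parts[range_partition(word)].append(word)
    return sorted(parts[0]) + sorted(parts[1])


if __name__ == "__main__":
    sample = ["mortgage", "account", "m", "zip", "loan", "credit"]
    # Equal to a single global sort because partition 0 < partition 1 everywhere.
    print(concatenated_output(sample) == sorted(sample))
```

A fixed split point like `"m"` keeps the output ordered but does not balance work: the two reducers get whatever share of the vocabulary falls on each side.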


In [21]:
%%writefile reducer.py
#!/usr/bin/python 
# reducer.py
# Author: Jackson Lane
# Description: reducer code for 3.2.0.2, 3.2.0.3
# Same as reducer code for 3.2.0.1, but adjusting for the extra partitionkey field
from __future__ import print_function
from operator import itemgetter
import sys

word = ""
count = 0
sys.stderr.write("reporter:counter:3.2,ReducerCount,1\n")

for line in sys.stdin:
    sys.stderr.write(line)

    # remove leading and trailing whitespace
    line = line.strip()
    _,newword, newcount = line.split(",")
    newcount = int(newcount)
    if (newword == word): count += newcount
    else:
        # We have finished with all instances of the current word.  
        # Print total count and move on to next word
        if (count > 0): print (word,count,sep=',')
        word = newword
        count = newcount
if (count > 0): print (word,count,sep=',')


Overwriting reducer.py


Run the map-reduce job

In [22]:
!hdfs dfs -rm -r results/3.2
!hdfs dfs -put -p -f Consumer_Complaints.csv
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keypartitioner.options="-k1,1n" \
-D mapreduce.partition.keycomparator.options="-k2,2" \
-D mapreduce.output.key.field.separator=, \
-D stream.map.output.field.separator=, \
-D stream.reduce.output.field.separator=, \
-D stream.map.input.field.separator=, \
-D stream.reduce.input.field.separator=, \
-D map.output.key.field.separator=, \
-D stream.num.map.output.key.fields=2 \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-file mapper.py \
-file reducer.py \
-mapper "mapper.py" \
-reducer "reducer.py" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input Consumer_Complaints.csv \
-output results/3.2

16/06/05 11:02:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:02:37 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted results/3.2
16/06/05 11:02:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:02:41 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 11:02:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-unjar8007677370073082870/] [] /tmp/streamjob7713098128148077896.jar tmpDir=null
16/06/05 11:02:42 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 11:02:42 INFO client.RMProxy: Connecting to ResourceMa

From the output, you can see that the mapper and reducer counts are both 2.  

In [23]:
!hdfs dfs -cat results/3.2/part-0000*

16/06/05 11:03:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	3503
account	20681
acct	163
action	2505
advance	240
advertising	1193
amount	98
amt	71
an	2505
and	16448
application	8868
applied	139
apply	118
apr	3431
arbitration	168
are	3821
atm	2422
attempts	11848
available	274
balance	597
bank	202
bankruptcy	222
being	5663
billing	8158
by	5663
can't	1999
cancelling	2795
card	4405
cash	240
caused	5663
changes	350
charged	976
charges	131
checks	75
closing	2795
club	12545
collect	11848
collection	1907
communication	6920
company's	4858
cont'd	11848
contact	3053
convenience	75
costs	4350
credit	55251
credited	92
customer	2734
day	71
dealing	1944
debit	2422
debt	19309
decision	2774
decrease	1149
delay	243
delinquent	1061
deposits	10555
determination	1490
did	139
didn't	925
disclosure	5214
disclosures	64
dispute	904
disputes	6938
embezzlement	3276
expect	807
false	2508
fee	3198
fees	807
for	929
forbearance	

### HW 3.2.0.3
Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to count up how many times the mapper, combiner, and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job?

In [13]:
# Use same mapper as in previous problem



In [24]:
%%writefile combiner.py
#!/usr/bin/python 
# combiner.py
# Author: Jackson Lane
# Description: combiner code for 3.2.0.3
# Similar to reducer code from 3.2.0.2, except that the combiner 
# also prints the partition key and has a different counter
from __future__ import print_function
import sys

word = ""
count = 0
# Partition key of the word currently being counted
wordkey = ""
sys.stderr.write("reporter:counter:3.2,CombinerCount,1\n")

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    partitionkey,newword, newcount = line.split(",")
    newcount = int(newcount)
    if (newword == word): count += newcount
    else:
        # We have finished with all instances of the current word.  
        # Print its total count with that word's own partition key
        # (not the key of the new line) and move on to the next word
        if (count > 0): print (wordkey,word, count, sep=',')
        word = newword
        count = newcount
        wordkey = partitionkey
if (count > 0): print (wordkey,word,  count, sep=',')


Overwriting combiner.py


In [25]:
# Use same reducer as in previous problem

Run map reduce job

In [26]:
!hdfs dfs -rm -r results/3.2
!hdfs dfs -put -p -f Consumer_Complaints.csv
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keypartitioner.options="-k1,1n" \
-D mapreduce.partition.keycomparator.options="-k2,2" \
-D mapreduce.output.key.field.separator=, \
-D mapreduce.map.output.key.field.separator=, \
-D stream.map.output.field.separator=, \
-D stream.reduce.output.field.separator=, \
-D stream.map.input.field.separator=, \
-D stream.reduce.input.field.separator=, \
-D stream.num.map.output.key.fields=2 \
-file mapper.py \
-file reducer.py \
-file combiner.py \
-mapper "mapper.py" \
-combiner "combiner.py" \
-reducer "reducer.py" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input Consumer_Complaints.csv \
-output results/3.2

16/06/05 11:04:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:04:29 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted results/3.2
16/06/05 11:04:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:04:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 11:04:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py, combiner.py, /tmp/hadoop-unjar5076983616027755871/] [] /tmp/streamjob6710361804620623018.jar tmpDir=null
16/06/05 11:04:34 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 11:04:34 INFO client.RMProxy: Connecting 

In [28]:
!hdfs dfs -cat results/3.2/part-0000*

16/06/05 11:05:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	3503
account	20681
acct	163
action	2505
advance	240
advertising	1193
amount	98
an	229
amt	71
and	10227
an	2276
application	5962
and	6221
apr	2557
application	2906
arbitration	92
applied	139
are	2510
apply	118
atm	1366
apr	874
attempts	1372
arbitration	76
available	64
are	1311
balance	400
atm	1056
bankruptcy	143
attempts	10476
being	3743
available	210
billing	5289
balance	197
by	3743
bank	202
cancelling	1822
bankruptcy	79
card	2673
being	1920
cash	167
billing	2869
caused	3743
by	1920
changes	235
can't	1999
charged	17
cancelling	973
checks	49
card	1732
closing	1822
cash	73
club	1783
caused	1920
collect	1372
changes	115
collection	1907
charged	959
communication	795
charges	131
company's	1913
checks	26
cont'd	1372
closing	973
contact	282
club	10762
convenience	49
collect	10476
costs	2811
communication	6125
credit	21686
company's	2945
customer

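<i>The mapper and reducer counts were 2 and 1 respectively, which are the defaults for Hadoop Streaming. The combiner counter was 2. At first this seems strange, as the mappers certainly emitted more than 2 unique keys. However, in Hadoop Streaming a single combiner process reads a stream that can contain many keys, and the counter is incremented only once when the combiner script starts, so it counts combiner process launches rather than keys. Hadoop also does not guarantee that the combiner will run a certain number of times. </i>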

### HW 3.2.0.4
Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). 

In [29]:
%%writefile mapper.py
#!/usr/bin/python
# mapper.py
# Author:Jackson Lane
# Description: mapper code for HW3.2.0.4
from __future__ import print_function

import sys,re
sys.stderr.write("reporter:counter:3.2,MapperCount,1\n")
for line in sys.stdin:
    fields = line.split(",")
    issue = fields[3]
    words = re.findall("[\w']+",issue)
    for word in words:
        word = word.lower()
        # Emit each word with a count of 1 as "word,1"
        print (word,1,sep=",")

Overwriting mapper.py


In [30]:
%%writefile reducer.py
#!/usr/bin/python
# reducer.py
# Author:Jackson Lane
# Description: reducer code for HW3.2.0.4
# Since we are using a single reducer, it has to sum up the word counts and get the top 50 and bottom 10.


from __future__ import print_function
import sys
from collections import deque
words = {}
sys.stderr.write("reporter:counter:3.2,ReducerCount,1\n")
total = float(0)
min_elements= []
max_elements = deque([])
for line in sys.stdin:
        #Parse in the word and count from the mapper
        word,value = line.split(",")
        value = int(value)
        #Update the master word count dictionary
        words[word] = words.setdefault(word, 0) + value
        #Update the total number of words as well
        total += value
#Convert the words dictionary into a list of tuples
words = list(words.items())
#Sort the list of tuples by count, and then alphabetically by word among ties.
#sorted() is stable, so the second sort keeps the alphabetical order of ties.
words   = sorted(words,key=lambda x: x[0])
words   = sorted(words,key=lambda x: x[1])

#Get the top 50 and bottom 10 elements from the list of tuples
for (word,value) in words:
        value = float(value)
        if len(min_elements) < 10:
            min_elements.append((word,value,value/total))
        if(len(max_elements) >= 50):
            max_elements.popleft()
        max_elements.append((word,value,value/total))

#Print out the max 50 and min 10 elements (max_elements is in ascending order)
print("50 most common tokens:")
for i,p in enumerate(max_elements):
        print(str(len(max_elements)-i)+":",p)
print("10 least common tokens:")
for i,p in enumerate(min_elements):
        print(str(i+1)+":",p)

Overwriting reducer.py
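In the output shown further below, tied words are ranked in reverse alphabetical order (for example, investigation is rank 46 and company's is rank 47), because the reducer keeps the last 50 entries of an ascending sort. If the prompt's tie rule is read as "alphabetical by rank", the ranking can instead use an explicit sort key. This Python 3 sketch (hypothetical function name) uses `heapq.nsmallest` with key `(-count, word)` for the top list and `(count, word)` for the bottom list; it is an alternative, not the code that produced the output.

```python
import heapq


def top_and_bottom(counts, top_n=50, bottom_n=10):
    """Return (top, bottom) lists of (word, count, relative_frequency).

    Ties are broken alphabetically in both lists via the sort keys
    (-count, word) for the top list and (count, word) for the bottom list.
    """
    total = float(sum(counts.values()))
    top = heapq.nsmallest(top_n, counts.items(), key=lambda kv: (-kv[1], kv[0]))
    bottom = heapq.nsmallest(bottom_n, counts.items(), key=lambda kv: (kv[1], kv[0]))

    def with_frequency(items):
        return [(word, count, count / total) for word, count in items]

    return with_frequency(top), with_frequency(bottom)
```

`heapq.nsmallest` avoids fully sorting the vocabulary when only a few entries are needed; for a vocabulary this small, a full `sorted()` would work just as well.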


In [31]:
!hdfs dfs -rm -r results/3.2
!hdfs dfs -put Consumer_Complaints.csv 
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options=-n \
-D mapreduce.output.key.field.separator="," \
-D mapred.reduce.tasks=1 \
-file mapper.py \
-file reducer.py \
-mapper "mapper.py" \
-reducer "reducer.py" \
-input Consumer_Complaints.csv \
-output results/3.2

16/06/05 11:11:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:11:46 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted results/3.2
16/06/05 11:11:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `Consumer_Complaints.csv': File exists
16/06/05 11:11:49 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 11:11:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-unjar3310236814296116326/] [] /tmp/streamjob6610809708114615817.jar tmpDir=null
16/06/05 11:11:50 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 11:11:50 I

In [32]:
!hdfs dfs -cat results/3.2/part-00000

16/06/05 11:12:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
50 most common tokens:	
50: ('score', 4357.0, 0.004443732776328377)	
49: ('card', 4405.0, 0.004492688290045101)	
48: ('identity', 4729.0, 0.0048231380076329804)	
47: ("company's", 4858.0, 0.0049547059507466734)	
46: ('investigation', 4858.0, 0.0049547059507466734)	
45: ('managing', 5006.0, 0.005105652118039903)	
44: ('disclosure', 5214.0, 0.0053177926774790355)	
43: ('verification', 5214.0, 0.0053177926774790355)	
42: ('process', 5505.0, 0.005614585479386669)	
41: ('being', 5663.0, 0.0057757307120375485)	
40: ('by', 5663.0, 0.0057757307120375485)	
39: ('caused', 5663.0, 0.0057757307120375485)	
38: ('funds', 5663.0, 0.0057757307120375485)	
37: ('low', 5663.0, 0.0057757307120375485)	
36: ('the', 6248.0, 0.0063723760354601105)	
35: ('lease', 6337.0, 0.006463147717143201)	
34: ('reporting', 6559.0, 0.006689566968083045)	
33: ('communication', 6

### HW 3.2.1  
Using 2 reducers: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). Please use a combiner.

<i> For this problem, my understanding from the discussion board was to use a single mapreduce job with a single reducer.py with two reducer tasks.  I understand that other students interpreted the problem as meaning they could use two different reducer.py files across two different map reduce jobs.  I also understand that other students interpreted the problem as meaning they could feed in the wordcounts from the previous parts of 3.2 into this problem.  Both of these methods might have been easier, but I wanted to see if I could perform the word count and get the top 50 and bottom 10 elements with just a single map reduce job starting from the Consumer_Complaints.csv.  

The challenging part was getting the top 50 and bottom 10 elements into different partitions while still keeping in the spirit of MapReduce.  There's not enough information at the mapper stage to determine whether a word is in the top 50 or bottom 10.  One could put some reducer functionality in the mapper to first aggregate the counts of each word and then determine the partition key, but that goes against the intended purpose of a mapper.

To work around this, I had my mappers emit each word twice (once for each partition). Normally, this would defeat the purpose of having partitions, as each reducer would need to process the full dataset. However, I also wrote my combiners to filter the mapper outputs after aggregation so that only the high-count words make it to the top 50 reducer task and only the low-count words make it to the bottom 10 reducer task. This type of solution also fits with the combiner's stated purpose of reducing the volume of data sent to the reducers.

While my solution gets the top 50 and bottom 10 words, the accuracy drops beyond those ranks (for example, it's not accurate on the top 100 and bottom 50 words). Since a combiner may be applied to just a subset of the keys emitted by the mapper, the combiner cannot know for sure whether a word appears with a high or low frequency relative to the other words in the corpus. In fact, the combiner cannot even know for sure the total number of words in the corpus, as it might not be processing all the words. The reducer does not have this problem because it has barrier synchronization. </i>
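The filter combiner's routing rule, as written in the combiner code in this section, compares a word's local count with 0.4 times the average count per unique word that the combiner has seen. This Python 3 sketch isolates that rule under a hypothetical function name and uses made-up numbers to show the inaccuracy described above: two combiners can route the same word to different reducers, so each reducer receives only a partial count.

```python
def route(count, total, num_unique, magic=0.4):
    """Partition chosen by the filter combiner's rule: 1 (top-50 reducer) if
    the word's local count exceeds `magic` times the average count per unique
    word seen by that combiner, otherwise 0 (bottom-10 reducer)."""
    average = total / float(num_unique)
    return int(count > magic * average)


if __name__ == "__main__":
    # Made-up numbers: two combiners each see 100 words over 10 unique words,
    # so both use a threshold of 0.4 * 10 = 4.
    print(route(3, total=100, num_unique=10))  # 0: bottom-10 reducer
    print(route(5, total=100, num_unique=10))  # 1: top-50 reducer
```

If one mapper saw a word 3 times and another saw it 5 times, the top-50 reducer would count it as 5 rather than 8, which matters little for clearly frequent or clearly rare words but distorts ranks near the threshold.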


In [33]:
%%writefile mapper.py
#!/usr/bin/python
# mapper.py
# Author:Jackson Lane
# Description: mapper code for HW3.2.1
from __future__ import print_function
from sets import Set
import sys,re,random
sys.stderr.write("reporter:counter:3.2,MapperCount,1\n")
total = 0

uniquewords = Set([])
for line in sys.stdin:
    fields = line.split(",")
    issue = fields[3]
    words = re.findall("[\w']+",issue)
    for word in words:
        word = word.lower()
        uniquewords.add(word)
        total+= 1
        # Emit the word once per partition. The partition and count come first so that
        # the special total records (counts 0 and -1) can sort ahead of the words (order inversion)
        print(0,1,word,sep="\t")
        print(1,1,word,sep="\t")


#Since we need to compute relative frequencies in the reducer, we need the total number of words
#But since reading counters from within a job is apparently bad practice, I print
# special key value pairs with the fields that I need instead.
# Emit the total word count and the set of unique words once for each partition
print(0,0,total,sep="\t")
print(0,-1,uniquewords,sep="\t")
print(1,0,total,sep="\t")
print(1,-1,uniquewords,sep="\t")


Overwriting mapper.py
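The special records above (count 0 for the word total, count -1 for the unique-word set) are an order-inversion pattern: if records are sorted on (partition, count), the summary records reach the consumer before any word record, whose count is at least 1. This Python 3 sketch assumes a numeric sort on the first two fields (the 3.2.1 job's sort options are not shown in this section) and uses hypothetical helper names and made-up records.

```python
def shuffle_sort(records):
    """Sort (partition, count, payload) records numerically on the first two
    fields; an assumed stand-in for the job's key comparator."""
    return sorted(records, key=lambda r: (int(r[0]), int(r[1])))


def relative_frequencies(sorted_records):
    """Consume records in sorted order. The special records (count -1 and 0)
    arrive before any word (count >= 1), so the total is already known when
    the first word is read."""
    total = 0
    result = []
    for _partition, count, payload in sorted_records:
        count = int(count)
        if count == 0:        # special record: number of words seen by a mapper
            total += int(payload)
        elif count == -1:     # special record: set of unique words (unused here)
            continue
        else:                 # ordinary word record
            result.append((payload, count / float(total)))
    return result
```

Without the sort, the first word could arrive while `total` is still 0 and the division would fail, which is the problem the reducer's comments work around by deferring the relative-frequency computation.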


In [43]:
%%writefile combiner.py
#!/usr/bin/python 
# combiner.py
# Author: Jackson Lane
# Description: combiner code for 3.2.1
# Acts like a router, only sending the small word counts to the first reducer and the large word counts to the second reducer
# This type of "filter combiner" currently only works with 2 mappers
# Alternatively I could just remove the filter condition and make it so that this combiner passes through all the
# word counts.  That would be scalable to any number of mappers, but it would defeat the purpose of parallelism
# as each reducer would process the full word count work load.  
# If there are more mappers, then this combiner will end up sending the wrong lines to the wrong partitions

from __future__ import print_function
from sets import Set

from operator import itemgetter
import sys
#Input to the combiner is already partitioned and sorted, so we can reuse the reducer word count logic
partition = 0
word = ""
count = 0
#Since each instance of a combiner only processes a subset of the data emitted by the mappers, 
# the distribution of the words observed in the combiner is not always representative of
# the distribution of words in the rest of the corpus.
# But since we want the top 50 words but only the bottom 10 words, we can err on the side
# of the top 50 since it needs a higher level of accuracy.  The magic number below sets a
# low threshold: any word whose local count exceeds 40% of the average count per unique
# word is sent to the top 50 reducer, and the remaining words go to the bottom 10 reducer
magicnumber = .4
sys.stderr.write("reporter:counter:3.2,CombinerCount,1\n")
total = float(0)
sys.stderr.write("combiner\n")
uniquewords = Set([])

for line in sys.stdin:
    sys.stderr.write(line)
    # remove leading and trailing whitespace
    line = line.strip()
    newpartition, newcount, newword = line.split("\t")
    newcount = int(newcount)
    newpartition = int(newpartition)
    # Update the summary statistic variable we passed along from the mapper
    # These values should appear first in the stdin because they have counts
    # of 0 and -1
    if newcount == 0: 
        total += float(newword)
        continue
    if newcount == -1: 
        uniquewords.update(eval(newword))
        continue
        
    #Update the word counts
    if (newword == word and partition == newpartition):
        count += newcount
    else:
        # We have finished with all instances of the current word.  
        # Now determine which partition the word should go to
        partitionkey =int(count> total*magicnumber / len(uniquewords))

        # If word is routed to correct partition, emit word and count.  Otherwise, just skip word
        if (count > 0 and partitionkey == partition): print(partition,count,word, sep='\t')
        word = newword
        count = newcount
        partition = newpartition

#Emit last word count if going to right partition
partitionkey =int(count > total*magicnumber  / len(uniquewords))
if (count > 0 and partitionkey == partition): 
    print(partition,count,word, sep='\t')

#pass on totals again
print(partitionkey,0,total,sep="\t")
#We don't need to pass on the uniquewordcount


Overwriting combiner.py
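The routing test in the combiner can be isolated as a small function to make the threshold explicit. This is a sketch, assuming the combiner has already accumulated the total token count and the set of unique words; `route` is a hypothetical helper, not part of the job. With `magicnumber = 0.4`, a word goes to the top 50 reducer (partition 1) once its count exceeds 40% of the mean count per unique word.

```python
def route(count, total, n_unique, magicnumber=0.4):
    """Hypothetical helper mirroring combiner.py's routing test.

    Returns 1 (top 50 reducer) when count exceeds magicnumber times the
    mean count per unique word, otherwise 0 (bottom 10 reducer).
    """
    threshold = total * magicnumber / n_unique
    return int(count > threshold)

# Toy numbers: 1000 tokens over 10 unique words -> mean 100, threshold 40
print(route(41, 1000, 10))  # 1
print(route(39, 1000, 10))  # 0
```

Because the threshold depends on each combiner's local totals, the same word can be routed differently by different combiners, which is the accuracy trade-off the comments above describe.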


In [39]:
%%writefile reducer.py
#!/usr/bin/python
# reducer.py
# Author:Jackson Lane
# Description: reducer code for HW3.2.1
# This reducer collects all the incoming words into a dictionary
# with their counts and relative frequencies.  Then the reducer
# sorts the dictionary by count and outputs either the
# top 50 or bottom 10 word counts

import sys
words = {}
sys.stderr.write("reporter:counter:3.2,ReducerCount,1\n")
total = float(0)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    p, count, word = line.split("\t")
    p = int(p)
    count = float(count)
    # Update the total summary statistic variable.
    # Since combiner output is not sorted, this line may
    # no longer be at the top of stdin.  That's why we
    # can only compute relative frequency after all of
    # stdin has been processed.
    if count == 0: 
        total += float(word)
        continue

    #Update the word count dictionary. 
    words[word] = words.setdefault(word, 0) + count

#Get which partition this reducer represents. 
# If 0, then it's the bottom 10 reducer.
# If 1, then it's the top 50 reducer.
partition = p

#Turn dictionary into sorted array of tuples with counts and 
# relative frequencies
words =[(k, v,float(v)/total) for k, v in words.iteritems()]
words   = sorted(words,key=lambda x: x[0])
words   = sorted(words,key=lambda x: x[1])

#Print either top 50 or bottom 10
if partition == 1:
    print "50 most common tokens:"
    for i,p in enumerate(words[-50:]):
        print str(50-i)+":",p
    
else:
    print "10 least common tokens:"
    for i,p in enumerate(words[:10]):
        print str(i+1)+":",p
    


Overwriting reducer.py
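reducer.py relies on Python's sort being stable: sorting by word first and then by count leaves words with equal counts in alphabetical order. A small check, using a few (word, count) tuples taken from the output further down, shows the two passes give the same order as one sort on a composite key:

```python
words = [("missing", 64.0), ("disclosures", 64.0), ("apply", 118.0), ("amt", 71.0)]

# Two stable passes, as in reducer.py: secondary key first, primary key last
two_pass = sorted(sorted(words, key=lambda x: x[0]), key=lambda x: x[1])

# Equivalent single pass with a composite (count, word) key
one_pass = sorted(words, key=lambda x: (x[1], x[0]))

print(two_pass)
# [('disclosures', 64.0), ('missing', 64.0), ('amt', 71.0), ('apply', 118.0)]
```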


In [40]:
!hdfs dfs -rm -r results/3.2
!hdfs dfs -put Consumer_Complaints.csv 
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2n -k3,3" \
-D mapreduce.partition.keypartitioner.options="-k1,1n" \
-D stream.num.map.output.key.fields=3 \
-D mapreduce.job.maps=2 \
-D mapreduce.job.reduces=2 \
-file mapper.py \
-file combiner.py \
-file reducer.py \
-mapper "mapper.py" \
-combiner "combiner.py" \
-reducer "reducer.py" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input Consumer_Complaints.csv \
-output results/3.2


16/06/05 11:15:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:15:57 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted results/3.2
16/06/05 11:15:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `Consumer_Complaints.csv': File exists
16/06/05 11:16:00 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 11:16:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, combiner.py, reducer.py, /tmp/hadoop-unjar1441934325260427106/] [] /tmp/streamjob1667956411416720403.jar tmpDir=null
16/06/05 11:16:01 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/
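The combiner's assumption that the summary lines (counts -1 and 0) arrive before the real word counts comes from the comparator options `-k2,2n -k3,3` in the command above. The sketch below imitates that ordering in plain Python under simplifying assumptions: it groups lines by the literal value of field 1 instead of reproducing the hash-based KeyFieldBasedPartitioner, `simulate_shuffle` is a hypothetical helper, and the sample lines are made up in the format combiner.py parses.

```python
from collections import defaultdict

def simulate_shuffle(lines):
    """Group 'partition<TAB>count<TAB>word' lines by field 1, then sort each
    group numerically on field 2 and lexically on field 3 (like -k2,2n -k3,3)."""
    groups = defaultdict(list)
    for line in lines:
        part, count, word = line.split("\t")
        groups[part].append((int(count), word, line))
    return {part: [row[2] for row in sorted(rows)] for part, rows in groups.items()}

mapper_output = [
    "1\t250\tcredit",
    "1\t0\t5000",              # total token count (count field 0)
    "1\t-1\tSet(['credit'])",  # unique-word set (count field -1)
    "0\t3\tamt",
]
for part, ordered in sorted(simulate_shuffle(mapper_output).items()):
    print("%s %s" % (part, ordered))
```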

In [42]:
!hdfs dfs -cat results/3.2/part-0000*

16/06/05 11:17:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10 least common tokens:	
1: ('disclosures', 64.0, 6.5274018288964e-05)	
2: ('missing', 64.0, 6.5274018288964e-05)	
3: ('amt', 71.0, 7.241336403931944e-05)	
4: ('day', 71.0, 7.241336403931944e-05)	
5: ('checks', 75.0, 7.649299018237969e-05)	
6: ('convenience', 75.0, 7.649299018237969e-05)	
7: ('credited', 92.0, 9.383140129038574e-05)	
8: ('payment', 92.0, 9.383140129038574e-05)	
9: ('amount', 98.0, 9.995084050497613e-05)	
10: ('apply', 118.0, 0.00012034897122027737)	
50 most common tokens:	
50: ('score', 4357.0, 0.004443732776328377)	
49: ('card', 4405.0, 0.004492688290045101)	
48: ('disclosure', 4517.0, 0.004606917822050787)	
47: ('verification', 4517.0, 0.004606917822050787)	
46: ('identity', 4729.0, 0.0048231380076329804)	
45: ("company's", 4858.0, 0.0049547059507466734)	
44: ('investigation', 4858.0, 0.0049547059507466734)	
43: ('managin

## HW3.3. Shopping Cart Analysis
	
For this homework use the online browsing behavior dataset located at: 

       https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Do some exploratory data analysis of this dataset. 

How many unique items are available from this supplier?

Using a single reducer: Report your findings, such as the number of unique products, the largest basket, and the top 50 most frequently purchased items with their frequency and relative frequency (break ties by sorting the products in alphabetical order), using Hadoop MapReduce. 

In [44]:
%%writefile mapper.py
#!/usr/bin/python
# mapper.py
# Author:Jackson Lane
# Description: mapper code for HW3.3
from __future__ import print_function
from sets import Set
import sys,re
sys.stderr.write("reporter:counter:3.3,MapperCount,1\n")
total = 0
#Maintain a set of unique products to pass to reducer
uniqueproducts = Set([])
for line in sys.stdin:
    basket = re.findall("[\w']+",line)
    for product in basket:
        uniqueproducts.add(product)
        total+= 1
        #Emit three fields for the reducer to use:
        # Product, count, and basket size
        # Basket size will be emitted once for each
        # product in the basket.  This is inefficient
        # from a network bandwidth perspective, but
        # relatively harmless as it's just an 
        # extra integer.
        # If we really wanted to save bandwidth, we 
        # could just emit the product by itself and
        # nothing else.  The reducer could assume that
        # the count will be 1.
        print (product,1,len(basket),sep="\t")


#Emit the total number of products and the set of unique products to reducer
print('Totals',total,uniqueproducts,sep="\t")



Overwriting mapper.py


In [45]:
%%writefile reducer.py
#!/usr/bin/python
# reducer.py
# Author:Jackson Lane
# Description: reducer code for HW3.3
# This reducer gets the products from the mapper and computes
# The number of unique products, the largest basket, and the
# top 50 products.
# Since we are using a single reducer, we have more flexibility
# with what we are allowed to do.

import sys
from sets import Set

sys.stderr.write("reporter:counter:3.3,ReducerCount,1\n")
total = float(0)
uniqueproducts = Set([])

# These fields are similar to the fields from the word count
# code, but renamed for products instead of words
product = 0
maxbasketsize = 0
count =0 
products = {}

for line in sys.stdin:
    line = line.strip()
    
    #Get the product, count, and basket size 
    newproduct,newcount,basketsize=line.split("\t")
    newcount = int(newcount)
    
    # Get the total products and the set of unique products.
    # These fields do not have to come first as we compute
    # the relative frequencies after all stdin has been
    # processed
    if newproduct=="Totals": 
        total+=newcount
        uniqueproducts.update(eval(basketsize))
        continue
    basketsize=int(basketsize)

    #Update maximum basket size
    if basketsize>maxbasketsize: 
        maxbasketsize=basketsize
        
    if product==newproduct:
        count+=newcount
    else:
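        # We are finished with the current product
        # Update the products dictionary with the product count
        # (count is 0 only for the initial placeholder product)
        if count > 0:
            products[product] = products.setdefault(product, 0) + count
        product=newproduct
        count=newcount

# Store the count for the last product, which the loop above never flushes
if count > 0:
    products[product] = products.setdefault(product, 0) + count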
# Turn the products dictionary into an array of tuples with counts
# and relative frequencies
products =[(k, v,float(v)/total) for k, v in products.iteritems()]

# Sort the array by product name and then (stably) by count, so
# products with equal counts stay in alphabetical order
products   = sorted(products,key=lambda x: x[0])
products   = sorted(products,key=lambda x: x[1])

# Output the number of unique products, largest basket size, and top 50
# products.
print "Number of Unique Products:",len(uniqueproducts)
print "Largest Basket Size:", maxbasketsize
print "Top 50 Products:"
for i,p in enumerate(products[-50:]):
    print str(50-i)+":",p

Overwriting reducer.py
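For sanity-checking the job on a small sample, the same statistics can be computed in memory. This is a sketch, not the author's method: `basket_stats` is a hypothetical helper, it reuses mapper.py's tokenizer, its relative frequency divides by total product occurrences as mapper.py does, and the baskets are made up.

```python
import re
from collections import Counter

def basket_stats(lines, top_n=50):
    """In-memory version of the HW3.3 statistics (hypothetical helper)."""
    counts = Counter()
    largest = 0
    for line in lines:
        basket = re.findall(r"[\w']+", line)  # same tokenizer as mapper.py
        largest = max(largest, len(basket))
        counts.update(basket)
    total = float(sum(counts.values()))  # total product occurrences, like mapper.py's total
    # Descending count, ties broken alphabetically by product ID
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    top = [(p, c, c / total) for p, c in ranked[:top_n]]
    return len(counts), largest, top

# Made-up baskets, one per line
sample = [
    "FRO11987 ELE17451 ELE89019",
    "GRO99222 ELE17451",
    "ELE17451 GRO99222 SNA11111 FRO11987",
]
n_unique, largest, top = basket_stats(sample, top_n=3)
print("%d %d" % (n_unique, largest))  # 5 4
for rank, row in enumerate(top, 1):
    print("%d: %s" % (rank, row))
```

Unlike reducer.py, which slices `[-50:]` from an ascending sort and therefore lists tied products in reverse alphabetical order when read from rank 1 down, this sketch sorts on `(-count, product)` so ties appear alphabetically, as the assignment asks.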


Run the mapreduce job

In [46]:
!hdfs dfs -rm -r results/3.3

!hdfs dfs -put -p -f ProductPurchaseData.txt
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-file mapper.py \
-file reducer.py \
-mapper "mapper.py" \
-reducer "reducer.py" \
-input ProductPurchaseData.txt \
-output results/3.3

16/06/05 11:56:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `results/3.3': No such file or directory
16/06/05 11:56:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 11:56:29 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 11:56:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-unjar1357311333018480171/] [] /tmp/streamjob72353093689334652.jar tmpDir=null
16/06/05 11:56:29 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 11:56:30 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 11:56:31 INFO mapred.FileInputFormat: Total input paths to process : 1
16/06/

In [47]:
!hdfs dfs -cat results/3.3/part-00000

16/06/05 11:56:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Number of Unique Products: 12592	
Largest Basket Size: 37	
Top 50 Products:	
50: ('GRO85051', 1214, 0.0031878242967880175)	
49: ('DAI22896', 1219, 0.0032009537214041134)	
48: ('GRO81087', 1220, 0.0032035796063273323)	
47: ('DAI31081', 1261, 0.0033112408881793166)	
46: ('GRO15017', 1275, 0.003348003277104384)	
45: ('ELE91337', 1289, 0.0033847656660294517)	
44: ('DAI43223', 1290, 0.003387391550952671)	
43: ('SNA96271', 1295, 0.0034005209755687666)	
42: ('ELE59935', 1311, 0.003442535134340273)	
41: ('DAI88807', 1316, 0.0034556645589563684)	
40: ('ELE74482', 1316, 0.0034556645589563684)	
39: ('GRO61133', 1321, 0.003468793983572464)	
38: ('ELE56788', 1345, 0.003531815221729723)	
37: ('GRO38814', 1352, 0.0035501964161922567)	
36: ('SNA90094', 1390, 0.0036499800432745837)	
35: ('SNA93860', 1407, 0.0036946200869693085)	
34: ('FRO53271', 1420, 0.003

## HW3.4.  Pairs

Suppose we want to recommend new products to the customer based on the products they
have already browsed on the online website. Write a map-reduce program 
to find products which are frequently browsed together. Fix the support count (co-occurrence count) to s = 100 
(i.e. product pairs need to occur together at least 100 times to be considered frequent) 
and find pairs of items (sometimes referred to as itemsets of size 2 in association rule mining) that have a support count of 100 or more.

List the top 50 product pairs with their corresponding support count (aka frequency) and relative frequency or support (support count = the number of records where they co-occur; support = that count divided by the number of baskets in the dataset), in decreasing order of support, for frequent (count >= 100) itemsets of size 2. 

Use the Pairs pattern (lecture 3) to extract these frequent itemsets of size 2. Feel free to use combiners if they bring value. Instrument your code with counters to count the number of times your mapper, combiner and reducers are called.  

Please output records of the following form for the top 50 pairs (itemsets of size 2): 

      item1, item2, support count, support
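Written as a formula, the support of a pair is its co-occurrence count divided by the number of baskets, and a pair is frequent when that count reaches the threshold s. Dividing the top pair's printed count by its printed support (arithmetic on the Pairs output shown later in this section, not an independently checked figure) implies the dataset has about 31,101 baskets:

$$\text{support}(A,B) = \frac{\text{count}(A,B)}{N_{\text{baskets}}}, \qquad \text{count}(A,B) \ge s = 100$$

$$N_{\text{baskets}} \approx \frac{1592}{0.0511880646925} \approx 31101$$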



Fix the ordering of the pairs lexicographically (left to right), 
and break ties in support (between pairs, if any exist) 
by taking the first ones in lexicographically increasing order. 

Report  the compute time for the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)
Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.


In [123]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.4 - Mapper Function Code
from __future__ import print_function
from sets import Set

import sys,re
sys.stderr.write("reporter:counter:3.4,MapperCount,1\n")

baskets =0

for line in sys.stdin:
    line=line.strip()
    # Using a set to avoid double counting in instances where a customer 
    # bought more than 1 of a product
    basket = list(Set(re.findall("[\w']+",line)))
    baskets +=1
    for i,p1 in enumerate(basket):
        # Only iterate through products we haven't seen yet to avoid 
        # duplicates
        for p2 in basket[i+1:]:
            # Create pairs out of the two products.  For consistency,
            # put the alphabetically lower element first in the pair.
            # Then emit both products and the count
            if p2 > p1:
                print (p1,p2,1,sep='\t')
            else:
                print (p2,p1,1,sep='\t')

# Output total number of baskets with a leading space in the key for 
# order-inversion purposes
print (" Total","Baskets",baskets,sep='\t')

Overwriting mapper.py
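The nested loop in mapper.py emits each unordered pair once, with the lexicographically smaller product first. A minimal sketch of the same set of pairs uses `itertools.combinations` on the sorted, deduplicated basket; `basket_pairs` is a hypothetical helper, not part of the job, and the sample basket is made up from product IDs that appear in the output.

```python
import re
from itertools import combinations

def basket_pairs(line):
    """Hypothetical helper: canonical product pairs for one basket.

    Duplicates are dropped first (like the Set in mapper.py), and because the
    products are sorted, combinations() always yields (smaller, larger).
    """
    products = sorted(set(re.findall(r"[\w']+", line)))
    return list(combinations(products, 2))

# Made-up basket in which DAI62779 was bought twice
for p1, p2 in basket_pairs("GRO73461 DAI62779 ELE17451 DAI62779"):
    print("%s\t%s\t1" % (p1, p2))
```

Sorting once per basket removes the need for the `if p2 > p1` swap inside the inner loop.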


In [124]:
%%writefile combiner.py
#!/usr/bin/python
# combiner.py
# Author:Jackson Lane
# Description: combiner code for HW3.4
from __future__ import print_function

import sys

sys.stderr.write("reporter:counter:3.4,Combiner,1\n")
product1 =''
product2 = ''
count =0 

for line in sys.stdin:
    line = line.strip()
    
    # Parse the two product fields and the count field from the mapper
    newproduct1,newproduct2,newcount=line.split("\t")
    newcount = int(newcount)
    
    # Pass along the total number of baskets to the reducer
    if newproduct1=="Total":
        print (" Total","Baskets",newcount,sep='\t')
        continue
    if product1 == newproduct1 and product2 == newproduct2:
        count+=newcount
    else:
        if(count > 0): print (product1,product2,count,sep='\t')
        product1=newproduct1
        product2=newproduct2
        count=newcount

if(count > 0): print (product1,product2,count,sep='\t')
    


Overwriting combiner.py


In [125]:
%%writefile reducer.py
#!/usr/bin/python
# reducer.py
# Author:Jackson Lane
# Description: reducer code for HW3.4
# Sums the co-occurrence counts for each product pair and emits the frequent pairs with their support count and support.
from __future__ import print_function

import sys

sys.stderr.write("reporter:counter:3.4,ReducerCount,1\n")
total = float(0)
baskets = float(0)
pair =('','')
count =0 
pairs = {}

for line in sys.stdin:
    line = line.strip()
    # Parse the two product fields and the count field from the mapper
    newproduct1,newproduct2,newcount=line.split("\t")
    newcount = int(newcount)
    #Extract the total number of baskets
    #This field should come first in the stdin due to order inversion
    if newproduct1=="Total": 
        baskets+=newcount
        continue
    # Turn the two products into a tuple.  This is more for convience
    # than functionality, as it's easier to compare two tuples than 
    # four values.  Note that we will end up outputting the two products
    # separately
    newpair = (newproduct1,newproduct2)
    if (newpair == pair):
        count+=newcount
    else:
        # Only consider pairs that co-occurred at least 100 times
        if (count >= 100): 
            print (pair[0],pair[1],count,count / baskets, sep = ",")
        pair=newpair
        count=newcount
#Emit the last pair and count (if it is at least 100, of course)
if (count >= 100):  print (pair[0],pair[1],count,count / baskets, sep = ",")


Overwriting reducer.py
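The whole Pairs pipeline, including the order-inversion trick, can be simulated in memory on a few toy baskets. This is a sketch: `pairs_map` and `pairs_reduce` are hypothetical helpers, Python's `sorted` stands in for Hadoop's shuffle, and `min_support` is lowered to 2 so the toy data produces output (the assignment uses 100). It shows why the leading space works: a space sorts before any letter or digit, so the basket total reaches the reducer before the first pair.

```python
import re
from itertools import combinations

def pairs_map(lines):
    """Hypothetical mapper: one 'p1<TAB>p2<TAB>1' line per canonical pair,
    then the basket total under the key ' Total' (note the leading space)."""
    out = []
    for line in lines:
        products = sorted(set(re.findall(r"[\w']+", line)))
        out.extend("%s\t%s\t1" % pair for pair in combinations(products, 2))
    out.append(" Total\tBaskets\t%d" % len(lines))
    return out

def pairs_reduce(sorted_lines, min_support=100):
    """Hypothetical reducer: sums counts per pair and keeps pairs with
    count >= min_support, reporting count / number of baskets as support."""
    baskets = 0.0
    results = []
    pair, count = None, 0
    for line in sorted_lines:
        # strip() also removes the leading space, so ' Total' becomes 'Total'
        p1, p2, n = line.strip().split("\t")
        if p1 == "Total":
            baskets += int(n)  # arrives first thanks to order inversion
            continue
        if (p1, p2) == pair:
            count += int(n)
        else:
            if pair is not None and count >= min_support:
                results.append((pair[0], pair[1], count, count / baskets))
            pair, count = (p1, p2), int(n)
    if pair is not None and count >= min_support:
        results.append((pair[0], pair[1], count, count / baskets))
    return results

toy_baskets = ["A B C", "A B", "B C", "A B D"]
shuffled = sorted(pairs_map(toy_baskets))  # stand-in for Hadoop's sort
print(repr(shuffled[0]))                   # the ' Total' line sorts first
print(pairs_reduce(shuffled, min_support=2))
```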


Run the mapreduce job

In [126]:
!hdfs dfs -rm -r temp

!hdfs dfs -put -p -f ProductPurchaseData.txt
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k1,2' \
-D stream.num.map.output.key.fields=2 \
-D stream.num.reduce.output.key.fields=2 \
-file mapper.py \
-file combiner.py \
-file reducer.py \
-mapper "mapper.py" \
-combiner "combiner.py" \
-reducer "reducer.py" \
-input ProductPurchaseData.txt \
-output temp

!hdfs dfs -rm -r results/3.4

# Second map reduce job to sort the output.
!hdfs dfs -put -p -f ProductPurchaseData.txt
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k3,3nr -k1,2' \
-D mapreduce.output.key.field.separator="," \
-D stream.map.output.field.separator=, \
-D stream.reduce.output.field.separator=, \
-D stream.map.input.field.separator=, \
-D stream.reduce.input.field.separator=, \
-D map.output.key.field.separator=, \
-D stream.num.map.output.key.fields=4 \
-D stream.num.reduce.output.key.fields=4 \
-mapper cat \
-reducer cat \
-input temp/part-* \
-output results/3.4

16/06/05 13:19:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 13:19:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted temp
16/06/05 13:19:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 13:19:43 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 13:19:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, combiner.py, reducer.py, /tmp/hadoop-unjar2150924994250287621/] [] /tmp/streamjob7177716152771347565.jar tmpDir=null
16/06/05 13:19:44 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 13:19:44 INFO client.RMProxy: Connecting to Reso
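The second job only reorders the first job's output: its comparator options `-k3,3nr -k1,2` sort by the count field in descending numeric order and break ties on the two product fields. A plain-Python sketch of that ordering, using three rows copied from the output below (`sort_pairs` is a hypothetical helper; comparing fields 1 and 2 as a tuple matches comparing them as one span here because the product IDs are fixed-width):

```python
def sort_pairs(rows, top_n=50):
    """Hypothetical helper: order 'item1,item2,count,support' rows by count
    descending, then by item1 and item2 ascending (cf. -k3,3nr -k1,2)."""
    fields = [row.split(",") for row in rows]
    fields.sort(key=lambda f: (-int(f[2]), f[0], f[1]))
    return [",".join(f) for f in fields[:top_n]]

# Three rows copied from the Pairs output, deliberately shuffled
rows = [
    "FRO40251,GRO73461,882,0.0283592167454",
    "DAI62779,ELE17451,1592,0.0511880646925",
    "DAI62779,DAI75645,882,0.0283592167454",
]
for row in sort_pairs(rows):
    print(row)
```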

In [122]:
!hdfs dfs -cat results/3.4/part-00000 | head -50

16/06/05 13:19:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI62779,ELE17451,1592,0.0511880646925		
FRO40251,SNA80324,1412,0.0454004694383		
DAI75645,FRO40251,1254,0.0403202469374		
FRO40251,GRO85051,1213,0.0390019613517		
DAI62779,GRO73461,1139,0.0366226166361		
DAI75645,SNA80324,1130,0.0363332368734		
DAI62779,FRO40251,1070,0.0344040384554		
DAI62779,SNA80324,923,0.0296775023311		
DAI62779,DAI85309,918,0.0295167357963		
ELE32164,GRO59710,911,0.0292916626475		
DAI62779,DAI75645,882,0.0283592167454		
FRO40251,GRO73461,882,0.0283592167454		
DAI62779,ELE92920,877,0.0281984502106		
FRO40251,FRO92469,835,0.026848011318		
DAI62779,ELE32164,832,0.0267515513971		
DAI75645,GRO73461,712,0.0228931545609		
DAI43223,ELE32164,711,0.022861001254		
DAI62779,GRO30386,709,0.02279669464		
ELE17451,FRO40251,697,0.0224108549564		
DAI85309,ELE99737,659,0.0211890292917		
DAI62779,ELE26917,650,0.020899649529		
GRO21487,G

I ran both jobs on a SoftLayer Hadoop 2.7.2 cluster with 1 master and 2 slave VMs.  Each VM runs CentOS 7.0 (64-bit) on two 2.0 GHz cores with 6 GB of RAM.  

In each MapReduce job, there were two mapper tasks and one reducer task.  The combiner, which only the first job uses, was called twice.  

According to the job output, the first (Pairs) MapReduce job ran in 44 seconds: the map phase took 36 seconds and the reduce phase took 8 seconds.  The second (sorting) MapReduce job ran in 34 seconds.

## HW3.5: Stripes
Repeat 3.4 using the stripes design pattern for finding co-occurring pairs.

Report the compute times for the Stripes job versus the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)

Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs.


In [149]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.5 - Mapper Function Code
from __future__ import print_function
from sets import Set

import sys,re
sys.stderr.write("reporter:counter:3.5,MapperCount,1\n")

#Count the baskets so the reducer can compute support (relative frequency)
baskets =0

for line in sys.stdin:
    line=line.strip()
    # Using a set to avoid double counting in instances where a customer bought more than 1 of a product
    basket = sorted(Set(re.findall("[\w']+",line)))
    baskets +=1
    for i,p1 in enumerate(basket[:-1]):
        stripe = dict([(x,1) for x in basket[i+1:]])
        print (p1,stripe,sep='\t')
        
#Output total number of carts with a special key for order-inversion purposes
print (" Total",baskets,sep='\t')

Overwriting mapper.py
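In the Stripes mapper, each product in the sorted basket becomes a key whose stripe holds every product that sorts after it, and the combiner merges stripes for the same key with an element-wise sum. The sketch below expresses both steps with `collections.Counter` in place of the dicts that the job serializes as text and parses with `eval`; the helper name is hypothetical and the baskets are made up.

```python
from collections import Counter

def basket_stripes(products):
    """Hypothetical mapper step: for a sorted, deduplicated basket, map each
    product to a Counter of the products that sort after it."""
    basket = sorted(set(products))
    return [(p1, Counter(basket[i + 1:])) for i, p1 in enumerate(basket[:-1])]

# Made-up baskets
emitted = (basket_stripes(["DAI62779", "ELE17451", "GRO73461"])
           + basket_stripes(["ELE17451", "DAI62779"]))

# Combiner/reducer step: element-wise sum of all stripes that share a key
merged = {}
for product, stripe in emitted:
    merged[product] = merged.get(product, Counter()) + stripe

print(merged["DAI62779"])
```

The last product in each sorted basket never becomes a key, because it has no products after it; its co-occurrences are recorded in the stripes of the products that precede it.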


In [150]:
%%writefile combiner.py
#!/usr/bin/python
# combiner.py
# Author:Jackson Lane
# Description: combiner code for HW3.5
from __future__ import print_function

import sys

sys.stderr.write("reporter:counter:3.5,Combiner,1\n")
product =''
stripe = {}
for line in sys.stdin:
    #Parse line into fields
    sys.stderr.write(line)

    line = line.strip()
    newproduct,newstripe=line.split("\t")
    if newproduct=="Total": # Pass along the total number of baskets (order inversion)
        print (" Total",int(newstripe)  ,sep='\t')
        continue
    newstripe = eval(newstripe)
    
    if product == newproduct:
        stripe = { k: stripe.get(k, 0) + newstripe.get(k, 0) for k in set(stripe) | set(newstripe) }
    else:
        if len(stripe) > 0: print (product,stripe,sep='\t')
        product=newproduct
        stripe=newstripe

if len(stripe) > 0: print (product,stripe,sep='\t')
    


Overwriting combiner.py


In [151]:
%%writefile reducer.py
#!/usr/bin/python
# reducer.py
# Author:Jackson Lane
# Description: reducer code for HW3.5
# Reads in stripes pattern from mapper and breaks up into product pairs

from __future__ import print_function
import sys

sys.stderr.write("reporter:counter:3.5,ReducerCount,1\n")
baskets = float(0)
product =''
stripe = {}
for line in sys.stdin:
    #Parse line into fields
    sys.stderr.write(line)
    line = line.strip()
    newproduct,newstripe=line.split("\t")
    if newproduct=="Total": # Extract the total number of baskets (order inversion)
        baskets+=int(newstripe)        
        continue
    newstripe = eval(newstripe)
    
    if product == newproduct:
        stripe = { k: stripe.get(k, 0) + newstripe.get(k, 0) for k in set(stripe) | set(newstripe) }
    else:
        for (product2,count) in stripe.iteritems():
            count = int(count)
            if (count >= 100): print(product,product2,count,count / baskets, sep = ",")
        product=newproduct
        stripe=newstripe

# Emit the pairs for the last product
for (product2,count) in stripe.iteritems():
    count = int(count)
    if (count >= 100): print(product,product2,count,count / baskets, sep = ",")


Overwriting reducer.py
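Because the Stripes reducer expands each summed stripe back into (product, product2) pairs, the two designs should produce identical co-occurrence counts. A minimal in-memory check of that claim on toy baskets follows; both helpers are hypothetical and no support threshold is applied.

```python
import re
from collections import Counter
from itertools import combinations

def tokenize(line):
    return sorted(set(re.findall(r"[\w']+", line)))

def pair_counts(lines):
    """Pairs design: count each canonical (smaller, larger) pair directly."""
    counts = Counter()
    for line in lines:
        counts.update(combinations(tokenize(line), 2))
    return counts

def stripe_counts(lines):
    """Stripes design: sum one stripe per product, then expand the summed
    stripes into (product, product2) pairs the way reducer.py does."""
    stripes = {}
    for line in lines:
        basket = tokenize(line)
        for i, p1 in enumerate(basket[:-1]):
            stripes.setdefault(p1, Counter()).update(basket[i + 1:])
    return Counter(dict(((p1, p2), n)
                        for p1, stripe in stripes.items()
                        for p2, n in stripe.items()))

toy_baskets = ["A B C", "A B", "B C", "A B D"]
print(pair_counts(toy_baskets) == stripe_counts(toy_baskets))  # True
```

The designs differ in how many records cross the network (one per pair versus one per product per basket), not in the counts they compute.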


Run the mapreduce job

In [152]:
!hdfs dfs -rm -r temp

!hdfs dfs -put -p -f ProductPurchaseData.txt
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k1,1' \
-D stream.num.map.output.key.fields=2 \
-D stream.num.reduce.output.key.fields=2 \
-file mapper.py \
-file combiner.py \
-file reducer.py \
-mapper "mapper.py" \
-combiner "combiner.py" \
-reducer "reducer.py" \
-input ProductPurchaseData.txt \
-output temp

!hdfs dfs -rm -r results/3.5

# Second map reduce job to sort the output.
!hdfs dfs -put -p -f ProductPurchaseData.txt
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2*.jar \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k3,3nr -k1,2' \
-D mapreduce.output.key.field.separator="," \
-D stream.map.output.field.separator=, \
-D stream.reduce.output.field.separator=, \
-D stream.map.input.field.separator=, \
-D stream.reduce.input.field.separator=, \
-D map.output.key.field.separator=, \
-D stream.num.map.output.key.fields=3 \
-D stream.num.reduce.output.key.fields=3 \
-mapper cat \
-reducer cat \
-input temp/part-* \
-output results/3.5

16/06/05 13:43:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 13:43:40 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted temp
16/06/05 13:43:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/05 13:43:43 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/06/05 13:43:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, combiner.py, reducer.py, /tmp/hadoop-unjar303191613663761740/] [] /tmp/streamjob4794543400269878294.jar tmpDir=null
16/06/05 13:43:44 INFO client.RMProxy: Connecting to ResourceManager at /50.23.93.133:8032
16/06/05 13:43:45 INFO client.RMProxy: Connecting to Resou

In [153]:
!hdfs dfs -cat results/3.4/part-00000 | head -50

16/06/05 13:45:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI62779,ELE17451,1592,0.0511880646925		
FRO40251,SNA80324,1412,0.0454004694383		
DAI75645,FRO40251,1254,0.0403202469374		
FRO40251,GRO85051,1213,0.0390019613517		
DAI62779,GRO73461,1139,0.0366226166361		
DAI75645,SNA80324,1130,0.0363332368734		
DAI62779,FRO40251,1070,0.0344040384554		
DAI62779,SNA80324,923,0.0296775023311		
DAI62779,DAI85309,918,0.0295167357963		
ELE32164,GRO59710,911,0.0292916626475		
DAI62779,DAI75645,882,0.0283592167454		
FRO40251,GRO73461,882,0.0283592167454		
DAI62779,ELE92920,877,0.0281984502106		
FRO40251,FRO92469,835,0.026848011318		
DAI62779,ELE32164,832,0.0267515513971		
DAI75645,GRO73461,712,0.0228931545609		
DAI43223,ELE32164,711,0.022861001254		
DAI62779,GRO30386,709,0.02279669464		
ELE17451,FRO40251,697,0.0224108549564		
DAI85309,ELE99737,659,0.0211890292917		
DAI62779,ELE26917,650,0.020899649529		
GRO21487,G

I ran both jobs on a SoftLayer Hadoop 2.7.2 cluster with 1 master and 2 slave VMs.  Each VM runs CentOS 7.0 (64-bit) on two 2.0 GHz cores with 6 GB of RAM.  

In each MapReduce job, there were two mapper tasks and one reducer task, and the counters reflected this.  The combiner, which only the first job uses, was called twice.  

According to the job output, the first (Stripes) MapReduce job ran in 101 seconds: the map phase took 93 seconds and the reduce phase took 8 seconds.  The second (sorting) MapReduce job ran in 26 seconds.

The Stripes job definitely ran slower than the Pairs job (101 seconds versus 44 seconds for the first job).  I believe this is because the job was run with stream.num.map.output.key.fields=2, so the mapper treated the whole stripe as part of the key, and processing those long keys slowed the mapper down significantly.