# DATSCIW261 ASSIGNMENT #3

Jing Xu

jaling@gmail.com

W261-3

DATSCIW261 Assignment #3

1/23/16

**HW3.0. What is a merge sort? Where is it used in Hadoop? What is a combiner function in the context of Hadoop? Give an example where it can be used and justify why it should be used in the context of this problem. What is the Hadoop shuffle?**

Merge sort is a divide-and-conquer sorting algorithm. It sorts small runs of the input and then repeatedly merges sorted runs into longer ones. The merge step works on n sorted lists at once: it looks at the front element of each list, moves the smallest one to the output, and advances the pointer of the list that element came from by one position. Let m be the sum of the lengths of all the lists. Keeping the front elements in a heap-based priority queue makes this merge run in O(m log n) time.

Hadoop uses merge sort during the shuffle (the partition, sort, and combine steps). On the map side, mapper output is buffered in memory, sorted by partition and key, and spilled to disk. The spill files are then merged into a single file that is partitioned and sorted by key. If a combiner is configured, it runs on the sorted records as each spill is written, and it may run again while the spill files are merged.

Combiners in the context of Hadoop are functions that operate between the map and reduce stages. They take the intermediate output of a mapper and aggregate the values for each key locally. The combiner output is then copied to the reducer machine. There it is merge-sorted again with the outputs of the other map tasks assigned to that reducer, and the merged result is streamed to the reduce function.
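The heap-based merge described above can be sketched in a few lines of Python. This is an illustrative k-way merge, not Hadoop's own merge code; the function name `k_way_merge` and the sample runs are hypothetical. Each element is pushed onto and popped from a heap of at most n entries exactly once, which gives the O(m log n) bound.

```python
import heapq


def k_way_merge(sorted_lists):
    """Merge n already-sorted lists into one sorted list with a min-heap.

    Heap entries are (value, list_index, position): the heap always exposes
    the smallest front element, and after popping it we advance the pointer
    of the list it came from by one position.
    """
    heap = []
    for i, lst in enumerate(sorted_lists):
        if lst:  # empty lists contribute nothing
            heapq.heappush(heap, (lst[0], i, 0))
    merged = []
    while heap:
        value, i, pos = heapq.heappop(heap)  # O(log n) per pop
        merged.append(value)
        if pos + 1 < len(sorted_lists[i]):
            heapq.heappush(heap, (sorted_lists[i][pos + 1], i, pos + 1))
    return merged


spills = [[1, 4, 9], [2, 3, 10], [], [5, 6]]  # hypothetical sorted spill runs
print(k_way_merge(spills))  # [1, 2, 3, 4, 5, 6, 9, 10]
```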

One example of an effective use of combiners is computing the average (mean) transaction amount per customer from log files that record only individual transactions. It is more efficient to aggregate each mapper's output per customer before the shuffle than to send every transaction to the reducers. However, we can't reuse an averaging reducer as the combiner, for two reasons:

- The mean of partial means is not the overall mean when the partial results cover different numbers of transactions (averaging is not associative).
- A combiner's output must have the same form as the mapper's output, because that is what the reducer consumes. A mapper emitting (key, value) would not give the reducer the (key, (sum, count)) it needs to compute an average.

Instead, the mapper should emit a (sum, count) pair for each customer key. A combiner can then compute an intermediate total for each key by adding all the sums and all the counts. The combiner output is in the right format for the reducer, which adds up the partial pairs and divides the total sum by the total count. It also shrinks the data moved during the shuffle, which lowers overall runtime.
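Below is a minimal local sketch of the (sum, count) approach. It uses hypothetical customer data, and plain Python functions stand in for the mapper, combiner, and reducer (this is not Hadoop code). The two splits deliberately contain different numbers of transactions, which is exactly when averaging averages goes wrong.

```python
from collections import defaultdict


def mapper(transactions):
    """Emit (customer, (amount, 1)) for every (customer, amount) transaction."""
    for customer, amount in transactions:
        yield customer, (amount, 1)


def sum_pairs(pairs):
    """Add up (sum, count) pairs per key; shared by the combiner and the reducer."""
    totals = defaultdict(lambda: [0.0, 0])
    for customer, (s, c) in pairs:
        totals[customer][0] += s
        totals[customer][1] += c
    return totals


def combiner(pairs):
    """Emit one partial (sum, count) pair per key; safe to apply zero or more times."""
    for customer, (s, c) in sum_pairs(pairs).items():
        yield customer, (s, c)


def reducer(pairs):
    """Combine the partial pairs and divide only once, at the very end."""
    return {customer: s / c for customer, (s, c) in sum_pairs(pairs).items()}


# two hypothetical map tasks (input splits) with uneven numbers of transactions
split_a = [("alice", 10.0), ("alice", 20.0), ("alice", 30.0)]
split_b = [("alice", 100.0)]

combined = list(combiner(mapper(split_a))) + list(combiner(mapper(split_b)))
print(reducer(combined))  # {'alice': 40.0}
# averaging the per-split means instead would give (20.0 + 100.0) / 2 = 60.0, which is wrong
```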

The Hadoop shuffle consists of the partition, sort, and combine steps described above. It is the synchronization step of the Hadoop MapReduce execution framework, and it moves intermediate data from the mappers to the reducers:

1. The partitioner assigns each mapper output record to a reducer (by default, a hash of the key modulo the number of reducers).
2. The records are sorted by key and optionally combined.
3. Each reducer copies its partition from every map task and merges the pieces into one key-sorted stream for the reduce function.

These steps involve both in-memory buffering (map-side partitioning, sorting, and combining) and disk (map-side spill files, and the map outputs copied to and merged on the reducers).
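The partition, sort, and group part of the shuffle can be simulated in memory as below; the combine step is left out. The `partition` function is a hypothetical stand-in that takes crc32 of the key modulo the number of reducers. Hadoop's default HashPartitioner uses the key's own hash code instead, so the actual key-to-reducer assignment would differ.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def partition(key, num_reducers):
    # Stand-in for hash partitioning (hash(key) mod number of reducers).
    # zlib.crc32 is used only because it is stable across runs; it is NOT
    # the hash function Hadoop itself uses.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(map_outputs, num_reducers):
    """Partition (key, value) pairs, sort each partition by key, and group the values."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in map_outputs:
        partitions[partition(key, num_reducers)].append((key, value))
    reducer_inputs = []
    for part in partitions:
        part.sort(key=itemgetter(0))  # stable sort by key; equal keys keep arrival order
        reducer_inputs.append(
            [(k, [v for _, v in group]) for k, group in groupby(part, key=itemgetter(0))]
        )
    return reducer_inputs


pairs = [("foo", 1), ("quux", 1), ("labs", 1), ("foo", 1), ("bar", 1), ("quux", 1), ("foo", 1)]
for r, reducer_input in enumerate(shuffle(pairs, 2)):
    print("reducer %d: %s" % (r, reducer_input))
```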



**HW3.1 Use Counters to do EDA (exploratory data analysis) and to monitor progress
Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc.** 

**While processing information/data using a MapReduce job, it is a challenge to monitor the progress of parallel threads running across nodes of distributed clusters. Moreover, it is also complicated to distinguish between the data that has been processed and the data which is yet to be processed. The MapReduce Framework offers a provision of user-defined Counters, which can be effectively utilized to monitor the progress of data across nodes of distributed clusters.**

**The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:**

**Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?**

**User-defined Counters**

**Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).**

**Hadoop offers Job Tracker, a UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. Screenshot your job tracker UI as your job completes and include it here. Make sure that your user defined counters are visible.**
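Streaming scripts cannot call the Java counter API directly. Instead, as the mappers in this notebook do, they write lines of the form `reporter:counter:<group>,<counter>,<amount>` to stderr, and the framework adds them up. The sketch below is a hypothetical local helper, not part of Hadoop, that tallies such lines so the counter logic can be checked before a job is submitted.

```python
import re
from collections import Counter

# Streaming scripts update a counter by writing a line of this form to stderr:
#   reporter:counter:<group>,<counter>,<amount>
COUNTER_RE = re.compile(r"^reporter:counter:([^,]+),([^,]+),(-?\d+)$")


def tally_counters(stderr_lines):
    """Sum counter updates per (group, counter); other stderr lines are ignored."""
    totals = Counter()
    for line in stderr_lines:
        match = COUNTER_RE.match(line.strip())
        if match:
            group, name, amount = match.groups()
            totals[(group, name)] += int(amount)
    return totals


def product_counter_lines(products):
    """Hypothetical local copy of the HW3.1 mapper's counter logic."""
    for product in products:
        product = product.lower()
        if product == "debt collection":
            yield "reporter:counter:DebtCounter,Total,1\n"
        elif product == "mortgage":
            yield "reporter:counter:MortgageCounter,Total,1\n"
        else:
            yield "reporter:counter:OtherCounter,Total,1\n"


sample = ["Mortgage", "Debt collection", "Credit card", "Mortgage"]  # hypothetical Product values
totals = tally_counters(product_counter_lines(sample))
for (group, name), value in sorted(totals.items()):
    print("%s,%s=%d" % (group, name, value))
```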

In [1]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.1

import sys
import csv

# read CSV records from STDIN (standard input); csv.reader handles quoted fields
reader = csv.reader(sys.stdin)
for line in reader:
    product = line[1].lower() # Product is the 2nd column
    if product == 'debt collection': # count debt collection complaints
        sys.stderr.write("reporter:counter:DebtCounter,Total,1\n")
        print '%s\t%s' % (product, 1)
    elif product == 'mortgage': # count mortgage complaints
        sys.stderr.write("reporter:counter:MortgageCounter,Total,1\n")
        print '%s\t%s' % (product, 1)
    else: # all other categories (including the header row) are grouped into "other"
        sys.stderr.write("reporter:counter:OtherCounter,Total,1\n")
        print '%s\t%s' % ('other', 1)

Overwriting mapper.py


In [2]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.1

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# output the last word, if any (nothing to output if no valid input arrived)
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)

Overwriting reducer.py
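reducer.py depends on Hadoop delivering its input sorted by key, so that all lines for a key arrive together. Under that same assumption, the grouping logic can be written more compactly with `itertools.groupby`. The sketch below runs on a small hypothetical sample instead of `sys.stdin`.

```python
from itertools import groupby


def parse(lines):
    """Yield (key, count) from 'key<TAB>count' lines, dropping lines whose count is not an int."""
    for line in lines:
        key, _, count = line.rstrip("\n").partition("\t")
        try:
            count = int(count)
        except ValueError:
            continue  # same policy as reducer.py: silently discard malformed lines
        yield key, count


def reduce_sorted(lines):
    """Sum the counts of each key; relies on the input being sorted by key, as Hadoop guarantees."""
    for key, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield key, sum(count for _, count in group)


# in reducer.py, sys.stdin would be passed instead of this hypothetical sample
sample = ["debt collection\t1\n", "debt collection\t1\n", "mortgage\t1\n", "other\t1\n", "other\tx\n"]
for key, total in reduce_sorted(sample):
    print("%s\t%d" % (key, total))
```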


In [3]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.1

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put Consumer_Complaints.csv /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/
wait

hdfs dfs -cat /user/jing/test-output1/part-00000 # print the product counts
wait

hdfs dfs -rm -r /user/jing # remove hdfs folder
hdfs dfs -rm -r /user # remove hdfs folder


Overwriting mapreduce.sh


In [4]:
# give correct permissions
!chmod a+x reducer.py
!chmod a+x mapper.py
!chmod a+x mapreduce.sh

In [5]:
!./mapreduce.sh

16/02/01 16:20:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:20:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:20:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:21:01 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:21:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob4137158095172437633.jar tmpDir=null
16/02/01 16:21:01 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:21:01 INFO jvm.JvmMetrics: Initial

debt collection	44372

mortgage	125752

other	142789
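The distribution can be expressed as shares of all records with a little arithmetic on the reducer output above. The three totals add up to 312,913, the same number the mapper line counter reports in HW3.2.2. That suggests every record landed in exactly one category, including the CSV header row: its Product field is the literal "Product", so it is counted under "other".

```python
counts = {"debt collection": 44372, "mortgage": 125752, "other": 142789}  # reducer output above
total = sum(counts.values())
print(total)  # 312913
for product in sorted(counts):
    share = 100.0 * counts[product] / total  # percentage of all records
    print("%-16s %7d  %5.2f%%" % (product, counts[product], share))
```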

**HW3.2 Analyze the performance of your Mappers, Combiners and Reducers using Counters**

**For this brief study the Input file will be one record (the next line only): 
foo foo quux labs foo bar quux**


**Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here) using user defined Counters to count up how many times the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing this word count job? The answer should be 1 and 4 respectively. Please explain.**

**Please use multiple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).
Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere) using user defined Counters to count up how many times the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job?**

**Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to count up how many times the mapper, combiner, reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job?
Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items).**

In [6]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.2.1

import sys
import re

sys.stderr.write("reporter:counter:MapCounter,Instances,1\n") # once per mapper instance

WORD_RE = re.compile(r"[\w']+") # compile regex to parse complete words

# read input from STDIN (standard input)
for line in sys.stdin:
    # extract the words from the line, ignoring punctuation
    words = WORD_RE.findall(line.strip())
    for word in words:
        # write the results to STDOUT (standard output);
        # this is the input for the reduce step, i.e. for reducer.py;
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word.lower(), 1)
    sys.stderr.write("reporter:counter:MapCounter,Lines,1\n") # once per input line



Overwriting mapper.py


In [7]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.2.1

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

if current_word is not None: # output the last word, if any
    print '%s\t%s' % (current_word, current_count)

sys.stderr.write("reporter:counter:ReducerCounter,Total,1\n")

Overwriting reducer.py


In [8]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.2.1

echo "foo foo quux labs foo bar quux" > hw3-2.txt

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put hw3-2.txt /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=4 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/
wait

hdfs dfs -cat /user/jing/test-output1/part-* # print the output of all 4 reducers

hdfs dfs -rm -r /user/jing # remove hdfs folder
hdfs dfs -rm -r /user # remove hdfs folder

Overwriting mapreduce.sh


In [9]:
!./mapreduce.sh

16/02/01 16:22:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:22:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:22:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:22:30 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:22:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob8936948989538245615.jar tmpDir=null
16/02/01 16:22:31 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:22:31 INFO jvm.JvmMetrics: Initial

MapCounter
	
    Total=1

ReduceCounter
	
    Total=4

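This indicates that 1 mapper is being called within the Hadoop MR framework. The input is a single one-line file, which fits in one input split, and Hadoop launches one map task per split (the job also sets mapred.map.tasks=1). 4 reducers are being called because the job sets mapred.reduce.tasks=4. Each reduce task runs reducer.py once, and the script increments its counter once at the end of its run. The counter therefore reports the number of reduce tasks, not the number of distinct words.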
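The counter values can be reproduced with a small simulation. It assumes, as the observed Total=4 suggests, that every reduce task launches reducer.py once even if its partition holds no words. The crc32-based partitioner is a hypothetical stand-in for Hadoop's key hash. Which reducer gets which word will therefore differ from the real job, but the counter totals do not depend on it.

```python
import zlib
from collections import Counter


def run_job(text, num_reducers):
    """Simulate the user-defined counters of a one-map-task word count job."""
    counters = Counter()
    counters["MapCounter"] += 1  # one map task, since the tiny input fits in one split
    partitions = [Counter() for _ in range(num_reducers)]
    for word in text.split():
        # stand-in partitioner: crc32(key) mod R (not the hash Hadoop actually uses)
        partitions[zlib.crc32(word.encode("utf-8")) % num_reducers][word] += 1
    output = {}
    for part in partitions:
        counters["ReducerCounter"] += 1  # the script runs once per reduce task, even on empty input
        output.update(part)
    return counters, output


counters, output = run_job("foo foo quux labs foo bar quux", num_reducers=4)
print(sorted(counters.items()))  # [('MapCounter', 1), ('ReducerCounter', 4)]
print(sorted(output.items()))  # [('bar', 1), ('foo', 3), ('labs', 1), ('quux', 2)]
```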

In [59]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.2.2

import sys
import csv
import re

sys.stderr.write("reporter:counter:MapperCounter,Instances,1\n") # once per mapper instance

WORD_RE = re.compile(r"[\w']+") #Compile regex to easily parse complete words

reader = csv.reader(sys.stdin)
# read input from STDIN (standard input)
for line in reader:
    # extract the words from the Issue column (4th field)
    issue = line[3]
    words = WORD_RE.findall(issue) #create list of words
    for word in words:
        print '%s\t%s' % (word.lower(), 1)
    sys.stderr.write("reporter:counter:MapperCounter,Lines,1\n") # once per input line


Overwriting mapper.py


In [63]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.2.2

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%d' % (current_word, current_count)
        current_count = count
        current_word = word
    sys.stderr.write("reporter:counter:ReducerCounter,Lines,1\n")
        
# do not forget to output the last word, if any!
if current_word is not None:
    print '%s\t%d' % (current_word, current_count)

sys.stderr.write("reporter:counter:ReducerCounter,Instances,1\n")

Overwriting reducer.py


In [74]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.2.2

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put Consumer_Complaints.csv /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
# treat both tab-separated fields (word, count) as the key and sort numerically descending on the
# 2nd field, then alphabetically on the 1st; every mapper count is 1 here, so within each
# reducer this effectively sorts by word
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/
wait

hdfs dfs -cat /user/jing/test-output1/part-* # print the output of both reducers
wait

hdfs dfs -rm -r /user/jing # remove hdfs folder
hdfs dfs -rm -r /user # remove hdfs folder

Overwriting mapreduce.sh


In [75]:
!./mapreduce.sh

16/02/01 18:30:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 18:30:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 18:30:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 18:30:38 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 18:30:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob4520530247144752552.jar tmpDir=null
16/02/01 18:30:39 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 18:30:39 INFO jvm.JvmMetrics: Initial

There are now 2 mapper instances and 312913 mapper line calls, one for each line in the file. There are now 2 reducer instances and 8350086 reducer word calls, one for each word in the Issue column. The two reducer outputs are not totally sorted: each reducer's output is sorted only within itself, so concatenating part-00000 and part-00001 does not give a single sorted list. A globally sorted result needs either a single reducer or a total-order (range) partitioner.
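One way to get a total order with several reducers is range partitioning, which is what Hadoop's TotalOrderPartitioner does using a file of sorted split points. The plain-Python sketch below only illustrates the idea. Its split point "f" is hand-picked and hypothetical; in practice split points are usually chosen by sampling the keys.

```python
from bisect import bisect_right


def range_partition(key, split_points):
    """Send key to reducer i, where split_points[i-1] <= key < split_points[i].

    With R reducers there are R - 1 sorted split points, so every key on
    reducer i sorts before every key on reducer i + 1.
    """
    return bisect_right(split_points, key)


def total_order_sort(keys, split_points):
    partitions = [[] for _ in range(len(split_points) + 1)]
    for key in keys:
        partitions[range_partition(key, split_points)].append(key)
    # each reducer sorts its own partition; reading part-00000, part-00001, ...
    # in order then yields one globally sorted result
    return [sorted(p) for p in partitions]


words = ["loan", "account", "credit", "mortgage", "debt", "escrow", "payments", "the"]
parts = total_order_sort(words, split_points=["f"])  # hypothetical split point for 2 reducers
print(parts)  # [['account', 'credit', 'debt', 'escrow'], ['loan', 'mortgage', 'payments', 'the']]
```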

In [14]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.2.3

import sys
import csv
import re

sys.stderr.write("reporter:counter:MapperCounter,Instance,1\n")

WORD_RE = re.compile(r"[\w']+") #Compile regex to easily parse complete words

reader = csv.reader(sys.stdin)
# read input from STDIN (standard input)
for line in reader:
    # extract the words from the Issue column (4th field)
    issue = line[3]
    words = re.findall(WORD_RE,issue) #create list of words
    for word in words:
        print '%s\t%s' % (word.lower(), 1)
    sys.stderr.write("reporter:counter:MapperCounter,Line,1\n")


Overwriting mapper.py


In [15]:
%%writefile combiner.py
#!/usr/bin/python
## combiner.py
## Author: Jing Xu
## Description: combiner code for HW3.2.3

import sys

current_word = None
current_count = 0
word = None

sys.stderr.write("reporter:counter:CombinerCounter,Instance,1\n")

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
    sys.stderr.write("reporter:counter:CombinerCounter,Line,1\n")

# do not forget to output the last word, if any!
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)
    


Overwriting combiner.py


In [16]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.2.3

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
    sys.stderr.write("reporter:counter:ReducerCounter,Line,1\n")

# do not forget to output the last word, if any!
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)

sys.stderr.write("reporter:counter:ReducerCounter,Instance,1\n")


Overwriting reducer.py


In [17]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.2.3

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put Consumer_Complaints.csv /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-file combiner.py  -combiner combiner.py \
-input /user/jing/* -output /user/jing/test-output1/
wait

hdfs dfs -cat /user/jing/test-output1/part-* # print the output of both reducers
wait

hdfs dfs -rm -r /user/jing # remove hdfs folder
hdfs dfs -rm -r /user # remove hdfs folder

Overwriting mapreduce.sh


In [18]:
!chmod a+x combiner.py

In [19]:
!./mapreduce.sh

16/02/01 16:26:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:26:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:26:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:26:05 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:26:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob7066278641568308671.jar tmpDir=null
16/02/01 16:26:06 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:26:06 INFO jvm.JvmMetr

CombinerCounter

    Instance=4
    Line=1348309

MapperCounter

    Instance=2
    Line=312913
	
ReducerCounter

    Instance=2
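    Line=174

The combiners read 1348309 records in total, which matches the total number of words in the Issue column (the same total used for the relative frequencies in HW3.2.4). The two reducers together read only 174 records, because each combiner had already summed the counts per word within its map task's output. The combiner ran 4 times across only 2 map tasks: Hadoop decides how many times to apply a combiner, so its instance count is not fixed.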
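Summing is safe in a combiner because addition is associative and commutative. The final counts therefore do not depend on how many times, or over which subsets of records, the combiner runs. The small local sketch below uses two hypothetical map tasks. It shows the reduction in shuffled records and checks that the answer is unchanged.

```python
from collections import Counter


def combine(map_output):
    """Sum (word, count) records within one map task's output."""
    totals = Counter()
    for word, count in map_output:
        totals[word] += count
    return sorted(totals.items())


def reduce_all(records):
    """Final word counts from whatever records reach the reducers."""
    totals = Counter()
    for word, count in records:
        totals[word] += count
    return dict(totals)


# two hypothetical map tasks over Issue-column words
task1 = [(w, 1) for w in "loan modification loan foreclosure loan modification".split()]
task2 = [(w, 1) for w in "loan servicing escrow loan servicing escrow".split()]

without_combiner = task1 + task2
with_combiner = combine(task1) + combine(task2)
print("%d records shuffled without a combiner, %d with" % (len(without_combiner), len(with_combiner)))
# 12 records shuffled without a combiner, 6 with
```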

In [20]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.2.4

import sys
import csv
import re

WORD_RE = re.compile(r"[\w']+") #Compile regex to easily parse complete words

reader = csv.reader(sys.stdin)
# read input from STDIN (standard input)
for line in reader:
    # extract the words from the Issue column (4th field)
    issue = line[3]
    words = re.findall(WORD_RE,issue) #create list of words
    for word in words:
        print '%s\t%d' % ('*', 1)
        print '%s\t%d' % (word.lower(), 1)

Overwriting mapper.py
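The mapper above emits a ('*', 1) record for every word so that the reducer can obtain the total word count for relative frequencies, which doubles the mapper output. A possible alternative emits a single ('*', n) record per line, where n is that line's word count. The sketch below uses hypothetical function names and sample Issue strings. The totals come out the same with fewer records.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")  # same word pattern as the HW3.2 mappers


def map_per_word(issue):
    """Like the HW3.2.4 mapper: a ('*', 1) record and a (word, 1) record for every word."""
    for word in WORD_RE.findall(issue):
        yield "*", 1
        yield word.lower(), 1


def map_per_line(issue):
    """Hypothetical alternative: (word, 1) per word plus one ('*', n) record per line."""
    words = WORD_RE.findall(issue)
    for word in words:
        yield word.lower(), 1
    if words:
        yield "*", len(words)


def totals(records):
    """Sum counts per key, as the first job's reducer does."""
    result = Counter()
    for key, count in records:
        result[key] += count
    return result


issues = ["Loan modification,collection,foreclosure", "Incorrect information on credit report"]
a = totals(r for line in issues for r in map_per_word(line))
b = totals(r for line in issues for r in map_per_line(line))
print("%s %d" % (a == b, a["*"]))  # True 9
```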


In [21]:
%%writefile mapper2.py
#!/usr/bin/python
## mapper2.py
## Author: Jing Xu
## Description: mapper2 code for HW3.2.4

import sys

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into its word and count fields
    line = line.strip('\n')
    word, count = line.split('\t', 1)
    count = int(count)
    print '%s\t%d' % (word, count)

Overwriting mapper2.py


In [22]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.2.4

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%d' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word, if any!
if current_word is not None:
    print '%s\t%d' % (current_word, current_count)

Overwriting reducer.py


In [23]:
%%writefile reducer2.py
#!/usr/bin/python
## reducer2.py
## Author: Jing Xu
## Description: reducer2 code for HW3.2.4
## Expects its input sorted ascending by count, then by word, so the first
## records are the least frequent words and the last ones the most frequent.

import sys
from collections import deque

n_max, n_min = 50, 10 #want top 50 largest numbers and 10 smallest numbers
q_max = deque(maxlen=n_max) #keeps only the last n_max records seen, i.e. the largest counts
a_min = [] #list for smallest values
total_words = 0 #total words used to calculate relative frequency

for line in sys.stdin:
    word, count = line.strip('\n').split('\t', 1) #split line into word and count
    count = int(count) #int() ignores surrounding whitespace such as a trailing tab
    if word == '*': #if key is *, its count is the total number of words
        total_words = count
        continue
    # the first n_min records have the smallest counts
    if len(a_min) < n_min:
        a_min.append([word, count])
    # a full deque drops its oldest record, so the largest counts remain
    q_max.append([word, count])

print '\n%d smallest records:' %n_min
for record in a_min:
    freq = float(record[1])/float(total_words) #calculate relative frequency
    record.append(freq) #append relative frequency to output
    print record

print '\n%d biggest records:' %n_max
for record in q_max:
    freq = float(record[1])/float(total_words) #calculate relative frequency
    record.append(freq) #append relative frequency to output
    print record

Overwriting reducer2.py
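reducer2.py depends on its input arriving sorted by ascending count, and it keeps the first 10 and last 50 records. If that ordering were not guaranteed, the top and bottom lists could be selected directly instead. The sketch below does so with `heapq.nsmallest`, breaking ties alphabetically as the assignment asks. The function name and sample counts are hypothetical. The total is computed from the counts rather than read from a '*' record.

```python
import heapq


def top_and_bottom(counts, n_top=50, n_bottom=10):
    """Return the n_top most and n_bottom least frequent words as (word, count, relative_frequency).

    Ties are broken alphabetically in both lists, and the input does not need to be sorted.
    """
    total = float(sum(counts.values()))  # plays the role of the '*' total record

    def with_freq(rows):
        return [(word, count, count / total) for word, count in rows]

    items = list(counts.items())
    top = heapq.nsmallest(n_top, items, key=lambda kv: (-kv[1], kv[0]))
    bottom = heapq.nsmallest(n_bottom, items, key=lambda kv: (kv[1], kv[0]))
    return with_freq(top), with_freq(bottom)


counts = {"loan": 6, "collection": 3, "credit": 3, "account": 2, "day": 1, "amt": 1}  # hypothetical
top, bottom = top_and_bottom(counts, n_top=3, n_bottom=2)
print(top)     # [('loan', 6, 0.375), ('collection', 3, 0.1875), ('credit', 3, 0.1875)]
print(bottom)  # [('amt', 1, 0.0625), ('day', 1, 0.0625)]
```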


In [24]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.2.4

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put Consumer_Complaints.csv /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/

wait

# second hadoop command to run streaming mapreduce job
# sort ascending numerically on the 2nd field (count), then alphabetically on the 1st field (word);
# reducer2.py relies on this ascending order
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2n -k1" \
-file mapper2.py    -mapper mapper2.py \
-file reducer2.py   -reducer reducer2.py \
-input /user/jing/test-output1/part-00000 -output /user/jing/test-output2/

wait

hdfs dfs -cat /user/jing/test-output2/part-00000 #examine hdfs file output
wait

hdfs dfs -rm -r /user/jing # remove hdfs folder
hdfs dfs -rm -r /user # remove hdfs folder

Overwriting mapreduce.sh
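The second job asks KeyFieldBasedComparator to order the `word<TAB>count` records with the options `-k2n -k1`. The sketch below is a rough Python equivalent of that ordering. It assumes counts are plain integers and that ties fall back to the word. It shows what reducer2.py receives: the least frequent words first, and the '*' total record, which has the largest count, last. The sample lines reuse counts from the results below plus the 1348309 total.

```python
def keyfield_order(lines):
    """Approximate the -k2n -k1 ordering of 'word<TAB>count' lines:
    ascending numeric count, then ascending word."""
    def sort_key(line):
        word, count = line.rstrip("\n").split("\t", 1)
        return (int(count), word)  # int() also tolerates a trailing tab
    return sorted(lines, key=sort_key)


lines = ["loan\t119630", "*\t1348309", "day\t71", "amt\t71", "apply\t118"]
for line in keyfield_order(lines):
    print(line)
```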


In [25]:
!./mapreduce.sh

16/02/01 16:30:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:30:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:02 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:31:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob7591024917523853133.jar tmpDir=null
16/02/01 16:31:03 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:31:03 INFO jvm.JvmMetrics: Initial

**10 smallest records:**

['disclosures', 64, 4.746686404970967e-05]	
['missing', 64, 4.746686404970967e-05]	
['amt', 71, 5.265855230514667e-05]	
['day', 71, 5.265855230514667e-05]	
['checks', 75, 5.5625231308253527e-05]	
['convenience', 75, 5.5625231308253527e-05]	
['credited', 92, 6.823361707145766e-05]	
['payment', 92, 6.823361707145766e-05]	
['amount', 98, 7.268363557611794e-05]	
['apply', 118, 8.751703059165221e-05]	

**50 biggest records:**

['being', 5663, 0.004200075798648529]	
['by', 5663, 0.004200075798648529]	
['caused', 5663, 0.004200075798648529]	
['funds', 5663, 0.004200075798648529]	
['low', 5663, 0.004200075798648529]	
['the', 6248, 0.004633952602852907]	
['lease', 6337, 0.004699961210672034]	
['reporting', 6559, 0.004864611895344465]	
['disputes', 6938, 0.005145704730888839]	
['disclosure', 7655, 0.005677481942195743]	
['verification', 7655, 0.005677481942195743]	
['other', 7886, 0.005848807654625164]	
['billing', 8158, 0.00605054182683643]	
['unable', 8178, 0.006065375221851964]	
['to', 8401, 0.006230767576275172]	
['broker', 8625, 0.006396901600449155]	
['mortgage', 8625, 0.006396901600449155]	
['originator', 8625, 0.006396901600449155]	
['communication', 8671, 0.006431018408984884]	
['tactics', 8671, 0.006431018408984884]	
['application', 8868, 0.006577127349887897]	
['problems', 9484, 0.007033995916366352]	
['deposits', 10555, 0.007828324219448212]	
['withdrawals', 10555, 0.007828324219448212]	
['my', 10731, 0.007958858095584914]	
['of', 13983, 0.010370768125110787]	
['management', 16205, 0.012018758311336645]	
['opening', 16205, 0.012018758311336645]	
['and', 16448, 0.012198984060775386]	
['attempts', 17972, 0.013329288760959098]	
['collect', 17972, 0.013329288760959098]	
["cont'd", 17972, 0.013329288760959098]	
['owed', 17972, 0.013329288760959098]	
['not', 18477, 0.013703831985101339]	
['closing', 19000, 0.014091725264757559]	
['debt', 27874, 0.020673302633150117]	
['information', 29069, 0.021559597985328288]	
['on', 29069, 0.021559597985328288]	
['incorrect', 29133, 0.021607064849377997]	
['report', 34903, 0.025886499311359636]	
['escrow', 36767, 0.02726897172680743]	
['servicing', 36767, 0.02726897172680743]	
['payments', 39993, 0.02966159834281311]	
['or', 40508, 0.030043558264463116]	
['credit', 55251, 0.040977995400164204]	
['account', 57448, 0.04260744384262065]	
['foreclosure', 70487, 0.05227807572299822]	
['modification', 70487, 0.05227807572299822]	
['collection', 72394, 0.05369243993772941]	
['loan', 119630, 0.08872595228541826]	

**HW3.3. Shopping Cart Analysis
Product Recommendations: The practice of selling additional products or services
to existing customers is called cross-selling. Product recommendation is
one form of cross-selling frequently used by online retailers.
One simple way to give product recommendations is to recommend products that are frequently
browsed together by customers.**

**Do some exploratory data analysis of this dataset. How many unique items are available from this supplier? Using a single reducer, report your findings, such as the number of unique products, the largest basket, and the top 50 most frequently purchased items with their frequency and relative frequency (break ties by sorting the products in alphabetical order), using Hadoop MapReduce.**

In [26]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.3.1

import sys

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into words
    words = line.split()
    for word in words:
        print '%s\t%d' % (word, 1)

Overwriting mapper.py


In [27]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.3.1

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%d' % (current_word, current_count)
            sys.stderr.write("reporter:counter:UniqueWordsCounter,Total,1\n")
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%d' % (current_word, current_count)
    sys.stderr.write("reporter:counter:UniqueWordsCounter,Total,1\n")

Overwriting reducer.py


In [28]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.3.1

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put ProductPurchaseData.txt /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/

wait

hdfs dfs -rmr /user/jing # remove hdfs folder
hdfs dfs -rmr /user # remove hdfs folder

Overwriting mapreduce.sh


In [29]:
!./mapreduce.sh

16/02/01 16:31:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:40 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:31:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob746043006379144654.jar tmpDir=null
16/02/01 16:31:40 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:31:40 INFO jvm.JvmMetrics: Initiali

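**UniqueWordsCounter** (incremented once per distinct item the reducer writes, so Total is the number of unique products)
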
    Total=12592
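
You can check the word-count logic without a cluster by mimicking the map, shuffle/sort, and reduce flow of mapper.py and reducer.py in plain Python. This is a local sketch in Python 3, not the Python 2 cells above. `sorted()` stands in for Hadoop's shuffle sort, and `itertools.groupby` plays the role of the reducer's `current_word` bookkeeping. The function names and toy baskets are made up for illustration.

```python
from itertools import groupby


def map_items(lines):
    """Mimic mapper.py: emit (item, 1) for every item token."""
    for line in lines:
        for item in line.split():
            yield item, 1


def reduce_counts(pairs):
    """Mimic reducer.py: sum the counts of each key after the shuffle sort."""
    # Hadoop sorts map output by key before the reducer sees it;
    # sorted() stands in for that step here.
    for item, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield item, sum(count for _, count in group)


def count_unique_items(lines):
    """Number of distinct keys the reducer emits (the UniqueWordsCounter total)."""
    return sum(1 for _ in reduce_counts(map_items(lines)))


if __name__ == "__main__":
    sample = ["A B C", "A D", "B A"]  # hypothetical toy baskets
    print(dict(reduce_counts(map_items(sample))))  # {'A': 3, 'B': 2, 'C': 1, 'D': 1}
    print(count_unique_items(sample))  # 4
```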

In [30]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.3.2

import sys

max_basket_count = 0
max_basket_items = []

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into words
    basket_items = []
    basket_count = 0
    words = line.split()
    for word in words:
        basket_items.append(word)
        basket_count+=1
    if basket_count >= max_basket_count: 
        max_basket_count = basket_count
        max_basket_items = basket_items

print '%s\t%d\t%s' % ('max_count', max_basket_count, max_basket_items)

Overwriting mapper.py


In [31]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
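## Description: reducer code for HW3.3.2

import sys

# identity reducer: pass each mapper's max_count record through unchanged
# (this is only correct when a single mapper produces a single record)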
for line in sys.stdin:
    line = line.strip('\n')
    print line

Overwriting reducer.py


In [32]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.3.2

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put ProductPurchaseData.txt /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/

wait

hadoop dfs -cat /user/jing/test-output1/part-00000 #examine hdfs file output
wait

hdfs dfs -rmr /user/jing # remove hdfs folder
hdfs dfs -rmr /user # remove hdfs folder

Overwriting mapreduce.sh


In [33]:
!./mapreduce.sh

16/02/01 16:31:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:31:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:01 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:32:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob2458430377133595769.jar tmpDir=null
16/02/01 16:32:02 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:32:02 INFO jvm.JvmMetrics: Initial

max_count: 37	
basket: ['GRO21487', 'FRO85978', 'DAI89320', 'SNA53220', 'SNA55762', 'GRO46854', 'ELE38511', 'SNA66583', 'FRO79579', 'FRO92469', 'FRO40251', 'GRO97448', 'DAI35347', 'FRO31317', 'FRO87622', 'SNA42518', 'ELE53126', 'ELE17451', 'GRO32086', 'ELE30327', 'DAI58206', 'DAI38969', 'ELE16038', 'DAI75645', 'DAI55148', 'GRO94173', 'ELE43952', 'FRO69613', 'GRO81647', 'GRO73461', 'FRO24098', 'ELE96667', 'GRO88324', 'GRO82670', 'GRO12815', 'SNA37475', 'ELE24369']
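
The identity reducer above works only because the job was configured with a single mapper (`mapred.map.tasks=1`), so exactly one `max_count` record reaches it. That setting is only a hint, and the real number of map tasks follows the input splits. With several map tasks, each would print its own local maximum, and the reducer would pass all of them through.

Below is a minimal Python 3 sketch of reducer logic that keeps only the global maximum. It assumes the tab-separated `max_count<TAB>count<TAB>list` format that mapper.py prints; the function name is hypothetical.

```python
import ast


def pick_largest_basket(lines):
    """Keep the single largest basket among 'max_count\t<n>\t<items>' records.

    Each record is assumed to be one mapper's local maximum in the format
    mapper.py prints, where <items> is a Python list literal.
    """
    best_count, best_items = -1, []
    for line in lines:
        _key, count, items = line.rstrip("\n").split("\t", 2)
        if int(count) > best_count:  # strict '>' keeps the first basket on ties
            best_count, best_items = int(count), ast.literal_eval(items)
    return best_count, best_items


if __name__ == "__main__":
    # Hypothetical outputs from two mappers.
    records = ["max_count\t2\t['A', 'B']", "max_count\t3\t['C', 'D', 'E']"]
    print(pick_largest_basket(records))  # (3, ['C', 'D', 'E'])
```

Note that mapper.py uses `>=`, so within one mapper it keeps the last of several equally large baskets. The sketch keeps the first one it sees.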

In [34]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.3.3

import sys

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into words
    words = line.split()
    for word in words:
        print '%s\t%d' % ('*', 1)
        print '%s\t%d' % (word, 1)

Overwriting mapper.py
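
mapper.py writes a `*<TAB>1` record for every item token, so the shuffle carries twice as many records as there are purchases. The reducer only sums counts, so in-mapper combining produces the same totals with fewer records: accumulate counts in a dictionary and emit them once at the end of the input split.

This is a Python 3 sketch of that variation, not what was run above. It assumes the distinct items of one split fit in memory; the dataset has 12,592 distinct items. The function name is hypothetical.

```python
from collections import Counter


def in_mapper_combine(lines):
    """Yield (key, count) records for one input split, aggregated in memory.

    Emits one '*' record holding the split's total item count, then one
    record per distinct item, instead of two records per item token.
    """
    counts = Counter()
    total = 0
    for line in lines:
        items = line.split()
        counts.update(items)  # Counter.update adds to the existing counts
        total += len(items)
    yield "*", total
    for item in sorted(counts):
        yield item, counts[item]


if __name__ == "__main__":
    for key, count in in_mapper_combine(["A B", "A"]):
        print("%s\t%d" % (key, count))  # same tab-separated format as mapper.py
```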


In [35]:
%%writefile mapper2.py
#!/usr/bin/python
## mapper2.py
## Author: Jing Xu
## Description: mapper2 code for HW3.3.3

import sys

# read input from STDIN (standard input)
for line in sys.stdin:
    # parse the tab-separated word and count records written by the first job
    line = line.strip('\n')
    word, count = line.split('\t', 1)
    count = int(count)
    print '%s\t%d' % (word, count)

Overwriting mapper2.py


In [36]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.3.3

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%d' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%d' % (current_word, current_count)

Overwriting reducer.py


In [37]:
%%writefile reducer2.py
#!/usr/bin/python
## reducer2.py
## Author: Jing Xu
## Description: reducer2 code for HW3.3.3

import sys, Queue

n_max = 50 #want the top 50 largest counts
q_max = Queue.Queue(n_max) #bounded queue that holds the last n_max records seen
total_words = 0 #total item count, used to calculate relative frequency

for line in sys.stdin:
    line = line.strip('\n') #clean line
    rec = line.split('\t', 1) #split line
    if rec[0] == '*': #if key is *, indicates it is the total count value
        rec[1] = rec[1].strip('\t')
        total_words = int(rec[1]) #assign count as total_words
    else:
        rec[1] = rec[1].strip('\t')
        rec[1] = int(rec[1])
        # input arrives sorted by ascending count, so the records left in the queue at the end are the biggest
        if q_max.full():
            q_max.get()
        q_max.put([rec[0], rec[1]])

print '\n%d biggest records:' %n_max
while not q_max.empty(): #avoids blocking forever if fewer than n_max records were read
    value = q_max.get() 
    freq = float(value[1])/float(total_words) #calculate relative frequency
    value.append(freq) #append relative frequency to output 
    print value

Overwriting reducer2.py


In [38]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.3.3

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put ProductPurchaseData.txt /user/jing # upload local file to hdfs folder
wait

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming1" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/

wait

# second hadoop command to run streaming mapreduce job
# sort numerically ascending on the 2nd key field (count), then lexicographically on the 1st (item)
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming2" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2n -k1" \
-file mapper2.py    -mapper mapper2.py \
-file reducer2.py   -reducer reducer2.py \
-input /user/jing/test-output1/part-00000 -output /user/jing/test-output2/

wait

hadoop dfs -cat /user/jing/test-output2/part-00000 #examine hdfs file output
wait

hdfs dfs -rmr /user/jing # remove hdfs folder
hdfs dfs -rmr /user # remove hdfs folder

Overwriting mapreduce.sh


In [39]:
!./mapreduce.sh

16/02/01 16:32:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:22 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:32:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob4443816464631775356.jar tmpDir=null
16/02/01 16:32:23 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:32:23 INFO jvm.JvmMetrics: Initial

**50 biggest records:**

['GRO85051', 1214, 0.0031878242967880175]	
['DAI22896', 1219, 0.0032009537214041134]	
['GRO81087', 1220, 0.0032035796063273323]	
['DAI31081', 1261, 0.0033112408881793166]	
['GRO15017', 1275, 0.003348003277104384]	
['ELE91337', 1289, 0.0033847656660294517]	
['DAI43223', 1290, 0.003387391550952671]	
['SNA96271', 1295, 0.0034005209755687666]	
['ELE59935', 1311, 0.003442535134340273]	
['DAI88807', 1316, 0.0034556645589563684]	
['ELE74482', 1316, 0.0034556645589563684]	
['GRO61133', 1321, 0.003468793983572464]	
['ELE56788', 1345, 0.003531815221729723]	
['GRO38814', 1352, 0.0035501964161922567]	
['SNA90094', 1390, 0.0036499800432745837]	
['SNA93860', 1407, 0.0036946200869693085]	
['FRO53271', 1420, 0.003728756590971157]	
['FRO35904', 1436, 0.0037707707497426635]	
['ELE34057', 1489, 0.003909942650673277]	
['GRO94758', 1489, 0.003909942650673277]	
['ELE99737', 1516, 0.003980841543600193]	
['FRO78087', 1531, 0.00402022981744848]	
['DAI22177', 1627, 0.004272314770077516]	
['SNA55762', 1646, 0.00432220658361868]	
['ELE66810', 1697, 0.0044561267147028545]	
['FRO32293', 1702, 0.004469256139318951]	
['DAI83733', 1712, 0.004495514988551142]	
['ELE66600', 1713, 0.004498140873474361]	
['GRO46854', 1756, 0.004611053925172783]	
['DAI63921', 1773, 0.004655693968867509]	
['GRO56726', 1784, 0.0046845787030229185]	
['ELE74009', 1816, 0.004768607020565931]	
['GRO30386', 1840, 0.00483162825872319]	
['FRO85978', 1918, 0.005036447282734282]	
['GRO71621', 1920, 0.0050416990525807195]	
['GRO59710', 2004, 0.005262273386131126]	
['SNA99873', 2083, 0.005469718295065437]	
['GRO21487', 2115, 0.005553746612608449]	
['FRO80039', 2233, 0.005863601033548306]	
['ELE26917', 2292, 0.006018528244018234]	
['DAI85309', 2293, 0.006021154128941453]	
['FRO31317', 2330, 0.006118311871100561]	
['SNA45677', 2455, 0.006446547486502951]	
['DAI75645', 2736, 0.007184421149927526]	
['ELE32164', 2851, 0.007486397916097725]	
['SNA80324', 3044, 0.007993193706279015]	
['GRO73461', 3602, 0.009458437493435288]	
['ELE17451', 3875, 0.010175304077474108]	
['FRO40251', 3881, 0.010191059387013424]	
['DAI62779', 6667, 0.017506774783101905]
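
reducer2.py keeps the top 50 by pushing every record through a `Queue.Queue(50)` and dropping the oldest entry when the queue is full. This relies on the second job sorting counts in ascending order, which is also why the list above runs from smallest to largest count. `Queue.Queue` is a thread-synchronized queue, so `collections.deque(maxlen=50)` would be a lighter-weight equivalent.

A sort-independent alternative is sketched below in Python 3, with hypothetical function names. `heapq.nsmallest` with the key `(-count, item)` returns the highest counts first and breaks ties alphabetically, as the assignment asks.

```python
import heapq


def top_n_items(records, n=50):
    """Return the n (item, count) records with the highest counts.

    Ordering on (-count, item) puts higher counts first and breaks ties
    alphabetically by item.
    """
    return heapq.nsmallest(n, records, key=lambda rec: (-rec[1], rec[0]))


def add_relative_frequency(records, total):
    """Append count / total to each (item, count) record."""
    return [(item, count, count / total) for item, count in records]


if __name__ == "__main__":
    # Counts taken from the output above; the two 1316 records are tied.
    sample = [("GRO85051", 1214), ("ELE74482", 1316),
              ("DAI88807", 1316), ("DAI62779", 6667)]
    print(top_n_items(sample, n=2))  # [('DAI62779', 6667), ('DAI88807', 1316)]
```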

**HW3.4. (Computationally prohibitive but then again Hadoop can handle this) Pairs**

**Suppose we want to recommend new products to the customer based on the products they
have already browsed on the online website. Write a map-reduce program
to find products which are frequently browsed together. Fix the support count (co-occurrence count) to s = 100
(i.e. product pairs need to occur together at least 100 times to be considered frequent)
and find pairs of items (sometimes referred to as itemsets of size 2 in association rule mining) that have a support count of 100 or more.**

**List the top 50 product pairs with their corresponding support count (aka frequency, the number of records in which they co-occur) and relative frequency or support (the number of records in which they co-occur divided by the number of baskets in the dataset), in decreasing order of support, for frequent (count >= 100) itemsets of size 2.**

**Use the Pairs pattern (lecture 3) to extract these frequent itemsets of size 2. Feel free to use combiners if they bring value. Instrument your code with counters to count the number of times your mapper, combiner, and reducers are called.**

**Please output records of the following form for the top 50 pairs (itemsets of size 2):** 

**item1, item2, support count, support**

**Fix the ordering of the pairs lexicographically (left to right), 
and break ties in support (between pairs, if any exist) 
by taking the first ones in lexicographically increasing order.** 

**Report the compute time for the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)
Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.**

In [40]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.4.1

import sys
from itertools import combinations

sys.stderr.write("reporter:counter:Mapper1Counter,Instance,1\n")

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into words
    items = line.split()
    items = set(items) #create unique set of items
    items = list(items) #convert to list
    pairs = [",".join(map(str,comb)) for comb in combinations(items, 2)] #list of every unique pair from items list
    for pair in pairs:
        pair = pair.split(',')
        pair = sorted(pair, key=str.lower) #sort pair alphabetically for future matching
        pair = pair[0]+','+pair[1]
        print '%s\t%d' % (pair, 1)
        sys.stderr.write("reporter:counter:Mapper1Counter,Pairs,1\n")
    print '%s\t%d' % ('*,*', 1)

Overwriting mapper.py
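
The mapper above joins, splits, and re-sorts every pair. A simpler route is to sort the de-duplicated basket once before calling `itertools.combinations`. Because `combinations` preserves input order, this yields the same lexicographically ordered pairs directly. (The mapper sorts with `key=str.lower`; for the all-uppercase product codes here that gives the same order as plain `sorted`.)

Below is a local Python 3 sketch of the whole Pairs computation: a mapper plus a reducer that applies the support threshold. The function names and toy baskets are hypothetical.

```python
from collections import Counter
from itertools import combinations


def pairs_map(basket_line):
    """Pairs-pattern mapper: emit ((a, b), 1) for each co-occurring pair, a < b."""
    items = sorted(set(basket_line.split()))  # dedupe, then order lexicographically
    for a, b in combinations(items, 2):
        yield (a, b), 1


def pairs_reduce(mapped, support=100):
    """Sum the counts per pair and keep pairs whose total reaches the support."""
    totals = Counter()
    for pair, count in mapped:
        totals[pair] += count
    return {pair: total for pair, total in totals.items() if total >= support}


if __name__ == "__main__":
    baskets = ["A B C", "B A", "C A B D"]  # toy data
    mapped = [kv for line in baskets for kv in pairs_map(line)]
    print(pairs_reduce(mapped, support=2))  # {('A', 'B'): 3, ('A', 'C'): 2, ('B', 'C'): 2}
```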


In [41]:
%%writefile mapper2.py
#!/usr/bin/python
## mapper2.py
## Author: Jing Xu
## Description: mapper2 code for HW3.4.1

import sys

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into words
    line = line.strip('\n')
    pair, count = line.split('\t', 1)
    count = int(count)
    print '%s\t%d' % (pair, count)

Overwriting mapper2.py


In [42]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.4.1

import sys

current_pair = None
current_count = 0
pair = None # defined up front so the final check below is safe on empty input

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    pair, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if current_pair == pair:
        current_count += count
    else:
        if current_pair and current_count>=100: #filter out all pairs that have co-occurrence < 100
            # write result to STDOUT
            print '%s\t%d' % (current_pair, current_count)
            sys.stderr.write("reporter:counter:Reducer1Counter,Pairs,1\n")
        current_count = count
        current_pair = pair
    
# do not forget to output the last pair if needed!
if current_pair == pair and current_count>=100:
    print '%s\t%d' % (current_pair, current_count)
    sys.stderr.write("reporter:counter:Reducer1Counter,Pairs,1\n")

sys.stderr.write("reporter:counter:Reducer1Counter,Instances,1\n")

Overwriting reducer.py
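
The job above does not register a combiner (there is no `-combiner` option), so it reports no combiner counts. reducer.py cannot simply be reused as one, because it discards keys whose count is below 100. A combiner sees only part of the data (one spill of one map task), and Hadoop may run it zero, one, or several times, so it must only compute partial sums.

Below is a Python 3 sketch of such a combiner's logic, using hypothetical counter names under `Combiner1Counter`. It also writes each counter once per task rather than once per record, as mapper.py does for `Pairs`. With about 2.5 million pairs, per-record counter lines can add noticeable overhead.

```python
import sys


def combine(sorted_lines):
    """Combiner for 'item1,item2<TAB>count' records that arrive sorted by key.

    Sums partial counts per pair but applies NO support threshold: a combiner
    only sees part of the data, so filtering here could drop pairs whose
    overall count reaches 100.
    """
    current, total, emitted = None, 0, 0
    for line in sorted_lines:
        key, count = line.rstrip("\n").split("\t", 1)
        if key == current:
            total += int(count)
            continue
        if current is not None:
            yield "%s\t%d" % (current, total)
            emitted += 1
        current, total = key, int(count)
    if current is not None:
        yield "%s\t%d" % (current, total)
        emitted += 1
    # Hadoop Streaming counters: report once per task, not once per record.
    sys.stderr.write("reporter:counter:Combiner1Counter,Instances,1\n")
    sys.stderr.write("reporter:counter:Combiner1Counter,Pairs,%d\n" % emitted)


if __name__ == "__main__":
    for out in combine(["A,B\t1\n", "A,B\t1\n", "A,C\t1\n"]):
        print(out)
```

Wrap this in a script that prints each yielded line for `sys.stdin`. It could then be added to the first job with `-file combiner.py -combiner combiner.py`, where the file name is an assumption.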


In [43]:
%%writefile reducer2.py
#!/usr/bin/python
## reducer2.py
## Author: Jing Xu
## Description: reducer2 code for HW3.4.1

import sys, Queue

n_max = 50 #want the top 50 pairs by count
q_max = Queue.Queue(n_max) #bounded queue that holds the last n_max records seen
total_buckets = 0 #total number of baskets, used to calculate relative frequency (support)

for line in sys.stdin:
    line = line.strip('\n') #clean line
    rec = line.split('\t', 1) #split line
    if rec[0] == '*,*': #if key is *,*, indicates it is the total buckets value
        rec[1] = rec[1].strip('\t')
        total_buckets = int(rec[1]) #assign count as total_buckets
    else:
        first, second = rec[0].split(',')
        rec[1] = rec[1].strip('\t')
        rec[1] = int(rec[1])
        # input arrives sorted by ascending count, so the records left in the queue at the end are the biggest
        if q_max.full():
            q_max.get()
        q_max.put([first, second, rec[1]])

top_list = []
        
while not q_max.empty(): #avoids blocking forever if fewer than n_max pairs were read
    value = q_max.get() 
    freq = float(value[2])/float(total_buckets) #calculate relative frequency
    value.append(freq) #append relative frequency to output 
    top_list.append(value)

top_list.reverse() #reverse list so highest value is first
    
print '\nTop %d product pairs:' %n_max
for value in top_list:
    print value

Overwriting reducer2.py


In [44]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.4.1

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put ProductPurchaseData.txt /user/jing # upload local file to hdfs folder
wait

STARTTIME=$(date +%s)

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming1" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/

wait

# second hadoop command to run streaming mapreduce job
# sort numerically ascending on the 2nd key field (count), then lexicographically on the 1st (pair)
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming2" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2n -k1" \
-file mapper2.py    -mapper mapper2.py \
-file reducer2.py   -reducer reducer2.py \
-input /user/jing/test-output1/part-00000 -output /user/jing/test-output2/

wait

#calculate compute time for Pairs job
ENDTIME=$(date +%s)
echo "It takes $(($ENDTIME - $STARTTIME)) seconds to complete this task"
wait

hadoop dfs -cat /user/jing/test-output2/part-00000 #examine hdfs file output
wait

hdfs dfs -rmr /user/jing # remove hdfs folder
hdfs dfs -rmr /user # remove hdfs folder

Overwriting mapreduce.sh


In [45]:
!./mapreduce.sh

16/02/01 16:32:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:32:57 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:32:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob8771942968046436571.jar tmpDir=null
16/02/01 16:32:58 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:32:58 INFO jvm.JvmMetrics: Initial

**Single computer dual core, 1 mapper, 1 reducer**

**It takes 42 seconds to complete this task**

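**Mapper1Counter**

    Instance=1
    Pairs=2534014

**Reducer1Counter**

    Instances=1
    Pairs=1335

The reducer's Pairs count includes the `*,*` basket-total record, so 1,334 product pairs meet the support threshold of 100.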

**Top 50 product pairs:**

['DAI62779', 'ELE17451', 1592, 0.05118806469245362]	
['FRO40251', 'SNA80324', 1412, 0.04540046943828173]	
['DAI75645', 'FRO40251', 1254, 0.04032024693739751]	
['FRO40251', 'GRO85051', 1213, 0.039001961351725026]	
['DAI62779', 'GRO73461', 1139, 0.03662261663612103]	
['DAI75645', 'SNA80324', 1130, 0.03633323687341243]	
['DAI62779', 'FRO40251', 1070, 0.03440403845535513]	
['DAI62779', 'SNA80324', 923, 0.029677502331114755]	
['DAI62779', 'DAI85309', 918, 0.029516735796276648]	
['ELE32164', 'GRO59710', 911, 0.029291662647503297]	
['FRO40251', 'GRO73461', 882, 0.02835921674544227]	
['DAI62779', 'DAI75645', 882, 0.02835921674544227]	
['DAI62779', 'ELE92920', 877, 0.02819845021060416]	
['FRO40251', 'FRO92469', 835, 0.026848011317964052]	
['DAI62779', 'ELE32164', 832, 0.026751551397061188]	
['DAI75645', 'GRO73461', 712, 0.022893154560946594]	
['DAI43223', 'ELE32164', 711, 0.022861001253978972]	
['DAI62779', 'GRO30386', 709, 0.02279669464004373]	
['ELE17451', 'FRO40251', 697, 0.022410854956432268]	
['DAI85309', 'ELE99737', 659, 0.021189029291662647]	
['DAI62779', 'ELE26917', 650, 0.020899649528954053]	
['GRO21487', 'GRO73461', 631, 0.02028873669656924]	
['DAI62779', 'SNA45677', 604, 0.019420597408443457]	
['ELE17451', 'SNA80324', 597, 0.019195524259670107]	
['DAI62779', 'GRO71621', 595, 0.019131217645734864]	
['DAI62779', 'SNA55762', 593, 0.01906691103179962]	
['DAI62779', 'DAI83733', 586, 0.01884183788302627]	
['ELE17451', 'GRO73461', 580, 0.018648918041220538]	
['GRO73461', 'SNA80324', 562, 0.01807015851580335]	
['DAI62779', 'GRO59710', 561, 0.01803800520883573]	
['DAI62779', 'FRO80039', 550, 0.01768431883219189]	
['DAI75645', 'ELE17451', 547, 0.017587858911289025]	
['DAI62779', 'SNA93860', 537, 0.01726632584161281]	
['DAI55148', 'DAI62779', 526, 0.016912639464968973]	
['DAI43223', 'GRO59710', 512, 0.01646249316742227]	
['ELE17451', 'ELE32164', 511, 0.016430339860454647]	
['DAI62779', 'SNA18336', 506, 0.01626957332561654]	
['ELE32164', 'GRO73461', 486, 0.015626507186264106]	
['DAI85309', 'ELE17451', 482, 0.01549789395839362]	
['DAI62779', 'FRO78087', 482, 0.01549789395839362]	
['DAI62779', 'GRO94758', 479, 0.015401434037490756]	
['GRO85051', 'SNA80324', 471, 0.015144207581749784]	
['DAI62779', 'GRO21487', 471, 0.015144207581749784]	
['ELE17451', 'GRO30386', 468, 0.015047747660846917]	
['FRO85978', 'SNA95666', 463, 0.01488698112600881]	
['DAI62779', 'FRO19221', 462, 0.014854827819041188]	
['DAI62779', 'GRO46854', 461, 0.014822674512073567]	
['DAI43223', 'DAI62779', 459, 0.014758367898138324]	
['ELE92920', 'SNA18336', 455, 0.014629754670267838]	
['DAI88079', 'FRO40251', 446, 0.014340374907559243]
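
In the list above, tied pairs appear in reverse lexicographic order; see the two pairs with count 882. If a tie straddled the 50th position, the lexicographically later pairs would be kept. Both effects come from sorting ascending and keeping the last 50 records, whereas the assignment asks for the opposite tie-break.

Below is a minimal Python 3 sketch of the required ordering, with a hypothetical function name. The support values above imply 31,101 baskets (1592 / 31101 ≈ 0.0512). On the Hadoop side, comparator options such as `-k2,2nr -k1,1` (descending numeric count, then ascending pair) should give the same order, provided the reducer keeps the first 50 records instead of the last 50.

```python
def top_pairs(records, n_baskets, n=50):
    """Rank (item1, item2, count) records and append support = count / n_baskets.

    Higher counts come first; ties are broken by (item1, item2) in
    lexicographically increasing order.
    """
    ranked = sorted(records, key=lambda r: (-r[2], r[0], r[1]))[:n]
    return [(a, b, count, count / n_baskets) for a, b, count in ranked]


if __name__ == "__main__":
    # Three records from the output above; two of them tie at 882.
    records = [("FRO40251", "GRO73461", 882),
               ("DAI62779", "DAI75645", 882),
               ("DAI62779", "ELE17451", 1592)]
    for row in top_pairs(records, n_baskets=31101):
        print(row)
```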

**HW3.5: Stripes
Repeat 3.4 using the stripes design pattern for finding co-occurring pairs.**

**Report the compute times for the Stripes job versus the Pairs job. Describe the computational setup used (e.g., single computer; dual core; linux; number of mappers; number of reducers).**

**Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs.**

In [46]:
%%writefile mapper.py
#!/usr/bin/python
## mapper.py
## Author: Jing Xu
## Description: mapper code for HW3.5

import sys

sys.stderr.write("reporter:counter:Mapper1Counter,Instance,1\n")

# read input from STDIN (standard input)
for line in sys.stdin:
    # split the line into words
    items = line.split()
    items = set(items) #create unique set of items
    items = list(items) #convert to list
    items = sorted(items, key=str.lower) #sort list of unique items
    while len(items) > 1: #'> 1' also handles empty lines, where '!= 1' would index an empty list
        first = items[0] #first item in unique list is key
        items = items[1:] #remove first item from unique list
        string = ''
        for item in items: #build a dict literal string, parsed later with ast.literal_eval
            string+="'%s': 1, "%item
        print '%s\t{%s}' % (first, string)
        sys.stderr.write("reporter:counter:Mapper1Counter,Stripes,1\n")
    print "*\t{'*': 1}" #counter for total baskets

Overwriting mapper.py


In [47]:
%%writefile mapper2.py
#!/usr/bin/python
## mapper2.py
## Author: Jing Xu
## Description: mapper2 code for HW3.5

import sys
import ast

# read input from STDIN (standard input)
for line in sys.stdin:
    # parse the key and its stripe dictionary
    line = line.strip('\n')
    key, stripe = line.split('\t', 1)
    dic = ast.literal_eval(stripe)
    elements = dic.items()
    for element in elements:
        pair = '%s,%s' % (key, element[0])
        count = int(element[1])
        print '%s\t%d' % (pair, count)

Overwriting mapper2.py


In [48]:
%%writefile reducer.py
#!/usr/bin/python
## reducer.py
## Author: Jing Xu
## Description: reducer code for HW3.5

import sys
import ast

current_key = None
key_dictionary = {}

# input comes from STDIN
for line in sys.stdin:
    # parse the input we got from mapper.py
    key, stripe = line.split('\t', 1)
    dic = ast.literal_eval(stripe)
    if current_key == key:
        #merge and add dictionary values together
        key_dictionary = { k: key_dictionary.get(k, 0) + dic.get(k, 0) for k in set(key_dictionary) | set(dic) } 
    else:
        for k in key_dictionary.keys(): #delete all dictionary keys with values < 100
            if key_dictionary[k] < 100:
                del key_dictionary[k]
        if current_key: #skip the initial placeholder key (None)
            # write result to STDOUT
            print '%s\t%s' % (current_key, key_dictionary)
            sys.stderr.write("reporter:counter:Reducer1Counter,Pairs,1\n")
        key_dictionary = dic
        current_key = key
    
# do not forget to output the last key if needed!
if current_key is not None: #also avoids a NameError on 'key' when the input is empty
    for k in key_dictionary.keys(): #delete all dictionary keys with values < 100
        if key_dictionary[k] < 100:
            del key_dictionary[k]
    print '%s\t%s' % (current_key, key_dictionary)
    sys.stderr.write("reporter:counter:Reducer1Counter,Pairs,1\n")

sys.stderr.write("reporter:counter:Reducer1Counter,Instances,1\n")

Overwriting reducer.py
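
The Stripes cells above serialize each stripe as a dictionary literal string and parse it back with `ast.literal_eval`. Below is a local Python 3 sketch of the same upper-triangular stripes, where each item maps to the items that sort after it in the basket. It uses `collections.Counter`, whose `update` method adds counts element-wise, in place of the hand-written dictionary merge. The names and toy data are hypothetical.

On the same baskets, this yields the same frequent pairs as the Pairs approach. It emits at most one record per item per basket instead of one record per pair.

```python
from collections import Counter, defaultdict


def stripes_map(basket_line):
    """Stripes mapper: for each item, one stripe counting the items sorted after it."""
    items = sorted(set(basket_line.split()))
    for i, item in enumerate(items[:-1]):  # the last item would have an empty stripe
        yield item, Counter(items[i + 1:])


def stripes_reduce(mapped, support=100):
    """Merge stripes per key, then flatten to {(a, b): count} at or above the support."""
    merged = defaultdict(Counter)
    for item, stripe in mapped:
        merged[item].update(stripe)  # element-wise sum of the stripes
    return {(a, b): count
            for a, stripe in merged.items()
            for b, count in stripe.items()
            if count >= support}


if __name__ == "__main__":
    baskets = ["A B C", "B A", "C A B D"]  # toy data
    mapped = [kv for line in baskets for kv in stripes_map(line)]
    print(stripes_reduce(mapped, support=2))  # {('A', 'B'): 3, ('A', 'C'): 2, ('B', 'C'): 2}
```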


In [49]:
%%writefile reducer2.py
#!/usr/bin/python
## reducer2.py
## Author: Jing Xu
## Description: reducer2 code for HW3.5

import sys, Queue

n_max = 50 #want the top 50 pairs by count
q_max = Queue.Queue(n_max) #bounded queue that holds the last n_max records seen
total_buckets = 0 #total number of baskets, used to calculate relative frequency (support)

for line in sys.stdin:
    line = line.strip('\n') #clean line
    rec = line.split('\t', 1) #split line
    if rec[0] == '*,*': #if key is *,*, indicates it is the total buckets value
        rec[1] = rec[1].strip('\t')
        total_buckets = int(rec[1]) #assign count as total_buckets
    else:
        first, second = rec[0].split(',')
        rec[1] = rec[1].strip('\t')
        rec[1] = int(rec[1])
        # input arrives sorted by ascending count, so the records left in the queue at the end are the biggest
        if q_max.full():
            q_max.get()
        q_max.put([first, second, rec[1]])

top_list = []
        
while not q_max.empty(): #avoids blocking forever if fewer than n_max pairs were read
    value = q_max.get() 
    freq = float(value[2])/float(total_buckets) #calculate relative frequency
    value.append(freq) #append relative frequency to output 
    top_list.append(value)

top_list.reverse() #reverse list so highest value is first
    
print '\nTop %d product pairs:' %n_max
for value in top_list:
    print value

Overwriting reducer2.py


In [50]:
%%writefile mapreduce.sh
## mapreduce.sh
## Author: Jing Xu
## Description: mapreduce bash script for HW3.5

hdfs dfs -mkdir /user # create hdfs folder
wait
hdfs dfs -mkdir /user/jing # create hdfs folder
wait
hadoop fs -put ProductPurchaseData.txt /user/jing # upload local file to hdfs folder
wait

STARTTIME=$(date +%s)

# hadoop command to run streaming mapreduce job
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming1" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-file mapper.py    -mapper mapper.py \
-file reducer.py   -reducer reducer.py \
-input /user/jing/* -output /user/jing/test-output1/

wait

# second hadoop command to run streaming mapreduce job
# sort primarily by count (2nd key field, numeric ascending), then by the product pair (1st key field)
hadoop jar /Users/JingXu/Documents/hadoop-2.6.3/share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.job.name="Count Job via Streaming2" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2n -k1" \
-file mapper2.py    -mapper mapper2.py \
-file reducer2.py   -reducer reducer2.py \
-input /user/jing/test-output1/part-00000 -output /user/jing/test-output2/

wait

#calculate compute time for both jobs (Pairs approach)
ENDTIME=$(date +%s)
echo "It takes $(($ENDTIME - $STARTTIME)) seconds to complete this task"
wait

hdfs dfs -cat /user/jing/test-output2/part-00000 #examine hdfs file output
wait

hdfs dfs -rm -r /user/jing # remove hdfs folder
hdfs dfs -rm -r /user # remove hdfs folder

Overwriting mapreduce.sh
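In the second job, `KeyFieldBasedComparator` with options `-k2n -k1` orders records numerically by the count (key field 2) and breaks ties by comparing the key text from field 1 onward, i.e. the product pair. No `r` flag is given, so the order is ascending, which is exactly what the bounded queue in `reducer2.py` relies on. The function below is a rough local emulation of that ordering, assuming tab-separated `pair<TAB>count` lines; the function name and sample lines are hypothetical, and the comparator's exact byte-level tie-breaking may differ.

```python
def emulate_k2n_k1(lines):
    """Approximate the '-k2n -k1' ordering on 'pair<TAB>count' lines."""
    def sort_key(line):
        pair, count = line.rstrip("\n").split("\t")[:2]
        return (int(count), pair)  # numeric ascending count, then pair text
    return sorted(lines, key=sort_key)

# Hypothetical mapper2-style output lines
sample = ["B,C\t9", "A,C\t7", "A,B\t9", "C,D\t12"]
for line in emulate_k2n_k1(sample):
    print(line)
```

Adding the `r` flag (`-k2nr`) would reverse the numeric order; the reducer could then keep the first 50 records it sees instead of the last 50.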


In [51]:
!./mapreduce.sh

16/02/01 16:34:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:34:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:34:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 16:34:10 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 16:34:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, reducer.py] [] /var/folders/zs/k144hqks281fbt0x68c_zj9m0000gp/T/streamjob5610622673548507847.jar tmpDir=null
16/02/01 16:34:11 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 16:34:11 INFO jvm.JvmMetrics: Initial

**Single dual-core computer, 1 mapper, 1 reducer**

**It takes 128 seconds to complete this task**

**Mapper1Counter**

    Instance=1
    Pairs=349720

**Reducer1Counter**

    Instances=1
    Pairs=12012

**Top 50 product pairs:**

['DAI62779', 'ELE17451', 1592, 0.05118806469245362]	
['FRO40251', 'SNA80324', 1412, 0.04540046943828173]	
['DAI75645', 'FRO40251', 1254, 0.04032024693739751]	
['FRO40251', 'GRO85051', 1213, 0.039001961351725026]	
['DAI62779', 'GRO73461', 1139, 0.03662261663612103]	
['DAI75645', 'SNA80324', 1130, 0.03633323687341243]	
['DAI62779', 'FRO40251', 1070, 0.03440403845535513]	
['DAI62779', 'SNA80324', 923, 0.029677502331114755]	
['DAI62779', 'DAI85309', 918, 0.029516735796276648]	
['ELE32164', 'GRO59710', 911, 0.029291662647503297]	
['FRO40251', 'GRO73461', 882, 0.02835921674544227]	
['DAI62779', 'DAI75645', 882, 0.02835921674544227]	
['DAI62779', 'ELE92920', 877, 0.02819845021060416]	
['FRO40251', 'FRO92469', 835, 0.026848011317964052]	
['DAI62779', 'ELE32164', 832, 0.026751551397061188]	
['DAI75645', 'GRO73461', 712, 0.022893154560946594]	
['DAI43223', 'ELE32164', 711, 0.022861001253978972]	
['DAI62779', 'GRO30386', 709, 0.02279669464004373]	
['ELE17451', 'FRO40251', 697, 0.022410854956432268]	
['DAI85309', 'ELE99737', 659, 0.021189029291662647]	
['DAI62779', 'ELE26917', 650, 0.020899649528954053]	
['GRO21487', 'GRO73461', 631, 0.02028873669656924]	
['DAI62779', 'SNA45677', 604, 0.019420597408443457]	
['ELE17451', 'SNA80324', 597, 0.019195524259670107]	
['DAI62779', 'GRO71621', 595, 0.019131217645734864]	
['DAI62779', 'SNA55762', 593, 0.01906691103179962]	
['DAI62779', 'DAI83733', 586, 0.01884183788302627]	
['ELE17451', 'GRO73461', 580, 0.018648918041220538]	
['GRO73461', 'SNA80324', 562, 0.01807015851580335]	
['DAI62779', 'GRO59710', 561, 0.01803800520883573]	
['DAI62779', 'FRO80039', 550, 0.01768431883219189]	
['DAI75645', 'ELE17451', 547, 0.017587858911289025]	
['DAI62779', 'SNA93860', 537, 0.01726632584161281]	
['DAI55148', 'DAI62779', 526, 0.016912639464968973]	
['DAI43223', 'GRO59710', 512, 0.01646249316742227]	
['ELE17451', 'ELE32164', 511, 0.016430339860454647]	
['DAI62779', 'SNA18336', 506, 0.01626957332561654]	
['ELE32164', 'GRO73461', 486, 0.015626507186264106]	
['DAI85309', 'ELE17451', 482, 0.01549789395839362]	
['DAI62779', 'FRO78087', 482, 0.01549789395839362]	
['DAI62779', 'GRO94758', 479, 0.015401434037490756]	
['GRO85051', 'SNA80324', 471, 0.015144207581749784]	
['DAI62779', 'GRO21487', 471, 0.015144207581749784]	
['ELE17451', 'GRO30386', 468, 0.015047747660846917]	
['FRO85978', 'SNA95666', 463, 0.01488698112600881]	
['DAI62779', 'FRO19221', 462, 0.014854827819041188]	
['DAI62779', 'GRO46854', 461, 0.014822674512073567]	
['DAI43223', 'DAI62779', 459, 0.014758367898138324]	
['ELE92920', 'SNA18336', 455, 0.014629754670267838]	
['DAI88079', 'FRO40251', 446, 0.014340374907559243]	
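Each relative frequency above is the pair's count divided by `total_buckets`, the `*,*` total read by `reducer2.py`. That total is not printed, but it can be backed out from any row. The check below infers it from the top row (an inference from the printed numbers, not a value reported by the job) and confirms the same denominator reproduces other rows.

```python
# Rows copied from the output above: (count, relative frequency)
rows = [
    (1592, 0.05118806469245362),
    (1412, 0.04540046943828173),
    (446, 0.014340374907559243),
]

# Infer the denominator from the first row, since freq = count / total
inferred_total = int(round(float(rows[0][0]) / rows[0][1]))
print(inferred_total)  # 31101

for count, freq in rows:
    # Each reported frequency should equal count / inferred_total
    assert abs(float(count) / inferred_total - freq) < 1e-12
```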