Name: Patrick Ng  
Email: patng@ischool.berkeley.edu  
Class: W261-2  
Week: 03  
Date of submission: Feb 01, 2016

## HW3.0.

What is a merge sort? Where is it used in Hadoop?  
How is  a combiner function in the context of Hadoop?   
Give an example where it can be used and justify why it should be used in the context of this problem.  
What is the Hadoop shuffle?

Merge sort is a sorting algorithm based on divide-n-conquer.  


1. Divide by finding the number qqq of the position midway between ppp and rrr. Do this step the same way we found the midpoint in binary search: add ppp and rrr, divide by 2, and round down.
2. Conquer by recursively sorting the subarrays in each of the two subproblems created by the divide step. That is, recursively sort the subarray array[p..q] and recursively sort the subarray array[q+1..r].
3. Combine by merging the two sorted subarrays back into the single sorted subarray array[p..r].  

In Hadoop, it is used in the *shuffle and sort* phase of a mapreduce job.  

In the context of Hadoop, a combiner acts as a mini-reducer which runs on the same node as the mapper.  It helps to reduce the size of the data which has to be transferred and processed at the shuffle and sort phase.  

One example of the use of combiner is in the basic Word Count problem.  The mapper will emit a < word, count > pair for every word encountered in the input.  Running a combiner will merge all the records which have the same words into a single record.  This greatly reduce the size of the data which Hadoop needs to shuffle and sort.

MapReduce makes the guarantee that the input to every reducer is sorted by key.
The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs—is known as the shuffle.  It includes partition, sort and combine, both in memory and on disk.


## HW3.1 
Use Counters to do EDA (exploratory data analysis and to monitor progress)
Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc. 

While processing information/data using MapReduce job, it is a challenge to monitor the progress of parallel threads running across nodes of distributed clusters. Moreover, it is also complicated to distinguish between the data that has been processed and the data which is yet to be processed. The MapReduce Framework offers a provision of user-defined Counters, which can be effectively utilized to monitor the progress of data across nodes of distributed clusters.

Use the Consumer Complaints  Dataset provide here to complete this question:

     https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0

The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:

Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?


User-defined Counters

Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).

Hadoop offers Job Tracker, an UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. Screenshot your  job tracker UI as your job completes and include it here. Make sure that your user defined counters are visible. 

## Mapper

In [480]:
%%writefile mapper.py
#!/usr/bin/python
import sys
import re
import csv

# input comes from STDIN (standard input)
for fields in csv.reader(sys.stdin):

    # Skip header row
    if fields[0] == "Complaint ID":
        continue

    reason = fields[1].lower()
    
    if reason == "debt collection":
        counter = "debt"
    elif reason == "mortgage":
        counter = "mortgage"
    else:
        counter = "others"
        
    sys.stderr.write('reporter:counter:custom,' + counter + ',1\n')


Overwriting mapper.py


## Quick Test

In [481]:
# Quick test
!head -n 20 Consumer_Complaints.csv  | python mapper.py

reporter:counter:custom,debt,1
reporter:counter:custom,debt,1
reporter:counter:custom,others,1
reporter:counter:custom,debt,1
reporter:counter:custom,others,1
reporter:counter:custom,debt,1
reporter:counter:custom,others,1
reporter:counter:custom,debt,1
reporter:counter:custom,others,1
reporter:counter:custom,debt,1
reporter:counter:custom,others,1
reporter:counter:custom,others,1
reporter:counter:custom,debt,1
reporter:counter:custom,mortgage,1
reporter:counter:custom,others,1
reporter:counter:custom,others,1
reporter:counter:custom,debt,1
reporter:counter:custom,debt,1
reporter:counter:custom,debt,1


## Run it in Hadoop

In [2]:
# upload input file to hdfs
!hdfs dfs -rm -f Consumer_Complaints.csv
!hdfs dfs -put Consumer_Complaints.csv

16/01/30 17:26:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/30 17:26:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [16]:
# Hadoop streaming command
!hdfs dfs -rm -r hw3.1
!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapred.reduce.tasks=0 \
-file mapper.py -mapper mapper.py \
-input Consumer_Complaints.csv \
-output hw3.1

16/01/30 17:40:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.1
16/01/30 17:40:12 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/30 17:40:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar6298878087895734314/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob2623231940027838210.jar tmpDir=null


### Result

![result](https://photos-3.dropbox.com/t/2/AAD96apEVb1NEyOodPWCtkdcs8K_w-nW4PJtqZe6LTUSCQ/12/15674996/png/32x32/1/_/1/2/Screenshot%202016-01-30%2017.42.35.png/EKi01gsYnS8gBygH/Hs_dhV-YD5vL1Ja5tyxj1tFInEhRb2H_uNNtVF96zBs?size=1024x768&size_mode=3)

## HW 3.2  - Part 1

Analyze the performance of your Mappers, Combiners and Reducers using Counters

For this brief study the Input file will be one record (the next line only): 
foo foo quux labs foo bar quux


Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here) using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing this word count job. The answer  should be 1 and 4 respectively. Please explain.

Please use mulitple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).
Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere)  using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job. 

Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to count up how many time the mapper, combiner, reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job. 
Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). 

In [377]:
# Prepare input file
!echo "foo foo quux labs foo bar quux" > input.txt
!hdfs dfs -rm -r input.txt
!hdfs dfs -put input.txt

16/02/01 15:06:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted input.txt
16/02/01 15:06:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Mapper and Reducer

In [818]:
%%writefile mapper_3.2_1.py
#!/usr/bin/python
import sys
import re
import csv

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for words in csv.reader(sys.stdin, delimiter=' '):
    for word in words:
        print word + "\t1" 
            

Overwriting mapper_3.2_1.py


In [821]:
%%writefile reducer_3.2.py
#!/usr/bin/python
import sys
import csv

wordCount = 0
cur = None # the current word

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')

# input comes from STDIN
for (word, count) in csv.reader(sys.stdin, delimiter='\t'):
    count = int(count)
    
    # If we have encountered a new word, output the answer of the current word
    if cur != word:
        if cur is not None:
            print "%s\t%d" % (cur, wordCount)
            wordCount = 0
            
    wordCount += count
    cur = word


# Output for the last word seen
if cur is not None:
    print "%s\t%d" % (cur, wordCount)

Writing reducer_3.2.py


### Quick Test

In [822]:
!cat input.txt | python mapper_3.2_1.py | sort | python reducer_3.2.py

reporter:counter:custom,mapper_called,1
reporter:counter:custom,reducer_called,1
bar	1
foo	3
labs	1
quux	2


### Run in Hadoop

In [823]:
!hdfs dfs -rm -r hw3.2-part1

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-files mapper_3.2_1.py,reducer_3.2.py \
-mapper mapper_3.2_1.py \
-reducer reducer_3.2.py \
-numReduceTasks 4 \
-input input.txt \
-output hw3.2-part1

16/02/03 10:52:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.2-part1
16/02/03 10:52:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar1700378626734870731/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob5817016397314229988.jar tmpDir=null


In [824]:
# Show result
!hdfs dfs -cat hw3.2-part1/part-*

16/02/03 10:52:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
quux	2
foo	3
bar	1
labs	1


### Result Part 1

The mapper was called once, because there is only one line of input.  
The reducer was called four times, because we specified 4 reducers in the option.


## HW 3.2  - Part 2

Please use mulitple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).
Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere)  using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job. 

### Mapper (re-use last Reducer)

In [825]:
%%writefile mapper_3.2.py
#!/usr/bin/python
import sys
import re
import csv

# Word delimiters are space, \, comma and "
regex = re.compile(r"[\s/\",]+")

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for fields in csv.reader(sys.stdin):

    # Skip header row
    if fields[0] == "Complaint ID":
        continue
    
    words = filter(None, regex.split(fields[3]))
    for word in words:
        print word + "\t1" 
            

Writing mapper_3.2.py


### Quick Test

In [826]:
!head -n 100 Consumer_Complaints.csv | python mapper_3.2.py | \
sort -k1,1 | \
python reducer_3.2.py

reporter:counter:custom,reducer_called,1
reporter:counter:custom,mapper_called,1
ATM	1
Account	1
Advertising	1
Balance	1
Communication	6
Cont'd	19
Credit	8
Deposits	5
Disclosure	7
Embezzlement	1
False	1
Fraud	1
Identity	1
Improper	3
Incorrect	29
Loan	5
Managing	2
Problems	1
Taking	1
Unable	7
Using	1
a	1
account	2
and	6
attempts	19
being	1
by	1
card	1
caused	1
closing	1
collect	19
collection	3
company's	7
contact	2
credit	44
debit	1
debt	26
escrow	2
foreclosure	3
funds	1
get	7
identity	1
info	2
information	29
investigation	7
lease	3
loan	3
low	1
management	1
marketing	1
modification	3
monitoring	1
my	2
not	19
of	10
on	29
opening	1
or	9
out	1
owed	19
payments	2
protection	1
report	37
reporting	7
representation	1
score	7
servicing	2
sharing	2
statements	1
tactics	6
the	3
theft	1
to	7
transfer	1
use	1
verification	7
withdrawals	5


### Run in Hadoop

In [828]:
!hdfs dfs -rm -r hw3.2-part2


# Use 4 mappers and 4 reducers
!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-files mapper_3.2.py,reducer_3.2.py \
-D mapreduce.job.name="HW3.2-part2" \
-D mapred.map.tasks=4 \
-numReduceTasks 4 \
-mapper mapper_3.2.py \
-reducer reducer_3.2.py \
-input Consumer_Complaints.csv \
-output hw3.2-part2

16/02/03 10:54:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.2-part2
16/02/03 10:54:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar1434959601069990937/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob6662875735736771107.jar tmpDir=null


### Result Part 2

The mapper was called 4 times.  
The reducer was called 4 times.

## HW3.2 - Part 3

Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to count up how many time the mapper, combiner, reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job.

### Combiner (Note: Re-use the previous mapper and reducer)

In [829]:
%%writefile combiner_3.2.py
#!/usr/bin/python
import sys
import csv

wordCount = 0
curr = None # the current word

sys.stderr.write('reporter:counter:custom,combiner_called,1\n')

# input comes from STDIN
for (word, count) in csv.reader(sys.stdin, delimiter='\t'):
    count = int(count)
    
    # If we have encountered a new word, output the answer of the current word
    if curr != word:
        if curr is not None:
            print "%s\t%d" % (curr, wordCount)
            wordCount = 0
            
    wordCount += count
    curr = word

# Output for the last word seen
if curr is not None:
    print "%s\t%d" % (curr, wordCount)

Writing combiner_3.2.py


### Quick Test

In [830]:
!head -n 100 Consumer_Complaints.csv | python mapper_3.2.py | \
python combiner_3.2.py | sort -k1,1 | \
python reducer_3.2.py

reporter:counter:custom,mapper_called,1
reporter:counter:custom,reducer_called,1
reporter:counter:custom,combiner_called,1
ATM	1
Account	1
Advertising	1
Balance	1
Communication	6
Cont'd	19
Credit	8
Deposits	5
Disclosure	7
Embezzlement	1
False	1
Fraud	1
Identity	1
Improper	3
Incorrect	29
Loan	5
Managing	2
Problems	1
Taking	1
Unable	7
Using	1
a	1
account	2
and	6
attempts	19
being	1
by	1
card	1
caused	1
closing	1
collect	19
collection	3
company's	7
contact	2
credit	44
debit	1
debt	26
escrow	2
foreclosure	3
funds	1
get	7
identity	1
info	2
information	29
investigation	7
lease	3
loan	3
low	1
management	1
marketing	1
modification	3
monitoring	1
my	2
not	19
of	10
on	29
opening	1
or	9
out	1
owed	19
payments	2
protection	1
report	37
reporting	7
representation	1
score	7
servicing	2
sharing	2
statements	1
tactics	6
the	3
theft	1
to	7
transfer	1
use	1
verification	7
withdrawals	5


In [831]:
!hdfs dfs -rm -r hw3.2-part3

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-files mapper_3.2.py,combiner_3.2.py,reducer_3.2.py \
-D mapreduce.job.name="HW3.2-part3" \
-D mapred.map.tasks=4 \
-numReduceTasks 4 \
-mapper mapper_3.2.py \
-combiner combiner_3.2.py \
-reducer reducer_3.2.py \
-input Consumer_Complaints.csv \
-output hw3.2-part3

16/02/03 10:55:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.2-part3
16/02/03 10:55:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar6033747649411275576/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob8819446693044245170.jar tmpDir=null


### Result Part 3

The mapper was called 4 times.  
The combiner was called 16 times.  
The reducer was called 4 times.

## HW 3.2 Part 4

Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). 

### Mapper and Reducer

In [807]:
%%writefile mapper_3.2_4.py
#!/usr/bin/python
import sys
import re
import csv

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for line in sys.stdin:
    (word, count) = line.strip().split("\t")

    print line.strip()

    # Use order inversion so that reducer can get the total word count in a single pass.
    # That is for calculating the relative frequenc for each word.
    print count + "\t" + str(sys.maxint)

Writing mapper_3.2_4.py


In [808]:
%%writefile reducer_3.2_4.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys, Queue, csv

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')

n_max, n_min = 50, 10
q_min = Queue.Queue(n_min)
a_max = []

def updateResult(word, count, freq):
    global n_max, q_min, a_max
    
    rec = [word, count, freq]
    # put the biggest
    if len(a_max) < n_max:
        a_max.append(rec)
    
    # whatever left is the smallest
    if q_min.full():
        q_min.get()
    q_min.put(rec)
    
wordCount = 0 # Count of each word
totalCount = 0 # Total number of words
curr = None # the current word

# input comes from STDIN
for fields in csv.reader(sys.stdin, delimiter='\t'):
    word = fields[0]
    count = fields[1]
    count = int(count)

    # Find out the total word count.
    # We use count == sys.maxint as the special key for order inversion.
    if count == sys.maxint:
        totalCount += int(word) # The word is the count
        continue
    
    # If we have encountered a new word, output the answer of the current word
    if curr != word:
        if curr is not None:
            updateResult(curr, wordCount, wordCount/totalCount)
            wordCount = 0
            
    wordCount += count
    curr = word

# Handle the last word seen
if curr is not None:
    updateResult(curr, wordCount, wordCount/totalCount)
    
# Output the result
print "Top %d words" % n_max
for rec in a_max:
    print "%s\t%d\t%f" % tuple(rec)
    
print
print "Bottom %d words" % n_min
while not q_min.empty():
    print "%s\t%d\t%f" % tuple(q_min.get())


Writing reducer_3.2_4.py


### Quick test

In [833]:
# Prepare test input
!hdfs dfs -cat hw3.2-part3/part-* > hw3.2-part4-input.txt

16/02/03 10:56:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [834]:
!cat hw3.2-part4-input.txt | python mapper_3.2_4.py | \
sort -t$'\t' -k2,2nr -k1,1 | python reducer_3.2_4.py 

reporter:counter:custom,mapper_called,1
reporter:counter:custom,reducer_called,1
Top 50 words
Loan	107254	0.079547
collection	70487	0.052278
foreclosure	70487	0.052278
modification	70487	0.052278
account	40893	0.030329
or	40508	0.030044
credit	40483	0.030025
payments	39993	0.029662
escrow	36767	0.027269
servicing	36767	0.027269
report	34903	0.025887
Incorrect	29133	0.021607
information	29069	0.021560
on	29069	0.021560
debt	26531	0.019677
not	18477	0.013704
Cont'd	17972	0.013329
attempts	17972	0.013329
collect	17972	0.013329
owed	17972	0.013329
Account	16555	0.012278
and	16448	0.012199
closing	16205	0.012019
management	16205	0.012019
opening	16205	0.012019
Credit	14768	0.010953
of	13983	0.010371
loan	12376	0.009179
my	10731	0.007959
Deposits	10555	0.007828
withdrawals	10555	0.007828
Problems	9484	0.007034
Application	8868	0.006577
Communication	8671	0.006431
tactics	8671	0.006431
broker	8625	0.006397
mortgage	8625	0.006397
originator	8625	0.006397
to	8401	0.006231
Billing	8158	0.006051


### Run it in Hadoop

In [835]:
!hdfs dfs -rm -r hw3.2-part4

# For sorting, use the 2nd field count as the primary key, in numeric, reverse order, and
# use 1st field as our secondary sort.
!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapreduce.job.name="HW3.2-part4" \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2,2nr -k1,1" \
-file mapper_3.2_4.py -mapper mapper_3.2_4.py \
-file reducer_3.2_4.py -reducer reducer_3.2_4.py \
-input hw3.2-part3/part-* \
-output hw3.2-part4


16/02/03 10:57:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.2-part4
16/02/03 10:57:09 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/03 10:57:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper_3.2_4.py, reducer_3.2_4.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar3871592559344650025/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob2016275701778847096.jar tmpDir=null


### Result for Part 4

In [836]:
!hdfs dfs -cat hw3.2-part4/part-00000

16/02/03 10:58:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Top 50 words	
Loan	107254	0.079547
collection	70487	0.052278
foreclosure	70487	0.052278
modification	70487	0.052278
account	40893	0.030329
or	40508	0.030044
credit	40483	0.030025
payments	39993	0.029662
escrow	36767	0.027269
servicing	36767	0.027269
report	34903	0.025887
Incorrect	29133	0.021607
information	29069	0.021560
on	29069	0.021560
debt	26531	0.019677
not	18477	0.013704
Cont'd	17972	0.013329
attempts	17972	0.013329
collect	17972	0.013329
owed	17972	0.013329
Account	16555	0.012278
and	16448	0.012199
closing	16205	0.012019
management	16205	0.012019
opening	16205	0.012019
Credit	14768	0.010953
of	13983	0.010371
loan	12376	0.009179
my	10731	0.007959
Deposits	10555	0.007828
withdrawals	10555	0.007828
Problems	9484	0.007034
Application	8868	0.006577
Communication	8671	0.006431
tactics	8671	0.006431
broker	8625	0.006397
mortgage	8625	0.006

## HW3.2.1 OPTIONAL 
Using 2 reducers: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). 

### First create a job to generate the partition file

In [844]:
%%writefile mapper_3.2.1_1.py
#!/usr/bin/python
import sys
import re
import csv
import numpy as np

# The input is the result from hw3.2-part3, which has the format:
# word<tab>count

n = 0
rate = 5 # sample one out of every five counts
samples = []

# We want to sample the count
for line in sys.stdin:
    (word, count) = line.strip().split("\t")

    if n % rate == 0:
        samples.append(int(count))
        
    n += 1
    
# Now we have a sample of counts.  Let's find the 50% percentile, as we only have 2 reducers.
p50 = np.percentile(samples, 50)

print p50

Overwriting mapper_3.2.1_1.py


In [843]:
# Quick test
!hdfs dfs -cat hw3.2-part3/part* | python mapper_3.2.1_1.py > partitions.txt
!cat partitions.txt

16/02/03 11:54:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
3095.0


In [870]:
%%writefile mapper_3.2.1_2.py
#!/usr/bin/python
import sys
import re
import csv
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--partitionFile", default="partitions.txt")

args = parser.parse_args()

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')


# The input is the result from hw3.2-part3, which has the format:
# word<tab>count

# First read the partition file and get the 50th percentile
with open(args.partitionFile, 'r') as f:
    p50 = float(f.readline())

groups = ["g" + str(x) for x in range(2)] # names of the two groups

for line in sys.stdin:
    (word, count) = line.strip().split("\t")
    count = int(count)

    # Assign it to different reducers based on the count value
    if count >= p50:
        group = groups[0]
    else:
        group = groups[1]
        
    print group + "\t" + line.strip()

    # Use order inversion so that reducer can count the total word count in a single pass
    # Need to send it to each group
    for g in groups:
        print g + "\t" + str(count) + "\t" + str(sys.maxint)

Overwriting mapper_3.2.1_2.py


In [873]:
# Quick test
!hdfs dfs -cat hw3.2-part3/part* > t
!cat t | python mapper_3.2.1_1.py > partitions.txt

print "partitions.txt:"
!cat partitions.txt
print

!head -n 30 t | \
python mapper_3.2.1_2.py | \
sort -k1,1 -k3,3nr


16/02/03 12:24:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
partitions.txt:
3095.0

reporter:counter:custom,mapper_called,1
g0	1061	9223372036854775807
g0	1098	9223372036854775807
g0	1155	9223372036854775807
g0	1343	9223372036854775807
g0	139	9223372036854775807
g0	139	9223372036854775807
g0	16205	9223372036854775807
g0	163	9223372036854775807
g0	16555	9223372036854775807
g0	17972	9223372036854775807
g0	1999	9223372036854775807
g0	240	9223372036854775807
g0	26531	9223372036854775807
g0	274	9223372036854775807
g0	2795	9223372036854775807
g0	29133	9223372036854775807
g0	3226	9223372036854775807
g0	350	9223372036854775807
g0	3503	9223372036854775807
g0	40893	9223372036854775807
g0	4357	9223372036854775807
g0	4858	9223372036854775807
g0	5663	9223372036854775807
g0	5663	9223372036854775807
g0	640	9223372036854775807
g0	6938	9223372036854775807
g0	75	9223372036854775807
g0	925	9223372036854775807
g0	929	9

In [888]:
%%writefile reducer_3.2.1.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys, Queue, csv

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')
    
wordCount = 0 # Count of each word
totalCount = 0 # Total number of words
curr = None # the current word

# input format: 
# Total count: group \t count \t max.int
# Word: group \t word \t count
for fields in csv.reader(sys.stdin, delimiter='\t'):
    group = fields[0]
    word = fields[1]
    count = fields[2]
    count = int(count)

    # Find out the total word count.
    # We use count == sys.maxint as the special key for order inversion.
    if count == sys.maxint:
        totalCount += int(word) # The word is the count
        continue
    
    # If we have encountered a new word, output the answer of the current word
    if curr != word:
        if curr is not None:
            print "\t".join([group, curr, str(wordCount), str(wordCount/totalCount)])
            wordCount = 0
            
    wordCount += count
    curr = word

# Handle the last word seen
if curr is not None:
    print "\t".join([group, curr, str(wordCount), str(wordCount/totalCount)])
    

Overwriting reducer_3.2.1.py


In [889]:
# Quick test
!hdfs dfs -cat hw3.2-part3/part* > t
!cat t | python mapper_3.2.1_1.py > partitions.txt

print "partitions.txt:"
!cat partitions.txt
print

!head -n 30 t | \
python mapper_3.2.1_2.py | \
sort -k1,1 -k3,3nr | \
python reducer_3.2.1.py


16/02/03 14:04:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
partitions.txt:
3095.0

reporter:counter:custom,reducer_called,1
reporter:counter:custom,mapper_called,1
g0	account	40893	0.209793761543
g0	Incorrect	29133	0.149461317464
g0	debt	26531	0.13611225118
g0	Cont'd	17972	0.0922019289965
g0	Account	16555	0.0849322799097
g0	closing	16205	0.083136671455
g0	disputes	6938	0.035594089883
g0	by	5663	0.0290529447979
g0	caused	5663	0.0290529447979
g0	company's	4858	0.0249230453519
g0	Unable	4357	0.0223527601067
g0	a	3503	0.0179714754771
g1	Making	3226	0.00827518982147
g1	Closing	2795	0.00716960804433
g1	Can't	1999	0.00512774471578
g1	Debt	1343	0.00344500307819
g1	Payoff	1155	0.00296275395034
g1	issue	1098	0.00281654011902
g1	Delinquent	1061	0.00272162938641
g1	for	929	0.00238302893495
g1	I	925	0.00237276831521
g1	issuance	640	0.00164169915863
g1	Workout	350	0.000897804227375
g1	available	274	0.00070285245

In [876]:
# First job - generate partitions file
!hdfs dfs -rm -r hw3.2.1-part1

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapreduce.job.name="hw3.2.1-part1" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=0 \
-file mapper_3.2.1_1.py -mapper mapper_3.2.1_1.py \
-input hw3.2-part3/part* \
-output hw3.2.1-part1

16/02/03 12:28:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `hw3.2.1-part1': No such file or directory
reporter:counter:custom,reducer_called,1
reporter:counter:custom,mapper_called,1
account	40893	0.209793761543
Incorrect	29133	0.149461317464
debt	26531	0.13611225118
Cont'd	17972	0.0922019289965
Account	16555	0.0849322799097
closing	16205	0.083136671455
disputes	6938	0.035594089883
by	5663	0.0290529447979
caused	5663	0.0290529447979
company's	4858	0.0249230453519
Unable	4357	0.0223527601067
a	3503	0.0179714754771
Making	3226	0.00827518982147
Closing	2795	0.00716960804433
Can't	1999	0.00512774471578
Debt	1343	0.00344500307819
Payoff	1155	0.00296275395034
issue	1098	0.00281654011902
Delinquent	1061	0.00272162938641
for	929	0.00238302893495
I	925	0.00237276831521
issuance	640	0.00164169915863
Workout	350	0.000897804227375
available	274	0.000702852452288
Cash	240	0.000615637184486
acct	163	0.000418

In [897]:
!hdfs dfs -rm -r hw3.2.1-part2

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-files "hdfs://localhost:8020/user/patrickng/hw3.2.1-part1/part-00000#partitions.txt" \
-D mapreduce.job.name="hw3.2.1-part2" \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=2 \
-D mapreduce.partition.keypartitioner.options=-k1,1 \
-D stream.num.map.output.key.fields=3 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k3,3nr -k2,2" \
-file mapper_3.2.1_2.py -mapper mapper_3.2.1_2.py \
-file reducer_3.2.1.py -reducer reducer_3.2.1.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input hw3.2-part3/part* \
-output hw3.2.1-part2

16/02/03 14:25:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.2.1-part2
16/02/03 14:25:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/03 14:25:18 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper_3.2.1_2.py, reducer_3.2.1.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar2352388023510399363/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob5675893172764108239.jar tmpDir=null


## HW3.3. 

Shopping Cart Analysis  
Product Recommendations: The action or practice of selling additional products or services 
to existing customers is called cross-selling. Giving product recommendation is 
one of the examples of cross-selling that are frequently used by online retailers. 
One simple method to give product recommendations is to recommend products that are frequently
browsed together by the customers.  

For this homework use the online browsing behavior dataset located at: 

       https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Each line in this dataset represents a browsing session of a customer. 
On each line, each string of 8 characters represents the id of an item browsed during that session.   
The items are separated by spaces.


Do some exploratory data analysis of this dataset. 

How many unique items are available from this supplier?

Using a single reducer: Report your findings such as number of unique products; largest basket; report the top 50 most frequently purchased items,  their frequency,  and their relative frequency (break ties by sorting the products alphabetical order) etc. using Hadoop Map-Reduce. 

In [539]:
%%writefile mapper1.py
#!/usr/bin/python
import sys
import re

# input comes from STDIN (standard input)
for line in sys.stdin:
    items = line.strip().split()
    
    # Use order inversion to help calculate the largest basket
    print "#\t" + str(len(items)) # Use # as the special key

    for item in items:
        print item + "\t1"


Overwriting mapper1.py


In [540]:
%%writefile reducer1.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys

largestBasket = None # Largest basket seen
uniqueCount = 0 # Number of unqiue items
itemCount = 0 # Item count
curr = None # The current item
        
# input comes from STDIN
for line in sys.stdin:
    (item, count) = line.strip().split('\t')
    count = int(count)

    # Find out the largest basket size.  The first set of input is for largest basket.
    if largestBasket is None:
        largestBasket = count
        
    # Skip all the largest basket records
    if item == "#":
        continue
            
    # If we have encountered a new item, output the count of the last item
    if curr is not None and curr != item:
        print "%d\t%s" % (itemCount, curr)        
        # Use order inversion to calculate the total number of items
        print "%d\t%d" % (sys.maxint, itemCount)
        # Increase the number of unique items
        uniqueCount += 1       
        itemCount = 0
            
    itemCount += count
    curr = item

# Handle the last item seen
if curr is not None:
    print "%d\t%s" % (itemCount, curr)        
    # Use order inversion to calculate the total number of items
    print "%d\t%d" % (sys.maxint, itemCount)
    # Increase the number of unique items
    uniqueCount += 1       
    itemCount = 0
    
# Report the "Largest Basket" and "Number of unique items"
print "%d\t%d" % (sys.maxint-1, largestBasket)
print "%d\t%d" % (sys.maxint-2, uniqueCount)


Overwriting reducer1.py


In [542]:
%%writefile reducer2.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys

totalCount = 0 # Total number of items
itemPrinted = 0
itemCount = 0
curr = None # The current item
        
# input comes from STDIN
for line in sys.stdin:
    (count, item) = line.strip().split('\t')
    try:
        count = int(count)
    except:
        print "Unexpected error:", sys.exc_info()[0]
        continue
    
    # The first part is for calculating the total number of items
    if count == sys.maxint:
        totalCount += int(item)
        continue
        
    if count == sys.maxint - 1:
        print "Largest basket size:", item
        print
        continue
        
    if count == sys.maxint - 2:
        print "Number of unique items:", item
        print
        continue

    # If we have encountered a new item, output the count of the current item
    if curr is not None and curr != item:
        if itemPrinted < 50:
            print "%s\t%d\t%f" % (curr, itemCount, itemCount/totalCount) 
            itemPrinted += 1
            
        itemCount = 0
    
    itemCount += count
    curr = item

    
# Handle the last item seen
if itemPrinted < 50 and curr is not None:
    print "%s\t%d\t%f" % (curr, itemCount, itemCount/totalCount)     


Overwriting reducer2.py


### Quick Test

In [547]:
!head -n 4 ProductPurchaseData.txt | \
python mapper1.py | \
sort -t$'\t' -k1,1 -k2,2nr | \
python reducer1.py | \
/bin/cat | \
sort -t$'\t' -k1,1nr -k2,2 | \
python reducer2.py 

Largest basket size: 12

Number of unique items: 22

ELE17451	4	0.142857
FRO86643	2	0.071429
GRO99222	2	0.071429
SNA11465	2	0.071429
DAI22896	1	0.035714
ELE23393	1	0.035714
ELE26917	1	0.035714
ELE37798	1	0.035714
ELE52966	1	0.035714
ELE89019	1	0.035714
ELE91550	1	0.035714
FRO11987	1	0.035714
FRO12685	1	0.035714
FRO84225	1	0.035714
FRO90334	1	0.035714
GRO12298	1	0.035714
GRO56989	1	0.035714
GRO73461	1	0.035714
SNA30755	1	0.035714
SNA80192	1	0.035714
SNA90258	1	0.035714
SNA99873	1	0.035714


### Run it in Hadoop

In [269]:
# upload input file to hdfs
!hdfs dfs -rm -f ProductPurchaseData.txt
!hdfs dfs -put ProductPurchaseData.txt

16/01/31 23:09:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 23:09:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [544]:
# First job
!hdfs dfs -rm -r hw3.3-part1

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1,1 -k2,2nr" \
-file mapper1.py -mapper mapper1.py \
-file reducer1.py -reducer reducer1.py \
-input ProductPurchaseData.txt \
-output hw3.3-part1

16/02/02 16:18:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.3-part1
16/02/02 16:18:33 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 16:18:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper1.py, reducer1.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar6220574856817021032/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob7573617829780053436.jar tmpDir=null


In [545]:
# Second job, using the result from first job as input.
# We use /bin/cat as the Identity Mapper
!hdfs dfs -rm -r hw3.3-part2

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1,1nr -k2,2" \
-mapper /bin/cat \
-file reducer2.py -reducer reducer2.py \
-input hw3.3-part1/part-* \
-output hw3.3-part2

16/02/02 16:19:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.3-part2
16/02/02 16:19:22 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 16:19:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [reducer2.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar7447411094773386909/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob2637008777364281477.jar tmpDir=null


### Output Result

In [546]:
!hdfs dfs -cat hw3.3-part2/part-00000

16/02/02 16:20:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Largest basket size: 37	
	
Number of unique items: 12592	
	
DAI62779	6667	0.017507
FRO40251	3881	0.010191
ELE17451	3875	0.010175
GRO73461	3602	0.009458
SNA80324	3044	0.007993
ELE32164	2851	0.007486
DAI75645	2736	0.007184
SNA45677	2455	0.006447
FRO31317	2330	0.006118
DAI85309	2293	0.006021
ELE26917	2292	0.006019
FRO80039	2233	0.005864
GRO21487	2115	0.005554
SNA99873	2083	0.005470
GRO59710	2004	0.005262
GRO71621	1920	0.005042
FRO85978	1918	0.005036
GRO30386	1840	0.004832
ELE74009	1816	0.004769
GRO56726	1784	0.004685
DAI63921	1773	0.004656
GRO46854	1756	0.004611
ELE66600	1713	0.004498
DAI83733	1712	0.004496
FRO32293	1702	0.004469
ELE66810	1697	0.004456
SNA55762	1646	0.004322
DAI22177	1627	0.004272
FRO78087	1531	0.004020
ELE99737	1516	0.003981
ELE34057	1489	0.003910
GRO94758	1489	0.003910
FRO35904	1436	0.003771
FRO53271	1420	0.003729
SNA93860	1

## 3.3.1 OPTIONAL 
Using 2 reducers:  Report your findings such as number of unique products; largest basket; report the top 50 most frequently purchased items,  their frequency,  and their relative frequency (break ties by sorting the products alphabetical order) etc. using Hadoop Map-Reduce. 

In [None]:
?????

## HW3.4. Pairs

Suppose we want to recommend new products to the customer based on the products they
have already browsed on the online website. Write a map-reduce program 
to find products which are frequently browsed together. Fix the support count (cooccurence count) to s = 100 
(i.e. product pairs need to occur together at least 100 times to be considered frequent) 
and find pairs of items (sometimes referred to itemsets of size 2 in association rule mining) that have a support count of 100 or more.

List the top 50 product pairs with corresponding support count (aka frequency), and relative frequency or support (the number of records where they coccur/the number of baskets in the dataset) in decreasing order of support  for frequent (100>count) itemsets of size 2. 

Use the Pairs pattern (lecture 3) to  extract these frequent itemsets of size 2. Free free to use combiners if they bring value. Instrument your code with counters for count the number of times your mapper, combiner and reducers are called.  

Please output records of the following form for the top 50 pairs (itemsets of size 2): 

      item1, item2, support count, support



Fix the ordering of the pairs lexicographically (left to right), 
and break ties in support (between pairs, if any exist) 
by taking the first ones in lexicographically increasing order. 

Report  the compute time for the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)
Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.

### Mapper and Reducer (part 1)

In [798]:
%%writefile mapper_3.4_1.py
#!/usr/bin/python
import sys
import re
import itertools

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for line in sys.stdin:
    items = line.strip().split()
    
    # Use order inversion to help calculate the total # of baskets
    print "#\t\t1" # Use # as the special key
    
    # Output all co-occuring pairs.
    for subset in itertools.combinations(sorted(set(items)), 2):
        print "%s\t%s\t1" % (subset[0], subset[1])

Writing mapper_3.4_1.py


In [799]:
%%writefile reducer_3.4_1.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--threshold", type=int, default=100)
args = parser.parse_args()
supportThreshold = args.threshold

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')

supportCount = 0 # Support count of each pair
totalCount = 0 # Total number of baskets
lastPair = None # The pair previously seen

# input comes from STDIN
for line in sys.stdin:
    (item1, item2, count) = line.strip().split('\t')
    count = int(count)

    # Find out the total basket count.
    # We use # as the special key for order inversion.
    if item1 == "#":
        totalCount += count
        continue
        
    pair = [item1, item2]
    
    # If we have encountered a new pair, output the count of the last pair
    if lastPair and lastPair != pair:
        if supportCount >= supportThreshold:
            print "%s\t%s\t%d\t%f" % (lastPair[0], lastPair[1], supportCount, supportCount / totalCount)
        supportCount = 0
            
    supportCount += count
    lastPair = pair

# Handle the last pair seen
if lastPair is not None:
    if supportCount >= supportThreshold:
        print "%s\t%s\t%d\t%f" % (lastPair[0], lastPair[1], supportCount, supportCount / totalCount)

Writing reducer_3.4_1.py


### Mapper and Reducer (Part 2)

In [800]:
%%writefile mapper_3.4_2.py
#!/usr/bin/python
import sys
import re

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for line in sys.stdin:
    (item1, item2, count, support) = line.strip().split("\t")
    
    # Output the count as the first key field, so that we can use
    # Hadoop to sort it in reverse order.
    print "\t".join([count, item1, item2, support])

Writing mapper_3.4_2.py


In [801]:
%%writefile reducer_3.4_2.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys, Queue

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')

n_max = 50
a_max = []

# input comes from STDIN
for line in sys.stdin:
    if len(a_max) < n_max:
        a_max.append(line)
    else:
        break

# Output the result
print "Top %d pairs" % n_max
for line in a_max:
    rec = line.strip().split('\t')
    print ",".join(rec)

Writing reducer_3.4_2.py


### Quick test

In [810]:
# Note: have to change threshold to 1
!head -n 2 ProductPurchaseData.txt | \
python mapper_3.4_1.py | \
sort -t$'\t' -k1,1 | \
python reducer_3.4_1.py --threshold 1 | \
/bin/cat | \
sort -t$'\t' -k3,3rn -k1,2 | \
python reducer_3.4_2.py 

reporter:counter:custom,mapper_called,1
reporter:counter:custom,reducer_called,1
reporter:counter:custom,reducer_called,1
Top 50 pairs
ELE17451,GRO99222,2,1.000000
ELE17451,ELE26917,1,0.500000
ELE17451,ELE52966,1,0.500000
ELE17451,ELE89019,1,0.500000
ELE17451,ELE91550,1,0.500000
ELE17451,FRO11987,1,0.500000
ELE17451,FRO12685,1,0.500000
ELE17451,FRO84225,1,0.500000
ELE17451,FRO90334,1,0.500000
ELE17451,GRO12298,1,0.500000
ELE17451,SNA11465,1,0.500000
ELE17451,SNA30755,1,0.500000
ELE17451,SNA80192,1,0.500000
ELE17451,SNA90258,1,0.500000
ELE26917,ELE52966,1,0.500000
ELE26917,ELE91550,1,0.500000
ELE26917,FRO12685,1,0.500000
ELE26917,FRO84225,1,0.500000
ELE26917,FRO90334,1,0.500000
ELE26917,GRO12298,1,0.500000
ELE26917,GRO99222,1,0.500000
ELE26917,SNA11465,1,0.500000
ELE26917,SNA30755,1,0.500000
ELE26917,SNA80192,1,0.500000
ELE52966,ELE91550,1,0.500000
ELE52966,FRO12685,1,0.500000
ELE52966,FRO84225,1,0.500000
ELE52966,FRO90334,1,0.500000
ELE52966,GRO12298,1,0

### Run it in Hadoop

In [802]:
# Part 1

!hdfs dfs -rm -r hw3.4-part1

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapreduce.job.name='HW3.4-part1' \
-D mapred.map.tasks=3 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1,1 -k2,2" \
-file mapper1.py -mapper mapper1.py \
-file reducer1.py -reducer reducer1.py \
-input ProductPurchaseData.txt \
-output hw3.4-part1

16/02/03 10:13:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.4-part1
16/02/03 10:13:31 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/03 10:13:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper1.py, reducer1.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar7921158822365522625/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob2250619433173984701.jar tmpDir=null


In [803]:
# Part 2

!hdfs dfs -rm -r hw3.4-part2

# For sorting, use the 1st field (count) as the primary key, in reverse order.
!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapreduce.job.name='HW3.4-part2' \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=3 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k3,3rn -k1,2" \
-mapper /bin/cat \
-file reducer2.py -reducer reducer2.py \
-input hw3.4-part1/part-* \
-output hw3.4-part2

16/02/03 10:14:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.4-part2
16/02/03 10:14:52 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/03 10:14:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [reducer2.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar8293456977848402676/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob2176540031542185349.jar tmpDir=null


In [804]:
# Show the result
!hdfs dfs -cat hw3.4-part2/part-00000

16/02/03 10:15:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Top 50 pairs	
DAI62779,ELE17451,1592,0.051188	
FRO40251,SNA80324,1412,0.045400	
DAI75645,FRO40251,1254,0.040320	
FRO40251,GRO85051,1213,0.039002	
DAI62779,GRO73461,1139,0.036623	
DAI75645,SNA80324,1130,0.036333	
DAI62779,FRO40251,1070,0.034404	
DAI62779,SNA80324,923,0.029678	
DAI62779,DAI85309,918,0.029517	
ELE32164,GRO59710,911,0.029292	
DAI62779,DAI75645,882,0.028359	
FRO40251,GRO73461,882,0.028359	
DAI62779,ELE92920,877,0.028198	
FRO40251,FRO92469,835,0.026848	
DAI62779,ELE32164,832,0.026752	
DAI75645,GRO73461,712,0.022893	
DAI43223,ELE32164,711,0.022861	
DAI62779,GRO30386,709,0.022797	
ELE17451,FRO40251,697,0.022411	
DAI85309,ELE99737,659,0.021189	
DAI62779,ELE26917,650,0.020900	
GRO21487,GRO73461,631,0.020289	
DAI62779,SNA45677,604,0.019421	
ELE17451,SNA80324,597,0.019196	
DAI62779,GRO71621,595,0.019131	
DAI62779,SNA55762,593,0.019067	

### Report 

Compute time: 46 + 16 = 62 sec   
Setup: Quad Core, OS X, 3 mappers, 1 reducer  
Mapper called: 3 + 2 = 5 times  
Reducer called: 1 + 1 = 2 times  

## HW3.5: Stripes
Repeat 3.4 using the stripes design pattern for finding cooccuring pairs.

Report  the compute times for stripes job versus the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)

Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs

In [751]:
%%writefile mapper_3.5_1.py
#!/usr/bin/python
import sys
import re
from collections import OrderedDict

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for line in sys.stdin:
    items = line.strip().split()
    
    # Use order inversion to help calculate the total # of baskets
    print "#\t1" # Use # as the special key
    
    # First ignore the corner case
    if len(items) <= 1:
        continue
        
    items = sorted(items) # Sort it because we will list pairs info in sorted order
        
    for i in range(len(items) - 1):
        cooccur = OrderedDict()  
        x = items[i]
        for j in range(i+1, len(items)):
            y = items[j]
            if x == y:
                continue # skip duplicate items
            
            if y in cooccur:
                cooccur[y] += 1
            else:
                cooccur[y] = 1
        
        # Output the coocurr info about x
        stripe = ",".join([k+","+str(v) for k,v in cooccur.items()])
        print "%s\t%s" % (x, stripe)


Overwriting mapper_3.5_1.py


In [813]:
%%writefile reducer_3.5_1.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys
from collections import Counter
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--threshold", type=int, default=100)
args = parser.parse_args()
supportThreshold = args.threshold

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')

stripeCombined = Counter() # Stripe info about the current item
totalCount = 0 # Total number of baskets
curr = None # The current item

# Read the stripe input and convert it into a dict
def readStripe(data):
    stripe = Counter()
    
    # E.g. SNA90258,1,ELE17451,1,ELE89019,1,GRO99222,1
    items = data.split(',')
    for i in range(0, len(items), 2):
        stripe[items[i]] = int(items[i+1])
    return stripe

def outputStripe(item, stripe):
    # only keep those which pass the threshold
    filtered = {k:v for k,v in stripe.items() if v >= supportThreshold}
    
    # Output if we have at least one pair which has met the threshold
    if len(filtered) > 0:
        print "%s\t%s" % (item, ",".join([":".join([k, str(v), str(v/totalCount)]) 
                             for k,v in filtered.items()]))

# input comes from STDIN
for line in sys.stdin:
    (item, data) = line.strip().split('\t')

    # Find out the total basket count.
    # We use # as the special key for order inversion.
    if item == "#":
        totalCount += int(data)
        continue
    
    # If we have encountered a new item, output the stripe info of the last item
    if curr and curr != item:
        outputStripe(curr, stripeCombined)
        stripeCombined.clear()
            
    # Merge the stripe info of this input into stripeCombined
    stripeCombined = stripeCombined + readStripe(data)
    
    curr = item

# Handle the last pair seen
if curr is not None:
    outputStripe(curr, stripeCombined)

Overwriting reducer_3.5_1.py


### Quick test for mapper/reducer 1

In [815]:
# Note: have to change threshold to 1
!head -n 10 ProductPurchaseData.txt | \
python mapper_3.5_1.py | \
sort -t$'\t' -k1,1 | \
python reducer_3.5_1.py --threshold 1 


reporter:counter:custom,mapper_called,1
reporter:counter:custom,reducer_called,1
DAI14125	FRO18919:1:0.1,SNA90258:1:0.1,ELE11375:1:0.1,ELE17451:1:0.1,FRO78087:1:0.1,ELE28573:1:0.1,SNA69641:1:0.1
DAI22177	DAI91535:1:0.1,DAI49199:1:0.1,FRO18919:1:0.1,SNA91554:1:0.1,SNA80192:2:0.2,SNA85662:2:0.2,ELE17451:2:0.2,FRO81176:1:0.1,GRO94758:1:0.1,DAI22896:1:0.1,ELE23393:1:0.1,SNA90258:1:0.1,ELE66810:1:0.1,ELE94711:1:0.1,DAI46755:1:0.1,ELE59935:1:0.1,GRO73461:1:0.1
DAI22896	SNA99873:2:0.2,DAI91535:1:0.1,DAI49199:1:0.1,FRO18919:1:0.1,DAI50921:1:0.1,SNA80192:2:0.2,SNA85662:1:0.1,ELE17451:3:0.3,FRO81176:1:0.1,FRO86643:1:0.1,SNA90258:1:0.1,ELE66810:1:0.1,ELE94711:1:0.1,DAI46755:1:0.1,GRO94758:1:0.1,GRO75578:1:0.1,GRO73461:3:0.3
DAI46755	DAI91535:1:0.1,DAI49199:1:0.1,SNA80192:1:0.1,SNA85662:1:0.1,ELE17451:1:0.1,FRO81176:1:0.1,SNA90258:1:0.1,ELE66810:1:0.1,ELE94711:1:0.1,GRO94758:1:0.1,GRO73461:1:0.1
DAI49199	DAI91535:1:0.1,SNA80192:1:0.1,SNA85662:1:0.1,ELE17451:1:0.1,FRO81176:1:0.1,SNA90258:1:0.1,ELE6

In [775]:
%%writefile mapper_3.5_2.py
#!/usr/bin/python
import sys
import re

sys.stderr.write('reporter:counter:custom,mapper_called,1\n')

# input comes from STDIN (standard input)
for line in sys.stdin:
    # E.g.
    # SNA80192	SNA90258:1:0.1,SNA85662:2:0.2,SNA91554:1:0.1,SNA99873:1:0.1
    (item, data) = line.strip().split("\t")
    
    # Output all the pair info
    for pair in data.split(","):
        info = pair.split(":")
        print "\t".join([item, info[0], info[1], info[2]])

Overwriting mapper_3.5_2.py


In [776]:
%%writefile reducer_3.5_2.py
#!/usr/bin/python
from __future__ import division # Use Python 3-style division
import sys, Queue

sys.stderr.write('reporter:counter:custom,reducer_called,1\n')

n_max = 50
a_max = []

# input comes from STDIN
for line in sys.stdin:
    if len(a_max) < n_max:
        a_max.append(line)
    else:
        break

# Output the result
print "Top %d pairs" % n_max
for line in a_max:
    rec = line.strip().split('\t')
    print ",".join(rec)

Overwriting reducer_3.5_2.py


### Quick test for both mapper/reducer 1 and 2

In [816]:
## Note: have to change threshold to 1
!head -n 1000 ProductPurchaseData.txt | \
python mapper_3.5_1.py | \
sort -t$'\t' -k1,1 | \
python reducer_3.5_1.py --threshold 1 | \
python mapper_3.5_2.py | \
sort -t$'\t' -k3,3rn -k1,2 | \
python reducer_3.5_2.py 

reporter:counter:custom,mapper_called,1
reporter:counter:custom,mapper_called,1
reporter:counter:custom,reducer_called,1
reporter:counter:custom,reducer_called,1
Top 50 pairs
FRO40251,GRO73461,84,0.084
DAI62779,GRO73461,79,0.079
GRO73461,SNA80324,76,0.076
DAI75645,FRO40251,73,0.073
FRO40251,SNA80324,69,0.069
DAI75645,SNA80324,68,0.068
DAI75645,GRO73461,67,0.067
ELE17451,GRO73461,63,0.063
FRO73056,GRO73461,48,0.048
ELE12792,SNA69641,46,0.046
FRO40251,GRO85051,46,0.046
DAI50913,ELE38289,44,0.044
DAI62779,ELE17451,38,0.038
FRO26482,GRO73461,38,0.038
FRO40251,FRO92469,37,0.037
FRO73056,GRO44993,37,0.037
GRO56726,GRO73461,37,0.037
DAI50913,ELE24064,36,0.036
GRO15017,GRO73461,36,0.036
GRO44993,GRO73461,36,0.036
DAI50913,SNA46714,35,0.035
ELE26917,GRO99222,34,0.034
FRO85978,SNA95666,34,0.034
DAI62779,SNA80324,33,0.033
ELE73604,GRO73461,33,0.033
DAI62779,DAI75645,32,0.032
ELE24064,SNA46714,32,0.032
GRO73461,SNA69641,32,0.032
ELE17451,FRO40251,31,0.031
GRO61133,GRO73461,31,0.031
DAI22896,GRO734

### Run it in Hadoop

In [805]:
# Part 1

!hdfs dfs -rm -r hw3.5-part1

!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapreduce.job.name='HW3.5-part1' \
-D mapred.map.tasks=3 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=1 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1,1" \
-file mapper_3.5_1.py -mapper mapper_3.5_1.py \
-file reducer_3.5_1.py -reducer reducer_3.5_1.py \
-input ProductPurchaseData.txt \
-output hw3.5-part1

16/02/03 10:16:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.5-part1
16/02/03 10:16:10 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/03 10:16:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper_3.5_1.py, reducer_3.5_1.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar8236972170776780045/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob6091141613170820410.jar tmpDir=null


In [806]:
# Part 2

!hdfs dfs -rm -r hw3.5-part2

# For sorting, use the 1st field (count) as the primary key, in reverse order.
!hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-D mapreduce.job.name='HW3.5-part2' \
-D mapred.map.tasks=4 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=3 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k3,3rn -k1,2" \
-file mapper_3.5_2.py -mapper mapper_3.5_2.py \
-file reducer_3.5_2.py -reducer reducer_3.5_2.py \
-input hw3.5-part1/part-* \
-output hw3.5-part2

16/02/03 10:19:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted hw3.5-part2
16/02/03 10:19:58 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/03 10:19:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [mapper_3.5_2.py, reducer_3.5_2.py, /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/hadoop-unjar8809908029816802731/] [] /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/streamjob2213618716350449340.jar tmpDir=null


In [783]:
# Show the result
!hdfs dfs -cat hw3.5-part2/part-00000

16/02/03 00:25:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Top 50 pairs	
DAI62779,ELE17451,1592,0.0511880646925	
FRO40251,SNA80324,1412,0.0454004694383	
DAI75645,FRO40251,1254,0.0403202469374	
FRO40251,GRO85051,1213,0.0390019613517	
DAI62779,GRO73461,1139,0.0366226166361	
DAI75645,SNA80324,1130,0.0363332368734	
DAI62779,FRO40251,1070,0.0344040384554	
DAI62779,SNA80324,923,0.0296775023311	
DAI62779,DAI85309,918,0.0295167357963	
ELE32164,GRO59710,911,0.0292916626475	
DAI62779,DAI75645,882,0.0283592167454	
FRO40251,GRO73461,882,0.0283592167454	
DAI62779,ELE92920,877,0.0281984502106	
FRO40251,FRO92469,835,0.026848011318	
DAI62779,ELE32164,832,0.0267515513971	
DAI75645,GRO73461,712,0.0228931545609	
DAI43223,ELE32164,711,0.022861001254	
DAI62779,GRO30386,709,0.02279669464	
ELE17451,FRO40251,697,0.0224108549564	
DAI85309,ELE99737,659,0.0211890292917	
DAI62779,ELE26917,650,0.020899649529	
GRO21487,GRO73461

### Report 

Compute time: 3min 40sec+ 18sec = 3min 58sec   
Setup: Quad Core, OS X, 4 mappers, 1 reducer  
Mapper called: 3 + 3 = 6 times  
Reducer called: 1 + 1 = 2 times  