# MIDS UC Berkeley - Machine Learning at Scale
## DATSCIW261 ASSIGNMENT #3  

[James Gray](https://github.com/jamesgray007)   
jamesgray@ischool.berkeley.edu   
Time of Initial Submission: 12:45 PM US Central, Sunday, June 5, 2016  
Time of **Resubmission**:  
W261-1, Spring 2016  
Week 3 Homework

## References for this Assignment

* x



## HW3.0

* **_How do you merge  two sorted  lists/arrays of records of the form [key, value]? Where is this used in Hadoop MapReduce? [Hint within the shuffle]_**

![merge](img/mergesort.png)

The "merge sort" algorithm merges two sorted lists into a third list. Pointers are set at the start of each list.  The pointer picks off the first number of the first list and another pointer picks off the first number of the second list. The smaller value is copied to the third list and the pointer is moved to the right of the list with the smaller number and the pointer of the third list (merged list) is also moved. This process is repeated until all of the two lists have been read.

* **_What is  a combiner function in the context of Hadoop? _**

A combiner is a mapper-side function that consolidates key-value pairs with the same key before it enters the Hadoop shuffle process.  This reduces network congestion between mappers and reducers.

* **_Give an example where it can be used and justify why it should be used in the context of this problem._**

For example, if we were computing a word count on a document corpus instead of emitting key-value pairs as "word, 1", we could "reduce" on the mapper side by counting the frequency of each word before sending the data to the reducer.  This would reduce network traffic. 

* **_What is the Hadoop shuffle? _**

The Hadoop shuffle is the heart of the map-reduce framework that transfers and synchronizes data between the map and reduce process.  There are three parts to the shuffle: 

1. partition - the partitioner organizes the mapper output by key and creates a file for each reducer.
2. sort - after the data is partitioned into files the sort function sorts the data.
3. combine - the combine function uses the sorted data and combines key-value pairs with the same key.

The data is directed to the reducer(s) once the three shuffle steps have been completed.  See [Wikipedia](https://en.wikipedia.org/wiki/Merge_sort) for more information.


## HW3.1 - Consumer Complaints dataset - Use Counters to do EDA

Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc. 

While processing information/data using MapReduce job, it is a challenge to monitor the progress of parallel threads running across nodes of distributed clusters. Moreover, it is also complicated to distinguish between the data that has been processed and the data which is yet to be processed. The MapReduce Framework offers a provision of user-defined Counters, which can be effectively utilized to monitor the progress of data across nodes of distributed clusters.

Use the Consumer Complaints  Dataset provide here to complete this question:

     https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0

The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:

Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?

#### User-defined Counters

Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. **Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).**

Hadoop offers Job Tracker, an UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. ** Screenshot your job tracker UI as your job completes and include it here. Make sure that your user defined counters are visible. **



### HW3.1 - Mapper 

In [3]:
%%writefile mapper31.py
#!/usr/bin/python
## mapper31.py
## Author: James Gray
## Description: mapper code for HW3.1

import sys
for line in sys.stdin:
    line=line.strip()
    product=line.split(',')[1] #extract product field from second field
    
    # populate 3 counters based on Consumer_Complaints.csv product type
    if product=='Debt collection':
        sys.stderr.write("reporter:counter:Debt,Total,1\n")
    if product=='Mortgage':
        sys.stderr.write("reporter:counter:Mortgage,Total,1\n")
    else:
        sys.stderr.write("reporter:counter:Other,Total,1\n")
    print product+'\t1'

Overwriting mapper31.py


### HW3.1 - Reducer

In [4]:
%%writefile reducer31.py
#!/usr/bin/python
## reducer31.py
## Author: James Gray
## Description: reducer code for HW3.1

import sys
for line in sys.stdin:
    line=line.strip()
    print line

Writing reducer31.py


In [5]:
# set file priveleges to execute script
!chmod a+x reducer31.py
!chmod a+x mapper31.py

### HW3.1 - Populate HDFS

In [8]:
# transfer Consumer_Complaints.csv into HDFS

#!hdfs dfs -put Consumer_Complaints.csv /user/graymatter/consumer_complaints.csv

!hdfs dfs -ls /user/graymatter/*

16/05/26 14:29:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 jamesgray supergroup   50906486 2016-05-26 14:29 /user/graymatter/consumer_complaints.csv
-rw-r--r--   1 jamesgray supergroup     202254 2016-05-23 18:10 /user/graymatter/enronemail_1h.txt
Found 2 items
-rw-r--r--   1 jamesgray supergroup          0 2016-05-24 23:07 /user/graymatter/hw21-output/_SUCCESS
-rw-r--r--   1 jamesgray supergroup     108879 2016-05-24 23:07 /user/graymatter/hw21-output/part-00000
Found 2 items
-rw-r--r--   1 jamesgray supergroup          0 2016-05-21 14:46 /user/graymatter/hw21-output.txt/_SUCCESS
-rw-r--r--   1 jamesgray supergroup     108879 2016-05-21 14:46 /user/graymatter/hw21-output.txt/part-00000
Found 2 items
-rw-r--r--   1 jamesgray supergroup          0 2016-05-21 17:22 /user/graymatter/hw22-output/_SUCCESS
-rw-r--r--   1 jamesgray supergroup         14 2016-05-21 17:22 /user/graymatter/hw2

### HW3.1 - Execute MR Job

In [12]:
# delete output directory
!hdsf dfs -rm -r /user/graymatter/hw31-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-files mapper31.py,reducer31.py -mapper mapper31.py -reducer reducer31.py \
-input /user/graymatter/consumer_complaints.csv -output /user/graymatter/hw31-output

16/05/26 14:39:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/26 14:39:45 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/26 14:39:45 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/26 14:39:45 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/26 14:39:46 ERROR streaming.StreamJob: Error Launching job : Output directory hdfs://localhost:9000/user/graymatter/hw31-output already exists
Streaming Command Failed!


### HW3.1 - Counter Output

![counters](img/counters.png)

## HW3.2 - Analyze the performance of your Mappers, Combiners and Reducers using Counters

For this brief study the Input file will be one record (the next line only): 
** foo foo quux labs foo bar quux **

#### Part 1 ####
Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here) using user defined Counters to ** count up how many time the mapper and reducer are called**. What is the value of your user defined Mapper Counter, and Reducer Counter after completing this word count job. The answer  should be 1 and 4 respectively. Please explain.  Please use mulitple mappers and reducers for these jobs (at least 2 mappers and 2 reducers). 

#### Part 2 ####
Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere)  using user defined Counters to count up how many time the mapper and reducer are called. ** What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job**. 

#### Part 3 ####
Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to ** count up how many time the mapper, combiner, reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job.** 

#### Part 4 ####
Using a single reducer: 

* What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. 
* If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). Please use a combiner.


### Populate HDFS with file

In [24]:
# the text "foo foo quux labs foo bar quux" was added to "singleline.txt file

#!hdfs dfs -put singleline.txt /user/graymatter/singleline.txt

!hdfs dfs -cat /user/graymatter/singleline.txt

16/05/27 07:58:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
foo foo quux labs foo bar quux

### HW3.2 - Part 1 - Mapper

In [435]:
%%writefile mapper321.py
#!/usr/bin/python
## mapper321.py
## Author: James Gray
## Description: mapper code for HW3.2 Part 1

import sys
import re

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

# input comes from STDNIN and is specified in the Hadoop Streaming job
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # create tokens
    words = line.split()
    
    # iterate through words
    for word in words:
        # create key-value pair for each word and send to STDOUT
        print('%s\t%s') % (word, 1)

Overwriting mapper321.py


### HW3.2 - Part 1 - Reducer

Code based on example from Michael G. Noll http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

In [436]:
%%writefile reducer321.py
#!/usr/bin/python
## reducer321.py
## Author: James Gray
## Description: reducer code for HW3.2 Part 1

from operator import itemgetter
import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer Counter,Calls,1\n")

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Overwriting reducer321.py


### HW3.1 - Part 1 - Execute MR Job

In [437]:
# delete output directory and files
!hdfs dfs -rm -r /user/graymatter/hw31_job1-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=4 \
-files mapper321.py,reducer321.py -mapper mapper321.py -reducer reducer321.py \
-input /user/graymatter/singleline.txt -output /user/graymatter/hw31_job1-output

16/05/30 14:59:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 14:59:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw31_job1-output
16/05/30 14:59:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 14:59:41 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/30 14:59:41 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/30 14:59:41 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/30 14:59:41 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/30 14:59:41 INFO mapreduce.JobSubmitter: number of splits:1
16/05/30 14:59:41 INFO Configura

### HW3.1 - Part 1 - Output Analysis

The output confirmed that the Mapper was called "1" time and the Reducer was called "4" times

![output](img/321output.png)

In [50]:
!hdfs dfs -cat /user/graymatter/hw31_job1-output/*

16/05/27 09:20:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
quux	2
foo	3
bar	1
labs	1


### HW3.2 - Part 2 - Mapper

In [438]:
%%writefile mapper322.py
#!/usr/bin/python
## mapper321.py
## Author: James Gray
## Description: mapper code for HW3.2 Part 2

import sys
import re
from csv import reader

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

WORD_RE = re.compile(r"[\w']+")

# input comes from STDNIN and is specified in the Hadoop Streaming job
for line in reader(sys.stdin): # returns list of strings
    # create tokens
    issues=re.findall(WORD_RE, line[3]) #extract issue (4th field)
    
    # iterate through issues and produce KV pairs
    for issue in issues:
        # create key-value pair for each word and send to STDOUT
        print('%s\t%s') % (issue, 1)

Overwriting mapper322.py


In [65]:
!cat Consumer_Complaints.csv | ./mapper322.py > test.txt

reporter:counter:Mapper,Calls,1


### HW3.2 - Part 2 - Reducer

In [439]:
%%writefile reducer322.py
#!/usr/bin/python
## reducer321.py
## Author: James Gray
## Description: reducer code for HW3.2 Part 2

from operator import itemgetter
import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer Counter,Calls,1\n")

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Overwriting reducer322.py


In [66]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw322-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-files mapper322.py,reducer322.py -mapper mapper322.py -reducer reducer322.py \
-input /user/graymatter/consumer_complaints.csv -output /user/graymatter/hw322-output

16/05/27 10:14:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 10:14:52 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw322-output
16/05/27 10:14:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 10:14:53 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 10:14:53 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 10:14:53 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 10:14:54 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 10:14:54 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 10:14:54 INFO Configuration

### HW3.2 - Part 2 - Output

![output](img/322aoutput.png)

In [68]:
!hdfs dfs -cat /user/graymatter/hw322-output/*

16/05/27 10:18:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
APR	3431
Account	16555
Applied	139
Arbitration	168
Bankruptcy	222
Billing	8158
Can't	1999
Cash	240
Closing	2795
Cont'd	17972
Convenience	75
Credit	14768
Debt	1343
Delinquent	1061
Deposits	10555
Disclosure	7655
False	3621
I	925
Incorrect	29133
Issue	1
Making	3226
Overlimit	127
Payoff	1155
Received	118
Repaying	3844
Sale	139
Settlement	4350
Unable	4357
Unsolicited	640
Workout	350
Wrong	98
a	3503
account	40893
acct	163
an	2964
and	16448
available	274
being	5663
broker	8625
by	5663
caused	5663
changes	350
charges	131
checks	75
closing	16205
company's	4858
credit	40483
debt	26531
delay	243
determination	1490
did	139
disputes	6938
escrow	36767
expect	807
fees	807
for	929
foreclosure	70487
issuance	640
issue	1098
management	16205
not	18477
of	13983
on	29069
or	40508
owed	17972
payments	39993
process	5505
processing	243
promised	274
protection	4139

### HW3.2 - Part 3 - Mapper

This mapper is identical to the original Mapper in Part 1

In [69]:
%%writefile mapper323.py
#!/usr/bin/python
## mapper321.py
## Author: James Gray
## Description: mapper code for HW3.2 Part 3

import sys
import re
from csv import reader

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

WORD_RE = re.compile(r"[\w']+")

# input comes from STDNIN and is specified in the Hadoop Streaming job
for line in reader(sys.stdin): # returns list of strings
    # create tokens
    issues=re.findall(WORD_RE, line[3]) #extract issue (4th field)
    
    # iterate through issues and produce KV pairs
    for issue in issues:
        # create key-value pair for each word and send to STDOUT
        print('%s\t%s') % (issue, 1)

Overwriting mapper323.py


### HW3.2 - Part 3 - Combiner

In this part we will use a combiner which will aggregate the word frequencies in the mapper and emit key-value pair for the unique word and count.  Combiners actually act as "mini-reducers" that process the output of mappers (page 42 - MapReduce Algorithm Design).  So in this scenario we will use the "reducer" code that performs the aggregation as the combiner.

In [440]:
%%writefile combiner323.py
#!/usr/bin/python
## combiner321.py
## Author: James Gray
## Description: combiner code for HW3.2 Part 3

from operator import itemgetter
import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Combiner Counter,Calls,1\n")

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Overwriting combiner323.py


### HW3.2 - Part 3 - Reducer

In [441]:
%%writefile reducer323.py
#!/usr/bin/python
## reducer323.py
## Author: James Gray
## Description: reducer code for HW3.2 Part 3

from operator import itemgetter
import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer Counter,Calls,1\n")

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Overwriting reducer323.py


In [43]:
# set file priveleges to execute script
!chmod a+x reducer323.py
!chmod a+x mapper323.py
!chmod a+x combiner323.py

### HW3.2 - Part 3 - Execute MR Job

In [70]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw323-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-files mapper323.py,reducer323.py,combiner323.py -mapper mapper323.py -reducer reducer323.py -combiner combiner323.py  \
-input /user/graymatter/consumer_complaints.csv -output /user/graymatter/hw323-output

16/05/27 10:20:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 10:20:25 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw323-output
16/05/27 10:20:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 10:20:27 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 10:20:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 10:20:27 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 10:20:27 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 10:20:27 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 10:20:27 INFO Configuration

### HW3.2 - Part 3 - Output Summar

![output](img/333output.png)

In [71]:
!hdfs dfs -cat /user/graymatter/hw323-output/*

16/05/27 10:20:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
APR	3431
Account	16555
Applied	139
Arbitration	168
Bankruptcy	222
Billing	8158
Can't	1999
Cash	240
Closing	2795
Cont'd	17972
Convenience	75
Credit	14768
Debt	1343
Delinquent	1061
Deposits	10555
Disclosure	7655
False	3621
I	925
Incorrect	29133
Issue	1
Making	3226
Overlimit	127
Payoff	1155
Received	118
Repaying	3844
Sale	139
Settlement	4350
Unable	4357
Unsolicited	640
Workout	350
Wrong	98
a	3503
account	40893
acct	163
an	2964
and	16448
available	274
being	5663
broker	8625
by	5663
caused	5663
changes	350
charges	131
checks	75
closing	16205
company's	4858
credit	40483
debt	26531
delay	243
determination	1490
did	139
disputes	6938
escrow	36767
expect	807
fees	807
for	929
foreclosure	70487
issuance	640
issue	1098
management	16205
not	18477
of	13983
on	29069
or	40508
owed	17972
payments	39993
process	5505
processing	243
promised	274
protection	4139

### HW3.2 - Part 4

Using a single reducer: 

What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. 

If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). 

In [128]:
%%writefile mapper324.py
#!/usr/bin/python
## mapper324.py
## Author: James Gray
## Description: mapper code for HW3.2 Part 4

import sys
import re
from csv import reader

word_total = 0

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

WORD_RE = re.compile(r"[\w']+")

# input comes from STDNIN and is specified in the Hadoop Streaming job
for line in reader(sys.stdin): # returns list of strings
    # create tokens
    issues=re.findall(WORD_RE, line[3]) #extract issue (4th field)
    
    # iterate through issues and produce KV pairs
    for issue in issues:
        word_total+=1
        # create key-value pair for each word and send to STDOUT
        print('%s\t%s') % (issue.lower(), 1)
        
# emit total number of words for calculating relative frequency
print('*WORD.TOTAL' + '\t' + str(word_total))

Overwriting mapper324.py


In [131]:
%%writefile reducer324.py
#!/usr/bin/python
## reducer324.py
## Author: James Gray
## Description: reducer code for HW3.2 Part 4

from __future__ import division
from operator import itemgetter

import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer Counter,Calls,1\n")

current_word = None
current_count = 0
word = None
word_total = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
        
    # total the words to calculate a relative frequency
    if word == "*WORD.TOTAL":
        word_total = count

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            word_relative = current_count/word_total
            print '%s\t%s\t%s' % (current_word, str(current_count), str(word_relative))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    # reverse the output so we can sort on the current_count
    word_relative = current_count/word_total
    print '%s\t%s\t%s' % (current_count, str(current_count), str(word_relative))

Overwriting reducer324.py


In [132]:
# set file priveleges to execute script
!chmod a+x reducer324.py
!chmod a+x mapper324.py

In [129]:
!cat Consumer_Complaints.csv | ./mapper324.py > test324.txt

reporter:counter:Mapper,Calls,1


In [147]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw324-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-files mapper324.py,reducer324.py -mapper mapper324.py -reducer reducer324.py \
-input /user/graymatter/consumer_complaints.csv -output /user/graymatter/hw324-output

16/05/27 22:06:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 22:06:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw324-output
16/05/27 22:06:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/27 22:06:47 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 22:06:47 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 22:06:47 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 22:06:47 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/27 22:06:47 INFO mapreduce.JobSubmitter: number of splits:1
16/05/27 22:06:47 INFO Configuration

In [134]:
!hdfs dfs -cat /user/graymatter/hw324-output/*

16/05/27 20:23:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*WORD.TOTAL	1348309	1.0
a	3503	0.00259806913697
account	57448	0.0426074438426
acct	163	0.000120892169377
action	2964	0.0021983091413
advance	240	0.000178000740186
advertising	1193	0.000884812012677
amount	98	7.26836355761e-05
amt	71	5.26585523051e-05
an	2964	0.0021983091413
and	16448	0.0121989840608
application	8868	0.00657712734989
applied	139	0.000103092095358
apply	118	8.75170305917e-05
apr	3431	0.00254466891491
arbitration	168	0.00012460051813
are	3821	0.00283392011772
atm	2422	0.00179632413638
attempts	17972	0.013329288761
available	274	0.000203217511713
balance	597	0.000442776841214
bank	202	0.000149817289657
bankruptcy	222	0.000164650684672
being	5663	0.00420007579865
billing	8158	0.00605054182684
broker	8625	0.00639690160045
by	5663	0.00420007579865
can't	1999	0.0014825978318
cancelling	2795	0.00207296695342
card	4405	0.003267055252

In [137]:
%%writefile identity.py
#!/usr/bin/python

import sys
for line in sys.stdin:
    print line.strip()

Writing identity.py


In [144]:
!chmod +x identity.py

### H3.2 - Part 4 - MR Job with Sorting

In [149]:
!hdfs dfs -rm -r user/graymatter/hw324-output-sorted 

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
 -D mapred.map.tasks=2 \
 -D mapred.reduce.tasks=1 \
 -D stream.num.map.output.key.fields=2 \ # word and value
 -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
 -D  mapred.text.key.comparator.options='-k2,2nr' \ #sort on 2nd field(value), numeric, reverse
 -file ./identity.py    -mapper ./identity.py \
  -reducer ./identity.py \
 -input /user/graymatter/hw324-output -output /user/graymatter/hw324_output-sorted


16/05/27 22:09:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `user/graymatter/hw324-output-sorted': No such file or directory
16/05/27 22:09:03 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/05/27 22:09:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [./identity.py] [] /var/folders/ld/9wpyxfw13t7_pdv_0b8958x40000gn/T/streamjob3744883328114669627.jar tmpDir=null
16/05/27 22:09:05 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/27 22:09:05 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/27 22:09:05 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/27 22:09:05 INFO mapred.FileInputFormat:

In [151]:
!hdfs dfs -cat /user/graymatter/hw324_output-sorted/*

16/05/27 22:11:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*WORD.TOTAL	1348309	1.0
loan	119630	0.0887259522854
collection	72394	0.0536924399377
foreclosure	70487	0.052278075723
modification	70487	0.052278075723
account	57448	0.0426074438426
credit	55251	0.0409779954002
or	40508	0.0300435582645
payments	39993	0.0296615983428
servicing	36767	0.0272689717268
escrow	36767	0.0272689717268
report	34903	0.0258864993114
incorrect	29133	0.0216070648494
information	29069	0.0215595979853
on	29069	0.0215595979853
debt	27874	0.0206733026332
closing	19000	0.0140917252648
not	18477	0.0137038319851
attempts	17972	0.013329288761
owed	17972	0.013329288761
collect	17972	0.013329288761
cont'd	17972	0.013329288761
and	16448	0.0121989840608
opening	16205	0.0120187583113
management	16205	0.0120187583113
of	13983	0.0103707681251
my	10731	0.00795885809558
withdrawals	10555	0.00782832421945
deposits	10555	0.00782832421945
p

In [166]:
# Generate top 50 words 
print("Top 50 words - word, frequency, relative frequency")
!hdfs dfs -cat /user/graymatter/hw324_output-sorted/* | head -50
print("")

# Generate least common 10 words 
print("Least common 10 words - word, frequency, relative frequency")
!hdfs dfs -cat /user/graymatter/hw324_output-sorted/* | tail -10

Top 50 words - word, frequency, relative frequency
16/05/28 13:39:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*WORD.TOTAL	1348309	1.0
loan	119630	0.0887259522854
collection	72394	0.0536924399377
foreclosure	70487	0.052278075723
modification	70487	0.052278075723
account	57448	0.0426074438426
credit	55251	0.0409779954002
or	40508	0.0300435582645
payments	39993	0.0296615983428
servicing	36767	0.0272689717268
escrow	36767	0.0272689717268
report	34903	0.0258864993114
incorrect	29133	0.0216070648494
information	29069	0.0215595979853
on	29069	0.0215595979853
debt	27874	0.0206733026332
closing	19000	0.0140917252648
not	18477	0.0137038319851
attempts	17972	0.013329288761
owed	17972	0.013329288761
collect	17972	0.013329288761
cont'd	17972	0.013329288761
and	16448	0.0121989840608
opening	16205	0.0120187583113
management	16205	0.0120187583113
of	13983	0.0103707681251
my	10731	0.00795885809558
withdrawals	10555

## HW3.2.1

Using 2 reducers: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). Please use a combiner.



### HW3.2.1 Mapper

Since we have two reducers, we will emit KV pairs that will get routed to two partitioners

In [509]:
%%writefile mapper3215.py
#!/usr/bin/python
## mapper3215.py
## Author: James Gray
## Description: mapper code for HW3.2.1

import sys
import re
from csv import reader

word_total = 0

# Create two partitioners for the reducers
partition1 = "abcdefghijklm"
partition2 = "nopqrstuvwxyz"

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

WORD_RE = re.compile(r"[\w']+")

# input comes from STDNIN and is specified in the Hadoop Streaming job
for line in reader(sys.stdin): # returns list of strings
    # create tokens
    issues=re.findall(WORD_RE, line[3]) #extract issue (4th field)
    
    # iterate through issues and produce KV pairs
    for issue in issues:
        word_total+=1
        if issue[0].lower() in partition1:
            partitionKey = "a"
        else:
            partitionKey = "b"
            
        # create key-value pair for each word and send to STDOUT
        print('%s\t%s\t%s') % (issue.lower(), 1, partitionKey)
        
# emit total number of words for calculating relative frequency
print('*WORD.TOTAL' + '\t' + str(word_total), 1)

Overwriting mapper3215.py


### HW3.2.1 Reducer

In [510]:
%%writefile reducer3215.py
#!/usr/bin/python
## reducer3215.py
## Author: James Gray
## Description: reducer code for HW3.2.1

from __future__ import division
from operator import itemgetter

import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer Counter,Calls,1\n")

current_word = None
current_count = 0
word = None
word_total = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count, partition = line.split('\t')

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
        
    # total the words to calculate a relative frequency
    if word == "*WORD.TOTAL":
        word_total = count

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            word_relative = current_count/word_total
            print '%s\t%s\t%s' % (current_word, str(current_count), str(word_relative))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    # reverse the output so we can sort on the current_count
    word_relative = current_count/word_total
    print '%s\t%s\t%s' % (current_count, str(current_count), str(word_relative))

Overwriting reducer3215.py


In [511]:
!chmod +x mapper3215.py
!chmod +x reducer3215.py

In [512]:
!cat Consumer_Complaints.csv | ./mapper3215.py > test3215.txt

reporter:counter:Mapper Counter,Calls,1


In [513]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw3215-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D stream.map.output.field.separator="\t" \ 
-D mapreduce.partition.keypartitioner.options="-k3,3" \ 
-D mapreduce.partition.keycomparator.options="-k2,2nr" \  
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=2 \
-files mapper3215.py,reducer3215.py -mapper mapper3215.py -reducer reducer3215.py \
-input /user/graymatter/consumer_complaints.csv -output /user/graymatter/hw3215-output \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \


SyntaxError: invalid syntax (<ipython-input-513-9a01b722bb0f>, line 5)

In [497]:
!hdfs dfs -cat /user/graymatter/hw3215-output/*

16/05/31 08:37:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1348309	1348309	1.0


## HW3.3 - Shopping Cart Analysis

Product Recommendations: The action or practice of selling additional products or services to existing customers is called cross-selling. Giving product recommendation is one of the examples of cross-selling that are frequently used by online retailers. One simple method to give product recommendations is to recommend products that are frequently
browsed together by the customers.
	
For this homework use the online browsing behavior dataset located at: 

       https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Each line in this dataset represents a browsing session of a customer. On each line, each string of 8 characters represents the id of an item browsed during that session. The items are separated by spaces.

**_Here are the first few lines of the ProductPurchaseData_**   
FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 
GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 
ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 
ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 
ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 

Do some exploratory data analysis of this dataset guided by the following questions: 

* How many unique items are available from this supplier?

Using a single reducer: 

* Report your findings such as number of unique products; 
* largest basket; 
* report the top 50 most frequently purchased items,  their frequency,  and their relative frequency (break ties by sorting the products alphabetical order) etc. using Hadoop Map-Reduce. 

In [442]:
# Load text file into HDFS

#!hdfs dfs -put ProductPurchaseData.txt /user/graymatter/ProductPurchaseData.txt

!hdfs dfs -ls /user/graymatter/ProductPurchaseData.txt

16/05/30 15:02:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 jamesgray supergroup    3458517 2016-05-28 14:59 /user/graymatter/ProductPurchaseData.txt


### HW3.3 - Mapper

Given the calculations that are required we must:

* produce KV pairs that include the products but also by basket since there is question regarding the largest basket size.  
* count the total number of products to calculate relative frequency.
* count the number of products by basket to determine the largest basket size

In [443]:
%%writefile mapper33.py
#!/usr/bin/python
## mapper33.py
## Author: James Gray
## Description: reducer code for HW3.3

import sys
basket_number = 0
total_products = 0

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

for line in sys.stdin:
    line = line.strip()
    products = line.split() # each line is split into products using a space between products
    
    # create basket ID
    basket_id = "basket_" + str(basket_number)
    
    for product in products:
        total_products+=1 # increment to calculate total number of products across all buckets
        # emit K-V pairs: product \t 1 \t basket_size \t basket_id
        print('%s\t%s\t%s\t%s') % (product, 1, str(len(products)), basket_id)
        
    # increment basket number
    basket_number+=1
    
# emit total number of products across all baskets
print('%s\t%s\t%s\t%s') % ("*TOTAL.PRODUCTS", str(total_products), 0, 0)

Overwriting mapper33.py


In [444]:
%%writefile reducer33.py
#!/usr/bin/python
## reducer33.py
## Author: James Gray
## Description: reducer code for HW3.3

from __future__ import division
from operator import itemgetter

import sys

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer,Calls,1\n")

current_product = None
current_count = 0
product = None
total_products = 0
unique_products = 0
baskets={} # dict to hold basket_id, basket_size
prod_relative = 0.0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    product, count, basket_size, basket_id = line.split('\t')
    
    #convert strings into integers
    count = int(count)
    basket_size = int(basket_size)
        
    # total the words to calculate a relative frequency
    # this should be the first row of the mapper output file as its sorted
    if product == "*TOTAL.PRODUCTS":
        total_products = count
        
    # keep track of the basket_id, basket_size
    baskets[basket_id] = {'size': basket_size}

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_product == product:
        current_count += count
    else:
        if current_product:
            # write result to STDOUT
            prod_relative = current_count/total_products
            print '%s\t%s\t%s' % (current_product, str(current_count), str(prod_relative))
            unique_products+=1
        current_count = count
        current_product = product

# do not forget to output the last word if needed!
if current_product == product:
    #print '%s\t%s' % (current_word, current_count)
    # reverse the output so we can sort on the current_count
    prod_relative = current_count/total_products
    print '%s\t%s\t%s' % (current_product, str(current_count), str(prod_relative))
    unique_products+=1

# emit the largest basket size
largest_basket = max(baskets, key=baskets.get)
largest_basketsize = baskets[largest_basket]['size']
print("LARGEST.BASKET\t" + largest_basket + '\t' + str(largest_basketsize))

# emit the number of unique products
print("UNIQUE.PRODUCTS\t" + str(unique_products))

Overwriting reducer33.py


In [445]:
!chmod +x mapper33.py
!chmod +x reducer33.py

In [446]:
!cat ProductPurchaseData.txt | ./mapper33.py > output33.txt

reporter:counter:Mapper Counter,Calls,1


In [447]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw33-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-files mapper33.py,reducer33.py -mapper mapper33.py -reducer reducer33.py \
-input /user/graymatter/ProductPurchaseData.txt -output /user/graymatter/hw33-output

16/05/30 15:36:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 15:36:48 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw33-output
16/05/30 15:36:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 15:36:50 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/30 15:36:50 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/30 15:36:50 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/30 15:36:50 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/30 15:36:50 INFO mapreduce.JobSubmitter: number of splits:1
16/05/30 15:36:50 INFO Configuration.

In [242]:
!hdfs dfs -cat /user/graymatter/hw33-output/*

16/05/29 11:09:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*TOTAL.PRODUCTS	380824	1.0
DAI11153	8	2.10070793858e-05
DAI11223	155	0.000407012163099
DAI11238	3	7.87765476966e-06
DAI11257	1	2.62588492322e-06
DAI11261	6	1.57553095393e-05
DAI11273	1	2.62588492322e-06
DAI11290	5	1.31294246161e-05
DAI11299	2	5.25176984644e-06
DAI11375	1	2.62588492322e-06
DAI11462	8	2.10070793858e-05
DAI11541	5	1.31294246161e-05
DAI11552	8	2.10070793858e-05
DAI11555	25	6.56471230805e-05
DAI11582	1	2.62588492322e-06
DAI11613	2	5.25176984644e-06
DAI11695	5	1.31294246161e-05
DAI11707	1	2.62588492322e-06
DAI11778	117	0.000307228536017
DAI11927	73	0.000191689599395
DAI11946	1	2.62588492322e-06
DAI11995	1	2.62588492322e-06
DAI12036	6	1.57553095393e-05
DAI12139	1	2.62588492322e-06
DAI12152	1	2.62588492322e-06
DAI12275	2	5.25176984644e-06
DAI12437	16	4.20141587715e-05
DAI12448	1	2.62588492322e-06
DAI12460	1	2.62588492322e-06
DAI125

### HW3.3. Run MR Job with Sort

In [243]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw33-outputsorted

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k2,2nr -k1,1' \
-file identity.py -mapper identity.py -reducer identity.py \
-input /user/graymatter/hw33-output/ -output /user/graymatter/hw33-outputsorted

16/05/29 11:09:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `/user/graymatter/hw33-outputsorted': No such file or directory
16/05/29 11:09:48 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/05/29 11:09:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [identity.py] [] /var/folders/ld/9wpyxfw13t7_pdv_0b8958x40000gn/T/streamjob1149202741170728182.jar tmpDir=null
16/05/29 11:09:49 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/29 11:09:49 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/29 11:09:49 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/29 11:09:49 INFO mapred.FileInputFormat: To

In [245]:
!hdfs dfs -cat /user/graymatter/hw33-outputsorted/*

16/05/29 11:10:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*TOTAL.PRODUCTS	380824	1.0
UNIQUE.PRODUCTS	12593
DAI62779	6667	0.0175067747831
FRO40251	3881	0.010191059387
ELE17451	3875	0.0101753040775
GRO73461	3602	0.00945843749344
SNA80324	3044	0.00799319370628
ELE32164	2851	0.0074863979161
DAI75645	2736	0.00718442114993
SNA45677	2455	0.0064465474865
FRO31317	2330	0.0061183118711
DAI85309	2293	0.00602115412894
ELE26917	2292	0.00601852824402
FRO80039	2233	0.00586360103355
GRO21487	2115	0.00555374661261
SNA99873	2083	0.00546971829507
GRO59710	2004	0.00526227338613
GRO71621	1920	0.00504169905258
FRO85978	1918	0.00503644728273
GRO30386	1840	0.00483162825872
ELE74009	1816	0.00476860702057
GRO56726	1784	0.00468457870302
DAI63921	1773	0.00465569396887
GRO46854	1756	0.00461105392517
ELE66600	1713	0.00449814087347
DAI83733	1712	0.00449551498855
FRO32293	1702	0.00446925613932
ELE66810	1697	0.0044561267147
SNA55

### Top 50 products -> product | purchase frequency | relative frequency

In [247]:
!hdfs dfs -cat /user/graymatter/hw33-outputsorted/* | head -52

16/05/29 11:15:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
*TOTAL.PRODUCTS	380824	1.0
UNIQUE.PRODUCTS	12593
DAI62779	6667	0.0175067747831
FRO40251	3881	0.010191059387
ELE17451	3875	0.0101753040775
GRO73461	3602	0.00945843749344
SNA80324	3044	0.00799319370628
ELE32164	2851	0.0074863979161
DAI75645	2736	0.00718442114993
SNA45677	2455	0.0064465474865
FRO31317	2330	0.0061183118711
DAI85309	2293	0.00602115412894
ELE26917	2292	0.00601852824402
FRO80039	2233	0.00586360103355
GRO21487	2115	0.00555374661261
SNA99873	2083	0.00546971829507
GRO59710	2004	0.00526227338613
GRO71621	1920	0.00504169905258
FRO85978	1918	0.00503644728273
GRO30386	1840	0.00483162825872
ELE74009	1816	0.00476860702057
GRO56726	1784	0.00468457870302
DAI63921	1773	0.00465569396887
GRO46854	1756	0.00461105392517
ELE66600	1713	0.00449814087347
DAI83733	1712	0.00449551498855
FRO32293	1702	0.00446925613932
ELE66810	1697	0.0044561267147
SNA55

### HW3.3 - Output Summary

* Total number of unique products = 12,593
* Total number of products sold = 380,824
* Largest basket = 37 products in basket_id 7033 


## HW3.3.1 OPTIONAL 

Using 2 reducers:  Report your findings such as number of unique products; largest basket; report the top 50 most frequently purchased items,  their frequency,  and their relative frequency (break ties by sorting the products alphabetical order) etc. using Hadoop Map-Reduce. 


## HW3.4 - (Computationally prohibitive but then again Hadoop can handle this) Pairs

Suppose we want to recommend new products to the customer based on the products they have already browsed on the online website. Write a map-reduce program to find products which are frequently browsed together. Fix the support count (cooccurence count) to s = 100 (i.e. product pairs need to occur together at least 100 times to be considered frequent) and find pairs of items (sometimes referred to itemsets of size 2 in association rule mining) that have a support count of 100 or more.

List the top 50 product pairs with corresponding support count (aka frequency), and relative frequency or support (number of records where they coccur, the number of records where they coccur/the number of baskets in the dataset)  in decreasing order of support  for frequent (100>count) itemsets of size 2. 

Use the Pairs pattern (lecture 3)  to  extract these frequent itemsets of size 2. Free free to use combiners if they bring value. Instrument your code with counters for count the number of times your mapper, combiner and reducers are called.  

Please output records of the following form for the top 50 pairs (itemsets of size 2): 

      item1, item2, support count, support

Fix the ordering of the pairs lexicographically (left to right), and break ties in support (between pairs, if any exist) by taking the first ones in lexicographically increasing order. 

Report  the compute time for the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.

Pairs Design Pattern Pseudocode from Async Week 3

![pairs](img/pairs.png)


### HW3.4 Mapper

This mapper will implement the pseudo-code above by emitting 

In [316]:
%%writefile mapper34.py
#!/usr/bin/python
## mapper34.py
## Author: James Gray
## Description: reducer code for HW3.4

import sys
baskets_total = 0 #calculate the total number of baskets in the dataset

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper,Calls,1\n")

total_pairs=0

for line in sys.stdin:
    line = line.strip()
    products = line.split() # each line is split into products using a space between products
    
    # count the total number of baskets
    baskets_total+=1
    
    # iterate through each product in basket from left to right
    for i,product in enumerate(products): # this will return a tuple of index, product
    
        current_product = product
        
        # now index to the next products in the basket after the current product
        for next_product in products[i+1:]: #select the subset of products to the right of the current product
            
            # product a consistent itemset format by sorting the pairs
            itemset = sorted([current_product, next_product])
            
            # increment itemset pair count
            total_pairs+=1
            
            # emit K-V pairs: product \t 1 \t basket_size \t basket_id
            #print('%s\t%s\t%s') % (itemset[0],itemset[1], 1)
            print('%s\t%s') % (itemset[0]+','+itemset[1], 1)
        
# emit total number of baskets using "*" for order inversion to enable calcs in reducer
#print('%s\t%s') % ("*TOTAL.BASKETS", str(baskets_total))
print('%s\t%s') % ("*TOTAL.PAIRS", str(total_pairs))

Overwriting mapper34.py


### HW3.4 Reducer

In [317]:
%%writefile reducer34.py
#!/usr/bin/python
## reducer34.py
## Author: James Gray
## Description: reducer code for HW3.4

from __future__ import division # this needs to be the first line of code
import sys

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer,Calls,1\n")

s=100 # frequency of product pairs to be considered frequent

current_itemset = None

current_count = 0
product = None
total_products = 0
unique_products = 0

prod_relative = 0.0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    itemset, count = line.split('\t')
    
    #convert strings into integers
    count = int(count)
        
    # total the words to calculate a relative frequency
    # this should be the first row of the mapper output file as its sorted
    if itemset == "*TOTAL.PAIRS":
        total_products = count
        
    # keep track of the basket_id, basket_size
    #baskets[basket_id] = {'size': basket_size}

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_itemset == itemset:
        current_count += count
    else:
        if current_itemset and current_count>=s:
            # write result to STDOUT
            prod_relative = current_count/total_products
            
            if current_itemset!= "*TOTAL.PAIRS":
                print '%s\t%s\t%s' % (current_itemset, str(current_count), str(prod_relative))
            #unique_products+=1
        current_count = count
        current_itemset = itemset

# do not forget to output the last word if needed!
if current_itemset and current_count>=s:
    
    prod_relative = current_count/total_products
    
    if current_itemset!= "*TOTAL.PAIRS":
        print '%s\t%s\t%s' % (current_itemset, str(current_count), str(prod_relative))
    #unique_products+=1

# emit the largest basket size
#largest_basket = max(baskets, key=baskets.get)
#largest_basketsize = baskets[largest_basket]['size']
#print("LARGEST.BASKET\t" + largest_basket + '\t' + str(largest_basketsize))

# emit the number of unique products
#print("UNIQUE.PRODUCTS\t" + str(unique_products))


Overwriting reducer34.py


In [318]:
!chmod +x mapper34.py
!chmod +x reducer34.py

In [425]:
#!cat ProductPurchaseData.txt | ./mapper34.py | sort > output34.txt

!cat output34.txt | ./reducer34.py | sort -k2,2nr > output34final.txt | head -10

reporter:counter:Reducer,Calls,1


In [319]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw34-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-files mapper34.py,reducer34.py -mapper mapper34.py -reducer reducer34.py \
-input /user/graymatter/ProductPurchaseData.txt -output /user/graymatter/hw34-output

16/05/29 20:04:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/29 20:04:43 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw34-output
16/05/29 20:04:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/29 20:04:45 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/29 20:04:45 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/29 20:04:45 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/29 20:04:45 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/29 20:04:46 INFO mapreduce.JobSubmitter: number of splits:1
16/05/29 20:04:46 INFO Configuration.

In [399]:
!hdfs dfs -cat /user/graymatter/hw34-output/*

16/05/30 12:37:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI16732,FRO78087	106	4.18301561488e-05
DAI18527,SNA44451	102	4.02516596904e-05
DAI22177,DAI31081	127	5.01172625557e-05
DAI22177,DAI62779	382	0.000150746411782
DAI22177,DAI63921	136	5.36688795872e-05
DAI22177,DAI75645	123	4.85387660972e-05
DAI22177,DAI83733	126	4.9722638441e-05
DAI22177,DAI85309	172	6.78753477132e-05
DAI22177,ELE17451	203	8.01086952661e-05
DAI22177,ELE26917	134	5.28796313579e-05
DAI22177,ELE32164	155	6.11667377648e-05
DAI22177,ELE34057	107	4.22247802634e-05
DAI22177,ELE56788	134	5.28796313579e-05
DAI22177,ELE66600	101	3.98570355758e-05
DAI22177,ELE66810	105	4.14355320342e-05
DAI22177,ELE74009	108	4.2619404378e-05
DAI22177,ELE91337	150	5.91936171917e-05
DAI22177,FRO31317	160	6.31398583378e-05
DAI22177,FRO32293	128	5.05118866703e-05
DAI22177,FRO40251	181	7.14269647447e-05
DAI22177,FRO66272	130	5.13011348995e-05
DAI22177,FRO78

In [320]:
!hdfs dfs -rm -r /user/graymatter/hw34-output-sorted

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k2,2nr' \
-files identity.py -mapper identity.py -reducer identity.py \
-input /user/graymatter/hw34-output -output /user/graymatter/hw34-output-sorted

16/05/29 20:05:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/29 20:05:47 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw34-output-sorted
16/05/29 20:05:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/29 20:05:49 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/29 20:05:49 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/29 20:05:49 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/29 20:05:50 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/29 20:05:50 INFO mapreduce.JobSubmitter: number of splits:1
16/05/29 20:05:50 INFO Configu

In [313]:
!hdfs dfs -cat /user/graymatter/hw34-output-sorted/*

16/05/29 19:46:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI62779,ELE17451	1592	0.000628241590461
FRO40251,SNA80324	1412	0.000557209249831
DAI75645,FRO40251	1254	0.000494858639723
FRO40251,GRO85051	1213	0.000478679051024
DAI62779,GRO73461	1139	0.000449476866542
DAI75645,SNA80324	1130	0.000445925249511
DAI62779,FRO40251	1070	0.000422247802634
DAI62779,SNA80324	923	0.000364238057786
DAI62779,DAI85309	918	0.000362264937213
ELE32164,GRO59710	911	0.000359502568411
FRO40251,GRO73461	882	0.000348058469087
DAI62779,DAI75645	882	0.000348058469087
DAI62779,ELE92920	877	0.000346085348514
FRO40251,FRO92469	835	0.000329511135701
DAI62779,ELE32164	832	0.000328327263357
DAI75645,GRO73461	712	0.000280972369603
DAI43223,ELE32164	711	0.000280577745489
DAI62779,GRO30386	709	0.00027978849726
ELE17451,FRO40251	697	0.000275053007884
DAI85309,ELE99737	659	0.000260057291529
DAI62779,ELE26917	650	0.000256505674497
GRO214

In [314]:
!hdfs dfs -cat /user/graymatter/hw34-output-sorted/* | head -50

16/05/29 19:46:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI62779,ELE17451	1592	0.000628241590461
FRO40251,SNA80324	1412	0.000557209249831
DAI75645,FRO40251	1254	0.000494858639723
FRO40251,GRO85051	1213	0.000478679051024
DAI62779,GRO73461	1139	0.000449476866542
DAI75645,SNA80324	1130	0.000445925249511
DAI62779,FRO40251	1070	0.000422247802634
DAI62779,SNA80324	923	0.000364238057786
DAI62779,DAI85309	918	0.000362264937213
ELE32164,GRO59710	911	0.000359502568411
FRO40251,GRO73461	882	0.000348058469087
DAI62779,DAI75645	882	0.000348058469087
DAI62779,ELE92920	877	0.000346085348514
FRO40251,FRO92469	835	0.000329511135701
DAI62779,ELE32164	832	0.000328327263357
DAI75645,GRO73461	712	0.000280972369603
DAI43223,ELE32164	711	0.000280577745489
DAI62779,GRO30386	709	0.00027978849726
ELE17451,FRO40251	697	0.000275053007884
DAI85309,ELE99737	659	0.000260057291529
DAI62779,ELE26917	650	0.000256505674497
GRO214

In [315]:
!hdfs dfs -cat /user/graymatter/hw34-output-sorted/* | tail -10

16/05/29 19:48:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ELE14480,SNA80324	100	3.94624114611e-05
ELE17451,ELE37770	100	3.94624114611e-05
GRO46854,SNA66583	100	3.94624114611e-05
GRO73461,GRO88511	100	3.94624114611e-05
FRO80039,GRO64900	100	3.94624114611e-05
DAI62779,GRO17075	100	3.94624114611e-05
FRO78087,GRO94758	100	3.94624114611e-05
FRO78087,GRO30386	100	3.94624114611e-05
DAI63921,ELE11160	100	3.94624114611e-05
GRO38814,SNA93860	100	3.94624114611e-05


### HW3.4 Conclusions

This code was run on a MacBook with 16GB RAM, 2cores, 3.1GHz.  

First MR job with 2 mappers(ran 2x), 1 reducer (ran 1x) ran in 27 seconds
Second MR job with 2 mappers(ran 1x, 1 reducer (ran 1x) ran in 4 seconds



## HW3.5: Stripes

Repeat 3.4 using the stripes design pattern for finding cooccuring pairs.

* Report the compute times for stripes job versus the Pairs job. 
* Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)
* Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs

![stripes](img/stripes.png)
![stripes2](img/stripes2.png)

### HW3.5 Mapper

The mapper will emit a KV-pair based on the design pattern above (product \t associative array). 

In [427]:
%%writefile mapper35.py
#!/usr/bin/python
## mapper35.py
## Author: James Gray
## Description: reducer code for HW3.4

import sys

# capture how many times Mapper is executed in a user-defined counter
sys.stderr.write("reporter:counter:Mapper Counter,Calls,1\n")

total_pairs=0

for line in sys.stdin:
    line = line.strip()
    products = line.split() # each line is split into products using a space between products
    
    try:
        products.sort() # get all products left to right alphabetical
    except:
        pass
    
    # count the total number of baskets
    #baskets_total+=1
    
    # iterate through each product in basket from left to right
    for i,product in enumerate(products): # this will return a tuple of index, product
    
        current_product = product
        
        stripes = {} # use dictionary for associative array
        
        # now index to the next products in the basket after the current product
        for next_product in products[i+1:]: #select the subset of products to the right of the current product
            
            # increment itemset pair count
            total_pairs+=1
            
            # build associative array for current product
            try:
                # increment count if product appears in basket more than once
                stripes[next_product]+=1
            except KeyError:
                # add next_product to dictionary for current product
                stripes[next_product]=1
            
        # emit K-V pairs: current product \t stripes
        print('%s\t%s') % (current_product, str(stripes))
        
# emit total number of baskets using "*" for order inversion to enable calcs in reducer
print('%s\t%s') % ("*TOTAL.PAIRS", str(total_pairs))

Overwriting mapper35.py


### HW3.5 Reducer

This reducer will add the associative arrays with the same product key

In [413]:
%%writefile reducer35.py
#!/usr/bin/python
## reducer35.py
## Author: James Gray
## Description: reducer code for HW3.5

from __future__ import division # this needs to be the first line of code
import sys
from collections import Counter

# capture how many times Reducer is executed in a user-defined counter
sys.stderr.write("reporter:counter:Reducer Counter,Calls,1\n")

s=100 # frequency of product pairs to be considered frequent

current_product = None

current_count = 0
product = None
total_products = 0
unique_products = 0
final_counter = Counter({})
#prod_dict ={}

prod_relative = 0.0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    product, prod_dict = line.split('\t')
    
    # total the words to calculate a relative frequency
    # this should be the first row of the mapper output file as its sorted
    if product == "*TOTAL.PAIRS":
        total_products = int(prod_dict)
    
    # convert dicionary string back to dictionary object
    if product != "*TOTAL.PAIRS":
        prod_dict = Counter(eval(prod_dict))
    
    # keep track of the basket_id, basket_size
    #baskets[basket_id] = {'size': basket_size}

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_product == product:
        # https://docs.python.org/3.5/library/collections.html#collections.Counter
        # Two counters can be added and the dict values are added
        final_counter += Counter(prod_dict)
    else:
        if current_product and current_product!= "*TOTAL.PAIRS":
            # write result to STDOUT
            
            # iterate through each of the dictionary entries and check if frequency > s
            for key in final_counter:
                if final_counter[key] >= s:
                    print '%s\t%s\t%s\t%s' % (current_product, key, str(final_counter[key]),
                                              str((final_counter[key]/total_products)))
        current_product=product
        final_counter = prod_dict #reset counter to current product

# do not forget to output the last word if needed!
if current_product and current_product!= "*TOTAL.PAIRS":
    
    # iterate through each of the dictionary entries and check if frequency > s
    for key in final_counter:
        if final_counter[key] >= s:
            print '%s\t%s\t%s\t%s' % (current_product, key, str(final_counter[key]),
                                      str((final_counter[key]/total_products)))

# emit the largest basket size
#largest_basket = max(baskets, key=baskets.get)
#largest_basketsize = baskets[largest_basket]['size']
#print("LARGEST.BASKET\t" + largest_basket + '\t' + str(largest_basketsize))

# emit the number of unique products
#print("UNIQUE.PRODUCTS\t" + str(unique_products))


Overwriting reducer35.py


In [414]:
!chmod +x mapper35.py
!chmod +x reducer35.py

### HW3.5 - Execute MR Job on Unix

In [430]:
#!cat ProductPurchaseData.txt | ./mapper35.py | sort > output35.txt

!cat output35.txt | ./reducer35.py | sort -k3,3nr > output35final.txt 

reporter:counter:Reducer,Calls,1


### HW3.5 - MR Job #1

In [431]:
# delete output directory
!hdfs dfs -rm -r /user/graymatter/hw35-output

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-files mapper35.py,reducer35.py -mapper mapper35.py -reducer reducer35.py \
-input /user/graymatter/ProductPurchaseData.txt -output /user/graymatter/hw35-output

16/05/30 14:41:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 14:41:00 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw35-output
16/05/30 14:41:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 14:41:02 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/30 14:41:02 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/30 14:41:02 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/30 14:41:02 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/30 14:41:02 INFO mapreduce.JobSubmitter: number of splits:1
16/05/30 14:41:03 INFO Configuration.

In [409]:
!hdfs dfs -cat /user/graymatter/hw35-output/* | head -50

16/05/30 13:33:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI11778	DAI62779	169	6.66914753693e-05
DAI11778	SNA80324	120	4.73548937534e-05
DAI11778	DAI75645	128	5.05118866703e-05
DAI11927	GRO73461	122	4.81441419826e-05
DAI11927	GRO46854	155	6.11667377648e-05
DAI13194	FRO31077	286	0.000112862496779
DAI13194	GRO67376	252	9.94452768821e-05
DAI13266	GRO44993	131	5.16957590141e-05
DAI13788	DAI31081	102	4.02516596904e-05
DAI13788	FRO31317	111	4.38032767219e-05
DAI13788	GRO32086	140	5.52473760456e-05
DAI13902	FRO41069	110	4.34086526073e-05
DAI13902	DAI62779	281	0.000110889376206
DAI13902	ELE17451	162	6.39291065671e-05
DAI13902	SNA80324	110	4.34086526073e-05
DAI13902	ELE32164	180	7.10323406301e-05
DAI13902	SNA55952	135	5.32742554725e-05
DAI14125	FRO78087	188	7.4189333547e-05
DAI14125	ELE12845	199	7.85301988077e-05
DAI14125	FRO85978	110	4.34086526073e-05
DAI14125	ELE30911	172	6.78753477132e-05
DAI14125	DAI1

### HW3.5 - MR Job #2

In [432]:
!hdfs dfs -rm -r /user/graymatter/hw35-output-sorted

!hadoop jar /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=3 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k3,3nr' \
-files identity.py -mapper identity.py -reducer identity.py \
-input /user/graymatter/hw35-output -output /user/graymatter/hw35-output-sorted

16/05/30 14:45:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 14:45:26 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/graymatter/hw35-output-sorted
16/05/30 14:45:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/30 14:45:28 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/05/30 14:45:28 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/05/30 14:45:28 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/05/30 14:45:28 INFO mapred.FileInputFormat: Total input paths to process : 1
16/05/30 14:45:28 INFO mapreduce.JobSubmitter: number of splits:1
16/05/30 14:45:28 INFO Configu

In [433]:
!hdfs dfs -cat /user/graymatter/hw35-output-sorted/* | head -50

16/05/30 14:45:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI62779	ELE17451	1592	0.000628241590461
FRO40251	SNA80324	1412	0.000557209249831
DAI75645	FRO40251	1254	0.000494858639723
FRO40251	GRO85051	1213	0.000478679051024
DAI62779	GRO73461	1139	0.000449476866542
DAI75645	SNA80324	1130	0.000445925249511
DAI62779	FRO40251	1070	0.000422247802634
DAI62779	SNA80324	923	0.000364238057786
DAI62779	DAI85309	918	0.000362264937213
ELE32164	GRO59710	911	0.000359502568411
DAI62779	DAI75645	882	0.000348058469087
FRO40251	GRO73461	882	0.000348058469087
DAI62779	ELE92920	877	0.000346085348514
FRO40251	FRO92469	835	0.000329511135701
DAI62779	ELE32164	832	0.000328327263357
DAI75645	GRO73461	712	0.000280972369603
DAI43223	ELE32164	711	0.000280577745489
DAI62779	GRO30386	709	0.00027978849726
ELE17451	FRO40251	697	0.000275053007884
DAI85309	ELE99737	659	0.000260057291529
DAI62779	ELE26917	650	0.000256505674497
GRO214

In [434]:
!hdfs dfs -cat /user/graymatter/hw35-output-sorted/* | tail -10

16/05/30 14:46:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ELE17451	SNA59061	100	3.94624114611e-05
GRO59710	SNA93860	100	3.94624114611e-05
FRO80039	GRO64900	100	3.94624114611e-05
DAI85309	ELE14480	100	3.94624114611e-05
DAI62779	GRO17075	100	3.94624114611e-05
ELE17451	ELE37770	100	3.94624114611e-05
FRO78087	GRO94758	100	3.94624114611e-05
FRO78087	GRO30386	100	3.94624114611e-05
FRO40251	GRO56989	100	3.94624114611e-05
GRO38814	SNA93860	100	3.94624114611e-05


### HW3.5 Conclusions

This code was run on a MacBook with 16GB RAM, 2cores, 3.1GHz.  

* First MR job with 2 mappers(ran 2x), 1 reducer (ran 1x) ran in 27 seconds
* Second MR job with 2 mappers(ran 1x, 1 reducer (ran 1x) ran in 4 seconds
