# HW 2 - Naive Bayes in Hadoop MR
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2018`__

In the live sessions for week 2 and week 3 you got some practice designing and debugging Hadoop Streaming jobs. In this homework we'll use Hadoop MapReduce to implement your first parallelized machine learning algorithm: Naive Bayes. As you develop your implementation you'll test it on a small dataset that matches the 'Chinese Example' in the _Manning, Raghavan and Schutze_ reading for Week 2. For the main task in this assignment you'll be working with a small subset of the Enron Spam/Ham Corpus. By the end of this assignment you should be able to:
* __... describe__ the Naive Bayes algorithm including both training and inference.
* __... perform__ EDA on a corpus using Hadoop MR.
* __... implement__ parallelized Naive Bayes.
* __... contrast__ partial, unordered and total order sort and their implementations in Hadoop Streaming.
* __... explain__ how smoothing affects the bias and variance of a Multinomial Naive Bayes model.

As always, your work will be graded both on the correctness of your output and on the clarity and design of your code. __Please refer to the `README` for homework submission instructions.__ 

## Notebook Setup
Before starting, run the following cells to confirm your setup.

In [1]:
# imports
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
%reload_ext autoreload
%autoreload 2

In [2]:
# global vars (paths) - ADJUST AS NEEDED
JAR_FILE = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"
HDFS_DIR = "/user/root/HW2"
HOME_DIR = "/media/notebooks/Assignments/HW2" # FILL IN HERE eg. /media/notebooks/Assignments/HW2

In [3]:
# save path for use in Hadoop jobs (-cmdenv PATH={PATH})
from os import environ
PATH  = environ['PATH']

In [4]:
# data path
ENRON = HOME_DIR + "/data/enronemail_1h.txt"

# Question 1: Hadoop MapReduce Key Takeaways.  

This assignment will be the only one in which you use Hadoop Streaming to implement a distributed algorithm. The key reason we continue to teach Hadoop Streaming is that it forces the programmer to think carefully about what is happening under the hood when you parallelize a calculation. This question will briefly highlight some of the most important concepts that you need to understand about Hadoop Streaming and MapReduce before we move on to Spark next week.   

### Q1 Tasks:

* __a) short response:__ What "programming paradigm" is Hadoop MapReduce based on? What are the main ideas of this programming paradigm and how does MapReduce exemplify these ideas?

* __b) short response:__ What is the Hadoop Shuffle? When does it happen? Why is it potentially costly? Describe one specific thing we can do to mitigate the cost associated with this stage of our Hadoop Streaming jobs.

* __c) short response:__ In Hadoop Streaming why do the input and output record format of a combiner script have to be the same? [__`HINT`__ _what level of combining does the framework guarantee? what is the relationship between the record format your mapper emits and the format your reducer expects to receive?_]

* __d) short response:__ To what extent can you control the level of parallelization of your Hadoop Streaming jobs? Please be specific.

* __e) short response:__ What change in the kind of computing resources available prompted the creation of parallel computation frameworks like Hadoop? 

### Q1 Student Answers:

> __a)__ Hadoop MapReduce is based on the **functional programming paradigm**, in which computation is expressed as the evaluation of functions that do not mutate state or shared data. In contrast to *imperative programming*, in which variables can be reassigned and state changed locally or globally, a pure function always returns the same output for the same input, no matter how many times or in what order it is executed. This property is what makes parallelization safe: MapReduce takes a divide-and-conquer approach, splitting the data into manageable partitions and applying the same function to each partition independently. A key feature of the paradigm is the higher-order function, which accepts another function as an argument. **Map** and **Reduce** are higher-order functions in this sense: the framework applies the user-supplied mapper to every input record and the user-supplied reducer to every group of values that share a key. Hadoop is an open-source implementation of MapReduce that adds a distributed file system (HDFS), which stores very large datasets and protects against data loss by replicating blocks across several nodes. 
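
As a small illustration of the higher-order functions mentioned in this answer, here is a minimal sketch in plain Python (not Hadoop). The helper names `square` and `add` are made up for the example; the point is only that `map` and `reduce` receive functions as arguments and never modify their input.

```python
from functools import reduce

def square(x):
    """A pure function: same input, same output, no side effects."""
    return x * x

def add(acc, x):
    """A pure combining function used to fold values together."""
    return acc + x

numbers = [1, 2, 3, 4]

# map is a higher-order function: it takes the function `square` as an argument.
squares = list(map(square, numbers))

# reduce is also higher-order: it folds the mapped values with `add`.
total = reduce(add, squares, 0)

print(squares, total)
```

Because neither function touches shared state, the `map` step could be applied to separate chunks of `numbers` on separate machines without changing the result.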

> __b)__ The Hadoop shuffle is the transfer of intermediate data from the mappers to the reducers. It happens after the map phase (and any combining) and before the reduce phase: each mapper's output is partitioned by key, sorted, written to local disk, and sent over the network to the reducer responsible for each partition, where the sorted pieces from all mappers are merged. It is potentially costly because every intermediate record, for example one `("birds", 1)` pair per occurrence of the word, has to be sorted, written to disk and moved between nodes, and disk and network I/O are much slower than in-memory computation. The reducers can also become a bottleneck, since all of the intermediate data converges on them. To mitigate the cost, we can add a combiner that aggregates each mapper's output locally before it is sent, e.g. by summing the values that share a key so that many `("birds", 1)` records become a single `("birds", 20)` record, thereby reducing the network traffic between nodes. The combiner can therefore be thought of as a "mini-reducer". 

> __c)__ Combiners act as local aggregators that relieve the burden on the reducers, so they can be considered "mini-reducers". However, a combiner is an optional optimization, not a required step, and the framework makes no guarantee about how it is applied: it may run zero, one, or several times on any given mapper's output. Some records may therefore reach the reducer straight from the mapper, while others have passed through the combiner. For the reducer to handle both cases, the combiner must accept the record format the mapper emits and emit that same format, which is also the format the reducer expects. What the framework does guarantee is that all records with the same key go to the same reducer and that each reducer receives its records sorted by key; there is no guaranteed ordering of keys across different reducers. 
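
To make the format requirement concrete, the following sketch simulates a word-count reducer in plain Python and reuses it as a combiner. The function name `reduce_counts` and the sample records are assumptions for illustration; the final result is the same whether the combiner runs on no mappers, on some, or on all of them, which only works because its input and output formats match.

```python
from collections import defaultdict

def reduce_counts(records):
    """Sum counts per key. Input and output are both lists of (key, count) pairs."""
    totals = defaultdict(int)
    for key, count in records:
        totals[key] += count
    return sorted(totals.items())

# Hypothetical output of two mappers.
mapper_a = [("birds", 1), ("fly", 1), ("birds", 1)]
mapper_b = [("birds", 1), ("fly", 1)]

# Case 1: the combiner never runs.
no_combiner = reduce_counts(mapper_a + mapper_b)

# Case 2: the combiner runs on mapper A only.
partial = reduce_counts(reduce_counts(mapper_a) + mapper_b)

# Case 3: the combiner runs on both mappers.
full = reduce_counts(reduce_counts(mapper_a) + reduce_counts(mapper_b))

print(no_combiner, partial, full)
```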

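> __d)__ We can control the level of parallelization by adjusting the number of map and reduce tasks, though not to the same degree. The number of reduce tasks can be set exactly (for example with `-numReduceTasks`, as in the jobs later in this notebook). The number of map tasks is determined by the number of input splits, so we can only influence it indirectly, for example through the split size. Since parallelization comes with scheduling and communication costs, increasing the number of tasks eventually yields diminishing returns. 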

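> __e)__ More than the transition from single-core to multi-core CPUs, it was the shift from scaling up a single, ever more powerful machine to scaling out across clusters of many inexpensive commodity machines that prompted the creation of parallel computation frameworks like Hadoop. Hadoop combines parallel computation with a distributed file system (HDFS), which gives it both scalability and data integrity on hardware where individual node failures are expected. 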

# Question 2: MapReduce Design Patterns.  

In the last two live sessions and in your readings from Lin & Dyer you encountered a number of techniques for manipulating the logistics of a MapReduce implementation to ensure that the right information is available at the right time and location. In this question we'll review a few of the key techniques you learned.   

### Q2 Tasks:

* __a) short response:__ What are counters (in the context of Hadoop Streaming)? How are they useful? What kinds of counters does Hadoop provide for you? How do you create your own custom counter?

* __b) short response:__ What are composite keys? How are they useful? How are they related to the idea of custom partitioning?

* __c) short response:__ What is the order inversion pattern? What problem does it help solve? How do we implement it? 

### Q2 Student Answers:

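> __a)__ **Counters**, as the name suggests, count events during the execution of a MapReduce job, e.g. the number of successful tasks, the number of corrupt records, or the number of times a certain condition is met. They are useful for debugging and for understanding how much data flows through each stage of a job. Hadoop provides several groups of built-in counters, including file system counters, job counters and Map-Reduce framework counters (such as map output records and reduce input records). In Hadoop Streaming we create a custom counter by writing a line of the form `reporter:counter:<group>,<counter>,<amount>` to standard error from the mapper, combiner, or reducer script; the framework then aggregates these increments across all tasks and reports the totals in the job output.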
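
Hadoop Streaming picks up counter updates from lines of the form `reporter:counter:<group>,<counter>,<amount>` written to standard error. The sketch below shows one way a Python mapper might use this. The group name `HW2`, the counter name `MalformedRecords`, and the four-field check are assumptions chosen for the example, not part of the assignment's provided scripts.

```python
import sys

def increment_counter(group, name, amount=1, stream=sys.stderr):
    """Write a Hadoop Streaming counter update to the given stream (stderr by default)."""
    stream.write(f"reporter:counter:{group},{name},{amount}\n")

def mapper(lines, stream=sys.stderr):
    """Yield the tab-separated fields of each line, counting lines that lack 4 fields."""
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) != 4:
            # Hypothetical group and counter names, for illustration only.
            increment_counter("HW2", "MalformedRecords", 1, stream)
            continue
        yield fields

# In a real streaming script the mapper would be driven by standard input, e.g.:
#     for fields in mapper(sys.stdin):
#         print("\t".join(fields))
```

Passing the stream as a parameter keeps the function easy to test locally, since an in-memory buffer can stand in for standard error.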

> __b)__ A composite key is a key made up of more than one field. It typically arises from the value-to-key conversion design pattern, in which part of the value emitted by the mapper is moved into the key, e.g. `{sensorID (key): timestamp, reading (value)}` --> `{sensorID, timestamp (key): reading (value)}`. In this example, sensorID and timestamp together form **a composite key**. Composite keys are useful because they let the framework sort on fields that would otherwise sit in the value (a secondary sort), so the reducer receives each sensor's readings already ordered by timestamp instead of having to buffer and sort them in memory. They go hand in hand with custom partitioning: we partition on only part of the composite key (here sensorID), so that all records for one sensor still reach the same reducer, while sorting on the whole key.
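
The sensor example can be sketched in plain Python as follows. This is only a local simulation: the record values are made up, and `partition` uses a toy hash (the sum of character codes), not Hadoop's actual hash function.

```python
def to_composite(record):
    """Value-to-key conversion: move the timestamp from the value into the key."""
    sensor_id, timestamp, reading = record
    return ((sensor_id, timestamp), reading)

def partition(composite_key, num_partitions):
    """Partition on the sensor ID only, using a toy hash (an assumption for this sketch)."""
    sensor_id, _ = composite_key
    return sum(ord(c) for c in sensor_id) % num_partitions

# Hypothetical (sensorID, timestamp, reading) records in arbitrary order.
records = [("s2", 3, 0.7), ("s1", 2, 0.4), ("s1", 1, 0.9), ("s2", 1, 0.1)]

# Sorting on the whole composite key orders each sensor's readings by timestamp.
pairs = sorted(to_composite(r) for r in records)

for key, reading in pairs:
    print(partition(key, 2), key, reading)
```

Every record for a given sensor lands in the same partition regardless of its timestamp, while the sort on the full key delivers the readings in time order.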

> __c)__ The **order inversion** pattern takes advantage of the sort that Hadoop performs during the shuffle to make an aggregate statistic arrive at the reducer before the records that depend on it. For example, to calculate the relative frequency of word pairs given their first word, we need the total count of the first word as a denominator, but a reducer that streams through its input would ordinarily only know that total after it has seen every pair. By emitting the total under a special key that sorts ahead of the ordinary keys, we invert the order in which the total and the individual counts become available, hence the name **order inversion**. The problem it solves is having to buffer all of a word's pairs in memory (or make a second pass over the data) just to compute the denominator.  
>  
> To implement it, suppose we want the relative frequency of the pair `("birds", "fly")` among all pairs that start with `"birds"`. For every ordinary pair the mapper emits, e.g. `("birds", "fly")`, `("birds", "perch")`, `("birds", "dive")`, it also emits a count under the special key `("birds", "*")`, whose values sum to the total number of `"birds"` pairs. Two things must then hold. First, the partitioner must use only the first word, e.g. the hash code of `"birds"` modulo the number of reducers, so that `("birds", "*")` and every other `"birds"` pair reach the same reducer. Second, the sort order must place `"*"` before any real word, so that the special key arrives first. When the reducer sees a key whose second word is `"*"`, it sums the values and stores the result as the current total; for every following key with the same first word, it sums that pair's counts and divides by the stored total. The total is replaced when the next `"*"` key arrives. In this way the reducer can report, for example, that `("birds", "fly")` makes up 50% of all `"birds"` pairs. 
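
A minimal local sketch of the reducer side of this pattern is shown below. It assumes that records are `((first_word, second_word), count)` tuples, that the mapper has emitted one `"*"` record per pair, and that a plain Python sort stands in for the Hadoop shuffle (the character `*` sorts before lowercase letters). The function name `relative_frequencies` is hypothetical.

```python
from itertools import groupby

def relative_frequencies(records):
    """Compute count(first, second) / count(first, *) in a single pass over sorted records."""
    results = {}
    total = 0
    ordered = sorted(records)  # stands in for the shuffle sort; "*" sorts before letters
    for (first, second), group in groupby(ordered, key=lambda kv: kv[0]):
        count = sum(c for _, c in group)
        if second == "*":
            total = count  # the denominator arrives before the pairs that need it
        else:
            results[(first, second)] = count / total
    return results

# Hypothetical mapper output: each pair is accompanied by a ("word", "*") record.
records = [
    (("birds", "fly"), 1), (("birds", "*"), 1),
    (("birds", "perch"), 1), (("birds", "*"), 1),
    (("birds", "fly"), 1), (("birds", "*"), 1),
    (("birds", "dive"), 1), (("birds", "*"), 1),
    (("fish", "swim"), 1), (("fish", "*"), 1),
]

freqs = relative_frequencies(records)
print(freqs)
```

The reducer never holds more than one running total and one pair count at a time, which is the memory saving the pattern is meant to deliver.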

# Question 3: Understanding Total Order Sort

The key challenge in distributed computing is to break a problem into a set of sub-problems that can be performed without communicating with each other. Ideally, we should be able to define an arbitrary number of splits and still get the right result, but that is not always possible. Parallelization becomes particularly challenging when we need to make comparisons between records, for example when sorting. Total Order Sort allows us to order large datasets in a way that enables efficient retrieval of results. Before beginning this assignment, make sure you have read and understood the [Total Order Sort Notebook](https://github.com/UCB-w261/main/tree/master/HelpfulResources/TotalSortGuide/_total-sort-guide-spark2.01-JAN27-2017.ipynb). You can skip the first two MRJob sections, but the rest of section III and all of section IV are **very** important (and apply to Hadoop Streaming) so make sure to read them closely. Feel free to read the Spark sections as well but you won't be responsible for that material until later in the course. To verify your understanding, answer the following questions.

### Q3 Tasks:

* __a) short response:__ What is the difference between a partial sort, an unordered total sort, and a total order sort? From the programmer's perspective, what does total order sort allow us to do that we can't with unordered total? Why is this important with large datasets?

* __b) short response:__ Which phase of a MapReduce job is leveraged to implement Total Order Sort? Which default behaviors must be changed? Why must they be changed?

* __c) short response:__ Describe in words how to configure a Hadoop Streaming job for the custom sorting and partitioning that is required for Total Order Sort.  

* __d) short response:__ Explain why we need to use an inverse hash code function.

* __e) short response:__ Where does this function need to be located so that a Total Order Sort can be performed?

### Q3 Student Answers:

> __a)__ In **a partial sort**, keys are sorted within each partition but not across partitions, e.g.  
> `partitionA = (A, B, H)`,  
> `partitionB = (C, D, I)`,  
> `partitionC = (E, F, G)`  
> Each partition is sorted internally, but the key ranges overlap (`H` in partitionA comes after `C` in partitionB), so the partitions would have to be merged and sorted again to obtain a global order.  
>  
> In **an unordered total sort**, keys are sorted within each partition and the key ranges of different partitions do not overlap, but the partitions themselves are not in order, e.g.  
> `partitionA = (G, H, I)`,  
> `partitionB = (A, B, C)`,  
> `partitionC = (D, E, F)`  
> Here every key in partitionB precedes every key in partitionC, which in turn precedes every key in partitionA, so a global order exists. However, reading the partitions in their natural order (A, B, C) does not produce it; we would expect partitionB to come first. Hence the name **unordered total sort**.  
>  
> In **a total order sort**, keys are sorted within each partition, the key ranges do not overlap, and the partitions themselves are in order, e.g.  
> `partitionA = (A, B, C)`,  
> `partitionB = (D, E, F)`,  
> `partitionC = (G, H, I)`  
> Concatenating the partitions in order yields the fully sorted dataset.  
>  
> From the programmer's perspective, total order sort tells us where a record lives without inspecting every partition: the first partition holds the top records and the last partition holds the bottom ones, so we can read just that file. With an unordered total sort we would first have to examine each partition to find out which key range it holds. This matters for large datasets because they do not fit in memory, and reading many large partition files from disk just to locate the records of interest adds substantial I/O overhead. 
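
The difference between these sorts comes down to how keys are assigned to partitions. The sketch below simulates this locally with single-letter keys. Both partition functions are assumptions for illustration: `hash_partition` uses a toy hash rather than Hadoop's, and `range_partition` uses hand-picked split points `"D"` and `"G"`.

```python
from bisect import bisect_right

def hash_partition(key, num_partitions):
    """Toy hash partitioner (sum of character codes); not Hadoop's actual hash."""
    return sum(ord(c) for c in key) % num_partitions

def range_partition(key, boundaries):
    """Send each key to the partition whose key range contains it."""
    return bisect_right(boundaries, key)

def shuffle(keys, assign, num_partitions):
    """Assign keys to partitions, then sort within each partition."""
    partitions = [[] for _ in range(num_partitions)]
    for key in keys:
        partitions[assign(key)].append(key)
    return [sorted(p) for p in partitions]

def concatenation_is_sorted(partitions):
    """True when reading the partitions in order yields globally sorted keys."""
    flat = [k for p in partitions for k in p]
    return flat == sorted(flat)

keys = list("HBAICDGFE")
hashed = shuffle(keys, lambda k: hash_partition(k, 3), 3)
ranged = shuffle(keys, lambda k: range_partition(k, ["D", "G"]), 3)

print(hashed, concatenation_is_sorted(hashed))
print(ranged, concatenation_is_sorted(ranged))
```

With the toy hash each partition is sorted internally but the ranges interleave (a partial sort), whereas range partitioning produces partitions that can simply be read in order.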

> __b)__ The shuffle phase is leveraged to implement total order sort. By default, Hadoop produces only a partial sort: keys are sorted within each partition, but records are assigned to partitions by hashing the key, so the partitions are not ordered relative to one another. Two default behaviors must therefore be changed. First, the partitioning must be customized so that each partition receives a contiguous range of keys and the ranges follow the order of the partitions; this lets us retrieve and analyze a specific part of a large sorted dataset by reading only the relevant file. Second, the reducers must drop the key that was used for partitioning. The reducers we have written so far pass along the record format they receive from the mappers, but here each record arrives with an extra partition key, which the reducer has to strip before emitting the original key-value pair.  

> __c)__ Since Hadoop only provides a partial sort by default, we need to configure custom sorting and partitioning.  
> 1. In the mapper, prepend a partition key to each record so that records are routed by key range rather than by the hash of the original key. The partition key is chosen so that, once it is hashed modulo the number of reducers, the record lands in the intended partition. For sorting a dictionary, for example, we could create 26 partitions (reducers) and send all words that start with the same letter to the same reducer. The job is then configured to partition on the partition key only and to sort on the original key.  
> 2. In the reducers, remove the partition key and emit the original key-value pairs.  
> 3. If the partition keys are not mapped to partition indices in order, the result so far is only an unordered total sort: the sorted words that begin with **`a`** might end up in partition 4 (e.g. `part-00004`). In that case we order the partitions in a post-processing step, which reads one record from each partition's output and uses it to construct an ordering among all partitions. 

> __d)__ Usually, the partition key is hashed to obtain a partition index, and the record is sent to the partition with that index. The index a given key hashes to is effectively arbitrary, so the partitions do not come out in key order. For a total order sort we want the opposite: to keep the partitions in order by choosing the partition index ourselves. An **inverse hash code function** takes the desired partition index and the total number of partitions as inputs and returns a partition key that will be hashed to exactly that index. 
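
The idea can be sketched with a toy partitioner. Everything here is an assumption for illustration: `toy_partitioner` (sum of character codes modulo the number of partitions) is not the hash Hadoop uses, and `inverse_hash` simply searches single uppercase letters for a key that maps to the requested index.

```python
import string

def toy_partitioner(key, num_partitions):
    """Stand-in for the framework's partitioner: toy hash modulo the partition count."""
    return sum(ord(c) for c in key) % num_partitions

def inverse_hash(desired_index, num_partitions):
    """Return a partition key that the toy partitioner sends to `desired_index`."""
    for candidate in string.ascii_uppercase:
        if toy_partitioner(candidate, num_partitions) == desired_index:
            return candidate
    raise ValueError("no single-letter key maps to the requested partition")

num_partitions = 3
partition_keys = [inverse_hash(i, num_partitions) for i in range(num_partitions)]
print(partition_keys)
```

A record that belongs in the lowest key range would be tagged with `partition_keys[0]`, one in the next range with `partition_keys[1]`, and so on, so that the partitioner's hash places the ranges in order.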

> __e)__ The inverse hash code function needs to be available in the mapper, since the mapper assigns the partition key to each record before that key is hashed during partitioning. 

# About the Data
For the main task in this portion of the homework you will train a classifier to determine whether an email represents spam or not. You will train your Naive Bayes model on a 100 record subset of the Enron Spam/Ham corpus available in the HW2 data directory (__`HW2/data/enronemail_1h.txt`__).

__Source:__   
The original data included about 93,000 emails which were made public after the company's collapse. There have been a number of raw and preprocessed versions of this corpus (including those available [here](http://www.aueb.gr/users/ion/data/enron-spam/index.html) and [here](http://www.aueb.gr/users/ion/publications.html)). The subset we will use is limited to emails from 6 Enron employees and a number of spam sources. It is part of [this data set](http://www.aueb.gr/users/ion/data/enron-spam/) which was created by researchers working on personalized Bayesian spam filters. Their original publication is [available here](http://www.aueb.gr/users/ion/docs/ceas2006_paper.pdf). __`IMPORTANT!`__ _For this homework please limit your analysis to the 100 email subset which we provide. No need to download or run your analysis on any of the original datasets, those links are merely provided as context._

__Preprocessing:__  
For their work, Metsis et al. (the authors) appear to have pre-processed the data, not only collapsing all text to lower-case, but additionally separating "words" by spaces, where "words" unfortunately include punctuation. As a concrete example, the sentence:  
>  `Hey Jon, I hope you don't get lost out there this weekend!`  

... would have been reduced by Metsis et al. to the form:  
> `hey jon , i hope you don ' t get lost out there this weekend !` 

... so we have reverted the data back toward its original state, removing spaces so that our sample sentence would now look like:
> `hey jon, i hope you don't get lost out there this weekend!`  

Thus we have at least preserved contractions and other higher-order lexical forms. However, one must be aware that this reversion is not complete, that some objects (specifically web sites) will be ill-formatted, and that all text is still lower-cased.


__Format:__   
All messages are collated to a tab-delimited format:  

>    `ID \t SPAM \t SUBJECT \t CONTENT \n`  

where:  
>    `ID = string; unique message identifier`  
    `SPAM = binary; with 1 indicating a spam message`  
    `SUBJECT = string; title of the message`  
    `CONTENT = string; content of the message`   
    
Note that either of `SUBJECT` or `CONTENT` may be "NA", and that all tab (\t) and newline (\n) characters have been removed from both of the `SUBJECT` and `CONTENT` columns.  
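
As a sketch of how a record in this format might be read, the function below splits a line on tabs and joins the subject and content into a single text field. Dropping a field whose value is "NA" is an assumption made for this example, not a requirement stated above, and `parse_record` is a hypothetical helper name.

```python
def parse_record(line):
    """Split one tab-delimited line into (ID, SPAM flag, text)."""
    doc_id, spam, subject, content = line.rstrip("\n").split("\t")
    # Assumption for this sketch: treat "NA" as a missing field and leave it out.
    parts = [part for part in (subject, content) if part.strip() != "NA"]
    text = " ".join(parts).strip()
    return doc_id, int(spam), text

sample = "0001.1999-12-10.farmer\t0\t christmas tree farm pictures\tNA\n"
print(parse_record(sample))
```

Because tabs and newlines have been removed from the subject and content columns, a plain split on tabs yields exactly four fields for a well-formed line; a malformed line raises a `ValueError` at the unpacking step.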

In [5]:
# take a look at the first 100 characters of the first 5 records (RUN THIS CELL AS IS)
!head -n 5 {ENRON} | cut -c-100

0001.1999-12-10.farmer	0	 christmas tree farm pictures	NA
0001.1999-12-10.kaminski	0	 re: rankings	 thank you.
0001.2000-01-17.beck	0	 leadership development pilot	" sally:  what timing, ask and you shall receiv
0001.2000-06-06.lokay	0	" key dates and impact of upcoming sap implementation over the next few week
0001.2001-02-07.kitchen	0	 key hr issues going forward	 a) year end reviews-report needs generating 


In [6]:
# see how many messages/lines are in the file 
#(this number may be off by 1 if the last line doesn't end with a newline)
!wc -l {ENRON}

100 /media/notebooks/Assignments/HW2/data/enronemail_1h.txt


In [7]:
# make the HDFS directory if it doesn't already exist
!hdfs dfs -mkdir {HDFS_DIR}

mkdir: `/user/root/HW2': File exists


In [8]:
# load the data into HDFS (RUN THIS CELL AS IS)
!hdfs dfs -copyFromLocal {ENRON} {HDFS_DIR}/enron.txt

copyFromLocal: `/user/root/HW2/enron.txt': File exists


# Question 4:  Enron Ham/Spam EDA.
Before building our classifier, let's get acquainted with our data. In particular, we're interested in which words occur more in spam emails than in real emails. In this question you'll implement two Hadoop MapReduce jobs to count and sort word occurrences by document class. You'll also learn about two new Hadoop streaming parameters that will allow you to control how the records output by your mappers are partitioned for reducing on separate nodes. 

__`IMPORTANT NOTE:`__ For this question and all subsequent items, you should include both the subject and the body of the email in your analysis (i.e. concatenate them to get the 'text' of the document).

### Q4 Tasks:
* __a) code:__ Complete the missing components of the code in __`EnronEDA/mapper.py`__ and __`EnronEDA/reducer.py`__ to create a Hadoop MapReduce job that counts how many times each word in the corpus occurs in an email for each class. Pay close attention to the data format specified in the docstrings of these scripts _-- there are a number of ways to accomplish this task, we've chosen this format to help illustrate a technique in `part e`_. Run the provided unit tests to confirm that your code works as expected then run the provided Hadoop streaming command to apply your analysis to the Enron data.


* __b) code + short response:__ How many times does the word "__assistance__" occur in each class? (`HINT:` Use a `grep` command to read from the results file you generated in '`a`' and then report the answer in the space provided.)


* __c) short response:__ Would it have been possible to add some sorting parameters to the Hadoop streaming command that would cause our `part a` results to be sorted by count? Briefly explain why or why not.


* __d) code + short response:__ Write a second Hadoop MapReduce job to sort the output of `part a` first by class and then by count. Run your job and save the results to a local file using the provided code. Then describe in words how you would go about printing the top 10 words in each class given this sorted output. (`HINT 1:` _remember that you can simply pass the `part a` output directory to the input field of this job; `HINT 2:` since this task is just reordering the records from `part a` we don't need to write a mapper or reducer, just use `/bin/cat` for both_)


* __e) code:__ A more efficient alternative to '`grep`-ing' for the top 10 words in each class would be to use the Hadoop framework to separate records from each class into its own partition so that we can just read the top lines in each. Edit your job from `part d` to specify 2 reduce tasks and to tell Hadoop to partition based on the second field (which indicates spam/ham in our data). Your code should maintain the secondary sort -- that is each partition should list words from most to least frequent.
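
For reference, the record flow described in `part a` can be simulated locally with two small generator functions. This is only a sketch under stated assumptions, not the contents of the provided scripts: the tokenization (lower-casing and keeping runs of letters) is a guess, and the reducer assumes its input arrives grouped by word and class, as it would after the shuffle.

```python
import re
from itertools import groupby

def mapper(lines):
    """Emit 'word<TAB>class<TAB>1' for every word in the subject and body."""
    for line in lines:
        doc_id, label, subject, body = line.rstrip("\n").split("\t")
        text = (subject + " " + body).lower()
        for word in re.findall(r"[a-z]+", text):  # assumed tokenization
            yield f"{word}\t{label}\t1"

def reducer(sorted_lines):
    """Sum the counts of consecutive records that share the same word and class."""
    parsed = (line.rstrip("\n").split("\t") for line in sorted_lines)
    for (word, label), group in groupby(parsed, key=lambda f: (f[0], f[1])):
        total = sum(int(fields[2]) for fields in group)
        yield f"{word}\t{label}\t{total}"

mapped = list(mapper(["d1\t1\ttitle\tbody", "d2\t0\ttitle\tbody"]))
reduced = list(reducer(["one\t1\t1", "one\t0\t1", "one\t0\t1", "two\t0\t1"]))
print(mapped)
print(reduced)
```

Because the reducer's input and output share one format, the same function could also serve as a combiner.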

### Q4 Student Answers:
> __b)__ The word "assistance" was found __8__ times in spam emails and **2** times in regular emails. 

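> __c)__ No. Hadoop sorts records during the shuffle, on the key emitted by the mapper, and our mappers emit records of the form (word, class, 1). The total count for each word and class does not exist until the reducer has summed those 1s, which happens after the sort. Sorting by aggregate count therefore requires a second job that takes the reducer output as its input. 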

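> __d)__ The sorted output groups the records by class (spam and ham), and within each class the words appear from most to least frequent. After saving it to a local file (e.g. `EnronEDA/sorted_results.txt`), we can filter on the class field and keep the first 10 lines:  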
> `!awk '$2==0' EnronEDA/sorted_results.txt | head -n 10` for regular emails  
> `!awk '$2==1' EnronEDA/sorted_results.txt | head -n 10` for spam emails  

In [9]:
# part a - do your work in the provided scripts then RUN THIS CELL AS IS
!chmod a+x EnronEDA/mapper.py
!chmod a+x EnronEDA/reducer.py

In [10]:
# part a - unit test EnronEDA/mapper.py (RUN THIS CELL AS IS)
!echo -e "d1	1	title	body\nd2	0	title	body" | EnronEDA/mapper.py

title	1	1
body	1	1
title	0	1
body	0	1


In [11]:
# part a - unit test EnronEDA/reducer.py (RUN THIS CELL AS IS)
!echo -e "one	1	1\none	0	1\none	0	1\ntwo	0	1" | EnronEDA/reducer.py

one	1	1
one	0	2
two	0	1


In [12]:
# part a - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

Deleted /user/root/HW2/eda-output


In [13]:
# part a - Hadoop streaming job (RUN THIS CELL AS IS)
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -reducer reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob6446478263860175404.jar tmpDir=null
19/01/16 23:38:03 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:38:03 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:38:04 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/16 23:38:04 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:38:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0033
19/01/16 23:38:05 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0033
19/01/16 23:38:05 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0033/
19/01/16 23:38:05 INFO mapreduce.Job: Running job: job_1547262255608_0033
19/01/16 23:38:13 INFO mapreduce.Job: Job job_1547262255608_0033 running in uber mode : false
19/01/16 23:38:13 INFO mapreduce.Job:  map 0% reduce 0%
19/01

In [14]:
# part a - retrieve results from HDFS & copy them into a local file (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/eda-output/part-0000* > EnronEDA/results.txt

In [15]:
# part b - write your grep command here
!grep 'assistance' EnronEDA/results.txt 

assistance	1	8
assistance	0	2


In [16]:
# part d/e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-sort-output

Deleted /user/root/HW2/eda-sort-output


In [17]:
# part d/e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=3 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2nr -k3,3nr" \
  -D mapreduce.partition.keypartitioner.options="-k2,2" \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -input {HDFS_DIR}/eda-output \
  -output {HDFS_DIR}/eda-sort-output \
  -numReduceTasks 2 \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob7360827518110123786.jar tmpDir=null
19/01/16 23:39:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:39:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:39:02 INFO mapred.FileInputFormat: Total input paths to process : 2
19/01/16 23:39:02 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:39:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0034
19/01/16 23:39:03 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0034
19/01/16 23:39:03 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0034/
19/01/16 23:39:03 INFO mapreduce.Job: Running job: job_1547262255608_0034
19/01/16 23:39:11 INFO mapreduce.Job: Job job_1547262255608_0034 running in uber mode : false
19/01/16 23:39:11 INFO mapreduce.Job:  map 0% reduce 0%
19/01

In [18]:
# part e - view the top 10 records from each partition (RUN THIS CELL AS IS)
for idx in range(2):
    print(f"\n===== part-0000{idx}=====\n")
    !hdfs dfs -cat {HDFS_DIR}/eda-sort-output/part-0000{idx} | head


===== part-00000=====

the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135	
cat: Unable to write to output stream.

===== part-00001=====

the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153	
cat: Unable to write to output stream.


__Expected output:__
<table>
<th>part-00000:</th>
<th>part-00001:</th>
<tr><td><pre>
the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135
</pre></td>
<td><pre>
the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153
</pre></td></tr>
</table>

# Question 5: Counters and Combiners.
Tuning the number of mappers & reducers is helpful to optimize very large distributed computations. Doing so successfully requires a thorough understanding of the data size at each stage of the job. As you learned in the week 3 live session, counters are an invaluable resource for understanding this kind of detail. In this question, we will take the EDA performed in Question 4 as an opportunity to illustrate some related concepts.

### Q5 Tasks:
* __a) short response:__ Read the Hadoop output from your job in Question 4a to report how many records are emitted by the mappers and how many records are received by the reducers. In the context of word counting what does this number represent practically?

* __b) code:__ Note that we wrote the reducer in question 4a such that the input and output record format is identical. This makes it easy to use the same reducer script as a combiner. In the space provided below, write the Hadoop Streaming command to re-run your job from question 4a with this combining added.

* __c) short response__: Report the number of records emitted by your mappers in part b and the number of records received by your reducers. Compare your results here to what you saw in part a. Explain.

* __d) short response__: Describe a scenario where using a combiner would _NOT_ improve the efficiency of the shuffle stage. Explain. [__`BONUS:`__ how does increasing the number of mappers affect the usefulness of a combiner?]

### Q5 Student Answers:
> __a)__ The mappers emit **31490** records and the reducers receive the same number. The 31490 records represent the total number of word occurrences in all 100 emails. 

> __c)__ With `reducer.py` added as a *combiner*, the number of records emitted by the mappers is still __31490__, but the number of records received by the reducers drops to **7648**. The combiner acts as a _**mini-reducer**_: it sums the counts for identical (word, class) keys within each mapper's output before the shuffle, so fewer records have to be transferred. The job counters confirm this, since the number of records emitted by the combiner equals the number received by the reducers (7648). 

> __d)__ When almost every word in the corpus (documents, texts, emails, etc.) is unique, i.e. each word occurs only once, a combiner would not improve the efficiency of the shuffle stage. There is nothing to aggregate, so the number of records leaving the combiner is nearly the same as the number emitted by the mapper, and running the combiner only adds overhead.  
> In addition, increasing the number of mappers reduces the usefulness of a combiner. A combiner can only aggregate records that share a key within a single mapper's output, so the more mappers the data is spread across, the fewer duplicate keys each one sees and the less each combiner can collapse. 
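
Both effects can be illustrated with a small local simulation. The function below is a sketch, not a measurement of a real job: it assumes the input is divided into contiguous, roughly equal chunks (one per mapper) and that each combiner fully collapses duplicate words within its chunk. The word lists are made up.

```python
from collections import Counter

def shuffled_records(words, num_mappers):
    """Count the records that leave the combiners when words are split across mappers."""
    chunk = -(-len(words) // num_mappers)  # ceiling division
    splits = [words[i:i + chunk] for i in range(0, len(words), chunk)]
    # Each combiner emits one record per distinct word in its split.
    return sum(len(Counter(split)) for split in splits)

repeated = ["the", "to", "the", "and", "the", "to", "of", "the"]
unique = ["a", "b", "c", "d", "e", "f", "g", "h"]

for n in (1, 2, 4):
    print(n, shuffled_records(repeated, n), shuffled_records(unique, n))
```

With repeated words, the number of shuffled records grows as the same eight words are spread over more mappers; with all-unique words the combiner removes nothing, however many mappers there are.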

In [19]:
# part b - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

Deleted /user/root/HW2/eda-output


In [20]:
# part b - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -combiner reducer.py \
  -reducer reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob786558373979355867.jar tmpDir=null
19/01/16 23:40:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:40:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:40:20 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/16 23:40:20 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:40:20 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0035
19/01/16 23:40:21 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0035
19/01/16 23:40:21 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0035/
19/01/16 23:40:21 INFO mapreduce.Job: Running job: job_1547262255608_0035
19/01/16 23:40:29 INFO mapreduce.Job: Job job_1547262255608_0035 running in uber mode : false
19/01/16 23:40:29 INFO mapreduce.Job:  map 0% reduce 0%
19/01/

# Question 6: Document Classification Task Overview.
The week 2 assigned reading from Chapter 13 of _Introduction to Information Retrieval_ by Manning, Raghavan and Schutze provides a thorough introduction to the document classification task and the math behind Naive Bayes. In this question we'll use the example from Table 13.1 (reproduced below) to 'train' an unsmoothed Multinomial Naive Bayes model and classify a test document by hand.

<table>
<th>DocID</th>
<th>Class</th>
<th>Subject</th>
<th>Body</th>
<tr><td>Doc1</td><td>1</td><td></td><td>Chinese Beijing Chinese</td></tr>
<tr><td>Doc2</td><td>1</td><td></td><td>Chinese Chinese Shanghai</td></tr>
<tr><td>Doc3</td><td>1</td><td></td><td>Chinese Macao</td></tr>
<tr><td>Doc4</td><td>0</td><td></td><td>Tokyo Japan Chinese</td></tr>
</table>

### Q6 Tasks:
* __a) short response:__ Equation 13.3 in Manning, Raghavan and Schutze shows how a Multinomial Naive Bayes model classifies a document. It predicts the class, $c$, for which the estimated conditional probability of the class given the document's contents,  $\hat{P}(c|d)$, is greatest. In this equation what two pieces of information are required to calculate  $\hat{P}(c|d)$? Your answer should include both mathematical notation and verbal explanation.


* __b) short response:__ The Enron data includes two classes of documents: `spam` and `ham` (they're actually labeled `1` and `0`). In plain English, explain what  $\hat{P}(c)$ and   $\hat{P}(t_{k} | c)$ mean in the context of this data. How would we estimate these values from a training corpus? How many passes over the data would we need to make to retrieve this information for all classes and all words?


* __c) hand calculations:__ Above we've reproduced the document classification example from the textbook (we added an empty subject field to mimic the Enron data format). Remember that the classes in this "Chinese Example" are `1` (about China) and `0` (not about China). Calculate the class priors and the conditional probabilities for an __unsmoothed__ Multinomial Naive Bayes model trained on this data. Show the calculations that lead to your result using markdown and $\LaTeX$ in the space provided or by embedding an image of your hand written work. [`NOTE:` _Your results should NOT match those in the text -- they are training a model with +1 smoothing; you are training a model without smoothing_]


* __d) hand calculations:__ Use the model you trained to classify the following test document: `Chinese Chinese Chinese Tokyo Japan`. Show the calculations that lead to your result using markdown and   $\LaTeX$ in the space provided or by embedding an image of your hand written work.


* __e) short response:__ Compare the classification you get from this unsmoothed model in `c`/`d` to the results in the textbook's "Example 1" which reflects a model with Laplace plus 1 smoothing. How does smoothing affect our inference?

### Q6 Student Answers:
> __a)__ To calculate $\hat{P}(c|d)$, we need  
>  
> (1) $\hat{P}(c)$, the prior probability of a document occurring in class $c$, and   
>  
> (2) $\prod_{1 \le k \le n_d} \hat{P}(t_k|c)$, where $\hat{P}(t_k|c)$ is the conditional probability of term $t_k$ occurring in a document of class $c$ and $n_d$ is the number of terms in the document. Since a document may contain hundreds or thousands of terms, and every term counts as evidence for class $c$, we multiply the conditional probabilities of all of its terms.  
>  
> We write $\hat{P}$ for both quantities because we do not know their true values and instead estimate them from the training texts. 


> __b)__ Taking __c__ to be the spam class, $\hat{P}(c)$ is the prior probability that an email is spam. It can be estimated by  
> $\hat{P}(c) = \frac{\text{total number of spam emails}}{\text{total number of emails (spam + ham)}}$  
>  
> $\hat{P}(t_k|c)$ is the estimated probability of seeing term $t_k$ given that the email is spam. It can be estimated by  
>  
> $\hat{P}(t_k|c) = \frac{\text{total number of occurrences of term }t_k \text{ in spam emails}}{\text{total number of words in spam emails}}$  
>   
> All of the counts needed for these estimates can be collected in a single pass over the training data.  


> __c)__ Show your calculations here using markdown & $\LaTeX$ or embed them below!  
>  
> $\hat{P}(c=1)$ = 3/4 = 0.75  
>  
> $\hat{P}(c=0)$ = 1/4 = 0.25  
>  
> $\hat{P}(Chinese|c=1)$ = 5/8  
>  
> $\hat{P}(Chinese|c=0)$ = $\hat{P}(Tokyo|c=0)$ = $\hat{P}(Japan|c=0)$ =1/3  
>   
> $\hat{P}(Beijing|c=1)$ = $\hat{P}(Shanghai|c=1)$ = $\hat{P}(Macao|c=1)$ = 1/8  
>  
> $\hat{P}(Beijing|c=0)$ = $\hat{P}(Shanghai|c=0)$ = $\hat{P}(Macao|c=0)$ = 0  
>  
> $\hat{P}(Tokyo|c=1)$ = $\hat{P}(Japan|c=1)$ = 0



> __d)__ Show your calculations here using markdown & $\LaTeX$ or embed them below!  
>  
> test doc, **d** = `['Chinese', 'Chinese', 'Chinese', 'Tokyo', 'Japan']`  
>  
> $\hat{P}(c=1|d) \propto \hat{P}(c=1) \cdot (5/8)^3 \cdot 0 \cdot 0  = 3/4 \cdot 0 = 0$  
>  
> $\hat{P}(c=0|d) \propto \hat{P}(c=0) \cdot (1/3)^3 \cdot 1/3 \cdot 1/3 = 1/4 \cdot (1/3)^5 = 1/972 \approx 0.001$  


> __e)__ The unsmoothed model classifies document __d__ as non-Chinese (c=0) because Tokyo and Japan never appear in the Chinese training documents, so $\hat{P}(Japan|c=1) = \hat{P}(Tokyo|c=1) = 0$ and the whole product for c=1 collapses to zero. Intuitively, the test document should be **_Chinese_**, since _Chinese_ occurs three times. A single term unseen in a class is enough to rule that class out, which is why the unsmoothed model classifies this document incorrectly. With Laplace smoothing, the zero conditional probabilities become small non-zero values, and the textbook's smoothed model classifies the same document as Chinese (c=1).  
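
The hand calculations in `c` and `d` can be double-checked with a few lines of Python. This is a minimal sketch that uses exact fractions and runs on a single machine (no MapReduce); the helper names `prior`, `cond_prob` and `score` are illustrative only.

```python
from collections import Counter
from fractions import Fraction

# Training corpus from Table 13.1: (class, body)
train = [
    (1, "Chinese Beijing Chinese"),
    (1, "Chinese Chinese Shanghai"),
    (1, "Chinese Macao"),
    (0, "Tokyo Japan Chinese"),
]
test_doc = "Chinese Chinese Chinese Tokyo Japan"

# Tally documents per class and word occurrences per class.
doc_counts = Counter(label for label, _ in train)
word_counts = {0: Counter(), 1: Counter()}
for label, body in train:
    word_counts[label].update(body.lower().split())

def prior(c):
    """Class prior: documents in class c / all documents."""
    return Fraction(doc_counts[c], len(train))

def cond_prob(word, c):
    """Unsmoothed estimate: count of word in class c / total words in class c."""
    return Fraction(word_counts[c][word], sum(word_counts[c].values()))

def score(doc, c):
    """Unnormalized posterior: prior times the product of term probabilities."""
    result = prior(c)
    for word in doc.lower().split():
        result *= cond_prob(word, c)
    return result

scores = {c: score(test_doc, c) for c in (0, 1)}
prediction = max(scores, key=scores.get)
print(scores, prediction)
```

Because `tokyo` and `japan` have zero probability under class 1, the class 1 score is exactly zero and the unsmoothed model predicts class 0.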


In [None]:
# part c/d - if you didn't write out your calculations above, embed a picture of them here:
from IPython.display import Image
Image(filename="path-to-hand-calulations-image.png")

# Question 7: Naive Bayes Inference.
In the next two questions you'll write code to parallelize the Naive Bayes calculations that you performed above. We'll do this in two phases: one MapReduce job to perform training and a second MapReduce job to perform inference. While in practice we'd need to train a model before we can use it to classify documents, for learning purposes we're going to develop our code in the opposite order. By first focusing on the pieces of information/format we need to perform the classification (inference) task you should find it easier to develop a solid implementation for the training phase when you get to question 8 below. In both of these questions we'll continue to use the Chinese example corpus from the textbook to help us test our MapReduce code as we develop it. Below we've reproduced the corpus, test set and model in text format that matches the Enron data.

### Q7 Tasks:
* __a) short response:__ Run the provided cells to create the example files and load them into HDFS. Then take a closer look at __`NBmodel.txt`__. This text file represents a Naive Bayes model trained (with Laplace +1 smoothing) on the example corpus. What are the 'keys' and 'values' in this file? Which record means something slightly different than the rest? The value field of each record includes two numbers which will be helpful for debugging but which we don't actually need to perform inference -- what are they? [`HINT`: _This file represents the model from Example 13.1 in the textbook, if you're having trouble getting oriented try comparing our file to the numbers in that example._]


* __b) short response:__ When performing Naive Bayes in practice instead of multiplying the probabilities (as in equation 13.3) we add their logs (as in equation 13.4). Why do we choose to work with log probabilities? If we had an unsmoothed model, what potential error could arise from this transformation?


* __c) short response:__ Documents 6 and 8 in the test set include a word that did not appear in the training corpus (and as a result does not appear in the model). What should we do at inference time when we need a class conditional probability for this word?


* __d) short response:__ The goal of our MapReduce job is to stream over the test set and classify each document by performing the calculation from equation 13.4. To do this we'll load the model file (which contains the probabilities for equation 13.4) into memory on the nodes where we do our mapping. This is called an in-memory join. Does loading a model 'state' like this depart from the functional programming principles? Explain why or why not. From a scalability perspective when would this kind of memory use be justified? When would it be unwise?


* __e) code:__ Complete the code in __`NaiveBayes/classify_mapper.py`__. Read the docstring carefully to understand how this script should work and the format it should return. Run the provided unit tests to confirm that your script works as expected then write a Hadoop streaming job to classify the Chinese example test set. [`HINT 1:` _you shouldn't need a reducer for this one._ `HINT 2:` _Don't forget to add the model file to the_ `-files` _parameter in your Hadoop streaming job so that it gets shipped to the mapper nodes where it will be accessed by your script._]


* __f) short response:__ In our test example and in the Enron data set we have fairly short documents. Since these fit fine in memory on a mapper node we didn't need a reducer and could just do all of our calculations in the mapper. However with much longer documents (eg. books) we might want a higher level of parallelization -- for example we might want to process parts of a document on different nodes. In this hypothetical scenario how would our algorithm design change? What could the mappers still do? What key-value structure would they emit? What would the reducers have to do as a last step?

### Q7 Student Answers:
> __a)__ In `NBmodel.txt`, the key of each record is a word and the value is the comma-separated list of fields that follows it, specifically  
> 1. number of occurrences of the word in non-Chinese documents (c=0)  
> 2. number of occurrences of the word in Chinese documents (c=1)  
> 3. conditional probability of the word given the doc is non-Chinese (c=0)  
> 4. conditional probability of the word given the doc is Chinese (c=1)  
>  
> Only the last two fields (3 and 4) are needed for inference, so the first two fields (1 and 2) are dropped in `classify_mapper.py`.  
>  
> The record __ClassPriors__ is slightly different from the rest: its counts are document counts per class rather than word counts, and its probabilities are not conditional probabilities but the respective **prior** probabilities, $\hat{P}(c=0)$ and $\hat{P}(c=1)$.  

> __b)__ Taking the log turns the product of probabilities into a sum of log probabilities, e.g. `log(xy) = log(x) + log(y)`. This avoids the floating point underflow caused by multiplying many small numbers together, and because the log is monotonic it does not change which class scores highest. However, in an unsmoothed model a word that never occurs in a given class has probability **0**, and log(0) is undefined, so the transformation would raise an error.  

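> __c)__ Documents 6 and 8 contain the word `Trade`, which never appears in the training documents, so the model has no conditional probability for it. Using a probability of **0** would zero out both classes (and log(0) is undefined), so at inference time we simply skip the word. This is consistent with the output of `classify_mapper.py`, where the log probabilities for `d6` and `d8` include only the prior and the two known words.  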

> __d)__ Functional programming is based on evaluating functions whose results depend only on their inputs: an operation should neither affect nor be affected by other operations. For example, the count of __'Chinese'__ in `['Chinese', 'Chinese', 'Chinese', 'Tokyo', 'Japan']` is 3 no matter what happens elsewhere. Loading a model with its conditional probabilities is a slight departure from this principle because the mapper's output now depends on state outside its input records. Retraining on a different set of documents changes the word counts, hence the conditional probabilities, hence the classification of the same test document.  
>  
> This departure is justified when the state is read-only and identical on every node (Hadoop datanodes), and small enough to fit in each mapper's memory. It would be unwise if the model were too large to hold in memory, or if an operation on one node could modify the state and so change the results produced on other nodes. 

> __e)__ Complete the coding portion of this question before answering 'f'.

> __f)__ To classify larger documents (e.g. books), we might want a higher level of parallelization, with the parts of one document processed by multiple mappers across multiple nodes. In this situation no single mapper can calculate the predicted class. Instead, each mapper emits records of the form (doc id, partial sum of logpHam, partial sum of logpSpam) for the part of the document it sees. The reducer then aggregates the partial log probabilities for each class (Ham or Spam) by document id, adds the log prior once per document, and predicts the class with the larger total log probability.  
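
The reducer described in `f` could look roughly like the sketch below. Everything here is an assumption made for illustration: the record format (`doc_id`, then two partial log-likelihood sums, tab separated), the function name `reduce_partial_sums`, and the made-up partial sums in `mapper_output`. Only the priors come from the `ClassPriors` record of the example model.

```python
import math
from itertools import groupby

# Log priors for class 0 and class 1, taken from the ClassPriors record.
LOG_PRIORS = (math.log(0.25), math.log(0.75))

def reduce_partial_sums(lines):
    """Sum partial log-likelihoods per document, add the log prior once, classify.

    Each input line is: doc_id <TAB> partial_logp_class0 <TAB> partial_logp_class1
    and lines are assumed to arrive sorted by doc_id, as after the shuffle.
    """
    parsed = (line.rstrip("\n").split("\t") for line in lines)
    for doc_id, group in groupby(parsed, key=lambda fields: fields[0]):
        totals = list(LOG_PRIORS)  # the prior is added exactly once per document
        for _, lp0, lp1 in group:
            totals[0] += float(lp0)
            totals[1] += float(lp1)
        yield doc_id, totals[0], totals[1], int(totals[1] > totals[0])

# Hypothetical mapper output: two chunks of one long document, one chunk of another.
mapper_output = [
    "bookA\t-3.0\t-1.5",
    "bookA\t-2.0\t-1.0",
    "bookB\t-0.5\t-4.0",
]
results = list(reduce_partial_sums(mapper_output))
for record in results:
    print(record)
```

Adding the prior in the reducer rather than in each mapper avoids counting it once per chunk.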


Run these cells to create the example corpus and model.

In [21]:
%%writefile NaiveBayes/chineseTrain.txt
D1	1		Chinese Beijing Chinese
D2	1		Chinese Chinese Shanghai
D3	1		Chinese Macao
D4	0		Tokyo Japan Chinese

Overwriting NaiveBayes/chineseTrain.txt


In [22]:
%%writefile NaiveBayes/chineseTest.txt
D5	1		Chinese Chinese Chinese Tokyo Japan
D6	1		Beijing Shanghai Trade
D7	0		Japan Macao Tokyo
D8	0		Tokyo Japan Trade

Overwriting NaiveBayes/chineseTest.txt


In [23]:
%%writefile NBmodel.txt
beijing	0.0,1.0,0.111111111111,0.142857142857
chinese	1.0,5.0,0.222222222222,0.428571428571
tokyo	1.0,0.0,0.222222222222,0.0714285714286
shanghai	0.0,1.0,0.111111111111,0.142857142857
ClassPriors	1.0,3.0,0.25,0.75
japan	1.0,0.0,0.222222222222,0.0714285714286
macao	0.0,1.0,0.111111111111,0.142857142857

Overwriting NBmodel.txt


In [24]:
# load the data files into HDFS
!hdfs dfs -copyFromLocal NaiveBayes/chineseTrain.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal NaiveBayes/chineseTest.txt {HDFS_DIR}

copyFromLocal: `/user/root/HW2/chineseTrain.txt': File exists
copyFromLocal: `/user/root/HW2/chineseTest.txt': File exists


Your work for `part e` starts here:

In [25]:
# part e - do your work in NaiveBayes/classify_mapper.py first, then run this cell.
!chmod a+x NaiveBayes/classify_mapper.py

In [26]:
# part e - unit test NaiveBayes/classify_mapper.py (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | column -t

d5  1  -8.90668134500626   -8.10769031284611   1
d6  1  -5.780743515794329  -4.179502370564408  1
d7  0  -6.591673732011658  -7.511706880737812  0
d8  0  -4.394449154674438  -5.565796731681498  0


In [27]:
# part e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/chinese-output

Deleted /user/root/HW2/chinese-output


In [28]:
# part e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -files NBmodel.txt,NaiveBayes/classify_mapper.py \
  -mapper classify_mapper.py \
  -reducer /bin/cat \
  -input {HDFS_DIR}/chineseTest.txt \
  -output {HDFS_DIR}/chinese-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob8870566797316105413.jar tmpDir=null
19/01/16 23:41:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:41:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:41:30 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/16 23:41:30 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:41:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0036
19/01/16 23:41:31 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0036
19/01/16 23:41:31 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0036/
19/01/16 23:41:31 INFO mapreduce.Job: Running job: job_1547262255608_0036
19/01/16 23:41:39 INFO mapreduce.Job: Job job_1547262255608_0036 running in uber mode : false
19/01/16 23:41:39 INFO mapreduce.Job:  map 0% reduce 0%
19/01

In [29]:
# part e - retrieve test set results from HDFS (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/chinese-output/part-000* > NaiveBayes/chineseResults.txt

In [30]:
# part e - take a look (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseResults.txt | column -t

d5  1  -8.90668134500626   -8.10769031284611   1
d6  1  -5.780743515794329  -4.179502370564408  1
d7  0  -6.591673732011658  -7.511706880737812  0
d8  0  -4.394449154674438  -5.565796731681498  0


<table>
<th> Expected output for the test set:</th>
<tr align=Left><td><pre>
d5	1	-8.90668134	-8.10769031	1
d6	1	-5.78074351	-4.17950237	1
d7	0	-6.59167373	-7.51170688	0
d8	0	-4.39444915	-5.56579673	0
</pre></td></tr>
</table>
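
The expected output above can be reproduced on a single machine with the sketch below. It is not the contents of `NaiveBayes/classify_mapper.py` (which is not shown here); `load_model` and `classify` are hypothetical helpers, and skipping words that are missing from the model is an assumption that happens to match the numbers for `d6` and `d8`.

```python
import math

# Contents of NBmodel.txt as written above: word <TAB> count0,count1,P(word|0),P(word|1)
MODEL_TEXT = """\
beijing\t0.0,1.0,0.111111111111,0.142857142857
chinese\t1.0,5.0,0.222222222222,0.428571428571
tokyo\t1.0,0.0,0.222222222222,0.0714285714286
shanghai\t0.0,1.0,0.111111111111,0.142857142857
ClassPriors\t1.0,3.0,0.25,0.75
japan\t1.0,0.0,0.222222222222,0.0714285714286
macao\t0.0,1.0,0.111111111111,0.142857142857
"""

def load_model(text):
    """Parse the model into {key: (P(.|class 0), P(.|class 1))}, dropping the counts."""
    model = {}
    for line in text.strip().split("\n"):
        key, values = line.split("\t")
        fields = [float(v) for v in values.split(",")]
        model[key] = (fields[2], fields[3])
    return model

def classify(body, model):
    """Return (log P for class 0, log P for class 1, predicted class), as in equation 13.4."""
    log_p = [math.log(p) for p in model["ClassPriors"]]
    for word in body.lower().split():
        if word not in model:  # assumption: unseen words (e.g. 'trade') are skipped
            continue
        for c in (0, 1):
            log_p[c] += math.log(model[word][c])
    return log_p[0], log_p[1], int(log_p[1] > log_p[0])

model = load_model(MODEL_TEXT)
test_docs = {
    "d5": "Chinese Chinese Chinese Tokyo Japan",
    "d6": "Beijing Shanghai Trade",
    "d7": "Japan Macao Tokyo",
    "d8": "Tokyo Japan Trade",
}
for doc_id, body in test_docs.items():
    print(doc_id, *classify(body, model), sep="\t")
```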

# Question 8: Naive Bayes Training.
In Question 7 we used a model that we had trained by hand. Next we'll develop the code to do that same training in parallel, making it suitable for use with larger corpora (like the Enron emails). The end result of the MapReduce job you write in this question should be a model text file that looks just like the example (`NBmodel.txt`) that we created by hand above.

To refresh your memory about the training process take a look at  `6a` and `6b` where you described the pieces of information you'll need to collect in order to encode a Multinomial Naive Bayes model. We now want to retrieve those pieces of information while streaming over a corpus. The bulk of the task will be very similar to the word counting exercises you've already done but you may want to consider a slightly different key-value record structure to efficiently tally counts for each class. 

The most challenging (interesting?) design question will be how to retrieve the totals (# of documents and # of words in documents for each class). Of course, counting these numbers is easy. The hard part is the timing: you'll need to make sure you have the counts totalled up _before_ you start estimating the class conditional probabilities for each word. It would be best (i.e. most scalable) if we could find a way to do this tallying without storing the whole vocabulary in memory... Use an appropriate MapReduce design pattern to implement this efficiently! 
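
One pattern that fits the timing problem described above is order inversion: the mapper emits the totals under a special key that sorts ahead of every real word, so the reducer has them in hand before it sees the first word. The sketch below simulates this in plain Python and is only one possible design. The key name `!total` is hypothetical, only the per-class word totals are tracked (document counts for the priors are left out), and with more than one reducer the totals would also have to reach every partition.

```python
from itertools import groupby

TOTAL_KEY = "!total"  # hypothetical special key; '!' sorts before letters and digits

def mapper(lines):
    """Emit (word, class, 1) per token, plus a (TOTAL_KEY, class, 1) record per token."""
    for line in lines:
        doc_id, label, subject, body = line.rstrip("\n").split("\t")
        for word in (subject + " " + body).lower().split():
            yield word, int(label), 1
            yield TOTAL_KEY, int(label), 1

def reducer(sorted_records):
    """Stream over sorted records; totals arrive first, so no vocabulary is stored."""
    class_totals = [0, 0]
    for key, group in groupby(sorted_records, key=lambda record: record[0]):
        counts = [0, 0]
        for _, label, n in group:
            counts[label] += n
        if key == TOTAL_KEY:
            class_totals = counts  # remember the per-class word totals
        else:
            # Unsmoothed estimate: word count in class / total words in class.
            yield key, counts, [counts[c] / class_totals[c] for c in (0, 1)]

corpus = [
    "D1\t1\t\tChinese Beijing Chinese",
    "D2\t1\t\tChinese Chinese Shanghai",
    "D3\t1\t\tChinese Macao",
    "D4\t0\t\tTokyo Japan Chinese",
]
records = sorted(mapper(corpus))  # stands in for Hadoop's shuffle and sort
model = {word: probs for word, counts, probs in reducer(records)}
for word, probs in model.items():
    print(word, probs)
```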


### Q8 Tasks:
* __a) make a plan:__  Fill in the docstrings for __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ to appropriately reflect the format that each script will input/output. [`HINT:` _the input files_ (`enronemail_1h.txt` & `chineseTrain.txt`) _have a prespecified format and your output file should match_ `NBmodel.txt` _so you really only have to decide on an internal format for Hadoop_].


* __b) implement it:__ Complete the code in __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ so that together they train a Multinomial Naive Bayes model __with no smoothing__. Make sure your end result is formatted correctly (see note above). Test your scripts independently and together (using `chineseTrain.txt` or test input of your own devising). When you are satisfied with your Python code design, write and run a Hadoop streaming command to run your job in parallel on __chineseTrain.txt__. Confirm that your trained model matches your hand calculations from Question 6.


* __c) short response:__ We saw in Question 6 that adding Laplace +1 smoothing makes our classifications less sensitive to rare words. However implementing this technique requires access to one additional piece of information that we had not previously used in our Naive Bayes training. What is that extra piece of information? [`HINT:` see equation 13.7 in Manning, Raghavan and Schutze].


* __d) short response:__ There are three approaches that we could take to handle the extra piece of information you identified in `c`: 1) we could hard code it into our reducer (_where would we get it in the first place?_). Or 2) we could compute it inside the reducer which would require storing some information in memory (_what information?_). Or 3) we could compute it in the reducer without storing any bulky information in memory but then we'd need some postprocessing or a second MapReduce job to complete the calculation (_why?_). Briefly explain what is non-ideal about each of these options. BONUS: which of these options is incompatible with using multiple reducers?


* __e) code + short response:__ Choose one of the 3 options above. State your choice & reasoning in the space below then use that strategy to complete the code in __`NaiveBayes/train_reducer_smooth.py`__. Test this alternate reducer then write and run a Hadoop streaming job to train an MNB model with smoothing on the Chinese example. Your results should match the model that we provided for you above (and the calculations in the textbook example). [`HINT:` _don't start from scratch with this one -- you can just copy over your reducer code from part `b` and make the needed modifications_].

### Q8 Student Answers:
> __c)__ We need the size of the vocabulary, i.e. the total number of unique words across all of the training documents (the $B = |V|$ term in equation 13.7). 

> __d)__  
>  
> __1. Hardcoded approach__  
> We could count the unique words ahead of time (for example with a Python **set**) and hard code the result into the reducer. To get the number in the first place, something has to load the entire corpus and count its unique words, separately from the regular reducer logic. This requires a large chunk of memory if the corpus is large, and because it reads the raw documents it must apply the same preprocessing as the mapper so that the vocabulary count reflects the words the mapper actually emits. Alternatively, the mapper could compute the vocabulary count and emit it in its records, but then the mapper would have to process the entire corpus rather than reading line by line. This approach works with multiple reducers, but it falls short on memory management. 
>  
> __2. Dictionary approach__  
> Alternatively, the reducer can record each word in a dictionary as it is received. We would have to wait until all records from the mappers have arrived before computing any probabilities, because a new unique word could appear in the very last record. The dictionary would store each word with its count in each class, e.g. 'beijing' with its total number of occurrences for c=0 and c=1. This approach is not scalable: the whole vocabulary must fit in memory, and with multiple reducers each one sees only part of the vocabulary, so they would need to communicate to obtain the true number of unique words. For a smaller dataset, however, it avoids extra steps where mistakes could be introduced (e.g. the preprocessing step of the hard-coded approach).  
>  
> __3. Counter approach__  
> We can also keep just a counter of the unique words seen in each reducer. This removes the need for a large amount of memory because only the counter is stored. It also scales, since we can deploy as many reducers as we want and each keeps its own counter. However, no reducer knows the full vocabulary size until all of them have finished, so we'd need an extra MapReduce step (or postprocessing) to add up the counters and then compute the Laplace-smoothed conditional probability of each word.  
> 
> __e)__ I chose the dictionary approach (option 2) because the dataset we're processing is small. It also avoids the extra pre-processing and post-processing steps. I built a vocabulary dictionary with a list as the value for each word, storing the word's total number of occurrences in each class (c=0 and c=1). However, this approach is not scalable because the entire vocabulary is held in memory. If the corpus were huge, I would go with the 3rd approach (counter) and employ multiple reducers. 
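
To make the role of the vocabulary size concrete, the sketch below applies Laplace +1 smoothing to the per-class word counts of the Chinese example. It is a single-machine illustration with exact fractions, not the code in `NaiveBayes/train_reducer_smooth.py`; the `counts` dictionary is typed in by hand from the example and `smoothed` is a hypothetical helper.

```python
from fractions import Fraction

# Per-class word counts for the Chinese example: word -> (count in class 0, count in class 1)
counts = {
    "beijing": (0, 1),
    "chinese": (1, 5),
    "japan": (1, 0),
    "macao": (0, 1),
    "shanghai": (0, 1),
    "tokyo": (1, 0),
}

vocab_size = len(counts)  # the extra piece of information: |V| = 6
class_totals = [sum(c[k] for c in counts.values()) for k in (0, 1)]  # [3, 8]

def smoothed(word, k):
    """Laplace (+1) estimate: (count + 1) / (total words in class k + |V|)."""
    return Fraction(counts[word][k] + 1, class_totals[k] + vocab_size)

for word in sorted(counts):
    print(word, float(smoothed(word, 0)), float(smoothed(word, 1)))
```

The printed values agree with the probabilities in `NBmodel.txt`, e.g. 3/7 ≈ 0.4286 for `chinese` in class 1 and 1/14 ≈ 0.0714 for `tokyo` in class 1.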


In [31]:
# part a - do your work in train_mapper.py and train_reducer.py then RUN THIS CELL AS IS
!chmod a+x NaiveBayes/train_mapper.py
!chmod a+x NaiveBayes/train_reducer.py
!echo "=========== MAPPER DOCSTRING ============"
!head -n 8 NaiveBayes/train_mapper.py | tail -n 6
!echo "=========== REDUCER DOCSTRING ============"
!head -n 8 NaiveBayes/train_reducer.py | tail -n 6

Mapper reads in text documents and emits word counts by class.

INPUT:
    ID \t true_class \t subject \t body \n
OUTPUT:
    word \t true_class \t count
Reducer aggregates word counts by class and emits frequencies.

INPUT:
    word \t true_class \t count
OUTPUT:
    word \t count_Ham \t count_Spam \t cp_Ham \t cp_Spam


__`part b starts here`:__ MNB _without_ Smoothing (training on Chinese Example Corpus).

In [32]:
# part b - write a unit test for your mapper here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 | column -t

*         0  1
*         1  1
*         1  1
*         1  1
.         0  1
.         0  1
.         0  1
.         1  1
.         1  1
.         1  1
.         1  1
.         1  1
.         1  1
.         1  1
.         1  1
beijing   1  1
chinese   0  1
chinese   1  1
chinese   1  1
chinese   1  1
chinese   1  1
chinese   1  1
japan     0  1
macao     1  1
shanghai  1  1
tokyo     0  1


In [33]:
# part b - write a unit test for your reducer here
# Creating a custom input for reducer 
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 | column -t > NaiveBayes/train_reducer_test_input.txt

#test reducer
!cat NaiveBayes/train_reducer_test_input.txt | NaiveBayes/train_reducer.py | column -t

ClassPriors  1,3,0.25,0.75
beijing      0,1,0.0,0.125
chinese      1,5,0.3333333333333333,0.625
japan        1,0,0.3333333333333333,0.0
macao        0,1,0.0,0.125
shanghai     0,1,0.0,0.125
tokyo        1,0,0.3333333333333333,0.0


In [34]:
# part b - write a systems test for your mapper + reducer together here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n | NaiveBayes/train_reducer.py | column -t

ClassPriors  1,3,0.25,0.75
beijing      0,1,0.0,0.125
chinese      1,5,0.3333333333333333,0.625
japan        1,0,0.3333333333333333,0.0
macao        0,1,0.0,0.125
shanghai     0,1,0.0,0.125
tokyo        1,0,0.3333333333333333,0.0


In [35]:
# part b - clear (and name) an output directory in HDFS for your unsmoothed chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/chinese-unsmooth-output

Deleted /user/root/HW2/chinese-unsmooth-output


In [36]:
# part b - write your hadoop streaming job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/chinese-unsmooth-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob7217577082093140659.jar tmpDir=null
19/01/16 23:42:38 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:42:38 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:42:39 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/16 23:42:39 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:42:39 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0037
19/01/16 23:42:39 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0037
19/01/16 23:42:39 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0037/
19/01/16 23:42:39 INFO mapreduce.Job: Running job: job_1547262255608_0037
19/01/16 23:42:49 INFO mapreduce.Job: Job job_1547262255608_0037 running in uber mode : false
19/01/16 23:42:49 INFO mapreduce.Job:  map 0% reduce 0%
19/01

In [37]:
# part b - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/chinese-unsmooth-output/part-000* > NaiveBayes/chinese_unsmooth_results.txt

In [38]:
# part b - print your model so that we can confirm that it matches expected results
!cat NaiveBayes/chinese_unsmooth_results.txt | column -t

ClassPriors  1,3,0.25,0.75
beijing      0,1,0.0,0.125
chinese      1,5,0.3333333333333333,0.625
japan        1,0,0.3333333333333333,0.0
macao        0,1,0.0,0.125
shanghai     0,1,0.0,0.125
tokyo        1,0,0.3333333333333333,0.0


__`part e starts here`:__ MNB _with_ Smoothing (training on Chinese Example Corpus).

In [39]:
# part e - write a unit test for your NEW reducer here
!chmod a+x NaiveBayes/train_reducer_smooth.py
!cat NaiveBayes/train_reducer_test_input.txt | NaiveBayes/train_reducer_smooth.py | column -t

ClassPriors  1,3,0.25,0.75
beijing      0,1,0.1111111111111111,0.14285714285714285
chinese      1,5,0.2222222222222222,0.42857142857142855
japan        1,0,0.2222222222222222,0.07142857142857142
macao        0,1,0.1111111111111111,0.14285714285714285
shanghai     0,1,0.1111111111111111,0.14285714285714285
tokyo        1,0,0.2222222222222222,0.07142857142857142


In [40]:
# part e - write a systems test for your mapper + reducer together here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 | NaiveBayes/train_reducer_smooth.py | column -t

ClassPriors  1,3,0.25,0.75
beijing      0,1,0.1111111111111111,0.14285714285714285
chinese      1,5,0.2222222222222222,0.42857142857142855
japan        1,0,0.2222222222222222,0.07142857142857142
macao        0,1,0.1111111111111111,0.14285714285714285
shanghai     0,1,0.1111111111111111,0.14285714285714285
tokyo        1,0,0.2222222222222222,0.07142857142857142


In [41]:
# part e - clear (and name) an output directory in HDFS for your SMOOTHED chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/chinese-smooth-output

Deleted /user/root/HW2/chinese-smooth-output


In [42]:
# part e - write your hadoop streaming job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer_smooth.py \
  -mapper train_mapper.py \
  -reducer train_reducer_smooth.py \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/chinese-smooth-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob7476726953767136956.jar tmpDir=null
19/01/16 23:44:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:44:24 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:44:25 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/16 23:44:25 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:44:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0038
19/01/16 23:44:25 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0038
19/01/16 23:44:25 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0038/
19/01/16 23:44:25 INFO mapreduce.Job: Running job: job_1547262255608_0038
19/01/16 23:44:34 INFO mapreduce.Job: Job job_1547262255608_0038 running in uber mode : false
19/01/16 23:44:34 INFO mapreduce.Job:  map 0% reduce 0%
19/01

In [43]:
# part e - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/chinese-smooth-output/part-000* > NaiveBayes/chinese_smooth_results.txt

In [44]:
!cat NaiveBayes/chinese_smooth_results.txt | column -t

ClassPriors  1,3,0.25,0.75
beijing      0,1,0.1111111111111111,0.14285714285714285
chinese      1,5,0.2222222222222222,0.42857142857142855
japan        1,0,0.2222222222222222,0.07142857142857142
macao        0,1,0.1111111111111111,0.14285714285714285
shanghai     0,1,0.1111111111111111,0.14285714285714285
tokyo        1,0,0.2222222222222222,0.07142857142857142


# Question 9: Enron Ham/Spam NB Classifier & Results.

Fantastic work. We're finally ready to perform Spam Classification on the Enron Corpus. In this question you'll run the analysis you've developed, report its performance, and draw some conclusions.

### Q9 Tasks:
* __a) train/test split:__ Run the provided code to split our Enron file into a training set and a testing set, then load them into HDFS. 

[`NOTE:` _If you hard-coded the vocab size in question 8d, make sure you recalculate the vocab size for just the training set!_]

* __b) train 2 models:__ Write Hadoop Streaming jobs to train MNB Models on the training set with and without smoothing. Save your models to local files at __`NaiveBayes/Unsmoothed/NBmodel.txt`__ and __`NaiveBayes/Smoothed/NBmodel.txt`__. [`NOTE:` _This naming is important because we wrote our classification task so that it expects a file of that name... if this inelegance frustrates you there is an alternative that would involve a few adjustments to your code [read more about it here](http://www.tnoda.com/blog/2013-11-23)._] Finally run the checks that we provide to confirm that your results are correct.


* __c) code:__ Recall that we designed our classification job with just a mapper. An efficient way to report the performance of our models would be to simply add a reducer phase to this job and compute precision and recall right there. Complete the code in __`NaiveBayes/evaluation_reducer.py`__ and then write Hadoop jobs to evaluate your two models on the test set. Report their performance side by side. [`NOTE:` if you need a refresher on precision, recall and F1-score [Wikipedia](https://en.wikipedia.org/wiki/F1_score) is a good resource.]


* __d) short response:__ Compare the performance of your two models. What do you notice about the unsmoothed model's predictions? Can you guess why this is happening? Which evaluation measure do you think is most relevant in our use case? [`NOTE:` _Feel free to answer using your common sense, but if you want more information on evaluating the classification task check out_ [this blogpost](https://tryolabs.com/blog/2013/03/25/why-accuracy-alone-bad-measure-classification-tasks-and-what-we-can-do-about-it/) or [this paper](http://www.flinders.edu.au/science_engineering/fms/School-CSEM/publications/tech_reps-research_artfcts/TRRA_2007.pdf).]


* __e) code + short response:__ Let's look at the top ten words with the highest conditional probability in `Spam` and in `Ham`. We'll do this by writing a Hadoop job that sorts the model file (`NaiveBayes/Smoothed/NBmodel.txt`). Normally we'd have to run two jobs -- one that sorts on $P(word|ham)$ and another that sorts on $P(word|spam)$. However, if we slightly modify the data format in the model file then we can get the top words in each class with just one job. We've written a mapper that will do just this for you. Read through __`NaiveBayes/model_sort_mapper.py`__ and then briefly explain how this mapper will allow us to partition and sort our model file. Write a Hadoop job that uses our mapper and `/bin/cat` for a reducer to partition and sort. Print out the top 10 words in each class (where 'top' == highest conditional probability). [`HINT:` _this should remind you a lot of what we did in Question 6._]


* __f) short response:__ What do you notice about the 'top words' we printed in `e`? How would increasing the smoothing parameter 'k' affect the probabilities for the top words that you identified for 'e'? How would it affect the probabilities of words that occur much more in one class than another? In summary, how does the smoothing parameter 'k' affect the bias and the variance of our model? [`NOTE:` _you do not need to code anything for this task, but if you are struggling with it you could try changing 'k' and see what happens to the test set. We don't recommend doing this exploration with the Enron data because it will be harder to see the impact with such a big vocabulary._]

### Q9 Student Answers:
> __d)__ The unsmoothed model classifies poorly: it finds only 1 true positive among the 20 test documents and produces 10 false negatives. Interestingly, it identifies more true negatives than the smoothed model (**9** versus __6__ out of 20). This follows from how Naive Bayes computes conditional probabilities. Without smoothing, a word that was never seen with a given class (c=0 or c=1) during training has probability zero for that class, so its log is $-\infty$ and the score for that class collapses regardless of the other evidence in the document. A document therefore tends to be labeled *negative* even if it contains many positive words, as long as at least one of its words never appeared in the positive training documents. 

> __e)__ The **model_sort_mapper.py** mapper reads every word together with its conditional probabilities for both classes (c=0 and c=1). It compares the two and, taking the higher probability, assigns the word to either the Ham or the Spam category. The emitted records are then sorted by Hadoop during the shuffle phase; the `/bin/cat` reducer simply passes them through unchanged. 

> __f)__ The top 10 words in each class carry little information that we could use to classify an email as Ham or Spam. Most of them are common English words such as "to", "a" and "the" or the pronouns "you" and "your". Although each of these words has one of the highest conditional probabilities in its class, it would be very hard to classify emails based on these few words alone. I suspect it is the combination of all the words in an email, each with its respective conditional probabilities, that makes classification possible. Increasing the smoothing parameter **k** would affect the probabilities of these words. Each word's conditional probability is $\hat{P}(t_k|c)$ for Ham and $\hat{P}(t_k|\bar{c})$ for Spam; by increasing k we artificially increase the count of every word in every class, which makes the model more general, i.e., probability mass is spread across all words in both classes even where a word never occurs in that class. A larger k therefore increases bias and reduces variance, so choosing the right k is crucial to balance the two. 
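
The effect of `k` is easiest to see on the small Chinese example. The sketch below is illustrative only: `smoothed_prob` and `sweep` are hypothetical helper names, not part of the provided reducers, and the counts are the class 1 counts from the Chinese training corpus (8 tokens, 6 vocabulary words).

```python
import math

def smoothed_prob(count, class_total, vocab_size, k):
    """Add-k estimate of P(word | class); k=0 means no smoothing."""
    return (count + k) / (class_total + k * vocab_size)

# Class 1 of the Chinese example: 8 tokens in total, vocabulary of 6 words.
CLASS_TOTAL, VOCAB_SIZE = 8, 6
COUNTS = {"chinese": 5, "japan": 0}

def sweep(ks=(0, 1, 10, 1000)):
    """Return {k: {word: probability}} for each smoothing strength."""
    return {
        k: {w: smoothed_prob(c, CLASS_TOTAL, VOCAB_SIZE, k) for w, c in COUNTS.items()}
        for k in ks
    }

results = sweep()
for k, probs in results.items():
    # log(0) is undefined, so report -inf explicitly for zero probabilities
    logs = {w: (math.log(p) if p > 0 else float("-inf")) for w, p in probs.items()}
    print(k, probs, logs)
```

With `k=0` the unseen word has probability zero (log score $-\infty$); as `k` grows, both estimates drift toward the uniform value $1/|V| = 1/6$ regardless of the observed counts.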

__Test/Train split__

In [45]:
# part a - test/train split (RUN THIS CELL AS IS)
!head -n 80 data/enronemail_1h.txt > data/enron_train.txt
!tail -n 20 data/enronemail_1h.txt > data/enron_test.txt
!hdfs dfs -copyFromLocal data/enron_train.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal data/enron_test.txt {HDFS_DIR}

copyFromLocal: `/user/root/HW2/enron_train.txt': File exists
copyFromLocal: `/user/root/HW2/enron_test.txt': File exists


__Training__ (Enron MNB Model _without smoothing_ )

In [46]:
# part b -  Unsmoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/enron-model \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

# save the model locally
!mkdir NaiveBayes/Unsmoothed
!hdfs dfs -cat {HDFS_DIR}/enron-model/part-000* > NaiveBayes/Unsmoothed/NBmodel.txt

Deleted /user/root/HW2/enron-model
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob4052735845300804375.jar tmpDir=null
19/01/16 23:45:24 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:45:24 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/16 23:45:25 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/16 23:45:25 INFO mapreduce.JobSubmitter: number of splits:2
19/01/16 23:45:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0039
19/01/16 23:45:26 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0039
19/01/16 23:45:26 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0039/
19/01/16 23:45:26 INFO mapreduce.Job: Running job: job_1547262255608_0039
19/01/16 23:45:33 INFO mapreduce.Job: Job job_1547262255608_0039 running in uber mode : false
19/01/16 23:45:33 INFO map

In [47]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000172547666293,0.000296823983378

assistance	2,4,0.0001725476662928134,0.00029682398337785694


In [48]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,8.62738331464e-05,0.00163253190858

money	1,22,8.62738331464067e-05,0.001632531908578213


__Training__ (Enron MNB Model _with Laplace +1 smoothing_ )

In [49]:
# part b -  Smoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer_smooth.py \
  -mapper train_mapper.py \
  -reducer train_reducer_smooth.py \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/smooth-model \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

# save the model locally
!mkdir NaiveBayes/Smoothed
!hdfs dfs -cat {HDFS_DIR}/smooth-model/part-000* > NaiveBayes/Smoothed/NBmodel.txt

Deleted /user/root/HW2/smooth-model
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob6338038141466930427.jar tmpDir=null
19/01/17 00:00:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:00:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:00:35 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/17 00:00:35 INFO mapreduce.JobSubmitter: number of splits:2
19/01/17 00:00:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0040
19/01/17 00:00:36 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0040
19/01/17 00:00:36 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0040/
19/01/17 00:00:36 INFO mapreduce.Job: Running job: job_1547262255608_0040
19/01/17 00:00:44 INFO mapreduce.Job: Job job_1547262255608_0040 running in uber mode : false
19/01/17 00:00:44 INFO ma

In [50]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000185804533631,0.000277300205202

assistance	2,4,0.0001858045336306206,0.00027730020520215184


In [51]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,0.000123869689087,0.00127558094393

money	1,22,0.0001238696890870804,0.0012755809439298986


__Evaluation__

In [52]:
# part c - write your code in NaiveBayes/evaluation_reducer.py then RUN THIS
!chmod a+x NaiveBayes/evaluation_reducer.py

In [53]:
# part c - unit test your evaluation job on the chinese model (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py 
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | NaiveBayes/evaluation_reducer.py 

d5	1	-8.90668134500626	-8.10769031284611	1
d6	1	-5.780743515794329	-4.179502370564408	1
d7	0	-6.591673732011658	-7.511706880737812	0
d8	0	-4.394449154674438	-5.565796731681498	0
d5	1	-8.90668134500626	-8.10769031284611	 True
d6	1	-5.780743515794329	-4.179502370564408	 True
d7	0	-6.591673732011658	-7.511706880737812	 True
d8	0	-4.394449154674438	-5.565796731681498	 True
# Documents:      4.0
True Positives:   2.0
True Negatives:   2.0
False Positives:  0.0
False Negatives:  0.0
Accuracy          1.0
Precision         1.0
Recall            1.0
F-score           1.0
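
The two log columns printed for each document above can be checked by hand. The sketch below reproduces them for `d5`, assuming that `d5` is the test document "Chinese Chinese Chinese Tokyo Japan" from the reading and that each class score is the log prior plus the sum of the log conditional probabilities from the smoothed Chinese model. The name `log_scores` is hypothetical and is not taken from `classify_mapper.py`.

```python
import math

# Smoothed Chinese model: word -> (P(word|c=0), P(word|c=1))
MODEL = {
    "chinese": (2 / 9, 6 / 14),
    "japan": (2 / 9, 1 / 14),
    "tokyo": (2 / 9, 1 / 14),
}
PRIORS = (0.25, 0.75)  # (P(c=0), P(c=1))

def log_scores(words):
    """Return (score for class 0, score for class 1) in log space."""
    scores = [math.log(p) for p in PRIORS]
    for w in words:
        if w not in MODEL:  # assumption: words outside the model are skipped
            continue
        for c in (0, 1):
            scores[c] += math.log(MODEL[w][c])
    return tuple(scores)

s0, s1 = log_scores("chinese chinese chinese tokyo japan".split())
predicted = int(s1 > s0)  # pick the class with the larger log score
print(s0, s1, predicted)
```

Working in log space turns the product of many small probabilities into a sum, which avoids floating-point underflow on longer documents.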


In [54]:
# part c - Evaluate the UNSMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-unsmooth-output

# hadoop job
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/evaluation_reducer.py,NaiveBayes/Unsmoothed/NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/enron-unsmooth-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

# retrieve results locally
!hdfs dfs -cat {HDFS_DIR}/enron-unsmooth-output/part-000* > NaiveBayes/Unsmoothed/results.txt

Deleted /user/root/HW2/enron-unsmooth-output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob3951322220624242896.jar tmpDir=null
19/01/17 00:01:37 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:01:37 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:01:38 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/17 00:01:38 INFO mapreduce.JobSubmitter: number of splits:2
19/01/17 00:01:39 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0041
19/01/17 00:01:39 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0041
19/01/17 00:01:39 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0041/
19/01/17 00:01:39 INFO mapreduce.Job: Running job: job_1547262255608_0041
19/01/17 00:01:48 INFO mapreduce.Job: Job job_1547262255608_0041 running in uber mode : false
19/01/17 00:01:4

In [55]:
# part c - Evaluate the SMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-smooth-output

# hadoop job
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/evaluation_reducer.py,NaiveBayes/Smoothed/NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/enron-smooth-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

# retrieve results locally
!hdfs dfs -cat {HDFS_DIR}/enron-smooth-output/part-000* > NaiveBayes/Smoothed/results.txt

Deleted /user/root/HW2/enron-smooth-output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob4904108726057978050.jar tmpDir=null
19/01/17 00:02:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:02:21 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:02:22 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/17 00:02:22 INFO mapreduce.JobSubmitter: number of splits:2
19/01/17 00:02:22 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0042
19/01/17 00:02:22 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0042
19/01/17 00:02:23 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0042/
19/01/17 00:02:23 INFO mapreduce.Job: Running job: job_1547262255608_0042
19/01/17 00:02:32 INFO mapreduce.Job: Job job_1547262255608_0042 running in uber mode : false
19/01/17 00:02:32 

In [56]:
# part c - display results 
# NOTE: feel free to modify the tail commands to match the format of your results file
print('=========== UNSMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Unsmoothed/results.txt
print('=========== SMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Smoothed/results.txt

# Documents:      20.0	
True Positives:   1.0	
True Negatives:   9.0	
False Positives:  0.0	
False Negatives:  10.0	
Accuracy          0.5	
Precision         1.0	
Recall            0.09090909090909091	
F-score           0.16666666666666669	
# Documents:      20.0	
True Positives:   11.0	
True Negatives:   6.0	
False Positives:  3.0	
False Negatives:  0.0	
Accuracy          0.85	
Precision         0.7857142857142857	
Recall            1.0	
F-score           0.88	


__`EXPECTED RESULTS:`__ 
<table>
<tr>
<th>Unsmoothed Model</th>
<th>Smoothed Model</th>
</tr>
<tr>
<td><pre>
# Documents:	20
True Positives:	1
True Negatives:	9
False Positives:	0
False Negatives:	10
Accuracy	0.5
Precision	1.0
Recall	0.0909
F-Score	0.1666
</pre></td>
<td><pre>
# Documents:	20
True Positives:	11
True Negatives:	6
False Positives:	3
False Negatives:	0
Accuracy	0.85
Precision	0.7857
Recall	1.0
F-Score	0.88
</pre></td>
</tr>
</table>

__`NOTE:`__ _Don't be too disappointed if these seem low to you. We've trained and tested on a very small corpus... bigger datasets coming soon!_
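
The expected metrics follow directly from the four confusion counts. The sketch below shows that arithmetic; `evaluate` is a hypothetical helper, not the provided `evaluation_reducer.py`, and returning `0.0` when a denominator is zero is a convention chosen here rather than something the assignment specifies.

```python
def evaluate(tp, tn, fp, fn):
    """Compute accuracy, precision, recall and F1 from confusion counts."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

# Confusion counts taken from the expected results table
unsmoothed = evaluate(tp=1, tn=9, fp=0, fn=10)
smoothed = evaluate(tp=11, tn=6, fp=3, fn=0)
print(unsmoothed)
print(smoothed)
```

Note how the unsmoothed model reaches a precision of 1.0 while missing 10 of the 11 positive documents, which is why precision alone is a misleading summary here.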

__`part e starts here:`__
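
Before submitting the job, the intended ordering (`-k3,3 -k4,4nr`: class label ascending, then probability descending) can be emulated locally. This is a minimal sketch under stated assumptions: the records are made up, the tab-separated layout with the class in field 3 and the probability in field 4 is inferred from the `awk` commands used later in this notebook, and the content of field 2 is unknown, so `-` stands in for it. It does not model how Hadoop partitions records across reducers.

```python
# Hypothetical mapper output: word, placeholder, class label, P(word | class)
RECORDS = [
    "and\t-\tham\t0.016",
    "the\t-\tspam\t0.030",
    "ect\t-\tham\t0.023",
    "to\t-\tspam\t0.023",
    "hou\t-\tham\t0.013",
]

def sort_like_hadoop(lines):
    """Order by field 3 ascending (text), then field 4 descending (numeric)."""
    def key(line):
        fields = line.split("\t")
        return (fields[2], -float(fields[3]))
    return sorted(lines, key=key)

ordered = sort_like_hadoop(RECORDS)
for line in ordered:
    print(line)
```

With a single reducer this ordering applies to the whole output; with several reducers each `part-*` file is sorted independently, so the class label would also need to drive the partitioning.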

In [58]:
# part e - write your Hadoop job here (sort smoothed model on P(word|class))

# copying from Local to HDFS
!hdfs dfs -mkdir {HDFS_DIR}/Smoothed
!hdfs dfs -copyFromLocal NaiveBayes/Smoothed/NBmodel.txt {HDFS_DIR}/Smoothed/NBmodel.txt

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-sort-output

# hadoop job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=6 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k3,3 -k4,4nr" \
  -files NaiveBayes/model_sort_mapper.py \
  -mapper model_sort_mapper.py \
  -reducer /bin/cat \
  -input {HDFS_DIR}/Smoothed/NBmodel.txt \
  -output {HDFS_DIR}/smooth-sort-output \
  -cmdenv PATH={PATH}

# retrieve results locally
!hdfs dfs -cat {HDFS_DIR}/smooth-sort-output/part-000* > NaiveBayes/Smoothed/sorted_results.txt

rm: `/user/root/HW2/smooth-sort-output': No such file or directory
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob62723252219042272.jar tmpDir=null
19/01/17 00:57:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:57:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/17 00:57:08 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/17 00:57:08 INFO mapreduce.JobSubmitter: number of splits:2
19/01/17 00:57:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547262255608_0044
19/01/17 00:57:09 INFO impl.YarnClientImpl: Submitted application application_1547262255608_0044
19/01/17 00:57:09 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1547262255608_0044/
19/01/17 00:57:09 INFO mapreduce.Job: Running job: job_1547262255608_0044
19/01/17 00:57:19 INFO mapreduce.Job: Job job_1547262255608_0044 running in uber mode : fa

In [69]:
# part e - print top words in each class

print('='*10,f'Top 10 words in Ham Enron Emails','='*10)
!awk '$3 == "ham" {print $1 "\t" $4}' NaiveBayes/Smoothed/sorted_results.txt | head -n 10 

print('='*10,f'Top 10 words in Spam Enron Emails','='*10)
!awk '$3 == "spam" {print $1 "\t" $4}' NaiveBayes/Smoothed/sorted_results.txt | head -n 10 

ClassPriors	0.5875
ect	0.023473306082001735
and	0.01604112473677691
hou	0.0126347082868822
in	0.009971509971509971
for	0.00922829183698749
on	0.007617985878855444
enron	0.007246376811594203
i	0.007060572277963582
will	0.007060572277963582
awk: (FILENAME=NaiveBayes/Smoothed/sorted_results.txt FNR=693) fatal: print to "standard output" failed (Broken pipe)
the	0.029726581997670677
to	0.023348677278021184
a	0.015251511286118352
your	0.01508513116299706
you	0.014031390383228884
of	0.014031390383228884
it	0.0066552049248516446
com	0.006045144473406911
that	0.005601464145083467
or	0.004935943652598303
awk: (FILENAME=NaiveBayes/Smoothed/sorted_results.txt FNR=2524) fatal: print to "standard output" failed (Broken pipe)
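
The `Broken pipe` messages above appear because `head` exits after 10 lines while `awk` is still writing; they do not affect the listed words. The same report can be sketched in Python without a pipe. This assumes whitespace-separated rows with the word in field 1, the class in field 3 and the probability in field 4 (as in the `awk` commands), already sorted by descending probability within each class. `top_words` is a hypothetical helper, and skipping the `ClassPriors` row is a choice made here so that only real words are listed.

```python
from collections import defaultdict

def top_words(lines, n=10, skip=("ClassPriors",)):
    """Return {class: [(word, prob), ...]} with the first n rows seen per class."""
    top = defaultdict(list)
    for line in lines:
        fields = line.split()
        if len(fields) < 4 or fields[0] in skip:
            continue  # ignore malformed rows and non-word entries
        word, label, prob = fields[0], fields[2], float(fields[3])
        if len(top[label]) < n:
            top[label].append((word, prob))
    return dict(top)

# Made-up rows in the assumed layout; field 2 is a placeholder
sample = [
    "ClassPriors - ham 0.5875",
    "ect - ham 0.0235",
    "and - ham 0.0160",
    "the - spam 0.0297",
    "to - spam 0.0233",
]
print(top_words(sample, n=1))

# Usage on the real file (path taken from the job above):
# with open("NaiveBayes/Smoothed/sorted_results.txt") as f:
#     print(top_words(f))
```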


# HW2 ends here, please refer to the `README.md` for submission instructions.