# HW 2 (part 2) - Naive Bayes in Hadoop MR
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Spring 2018`__

Now that you're up to speed in writing Hadoop MR jobs we'll use this framework to implement your first parallelized machine learning algorithm: Naive Bayes. As you develop your implementation you'll test it on a small dataset that matches the 'Chinese Example' in the _Manning, Raghavan and Schutze_ reading for Week 2. For the main task in part 2 you'll be working with a small subset of the Enron Spam/Ham Corpus. By the end of this portion of the assignment you should be able to:
* __... describe__ the Naive Bayes algorithm including both training and inference.
* __... perform__ EDA on a corpus using Hadoop MR.
* __... implement__ parallelized Naive Bayes.
* __... explain__ how smoothing affects the bias and variance of a Multinomial Naive Bayes model.

As always, your work will be graded both on the correctness of your output and on the clarity and design of your code. __Please refer to the `README` for homework submission instructions.__ 

# Notebook Setup
Before starting part 2, run the following cells to confirm your setup.

In [1]:
# imports
import numpy as np
import matplotlib.pyplot as plt

%reload_ext autoreload
%autoreload 2

In [2]:
# global vars (paths) - ADJUST AS NEEDED
JAR_FILE = "/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.7.0.jar"
HDFS_DIR = "/user/root/HW2"
HOME_DIR = "/media/notebooks"  # FILL IN HERE eg. /media/notebooks/w261-main/Assignments

In [3]:
# save path for use in Hadoop jobs (-cmdenv PATH={PATH})
from os import environ
PATH  = environ['PATH']

In [4]:
# data path
ENRON = HOME_DIR + "/HW02/data/enronemail_1h.txt"

# About the Data
For the main task in this portion of the homework you will train a classifier to determine whether an email represents spam or not. You will train your Naive Bayes model on a 100 record subset of the Enron Spam/Ham corpus available in the HW02 data directory (__`HW02/data/enronemail_1h.txt`__).

__Source:__   
The original data included about 93,000 emails which were made public after the company's collapse. There have been a number of raw and preprocessed versions of this corpus (including those available [here](http://www.aueb.gr/users/ion/data/enron-spam/index.html) and [here](http://www.aueb.gr/users/ion/publications.html)). The subset we will use is limited to emails from 6 Enron employees and a number of spam sources. It is part of [this data set](http://www.aueb.gr/users/ion/data/enron-spam/) which was created by researchers working on personalized Bayesian spam filters. Their original publication is [available here](http://www.aueb.gr/users/ion/docs/ceas2006_paper.pdf). __`IMPORTANT!`__ _For this homework please limit your analysis to the 100 email subset which we provide. No need to download or run your analysis on any of the original datasets; those links are merely provided as context._

__Preprocessing:__  
For their work, Metsis et al. (the authors) appeared to have pre-processed the data, not only collapsing all text to lower-case, but additionally separating "words" by spaces, where "words" unfortunately include punctuation. As a concrete example, the sentence:  
>  `Hey Jon, I hope you don't get lost out there this weekend!`  

... would have been reduced by Metsis et al. to the form:  
> `hey jon , i hope you don ' t get lost out there this weekend !` 

... so we have reverted the data back toward its original state, removing spaces so that our sample sentence would now look like:
> `hey jon, i hope you don't get lost out there this weekend!`  

Thus we have at least preserved contractions and other higher-order lexical forms. However, one must be aware that this reversion is not complete, that some objects (specifically web sites) will be ill-formatted, and that all text is still lower-cased.


__Format:__   
All messages are collated to a tab-delimited format:  

>    `ID \t SPAM \t SUBJECT \t CONTENT \n`  

where:  
>    `ID = string; unique message identifier`  
    `SPAM = binary; with 1 indicating a spam message`  
    `SUBJECT = string; title of the message`  
    `CONTENT = string; content of the message`   
    
Note that either of `SUBJECT` or `CONTENT` may be "NA", and that all tab (\t) and newline (\n) characters have been removed from both of the `SUBJECT` and `CONTENT` columns.  
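As a minimal sketch of how a script might read this format, each line can be split on tabs into its four fields. The helper name `parse_record` is hypothetical and not part of the provided scripts, and the sketch assumes every line has exactly four tab-separated fields.

```python
def parse_record(line):
    """Split one tab-delimited record into (doc_id, label, subject, content).

    Assumes exactly four tab-separated fields, as in the format above.
    """
    doc_id, label, subject, content = line.rstrip("\n").split("\t")
    # The SPAM field is binary, so convert it to an int (1 = spam, 0 = ham).
    return doc_id, int(label), subject, content


sample = "0001.2000-01-17.beck\t0\tleadership development pilot\tsally: what timing\n"
print(parse_record(sample))
```

How to treat a literal "NA" in the subject or content field is left open here; it is a design choice for your own scripts.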

In [5]:
# take a look at the first 100 characters of the first 5 records (RUN THIS CELL AS IS)
!head -n 5 {ENRON} | cut -c-100



0001.2000-01-17.beck	0	 leadership development pilot	" sally:  what timing, ask and you shall receiv
0001.2000-06-06.lokay	0	" key dates and impact of upcoming sap implementation over the next few week
0001.2001-02-07.kitchen	0	 key hr issues going forward	 a) year end reviews-report needs generating 


In [6]:
# load the data into HDFS (RUN THIS CELL AS IS)
!hdfs dfs -copyFromLocal {ENRON} {HDFS_DIR}/enron.txt

# Question 6:  Enron Ham/Spam EDA.
Before building our classifier, let's get acquainted with our data. In particular, we're interested in which words occur more in spam emails than in real emails. In this question you'll implement two Hadoop MapReduce jobs to count and sort word occurrences by document class. You'll also learn about two new Hadoop streaming parameters that will allow you to control how the records output by your mappers are partitioned for reducing on separate nodes. 

__`IMPORTANT NOTE:`__ For this question and all subsequent items, you should include both the subject and the body of the email in your analysis (i.e. concatenate them to get the 'text' of the document).

### Q6 Tasks:
* __a) code:__ Complete the missing components of the code in __`EnronEDA/mapper.py`__ and __`EnronEDA/reducer.py`__ to create a Hadoop MapReduce job that counts how many times each word in the corpus occurs in an email for each class. Pay close attention to the data format specified in the docstrings of these scripts _-- there are a number of ways to accomplish this task, we've chosen this format to help illustrate a technique in `part e`_. Run the provided unit tests to confirm that your code works as expected then run the provided Hadoop streaming command to apply your analysis to the Enron data.


* __b) code + short response:__ How many times does the word "__assistance__" occur in each class? (`HINT:` Use a `grep` command to read from the results file you generated in '`a`' and then report the answer in the space provided.)


* __c) short response:__ Would it have been possible to add some sorting parameters to the Hadoop streaming command that would cause our `part a` results to be sorted by count? Briefly explain why or why not.


* __d) code + short response:__ Write a second Hadoop MapReduce job to sort the output of `part a` first by class and then by count. Run your job and save the results to a local file using the provided code. Then describe in words how you would go about printing the top 10 words in each class. (`HINT 1:` _remember that you can simply pass the `part a` output directory to the input field of this job; `HINT 2:` since this task is just reordering the records from `part a` we don't need to write a mapper or reducer, just use `/bin/cat` for both_)


* __e) code:__ A more efficient alternative to '`grep`-ing' for the top 10 words in each class would be to use the Hadoop framework to separate records from each class into its own partition so that we can just read the top lines in each. To do this, make sure to specify 2 reduce tasks and add the following parameters to your Hadoop streaming job from 'd':
>__`-D mapreduce.partition.keypartitioner.options="-k2,2"`__ : tells Hadoop to partition based on the second field (which indicates spam/ham in our data).  
>__`-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner`__ : tells Hadoop that we want to partition on a field that is part of a composite key. (_This parameter should go at the end, not with the `-D` options at the top._)

> __`NOTE:`__ these options should be used in conjunction with `-D stream.num.map.output.key.fields=#` which you will have already added as part of your sort.

### Q6 Student Answers:
> __b)__ Assistance:
assistance	0	2	
assistance	1	8	

> __c)__ No. The sorting parameters available in Hadoop streaming only control the order in which mapper output is fed to the reducers. The counts we want to sort by do not exist until the reducers have summed them, and reducer output is not sorted again, so ordering by count requires a second job.

> __d)__ Because the results are sorted by class (descending) and then by count, `head -n 10` on the results file gives the top 10 spam words. To get the top 10 ham words, we can filter on the class field first, for example `awk '$2 == 0' | head -n 10`. The `awk` command keeps only the lines whose second field is 0.

In [7]:
# part a - do your work in the provided scripts then RUN THIS CELL AS IS
!chmod a+x EnronEDA/mapper.py
!chmod a+x EnronEDA/reducer.py

In [16]:
# part a - unit test EnronEDA/mapper.py (RUN THIS CELL AS IS)
!echo -e "d1	1	title	body\nd2	0	title	body" | EnronEDA/mapper.py

title	1	1
body	1	1
title	0	1
body	0	1


In [18]:
# part a - unit test EnronEDA/reducer.py (RUN THIS CELL AS IS)
!echo -e "one	1	1\none	0	1\none	0	1\ntwo	0	1" | EnronEDA/reducer.py

one	0	2	
one	1	1	
two	0	1	
two	1	0	
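The unit test above pins down the record format: the reducer receives `word<TAB>class<TAB>count` lines and emits one total per word and class, including a zero when a word never appears in a class. As a local cross-check (not a substitute for the streaming reducer), the following sketch computes the same totals in memory. The function name `count_by_class` is hypothetical, and unlike a real reducer it holds every word in a dictionary.

```python
from collections import defaultdict


def count_by_class(records):
    """Total the counts per (word, class) from 'word<TAB>class<TAB>count' lines."""
    counts = defaultdict(lambda: [0, 0])  # word -> [ham_count, spam_count]
    for record in records:
        word, label, count = record.split("\t")
        counts[word][int(label)] += int(count)
    # Emit both classes for every word, including zero counts.
    return [
        "{}\t{}\t{}".format(word, label, counts[word][label])
        for word in sorted(counts)
        for label in (0, 1)
    ]


mapper_output = ["one\t1\t1", "one\t0\t1", "one\t0\t1", "two\t0\t1"]
for line in count_by_class(mapper_output):
    print(line)
```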


In [33]:
# part a - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

Deleted /user/root/HW2/eda-output


In [34]:
# part a - Hadoop streaming job (RUN THIS CELL AS IS)
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -reducer reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob1066534205971389208.jar tmpDir=null
18/01/20 20:04:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/20 20:04:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/20 20:04:29 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/20 20:04:29 INFO mapreduce.JobSubmitter: number of splits:2
18/01/20 20:04:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0021
18/01/20 20:04:29 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0021
18/01/20 20:04:29 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0021/
18/01/20 20:04:29 INFO mapreduce.Job: Running job: job_1516450633555_0021
18/01/20 20:04:34 INFO mapreduce.Job: Job job_1516450633555_0021 running in uber mode : false
18/01/20 20:04:34 INFO mapreduce.Job:  map 0% reduce 0%
18/01/20 20:04

In [35]:
# part a - retrieve results from HDFS & copy them into a local file (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/eda-output/part-0000* > EnronEDA/results.txt

In [36]:
# part b - write your grep command here
!grep 'assistance' EnronEDA/results.txt

assistance	0	2	
assistance	1	8	


In [46]:
# part d/e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-sort-output

Deleted /user/root/HW2/eda-sort-output


In [47]:
# part d/e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=3 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2nr -k3,3nr" \
  -D mapreduce.partition.keypartitioner.options="-k2,2" \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -input {HDFS_DIR}/eda-output \
  -output {HDFS_DIR}/eda-sort-output \
  -numReduceTasks 2 \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3656532941110478678.jar tmpDir=null
18/01/20 20:16:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/20 20:16:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/20 20:16:52 INFO mapred.FileInputFormat: Total input paths to process : 2
18/01/20 20:16:52 INFO mapreduce.JobSubmitter: number of splits:2
18/01/20 20:16:52 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0025
18/01/20 20:16:52 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0025
18/01/20 20:16:53 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0025/
18/01/20 20:16:53 INFO mapreduce.Job: Running job: job_1516450633555_0025
18/01/20 20:16:59 INFO mapreduce.Job: Job job_1516450633555_0025 running in uber mode : false
18/01/20 20:16:59 INFO mapreduce.Job:  map 0% reduce 0%
18/01/20 20:17

In [48]:
# part d - retrieve results from HDFS & copy them into a local file (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/eda-sort-output/part-0000* > EnronEDA/results-sorted.txt

In [50]:
# part e - view the top 10 records from each partition (RUN THIS CELL AS IS) 
for idx in range(2):
    print "\n===== part-0000%s=====\n" % (idx)
    !hdfs dfs -cat {HDFS_DIR}/eda-sort-output/part-0000{idx} | head


===== part-00000=====

the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135	
cat: Unable to write to output stream.

===== part-00001=====

the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153	
cat: Unable to write to output stream.


__Expected output:__
<table>
<th>part-00000:</th>
<th>part-00001:</th>
<tr><td><pre>
the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135
</pre></td>
<td><pre>
the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153
</pre></td></tr>
</table>
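To build intuition for what the partitioner and comparator options do, the following plain-Python sketch mimics them on a handful of records taken from the expected output. It is only an illustration: Hadoop's `KeyFieldBasedPartitioner` hashes the key field to choose a partition, whereas this sketch simply uses the class label as the partition index (an assumption made for readability).

```python
records = [
    ("to", 0, 398), ("the", 1, 698), ("the", 0, 549),
    ("your", 1, 357), ("ect", 0, 382), ("to", 1, 566),
]

num_partitions = 2
partitions = [[] for _ in range(num_partitions)]
for word, label, count in records:
    # Stand-in for partitioning on field 2 ("-k2,2"): same class, same partition.
    partitions[label % num_partitions].append((word, label, count))

for part in partitions:
    # Stand-in for "-k3,3nr": numeric, reverse order on the count field.
    part.sort(key=lambda rec: rec[2], reverse=True)

for idx, part in enumerate(partitions):
    print("part-0000{}: {}".format(idx, part))
```

Because each partition holds a single class and is already ordered by count, reading the first lines of each file is enough to get the top words per class.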

# Question 7: Document Classification Task Overview.
The week 2 assigned reading from Chapter 13 of _Introduction to Information Retrieval_ by Manning, Raghavan and Schutze provides a thorough introduction to the document classification task and the math behind Naive Bayes. In this question we'll use the example from Table 13.1 (reproduced below) to 'train' an unsmoothed Multinomial Naive Bayes model and classify a test document by hand.

<table>
<th>DocID</th>
<th>Class</th>
<th>Subject</th>
<th>Body</th>
<tr><td>Doc1</td><td>1</td><td></td><td>Chinese Beijing Chinese</td></tr>
<tr><td>Doc2</td><td>1</td><td></td><td>Chinese Chinese Shanghai</td></tr>
<tr><td>Doc3</td><td>1</td><td></td><td>Chinese Macao</td></tr>
<tr><td>Doc4</td><td>0</td><td></td><td>Tokyo Japan Chinese</td></tr>
</table>

### Q7 Tasks:
* __a) short response:__ Equation 13.3 in Manning, Raghavan and Schutze shows how a Multinomial Naive Bayes model classifies a document. It predicts the class, $c$, for which the estimated conditional probability of the class given the document's contents,  $\hat{P}(c|d)$, is greatest. In this equation what two pieces of information are required to calculate  $\hat{P}(c|d)$? Your answer should include both mathematical notation and verbal explanation.


* __b) short response:__ The Enron data includes two classes of documents: `spam` and `ham` (they're actually labeled `1` and `0`). In plain English, explain what  $\hat{P}(c)$ and   $\hat{P}(t_{k} | c)$ mean in the context of this data. How would we estimate these values from a training corpus? How many passes over the data would we need to make to retrieve this information for all classes and all words?


* __c) hand calculations:__ Above we've reproduced the document classification example from the textbook (we added an empty subject field to mimic the Enron data format). Remember that the classes in this "Chinese Example" are `1` (about China) and `0` (not about China). Calculate the class priors and the conditional probabilities for an __unsmoothed__ Multinomial Naive Bayes model trained on this data. Show the calculations that lead to your result using markdown and LaTeX in the space provided or by embedding an image of your hand written work. [`NOTE:` _Your results should NOT match those in the text -- they are training a model with +1 smoothing; you are training a model without smoothing_]


* __d) hand calculations:__ Use the model you trained to classify the following test document: `Chinese Chinese Chinese Tokyo Japan`. Show the calculations that lead to your result using markdown and LaTeX in the space provided or by embedding an image of your hand written work.


* __e) short response:__ Compare the classification you get from this unsmoothed model in `d` to the results in the textbook's Example 13.1, which reflects a model with Laplace plus 1 smoothing. How does smoothing affect our inference?
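
For reference, the decision rule these tasks refer to can be written as follows, using the textbook's notation where $n_d$ is the number of tokens in document $d$ and $t_k$ is its $k$-th token. The second line gives the unsmoothed maximum likelihood estimates, with $N_c$ the number of training documents in class $c$, $N$ the total number of training documents, and $T_{ct}$ the number of occurrences of term $t$ in training documents of class $c$.

$$c_{map} = \arg\max_{c} \hat{P}(c|d) = \arg\max_{c} \; \hat{P}(c) \prod_{1 \le k \le n_d} \hat{P}(t_k | c)$$

$$\hat{P}(c) = \frac{N_c}{N}, \qquad \hat{P}(t | c) = \frac{T_{ct}}{\sum_{t'} T_{ct'}}$$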

### Q7 Student Answers:
> __a)__ You need the prior probability of a document occurring with class c:

> $P(c)$

> You also need the product of the conditional probabilities of each term in the document occurring in a document of class c:

> $\prod_{1\le k \le n_d} P(t_k | c)$

> __b)__ $\hat{P}(c)$ is the probability that an email is classified as c (spam or ham) as estimated from the training set. This should be fairly easy to estimate with one pass through the data, counting the number of emails in each class. 

> $\hat{P}(t_{k} | c)$ is the probability of a term occurring in a document of this class, again estimated from the training set. As we saw in the previous question, we can make one MapReduce run to get this information for every word in every class, followed by one final pass through the counts (that can probably be linear with the right sorting) to calculate the actual probabilities. 

> __c)__ Show your calculations here using markdown & LaTeX or embed them below!

> $P(c=1) = 3/4 = 0.75$

> $P(c=0) = 1/4 = 0.25$

> $P(Chinese | c=1) = 5/8$

> $P(Chinese | c=0) = 1/3$

> $P(Beijing | c=1) = 1/8$

> $P(Beijing | c=0) = 0$

> $P(Shanghai | c=1) = 1/8$

> $P(Shanghai | c=0) = 0$

> $P(Macao | c=1) = 1/8$

> $P(Macao | c=0) = 0$

> $P(Tokyo | c=1) = 0$

> $P(Tokyo | c=0) = 1/3$

> $P(Japan | c=1) = 0$

> $P(Japan | c=0) = 1/3$

> __d)__ Show your calculations here using markdown & LaTeX or embed them below!

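> Let $d$ be the test document `Chinese Chinese Chinese Tokyo Japan`.

> $P(c=1 | d) \propto P(c=1) * P(\text{Chinese}|c=1)^3 * P(\text{Tokyo}|c=1) * P(\text{Japan}|c=1) = (3/4)*(5/8)^3*0*0 = 0$

> $P(c=0 | d) \propto P(c=0) * P(\text{Chinese}|c=0)^3 * P(\text{Tokyo}|c=0) * P(\text{Japan}|c=0) = (1/4)*(1/3)^3*(1/3)*(1/3) = 1/972 \approx 0.00103$

> Since $1/972 > 0$, the unsmoothed model assigns the document to class `0` (not about China).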

> __e)__ Our unsmoothed calculation classifies the document as non-Chinese (class `0`) because the conditional probabilities of Tokyo and Japan given class `1` are zero, which forces the entire class `1` product to zero regardless of how often Chinese appears. Laplace smoothing accounts for the fact that we probably haven't seen all possible samples: it adds one to every term count before normalizing, so the zero conditional probabilities for Tokyo and Japan become small non-zero values. With smoothing, the three occurrences of Chinese outweigh them and the textbook's model assigns the document to class `1`, the opposite of our unsmoothed result.


In [None]:
# part d/e - if you didn't write out your calculations above, embed a picture of them here:
from IPython.display import Image
Image(filename="path-to-hand-calulations-image.png")
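
As a cross-check on the hand calculations in this question, here is a minimal sketch that scores the test document under both the unsmoothed and the +1 smoothed model using exact fractions. The function name `score` and the data layout are illustrative choices, not part of the assignment scripts.

```python
from collections import Counter
from fractions import Fraction

train = [
    (1, "Chinese Beijing Chinese"),
    (1, "Chinese Chinese Shanghai"),
    (1, "Chinese Macao"),
    (0, "Tokyo Japan Chinese"),
]
test_doc = "Chinese Chinese Chinese Tokyo Japan".lower().split()

doc_counts = Counter(label for label, _ in train)
word_counts = {0: Counter(), 1: Counter()}
for label, text in train:
    word_counts[label].update(text.lower().split())
vocab = set(word_counts[0]) | set(word_counts[1])


def score(label, tokens, smoothing=0):
    """Return P(c) * prod P(t|c), adding `smoothing` to every term count."""
    total = sum(word_counts[label].values()) + smoothing * len(vocab)
    result = Fraction(doc_counts[label], len(train))
    for token in tokens:
        result *= Fraction(word_counts[label][token] + smoothing, total)
    return result


for k in (0, 1):
    print("smoothing={}: class0={}, class1={}".format(
        k, float(score(0, test_doc, k)), float(score(1, test_doc, k))))
```

With `smoothing=0` the class `1` score collapses to zero, while with `smoothing=1` class `1` scores higher than class `0`.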

# Question 8: Naive Bayes Inference.
In the next two questions you'll write code to parallelize the Naive Bayes calculations that you performed above. We'll do this in two phases: one MapReduce job to perform training and a second MapReduce job to perform inference. While in practice we'd need to train a model before we can use it to classify documents, for learning purposes we're going to develop our code in the opposite order. By first focusing on the pieces of information/format we need to perform the classification (inference) task you should find it easier to develop a solid implementation for the training phase when you get to question 9 below. In both of these questions we'll continue to use the Chinese example corpus from the textbook to help us test our MapReduce code as we develop it. Below we've reproduced the corpus, test set and model in text format that matches the Enron data.

### Q8 Tasks:
* __a) short response:__ Run the provided cells to create the example files and load them into HDFS. Then take a closer look at __`NBmodel.txt`__. This text file represents a Naive Bayes model trained (with Laplace +1 smoothing) on the example corpus. What are the 'keys' and 'values' in this file? Which record means something slightly different than the rest? The value field of each record includes two numbers which will be helpful for debugging but which we don't actually need to perform inference -- what are they? [`HINT`: _This file represents the model from Example 13.1 in the textbook, if you're having trouble getting oriented try comparing our file to the numbers in that example._]


* __b) short response:__ When performing Naive Bayes in practice instead of multiplying the probabilities (as in equation 13.3) we add their logs (as in equation 13.4). Why do we choose to work with log probabilities? If we had an unsmoothed model, what potential error could arise from this transformation?


* __c) short response:__ Documents 6 and 8 in the test set include a word that did not appear in the training corpus (and as a result does not appear in the model). What should we do at inference time when we need a class conditional probability for this word?


* __d) short response:__ The goal of our MapReduce job is to stream over the test set and classify each document by performing the calculation from equation 13.4. To do this we'll load the model file (which contains the probabilities for equation 13.4) into memory on the nodes where we do our mapping. This is called an in-memory join; we'll learn more about those in Week 5. For now, explain how this is a slight departure from one of the functional programming principles. From a scalability perspective when might this departure be justified? When would it be unwise?


* __e) code:__ Complete the code in __`NaiveBayes/classify_mapper.py`__. Read the docstring carefully to understand how this script should work and the format it should return. Run the provided unit tests to confirm that your script works as expected then write a Hadoop streaming job to classify the Chinese example test set. [`HINT 1:` _you shouldn't need a reducer for this one._ `HINT 2:` _Don't forget to add the model file to the_ `-files` _parameter in your Hadoop streaming job so that it gets shipped to the mapper nodes where it will be accessed by your script._]


* __f) short response:__ In our test example and in the Enron data set we have fairly short documents. Since these fit fine in memory on a mapper node we didn't need a reducer and could just do all of our calculations in the mapper. However with much longer documents (e.g. books) we might want a higher level of parallelization -- for example we might want to process parts of a document on different nodes. In this hypothetical scenario how would our algorithm design change? What could the mappers still do? What key-value structure would they emit? What would the reducers have to do as a last step?
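
One way to picture the scenario in `part f` is the sketch below: each chunk of a long document is scored independently, producing partial sums of log probabilities keyed by document ID, and a final step adds the partial sums and the log priors. The probabilities are the +1 smoothed values for three words of the Chinese example; the function names `map_chunk` and `reduce_doc` and the chunking scheme are hypothetical.

```python
from math import log

priors = {0: 0.25, 1: 0.75}
cond = {  # P(word | class) with +1 smoothing
    "chinese": {0: 2 / 9.0, 1: 3 / 7.0},
    "tokyo": {0: 2 / 9.0, 1: 1 / 14.0},
    "japan": {0: 2 / 9.0, 1: 1 / 14.0},
}


def map_chunk(doc_id, tokens):
    """Emit (doc_id, [partial log sum for class 0, partial log sum for class 1])."""
    partial = [sum(log(cond[t][c]) for t in tokens if t in cond) for c in (0, 1)]
    return doc_id, partial


def reduce_doc(partials):
    """Add the partial sums and the log priors, then pick the larger class."""
    totals = [log(priors[c]) + sum(p[c] for p in partials) for c in (0, 1)]
    return totals.index(max(totals)), totals


chunks = [["chinese", "chinese"], ["chinese", "tokyo", "japan"]]
partials = [map_chunk("d5", chunk)[1] for chunk in chunks]
label, totals = reduce_doc(partials)
print(label, totals)
```

Because addition is associative, splitting the document differently does not change the totals, which is what makes this decomposition safe.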

### Q8 Student Answers:
> __a)__ The key is the word in each line, the values are the fields after that. The first field shows how many times this word appears in c=0 classifications, the second field shows how many times this word appears in c=1 classifications, the third field shows the conditional probability of seeing this word given that c=0, and the final field shows the conditional probability of seeing this word given that c=1. We only use the final two fields for inference, so the first and second fields, showing the counts, can be used for debugging. The ClassPriors record is different from the rest because it signifies the class priors. 

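> __b)__ When we work with logs, our long product of conditional probabilities turns into a summation. Multiplying many small probabilities can underflow floating point precision, while summing their logs avoids that and still preserves the ranking of the classes because the log function is monotonic. With an unsmoothed model, however, some conditional probabilities are exactly zero, and the log of zero is undefined (it diverges to negative infinity), so the transformation would produce a math error unless zeros are handled explicitly. 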

> __c)__ According to Figure 13.3 in the textbook, this word will be ignored and will not add anything when we don't have a conditional probability for it. 

> __d)__ This is a departure from functional programming in that our algorithm isn't stateless, since it depends on the results present in memory. This is justified in cases where the information in memory is the same everywhere, is constant, and is small enough to fit on every node. If the information is different across nodes or is being changed during the run, then processing run on one node with an input x might produce different outcomes if run on another node with the same input. 

> __e)__ Complete the coding portion of this question before answering 'f'.

> __f)__ If we want to run multiple mappers on multiple nodes, the mappers can still sum up the log probabilities, but they will need to be reduced together with log probabilities from other parts of the document that ran on different mappers. They will emit the log probabilities with a document ID. The reducers would take this result for a document from several different mappers, sum it up and take the exponent, picking the right classification based on the final two probabilities.


Run these cells to create the example corpus and model.

In [51]:
%%writefile NaiveBayes/chineseTrain.txt
D1	1		Chinese Beijing Chinese
D2	1		Chinese Chinese Shanghai
D3	1		Chinese Macao
D4	0		Tokyo Japan Chinese

Writing NaiveBayes/chineseTrain.txt


In [52]:
%%writefile NaiveBayes/chineseTest.txt
D5	1		Chinese Chinese Chinese Tokyo Japan
D6	1		Beijing Shanghai Trade
D7	0		Japan Macao Tokyo
D8	0		Tokyo Japan Trade

Writing NaiveBayes/chineseTest.txt


In [53]:
%%writefile NBmodel.txt
beijing	0.0,1.0,0.111111111111,0.142857142857
chinese	1.0,5.0,0.222222222222,0.428571428571
tokyo	1.0,0.0,0.222222222222,0.0714285714286
shanghai	0.0,1.0,0.111111111111,0.142857142857
ClassPriors	1.0,3.0,0.25,0.75
japan	1.0,0.0,0.222222222222,0.0714285714286
macao	0.0,1.0,0.111111111111,0.142857142857

Writing NBmodel.txt


In [54]:
# load the data files into HDFS
!hdfs dfs -copyFromLocal NaiveBayes/chineseTrain.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal NaiveBayes/chineseTest.txt {HDFS_DIR}

Your work for `part e` starts here:

In [55]:
# part e - do your work in NaiveBayes/classify_mapper.py first, then run this cell.
!chmod a+x NaiveBayes/classify_mapper.py

In [56]:
# part e - unit test NaiveBayes/classify_mapper.py (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py

d5	1	0.000135480702467	0.000301213779972	1
d6	1	0.00308641975308	0.0153061224489	1
d7	0	0.00137174211248	0.000546647230321	0
d8	0	0.0123456790123	0.00382653061225	0


In [59]:
# part e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/chinese-output

Deleted /user/root/HW2/chinese-output


In [60]:
# part e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer /bin/cat \
  -input {HDFS_DIR}/chineseTest.txt \
  -output {HDFS_DIR}/chinese-output \
  -cmdenv PATH={PATH} 

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob112883358079335547.jar tmpDir=null
18/01/22 20:54:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/22 20:54:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/22 20:54:03 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/22 20:54:03 INFO mapreduce.JobSubmitter: number of splits:2
18/01/22 20:54:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0027
18/01/22 20:54:03 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0027
18/01/22 20:54:03 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0027/
18/01/22 20:54:03 INFO mapreduce.Job: Running job: job_1516450633555_0027
18/01/22 20:54:08 INFO mapreduce.Job: Job job_1516450633555_0027 running in uber mode : false
18/01/22 20:54:08 INFO mapreduce.Job:  map 0% reduce 0%
18/01/22 20:54:

In [61]:
# part e - retrieve test set results from HDFS
!hdfs dfs -cat {HDFS_DIR}/chinese-output/part-000* > NaiveBayes/chineseResults.txt

In [62]:
# part e - take a look
!cat NaiveBayes/chineseResults.txt

d5	1	0.000135480702467	0.000301213779972	1
d6	1	0.00308641975308	0.0153061224489	1
d7	0	0.00137174211248	0.000546647230321	0
d8	0	0.0123456790123	0.00382653061225	0


<table>
<th> Expected output for the test set:</th>
<tr align=Left><td><pre>
d5	1	0.00013548	0.00030121	1
d6	1	0.00308641	0.01530612	1
d7	0	0.00137174	0.00054664	0
d8	0	0.01234567	0.00382653	0
</pre></td></tr>
</table>
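
The expected output above can be reproduced locally with a sketch like the one below, which loads the model into a dictionary (the in-memory join) and sums log probabilities per class. The names `load_model` and `classify` are hypothetical and this is not the provided `classify_mapper.py`; it assumes whitespace tokenization and skips words that are missing from the model.

```python
from math import exp, log

MODEL_TEXT = """beijing\t0.0,1.0,0.111111111111,0.142857142857
chinese\t1.0,5.0,0.222222222222,0.428571428571
tokyo\t1.0,0.0,0.222222222222,0.0714285714286
shanghai\t0.0,1.0,0.111111111111,0.142857142857
ClassPriors\t1.0,3.0,0.25,0.75
japan\t1.0,0.0,0.222222222222,0.0714285714286
macao\t0.0,1.0,0.111111111111,0.142857142857"""


def load_model(lines):
    """Map each key to (P(key | class 0), P(key | class 1)); counts are skipped."""
    model = {}
    for line in lines:
        key, values = line.strip().split("\t")
        _, _, p0, p1 = (float(v) for v in values.split(","))
        model[key] = (p0, p1)
    return model


def classify(text, model):
    """Return (predicted class, [P(class 0 score), P(class 1 score)])."""
    log_scores = [log(p) for p in model["ClassPriors"]]
    for word in text.lower().split():
        if word in model:  # words unseen in training are ignored
            for c in (0, 1):
                log_scores[c] += log(model[word][c])
    return log_scores.index(max(log_scores)), [exp(s) for s in log_scores]


model = load_model(MODEL_TEXT.splitlines())
for doc in ["Chinese Chinese Chinese Tokyo Japan", "Tokyo Japan Trade"]:
    print(doc, classify(doc, model))
```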

# Question 9: Naive Bayes Training.
In Question 8 we used a model that we had trained by hand. Next we'll develop the code to do that same training in parallel, making it suitable for use with larger corpora (like the Enron emails). The end result of the MapReduce job you write in this question should be a model text file that looks just like the example (`NBmodel.txt`) that we created by hand above.

To refresh your memory about the training process take a look at `7a` and `7b` where you described the pieces of information you'll need to collect in order to encode a Multinomial Naive Bayes model. We now want to retrieve those pieces of information while streaming over a corpus. The bulk of the task will be very similar to the word counting exercises you've already done but you may want to consider a slightly different key-value record structure to efficiently tally counts for each class. 

The most challenging (interesting?) design question will be how to retrieve the totals (# of documents and # of words in documents for each class). Of course, counting these numbers is easy. The hard part is the timing: you'll need to make sure you have the counts totalled up _before_ you start estimating the class conditional probabilities for each word. It would be best (i.e. most scalable) if we could find a way to do this tallying without storing the whole vocabulary in memory... _can you see why this is going to be a challenge?_ [`HINT:` _There is a problem with trying to tally in the mapper but also a problem with waiting to do it in the reducer._]. __For part `b` of this question you will receive full credit for any design that results in the correct conditional probabilities _without hard coding the class totals_ (i.e. you _can_ store records in memory if you so choose).__ However, for fun, we challenge you to try and figure out the stateless solution. It involves a technique that you will learn formally next week but we think you may be able to figure it out for yourself. [`HINT:` _Think about Hadoop's default sorting behavior: what determines the order in which records arrive at the reducer? If we compute partial totals in our mappers can you think of a way to make sure that those numbers arrive at our reducer before any of the other 'regular' key-value pairs?_]

__`IMPORTANT NOTE:`__ Please use a single reducer for all of the jobs in this question.
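
To make the ordering hint concrete, the sketch below simulates a single-reducer job in plain Python. It assumes a special key `!total` (a hypothetical name; any key that sorts before every real word would do) carrying partial word totals per class, and it uses `sorted()` as a stand-in for Hadoop's shuffle. Because `!` precedes letters in ASCII order, the totals reach the reducer first, so it can emit unsmoothed conditional probabilities one word at a time. This is one possible design rather than the required one, and it leaves out the class priors for brevity.

```python
train = [
    ("D1", 1, "Chinese Beijing Chinese"),
    ("D2", 1, "Chinese Chinese Shanghai"),
    ("D3", 1, "Chinese Macao"),
    ("D4", 0, "Tokyo Japan Chinese"),
]


def mapper(records):
    for _, label, text in records:
        words = text.lower().split()
        # Partial total of words seen for this class, under the special key.
        yield ("!total", label, len(words))
        for word in words:
            yield (word, label, 1)


def reducer(sorted_pairs):
    totals = [0, 0]                  # words per class, filled in first
    current, counts = None, [0, 0]   # only one word's counts held at a time
    for key, label, value in sorted_pairs:
        if key == "!total":
            totals[label] += value
            continue
        if key != current:
            if current is not None:
                yield current, [counts[c] / float(totals[c]) for c in (0, 1)]
            current, counts = key, [0, 0]
        counts[label] += value
    if current is not None:
        yield current, [counts[c] / float(totals[c]) for c in (0, 1)]


model = dict(reducer(sorted(mapper(train))))
print(model["chinese"])
```

The sketch assumes every class contains at least one word and that no real token sorts before `!total`; both would need checking on the Enron data.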

### Q9 Tasks:
* __a) make a plan:__  Fill in the docstrings for __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ to appropriately reflect the format that each script will input/output. [`HINT:` _the input files_ (`enronemail_1h.txt` & `chineseTrain.txt`) _have a prespecified format and your output file should match_ `NBmodel.txt` _so you really only have to decide on an internal format for Hadoop_].


* __b) implement it:__ Complete the code in __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ so that together they train a Multinomial Naive Bayes model __with no smoothing__. Make sure your end result is formatted correctly (see note above). Test your scripts independently and together (using `chineseTrain.txt` or test input of your own devising). When you are satisfied with your Python code, design and run a Hadoop streaming command to run your job in parallel on __chineseTrain.txt__. Confirm that your trained model matches your hand calculations from Question 7.


* __c) short response:__ We saw in Question 7 that adding Laplace +1 smoothing makes our classifications less sensitive to rare words. However, implementing this technique requires access to one additional piece of information that we had not previously used in our Naive Bayes training. What is that extra piece of information? [`HINT:` see equation 13.7 in Manning, Raghavan and Schutze].


* __d) short response:__ There are three approaches that we could take to handle the extra piece of information you identified in `c`: 1) we could hard code it into our reducer (_where would we get it in the first place?_); 2) we could compute it inside the reducer, which would require storing some information in memory (_what information?_); or 3) we could compute it in the reducer without storing any bulky information in memory, but then we'd need some postprocessing or a second MapReduce job to complete the calculation (_why?_). Briefly explain what is non-ideal about each of these options. BONUS: which of these options is incompatible with using multiple reducers?


* __e) code + short response:__ In future weeks we'll learn better ways to handle this kind of situation. For now, choose one of the 3 options above. State your choice & reasoning in the space below, then use that strategy to complete the code in __`NaiveBayes/train_reducer_smooth.py`__. Test this alternate reducer, then write and run a Hadoop streaming job to train an MNB model with smoothing on the Chinese example. Your results should match the model that we provided for you in Question 9 (and the calculations in the textbook example). [`HINT:` _don't start from scratch with this one -- you can just copy over your reducer code from part `b` and make the needed modifications_].

### Q9 Student Answers:

> __b)__ A note on this: my current solution is limited in that it only works for one reducer, and will need to be rewritten for multiple reducers. One possible solution involves having a second reducer after the first set of reducer(s), which will produce the final model. 

> __c)__ This extra piece of information is the size of the whole vocabulary, i.e. the number of unique words used in the training data. This will be 6 in this case.

> __d)__ 1) It may be difficult to obtain this number for a large corpus, since we would need an expensive extra pass over the complete data to calculate it. 2) We would need to store all the unique words seen so far, as well as intermediate counts, which takes up too much memory for a large corpus. This might also not be compatible with multiple reducers, since the in-memory state is not shared across reducers. 3) We need an extra step to count the total number of unique words at some point in the process. Even if this happens after a reducer that has produced a single record per word, we still need to count the number of lines and make the result available for the final calculation.

> __e)__ I rewrote my reducer so that it can now run as multiple reducers within one Hadoop job. The data it generates is explained in detail in its docstring. The intermediate file it produces has V+2 lines, where V is the total number of unique words across all documents. Counting the lines with `wc -l` and subtracting two gives the vocabulary size needed for smoothing. The two extra lines hold the information needed to calculate the prior probabilities and the number of words in each class. The final reducer, which has to be a single reducer, takes this information and makes a single pass through the file to calculate the conditional and prior probabilities.
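
The second pass described in `e` can be sketched as follows. This is an illustration rather than the contents of `train_single_reducer.py`: the function name `finalize_model` and the in-memory list of lines are assumptions, while the record layout (`*` for documents per class, `.` for words per class, then one line per word) follows the intermediate output printed in the part `e` cells of this notebook.

```python
def finalize_model(lines, k=1):
    """Turn sorted intermediate count records into add-k smoothed model rows.

    Each line is 'key<TAB>class0_count<TAB>class1_count'. The key '*' holds
    document counts, '.' holds word totals, and any other key is a word.
    Assumes '*' and '.' sort ahead of every word, as they do in ASCII.
    """
    records = [line.rstrip("\n").split("\t") for line in lines if line.strip()]
    vocab_size = len(records) - 2            # same as `wc -l` minus two
    word_totals = None
    model = []
    for key, c0, c1 in records:
        c0, c1 = int(c0), int(c1)
        if key == "*":
            n_docs = float(c0 + c1)
            model.append(("ClassPriors", c0, c1, c0 / n_docs, c1 / n_docs))
        elif key == ".":
            word_totals = (c0, c1)
        else:
            p0 = (c0 + k) / float(word_totals[0] + k * vocab_size)
            p1 = (c1 + k) / float(word_totals[1] + k * vocab_size)
            model.append((key, c0, c1, p0, p1))
    return model

intermediate = [
    "*\t1\t3",
    ".\t3\t8",
    "beijing\t0\t1",
    "chinese\t1\t5",
    "japan\t1\t0",
    "macao\t0\t1",
    "shanghai\t0\t1",
    "tokyo\t1\t0",
]
model = finalize_model(intermediate)
for row in model:
    print("\t".join(str(field) for field in row))
```

With `k=1` and a vocabulary of 6, `chinese` gets (1+1)/(3+6) in class 0 and (5+1)/(8+6) in class 1, which agrees with the smoothed model printed later in this question.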


In [68]:
# part a - do your work in train_mapper.py and train_reducer.py then RUN THIS CELL AS IS
!chmod a+x NaiveBayes/train_mapper.py
!chmod a+x NaiveBayes/train_reducer.py
!echo "=========== MAPPER DOCSTRING ============"
!head -n 8 NaiveBayes/train_mapper.py | tail -n 6
!echo "=========== REDUCER DOCSTRING ============"
!head -n 8 NaiveBayes/train_reducer.py | tail -n 6

Mapper reads in text documents and emits word counts by class.
INPUT:
    ID \t SPAM \t SUBJECT \t CONTENT \n
OUTPUT:
    word \t class \t count
    
Reducer aggregates word counts by class and emits frequencies.

INPUT:
    word \t class \t count
OUTPUT:
    word \t num_c0_class \t num_c1_class \t cond_prob_c0 \t cond_prob_c1


__`Q9 part b starts here`:__ MNB _without_ Smoothing (training on Chinese Example Corpus).

In [114]:
# part b - write a unit test for your mapper here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n

*	0	1
*	1	1
*	1	1
*	1	1
.	0	1
.	0	1
.	0	1
.	1	1
.	1	1
.	1	1
.	1	1
.	1	1
.	1	1
.	1	1
.	1	1
beijing	1	1
chinese	0	1
chinese	1	1
chinese	1	1
chinese	1	1
chinese	1	1
chinese	1	1
japan	0	1
macao	1	1
shanghai	1	1
tokyo	0	1


In [115]:
# part b - write a unit test for your reducer here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n | NaiveBayes/train_reducer.py

ClassPriors	1	3	0.25	0.75
beijing	0	0	0.0	0.0
chinese	1	5	0.333333333333	0.625
japan	1	0	0.333333333333	0.0
macao	0	1	0.0	0.125
shanghai	0	1	0.0	0.125
tokyo	1	0	0.333333333333	0.0


In [116]:
# part b - write a systems test for your mapper + reducer together here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n | NaiveBayes/train_reducer.py

ClassPriors	1	3	0.25	0.75
beijing	0	0	0.0	0.0
chinese	1	5	0.333333333333	0.625
japan	1	0	0.333333333333	0.0
macao	0	1	0.0	0.125
shanghai	0	1	0.0	0.125
tokyo	1	0	0.333333333333	0.0


In [108]:
# part b - clear (and name) an output directory in HDFS for your unsmoothed chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/chinese-unsmooth-output

Deleted /user/root/HW2/chinese-unsmooth-output


In [111]:
# part b - write your hadoop streaming job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/chinese-unsmooth-output

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob6656376710911277347.jar tmpDir=null
18/01/23 00:29:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/23 00:29:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/23 00:29:27 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/23 00:29:27 INFO mapreduce.JobSubmitter: number of splits:2
18/01/23 00:29:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0029
18/01/23 00:29:28 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0029
18/01/23 00:29:28 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0029/
18/01/23 00:29:28 INFO mapreduce.Job: Running job: job_1516450633555_0029
18/01/23 00:29:34 INFO mapreduce.Job: Job job_1516450633555_0029 running in uber mode : false
18/01/23 00:29:34 INFO mapreduce.Job:  map 0% reduce 0%
18/01/23 00:29
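
For intuition about the comparator options used in the job above: `-k1,1 -k2,2n` asks Hadoop to order records by the first key field as text and then by the second key field as a number. The sketch below imitates that ordering locally with Python's `sorted`. The sample records are made up for illustration, and this is only an approximation of `KeyFieldBasedComparator` (which compares bytes), not a substitute for testing on the cluster.

```python
records = [
    "chinese\t1\t1",
    "beijing\t1\t1",
    "chinese\t0\t1",
    "*\t1\t1",
    "tokyo\t0\t1",
    "*\t0\t1",
]

def sort_key(line):
    """Mimic -k1,1 -k2,2n: field 1 compared as text, then field 2 as a number."""
    fields = line.split("\t")
    return (fields[0], int(fields[1]))

ordered = sorted(records, key=sort_key)
for line in ordered:
    print(line)
```

Because `*` precedes every letter in ASCII, the two `*` records come out first, which is the property the training reducer relies on.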

In [112]:
# part b - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/chinese-unsmooth-output/part-000* > NaiveBayes/chineseUnsmoothResults.txt

In [113]:
# part b - print your model so that we can confirm that it matches expected results
!cat NaiveBayes/chineseUnsmoothResults.txt

ClassPriors	1	3	0.25	0.75
beijing	0	1	0.0	0.125
chinese	1	5	0.333333333333	0.625
japan	1	0	0.333333333333	0.0
macao	0	1	0.0	0.125
shanghai	0	1	0.0	0.125
tokyo	1	0	0.333333333333	0.0


__`Q9 part e starts here`:__ MNB _with_ Smoothing (training on Chinese Example Corpus).

In [120]:
# part e - write a unit test for your NEW reducer here
# This is an intermediate reducer, see docs for more details
!chmod a+x NaiveBayes/train_reducer_smooth.py
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n | NaiveBayes/train_reducer_smooth.py

*	1	3
.	3	8
beijing	0	1
chinese	1	5
japan	1	0
macao	0	1
shanghai	0	1
tokyo	1	0


In [132]:
# Count the lines emitted by this reducer: subtracting two from the line count gives the
# number of unique words. Save the count in a text file for use by the next reducer.
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n | NaiveBayes/train_reducer_smooth.py | wc -l > NaiveBayes/p9WordCount.txt

In [137]:
# Passing all the data to the last reducer, which has to be single
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1,1 -k2,2n | NaiveBayes/train_reducer_smooth.py | sort -k1,1 -k2,2n | NaiveBayes/train_single_reducer.py 

ClassPriors	1	3	0.25	0.75
beijing	0	1	0.111111111111	0.142857142857
chinese	1	5	0.222222222222	0.428571428571
japan	1	0	0.222222222222	0.0714285714286
macao	0	1	0.111111111111	0.142857142857
shanghai	0	1	0.111111111111	0.142857142857
tokyo	1	0	0.222222222222	0.0714285714286


In [None]:
# part e - clear (and name) an output directory in HDFS for your SMOOTHED chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/chinese-smooth-output-intermediate
!hdfs dfs -rm -r {HDFS_DIR}/chinese-smooth-output-final

In [138]:
# part e - write your hadoop streaming job

# we will use two hadoop streaming jobs with a count in the middle, as we want to apply a second reducer on the results of our first reducer
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer_smooth.py \
  -mapper train_mapper.py \
  -reducer train_reducer_smooth.py \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/chinese-smooth-output-intermediate


packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3938595312589241639.jar tmpDir=null
18/01/23 22:07:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/23 22:07:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/23 22:07:06 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/23 22:07:06 INFO mapreduce.JobSubmitter: number of splits:2
18/01/23 22:07:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0030
18/01/23 22:07:06 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0030
18/01/23 22:07:07 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0030/
18/01/23 22:07:07 INFO mapreduce.Job: Running job: job_1516450633555_0030
18/01/23 22:07:12 INFO mapreduce.Job: Job job_1516450633555_0030 running in uber mode : false
18/01/23 22:07:12 INFO mapreduce.Job:  map 0% reduce 0%
18/01/23 22:07

In [139]:
# write and extract intermediate results
!hdfs dfs -cat {HDFS_DIR}/chinese-smooth-output-intermediate/part-000* > NaiveBayes/chineseSmoothResultsIntermediate.txt

In [140]:
# print intermediate results and write line count to file, to be used by last reducer
!cat NaiveBayes/chineseSmoothResultsIntermediate.txt | wc -l > NaiveBayes/p9WordCount.txt
!cat NaiveBayes/chineseSmoothResultsIntermediate.txt

*	1	3
.	3	8
beijing	0	1
chinese	1	5
japan	1	0
macao	0	1
shanghai	0	1
tokyo	1	0


In [141]:
# write second hadoop job with no mapper and just the new reducer
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/p9WordCount.txt,NaiveBayes/train_single_reducer.py \
  -mapper /bin/cat \
  -reducer train_single_reducer.py \
  -input {HDFS_DIR}/chinese-smooth-output-intermediate \
  -output {HDFS_DIR}/chinese-smooth-output-final

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob4775445200809144717.jar tmpDir=null
18/01/23 22:08:13 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/23 22:08:14 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/23 22:08:14 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/23 22:08:14 INFO mapreduce.JobSubmitter: number of splits:2
18/01/23 22:08:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0031
18/01/23 22:08:15 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0031
18/01/23 22:08:15 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0031/
18/01/23 22:08:15 INFO mapreduce.Job: Running job: job_1516450633555_0031
18/01/23 22:08:21 INFO mapreduce.Job: Job job_1516450633555_0031 running in uber mode : false
18/01/23 22:08:21 INFO mapreduce.Job:  map 0% reduce 0%
18/01/23 22:08

In [142]:
# part e - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/chinese-smooth-output-final/part-000* > NaiveBayes/chineseSmoothResultsFinal.txt

In [143]:
!cat NaiveBayes/chineseSmoothResultsFinal.txt

ClassPriors	1	3	0.25	0.75
beijing	0	1	0.111111111111	0.142857142857
chinese	1	5	0.222222222222	0.428571428571
japan	1	0	0.222222222222	0.0714285714286
macao	0	1	0.111111111111	0.142857142857
shanghai	0	1	0.111111111111	0.142857142857
tokyo	1	0	0.222222222222	0.0714285714286


# Question 10: Enron Ham/Spam NB Classifier & Results.

Fantastic work. We're finally ready to perform Spam Classification on the Enron Corpus. In this final homework question you'll run the analysis you've developed, report its performance, and draw some conclusions.

### Q10 Tasks:
* __a) train/test split:__ Run the provided code to split our Enron file into a training set and testing set, then load them into HDFS. [`NOTE:` _If you hard coded the vocab size in question 9d make sure you recalculate the vocab size for just the training set!_]


* __b) train 2 models:__ Write Hadoop Streaming jobs to train MNB Models on the training set with and without smoothing. Save your models to local files at __`NaiveBayes/Unsmoothed/NBmodel.txt`__ and __`NaiveBayes/Smoothed/NBmodel.txt`__. [`NOTE:` _This naming is important because we wrote our classification task so that it expects a file of that name... if this inelegance frustrates you there is an alternative that would involve a few adjustments to your code [read more about it here](http://www.tnoda.com/blog/2013-11-23). If you can let it slide for now, MRJob (the next framework we'll learn) will have an easier way to handle this._] Finally run the checks that we provide to confirm that your results are correct.


* __c) code:__ Recall that we designed our classification job with just a mapper. An efficient way to report the performance of our models would be to simply add a reducer phase to this job and compute precision and recall right there. Complete the code in __`NaiveBayes/evaluation_reducer.py`__ and then write Hadoop jobs to evaluate your two models on the test set. Report their performance side by side. [`NOTE:` if you need a refresher on precision, recall and F1-score [Wikipedia](https://en.wikipedia.org/wiki/F1_score) is a good resource.]


* __d) short response:__ Compare the performance of your two models. What do you notice about the unsmoothed model's predictions? Can you guess why this is happening? Which evaluation measure do you think is most relevant in our use case? [`NOTE:` _Feel free to answer using your common sense but if you want more information on evaluating the classification task check out_ [this blogpost](https://tryolabs.com/blog/2013/03/25/why-accuracy-alone-bad-measure-classification-tasks-and-what-we-can-do-about-it/) or [this paper](http://www.flinders.edu.au/science_engineering/fms/School-CSEM/publications/tech_reps-research_artfcts/TRRA_2007.pdf)]


* __e) code + short response:__ Let's look at the top ten words with the highest conditional probability in `Spam` and in `Ham`. We'll do this by writing a Hadoop job that sorts the model file (`NaiveBayes/Smoothed/NBmodel.txt`). Normally we'd have to run two jobs -- one that sorts on $P(word|ham)$ and another that sorts on $P(word|spam)$. However, if we slightly modify the data format in the model file then we can get the top words in each class with just one job. We've written a mapper that will do just this for you. Read through __`NaiveBayes/model_sort_mapper.py`__ and then briefly explain how this mapper will allow us to partition and sort our model file. Write a Hadoop job that uses our mapper and `/bin/cat` for a reducer to partition and sort. Print out the top 10 words in each class (where 'top' == highest conditional probability). [`HINT:` _this should remind you a lot of what we did in Question 6._]


* __f) short response:__ What do you notice about the 'top words' we printed in `e`? How would increasing the smoothing parameter 'k' affect the probabilities for the top words that you identified in `e`? How would it affect the probabilities of words that occur much more in one class than another? In summary, how does the smoothing parameter 'k' affect the bias and the variance of our model? [`NOTE:` _you do not need to code anything for this task, but if you are struggling with it you could try changing 'k' and see what happens to the test set. We don't recommend doing this exploration with the Enron data because it will be harder to see the impact with such a big vocabulary_]
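
To build intuition for task `f`, the sketch below applies add-k smoothing to counts from the Chinese example (class 1 has 8 words in total, 5 of them `chinese` and none of them `tokyo`, with a vocabulary of 6). The helper name `smoothed_prob` and the particular values of `k` are ours, not part of the provided code.

```python
def smoothed_prob(count, class_total, vocab_size, k):
    """Add-k estimate of P(word | class)."""
    return (count + k) / float(class_total + k * vocab_size)

VOCAB_SIZE = 6      # unique words in the Chinese training set
CLASS_TOTAL = 8     # words observed in class 1

for k in (0, 1, 10, 1000):
    frequent = smoothed_prob(5, CLASS_TOTAL, VOCAB_SIZE, k)   # 'chinese'
    unseen = smoothed_prob(0, CLASS_TOTAL, VOCAB_SIZE, k)     # 'tokyo'
    print("k=%-5d P(chinese|1)=%.4f  P(tokyo|1)=%.4f" % (k, frequent, unseen))
```

Running it for a few values of `k` shows both estimates moving toward the same value, $1/|V|$, as `k` grows. What that means for bias and variance is left for your answer.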

### Q10 Student Answers:
> __d)__ 

> __e)__ Type your answer here!

> __f)__ Type your answer here!

__Test/Train split__

In [144]:
# part a - test/train split (RUN THIS CELL AS IS)
!head -n 80 data/enronemail_1h.txt > data/enron_train.txt
!tail -n 20 data/enronemail_1h.txt > data/enron_test.txt
!hdfs dfs -copyFromLocal data/enron_train.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal data/enron_test.txt {HDFS_DIR}

__Training__ (Enron MNB Model _without smoothing_ )

In [173]:
# part b -  Unsmoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/enron-model

# save the model locally
!mkdir -p NaiveBayes/Unsmoothed
!hdfs dfs -cat {HDFS_DIR}/enron-model/part-000* > NaiveBayes/Unsmoothed/NBmodel.txt

Deleted /user/root/HW2/enron-model
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3429723021223339073.jar tmpDir=null
18/01/24 17:52:31 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:52:31 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:52:32 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/24 17:52:32 INFO mapreduce.JobSubmitter: number of splits:2
18/01/24 17:52:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0042
18/01/24 17:52:32 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0042
18/01/24 17:52:32 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0042/
18/01/24 17:52:32 INFO mapreduce.Job: Running job: job_1516450633555_0042
18/01/24 17:52:38 INFO mapreduce.Job: Job job_1516450633555_0042 running in uber mode : false
18/01/24 17:52:38 INFO mapreduce.Jo

In [174]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000172547666293,0.000296823983378

assistance	2,4,0.000172547666293,0.000296823983378


In [175]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,8.62738331464e-05,0.00163253190858

money	1,22,8.62738331464e-05,0.00163253190858


__Training__ (Enron MNB Model _with Laplace +1 smoothing_ )

In [176]:
# part b -  Smoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-model
!hdfs dfs -rm -r {HDFS_DIR}/smooth-model-output-intermediate

# hadoop command
# we will use two hadoop streaming jobs with a count in the middle, as we want to apply a second reducer on the results of our first reducer

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer_smooth.py \
  -mapper train_mapper.py \
  -reducer train_reducer_smooth.py \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/smooth-model-output-intermediate


# write and extract intermediate results
!mkdir -p NaiveBayes/Smoothed
!hdfs dfs -cat {HDFS_DIR}/smooth-model-output-intermediate/part-000* > NaiveBayes/Smoothed/Intermediate.txt


Deleted /user/root/HW2/smooth-model
Deleted /user/root/HW2/smooth-model-output-intermediate
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob7100281208411216763.jar tmpDir=null
18/01/24 17:53:15 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:53:15 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:53:16 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/24 17:53:16 INFO mapreduce.JobSubmitter: number of splits:2
18/01/24 17:53:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0043
18/01/24 17:53:16 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0043
18/01/24 17:53:16 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0043/
18/01/24 17:53:16 INFO mapreduce.Job: Running job: job_1516450633555_0043
18/01/24 17:53:22 INFO mapreduce.Job: Job job_1516450633555_0043 running

In [177]:
# write the line count of the intermediate results to a file, to be used by the last reducer
!cat NaiveBayes/Smoothed/Intermediate.txt | wc -l > NaiveBayes/Smoothed/p9WordCount.txt

# write second hadoop job with no mapper and just the new reducer
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1 -k2,2n" \
  -files NaiveBayes/Smoothed/p9WordCount.txt,NaiveBayes/train_single_reducer.py \
  -mapper /bin/cat \
  -reducer train_single_reducer.py \
  -input {HDFS_DIR}/smooth-model-output-intermediate \
  -output {HDFS_DIR}/smooth-model 

# save the model locally
!hdfs dfs -cat {HDFS_DIR}/smooth-model/part-000* > NaiveBayes/Smoothed/NBmodel.txt

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3709233855020527961.jar tmpDir=null
18/01/24 17:53:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:53:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:53:49 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/24 17:53:49 INFO mapreduce.JobSubmitter: number of splits:2
18/01/24 17:53:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0044
18/01/24 17:53:50 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0044
18/01/24 17:53:50 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0044/
18/01/24 17:53:50 INFO mapreduce.Job: Running job: job_1516450633555_0044
18/01/24 17:53:56 INFO mapreduce.Job: Job job_1516450633555_0044 running in uber mode : false
18/01/24 17:53:56 INFO mapreduce.Job:  map 0% reduce 0%
18/01/24 17:54

In [178]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000185804533631,0.000277300205202

assistance	2,4,0.000185804533631,0.000277300205202


In [179]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,0.000123869689087,0.00127558094393

money	1,22,0.000123869689087,0.00127558094393


__Evaluation__

In [162]:
# part c - write your code in NaiveBayes/evaluation_reducer.py then RUN THIS CELL
!chmod a+x NaiveBayes/evaluation_reducer.py

In [168]:
# part c - unit test your evaluation job on the chinese model (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py 
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | NaiveBayes/evaluation_reducer.py

d5	1	0.000135480702467	0.000301213779972	1
d6	1	0.00308641975308	0.0153061224489	1
d7	0	0.00137174211248	0.000546647230321	0
d8	0	0.0123456790123	0.00382653061225	0
d5	1	0.000135480702467	0.000301213779972	True
d6	1	0.00308641975308	0.0153061224489	True
d7	0	0.00137174211248	0.000546647230321	True
d8	0	0.0123456790123	0.00382653061225	True
Precision 1.0 
Recall 1.0 
Accuracy 1.0 
F-score 1.0 


In [180]:
# part c - Evaluate the UNSMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/classify-unsmooth-output
# hadoop job
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/evaluation_reducer.py,NaiveBayes/Unsmoothed/NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/classify-unsmooth-output \
  -cmdenv PATH={PATH} 


# retrieve results locally
!hdfs dfs -cat {HDFS_DIR}/classify-unsmooth-output/part-000* > NaiveBayes/Unsmoothed/results.txt

Deleted /user/root/HW2/classify-unsmooth-output
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob1845833187154449192.jar tmpDir=null
18/01/24 17:54:35 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:54:35 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:54:36 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/24 17:54:36 INFO mapreduce.JobSubmitter: number of splits:2
18/01/24 17:54:36 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0045
18/01/24 17:54:36 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0045
18/01/24 17:54:36 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0045/
18/01/24 17:54:36 INFO mapreduce.Job: Running job: job_1516450633555_0045
18/01/24 17:54:42 INFO mapreduce.Job: Job job_1516450633555_0045 running in uber mode : false
18/01/24 17:54:42 INFO

In [181]:
# part c - Evaluate the SMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/classify-smooth-output
# hadoop job
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/evaluation_reducer.py,NaiveBayes/Smoothed/NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/classify-smooth-output \
  -cmdenv PATH={PATH} 


# retrieve results locally
!hdfs dfs -cat {HDFS_DIR}/classify-smooth-output/part-000* > NaiveBayes/Smoothed/results.txt

rm: `/user/root/HW2/classify-smooth-output': No such file or directory
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob1748274860188178654.jar tmpDir=null
18/01/24 17:55:04 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:55:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/24 17:55:05 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/24 17:55:05 INFO mapreduce.JobSubmitter: number of splits:2
18/01/24 17:55:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516450633555_0046
18/01/24 17:55:06 INFO impl.YarnClientImpl: Submitted application application_1516450633555_0046
18/01/24 17:55:06 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1516450633555_0046/
18/01/24 17:55:06 INFO mapreduce.Job: Running job: job_1516450633555_0046
18/01/24 17:55:11 INFO mapreduce.Job: Job job_1516450633555_0046 running in uber mode : false

In [182]:
# part c - display results (RUN THIS CELL AS IS*)
# NOTE: *feel free to modify the tail commands to match the format of your results file
print('=========== UNSMOOTHED MODEL ============')
!tail -n 7 NaiveBayes/Unsmoothed/results.txt
print('=========== SMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Smoothed/results.txt

0018.1999-12-14.kaminski	0	0.0	0.0	True
0018.2001-07-13.sa_and_hp	1	0.0	0.0	False
0018.2003-12-18.gp	1	0.0	0.0	False
no precision	
no recall	
Accuracy 0.45 	
no f-score	
0017.2004-08-01.bg	1	2.73878566681e-62	1.95150799641e-59	True
0017.2004-08-02.bg	1	0.0	0.0	False
0018.1999-12-14.kaminski	0	0.0	0.0	True
0018.2001-07-13.sa_and_hp	1	0.0	0.0	False
0018.2003-12-18.gp	1	0.0	0.0	False
Precision 0.75 	
Recall 0.545454545455 	
Accuracy 0.65 	
F-score 0.631578947368 	


__`EXPECTED RESULTS:`__ 
<table>
<tr>
<th>Unsmoothed Model</th>
<th>Smoothed Model</th>
</tr>
<tr>
<td><pre>
# Documents:	20
True Positives:	0
True Negatives:	9
False Positives:	0
False Negatives:	11
Accuracy	0.45
Recall	0.0
</pre></td>
<td><pre>
# Documents:	20
True Positives:	6
True Negatives:	7
False Positives:	2
False Negatives:	5
Accuracy	0.65
Precision	0.75
Recall	0.5454
F-Score	0.6315
</pre></td>
</tr>
</table>

__`NOTE:`__ _Don't be too disappointed if these seem low to you. We've trained and tested on a very very small corpus... bigger datasets coming soon!_
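
The expected numbers can be reproduced from the confusion-matrix counts with the usual definitions. The function below is a sketch: its name and the choice to return `None` for an undefined ratio are our assumptions, not the behaviour required of `evaluation_reducer.py`.

```python
def classification_metrics(tp, tn, fp, fn):
    """Return accuracy, precision, recall and F1; None marks an undefined ratio."""
    def ratio(num, den):
        return num / float(den) if den else None

    accuracy = ratio(tp + tn, tp + tn + fp + fn)
    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)
    if precision is None or recall is None or precision + recall == 0:
        f1 = None
    else:
        f1 = 2 * precision * recall / (precision + recall)
    return {"accuracy": accuracy, "precision": precision,
            "recall": recall, "f1": f1}

# Counts taken from the expected results table.
unsmoothed = classification_metrics(tp=0, tn=9, fp=0, fn=11)
smoothed = classification_metrics(tp=6, tn=7, fp=2, fn=5)
print(unsmoothed)
print(smoothed)
```

For the unsmoothed model there are no positive predictions at all, so precision (and therefore F1) has a zero denominator, which is why only accuracy and recall are reported for it.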

__`Q10 part e starts here:`__
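
One way to get the top words for both classes out of a single job is to emit each model row twice, once per class, with the class acting as a partition key and the conditional probability as a sort key. The sketch below simulates that idea locally. It is a guess at the general approach, not the contents of `NaiveBayes/model_sort_mapper.py`: the function names and the tiny model are invented for illustration, and it assumes class 0 is ham, class 1 is spam, and model rows of the form `word<TAB>c0,c1,p0,p1`.

```python
def model_sort_mapper(lines):
    """Emit (class_label, probability, word) once per class for every model row."""
    for line in lines:
        word, payload = line.rstrip("\n").split("\t")
        if word == "ClassPriors":
            continue  # priors are not words, so they are left out of the ranking
        _, _, p_ham, p_spam = payload.split(",")
        yield ("ham", float(p_ham), word)
        yield ("spam", float(p_spam), word)

def top_words(records, n=10):
    """Group by class (the partition key), then sort by probability, descending."""
    by_class = {}
    for label, prob, word in records:
        by_class.setdefault(label, []).append((prob, word))
    return {label: [word for _, word in sorted(rows, reverse=True)[:n]]
            for label, rows in by_class.items()}

# Made-up rows in the assumed format (values borrowed from the Chinese example).
toy_model = [
    "ClassPriors\t1,3,0.25,0.75",
    "beijing\t0,1,0.111111111111,0.142857142857",
    "chinese\t1,5,0.222222222222,0.428571428571",
    "japan\t1,0,0.222222222222,0.0714285714286",
]
top = top_words(model_sort_mapper(toy_model), n=2)
print(top)
```

In a real job the grouping and sorting would be done by Hadoop's partitioner and comparator rather than in Python, which is why `/bin/cat` suffices as the reducer.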

In [None]:
# part e - write your Hadoop job here (sort smoothed model on P(word|class))

# clear output directory

# hadoop job






In [None]:
# part e - print top words in each class


# Bonus Question: Sorting with multiple Reducers (Optional)

Congratulations, you've completed all the required questions! If you have energy for more, let's take a moment to preview one of the important design issues that we'll tackle next week: total order sort.

In the jobs in this assignment we've mostly used a single reducer and mostly used the default Hadoop sorting (alphabetical by key). However, when we took the time to specify an alternate key to sort on and tried using multiple reducers (as we did in Questions 5d, 6e and 10e), we saw that sorting and partitioning the data on different fields can cause our final output to no longer be sorted from top to bottom. Instead it will usually be sorted just within each partition. This is called a partial sort, and in the examples we saw, that was mostly what we wanted.

So far, we've also always partitioned on a categorical field. But what if we wanted to sort on values (e.g. counts in a word count file) but have too much data to fit on a single reducer and there isn't a logical categorical variable to use as a partition key? How could we ensure that the top-n words end up in `part-00000`, the next n end up in `part-00001`, etc.? The solution is similar in idea to what we did in Question 10e (where we created extra fields to partition on)... but a bit more complicated because the way that Hadoop orders its partitions and writes them to file depends on an internal hash function (e.g. we might expect the partition with key `ham` to end up in `part-00000` and `spam` to end up in `part-00001` but Hadoop won't necessarily order the words the way a human reader would).

__Challenge Exercise:__ Write a Hadoop job that will use 5 reducers and will sort your `NBmodel.txt` file (you can use the smoothed or unsmoothed one) in descending order according to the words' conditional probability in `spam`. A few tips:
* start by reading the [Total Sort Notebook](https://github.com/UCB-w261/main/blob/master/Resources/TotalSortGuide/_total-sort-guide-spark2.01-JAN27-2017.ipynb) that is provided in the course repo under the HelpfulResources folder.
* you will need to write a mapper for this job, you can do this easily on the fly using the jupyter magic command `%%writefile`.
* when you run your job, you should see that the records in `part-00000` have higher conditional probabilities in Spam, than the records in `part-00001` which in turn are higher than in `part-00002` etc.
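
As a rough illustration of the idea (the Total Sort Notebook remains the reference for the full method), a mapper can prepend a partition label chosen from probability ranges, so that range membership rather than a hash of the word decides which reducer receives a record. The cut points and the function name below are hypothetical. In practice the boundaries would come from sampling the data, and the mapping from labels to `part-0000N` files still has to be checked against Hadoop's partitioner.

```python
from bisect import bisect_right

# Hypothetical cut points on P(word | spam), ascending; 4 cuts give 5 buckets.
BOUNDARIES = [0.0001, 0.0005, 0.001, 0.005]

def partition_index(prob, boundaries=BOUNDARIES):
    """Return 0 for the highest-probability bucket, len(boundaries) for the lowest."""
    return len(boundaries) - bisect_right(boundaries, prob)

# Spam probabilities taken from the smoothed model checks earlier in this notebook.
samples = [("money", 0.00127558094393), ("assistance", 0.000277300205202)]
for word, p_spam in samples:
    print("%d\t%s\t%s" % (partition_index(p_spam), p_spam, word))
```

Sorting within each bucket by probability, descending, then gives a total order once the output files are read in bucket order.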

In [None]:
%%writefile demo/bonus_mapper.py
"""
Write your mapper here!
"""


In [None]:
# BONUS - write your hadoop job here!

In [None]:
# BONUS - show us it worked here!

# HW2 ends here, please refer to the `README.md` for submission instructions.