# HW 2 - Naive Bayes in Hadoop MR
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2018`__

In the live sessions for week 2 and week 3 you got some practice designing and debugging Hadoop Streaming jobs. In this homework we'll use Hadoop MapReduce to implement your first parallelized machine learning algorithm: Naive Bayes. As you develop your implementation you'll test it on a small dataset that matches the 'Chinese Example' in the _Manning, Raghavan and Schütze_ reading for Week 2. For the main task in this assignment you'll be working with a small subset of the Enron Spam/Ham Corpus. By the end of this assignment you should be able to:
* __... describe__ the Naive Bayes algorithm including both training and inference.
* __... perform__ EDA on a corpus using Hadoop MR.
* __... implement__ parallelized Naive Bayes.
* __... contrast__ partial, unordered and total order sort and their implementations in Hadoop Streaming.
* __... explain__ how smoothing affects the bias and variance of a Multinomial Naive Bayes model.

As always, your work will be graded both on the correctness of your output and on the clarity and design of your code. __Please refer to the `README` for homework submission instructions.__ 

## Notebook Setup
Before starting, run the following cells to confirm your setup.

In [2]:
# imports
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
%reload_ext autoreload
%autoreload 2

In [3]:
%cd /media/notebooks/Assignments/HW2

/media/notebooks/Assignments/HW2


In [4]:
# global vars (paths) - ADJUST AS NEEDED
#JAR_FILE = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"
JAR_FILE = "/usr/lib/hadoop/hadoop-streaming-3.2.2.jar"
HDFS_DIR = "/user/root/HW2/"
HOME_DIR = "/media/notebooks/Assignments/HW2"

In [5]:
# save path for use in Hadoop jobs (-cmdenv PATH={PATH})
from os import environ
PATH  = environ['PATH']

In [6]:
# data path
ENRON = "data/enronemail_1h.txt"

In [7]:
# make the HDFS directory if it doesn't already exist
!hdfs dfs -ls 
!hdfs dfs -mkdir {HDFS_DIR}

Found 4 items
drwxr-xr-x   - root hadoop          0 2022-01-22 23:35 .sparkStaging
drwxr-xr-x   - root hadoop          0 2022-01-30 00:39 HW2
drwxr-xr-x   - root hadoop          0 2022-01-19 03:17 demo2
drwxr-xr-x   - root hadoop          0 2022-01-19 22:23 demo3
mkdir: `/user/root/HW2': File exists


# Question 1: Hadoop MapReduce Key Takeaways.  

This assignment will be the only one in which you use Hadoop Streaming to implement a distributed algorithm. The key reason we continue to teach Hadoop streaming is because of the way it forces the programmer to think carefully about what is happening under the hood when you parallelize a calculation. This question will briefly highlight some of the most important concepts that you need to understand about Hadoop Streaming and MapReduce before we move on to Spark next week.   

### Q1 Tasks:

* __a) short response:__ What "programming paradigm" is Hadoop MapReduce based on? What are the main ideas of this programming paradigm and how does MapReduce exemplify these ideas?

* __b) short response:__ What is the Hadoop Shuffle? When does it happen? Why is it potentially costly? Describe one specific thing we can do to mitigate the cost associated with this stage of our Hadoop Streaming jobs.

* __c) short response:__ In Hadoop Streaming why do the input and output record format of a combiner script have to be the same? [__`HINT`__ _what level of combining does the framework guarantee? what is the relationship between the record format your mapper emits and the format your reducer expects to receive?_]

* __d) short response:__ To what extent can you control the level of parallelization of your Hadoop Streaming jobs? Please be specific.

* __e) short response:__ What change in the kind of computing resources available prompted the creation of parallel computation frameworks like Hadoop? 

### Q1 Student Answers:

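> __a)__ Hadoop MapReduce is based on the functional programming paradigm. Following Lin and Dyer (chapter 2, MapReduce Basics), it rests on three concepts: (1) functional programming, (2) mappers and reducers, and (3) an execution framework. The programmer supplies a map function that is applied independently to every input record and a reduce function that aggregates all values sharing a key, while the execution framework handles scheduling, data movement, and fault tolerance.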

> __b)__ Between the end of the Map phase and the beginning of the Reduce phase there is a handoff process known as shuffle and sort. It routes records, collates records with the same key, and transports them from the mapper nodes to the nodes where the reducer tasks run. This step is potentially expensive because the data travels over the network, and because mappers can emit the same key many times; left unconsolidated, those duplicate keys consume network bandwidth unnecessarily. To improve overall efficiency, mapper output is sent to the physical node that a reducer will run on as it is produced, which avoids flooding the network once all mapper tasks complete. One specific mitigation is a combiner: it aggregates records with the same key locally, on the mapper side, so that fewer records cross the network. MapReduce does not use combiners by default.

> __c)__ In any MapReduce program, the reducer input key-value type must match the mapper output key-value type. The combiner sits between the two, so its input and output key-value types must both match the mapper output type (which is the same as the reducer input type). As mentioned above, the combiner is optional: the framework may run it zero, one, or several times. A reducer written to expect combiner-specific output would therefore break whenever the framework skipped the combiner. So the combiner's input and output record formats must be the same.
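
The point in answer c) can be checked locally with a small sketch. It assumes a word-count style job whose reducer sums counts per key; the function name `sum_by_key` and the sample records are hypothetical. Because the function reads and writes the same `(key, count)` format, the final result is the same whether it runs as a combiner zero times, once on part of the mapper output, or repeatedly.

```python
from itertools import groupby
from operator import itemgetter

def sum_by_key(records):
    """Sum counts per key. Input and output are both (key, count) pairs."""
    ordered = sorted(records, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        yield key, sum(count for _, count in group)

# Hypothetical mapper output for one map task.
mapper_output = [("the", 1), ("to", 1), ("the", 1), ("ect", 1), ("the", 1)]

# Case 1: the framework skips the combiner entirely.
no_combiner = dict(sum_by_key(mapper_output))

# Case 2: the combiner runs on only part of the mapper output (e.g. one spill).
partially_combined = list(sum_by_key(mapper_output[:3])) + mapper_output[3:]
with_partial_combiner = dict(sum_by_key(partially_combined))

# Case 3: the combiner runs twice before the reducer sees the data.
combined_twice = sum_by_key(sum_by_key(mapper_output))
with_repeated_combiner = dict(sum_by_key(combined_twice))

print(no_combiner)
print(with_partial_combiner == no_combiner, with_repeated_combiner == no_combiner)
```

If the combiner emitted a different format (say, a list of counts per key), the second case would mix two record shapes in the reducer input and the reducer could no longer parse both.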

> __d)__ Unlike with a traditional HPC job, the level of parallelism in a Hadoop job is adjustable. The number of map tasks is ultimately determined by the nature of your input data, because of how HDFS distributes chunks of data to your mappers. The number of reducers is chosen based on the scale of the data and the desired time to complete the processing. With Hadoop, we can only "suggest" a number of mappers when we submit the job, whereas Hadoop honors the requested number of reduce tasks more strictly. We typically control both from the command line with parameters such as `-D mapreduce.job.maps=4` and `-D mapreduce.job.reduces=4` (or, for reducers, `-numReduceTasks 4` in Hadoop Streaming).

> __e)__ The manner in which the semiconductor industry had been exploiting Moore’s Law simply ran out of opportunities for improvement: faster clocks, deeper pipelines, superscalar architectures, and other tricks of the trade reached a point of diminishing returns that did not justify continued investment. As a result, the performance of each new generation of CPUs no longer increased as dramatically as it had in the past. This marked the beginning of an entirely new strategy and the dawn of the multi-core era. Against this backdrop, the widespread need for data-intensive processing, the abundance of commodity computing platforms, and plummeting storage costs drove innovations in distributed computing such as MapReduce, first by Google and then by Yahoo and the open source community. This in turn created more demand: when organizations learned about the availability of effective data analysis tools for large datasets, they began instrumenting various business processes to gather even more data, driven by the belief that more data leads to deeper insights and greater competitive advantages.

# Question 2: MapReduce Design Patterns.  

In the last two live sessions and in your readings from Lin & Dyer you encountered a number of techniques for manipulating the logistics of a MapReduce implementation to ensure that the right information is available at the right time and location. In this question we'll review a few of the key techniques you learned.   

### Q2 Tasks:

* __a) short response:__ What are counters (in the context of Hadoop Streaming)? How are they useful? What kinds of counters does Hadoop provide for you? How do you create your own custom counter?

* __b) short response:__ What are composite keys? How are they useful? How are they related to the idea of custom partitioning?

* __c) short response:__ What is the order inversion pattern? What problem does it help solve? How do we implement it? 

### Q2 Student Answers:

> __a)__ Counters are named tallies that the Hadoop framework collects from all tasks and reports in the job output. There are two types: (1) built-in counters and (2) user-defined (custom) counters. Hadoop maintains built-in counters for every job, and these report various metrics; for example, counters for the number of bytes and records let us confirm that the expected amount of input was consumed and the expected amount of output was produced. Built-in groups include (1) MapReduce Task Counters, (2) FileSystem Counters, (3) FileInputFormat Counters, and (4) Job Counters, to name a few. Hadoop also allows user code to define its own counters, which are then incremented as desired in the mapper or reducer. In Hadoop Streaming, a script increments a custom counter by writing a line of the form `reporter:counter:<group>,<counter>,<amount>` to standard error.
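
As a minimal sketch of a user-defined counter in a Python streaming script: Hadoop Streaming picks up lines of the form `reporter:counter:<group>,<counter>,<amount>` written to standard error. The helper names, the group name `EDA`, and the counter names below are hypothetical choices for illustration, and the demonstration captures the output in memory instead of running inside a Hadoop job.

```python
import io
import sys

def increment_counter(group, name, amount=1, stream=sys.stderr):
    """Ask Hadoop Streaming to add `amount` to counter `name` in `group`."""
    stream.write(f"reporter:counter:{group},{name},{amount}\n")

def count_documents_by_class(lines, stream=sys.stderr):
    """Tally spam and ham documents while scanning tab-delimited records."""
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < 2:
            # Hypothetical counter for records that do not match the format.
            increment_counter("EDA", "malformed_records", stream=stream)
            continue
        label = "spam_docs" if fields[1] == "1" else "ham_docs"
        increment_counter("EDA", label, stream=stream)

# Local demonstration: capture what would normally go to stderr.
captured = io.StringIO()
count_documents_by_class(
    ["d1\t1\ttitle\tbody\n", "d2\t0\ttitle\tbody\n"], stream=captured
)
print(captured.getvalue(), end="")
```

Inside a real job the `stream` argument would be left at its default, and the totals would appear alongside the built-in counters in the job log.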

> __b)__ Composite keys are keys made up of two or more fields that can be used for sequencing or ordering records. By emitting appropriate values for these fields we can control the flow of data in the shuffle and sort phase of the MapReduce framework. With multiple reducers, however, the default partitioner would hash the entire composite key to pick a reducer, so records that belong together could be split up. With a custom partitioner that uses only part of the key, we can send all (key1, key2, ..., value) tuples with the same “state” to the same reducer.
 

> __c)__ The order inversion pattern exploits the sorting phase of MapReduce to push data needed for calculations to the reducer ahead of the data that will be manipulated. Often, a reducer needs to compute an aggregate statistic on a set of elements before individual elements can be processed. Normally, this would require two passes over the data, but with the “order inversion” design pattern, the aggregate statistic can be computed in the reducer before the individual elements are encountered. We can add some special characters to the key (e.g. !, *, etc) such that it gets sorted ahead of other elements in the list.
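
The order inversion pattern can be sketched in plain Python. This is an illustration under simplifying assumptions, not the course's reference solution: the task (relative word frequencies), the special key `*total`, and the function names are hypothetical, a single reducer is assumed, and Python's `sorted` stands in for the shuffle's sort.

```python
def mapper(docs):
    """Emit one (word, 1) per token plus a (*total, 1) record for the aggregate."""
    for doc in docs:
        for word in doc.split():
            yield ("*total", 1)   # '*' sorts ahead of lowercase letters
            yield (word, 1)

def reducer(sorted_records):
    """Single pass: the total arrives first, so each word can be normalized."""
    total = 0
    current, count = None, 0
    for key, value in sorted_records:
        if key == "*total":
            total += value
            continue
        if key != current:
            if current is not None:
                yield current, count / total
            current, count = key, 0
        count += value
    if current is not None:
        yield current, count / total

docs = ["spam spam ham", "ham eggs"]
records = sorted(mapper(docs))      # stands in for shuffle and sort
freqs = dict(reducer(records))
print(freqs)
```

With more than one reducer, the special-key records would have to reach every partition (for example by emitting one copy per partition), otherwise some reducers would never see the total.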


# Question 3: Understanding Total Order Sort

The key challenge in distributed computing is to break a problem into a set of sub-problems that can be performed without communicating with each other. Ideally, we should be able to define an arbitrary number of splits and still get the right result, but that is not always possible. Parallelization becomes particularly challenging when we need to make comparisons between records, for example when sorting. Total Order Sort allows us to order large datasets in a way that enables efficient retrieval of results. Before beginning this assignment, make sure you have read and understood the [Total Order Sort Notebook](https://github.com/UCB-w261/main/tree/master/HelpfulResources/TotalSortGuide/_total-sort-guide-spark2.01-JAN27-2017.ipynb). You can skip the first two MRJob sections, but the rest of section III and all of section IV are **very** important (and apply to Hadoop Streaming) so make sure to read them closely. Feel free to read the Spark sections as well but you won't be responsible for that material until later in the course. To verify your understanding, answer the following questions.

### Q3 Tasks:

* __a) short response:__ What is the difference between a partial sort, an unordered total sort, and a total order sort? From the programmer's perspective, what does total order sort allow us to do that we can't with unordered total? Why is this important with large datasets?

* __b) short response:__ Which phase of a MapReduce job is leveraged to implement Total Order Sort? Which default behaviors must be changed? Why must they be changed?

* __c) short response:__ Describe in words how to configure a Hadoop Streaming job for the custom sorting and partitioning that is required for Total Order Sort.  

* __d) short response:__ Explain why we need to use an inverse hash code function.

* __e) short response:__ Where does this function need to be located so that a Total Order Sort can be performed?

### Q3 Student Answers: 

> __a)__  
<div style="color: black;">
The points below describe the partial sort, unordered total sort, and total order sort respectively. 
            <ol type="a">
                <li>In a partial sort, keys are assigned to buckets without regard to their value. Keys are sorted within each bucket.</li>
                <li>In an unordered total sort, keys are assigned to buckets according to their value, but the buckets themselves are not assigned in sorted order.</li>
                <li>In a total order sort, keys are assigned to buckets according to their value and the buckets are assigned in sorted order.</li>
            </ol>
With an unordered total sort we still have to put the buckets in order after the reducer stage, which takes a post-processing job. With a total order sort the buckets already come out in order, so the output files can be read in sequence to get the fully sorted result. This matters when the data is huge, because an extra post-processing pass over the output is expensive. 

</div>
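
The difference between the sort types in answer a) can be illustrated with a small local simulation. It is only a sketch: the bucket assignment rules, the cut points `[10, 20]`, and the function names are hypothetical stand-ins for what the partitioner does in a real job.

```python
import bisect

def partial_sort(keys, n_buckets):
    """Assign keys to buckets ignoring their value (round-robin), sort each bucket."""
    buckets = [[] for _ in range(n_buckets)]
    for i, key in enumerate(keys):
        buckets[i % n_buckets].append(key)
    return [sorted(bucket) for bucket in buckets]

def total_order_sort(keys, boundaries):
    """Assign keys to buckets by value range (ascending cut points), sort each bucket."""
    buckets = [[] for _ in range(len(boundaries) + 1)]
    for key in keys:
        buckets[bisect.bisect_right(boundaries, key)].append(key)
    return [sorted(bucket) for bucket in buckets]

keys = [17, 3, 42, 8, 25, 11, 30, 1, 19]

partial = partial_sort(keys, 3)
total = total_order_sort(keys, [10, 20])
# Unordered total sort: same range buckets, but the files are not in range order.
unordered_total = [total[1], total[2], total[0]]

def flatten(buckets):
    return [key for bucket in buckets for key in bucket]

print("partial:        ", partial)
print("unordered total:", unordered_total)
print("total order:    ", total)
print(flatten(total) == sorted(keys))
```

Only the total order result can be read file by file to obtain the globally sorted sequence; the unordered total result needs its buckets rearranged first, and the partial result needs a full merge.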

> __b)__ 
<div style="color: black;">
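Total Order Sort is implemented in the shuffle and sort phase. The following components have to be changed to implement it: 
            <ol type="a">
                <li>The mapper has to prepend a partition key, generated with an inverse hash code function, to each record so that the record is routed to the intended partition.</li>
                <li>The final reducer should drop the partition key so that it does not appear in the output.</li>
                <li>The streaming job must specify the partition key and the sort key explicitly, because by default Hadoop hashes the whole key to choose a partition and sorts keys as text.</li>
            </ol>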

</div>

> __c)__ 
<div style="color: black;">
As a first step, we set the field delimiter and the number of key fields. The following sample illustrates how to use three key fields with tab as the delimiter:
            <ul type="a">
                <li>-D stream.num.map.output.key.fields=3</li>
                <li>-D stream.map.output.field.separator="\t"</li>
            </ul>
We then customize the sorting by specifying the primary and secondary sort keys, the format of those keys (numeric or non-numeric), and the order of the sort if needed (ascending or descending). The following example sorts on the non-numeric first field as the primary key and on the numeric second field, in reverse order, as the secondary key.
            <ul type="a">
                <li>-D mapreduce.partition.keycomparator.options="-k1,1 -k2,2nr"</li>
            </ul>
We can then specify the partition key as follows.
            <ul type="a">
                <li>-D mapreduce.partition.keypartitioner.options="-k1,1"</li>
            </ul>
These two options take effect only when the job also selects the KeyFieldBasedComparator and KeyFieldBasedPartitioner classes. We can also set the number of reducers as follows. This notebook contains several complete Hadoop Streaming jobs; the one for problem 4(e) is a good example.
            <ul type="a">
                <li>-D mapreduce.job.reduces=3</li>
            </ul>
</div>
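
The configuration described in answer c) can be assembled programmatically. The sketch below only builds the option list; the function name `total_order_sort_options` and its parameters are hypothetical, while the property and class names are the ones used in the streaming jobs elsewhere in this notebook.

```python
def total_order_sort_options(num_key_fields, sort_spec, partition_spec, num_reducers):
    """Build the option list for a sorted, custom-partitioned Hadoop Streaming job."""
    properties = [
        f"stream.num.map.output.key.fields={num_key_fields}",
        "mapreduce.job.output.key.comparator.class="
        "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
        f"mapreduce.partition.keycomparator.options={sort_spec}",
        f"mapreduce.partition.keypartitioner.options={partition_spec}",
        f"mapreduce.job.reduces={num_reducers}",
    ]
    args = []
    for prop in properties:
        # Generic -D options are placed ahead of the streaming-specific options.
        args += ["-D", prop]
    args += ["-partitioner", "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"]
    return args

# Partition on field 1, sort by field 1 and then by field 2 numerically, descending.
options = total_order_sort_options(3, "-k1,1 -k2,2nr", "-k1,1", 3)
for flag, value in zip(options[::2], options[1::2]):
    print(flag, value)
```

Keeping each option as a separate list element means a sort specification containing spaces stays a single argument if the list is later handed to a process launcher; on a shell command line the same value needs quotes.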

> __d)__ 
<div style="color: black;"> By default, Hadoop computes a partition for each record by hashing its partition key, and the shuffle routes the record to a reducer based on that hash. If we need to control the final output ordering, we must assign our own partition keys to get the desired routing. An inverse hash code function is the technique for choosing those keys so that each one lands in the partition we intend. For example, partition keys like 'A', 'B', 'C' are ordered as 'B', 'C', 'A' by Hadoop. To control this ordering it is imperative to reverse engineer the hashing, and this reverse engineering is what we call inverse hash coding.</div>  

> __e)__ 
<div style="color: black;"> In a total order sort, each bucket is sorted and the buckets are expected to be in a specific order. We use the inverse hash code function to achieve this ordering, so the function is typically located in the mapper, where it adds the partition key to each output record.</div>  
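
A minimal sketch of an inverse hash code function follows. It assumes the partitioner hashes the bytes of the partition key with the Java-style rule `h = 31*h + b` and takes the non-negative result modulo the number of reducers; under that assumption, three reducers reproduce the 'B', 'C', 'A' ordering mentioned above. All function names are hypothetical.

```python
def java_style_hash(key):
    """Hash the key bytes with h = 31*h + b, kept to 32 bits (assumed rule)."""
    h = 0
    for b in key.encode("utf-8"):
        h = (31 * h + b) & 0xFFFFFFFF
    return h

def partition_index(key, num_reducers):
    """Partition a key would land in under the assumed hashing rule."""
    return (java_style_hash(key) & 0x7FFFFFFF) % num_reducers

def make_partition_labels(num_reducers, candidates="ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """Inverse hash: labels[i] is a key that is routed to partition i."""
    found = {}
    for label in candidates:
        found.setdefault(partition_index(label, num_reducers), label)
        if len(found) == num_reducers:
            break
    return [found[i] for i in range(num_reducers)]

def partition_key_for(count, boundaries, labels):
    """Mapper-side helper: largest counts get the label of partition 0."""
    rank = sum(count < cut for cut in boundaries)  # boundaries are descending
    return labels[rank]

labels = make_partition_labels(3)
print(labels)
# Hypothetical cut points: counts >= 100 go to part-00000, >= 10 to part-00001.
print(partition_key_for(549, [100, 10], labels),
      partition_key_for(42, [100, 10], labels),
      partition_key_for(3, [100, 10], labels))
```

The mapper would prepend the returned label to each record, the job would partition on that first field, and the final reducer would strip it again.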

# About the Data
For the main task in this portion of the homework you will train a classifier to determine whether an email represents spam or not. You will train your Naive Bayes model on a 100 record subset of the Enron Spam/Ham corpus available in the HW2 data directory (__`HW2/data/enronemail_1h.txt`__).

__Source:__   
The original data included about 93,000 emails which were made public after the company's collapse. There have been a number of raw and preprocessed versions of this corpus (including those available [here](http://www.aueb.gr/users/ion/data/enron-spam/index.html) and [here](http://www.aueb.gr/users/ion/publications.html)). The subset we will use is limited to emails from 6 Enron employees and a number of spam sources. It is part of [this data set](http://www.aueb.gr/users/ion/data/enron-spam/) which was created by researchers working on personalized Bayesian spam filters. Their original publication is [available here](http://www.aueb.gr/users/ion/docs/ceas2006_paper.pdf). __`IMPORTANT!`__ _For this homework please limit your analysis to the 100 email subset which we provide. No need to download or run your analysis on any of the original datasets, those links are merely provided as context._

__Preprocessing:__  
For their work, Metsis et al. (the authors) appeared to have pre-processed the data, not only collapsing all text to lower-case, but additionally separating "words" by spaces, where "words" unfortunately include punctuation. As a concrete example, the sentence:  
>  `Hey Jon, I hope you don't get lost out there this weekend!`  

... would have been reduced by Metsis et al. to the form:  
> `hey jon , i hope you don ' t get lost out there this weekend !` 

... so we have reverted the data back toward its original state, removing spaces so that our sample sentence would now look like:
> `hey jon, i hope you don't get lost out there this weekend!`  

Thus we have at least preserved contractions and other higher-order lexical forms. However, one must be aware that this reversion is not complete, that some objects (specifically web sites) will be ill-formatted, and that all text is still lower-cased.


__Format:__   
All messages are collated to a tab-delimited format:  

>    `ID \t SPAM \t SUBJECT \t CONTENT \n`  

where:  
>    `ID = string; unique message identifier`  
    `SPAM = binary; with 1 indicating a spam message`  
    `SUBJECT = string; title of the message`  
    `CONTENT = string; content of the message`   
    
Note that either of `SUBJECT` or `CONTENT` may be "NA", and that all tab (\t) and newline (\n) characters have been removed from both of the `SUBJECT` and `CONTENT` columns.  
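
As a minimal sketch of reading this format in Python: the helper names `parse_record` and `tokenize` are hypothetical, and the tokenization rule (runs of lowercase letters) is an assumption for illustration, not necessarily the rule used in the provided scripts. "NA" fields are not treated specially here.

```python
import re

def parse_record(line):
    """Split one tab-delimited line into (doc_id, is_spam, text)."""
    doc_id, label, subject, content = line.rstrip("\n").split("\t", 3)
    # The 'text' of a document is its subject and content concatenated.
    text = f"{subject} {content}"
    return doc_id, int(label), text

def tokenize(text):
    """Assumed rule: a word is any run of lowercase letters."""
    return re.findall(r"[a-z]+", text.lower())

# Second record shown in the file preview in this notebook.
sample = "0001.1999-12-10.kaminski\t0\t re: rankings\t thank you.\n"
doc_id, is_spam, text = parse_record(sample)
words = tokenize(text)
print(doc_id, is_spam, words)
```

Limiting the split to three tabs keeps the four-field structure even if a stray tab ever appeared in the content field.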

In [14]:
!pwd

/media/notebooks/Assignments/HW2


In [15]:
# take a look at the first 100 characters of the first 5 records (RUN THIS CELL AS IS)
!head -n 5 /media/notebooks/Assignments/HW2/{ENRON} | cut -c-100

0001.1999-12-10.farmer	0	 christmas tree farm pictures	NA
0001.1999-12-10.kaminski	0	 re: rankings	 thank you.
0001.2000-01-17.beck	0	 leadership development pilot	" sally:  what timing, ask and you shall receiv
0001.2000-06-06.lokay	0	" key dates and impact of upcoming sap implementation over the next few week
0001.2001-02-07.kitchen	0	 key hr issues going forward	 a) year end reviews-report needs generating 


In [16]:
# see how many messages/lines are in the file 
#(this number may be off by 1 if the last line doesn't end with a newline)
!wc -l /media/notebooks/Assignments/HW2/{ENRON}

100 /media/notebooks/Assignments/HW2/data/enronemail_1h.txt


In [17]:
# make the HDFS directory if it doesn't already exist
!hdfs dfs -mkdir {HDFS_DIR}

mkdir: `/user/root/HW2': File exists


In [18]:
# load the data into HDFS (RUN THIS CELL AS IS)
!hdfs dfs -copyFromLocal /media/notebooks/Assignments/HW2/{ENRON} {HDFS_DIR}/enron.txt

copyFromLocal: `/user/root/HW2/enron.txt': File exists


In [19]:
!hdfs dfs -ls {HDFS_DIR}

Found 16 items
-rw-r--r--   1 root hadoop     254300 2022-01-30 00:39 /user/root/HW2/NBmodel.txt
drwxr-xr-x   - root hadoop          0 2022-01-29 23:10 /user/root/HW2/chinese-output
drwxr-xr-x   - root hadoop          0 2022-01-29 19:06 /user/root/HW2/chinese-output-smooth
-rw-r--r--   1 root hadoop        119 2022-01-23 00:41 /user/root/HW2/chineseTest.txt
-rw-r--r--   1 root hadoop        107 2022-01-23 00:41 /user/root/HW2/chineseTrain.txt
drwxr-xr-x   - root hadoop          0 2022-01-30 00:39 /user/root/HW2/custom-partition
drwxr-xr-x   - root hadoop          0 2022-01-29 18:12 /user/root/HW2/eda-output
drwxr-xr-x   - root hadoop          0 2022-01-29 18:11 /user/root/HW2/eda-sort-output
drwxr-xr-x   - root hadoop          0 2022-01-29 19:09 /user/root/HW2/enron-model
-rw-r--r--   1 root hadoop     204559 2022-01-21 02:30 /user/root/HW2/enron.txt
-rw-r--r--   1 root hadoop      41493 2022-01-28 02:47 /user/root/HW2/enron_test.txt
-rw-r--r--   1 root hadoop     163066 2022-01-28 02:

# Question 4:  Enron Ham/Spam EDA.
Before building our classifier, let's get acquainted with our data. In particular, we're interested in which words occur more in spam emails than in legitimate ("ham") emails. In this question you'll implement two Hadoop MapReduce jobs to count and sort word occurrences by document class. You'll also learn about two new Hadoop streaming parameters that will allow you to control how the records output from your mappers are partitioned for reducing on separate nodes. 

__`IMPORTANT NOTE:`__ For this question and all subsequent items, you should include both the subject and the body of the email in your analysis (i.e. concatenate them to get the 'text' of the document).

### Q4 Tasks:
* __a) code:__ Complete the missing components of the code in __`EnronEDA/mapper.py`__ and __`EnronEDA/reducer.py`__ to create a Hadoop  MapReduce job that counts how many times each word in the corpus occurs in an email for each class. Pay close attention to the data format specified in the docstrings of these scripts _-- there are a number of ways to accomplish this task, we've chosen this format to help illustrate a technique in `part e`_. Run the provided unit tests to confirm that your code works as expected, then run the provided Hadoop Streaming command to apply your analysis to the Enron data.


* __b) code + short response:__ How many times does the word "__assistance__" occur in each class? (`HINT:` Use a `grep` command to read from the results file you generated in '`a`' and then report the answer in the space provided.)


* __c) short response:__ Would it have been possible to add some sorting parameters to the Hadoop streaming command that would cause our `part a` results to be sorted by count? Explain why or why not. (`HINT:` This question demands an understanding of the sequence of the phases of MapReduce.)


* __d) code + short response:__ Write a second Hadoop MapReduce job to sort the output of `part a` first by class and then by count. Run your job and save the results to a local file. Then describe in words how you would go about printing the top 10 words in each class given this sorted output. (`HINT 1:` _remember that you can simply pass the `part a` output directory to the input field of this job; `HINT 2:` since this task is just reordering the records from `part a` we don't need to write a mapper or reducer, just use `/bin/cat` for both_)


* __e) code:__ A more efficient alternative to '`grep`-ing' for the top 10 words in each class would be to use the Hadoop framework to separate records from each class into its own partition so that we can just read the top lines in each. Rewrite your job from `part d` to specify 2 reduce tasks and to tell Hadoop to partition based on the second field (which indicates spam/ham in our data). Your code should maintain the secondary sort -- that is each partition should list words from most to least frequent.

### Q4 Student Answers:
> __b)__ The word "assistance" appeared 8 times in the class "spam" and 2 times in the class "ham".

> __c)__ No, it is not possible to sort the **part a** output by count within the same Hadoop Streaming job. Part a is a word-counting problem, and Hadoop sorts records during the shuffle, before the reduce phase, whereas the final count for each word is not known until the reducer has summed all the partial counts. So we need to feed the word-count records from this job into a second Hadoop Streaming job that sorts them by count. 

> __d)__ This question has two parts. (1) Sorting the output from part a: the output from part a has 3 fields per record (word, class, count). We can stream it through Hadoop using the unix `/bin/cat` as both mapper and reducer, and configure the shuffle and sort layer to sort with the class as the primary key and the count as the secondary key. (Please see the code in the cell for this part below.) (2) Listing the top 10 items in each class: once the data is partitioned by class, the output of the two reducers sits in the HDFS output directory as two separate partition files, one per class. We can pipe `!hdfs dfs -cat` into the unix `head` command to look at the top 10 items of each partition in turn.
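
When the sorted output is in a single local file rather than one partition per class, the top words can also be picked out in one pass. The sketch below assumes the records are already sorted by count in descending order within each class; the function name `top_n_per_class` is hypothetical, and the sample lines are taken from the job output shown later in this notebook.

```python
from collections import defaultdict

def top_n_per_class(sorted_lines, n=10):
    """Keep the first n records seen for each class label."""
    top = defaultdict(list)
    for line in sorted_lines:
        # rstrip() also removes the trailing tab that /bin/cat output carries.
        word, label, count = line.rstrip().split("\t")
        if len(top[label]) < n:
            top[label].append((word, int(count)))
    return dict(top)

sample_lines = [
    "the\t1\t698\t\n",
    "to\t1\t566\t\n",
    "and\t1\t392\t\n",
    "the\t0\t549\t\n",
    "to\t0\t398\t\n",
    "ect\t0\t382\t\n",
]
top_two = top_n_per_class(sample_lines, n=2)
print(top_two)
```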

In [20]:
# part a - do your work in the provided scripts then RUN THIS CELL AS IS
!chmod a+x EnronEDA/mapper.py
!chmod a+x EnronEDA/reducer.py

In [21]:
# part a - unit test EnronEDA/mapper.py (RUN THIS CELL AS IS)
!echo -e "d1	1	title	body\nd2	0	title	body" | EnronEDA/mapper.py

title	1	1
body	1	1
title	0	1
body	0	1


In [22]:
# part a - unit test EnronEDA/reducer.py (RUN THIS CELL AS IS)
!echo "one	1	1\none	0	1\none	0	1\ntwo	0	1" | EnronEDA/reducer.py

one	1	1
one	0	2
two	1	0
two	0	1


In [23]:
# part a - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

Deleted /user/root/HW2/eda-output


In [24]:
# part a - Hadoop streaming job (RUN THIS CELL AS IS)
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -reducer reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob2860073544150334342.jar tmpDir=null
2022-01-30 23:30:43,191 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:30:43,444 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:30:43,903 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:30:43,904 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:30:44,096 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0001
2022-01-30 23:30:44,989 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-30 23:30:45,050 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-30 23:30:45,195 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0001
2022-01-30 23:30:45,197 INFO mapreduce.

In [25]:
# part a - retrieve results from HDFS & copy them into a local file (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/eda-output/part-0000* > EnronEDA/results.txt

In [26]:
# part b - write your grep command here
!hdfs dfs -cat {HDFS_DIR}/eda-output/part-0000* | grep 'assistance'

assistance	1	8
assistance	0	2


In [28]:
# part d - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-sort-output

rm: `/user/root/HW2//eda-sort-output': No such file or directory


In [29]:
# part d - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=3 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2nr -k3,3nr" \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -input {HDFS_DIR}/eda-output \
  -output {HDFS_DIR}/eda-sort-output \
  -cmdenv PATH={PATH}\
  -numReduceTasks 3

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob2457006673226946770.jar tmpDir=null
2022-01-30 23:34:24,183 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:34:24,392 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:34:24,931 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:34:24,931 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:34:25,278 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0002
2022-01-30 23:34:25,921 INFO mapred.FileInputFormat: Total input files to process : 2
2022-01-30 23:34:26,373 INFO mapreduce.JobSubmitter: number of splits:10
2022-01-30 23:34:26,507 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0002
2022-01-30 23:34:26,508 INFO mapreduce

In [30]:
# part e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-sort-output

Deleted /user/root/HW2/eda-sort-output


In [31]:
# part e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=3 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k3,3nr" \
  -D mapreduce.partition.keypartitioner.options="-k2,2"  \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/eda-output \
  -output {HDFS_DIR}/eda-sort-output \
  -cmdenv PATH={PATH} \
  -numReduceTasks 2

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob1220247576543057211.jar tmpDir=null
2022-01-30 23:35:17,199 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:35:17,408 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:35:17,954 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:35:17,955 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:35:18,316 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0003
2022-01-30 23:35:18,604 INFO mapred.FileInputFormat: Total input files to process : 2
2022-01-30 23:35:18,654 INFO mapreduce.JobSubmitter: number of splits:10
2022-01-30 23:35:18,794 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0003
2022-01-30 23:35:18,796 INFO mapreduce

In [32]:
# part e - view the top 10 records from each partition (RUN THIS CELL AS IS)
for idx in range(2):
    print(f"\n===== part-0000{idx}=====\n")
    !hdfs dfs -cat {HDFS_DIR}/eda-sort-output/part-0000{idx} | head


===== part-00000=====

the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135	
cat: Unable to write to output stream.

===== part-00001=====

the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153	
cat: Unable to write to output stream.


__Expected output:__
<table>
<th>part-00000:</th>
<th>part-00001:</th>
<tr><td><pre>
the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135
</pre></td>
<td><pre>
the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153
</pre></td></tr>
</table>

# Question 5: Counters and Combiners.
Tuning the number of mappers & reducers is helpful to optimize very large distributed computations. Doing so successfully requires a thorough understanding of the data size at each stage of the job. As you learned in the week3 live session, counters are an invaluable resource for understanding this kind of detail. In this question, we will take the EDA performed in Question 4 as an opportunity to illustrate some related concepts.

### Q5 Tasks:
* __a) short response:__ Read the Hadoop output from your job in Question 4a to report how many records are emitted by the mappers and how many records are received by the reducers. In the context of word counting, what does this number represent practically?

* __b) code:__ Note that we wrote the reducer in question 4a such that the input and output record format is identical. This makes it easy to use the same reducer script as a combiner. In the space provided below, write the Hadoop Streaming command to re-run your job from question 4a with this combining added.

* __c) short response__: Report the number of records emitted by your mappers in part b and the number of records received by your reducers. Compare your results here to what you saw in part a. Explain.

* __d) short response__: Describe a scenario where using a combiner would _NOT_ improve the efficiency of the shuffle stage. Explain. [__`BONUS:`__ how does increasing the number of mappers affect the usefulness of a combiner?]

### Q5 Student Answers:
> __a)__ The mappers emitted 31490 records and the reducers received the same 31490 records. Because we partitioned the records by class and ran the job with 2 reducers, all class 0 records went to reducer 0 and all class 1 records went to reducer 1; together they add up to 31490 records. No combiner was used to consolidate the records before they were sent to the reducers.

> __c)__ The mappers emitted 31490 records but the reducers received only 20576 records. Because we used the reducer as a combiner, records were consolidated on the map side before being sent to the reducers, so only 20576 records went through the shuffle. This number matches the number of records output by the combiner. For example, take the word 'for': all of the class '1' counts for 'for' produced by a single mapper are collapsed by the combiner into one record before reaching the reducer, and likewise for its class '0' counts.

> __d)__ In the previous problem we used `EnronEDA/reducer.py` as our combiner. It collapses records with the same key into one by adding their counts, which reduced the number of (word, count) records passing through the shuffle. If we replaced this combiner with a simple `/bin/cat` command, it would act as a pass-through: it would not reduce the traffic in the shuffle and hence would NOT improve efficiency. The same is true whenever keys rarely repeat within a single mapper's output, because there is nothing to collapse. **BONUS:** Consider again using `EnronEDA/reducer.py` as both combiner and reducer. When we increase the number of mappers from $m$ to $2m$, the same input is divided into smaller splits, so each combiner sees fewer repeats of any given key and collapses fewer records. As a result, more (word, count) records pass through the shuffle than when we had $m$ mappers, and the combiner becomes less useful.
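
To make the counter comparison in parts a and c concrete, here is a small simulation of what a combiner does to the record count. It is only a sketch: the two input splits are made up, and `run_mapper` / `run_combiner` are hypothetical stand-ins for the streaming scripts, not the homework code.

```python
from collections import Counter

def run_mapper(lines):
    """Emit one (word, 1) record per token, as a word-count mapper would."""
    return [(word, 1) for line in lines for word in line.split()]

def run_combiner(records):
    """Sum counts per key within a single mapper's output."""
    totals = Counter()
    for word, count in records:
        totals[word] += count
    return list(totals.items())

# two hypothetical input splits, one per mapper
splits = [["for you for me", "for free"], ["for you", "call me"]]

mapped = [run_mapper(split) for split in splits]
combined = [run_combiner(records) for records in mapped]

emitted = sum(len(records) for records in mapped)
shuffled = sum(len(records) for records in combined)
print(f"map output records:      {emitted}")
print(f"records after combining: {shuffled}")
```

In the Hadoop job log the analogous numbers appear as the `Map output records`, `Combine output records` and `Reduce input records` counters. If every key in a split were unique, `run_combiner` would return as many records as it received, which is the situation described in part d.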

In [35]:
# part b - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

Deleted /user/root/HW2/eda-output


In [36]:
# part b - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -combiner reducer.py \
  -reducer reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob1945276239039924653.jar tmpDir=null
2022-01-30 23:44:47,430 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:44:47,676 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:44:48,124 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:44:48,124 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:44:48,297 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0005
2022-01-30 23:44:48,997 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-30 23:44:49,058 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-30 23:44:49,196 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0005
2022-01-30 23:44:49,198 INFO mapreduce.

# Question 6: Document Classification Task Overview.
The week 2 assigned reading from Chapter 13 of _Introduction to Information Retrieval_ by Manning, Raghavan and Schutze provides a thorough introduction to the document classification task and the math behind Naive Bayes. In this question we'll use the example from Table 13.1 (reproduced below) to 'train' an unsmoothed Multinomial Naive Bayes model and classify a test document by hand.

<table>
<th>DocID</th>
<th>Class</th>
<th>Subject</th>
<th>Body</th>
<tr><td>Doc1</td><td>1</td><td></td><td>Chinese Beijing Chinese</td></tr>
<tr><td>Doc2</td><td>1</td><td></td><td>Chinese Chinese Shanghai</td></tr>
<tr><td>Doc3</td><td>1</td><td></td><td>Chinese Macao</td></tr>
<tr><td>Doc4</td><td>0</td><td></td><td>Tokyo Japan Chinese</td></tr>
</table>

### Q6 Tasks:
* __a) short response:__ Equation 13.3 in Manning, Raghavan and Schutze shows how a Multinomial Naive Bayes model classifies a document. It predicts the class, $c$, for which the estimated conditional probability of the class given the document's contents,  $\hat{P}(c|d)$, is greatest. In this equation what two pieces of information are required to calculate  $\hat{P}(c|d)$? Your answer should include both mathematical notation and verbal explanation.


* __b) short response:__ The Enron data includes two classes of documents: `spam` and `ham` (they're actually labeled `1` and `0`). In plain English, explain what  $\hat{P}(c)$ and   $\hat{P}(t_{k} | c)$ mean in the context of this data. How would we estimate these values from a training corpus? How many passes over the data would we need to make to retrieve this information for all classes and all words?


* __c) hand calculations:__ Above we've reproduced the document classification example from the textbook (we added an empty subject field to mimic the Enron data format). Remember that the classes in this "Chinese Example" are `1` (about China) and `0` (not about China). Calculate the class priors and the conditional probabilities for an __unsmoothed__ Multinomial Naive Bayes model trained on this data. Show the calculations that lead to your result using markdown and $\LaTeX$ in the space provided or by embedding an image of your hand written work. [`NOTE:` _Your results should NOT match those in the text: they are training a model with +1 smoothing; you are training a model without smoothing_]


* __d) hand calculations:__ Use the model you trained to classify the following test document: `Chinese Chinese Chinese Tokyo Japan`. Show the calculations that lead to your result using markdown and   $\LaTeX$ in the space provided or by embedding an image of your hand written work.


* __e) short response:__ Compare the classification you get from this unsmoothed model in `d`/`e` to the results in the textbook's "Example 1" which reflects a model with Laplace plus 1 smoothing. How does smoothing affect our inference?

### Q6 Student Answers:
> __a)__ The Multinomial Naive Bayes classifier picks the class with the highest estimated conditional probability given the document's contents: $$c_{map} = \arg\max_{c \in C}\hat{P}(c|d) = \arg\max_{c \in C}\hat{P}(c) \prod_{1 \le k \le n_d} \hat{P}(t_k|c)$$ where $c$ is a class, $d$ is the document, $t_k$ is the $k^{th}$ token of $d$ that belongs to the vocabulary used for classification, and $n_d$ is the number of such tokens in $d$. So we need two pieces of information: the prior probability of the class, $\hat{P}(c)$, and the conditional probability of each token given the class, $\hat{P}(t_k|c)$.

> __b)__ $\hat{P}(c)$ in the above equation refers to $\hat{P}(ham)$ and $\hat{P}(spam)$, the probability that an email is ham or spam before we look at its contents. $\hat{P}(ham)$ is estimated as the number of ham emails divided by the total number of emails; similarly, $\hat{P}(spam)$ is the number of spam emails divided by the total number of emails. $\hat{P}(t_k|c)$ is the probability of seeing the token $t_k$ at a given position in an email of class $c$. It is estimated as the number of occurrences of $t_k$ in emails of class $c$ divided by the total number of tokens in emails of class $c$. All of these counts can be collected in a single pass over the training data.

> __c)__ The prior probability for spam (class `1`) is: $$\hat{P}(spam) = \frac{N_c}{N} = \frac{3}{4} = 0.75$$ where $N_c$ is the number of documents labeled as class $c$ and $N$ is the total number of documents. Similarly, the prior probability for ham (class `0`) is: $$\hat{P}(ham) = \frac{1}{4} = 0.25$$ The conditional probabilities are the count of each word in a class divided by the total number of words in that class (8 for spam, 3 for ham): $$\hat{P}(Chinese|spam) = \frac{5}{8}$$
$$\hat{P}(Tokyo|spam) = \hat{P}(Japan|spam) = \frac{0}{8} = 0$$
$$\hat{P}(Chinese|ham) = \frac{1}{3}$$
$$\hat{P}(Tokyo|ham) = \hat{P}(Japan|ham) = \frac{1}{3}$$

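> __d)__ For the test document $D_5$: $$\hat{P}(spam|D_5) \propto \hat{P}(spam)\cdot\hat{P}(Chinese|spam)^3\cdot\hat{P}(Tokyo|spam)\cdot \hat{P}(Japan|spam) = \frac{3}{4}\cdot\left(\frac{5}{8}\right)^3\cdot 0\cdot 0 = 0$$ and similarly, $$\hat{P}(ham|D_5) \propto \hat{P}(ham)\cdot\hat{P}(Chinese|ham)^3\cdot\hat{P}(Tokyo|ham)\cdot \hat{P}(Japan|ham) = \frac{1}{4}\cdot\left(\frac{1}{3}\right)^3\cdot\frac{1}{3}\cdot\frac{1}{3} = \frac{1}{972} \approx 0.001$$ The unsmoothed model therefore classifies $D_5$ as ham (class `0`).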

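> __e)__ Because $\hat{P}(spam|D_5) = 0 < \hat{P}(ham|D_5)$, our unsmoothed model favors class ham, whereas the smoothed model in the textbook correctly favors class spam. Without smoothing, a single word that never appeared in a class during training forces that class's score to zero, no matter how strongly the other words point to it; smoothing prevents any one unseen word from wiping out the rest of the evidence.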
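
The hand calculations in parts c through e can be double-checked with exact fractions. The sketch below is an illustration rather than part of the assignment code: it uses simple whitespace tokenization, the class labels `"1"` and `"0"` from the table above, and a hypothetical helper `score` whose `k` argument switches between the unsmoothed model (`k=0`) and add-one smoothing (`k=1`).

```python
from collections import Counter
from fractions import Fraction

train = [("1", "Chinese Beijing Chinese"),
         ("1", "Chinese Chinese Shanghai"),
         ("1", "Chinese Macao"),
         ("0", "Tokyo Japan Chinese")]
test_doc = "Chinese Chinese Chinese Tokyo Japan"

# tally documents per class and word occurrences per class
doc_counts = Counter(label for label, _ in train)
word_counts = {label: Counter() for label in doc_counts}
for label, body in train:
    word_counts[label].update(body.lower().split())
vocab = {w for counts in word_counts.values() for w in counts}

def score(label, doc, k=0):
    """P(c) * product of P(t|c), with add-k smoothing (k=0: unsmoothed)."""
    total_words = sum(word_counts[label].values())
    result = Fraction(doc_counts[label], len(train))
    for word in doc.lower().split():
        result *= Fraction(word_counts[label][word] + k,
                           total_words + k * len(vocab))
    return result

for k in (0, 1):
    scores = {label: score(label, test_doc, k) for label in ("0", "1")}
    print(f"k={k}:", {c: str(s) for c, s in scores.items()})
```

With `k=0` class `1` scores exactly zero and class `0` wins with 1/972; with `k=1` the ordering flips in favor of class `1`, in line with the textbook example.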


In [1]:
# part d/e - if you didn't write out your calculations above, embed a picture of them here:
from IPython.display import Image
#Image(filename="path-to-hand-calculations-image.png")

# Question 7: Naive Bayes Inference.
In the next two questions you'll write code to parallelize the Naive Bayes calculations that you performed above. We'll do this in two phases: one MapReduce job to perform training and a second MapReduce job to perform inference. While in practice we'd need to train a model before we can use it to classify documents, for learning purposes we're going to develop our code in the opposite order. By first focusing on the pieces of information/format we need to perform the classification (inference) task, you should find it easier to develop a solid implementation for the training phase when you get to question 8 below. In both of these questions we'll continue to use the Chinese example corpus from the textbook to help us test our MapReduce code as we develop it. Below we've reproduced the corpus, test set and model in text format that matches the Enron data.

### Q7 Tasks:
* __a) short response:__ run the provided cells to create the example files and load them in to HDFS. Then take a closer look at __`NBmodel.txt`__. This text file represents a Naive Bayes model trained (with Laplace +1 smoothing) on the example corpus. What are the 'keys' and 'values' in this file? Which record means something slightly different than the rest? The value field of each record includes two numbers which will be helpful for debugging but which we don't actually need to perform inference -- what are they? [`HINT`: _This file represents the model from Example 13.1 in the textbook, if you're having trouble getting oriented try comparing our file to the numbers in that example._]


* __b) short response:__ When performing Naive Bayes in practice instead of multiplying the probabilities (as in equation 13.3) we add their logs (as in equation 13.4). Why do we choose to work with log probabilities? If we had an unsmoothed model, what potential error could arise from this transformation?


* __c) short response:__ Documents 6 and 8 in the test set include a word that did not appear in the training corpus (and as a result does not appear in the model). What should we do at inference time when we need a class conditional probability for this word?


* __d) short response:__ The goal of our MapReduce job is to stream over the test set and classify each document by performing the calculation from equation 13.4. To do this we'll load the model file (which contains the probabilities for equation 13.4) into memory on the nodes where we do our mapping. This is called an in-memory join. Does loading a model 'state' like this depart from the functional programming principles? Explain why or why not. From a scalability perspective when would this kind of memory use be justified? When would it be unwise?


* __e) code:__ Complete the code in __`NaiveBayes/classify_mapper.py`__. Read the docstring carefully to understand how this script should work and the format it should return. Run the provided unit tests to confirm that your script works as expected then write a Hadoop streaming job to classify the Chinese example test set. [`HINT 1:` _you shouldn't need a reducer for this one._ `HINT 2:` _Don't forget to add the model file to the_ `-files` _parameter in your Hadoop streaming job so that it gets shipped to the mapper nodes where it will be accessed by your script._]


* __f) short response:__ In our test example and in the Enron data set we have fairly short documents. Since these fit fine in memory on a mapper node we didn't need a reducer and could just do all of our calculations in the mapper. However with much longer documents (eg. books) we might want a higher level of parallelization -- for example we might want to process parts of a document on different nodes. In this hypothetical scenario how would our algorithm design change? What could the mappers still do? What key-value structure would they emit? What would the reducers have to do as a last step?
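
The motivation for part b can be seen directly in a few lines of Python. This is a toy illustration under an assumed input (200 identical word likelihoods of 0.01), not data from the corpus.

```python
import math

probs = [0.01] * 200   # 200 hypothetical word likelihoods

product = 1.0
for p in probs:
    product *= p          # shrinks past the smallest representable float

log_sum = sum(math.log(p) for p in probs)

print(product)   # the running product underflows
print(log_sum)   # the sum of logs stays finite

try:
    math.log(0.0)         # what an unsmoothed zero probability would trigger
except ValueError as err:
    print("log(0) failed:", err)
```

The product collapses to zero while the sum of logs remains an ordinary negative number, and taking the log of a zero probability raises an error instead of returning a value.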

### Q7 Student Answers:
> __a)__ In `NBmodel.txt` each key is a word and each value is a comma-separated list of four numbers. The first two are the word's counts in class 0 and class 1 documents; the last two are the smoothed conditional probabilities of the word given class 0 and class 1 respectively. The `ClassPriors` record is different from the rest: it holds the number of documents in each class followed by the two class prior probabilities. The first two numbers in each record are mainly for debugging and are not needed for inference.

> __b)__ When performing Naive Bayes we end up multiplying many conditional probabilities, which may result in floating point underflow. To avoid this, it is standard practice to take the logarithms of these probabilities and add them instead. However, with an unsmoothed model this can cause an error, because some conditional probabilities are zero and the logarithm of zero is undefined.

> __c)__ Ignore the word. Since our model works with log probabilities, this amounts to assigning the word a log probability of zero for every class, so it adds nothing to any class's score.

> __d)__ One of the core principles of functional programming is that functions are stateless. Loading model state from disk or another external source and using it to compute conditional probabilities departs from this principle. However, the approach has the advantage of short processing time and, in some cases, low memory usage. Once a trained and tested model is loaded into memory, it is ready for use in production. The memory footprint of the model is typically far smaller than the data used to train and test it, and we no longer need to worry about that data. For models such as regression or SVM, the model is far smaller than the data itself and this approach is a wise choice. For KNN and NB the models themselves can be large: with our NBmodel.txt we still have to load the conditional probabilities of every word in the vocabulary, which can take substantial memory. When the model is too large to fit comfortably in memory on each mapper node, this approach is unwise.

> __e)__ Complete the coding portion of this question before answering 'f'.

> __f)__ With longer documents (e.g. books), each document would be stored in the Hadoop file system as multiple chunks. It does not make sense to classify a book as spam or ham, so assume a book classification problem instead. If we process such a document with a mapper that is structured like `classify_mapper.py`, different parts of the same document are handled by different mappers, and each mapper outputs a record containing doc_id, class, log_prior_ham, log_prior_spam, log_cond_prob_ham_partial, log_cond_prob_spam_partial. The log priors are emitted separately because they must be added only once, in the reducer, after all the partial conditional log probabilities have been summed. The doc_id is used as the partition key so that all partial results for a document, from whichever mappers produced them, go to the same reducer. In short, the streaming job would have multiple mappers, each processing one part of a document and emitting one such record per chunk; the reducer for a given doc_id then sums the partial log probabilities, adds the priors, and emits the predicted class.
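
The design described in part f relies on log probabilities being additive across chunks. The sketch below illustrates that idea in plain Python under several assumptions: the three-word `MODEL` and `LOG_PRIORS` are taken from the smoothed Chinese example, and `chunk_mapper` / `doc_reducer` are hypothetical names rather than scripts from this assignment. A real streaming reducer would read records sorted by doc_id instead of holding them in a dictionary.

```python
import math
from collections import defaultdict

# assumed smoothed model: word -> (P(word|class 0), P(word|class 1))
MODEL = {"chinese": (2/9, 3/7), "tokyo": (2/9, 1/14), "japan": (2/9, 1/14)}
LOG_PRIORS = (math.log(0.25), math.log(0.75))

def chunk_mapper(doc_id, chunk_text):
    """Emit (doc_id, partial log-likelihoods) for one chunk of a document."""
    partial = [0.0, 0.0]
    for word in chunk_text.lower().split():
        if word in MODEL:                       # unseen words are skipped
            for c in (0, 1):
                partial[c] += math.log(MODEL[word][c])
    return doc_id, tuple(partial)

def doc_reducer(records):
    """Sum partials per document, add each prior once, pick the best class."""
    totals = defaultdict(lambda: [0.0, 0.0])
    for doc_id, partial in records:
        for c in (0, 1):
            totals[doc_id][c] += partial[c]
    results = {}
    for doc_id, sums in totals.items():
        scores = [sums[c] + LOG_PRIORS[c] for c in (0, 1)]
        results[doc_id] = (scores, scores.index(max(scores)))
    return results

# document d5 split into two chunks handled by different mappers
records = [chunk_mapper("d5", "Chinese Chinese"),
           chunk_mapper("d5", "Chinese Tokyo Japan")]
print(doc_reducer(records))
```

Because the prior is added once in the reducer, the result for `d5` is the same as scoring the whole document in a single mapper.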


Run these cells to create the example corpus and model.

In [37]:
%%writefile NaiveBayes/chineseTrain.txt
D1	1		Chinese Beijing Chinese
D2	1		Chinese Chinese Shanghai
D3	1		Chinese Macao
D4	0		Tokyo Japan Chinese

Overwriting NaiveBayes/chineseTrain.txt


In [38]:
%%writefile NaiveBayes/chineseTest.txt
D5	1		Chinese Chinese Chinese Tokyo Japan
D6	1		Beijing Shanghai Trade
D7	0		Japan Macao Tokyo
D8	0		Tokyo Japan Trade

Overwriting NaiveBayes/chineseTest.txt


In [39]:
%%writefile NBmodel.txt
beijing	0.0,1.0,0.111111111111,0.142857142857
chinese	1.0,5.0,0.222222222222,0.428571428571
tokyo	1.0,0.0,0.222222222222,0.0714285714286
shanghai	0.0,1.0,0.111111111111,0.142857142857
ClassPriors	1.0,3.0,0.25,0.75
japan	1.0,0.0,0.222222222222,0.0714285714286
macao	0.0,1.0,0.111111111111,0.142857142857

Overwriting NBmodel.txt


In [40]:
# load the data files into HDFS
!hdfs dfs -copyFromLocal NaiveBayes/chineseTrain.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal NaiveBayes/chineseTest.txt {HDFS_DIR}

copyFromLocal: `/user/root/HW2/chineseTrain.txt': File exists
copyFromLocal: `/user/root/HW2/chineseTest.txt': File exists


Your work for `part e` starts here:

In [41]:
# part e - do your work in NaiveBayes/classify_mapper.py first, then run this cell.
!chmod a+x NaiveBayes/classify_mapper.py

In [42]:
!cat NaiveBayes/classify_mapper.py

#!/usr/bin/env python
"""
Mapper for Naive Bayes Inference.
INPUT:
    ID \t true_class \t subject \t body \n
OUTPUT:
    ID \t true_class \t logP(ham|doc) \t logP(spam|doc) \t predicted_class
SUPPLEMENTAL FILE: 
    This script requires a trained Naive Bayes model stored 
    as NBmodel.txt in the current directory. The model should 
    be a tab separated file whose records look like:
        WORD \t ham_count,spam_count,P(word|ham),P(word|spam)
        
Instructions:
    We have loaded the supplemental file and taken the log of 
    each conditional probability in the model. We also provide
    the code to tokenize the input lines for you. Keep in mind 
    that each 'line' of this file represents a unique document 
    that we wish to classify. Fill in the missing code to get
    the probability of each class given the words in the document.
    Remember that you will need to handle the case where you
    encounter a word that is not represented in the model.
"""
import os
import r

In [43]:
# part e - unit test NaiveBayes/classify_mapper.py (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | column -t

d5  1  -8.90668134500626   -8.10769031284611   1
d6  1  -5.780743515794329  -4.179502370564408  1
d7  0  -6.591673732011658  -7.511706880737812  0
d8  0  -4.394449154674438  -5.565796731681498  0


In [44]:
# part e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/chinese-output

Deleted /user/root/HW2/chinese-output


In [45]:
# part e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NBmodel.txt \
  -mapper classify_mapper.py \
  -input {HDFS_DIR}/chineseTest.txt \
  -output {HDFS_DIR}/chinese-output \
  -cmdenv PATH={PATH} \
  -numReduceTasks 1

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob1830671026684640071.jar tmpDir=null
2022-01-30 23:58:01,400 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:58:01,654 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:58:02,119 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-30 23:58:02,119 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-30 23:58:02,290 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0006
2022-01-30 23:58:02,624 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-30 23:58:02,683 INFO mapreduce.JobSubmitter: number of splits:10
2022-01-30 23:58:02,843 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0006
2022-01-30 23:58:02,845 INFO mapreduce

In [46]:
# part e - retrieve test set results from HDFS (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/chinese-output/part-000* > NaiveBayes/chineseResults.txt

In [47]:
# part e - take a look (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseResults.txt | column -t

d5  1  -8.90668134500626   -8.10769031284611   1
d6  1  -5.780743515794329  -4.179502370564408  1
d7  0  -6.591673732011658  -7.511706880737812  0
d8  0  -4.394449154674438  -5.565796731681498  0


<table>
<th> Expected output for the test set:</th>
<tr align=Left><td><pre>
d5	1	-8.90668134	-8.10769031	1
d6	1	-5.78074351	-4.17950237	1
d7	0	-6.59167373	-7.51170688	0
d8	0	-4.39444915	-5.56579673	0
</pre></td></tr>
</table>
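
For reference, the expected scores above follow directly from equation 13.4 applied to the records in `NBmodel.txt`. The sketch below is one possible way to reproduce them outside Hadoop; it is not the contents of `classify_mapper.py`. The helper names `load_log_model` and `classify` are hypothetical, and the model text is pasted inline so the block is self-contained.

```python
import math
import re

MODEL_TEXT = """beijing\t0.0,1.0,0.111111111111,0.142857142857
chinese\t1.0,5.0,0.222222222222,0.428571428571
tokyo\t1.0,0.0,0.222222222222,0.0714285714286
shanghai\t0.0,1.0,0.111111111111,0.142857142857
ClassPriors\t1.0,3.0,0.25,0.75
japan\t1.0,0.0,0.222222222222,0.0714285714286
macao\t0.0,1.0,0.111111111111,0.142857142857"""

def load_log_model(text):
    """Return {key: (log P(.|ham), log P(.|spam))} from model records."""
    model = {}
    for record in text.splitlines():
        key, payload = record.split("\t")
        _, _, p_ham, p_spam = map(float, payload.split(","))
        model[key] = (math.log(p_ham), math.log(p_spam))
    return model

def classify(line, model):
    """Score one tab-separated 'ID, class, subject, body' line."""
    doc_id, true_class, subject, body = line.lower().split("\t")
    log_ham, log_spam = model["ClassPriors"]      # start from the log priors
    for word in re.findall(r"[a-z]+", subject + " " + body):
        if word in model:                         # skip words not in the model
            log_ham += model[word][0]
            log_spam += model[word][1]
    predicted = 1 if log_spam > log_ham else 0
    return doc_id, true_class, log_ham, log_spam, predicted

model = load_log_model(MODEL_TEXT)
for doc in ["D5\t1\t\tChinese Chinese Chinese Tokyo Japan",
            "D8\t0\t\tTokyo Japan Trade"]:
    print(classify(doc, model))
```

Tokens are lowercased before lookup, so the capitalized `ClassPriors` key can never collide with a word from a document.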

# Question 8: Naive Bayes Training.
In Question 7 we used a model that we had trained by hand. Next we'll develop the code to do that same training in parallel, making it suitable for use with larger corpora (like the Enron emails). The end result of the MapReduce job you write in this question should be a model text file that looks just like the example (`NBmodel.txt`) that we created by hand above.

To refresh your memory about the training process take a look at  `6a` and `6b` where you described the pieces of information you'll need to collect in order to encode a Multinomial Naive Bayes model. We now want to retrieve those pieces of information while streaming over a corpus. The bulk of the task will be very similar to the word counting exercises you've already done but you may want to consider a slightly different key-value record structure to efficiently tally counts for each class. 

The most challenging (interesting?) design question will be how to retrieve the totals (# of documents and # of words in documents for each class). Of course, counting these numbers is easy. The hard part is the timing: you'll need to make sure you have the counts totalled up _before_ you start estimating the class conditional probabilities for each word. It would be best (i.e. most scalable) if we could find a way to do this tallying without storing the whole vocabulary in memory... Use an appropriate MapReduce design pattern to implement this efficiently! 
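
One design pattern that addresses this timing problem is order inversion: give the totals a special key that sorts ahead of every real word, so the reducer has them in hand before the first word arrives. The sketch below illustrates the idea on a simplified, assumed record layout (`word \t class0_count \t class1_count`, with per-class word totals under the key `*wordcount`). It is not the assignment's reducer, and it leaves out the partition key and class priors.

```python
def train_reducer(sorted_lines):
    """Yield (word, c0, c1, P(word|0), P(word|1)) from sorted records.

    Assumes '*wordcount' records carry per-class word totals; because
    '*' sorts before any lowercase letter they are seen first.
    """
    totals = [0, 0]
    current, counts = None, [0, 0]

    def emit(word, counts):
        return (word, counts[0], counts[1],
                counts[0] / totals[0], counts[1] / totals[1])

    for line in sorted_lines:
        word, c0, c1 = line.rstrip("\n").split("\t")
        c0, c1 = int(c0), int(c1)
        if word == "*wordcount":
            totals[0] += c0        # totals may arrive in several pieces
            totals[1] += c1
            continue
        if word != current:
            if current is not None:
                yield emit(current, counts)
            current, counts = word, [0, 0]
        counts[0] += c0
        counts[1] += c1
    if current is not None:
        yield emit(current, counts)

records = ["chinese\t0\t3", "beijing\t0\t1", "*wordcount\t3\t8",
           "chinese\t1\t0", "chinese\t0\t2"]
for row in train_reducer(sorted(records)):
    print(row)
```

Only the running totals and the counts for the current word are held in memory, so the memory footprint does not grow with the vocabulary.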


### Q8 Tasks:
* __a) make a plan:__  Fill in the docstrings for __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ to appropriately reflect the format that each script will input/output. [`HINT:` _the input files_ (`enronemail_1h.txt` & `chineseTrain.txt`) _have a prespecified format and your output file should match_ `NBmodel.txt` _so you really only have to decide on an internal format for Hadoop_].


* __b) implement it:__ Complete the code in __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ so that together they train a Multinomial Naive Bayes model __with no smoothing__. Make sure your end result is formatted correctly (see note above). Test your scripts independently and together (using `chineseTrain.txt` or test input of your own devising). When you are satisfied with your Python code, design and run a Hadoop streaming command to run your job in parallel on __chineseTrain.txt__. Confirm that your trained model matches your hand calculations from Question 6.


* __c) short response:__ We saw in Question 6 that adding Laplace smoothing (where the smoothing parameter $k=1$) makes our classifications less sensitive to rare words. However implementing this technique requires access to one additional piece of information that we had not previously used in our Naive Bayes training. What is that extra piece of information? [`HINT:` see equation 13.7 in Manning, Raghavan and Schutze].


* __d) short response:__ There are a couple of approaches that we could take to handle the extra piece of information you identified in `c`: 1) if we knew this extra information beforehand, we could provide it to our reducer as a configurable parameter for the vocab size dynamically (_where would we get it in the first place?_). Or 2) we could compute it in the reducer without storing any bulky information in memory but then we'd need some postprocessing or a second MapReduce job to complete the calculation (_why?_). Briefly explain what is non-ideal about each of these options. 


* __e) code + short response:__ Choose one of the 2 options above. State your choice & reasoning in the space below then use that strategy to complete the code in __`NaiveBayes/train_reducer_smooth.py`__. Test this alternate reducer then write and run a Hadoop streaming job to train an MNB model with smoothing on the Chinese example. Your results should match the model that we provided for you above (and the calculations in the textbook example). __IMPORTANT NOTE:__ For full credit on this question, your code must work with multiple reducers. 

    - [`HINT:` You will need to implement custom partitioning - [Total Order Sort Notebook](https://github.com/UCB-w261/main/tree/master/HelpfulResources/TotalSortGuide/_total-sort-guide-spark2.01-JAN27-2017.ipynb)] 

    - [`HINT:` Don't start from scratch with this one -- you can just copy over your reducer code from part `b` and make the needed modifications]. 





### Q8 Student Answers:
> __c)__ Suppose we have a multinomial variable with sample counts $c_1, c_2, ..., c_d$, where $d$ is the vocabulary size and $N = \sum_i c_i$. The Laplace-smoothed estimate of each probability has the form $\frac{c_i + k}{N + d \cdot k}$, where $k$ is a positive smoothing parameter, typically set to 1. So, in addition to the counts, we need $k$, which is provided as an input parameter, and $d$, the size of the vocabulary. The vocabulary size is the extra piece of information.

> __d)__ As noted in the previous answer, we need the size of the vocabulary to apply Laplace smoothing when calculating the conditional probabilities. **Option 1:** We could post-process the `NBmodel.txt` file produced by the unsmoothed Hadoop job to get the vocabulary size, since that file contains every word in the vocabulary (the `ClassPriors` entry can be ignored). We could then set the vocabulary size as an environment variable or pass it as a command line argument when running the smoothed job, or ship `NBmodel.txt` itself as a file argument and count its words as the reducer reads it in. **Option 2:** We could stream the output of the first Hadoop streaming job into a second Hadoop streaming job, which calculates the vocabulary count and sends it to the mapper using an inverted index pattern. The downside of option 1 is that if we fail to set the vocabulary size, the calculations can be wrong or the job can break; in addition, the `NBmodel.txt` file or the post-processed vocabulary count could be stale, making the model inaccurate. The downside of option 2 is that two Hadoop jobs have to be run back-to-back.

> __e)__ We picked option 1, as it is the simpler and less error-prone of the two approaches, and the pattern is very similar to what we used in problem 7(e).
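
To make the role of the vocabulary size concrete, the smoothed estimate from answer `c` can be checked against the provided `NBmodel.txt`. This is a minimal sketch, assuming the Chinese example's per-class word totals (3 and 8) and vocabulary size (6); `smoothed_conditionals` is a hypothetical helper, not the reducer from this assignment.

```python
def smoothed_conditionals(class_counts, class_totals, vocab_size, k=1):
    """Add-k estimate (count + k) / (total + k * vocab_size) per class."""
    return tuple((count + k) / (total + k * vocab_size)
                 for count, total in zip(class_counts, class_totals))

# Chinese example: values are ordered (class 0, class 1)
WORD_TOTALS = (3, 8)      # total words per class
VOCAB_SIZE = 6            # distinct words in the training corpus
counts = {"chinese": (1, 5), "tokyo": (1, 0), "beijing": (0, 1)}

for word, c in counts.items():
    print(word, smoothed_conditionals(c, WORD_TOTALS, VOCAB_SIZE))
```

Setting `k=0` recovers the unsmoothed estimates, and passing a wrong `vocab_size` changes every probability, which is why a stale or missing vocabulary count is a real risk for option 1.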


In [50]:
# part a - do your work in train_mapper.py and train_reducer.py then RUN THIS CELL AS IS
!chmod a+x NaiveBayes/train_mapper.py
!chmod a+x NaiveBayes/train_reducer.py
!echo "=========== MAPPER DOCSTRING ============"
!head -n 8 NaiveBayes/train_mapper.py | tail -n 6
!echo "=========== REDUCER DOCSTRING ============"
!head -n 8 NaiveBayes/train_reducer.py | tail -n 6

Mapper reads in text documents and emits word counts by class.
INPUT:                                                    
    DocID \t true_class \t subject \t body                
OUTPUT:                                                   
    partitionKey \t word \t class0_partialCount \t class1_partialCount       
    
Reducer aggregates word counts by class and emits frequencies.
INPUT:                                                    
    partitionKey \t word \t class0_partialCount \t class1_partialCount                
OUTPUT:
    word \t class0_count \t class1_count \t class0_conditional_prob \t class1_conditional_prob 
    


In [51]:
!cat NaiveBayes/train_mapper.py

#!/usr/bin/env python
"""
Mapper reads in text documents and emits word counts by class.
INPUT:                                                    
    DocID \t true_class \t subject \t body                
OUTPUT:                                                   
    partitionKey \t word \t class0_partialCount \t class1_partialCount       
    

Instructions:
    You know what this script should do, go for it!
    (As a favor to the graders, please comment your code clearly!)
    
    A few reminders:
    1) To make sure your results match ours please be sure
       to use the same tokenizing that we have provided in
       all the other jobs:
         words = re.findall(r'[a-z]+', text-to-tokenize.lower())
         
    2) Don't forget to handle the various "totals" that you need
       for your conditional probabilities and class priors.
       
Partitioning:
    In order to send the totals to each reducer, we need to implement
    a custom partitioning strategy.
    
    We will gene

In [52]:
!cat NaiveBayes/train_reducer.py

#!/usr/bin/env python
"""
Reducer aggregates word counts by class and emits frequencies.
INPUT:                                                    
    partitionKey \t word \t class0_partialCount \t class1_partialCount                
OUTPUT:
    word \t class0_count \t class1_count \t class0_conditional_prob \t class1_conditional_prob 
    
    
Instructions:
    Again, you are free to design a solution however you see 
    fit as long as your final model meets our required format
    for the inference job we designed in Question 7. Please
    comment your code clearly and concisely.
    
    A few reminders: 
    1) Don't forget to emit Class Priors (with the right key).
    2) In python2: 3/4 = 0 and 3/float(4) = 0.75
"""
import sys
##################### YOUR CODE HERE ####################
c0_total = None
c1_total = None
c0_wordtotal = None
c1_wordtotal = None

current_word = None
ham_count = 0
spam_count = 0
ham_prob = 0.0
spam_prob = 0.0

for line in sys.stdin:
    # parse input
 

In [53]:
!cat NaiveBayes/train_reducer_smooth.py

#!/usr/bin/env python

import os
import sys                                                  
import numpy as np  

#################### YOUR CODE HERE ###################
# confirm that we have access to the model file
assert 'NBmodel.txt' in os.listdir('.'), "ERROR: can't find NBmodel.txt"

# load the model into a dictionary for easy access
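uniquewc = 0  # vocabulary size: number of distinct words in the model file
with open('NBmodel.txt', 'r') as model_file:
    for record in model_file:
        word = record.split('\t')[0]
        # the ClassPriors record is not a vocabulary word, so skip it
        if word == 'ClassPriors':
            continue
        uniquewc += 1
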
c0_total = None
c1_total = None
c0_wordtotal = None
c1_wordtotal = None
#uniquewc = len(unique_words)

current_word = None
ham_count = 0
spam_count = 0
ham_prob = 0.0
spam_prob = 0.0

for line in sys.stdin:
    # parse input
    pk, word, c0_n, c1_n = line.split('\t')
    c0_n = int(c0_n)
    c1_n = int(c1_n)
    if word == current_word:
        spam_count = spam_count + c1_n
        ham_count = ham_count + c0_n
    else:
        if word[0] == '*':
       

__`part b starts here`:__ MNB _without_ Smoothing (training on Chinese Example Corpus).

In [54]:
# part b - write a unit test for your mapper here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1n

0	*doccount	1	3
0	*wordcount	3	8
0	beijing	0	1
0	chinese	0	1
0	chinese	0	1
0	chinese	0	1
0	chinese	0	1
0	chinese	0	1
0	chinese	1	0
1	*doccount	1	3
1	*wordcount	3	8
1	japan	1	0
1	macao	0	1
2	*doccount	1	3
2	*wordcount	3	8
2	shanghai	0	1
2	tokyo	1	0
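
The mapper output above tags every word with a partition key and repeats the `*doccount` and `*wordcount` totals once per partition. The following is a minimal sketch of one way to produce keys like these, assuming words are split into equal-width alphabetical ranges by first letter. The names `partition_key` and `emit_totals` are hypothetical and are not taken from `train_mapper.py`.

```python
def partition_key(word, n_partitions=3):
    """Map a lowercase word to a partition index by its first letter (a-z)."""
    offset = ord(word[0]) - ord('a')          # 0 for 'a' ... 25 for 'z'
    return offset * n_partitions // 26        # equal-width alphabetical ranges


def emit_totals(name, class0_total, class1_total, n_partitions=3):
    """Build one copy of a '*'-prefixed totals record for every partition."""
    return ["%d\t%s\t%d\t%d" % (pk, name, class0_total, class1_total)
            for pk in range(n_partitions)]


for word in ["beijing", "chinese", "japan", "macao", "shanghai", "tokyo"]:
    print(partition_key(word), word)
for record in emit_totals("*wordcount", 3, 8):
    print(record)
```

In ASCII order `*` precedes the letters a-z, so a sort on the word field places the totals ahead of the words within each partition.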


In [55]:
# part b - write a systems test for your mapper + reducer together here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1n | sed -n 1,9p | NaiveBayes/train_reducer.py

beijing	0	1	0.0	0.125
chinese	1	5	0.3333333333333333	0.625
ClassPriors	1	3	0.25	0.75


In [56]:
# part b - write a systems test for your mapper + reducer together here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1n | sed -n 10,13p | NaiveBayes/train_reducer.py

japan	1	0	0.3333333333333333	0.0
macao	0	1	0.0	0.125
ClassPriors	1	3	0.25	0.75


In [57]:
# part b - write a systems test for your mapper + reducer together here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1n | sed -n 14,17p | NaiveBayes/train_reducer.py

shanghai	0	1	0.0	0.125
tokyo	1	0	0.3333333333333333	0.0
ClassPriors	1	3	0.25	0.75
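
The three tests above each feed one partition of sorted mapper output to the reducer. As a rough illustration of the aggregation involved (not the code in `train_reducer.py`, which is only partly shown), here is a sketch in Python 3 that assumes the `*wordcount` and `*doccount` records arrive before any word in the partition. The function name `reduce_partition` is hypothetical.

```python
from itertools import groupby


def reduce_partition(lines):
    """Aggregate one partition of sorted mapper records into model tuples."""
    records = [line.rstrip("\n").split("\t") for line in lines]
    word_totals = doc_totals = None
    model = []
    # groupby only merges adjacent records, so the input must be sorted by word
    for word, group in groupby(records, key=lambda rec: rec[1]):
        group = list(group)
        c0 = sum(int(rec[2]) for rec in group)
        c1 = sum(int(rec[3]) for rec in group)
        if word == "*wordcount":
            word_totals = (c0, c1)
        elif word == "*doccount":
            doc_totals = (c0, c1)
        else:
            # unsmoothed estimate: count of word in class / words in class
            model.append((word, c0, c1, c0 / word_totals[0], c1 / word_totals[1]))
    n_docs = sum(doc_totals)
    model.append(("ClassPriors",) + doc_totals
                 + (doc_totals[0] / n_docs, doc_totals[1] / n_docs))
    return model


partition0 = [
    "0\t*doccount\t1\t3", "0\t*wordcount\t3\t8", "0\tbeijing\t0\t1",
    "0\tchinese\t0\t1", "0\tchinese\t0\t1", "0\tchinese\t0\t1",
    "0\tchinese\t0\t1", "0\tchinese\t0\t1", "0\tchinese\t1\t0",
]
for row in reduce_partition(partition0):
    print(row)
```

Summing the totals records rather than reading a single one keeps the sketch valid when several mappers each contribute partial totals.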


In [58]:
# part b - clear (and name) an output directory in HDFS for your unsmoothed chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/chinese-output

Deleted /user/root/HW2/chinese-output


In [59]:
# part b - write your hadoop streaming job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2" \
  -D mapreduce.partition.keypartitioner.options="-k1,1nr"  \
  -D mapreduce.job.reduces=3 \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/chinese-output \
  -cmdenv PATH={PATH} \
  -numReduceTasks 3

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob1010769798146897951.jar tmpDir=null
2022-01-31 00:17:23,865 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:17:24,108 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:17:24,535 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:17:24,536 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:17:24,699 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0007
2022-01-31 00:17:25,028 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:17:25,090 INFO mapreduce.JobSubmitter: number of splits:10
2022-01-31 00:17:25,242 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0007
2022-01-31 00:17:25,244 INFO mapreduce

In [60]:
# part b - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/chinese-output/part-0000* > NaiveBayes/NBmodel.txt

In [61]:
# part b - print your model so that we can confirm that it matches expected results
!cat NaiveBayes/NBmodel.txt | sort -k1n

ClassPriors	1	3	0.25	0.75
ClassPriors	1	3	0.25	0.75
ClassPriors	1	3	0.25	0.75
beijing	0	1	0.0	0.125
chinese	1	5	0.3333333333333333	0.625
japan	1	0	0.3333333333333333	0.0
macao	0	1	0.0	0.125
shanghai	0	1	0.0	0.125
tokyo	1	0	0.3333333333333333	0.0


In [62]:
!cat NaiveBayes/chineseTrain.txt

D1	1		Chinese Beijing Chinese
D2	1		Chinese Chinese Shanghai
D3	1		Chinese Macao
D4	0		Tokyo Japan Chinese


__`part e starts here`:__ MNB _with_ Smoothing (training on Chinese Example Corpus).

In [63]:
!chmod a+x NaiveBayes/train_reducer_smooth.py

In [64]:
# part e - write a unit test for your NEW reducer here
!cat NaiveBayes/chineseTrain.txt | NaiveBayes/train_mapper.py | sort -k1n | sed -n 1,9p | NaiveBayes/train_reducer_smooth.py

beijing	0,1,0.1111111111111111,0.14285714285714285
chinese	1,5,0.2222222222222222,0.42857142857142855
ClassPriors	1,3,0.25,0.75
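
The smoothed values above are consistent with Laplace (+1) smoothing over a vocabulary of 6 words, where class 0 contributes 3 training tokens and class 1 contributes 8 (the `*wordcount` totals). As a worked check for `beijing` and `chinese`:

$$P(\text{beijing} \mid c_0) = \frac{0 + 1}{3 + 6} = \frac{1}{9} \approx 0.1111, \qquad P(\text{beijing} \mid c_1) = \frac{1 + 1}{8 + 6} = \frac{2}{14} \approx 0.1429$$

$$P(\text{chinese} \mid c_0) = \frac{1 + 1}{3 + 6} = \frac{2}{9} \approx 0.2222, \qquad P(\text{chinese} \mid c_1) = \frac{5 + 1}{8 + 6} = \frac{6}{14} \approx 0.4286$$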


In [65]:
# part e - write a systems test for your mapper + reducer together here

In [66]:
# part e - clear (and name) an output directory in HDFS for your SMOOTHED chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/chinese-output-smooth

Deleted /user/root/HW2/chinese-output-smooth


In [67]:
# part e - write your hadoop streaming job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2" \
  -D mapreduce.partition.keypartitioner.options="-k1,1nr"  \
  -D mapreduce.job.reduces=3 \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer_smooth.py,NaiveBayes/NBmodel.txt \
  -mapper train_mapper.py \
  -reducer train_reducer_smooth.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/chinese-output-smooth \
  -cmdenv PATH={PATH} \
  -numReduceTasks 3

packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob2836118689315485243.jar tmpDir=null
2022-01-31 00:19:12,492 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:19:12,734 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:19:13,213 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:19:13,213 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:19:13,391 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0008
2022-01-31 00:19:13,731 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:19:14,204 INFO mapreduce.JobSubmitter: number of splits:10
2022-01-31 00:19:14,335 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0008
2022-01-31 00:19:14,336 INFO mapreduce

In [68]:
# part e - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/chinese-output-smooth/part-0000* > NaiveBayes/NBmodel_smooth.txt

In [69]:
# part e - print your model (above local file) so that we can confirm that it matches expected results
!cat NaiveBayes/NBmodel_smooth.txt

beijing	0,1,0.1111111111111111,0.14285714285714285
chinese	1,5,0.2222222222222222,0.42857142857142855
ClassPriors	1,3,0.25,0.75
japan	1,0,0.2222222222222222,0.07142857142857142
macao	0,1,0.1111111111111111,0.14285714285714285
ClassPriors	1,3,0.25,0.75
shanghai	0,1,0.1111111111111111,0.14285714285714285
tokyo	1,0,0.2222222222222222,0.07142857142857142
ClassPriors	1,3,0.25,0.75
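
With a smoothed model like the one printed above, inference reduces to adding log probabilities. The sketch below hard-codes the model values as fractions equal to the decimals in the output and scores one document. The document text is the textbook's `d5` example and is an assumption here, since the contents of `chineseTest.txt` are not shown. The helper `log_posteriors` is a hypothetical name, not the code in `classify_mapper.py`.

```python
import math

# Smoothed model from the output above: word -> (P(w|class 0), P(w|class 1))
model = {
    "beijing": (1 / 9, 2 / 14),
    "chinese": (2 / 9, 6 / 14),
    "japan": (2 / 9, 1 / 14),
    "macao": (1 / 9, 2 / 14),
    "shanghai": (1 / 9, 2 / 14),
    "tokyo": (2 / 9, 1 / 14),
}
priors = (0.25, 0.75)


def log_posteriors(tokens):
    """Return unnormalized log P(class | doc) for class 0 and class 1."""
    scores = [math.log(p) for p in priors]
    for token in tokens:
        if token in model:              # words outside the vocabulary are skipped
            for c in (0, 1):
                scores[c] += math.log(model[token][c])
    return scores


# Assumed test document (not read from chineseTest.txt)
scores = log_posteriors("chinese chinese chinese tokyo japan".split())
print(scores, "predicted class:", scores.index(max(scores)))
```

Working in log space avoids the floating point underflow that multiplying many small probabilities would cause on longer documents.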


# Question 9: Enron Ham/Spam NB Classifier & Results.

Fantastic work. We're finally ready to perform Spam Classification on the Enron Corpus. In this question you'll run the analysis you've developed and report its performance.

### Q9 Tasks:
* __a) train/test split:__ Run the provided code to split our Enron file into a training set and a testing set, then load them into HDFS. [`NOTE:` _Make sure you recalculate the vocab size for just the training set!_]

* __b) train 2 models:__ Write Hadoop Streaming jobs to train MNB Models on the training set with and without smoothing. Save your models to local files at __`NaiveBayes/Unsmoothed/NBmodel.txt`__ and __`NaiveBayes/Smoothed/NBmodel.txt`__. [`NOTE:` _This naming is important because we wrote our classification task so that it expects a file of that name... if this inelegance frustrates you there is an alternative that would involve a few adjustments to your code [read more about it here](http://www.tnoda.com/blog/2013-11-23)._] Finally run the checks that we provide to confirm that your results are correct.


* __c) code:__ Recall that we designed our classification job with just a mapper. An efficient way to report the performance of our models would be to simply add a reducer phase to this job and compute precision and recall right there. Complete the code in __`NaiveBayes/evaluation_reducer.py`__ and then write Hadoop jobs to evaluate your two models on the test set. Report their performance side by side. [`NOTE:` if you need a refresher on precision, recall and F1-score [Wikipedia](https://en.wikipedia.org/wiki/F1_score) is a good resource.]


* __d) short response:__ Compare the performance of your two models. What do you notice about the unsmoothed model's predictions? Can you guess why this is happening? Which evaluation measure do you think is most relevant in our use case? [`NOTE:` _Feel free to answer using your common sense but if you want more information on evaluating the classification task check out_ [this blogpost](https://tryolabs.com/blog/2013/03/25/why-accuracy-alone-bad-measure-classification-tasks-and-what-we-can-do-about-it/) or [this paper](http://www.flinders.edu.au/science_engineering/fms/School-CSEM/publications/tech_reps-research_artfcts/TRRA_2007.pdf)]



### Q9 Student Answers:
> __d)__ Overall, the F1-score improved substantially, from 0.1667 for the unsmoothed model to 0.88 for the smoothed model. **Precision answers the following question: out of all the examples the classifier labeled as positive, what fraction were correct?** On the other hand, **recall answers the following question: out of all the positive examples, what fraction did the classifier identify?** The unsmoothed model had a precision of 1.0 and a recall of 0.0909, and it classified only 50% of the test documents correctly. It labeled just one message as spam, so its perfect precision hides the fact that it missed 10 of the 11 spam messages. Clearly, that model did not do well. Tweaking a classifier is a matter of balancing precision and recall. It is possible to account for both: one may choose to optimize a measure that combines precision and recall into a single value, such as the F-measure, which is the harmonic mean of precision and recall. As the results below show, the smoothed model had a precision of 0.7857, a recall of 1.0 and an accuracy of 0.85. It balances precision and recall far better, hence its much higher F1-score. For this use case of identifying mail as either spam or ham, it is important that we don't classify ham as spam, as that may have significant business impact; this is what precision measures, since every false positive is a ham message flagged as spam. Recall, the percentage of all spam in the dataset that is correctly flagged, matters as well, because a filter that lets most spam through is of little use. On the metrics listed below, the smoothed model has a recall of 1.0 versus just 0.0909 for the unsmoothed model, at the cost of 3 false positives.
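
The measures discussed above follow directly from the four confusion counts. As a small sketch, the function below (a hypothetical helper named `evaluate`, not the code in `evaluation_reducer.py`) computes them and returns `None` where a measure is undefined. The counts passed in are the ones reported in the results further down, with spam treated as the positive class.

```python
def evaluate(tp, tn, fp, fn):
    """Return accuracy, precision, recall and F1; None where undefined."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else None
    precision = tp / (tp + fp) if (tp + fp) else None
    recall = tp / (tp + fn) if (tp + fn) else None
    if precision and recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = None   # undefined without at least one true positive
    return accuracy, precision, recall, f1


print("unsmoothed:", evaluate(tp=1, tn=9, fp=0, fn=10))
print("smoothed:  ", evaluate(tp=11, tn=6, fp=3, fn=0))
```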

__Test/Train split__

In [70]:
# part a - test/train split (RUN THIS CELL AS IS)
!head -n 80 data/enronemail_1h.txt > data/enron_train.txt
!tail -n 20 data/enronemail_1h.txt > data/enron_test.txt
!hdfs dfs -copyFromLocal data/enron_train.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal data/enron_test.txt {HDFS_DIR}

copyFromLocal: `/user/root/HW2/enron_train.txt': File exists
copyFromLocal: `/user/root/HW2/enron_test.txt': File exists
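
Since the vocabulary size has to be recalculated for the training split, a quick local cross-check can be useful. This sketch assumes each record has tab-separated fields in the order id, class, subject, body (as the Chinese example file appears to), and it uses the tokenizer from the mapper instructions. `vocab_size` is a hypothetical helper name.

```python
import re


def vocab_size(lines):
    """Count distinct tokens in the subject and body of each record."""
    vocab = set()
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        text = " ".join(fields[2:])        # assumed layout: id, class, subject, body
        vocab.update(re.findall(r"[a-z]+", text.lower()))
    return len(vocab)


sample = [
    "D1\t1\t\tChinese Beijing Chinese",
    "D4\t0\t\tTokyo Japan Chinese",
]
print(vocab_size(sample))
```

The same function could be applied to an open file handle for `data/enron_train.txt`, provided the file follows the assumed layout.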


__Training__ (Enron MNB Model _without smoothing_ )

In [71]:
# part b -  Unsmoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2" \
  -D mapreduce.partition.keypartitioner.options="-k1,1nr"  \
  -D mapreduce.job.reduces=5 \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/enron-model \
  -cmdenv PATH={PATH} \
  -numReduceTasks 5

# save the model locally
!rm -rf NaiveBayes/Unsmoothed
!mkdir NaiveBayes/Unsmoothed
!hdfs dfs -cat {HDFS_DIR}/enron-model/part-000* > NaiveBayes/Unsmoothed/NBmodel.txt

Deleted /user/root/HW2/enron-model
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob7083438271505124556.jar tmpDir=null
2022-01-31 00:20:36,269 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:20:36,510 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:20:36,947 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:20:36,947 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:20:37,122 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0009
2022-01-31 00:20:37,844 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:20:37,901 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-31 00:20:38,037 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0009
2022

In [72]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000172547666293,0.000296823983378

assistance	2	4	0.0001725476662928134	0.00029682398337785694


In [73]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,8.62738331464e-05,0.00163253190858

money	1	22	8.62738331464067e-05	0.001632531908578213


__Training__ (Enron MNB Model _with Laplace +1 smoothing_ )

In [74]:
# part b -  Smoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2" \
  -D mapreduce.partition.keypartitioner.options="-k1,1nr"  \
  -D mapreduce.job.reduces=5 \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer_smooth.py,NaiveBayes/Unsmoothed/NBmodel.txt \
  -mapper train_mapper.py \
  -reducer train_reducer_smooth.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/smooth-model \
  -cmdenv PATH={PATH} \
  -numReduceTasks 5

# save the model locally
!rm -rf NaiveBayes/Smoothed
!mkdir NaiveBayes/Smoothed
!hdfs dfs -cat {HDFS_DIR}/smooth-model/part-000* > NaiveBayes/Smoothed/NBmodel.txt

Deleted /user/root/HW2/smooth-model
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob3202876487256201386.jar tmpDir=null
2022-01-31 00:21:49,054 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:21:49,302 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:21:49,740 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:21:49,740 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:21:49,908 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0010
2022-01-31 00:21:50,284 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:21:50,354 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-31 00:21:50,512 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0010
202

In [75]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000185804533631,0.000277300205202

assistance	2,4,0.0001858045336306206,0.00027730020520215184


In [76]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,0.000123869689087,0.00127558094393

money	1,22,0.0001238696890870804,0.0012755809439298986


__Evaluation__

In [77]:
# part c - write your code in NaiveBayes/evaluation_reducer.py then RUN THIS
!chmod a+x NaiveBayes/evaluation_reducer.py

In [78]:
!cat NaiveBayes/evaluation_reducer.py

#!/usr/bin/env python
"""
Reducer to calculate precision and recall as part
of the inference phase of Naive Bayes.
INPUT:
    ID \t true_class \t P(ham|doc) \t P(spam|doc) \t predicted_class
OUTPUT:
    precision \t ##
    recall \t ##
    accuracy \t ##
    F-score \t ##
         
Instructions:
    Complete the missing code to compute these^ four
    evaluation measures for our classification task.
    
    Note: if you have no True Positives you will not 
    be able to compute the F1 score (and maybe not 
    precision/recall). Your code should handle this 
    case appropriately. Feel free to interpret the 
    "output format" above as a rough suggestion. It
    may be helpful to also print the counts for true
    positives, false positives, etc.
"""
import sys

# initialize counters
FP = 0.0 # false positives
FN = 0.0 # false negatives
TP = 0.0 # true positives
TN = 0.0 # true negatives
NDOCS = 0

# read from STDIN
for line in sys.stdin:
    # parse input
    docID, class_, pHam, p

In [79]:
# part c - unit test your evaluation job on the chinese model (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py 
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | NaiveBayes/evaluation_reducer.py

d5	1	-8.90668134500626	-8.10769031284611	1
d6	1	-5.780743515794329	-4.179502370564408	1
d7	0	-6.591673732011658	-7.511706880737812	0
d8	0	-4.394449154674438	-5.565796731681498	0
d5	1	-8.90668134500626	-8.10769031284611	 True
d6	1	-5.780743515794329	-4.179502370564408	 True
d7	0	-6.591673732011658	-7.511706880737812	 True
d8	0	-4.394449154674438	-5.565796731681498	 True
# Documenents:	4
True Positives	2.0
True Negatives:	2.0
False Positives	0.0
False Negatives	0.0
Accuracy	1.0000
Precision	1.0000
Recall	1.0000
F_score	1.0000
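
The counts printed above can be tallied from the classifier records in a single pass. Below is a minimal sketch, assuming the record layout from the reducer docstring (true class in the second field, predicted class in the last) and class `1` as the positive class. `confusion_counts` is a hypothetical name and the reducer's actual code is not shown in full.

```python
def confusion_counts(lines):
    """Tally TP, TN, FP, FN from tab-separated classifier records."""
    counts = {"TP": 0, "TN": 0, "FP": 0, "FN": 0}
    for line in lines:
        fields = line.split("\t")
        truth, predicted = fields[1].strip(), fields[-1].strip()
        if predicted == "1":
            counts["TP" if truth == "1" else "FP"] += 1
        else:
            counts["TN" if truth == "0" else "FN"] += 1
    return counts


records = [
    "d5\t1\t-8.90668134500626\t-8.10769031284611\t1",
    "d6\t1\t-5.780743515794329\t-4.179502370564408\t1",
    "d7\t0\t-6.591673732011658\t-7.511706880737812\t0",
    "d8\t0\t-4.394449154674438\t-5.565796731681498\t0",
]
print(confusion_counts(records))
```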


In [80]:
# part c - Evaluate the UNSMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/evaluate-unsmoothed-model

# hadoop job
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/evaluation_reducer.py,NaiveBayes/Unsmoothed/NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/evaluate-unsmoothed-model \
  -cmdenv PATH={PATH} \
  -numReduceTasks 1

# retrieve results locally
!rm -rf NaiveBayes/Unsmoothed/results.txt
!hdfs dfs -cat {HDFS_DIR}/evaluate-unsmoothed-model/part-000* > NaiveBayes/Unsmoothed/results.txt

Deleted /user/root/HW2/evaluate-unsmoothed-model
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob924028847229706262.jar tmpDir=null
2022-01-31 00:23:20,946 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:23:21,200 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:23:21,668 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:23:21,668 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:23:21,842 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0011
2022-01-31 00:23:22,173 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:23:22,231 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-31 00:23:22,372 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265

In [81]:
# part c - Evaluate the SMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/evaluate-smoothed-model

# hadoop job
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/evaluation_reducer.py,NaiveBayes/Smoothed/NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/evaluate-smoothed-model \
  -cmdenv PATH={PATH} \
  -numReduceTasks 1

# retrieve results locally
!rm -rf NaiveBayes/Smoothed/results.txt
!hdfs dfs -cat {HDFS_DIR}/evaluate-smoothed-model/part-000* > NaiveBayes/Smoothed/results.txt

Deleted /user/root/HW2/evaluate-smoothed-model
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob2737056886230107523.jar tmpDir=null
2022-01-31 00:24:09,211 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:24:09,458 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:24:09,883 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:24:09,883 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:24:10,068 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0012
2022-01-31 00:24:10,434 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:24:10,492 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-31 00:24:10,637 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_16435562652

In [82]:
# part c - display results 
# NOTE: feel free to modify the tail commands to match the format of your results file
print('=========== UNSMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Unsmoothed/results.txt
print('=========== SMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Smoothed/results.txt

# Documenents:	20
True Positives	1.0
True Negatives:	9.0
False Positives	0.0
False Negatives	10.0
Accuracy	0.5000
Precision	1.0000
Recall	0.0909
F_score	0.1667
# Documenents:	20
True Positives	11.0
True Negatives:	6.0
False Positives	3.0
False Negatives	0.0
Accuracy	0.8500
Precision	0.7857
Recall	1.0000
F_score	0.8800


__`EXPECTED RESULTS:`__ 
<table>
<tr>
<th>Unsmoothed Model</th>
<th>Smoothed Model</th>
</tr>
<tr>
<td><pre>
# Documents:	20
True Positives:	1
True Negatives:	9
False Positives:	0
False Negatives:	10
Accuracy	0.5
Precision	1.0
Recall	0.0909
F-Score	0.1666
</pre></td>
<td><pre>
# Documents:	20
True Positives:	11
True Negatives:	6
False Positives:	3
False Negatives:	0
Accuracy	0.85
Precision	0.7857
Recall	1.0
F-Score	0.88
</pre></td>
</tr>
</table>

__`NOTE:`__ _Don't be too disappointed if these seem low to you. We've trained and tested on a very very small corpus... bigger datasets coming soon!_

# Question 10: Custom Partitioning and Secondary Sort

Now that we have our model, we can analyse the results and think about future improvements.

### Q10 Tasks:

* __a) code + short response:__ Let's look at the top ten words with the highest conditional probability in `Spam` and in `Ham`. We'll do this by writing a Hadoop job that sorts the model file (`NaiveBayes/Smoothed/NBmodel.txt`). Normally we'd have to run two jobs -- one that sorts on $P(word|ham)$ and another that sorts on $P(word|spam)$. However, if we slightly modify the data format in the model file then we can get the top words in each class with just one job. We've written a mapper that will do just this for you. Read through __`NaiveBayes/model_sort_mapper.py`__ and then briefly explain how this mapper will allow us to partition and sort our model file. Write a Hadoop job that uses our mapper and `/bin/cat` for a reducer to partition and sort. Print out the top 10 words in each class (where 'top' == highest conditional probability). [`HINT:` _this should remind you a lot of what we did in Question 6._]


* __b) short response:__ What do you notice about the 'top words' we printed in `a`? How would increasing the smoothing parameter 'k' affect the probabilities for the top words that you identified in `a`? How would it affect the probabilities of words that occur much more in one class than another? In summary, how does the smoothing parameter 'k' affect the bias and the variance of our model? [`NOTE:` _you do not need to code anything for this task, but if you are struggling with it you could try changing 'k' and see what happens to the test set. We don't recommend doing this exploration with the Enron data because it will be harder to see the impact with such a big vocabulary_]

### Q10 Student Answers:
> __a)__ We partitioned on the class and sorted on the conditional probability of each word given the class. The mapper emits the class in the $3^{rd}$ field and the conditional probability in the $4^{th}$ field, so we configured Hadoop's key comparator and key partitioner to partition on field 3 and sort on field 4 in reverse numeric order. (**NOTE**: while the output of this Hadoop Streaming job matches the **expected results** provided, it is not sorted from highest to lowest. This is because Hadoop doesn't sort numbers in scientific notation correctly. We could work around this by filtering the output with a threshold value, but we chose not to because our output matches the expected output.)

> __b)__ Laplace smoothing moves the estimated probabilities towards the uninformed (uniform) mean. Suppose we have a multinomial variable with sample counts $c_1, c_2, \ldots, c_d$, where $d$ is the vocabulary size and $N = \sum_i c_i$ is the total count. A Laplace-smoothed estimate of the probabilities has the form $(c_i + k)/(N + d \cdot k)$, where $k \geq 0$ is the smoothing parameter. Typically $k$ is set to 1; if $k$ is $0$ we have the unsmoothed estimator. When larger values of $k$ are used, the observed counts have less influence on the estimate, and every probability is pulled towards the uniform value $1/d$: higher probabilities decrease towards the mean and lower probabilities increase towards it. Words that occur much more in one class than another therefore lose some of their discriminating power. A direct consequence is that the variance of the model will tend to be lower, possibly at the cost of a larger bias.
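
To make the effect of `k` concrete, the sketch below evaluates the additive smoothing formula for a frequent word and an unseen word in class 1 of the Chinese example (8 tokens, 6-word vocabulary). The function name `smoothed_prob` is hypothetical and the values of `k` are chosen only for illustration.

```python
def smoothed_prob(count, class_total, vocab_size, k):
    """Additive estimate: (count + k) / (class_total + k * vocab_size)."""
    return (count + k) / (class_total + k * vocab_size)


# 'chinese' (count 5) vs 'tokyo' (count 0) in class 1: 8 tokens, 6-word vocabulary
for k in (0, 1, 10, 100):
    frequent = smoothed_prob(5, 8, 6, k)
    unseen = smoothed_prob(0, 8, 6, k)
    print(k, round(frequent, 4), round(unseen, 4))
```

As `k` grows, both estimates approach 1/6, the uniform probability over the vocabulary, so the gap between frequent and unseen words shrinks.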

In [83]:
!hdfs dfs -rm -r {HDFS_DIR}/NBmodel.txt
!hdfs dfs -copyFromLocal NaiveBayes/Smoothed/NBmodel.txt {HDFS_DIR}

Deleted /user/root/HW2/NBmodel.txt


In [84]:
!hadoop fs -ls {HDFS_DIR}

Found 16 items
-rw-r--r--   1 root hadoop     254300 2022-01-31 00:25 /user/root/HW2/NBmodel.txt
drwxr-xr-x   - root hadoop          0 2022-01-31 00:18 /user/root/HW2/chinese-output
drwxr-xr-x   - root hadoop          0 2022-01-31 00:19 /user/root/HW2/chinese-output-smooth
-rw-r--r--   1 root hadoop        119 2022-01-23 00:41 /user/root/HW2/chineseTest.txt
-rw-r--r--   1 root hadoop        107 2022-01-23 00:41 /user/root/HW2/chineseTrain.txt
drwxr-xr-x   - root hadoop          0 2022-01-30 00:39 /user/root/HW2/custom-partition
drwxr-xr-x   - root hadoop          0 2022-01-30 23:45 /user/root/HW2/eda-output
drwxr-xr-x   - root hadoop          0 2022-01-30 23:35 /user/root/HW2/eda-sort-output
drwxr-xr-x   - root hadoop          0 2022-01-31 00:21 /user/root/HW2/enron-model
-rw-r--r--   1 root hadoop     204559 2022-01-21 02:30 /user/root/HW2/enron.txt
-rw-r--r--   1 root hadoop      41493 2022-01-28 02:47 /user/root/HW2/enron_test.txt
-rw-r--r--   1 root hadoop     163066 2022-01-28 02:

In [85]:
# part a - write your Hadoop job here (sort smoothed model on P(word|class))

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/custom-partition

# hadoop job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=4 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k4,4nr -k1,1" \
  -D mapreduce.partition.keypartitioner.options="-k3,3" \
  -files NaiveBayes/model_sort_mapper.py \
  -mapper model_sort_mapper.py \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/NBmodel.txt \
  -output {HDFS_DIR}/custom-partition \
  -cmdenv PATH={PATH} \
  -numReduceTasks 2


Deleted /user/root/HW2/custom-partition
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.2.2.jar] /tmp/streamjob8495792187527972986.jar tmpDir=null
2022-01-31 00:25:31,725 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:25:31,978 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:25:32,447 INFO client.RMProxy: Connecting to ResourceManager at w261-m/10.128.0.2:8032
2022-01-31 00:25:32,447 INFO client.AHSProxy: Connecting to Application History server at w261-m/10.128.0.2:10200
2022-01-31 00:25:32,619 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1643556265243_0013
2022-01-31 00:25:32,918 INFO mapred.FileInputFormat: Total input files to process : 1
2022-01-31 00:25:33,378 INFO mapreduce.JobSubmitter: number of splits:9
2022-01-31 00:25:33,518 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1643556265243_0013

In [86]:
# part a - print top words in each class
!hdfs dfs -cat {HDFS_DIR}/custom-partition/part-00000 | head

abn	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
absenteeism	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
absolute	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
absolutely	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
absorb	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
abuse	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
abused	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
acce	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
accelerate	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
accelerated	0,1,6.19348445435402e-05,0.00011092008208086075	ham	6.19348445435402e-05	
cat: Unable to write to output stream.


In [87]:
!hdfs dfs -cat {HDFS_DIR}/custom-partition/part-00001 | head

ab	4,0,0.00030967422271770096,5.546004104043037e-05	spam	5.546004104043037e-05	
absent	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
accepts	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
accomodate	4,0,0.00030967422271770096,5.546004104043037e-05	spam	5.546004104043037e-05	
accomodates	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
accompanied	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
accounting	4,0,0.00030967422271770096,5.546004104043037e-05	spam	5.546004104043037e-05	
accurate	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
achieve	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
achieved	1,0,0.0001238696890870804,5.546004104043037e-05	spam	5.546004104043037e-05	
cat: Unable to write to output stream.
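
Because the probabilities are written in scientific notation, one way to cross-check the top words is to sort the job output locally after parsing the last field as a float. The sketch below assumes the record layout seen in the output above (word, comma-separated model values, class, probability, with a trailing tab). The `money` and `assistance` sample records are assembled from the smoothed model values shown earlier rather than copied from the job output, and `top_words` is a hypothetical helper.

```python
def top_words(lines, n=10):
    """Group records by class and rank words by P(word|class), highest first."""
    by_class = {}
    for line in lines:
        word, _, label, prob = line.rstrip("\t\n").split("\t")
        by_class.setdefault(label, []).append((float(prob), word))
    return {label: [w for _, w in sorted(pairs, reverse=True)[:n]]
            for label, pairs in by_class.items()}


records = [
    "ab\t4,0,0.00030967422271770096,5.546004104043037e-05\tspam\t5.546004104043037e-05\t",
    "money\t1,22,0.0001238696890870804,0.0012755809439298986\tspam\t0.0012755809439298986\t",
    "assistance\t2,4,0.0001858045336306206,0.00027730020520215184\tspam\t0.00027730020520215184\t",
]
print(top_words(records))
```

Parsing with `float` respects the exponent, so `5.546e-05` ranks below `0.0012755` even though its leading digits are larger.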


### Congratulations, you have completed HW2! Please refer to the readme for submission instructions.

If you would like to provide feedback regarding this homework, please use the survey at: https://docs.google.com/forms/d/e/1FAIpQLSce9feiQeSkdP43A0ZYui1tMGIBfLfzb0rmgToQeZD9bXXX8Q/viewform