#DATASCI W261: Machine Learning at Scale 

#Vineet Gangwar
**vineet.gangwar@gmail.com**  
**W261-2: Machine Learning at Scale**  
**Assignment #2**   
**Date: Sep - 15 - 2015**

#HW2.0  
- What is a race condition in the context of parallel computation? Give an example.
- What is MapReduce?
- How does it differ from Hadoop?
- Which programming paradigm is Hadoop based on? Explain and give a simple example in code and show the code running.

**Race Condition**  
A race condition in parallel computation occurs when the outcome depends on the relative timing of multiple threads acting on the same shared data, so that the system can end up in an unintended state. For example, a thread checks the current value of a variable and, if the value is positive, calculates its log and stores the log back into the variable. If a second thread comes along and changes the value to 0 between the check and act phases of the first thread, the first thread then overwrites the 0 with the log of the previous value. This is incorrect behaviour because the most recent value of the variable is lost.  
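
The check-then-act problem described above can be sketched in plain Python. This is only an illustration: the names `shared`, `log_if_positive` and `reset_to_zero` are hypothetical, and the two `Event` objects exist solely to force the unlucky interleaving so that the demo behaves the same way on every run.

```python
import math
import threading

shared = {"value": 100.0}
checked = threading.Event()  # set once the first thread has passed its check
changed = threading.Event()  # set once the second thread has written 0

def log_if_positive():
    seen = shared["value"]                 # check phase: read and test the value
    if seen > 0:
        checked.set()
        changed.wait()                     # demo only: let the other thread run now
        shared["value"] = math.log(seen)   # act phase: uses the stale read

def reset_to_zero():
    checked.wait()                         # run between the check and the act
    shared["value"] = 0.0
    changed.set()

threads = [threading.Thread(target=log_if_positive),
           threading.Thread(target=reset_to_zero)]
for t in threads:
    t.start()
for t in threads:
    t.join()
racy_result = shared["value"]              # the 0 written by the second thread is lost

lock = threading.Lock()

def safe_log_if_positive(state):
    # Holding the lock across check and act makes the pair atomic
    # with respect to any other thread that takes the same lock.
    with lock:
        if state["value"] > 0:
            state["value"] = math.log(state["value"])
```

A common remedy, shown in `safe_log_if_positive`, is to hold a lock for the whole check-and-act sequence; every writer of the shared value would need to take the same lock for this to help.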

**MapReduce**  
MapReduce is a programming model that follows a divide-and-conquer strategy to process embarrassingly parallel problems in parallel on a cluster of machines.  

**How does it differ from Hadoop**  
Hadoop is a software platform that implements the MapReduce programming paradigm. It also provides a distributed filesystem (HDFS) that supports the MapReduce implementation. So MapReduce is a model, while Hadoop is an implementation of that model.  

**Which programming paradigm is Hadoop based on?**  
Hadoop is based on the MapReduce programming model  

**Explain and give a simple example in code and show the code running.**  
To solve a problem in Hadoop, the user provides a map function and a reduce function. Hadoop splits the input data and passes each split to a map task. It then enforces a barrier: once all the map tasks are complete, the framework distributes the keys to the reduce tasks, by default based on a hash function. The framework also sorts the keys before they are presented to each reduce task. The reduce tasks work on the grouped data, and their output constitutes the output of the entire MapReduce job.  
**HW2.1** is an example of running MapReduce code.
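
To make the map, partition, sort and reduce phases concrete, here is a minimal in-memory sketch of a word count. It does not use Hadoop at all; `map_fn`, `reduce_fn` and `run_mapreduce` are hypothetical names, and the partitioning with Python's built-in `hash` is only a stand-in for the framework's partitioner.

```python
from itertools import groupby
from operator import itemgetter

def map_fn(line):
    # Map: emit a (word, 1) pair for every word in the line.
    for word in line.split():
        yield word, 1

def reduce_fn(key, values):
    # Reduce: sum all the counts that share a key.
    return key, sum(values)

def run_mapreduce(lines, num_reducers=2):
    # Map phase: every input line is processed independently.
    mapped = [pair for line in lines for pair in map_fn(line)]

    # Shuffle: a hash of the key decides which reducer receives the pair.
    partitions = [[] for _ in range(num_reducers)]
    for key, value in mapped:
        partitions[hash(key) % num_reducers].append((key, value))

    # Each reducer sees its keys in sorted order, grouped by key.
    results = {}
    for part in partitions:
        part.sort(key=itemgetter(0))
        for key, group in groupby(part, key=itemgetter(0)):
            word, total = reduce_fn(key, (value for _, value in group))
            results[word] = total
    return results

counts = run_mapreduce(["a b a", "b c"])
```

Because all pairs with the same key go to the same partition, each key is reduced exactly once regardless of how many reducers are used.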

#HW2.1: Sort in Hadoop MapReduce
Given as input: Records of the form < integer, “NA” >, where integer is any integer, and “NA” is just the empty string.
Output: sorted key value pairs of the form < integer, “NA” >; what happens if you have multiple reducers? Do you need additional steps? Explain.

The MapReduce code below relies on the sorting feature of the Hadoop framework to achieve sorting. Hadoop sorts keys before it sends the output from the mappers to the reducers. Because I am using just one reducer, Hadoop sorts all 10K keys together. The (identity) reducer just prints out the input it gets after minor formatting.  
By default, Hadoop does a text sort on the keys. I used the following to enable numeric sorting of the keys:  
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator  -D  mapred.text.key.comparator.options=-n  
I used the option -D mapred.reduce.tasks=1 to ensure only one reducer task. If multiple reducers are used, the output will consist of one file per reducer, with the keys distributed amongst them by the partitioner. The keys within each reducer's output file will be sorted, but no sorting will be achieved across the files.  
To obtain a globally sorted output in that case, we will need another MapReduce job with just one reducer.
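
The two points above (text versus numeric ordering, and sorted-within-file but not across files) can be illustrated with a small plain-Python sketch. The keys are made up, the modulo partitioner is only a stand-in for Hadoop's hash partitioning, and `heapq.merge` plays the role of the extra single-reducer pass.

```python
import heapq

# Text sort versus numeric sort of the same keys.
keys_as_text = ["10", "9", "100", "1"]
text_order = sorted(keys_as_text)               # lexicographic comparison
numeric_order = sorted(keys_as_text, key=int)   # what the -n option asks for

# Three simulated reducers, each sorting only the keys it receives.
keys = [7, 2, 9, 4, 0, 5, 8, 1, 6, 3]
num_reducers = 3
parts = [sorted(k for k in keys if k % num_reducers == r)
         for r in range(num_reducers)]

# Concatenating the per-reducer outputs is not globally sorted ...
concatenated = [k for part in parts for k in part]

# ... so an additional merging step is needed to get one sorted sequence.
merged = list(heapq.merge(*parts))
```

Here `text_order` places "100" before "9", which is why the numeric comparator option matters for integer keys.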

Generating 10000 random records

In [1]:
import random

n = 10000
filename = 'random_records'

# Generating 10000 numbers
random_list = range(n)
# Shuffling the numbers
random.shuffle(random_list)

# Writing out the file in the format <Integer, NA>
filehndl = open(filename, 'w')
for num in random_list:
    filehndl.write('{0}, NA\n'.format(num))
filehndl.close()

**Mapper**  
The mapper reads each line and converts the input format from < Integer, NA > to < Integer\tNA >

In [2]:
%%writefile mapper.py
#!/home/vineetgangwar/anaconda/bin/python
import sys

for line in sys.stdin:
    fields = line.split(',')
    fields = [field.strip() for field in fields]
    print '{0}\t{1}'.format(fields[0], fields[1])

Overwriting mapper.py


**Reducer**  
Writing reducer to filesystem

In [3]:
%%writefile reducer.py
#!/home/vineetgangwar/anaconda/bin/python
import sys

for line in sys.stdin:
    fields = line.split('\t')
    fields = [field.strip() for field in fields]
    print '{0}, {1}'.format(fields[0], fields[1])


Overwriting reducer.py


Changing permissions

In [4]:
!chmod 755 mapper.py
!chmod 755 reducer.py

Moving input file into HDFS

In [5]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder in HDFS
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job into HDFS
!/usr/local/hadoop/bin/hdfs dfs -put random_records /input/

# Running MapReducer job
# With numeric sort of Keys
# Number of reducers = 1
!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator  -D  mapred.text.key.comparator.options=-n  -D mapred.reduce.tasks=1 -mapper mapper.py -reducer reducer.py -input /input/* -output /output/

#Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing first 100 lines of MapReduce output
for line in job_output[1:101]:
    print line

15/09/15 11:46:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:18 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:46:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:19 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:46:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...

#HW2.2  
Using the Enron data from HW1 and Hadoop MapReduce streaming, write mapper/reducer pair that  will determine the number of occurrences of a single, user-specified word. Examine the word “assistance” and report your results.

   To do so, make sure that
   
   - mapper.py counts all occurrences of a single word, and
   - reducer.py collates the counts of the single word.

**Mapper**  
As informed by **Jake** on Google Groups, I am **only using email content for calculations**. As a result, the word **assistance** returns a **count of 9 instead of 10**.  
Error in input data:  
Two emails have only two tab separators. For such emails, the mapper assumes that the subject is missing.

**This job can display counts of a single word, a list of words or all words**  
The input words are sent to the MapReduce task using -cmdenv option in the environment variable vocab_input

Code Description:  
The mapper splits each email and creates separate variables for email_id, truth, subject and message. It then counts the number of times each word occurs in the message and then outputs word, word_count, email_id and truth separated by tab
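
As a quick illustration of the counting step, the sketch below applies the same tokenization to one made-up message. `count_vocab_words` is a hypothetical helper, the sample text is invented, and the `sorted` call is added only so that the '*' case returns words in a predictable order.

```python
import re

def count_vocab_words(message, vocab_input):
    # Lowercase, turn every non-letter into a space, then split into words.
    words = re.sub('[^a-z]', ' ', message.lower()).split()
    # The vocabulary arrives as a comma-separated string; '*' means all words.
    vocab = vocab_input.split(',')
    if vocab[0] == '*':
        vocab = sorted(set(words))
    return [(word, words.count(word)) for word in vocab]

single = count_vocab_words("Need assistance? Assistance is here.", "assistance")
everything = count_vocab_words("Need assistance? Assistance is here.", "*")
```

With this tokenization, punctuation and capitalisation do not affect the counts, so both occurrences of the word in the sample message are found.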

In [6]:
%%writefile mapper.py
#!/home/vineetgangwar/anaconda/bin/python
import sys
import re
import os

for line in sys.stdin:
    # Splitting each line/email based on tab
    fields = line.strip().split('\t')
    email_id = fields[0]
    truth = fields[1]
    if len(fields) == 4:
        subject = fields[2]
        message = fields[3]
    else:
        subject = ""
        message = fields[2]
    
    # Finding set of unique words and list of all words
    message = message.lower()
    message = re.sub('[^a-z]', ' ', message)    # Converting non-alpha to space
    words = message.split()
    unique_words = set(words)
    
    ## Capturing environment variable vocab_input to obtain user input data
    env_vars = os.environ
    vocab = env_vars['vocab_input'].split(',')
    # Handling for hw1.5 i.e. if '*' then vocab is all words
    if vocab[0] == '*':
        vocab = unique_words

    # Loop through all unique words and calculating count
    # And printing word, word_count, email_id, truth
    for word in vocab:
        word_count = words.count(word)
        print '{0}\t{1}\t{2}\t{3}'.format(word, word_count, email_id, truth)

Overwriting mapper.py


**Reducer**  
The reducer first accumulates all the mapper output data in a list object. It then loops through the list and does two things:
- Uses the words in the list to create dictionary keys
- Uses the word_counts in the list to increment the dictionary values where the word == dictionary key

It then prints out the dictionary which gives words and their corresponding counts
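
An alternative to accumulating everything in a list, sketched here under the assumption that the reducer receives its lines already sorted by the first tab-separated field (the word), is to sum the counts one word at a time. `sum_counts` is a hypothetical name and the sample lines are invented.

```python
from itertools import groupby

def sum_counts(lines):
    """Yield (word, total) for tab-separated lines sorted by word."""
    records = (line.rstrip('\n').split('\t') for line in lines)
    # Consecutive records with the same word form one group.
    for word, group in groupby(records, key=lambda fields: fields[0]):
        yield word, sum(int(fields[1]) for fields in group)

sample = [
    "assistance\t1\tid_a\t1\n",
    "assistance\t3\tid_b\t1\n",
    "valium\t0\tid_c\t0\n",
]
totals = list(sum_counts(sample))
```

This keeps only one group in memory at a time, which matters when the mapper output is large; it gives wrong totals if the input is not sorted by word.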

In [7]:
%%writefile reducer.py
#!/home/vineetgangwar/anaconda/bin/python
import sys

# The vocab variable can take the following values:
# A single word - such as assistance
# A comma-separated list of words - such as "assistance,valium,him,her"
# "*" - meaning all words

# This list will store the output of all mappers as a list of lists
# e.g.
# [
# ['assistance', '1', '0018.2003-12-18.GP', '1'],
# ['assistance', '3', '0018.2001-07-13.SA_and_HP', '1'],
# ['enlargementwithatypo', '0', '0001.1999-12-10.farmer', '0']
# ]
key_value_list = list()

for line in sys.stdin:
    fields = line.strip().split('\t')
    key_value_list.append(fields)

# This dict will be used to store counts of terms
word_count_dict = dict()

# Looping through the list of mapper records and summing the counts per word
for item in key_value_list:
    word, count = item[0], int(item[1])
    if word in word_count_dict:             # If word exists update count
        word_count_dict[word] += count
    else:                                   # If new word then create key and store count
        word_count_dict[word] = count


for key, value in word_count_dict.iteritems():
    print  key + '\t' + str(value)
    

Overwriting reducer.py


Cleaning HDFS and moving input file into HDFS

In [8]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job
!/usr/local/hadoop/bin/hdfs dfs -put enronemail_1h.txt /input/

# Executing the MapReduce job
!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -reducer reducer.py -cmdenv vocab_input=assistance -input /input/* -output /output/

# Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing output
for line in job_output[1:101]:
    print line

15/09/15 11:46:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:46 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:46:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:47 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:46:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:46:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...

#HW2.3  
Using the Enron data from HW1 and Hadoop MapReduce, write  a mapper/reducer pair that will classify the email messages by a single, user-specified word. Examine the word “assistance” and report your results. To do so, make sure that
   
   - mapper.py
   - reducer.py 

performs a single word multinomial Naive Bayes classification.

**Result**  
The algorithm obtained an accuracy of 0.56

**Mapper**  
Using mapper for **HW2.2** above

**Reducer**  
Step 1:  
The reducer first accumulates all the mapper output data in a list object. It then uses the list object to create a Pandas DataFrame that contains the term frequency per document:
- Index/Row names are the email_ids
- Column headers are words/features. If the user inputs a word or a list of words from the command line then the headers are those words. This includes words such as enlargementWITHATypo. If the user specified * then all unique words from all emails become the column headers
- The DataFrame also contains another column called 'TRUTH' that contains the true class of each email

Step 2:  
Next, the reducer calculates probabilities from the above DataFrame and stores them in new objects. It uses Pandas DataFrame methods such as groupby and sum for the calculations:
- It calculates and stores the prior probabilities in a dictionary
- It calculates and stores P(word|class) in a Pandas DataFrame. The index of the DataFrame are words and the columns are 'SPAM' and 'HAM'. Laplace smoothing is applied in this step
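
For reference, the textbook add-one (Laplace) estimate is P(word|class) = (count(word, class) + 1) / (total count in class + |vocabulary|). The sketch below computes it for a single class with made-up counts; `smoothed_word_probs` is a hypothetical helper and is not the reducer's own code, which divides the raw counts and substitutes 1 / (total + |vocabulary|) only where the probability would be zero.

```python
def smoothed_word_probs(word_counts, vocab):
    """Add-one smoothed P(word|class) for one class.

    word_counts maps word -> number of occurrences in that class;
    vocab is the list of all words considered as features.
    """
    total = sum(word_counts.get(word, 0) for word in vocab)
    denominator = float(total + len(vocab))
    return dict((word, (word_counts.get(word, 0) + 1) / denominator)
                for word in vocab)

spam_counts = {"assistance": 3, "valium": 1}          # invented counts
probs = smoothed_word_probs(spam_counts, ["assistance", "valium", "him"])
```

With add-one applied to every word the probabilities sum to 1 over the vocabulary, and a word never seen in the class ("him" here) still gets a small non-zero probability.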

Step 3:  
The reducer then calculates P(email|class) and stores these in yet another Pandas DataFrame. This DataFrame has email_ids as the index and has the following column headings - SPAM, HAM, PREDICT, TRUTH. 'SPAM' stores the log probability of the email given SPAM. 'HAM' stores the log probability of the email given HAM. 'PREDICT' stores the predicted class of the email based on the calculated log probabilities. 'TRUTH' contains the true class of the email.  
The log probabilities of an email given each class are calculated as follows:
- For each email, the reducer refers to the first DataFrame, i.e. the one that contains the term frequency per document. It gets a dictionary of words and counts where the counts are greater than zero.
- It then uses the prior probabilities dictionary and the DataFrame containing P(word|class) to calculate P(email|class)
- It then stores the log probabilities in the DataFrame created in step 3
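
The scoring described in the steps above can be sketched without Pandas as log(prior) plus the sum over words of count * log P(word|class). `log_score` and `predict` are hypothetical names and all probabilities below are invented; ties go to ham, mirroring a strict spam > ham comparison.

```python
import math

def log_score(word_counts, prior, word_probs):
    # Start from the log prior and add count * log P(word|class) per word.
    score = math.log(prior)
    for word, count in word_counts.items():
        score += count * math.log(word_probs[word])
    return score

def predict(word_counts, priors, word_probs_by_class):
    spam = log_score(word_counts, priors["spam"], word_probs_by_class["spam"])
    ham = log_score(word_counts, priors["ham"], word_probs_by_class["ham"])
    return "spam" if spam > ham else "ham"

priors = {"spam": 0.5, "ham": 0.5}
word_probs_by_class = {
    "spam": {"assistance": 0.8, "him": 0.2},
    "ham": {"assistance": 0.2, "him": 0.8},
}
label = predict({"assistance": 2}, priors, word_probs_by_class)
```

Working in log space avoids numerical underflow when many small probabilities are multiplied together.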

Step 4:  
The reducer outputs the results in the format - Email_id \t TRUTH \t Predicted_Class. It also calculates and prints the accuracy.

Note:  
The reducer has an additional functionality wherein it drops words/tokens/features whose term frequency is less than 3. This feature can be turned on/off from the command line: -cmdenv drop_cols=no disables it, while -cmdenv drop_cols=yes enables it. In HW2.5 this feature is set to yes

In [9]:
%%writefile reducer.py
#!/home/vineetgangwar/anaconda/bin/python
import pandas as pd
import numpy as np
import math
import sys
import os

# The vocab variable can take the following values:
# A single word - such as assistance
# A comma-separated list of words - such as "assistance,valium,him,her"
# "*" - meaning all words

# This function creates a dataframe with email_ids as the index
# and all words as the column headings.
# Each cell contains the count of occurrences of each word in each email
# This function returns a tuple of vocab and the DataFrame

def create_dataframe(key_value_list):
    ## Creating dataframe of email id and truth pairs
    # Creating list of email_ids and truths
    email_ids = list()
    truths = list()
    for item in key_value_list:
        email_id = item[2]
        truth = int(item[3])
        if email_id not in email_ids:
            email_ids.append(email_id)
            truths.append(truth)
    # Creating dictionary
    id_truth_dict = dict()
    id_truth_dict['email_id'] = email_ids
    id_truth_dict['TRUTH'] = truths
    # Converting into dataframe
    id_truth = pd.DataFrame(id_truth_dict)

    ## Creating data frame to store word counts and email in matrix
    # Creating words and ids list to create an empty data frame email_id X word list
    set_of_words = set()
    set_of_ids = set()
    for item in key_value_list:
        set_of_words.add(item[0])
        set_of_ids.add(item[2])

    set_of_words = list(set_of_words)
    set_of_ids = list(set_of_ids)
    num_of_ids = len(set_of_ids)

    # Creating dict of zeros to convert into a dataframe
    zeros_dict = dict()
    for i in range(len(set_of_words)):
        zeros_dict[set_of_words[i]] = [0 for x in range(num_of_ids)]
    # Adding ids
    zeros_dict['email_id'] = set_of_ids

    # Converting into dataframe
    id_wordlist = pd.DataFrame(zeros_dict)

    # Merging dataframe to add truth also
    df = pd.merge(id_wordlist, id_truth, on='email_id', how='inner')
    df.set_index('email_id', inplace=True)

    # Updating counts
    for item in key_value_list:
        email_id = item[2]
        word_count = item[1]
        word = item[0]
        df.loc[email_id, word] = int(word_count)

    return set_of_words, df

# This function calculates the following probabilities:
# Priors in a Dict() called priors
# A DataFrame containing probabilities of all words given class. This Dataframe called 
# word_prob_class has the following structure:
# words X class

def calculating_probs(vocab, df):
    category = {'spam': 1, 'ham': 0}
    ## Calculating probabilities
    # Calculating prior probabilities and storing in a dict
    prob_prior_spam = df.groupby('TRUTH').size()[1].astype(float) / len(df)
    prob_prior_ham = df.groupby('TRUTH').size()[0].astype(float) / len(df)
    priors = {'spam': prob_prior_spam, 'ham': prob_prior_ham}

    # Calculating term count in spam and ham for the given vocab
    term_count_spam = df.groupby('TRUTH').sum().sum(axis=1)[1]
    term_count_ham = df.groupby('TRUTH').sum().sum(axis=1)[0]
    term_count_category = {'spam': term_count_spam, 'ham': term_count_ham}

    # Calculating counts of words in vocab per category
    words_per_category = df.groupby('TRUTH').sum().transpose()

    # Calculating word probabilities per class
    word_probs_class = words_per_category.copy()
    for cat_key, cat_value in category.iteritems():
        word_probs_class[cat_value] = word_probs_class[cat_value] / term_count_category[cat_key]
    # Applying laplace smoothing to words with zero probability
    # (using .loc instead of chained indexing so the assignment is applied in place)
    # For Spam
    word_probs_class.loc[word_probs_class[1] == 0, 1] = float(1) / (term_count_category['spam'] + len(vocab))
    # For ham
    word_probs_class.loc[word_probs_class[0] == 0, 0] = float(1) / (term_count_category['ham'] + len(vocab))
    return priors, word_probs_class

def drop_cols_less_than_3(df):
    cols_more_3 = dict(df.sum(axis=0) >= 3)
    cols_to_include = [key for key, val in cols_more_3.iteritems() if val == True]
    df = df[cols_to_include]
    return df

# Main execution of the Reducer starts here

# This list will store the output of all mappers as a list of lists
# e.g.
# [
# ['assistance', '1', '0018.2003-12-18.GP', '1'],
# ['assistance', '3', '0018.2001-07-13.SA_and_HP', '1'],
# ['enlargementwithatypo', '0', '0001.1999-12-10.farmer', '0']
# ]
key_value_list = list()
for line in sys.stdin:
    fields = line.strip().split('\t')
    key_value_list.append(fields)

# Creating pandas DataFrame from input data
vocab, df = create_dataframe(key_value_list)

# If flag for dropping cols is set then dropping for words that occurred less than 3 times in the corpus
env_vars = os.environ
drop_cols = env_vars['drop_cols']
if drop_cols == 'yes':
    df = drop_cols_less_than_3(df)

# Getting priors and words given class probabilities
priors, word_probs_class = calculating_probs(vocab, df)

# Creating Pandas DataFrame to store final probabilities
# Structure of DataFrame has email_ids in the index
# and spam, ham, TRUTH, PREDICT as columns
df_probs = df.copy(deep=True)
header_to_remove = list(df_probs.columns.values)
header_to_remove.remove('TRUTH')
df_probs.drop(header_to_remove, inplace=True, axis=1)
df_probs['spam'] = [0 for x in range(df.index.values.shape[0])]
df_probs['ham'] = [0 for x in range(df.index.values.shape[0])]
df_probs['PREDICT'] = [0 for x in range(df.index.values.shape[0])]

# Looping through all emails and calculating probabilities of email given class
# and storing in a DataFrame
category = {'spam': 1, 'ham': 0}
for email_id in df_probs.index:
    # Creating dict of all words whose count != 0 per email
    words_in_email = dict(df.loc[email_id, df.loc[email_id] != 0])
    # Removing 'TRUTH'
    if 'TRUTH' in words_in_email:
        words_in_email.pop('TRUTH')

    for cat_key, cat_value in category.iteritems():
        running_prob = math.log(priors[cat_key])

        for word, count in words_in_email.iteritems():
            running_prob += count * math.log(word_probs_class.loc[word, cat_value])

        df_probs.loc[email_id, cat_key] = running_prob

# Calculating predictions
df_probs['PREDICT'] = (df_probs['spam'] > df_probs['ham']).astype(int)

# Printing output
for email_id in df_probs.index:
    print email_id, '\t', int(df_probs.loc[email_id, 'TRUTH']), '\t', int(df_probs.loc[email_id, 'PREDICT'])

# Calculating and printing accuracy
correct = df_probs['TRUTH'] == df_probs['PREDICT']
print 'Accuracy:', float(np.sum(correct.astype(int))) / len(df_probs)

Overwriting reducer.py


Cleaning HDFS, moving input file into HDFS, executing MapReduce Job and displaying output

In [10]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job
!/usr/local/hadoop/bin/hdfs dfs -put enronemail_1h.txt /input/

!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -reducer reducer.py -cmdenv vocab_input=assistance -cmdenv drop_cols=no -input /input/* -output /output/

#Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing output
for line in job_output[1:]:
    print line

15/09/15 11:47:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:11 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:47:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:12 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:47:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...

#HW2.4  
Using the Enron data from HW1 and in the Hadoop MapReduce framework, write  a mapper/reducer pair that will classify the email messages using multinomial Naive Bayes Classifier using a list of one or more user-specified words. Examine the words “assistance”, “valium”, and “enlargementWithATypo” and report your results

To do so, make sure that

   - mapper.py 
   - reducer.py 

performs the multiple-word multinomial Naive Bayes classification via the chosen list.

**Result**  
The algorithm reports an accuracy of 0.56 with input terms - assistance,valium,enlargementWithATypo. HW1.4 also reported an accuracy of 0.56. However, HW1.4 had reported an accuracy of 0.59 for the same input terms when tokens were selected from the entire email rather than just the email message.  
To test the algorithm I passed the following terms - assistance,valium,enlargementWithATypo,him. The algorithm reported an accuracy of 0.62. Further, in HW2.5, when I used all the terms, it reported an accuracy of 0.98

**Mapper**  
Using mapper from **HW2.2** above

**Reducer**  
Using reducer from **HW2.3** above

The algorithm reports an accuracy of 0.56 with input terms - assistance,valium,enlargementWithATypo

In [11]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job
!/usr/local/hadoop/bin/hdfs dfs -put enronemail_1h.txt /input/

!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -reducer reducer.py -cmdenv vocab_input=assistance,valium,enlargementWithATypo -cmdenv drop_cols=no -input /input/* -output /output/
#!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -cmdenv vocab_input=assistance,valium,enlargementWithATypo -cmdenv drop_cols=no -input /input/* -output /output/

#Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing output
for line in job_output[1:]:
    print line

15/09/15 11:47:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:31 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:47:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:32 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:47:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:47:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...

The algorithm reports an accuracy of 0.62 with input terms - assistance,valium,enlargementWithATypo,him

In [12]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job
!/usr/local/hadoop/bin/hdfs dfs -put enronemail_1h.txt /input/

!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -reducer reducer.py -cmdenv vocab_input=assistance,valium,enlargementWithATypo,him -cmdenv drop_cols=no -input /input/* -output /output/

#Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing output
for line in job_output[1:]:
    print line

15/09/15 11:48:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:08 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:48:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:10 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:48:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...

#HW2.5  
Using the Enron data from HW1 and in the Hadoop MapReduce framework, write a mapper/reducer for a multinomial Naive Bayes Classifier that will classify the email messages using words present. Also drop words with a frequency of less than three (3). How does it affect the misclassification error of learnt naive multinomial Bayesian Classifiers on the training dataset?


**Result**  
The algorithm reported an accuracy of 0.98. This is higher than the accuracy reported in HW1.5. This is probably due to the change in corpus (HW1 uses the entire email, while HW2 uses only the email content).  
I find no change in the accuracy when terms with a frequency of less than 3 are not dropped.

**Mapper**  
Using Mapper from **HW2.2** above

**Reducer**  
Using Reducer from **HW2.3** above

The algorithm reported an accuracy of 0.98 when using all words and dropping terms with a frequency of less than 3

In [13]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job
!/usr/local/hadoop/bin/hdfs dfs -put enronemail_1h.txt /input/

!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -reducer reducer.py -cmdenv vocab_input=* -cmdenv drop_cols=yes -input /input/* -output /output/

#Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing output
for line in job_output[1:]:
    print line

15/09/15 11:48:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:48:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:31 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:48:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...

The algorithm reported an accuracy of 0.98 when using all words and not dropping terms with a frequency of less than 3

In [14]:
# Deleting folders from HDFS
!/usr/local/hadoop/bin/hdfs dfs -rm -r /input
!/usr/local/hadoop/bin/hdfs dfs -rm -r /output

# Creating folder
!/usr/local/hadoop/bin/hdfs dfs -mkdir -p /input

# Copying input file for the job
!/usr/local/hadoop/bin/hdfs dfs -put enronemail_1h.txt /input/

!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -mapper mapper.py -reducer reducer.py -cmdenv vocab_input=* -cmdenv drop_cols=no -input /input/* -output /output/

#Reading output into a variable
job_output = !/usr/local/hadoop/bin/hdfs dfs -cat /output/*

# Printing output
for line in job_output[1:]:
    print line

15/09/15 11:48:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:51 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input
15/09/15 11:48:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:52 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
15/09/15 11:48:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 11:48:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...