If you are not using the `Assignments` tab on the course JupyterHub server to read this notebook, read [Activating the assignments tab](https://github.com/lcdm-uiuc/info490-sp17/blob/master/help/act_assign_tab.md).

A few things you should keep in mind when working on assignments:

1. Make sure you fill in any place that says `YOUR CODE HERE`. Do **not** write your answer anywhere other than where it says `YOUR CODE HERE`. Anything you write anywhere else will be removed or overwritten by the autograder.

2. Before you submit your assignment, make sure everything runs as expected. Go to the menubar, select _Kernel_, then restart the kernel and run all cells (_Restart & Run All_).

3. Do not change the title (i.e. file name) of this notebook.

4. Make sure that you save your work (in the menubar, select _File_ → _Save and Checkpoint_).

5. You are allowed to submit an assignment multiple times, but only the most recent submission will be graded.

## Problem 12.2. MapReduce.

In this problem, we will use Hadoop Streaming to execute a MapReduce program written in Python.

In [49]:
import os
from nose.tools import assert_equal, assert_true

We will use the [airline on-time performance data](http://stat-computing.org/dataexpo/2009/). Before proceeding, recall that the data set is encoded in `latin-1`, while the Python 3 interpreter in our environment expects standard input and output to be `utf-8` encoded. We therefore have to tell the interpreter explicitly to use `latin-1` for all standard I/O, which we can do by setting the environment variable `PYTHONIOENCODING` to `latin-1`. We can set environment variables for the current IPython kernel, and for any shell commands or scripts it launches, by modifying the `os.environ` dictionary.

In [50]:
os.environ['PYTHONIOENCODING'] = 'latin-1'
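To see why the encoding matters, here is a small sketch using a made-up byte string (not taken from the data set) that contains the `latin-1` byte `0xE9` (é). Decoding it as `utf-8` fails, while decoding it as `latin-1` succeeds; setting `PYTHONIOENCODING=latin-1` makes the interpreter apply the second decoding to standard input and output.

```python
# Hypothetical record: b'\xe9' is "é" in latin-1, but it is not valid utf-8 here.
raw = b'Caf\xe9,ABE,16\n'

try:
    raw.decode('utf-8')
    utf8_ok = True
except UnicodeDecodeError as err:
    utf8_ok = False
    print('utf-8 decoding failed:', err.reason)

# Every byte value is a valid latin-1 character, so this always succeeds.
text = raw.decode('latin-1')
print(repr(text))
```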

Let's use the shell to check whether the variable is set correctly. If you are not familiar with the following syntax (i.e., `python_variable = ! shell command`), [this notebook](https://github.com/UI-DataScience/info490-fa15/blob/master/Week4/assignment/unix_ipython.ipynb) from the previous semester might be useful.

In [51]:
python_io_encoding = ! echo $PYTHONIOENCODING
assert_equal(python_io_encoding.s, 'latin-1')
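The `var = ! command` syntax is specific to IPython. A rough plain-Python equivalent, sketched with the standard `subprocess` module, shows that a shell launched from the kernel inherits `os.environ`. This is also how `./mapper.py` will see `PYTHONIOENCODING`: the variable is read when each new Python interpreter starts, so it affects the scripts we launch rather than the already-running kernel. IPython returns an `SList`, whose `.s` attribute joins the lines with spaces; this sketch simply keeps a plain list of lines.

```python
import os
import subprocess

os.environ['PYTHONIOENCODING'] = 'latin-1'

# shell=True lets /bin/sh expand $PYTHONIOENCODING, much like IPython's `!` escape.
# With no explicit env argument, the child inherits os.environ.
completed = subprocess.run('echo $PYTHONIOENCODING', shell=True,
                           stdout=subprocess.PIPE, universal_newlines=True)
python_io_encoding = completed.stdout.splitlines()
print(python_io_encoding)
```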

## Mapper

Write a Python script that
  - Reads data from `STDIN`,
  - Skips the first line (The first line of `2001.csv` is the header that has the column titles.)
  - Outputs to `STDOUT` the `Origin` and `AirTime` columns separated with a tab.

In [52]:
%%writefile mapper.py
#!/usr/bin/env python3

import sys

# YOUR CODE HERE

# 0-based column positions in the airline on-time data.
AIRTIME_COL = 13
ORIGIN_COL = 16

# Read comma-separated records from STDIN and write key-value pairs to STDOUT.
with sys.stdin as fin:
    # Skip the header line (column titles); the default value
    # avoids a StopIteration error if the input is empty.
    next(fin, None)
    with sys.stdout as fout:
        for line in fin:
            # Strip leading/trailing whitespace and split the record on commas.
            fields = line.strip().split(',')
            # Output "Origin<TAB>AirTime".
            fout.write("{0}\t{1}\n".format(fields[ORIGIN_COL], fields[AIRTIME_COL]))


Overwriting mapper.py


We need to make the file executable.

In [53]:
! chmod u+x mapper.py
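Together with the `#!/usr/bin/env python3` line at the top of the script, the execute bit is what lets us run `./mapper.py` directly. For reference, a sketch of the same `chmod u+x` step in Python, using a hypothetical helper `make_user_executable` and a throwaway file instead of `mapper.py`:

```python
import os
import stat
import tempfile

def make_user_executable(path):
    """Add the owner-execute bit, like `chmod u+x path`; other permission bits are kept."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    os.chmod(path, mode | stat.S_IXUSR)

# Demonstrate on a throwaway script rather than on mapper.py itself.
with tempfile.NamedTemporaryFile('w', suffix='.py', delete=False) as tmp:
    tmp.write('#!/usr/bin/env python3\nprint("hello")\n')

make_user_executable(tmp.name)
is_executable = bool(os.stat(tmp.name).st_mode & stat.S_IXUSR)
print(is_executable)
os.remove(tmp.name)
```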

Before testing the mapper code on the entire data set, let's first create a small file and test our code on this small data set.

In [54]:
! head -n 50 $HOME/data/2001.csv > 2001.csv.head
map_out_head = ! ./mapper.py < 2001.csv.head
print('\n'.join(map_out_head))

BWI	60
BWI	64
BWI	80
BWI	66
BWI	62
BWI	61
BWI	61
BWI	60
BWI	52
BWI	62
BWI	62
BWI	55
BWI	60
BWI	61
BWI	63
PHL	53
PHL	54
PHL	55
PHL	53
PHL	50
PHL	NA
PHL	57
PHL	48
PHL	56
PHL	55
PHL	55
PHL	55
PHL	55
PHL	49
PHL	75
PHL	49
PHL	50
PHL	49
PHL	NA
PHL	46
PHL	NA
PHL	51
PHL	53
PHL	52
PHL	52
PHL	54
PHL	56
PHL	55
PHL	51
PHL	49
PHL	49
CLT	82
CLT	82
CLT	78


In [55]:
assert_equal(
    map_out_head,
    ['BWI\t60','BWI\t64','BWI\t80','BWI\t66','BWI\t62','BWI\t61',
     'BWI\t61','BWI\t60','BWI\t52','BWI\t62','BWI\t62','BWI\t55',
     'BWI\t60','BWI\t61','BWI\t63','PHL\t53','PHL\t54','PHL\t55',
     'PHL\t53','PHL\t50','PHL\tNA','PHL\t57','PHL\t48','PHL\t56',
     'PHL\t55','PHL\t55','PHL\t55','PHL\t55','PHL\t49','PHL\t75',
     'PHL\t49','PHL\t50','PHL\t49','PHL\tNA','PHL\t46','PHL\tNA',
     'PHL\t51','PHL\t53','PHL\t52','PHL\t52','PHL\t54','PHL\t56',
     'PHL\t55','PHL\t51','PHL\t49','PHL\t49','CLT\t82','CLT\t82',
     'CLT\t78']
    )
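The mapper logic can also be checked without the shell. The sketch below wraps the same steps in a hypothetical `map_lines` function and feeds it an in-memory file built with `io.StringIO`. The rows are made up by a hypothetical `make_row` helper: they have 29 columns like the airline data, but only columns 13 (`AirTime`) and 16 (`Origin`) hold meaningful values.

```python
import io

AIRTIME_COL, ORIGIN_COL = 13, 16  # 0-based column positions in the CSV

def map_lines(fin):
    """Yield 'Origin<TAB>AirTime' for every record after the header line."""
    next(fin, None)  # skip the header line
    for line in fin:
        fields = line.strip().split(',')
        yield '{0}\t{1}'.format(fields[ORIGIN_COL], fields[AIRTIME_COL])

def make_row(origin, airtime, ncols=29):
    # Hypothetical helper: placeholder values everywhere except the two columns we need.
    fields = ['x'] * ncols
    fields[AIRTIME_COL], fields[ORIGIN_COL] = airtime, origin
    return ','.join(fields) + '\n'

sample = io.StringIO('header\n' + make_row('BWI', '60') + make_row('PHL', 'NA'))
print(list(map_lines(sample)))
```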

## Reducer

Write a Python script that

  - Reads key-value pairs from `STDIN`,
  - Computes the minimum and maximum air time of flights departing from each origin airport,
  - Outputs to `STDOUT` the airports and the minimum and maximum air time for flights at each airport, separated with tabs.
  
For example,

```shell
$ ./mapper.py < 2001.csv.head | sort -n -k 1 | ./reducer.py
```

should give

```
BWI	52	80
CLT	78	82
PHL	46	75
```
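The reducer only works because its input arrives grouped by key: locally, `sort` plays the role of Hadoop's shuffle-and-sort phase. As a sketch of the whole map, sort, reduce flow in plain Python, the hypothetical `reduce_lines` below uses `itertools.groupby` (an alternative to tracking the current key by hand) on key-sorted `Origin<TAB>AirTime` pairs and skips `NA` values. The input pairs are made up.

```python
from itertools import groupby

def reduce_lines(lines):
    """Yield 'Origin<TAB>min<TAB>max' for key-sorted 'Origin<TAB>AirTime' lines."""
    pairs = (line.rstrip('\n').split('\t', 1) for line in lines)
    for origin, group in groupby(pairs, key=lambda pair: pair[0]):
        # Drop missing values; an airport with only NA values produces no output.
        times = [int(t) for _, t in group if t != 'NA']
        if times:
            yield '{0}\t{1}\t{2}'.format(origin, min(times), max(times))

# Made-up mapper output, deliberately unsorted.
mapped = ['PHL\t53', 'BWI\t60', 'PHL\tNA', 'BWI\t52', 'CLT\t82', 'PHL\t46']
# sorted() stands in for `sort`; groupby only merges *adjacent* equal keys.
print(list(reduce_lines(sorted(mapped))))
```

Without the `sorted()` call, `groupby` would report `PHL` more than once, which is exactly the failure a streaming reducer would show on ungrouped input.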

In [56]:
%%writefile reducer.py
#!/usr/bin/env python3

import sys

# YOUR CODE HERE

# Read "Origin<TAB>AirTime" pairs from STDIN. The pairs must arrive grouped by
# origin (e.g. sorted), so that all records for one airport are adjacent.
with sys.stdin as fin:
    with sys.stdout as fout:
        c_origin = None  # airport currently being aggregated
        c_min = None     # minimum air time seen so far for c_origin
        c_max = None     # maximum air time seen so far for c_origin
        for line in fin:
            origin, airtime = line.rstrip('\n').split('\t', 1)
            # Skip flights whose AirTime is missing ("NA").
            if airtime == 'NA':
                continue
            airtime = int(airtime)
            if origin == c_origin:
                # Same airport: update the running minimum and maximum.
                c_min = min(c_min, airtime)
                c_max = max(c_max, airtime)
            else:
                # New airport: output the previous one (if any), then reset.
                if c_origin is not None:
                    fout.write("{0}\t{1}\t{2}\n".format(c_origin, c_min, c_max))
                c_origin, c_min, c_max = origin, airtime, airtime
        # Output the last airport once the input is exhausted.
        if c_origin is not None:
            fout.write("{0}\t{1}\t{2}\n".format(c_origin, c_min, c_max))

Overwriting reducer.py


In [57]:
! chmod u+x reducer.py

In [58]:
red_head_out = ! ./mapper.py < 2001.csv.head | sort -n -k 1 | ./reducer.py
print('\n'.join(red_head_out))

BWI	52	80
CLT	78	82
PHL	46	75


In [59]:
assert_equal(red_head_out, ['BWI\t52\t80','CLT\t78\t82','PHL\t46\t75'])

If the previous tests on the smaller data set were successful, we can run the MapReduce pipeline on the entire data set.

In [60]:
mapred_out = ! ./mapper.py < $HOME/data/2001.csv | sort -n -k 1 | ./reducer.py
print('\n'.join(mapred_out[:10]))

ABE	16	180
ABI	28	85
ABQ	15	264
ACT	19	81
ACY	33	33
ADQ	32	67
AKN	12	54
ALB	23	360
AMA	30	130
ANC	23	428


In [61]:
assert_equal(len(mapred_out), 231)
assert_equal(mapred_out[:5], ['ABE\t16\t180', 'ABI\t28\t85', 'ABQ\t15\t264', 'ACT\t19\t81', 'ACY\t33\t33'])
assert_equal(mapred_out[-5:], ['TYS\t11\t177', 'VPS\t28\t123', 'WRG\t5\t38', 'XNA\t33\t195', 'YAK\t28\t72'])

## HDFS: Reset

We will do some cleaning up before we run Hadoop Streaming. Let's first stop the [namenode and datanodes](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html), as well as the YARN daemons.

In [62]:
! $HADOOP_PREFIX/sbin/stop-dfs.sh
! $HADOOP_PREFIX/sbin/stop-yarn.sh

Stopping namenodes on [info490rb.studentspace.cs.illinois.edu]
info490rb.studentspace.cs.illinois.edu: stopping namenode
localhost: stopping datanode
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode
stopping yarn daemons
stopping resourcemanager
localhost: stopping nodemanager
localhost: nodemanager did not stop gracefully after 5 seconds: killing with kill -9
no proxyserver to stop


We also want to remove any temporary files left over from previous Hadoop operations.

In [63]:
! rm -rf /tmp/*

rm: cannot remove ‘/tmp/hsperfdata_root’: Operation not permitted


We will simply [format the namenode](https://wiki.apache.org/hadoop/GettingStartedWithHadoop#Formatting_the_Namenode), which deletes all files in our HDFS. Note that our HDFS runs in an ephemeral Docker container, so all data will be lost anyway when the container is shut down.

In [64]:
! echo "Y" | $HADOOP_PREFIX/bin/hdfs namenode -format 2> /dev/null

Formatting using clusterid: CID-0fcb4945-86ee-4c3b-8adb-ce4755fb8edd


After formatting the namenode, we restart the namenode and datanodes.

In [65]:
!$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
!$HADOOP_PREFIX/sbin/start-dfs.sh
!$HADOOP_PREFIX/sbin/start-yarn.sh

Starting namenodes on [info490rb.studentspace.cs.illinois.edu]
info490rb.studentspace.cs.illinois.edu: starting namenode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-namenode-info490rb.studentspace.cs.illinois.edu.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-datanode-info490rb.studentspace.cs.illinois.edu.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-secondarynamenode-info490rb.studentspace.cs.illinois.edu.out
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-info490rb.studentspace.cs.illinois.edu.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-data_scientist-nodemanager-info490rb.studentspace.cs.illinois.edu.out


Sometimes, when the namenode is restarted, it enters Safe Mode, in which no changes to the file system are allowed. We do want to make changes, so we manually leave Safe Mode.

In [66]:
! $HADOOP_PREFIX/bin/hdfs dfsadmin -safemode leave

Safe mode is OFF


## HDFS: Create directory

- Create a new directory in HDFS at `/user/data_scientist`.

In [67]:
# Create a new directory in HDFS at /user/data_scientist.

# YOUR CODE HERE
!$HADOOP_PREFIX/bin/hdfs dfs -mkdir -p /user/data_scientist
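The same `hdfs dfs` commands can also be issued from Python with `subprocess`, which raises an error instead of silently printing one when a command fails. This is a minimal sketch with hypothetical helpers `hdfs_command` and `run_hdfs`; the fallback prefix `/usr/local/hadoop` is an assumption based on the paths that appear in this notebook.

```python
import os
import subprocess

def hdfs_command(*args):
    """Build the argument list for `$HADOOP_PREFIX/bin/hdfs dfs ARGS...`."""
    # Assumption: fall back to /usr/local/hadoop if HADOOP_PREFIX is not set.
    prefix = os.environ.get('HADOOP_PREFIX', '/usr/local/hadoop')
    return [os.path.join(prefix, 'bin', 'hdfs'), 'dfs'] + list(args)

def run_hdfs(*args):
    """Run an `hdfs dfs` command and return its standard output as text."""
    # check=True raises subprocess.CalledProcessError on a non-zero exit status.
    return subprocess.run(hdfs_command(*args), check=True,
                          stdout=subprocess.PIPE, universal_newlines=True).stdout

# Usage (requires a running HDFS):
# run_hdfs('-mkdir', '-p', '/user/data_scientist')
```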

In [68]:
ls_user = ! $HADOOP_PREFIX/bin/hdfs dfs -ls /user/
print('\n'.join(ls_user))

Found 1 items
drwxr-xr-x   - data_scientist supergroup          0 2017-04-17 02:14 /user/data_scientist


In [69]:
assert_true('/user/data_scientist' in ls_user.s)

- Create a new directory in HDFS at `/user/data_scientist/wc/in`.

In [70]:
# Create a new directory in HDFS at `/user/data_scientist/wc/in`

# YOUR CODE HERE
!$HADOOP_PREFIX/bin/hdfs dfs -mkdir -p /user/data_scientist/wc/in

In [71]:
ls_wc = ! $HADOOP_PREFIX/bin/hdfs dfs -ls wc
print('\n'.join(ls_wc))

Found 1 items
drwxr-xr-x   - data_scientist supergroup          0 2017-04-17 02:14 wc/in


In [72]:
assert_true('wc/in' in ls_wc.s)
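The listing above uses the relative path `wc`. HDFS resolves a relative path against the user's home directory, `/user/<username>` by default, which is why `wc/in` refers to the `/user/data_scientist/wc/in` directory we just created. The hypothetical helper below mimics that rule with `posixpath`; it does not handle full URIs such as `hdfs://...`.

```python
import posixpath

def resolve_hdfs_path(path, user, home_prefix='/user'):
    """Resolve a path the way HDFS resolves relative paths against /user/<user>."""
    if path.startswith('/'):
        # Absolute paths are used as given (only normalized).
        return posixpath.normpath(path)
    return posixpath.normpath(posixpath.join(home_prefix, user, path))

print(resolve_hdfs_path('wc/in', 'data_scientist'))
```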

## HDFS: Copy

- Copy `/home/data_scientist/data/2001.csv` from the local file system into our new HDFS directory `wc/in`.

In [73]:
# Copy `/home/data_scientist/data/2001.csv` from the local file system into our new HDFS directory `wc/in`.

# YOUR CODE HERE
!$HADOOP_PREFIX/bin/hdfs dfs -put /home/data_scientist/data/2001.csv /user/data_scientist/wc/in/2001.csv

In [74]:
ls_wc_in = ! $HADOOP_PREFIX/bin/hdfs dfs -ls wc/in
print('\n'.join(ls_wc_in))

Found 1 items
-rw-r--r--   1 data_scientist supergroup  600411462 2017-04-17 02:15 wc/in/2001.csv


In [75]:
assert_true('wc/in/2001.csv' in ls_wc_in.s)

## Python Hadoop Streaming

- Run `mapper.py` and `reducer.py` via Hadoop Streaming.
- Use `/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar`.
- We need to pass the `PYTHONIOENCODING` environment variable to our Hadoop streaming task. To find out how to set `PYTHONIOENCODING` to `latin-1` in a Hadoop streaming task, use the `--help` and `-info` options.
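A Hadoop Streaming invocation is just a long argument list, so it can help to see its structure spelled out. The sketch below builds one with a hypothetical `streaming_args` helper; the resulting list would follow `$HADOOP_PREFIX/bin/hadoop`. It places the generic option `-files` before the streaming options (Hadoop requires this ordering) and adds one `-cmdenv NAME=VALUE` pair per environment variable that the map and reduce tasks should see. The output directory must not already exist, otherwise the job fails.

```python
def streaming_args(jar, mapper, reducer, input_dir, output_dir, env=None):
    """Build the arguments for `hadoop jar <streaming jar> ...` (hypothetical helper)."""
    args = ['jar', jar,
            # Generic options such as -files must come before streaming options.
            '-files', '{0},{1}'.format(mapper, reducer),
            '-input', input_dir, '-output', output_dir,
            '-mapper', mapper, '-reducer', reducer]
    # Each -cmdenv NAME=VALUE sets an environment variable for the tasks.
    for name, value in sorted((env or {}).items()):
        args += ['-cmdenv', '{0}={1}'.format(name, value)]
    return args

args = streaming_args('/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar',
                      'mapper.py', 'reducer.py', 'wc/in', 'wc/out',
                      env={'PYTHONIOENCODING': 'latin-1'})
print(' '.join(args))
```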

In [76]:
# Run Python code via Hadoop streaming

# YOUR CODE HERE
# Generic options such as -files must precede the streaming options;
# -cmdenv sets PYTHONIOENCODING for the mapper and reducer tasks.
!$HADOOP_PREFIX/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -files mapper.py,reducer.py -input wc/in \
    -output wc/out -mapper mapper.py -reducer reducer.py \
    -cmdenv PYTHONIOENCODING=latin-1

packageJobJar: [/tmp/hadoop-unjar3935353567922575303/] [] /tmp/streamjob9104049820679751086.jar tmpDir=null
17/04/17 02:15:38 INFO client.RMProxy: Connecting to ResourceManager at info490rb.studentspace.cs.illinois.edu/192.168.4.6:8032
17/04/17 02:15:38 INFO client.RMProxy: Connecting to ResourceManager at info490rb.studentspace.cs.illinois.edu/192.168.4.6:8032
17/04/17 02:15:39 INFO mapred.FileInputFormat: Total input paths to process : 1
17/04/17 02:15:39 INFO mapreduce.JobSubmitter: number of splits:5
17/04/17 02:15:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1492413232684_0001
17/04/17 02:15:40 INFO impl.YarnClientImpl: Submitted application application_1492413232684_0001
17/04/17 02:15:40 INFO mapreduce.Job: The url to track the job: http://info490rb.studentspace.cs.illinois.edu:8088/proxy/application_1492413232684_0001/
17/04/17 02:15:40 INFO mapreduce.Job: Running job: job_1492413232684_0001
17/04/17 02:15:47 INFO mapreduce.Job: Job job_1492413232684_0001 runn

In [77]:
ls_wc_out = ! $HADOOP_PREFIX/bin/hdfs dfs -ls wc/out
print('\n'.join(ls_wc_out))

Found 2 items
-rw-r--r--   1 data_scientist supergroup          0 2017-04-17 02:16 wc/out/_SUCCESS
-rw-r--r--   1 data_scientist supergroup       2464 2017-04-17 02:16 wc/out/part-00000


In [78]:
assert_true('wc/out/_SUCCESS' in ls_wc_out.s)
assert_true('wc/out/part-00000' in ls_wc_out.s)

In [79]:
stream_out = ! $HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000
print('\n'.join(stream_out[:10]))

ABE	16	180
ABI	28	85
ABQ	15	264
ACT	19	81
ACY	33	33
ADQ	32	67
AKN	12	54
ALB	23	360
AMA	30	130
ANC	23	428


In [80]:
assert_equal(mapred_out, stream_out)

## Cleanup

In [81]:
! $HADOOP_PREFIX/bin/hdfs dfs -rm -r -f -skipTrash wc/out

Deleted wc/out
