<a href="https://colab.research.google.com/github/yuechen2001/CS4225/blob/master/CS4225_Assignment_0_hadoop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CS4225 Assignment 0: Hadoop Streaming Warm-up (Token Counting)

Assignment 0 is a guided warm-up to help you get comfortable with **Hadoop Streaming** on Colab. You will implement a simple mapper/reducer that counts tokens in a text stream. You do **not** need to submit anything for this notebook, but the mechanics here will be used in Assignment 1.


# Installing Hadoop

First, run the following cell, which downloads Hadoop and installs it into the `/usr/local` directory, and then sets the `JAVA_HOME` environment variable so Hadoop knows where to find Java on our system.

To run the cell, either click the "play" icon at the top left of the cell, or use the keyboard shortcut Ctrl/⌘+Enter, or Shift+Enter to run and go to the next cell. The installation should take about 45 seconds. Note that if you refresh the Colab notebook, you need to run this cell again before you can use Hadoop.

In [None]:
!wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz
!tar -xzf hadoop-3.3.5.tar.gz
# install hadoop to /usr/local
!cp -r hadoop-3.3.5/ /usr/local/
import os
# hadoop needs to know where java is located
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
# the following line reduces the amount of Hadoop's printed output, to make it easier to see your own debug output
os.environ["HADOOP_ROOT_LOGGER"] = "WARN,console"
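
The `JAVA_HOME` path set above depends on which JDK the Colab image ships, so it can be worth checking before running Hadoop. The following is a minimal sketch, assuming that a valid Java home contains an executable `bin/java`; the helper name `java_home_looks_valid` is hypothetical and not part of Hadoop.

```python
import os
from pathlib import Path


def java_home_looks_valid(java_home: str) -> bool:
    """Return True if java_home contains an executable bin/java file."""
    java_binary = Path(java_home) / "bin" / "java"
    return java_binary.is_file() and os.access(java_binary, os.X_OK)


# Read JAVA_HOME from the environment; an unset variable falls back to an
# empty string, which makes the check report False.
java_home = os.environ.get("JAVA_HOME", "")
print(f"JAVA_HOME={java_home!r} looks valid: {java_home_looks_valid(java_home)}")
```

If this prints `False`, listing `/usr/lib/jvm` shows which Java installations are actually available.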

# Downloading the Data

In [None]:
!wget https://nuojohnchen.github.io/teaching/cs4225/assign0/input.txt

# Hadoop Streaming

The `%%file` cell magic tells Colab to write the contents of a cell to a file instead of executing it. For the mapper cell in this section, the Python program is written to
`/content/mapper.py` (note the leading slash). In Colab,
`/content` is the default working directory, where your files are stored.

Avoid editing the files `/content/mapper.py` and `/content/reducer.py` directly.
Colab does not, in general, save these files, so they will most likely be gone
when you re-open the notebook later. It is safer to write your mapper and reducer
in notebook cells and use the `%%file` command to create the files each time, as
this notebook does.
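
As a rough illustration of what `%%file` does, the sketch below writes a source string to a file with plain Python. The helper name `write_cell_to_file` and the file name `example_mapper.py` are hypothetical, and the sketch writes into a temporary directory rather than `/content` so that it does not overwrite anything.

```python
import tempfile
from pathlib import Path


def write_cell_to_file(directory: str, filename: str, source: str) -> Path:
    """Write source text to directory/filename and return the file's path."""
    path = Path(directory) / filename
    path.write_text(source, encoding="utf-8")
    return path


# A tiny stand-in for the contents of a notebook cell.
cell_source = 'import sys\nfor line in sys.stdin:\n    print(line, end="")\n'

with tempfile.TemporaryDirectory() as scratch_dir:
    saved_path = write_cell_to_file(scratch_dir, "example_mapper.py", cell_source)
    # The file on disk holds exactly the text of the "cell".
    print(saved_path.read_text(encoding="utf-8") == cell_source)
```

Because the file is regenerated from the cell every time the cell runs, the notebook itself stays the single source of truth for the code.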

**Mapper:** Like in regular Hadoop, the framework divides the input file into splits (e.g., 128MB) and passes them to different mappers. But unlike regular Hadoop where the mappers receive their input as key-value pairs, here the mapper just takes in lines of text (which are lines from the input file). Instead of an explicit `map()` function, here we just iterate over the lines of text and process them one by one. Instead of emitting key-value pairs like in regular Hadoop, here the mapper emits tab-separated lines of plain text of the form "`key\tvalue`". Both the mapper and reducer read their inputs from `sys.stdin`, and write their outputs to `sys.stdout`.

In [None]:
%%file mapper.py

import io
import sys
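# Decode stdin as latin1, which maps every possible byte to a character, so
# unusual bytes in the input never raise a decoding error.
input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding='latin1')

for line in input_stream: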

  # Here we split each line of the incoming input, and emit each word, with
  # its count of 1, separated by a tab. This corresponds to the map() function
  # in Hadoop.
  for word in line.split():
    print(f"{word}\t1")

**Reducer:** Like in regular
Hadoop, the data emitted by the mappers is grouped by key and sorted, and
then passed to the reducer responsible for that key. But unlike regular Hadoop
where the `reduce()` function's input is in the form `<key, List[values]>`,
here the reducer receives its input line by line, in the same format emitted
by the mappers.
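
To make the reducer's input concrete, the sketch below imitates the map and sort steps in plain Python on two sample lines. It is a simplification under stated assumptions: the helpers `map_lines` and `shuffle` are hypothetical, and the sort is an ordinary Python string sort standing in for Hadoop's own sorting and partitioning.

```python
def map_lines(lines):
    """Emit one 'word<TAB>1' record per whitespace-separated token."""
    return [f"{word}\t1" for line in lines for word in line.split()]


def shuffle(records):
    """Sort records by key so that records with equal keys become adjacent."""
    return sorted(records, key=lambda record: record.split("\t")[0])


sample_lines = ["b a", "a c a"]
reducer_input = shuffle(map_lines(sample_lines))

# Each printed line is one line the reducer would read from stdin.
for record in reducer_input:
    print(record)
```

Here the reducer would see three `a` records in a row, followed by one `b` record and one `c` record. The values are not pre-grouped into a list, so the reducer has to combine them itself.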

In [None]:
%%file reducer.py

import sys

# Initialization: here we create the data structures we need; this corresponds
# to the setup() function in Hadoop.
counts = {}

for line in sys.stdin:

  # Here we process each line of the incoming input; this corresponds to the
  # reduce() function in Hadoop.
  word, count = line.strip().split('\t')
  if word not in counts:
    counts[word] = 0
  counts[word] += int(count)

# Postprocess: after processing all lines, we do any necessary post-processing,
# corresponding to the cleanup() function in Hadoop.
for word, count in counts.items():
  print(f"{word}\t{count}")
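
Because the reducer's input arrives sorted by key, an alternative to the dictionary is to sum each run of identical keys as it streams past, so that only one key is held in memory at a time. The following is a sketch of that idea using `itertools.groupby`, not a required approach for the assignment; the helper names `parse_records` and `reduce_sorted` are hypothetical.

```python
from itertools import groupby


def parse_records(lines):
    """Yield (word, count) pairs from 'word<TAB>count' lines, skipping blanks."""
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        word, count = line.split("\t", 1)
        yield word, int(count)


def reduce_sorted(lines):
    """Sum the counts of each run of consecutive records sharing a key."""
    for word, group in groupby(parse_records(lines), key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


# Sample sorted input; in an actual reducer script this would be sys.stdin.
sample_input = ["a\t1\n", "a\t1\n", "b\t1\n"]
for word, total in reduce_sorted(sample_input):
    print(f"{word}\t{total}")
```

This only gives correct totals when equal keys are adjacent, which is the case for reducer input but not for arbitrary unsorted text.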

In [None]:
# Set permissions to ensure that Hadoop can use the files
!chmod u+rwx /content/mapper.py
!chmod u+rwx /content/reducer.py

**Run Hadoop Streaming:** the first line deletes the `/content/output` folder to prevent errors when running the program multiple times.

In [None]:
!rm -rf /content/output
!/usr/local/hadoop-3.3.5/bin/hadoop jar /usr/local/hadoop-3.3.5/share/hadoop/tools/lib/hadoop-streaming-3.3.5.jar \
-input /content/input.txt \
-output /content/output \
-file /content/mapper.py \
-file /content/reducer.py \
-mapper 'python mapper.py' \
-reducer 'python reducer.py'

The output is stored in the `/content/output/part-00000` file. If we had multiple reducers, there would be multiple output files, but for our assignments it is sufficient to use just 1 reducer, which is also Hadoop's default behavior.

In [None]:
!cat /content/output/part-00000
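
The raw output can be long, so it may help to load it back into Python and look at the most frequent tokens. The following is a minimal sketch, assuming the output lines have the form `word<TAB>count` as produced by the reducer above; the helper name `load_counts` is hypothetical, and decoding as UTF-8 with `errors="replace"` is an assumption about the output encoding. It reads every `part-*` file, so it would also cover a run with several reducers.

```python
import glob
import os
from collections import Counter


def load_counts(output_dir):
    """Combine the 'word<TAB>count' lines of all part-* files in output_dir."""
    counts = Counter()
    for part_path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(part_path, encoding="utf-8", errors="replace") as part_file:
            for line in part_file:
                line = line.rstrip("\n")
                if not line:
                    continue
                # Split on the last tab: the count is always the final field.
                word, count = line.rsplit("\t", 1)
                counts[word] += int(count)
    return counts


# /content/output is the output folder used by the Hadoop command above.
token_counts = load_counts("/content/output")
for word, count in token_counts.most_common(10):
    print(f"{word}\t{count}")
```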