# Lab 3 - MapReduce

In this lab, we practice the MapReduce programming paradigm.

We will complete the tasks using the accompanying *mapreduce* package (**mapreduce.py**) and MRJob. Please download **mapreduce.py** from our online class resource page and place it in the same folder as your notebook.

For each invocation of a MapReduce job (with mr.run()), you are expected to supply a mapper, plus a reducer and/or a combiner as needed. Below are some sample uses of the package:

```python
# Run on input1 using your mapper1 and reducer1 functions
output = list(mr.run(input1, mapper1, reducer1))

# Run on input2 using only your mapper2 (no reduce phase)
output = list(mr.run(enumerate(input2), mapper2))

# Run on input3 using 2 nested MapReduce jobs
output = list(mr.run(mr.run(input3, mapper3, reducer3), mapper4))
```
    
Please note that the input must be an iterable of **key/value pairs**. If your input data does not have a key, you can simply add an index key with **enumerate(input)** (or a null key). The output of mr.run() is always a **generator**; you have to convert it to a list if you'd like to view, index, or print it.
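The source of `mapreduce.py` is not reproduced here, so the following is only a minimal sketch of how a driver like `mr.run()` *could* behave, assuming it maps every key/value pair, groups the intermediate pairs by key, and then calls the reducer once per key. The name `run_sketch` and the toy word-count functions are hypothetical, and the real package may differ (for example, in how it handles combiners).

```python
from collections import defaultdict
from itertools import chain

def run_sketch(data, mapper, reducer=None):
    """Hypothetical single-process MapReduce driver (not the real mapreduce.py).

    `data` is an iterable of (key, value) pairs; the result is a generator.
    """
    # Map phase: call the mapper on every input pair and chain the outputs
    mapped = chain.from_iterable(mapper(k, v) for k, v in data)
    if reducer is None:
        # Mapper-only job: stream the mapper output straight through
        yield from mapped
        return
    # Shuffle phase: collect all intermediate values that share a key
    groups = defaultdict(list)
    for k, v in mapped:
        groups[k].append(v)
    # Reduce phase: one reducer call per distinct key
    for k, values in groups.items():
        yield from reducer(k, values)

# Toy word count: enumerate() supplies the index key
def wc_mapper(_, line):
    for word in line.split():
        yield (word, 1)

def wc_reducer(word, counts):
    yield (word, sum(counts))

result = run_sketch(enumerate(["a b a", "b a"]), wc_mapper, wc_reducer)
print(list(result))
# → [('a', 3), ('b', 2)]
```

Because the output is itself an iterable of key/value pairs, it can be fed into another call, which is what makes nested jobs possible.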

We will also need to download **book.txt** and **citibike.csv**.

In [None]:
!pip install mrjob

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[?25l[K     |▊                               | 10 kB 19.9 MB/s eta 0:00:01[K     |█▌                              | 20 kB 25.4 MB/s eta 0:00:01[K     |██▎                             | 30 kB 14.0 MB/s eta 0:00:01[K     |███                             | 40 kB 10.5 MB/s eta 0:00:01[K     |███▊                            | 51 kB 5.7 MB/s eta 0:00:01[K     |████▌                           | 61 kB 5.8 MB/s eta 0:00:01[K     |█████▏                          | 71 kB 6.3 MB/s eta 0:00:01[K     |██████                          | 81 kB 5.8 MB/s eta 0:00:01[K     |██████▊                         | 92 kB 4.9 MB/s eta 0:00:01[K     |███████▌                        | 102 kB 5.4 MB/s eta 0:00:01[K     |████████▏                       | 112 kB 5.4 MB/s eta 0:00:01[K     |█████████                       | 122 kB 5.4 MB/s eta 0:00:01[K     |█████████▊                      | 133 kB 5.4 MB/s eta 0:00:01[

In [None]:
!gdown --id 1sq4-zXn2Z82mdLSBBegEgsUsfqtgza-C -O mapreduce.py
!gdown --id 1qCQ6edyhTA1kqFWZf1y65ogidivDbBIT -O book.txt
!gdown --id 1I8eqA1Zy3vFq4mN8z0ZRl7ABXrdzCRYI -O citibike.csv

Downloading...
From: https://drive.google.com/uc?id=1sq4-zXn2Z82mdLSBBegEgsUsfqtgza-C
To: /content/mapreduce.py
100% 2.66k/2.66k [00:00<00:00, 8.19MB/s]
Downloading...
From: https://drive.google.com/uc?id=1qCQ6edyhTA1kqFWZf1y65ogidivDbBIT
To: /content/book.txt
100% 259k/259k [00:00<00:00, 36.8MB/s]
Downloading...
From: https://drive.google.com/uc?id=1I8eqA1Zy3vFq4mN8z0ZRl7ABXrdzCRYI
To: /content/citibike.csv
100% 8.16M/8.16M [00:00<00:00, 38.2MB/s]


In [None]:
import csv
import mapreduce as mr

## Task 0

Here is another concrete example of "Word Count" using the package. Assume we have a text file named *book.txt*. Our task is to count the frequency of each word in this document and print the top 10. For illustration purposes, we count only the first 1000 lines of the book.

In [None]:
with open('book.txt', 'r') as fi:
    lines = [(i,line.strip()) for i,line in enumerate(fi) if i<1000]

### After this, 'lines' holds 1000 (line number, text) pairs
def mapper(k1, line):
    for word in line.strip().split(' '):
        if len(word)>0:
            yield (word, 1)
    
def reducer(word, counts):
    yield (word, sum(counts))

wCounts = list(mr.run(lines, mapper, reducer))
sortedCounts = sorted(wCounts, key=lambda x: -x[1])
sortedCounts[:10]

[('the', 360),
 ('of', 326),
 ('and', 246),
 ('a', 169),
 ('or', 161),
 ('to', 101),
 ('with', 100),
 ('in', 88),
 ('on', 67),
 ('as', 56)]
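As a sanity check on the MapReduce result, the same counts can be computed on a single machine with `collections.Counter`. This is a minimal sketch: `word_counts` is a hypothetical helper that splits words exactly as `mapper` above does (single spaces, empty strings skipped), shown here on a toy input.

```python
from collections import Counter

def word_counts(lines):
    """Count words from (key, line) pairs, splitting like the Task 0 mapper."""
    return Counter(
        word
        for _, line in lines
        for word in line.strip().split(' ')
        if len(word) > 0  # consecutive spaces produce empty strings; skip them
    )

toy_lines = [(0, "the cat and the hat"), (1, "the  end")]
print(word_counts(toy_lines).most_common(2))
# → [('the', 3), ('cat', 1)]
```

In the notebook, `word_counts(lines) == dict(wCounts)` is expected to be `True` if `mr.run` groups and sums as described. `most_common()` orders tied counts by first occurrence, so its top 10 may list ties in a different order than `sortedCounts[:10]`.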

In [None]:
!head -n 2 citibike.csv

cartodb_id,the_geom,tripduration,starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender
1,,801,2015-02-01 00:00:00+00,2015-02-01 00:14:00+00,521,8 Ave & W 31 St,40.75044999,-73.99481051,423,W 54 St & 9 Ave,40.76584941,-73.98690506,17131,Subscriber,1978,2


## Task 1

We would like to write a MapReduce job that counts the total number of trips involving each station. For example, if a trip starts at station A and ends at station B, it counts toward both A and B. The output must be tuples, each consisting of a station name and a count.

In [None]:
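def mapper1(_, row):
    # each trip counts toward both its start and its end station
    yield (row['start_station_name'], 1)
    yield (row['end_station_name'], 1)

def reducer1(station, counts):
    # total number of trips involving this station
    yield (station, sum(counts))

with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output1 = list(mr.run(reader, mapper1, reducer1))

output1[:10]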


## Task 2

Below is an example showing how to use nested jobs and mapper-only jobs with the mapreduce package; no points are assigned for it. Our task here is to filter the output of Task 1 so that it shows only the stations involved in more than 1000 trips, again using the MapReduce paradigm.

In [None]:
def mapper2(station, count):
    # mapper-only filter: pass through stations with more than 1000 trips
    if count > 1000:
        yield (station, count)

with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output2 = list(mr.run(mr.run(reader, mapper1, reducer1), mapper2))

output2


## Task 3

We would like to count the number of trips taken between each pair of stations. Trips from station A to station B and from station B to station A both count toward the station pair A and B. Please note that a station pair should be identified by station names, as a tuple in lexical order, i.e. (A,B) rather than (B,A) in this case. The output must be tuples, each consisting of the station pair and a count.

In [None]:
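def mapper3(_, row):
    # sort the two names so A->B and B->A map to the same (A, B) key
    station_pair = tuple(sorted((row['start_station_name'], row['end_station_name'])))
    yield (station_pair, 1)

def reducer3(station_pair, counts):
    # total number of trips between the two stations, in either direction
    yield (station_pair, sum(counts))

with open('citibike.csv', 'r') as fi:
    reader = enumerate(csv.DictReader(fi))
    output3 = list(mr.run(reader, mapper3, reducer3))

output3[:10]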


## Task 4

In this task, you are asked to find, for each gender of *'Subscriber'* users, the station where the most trips started. That is, which station had the highest number of bike pickups by female riders, by male riders, and by riders of unknown gender?

The output will be a list of tuples, each consisting of a gender label (as indicated below) and another tuple holding a station name and the total number of trips started at that station by riders of that gender.


The label mapping for the gender column in citibike.csv is: (0=<b>Unknown</b>; 1=<b>Male</b>; 2=<b>Female</b>)

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRTask4(MRJob):
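  def mapper4(self, _, line):
    # strip the trailing newline so the gender field is clean
    row = line.strip().split(',')
    # keep subscribers only (this also skips the header row); key on (gender, start station)
    if row[14] == 'Subscriber':
      gender_station = (row[16], row[6])
      yield (gender_station, 1)

  def reducer4(self, gender_station, counts):
    # re-key by gender; the value is (station, trips started there)
    stations = (gender_station[1], sum(counts))
    yield (gender_station[0], stations)

  def mapper5(self, gender, station_count):
    # map 0/1/2 to Unknown/Male/Female
    genderLabel = ('Unknown', 'Male', 'Female')[int(gender)]
    yield (genderLabel, station_count)

  def reducer5(self, genderLabel, station_counts):
    # pick the station with the highest trip count for this gender
    yield (genderLabel, max(station_counts, key=lambda x: x[1]))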

  def steps(self):
    return [
      MRStep(mapper=self.mapper4, reducer=self.reducer4),
      MRStep(mapper=self.mapper5, reducer=self.reducer5),
    ]

task4 = MRTask4(args=[])

with open('citibike.csv', 'r') as fi:
    reader = enumerate(fi)
    output5 = list(mr.runJob(reader, task4))

output5[:10]

[('Female', ('W 21 St & 6 Ave', 107)),
 ('Male', ('8 Ave & W 31 St', 488)),
 ('Unknown', ('Catherine St & Monroe St', 1))]

In [None]:
!python task4.py citibike.csv | head

python3: can't open file 'task4.py': [Errno 2] No such file or directory


## Task 5

MRJob is a convenient package for simplifying the execution of MapReduce jobs on clusters. However, MRJob jobs are meant to be launched as stand-alone scripts (e.g. `python mr_word_count.py book.txt`), so they don't run directly in a notebook. We're going to convert some of the MRJob examples into our notebook so that we can test our code before deploying it on Hadoop.

The two examples are available at:

https://mrjob.readthedocs.io/en/latest/guides/quickstart.html

https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html
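The quickstart's `mr_word_count.py` job, whose output appears below, emits `"chars"`, `"words"` and `"lines"` counts for each line and sums them per key in its reducer. As a minimal sketch of the conversion idea, here is the same mapper/reducer logic as plain Python functions driven by a small in-memory loop. `count_mapper`, `count_reducer` and `run_local` are hypothetical names, and the loop only imitates MRJob's map, group-by-key and reduce stages rather than using MRJob itself.

```python
from collections import defaultdict

def count_mapper(_, line):
    # Same three counters as the mrjob quickstart word-count job
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1

def count_reducer(key, values):
    # Sum all partial counts for one key
    yield key, sum(values)

def run_local(lines):
    """Hypothetical in-memory stand-in for MRJob: map, group by key, reduce."""
    groups = defaultdict(list)
    for i, line in enumerate(lines):
        for key, value in count_mapper(i, line):
            groups[key].append(value)
    return dict(pair for key, values in groups.items()
                for pair in count_reducer(key, values))

print(run_local(["hello world", "map reduce"]))
# → {'chars': 21, 'words': 4, 'lines': 2}
```

If you feed it lines read from a file, strip each one first (e.g. `line.rstrip('\n')`) if `"chars"` should exclude newline characters.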

In [None]:
!python mr_word_count.py book.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr_word_count.root.20210927.194209.620334
Running step 1 of 1...
job output is in /tmp/mr_word_count.root.20210927.194209.620334/output
Streaming final output from /tmp/mr_word_count.root.20210927.194209.620334/output...
"chars"	246432
"words"	38538
"lines"	5877
Removing temp directory /tmp/mr_word_count.root.20210927.194209.620334...


In [None]:
list(enumerate(next(open('citibike.csv', 'r')).strip().split(',')))

[(0, 'cartodb_id'),
 (1, 'the_geom'),
 (2, 'tripduration'),
 (3, 'starttime'),
 (4, 'stoptime'),
 (5, 'start_station_id'),
 (6, 'start_station_name'),
 (7, 'start_station_latitude'),
 (8, 'start_station_longitude'),
 (9, 'end_station_id'),
 (10, 'end_station_name'),
 (11, 'end_station_latitude'),
 (12, 'end_station_longitude'),
 (13, 'bikeid'),
 (14, 'usertype'),
 (15, 'birth_year'),
 (16, 'gender')]
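The mappers in this notebook index raw CSV fields by position (6 and 10 for the station names, 14 for `usertype`, 16 for `gender`). A small sketch of a name-to-index lookup built from the header row above can make such mappers easier to read. `COL` and `parse` are hypothetical helpers, and like the existing `line.split(',')` mappers they assume no field contains a quoted comma.

```python
# Header row of citibike.csv, as printed by `!head -n 2 citibike.csv` above
HEADER = ("cartodb_id,the_geom,tripduration,starttime,stoptime,start_station_id,"
          "start_station_name,start_station_latitude,start_station_longitude,"
          "end_station_id,end_station_name,end_station_latitude,end_station_longitude,"
          "bikeid,usertype,birth_year,gender")

# Map each column name to its position so mappers can avoid magic numbers
COL = {name: i for i, name in enumerate(HEADER.split(','))}

def parse(line):
    """Split one raw CSV line (assumes no quoted commas) into a list of fields."""
    return line.strip().split(',')

# First data row of citibike.csv, also from the `head` output above
sample_line = ("1,,801,2015-02-01 00:00:00+00,2015-02-01 00:14:00+00,521,8 Ave & W 31 St,"
               "40.75044999,-73.99481051,423,W 54 St & 9 Ave,40.76584941,-73.98690506,"
               "17131,Subscriber,1978,2")
row = parse(sample_line)
print(row[COL['start_station_name']], row[COL['end_station_name']], row[COL['gender']])
# → 8 Ave & W 31 St W 54 St & 9 Ave 2
```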

In [None]:
from mrjob.job import MRJob

class MRTask1(MRJob):

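  def mapper(self, _, line):
    record = line.strip().split(',')
    # skip the CSV header row (its first field is 'cartodb_id')
    if record[0] == 'cartodb_id':
      return
    # each trip counts toward both its start (col 6) and end (col 10) station
    yield (record[6], 1)
    yield (record[10], 1)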

  def reducer(self, station, counts):
    yield (station,sum(counts))

task1 = MRTask1(args=[])

with open('citibike.csv', 'r') as fi:
  reader = enumerate(fi)
  output1 = list(mr.runJob(reader, task1))

output1[:10]

ModuleNotFoundError: ignored

In [None]:
!python task1.py citibike.csv | head

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/task1.root.20210927.200913.245600
Running step 1 of 1...
job output is in /tmp/task1.root.20210927.200913.245600/output
Streaming final output from /tmp/task1.root.20210927.200913.245600/output...
"1 Ave & E 15 St"	795
"1 Ave & E 44 St"	219
"10 Ave & W 28 St"	422
"11 Ave & W 27 St"	354
"11 Ave & W 41 St"	461
"11 Ave & W 59 St"	242
"12 Ave & W 40 St"	217
"2 Ave & E 31 St"	588
"2 Ave & E 58 St"	125
"3 Ave & Schermerhorn St"	34
Removing temp directory /tmp/task1.root.20210927.200913.245600...
Traceback (most recent call last):
  File "task1.py", line 15, in <module>
    MRTask1.run()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 645, in run_job
    sel

## Task 6

Let's try to run the above MRJob examples as stand-alone applications. Please check again:
https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html#defining-steps
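The two-step `MRMostUsedWord` job below relies on a trick: the first reducer sends every `(count, word)` pair to the same key `None`, so the second step's single reducer sees all of them, and `max()` compares the tuples by count first. A minimal plain-Python sketch of that data flow is shown here, with hypothetical helper names `group_by_key` and `most_used_word`. It leaves out the combiner, which only pre-sums counts locally and does not change the final result.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")

def group_by_key(pairs):
    """Shuffle stage: collect values by key, as happens between MapReduce steps."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups.items()

def most_used_word(lines):
    # Step 1 mapper: emit (word, 1) for every lowercased word
    pairs = [(w.lower(), 1) for line in lines for w in WORD_RE.findall(line)]
    # Step 1 reducer: send every (count, word) pair to the single key None
    step1 = [(None, (sum(counts), word)) for word, counts in group_by_key(pairs)]
    # Step 2 reducer: the one group (key None) sees all pairs; max compares count first
    return [max(values) for _, values in group_by_key(step1)]

print(most_used_word(["The cat and the hat.", "The end"]))
# → [(3, 'the')]
```

Because tuples compare element by element, a tie on the count is broken in favor of the lexically larger word; for example, `most_used_word(["b a", "a b"])` gives `[(2, 'b')]`.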

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

  def mapper_get_words(self, _, line):
      # yield each word in the line
      for word in WORD_RE.findall(line):
          yield (word.lower(), 1)

  def combiner_count_words(self, word, counts):
      # sum the words we've seen so far
      yield (word, sum(counts))

  def reducer_count_words(self, word, counts):
      # send all (num_occurrences, word) pairs to the same reducer.
      # num_occurrences is so we can easily use Python's max() function.
      yield None, (sum(counts), word)

  # discard the key; it is just None
  def reducer_find_max_word(self, _, word_count_pairs):
      # each item of word_count_pairs is (count, word),
      # so yielding one results in key=counts, value=word
      yield max(word_count_pairs)

  def steps(self):
      return [
          MRStep(mapper=self.mapper_get_words,
                  combiner=self.combiner_count_words,
                  reducer=self.reducer_count_words),
          MRStep(reducer=self.reducer_find_max_word)
      ]

taskMostUsed = MRMostUsedWord(args=[])
mostUsed = mr.runJob(lines, taskMostUsed)
list(mostUsed)

[(419, 'the')]