Map-Reduce Using MRJob
======================

Sales Data
----------

Here is the sales data we are going to analyze.

In [35]:
%%writefile sales.txt
#ID    Date           Store   State  Product    Amount
101    11/13/2014     100     WA     331        300.00
104    11/18/2014     700     OR     329        450.00
102    11/15/2014     203     CA     321        200.00
106    11/19/2014     202     CA     331        330.00
103    11/17/2014     101     WA     373        750.00
105    11/19/2014     202     CA     321        200.00

Overwriting sales.txt


Transactions By State
---------------------

Q: How many transactions were there for each state?

- Create the `SaleCount.py` file.

In [36]:
%%writefile SaleCount.py
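from mrjob.job import MRJob


class SaleCount(MRJob):
    def mapper(self, _, line):
        # Skip the header line, which starts with '#'.
        if line.startswith('#'):
            return
        # Columns: ID, Date, Store, State, Product, Amount.
        fields = line.split()
        state = fields[3]
        # Emit one count per transaction, keyed by state.
        yield state, 1

    def reducer(self, state, counts):
        # counts iterates over every 1 emitted for this state.
        yield state, sum(counts)


if __name__ == '__main__':
    SaleCount.run()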

Overwriting SaleCount.py


- Run it locally.

In [37]:
!python SaleCount.py sales.txt > output.txt

No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/SaleCount.srikanajan.20160912.041155.380633
Running step 1 of 1...
Streaming final output from /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/SaleCount.srikanajan.20160912.041155.380633/output...
Removing temp directory /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/SaleCount.srikanajan.20160912.041155.380633...


- Check the output.

In [38]:
!cat output.txt

"CA"	3
"OR"	1
"WA"	2


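Transactions By Store
---------------------

Q: Suppose we want to count transactions by store instead of by state. What should we change in the code above?

- Only the key changes: the mapper reads the store from `fields[2]` instead of the state from `fields[3]`.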

In [39]:
%%writefile SaleCountByStore.py
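from mrjob.job import MRJob


class SaleCountByStore(MRJob):
    def mapper(self, _, line):
        # Skip the header line, which starts with '#'.
        if line.startswith('#'):
            return
        # Columns: ID, Date, Store, State, Product, Amount.
        fields = line.split()
        store = fields[2]
        # Emit one count per transaction, keyed by store.
        yield store, 1

    def reducer(self, store, counts):
        # counts iterates over every 1 emitted for this store.
        yield store, sum(counts)


if __name__ == '__main__':
    SaleCountByStore.run()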

Overwriting SaleCountByStore.py


In [40]:
!python SaleCountByStore.py sales.txt > output.txt
!cat output.txt

No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/SaleCountByStore.srikanajan.20160912.041159.380608
Running step 1 of 1...
Streaming final output from /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/SaleCountByStore.srikanajan.20160912.041159.380608/output...
Removing temp directory /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/SaleCountByStore.srikanajan.20160912.041159.380608...
"100"	1
"101"	1
"202"	2
"203"	1
"700"	1
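Since the two jobs differ only in which column becomes the key, the key column can be treated as a parameter. The sketch below shows that idea in plain Python with `collections.Counter`, not in mrjob. The function name `count_by_column` and the constants `STORE` and `STATE` are hypothetical names introduced for this example.

```python
from collections import Counter

# Zero-based column positions in sales.txt (assumed names for this sketch).
STORE, STATE = 2, 3

ROWS = [
    "#ID    Date           Store   State  Product    Amount",
    "101    11/13/2014     100     WA     331        300.00",
    "104    11/18/2014     700     OR     329        450.00",
    "102    11/15/2014     203     CA     321        200.00",
    "106    11/19/2014     202     CA     331        330.00",
    "103    11/17/2014     101     WA     373        750.00",
    "105    11/19/2014     202     CA     321        200.00",
]


def count_by_column(lines, column):
    """Count data lines grouped by the value in the given column."""
    counts = Counter()
    for line in lines:
        if line.startswith('#'):
            continue  # skip the header line
        counts[line.split()[column]] += 1
    return counts


by_state = count_by_column(ROWS, STATE)
by_store = count_by_column(ROWS, STORE)

for store in sorted(by_store):
    print(store, by_store[store])
```

Switching from `STATE` to `STORE` is the only difference between the two calls, which mirrors the one-line change between `SaleCount.py` and `SaleCountByStore.py`.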


Word Count
----------

- Create the `WordCount.py` file.

In [41]:
%%writefile WordCount.py
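from mrjob.job import MRJob
import re

# A word is a run of letters, digits, underscores, or apostrophes.
WORD_RE = re.compile(r"[\w']+")


class WordCount(MRJob):
    def mapper(self, _, line):
        # Emit (word, 1) for every word, lowercased so case is ignored.
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        # Add up the 1s emitted for this word.
        yield word, sum(counts)


if __name__ == '__main__':
    WordCount.run()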

Overwriting WordCount.py


- Run it locally.

In [42]:
!python WordCount.py input.txt > output.txt

No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/WordCount.srikanajan.20160912.041200.986804
Running step 1 of 1...
Streaming final output from /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/WordCount.srikanajan.20160912.041200.986804/output...
Removing temp directory /var/folders/sj/gwpxmb714733ysxdwp8209wh0000gn/T/WordCount.srikanajan.20160912.041200.986804...


- Check the output.

In [43]:
!cat output.txt

"again"	1
"hello"	2
"is"	2
"line"	2
"second"	1
"the"	2
"third"	1
"this"	2
"world"	1
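To see how the mapper splits a line into words, the sketch below applies the same `WORD_RE` pattern to a single string outside of mrjob. The contents of `input.txt` are not shown in this notebook, so the sample sentence here is made up for illustration.

```python
import re
from collections import Counter

# Same pattern as in WordCount.py.
WORD_RE = re.compile(r"[\w']+")

# Hypothetical sample line; it is not taken from input.txt.
sample = "Hello, world! Don't say hello again."

# Punctuation is dropped, apostrophes are kept, and case is folded.
words = [word.lower() for word in WORD_RE.findall(sample)]
print(words)

# Counter plays the role of the reducer for this single line.
counts = Counter(words)
print(counts["hello"])
```

Lowercasing in the mapper is what lets "Hello" and "hello" land on the same key.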


Map-Only Job Observations
-------------------------

- Map-only jobs are the multi-machine equivalent of the
  multi-threading and multi-processing exercises we did earlier.

- Like our multi-threading and multi-processing applications, map-only
  jobs split a large problem into smaller chunks, and each mapper
  works on its own chunk independently of the others.

- Whenever a problem does not require reconciling or consolidating
  records across chunks, we should use a map-only job.

- Map-only jobs are usually much faster than regular Map-Reduce jobs
  because they skip the shuffle, sort, and reduce phases.
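
As an illustration of the map-only pattern, here is a plain-Python sketch (not mrjob) that filters the sales records chunk by chunk with no reduce step. The way the rows are split into `CHUNKS`, the function name `map_chunk`, and the choice to keep only CA transactions are all assumptions made for this example.

```python
# The sales rows, pre-split into three chunks as a cluster might split them.
CHUNKS = [
    ["101 11/13/2014 100 WA 331 300.00",
     "104 11/18/2014 700 OR 329 450.00"],
    ["102 11/15/2014 203 CA 321 200.00",
     "106 11/19/2014 202 CA 331 330.00"],
    ["103 11/17/2014 101 WA 373 750.00",
     "105 11/19/2014 202 CA 321 200.00"],
]


def map_chunk(lines, state="CA"):
    """Return the lines of one chunk whose State column matches."""
    return [line for line in lines if line.split()[3] == state]


# Each chunk could run on a different machine; here we simply loop.
outputs = [map_chunk(chunk) for chunk in CHUNKS]

# With no reducer, the job output is the concatenated mapper outputs.
kept = [line for output in outputs for line in output]
for line in kept:
    print(line)
```

No chunk needs to see another chunk's records, which is why nothing has to be shuffled or reduced. In mrjob, the corresponding job would define a `mapper` method and no `reducer`.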
