In [1]:
%load_ext dockermagic

# MRJob

- https://github.com/Yelp/mrjob
- https://mrjob.readthedocs.io/en/stable/

## Local execution

In [2]:
%%bash

# local install
pip3 install mrjob



In [3]:
%mkdir mrjob

mkdir: mrjob: File exists


In [4]:
%%writefile mrjob/mrwordcount.py
import re
from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")

class MRWordCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            word = word.lower()
            yield word,1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()

Overwriting mrjob/mrwordcount.py


In [5]:
%%bash

wget -q -c http://www.gutenberg.org/files/996/996-0.txt -O mrjob/donquixote.txt

In [6]:
%%bash

cd mrjob

# inline runner (default)
# python3 mrwordcount.py donquixote.txt > donquixote-output.txt

# local runner
python3 mrwordcount.py -r local donquixote.txt > donquixote-output.txt

# head output
head donquixote-output.txt

"painted"	23
"painter"	11
"painters"	1
"painting"	8
"paints"	1
"pair"	45
"pairs"	3
"palace"	25
"palaces"	13
"palacios"	1


No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /var/folders/x8/88k3_7f167g6x76m6hjxpq880000gn/T/mrwordcount.tiagoferreto.20210111.004849.291701
Running step 1 of 1...
job output is in /var/folders/x8/88k3_7f167g6x76m6hjxpq880000gn/T/mrwordcount.tiagoferreto.20210111.004849.291701/output
Streaming final output from /var/folders/x8/88k3_7f167g6x76m6hjxpq880000gn/T/mrwordcount.tiagoferreto.20210111.004849.291701/output...
Removing temp directory /var/folders/x8/88k3_7f167g6x76m6hjxpq880000gn/T/mrwordcount.tiagoferreto.20210111.004849.291701...


## Cluster execution

### Setup

In [7]:
%%bash

# install in hadoop cluster
docker exec -u hadoop hadoop pip3 install mrjob
docker exec -u hadoop hadoop1 pip3 install mrjob
docker exec -u hadoop hadoop2 pip3 install mrjob
docker exec -u hadoop hadoop3 pip3 install mrjob

Collecting mrjob
  Downloading https://files.pythonhosted.org/packages/8e/58/fc28ab743aba16e90736ad4e29694bd2adaf7b879376ff149306d50c4e90/mrjob-0.7.4-py2.py3-none-any.whl (439kB)
Collecting PyYAML>=3.10 (from mrjob)
  Downloading https://files.pythonhosted.org/packages/64/c2/b80047c7ac2478f9501676c988a5411ed5572f35d1beff9cae07d321512c/PyYAML-5.3.1.tar.gz (269kB)
Building wheels for collected packages: PyYAML
  Running setup.py bdist_wheel for PyYAML: started
  Running setup.py bdist_wheel for PyYAML: finished with status 'done'
  Stored in directory: /home/hadoop/.cache/pip/wheels/a7/c1/ea/cf5bd31012e735dc1dfea3131a2d5eae7978b251083d6247bd
Successfully built PyYAML
Installing collected packages: PyYAML, mrjob
Successfully installed PyYAML-5.3.1 mrjob-0.7.4
Collecting mrjob
  Downloading https://files.pythonhosted.org/packages/8e/58/fc28ab743aba16e90736ad4e29694bd2adaf7b879376ff149306d50c4e90/mrjob-0.7.4-py2.py3-none-any.whl (439kB)
Collecting PyYAML>=3.10 (from mrjob)
  Downloading htt

In [8]:
%%bash

docker cp mrjob hadoop:/opt
docker exec hadoop chown -R hadoop:hadoop /opt/mrjob

In [9]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

cd /opt/mrjob

# create directory in HDFS and send file
hdfs dfs -mkdir donquixote
hdfs dfs -put donquixote.txt donquixote

# run in hadoop
python3 mrwordcount.py -r hadoop --output-dir donquixote-output hdfs:///user/hadoop/donquixote

2021-01-11 00:50:12,819 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/hadoop/bin...
Found hadoop binary: /opt/hadoop/bin/hadoop
Using Hadoop version 3.2.1
Looking for Hadoop streaming jar in /opt/hadoop...
Found Hadoop streaming jar: /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar
Creating temp directory /tmp/mrwordcount.hadoop.20210111.005013.668024
uploading working dir files to hdfs:///user/hadoop/tmp/mrjob/mrwordcount.hadoop.20210111.005013.668024/files/wd...
Copying other local files to hdfs:///user/hadoop/tmp/mrjob/mrwordcount.hadoop.20210111.005013.668024/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar6657991860716594815/] [] /tmp/streamjob7877810569761033444.jar tmpDir=null
  Connecting to ResourceManager at hadoop/172.17.0.2:8032
  Connecting to Applicat

In [10]:
%%dockerexec -u hadoop -w /opt/mrjob hadoop
source /opt/envvars.sh

# get output
hdfs dfs -getmerge donquixote-output donquixote-output.txt

# head output
head donquixote-output.txt

2021-01-11 00:51:50,311 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
"0"	2
"000"	1
"1"	45
"104k"	1
"105k"	1
"106k"	1
"1085"	1
"108k"	1
"109k"	2
"10k"	1


## MapReduce patterns

### Datasets

- weblogs.csv
- books from Gutenberg project
- departments.csv and employees.csv

### 1. Count

In [11]:
%%writefile mrjob/1_count_weblog.py
#Total number of times each page is visited
from mrjob.job import MRJob

class MRURLCount(MRJob):

    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()

        #Extract site
        url = request.split(' ')[1]

        #Emit url and 1
        yield url, 1

    def reducer(self, key, list_of_values):
        yield key,sum(list_of_values)

if __name__ == '__main__':
    MRURLCount.run()

Overwriting mrjob/1_count_weblog.py


In [12]:
%%bash

python3 mrjob/1_count_weblog.py datasets/weblog.csv 2> /dev/null | head

"/js/vendor/moment.min.js"	173
"/login.php"	3298
"/contestsubmit.php?id=43"	5
"/contestsubmit.php?id=44"	1
"/contestsubmit.php?id=45"	23
"/countdown.php"	1
"/countdown.php?name=Another%20Multiplication%20Game"	1
"/countdown.php?name=RUET%20OJ%20Server%20Testing%20Contest"	71
"/createadmin.php"	4
"/css/bootstrap.min.css"	404


### 2. Max value

In [13]:
%%writefile mrjob/2_max_weblog.py
# Return most visited URL
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRURLMax(MRJob) :

    def mapper1(self, _, line) :
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()

        #Extract site
        url = request.split(' ')[1]

        #Emit url and 1
        yield url, 1

    def reducer1(self, key, list_of_values) :
        yield None, (sum(list_of_values), key)

    def reducer2(self, key, list_of_values) :
        yield max(list_of_values)

    def steps(self) :
        return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
        MRStep(reducer=self.reducer2)]

if __name__ == '__main__' :
    MRURLMax.run()

Overwriting mrjob/2_max_weblog.py


In [14]:
%%bash

python3 mrjob/2_max_weblog.py datasets/weblog.csv 2> /dev/null

3298	"/login.php"


### 3. Average

In [15]:
%%writefile mrjob/3_average_weblog.py
#Average visit time
from mrjob.job import MRJob

class MRAvgVisitTime(MRJob):

    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = float(data[4].strip())

        #Extract site
        url = request.split(' ')[1]

        #Emit url and visit time
        yield url, visit

    def reducer(self, key, list_of_values):
        count = 0
        total = 0.0
        for x in list_of_values:
            total = total + x
            count = count + 1

        avglen = ("%.2f" % (total/count))
        yield key,avglen

if __name__ == '__main__':
    MRAvgVisitTime.run()

Overwriting mrjob/3_average_weblog.py


In [16]:
%%bash

python3 mrjob/3_average_weblog.py datasets/weblog.csv 2> /dev/null | head

"/js/vendor/moment.min.js"	"279.61"
"/login.php"	"252.70"
"/css/bootstrap.min.css.map"	"25.87"
"/css/font-awesome.min.css"	"256.18"
"/css/main.css"	"240.80"
"/css/normalize.css"	"228.38"
"/css/style.css"	"261.45"
"/dboot/js/bootstrap.min.js"	"428.69"
"/dcss/bootstrap-datetimepicker.min.css"	"103.50"
"/description.php"	"117.08"


### 4. Top N

In [17]:
%%writefile mrjob/4_topn_weblog.py
#Top 3 visited pages
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRTopN(MRJob):

    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()

        #Extract url
        url = request.split(' ')[1]

        #Emit url and 1
        yield url, 1

    def reducer1(self, key, list_of_values):
        total_count = sum(list_of_values)
        yield None, (total_count, key)

    def reducer2(self, _, list_of_values):
        N=3
        list_of_values = sorted(list(list_of_values), reverse=True)
        return list_of_values[:N]

    def steps(self):
        return [MRStep(mapper=self.mapper, reducer=self.reducer1),
        MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    MRTopN.run()

Overwriting mrjob/4_topn_weblog.py


In [18]:
%%bash

python3 mrjob/4_topn_weblog.py datasets/weblog.csv 2> /dev/null

3298	"/login.php"
2653	"/home.php"
1417	"/js/vendor/modernizr-2.8.3.min.js"


### 5. Filter

In [19]:
%%writefile mrjob/5_filter_weblog.py
#Filter accesses to "/login.php?value=fail" on Feb/2018
from mrjob.job import MRJob

class MRFilter(MRJob):

    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()

        #Extract site
        url = request.split(' ')[1]

        #Extract month/year
        date = time[4:12]

        #Filter access to "/login.php?value=fail" on Feb/2018
        if url == "/login.php?value=fail" and date == "Feb/2018" :
            yield url, (time, ip, visit)

if __name__ == '__main__':
    MRFilter.run()

Overwriting mrjob/5_filter_weblog.py


In [20]:
%%bash

python3 mrjob/5_filter_weblog.py datasets/weblog.csv 2> /dev/null

"/login.php?value=fail"	["[17/Feb/2018:20:08:56", "10.128.2.1", "185.3492762625749"]
"/login.php?value=fail"	["[17/Feb/2018:20:20:28", "10.130.2.1", "231.88065750818035"]
"/login.php?value=fail"	["[18/Feb/2018:19:40:31", "10.128.2.1", "119.2142303257265"]
"/login.php?value=fail"	["[18/Feb/2018:19:40:35", "10.128.2.1", "147.46020967961022"]
"/login.php?value=fail"	["[18/Feb/2018:19:40:38", "10.131.0.1", "257.82366658285235"]
"/login.php?value=fail"	["[18/Feb/2018:19:40:39", "10.131.0.1", "169.11496206962957"]
"/login.php?value=fail"	["[20/Feb/2018:11:14:24", "10.130.2.1", "330.90262403457416"]
"/login.php?value=fail"	["[22/Feb/2018:06:26:02", "10.131.0.1", "766.0191400216718"]
"/login.php?value=fail"	["[22/Feb/2018:06:26:08", "10.128.2.1", "34.905177651134075"]
"/login.php?value=fail"	["[22/Feb/2018:06:26:10", "10.128.2.1", "49.17624847339196"]
"/login.php?value=fail"	["[22/Feb/2018:06:26:11", "10.128.2.1", "268.1915691580601"]
"/login.php?value=fail"	["[22/Feb/2018:06:26:33", "10.130.2

### 6. Distinct

In [21]:
%%writefile mrjob/6_distinct_weblog.py
#Distinct IPs
from mrjob.job import MRJob

class MRDistinct(MRJob):

    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()

        yield ip, None

    def reducer(self, key, list_of_values) :
        yield key, None

if __name__ == '__main__':
    MRDistinct.run()

Overwriting mrjob/6_distinct_weblog.py


In [22]:
%%bash

python3 mrjob/6_distinct_weblog.py datasets/weblog.csv 2> /dev/null

"10.131.0.1"	null
"10.131.2.1"	null
"10.129.2.1"	null
"10.130.2.1"	null
"10.128.2.1"	null


### 7. Binning

In [41]:
%%writefile mrjob/7_binning_weblog.py
#Create bins for different status codes for 20/Feb/2018
from mrjob.job import MRJob

class MRBinning(MRJob):

    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()
        
        #Extract month/year
        date = time[1:12]

        #Filter accesses on 20/Feb/2018
        if date == "20/Feb/2018" :
            yield status, (time, request, ip)

    def reducer(self, key, list_of_values):
        yield key, (list(list_of_values))

if __name__ == '__main__':
    MRBinning.run()

Overwriting mrjob/7_binning_weblog.py


In [42]:
%%bash

python3 mrjob/7_binning_weblog.py datasets/weblog.csv 2> /dev/null

"304"	[["[20/Feb/2018:09:23:18", "GET /css/bootstrap.min.css HTTP/1.1", "10.128.2.1"], ["[20/Feb/2018:09:23:18", "GET /css/font-awesome.min.css HTTP/1.1", "10.128.2.1"], ["[20/Feb/2018:09:23:18", "GET /css/normalize.css HTTP/1.1", "10.128.2.1"], ["[20/Feb/2018:09:23:18", "GET /css/main.css HTTP/1.1", "10.128.2.1"], ["[20/Feb/2018:09:23:18", "GET /css/style.css HTTP/1.1", "10.128.2.1"], ["[20/Feb/2018:09:23:18", "GET /js/vendor/modernizr-2.8.3.min.js HTTP/1.1", "10.131.0.1"], ["[20/Feb/2018:09:23:18", "GET /js/vendor/jquery-1.12.0.min.js HTTP/1.1", "10.128.2.1"], ["[20/Feb/2018:09:23:19", "GET /bootstrap-3.3.7/js/bootstrap.min.js HTTP/1.1", "10.130.2.1"], ["[20/Feb/2018:09:23:31", "GET /js/vendor/moment.min.js HTTP/1.1", "10.128.2.1"]]
"404"	[["[20/Feb/2018:01:48:55", "GET /robots.txt HTTP/1.1", "10.130.2.1"], ["[20/Feb/2018:13:58:09", "GET /robots.txt HTTP/1.1", "10.130.2.1"], ["[20/Feb/2018:16:08:45", "GET /robots.txt HTTP/1.1", "10.130.2.1"]]
"302"	[["[20/Feb/2018:01:50:40", "GET / H

### 8. Inverted index

In [43]:
%%writefile mrjob/8_invertedindex_books.py
#Inverted Index
from mrjob.job import MRJob
import os

class MRInvertedIndex(MRJob):

    def mapper(self, _, line):
        fileName = os.environ['mapreduce_map_input_file']

        words = line.split()
        for word in words:
            yield word, fileName

    def reducer(self, key, list_of_values):
        docs = set()
        for x in list_of_values :
            docs.add(x)
        yield key,list(docs)

if __name__ == '__main__':
    MRInvertedIndex.run()

Overwriting mrjob/8_invertedindex_books.py


In [45]:
%%bash

python3 mrjob/8_invertedindex_books.py datasets/books 2> /dev/null | head -n 40

"one's"	["file://datasets/books/book3.txt"]
"one),"	["file://datasets/books/book2.txt"]
"one,"	["file://datasets/books/book1.txt", "file://datasets/books/book3.txt", "file://datasets/books/book2.txt"]
"one,\""	["file://datasets/books/book3.txt"]
"one,\u201d"	["file://datasets/books/book1.txt", "file://datasets/books/book2.txt"]
"one--and"	["file://datasets/books/book3.txt"]
"one--tell"	["file://datasets/books/book3.txt"]
"one--the"	["file://datasets/books/book1.txt", "file://datasets/books/book3.txt", "file://datasets/books/book2.txt"]
"one-and-twenty.\u201d"	["file://datasets/books/book1.txt"]
"one-armed"	["file://datasets/books/book2.txt"]
"one-half"	["file://datasets/books/book2.txt"]
"one-handed"	["file://datasets/books/book2.txt"]
"one-legged"	["file://datasets/books/book2.txt"]
"one-sided"	["file://datasets/books/book2.txt"]
"one-third"	["file://datasets/books/book2.txt"]
"one."	["file://datasets/books/book1.txt", "file://datasets/books/book3.txt", "file://datasets/books/book2.tx

### 9. Sort

In [49]:
%%writefile mrjob/9_sort_weblog.py
# Sort visit times in descending order
from mrjob.job import MRJob
class MRSortVisit(MRJob) :
    def mapper(self, _, line):
        #Split the line with comma separated fields
        data = line.split(',')

        #Parse line
        ip = data[0].strip()
        #Check if it's not the header line
        if ip == 'IP' : return
        time = data[1].strip()
        request = data[2].strip()
        status = data[3].strip()
        visit = data[4].strip()

        #Extract site
        url = request.split(' ')[1]

        yield None, (visit, (time, url, ip))

    def reducer(self, key, list_of_values):
        l = [(float(v), content) for v, content in list_of_values]
        l.sort(reverse=True)
        return l

if __name__ == '__main__':
    MRSortVisit.run()

Overwriting mrjob/9_sort_weblog.py


In [50]:
%%bash

python3 mrjob/9_sort_weblog.py datasets/weblog.csv 2> /dev/null | head -n 20

8742.057189992775	["[29/Jan/2018:20:33:45", "/login.php", "10.128.2.1"]
6287.629575490433	["[29/Jan/2018:20:55:42", "/login.php", "10.128.2.1"]
5711.326270236046	["[29/Jan/2018:20:35:56", "/js/vendor/modernizr-2.8.3.min.js", "10.128.2.1"]
5283.59276598012	["[09/Nov/2017:19:53:43", "/js/vendor/modernizr-2.8.3.min.js", "10.131.0.1"]
5123.493680523937	["[29/Jan/2018:20:47:45", "/js/vendor/modernizr-2.8.3.min.js", "10.128.2.1"]
5053.873518356491	["[13/Nov/2017:09:11:14", "/login.php", "10.131.2.1"]
4642.003763780086	["[29/Jan/2018:20:29:30", "/js/vendor/modernizr-2.8.3.min.js", "10.131.0.1"]
4615.5689039456365	["[29/Jan/2018:20:32:40", "/login.php", "10.128.2.1"]
4421.817090530212	["[29/Jan/2018:20:35:37", "/js/vendor/modernizr-2.8.3.min.js", "10.131.0.1"]
3857.8085677731	["[24/Nov/2017:08:29:49", "/css/font-awesome.min.css", "10.131.2.1"]
3725.3713482396056	["[29/Jan/2018:20:34:22", "/login.php", "10.131.0.1"]
3667.904220839362	["[12/Nov/2017:19:44:01", "/archive.php", "10.130.2.1"]
3570.

### 10. Joins

#### InnerJoin

In [53]:
%%writefile mrjob/10_innerjoin_db.py
from mrjob.job import MRJob
import os

class MRInnerJoin(MRJob) :
    def mapper(self, _, line):
        data = line.split(',')

        filename = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in filename :
            dep_no = data[2]
            yield dep_no, ('Employee', data)
        elif 'departments.csv' in filename:
            dep_no = data[0]
            yield dep_no, ('Department', data)

    def reducer(self, key, list_of_values) :
        values = list(list_of_values)
        employees = []
        departments = []
        for v in values:
            if v[0] == 'Employee' :
                employees.append(v)
            elif v[0] == 'Department' :
                departments.append(v)

        # Inner Join
        for e in employees :
            for d in departments :
                yield key, (e+d)

if __name__ == '__main__' :
    MRInnerJoin.run()

Overwriting mrjob/10_innerjoin_db.py


In [54]:
%%bash

python3 mrjob/10_innerjoin_db.py datasets/employees.csv datasets/departments.csv 2> /dev/null | head

"d005"	["Employee", ["10001", "Georgi Facello", "d005", "1986-06-26"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10006", "Anneke Preusig", "d005", "1989-06-02"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10008", "Saniya Kalloufi", "d005", "1994-09-15"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10012", "Patricio Bridgland", "d005", "1992-12-18"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10014", "Berni Genin", "d005", "1987-03-11"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10018", "Kazuhide Peha", "d005", "1987-04-03"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10021", "Ramzi Erde", "d005", "1988-02-10"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10022", "Shahaf Famili", "d005", "1995-08-22"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10023", "Bojan Montemayor", "d005", "1989-12-17"], "Department", ["d005", "Development"]]
"d005"	["Emp

#### LeftOuterJoin

In [55]:
%%writefile mrjob/11_leftouterjoin_db.py
from mrjob.job import MRJob
import os

class MRLeftOuterJoin(MRJob) :
    def mapper(self, _, line):
        data = line.split(',')

        filename = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in filename :
            dep_no = data[2]
            yield dep_no, ('Employee', data)
        elif 'departments.csv' in filename:
            dep_no = data[0]
            yield dep_no, ('Department', data)

    def reducer(self, key, list_of_values) :
        # yield None, list(list_of_values)
        values = list(list_of_values)
        employees = []
        departments = []
        for v in values:
            if v[0] == 'Employee' :
                employees.append(v)
            elif v[0] == 'Department' :
                departments.append(v)

        # Left Outer Join
        for e in employees :
            if len(departments) > 0 :
                for d in departments :
                    yield key, (e+d)
            else :
                yield key, (e)

if __name__ == '__main__' :
    MRLeftOuterJoin.run()

Overwriting mrjob/11_leftouterjoin_db.py


In [56]:
%%bash

python3 mrjob/11_leftouterjoin_db.py datasets/employees.csv datasets/departments.csv 2> /dev/null | head

"d005"	["Employee", ["10001", "Georgi Facello", "d005", "1986-06-26"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10006", "Anneke Preusig", "d005", "1989-06-02"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10008", "Saniya Kalloufi", "d005", "1994-09-15"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10012", "Patricio Bridgland", "d005", "1992-12-18"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10014", "Berni Genin", "d005", "1987-03-11"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10018", "Kazuhide Peha", "d005", "1987-04-03"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10021", "Ramzi Erde", "d005", "1988-02-10"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10022", "Shahaf Famili", "d005", "1995-08-22"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10023", "Bojan Montemayor", "d005", "1989-12-17"], "Department", ["d005", "Development"]]
"d005"	["Emp

#### RightOuterJoin

In [57]:
%%writefile mrjob/12_rightouterjoin_db.py
from mrjob.job import MRJob
import os

class MRRightOuterJoin(MRJob) :
    def mapper(self, _, line):
        data = line.split(',')

        filename = os.environ['mapreduce_map_input_file']
        
        if 'employees.csv' in filename :
            dep_no = data[2]
            yield dep_no, ('Employee', data)
        elif 'departments.csv' in filename:
            dep_no = data[0]
            yield dep_no, ('Department', data)

    def reducer(self, key, list_of_values) :
        # yield None, list(list_of_values)
        values = list(list_of_values)
        employees = []
        departments = []
        for v in values:
            if v[0] == 'Employee' :
                employees.append(v)
            elif v[0] == 'Department' :
                departments.append(v)

        # Right Outer Join
        for d in departments :
            if len(employees) > 0 :
                for e in employees :
                    yield key, (e+d)
            else :
                yield key, (d)

if __name__ == '__main__' :
    MRRightOuterJoin.run()

Overwriting mrjob/12_rightouterjoin_db.py


In [58]:
%%bash

python3 mrjob/12_rightouterjoin_db.py datasets/employees.csv datasets/departments.csv 2> /dev/null | head

"d005"	["Employee", ["10001", "Georgi Facello", "d005", "1986-06-26"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10006", "Anneke Preusig", "d005", "1989-06-02"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10008", "Saniya Kalloufi", "d005", "1994-09-15"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10012", "Patricio Bridgland", "d005", "1992-12-18"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10014", "Berni Genin", "d005", "1987-03-11"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10018", "Kazuhide Peha", "d005", "1987-04-03"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10021", "Ramzi Erde", "d005", "1988-02-10"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10022", "Shahaf Famili", "d005", "1995-08-22"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10023", "Bojan Montemayor", "d005", "1989-12-17"], "Department", ["d005", "Development"]]
"d005"	["Emp

#### FullOuterJoin

In [59]:
%%writefile mrjob/13_fullouterjoin_db.py
from mrjob.job import MRJob
import os

class MRFullOuterJoin(MRJob) :
    def mapper(self, _, line):
        data = line.split(',')

        filename = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in filename :
            dep_no = data[2]
            yield dep_no, ('Employee', data)
        elif 'departments.csv' in filename:
            dep_no = data[0]
            yield dep_no, ('Department', data)

    def reducer(self, key, list_of_values) :
        values = list(list_of_values)
        employees = []
        departments = []
        for v in values:
            if v[0] == 'Employee' :
                employees.append(v)
            elif v[0] == 'Department' :
                departments.append(v)

        # Full Outer Join
        if len(employees) > 0 :
            for e in employees :
                if len(departments) > 0 :
                    for d in departments :
                        yield key, (e+d)
                else :
                    yield key, (e)
        else :
            yield None, (d)

if __name__ == '__main__' :
    MRFullOuterJoin.run()

Overwriting mrjob/13_fullouterjoin_db.py


In [60]:
%%bash

python3 mrjob/13_fullouterjoin_db.py datasets/employees.csv datasets/departments.csv 2> /dev/null | head

"d005"	["Employee", ["10001", "Georgi Facello", "d005", "1986-06-26"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10006", "Anneke Preusig", "d005", "1989-06-02"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10008", "Saniya Kalloufi", "d005", "1994-09-15"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10012", "Patricio Bridgland", "d005", "1992-12-18"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10014", "Berni Genin", "d005", "1987-03-11"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10018", "Kazuhide Peha", "d005", "1987-04-03"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10021", "Ramzi Erde", "d005", "1988-02-10"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10022", "Shahaf Famili", "d005", "1995-08-22"], "Department", ["d005", "Development"]]
"d005"	["Employee", ["10023", "Bojan Montemayor", "d005", "1989-12-17"], "Department", ["d005", "Development"]]
"d005"	["Emp