## MapReduce Introduction

mrjob is a Python package that simplifies writing MapReduce jobs that can be tested locally and run on a cluster. The logic for a job lives in a single class, and mrjob handles the boilerplate so you can focus on the mapper and reducer themselves.
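
Before using mrjob, it can help to see the model it automates. The block below is a minimal in-memory sketch of the map, shuffle, and reduce phases; `run_mapreduce`, `length_mapper`, and `sum_reducer` are hypothetical names invented for this illustration and are not part of mrjob.

```python
from collections import defaultdict


def run_mapreduce(records, mapper, reducer):
    """Tiny in-memory stand-in for a MapReduce run (not mrjob itself)."""
    groups = defaultdict(list)
    # Map phase: each input record may emit any number of (key, value) pairs.
    for record in records:
        for key, value in mapper(record):
            # Shuffle phase: collect the values that share a key.
            groups[key].append(value)
    # Reduce phase: one reducer call per key, visited in sorted key order.
    output = []
    for key in sorted(groups):
        output.extend(reducer(key, groups[key]))
    return output


def length_mapper(line):
    yield "chars", len(line)
    yield "lines", 1


def sum_reducer(key, values):
    yield key, sum(values)


result = run_mapreduce(["hello world", "map reduce"], length_mapper, sum_reducer)
print(result)  # [('chars', 21), ('lines', 2)]
```

A real framework distributes the map and reduce calls across processes or machines; the grouping by key in the middle is what lets each reducer work independently.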

In [1]:
!pip install mrjob

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
Installing collected packages: mrjob
Successfully installed mrjob-0.7.4


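Here, we grab a text file to use as input to our word count programs. The first cell installs `pdftotext` and its system dependencies; the second downloads an earnings-call transcript PDF and converts it to `META.txt`.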

In [2]:
!apt-get update
!apt install build-essential libpoppler-cpp-dev pkg-config python3-dev
!pip install pdftotext

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:7 htt

In [6]:
import pdftotext
!wget https://s21.q4cdn.com/399680738/files/doc_financials/2024/q4/META-Q4-2024-Earnings-Call-Transcript.pdf
with open("/content/META-Q4-2024-Earnings-Call-Transcript.pdf", "rb") as f:
    pdf = pdftotext.PDF(f)

# Save all text to a txt file.
with open('META.txt', 'w') as f:
    f.write("\n\n".join(pdf))

--2025-03-25 23:04:49--  https://s21.q4cdn.com/399680738/files/doc_financials/2024/q4/META-Q4-2024-Earnings-Call-Transcript.pdf
Resolving s21.q4cdn.com (s21.q4cdn.com)... 68.70.205.4, 68.70.205.3, 68.70.205.2, ...
Connecting to s21.q4cdn.com (s21.q4cdn.com)|68.70.205.4|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 127873 (125K) [application/pdf]
Saving to: ‘META-Q4-2024-Earnings-Call-Transcript.pdf.3’


2025-03-25 23:04:49 (3.82 MB/s) - ‘META-Q4-2024-Earnings-Call-Transcript.pdf.3’ saved [127873/127873]



### Basic Word Statistics

The following MapReduce job reads the input and counts its characters, words, and lines.

In [9]:
%%file word_stat_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job

from mrjob.job import MRJob


class MRWordStatCount(MRJob):

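    # The mapper is called once per input line. For each line it
    # yields three (key, value) pairs: the character count, the
    # word count, and a 1 that marks the line itself.
    def mapper(self, _, line):
        yield "chars", len(line)
        # Splitting on a single space keeps empty strings, so runs of
        # spaces and blank lines inflate this count.
        yield "words", len(line.split(" "))
        yield "lines", 1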

    '''
    "lines", 1
    "lines", 1
    "lines", 1
    "chars", 30
    "words", 10
    "chars", 20
    "words", 8
    "chars", 60
    "words", 20

    '''

    # All the pairs with the same key (chars, words or lines)
    # are passed to the same reducer call. The reducer aggregates
    # them and yields a single pair: the key and the sum of all
    # the values passed in.
    def reducer(self, key, values):
        yield key, sum(values)

    '''
    "lines", 3
    "chars", 110
    "words", 38
    '''

# The lines below hand command-line argument parsing and execution over to mrjob. Without them, the job will not run.
if __name__ == '__main__':
    MRWordStatCount.run()


Overwriting word_stat_count.py


In [10]:
!python word_stat_count.py -r local *.txt --output-dir=word_stat_count_out --no-output


No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/word_stat_count.root.20250325.230647.670520
Running step 1 of 1...
job output is in word_stat_count_out
Removing temp directory /tmp/word_stat_count.root.20250325.230647.670520...


In [11]:
!cat word_stat_count_out/part-0000*


"chars"	52948
"lines"	865
"words"	9467
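
One way to sanity-check these totals is to recompute them without mrjob. The sketch below mirrors the mapper's arithmetic in plain Python; `word_stats` is a hypothetical helper written for this illustration. It also shows why the `"words"` figure can run high: `split(" ")` keeps the empty strings produced by repeated spaces and blank lines, while `split()` with no argument does not.

```python
def word_stats(lines):
    """Recompute the three totals without mrjob, mirroring the mapper above."""
    totals = {"chars": 0, "words": 0, "lines": 0}
    for line in lines:
        totals["chars"] += len(line)
        totals["words"] += len(line.split(" "))
        totals["lines"] += 1
    return totals


# Three sample lines: a normal one, one with a double space, and a blank one.
sample = ["thank you", "good  morning", ""]
stats = word_stats(sample)
print(stats)  # {'chars': 22, 'words': 6, 'lines': 3}

# Whitespace-aware alternative: split() drops the empty strings.
strict_words = sum(len(line.split()) for line in sample)
print(strict_words)  # 4
```

To compare against the job output, the same function could be fed the lines of `META.txt` with trailing newlines stripped, on the assumption that mrjob passes each line to the mapper without its newline.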


### Word Count

In [12]:
%%file word_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1
    '''
    "thank", 1
    "you", 1
    "good", 1
    "morning", 1
    "operator", 1
    "thank", 1
    '''
    # optional combiner job
    #def combiner(self, word, counts):
    #    yield word, sum(counts)

    def reducer(self, key, values):
        yield key, sum(values)

    '''
    "thank", 2
    "you", 20
    "good", 50
    "morning", 1
    "operator", 10
    '''

if __name__ == '__main__':
    MRWordCount.run()


Writing word_count.py
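
The combiner that is commented out in the job above is an optimization: it sums counts on the mapper side so that fewer pairs have to be shuffled to the reducers. The sketch below illustrates the effect in plain Python; `map_words` and `combine` are hypothetical stand-ins for this illustration, not mrjob functions, and the tokenizer is simplified to `split()`.

```python
from collections import Counter


def map_words(line):
    # Emit one (word, 1) pair per token, like the mapper in the job above.
    return [(word.lower(), 1) for word in line.split()]


def combine(pairs):
    """Pre-sum the counts emitted by a single mapper before the shuffle."""
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return sorted(local.items())


mapper_output = map_words("thank you thank you operator")
combined = combine(mapper_output)
print(len(mapper_output), len(combined))  # 5 3
print(combined)  # [('operator', 1), ('thank', 2), ('you', 2)]
```

Because addition is associative and commutative, the reducer produces the same totals whether or not the combiner runs, which is why the same `sum` logic can serve as both.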


In [13]:
!python word_count.py -r local *.txt --output-dir=word_count_out --no-output

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/word_count.root.20250325.230734.134832
Running step 1 of 1...
job output is in word_count_out
Removing temp directory /tmp/word_count.root.20250325.230734.134832...


In [14]:
!head -50 word_count_out/part-0000*

==> word_count_out/part-00000 <==
"0"	4
"000"	1
"000x"	1
"02"	1
"1"	9
"10"	5
"100"	1
"11"	2
"114"	1
"119"	1
"12"	3
"13"	3
"14"	3
"15"	3
"16"	2
"17"	1
"18"	3
"19"	2
"1gw"	1
"2"	2
"20"	2
"2024"	8
"2025"	24
"21"	3
"22"	1
"23"	2
"25"	5
"26"	2
"27"	2
"28"	2
"29th"	1
"2gw"	1
"3"	12
"320"	1
"39"	1
"4"	14
"41"	1
"46"	1
"47"	1
"48"	3
"5"	7
"519"	1
"55"	1
"55b"	1
"6"	4
"60"	3
"600"	1
"65"	2
"67"	1
"7"	1

==> word_count_out/part-00001 <==
"each"	3
"earlier"	2
"early"	7
"earnings"	4
"easier"	2
"easy"	2
"ecosystem"	1
"editing"	1
"edits"	1
"educational"	1
"effect"	4
"effective"	4
"effectiveness"	1
"efficiencies"	2
"efficiency"	7
"efficient"	4
"efficiently"	2
"efforts"	4
"either"	2
"electronics"	1
"emerged"	1
"emotional"	1
"employee"	6
"employees"	1
"enable"	2
"enabled"	1
"enabling"	2
"end"	5
"ended"	2
"ending"	1
"energies"	1
"engage"	3
"engagement"	9
"engaging"	1
"engineer"	3
"engineering"	2
"engineers"	3
"enhance"	1
"enhancements"	1
"enjoy"	1
"ensure"	3
"entering"	1
"entertainment"	1
"entirely"	1
"

### Multi-step Job / Most Frequent Word

**SQL equivalent:**
```
with word_count as (
    select
       count(*) as count, word
    from books
    group by
       word)

select
   word, count
from word_count
order by count desc
limit 1
```
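
The same two-stage logic can be sketched in plain Python before turning to mrjob. This is only an illustration of the query above, not the job itself: `most_frequent_word` is a hypothetical helper, and the regular expression is the one the jobs in this notebook use.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")


def most_frequent_word(lines):
    # Stage 1 (GROUP BY word): count occurrences of each lowercased word.
    counts = Counter()
    for line in lines:
        counts.update(word.lower() for word in WORD_RE.findall(line))
    # Stage 2 (ORDER BY count DESC LIMIT 1): take the largest (count, word) pair.
    return max((count, word) for word, count in counts.items())


top = most_frequent_word(["Thank you", "thank you all", "Thank the operator"])
print(top)  # (3, 'thank')
```

Putting the count first in each pair means `max` compares counts before words, which is the same ordering trick the MapReduce version relies on in its second step.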

In [15]:
%%file word_max_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMaxWordCount(MRJob):

    # These three functions make up the first step of the
    # MapReduce job, which counts how many times each word
    # appears.
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1
    '''
    "thank", 1
    "you", 1
    "thank", 1
    .......
    '''

    def combiner(self, word, counts):
        yield word, sum(counts)

    '''
    "thank", 2
    "you", 1
    ......
    '''

    def reducer(self, word, counts):
        yield None, (sum(counts), word)

    '''
    None, (2, "thank")
    None, (1, "you")
    None, (5, "meta")
    None, (2, "facebook")
    '''

    # This is the second step of the job. It has only a reducer
    # (you can think of the mapper and combiner as identity
    # functions). Every pair from step 1 shares the key None,
    # so they all arrive in a single reducer call.
    def reducer_max_word(self, _, pairs):
        yield max(pairs)

    # steps() defines the sequence of steps: step 1 counts each
    # word, step 2 picks the word with the highest count.
    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                combiner=self.combiner,
                reducer=self.reducer),
            MRStep(
                reducer=self.reducer_max_word
            )
        ]


if __name__ == '__main__':
    MRMaxWordCount.run()


Writing word_max_count.py


In [16]:
!python word_max_count.py -r local *.txt --output-dir=word_max_count_out --no-output


No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/word_max_count.root.20250325.230810.995417
Running step 1 of 2...
Running step 2 of 2...
job output is in word_max_count_out
Removing temp directory /tmp/word_max_count.root.20250325.230810.995417...


In [17]:
!cat word_max_count_out/part-0000*

348	"to"
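
The output is the count followed by the word because `reducer_max_word` yields the winning pair directly, so the count becomes the output key and the word the output value. The short sketch below shows how `max` chooses among such pairs. The sample values are made up, and the pairs are written as lists on the assumption that mrjob's default JSON protocol turns tuples into lists between steps.

```python
# Made-up (count, word) pairs as the second-step reducer might receive them.
pairs = [[2, "thank"], [5, "meta"], [5, "you"], [1, "operator"]]

# Sequences compare element by element: the count decides first, and the
# word only matters when two counts are equal.
winner = max(pairs)
print(winner)  # [5, 'you']
```

With tied counts, the word that sorts last alphabetically wins, so the result is deterministic but not a list of all tied words.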
