## Устанавливаем библиотеку

In [1]:
! pip install mrjob

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 KB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: mrjob
Successfully installed mrjob-0.7.4
[0m

## Пишем код

In [76]:
%%file wordcount.py
# %%file is an Ipython magic function that saves the code cell as a file

from mrjob.job import MRJob # import the mrjob library

class MRSongCount(MRJob):
    """Word-count job: reports how many times each word occurs in the input."""

    def mapper(self, _, line):
        """Emit ``(word, count)`` for every distinct word of one input line.

        The mapper receives the input one line at a time, for example:
            one, two, three
            one, two, five
            two, three, four

        The line is lower-cased, the characters ``( ) ! . ,`` are removed and
        the rest is split on whitespace. Occurrences are pre-aggregated per
        line, so the first example line produces the pairs
        (one, 1), (two, 1), (three, 1), and so on.
        """
        cleaned = line.lower()
        for symbol in ("(", ")", "!", ".", ","):
            cleaned = cleaned.replace(symbol, '')

        # Per-line tally; dict insertion order keeps first-seen word order.
        per_line_counts = {}
        for token in cleaned.split():
            per_line_counts[token] = per_line_counts.get(token, 0) + 1

        yield from per_line_counts.items()

    def reducer(self, word, values):
        """Sum all counts emitted for one word.

        The reducer receives a key together with every value produced for it
        by the previous stage, for example:
            (one, [1, 1])
            (two, [1, 1, 1])
            (three, [1, 1])
            (four, [1])
        and emits the pair (word, total).
        """
        total = sum(values)
        yield (word, total)
        
# Run the job only when the file is executed as a script (not when imported).
if __name__ == "__main__":
    MRSongCount.run()

Overwriting wordcount.py


## Протестируем локально

In [74]:
!python3 wordcount.py input.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/wordcount.root.20231107.173223.699763
Running step 1 of 1...
job output is in /tmp/wordcount.root.20231107.173223.699763/output
Streaming final output from /tmp/wordcount.root.20231107.173223.699763/output...
"i'm"	1
"in"	4
"inside"	12
"is"	4
"isn't"	4
"it"	1
"it's"	2
"let"	12
"she's"	4
"slave"	1
"so"	2
"solemn"	1
"something"	1
"stress"	1
"such"	1
"temples"	1
"terrorize"	1
"that"	5
"to"	6
"unchecked"	1
"up"	12
"vixen"	1
"what"	1
"when"	1
"oh"	2
"one"	2
"only"	2
"or"	1
"past"	1
"perverse"	1
"pheromone"	1
"possession"	1
"press"	1
"real"	8
"won't"	12
"worse"	1
"yeah"	1
"yet"	1
"but"	1
"came"	1
"can't"	4
"carve"	1
"caught"	1
"chest"	1
"climatic"	1
"collectors"	1
"coming"	3
"continues"	1
"crazy"	1
"recognize"	1
"restraints"	1
"rings"	1
"sad"	2
"say"	1
"see"	2
"seems"	1
"self-oblige"	1
"she"	10
"more"	1
"my"	6
"name"	1
"need"	1
"nervous"	1
"never"	1
"night"	1
"no"	1
"now"	

## Запустим на кластере

In [75]:
!python3 wordcount.py -r hadoop hdfs://namenode:8020/pyarrow/input.txt

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/hadoop/bin...
Found hadoop binary: /opt/hadoop/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /opt/hadoop...
Found Hadoop streaming jar: /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/wordcount.root.20231107.173246.843147
uploading working dir files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.173246.843147/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.173246.843147/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar3012102092679687138/] [] /tmp/streamjob10416955896905894053.jar tmpDir=null
  Connecting to ResourceManager at resourcemanager/172.18.0.4:8032
  Connecting to Application History server at historyserver/172.18.0.7:10200
  Connecting to ResourceManager at resourcemanager/172.18.0.4:8032
  Connecting to Applicati

## Попробуем увеличить число reducer-ов. Для этого запишем настройки

In [100]:
%%file config.conf

# mrjob options, grouped by runner name.
runners:
  hadoop: # also works for emr runner
    jobconf:
      # Hadoop job property: ask for two reduce tasks.
      mapreduce.job.reduces: 2

Overwriting config.conf


In [89]:
!python3 wordcount.py -r hadoop hdfs://namenode:8020/pyarrow/input.txt --conf-path config.conf --output /pyarrow/output2

Looking for hadoop binary in /opt/hadoop/bin...
Found hadoop binary: /opt/hadoop/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /opt/hadoop...
Found Hadoop streaming jar: /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/wordcount.root.20231107.192558.879583
uploading working dir files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.192558.879583/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.192558.879583/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar18357820264371207698/] [] /tmp/streamjob8957403416951307493.jar tmpDir=null
  Connecting to ResourceManager at resourcemanager/172.18.0.2:8032
  Connecting to Application History server at historyserver/172.18.0.7:10200
  Connecting to ResourceManager at resourcemanager/172.18.0.2:8032
  Connecting to Application History server at historyserver/172.18.0.7:10200
  Disabling Erasure Coding for path: /tm

In [91]:
!hadoop fs -cat /pyarrow/output2/part-00001

"am"	1
"aphid"	1
"astounding"	1
"attention"	1
"bathed"	1
"build"	12
"came"	1
"caught"	1
"climatic"	1
"collectors"	1
"continues"	1
"cult"	1
"dressed"	1
"enter"	1
"fatalaties"	1
"fragile"	1
"get"	1
"hands"	1
"hypnotic"	1
"inside"	12
"is"	4
"make"	4
"master"	1
"me"	16
"my"	6
"need"	1
"nervous"	1
"never"	1
"night"	1
"now"	1
"one"	2
"only"	2
"past"	1
"perverse"	1
"possession"	1
"real"	8
"recognize"	1
"sad"	2
"she"	10
"she's"	4
"so"	2
"solemn"	1
"something"	1
"stress"	1
"temples"	1
"terrorize"	1
"this"	12
"unchecked"	1
"vixen"	1
"what"	1
"when"	1
"worse"	1
"yet"	1


## Добавляем combine-стадию и второй шаг, который находит самое частое слово

In [2]:
%%file wordcount.py
# %%file is an Ipython magic function that saves the code cell as a file

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):
    """Two-step job: count every word, then report the most frequent one."""

    def steps(self):
        """Return the pipeline: a counting step followed by a max-finding step."""
        count_step = MRStep(
            mapper=self.mapper_get_words,
            combiner=self.combiner_count_words,
            reducer=self.reducer_count_words,
        )
        max_step = MRStep(reducer=self.reducer_find_max_word)
        return [count_step, max_step]

    def mapper_get_words(self, _, line):
        """Emit ``(word, 1)`` for each lower-cased WORD_RE match in the line."""
        for match in WORD_RE.finditer(line):
            yield (match.group().lower(), 1)

    def combiner_count_words(self, word, counts):
        """Pre-sum the counts seen so far for a word (an optimization)."""
        partial_total = sum(counts)
        yield (word, partial_total)

    def reducer_count_words(self, word, counts):
        """Emit ``(None, (total, word))`` for one word.

        Using the single key ``None`` routes every pair to the same reducer
        of the next step; the count comes first in the pair so that
        Python's max() compares by number of occurrences.
        """
        total = sum(counts)
        yield None, (total, word)

    def reducer_find_max_word(self, _, word_count_pairs):
        """Emit the largest ``(count, word)`` pair; the incoming key is ignored.

        Each item of word_count_pairs is (count, word), so yielding one of
        them produces key=count, value=word.
        """
        most_frequent = max(word_count_pairs)
        yield most_frequent


# Run the job only when the file is executed as a script (not when imported).
if __name__ == '__main__':
    MRMostUsedWord.run()

Writing wordcount.py


In [95]:
!python3 wordcount.py -r hadoop hdfs://namenode:8020/pyarrow/input.txt --conf-path config.conf --output /pyarrow/output2

Looking for hadoop binary in /opt/hadoop/bin...
Found hadoop binary: /opt/hadoop/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /opt/hadoop...
Found Hadoop streaming jar: /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/wordcount.root.20231107.193931.170381
uploading working dir files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.193931.170381/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.193931.170381/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar2998626655543088211/] [] /tmp/streamjob15024210008432738021.jar tmpDir=null
  Connecting to ResourceManager at resourcemanager/172.18.0.2:8032
  Connecting to Application History server at historyserver/172.18.0.7:10200
  Connecting to ResourceManager at resourcemanager/172.18.0.2:8032
  Connecting to Application History server at historyserver/172.18.0.7:10200
  Disabling Erasure Coding for path: /tm

In [99]:
!hadoop fs -cat /pyarrow/output2/part-00001

## Попробуем использовать счётчик как поле объекта. Маппер может его увеличивать, но редьюсер выполняется в отдельном процессе со своим экземпляром задачи, поэтому накопленного значения он уже не видит

In [9]:
%%file wordcount.py
# %%file is an Ipython magic function that saves the code cell as a file

from mrjob.job import MRJob # import the mrjob library

class MRSongCount(MRJob):
    """Word count that demonstrates that instance state is not shared between tasks.

    ``self.counter`` is incremented by the mapper, but every map/reduce task
    works with its own job instance, so the reducer only ever sees the value
    assigned in ``__init__`` (0). For counters that are aggregated across
    tasks mrjob offers ``MRJob.increment_counter(group, counter, amount)``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the per-instance word counter and the base job.

        All arguments are forwarded unchanged: ``MRJob.__init__`` accepts an
        optional ``args`` list, and callers that construct the job with
        explicit arguments (tests, in-process runners) would otherwise fail
        with a ``TypeError``.
        """
        self.counter = 0
        super().__init__(*args, **kwargs)

    def mapper(self, _, line):
        """Emit ``(word, count)`` for each distinct word of one input line.

        The line is lower-cased, the characters ``( ) !`` are removed and the
        rest is split on whitespace. Every word seen also bumps
        ``self.counter`` on this instance.
        """
        words = line.lower().replace("(", '').replace(")", '').replace("!", '').split()

        word_map = {}
        for word in words:
            # Local to this task's instance; not visible to other tasks.
            self.counter += 1
            word_map[word] = word_map.get(word, 0) + 1

        yield from word_map.items()

    def reducer(self, key, values):
        """Emit ``(key, "<total> of <counter>")`` for one word.

        ``self.counter`` is read from the reducer's own instance, which is
        the point of the demonstration: it does not contain the mapper's
        increments.
        """
        yield (key, f"{sum(values)} of {self.counter}")
        
# Run the job only when the file is executed as a script (not when imported).
if __name__ == "__main__":
    MRSongCount.run()

Overwriting wordcount.py


In [10]:
!python3 wordcount.py -r hadoop hdfs://namenode:8020/pyarrow/input.txt

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/hadoop/bin...
Found hadoop binary: /opt/hadoop/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /opt/hadoop...
Found Hadoop streaming jar: /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/wordcount.root.20231106.122525.608203
uploading working dir files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231106.122525.608203/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231106.122525.608203/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar6103127981734022064/] [] /tmp/streamjob14595565506979629039.jar tmpDir=null
  Connecting to ResourceManager at resourcemanager/172.18.0.4:8032
  Connecting to ResourceManager at resourcemanager/172.18.0.4:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1699271312516_0003
  T

## Конфиг для запуска в определенной очереди

In [83]:
%%file config.conf

# mrjob options, grouped by runner name.
runners:
  hadoop: # also works for emr runner
    jobconf:
      # Hadoop job property: submit the job to the queue named "jupyter".
      mapreduce.job.queuename: jupyter

Overwriting config.conf


In [86]:
!python3 wordcount.py -r hadoop hdfs://namenode:8020/pyarrow/input.txt 

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/hadoop/bin...
Found hadoop binary: /opt/hadoop/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /opt/hadoop...
Found Hadoop streaming jar: /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/wordcount.root.20231107.192343.326396
uploading working dir files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.192343.326396/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/wordcount.root.20231107.192343.326396/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar13723236156834957118/] [] /tmp/streamjob11680448707763552907.jar tmpDir=null
  Connecting to ResourceManager at resourcemanager/172.18.0.2:8032
  Connecting to Application History server at historyserver/172.18.0.7:10200
  Connecting to ResourceManager at resourcemanager/172.18.0.2:8032
  Connecting to Applicat