# Содержание
- [MapReduce?](#mapreduce)
- [Mapper](#mapper)
    - [Запускаем mapper](#testmapper)
- [Hadoop](#hadoop)
    - [Что такое Hadoop Streaming?](#hadoopstreaming)
    - [Список директорий в Hadoop](#hdfs_ls)
    - [Тестируем MapReduce на простом reducer](#dummyreducer)
    - [Shuffling и sorting](#shuffling&sorting)
- [Reducer](#reducer)
    - [Запускаем reducer](#run)
- [Запускаем mapreduce job с большими данными](#moredata)
    - [Sort результат (`sort`)](#sortoutput)
    - [Sort результат (в MapReduce)](#sortoutputMR)
    - [Конфигурируем сортировку с `KeyFieldBasedComparator`](#KeyFieldBasedComparator)
    - [Определяем конфигурацию опцией -D](#configuration_variables)


## MapReduce? <a name="mapreduce"></a>

MapReduce - это модель распределенных параллельных вычислений разработанная для процессов обработки больших объемов данных.

Данные разделяются по специальным узлам(nodes), где работают процессы - mappers. Мапперы - это первый шаг обработки данных, они делают "базовую обработку" и передают результаты в reducer. Уже редьюсеры объединяют данные и создают финальный результат.

![Map & Reduce](mapreduce.png)
C [Hadoop Streaming](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html) возможно использовать языки программирования для разработки mapper и reducer. Здесь будет описан способ использования Unix `bash`. ([Здесь](https://www.gnu.org/software/bash/manual/html_node/index.html) документация по bash).


## Mapper <a name="mapper"></a>

Давайте составим первый mapper скрипт `map.sh`. Mapper должен разбивать каждую строку на слова, добавлять число для счетчика и возвращать каждое слово отдельной строй, а через tab число для счетчика - 1.

Пример: input 
<html>
<pre>
apple orange
banana apple peach
</pre>
</html>

`map.sh` результат:
<html>
<pre>
apple   1
orange  1
banana  1
apple  1
peach  1
</pre>
</html>


<a href="https://ipython.readthedocs.io/en/stable/interactive/magics.html">_cell magic_</a> [`%%writefile`](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-writefile) позволяют писать скрипты и выполнять команды Linux в Jupyter Notebook.


In [3]:
%%writefile ap.txt

apple orange
banana apple peach

Overwriting ap.txt


In [4]:
%%writefile map.sh
#!/bin/bash

while read line
do
 for word in $line 
 do
  if [ -n "$word" ] 
  then
     echo -e ${word}"\t1"
  fi
 done
done

Overwriting map.sh


In [5]:
!cat ap.txt


apple orange
banana apple peach

В результате вы получите файл `map.sh` в вашей текущей директории.

**Note:** Каждый последущий запуск ячейки перезапишет файл `map.sh`

In [6]:
!ls -hl map.sh

#dir - для Windows

-rwx------  1 pro  staff   125B  2 мар 18:21 [31mmap.sh[m[m


### Запускаем mapper <a name="testmapper"></a>

Чтобы запустить mapper, сначала создадим для его работы данные, создадим файл `fruits.txt` с набором фруктов.

In [7]:
%%writefile fruits.txt
apple banana
peach orange peach peach
pineapple peach apple

Overwriting fruits.txt


In [8]:
!cat fruits.txt

# type - в Windows

apple banana
peach orange peach peach
pineapple peach apple

Выполним mapper. Используем для этого pipeline '|'

In [9]:
# получили ключ-значение
# мы сначала выводим все на экран, так как между мапером и редьюсером должно быть
# промежуточное звено, которое хранит все данные. В гашем случае это - экран
!cat fruits.txt | ./map.sh

apple	1
banana	1
peach	1
orange	1
peach	1
peach	1


In [10]:
# добавили аггрегатор - wc (а-ля редьюсер)
!cat fruits.txt | ./map.sh | wc

       6      12      50


Если скрипт `map.sh` не выполнится, то проверте права на его использование

In [11]:
!chmod 700 map.sh

## Hadoop <a name="hadoop"></a>
Теперь воспользуемся Hadoop и запустим наш скрипт с помощью Hadoop Streaming. 

### Что такое Hadoop Streaming <a name="hadoopstreaming"></a>

Hadoop Streaming - это библиотека в Hadoop, которая разработка для созданиях самописных мапперов и редьюсеров в исполняемые процессы MapReduce. 


Mapper и reducer читают данные из stdin и отправляют их в stdout. Обучно, колонки в данных разделяются с помощью `tab`. Если данные разделены другим разделителем, то надо будет определять разделитель. Для этого ознакомьтесь с `TextInputFormat` (see the [API documentation](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/TextInputFormat.html)) и [Hadoop Streaming documentation](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Customizing_How_Lines_are_Split_into_KeyValue_Pairs).

Пример MapReduce streaming синтаксиса:
<html>
<pre>
    mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

</pre>
</html>

Документация для Hadoop Streaming от Apache Hadoop: [https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html).

Все настройки Hadoop Streaming и опции описаны здесь: [Streaming Command Options](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Streaming_Command_Options)

In [12]:
!mapred streaming --help

/bin/sh: mapred: command not found


### Список директорий в Hadoop <a name="hdfs_ls"></a>

Команда `hdfs dfs -l` покажет вам все, что находится в вашей домашней директории HDFS. 

`hdfs dfs` запускает файловую систему Hadoop. Списко всех достуных команды вы найдете в документации [System Shell Guide](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#dfs).

In [6]:
!man python

PYTHON(1)                                                            PYTHON(1)



NNAAMMEE
       python  - an interpreted, interactive, object-oriented programming lan-
       guage

SSYYNNOOPPSSIISS
       ppyytthhoonn [ --BB ] [ --dd ] [ --EE ] [ --hh ] [ --ii ] [ --mm _m_o_d_u_l_e_-_n_a_m_e ]
              [ --OO ] [ --OOOO ] [ --RR ] [ --QQ _a_r_g_u_m_e_n_t ] [ --ss ] [ --SS ] [ --tt ] [  --uu
       ]
              [ --vv ] [ --VV ] [ --WW _a_r_g_u_m_e_n_t ] [ --xx ] [ --33 ] [ --??  ]
              [ --cc _c_o_m_m_a_n_d | _s_c_r_i_p_t | - ] [ _a_r_g_u_m_e_n_t_s ]

DDEESSCCRRIIPPTTIIOONN
       Python is an interpreted, interactive, object-oriented programming lan-
       guage that combines remarkable power with very clear  syntax.   For  an
       introduction  to  programming  in Python, see the Python Tutorial.  The
       Python Library 

              ber, where zero matches all line numbers and is thus  equivalent
              to an omitted line number.

       --xx     Skip  the  first line of the source.  This is intended for a DOS
              will be off by one!

       --33     Warn  about  Python 3.x incompatibilities that 2to3 cannot triv-
              ially fix.

IINNTTEERRPPRREETTEERR IINNTTEERRFFAACCEE
       The interpreter interface resembles that of the UNIX shell: when called
       with  standard input connected to a tty device, it prompts for commands
       and executes them until an EOF is read; when called with  a  file  name
       argument  or  with  a  file  as standard input, it reads and executes a
       _s_c_r_i_p_t from that file; when called with --cc  _c_o_m_m_a_n_d,  it  executes  the
       Python  statement(s) given as _c_o_m_m_a_n_d.  Here _c_o_m_m_a_n_d may contain multi-
       ple statements separated by newline

In [13]:
!hdfs dfs -ls 

/bin/sh: hdfs: command not found


Создадим  `wordcount` директорию с вложенной директорий `input` в Hadoop.

In [18]:
%%bash
# mkdir - создание новых каталогов
hdfs dfs -mkdir -p wordcount

bash: line 2: hdfs: command not found


Скопируем fruits.txt в Hadoop директории `wordcount/input`.

Почему мы это делаем? Файл `fruits.txt` должен располагаться в файловой системе Hadoop, а не локально. Когда файл находится в файловой системе Hadoop, то мы получаем возможность использовать фишки Hadoop: data partitioning, distributed processing, fault tolerance

In [14]:
%%bash
# 2> file redirects stderr to file
# /dev/null is the null device it takes any input you want and throws it away. It can be used to suppress any output.
hdfs dfs -rm -r wordcount/input 2>/dev/null
hdfs dfs -mkdir wordcount/input
hdfs dfs -put fruits.txt wordcount/input

bash: line 4: hdfs: command not found
bash: line 5: hdfs: command not found


А теперь проверим.

**Note:** Используйте опцию `-h` для `ls`, чтобы показать размер файла в `human-readable` форме

In [17]:
!hdfs dfs -ls -h -R wordcount/input

/bin/sh: hdfs: command not found


### Тестируем MapReduce на простом reducer <a name="dummyreducer"></a>

Попроюуем запустить наш mapper используя простой reducer `/bin/cat`.

**Warning:** mapreduce процесс всегда выводит большие output в командную строку, часто эта информация для нас бесполезна, нам нужен будет этот результат: <html><pre>"INFO mapreduce.Job: Job ... completed successfully"</pre></html>

**Note:** Когда вы запускаете процесс, убедитесь, что финального файла нет в системе, иначе вы получите ошибку. Вы можете всегда добавлять такое действие: `hadoop fs -rmr wordcount/output 2>/dev/null`. 

In [None]:
#!/bin/bash

# Это из файла hadoop-spark-standalone-docker/hadoop-standalone/submit/run.sh 

# STREAMING инициализирует map-reduce
STREAMING=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar


hdfs dfs -rm -r /output 2> /dev/null

# $HADOOP_HOME/bin/hadoop - это путь к хадупу
# jar - указываем класс процесса
# $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar - тот самый класс,
# которй инициализирует mapreduce процесс в хадупе (Hadoop streaming)
$HADOOP_HOME/bin/hadoop jar $STREAMING\
    -input /input/*\
    # это ключ-значение: значение - пустое место, ключ - само слово
    # tr заменяет каждый пробел на \n
    -mapper 'tr " " "\n"' \
    -output /output


In [15]:
# здес все то же самое, что и в предыдущей строке
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
  -files map.sh \
  -input wordcount/input \
  -output wordcount/output \
  -mapper map.sh \
  -reducer /bin/cat

SyntaxError: invalid syntax (<ipython-input-15-ae169633e549>, line 2)

In [13]:
!hdfs dfs -ls wordcount/output

Found 2 items
-rw-r--r--   3 datalab supergroup          0 2019-11-18 08:50 wordcount/output/_SUCCESS
-rw-r--r--   3 datalab supergroup         78 2019-11-18 08:50 wordcount/output/part-00000


Если `output` содержит файл `_SUCCESS`, то ваш процесс завершился удачно

**Note:** когда работает с большими данными, то к `cat` используйте `head` или `tail`.

In [16]:
!hdfs dfs -cat wordcount/output/part*|head

/bin/sh: hdfs: command not found


### Shuffling and sorting <a name="shuffling&sorting"></a>

На изображении представлен процесс по результатам 2х мапперов. Процессы shuffle и sort происходят до попадания результатов в reducer

![shuffle & sort](shuffle_sort.png)

Shuffling и sorting являются самыми "дорогими" процессами в MapReduce.


<b>Note:</b>  $2$ - базовое количество мапперов в Hadoop. 

## Reducer <a name="reducer"></a>
Напишем скрипт reducer `reduce.sh`. 

In [1]:
%%writefile reduce.sh
#!/bin/bash

currkey=""
currcount=0
while IFS=$'\t' read -r key val
do
  if [[ $key == $currkey ]]
  then
      currcount=$(( currcount + val ))
  else
    if [ -n "$currkey" ]
    then
      echo -e ${currkey} "\t" ${currcount} 
    fi
    currkey=$key
    currcount=1
  fi
done

# stdout 
echo -e ${currkey} "\t" ${currcount}

Writing reduce.sh


Установим правила для нашего reducer скрипта

In [16]:
!chmod 700 reduce.sh

### Запускаем reducer <a name="run"></a>

Выполним map и reduce без hadoop

In [17]:
!cat fruits.txt|./map.sh|sort|./reduce.sh

apple 	 2
banana 	 1
orange 	 1
peach 	 4
pineapple 	 1


Это хороший способ тестирования, если результаты правильные, то запустим на Hadoop

In [18]:
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
  -file map.sh \
  -file reduce.sh \
  -input wordcount/input \
  -output wordcount/output \
  -mapper map.sh \
  -reducer reduce.sh

packageJobJar: [map.sh, reduce.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob4511756596880012363.jar tmpDir=null


19/11/18 08:50:23 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
19/11/18 08:50:25 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472
19/11/18 08:50:25 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0252
19/11/18 08:50:26 INFO mapred.FileInputFormat: Total input files to process : 1
19/11/18 08:50:26 INFO mapreduce.JobSubmitter: number of splits:2
19/11/18 08:50:26 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address
19/11/18 08:50:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/18 08:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0252
19/11/18 08:50:26 INFO mapreduce.JobSubmitter: Executing with tokens: []
19/11/18 08:50:26 INFO conf.Configuration: resourc

Проверим файл результата в HDFS

In [19]:
!hdfs dfs -cat wordcount/output/part*|head

apple 	 2
banana 	 1
orange 	 1
peach 	 4
pineapple 	 1


## Запускаем mapreduce job с большими данными <a name="moredata"></a>

Создадим файл с данными, на основе реальной новостой статьи из интернета. Данных стало больше, представим, что это "большие данные".

Для загрузки данных из интернета используйте, либо парсер Python, либо команду `wget` и удалите HTML командой `sed`.

In [20]:
%%bash
URL=https://www.derstandard.at/story/2000110819049/und-wo-warst-du-beim-fall-der-mauer
wget -qO- $URL | sed -e 's/<[^>]*>//g;s/^ //g' >sample_article.txt

In [14]:
# wget на Python
import wget
import lxml.html

url = 'https://www.derstandard.at/story/2000110819049/und-wo-warst-du-beim-fall-der-mauer'
filename = wget.download(url)

with open(filename, 'r') as f:
    ff = f.read()
    t = lxml.html.fromstring(ff)
    result = t.text_content()
    
with open('sample_article.txt', 'w+', encoding='utf-8') as f:
    f.write(result)

  0% [                                                                              ]     0 / 17297 47% [....................................                                          ]  8192 / 17297 94% [.........................................................................     ] 16384 / 17297100% [..............................................................................] 17297 / 17297

In [21]:
!cat sample_article.txt|./map.sh|head

	1
	1
	1
	1
Und	1
wo	1
warst	1
du	1
beim	1
Fall	1


Изменим наш маппер для работы с пустыми строками.

In [15]:
%%writefile map.sh
#!/bin/bash

while read line
do
 for word in $line
 do
  if [[ "$line" =~ [^[:space:]] ]]
  then
    if [ -n "$word" ]
    then
    echo -e ${word} "\t1"
    fi
  fi
 done
done

Overwriting map.sh


In [23]:
!cat sample_article.txt|./map.sh|head

Und 	1
wo 	1
warst 	1
du 	1
beim 	1
Fall 	1
der 	1
Mauer? 	1
- 	1
 	1


`map.sh` дает лучше результаты

<b>Note:</b> при работе с реальными данными мы должны обращать внимание на их "чистоту" и добавлять в свой код процессы обработки (отчистки) данных.

Запустим MapReduce на новых данных, но сначала загрузим их в HDFS

In [24]:
%%bash
hdfs dfs -rm -r wordcount/input 2>/dev/null
hdfs dfs -put sample_article.txt wordcount/input

In [25]:
# проверим
!hdfs dfs -ls -h wordcount/input

-rw-r--r--   3 datalab supergroup      4.1 K 2019-11-18 08:50 wordcount/input


проверим reducer

In [26]:
!cat sample_article.txt|./map.sh|./reduce.sh|head

Und 	 1
wo 	 1
warst 	 1
du 	 1
beim 	 1
Fall 	 1
der 	 1
Mauer? 	 1
- 	 1
 	 2


In [None]:
%%bash
hadoop fs -rmr wordcount/output 2>/dev/null
mapred streaming \
  -file map.sh \
  -file reduce.sh \
  -input wordcount/input \
  -output wordcount/output \
  -mapper map.sh \
  -reducer reduce.sh

Проверим результат в HDFS

In [28]:
!hdfs dfs -ls wordcount/output

Found 2 items
-rw-r--r--   3 datalab supergroup          0 2019-11-18 08:51 wordcount/output/_SUCCESS
-rw-r--r--   3 datalab supergroup       2273 2019-11-18 08:51 wordcount/output/part-00000


Больше данных - больше времени, так что этот процесс. Но учтите, что не всегда стоит использовать Hadoop, есть много кейсов, когда не стоит использовать сложные инструменты. Hadoop - только для больших инструментов.

In [29]:
!hdfs dfs -cat wordcount/output/part-00000|head

&amp; 	 1
(Herder-Verlag) 	 1
- 	 1
/ 	 2
1950 	 1
24 	 1
30 	 1
30-Jährige 	 1
<path 	 1
AGB 	 1


### Sort результат (`sort`) <a name="sortoutput"></a>

В результате мы получаем список из связки: слово - значение, нам нужно сделать сортировку по частоте употребления слова.

Результат из reducer сортируется по ключам (словам) на основе результатов из mapper. Для получения сортированного результата используем Unix команду `sort` (с опциями `k2`, `n`, `r`, которое означают "по полю 2", "числовое значение", "от большего к меньшему").

In [30]:
!hdfs dfs -cat wordcount/output/part-00000|sort -k2nr|head

die 	 8
der 	 6
Cookies 	 4
und 	 4
derStandard.at 	 3
Fall 	 3
ich 	 3
in 	 3
kann 	 3
ohne 	 3


### Sort результат (в MapReduce) <a name="sortoutputMR"></a>

Если нам необходимо сделать сортировку в reducer, то мы можем применить простой трюк: создаем маппер, который будет менять местами слова (ключи) и их частоту (значение), на выходе маппера мы получим желанный эфект автоматически.

Создадим новый маппер `swap_keyval.sh`.

In [16]:
%%writefile swap_keyval.sh
#!/bin/bash
# скрипт меняет местами значения в строке
# пример: "word 100" -> "100 word"

while read key val
do
 printf "%s\t%s\n" "$val" "$key"
done    

Writing swap_keyval.sh


Выполним наш новый маппер в старом пайплайне, не забудьте удалить `output_sorted`. 

Каждый шаг записывает свой результат на диск, что при больших объемах увеличивает время выполнения, это одна из причин появления Apache Spark [Apache Spark](https://spark.apache.org/).

In [None]:
%%bash
hdfs dfs -rm -r wordcount/output2 2>/dev/null
mapred streaming \
  -file swap_keyval.sh \
  -input wordcount/output \
  -output wordcount/output2 \
  -mapper swap_keyval.sh

Проверим результат в HDFS

In [2]:
!hdfs dfs -ls wordcount/output2

/bin/sh: hdfs: command not found


In [34]:
!hdfs dfs -cat wordcount/output2/part-00000|head 

1	an
1	–
1	überraschen.
1	Über
1	zustimmungspflichtige
1	zustimmen
1	zum
1	zu.
1	widerrufen.
1	werden.


Маппер сортирует от меньшего к большему (sort = ascending order). Самые частые слова будут внизу файла (смотрим хвост файла)

In [35]:
!hdfs dfs -cat wordcount/output2/part-00000|tail

3	kann
3	in
3	derStandard.at
3	ich
3	Fall
3	Sie
4	Cookies
4	und
6	der
8	die


### Конфигурируем сортировку с `KeyFieldBasedComparator` <a name="KeyFieldBasedComparator"></a>

Мы можем определить, как маппер будет сортировать свои результаты, для этого надо исользовать класс [`KeyFieldBasedComparator`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.html)
<html><pre>-D mapreduce.job.output.key.comparator.class=\
    org.apache.hadoop.mapred.lib.KeyFieldBasedComparator</pre></html>
    
Данный класс (из библиотеке Hadoop) позволяет сделать похожие дополнения (опции) к сортировке, как Unix `sort`(`-n` - чиловая сортировка, `-r` от большего к меньшему, `-k pos1[,pos2]` сортировать по позиции элемента).

Применим данный класс `KeyFieldBasedComparator` 

In [None]:
%%bash
hdfs dfs -rmr wordcount/output2 2>/dev/null
comparator_class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
mapred streaming \
  -D mapreduce.job.output.key.comparator.class=$comparator_class \
  -D mapreduce.partition.keycomparator.options=-nr \
  -file swap_keyval.sh \
  -input wordcount/output \
  -output wordcount/output2 \
  -mapper swap_keyval.sh

In [37]:
!hdfs dfs -ls wordcount/output2

Found 2 items
-rw-r--r--   3 datalab supergroup          0 2019-11-18 08:52 wordcount/output2/_SUCCESS
-rw-r--r--   3 datalab supergroup       1945 2019-11-18 08:52 wordcount/output2/part-00000


In [38]:
!hdfs dfs -cat wordcount/output2/part-00000|head 

8	die
6	der
4	und
4	Cookies
3	Fall
3	Sie
3	ich
3	kann
3	warst
3	in


Теперь мы получили необходимый результат

### Определяем конфигурацию опцией -D<a name="configuration_variables"></a>

Опция `-D` позволяет перезаписать параметр в базовой конфигурации [`mapred_default.xml`](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml)
(в документации [Apache Hadoop documentation](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Specifying_Configuration_Variables_with_the_-D_Option)).

Иногда это требуется для исправления ошибки `out-of-memory` во время сортировки. Так как выделенной памяти может не хватать и её нужно увеличить, а если процесс выполняется и ресурсов очень много, то уменьшить. За это отвечает параметр `mapreduce.task.io.sort.mb`, он имеет размерность в Mb:
 <html>
    <pre>-D mapreduce.task.io.sort.mb=512
    </pre>
 </html>

 **Note:** `mapreduce.task.io.sort.mb` может быть не более  2047.   