# Hadoop: Map-Reduce 

ПРИМЕЧАНИЕ: для этого блокнота вам следует запустить сервер с Hadoop (с YARN) и средой Spark.

[Библиотека программного обеспечения Apache Hadoop](https://hadoop.apache.org/) — это фреймворк, который позволяет выполнять распределенную обработку больших наборов данных в кластерах компьютеров с использованием простых моделей программирования. Он разработан для масштабирования от отдельных серверов до тысяч машин, каждая из которых предлагает локальные вычисления и хранение. Вместо того, чтобы полагаться на оборудование для обеспечения высокой доступности, сама библиотека разработана для обнаружения и обработки сбоев на уровне приложений, тем самым предоставляя высокодоступный сервис поверх кластера компьютеров, каждый из которых может быть подвержен сбоям.

Текущая установка включает следующие модули из экосистемы Hadoop:

- __Hadoop Common:__ Общие утилиты, которые поддерживают другие модули Hadoop.
- __Hadoop Distributed File System (HDFS™):__ Распределенная файловая система, которая обеспечивает высокопроизводительный доступ к данным приложений.
- __Hadoop YARN:__ Фреймворк для планирования заданий и управления ресурсами кластера.
- __Hadoop MapReduce:__ Система на основе YARN для параллельной обработки больших наборов данных.
- __Spark™:__ Быстрый и универсальный вычислительный движок для данных Hadoop. Spark предоставляет простую и выразительную модель программирования, которая поддерживает широкий спектр приложений, включая ETL, машинное обучение, потоковую обработку и вычисление графов.

Псевдораспределенный режим (кластер с одним узлом)

Hadoop также может быть запущен на одном узле в [псевдораспределенном режиме](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Pseudo-Distributed_Operation), где каждый демон Hadoop работает в отдельном процессе Java.

В псевдораспределенном режиме мы также используем только один узел, но главное, что кластер моделируется, что означает, что все процессы внутри кластера будут работать независимо друг от друга. Все демоны, которые являются Namenode, Datanode, Secondary Name node, Resource Manager, Node Manager и т. д., будут работать как отдельный процесс на отдельной JVM (виртуальной машине Java) или, можно сказать, работать на разных процессах Java, поэтому он называется псевдораспределенным.

<b>ОЧЕНЬ ВАЖНОЕ ПРИМЕЧАНИЕ: экземпляр Hadoop, установленный в среде «Hadoop (с YARN) и Spark», был разработан только для образовательных целей и НЕ СОХРАНЯЕТ ДАННЫЕ после остановки сервера. Вы можете создавать или удалять файлы в файловой системе HDFS, записывать данные во время сеанса, но при следующем запуске сервера Jupyter будет чистая файловая система без данных.</b>

In [38]:
import os
import re
import json
import socket
import subprocess
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import utils as pu
from pyspark.sql import functions as F
from pyspark.sql import types as pt

In [39]:
YARN_PORT = 8088

# директория по умолчанию для пользователя `jovyan`
WORK_DIR = '/jovyan'

In [40]:
def hdfs_dirs(path, filter_str=''):
    """
    Возвращает файлы по указанному пути в виде списка. 
    Имена файлов можно фильтровать по параметру `filter_str`,
    например, `filter_str='csv'` отобразит только файлы `csv`.
    
    """
    process = subprocess.Popen(
        ['hdfs', 'dfs', '-ls', path], 
        stdout=subprocess.PIPE, 
        stderr=subprocess.PIPE
    )
    out, err = process.communicate()
    dirs = out.decode('utf-8').split('\n')
    dirs = list(filter(lambda x: filter_str in x, dirs))
    dirs = list(map(lambda x: x.split(' ')[-1], dirs))
    return dirs

def file_content(path):
    """
    Возвращает содержимое файла.
    Аналогично команде `cat`.
    
    """
    process = subprocess.Popen(
        ['hdfs', 'dfs', '-cat', path], 
        stdout=subprocess.PIPE, 
        stderr=subprocess.PIPE
    )
    out, err = process.communicate()
    return out.decode('unicode_escape')

In [41]:
!hdfs dfs -ls /

Found 4 items
drwxr-xr-x   - hadoop supergroup           0 2025-03-20 07:00 /hbase
drwxr-xr-x   - jovyan hadoopusers          0 2025-03-20 07:20 /jovyan
drwxrwxrwx   - hadoop supergroup           0 2025-03-20 06:59 /tmp
drwxr-xr-x   - jovyan hadoopusers          0 2025-03-20 07:00 /user


Посмотрим, что лежит в рабочей директории

In [42]:
!hdfs dfs -ls {WORK_DIR}

Found 1 items
-rw-r--r--   1 jovyan hadoopusers   39030677 2025-03-20 07:20 /jovyan/aggrigation_logs_per_week.csv


Здесь пусто. 


Скопируем файл с логами студентов на образовательном портале. 

In [43]:
!hdfs dfs -put datasets/aggrigation_logs_per_week.csv {WORK_DIR}

put: `/jovyan/aggrigation_logs_per_week.csv': File exists


In [57]:
!hdfs dfs -ls {WORK_DIR}

Found 1 items
-rw-r--r--   1 jovyan hadoopusers   39030677 2025-03-20 07:20 /jovyan/aggrigation_logs_per_week.csv


Вызовем фанкцию вывода списка файлов

In [45]:
hdfs_dirs(WORK_DIR, 'csv')

['/jovyan/aggrigation_logs_per_week.csv']

Выведем первые 100 символов. 

In [46]:
file_content ('/jovyan/aggrigation_logs_per_week.csv')[:100]

'courseid,userid,num_week,s_all,s_all_avg,s_course_viewed,s_course_viewed_avg,s_q_attempt_viewed,s_q_'

# Команда hdfs fsck

hdfs fsck — это утилита для проверки целостности файлов и блоков в HDFS. Она позволяет выявить проблемы, такие как отсутствующие блоки, поврежденные данные или другие несоответствия.

Аргументы: 
- files 
вывести информацию о файле, включая его размер, количество блоков и другие метаданные.

- blocks
добавляет информацию о блоках, из которых состоит файл. В HDFS файлы разбиваются на блоки (обычно размером 128 МБ или 256 МБ), которые распределяются по узлам кластера. Этот флаг покажет, сколько блоков занимает файл, их размеры и статус.

- racks
добавляет информацию о том, на каких стойках (racks) расположены блоки файла. В HDFS стойки используются для обеспечения отказоустойчивости и оптимизации производительности. Этот флаг покажет, как блоки распределены по стойкам в кластере.

In [34]:
%%bash
hdfs fsck /jovyan/aggrigation_logs_per_week.csv -files -blocks -racks

Connecting to namenode via http://0.0.0.0:9870/fsck?ugi=jovyan&files=1&blocks=1&racks=1&path=%2Fjovyan%2Faggrigation_logs_per_week.csv


FSCK started by jovyan (auth:SIMPLE) from /10.112.133.154 for path /jovyan/aggrigation_logs_per_week.csv at Thu Mar 20 10:56:28 UTC 2025

/jovyan/aggrigation_logs_per_week.csv 39030677 bytes, replicated: replication=1, 1 block(s):  OK
0. BP-900967270-10.112.133.154-1742453981847:blk_1073741844_1020 len=39030677 Live_repl=1  [/default-rack/10.112.133.154:9866]


Status: HEALTHY
 Number of data-nodes:	1
 Number of racks:		1
 Total dirs:			0
 Total symlinks:		0

Replicated Blocks:
 Total size:	39030677 B
 Total files:	1
 Total blocks (validated):	1 (avg. block size 39030677 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	1
 Average block replication:	1.0
 Missing blocks:		0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)

Erasure Coded Block Groups:
 Total size:	0 B
 Total files:	0
 Total block groups (validated):	0
 Minimally erasure-coded block groups:	0


# Пояснения к выводу
Connecting to namenode via http://0.0.0.0:9870/fsck?ugi=jovyan&files=1&blocks=1&racks=1&path=%2Fjovyan%2Faggrigation_logs_per_week.csv
FSCK started by jovyan (auth:SIMPLE) from /10.112.129.148 for path /jovyan/aggrigation_logs_per_week.csv at Mon Jan 27 11:39:52 UTC 2025

/jovyan/aggrigation_logs_per_week.csv 39030677 bytes, replicated: replication=1, 1 block(s):  OK

#### Размер файла: 39 030 677 байт (~37 МБ).
#### Репликация: replication=1 (файл хранится только в одном экземпляре).
#### Число блоков: 1 (весь файл помещён в один блок).
#### Статус: OK (файл здоров).

<i> 0. BP-433375171-10.112.129.148-1737977684764:blk_1073741837_1013 len=39030677 Live_repl=1  [/default-rack/10.112.129.148:9866]  </i>
#### Информация о блоке:
#### Идентификатор блока: blk_1073741837_1013.
#### Длина блока: len=39030677 (совпадает с размером файла).
#### Количество активных реплик: Live_repl=1 (блок доступен только на одном узле).
#### Расположение блока: /default-rack/10.112.129.148:9866 (узел на rack'е default-rack).

Status: HEALTHY
 Number of data-nodes:	1
 Number of racks:		1
 Total dirs:			0
 Total symlinks:		0

#### HDFS работает с одним узлом хранения данных.
#### Rack'ов (групп узлов) тоже один.



# Команда hdfs dfs -setrep

команда HDFS, которая изменяет фактор репликации (количество копий) для указанного файла или директории.

Пример использования:
```
%%bash
hdfs dfs -setrep -w 2 /jovyan/aggrigation_logs_per_week.csv
```
Аргументы команды:
- -w
Этот флаг указывает, что команда должна ожидать завершения процесса репликации. Без этого флага команда просто запустит процесс репликации в фоновом режиме.

- 2
Это новое значение фактора репликации. В данном случае файл будет иметь 2 копии (реплики) в HDFS.

- /jovyan/aggrigation_logs_per_week.csv
Это путь к файлу в HDFS, для которого изменяется фактор репликации.


В нашей среде данная команда не сработает, поскольку у нас в наличие только одна DataNode.

In [27]:
## Map-reduce

Hadoop MapReduce — это программная среда для простого написания приложений, которые обрабатывают огромные объемы данных (многотерабайтные наборы данных) параллельно на больших кластерах (тысячи узлов) стандартного оборудования надежным и отказоустойчивым способом.

Задание MapReduce обычно разбивает входной набор данных на независимые фрагменты, которые обрабатываются задачами <b>map</b> полностью параллельно. Среда сортирует выходные данные карт, которые затем вводятся в задачи <b>reduce</b>. Обычно и входные, и выходные данные задания хранятся в файловой системе. Среда занимается планированием задач, их мониторингом и повторно выполняет невыполненные задачи.

Среда MapReduce работает исключительно с парами <ключ, значение>, то есть среда рассматривает входные данные задания как набор пар <ключ, значение> и создает набор пар <ключ, значение> в качестве выходных данных задания, предположительно разных типов.

Типы входных и выходных данных задания MapReduce:
```
(вход) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (выход)
```

Демонстрация для фреймворка MapReduce предназначена для хорошо известной задачи подсчета слов:

![MapReduce](https://www.todaysoftmag.com/images/articles/tsm33/large/a11.png)

# Задание 1: Подсчет количества записей для каждого курса (courseid)

Цель: Научиться использовать MapReduce для подсчета количества записей для каждого курса.

Ход выполнения:
Map: Преобразуйте каждую строку в пару ключ-значение, где ключ — это courseid, а значение — 1.

Reduce: Суммируйте значения для каждого ключа.

In [32]:
class MRCourseCount:
    def __init__(self, input_file, output_file):
        self.input_file = input_file
        self.output_file = output_file

    def mapper(self, line):
        fields = line.strip().split(',')
        if len(fields) > 0:
            courseid = fields[0]
            yield courseid, 1

    def reducer(self, courseid, counts):
        yield courseid, sum(counts)

    def run(self):
        intermediate = {}
        with open(self.input_file, 'r') as f:
            next(f)  # Пропуск заголовка
            for line in f:
                for key, value in self.mapper(line):
                    if key not in intermediate:
                        intermediate[key] = []
                    intermediate[key].append(value)

        results = {}
        for key, values in intermediate.items():
            for result_key, result_value in self.reducer(key, values):
                results[result_key] = result_value

        # Запись результатов в файл
        with open(self.output_file, 'w') as f:
            for courseid, count in results.items():
                f.write(f"{courseid}: {count}\n")


In [62]:
input_file = 'jovyan/aggrigation_logs_per_week.csv'
output_file = "jovyan/output.txt"
mr_job = MRCourseCount(input_file, output_file)
mr_job.run()

In [53]:
!ls 

hadoop	jovyan


# Задание 2: Подсчет среднего значения s_all_avg для каждого курса
С использованием MapReduce вычислить среднее значение  s_all_avg по каждому курсу.

Ход выполнения:
- Map: Преобразуйте каждую строку в пару ключ-значение, где ключ — это courseid, а значение — s_all_avg.

- Reduce: Вычислите среднее значение для каждого курса.

- Сохраните результаты вычисления в отдельный файл

In [63]:
# ваш код

# Задание 3: Подсчет количества уникальных пользователей (userid) для каждого курса

Использовать MapReduce для подсчета уникальных значений.

Ход выполнения:
- Map: Преобразуйте каждую строку в пару ключ-значение, где ключ — это courseid, а значение — userid.

- Reduce: Соберите уникальные userid для каждого курса и подсчитайте их количество.

- Сохраните результаты вычисления в отдельный файл

In [64]:
# ваш код

# Задание 4: Запуск  MapReduce  в терминале

Оформите полученный код из задания 3 в виде отдельного исполняющего файла mr_userid.py

Зайдите в терминал и запустите файл mr_userid.py

Например:
```
python untitled.py aggrigation_logs_per_week.csv
```

Файл  mr_userid.py сохраните в отдельной папке Task_6.MR


================================================


<b>В классической реализации MapReduce (например, в Hadoop) mapper и reducer выполняются как отдельные задачи (отдельные процессы или даже отдельные узлы в кластере). </b>

Преимущества разделения на mapper и reducer:
- Масштабируемость: Mapper и reducer могут выполняться на разных узлах кластера.

- Гибкость: Можно использовать разные языки программирования для mapper и reducer.

- Совместимость с Hadoop: Такой подход легко адаптировать для работы с Hadoop Streaming.

Исправим данное упущение. 

## Шаг 1: Разделение кода на два файла
Файл <b>mapper.py</b>
Этот файл будет содержать логику mapper. Он будет читать входные данные и выдавать промежуточные пары ключ-значение.
Например:
```
import sys

def mapper():
    for line in sys.stdin:
        fields = line.strip().split(',')
        if len(fields) > 1:  # Пропуск пустых строк
            courseid = fields[0]
            userid = fields[1]
            print(f"{courseid}\t{userid}")

if __name__ == '__main__':
    mapper()
```

Файл <b>reducer.py</b>
Этот файл будет содержать логику reducer. Он будет читать промежуточные данные, группировать их по ключам и выполнять агрегацию.
Например:
```
import sys
from itertools import groupby
from operator import itemgetter

def reducer():
    # Чтение данных из stdin и сортировка по ключу (courseid)
    data = [line.strip().split('\t') for line in sys.stdin]
    data.sort(key=itemgetter(0))

    # Группировка по courseid
    for courseid, group in groupby(data, key=itemgetter(0)):
        userids = set(userid for _, userid in group)
        print(f"{courseid}\t{len(userids)}")

if __name__ == '__main__':
    reducer()
```


## Шаг 2: Запуск mapper и reducer через конвейер
Для запуска mapper и reducer как отдельных процессов можно использовать конвейер (pipe) в командной строке. Вот как это сделать:

Запуск mapper и reducer через конвейер:

```
cat /jovyan/aggrigation_logs_per_week.csv | python3 mapper.py | sort | python3 reducer.py > output.txt
```

Здесь:

- cat читает файл и передает его содержимое в mapper.py.

- mapper.py обрабатывает данные и выводит промежуточные пары ключ-значение.

- sort сортирует промежуточные данные по ключу (это важно для правильной работы reducer).

- reducer.py обрабатывает отсортированные данные и записывает результат в output.txt.

Отметим, что при реализации запуска черер конвейер все данные обрабатываются на одной машине.

Преимущества такого запуска:

- Простота: Не требуется Hadoop или кластер.

- Быстрый запуск: Подходит для локальной разработки и тестирования.

- Легкость отладки: Легче отслеживать и исправлять ошибки.

Недостатки:

- Ограниченная масштабируемость: Все данные обрабатываются на одной машине, что не подходит для больших объемов данных.

- Нет отказоустойчивости: Если процесс завершится с ошибкой, все вычисления нужно будет начать заново.

# Задание 5: Выявление студентов с высоким риском отчисления
Найти студентов с низкой активностью, кто может быть в группе риска.
- Mapper:

Для каждой строки данных извлекаем userid и s_all (общая активность студента).

Выдаем пары (userid, s_all).

- Reducer:

Для каждого userid вычисляем среднее значение s_all (средняя активность студента). Вопрос: что не так в данном подходе? Предложите, как исправить упущение (если оно имеется), реализуйте его. 

Сортируем студентов по средней активности и выбираем топ-10 студентов с наименьшей активностью.

Два файла сохранить в папке Task_6.MR и запустить через конвейер. 

#  Запуск через Hadoop Streaming

Hadoop Streaming — это утилита, которая позволяет запускать MapReduce-задачи с использованием любых исполняемых файлов (например, Python-скриптов) в качестве mapper и reducer.

Она работает поверх Hadoop, что позволяет использовать распределенные вычисления на кластере.
Преимущества запуска через Hadoop Streaming:

- Масштабируемость: Hadoop распределяет данные и вычисления по множеству узлов, что позволяет обрабатывать огромные объемы данных.

- Отказоустойчивость: Hadoop автоматически перезапускает задачи, если какой-то узел выходит из строя.

- Интеграция с HDFS: Данные хранятся в HDFS, что обеспечивает высокую производительность и надежность.

Недостатки:

- Требуется настроенный Hadoop-кластер.

- Более сложная настройка и запуск по сравнению с локальным подходом.

Пример запуска:
```
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.3.jar \
    -input /jovyan/aggrigation_logs_per_week.csv \
    -output /jovyan/output_stud \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -file ./data/big_data_course/Task_6.MR/mapper.py \
    -file ./data/big_data_course/Task_6.MR/reducer.py

```


Обратите внимание, что файл с результатами reducer будет сохранен в директории, которую вы указали в параметре output, причем это выходная директория hdfs, что значит, что вы увидете данный файл не в локальной системе, а именно в hdfs.

Найдем данный файл в hdfs:
```
 hdfs dfs -ls -R /jovyan | grep "output_stud"
```
Пример результата поиска:
```
drwxr-xr-x   - jovyan hadoopusers          0 2025-03-20 14:56 /jovyan/output_stud
-rw-r--r--   1 jovyan hadoopusers          0 2025-03-20 14:56 /jovyan/output_stud/_SUCCESS
-rw-r--r--   1 jovyan hadoopusers       7823 2025-03-20 14:56 /jovyan/output_stud/part-00000
```
Внутри этой директории находятся файлы с результатами, такие как part-00000, part-00001 и т.д. _SUCCESS — пустой файл, который указывает на успешное завершение задачи. part-00000 — файл с результатами.


Чтобы просмотреть содержимое файла с результатами, используйте команду:
```
 hdfs dfs -cat /jovyan/output_stud/part-00000
```

# Задание 6: Запуск MapReduce-задачи через Hadoop Streaming 

Подготовьте команду для запуска MapReduce-задачи через Hadoop Streaming из задания 5.

В терминале проверьте, что задача выполняется успешно.

Откройте полученный файл в hdfs.

Бутьте готовы продемонстриротьва запуск преподавателю. 

In [None]:
# Ваша команда запуска задачи