<a target="_blank" href="../cluster" style="font-size:20px">All Applications (YARN)</a>

# Homework

We will use logs of artist listening events from the Yandex.Music service.

The file `events.csv` contains records of the form `user,artist,play count,skip count`:
```csv
userId,artistId,plays,skips
0,335,1,0
0,708,1,0
0,710,2,1
0,815,1,1
```

You need to do the following:
1. **Keep only the users whose total `plays` is strictly greater than 1000. How many such users are there?**
2. **In the data filtered in step 1, find the 5 most popular artists by number of users (report their identifiers).**

Details:
1. Assume that the listening history of a single user always fits in memory.
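
For small inputs, the two steps can be checked in memory before running them on the cluster. The following is a minimal sketch, not part of the assignment: the function name `solve` and its `threshold` and `top_n` parameters are hypothetical, and rows are assumed to be `(userId, artistId, plays, skips)` tuples that have already been parsed to integers.

```python
from collections import Counter, defaultdict


def solve(rows, threshold=1000, top_n=5):
    """Return (number of heavy users, top artist ids by distinct heavy users)."""
    plays_by_user = defaultdict(int)
    artists_by_user = defaultdict(set)
    for user_id, artist_id, plays, _skips in rows:
        plays_by_user[user_id] += plays
        artists_by_user[user_id].add(artist_id)

    # Step 1: keep users whose total plays are strictly above the threshold.
    heavy_users = [u for u, total in plays_by_user.items() if total > threshold]

    # Step 2: count, for each artist, how many heavy users listened to them.
    artist_counts = Counter()
    for user_id in heavy_users:
        artist_counts.update(artists_by_user[user_id])

    top = [artist for artist, _ in artist_counts.most_common(top_n)]
    return len(heavy_users), top


# Toy data with a threshold of 2 instead of 1000.
rows = [(0, 335, 1, 0), (0, 708, 2, 0), (1, 335, 5, 1), (2, 710, 1, 0)]
print(solve(rows, threshold=2, top_n=1))  # (2, [335])
```

Such a reference version holds the whole data set in memory, so it is only useful as a sanity check on a sample.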

Save the solution to the file `result.json`. Example file contents:

```json
{
    "q1": 123,
    "q2": [
        4,
        5,
        6,
        7,
        8
    ]
}
```
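
A file in this shape can also be produced from Python instead of being typed by hand. This is only a sketch: the helper name `format_result` is hypothetical, and the values passed to it are the placeholders from the example above, not real answers.

```python
import json


def format_result(q1, q2):
    """Serialize the two answers in the layout shown in the example."""
    return json.dumps({"q1": q1, "q2": list(q2)}, indent=4)


# Placeholder values copied from the example, not actual results.
text = format_result(123, [4, 5, 6, 7, 8])
print(text)
```

The resulting string can then be written out with `open("result.json", "w").write(text)`.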

In [1]:
# sample of the file contents
! head -n 5 yandex_music/events.csv

userId,artistId,plays,skips
0,335,1,0
0,708,1,0
0,710,2,1
0,815,1,1


In [2]:
# copy the files to HDFS
! hadoop fs -copyFromLocal yandex_music /
! hadoop fs -ls -h /yandex_music

Found 3 items
-rw-r--r--   1 jovyan supergroup        254 2021-05-08 19:44 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2021-05-08 19:44 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2021-05-08 19:44 /yandex_music/events.csv


In [3]:
# sample of the file contents
! head -n 10 yandex_music/events.csv

userId,artistId,plays,skips
0,335,1,0
0,708,1,0
0,710,2,1
0,815,1,1
0,880,1,1
0,1091,2,3
0,1222,1,1
0,1571,1,2
0,1592,2,2


In [4]:
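%%file mapper.py
import sys

for line in sys.stdin:
    # Skip the CSV header by content: only the first input split contains it,
    # so skipping "the first line" would drop a data row in every other split.
    if line.startswith("userId"):
        continue
    user_id, artist_id, plays, skips = line.rstrip("\n").split(',')
    # key = user, value = "artist,plays"
    print(user_id + "\t" + artist_id + "," + plays)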

Overwriting mapper.py


In [5]:
! head -n 5 yandex_music/events.csv | python ./mapper.py

0	335,1
0	708,1
0	710,2
0	815,1


In [6]:
%%file reducer.py
import sys

prev_key = None
plays_sum = 0
event_list = []
for line in sys.stdin:  # stream is sorted by key
    key, value = line.split("\t")
    _, plays = value.split(",")
    if prev_key is not None and key != prev_key:
        # new key in stream, dump previous
        if plays_sum > 1000:
            for i in event_list:
                print(i)
        plays_sum = 0
        event_list = []
    
    event_list.append(line[:-1])
    plays_sum += int(plays)
    prev_key = key

# dump last key
if plays_sum > 1000:
    for i in event_list:
        print(i)

Overwriting reducer.py
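
The same per-user filtering can be expressed with `itertools.groupby`, which removes the manual `prev_key` bookkeeping. This is an alternative sketch rather than the reducer used in this notebook; the function name `filter_heavy_users` and its `threshold` parameter are hypothetical. It relies on the input being sorted by key, as the reducer above does.

```python
from itertools import groupby


def filter_heavy_users(lines, threshold=1000):
    """Yield the records of every user whose total plays exceed threshold.

    `lines` must arrive grouped by key ("<user>\t<artist>,<plays>").
    """
    records = (line.rstrip("\n") for line in lines)
    for _user_id, group in groupby(records, key=lambda r: r.split("\t", 1)[0]):
        events = list(group)  # one user's events are assumed to fit in memory
        total = sum(int(e.rsplit(",", 1)[1]) for e in events)
        if total > threshold:
            yield from events


sample = ["0\t335,600\n", "0\t708,500\n", "1\t335,10\n"]
print(list(filter_heavy_users(sample)))  # ['0\t335,600', '0\t708,500']
```

Inside a streaming reducer script, the function would be fed `sys.stdin` and each yielded record printed.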


In [7]:
! head -n 5 yandex_music/events.csv | python ./mapper.py | python ./reducer.py

In [8]:
! hadoop fs -rm -r /filtered

! mapred streaming \
  -input /yandex_music/events.csv \
  -output /filtered \
  -mapper "/opt/conda/bin/python3.6 mapper.py" \
  -reducer "/opt/conda/bin/python3.6 reducer.py" \
  -file mapper.py \
  -file reducer.py

rm: `/filtered': No such file or directory
2021-05-08 19:44:40,992 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar] /tmp/streamjob8191031279392158772.jar tmpDir=null
2021-05-08 19:44:41,756 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2021-05-08 19:44:41,898 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2021-05-08 19:44:42,080 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1620502994344_0001
2021-05-08 19:44:43,134 INFO mapred.FileInputFormat: Total input files to process : 1
2021-05-08 19:44:43,575 INFO mapreduce.JobSubmitter: number of splits:3
2021-05-08 19:44:44,063 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1620502994344_0001
2021-05-08 19:44:44,

In [9]:
! hadoop fs -ls /filtered

Found 2 items
-rw-r--r--   1 jovyan supergroup          0 2021-05-08 19:45 /filtered/_SUCCESS
-rw-r--r--   1 jovyan supergroup   32322272 2021-05-08 19:45 /filtered/part-00000


In [10]:
! hadoop fs -cat /filtered/part-00000 | cut -f1 | uniq | wc -l

3117
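
The shell pipeline above counts runs of equal keys, which equals the number of distinct users only because all records of one user are adjacent in the reducer output. A rough Python equivalent, with the hypothetical helper name `count_users`, might look like this:

```python
from itertools import groupby


def count_users(lines):
    """Count runs of equal keys, like `cut -f1 | uniq | wc -l`."""
    keys = (line.split("\t", 1)[0] for line in lines)
    return sum(1 for _ in groupby(keys))


print(count_users(["0\t335,600", "0\t708,500", "7\t335,1200"]))  # 2
```

If the records were not grouped by user, collecting the keys into a `set` would be the safer choice.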


In [11]:
%%file mapper2.py
import sys

for line in sys.stdin:
    # input record: "<user_id>\t<artist_id>,<plays>"
    key, value = line.rstrip("\n").split("\t")
    artist_id, plays = value.split(",")
    # emit one count per (user, artist) record
    print(artist_id + '\t' + "1")

Overwriting mapper2.py


In [12]:
%%file reducer2.py
import sys

prev_key = None
count = 0
for line in sys.stdin:  # stream is sorted by key
    key, value = line.split("\t")
    
    if prev_key is not None and key != prev_key:
        # new key in stream, dump previous
        print(prev_key + "\t" + str(count))
        count = 0
    
    count += int(value)
    prev_key = key

# dump last key (nothing to print if the input was empty)
if prev_key is not None:
    print(prev_key + "\t" + str(count))

Overwriting reducer2.py


In [None]:
! hadoop fs -rm -r /artist_count

! mapred streaming \
  -input /filtered/part-00000 \
  -output /artist_count \
  -mapper "/opt/conda/bin/python3.6 mapper2.py" \
  -reducer "/opt/conda/bin/python3.6 reducer2.py" \
  -file mapper2.py \
  -file reducer2.py

rm: `/artist_count': No such file or directory
2021-05-08 19:45:26,163 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper2.py, reducer2.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar] /tmp/streamjob7442406271332210491.jar tmpDir=null
2021-05-08 19:45:26,969 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2021-05-08 19:45:27,111 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2021-05-08 19:45:27,279 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1620502994344_0002
2021-05-08 19:45:28,300 INFO mapred.FileInputFormat: Total input files to process : 1
2021-05-08 19:45:28,311 INFO net.NetworkTopology: Adding a new node: /default-rack/127.0.0.1:9866
2021-05-08 19:45:29,134 INFO mapreduce.JobSubmitter: number of splits:2
2021-05-08 19:45:29

In [None]:
! hadoop fs -cat /artist_count/part-00000 | sort -nk2 -r | head -5 | cut -f1
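
The top 5 can also be selected in Python without sorting the whole list. The sketch below uses `heapq.nlargest` on `artist_id<TAB>count` lines; the helper name `top_artists` is hypothetical, and ties may be ordered differently than by `sort`.

```python
import heapq


def top_artists(lines, n=5):
    """Return the ids of the n artists with the largest counts."""
    pairs = (line.rstrip("\n").split("\t") for line in lines)
    best = heapq.nlargest(n, pairs, key=lambda p: int(p[1]))
    return [int(artist_id) for artist_id, _count in best]


print(top_artists(["10\t3", "20\t7", "30\t5"], n=2))  # [20, 30]
```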

In [None]:
! hadoop fs -cat /artist_count/part-00000 

In [None]:
%%file result.json
{
    "q1": 3117,
    "q2": [
        11368,
        3629,
        259,
        44148,
        23524
    ]
}