<a target="_blank" href="../cluster" style="font-size:20px">All Applications (YARN)</a>

# Домашнее задание

В лекциях мы обсуждали меру Жаккара и то, как ее эффективно считать на MapReduce.

Вам предлагается посчитать меру Жаккара на Spark для поиска похожих исполнителей во всем наборе данных и ответить на следующие вопросы:
1. **Сколько исполнителей остаются в рассмотрении после применения всех фильтров из описания задания?**
2. **Для скольких пар исполнителей удалось насчитать ненулевую похожесть по Жаккару? Здесь учитываются всевозможные пары (a, b) и (b, a), а также (a, a), для проверки корректности.**
3. **Найдите 5 самых похожих исполнителей на "Maroon 5" по посчитанной мере Жаккара. В результат запишите имена 5 исполнителей отличных от "Maroon 5".**

Несколько напутственных слов:
- Используйте данные, загруженные в разделе <a href="#Загружаем-данные">Загружаем данные</a>.
- Пользователи, прослушавшие $N$ исполнителей внесут вклад в похожесть $N^2$ пар артистов. Поэтому редкие очень активные пользователи будут сильно замедлять наш алгоритм. Для таких пользователей на практике берут подмножество прослушиваний, например, 1000. Мы поступим проще и будем учитывать только прослушивания, где $plays > 2$, таким образом оставим только наиболее уверенные предпочтения пользователя.
- Чтобы похожести были более уверенными, будем считать их только для тех исполнителей, которых прослушали строго больше 50 человек (с учетом предыдущего фильтра по прослушиваниям).
- Для отладки алгоритма на меньшем объеме данных можно использовать трансформацию <a href="https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sample">events.sample(False, 0.01)</a>, чтобы не ждать долго отладочных запусков.
- Можно считать, что данные об исполнителях (например, их популярность) поместятся в памяти каждой машины. Просто нет так много исполнителей в мире, чтобы не поместиться.
- Если какой-то шаг выполняется очень долго, можно увеличить степень параллелизма, например, 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey">groupByKey(numPartitions=100)</a>, чтобы увидеть более гранулярный прогресс выполнения.
- Иногда посчитанный результат имеет смысл сохранить в HDFS, чтобы не пересчитывать его заново каждый раз, когда он нужен.
- При работе с большими данными требуется терпение, авторское решение работает около 10 минут.
- Эту задачу можно решить и на Spark SQL, если вам он нравится больше.

Решение сохраните в файл `result.json`. Пример содержимого файла:
```json
{
    "q1": 123,
    "q2": 456,
    "q3": [
        "artistName1",
        "artistName2",
        "artistName3",
        "artistName4",
        "artistName5"
    ]
}
```

# Копируем файлы в HDFS

In [None]:
! hadoop fs -copyFromLocal yandex_music /

In [None]:
! hadoop fs -ls -h /yandex_music

Found 3 items
-rw-r--r--   1 jovyan supergroup        254 2022-02-16 18:12 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2022-02-16 18:12 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2022-02-16 18:12 /yandex_music/events.csv


# Загружаем данные

In [None]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=jupyter, master=yarn) created by __init__ at <ipython-input-5-3835b93bac9b>:5 

In [None]:
artists = se.read.json("hdfs:///yandex_music/artists.jsonl")
artists.registerTempTable("artists")
artists.limit(5).toPandas()

Unnamed: 0,artistId,artistName
0,0,Mack Gordon
1,1,Kenny Dorham
2,2,Max Roach
3,3,Francis Rossi
4,4,Status Quo


In [None]:
events = se.read.csv("hdfs:///yandex_music/events.csv", header=True, 
                     schema='userId bigint, artistId bigint, plays INT, skips INT')
events.registerTempTable("events")
events.limit(5).toPandas()

Unnamed: 0,userId,artistId,plays,skips
0,0,335,1,0
1,0,708,1,0
2,0,710,2,1
3,0,815,1,1
4,0,880,1,1


In [None]:
events.show(1)

+------+--------+-----+-----+
|userId|artistId|plays|skips|
+------+--------+-----+-----+
|     0|     335|    1|    0|
+------+--------+-----+-----+
only showing top 1 row



# Фильтруем

Выделим артистов с > 50 человек

In [None]:
n = 50

popular_artists = se.sql(f"""
    select artistId, count(distinct userId) as users, sum(plays) as total_plays
    from events
    where plays > 2
    group by artistId
    having users > {n}
"""
)

popular_artists.registerTempTable("popular_artists")

In [None]:
popular_artists.show(5)

+--------+-----+-----------+
|artistId|users|total_plays|
+--------+-----+-----------+
|   10156|  874|      11483|
|   11434|  114|       1431|
|   23019|   90|       1218|
|   28078|  332|       3337|
|   21899|   73|        308|
+--------+-----+-----------+
only showing top 5 rows



Отфильтруем данные по ним

In [None]:
filtered_data = se.sql(
f"""
select 
    userId,
    e.artistId,
    plays
from events as e
join popular_artists pa on e.artistId = pa.artistId
where plays > 2
and pa.users > {n}
""")

filtered_data.registerTempTable("popular_data")

# IOU

In [None]:
def calc_iou(pow_a: int, pow_b: int, pow_intersect: int) -> float:
    union = pow_a + pow_b - pow_intersect
    if union == 0:
        return 0
    return pow_intersect / union

## Посчитаем на подвыборке

In [None]:
filtered_data.persist()

DataFrame[userId: bigint, artistId: bigint, plays: int]

In [None]:
filtered_data.take(5)

[Row(userId=24, artistId=1806, plays=5),
 Row(userId=103, artistId=1806, plays=3),
 Row(userId=109, artistId=1806, plays=9),
 Row(userId=152, artistId=1806, plays=5),
 Row(userId=199, artistId=1806, plays=17)]

In [None]:
filtered_sample = filtered_data.sample(False, 0.05)

In [None]:
users_artists = (
filtered_sample.rdd
    .map(lambda x: (x.userId, (x.artistId, x.plays)))
    .groupByKey()
    .map(lambda x: (x[0], list(x[1])))    
)

Посмотрим на корректность

In [None]:
users_artists.take(1)

[(2848,
  [(21042, 18), (1120, 3), (12819, 7), (8202, 9), (13585, 3), (42408, 28)])]

In [None]:
filtered_sample.where('userId=2848').take(20)

[Row(userId=2848, artistId=13585, plays=3),
 Row(userId=2848, artistId=21042, plays=18),
 Row(userId=2848, artistId=42408, plays=28),
 Row(userId=2848, artistId=1120, plays=3),
 Row(userId=2848, artistId=8202, plays=9),
 Row(userId=2848, artistId=12819, plays=7)]

## Преобразуем данные в новые ключи и посчитаем пересечения

In [None]:
from itertools import combinations, combinations_with_replacement

def get_artists_pairs(l: list):
    pairs = combinations(l, 2)
    keys = []
    
    for pair in pairs:
        key_one, val_one = pair[0]
        key_two, val_two = pair[1]
        keys.append((tuple(sorted((key_one, key_two))), 1))
    
    return keys

In [None]:
from operator import add

artists_intersections = ( 
    users_artists
    .flatMap(lambda x: get_artists_pairs(x[1]))
    .reduceByKey(add)
    .collect()
)

In [None]:
artists_intersections = {key : val for key, val in artists_intersections}

## Посчитаем мощности

In [None]:
artists_plays = se.sql(
"""
    select artistId, users, total_plays
    from popular_artists
"""
).toPandas()

In [None]:
artists_unions = {}
for data in artists_plays.iterrows():
    row = data[1]
    artists_unions[row.artistId] = row.users

## Перейдем к полным данным

In [None]:
users_artists_full = (
filtered_data.rdd
    .map(lambda x: (x.userId, (x.artistId, x.plays)))
    .groupByKey()
    .map(lambda x: (x[0], list(x[1])))
)

In [None]:
artists_intersections_full = ( 
    users_artists_full
    .flatMap(lambda x: get_artists_pairs(x[1]))
    .reduceByKey(add)
    .collect()
)

artists_intersections_full = {key : val for key, val in artists_intersections_full}

In [None]:
ious = {}

for key, val in artists_intersections_full.items():
    a, b = key
    ious[key] = calc_iou(artists_unions[a], artists_unions[b], val)

# Answers

**Сколько исполнителей остаются в рассмотрении после применения всех фильтров из описания задания?**

In [None]:
artists_answer = se.sql(
"""
    select pa.artistId, artistName, users
    from popular_artists pa
    left join artists using(artistId) 
"""
).toPandas()

In [None]:
artists_answer.artistId.unique().shape

(2889,)

**Для скольких пар исполнителей удалось насчитать ненулевую похожесть по Жаккару? Здесь учитываются всевозможные пары (a, b) и (b, a), а также (a, a), для проверки корректности.**

In [None]:
len([val for val in ious.values() if val > 0])

3417845

**Найдите 5 самых похожих исполнителей на "Maroon 5" по посчитанной мере Жаккара. В результат запишите имена 5 исполнителей отличных от "Maroon 5".**

In [None]:
band_name = "Maroon 5"
artists_answer.query(f'artistName == "{band_name}"')

Unnamed: 0,artistId,artistName,users
1490,14803,Maroon 5,919


In [None]:
band_id = 14803

In [None]:
band_ious = [(key, val) for key, val in ious.items() if band_id in key]

In [None]:
from operator import itemgetter

best_band_ious = sorted(band_ious, key=itemgetter(1), reverse=True)[:10]

In [None]:
best_band_ious

[((3568, 14803), 0.33197556008146639),
 ((3629, 14803), 0.31266017426960535),
 ((259, 14803), 0.29184782608695653),
 ((14803, 22629), 0.28674481514878269),
 ((14803, 59783), 0.28589032655576091),
 ((11368, 14803), 0.28221597751906863),
 ((14803, 64627), 0.27981995498874718),
 ((404, 14803), 0.27945619335347432),
 ((14803, 70542), 0.27321981424148606),
 ((4073, 14803), 0.26656025538707101)]

In [None]:
band_mates = [key for data in best_band_ious for key in data[0] if key != band_id]

In [None]:
band_mates

[3568, 3629, 259, 22629, 59783, 11368, 64627, 404, 70542, 4073]

In [None]:
artists_answer.set_index('artistId').loc[band_mates]

Unnamed: 0_level_0,artistName,users
artistId,Unnamed: 1_level_1,Unnamed: 2_level_1
3568,OneRepublic,1043
3629,Sia,1642
259,David Guetta,1458
22629,Bruno Mars,508
59783,Calvin Harris,1168
11368,Imagine Dragons,2275
64627,Ed Sheeran,787
404,Coldplay,775
70542,Sam Smith,726
4073,Katy Perry,668


**Пишем ответ**

In [None]:
%%writefile result.json
{
    "q1": 2889,
    "q2": 3417845,
    "q3": [
        "OneRepublic",
        "Sia",
        "David Guetta",
        "Bruno Mars",
        "Calvin Harris"
    ]
}

Overwriting result.json


In [None]:
# останавливаем Spark (и YARN приложение)
# sc.stop()