# Homework

In the lectures, we discussed the Jaccard measure and how to calculate it efficiently with MapReduce. Examples can be found in `jaccard-seminar.ipynb`.
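As a reminder, the Jaccard measure of two artists is computed over the sets of users who listened to them: $J(A, B) = |A \cap B| / |A \cup B|$. Because $|A \cup B| = |A| + |B| - |A \cap B|$, the only quantities needed are the intersection size for each pair and the number of listeners for each artist. This is what makes a MapReduce formulation efficient. The following is a minimal plain-Python sketch. The user ids are made up for illustration.

```python
def jaccard(a: set, b: set) -> float:
    """Exact Jaccard similarity of two sets of user ids."""
    if not a and not b:
        return 0.0
    return len(a & b) / len(a | b)


def jaccard_from_counts(intersection: int, size_a: int, size_b: int) -> float:
    """Same value via inclusion-exclusion: |A u B| = |A| + |B| - |A n B|."""
    return intersection / (size_a + size_b - intersection)


# Hypothetical listeners of two artists
listeners_a = {1, 2, 3, 4}
listeners_b = {3, 4, 5}

print(jaccard(listeners_a, listeners_b))                        # 0.4
print(jaccard_from_counts(len(listeners_a & listeners_b),
                          len(listeners_a), len(listeners_b)))   # 0.4
```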

You are invited to calculate the Jaccard measure on Spark to find similar performers in the entire dataset and answer the following questions:
1. **How many performers remain in consideration after applying all filters from the job description?**
2. **For how many pairs of performers did you manage to calculate a non-zero Jaccard similarity? Count all ordered pairs: both (a, b) and (b, a), as well as self-pairs (a, a), which serve as a correctness check.**
3. **Find the 10 artists most similar to "Tommy Cash" by the Jaccard measure calculated with the MapReduce approach. As the answer, write down the names of 10 artists other than "Tommy Cash".**

PySpark MLlib provides MinHashLSH, a locality-sensitive hashing scheme for the Jaccard measure. You are invited to calculate the Jaccard measure with this feature, using a (Jaccard distance) threshold of 0.95 and 20 hash tables, to find similar performers in the entire dataset and answer the following question:

4. **Find the 10 artists most similar to "Tommy Cash" by the Jaccard measure calculated with MinHashLSH. As the answer, write down the names of 10 artists other than "Tommy Cash".**
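MinHash rests on a simple fact. For a uniformly random permutation of the users, the probability that sets A and B have the same minimum element equals $J(A, B)$. The reason is that the minimum of $A \cup B$ is equally likely to be any of its elements, and the two minima coincide exactly when it lies in $A \cap B$. Averaging over many permutations therefore estimates J.

The following is a textbook, permutation-based sketch on hypothetical user sets. Spark's MinHashLSH works differently in two respects:

- It uses random hash functions rather than explicit permutations.
- With `numHashTables=20`, it treats two items as candidates when any single table agrees (OR-amplification). It then computes the Jaccard distance on the feature vectors of those candidates only.

```python
import random

UNIVERSE = 1000  # hypothetical user ids 0..999


def make_permutations(n, seed=57):
    """n random permutations; rank[user_id] is the user's position in the order."""
    rng = random.Random(seed)
    perms = []
    for _ in range(n):
        rank = list(range(UNIVERSE))
        rng.shuffle(rank)  # a shuffled list is a uniformly random bijection
        perms.append(rank)
    return perms


def minhash_signature(items, perms):
    """For each permutation, keep the smallest rank found in the set."""
    return [min(rank[x] for x in items) for rank in perms]


def estimate_jaccard(sig_a, sig_b):
    """Fraction of permutations whose minima coincide."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)


listeners_a = set(range(0, 60))    # hypothetical listeners of artist A
listeners_b = set(range(30, 90))   # hypothetical listeners of artist B
exact = len(listeners_a & listeners_b) / len(listeners_a | listeners_b)  # 30 / 90

perms = make_permutations(500)
approx = estimate_jaccard(minhash_signature(listeners_a, perms),
                          minhash_signature(listeners_b, perms))
print(f"exact={exact:.3f} minhash estimate={approx:.3f}")
```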

- Use the data loaded in the <a href="#Loading-data">Loading-data</a> section.
- Users who listened to $N$ artists contribute to the similarity of $N^2$ pairs of artists, so rare but very active users greatly slow down the algorithm. In practice, such users are handled by taking a subset of their plays, for example, 1000. We take a simpler approach and consider only events with $plays > 2$, keeping only the most confident user preferences.
- To make the similarities more confident, compute them only for performers listened to by strictly more than 50 users (after applying the previous filter on plays).
- To debug the algorithm on a smaller amount of data, you can use the transformation <a href="https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.sample">events.sample(False, 0.01)</a> so that debug runs finish quickly.
- We can assume that data about performers (for example, their popularity) fits in the memory of each machine; there are simply not that many performers in the world.
- If a step takes a very long time, you can increase the degree of parallelism, for example,
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey">groupByKey(numPartitions=100)</a>, to see more granular execution progress.
- Sometimes it makes sense to save the calculated result in HDFS so that it does not have to be recalculated every time it is needed.
- Working with big data requires patience: the author's solution runs for about 10 minutes.
- This problem can also be solved in Spark SQL, if you prefer.
- Use HashingTF in the Pipeline to pass data to MinHashLSH.
- Note that 'Tommy Cash' has two artistIds.
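The two filters and the quadratic blow-up can be illustrated with a plain-Python sketch over toy `(userId, artistId, plays)` tuples. The data is hypothetical. The popularity cut-off is lowered from 50 to 1 so the example stays small.

```python
from collections import defaultdict

# Hypothetical events: (userId, artistId, plays)
toy_events = [
    (1, 10, 5), (1, 11, 3), (1, 12, 1),   # plays=1 for artist 12 is dropped
    (2, 10, 4), (2, 11, 7),
    (3, 10, 9), (3, 12, 3),
]
PLAYS_MIN = 2        # keep plays > 2, as in the task
MIN_LISTENERS = 1    # the task uses > 50; lowered for the toy data

confident = [(u, a) for u, a, p in toy_events if p > PLAYS_MIN]

# Distinct listeners per artist, counted after the plays filter
listeners = defaultdict(set)
for user, artist in confident:
    listeners[artist].add(user)
popular = {a for a, users in listeners.items() if len(users) > MIN_LISTENERS}

# A user with N popular artists contributes N * N ordered pairs (incl. self-pairs)
per_user = defaultdict(set)
for user, artist in confident:
    if artist in popular:
        per_user[user].add(artist)
pairs_per_user = {u: len(a) ** 2 for u, a in per_user.items()}

print(sorted(popular), pairs_per_user)  # [10, 11] {1: 4, 2: 4, 3: 1}
```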


Save the solution to the `result.json` file. 
File content example:
```json
{
    "q1": 123,
    "q2": 456,
    "q3": [
        "artistName1",
        "artistName2",
        "artistName3",
        "artistName4",
        "artistName5",
        "artistName6",
        "artistName7",
        "artistName8",
        "artistName9",
        "artistName10"
    ],
    "q4": [
        "artistName1",
        "artistName2",
        "artistName3",
        "artistName4",
        "artistName5",
        "artistName6",
        "artistName7",
        "artistName8",
        "artistName9",
        "artistName10"
    ]
}
```

### Intro

In [1]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-12-15 15:35:02,044 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
! hadoop fs -copyFromLocal yandex_music /

! hadoop fs -ls -h /yandex_music

copyFromLocal: `/yandex_music/events.csv': File exists
copyFromLocal: `/yandex_music/artists.jsonl': File exists
copyFromLocal: `/yandex_music/README.txt': File exists
Found 3 items
-rw-r--r--   1 jovyan supergroup        254 2023-12-14 13:21 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2023-12-14 13:21 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2023-12-14 13:21 /yandex_music/events.csv


In [3]:
events = se.read.csv("hdfs:///yandex_music/events.csv", header=True, 
                      schema='userId bigint, artistId bigint, plays INT, skips INT')
events.createOrReplaceTempView("events")

In [4]:
events.show(1)

+------+--------+-----+-----+
|userId|artistId|plays|skips|
+------+--------+-----+-----+
|     0|     335|    1|    0|
+------+--------+-----+-----+
only showing top 1 row

In [5]:
artists = se.read.json("hdfs:///yandex_music/artists.jsonl")
artists.createOrReplaceTempView("artists")
artists.limit(5).toPandas()

   artistId     artistName
0         0    Mack Gordon
1         1   Kenny Dorham
2         2      Max Roach
3         3  Francis Rossi
4         4     Status Quo


In [19]:
from pyspark.sql.functions import col, countDistinct
from pyspark.ml.feature import HashingTF, MinHashLSH
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

In [None]:
# here goes your solution

### Solution

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import HashingTF, MinHashLSH
from pyspark.ml import Pipeline
import json
import random
import time

In [2]:
sparkContext = pyspark.SparkContext(appName='jupyter')
spark = SparkSession(sparkContext)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-10-29 14:50:20,547 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
start_time = time.time()

In [4]:
def generate_pairs(artist_list):
    pairs = []
    artist_list = list(set(artist_list))
    
    for i in range(len(artist_list)):
        for j in range(i, len(artist_list)):
            pairs.append((artist_list[i], artist_list[j]))
            
            if artist_list[i] != artist_list[j]:
                pairs.append((artist_list[j], artist_list[i]))
                
    return pairs


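def calculate_jaccard(pair, intersection_count):
    # artist_count_dict (distinct listeners per artist) is built later, in In [14];
    # it is captured when the UDF is applied in In [15], so In [14] must run first.
    a, b = pair
    # Inclusion-exclusion: |A u B| = |A| + |B| - |A n B|
    union_count = artist_count_dict[a] + artist_count_dict[b] - intersection_count

    return float(intersection_count) / union_count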


def sample_from_dict(d, sample=10):
    keys = random.sample(list(d), sample)
    values = [d[k] for k in keys]
    
    return dict(zip(keys, values))


pair_udf = F.udf(generate_pairs, T.ArrayType(T.ArrayType(T.IntegerType())))
jaccard_udf = F.udf(calculate_jaccard, 'double')
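`generate_pairs` emits every ordered pair of distinct artists in a user's list, plus each self-pair. That is exactly the Cartesian product of the de-duplicated list with itself, so a user with N artists yields N² pairs. Below is a minimal alternative sketch using `itertools.product`. The name `generate_pairs_product` is hypothetical and not part of the solution above. Like the original, it returns a list of tuples, so presumably it could be wrapped in `F.udf` the same way.

```python
from itertools import product


def generate_pairs_product(artist_list):
    """All ordered pairs (a, b) of distinct artists plus every self-pair (a, a)."""
    unique_artists = list(set(artist_list))  # same de-duplication as generate_pairs
    return list(product(unique_artists, repeat=2))


pairs = generate_pairs_product([10, 11, 10])
print(sorted(pairs))  # [(10, 10), (10, 11), (11, 10), (11, 11)]
```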

In [5]:
events_filtered_sdf = (
    spark
    .read
    .csv(
        "file:///home/jovyan/work/yandex_music/events.csv",
        header=True,
        schema='userId bigint, artistId bigint, plays INT, skips INT'
    )
    .filter(F.col('plays') > 2)
)
events_filtered_sdf.cache()
events_filtered_sdf.count()

671287

In [6]:
artists_sdf = spark.read.json("file:///home/jovyan/work/yandex_music/artists.jsonl")
artists_sdf.cache()
artists_sdf.count()

70818

In [7]:
popular_artists_sdf = (
    events_filtered_sdf
    .groupBy('artistId')
    .agg(F.countDistinct('userId').alias('user_count'))
    .filter(F.col('user_count') > 50)
)
popular_artists_sdf.cache()
popular_artists_sdf.show(5, False)



+--------+----------+
|artistId|user_count|
+--------+----------+
|28078   |332       |
|11567   |73        |
|10156   |874       |
|18196   |231       |
|11434   |114       |
+--------+----------+
only showing top 5 rows

In [8]:
popular_artist_ids_lst = [row.artistId for row in popular_artists_sdf.collect()]
popular_artist_ids_lst[:5]

[28078, 11567, 10156, 18196, 11434]

In [9]:
# Number of performers remaining after filters
q1 = len(popular_artist_ids_lst)
q1

2889

In [10]:
events_filtered_popular_sdf = events_filtered_sdf.filter(F.col('artistId').isin(popular_artist_ids_lst))
events_filtered_popular_sdf.cache()
events_filtered_popular_sdf.show(5, False)

+------+--------+-----+-----+
|userId|artistId|plays|skips|
+------+--------+-----+-----+
|0     |2130    |4    |10   |
|0     |2267    |5    |3    |
|0     |2810    |5    |3    |
|0     |3568    |5    |9    |
|0     |3629    |9    |8    |
+------+--------+-----+-----+
only showing top 5 rows



In [11]:
user_artists_sdf = events_filtered_popular_sdf.groupBy('userId').agg(F.collect_set('artistId').alias('artist_list'))
user_artists_sdf.cache()
user_artists_sdf.show(5, 120)



+------+------------------------------------------------------------------------------------------------------------------------+
|userId|                                                                                                             artist_list|
+------+------------------------------------------------------------------------------------------------------------------------+
|    26|[59767, 17881, 64885, 27311, 50889, 5330, 32916, 20132, 6107, 59020, 20938, 25883, 30747, 52911, 33239, 23578, 54654,...|
|    29|[20088, 5584, 133, 7712, 23530, 6107, 67848, 410, 69670, 60313, 20003, 404, 8389, 30747, 70609, 52911, 61710, 15229, ...|
|   474|                                     [21104, 43008, 64627, 59357, 49155, 49152, 59356, 41574, 5848, 50053, 49153, 49150]|
|  1677|[60144, 23536, 21198, 43789, 64123, 32916, 20003, 23524, 39850, 30747, 6263, 58333, 979, 54879, 22706, 9961, 66874, 5...|
|  1697|[60144, 12095, 29112, 32916, 1120, 20132, 53271, 62243, 20713, 23530, 67848, 41047

In [12]:
user_artist_pairs_sdf = user_artists_sdf.withColumn('artist_pairs', pair_udf('artist_list')).select('artist_pairs')
artist_pairs_exploded_sdf = user_artist_pairs_sdf.select(F.explode('artist_pairs').alias('pair'))

artist_pairs_exploded_sdf.cache()
artist_pairs_exploded_sdf.show(10, False)

+-------------+
|pair         |
+-------------+
|[8202, 8202] |
|[8202, 57876]|
|[57876, 8202]|
|[8202, 23578]|
|[23578, 8202]|
|[8202, 30747]|
|[30747, 8202]|
|[8202, 41506]|
|[41506, 8202]|
|[8202, 23080]|
+-------------+
only showing top 10 rows

In [13]:
pair_counts_sdf = artist_pairs_exploded_sdf.groupBy('pair').count()
pair_counts_sdf.show(10, False)

+--------------+-----+
|pair          |count|
+--------------+-----+
|[8202, 32916] |16   |
|[8202, 20186] |31   |
|[41316, 8202] |23   |
|[63160, 57876]|82   |
|[51711, 57876]|47   |
|[23578, 13903]|63   |
|[30747, 45433]|174  |
|[41506, 28346]|4    |
|[30573, 41506]|2    |
|[7216, 23080] |20   |
+--------------+-----+
only showing top 10 rows
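The explode plus `groupBy('pair').count()` step is the reduce side of the classic MapReduce formulation. Each user emits all ordered pairs of their artists, and the count per pair is the intersection size |A ∩ B|. The following plain-Python sketch runs on hypothetical per-user sets. It also shows two properties worth checking on the real output:

- Counts are symmetric: (a, b) and (b, a) get the same count.
- The count of a self-pair (a, a) equals the number of distinct listeners of a. Compare `[48419, 48419]` with count 134 in the Tommy Cash output further down.

```python
from collections import Counter
from itertools import product

# Hypothetical per-user artist sets, as produced by collect_set in In [11]
user_artists = {
    1: {10, 11},
    2: {10, 11},
    3: {10},
}

# Map: each user emits every ordered (a, b) pair of their artists, incl. (a, a).
# Reduce: count how many users emitted each pair, i.e. |A n B|.
pair_counts = Counter(
    pair
    for artists in user_artists.values()
    for pair in product(sorted(artists), repeat=2)
)

print(pair_counts[(10, 10)], pair_counts[(10, 11)], pair_counts[(11, 10)])  # 3 2 2
```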


In [14]:
artist_counts_sdf = events_filtered_popular_sdf.groupBy('artistId').agg(F.countDistinct('userId').alias('count'))
artist_counts_sdf.cache()
artist_counts_sdf.count()

artist_count_dict = {row['artistId']: row['count'] for row in artist_counts_sdf.collect()}
sample_from_dict(artist_count_dict, 5)

{45595: 63, 12534: 117, 7915: 750, 56710: 57, 26097: 63}

In [15]:
jaccard_similarity_sdf = pair_counts_sdf.withColumn('jaccard', jaccard_udf('pair', 'count'))
jaccard_similarity_sdf.cache()
jaccard_similarity_sdf.show(10, False)



+--------------+-----+--------------------+
|pair          |count|jaccard             |
+--------------+-----+--------------------+
|[8202, 32916] |16   |0.0256              |
|[8202, 20186] |31   |0.026863084922010397|
|[41316, 8202] |23   |0.0344311377245509  |
|[63160, 57876]|82   |0.07677902621722846 |
|[51711, 57876]|47   |0.05147864184008762 |
|[23578, 13903]|63   |0.05389221556886228 |
|[30747, 45433]|174  |0.1673076923076923  |
|[41506, 28346]|4    |0.018779342723004695|
|[30573, 41506]|2    |0.010416666666666666|
|[7216, 23080] |20   |0.03944773175542406 |
+--------------+-----+--------------------+
only showing top 10 rows

In [16]:
# Number of pairs with non-zero similarity
q2 = jaccard_similarity_sdf.count()
q2

6838579
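q2 can be sanity-checked arithmetically. Every popular artist has more than 50 listeners, so each one contributes its self-pair (a, a). Every other non-zero pair appears in both orders. Therefore q2 - q1 must be even, and half of it is the number of distinct unordered pairs. The numbers below are copied from the outputs of In [9] and In [16].

```python
q1 = 2889       # popular artists (In [9])
q2 = 6838579    # ordered pairs with non-zero Jaccard, incl. (a, a) (In [16])

max_ordered_pairs = q1 ** 2              # every ordered pair incl. self-pairs
distinct_unordered = (q2 - q1) // 2      # drop the q1 self-pairs, merge (a, b) with (b, a)

print(max_ordered_pairs, distinct_unordered, round(q2 / max_ordered_pairs, 3))
# 8346321 3417845 0.819
```

In other words, roughly 82% of all possible ordered pairs of popular artists share at least one listener.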

In [17]:
artist_to_name_dict = {row['artistId']: row['artistName'] for row in artists_sdf.collect()}
sample_from_dict(artist_to_name_dict, 5)

{53527: 'Casey James And The Staypuft Kid',
 25206: 'A Silent Film',
 23673: 'Richard Burton',
 32921: 'The Kinsmen',
 69739: 'Resonant Language'}

In [18]:
tommy_cash_ids_lst = [row.artistId for row in artists_sdf.filter(artists_sdf.artistName == 'Tommy Cash').collect()]
tommy_cash_ids_lst[:5]

[46091, 48419]

In [19]:
# Find similar artists to "Tommy Cash"
similar_to_tommy_sdf = (
    jaccard_similarity_sdf
    .filter(F.col('pair').getItem(0).isin(tommy_cash_ids_lst))
    .orderBy(F.col('jaccard').desc())
)
similar_to_tommy_sdf.cache()
similar_to_tommy_sdf.show(5, False)



+--------------+-----+-------------------+
|pair          |count|jaccard            |
+--------------+-----+-------------------+
|[48419, 48419]|134  |1.0                |
|[48419, 55471]|115  |0.1592797783933518 |
|[48419, 46516]|46   |0.1488673139158576 |
|[48419, 40001]|47   |0.14826498422712933|
|[48419, 46604]|40   |0.13468013468013468|
+--------------+-----+-------------------+
only showing top 5 rows

In [20]:
similar_to_tommy_sdf = similar_to_tommy_sdf.filter(~F.col('pair').getItem(1).isin(tommy_cash_ids_lst))
similar_to_tommy_sdf.cache()
similar_to_tommy_sdf.show(5, False)

+--------------+-----+-------------------+
|pair          |count|jaccard            |
+--------------+-----+-------------------+
|[48419, 55471]|115  |0.1592797783933518 |
|[48419, 46516]|46   |0.1488673139158576 |
|[48419, 40001]|47   |0.14826498422712933|
|[48419, 46604]|40   |0.13468013468013468|
|[48419, 52974]|63   |0.13097713097713098|
+--------------+-----+-------------------+
only showing top 5 rows



In [21]:
# Get the top 10 similar artists
top10_similar_lst = similar_to_tommy_sdf.select('pair', 'jaccard').limit(10).collect()
q3 = [artist_to_name_dict[row['pair'][1]] for row in top10_similar_lst]
q3

['Little Big',
 'IC3PEAK',
 'Die Antwoord',
 'Пошлая Молли',
 'Хлеб',
 'The Hatters',
 'Хаски',
 'Boulevard Depo',
 'Big Baby Tape',
 '$uicideboy$']
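`orderBy(F.col('jaccard').desc())` followed by `limit(10)` leaves the order of tied scores unspecified. If two artists share a score at the cut-off, the 10th name could vary between runs. In Spark, a secondary key such as `.orderBy(F.col('jaccard').desc(), F.col('pair').getItem(1))` would make the order deterministic.

The following is a plain-Python sketch of the same idea. It excludes every id of the query artist and breaks ties by the smaller artist id. The scores and ids are hypothetical.

```python
import heapq

# Hypothetical (query_id, other_id) -> Jaccard scores
scores = {
    (48419, 48419): 1.0,
    (48419, 300): 0.2,
    (48419, 100): 0.1,
    (48419, 200): 0.1,   # ties with 100
}
excluded_ids = {46091, 48419}   # both ids of the query artist


def top_k_similar(scores, excluded_ids, k):
    """Highest Jaccard first; ties broken by the smaller artist id."""
    candidates = [
        (score, other) for (_, other), score in scores.items()
        if other not in excluded_ids
    ]
    best = heapq.nsmallest(k, candidates, key=lambda t: (-t[0], t[1]))
    return [other for _, other in best]


print(top_k_similar(scores, excluded_ids, 2))  # [300, 100]
```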

In [22]:
artist_users_sdf = events_filtered_popular_sdf.groupBy('artistId').agg(F.collect_set('userId').alias('user_list'))
artist_users_sdf.cache()
artist_users_sdf.show(5, 120)

+--------+------------------------------------------------------------------------------------------------------------------------+
|artistId|                                                                                                               user_list|
+--------+------------------------------------------------------------------------------------------------------------------------+
|    1806|[4647, 1453, 4845, 3604, 991, 1470, 2734, 2480, 4535, 152, 3927, 4021, 2376, 1389, 394, 373, 323, 1056, 1556, 3611, 5...|
|    3091|[335, 2767, 2971, 3683, 125, 2688, 1803, 2507, 3219, 3546, 3577, 4108, 1487, 2932, 2170, 929, 1662, 2832, 2324, 4154,...|
|   10156|[3042, 437, 793, 2644, 3123, 785, 3958, 2892, 2007, 635, 2842, 504, 4156, 3627, 48, 2734, 883, 4841, 3694, 123, 3288,...|
|   11276|[3681, 2665, 2007, 3604, 1753, 3981, 3444, 3394, 2530, 4360, 48, 2734, 3090, 558, 3265, 1872, 21, 3084, 2984, 1410, 2...|
|   11434|[3275, 3398, 3021, 4262, 1649, 31, 2128, 2361, 2463, 2, 1751, 4895

In [23]:
hashingTF = HashingTF(inputCol="user_list", outputCol="features")
minHashLSH = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=20, seed=57)
pipeline = Pipeline(stages=[hashingTF, minHashLSH])
model = pipeline.fit(artist_users_sdf)
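HashingTF hashes each user id into one of `numFeatures` buckets. The default is 2^18 = 262144, which matches the vector size printed in the `features` column below. Distinct users that land in the same bucket become indistinguishable, so the sets MinHashLSH sees can differ slightly from the true listener sets. This may explain, for example, why artist 40001 has an exact Jaccard of 0.1483 in In [19] but an LSH distance of 0.8513 (similarity 0.1487) in In [26].

A rough birthday-bound estimate of the number of colliding user pairs is sketched below. It assumes uniform hashing, and the user count is hypothetical. If collisions matter, `numFeatures` can be raised, or `CountVectorizer` can be used instead, since it builds an explicit vocabulary without collisions.

```python
def expected_colliding_pairs(n_items: int, n_buckets: int) -> float:
    """Birthday-bound estimate: C(n, 2) / m expected pairs sharing a bucket."""
    return n_items * (n_items - 1) / 2 / n_buckets


NUM_FEATURES = 2 ** 18   # HashingTF default
n_users = 5000           # hypothetical number of distinct user ids

print(round(expected_colliding_pairs(n_users, NUM_FEATURES), 1))  # 47.7
```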

In [24]:
artist_users_hashed_sdf = model.transform(artist_users_sdf)
artist_users_hashed_sdf.cache()
artist_users_hashed_sdf.show(5, 80)

+--------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|artistId|                                                                       user_list|                                                                        features|                                                                          hashes|
+--------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|    1806|[4647, 1453, 4845, 3604, 991, 1470, 2734, 2480, 4535, 152, 3927, 4021, 2376, ...|(262144,[6345,7547,12618,15920,25102,29077,33734,34471,37953,39049,44986,4536...|[[4.5799304E7], [2.8116252E7], [3.6689259E7], [4.3558038E7], [1.79

In [25]:
tommy_cash_features_sdf = artist_users_hashed_sdf.filter(artist_users_hashed_sdf.artistId.isin(tommy_cash_ids_lst))
tommy_cash_features_sdf.cache()
tommy_cash_features_sdf.show(5, 80)



+--------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|artistId|                                                                       user_list|                                                                        features|                                                                          hashes|
+--------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|   48419|[2644, 2157, 3733, 387, 227, 2769, 4062, 2211, 2538, 3881, 1187, 3090, 4768, ...|(262144,[327,4105,4259,4386,6020,6810,8631,9062,10545,11543,14117,17282,22745...|[[6799900.0], [1.3465895E7], [2.6221751E7], [3825278.0], [661544.0

In [26]:
similar_lsh_sdf = (
    model
    .stages[-1]
    .approxSimilarityJoin(
        tommy_cash_features_sdf,
        artist_users_hashed_sdf,
        0.95,
        distCol="JaccardDistance"
    ).select(
        F.col("datasetB.artistId").alias("artistId"),
        F.col("JaccardDistance")
    )
    .filter(~F.col('artistId').isin(tommy_cash_ids_lst))
    .distinct()
    .orderBy(F.col('JaccardDistance'))
)

similar_lsh_sdf.cache()
similar_lsh_sdf.show(5, False)



+--------+------------------+
|artistId|JaccardDistance   |
+--------+------------------+
|55471   |0.8407202216066482|
|46516   |0.8511326860841424|
|40001   |0.8512658227848101|
|46604   |0.8648648648648649|
|52974   |0.86875           |
+--------+------------------+
only showing top 5 rows
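`approxSimilarityJoin` works with Jaccard *distance*, 1 - J. The threshold 0.95 therefore keeps candidate pairs whose distance is below 0.95, that is, whose similarity is above 0.05. Distances are computed only for candidate pairs that share a bucket in at least one of the 20 hash tables. Converting the top distance back gives the same similarity that the exact computation in In [19] reports for the pair `[48419, 55471]`:

```python
import math

THRESHOLD = 0.95                     # maximum Jaccard distance passed to approxSimilarityJoin
min_similarity = 1 - THRESHOLD       # pairs with lower similarity are dropped

lsh_distance = 0.8407202216066482    # artist 55471, In [26]
exact_jaccard = 0.1592797783933518   # pair [48419, 55471], In [19]

print(round(min_similarity, 2), math.isclose(1 - lsh_distance, exact_jaccard))  # 0.05 True
```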


In [27]:
# Get the top 10 similar artists
top10_similar_lsh_lst = similar_lsh_sdf.limit(10).collect()
q4 = [artist_to_name_dict[row['artistId']] for row in top10_similar_lsh_lst]
q4

['Little Big',
 'IC3PEAK',
 'Die Antwoord',
 'Пошлая Молли',
 'Хлеб',
 'The Hatters',
 'Хаски',
 'Big Baby Tape',
 '$uicideboy$',
 'GONE.Fludd']
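The exact and the MinHashLSH answers share nine of ten names. The following plain-Python comparison uses the lists printed for q3 and q4. A small mismatch is expected from an approximate method. Plausibly, either a pair never shared a bucket in any of the 20 hash tables, or hashing perturbed the scores slightly near the cut-off.

```python
q3 = ['Little Big', 'IC3PEAK', 'Die Antwoord', 'Пошлая Молли', 'Хлеб',
      'The Hatters', 'Хаски', 'Boulevard Depo', 'Big Baby Tape', '$uicideboy$']
q4 = ['Little Big', 'IC3PEAK', 'Die Antwoord', 'Пошлая Молли', 'Хлеб',
      'The Hatters', 'Хаски', 'Big Baby Tape', '$uicideboy$', 'GONE.Fludd']

only_exact = [name for name in q3 if name not in q4]   # found only by the exact Jaccard
only_lsh = [name for name in q4 if name not in q3]     # found only by MinHashLSH

print(only_exact, only_lsh)  # ['Boulevard Depo'] ['GONE.Fludd']
```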

In [28]:
result = {
    "q1": q1,
    "q2": q2,
    "q3": q3,
    "q4": q4
}

result

{'q1': 2889,
 'q2': 6838579,
 'q3': ['Little Big',
  'IC3PEAK',
  'Die Antwoord',
  'Пошлая Молли',
  'Хлеб',
  'The Hatters',
  'Хаски',
  'Boulevard Depo',
  'Big Baby Tape',
  '$uicideboy$'],
 'q4': ['Little Big',
  'IC3PEAK',
  'Die Antwoord',
  'Пошлая Молли',
  'Хлеб',
  'The Hatters',
  'Хаски',
  'Big Baby Tape',
  '$uicideboy$',
  'GONE.Fludd']}

In [29]:
end_time = time.time()
print("--- %s seconds ---" % (end_time - start_time))

--- 267.03494358062744 seconds ---


In [30]:
with open('result.json', 'w') as f:
    json.dump(result, f)
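By default, `json.dump` escapes non-ASCII characters, so names such as 'Хлеб' are stored as `\u` escape sequences. The file is still valid JSON and loads back to the same strings. Passing `ensure_ascii=False` and opening the file with `encoding='utf-8'` keeps it human-readable. The task does not say whether the grader cares about this. The following round-trip sketch uses a hypothetical result dictionary and a temporary file.

```python
import json
import os
import tempfile

result = {"q1": 1, "q2": 2, "q3": ["Хлеб"], "q4": ["Хаски"]}  # hypothetical values

escaped = json.dumps(result)                      # default: ensure_ascii=True
readable = json.dumps(result, ensure_ascii=False)
print(escaped)
print(readable)

# Round trip through a real file, written as UTF-8
path = os.path.join(tempfile.mkdtemp(), "result.json")
with open(path, "w", encoding="utf-8") as f:
    json.dump(result, f, ensure_ascii=False, indent=4)
with open(path, encoding="utf-8") as f:
    loaded = json.load(f)
```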

In [31]:
! curl -F file=@result.json 51.250.123.136:80/MDS-LSML1/rkulkov/w6/1

1.0
Correct q1 answer! Correct q2 answer! Correct q3 answer! Correct q4 answer!
