# Create SparkContext, SparkSession

https://spark.apache.org/docs/latest/rdd-programming-guide.html

http://spark.apache.org/docs/latest/sql-getting-started.html

In [1]:
# findspark.init() is run before the pyspark imports below.
import findspark
findspark.init()

# NOTE(review): SparkConf is imported but not referenced in the visible cells.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# `sc` is the SparkContext for the "RecSys" application; `se` is the SparkSession
# built on top of it, which later cells use to read data and run SQL.
sc = SparkContext(appName="RecSys")
se = SparkSession(sc)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-04-08 12:57:17,671 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
# Recursively copy the dataset from the S3 bucket into the local `yandex_music` directory.
! aws s3 cp s3://ydatazian/yandex_music yandex_music --recursive


Provided region_name 'ru-central1-' doesn't match a supported format.


# Yandex.Music dataset

In [3]:
# List the downloaded dataset files with human-readable sizes.
! ls -lh yandex_music

total 52M
-rw-rw-r-- 1 jovyan root 3.7M Oct 13 07:35 artists.jsonl
-rw-rw-r-- 1 jovyan root  48M Oct 13 07:35 events.csv
-rw-rw-r-- 1 jovyan root  254 Oct 13 07:35 README.txt


In [4]:
# Show the first five lines of the artists file (one JSON object per line).
! head -n 5 yandex_music/artists.jsonl

{"artistId":0,"artistName":"Mack Gordon"}
{"artistId":1,"artistName":"Kenny Dorham"}
{"artistId":2,"artistName":"Max Roach"}
{"artistId":3,"artistName":"Francis Rossi"}
{"artistId":4,"artistName":"Status Quo"}


In [5]:
# Show the first five lines of the events CSV (the header row plus four records).
! head -n 5 yandex_music/events.csv

userId,artistId,plays,skips
0,335,1,0
0,708,1,0
0,710,2,1
0,815,1,1


# Copy data to HDFS

In [2]:
! hadoop fs -copyFromLocal yandex_music /

copyFromLocal: `/yandex_music/events.csv': File exists
copyFromLocal: `/yandex_music/artists.jsonl': File exists
copyFromLocal: `/yandex_music/README.txt': File exists
copyFromLocal: `/yandex_music/.ipynb_checkpoints/artists-checkpoint.jsonl': File exists


In [3]:
# List the dataset directory on HDFS with human-readable sizes.
! hadoop fs -ls -h /yandex_music

Found 4 items
drwxr-xr-x   - jovyan supergroup          0 2023-04-08 12:14 /yandex_music/.ipynb_checkpoints
-rw-r--r--   1 jovyan supergroup        254 2023-04-08 12:14 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2023-04-08 12:14 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2023-04-08 12:14 /yandex_music/events.csv


# Load dataset

In [2]:
# Read the artist catalogue (JSON lines) from HDFS into a Spark DataFrame.
artists = se.read.json("hdfs:///yandex_music/artists.jsonl")
# Expose the DataFrame to Spark SQL under the name `artists`.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites an existing view of the same name.
artists.createOrReplaceTempView("artists")
# Preview the first rows as a pandas DataFrame.
artists.limit(5).toPandas()

                                                                                

Unnamed: 0,artistId,artistName
0,0,Mack Gordon
1,1,Kenny Dorham
2,2,Max Roach
3,3,Francis Rossi
4,4,Status Quo


In [3]:
# Read the user-artist interaction log from HDFS with an explicit schema.
events = se.read.csv(
    "hdfs:///yandex_music/events.csv",
    header=True,
    schema='userId bigint, artistId bigint, plays INT, skips INT',
)
# Expose the DataFrame to Spark SQL under the name `events`.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites an existing view of the same name.
events.createOrReplaceTempView("events")
# Preview the first rows as a pandas DataFrame.
events.limit(5).toPandas()

                                                                                

Unnamed: 0,userId,artistId,plays,skips
0,0,335,1,0
1,0,708,1,0
2,0,710,2,1
3,0,815,1,1
4,0,880,1,1


In [8]:
# (number of rows, number of columns) of the events DataFrame
n_rows = events.count()
n_cols = len(events.columns)
print((n_rows, n_cols))



(671287, 4)


                                                                                

In [6]:
# Keep only interactions with more than two plays; `events` is rebound to the result.
events = events.filter(events["plays"] > 2)

In [9]:
# Collect the events to the driver as a pandas DataFrame.
# NOTE(review): from here on the name `events` refers to a pandas DataFrame, not a
# Spark DataFrame, so Spark-only attributes such as `.rdd` are not available on it.
events = events.toPandas()

                                                                                

In [11]:
# NOTE(review): `pd` is not referenced in the visible cells.
import pandas as pd

# First rows of the pandas copy of the events.
events.head()

Unnamed: 0,userId,artistId,plays,skips
0,0,2130,4,10
1,0,2267,5,3
2,0,2810,5,3
3,0,3568,5,9
4,0,3629,9,8


In [18]:
# Group the pandas events by artist; the bare name displays the GroupBy object.
grouped = events.groupby('artistId')
grouped

<pandas.core.groupby.generic.DataFrameGroupBy object at 0x7f24ef698c70>

In [20]:
def _has_enough_rows(artist_rows):
    """Return True when the artist's group has more than 50 non-null userId values."""
    return artist_rows['userId'].count() > 50


# Keep only the rows of artists whose group passes the predicate above.
events = grouped.filter(_has_enough_rows)

In [22]:
# (rows, columns) of the events after the per-artist filter.
events.shape

(498589, 4)

In [23]:
# Write the filtered events to a local CSV file, without the index column.
events.to_csv("filtered_events.csv",index=False)

In [None]:
# keep only rows with plays > 2
# keep only artistIds that were listened to more than 50 times

In [10]:
# statistics: distinct users, distinct artists, interaction rows, and the share of
# user-artist pairs that have an interaction (density), from the `events` table
stats_query = """
select
    count(distinct userId) as users,
    count(distinct artistId) as artists,
    count(*) as interactions,
    count(*) / (count(distinct userId) * count(distinct artistId)) as density
from 
    events
"""
se.sql(stats_query).toPandas()

                                                                                

Unnamed: 0,users,artists,interactions,density
0,4999,53031,3412504,0.012872


In [11]:
# most popular artists: total plays per artist name, top 30 in descending order
popularity_query = """
select
    artists.artistName,
    sum(plays) as popularity
from 
    events join artists on events.artistId = artists.artistId
group by artistName
order by popularity desc
limit 30
"""
se.sql(popularity_query).toPandas()

                                                                                

Unnamed: 0,artistName,popularity
0,Imagine Dragons,43447
1,Би-2,29415
2,Баста,27264
3,Ленинград,26311
4,Сплин,25062
5,Queen,24905
6,Sia,22803
7,LOBODA,21923
8,Noize MC,21774
9,Linkin Park,21584


# Train iALS

Assume the rating is encoded in the `plays` column.

In [12]:
import numpy as np

In [13]:
%%time
train, test = events.rdd.randomSplit([0.95, 0.05], seed=0)

# speed-up, we request it often
train.cache()
test.cache()

train.count()
test.count()



CPU times: user 39.2 ms, sys: 6.6 ms, total: 45.8 ms
Wall time: 21.6 s


                                                                                

170458

In [14]:
# Look at the first five training rows.
train.take(5)

[Row(userId=0, artistId=335, plays=1, skips=0),
 Row(userId=0, artistId=708, plays=1, skips=0),
 Row(userId=0, artistId=710, plays=2, skips=1),
 Row(userId=0, artistId=815, plays=1, skips=1),
 Row(userId=0, artistId=880, plays=1, skips=1)]

In [15]:
%%time

from pyspark.mllib.recommendation import ALS

model = ALS().trainImplicit(
    train.map(lambda x: (x.userId, x.artistId, np.log2(x.plays + 1))),
    rank=32, iterations=10, lambda_=0.01, alpha=10.0, seed=0
)

                                                                                

CPU times: user 500 ms, sys: 98.1 ms, total: 598 ms
Wall time: 1min 9s


In [16]:
# we take all artist profiles
import numpy as np

# artistId -> artistName lookup built from the collected artists DataFrame
artist_to_name = {row.artistId: row.artistName for row in artists.collect()}

# (artistId, factor vector) pairs of the trained model, collected to the driver
product_features = model.productFeatures().collect()

# Three parallel arrays: position i describes the same artist in all of them.
artist_ids = np.array([artist_id for artist_id, _ in product_features])
artist_names = np.array([artist_to_name[artist_id] for artist_id, _ in product_features])
artist_profiles = np.vstack([factors for _, factors in product_features])
print(artist_profiles.shape)

                                                                                

(52665, 32)


# Artists similarity

In [17]:
# Position in `artist_names` -> name, for the artists whose neighbours we want to see.
wanted_names = ("Coldplay", "50 Cent", "AC/DC")
target_artists = {}
for index, v in enumerate(artist_names):
    if v in wanted_names:
        target_artists[index] = v
target_artists

{11563: '50 Cent', 22207: 'AC/DC', 32914: 'Coldplay'}

In [18]:
import scipy
import scipy.spatial

# For every target artist print the ten artists with the highest cosine similarity
# between factor vectors (the artist itself comes first with similarity 1.0).
for index, name in target_artists.items():
    print("#############", name, "#############")

    # cosine similarity = 1 - cosine distance to every artist profile
    distances = scipy.spatial.distance.cdist(
        [artist_profiles[index]], artist_profiles, metric='cosine'
    )
    cosines = 1 - distances[0]
    # push undefined similarities (NaN) to the very end of the ranking
    cosines[np.isnan(cosines)] = -1e20

    top_matches = np.argsort(cosines)[::-1][:10]
    for idx in top_matches:
        print(artist_names[idx], "\t", cosines[idx])

############# 50 Cent #############
50 Cent 	 1.0
Dr. Dre 	 0.8717692118912255
Lloyd Banks 	 0.8625710691278187
Jay-Z 	 0.8352357952062348
2Chainz 	 0.8225631381331043
Cashis 	 0.8124567354985288
Snoop Dogg 	 0.8104842145938375
Missy  Elliott 	 0.8056353255938117
Akon 	 0.7981630854199838
Busta Rhymes 	 0.7877800832384918
############# AC/DC #############
AC/DC 	 1.0
The Offspring 	 0.8808062857712116
Nirvana 	 0.8766071953113563
Metallica 	 0.8748606350664875
Red Hot Chili Peppers 	 0.8738631982477807
System of A Down 	 0.8536837930773402
Limp Bizkit 	 0.8534537400121748
Bon Jovi 	 0.8506781841069039
Nickelback 	 0.8467098655356567
Scorpions 	 0.842406452076229
############# Coldplay #############
Coldplay 	 1.0
Lana Del Rey 	 0.9573989776119042
Adele 	 0.9547716910424779
OneRepublic 	 0.9511146699908802
Maroon 5 	 0.9508569586033432
Sam Smith 	 0.9296947895827019
Katy Perry 	 0.9213200117105023
Ed Sheeran 	 0.9199729963886845
Pharrell Williams 	 0.9198391159734666
Twenty One Pilots 	

# NDCG

In [19]:
def dcg(ratings):
    """Discounted cumulative gain of a ranked list of relevance ratings.

    Uses the exponential gain ``2 ** rating - 1`` and the ``log2(position + 1)``
    discount, with positions counted from 1.

    Args:
        ratings: sequence of relevance values, best-ranked item first.

    Returns:
        The DCG as a Python float (0.0 for an empty sequence).
    """
    gains = 2 ** np.array(ratings, np.float32) - 1
    discounts = np.log2(np.arange(1, len(ratings) + 1) + 1)
    return float(np.sum(gains / discounts))


def ndcg(ratings, at=None):
    """Normalised DCG of a ranked list, optionally cut off at the top ``at`` items.

    Args:
        ratings: sequence of relevance values in the order produced by the ranker.
        at: optional cutoff k. When given, only the first k ranked items count
            towards the DCG, and the ideal DCG uses the k largest ratings.
            ``None`` (the default) evaluates the whole list.

    Returns:
        DCG divided by the ideal DCG, or 0.0 when the ideal DCG is not positive
        (for example when all ratings are zero or the list is empty).

    Raises:
        ValueError: if ``at`` is negative.
    """
    ranked = list(ratings)
    ideal = sorted(ranked, reverse=True)
    if at is not None:
        if at < 0:
            raise ValueError("at must be non-negative, got {}".format(at))
        # Truncate both lists so DCG@k is normalised by the ideal DCG@k.
        ranked = ranked[:at]
        ideal = ideal[:at]
    idcg = dcg(ideal)
    return dcg(ranked) / idcg if idcg > 0 else 0.0


def ndcg_score(y_true, y_pred, at=None):
    """NDCG of the ranking induced by sorting items by descending ``y_pred``.

    Args:
        y_true: true relevance of each item (array-like).
        y_pred: predicted score of each item, same shape as ``y_true``.
        at: optional cutoff forwarded to ``ndcg``.

    Returns:
        The NDCG value as a float.
    """
    # Accept plain sequences as well as numpy arrays.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    assert y_true.shape == y_pred.shape
    order = np.argsort(y_pred)[::-1]
    return ndcg(y_true[order], at=at)


# tests
# Expected values are written with natural logarithms; the logarithm base cancels
# in the DCG / ideal-DCG ratio, so the expectation is unaffected.
def test1():
    """Two relevant items: the rating-2 item is ranked first, the rating-1 item third."""
    y_true = np.array([0, 0, 2, 1, 0])
    y_pred = np.array([0.2, 0.1, 0.5, 0.3, 0.4])
    achieved = 3 / np.log(1 + 1) + 1 / np.log(3 + 1)
    ideal = 3 / np.log(1 + 1) + 1 / np.log(2 + 1)
    assert np.allclose(ndcg_score(y_true, y_pred), achieved / ideal)


def test2():
    """No relevant items at all: the score is defined as 0."""
    y_true = np.array([0, 0, 0, 0, 0])
    y_pred = np.array([0.2, 0.1, 0.5, 0.3, 0.4])
    assert np.allclose(ndcg_score(y_true, y_pred), 0.0)


def test3():
    """A single relevant item that the ranking places fourth."""
    y_true = np.array([1, 0, 0, 0, 0])
    y_pred = np.array([0.2, 0.1, 0.5, 0.3, 0.4])
    achieved = 1 / np.log(4 + 1)
    ideal = 1 / np.log(1 + 1)
    assert np.allclose(ndcg_score(y_true, y_pred), achieved / ideal)


for check in (test1, test2, test3):
    check()

In [20]:
# DCG of three orderings of the same five ratings, one value per line.
for ranking in ([5, 4, 3, 2, 1], [3, 4, 5, 2, 1], [5, 4, 1, 2, 3]):
    print(dcg(ranking))

45.64282878502658
33.64282878502658
44.963945628433834


# Calc NDCG for baseline

Rank artists by popularity

In [21]:
# Sum the plays of every artist over the training split and collect the
# (artistId, total plays) pairs to the driver.
play_totals = (
    train
    .map(lambda row: (row.artistId, row.plays))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

# artistId -> total plays in train
artist_to_popularity = dict(play_totals)

                                                                                

In [22]:
def _popularity_and_rating(row):
    """Map a test row to (userId, (artist popularity in train, log2(plays + 1))).

    Artists that do not appear in `artist_to_popularity` get popularity 0.
    """
    popularity = artist_to_popularity.get(row.artistId, 0)
    return row.userId, (popularity, np.log2(row.plays + 1))


# userId -> list of (popularity score, rating) pairs over the test split
predictions_and_ratings_per_user = (
    test
    .map(_popularity_and_rating)
    .groupByKey()
    .map(lambda kv: (kv[0], list(kv[1])))
)

In [23]:
# Inspect one user's list of (popularity score, rating) pairs.
predictions_and_ratings_per_user.take(1)

                                                                                

[(0,
  [(2371, 1.0),
   (22102, 3.321928094887362),
   (609, 1.0),
   (7399, 1.0),
   (884, 1.584962500721156),
   (481, 1.584962500721156),
   (4008, 1.0),
   (2325, 1.584962500721156),
   (774, 1.0),
   (1035, 1.584962500721156),
   (4484, 3.584962500721156),
   (234, 1.0),
   (1523, 1.0),
   (2273, 1.584962500721156),
   (1243, 2.321928094887362),
   (5388, 2.321928094887362),
   (7856, 5.977279923499917),
   (781, 1.0),
   (4743, 1.584962500721156),
   (1234, 1.0),
   (1569, 0.0),
   (650, 0.0),
   (817, 0.0),
   (1579, 0.0),
   (1397, 0.0),
   (9438, 0.0),
   (9, 0.0),
   (773, 0.0),
   (543, 0.0),
   (46, 0.0),
   (574, 0.0),
   (9578, 0.0),
   (241, 0.0)])]

In [24]:
def ndcg_for_user(x):
    """NDCG for one user, given that user's (prediction, rating) pairs."""
    predicted, actual = [], []
    for pred, rating in x:
        predicted.append(pred)
        actual.append(rating)
    return ndcg_score(np.array(actual), np.array(predicted))


# Mean of the per-user NDCG values for the popularity baseline.
baseline_ndcg_per_user = predictions_and_ratings_per_user.map(
    lambda user_pairs: ndcg_for_user(user_pairs[1])
)
baseline_ndcg_per_user.mean()

                                                                                

0.6610124103340557

# NDCG for iALS

In [25]:
# Score every (userId, artistId) pair of the test split with the trained model and
# key each prediction by that pair: ((userId, artistId), predicted score).
test_pairs = test.map(lambda row: (row.userId, row.artistId))
predictions = (
    model
    .predictAll(test_pairs)
    .map(lambda p: ((p[0], p[1]), p[2]))
)

                                                                                

In [26]:
# Look at a few ((userId, artistId), predicted score) records.
predictions.take(5)

                                                                                

[((1858, 17312), 0.38881211524726256),
 ((3949, 17312), 0.8030994761437918),
 ((4147, 17312), -0.1829481619020704),
 ((2464, 17312), 0.7113483728016025),
 ((77, 3456), 0.30611075609806926)]

In [27]:
# ((userId, artistId), log2(plays + 1)) for every test interaction
true_ratings = test.map(
    lambda row: ((row.userId, row.artistId), np.log2(row.plays + 1))
)

# Join predictions with ratings on the (userId, artistId) key, then regroup the
# (prediction, rating) pairs per user: userId -> list of pairs.
predictions_and_ratings_per_user = (
    predictions
    .join(true_ratings)
    .map(lambda kv: (kv[0][0], kv[1]))
    .groupByKey()
    .map(lambda kv: (kv[0], list(kv[1])))
)

In [28]:
# Inspect one user's list of (predicted score, rating) pairs.
predictions_and_ratings_per_user.take(1)

                                                                                

[(864,
  [(0.7832904146298161, 1.0),
   (0.8023146751650059, 1.0),
   (0.10020332468113025, 1.0),
   (0.7946341906902686, 1.0),
   (0.7894454254207347, 1.0),
   (0.4687807864869595, 0.0),
   (0.06172397932051743, 0.0),
   (0.9913096478838179, 1.0),
   (0.641669151598321, 1.0),
   (0.6555492335487434, 1.0),
   (1.0480275904604701, 0.0),
   (0.6459493999697666, 1.0),
   (0.3741283354431978, 0.0),
   (0.6796485104395817, 1.0),
   (0.11280977513852944, 2.0),
   (0.4805730480743158, 1.584962500721156),
   (0.502553240607686, 1.0),
   (1.0070265407051093, 2.584962500721156),
   (0.047524256567414366, 1.0),
   (0.4254244675221187, 1.584962500721156),
   (0.8290185031117128, 2.584962500721156),
   (0.9726173416613716, 2.321928094887362),
   (1.0403367124985738, 1.584962500721156),
   (0.1763616266886649, 1.0)])]

In [29]:
def ndcg_for_user(x):
    """NDCG for one user from an iterable of (prediction, rating) pairs.

    Re-declared in this section with the same behaviour as in the
    popularity-baseline section.
    """
    y_pred = np.array([pred for pred, _ in x])
    y_true = np.array([rating for _, rating in x])
    return ndcg_score(y_true, y_pred)


# Mean of the per-user NDCG values for the iALS predictions.
ials_ndcg_per_user = predictions_and_ratings_per_user.map(
    lambda user_pairs: ndcg_for_user(user_pairs[1])
)
ials_ndcg_per_user.mean()

                                                                                

0.716527890500379

In [30]:
# Relative NDCG gain of iALS over the popularity baseline, from the rounded
# values shown in the two sections above.
baseline_ndcg = 0.661
ials_ndcg = 0.716
relative_gain_percent = 100 * (ials_ndcg / baseline_ndcg - 1)
print("Increased by {:0.3}%!".format(relative_gain_percent))

Increased by 8.32%!
