In [1]:
import pandas as pd
import numpy as np
import scipy.stats as ss
import json
from collections import namedtuple, defaultdict
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, DataFrame, Row, Column, Window
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from pyspark import SparkFiles
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

%matplotlib inline

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("vika")
    .getOrCreate()
)

In [3]:
# Read the raw JSON event log into a DataFrame.
raw_events_path = "/user/mob2021083/data"
data = spark.read.json(raw_events_path)

In [4]:
# Print the column names and types of the loaded DataFrame.
data.printSchema()

root
 |-- experiments: struct (nullable = true)
 |    |-- AA: string (nullable = true)
 |    |-- PERSONALIZED: string (nullable = true)
 |    |-- RECOMMENDERS: string (nullable = true)
 |-- latency: double (nullable = true)
 |-- message: string (nullable = true)
 |-- recommendation: long (nullable = true)
 |-- time: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- track: long (nullable = true)
 |-- user: long (nullable = true)



In [5]:
# Total number of rows in the event log (triggers a full scan of the input).
data.count()

9243132

In [6]:
# Preview the first 20 rows, truncating long cell values (the defaults of show()).
data.show(n=20, truncate=True)

+-----------+--------------------+-------+--------------+----+-------------+-----+----+
|experiments|             latency|message|recommendation|time|    timestamp|track|user|
+-----------+--------------------+-------+--------------+----+-------------+-----+----+
|   [C,, T3]|2.570152282714844E-4|   last|          null| 0.0|1636224100701|36176|6109|
|   [C,, T3]|6.573200225830078E-4|   next|          1004| 1.0|1636224100741| 2225|6239|
|   [C,, T3]|5.886554718017578E-4|   next|          7417| 1.0|1636224100753| 1004|6239|
|   [C,, T3]|7.193088531494141E-4|   next|          2332| 0.0|1636224100765| 7417|6239|
|   [C,, T3]|5.955696105957031E-4|   next|         36514|0.25|1636224100777| 2332|6239|
|   [C,, T3]|5.311965942382812E-4|   next|            19| 0.0|1636224100788|36514|6239|
|   [C,, T3]|8.442401885986328E-4|   next|         23754|0.01|1636224100800|   19|6239|
|   [C,, T3]|2.171993255615234...|   last|          null| 0.0|1636224100811|23754|6239|
|   [C,, T2]|0.001013755798339..

In [7]:
# Order the events chronologically (ascending timestamp).
data = data.sort("timestamp", ascending=True)

In [8]:
# Count events per (message, recommendation) pair and show the 10 most frequent pairs.
pair_counts = data.groupBy("message", "recommendation").agg(count("user").alias("count"))
pair_counts.orderBy(col("count").desc()).show(10)

+-------+--------------+-------+
|message|recommendation|  count|
+-------+--------------+-------+
|   last|          null|1300000|
|   next|         42118|    345|
|   next|         40373|    303|
|   next|         16504|    293|
|   next|         25060|    290|
|   next|         44782|    288|
|   next|         14445|    286|
|   next|         17556|    282|
|   next|         34342|    281|
|   next|         12763|    280|
+-------+--------------+-------+
only showing top 10 rows



In [9]:
# 90% / 10% random split; the seed is fixed so the split can be repeated.
train, test = data.randomSplit(weights=[0.9, 0.1], seed=42)

In [10]:
# Explicit-feedback ALS estimator: the "time" column is used as the rating of
# a (user, track) pair.
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy="drop" removes rows whose prediction is NaN (users or
#   tracks unseen during fit), so RMSE is computed only on scorable rows.
# - seed is fixed so that factor initialisation, and therefore the reported
#   RMSE and the saved recommendations, are reproducible across kernel sessions.
als = ALS(
    maxIter=20,
    regParam=0.01,
    userCol="user",
    itemCol="track",
    ratingCol="time",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=42,
)

In [11]:
# Fit ALS on the training split and score the held-out split.
model = als.fit(train)
predictions = model.transform(test)

# RMSE between the observed "time" values and the model's predictions.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="time", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("RMSE: {}".format(rmse))

RMSE: 0.29623392687202726


Попробуем дедуплицировать данные и отсеять из выборки те рекомендации, которые не понравились пользователям.

In [12]:
# Keep only events with time > 0.5, then remove exact duplicate rows.
data_deduplicated = (
    data
    .where(col("time") > 0.5)
    .distinct()
)

In [13]:
# Same 90% / 10% seeded split, applied to the filtered and deduplicated events.
train_deduplicated, test_deduplicated = data_deduplicated.randomSplit(
    weights=[0.9, 0.1],
    seed=42,
)

In [14]:
# Fit the same ALS estimator on the deduplicated training split and score its test split.
model_after_deduplicate = als.fit(train_deduplicated)
predictions_deduplicate = model_after_deduplicate.transform(test_deduplicated)

# RMSE between the observed "time" values and the predictions of the second model.
evaluator_deduplicate = RegressionEvaluator(predictionCol="prediction", labelCol="time", metricName="rmse")
rmse_deduplicate = evaluator_deduplicate.evaluate(predictions_deduplicate)

print("RMSE: {}".format(rmse_deduplicate))

RMSE: 0.08965529120528133


Сохраним обе модели.

In [16]:
# Persist both fitted models.
# A plain save() fails if the target path already exists, which breaks
# re-running the notebook; write().overwrite() replaces a previous save instead.
model_after_deduplicate.write().overwrite().save('model_after_deduplicate_v2')
model.write().overwrite().save('model_before_deduplicate_v2')

Посмотрим, сколько в среднем треков слушали пользователи

In [17]:
# Number of track events per user, then the average of those per-user counts.
events_per_user = data.groupBy("user").agg(count("track").alias("count"))
events_per_user.agg(avg("count")).show()

+----------+
|avg(count)|
+----------+
|  924.3132|
+----------+



In [18]:
# Number of user events per track, then the average of those per-track counts.
events_per_track = data.groupBy("track").agg(count("user").alias("count"))
events_per_track.agg(avg("count")).show()

+----------+
|avg(count)|
+----------+
| 184.86264|
+----------+



In [20]:
# Generate top 100 track recommendations for each user
userRecs = model.recommendForAllUsers(100)
# Generate top 100 user recommendations for each track
trackRecs = model.recommendForAllItems(100)

In [21]:
# Same per-user average as for the full data, computed on the deduplicated events.
dedup_events_per_user = data_deduplicated.groupBy("user").agg(count("track").alias("count"))
dedup_events_per_user.agg(avg("count")).show()

+----------+
|avg(count)|
+----------+
|  257.0071|
+----------+



In [22]:
# Same per-track average as for the full data, computed on the deduplicated events.
dedup_events_per_track = data_deduplicated.groupBy("track").agg(count("user").alias("count"))
dedup_events_per_track.agg(avg("count")).show()

+----------+
|avg(count)|
+----------+
|  51.40142|
+----------+



In [27]:
# Generate top 25 track recommendations for each user
userRecs_after_deduplicate = model_after_deduplicate.recommendForAllUsers(25)
# Generate top 25 user recommendations for each track
trackRecs_after_deduplicate = model_after_deduplicate.recommendForAllItems(25)

In [24]:
# Inspect the structure of the per-user recommendation DataFrame.
userRecs.printSchema()

root
 |-- user: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- track: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [25]:
# Pull the baseline recommendations to the driver and build a mapping
# user id -> list of recommended track ids, in the order they appear in each row.
users_all = {}
for row in userRecs.collect():
    recommended_tracks = [item.track for item in row.recommendations]
    users_all.setdefault(row.user, []).extend(recommended_tracks)

In [28]:
# Pull the recommendations of the deduplicated-data model to the driver and build
# a mapping user id -> list of recommended track ids, in the order they appear in each row.
users_deduplicate = {}
for row in userRecs_after_deduplicate.collect():
    recommended_tracks = [item.track for item in row.recommendations]
    users_deduplicate.setdefault(row.user, []).extend(recommended_tracks)

In [29]:
# Write the baseline recommendations to a JSON file
# (JSON object keys are strings, so the integer user ids are serialised as strings).
with open('/home/mobod2021/mob2021083/users_recommendations.json', 'w') as out_file:
    json.dump(users_all, out_file)

In [30]:
# Write the recommendations of the deduplicated-data model to a JSON file
# (JSON object keys are strings, so the integer user ids are serialised as strings).
with open('/home/mobod2021/mob2021083/users_deduplicate_recommendations.json', 'w') as out_file:
    json.dump(users_deduplicate, out_file)