In [1]:
import json

from pyspark.sql import SparkSession, Row

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
# Start (or reuse) the Spark session and load the joined dataset from JSON,
# then show its schema and row count.
spark = (
    SparkSession.builder
    .appName("dusiak")
    .getOrCreate()
)
data = spark.read.format("json").load("/user/mob2021032/joined_dataset")
data.printSchema()
data.count()

root
 |-- experiments: struct (nullable = true)
 |    |-- AA: string (nullable = true)
 |    |-- PERSONALIZED: string (nullable = true)
 |    |-- RECOMMENDERS: string (nullable = true)
 |-- latency: double (nullable = true)
 |-- message: string (nullable = true)
 |-- recommendation: long (nullable = true)
 |-- time: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- track: long (nullable = true)
 |-- user: long (nullable = true)



9243132

In [3]:
# Hold out 15% of rows for the final test evaluation. A fixed seed makes the
# split (and therefore the reported test RMSE) reproducible across runs.
(train, test) = data.randomSplit([0.85, 0.15], seed=42)

In [4]:
# Explicit-feedback ALS that predicts the `time` column for (user, track) pairs.
# NOTE(review): `time` is a double in the schema and may be a listening
# duration rather than an explicit rating; confirm that implicitPrefs=False
# is intended.
als = ALS(
    userCol="user",
    itemCol="track",
    ratingCol="time",
    nonnegative=True,
    implicitPrefs=False,
    # Rows whose user/track was unseen during fitting would get NaN
    # predictions; dropping them keeps RMSE finite during CV and on test.
    coldStartStrategy="drop",
    seed=42,  # fixed so factor initialisation is reproducible
)

# 2 x 2 grid over latent rank and L2 regularisation, scored by RMSE on `time`
# with 4-fold cross-validation.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [50, 100])
    .addGrid(als.regParam, [0.05, 0.1])
    .build()
)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="time", predictionCol="prediction")
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=4,
    seed=42,  # fixed so fold assignment is reproducible
)

In [5]:
# Run the grid search on the training split (2 x 2 param combos x 4 folds).
# `bestModel` is the ALSModel for the combination with the best CV metric.
model = cv.fit(dataset=train)
best_model = model.bestModel

In [6]:
# Report the winning hyperparameters through public APIs rather than the
# private `_java_obj.parent()` handle (which is also null for models loaded
# from disk).
# avgMetrics[i] is the mean fold metric for param_grid[i]; pick the best one
# the same way CrossValidator does (argmax if larger is better, else argmin).
cv_metrics = model.avgMetrics
pick_best = max if evaluator.isLargerBetter() else min
best_params = param_grid[pick_best(range(len(cv_metrics)), key=cv_metrics.__getitem__)]

print("Best model params")
print("Rank:", best_model.rank)
# maxIter is not part of the grid, so fall back to the estimator's value.
print("MaxIter:", best_params.get(als.maxIter, als.getMaxIter()))
print("RegParam:", best_params.get(als.regParam, als.getRegParam()))

Best model params
Rank: 100
MaxIter: 10
RegParam: 0.05


In [7]:
# Evaluate the best model on the held-out split.
# NOTE: with coldStartStrategy="drop", test rows whose user/track is absent
# from the training data are removed, so this RMSE only covers pairs the model
# can score.
test_preds = best_model.transform(dataset=test)
RMSE = evaluator.evaluate(dataset=test_preds)
print(RMSE)

0.2961529860005571


In [8]:
# Persist the best ALSModel. `.write().overwrite()` lets this cell be re-run;
# a plain `.save(path)` fails when the path already exists.
MODEL_PATH = "best_models/model_1.3"
best_model.write().overwrite().save(MODEL_PATH)
# To reuse later without retraining:
# best_model = ALSModel.load(MODEL_PATH)

In [9]:
# Top-30 tracks per user. `recommendations.track` extracts the track ids from
# the array of (track, rating) structs, and the result is collected to the driver.
# NOTE(review): collect() pulls every user's row into driver memory.
res = (
    best_model
    .recommendForAllUsers(30)
    .select("user", "recommendations.track")
    .collect()
)

In [10]:
# Build {user id: [recommended track ids]}; rows for the same user are
# concatenated. json.dump will serialise the int keys as strings.
users = {}
for row in res:
    if row.user not in users:
        users[row.user] = []
    users[row.user] += row.track

In [11]:
# Write the recommendations to a local, pretty-printed JSON file.
# NOTE(review): absolute user-specific path; consider making it configurable.
RECS_PATH = '/home/mobod2021/mob2021032/users_recs.json'
with open(RECS_PATH, 'w') as out_file:
    json.dump(users, out_file, indent=4)