### Create a DataFrame from the ratings table in the database

In [2]:
# Load the `ratings` table as a Spark DataFrame; later cells use its uid, song_id and rating columns.
ratings = spark.sql("select * from ratings")

In [3]:
# Render the ratings DataFrame with the notebook's rich `display`.
display(ratings)

#### Randomly split the data into training and test datasets

In [5]:
# Hold out 20% of the ratings for evaluation. A fixed seed makes the split
# (and therefore every metric below) reproducible across kernel restarts;
# without one, randomSplit draws a fresh random seed on every run.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

### Train a recommender using spark.ml.recommendation.ALS

In [7]:
from pyspark.ml.recommendation import ALS

In [8]:
# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" drops prediction rows that would be NaN (users or
# songs unseen during training), so the evaluation metrics below are not NaN.
# The seed is pinned explicitly so the factorization is reproducible.
als = ALS(maxIter=5, regParam=0.01, userCol="uid", itemCol="song_id", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split and report the RMSE between the true ratings
# and the model's predictions.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

#### Set implicitPrefs to True, since the ratings are derived from implicit feedback (play_frequency) rather than explicit user ratings

In [12]:
# Same ALS setup, but treating the ratings as implicit feedback
# (implicitPrefs=True) instead of explicit rating values.
# coldStartStrategy="drop" drops NaN predictions for users/songs unseen during
# training; the seed is pinned so the fit is reproducible.
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True, userCol="uid", itemCol="song_id", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [13]:
# Inspect the training split.
display(training)

In [14]:
# Evaluate the implicit-preference model by RMSE on the held-out split.
# NOTE(review): with implicitPrefs=True, ALS predicts preference scores rather
# than ratings, so this RMSE is not a like-for-like comparison with the
# explicit model's RMSE above; a ranking metric would be a fairer yardstick.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

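##### Setting implicitPrefs to True did not lower the RMSE in this case. Note that with implicit preferences ALS predicts preference scores rather than ratings, so RMSE against the raw ratings is not a like-for-like comparison.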

### Tune the "rank" parameter to get better results

In [17]:
# `als.rank` on the ALS estimator is the Param descriptor, not its value;
# getRank() returns the rank this estimator will use (the default here,
# since the ALS above was built without setting rank).
als.getRank()

In [18]:
# Copy of the training split with the target column renamed to "label",
# which is RegressionEvaluator's default labelCol.
training2 = training.withColumnRenamed(existing="rating", new="label")

In [19]:
# Not a Spark bug: RegressionEvaluator() reads its target from labelCol="label"
# by default, which is why the column was renamed. Preview the renamed frame.
training2.show(n=20, truncate=True)

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

als = ALS(maxIter=5, regParam=0.01, userCol="uid", itemCol="song_id", ratingCol="rating",
          coldStartStrategy="drop", seed=42)

# Grid of candidate ranks. CrossValidator fits one model per grid point and
# fold, scores each with the evaluator, and keeps the best parameters.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [8, 10, 12])
    .build()
)

# RegressionEvaluator() defaults to labelCol="label" -- that default, not a
# Spark bug, is why the target previously had to be renamed. Pointing the
# evaluator at "rating" lets us cross-validate on `training` directly and
# makes the metric (RMSE) explicit.
cv_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                   predictionCol="prediction")

# seed fixes the fold assignment so the selected rank is reproducible.
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=cv_evaluator,
                          numFolds=3,
                          seed=42)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)


In [21]:
# Rank of the best model found by cross-validation (ALSModel.rank is the fitted model's rank).
cvModel.bestModel.rank

In [22]:
# Evaluate the cross-validated model (best rank from the grid, refit on the
# data passed to crossval.fit) by RMSE on the held-out test split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = cvModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

### Generate top 10 song recommendations for a user

In [24]:
from pyspark.ml.recommendation import ALS

# Refit on the full ratings table (no hold-out) to serve recommendations.
ratings = spark.sql("select * from ratings")
# rank=8 is hard-coded -- presumably the value chosen by the cross-validation
# above; TODO confirm it matches cvModel.bestModel.rank. The seed is pinned so
# the final model is reproducible.
als = ALS(maxIter=5, regParam=0.01, rank=8, userCol="uid", itemCol="song_id", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(ratings)

In [25]:
from pyspark.sql.functions import lit

# Target user for the single-user walkthrough below.
user = 168529700

# Song metadata table; it is displayed further down for reference.
top10000Song = spark.sql("select * from top10000song")

# Candidate set: every distinct song in `ratings`, paired with the target user.
dataSet = (
    ratings
    .select("song_id")
    .distinct()
    .withColumn("uid", lit(user))
)

In [26]:
# Inspect the candidate (song_id, uid) pairs for the target user.
display(dataSet)

In [27]:
# Songs this user has already rated; these are excluded from the candidates.
songsAlreadyRated = ratings.filter(ratings.uid == user).select("uid", "song_id")

# Exclude already-rated songs by column *name*. DataFrame.subtract (EXCEPT
# DISTINCT) matches columns by position, and dataSet is (song_id, uid) while
# songsAlreadyRated is (uid, song_id), so subtract removed essentially nothing.
songToPredict = dataSet.join(songsAlreadyRated, on=["uid", "song_id"], how="left_anti")

# Apply the recommender to the unrated songs and keep the 10 highest predictions.
predictions = (
    model.transform(songToPredict)
    .dropna()
    .orderBy("prediction", ascending=False)
    .limit(10)
    .select("song_id", "prediction")
)

In [28]:
# Show the top-10 recommended songs for the target user with their predicted scores.
display(predictions)

In [29]:
# Show the song metadata table (it is not joined to the recommendations in this notebook).
display(top10000Song)

In [30]:
# Create a function to recommend songs to users
def als_recommender(user, ratings, model, numSongs):
  """Return the `numSongs` highest-scoring songs that `user` has not rated yet.

  Args:
    user: user id, compared against `ratings.uid`.
    ratings: Spark DataFrame with at least `uid` and `song_id` columns.
    model: fitted ALS model scoring (uid, song_id) pairs into `prediction`.
    numSongs: number of recommendations to return.

  Returns:
    Spark DataFrame with columns `song_id`, `prediction`, `uid`, holding the
    top `numSongs` rows by `prediction`.
  """
  from pyspark.sql.functions import lit

  # Candidate set: every distinct song in `ratings`, paired with this user.
  dataSet = ratings.select("song_id").distinct().withColumn("uid", lit(user))
  # Songs the user has already rated.
  songsAlreadyRated = ratings.filter(ratings.uid == user).select("uid", "song_id")

  # Exclude rated songs by column *name*: DataFrame.subtract matches columns by
  # position, and the two frames list uid/song_id in opposite order, so it
  # would leave already-rated songs in the candidate set.
  songToPredict = dataSet.join(songsAlreadyRated, on=["uid", "song_id"], how="left_anti")

  # Score the candidates and keep the top `numSongs`, tagged with the user id.
  predictions = (
    model.transform(songToPredict)
    .dropna()
    .orderBy("prediction", ascending=False)
    .limit(numSongs)
    .select("song_id", "prediction")
    .withColumn("uid", lit(user))
  )

  return predictions

In [31]:
# Top-10 recommendations for the target user via the reusable helper.
display(als_recommender(user=user, ratings=ratings, model=model, numSongs=10))

#### Iterate through user ids to build up the recommendation DataFrame

In [33]:
# Distinct user ids as a pandas frame, sorted so that positional slices taken
# later (e.g. uid[200:350]) pick the same users on every run; distinct() alone
# gives no stable row order.
uid = ratings.select("uid").distinct().orderBy("uid").toPandas()

In [34]:
# Peek at the first few distinct user ids.
uid.head()

In [35]:
# Replace the pandas frame with a plain Python list of user ids.
# NOTE(review): this rebinds `uid` to a different type, so re-running this cell
# on its own fails (a list has no `.uid`); re-run the cell that builds the frame first.
uid = uid.uid.tolist()

In [36]:
# Summarize the id list (count + first few) instead of dumping every id into the output.
len(uid), uid[:10]

In [37]:
from functools import reduce

# Build top-10 recommendations for a sample of users and stack them into one
# DataFrame. The seed user uid[100] is listed explicitly so `prediction` is
# defined in this cell rather than by hidden state from an earlier run (which
# raised NameError on a fresh kernel). Frames are unioned once via reduce
# (`union` replaces the deprecated `unionAll`) and de-duplicated once, instead
# of stacking a distinct() onto the query plan per iteration.
sample_uids = [uid[100]] + uid[200:350]
per_user_recs = [als_recommender(int(u), ratings, model, 10) for u in sample_uids]
prediction = reduce(lambda left, right: left.union(right), per_user_recs).distinct()

display(prediction)

In [38]:
# Single best recommendation for every user known to the model.
userRecs = model.recommendForAllUsers(numItems=1)

In [39]:
# Cache the recommendations because the cells below reuse them; cache() returns
# the DataFrame itself, which is what this cell displays.
userRecs.cache()

In [40]:
# Show each user's top recommendation.
display(userRecs)

In [41]:
# Recommendation for one example user (60183). `recommendations` holds an array
# of structs, so these nested-field selects return arrays of song ids / ratings.
display(
    userRecs
    .filter(userRecs.uid == 60183)
    .select("recommendations.song_id", "recommendations.rating")
)

In [42]:
# Collect every user's recommendation to the driver as a pandas DataFrame.
# NOTE(review): this pulls one row per user onto the driver -- confirm it fits in memory.
df = userRecs.toPandas()