In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

## Create Spark Session

In [2]:
# Start (or reuse) a single-node local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("als")
    .getOrCreate()
)

### Read the data with Mapped Integer IDs

Spark's ALS algorithm requires both user IDs and category IDs to be integers, so here we load the data that has already been converted to use integer IDs.

In [3]:
# Read the file as plain text (one `value` string per line) and expose it as an RDD.
raw_text = spark.read.text("MappedData/data.csv")
lines = raw_text.rdd

In [4]:
def _split_fields(row):
    """Split one text row into its ';'-separated fields."""
    return row.value.split(";")


def _to_rating(fields):
    """Build a rating Row from [userId, category, rating] string fields."""
    return Row(userId=int(fields[0]), category=int(fields[1]),
               rating=float(fields[2]))


parts = lines.map(_split_fields)
ratingsRDD = parts.map(_to_rating)

In [5]:
# Turn the RDD of rating Rows into a DataFrame; the schema is inferred from the Rows.
ratings = spark.createDataFrame(data=ratingsRDD)

We split the data into training and test sets so that we can evaluate the algorithm on ratings it has not seen.

In [6]:
# Split the ratings into training (85%) and test (15%) sets.
# randomSplit already assigns every row to a split at random, so the previous
# orderBy(rand()) was a redundant full sort. A fixed seed makes the split, and
# therefore the reported RMSE, repeatable across kernel restarts (assuming the
# input file and its partitioning are unchanged).
SPLIT_SEED = 42
(training, test) = ratings.randomSplit([0.85, 0.15], seed=SPLIT_SEED)

Build the recommendation model using ALS on the training data

In [7]:

# Configure and fit the ALS matrix-factorisation model on the training split.
# ALS initialises its factor matrices randomly; pinning the seed makes the
# fitted model (and the RMSE computed from it) reproducible from run to run.
ALS_SEED = 42
als = ALS(maxIter=20, regParam=0.01, rank=20, userCol="userId", itemCol="category",
          ratingCol="rating", seed=ALS_SEED)
model = als.fit(training)

In [8]:
# Score the held-out test ratings; this adds a `prediction` column to each row.
predictions = model.transform(dataset=test)

In [9]:
# Keep only rows that received a numeric prediction (discard NaN predictions).
predictions = predictions.where("not isNaN(prediction)")

In [10]:
# RMSE between the true `rating` column and the model's `prediction` column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [83]:
# Persist the fitted model rather than the unfitted `als` estimator, which only
# carries hyper-parameters and cannot be used to predict after loading.
# overwrite() lets this cell be re-run without failing because the target
# directory already exists. Reload with ALSModel.load("alsmodel").
model.write().overwrite().save("alsmodel")

### RMSE Result

In [12]:
# Compute the test-set RMSE and show it as the cell's output.
rmse = evaluator.evaluate(predictions)
rmse

0.7449980124036636

### Predicting for a User

Pick a user ID. User IDs here are the mapped integer IDs; to look up the original ID, refer to UserMap/data.csv.

In [73]:
# Integer user ID for which category predictions are generated below.
userID = 567529

In [74]:
# Collect the set of categories this user has already rated.
user_rows = ratingsRDD.filter(lambda rec: rec['userId'] == userID)
forUser = set(user_rows.map(lambda rec: rec['category']).collect())

In [75]:
# Candidate category IDs 0..19 (assumes there are 20 mapped categories -- TODO confirm).
allCats = set(range(20))

We pick the categories this user hasn't rated yet.

In [76]:
# Categories that remain after removing the ones the user has already rated.
targetCats = allCats - forUser

In [78]:
# One (userId, category) Row per unrated category, distributed as an RDD.
sc = spark.sparkContext
targets = sc.parallelize(targetCats).map(
    lambda cat: Row(userId=userID, category=cat)
)

In [80]:
# DataFrame of the (userId, category) pairs to score.
targetFrame = spark.createDataFrame(data=targets)

In [81]:
# Predict a rating for every category the user has not rated yet.
userPredictions = model.transform(dataset=targetFrame)

These are the predicted ratings for the categories the user hasn't rated yet.

In [82]:
# Bring the (small) prediction table to the driver and display it.
predicted_rows = userPredictions.collect()
predicted_rows

[Row(category=1, userId=567529, prediction=2.8941004276275635),
 Row(category=9, userId=567529, prediction=2.782672643661499),
 Row(category=2, userId=567529, prediction=2.963834285736084),
 Row(category=18, userId=567529, prediction=3.476372718811035)]

We can look at the predicted ratings here and pick a category the user might like.

We then pick restaurants in that category, filter the list by any other preferences the user might have that we haven't taken into account yet, and present the final list as recommendations for the user.