# Building a recommender


### Content-based filtering
Use the content or attributes of an item/user

### Collaborative filtering
Generate estimated preferences of users for items with **which they have not yet interacted**.

#### User-based approach
* Use the **known preferences of other users** that exhibit similar behavior.
* Select a set of similar users and compute some form of combined score based on the items they have shown a preference for.

#### Item-based approach
* Computes some measure of similarity between items based on the **existing user-item preferences**.
* Items that tend to be rated the same by similar users will be classed as similar under this approach.

### Matrix Factorization
* Explicit - Preference is explicit. e.g. ratings
* Implicit - Binary Matrix of watched/not watched, Count Matrix of how many times you have watched.

**ALTERNATING LEAST SQUARES** - ALS works by iteratively solving a series of least squares regression problems. 
* In each iteration,user- or item-factor matrices is treated as fixed, while the other one is updated using the fixed factor and the rating data.
* Then, the factor matrix that was solved for is, in turn, treated as fixed, while the other one is updated.
* This process continues until the model has converged (or for a fixed number of iterations).

In [1]:
import os
DATA_DIR = '../resources/data/ml-100k'
user_path = os.path.abspath(DATA_DIR + '/u.user')
item_path = os.path.abspath(DATA_DIR + '/u.item')
rating_path = os.path.abspath(DATA_DIR + '/u.data')

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DoubleType, TimestampType
user_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("zipCode", StringType(), True)])

item_schema = StructType([
    StructField("itemId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("releaseDate", StringType(), True),
    StructField("videoReleaseDate", StringType(), True),
    StructField("imbdUrl", StringType(), True),
    StructField("unknown", IntegerType(), True),
    StructField("action", IntegerType(), True),
    StructField("adventure", IntegerType(), True),
    StructField("animation", IntegerType(), True),
    StructField("children", IntegerType(), True),
    StructField("comedy", IntegerType(), True),
    StructField("crime", IntegerType(), True),
    StructField("documentary", IntegerType(), True),
    StructField("drama", IntegerType(), True),
    StructField("fantasy", IntegerType(), True),
    StructField("noir", IntegerType(), True),
    StructField("horror", IntegerType(), True),
    StructField("musical", IntegerType(), True),
    StructField("mystery", IntegerType(), True),
    StructField("romance", IntegerType(), True),
    StructField("sciFi", IntegerType(), True),
    StructField("thriller", IntegerType(), True),
    StructField("war", IntegerType(), True),
    StructField("western", StringType(), True)])

rating_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("itemId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", LongType(), True)])

In [3]:
userDf = spark.read.option("delimiter","|").csv('file://' + user_path, schema = user_schema)
itemDf = spark.read.option("delimiter","|").csv('file://' + item_path, schema = item_schema)
ratingDf = spark.read.option("delimiter","\t").csv('file://' + rating_path, schema = rating_schema)

# User-based model (Alternating Least Square)

## RDD using pyspark.mllib

In [4]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
ratings = ratingDf.rdd.map(lambda l: Rating(l.userId, l.itemId, l.rating))

# rank: Number of factors (hidden features) in our ALS model. (10 - 200 is usually reasonable)
model_10 = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01)
model_50 = ALS.train(ratings, rank=50, iterations=10, lambda_=0.01, seed = 322)
model_100 = ALS.train(ratings, rank=100, iterations=10, lambda_=0.01)

In [5]:
# We will have userFeature for each user and productFeature for each item - with the specified rank
print('User Features: ' , model_10.userFeatures().count())
print('Product Features: ', model_10.productFeatures().count())

User Features:  943
Product Features:  1682


## DF using pyspark.ml

In [6]:
from pyspark.ml.recommendation import ALS as ml_als
als = ml_als(rank=50, maxIter=10, userCol="userId", itemCol="itemId", ratingCol="rating", regParam=0.01, seed = 322)
model_50_ = als.fit(ratingDf)

In [7]:
top5_rdd = model_50.recommendProductsForUsers(5)
top5_rdd.filter(lambda x: x[0] == 3).take(1)

[(3,
  (Rating(user=3, product=697, rating=5.75608262976389),
   Rating(user=3, product=659, rating=5.433312345498309),
   Rating(user=3, product=315, rating=5.249307269844362),
   Rating(user=3, product=529, rating=5.241794827864484),
   Rating(user=3, product=638, rating=5.0284916010276)))]

In [8]:
top5_df = model_50_.recommendForAllUsers(5)
top5_df.filter(top5_df.userId == 3).show(1, False)

+------+---------------------------------------------------------------------------------------+
|userId|recommendations                                                                        |
+------+---------------------------------------------------------------------------------------+
|3     |[[430, 5.8786845], [59, 5.7773647], [663, 5.621893], [444, 5.597992], [320, 5.0101237]]|
+------+---------------------------------------------------------------------------------------+



## Evaluate the model on training data

* **RMSE/MSE** (Root Mean Squared Error)
    * direct measure of the reconstruction error of the user-item rating matrix, commonly used in explicit ratings
    * minimized by the conditional **mean**, suitable for asymmetric distribution and an unbiased fit.
* **MAE** (Mean Absolute Error)
    * minimized by the conditional **median**. The fit will be closer to the median and biased.
* **MAP** (Mean Average Precision at K)
    * When the order of recommendation is important


In [9]:
testdata = ratingDf.rdd.map(lambda l: (l.userId, l.itemId))
actual = ratingDf.rdd.map(lambda r: ((r[0], r[1]), r[2]))
predictions_10 = model_10.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
predictions_50 = model_50.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
predictions_100 = model_100.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))

MSE_10 = predictions_10.join(actual).map(lambda r: (r[1][0] - r[1][1])**2).mean()
MSE_50 = predictions_50.join(actual).map(lambda r: (r[1][0] - r[1][1])**2).mean()
MSE_100 = predictions_100.join(actual).map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Rank 10 Mean Squared Error = " + str(MSE_10))
print("Rank 50 Mean Squared Error = " + str(MSE_50))
print("Rank 100 Mean Squared Error = " + str(MSE_100))

Rank 10 Mean Squared Error = 0.482593374846323
Rank 50 Mean Squared Error = 0.083412172973507
Rank 100 Mean Squared Error = 0.014943923943970767


In [10]:
# Save to HDFS and reload the model
# model_50.save(sc, "target/movielens-recommender")
# model = MatrixFactorizationModel.load(sc, "target/movielens-recommender")
model = model_50
predictions = predictions_50

## Using python

In [11]:
import math
joined_50 = predictions.join(actual)
MSE_50 = joined_50.map(lambda r: (r[1][0] - r[1][1])**2).mean()
MAE_50 = joined_50.map(lambda r: math.fabs(r[1][0] - r[1][1])).mean()
print("MSE  = " + str(MSE_50))
print("RMSE = " + str(math.sqrt(MSE_50)))
print("MAE  = " + str(MAE_50))

MSE  = 0.083412172973507
RMSE = 0.28881165657484636
MAE  = 0.2034041795446187


## Using pyspark

In [12]:
from pyspark.mllib.evaluation import RegressionMetrics
predictedAndActual = joined_50.map(lambda l: l[1])
regressionMetrics = RegressionMetrics(predictedAndActual)

# Squared Error
print("MSE  = %s" % regressionMetrics.meanSquaredError)
print("RMSE = %s" % regressionMetrics.rootMeanSquaredError)

# Mean absolute error
print("MAE  = %s" % regressionMetrics.meanAbsoluteError)

# R-squared
print("R-squared = %s" % regressionMetrics.r2)

# Explained variance
print("Explained variance = %s" % regressionMetrics.explainedVariance)

MSE  = 0.08341217297350699
RMSE = 0.28881165657484636
MAE  = 0.20340417954461876
R-squared = 0.9341722794124651
Explained variance = 1.0831797853882736


In [13]:
# Calculating the predictions for all users and items
# +1 to user and item indices because they start from 1
import numpy as np
itemMatrix = np.array(model.productFeatures().map(lambda l: l[1]).collect())
userMatrix = np.array(model.userFeatures().map(lambda l: l[1]).collect())
all_predictions = []
for idx, userVector in enumerate(userMatrix):
    scores = itemMatrix.dot(userVector)
    scores_with_idx = [(i+1,a) for i, a in enumerate(scores)]
    scores_with_idx.sort(key=lambda tup: tup[1], reverse = True)
    sorted_items = (idx+1, [tup[0] for tup in scores_with_idx])
    all_predictions.append(sorted_items)
predictAllDf = sc.parallelize(all_predictions).toDF() \
                .withColumnRenamed('_1', 'userId').withColumnRenamed('_2', 'predictedAll')

## Other Measures

### Average Precision at K (apk/ ap@k)

In [14]:
K = 5
# Taken and modified from https://github.com/benhamner/Metrics/blob/master/Python/ml_metrics/average_precision.py
# Calculate the average precision of top k values
def apk(actual, predicted, k=K):
    
    if not actual:
        return 0.0
    
    if len(predicted) > k:
        predicted = predicted[:k]
        
    score = 0.0
    num_hits = 0.0

    for i,p in enumerate(predicted):
        if p in actual and p not in predicted[:i]:
            num_hits += 1.0
            score += num_hits / (i+1.0)
    
    #return num_hits / min(len(actual), k)  # if you return num_hits, you will get precision at K instead of apk
    return score / min(len(actual), k)

In [15]:
import pyspark.sql.functions as f
from pyspark.sql import Window

apk_udf = f.udf(apk, DoubleType())

# window by userId sorted by actual rating desc
w1 = Window.partitionBy('userId').orderBy(f.desc('rating'))
userMovieList = ratingDf.withColumn('items', f.collect_list('itemId').over(w1)) \
    .groupBy('userId').agg(f.max('items').alias('items'))

predictionDF = predictions.map(lambda l: (l[0][0], l[0][1], l[1])).toDF()
# window by userId sorted by predicted rating desc
w2 = Window.partitionBy('_1').orderBy(f.desc('_3'))
userPredictionList = predictionDF.withColumn('predicted', f.collect_list('_2').over(w2)) \
    .groupBy('_1').agg(f.max('predicted').alias('predicted')).withColumnRenamed('_1', 'userId')

In [16]:
apkDf = predictAllDf.join(userPredictionList, 'userId', 'left').join(userMovieList, 'userId', 'left') \
    .withColumn('items', f.when(f.col('items').isNull(), f.array().cast("array<integer>")).otherwise(f.col('items'))) \
    .withColumn('predicted', f.when(f.col('predicted').isNull(), f.array().cast("array<integer>")).otherwise(f.col('predicted'))) \
    .withColumn('apk', apk_udf(f.col('items'),f.col('predicted'))) \
    .withColumn('apkAll', apk_udf(f.col('items'),f.col('predictedAll')))   
#apkDf.show()

In [17]:
mapk = apkDf.agg(f.avg('apkAll').alias('mapk')).collect()[0].mapk
print('MAP@'+ str(K) +' is', mapk)

MAP@5 is 0.04058677978084126


In [18]:
from pyspark.mllib.evaluation import RankingMetrics
rankingMetrics = RankingMetrics(apkDf.select('predictedAll', 'items').rdd.map(lambda l: (l.predictedAll, l.items)))
print('MAP = ', rankingMetrics.meanAveragePrecision)
print('Precision@' + str(K) + ' =', rankingMetrics.precisionAt(K))

MAP =  0.07205784911469854
Precision@5 = 0.07126193001060449


## Predictions

In [19]:
print('Rating prediction for user 789, item 123: ', model.predict(789, 123))

Rating prediction for user 789, item 123:  3.1765442733287967


In [20]:
print('Top 10 recommendations for user 789')
model.recommendProducts(789, 10)

Top 10 recommendations for user 789


[Rating(user=789, product=504, rating=6.5757757828850725),
 Rating(user=789, product=715, rating=5.972789751703749),
 Rating(user=789, product=56, rating=5.821156452044317),
 Rating(user=789, product=47, rating=5.775194796607585),
 Rating(user=789, product=507, rating=5.539731363663269),
 Rating(user=789, product=101, rating=5.51104380813376),
 Rating(user=789, product=427, rating=5.369601850540164),
 Rating(user=789, product=772, rating=5.354445467654191),
 Rating(user=789, product=52, rating=5.345410382948737),
 Rating(user=789, product=357, rating=5.345010861865714)]

In [21]:
print('Top 10 recommendations for item 123')
model.recommendUsers(123, 10)

Top 10 recommendations for item 123


[Rating(user=190, product=123, rating=6.105111706727907),
 Rating(user=246, product=123, rating=6.078838860749971),
 Rating(user=811, product=123, rating=5.698228719542813),
 Rating(user=548, product=123, rating=5.49832344438362),
 Rating(user=125, product=123, rating=5.284790748803018),
 Rating(user=597, product=123, rating=5.259023729879939),
 Rating(user=871, product=123, rating=5.0432371507945675),
 Rating(user=676, product=123, rating=5.023456831388421),
 Rating(user=103, product=123, rating=4.960695229655085),
 Rating(user=126, product=123, rating=4.953671109193628)]

# Item-based (Cosine similarity)

In [22]:
# Collect itemFactors to a dictionary
itemFactors = model.productFeatures().collectAsMap()

In [23]:
# Compute cosine similarity of 2 numpy arrays
import numpy as np
def cosineSimilarity(a, b):
    norma = np.linalg.norm(a)
    normb = np.linalg.norm(b)
    return np.dot(a, b) / (norma * normb)

In [24]:
factor123 = itemFactors.get(123)
npFactor123 = np.array(factor123)
cosineSimilarity(npFactor123, npFactor123)

1.0

In [25]:
itemSimilarities = dict(map(lambda kv: (kv[0], cosineSimilarity(
    np.array(kv[1]), npFactor123
)), itemFactors.items()))

In [26]:
# 10 most similar items to item 123 (asc vs desc)
sorted(itemSimilarities.items(), key = lambda kv:(kv[1], kv[0]), reverse=True)[:10]
#sorted(itemSimilarities.items(), key = lambda kv:(kv[1], kv[0]))[-10:]

[(123, 1.0),
 (471, 0.8020833091931823),
 (436, 0.7948980428777909),
 (172, 0.7922805693228504),
 (568, 0.780453912924985),
 (50, 0.778950514377293),
 (1, 0.7778277588197464),
 (210, 0.7762099990810513),
 (433, 0.7758186194809127),
 (403, 0.7737553411333368)]