# Simple article recommendation with collaborative filtering in Spark ML

In [1]:
import pyspark
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F 
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from IPython.display import display, HTML
import pandas as pd

### get the data

In [2]:
! curl -o reco-collaborative-data.csv https://gist.githubusercontent.com/rawar/9870c6b95dfc0791148120f672e4703e/raw/5588316adabcc77163354dd7e3cb27ab419daf8f/reco-collaborative-data.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 13.8M  100 13.8M    0     0  1241k      0  0:00:11  0:00:11 --:--:-- 1103k


### Take a quick look at the data

In [3]:
# Print the header row plus the first nine data rows of the downloaded CSV.
with open("reco-collaborative-data.csv") as csv_file:
    for _, line in zip(range(10), csv_file):
        print(line, end="")

visitor_id,article_id,sum_pageviews
647828,1026,3245
34969,8371,160
739991,5892,116
739634,8446,107
320270,5240,84
294819,8381,82
506821,863,77
739634,11,75
34969,8381,72


### define the schema

In [4]:
# Explicit schema for the CSV: every column is read as an IntegerType,
# which avoids a schema-inference pass over the file.
schema = StructType([
    StructField(column_name, IntegerType())
    for column_name in ("visitor_id", "article_id", "sum_pageviews")
])

### Read the data into a Spark DataFrame

In [5]:
# Reuse the active SparkSession (or start one) rather than depending on a
# `sc` variable that is never defined in this notebook and on the deprecated
# SQLContext entry point.
spark = SparkSession.builder.getOrCreate()

# Load the interactions with the explicit schema defined above; the first
# CSV line is the header.
raw_data_df = spark.read.csv(
    path="reco-collaborative-data.csv",
    header=True,
    schema=schema,
)

### Count the number of rows

In [6]:
# Number of rows loaded from the CSV. count() is a Spark action, so this
# actually reads the file rather than just building a plan.
raw_data_df.count() 

1049188

### define some helper functions

In [7]:
def _zip_pairs(first_values, second_values):
    """Pair two sequences element-wise, stopping at the shorter one (``zip`` semantics)."""
    return list(zip(first_values, second_values))


# Spark UDF around _zip_pairs. It returns an array of
# struct<first: int, second: double>. Using the public F.udf API with a named
# function gives generated columns a readable name instead of "<lambda>(...)".
# NOTE(review): reco_ is not referenced by any cell shown here; remove it if
# nothing else uses it.
reco_ = F.udf(
    _zip_pairs,
    ArrayType(StructType([
        StructField("first", IntegerType()),
        StructField("second", DoubleType()),
    ])),
)

In [8]:
def displayDF(df, numberOfRows):
    """Render the first ``numberOfRows`` rows of a Spark DataFrame as a rich table.

    The DataFrame is limited *before* ``toPandas()``, so only that slice is
    collected to the driver and handed to IPython's ``display``.
    """
    display(df.limit(numberOfRows).toPandas())

In [9]:
# Show the first five loaded rows to confirm the schema was applied.
displayDF(df=raw_data_df, numberOfRows=5)

Unnamed: 0,visitor_id,article_id,sum_pageviews
0,647828,1026,3245
1,34969,8371,160
2,739991,5892,116
3,739634,8446,107
4,320270,5240,84


### Recommendation with alternating least squares

In [10]:
# Implicit-feedback ALS: page-view counts are used as implicit preference
# signals rather than explicit ratings. ALS initialises its factors randomly,
# so a fixed seed is set to make the trained model reproducible.
als = ALS(
    maxIter=7,
    regParam=0.2,
    userCol="visitor_id",
    itemCol="article_id",
    ratingCol="sum_pageviews",
    coldStartStrategy="drop",  # drop rows whose user/item was unseen in training
    implicitPrefs=True,
    rank=10,
    seed=123456789,
)

In [11]:
# List every ALS parameter with its description, default and current value.
params_doc = als.explainParams()
print(params_doc)

alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False, current: True)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: article_id)
maxIte

#### Creates an RMSE evaluator using the label and predicted columns
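The RMSE (root-mean-square error) estimates the quality of the trained model; in general, a lower RMSE is better. Note that with `implicitPrefs=True` the model predicts preference scores rather than page-view counts, so this RMSE is only a rough sanity check.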

In [12]:
# RMSE between the model's prediction column and the observed page views.
# The column names come from the ALS estimator so the two cannot drift apart.
reg_eval = RegressionEvaluator(
    metricName="rmse",
    labelCol=als.getRatingCol(),
    predictionCol=als.getPredictionCol(),
)

#### Split the original data set into training and validation sets

In [13]:
# 80/20 train/validation split; the fixed seed keeps the split reproducible
# for the same input data.
training_df, validation_df = raw_data_df.randomSplit(weights=[0.8, 0.2], seed=123456789)

#### Fit the model to the given input dataframe for training

In [14]:
# Train the ALS factorization on the training split only.
als_model = als.fit(dataset=training_df)

In [15]:
# Score the held-out rows; adds a "prediction" column to the validation data.
predict_df = als_model.transform(dataset=validation_df)

#### Drop NaN predictions and round the remaining scores

In [16]:
# coldStartStrategy="drop" should already remove NaN predictions. This filter
# states the intent explicitly instead of relying on Spark treating NaN == NaN
# as true, which is what made the old `!= float('nan')` comparison work.
# The surviving predictions are rounded to whole numbers and made non-negative
# before being compared with the integer page-view counts.
predicted_articles_df = (
    predict_df
    .filter(F.col("prediction").isNotNull() & ~F.isnan(F.col("prediction")))
    .withColumn("prediction", F.abs(F.round(F.col("prediction"), 0)))
)

In [17]:
# RMSE on the cleaned validation predictions. The reported hyper-parameters
# are read from the estimator so the message cannot drift from the config.
error = reg_eval.evaluate(predicted_articles_df)
print("For rank %d, regularization parameter %s the RMSE is %s"
      % (als.getRank(), als.getRegParam(), error))

For rank 10, regularization parameter 0.2 the RMSE is 2.285393331916213


### top 10 articles for each user 

In [18]:
# For each user known to the model, the 10 articles with the highest predicted score.
user_recommendations_df = als_model.recommendForAllUsers(numItems=10)
displayDF(user_recommendations_df, numberOfRows=10)

Unnamed: 0,visitor_id,recommendations
0,148,"[(6244, 0.4224605858325958), (8191, 0.33263662..."
1,463,"[(5584, 0.2013273537158966), (6244, 0.13268625..."
2,471,"[(8191, 0.637404203414917), (6244, 0.363770246..."
3,496,"[(8651, 0.00015352391346823424), (8446, 0.0001..."
4,833,"[(7293, 0.8094753623008728), (5584, 0.21846905..."
5,1088,"[(8651, 0.0523558109998703), (8446, 0.04186395..."
6,1238,"[(8651, 0.04071379452943802), (8446, 0.0325606..."
7,1342,"[(8223, 0.0002753616136033088), (7061, 0.00014..."
8,1580,"[(8651, 0.002572025638073683), (8024, 0.002138..."
9,1591,"[(8651, 0.02762039378285408), (8446, 0.0212976..."


### top 10 user recommendations for each article

In [19]:
# For each article known to the model, the 10 users with the highest predicted score.
article_recommendation_df = als_model.recommendForAllItems(numUsers=10)
displayDF(article_recommendation_df, numberOfRows=10)

Unnamed: 0,article_id,recommendations
0,1580,"[(511366, 1.2085095622754271e-21), (608223, 9...."
1,5300,"[(34969, 0.001255619339644909), (294819, 0.001..."
2,6620,"[(34969, 1.0714256148958157e-07), (294819, 7.9..."
3,7240,"[(34969, 6.078237129258923e-05), (294819, 4.52..."
4,7340,"[(647828, 8.14487793832086e-06), (202570, 3.74..."
5,7880,"[(34969, 0.004088749643415213), (294819, 0.003..."
6,471,"[(511366, 1.1506140554127739e-16), (739991, 9...."
7,1591,"[(34969, 0.00030105409678071737), (294819, 0.0..."
8,4101,"[(34969, 0.002676140982657671), (294819, 0.002..."
9,1342,"[(34969, 0.0001444842346245423), (294819, 0.00..."


### top 10 article recommendations for a specified set of users

In [20]:
# Pick 10 user ids the model was trained on. They are taken from training_df
# rather than from user_recommendations_df, which is not cached, so selecting
# from it would re-run the all-users recommendation job just to list ids.
users = training_df.select(als.getUserCol()).distinct().limit(10)
user_subset_recommendations_df = als_model.recommendForUserSubset(users, 10)
displayDF(user_subset_recommendations_df, 10)

Unnamed: 0,visitor_id,recommendations
0,1580,"[(8651, 0.002572025638073683), (8024, 0.002138..."
1,4900,"[(8475, 0.7443039417266846), (8594, 0.22050458..."
2,6620,"[(8024, 0.00032381396158598363), (5887, 0.0001..."
3,7240,"[(8428, 0.1912996917963028), (8172, 0.18571792..."
4,7340,"[(8024, 0.00019980572687927634), (8651, 0.0001..."
5,7880,"[(8588, 0.4726514518260956), (8252, 0.41014638..."
6,9900,"[(8651, 0.002572025638073683), (8024, 0.002138..."
7,12940,"[(5887, 0.030276881530880928), (8390, 0.026214..."
8,14450,"[(8488, 0.8548504710197449), (5887, 0.17466221..."
9,15790,"[(8651, 0.02005619741976261), (8446, 0.0158329..."


### Build a friendlier table of visitor ids, article ids and recommendation scores

In [21]:
# One row per (visitor, recommended article) pair: explode the per-user
# recommendation array into a struct column named "reco".
recommendation_exploded = user_recommendations_df.select(
    "visitor_id",
    F.explode("recommendations").alias("reco"),
)

In [22]:
# Flatten the "reco" struct into plain columns for the article id and its score.
recommendation_splitted = recommendation_exploded.select(
    F.col("visitor_id"),
    F.col("reco.article_id").alias("reco_article_id"),
    F.col("reco.rating").alias("reco_article_rating"),
)

displayDF(recommendation_splitted, 20)

Unnamed: 0,visitor_id,reco_article_id,reco_article_rating
0,148,6244,0.422461
1,148,8191,0.332637
2,148,6708,0.319411
3,148,8488,0.156295
4,148,7781,0.130057
5,148,5584,0.129258
6,148,5887,0.108293
7,148,8452,0.103417
8,148,7541,0.08257
9,148,8172,0.082143
