# Collaborative filtering usinf Spark(Pyspark)


## Step 1: 
* Import the pyspark libary and create spark session
* Import other required libraries

In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import requests, json, os, sys, time, re
from sklearn.metrics.pairwise import linear_kernel,cosine_similarity
import pandas as pd

In [3]:
######## Spark session is a unified entry point of a spark application  #############
spark = SparkSession \
    .builder \
    .appName('spark-ALS') \
    .config("configuration_key", "configuration_value") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

In [4]:
import os
import math
import datetime
import pyspark.sql.functions as sf
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import TimestampType
import hashlib

## Step 2 : 
* Load the already preprocessed data into sparkRDD
* Reformat the data as per need of Spark MLib
* analyzing the loaded data

In [5]:
ratings_raw_data = sc.textFile("../../pre_processed_data/experiment_1_dataset.csv")
ratings_raw_data_header = ratings_raw_data.take(1)[0]
ratings_data = ratings_raw_data.filter(lambda line: line != ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(hashlib.sha1(tokens[0].encode('utf-8')).hexdigest(), 16) % (10 ** 8),tokens[1],int(float(tokens[2][:10])))).cache()

#ratings_data = ratings_data.map(lambda x: (x[0] , x[1], int(int(x[2])/10) +1.0 ))
#ratings_data = ratings_data.filter(lambda x: int(x[2]) >=2 )
rddTraining, rddValidating, rddTesting = ratings_data.randomSplit([6,2,2], seed=1001)

In [6]:
ratings_data.take(10)

[(64592103, '55150', 1),
 (46027953, '12900', 1),
 (11418539, '4000', 100),
 (22844952, '363970', 10),
 (30902696, '317360', 4),
 (29257067, '65800', 1),
 (68176534, '218230', 6),
 (35300949, '240', 5),
 (5765927, '65700', 1),
 (39525566, '221100', 7)]

In [7]:
#rddTraining.take(10)
#new_ratings_data = ratings_data.filter(lambda x: int(x[2]) >=2)
schema = ["user","item","rating"]
df = spark.createDataFrame(data=ratings_data, schema = schema)

df.printSchema()
df.show(truncate=False)

root
 |-- user: long (nullable = true)
 |-- item: string (nullable = true)
 |-- rating: long (nullable = true)

+--------+------+------+
|user    |item  |rating|
+--------+------+------+
|64592103|55150 |1     |
|46027953|12900 |1     |
|11418539|4000  |100   |
|22844952|363970|10    |
|30902696|317360|4     |
|29257067|65800 |1     |
|68176534|218230|6     |
|35300949|240   |5     |
|5765927 |65700 |1     |
|39525566|221100|7     |
|59586993|291410|2     |
|20441736|304930|1     |
|8742321 |4000  |8     |
|62551528|113400|4     |
|67019253|238460|1     |
|49237568|242860|3     |
|58504375|235800|2     |
|2591283 |10150 |9     |
|90966574|280790|2     |
|96131366|221040|2     |
+--------+------+------+
only showing top 20 rows



In [8]:
df.groupBy("rating").count().show(100)
df.registerTempTable("df");
spark.sql("select count(*) , rating from df group by rating order by rating").show(100)

+------+-------+
|rating|  count|
+------+-------+
|    26|   7493|
|    29|   6215|
|    65|   1458|
|    19|  13032|
|    54|   2056|
|    22|   9863|
|     7|  61709|
|    77|   1059|
|    34|   4713|
|    50|   2288|
|    94|    726|
|    57|   1819|
|    43|   3200|
|    32|   5239|
|    84|    874|
|    31|   5659|
|    39|   3632|
|    98|    636|
|    25|   7995|
|    95|    714|
|    71|   1270|
|     6|  77079|
|    68|   1278|
|    72|   1220|
|    87|    832|
|    58|   1783|
|     9|  41852|
|    27|   7129|
|    63|   1538|
|    56|   1951|
|    51|   2260|
|    52|   2131|
|    17|  15388|
|    79|    989|
|    41|   3331|
|    33|   4906|
|    28|   6711|
|    88|    829|
|     5| 101001|
|     1|1687641|
|    96|    719|
|    10|  36035|
|    89|    806|
|    85|    880|
|    67|   1360|
|    48|   2546|
|   100|  65991|
|    44|   2897|
|    61|   1567|
|     3| 209694|
|    37|   4044|
|    83|    928|
|    12|  27145|
|    55|   1960|
|    74|   1132|
|     8|  5062

## Step 3: Finding the best parameters 

In [10]:
# reference: https://github.com/Akxay/recommendation_engine/blob/master/Jobs_RE_spark.ipynb
#Using function to compare the best possible parameters
def howFarAreWe(model, against, sizeAgainst):
    againstNoRatings = against.map(lambda x: (int(x[0]), int(x[1])) )
    againstWiRatings = against.map(lambda x: ((int(x[0]),int(x[1])), int(x[2])) )
    predictions = model.predictAll(againstNoRatings).map(lambda p: ( (p[0],p[1]), p[2]) )
    predictionsAndRatings = predictions.join(againstWiRatings).values()    
    return sqrt(predictionsAndRatings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add) / float(sizeAgainst))

In [13]:
#finding best set of parameters
ranks  = [5,10]
iters  = [5,10]
alpha = [0.01, 0.02]

finalModel = None
finalRank  = 0
finalIter  = -1
finalDist   = float(300)
finalAlpha = float(0)

#[START train_model]
for cRank, cIter, cAlpha in itertools.product(ranks, iters, alpha):
    model = ALS.trainImplicit(rddTraining, cRank, cIter, alpha=float(cAlpha))
    dist = howFarAreWe(model, rddValidating, rddValidating.count())
    if dist < finalDist:
        print(cIter, cRank,cAlpha)
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank  = cRank
        finalIter  = cIter
        finalDist  = dist
        finalAlpha  = cAlpha 
#[END train_model]

print("Rank %i" % finalRank)  
print("Iter %i" % finalIter)  
print("Dist %f" % finalDist) 
print("Alpha %f" % finalAlpha)

5 5 0.01
Best so far:18.991656
5 5 0.02
Best so far:18.985128
10 5 0.02
Best so far:18.982636
Rank 5
Iter 10
Dist 18.982636
Alpha 0.020000


## Step 4: Training the best parameters and validating the results 

In [None]:
rank = 10   
# The size of the feature vector used; the minimum value is 10, the minimum value of the feature vector, 
#the better the model produced, but it also costs more calculation cost
numIterations = 10
#Iteration numbers
alpha=0.01
#model = ALS.trainImplicit(rddTraining, 10, 10,alpha=0.01) #18.99407447555247
#model = ALS.trainImplicit(rddTraining, 10, 10,alpha=0.5) #18.88809845032351
#model = ALS.train(rddTraining, 10, 10) #29.13044908080498
model = ALS.trainImplicit(rddTraining, rank, numIterations, alpha=0.01) #18.96886550953952
testset = sc.parallelize([(80937808, 386360), (80937808, 380600)])   
model.predictAll(testset).collect()

In [9]:
# Calculate all predictions
rddTesting_map = rddTesting.map(lambda r: ((r[0], r[1]))) 
predictions = model.predictAll(rddTesting_map).map(lambda r: ((r[0], r[1]), (r[2]))) 
predictions.take(5)    ####### Output 5 results
#model.predictAll(rddTesting_map).collect()     Show all the Recommendation Results

[((80937808, 386360), 0.16070414091136048),
 ((80937808, 380600), 0.048304704863768),
 ((80937808, 315640), 0.0799210385878367),
 ((80937808, 226320), 0.11713325371411347),
 ((80937808, 339610), 0.09109506708076365)]

In [10]:
rates_and_preds = rddTesting.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions) 
#rates_and_preds.filter(lambda x : x[1][0] >= 10).take(100)

## Step 5: Calculate the RMSE

In [11]:
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()) 
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 18.99229791297807


## Step 6: Conculsion

RMSE score of around 19 is good for the datasets considering the sparse data set. The rating are on par with SVD model but this program executes 3 times faster than surprise SVD due to high parallel processing done by Apache Spark.  

In [None]:
#Referenece: https://towardsdatascience.com/steam-recommendation-systems-4358917288eb
#https://github.com/Akxay/recommendation_engine/blob/master/Jobs_RE_spark.ipynb