In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark import SparkConf, SparkContext

In [2]:
sc = pyspark.SparkContext

In [3]:
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [5]:
path = "Workplace/Data Mining/Digital_Music_5.json"

In [6]:
musint = sqlContext.read.json(path)

In [7]:
musint.count()

64706

In [8]:
len(musint.columns)

9

In [9]:
musint.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [None]:
# number of products

In [10]:
musint.select('asin').distinct().count()

3568

In [None]:
# number of reviewers or users

In [11]:
musint.select('reviewerID').distinct().count()

5541

In [12]:
musint=musint.select(["asin","overall","reviewerID"])

In [13]:
musint.show()

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|5555991584|    5.0|A3EBHHCZO6V2A4|
|5555991584|    5.0| AZPWAXJG9OJXV|
|5555991584|    5.0|A38IRL0X2T4DPF|
|5555991584|    5.0|A22IK3I6U76GX0|
|5555991584|    4.0|A1AISPOIIHTHXX|
|5555991584|    5.0|A2P49WD75WHAG5|
|5555991584|    3.0|A3O90G1D7I5EGG|
|5555991584|    5.0|A3EJYJC25OJVKK|
|5555991584|    5.0|A1DA8VOH9NR6C7|
|5555991584|    5.0|A33TRNCQK4IUO7|
|5555991584|    5.0| AWY3EPKEOUV1W|
|5555991584|    4.0|A1SCJWCMQ3W3KK|
|5555991584|    5.0|A14BTJRH9VNLJJ|
|5555991584|    5.0|A2AOZQ3WTNVVOK|
|5555991584|    4.0|A1BXA3SM3AJOKL|
|5555991584|    5.0|A3CCYAQRHUTPIQ|
|5555991584|    5.0| AHUT55E980RDR|
|5555991584|    5.0|A24N1BAS3CU27H|
|5555991584|    4.0|A19YHEBK099R7U|
|5555991584|    5.0|A16KCH578FG4B4|
+----------+-------+--------------+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import udf

In [15]:
# to convert alphanumeric ids to integer type ids
def modify_ids(sample):
    i = 0
    user_map = {}
    for user in sample:
        if user in user_map.keys():
            pass
        else:
            i = i + 1
            user_map[user] = i
            
    return user_map

In [16]:
arr1 = musint.select('asin').distinct().rdd.map(lambda r: r[0]).collect()
arr2 = musint.select('reviewerID').distinct().rdd.map(lambda r:r[0]).collect()
dicti1 = modify_ids(arr1)
dicti2 = modify_ids(arr2)

In [17]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, IntegerType())

musint = musint.withColumn("product_id", translate(dicti1)("asin"))
musint = musint.withColumn("reviewer_id", translate(dicti2)("reviewerID"))

In [18]:
# creating a new column rating by converting overall column to float type
from pyspark.sql.types import FloatType
musint=musint.withColumn("rating", musint["overall"].cast(FloatType()))

In [19]:
musint=musint.drop(*['asin','reviewerID','overall'])

In [20]:
musint.show()

+----------+-----------+------+
|product_id|reviewer_id|rating|
+----------+-----------+------+
|      2421|       4724|   5.0|
|      2421|       4756|   5.0|
|      2421|       3339|   5.0|
|      2421|        353|   5.0|
|      2421|       4631|   4.0|
|      2421|       4430|   5.0|
|      2421|       3851|   3.0|
|      2421|       2682|   5.0|
|      2421|       4910|   5.0|
|      2421|       1426|   5.0|
|      2421|       1399|   5.0|
|      2421|       2821|   4.0|
|      2421|       2741|   5.0|
|      2421|       3453|   5.0|
|      2421|       1699|   4.0|
|      2421|       1865|   5.0|
|      2421|       3884|   5.0|
|      2421|       2043|   5.0|
|      2421|       2852|   4.0|
|      2421|       2529|   5.0|
+----------+-----------+------+
only showing top 20 rows



In [21]:
np.random.seed(15)

In [62]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

(training,test) = musint.randomSplit([0.8, 0.2])

#als = ALS(userCol = "reviewer_id",itemCol="product_id", ratingCol="rating",coldStartStrategy = "drop",nonnegative = True)
als = ALS(maxIter=20, regParam=0.2, rank = 80, userCol="reviewer_id", itemCol="product_id", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)
model = als.fit(training)

In [147]:
# best model we got with maximum iterations 20, rank = 80 and regularization parameter = 0.2

In [63]:
predictions = model.transform(test)
evaluator1 = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
evaluator2 = RegressionEvaluator(metricName="mae", labelCol="rating",predictionCol="prediction")
rmse = evaluator1.evaluate(predictions)
mae = evaluator2.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
print("Mean Absolute error = " + str(mae))

Root-mean-square error = 0.9987045802088677
Mean Absolute error = 0.7905045967999144


In [90]:
from pyspark.sql.functions import desc
# top 20 recommendations overall based on products rated say, above 100 times.
ratedMost = training.groupBy('product_id').count().filter("count > 100").sort(desc("count"))

In [105]:
type(ratedMost)

pyspark.sql.dataframe.DataFrame

In [91]:
ratedMost.show()

+----------+-----+
|product_id|count|
+----------+-----+
|       367|  211|
|       323|  210|
|      2519|  160|
|       863|  151|
|      1365|  146|
|      1017|  144|
|      2091|  144|
|      1672|  138|
|      3053|  132|
|       448|  131|
|      3211|  129|
|      2560|  128|
|      1731|  123|
|      2779|  123|
|       104|  119|
|      2273|  113|
|      2871|  112|
|      2461|  111|
|       200|  107|
|      2388|  107|
+----------+-----+
only showing top 20 rows



In [92]:
ratedMost.count()
# there are 22 products rated more than 100 times, out of which we select top 10 to be recommended to all customers.

22

In [102]:
# top 10 recommendations alongwith Ratings
ratedMost.head(10)

[Row(product_id=367, count=211),
 Row(product_id=323, count=210),
 Row(product_id=2519, count=160),
 Row(product_id=863, count=151),
 Row(product_id=1365, count=146),
 Row(product_id=2091, count=144),
 Row(product_id=1017, count=144),
 Row(product_id=1672, count=138),
 Row(product_id=3053, count=132),
 Row(product_id=448, count=131)]

In [122]:
import pyspark.sql.functions as func
predictions = predictions.withColumn("round_prediction",func.round(predictions["prediction"]))

In [123]:
predictions.show()

+----------+-----------+------+----------+----------------+
|product_id|reviewer_id|rating|prediction|round_prediction|
+----------+-----------+------+----------+----------------+
|       148|       1507|   5.0|  4.795439|             5.0|
|       148|        808|   5.0| 4.0881286|             4.0|
|       148|       4408|   5.0|  4.877437|             5.0|
|       148|       3010|   5.0| 4.1411295|             4.0|
|       148|       1588|   3.0| 3.2804918|             3.0|
|       148|       2549|   5.0| 4.8106055|             5.0|
|       148|       2882|   4.0|  4.481371|             4.0|
|       148|       1902|   4.0|  3.920722|             4.0|
|       148|       2210|   5.0|  4.823954|             5.0|
|       148|       1275|   5.0|  4.403393|             4.0|
|       148|       3209|   5.0| 4.8427114|             5.0|
|       148|       3757|   5.0| 4.9464245|             5.0|
|       148|       2994|   5.0|  4.697911|             5.0|
|       148|       2968|   5.0|  4.75443

In [125]:
# checked for rounded values of prediction column
evaluator1 = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="round_prediction")
evaluator2 = RegressionEvaluator(metricName="mae", labelCol="rating",predictionCol="round_prediction")
rmse = evaluator1.evaluate(predictions)
mae = evaluator2.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
print("Mean Absolute error = " + str(mae))

Root-mean-square error = 0.6596418459335643
Mean Absolute error = 0.36278904414492397


In [146]:
model.recommendForAllUsers(10).show()

+-----------+--------------------+
|reviewer_id|     recommendations|
+-----------+--------------------+
|       1580|[[2637, 5.194005]...|
|       4900|[[2143, 5.273694]...|
|       5300|[[3341, 4.46827],...|
|        471|[[1113, 4.589177]...|
|       1591|[[1358, 4.6457696...|
|       4101|[[2599, 4.9881334...|
|       1342|[[1113, 5.2407956...|
|       2122|[[1957, 4.731841]...|
|       2142|[[153, 5.0766716]...|
|        463|[[1524, 4.639947]...|
|        833|[[1185, 4.0442247...|
|       3794|[[2637, 5.2714286...|
|       1645|[[1113, 5.224621]...|
|       3175|[[2599, 4.0874147...|
|       4935|[[1547, 4.4668393...|
|        496|[[337, 3.3820245]...|
|       2366|[[1223, 4.3924766...|
|       2866|[[376, 4.4811306]...|
|       5156|[[583, 4.9026413]...|
|       3997|[[1113, 5.005373]...|
+-----------+--------------------+
only showing top 20 rows



In [137]:
from pyspark.sql.functions import struct
predictions2=predictions.withColumn("pred_tuple",struct(predictions.round_prediction,predictions.rating))

In [139]:
pred_list = predictions2.select('pred_tuple').collect()

In [141]:
predictionAndLabels = sc.parallelize(pred_list)

In [142]:
metrics = MulticlassMetrics(predictionAndLabels)

In [27]:
# implementing it on the complete dataset

In [53]:
print (sc.getConf().toDebugString())

spark.app.id=local-1525620598384
spark.app.name=My App
spark.driver.host=SpectreX.home
spark.driver.port=37337
spark.executor.id=driver
spark.master=local
spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.submit.deployMode=client
spark.ui.showConsoleProgress=true


In [44]:
musint.groupBy('rating').count().show()

+------+-----+
|rating|count|
+------+-----+
|   5.0|35580|
|   2.0| 3010|
|   3.0| 6789|
|   1.0| 2791|
|   4.0|16536|
+------+-----+



In [None]:
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)