In [1]:
# Make the local Spark installation importable from this kernel before any
# pyspark import below (spark_home=None lets findspark locate it itself).
import findspark

findspark.init(spark_home=None)

In [2]:
from pyspark.sql import SparkSession

# Single Spark session shared by every later cell; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Python Spark Recommendation Systems")
    .getOrCreate()
)

print(spark)

<pyspark.sql.session.SparkSession object at 0x000002309DFE33C8>


In [3]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

## Preprocessing

In [5]:
# Read the ratings file as plain text (one Row(value=<line>) per line) and drop
# to the underlying RDD so each line can be parsed by hand.
lines = (
    spark.read
    .text("ratings.txt")
    .rdd
)

In [6]:
# Peek at the first few raw lines to confirm the "::"-separated layout.
print(lines.take(num=5))

[Row(value='1::1193::5::978300760'), Row(value='1::661::3::978302109'), Row(value='1::914::3::978301968'), Row(value='1::3408::4::978300275'), Row(value='1::2355::5::978824291')]


In [7]:
# Split every raw line on the "::" delimiter into a list of string tokens.
parts = lines.map(lambda record: record.value.split(sep="::"))

In [8]:
# Turn each [user, movie, rating, timestamp] token list into a typed Row.
# The displayed column order in the output is alphabetical, so this Spark
# version appears to sort Row keyword fields by name.
# Parsing is lazy: a blank or malformed line will raise ValueError only when
# an action (take/show/fit) runs.
ratingsRDD = parts.map(
    lambda tokens: Row(
        UserId=int(tokens[0]),
        MovieId=int(tokens[1]),
        Rating=float(tokens[2]),
        Timestamp=int(tokens[3]),
    )
)

In [9]:
# Sanity-check the parsed, typed rows before building a DataFrame.
print(ratingsRDD.take(num=5))

[Row(MovieId=1193, Rating=5.0, Timestamp=978300760, UserId=1), Row(MovieId=661, Rating=3.0, Timestamp=978302109, UserId=1), Row(MovieId=914, Rating=3.0, Timestamp=978301968, UserId=1), Row(MovieId=3408, Rating=4.0, Timestamp=978300275, UserId=1), Row(MovieId=2355, Rating=5.0, Timestamp=978824291, UserId=1)]


In [10]:
# Build a DataFrame from the Row RDD (schema inferred from the Row fields)
# and show the first 20 rows.
ratings = spark.createDataFrame(data=ratingsRDD)
ratings.show(n=20, truncate=True)

+-------+------+---------+------+
|MovieId|Rating|Timestamp|UserId|
+-------+------+---------+------+
|   1193|   5.0|978300760|     1|
|    661|   3.0|978302109|     1|
|    914|   3.0|978301968|     1|
|   3408|   4.0|978300275|     1|
|   2355|   5.0|978824291|     1|
|   1197|   3.0|978302268|     1|
|   1287|   5.0|978302039|     1|
|   2804|   5.0|978300719|     1|
|    594|   4.0|978302268|     1|
|    919|   4.0|978301368|     1|
|    595|   5.0|978824268|     1|
|    938|   4.0|978301752|     1|
|   2398|   4.0|978302281|     1|
|   2918|   4.0|978302124|     1|
|   1035|   5.0|978301753|     1|
|   2791|   4.0|978302188|     1|
|   2687|   3.0|978824268|     1|
|   2018|   4.0|978301777|     1|
|   3105|   5.0|978301713|     1|
|   2797|   4.0|978302039|     1|
+-------+------+---------+------+
only showing top 20 rows



## Create Model

In [11]:
# Hold out ~20% of the ratings for evaluation. Without a seed, randomSplit
# draws a new split on every run, so the RMSE below would change on each
# Restart & Run All. A fixed seed makes the split reproducible for the same
# input data.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [12]:
# Build the recommendation model using ALS on the training data.
# coldStartStrategy='drop' ensures we don't get NaN evaluation metrics for
# users/movies that are missing from the training split.
# seed is set explicitly: ALS factor initialisation is random, and PySpark's
# default seed is not guaranteed to be stable across kernels, so an unseeded
# fit is not reproducible.
# NOTE(review): some recommendation scores in the outputs (e.g. 13.6) are far
# above the 5.0 maximum seen in the sample ratings. regParam=0.01 is weak
# regularisation, so consider tuning it.
als = ALS(maxIter=5, regParam=0.01, userCol="UserId", itemCol="MovieId", ratingCol="Rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [13]:
# Score the held-out ratings and report root-mean-square error between the
# true "Rating" column and the model's "prediction" column.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.8946665353040703


In [14]:
# Top-10 lists in both directions:
#   userRecs  -> for every user, the 10 movies with the highest predicted score
#   movieRecs -> for every movie, the 10 users with the highest predicted score
userRecs = model.recommendForAllUsers(numItems=10)
movieRecs = model.recommendForAllItems(numUsers=10)

In [15]:
# Each row: a user and an array of (movie id, predicted score) pairs.
userRecs.show(n=20, truncate=True)

+------+--------------------+
|UserId|     recommendations|
+------+--------------------+
|  1580|[[2964, 9.841737]...|
|  4900|[[2776, 7.6694064...|
|  5300|[[3003, 6.2810597...|
|   471|[[2197, 7.755476]...|
|  1591|[[666, 6.2788205]...|
|  4101|[[3867, 11.383509...|
|  1342|[[3867, 6.652906]...|
|  2122|[[3382, 6.647959]...|
|  2142|[[2332, 8.078419]...|
|   463|[[1038, 6.728911]...|
|   833|[[2512, 13.647819...|
|  5803|[[97, 10.026361],...|
|  3794|[[3003, 10.132432...|
|  1645|[[2063, 8.683527]...|
|  3175|[[2192, 7.945146]...|
|  4935|[[1038, 9.170467]...|
|   496|[[3867, 9.285571]...|
|  2366|[[3050, 6.762451]...|
|  2866|[[3003, 7.2467127...|
|  5156|[[2658, 6.864898]...|
+------+--------------------+
only showing top 20 rows



In [16]:
# Each row: a movie and an array of (user id, predicted score) pairs.
movieRecs.show(n=20, truncate=True)

+-------+--------------------+
|MovieId|     recommendations|
+-------+--------------------+
|   1580|[[2867, 5.493834]...|
|    471|[[283, 5.735796],...|
|   1591|[[1445, 5.484816]...|
|   1342|[[5816, 6.1502576...|
|   2122|[[708, 7.366824],...|
|   2142|[[4283, 4.890501]...|
|    463|[[642, 8.316989],...|
|    833|[[5214, 7.1485615...|
|   3794|[[4703, 8.85114],...|
|   1645|[[2534, 6.2322097...|
|   3175|[[41, 6.254596], ...|
|    496|[[3545, 8.806262]...|
|   2366|[[128, 7.266315],...|
|   2866|[[168, 5.535595],...|
|    148|[[357, 9.270704],...|
|   1088|[[41, 7.566494], ...|
|   1238|[[2375, 7.319431]...|
|   3918|[[5328, 7.3013663...|
|   1829|[[642, 8.107708],...|
|   1959|[[41, 6.1934733],...|
+-------+--------------------+
only showing top 20 rows

