In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

In [2]:
import findspark
findspark.init('../spark-3.2.0-bin-hadoop3.2')

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('ML-recommendation').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/09 05:40:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
from pyspark.ml import Pipeline

In [6]:
from pyspark.ml.recommendation import ALS

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator

In [8]:
data = spark.read.csv("../data/movielens_ratings.csv",inferSchema=True,header=True)

                                                                                

In [9]:
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [10]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [14]:
train_data,test_data = data.randomSplit([0.8,0.2])

In [16]:
als = ALS(maxIter=5,regParam=0.01,userCol='userId',itemCol='movieId',ratingCol='rating')
model = als.fit(train_data)


24/09/09 05:42:31 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/09/09 05:42:31 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
24/09/09 05:42:31 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [17]:
print('Factorized user matrix with rank = %d' % model.rank)
model.userFactors.show(5)

print('-'*50)

print('Factorized item matrix with rank = %d' % model.rank)
model.itemFactors.show(5)

Factorized user matrix with rank = 10




+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[0.3853289, 0.226...|
| 10|[0.5695889, -0.03...|
| 20|[-0.019640552, 0....|
|  1|[-0.1396118, 0.32...|
| 11|[0.71201307, 0.20...|
+---+--------------------+
only showing top 5 rows

--------------------------------------------------
Factorized item matrix with rank = 10
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[0.7780875, 1.221...|
| 10|[0.37656885, 1.88...|
| 20|[-1.0278602, 1.12...|
| 30|[-1.2715393, 2.52...|
| 40|[1.4292676, 1.898...|
+---+--------------------+
only showing top 5 rows



In [18]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [19]:
predictions.show()

                                                                                

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      1|   1.0|     7| 1.0649889|
|      3|   3.0|    14|  2.549052|
|      3|   1.0|    17|0.81872267|
|      3|   1.0|    29| 1.1715821|
|      5|   1.0|     5|  1.542409|
|      5|   1.0|     6|0.94368494|
|      5|   1.0|     8| 1.3922232|
|      4|   1.0|     9| 1.4800656|
|      4|   3.0|    10| 2.8219302|
|      4|   1.0|    14|  3.891878|
|      4|   1.0|    19| 0.8771821|
|      4|   1.0|    23| 1.1448159|
|      4|   1.0|    24| 1.0065305|
|      2|   4.0|    10|  3.952942|
|      2|   1.0|    16|  1.792908|
|      2|   2.0|    20| 2.0474591|
|      2|   4.0|    21| 1.2471565|
|      0|   1.0|     3|0.76337856|
|      0|   1.0|    13| 0.6392587|
|      0|   1.0|    23| 1.1072091|
+-------+------+------+----------+
only showing top 20 rows



In [20]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

                                                                                

Root-mean-square error = 0.7859858287272578


In [21]:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])

In [22]:
# User had 10 ratings in the test data set 
# Realistically this should be some sort of hold out set!
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     19|    11|
|     20|    11|
|     22|    11|
|     32|    11|
|     35|    11|
|     36|    11|
|     39|    11|
|     51|    11|
|     66|    11|
|     69|    11|
|     75|    11|
|     79|    11|
|     82|    11|
|     97|    11|
+-------+------+



In [23]:
reccomendations = model.transform(single_user)


In [24]:
reccomendations.orderBy('prediction',ascending=False).show()


+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     32|    11| 5.0906715|
|     79|    11| 5.0278964|
|     69|    11|  4.833786|
|     66|    11| 3.8621588|
|     19|    11| 3.4755912|
|     51|    11| 3.0298998|
|     75|    11| 3.0292099|
|     35|    11| 2.9244995|
|     36|    11| 2.1798804|
|     82|    11|  1.374917|
|     97|    11| 1.2526332|
|     22|    11|  1.076541|
|     20|    11| 0.8677777|
|     39|    11|-1.3792727|
+-------+------+----------+

