## Iniitializing Environment and Loading Data

In [1]:
import findspark
findspark.init('/home/shahayush954/spark-3.4.1-bin-hadoop3')

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommender').getOrCreate()

23/08/20 18:58:02 WARN Utils: Your hostname, ubuntu-22 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/08/20 18:58:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 18:58:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/20 18:58:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/08/20 18:58:07 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
data = spark.read.csv('movielens_ratings.csv', header=True, inferSchema=True)

                                                                                

In [5]:
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [6]:
data.describe().show()

                                                                                

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [7]:
train_data, test_data = data.randomSplit([0.8,0.2])

## Recommender Algorithms

In [8]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [9]:
als = ALS(maxIter=5, regParam=0.01, userCol='userId', ratingCol='rating', itemCol='movieId')

In [10]:
model = als.fit(train_data)

                                                                                

In [11]:
predictions = model.transform(test_data)

In [12]:
predictions.show()

                                                                                

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|      1|   1.0|    28|  2.6166077|
|      2|   4.0|    28|  1.5665857|
|      6|   1.0|    28|-0.25033987|
|      4|   1.0|    12|  1.2256584|
|      3|   2.0|    22|  1.5913528|
|      5|   2.0|    22|  0.2002976|
|      6|   1.0|     1|  1.5998328|
|      6|   1.0|    20|   0.745488|
|      0|   1.0|    15|-0.38979286|
|      1|   4.0|    15|  1.4149723|
|      2|   1.0|    15|  -1.209104|
|      2|   3.0|     9|  1.4422479|
|      3|   1.0|     9| 0.99841034|
|      5|   1.0|     9|  0.4066971|
|      0|   1.0|     8|  1.3650738|
|      5|   1.0|     8| 0.80226296|
|      6|   2.0|    23|  0.6783154|
|      6|   3.0|    24|  1.4898047|
|      3|   1.0|    29| 0.53934264|
|      4|   1.0|    29|    1.51045|
+-------+------+------+-----------+
only showing top 20 rows



In [13]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [14]:
rmse = evaluator.evaluate(predictions)

                                                                                

In [15]:
rmse

1.5804647976884063

In [16]:
single_user = test_data.filter(test_data['userId'] == 11).select(['movieId', 'userId'])

In [17]:
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      9|    11|
|     13|    11|
|     20|    11|
|     30|    11|
|     40|    11|
|     47|    11|
|     50|    11|
|     61|    11|
|     82|    11|
|     88|    11|
+-------+------+



In [18]:
recommendations = model.transform(single_user)

In [19]:
recommendations.orderBy('prediction').show()

                                                                                

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     47|    11|-2.6326303|
|     61|    11|0.40718973|
|     40|    11|0.66728747|
|     82|    11| 0.9402393|
|      9|    11|  1.249193|
|     20|    11| 1.6235536|
|     13|    11|  1.743881|
|     50|    11| 1.8471425|
|     88|    11|  2.113815|
|     30|    11| 3.3900404|
+-------+------+----------+

