# Reading Data from a File

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommender Systems Demo').getOrCreate()

22/10/24 11:31:21 WARN Utils: Your hostname, m0 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/10/24 11:31:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/24 11:31:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/24 11:31:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/24 11:31:31 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/10/24 11:31:31 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [2]:
df = spark.read.csv('7_data/ratings.csv', inferSchema=True, header=True)

                                                                                

In [3]:
def shape(df):
    return (df.count(), len(df.columns))

In [4]:
shape(df)

                                                                                

(100000, 3)

In [5]:
df.show(10,False)

+------+------------+------+
|userId|title       |rating|
+------+------------+------+
|196   |Kolya (1996)|3     |
|63    |Kolya (1996)|3     |
|226   |Kolya (1996)|5     |
|154   |Kolya (1996)|3     |
|306   |Kolya (1996)|5     |
|296   |Kolya (1996)|4     |
|34    |Kolya (1996)|5     |
|271   |Kolya (1996)|4     |
|201   |Kolya (1996)|4     |
|209   |Kolya (1996)|4     |
+------+------------+------+
only showing top 10 rows



In [6]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [8]:
from pyspark.sql.functions import rand

df.orderBy(rand()).show(10)

[Stage 7:>                                                          (0 + 1) / 1]

+------+--------------------+------+
|userId|               title|rating|
+------+--------------------+------+
|   774|         Wolf (1994)|     2|
|   334|Antonia's Line (1...|     4|
|    64|Bridge on the Riv...|     4|
|   690|In the Line of Fi...|     4|
|   548| Jackie Brown (1997)|     4|
|   227|    Star Wars (1977)|     4|
|   682|       Sliver (1993)|     2|
|   385|  Raging Bull (1980)|     5|
|   321|  Hoop Dreams (1994)|     4|
|   417|Rebel Without a C...|     4|
+------+--------------------+------+
only showing top 10 rows



                                                                                

In [12]:
from pyspark.ml.feature import StringIndexer, IndexToString 
stringIndexer = StringIndexer(inputCol='title', outputCol='title_id')
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show(10)

                                                                                

+------+------------+------+--------+
|userId|       title|rating|title_id|
+------+------------+------+--------+
|   196|Kolya (1996)|     3|   287.0|
|    63|Kolya (1996)|     3|   287.0|
|   226|Kolya (1996)|     5|   287.0|
|   154|Kolya (1996)|     3|   287.0|
|   306|Kolya (1996)|     5|   287.0|
|   296|Kolya (1996)|     4|   287.0|
|    34|Kolya (1996)|     5|   287.0|
|   271|Kolya (1996)|     4|   287.0|
|   201|Kolya (1996)|     4|   287.0|
|   209|Kolya (1996)|     4|   287.0|
+------+------------+------+--------+
only showing top 10 rows



In [13]:
train, test = indexed.randomSplit([.8, .2], seed=10)

In [14]:
from pyspark.ml.recommendation import ALS

rec = ALS(maxIter=10,
          regParam=0.01,
          userCol='userId',
          itemCol='title_id',
          ratingCol='rating',
          nonnegative=True, # Ensure that no negative ratings are created.
          coldStartStrategy='drop') # Prevent any NaN ratings predictions.

In [16]:
rec_model = rec.fit(train)
predicted_ratings = rec_model.transform(test)
predicted_ratings.printSchema()

                                                                                

22/10/24 11:37:42 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/10/24 11:37:42 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_id: double (nullable = false)
 |-- prediction: float (nullable = false)



In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse',
                                predictionCol='prediction',
                                labelCol='rating')
rmse = evaluator.evaluate(predicted_ratings)
rmse

                                                                                

1.0102331219856

In [18]:
unique_items = indexed.select('title_id').distinct()
unique_items.count()

                                                                                

1664

In [19]:
a = unique_items.alias('a')

In [20]:
user_id = 85

visited_items = indexed.filter(indexed['userId'] == user_id).select('title_id').distinct()
visited_items.count()

                                                                                

287

In [27]:
b = visited_items.alias('b')

In [28]:
total_items = a.join(b, a.title_id== b.title_id, how='left')
total_items.show(10)

                                                                                

+--------+--------+
|title_id|title_id|
+--------+--------+
|   305.0|   305.0|
|   596.0|    null|
|   299.0|    null|
|   769.0|    null|
|   692.0|    null|
|   934.0|    null|
|  1051.0|    null|
|   496.0|    null|
|   558.0|   558.0|
|   170.0|    null|
+--------+--------+
only showing top 10 rows



In [29]:
from pyspark.sql.functions import col, lit

remaining_items = total_items.where(col('b.title_id').isNull()).select(a.title_id).distinct()
remaining_items.count()

                                                                                

1377

In [31]:
remaining_items = remaining_items.withColumn('userId', lit(int(user_id)))
remaining_items.show(10)

                                                                                

+--------+------+
|title_id|userId|
+--------+------+
|   596.0|    85|
|   299.0|    85|
|   769.0|    85|
|   692.0|    85|
|   934.0|    85|
|  1051.0|    85|
|   496.0|    85|
|   170.0|    85|
|   184.0|    85|
|   576.0|    85|
+--------+------+
only showing top 10 rows



In [32]:
recommendations = rec_model.transform(remaining_items).orderBy('prediction', ascending=False)
recommendations.show(5)

                                                                                

+--------+------+----------+
|title_id|userId|prediction|
+--------+------+----------+
|  1207.0|    85| 5.5590105|
|  1120.0|    85|  5.400318|
|  1411.0|    85| 5.1537695|
|  1277.0|    85| 5.0424075|
|  1288.0|    85| 5.0154605|
+--------+------+----------+
only showing top 5 rows



In [33]:
items_title = IndexToString(inputCol='title_id',
                            outputCol='title',
                            labels=model.labels)
final_recommendations = items_title.transform(recommendations)
final_recommendations.show(10, False)

                                                                                

+--------+------+----------+----------------------------------------------------+
|title_id|userId|prediction|title                                               |
+--------+------+----------+----------------------------------------------------+
|1207.0  |85    |5.5590105 |Aparajito (1956)                                    |
|1120.0  |85    |5.400318  |Crooklyn (1994)                                     |
|1411.0  |85    |5.1537695 |Boys, Les (1997)                                    |
|1277.0  |85    |5.0424075 |Mina Tannenbaum (1994)                              |
|1288.0  |85    |5.0154605 |Whole Wide World, The (1996)                        |
|1325.0  |85    |4.9820385 |Prisoner of the Mountains (Kavkazsky Plennik) (1996)|
|942.0   |85    |4.962156  |Set It Off (1996)                                   |
|982.0   |85    |4.848961  |Switchback (1997)                                   |
|1347.0  |85    |4.8392844 |Angel Baby (1995)                                   |
|860.0   |85    