# System rekomendacji

## Wczytanie bibliotek

In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

## 1. Przygotować zbiór pod system rekomendacji (ALS - item/user/rating)

In [3]:
# Inicjalizacja sesji Spark

spark = SparkSession.builder.appName("BookRecommendation").getOrCreate()

# Wczytanie danych z tabel

books_with_header = spark.read.format("csv").option("header", "true").load("/Users/zuzapiekarczyk/Downloads/data/books_with_header.csv")
ratings_with_header = spark.read.format("csv").option("header", "true").load("/Users/zuzapiekarczyk/Downloads/data/ratings_with_header.csv")

23/12/03 13:47:16 WARN Utils: Your hostname, Zuzas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.20.10.10 instead (on interface en0)
23/12/03 13:47:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/03 13:47:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Konwersja typów kolumn do odpowiednich formatów

books_with_header = books_with_header.withColumn("book_id", books_with_header["book_id"].cast("int"))
ratings_with_header = ratings_with_header.withColumn("book_id", ratings_with_header["book_id"].cast("int"))
ratings_with_header = ratings_with_header.withColumn("user_id", ratings_with_header["user_id"].cast("int"))
ratings_with_header = ratings_with_header.withColumn("rating", ratings_with_header["rating"].cast("int"))


In [4]:
# Utworzenie macierzy rzadkiej

rating_matrix = ratings_with_header.select("user_id", "book_id", "rating")
rating_matrix

DataFrame[user_id: int, book_id: int, rating: int]

In [5]:
# Konwersja do formatu MLlib

assembler = VectorAssembler(inputCols=["user_id", "book_id"], outputCol="features")
rating_matrix = assembler.transform(rating_matrix)


## 2. Stworzyć model zbiorowej filtracji (dla zadanych wartości hiperparametrów) i obliczyć błąd prognoz.

In [6]:
# Podział danych na zbiór treningowy i testowy w proporcji 80/20

(training, test) = rating_matrix.randomSplit([0.8, 0.2])

In [42]:
# Utworzenie modelu ALS

als = ALS(maxIter=5, regParam=0.1, rank=10, userCol="user_id", itemCol="book_id", ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training)

                                                                                

23/11/28 22:19:12 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1008502 ms exceeds timeout 120000 ms
23/11/28 22:19:12 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/28 22:27:38 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at 

In [40]:
# Przewidywanie ocen dla zbioru testowego

predictions = model.transform(test)

In [37]:
predictions.head(10)

                                                                                

[Row(user_id=26, book_id=13, rating=4, features=DenseVector([26.0, 13.0]), prediction=4.117001533508301),
 Row(user_id=26, book_id=179, rating=4, features=DenseVector([26.0, 179.0]), prediction=4.784938335418701),
 Row(user_id=26, book_id=494, rating=4, features=DenseVector([26.0, 494.0]), prediction=4.356354236602783),
 Row(user_id=26, book_id=591, rating=5, features=DenseVector([26.0, 591.0]), prediction=4.5152177810668945),
 Row(user_id=26, book_id=688, rating=4, features=DenseVector([26.0, 688.0]), prediction=3.5828335285186768),
 Row(user_id=26, book_id=795, rating=3, features=DenseVector([26.0, 795.0]), prediction=3.730262517929077),
 Row(user_id=26, book_id=941, rating=4, features=DenseVector([26.0, 941.0]), prediction=3.913710832595825),
 Row(user_id=26, book_id=2029, rating=5, features=DenseVector([26.0, 2029.0]), prediction=4.600663185119629),
 Row(user_id=26, book_id=2229, rating=2, features=DenseVector([26.0, 2229.0]), prediction=3.3940694332122803),
 Row(user_id=26, book_i

In [41]:
# Ewaluacja modelu za pomocą RMSE

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) = {rmse}")



Root Mean Squared Error (RMSE) = 0.8447153388537996


                                                                                

In [11]:
# User Recommendation
userRecs = model.recommendForAllUsers(10)

# Book Recommendation
bookRecs = model.recommendForAllItems(10)

In [12]:
userRecs.show(truncate=False)




+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                             |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26     |[{8548, 5.9490333}, {7947, 5.5772033}, {9934, 5.559303}, {8877, 5.4523516}, {9537, 5.448577}, {8182, 5.418609}, {8732, 5.3043675}, {7942, 5.300556}, {9219, 5.257366}, {7540, 5.2399592}]   |
|27     |[{7947, 5.6720543}, {8217, 5.420964}, {8548, 5.354665}, {7457, 5.271227}, {9934, 5.1303253}, {8182, 5.042175}, {7548, 4.9938865}, {9024, 4.90975}, {9049, 4.9077272}, {9531, 4.8725295}]    |
|28  

                                                                                

In [13]:
bookRecs.show(truncate=False)



+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|book_id|recommendations                                                                                                                                                                                      |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26     |[{46878, 7.161837}, {33901, 6.873577}, {28723, 6.7447267}, {36303, 6.6279874}, {34937, 6.620085}, {46930, 6.555542}, {25707, 6.5366373}, {24746, 6.3959656}, {37320, 6.3178897}, {27149, 6.3118634}] |
|27     |[{47971, 6.526763}, {39805, 6.4469504}, {34498, 6.4283485}, {14037, 6.426958}, {42583, 6.3730054}, {46698, 6.283315}, {25707, 6.2439566}, {44282, 6.2052207}, {

                                                                                

## 3. Tuning modelu - wykorzystać kroswalidację (CrossValidator i ParamGridBuilder w pySparku) - spróbować uzyskać lepszy model niż w pkt 2.

In [26]:
# Definicja siatki parametrów

param_grid = ParamGridBuilder() \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.1, 0.2, 0.5]) \
    .addGrid(als.rank, [10, 20, 25]) \
    .build()

In [27]:
# Utworzenie kroswalidatora

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)  # liczba podziałów kroswalidacji


In [28]:
# Dopasowanie modelu za pomocą kroswalidacji

cv_model = crossval.fit(training)

23/11/28 13:23:32 WARN MemoryStore: Not enough space to cache rdd_30347_5 in memory! (computed 28.0 MiB so far)
23/11/28 13:23:32 WARN BlockManager: Persisting block rdd_30347_5 to disk instead.
23/11/28 13:23:32 WARN MemoryStore: Not enough space to cache rdd_30347_1 in memory! (computed 27.6 MiB so far)
23/11/28 13:23:32 WARN BlockManager: Persisting block rdd_30347_1 to disk instead.
23/11/28 13:23:32 WARN MemoryStore: Not enough space to cache rdd_30347_2 in memory! (computed 27.7 MiB so far)
23/11/28 13:23:32 WARN BlockManager: Persisting block rdd_30347_2 to disk instead.
23/11/28 13:23:32 WARN MemoryStore: Not enough space to cache rdd_30347_6 in memory! (computed 28.1 MiB so far)
23/11/28 13:23:32 WARN BlockManager: Persisting block rdd_30347_6 to disk instead.
23/11/28 13:28:21 WARN MemoryStore: Not enough space to cache rdd_34672_3 in memory! (computed 27.8 MiB so far)
23/11/28 13:28:21 WARN BlockManager: Persisting block rdd_34672_3 to disk instead.
23/11/28 13:28:21 WARN Me

In [29]:
# Predykcja dla zbioru testowego

predictions = cv_model.transform(test)

In [30]:
# Obliczenie RMSE

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")



Root Mean Squared Error (RMSE) = 0.8137209848577016


                                                                                

In [31]:
# Najlepsze parametry modelu

best_model = cv_model.bestModel
print("Best model parameters:")
print(f"  Max Iterations: {best_model._java_obj.parent().getMaxIter()}")
print(f"  Regularization Parameter: {best_model._java_obj.parent().getRegParam()}")
print(f"  Rank: {best_model._java_obj.parent().getRank()}")

Best model parameters:
  Max Iterations: 10
  Regularization Parameter: 0.1
  Rank: 20


### **Interpretacja wyników:**

##### RMSE to miara błędu prognoz - im niższa wartość RMSE, tym lepiej model dopasowuje się do danych.

##### Pierwszy model miał wyższe RMSE (0.8447), co oznacza, że miał gorsze dopasowanie do danych testowych/walidacyjnych w porównaniu do drugiego modelu (0.8137).

##### Niestety oba modele posiadają zbyt wysoką wartość RMSE (są obarczone zbyt duzym błedem predykcji) i nie rekomendujemy ich wykorzystania.

In [32]:
spark.stop()