In [1]:
from paths import Paths

In [1]:
from pyspark.sql import SparkSession
app_name = 'interactions'
# spark = SparkSession.builder.appName(app_name).getOrCreate()
spark = SparkSession.builder \
.appName(app_name) \
.master("local[*]") \
.config("spark.driver.memory", "12g") \
.config("spark.sql.debug.maxToStringFields", 1000) \
.getOrCreate()

23/10/23 19:13:11 WARN Utils: Your hostname, gr00stl-Legion-Y540-15IRH-PG0 resolves to a loopback address: 127.0.1.1; using 192.168.1.43 instead (on interface wlp0s20f3)
23/10/23 19:13:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/23 19:13:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



                                                                                

In [None]:
data_path = Paths.data

interactions_df = spark.read.csv(f"{data_path}goodreads_interactions.csv", header=True, inferSchema=True).select(
    'user_id',
    'book_id',
    'rating'
).limit(10000)

interactions_df.printSchema()

In [2]:
# filter interactions to only include ratings
ratings_df = interactions_df.filter(interactions_df.rating != 0)
ratings_df.show(5)
ratings_df.count()

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      0|    948|     5|
|      0|    947|     5|
|      0|    946|     5|
|      0|    945|     5|
|      0|    944|     5|
+-------+-------+------+
only showing top 5 rows



2805

In [3]:
# Splitting the data into training and testing
# 0.3/0.7
train_data, test_data = ratings_df.randomSplit([0.8, 0.2], seed=42)

# Alternating Least Squares
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol='user_id',
    itemCol='book_id',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy='drop',
    maxIter=5,
    rank=4,
    regParam=0.01,
)

model = als.fit(train_data)

# Evaluate the model by computing the RMSE on the test data
from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

rmse = evaluator.evaluate(predictions)

print('RMSE:\n', rmse)



RMSE:
 2.455986167634981


In [4]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 50, 100, 150]) \
    .addGrid(als.regParam, [.01, .05, .1, .15]) \
    .build()

# cross validation
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# best model
model = cv.fit(train_data)

# make predictions on test data
predictions = model.transform(test_data)

# calculate RMSE on test data
rmse = evaluator.evaluate(predictions)

print('RMSE:\n', rmse)


RMSE:
 1.4922181968402317


In [5]:
# Recommend book to user by user_id
# select random user_id
from pyspark.sql.functions import rand

user_id = ratings_df.select('user_id').distinct().orderBy(rand()).first().user_id
print('user_id:\n', user_id)

# filter out books that user has already rated
books_already_rated = ratings_df.filter(ratings_df.user_id == user_id).select('book_id', 'user_id')

# recommend books to user
recommendations = model.transform(books_already_rated).orderBy('prediction', ascending=False)

# get recommended book_ids
recommendations.show(5)


user_id:
 7
+-------+-------+----------+
|book_id|user_id|prediction|
+-------+-------+----------+
|   7247|      7|  4.931852|
|   7244|      7|  4.931852|
|   7220|      7|  4.931852|
|   7240|      7|  4.931852|
|   7231|      7|  4.931852|
+-------+-------+----------+
only showing top 5 rows



In [9]:
# get book titles using book_id in recommendations, search in books_sample.parquet

# 1.1 read books_sample.parquet
books_df = spark.read.parquet(f"{data_path}books.parquet").select('book_id', 'title')

# 1.2 join recommendations with books_df on column book_id, select columns book_id, title
recommendations_df = recommendations.join(books_df, on='book_id', how='inner').select('book_id', 'title', 'prediction')

# 1.3 show top 5 books -> sort by prediction
recommendations_df.orderBy('prediction', ascending=False).show(5)

+-------+--------------------+----------+
|book_id|               title|prediction|
+-------+--------------------+----------+
|   7240|The Prodigal Son ...|  4.931852|
|   7238|Prodigal Son (Dea...|  4.931852|
|   7244|The Poisonwood Bible|  4.931852|
|   7220|The Secret Life o...|  4.931852|
|   7247|Barbara Kingsolve...|  4.931852|
+-------+--------------------+----------+
only showing top 5 rows



In [7]:
# show books read by user (user_id)
ratings_df.filter(ratings_df['user_id'] == user_id).orderBy('rating', ascending=False).show(100)


+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      7|    536|     5|
|      7|   1387|     5|
|      7|   1386|     5|
|      7|    417|     5|
|      7|   7247|     5|
|      7|   7244|     5|
|      7|   7243|     5|
|      7|   1574|     5|
|      7|   7241|     5|
|      7|   7240|     5|
|      7|   7238|     5|
|      7|   5770|     5|
|      7|   7231|     5|
|      7|   7220|     5|
|      7|    197|     5|
|      7|   1525|     4|
|      7|   7242|     4|
|      7|   1605|     4|
|      7|   7232|     4|
|      7|   7211|     4|
|      7|   7250|     3|
|      7|    821|     3|
|      7|    461|     3|
|      7|   7236|     3|
|      7|   7219|     3|
|      7|   7213|     3|
|      7|   1543|     3|
|      7|   7181|     3|
|      7|   7176|     3|
|      7|   7208|     2|
|      7|   7233|     1|
|      7|   1604|     1|
|      7|   7198|     1|
+-------+-------+------+



In [19]:
# Surprise reader
from surprise import Reader, Dataset, SVD
from surprise.model_selection import cross_validate

reader = Reader(rating_scale=(1, 5))

# Load the dataset using the reader
data = Dataset.load_from_df(ratings_df.toPandas(), reader)

# Use the SVD algorithm.
svd = SVD()

# Run 5-fold cross-validation and then print results
cross_validate(svd, data, measures=['RMSE', 'MAE'], cv=5, verbose=True)

# train on whole dataset
trainset = data.build_full_trainset()
svd.fit(trainset)

# predict rating for user_id and book_id
random_book_id = ratings_df.select('book_id').distinct().orderBy(rand()).first().book_id
print('book_id:\n', random_book_id)

Evaluating RMSE, MAE of algorithm SVD on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.7979  0.7704  0.8045  0.8459  0.7986  0.8034  0.0243  
MAE (testset)     0.6416  0.6119  0.6304  0.6510  0.6389  0.6348  0.0132  
Fit time          0.03    0.02    0.02    0.02    0.02    0.02    0.00    
Test time         0.00    0.00    0.00    0.00    0.00    0.00    0.00    
book_id:
 2980


Prediction(uid=7, iid=2980, r_ui=None, est=3.595612404313358, details={'was_impossible': False})