In [1]:
import pyspark.sql.functions as F
from pyspark.sql.functions import exp
import pandas as pd
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.window import Window

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import random

# COLLABORATIVE FILTERING - ALS

In [32]:
interactions_df = spark.read.option("header", "true").csv('gs://msca-bdp-project-goodreads/goodreads_interactions.csv')

In [33]:
limit_df = interactions_df.filter(F.col('is_read')==1).select('user_id','book_id','rating')#.limit(50000)

In [34]:
del interactions_df

### Normalizing ratings

In [35]:
# Calculate the minimum and maximum ratings for each user
window_spec = Window.partitionBy("user_id")
min_rating = F.min("rating").over(window_spec)
max_rating = F.max("rating").over(window_spec)

# Normalize the ratings to the range [0, 1]
normalized_ratings = (F.col("rating") - min_rating) / (max_rating - min_rating)

# Add the normalized_ratings column to your DataFrame
limit_df = limit_df.withColumn("normalized_rating", normalized_ratings)

### TRAIN TEST SPLIT

In [41]:
user_id_list = limit_df.select(F.collect_set('user_id')).first()[0]

user_id_split_dict = {}
for user_set in user_id_list:
    for user in user_set:
        user_id_split_dict[user] = 0.7

                                                                                

In [42]:
temp_train = limit_df.sampleBy("user_id",fractions = user_id_split_dict)
temp_test = limit_df.subtract(temp_train)

In [8]:
temp_test.show(5)

[Stage 16:>                                                         (0 + 1) / 1]

+-------+-------+------+-----------------+
|user_id|book_id|rating|normalized_rating|
+-------+-------+------+-----------------+
| 101021|  31493|     4|              0.8|
| 107790|   1572|     5|              1.0|
| 109051|   6452|     5|              1.0|
| 109355|   5017|     5|              1.0|
| 109577|1807186|     5|              1.0|
+-------+-------+------+-----------------+
only showing top 5 rows



                                                                                

In [9]:
temp_train.count(),temp_test.count()

                                                                                

(1444, 112129748)

In [10]:
temp_test.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- normalized_rating: double (nullable = true)



### ALS MODEL TRAINING + CUSTOM IMPLEMENTED GRID SEARCH

In [11]:
def generate_random_params():
    #rank = random.choice([10,20,30,40])
    #maxIter = random.choice([2,4,6,8])
    #regParam = random.choice([0.05, 0.15, 0.2,0.25])
    rank = random.choice([25,30,35,40])
    maxIter = random.choice([5,6,7,8])
    regParam = random.choice([0.2,0.25,0.3,0.35])
    return {
        'rank': rank,
        'maxIter': maxIter,
        'regParam': regParam,
        'userCol': 'user_id',
        'itemCol': 'book_id',
        'ratingCol': 'normalized_rating',
        'coldStartStrategy':'drop'
    }

In [48]:
temp_train = temp_train.withColumn("rating",temp_train["rating"].cast('float'))
temp_train = temp_train.withColumn("user_id",temp_train["user_id"].cast('float'))
temp_train = temp_train.withColumn("book_id",temp_train["book_id"].cast('float'))
temp_train = temp_train.withColumn("normalized_rating",temp_train["normalized_rating"].cast('float'))


temp_test = temp_test.withColumn("rating",temp_test["rating"].cast('float'))
temp_test = temp_test.withColumn("user_id",temp_test["user_id"].cast('float'))
temp_test = temp_test.withColumn("book_id",temp_test["book_id"].cast('float'))
temp_test = temp_test.withColumn("normalized_rating",temp_test["normalized_rating"].cast('float'))

In [20]:
success=0
while(success!=10):
    try:
        params = generate_random_params()
        _als = ALS(**params)
        model = _als.fit(temp_train)
        preds = model.transform(temp_test)
        evaluator = RegressionEvaluator(metricName="rmse",labelCol='normalized_rating',predictionCol='prediction')
        rmse = evaluator.evaluate(preds)
        print(params,rmse)
        success+=1
    except:
        print(params)
        print('error occured')

                                                                                

{'rank': 40, 'maxIter': 5, 'regParam': 0.25, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.28901522142748937


                                                                                

{'rank': 35, 'maxIter': 8, 'regParam': 0.3, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.5622663725592922


                                                                                

{'rank': 40, 'maxIter': 6, 'regParam': 0.35, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.6318426536230914


                                                                                

{'rank': 25, 'maxIter': 6, 'regParam': 0.25, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.2918207229181383


                                                                                

{'rank': 25, 'maxIter': 7, 'regParam': 0.25, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.5880215050376434


                                                                                

{'rank': 35, 'maxIter': 8, 'regParam': 0.35, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.5946380720307918


                                                                                

{'rank': 35, 'maxIter': 7, 'regParam': 0.2, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.24787438310595356


                                                                                

{'rank': 35, 'maxIter': 7, 'regParam': 0.25, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.5166064749943842


                                                                                

{'rank': 30, 'maxIter': 7, 'regParam': 0.25, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.531828887329914




{'rank': 40, 'maxIter': 6, 'regParam': 0.3, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'} 0.5632321325931182


                                                                                

***List of books in training that is not read by users***

In [49]:
user_read_books = temp_train.groupBy("user_id").agg(F.collect_set("book_id").alias("books_read"))
user_read_books.show()



+-------+--------------------+
|user_id|          books_read|
+-------+--------------------+
|    9.0|[7446.0, 1590.0, ...|
|    5.0|[7038.0, 7053.0, ...|
|    7.0|[7241.0, 7250.0, ...|
|    2.0|[1170.0, 1213.0, ...|
|    3.0|[493.0, 1376.0, 1...|
|    6.0|[7111.0, 704.0, 7...|
|    1.0|[704.0, 974.0, 97...|
|    8.0|[7364.0, 7405.0, ...|
|    4.0|[1622.0, 1368.0, ...|
|    0.0|[819.0, 930.0, 85...|
+-------+--------------------+



                                                                                

In [50]:
# Step 1: Get the list of all book_ids as books_list
books_list = temp_train.select("book_id").distinct()

# Step 2: Get the set difference between all books and the books a user has read
user_unread = books_list.crossJoin(user_read_books.select("user_id")).subtract(temp_train.select("book_id", "user_id").distinct())

# Step 3: For each user, select random 100 unread books
window_spec = Window.partitionBy('user_id').orderBy(F.rand())

user_unread = user_unread.withColumn('row_number', F.row_number().over(window_spec)) \
    .filter(F.col('row_number') <= 100) \
    .drop('row_number')

In [51]:
user_unread.show()

                                                                                

+-------+-------+
|book_id|user_id|
+-------+-------+
| 7169.0|    9.0|
| 1077.0|    9.0|
| 7270.0|    9.0|
| 1576.0|    9.0|
|  916.0|    9.0|
|  357.0|    9.0|
| 1122.0|    9.0|
|  555.0|    9.0|
|  126.0|    9.0|
| 5069.0|    9.0|
|  706.0|    9.0|
| 7405.0|    9.0|
| 2171.0|    9.0|
| 7086.0|    9.0|
| 7375.0|    9.0|
| 5036.0|    9.0|
| 3046.0|    9.0|
|  983.0|    9.0|
| 2369.0|    9.0|
|  356.0|    9.0|
+-------+-------+
only showing top 20 rows



In [52]:
user_unread_books = user_unread.groupBy("user_id").agg(F.collect_set("book_id").alias("books_unread"))

In [17]:
user_unread_books.show(1, vertical=True, truncate=False)

                                                                                

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 user_id      | 9.0                                                                                                                                                                                                 

### USING THE ALS MODEL TO PREDICT

In [53]:
params = {'rank': 35, 'maxIter': 7, 'regParam': 0.2, 'userCol': 'user_id', 'itemCol': 'book_id', 'ratingCol': 'normalized_rating', 'coldStartStrategy': 'drop'}
als = ALS(**params)
model = als.fit(temp_train)
final_predictions = model.transform(user_unread)

                                                                                

In [54]:
final_predictions.show(5)

                                                                                

+-------+-------+-----------+
|book_id|user_id| prediction|
+-------+-------+-----------+
| 1004.0|    1.0|  0.4390945|
| 1519.0|    1.0|        0.0|
| 1544.0|    1.0| 0.14924791|
| 5212.0|    1.0|0.032352816|
|  773.0|    1.0|  0.2550499|
+-------+-------+-----------+
only showing top 5 rows



### FINAL RECOMMENDATION - ALS

In [20]:
window_spec = Window.partitionBy("user_id").orderBy(col("prediction").desc())
final_predictions = final_predictions.withColumn("rank", F.rank().over(window_spec))

In [21]:
recommendations_als = final_predictions.filter(F.col("rank") <= 10)

In [23]:
recommendations_als.show()

                                                                                

+-------+-------+----------+----+
|book_id|user_id|prediction|rank|
+-------+-------+----------+----+
|  407.0|    9.0|0.41038376|   1|
| 1502.0|    9.0|0.36008602|   2|
| 1524.0|    9.0|0.36008602|   2|
| 1190.0|    9.0|0.33609986|   4|
| 1216.0|    9.0|0.22406659|   5|
| 4482.0|    9.0| 0.2077171|   6|
| 2787.0|    9.0| 0.2077171|   6|
| 3615.0|    9.0| 0.2077171|   6|
| 6188.0|    9.0| 0.2077171|   6|
| 5037.0|    9.0| 0.2077171|   6|
| 6503.0|    9.0| 0.2077171|   6|
| 5931.0|    9.0| 0.2077171|   6|
| 6776.0|    9.0| 0.2077171|   6|
| 7468.0|    5.0|0.19432098|   1|
| 1034.0|    5.0|0.17359444|   2|
|  941.0|    5.0|0.16222112|   3|
| 1091.0|    5.0| 0.1615011|   4|
| 1086.0|    5.0| 0.1615011|   4|
| 1152.0|    5.0| 0.1615011|   4|
| 1218.0|    5.0| 0.1615011|   4|
+-------+-------+----------+----+
only showing top 20 rows



# POPULARITY BASED RECOMMENDATION

In [2]:
metadata_df = spark.read.json('gs://msca-bdp-project-goodreads/goodreads_books.json')

                                                                                

In [25]:
metadata_df.show(1,vertical=True)

23/11/24 20:55:28 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0------------------------------------
 asin                 |                      
 authors              | [{604031, }]         
 average_rating       | 4.00                 
 book_id              | 5333265              
 country_code         | US                   
 description          |                      
 edition_information  |                      
 format               | Paperback            
 image_url            | https://images.gr... 
 is_ebook             | false                
 isbn                 | 0312853122           
 isbn13               | 9780312853129        
 kindle_asin          |                      
 language_code        |                      
 link                 | https://www.goodr... 
 num_pages            | 256                  
 popular_shelves      | [{3, to-read}, {1... 
 publication_day      | 1                    
 publication_month    | 9                    
 publication_year     | 1984                 
 publisher            | St. Martin

In [26]:
metadata_df = metadata_df.withColumn('average_rating',F.col('average_rating').cast('float'))
metadata_df = metadata_df.withColumn('ratings_count',F.col('ratings_count').cast('float'))

In [27]:
metadata_df = metadata_df.withColumn('weighted_rating',F.col('average_rating')*F.col('ratings_count'))

In [28]:
metadata_df.select('title','ratings_count','average_rating','weighted_rating').orderBy(F.col('weighted_rating').desc()).show(10,truncate=False)



+-----------------------------------------------------------+-------------+--------------+---------------+
|title                                                      |ratings_count|average_rating|weighted_rating|
+-----------------------------------------------------------+-------------+--------------+---------------+
|The Hunger Games (The Hunger Games, #1)                    |4899965.0    |4.34          |2.1265848E7    |
|Harry Potter and the Sorcerer's Stone (Harry Potter, #1)   |4765497.0    |4.45          |2.120646E7     |
|Twilight (Twilight, #1)                                    |3941381.0    |3.57          |1.407073E7     |
|To Kill a Mockingbird                                      |3255518.0    |4.26          |1.3868507E7    |
|The Great Gatsby                                           |2758812.0    |3.89          |1.0731779E7    |
|The Fault in Our Stars                                     |2429317.0    |4.26          |1.0348891E7    |
|The Hobbit                          

                                                                                

## CONTENT BASED RECOMMENDATION 

In [5]:
metadata_df = metadata_df.withColumn("genre", (F.rand() * 10 + 1).cast("int"))

In [36]:
limit_df = limit_df.join(metadata_df.select('book_id','genre'),on='book_id')

In [37]:
limit_df.show(5)

[Stage 68:>                                                         (0 + 1) / 1]

+-------+-------+------+-----------------+-----+
|book_id|user_id|rating|normalized_rating|genre|
+-------+-------+------+-----------------+-----+
|    691| 100010|     2|              0.0|    5|
| 327548| 100537|     3|              0.6|    8|
|  45157| 100537|     2|              0.4|    9|
|  47984| 100537|     4|              0.8|    9|
|    691| 100576|     5|              1.0|    5|
+-------+-------+------+-----------------+-----+
only showing top 5 rows



                                                                                

In [56]:
user_genre_rating = limit_df.groupby('user_id','genre').agg(F.avg("normalized_rating").alias("genre_rating"))

In [57]:
user_genre_rating.show(5)

[Stage 340:>                                                        (0 + 1) / 1]

+-------+-----+------------------+
|user_id|genre|      genre_rating|
+-------+-----+------------------+
| 100010|    2|               1.0|
| 100010|    8|0.6666666666666666|
| 100010|    6|0.6666666666666666|
| 100010|    7|0.6666666666666666|
| 100010|    5|               0.0|
+-------+-----+------------------+
only showing top 5 rows



                                                                                

## FINAL RECOMMENDATIONS

In [62]:
final_recommendations = final_predictions.join(metadata_df.select('book_id','genre'),on='book_id').join(user_genre_rating,on=['user_id','genre'])

In [63]:
final_recommendations.show(5)



+-------+-----+-------+----------+------------------+
|user_id|genre|book_id|prediction|      genre_rating|
+-------+-----+-------+----------+------------------+
|    7.0|    4| 5971.0|-0.0502137|0.6666666666666666|
|    7.0|    4|  402.0|       0.0|0.6666666666666666|
|    7.0|    4| 1169.0|       0.0|0.6666666666666666|
|    7.0|    4| 1430.0| 0.5380945|0.6666666666666666|
|    7.0|    4|  936.0| 0.2988092|0.6666666666666666|
+-------+-----+-------+----------+------------------+
only showing top 5 rows



                                                                                

In [64]:
final_recommendations = final_recommendations.withColumn('overall_rating',0.7*F.col('prediction')+0.3*F.col('genre_rating'))

In [65]:
final_recommendations.show(5)

                                                                                

+-------+-----+-------+----------+------------------+-------------------+
|user_id|genre|book_id|prediction|      genre_rating|     overall_rating|
+-------+-----+-------+----------+------------------+-------------------+
|    7.0|    4| 1169.0|       0.0|0.6666666666666666|0.19999999999999998|
|    7.0|    4| 1633.0|0.40357092|0.6666666666666666| 0.4824996441602707|
|    7.0|    4|  820.0| 0.2988092|0.6666666666666666|0.40916644036769867|
|    7.0|    4|  936.0| 0.2988092|0.6666666666666666|0.40916644036769867|
|    7.0|    4|  909.0| 0.2988092|0.6666666666666666|0.40916644036769867|
+-------+-----+-------+----------+------------------+-------------------+
only showing top 5 rows



In [72]:
window_spec = Window.partitionBy("user_id").orderBy(F.col("overall_rating").desc())
final_recommendations = final_recommendations.withColumn("rank", F.rank().over(window_spec))

In [73]:
final_recommendations = final_recommendations.filter(F.col("rank") <= 10).select('user_id','book_id','overall_rating','rank')

In [74]:
final_recommendations.filter(F.col('user_id')==7.0).show()



+-------+-------+------------------+----+
|user_id|book_id|    overall_rating|rank|
+-------+-------+------------------+----+
|    7.0| 1471.0|0.7399285018444061|   1|
|    7.0| 1539.0|0.6766661643981933|   2|
|    7.0|  423.0|0.5614580661058426|   3|
|    7.0|  457.0|0.5614580661058426|   3|
|    7.0|  764.0|0.5614580661058426|   3|
|    7.0|  765.0|0.5614580661058426|   3|
|    7.0|  912.0|0.5614580661058426|   3|
|    7.0|  344.0|0.5614580661058426|   3|
|    7.0|  947.0|0.5476091557741165|   9|
|    7.0| 6548.0|0.5466070127487183|  10|
+-------+-------+------------------+----+



                                                                                

In [75]:
final_recommendations.write.format("bigquery").option("temporaryGcsBucket","msca-bdp-project-goodreads").option("table", "msca-bdp-student-ap.Goodreads_Project.Reviews_Summarized").mode("overwrite").save()

                                                                                ]