In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
!pip install numpy --quiet

[0m

In [3]:
# Session on the standalone cluster; each executor gets 512m of memory.
spark = (
    SparkSession.builder
    .appName("book-recs")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

23/11/11 18:33:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Typed columns for Users.csv. `_corrupt_record` is not a file column: in PERMISSIVE
# mode Spark stores the raw line there for rows that do not fit the types above.
# NOTE(review): no header=True is passed. If the file starts with a header row, that
# row fails the integer cast and is reported as a corrupt record. Confirm.
users_schema = StructType([
    StructField(name, data_type, True)
    for name, data_type in [
        ('user_id', IntegerType()),
        ('location', StringType()),
        ('age', FloatType()),
        ('_corrupt_record', StringType()),
    ]
])

# Caching matters: Spark rejects queries on raw CSV whose only referenced column is
# the corrupt-record column (e.g. counting rows after filtering on it).
users_df = (
    spark.read
    .schema(users_schema)
    .options(mode='PERMISSIVE', columnNameOfCorruptRecord='_corrupt_record', escape='"')
    .csv('data/Users.csv')
    .cache()
)

In [None]:
# Rows that failed to parse against users_schema.
corrupt_user_records = users_df.where(users_df['_corrupt_record'].isNotNull())
corrupt_user_records.show()

[Stage 0:>                                                          (0 + 1) / 1]

In [None]:
print(f'Number of corrupt records to drop: {corrupt_user_records.count()}')

# Only the raw read is cached, so calling unpersist() on the filtered frame would be
# a no-op. Instead, cache and materialize the clean frame first, then release the raw
# cache. The clean data must be built from the cache, because Spark rejects raw-CSV
# queries that reference only the corrupt-record column.
users_clean_df = (
    users_df
    .filter(users_df._corrupt_record.isNull())
    .drop('_corrupt_record')
    .cache()
)
users_clean_df.count()  # materialize before the raw cache goes away
users_df.unpersist()
users_df = users_clean_df

In [None]:
# Typed columns for Books.csv. In PERMISSIVE mode, rows that do not fit these types
# (e.g. a non-integer year) keep their raw text in `_corrupt_record`.
books_schema = StructType([
    StructField(name, data_type, True)
    for name, data_type in [
        ('isbn', StringType()),
        ('book_title', StringType()),
        ('book_author', StringType()),
        ('year_of_publication', IntegerType()),
        ('publisher', StringType()),
        ('image_url_s', StringType()),
        ('image_url_m', StringType()),
        ('image_url_l', StringType()),
        ('_corrupt_record', StringType()),
    ]
])

# Cached so the corrupt-record column can be queried on its own later.
books_df = (
    spark.read
    .schema(books_schema)
    .options(mode='PERMISSIVE', columnNameOfCorruptRecord='_corrupt_record', escape='"')
    .csv('data/Books.csv')
    .cache()
)

In [None]:
# Rows that failed to parse against books_schema.
corrupt_book_records = books_df.where(books_df['_corrupt_record'].isNotNull())
corrupt_book_records.show()

In [None]:
print(f'Number of corrupt records to drop: {corrupt_book_records.count()}')

# Only the raw read is cached, so calling unpersist() on the filtered frame would be
# a no-op. Instead, cache and materialize the clean frame first, then release the raw
# cache. The clean data must be built from the cache, because Spark rejects raw-CSV
# queries that reference only the corrupt-record column.
books_clean_df = (
    books_df
    .filter(books_df._corrupt_record.isNull())
    .drop('_corrupt_record')
    .cache()
)
books_clean_df.count()  # materialize before the raw cache goes away
books_df.unpersist()
books_df = books_clean_df

In [None]:
# Typed columns for Ratings.csv. In PERMISSIVE mode, unparseable rows keep their raw
# text in `_corrupt_record`.
ratings_schema = StructType([
    StructField(name, data_type, True)
    for name, data_type in [
        ('user_id', IntegerType()),
        ('isbn', StringType()),
        ('book_rating', IntegerType()),
        ('_corrupt_record', StringType()),
    ]
])

# Cached so the corrupt-record column can be queried on its own later.
ratings_df = (
    spark.read
    .schema(ratings_schema)
    .options(mode='PERMISSIVE', columnNameOfCorruptRecord='_corrupt_record')
    .csv('data/Ratings.csv')
    .cache()
)

In [None]:
# Rows that failed to parse against ratings_schema.
corrupt_rating_records = ratings_df.where(ratings_df['_corrupt_record'].isNotNull())
corrupt_rating_records.show()

In [None]:
print(f'Number of corrupt records to drop: {corrupt_rating_records.count()}')

# Only the raw read is cached, so calling unpersist() on the filtered frame would be
# a no-op. Instead, cache and materialize the clean frame first, then release the raw
# cache. The clean data must be built from the cache, because Spark rejects raw-CSV
# queries that reference only the corrupt-record column.
ratings_clean_df = (
    ratings_df
    .filter(ratings_df._corrupt_record.isNull())
    .drop('_corrupt_record')
    .cache()
)
ratings_clean_df.count()  # materialize before the raw cache goes away
ratings_df.unpersist()
ratings_df = ratings_clean_df

First, check whether any of the DataFrames contain duplicate rows.

In [None]:
# A frame contains duplicate rows exactly when distinct() makes it shorter.
for name, frame in [('users_df', users_df), ('books_df', books_df), ('ratings_df', ratings_df)]:
    print(f'Duplicates in {name}: {frame.distinct().count() != frame.count()}')

Next, count the missing (null) values in each column.

In [None]:
from pyspark.sql.functions import when, count, col

# Per-column null counts. when() yields null for present cells, and count() skips
# nulls, so only the missing cells are counted.
for frame in (users_df, books_df, ratings_df):
    null_count_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in frame.columns]
    frame.select(null_count_exprs).show()

In [None]:
# Books with no author recorded.
books_df.where(col('book_author').isNull()).show()

In [None]:
no_book_author_isbns = (
    books_df
    .where(col('book_author').isNull())
    .select('isbn')
    .collect()
)
# Number of ratings that reference one of those books.
ratings_df.where(col('isbn').isin([row.isbn for row in no_book_author_isbns])).count()

The book with a missing `book_author` was rated only once.

In [None]:
# Books with no publisher recorded.
books_df.where(col('publisher').isNull()).show()

In [None]:
no_publisher_isbns = (
    books_df
    .where(col('publisher').isNull())
    .select('isbn')
    .collect()
)
# Number of ratings that reference one of those books.
ratings_df.where(col('isbn').isin([row.isbn for row in no_publisher_isbns])).count()

Books with a missing `publisher` were rated only twice in total.

In [None]:
# A string fill value only touches string columns; year_of_publication is left as is.
books_df = books_df.fillna('Unknown')

In [None]:
# Cover-image links are not used by any recommender below.
image_url_columns = ['image_url_s', 'image_url_m', 'image_url_l']
books_df = books_df.drop(*image_url_columns)

In [None]:
from pyspark.sql.functions import countDistinct

# Compare how many ISBNs exist in the catalogue vs. how many were ever rated.
for label, frame in (('books_df', books_df), ('ratings_df', ratings_df)):
    print(f'Distinct isbn values in {label}:')
    frame.agg(countDistinct(col("isbn"))).show()

# Popularity-based recommender system

In [None]:
# Left joins keep every rating, even when its user or book is missing from the lookups.
df = (
    ratings_df
    .join(users_df, on='user_id', how='left')
    .join(books_df, on='isbn', how='left')
)

In [None]:
# Nulls per column after the joins reveal ratings without user or book metadata.
null_count_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
df.select(null_count_exprs).show()

In [None]:
class PopularityBasedRecSys:
    """Recommend the most-rated books; every user gets the same list.

    Popularity is the number of ratings a book received in the frame passed to
    ``fit``.
    """

    def __init__(self, n_recs=5):
        """
        Args:
            n_recs: number of books returned by ``predict``.
        """
        self.n_recs = n_recs
        self.recs = None  # ranked DataFrame, set by ``fit``

    def fit(self, df):
        """Rank books by number of ratings.

        Args:
            df: DataFrame with ``isbn``, ``book_title`` and ``book_author`` columns,
                one row per rating.
        """
        from pyspark.sql import functions as F

        # The DataFrame API avoids overwriting a session-wide temp view and does not
        # depend on a global ``spark`` session.
        self.recs = (
            df.groupBy('isbn', 'book_title', 'book_author')
            .agg(F.count('isbn').alias('popularity'))
            .select('popularity', 'isbn', 'book_title', 'book_author')
            # isbn breaks ties so predict()'s limit() returns a reproducible top-n.
            .orderBy(F.col('popularity').desc(), F.col('isbn').asc())
        )

    def predict(self):
        """Return the top ``n_recs`` books as a DataFrame.

        Raises:
            RuntimeError: if ``fit`` has not been called yet.
        """
        if self.recs is None:
            raise RuntimeError('fit() must be called before predict()')
        return self.recs.limit(self.n_recs)


# Ten most-rated books across all users.
pop_recsys = PopularityBasedRecSys(n_recs=10)
pop_recsys.fit(df=df)
book_recs = pop_recsys.predict()
book_recs.show()

In [None]:
# Look up one of the recommended ISBNs in the book catalogue.
books_df.where(col('isbn') == '0679781587').show()

ISBN 0679781587 is not present in `books_df`. Because the ratings were left-joined to `books_df`, its `book_title` and `book_author` are null in the resulting DataFrame of recommendations.

In [None]:
class HighestRatedPopularityBasedRecSys(PopularityBasedRecSys):
    """Recommend the books with the highest mean rating.

    Only books with at least ``min_num_ratings`` ratings are ranked, so a handful
    of ratings cannot push an obscure book to the top.
    """

    def __init__(self, min_num_ratings=100, **kwargs):
        """
        Args:
            min_num_ratings: minimum number of ratings a book needs to be ranked.
            **kwargs: forwarded to ``PopularityBasedRecSys`` (e.g. ``n_recs``).
        """
        super().__init__(**kwargs)
        self.min_num_ratings = min_num_ratings

    def fit(self, df):
        """Rank books by mean ``book_rating``, shown rounded to 2 decimals as ``popularity``.

        Args:
            df: DataFrame with ``isbn``, ``book_title``, ``book_author`` and
                ``book_rating`` columns, one row per rating.
        """
        from pyspark.sql import functions as F

        # NOTE(review): if a book_rating of 0 encodes an implicit interaction rather
        # than a score, it drags the mean down. Confirm against the dataset.
        self.recs = (
            df.groupBy('isbn', 'book_title', 'book_author')
            .agg(
                F.avg('book_rating').alias('avg_rating'),
                F.count('isbn').alias('num_ratings'),
            )
            # >= so a book with exactly min_num_ratings ratings meets the minimum.
            .filter(F.col('num_ratings') >= self.min_num_ratings)
            # Sort on the unrounded mean, then on rating count and isbn. Ties left
            # by rounding are then broken deterministically for predict()'s limit().
            .orderBy(F.col('avg_rating').desc(), F.col('num_ratings').desc(), F.col('isbn').asc())
            .select('isbn', F.round('avg_rating', 2).alias('popularity'), 'book_title', 'book_author')
        )
        
        
# Top 15 books by mean rating (min_num_ratings keeps its default of 100).
highest_rated_pop_recsys = HighestRatedPopularityBasedRecSys(n_recs=15)
highest_rated_pop_recsys.fit(df=df)
highest_rated_book_recs = highest_rated_pop_recsys.predict()
highest_rated_book_recs.show()

# Model-based collaborative filtering

In [None]:
# Check column types before building the ALS model: isbn is a string and still
# needs to be mapped to a numeric item id.
ratings_df.printSchema()

In [None]:
from pyspark.ml.feature import StringIndexer

# ALS below uses numeric item ids, so map each ISBN string to an index.
indexer = StringIndexer(inputCol='isbn', outputCol='isbn_indexed')
isbn_index_model = indexer.fit(ratings_df)
ratings_df = isbn_index_model.transform(ratings_df)

In [None]:
# Peek at the ratings together with their new isbn_indexed column.
ratings_df.show(n=5)

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

# Fixes the train/test split, the train/validation split inside tuning, and ALS init.
SEED = 42

train, test = ratings_df.randomSplit([0.8, 0.2], seed=SEED)

als = ALS(
    userCol='user_id',
    itemCol='isbn_indexed',
    ratingCol='book_rating',
    nonnegative=True,
    # Drop NaN predictions for users/items unseen during training so RMSE is defined.
    coldStartStrategy='drop',
    seed=SEED,
)

evaluator = RegressionEvaluator(labelCol='book_rating', predictionCol='prediction', metricName='rmse')

# Only rank is tuned for now. maxIter ([10, 15, 20]) and regParam ([0.01, 0.1, 0.5])
# were candidate grids as well. TODO: add them back if the cluster can afford it.
params = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 20])
    .build()
)

tvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=params,
    evaluator=evaluator,
    seed=SEED,
)

model = tvs.fit(train)
preds = model.transform(test)
rmse = evaluator.evaluate(preds)
rmse