**2020/21**

# Recommender Systems
This lecture is primarily about recommender systems. We also highlight the usefulness of Spark SQL, particularly in relation to persistent tables.

# Spark SQL



As mentioned in the initial lectures, Spark SQL is a Spark module for structured data processing. It works alongside the DataFrame and Dataset APIs and is responsible for performing extra optimizations. 
We can also use it to execute SQL queries and to read data from various file formats and Hive tables. (Apache Hive can manage large datasets residing in distributed storage using SQL.) 

Further details can be found in https://spark.apache.org/docs/latest/sql-programming-guide.html and 
https://spark.apache.org/docs/latest/api/sql/index.html

The Spark documentation also includes a reference guide for Structured Query Language (SQL), which covers syntax, semantics, keywords, and examples for common SQL usage.

# Exercise

The aim of this exercise is to build a **book recommender system**, with a focus on the recommendation model itself.

The functional requirements for the Spark program we want to create are as follows:


1. To load the dataset of interest and perform exploratory analysis, then store the information, including as SQL tables.
2. To create a recommendation model based on Spark's ALS algorithm.
3. To pre-compute recommendations and store them as SQL tables.
4. To show recommendations of interest.

## Context

The data we are processing comes from the **Book-Crossing** dataset. As stated on the website from which it can be downloaded, http://www2.informatik.uni-freiburg.de/~cziegler/BX/, the BookCrossing (BX) dataset was collected by Cai-Nicolas Ziegler in a 4-week crawl (August / September 2004) from the Book-Crossing community with kind permission from Ron Hornbaker, CTO of Humankind Systems. It contains 278,858 users (anonymized but with demographic information) providing 1,149,780 ratings (explicit / implicit) about 271,379 books.

The `Book-Crossing` dataset comprises 3 tables, as follows:

- **BX-Users**.
Contains the users. Note that user IDs (`User-ID`) have been anonymized and map to integers. Demographic data is provided (`Location`, `Age`) if available. Otherwise, these fields contain NULL-values.

- **BX-Books**.
Books are identified by their respective ISBN. Invalid ISBNs have already been removed from the dataset. Moreover, some content-based information is given (`Book-Title`, `Book-Author`, `Year-Of-Publication`, `Publisher`), obtained from Amazon Web Services. Note that in case of several authors, only the first is provided. URLs linking to cover images are also given, appearing in three different flavours (`Image-URL-S`, `Image-URL-M`, `Image-URL-L`), i.e., small, medium, large. These URLs point to the Amazon web site.

- **BX-Book-Ratings**.
Contains the book rating information. Ratings (`Book-Rating`) are either explicit, expressed on a scale from 1-10 (higher values denoting higher appreciation), or implicit, expressed by 0.


The columns are separated by `;` and each file contains the corresponding header row.
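
To make the file layout concrete, here is a minimal sketch that parses a `;`-separated sample with Python's standard `csv` module. The rows below are invented for illustration (they are not taken from the dataset), and the double-quoting of fields is an assumption about the files.

```python
import csv
import io

# Invented sample mimicking the BX-Book-Ratings layout (not real data)
sample = '''"User-ID";"ISBN";"Book-Rating"
"1";"0000000001";"0"
"2";"0000000002";"7"
'''

# The first row is the header; fields are separated by ';'
reader = csv.DictReader(io.StringIO(sample), delimiter=";", quotechar='"')
rows = list(reader)

# A rating of 0 is implicit; ratings from 1 to 10 are explicit
explicit = [r for r in rows if int(r["Book-Rating"]) > 0]

print(reader.fieldnames)
print(len(rows), len(explicit))
```

The same two settings (separator and header) are what we pass to Spark's CSV reader later in this notebook.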

## Initial setup

In [None]:
# basic imports 

import os # OS e.g directory structure
import sys
import numpy as np # linear algebra
import scipy as sc  # scientific computing
import pandas as pd # data processing, file I/O
import seaborn as sns  # visualization
import matplotlib.pyplot as plt # visualization
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Spark related imports

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

## Data ingestion
In this step we load the dataset of interest, perform exploratory analysis, and then store the information, including as SQL tables.

### Loading the dataset

In [None]:
! pwd 
! ls -la
! head -n 3 BX-Users.csv
! tail -n 3 BX-Users.csv
! head -n 3 BX-Books.csv
! tail -n 3 BX-Books.csv
! head -n 3 BX-Book-Ratings.csv
! tail -n 3 BX-Book-Ratings.csv

In [None]:
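# read the dataset
# this time we will be using pyspark.sql.DataFrameReader.csv

# assumption: no session may exist yet; getOrCreate() reuses the active one if it does
# (the application name is arbitrary)
spark = SparkSession.builder.appName("BookRecommender").getOrCreate()

df_users = spark.read.csv("BX-Users.csv", header="true", inferSchema="true", sep=";")
df_books = spark.read.csv("BX-Books.csv", header="true", inferSchema="true", sep=";")
df_ratings = spark.read.csv("BX-Book-Ratings.csv", header="true", inferSchema="true", sep=";")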

### Exploratory data analysis

Let us get some insight into the data.

In [None]:
# check the users, both schema and some content

df_users.printSchema()
df_users.count()

In [None]:
df_users.show(100, truncate=False)

In [None]:
# check the books, both schema and some content

df_books.printSchema()
df_books.count()

In [None]:
df_books.show(5, truncate=False)

In [None]:
# check the ratings, both schema and some content

df_ratings.printSchema()
df_ratings.count()

In [None]:
df_ratings.show(5, truncate=False)

### Data cleansing/preparation
Based on the initial reading of data ...

In [None]:
# check Age column



The Age column contains many NULL values (stored as strings), in a little less than half of the records. So, we may

- replace the NULLs with the average of others
- drop the column Age in case we can live without it
- delete the records with NULL in the column Age

It is an open discussion ... 
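
The three options above can be sketched in plain Python on a tiny, hypothetical sample (the user IDs and ages are made up, and `None` stands in for a missing age). In PySpark the rough counterparts would be `fillna`, `drop` and `dropna` on the DataFrame, after converting the `NULL` strings to real nulls.

```python
from statistics import mean

# Hypothetical mini-sample of (user_id, age); None stands for a missing age
users = [(1, None), (2, 18), (3, None), (4, 30)]

known_ages = [age for _, age in users if age is not None]
avg_age = mean(known_ages)

# Option 1: replace missing ages with the average of the known ones
imputed = [(uid, age if age is not None else avg_age) for uid, age in users]

# Option 2: drop the Age column altogether
without_age = [uid for uid, _ in users]

# Option 3: delete the records whose age is missing
complete_only = [(uid, age) for uid, age in users if age is not None]

print(imputed)
print(without_age)
print(complete_only)
```

Note the trade-off: option 1 keeps every user but invents values, while option 3 keeps only genuine values but discards a large share of the users.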

In [None]:
df_users = 

In [None]:
# checking nulls

[df_users.dropna().count(), df_books.dropna().count(), df_ratings.dropna().count()]

In [None]:
[df_users.count(), df_books.count(), df_ratings.count()]

In [None]:
# summary of basic statistics about numerical columns with describe()

df_books.describe().show()
df_ratings.describe().show()

In [None]:
# 2004 was when data was collected

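df_books.select('Book-Title', 'Book-Author', 'Year-Of-Publication').where(col('Year-Of-Publication') > 2004).show()
df_books.select('Year-Of-Publication').where(col('Year-Of-Publication') > 2004).count()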

In [None]:
# check prior to 1900

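df_books.select('Book-Title', 'Book-Author', 'Year-Of-Publication').where(col('Year-Of-Publication') < 1900).show()
df_books.select('Year-Of-Publication').where(col('Year-Of-Publication') < 1900).count()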

In [None]:
# check with different years of publication ... 0, 1378, 2031, etc.

df_books.select('Book-Title', 'Book-Author', 'Year-Of-Publication').where(col('Year-Of-Publication')==0).show()


Again, we could take data cleansing further; we leave it as an exercise!

In [None]:
# we will drop some columns anyway

df_books = df_books.drop

But there is a problem with ISBN: it is a string, and we need a numeric item identifier to build the model!

We are going to use StringIndexer for that purpose.
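
To illustrate what the indexing step does, here is a plain-Python sketch of the idea: each distinct string is mapped to a numeric index, with the most frequent value receiving index 0. This mirrors the default frequency-descending ordering described in the `StringIndexer` documentation; the alphabetical tie-breaking and the helper name `fit_string_index` are assumptions made for this sketch, and the identifiers are made up.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

isbns = ["B", "A", "B", "C", "A", "B"]  # made-up identifiers
mapping = fit_string_index(isbns)
indexed = [mapping[v] for v in isbns]

print(mapping)
print(indexed)
```

Because the mapping is learned from the data, the same fitted indexer must be reused whenever new ratings are indexed; otherwise the indices would not refer to the same books.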

In [None]:
indexer = StringIndexer(inputCol="ISBN", outputCol="ISBN-Index")
df_ratings_indexed = indexer.fit(df_ratings).transform(df_ratings)
df_ratings_indexed.select('ISBN','ISBN-Index').show(truncate=False)

Once the data is clean, we save it to files so that we can build models from it whenever we want.

In [None]:
output_users = "users.parquet"

df_users.write.mode("overwrite").parquet(output_users)

In [None]:
output_books = "books.parquet"

df_books.write.mode("overwrite").parquet(output_books)

In [None]:
output_ratings = "ratings.parquet"

df_ratings_indexed.write.mode("overwrite").parquet(output_ratings)

We also save them as persistent tables in the Hive metastore.

**Notice** 

- An existing Hive deployment is not necessary to use this feature. Spark will take care of it.
- We can create a SQL table from a DataFrame with the *createOrReplaceTempView* method, which is valid for the session only. (There is also the option of global temporary views, which are shared among all sessions until the Spark application terminates.)
- With *saveAsTable*, however, a pointer to the data is kept in the Hive metastore. Persistent tables therefore still exist after the Spark program has restarted, as long as the connection to the same metastore is maintained. 

See details in http://spark.apache.org/docs/latest/sql-data-sources.html

In [None]:
df_users.write.mode("overwrite").saveAsTable("UsersTable")
df_books.write.mode("overwrite").saveAsTable("BooksTable")
df_ratings_indexed.write.mode("overwrite").saveAsTable("RatingsTable")

In [None]:
# get rid of dataframes no longer needed

del df_ratings

## Recommendation model

To create the recommendation model, we will use the Alternating Least Squares (ALS) algorithm provided by Spark MLlib. See the details in 
http://spark.apache.org/docs/latest/ml-collaborative-filtering.html; we advise checking the main assumptions that the implementation relies upon. For example, notice that:

- it implements a collaborative filtering strategy;
- it aims to fill in the missing entries of a user-item association matrix. Users and items are described by a small set of latent factors, learned by the ALS algorithm, which are then used to predict the missing entries.
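
The alternating idea can be sketched in a few lines of plain Python. The toy version below uses a single latent factor per user and per item (rank 1) and solves each small ridge regression in closed form; it is only an illustration under these assumptions, not Spark's implementation, which differs in several ways (higher rank, distributed computation, different regularization scaling). The function name `als_rank1` and the ratings are made up.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, iters=30):
    """Rank-1 ALS on observed (user, item, rating) triples."""
    x = [1.0] * n_users  # one latent factor per user
    y = [1.0] * n_items  # one latent factor per item
    for _ in range(iters):
        # Fix the item factors and solve for each user in closed form
        for u in range(n_users):
            obs = [(i, r) for (uu, i, r) in ratings if uu == u]
            num = sum(r * y[i] for i, r in obs)
            den = reg + sum(y[i] ** 2 for i, _ in obs)
            x[u] = num / den
        # Fix the user factors and solve for each item in closed form
        for i in range(n_items):
            obs = [(u, r) for (u, ii, r) in ratings if ii == i]
            num = sum(r * x[u] for u, r in obs)
            den = reg + sum(x[u] ** 2 for u, _ in obs)
            y[i] = num / den
    return x, y

# Made-up ratings r = a_u * b_i with a = b = [1, 2, 3]; entry (2, 2) is held out
ratings = [(u, i, float((u + 1) * (i + 1)))
           for u in range(3) for i in range(3) if (u, i) != (2, 2)]

x, y = als_rank1(ratings, n_users=3, n_items=3)
prediction = x[2] * y[2]  # estimate of the missing entry
print(round(prediction, 2))
```

Since the observed entries follow an exact rank-1 pattern, the estimate for the held-out entry should land close to 9, the value the pattern implies.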

### Training/testing data split

In [None]:
dftrain, dftest = df_ratings_indexed.randomSplit([0.8, 0.2], 42)

# caching data ... but just the train
dftrain.cache()

# print the number of rows in each part
[dftrain.count(), dftest.count(), df_ratings_indexed.count()]

In [None]:
# recalling the schema of training data

dftrain.printSchema()

### Model setting

In [None]:
# build the recommendation model using ALS on the training data
# note that we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

als = ALS(maxIter=5, 
          regParam=0.01, 
          userCol="User-ID", 
          itemCol="ISBN-Index", 
          ratingCol="Book-Rating",
          coldStartStrategy="drop")

# if the rating matrix is derived from another source of information 
# (i.e. it is inferred from other signals), we may set implicitPrefs 
# to True to get better results (see ALS reference)

# fit the model

model = als.fit(dftrain)

In [None]:
# evaluate the model by computing the RMSE on the test data

predictions = model.transform(dftest)
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="Book-Rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))
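
As a sanity check on what the evaluator reports, the sketch below computes the RMSE by hand on a few made-up (actual, predicted) pairs. It also shows why the cold start strategy matters: a single NaN prediction, as produced for a user or book unseen during training, turns the whole metric into NaN unless such rows are dropped first. The helper name `rmse` is ours.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

# Made-up pairs; NaN marks a user or book that was unseen in training
pairs = [(8.0, 7.0), (5.0, 7.0), (10.0, float("nan"))]

# Similar in spirit to coldStartStrategy="drop": discard NaN predictions
kept = [(a, p) for a, p in pairs if not math.isnan(p)]

print(rmse(kept))               # square root of (1 + 4) / 2
print(math.isnan(rmse(pairs)))  # the NaN propagates when nothing is dropped
```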

In [None]:
# save the ALS model for further use if required

modelpath = "ALSmodel"
model.write().overwrite().save(modelpath)


In [None]:
# just checking the files

! ls -la ./ALSmodel

### Pre-computing recommendations and storing them as persistent tables

In [None]:
# get all users and books

users = df_ratings_indexed.select("User-ID").distinct()
books = df_ratings_indexed.select("ISBN", "ISBN-Index").distinct()

In [None]:
users.show()
books.show()
[users.count(), books.count()]

In [None]:
# generate top book recommendations for each user

n_topbooks = 5
user_recs = model.recommendForAllUsers(n_topbooks)

# generate top user recommendations for each book

#n_topusers = 5
# book_recs = model.recommendForAllItems(n_topusers)

In [None]:
user_recs.show()
# book_recs.show()
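
Conceptually, a top-n recommendation for a user amounts to scoring every item by the dot product of the user's and the item's latent factors and keeping the n highest scores. The following plain-Python sketch shows this with made-up rank-2 factors; the helper name `top_n_items` is hypothetical and this is not how Spark computes the result internally at scale.

```python
import heapq

def top_n_items(user_vec, item_factors, n):
    """Return the n (item, score) pairs with the highest dot-product score."""
    scores = {
        item: sum(u * v for u, v in zip(user_vec, vec))
        for item, vec in item_factors.items()
    }
    return heapq.nlargest(n, scores.items(), key=lambda kv: kv[1])

# Made-up rank-2 latent factors for one user and three items
user_vec = [1.0, 0.5]
item_factors = {0: [2.0, 0.0], 1: [1.0, 4.0], 2: [0.5, 1.0]}

top2 = top_n_items(user_vec, item_factors, 2)
print(top2)
```

Pre-computing these lists for all users is what makes it possible to serve recommendations later with a simple table lookup.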

Save the recommendations as persistent tables in the Hive metastore

In [None]:
user_recs.write.mode("overwrite").saveAsTable("UserRecommendationsTable")
# book_recs.write.mode("overwrite").saveAsTable("BookRecommendationsTable")

## Results

1. Given a user, show the list of recommended books.
2. Given a book, show the list of users who might be interested in it.

In [None]:
user = 1238
# book = 

First, let us check the SQL tables

In [None]:
# register information about users and books as SQL temporary views

df_users.createOrReplaceTempView("users")
df_books.createOrReplaceTempView("books")

In [None]:
print(spark.catalog.listDatabases())

In [None]:
print(spark.catalog.listTables())

In [None]:
# use managed tables

spark.sql("USE default")


In [None]:
spark.catalog.listColumns("userrecommendationstable")

In [None]:
spark.sql("SELECT * FROM userrecommendationstable").show(10, truncate=False)

In [None]:
# it is not managed so ...

# spark.catalog.listColumns('users')

In [None]:
spark.sql("SELECT * FROM users").show(10, truncate=False)

In [None]:
spark.sql("SELECT * FROM books").show(10, truncate=False)

In [None]:
print("The recommended books for user ... ")

# Additional exercise

Given the current state of this notebook, rework its content so that the major tasks are split into separate notebooks or Python modules. The purpose is to modularize the code with the setup of a *real* recommender system in mind. That is:

- A downloader module, focusing on downloading the data, cleansing it, and then storing it in a data store. 
- A recommender module, to create a recommendation model and to pre-compute recommendations in order to save them in a data store.
- A recommender server, to retrieve recommendations upon queries made to the data store. 
  


# References

* Learning Spark - Lightning-Fast Data Analytics, 2nd Ed. J. Damji, B. Wenig, T. Das, and D. Lee. O'Reilly, 2020
* Spark: The Definitive Guide - Big Data Processing Made Simple, 1st Ed. B. Chambers and M. Zaharia. O'Reilly, 2018
* http://spark.apache.org/docs/latest/ml-guide.html
* https://docs.python.org/3/