# Amazon Books Recommender System

# Setting up Spark Modules

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
})

# Working with drive
 - load data 
 - data preparation and analysis

In [None]:
# Mount Google Drive at /content/drive; the CSV paths read later in this notebook start there.
from google.colab import drive
# NOTE(review): force_remount=True presumably re-mounts even when a mount already exists -- confirm.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


# Spark dependencies 

In [None]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.functions import udf, col, when
from pyspark.sql import SparkSession, functions as F 
from pyspark.sql.functions import countDistinct
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, RegexTokenizer, CountVectorizer, IDF
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import pipeline
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

from sklearn.metrics.pairwise import linear_kernel
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer


import numpy as np


In [None]:
# Local Spark session using every available core.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Handles for the RDD-level context and the legacy SQLContext entry point.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Load data and cleaning


In [None]:
# Both files have a header row; column types are inferred by Spark.
_csv_options = {"header": True, "inferSchema": True}

ratings_data = spark.read.options(**_csv_options).csv(
    "/content/drive/Shared drives/IDS 561 - Big Data /Project/Data/Amazon data/ratings.csv"
)

books_data = spark.read.options(**_csv_options).csv(
    "/content/drive/Shared drives/IDS 561 - Big Data /Project/Data/Amazon data/books.csv"
)

In [None]:
import pandas as pd

def count_column_types(spark_df):
    """Summarise the columns of a Spark DataFrame by data type.

    Args:
        spark_df: object exposing ``dtypes`` as a list of
            ``(column_name, type_name)`` pairs, as a Spark DataFrame does.

    Returns:
        pandas.DataFrame with one row per type and the columns ``type``,
        ``count`` (number of columns of that type) and ``names`` (those
        column names joined with " | ", in alphabetical order).
    """
    columns = pd.DataFrame(spark_df.dtypes, columns=["name", "type"])
    # Named aggregation replaces the dict-renaming form of SeriesGroupBy.agg, which
    # pandas >= 1.0 rejects with SpecificationError. Sorting the names makes the
    # output stable (set iteration order is not).
    return columns.groupby("type", as_index=False).agg(
        count=("name", "count"),
        names=("name", lambda names: " | ".join(sorted(set(names)))),
    )

In [None]:
ratings_data.count()

981756

In [None]:
books_data.count()

10000

In [None]:
ratings_data.show(1)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
+-------+-------+------+
only showing top 1 row



In [None]:
books_data.show(1,False)

+---+-------+------------+-------+-----------+---------+----------------+---------------+-------------------------+----------------+---------------------------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+----------------------------------------------------------+----------------------------------------------------------+
|id |book_id|best_book_id|work_id|books_count|isbn     |isbn13          |authors        |original_publication_year|original_title  |title                                  |language_code|average_rating|ratings_count|work_ratings_count|work_text_reviews_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|image_url                                                 |small_image_url                                           |
+---+-------+------------+-------+-----------+---------+----------------+---------------+-------------------------+----------------+--------------

In [None]:
count_column_types(ratings_data)   ### function to check the columns type

Unnamed: 0,type,count,names
0,int,3,rating | book_id | user_id


In [None]:
count_column_types(books_data)

Unnamed: 0,type,count,names
0,double,3,isbn13 | ratings_1 | original_publication_year
1,int,9,ratings_2 | ratings_4 | book_id | work_id | ra...
2,string,11,isbn | image_url | original_title | ratings_co...


In [None]:
#### Keep only the ratings whose book_id also exists in books_data; otherwise collaborative filtering can recommend books whose title cannot be looked up.
# NOTE(review): the sample outputs in this notebook show ratings with book_id 1 while book tables show
# book_id values such as 18635016 next to a separate `id` column -- confirm book_id (not id) is the intended join key.
ratings_data = (
    ratings_data
    .join(books_data, on=ratings_data.book_id == books_data.book_id, how='inner')
    .select(books_data.book_id, "user_id", "rating")
)

In [None]:
ratings_data.show(2)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
+-------+-------+------+
only showing top 2 rows



In [None]:
# data split: 80% train / 20% test
# A fixed seed makes the split repeatable between runs (11 is the seed the ALS models in this notebook use).
train_data, test_data = ratings_data.randomSplit([0.8, 0.2], seed=11)

In [None]:
train_data.count(), test_data.count()

(63814, 15887)

## Popularity based recommendation

In [None]:
ratings_data.show(2)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
+-------+-------+------+
only showing top 2 rows



In [None]:
"""
Popularity based recommendation
For each book we compute the number of distinct users who rated it and the average rating they gave.
This is not a personalized recommendation, but it is especially useful when a new user arrives and wants some book recommendations.
"""
# Per book: number of distinct users who rated it and their average rating.
# The count column is aliased directly instead of being renamed afterwards: withColumnRenamed
# silently does nothing when the auto-generated name (e.g. "count(DISTINCT user_id)") is not
# exactly 'count(user_id)'. The average keeps the name "avg(rating)" that later cells select.
popular = ratings_data.groupBy("book_id").agg(
    F.countDistinct("user_id").alias("Count_of_Users"),
    F.avg("rating").alias("avg(rating)"),
)
popular.show(1)

+-------+--------------+-----------+
|book_id|Count_of_Users|avg(rating)|
+-------+--------------+-----------+
|   1591|           100|       4.01|
+-------+--------------+-----------+
only showing top 1 row



In [None]:
# Attach titles to the popularity table, then list the ten books with the most raters
# (ties broken by the higher average rating).
_popular_with_books = popular.join(books_data, popular.book_id == books_data.book_id, 'inner')
details = (
    _popular_with_books
    .drop(books_data.book_id)
    .select("book_id", "Count_of_Users", "avg(rating)", "original_title")
)
_most_popular_first = details.orderBy(F.col("Count_of_Users").desc(), F.col("avg(rating)").desc())
_most_popular_first.select("original_title", "Count_of_Users", "avg(rating)").show(10, False)

+----------------------------------------------------------------------+--------------+-----------+
|original_title                                                        |Count_of_Users|avg(rating)|
+----------------------------------------------------------------------+--------------+-----------+
|The Beautiful and Damned                                              |100           |4.66       |
|The Taste of Home Cookbook                                            |100           |4.55       |
|A People's History of the United States: 1492 to Present              |100           |4.54       |
|Girl with a Pearl Earring                                             |100           |4.53       |
|Deception Point                                                       |100           |4.5        |
|The Curious Incident of the Dog in the Night-Time                     |100           |4.48       |
|The Last Juror                                                        |100           |4.47       |


## Collaborative filtering using ALS model
The most important kind of recommender system is collaborative filtering based approach. Let’s say you know a friend who has the same taste as you because you both love psychology, then you might like reading other books that your friend has read but you haven’t. This is the sole concept behind collaborative filtering. Hence it provides a more personalized touch.

Collaborative filtering can be achieved with matrix factorization techniques such as Singular Value Decomposition (SVD), where the user-rating matrix is decomposed into a user-concept matrix, a concept-weights matrix, and an item-concept matrix. Concepts are the latent (hidden) factors that the matrix decomposition implicitly generates.

Most matrix factorization techniques such as Singular Value Decomposition cannot deal directly with an incomplete/sparse matrix, i.e. a user-rating matrix with empty values. 
More recent methods like Alternating Least Squares (ALS) don't suffer from this drawback. They model the observed ratings directly while avoiding overfitting through regularization.

In [None]:
def ALS_model(rank_value, iterations, step):
  """Fit an ALS recommender on the notebook-level `train_data`.

  rank_value -- number of latent factors (passed as ALS `rank`)
  iterations -- maximum number of iterations (passed as `maxIter`)
  step       -- regularization strength; despite the name it is passed as `regParam`

  Returns the fitted model. With coldStartStrategy="drop", rows the model
  cannot score are dropped from its predictions.
  """
  als_settings = dict(
      rank=rank_value,
      maxIter=iterations,
      regParam=step,
      userCol="user_id",
      itemCol="book_id",
      ratingCol="rating",
      nonnegative=True,
      coldStartStrategy="drop",
      seed=11,
  )
  return ALS(**als_settings).fit(train_data)

In [None]:
# Baseline ALS hyper-parameters
rank_value = 10   # rank - latent features (the ALS default)
iterations = 5
step = 0.1        # regularization parameter

# error bookkeeping (initialised here; not filled in by this cell)
error_rate = []
loss_error = 0

# Fit the baseline model, then score both splits with it
base_model = ALS_model(rank_value=rank_value, iterations=iterations, step=step)
predict_train, predict_test = (base_model.transform(split) for split in (train_data, test_data))

In [None]:
predict_train.show(5)

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|   1591|  35982|     5|  4.103718|
|   1591|  19526|     3|  3.343424|
|   1591|  43689|     4| 3.7329583|
|   1591|  15161|     4| 3.8242831|
|   1591|   6213|     4| 3.5726175|
+-------+-------+------+----------+
only showing top 5 rows



In [None]:
# RMSE between the true `rating` column and the model's `prediction` column.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction',
)

In [None]:
rmse_train = evaluator.evaluate(predict_train)
rmse_test = evaluator.evaluate(predict_test)

In [None]:
print(rmse_train,rmse_test)

0.32334890238629627 1.4280916075496217


**Cross validation ALS**

In [None]:
# Un-fitted ALS estimator used for tuning; rank, maxIter and regParam are left at their defaults here.
model_cv = ALS(
    userCol="user_id",
    itemCol="book_id",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
    seed=11,
)

In [None]:
import time
start = time.time()
# Parameters for tuning: 1 x 3 x 2 = 6 candidate settings
paramGrid = (
    ParamGridBuilder()
    .addGrid(model_cv.maxIter, [10])
    .addGrid(model_cv.regParam, [0.1, 1, 10])
    .addGrid(model_cv.rank, [10, 12])
    .build()
)

# seed fixes the random fold assignment so the selected model is repeatable between runs
crossvalidation = CrossValidator(estimator = model_cv,
                     estimatorParamMaps = paramGrid,
                     evaluator = evaluator,
                     numFolds=5,
                     seed=11)

# Run cross-validation, and choose the best set of parameters.
Best_model = crossvalidation.fit(train_data).bestModel
(time.time() - start)/60   # elapsed time in minutes

6.629543689886729

In [None]:
### Best model Rmse
# Score both splits with the cross-validated model, then compute the RMSE of each
predict_train_bm, predict_test_bm = (
    Best_model.transform(split) for split in (train_data, test_data)
)
rmse_train_bm, rmse_test_bm = (
    evaluator.evaluate(scored) for scored in (predict_train_bm, predict_test_bm)
)

In [None]:
print(rmse_train_bm,rmse_test_bm)

0.2558271264916035 1.236640618117528


In [None]:
Best_model

ALSModel: uid=ALS_44068ea58d9d, rank=12

In [None]:
top_10_bookid = Best_model.recommendForAllUsers(10)

In [None]:
top_10_bookid.show(5,False)

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                           |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148    |[[1823, 3.6361318], [1519, 3.6086044], [6310, 3.5921187], [9712, 3.5328784], [840, 3.5204031], [7728, 3.515255], [7073, 3.500424], [5367, 3.4896665], [2956, 3.4867005], [378, 3.4820368]]|
|496    |[[8966, 3.8918335], [7069, 3.8009326], [2095, 3.7020848], [8087, 3.6697586], [475, 3.66822], [29, 3.6594186], [9784, 3.6126285], [5752, 3.61161], [6425, 3.6094944], [10, 3.5703354]]     |
|833    |[[5084

In [None]:
# For the book ids found in books_data: the 10 users the model scores highest for each book
top10_userid = Best_model.recommendForItemSubset(books_data, 10)
(
    top10_userid
    .select("book_id", "recommendations.user_id")
    .show(5)
)

+-------+--------------------+
|book_id|             user_id|
+-------+--------------------+
|   4900|[7120, 29240, 301...|
|   1591|[32226, 24335, 25...|
|   2122|[15159, 43083, 53...|
|   2142|[9675, 37700, 221...|
|   7993|[34992, 24341, 28...|
+-------+--------------------+
only showing top 5 rows



In [None]:
# One row per (user, recommended book): explode the recommendations array, then pull
# book_id and rating out of each exploded element.
_recs_exploded = top_10_bookid.withColumn("rec_exp", F.explode("recommendations"))
nrecommend = _recs_exploded.select('user_id', 'rec_exp.book_id', 'rec_exp.rating')

In [None]:
nrecommend.show(10, False)

+-------+-------+---------+
|user_id|book_id|rating   |
+-------+-------+---------+
|148    |1823   |3.6361318|
|148    |1519   |3.6086044|
|148    |6310   |3.5921187|
|148    |9712   |3.5328784|
|148    |840    |3.5204031|
|148    |7728   |3.515255 |
|148    |7073   |3.500424 |
|148    |5367   |3.4896665|
|148    |2956   |3.4867005|
|148    |378    |3.4820368|
+-------+-------+---------+
only showing top 10 rows



In [None]:
# Collect the per-user recommendations into a pandas DataFrame.
# The variable name is reused, so guard the call: re-running this cell after the
# conversion would otherwise fail because a pandas DataFrame has no toPandas().
if hasattr(top_10_bookid, "toPandas"):
    top_10_bookid = top_10_bookid.toPandas()

In [None]:
# Data Display
# Flatten each user's list of (book_id, score) recommendations into one
# comma-separated string of book ids.
def _join_book_ids(rec):
    """Return 'id1,id2,...' for a sequence of (book_id, score) pairs; strings pass through unchanged."""
    if isinstance(rec, str):   # already flattened by an earlier run of this cell
        return rec
    return ','.join(str(pair[0]) for pair in rec)

# Column-wise map replaces the row-by-row .loc assignment loop (and its unused locals).
top_10_bookid['recommendations'] = top_10_bookid['recommendations'].map(_join_book_ids)

In [None]:
top_10_bookid.head(10)

Unnamed: 0,user_id,recommendations
0,148,18231519631097128407728707353672956378
1,496,89667069209580874752997845752642510
2,833,5084151954139569286537581196742643764
3,1088,2199774551753586873280878648161895311622
4,1342,582651679566828238858904407400689099998
5,1645,9569894844075413171564261554286516359864
6,1829,9569541328656715199566346397121191618
7,2142,9569965028653858956677453463161833787039
8,2366,9531873216189569808754132872437365148648
9,2866,9569541328659566667876828282337899988647


In [None]:
### predicted preference
queried_user_id = '148'
print(f'Queried user id : {queried_user_id}')
# Attach titles to the exploded recommendations and keep only the queried user's rows
_recs_with_titles = nrecommend.join(books_data, nrecommend.book_id == books_data.book_id)
_recs_for_user = _recs_with_titles.filter(nrecommend.user_id == queried_user_id)
_recs_for_user.select('user_id', nrecommend.book_id, 'original_title').show(10, False)

Queried user id : 148
+-------+-------+----------------------------------------+
|user_id|book_id|original_title                          |
+-------+-------+----------------------------------------+
|148    |760    |Memoria de mis putas tristes            |
|148    |6310   |Charlie and the Chocolate Factory       |
|148    |4708   |The Beautiful and Damned                |
|148    |8968   |The Vampire Prince (Cirque Du Freak, #6)|
|148    |2978   |Lost Horizon                            |
|148    |7677   |Jurassic Park                           |
|148    |3872   |A History of the World in 6 Glasses     |
|148    |2872   |Falling Angels                          |
|148    |11     |The Hitchhiker's Guide to the Galaxy    |
|148    |6149   |Beloved                                 |
+-------+-------+----------------------------------------+



### Content based recommendation system
The content-based recommendation system recommends items with similar content, which in our case are books. Since we do not have book descriptions, we had to use the title and the author's name to derive the features and provide recommendations. The recommendations are hence not that accurate.

In the feature engineering step the raw text data will be transformed into feature vectors and new features will be created using the existing dataset. We will implement the following different ideas in order to obtain relevant features from our dataset.

- Count Vectors as features
- TF-IDF Vectors as features

In [None]:
# Build the free-text `desc` field that the content-based models vectorise.
# Null entries become "*" so the join always receives strings.
concat_udf = F.udf(lambda cols: " ".join([x if x is not None else "*" for x in cols]), StringType())
# books_data has no "rating" column (its rating column is "average_rating"), so the
# original F.array("authors","rating") cannot be resolved. The title is included because
# the notebook text says features are derived from the title and the author's name.
# Casting to string keeps the array valid whatever types the CSV schema inference chose.
_desc_fields = [F.col(name).cast("string") for name in ("authors", "title", "average_rating")]
books_datacont = books_data.withColumn("desc", concat_udf(F.array(*_desc_fields)))
# Replace '/', '(', ',' and ')' with spaces so they do not glue neighbouring words together
books_datacont = books_datacont.withColumn("desc", F.regexp_replace("desc", "[/(,)]", " "))

In [None]:
books_datacont.show(1,False)

+---+-------+------------+-------+-----------+---------+----------------+---------------+-------------------------+----------------+---------------------------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+----------------------------------------------------------+----------------------------------------------------------+------------------------------------------------------------+
|id |book_id|best_book_id|work_id|books_count|isbn     |isbn13          |authors        |original_publication_year|original_title  |title                                  |language_code|average_rating|ratings_count|work_ratings_count|work_text_reviews_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|image_url                                                 |small_image_url                                           |desc                                                        |
+---+-------+-----------

In [None]:
#from pyspark.ml.feature import HashingTF, IDF
#hashingTF = HashingTF(inputCol="tokens", outputCol="tf")
#tf = hashingTF.transform(regexTokenized)
#idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
#tfidf = idf.transform(tf)

In [None]:
#from pyspark.ml.feature import Normalizer
#normalizer = Normalizer(inputCol="feature", outputCol="norm")
#data = normalizer.transform(tfidf)

In [None]:
# scikit-learn works on in-memory data, so collect the books table into pandas.
# Guarded so that re-running the cell after the conversion is a no-op instead of an
# AttributeError (a pandas DataFrame has no toPandas()).
if hasattr(books_datacont, "toPandas"):
    books_datacont = books_datacont.toPandas()

**CountVectorizer + cosine similarity**

In [None]:
"""
Tried to find the content based on countvectorizer
"""
# scikit-learn's CountVectorizer (the name is also exported by pyspark.ml.feature)
from sklearn.feature_extraction.text import CountVectorizer
# Learn the vocabulary from `desc`, then turn every book's text into term counts
cv = CountVectorizer()
count_matrix = cv.fit(books_datacont['desc']).transform(books_datacont['desc'])

In [None]:
cos_sim = cosine_similarity(count_matrix,count_matrix)

In [None]:
from pyspark.sql.types import IntegerType

def recommendation_content(index, top_n=10, similarity=None):
  """Show the books most similar to the book at row `index` of a similarity matrix.

  index      -- row position of the queried book in the similarity matrix
  top_n      -- number of recommendations to display (default 10)
  similarity -- square similarity matrix; defaults to the notebook-level `cos_sim`

  Prints a table of book_id / original_title, most similar first. Returns None.
  """
  scores = cos_sim[index] if similarity is None else similarity[index]
  ranked = sorted(enumerate(scores), key = lambda x: x[1], reverse = True)
  # Drop the queried book by its index rather than assuming it is ranked first (ties with
  # identical descriptions can reorder it), then keep top_n entries -- the previous
  # slice [1:10] returned only 9.
  neighbours = [(int(i), float(score)) for i, score in ranked if i != index][:top_n]
  # Explicit schema so an empty neighbour list still yields a valid DataFrame
  recom = spark.createDataFrame(neighbours, "value int, score double")
  # Callers look the query row up through books_dataidx.idx, so the recommended positions
  # are matched on idx as well (the previous code matched them on `id`).
  # NOTE(review): books_dataidx is not defined in the visible cells -- confirm that idx is
  # the 0-based row position of each book in the similarity matrix.
  (recom.join(books_dataidx, recom.value == books_dataidx.idx, 'inner')
        .orderBy(F.col("score").desc())   # a join does not preserve the similarity ranking
        .select("book_id","original_title")
        .show(top_n, False))

In [None]:
Book = "Harry Potter and the Philosopher's Stone"
print(f"Queried Book name: {Book} \n")
# Filter on the frame's own column via F.col instead of a column object borrowed from books_data
query_index = books_dataidx.where(F.col('original_title') == Book).select("idx").collect()
# Fail with a clear message instead of an IndexError when the title is not in the catalogue
if not query_index:
    raise ValueError(f"No book with original_title {Book!r} found")
recommendation_content(query_index[0].idx)

Queried Book name: Harry Potter and the Philosopher's Stone 

+--------+----------------------------------------------------------------------+
|book_id |original_title                                                        |
+--------+----------------------------------------------------------------------+
|18635016|The One                                                               |
|2429135 |Män som hatar kvinnor                                                 |
|34      | The Fellowship of the Ring                                           |
|3763    |Live and Let Die                                                      |
|51497   |The Strange Case of Dr. Jekyll and Mr. Hyde and Other Tales of Terror |
|531350  |The Choice                                                            |
|261161  |Dial L for Loser (The Clique, #6)                                     |
|5       |Harry Potter and the Prisoner of Azkaban                              |
|24019   |The New Best Recipe: All-N

**tf-idf + cosine similarity**

In [None]:
# Spark ML transformers need a Spark DataFrame, but an earlier cell replaces
# `books_datacont` with its pandas copy (toPandas). Convert back when that has
# happened so this cell also works on a top-to-bottom run.
if isinstance(books_datacont, pyspark.sql.DataFrame):
    books_desc_sdf = books_datacont
else:
    # Only the id and the text are needed for tokenisation
    books_desc_sdf = spark.createDataFrame(books_datacont[["book_id", "desc"]])
regexTokenizer = RegexTokenizer(inputCol="desc", outputCol="tokens")
regexTokenized = regexTokenizer.transform(books_desc_sdf)

In [None]:
regexTokenized.show(1,False)

+---+-------+------------+-------+-----------+---------+----------------+---------------+-------------------------+----------------+---------------------------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+----------------------------------------------------------+----------------------------------------------------------+--------------------+------------------------+
|id |book_id|best_book_id|work_id|books_count|isbn     |isbn13          |authors        |original_publication_year|original_title  |title                                  |language_code|average_rating|ratings_count|work_ratings_count|work_text_reviews_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|image_url                                                 |small_image_url                                           |desc                |tokens                  |
+---+-------+------------+-------+-----------+--------

In [None]:
"""
used tfidf for content recommendation
"""
# TF-IDF over accent-stripped word uni- and bi-grams, English stop words removed;
# min_df=3 keeps only terms that occur in at least 3 books.
_tfidf_options = dict(
    min_df=3,
    max_features=None,
    strip_accents='unicode',
    analyzer='word',
    token_pattern=r'\w{1,}',
    ngram_range=(1, 2),
    stop_words='english',
)
tfv = TfidfVectorizer(**_tfidf_options)
tfv_matrix = tfv.fit_transform(books_datacont['desc'])     ### sparse matrix, one row per book

In [None]:
rows, cols = tfv_matrix.nonzero()     ### checking the nonzero values of sparse matrix
rows,cols

(array([   0,    0,    0, ..., 9999, 9999, 9999], dtype=int32),
 array([ 161, 1021, 4139, ...,    0, 2263,  126], dtype=int32))

In [None]:
###Cosine similarity
cos_sim = cosine_similarity(tfv_matrix,tfv_matrix)

In [None]:
### Search for books similar to the queried title, using the TF-IDF similarity matrix
Book = "Harry Potter and the Philosopher's Stone"
print(f"Queried Book name: {Book} \n")
# Filter on the frame's own column via F.col instead of a column object borrowed from books_data
query_index = books_dataidx.where(F.col('original_title') == Book).select("idx").collect()
# Fail with a clear message instead of an IndexError when the title is not in the catalogue
if not query_index:
    raise ValueError(f"No book with original_title {Book!r} found")
recommendation_content(query_index[0].idx)

Queried Book name: Harry Potter and the Philosopher's Stone 

+--------+----------------------------------------------------------------------------------------+
|book_id |original_title                                                                          |
+--------+----------------------------------------------------------------------------------------+
|2429135 |Män som hatar kvinnor                                                                   |
|4922079 |One Second After                                                                        |
|7095831 |Ship Breaker                                                                            |
|14142   |The Art of Loving                                                                       |
|77378   |The Seven-Percent Solution: Being a Reprint from the Reminiscences of John H. Watson, MD|
|34      | The Fellowship of the Ring                                                             |
|18635016|The One                     

**Binarizer + cosine similarity**

In [None]:
#### trying Binarizer
from sklearn.preprocessing import Binarizer
# Reuse the term-count matrix and reduce every positive count to 1 (term present / absent)
binary = Binarizer()
binary.fit(count_matrix)
binary_transformed = binary.transform(count_matrix)

In [None]:
cos_sim = cosine_similarity(binary_transformed,binary_transformed)

In [None]:
### Search for books similar to the queried title, using the binarized-count similarity matrix
Book = "Harry Potter and the Philosopher's Stone"
print(f"Queried Book name: {Book} \n")
# Filter on the frame's own column via F.col instead of a column object borrowed from books_data
query_index = books_dataidx.where(F.col('original_title') == Book).select("idx").collect()
# Fail with a clear message instead of an IndexError when the title is not in the catalogue
if not query_index:
    raise ValueError(f"No book with original_title {Book!r} found")
recommendation_content(query_index[0].idx)

Queried Book name: Harry Potter and the Philosopher's Stone 

+--------+----------------------------------------------------------------------------------------+
|book_id |original_title                                                                          |
+--------+----------------------------------------------------------------------------------------+
|2429135 |Män som hatar kvinnor                                                                   |
|4922079 |One Second After                                                                        |
|34      | The Fellowship of the Ring                                                             |
|18635016|The One                                                                                 |
|77276   |A Swiftly Tilting Planet                                                                |
|14142   |The Art of Loving                                                                       |
|7095831 |Ship Breaker                