In [3]:
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import pandas as pd
from pyspark.sql.functions import mean
from matplotlib import pyplot as plt
from pyspark.sql.functions import when
from pyspark.sql.functions import lit
import sklearn
import numpy as np 

def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark


from pyspark import SparkFiles
spark = init_spark()
sp = None


ModuleNotFoundError: No module named 'matplotlib'

In [2]:
#Load Book and Tag dataset into df
bk = spark.read.csv("book_clean.csv", header=True)
tags = spark.read.csv("book-tag_clean.csv", header=True)
print(bk.count(),tags.count())

9939 111740


In [3]:
bk.show(5)
tags.show(5)

+-------+--------------------+-------------------------+--------------------+-------------+--------------+-------------+---------+---------+---------+---------+---------+
|book_id|             authors|original_publication_year|               title|language_code|average_rating|ratings_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|
+-------+--------------------+-------------------------+--------------------+-------------+--------------+-------------+---------+---------+---------+---------+---------+
|      1|     Suzanne Collins|                     2008|The Hunger Games ...|          eng|          4.34|      4780653|    66715|   127936|   560092|  1481305|  2706317|
|      2|J.K. Rowling, Mar...|                     1997|Harry Potter and ...|          eng|          4.44|      4602479|    75504|   101676|   455024|  1156318|  3011543|
|      3|     Stephenie Meyer|                     2005|Twilight (Twiligh...|        en-US|          3.57|      3866839|   456191|   436802|   79

In [4]:
distinct_authors_df = bk.select('authors').distinct()
distinct_title_df = bk.select('title').distinct()
distinct_lang_df = bk.select('language_code').distinct()
distinct_tag_df = tags.select('tag_id').distinct()

num_authors = distinct_authors_df.count()
num_title = distinct_title_df.count()
num_lang = distinct_lang_df.count()
num_tag = distinct_tag_df.count()

In [5]:
print("num_authors :",num_authors)
print("num_title :",num_title)
print("num_lang :",num_lang)
print("num_tag :",num_tag)

num_authors : 4617
num_title : 9903
num_lang : 26
num_tag : 763


Load and Split Ratings Dataset

In [6]:
#spliting into Training and Testing dataset 70:30 ratio
ratings = spark.read.csv("ratings.csv", header=True)
ratings = ratings.withColumn("user_id",ratings.user_id.cast('int'))
ratings = ratings.withColumn("rating",ratings.rating.cast('int'))
ratings = ratings.withColumn("book_id",ratings.book_id.cast('int'))
train,test = ratings.randomSplit([0.7, 0.3],0)
print(train.count(),test.count())
train.show(5)

4182526 1793953
+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      1|     10|     4|
|      1|     11|     5|
|      1|     13|     4|
|      1|     22|     3|
|      1|     31|     4|
+-------+-------+------+
only showing top 5 rows



In [7]:
train.select('book_id').distinct().count()

10000

In [8]:
test.select('book_id').distinct().count()

##since all the books are present in both the ratings of the testing and training set,The normalization of the 
#book hotVector will be the same and therefore not necessary to be done seperatly. 
#(e.g the max and min of book year in both dataeset will be the same since the book is present in both) 

10000

# Hot Vector Encoding(Item Profile)

In [9]:
#format tags to get list of categories of each book
rdd1 = tags.rdd
rdd1 = rdd1.map(lambda x: (x['goodreads_book_id'],x['tag_id'])).groupByKey()
tags_ = rdd1.toDF(["book_id","tag_list"])
tags_.show()

+-------+--------------------+
|book_id|            tag_list|
+-------+--------------------+
|      2|[[14064, 8717, 30...|
|   6185|[[14064, 5775, 87...|
|  17245|[[14064, 8717, 30...|
|  30183|[[14064, 8717, 30...|
|  99561|[[14064, 8717, 30...|
| 113436|[[14064, 8717, 30...|
|1656001|[[14064, 8717, 30...|
|6304335|[[14064, 8717, 30...|
|   8852|[[5775, 8717, 305...|
|      1|[[8717, 30574, 11...|
|     13|[[8717, 30574, 11...|
|     24|[[8717, 30574, 11...|
|     33|[[8717, 30574, 11...|
|    275|[[8717, 30574, 11...|
|    304|[[8717, 30574, 11...|
|    359|[[8717, 30574, 11...|
|    446|[[8717, 30574, 11...|
|    447|[[8717, 30574, 11...|
|    621|[[8717, 30574, 21...|
|    656|[[8717, 30574, 11...|
+-------+--------------------+
only showing top 20 rows



In [10]:
#join tag/category  list to book dataframe
bk = bk.join(tags_, ['book_id'], 'left_outer')

In [11]:
bk.printSchema()
bk = bk.withColumn("tag_list",bk.tag_list['data'])

root
 |-- book_id: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- ratings_1: string (nullable = true)
 |-- ratings_2: string (nullable = true)
 |-- ratings_3: string (nullable = true)
 |-- ratings_4: string (nullable = true)
 |-- ratings_5: string (nullable = true)
 |-- tag_list: struct (nullable = true)
 |    |-- data: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- index: long (nullable = true)
 |    |-- maxindex: long (nullable = true)



In [12]:
#replace null list of tags with empty list
import pyspark.sql.functions as F
bk = bk.withColumn("tag_list",when(bk.tag_list.isNull(),F.array([])).otherwise(bk.tag_list))

In [13]:
bk.select('book_id','tag_list').orderBy('book_id').show()
bk.printSchema()

+-------+--------------------+
|book_id|            tag_list|
+-------+--------------------+
|      1|[8717, 30574, 112...|
|     10|[8717, 30574, 115...|
|    100|                  []|
|   1000|                  []|
|  10000|                  []|
|   1001|                  []|
|   1002|                  []|
|   1003|                  []|
|   1004|                  []|
|   1005|[8717, 30574, 270...|
|   1006|                  []|
|   1007|                  []|
|   1008|                  []|
|   1009|                  []|
|    101|                  []|
|   1010|                  []|
|   1011|                  []|
|   1012|                  []|
|   1013|                  []|
|   1014|                  []|
+-------+--------------------+
only showing top 20 rows

root
 |-- book_id: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- ave

In [14]:
#cast non-categorical feature columns to int/float
bk = bk.withColumn("original_publication_year",bk.original_publication_year.cast('int'))
bk = bk.withColumn("ratings_count",bk.ratings_count.cast('int'))
bk = bk.withColumn("average_rating",bk.ratings_count.cast('float'))
bk = bk.withColumn("ratings_1",bk.ratings_1.cast('int'))
bk = bk.withColumn("ratings_2",bk.ratings_2.cast('int'))
bk = bk.withColumn("ratings_3",bk.ratings_3.cast('int'))
bk = bk.withColumn("ratings_4",bk.ratings_4.cast('int'))
bk = bk.withColumn("ratings_5",bk.ratings_5.cast('int'))

In [15]:
#assemble non-categorical features as vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["original_publication_year", "ratings_count","average_rating","ratings_1","ratings_2","ratings_3","ratings_4","ratings_5"],
    outputCol="non-categorical")
bk = assembler.transform(bk)
#bk.show(10)


In [16]:
bk.select("non-categorical").show(5)

+--------------------+
|     non-categorical|
+--------------------+
|[2000.0,75469.0,7...|
|[1994.0,80056.0,8...|
|[2004.0,69007.0,6...|
|[1853.0,39758.0,3...|
|[1991.0,67753.0,6...|
+--------------------+
only showing top 5 rows



In [17]:
# map categorical features to numeric(vector index) 
from pyspark.ml.feature import StringIndexer, OneHotEncoder

indexer = StringIndexer(inputCol='authors', outputCol='authors_ind')
bk=indexer.fit(bk).transform(bk)

indexer = StringIndexer(inputCol= 'title', outputCol='title_ind')
bk=indexer.fit(bk).transform(bk)


indexer = StringIndexer(inputCol='language_code', outputCol='language-code_ind')
bk=indexer.fit(bk).transform(bk)



In [18]:
bk.select("authors_ind","title_ind","language-code_ind").show(5)

+-----------+---------+-----------------+
|authors_ind|title_ind|language-code_ind|
+-----------+---------+-----------------+
|     3443.0|   3177.0|              0.0|
|     4445.0|   5961.0|              3.0|
|       70.0|   5653.0|              0.0|
|     4181.0|   9237.0|              0.0|
|     3651.0|   2043.0|              0.0|
+-----------+---------+-----------------+
only showing top 5 rows



In [19]:
# encode categorical features index to hot vectors
encoder = OneHotEncoder(inputCol='authors_ind', outputCol = 'authors_vec')
bk = encoder.fit(bk).transform(bk)

from pyspark.ml.feature import CountVectorizer
colorVectorizer = CountVectorizer(inputCol="tag_list", outputCol="tag_vec", vocabSize=763, minDF=1.0)
colorVectorizer_model = colorVectorizer.fit(bk)
bk = colorVectorizer_model.transform(bk)

encoder = OneHotEncoder(inputCol='title_ind', outputCol = 'title_vec')
bk = encoder.fit(bk).transform(bk)

encoder = OneHotEncoder(inputCol='language-code_ind', outputCol = 'language-code_vec')
bk = encoder.fit(bk).transform(bk)



In [20]:
bk.filter(bk.tag_list != F.array([]) ).select('authors_vec','tag_vec').show()

+-------------------+--------------------+
|        authors_vec|             tag_vec|
+-------------------+--------------------+
|(4616,[1375],[1.0])|(537,[0,2,3,8,10,...|
|(4616,[3772],[1.0])|(537,[1,2,62],[1....|
| (4616,[148],[1.0])|(537,[0,1,9,57],[...|
|(4616,[2235],[1.0])|(537,[0,1,9,16,12...|
|   (4616,[3],[1.0])|(537,[0,1,9,132,2...|
|(4616,[4131],[1.0])|(537,[0,1,2,3,4,5...|
|(4616,[4553],[1.0])|(537,[0,1,2,3,4,5...|
|(4616,[1507],[1.0])|(537,[0,1],[1.0,1...|
| (4616,[335],[1.0])|(537,[0,1,2,3,4,5...|
|(4616,[2305],[1.0])|(537,[0,2,3,12],[...|
|(4616,[3978],[1.0])|(537,[0,3,7,12,13...|
| (4616,[165],[1.0])|(537,[0,1,2,3,4,5...|
| (4616,[434],[1.0])|(537,[0,2],[1.0,1...|
| (4616,[967],[1.0])|(537,[0,1,2,3,4,5...|
|(4616,[3773],[1.0])|(537,[0,3,4,6,11,...|
|(4616,[2111],[1.0])|(537,[0,2,3,212,2...|
|(4616,[1420],[1.0])|(537,[0,1,2,3,4,5...|
| (4616,[273],[1.0])|(537,[0,1,2],[1.0...|
|(4616,[1481],[1.0])|(537,[0,1,2,3,4,5...|
|(4616,[3554],[1.0])|(537,[0,1,2,3,241...|
+----------

In [21]:
# Assemble all the vectors as 1 Hot-Vector : 15063 columns of features
assembler = VectorAssembler(
    inputCols=["non-categorical","authors_vec","title_vec","tag_vec"],
    outputCol="hotVector")
bk = assembler.transform(bk)

In [22]:
bk.select('hotVector').show()

+--------------------+
|           hotVector|
+--------------------+
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
|(15063,[0,1,2,3,4...|
+--------------------+
only showing top 20 rows



In [23]:
#Normalize the non-categorical features to scale 0-1
from pyspark.ml.feature import MinMaxScaler

# MinMaxScaler Transformation
scaler = MinMaxScaler(inputCol="hotVector", outputCol="hotVector_scaled")
scalerModel =  scaler.fit(bk.select("hotVector"))
bk = scalerModel.transform(bk)

In [24]:
#1 row hot-vector example
bk.where('book_id=2').select('hotVector_scaled').collect()

[Row(hotVector_scaled=SparseVector(15063, {0: 0.9777, 1: 0.9627, 2: 0.9627, 3: 0.1655, 4: 0.2327, 5: 0.5734, 6: 0.7805, 7: 1.0, 205: 1.0, 7598: 1.0, 14526: 1.0, 14527: 1.0, 14528: 1.0, 14529: 1.0, 14530: 0.5, 14531: 1.0, 14533: 1.0, 14536: 1.0, 14537: 1.0, 14539: 1.0, 14540: 1.0, 14541: 1.0, 14544: 1.0, 14545: 1.0, 14548: 1.0, 14550: 1.0, 14551: 1.0, 14555: 1.0, 14556: 1.0, 14561: 1.0, 14565: 1.0, 14566: 1.0, 14568: 1.0, 14577: 1.0, 14578: 1.0, 14579: 1.0, 14582: 1.0, 14585: 1.0, 14587: 1.0, 14594: 1.0, 14602: 1.0, 14603: 1.0, 14604: 1.0, 14613: 1.0, 14622: 1.0, 14623: 1.0, 14624: 1.0, 14628: 1.0, 14638: 1.0, 14639: 1.0, 14640: 1.0, 14643: 1.0, 14648: 1.0, 14650: 1.0, 14651: 1.0, 14655: 1.0, 14660: 1.0, 14664: 1.0, 14667: 1.0, 14670: 1.0, 14675: 1.0, 14676: 1.0, 14677: 1.0, 14679: 1.0, 14691: 1.0, 14693: 1.0, 14698: 1.0, 14701: 1.0, 14703: 1.0, 14704: 1.0, 14707: 1.0, 14720: 1.0, 14728: 1.0, 14731: 1.0, 14733: 1.0, 14735: 1.0, 14745: 1.0, 14766: 1.0, 14773: 1.0, 14776: 1.0, 14778: 1.0,

In [25]:
#denseVector to vector(array)
from pyspark.ml.functions import vector_to_array
bk = bk.withColumn("hot-vector",vector_to_array(bk.hotVector_scaled))
bk = bk.select('book_id','hot-vector')
bk.show(5)

+-------+--------------------+
|book_id|          hot-vector|
+-------+--------------------+
|   1090|[0.98104793756967...|
|   1159|[0.97435897435897...|
|   1436|[0.98550724637681...|
|   1512|[0.81716833890746...|
|   1572|[0.97101449275362...|
+-------+--------------------+
only showing top 5 rows



In [26]:
bk.printSchema()

root
 |-- book_id: string (nullable = true)
 |-- hot-vector: array (nullable = false)
 |    |-- element: double (containsNull = false)



In [42]:
from scipy.spatial import distance

#function to build a user profile with user_id as input
def get_user_profile(user_id,ratings,books):
        
        #get high ratings of the user(4 and 5)
        ratings = ratings.filter((ratings.user_id==user_id) & ((ratings.rating== 4) | (ratings.rating==5)))
        items_count = ratings.count()
        
        if items_count > 0:
            
            #get the books that was read and highly rated by the user
            items = books.join(ratings,['book_id'],'inner')
            count = items.count()
            
            #convert to rdd for easy computations 
            rdd_HV = items.select('hot-vector').rdd
            rdd_HV = rdd_HV.map(lambda x: x['hot-vector'])
            #rdd_HV = rdd_HV.reduce(lambda x,y: x+y)
            arr = np.array(rdd_HV.collect())
            
            #aggregate the highly rated book profile to get user profile
            print(arr.shape)
            return np.add.reduce(arr)/count

        
#function that return sorted cosine distances of unread books by a user
def get_cosine_distance(user_id,ratings,bk):
    
    #get the user profile
    print("Getting User Profile..")
    user_profile = get_user_profile(user_id,train,bk)
    print("User Profile:",user_profile)
    
    #get books that has not been rated by the user
    print("Getting Book Not Read By User..")
    books = ratings.filter(ratings.user_id == user_id).select('book_id').withColumn('read',lit('True'))
    books = bk.join(books,['book_id'],'left_outer')
    not_read = books.filter(books.read.isNull()).select('book_id','hot-vector').rdd
    print("Book Not Read: ",not_read.count())
    
    #compute the cosine distance of the book profiles and the user profile
    print("Computing Cosine Distances..")
    not_read = not_read.map(lambda x: (x['book_id'],x['hot-vector']))
    not_read = not_read.map(lambda x: (x[0],np.array(x[1])))
    
    #sort by cosine distance( smaller cosine distance: better)
    similarity = not_read.map(lambda x: (distance.cosine(x[1] , user_profile ),x[0]))
    similarity = similarity.sortByKey()
    print("Complete")
    
    return similarity

In [43]:

cos_d = get_cosine_distance(2,train,bk)


Getting User Profile..
(36, 15063)
User Profile: [0.95423015 0.11500936 0.11500936 ... 0.         0.02777778 0.        ]
Getting Book Not Read By User..
Book Not Read:  9895
Computing Cosine Distances..
Complete


In [49]:

def recommend(top=10,user_id):
    
    cos_d = get_cosine_distance(user_id,train,bk)    
    cos_d = cos_d.map(lambda x: (float(x[0]), int(x[1])))
    cos_d = cos_d.toDF(["cosine-distance","book_id"])
    cos_d = cos_d.limit(top)
    cos_d.show(10)
    
    #return cos_d.select('book_id')


+-------------------+-------+
|    cosine-distance|book_id|
+-------------------+-------+
| 0.4404359374374116|   5373|
| 0.4468569747115929|   4415|
|0.46440976734530204|    231|
|0.46715485848458793|     36|
| 0.4733138265736211|   2528|
| 0.4734425316019377|   3378|
|0.47345937469222243|   3384|
| 0.4734999836164072|   9589|
|0.47379208911869797|   9913|
| 0.4739809814556929|   6159|
+-------------------+-------+
only showing top 10 rows



In [None]:
cos_d = cos_d.map(lambda x: (float(x[0]), int(x[1])))
cos_d = cos_d.toDF(["cosine-distance","book_id"])
cos_d = cos_d.limit(20)
cos_d.show(10)

In [None]:
test.where('user_id=2').join(cos_d,['book_id'],'inner').show(20)