In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 34.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=eeec4edff5321291e0fc665f6027a737bd48a12a8e077d2d288c56cb01617d01
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
# importing all the libraries we’ll require to build the book recommender
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions  import *
from pyspark.sql.types import *

# Spark configuration for this notebook: local master using all available cores,
# Kryo serialization, and explicit memory / parallelism settings.
conf = SparkConf().setMaster("local[*]").setAppName("Books")
conf.set("spark.executor.memory", "6G")
conf.set("spark.driver.memory", "2G")
conf.set("spark.executor.cores", "4")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism", "4")

# Build (or reuse) a SparkSession from that configuration. The application name is
# taken from `conf` only: calling appName() on the builder after config(conf=...)
# would silently replace the "Books" name set above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Keep a handle on the underlying SparkContext for the RDD-based helpers used later.
sc = spark.sparkContext

In [None]:
# Load the book catalogue from a semicolon-delimited CSV whose first row is the header,
# then preview it.
books_df = (
    spark.read
    .options(delimiter=";", header="true")
    .csv('/content/Books.csv')
)
books_df.show()

+----------+--------------------+--------------------+----+--------------------+
|      ISBN|               Title|              Author|Year|           Publisher|
+----------+--------------------+--------------------+----+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|2002|Oxford University...|
|0002005018|        Clara Callan|Richard Bruce Wright|2001|HarperFlamingo Ca...|
|0060973129|Decision in Normandy|        Carlo D'Este|1991|     HarperPerennial|
|0374157065|Flu: The Story of...|    Gina Bari Kolata|1999|Farrar Straus Giroux|
|0393045218|The Mummies of Ur...|     E. J. W. Barber|1999|W. W. Norton & Co...|
|0399135782|The Kitchen God's...|             Amy Tan|1991|    Putnam Pub Group|
|0425176428|What If?: The Wor...|       Robert Cowley|2000|Berkley Publishin...|
|0671870432|     PLEADING GUILTY|         Scott Turow|1993|          Audioworks|
|0679425608|Under the Black F...|     David Cordingly|1996|        Random House|
|074322678X|Where You'll Fin

In [None]:
# Load the raw user ratings (semicolon-delimited CSV with a header row).
user_ratings_df = (
    spark.read
    .options(delimiter=";", header="true")
    .csv('/content/Ratings.csv')
)

In [None]:
# Preview the raw ratings (show() prints at most the first 20 rows).
user_ratings_df.show()

+-------+----------+------+
|User-ID|      ISBN|Rating|
+-------+----------+------+
| 276725|034545104X|     0|
| 276726|0155061224|     5|
| 276727|0446520802|     0|
| 276729|052165615X|     3|
| 276729|0521795028|     6|
| 276733|2080674722|     0|
| 276736|3257224281|     8|
| 276737|0600570967|     6|
| 276744|038550120X|     7|
| 276745| 342310538|    10|
| 276746|0425115801|     0|
| 276746|0449006522|     0|
| 276746|0553561618|     0|
| 276746|055356451X|     0|
| 276746|0786013990|     0|
| 276746|0786014512|     0|
| 276747|0060517794|     9|
| 276747|0451192001|     0|
| 276747|0609801279|     0|
| 276747|0671537458|     9|
+-------+----------+------+
only showing top 20 rows



In [None]:
# Inspect the column types of the raw ratings as read from the CSV.
user_ratings_df.printSchema()

root
 |-- User-ID: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Rating: string (nullable = true)



In [None]:
# User-ID, ISBN and Rating are read as strings; cast each of them to an integer.
# NOTE(review): the integer cast produces null for any ISBN that is not a valid
# 32-bit integer (e.g. "034545104X" or 3257224281), and na.drop() then removes
# every row containing a null, so those ratings are silently discarded.
ratings_df = user_ratings_df
for column_name in ("User-ID", "ISBN", "Rating"):
    ratings_df = ratings_df.withColumn(
        column_name, user_ratings_df[column_name].cast(IntegerType())
    )
ratings_df = ratings_df.na.drop()
ratings_df.show()

+-------+----------+------+
|User-ID|      ISBN|Rating|
+-------+----------+------+
| 276726| 155061224|     5|
| 276727| 446520802|     0|
| 276729| 521795028|     6|
| 276733|2080674722|     0|
| 276737| 600570967|     6|
| 276745| 342310538|    10|
| 276746| 425115801|     0|
| 276746| 449006522|     0|
| 276746| 553561618|     0|
| 276746| 786013990|     0|
| 276746| 786014512|     0|
| 276747|  60517794|     9|
| 276747| 451192001|     0|
| 276747| 609801279|     0|
| 276747| 671537458|     9|
| 276747| 679776818|     8|
| 276747| 943066433|     7|
| 276747|1570231028|     0|
| 276747|1885408226|     7|
| 276748| 747558167|     6|
+-------+----------+------+
only showing top 20 rows



In [None]:
# Confirm that all three columns are now integers after the cast.
ratings_df.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- ISBN: integer (nullable = true)
 |-- Rating: integer (nullable = true)



In [None]:
# Load the user table (semicolon-delimited CSV with a header row).
users_df = (
    spark.read
    .options(delimiter=";", header="true")
    .csv('/content/Users.csv')
)

In [None]:
# Configure the ALS collaborative-filtering estimator.
# coldStartStrategy="drop" removes rows whose prediction would be NaN;
# a fixed seed makes the randomly initialised factors reproducible between runs.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="User-ID",
    itemCol="ISBN",
    ratingCol="Rating",
    coldStartStrategy="drop",
    seed=1234,
)
# Fit the model on the complete ratings table; this model drives the recommendations below.
model = als.fit(ratings_df)

# RegressionEvaluator

In [None]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Split the ratings 80/20 into training and test sets; the seed keeps the split repeatable.
train, test = ratings_df.randomSplit(weights=[0.8, 0.2], seed=1234)

In [None]:
# Fit the same ALS configuration on the training split only, for evaluation on the test split.
best_model = als.fit(dataset=train)

In [None]:
# Score the held-out test split; transform() adds a "prediction" column.
test_predictions = best_model.transform(dataset=test)

In [None]:
# Compare a sample of actual ratings with the model's predictions (first 20 rows).
test_predictions.show()

+-------+---------+------+-----------+
|User-ID|     ISBN|Rating| prediction|
+-------+---------+------+-----------+
|   8086|684868865|     8| -4.6288714|
|  12799|449221393|     0|        0.0|
|  15957|  6177492|     0|        0.0|
|  15957| 60929596|    10|-0.79417866|
|  15957| 61007161|     5|  2.3564427|
|  15957|312983387|     0|   0.874372|
|  15957|345331001|     0| -0.1596441|
|  15957|375700757|     0|  2.5452433|
|  15957|385335482|     9| -0.6146819|
|  15957|440184886|     0|        0.0|
|  15957|446673544|     9|  2.5453572|
|  15957|450411435|     0|  3.2633176|
|  15957|451167716|     0|  2.6793787|
|  15957|451190548|     8| -3.0437832|
|  15957|451204530|     0|  3.0421953|
|  15957|553571737|     0|  2.1254148|
|  15957|679724362|     0|  1.2744019|
|  15957|679760377|     0|  1.8253385|
|  15957|743444329|     0|  3.2795553|
|  15957|804109052|     0|  1.1500984|
+-------+---------+------+-----------+
only showing top 20 rows



In [None]:
# Root-mean-square error between the true "Rating" and the model's "prediction".
evaluator = RegressionEvaluator(
    labelCol="Rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
# Compute and report the RMSE on the test-set predictions.
RMSE = evaluator.evaluate(dataset=test_predictions)
print(RMSE)

6.131161917726394


# recommendForUserSubset

In [None]:
# Show what user 17 has already rated, enriched with title/author/year from the catalogue.
ratings = ratings_df.where(ratings_df['User-ID'] == 17)
(
    books_df
    .join(ratings, ratings.ISBN == books_df.ISBN)
    .select(col('User-ID'), col('Title'), col('Author'), col('Year'), col('Rating'))
    .show()
)

+-------+--------------------+-----------------+----+------+
|User-ID|               Title|           Author|Year|Rating|
+-------+--------------------+-----------------+----+------+
|     17|OUT OF THE SILENT...|       C.S. Lewis|1996|     0|
|     17|Prelude to Founda...|     ISAAC ASIMOV|1989|     0|
|     17|             Prophet| Frank E. Peretti|1992|     3|
|     17|     Winter Solstice|Rosamunde Pilcher|2001|     0|
|     17| Death in the Clouds|  Agatha Christie|1997|     7|
|     17|Piercing the Dark...| Frank E. Peretti|1989|     6|
|     17|Bant/Spec.Last of...|    Louis L'Amour|1987|     5|
+-------+--------------------+-----------------+----+------+



In [None]:
user_id = [[17]]
num_rec = 10
# recommendForUserSubset takes a DataFrame of user ids, so wrap the id in one
# whose column name matches the model's user column.
functiondf = sc.parallelize(user_id).toDF(['User-ID'])
recommendations = model.recommendForUserSubset(functiondf, num_rec)
# Collect exactly once: every collect() launches a Spark job, so calling it per
# list element would recompute the recommendations num_rec times.
recommendation_rows = recommendations.collect()
# Keep only the ISBN of each recommendation. Iterate over what was actually
# returned: a user unknown to the model yields no row, and fewer than num_rec
# items may come back, either of which would break fixed-range indexing.
recommended_ISBN = (
    [rec['ISBN'] for rec in recommendation_rows[0]['recommendations']]
    if recommendation_rows
    else []
)
recommended_ISBN



[505525526,
 394800206,
 836218787,
 393050939,
 140012486,
 375815260,
 1888054557,
 1400032806,
 374403589,
 505525178]

In [None]:
# Wrap the recommended ISBNs in a single-column DataFrame (column "value") so they
# can be joined against the catalogue to look up title, author and year.
rec_df = spark.createDataFrame(recommended_ISBN, IntegerType())
print('Top book recommendations for User-ID ',user_id[0][0], 'are:')
(
    books_df
    .join(rec_df, rec_df.value == books_df.ISBN)
    .select(col('Title'), col('Author'), col('Year'), col('ISBN'))
    .show()
)

Top book recommendations for User-ID  17 are:
+--------------------+--------------------+----+----------+
|               Title|              Author|Year|      ISBN|
+--------------------+--------------------+----+----------+
|Single White Vampire|        Lynsay Sands|2003|0505525526|
|Go, Dog, Go (I Ca...|   Philip D. Eastman|1961|0394800206|
|Scientific Progre...|      Bill Watterson|1991|0836218787|
|Stiff: The Curiou...|          Mary Roach|2003|0393050939|
|Charlie and the C...|          ROALD DAHL|2001|0375815260|
|Postmarked Yester...|Pamela E. Apkaria...|2001|1888054557|
|Under the Banner ...|        JON KRAKAUER|2004|1400032806|
|    Improper English|    Katie Macalister|2003|0505525178|
+--------------------+--------------------+----+----------+



# recommendForAllItems

In [None]:
# For every book known to the model, find the 5 users with the highest predicted rating.
recommendations2 = model.recommendForAllItems(numUsers=5)



In [None]:
# Preview the first 10 items with their nested (user, rating) recommendation arrays.
recommendations2.show(10)

+------+--------------------+
|  ISBN|     recommendations|
+------+--------------------+
|  1400|[{88187, 73.9973}...|
|  1404|[{96989, 79.42860...|
|  1460|[{33875, 25.42917...|
| 14044|[{10, 0.0}, {20, ...|
| 53416|[{70878, 36.47485...|
| 70854|[{88187, 29.24179...|
|200092|[{10, 0.0}, {20, ...|
|205138|[{10, 0.0}, {20, ...|
|222318|[{10, 0.0}, {20, ...|
|225755|[{10, 0.0}, {20, ...|
+------+--------------------+
only showing top 10 rows



In [None]:
# Flatten the per-item recommendation arrays into one row per (ISBN, user) pair.
# The result is stored back under the same name, so guard on the nested column:
# without the guard, running this cell a second time fails because the
# "recommendations" column no longer exists.
if "recommendations" in recommendations2.columns:
    recommendations2 = (
        recommendations2
        .withColumn("rec_exp", explode("recommendations"))
        .select('ISBN', col("rec_exp.User-ID"), col("rec_exp.Rating"))
    )
recommendations2.limit(10).show()

+----+-------+---------+
|ISBN|User-ID|   Rating|
+----+-------+---------+
|1400|  88187|  73.9973|
|1400|  77307| 52.19913|
|1400|  17065|51.761963|
|1400|  91875| 51.44953|
|1400|  53408| 47.90209|
|1404|  96989|79.428604|
|1404|  81484|70.117386|
|1404|  81311| 64.43918|
|1404|  98574|58.447224|
|1404| 104051|53.849194|
+----+-------+---------+



In [None]:
# Attach user attributes to the flattened recommendations and keep only the rows
# for a single book.
(
    recommendations2
    .join(users_df, on='User-ID')
    .where('ISBN = 0316779059')
    .show()
)

+-------+---------+----------+---+
|User-ID|     ISBN|    Rating|Age|
+-------+---------+----------+---+
|  60255|316779059| 133.16617| 33|
|  83166|316779059|127.142944| 44|
|  47465|316779059| 107.51777| 23|
|  36286|316779059| 101.76853| 21|
|  36677|316779059| 100.30067| 28|
+-------+---------+----------+---+



# recommendForItemSubset

In [None]:
# Look up the catalogue entry for the book of interest (despite its name, `ratings1`
# holds rows of books_df, not ratings).
ratings1 = books_df.where(books_df['ISBN'] == 316779059)

In [None]:
# Display the catalogue row selected for this ISBN.
ratings1.show()

+----------+--------------------+------------+----+-------------+
|      ISBN|               Title|      Author|Year|    Publisher|
+----------+--------------------+------------+----+-------------+
|0316779059|The Baby Book: Ev...|Martha Sears|1993|Little, Brown|
+----------+--------------------+------------+----+-------------+



In [None]:
# Item id and number of users to recommend for it.
ID1 = [[316779059]]
num_rec1 = 10
# recommendForItemSubset takes a DataFrame of item ids, so wrap the id in one
# whose column is named "ISBN".
functiondf1 = sc.parallelize(ID1).toDF(schema=['ISBN'])

In [None]:
# Check the one-row DataFrame of item ids that is passed to recommendForItemSubset.
functiondf1.show()

+---------+
|     ISBN|
+---------+
|316779059|
+---------+



In [None]:
# Top `num_rec1` users for the selected book; collect() brings the result to the driver for display.
recommendations3 = model.recommendForItemSubset(dataset=functiondf1, numUsers=num_rec1)
recommendations3.collect()



[Row(ISBN=316779059, recommendations=[Row(User-ID=60255, rating=133.16616821289062), Row(User-ID=83166, rating=127.1429443359375), Row(User-ID=47465, rating=107.51776885986328), Row(User-ID=36286, rating=101.7685317993164), Row(User-ID=36677, rating=100.30066680908203), Row(User-ID=50110, rating=94.44934844970703), Row(User-ID=96019, rating=92.83719635009766), Row(User-ID=83971, rating=92.48200988769531), Row(User-ID=27317, rating=91.5135726928711), Row(User-ID=31250, rating=90.21512603759766)])]

In [None]:
# Extract the ids of the users recommended for the book.
# collect() is called a single time so the Spark job runs once rather than once
# per list element, and the list is built from the rows actually returned so an
# unknown item or a short result does not raise IndexError.
# NOTE: despite its name this list holds User-IDs; the name is kept because later cells read it.
item_rec_rows = recommendations3.collect()
recommended_clothing1 = (
    [rec['User-ID'] for rec in item_rec_rows[0]['recommendations']]
    if item_rec_rows
    else []
)
recommended_clothing1

[60255, 83166, 47465, 36286, 36677, 50110, 96019, 83971, 27317, 31250]

In [None]:
# Turn the list of recommended user ids into a single-column DataFrame (column "value")
# so it can be joined with the user table.
rec_df1 = spark.createDataFrame(data=recommended_clothing1, schema=IntegerType())

In [None]:
# Display the recommended user ids.
rec_df1.show()

+-----+
|value|
+-----+
|60255|
|83166|
|47465|
|36286|
|36677|
|50110|
|96019|
|83971|
|27317|
|31250|
+-----+



In [None]:
# Read the user table again under a separate name (same file as users_df) so its
# id column can be renamed without touching users_df.
User_df = (
    spark.read
    .options(delimiter=";", header="true")
    .csv('/content/Users.csv')
)

In [None]:
# Preview the user table (first 20 rows).
User_df.show()

+-------+----+
|User-ID| Age|
+-------+----+
|      1|null|
|      2|  18|
|      3|null|
|      4|  17|
|      5|null|
|      6|  61|
|      7|null|
|      8|null|
|      9|null|
|     10|  26|
|     11|  14|
|     12|null|
|     13|  26|
|     14|null|
|     15|null|
|     16|null|
|     17|null|
|     18|  25|
|     19|  14|
|     20|  19|
+-------+----+
only showing top 20 rows



In [None]:
# Replace the hyphen in the id column name so it can be accessed as an attribute (User_df.User_ID).
User_df = User_df.withColumnRenamed(existing="User-ID", new="User_ID")

In [None]:
# Match each recommended user id with that user's row in the user table.
df3 = rec_df1.join(User_df, on=rec_df1["value"] == User_df["User_ID"])

In [None]:
# Display the recommended users joined with their user-table attributes.
df3.show()

+-----+-------+---+
|value|User_ID|Age|
+-----+-------+---+
|60255|  60255| 33|
|83166|  83166| 44|
|47465|  47465| 23|
|36286|  36286| 21|
|36677|  36677| 28|
|50110|  50110| 60|
|96019|  96019| 25|
|83971|  83971| 59|
|27317|  27317| 25|
|31250|  31250| 34|
+-----+-------+---+



In [None]:
# Drop the duplicate "value" key and show only the user id and age.
df3.select('User_ID', 'Age').show()

+-------+---+
|User_ID|Age|
+-------+---+
|  60255| 33|
|  83166| 44|
|  47465| 23|
|  36286| 21|
|  36677| 28|
|  50110| 60|
|  96019| 25|
|  83971| 59|
|  27317| 25|
|  31250| 34|
+-------+---+



# recommendForAllUsers

In [None]:
# For every user known to the model, find the 5 books with the highest predicted rating.
recommendations1 = model.recommendForAllUsers(numItems=5)



In [None]:
# Preview the first 5 users with their nested (ISBN, rating) recommendation arrays.
recommendations1.show(5)

+-------+--------------------+
|User-ID|     recommendations|
+-------+--------------------+
|     12|[{394871804, 16.2...|
|     16|[{316358436, 34.4...|
|     26|[{1570820538, 37....|
|     44|[{312187106, 14.6...|
|     53|[{394800206, 17.9...|
+-------+--------------------+
only showing top 5 rows



In [None]:
# Flatten the per-user recommendation arrays into one row per (user, ISBN) pair
# and preview the first 10 rows.
nrecommendations = (
    recommendations1
    .withColumn("rec_exp", explode("recommendations"))
    .select('User-ID', col("rec_exp.ISBN"), col("rec_exp.Rating"))
)

nrecommendations.limit(10).show()

+-------+----------+---------+
|User-ID|      ISBN|   Rating|
+-------+----------+---------+
|     12| 394871804|16.202433|
|     12|  70064547| 14.79994|
|     12| 425133656|14.303633|
|     12|1570820872|14.258346|
|     12| 140431357| 13.88073|
|     16| 316358436|34.493313|
|     16| 670835382|27.750908|
|     16| 316779059|27.248983|
|     16| 140258418|25.719477|
|     16|1570820538|  25.5685|
+-------+----------+---------+



In [None]:
# Replace the hyphen in the id column name so it can be used in a SQL filter expression.
nrecommendations = nrecommendations.withColumnRenamed(existing="User-ID", new="User_ID")

In [None]:
# books_df has no "User-ID" column (its columns are ISBN, Title, Author, Year and
# Publisher), so there is nothing to rename before joining it on ISBN.

In [None]:
# Attach catalogue details to the flattened recommendations and show those for user 12.
(
    nrecommendations
    .join(books_df, on='ISBN')
    .where('User_ID = 12')
    .show()
)

+----------+-------+---------+--------------------+---------------+----+--------------------+
|      ISBN|User_ID|   Rating|               Title|         Author|Year|           Publisher|
+----------+-------+---------+--------------------+---------------+----+--------------------+
| 394871804|     12|16.202433|The Berenstain Be...|Stan Berenstain|1985|Random House Chil...|
|  70064547|     12| 14.79994|Motherhood: The S...|   Erma Bombeck|1983|         McGraw-Hill|
| 425133656|     12|14.303633|       Deep Thoughts|    Jack Handey|1994|Berkley Publishin...|
|1570820872|     12|14.258346|Disney's the Lion...|   Don Ferguson|1994|Random House Chil...|
| 140431357|     12| 13.88073|Tess of the D'Urb...|   Thomas Hardy|1978|       Penguin Books|
+----------+-------+---------+--------------------+---------------+----+--------------------+

