In [40]:
# code reference: 
# https://github.com/KevinLiao159/MyDataSciencePortfolio/blob/master/movie_recommender/movie_recommendation_using_ALS.ipynb
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
# from pyspark.mllib.recommendation import ALS
from pyspark.ml.recommendation import ALS

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [2]:
# spark config
spark = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

In [8]:
valid = spark.read.csv('valid.csv', header = True)

In [10]:
valid.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- is_read: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_text_incomplete: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- date_updated: string (nullable = true)
 |-- read_at: string (nullable = true)
 |-- started_at: string (nullable = true)



In [13]:
valid.show(3)

+-------+--------------------+-------+--------------------+-------+------+----------------------+--------------------+--------------------+-------+----------+
|    _c0|             user_id|book_id|           review_id|is_read|rating|review_text_incomplete|          date_added|        date_updated|read_at|started_at|
+-------+--------------------+-------+--------------------+-------+------+----------------------+--------------------+--------------------+-------+----------+
|2505429|a97d9da59ea6ecb0a...| 590170|dca50f489d1152c13...|   True|     5|                  null|Tue Nov 27 09:22:...|Tue Nov 27 09:25:...|   null|      null|
|2505430|a97d9da59ea6ecb0a...| 203838|fb5950f550a8a0169...|   True|     3|                  null|Tue Nov 27 09:16:...|Tue Nov 27 19:10:...|   null|      null|
|2505431|a97d9da59ea6ecb0a...| 973197|6177f7b27372a37e9...|   True|     3|                  null|Tue Nov 27 09:13:...|Tue Nov 27 19:04:...|   null|      null|
+-------+--------------------+-------+--------

In [14]:
# only use the columns we need

nd = valid.select(valid['user_id'], valid['book_id'], valid['rating'])
nd.show()

+--------------------+--------+------+
|             user_id| book_id|rating|
+--------------------+--------+------+
|a97d9da59ea6ecb0a...|  590170|     5|
|a97d9da59ea6ecb0a...|  203838|     3|
|a97d9da59ea6ecb0a...|  973197|     3|
|a97d9da59ea6ecb0a...| 6370459|     0|
|a97d9da59ea6ecb0a...|  239231|     4|
|a97d9da59ea6ecb0a...| 5461604|     3|
|a97d9da59ea6ecb0a...|  112138|     4|
|a97d9da59ea6ecb0a...|16041612|     0|
|a97d9da59ea6ecb0a...|   13273|     4|
|a97d9da59ea6ecb0a...|  201217|     4|
|a97d9da59ea6ecb0a...|  112166|     5|
|a97d9da59ea6ecb0a...|  732562|     5|
|7169ffbd13efe9b46...|  395090|     0|
|7169ffbd13efe9b46...|  144611|     0|
|7169ffbd13efe9b46...|  102962|     0|
|7169ffbd13efe9b46...|   19351|     0|
|7169ffbd13efe9b46...| 1639333|     0|
|e756a6179fb257a7f...|  478992|     5|
|e756a6179fb257a7f...|  289302|     5|
|e756a6179fb257a7f...|   15997|     4|
+--------------------+--------+------+
only showing top 20 rows



In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [16]:
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(nd.columns)-set(['rating'])) ]

In [17]:
# Converting String to index

pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(nd).transform(nd)
transformed.show()

+--------------------+--------+------+-------------+-------------+
|             user_id| book_id|rating|book_id_index|user_id_index|
+--------------------+--------+------+-------------+-------------+
|a97d9da59ea6ecb0a...|  590170|     5|      15293.0|       4093.0|
|a97d9da59ea6ecb0a...|  203838|     3|        580.0|       4093.0|
|a97d9da59ea6ecb0a...|  973197|     3|       3929.0|       4093.0|
|a97d9da59ea6ecb0a...| 6370459|     0|       5614.0|       4093.0|
|a97d9da59ea6ecb0a...|  239231|     4|        571.0|       4093.0|
|a97d9da59ea6ecb0a...| 5461604|     3|      15386.0|       4093.0|
|a97d9da59ea6ecb0a...|  112138|     4|       1375.0|       4093.0|
|a97d9da59ea6ecb0a...|16041612|     0|      15763.0|       4093.0|
|a97d9da59ea6ecb0a...|   13273|     4|       1963.0|       4093.0|
|a97d9da59ea6ecb0a...|  201217|     4|       1087.0|       4093.0|
|a97d9da59ea6ecb0a...|  112166|     5|        228.0|       4093.0|
|a97d9da59ea6ecb0a...|  732562|     5|         28.0|       409

In [79]:
transformed.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- book_id_index: double (nullable = false)
 |-- user_id_index: double (nullable = false)



In [52]:
transformed.createOrReplaceTempView('transformed_view')

In [84]:
haha = transformed.withColumn("rating", transformed.rating.cast("int"))

In [85]:
haha.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- book_id_index: double (nullable = false)
 |-- user_id_index: double (nullable = false)



In [86]:
# will modify this 
(training,test)=haha.randomSplit([0.8, 0.2])

In [87]:
als = ALS(maxIter=5,regParam=0.09,rank=10,userCol="user_id_index",itemCol="book_id_index",ratingCol="rating",coldStartStrategy="drop",nonnegative=True)

In [88]:
model=als.fit(training)

In [90]:
from pyspark.ml.evaluation import RegressionEvaluator

In [92]:
evaluator=RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=1.9144878833427348
+--------------------+-------+------+-------------+-------------+-----------+
|             user_id|book_id|rating|book_id_index|user_id_index| prediction|
+--------------------+-------+------+-------------+-------------+-----------+
|810347650d59d9a51...|9658269|     0|        148.0|        255.0| 0.45312956|
|e2fd31881738bd252...|9658269|     0|        148.0|       4119.0| 0.22569682|
|2ae6c1f350316c7b1...|9658269|     4|        148.0|       8423.0|   2.258429|
|90aee9270f3fbd507...|9658269|     0|        148.0|       3166.0|  4.6350756|
|e9b3e911bcc0320bf...|9658269|     0|        148.0|       5040.0|        0.0|
|cb322ac52fedb7061...|9658269|     0|        148.0|        844.0|  0.2538706|
|db80b1fc0f7379205...|9658269|     0|        148.0|        233.0|  0.4696538|
|0b8f2c04b63bdeec6...|9658269|     0|        148.0|        603.0|  1.7773238|
|d4d8c8af7d8305bc4...|9658269|     4|        148.0|       1008.0|  2.4211395|
|13d99bee264960246...|9658269|     0|   

In [96]:
# the number of users having less than 10 ratings
less_rating = spark.sql('SELECT COUNT(*), user_id FROM transformed_view GROUP BY user_id HAVING COUNT(*) < 10')

In [99]:
less_rating.createOrReplaceTempView('less_rating')

In [100]:
counts = spark.sql('SELECT COUNT(*) AS count FROM less_rating')

In [101]:
counts.show()

+-----+
|count|
+-----+
| 6886|
+-----+



In [102]:
# the total users
counts = spark.sql('SELECT COUNT(*) AS Total FROM transformed_view')

In [103]:
counts.show()

+------+
| Total|
+------+
|192492|
+------+

