In [0]:
%pyspark

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkConf

conf = SparkConf()
conf.setMaster('spark://192.168.56.101:7077')
conf.setAppName('recommend')

sc = SparkContext.getOrCreate(conf=conf)
#sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

spark = SparkSession(sc)

data = spark.read.format("csv").option("header", "true").option("encoding", "UTF-8").load("hdfs://hadoop-master:9000/dataset/dataset1/ratings.csv")
data.printSchema()

In [1]:
%sql
-- Register the ratings CSV as a typed, file-backed table.
-- IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE fails once `ratings` exists.
-- NOTE(review): ts presumably holds epoch seconds; `int` overflows after 2038 - confirm.
CREATE TABLE IF NOT EXISTS ratings (
    userId int,
    movieId int,
    rating double,
    ts int
)
USING CSV
OPTIONS (path "hdfs://hadoop-master:9000/dataset/dataset1/ratings.csv", header="true")

In [2]:
%pyspark
data = spark.table("ratings")
data.show()

In [3]:
%pyspark
#Count null value
from pyspark.sql.functions import col,sum
data.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in data.columns)).show()

In [4]:
%pyspark
#Count Null value
from pyspark.sql.functions import lit, col

rows = data.count()
summary = data.describe().filter(col("summary") == "count")
summary.select(*((lit(rows)-col(c)).alias(c) for c in data.columns)).show()

In [5]:
%pyspark
print('No. of row: %d' % data.count())
data.show(5)

In [6]:
%pyspark
# count, mean, std, min & max
data.describe().show()

In [7]:
%pyspark
#Split dataset to train and test
train_data, test_data = data.randomSplit([0.8, 0.2])

In [8]:
%pyspark
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [9]:
%pyspark
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=10, regParam=0.1, rank=8, nonnegative=True, coldStartStrategy="drop",\
          userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(train_data)

In [10]:
%pyspark
print('Factorized user matrix with rank = %d' % model.rank)
model.userFactors.show(5)

print('-'*50)

print('Factorized item matrix with rank = %d' % model.rank)
model.itemFactors.show(5)

In [11]:
%pyspark
print('Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:')
model.recommendForAllItems(1).show(5)

print('-'*50)

print('Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:')
model.recommendForAllUsers(1).show(5)

In [12]:
%pyspark
model.save('hdfs://hadoop-master:9000/model')

In [13]:
%pyspark
#Let see how the model perform
predictions = model.transform(test_data)
predictions.show()

In [14]:
%pyspark
predictions.printSchema()


In [15]:
%pyspark
# check the root mean squared error
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='rating')
rmse = evaluator.evaluate(predictions)
print('Root mean squared error of the test_data: %.4f' % rmse)

In [16]:
%pyspark
# see historical rating of the user
user_history = train_data.filter(train_data['userId']==11)
user_history.show()

In [17]:
%pyspark
# a list of movies we are thinking to offer
user_suggest = test_data.filter(train_data['userId']==11).select(['movieId', 'userId'])
user_suggest.show()

In [18]:
%pyspark
# offer movies with a high predicted rating
user_offer = model.transform(user_suggest)
user_offer.orderBy('prediction', ascending=False).show()