In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
spark = SparkSession.builder.\
            master("local").\
            appName("amzone").\
            enableHiveSupport().\
            getOrCreate()
sc = spark.sparkContext
inputPath="data/sample.json"
# rawdata.select("reviewerID","asin","reviewerName","overall","reviewTime").show()
# inputPath = "data/reviews_Books_5.json"
rawdata = spark.read.json(inputPath)
ratingDf = rawdata.select(rawdata.reviewerID.alias("user"),\
               rawdata.asin.alias("item"),\
               rawdata.overall.alias("rating")\
               )
ratingDf.registerTempTable("Ratings")
sqlString="with t1 as (select count(distinct(user)) as users from Ratings),\
           t2 as (select count(distinct(item)) as items from Ratings), \
           t3 as (select count(distinct(rating)) as ratings from Ratings) \
            select * from t1 cross join t2 cross join t3"
# rawdata.select("reviewerID").distinct().count()
resultDf = spark.sql(sqlString)
resultDf.show()
trainingData,cvData,testData = ratingDf.randomSplit([0.6,0.2,0.2])
# resultDf.show()
from pyspark.ml.pipeline import Pipeline,PipelineModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
userIndexer = StringIndexer(inputCol="user",outputCol="userid",handleInvalid="skip").fit(ratingDf)
itemIndexer = StringIndexer(inputCol="item",outputCol="itemid",handleInvalid="skip").fit(ratingDf)
alsModel = ALS(rank=5,userCol="userid",itemCol="itemid",ratingCol="rating",maxIter=5,regParam=0.0001)
pipeAls = Pipeline(stages=[userIndexer,itemIndexer,alsModel])
pipeAlsModel = pipeAls.fit(trainingData)
predictionAls = pipeAlsModel.transform(trainingData)
predictionAls.sort(asc("userid")).limit(10).show()
prediction = pipeAlsModel.transform(testData)
prediction.sort(asc("userid")).limit(10).show()
evaluator = RegressionEvaluator()\
      .setMetricName("rmse")\
      .setLabelCol("rating")\
      .setPredictionCol("prediction")
rmse = evaluator.evaluate(prediction.select('userid','itemid','rating','prediction').na.drop())
print "RMSE:",rmse
sc.stop()
spark.stop()

+-----+-----+-------+
|users|items|ratings|
+-----+-----+-------+
|  100|    1|      5|
+-----+-----+-------+

+--------------------+----------+------+------+------+----------+
|                user|      item|rating|userid|itemid|prediction|
+--------------------+----------+------+------+------+----------+
|       A1FQPOYRBTTK1|000100039X|   5.0|   0.0|   0.0|  4.999921|
|A10000012B7CGYKOM...|000100039X|   5.0|   2.0|   0.0|  4.999921|
|      A1NPNGWBVD9AK3|000100039X|   5.0|   3.0|   0.0|  4.999921|
|      A2BQZRA2P81BQG|000100039X|   2.0|   6.0|   0.0|  1.999968|
|       AMRZ5G7HF7I03|000100039X|   5.0|   9.0|   0.0|  4.999921|
|      A19N3FCQCLJYUA|000100039X|   5.0|  10.0|   0.0|  4.999921|
|      A2X6GEC6LCDN4S|000100039X|   5.0|  11.0|   0.0|  4.999921|
|      A1TT4CY55WLHAR|000100039X|   5.0|  12.0|   0.0|  4.999921|
|      A3H65DAAV98C8F|000100039X|   5.0|  13.0|   0.0|  4.999921|
|      A3FI0744PG1WYG|000100039X|   5.0|  14.0|   0.0|  4.999921|
+--------------------+---------

NameError: name 'predictionRecommendation' is not defined

In [23]:
prediction.registerTempTable('t1')
prediction.select('userid','itemid','prediction').na.drop().show()

+------+------+----------+
|userid|itemid|prediction|
+------+------+----------+
+------+------+----------+



In [3]:
%matplotlib inline
#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = "yan bin"
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.pipeline import Pipeline,PipelineModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import *
from pyspark.sql.functions import *  
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator
from pandas import *
import matplotlib as mpl
import time
from numpy import arange,logspace
import sys
mpl.use("Agg")
import matplotlib.pyplot as plt

def myInt(x):
    try:
        return int(x)
    except:
        return np.nan
    
class spark:
    __spark = 0
    __sc = 0


    def __init__(self, master="local", appName="sparkDemo"):
        conf = (SparkConf().setAppName(appName)
                .setMaster(master))
        self.__spark = (SparkSession.builder
                .config(conf=conf)
                .enableHiveSupport()
                .getOrCreate())
        self.__sc = self.__spark.sparkContext


    def getContext(self):
        return (self.__spark, self.__sc)


    def getJson(self, inputPath):
        spark, sc = self.getContext()
        rawData = spark.read.json(inputPath)
        ratingDf = (rawData.select(rawData.reviewerID.alias("user"),
                                   rawData.asin.alias("item"),
                                   rawData.overall.alias("rating")))
        return ratingDf
    
    def getText(self, inputPath):
        spark, sc = self.getContext()
        rawData = sc.textFile(inputPath)
        rawData = (rawData.map(lambda x:x.split("\t"))
                            .map(lambda x:(myInt(x[0]),myInt(x[1]),myInt(x[2]),myInt(x[3]))))
        ratingDf =spark.createDataFrame(rawData,("user","item","rating","timestamp"))
        return ratingDf.na.drop() 
                 

    def drawPic(self,df,picSavePath):
        df.registerTempTable('df')
        sqlString ='''with t1 as (select itemid,count(itemid) as itemcnt from df group by itemid),
            t2 as (select userid,count(userid) as usercnt from df group by userid),
            t3 as (select userid,df.itemid,itemcnt from df,t1 
            where t1.itemid=df.itemid sort by userid desc),
            t4 as (select userid,sum(itemcnt)/count(itemcnt) as avgitem from t3 group by userid) 
            select t2.usercnt,t4.avgitem from t4 left join t2 on t2.userid=t4.userid'''
        pDf = spark.sql(sqlString).toPandas()
        pDf.plot(x='usercnt',y='avgitem',kind='scatter')
        plt.axis([0,5000,0,5000])
        plt.savefig(picSavePath)
        print "pic saved in %s" %(picSavePath)
        
    def drawPic2(self,df,picSavePath):
        df.registerTempTable('df')
        sqlString ="with t1 as (select itemid,count(itemid) as itemcnt from df group by itemid) select itemcnt from t1 sort by itemcnt desc"
        pDf = spark.sql(sqlString).toPandas()
        pDf.plot(y='avgitem',kind='scatter')
        plt.axis([0,5000,0,5000])
        plt.savefig(picSavePath)
        print "pic saved in %s" %(picSavePath)
    
    def indexer(self,df):
        userIndexer = StringIndexer(inputCol="user",outputCol="userid",handleInvalid="error").fit(df)
        itemIndexer = StringIndexer(inputCol="item",outputCol="itemid").fit(df)
        pipeAls = Pipeline(stages=[userIndexer,itemIndexer])
        pipeAlsModel = pipeAls.fit(df)
        return pipeAlsModel.transform(df)
    
    def onceAls(self,reg,inputDf,outputDf,rank=5,maxIter=5):
        als = ALS(rank=rank,userCol="userid",itemCol="itemid",ratingCol="rating",maxIter=maxIter,regParam=reg)
        alsModel = als.fit(inputDf)
        predictions =alsModel.transform(outputDf)
        evaluator = (RegressionEvaluator().setMetricName("rmse")
                    .setLabelCol("rating").setPredictionCol("prediction"))
        predictions = predictions.select('userid','itemid','rating','prediction').na.drop()
        rmse = evaluator.evaluate(predictions)
        return alsModel,rmse
    
    def regOpt(self,df,regParam=[0.0001],ranks=[5],maxIters=[5]):
        trainingData,cvData,testData = ratingDf.randomSplit(seed=40,weights=[0.6,0.2,0.2])
        trainingData.cache()
        rsme = sys.maxint
        best = [0,0,0]
        t=[]
        for i in range(len(regParam)):
            for j in range(len(maxIters)):
                for n in range(len(ranks)):
                    start = time.time()
                    tmprsme = self.onceAls(regParam[i],trainingData,cvData,maxIter=maxIters[j],rank=ranks[n])[1]
                    if tmprsme < rsme:
                        rsme = tmprsme
                        best = [i,j,n]
                    end = time.time()
                    t.append(end -start)
                    print "The reg:%f,the rsmes:%f,the maxIter:%d,the ranks:%d, time: %f s" \
                    %(regParam[i],rsme,ranks[n],maxIters[j],t[i])
        bestReg = regParam[best[0]]
        bestRank = ranks[best[1]]
        bestMaxIter = maxIters[best[2]]
        print "The best regParam is %f,the cv rsmes is %f,avg time:%f s" %(bestReg,rsme,__builtin__.sum(t)/len(t))
        print "The best Rank is %d,the best maxIter is %d" %(bestRank,bestMaxIter)
        alsModel,rmse = self.onceAls(bestReg,trainingData,testData)
        alsModel.transform(testData).limit(10).show()
        print "The als model rmse is %f" %rmse
        return alsModel
    
    def distince(self,df):
        ratingDf.registerTempTable("Ratings")
        sqlString="""with t1 as (select count(distinct(user)) as users from Ratings),
           t2 as (select count(distinct(item)) as items from Ratings), 
           t3 as (select count(distinct(rating)) as ratings from Ratings) 
           select * from t1 cross join t2 cross join t3"""
        self.__spark.sql(sqlString).show()
        

    def trainValidation(self,df):
        train,test = df.randomSplit([8.0,2.0],40)
        userIndexer = StringIndexer(inputCol="user",outputCol="userid",handleInvalid="skip").fit(df)
        itemIndexer = StringIndexer(inputCol="item",outputCol="itemid",handleInvalid="skip").fit(df)
        als = ALS(rank=5,userCol="userid",
                       itemCol="itemid",ratingCol="rating",maxIter=5,regParam=0.0001)
        pipeAls = Pipeline(stages=[userIndexer,itemIndexer,als])
        evaluator = RegressionEvaluator()\
              .setMetricName("rmse")\
              .setLabelCol("rating")\
              .setPredictionCol("prediction")
        paramGrid = ParamGridBuilder() \
            .addGrid(als.rank,[10]) \
            .addGrid(als.maxIter,[10]) \
            .addGrid(als.regParam,logspace(0,4,5,base=3)*0.0001)\
            .build()
        tvs = TrainValidationSplit(estimator=pipeAls,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=evaluator,
                                   # 80% of the data will be used for training, 20% for validation.
                                   trainRatio=0.8)
        model = tvs.fit(train)
        prediction = model.transform(test)
        prediction.limit(10).show()
        prediction.cache()
        rmse = evaluator.evaluate(prediction.na.drop())
        print "rmse: ",rmse
        return model
    
    def Recommendation(self,model,user):
        prediction = model.transform(userid)
        prediction.registerTempTable('predictions')
        sqlString = "select *,row_number()  OVER(PARTITION BY predictions.userid ORDER BY prediction desc) as flag\
                    from predictions group by userid "
        prediction = self.spark.sql("")
    
    def crossValidation(self,df):
        train,test = df.randomSplit([8.0,2.0],seed=40)
        userIndexer = StringIndexer(inputCol="user",outputCol="userid",handleInvalid="skip").fit(df)
        itemIndexer = StringIndexer(inputCol="item",outputCol="itemid",handleInvalid="skip").fit(df)
        als = ALS(rank=5,userCol="userid",
                       itemCol="itemid",ratingCol="rating",maxIter=5,regParam=0.0001)
        pipeAls = Pipeline(stages=[userIndexer,itemIndexer,als])
        evaluator = (RegressionEvaluator()
              .setMetricName("rmse")
              .setLabelCol("rating")
              .setPredictionCol("prediction"))
        paramGrid = (ParamGridBuilder() 
            .addGrid(als.maxIter,[5]) 
            .addGrid(als.regParam,logspace(0,4,5,base=3)*0.0001)
            .build())
        crossval = (CrossValidator(estimator=pipeAls,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2))  # use 3+ folds in practice

        cvModel = crossval.fit(train)
        cvModel.transform(test).show()
        return cvModel
    
    def Precision(self,model,testDf):
        iDf = testDf.select('item').distinct()
        uDf = testDf.select('user').distinct()
        allTrain=iDf.crossJoin(uDf)
        preDf = model.transform(testDf)
        preDf2 = model.transform(allTrain).select('userid','itemid','prediction').na.drop()
        preDf2.registerTempTable('predictions')
        sqlString = "select userid,itemid,prediction from \
            (select *,row_number() OVER(PARTITION BY predictions.userid ORDER BY prediction desc) as flag\
             from predictions) t1 where t1.flag<=10"
        resultDf = spark.sql(sqlString)
        rc = resultDf.count()
        cond = [resultDf.userid==preDf.userid,preDf.itemid==resultDf.itemid]
        TP=resultDf.join(resultDf,cond,'left_semi').count()*1.0
        P=rc*1.0
        return TP/P
        
        
if __name__ == "__main__":
    start = time.time()
    s = spark(appName="als")
    spark, sc = s.getContext()
    sc.setLogLevel('WARN')
    #   inputPath = "data/sample.json"
#     inputPath = "data/reviews_Books_5.json"
#     ratingDf = s.getJson(inputPath)
    inputPath = "data/ml-10M/ratings.dat"
#     inputPath = "data/ml-100k/u.data"
    ratingDf = s.getText(inputPath)
    model = s.trainValidation(ratingDf)
#     model = s.crossValidation(ratingDf)
    trainingDf,testDf=ratingDf.randomSplit([8.0,2.0],40)
    precise = s.Precision(model,testDf)
#   s.drawPic(ratingDf)
#     regem = logspace(0,0,1,base=3)*0.0001
#     regem = arange(0.5,0.7,0.01)
#     ranks =[5,6,7,8,9,10]
#     maxIters=[5,8,10]
#     alsModel = s.regOpt(ratingDf,regem,ranks,maxIters)
    print "精度：",precise
    end = time.time()
    print "total:%f s" %(end-start)

    


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33065)
Traceback (most recent call last):
  File "/home/yanbin/hadoop-2.6/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:33065)

In [7]:
trainingDf,testDf=ratingDf.randomSplit([8.0,2.0],40)
# trainPre = model.transform(trainingDf)
preDf = model.transform(testDf)


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35293)
Traceback (most recent call last):
  File "/home/yanbin/hadoop-2.6/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:35293)

In [None]:

resultDf.limit(20).show()


In [None]:

end = time.time()
print "total:%f s" %(end-start)

In [120]:
# ratingDf.registerTempTable('df')
# avgDf=spark.sql("select user,avg(rating) as avgrating from df group by user")
# avgDf.registerTempTable('avgDf')
# preDf.registerTempTable('predf')
# preDf2=spark.sql("select predf.user,predf.item,rating,case isNaN(predf.prediction) when True then avgDf.avgrating \
#             else predf.prediction end as prediction from predf left join avgDf on predf.user=avgDf.user")
# print "测试集rmse",evaluator.evaluate(preDf2)
# preDf.cache()
iDf = testDf.select('item').distinct()
uDf = testDf.select('user').distinct()
allTrain=iDf.crossJoin(uDf)
allTrain.cache()
preDf2 = model.transform(allTrain)
preDf2.registerTempTable('predictions')
sqlString = "select user,item,userid,itemid,prediction from \
            (select *,row_number() OVER(PARTITION BY predictions.userid ORDER BY prediction desc) as flag\
             from predictions) t1 where t1.flag<=10"

resultDf = spark.sql(sqlString)
resultDf.show()
print "R(u):",resultDf.count()
print "T(u):",testDf.count()
cond = [resultDf.user==preDf2.user,preDf2.item==resultDf.item]
TP=resultDf.join(trainPre,cond,'left_semi').count()*1.0
P=resultDf.count()*1.0
print TP,P,TP/P
# iDf.count()*\
# uDf.count()

KeyboardInterrupt: 

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49558)
----------------------------------------


Traceback (most recent call last):
  File "/usr/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python2.7/SocketServer.py", line 652, in __init__
    self.handle()
  File "/home/yanbin/hadoop-2.6/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/home/yanbin/hadoop-2.6/spark/python/pyspark/serializers.py", line 557, in read_int
    raise EOFError
EOFError


In [66]:
# trainPre.registerTempTable('trainPre')
# sqlString="select a.* from predf a left outer join trainPre b on a.userid=b.userid or a.itemid=b.itemid\
#            where b.userid is null and b.itemid is null"
# preDf2 = spark.sql(sqlString)


+----+------+------+------+----------+
|user|itemid|userid|itemid|prediction|
+----+------+------+------+----------+
| 391| 601.0| 299.0| 601.0| 4.8130064|
| 391| 288.0| 299.0| 288.0| 4.6604967|
| 391| 484.0| 299.0| 484.0| 4.2861247|
| 391|  46.0| 299.0|  46.0|  4.248642|
| 391| 161.0| 299.0| 161.0| 4.0879326|
| 391| 127.0| 299.0| 127.0| 3.9147713|
| 391|  10.0| 299.0|  10.0| 3.9085507|
| 391| 444.0| 299.0| 444.0| 3.7267966|
| 391|  39.0| 299.0|  39.0|  3.691269|
| 391|  61.0| 299.0|  61.0| 3.6358602|
| 890| 855.0| 305.0| 855.0| 5.3031864|
| 890| 211.0| 305.0| 211.0| 5.1434965|
| 890|1098.0| 305.0|1098.0| 4.5956063|
| 890| 269.0| 305.0| 269.0| 4.5002117|
| 890| 135.0| 305.0| 135.0| 4.4449043|
| 890|   5.0| 305.0|   5.0| 4.4439526|
| 890| 316.0| 305.0| 316.0| 4.3967204|
| 890|  79.0| 305.0|  79.0| 4.3315735|
| 890| 481.0| 305.0| 481.0| 4.1904006|
| 890| 113.0| 305.0| 113.0|  4.111329|
+----+------+------+------+----------+
only showing top 20 rows



In [65]:
def Recommendation(resultDf,user):
    items = resultDf.filter(resultDf.user==user).rdd.map(lambda x:x.itemid).collect()
    return items
print Recommendation(resultDf,'601.0')
# sqlString = "select userid,itemid,prediction from \
#             (select *,row_number() OVER(PARTITION BY predictions.userid ORDER BY prediction desc) as flag\
#             from predictions)\
#             from p"
# resultDf = spark.sql(sqlString)
# resultDf.show()
# spark.sql("select a.* from predf a left outer join trainPre b on a.userid=b.userid and a.itemid=b.itemid\
#            where b.userid is null and b.itemid is null").show()

[74.0, 496.0, 122.0, 67.0, 182.0, 38.0, 99.0, 45.0, 208.0, 299.0]


In [102]:
cond = [preDf.userid==trainPre.userid]
print '测试数据集总量：',preDf.count()
xDf = preDf.join(trainPre,cond,'left_semi')
print "测试数据集去除新加入用户:",xDf.count()
yDf = xDf.join(trainPre,xDf.itemid==trainPre.itemid,'left_semi')
print "测试数据集去除新加入商品:",yDf.count()
yDf.registerTempTable('predictions')
sqlString = "select user,itemid,userid,itemid,prediction from \
            (select *,row_number() OVER(PARTITION BY predictions.userid ORDER BY prediction desc) as flag\
            from predictions) t1 where t1.flag<=10"
resultDf = spark.sql(sqlString)
print "R(u):",resultDf.count()
cond = [preDf.userid==trainPre.userid,preDf.itemid==trainPre.itemid]
TP=resultDf.join(trainPre,cond,'left_semi').count()*1.0
P=resultDf.count()*1.0
print TP,P,TP/P

测试数据集总量： 20153
测试数据集去除新加入用户 20153
测试数据集去除新加入商品 20122
R(u): 7897
0.0 7897.0 0.0


In [97]:
resultDf.filter('userid=299.0').show()
trainPre.filter('userid=299.0').show()


+----+------+------+------+----------+
|user|itemid|userid|itemid|prediction|
+----+------+------+------+----------+
| 391| 601.0| 299.0| 601.0| 4.8130064|
| 391| 288.0| 299.0| 288.0| 4.6604967|
| 391| 484.0| 299.0| 484.0| 4.2861247|
| 391|  46.0| 299.0|  46.0|  4.248642|
| 391| 161.0| 299.0| 161.0| 4.0879326|
| 391| 127.0| 299.0| 127.0| 3.9147713|
| 391|  10.0| 299.0|  10.0| 3.9085507|
| 391| 444.0| 299.0| 444.0| 3.7267966|
| 391|  39.0| 299.0|  39.0|  3.691269|
| 391|  61.0| 299.0|  61.0| 3.6358602|
+----+------+------+------+----------+

+----+----+------+---------+------+------+----------+
|user|item|rating|timestamp|userid|itemid|prediction|
+----+----+------+---------+------+------+----------+
| 391|  58|     4|877398898| 299.0| 148.0| 3.3644426|
| 391| 195|     2|877399618| 299.0|  31.0| 3.9461212|
| 391| 497|     3|877399133| 299.0| 472.0| 3.5334911|
| 391| 197|     5|877399380| 299.0|  78.0| 4.5401964|
| 391|  60|     5|877399746| 299.0| 513.0| 4.5448065|
| 391| 603|     5|877

In [93]:
df1=spark.createDataFrame([['a',1],['b',2],['c',3]],["name","socre"])
df2=spark.createDataFrame([['a',1],['d',2],['e',3]],["name","socre"])
condition=[df1.name==df2.name,df1.socre==df2.socre]
hows='inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti'.split(',')
print "=====df1====="
df1.show()
print "=====df2====="
df2.show()
for how in hows:
    print "====%s===========" %(how)
    df1.join(df2,condition,how=how).show()

=====df1=====
+----+-----+
|name|socre|
+----+-----+
|   a|    1|
|   b|    2|
|   c|    3|
+----+-----+

=====df2=====
+----+-----+
|name|socre|
+----+-----+
|   a|    1|
|   d|    2|
|   e|    3|
+----+-----+

+----+-----+----+-----+
|name|socre|name|socre|
+----+-----+----+-----+
|   a|    1|   a|    1|
+----+-----+----+-----+

+----+-----+----+-----+
|name|socre|name|socre|
+----+-----+----+-----+
|   a|    1|   a|    1|
+----+-----+----+-----+

+----+-----+----+-----+
|name|socre|name|socre|
+----+-----+----+-----+
|   a|    1|   a|    1|
|null| null|   d|    2|
|   c|    3|null| null|
|   b|    2|null| null|
|null| null|   e|    3|
+----+-----+----+-----+

+----+-----+----+-----+
|name|socre|name|socre|
+----+-----+----+-----+
|   a|    1|   a|    1|
|null| null|   d|    2|
|   c|    3|null| null|
|   b|    2|null| null|
|null| null|   e|    3|
+----+-----+----+-----+

+----+-----+----+-----+
|name|socre|name|socre|
+----+-----+----+-----+
|   a|    1|   a|    1|
|null| null|   d

In [3]:
# resultDf.show()
from pyspark.ml.pipeline import Pipeline,PipelineModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import *
from pyspark.sql.functions import *
inputPath="data/sample.json"
# rawdata.select("reviewerID","asin","reviewerName","overall","reviewTime").show()
inputPath = "data/reviews_Books_5.json"
rawdata = spark.read.json(inputPath)
ratingDf = rawdata.select(rawdata.reviewerID.alias("user"),\
               rawdata.asin.alias("item"),\
               rawdata.overall.alias("rating"))
ratingDf = ratingDf.select(ratingDf.user,ratingDf.item,ratingDf.rating.cast(FloatType()))
ratingDf.registerTempTable("Ratings")
sqlString="with t1 as (select count(distinct(user)) as users from Ratings),\
           t2 as (select count(distinct(item)) as items from Ratings), \
           t3 as (select count(distinct(rating)) as ratings from Ratings) \
            select * from t1 cross join t2 cross join t3"
# rawdata.select("reviewerID").distinct().count()
resultDf = spark.sql(sqlString)

resultDf.show()
predictionAls.sort(asc("userid")).limit(10).show()
predictionRecommendation.sort(asc("userid")).limit(10).show()
rmse = evaluator.evaluate(predictionRecommendation.select('rating','prediction').na.drop())
print "RMSE:",rmse

+-----+-----+-------+
|users|items|ratings|
+-----+-----+-------+
|  100|    1|      5|
+-----+-----+-------+

+--------------+----------+------+------+------+----------+
|          user|      item|rating|userid|itemid|prediction|
+--------------+----------+------+------+------+----------+
| A1FQPOYRBTTK1|000100039X|   5.0|   0.0|   0.0|  4.999896|
|A1NPNGWBVD9AK3|000100039X|   5.0|   3.0|   0.0|  4.999896|
|A1MOSTXNIO5MPJ|000100039X|   5.0|   5.0|   0.0|  4.999896|
| AA6C78DHRK962|000100039X|   5.0|   8.0|   0.0|  4.999896|
| AMRZ5G7HF7I03|000100039X|   5.0|   9.0|   0.0|  4.999896|
|A2X6GEC6LCDN4S|000100039X|   5.0|  11.0|   0.0|  4.999896|
|A1TT4CY55WLHAR|000100039X|   5.0|  12.0|   0.0|  4.999896|
|A3H65DAAV98C8F|000100039X|   5.0|  13.0|   0.0|  4.999896|
| AAFLZI7MX9UIG|000100039X|   5.0|  15.0|   0.0|  4.999896|
|A2ZB1G1KUE6OS6|000100039X|   2.0|  19.0|   0.0| 1.9999582|
+--------------+----------+------+------+------+----------+

+--------------------+----------+------+------+-

Py4JJavaError: An error occurred while calling o179.evaluate.
: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
	at org.apache.spark.sql.catalyst.expressions.EqualNullSafe.eval(predicates.scala:470)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:116)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:125)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:125)
	at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
	at scala.collection.immutable.List.exists(List.scala:84)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:125)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:140)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:138)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:138)
	at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:105)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
	at org.apache.spark.ml.evaluation.RegressionEvaluator.evaluate(RegressionEvaluator.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: null.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:170)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:166)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:89)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1069)
	... 59 more


In [5]:
trainingData,cvData,testData = ratingDf.randomSplit([0.6,0.2,0.2])
userIndexer = StringIndexer(inputCol="user",outputCol="userid",handleInvalid="error").fit(ratingDf)
itemIndexer = StringIndexer(inputCol="item",outputCol="itemid").fit(ratingDf)
alsModel = ALS(rank=5,userCol="userid",itemCol="itemid",ratingCol="rating",maxIter=5,regParam=0.0001)
pipeAls = Pipeline(stages=[userIndexer,itemIndexer,alsModel])
pipeAlsModel = pipeAls.fit(trainingData)
predictionAls = pipeAlsModel.transform(trainingData)
predictionRecommendation= pipeAlsModel.transform(testData)
evaluator = RegressionEvaluator()\
      .setMetricName("rmse")\
      .setLabelCol("rating")\
      .setPredictionCol("prediction")

IllegalArgumentException: u'requirement failed: Nothing has been added to this summarizer.'

In [6]:
predictionRecommendation.select('itemid','userid','rating','prediction').na.drop().show()

+------+------+------+----------+
|itemid|userid|rating|prediction|
+------+------+------+----------+
+------+------+------+----------+



In [10]:
ratingDf.show()

+--------------------+----------+------+
|              userid|    itemid|rating|
+--------------------+----------+------+
|A10000012B7CGYKOM...|000100039X|   5.0|
|      A2S166WSCFIFP5|000100039X|   5.0|
|      A1BM81XB4QHOA3|000100039X|   5.0|
|      A1MOSTXNIO5MPJ|000100039X|   5.0|
|      A2XQ5LZHTD4AFT|000100039X|   5.0|
|      A3V1MKC2BVWY48|000100039X|   5.0|
|      A12387207U8U24|000100039X|   5.0|
|      A29TRDMK51GKZR|000100039X|   5.0|
|      A3FI0744PG1WYG|000100039X|   5.0|
|      A2LBBQHYLEHM7P|000100039X|   5.0|
|      A1340OFLZBW5NG|000100039X|   5.0|
|      A2KU9IU07LOJS1|000100039X|   5.0|
|      A2WVHIRDMLM82E|000100039X|   5.0|
|      A2I35JB67U20C0|000100039X|   5.0|
|      A19N3FCQCLJYUA|000100039X|   5.0|
|      A3FFNE1DR5SI1W|000100039X|   5.0|
|      A1TT4CY55WLHAR|000100039X|   5.0|
|      A2X4HE21JTAL98|000100039X|   5.0|
|       ARDQ9KNB8K22N|000100039X|   5.0|
|       A27ZH1AQORJ1L|000100039X|   5.0|
+--------------------+----------+------+
only showing top

Row(id=4, text=u'spark i j k', probability=DenseVector([0.2661, 0.7339]), prediction=1.0)
Row(id=5, text=u'l m n', probability=DenseVector([0.9209, 0.0791]), prediction=0.0)
Row(id=6, text=u'mapreduce spark', probability=DenseVector([0.4429, 0.5571]), prediction=1.0)
Row(id=7, text=u'apache hadoop', probability=DenseVector([0.8584, 0.1416]), prediction=0.0)


+--------------------+--------------------+--------------------+
|            features|               label|          prediction|
+--------------------+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...|  -23.51088409032297| -1.6659388625179559|
|(10,[0,1,2,3,4,5,...| -21.432387764165806|  0.3400877302576284|
|(10,[0,1,2,3,4,5,...| -12.977848725392104|-0.02335359093652395|
|(10,[0,1,2,3,4,5,...| -11.827072996392571|  2.5642684021108417|
|(10,[0,1,2,3,4,5,...| -10.945919657782932| -0.1631314487734783|
|(10,[0,1,2,3,4,5,...|  -10.58331129986813|   2.517790654691453|
|(10,[0,1,2,3,4,5,...| -10.288657252388708| -0.9443474180536754|
|(10,[0,1,2,3,4,5,...|  -8.822357870425154|  0.6872889429113783|
|(10,[0,1,2,3,4,5,...|  -8.772667465932606|  -1.485408580416465|
|(10,[0,1,2,3,4,5,...|  -8.605713514762092|   1.110272909026478|
|(10,[0,1,2,3,4,5,...|  -6.544633229269576|  3.0454559778611285|
|(10,[0,1,2,3,4,5,...|  -5.055293333055445|  0.6441174575094268|
|(10,[0,1,2,3,4,5,...|  -

In [52]:
inputPath="data/ratings_Electronics_sample.csv"
rawdata = spark.read.csv(inputPath)
df = rawdata.select(rawdata._c0.alias("user"),\
                       rawdata._c1.alias("item"),\
                       rawdata._c2.astype("float").alias("rating"),\
                       rawdata._c3.alias("timestamp"))
trainingData,cvData,testData = df.randomSplit([0.6,0.2,0.2])

In [54]:
from pyspark.ml.pipeline import Pipeline,PipelineModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import *



+--------------+----------+------+----------+------+------+----------+
|          user|      item|rating| timestamp|userid|itemid|prediction|
+--------------+----------+------+----------+------+------+----------+
|A2SRW2W1GXYKQZ|0594481813|   5.0|1403827200|   1.0|   1.0|  4.999883|
|A2OC1UK7VMBLOT|0594481813|   4.0|1398643200|   2.0|   1.0| 3.9999063|
| A6DI4IE8MKFYR|0972683275|   5.0|1363219200|   3.0|   0.0|  4.999068|
|A106YUCY4SVX1D|0972683275|   5.0|1310083200|   4.0|   0.0|  4.999068|
|A3OYE7X2O08LNT|0972683275|   4.0|1360195200|   5.0|   0.0| 3.9992542|
|A2ULCET06LOPBB|0972683275|   5.0|1369785600|   6.0|   0.0|  4.999068|
|A3M122DYN9L5N8|0594451647|   1.0|1388707200|   8.0|   4.0| 0.9999989|
|A3IIGCFLKVFW8M|0972683275|   5.0|1393459200|   9.0|   0.0|  4.999068|
|A39Z4OU2C7ENWH|0972683275|   3.0|1328572800|  10.0|   0.0| 2.9994407|
|A3T3XKC7H5ACI1|0594033934|   5.0|1401235200|  12.0|  16.0|   4.99998|
|A1RPEK98P97J7W|0972683275|   5.0|1359504000|  14.0|   0.0|  4.999068|
|A3S0R

In [49]:
from pyspark.sql.functions import *
prediction.select("prediction").distinct().count()

1