In [1]:
from pyspark.mllib.recommendation import ALS,Rating,MatrixFactorizationModel

In [2]:
rawData=sc.textFile('ml-100k/u.data')

In [3]:
rawData.first()

u'196\t242\t3\t881250949'

In [4]:
ratings=rawData.map(lambda x:x.split('\t')).map(lambda l:Rating(int(l[0]),int(l[1]),float(l[2])))

In [5]:
ratings.first()

Rating(user=196, product=242, rating=3.0)

In [6]:
rank=50
iterations=10

In [7]:
model=ALS.train(ratings,rank,iterations)

In [8]:
model.userFeatures().count()

943

In [9]:
model.productFeatures().count()

1682

In [10]:
model.predict(789,123)

3.9975659452254253

In [11]:
userId=789
K=10

In [12]:
topKRecs=model.recommendProducts(userId,K)

In [13]:
topKRecs

[Rating(user=789, product=675, rating=6.320453111704653),
 Rating(user=789, product=135, rating=5.663639654913135),
 Rating(user=789, product=693, rating=5.643467743801867),
 Rating(user=789, product=182, rating=5.553230730383625),
 Rating(user=789, product=488, rating=5.5180873420523895),
 Rating(user=789, product=185, rating=5.503541254998193),
 Rating(user=789, product=56, rating=5.437605185787937),
 Rating(user=789, product=518, rating=5.391559446813668),
 Rating(user=789, product=603, rating=5.385991574174252),
 Rating(user=789, product=30, rating=5.310965664778898)]

In [14]:
movies=sc.textFile("ml-100k/u.item")

In [15]:
movies.first()

u'1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0'

In [16]:
titles=movies.map(lambda line:line.split("|")).map(lambda line:(int(line[0]),line[1])).collectAsMap()

In [17]:
titles[123]

u'Frighteners, The (1996)'

In [18]:
ratings.first()

Rating(user=196, product=242, rating=3.0)

In [19]:
movieForUser=ratings.keyBy(lambda x:x.user).lookup(789)

In [20]:
len(movieForUser)

33

In [21]:
movieForUser=sc.parallelize(movieForUser).sortBy(lambda x:x.rating,ascending=False)#先将list转为RDD

In [22]:
movieForUser.first()

Rating(user=789, product=127, rating=5.0)

In [23]:
movieForUser.map(lambda line:(titles[line.product],line.rating)).take(10)

[(u'Godfather, The (1972)', 5.0),
 (u'Trainspotting (1996)', 5.0),
 (u'Dead Man Walking (1995)', 5.0),
 (u'Star Wars (1977)', 5.0),
 (u'Swingers (1996)', 5.0),
 (u'Leaving Las Vegas (1995)', 5.0),
 (u'Bound (1996)', 5.0),
 (u'Fargo (1996)', 5.0),
 (u'Last Supper, The (1995)', 5.0),
 (u'Private Parts (1997)', 4.0)]

In [24]:
topKRecs=sc.parallelize(topKRecs)

In [25]:
topKRecs.map(lambda line:(titles[line.product],line.rating)).take(10)

[(u'Nosferatu (Nosferatu, eine Symphonie des Grauens) (1922)',
  6.320453111704653),
 (u'2001: A Space Odyssey (1968)', 5.663639654913135),
 (u'Casino (1995)', 5.643467743801867),
 (u'GoodFellas (1990)', 5.553230730383625),
 (u'Sunset Blvd. (1950)', 5.5180873420523895),
 (u'Psycho (1960)', 5.503541254998193),
 (u'Pulp Fiction (1994)', 5.437605185787937),
 (u"Miller's Crossing (1990)", 5.391559446813668),
 (u'Rear Window (1954)', 5.385991574174252),
 (u'Belle de jour (1967)', 5.310965664778898)]

In [26]:
import numpy as np

In [27]:
def cosineSImilarity(x,y):
    return np.dot(x,y)/(np.linalg.norm(x)*np.linalg.norm(y))

In [29]:
testx=np.array([1.0,2.0,3.0])
cosineSImilarity(testx,testx)

1.0

In [30]:
itemId=567
itemFactor=model.productFeatures().lookup(itemId)[0]

In [41]:
sims=model.productFeatures().map(lambda (id,factor):(id,cosineSImilarity(np.array(factor),np.array(itemFactor))))

In [53]:
sims.sortBy(lambda (x,y):y,ascending=False).map(lambda (x,y):(titles[x],y)).take(11)

[(u"Wes Craven's New Nightmare (1994)", 1.0),
 (u'Tales from the Crypt Presents: Bordello of Blood (1996)',
  0.71625703446945999),
 (u'Army of Darkness (1993)', 0.70045590720233386),
 (u'Batman (1989)', 0.68610765085660463),
 (u'Evil Dead II (1987)', 0.68112580576396298),
 (u'Escape from New York (1981)', 0.67772719731275688),
 (u"Stephen King's The Langoliers (1995)", 0.67506804063022674),
 (u'Fierce Creatures (1997)', 0.67017336600034927),
 (u'Star Trek IV: The Voyage Home (1986)', 0.66609289021578766),
 (u'Pompatus of Love, The (1996)', 0.66456853599925403),
 (u'Last Supper, The (1995)', 0.66259787398097159)]

In [46]:
titles[itemId]

u"Wes Craven's New Nightmare (1994)"

In [63]:
actual=movieForUser.take(1)[0]

In [64]:
actualRating=actual.rating

In [66]:
predictedRating=model.predict(789,actual.product)

In [67]:
squaredError=np.power(actualRating-predictedRating,2)

In [68]:
squaredError

0.0019197459945576867

In [69]:
userProducts=ratings.map(lambda rating:(rating.user,rating.product))

In [70]:
userProducts.take(5)

[(196, 242), (186, 302), (22, 377), (244, 51), (166, 346)]

In [77]:
model.predictAll(userProducts).first()

Rating(user=316, product=1084, rating=4.119838733246423)

In [80]:
predictions=model.predictAll(userProducts).map(lambda rating:((rating.user,rating.product),rating.rating))

In [81]:
predictions.take(5)

[((316, 1084), 4.119838733246423),
 ((504, 1084), 4.072617254774273),
 ((424, 1084), 4.976444456823297),
 ((541, 1084), 4.136577587274045),
 ((181, 1084), 1.997842446932531)]

In [92]:
ratingsAndPredictions=ratings.map(lambda rating:((rating.user,rating.product),rating.rating)).join(predictions)

In [93]:
ratingsAndPredictions.take(5)

[((711, 707), (5.0, 4.95815174004629)),
 ((650, 622), (3.0, 3.038367320072271)),
 ((472, 584), (1.0, 1.5927066637609788)),
 ((752, 316), (3.0, 2.98831121022243)),
 ((18, 428), (3.0, 3.3914034640895614))]

In [97]:
MSE=ratingsAndPredictions.map(lambda ((x,y),(m,n)):np.power(m-n,2)).reduce(lambda x,y:x+y)/ratingsAndPredictions.count()

In [99]:
np.sqrt(MSE)

0.29206118204091508

In [100]:
def avgPrecisionK(actual,predicted,k):
    if len(predicted)>k:
        predK=predicted[:k]
    else:
        predK=predicted
    score=0.0
    numHits=0.0
    for i,p in enumerate(predK):
        if p in actual and p not in predK:
            numHits=numHits+1
            score=score+numHits/(i+1)
    if not actual:
        return 1.0
    else:
        return score/min(len(actual),k)

In [112]:
movieForUser

PythonRDD[316] at RDD at PythonRDD.scala:48

In [113]:
actualMovies=[rating.product for rating in movieForUser.collect()]

In [117]:
predictMovies=[rating.product for rating in topKRecs.collect()]

In [118]:
predictMovies

[675, 135, 693, 182, 488, 185, 56, 518, 603, 30]

In [119]:
MAP10=avgPrecisionK(actualMovies,predictMovies,10)

In [120]:
MAP10

0.0

In [121]:
itemFactors=model.productFeatures().map(lambda (id,factor):factor).collect()

In [124]:
itemMatrix=np.array(itemFactors)

In [125]:
itemMatrix.shape

(1682, 50)

In [126]:
imBroadcast=sc.broadcast(itemMatrix)

In [127]:
userVector=model.userFeatures().map(lambda (userId,array):(userId,np.array(array)))

In [131]:
userVector=userVector.map(lambda (userId,x):(userId,imBroadcast.value.dot((np.array(x).transpose()))))

In [132]:
userVectorId=userVector.map(lambda (userId,x):(userId,[(xx,i) for i,xx in enumerate(x.tolist())]))

In [138]:
sortUserVectorId=userVectorId.map(lambda (userId,x):(userId,sorted(x,key=lambda x:x[0],reverse=True)))

In [140]:
sortUserVectorRecId=sortUserVectorId.map(lambda (userId,x):(userId,[xx[1] for xx in x]))

In [141]:
sortUserVectorRecId.count()

943

In [142]:
userMovies=ratings.map(lambda rating:(rating.user,rating.product)).groupBy(lambda (x,y):x)

In [145]:
userMovies=userMovies.map(lambda (userId,x):(userId,[xx[1] for xx in x]))

In [146]:
allAPK=sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted,actual)):avgPrecisionK(actual,predicted,2000))

In [149]:
allAPK.reduce(lambda x,y:x+y)/allAPK.count()

0.0

In [150]:
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.evaluation import RankingMetrics

In [151]:
predictedAndTrue=ratingsAndPredictions.map(lambda ((userId,product),(predicted,actual)):(predicted,actual))

In [152]:
predictedAndTrue.take(5)

[(5.0, 4.95815174004629),
 (3.0, 3.038367320072271),
 (1.0, 1.5927066637609788),
 (3.0, 2.98831121022243),
 (3.0, 3.3914034640895614)]

In [153]:
regressionMetrics=RegressionMetrics(predictedAndTrue)

In [154]:
regressionMetrics.meanSquaredError

0.08529973405513652

In [155]:
regressionMetrics.rootMeanSquaredError

0.2920611820409151

In [156]:
sortedLabels=sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted,actual)):(predicted,actual))

In [157]:
rankMetrics=RankingMetrics(sortedLabels)

In [158]:
rankMetrics.meanAveragePrecision

0.07233375416125158

In [159]:
rankMetrics.precisionAt(10)

0.07062566277836692