从MovieLens 100k 数据集提取特征
该数据由用户ID，影片ID，星级和时间戳依次组成

In [2]:
val rawData = sc.textFile("../data/ml-100k/u.data")
rawData.first()

196	242	3	881250949

我们需要前面的三个字短时间戳不需要，所以提取前三个字段即可

In [3]:
val rawRatings = rawData.map(_.split('\t').take(3))
rawRatings.first()

Array(196, 242, 3)

In [6]:
import org.apache.spark.mllib.recommendation.ALS
ALS.train

Name: Compile Error
Message: <console>:24: error: ambiguous reference to overloaded definition,
both method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
and  method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
match expected type ?
              ALS.train
                  ^
StackTrace: 

说明需要ratings,rank(是在模型中的潜在因素的数量),iterations(迭代是运行的迭代次数),lambda(λ指定ALS正则化参数)

In [7]:
import org.apache.spark.mllib.recommendation.Rating
Rating()

Name: Compile Error
Message: <console>:25: error: not enough arguments for method apply: (user: Int, product: Int, rating: Double)org.apache.spark.mllib.recommendation.Rating in object Rating.
Unspecified value parameters user, product, rating.
              Rating()
                    ^
StackTrace: 

说明这个函数需要 user: Int, product: Int, rating: Double 这三个参数

In [8]:
val ratings = rawRatings.map{ case Array(user, movie, rating) => Rating(user.toInt,movie.toInt,rating.toDouble)}
ratings.first()

Rating(196,242,3.0)

使用MovieLens 100k 数据集训练模型

In [9]:
val model = ALS.train(ratings, 50, 10, 0.01) // 返回MatrixFactorizationModel类型
println(model.userFeatures.count)
println(model.productFeatures.count)

943
1682


从MovieLens 100k 数据集生成电影推荐

In [10]:
val predictedRating = model.predict(789,123)
predictedRating

2.798580688040862

该模型预测用户789对电影123的评分为3.6882841660770698

In [21]:
val userId = 789
val K = 10
val topKPecs = model.recommendProducts(userId,K)
println(topKPecs.mkString("\n"))

Rating(789,693,5.813314375961789)
Rating(789,56,5.520335322657545)
Rating(789,320,5.460135374404292)
Rating(789,412,5.133744344558684)
Rating(789,527,5.119148476064776)
Rating(789,182,5.067447791600747)
Rating(789,108,5.0230360111047645)
Rating(789,475,5.003838567165683)
Rating(789,76,5.003693777869666)
Rating(789,129,5.003023445511154)


In [12]:
val movies = sc.textFile("../data/ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map( array => ( array(0).toInt, array(1) ) ).collectAsMap()
titles(123)

Frighteners, The (1996)

In [13]:
val moviesForUser = ratings.keyBy(_.user).lookup(789)
println(moviesForUser.size)

33


In [14]:
moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product),rating.rating)).foreach(println)

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)


In [15]:
topKPecs.map( rating => ( titles(rating.product), rating.rating ) ).foreach(println)

(Casino (1995),5.813314375961789)
(Pulp Fiction (1994),5.520335322657545)
(Paradise Lost: The Child Murders at Robin Hood Hills (1996),5.460135374404292)
(Very Brady Sequel, A (1996),5.133744344558684)
(Gandhi (1982),5.119148476064776)
(GoodFellas (1990),5.067447791600747)
(Kids in the Hall: Brain Candy (1996),5.0230360111047645)
(Trainspotting (1996),5.003838567165683)
(Carlito's Way (1993),5.003693777869666)
(Bound (1996),5.003023445511154)


In [16]:
import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix( Array(1.0,2.0,3.0) )
aMatrix

[1.000000; 2.000000; 3.000000]

In [17]:
def cosinesSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {vec1.dot(vec2)/(vec1.norm2()*vec2.norm2())}

In [18]:
val itemId = 587
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
cosinesSimilarity(itemVector,itemVector)

1.0

In [19]:
val sims = model.productFeatures.map{
    case(id, factor) => {
        val factorVector = new DoubleMatrix(factor)
        val sim = cosinesSimilarity(factorVector,itemVector)
        (id,sim)
    }
}

In [22]:
val sortedSims = sims.top(K)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>similarity})

In [23]:
println(sortedSims.take(10).mkString("\n"))

(587,1.0)
(203,0.8553947461812577)
(285,0.8524174510561012)
(513,0.847503237055832)
(98,0.8474839866036622)
(357,0.8370917852978695)
(527,0.8268506840340327)
(185,0.8171777003605697)
(193,0.8162315881604099)
(921,0.8131365126928216)


In [24]:
println(titles(itemId))

Hour of the Pig, The (1993)


In [25]:
val sortedSim2 = sims.top(K+1)(Ordering.by[(Int,Double), Double]{case(id, similarity) => similarity})
sortedSim2.slice(1,11).map{ case(id, sim)=>(titles(id),sim)}.mkString("\n")

(Unforgiven (1992),0.8553947461812577)
(Secrets & Lies (1996),0.8524174510561012)
(Third Man, The (1949),0.847503237055832)
(Silence of the Lambs, The (1991),0.8474839866036622)
(One Flew Over the Cuckoo's Nest (1975),0.8370917852978695)
(Gandhi (1982),0.8268506840340327)
(Psycho (1960),0.8171777003605697)
(Right Stuff, The (1983),0.8162315881604099)
(Farewell My Concubine (1993),0.8131365126928216)
(Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963),0.8110154201102143)

模型效果的评估

In [27]:
val actualRating = moviesForUser.take(1)(0)
actualRating

Rating(789,1012,4.0)

In [28]:
val predictedRating = model.predict(789, actualRating.product)
predictedRating

4.015580977578227

In [30]:
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
squaredError

2.4276686229321825E-4

In [31]:
val userProducts = ratings.map{case Rating(user,product,rating)=>(user,product)}
val predictions = model.predict(userProducts).map{case Rating(user,product,rating)=>((user,product),rating)}

In [51]:
val ratingsAndPredictions = ratings.map{case Rating(user,product,rating)=>((user,product),rating)}.join(predictions)
ratingsAndPredictions

MapPartitionsRDD[473] at join at <console>:39

In [33]:
val MSE = ratingsAndPredictions.map{
    case((user,product),(actual,predicted)) => math.pow((actual - predicted),2)
}.reduce(_+_) / ratingsAndPredictions.count
println("Mean Squared Error = " + MSE)

Mean Squared Error = 0.08425286297664059


In [34]:
val RMSE = math.sqrt(MSE)
println("Root Mean Squared Error=" + RMSE)

Root Mean Squared Error=0.29026343720255326


K值平均准确率

In [37]:
def avgPrecisionK(actual:Seq[Int],predicted:Seq[Int],k: Int): Double = {
    val predK = predicted.take(K)
    var score = 0.0
    var numHits = 0.0
    for((p,i) <- predK.zipWithIndex){
        if(actual.contains(p)){
            numHits += 1.0
            score += numHits / (i.toDouble + 1.0)
        }
    }
    if(actual.isEmpty){
        1.0
    }else{
        score / scala.math.min(actual.size, k).toDouble
    }
}

In [38]:
val actualMovies = moviesForUser.map(_.product)
actualMovies

ArrayBuffer(1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150, 276, 151, 129, 100, 741, 288, 762, 628, 124)

In [40]:
val predictedMovies = topKPecs.map(_.product)
predictedMovies

Array(693, 56, 320, 412, 527, 182, 108, 475, 76, 129)

In [42]:
val apk10 = avgPrecisionK(actualMovies,predictedMovies,10)
apk10

0.0325

In [43]:
val itemFactors = model.productFeatures.map{case(id, factor)=>factor}.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)

(1682,50)


In [46]:
val imBroadcast = sc.broadcast(itemMatrix)
imBroadcast

Broadcast(103)

In [48]:
val allRecs = model.userFeatures.map{
    case(userId,array)=>{
        val userVector = new DoubleMatrix(array)
        val scores = imBroadcast.value.mmul(userVector)
        val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
        val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
        (userId, recommendedIds)
    }
}
allRecs

MapPartitionsRDD[462] at map at <console>:43

In [49]:
val userMovies = ratings.map{
    case Rating(user,product,rating) => (user,product)
}.groupBy(_._1)
userMovies

ShuffledRDD[465] at groupBy at <console>:35

In [50]:
val K = 10
val MAPK = allRecs.join(userMovies).map{
    case (userId,(predicted,actualWithIds)) => {
        val actual = actualWithIds.map(_._2).toSeq
        avgPrecisionK(actual,predicted,K)
    }
}.reduce(_+_) / allRecs.count
println("Mean Average Precision at K = " + MAPK)

Mean Average Precision at K = 0.03551574677237455


使用 MLlib 内置的评估函数

In [55]:
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map{ case ((user,product),(predicted,actual)) => (predicted, actual)}
val regressionMetrics = new RegressionMetrics(predictedAndTrue)

println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

Mean Squared Error = 0.08425286297664057
Root Mean Squared Error = 0.29026343720255326


In [57]:
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{
    case(userId,(predicted,actualWithIds)) => {
        val actual = actualWithIds.map(_._2)
        (predicted.toArray, actual.toArray)
    }
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

                                                                                Mean Average Precision = 0.0834671273746858


In [59]:
val MAPK2000 = allRecs.join(userMovies).map{
    case(userId,(predicted,actualWithIds)) => {
        val actual = actualWithIds.map(_._2).toSeq
        avgPrecisionK(actual,predicted,2000)
    }
}.reduce(_+_) / allRecs.count
println("Mean Average Precision =" + MAPK2000)

Mean Average Precision =0.0030767884677027776
