In [1]:
val rawData = sc.textFile("ml-100k/u.data")
println(rawData.first())

196	242	3	881250949


In [2]:
val rawRatings = rawData.map(_.split("\t").take(3))
println(rawRatings.first())

[Ljava.lang.String;@1793844e


In [3]:
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

val ratings = rawRatings.map{ case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble)}

In [4]:
println(ratings.first())

Rating(196,242,3.0)


In [5]:
val model = ALS.train(ratings, 50, 10, 0.01)
println(model.userFeatures)

users MapPartitionsRDD[210] at mapValues at ALS.scala:255


In [6]:
println(model.userFeatures.count)

943


In [7]:
println(model.productFeatures.count)

1682


In [8]:
val predictedRating = model.predict(789, 123)
println(predictedRating)

4.362895251633278


In [9]:
val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)

println(topKRecs.mkString("\n"))

Rating(789,68,6.281131062577569)
Rating(789,646,6.158066315115435)
Rating(789,179,5.617872750610584)
Rating(789,156,5.566253337933734)
Rating(789,641,5.497144452124723)
Rating(789,199,5.481461888669354)
Rating(789,23,5.368052680459596)
Rating(789,1019,5.316037210506697)
Rating(789,53,5.266712683357319)
Rating(789,589,5.256749255853212)


In [10]:
val movies = sc.textFile("ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
println(titles(123))

Frighteners, The (1996)


In [11]:
val moviesForUser = ratings.keyBy(_.user).lookup(789)
println(moviesForUser.size)

33


In [12]:
moviesForUser.sortBy(-_.rating).take(10).map(rating=>(titles(rating.product), rating.rating)).foreach(println)

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)


In [13]:
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)

(Crow, The (1994),6.281131062577569)
(Once Upon a Time in the West (1969),6.158066315115435)
(Clockwork Orange, A (1971),5.617872750610584)
(Reservoir Dogs (1992),5.566253337933734)
(Paths of Glory (1957),5.497144452124723)
(Bridge on the River Kwai, The (1957),5.481461888669354)
(Taxi Driver (1976),5.368052680459596)
(Die xue shuang xiong (Killer, The) (1989),5.316037210506697)
(Natural Born Killers (1994),5.266712683357319)
(Wild Bunch, The (1969),5.256749255853212)


In [14]:
//Cosine similairy approach
import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
println(aMatrix)

[1.000000; 2.000000; 3.000000]


In [15]:
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
println(cosineSimilarity(itemVector, itemVector))

0.9999999999999998


In [16]:
val sims = model.productFeatures.map{ case (id, factor) =>
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
}

val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case
    (id, similarity) => similarity
})

println(sortedSims.take(10).mkString("\n"))

(567,0.9999999999999998)
(295,0.763937835921005)
(413,0.7539794117233227)
(516,0.749396050105273)
(670,0.7491382459162684)
(642,0.7443836971217216)
(257,0.7443217706089565)
(150,0.7408857068777103)
(471,0.7387196037513115)
(405,0.734483157797005)


In [17]:
println(titles(itemId))

Wes Craven's New Nightmare (1994)


In [18]:
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] {
case(id, similarity) => similarity})

sortedSims2.slice(1,11).map{ case (id, sim) => (titles(id), sim)}.mkString("\n")

(Breakdown (1997),0.763937835921005)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.7539794117233227)
(Local Hero (1983),0.749396050105273)
(Body Snatchers (1993),0.7491382459162684)
(Grifters, The (1990),0.7443836971217216)
(Men in Black (1997),0.7443217706089565)
(Swingers (1996),0.7408857068777103)
(Courage Under Fire (1996),0.7387196037513115)
(Mission: Impossible (1996),0.734483157797005)
(Private Parts (1997),0.732110292336232)

In [19]:
val actualRating = moviesForUser.take(1)(0)
println(actualRating)

Rating(789,1012,4.0)


In [20]:
val predictedRating = model.predict(789, actualRating.product)
println(predictedRating)

4.053099222433738


In [21]:
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
println(squaredError)

0.0028195274230675485


In [23]:
val userProducts = ratings.map{ case Rating(user, product, rating) => (user, product)}
val predictions = model.predict(userProducts).map{
    case Rating(user, product, rating) => ((user, product), rating)
}

val ratingsAndPredictions = ratings.map{
    case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)

val MSE = ratingsAndPredictions.map{
    case ((user, product), (actual, predicted)) => math.pow((actual - predicted), 2.0)
}.reduce(_+_) / ratingsAndPredictions.count
println("Mean Squared Error = " + MSE)

Mean Squared Error = 0.08404232440699576


In [24]:
val RMSE = math.sqrt(MSE)
println("Root Mean Squared Error = " + RMSE)

Root Mean Squared Error = 0.289900542267509


In [41]:
def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
    val predK = predicted.take(k)
    var score = 0.0
    var numHits = 0.0
    for((p, i) <- predK.zipWithIndex){
        if(actual.contains(p)){
            numHits += 1.0
            score += numHits / (i.toDouble + 1.0)
        }
    }
    if(actual.isEmpty){
        1.0
    }else{
        score / scala.math.min(actual.size, k).toDouble
    }
}

val actualMovies = moviesForUser.map(_.product)
println(actualMovies)

ArrayBuffer(1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150, 276, 151, 129, 100, 741, 288, 762, 628, 124)


In [44]:
val predictedMovies =  topKRecs.map(_.product)
println(predictedMovies.mkString(", "))

68, 646, 179, 156, 641, 199, 23, 1019, 53, 589


In [47]:
val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
println(apk10)

0.0


In [48]:
val itemFactors = model.productFeatures.map{ case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)

(1682,50)


In [49]:
val imBroadcast = sc.broadcast(itemMatrix)

In [50]:
val allRecs = model.userFeatures.map{ case (userId, array) => 
    val userVector = new DoubleMatrix(array)
    val scores = imBroadcast.value.mmul(userVector)
    val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
    val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
    (userId, recommendedIds)
}

In [51]:
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)

In [52]:
val K = 10
val MAPK = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
    val actual = actualWithIds.map(_._2).toSeq
    avgPrecisionK(actual, predicted, K)
}.reduce(_+_) / allRecs.count
println("Mean Average Precision at K = " + MAPK)

Mean Average Precision at K = 0.02642179636082074


In [53]:
//MLlib's built-in evaluation functions
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map{ case ((user, product), (predicted, actual)) => (predicted, actual) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)

println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

Mean Squared Error = 0.08404232440699576
Root Mean Squared Error = 0.289900542267509


In [55]:
//Ranking Metrics
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
    val actual = actualWithIds.map(_._2)
    (predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

                                                                                Mean Average Precision = 0.0729136278794132


In [56]:
val  MAPK2000 = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
    val actual = actualWithIds.map(_._2).toSeq
    avgPrecision(actual, predicted, 2000)
}.reduce(_+_) / allRecs.count
println("Mean Average Precision = " + MAPK2000)

Mean Average Precision = 0.07291362787941318
