### links
[Collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering)

latent-factor [Factor analysis](https://en.wikipedia.org/wiki/Factor_analysis)

[Matrix factorization](https://en.wikipedia.org/wiki/Non-negative_matrix_factorization)

[QR decomposition](https://en.wikipedia.org/wiki/QR_decomposition)

[ROC curve](https://en.wikipedia.org/wiki/Receiver_operating_characteristic)

[Evaluation measures](https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)#Mean_average_precision)

[k-fold cross-validation](https://machinelearningmastery.com/k-fold-cross-validation/)

see

```OryxProject```

In [1]:
%%init_spark
launcher.driver_memory = '6g'

In [2]:
val rawUserArtistData = sc.textFile("../../data/music/user_artist_data.txt")

Intitializing Scala interpreter ...

Spark Web UI available at http://czu8001-precision-5520.home:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1572904279943)
SparkSession available as 'spark'


rawUserArtistData: org.apache.spark.rdd.RDD[String] = ../../data/music/user_artist_data.txt MapPartitionsRDD[1] at textFile at <console>:25


In [3]:
rawUserArtistData.map(_.split(" ")(0).toDouble).stats
rawUserArtistData.map(_.split(" ")(1).toDouble).stats

res0: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)


In [4]:
val rawArtistData = sc.textFile("../../data/music/artist_data.txt")

rawArtistData: org.apache.spark.rdd.RDD[String] = ../../data/music/artist_data.txt MapPartitionsRDD[7] at textFile at <console>:25


In [5]:
val artistByID = rawArtistData.map { line =>
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
        None
    } else {
        try {
            Some((id.toInt, name.trim))
        } catch {
            case e: NumberFormatException => None
        }
    }
}

artistByID: org.apache.spark.rdd.RDD[Option[(Int, String)]] = MapPartitionsRDD[8] at map at <console>:26


In [6]:
val rawArtistAlias = sc.textFile("../../data/music/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap { line =>
    val tokens = line.split('\t')
    if (tokens(0).isEmpty) {
        None
    } else {
        Some((tokens(0).toInt, tokens(1).toInt))
    }
}.collectAsMap()

rawArtistAlias: org.apache.spark.rdd.RDD[String] = ../../data/music/artist_alias.txt MapPartitionsRDD[10] at textFile at <console>:25
artistAlias: scala.collection.Map[Int,Int] = Map(6803336 -> 1000010, 6663187 -> 1992, 2124273 -> 2814, 10412283 -> 1010353, 9969191 -> 1320354, 2024757 -> 1001941, 10208201 -> 4605, 2139121 -> 1011083, 1186393 -> 78, 2094504 -> 1012167, 9931106 -> 1000289, 2167517 -> 2060894, 1351735 -> 1266817, 6943682 -> 1003342, 2027368 -> 1000024, 2056419 -> 1020783, 1214789 -> 1001066, 1022944 -> 1004983, 6640739 -> 1010367, 6902331 -> 411, 10303141 -> 82, 10029249 -> 2070, 7001129 -> 739, 6627784 -> 1046699, 1113560 -> 1275800, 2155414 -> 1000790, 1291139 -> 4163, 10061700 -> 831, 1043158 -> 1301875, 10294241 -> 1234737, 9991298 -> 1001419, 9965450 -> 1016520, 68004...

### train data

In [7]:
import org.apache.spark.mllib.recommendation._

val bArtistAlias = sc.broadcast(artistAlias)

val trainData = rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
    val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
    Rating(userID, finalArtistID, count)
}.cache()

import org.apache.spark.mllib.recommendation._
bArtistAlias: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]] = Broadcast(6)
trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[12] at map at <console>:33


In [8]:
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@2fca6529


In [9]:
// checking model
val rawArtistForUser = rawUserArtistData.map(_.split(' ')).filter{ case Array(user,_,_) => user.toInt == 2093760 }
val existingProducts = rawArtistForUser.map { case Array(_,artist,_) => artist.toInt }.collect().toSet

rawArtistForUser: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[141] at filter at <console>:30
existingProducts: scala.collection.immutable.Set[Int] = Set(1255340, 942, 1180, 813, 378)


In [10]:
artistByID.filter(x => x != None).filter(x => existingProducts.contains(x.get._1)).collect.foreach(println)

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))


In [11]:
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)

Rating(2093760,1001819,0.02628420179655294)
Rating(2093760,2814,0.026010115646986007)
Rating(2093760,1811,0.024987081528557605)
Rating(2093760,1300642,0.024822425820989272)
Rating(2093760,4605,0.024688995357883187)


recommendations: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(2093760,1001819,0.02628420179655294), Rating(2093760,2814,0.026010115646986007), Rating(2093760,1811,0.024987081528557605), Rating(2093760,1300642,0.024822425820989272), Rating(2093760,4605,0.024688995357883187))


In [12]:
val recommendedProductsIDs = recommendations.map(_.product).toSet

recommendedProductsIDs: scala.collection.immutable.Set[Int] = Set(2814, 1811, 1001819, 1300642, 4605)


In [13]:
artistByID.filter{x => x != None}.filter {x => recommendedProductsIDs.contains(x.get._1) }.collect()

res3: Array[Option[(Int, String)]] = Array(Some((2814,50 Cent)), Some((4605,Snoop Dogg)), Some((1811,Dr. Dre)), Some((1001819,2Pac)), Some((1300642,The Game)))


In [14]:
import org.apache.spark.rdd._

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark.broadcast.Broadcast
//import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

  def areaUnderCurve(
      positiveData: DataFrame,
      bAllArtistIDs: Broadcast[Array[Int]],
      predictFunction: (DataFrame => DataFrame)): Double = {

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most one pass over all artists to avoid an infinite loop.
        // Also stop when number of negative equals positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(artistID)) {
            negative += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }


def buildRatings(
      rawUserArtistData: RDD[String],
      bArtistAlias: Broadcast[Map[Int,Int]]) = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      Rating(userID, finalArtistID, count)
    }
  }

import org.apache.spark.rdd._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._
areaUnderCurve: (positiveData: org.apache.spark.sql.DataFrame, bAllArtistIDs: org.apache.spark.broadcast.Broadcast[Array[Int]], predictFunction: org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame)Double
buildRatings: (rawUserArtistData: org.apache.spark.rdd.RDD[String], bArtistAlias: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]])org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]


In [30]:
val allData = buildRatings(rawUserArtistData, bArtistAlias)
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

val allItemIDs = allData.map(_.product).distinct().collect()
val bAllItemIDS = sc.broadcast(allItemIDs)

allData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[302] at map at <console>:116
trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[303] at randomSplit at <console>:51
cvData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[304] at randomSplit at <console>:51
allItemIDs: Array[Int] = Array(1115179, 2157922, 6697353, 9979788, 10486944, 2292732, 2094547, 10132642, 10269896, 10692487, 10124491, 10784358, 10537033, 6848803, 10526529, 6740630, 1102283, 1276561, 10449985, 6981052, 6764186, 9959690, 10090912, 9961692, 10186813, 6657404, 6944769, 10468315, 1082679, 10224097, 7037004, 2116751, 10661235, 10384998, 9993152, 2043119, 6811740, 1216787, 2108067, 1...

In [16]:
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@7f87277a


In [35]:
val auc = areaUnderCurve(cvData.toDF, bAllItemIDS, model.transform)

<console>: 45: error: value transform is not a member of org.apache.spark.mllib.recommendation.MatrixFactorizationModel

In [34]:
model

res8: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@7f87277a
