In [1]:
import org.apache.spark.sql.functions.{min, max}
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import scala.util.Random
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{Dataset, Row}

In [2]:
val trainFile = "../datasets/movielens/ml-100k/ua.base"
val testFile = "../datasets/movielens/ml-100k/ua.test"
val columns = Array("user", "movie", "rating", "ts")

trainFile = ../datasets/movielens/ml-100k/ua.base
testFile = ../datasets/movielens/ml-100k/ua.test
columns = Array(user, movie, rating, ts)


[user, movie, rating, ts]

In [3]:
val train = spark.read.options(Map("header" -> "false", "inferSchema" -> "true", "delimiter" -> "\t")).csv(trainFile).toDF(columns: _*)
val test = spark.read.options(Map("header" -> "false", "inferSchema" -> "true", "delimiter" -> "\t")).csv(testFile).toDF(columns: _*)

train = [user: int, movie: int ... 2 more fields]
test = [user: int, movie: int ... 2 more fields]


[user: int, movie: int ... 2 more fields]

In [4]:
train.show(1)

+----+-----+------+---------+
|user|movie|rating|       ts|
+----+-----+------+---------+
|   1|    1|     5|874965758|
+----+-----+------+---------+
only showing top 1 row



In [5]:
test.show(1)

+----+-----+------+---------+
|user|movie|rating|       ts|
+----+-----+------+---------+
|   1|   20|     4|887431883|
+----+-----+------+---------+
only showing top 1 row



## Build the model

In [6]:
val model = new ALS()
    .setUserCol("user")
    .setItemCol("movie")
    .setRatingCol("rating")
    .setPredictionCol("prediction")
    .setImplicitPrefs(true)
    .setRank(25)
    .setNumUserBlocks(25)
    .fit(train)




model = als_8c2d51206881


als_8c2d51206881

In [14]:
model.setColdStartStrategy("drop")

als_5317ff46c06c

## Test the model

In [15]:
val recommendations = model.recommendForAllUsers(1680)

recommendations = [user: int, recommendations: array<struct<movie:int,rating:float>>]


[user: int, recommendations: array<struct<movie:int,rating:float>>]

In [16]:
recommendations.schema

StructType(StructField(user,IntegerType,false), StructField(recommendations,ArrayType(StructType(StructField(movie,IntegerType,true), StructField(rating,FloatType,true)),true),true))

In [17]:
import scala.collection.mutable
val moviesByUser = mutable.Map[Int, List[Int]]()

moviesByUser = Map()


Map()

In [18]:
train.sparkSession.sparkContext.broadcast(moviesByUser)

Broadcast(66)

In [19]:
import org.apache.spark.sql.functions.udf

val populateMoviesByUser = udf { (user: Int, movie: Int) =>
    println(user)
    moviesByUser.update(user, movie :: moviesByUser.getOrElse(user, Nil))
    moviesByUser.get(user).get
}

populateMoviesByUser = UserDefinedFunction(<function2>,ArrayType(IntegerType,false),Some(List(IntegerType, IntegerType)))


UserDefinedFunction(<function2>,ArrayType(IntegerType,false),Some(List(IntegerType, IntegerType)))

In [20]:
import org.apache.spark.sql.functions._

val usersWithItems = train
    .withColumn("populated", populateMoviesByUser($"user", $"movie"))
    .withColumn("countItems", size($"populated"))
    .sort(desc("countItems"))
    .select("user", "populated", "countItems")
    .dropDuplicates("user")
    .cache

usersWithItems = [user: int, populated: array<int> ... 1 more field]


[user: int, populated: array<int> ... 1 more field]

In [21]:
usersWithItems.show(5)

|user|           populated|countItems|
+----+--------------------+----------+
| 148|[1039, 1012, 1012...|       110|
| 463|[1606, 1605, 1605...|       246|
| 471|[1219, 946, 946, ...|        42|
| 496|[1614, 1473, 1473...|       238|
| 833|[1628, 1597, 1597...|       514|
+----+--------------------+----------+
only showing top 5 rows



In [22]:
def precision_at_k(k: Int): Double = {
    val usersWithItemsMap = usersWithItems
    .collect
    .map { r => (r.getInt(0), r.getSeq[Int](1)) }.toMap[Int, Seq[Int]]

    import org.apache.spark.sql.Row

    case class UserPredictions(user: Int, predictions: Seq[Row])

    import train.sparkSession.implicits._

    val userRec = recommendations
    .collect
    .map {
        r => UserPredictions(r.getInt(0), r.getAs[Seq[Row]](1) )
    }.map { up: UserPredictions => 
        val predictions = up.predictions.map { r => (r.getInt(0), r.getFloat(1)) }
        val topPredictions = predictions.flatMap { p =>
            if (!usersWithItemsMap(up.user).contains(p._1))
                Array(p)
            else
                None
        }
        .sortWith { (p1, p2) => p1._2 > p2._2 }
        .take(k)
        .map { p => p._1 }
    
        test
            .filter($"user" equalTo up.user)
            .filter($"movie" isin (topPredictions: _*) )
            .count
    }

    var sum: Double = 0
    userRec.take(10)
    userRec.foreach { r => sum += r.toDouble / k }
    sum / userRec.size
}


precision_at_k: (k: Int)Double


In [23]:
precision_at_k(10)



0.2603393425238597

In [24]:
precision_at_k(5)



0.3295864262990433