In [1]:
import org.apache.spark.ml.recommendation.ALS

In [2]:
/** One rating record: the user, the movie rated, the score given and the record's timestamp. */
case class Rating(
  userId: Int,
  movieId: Int,
  rating: Float,
  timestamp: Long
)

defined class Rating


In [3]:
/**
 * Parses one `::`-delimited line into a [[Rating]].
 *
 * Expected layout: `userId::movieId::rating::timestamp`.
 *
 * @param str a single input line
 * @return the parsed rating
 * @throws IllegalArgumentException if the line does not split into exactly four fields
 * @throws NumberFormatException if a field cannot be converted to its numeric column type
 */
def parseRating(str: String): Rating =
  str.split("::") match {
    case Array(userId, movieId, rating, timestamp) =>
      Rating(userId.toInt, movieId.toInt, rating.toFloat, timestamp.toLong)
    case fields =>
      // An explicit exception (rather than `assert`) keeps the check active even when
      // assertions are elided at compile time, and reports the offending line.
      throw new IllegalArgumentException(
        s"Expected 4 '::'-separated fields but found ${fields.length}: '$str'")
  }

parseRating: (str: String)Rating


In [4]:
// Smoke-test the parser on a single `::`-delimited sample record.
parseRating("1::1193::5::978300760")

Rating(1,1193,5.0,978300760)

In [5]:
// Load the ratings file as an RDD of text lines. Declared `val` because the reference
// is not reassigned anywhere in this notebook.
val raw = sc.textFile("/data/ml-1m/ratings.dat")

raw = /data/ml-1m/ratings.dat MapPartitionsRDD[1] at textFile at <console>:28


/data/ml-1m/ratings.dat MapPartitionsRDD[1] at textFile at <console>:28

In [6]:
// Peek at the first raw line to confirm the file was read and to see its format.
raw.take(1)

[1::1193::5::978300760]

In [7]:
// Parse every raw line into a Rating, then convert the resulting RDD into a DataFrame.
val ratings = raw
  .map(line => parseRating(line))
  .toDF()

ratings = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

In [8]:
// Print the first 5 rows of the parsed ratings as a table.
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|   5.0|978300760|
|     1|    661|   3.0|978302109|
|     1|    914|   3.0|978301968|
|     1|   3408|   4.0|978300275|
|     1|   2355|   5.0|978824291|
+------+-------+------+---------+
only showing top 5 rows



In [9]:
// Split the ratings with weights 0.8 (training) and 0.2 (test). A fixed seed makes the
// split, and therefore every metric computed from it, reproducible across notebook runs.
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42L)

training = [userId: int, movieId: int ... 2 more fields]
test = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

In [10]:
// ALS estimator: 5 iterations, regularization parameter 0.01, wired to the Rating columns.
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

als = als_c531e2ba1fdc


als_c531e2ba1fdc

In [11]:
// Fit the ALS estimator on the training split.
val model = als.fit(training)



model = als_c531e2ba1fdc


als_c531e2ba1fdc

// Persist the fitted model under the relative path "mymodel_test".
// NOTE(review): Spark's ML writer is documented to fail when the target path already
// exists, so re-running this cell presumably needs `model.write.overwrite()` — confirm.
model.save("mymodel_test")

In [14]:
// Score the held-out test split with the fitted model; the result carries one more
// field than `test` does.
val predictions = model.transform(test)

predictions = [userId: int, movieId: int ... 3 more fields]


[userId: int, movieId: int ... 3 more fields]

In [15]:
// Sum of squared prediction errors over the test split (a raw sum, not a mean or a root).
// Columns are read by name rather than by position so the computation does not break,
// or silently compare the wrong fields, if the schema order ever changes.
// "prediction" is the default output column of ALS, which this notebook does not override.
predictions
  .map(row => row.getAs[Float]("rating") - row.getAs[Float]("prediction"))
  .map(err => err * err)
  .filter(!_.isNaN) // drop NaN terms so they do not poison the sum
  .reduce(_ + _)



160690.31

In [None]:
// Materialize the first 10 prediction rows for inspection.
predictions.take(10)

[Stage 132:>                                                        (0 + 2) / 2]

In [None]:
// Write the predictions out through the spark-csv data source.
predictions
  .write
  .format("com.databricks.spark.csv")
  .save("ml-predictions.csv")