In [0]:
%sh
# Data setup, kept commented out: download and unpack the MovieLens 25M archive,
# then list the working directory and the Hadoop filesystem.
# wget https://files.grouplens.org/datasets/movielens/ml-25m.zip
# unzip ml-25m.zip
# ls
# hadoop fs -ls

In [1]:
%sh
# Start the interactive Spark Scala shell in client deploy mode.
spark-shell --deploy-mode client

In [2]:
// Movie catalogue, read with an explicit DDL schema so the columns come back typed.
val movieSchema = "movieId INT, title STRING, genres STRING"
val df_movies = spark.read
  .schema(movieSchema)
  .option("header", "true")
  .csv("ml-25m/movies.csv")
//df_movies.show(25, false)

// User ratings. The timestamp column is parsed and then discarded, leaving
// (userId, movieId, rating) in df_ratings.
val ratingSchema = "userId INT, movieId INT, rating DOUBLE, timestamp LONG"
val df_ratings_timestamp = spark.read
  .schema(ratingSchema)
  .option("header", "true")
  .csv("ml-25m/ratings.csv")
val df_ratings = df_ratings_timestamp.drop("timestamp")
//df_ratings.show(25, false)


// sc.stop()



//Commands to try training an ALS model:


// Split the ratings roughly 80/20 into a training and a held-out test set.
// A fixed seed makes the split repeatable from run to run, so any metric computed on the
// held-out data is comparable; without one, randomSplit draws a new random seed on every call.
val set = df_ratings.randomSplit(Array(0.8, 0.2), seed = 42L)
val training = set(0).cache()
val test = set(1).cache()
//training.take(5).foreach(println)

// count() is an action, so this line also materializes both cached splits.
println(s"Training: ${training.count()}, test: ${test.count()}")

In [3]:
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.ml.evaluation.RegressionEvaluator


// Configure ALS collaborative filtering over the (userId, movieId, rating) columns.
val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setRank(10)
  .setMaxIter(15)
  .setRegParam(0.01)

// Fit the model on the training split only.
val model = als.fit(training)

// With the 'drop' cold-start strategy, rows that would receive a NaN prediction are removed
// from the output, so the evaluation metric below does not become NaN.
model.setColdStartStrategy("drop")
// model.save("target/tmp/myCollaborativeFilter")
// val model = ALSModel.load("target/tmp/myCollaborativeFilter")

// RMSE between the actual "rating" column and the model's "prediction" column.
val evaluator = new RegressionEvaluator()
  .setLabelCol("rating")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

// Score the test split and report the error.
val predictions = model.transform(test)
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

In [4]:
// Print the first 25 scored rows.
predictions.show(numRows = 25)

In [5]:
// Top 10 recommended movies for every user.
val userRecs = model.recommendForAllUsers(numItems = 10)
// Top 10 recommended users for every movie.
val movieRecs = model.recommendForAllItems(numUsers = 10)

In [6]:
// For user 148:

// Generate the list of movies this user rated at least 4.0 out of 5. Ratings come from df_ratings; titles and genres come from df_movies.
// Check the predictions the model made for user 148 that score at least 4.0 out of 5. Get these from predictions.
// See whether those movies are similar in genre, and possibly in year and tags.

// If so, that is evidence the recommendations are sensible.

In [7]:
// Restrict the scored rows to user 148 and remove the now-constant userId column,
// ready to be joined with df_movies on movieId.

val predictions148 = predictions.where(predictions.col("userId") === 148)
val predictions148NoUser = predictions148.drop("userId")

In [8]:
// Attach title and genres to user 148's predictions. Joining on an explicit column
// expression keeps the movieId column from both sides in the result.
val predShow = df_movies.join(
  predictions148NoUser,
  df_movies("movieId") === predictions148NoUser("movieId"),
  "inner"
)

In [9]:
// Drop the movieId join key by name before display.
// NOTE(review): predShow carries movieId from both join sides; confirm with printSchema
// that no movieId column remains.
val predShowFinal: org.apache.spark.sql.DataFrame = predShow.drop("movieId")

In [10]:
// Print up to 15 rows without truncating long cell values.
predShowFinal.show(numRows = 15, truncate = false)

In [11]:
// The 10 highest-predicted movies for user 148 among that user's rows in `test`.
// Columns are referenced with the $"name" interpolator rather than symbol literals
// ('movieId), which are deprecated since Scala 2.13. Both rely on the same
// spark.implicits._ import, so no extra import is needed.
// The former trailing .toDF() was dropped: the result is already a DataFrame.
val arrMovies148 = model
  .transform(test.where(test("userId") === 148))
  .select($"movieId", $"prediction")
  .orderBy($"prediction".desc)
  .limit(10)

In [12]:
// Print the selected movieId/prediction pairs.
arrMovies148.show(numRows = 10)

In [13]:
// Attach title and genres to the selected predictions, then drop the movieId join key.
val movies148 = df_movies
  .join(arrMovies148, df_movies("movieId") === arrMovies148("movieId"), "inner")
  .drop("movieId")

In [14]:
// Print up to 10 rows without truncating long titles.
movies148.show(numRows = 10, truncate = false)

In [15]:
// Ratings by user 148 with a score of exactly 5.0, taken from the full ratings set.
val ratings148 = df_ratings
  .where(df_ratings("userId") === 148)
  .where(df_ratings("rating") === 5.0)

In [16]:
// Show titles and genres for user 148's selected ratings, with the id columns removed.
ratings148
  .join(df_movies, df_movies("movieId") === ratings148("movieId"), "inner")
  .drop("movieId", "userId")
  .show(numRows = 10, truncate = false)