<h1> Collaborative Filtering using Alternating Least Squares technique </h1>

<h2>Configure Spark</h2>

In [1]:
%%init_spark
launcher.num_executors = 4
launcher.executor_cores = 4
launcher.driver_memory = '10g'
launcher.executor_memory = '10g'

<h2> Data Preprocessing </h2>

In [2]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val data_file = sc.textFile("gs://mvr2126-bucket/lastfm-dataset-360K/usersha1-profile.tsv")
val user_data = data_file.sample(false,0.05,42)
val user_key_map = user_data
    .map(l => l.split("\t"))
    .map(a => a(0))
    .zipWithUniqueId
    .map(t => (t._1,t._2.toInt))
    .collectAsMap()
val fields = Array(StructField("user",IntegerType,nullable=false),
                  StructField("gender",StringType,nullable=false),
                   StructField("age",StringType,nullable=false),
                  StructField("country",StringType,nullable=false),
                  StructField("signupdate",StringType,nullable=false))
val schema = StructType(fields)
val user_data1 = user_data.map(l => l.split("\t"))
.map {
    l => {
        val long_id = user_key_map get l(0)
        Row(long_id.get,l(1),l(2),l(3),l(4))
    }
}
val user_df = spark.createDataFrame(user_data1,schema)

val data = sc.textFile("gs://mvr2126-bucket/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv")
val data1 = data.map(l=>l.split("\t"))
    .map(a => (a(1),a(2)))
    .reduceByKey((v1, v2) => v1)

val artist_key_map = data1.map(r=> r._1)
    .zipWithUniqueId
    .map(t => (t._1,t._2.toInt))
    .collectAsMap

val artist_fields = Array(StructField("artist_id",IntegerType,nullable=false),
                  StructField("artist_name",StringType,nullable=false))

val artist_schema = StructType(artist_fields)

val artist_df = spark.createDataFrame(data1
.map {
    l => {
        val long_id = artist_key_map get l._1
        Row(long_id.get,l._2)
    }
},artist_schema)

def artist_lookup(id: Integer) = {
    artist_df.filter($"artist_id"===id).rdd.first.toSeq(1).toString
}
val play_fields = Array(StructField("user",IntegerType,nullable=false),
                  StructField("artist_id",IntegerType,nullable=false),
                   StructField("plays",IntegerType,nullable=false))

val play_schema = StructType(play_fields)

val plays_rdd = data.map(l => l.split("\t"))
    .map {
        l => {
            val user_id = user_key_map.getOrElse(l(0),-1)
            val artist_id = artist_key_map.getOrElse(l(1),-1)
            val plays = l(3).replaceAll("^\\s+", "").toInt
            Row(user_id,artist_id,plays)
        }

}

val plays_df = spark.createDataFrame(plays_rdd.filter(r => r(0) != -1),play_schema)

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.7.214:4041
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1588607303105)
SparkSession available as 'spark'


import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
data_file: org.apache.spark.rdd.RDD[String] = ../class 11/lastfm-dataset-360K/usersha1-profile.tsv MapPartitionsRDD[1] at textFile at <console>:27
user_data: org.apache.spark.rdd.RDD[String] = PartitionwiseSampledRDD[2] at sample at <console>:28
user_key_map: scala.collection.Map[String,Int] = Map(6ff29d8fec1d97e04e950ccd1e1c853dee9f5ad5 -> 15876, 8e6af18c39b024025a4aefd64ebcf05dd45b7c6f -> 1945, b1b13addc2bf5918a1fa8a117ba6232f47cc2a3e -> 6925, 778a3b0e2312c7efbc87daa80ad5f4ca072f4f49 -> 16912, e0cab2dd54549bc9050e2e2954d7191f2d4d782f -> 13459, 972599142e08fe118319e7fcbc4a264d7e044fb4 -> 3181, a11368f80a696689ad5075ca4a23dde1a9545ad0 -> 4567, 38604f8ff08e6463a9761b7a5464fb99adf8a16d -> 7850, 171edc38d8d58428bf7620c49a0...

<h2> ALS Algorithm </h2>

In [None]:
import org.apache.spark.ml.recommendation._
import scala.util.Random
val split_data = plays_df.randomSplit(Array(0.7,0.3),20)
val train_df = split_data(0).toDF
val test_df = split_data(1).toDF

val als = new ALS().
        setSeed(Random.nextLong()).
        setImplicitPrefs(true).
        setRank(5).setRegParam(1.0).
        setAlpha(1.0).setMaxIter(20).
        setUserCol("user").setItemCol("artist_id").
        setRatingCol("plays").setPredictionCol("prediction")

val model = als.fit(train_df)

<h2>Extract played predictions</h2>

In [3]:
val played_predictions = model
    .transform(test_df.select("user","artist_id"))
    .withColumnRenamed("prediction","played")


played_predictions: org.apache.spark.sql.DataFrame = [user: int, artist_id: int ... 1 more field]


<h2> Broadcasting all artists to all clusters </h2>

In [4]:
val allArtists = plays_df.select("artist_id").as[Int].distinct().collect()
val bAllArtists = spark.sparkContext.broadcast(allArtists)

allArtists: Array[Int] = Array(97564, 13285, 133153, 14832, 94819, 133524, 117994, 135867, 48254, 68579, 110904, 65408, 89863, 108560, 120899, 150822, 107536, 93319, 146036, 141449, 32445, 92188, 143432, 158339, 24663, 143153, 81410, 42468, 12027, 1829, 10623, 49308, 7880, 79220, 129345, 156366, 139128, 51393, 15957, 43935, 145011, 91785, 150087, 100884, 140541, 145210, 28577, 152600, 9376, 74904, 35689, 133577, 133590, 34759, 130062, 153409, 140081, 102793, 139024, 28836, 72578, 29194, 154034, 92834, 79361, 63155, 134748, 7993, 142084, 156363, 31528, 57178, 115741, 19530, 111381, 145203, 6336, 156365, 126373, 43714, 6620, 109909, 150604, 100800, 57984, 833, 128367, 64628, 34239, 150383, 133730, 134205, 83861, 69637, 42635, 156941, 150843, 56680, 144991, 94950, 96044, 41751, 149761, 465...

<h2>Extracting unplayed predictions</h2>

In [5]:
import scala.collection.mutable.ArrayBuffer
import scala.util.Random


val unplayed = test_df.select("user", "artist_id").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, user_artist_tuples) =>
        val random = new Random()
          //Set of all artists for this user that are in the test data set
        val played_set = user_artist_tuples.map { case (_, artist) => artist }.toSet
          //place holder mutable array for the artists that are not in the test set for this user
        val unplayed = new ArrayBuffer[Int]()
          //grab artist ids from the broadcast array
        val allArtists = bAllArtists.value
        var i = 0
        // Iterate over all artists until we have enough "negative" artists
          //randomly picking artists from the set of all artists
          //as long as the artist is not in the positive artists set
          
        while (i < allArtists.length && unplayed.size < played_set.size) {
          val artistID = allArtists(random.nextInt(allArtists.length))
          // Only add new distinct IDs
          if (!played_set.contains(artistID)) {
            unplayed += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        unplayed.map(artistID => (userID, artistID))
      }.toDF("user","artist_id")


import scala.collection.mutable.ArrayBuffer
import scala.util.Random
unplayed: org.apache.spark.sql.DataFrame = [user: int, artist_id: int]


<h2>Predictions for the unplayed (user,artist) data</h2>

In [6]:
val unplayed_predictions = model.transform(unplayed).
      withColumnRenamed("prediction", "unplayed")

unplayed_predictions: org.apache.spark.sql.DataFrame = [user: int, artist_id: int ... 1 more field]


<h2>Evaluate the model</h2>

In [7]:
import org.apache.spark.sql.DataFrame
def accuracy(played_predictions: DataFrame,unplayed_predictions: DataFrame): Double = {



    // Join played predictions to unplayed predictions by user.
    // This will result in a row for every possible pairing of played and unplayed
    // predictions within each user.
    val joined = played_predictions.join(unplayed_predictions, "user")
        .select("user", "played", "unplayed")
        .cache()

    // Count the number of pairs per user. In a perfect model, this should total the number of pairs
    val totals = joined
        .groupBy("user")
        .agg(count(lit("1")).as("total"))
        .select("user", "total")
    
    // Count the number of pairs for each user where played prediction > unplayed prediction
    val model_counts = joined
        .filter($"played" > $"unplayed")
        .groupBy("user")
        .agg(count("user").as("model"))
        .select("user", "model")

    // Combine these, compute their ratio, and average over all users
    val accuracy = totals
                    .join(model_counts, Seq("user"), "left_outer")
                    .select($"user", (coalesce($"model", lit(0)) / $"total").as("acc"))
                    .agg(mean("acc"))
                    .as[Double]
                    .first()

    joined.unpersist()

    accuracy
  }


import org.apache.spark.sql.DataFrame
accuracy: (played_predictions: org.apache.spark.sql.DataFrame, unplayed_predictions: org.apache.spark.sql.DataFrame)Double


In [8]:
accuracy(played_predictions,unplayed_predictions)

res0: Double = 0.8139504014789021


<h2> Hyper parameter tuning </h2>

In [19]:
import org.apache.spark.ml.recommendation._
import scala.util.Random

import scala.collection.mutable.ArrayBuffer


val unplayed = test_df.select("user", "artist_id").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, user_artist_tuples) =>
        val random = new Random()
          //Set of all artists for this user that are in the test data set
        val played_set = user_artist_tuples.map { case (_, artist) => artist }.toSet
          //place holder mutable array for the artists that are not in the test set for this user
        val unplayed = new ArrayBuffer[Int]()
          //grab artist ids from the broadcast array
        val allArtists = bAllArtists.value
        var i = 0
        // Iterate over all artists until we have enough "negative" artists
          //randomly picking artists from the set of all artists
          //as long as the artist is not in the positive artists set
          
        while (i < allArtists.length && unplayed.size < played_set.size) {
          val artistID = allArtists(random.nextInt(allArtists.length))
          // Only add new distinct IDs
          if (!played_set.contains(artistID)) {
            unplayed += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        unplayed.map(artistID => (userID, artistID))
      }.toDF("user","artist_id")

val evaluations = for (rank     <- Seq(5,10); 
                       regParam <- Seq(.01,1.0); 
                       alpha    <- Seq(1.0,10.0))
    yield {
        val model = new ALS().
            setSeed(Random.nextLong()).
            setImplicitPrefs(true).
            setRank(rank).setRegParam(regParam).
            setAlpha(alpha).setMaxIter(20).
            setUserCol("user").setItemCol("artist_id").
            setRatingCol("plays").setPredictionCol("prediction").
            fit(train_df)
        val played_predictions = model
            .transform(test_df.select("user","artist_id"))
            .withColumnRenamed("prediction","played")
        val unplayed_predictions = model.transform(unplayed)
            .withColumnRenamed("prediction", "unplayed")
        val acc = accuracy(played_predictions,unplayed_predictions)

      (acc, (rank, regParam, alpha))
    }

import org.apache.spark.ml.recommendation._
import scala.util.Random
import scala.collection.mutable.ArrayBuffer
unplayed: org.apache.spark.sql.DataFrame = [user: int, artist_id: int]
evaluations: Seq[(Double, (Int, Double, Double))] = List((0.8133508696288466,(5,0.01,1.0)), (0.8126540419958646,(5,0.01,10.0)), (0.8131559686893157,(5,1.0,1.0)), (0.8177653645251387,(5,1.0,10.0)), (0.8224748744414134,(10,0.01,1.0)), (0.8244080234966058,(10,0.01,10.0)), (0.8228561560836951,(10,1.0,1.0)), (0.8254413267658274,(10,1.0,10.0)))


<h3> The maximum accuracy is 0.82544 for the parameters:</h3>
<li>Rank: 10</li>
<li>Regularisation Parameter: 1.0</li>
<li>Learning rate: 10.0</li>