# Recommendation with Implicit Feedback


In this notebook we demonstrate how to build a neural network recommendation system with implicit feedback, which indirectly reflects users’ preferences through behaviours such as watching videos, purchasing products, and clicking items.

## Initialization

* Start the BigDL engine and the Spark session

In [23]:
import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.numeric.NumericFloat
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils.Engine
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.ml.{DLClassifier, DLModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

val conf = Engine.createSparkConf()
val spark = SparkSession.builder().master("local[8]").appName("RecommendationImplicitExample").config(conf).getOrCreate()
Logger.getLogger("org").setLevel(Level.ERROR)
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
Engine.init

## Data process

* Load the data and check its quality. Here, we assume the MovieLens 1M dataset has been downloaded from [link](https://grouplens.org/datasets/movielens/1m/) and extracted into the "/tmp/movielens/" directory.

In [24]:
// mvpath is the location of downloaded data
val mvpath = "/tmp/movielens/ml-1m"

def getDataDF = {
    val indexedDF = spark.read.text(mvpath + "/ratings.dat").as[String]
      .map(x => {
        val data: Array[Double] = x.split("::").map(n => n.toDouble)
        (data(0), data(1), data(2))
      })
      .toDF("userIdIndex","itemIdIndex","label")
    
    val minMaxRow = indexedDF.agg(min("userIdIndex"), max("userIdIndex"), min("itemIdIndex"), max("itemIdIndex")).collect()(0)

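    // User and item IDs are used directly as embedding indices, so the
    // maximum ID is taken as the size of each ID space.
    val minUserId = minMaxRow.getDouble(0)
    val userCount = minMaxRow.getDouble(1)
    val minMovieId = minMaxRow.getDouble(2)
    val itemCount = minMaxRow.getDouble(3)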
    (indexedDF, userCount, itemCount)
  }
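
The parsing step in `getDataDF` can be tried out on a few in-memory lines without Spark. The following is a minimal sketch in plain Scala; `Rating` and `parseRating` are hypothetical names introduced only for illustration, and the sample lines merely follow the `UserID::MovieID::Rating::Timestamp` layout of `ratings.dat`. Unlike the cell above, it skips malformed lines instead of failing on them.

```scala
import scala.util.Try

// Hypothetical helper types for illustration; not part of the notebook.
final case class Rating(userId: Int, itemId: Int, rating: Double)

// Parse one "UserID::MovieID::Rating::Timestamp" line; malformed lines yield None.
def parseRating(line: String): Option[Rating] =
  line.split("::") match {
    case Array(u, i, r, _*) =>
      Try(Rating(u.trim.toInt, i.trim.toInt, r.trim.toDouble)).toOption
    case _ => None
  }

val sampleLines = Seq("1::1193::5::978300760", "1::661::3::978302109", "bad line")
val parsedRatings = sampleLines.flatMap(parseRating)
parsedRatings.foreach(println)
```

Returning an `Option` keeps the data-quality check explicit: the number of dropped lines is simply `sampleLines.size - parsedRatings.size`.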

In [32]:
val (indexedDF, userCount, itemCount) = getDataDF
indexedDF.show(3)
println("userCount: " + userCount + "\nitemCount: " + itemCount)

+-----------+-----------+-----+
|userIdIndex|itemIdIndex|label|
+-----------+-----------+-----+
|        1.0|     1193.0|  5.0|
|        1.0|      661.0|  3.0|
|        1.0|      914.0|  3.0|
+-----------+-----------+-----+
only showing top 3 rows

userCount: 6040.0
itemCount: 3952.0


* We focus on implicit feedback: the user-item interaction is defined as 1 if an interaction was observed and 0 otherwise. All ratings are transformed into 1.0, and negative samples are added by randomly sampling from the whole user and item space. The features are then converted into the labeled points required by DLClassifier and DLModel.
* The data is then split into training and validation sets.

In [26]:
import scala.util.Random
import org.apache.spark.ml.feature.{LabeledPoint, StringIndexer}
import org.apache.spark.ml.linalg.Vectors

/* Negative samples are needed in this use case. 
 addNegativeSample is defined to add negative samples by randomly sampling from the whole user and item space.
*/
def addNegativeSample(indexedDF: DataFrame) = {
    val row = indexedDF.agg(max("userIdIndex"), max("itemIdIndex")).head
    val (userCount, itemCount) = (row.getAs[Double](0).toInt, row.getAs[Double](1).toInt)

    val sampleDict = indexedDF.rdd.map(row => row(0) + "," + row(1)).collect().toSet
    val numberRecords = 1 * indexedDF.count

    val ran = new Random(seed = 42L)
    val negativeSampleDF = indexedDF.sparkSession.sparkContext
      .parallelize(0 until numberRecords.toInt)
      .map(x => {
        // IDs are 1-based: draw uniformly from 1..userCount and 1..itemCount
        val uid = ran.nextInt(userCount) + 1
        val iid = ran.nextInt(itemCount) + 1
        (uid, iid)
      })
      .filter(x => !sampleDict.contains(x._1 + "," + x._2)).distinct()
      .map(x => (x._1, x._2, 0.0))
      .toDF("userIdIndex", "itemIdIndex", "label")

    indexedDF.union(negativeSampleDF)
  }

// Transform the DataFrame of sparse features into labeled points
val df2LP: (DataFrame) => DataFrame = df => {
    import df.sparkSession.implicits._
    df.select("userIdIndex", "itemIdIndex", "label").rdd.map { r =>
      val f = Vectors.dense(r.getDouble(0), r.getDouble(1))
      require(f.toArray.take(2).forall(_ >= 0))
      val l = r.getDouble(2)
      LabeledPoint(l, f)
    }.toDF().orderBy(rand()).cache()
  }
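
The negative sampling idea can be illustrated on a tiny in-memory dataset. This is a sketch in plain Scala under stated assumptions, not the notebook's Spark implementation: `sampleNegatives` is a hypothetical helper, IDs are assumed to be 1-based, and fewer than `numDraws` pairs may come back because collisions with positives and duplicate draws are dropped.

```scala
import scala.util.Random

// Draw (user, item) pairs uniformly from 1..userCount x 1..itemCount and
// keep only the pairs that were never observed as positives.
def sampleNegatives(positives: Set[(Int, Int)],
                    userCount: Int,
                    itemCount: Int,
                    numDraws: Int,
                    seed: Long = 42L): Set[(Int, Int)] = {
  val rng = new Random(seed)
  Iterator
    .continually((rng.nextInt(userCount) + 1, rng.nextInt(itemCount) + 1))
    .take(numDraws)
    .filterNot(positives.contains)
    .toSet // a Set also removes duplicate draws
}

val toyPositives = Set((1, 1), (1, 3), (2, 2), (3, 4))
val toyNegatives = sampleNegatives(toyPositives, userCount = 3, itemCount = 4, numDraws = 8)
println(toyNegatives)
```

Because rejected draws are not replaced, the ratio of negatives to positives is somewhat below 1:1 on dense data; drawing until a target count is reached would be an alternative design.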

In [33]:
// BigDL's ClassNLLCriterion expects 1-based class labels, so shift 0/1 to 1/2
val add1 = udf((num: Double) => num + 1)

val dataWithNegative = addNegativeSample(indexedDF.withColumn("label", lit(1.0d))).withColumn("label", add1(col("label")))
val dataInLP: DataFrame = df2LP(dataWithNegative)

val Array(trainingDF, validationDF) = dataInLP.randomSplit(Array(0.8, 0.2), seed = 1L)

trainingDF.cache()



[label: double, features: vector]

## Build model

* Here we show how to build a Multi-Layer Perceptron (MLP). A ModelParam class is defined to make it easy to change the model architecture.
* The bottom layer is the input layer, followed by an embedding layer that projects the sparse representation to a dense vector. In BigDL, we can use LookupTable together with Select to create embedding layers: the user (item) embedding has an input size of userCount (itemCount) and an output size of userEmbed (itemEmbed). The embedding layers are then fed into a multi-layer neural architecture (midLayers), and a LogSoftMax layer is added at the end.
* Please refer to the [BigDL programming guide](https://bigdl-project.github.io/master/#ProgrammingGuide/Model/Functional/) for more details about the functional API, and to Neural Collaborative Filtering ([He et al., 2017](https://www.comp.nus.edu.sg/~xiangnan/papers/ncf.pdf)) for details about the architecture.
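
To make the data flow of the architecture described above concrete, here is a minimal forward-pass sketch in plain Scala, without BigDL. Everything in it is an assumption for illustration: the toy sizes, the random weights, and the helper names (`randMat`, `linear`, `relu`, `logSoftMax`, `forward`). It only mirrors the shape of the network: embedding lookup by 1-based ID, concatenation, ReLU hidden layers, and a final log-softmax over two classes.

```scala
import scala.util.Random

val rng = new Random(0L)

// Weight matrix with one row per output unit, drawn from N(0, 0.1^2).
def randMat(rows: Int, cols: Int): Array[Array[Double]] =
  Array.fill(rows, cols)(rng.nextGaussian() * 0.1)

def linear(w: Array[Array[Double]], b: Array[Double])(x: Array[Double]): Array[Double] =
  w.zip(b).map { case (row, bias) => row.zip(x).map { case (a, c) => a * c }.sum + bias }

def relu(x: Array[Double]): Array[Double] = x.map(math.max(0.0, _))

// Numerically stable log-softmax: subtract the max before exponentiating.
def logSoftMax(x: Array[Double]): Array[Double] = {
  val m = x.max
  val logSum = m + math.log(x.map(v => math.exp(v - m)).sum)
  x.map(_ - logSum)
}

// Toy sizes (assumptions): 5 users, 7 items, embedding width 4.
val (toyUsers, toyItems, toyEmbed) = (5, 7, 4)
val userEmbedding = randMat(toyUsers, toyEmbed)
val itemEmbedding = randMat(toyItems, toyEmbed)

// Layer widths: concatenated embeddings -> 20 -> 10 -> 2 classes.
val sizes = Seq(2 * toyEmbed, 20, 10, 2)
val layers = sizes.sliding(2).map { case Seq(in, out) =>
  (randMat(out, in), Array.fill(out)(0.0))
}.toVector

def forward(userId: Int, itemId: Int): Array[Double] = {
  // IDs are 1-based, as in the notebook's data.
  val embedded = userEmbedding(userId - 1) ++ itemEmbedding(itemId - 1)
  val hidden = layers.init.foldLeft(embedded) { case (x, (w, b)) => relu(linear(w, b)(x)) }
  val (w, b) = layers.last
  logSoftMax(linear(w, b)(hidden))
}

val logProbs = forward(userId = 3, itemId = 5)
println(logProbs.mkString(", "))
```

The output holds two log-probabilities whose exponentials sum to one, which is the form a negative log-likelihood criterion consumes.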

In [34]:
case class ModelParam(userEmbed: Int = 20,
                      itemEmbed: Int = 20,
                      mfEmbed: Int = 20,
                      midLayers: Array[Int] = Array(40, 20, 10),
                      labels: Int = 2){
  override def toString: String = {
    "userEmbed =" + userEmbed + "\n" +
    " itemEmbed = " + itemEmbed + "\n" +
    " mfEmbed = " + mfEmbed + "\n" +
    " midLayer = " + midLayers.mkString("|") + "\n" +
    " labels = " + labels
  }
}

class Model(modelParam: ModelParam) {
  import com.intel.analytics.bigdl.nn.Graph.ModuleNode
  import com.intel.analytics.bigdl.nn._
  import com.intel.analytics.bigdl.numeric.NumericFloat
  import com.intel.analytics.bigdl.tensor.Tensor
    
  def this() = {
    this(ModelParam())
  }

  def mlp(userCount: Int, itemCount: Int) = {

    println(modelParam )

    val input = Identity().inputs()
    val select1: ModuleNode[Float] = Select(2, 1).inputs(input)
    val select2: ModuleNode[Float] = Select(2, 2).inputs(input)

    val userTable = LookupTable(userCount, modelParam.userEmbed)
    val itemTable = LookupTable(itemCount, modelParam.itemEmbed)
    userTable.setWeightsBias(Array(Tensor[Float](userCount, modelParam.userEmbed).randn(0, 0.1)))
    itemTable.setWeightsBias(Array(Tensor[Float](itemCount, modelParam.itemEmbed).randn(0, 0.1)))

    val userTableInput = userTable.inputs(select1)
    val itemTableInput = itemTable.inputs(select2)

    val embeddedLayer = JoinTable(2, 0).inputs(userTableInput, itemTableInput)

    val linear1: ModuleNode[Float] = Linear(modelParam.itemEmbed + modelParam.userEmbed,
      modelParam.midLayers(0)).inputs(embeddedLayer)

    val midLayer = buildMlpModuleNode(linear1, 1, modelParam.midLayers)

    val reluLast = ReLU().inputs(midLayer)
    val last: ModuleNode[Float] = Linear(modelParam.midLayers.last, modelParam.labels).inputs(reluLast)

    val output = if (modelParam.labels >= 2) LogSoftMax().inputs(last) else Sigmoid().inputs(last)

    Graph(input, output)
  }

  private def buildMlpModuleNode(linear: ModuleNode[Float], midLayerIndex: Int, midLayers: Array[Int]): ModuleNode[Float] = {

    if (midLayerIndex >= midLayers.length) {
      linear
    } else {
      val relu = ReLU().inputs(linear)
      val l = Linear(midLayers(midLayerIndex - 1), midLayers(midLayerIndex)).inputs(relu)
      buildMlpModuleNode(l, midLayerIndex + 1, midLayers)
    }
  }
}

In [35]:
val modelParam = ModelParam(userEmbed = 20,
                            itemEmbed = 20,
                            midLayers = Array(20,10),
                            labels = 2)
val recModel = new Model(modelParam)
val model = recModel.mlp(userCount.toInt, itemCount.toInt)

userEmbed =20
 itemEmbed = 20
 mfEmbed = 20
 midLayer = 20|10
 labels = 2


## Train a model using DLClassifier

* BigDL provides DLEstimator and DLClassifier for users with Apache Spark MLlib experience. They offer a high-level API for training a BigDL model with the Apache Spark Estimator/Transformer pattern, so users can conveniently fit BigDL into an ML pipeline. Please refer to the [BigDL guide](https://bigdl-project.github.io/master/#ProgrammingGuide/MLPipeline/#overview) for more details.

val criterion = ClassNLLCriterion()
val dlc = new DLClassifier(model, criterion, Array(2)).setBatchSize(1000).setOptimMethod(new Adam()).setLearningRate(1e-1).setLearningRateDecay(1e-6).setMaxEpoch(3)

val time1 = System.nanoTime()

val dlModel: DLModel[Float] = dlc.fit(trainingDF)
trainingDF.unpersist()

val time2 = System.nanoTime()

println("training time(s):  " + (time2-time1)*(1e-9))
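
The training cell names `ClassNLLCriterion` as the loss. As a rough illustration of what a negative log-likelihood criterion computes on log-softmax outputs, here is a sketch in plain Scala. It assumes 1-based class targets and averaging over the batch; `nllLoss` is a hypothetical helper, not BigDL's implementation.

```scala
// Negative log-likelihood over a mini-batch of log-probabilities.
// Targets are 1-based class indices (1 = negative sample, 2 = observed interaction).
def nllLoss(logProbs: Seq[Array[Double]], targets: Seq[Int]): Double = {
  require(logProbs.nonEmpty && logProbs.length == targets.length)
  val perExample = logProbs.zip(targets).map { case (lp, t) => -lp(t - 1) }
  perExample.sum / perExample.length
}

// Two examples: the model assigns 0.9 to class 1 and 0.8 to class 2.
val toyBatch = Seq(
  Array(math.log(0.9), math.log(0.1)),
  Array(math.log(0.2), math.log(0.8))
)
val toyLoss = nllLoss(toyBatch, targets = Seq(1, 2))
println(toyLoss)
```

The loss shrinks toward zero as the probability assigned to the correct class approaches one, which is why the labels were shifted to 1 and 2 before training.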

## Evaluation

* A recommendation system can be evaluated using different metrics. Here we show an example using the traditional area under the ROC curve, precision, and recall. In practice, metrics tailored to the specific use case are preferred.
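
The precision and recall reported in this notebook are the class-frequency-weighted variants (`weightedPrecision`, `weightedRecall`). A minimal sketch of that weighting in plain Scala follows; `weightedMetrics` is a hypothetical helper and the toy pairs are made up for illustration.

```scala
// Per-class precision and recall from (label, prediction) pairs, averaged
// with weights proportional to how often each class occurs as a label.
def weightedMetrics(pairs: Seq[(Double, Double)]): (Double, Double) = {
  val n = pairs.length.toDouble
  val classes = pairs.map(_._1).distinct
  val perClass = classes.map { c =>
    val tp = pairs.count { case (l, p) => l == c && p == c }.toDouble
    val predicted = pairs.count(_._2 == c).toDouble
    val actual = pairs.count(_._1 == c).toDouble
    val precision = if (predicted == 0) 0.0 else tp / predicted
    val recall = tp / actual
    (actual / n, precision, recall)
  }
  (perClass.map { case (w, p, _) => w * p }.sum,
   perClass.map { case (w, _, r) => w * r }.sum)
}

// (label, prediction) pairs: 4 of 6 predictions are correct.
val toyPairs = Seq((1.0, 1.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0))
val (wPrecision, wRecall) = weightedMetrics(toyPairs)
println(s"weighted precision: $wPrecision, weighted recall: $wRecall")
```

Note that weighted recall always equals plain accuracy, since the class weights cancel the per-class denominators; it should therefore not be read as recall of the positive class alone.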

In [39]:
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD

object Evaluation {
  
  def toDecimal(n: Int) = {
    (arg: Double) => BigDecimal(arg).setScale(n, BigDecimal.RoundingMode.HALF_UP).toDouble
  }
    
  def evaluate(evaluateDF: DataFrame) = {
    val binaryEva = new BinaryClassificationEvaluator().setRawPredictionCol("prediction")
    val out1 = binaryEva.evaluate(evaluateDF)
    println("AUROC: " + toDecimal(3)(out1))

    val multiEva = new MulticlassClassificationEvaluator().setMetricName("weightedPrecision")
    val out2 = multiEva.evaluate(evaluateDF)
    println("precision: " + toDecimal(3)(out2))

    val multiEva2 = new MulticlassClassificationEvaluator().setMetricName("weightedRecall")
    val out3 = multiEva2.evaluate(evaluateDF)
    println("recall: " + toDecimal(3)(out3))

    Seq(out1, out2, out3).map(x=> toDecimal(3)(x))
  }
}

In [41]:
val predictions = dlModel.setBatchSize(1).transform(validationDF)
predictions.show(3)
predictions.cache()

// Map the 1-based classes (1.0, 2.0) back to binary labels (0.0, 1.0)
val toZero = udf { d: Double =>
    if (d > 1) 1.0 else 0.0
}
val res = Evaluation.evaluate(predictions.withColumn("label", toZero(col("label"))).withColumn("prediction", toZero(col("prediction"))))
val time3 = System.nanoTime()
val resStr = modelParam +"\n" + res.mkString(" | ") +"\n"

println(resStr)
println("prediction and evaluation time(s):  " + (time3-time2)*(1e-9))

+-----+--------------+----------+
|label|      features|prediction|
+-----+--------------+----------+
|  1.0|  [81.0,981.0]|       1.0|
|  1.0|[115.0,2218.0]|       1.0|
|  1.0|[162.0,2706.0]|       2.0|
+-----+--------------+----------+
only showing top 3 rows

AUROC: 0.731
precision: 0.908
recall: 0.916
userEmbed =20
 itemEmbed = 20
 mfEmbed = 20
 midLayer = 20|10
 labels = 2
0.731 | 0.908 | 0.916

prediction and evaluation time(s):  122.54388534700001


## Recommendations

* Here is an example showing recommendations for users and recommendations for items.
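
One way to turn model scores into recommendations is to rank candidates per user (or per item) and keep the top N. The following is a sketch in plain Scala under stated assumptions: `Scored`, `topNPerUser`, and `topNPerItem` are hypothetical names, and the scores are made-up values standing in for the model's predicted probability of an interaction.

```scala
// Hypothetical scored candidates; a higher score means a stronger predicted preference.
final case class Scored(userId: Int, itemId: Int, score: Double)

// For each user, the n highest-scoring items.
def topNPerUser(scores: Seq[Scored], n: Int): Map[Int, Seq[Int]] =
  scores.groupBy(_.userId).map { case (user, rows) =>
    user -> rows.sortBy(-_.score).take(n).map(_.itemId)
  }

// For each item, the n users most likely to interact with it.
def topNPerItem(scores: Seq[Scored], n: Int): Map[Int, Seq[Int]] =
  scores.groupBy(_.itemId).map { case (item, rows) =>
    item -> rows.sortBy(-_.score).take(n).map(_.userId)
  }

val toyScores = Seq(
  Scored(1, 10, 0.9), Scored(1, 11, 0.2), Scored(1, 12, 0.7),
  Scored(2, 10, 0.4), Scored(2, 12, 0.8)
)
val userRecs = topNPerUser(toyScores, n = 2)
val itemRecs = topNPerItem(toyScores, n = 1)
println(userRecs)
println(itemRecs)
```

On a real dataset the same ranking would typically be done with a window function over the predictions DataFrame, after filtering out items the user has already interacted with.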
