# Simple recommenders in Scala for the Retail Rocket dataset

We recently published our dataset for research purposes here: https://www.kaggle.com/retailrocket/ecommerce-dataset. It has been getting views, downloads and upvotes, but no kernels have been published yet, so I decided to write a few posts about building simple recommender systems from this kind of data. Scala is heavily used at Retailrocket, so this notebook shows how to build simple recommenders in Scala. The recommender built here predicts the next item a visitor will view.

First, let's define the path to the file with click data and a case class that makes the data convenient to read.

In [None]:
import scala.io.Source

val eventsPath = "./events.csv"
case class Event(timeStamp: Long, visitorId: Int, event: String, itemId: Int, orderId: Option[Int]) // one row of events.csv; orderId is the transaction id, if any

Now let's read and parse the data.

In [2]:
val events = Source.fromFile(eventsPath)
    .getLines
    .toSeq
    .tail // skip the header line with column names
    .map(_.split(",", -1)) // limit -1 keeps the trailing empty transactionid field
    .map{ case Array(timeStamp, visitorId, event, itemId, orderId) => 
        Event(timeStamp.toLong, visitorId.toInt, event, itemId.toInt, if (orderId == "") None else Some(orderId.toInt))}


[36mevents[39m: [32mSeq[39m[[32mEvent[39m] = [33mStream[39m(
  [33mEvent[39m([32m1433221332117L[39m, [32m257597[39m, [32m"view"[39m, [32m355908[39m, None),
  [33mEvent[39m([32m1433224214164L[39m, [32m992329[39m, [32m"view"[39m, [32m248676[39m, None),
  [33mEvent[39m([32m1433221999827L[39m, [32m111016[39m, [32m"view"[39m, [32m318965[39m, None),
  [33mEvent[39m([32m1433221955914L[39m, [32m483717[39m, [32m"view"[39m, [32m253185[39m, None),
  [33mEvent[39m([32m1433221337106L[39m, [32m951259[39m, [32m"view"[39m, [32m367447[39m, None),
  [33mEvent[39m([32m1433224086234L[39m, [32m972639[39m, [32m"view"[39m, [32m22556[39m, None),
  [33mEvent[39m([32m1433221923240L[39m, [32m810725[39m, [32m"view"[39m, [32m443030[39m, None),
  [33mEvent[39m([32m1433223291897L[39m, [32m794181[39m, [32m"view"[39m, [32m439202[39m, None),
  [33mEvent[39m([32m1433220899221L[39m, [32m824915[39m, [32m"view"[39m, [32m428805

A visitor's interest may change over time, so let's split each visitor's click log into sessions.

In [48]:
def sessionize(events: Seq[Event], delay: Long, sessionMaxSize: Int = 1000): Seq[(Int, Seq[Seq[Event]])] = 
    events.groupBy(_.visitorId) 
    // keep visitors with more than one click but fewer than sessionMaxSize clicks
    .filter{ case(visitorId, visitorEvents) => visitorEvents.size > 1 && visitorEvents.size < sessionMaxSize}
    .flatMap{ case (visitorId, visitorEvents) => 
        val sessions: Option[Seq[Seq[Event]]] = visitorEvents
        .sortBy(_.timeStamp)
        .sliding(2) // pairs of consecutive clicks in time order
        .filterNot{ case Seq(a,b) => a.event == b.event && a.itemId == b.itemId} // drop repeated clicks on the same item
        .filter{ case Seq(a,b) => b.timeStamp - a.timeStamp < delay} // delay is the maximum inactivity gap (ms) between clicks
        .map(Seq(_))
        .reduceOption{ (left, right) => // merge adjacent pairs into sessions
            if (left.last(1) == right.head(0)) left.init :+ (left.last :+ right.head(1))
            else left :+ right.head }
        .map(_.filter(_.size < sessionMaxSize)) // drop extreme sessions with too many clicks
        sessions.map{ s => (visitorId, s) }} // emit the sessions of each visitorId
    .toSeq

val sessionizedEvents = sessionize(events, 2 * 3600 * 1000) // maximum inactivity gap: 2 hours, in ms

defined [32mfunction[39m [36msessionize[39m
[36msessionizedEvents[39m: [32mSeq[39m[([32mInt[39m, [32mSeq[39m[[32mSeq[39m[[32mEvent[39m]])] = [33mArrayBuffer[39m(
  (
    [32m1253013[39m,
    [33mList[39m(
      [33mStream[39m(
        [33mEvent[39m([32m1437856704939L[39m, [32m1253013[39m, [32m"view"[39m, [32m214357[39m, None),
        [33mEvent[39m([32m1437856765656L[39m, [32m1253013[39m, [32m"view"[39m, [32m203772[39m, None),
        [33mEvent[39m([32m1437857105399L[39m, [32m1253013[39m, [32m"view"[39m, [32m262407[39m, None)
      ),
      [33mStream[39m(
        [33mEvent[39m([32m1437857105399L[39m, [32m1253013[39m, [32m"view"[39m, [32m262407[39m, None),
        [33mEvent[39m([32m1437857205669L[39m, [32m1253013[39m, [32m"view"[39m, [32m214357[39m, None),
[33m...[39m

Before splitting the sessions into train and test sets, let's check how the last digit of visitorId is distributed.

In [49]:
sessionizedEvents.map(_._1 % 10)
 .groupBy(x => x)
 .map{ case(key, values) => (key, values.size)  }

[36mres48[39m: [32mMap[39m[[32mInt[39m, [32mInt[39m] = [33mMap[39m(
  [32m0[39m -> [32m24052[39m,
  [32m5[39m -> [32m23961[39m,
  [32m1[39m -> [32m24017[39m,
  [32m6[39m -> [32m24126[39m,
  [32m9[39m -> [32m24188[39m,
  [32m2[39m -> [32m24324[39m,
  [32m7[39m -> [32m23913[39m,
  [32m3[39m -> [32m24296[39m,
  [32m8[39m -> [32m24548[39m,
  [32m4[39m -> [32m23897[39m
)

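As you can see, the last digit is close to uniformly distributed: each value covers between 23,897 and 24,548 of the 241,322 visitors with sessions. So we can use the visitor id to assign sessions to train and test sets; the split below uses the last two digits (`visitorId % 100`).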

In [51]:
def splitDataSet(sessionizedEvents: Seq[(Int, Seq[Seq[Event]])], testProportion: Double): 
(Seq[(Int, Seq[Seq[Event]])],Seq[(Int, Seq[Seq[Event]])]) = {
    require(testProportion >= 0 && testProportion <= 1.0, "proportion must be in [0,1]")
    val train = sessionizedEvents.filter{ case (visitorId, visitorSessions) => visitorId % 100 > (1 - testProportion * 100)}
    val test = sessionizedEvents.filter{ case (visitorId, visitorSessions) => visitorId % 100 <= testProportion * 100 }
    (train, test)
} 

val (train, test) = splitDataSet(sessionizedEvents, 0.2) // 20% for test
val trainSessions = train.flatMap{ case(visitorId, visitorSessions) => visitorSessions}
val testSessions = test.flatMap{ case(visitorId, visitorSessions) => visitorSessions}




defined [32mfunction[39m [36msplitDataSet[39m
[36mtrain[39m: [32mSeq[39m[([32mInt[39m, [32mSeq[39m[[32mSeq[39m[[32mEvent[39m]])] = [33mArrayBuffer[39m(
  (
    [32m1253013[39m,
    [33mList[39m(
      [33mStream[39m(
        [33mEvent[39m([32m1437856704939L[39m, [32m1253013[39m, [32m"view"[39m, [32m214357[39m, None),
        [33mEvent[39m([32m1437856765656L[39m, [32m1253013[39m, [32m"view"[39m, [32m203772[39m, None),
        [33mEvent[39m([32m1437857105399L[39m, [32m1253013[39m, [32m"view"[39m, [32m262407[39m, None)
      ),
      [33mStream[39m(
        [33mEvent[39m([32m1437857105399L[39m, [32m1253013[39m, [32m"view"[39m, [32m262407[39m, None),
        [33mEvent[39m([32m1437857205669L[39m, [32m1253013[39m, [32m"view"[39m, [32m214357[39m, None),
[33m...[39m
[36mtest[39m: [32mSeq[39m[([32mInt[39m, [32mSeq[39m[[32mSeq[39m[[32mEvent[39m]])] = [33mArrayBuffer[39m(
  (
    [32m1253013[39m,
    [3

In [52]:
println("train sessions: " + trainSessions.size)
println("test sessions: " + testSessions.size)

train sessions: 489163
test sessions: 102563
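These counts point to an overlap between the two sets. With `testProportion = 0.2`, the train condition `visitorId % 100 > (1 - testProportion * 100)` becomes `visitorId % 100 > -19`, which holds for every visitor, so train receives all sessions. The test condition `visitorId % 100 <= 20` keeps 21 of the 100 buckets, which matches the ratio 102563 / 489163 ≈ 0.21, and visitor 1253013 heads both the train and the test listings above. The evaluation at the end of this notebook therefore scores test sessions that were also used for training. A disjoint split could look like the sketch below; `splitDataSetDisjoint` is a hypothetical name, and it assumes non-negative visitor ids.

```scala
case class Event(timeStamp: Long, visitorId: Int, event: String, itemId: Int, orderId: Option[Int])

// Hypothetical sketch: disjoint train/test split on the last two digits of visitorId.
// Buckets [0, cutoff) go to test and [cutoff, 100) go to train.
def splitDataSetDisjoint(sessionizedEvents: Seq[(Int, Seq[Seq[Event]])], testProportion: Double)
    : (Seq[(Int, Seq[Seq[Event]])], Seq[(Int, Seq[Seq[Event]])]) = {
  require(testProportion >= 0 && testProportion <= 1.0, "proportion must be in [0,1]")
  val cutoff = math.round(testProportion * 100).toInt // e.g. 0.2 -> 20 buckets for test
  // partition returns (elements matching the predicate, all other elements)
  val (test, train) = sessionizedEvents.partition { case (visitorId, _) => visitorId % 100 < cutoff }
  (train, test)
}

// Usage on hypothetical visitor ids with no sessions attached.
val visitors = Seq(5, 19, 20, 99, 1253013, 1253020).map(id => (id, Seq.empty[Seq[Event]]))
val (trainPart, testPart) = splitDataSetDisjoint(visitors, 0.2)
```

Because every visitor lands in exactly one bucket, each visitor's sessions end up on one side only, and 0.2 yields exactly 20 of the 100 buckets for test.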


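Now let's generate recommendations from the training data. I use a simple strategy: count how often items co-occur in the same session, with one condition: a candidate recommendation (itemIdRight) must be clicked after the item we are recommending for (itemIdLeft). Pairs seen fewer than `minCount` times are dropped, items with fewer than `minimumRecs` candidates get no recommendations, and each candidate's weight is its count divided by the total count for its itemIdLeft.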

In [53]:
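case class Rec(itemIdRight: Int, weight: Float) // a recommended item and its weight for a given itemIdLeft

// minCount: minimum number of co-occurrences for a pair (itemIdLeft, itemIdRight) to be kept
// minimumRecs: minimum number of recommendations an itemIdLeft needs to be kept
def itemToItem(sessions: Seq[Seq[Event]], leftEvent: String, rightEvent: String, 
               minCount: Int = 2, minimumRecs: Int = 3): Map[Int,Seq[Rec]] = {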
    
    val itemToItemCounts: Seq[(Int, Int, Int)] = sessions
     .flatMap{ eventSession => 
         for { left <- eventSession 
               right <- eventSession
               if left.event == leftEvent
               if right.event == rightEvent
               if left.timeStamp < right.timeStamp  
            } yield ((left.itemId, right.itemId), 1)} 
     .groupBy(_._1)
     .map{ case((itemIdLeft, itemIdRight), counts) => (itemIdLeft, itemIdRight, counts.map(_._2).sum)} 
     .toSeq
    
    itemToItemCounts
     .filter{ case(itemIdLeft, itemIdRight, counts) => counts >= minCount }    
     .groupBy{ case(itemIdLeft, itemIdRight, counts) => itemIdLeft }
     .filter{ case (itemIdLeft, rights) => rights.size >= minimumRecs }
     .map{ case (itemIdLeft, rights) => 
         val totalCount = rights.map{ case(itemIdLeft, itemIdRight, counts) => counts }.sum
         val recs = rights.map{ case(itemIdLeft, itemIdRight, counts) => Rec(itemIdRight, counts.toFloat / totalCount)}
          .sortBy(_.weight) // sort by weight, then reverse so the top recs come first
          .reverse 
         itemIdLeft -> recs}
}

defined [32mclass[39m [36mRec[39m
defined [32mfunction[39m [36mitemToItem[39m

In [54]:
val itemToItemRecs = itemToItem(trainSessions, "view", "view")
itemToItemRecs.toSeq.size

[36mitemToItemRecs[39m: [32mMap[39m[[32mInt[39m, [32mSeq[39m[[32mRec[39m]] = [33mMap[39m(
  [32m109603[39m -> [33mList[39m(
    [33mRec[39m([32m421284[39m, [32m0.42857143F[39m),
    [33mRec[39m([32m356981[39m, [32m0.2857143F[39m),
    [33mRec[39m([32m172699[39m, [32m0.2857143F[39m)
  ),
  [32m360487[39m -> [33mList[39m(
    [33mRec[39m([32m257040[39m, [32m0.09565217F[39m),
    [33mRec[39m([32m309778[39m, [32m0.09565217F[39m),
    [33mRec[39m([32m135900[39m, [32m0.073913045F[39m),
    [33mRec[39m([32m354555[39m, [32m0.052173913F[39m),
    [33mRec[39m([32m414410[39m, [32m0.047826085F[39m),
[33m...[39m
[36mres53_1[39m: [32mInt[39m = [32m12930[39m

We now have train and test sets and recommendations built from the train set, so let's evaluate them. My key metrics are recall and empty recommendations. Recall: for each view in a session, take the top n recommendations (top@n, n = 3 by default in the code below) for the viewed item and count a hit if the next viewed item is among them; recall is the number of hits divided by the number of such view-to-view transitions. Empty recs: the share of those transitions where the current item has no recommendations at all.
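In symbols, let T be the view-to-view transitions (i, j) in the evaluated sessions, counted with multiplicity, let R_n(i) be the first n recommendations for item i (n is the `top` parameter of `evaluateNextView`), and let D be the set of items that have any recommendations. The two metrics printed by `Evaluate.toString` are then:

$$
\text{Recall@}n = \frac{\left|\{(i, j) \in T : i \in D,\ j \in R_n(i)\}\right|}{|T|},
\qquad
\text{Empty} = \frac{\left|\{(i, j) \in T : i \notin D\}\right|}{|T|}
$$

Transitions with i in D whose next item j is outside R_n(i) count as misses: they appear in |T| but in neither numerator.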

In [55]:
case class Evaluate(hits: Int, emptyRecs: Int, total: Int){
    def +(that: Evaluate): Evaluate = Evaluate(this.hits + that.hits, emptyRecs + that.emptyRecs, total + that.total)
    override def toString = "Recall: %f  Empty recs: %f".format(hits.toDouble / total.toDouble, emptyRecs.toDouble / total.toDouble)
}

def evaluateNextView(sessions: Seq[Seq[Event]], recs: Map[Int, Seq[Rec]], top: Int = 3): Evaluate = {
    sessions.map( _.filter(_.event == "view")) // keep only views
     .filter(_.size > 1)
     .map(_.map(_.itemId))
     .flatMap(_.sliding(2)) // (current item, next item) pairs
     .map{ case Seq(currentItemId, toPredictItemId) => 
         // top-n recs for the current item that match the next item, or None if the item has no recs
         val recsForCurrentItemId: Option[Seq[Rec]] = recs.get(currentItemId).map(_.take(top)).map(_.filter(_.itemIdRight == toPredictItemId))
         recsForCurrentItemId match {
             case Some(Seq()) => Evaluate(0, 0, 1) // recs exist, but the next item is not in the top n
             case Some(_) => Evaluate(1, 0, 1) // hit
             case None => Evaluate(0, 1, 1) // no recs for the current item
         }}
     .foldLeft(Evaluate(0, 0, 0))(_ + _) // unlike reduce, also works when there are no transitions
}

defined [32mclass[39m [36mEvaluate[39m
defined [32mfunction[39m [36mevaluateNextView[39m

In [56]:
println("Evaluation for the train dataset " + evaluateNextView(trainSessions,itemToItemRecs))
println("Evaluation for the test dataset " + evaluateNextView(testSessions,itemToItemRecs))

Evaluation for the train dataset Recall: 0.194188  Empty recs: 0.418063
Evaluation for the test dataset Recall: 0.196900  Empty recs: 0.417510
