# Project 1

The goal of this assignment is to help you build your intuition about recommender systems, with a basic soup to nuts implementation coded “from scratch.”

Your task is to build a very basic recommender system, first by writing your own functions, then by replacing those functions with those provided in an R Package or a Python library (such as scikit-learn).

* You should very briefly first describe the recommender system that you’re going to build out from a business perspective, e.g. “This system recommends movies to users.”
* You can find a dataset, or build out your own toy dataset and load into (for example) an R or pandas dataframe, a Python dictionary or list of lists, (or other data structure of your choosing).
* You can use either collaborative filtering, or a hybrid of content management and collaborative filtering. 
* You are encouraged to hand code at least your similarity function.
* After you have built out your own code base, create an alternate version using packages or libraries.  Compare the results and performance.
* You are also encouraged to think about how to best handle missing data.
* Your code should be turned in an RMarkdown file or a Jupyter notebook, and posted to Github.


** Requires the Jupyter-Scala language Kernel, available from: (https://github.com/alexarchambault/jupyter-scala)[https://github.com/alexarchambault/jupyter-scala]

In [51]:
classpath.add( "org.apache.spark" %% "spark-core" % "1.6.1",
             "org.apache.spark" %% "spark-mllib" % "1.6.1",
              "org.apache.spark" %% "spark-sql" % "1.6.1")

0 new artifact(s)




# Response

## The Recommender System

As I'm farily new to Spark and the whole data manipulation world in Scala, let's keep the problem simple. This is a system that recommends movies to users based on the dataset collected by the class survey.

As part of this exercise, I will produce a manual similarity function and compare the performance against the collaborative filtering library in Spark

## The Code

### Firing up a Spark Context

In [52]:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}

[32mimport [36morg.apache.spark.{SparkConf, SparkContext}[0m
[32mimport [36morg.apache.spark.sql._[0m
[32mimport [36morg.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}[0m
[32mimport [36morg.apache.spark.mllib.linalg.Vectors[0m
[32mimport [36morg.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}[0m

In [53]:


val conf = new SparkConf()
  .setAppName("week1-EstimatePi")
  .setMaster("local") 
val sc = new SparkContext(conf)


[36mconf[0m: org.apache.spark.SparkConf = org.apache.spark.SparkConf@582f9904
[36msc[0m: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2d1c95ef

### Data Loading and Transformations

The objective here is to:

* Load the `MovieRatings.csv` file
* Transform into Zero filled matrix
* Transform into Long-format data structure


In [54]:
// Read the CSV file
val csv = 
    sc
        .textFile("MovieRatings.csv")
        .map(line => 
             line
                 .replaceAll(",$",", ")
                 .split(",")
                 .map(t => t.trim)
            )
csv.collect


[36mcsv[0m: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at Main.scala:29
[36mres53_1[0m: Array[Array[String]] = [33mArray[0m(
  [33mArray[0m(
    [32m"Critic"[0m,
    [32m"CaptainAmerica"[0m,
    [32m"Deadpool"[0m,
    [32m"Frozen"[0m,
    [32m"JungleBook"[0m,
    [32m"PitchPerfect2"[0m,
    [32m"StarWarsForce"[0m
  ),
  [33mArray[0m([32m"Burton"[0m, [32m""[0m, [32m""[0m, [32m""[0m, [32m"4"[0m, [32m""[0m, [32m"4"[0m),
  [33mArray[0m([32m"Charley"[0m, [32m"4"[0m, [32m"5"[0m, [32m"4"[0m, [32m"3"[0m, [32m"2"[0m, [32m"3"[0m),
  [33mArray[0m([32m"Dan"[0m, [32m""[0m, [32m"5"[0m, [32m""[0m, [32m""[0m, [32m""[0m, [32m"5"[0m),
  [33mArray[0m([32m"Dieudonne"[0m, [32m"5"[0m, [32m"4"[0m, [32m""[0m, [32m""[0m, [32m""[0m, [32m"5"[0m),
  [33mArray[0m([32m"Matt"[0m, [32m"4"[0m, [32m""[0m, [32m"2"[0m, [32m""[0m, [32m"2"[0m, [32m"5"[0m),
  [33mArray[0m([32m"Mauricio"[0m, [3

#### Transforming into Zero-filled Matrix

In [55]:
//val movies = sc.parallelize(csv.first)
val movies = csv.first.splitAt(1)._2
val critics = csv.collect.map(_(0)).splitAt(1)._2

// let's also make a parallelized version of those
val moviesPar = sc.parallelize(movies).zipWithIndex
val criticsPar = sc.parallelize(critics).zipWithIndex

[36mmovies[0m: Array[String] = [33mArray[0m(
  [32m"CaptainAmerica"[0m,
  [32m"Deadpool"[0m,
  [32m"Frozen"[0m,
  [32m"JungleBook"[0m,
  [32m"PitchPerfect2"[0m,
  [32m"StarWarsForce"[0m
)
[36mcritics[0m: Array[String] = [33mArray[0m(
  [32m"Burton"[0m,
  [32m"Charley"[0m,
  [32m"Dan"[0m,
  [32m"Dieudonne"[0m,
  [32m"Matt"[0m,
  [32m"Mauricio"[0m,
  [32m"Max"[0m,
  [32m"Nathan"[0m,
  [32m"Param"[0m,
  [32m"Parshu"[0m,
  [32m"Prashanth"[0m,
  [32m"Shipra"[0m,
  [32m"Sreejaya"[0m,
  [32m"Steve"[0m,
  [32m"Vuthy"[0m,
  [32m"Xingjia"[0m
)
[36mmoviesPar[0m: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[4] at zipWithIndex at Main.scala:38
[36mcriticsPar[0m: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[6] at zipWithIndex at Main.scala:41

In [56]:
val zeroFilledMatrix = 
    csv
        .collect
        .filterNot(r => r(1) == movies(0)) // filter out first row
        .map(r => r.filterNot(value => critics contains value)) // map function that returns the record minus the critic name
        .map(r => r.map(value => if (value == "") 0.00 else value.toDouble))

// and now parallelize it
val zeroFilledMatrixPar = sc.parallelize(zeroFilledMatrix)

// now, let's convert it into a linalg matrix so we can perform linear algebra operations on it
val criticMoviesMatrix = new RowMatrix(zeroFilledMatrixPar.map(line => Vectors.dense(line)))


// also, let's have a transposed version of it:
val dataTransposed =  sc.parallelize(zeroFilledMatrix.toSeq.transpose)

val moviesCriticMatrix = new RowMatrix(dataTransposed.map(line => Vectors.dense(line.toArray)))


[36mzeroFilledMatrix[0m: Array[Array[Double]] = [33mArray[0m(
  [33mArray[0m([32m0.0[0m, [32m0.0[0m, [32m0.0[0m, [32m4.0[0m, [32m0.0[0m, [32m4.0[0m),
  [33mArray[0m([32m4.0[0m, [32m5.0[0m, [32m4.0[0m, [32m3.0[0m, [32m2.0[0m, [32m3.0[0m),
  [33mArray[0m([32m0.0[0m, [32m5.0[0m, [32m0.0[0m, [32m0.0[0m, [32m0.0[0m, [32m5.0[0m),
  [33mArray[0m([32m5.0[0m, [32m4.0[0m, [32m0.0[0m, [32m0.0[0m, [32m0.0[0m, [32m5.0[0m),
  [33mArray[0m([32m4.0[0m, [32m0.0[0m, [32m2.0[0m, [32m0.0[0m, [32m2.0[0m, [32m5.0[0m),
  [33mArray[0m([32m4.0[0m, [32m0.0[0m, [32m3.0[0m, [32m3.0[0m, [32m4.0[0m, [32m0.0[0m),
  [33mArray[0m([32m4.0[0m, [32m4.0[0m, [32m4.0[0m, [32m2.0[0m, [32m2.0[0m, [32m4.0[0m),
  [33mArray[0m([32m0.0[0m, [32m0.0[0m, [32m0.0[0m, [32m0.0[0m, [32m0.0[0m, [32m4.0[0m),
  [33mArray[0m([32m4.0[0m, [32m4.0[0m, [32m1.0[0m, [32m0.0[0m, [32m0.0[0m, [32m5.0[0m),
  [33mArray[0m

#### Transforming into a Long-format data structure

For practical purposes, we'll do an index-based long format, meaning that the string names will be substituted for an index

In [57]:
val longFormat = 
    csv
        .collect
        .filterNot(r => r(1) == movies(0)) // filter out first row
        .flatMap(r=> (1 to movies.length-1).map(i=> (r(0),movies(i-1),r(i)))) // pivot each column sothat we have: (user,movie,rating)
        .filter(r=> r._3 !="") // filter out those unrated movies
        .map(r=> (critics.indexOf(r._1),movies.indexOf(r._2),r._3.toDouble)) // convert the remaining rating to a double
                           
val ratingsLong = sc.parallelize(longFormat)

[36mlongFormat[0m: Array[(Int, Int, Double)] = [33mArray[0m(
  [33m[0m([32m0[0m, [32m3[0m, [32m4.0[0m),
  [33m[0m([32m1[0m, [32m0[0m, [32m4.0[0m),
  [33m[0m([32m1[0m, [32m1[0m, [32m5.0[0m),
  [33m[0m([32m1[0m, [32m2[0m, [32m4.0[0m),
  [33m[0m([32m1[0m, [32m3[0m, [32m3.0[0m),
  [33m[0m([32m1[0m, [32m4[0m, [32m2.0[0m),
  [33m[0m([32m2[0m, [32m1[0m, [32m5.0[0m),
  [33m[0m([32m3[0m, [32m0[0m, [32m5.0[0m),
  [33m[0m([32m3[0m, [32m1[0m, [32m4.0[0m),
  [33m[0m([32m4[0m, [32m0[0m, [32m4.0[0m),
  [33m[0m([32m4[0m, [32m2[0m, [32m2.0[0m),
  [33m[0m([32m4[0m, [32m4[0m, [32m2.0[0m),
  [33m[0m([32m5[0m, [32m0[0m, [32m4.0[0m),
  [33m[0m([32m5[0m, [32m2[0m, [32m3.0[0m),
  [33m[0m([32m5[0m, [32m3[0m, [32m3.0[0m),
  [33m[0m([32m5[0m, [32m4[0m, [32m4.0[0m),
  [33m[0m([32m6[0m, [32m0[0m, [32m4.0[0m),
  [33m[0m([32m6[0m, [32m1[0m, [32m4.0[0m),
  [33m[0m([32m

## Model Building - Manual Recommendations System


### Critic-Critic Similarity Model

Let's now build a User-User similarity model based on the cosine distance

In [58]:
val userUserCosineDistance = moviesCriticMatrix.columnSimilarities()

val userUserSimilarities = userUserCosineDistance
  .entries
  .map {
    case MatrixEntry(i, j, u) => (i, j, u) }
  .collect
  .map(r => Seq(critics(r._1.toInt), critics(r._2.toInt), r._3.toDouble))
  .sortBy(-_(2).asInstanceOf[Double])

[36muserUserCosineDistance[0m: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@44033cf3
[36muserUserSimilarities[0m: Array[Seq[Any]] = [33mArray[0m(
  [33mList[0m(Dieudonne, Param, 0.9859249803487347),
  [33mList[0m(Max, Sreejaya, 0.9847319278346619),
  [33mList[0m(Charley, Max, 0.9811873171500672),
  [33mList[0m(Charley, Sreejaya, 0.9792633226865932),
  [33mList[0m(Charley, Prashanth, 0.9610484599102903),
  [33mList[0m(Parshu, Prashanth, 0.9600666937386864),
  [33mList[0m(Parshu, Sreejaya, 0.955672134494952),
  [33mList[0m(Charley, Parshu, 0.9474847084398104),
  [33mList[0m(Max, Prashanth, 0.9410294354946785),
  [33mList[0m(Max, Parshu, 0.9296599791147713),
  [33mList[0m(Prashanth, Sreejaya, 0.9293555142631518),
  [33mList[0m(Charley, Vuthy, 0.927771250724491),
  [33mList[0m(Matt, Steve, 0.9091372900969895),
  [33mList[0m(Shipra, Xingjia, 0.8999999999999999),
  [33mList[0m(Sreej

### Putting Together a Recommender System

Let's try the following strategy:

* For a given user, find simiar users. A similar user is another user with a cosine distance of at least 0.6
* Make a movie rating recommendation based on the average of the similar user-group

First, let's Generate Movie ratings for all critics:

In [59]:
def recommendMoviesForUser(user:String,userUserSimilarities:Array[Seq[Any]],ratings:Array[Array[Double]])={
    val similarUsers = userUserSimilarities
        .filter(r=> r(0) == user || r(1)==user).map(r=>(if (r(1)==user) r(0) else r(1), r(2).asInstanceOf[Double]))
        .filter(_._2>0.6)
        .map(r=> (critics.indexOf(r._1),r._2))
        .sortBy(-_._2)

    val similarRatings = ratings
        .zipWithIndex // add user indexes
        .filter(r=>similarUsers.map(_._1) contains r._2) // filter out based on the index
        .map(_._1) // revert back to the array
    val meanRating = (0 to (movies.length-1)).map{
        m => 
            val nonZeroRatings = similarRatings.map(c => c(m)).filter(_>0)
            nonZeroRatings.sum/nonZeroRatings.length
    }.toArray
    meanRating
}


val predictedCriticRatings = critics.map(c=>recommendMoviesForUser(c,userUserSimilarities,zeroFilledMatrix))

defined [32mfunction [36mrecommendMoviesForUser[0m
[36mpredictedCriticRatings[0m: Array[Array[Double]] = [33mArray[0m(
  [33mArray[0m([32m4.0[0m, [32m3.0[0m, [32m4.5[0m, [32m5.0[0m, [32m2.0[0m, [32m3.3333333333333335[0m),
  [33mArray[0m(
    [32m4.333333333333333[0m,
    [32m4.375[0m,
    [32m3.5555555555555554[0m,
    [32m3.857142857142857[0m,
    [32m2.8333333333333335[0m,
    [32m4.333333333333333[0m
  ),
  [33mArray[0m([32m4.4[0m, [32m4.4[0m, [32m3.5[0m, [32m3.0[0m, [32m2.6666666666666665[0m, [32m4.333333333333333[0m),
  [33mArray[0m([32m4.25[0m, [32m4.428571428571429[0m, [32m3.7142857142857144[0m, [32m3.8[0m, [32m2.4[0m, [32m4.2[0m),
  [33mArray[0m(
    [32m4.333333333333333[0m,
    [32m4.285714285714286[0m,
    [32m3.857142857142857[0m,
    [32m3.6666666666666665[0m,
    [32m2.8[0m,
    [32m4.111111111111111[0m
  ),
[33m...[0m

Let's now calculate the mean squared error for this model

In [60]:
val MSE = (0 to critics.length-1).map{ 
    c => 
        val row = (0 to movies.length-1).map{ 
            m =>
                val err = predictedCriticRatings(c)(m) - zeroFilledMatrix(c)(m)
                err * err
        }
        row.sum / row.length
}
MSE.sum/MSE.length

[36mMSE[0m: collection.immutable.IndexedSeq[Double] = [33mVector[0m(
  [32m8.449074074074074[0m,
  [32m0.6510305125136474[0m,
  [32m8.087592592592593[0m,
  [32m5.897015306122449[0m,
  [32m6.133667590492988[0m,
  [32m6.671012849584277[0m,
  [32m0.8786625514403292[0m,
  [32m9.922453703703702[0m,
  [32m4.949259259259259[0m,
  [32m1.3704329386075418[0m,
  [32m2.003435715335517[0m,
  [32m7.851481481481482[0m,
  [32m1.0986684565801628[0m,
  [32m7.296342592592592[0m,
  [32m2.957535903250189[0m,
  [32m8.578703703703704[0m
)
[36mres59_1[0m: Double = [32m5.174773076958407[0m

### Querying the Models

* User: Who should Mauricio go out to the movies with?

In [61]:
val user= "Mauricio"
userUserSimilarities
    .filter(r=> r(0) == user || r(1)==user).map(r=>(if (r(1)==user) r(0) else r(1), r(2).asInstanceOf[Double]))
    .filter(_._2>0.75)
    .sortBy(-_._2)

[36muser[0m: String = [32m"Mauricio"[0m
[36mres60_1[0m: Array[(Any, Double)] = [33mArray[0m(
  [33m[0m(Parshu, [32m0.8140806303599618[0m),
  [33m[0m(Vuthy, [32m0.7888934916555406[0m),
  [33m[0m(Sreejaya, [32m0.7754763931697963[0m)
)

* What movies should Mauricio watch?

In [62]:
recommendMoviesForUser("Mauricio",userUserSimilarities,zeroFilledMatrix).zipWithIndex.map(r => (movies(r._2),r._1)).sortBy(-_._2)

[36mres61[0m: Array[(String, Double)] = [33mArray[0m(
  [33m[0m([32m"Deadpool"[0m, [32m4.5[0m),
  [33m[0m([32m"CaptainAmerica"[0m, [32m4.285714285714286[0m),
  [33m[0m([32m"Frozen"[0m, [32m4.0[0m),
  [33m[0m([32m"StarWarsForce"[0m, [32m4.0[0m),
  [33m[0m([32m"JungleBook"[0m, [32m3.6666666666666665[0m),
  [33m[0m([32m"PitchPerfect2"[0m, [32m2.5[0m)
)

## Using Collaborative Filtering ALS Model

Based on: http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

In [63]:
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

val ratingsALS = ratingsLong.map(r=>Rating(r._1, r._2, r._3))

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratingsALS, rank, numIterations, 0.01)


[32mimport [36morg.apache.spark.mllib.recommendation.ALS[0m
[32mimport [36morg.apache.spark.mllib.recommendation.MatrixFactorizationModel[0m
[32mimport [36morg.apache.spark.mllib.recommendation.Rating[0m
[36mratingsALS[0m: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[17] at map at Main.scala:35
[36mrank[0m: Int = [32m10[0m
[36mnumIterations[0m: Int = [32m10[0m
[36mmodel[0m: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@6b9230e

### Querying the ALS Model

Let's get:

* Recommend 5 movies for Mau

In [66]:
//What movies 
model.recommendProducts(5,critics.indexOf("Mauricio")).map(r => (critics(r.user), movies(r.product), r.rating)) 

model.recommendUsersForProducts(3).collect.flatMap(m=>m._2.map(r=>(movies(m._1),critics(r.user),r.rating)))

model.recommendProductsForUsers(3).collect.flatMap(m=>m._2.map(r=>(movies(r.product),critics(m._1),r.rating)))


: 

In [None]:
sc.stop

### Model Performance

In [65]:
// Evaluate the model on rating data
val criticsMovies = ratingsALS.map { case Rating(critic, movie, rate) =>
  (critic, movie)
}
val predictions =
  model.predict(criticsMovies).map { case Rating(critic, movie, rate) =>
    ((critic, movie), rate)
  }
val ratesAndPredictions = ratingsALS.map { case Rating(critic, movie, rate) =>
  ((critic, movie), rate)
}.join(predictions)
val MSE = ratesAndPredictions.map { case ((critic, movie), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()

: 

# Conclusion

* Clearly the ALS model (MSE=9.456951543917609E-5) beats the manual model (MSE=5.174773076958407)
* Alternative recommendation strategies can be explored such as nearest neighbors
