### Find similar movies based on cosine similarity

In [1]:
// Workaround for a notebook namespace conflict: bind the session to a fresh val and
// import its implicits from there (needed below for the $"col" syntax and .as[...] conversions).
val spark2 = spark
import spark2.implicits._

In [2]:
// Load the movie catalogue: the header row supplies the column names and
// column types are inferred from the data.
val movieDf = {
    val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
    spark.read.format("CSV").options(csvOptions).load("ml-latest-small/movies.csv")
}
movieDf.show(truncate = false)

+-------+-------------------------------------+-------------------------------------------+
|movieId|title                                |genres                                     |
+-------+-------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                       |Comedy|Romance                             |
|8      |Tom and Huck (1995)                  |Adventure|Children               

In [3]:
// Load the ratings and keep only the three columns used for similarity
// (userId, movieId, rating).
val ratingDf = {
    val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
    val allColumns = spark.read.format("CSV").options(csvOptions).load("ml-latest-small/ratings.csv")
    allColumns.select("userId", "movieId", "rating")
}
ratingDf.show(truncate = false)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |31     |2.5   |
|1     |1029   |3.0   |
|1     |1061   |3.0   |
|1     |1129   |2.0   |
|1     |1172   |4.0   |
|1     |1263   |2.0   |
|1     |1287   |2.0   |
|1     |1293   |2.0   |
|1     |1339   |3.5   |
|1     |1343   |2.0   |
|1     |1371   |2.5   |
|1     |1405   |1.0   |
|1     |1953   |4.0   |
|1     |2105   |4.0   |
|1     |2150   |3.0   |
|1     |2193   |2.0   |
|1     |2294   |2.0   |
|1     |2455   |2.5   |
|1     |2968   |1.0   |
|1     |3671   |3.0   |
+------+-------+------+
only showing top 20 rows



In [4]:
// Key every rating by its user: (userId, (movieId, rating))
val ratings = ratingDf.as[(Int, Int, Double)].rdd.map {
    case (userId, movieId, rating) => (userId, (movieId, rating))
}

In [5]:
// Self-join on userId: pairs up every two ratings made by the same user, giving
// (userId, ((movie1, rating1), (movie2, rating2))).
// The result still contains a movie paired with itself and both orderings of each pair.
val joinedRatings = ratings.join(ratings)

In [6]:
// Keep a single ordering of each pair (movie1 < movie2); this also drops a movie paired with itself
val uniqueJoinedRatings = joinedRatings.filter {
    case (_, ((movie1, _), (movie2, _))) => movie1 < movie2
}

In [7]:
// Drop the user and re-key by the movie pair: ((movie1, movie2), (rating1, rating2))
val moviePairs = uniqueJoinedRatings.map {
    case (_, ((movie1, rating1), (movie2, rating2))) => ((movie1, movie2), (rating1, rating2))
}

In [8]:
// Gather all (rating1, rating2) values that share the same (movie1, movie2) key into one Iterable
val moviePairRatings = moviePairs.groupByKey()

In [9]:
import scala.math.{sqrt, round}

type RatingPair = (Double, Double)
type RatingPairs = Iterable[RatingPair]

/**
 * Cosine similarity between the two rating vectors formed by the first and
 * the second components of the given pairs.
 *
 * @param ratingPairs the (ratingX, ratingY) pairs to compare
 * @return (similarity, number of pairs); the similarity is 0.0 when the
 *         denominator is zero (for example for an empty input)
 */
def computeCosineSimilarity(ratingPairs: RatingPairs): (Double, Int) = {
    // Single pass accumulating (sum of x*x, sum of y*y, sum of x*y, pair count).
    val (sumXX, sumYY, sumXY, count) =
        ratingPairs.foldLeft((0.0, 0.0, 0.0, 0)) {
            case ((xx, yy, xy, n), (x, y)) =>
                (xx + x * x, yy + y * y, xy + x * y, n + 1)
        }

    val norm = sqrt(sumXX) * sqrt(sumYY)
    // Guard the division: a zero norm yields a score of 0.0
    val similarity = if (norm != 0) sumXY / norm else 0.0

    (similarity, count)
}

In [10]:
// Score every movie pair: each value becomes (cosine similarity, number of rating pairs).
// The result is cached because the cells below query it repeatedly.
val moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

In [11]:
// Now we have ((movie1, movie2), (similarity, strength)),
// where strength is the number of rating pairs the similarity was computed from.
moviePairSimilarities.take(10)

Array(((782,2125),(0.9899494936611665,2)), ((2478,3734),(0.9997418355449429,2)), ((112552,161155),(1.0,1)), ((4471,8641),(1.0,1)), ((1334,1777),(0.9734793000685468,5)), ((3713,157296),(1.0,1)), ((1997,100556),(0.9847835588179369,2)), ((8207,64957),(1.0,1)), ((2253,2542),(0.9293403409880338,4)), ((1610,30749),(0.9655370324381085,7)))

In [12]:
// Show every movie whose title contains "Star Wars"
movieDf.where(movieDf("title").contains("Star Wars")).show(truncate = false)

+-------+-------------------------------------------------------------+------------------------------------+
|movieId|title                                                        |genres                              |
+-------+-------------------------------------------------------------+------------------------------------+
|260    |Star Wars: Episode IV - A New Hope (1977)                    |Action|Adventure|Sci-Fi             |
|1196   |Star Wars: Episode V - The Empire Strikes Back (1980)        |Action|Adventure|Sci-Fi             |
|1210   |Star Wars: Episode VI - Return of the Jedi (1983)            |Action|Adventure|Sci-Fi             |
|2628   |Star Wars: Episode I - The Phantom Menace (1999)             |Action|Adventure|Sci-Fi             |
|5378   |Star Wars: Episode II - Attack of the Clones (2002)          |Action|Adventure|Sci-Fi|IMAX        |
|33493  |Star Wars: Episode III - Revenge of the Sith (2005)          |Action|Adventure|Sci-Fi             |
|61160  |Star Wars:

In [13]:
/**
 * Finds the movies most similar to a given movie, ranked by cosine similarity.
 *
 * @param mid      id of the reference movie
 * @param strength exclusive lower bound on the number of rating pairs behind a
 *                 similarity score; only pairs with strictly more are kept
 * @param top      maximum number of similar movies to return
 * @return a DataFrame with columns (title, score, strength), ordered by score
 *         and then strength, both descending
 */
def similarMovies(mid: Int, strength: Int, top: Int): org.apache.spark.sql.DataFrame = {
    // Keep only the pairs that contain `mid` and are backed by enough rating pairs.
    val candidates = moviePairSimilarities.filter {
        case ((movie1, movie2), (_, pairCount)) =>
            (movie1 == mid || movie2 == mid) && pairCount > strength
    }

    // The other movie of a pair that contains `mid`.
    def selectId(pair: (Int, Int)) = if (pair._1 == mid) pair._2 else pair._1

    // Re-key by (similarity, strength) so sortByKey ranks the best matches first,
    // then keep the first `top` of them.
    val results = (candidates.map { case (pair, sim) => (sim, pair) }
                             .sortByKey(ascending = false)
                             .take(top)
                             .map { case ((score, pairCount), pair) =>
                                 // Round the score to 8 decimal places.
                                 (selectId(pair), round(score * 100000000.0) / 100000000.0, pairCount)
                             })

    val resultsDf = spark.createDataFrame(results).toDF("id", "score", "strength")

    val joinedDf = resultsDf.join(movieDf, movieDf("movieId") === resultsDf("id")).select("title", "score", "strength")

    // A join does not guarantee that the row order of its inputs survives,
    // so the ranking is restored explicitly instead of relying on the pre-join sort.
    joinedDf.orderBy(joinedDf("score").desc, joinedDf("strength").desc)
}

In [14]:
// Top 30 movies most similar to Star Wars (1977), requiring strength > 70
val result = similarMovies(mid = 260, strength = 70, top = 30)
result.show(numRows = 30, truncate = false)

+------------------------------------------------------------------------------+----------+--------+
|title                                                                         |score     |strength|
+------------------------------------------------------------------------------+----------+--------+
|Star Wars: Episode V - The Empire Strikes Back (1980)                         |0.98964027|203     |
|Star Wars: Episode VI - Return of the Jedi (1983)                             |0.9891532 |187     |
|Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)|0.98287535|177     |
|Dark Knight, The (2008)                                                       |0.98163928|74      |
|Lord of the Rings: The Fellowship of the Ring, The (2001)                     |0.98024039|147     |
|Hunt for Red October, The (1990)                                              |0.97716648|73      |
|Godfather, The (1972)                                                         |0.97691802|

In [15]:
// Let's consider the useful information in Genres column!

/**
 * Ranks every movie that has a similarity score with `mid`, first by the number
 * of genres it shares with `mid`, then by cosine similarity, then by strength.
 *
 * @param mid id of the reference movie
 * @return a DataFrame with columns (title, genScore, score, strength), sorted
 *         by genScore, score and strength, all descending
 */
def similarMoviesGen(mid: Int): org.apache.spark.sql.DataFrame = {

    // All scored pairs that contain `mid`.
    val filteredResults = moviePairSimilarities.filter {
        case ((movie1, movie2), _) => movie1 == mid || movie2 == mid
    }

    // The other movie of a pair that contains `mid`.
    def selectId(x: (Int, Int)) = if (x._1 == mid) x._2 else x._1

    val results = filteredResults.map {
        case (pair, (score, pairCount)) => (selectId(pair), score, pairCount)
    }

    val resultsDf = spark.createDataFrame(results).toDF("id", "score", "strength")

    val joinedDf = resultsDf.join(movieDf, movieDf("movieId") === resultsDf("id")).select("title", "genres", "score", "strength")

    // Genres of the reference movie, read as a typed column value rather than by
    // stringifying the Row and stripping its surrounding brackets.
    // An unknown `mid` (or a null genres value) yields an empty list.
    val mGen: List[String] = movieDf.filter(movieDf("movieId") === mid)
                                    .select("genres")
                                    .collect()
                                    .headOption
                                    .flatMap(row => Option(row.getString(0)))
                                    .map(_.split('|').toList)
                                    .getOrElse(Nil)

    import org.apache.spark.sql.functions._

    // udf to calculate genres similarity genScore: the number of genres shared
    // with the reference movie; a null genres value scores 0 instead of throwing.
    val func = udf((genres: String) =>
        Option(genres).map(_.split('|').toList.intersect(mGen).length).getOrElse(0))
    val df = joinedDf.withColumn("genScore", func(joinedDf("genres"))).sort(desc("genScore"), desc("score"), desc("strength"))

    df.select("title", "genScore", "score", "strength")
}

In [16]:
// Top 30 genre-aware matches for Star Wars (1977), keeping only strength > 70
val result = similarMoviesGen(mid = 260)
result.where(result("strength") > 70).show(numRows = 30, truncate = false)

+------------------------------------------------------------------------------+--------+------------------+--------+
|title                                                                         |genScore|score             |strength|
+------------------------------------------------------------------------------+--------+------------------+--------+
|Star Wars: Episode V - The Empire Strikes Back (1980)                         |3       |0.9896402714789055|203     |
|Star Wars: Episode VI - Return of the Jedi (1983)                             |3       |0.9891531995341264|187     |
|Aliens (1986)                                                                 |3       |0.9693337395902628|100     |
|Total Recall (1990)                                                           |3       |0.965841780998697 |85      |
|Spider-Man (2002)                                                             |3       |0.9650922091952354|96      |
|Jurassic Park (1993)                                   

In [17]:
// Show every movie whose title contains "Mission: Impossible"
movieDf.where(movieDf("title").contains("Mission: Impossible")).show(truncate = false)

+-------+-------------------------------------------+---------------------------------+
|movieId|title                                      |genres                           |
+-------+-------------------------------------------+---------------------------------+
|648    |Mission: Impossible (1996)                 |Action|Adventure|Mystery|Thriller|
|3623   |Mission: Impossible II (2000)              |Action|Adventure|Thriller        |
|45186  |Mission: Impossible III (2006)             |Action|Adventure|Thriller        |
|91630  |Mission: Impossible - Ghost Protocol (2011)|Action|Adventure|Thriller|IMAX   |
|111781 |Mission: Impossible - Rogue Nation (2015)  |Action|Adventure|Thriller        |
+-------+-------------------------------------------+---------------------------------+



In [18]:
// Top 30 genre-aware matches for Mission: Impossible (1996), keeping only strength > 40
val result = similarMoviesGen(mid = 648)
result.where(result("strength") > 40).show(numRows = 30, truncate = false)

+-------------------------------------------------------------+--------+------------------+--------+
|title                                                        |genScore|score             |strength|
+-------------------------------------------------------------+--------+------------------+--------+
|Minority Report (2002)                                       |3       |0.9763691247739893|54      |
|Rock, The (1996)                                             |3       |0.975463863491967 |96      |
|Bourne Identity, The (2002)                                  |3       |0.9754433707092234|53      |
|Cliffhanger (1993)                                           |3       |0.9741719915829986|47      |
|Hunt for Red October, The (1990)                             |3       |0.9738590945584866|43      |
|Die Hard 2 (1990)                                            |3       |0.9692995145421154|45      |
|Total Recall (1990)                                          |3       |0.9678625938467216|