In [1]:
import findspark
findspark.init()
import pyspark
import pyspark.sql.functions as F

In [2]:
spark = pyspark.sql.SparkSession.builder.appName('MovieRecommender').getOrCreate()

In [3]:
movies = spark.read.csv('data/ml100/movies.csv', header=True)
movies = movies.drop('genres')
ratings = spark.read.csv('data/ml100/ratings.csv', header=True, inferSchema=True)
ratings = ratings.drop('timestamp')

In [4]:
movies.show(5)
ratings.show(5)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows



In [5]:
joinedRatings = ratings.join(ratings
                             .withColumnRenamed('movieId', '_movieId')
                             .withColumnRenamed('rating', '_rating')
                             , 'userId')
joinedRatings.show(5)

+------+-------+------+--------+-------+
|userId|movieId|rating|_movieId|_rating|
+------+-------+------+--------+-------+
|     1|     31|   2.5|    3671|    3.0|
|     1|     31|   2.5|    2968|    1.0|
|     1|     31|   2.5|    2455|    2.5|
|     1|     31|   2.5|    2294|    2.0|
|     1|     31|   2.5|    2193|    2.0|
+------+-------+------+--------+-------+
only showing top 5 rows



In [6]:
joinedRatings = joinedRatings.filter('movieId < _movieId')
joinedRatings.show(5)

+------+-------+------+--------+-------+
|userId|movieId|rating|_movieId|_rating|
+------+-------+------+--------+-------+
|     1|     31|   2.5|    3671|    3.0|
|     1|     31|   2.5|    2968|    1.0|
|     1|     31|   2.5|    2455|    2.5|
|     1|     31|   2.5|    2294|    2.0|
|     1|     31|   2.5|    2193|    2.0|
+------+-------+------+--------+-------+
only showing top 5 rows



In [7]:
def moviePairId(movieId, _movieId):
    return ''.join([str(movieId),'-',str(_movieId)])
udfMoviePairId = F.udf(moviePairId)

In [8]:
joinedRatings = joinedRatings.withColumn('moviePairId', udfMoviePairId('movieId', '_movieId'))
joinedRatings.show(5)

+------+-------+------+--------+-------+-----------+
|userId|movieId|rating|_movieId|_rating|moviePairId|
+------+-------+------+--------+-------+-----------+
|     1|     31|   2.5|    3671|    3.0|    31-3671|
|     1|     31|   2.5|    2968|    1.0|    31-2968|
|     1|     31|   2.5|    2455|    2.5|    31-2455|
|     1|     31|   2.5|    2294|    2.0|    31-2294|
|     1|     31|   2.5|    2193|    2.0|    31-2193|
+------+-------+------+--------+-------+-----------+
only showing top 5 rows



In [9]:
moviePairSimilarities = joinedRatings.groupBy('moviePairId').agg((F.sum(joinedRatings.rating * joinedRatings._rating)/
                                                                  (F.sqrt(F.sum(joinedRatings.rating**2))*
                                                                   F.sqrt(F.sum(joinedRatings._rating**2)))).alias('score'),
                                                                F.count(joinedRatings.rating).alias('numPairs'))
moviePairSimilarities.cache()

DataFrame[moviePairId: string, score: double, numPairs: bigint]

In [10]:
moviePairSimilarities.show(5)

+-----------+------------------+--------+
|moviePairId|             score|numPairs|
+-----------+------------------+--------+
|  1172-1293| 0.958507806456253|      17|
|  1405-2193|0.9617497019191603|      12|
|     10-273|0.9685067726005836|      16|
|     50-110|0.9517990774006699|     100|
|     62-186|0.9441628304892208|      16|
+-----------+------------------+--------+
only showing top 5 rows



In [11]:
def getMovie1(moviePairId):
    return moviePairId.split('-')[0]
def getMovie2(moviePairId):
    return moviePairId.split('-')[1]
udfGetMovie1 = F.udf(getMovie1)
udfGetMovie2 = F.udf(getMovie2)

In [12]:
moviePairScores = (moviePairSimilarities.withColumn('movieId', udfGetMovie1(moviePairSimilarities.moviePairId))
                                        .withColumn('_movieId', udfGetMovie2(moviePairSimilarities.moviePairId))
                                        .drop('moviePairId'))

In [13]:
def weightedScore(score, numPairs):
    return score*numPairs
udfWeightedScore = F.udf(weightedScore)

In [14]:
moviePairScores = moviePairScores.withColumn('weightedScore', udfWeightedScore(moviePairScores.score, moviePairScores.numPairs))
moviePairScores.cache()
moviePairScores.show(5)

+------------------+--------+-------+--------+------------------+
|             score|numPairs|movieId|_movieId|     weightedScore|
+------------------+--------+-------+--------+------------------+
| 0.958507806456253|      17|   1172|    1293|  16.2946327097563|
|0.9617497019191603|      12|   1405|    2193|11.540996423029924|
|0.9685067726005836|      16|     10|     273|15.496108361609338|
|0.9517990774006699|     100|     50|     110|   95.179907740067|
|0.9441628304892208|      16|     62|     186|15.106605287827533|
+------------------+--------+-------+--------+------------------+
only showing top 5 rows



In [15]:
targetMovieId = '1'
targetMovieTitle = movies.where(movies.movieId == targetMovieId).take(1)[0]['title']
print('Top 10 recommendations for movies similar to {0}'.format(targetMovieTitle))

(moviePairScores.where(moviePairScores.movieId == targetMovieId)
                .orderBy('weightedScore', ascending=False)
                .join(movies, moviePairScores._movieId == movies.movieId)
                .select('title', 'weightedScore', 'score', 'numPairs')
                .show(10, truncate=False))

Top 10 recommendations for movies similar to Toy Story (1995)
+-----------------------------------------------------+-----------------+------------------+--------+
|title                                                |weightedScore    |score             |numPairs|
+-----------------------------------------------------+-----------------+------------------+--------+
|Toy Story 2 (1999)                                   |99.69683618780475|0.9870973879980668|101     |
|Lord of the Rings: The Return of the King, The (2003)|99.01370257559776|0.9612980832582307|103     |
|American Beauty (1999)                               |98.47238856397547|0.9654155741566223|102     |
|Godfather, The (1972)                                |98.42559454806472|0.9649568092947522|102     |
|Usual Suspects, The (1995)                           |97.37221104067706|0.9640812974324461|101     |
|Fight Club (1999)                                    |96.81383960353611|0.9585528673617437|101     |
|Willy Wonka & the C

In [16]:
spark.stop()