# Movies Similarity

In [51]:
from pyspark.sql import SparkSession 
from pyspark.sql import types as data_types
from pyspark.sql.functions import col, size, split, sum, min, max, sqrt, count, when, desc

In [2]:
spark = SparkSession.builder.appName("MoviesSimilarity").getOrCreate()

## Loading Data

In [4]:
def getMovieNamesDF():
    schema = [
        data_types.StructField("movieID", data_types.IntegerType(), True),
        data_types.StructField("movieTitle", data_types.StringType(), True)
    ]
    df = spark.read.option( "sep" , "|" ).option( "charset" , "ISO-8859-1" ).schema( data_types.StructType(fields=schema)).csv("ml_100k/u.item")
    return df

In [6]:
def getMoviesDF():
    schema = [
        data_types.StructField("userID", data_types.IntegerType(), True),
        data_types.StructField("movieID", data_types.IntegerType(), True),
        data_types.StructField("rating", data_types.IntegerType(), True),
        data_types.StructField("timestamp", data_types.LongType(), True)
    ]
    df = spark.read.option( "sep" , "\t" ).schema( data_types.StructType(fields=schema)).csv("ml_100k/u.data")
    return df

In [5]:
movie_names_df = getMovieNamesDF()
movie_names_df.show(5)

+-------+-----------------+
|movieID|       movieTitle|
+-------+-----------------+
|      1| Toy Story (1995)|
|      2| GoldenEye (1995)|
|      3|Four Rooms (1995)|
|      4|Get Shorty (1995)|
|      5|   Copycat (1995)|
+-------+-----------------+
only showing top 5 rows



In [7]:
movies_df = getMoviesDF()
movies_df.show(5)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
+------+-------+------+---------+
only showing top 5 rows



## Making Tranformations

In [8]:
ratings_df = movies_df.select("userID","movieID","rating")
ratings_df.show(5)

+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|   196|    242|     3|
|   186|    302|     3|
|    22|    377|     1|
|   244|     51|     2|
|   166|    346|     1|
+------+-------+------+
only showing top 5 rows



In [22]:
movie_pairs_df = ratings_df.alias("ratings1").join(
    ratings_df.alias("ratings2"), 
    (col("ratings1.userID") == col("ratings2.userID")) 
    & (col("ratings1.movieID") < col("ratings2.movieID"))
).select(
    col( "ratings1.movieID" ).alias( "movie1" ) ,
    col( "ratings2.movieID" ).alias( "movie2" ) ,
    col( "ratings1.rating" ).alias( "rating1" ) ,
    col( "ratings2.rating" ).alias( "rating2" )
)
movie_pairs_df.show(5)

+------+------+-------+-------+
|movie1|movie2|rating1|rating2|
+------+------+-------+-------+
|   242|   269|      3|      3|
|   242|   845|      3|      4|
|   242|  1022|      3|      4|
|   242|   762|      3|      3|
|   242|   411|      3|      4|
+------+------+-------+-------+
only showing top 5 rows



In [39]:
def computeCosineSimilarity(movie_pairs_df):
    pairScoresDF = movie_pairs_df.withColumn( "xx" , col( "rating1" ) * col( "rating1" ) )
    pairScoresDF = pairScoresDF.withColumn( "yy" , col( "rating2" ) * col( "rating2" ) )
    pairScoresDF = pairScoresDF.withColumn( "xy" , col( "rating1" ) * ( col( "rating2" ) ) )
    result = pairScoresDF.groupBy( 
        "movie1" , 
        "movie2" 
    ).agg(
        sum( col( "xy" )).alias( "numerator" ) ,
        (sqrt( sum( col( "xx" ) ) ) * sqrt( sum( col( "yy" ) ) )).alias( "denominator" ) ,
        count( col( "xy" ) ).alias( "numPairs" )
    )
    result = result.withColumn( 
        "score" ,
        when(
            col( "denominator" ) != 0,
            col( "numerator" ) / col( "denominator" ) 
        ).otherwise( None )
    ).select( "movie1" , "movie2" , "score" , "numPairs" )
    return result

In [40]:
moviePairsSimilarityDF = computeCosineSimilarity( movie_pairs_df ).cache()
moviePairsSimilarityDF.show(5)

+------+------+------------------+--------+
|movie1|movie2|             score|numPairs|
+------+------+------------------+--------+
|    51|   924|0.9465030160396292|      15|
|   451|   529|0.8700048504395461|      30|
|    86|   318|0.9562989269248869|      95|
|    40|   167|0.9488483124502475|      23|
|   274|  1211|0.9799118698777318|       7|
+------+------+------------------+--------+
only showing top 5 rows



## Evaluating Similars

In [41]:
scoreThreshold = 0.97
coOccurrenceThreshold = 0.5
movieID = 252

In [56]:
filteredResults = moviePairsSimilarityDF.filter(
    ((col("movie1") == movieID) | (col("movie2") == movieID))
    & ((col("score") > scoreThreshold) & (col("numPairs") > coOccurrenceThreshold))
)
filteredResults = filteredResults.sort( desc( "score" ) ).take(10)


[Row(movie1=252, movie2=1302, score=1.0000000000000002, numPairs=2), Row(movie1=252, movie2=1611, score=1.0000000000000002, numPairs=2), Row(movie1=37, movie2=252, score=1.0000000000000002, numPairs=2), Row(movie1=252, movie2=883, score=1.0000000000000002, numPairs=2), Row(movie1=252, movie2=1490, score=1.0000000000000002, numPairs=2), Row(movie1=252, movie2=1657, score=1.0, numPairs=1), Row(movie1=252, movie2=1434, score=1.0, numPairs=1), Row(movie1=252, movie2=1663, score=1.0, numPairs=1), Row(movie1=252, movie2=1328, score=1.0, numPairs=1), Row(movie1=252, movie2=1628, score=1.0, numPairs=1)]


In [59]:
for result in filteredResults:
    movieId = result["movie1"]
    if movieId == movieID:
        movieId = result["movie2"]
    name = movie_names_df.filter(col("movieId") == movieId).select("movieTitle").take(1)[0]
    print(name)

Row(movieTitle='Late Bloomers (1996)')
Row(movieTitle='Intimate Relations (1996)')
Row(movieTitle='Nadja (1994)')
Row(movieTitle='Telling Lies in America (1997)')
Row(movieTitle='Fausto (1993)')
Row(movieTitle='Target (1995)')
Row(movieTitle='Shooting Fish (1997)')
Row(movieTitle='Nothing Personal (1995)')
Row(movieTitle='Of Love and Shadows (1994)')
Row(movieTitle='Lamerica (1994)')
