In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import sys

In [2]:
def computePearsonCorrelation(spark, data):
    
    # Compute numerator, denominator and numPairs columns
    calculateRatingsMeans = data \
                        .select(func.avg(func.col("rating1")).alias("avg_rating1"), \
                        func.avg(func.col("rating2")).alias("avg_rating2"))  
    
    calculateRatingsMeans = calculateRatingsMeans.take(1)
    avg_rating1 = calculateRatingsMeans[0].avg_rating1
    avg_rating2 = calculateRatingsMeans[0].avg_rating2
    
    print("avg_rating1....")
    print(calculateRatingsMeans)

    pairScores = data \
      .withColumn("rating1-rating1_avg", func.col("rating1")-avg_rating1) \
      .withColumn("rating2-rating2_avg", func.col("rating2")-avg_rating2)
        
    print(pairScores)
    
    pairScores = pairScores \
      .withColumn("(rating1-rating1_avg)2", func.col("rating1-rating1_avg")*func.col("rating1-rating1_avg")) \
      .withColumn("(rating2-rating2_avg)2", func.col("rating2-rating2_avg")*func.col("rating2-rating2_avg"))
    
    calculatePearsonCorrelation = pairScores.groupBy("movie1", "movie2").\
                                agg(func.sum(func.col("rating1-rating1_avg")*func.col("rating2-rating2_avg")).alias("numerator"),\
                                    func.sqrt(func.sum(func.col("(rating1-rating1_avg)2"))*func.sum(func.col("(rating2-rating2_avg)2"))).alias("denominator"),\
                                    func.count(func.col("rating1-rating1_avg")).alias("numPairs")
                                )
        
    result = calculatePearsonCorrelation \
            .withColumn("score", \
            func.when(func.col("denominator") != 0, func.col("numerator") / func.col("denominator")) \
              .otherwise(0) \
          ).select("movie1", "movie2", "score", "numPairs")


    return result

In [3]:
# Get movie name by given movie id 
def getMovieName(movieNames, movieId):
    result = movieNames.filter(func.col("movieID") == movieId) \
        .select("movieTitle").collect()[0]

    return result[0]



In [4]:
spark = SparkSession.builder.appName("MovieSimilarities").master("local[*]").getOrCreate()

movieNamesSchema = StructType([ \
                               StructField("movieID", IntegerType(), True), \
                               StructField("movieTitle", StringType(), True) \
                               ])
    
moviesSchema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])
    

In [5]:
# Create a broadcast dataset of movieID and movieTitle.
# Apply ISO-885901 charset
movieNames = spark.read \
      .option("sep", "|") \
      .option("charset", "ISO-8859-1") \
      .schema(movieNamesSchema) \
      .csv("./ml-100k/u.item")

# Load up movie data as dataset
movies = spark.read \
      .option("sep", "\t") \
      .schema(moviesSchema) \
      .csv("./ml-100k/u.data")


ratings = movies.select("userId", "movieId", "rating")


In [6]:
# Emit every movie rated together by the same user.
# Self-join to find every combination.
# Select movie pairs and rating pairs
moviePairs = ratings.alias("ratings1") \
      .join(ratings.alias("ratings2"), (func.col("ratings1.userId") == func.col("ratings2.userId")) \
            & (func.col("ratings1.movieId") < func.col("ratings2.movieId"))) \
      .select(func.col("ratings1.movieId").alias("movie1"), \
        func.col("ratings2.movieId").alias("movie2"), \
        func.col("ratings1.rating").alias("rating1"), \
        func.col("ratings2.rating").alias("rating2"))
moviePairSimilarities = computePearsonCorrelation(spark, moviePairs).cache()

avg_rating1....
[Row(avg_rating1=3.5387608221996207, avg_rating2=3.328259773784263)]
DataFrame[movie1: int, movie2: int, rating1: int, rating2: int, rating1-rating1_avg: double, rating2-rating2_avg: double]


In [7]:
moviePairSimilarities.show(10)

+------+------+--------------------+--------+
|movie1|movie2|               score|numPairs|
+------+------+--------------------+--------+
|    51|   924| 0.48510926322433373|      15|
|   451|   529|-0.25775258245291877|      30|
|    86|   318| 0.25969042145386984|      95|
|    40|   167| 0.21993755394170952|      23|
|   274|  1211| 0.47952217480856885|       7|
|  1042|  1067|  0.9843770697791376|       2|
|   118|   946| 0.18713050345791857|      40|
|   234|   461| 0.25696289013597445|      54|
|    88|   523| 0.16214675211898022|      74|
|   796|  1036|  0.6667465315739322|       8|
+------+------+--------------------+--------+
only showing top 10 rows



In [17]:
scoreThreshold = 0.7
coOccurrenceThreshold = 50.0

movieID = 51

# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above
filteredResults = moviePairSimilarities.filter( \
    ((func.col("movie1") == movieID) | (func.col("movie2") == movieID)) & \
      (func.col("score") > scoreThreshold))

# Sort by quality score.
results = filteredResults.sort(func.col("score").desc()).take(200)

print ("Top 10 similar movies for " + getMovieName(movieNames, movieID))

for result in results:
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = result.movie1
    if (similarMovieID == movieID):
      similarMovieID = result.movie2

    print(getMovieName(movieNames, similarMovieID) + "\tscore: " \
          + str(result.score) + "\tstrength: " + str(result.numPairs))



Top 10 similar movies for Legends of the Fall (1994)
Shadows (Cienie) (1988)	score: 1.0000000000000002	strength: 1
Hugo Pool (1997)	score: 1.0000000000000002	strength: 1
Total Eclipse (1995)	score: 1.0000000000000002	strength: 1
Pyromaniac's Love Story, A (1995)	score: 1.0000000000000002	strength: 1
Liebelei (1933)	score: 1.0000000000000002	strength: 1
Duoluo tianshi (1995)	score: 1.0000000000000002	strength: 1
Careful (1992)	score: 1.0000000000000002	strength: 1
It Takes Two (1995)	score: 1.0000000000000002	strength: 1
Last Time I Committed Suicide, The (1997)	score: 1.0000000000000002	strength: 1
Two Deaths (1995)	score: 1.0000000000000002	strength: 1
Kim (1950)	score: 1.0000000000000002	strength: 1
Hunted, The (1995)	score: 1.0000000000000002	strength: 1
Hear My Song (1991)	score: 1.0000000000000002	strength: 1
Invitation, The (Zaproszenie) (1986)	score: 1.0000000000000002	strength: 1
Yankee Zulu (1994)	score: 1.0000000000000002	strength: 1
Journey of August King, The (1995)	score: 

In the Line of Duty 2 (1987)	score: 1.0	strength: 1
Prisoner of the Mountains (Kavkazsky Plennik) (1996)	score: 1.0	strength: 1
Fille seule, La (A Single Girl) (1995)	score: 1.0	strength: 1
Party Girl (1995)	score: 1.0	strength: 2
Rhyme & Reason (1997)	score: 1.0	strength: 1
All Over Me (1997)	score: 1.0	strength: 1
Mr. Jones (1993)	score: 1.0	strength: 1
Girls Town (1996)	score: 1.0	strength: 1
Heavy (1995)	score: 1.0	strength: 2
You So Crazy (1994)	score: 1.0	strength: 1
Flower of My Secret, The (Flor de mi secreto, La) (1995)	score: 1.0	strength: 1
Lamerica (1994)	score: 1.0	strength: 1
Deceiver (1997)	score: 1.0	strength: 1
Thieves (Voleurs, Les) (1996)	score: 1.0	strength: 1
House of Yes, The (1997)	score: 1.0	strength: 1
Nénette et Boni (1996)	score: 1.0	strength: 1
Eighth Day, The (1996)	score: 1.0	strength: 1
Switchblade Sisters (1975)	score: 1.0	strength: 1
Albino Alligator (1996)	score: 1.0	strength: 1
Nico Icon (1995)	score: 1.0	strength: 1
Metisse (Café au Lait) (1993)	scor