In [0]:
from pyspark.sql import SparkSession,functions as func

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,FloatType,LongType,IntegerType,DoubleType

In [0]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Cosine Similarity")
    .getOrCreate()
)

In [0]:
# Explicit schema for movies.csv; every column is declared nullable.
schema1 = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True),
])

In [0]:
# Load the movie catalogue with the explicit schema; the first CSV line is a header.
movies = (
    spark.read
    .schema(schema1)
    .option("header", "true")
    .csv("dbfs:/FileStore/tables/movies.csv")
)
movies.show(5)

In [0]:
# Print the schema of the loaded movies DataFrame to confirm schema1 was applied.
movies.printSchema()

In [0]:
# Explicit schema for ratings.csv; every column is declared nullable and the
# timestamp is read as a plain string.
schema2 = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", StringType(), True),
])

In [0]:
# Load the user ratings with the explicit schema; the first CSV line is a header.
ratings = (
    spark.read
    .schema(schema2)
    .option("header", "true")
    .csv("dbfs:/FileStore/tables/ratings.csv")
)
ratings.show(5)

In [0]:
# Print the schema of the loaded ratings DataFrame to confirm schema2 was applied.
ratings.printSchema()

In [0]:
# Keep only the three columns selected here; the timestamp column is dropped.
rating=ratings.select("userId","movieId","rating")
rating.show(5)

In [0]:
# Self-join the ratings on userId to pair up movies rated by the same user.
# Requiring ratings1.movieId < ratings2.movieId keeps each unordered pair in a
# single orientation and excludes a movie paired with itself.
moviePairs = (
    rating.alias("ratings1")
    .join(
        rating.alias("ratings2"),
        (func.col("ratings1.userId") == func.col("ratings2.userId"))
        & (func.col("ratings1.movieId") < func.col("ratings2.movieId")),
    )
    .select(
        func.col("ratings1.movieId").alias("movie1"),
        func.col("ratings2.movieId").alias("movie2"),
        func.col("ratings1.rating").alias("rating1"),
        func.col("ratings2.rating").alias("rating2"),
    )
)

In [0]:
# Preview the first ten (movie1, movie2, rating1, rating2) rows of the self-join.
moviePairs.show(10)

In [0]:
def computeCosineSimilarity(spark,df):
  """Score every (movie1, movie2) group by the cosine similarity of its ratings.

  Args:
    spark: Not used by this function; kept in the signature for existing callers.
    df: DataFrame with the columns movie1, movie2, rating1 and rating2.

  Returns:
    DataFrame with the columns movie1, movie2, score and numPairs, one row per
    (movie1, movie2) group. score is sum(rating1*rating2) divided by
    sqrt(sum(rating1^2)) * sqrt(sum(rating2^2)) when that denominator is
    non-zero, otherwise 0. numPairs counts the rows of the group whose
    rating1*rating2 product is non-null.
  """
  r1 = func.col("rating1")
  r2 = func.col("rating2")
  product = r1 * r2

  # Aggregate the dot product, the product of the two vector norms and the
  # number of contributing rows for each movie pair.
  per_pair = df.groupBy("movie1", "movie2").agg(
      func.sum(product).alias("numerator"),
      (func.sqrt(func.sum(r1 * r1)) * func.sqrt(func.sum(r2 * r2))).alias("denominator"),
      func.count(product).alias("numPairs"),
  )

  # Guard the division so a zero denominator yields a score of 0.
  score = func.when(
      func.col("denominator") != 0,
      func.col("numerator") / func.col("denominator"),
  ).otherwise(0)

  return per_pair.withColumn("score", score).select("movie1", "movie2", "score", "numPairs")

In [0]:
# Score every movie pair and mark the result for caching, since the cells below
# filter and display it more than once.
moviePairsSimilarity=computeCosineSimilarity(spark,moviePairs).cache()

In [0]:
# Preview ten scored pairs (movie1, movie2, score, numPairs).
moviePairsSimilarity.show(10)

In [0]:
# A pair is reported only if its similarity score is strictly greater than this.
scoreThreshold=0.95
# A pair is reported only if its numPairs count is strictly greater than this.
coOccurrenceThreshold=50.0
# movieId of the movie to find similar movies for.
movie_id=926

In [0]:
# Keep the pairs that contain movie_id on either side and exceed both the
# score threshold and the co-occurrence threshold.
filtered_results = moviePairsSimilarity.filter(
    ((func.col("movie1") == movie_id) | (func.col("movie2") == movie_id))
    & (func.col("score") > scoreThreshold)
    & (func.col("numPairs") > coOccurrenceThreshold)
)

In [0]:
# Preview ten of the pairs that passed the filter (no ordering applied here).
filtered_results.show(10)

In [0]:
# Show the ten filtered pairs with the highest score, best first.
filtered_results.sort("score",ascending=False).show(10)

In [0]:
# Bring at most ten of the highest-scoring filtered pairs to the driver as a list of Rows.
result = (
    filtered_results
    .sort(func.col("score").desc())
    .take(10)
)

In [0]:
def getMovieName(movieNames,movie_id):
    """Return the title of the movie whose movieId equals ``movie_id``.

    Args:
        movieNames: DataFrame with at least the columns movieId and title.
        movie_id: Identifier of the movie to look up.

    Returns:
        The title value of the first matching row.

    Raises:
        IndexError: If no row in ``movieNames`` has that movieId.
    """
    # first() fetches a single row rather than collecting every match to the
    # driver, and returns None when nothing matches.
    match = movieNames.filter(func.col("movieId") == movie_id).select("title").first()
    if match is None:
        # IndexError is what indexing an empty collect() result would raise;
        # keep that type for callers but say which id was missing.
        raise IndexError("no movie found with movieId=" + str(movie_id))
    return match[0]

In [0]:
# Report each similar movie by title, with its similarity score and numPairs count.
print ("Top 10 similar movies for " + getMovieName(movies,movie_id))
for r in result:
  # movie_id sits in either movie1 or movie2 of the pair; take the other side.
  similarMovieID = r.movie2 if r.movie1 == movie_id else r.movie1
  print(getMovieName(movies,similarMovieID) + "\tscore: " + str(r.score) + "\tstrength: " + str(r.numPairs))