# Install Java, Spark, PySpark, and findspark

In [1]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [2]:
# install findspark using pip
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=03635bb99fbc15b0302c95fda8e930b26f8a63ebff41e4213c365085e15e4ed8
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
# Mount Google Drive at /content/drive; the data files read later in this
# notebook are addressed through that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Initialise findspark before pyspark is imported.
# NOTE(review): init() presumably locates Spark through the SPARK_HOME variable
# set in the first cell and makes its Python bindings importable — confirm.
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import sys

In [6]:
def getMovieName(movieNames, movieId):
    """Return the title of the movie whose ID is ``movieId``.

    Args:
        movieNames: DataFrame with ``movieID`` and ``movieTitle`` columns.
        movieId: ID of the movie to look up.

    Returns:
        The ``movieTitle`` value of the first row whose ``movieID`` matches.

    Raises:
        IndexError: if no row has the requested ID.
    """
    # first() fetches at most one row instead of bringing every match to the driver
    match = movieNames.filter(func.col("movieID") == movieId).select("movieTitle").first()
    if match is None:
        # IndexError is what indexing an empty result list raises; keep that type
        # but say which ID was missing
        raise IndexError("no movie found with movieID " + str(movieId))
    return match[0]

In [7]:
def computeCosineSimilarity(data):
    """Score each (movie1, movie2) pair by the cosine similarity of its ratings.

    ``data`` must provide the columns movie1, movie2, rating1 and rating2.
    The returned DataFrame has one row per (movie1, movie2) group with the
    columns movie1, movie2, score and numPairs, where numPairs is the number
    of input rows that fell into the group.
    """
    rating1 = func.col("rating1")
    rating2 = func.col("rating2")

    # per-row products that feed the dot product and the two vector norms
    withProducts = (
        data
        .withColumn("xx", rating1 * rating1)
        .withColumn("yy", rating2 * rating2)
        .withColumn("xy", rating1 * rating2)
    )

    dotProduct = func.sum(func.col("xy"))
    normProduct = func.sqrt(func.sum(func.col("xx"))) * func.sqrt(func.sum(func.col("yy")))

    perPair = withProducts.groupBy("movie1", "movie2").agg(
        dotProduct.alias("numerator"),
        normProduct.alias("denominator"),
        func.count(func.col("xy")).alias("numPairs"),
    )

    # a zero denominator leaves the ratio undefined; such pairs get a score of 0
    score = func.when(
        func.col("denominator") != 0,
        func.col("numerator") / func.col("denominator"),
    ).otherwise(0)

    return perPair.withColumn("score", score).select("movie1", "movie2", "score", "numPairs")

In [8]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("MovieSimilarities")
    .getOrCreate()
)

In [9]:
# Schema applied when reading the movie-names file: ID and title, both nullable.
movieNamesSchema = StructType([
    StructField("movieID", IntegerType(), True),
    StructField("movieTitle", StringType(), True),
])

# Schema applied when reading the ratings file: one (user, movie, rating, timestamp) row each.
moviesSchema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

Load the ml-100k data files (`u.item` and `u.data`) from Google Drive.

In [10]:
# Movie ID -> title lookup: pipe-separated file read with the ISO-8859-1 charset.
movieNames = (
    spark.read
    .option("sep", "|")
    .option("charset", "ISO-8859-1")
    .schema(movieNamesSchema)
    .csv("/content/drive/MyDrive/Colab Notebooks/spark-course/ml-100k/u.item")
)

# Ratings: tab-separated file parsed with moviesSchema.
movies = (
    spark.read
    .option("sep", "\t")
    .schema(moviesSchema)
    .csv("/content/drive/MyDrive/Colab Notebooks/spark-course/ml-100k/u.data")
)

In [11]:
# Drop the timestamp; only user, movie and rating are used from here on.
# Column names are spelled exactly as declared in moviesSchema (userID / movieID)
# so this select does not depend on case-insensitive column resolution.
ratings = movies.select("userID", "movieID", "rating")

In [12]:
# Self-join the ratings on user: each output row pairs two ratings given by the
# same user to two different movies. The strict "<" on the movie ID drops
# self-pairs and keeps each unordered pair in one orientation only.
# Column references use the moviesSchema spelling (userID / movieID) rather than
# relying on case-insensitive column resolution.
moviePairs = (
    ratings.alias("ratings1")
    .join(
        ratings.alias("ratings2"),
        (func.col("ratings1.userID") == func.col("ratings2.userID"))
        & (func.col("ratings1.movieID") < func.col("ratings2.movieID")),
    )
    .select(
        func.col("ratings1.movieID").alias("movie1"),
        func.col("ratings2.movieID").alias("movie2"),
        func.col("ratings1.rating").alias("rating1"),
        func.col("ratings2.rating").alias("rating2"),
    )
)

In [13]:
# Score every movie pair and mark the result for caching, since it is queried below.
moviePairSimilarities = (
    computeCosineSimilarity(moviePairs)
    .cache()
)

In [14]:
# Quality gates: minimum similarity score and minimum number of co-ratings.
scoreThreshold = 0.97
coOccurrenceThreshold = 50.0

# Movie to find neighbours for.
movieID = 56

# Keep pairs that involve movieID on either side and clear both thresholds.
filteredResults = moviePairSimilarities.filter(
    ((func.col("movie1") == movieID) | (func.col("movie2") == movieID))
    & (func.col("score") > scoreThreshold)
    & (func.col("numPairs") > coOccurrenceThreshold)
)

# Highest score first, at most ten rows.
results = filteredResults.sort(func.col("score").desc()).take(10)

print("Top 10 similar movies for " + getMovieName(movieNames, movieID))

for result in results:
    # movieID may be either member of the pair; report the other member
    similarMovieID = result.movie2 if result.movie1 == movieID else result.movie1

    print(getMovieName(movieNames, similarMovieID) + "\tscore: "
          + str(result.score) + "\tstrength: " + str(result.numPairs))

Top 10 similar movies for Pulp Fiction (1994)
Smoke (1995)	score: 0.9743848338030823	strength: 68
Reservoir Dogs (1992)	score: 0.9740674165782123	strength: 134
Donnie Brasco (1997)	score: 0.9738247291149608	strength: 75
Sling Blade (1996)	score: 0.9713796344244161	strength: 111
True Romance (1993)	score: 0.9707295689679896	strength: 99
Jackie Brown (1997)	score: 0.9706179145690377	strength: 55
Carlito's Way (1993)	score: 0.9706021261759088	strength: 52


In [15]:
# Shut down the Spark session created earlier in the notebook.
spark.stop()