In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
# Entry point for the DataFrame API; getOrCreate() returns an already-active
# session if there is one instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("MovieRecommendation")
    .getOrCreate()
)

24/12/22 19:59:03 WARN Utils: Your hostname, Ngas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.115 instead (on interface en0)
24/12/22 19:59:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/22 19:59:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Schema for the ratings file: user id, movie id, rating, timestamp (all nullable).
# movie_id is declared as a string, so any ordering comparison on it downstream
# is lexicographic ("1014" < "537"), not numeric.
rating_schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("movie_id", StringType(), True)
    .add("rating", FloatType(), True)
    .add("timestamp", IntegerType(), True)
)

# Schema for the movies file: only the id and title columns are declared.
movie_schema = (
    StructType()
    .add("movie_id", StringType(), True)
    .add("title", StringType(), True)
)

In [None]:
# Load MovieLens 100k: u.item is "|"-separated, u.data is tab-separated, and both
# are read without a header row.
# u.item is Latin-1 (ISO-8859-1) encoded rather than UTF-8; Spark's CSV reader
# defaults to UTF-8, so the encoding is set explicitly to avoid garbled accented titles.
#
# To run on the larger ml-32m dataset instead, read its comma-separated CSVs, which
# do have a header row:
#   movies = spark.read.schema(movie_schema).option("header", "true").csv("../data/ml-32m/movies.csv")
#   ratings = spark.read.schema(rating_schema).option("header", "true").csv("../data/ml-32m/ratings.csv")
movies = (
    spark.read.schema(movie_schema)
    .option("sep", "|")
    .option("encoding", "ISO-8859-1")
    .csv("../data/ml-100k/u.item")
)
ratings = spark.read.schema(rating_schema).option("sep", "\t").csv("../data/ml-100k/u.data")

In [17]:
# Mark both input DataFrames for caching; ratings is read twice by the self-join below.
for frame in (movies, ratings):
    frame.cache()
# cache() returns the same DataFrame, so this displays exactly what ratings.cache() did.
ratings

DataFrame[user_id: int, movie_id: string, rating: float, timestamp: int]

In [69]:
# Pair up every two ratings given by the same user.
# The strict "<" on movie_id keeps exactly one ordering of each movie pair and
# drops self-pairs (a movie paired with itself). With movie_id declared as a
# string, "<" is lexicographic, which is still a strict order and so
# de-duplicates correctly.
# The result keeps all columns of both sides; they are addressed downstream as
# "ratings_1.<col>" / "ratings_2.<col>".
ratings_self_joined = (
    ratings.alias("ratings_1")
    .join(
        ratings.alias("ratings_2"),
        (func.col("ratings_1.user_id") == func.col("ratings_2.user_id"))
        & (func.col("ratings_1.movie_id") < func.col("ratings_2.movie_id")),
    )
)
ratings_self_joined.cache()
ratings_self_joined.show()
# count() forces the join to run (and fills the cache); its value is the cell output.
ratings_self_joined.count()

[Stage 105:>                                                        (0 + 1) / 1]

+-------+--------+------+---------+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+-------+--------+------+---------+
|    196|     242|   3.0|881250949|    196|     269|   3.0|881250949|
|    196|     242|   3.0|881250949|    196|     845|   4.0|881251954|
|    196|     242|   3.0|881250949|    196|     762|   3.0|881251955|
|    196|     242|   3.0|881250949|    196|     411|   4.0|881252090|
|    196|     242|   3.0|881250949|    196|     285|   5.0|881251753|
|    196|     242|   3.0|881250949|    196|     382|   4.0|881251843|
|    196|     242|   3.0|881250949|    196|     287|   3.0|881251884|
|    196|     242|   3.0|881250949|    196|     340|   3.0|881251045|
|    196|     242|   3.0|881250949|    196|     257|   2.0|881251577|
|    196|     242|   3.0|881250949|    196|      66|   3.0|881251911|
|    196|     242|   3.0|881250949|    196|      70|   3.0|881251842|
|    196|     242|  

                                                                                

10050406

### [How to Calculate Cosine Similarity in Python?](https://www.geeksforgeeks.org/how-to-calculate-cosine-similarity-in-python/)

#### Theory

$$\text{similarity}(A, B) = \frac{A \cdot B}{\|A\| \, \|B\|}$$

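where $A$ and $B$ are vectors:

- $A \cdot B$ is the dot product of $A$ and $B$: the sum of the element-wise products of $A$ and $B$.
- $\|A\|$ is the L2 norm of $A$: the square root of the sum of the squares of the elements of $A$.

In this notebook, each movie of a pair is a vector of ratings indexed by the users who rated both movies. $A \cdot B$ is therefore the `xy` sum, and the norms are the square roots of the `x^2` and `y^2` sums computed below.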

#### Example

```python
# import required libraries
import numpy as np
from numpy.linalg import norm

# define two arrays
A = np.array([2,1,2,3,2,9])
B = np.array([3,4,2,4,5,5])

print("A:", A)
print("B:", B)

# compute cosine similarity
cosine = np.dot(A,B)/(norm(A)*norm(B))
print("Cosine Similarity:", cosine)

```

In [70]:
# Per movie pair, accumulate the terms of the cosine similarity over all users who
# rated both movies: x = rating of movie_id_1, y = rating of movie_id_2.
#   xy        = sum(x * y)
#   x^2 / y^2 = sum(x**2) / sum(y**2)
#   num_pairs = number of co-rating users (non-null products)
rating_1 = func.col("rating_1")
rating_2 = func.col("rating_2")

processed_ratings = (
    ratings_self_joined.select(
        func.col("ratings_1.movie_id").alias("movie_id_1"),
        func.col("ratings_2.movie_id").alias("movie_id_2"),
        func.col("ratings_1.rating").alias("rating_1"),
        func.col("ratings_2.rating").alias("rating_2"),
    )
    .groupBy("movie_id_1", "movie_id_2")
    .agg(
        func.sum(rating_1 * rating_2).alias("xy"),
        func.sum(func.pow(rating_1, 2)).alias("x^2"),
        func.sum(func.pow(rating_2, 2)).alias("y^2"),
        func.count(rating_1 * rating_2).alias("num_pairs"),
    )
)
processed_ratings.cache()

DataFrame[movie_id_1: string, movie_id_2: string, xy: double, x^2: double, y^2: double, num_pairs: bigint]

In [71]:
# Append the cosine similarity of each movie pair:
#   similarity = xy / (sqrt(x^2) * sqrt(y^2))
calculate_similarities = processed_ratings.select(
    "*",
    (
        func.col("xy")
        / (func.sqrt(func.col("x^2")) * func.sqrt(func.col("y^2")))
    ).alias("similarity"),
)
calculate_similarities.cache()

DataFrame[movie_id_1: string, movie_id_2: string, xy: double, x^2: double, y^2: double, num_pairs: bigint, similarity: double]

In [72]:
# Preview the first rows of the similarity table (long values are truncated).
calculate_similarities.show(n=20, truncate=True)

[Stage 110:>                                                        (0 + 1) / 1]

+----------+----------+------+------+------+---------+------------------+
|movie_id_1|movie_id_2|    xy|   x^2|   y^2|num_pairs|        similarity|
+----------+----------+------+------+------+---------+------------------+
|       302|       332|1168.0|1534.0| 993.0|       84| 0.946358176403924|
|      1014|       537| 154.0| 139.0| 181.0|       12|0.9708989541678686|
|      1014|       341|  25.0|  25.0|  25.0|        1|               1.0|
|       222|       930| 654.0| 939.0| 520.0|       62|0.9359298106532363|
|       222|       584| 593.0| 613.0| 653.0|       46|0.9372767961332811|
|       274|       289| 677.0| 755.0| 735.0|       64|0.9088067066930118|
|       274|       576| 360.0| 441.0| 327.0|       32|0.9480021661874246|
|      1042|       393| 205.0| 202.0| 235.0|       20| 0.940901682646041|
|      1184|        89|  78.0|  59.0| 133.0|        8|0.8805272005370923|
|      1184|       317|  13.0|   6.0|  45.0|        3|  0.79115480528524|
|      1184|       161|  63.0|  51.0| 

                                                                                

In [None]:
# Find movies similar to `movie_id`.
# Each pair in calculate_similarities appears once (movie_id_1 < movie_id_2), so the
# target movie can sit on either side. Every matching pair is therefore re-oriented
# so that movie_id_1 is always the *other* movie and movie_id_2 is the target.
# Joining on movie_id_1 then returns the recommended movie's title; joining on the
# raw movie_id_1 would show the target's own title whenever it was the first of the pair.
movie_id = 50
score_threshold = 0.97          # minimum cosine similarity to keep a pair
co_occurrence_threshold = 50    # minimum number of users who rated both movies
max_results = 50                # number of rows returned

is_target_first = func.col("movie_id_1") == movie_id

results = (
    calculate_similarities
    .filter(
        ((func.col("movie_id_1") == movie_id) | (func.col("movie_id_2") == movie_id))
        & (func.col("similarity") > score_threshold)
        & (func.col("num_pairs") > co_occurrence_threshold)
    )
    .select(
        func.when(is_target_first, func.col("movie_id_2")).otherwise(func.col("movie_id_1")).alias("movie_id_1"),
        func.when(is_target_first, func.col("movie_id_1")).otherwise(func.col("movie_id_2")).alias("movie_id_2"),
        func.col("xy"),
        # x^2 / y^2 belong to movie_id_1 / movie_id_2, so they swap along with the ids.
        func.when(is_target_first, func.col("y^2")).otherwise(func.col("x^2")).alias("x^2"),
        func.when(is_target_first, func.col("x^2")).otherwise(func.col("y^2")).alias("y^2"),
        func.col("num_pairs"),
        func.col("similarity"),
    )
    .join(movies, func.col("movie_id_1") == movies.movie_id)
    .orderBy(func.col("similarity").desc())
    .limit(max_results)
)
results.show()

+----------+----------+------+------+------+---------+------------------+--------+--------------------+
|movie_id_1|movie_id_2|    xy|   x^2|   y^2|num_pairs|        similarity|movie_id|               title|
+----------+----------+------+------+------+---------+------------------+--------+--------------------+
|       172|        50|6638.0|6400.0|7031.0|      345|0.9895522078385338|     172|Empire Strikes Ba...|
|       181|        50|8794.0|8164.0|9749.0|      480|0.9857230861253026|     181|Return of the Jed...|
|       174|        50|7312.0|7318.0|7580.0|      380| 0.981760098872619|     174|Raiders of the Lo...|
|       141|        50|1119.0| 854.0|1530.0|       68|0.9789385605497993|     141|20,000 Leagues Un...|
|       178|        50|2133.0|2109.0|2257.0|      109|0.9776576120448436|     178| 12 Angry Men (1957)|
|       408|        50|1850.0|1912.0|1873.0|       92|0.9775948291054827|     408|Close Shave, A (1...|
|       498|        50|2594.0|2484.0|2841.0|      138|0.97646922

In [None]:
# Compact view of the results: id, similarity score and title columns only.
results.select(func.col("movie_id_1"), func.col("similarity"), func.col("title")).show()

+----------+------------------+--------------------+
|movie_id_1|        similarity|               title|
+----------+------------------+--------------------+
|       172|0.9895522078385338|Empire Strikes Ba...|
|       181|0.9857230861253026|Return of the Jed...|
|       174| 0.981760098872619|Raiders of the Lo...|
|       141|0.9789385605497993|20,000 Leagues Un...|
|       178|0.9776576120448436| 12 Angry Men (1957)|
|       408|0.9775948291054827|Close Shave, A (1...|
|       498|0.9764692222674887|African Queen, Th...|
|       194|0.9751512937740359|   Sting, The (1973)|
|       169|0.9748681355460885|Wrong Trousers, T...|
|       114|0.9741816128302572|Wallace & Gromit:...|
|       210|0.9735394829992481|Indiana Jones and...|
|       480|0.9734534315266805|North by Northwes...|
|       478|0.9734294611633468|Philadelphia Stor...|
|       199|0.9727591639531913|Bridge on the Riv...|
|       483|0.9726570623726027|   Casablanca (1942)|
|       302|0.9725071588724558|L.A. Confidenti

In [None]:
# Save `results` as CSV with a header row, overwriting any previous run.
# coalesce(1) collapses the data into a single partition, so Spark writes a single
# part-*.csv file. Spark writes a *directory* named "movie_recommandation" that
# contains that file, not a bare file with that name.
# NOTE(review): the directory name is misspelled ("recommandation"); it is kept as-is
# in case something downstream reads that path.
# Based on https://stackoverflow.com/questions/33174443/how-to-save-a-spark-dataframe-as-csv-on-disk
results.coalesce(1).write.csv(
    "movie_recommandation",
    mode="overwrite",
    header=True,
    sep=",",
)

In [None]:
# Shut down the SparkSession created at the start of the notebook.
spark.stop()