In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version
!pip install pyspark

In [80]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("MovielensDataSet").getOrCreate()
# read csv's
moviesDF = spark.read.option("header","true").csv("movies.csv")
ratingsDF = spark.read.option("header","true").csv("ratings.csv")

# make rating value double
# make movieid and userid value integer 
ratingsDF = ratingsDF.withColumn("rating", ratingsDF["rating"].cast("double")) \
                     .withColumn("movieId", ratingsDF["movieId"].cast("integer")) \
                     .withColumn("userId", ratingsDF["userId"].cast("integer"))

# join tables needed
joinedDF = ratingsDF.join(moviesDF,"movieId","inner")

joinedDF.show(5)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 5 rows



In [81]:
# calculate the average rating for each movie and count rating for each movie
joinedDF = joinedDF.groupBy("title").agg( avg("rating").alias("avg_rating"), count("rating").alias("number_of_rating"))

# eliminate movies with counted rating under 50.
filteredDF = joinedDF.filter(joinedDF['number_of_rating'] > 50).orderBy("number_of_rating",ascending= False)

filteredDF.show()

+--------------------+------------------+----------------+
|               title|        avg_rating|number_of_rating|
+--------------------+------------------+----------------+
| Forrest Gump (1994)| 4.164133738601824|             329|
|Shawshank Redempt...| 4.429022082018927|             317|
| Pulp Fiction (1994)| 4.197068403908795|             307|
|Silence of the La...| 4.161290322580645|             279|
|  Matrix, The (1999)| 4.192446043165468|             278|
|Star Wars: Episod...| 4.231075697211155|             251|
|Jurassic Park (1993)|              3.75|             238|
|   Braveheart (1995)| 4.031645569620253|             237|
|Terminator 2: Jud...| 3.970982142857143|             224|
|Schindler's List ...|             4.225|             220|
|   Fight Club (1999)| 4.272935779816514|             218|
|    Toy Story (1995)|3.9209302325581397|             215|
|Star Wars: Episod...|4.2156398104265405|             211|
|Usual Suspects, T...| 4.237745098039215|             20

In [82]:
joinedDF = ratingsDF.join(moviesDF,"movieId","inner")

# join new table with the first df, inner join to keep only choosed movies with over 50 ratings
joinedDF = joinedDF.join(filteredDF,"title","inner")

joinedDF.show(5)

+--------------------+-------+------+------+---------+--------------------+------------------+----------------+
|               title|movieId|userId|rating|timestamp|              genres|        avg_rating|number_of_rating|
+--------------------+-------+------+------+---------+--------------------+------------------+----------------+
|       Psycho (1960)|   1219|     1|   2.0|964983393|        Crime|Horror| 4.036144578313253|              83|
|Men in Black (a.k...|   1580|     1|   3.0|964981125|Action|Comedy|Sci-Fi| 3.487878787878788|             165|
|       Psycho (1960)|   1219|     4|   4.0|964539961|        Crime|Horror| 4.036144578313253|              83|
|Men in Black (a.k...|   1580|     4|   3.0|986935244|Action|Comedy|Sci-Fi| 3.487878787878788|             165|
|O Brother, Where ...|   4027|     4|   3.0|986849201|Adventure|Comedy|...|3.8085106382978724|              94|
+--------------------+-------+------+------+---------+--------------------+------------------+----------

In [83]:
#  infos
distinct_titles = joinedDF.select("title").distinct().count()
print("Number of movie titles with more than 50 ratings: ", distinct_titles)

distinct_users = joinedDF.select("userId").distinct().count()
print("Number of distinct users: ", distinct_users)

distinct_ratings = joinedDF.select("rating").distinct().count()
print("Number of distinct ratings: ", distinct_ratings)

Number of movie titles with more than 50 ratings:  437
Number of distinct users:  606
Number of distinct ratings:  10


In [84]:
#  make pivot table

userRatingsDF = joinedDF.groupBy('userId').pivot('title').sum('rating').na.fill(0)

# Rename columns to remove special characters and spaces
import pandas as pd

# Spark DataFrame'i Pandas DataFrame'e dönüştür
pandas_df = userRatingsDF.toPandas()

# Kolon isimlerini değiştir
pandas_df.columns = pandas_df.columns.str.replace(" ", "_") \
                                     .str.replace(".", "") \
                                     .str.replace("-", "_") \
                                     .str.replace(":", "") \
                                     .str.replace("'", "") \
                                     .str.replace(",", "")

# Pandas DataFrame'i Spark DataFrame'e dönüştür
userRatingsDF = spark.createDataFrame(pandas_df)

userRatingsDF.show(20)

  .str.replace(".", "") \


+------+---------------------------------+-------------------+---------------------------+--------------------+----------+-----------------------------+---------------------------------+----------------+--------------------------------+------------------------------------+---------------------------+--------------------+----------------+--------------+------------+-------------+--------------------+--------------+--------------------------------------------------+----------------------+-------------------------+-------------------+-----------------------------+----------------------+-------------------------------------------+-------------------+-----------------+---------------------+----------------+--------------------+-----------------+-----------------------+-------------------------+----------------------------------+-------------------------------------------------+-------------------------------------------+-------------+-------------------+-----------+-------------------------

In [85]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

# Normalizer kullanarak vektörleri normalize ediyoruz
normalizer = Normalizer(inputCol="features", outputCol="normalized_feautures")
vec = VectorAssembler(inputCols=userRatingsDF.columns[1:], outputCol="features")
df = vec.transform(userRatingsDF)
df = normalizer.transform(df).drop(*df.columns[1:-1])

df.show(10)

+------+--------------------+--------------------+
|userId|            features|normalized_feautures|
+------+--------------------+--------------------+
|   463|(437,[57,69,86,10...|(437,[57,69,86,10...|
|   148|(437,[36,37,152,1...|(437,[36,37,152,1...|
|   496|(437,[79,114,170,...|(437,[79,114,170,...|
|   471|(437,[18,54,95,15...|(437,[18,54,95,15...|
|   243|(437,[10,13,42,43...|(437,[10,13,42,43...|
|   392|(437,[7,19,21,35,...|(437,[7,19,21,35,...|
|   540|(437,[20,42,62,64...|(437,[20,42,62,64...|
|   516|(437,[101,128,148...|(437,[101,128,148...|
|    31|(437,[8,13,14,15,...|(437,[8,13,14,15,...|
|   137|(437,[1,2,13,14,1...|(437,[1,2,13,14,1...|
+------+--------------------+--------------------+
only showing top 10 rows



In [89]:
# crossjoin users
users_df = df.select("userId", "normalized_feautures")
users_df = users_df.alias("u1").crossJoin(users_df.alias("u2"))

users_df_joined = users_df.select(col("u1.userId").alias("userId1"),
                                         col("u2.userId").alias("userId2"),
                                         col("u1.normalized_feautures").alias("features1"),
                                         col("u2.normalized_feautures").alias("features2"))

users_df_joined.show()

+-------+-------+--------------------+--------------------+
|userId1|userId2|           features1|           features2|
+-------+-------+--------------------+--------------------+
|    463|    463|(437,[57,69,86,10...|(437,[57,69,86,10...|
|    463|    148|(437,[57,69,86,10...|(437,[36,37,152,1...|
|    463|    496|(437,[57,69,86,10...|(437,[79,114,170,...|
|    463|    471|(437,[57,69,86,10...|(437,[18,54,95,15...|
|    463|    243|(437,[57,69,86,10...|(437,[10,13,42,43...|
|    463|    392|(437,[57,69,86,10...|(437,[7,19,21,35,...|
|    463|    540|(437,[57,69,86,10...|(437,[20,42,62,64...|
|    463|    516|(437,[57,69,86,10...|(437,[101,128,148...|
|    463|     31|(437,[57,69,86,10...|(437,[8,13,14,15,...|
|    463|    137|(437,[57,69,86,10...|(437,[1,2,13,14,1...|
|    463|    580|(437,[57,69,86,10...|(437,[1,2,3,4,6,7...|
|    463|    451|(437,[57,69,86,10...|(437,[56,73,139,1...|
|    463|     85|(437,[57,69,86,10...|(437,[106,161,175...|
|    463|    251|(437,[57,69,86,10...|(4

In [90]:
from pyspark.sql.types import DoubleType

# cosine similarity function
def cosine_similarity(v1, v2):
    return float(v1.dot(v2)/(v1.norm(2)*v2.norm(2)))

# UDF
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# cosine similarity for each line
result = users_df_joined.select(col("userId1"), col("userId2"), cosine_similarity_udf(col("features1"), col("features2")).alias("cosine_similarity"))

result.show()

+-------+-------+-------------------+
|userId1|userId2|  cosine_similarity|
+-------+-------+-------------------+
|    463|    463| 1.0000000000000002|
|    463|    148|0.18656977379625883|
|    463|    496|0.24471702495439443|
|    463|    471|0.22648235142108256|
|    463|    243|0.12240756379705775|
|    463|    392|0.09333769687663385|
|    463|    540|0.17909956224765455|
|    463|    516|                0.0|
|    463|     31|0.03251806623278645|
|    463|    137|0.28467419499722163|
|    463|    580| 0.2253727906295172|
|    463|    451|0.03323217479568267|
|    463|     85|0.11949725712131366|
|    463|    251| 0.2189345808305877|
|    463|    458| 0.1429632711333721|
|    463|     65|0.17252528243073947|
|    463|    255|                0.0|
|    463|    588|0.17525929895316236|
|    463|    481|                0.0|
|    463|    133|0.22857175139031538|
+-------+-------+-------------------+
only showing top 20 rows



In [91]:
# remove lines with userId1 == userId2
result = result.filter(col("userId1") != col("userId2"))

# get similar users for each user
top_10 = result.select(col("userId1"),struct(col("userId2")).alias("user_sim"))\
                .groupBy("userId1").agg(collect_set("user_sim").alias("similar_users"))

# get most similar 10 user
top_10 = top_10.withColumn("similar_users", slice(col("similar_users"), 1, 10)).orderBy("userId1")

# results
top_10.show(20,truncate=False)


+-------+--------------------------------------------------------------------+
|userId1|similar_users                                                       |
+-------+--------------------------------------------------------------------+
|1      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|2      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|3      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|4      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|5      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|6      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|7      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|8      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|9      |[{191}, {353}, {486}, {63}, {538}, {299}, {18}, {236}, {135}, {267}]|
|10     |[{191}, {353}, {486}, {63}, {538}, {299}, {