In [3]:
! pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, collect_list
from pyspark.sql import functions
from pyspark.sql.functions import col, lit, when, expr, desc, size
import pandas as pd

In [None]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MovieTwin") \
    .master("spark://cs103:58270") \
    .getOrCreate()

# spark = SparkSession.builder.appName("MovieTwin").config("spark.executor.instances", "10").config("spark.executor.memory", "4g").config("spark.executor.cores", "2").getOrCreate()

In [6]:
sc = spark.sparkContext
sc

## Data Loading and Pre-processing

In [7]:
# Load only the movieId
%time movies = spark.read.csv("ml-latest/movies-large.csv", header=True, inferSchema=True, schema="movieId INT")

# Load only the userId and movieId columns
# By loading in only the relevant data, we reduce loading time to 1/20!
%time ratings = spark.read.csv("ml-latest/ratings-large.csv", header=True, inferSchema=True, schema="userId INT, movieId INT")

CPU times: user 976 µs, sys: 1.53 ms, total: 2.5 ms
Wall time: 1.41 s
CPU times: user 499 µs, sys: 553 µs, total: 1.05 ms
Wall time: 36.5 ms


In [8]:
# set variables from documentation - avoid some unnecessary data loading
# the number have been verified by loading the actual datasets

# num_movies = 9742
# num_users = 610

num_movies = 86537
num_users = 330975

In [None]:
distinct_movie_ids = sorted(set(row.movieId for row in movies.collect()))
movie_id_index_map = {movie_id: index for index, movie_id in enumerate(distinct_movie_ids)}


**Consideration**: User IDs are not consecutive, neither are movie IDs. For our purposes, we do not care about actual movie IDs, as long as they are aligned so we can compute user similarity. However, we do care about actual user IDs in the final step. For now, we can pretend the user are indexed by consecutive integers, and later (when we get the top 100 pairs) convert those numerals back to actual user IDs using a dictionary.

**Loading Optimization**: Using the pivot_table function takes forever, likely due to the varying length of the list of movies. Anticipating a much larger dataset, I wrote a custom function to minimize loading time. It first creates an empty table of size num_users x num_movies, then compute the list of movies watched by each user, and map each movieId to a movieIndex, and finally filling the cell `[userIndex, movieIndex]`with 1.   

In [10]:
from pyspark.ml.linalg import Vectors

def load_data(ratings, movies):
    # create a movieId to movieIndex (0-indexed) mapping
    distinct_movie_ids = sorted(set(row.movieId for row in movies.collect()))
    movie_id_index_map = {movie_id: index for index, movie_id in enumerate(distinct_movie_ids)}

    # create a dataframe representing the list of movies watched by each user
    user_movies = group_ratings_by_user(ratings)
    user_movies_df = spark.createDataFrame([(user_id, movies) for user_id, movies in user_movies.items()], ['userId', 'movies'])
    user_movies_df.show(5)

    # Convert user-movie mapping to sparse vectors
    user_movie_sparse_vectors = user_movies_df.rdd.map(lambda row: (row.userId, sparse_vector_from_movies(row.movies, movie_id_index_map)))

    # Convert RDD to DataFrame
    user_movie_sparse_vectors = user_movie_sparse_vectors.toDF(['userId', 'movieVector'])

    return user_movie_sparse_vectors

def sparse_vector_from_movies(movies, movie_id_index_map):
    indices = sorted(movie_id_index_map[movieId] for movieId in movies)
    values = [1] * len(indices)  # Assuming all entries are 1
    return Vectors.sparse(num_movies, indices, values)


In [11]:
def group_ratings_by_user(ratings):
    user_movies_df = ratings.groupBy('userId').agg(collect_list('movieId').alias('movies')).filter(size('movies') >= 20)
    user_movies = {row.userId: row.movies for row in user_movies_df.collect()}
    return user_movies

In [12]:
user_movies = ratings.repartitionByRange(col("userId")).groupBy('userId').agg(collect_list('movieId').alias('movies')).filter(size('movies') >= 20)
user_movies.show()



+------+--------------------+
|userId|              movies|
+------+--------------------+
|     1|[1, 110, 158, 260...|
|     2|[1, 2, 6, 10, 11,...|
|     3|[296, 318, 858, 2...|
|     4|[260, 318, 356, 5...|
|     5|[47, 175, 257, 31...|
|     6|[24, 47, 60, 110,...|
|     7|[1, 3, 11, 21, 25...|
|     9|[2, 3, 5, 6, 10, ...|
|    10|[1, 32, 47, 260, ...|
|    11|[260, 318, 356, 5...|
|    12|[1, 6, 9, 12, 25,...|
|    13|[838, 858, 1025, ...|
|    14|[1, 2, 10, 17, 19...|
|    15|[16, 50, 223, 260...|
|    17|[104, 527, 1101, ...|
|    21|[1, 2, 10, 32, 39...|
|    22|[16, 18, 32, 47, ...|
|    24|[1, 9, 11, 14, 15...|
|    26|[50, 111, 296, 31...|
|    27|[111, 318, 356, 9...|
+------+--------------------+
only showing top 20 rows



                                                                                

In [13]:
%time loaded_data = load_data(ratings, movies)

24/05/15 16:48:09 WARN TaskSetManager: Stage 7 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
24/05/15 16:48:11 WARN TaskSetManager: Stage 8 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.


+------+--------------------+
|userId|              movies|
+------+--------------------+
|   148|[1, 2, 6, 7, 10, ...|
|   463|[1, 111, 223, 260...|
|   496|[6, 10, 17, 21, 2...|
|   833|[47, 110, 260, 26...|
|  1088|[1, 2, 6, 10, 21,...|
+------+--------------------+
only showing top 5 rows



                                                                                

CPU times: user 7.61 s, sys: 790 ms, total: 8.4 s
Wall time: 21 s


In [14]:
num_partitions = loaded_data.rdd.getNumPartitions()
print("Number of partitions:", num_partitions)

Number of partitions: 80


## MinHash

In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.feature import MinHashLSH

In [16]:
loaded_repartitioned = loaded_data.repartitionByRange(10, "userId")

In [17]:
loaded_data.show(10)

24/05/15 16:48:12 WARN TaskSetManager: Stage 9 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
[Stage 9:>                                                          (0 + 1) / 1]

+------+--------------------+
|userId|         movieVector|
+------+--------------------+
|   148|(86537,[0,1,5,6,9...|
|   463|(86537,[0,109,220...|
|   496|(86537,[5,9,16,20...|
|   833|(86537,[46,108,25...|
|  1088|(86537,[0,1,5,9,2...|
|  1238|(86537,[5,9,15,16...|
|  1342|(86537,[31,46,228...|
|  1645|(86537,[0,16,27,4...|
|  1829|(86537,[4,30,33,4...|
|  1959|(86537,[0,1,10,47...|
+------+--------------------+
only showing top 10 rows



                                                                                

In [18]:
first_row = loaded_data.select('movieVector').head(1)
first_row

24/05/15 16:48:17 WARN TaskSetManager: Stage 10 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(movieVector=SparseVector(86537, {0: 1.0, 1: 1.0, 5: 1.0, 6: 1.0, 9: 1.0, 20: 1.0, 24: 1.0, 31: 1.0, 33: 1.0, 46: 1.0, 49: 1.0, 69: 1.0, 93: 1.0, 108: 1.0, 124: 1.0, 148: 1.0, 149: 1.0, 158: 1.0, 159: 1.0, 162: 1.0, 170: 1.0, 188: 1.0, 196: 1.0, 243: 1.0, 257: 1.0, 289: 1.0, 292: 1.0, 313: 1.0, 314: 1.0, 324: 1.0, 343: 1.0, 344: 1.0, 345: 1.0, 348: 1.0, 351: 1.0, 352: 1.0, 356: 1.0, 359: 1.0, 362: 1.0, 368: 1.0, 372: 1.0, 374: 1.0, 377: 1.0, 378: 1.0, 421: 1.0, 429: 1.0, 435: 1.0, 437: 1.0, 449: 1.0, 452: 1.0, 459: 1.0, 469: 1.0, 470: 1.0, 475: 1.0, 486: 1.0, 504: 1.0, 506: 1.0, 512: 1.0, 522: 1.0, 528: 1.0, 534: 1.0, 536: 1.0, 579: 1.0, 580: 1.0, 581: 1.0, 582: 1.0, 584: 1.0, 585: 1.0, 586: 1.0, 588: 1.0, 589: 1.0, 600: 1.0, 637: 1.0, 642: 1.0, 662: 1.0, 705: 1.0, 718: 1.0, 721: 1.0, 722: 1.0, 734: 1.0, 764: 1.0, 784: 1.0, 789: 1.0, 814: 1.0, 840: 1.0, 876: 1.0, 878: 1.0, 879: 1.0, 881: 1.0, 883: 1.0, 884: 1.0, 887: 1.0, 890: 1.0, 891: 1.0, 892: 1.0, 893: 1.0, 895: 1.0, 898: 1.0, 

In [19]:
minhash_lsh = MinHashLSH(inputCol="movieVector", outputCol="hashes", numHashTables=5)

# Fit the model to the data
model = minhash_lsh.fit(loaded_data)

24/05/15 16:48:21 WARN TaskSetManager: Stage 11 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [20]:
print("Model summary:")
print(model)

Model summary:
MinHashLSHModel: uid=MinHashLSH_15c21929d621, numHashTables=5


In [21]:
replica = loaded_data

In [22]:
threshold = 0.3

# Perform the approximate similarity join
similar_pairs = model.approxSimilarityJoin(loaded_data, replica, threshold, distCol="JaccardDistance")\
                    .filter(col("datasetA.userId") < col("datasetB.userId"))\
                    .select(
                        col("datasetA.userId").alias("idA"),
                        col("datasetB.userId").alias("idB"),
                        col("JaccardDistance")
                    )

In [23]:
similar_pairs

DataFrame[idA: bigint, idB: bigint, JaccardDistance: double]

In [33]:
sorted_pairs = similar_pairs.orderBy(col("JaccardDistance"))
top_100_pairs = sorted_pairs.limit(100)

# Show the top 100 pairs
top_100_pairs.show(100)

24/05/15 13:54:11 WARN TaskSetManager: Stage 23 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
24/05/15 13:54:14 WARN TaskSetManager: Stage 24 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.

+------+------+---------------+
|   idA|   idB|JaccardDistance|
+------+------+---------------+
| 85653| 95175|            0.0|
|305904|314085|            0.0|
| 51160| 62784|            0.0|
| 57540|282312|            0.0|
| 77600|132822|            0.0|
| 64872|293880|            0.0|
| 82321|236540|            0.0|
|158283|329248|            0.0|
| 82321|277862|            0.0|
| 19061|241234|            0.0|
| 97685|134981|            0.0|
| 22437| 23527|            0.0|
|234310|325068|            0.0|
|  3799| 97685|            0.0|
| 12125| 47942|            0.0|
| 50132| 87155|            0.0|
|183207|222150|            0.0|
|122621|235844|            0.0|
| 77318|227884|            0.0|
| 83065|183123|            0.0|
| 93572|177183|            0.0|
|  3799|162798|            0.0|
| 14186|253377|            0.0|
| 51160|313610|            0.0|
| 93572|119474|            0.0|
| 22437|313463|            0.0|
|  3799|227884|            0.0|
|238288|265718|            0.0|
| 59869|

                                                                                

In [34]:
# To avoid glitches, I saved the top 100 results manually into a .csv file. Then I read that file back into a spark data frame.

In [31]:
# result_path_csv = "new_top_100_pairs.csv"

# # Write the DataFrame to a CSV file
# top_100_pairs.write.csv(result_path_csv, header=True)

# # result_path_parquet = "top_100_pairs.parquet"

# # # Write the DataFrame to a Parquet file
# # top_100_pairs.write.parquet(result_path_parquet)



In [24]:
results = spark.read.csv("results_50.csv", header=True, inferSchema=True)

                                                                                

In [25]:
results.show()

+------+------+---------------+
|   idA|   idB|JaccardDistance|
+------+------+---------------+
| 85653| 95175|            0.0|
|305904|314085|            0.0|
| 51160| 62784|            0.0|
| 57540|282312|            0.0|
| 77600|132822|            0.0|
| 64872|293880|            0.0|
| 82321|236540|            0.0|
|158283|329248|            0.0|
| 82321|277862|            0.0|
| 19061|241234|            0.0|
| 97685|134981|            0.0|
| 22437| 23527|            0.0|
|234310|325068|            0.0|
|  3799| 97685|            0.0|
| 12125| 47942|            0.0|
| 50132| 87155|            0.0|
|183207|222150|            0.0|
|122621|235844|            0.0|
| 77318|227884|            0.0|
| 83065|183123|            0.0|
+------+------+---------------+
only showing top 20 rows



In [26]:
loaded_data.show()

24/05/15 16:48:55 WARN TaskSetManager: Stage 15 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
[Stage 15:>                                                         (0 + 1) / 1]

+------+--------------------+
|userId|         movieVector|
+------+--------------------+
|   148|(86537,[0,1,5,6,9...|
|   463|(86537,[0,109,220...|
|   496|(86537,[5,9,16,20...|
|   833|(86537,[46,108,25...|
|  1088|(86537,[0,1,5,9,2...|
|  1238|(86537,[5,9,15,16...|
|  1342|(86537,[31,46,228...|
|  1645|(86537,[0,16,27,4...|
|  1829|(86537,[4,30,33,4...|
|  1959|(86537,[0,1,10,47...|
|  2122|(86537,[46,59,578...|
|  2142|(86537,[1,5,9,10,...|
|  2366|(86537,[9,46,108,...|
|  2659|(86537,[102,214,2...|
|  3175|(86537,[0,5,8,9,1...|
|  3749|(86537,[0,1,2,9,1...|
|  3918|(86537,[9,16,27,2...|
|  4101|(86537,[0,257,475...|
|  4900|(86537,[18,20,46,...|
|  4935|(86537,[163,1013,...|
+------+--------------------+
only showing top 20 rows



                                                                                

In [27]:
def jaccard_similarity(userA, userB):
    df = loaded_data 
    # Fetch movie vectors for the given user IDs
    vectorA = df.filter(df.userId == userA).select("movieVector").rdd.map(lambda x: x[0]).collect()[0]
    vectorB = df.filter(df.userId == userB).select("movieVector").rdd.map(lambda x: x[0]).collect()[0]
    
    # Convert arrays to sets
    setA = set(vectorA.indices)
    setB = set(vectorB.indices)
    
    # Calculate intersection and union of non-zero elements
    intersection = setA.intersection(setB)
    union = setA.union(setB)
    
    # Calculate Jaccard similarity
    similarity = len(intersection) / len(union)
    return similarity


In [29]:
sim = jaccard_similarity(85653, 95175)
sim

24/05/15 16:49:09 WARN TaskSetManager: Stage 16 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
24/05/15 16:49:16 WARN TaskSetManager: Stage 17 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

1.0

In [73]:
df = loaded_data
vectorA = df.filter(df.userId == 85653).select("movieVector").rdd.map(lambda x: x[0]).collect()[0]
vectorB = df.filter(df.userId == 95175).select("movieVector").rdd.map(lambda x: x[0]).collect()[0]

24/05/15 17:24:01 WARN TaskSetManager: Stage 516 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
24/05/15 17:24:02 WARN TaskSetManager: Stage 517 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [74]:
vectorA

SparseVector(86537, {9: 1.0, 148: 1.0, 151: 1.0, 159: 1.0, 163: 1.0, 183: 1.0, 206: 1.0, 228: 1.0, 250: 1.0, 288: 1.0, 292: 1.0, 312: 1.0, 314: 1.0, 324: 1.0, 334: 1.0, 339: 1.0, 344: 1.0, 375: 1.0, 429: 1.0, 452: 1.0, 580: 1.0, 582: 1.0, 584: 1.0, 585: 1.0, 587: 1.0})

In [75]:
vectorB

SparseVector(86537, {9: 1.0, 148: 1.0, 151: 1.0, 159: 1.0, 163: 1.0, 183: 1.0, 206: 1.0, 228: 1.0, 250: 1.0, 288: 1.0, 292: 1.0, 312: 1.0, 314: 1.0, 324: 1.0, 334: 1.0, 339: 1.0, 344: 1.0, 375: 1.0, 429: 1.0, 452: 1.0, 580: 1.0, 582: 1.0, 584: 1.0, 585: 1.0, 587: 1.0})

In [53]:
from itertools import combinations
import random

# Get all userIds from loaded_data
all_userIds = loaded_data.select("userId").distinct()

# Count the number of distinct userIds
num_userIds = all_userIds.count()

# Randomly select 100 userIds
userIds = all_userIds.orderBy(col("userId"))

user_ids = [row['userId'] for row in df.collect()]

i = 0
random_pairs = []
while i < 100:
    user_id_1 = user_ids[random.randint(0, num_userIds)]
    user_id_2 = user_ids[random.randint(0, num_userIds)]
    if user_id_1 != user_id_2:
        i += 1
        random_pairs.append((user_id_1, user_id_2))

random_pairs[:10]

24/05/15 17:02:31 WARN TaskSetManager: Stage 107 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
24/05/15 17:02:33 WARN TaskSetManager: Stage 110 contains a task of very large size (1107 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[(131695, 4592),
 (310427, 217769),
 (209383, 184581),
 (326980, 82753),
 (276509, 282499),
 (322988, 206986),
 (102018, 170246),
 (120095, 196785),
 (64421, 148543),
 (130900, 314043)]

In [None]:
random_pair_results = []
for user_id_1, user_id_2 in random_pairs:
    similarity = jaccard_similarity(user_id_1, user_id_2)
    random_pair_results.append((user_id_1, user_id_2, similarity))

In [62]:
random_pair_results[:10]

[(131695, 4592, 0.023923444976076555),
 (310427, 217769, 0.005291005291005291),
 (209383, 184581, 0.11700680272108843),
 (326980, 82753, 0.022727272727272728),
 (276509, 282499, 0.038461538461538464),
 (322988, 206986, 0.0),
 (102018, 170246, 0.05555555555555555),
 (120095, 196785, 0.04416403785488959),
 (64421, 148543, 0.10666666666666667),
 (130900, 314043, 0.031317754757889664)]

In [61]:
import csv

filename = "random_pairs_similarity.csv"

# Write data to CSV file
with open(filename, 'w', newline='') as csvfile:
    # Define column names
    fieldnames = ['user_id_1', 'user_id_2', 'jaccard_similarity']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    # Write the header
    writer.writeheader()

    # Write each row of data
    for row in random_pair_results:
        writer.writerow({'user_id_1': row[0], 'user_id_2': row[1], 'jaccard_similarity': row[2]})

print(f"CSV file '{filename}' has been created successfully.")

CSV file 'random_pairs_similarity.csv' has been created successfully.


In [67]:
# now let's calculate jaccard similarity for the chosen twin pairs
twin_pairs = [(row['idA'], row['idB']) for row in results.collect()]

# Print the list of tuples
twin_pairs[:10]

[(85653, 95175),
 (305904, 314085),
 (51160, 62784),
 (57540, 282312),
 (77600, 132822),
 (64872, 293880),
 (82321, 236540),
 (158283, 329248),
 (82321, 277862),
 (19061, 241234)]

In [None]:
twin_pair_results = []
for user_id_1, user_id_2 in twin_pairs:
    similarity = jaccard_similarity(user_id_1, user_id_2)
    twin_pair_results.append((user_id_1, user_id_2, similarity))

In [71]:
twin_pair_results[:10]

[(85653, 95175, 1.0),
 (305904, 314085, 1.0),
 (51160, 62784, 1.0),
 (57540, 282312, 1.0),
 (77600, 132822, 1.0),
 (64872, 293880, 1.0),
 (82321, 236540, 1.0),
 (158283, 329248, 1.0),
 (82321, 277862, 1.0),
 (19061, 241234, 1.0)]

In [72]:
filename = "twin_pairs_similarity.csv"

# Write data to CSV file
with open(filename, 'w', newline='') as csvfile:
    # Define column names
    fieldnames = ['user_id_1', 'user_id_2', 'jaccard_similarity']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    # Write the header
    writer.writeheader()

    # Write each row of data
    for row in twin_pair_results:
        writer.writerow({'user_id_1': row[0], 'user_id_2': row[1], 'jaccard_similarity': row[2]})

print(f"CSV file '{filename}' has been created successfully.")

CSV file 'twin_pairs_similarity.csv' has been created successfully.
