# DSCI 632 - Applied Cloud Computing - Final Project

## Team Members
- Aman Ostwal (ago34@drexel.edu)
- Darshit Rai (dr3264@drexel.edu)
- Sanskruti Chavanke (sc4323@drexel.edu)

## Configuration:

In [1]:
# Install findspark and pyspark
!pip install -q findspark pyspark

# Import required libraries
import os

## Import Data to PySpark


In [2]:
# Import required libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Create a SparkContext (entry point to Spark)
sc = SparkContext.getOrCreate()

# Create a SparkSession with necessary configurations
spark = SparkSession.builder.getOrCreate()

In [3]:
# Define file paths for data import
ratings_file_path = "/content/ratings.csv"
tags_file_path = "/content/tags.csv"
movies_file_path = "/content/movies.csv"

# Read ratings data
ratings = spark.read.csv(ratings_file_path, header=True, inferSchema=True)
# Show ratings data
ratings.show(truncate = False)

# Read tags data
tags = spark.read.csv(tags_file_path, header=True, inferSchema=True)
# Show tags data
tags.show(truncate = False)

# Read movies data
movies = spark.read.csv(movies_file_path, header=True, inferSchema=True)
# Show movies data
movies.show(truncate = False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |2      |3.5   |1112486027|
|1     |29     |3.5   |1112484676|
|1     |32     |3.5   |1112484819|
|1     |47     |3.5   |1112484727|
|1     |50     |3.5   |1112484580|
|1     |112    |3.5   |1094785740|
|1     |151    |4.0   |1094785734|
|1     |223    |4.0   |1112485573|
|1     |253    |4.0   |1112484940|
|1     |260    |4.0   |1112484826|
|1     |293    |4.0   |1112484703|
|1     |296    |4.0   |1112484767|
|1     |318    |4.0   |1112484798|
|1     |337    |3.5   |1094785709|
|1     |367    |3.5   |1112485980|
|1     |541    |4.0   |1112484603|
|1     |589    |3.5   |1112485557|
|1     |593    |3.5   |1112484661|
|1     |653    |3.0   |1094785691|
|1     |919    |3.5   |1094785621|
+------+-------+------+----------+
only showing top 20 rows

+------+-------+-----------------+----------+
|userId|movieId|tag              |timestamp |
+------+-------+-----------------+--------

In [4]:
# Import necessary functions
from pyspark.sql.functions import collect_list, col, array_distinct, lit, size

# Merge Spark DataFrames
tags_and_movies = movies.join(tags, "movieId")
# Show the merged DataFrame
tags_and_movies.show(truncate = False)

+-------+------------------------------------------------+------------------------------------+------+-----------------+----------+
|movieId|title                                           |genres                              |userId|tag              |timestamp |
+-------+------------------------------------------------+------------------------------------+------+-----------------+----------+
|4141   |Head Over Heels (2001)                          |Comedy|Romance                      |18    |Mark Waters      |1240597180|
|208    |Waterworld (1995)                               |Action|Adventure|Sci-Fi             |65    |dark hero        |1368150078|
|353    |Crow, The (1994)                                |Action|Crime|Fantasy|Thriller       |65    |dark hero        |1368150079|
|521    |Romeo Is Bleeding (1993)                        |Crime|Thriller                      |65    |noir thriller    |1368149983|
|592    |Batman (1989)                                   |Action|Crime|Thril

In [5]:
# Import necessary functions
from pyspark.sql.functions import collect_list, col, array_distinct, size

# Group by movieId, title, and genres, and aggregate tag values into a list
df_tag_list = tags_and_movies.groupby("movieId", "title", "genres") \
    .agg(array_distinct(collect_list(col("tag"))).alias("tag_list"))

# Display the DataFrame where the size of tag_list is greater than 5
# df_tag_list.where(size(df_tag_list.tag_list) > 5).orderBy(size(df_tag_list.tag_list)).count() #135

In [6]:
# Import necessary functions and types
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, StringType, FloatType

def getSimilarMoviesByJacard(movie_in_id, df_with_tags):
    # Filter tags DataFrame for the selected movie
    movie_in_tags_row = df_with_tags.filter(col("movieId") == movie_in_id).select("title", "tag_list").collect()

    # Define schema for the DataFrame to store Jacard similarities
    schema = StructType([
        StructField('movieId', IntegerType(), True),
        StructField('tag_list', ArrayType(StringType()), True),
        StructField('jacSim', FloatType(), True)
    ])

    # Create an empty DataFrame with the defined schema
    all_jacards = spark.createDataFrame([], schema=schema)

    # Check if tags are found for the selected movie
    if movie_in_tags_row and len(movie_in_tags_row) > 0 and len(movie_in_tags_row[0].tag_list) > 0:
        movie_in_tag_list = movie_in_tags_row[0].tag_list

        # Select relevant columns from DataFrame
        movies_tags_list = df_with_tags.select("movieId", "tag_list")

        # Define Jacard similarity function
        jac = udf(lambda x: len(set(x).intersection(set(movie_in_tag_list))) / len(set(x).union(set(movie_in_tag_list))), FloatType())

        # Compute Jacard similarity for all movies
        movies_tags_list_jac = movies_tags_list.withColumn('jacSim', jac(col("tag_list")))

        # Filter movies with non-zero Jacard similarity
        all_jacards = movies_tags_list_jac.filter(col("jacSim") > 0.0)

        # Join with movies DataFrame to get movie titles
        all_jacards = all_jacards.alias("a").join(movies.alias("b"), col('a.movieId') == col('b.movieId')).select('a.*', "b.title")

        # Sort by Jacard similarity in descending order
        all_jacards = all_jacards.sort("jacSim", ascending=False)

    return all_jacards

# Test the function with an example
test = getSimilarMoviesByJacard(541, df_tag_list)
test.show(truncate = False)

+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
# Import necessary functions and types
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, FloatType, StringType

# Define a function to get movies watched by a user
def get_movies_by_user(userId):
    # Filter ratings DataFrame for the given userId and select distinct movieId-userId pairs
    movies_by_user = ratings.filter(col("userId") == userId).select(['movieId','userId']).distinct().collect()

    # Extract movieIds from the collected DataFrame
    movies_vector_user = [row.movieId for row in movies_by_user]

    return movies_vector_user

# Define a function to get all Jacard similarities for movies watched by a user
def get_all_jacards(movieId_list, df_movie):
    # Define schema for the DataFrame to store Jacard similarities
    schema = StructType([
        StructField('movieId', IntegerType(), True),
        StructField('tag_list', ArrayType(StringType()), True),
        StructField('jacSim', FloatType(), True)
    ])

    # Create an empty DataFrame with the defined schema
    all_jacards = spark.createDataFrame([], schema=schema)

    # Iterate over the list of movieIds watched by the user
    for movieId in movieId_list:
        # Get Jacard similarities for the current movie
        newDf = getSimilarMoviesByJacard(movieId, df_movie)

        # Union the new DataFrame with the existing DataFrame of all Jacard similarities
        all_jacards = all_jacards.union(newDf.select("movieId", "tag_list", "jacSim"))  # Select only necessary columns for union

    # Filter out movies that the user has already watched
    ajc = all_jacards.filter(~all_jacards.movieId.isin(movieId_list))

    # Join with movies DataFrame to get movie titles
    finalRecs = ajc.alias("a").join(movies.alias("b"), col('a.movieId') == col('b.movieId')).select('a.*', "b.title")

    # Sort by Jacard similarity in descending order
    finalRecs = finalRecs.sort("jacSim", ascending=False)

    return finalRecs

# Get movies watched by a user with userId 406
mvu = get_movies_by_user(406)
print(mvu)

# Get all Jacard similarities for movies watched by the user
allJacFrame = get_all_jacards(mvu, df_tag_list)
allJacFrame.show(truncate = False)

[376, 553, 360, 165, 337, 32, 586, 371, 339, 315, 158, 229, 319, 208, 380, 442, 480, 329, 170, 196, 247, 348, 172, 466, 514, 365, 533, 183, 475, 543, 539, 252, 469, 232, 203, 367, 341, 588, 272, 150, 500, 497, 410, 535, 542, 39, 296, 300, 541, 47, 610, 485, 224, 318, 161, 452, 521, 736, 218, 435, 256, 548, 276, 508, 178, 434, 265, 544, 160, 587, 316, 529, 491, 516, 357, 338, 292, 253, 450, 227, 422, 490, 527, 471, 356, 235, 474, 353, 593, 509, 597, 282, 345, 361, 432]
+-------+-----------------------------------+---------+---------------------------------------------------------+
|movieId|tag_list                           |jacSim   |title                                                    |
+-------+-----------------------------------+---------+---------------------------------------------------------+
|101350 |[Seen 2014]                        |1.0      |Miss Farkku-Suomi (2012)                                 |
|105554 |[Seen 2014]                        |1.0      |Fanatics (Kulman

In [8]:
# Group ratings DataFrame by userId and count the number of ratings for each user
df_ratings_count = ratings.groupBy("userId").count()

# Sort the DataFrame by the count of ratings in ascending order
df_ratings_count = df_ratings_count.sort("count", ascending=True)

# Show the first 30 rows of the sorted DataFrame
df_ratings_count.show(30, truncate=False)

+------+-----+
|userId|count|
+------+-----+
|27633 |20   |
|15658 |20   |
|30751 |20   |
|21298 |20   |
|33265 |20   |
|243   |20   |
|2027  |20   |
|34488 |20   |
|4092  |20   |
|19758 |20   |
|6866  |20   |
|1303  |20   |
|17438 |20   |
|20120 |20   |
|20717 |20   |
|28871 |20   |
|31309 |20   |
|11317 |20   |
|33012 |20   |
|9169  |20   |
|1766  |20   |
|18759 |20   |
|6010  |20   |
|23123 |20   |
|6376  |20   |
|34450 |20   |
|15216 |20   |
|15740 |20   |
|15827 |20   |
|17677 |20   |
+------+-----+
only showing top 30 rows



In [9]:
# Import necessary function for splitting the data
from sklearn.model_selection import train_test_split

def evaluate_users_ratings(userId):
    # Split the movies watched by the user
    umv = get_movies_by_user(userId)

    # Split the user's movies into train and test sets
    x_train, x_test = train_test_split(umv, test_size=0.5, random_state=7)

    # Get recommendations for the first split
    jac_frame = get_all_jacards(x_train, df_tag_list)
    jac_frame = jac_frame.sort("jacSim", ascending=False)

    # Count total movies rated with tags
    total_movies_rated_with_tags = jac_frame.count()

    # Filter recommended movies that are in the test set
    movies_in_test = jac_frame.filter(jac_frame.movieId.isin(x_test))
    movies_in_test = movies_in_test.withColumn("userId", lit(userId))

    # Join recommended movies with ratings to get ratings for recommended movies in the test set
    movies_recommended_rating = movies_in_test.join(ratings, (movies_in_test.movieId == ratings.movieId) & (movies_in_test.userId == ratings.userId))

    # Count total movies recommended that are in the test set
    movies_test_count = movies_in_test.count()

    # Print evaluation results
    print("Total Rated with tags: " + str(total_movies_rated_with_tags))
    print("Total movies recommended that are in test: " + str(movies_test_count))

    # Show recommended movies with ratings and Jacard similarities DataFrame
    movies_recommended_rating.show(truncate=False)
    jac_frame.show(truncate = False)

# Evaluate user with userId 1
evaluate_users_ratings(1)

Total Rated with tags: 337946
Total movies recommended that are in test: 5408
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------