In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types  as t

tests_count = 0
tests_passed_count = 0

In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName('MovieLens').getOrCreate()

23/12/27 08:17:23 WARN Utils: Your hostname, Simons-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.30 instead (on interface en0)
23/12/27 08:17:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/27 08:17:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the movies.dat file
movies_df = spark.read.load("movies.dat", format="csv", sep="::", inferSchema=True)
# Specify column names for movies_df
movies_df = movies_df.toDF("MovieID", "Title", "Genres")

# Read the ratings.dat file
ratings_df = spark.read.load("ratings.dat", format="csv", sep="::", inferSchema=True)
# Specify column names for ratings_df
ratings_df = ratings_df.toDF("UserID", "MovieID", "Rating", "Timestamp")

# Print the DataFrames to verify successful reading
print("Movies DataFrame:")
movies_df.show()

print("\nRatings DataFrame:")
ratings_df.show()

                                                                                

Movies DataFrame:
+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|Adventure|Children's|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Animation|Children's|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|      Drama|Thri

In [4]:
# Convert the string timestamp to seconds since the epoch:
ratings_df = ratings_df.withColumn('DateTime', F.to_timestamp(ratings_df["Timestamp"].cast(dataType=t.TimestampType())))

# Drop the original string timestamp column:
ratings_df = ratings_df.drop("Timestamp")

ratings_df.show()

+------+-------+------+-------------------+
|UserID|MovieID|Rating|           DateTime|
+------+-------+------+-------------------+
|     1|   1193|     5|2000-12-31 22:12:40|
|     1|    661|     3|2000-12-31 22:35:09|
|     1|    914|     3|2000-12-31 22:32:48|
|     1|   3408|     4|2000-12-31 22:04:35|
|     1|   2355|     5|2001-01-06 23:38:11|
|     1|   1197|     3|2000-12-31 22:37:48|
|     1|   1287|     5|2000-12-31 22:33:59|
|     1|   2804|     5|2000-12-31 22:11:59|
|     1|    594|     4|2000-12-31 22:37:48|
|     1|    919|     4|2000-12-31 22:22:48|
|     1|    595|     5|2001-01-06 23:37:48|
|     1|    938|     4|2000-12-31 22:29:12|
|     1|   2398|     4|2000-12-31 22:38:01|
|     1|   2918|     4|2000-12-31 22:35:24|
|     1|   1035|     5|2000-12-31 22:29:13|
|     1|   2791|     4|2000-12-31 22:36:28|
|     1|   2687|     3|2001-01-06 23:37:48|
|     1|   2018|     4|2000-12-31 22:29:37|
|     1|   3105|     5|2000-12-31 22:28:33|
|     1|   2797|     4|2000-12-3

In [5]:
ratings_df.describe().show()  # Statistical summary



+-------+------------------+------------------+-----------------+
|summary|            UserID|           MovieID|           Rating|
+-------+------------------+------------------+-----------------+
|  count|           1000209|           1000209|          1000209|
|   mean| 3024.512347919285|1865.5398981612843|3.581564453029317|
| stddev|1728.4126948999824|1096.0406894572482|1.117101845373263|
|    min|                 1|                 1|                1|
|    max|              6040|              3952|                5|
+-------+------------------+------------------+-----------------+



                                                                                

In [6]:
ratings_df.select(F.min('DateTime').alias('MinDateTime')).show()
ratings_df.select(F.max('DateTime').alias('MaxDateTime')).show()

                                                                                

+-------------------+
|        MinDateTime|
+-------------------+
|2000-04-26 00:05:32|
+-------------------+





+-------------------+
|        MaxDateTime|
+-------------------+
|2003-02-28 17:49:50|
+-------------------+



                                                                                

In [7]:
def expect_column_is_int(df, colName):
    global tests_count
    tests_count += 1
    sourceDataType = dict(df.dtypes)[colName]
    if sourceDataType != 'int':
        print(f"FAILED: COLUMN \"{colName}\" IS NOT AN INTEGER")
    else:
        global tests_passed_count
        tests_passed_count += 1
        
expect_column_is_int(ratings_df, 'Rating')

In [8]:
def expect_values_between(df, colName, lower_bound, upper_bound):
    global tests_count
    tests_count += 1
    failed_values_count = df.filter(~F.col(colName).between(lower_bound, upper_bound)).count()
    if failed_values_count > 0:
        print(f"FAILED: {failed_values_count} VALUES IN COLUMN \"{colName}\" OUTSIDE EXPECTED RANGE")
    else:
        global tests_passed_count
        tests_passed_count += 1
        
expect_values_between(ratings_df, "UserID", 1, 6040)
expect_values_between(ratings_df, "MovieID", 1, 3952)
expect_values_between(ratings_df, "Rating", 1, 5)

                                                                                

In [9]:
def expect_min_ratings_per_user(target_value=20):
    global tests_count
    tests_count += 1
    # Count ratings per user
    rating_counts_per_user = ratings_df.groupBy("userId").count()

    # Find the minimum of the group count
    min_rating_count = rating_counts_per_user.selectExpr("min(count) as min_count").first()[0]

    if min_rating_count < target_value:
        print(f"FAILED: USER(S) IN RATINGS FILE DO NOT HAVE EXPECTED MINIMUM OF {target_value} RATINGS")
    else:
        global tests_passed_count
        tests_passed_count += 1
        
expect_min_ratings_per_user()

                                                                                

In [10]:
def expect_not_null_column_values(df, colName):
    global tests_count
    tests_count += 1
    # Count duplicates in column
    null_count = df.filter(F.col(colName).isNull()).count()
    
    if null_count > 0:
        print(f"FAILED: COLUMN \"{colName}\" CONTAINS NULL VALUES")
    else:
        global tests_passed_count
        tests_passed_count += 1

expect_not_null_column_values(movies_df, "MovieID")
expect_not_null_column_values(movies_df, "Title")

In [11]:
def expect_unique_column_values(df, colName):
    global tests_count
    tests_count += 1
    # Count duplicates in column
    duplicate_count = df.groupBy(colName).count().filter(F.col("count") > 1).agg(F.count("*").alias("num_duplicates")).first()[0]
    
    if duplicate_count > 0:
        print(f"FAILED: \"{colName}\" IS NOT UNIQUE")
    else:
        global tests_passed_count
        tests_passed_count += 1

expect_unique_column_values(movies_df, "MovieID")
expect_unique_column_values(movies_df, "Title")

In [12]:
def expect_imdb_format_movie_titles():
    global tests_count
    tests_count += 1
    imdb_pattern = r".*\([0-9]{4}\)$"   # Regex pattern for title and year
    # Count format mismatch
    invalid_titles_count = movies_df.filter(~movies_df.Title.rlike(imdb_pattern)).count()
    if invalid_titles_count > 0:
        print(f"FAILED: TITLE COLUMN IN MOVIES FILE CONTAINS VALUES INCONSISTENT WITH FORMAT \"TITLE (RELEASE YEAR)\"")
    else:
        global tests_passed_count
        tests_passed_count += 1
        
expect_imdb_format_movie_titles()

In [13]:
if tests_passed_count < tests_count:
    tests_failed_count = tests_count - tests_passed_count
    raise Exception(f"{tests_failed_count}/{tests_count} TESTS FAILED! RESOLVE BAD DATA TO PROCEED!")
else:
    print(f"{tests_passed_count}/{tests_count} TESTS PASSED! PERFORMING ANALYSIS...")

10/10 TESTS PASSED! PERFORMING ANALYSIS...


In [14]:
# Register DataFrames as temporary views
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")

# Execute Spark SQL query
movie_results_df = spark.sql("""
    SELECT 
        m.*,
        MAX(Rating) AS MaxRating, 
        MIN(Rating) AS MinRating,
        ROUND( AVG(Rating), 2 ) AS AvgRating
    FROM movies m 
    LEFT JOIN ratings r 
        ON m.MovieID = r.MovieID
    GROUP BY 
        m.MovieID,
        m.Title,
        m.Genres
    ORDER BY
        m.MovieID
""")

# Display the results
movie_results_df.show()

[Stage 56:>                                                         (0 + 4) / 4]

+-------+--------------------+--------------------+---------+---------+---------+
|MovieID|               Title|              Genres|MaxRating|MinRating|AvgRating|
+-------+--------------------+--------------------+---------+---------+---------+
|      1|    Toy Story (1995)|Animation|Childre...|        5|        1|     4.15|
|      2|      Jumanji (1995)|Adventure|Childre...|        5|        1|      3.2|
|      3|Grumpier Old Men ...|      Comedy|Romance|        5|        1|     3.02|
|      4|Waiting to Exhale...|        Comedy|Drama|        5|        1|     2.73|
|      5|Father of the Bri...|              Comedy|        5|        1|     3.01|
|      6|         Heat (1995)|Action|Crime|Thri...|        5|        1|     3.88|
|      7|      Sabrina (1995)|      Comedy|Romance|        5|        1|     3.41|
|      8| Tom and Huck (1995)|Adventure|Children's|        5|        1|     3.01|
|      9| Sudden Death (1995)|              Action|        5|        1|     2.66|
|     10|    Gol

                                                                                

In [15]:
# Execute Spark SQL query
user_results_df = spark.sql("""
    WITH movies_ranked AS (
        SELECT 
            r.UserID,
            m.Title,
            r.Rating,
            RANK() OVER ( PARTITION BY r.UserID ORDER BY r.Rating DESC ) AS TitleRank
        FROM ratings r
        LEFT JOIN movies m
            ON r.MovieID = m.MovieID
        )
    SELECT *
    FROM movies_ranked
    WHERE TitleRank < 4
    ORDER BY
        UserID
""")

# Display the results
user_results_df.show()



+------+--------------------+------+---------+
|UserID|               Title|Rating|TitleRank|
+------+--------------------+------+---------+
|     1|One Flew Over the...|     5|        1|
|     1|Bug's Life, A (1998)|     5|        1|
|     1|      Ben-Hur (1959)|     5|        1|
|     1|Christmas Story, ...|     5|        1|
|     1|Beauty and the Be...|     5|        1|
|     1|Sound of Music, T...|     5|        1|
|     1|   Awakenings (1990)|     5|        1|
|     1|Back to the Futur...|     5|        1|
|     1|Schindler's List ...|     5|        1|
|     1|   Pocahontas (1995)|     5|        1|
|     1|Last Days of Disc...|     5|        1|
|     1|   Cinderella (1950)|     5|        1|
|     1|    Apollo 13 (1995)|     5|        1|
|     1|    Toy Story (1995)|     5|        1|
|     1|     Rain Man (1988)|     5|        1|
|     1| Mary Poppins (1964)|     5|        1|
|     1|        Dumbo (1941)|     5|        1|
|     1|Saving Private Ry...|     5|        1|
|     2|     

                                                                                

In [16]:
def write_csv(df, file_name):
    # Write the DataFrame to a CSV file
    df.write.format("csv").option("header", "true").mode("overwrite").save(file_name)
    print(f"SAVED {file_name}")

# write_csv(movies_df, "original_movies.csv")
# write_csv(ratings_df, "original_ratings.csv")
# write_csv(movie_results_df, "aggregate_ratings_per_movie.csv")
# write_csv(user_results_df, "top_3_movies_per_user.csv")