In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Creating a SparkSession
spark = SparkSession.builder.appName("Tomatometer") \
                       .getOrCreate()

In [3]:
# Read a CSV file
df = spark.read.csv('./data/rotten_tomatoes_movies.csv', header=True, inferSchema=True)

In [4]:
df.show()

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-----------------+--------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|originalLanguage|            director|              writer|boxOffice|      distributor|soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-----------------+--------+
|  space-zombie-bingo| Space Zombie Bingo!|           50|       NULL|  NULL|                NULL|               NULL|          2018-08-25|            75|Comedy, Horror, S...|         Engl

### Pyspark Steps

#### Cleaning

In [5]:
# check missing valiues in audienceScore
audience_score_nulls = df.filter(df['audienceScore'].isNull()).count()
print(audience_score_nulls)

70010


In [6]:
# check missing valiues in tomatometer
tomato_meter_nulls = df.filter(df['tomatoMeter'].isNull()).count()
print(tomato_meter_nulls)

109381


In [7]:
# Check for missing values in releaseDateTheaters
release_date_nulls = df.filter(df['releaseDateTheaters'].isNull()).count()
print(release_date_nulls)

112485


In [8]:
# Drop rows nulls from previous columns
df_cleaned = df.dropna(subset=['audienceScore', 'tomatoMeter', 'releaseDateTheaters'])

In [9]:
audience_score_nulls = df_cleaned.filter(df['audienceScore'].isNull()).count()
print(audience_score_nulls)
# checking to make sure dropna worked

0


#### Processing 

In [18]:
# Year: Create a year column by parsing the year from the releaseDateTheaters column.

In [10]:
df_cleaned.show(10)

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|soundMix|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-01|          2018-08-21|           120|Adventure, Dr

In [11]:
df = df_cleaned.withColumn('year', F.year(F.col('releaseDateTheaters')))
df.show(5)

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-------------------+--------+----+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|originalLanguage|            director|              writer|boxOffice|        distributor|soundMix|year|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+----------------+--------------------+--------------------+---------+-------------------+--------+----+
|         adrift_2018|              Adrift|           65|         69| PG-13|['Injury Images',...|         2018-06-01|          2018-08-21|           120|Adventure, Dr

In [15]:
# Top Movies: Find the top 5000 movies based on Tomatometer score using functions like orderBy and limit.
top_movies_df = df.orderBy(F.col('tomatoMeter').desc()).limit(5000)
top_movies_df.show()

+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+----+
|                  id|               title|audienceScore|tomatoMeter|rating|      ratingContents|releaseDateTheaters|releaseDateStreaming|runtimeMinutes|               genre|    originalLanguage|            director|              writer|boxOffice|         distributor|   soundMix|year|
+--------------------+--------------------+-------------+-----------+------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+----+
|     the_human_trial|     The Human Trial|           99|        100|  NULL|                NULL|         2022-06-24|          2022-06-24|    

In [14]:
# save as parquet
top_movies_df.write.parquet('top_movies_df.parquet')