<a href="https://colab.research.google.com/github/pcamarillor/O2024_ESI3914O/blob/FranciscoFlores_Ramon_Alvarez_Lab04/Lab04.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link
# https://www.apache.org/dyn/closer.lua/spark
!wget -q https://dlcdn.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Waiting for headers] [Waiting for headers] [1 InRelease 3,626 B/3,626 B 100[0m[33m0% [Waiting for headers] [Waiting for headers] [Connected to r2u.stat.illinois.[0m                                                                               Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connected[0m                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Ign:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:8 http://archive.ubu

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, to_date, year

# Build (or reuse) the Spark session for this notebook.
# NOTE(review): the LEGACY time parser policy is presumably required by the
# "dd-MMM-yy" pattern used when parsing Release_Date later on - confirm.
spark = (
    SparkSession.builder
    .appName("Movies-Activity")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Read the movies dataset and substitute defaults for missing values in the
# columns listed below, as a single pipeline.
df_movies = (
    spark.read
    .option("inferSchema", "true")
    .json("/datasets/movies.json")
    .na.fill(
        {
            "Production_Budget": 0,
            "Worldwide_Gross": 0,
            "IMDB_Rating": 5.0,
            "Distributor": "Unknown",
            "Release_Date": "01-Jan-00",
        }
    )
)

# Show the resulting column names, types and nullability
df_movies.printSchema()

root
 |-- Creative_Type: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Distributor: string (nullable = false)
 |-- IMDB_Rating: double (nullable = false)
 |-- IMDB_Votes: long (nullable = true)
 |-- MPAA_Rating: string (nullable = true)
 |-- Major_Genre: string (nullable = true)
 |-- Production_Budget: long (nullable = false)
 |-- Release_Date: string (nullable = false)
 |-- Rotten_Tomatoes_Rating: long (nullable = true)
 |-- Running_Time_min: long (nullable = true)
 |-- Source: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- US_DVD_Sales: long (nullable = true)
 |-- US_Gross: long (nullable = true)
 |-- Worldwide_Gross: long (nullable = false)



In [22]:
# 1. Calculate the Profit Margin
def calculate_profit_margin(df):
    """Add a ``Profit_Margin`` column: (gross - budget) / budget.

    Args:
        df: DataFrame with ``Worldwide_Gross`` and ``Production_Budget``
            columns.

    Returns:
        A new DataFrame with the extra ``Profit_Margin`` column. The value is
        NULL for rows whose ``Production_Budget`` is 0 or NULL, because the
        margin is undefined there.
    """
    budget = df["Production_Budget"]
    gross = df["Worldwide_Gross"]
    # Guard the division explicitly: a zero budget would otherwise raise a
    # divide-by-zero error when spark.sql.ansi.enabled is true, and only
    # silently yields NULL when it is false. `when` without `otherwise`
    # produces NULL for rows that do not match the condition.
    margin = when(budget != 0, (gross - budget) / budget)
    return df.withColumn("Profit_Margin", margin)

# 2. Function to determine Box Office Hit
def determine_box_office_hit(df):
    """Add a ``Box_Office_Hit`` column labelling each row "Hit" or "Flop".

    A row is a "Hit" when ``Worldwide_Gross`` is strictly greater than twice
    ``Production_Budget``; every other row is a "Flop".

    Args:
        df: DataFrame with ``Worldwide_Gross`` and ``Production_Budget``.

    Returns:
        A new DataFrame with the extra ``Box_Office_Hit`` column.
    """
    gross = df["Worldwide_Gross"]
    budget = df["Production_Budget"]
    earned_over_double = gross > 2 * budget
    verdict = when(earned_over_double, "Hit").otherwise("Flop")
    return df.withColumn("Box_Office_Hit", verdict)

# 3. Function to convert the release date and extract the year
def convert_release_date_and_extract_year(df):
    """Parse ``Release_Date`` and derive the release year.

    Adds two columns:
      * ``Release_Date_Formatted`` - ``Release_Date`` parsed with the
        pattern "dd-MMM-yy".
      * ``Release_Year`` - the year taken from ``Release_Date_Formatted``.

    Args:
        df: DataFrame with a ``Release_Date`` column.

    Returns:
        A new DataFrame with both columns appended.
    """
    parsed_date = to_date(df["Release_Date"], "dd-MMM-yy")
    with_parsed_date = df.withColumn("Release_Date_Formatted", parsed_date)
    release_year = year(with_parsed_date["Release_Date_Formatted"])
    return with_parsed_date.withColumn("Release_Year", release_year)

# 4. Function to create the IMDB rating category
def create_imdb_rating_category(df):
    """Add an ``IMDB_Rating_Category`` column bucketing ``IMDB_Rating``.

    Buckets:
      * "High"   - rating >= 7.0
      * "Medium" - 5.0 <= rating < 7.0
      * "Low"    - everything else

    Args:
        df: DataFrame with an ``IMDB_Rating`` column.

    Returns:
        A new DataFrame with the extra ``IMDB_Rating_Category`` column.
    """
    rating = df["IMDB_Rating"]
    is_high = rating >= 7.0
    is_medium = (rating >= 5.0) & (rating < 7.0)
    category = when(is_high, "High").when(is_medium, "Medium").otherwise("Low")
    return df.withColumn("IMDB_Rating_Category", category)

# 5. Function to calculate the average IMDB rating by distributor
def calculate_avg_imdb_rating_by_distributor(df):
    """Compute the average ``IMDB_Rating`` for each ``Distributor``.

    Args:
        df: DataFrame with ``Distributor`` and ``IMDB_Rating`` columns.

    Returns:
        The grouped and aggregated DataFrame, with the aggregate column
        renamed from ``avg(IMDB_Rating)`` to ``Average_IMDB_Rating``.
    """
    per_distributor = df.groupBy("Distributor")
    averaged = per_distributor.agg({"IMDB_Rating": "avg"})
    # Give the generated aggregate column a friendlier name.
    return averaged.withColumnRenamed("avg(IMDB_Rating)", "Average_IMDB_Rating")

In [23]:
# Enrich the movies DataFrame by piping it through each column-adding step,
# in the same order as the steps are defined.
df_movies = (
    df_movies
    .transform(calculate_profit_margin)
    .transform(determine_box_office_hit)
    .transform(convert_release_date_and_extract_year)
    .transform(create_imdb_rating_category)
)

# Average IMDB rating per distributor, computed from the enriched DataFrame
df_avg_rating_by_distributor = calculate_avg_imdb_rating_by_distributor(df_movies)

# Display both results
df_movies.show()
df_avg_rating_by_distributor.show()

+--------------------+-----------------+--------------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+-------------------+--------------------+------------+--------+---------------+-------------------+--------------+----------------------+------------+--------------------+
|       Creative_Type|         Director|   Distributor|IMDB_Rating|IMDB_Votes|MPAA_Rating|Major_Genre|Production_Budget|Release_Date|Rotten_Tomatoes_Rating|Running_Time_min|             Source|               Title|US_DVD_Sales|US_Gross|Worldwide_Gross|      Profit_Margin|Box_Office_Hit|Release_Date_Formatted|Release_Year|IMDB_Rating_Category|
+--------------------+-----------------+--------------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+-------------------+--------------------+------------+--------+---------------+-------------------+--------------+----------------------+

In [24]:
# Stop the Spark session that was created at the top of the notebook.
spark.stop()