In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    when, countDistinct, count, avg,
    split, explode, trim, lit, length,
    current_date, row_number, regexp_replace,
    to_date, coalesce, col,
    year, month, dayofmonth, array_contains, min, max, year
)
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.window import Window
import numpy as np

In [None]:
# List the files stored under the DBFS FileStore tables directory
display(dbutils.fs.ls("dbfs:/FileStore/tables/"))

In [None]:
# Source file and its format
file_location = "/FileStore/tables/Imdb_Movie_Dataset-4.csv"
file_type = "csv"

# Reader settings; they only apply to CSV input and are ignored for other formats
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# `quote` and `escape` are both the double quote so that commas (and doubled
# quotes) inside quoted fields are not treated as separators.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
    "quote": '"',
    "escape": '"',
}

# NOTE(review): the numeric columns come back as strings (see the schema table
# further down), which may mean that quoted fields containing line breaks are
# being split into several records; consider the `multiLine` option - TODO confirm.
df = spark.read.format(file_type).options(**csv_options).load(file_location)

display(df)

In [None]:
# to check the total number of rows
# (the value returned by count() is rendered as the cell output)
df.count()

%md
# Data Cleaning

%md
**Can we predict the box office revenue of a movie before it is released?**

This project explores that question by building a machine learning regression model using Apache Spark MLlib to predict the revenue of a movie based on features such as budget, genre, popularity, vote average, and more.

In [None]:
# To check the type of each feature
# (these are the types inferred by the CSV reader, since inferSchema is enabled at load time)
df.printSchema()

%md
Here we can observe that some features have entries with an incorrect or inappropriate data type for what they represent.

%md
## Dataset Schema with Correct Types and Columns Description

| Column                  | Current Type | Correct Type      | Column Description                                                                          |
|-------------------------|--------------|-------------------|---------------------------------------------------------------------------------------------|
| id                      | integer      | integer           |A unique identifier for each movie                                                           |
| title                   | string       | string            |The name of the movie                                                                        |
| vote_average            | string       | **double** (float)|The average rating the movie has received from users (on a scale, typically from 0 to 10)    |
| vote_count              | string       | **integer**       |The total number of votes or ratings submitted for the movie                                 |
| status                  | string       | string            |The current state of the movie (e.g., "Released," "Post-Production")                         |
| release_date            | string       | **date**          |The date when the movie was officially released                                              |
| revenue                 | string       | **long** (integer)|The total earnings the movie made (usually in USD)                                           |
| runtime                 | string       | **integer**       |The duration of the movie in minutes                                                         |
| adult                   | string       | **boolean**       |Indicates whether the movie is classified as adult content (e.g., "True" or "False")          |
| budget                  | string       | **long** (integer)|The total cost of producing the movie (usually in USD)                                       |
| imdb_id                 | string       | string            |The unique identifier for the movie on IMDb (Internet Movie Database)                        |
| original_language       | string       | string            |The language in which the movie was originally produced (e.g., "en" for English)             |
| original_title          | string       | string            |The original title of the movie in its native language                                       |
| overview                | string       | string            |A brief summary or description of the movie's plot                                           |
| popularity              | string       | **double** (float)|A metric indicating how popular the movie is (typically based on views, searches, or ratings)|
| tagline                 | string       | string            |A short phrase or slogan associated with the movie                                           |
| genres                  | string       | string   |The categories or genres the movie belongs to (e.g., Action, Comedy, Drama)                  |
| production_companies    | string       | string   |The names of the companies involved in producing the movie                                   |
| production_countries    | string       | string   |The countries where the movie was produced                                                   |
| spoken_languages        | string       | string   |The languages spoken in the movie                                                            |
| keywords                | string       | string   |Important terms or phrases associated with the movie, often used for categorization or search|

%md
## Missing Values

In [None]:
# Function to calculate missing values by column
def missing_values_table_spark(df):
    """
    Calculates the total number and percentage of missing (null) values
    for each column in a PySpark DataFrame.

    The row total and every per-column null count are obtained from a single
    Spark aggregation, so the data is scanned only once.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame to inspect.

    Returns
    -------
    pandas.DataFrame
        Columns:
        - 'Column': column name
        - 'Missing Values': count of missing values
        - '% of Total Values': percentage of missing values

        Only columns with missing values are included, sorted in descending
        order and rounded to one decimal. The table is empty when `df` has
        no rows.
    """
    column_names = df.columns

    # Positional aliases keep the aggregate names unique, whatever the
    # original column names are.
    null_count_exprs = [
        F.sum(F.col(c).isNull().cast("int")).alias(f"_nulls_{i}")
        for i, c in enumerate(column_names)
    ]
    stats = df.agg(F.count(F.lit(1)).alias("_rows"), *null_count_exprs).first()

    total_rows = stats[0]
    # F.sum yields None (not 0) when the DataFrame has no rows
    null_counts = [int(n or 0) for n in stats[1:]]
    # Guard against a division by zero on an empty DataFrame
    percentages = [(n / total_rows) * 100 if total_rows else 0.0 for n in null_counts]

    mis_val_table = pd.DataFrame(
        {'Missing Values': null_counts, '% of Total Values': percentages},
        index=column_names,
    )

    # Keep only columns with >0% missing, sort descending, round
    mis_val_table = (
        mis_val_table[mis_val_table['% of Total Values'] > 0]
        .sort_values('% of Total Values', ascending=False)
        .round(1)
    )

    # Turn the index (original column names) into a regular 'Column' column
    mis_val_table = mis_val_table.rename_axis('Column').reset_index()

    # Print summary
    print(f"Your selected dataframe has {len(column_names)} columns.\n"
          f"There are {mis_val_table.shape[0]} columns that have missing values.")

    return mis_val_table

# Usage
# Build the missing-value summary of the raw DataFrame and render it as a table
missing_values = missing_values_table_spark(df)
display(missing_values)

In [None]:
def plot_missing_values(mis_val_table, title):
    """
    Draw a bar chart of the missing-value percentage of each feature.

    `mis_val_table` must provide a 'Column' column (feature names, used as the
    x-axis categories) and a '% of Total Values' column (bar heights).
    `title` becomes the chart title. The figure is shown; nothing is returned.
    """
    plt.figure(figsize=(12, 8))

    # One bar per feature, drawn on the figure created above
    chart = sns.barplot(x='Column', y='% of Total Values', data=mis_val_table)

    # Vertical tick labels keep long feature names legible
    plt.xticks(rotation=90)

    chart.set_title(title)
    chart.set_xlabel('Feature')
    chart.set_ylabel('% of Total Values')

    plt.tight_layout()
    plt.show()

# Call the function to plot the missing values with feature names shown
# (`missing_values` is the summary table built in the previous cell)
plot_missing_values(missing_values, title='Percentage of Missing Values by Feature')

%md
While exploring the dataset, we quickly noticed that some columns have a large amount of missing data. For example, about 85% of the values in the `tagline` column and around 72% in `keywords` are missing. These high percentages suggest that these fields may not be very useful for our analysis. Considering what each variable represents and how relevant it is to our goals, we can start identifying which features are likely to bring little value and could be safely removed. This helps us simplify the dataset and minimize noise, making the overall analysis more efficient and focused.

%md
## Zero Values

In [None]:
# Updated function to include a 'Column' column instead of using the index
def zero_values_table_spark(df):
    """
    Calculates the total number and percentage of zero values
    for each column in a PySpark DataFrame.

    String columns are cast to double before being compared with 0. Comparing
    a string column directly with the integer literal 0 makes Spark cast the
    text to an integer, which (with ANSI mode off) truncates fractional text
    such as "0.6" to 0 and therefore reports it as a zero.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame to inspect.

    Returns
    -------
    pandas.DataFrame
        Columns:
        - 'Column': column name
        - 'Zero Values': count of zero values
        - '% of Total Values': percentage of zero values

        Only columns with zero values are included, sorted in descending
        order and rounded to one decimal. The table is empty when `df` has
        no rows.
    """
    def _is_zero(name, dtype):
        """Condition that is true where column `name` holds a numeric zero."""
        value = F.col(name)
        if dtype == "string":
            # NOTE(review): with spark.sql.ansi.enabled=true a non-numeric
            # string makes this cast raise; switch to try_cast in that setup.
            value = value.cast("double")
        return value == 0

    typed_columns = df.dtypes  # list of (column name, Spark type name)
    column_names = [name for name, _ in typed_columns]

    # Row total and every per-column zero count in a single aggregation
    zero_count_exprs = [
        F.sum(F.when(_is_zero(name, dtype), 1).otherwise(0)).alias(f"_zeros_{i}")
        for i, (name, dtype) in enumerate(typed_columns)
    ]
    stats = df.agg(F.count(F.lit(1)).alias("_rows"), *zero_count_exprs).first()

    row_count = stats[0]
    # F.sum yields None (not 0) when the DataFrame has no rows
    zero_counts = [int(n or 0) for n in stats[1:]]
    # Guard against a division by zero on an empty DataFrame
    percentages = [(n / row_count) * 100 if row_count else 0.0 for n in zero_counts]

    zero_table = pd.DataFrame(
        {'Zero Values': zero_counts, '% of Total Values': percentages},
        index=column_names,
    )

    # Keep only columns with zeros, sort and round
    zero_table = (
        zero_table[zero_table['Zero Values'] > 0]
        .sort_values('% of Total Values', ascending=False)
        .round(1)
    )

    # Turn the index (original column names) into a regular 'Column' column
    zero_table = zero_table.rename_axis('Column').reset_index()

    # Print summary
    print(f"Your selected dataframe has {len(column_names)} columns.\n"
          f"There are {zero_table.shape[0]} columns that have zero values.")

    return zero_table

# Use the function
# Build the zero-value summary of the raw DataFrame and render it as a table
zero_values = zero_values_table_spark(df)
display(zero_values)

In [None]:
def plot_zero_values(zero_table, title):
    """
    Draw a bar chart of the zero-value percentage of each feature.

    `zero_table` must provide a 'Column' column (feature names, used as the
    x-axis categories) and a '% of Total Values' column (bar heights).
    `title` becomes the chart title. The figure is shown; nothing is returned.
    """
    plt.figure(figsize=(10, 8))

    # One bar per feature, drawn on the figure created above
    chart = sns.barplot(x='Column', y='% of Total Values', data=zero_table)

    # Vertical tick labels keep long feature names legible
    plt.xticks(rotation=90)

    chart.set_title(title)
    chart.set_xlabel('Feature')
    chart.set_ylabel('% of Total Values')

    plt.tight_layout()
    plt.show()

# Plot the zero-value percentages from the summary table built in the previous cell
plot_zero_values(zero_values, title='Percentage of Zero Values by Feature')

%md
From this analysis of zero values, we can clearly see that two of the most critical columns for our originally defined problem, `revenue` and `budget`, contain an overwhelming proportion of zero entries. In fact, `revenue`, which we had considered as our potential target variable, has 98% zero values, while `budget` has nearly 95%. These figures are highly problematic, as they can significantly compromise the reliability of any predictive modeling. Therefore, one of our key next steps will be to carefully handle these zero values, either by imputing them or by removing these observations from our dataset.

%md
## Duplicated Records (ID and IMDB_ID features)

%md
Our dataset contains two main identifier columns: `id` and `imdb_id`. Because of this, it's possible that duplicate values exist in either of them. In the following steps, we perform a detailed analysis to check for duplicate observations in the dataset, and, if any are found, we inspect their nature by looking at specific examples.

%md
for `id` column

In [None]:
# Row total versus number of distinct ids
total_rows = df.count()
unique_id = df.agg(countDistinct("id")).first()[0]

# Every row beyond the distinct ids is counted as a duplicate
duplicated_id = total_rows - unique_id

# Print results
print(f"Total duplicated IDs: {duplicated_id}")

# ids that occur more than once
id_frequencies = df.groupBy("id").count()
dup_id_vals = id_frequencies.where(col("count") > 1).select("id")

# Full rows for those ids, ordered so that copies of the same id sit together
dup_id_rows = df.join(dup_id_vals, on="id", how="inner").orderBy("id")

print("Examples of duplicated 'id' rows:")
display(dup_id_rows.limit(10))

%md
In this part of the analysis, we found 794 records with duplicated values in the `id` column. By inspecting these duplicates alongside their corresponding columns, we noticed that many of the repeated entries contain a large number of null values in the remaining fields. This suggests that these duplicates are most likely unnecessary entries, possibly introduced by mistake during data collection or integration. Given the high number of missing values in these cases, a sensible next step will be to remove them, keeping only the most complete version of each duplicated record to ensure data quality and consistency.

%md
for `imdb_id` column

In [None]:
# Only rows that actually carry an imdb_id take part in the duplicate check
with_imdb_id = df.where(col("imdb_id").isNotNull())

non_null_rows = with_imdb_id.count()
unique_non_null = with_imdb_id.agg(countDistinct("imdb_id")).first()[0]
duplicated_imdb_id = non_null_rows - unique_non_null

# Print the results
print(f"Total non-null imdb_id rows: {non_null_rows}")
print(f"Total duplicated IMDB IDs (excluding nulls): {duplicated_imdb_id}")

# imdb_ids that occur more than once
imdb_frequencies = with_imdb_id.groupBy("imdb_id").count()
dup_imdb_vals = imdb_frequencies.where(col("count") > 1).select("imdb_id")

# Full rows for those imdb_ids, ordered so that copies sit together
dup_imdb_rows = df.join(dup_imdb_vals, on="imdb_id", how="inner").orderBy("imdb_id")

print("Examples of duplicated 'imdb_id' rows:")
display(dup_imdb_rows.limit(10))


%md
As previously noted, the `imdb_id` column contains a high percentage of missing values. To accurately check for duplicates in this feature, we focused only on the non-null entries. Within this subset, we identified 1,160 duplicate records. Just like with the duplicated `id` values, our next step will be to remove these redundant entries by comparing the amount of missing data across the duplicated rows, keeping the most complete version in each case. This will help us clean the dataset and reduce unnecessary noise caused by incomplete or inconsistent records.

%md
## Summary

### Missing Values
These columns have a significant number of null entries:
- `tagline` (85.4% missing)  
- `keywords` (72.1% missing)  
- `production_companies` (54.2% missing)  
- `imdb_id` (46.5% missing)  
- `production_countries` (44.2% missing)  
- `spoken_languages` (~42% missing)  
- `genres` (39.6% missing)  
- `overview` (20.6% missing)  
- `release_date` (17.3% missing)

### Zero Values
The following columns contain a substantial fraction of zeros:
- `revenue` (98% zeros)  
- `budget` (94.6% zeros)  
- `adult` (90.7% zeros)  
- `vote_average` (66.6% zeros)  
- `vote_count` (66.5% zeros)  
- `runtime` (28% zeros)  
- `popularity` (13.2% zeros)

### Duplicated Records
The dataset contains 794 duplicate entries with the same `id`, and also several duplicates based on `imdb_id`, many of which have a high number of null values.

%md
We will work with copies of our DataFrame up to each checkpoint to ensure that no unintended changes are made to the original dataset. This also helps avoid rerunning the entire pipeline multiple times whenever we want to test the behavior of a specific code block.

In [None]:
# Safe copy of the original DataFrame
# (select("*") yields a new DataFrame object; later cells reassign df_copy1 and leave df untouched)
df_copy1 = df.select("*")

%md
First of all, we need to fix the data types of some features, otherwise they won't be useful for our project modeling.
Some features need additional analysis; we will correct their data types afterwards.

In [None]:
# Target Spark type for every column whose type needs correcting at this stage
target_types = {
    "vote_average": "double",
    "vote_count": "integer",
    "revenue": "long",
    "runtime": "integer",
    "adult": "boolean",
    "budget": "long",
    "popularity": "double",
}

# Replace each column by its cast version, one column at a time
for column_name, spark_type in target_types.items():
    df_copy1 = df_copy1.withColumn(column_name, col(column_name).cast(spark_type))

# Verify that the types have been updated
df_copy1.printSchema()

%md
## Fixing Duplicate Records (ID and IMDB_ID features)

%md
As previously mentioned in the section where we identified the presence of duplicates, we will now deal with them. In order to properly handle the duplicates found in our two identifier columns, we first need to ensure that missing values are correctly represented. Specifically, we will count and replace, if present, the string *'None'* with the actual *None* (*null*) value in the `imdb_id` column.

In [None]:
# Rows whose imdb_id holds the literal text "None"
is_none_text = col("imdb_id") == "None"

# Count them before they are rewritten
none_count = df_copy1.where(is_none_text).count()

# Swap the placeholder text for a real null; every other value is kept as is
cleaned_imdb_id = when(is_none_text, lit(None)).otherwise(col("imdb_id"))
df_copy1 = df_copy1.withColumn("imdb_id", cleaned_imdb_id)

# Print how many values were set to null
print(f"Number of 'None' values replaced with null in 'imdb_id': {none_count}")

%md
In the analysis above we identified the presence of duplicates; we will now deal with them.

In [None]:
def drop_duplicates_keep_least_null(df, id_col="id"):
    """
    Deduplicate `df` on `id_col`, keeping for every key the row with the fewest nulls.

    Only rows whose `id_col` is non-null take part in the deduplication; rows
    with a null `id_col` are appended back unchanged. The number of rows
    removed among the non-null keys is printed.

    Returns the deduplicated rows combined (by column name) with the null-key rows.
    """
    key = col(id_col)
    keyed_rows = df.where(key.isNotNull())
    unkeyed_rows = df.where(key.isNull())

    # Size of the keyed subset before any row is removed
    rows_before = keyed_rows.count()

    # Per-row tally of null cells across every column
    null_tally = 0
    for name in keyed_rows.columns:
        null_tally = null_tally + when(col(name).isNull(), 1).otherwise(0)

    # Number the rows of each key from fewest to most nulls.
    # NOTE(review): rows tied on the null count are ordered arbitrarily, so
    # which of several equally complete copies survives is not fixed.
    fewest_nulls_first = Window.partitionBy(id_col).orderBy(col("_null_count").asc())
    survivors = (
        keyed_rows
        .withColumn("_null_count", null_tally)
        .withColumn("_rn", row_number().over(fewest_nulls_first))
        .where(col("_rn") == 1)
        .drop("_null_count", "_rn")
    )

    # Report how many keyed rows were removed
    rows_after = survivors.count()
    dropped = rows_before - rows_after
    print(f"Dropped {dropped} duplicate rows among non-null '{id_col}'.")

    # Put the untouched null-key rows back
    return survivors.unionByName(unkeyed_rows)

# Use the function to deduplicate by "id", preserving rows with null id
# (df_copy1 is reassigned, so re-running this cell works on the already deduplicated frame)
df_copy1 = drop_duplicates_keep_least_null(df_copy1, id_col="id")

display(df_copy1.limit(10))

In [None]:
# Now drop duplicates by imdb_id
# (rows with a null imdb_id are left untouched by the function)
df_copy1 = drop_duplicates_keep_least_null(df_copy1, id_col="imdb_id")

# Inspect a few rows to confirm
display(df_copy1.limit(10))

In [None]:
# Sanity check - recalculate duplicates on the deduplicated DataFrame
total_rows = df_copy1.count()

# id duplicates (id never null)
unique_id = df_copy1.agg(countDistinct("id")).first()[0]
duplicated_id = total_rows - unique_id

# imdb_id duplicates (exclude null/"None")
has_real_imdb_id = col("imdb_id").isNotNull() & (col("imdb_id") != "None")
valid_imdb = df_copy1.where(has_real_imdb_id)
total_valid = valid_imdb.count()
unique_imdb = valid_imdb.agg(countDistinct("imdb_id")).first()[0]
duplicated_imdb_id = total_valid - unique_imdb

# Both figures are expected to be 0 after the deduplication above
print(f"Total duplicated IDs: {duplicated_id}")
print(f"Total duplicated IMDB IDs (excluding nulls): {duplicated_imdb_id}")


%md
Now, `id` and `imdb_id` columns are deduplicated. Their datatype is correct, so no need for further changes.

%md
## Numerical Feature Analysis

%md
### First cleaning (REVENUE and BUDGET)

%md
As we mentioned earlier, one of the main issues with our dataset is that the `revenue` and `budget` features contain a very high percentage of zero values. Because of this, we need to address the problem, otherwise, we would be working with a massive dataset that is hard to run efficiently in Databricks and may lead to inconclusive results.

We need to filter or reduce the dataset. While this is technically a big data problem (we have many entries), in practice, we'll only work with a meaningful subset. This is not only for efficiency and performance reasons but also because we want to use `revenue` as our target variable and if 98% of our dataset has zero revenue, training a model on all of it would introduce a strong bias and hurt the model's ability to learn meaningful patterns. We would essentially be training on noise.

In [None]:
# Summary of null, empty-string, "None"/"none" and zero counts for revenue and budget.
# All counters are computed in one aggregation instead of one Spark job per counter.
audit_cols = ["revenue", "budget"]

audit_exprs = [F.count(lit(1)).alias("total")]
for cname in audit_cols:
    # Placeholder strings are compared against the textual form of the value:
    # the columns were cast to long earlier, and comparing a numeric column
    # with "" / " " would force Spark to cast those literals to numbers
    # (an error under ANSI mode). After the numeric cast these two counters
    # are expected to be 0, because unparseable text has already become null.
    as_text = col(cname).cast("string")
    audit_exprs.extend([
        F.count(when(col(cname).isNull(), 1)).alias(f"{cname}_nulls"),
        F.count(when((as_text == "") | (as_text == " "), 1)).alias(f"{cname}_empties"),
        F.count(when(as_text.isin("None", "none"), 1)).alias(f"{cname}_nones"),
        F.count(when(col(cname) == 0, 1)).alias(f"{cname}_zeros"),
    ])

audit = df_copy1.agg(*audit_exprs).first()
total = audit["total"]

for cname in audit_cols:
    nulls   = audit[f"{cname}_nulls"]
    empties = audit[f"{cname}_empties"]
    nones   = audit[f"{cname}_nones"]
    zeros   = audit[f"{cname}_zeros"]

    print(
        f"{cname}: total={total}, "
        f"nulls={nulls}, "
        f"empty strings={empties}, "
        f"'None'/'none' strings={nones}, "
        f"zeros={zeros}"
    )

%md
Now, we will drop the rows where `revenue` == 0.

In [None]:
# Size of the dataset before the filter
initial_count = df_copy1.count()

# Keep only movies with a known, strictly positive revenue
has_positive_revenue = col("revenue").isNotNull() & (col("revenue") > 0)
df_copy1 = df_copy1.where(has_positive_revenue)

# Size after the filter and number of rows removed by it
final_count = df_copy1.count()
dropped_count = initial_count - final_count

# Report
print(f"Dropped {dropped_count} movies with zero or null revenue.")
print(f"Remaining movies for training: {final_count}")

%md
We drop all movies with `revenue` == 0 because in our dataset those zeroes overwhelmingly represent missing or unreported box-office figures, not true earnings, and including them would force the model to learn spurious patterns tied to data availability rather than actual revenue drivers. By training only on films with known, positive revenue, we ensure the regression learns genuine relationships (e.g. between budget, popularity, genre, etc.) and avoids biasing predictions toward “zero” whenever it encounters features associated with unreleased or untracked titles.

%md
Plotting the original distributions

In [None]:
# Movies for which both monetary figures are strictly positive
positive_money = (col("budget") > 0) & (col("revenue") > 0)
df_money = df_copy1.where(positive_money)

# Bring only the two plotted columns to the driver as a pandas DataFrame
df_money_pd = df_money.select("budget", "revenue").toPandas()

# Side-by-side histograms of the untransformed values
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
sns.set(style="whitegrid")

# (column, bar colour, panel title, x-axis label) for each panel
panels = [
    ("budget", "orange", "Budget Distribution", "Budget ($)"),
    ("revenue", "blue", "Revenue Distribution", "Revenue ($)"),
]
for ax, (field, shade, heading, x_label) in zip(axes, panels):
    sns.histplot(df_money_pd[field], bins=100, ax=ax, color=shade)
    ax.set_title(heading)
    ax.set_xlabel(x_label)
    ax.set_ylabel('Count')

plt.tight_layout()
plt.show()

%md
Plotting with the log10 transformation

In [None]:
# Movies for which both monetary figures are strictly positive
# (log10 is only defined for positive values)
positive_money = (col("budget") > 0) & (col("revenue") > 0)
df_money = df_copy1.where(positive_money)

# Bring the two columns to pandas and add their log10 counterparts
df_money_pd = df_money.select("budget", "revenue").toPandas()
df_money_pd = df_money_pd.assign(
    log_budget=np.log10(df_money_pd['budget']),
    log_revenue=np.log10(df_money_pd['revenue']),
)

# Side-by-side histograms of the log-transformed values
sns.set(style="whitegrid")
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# (column, bar colour, panel title, x-axis label) for each panel
log_panels = [
    ("log_budget", "orange", "Log10 Budget Distribution", "Log10(Budget)"),
    ("log_revenue", "blue", "Log10 Revenue Distribution", "Log10(Revenue)"),
]
for ax, (field, shade, heading, x_label) in zip(axes, log_panels):
    sns.histplot(df_money_pd[field], bins=50, ax=ax, color=shade)
    ax.set_title(heading)
    ax.set_xlabel(x_label)

plt.tight_layout()
plt.show()

%md
Budget and Revenue (log₁₀) Distributions

Log₁₀(Budget)
The bulk of film budgets cluster around log₁₀(budget) ≈ 6.5–7.5, i.e. budgets of roughly \$3 million to \$30 million. Most budgets fall between log₁₀ ≈ 5 (≈ \$100 k) and log₁₀ ≈ 8 (≈ \$100 million). There is a long left‐hand tail extending down toward zero (very low‐budget or no‐budget films), and a shorter right tail tapering off beyond log₁₀ ≈ 8. Small peaks around log₁₀ ≈ 2–3 (\$100–\$1 000 budgets) and log₁₀ ≈ 4–5 (\$10 000–\$100 000) likely correspond to micro‐budget productions.

Log₁₀(Revenue)
Revenues tend to concentrate around log₁₀(revenue) ≈ 7–8, i.e. \$10 million to \$100 million. Revenues span from very low figures (log₁₀ ≈ 0–2, i.e. \$1–\$100) up to blockbuster grosses (log₁₀ ≈ 9+, over \$1 billion), though the extreme high end thins out sharply. The distribution is left‐skewed on the log scale, with a heavy left tail of low‐earning films and a tapering right tail of big blockbusters. A visible bump near log₁₀ ≈ 4–5 (\$10 000–\$100 000) suggests a cluster of modest indie releases, and occasional spikes around log₁₀ ≈ 2–3 (\$100–\$1 000) reflect very small‐scale runs or data artifacts.

Overall, both distributions are left-skewed on the log scale, with most films clustering in the mid‐range budgets and revenues, but with long tails toward both the ultra‐low and the ultra‐high ends.

In [None]:
# Movies that report revenue although their budget is recorded as 0
zero_budget_with_revenue = (col("budget") == 0) & (col("revenue") != 0)
zero_budget_nonzero_rev = df_copy1.where(zero_budget_with_revenue)

count_zero_budget_nonzero_rev = zero_budget_nonzero_rev.count()
print(f"Movies with budget = 0 and revenue != 0: {count_zero_budget_nonzero_rev}")

# A few examples for manual inspection
display(zero_budget_nonzero_rev.limit(5))

%md
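Here we can observe that some movies had zero budget but still generated revenue. At first, this may seem strange, but it makes sense when we consider, for example, that many animated films or productions have very low or zero production costs and can still achieve substantial profits. Note, however, that just as with `revenue`, a `budget` of 0 may simply mean that the figure was not reported, so these zeros should be treated as potentially missing values rather than as true production costs.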

In [None]:
# Movies with negative budget
negative_budget = df_copy1.where(col("budget") < 0)
neg_budget_count = negative_budget.count()
print(f"Number of movies with negative budget: {neg_budget_count}")

# Movies with negative revenue
# NOTE(review): df_copy1 was restricted to revenue > 0 in the filtering cell
# above, so this check is expected to find nothing at this point.
negative_revenue = df_copy1.where(col("revenue") < 0)
neg_revenue_count = negative_revenue.count()
print(f"Number of movies with negative revenue: {neg_revenue_count}")
display(negative_revenue)


%md
### VOTE_AVERAGE and VOTE_COUNT

In [None]:
# Summary of null, empty-string, and "None"/"none" counts for vote_average and vote_count.
# All counters are computed in one aggregation instead of one Spark job per counter.
audit_cols = ["vote_average", "vote_count"]

audit_exprs = [F.count(lit(1)).alias("total")]
for cname in audit_cols:
    # Placeholder strings are compared against the textual form of the value:
    # the columns were cast to double/integer earlier, and comparing a numeric
    # column with "" / " " would force Spark to cast those literals to numbers
    # (an error under ANSI mode). After the numeric cast these counters are
    # expected to be 0, because unparseable text has already become null.
    as_text = col(cname).cast("string")
    audit_exprs.extend([
        F.count(when(col(cname).isNull(), 1)).alias(f"{cname}_nulls"),
        F.count(when((as_text == "") | (as_text == " "), 1)).alias(f"{cname}_empties"),
        F.count(when(as_text.isin("None", "none"), 1)).alias(f"{cname}_nones"),
    ])

audit = df_copy1.agg(*audit_exprs).first()
total = audit["total"]

for cname in audit_cols:
    nulls   = audit[f"{cname}_nulls"]
    empties = audit[f"{cname}_empties"]
    nones   = audit[f"{cname}_nones"]

    print(
        f"{cname}: total={total}, "
        f"nulls={nulls}, "
        f"empty strings={empties}, "
        f"'None'/'none' strings={nones}"
    )

%md
IMDb’s user rating system allows you to assign any whole-number score from 1 (the lowest) up to 10 (the highest) for any movie.

In [None]:
rating = col("vote_average")

# Ratings under the lowest score a user can give (1)
below_allowed = df_copy1.where(rating < 1)
count_below = below_allowed.count()
print(f"Number of movies with vote_average below 1: {count_below}")

# Ratings over the highest score a user can give (10)
above_allowed = df_copy1.where(rating > 10)
count_above = above_allowed.count()
print(f"Number of movies with vote_average above 10: {count_above}")

%md
We identified nearly 4,000 records where the `vote_average` was below 1, even though the `vote_count` was not null. Since IMDb does not allow movie ratings lower than 1, we considered these values invalid. To handle this issue, we will first replace every instance where `vote_average` == 0 with null. Later, after the data split, we will decide how to handle this issue further.

In [None]:
# Count instances where vote_count == 0
# (i.e. movies that received no votes at all)
count_zero = df_copy1.filter(col("vote_count") == 0).count()
print(f"Number of rows with vote_count == 0: {count_zero}")

In [None]:
# Movies where vote_count == 0 but vote_average != 0
cond = df_copy1.filter((col("vote_count") == 0) & (col("vote_average") != 0))

# Stored as `mismatch_count` rather than `count`, so that the `count` function
# imported from pyspark.sql.functions at the top of the notebook is not overwritten.
mismatch_count = cond.count()
print(f"Number of movies with vote_count == 0 and vote_average != 0: {mismatch_count}")

# Display all columns for a few examples
display(cond.limit(5))

%md
Here we can see that there are 4 movies where the `vote_count` is zero, yet the `vote_average` is different from zero. This situation is clearly not possible, since the average should depend on the number of votes. Therefore, we will cast those `vote_average` to null.

In [None]:
# A vote_average cannot be trusted when either condition holds:
#   - vote_count == 0 : an average cannot exist without votes
#   - vote_average < 1: the rating scale starts at 1
# Both cases are nulled, as described in the surrounding text; requiring both
# at once would leave averages reported with zero votes untouched.
invalid_vote_average = (col("vote_count") == 0) | (col("vote_average") < 1)

# 1. count how many rows either have `vote_count`==0 or `vote_average` < 1
#    (must run before the column is rewritten below)
flagged_count = df_copy1.filter(invalid_vote_average).count()

# 2. apply the transformation: set vote_average to null for those rows
df_copy1 = df_copy1.withColumn(
    "vote_average",
    when(
        invalid_vote_average,
        lit(None).cast("double")     # typed null keeps the column a double
    ).otherwise(col("vote_average"))
)

# 3. count how many rows are now null
#    (this total also includes rows whose vote_average was null beforehand)
null_count = df_copy1.filter(col("vote_average").isNull()).count()

# 4. print results
print(f"Rows matching vote_count==0 OR vote_average < 1: {flagged_count}")
print(f"Rows with vote_average set to null:       {null_count}")

In [None]:
# Only the two plotted columns are brought to pandas, and rows with a null in
# either of them are dropped first, to keep the transfer small
df_plot = df_copy1.select("vote_average", "vote_count").dropna().toPandas()

sns.set(style="whitegrid")

# Two panels side by side: ratings on the left, number of votes on the right
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
average_ax, count_ax = axes

# Rating histogram with a density estimate on top
sns.histplot(df_plot['vote_average'], bins=20, kde=True, ax=average_ax, color='skyblue')
average_ax.set_title('Distribution of Vote Average')
average_ax.set_xlabel('Vote Average')
average_ax.set_ylabel('Count')

# Vote-count histogram; the y-axis is logarithmic because the data is heavily skewed
sns.histplot(df_plot['vote_count'], bins=50, kde=False, ax=count_ax, color='salmon')
count_ax.set_title('Distribution of Vote Count')
count_ax.set_xlabel('Vote Count')
count_ax.set_ylabel('Count')
count_ax.set_yscale('log')

plt.tight_layout()
plt.show()

%md
The distribution for `vote_average` looks almost normal, with a bump around 6, which aligns with typical average ratings on platforms like IMDb or TMDB. Additionally, almost 1000 movies have a perfect score of 10.

When it comes to `vote_count`, there's a long tail of movies with thousands of votes, likely the popular ones. However, a big portion of movies did not receive any votes.

%md
### RELEASE_DATE

%md
First, we need to handle the data type of `release_date`. Upon initial feature investigation, we realized that some dates might be in US format and some in EU format. We will need to handle this appropriately.

In [None]:
# Null / blank / "None" placeholder counts for release_date (still a string column here)
release_text = col("release_date")

total_rows  = df_copy1.count()
null_count  = df_copy1.where(release_text.isNull()).count()
empty_count = df_copy1.where(release_text.isin("", " ")).count()
none_count  = df_copy1.where(release_text.isin("None", "none")).count()

print(f"Total rows: {total_rows}")
print(f"Null release_date values          : {null_count}")
print(f"Empty-string release_date values  : {empty_count}")
print(f"'None'/'none' release_date values : {none_count}")

In [None]:
# Enable legacy date parser to avoid SparkUpgradeException on ambiguous dates
# (this is a session-wide setting, not limited to this cell)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# One candidate parse per supported layout; each is null where the text does not match
raw_date = col("release_date")
date_eu  = to_date(raw_date, "dd/MM/yyyy")
date_us  = to_date(raw_date, "MM/dd/yyyy")
date_iso = to_date(raw_date, "yyyy-MM-dd")

# Buckets are tested in EU -> US -> ISO order; a value lands in the first
# layout that parses it, and in the last bucket when none does
format_buckets = [
    ("=== EU format samples ===", date_eu.isNotNull()),
    ("=== US format samples ===", date_eu.isNull() & date_us.isNotNull()),
    ("=== ISO format samples ===", date_eu.isNull() & date_us.isNull() & date_iso.isNotNull()),
    ("=== Other/unrecognized samples ===", date_eu.isNull() & date_us.isNull() & date_iso.isNull()),
]

# preview 5 distinct raw values of each bucket
for banner, in_bucket in format_buckets:
    print(banner)
    df_copy1.where(in_bucket).select("release_date").distinct().show(5, truncate=False)

%md
It turns out that all non-missing dates are in the EU, US, or ISO format.

In [None]:
# Combine the candidate parses into one DateType column.
# coalesce keeps the first non-null result, so the priority is EU, then US, then ISO.
parsed_release_date = coalesce(date_eu, date_us, date_iso)
df2 = df_copy1.withColumn("parsed_date", parsed_release_date)

# Number of rows for which at least one format produced a date
success_count = df2.where(col("parsed_date").isNotNull()).count()
print(f"Successfully cast {success_count} dates")


In [None]:
# Swap the raw string column for the parsed one: drop the original first,
# then give the parsed column the original name.
without_raw_date = df2.drop("release_date")
df2 = without_raw_date.withColumnRenamed("parsed_date", "release_date")

# Continue the cleaning pipeline on df_copy1
df_copy1 = df2

# Preview the first rows with the parsed release_date column
display(df_copy1.limit(5))

In [None]:
# Print the schema of release_date only, to verify the column's data type after the parsing step
df_copy1.select("release_date").printSchema()

%md
To support future analyses, we will engineer three new features based on the release date: day, month, and year.

In [None]:
# release_date was converted to a date column earlier (first matching format among
# EU, US and ISO), so the calendar parts can be extracted directly.
release_date_parts = {
    "release_day": dayofmonth,
    "release_month": month,
    "release_year": year,
}
for part_name, extract_part in release_date_parts.items():
    df_copy1 = df_copy1.withColumn(part_name, extract_part(col("release_date")))

# Check that the three new columns are present
df_copy1.printSchema()
display(df_copy1.limit(5))


%md
We use a boxplot to visualize the distribution of our data and detect possible outliers.

In [None]:
# Bring only release_year to the driver for plotting
df_pd = df_copy1.select("release_year").toPandas()

# Boxplot of release years to expose outliers
fig, ax = plt.subplots(figsize=(6, 4))
sns.boxplot(x=df_pd["release_year"], ax=ax)
ax.set_title("Boxplot of Release Year")
ax.set_xlabel("Release Year")
fig.tight_layout()
plt.show()

%md
Based on the boxplot, multiple outliers are identified, with three showing a particularly strong deviation from the rest. As a result, we will set our lower threshold at January 1st, 1910.

In [None]:
release_date_col = col("release_date")

# Movies whose release date lies after today
future_movies = df_copy1.where(release_date_col > current_date())

# Movies dated earlier than 1910-01-01
old_movies = df_copy1.where(release_date_col < lit("1910-01-01"))

# Report how many rows fall into each group
count_future = future_movies.count()
print(f"Number of movies released after today: {count_future}")
count_old = old_movies.count()
print(f"Number of movies released before 1910: {count_old}")

In [None]:
# Display a few examples 
display(old_movies.limit(20))

In [None]:
# Display 30 movies with selected columns
display(future_movies.limit(30))

%md
Some movies have a "Released" status but a `release_date` in the future. To avoid feeding our model inconsistent values, we will set the release date of these movies to null.

In [None]:
# A movie marked "Released" cannot have a release date after today; treat such dates as missing.
is_inconsistent_release = (col("status") == "Released") & (col("release_date") > current_date())

df2 = df_copy1.withColumn(
    "release_date",
    when(is_inconsistent_release, lit(None).cast("date")).otherwise(col("release_date"))
)

# release_day / release_month / release_year were derived from release_date before this
# correction, so they would still carry the future values. Re-derive them from the
# corrected column so they become null wherever release_date is now null.
df2 = (
    df2.withColumn("release_day", dayofmonth(col("release_date")))
       .withColumn("release_month", month(col("release_date")))
       .withColumn("release_year", year(col("release_date")))
)

# Continue the cleaning pipeline on df_copy1
df_copy1 = df2

%md
To understand the relationship between this variable and our target, we will analyze how the `revenue` behaves throughout the month, across different months of a year, and over multiple years in our dataset.

In [None]:
def compute_avg_revenue(df, group_col):
    """
    Average the 'revenue' column within each value of a grouping column.

    Parameters:
        df (DataFrame): Spark DataFrame containing a 'revenue' column.
        group_col (str): Name of the column to group by (e.g., 'release_day').

    Returns:
        pd.DataFrame: One row per group, sorted by the group column, with the
        columns `group_col` and 'avg_revenue'.
    """
    mean_revenue = avg("revenue").alias("avg_revenue")
    per_group = df.groupBy(group_col).agg(mean_revenue)
    sorted_groups = per_group.orderBy(group_col)
    # Collect the (small) aggregated result to the driver as a Pandas DataFrame
    return sorted_groups.toPandas()

def plot_avg_revenue(data, x_col, title, xlabel):
    """
    Draw a line chart of average revenue against a grouping column.

    Parameters:
        data (pd.DataFrame): Pandas DataFrame holding `x_col` and 'avg_revenue'.
        x_col (str): Column plotted on the x-axis.
        title (str): Chart title.
        xlabel (str): Label for the x-axis.

    Returns:
        None
    """
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.plot(data[x_col], data["avg_revenue"])
    ax.set_xlabel(xlabel)
    ax.set_ylabel("Avg Revenue")
    ax.set_title(title)
    fig.tight_layout()
    plt.show()

In [None]:
# Average revenue per release day, month and year (one aggregation per date part)
day_rev, month_rev, year_rev = (
    compute_avg_revenue(df_copy1, date_part)
    for date_part in ("release_day", "release_month", "release_year")
)

# One line chart per date part: (data, x column, title, x-axis label)
revenue_charts = [
    (day_rev, "release_day", "Avg Revenue by Release Day", "Release Day of Month"),
    (month_rev, "release_month", "Avg Revenue by Release Month", "Release Month"),
    (year_rev, "release_year", "Avg Revenue by Release Year", "Release Year"),
]
for chart_data, chart_x, chart_title, chart_xlabel in revenue_charts:
    plot_avg_revenue(chart_data, chart_x, chart_title, chart_xlabel)


%md
Since there are only three movies released before 1910, and considering the distribution of the rest of the data, these entries can be treated as outliers. Let's remove them and plot the charts again.

In [None]:
# Keep movies released on or after 1910-01-01; rows without a release date are retained
earliest_allowed = to_date(lit("1910-01-01"))
has_valid_or_missing_date = col("release_date").isNull() | (col("release_date") >= earliest_allowed)
df_copy1 = df_copy1.filter(has_valid_or_missing_date)

In [None]:
# Recompute the average revenue per release day, month and year on the filtered data
day_rev, month_rev, year_rev = (
    compute_avg_revenue(df_copy1, date_part)
    for date_part in ("release_day", "release_month", "release_year")
)

# Redraw the three charts: (data, x column, title, x-axis label)
revenue_charts = [
    (day_rev, "release_day", "Avg Revenue by Release Day", "Release Day of Month"),
    (month_rev, "release_month", "Avg Revenue by Release Month", "Release Month"),
    (year_rev, "release_year", "Avg Revenue by Release Year", "Release Year"),
]
for chart_data, chart_x, chart_title, chart_xlabel in revenue_charts:
    plot_avg_revenue(chart_data, chart_x, chart_title, chart_xlabel)

In [None]:
# Find the oldest and the most recent release dates in a single aggregation.
# NOTE: `min` / `max` are the Spark SQL functions imported at the top of the notebook,
# not the Python builtins.
date_bounds = df_copy1.agg(
    min("release_date").alias("oldest"),
    max("release_date").alias("newest"),
).first()
min_date = date_bounds["oldest"]
max_date = date_bounds["newest"]

print(f"Oldest release date: {min_date}")
print(f"Most recent release date: {max_date}")

# Show the movies released on each of the two boundary dates
for boundary_date in (min_date, max_date):
    display(df_copy1.filter(col("release_date") == boundary_date))

%md
The first chart shows the average revenue by the release day of the month. We can observe that movies released on the 7th day tend to perform the best in terms of revenue. In contrast, films released at the very beginning or end of the month tend to generate lower revenues on average. This may reflect strategic scheduling, as mid-month releases might benefit from better marketing alignment and less competition.

In the second chart, we analyze average revenue by release month. It becomes clear that movies released around the middle of the year, particularly in June and July, tend to perform significantly better. This pattern aligns with the global summer holiday season, when people are more likely to go to the cinema. Additionally, there is a second peak in revenue towards the end of the year, which is often linked to award-season releases aiming to qualify for events such as the Oscars or Cannes Festival.

The third chart presents a historical perspective, showing the evolution of average revenue by release year. There is a clear upward trend, indicating that movies have been generating increasingly higher revenues over time. This makes sense considering the industry's growing investment in production, improvements in visual and sound quality, the increasing scale of global distribution, and rising ticket prices.

%md
### RUNTIME

%md
We will check if there are any inconsistencies in this column.

In [None]:
# Count the ways runtime can be "missing": nulls, blank strings,
# the literal text "None"/"none", and zero.
runtime_col = col("runtime")
runtime_checks = {
    "nulls": runtime_col.isNull(),
    "empties": (runtime_col == "") | (runtime_col == " "),
    "nones": runtime_col.isin("None", "none"),
    "zeros": runtime_col == 0,
}

total = df_copy1.count()
runtime_counts = {
    check_name: df_copy1.filter(condition).count()
    for check_name, condition in runtime_checks.items()
}
nulls = runtime_counts["nulls"]
empties = runtime_counts["empties"]
nones = runtime_counts["nones"]
zeros = runtime_counts["zeros"]

print(f"runtime: total={total}, nulls={nulls}, empty strings={empties}, 'None'/'none' strings={nones}, zero values={zeros}")

In [None]:
# Negative runtimes would be invalid; count how many rows have one
negative_runtime_rows = df_copy1.where(col("runtime") < 0)
negative_runtime_count = negative_runtime_rows.count()
print(f"Number of movies with negative runtime: {negative_runtime_count}")

In [None]:
# Show Spark's summary statistics for the runtime column
df_copy1.select("runtime").describe().show()

%md
Although some "movies" can indeed be shorter than a minute (for example, very old movies or ads), in order to clean up our dataset we will assume that movies with 0 value for runtime, actually have this column missing. We will set them to null.

In [None]:
runtime_is_zero = col("runtime") == 0

# How many rows are about to be converted
zero_runtime_count = df_copy1.where(runtime_is_zero).count()
print(f"Rows with runtime = 0 before nulling: {zero_runtime_count}")

# Treat a runtime of 0 as missing: replace it with null, keep every other value
runtime_or_null = when(runtime_is_zero, lit(None)).otherwise(col("runtime"))
df_copy1 = df_copy1.withColumn("runtime", runtime_or_null)

# The null count now includes the former zeros in addition to pre-existing nulls
null_runtime_count = df_copy1.where(col("runtime").isNull()).count()
print(f"Rows with runtime = null after replacement: {null_runtime_count}")


%md
Next, we check `runtime` for outliers using the interquartile range (IQR) method.

In [None]:
# Estimate Q1 and Q3 on the cleaned frame (df_copy1), i.e. on the same rows the bounds
# are applied to below. Using the raw `df` here would compute the quartiles on data
# that still contains the zero runtimes and the rows removed by earlier cleaning steps.
# The third argument (0.01) is the allowed relative error of the approximation.
q1, q3 = df_copy1.approxQuantile("runtime", [0.25, 0.75], 0.01)
iqr = q3 - q1

# Tukey fences: values further than 1.5 * IQR from the quartiles count as outliers
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Split the data at the fences. Rows with a null runtime satisfy neither condition,
# so they appear in neither DataFrame.
outliers_runtime = df_copy1.filter((col("runtime") < lower_bound) | (col("runtime") > upper_bound))
df_no_outliers = df_copy1.filter((col("runtime") >= lower_bound) & (col("runtime") <= upper_bound))

# Display the number of outliers
print(f"Movies with outlier runtime values: {outliers_runtime.count()}")

In [None]:
# Box-and-whisker plot of runtime, including the outliers
runtime_pd = df_copy1.select("runtime").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(x=runtime_pd['runtime'], color='skyblue', ax=ax)
ax.set_title("Runtime Distribution with Outliers", fontsize=16)
ax.set_xlabel("Runtime (Minutes)", fontsize=12)

plt.show()

%md
We can see that some movies in our dataset have extremely long runtimes, with one reaching almost 1000 minutes, that’s nearly 17 hours. This is clearly unrealistic, considering that the average movie length is around 92 minutes (approximately 1 hour and 32 minutes). These values suggest that some of the runtime data may be incorrect or improperly recorded.

%md
Without outliers:

In [None]:
# Bring the runtime values that lie inside the IQR fences to the driver
df_no_outliers_pd = df_no_outliers.select("runtime").toPandas()

# Same boxplot as before, restricted to the non-outlier rows
fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(x=df_no_outliers_pd['runtime'], color='skyblue', ax=ax)
ax.set_title("Runtime Distribution without Outliers", fontsize=16)
ax.set_xlabel("Runtime (Minutes)", fontsize=12)

plt.show()

In [None]:
# Movies running longer than 4 hours (240 minutes)
longer_than_4h = df_copy1.where(col("runtime") > 240)
count_above_4h = longer_than_4h.count()
print(f"Number of movies with runtime greater than 4 hours: {count_above_4h}")

In [None]:
# Total number of observations
total_count = df_copy1.count()

# Number of films with a runtime exceeding 4 hours (240 minutes)
above_4h_count = df_copy1.filter(col("runtime") > 240).count()

# Share of the dataset these films represent; guard against an empty DataFrame
percentage = (above_4h_count / total_count) * 100 if total_count else 0.0

# Report the count computed in this cell, so the cell does not depend on a variable
# left over from a previously executed cell.
print(f"Number of movies with runtime greater than 4 hours: {above_4h_count} which represents {percentage:.2f}% of the dataset.")

%md
According to the Academy of Motion Picture Arts and Sciences (Oscars), a feature film must have a minimum runtime of 40 minutes, with no defined maximum duration. However, when applying the Interquartile Range (IQR) method to detect outliers in our dataset, we identified over 2,500 movies as outliers. Removing such a large portion of data would significantly reduce the amount of useful information available for building our model.

To avoid this loss, and considering that our dataset includes not only feature films but also short films and documentaries, which are also eligible for international awards and nominations, we will define a custom threshold. We will consider valid all movies with a runtime greater than 0 minutes and less than or equal to 240 minutes (4 hours). This range allows us to maintain a clean dataset while still preserving valuable content diversity.

By setting this custom threshold, we are discarding only 85 observations, which represent less than 1% of the entire dataset. This minimal exclusion helps maintain both data quality and volume.

In [None]:
# Preview the effect of the custom threshold: keep runtimes of at most 240 minutes,
# and keep rows whose runtime is missing.
within_4h_or_missing = col("runtime").isNull() | (col("runtime") <= 240)
df_outliers_4h = df_copy1.filter(within_4h_or_missing)

# Bring the remaining runtime values to the driver for plotting
df_outliers_4h_pd = df_outliers_4h.select("runtime").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(x=df_outliers_4h_pd['runtime'], color='skyblue', ax=ax)
ax.set_title("Runtime Distribution without Outliers", fontsize=16)
ax.set_xlabel("Runtime (Minutes)", fontsize=12)

plt.show()

In [None]:
# Remove the outliers: drop movies longer than 240 minutes; rows with a null runtime are kept
df_copy1 = df_copy1.filter((col("runtime") <= 240) | col("runtime").isNull())

%md
### POPULARITY

In [None]:
# Count the ways popularity can be "missing": nulls, blank strings,
# the literal text "None"/"none", and zero.
popularity_col = col("popularity")
popularity_is_blank = (popularity_col == "") | (popularity_col == " ")

total = df_copy1.count()
nulls = df_copy1.where(popularity_col.isNull()).count()
empties = df_copy1.where(popularity_is_blank).count()
nones = df_copy1.where(popularity_col.isin("None", "none")).count()
zeros = df_copy1.where(popularity_col == "0").count()

print(f"popularity: total={total}, nulls={nulls}, empty strings={empties}, 'None'/'none' strings={nones}, zero values={zeros}")

%md
Can `popularity` be zero? Let's display some examples of rows with `popularity` = 0:

In [None]:
# Movies whose popularity score is exactly 0
zero_popularity_movies = df_copy1.where(col("popularity") == 0)

zero_pop_count = zero_popularity_movies.count()
print(f"Number of movies with popularity = 0: {zero_pop_count}")

# Show up to 10 of them for manual inspection
display(zero_popularity_movies.limit(10))

%md
Through these examples, we can clearly observe that films with a popularity score of zero are highly irregular. Many of them have either extremely low or no vote counts, display unrealistic budget values, and most of their categorical (descriptive) features are null. As a result, these entries provide little to no meaningful information and represent films with virtually no traceable data.

In [None]:
# Summary statistics for popularity: min, approximate median, mean and max
popularity_aggregates = [
    F.min("popularity").alias("min_popularity"),
    F.expr("percentile_approx(popularity, 0.5)").alias("median_popularity"),
    F.avg("popularity").alias("average_popularity"),
    F.max("popularity").alias("max_popularity"),
]
# The aggregation returns a single Row holding all four values
stats = df_copy1.agg(*popularity_aggregates).first()

print(f"Min popularity     : {stats['min_popularity']}")
print(f"Median popularity  : {stats['median_popularity']}")
print(f"Average popularity : {stats['average_popularity']:.2f}")
print(f"Max popularity     : {stats['max_popularity']}")

In [None]:
# Keep only movies with a strictly positive popularity
pop_filtered = df_copy1.where(col("popularity") > 0)

# Bring the popularity values to the driver for plotting
pop_filtered_pd = pop_filtered.select('popularity').toPandas()

# Histogram with a KDE overlay; the x-axis is capped at 300
fig, ax = plt.subplots(figsize=(10, 6))
sns.histplot(pop_filtered_pd['popularity'], kde=True, color='skyblue', bins=30, ax=ax)

ax.set_xlim(0, 300)
ax.set_title("Distribution of Popularity", fontsize=16)
ax.set_xlabel("Popularity", fontsize=12)
ax.set_ylabel("Frequency", fontsize=12)

plt.show()

%md
Let's analyse the outliers in `popularity`.

In [None]:
# Boxplot of the positive popularity values to expose outliers
fig, ax = plt.subplots(figsize=(8, 5))
sns.boxplot(x=pop_filtered_pd["popularity"], color='skyblue', ax=ax)
ax.set_title("Boxplot of Popularity", fontsize=16)
ax.set_xlabel("Popularity", fontsize=12)
fig.tight_layout()
plt.show()

In [None]:
# Total number of films in the dataset
total_movies = df_copy1.count()

# Films whose popularity exceeds 100
above_100_movies = df_copy1.where(col("popularity") > 100)
count_above_100 = above_100_movies.count()

# Their share of the dataset, in percent
percentage_above_100 = (count_above_100 / total_movies) * 100

print(f"Number of movies with popularity greater than 100: {count_above_100}")
print(f"That represents approximately {percentage_above_100:.2f}% of the dataset.")

In [None]:
# Popularity values of at most 100, collected to the driver for plotting
at_most_100 = df_copy1.where(col("popularity") <= 100)
pop_filtered_100_pd = at_most_100.select("popularity").toPandas()

fig, ax = plt.subplots(figsize=(8, 5))
sns.boxplot(x=pop_filtered_100_pd["popularity"], color='skyblue', ax=ax)
ax.set_title("Boxplot of Popularity (≤ 100)", fontsize=16)
ax.set_xlabel("Popularity", fontsize=12)
fig.tight_layout()
plt.show()

%md
If we consider a threshold of 100 for the popularity variable,  treating it as if it were scaled similarly to a percentage (0 to 100), we would be discarding 125 observations, which represent less than 1% of our dataset. As such, their presence has minimal impact on the overall analysis, making their removal both justified and necessary.

In the second boxplot, we can observe the distribution of films under this new popularity threshold, which helps us better visualize the central tendency and variability of this feature without the distortion caused by extreme values.

In [None]:
# Remove the outliers: drop movies with popularity above 100; rows with a null popularity are kept
df_copy1 = df_copy1.filter((col("popularity") <= 100) | col("popularity").isNull())

In [None]:
# Re-check the popularity statistics after removing values above 100
popularity_aggregates = [
    F.min("popularity").alias("min_popularity"),
    F.expr("percentile_approx(popularity, 0.5)").alias("median_popularity"),
    F.avg("popularity").alias("average_popularity"),
    F.max("popularity").alias("max_popularity"),
]
# The aggregation returns a single Row holding all four values
stats = df_copy1.agg(*popularity_aggregates).first()

print(f"Min popularity     : {stats['min_popularity']}")
print(f"Median popularity  : {stats['median_popularity']}")
print(f"Average popularity : {stats['average_popularity']:.2f}")
print(f"Max popularity     : {stats['max_popularity']}")

%md
## Save the DataSet

In [None]:
# Rebind `df` to the cleaned DataFrame; from here on `df` no longer refers to the raw data loaded at the top
df = df_copy1

In [None]:
# Save the cleaned dataset as Parquet under its own path.
# The previous target, "/FileStore/tables/Imdb_Movie_Dataset-4.csv", is the raw CSV this
# notebook loads at the top. Writing there with mode("overwrite") replaces the source
# file with a Parquet directory while `df` is still lazily derived from it, so the
# notebook can no longer be re-run from a fresh kernel.
# NOTE(review): any downstream notebook that reads the cleaned data must point to this new path.
df.write.mode("overwrite").parquet("/FileStore/tables/Imdb_Movie_Dataset_cleaned.parquet")
display(df)