In [1]:
import pandas as pd
import re
import csv
import os

def _clean_and_wrap_text(text):
    """Normalise one cell value and wrap it in '$$$' markers.

    Double quotes become single quotes and every whitespace run (including
    newlines, tabs and carriage returns) collapses to a single space, so the
    value stays on one CSV line. Non-string values (e.g. NaN for a missing
    cell) and values that are empty after cleaning become "".
    """
    if not isinstance(text, str):
        return ""

    # `\s` already matches \n, \t and \r, so a single substitution flattens the text.
    text = re.sub(r'\s+', ' ', text.replace('"', "'")).strip()
    return f"$$${text}$$$" if text else ""


def clean_csv_for_spark(input_filepath, output_filepath,
                        columns=('description', 'cast', 'listed_in', 'title', 'director')):
    """
    Clean free-text columns of a CSV, wrap each value in '$$$' markers, and save
    the result to a new CSV file.

    Each column in ``columns`` is normalised (double quotes -> single quotes,
    whitespace runs -> one space) and wrapped as ``$$$text$$$``; missing or
    empty values become "". The output is written with every field quoted
    (``csv.QUOTE_ALL``). Problems are reported with ``print`` instead of being
    raised, so the notebook keeps running.

    Args:
        input_filepath (str): The path to the original CSV file.
        output_filepath (str): The path to save the cleaned CSV file.
        columns (Sequence[str]): Columns to clean and wrap. Defaults to the
            five free-text columns of the Netflix dataset.
    """
    if not os.path.exists(input_filepath):
        print(f"Error: Input file not found at '{input_filepath}'")
        return

    try:
        df = pd.read_csv(input_filepath)

        # Validate every column that will be cleaned, not just 'description',
        # so a missing column gets a clear message instead of a bare KeyError.
        missing = [name for name in columns if name not in df.columns]
        if missing:
            print(f"Error: column(s) {missing} not found in the CSV.")
            return

        for name in columns:
            df[name] = df[name].apply(_clean_and_wrap_text)

        df.to_csv(output_filepath, index=False, quoting=csv.QUOTE_ALL)

        print("✅ Successfully cleaned and wrapped the 'description' column!")
        print(f"Cleaned data saved to: '{output_filepath}'")

    except Exception as e:
        # Best-effort notebook helper: report and continue rather than crash.
        print(f"An error occurred during the process: {e}")

# Raw Netflix export in, Spark-friendly pre-cleaned copy out
original_file = 'netflix_titles.csv'
cleaned_file = 'part_1_cleaned_and_wrapped.csv'

# Normalise and '$$$'-wrap the free-text columns, writing to `cleaned_file`
clean_csv_for_spark(input_filepath=original_file, output_filepath=cleaned_file)

✅ Successfully cleaned and wrapped the 'description' column!
Cleaned data saved to: 'part_1_cleaned_and_wrapped.csv'


# Imports


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp,  regexp_extract,when, isnull, sum as _sum
from pyspark.sql.types import ShortType
from pyspark.sql.functions import col, when, count, desc, split, explode, regexp_extract, avg, round,countDistinct,trim,dense_rank
from pyspark.sql.window import Window

# 1. Initialize a SparkSession

In [3]:
spark = (
    SparkSession.builder
    .appName("NetflixDataCleaning")
    .getOrCreate()
)

# 2. Load the dataset

In [4]:
# Read the CSV written by `clean_csv_for_spark`. pandas writes embedded double
# quotes by doubling them (""), so Spark's escape character must be '"' to
# match; the default backslash escape does not understand doubled quotes.
df = spark.read.csv(
    "part_1_cleaned_and_wrapped.csv",
    header=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    mode="PERMISSIVE"
)

# Print the schema to verify it's loaded correctly
print("Original Schema:")
df.printSchema()

# Strip the '$$$' markers added during the pandas pre-clean. The pattern is
# anchored to the start/end and greedy, so a value that itself contains '$$$'
# or ends with '$' is kept whole (a lazy, unanchored pattern would stop at the
# first '$$$' it meets). Null values stay null; a non-null value without the
# markers becomes "" (regexp_extract's no-match result).
WRAPPED_COLUMNS = ["description", "cast", "listed_in", "title", "director"]

df_t0 = df
for column_name in WRAPPED_COLUMNS:
    df_t0 = df_t0.withColumn(
        column_name,
        regexp_extract(col(column_name), r"^\$\$\$(.*)\$\$\$$", 1)
    )

Original Schema:
root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



# **--- Data Cleaning ---**

# 3. Handle Missing Values

In [5]:
# Missing director / cast / country values are replaced by the "Unknown" placeholder
df_t1 = df_t0.fillna("Unknown", subset=['director', 'cast', 'country'])

In [6]:
# Discard any row lacking a date_added or a rating value
df_t2 = df_t1.dropna(subset=['date_added', 'rating'])

# 4. Correct Data Types

In [7]:
# Convert 'date_added' from string to timestamp.
# The pandas pre-clean never touched this column, so trim surrounding
# whitespace first: a leading/trailing space would not match the
# "MMMM d, yyyy" pattern and the value would silently become null.
# The add -> drop -> rename sequence places the converted column last in the schema.
# NOTE(review): values that still fail to parse become null and are NOT removed
# by the earlier na.drop (it ran on the string column) — confirm that is intended.
df_t3 = df_t2.withColumn(
    "date_added_ts",
    to_timestamp(trim(col("date_added")), "MMMM d, yyyy")
).drop("date_added").withColumnRenamed("date_added_ts", "date_added")

# 5. Remove Duplicates

In [8]:
# Remove rows that are identical across every column, then confirm the schema
df_t4 = df_t3.drop_duplicates()
df_t4.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = false)
 |-- cast: string (nullable = false)
 |-- country: string (nullable = false)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)
 |-- date_added: timestamp (nullable = true)



# **--- Optimization ---**

# 6. Optimize Data Types

In [9]:
# Store 'release_year' as ShortType (a 16-bit integer) instead of a string
release_year_short = col("release_year").cast(ShortType())
df_t5 = df_t4.withColumn("release_year", release_year_short)

# 7. Save the Cleaned Data

In [10]:
# One part file (coalesce(1)) with a header row; an existing output directory is replaced
(
    df_t5.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("cleaned_netflix_spark_output")
)

In [12]:
# Persist the cleaned data as a table partitioned by (type, release_year) and
# bucketed/sorted by show_id.
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails as soon as the table already exists.
(
    df_t5.write
    .partitionBy("type", "release_year")
    .bucketBy(16, "show_id")
    .sortBy("show_id")
    .mode("overwrite")
    .saveAsTable("netflix_optimized")
)

In [16]:
# Read the saved table back; the user-story queries below all start from `df`
df = spark.read.table("netflix_optimized")



# **--- User Story ---**

# 1. Which movie genre has the most movies longer than 1 hour, and how many such movies does it have?




In [17]:
# Movies whose duration is given in minutes and exceeds 60 of them
movies_over_60min = (
    df.filter((col("type") == "Movie") & (col("duration").endswith("min")))
      .withColumn("duration_min", split(col("duration"), " ").getItem(0).cast("int"))
      .filter(col("duration_min") > 60)
)

# One row per (movie, genre): 'listed_in' holds a ", "-separated genre list
movies_exploded = movies_over_60min.withColumn("genre", explode(split(col("listed_in"), ", ")))

# Count movies per genre and keep only the most frequent genre
genre_counts = movies_exploded.groupBy("genre").count()
top_genre = genre_counts.orderBy(col("count").desc()).limit(1)

top_genre.show()


+--------------------+-----+
|               genre|count|
+--------------------+-----+
|International Movies| 2672|
+--------------------+-----+



# 2. Which country has the most TV shows (counted by show_id)? List those shows with their descriptions.

In [None]:
tv_shows_df = df.filter(col("type") == "TV Show")

# Distinct show_ids per raw 'country' value (co-productions such as
# "United States, Canada" form their own group).
# NOTE(review): missing countries were filled with "Unknown" earlier and are
# ranked here like any other value.
country_tv_counts = (
    tv_shows_df
    .groupBy("country")
    .agg(countDistinct("show_id").alias("tv_show_count"))
)

# first() returns None when there are no TV shows, so guard before indexing
top_row = country_tv_counts.orderBy(desc("tv_show_count")).first()

if top_row is None:
    top_country = None
    print("No TV shows found.")
else:
    top_country = top_row["country"]
    top_country_tv_shows = (
        tv_shows_df
        .filter(col("country") == top_country)
        .select("show_id", "title", "description")
    )
    # Print the answer explicitly; show() prints the table and returns None,
    # so it must not be part of the cell's displayed expression.
    print(f"Country with the most TV shows: {top_country} ({top_row['tv_show_count']} shows)")
    top_country_tv_shows.show(truncate=False)


# 2 (optimized with window functions). Which country has the most TV shows (counted by show_id)? List those shows with their descriptions.

In [None]:
# Same question answered with window functions: each TV-show row carries its
# country's show count, and the rows of the top-ranked country are kept.
# count() over a window counts rows, not distinct show_ids (distinct aggregates
# are not supported as window functions); presumably equivalent if show_id is
# unique after deduplication — confirm.
country_window = Window.partitionBy("country")

tv_shows_with_counts = (
    df.filter(col("type") == "TV Show")
      .withColumn("tv_show_count", count("show_id").over(country_window))
)

# A window with orderBy but no partitionBy moves every row into one partition;
# acceptable for a dataset of this size.
rank_window = Window.orderBy(desc("tv_show_count"))

# Rank 1 = the country (or tied countries) with the most TV shows
top_country_tv_shows = (
    tv_shows_with_counts
    .withColumn("rank", dense_rank().over(rank_window))
    .filter(col("rank") == 1)
    .select("show_id", "title", "description")
)

# show() is the only action in this cell
top_country_tv_shows.show(truncate=False)

# The SparkSession is deliberately NOT stopped here: the following cells still
# query `df`, which fails once the session is stopped.

# 3. What types (genres) of movies were released in 2021?

In [None]:
# Movies released in 2021 ('release_year' was cast to ShortType earlier)
movies_2021 = df.filter((col("type") == "Movie") & (col("release_year") == 2021))

# 'listed_in' is a comma-separated genre list; explode and trim it so each
# genre is reported once, instead of every distinct genre *combination*.
movie_types_2021 = (
    movies_2021
    .select(explode(split(col("listed_in"), ",")).alias("genre"))
    .select(trim(col("genre")).alias("genre"))
    .filter(col("genre") != "")
    .distinct()
    .orderBy("genre")
)

movie_types_2021.show(truncate=False)


# 4. Which country released the most movies?

In [None]:
# Only movies
movies_df = df.filter(col("type") == "Movie")

# A co-production lists several countries ("United States, India"); give each
# country its own row. Empty fragments (e.g. from a trailing comma) and the
# "Unknown" placeholder filled in for missing countries are not real
# countries, so they are dropped before ranking.
movies_by_country = (
    movies_df
    .withColumn("country", explode(split(col("country"), ",")))
    .withColumn("country", trim(col("country")))
    .filter((col("country") != "") & (col("country") != "Unknown"))
)

# Number of movies per country
country_movie_counts = movies_by_country.groupBy("country").count()

# first() returns None when no movie has a known country
top_country = country_movie_counts.orderBy(col("count").desc()).first()

if top_country is None:
    print("No movies with a known country found.")
else:
    print(f"Country with the most movies: {top_country['country']} ({top_country['count']} movies)")

# 5. Which country has the most comedy movies, and what is its total number of comedy movies (counted by show_id)?

In [None]:
# Comedy movies: the genre list contains "Comedies"
comedy_movies = df.filter(
    (col("type") == "Movie") &
    (col("listed_in").contains("Comedies"))
)

# 'country' is a comma-separated list for co-productions, so give each country
# its own row. Missing countries were already filled with "Unknown" (an
# isNotNull() check would therefore match every row); exclude that placeholder
# and any empty fragment so that only real countries are ranked.
comedy_by_country = (
    comedy_movies
    .withColumn("country", explode(split(col("country"), ",")))
    .withColumn("country", trim(col("country")))
    .filter((col("country") != "") & (col("country") != "Unknown"))
)

# Number of unique comedy movies (distinct show_id) per country
comedy_count_by_country = comedy_by_country.groupBy("country").agg(
    countDistinct("show_id").alias("comedy_movie_count")
)

# Country with the most comedy movies
top_country = comedy_count_by_country.orderBy(col("comedy_movie_count").desc()).limit(1)

top_country.show()
