In [0]:
df = spark.read.csv("/Volumes/workspace/default/netflix/netflix_titles.csv", header=True, inferSchema=True)


In [0]:
df.printSchema()
df.show(5)
print(f"Initial dataset shape: {df.count()} rows, {len(df.columns)} columns")

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)

+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+-------------------

In [0]:
from pyspark.sql.functions import col, isnan, when, count
missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
print("Missing values per column:")
missing_counts.show()

Missing values per column:
+-------+----+-----+--------+----+-------+----------+------------+------+--------+---------+-----------+
|show_id|type|title|director|cast|country|date_added|release_year|rating|duration|listed_in|description|
+-------+----+-----+--------+----+-------+----------+------------+------+--------+---------+-----------+
|      0|   1|    2|    2636| 826|    832|        13|           2|     6|       5|        3|          3|
+-------+----+-----+--------+----+-------+----------+------------+------+--------+---------+-----------+



In [0]:
initial_count = df.count()
df = df.dropDuplicates()
final_count = df.count()
print(f"Removed {initial_count - final_count} duplicate rows")

Removed 0 duplicate rows


In [0]:
df = df.fillna({
    "country": "Unknown",
    "director": "Unknown", 
    "cast": "Unknown",
    "rating": "Not Rated"
})
print("Missing values handled for country, director, cast, and rating")

Missing values handled for country, director, cast, and rating


In [0]:
from pyspark.sql.functions import trim, lower, col, initcap

df = df.withColumn("title", trim(col("title"))) \
       .withColumn("type", trim(lower(col("type")))) \
       .withColumn("country", trim(initcap(col("country")))) \
       .withColumn("rating", trim(col("rating"))) \
       .withColumn("director", trim(col("director"))) \
       .withColumn("cast", trim(col("cast"))) \
       .withColumn("listed_in", trim(col("listed_in"))) \
       .withColumn("date_added", trim(col("date_added")))  # Just clean, don't convert

print("Text columns cleaned and standardized")

Text columns cleaned and standardized


In [0]:
from pyspark.sql.functions import expr, when, regexp_extract
df = df.withColumn("duration_int", 
    expr("try_cast(regexp_extract(duration, '([0-9]+)', 1) AS INT)"))

df = df.withColumn("duration_type", 
    when(col("duration").contains("min"), "minutes")
    .when(col("duration").contains("Season"), "seasons")
    .otherwise("unknown"))

df = df.withColumn("release_year", expr("try_cast(trim(release_year) AS INT)"))

print("Duration and release year converted successfully")
df.select("duration", "duration_int", "duration_type", "release_year").show(10)

Duration and release year converted successfully
+---------+------------+-------------+------------+
| duration|duration_int|duration_type|release_year|
+---------+------------+-------------+------------+
|  104 min|         104|      minutes|        2016|
|  135 min|         135|      minutes|        2013|
|3 Seasons|           3|      seasons|        2020|
| 1 Season|           1|      seasons|        2009|
|   58 min|          58|      minutes|        2021|
| 1 Season|           1|      seasons|        2018|
| 1 Season|           1|      seasons|        2017|
|  106 min|         106|      minutes|        2021|
|   47 min|          47|      minutes|        2021|
|5 Seasons|           5|      seasons|        2021|
+---------+------------+-------------+------------+
only showing top 10 rows


In [0]:
from pyspark.sql.functions import regexp_extract

# Extract year from date_added string (safer than date parsing)
df = df.withColumn("added_year", 
    expr("try_cast(regexp_extract(date_added, '([0-9]{4})', 1) AS INT)"))

# Extract month name and convert to number
df = df.withColumn("added_month", 
    when(col("date_added").contains("January"), 1)
    .when(col("date_added").contains("February"), 2)
    .when(col("date_added").contains("March"), 3)
    .when(col("date_added").contains("April"), 4)
    .when(col("date_added").contains("May"), 5)
    .when(col("date_added").contains("June"), 6)
    .when(col("date_added").contains("July"), 7)
    .when(col("date_added").contains("August"), 8)
    .when(col("date_added").contains("September"), 9)
    .when(col("date_added").contains("October"), 10)
    .when(col("date_added").contains("November"), 11)
    .when(col("date_added").contains("December"), 12)
    .otherwise(None))

print("Extracted year and month from date_added (kept original as string)")

Extracted year and month from date_added (kept original as string)


In [0]:
from pyspark.sql.functions import current_date, year

# Get current year
current_year = int(df.select(year(current_date())).first()[0])

print("Applying data validation filters...")
before_filter = df.count()

# Filter invalid release years
df = df.filter((col("release_year") >= 1900) & (col("release_year") <= current_year))

# Filter invalid durations
df = df.filter(col("duration_int").isNotNull() & (col("duration_int") > 0))

# Remove rows where essential fields are missing
df = df.filter(col("title").isNotNull() & (col("title") != ""))

# Filter invalid added_year
df = df.filter(col("added_year").isNull() | 
               ((col("added_year") >= 2000) & (col("added_year") <= current_year)))

after_filter = df.count()
print(f"Removed {before_filter - after_filter} invalid records")

Applying data validation filters...
Removed 25 invalid records


In [0]:

print("\nFinal Schema:")
df.printSchema()

print("\nUnique content types:")
df.select("type").distinct().show()

print("\nUnique ratings:")
df.select("rating").distinct().show()

print("\nDuration types:")
df.select("duration_type").distinct().show()

print("\nAdded year range:")
df.select("added_year").describe().show()




=== DATA QUALITY SUMMARY ===

Final Schema:
root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = false)
 |-- cast: string (nullable = false)
 |-- country: string (nullable = false)
 |-- date_added: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rating: string (nullable = false)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)
 |-- duration_int: integer (nullable = true)
 |-- duration_type: string (nullable = false)
 |-- added_year: integer (nullable = true)
 |-- added_month: integer (nullable = true)


Unique content types:
+-------+
|   type|
+-------+
|tv show|
|  movie|
+-------+


Unique ratings:
+---------+
|   rating|
+---------+
|    TV-14|
|     TV-Y|
|    TV-PG|
|    TV-Y7|
|        G|
|    TV-MA|
|        R|
|       PG|
|     TV-G|
|    PG-13|
|    NC-17|
| TV-Y7-FV|
|Not Rated

In [0]:
print("\n=== FINAL DATASET SUMMARY ===")
print(f"Final dataset shape: {df.count()} rows, {len(df.columns)} columns")

# Show sample of cleaned data
print("\nSample of cleaned data:")
df.select("title", "type", "release_year", "rating", "duration_int", "added_year", "added_month").show(10)

print("Data cleaning completed successfully!")


=== FINAL DATASET SUMMARY ===
Final dataset shape: 8784 rows, 16 columns

Sample of cleaned data:
+--------------------+-------+------------+------+------------+----------+-----------+
|               title|   type|release_year|rating|duration_int|added_year|added_month|
+--------------------+-------+------------+------+------------+----------+-----------+
|             Görümce|  movie|        2016| TV-MA|         104|      2020|          4|
|     Chennai Express|  movie|        2013| TV-14|         135|      2021|          8|
|The Last Kids on ...|tv show|        2020| TV-Y7|           3|      2020|         10|
|   Boys Over Flowers|tv show|        2009| TV-14|           1|      2020|          5|
|Searching For Sheela|  movie|        2021| TV-14|          58|      2021|          4|
|  Peasants Rebellion|tv show|        2018| TV-14|           1|      2021|          4|
|Strongest Deliver...|tv show|        2017| TV-MA|           1|      2020|         11|
|You vs. Wild: Out...|  movie| 

In [0]:
df.write.mode('overwrite').option("header", "true").csv("/Volumes/workspace/default/netflix/cleaned_netflix_titles")