In [0]:
spark

<pyspark.sql.connect.session.SparkSession at 0xff85996ce4b0>

In [0]:
flight_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("mode", "PERMISSIVE") \
    .load("dbfs:/Volumes/workspace/default/task2/netflix_titles_CLEANED.csv")

flight_df.show(5)


+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|      directors|                cast|    countries|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                NULL|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           NULL|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         NULL|Septem

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a schema that includes the corrupt record column
my_schema = StructType([
    StructField("show_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("title", StringType(), True),
    StructField("director", StringType(), True),
    StructField("cast", StringType(), True),
    StructField("country", StringType(), True),
    StructField("date_added", StringType(), True),
    StructField("release_year", IntegerType(), True),
    StructField("rating", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("listed_in", StringType(), True),
    StructField("description", StringType(), True),
    StructField("_corrupt_record", StringType(), True)  # Add this at the end
])


In [0]:
flight_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .schema(my_schema) \
    .load("dbfs:/Volumes/workspace/default/task2/netflix_titles_CLEANED.csv")

flight_df.show(5, truncate=False)


+-------+-------+---------------------+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+------------+------+---------+-------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|show_id|type   |title                |director       |cast                                                                                                                                                                                                                                                                                               

In [0]:
flight_df.show(5)
flight_df.printSchema()


+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+---------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|_corrupt_record|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+---------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                NULL|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|           NULL|
|     s2|TV Show|       Blood & Water|           NULL|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|           NULL|
|     s3|TV Sho

In [0]:
flight_df.groupBy("type").count().show()


+-------------+-----+
|         type|count|
+-------------+-----+
|      TV Show| 2676|
|        Movie| 6131|
|         NULL|    1|
|William Wyler|    1|
+-------------+-----+



In [0]:
flight_df.columns


['show_id',
 'type',
 'title',
 'director',
 'cast',
 'country',
 'date_added',
 'release_year',
 'rating',
 'duration',
 'listed_in',
 'description']

In [0]:
len(flight_df.columns)


12

In [0]:
flight_df.filter(flight_df["_corrupt_record"].isNotNull()).show(truncate=False)

+-------+-------+----------------------------------------------------+-------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+------------------------------+------------+-----------------------+-------------+-----------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
manual_schema = StructType([
    StructField("show_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("title", StringType(), True),
    StructField("director", StringType(), True),
    StructField("cast", StringType(), True),
    StructField("country", StringType(), True),
    StructField("date_added", StringType(), True),
    StructField("release_year", IntegerType(), True),
    StructField("rating", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("listed_in", StringType(), True),
    StructField("description", StringType(), True)
])


In [0]:
flight_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .schema(manual_schema) \
    .load("dbfs:/Volumes/workspace/default/task2/netflix_titles_CLEANED.csv")
#correct the schema code

In [0]:
from pyspark.sql.functions import col

flight_df.select(
    col("title").alias("Movie_Title"),
    col("country").alias("Production_Country"),
    col("rating")
).show(5, truncate=False)


+---------------------+------------------+------+
|Movie_Title          |Production_Country|rating|
+---------------------+------------------+------+
|Dick Johnson Is Dead |United States     |PG-13 |
|Blood & Water        |South Africa      |TV-MA |
|Ganglands            |NULL              |TV-MA |
|Jailbirds New Orleans|NULL              |TV-MA |
|Kota Factory         |India             |TV-MA |
+---------------------+------------------+------+
only showing top 5 rows


In [0]:
filtered_df = flight_df.filter(
    (col("type") == "Movie") & (col("release_year") > 2019)
)
filtered_df.show(5, truncate=False)


+-------+-----+--------------------------------+-----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+------------------+------------+------+--------+----------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|show_id|type |title                           |director                     |cast                                                                                                                                                                   |country                |date_added        |release_year|rating|duration|listed_in                         |description                                                                                                                                

In [0]:
from pyspark.sql.functions import lit #column literal

flight_df = flight_df.withColumn("Source", lit("Netflix"))
flight_df.select("title", "Source").show(5, truncate=False)


+---------------------+-------+
|title                |Source |
+---------------------+-------+
|Dick Johnson Is Dead |Netflix|
|Blood & Water        |Netflix|
|Ganglands            |Netflix|
|Jailbirds New Orleans|Netflix|
|Kota Factory         |Netflix|
+---------------------+-------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import when #derived col

flight_df = flight_df.withColumn(
    "content_type",
    when(col("duration").like("%Season%"), "Series").otherwise("Movie")
)
flight_df.select("title", "duration", "content_type").show(5, truncate=False)


+---------------------+---------+------------+
|title                |duration |content_type|
+---------------------+---------+------------+
|Dick Johnson Is Dead |90 min   |Movie       |
|Blood & Water        |2 Seasons|Series      |
|Ganglands            |1 Season |Series      |
|Jailbirds New Orleans|1 Season |Series      |
|Kota Factory         |2 Seasons|Series      |
+---------------------+---------+------------+
only showing top 5 rows


In [0]:
renamed_df = flight_df.withColumnRenamed("listed_in", "Genre") \
                      .withColumnRenamed("date_added", "Added_On")
renamed_df.printSchema()


root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- Added_On: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- description: string (nullable = true)
 |-- Source: string (nullable = false)
 |-- content_type: string (nullable = false)



In [0]:
casted_df = flight_df.withColumn("release_year", col("release_year").cast("string"))
casted_df.printSchema()
#chnage relese year to sting from int   

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)
 |-- Source: string (nullable = false)
 |-- content_type: string (nullable = false)



In [0]:
clean_df = flight_df.drop("description", "cast") #delee thes both
clean_df.printSchema()


root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- Source: string (nullable = false)
 |-- content_type: string (nullable = false)



In [0]:
from pyspark.sql.functions import col, sum, when

null_counts = flight_df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in flight_df.columns
])

null_counts.show()
#null vals count code


+-------+----+-----+--------+----+-------+----------+------------+------+--------+---------+-----------+------+------------+
|show_id|type|title|director|cast|country|date_added|release_year|rating|duration|listed_in|description|Source|content_type|
+-------+----+-----+--------+----+-------+----------+------------+------+--------+---------+-----------+------+------------+
|      0|   1|    2|    2636| 826|    832|        13|          22|     6|       5|        3|          3|     0|           0|
+-------+----+-----+--------+----+-------+----------+------------+------+--------+---------+-----------+------+------------+



In [0]:
flight_df.filter(" or ".join([f"{c} is null" for c in flight_df.columns])).show(truncate=False)
#print rows will null val

+-------+-------+---------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+------------+------+---------+------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+------------+
|show_id|type   |title               

In [0]:
cleaned_df = flight_df.na.drop()
#ddrop null vals

In [0]:
filled_df = flight_df.fillna({
    "director": "Unknown Director",
    "cast": "Not Available",
    "country": "Unknown",
    "date_added": "N/A",
    "rating": "Unrated"
})
filled_df.show(5)
#fill null

+-------+-------+--------------------+----------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+-------+------------+
|show_id|   type|               title|        director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description| Source|content_type|
+-------+-------+--------------------+----------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+-------+------------+
|     s1|  Movie|Dick Johnson Is Dead| Kirsten Johnson|       Not Available|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|Netflix|       Movie|
|     s2|TV Show|       Blood & Water|Unknown Director|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|Netflix

In [0]:
filled_df.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/Volumes/workspace/default/task2/cleaned_netflix_data_csv")
#update df using spark write operation

In [0]:
filled_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("dbfs:/Volumes/workspace/default/task2/cleaned_netflix_data_parquet")
#same abuve better for large dataset