# PySpark Essentials - Hands-On Exercises

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when, length, trim, lower

# Create SparkSession - entry point para sa PySpark
spark = SparkSession.builder \
    .appName("Netflix Data Analysis") \
    .getOrCreate()

print("✅ SparkSession created successfully!")
print(f"Spark Version: {spark.version}")

✅ SparkSession created successfully!
Spark Version: 4.1.1


# 1. Load Data into PySpark DataFrame

In [27]:
# 1. Load the netflix_titles.csv - treating as "large CSV" for this exercise
# gamit spark.read.csv() para mag-load, header=True means first row is column names
# inferSchema=True para automatic detect ng data types
df_spark = spark.read.csv('/root/netflix_titles.csv', header=True, inferSchema=True)

# Display basic info about the DataFrame
print("=== PySpark DataFrame Info ===")
print(f"Total rows: {df_spark.count()}")
print(f"Total columns: {len(df_spark.columns)}")
print("\n")

# Show schema - eto yung structure ng DataFrame with data types
print("=== Schema ===")
df_spark.printSchema()
print("\n")

# Display first 5 rows - same concept as pandas .head()
print("=== First 5 Rows ===")
df_spark.show(5, truncate=False)

=== PySpark DataFrame Info ===
Total rows: 5839
Total columns: 12


=== Schema ===
root
 |-- show_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)
 |-- type: string (nullable = true)



=== First 5 Rows ===
+--------+-----------------------------------+--------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+-----------------+------------+------+---------+----------------------------------------------------------+----------------------------------------

# 2. Compute Aggregations

In [28]:
# 2.1 Calculate the number of movies and TV shows released per year
# gamit ng .groupBy() para group by release_year and type
# tapos .count() para mabilang per group
releases_per_year = df_spark.groupBy("release_year", "type") \
    .count() \
    .orderBy("release_year", "type")

print("=== Movies and TV Shows Released Per Year ===")
releases_per_year.show(20)

# Save this aggregation for later
print(f"\nTotal unique years: {releases_per_year.select('release_year').distinct().count()}")

=== Movies and TV Shows Released Per Year ===
+-----------------+---------------+-----+
|     release_year|           type|count|
+-----------------+---------------+-----+
|             NULL|           NULL|    2|
|   Francis Weddey|             NR|    1|
|      Jade Eshete|   Devale Ellis|    1|
| Kristen Johnston|August 15, 2018|    1|
| Marquell Manning|           2017|    1|
|   Peter Ferriero|          TV-MA|    1|
|     Ted Ferguson|  United States|    1|
|             1925|        TV Show|    1|
|             1942|          Movie|    2|
|             1943|          Movie|    3|
|             1944|          Movie|    2|
|             1945|          Movie|    3|
|             1946|          Movie|    2|
|             1946|        TV Show|    1|
|             1947|          Movie|    1|
|             1954|          Movie|    1|
|             1955|          Movie|    1|
|             1956|          Movie|    1|
|             1958|          Movie|    2|
|             1959|          M

In [29]:
# 2.2 Find the average duration of movies
# Kailangan i-clean muna yung 'duration' column para extract number

# Filter movies only, then extract number from "93 min" format
# gamit ng regexp_extract para kunin yung number part lang
from pyspark.sql.functions import regexp_extract

movies_only = df_spark.filter(col("type") == "Movie")

# Extract numeric duration from "93 min" format
movies_with_duration = movies_only.withColumn(
    "duration_minutes",
    regexp_extract(col("duration"), r'(\d+)', 1).cast("integer")
)

# Calculate average duration
avg_duration = movies_with_duration.agg(avg("duration_minutes").alias("average_duration"))

print("=== Average Movie Duration ===")
avg_duration.show()

# Show some examples para ma-verify na tama extraction
print("\nSample movies with extracted duration:")
movies_with_duration.select("title", "duration", "duration_minutes").show(10)

=== Average Movie Duration ===
+----------------+
|average_duration|
+----------------+
|98.3484691229891|
+----------------+


Sample movies with extracted duration:
+--------------------+--------+----------------+
|               title|duration|duration_minutes|
+--------------------+--------+----------------+
|Guatemala: Heart ...|  67 min|              67|
|     The Zoya Factor| 135 min|             135|
|           Atlantics| 106 min|             106|
|        Crazy people| 107 min|             107|
|      I Lost My Body|  81 min|              81|
|Kalushi: The Stor...| 107 min|             107|
|Lagos Real Fake Life| 118 min|             118|
|              Payday| 110 min|             110|
|  The Accidental Spy| 104 min|             104|
|          The Island|  93 min|              93|
+--------------------+--------+----------------+
only showing top 10 rows


In [30]:
# 2.3 Determine the count of titles per unique rating
# simple groupBy and count, tapos order by count descending para makita most common ratings
titles_per_rating = df_spark.groupBy("rating") \
    .count() \
    .orderBy(col("count").desc())

print("=== Count of Titles Per Rating ===")
titles_per_rating.show()

# Show total unique ratings
print(f"\nTotal unique ratings: {titles_per_rating.count()}")

=== Count of Titles Per Rating ===
+-----------------+-----+
|           rating|count|
+-----------------+-----+
|            TV-MA| 1932|
|            TV-14| 1593|
|            TV-PG|  677|
|                R|  439|
|            PG-13|  227|
|               NR|  217|
|               PG|  160|
|            TV-Y7|  156|
|             TV-G|  147|
|             TV-Y|  139|
|         TV-Y7-FV|   92|
|                G|   32|
|             NULL|   12|
|               UR|    7|
|            NC-17|    2|
| Shavidee Trotter|    1|
|    Adriane Lenox|    1|
|           40 min|    1|
|  Jowharah Jones"|    1|
|  Richard Pepple"|    1|
+-----------------+-----+
only showing top 20 rows

Total unique ratings: 22


# 3. Clean and Filter Data

In [31]:
# 3.1 Filter out rows with missing director or cast information
# gamit ng .filter() with conditions - parang WHERE clause sa SQL
# isNotNull() para sure na may value, at hindi ma-empty string
df_filtered = df_spark.filter(
    (col("director").isNotNull()) & 
    (col("director") != "") &
    (col("cast").isNotNull()) & 
    (col("cast") != "")
)

print("=== After Filtering Missing Director/Cast ===")
print(f"Original rows: {df_spark.count()}")
print(f"After filtering: {df_filtered.count()}")
print(f"Rows removed: {df_spark.count() - df_filtered.count()}")
print("\n")

# 3.2 Standardize the type column to be consistent
# gamit ng .withColumn() para ma-replace/modify column
# trim() removes spaces, tapos title case para consistent
df_cleaned = df_filtered.withColumn(
    "type",
    when(lower(trim(col("type"))) == "movie", "Movie")
    .when(lower(trim(col("type"))) == "tv show", "TV Show")
    .otherwise(col("type"))
)

print("=== Type Column Standardization ===")
df_cleaned.groupBy("type").count().show()

=== After Filtering Missing Director/Cast ===
Original rows: 5839
After filtering: 3588
Rows removed: 2251


=== Type Column Standardization ===
+--------------------+-----+
|                type|count|
+--------------------+-----+
| raking in the lo...|    1|
| Robb Wells and M...|    1|
|"" four short fil...|    1|
| and tensions run...|    1|
| political hypocr...|    1|
|"" his exposé on ...|    1|
| his family and h...|    1|
| Frankie Stein an...|    1|
| music and acting...|    1|
| middling salesma...|    1|
|    North Carolina."|    1|
|"" a secretly gay...|    1|
| Prius-driving co...|    1|
|                2017|    1|
|"" Ali Wong is ba...|    1|
|"" does not accep...|    1|
| relationships an...|    1|
|"" a young couple...|    1|
|"" the Mickey Mou...|    1|
| comes this docum...|    1|
+--------------------+-----+
only showing top 20 rows


In [32]:
# 3.3 Select only relevant columns for a transformed output
# gamit ng .select() para piliin specific columns lang
# parang SELECT sa SQL
relevant_columns = [
    "show_id",
    "type", 
    "title",
    "director",
    "cast",
    "country",
    "date_added",
    "release_year",
    "rating",
    "duration",
    "listed_in"
]

df_final = df_cleaned.select(relevant_columns)

print("=== Final Cleaned DataFrame ===")
print(f"Columns selected: {len(df_final.columns)}")
print(f"Total rows: {df_final.count()}")
print("\nColumns in final DataFrame:")
print(df_final.columns)
print("\n")

# Show sample of cleaned data
print("Sample of cleaned data:")
df_final.show(5, truncate=True)

=== Final Cleaned DataFrame ===
Columns selected: 11
Total rows: 3588

Columns in final DataFrame:
['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in']


Sample of cleaned data:
+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+
| show_id| type|               title|            director|                cast|             country|       date_added|release_year|rating|duration|           listed_in|
+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+
|81197050|Movie|Guatemala: Heart ...|Luis Ara, Ignacio...|   Christian Morales|                NULL|November 30, 2019|        2019|  TV-G|  67 min|Documentaries, In...|
|81213894|Movie|     The Zoya Factor|     Abhishek Sharma|Sonam Kapoor, Du

# 4. Save Cleaned Data

In [38]:
# Save CSV 
print("\n1. Saving CSV...")
df_final.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .csv('/root/netflix_final_csv')
print("✅ CSV folder created!")

# Save Parquet 
print("\n2. Saving Parquet...")
df_final.coalesce(1).write.mode("overwrite") \
    .parquet('/root/netflix_final_parquet')
print("✅ Parquet folder created!")

# List the actual files inside both folders
import os

# CSV file
csv_files = [f for f in os.listdir('/root/netflix_final_csv') if f.endswith('.csv')]
print(f"\nCSV file: /root/netflix_final_csv/{csv_files[0]}")

# Parquet file
parquet_files = [f for f in os.listdir('/root/netflix_final_parquet') if f.endswith('.parquet')]
print(f"Parquet file: /root/netflix_final_parquet/{parquet_files[0]}")


1. Saving CSV...
✅ CSV folder created!

2. Saving Parquet...
✅ Parquet folder created!

CSV file: /root/netflix_final_csv/part-00000-909362bc-e3c2-4f14-a7e0-9c31f57cef5b-c000.csv
Parquet file: /root/netflix_final_parquet/part-00000-f33f3b5a-53bd-4a4d-a448-db03e7227dee-c000.snappy.parquet


In [40]:
# Verify CSV file
print("=== Verifying CSV File ===")
df_csv_verify = spark.read.csv('/root/netflix_final_csv', header=True, inferSchema=True)
print(f"CSV rows loaded: {df_csv_verify.count()}")
df_csv_verify.show(3)
print("\n")

# Verify Parquet file
print("=== Verifying Parquet File ===")
df_parquet_verify = spark.read.parquet('/root/netflix_final_parquet')
print(f"Parquet rows loaded: {df_parquet_verify.count()}")
df_parquet_verify.show(3)
print("\n")

print("✅ Both files verified successfully!")

=== Verifying CSV File ===
CSV rows loaded: 3588
+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+
| show_id| type|               title|            director|                cast|             country|       date_added|release_year|rating|duration|           listed_in|
+--------+-----+--------------------+--------------------+--------------------+--------------------+-----------------+------------+------+--------+--------------------+
|81197050|Movie|Guatemala: Heart ...|Luis Ara, Ignacio...|   Christian Morales|                NULL|November 30, 2019|        2019|  TV-G|  67 min|Documentaries, In...|
|81213894|Movie|     The Zoya Factor|     Abhishek Sharma|Sonam Kapoor, Dul...|               India|November 30, 2019|        2019| TV-14| 135 min|Comedies, Dramas,...|
|81082007|Movie|           Atlantics|           Mati Diop|Mama Sane, Amadou...|France, Senegal, ...|Novemb

In [None]:
# Stop SparkSession para di sayang resources
spark.stop()
print("✅ SparkSession stopped successfully!")

✅ SparkSession stopped successfully!
