In [12]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

# Code Overview: Netflix Content Analysis

1. **Spark Initialization**  
   Creates a `SparkSession` for distributed data processing.

2. **Data Ingestion**  
   Reads the `netflix_titles.csv` file with inferred data types.

3. **Data Cleaning**  
   - Filters content to include only "Movies" and "TV Shows".  
   - Replaces null values in key columns with defaults.  
   - Removes rows with invalid `type` or `title` values.  

4. **Transformation**  
   - Extracts numeric values from `duration` and creates a new column `duration_minutes`.  
   - Casts `release_year` to integers.  
   - Removes duplicate records based on `show_id`.

5. **Aggregation and Analysis**  
   - Counts total rows and columns.  
   - Calculates null counts for all columns.  
   - Provides statistical summaries, content type distribution, top 10 countries, and temporal trends.

6. **Output**  
   Displays results clearly using print statements and tables for insights.


In [38]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf

# Spark initialization
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Netflix Content Analysis") \
    .getOrCreate()

# Dataset ingestion
netflix_data = spark.read.csv('netflix_titles.csv', header=True, inferSchema=True)

# Content type filtering
accepted_content_types = ['Movie', 'TV Show']
netflix_data = netflix_data.filter(netflix_data['type'].isin(accepted_content_types))

# Fill nulls
netflix_data = netflix_data.fillna({
    'director': 'Uncredited',
    'cast': 'Ensemble',
    'country': 'International'
})

# Strict data validation
netflix_data = netflix_data.filter(
    netflix_data['type'].isNotNull() & netflix_data['title'].isNotNull()
)

# Type casting
netflix_data = netflix_data.withColumn(
    'release_year', netflix_data['release_year'].cast('int')
)

# Duration extraction
netflix_data = netflix_data.withColumn(
    'duration_minutes', psf.regexp_extract('duration', r'(\d+)', 1).cast('int')
)

# Deduplication
netflix_data = netflix_data.dropDuplicates(['show_id'])

# Results aggregation
results = {
    "Total Entries": netflix_data.count(),
    "Total Columns": len(netflix_data.columns),
    "Null Counts": {col: netflix_data.filter(netflix_data[col].isNull()).count() for col in netflix_data.columns},
    "Statistical Summary": netflix_data.describe(),
    "Content Type Distribution": netflix_data.groupBy('type').count(),
    "Top 10 Countries": netflix_data.groupBy('country').count().orderBy('count', ascending=False).limit(10),
    "Temporal Distribution": netflix_data.groupBy('release_year', 'type').count().orderBy('release_year')
}

# Display results
print(f"\n--- REPORT ---\n")
print(f"Total Entries: {results['Total Entries']:,}")
print(f"Total Columns: {results['Total Columns']}\n")

print("\nNull Value Investigation:")
for col, count in results["Null Counts"].items():
    print(f"Nulls in '{col}': {count}")

print("\nStatistical Summary:")
results["Statistical Summary"].show(truncate=False)

print("\nContent Type Distribution:")
results["Content Type Distribution"].show(truncate=False)

print("\nTop 10 Countries with Most Content:")
results["Top 10 Countries"].show(truncate=False)

print("\nTemporal Distribution of Content by Release Year and Type:")
results["Temporal Distribution"].show(truncate=False)



--- REPORT ---

Total Entries: 8,807
Total Columns: 13


Null Value Investigation:
Nulls in 'show_id': 0
Nulls in 'type': 0
Nulls in 'title': 0
Nulls in 'director': 0
Nulls in 'cast': 0
Nulls in 'country': 0
Nulls in 'date_added': 12
Nulls in 'release_year': 20
Nulls in 'rating': 5
Nulls in 'duration': 4
Nulls in 'listed_in': 1
Nulls in 'description': 1
Nulls in 'duration_minutes': 20

Statistical Summary:
+-------+-------+-------+---------------------------------------------------+------------------------+--------------------------------------------------------------+----------------+---------------+-----------------+-----------------+-------------+-----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|summary|show_id|type   |title                                              |director                |cast                                    

1. **Null Value Handling**  
   Replaces missing values in the following columns with default values:
   - `director`: "Uncredited"
   - `cast`: "Ensemble Cast"
   - `country`: "Global"
   - `date_added`: "Unspecified"
   - `rating`: "Unrated"
   - `release_year`: "Undetermined"
   - `title`: "Untitled Content"

2. **Categorical Analysis**  
   - Calculates the number of distinct values in the columns: `type`, `country`, `rating`, `listed_in`.
   - Stores the result as a dictionary (`categorical_results`).

3. **Top 5 Countries by Content**  
   - Groups the dataset by `country` and counts the number of entries.
   - Orders the countries by content count and selects the top 5 countries.

4. **Results Aggregation**  
   - Updates the `results` dictionary with:
     - `Categorical Analysis`: Distinct value counts for each column.
     - `Top Countries Content`: Top 5 countries with the most content.

5. **Results Display**  
   - Prints the number of distinct categories for each column.
   - Displays the top 5 countries and their content counts using `show()`.


In [39]:
from pyspark.sql.functions import col, desc

# Comprehensive null value handling with expanded replacements
netflix_data = netflix_data.fillna({
    'director': 'Uncredited',
    'cast': 'Ensemble Cast',
    'country': 'Global',
    'date_added': 'Unspecified',
    'rating': 'Unrated',
    'release_year': 'Undetermined',
    'title': 'Untitled Content'
})

# Categorical analysis results
categorical_results = {
    column_name: netflix_data.select(column_name).distinct().count()
    for column_name in ['type', 'country', 'rating', 'listed_in']
}

# Country-based content distribution
top_countries = netflix_data.groupBy('country') \
    .count() \
    .orderBy(desc('count')) \
    .limit(5) \
    .select(col('country').alias('Country'), col('count').alias('Content Count'))

# Results aggregation
results.update({
    "Categorical Analysis": categorical_results,
    "Top Countries Content": top_countries
})

# Display results
print("\n--- ENHANCED ANALYSIS REPORT ---\n")

print("\nCategorical Analysis:")
for column, count in results["Categorical Analysis"].items():
    print(f"  - '{column}': {count:,} distinct categories")

print("\nTop 5 Countries with Most Content:")
results["Top Countries Content"].show(truncate=False)



--- ENHANCED ANALYSIS REPORT ---


Categorical Analysis:
  - 'type': 2 distinct categories
  - 'country': 768 distinct categories
  - 'rating': 36 distinct categories
  - 'listed_in': 534 distinct categories

Top 5 Countries with Most Content:
+--------------+-------------+
|Country       |Content Count|
+--------------+-------------+
|United States |2805         |
|India         |972          |
|International |831          |
|United Kingdom|419          |
|Japan         |245          |
+--------------+-------------+



### Analysis of Content Addition Trends

This section examines the trends in content additions to Netflix based on the year of addition:

1. **Legacy Date Parsing**:
   - Sets Spark's time parser to handle legacy date formats.
   - Converts the `date_added` column into the correct date format (`MMMM d, yyyy`).

2. **Filtering Invalid Dates**:
   - Removes entries where the `date_added` value is invalid or null to ensure the analysis uses only valid date data.

3. **Extracting Year of Addition**:
   - Derives the `year_added` from the `date_added` column for temporal analysis.

4. **Year-wise Content Distribution**:
   - Groups the data by `year_added` and counts how many pieces of content were added each year.
   - Presents the yearly content distribution to observe growth and patterns over time.

This analysis reveals how Netflix's content library has expanded over the years, providing insights into its growth trajectory.


In [40]:
# Configure Spark's time parsing policy for legacy compatibility
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

from pyspark.sql.functions import to_date, year, col

# Date parsing and transformation
netflix_data = netflix_data.withColumn(
    "date_added",
    to_date("date_added", "MMMM d, yyyy")
)

# Robust date filtering to remove invalid entries
netflix_data = netflix_data.filter(col("date_added").isNotNull())

# Extract year of addition for temporal analysis
netflix_data = netflix_data.withColumn("year_added", year("date_added"))

# Temporal distribution of content additions
print("\nTemporal Distribution of Content Additions by Year:")
temporal_distribution = netflix_data.groupBy("year_added") \
    .count() \
    .orderBy("year_added") \
    .select(
        col("year_added").alias("Year Added"),
        col("count").alias("Content Count")
    )

temporal_distribution.show(truncate=False)



Temporal Distribution of Content Additions by Year:
+----------+-------------+
|Year Added|Content Count|
+----------+-------------+
|2008      |2            |
|2009      |2            |
|2010      |1            |
|2011      |13           |
|2012      |3            |
|2013      |10           |
|2014      |23           |
|2015      |72           |
|2016      |418          |
|2017      |1162         |
|2018      |1623         |
|2019      |1997         |
|2020      |1872         |
|2021      |1491         |
+----------+-------------+



### Numeric Duration Extraction and Analysis of Netflix Content

This section processes the duration data of Netflix content to extract insights:

1. **Extracting Duration**:
   - Uses a regular expression to extract numeric values from the `duration` column.
   - Converts the extracted values into integers for numerical analysis.

2. **Focusing on Movie Duration**:
   - Filters the dataset to focus on `Movie` content type.
   - Calculates the average duration (in minutes) for movies.

3. **Results Presentation**:
   - Displays the average duration of Netflix movies in a clean, tabular format.

This analysis helps understand the typical length of Netflix movies, providing insights into content structure and viewer experience.


In [41]:
from pyspark.sql.functions import regexp_extract, col

# Extract numeric values from duration strings
netflix_data = netflix_data.withColumn(
    "duration_numeric", regexp_extract('duration', r'(\d+)', 1).cast('int')
)

# Compute average duration for movies
average_duration = netflix_data.filter(col("type") == 'Movie') \
    .groupBy('type') \
    .avg('duration_numeric') \
    .select(
        col("type").alias("Content Type"),
        col("avg(duration_numeric)").alias("Average Duration (Minutes)")
    )

# Update results
results["Average Movie Duration"] = average_duration

# Display results
print("\n--- MOVIE DURATION ANALYSIS ---\n")
print("Average Duration for Movies (in minutes):")
results["Average Movie Duration"].show(truncate=False)



--- MOVIE DURATION ANALYSIS ---

Average Duration for Movies (in minutes):
+------------+--------------------------+
|Content Type|Average Duration (Minutes)|
+------------+--------------------------+
|Movie       |99.57911962035674         |
+------------+--------------------------+

