In [1]:
import boto3
import pyspark.sql.functions 
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
from pyspark.sql.functions import *

In [17]:
# Configurtion of the S3 Bucket
s3 = boto3.resource(
    service_name = 's3',
    region_name = 'eu-west-1',
    aws_access_key_id = '',
    aws_secret_access_key = ''
)

In [18]:
# Buckets available
for bucket in s3.buckets.all():
    print(bucket.name)

etl-customer-churn


In [6]:
# Create a Spark Session
spark = SparkSession.builder.appName('etl-workflow-01').getOrCreate()

# creating a spark dataframe
churn_data = spark.read.csv('Data//train.csv', header = True, inferSchema = True)

In [7]:
churn_data.show(5)

+----------+------------------+------------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+------------------+----------------------+------+-------------+---------------+----------------+----------+-----+
|AccountAge|    MonthlyCharges|      TotalCharges|SubscriptionType|   PaymentMethod|PaperlessBilling|ContentType|MultiDeviceAccess|DeviceRegistered|ViewingHoursPerWeek|AverageViewingDuration|ContentDownloadsPerMonth|GenrePreference|        UserRating|SupportTicketsPerMonth|Gender|WatchlistSize|ParentalControl|SubtitlesEnabled|CustomerID|Churn|
+----------+------------------+------------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+------------------+----------------------+------+-------------+-------------

Schema 

In [8]:
churn_data.printSchema()

root
 |-- AccountAge: integer (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- SubscriptionType: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- ContentType: string (nullable = true)
 |-- MultiDeviceAccess: string (nullable = true)
 |-- DeviceRegistered: string (nullable = true)
 |-- ViewingHoursPerWeek: double (nullable = true)
 |-- AverageViewingDuration: double (nullable = true)
 |-- ContentDownloadsPerMonth: integer (nullable = true)
 |-- GenrePreference: string (nullable = true)
 |-- UserRating: double (nullable = true)
 |-- SupportTicketsPerMonth: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- WatchlistSize: integer (nullable = true)
 |-- ParentalControl: string (nullable = true)
 |-- SubtitlesEnabled: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Churn: integer (nullable = true)



Numeric Descriptive Statistics

In [9]:
numeric_stats = churn_data.select(
    [col for col, d_t in churn_data.dtypes if d_t.startswith('string') == False]
    ).summary()

numeric_stats.show()

+-------+------------------+------------------+------------------+-------------------+----------------------+------------------------+------------------+----------------------+-----------------+-------------------+
|summary|        AccountAge|    MonthlyCharges|      TotalCharges|ViewingHoursPerWeek|AverageViewingDuration|ContentDownloadsPerMonth|        UserRating|SupportTicketsPerMonth|    WatchlistSize|              Churn|
+-------+------------------+------------------+------------------+-------------------+----------------------+------------------------+------------------+----------------------+-----------------+-------------------+
|  count|            243787|            243787|            243787|             243787|                243787|                  243787|            243787|                243787|           243787|             243787|
|   mean|60.083757542444836|12.490694506213474| 750.7410172504401|  20.50217902620334|     92.26406111425209|      24.503513312850973| 3.002

Categorical Statistics

In [10]:
categorical_cols = churn_data.select(
    [col for col, d_t in churn_data.dtypes if d_t.startswith('string') == True]
)

for col_name in categorical_cols:
    print(f' Distinct values and count in :: {col_name}')
    categorical_cols.groupBy(col_name).count().orderBy('count', ascending = False).show()

 Distinct values and count in :: Column<'SubscriptionType'>
+----------------+-----+
|SubscriptionType|count|
+----------------+-----+
|        Standard|81920|
|           Basic|81050|
|         Premium|80817|
+----------------+-----+

 Distinct values and count in :: Column<'PaymentMethod'>
+----------------+-----+
|   PaymentMethod|count|
+----------------+-----+
|Electronic check|61313|
|     Credit card|60924|
|   Bank transfer|60797|
|    Mailed check|60753|
+----------------+-----+

 Distinct values and count in :: Column<'PaperlessBilling'>
+----------------+------+
|PaperlessBilling| count|
+----------------+------+
|              No|121980|
|             Yes|121807|
+----------------+------+

 Distinct values and count in :: Column<'ContentType'>
+-----------+-----+
|ContentType|count|
+-----------+-----+
|       Both|81737|
|   TV Shows|81145|
|     Movies|80905|
+-----------+-----+

 Distinct values and count in :: Column<'MultiDeviceAccess'>
+-----------------+------+
|Mult

Checking for Missing values across the features

In [11]:
missing_values = churn_data.select(
    [
        sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in churn_data.columns
    ]
)
missing_values.show()

+----------+--------------+------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+----------+-----+
|AccountAge|MonthlyCharges|TotalCharges|SubscriptionType|PaymentMethod|PaperlessBilling|ContentType|MultiDeviceAccess|DeviceRegistered|ViewingHoursPerWeek|AverageViewingDuration|ContentDownloadsPerMonth|GenrePreference|UserRating|SupportTicketsPerMonth|Gender|WatchlistSize|ParentalControl|SubtitlesEnabled|CustomerID|Churn|
+----------+--------------+------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+----------+-----+
|         0|             

Checking for Duplicates

In [12]:
duplicate_count = churn_data.groupBy(churn_data.columns).count().filter(col('count') > 1)
duplicate_count.show()

+----------+--------------+------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+----------+-----+-----+
|AccountAge|MonthlyCharges|TotalCharges|SubscriptionType|PaymentMethod|PaperlessBilling|ContentType|MultiDeviceAccess|DeviceRegistered|ViewingHoursPerWeek|AverageViewingDuration|ContentDownloadsPerMonth|GenrePreference|UserRating|SupportTicketsPerMonth|Gender|WatchlistSize|ParentalControl|SubtitlesEnabled|CustomerID|Churn|count|
+----------+--------------+------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+----------+-----+-----+
+------

Transformation 01: ConsistencyScore

Measures viewing consistency, indicating whether a user watches for short periods frequently or longer periods less often.

In [37]:
churn_data = churn_data.withColumn(
        'Consistency_Score', col('AverageViewingDuration') / col('ViewingHoursPerWeek')    
)

churn_data.select('Consistency_Score', 'AverageViewingDuration' ,  'ViewingHoursPerWeek').show(5)

max_viewing_score = churn_data.select('Consistency_Score').agg(
        max('Consistency_Score').alias('max_viewing_consistency')
        )
max_viewing_score.show()

min_viewing_score = churn_data.select('Consistency_Score').agg(
        min('Consistency_Score').alias('min_viewing_consistency')
        ).alias('min_viewing_consistency')
min_viewing_score.show()

+------------------+----------------------+-------------------+
| Consistency_Score|AverageViewingDuration|ViewingHoursPerWeek|
+------------------+----------------------+-------------------+
|1.7283638320708867|     63.53137733399087|  36.75810391025656|
|0.7927625418724876|    25.725594639013025| 32.450567831131046|
|7.7569734813459075|     57.36406085422372|    7.3951601087942|
| 4.704423416408498|     131.5375073033797|  27.96038869388153|
| 2.258415355567302|    45.356653002287025| 20.083397365536275|
+------------------+----------------------+-------------------+
only showing top 5 rows

+-----------------------+
|max_viewing_consistency|
+-----------------------+
|     174.29703750023356|
+-----------------------+

+-----------------------+
|min_viewing_consistency|
+-----------------------+
|    0.12635851067643944|
+-----------------------+



Transformation 02: Support Rating Index

Insight : Determines if frequent support interactions correlate with lower user satisfaction.

High Support Rating Index: A high value for this feature would indicate that a user has frequent or intense interactions with support.

In [14]:
churn_data.select('SupportTicketsPerMonth','UserRating').show(5)
churn_data = churn_data.withColumn(
    'Support_Rating_Index', col('SupportTicketsPerMonth') * (5 - col('UserRating'))
)
churn_data.select('Support_Rating_Index').show(5)

+----------------------+------------------+
|SupportTicketsPerMonth|        UserRating|
+----------------------+------------------+
|                     4|2.1764975145384615|
|                     8| 3.478632294057515|
|                     6|  4.23882362326149|
|                     2| 4.276012901811724|
|                     4| 3.616170325945041|
+----------------------+------------------+
only showing top 5 rows

+--------------------+
|Support_Rating_Index|
+--------------------+
|  11.294009941846154|
|   12.17094164753988|
|  4.5670582604310574|
|  1.4479741963765527|
|   5.535318696219836|
+--------------------+
only showing top 5 rows



In [39]:
churn_data.columns

['AccountAge',
 'MonthlyCharges',
 'TotalCharges',
 'SubscriptionType',
 'PaymentMethod',
 'PaperlessBilling',
 'ContentType',
 'MultiDeviceAccess',
 'DeviceRegistered',
 'ViewingHoursPerWeek',
 'AverageViewingDuration',
 'ContentDownloadsPerMonth',
 'GenrePreference',
 'UserRating',
 'SupportTicketsPerMonth',
 'Gender',
 'WatchlistSize',
 'ParentalControl',
 'SubtitlesEnabled',
 'CustomerID',
 'Churn',
 'Viewing_Consistency_Score',
 'Support_Rating_Index',
 'Consistency_Score']

In [41]:
# Temporary View to Run SQL queries
churn_data.createOrReplaceTempView('churn_temp_view')

Pipeline

In [47]:
# Question: What are the top 3 genres with the highest churn rates, and how do they compare across different subscription types?
genre_analysis = """
    SELECT 
        GenrePreference,
        SubscriptionType,
        CONCAT(ROUND(AVG(Churn) * 100,2), '%') AS ChurnRate
    FROM 
        churn_temp_view 
    GROUP BY 
        GenrePreference,
        SubscriptionType
    ORDER BY 
        AVG(Churn) DESC
    LIMIT 3;
    """

# Question: How does churn vary across different income groups, and what are the average monthly charges for each group?
income_analysis = """ 
WITH income_flag AS
(
    SELECT
        CASE 
        WHEN MonthlyCharges < (SELECT AVG(MonthlyCharges) FROM churn_temp_view) THEN 'Low' 
        WHEN MonthlyCharges > (SELECT AVG(MonthlyCharges) FROM churn_temp_view)
             AND MonthlyCharges < 15 THEN 'Medium'  
            ELSE 'High' END AS IncomeGroup,
        Churn,
        MonthlyCharges
    FROM
        churn_temp_view
)
SELECT
     IncomeGroup,
     COUNT(*) AS total_users,
     AVG(MonthlyCharges) AS average_monthly_charges,
     CONCAT(ROUND(AVG(Churn),2),'%') AS ChurnRate   
FROM 
    income_flag
GROUP BY
    IncomeGroup
ORDER BY 
    AVG(MonthlyCharges) DESC
 """

# Question: How does AccountAge correlate with churn, and what is the average TotalCharges for users with different subscription lengths?
customer_longevity_analysis = """
WITH avg_account_age AS 
(
    SELECT AVG(AccountAge) AS avg_age
    FROM churn_temp_view
),
age_flag AS 
(
    SELECT 
        Churn,
        TotalCharges,
        AccountAge,
        CASE 
            WHEN AccountAge <= 20 THEN 'bronzeAccountType'
            WHEN AccountAge > 20 AND AccountAge <= 40 THEN 'silverAccountType'
            WHEN AccountAge > 40 AND AccountAge <= (SELECT avg_age FROM avg_account_age) THEN 'goldAccountType' 
            WHEN AccountAge > (SELECT avg_age FROM avg_account_age) THEN 'premiumAccountType'
            ELSE 'Unknown'
        END AS age_group
    FROM
        churn_temp_view
)
SELECT 
    age_group,
    ROUND(SUM(TotalCharges),2) AS TotalChargesPerAgeGroup,
    CONCAT(ROUND(AVG(Churn),2), '%') AS ChurnRate
FROM 
    age_flag
GROUP BY
        1
ORDER BY 
    AVG(Churn) DESC
"""

# Question: What are the average ViewingHoursPerWeek and AverageViewingDuration for users who churned versus those who didn’t?
viewing_behavior_analysis = """
SELECT
    Churn,
    ROUND(AVG(ViewingHoursPerWeek),2) AS avg_viewing_hps,
    ROUND(AVG(AverageViewingDuration),2) AS avg_viewing_duration
FROM 
    churn_temp_view
GROUP BY
    Churn
"""

# Question: How does churn vary by payment method, and what is the average TotalCharges for each payment method?
impact_payment_method = """
SELECT 
    PaymentMethod, 
    COUNT(*) AS TotalUsers, 
    SUM(Churn) AS ChurnedUsers, 
    CONCAT(ROUND((SUM(Churn) * 100.0 / COUNT(*)),2), '%') AS ChurnRate, 
    AVG(TotalCharges) AS AvgTotalCharges
FROM 
    churn_temp_view
GROUP BY 
    PaymentMethod;
"""

# Question: How does the number of registered devices impact churn, and what is the average ViewingHoursPerWeek for users with and without MultiDeviceAccess?

multi_device_analysis = """ 
SELECT 
    DeviceRegistered, 
    MultiDeviceAccess, 
    COUNT(*) AS TotalUsers, 
    SUM(Churn) AS ChurnedUsers, 
    CONCAT(ROUND((SUM(Churn) * 100.0 / COUNT(*)),2), '%') AS ChurnRate, 
    AVG(ViewingHoursPerWeek) AS AvgViewingHours
FROM 
    churn_temp_view
GROUP BY 
    DeviceRegistered, 
    MultiDeviceAccess
ORDER BY
    AVG(ViewingHoursPerWeek) DESC
"""

# What is the average Support_Rating_Index for users who gave a high UserRating versus those who didn’t, and how does this correlate with churn?

rating_analysis = """ 
SELECT 
    CASE 
        WHEN UserRating >= 4 THEN 'High Rating' 
        ELSE 'Low Rating' 
    END AS RatingCategory,
    AVG(Support_Rating_Index) AS AvgSupportRatingIndex, 
    COUNT(*) AS TotalUsers, 
    SUM(Churn) AS ChurnedUsers, 
    CONCAT(ROUND((SUM(Churn) * 100.0 / COUNT(*)),2), '%') AS ChurnRate
FROM 
    churn_temp_view
GROUP BY 
    RatingCategory;
"""

# Finding top 10 highest-risk customers based on their ConsistencyScore and DeviceUsageIntensity, and how can they be prioritized for retention strategies?
# incomplete
high_risk_analysis = """ 
WITH high_risk_flag AS (
    SELECT 
        CustomerID,
        AverageViewingDuration,
        ViewingHoursPerWeek,
        DeviceRegistered,
        Consistency_Score,
        (ViewingHoursPerWeek / DeviceRegistered) AS DeviceUsageIntensity,
        Churn
    FROM 
        churn_temp_view
),
rank_flag AS (
    SELECT 
        CustomerID,
        Consistency_Score,
        DeviceUsageIntensity,
        Churn,
        RANK() OVER (ORDER BY Consistency_Score DESC) AS ConsistencyRank,
        RANK() OVER (ORDER BY DeviceUsageIntensity DESC) AS DeviceUsageRank
    FROM 
        high_risk_flag
)
SELECT 
    CustomerID,
    Consistency_Score,
    DeviceUsageIntensity,
    ConsistencyRank,
    DeviceUsageRank,
    Churn,
    CASE 
        WHEN ConsistencyRank < (SELECT AVG(ConsistencyRank) FROM high_risk_flag) AND DeviceUsageRank < (SELECT AVG(DeviceUsageRank) FROM high_risk_flag) THEN 'High-Risk'
        ELSE 'Low-Risk'
    END AS RiskCategory
FROM 
    rank_flag
WHERE 
    RiskCategory = 'High-Risk'
ORDER BY 
    ConsistencyRank ASC, 
    DeviceUsageRank ASC
LIMIT 10;

"""

In [43]:
genre_analysis_df = spark.sql(genre_analysis)
genre_analysis_df.show()
income_analysis_df = spark.sql(income_analysis)
income_analysis_df.show()
impact_payment_method_df = spark.sql(impact_payment_method)
impact_payment_method_df.show()
viewing_behavior_analysis_df = spark.sql(viewing_behavior_analysis)
viewing_behavior_analysis_df.show()
multi_device_analysis_df = spark.sql(multi_device_analysis)
multi_device_analysis_df.show()
rating_analysis_df = spark.sql(rating_analysis)
rating_analysis_df.show()

+---------------+----------------+---------+
|GenrePreference|SubscriptionType|ChurnRate|
+---------------+----------------+---------+
|         Comedy|           Basic|   21.07%|
|         Sci-Fi|           Basic|   20.87%|
|         Comedy|        Standard|   19.54%|
+---------------+----------------+---------+

+-----------+-----------+-----------------------+---------+
|IncomeGroup|total_users|average_monthly_charges|ChurnRate|
+-----------+-----------+-----------------------+---------+
|       High|      81192|     17.485801653641243|    0.23%|
|     Medium|      40773|      13.75328164407855|    0.19%|
|        Low|     121822|      8.738973102147462|    0.15%|
+-----------+-----------+-----------------------+---------+

+----------------+----------+------------+---------+-----------------+
|   PaymentMethod|TotalUsers|ChurnedUsers|ChurnRate|  AvgTotalCharges|
+----------------+----------+------------+---------+-----------------+
|   Bank transfer|     60797|       10899|   17.93

In [48]:
high_risk_analysis_df= spark.sql(high_risk_analysis)
high_risk_analysis_df.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `RiskCategory` cannot be resolved. Did you mean one of the following? [`Churn`, `CustomerID`, `DeviceUsageRank`, `ConsistencyRank`, `Consistency_Score`].; line 39 pos 4;
'WithCTE
:- CTERelationDef 16, false
:  +- SubqueryAlias high_risk_flag
:     +- Project [CustomerID#202, AverageViewingDuration#193, ViewingHoursPerWeek#192, DeviceRegistered#191, Consistency_Score#4330, (ViewingHoursPerWeek#192 / cast(DeviceRegistered#191 as double)) AS DeviceUsageIntensity#4818, Churn#203]
:        +- SubqueryAlias churn_temp_view
:           +- View (`churn_temp_view`, [AccountAge#183,MonthlyCharges#184,TotalCharges#185,SubscriptionType#186,PaymentMethod#187,PaperlessBilling#188,ContentType#189,MultiDeviceAccess#190,DeviceRegistered#191,ViewingHoursPerWeek#192,AverageViewingDuration#193,ContentDownloadsPerMonth#194,GenrePreference#195,UserRating#196,SupportTicketsPerMonth#197,Gender#198,WatchlistSize#199,ParentalControl#200,SubtitlesEnabled#201,CustomerID#202,Churn#203,Support_Rating_Index#2255,Consistency_Score#4330])
:              +- Project [AccountAge#183, MonthlyCharges#184, TotalCharges#185, SubscriptionType#186, PaymentMethod#187, PaperlessBilling#188, ContentType#189, MultiDeviceAccess#190, DeviceRegistered#191, ViewingHoursPerWeek#192, AverageViewingDuration#193, ContentDownloadsPerMonth#194, GenrePreference#195, UserRating#196, SupportTicketsPerMonth#197, Gender#198, WatchlistSize#199, ParentalControl#200, SubtitlesEnabled#201, CustomerID#202, Churn#203, Support_Rating_Index#2255, Consistency_Score#4330]
:                 +- Project [AccountAge#183, MonthlyCharges#184, TotalCharges#185, SubscriptionType#186, PaymentMethod#187, PaperlessBilling#188, ContentType#189, MultiDeviceAccess#190, DeviceRegistered#191, ViewingHoursPerWeek#192, AverageViewingDuration#193, ContentDownloadsPerMonth#194, GenrePreference#195, UserRating#196, SupportTicketsPerMonth#197, Gender#198, WatchlistSize#199, ParentalControl#200, SubtitlesEnabled#201, CustomerID#202, Churn#203, Viewing_Consistency_Score#2169, Support_Rating_Index#2255, (AverageViewingDuration#193 / ViewingHoursPerWeek#192) AS Consistency_Score#4330]
:                    +- Project [AccountAge#183, MonthlyCharges#184, TotalCharges#185, SubscriptionType#186, PaymentMethod#187, PaperlessBilling#188, ContentType#189, MultiDeviceAccess#190, DeviceRegistered#191, ViewingHoursPerWeek#192, AverageViewingDuration#193, ContentDownloadsPerMonth#194, GenrePreference#195, UserRating#196, SupportTicketsPerMonth#197, Gender#198, WatchlistSize#199, ParentalControl#200, SubtitlesEnabled#201, CustomerID#202, Churn#203, Viewing_Consistency_Score#2169, (cast(SupportTicketsPerMonth#197 as double) * (cast(5 as double) - UserRating#196)) AS Support_Rating_Index#2255]
:                       +- Project [AccountAge#183, MonthlyCharges#184, TotalCharges#185, SubscriptionType#186, PaymentMethod#187, PaperlessBilling#188, ContentType#189, MultiDeviceAccess#190, DeviceRegistered#191, ViewingHoursPerWeek#192, AverageViewingDuration#193, ContentDownloadsPerMonth#194, GenrePreference#195, UserRating#196, SupportTicketsPerMonth#197, Gender#198, WatchlistSize#199, ParentalControl#200, SubtitlesEnabled#201, CustomerID#202, Churn#203, (AverageViewingDuration#193 / ViewingHoursPerWeek#192) AS Viewing_Consistency_Score#2169]
:                          +- Relation [AccountAge#183,MonthlyCharges#184,TotalCharges#185,SubscriptionType#186,PaymentMethod#187,PaperlessBilling#188,ContentType#189,MultiDeviceAccess#190,DeviceRegistered#191,ViewingHoursPerWeek#192,AverageViewingDuration#193,ContentDownloadsPerMonth#194,GenrePreference#195,UserRating#196,SupportTicketsPerMonth#197,Gender#198,WatchlistSize#199,ParentalControl#200,SubtitlesEnabled#201,CustomerID#202,Churn#203] csv
:- CTERelationDef 17, false
:  +- SubqueryAlias rank_flag
:     +- Project [CustomerID#202, Consistency_Score#4330, DeviceUsageIntensity#4818, Churn#203, ConsistencyRank#4819, DeviceUsageRank#4820]
:        +- Project [CustomerID#202, Consistency_Score#4330, DeviceUsageIntensity#4818, Churn#203, ConsistencyRank#4819, DeviceUsageRank#4820, ConsistencyRank#4819, DeviceUsageRank#4820]
:           +- Window [rank(DeviceUsageIntensity#4818) windowspecdefinition(DeviceUsageIntensity#4818 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS DeviceUsageRank#4820], [DeviceUsageIntensity#4818 DESC NULLS LAST]
:              +- Window [rank(Consistency_Score#4330) windowspecdefinition(Consistency_Score#4330 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ConsistencyRank#4819], [Consistency_Score#4330 DESC NULLS LAST]
:                 +- Project [CustomerID#202, Consistency_Score#4330, DeviceUsageIntensity#4818, Churn#203]
:                    +- SubqueryAlias high_risk_flag
:                       +- CTERelationRef 16, true, [CustomerID#202, AverageViewingDuration#193, ViewingHoursPerWeek#192, DeviceRegistered#191, Consistency_Score#4330, DeviceUsageIntensity#4818, Churn#203], false
+- 'GlobalLimit 10
   +- 'LocalLimit 10
      +- 'Sort ['ConsistencyRank ASC NULLS FIRST, 'DeviceUsageRank ASC NULLS FIRST], true
         +- 'Project ['CustomerID, 'Consistency_Score, 'DeviceUsageIntensity, 'ConsistencyRank, 'DeviceUsageRank, 'Churn, CASE WHEN (('ConsistencyRank < scalar-subquery#4815 []) AND ('DeviceUsageRank < scalar-subquery#4816 [])) THEN High-Risk ELSE Low-Risk END AS RiskCategory#4817]
            :  :- 'Project [unresolvedalias('AVG('ConsistencyRank), None)]
            :  :  +- SubqueryAlias high_risk_flag
            :  :     +- CTERelationRef 16, true, [CustomerID#4825, AverageViewingDuration#4826, ViewingHoursPerWeek#4827, DeviceRegistered#4828, Consistency_Score#4829, DeviceUsageIntensity#4830, Churn#4831], false
            :  +- 'Project [unresolvedalias('AVG('DeviceUsageRank), None)]
            :     +- SubqueryAlias high_risk_flag
            :        +- CTERelationRef 16, true, [CustomerID#4832, AverageViewingDuration#4833, ViewingHoursPerWeek#4834, DeviceRegistered#4835, Consistency_Score#4836, DeviceUsageIntensity#4837, Churn#4838], false
            +- 'Filter ('RiskCategory = High-Risk)
               +- SubqueryAlias rank_flag
                  +- CTERelationRef 17, true, [CustomerID#202, Consistency_Score#4330, DeviceUsageIntensity#4818, Churn#203, ConsistencyRank#4819, DeviceUsageRank#4820], false


In [55]:
churn_data.select('DeviceRegistered').show(5)

+-----+---------------+--------------------+
|Churn|avg_viewing_hps|avg_viewing_duration|
+-----+---------------+--------------------+
|    1|          17.43|               76.49|
|    0|          21.18|               95.75|
+-----+---------------+--------------------+



In [31]:
new_query = """
WITH cte AS 
(
    SELECT 
        CustomerID,
        AccountAge,
        ROUND(AVG(Churn), 2) AS ChurnRate
    FROM 
        churn_temp_view
    GROUP BY
        CustomerID, AccountAge
), rank_flag AS
(
    SELECT 
        CustomerID, 
        AccountAge,
        ChurnRate,
        ROW_NUMBER() OVER (PARTITION BY CustomerID ORDER BY ChurnRate DESC) AS rank
    FROM
        cte     
)
SELECT 
    CustomerID, 
    AccountAge, 
    ChurnRate
FROM 
    rank_flag
WHERE rank = 1
ORDER BY ChurnRate DESC
LIMIT 3
""" 

new_df = spark.sql(new_query)
new_df.show()

+----------+----------+---------+
|CustomerID|AccountAge|ChurnRate|
+----------+----------+---------+
|008B3OJNVT|        25|      1.0|
|002JAHXBD0|        94|      1.0|
|00E34HN125|        16|      1.0|
+----------+----------+---------+



In [33]:
new = """ 
WITH ranked_churn AS 
(
    SELECT 
        CustomerID, 
        AccountAge, 
        ROUND(AVG(Churn), 2) AS ChurnRate,
        ROW_NUMBER() OVER (PARTITION BY CustomerID ORDER BY ROUND(AVG(Churn), 2) DESC) AS rank
    FROM 
        churn_temp_view
    GROUP BY 
        CustomerID, AccountAge
)
SELECT 
    CustomerID, 
    AccountAge, 
    ChurnRate
FROM 
    ranked_churn
WHERE rank = 1
ORDER BY 
    ChurnRate DESC
LIMIT 3;

"""
new_df = spark.sql(new)
new_df.show()

+----------+----------+---------+
|CustomerID|AccountAge|ChurnRate|
+----------+----------+---------+
|008B3OJNVT|        25|      1.0|
|002JAHXBD0|        94|      1.0|
|00E34HN125|        16|      1.0|
+----------+----------+---------+



In [38]:
churn_rate_df = churn_data.groupBy("Gender").agg(
    count("*").alias("TotalCustomers"),
    sum("Churn").alias("ChurnedCustomers")
).withColumn(
    "ChurnRate", (col("ChurnedCustomers") / col("TotalCustomers")) * 100
)

In [40]:
churn_rate_df.show()

+------+--------------+----------------+------------------+
|Gender|TotalCustomers|ChurnedCustomers|         ChurnRate|
+------+--------------+----------------+------------------+
|Female|        121930|           21747| 17.83564340195194|
|  Male|        121857|           22435|18.410924280098804|
+------+--------------+----------------+------------------+



In [22]:
# Average Churn Rate by Gender (include count by gender and churned customers with rate)