In [None]:
from pyspark.sql.functions import col, count, isnull, when

# Load the raw Telco churn CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
# NOTE(review): the %sql cells in this notebook query a table named
# `customer_churn`, which none of the visible cells create — confirm it is
# registered elsewhere (e.g. in the metastore) before relying on Run All.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/FileStore/tables/Telco_Customer_Churn.csv")
)

In [0]:
# Print summary statistics (count, mean, stddev, min, max) for the loaded data.
(
    df
    .describe()
    .show()
)

+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|summary|customerID|gender|     SeniorCitizen|Partner|Dependents|            tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|    MonthlyCharges|      TotalCharges|Churn|
+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|  count|      7043|  7043|              7043|   7043|      7043|     

In [0]:
# Per-column NULL count: `when` yields a non-null literal (the column name)
# only for NULL rows, and `count` skips everything else.
# Only true NULLs are matched; blank or whitespace-only strings are not counted.
df.select(
    *[
        count(when(isnull(name), name)).alias(name)
        for name in df.columns
    ]
).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|           0|               0|          0|          0|              0|       0|               0| 

In [0]:
# Show up to five distinct values of every column, one small table per column,
# to get a feel for each column's categories.
for col_name in df.columns:
    distinct_values = df.select(col_name).distinct()
    distinct_values.show(5)

+----------+
|customerID|
+----------+
|3668-QPYBK|
|5575-GNVDE|
|7795-CFOCW|
|7590-VHVEG|
|9305-CDSKC|
+----------+
only showing top 5 rows

+------+
|gender|
+------+
|Female|
|  Male|
+------+

+-------------+
|SeniorCitizen|
+-------------+
|            1|
|            0|
+-------------+

+-------+
|Partner|
+-------+
|     No|
|    Yes|
+-------+

+----------+
|Dependents|
+----------+
|        No|
|       Yes|
+----------+

+------+
|tenure|
+------+
|    34|
|    22|
|     1|
|     8|
|    45|
+------+
only showing top 5 rows

+------------+
|PhoneService|
+------------+
|          No|
|         Yes|
+------------+

+----------------+
|   MultipleLines|
+----------------+
|No phone service|
|              No|
|             Yes|
+----------------+

+---------------+
|InternetService|
+---------------+
|    Fiber optic|
|             No|
|            DSL|
+---------------+

+-------------------+
|     OnlineSecurity|
+-------------------+
|                 No|
|                Yes

In [0]:
%sql
-- Preview the first five rows of the source table.
SELECT *
FROM customer_churn
LIMIT 5;

customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [0]:
%sql
-- Bucket each customer's tenure (in months) into a labelled year range.
-- Rows matching none of the ranges (tenure above 60, or NULL) get the last label.
CREATE OR REPLACE TABLE customers_churn_with_tenure_year AS
SELECT
  customerID,
  CASE
    WHEN tenure <= 12                 THEN '0-1 year'
    WHEN tenure > 12 AND tenure <= 24 THEN '1-2 years'
    WHEN tenure > 24 AND tenure <= 48 THEN '2-4 years'
    WHEN tenure > 48 AND tenure <= 60 THEN '4-5 years'
    ELSE '5-6 years or more'
  END AS TenureYearTime
FROM customer_churn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Estimate lifetime revenue per customer as months of tenure times the
-- current monthly charge.
CREATE OR REPLACE TABLE customers_churn_with_total_revenue AS
SELECT customerID,
       tenure * MonthlyCharges AS EstimatedTotalRevenue
FROM customer_churn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Flag customers with at most six months of tenure as new (1), everyone else 0.
-- A NULL tenure also yields 0, because the condition is not true.
CREATE OR REPLACE TABLE customers_churn_with_new_costumer AS
SELECT customerID,
       IF(tenure <= 6, 1, 0) AS IsNewCustomer
FROM customer_churn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Count how many of five services each customer has marked 'Yes'.
-- Any other value (including 'No phone service' / NULL) contributes 0.
-- NOTE(review): DeviceProtection and TechSupport are not part of this count —
-- confirm that is intentional.
CREATE OR REPLACE TABLE customers_churn_with_services AS
SELECT
  customerID,
  (
      IF(PhoneService    = 'Yes', 1, 0)
    + IF(OnlineSecurity  = 'Yes', 1, 0)
    + IF(OnlineBackup    = 'Yes', 1, 0)
    + IF(StreamingTV     = 'Yes', 1, 0)
    + IF(StreamingMovies = 'Yes', 1, 0)
  ) AS MultipleServices
FROM customer_churn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Translate the Contract label into a contract length in months.
-- Labels other than the three listed map to NULL.
CREATE OR REPLACE TABLE customers_churn_with_contract_length AS
SELECT
  customerID,
  CASE Contract
    WHEN 'Month-to-month' THEN 1
    WHEN 'One year'       THEN 12
    WHEN 'Two year'       THEN 24
    ELSE NULL
  END AS ContractLengthMonths
FROM customer_churn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Flag customers who have tech support, online security, or both (1), else 0.
CREATE OR REPLACE TABLE customers_churn_with_protection AS
SELECT
  customerID,
  CASE
    WHEN 'Yes' IN (TechSupport, OnlineSecurity) THEN 1
    ELSE 0
  END AS TechSupportOrSecurity
FROM customer_churn;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Assemble the final Delta table: every original customer_churn column plus
-- the six engineered features, each pulled from its own per-feature table by
-- customerID. LEFT JOINs keep every base row even if a feature row is missing.
CREATE OR REPLACE TABLE customer_churn_final
USING DELTA
AS
SELECT
    base.customerID,
    base.gender,
    base.SeniorCitizen,
    base.Partner,
    base.Dependents,
    base.tenure,
    new_cust.IsNewCustomer,
    tenure_bucket.TenureYearTime,
    contract_len.ContractLengthMonths,
    base.PhoneService,
    base.MultipleLines,
    base.InternetService,
    base.OnlineSecurity,
    base.OnlineBackup,
    protection.TechSupportOrSecurity,
    base.DeviceProtection,
    base.TechSupport,
    base.StreamingTV,
    base.StreamingMovies,
    services.MultipleServices,
    base.Contract,
    base.PaperlessBilling,
    base.PaymentMethod,
    base.MonthlyCharges,
    base.TotalCharges,
    revenue.EstimatedTotalRevenue,
    base.Churn
FROM customer_churn AS base
LEFT JOIN customers_churn_with_new_costumer AS new_cust
       ON base.customerID = new_cust.customerID
LEFT JOIN customers_churn_with_tenure_year AS tenure_bucket
       ON base.customerID = tenure_bucket.customerID
LEFT JOIN customers_churn_with_contract_length AS contract_len
       ON base.customerID = contract_len.customerID
LEFT JOIN customers_churn_with_protection AS protection
       ON base.customerID = protection.customerID
LEFT JOIN customers_churn_with_services AS services
       ON base.customerID = services.customerID
LEFT JOIN customers_churn_with_total_revenue AS revenue
       ON base.customerID = revenue.customerID;

num_affected_rows,num_inserted_rows


In [None]:
# Export the engineered table (customer_churn_final) for Tableau.
# The previous version wrote `df`, i.e. the raw CSV as loaded, so none of the
# derived columns (IsNewCustomer, TenureYearTime, ContractLengthMonths,
# TechSupportOrSecurity, MultipleServices, EstimatedTotalRevenue) reached the
# "tableau_ready" output.
# coalesce(1) collapses the data to one partition so Spark emits a single part
# file; note that Spark creates a *directory* at this path containing that file.
(
    spark.table("customer_churn_final")
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("/tmp/tableau_ready_customer_churn.csv")
)