# Customer Churn με Apache Spark

Ονοματεπώνυμο 1: Συμεών Αλμανίδης (ics23083)

Ονοματεπώνυμο 2: Ευστράτιος Μαυρίγκος (ics23117)

# Θέμα 1: Φόρτωση, Έλεγχος και Καθαρισμός Δεδομένων

In [None]:
from google.colab import files

print("Please upload the CSV file (e.g., telecom_churn_10k.csv) when prompted.")
telecom_churn_10k_csv = files.upload()
telecom_churn_10k_csv_path = next(iter(telecom_churn_10k_csv))

Please upload the CSV file (e.g., telecom_churn_10k.csv) when prompted.


Saving telecom_churn_10k.csv to telecom_churn_10k (2).csv


In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, isnull, isnan, sum, when, mean
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession \
    .builder \
    .getOrCreate()

In [None]:
df = spark.read.options(inferSchema=True, header=True).csv(telecom_churn_10k_csv_path)
df.show(10)
df.printSchema()
df.count()

+-----------+------+----+-------+-------------+--------------+------------+----------+------+---------------+-------------+--------------+-------------+--------------+-----+
|CUSTOMER_ID|GENDER| AGE|COUNTRY|TENURE_MONTHS| CONTRACT_TYPE|HAS_INTERNET|HAS_MOBILE|HAS_TV|MONTHLY_CHARGES|TOTAL_CHARGES|NUM_COMPLAINTS|SUPPORT_CALLS|PAYMENT_METHOD|CHURN|
+-----------+------+----+-------+-------------+--------------+------------+----------+------+---------------+-------------+--------------+-------------+--------------+-----+
|    C000001|  Male|68.0|     FR|         61.0|Month-to-month|           0|         1|     0|          17.54|      1077.63|             0|            0| Bank transfer|  1.0|
|    C000002|Female|65.0|     FR|         NULL|Month-to-month|           1|         1|     0|          50.82|       833.94|             1|            1|   Credit card|  0.0|
|    C000003|  Male|36.0|     FR|         53.0|      Two year|           1|         1|     0|          34.44|       1828.5|       

10000

In [None]:

missing_exprs = []

for column_name in df.columns:
    if df.schema[column_name].dataType.typeName() in ['integer', 'long', 'float', 'double', 'decimal']:
        missing_exprs.append(sum(when(col(column_name).isNull() | isnan(col(column_name)), 1).otherwise(0)).alias(column_name))
    else:
        missing_exprs.append(sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name))

if not missing_exprs:
    print("DataFrame is empty or has no columns to process.")
else:
    missing_counts_df_wide = df.agg(*missing_exprs)

    missing_counts_row = missing_counts_df_wide.collect()[0]

    missing_data_list = [(column_name, missing_counts_row[column_name]) for column_name in df.columns]

    missing_schema = StructType([
        StructField("column_name", StringType(), True),
        StructField("missing_count", LongType(), True)
    ])
    missing_df_result = spark.createDataFrame(missing_data_list, schema=missing_schema)

    print("\n### Missing values for all columns:\n")
    missing_df_result.show(truncate=False)

    focused_columns = ["AGE", "TENURE_MONTHS", "MONTHLY_CHARGES", "TOTAL_CHARGES", "CHURN"]
    print("\n### Missing values for specific columns:\n")
    missing_df_result.filter(col("column_name").isin(focused_columns)).show(truncate=False)




### Missing values for all columns:

+---------------+-------------+
|column_name    |missing_count|
+---------------+-------------+
|CUSTOMER_ID    |0            |
|GENDER         |0            |
|AGE            |300          |
|COUNTRY        |0            |
|TENURE_MONTHS  |300          |
|CONTRACT_TYPE  |0            |
|HAS_INTERNET   |0            |
|HAS_MOBILE     |0            |
|HAS_TV         |0            |
|MONTHLY_CHARGES|300          |
|TOTAL_CHARGES  |300          |
|NUM_COMPLAINTS |0            |
|SUPPORT_CALLS  |0            |
|PAYMENT_METHOD |0            |
|CHURN          |300          |
+---------------+-------------+


### Missing values for specific columns:

+---------------+-------------+
|column_name    |missing_count|
+---------------+-------------+
|AGE            |300          |
|TENURE_MONTHS  |300          |
|MONTHLY_CHARGES|300          |
|TOTAL_CHARGES  |300          |
|CHURN          |300          |
+---------------+-------------+



In [None]:
# 1.3
averages = df.select(
    mean(col('AGE')).alias('AVG_AGE'),
    mean(col('TENURE_MONTHS')).alias('AVG_TENURE'),
    mean(col('MONTHLY_CHARGES')).alias('AVG_MONTHLY'),
    mean(col('TOTAL_CHARGES')).alias('AVG_TOTAL')
).collect()[0]

df_clean = df.na.drop(subset=["CHURN"]) \
    .na.fill({
        'AGE': averages['AVG_AGE'],
        'TENURE_MONTHS': averages['AVG_TENURE'],
        'MONTHLY_CHARGES': averages['AVG_MONTHLY'],
        'TOTAL_CHARGES': averages['AVG_TOTAL']
    })

df_clean.count()

# 1.4
df_clean.select("AGE", "TENURE_MONTHS", "MONTHLY_CHARGES", "TOTAL_CHARGES").summary().show()

df_clean.groupBy("CHURN").agg(
    avg("TENURE_MONTHS").alias("AVG_TENURE"),
    avg("MONTHLY_CHARGES").alias("AVG_MONTHLY")
).show()

+-------+------------------+------------------+------------------+------------------+
|summary|               AGE|     TENURE_MONTHS|   MONTHLY_CHARGES|     TOTAL_CHARGES|
+-------+------------------+------------------+------------------+------------------+
|  count|              9700|              9700|              9700|              9700|
|   mean|51.427155670102884| 36.56046597938145| 36.55743087256894|1339.5390897924374|
| stddev|19.247482167636413|20.466005947235722|11.127498102253991|  875.090293939724|
|    min|              18.0|               1.0|               5.0|               0.0|
|    25%|              35.0|              19.0|             28.86|            633.98|
|    50%|51.448144329896905|             36.58|             36.37|           1265.49|
|    75%|              68.0|              54.0|             43.53|            1897.2|
|    max|              85.0|              72.0|             80.45|           5197.62|
+-------+------------------+------------------+-------

Επιλέχθηκε η μέθοδος του μέσου όρου αντί της διαγραφής γραμμών, ώστε να αποφευχθεί η απώλεια πολύτιμων δεδομένων. Αν διαγράφαμε κάθε πελάτη που είχε έστω και μία ελλιπή τιμή (π.χ. στην ηλικία), θα χάναμε την πληροφορία για τη συμπεριφορά του (Churn) και τα υπόλοιπα χαρακτηριστικά του. Ο μέσος όρος αποτελεί μια ασφαλή και αμερόληπτη εκτίμηση για το τυπικό προφίλ του πελάτη, διατηρώντας την κεντρική τάση της κατανομής.

In [None]:
# 1.5
df_final = df_clean.withColumn("NUM_SERVICES", col("HAS_INTERNET") + col("HAS_MOBILE") + col("HAS_TV")) \
                   .withColumn("IS_LONG_TENURE", when(col("TENURE_MONTHS") >= 24, 1).otherwise(0))
df_final.select("CUSTOMER_ID", "NUM_SERVICES", "IS_LONG_TENURE", "TENURE_MONTHS").show(5)

+-----------+------------+--------------+-------------+
|CUSTOMER_ID|NUM_SERVICES|IS_LONG_TENURE|TENURE_MONTHS|
+-----------+------------+--------------+-------------+
|    C000001|           1|             1|         61.0|
|    C000002|           2|             1|        36.58|
|    C000003|           2|             1|         53.0|
|    C000004|           3|             0|          6.0|
|    C000005|           3|             1|         39.0|
+-----------+------------+--------------+-------------+
only showing top 5 rows


# Θέμα 2: Ανάλυση με Spark SQL & Επιβεβαίωση Επιχειρησιακών Μοτίβων

In [21]:
# 2.1 Δημιουργία Προσωρινού View
df_final.createOrReplaceTempView("churn_view")

# 2.2 Ερωτήματα Ανάλυσης

# Query 2.1: Churn ανά τύπο συμβολαίου
print("\n--- Churn ανά τύπο συμβολαίου ---")
q1 = spark.sql("""
    SELECT
        CONTRACT_TYPE,
        COUNT(*) as TOTAL_CUSTOMERS,
        SUM(CAST(CHURN as int)) as CHURN_COUNT,
        ROUND((SUM(CAST(CHURN as int)) / COUNT(*)) * 100, 2) as CHURN_RATE_PERCENT
    FROM churn_view
    GROUP BY CONTRACT_TYPE
    ORDER BY CHURN_RATE_PERCENT DESC
""")
q1.show()

# Query 2.2: Churn ανά πλήθος υπηρεσιών
print("\n--- Churn ανά πλήθος υπηρεσιών ---")
q2 = spark.sql("""
    SELECT
        NUM_SERVICES,
        COUNT(*) as TOTAL,
        ROUND((SUM(CAST(CHURN as int)) / COUNT(*)) * 100, 2) as CHURN_RATE_PERCENT
    FROM churn_view
    GROUP BY NUM_SERVICES
    ORDER BY NUM_SERVICES
""")
q2.show()

# Query 2.3: Churn και Μηνιαία Χρέωση
print("\n--- Μέση Μηνιαία Χρέωση ανά Churn Status ---")
q3 = spark.sql("""
    SELECT
        CHURN,
        ROUND(AVG(MONTHLY_CHARGES), 2) as AVG_MONTHLY_BILL
    FROM churn_view
    GROUP BY CHURN
""")
q3.show()

# 2.3 Γεωγραφική Ανάλυση (Top-5 Χώρες με υψηλό Churn)
print("\n--- Top-5 Χώρες με υψηλότερο ποσοστό Churn (>100 πελάτες) ---")
q4 = spark.sql("""
    SELECT
        COUNTRY,
        COUNT(*) as TOTAL_CUSTOMERS,
        ROUND((SUM(CAST(CHURN as int)) / COUNT(*)) * 100, 2) as CHURN_RATE
    FROM churn_view
    GROUP BY COUNTRY
    HAVING COUNT(*) >= 100
    ORDER BY CHURN_RATE DESC
    LIMIT 5
""")
q4.show()

# 2.4
print("\n--- Έλεγχος Hints (Support Queries) ---")

# Hint 1: Πλήρες πακέτο (3 υπηρεσίες) + Μεγάλη διάρκεια (>=24 μήνες) vs Νέοι
print("Hint 1 Check: Churn Rate για Παλιούς πελάτες με Full Services")
spark.sql("""
    SELECT
        CASE
            WHEN NUM_SERVICES = 3 AND TENURE_MONTHS >= 24 THEN 'Full Package & Loyal'
            ELSE 'Others'
        END as CUSTOMER_PROFILE,
        COUNT(*) as COUNT,
        ROUND(AVG(CHURN) * 100, 2) as CHURN_RATE
    FROM churn_view
    GROUP BY 1
""").show()

# Hint 2: Κινητή μόνο + Month-to-month
print("Hint 2 Check: Churn Rate για Mobile Only + Month-to-month")
spark.sql("""
    SELECT
        CASE
            WHEN HAS_MOBILE = 1 AND HAS_INTERNET = 0 AND CONTRACT_TYPE = 'Month-to-month' THEN 'Mobile Only & Monthly'
            ELSE 'Others'
        END as RISK_PROFILE,
        COUNT(*) as COUNT,
        ROUND(AVG(CHURN) * 100, 2) as CHURN_RATE
    FROM churn_view
    GROUP BY 1
""").show()

# Hint 3: Παράπονα & Support Calls (Σήμα κινδύνου)
print("Hint 3 Check: Μέσος όρος παραπόνων/κλήσεων στους Churners")
spark.sql("""
    SELECT
        CHURN,
        ROUND(AVG(NUM_COMPLAINTS), 2) as AVG_COMPLAINTS,
        ROUND(AVG(SUPPORT_CALLS), 2) as AVG_SUPPORT_CALLS
    FROM churn_view
    GROUP BY CHURN
""").show()


--- Churn ανά τύπο συμβολαίου ---
+--------------+---------------+-----------+------------------+
| CONTRACT_TYPE|TOTAL_CUSTOMERS|CHURN_COUNT|CHURN_RATE_PERCENT|
+--------------+---------------+-----------+------------------+
|Month-to-month|           5326|       2834|             53.21|
|      One year|           2440|        503|             20.61|
|      Two year|           1934|        264|             13.65|
+--------------+---------------+-----------+------------------+


--- Churn ανά πλήθος υπηρεσιών ---
+------------+-----+------------------+
|NUM_SERVICES|TOTAL|CHURN_RATE_PERCENT|
+------------+-----+------------------+
|           0|  229|             43.23|
|           1| 2149|             47.98|
|           2| 5152|             37.38|
|           3| 2170|             25.12|
+------------+-----+------------------+


--- Μέση Μηνιαία Χρέωση ανά Churn Status ---
+-----+----------------+
|CHURN|AVG_MONTHLY_BILL|
+-----+----------------+
|  0.0|           36.28|
|  1.0|      

**Συμβόλαια Month-to-month:** Οι πελάτες με Month-to-month συμβόλαιο εμφανίζουν το υψηλότερο ποσοστό αποχώρησης (53.21%), το οποίο είναι δραματικά μεγαλύτερο σε σχέση με τα συμβόλαια δέσμευσης (One year: 20.61%, Two year: 13.65%). Μάλιστα, όταν το μηνιαίο συμβόλαιο συνδυάζεται αποκλειστικά με κινητή (Mobile Only), το churn εκτοξεύεται στο 69.34%, αποτελώντας το πιο επικίνδυνο προφίλ πελάτη.

**Πλήθος Υπηρεσιών:** Οι πελάτες με πολλαπλές υπηρεσίες είναι πιο πιστοί. Συγκεκριμένα, όσοι διαθέτουν μόνο μία υπηρεσία έχουν ποσοστό αποχώρησης 47.98%, ενώ στους πελάτες με τεις υπηρεσίες (Internet + Mobile + TV) το ποσοστό πέφτει στο 25.12%. Αυτό υποδεικνύει ότι η δέσμευση σε ένα οικοσύστημα υπηρεσιών λειτουργεί αποτρεπτικά για την αποχώρηση.

**Μηνιαίες Χρεώσεις & Παράπονα:** Παρατηρείται ότι οι πελάτες που αποχώρησαν είχαν κατά μέσο όρο ελαφρώς υψηλότερες χρεώσεις (37.04€) έναντι των ενεργών πελατών (36.28€).

# Θέμα 3: Πρόβλεψη Μηνιαίας Χρέωσης με Decision Tree για δυναμική τιμολόγηση

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# 3.1 Επιλογή Features & Label
selected_cols = [
    "AGE", "TENURE_MONTHS", "NUM_COMPLAINTS", "SUPPORT_CALLS",
    "HAS_INTERNET", "HAS_MOBILE", "HAS_TV", "NUM_SERVICES",
    "CONTRACT_TYPE", "PAYMENT_METHOD", "COUNTRY",
    "MONTHLY_CHARGES"
]

df_ml = df_final.select(selected_cols).dropna()

# 3.2 Προεπεξεργασία (Pipeline Stages)
cat_cols = ["CONTRACT_TYPE", "PAYMENT_METHOD", "COUNTRY"]
stages = []

for c in cat_cols:
    indexer = StringIndexer(inputCol=c, outputCol=c + "_idx")
    encoder = OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_vec")
    stages += [indexer, encoder]


num_cols = ["AGE", "TENURE_MONTHS", "NUM_COMPLAINTS", "SUPPORT_CALLS",
            "HAS_INTERNET", "HAS_MOBILE", "HAS_TV", "NUM_SERVICES"]

assembler_inputs = [c + "_vec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]

# Μοντέλο: Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol="MONTHLY_CHARGES", maxDepth=5)
stages += [dt]

pipeline = Pipeline(stages=stages)

# 3.3 Εκπαίδευση και Αξιολόγηση

train_data, test_data = df_ml.randomSplit([0.7, 0.3], seed=42)

print("\n--- Εκπαίδευση Μοντέλου... ---")
model = pipeline.fit(train_data)

print("--- Παραγωγή Προβλέψεων ---")
predictions = model.transform(test_data)
predictions.select("MONTHLY_CHARGES", "prediction", "features").show(5)

evaluator = RegressionEvaluator(labelCol="MONTHLY_CHARGES", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

evaluator_r2 = RegressionEvaluator(labelCol="MONTHLY_CHARGES", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

print(f"\n--- Αποτελέσματα Αξιολόγησης ---")
print(f"RMSE (Root Mean Squared Error): {rmse:.2f}")
print(f"R² (Coefficient of Determination): {r2:.2f}")

mean_charge = df_ml.select(mean("MONTHLY_CHARGES")).collect()[0][0]
print(f"\nΜέσος όρος Μηνιαίας Χρέωσης στο dataset: {mean_charge:.2f}")

print(f"Το μοντέλο έχει σφάλμα περίπου {rmse:.2f}€. Αν ο μέσος λογαριασμός είναι {mean_charge:.2f}€,")
print(f"To R²={r2:.2f} δείχνει πόσο καλά εξηγεί το μοντέλο τη διακύμανση της χρέωσης.")


--- Εκπαίδευση Μοντέλου... ---
--- Παραγωγή Προβλέψεων ---
+-----------------+-----------------+--------------------+
|  MONTHLY_CHARGES|       prediction|            features|
+-----------------+-----------------+--------------------+
|            36.44|39.69733932993002|(18,[0,2,5,10,11,...|
|             41.9|39.69733932993002|(18,[0,2,5,10,11,...|
|36.57246391752579|30.16856059625894|(18,[2,5,10,11,13...|
|            40.71|39.69733932993002|(18,[0,2,5,10,11,...|
|            44.76|35.42062933747911|(18,[1,3,6,10,11,...|
+-----------------+-----------------+--------------------+
only showing top 5 rows

--- Αποτελέσματα Αξιολόγησης ---
RMSE (Root Mean Squared Error): 7.88
R² (Coefficient of Determination): 0.49

Μέσος όρος Μηνιαίας Χρέωσης στο dataset: 36.56
Το μοντέλο έχει σφάλμα περίπου 7.88€. Αν ο μέσος λογαριασμός είναι 36.56€,
To R²=0.49 δείχνει πόσο καλά εξηγεί το μοντέλο τη διακύμανση της χρέωσης.


Με σφάλμα περίπου 7.88€, αν ο μέσος λογαριασμός είναι 36.56€ τότε έχουμε ποσοστό σφάλματος ≈ 21.5%.
Επομένως, για το συγκεκριμένο πρόβλημα είναι μέτριο προς ικανοποιητικό για αρχικό μοντέλο αλλά σίγουρα όχι αρκετά ακριβές για αυστηρή τιμολόγηση.