In [1]:
!pip -q install pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Spark DataFrame").getOrCreate()
print("Spark session started")

Spark session started


**Βήμα 1ο** - Δημιουργία DataFrame

In [3]:
from google.colab import files

uploaded = files.upload()
filename = next(iter(uploaded.keys()))
df = spark.read.csv(filename, header=True, inferSchema=True)
print("CSV file loaded into DataFrame")

Saving telecom_churn_10k.csv to telecom_churn_10k.csv
CSV file loaded into DataFrame


Εμφανίσεις

In [4]:
print("First 10 rows of the DataFrame")
df.show(10)

print("DataFrame Schema")
df.printSchema()

total_count = df.count()
print(f"Total number of records",{total_count})

First 10 rows of the DataFrame
+-----------+------+----+-------+-------------+--------------+------------+----------+------+---------------+-------------+--------------+-------------+--------------+-----+
|CUSTOMER_ID|GENDER| AGE|COUNTRY|TENURE_MONTHS| CONTRACT_TYPE|HAS_INTERNET|HAS_MOBILE|HAS_TV|MONTHLY_CHARGES|TOTAL_CHARGES|NUM_COMPLAINTS|SUPPORT_CALLS|PAYMENT_METHOD|CHURN|
+-----------+------+----+-------+-------------+--------------+------------+----------+------+---------------+-------------+--------------+-------------+--------------+-----+
|    C000001|  Male|68.0|     FR|         61.0|Month-to-month|           0|         1|     0|          17.54|      1077.63|             0|            0| Bank transfer|  1.0|
|    C000002|Female|65.0|     FR|         NULL|Month-to-month|           1|         1|     0|          50.82|       833.94|             1|            1|   Credit card|  0.0|
|    C000003|  Male|36.0|     FR|         53.0|      Two year|           1|         1|     0|      

**Βήμα 2ο** - Έλεγχος ελλιπών τιμών (missing values)

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, FloatType, LongType, ShortType, ByteType, DecimalType

#Λίστα για να αποθηκεύσουμε τα αποτελέσματα
missing_data_results = []
total_rows = df.count()

print(f"Total number of rows: {total_rows}")

numeric_cols = ["AGE", "TENURE_MONTHS", "MONTHLY_CHARGES", "TOTAL_CHARGES"]

#Επανάληψη σε κάθε στήλη του DataFrame
for col_name in df.columns:
    if col_name in numeric_cols:
        # For numeric columns, check for both NULL and NaN
        missing_count = (df.select(
                            F.sum(
                                F.when(F.col(col_name).isNull() | F.isnan(F.col(col_name)), 1)
                                .otherwise(0)
                            ).alias("missing_count")
                        ).collect()[0]["missing_count"])
    else:
        # For non-numeric columns, only check for NULL
        missing_count = (df.select(
                            F.sum(
                                F.when(F.col(col_name).isNull(), 1)
                                .otherwise(0)
                            ).alias("missing_count")
                        ).collect()[0]["missing_count"])

    #Υπολογισμός του πλήθους των κενών τιμών (NaN ή Null)
    #F.when(...).otherwise(0): Δίνει 1 αν είναι κενό, αλλιώς 0.


    #Υπολογισμός ποσοστού
    #Το ποσοστό μάς δίνει άμεσα τη σχετική βαρύτητα και το μέγεθος του προβλήματος σε κάθε στήλη.
    missing_percentage = (missing_count / total_rows) * 100 if total_rows > 0 else 0

    missing_data_results.append({
        "column_name": col_name,
        "missing_count": missing_count,
        "missing_percentage": missing_percentage
    })

# Δημιουργία νέου DataFrame για την εμφάνιση των αποτελεσμάτων
missing_df = spark.createDataFrame(missing_data_results)

print("\nSummary Table of Missing Values")
missing_df.orderBy(F.col("missing_count").desc()).show(truncate=False)

# Εστίαση στα ζητούμενα πεδία
# Φιλτράρουμε το DataFrame μόνο για τα πεδία ενδιαφέροντος
focus_cols = ["AGE", "TENURE_MONTHS", "MONTHLY_CHARGES", "TOTAL_CHARGES", "CHURN"]
focus_df = missing_df.filter(F.col("column_name").isin(focus_cols))

print("\nMissing values in the focus fields")
focus_df.show(truncate=False)

Total number of rows: 10000

Summary Table of Missing Values
+---------------+-------------+------------------+
|column_name    |missing_count|missing_percentage|
+---------------+-------------+------------------+
|AGE            |300          |3.0               |
|MONTHLY_CHARGES|300          |3.0               |
|TENURE_MONTHS  |300          |3.0               |
|TOTAL_CHARGES  |300          |3.0               |
|CHURN          |300          |3.0               |
|CUSTOMER_ID    |0            |0.0               |
|HAS_MOBILE     |0            |0.0               |
|GENDER         |0            |0.0               |
|HAS_TV         |0            |0.0               |
|COUNTRY        |0            |0.0               |
|NUM_COMPLAINTS |0            |0.0               |
|CONTRACT_TYPE  |0            |0.0               |
|SUPPORT_CALLS  |0            |0.0               |
|HAS_INTERNET   |0            |0.0               |
|PAYMENT_METHOD |0            |0.0               |
+---------------+----

**Βήμα 3ο** - Στρατηγική καθαρισμού

***Σχολιασμός στρατηγικής***


Επιλέξαμε τη στρατηγική της Διαμέσου (Median) για την αντικατάσταση των ελλιπών τιμών στα αριθμητικά πεδία (AGE, TENURE_MONTHS, MONTHLY_CHARGES, TOTAL_CHARGES).

Η Διάμεσος είναι η ασφαλέστερη μέθοδος και η πιο ανθεκτική σε ακραίες τιμές (outliers), διασφαλίζοντας ότι η αντικατάσταση (imputation) δεν θα παρασυρθεί από αυτές τις ακραίες τιμές.

Για το πεδίο CHURN (μεταβλητή στόχος), θα αφαιρέσουμε τις γραμμές όπου CHURN ελλιπείς ή NULL, καθώς αυτό καθιστά τη γραμμή αυτή μη χρησιμοποιήσιμη για την εκπαίδευση του επιβλεπόμενου μοντέλου.

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

print("Removing rows where CHURN is null")
df_clean = df.dropna(subset=['CHURN'])

print("Calculating Median and replacing missing values")

# Πεδία για imputation
impute_cols = ["AGE", "TENURE_MONTHS", "MONTHLY_CHARGES", "TOTAL_CHARGES"]

# Υπολογισμός της διαμέσου (Median) για κάθε στήλη
# Χρειάζεται να κάνουμε cast σε Double για να υπολογίσει σωστά τη Median
median_exprs = [F.median(F.col(c).cast(DoubleType())).alias(c) for c in impute_cols]
medians = df_clean.select(*median_exprs).collect()[0].asDict()
#.asDict()	Μετατρέπει το Spark Row Object σε ένα τυπικό λεξικό (Dictionary) της Python.

# Δημιουργία λεξικού με τις τιμές της διαμέσου
impute_values = {key: v for key, v in medians.items()}
print(f"Calculated medians: {impute_values}")

# Εφαρμογή της αντικατάστασης (Imputation)
df_clean = df_clean.fillna(impute_values, subset=impute_cols)

print("Missing Values Check After Cleaning")
final_check = (df_clean.select(
    F.sum(F.when(F.col("CHURN").isNull(), 1).otherwise(0)).alias("Missing_CHURN"),
    F.sum(F.when(F.col("AGE").isNull() | F.isnan(F.col("AGE")), 1).otherwise(0)).alias("Missing_AGE"),
    F.sum(F.when(F.col("TENURE_MONTHS").isNull() | F.isnan(F.col("TENURE_MONTHS")), 1).otherwise(0)).alias("Missing_TENURE_MONTHS"),
    F.sum(F.when(F.col("MONTHLY_CHARGES").isNull() | F.isnan(F.col("MONTHLY_CHARGES")), 1).otherwise(0)).alias("Missing_MONTHLY_CHARGES"),
    F.sum(F.when(F.col("TOTAL_CHARGES").isNull() | F.isnan(F.col("TOTAL_CHARGES")), 1).otherwise(0)).alias("Missing_TOTAL_CHARGES")
))
final_check.show()

Removing rows where CHURN is null
Calculating Median and replacing missing values
Calculated medians: {'AGE': 51.0, 'TENURE_MONTHS': 37.0, 'MONTHLY_CHARGES': 35.95, 'TOTAL_CHARGES': 1229.72}
Missing Values Check After Cleaning
+-------------+-----------+---------------------+-----------------------+---------------------+
|Missing_CHURN|Missing_AGE|Missing_TENURE_MONTHS|Missing_MONTHLY_CHARGES|Missing_TOTAL_CHARGES|
+-------------+-----------+---------------------+-----------------------+---------------------+
|            0|          0|                    0|                      0|                    0|
+-------------+-----------+---------------------+-----------------------+---------------------+



**Βήμα 4ο** - Βασική περιγραφική ανάλυση

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

analysis_cols = ["AGE", "TENURE_MONTHS", "MONTHLY_CHARGES", "TOTAL_CHARGES"]

#describe() DataFrame
desc_df = df_clean.select(*analysis_cols).describe()
formatted_desc_df = desc_df.select(
    desc_df['summary'],
    *[F.format_number(F.col(c).cast(DoubleType()), 2).alias(c) for c in analysis_cols]
)
formatted_desc_df.show(truncate=False)


#CHURN (0=Παρουσες, 1=Αποχώρηση)
churn_stats = (df_clean.groupBy("CHURN")
                 .agg(
                     F.count("*").alias("Total_Customers"),
                     F.format_number(F.avg("TENURE_MONTHS"), 2).alias("Avg_Tenure_Months"),
                     F.format_number(F.avg("MONTHLY_CHARGES"), 2).alias("Avg_Monthly_Charges")
                 )
                 .orderBy("CHURN"))
churn_stats.show(truncate=False)

+-------+--------+-------------+---------------+-------------+
|summary|AGE     |TENURE_MONTHS|MONTHLY_CHARGES|TOTAL_CHARGES|
+-------+--------+-------------+---------------+-------------+
|count  |9,700.00|9,700.00     |9,700.00       |9,700.00     |
|mean   |51.41   |36.57        |36.54          |1,336.23     |
|stddev |19.25   |20.47        |11.13          |875.29       |
|min    |18.00   |1.00         |5.00           |0.00         |
|max    |85.00   |72.00        |80.45          |5,197.62     |
+-------+--------+-------------+---------------+-------------+

+-----+---------------+-----------------+-------------------+
|CHURN|Total_Customers|Avg_Tenure_Months|Avg_Monthly_Charges|
+-----+---------------+-----------------+-------------------+
|0.0  |6099           |38.34            |36.26              |
|1.0  |3601           |33.58            |37.02              |
+-----+---------------+-----------------+-------------------+



**Βήμα 5ο** - Αρχικό Feature Engineering

In [11]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# Θα χρησιμοποιήσουμε το df_clean DataFrame από το Ερώτημα 3!

#NUM_SERVICES
# Χρησιμοποιούμε απλή πρόσθεση, καθώς τα πεδία είναι integer (0 ή 1)
df_clean = df_clean.withColumn("NUM_SERVICES",
    F.col("HAS_INTERNET") + F.col("HAS_MOBILE") + F.col("HAS_TV")
)

#AVG_CHARGE_PER_MONTH
#Αν TENURE_MONTHS = 0 (πρώτος μήνας), τότε AVG_CHARGE_PER_MONTH = MONTHLY_CHARGES, ώστε να μη κάνουμε διαίρεση με το 0.
#Αλλιώς, AVG_CHARGE_PER_MONTH = TOTAL_CHARGES / TENURE_MONTHS.
df_clean = df_clean.withColumn("AVG_CHARGE_PER_MONTH",
    F.when(F.col("TENURE_MONTHS") == 0, F.col("MONTHLY_CHARGES"))
     .otherwise(F.col("TOTAL_CHARGES") / F.col("TENURE_MONTHS"))
)

#IS_LONG_TENURE
df_clean = df_clean.withColumn("IS_LONG_TENURE",
    F.when(F.col("TENURE_MONTHS") >= 24, 1)
     .otherwise(0)
)

print("\n--- Δείγμα Γραμμών με τα Νέα Πεδία ---")
df_clean.select("TENURE_MONTHS",
                "MONTHLY_CHARGES",
                "TOTAL_CHARGES",
                "HAS_INTERNET",
                "HAS_MOBILE",
                "HAS_TV",
                "NUM_SERVICES",
                "AVG_CHARGE_PER_MONTH",
                "IS_LONG_TENURE"
).show(10)


--- Δείγμα Γραμμών με τα Νέα Πεδία ---
+-------------+---------------+-------------+------------+----------+------+------------+--------------------+--------------+
|TENURE_MONTHS|MONTHLY_CHARGES|TOTAL_CHARGES|HAS_INTERNET|HAS_MOBILE|HAS_TV|NUM_SERVICES|AVG_CHARGE_PER_MONTH|IS_LONG_TENURE|
+-------------+---------------+-------------+------------+----------+------+------------+--------------------+--------------+
|         61.0|          17.54|      1077.63|           0|         1|     0|           1|  17.666065573770492|             1|
|         37.0|          50.82|       833.94|           1|         1|     0|           2|   22.53891891891892|             1|
|         53.0|          34.44|       1828.5|           1|         1|     0|           2|                34.5|             1|
|          6.0|          50.66|       271.56|           1|         1|     1|           3|               45.26|             0|
|         39.0|          38.82|      1519.19|           1|         1|     1|  

Αποθήκευση σε νέο csv αρχείο το DataFrame με τις νέες στήλες

In [None]:
df_clean.write.csv("telecom_new_col", header=True, mode="overwrite")