**Drive Connection**

In [1]:
# --- Mount Google Drive ---
from google.colab import drive
drive.mount('/content/drive')

# --- PySpark Setup ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when
from pyspark.sql.types import IntegerType, DoubleType

spark = SparkSession.builder.appName("TelcoChurnPreprocessing").getOrCreate()

# --- Load Dataset ---
file_path = "/content/drive/MyDrive/CSE 4262 Project/Telco Churn.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

print("Shape:", (df.count(), len(df.columns)))
df.show(5)

Mounted at /content/drive
Shape: (7043, 21)
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|  

**Initial Exploration**

In [4]:
# --- Missing Values ---
print(" Missing Values Per Column:")
print("-" * 30)
missing_counts = [(c, df.filter(col(c).isNull()).count()) for c in df.columns]
for colname, miss in missing_counts:
    print(f"{colname}: {miss}")
print("\n")

# --- Dataset Info ---
print(" Dataset Info:")
print("-" * 30)
df.printSchema()
print(f"Rows: {df.count()}, Columns: {len(df.columns)}\n")

from pyspark.sql import functions as F

# --- Unique Values ---
print(" Unique Values Per Column:")
print("-" * 30)
unique_counts = df.agg(*(F.countDistinct(col(c)).alias(c) for c in df.columns))
unique_counts.show(truncate=False)




 Missing Values Per Column:
------------------------------
customerID: 0
gender: 0
SeniorCitizen: 0
Partner: 0
Dependents: 0
tenure: 0
PhoneService: 0
MultipleLines: 0
InternetService: 0
OnlineSecurity: 0
OnlineBackup: 0
DeviceProtection: 0
TechSupport: 0
StreamingTV: 0
StreamingMovies: 0
Contract: 0
PaperlessBilling: 0
PaymentMethod: 0
MonthlyCharges: 0
TotalCharges: 0
Churn: 0


 Dataset Info:
------------------------------
root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |

**Basic Cleaning & Drop ID**

In [5]:
from pyspark.sql.functions import trim

# --- Trim spaces in column names ---
for c in df.columns:
    df = df.withColumnRenamed(c, c.strip())

# --- Trim whitespace in all string columns ---
for c, dtype in df.dtypes:
    if dtype == "string":
        df = df.withColumn(c, trim(col(c)))

# --- Drop customerID if exists ---
if "customerID" in df.columns:
    df = df.drop("customerID")

print("Columns now:", df.columns)


Columns now: ['gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']


**Fix numeric types & missing values**

In [6]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType, IntegerType

# --- Numeric columns ---
numeric_cols = ["tenure", "MonthlyCharges", "TotalCharges"]

# Cast numeric columns to Double
for c in numeric_cols:
    if c in df.columns:
        df = df.withColumn(c, col(c).cast(DoubleType()))

# Fill missing TotalCharges with median
if "TotalCharges" in df.columns:
    median_total = df.approxQuantile("TotalCharges", [0.5], 0.001)[0]
    df = df.fillna({"TotalCharges": median_total})

# Ensure SeniorCitizen is integer (0/1)
if "SeniorCitizen" in df.columns:
    df = df.withColumn(
        "SeniorCitizen",
        when(col("SeniorCitizen").isNull(), 0)
        .otherwise(col("SeniorCitizen"))
        .cast(IntegerType())
    )

print("Fix numeric types & missing values.")

# --- Summary statistics (like Pandas .describe()) ---
df.select(numeric_cols).describe().show()


Fix numeric types & missing values.
+-------+------------------+------------------+-----------------+
|summary|            tenure|    MonthlyCharges|     TotalCharges|
+-------+------------------+------------------+-----------------+
|  count|              7043|              7043|             7043|
|   mean| 32.37114865824223| 64.76169246059922|2281.912359789866|
| stddev|24.559481023094442|30.090047097678482|2265.272185332062|
|    min|               0.0|             18.25|             18.8|
|    max|              72.0|            118.75|           8684.8|
+-------+------------------+------------------+-----------------+



**Normalize service-related columns, then binary-encode**

In [7]:
from pyspark.sql.functions import when
from pyspark.sql.types import IntegerType

# 1) Binary Yes/No columns → 1/0
binary_cols = ["Partner", "Dependents", "PhoneService", "PaperlessBilling"]

for c in binary_cols:
    if c in df.columns:
        df = df.withColumn(
            c,
            when(col(c) == "Yes", 1)
            .when(col(c) == "No", 0)
            .otherwise(None)
            .cast(IntegerType())
        )

# 2) Gender: Female → 0, Male → 1
if "gender" in df.columns:
    df = df.withColumn(
        "gender",
        when(col("gender") == "Female", 0)
        .when(col("gender") == "Male", 1)
        .otherwise(None)
        .cast(IntegerType())
    )

# 3) Multi-level service columns (Yes/No/No internet service/No phone service)
multi_level_cols = [
    "MultipleLines", "OnlineSecurity", "OnlineBackup",
    "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies"
]

for c in multi_level_cols:
    if c in df.columns:
        df = df.withColumn(
            c,
            when(col(c) == "No", 0)
            .when(col(c) == "Yes", 1)
            .when((col(c) == "No internet service") | (col(c) == "No phone service"), 2)
            .otherwise(None)
            .cast(IntegerType())
        )

# 4) Label: Churn → 0/1
if "Churn" in df.columns:
    df = df.withColumn(
        "Churn",
        when(col("Churn") == "Yes", 1)
        .when(col("Churn") == "No", 0)
        .otherwise(None)
        .cast(IntegerType())
    )

print("Sample:")
df.show(5)


Sample:
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|     0|            0|      1|         0|   1.0|           0|            2|            DSL|             0|           1|               0|          0|          0|              0|Month-to-month|         

**Encode Multi-class Categorical Columns**

In [8]:
from pyspark.sql.types import IntegerType

# 1) InternetService -> Label Encoding
if "InternetService" in df.columns:
    df = df.withColumn(
        "InternetService",
        when(col("InternetService") == "No", 0)
        .when(col("InternetService") == "DSL", 1)
        .when(col("InternetService") == "Fiber optic", 2)
        .otherwise(None)
        .cast(IntegerType())
    )

# 2) Contract -> Ordinal Encoding
if "Contract" in df.columns:
    df = df.withColumn(
        "Contract",
        when(col("Contract") == "Month-to-month", 0)
        .when(col("Contract") == "One year", 1)
        .when(col("Contract") == "Two year", 2)
        .otherwise(None)
        .cast(IntegerType())
    )

# 3) PaymentMethod -> Label Encoding
if "PaymentMethod" in df.columns:
    df = df.withColumn(
        "PaymentMethod",
        when(col("PaymentMethod") == "Electronic check", 0)
        .when(col("PaymentMethod") == "Mailed check", 1)
        .when(col("PaymentMethod") == "Bank transfer (automatic)", 2)
        .when(col("PaymentMethod") == "Credit card (automatic)", 3)
        .otherwise(None)
        .cast(IntegerType())
    )

print("Shape:", (df.count(), len(df.columns)))



Shape: (7043, 20)


**Scale numeric features**

In [10]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# --- Select numeric columns to scale ---
scale_cols = [c for c in ["tenure", "MonthlyCharges", "TotalCharges"] if c in df.columns]

# Assemble into a single features vector
assembler = VectorAssembler(inputCols=scale_cols, outputCol="features")
df_vec = assembler.transform(df)

# Apply StandardScaler (z-score scaling)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_vec)
df_scaled = scaler_model.transform(df_vec)

# Convert the vector to an array so we can index it
df_scaled = df_scaled.withColumn("scaled_arr", vector_to_array(col("scaled_features")))

# Extract scaled values back into original columns
for i, c in enumerate(scale_cols):
    df_scaled = df_scaled.withColumn(c, col("scaled_arr")[i])

# Drop helper columns
df_scaled = df_scaled.drop("features", "scaled_features", "scaled_arr")

print("Scaled columns:", scale_cols)
df_scaled.select(scale_cols).show(5)


Scaled columns: ['tenure', 'MonthlyCharges', 'TotalCharges']
+-------------------+--------------------+--------------------+
|             tenure|      MonthlyCharges|        TotalCharges|
+-------------------+--------------------+--------------------+
|-1.2773538915070017| -1.1602405389153594| -0.9941685482090249|
|0.06632271016746097|-0.25961050958945525|-0.17322967294207875|
|-1.2366364187289876|-0.36263460888503846|  -0.959603165511527|
| 0.5142149107256152| -0.7464824627121305|-0.19475026561772396|
|-1.2366364187289876| 0.19735122115708273| -0.9404001751240282|
+-------------------+--------------------+--------------------+
only showing top 5 rows



**Saving the CSV file**

In [11]:
save_path = "/content/drive/MyDrive/CSE 4262 Project/Preprocessed_Churn_PySpark.csv"
df_scaled.toPandas().to_csv(save_path, index=False)

print("✅ File saved at:", save_path)


✅ File saved at: /content/drive/MyDrive/CSE 4262 Project/Preprocessed_Churn_PySpark.csv
