In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

In [2]:
# Start (or reuse) the Spark session used by every cell below.
# The Spark UI port is set explicitly to 4040.
spark = (
    SparkSession.builder
    .appName("ChurnPreprocessing")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

25/03/10 20:47:58 WARN Utils: Your hostname, 536a82ee9c48c407 resolves to a loopback address: 127.0.0.1; using 10.62.18.194 instead (on interface eth0)
25/03/10 20:47:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 20:47:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the raw churn CSV, discard the identifier column and make TotalCharges numeric.
raw_csv_path = "/root/code/Customer-churn-project/Data/WA_Fn-UseC_-Telco-Customer-Churn.csv"
df = (
    spark.read
    .csv(raw_csv_path, header=True, inferSchema=True)
    .drop("customerID")
    # Values that cannot be parsed as a number become null after the cast.
    .withColumn("TotalCharges", col("TotalCharges").cast(DoubleType()))
)

# Impute the nulls with an approximate median (relative error 0.01).
median_value = df.approxQuantile("TotalCharges", [0.5], 0.01)[0]
df = df.fillna({"TotalCharges": median_value})

                                                                                

In [4]:
# Replace "No internet service" & "No phone service" with "No"
# Both placeholders are checked on every listed column, so a "No phone service"
# value is normalised wherever it occurs (presumably MultipleLines -- confirm
# against the raw data), not only in PhoneService.
placeholder_values = ["No internet service", "No phone service"]
replace_cols = ["MultipleLines", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies"]
for col_name in replace_cols + ["PhoneService"]:
    # Nulls match neither placeholder and fall through to otherwise() unchanged.
    df = df.withColumn(col_name, when(col(col_name).isin(placeholder_values), "No").otherwise(col(col_name)))

# Convert gender to Numeric: "Female" -> 1, every other value -> 0
df = df.withColumn("gender", when(col("gender") == "Female", 1).otherwise(0))

In [5]:
# One-Hot Encoding for Multi-Class Categorical Features
# Each column is first indexed (<name>_index) and then one-hot encoded (<name>_encoded).
# The comprehension variable is named `name` so it is not confused with the
# imported pyspark.sql.functions.col used further down.
categorical_cols = ['InternetService', 'Contract', 'PaymentMethod']
indexers = [StringIndexer(inputCol=name, outputCol=name + "_index") for name in categorical_cols]
encoders = [OneHotEncoder(inputCol=name + "_index", outputCol=name + "_encoded") for name in categorical_cols]

pipeline = Pipeline(stages=indexers + encoders)
df = pipeline.fit(df).transform(df)
df = df.drop(*categorical_cols)  # Drop original categorical columns

# Convert Binary Columns to Numeric: "Yes" -> 1, anything else (including null) -> 0
binary_cols = ['Partner', 'Dependents', 'PhoneService', 'MultipleLines',
               'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
               'TechSupport', 'StreamingTV', 'StreamingMovies',
               'PaperlessBilling', 'Churn']

# One select() rewrites all binary columns in a single projection instead of
# stacking one withColumn() projection per column; column order is preserved.
df = df.select([
    when(col(name) == "Yes", 1).otherwise(0).alias(name) if name in binary_cols else col(name)
    for name in df.columns
])

                                                                                

In [6]:
# Split Data into Train & Test
# The split happens BEFORE the scaler is fitted so that the min/max statistics
# come from training rows only; fitting on the full dataset would leak
# information about the test rows into the training features.
num_cols = ["tenure", "MonthlyCharges", "TotalCharges"]
train_df, test_df = df.randomSplit([0.67, 0.33], seed=42)

# Feature Scaling for Numerical Features
assembler = VectorAssembler(inputCols=num_cols, outputCol="features_unscaled")
scaler = MinMaxScaler(inputCol="features_unscaled", outputCol="features")

pipeline = Pipeline(stages=[assembler, scaler])
scaling_model = pipeline.fit(train_df)  # statistics learned from the training split only

# Apply the same fitted model to both splits, then drop the original numerical columns.
# Test values outside the training range may scale to values outside [0, 1].
train_df = scaling_model.transform(train_df).drop(*num_cols)
test_df = scaling_model.transform(test_df).drop(*num_cols)

# Keep `df` in the same shape as before (scaled, numerical columns dropped)
# for any cell that still reads it.
df = scaling_model.transform(df).drop(*num_cols)

In [7]:
# Show the column names, types and nullability of the training split.
train_df.printSchema()

root
 |-- gender: integer (nullable = false)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: integer (nullable = false)
 |-- Dependents: integer (nullable = false)
 |-- PhoneService: integer (nullable = false)
 |-- MultipleLines: integer (nullable = false)
 |-- OnlineSecurity: integer (nullable = false)
 |-- OnlineBackup: integer (nullable = false)
 |-- DeviceProtection: integer (nullable = false)
 |-- TechSupport: integer (nullable = false)
 |-- StreamingTV: integer (nullable = false)
 |-- StreamingMovies: integer (nullable = false)
 |-- PaperlessBilling: integer (nullable = false)
 |-- Churn: integer (nullable = false)
 |-- InternetService_index: double (nullable = false)
 |-- Contract_index: double (nullable = false)
 |-- PaymentMethod_index: double (nullable = false)
 |-- InternetService_encoded: vector (nullable = true)
 |-- Contract_encoded: vector (nullable = true)
 |-- PaymentMethod_encoded: vector (nullable = true)
 |-- features_unscaled: vector (nullable = true)
 

In [9]:
# Flatten ML vector columns before exporting.
# A vector column converted with toPandas() holds vector objects, and to_csv()
# would write their Python repr strings. Each vector is therefore expanded
# into one scalar column per element, named <column>_<index>.
vector_cols = [field.name for field in train_df.schema.fields if isinstance(field.dataType, VectorUDT)]

# Measure each vector's length from the first available row (train first, then test).
vector_sizes = {}
for vec_name in vector_cols:
    sample_row = train_df.select(vec_name).first() or test_df.select(vec_name).first()
    if sample_row is not None and sample_row[0] is not None:
        vector_sizes[vec_name] = len(sample_row[0])
    # A column that cannot be measured (both splits empty) is left as it is.


def flatten_vectors(sdf):
    """Return `sdf` with every measured vector column replaced by scalar columns.

    Non-vector columns keep their name and position; the expanded
    `<column>_<index>` columns are appended after them.
    """
    kept = [col(c) for c in sdf.columns if c not in vector_sizes]
    expanded = [
        vector_to_array(col(vec_name))[i].alias(f"{vec_name}_{i}")
        for vec_name, size in vector_sizes.items()
        for i in range(size)
    ]
    return sdf.select(*kept, *expanded)


# Convert to Pandas DataFrames
train_pandas = flatten_vectors(train_df).toPandas()
test_pandas = flatten_vectors(test_df).toPandas()

# Save to CSV
train_pandas.to_csv("/root/code/Customer-churn-project/Data/train_data1.csv", index=False)
test_pandas.to_csv("/root/code/Customer-churn-project/Data/test_data1.csv", index=False)

print("✅ Preprocessing complete! Data saved successfully.")

✅ Preprocessing complete! Data saved successfully.


In [26]:
# Columns removed before the Spark CSV export: the one-hot and assembled/scaled
# vector columns (presumably because the CSV writer cannot serialise vectors).
# Names that are absent from a DataFrame are simply ignored by drop().
export_excluded_cols = (
    "InternetService_encoded",
    "Contract_encoded",
    "PaymentMethod_encoded",
    "features_unscaled",
    "features",
)

train_df_csv = train_df.drop(*export_excluded_cols)
test_df_csv = test_df.drop(*export_excluded_cols)

# Save to CSV (Spark writes each path as a directory of part files); existing output is replaced.
train_df_csv.write.mode("overwrite").csv("/root/code/Customer-churn-project/Data/train_data.csv", header=True)
test_df_csv.write.mode("overwrite").csv("/root/code/Customer-churn-project/Data/test_data.csv", header=True)

print("✅ Preprocessing complete! Data saved successfully.")


                                                                                

✅ Preprocessing complete! Data saved successfully.


In [10]:
# Shut down the Spark session; cells that use `spark` cannot run after this.
spark.stop()