In [1]:
import findspark
# Locate the local Spark installation so that the pyspark imports in the next cell resolve.
findspark.init()
# Last expression of the cell: displays the Spark home directory that findspark resolved.
findspark.find()

'C:\\spark\\spark-3.4.2-bin-hadoop3'

In [2]:
#PySpark Libraries
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import *
from pyspark.sql.types import * 
import pandas as pd
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
from pyspark import SparkFiles

In [3]:
# Build (or reuse) the notebook's Spark session. The MongoDB Spark connector is
# requested through spark.jars.packages so the write at the end of the notebook
# can use the com.mongodb.spark.sql.DefaultSource format.
spark = (
    SparkSession.builder
    .appName("TelecomCustomerChurn_Cleaning_EDA")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

In [4]:
# Display the active SparkSession as the cell output to confirm it was created.
spark

## Data Cleaning

In [6]:
# Explicit schema for the churn CSV. Every column is nullable; tenure,
# MonthlyCharges and TotalCharges are read as doubles, everything else as strings.
# Fields are added in the same order as the columns appear in the file.
schema = (
    StructType()
    .add("customerID", StringType(), True)
    .add("gender", StringType(), True)
    .add("SeniorCitizen", StringType(), True)
    .add("Partner", StringType(), True)
    .add("Dependents", StringType(), True)
    .add("tenure", DoubleType(), True)
    .add("PhoneService", StringType(), True)
    .add("MultipleLines", StringType(), True)
    .add("InternetService", StringType(), True)
    .add("OnlineSecurity", StringType(), True)
    .add("OnlineBackup", StringType(), True)
    .add("DeviceProtection", StringType(), True)
    .add("TechSupport", StringType(), True)
    .add("StreamingTV", StringType(), True)
    .add("StreamingMovies", StringType(), True)
    .add("Contract", StringType(), True)
    .add("PaperlessBilling", StringType(), True)
    .add("PaymentMethod", StringType(), True)
    .add("MonthlyCharges", DoubleType(), True)
    .add("TotalCharges", DoubleType(), True)
    .add("Churn", StringType(), True)
)

In [7]:
# Read the churn CSV with the explicit schema. The first line of the file is the
# header, and a field containing a single space is interpreted as null.
data_path = "telecome__churn__data.csv"
churn_data = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("nullValue", " ")
    .csv(data_path)
)

In [8]:
# Preview the first three records, one field per line, without truncating values.
churn_data.show(n=3, truncate=False, vertical=True)

-RECORD 0----------------------------
 customerID       | 7590-VHVEG       
 gender           | Female           
 SeniorCitizen    | 0                
 Partner          | Yes              
 Dependents       | No               
 tenure           | 1.0              
 PhoneService     | No               
 MultipleLines    | No phone service 
 InternetService  | DSL              
 OnlineSecurity   | No               
 OnlineBackup     | Yes              
 DeviceProtection | No               
 TechSupport      | No               
 StreamingTV      | No               
 StreamingMovies  | No               
 Contract         | Month-to-month   
 PaperlessBilling | Yes              
 PaymentMethod    | Electronic check 
 MonthlyCharges   | 29.85            
 TotalCharges     | 29.85            
 Churn            | No               
-RECORD 1----------------------------
 customerID       | 5575-GNVDE       
 gender           | Male             
 SeniorCitizen    | 0                
 Partner    

In [9]:
# Print the column names, types and nullability of the loaded DataFrame
# to confirm the explicit schema was applied.
churn_data.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: double (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



In [10]:
# Disabled example of querying the data through Spark SQL.
# NOTE(review): no temp view named `telecom_churn` is registered in the cells
# shown here; the query would need something like
# churn_data.createOrReplaceTempView("telecom_churn") first - confirm before enabling.
# # SQL Select query
# spark.sql("SELECT * FROM telecom_churn") \
#      .show(5)

In [11]:
# Report the shape of the dataset: the record count (runs a Spark job) and the
# number of columns (read from the DataFrame's column list).
num_rows, num_columns = churn_data.count(), len(churn_data.columns)

print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")

Number of rows: 100000
Number of columns: 21


In [12]:
# Column names of the loaded DataFrame, printed as a plain Python list.
header_values = list(churn_data.columns)
print(header_values)

['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn']


## Feature Information

 - **Gender:** The customer’s gender: Male, Female
 - **Senior Citizen:** Indicates if the customer is 65 or older: Yes==1, No==0

 - **Phone Service:** Indicates if the customer subscribes to home phone service with the company: Yes, No

 - **Multiple Lines:** Indicates if the customer subscribes to multiple telephone lines with the company: Yes, No

 - **Internet Service:** Indicates if the customer subscribes to Internet service with the company: No, DSL, Fiber Optic, Cable.
 - **Tenure in Months:** Indicates the total amount of months that the customer has been with the company by the end of the quarter specified above.
 - **Online Security:** Indicates if the customer subscribes to an additional online security service provided by the company: Yes, No
 - **Online Backup:** Indicates if the customer subscribes to an additional online backup service provided by the company: Yes, No
 - **Device Protection Plan:** Indicates if the customer subscribes to an additional device protection plan for their Internet equipment provided by the company: Yes, No
 - **Tech Support:** Indicates if the customer subscribes to an additional technical support plan from the company with reduced wait times: Yes, No
 - **Streaming TV:** Indicates if the customer uses their Internet service to stream television programming from a third party provider: Yes, No. The company does not charge an additional fee for this service.
 - **Streaming Movies:** Indicates if the customer uses their Internet service to stream movies from a third party provider: Yes, No. The company does not charge an additional fee for this service.
 - **Contract:** Indicates the customer’s current contract type: Month-to-Month, One Year, Two Year.
 - **Paperless Billing:** Indicates if the customer has chosen paperless billing: Yes, No
 - **Payment Method:** Indicates how the customer pays their bill: Bank Withdrawal, Credit Card, Mailed Check
 - **Monthly Charge:** Indicates the customer’s current total monthly charge for all their services from the company.
 - **Total Charges:** Indicates the customer’s total charges, calculated to the end of the quarter specified above.
 - **Churn :** Yes = the customer left the company. No = the customer remained with the company. Directly related to Churn Value.

In [13]:
# churn_data.dtypes is a list of (column name, type string) pairs;
# print one line per column.
data_types = churn_data.dtypes

for col_name, col_type in data_types:
    print(f"{col_name} ----> Data Type: {col_type}")

customerID ----> Data Type: string
gender ----> Data Type: string
SeniorCitizen ----> Data Type: string
Partner ----> Data Type: string
Dependents ----> Data Type: string
tenure ----> Data Type: double
PhoneService ----> Data Type: string
MultipleLines ----> Data Type: string
InternetService ----> Data Type: string
OnlineSecurity ----> Data Type: string
OnlineBackup ----> Data Type: string
DeviceProtection ----> Data Type: string
TechSupport ----> Data Type: string
StreamingTV ----> Data Type: string
StreamingMovies ----> Data Type: string
Contract ----> Data Type: string
PaperlessBilling ----> Data Type: string
PaymentMethod ----> Data Type: string
MonthlyCharges ----> Data Type: double
TotalCharges ----> Data Type: double
Churn ----> Data Type: string


In [14]:
# Count missing values per column. `when` without `otherwise` yields null for
# every non-null cell and `count` ignores nulls, so each aggregate is the number
# of null cells in that column.
null_counts = [
    count(when(col(c).isNull(), lit(1))).alias(c)
    for c in churn_data.columns
]
churn_data.select(null_counts).show(vertical=True)

-RECORD 0---------------
 customerID       | 0   
 gender           | 0   
 SeniorCitizen    | 0   
 Partner          | 0   
 Dependents       | 0   
 tenure           | 0   
 PhoneService     | 0   
 MultipleLines    | 0   
 InternetService  | 0   
 OnlineSecurity   | 0   
 OnlineBackup     | 0   
 DeviceProtection | 0   
 TechSupport      | 0   
 StreamingTV      | 0   
 StreamingMovies  | 0   
 Contract         | 0   
 PaperlessBilling | 0   
 PaymentMethod    | 0   
 MonthlyCharges   | 0   
 TotalCharges     | 11  
 Churn            | 0   



In [15]:
# Imputer that fills missing TotalCharges values with the column mean.
# The output column reuses the input name, so the imputed values replace
# the original column when the fitted model is applied.
mean_col = Imputer(
    strategy="mean",
    inputCols=["TotalCharges"],
    outputCols=["TotalCharges"],
)

In [16]:
# Fit the imputer on the data, then apply the fitted model; churn_data is
# rebound to the imputed DataFrame.
imputer_model = mean_col.fit(churn_data)
churn_data = imputer_model.transform(churn_data)

In [17]:
# Re-check missing values per column after imputation. Each aggregate counts
# only the cells that are null (`when` yields null elsewhere and `count` skips nulls).
remaining_nulls = [
    count(when(col(c).isNull(), lit(1))).alias(c)
    for c in churn_data.columns
]
churn_data.select(remaining_nulls).show(vertical=True)

-RECORD 0---------------
 customerID       | 0   
 gender           | 0   
 SeniorCitizen    | 0   
 Partner          | 0   
 Dependents       | 0   
 tenure           | 0   
 PhoneService     | 0   
 MultipleLines    | 0   
 InternetService  | 0   
 OnlineSecurity   | 0   
 OnlineBackup     | 0   
 DeviceProtection | 0   
 TechSupport      | 0   
 StreamingTV      | 0   
 StreamingMovies  | 0   
 Contract         | 0   
 PaperlessBilling | 0   
 PaymentMethod    | 0   
 MonthlyCharges   | 0   
 TotalCharges     | 0   
 Churn            | 0   



In [18]:
# Remove fully identical rows and report how many were removed.
#
# The figure printed as "Number of duplicate rows" is the difference between the
# row counts before and after de-duplication, i.e. the number of redundant rows
# actually dropped. (Counting groups with count > 1 would report only how many
# distinct rows are repeated, and under-report when a row occurs 3+ times.)
rows_before = churn_data.count()

# dropDuplicates() with no arguments compares all columns; it is a no-op when
# there are no duplicates, so it is applied unconditionally.
churn_data = churn_data.dropDuplicates()
rows_after = churn_data.count()

duplicates = rows_before - rows_after
print(f"Number of duplicate rows: {duplicates}")

# Confirm row count after dropping duplicates
print(f"Rows after dropping duplicates: {rows_after}")


Number of duplicate rows: 0
Rows after dropping duplicates: 100000


In [19]:
from pyspark.sql.types import NumericType

# Collect the names of all columns whose Spark type is numeric.
numerical_columns = []
for field in churn_data.schema.fields:
    if isinstance(field.dataType, NumericType):
        numerical_columns.append(field.name)

# summary() reports count, mean, stddev, min, the 25/50/75% percentiles and max
# for the selected columns.
summary_df = churn_data.select(*numerical_columns).summary()

# Show the full, untruncated statistics table.
summary_df.show(truncate=False)

+-------+------------------+------------------+------------------+
|summary|tenure            |MonthlyCharges    |TotalCharges      |
+-------+------------------+------------------+------------------+
|count  |100000            |100000            |100000            |
|mean   |35.73362          |68.52964770000008 |2461.3409484043245|
|stddev |21.334209926330324|29.485682294182872|1925.8421920648254|
|min    |0.0               |18.0              |0.0               |
|25%    |17.0              |43.16             |911.44            |
|50%    |36.0              |69.02             |1984.32           |
|75%    |54.0              |93.84             |3659.58           |
|max    |72.0              |120.0             |8684.8            |
+-------+------------------+------------------+------------------+



### Insights we get from the data
- **75% of customers have a tenure of less than 55 months (75th percentile: 54 months).**

- **The average monthly charge is about USD 68.53, whereas 25% of customers pay more than USD 93.84.**

In [20]:
# Persist the cleaned dataset to MongoDB.
# Despite its name, mongo_ip holds a full connection URI of the form
# mongodb://host:port/database.collection.
mongo_ip = "mongodb://localhost:27017/acts0324.telecom_churn_data"

# The save mode is set explicitly: without it Spark uses its default
# error-if-exists mode, so re-running the notebook fails once the collection
# has been created. "overwrite" replaces the collection contents, which keeps
# the notebook re-runnable from a fresh kernel.
# NOTE(review): use .mode("append") instead if existing documents must be kept.
(
    churn_data.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("uri", mongo_ip)
    .save()
)