In [9]:
import pyspark
import warnings
warnings.filterwarnings("ignore")

In [10]:
from pyspark.sql import SparkSession

In [11]:
# Start a Spark session named 'missing_values' (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName('missing_values')
    .getOrCreate()
)
spark  # last expression: renders the session summary in the notebook

In [12]:
# Load the churn CSV: the first line holds column names, and Spark infers column types.
data_path = 'data_telecom_churn.csv'
df = spark.read.csv(path=data_path, header=True, inferSchema=True)

# Display the loaded rows (show() prints at most 20 rows by default).
df.show()

+----------+------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|             Yes|    Electronic check|         29.85|       29.85|   No|
|5575-GNVDE|  Male|              No|                null|         56.95|        null|   No|
|3668-QPYBK|  Male|             Yes|        Mailed check|         53.85|      108.15|  Yes|
|7795-CFOCW|  Male|              No|Bank transfer (au...|          null|     1840.75|   No|
|9237-HQITU|Female|             Yes|    Electronic check|          70.7|      151.65|  Yes|
|9305-CDSKC|  null|             Yes|    Electronic check|         99.65|       820.5|  Yes|
|1452-KIOVK|  Male|             Yes|Credit card (auto...|          89.1|      1949.4|   No|
|6713-OKOMC|Female|              No|        Mailed check|         29.75|       3

#### Show the size of the DataFrame (rows, columns)

In [13]:
def sparkShape(dataframe):
    """Return the shape of a Spark DataFrame as a ``(rows, columns)`` tuple.

    PySpark DataFrames have no ``.shape`` attribute, so this mirrors pandas'
    ``DataFrame.shape``. ``count()`` is a Spark action and triggers a job.

    Parameters
    ----------
    dataframe : pyspark.sql.DataFrame
        Any object exposing a ``count()`` method and a ``columns`` sequence.

    Returns
    -------
    tuple[int, int]
        The row count followed by the column count.
    """
    row_total = dataframe.count()
    column_total = len(dataframe.columns)
    return row_total, column_total

In [15]:
# Baseline (rows, columns) of the raw data, for comparison after rows with nulls are dropped.
print(
    'The size of the dataframe before removing rows with missing values:',
    sparkShape(df),
)

The size of the dataframe before removing rows with missing values: (13, 7)


#### Drop all rows with any missing value

In [16]:
# Keep only rows with no nulls in any column (how='any'); df itself is left unchanged.
new_df = df.dropna(how='any')
new_df.show()

+----------+------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|             Yes|    Electronic check|         29.85|       29.85|   No|
|3668-QPYBK|  Male|             Yes|        Mailed check|         53.85|      108.15|  Yes|
|9237-HQITU|Female|             Yes|    Electronic check|          70.7|      151.65|  Yes|
|1452-KIOVK|  Male|             Yes|Credit card (auto...|          89.1|      1949.4|   No|
|6713-OKOMC|Female|              No|        Mailed check|         29.75|       301.9|   No|
|6388-TABGU|  Male|              No|Bank transfer (au...|         56.15|     3487.95|   No|
|7469-LKBCI|  Male|              No|Credit card (auto...|         18.95|       326.8|   No|
|8091-TTVAX|  Male|              No|Credit card (auto...|        100.35|      56

In [17]:
# Shape after dropping rows that contain nulls: fewer rows if any had nulls, same columns.
print(
    'The size of the dataframe after removing rows with missing values:',
    sparkShape(new_df),
)

The size of the dataframe after removing rows with missing values: (8, 7)


#### Replace missing values with another value.

In [19]:
# Replace nulls in the 'PaperlessBilling' column only with the placeholder string 'No record'.
# The filled DataFrame is only displayed here; df itself is not modified.
# NOTE(review): in the rows shown, the visible nulls are in gender/PaymentMethod, not
# PaperlessBilling. Confirm this is the intended column.
(
    df
    .fillna('No record', subset=['PaperlessBilling'])
    .show()
)

+----------+------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|             Yes|    Electronic check|         29.85|       29.85|   No|
|5575-GNVDE|  Male|              No|                null|         56.95|        null|   No|
|3668-QPYBK|  Male|             Yes|        Mailed check|         53.85|      108.15|  Yes|
|7795-CFOCW|  Male|              No|Bank transfer (au...|          null|     1840.75|   No|
|9237-HQITU|Female|             Yes|    Electronic check|          70.7|      151.65|  Yes|
|9305-CDSKC|  null|             Yes|    Electronic check|         99.65|       820.5|  Yes|
|1452-KIOVK|  Male|             Yes|Credit card (auto...|          89.1|      1949.4|   No|
|6713-OKOMC|Female|              No|        Mailed check|         29.75|       3

In [23]:
# Replace missing numeric values with the column MEDIAN.
# Imputer's default strategy is "mean"; setStrategy("median") below overrides it.
# Each outputCols entry receives the filled version of the inputCols entry at the same
# position, and the original input columns are left as they are.
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['MonthlyCharges', 'TotalCharges'],
    # NOTE(review): 'MonlyCharges_new' looks like a typo for 'MonthlyCharges_new'. It is kept
    # unchanged because the displayed output (and possibly later cells) use this exact name.
    outputCols=['MonlyCharges_new', 'TotalCharges_new'],
).setStrategy("median")

In [24]:
# Fit the imputer on df, which computes one fill value per input column using the
# strategy configured on `imputer`, then apply it.
# transform() returns a NEW DataFrame with the imputed output columns appended; df itself is
# NOT modified. The result is therefore kept in `imputed_df` so later cells can use it.
imputed_df = imputer.fit(df).transform(df)
imputed_df.show()

+----------+------+----------------+--------------------+--------------+------------+-----+----------------+----------------+
|customerID|gender|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|MonlyCharges_new|TotalCharges_new|
+----------+------+----------------+--------------------+--------------+------------+-----+----------------+----------------+
|7590-VHVEG|Female|             Yes|    Electronic check|         29.85|       29.85|   No|           29.85|           29.85|
|5575-GNVDE|  Male|              No|                null|         56.95|        null|   No|           56.95|          587.45|
|3668-QPYBK|  Male|             Yes|        Mailed check|         53.85|      108.15|  Yes|           53.85|          108.15|
|7795-CFOCW|  Male|              No|Bank transfer (au...|          null|     1840.75|   No|           56.15|         1840.75|
|9237-HQITU|Female|             Yes|    Electronic check|          70.7|      151.65|  Yes|            70.7|          