In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName('Missing').getOrCreate()
df_pyspark_missing=spark.read.csv('test2.csv',header=True,inferSchema=True)
df_pyspark_missing.printSchema()
df_pyspark_missing.head()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



Row(Name='Krish', age=31, Experience=10, Salary=30000)

In [3]:
# Rows where BOTH age and Experience are null.
age_is_missing = df_pyspark_missing['age'].isNull()
experience_is_missing = df_pyspark_missing['Experience'].isNull()
df_pyspark_missing.filter(age_is_missing & experience_is_missing).show()
# Alternative: rows where both age and Name are null.
# df_pyspark_missing.filter(df_pyspark_missing.age.isNull() & df_pyspark_missing.Name.isNull()).show()

# Rows that have a value for Name, then rows that have a value for age.
name_is_present = df_pyspark_missing['Name'].isNotNull()
age_is_present = df_pyspark_missing['age'].isNotNull()
df_pyspark_missing.filter(name_is_present).show()
df_pyspark_missing.filter(age_is_present).show()

# The unfiltered DataFrame, for comparison with the filtered views above.
df_pyspark_missing.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|Mahesh|NULL|      NULL| 40000|
+------+----+----------+------+

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
+---------+----+----------+------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
|     NULL| 36|      NULL|  NULL|
+---------+---+----------+------+

+---------+----+----------+------+
|     Nam

In [16]:
# Dropping rows with missing values.
# A bare na.drop() removes every row that has a null in ANY column, not only in age.
# Do not assign the result of .show() back to the variable: show() returns None, so the
# commented-out line below would replace the DataFrame with None.
# df_pyspark_missing=df_pyspark_missing.na.drop().show()

# na.drop() returns a new DataFrame; the original DataFrame is not modified
df_pyspark_missing.na.drop().show()

# print the original DataFrame to confirm it still has its null values
df_pyspark_missing.show()

# how    - 'any' or 'all'. With 'any', drop a row if it contains a null in any column.
#          With 'all', drop a row only if all of its columns are null. Default is 'any'.
# thresh - int. Drop rows that have fewer than `thresh` non-null values. Default is None.
#          When given, it overrides `how`.
# subset - list of column names to consider when looking for nulls. Default is None (all columns).

df_pyspark_missing.na.drop(how="any").show()

# keep only rows with at least 3 non-null values; `how` is omitted because thresh overrides it
df_pyspark_missing.na.drop(thresh=3).show()

# remove rows that have a null in the age column
# (use the exact schema name 'age'; 'Age' only resolves while Spark's column resolution is
# case-insensitive, which is the default but is configurable)
df_pyspark_missing.na.drop(how="any",subset=['age']).show()

# replace nulls with an integer for Experience/age, since both are integer columns
df_pyspark_missing.na.fill(0,['Experience','age']).show()



+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|

In [18]:
# Show the DataFrame before imputation so the nulls are visible.
df_pyspark_missing.show()

# Fill missing numeric values with each column's median using pyspark.ml's Imputer.
# Docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html
from pyspark.ml.feature import Imputer

# Columns to impute; each gets a sibling "<column>_imputed" output column.
columns_to_impute = ['age', 'Experience', 'Salary']
imputed_column_names = ["{}_imputed".format(column) for column in columns_to_impute]

imputer = Imputer(inputCols=columns_to_impute, outputCols=imputed_column_names)
imputer = imputer.setStrategy("median")

# Fit on the data to learn the medians, then append the imputed columns and display them.
fitted_imputer = imputer.fit(df_pyspark_missing)
fitted_imputer.transform(df_pyspark_missing).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|            