Here we are going to learn how to:

* drop rows with null values based on different criteria
* handle null values by replacing them

In [1]:
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse an already running) SparkSession for this notebook and display it.
spark_session = (
    SparkSession.builder
    .appName("pyspark_dataframe_part_02")
    .getOrCreate()
)
spark_session

In [3]:
# Load the vendor CSV: the first line is used as the header and column types are inferred.
# NOTE: the path below is an absolute, machine-specific location.
spark_df = (
    spark_session.read
    .options(header=True, inferSchema=True)
    .csv("file:///mnt/92D26AE0D26AC7D5/Python/pyspark/ararental.csv")
)
spark_df.show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|          null|    2| 8000|
|    35600|            null|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
|      761|            null|           null|   Lees Summit|816  -524-6983|    5| 5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [4]:
# na.drop() with no arguments removes every row that has a null in at least one column
# (the default is how="any"), so only fully populated rows remain.
complete_rows = spark_df.na.drop()
complete_rows.show()

+---------+----------------+---------------+-------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|         City|           Fax|Years|Sales|
+---------+----------------+---------------+-------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct|Sandy Springs|800  -714-7422|    3| 4530|
+---------+----------------+---------------+-------------+--------------+-----+-----+



In [5]:
# drop() accepts three optional arguments: 'how', 'thresh' and 'subset'.
# how="all" drops a row only when every one of its values is null.
rows_with_some_data = spark_df.na.drop(how="all")
rows_with_some_data.show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|          null|    2| 8000|
|    35600|            null|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
|      761|            null|           null|   Lees Summit|816  -524-6983|    5| 5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [6]:
# how="any" drops a row as soon as any single value in it is null.
# This is the default, so the result matches spark_df.na.drop().
rows_without_any_null = spark_df.na.drop(how="any")
rows_without_any_null.show()

+---------+----------------+---------------+-------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|         City|           Fax|Years|Sales|
+---------+----------------+---------------+-------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct|Sandy Springs|800  -714-7422|    3| 4530|
+---------+----------------+---------------+-------------+--------------+-----+-----+



In [7]:
# 'thresh' is the minimum number of non-null values a row needs in order to be kept
# (when it is given, it takes precedence over 'how').
# Here, rows with fewer than 5 non-null values are dropped.
min_non_null = 5
spark_df.na.drop(thresh=min_non_null).show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|          null|    2| 8000|
|    35600|            null|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
|      761|            null|           null|   Lees Summit|816  -524-6983|    5| 5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [8]:
# 'subset' restricts the null check to the listed columns; nulls in any other column are ignored.
required_cols = ["Address", "Fax"]
spark_df.na.drop(subset=required_cols).show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|    35600|            null|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [9]:
# The arguments can be combined.
# Here a row is dropped only when both "Vendor_Name" and "Fax" are null.
spark_df.na.drop(
    subset=["Vendor_Name", "Fax"],
    how="all",
).show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|          null|    2| 8000|
|    35600|            null|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
|      761|            null|           null|   Lees Summit|816  -524-6983|    5| 5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [10]:
# Replace missing values with a constant.
# A string fill value is only applied to string columns, which is why the numeric
# "Years" and "Sales" columns still show null in the output.
spark_df.na.fill(value="FILLED").show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|        FILLED|    2| 8000|
|    35600|          FILLED|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
|      761|          FILLED|         FILLED|   Lees Summit|816  -524-6983|    5| 5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [11]:
# Restrict the fill to specific columns by passing them as 'subset'.
spark_df.na.fill(value="FILLED", subset=["Vendor_Name", "Address"]).show()

+---------+----------------+---------------+--------------+--------------+-----+-----+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|          null|    2| 8000|
|    35600|          FILLED|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|
|      761|          FILLED|         FILLED|   Lees Summit|816  -524-6983|    5| 5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+



In [17]:
# Missing numeric values can also be replaced with a descriptive statistic.
# Imputer supports the strategies "mean" (used here), "median" and "mode"
# ("mode" needs Spark 3.1 or newer).
# Imputer is imported in the import cell at the top of the notebook.
cols = ["Years", "Sales"]
imputer = Imputer(
    strategy="mean",
    inputCols=cols,
    # Write the results to new "Imp_<column>" columns so the original columns stay untouched.
    outputCols=["Imp_{}".format(name) for name in cols],
)

In [18]:
# Fit the imputer on the data to learn the replacement value for each input column,
# then append the imputed "Imp_*" columns and display the result.
transformer = imputer.fit(spark_df)
imputed_df = transformer.transform(spark_df)
imputed_df.show()

+---------+----------------+---------------+--------------+--------------+-----+-----+---------+---------+
|Vendor_ID|     Vendor_Name|        Address|          City|           Fax|Years|Sales|Imp_Years|Imp_Sales|
+---------+----------------+---------------+--------------+--------------+-----+-----+---------+---------+
|   185227|American Rental |100 Grantley Ct| Sandy Springs|800  -714-7422|    3| 4530|        3|     4530|
|    16054|    Attema Sales|  117 E 13th St|         Pella|641  -628-4983| null| 2300|        3|     2300|
|   195852|     B & S Sales|  218 Maquan St|        Hanson|          null|    2| 8000|        2|     8000|
|    35600|            null|    PO Box 3374|South Pasadena|818  -276-8409|    2| null|        2|     4957|
|      761|            null|           null|   Lees Summit|816  -524-6983|    5| 5000|        5|     5000|
+---------+----------------+---------------+--------------+--------------+-----+-----+---------+---------+

