#### Testing
- Handling missing values
- Conditional drop of rows
- Filling the missing values

In [67]:
import pyspark

In [68]:
from pyspark.sql import SparkSession

In [69]:
# Create the SparkSession for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('SparkDemo')
    .getOrCreate()
)

In [70]:
# Load test2.csv: the first line supplies the column names, and column
# types are inferred from the data rather than all being read as strings.
df_pyspark = spark.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)

In [71]:
# Display the loaded data; "null" marks a missing value.
df_pyspark.show()

+------+----+--------+----------+
|  Name| Age| Company|SampleRank|
+------+----+--------+----------+
|   Sai|  30| ABC Inc|         1|
| Kumar|  29| DEF INC|         2|
|Chetan|  33|Cool INC|         3|
| James|  26|    null|         4|
|  Bond|null|     PQR|         5|
|  null|  22|    null|         6|
|Cruise|  44|    null|         7|
|  John|null|Test INC|         8|
|   Doe|  20|    null|         9|
+------+----+--------+----------+



In [72]:
# Drop rows with null values.
# With no arguments, a row is removed if ANY of its columns is null,
# so only fully populated rows remain.
df_pyspark.na.drop().show()

+------+---+--------+----------+
|  Name|Age| Company|SampleRank|
+------+---+--------+----------+
|   Sai| 30| ABC Inc|         1|
| Kumar| 29| DEF INC|         2|
|Chetan| 33|Cool INC|         3|
+------+---+--------+----------+



In [73]:
# Conditional drop with 'how'.
# how='all' removes a row only when every column in it is null;
# no row here is entirely null, so nothing is dropped.
df_pyspark.na.drop(how='all').show()

+------+----+--------+----------+
|  Name| Age| Company|SampleRank|
+------+----+--------+----------+
|   Sai|  30| ABC Inc|         1|
| Kumar|  29| DEF INC|         2|
|Chetan|  33|Cool INC|         3|
| James|  26|    null|         4|
|  Bond|null|     PQR|         5|
|  null|  22|    null|         6|
|Cruise|  44|    null|         7|
|  John|null|Test INC|         8|
|   Doe|  20|    null|         9|
+------+----+--------+----------+



In [74]:
# Conditional drop with 'thresh'.
# Keep only rows that have at least 2 (thresh) non-null values; drop the rest.
# 'how' is not passed because thresh takes precedence over it when both are given.
# Every row here has at least 2 non-null values, so nothing is dropped.
df_pyspark.na.drop(thresh=2).show()

+------+----+--------+----------+
|  Name| Age| Company|SampleRank|
+------+----+--------+----------+
|   Sai|  30| ABC Inc|         1|
| Kumar|  29| DEF INC|         2|
|Chetan|  33|Cool INC|         3|
| James|  26|    null|         4|
|  Bond|null|     PQR|         5|
|  null|  22|    null|         6|
|Cruise|  44|    null|         7|
|  John|null|Test INC|         8|
|   Doe|  20|    null|         9|
+------+----+--------+----------+



In [75]:
# Conditional drop with 'subset'.
# Only the listed columns are checked: a row is dropped when Company or Name
# is null, while nulls in other columns (e.g. Age) are ignored.
df_pyspark.na.drop(subset=['Company','Name']).show()

+------+----+--------+----------+
|  Name| Age| Company|SampleRank|
+------+----+--------+----------+
|   Sai|  30| ABC Inc|         1|
| Kumar|  29| DEF INC|         2|
|Chetan|  33|Cool INC|         3|
|  Bond|null|     PQR|         5|
|  John|null|Test INC|         8|
+------+----+--------+----------+



In [76]:
# Fill nulls with a string.
# The string replacement is applied to the string columns (Name, Company);
# nulls in the non-string Age column are left untouched, as the output shows.
df_pyspark.na.fill('Missing').show()

+-------+----+--------+----------+
|   Name| Age| Company|SampleRank|
+-------+----+--------+----------+
|    Sai|  30| ABC Inc|         1|
|  Kumar|  29| DEF INC|         2|
| Chetan|  33|Cool INC|         3|
|  James|  26| Missing|         4|
|   Bond|null|     PQR|         5|
|Missing|  22| Missing|         6|
| Cruise|  44| Missing|         7|
|   John|null|Test INC|         8|
|    Doe|  20| Missing|         9|
+-------+----+--------+----------+



In [77]:
# Fill nulls per column: each dict key is a column name and its value is the
# replacement used for that column only.
# Age is a numeric column, so its placeholder is the integer -1 rather than the
# string '-1'; this avoids depending on an implicit string-to-number cast.
df_pyspark.na.fill({'Age': -1, 'Name': 'Missing Name', 'Company': 'Missing Company'}).show()

+------------+---+---------------+----------+
|        Name|Age|        Company|SampleRank|
+------------+---+---------------+----------+
|         Sai| 30|        ABC Inc|         1|
|       Kumar| 29|        DEF INC|         2|
|      Chetan| 33|       Cool INC|         3|
|       James| 26|Missing Company|         4|
|        Bond| -1|            PQR|         5|
|Missing Name| 22|Missing Company|         6|
|      Cruise| 44|Missing Company|         7|
|        John| -1|       Test INC|         8|
|         Doe| 20|Missing Company|         9|
+------------+---+---------------+----------+



In [80]:
# Replace missing numeric values with imputed ones.
from pyspark.ml.feature import Imputer

# Each input column gets a companion "<column>_imputed" output column;
# the 'mean' strategy is used to compute the replacement values.
imputer = Imputer(
    strategy='mean',
    inputCols=['Age', 'SampleRank'],
    outputCols=["{}_imputed".format(name) for name in ['Age', 'SampleRank']],
)

In [81]:
# Fit the imputer on the data, then apply it to the same data and display the
# result; the *_imputed columns are appended next to the original columns.
# NOTE(review): the recorded output for this cell shows nulls in SampleRank that
# the earlier df_pyspark.show() output does not contain -- presumably the data
# was changed or reloaded in a cell that is no longer present. Re-run from a
# fresh kernel to confirm the outputs are consistent.
(
    imputer
    .fit(df_pyspark)
    .transform(df_pyspark)
    .show()
)

+------+----+--------+----------+-----------+------------------+
|  Name| Age| Company|SampleRank|Age_imputed|SampleRank_imputed|
+------+----+--------+----------+-----------+------------------+
|   Sai|  30| ABC Inc|         1|         30|                 1|
| Kumar|  29| DEF INC|         2|         29|                 2|
|Chetan|  33|Cool INC|      null|         33|                 5|
| James|  26|    null|         4|         26|                 4|
|  Bond|null|     PQR|         5|         29|                 5|
|  null|  22|    null|      null|         22|                 5|
|Cruise|  44|    null|         7|         44|                 7|
|  John|null|Test INC|         8|         29|                 8|
|   Doe|  20|    null|         9|         20|                 9|
+------+----+--------+----------+-----------+------------------+

