# PySpark Tutorial Part III: Handling Missing Values

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName('HMPractice').getOrCreate()

23/04/19 13:57:49 WARN Utils: Your hostname, Zipcoders-MacBook-Pro-60.local resolves to a loopback address: 127.0.0.1; using 192.168.3.39 instead (on interface en0)
23/04/19 13:57:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/19 13:57:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
df_pyspark = spark.read.csv('wines.csv', header=True, inferSchema=True)

In [15]:
df_pyspark.show()

+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+
| _c0|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|
+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+
|1665|          6.4|            0.26|       0.24|           6.4|     0.04|               27.0|               124.0|
|1749|          7.9|            0.22|       0.38|           8.0|    0.043|               46.0|               152.0|
|1774|          6.9|            0.23|        0.4|           7.5|     0.04|               50.0|               151.0|
|1791|          6.8|            0.28|       0.36|           8.0|    0.045|               28.0|               123.0|
|1802|          6.8|            0.26|       0.34|          13.9|    0.034|               39.0|               134.0|
|1910|          5.0|            0.55|       0.14|           8.3|    0.03

23/04/19 14:06:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fixed acidity, volatile acidity, citric acid, residual sugar, chlorides, free sulfur dioxide, total sulfur dioxide
 Schema: _c0, fixed acidity, volatile acidity, citric acid, residual sugar, chlorides, free sulfur dioxide, total sulfur dioxide
Expected: _c0 but found: 
CSV file: file:///Users/robert/Desktop/DataProjects/PySpark%20Tutorial/wines.csv


In [16]:
# drop column by name
df_pyspark.drop('_c0').show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+
|          6.4|            0.26|       0.24|           6.4|     0.04|               27.0|               124.0|
|          7.9|            0.22|       0.38|           8.0|    0.043|               46.0|               152.0|
|          6.9|            0.23|        0.4|           7.5|     0.04|               50.0|               151.0|
|          6.8|            0.28|       0.36|           8.0|    0.045|               28.0|               123.0|
|          6.8|            0.26|       0.34|          13.9|    0.034|               39.0|               134.0|
|          5.0|            0.55|       0.14|           8.3|    0.032|               35.0|               164.0|
|

In [None]:
# drop all rows with NaN values
# df_pyspark.na.drop()

In [None]:
# any == how - drop only rows where all values are NaN
df_pyspark.na.drop(how='all')

In [None]:
# threshold sets the threshold for non-NaN values a row must have before it is deleted
df_pyspark.na.drop(how='any', thresh=2)

In [None]:
# subset drops NaN values from a specific column
df_pyspark.na.drop(how='any', subset=[''])

In [None]:
# filling the missing values with 'Missing values'
df_pyspark.na.fill('Missing Values')

In [18]:
# filling the missing values within a subset of columns
df_pyspark.na.fill('Missing Values', ['citric acid', 'chlorides'])

DataFrame[_c0: int, fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double]

In [21]:
from pyspark.ml.feature import Imputer

In [25]:
imputer = Imputer(
    inputCols = ['citric acid', 'chlorides'],
    outputCols = ["{}_imputed".format(c) for c in ['volatililty', 'citric acid']]).setStrategy('mean')
# can be mean, median or mode

In [26]:
# add imputation columns to datafram
imputer.fit(df_pyspark).transform(df_pyspark).show()

+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------------------+-------------------+
| _c0|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|volatililty_imputed|citric acid_imputed|
+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------------------+-------------------+
|1665|          6.4|            0.26|       0.24|           6.4|     0.04|               27.0|               124.0|               0.24|               0.04|
|1749|          7.9|            0.22|       0.38|           8.0|    0.043|               46.0|               152.0|               0.38|              0.043|
|1774|          6.9|            0.23|        0.4|           7.5|     0.04|               50.0|               151.0|                0.4|               0.04|
|1791|          6.8|            0.28|       0.36|           8.0|

23/04/19 14:30:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , fixed acidity, volatile acidity, citric acid, residual sugar, chlorides, free sulfur dioxide, total sulfur dioxide
 Schema: _c0, fixed acidity, volatile acidity, citric acid, residual sugar, chlorides, free sulfur dioxide, total sulfur dioxide
Expected: _c0 but found: 
CSV file: file:///Users/robert/Desktop/DataProjects/PySpark%20Tutorial/wines.csv
