In [1]:
import findspark
findspark.init('/home/siddharth/spark-2.4.1-bin-hadoop2.7/')

from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('missing_val').getOrCreate()

In [3]:
df = spark.read.csv('./Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv',
                     inferSchema=True,header=True)

In [4]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



#### 3 options for missing values
1) Keep it null <br>
2) Impute it with some value <br>
3) Drop row with null

In [5]:
# For missing value treatmeant use df.na

df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [6]:
# drops rows that have less than this certain number of non-null vaules
# thresh means it should have at least 2 non null values in order to pass the threshold of being dropped

df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [7]:
# default ---> how is 'any'
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [8]:
# if how is 'all' means drop only if all is null
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [9]:
# for limiting missing value treatment on a specific col

df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [10]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [11]:
# spark does auto infering by the datatype of the col
df.na.fill('Fill values').show()

+----+-----------+-----+
|  Id|       Name|Sales|
+----+-----------+-----+
|emp1|       John| null|
|emp2|Fill values| null|
|emp3|Fill values|345.0|
|emp4|      Cindy|456.0|
+----+-----------+-----+



In [12]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [13]:
# Nothing happens as column types mismatches
df.na.fill('no name',subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [14]:
df.na.fill('no name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|no name| null|
|emp3|no name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



##### Imputing mean values

In [15]:
from pyspark.sql.functions import mean

In [16]:
mean_val = df.select(mean(df['Sales'])).collect()

In [17]:
# again shows list of rows
mean_val

[Row(avg(Sales)=400.5)]

In [18]:
mean_val[0]

Row(avg(Sales)=400.5)

In [19]:
# row object has 1 value hence again indexing
mean_sales = mean_val[0][0]
mean_sales

400.5

In [20]:
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [21]:
df.na.fill(df.select(mean('Sales')).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

