# Missing data cleaning

In [1]:
import os

import findspark

# Point findspark at the Spark installation so that pyspark can be imported
# from it. SPARK_HOME takes precedence, which lets the notebook run on other
# machines; the original machine-specific path is kept as the fallback.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.0.0-bin-hadoop2.7'))

import pyspark
from pyspark.sql import SparkSession

# Reuse the active session if there is one, otherwise start one named 'missingData'.
spark = SparkSession.builder.appName('missingData').getOrCreate()

22/05/14 20:08:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Load the CSV: header=True takes the column names from the first line and
# inferSchema=True lets Spark infer each column's type (Sales becomes double,
# as printSchema() shows further down).
df = spark.read.csv('ContainsNull.csv', inferSchema=True, header=True)

                                                                                

In [3]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [4]:
df.na.drop().show()  # drop the rows that have at least one null

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [5]:
df.na.drop(thresh=2).show() # keep only rows with at least 2 non-null values (emp2 has just one, so it is dropped)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [6]:
df.na.drop(how='all').show() # drop a row only when every one of its columns is null (no such row here, so nothing is dropped)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [7]:
df.na.drop(how='any').show() # drop the rows that have at least one null

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [8]:
df.na.drop(subset=['Sales']).show()  # only the Sales column is checked: rows with a null Sales are dropped, a null Name is kept

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [9]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [10]:
df.na.fill("FILL VALUE").show()  # a string fill value only fills nulls in string columns; Sales is left as is

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [11]:
df.na.fill(0).show() # a numeric fill value only fills nulls in numeric columns; Name is left as is

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [12]:
df.na.fill("No Name", subset=['Name']).show() # subset limits the fill to the listed columns

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [13]:
"""
Fill the missing sales values with mean of the rest of sales
"""
from pyspark.sql.functions import mean
# collect() returns the one-row aggregate as a list of Row objects.
# The mean is taken over the non-null Sales only: (345.0 + 456.0) / 2 = 400.5.
mean_value = df.select(mean(df['Sales'])).collect()

In [33]:
# Per-Id average of the numeric columns. Every Id occurs exactly once in this
# data, so each group just echoes that row's Sales (null stays null); this is
# not the overall mean that is used for filling.
df.groupBy("id").mean().show()

                                                                                

+----+----------+
|  id|avg(Sales)|
+----+----------+
|emp3|     345.0|
|emp2|      null|
|emp4|     456.0|
|emp1|      null|
+----+----------+





In [15]:
mean_value

[Row(avg(Sales)=400.5)]

In [16]:
# Read the mean by column name: turn the first Row into a dict and look up 'avg(Sales)'.
mean_value[0].asDict()['avg(Sales)']

400.5

In [17]:
# Same value read by position: first Row, first field.
mean_sale = mean_value[0][0]

In [18]:
# The second positional argument is the subset: only nulls in Sales are replaced by the mean.
df.na.fill(mean_sale, ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [19]:
# Mean-fill written as a single expression: aggregate the mean of Sales,
# pull the scalar out of the one-row result, and fill the Sales nulls with it.
df.fillna(
    df.agg(mean('Sales')).first()[0],
    subset=['Sales'],
).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [20]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# df.na.replace

In [21]:
# Replace exact matches of 345.0 in the Sales column with 1000; nulls and all other values are untouched.
df.na.replace(345.0, 1000, ['Sales']).show()

+----+-----+------+
|  Id| Name| Sales|
+----+-----+------+
|emp1| John|  null|
|emp2| null|  null|
|emp3| null|1000.0|
|emp4|Cindy| 456.0|
+----+-----+------+



In [22]:
from pyspark.sql.functions import format_number, min, stddev


In [23]:
# NOTE(review): the Sales nulls are filled with the standard deviation of the
# non-null Sales (78.49 in the output below), not with the mean used earlier
# in the notebook — confirm this is intended.
fill_missing = df.na.fill(
                    df.select(stddev(df['Sales'])).collect()[0][0],
                    subset=['Sales']
                )

In [31]:
# format_number renders Sales with 2 decimals and yields a *string* column, so
# formatted_Sales cannot be compared numerically without casting it back.
# 'ID' resolves to the 'Id' column (the lookup is case-insensitive here) and
# the output column takes the spelling used in the select.
formatted_fill_missing = fill_missing.select(['ID', 'Name', format_number("Sales", 2).alias('formatted_Sales')])

In [32]:
formatted_fill_missing.show()

+----+-----+---------------+
|  ID| Name|formatted_Sales|
+----+-----+---------------+
|emp1| John|          78.49|
|emp2| null|          78.49|
|emp3| null|         345.00|
|emp4|Cindy|         456.00|
+----+-----+---------------+



In [35]:
formatted_fill_missing

DataFrame[ID: string, Name: string, formatted_Sales: string]

# How to replace values that match a filter condition?

In [39]:
"""
Replace the formatted sales < 300 to 300
"""
# DataFrame.na.replace swaps exact values for other values; it cannot take a
# condition. The earlier attempt, `'formatted_Sales' < 300`, compared a Python
# str with an int and raised TypeError before Spark was even called.
# A conditional replacement is written with when(...).otherwise(...).
from pyspark.sql.functions import col, lit, regexp_replace, when

# formatted_Sales is a string produced by format_number, so strip any
# thousands separators and cast it back to a number before comparing.
sales_as_number = regexp_replace(col('formatted_Sales'), ',', '').cast('double')

final_data = formatted_fill_missing.withColumn(
    'formatted_Sales',
    # Values below 300 become '300.00' (same 2-decimal string format as the
    # rest of the column); nulls and values >= 300 pass through unchanged.
    when(sales_as_number < 300, lit('300.00')).otherwise(col('formatted_Sales')),
)