In [65]:
import findspark
findspark.init( '/home/kajili/spark-2.4.5-bin-hadoop2.7' )
from pyspark.sql import SparkSession

In [66]:
spark = SparkSession.builder.appName('miss').getOrCreate()

In [67]:
df = spark.read.csv('./test_data/ContainsNull.csv', header=True, inferSchema=True)

In [68]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Can use `na.drop()` function to drop any row with missing data

In [6]:
df.na.drop()

DataFrame[Id: string, Name: string, Sales: double]

In [9]:
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [10]:
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [12]:
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Can use `na.fill()` function to fill missing data instead

In [15]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [17]:
df.na.fill('').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2|     | null|
|emp3|     |345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [18]:
df.na.fill('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [19]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [22]:
# So, to fill any missing values in the `Name` Column with the string `No Name`, you would do this:
df.na.fill('No Name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [23]:
from pyspark.sql.functions import mean

In [44]:
mean_val = df.select(mean(df['Sales']))

In [45]:
mean_val

DataFrame[avg(Sales): double]

In [46]:
mean_val.show()

+----------+
|avg(Sales)|
+----------+
|     400.5|
+----------+



In [47]:
mean_val.collect()

[Row(avg(Sales)=400.5)]

In [48]:
mean_val = mean_val.collect()

In [49]:
mean_val

[Row(avg(Sales)=400.5)]

In [50]:
mean_val[0]

Row(avg(Sales)=400.5)

In [51]:
# Can instance it out
mean_val[0][0]

400.5

In [56]:
# or you can turn it into a dict to get the value
mean_val[0].asDict()['avg(Sales)']

400.5

In [57]:
mean_sales = mean_val[0][0] 

In [59]:
df.na.fill(mean_sales,subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [63]:
# Replacing missing `Sales` values with mean value, all in one line
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# End of Section 8 Lecture 28