<a href="https://cocl.us/Data_Science_with_Scalla_top"><img src = "https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/SC0103EN/adds/Data_Science_with_Scalla_notebook_top.png" width = 750, align = "center"></a>
 <br/>
<a><img src="https://ibm.box.com/shared/static/ugcqz6ohbvff804xp84y4kqnvvk3bq1g.png" width="200" align="center"></a>"

# Module 2: Preparing Data

## Handling Missing Data and Imputing Values

### Lesson Objectives 

-	After completing this lesson, you should be able to: 
- Drop records according to different criteria
-	Fill missing data according to different criteria
-	Drop duplicate records


## DataFrame NA Functions 

-	The `na` method of DataFrames provides functionality for working with missing data 
- Returns an instance of `DataFrameNAFunctions`
-	The following methods are available: 
  -	`drop`, for dropping rows containing NaN or null values 
  -	`fill`, for replacing NaN or null values 
  -	`replace`,  for replacing values matching specified keys

In [1]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._

Intitializing Scala interpreter ...

Spark Web UI available at http://host.docker.internal:4045
SparkContext available as 'sc' (version = 3.3.0, master = local[*], app id = local-1669538170415)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@76548b57
import spark.implicits._
import org.apache.spark.sql.functions._


In [3]:
val df = spark.range(0, 10).select("id").
    withColumn("uniform", rand(10L)).withColumn("normal", randn(10L))

val halfTonNaN = udf[Double, Double] (x => if (x > 0.5) Double.NaN else x)

val oneToNaN = udf[Double, Double] (x => if (x > 1.0) Double.NaN else x) 

val dfnan = df.withColumn("nanUniform", halfTonNaN(df("uniform"))).
    withColumn("nanNormal", oneToNaN(df("normal"))).drop("uniform"). 
    withColumnRenamed("nanUniform", "uniform").drop("normal"). 
    withColumnRenamed("nanNoemal", "normal")

dfnan.show()

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  4|                NaN| 0.15777284033895733|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  8|                NaN| -0.6471976318591632|
|  9|0.07530606222259384| 0.06487585190313583|
+---+-------------------+--------------------+



df: org.apache.spark.sql.DataFrame = [id: bigint, uniform: double ... 1 more field]
halfTonNaN: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3817/0x00000001016e1840@470e17c1,DoubleType,List(Some(class[value[0]: double])),Some(class[value[0]: double]),None,false,true)
oneToNaN: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3818/0x0000000101680040@35884101,DoubleType,List(Some(class[value[0]: double])),Some(class[value[0]: double]),None,false,true)
dfnan: org.apache.spark.sql.DataFrame = [id: bigint, uniform: double ... 1 more field]


## DataFrame NA Functions - drop 

-	`drop` is used for dropping rows containing `NaN` or `null` values according to a criteria 
-	Several implementations available:
  -	`drop(minNonNulls, cols)`
  -	`drop(minNonNulls)`
  -	`drop(how,cols)`
  - `drop(cols)`
  -	`drop(how)`
  -	`drop()`
-	`cols` is an `Array` or `Seq` of column names
- how should be equal any or all

In [4]:
// Dropping Rows With minNonNulls Argument 
dfnan.na.drop(minNonNulls = 3).show()

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  9|0.07530606222259384| 0.06487585190313583|
+---+-------------------+--------------------+



In [5]:
// Dropping Rows With How Argument 
dfnan.na.drop("all", Array("uniform", "nanNormal")).show()

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  4|                NaN| 0.15777284033895733|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  8|                NaN| -0.6471976318591632|
|  9|0.07530606222259384| 0.06487585190313583|
+---+-------------------+--------------------+



In [6]:
// Dropping Rows With How Argument 
dfnan.na.drop("any", Array("uniform", "nanNormal")).show()

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  9|0.07530606222259384| 0.06487585190313583|
+---+-------------------+--------------------+



## DataFrame NA Functions - fill 

-	`fill` is used for replacing NaN or null values according to a criteria
-	Several implementations available:
  - `fill(valueMap)`
  -	`fill(value,cols)`
  -	`fill(value)`

In [7]:
// Filling Missing Data By Column Type
dfnan.na.fill(0.0).show()   

// Filling Missing Data With Column Defaults 
val uniformMean = dfnan.filter("uniform <> 'NaN'").groupBy().agg(mean("uniform")).first()(0)

dfnan.na.fill(Map("uniform" -> uniformMean)).show(5)

// Filling Missing Data With Column Defaults 
val dfCols = dfnan.columns.drop(1)

val dfMeans = dfnan.na.drop().groupBy().
agg(mean("uniform"), mean("nanNormal")).first().toSeq

val meansMap = (dfCols.zip(dfMeans)).toMap
dfnan.na.fill(meansMap).show(5)

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  4|                0.0| 0.15777284033895733|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  8|                0.0| -0.6471976318591632|
|  9|0.07530606222259384| 0.06487585190313583|
+---+-------------------+--------------------+

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  4| 0.2381

uniformMean: Any = 0.2381678547181019
dfCols: Array[String] = Array(uniform, nanNormal)
dfMeans: Seq[Any] = WrappedArray(0.2381678547181019, -0.6116803361174477)
meansMap: scala.collection.immutable.Map[String,Any] = Map(uniform -> 0.2381678547181019, nanNormal -> -0.6116803361174477)


## DataFrame NA Functions - replace 

-	`replace` is used for replacing values matching specified keys
-	`cols` argument may be a single column name or an array
-	replacement argument is a map: 
  -	`key` is the value to be matched 
  -	`value` is the replacement value itself

In [8]:
//Replacing Values in a DataFrame 

dfnan.na.replace("uniform", Map(Double.NaN -> 0.0)).show()

+---+-------------------+--------------------+
| id|            uniform|           nanNormal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  4|                0.0| 0.15777284033895733|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  8|                0.0| -0.6471976318591632|
|  9|0.07530606222259384| 0.06487585190313583|
+---+-------------------+--------------------+



## Duplicates

-	`dropDuplicates` is a `DataFrame` method 
-	Used to remove duplicate rows
-	May specify a subset of columns to check for duplicates

In [9]:
// Dropping Duplicate Rows 
val dfDuplicates = df.unionAll(sc.parallelize(Seq((10,1,1),(11,1,1))).toDF())

// Dropping Duplicate Rows 
val dfCols = dfnan.withColumnRenamed("nanNormal", "normal").columns

dfDuplicates.dropDuplicates(dfCols).show()

+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0| 0.1709497137955568| -0.4823162289462346|
|  1|0.03422639313807285|  -1.866319666602813|
|  2| 0.3654625958161396|-0.18972856311959985|
|  3| 0.4175019040792016|-0.02656790187341...|
|  4| 0.9899129399827472| 0.15777284033895733|
|  5|0.16452185994603707| -1.2370553024281632|
|  6|0.18141810315190554|  -1.140722328482787|
|  7|0.49595620559530806|-0.01560854938969...|
|  9|0.07530606222259384| 0.06487585190313583|
|  8| 0.9697474945375325| -0.6471976318591632|
| 10|                1.0|                 1.0|
| 11|                1.0|                 1.0|
+---+-------------------+--------------------+



dfDuplicates: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, uniform: double ... 1 more field]
dfCols: Array[String] = Array(id, uniform, normal)


## Lesson Summary

-	Having completed this lesson, you should be able to: 
- Drop records according to different criteria
-	Fill missing data according to different criteria
-	Drop duplicate records

### About the Authors

[Petro Verkhogliad](https://www.linkedin.com/in/vpetro) is Consulting Manager at Lightbend. He holds a Masters degree in Computer Science with specialization in Intelligent Systems. He is passionate about functional programming and applications of AI.