<a href="https://colab.research.google.com/github/saurater/ciencia_de_dados_pyspark/blob/main/PySpark_Tutorial_Part_3_Handling_Missing_Values.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark - Tutorial - Part 3 - Handling Missing Values
## Notebook by Sam Faraday
June 2022

1. Dropping Columns
2. Dropping Rows
3. Dropping Parameters (how, threshold, subset)
4. Handling Missing Values by Mean, Median, Mode

## Sources: 
1. Free Code Camp: PySpark Tutorial at https://www.youtube.com/watch?v=_C8kWso4ne4
2. Apache Spark API Reference at https://spark.apache.org/docs/latest/api/python/reference/index.html

# 1. Installing PySpark

In [None]:
pip install pyspark # run it every time you connect you Google Colab Notebook

# 2. Importing the required libraries

In [None]:
from pyspark.sql.functions import col,isnan,when,count

In [None]:
import pandas as pd
import numpy as np

# 3. Creating the Test3 Dataset

In [None]:
# Test dataset with deliberately missing entries: NaN in the numeric columns,
# empty strings in Name, and a "Drop Me" column that is entirely NaN.
# np.nan is used because the np.NaN alias was removed in NumPy 2.0.
data = {
    'Index': [1, 2, 3, 4, 5, 6, np.nan],
    'Name': ['Tom', 'Nick', 'Krish', '', 'Jack', '', ''],
    'Age': [20, np.nan, np.nan, 19, 18, 19, np.nan],
    'Salary': [2000, 3000, np.nan, 4000, 3000, 3500, np.nan],
    'Drop Me': [np.nan] * 7,
}
# Create DataFrame
df = pd.DataFrame(data)
df

Unnamed: 0,Index,Name,Age,Salary,Drop Me
0,1.0,Tom,20.0,2000.0,
1,2.0,Nick,,3000.0,
2,3.0,Krish,,,
3,4.0,,19.0,4000.0,
4,5.0,Jack,18.0,3000.0,
5,6.0,,19.0,3500.0,
6,,,,,


# 4. Saving the Dataset

In [None]:
# Write the dataset to disk without the pandas row index.
df.to_csv(path_or_buf='test3.csv', index=False)

# 5. Initializing PySpark


In [None]:
from pyspark.sql import SparkSession

# Configure the application name first, then build the session (or reuse one
# that already exists).
session_builder = SparkSession.builder.appName("Practise")
spark = session_builder.getOrCreate()

# Display the session summary as the cell output.
spark

# 6. Reading the Dataset

In [None]:
# Load the CSV written earlier: the first row holds the column names and the
# column types are inferred from the data.
df_spark = spark.read.csv(
    path="test3.csv",
    header=True,
    inferSchema=True,
)
df_spark.show()

+-----+-----+----+------+-------+
|Index| Name| Age|Salary|Drop Me|
+-----+-----+----+------+-------+
|  1.0|  Tom|20.0|2000.0|   null|
|  2.0| Nick|null|3000.0|   null|
|  3.0|Krish|null|  null|   null|
|  4.0| null|19.0|4000.0|   null|
|  5.0| Jack|18.0|3000.0|   null|
|  6.0| null|19.0|3500.0|   null|
| null| null|null|  null|   null|
+-----+-----+----+------+-------+



# 7. Checking the Schema

In [None]:
# Print each column's inferred type; the all-null "Drop Me" column is reported as string.
df_spark.printSchema()

root
 |-- Index: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Salary: double (nullable = true)
 |-- Drop Me: string (nullable = true)



# 8. Find count for empty, None, Null, Nan with string literals.

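Generic pattern (here `df` stands for a Spark DataFrame, not the pandas `df` created above):

```python
df2 = df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns])
df2.show()
```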

In [None]:
def _missing_count(name):
    """Build an aggregate counting the rows where column `name` looks missing.

    A value is treated as missing when it contains 'None' or 'NULL', is an
    empty string, is null, or is NaN.
    """
    column = col(name)
    looks_missing = (
        column.contains('None')
        | column.contains('NULL')
        | (column == '')
        | column.isNull()
        | isnan(name)
    )
    # `when` receives the column name as a plain string (a non-null literal),
    # not the column itself: passing the column would yield null for null rows
    # and count() would then skip exactly the rows we want to count.
    return count(when(looks_missing, name)).alias(name)


df_spark2 = df_spark.select([_missing_count(name) for name in df_spark.columns])

df_spark2.show()

+-----+----+---+------+-------+
|Index|Name|Age|Salary|Drop Me|
+-----+----+---+------+-------+
|    1|   3|  3|     2|      7|
+-----+----+---+------+-------+



# 9. Dropping Columns

In [None]:
# Remove the all-null column. The name is spelled exactly as in the header
# ("Drop Me") so the drop does not depend on Spark's case-insensitive column
# resolution; drop() silently ignores names it cannot resolve.
df_spark = df_spark.drop("Drop Me")
df_spark.show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick|null|3000.0|
|  3.0|Krish|null|  null|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
| null| null|null|  null|
+-----+-----+----+------+



# 10. Dropping All Rows with Null Values
Note: the result is not assigned back to `df_spark`, so the original DataFrame is unchanged (the operation is not in place).

In [None]:
# Called without arguments, na.drop() removes every row that has at least one null.
# The result is only displayed; df_spark itself is left untouched.
rows_without_nulls = df_spark.na.drop()
rows_without_nulls.show()

+-----+----+----+------+
|Index|Name| Age|Salary|
+-----+----+----+------+
|  1.0| Tom|20.0|2000.0|
|  5.0|Jack|18.0|3000.0|
+-----+----+----+------+



# 11. Dropping Rows with ANY Null Values

In [None]:
# how='any' drops a row as soon as any one of its columns is null.
complete_rows = df_spark.na.drop(how='any')
complete_rows.show()

+-----+----+----+------+
|Index|Name| Age|Salary|
+-----+----+----+------+
|  1.0| Tom|20.0|2000.0|
|  5.0|Jack|18.0|3000.0|
+-----+----+----+------+



# 12. Dropping Rows with ALL Null Values

In [None]:
# Show the full DataFrame again as a reference point before dropping rows.
df_spark.show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick|null|3000.0|
|  3.0|Krish|null|  null|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
| null| null|null|  null|
+-----+-----+----+------+



In [None]:
# how='all' drops only rows in which every column is null; here that is row 7,
# the only row with all null values.
not_all_null_rows = df_spark.na.drop(how='all')
not_all_null_rows.show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick|null|3000.0|
|  3.0|Krish|null|  null|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
+-----+-----+----+------+



# 13. Dropping Rows with ANY Null Values AND THRESHOLD = Some number
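Please note that `thresh=3` keeps only the rows that have at least 3 non-null values, and it takes precedence over `how`. With 4 columns, any row with 2 or more null values is dropped.
In this case, rows 3 and 7 have been deleted.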

In [None]:
# thresh=3 keeps only rows with at least 3 non-null values. Rows with a single null
# are kept even though how='any' is passed, so thresh takes precedence over how here.
df_spark.na.drop(how='any', thresh=3).show()

+-----+----+----+------+
|Index|Name| Age|Salary|
+-----+----+----+------+
|  1.0| Tom|20.0|2000.0|
|  2.0|Nick|null|3000.0|
|  4.0|null|19.0|4000.0|
|  5.0|Jack|18.0|3000.0|
|  6.0|null|19.0|3500.0|
+-----+----+----+------+



# 14. Dropping Rows with Subset
Specific Columns

In [None]:
# Show the full DataFrame again as a reference point before dropping rows by subset.
df_spark.show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick|null|3000.0|
|  3.0|Krish|null|  null|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
| null| null|null|  null|
+-----+-----+----+------+



In [None]:
# Only the Name column is inspected: rows whose Name is null are dropped,
# while nulls in the other columns are ignored.
rows_with_name = df_spark.na.drop(how='any', subset=['Name'])
rows_with_name.show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick|null|3000.0|
|  3.0|Krish|null|  null|
|  5.0| Jack|18.0|3000.0|
+-----+-----+----+------+



# 15. Filling All Missing Values
If you do not specify a subset (columns), `fill` replaces the missing values in every column whose data type matches the fill value.

In [None]:
# A numeric fill value only affects numeric columns; Name keeps its nulls.
df_spark.na.fill(value=0).show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick| 0.0|3000.0|
|  3.0|Krish| 0.0|   0.0|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
|  0.0| null| 0.0|   0.0|
+-----+-----+----+------+



In [None]:
# A string fill value only affects string columns; the numeric columns keep their nulls.
df_spark.na.fill(value="Missing Values").show()

+-----+--------------+----+------+
|Index|          Name| Age|Salary|
+-----+--------------+----+------+
|  1.0|           Tom|20.0|2000.0|
|  2.0|          Nick|null|3000.0|
|  3.0|         Krish|null|  null|
|  4.0|Missing Values|19.0|4000.0|
|  5.0|          Jack|18.0|3000.0|
|  6.0|Missing Values|19.0|3500.0|
| null|Missing Values|null|  null|
+-----+--------------+----+------+



# 16. Filling Missing Values by Columns Names

In [None]:
# Restrict the fill to the Age column; nulls elsewhere are left as they are.
df_spark.na.fill(value=0, subset="Age").show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick| 0.0|3000.0|
|  3.0|Krish| 0.0|  null|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
| null| null| 0.0|  null|
+-----+-----+----+------+



In [None]:
# Restrict the fill to the Age and Salary columns; Index and Name keep their nulls.
df_spark.na.fill(value=0, subset=["Age", "Salary"]).show()

+-----+-----+----+------+
|Index| Name| Age|Salary|
+-----+-----+----+------+
|  1.0|  Tom|20.0|2000.0|
|  2.0| Nick| 0.0|3000.0|
|  3.0|Krish| 0.0|   0.0|
|  4.0| null|19.0|4000.0|
|  5.0| Jack|18.0|3000.0|
|  6.0| null|19.0|3500.0|
| null| null| 0.0|   0.0|
+-----+-----+----+------+



# 17. Imputing Missing Values Strategies

In [None]:
from pyspark.ml.feature import Imputer

## Please note that for learning only, we have created 2 new columns: Age_Imputed and Salary_Imputed.

In real life, after analysing the result, you could remove the `_Imputed` suffix from the `outputCols` like this:

```python
my_imputer = Imputer(
    inputCols = ['Age', 'Salary'],
    outputCols =  ["{}".format(c) for c in ['Age',  'Salary']]).setStrategy('mean')
```

## Imputation Strategies
Mean, Median, Mode    

In [None]:
# Columns to impute; each result is written to a new "<column>_Imputed" column
# so the original values stay visible for comparison.
columns_to_impute = ['Age', 'Salary']
imputed_names = [f"{name}_Imputed" for name in columns_to_impute]

my_imputer = Imputer(
    strategy='mean',
    inputCols=columns_to_impute,
    outputCols=imputed_names,
)

# 18. Add imputation to df_spark

In [None]:
# fit() learns the replacement value for each input column; transform() then
# appends the imputed columns to the DataFrame, which is only displayed here.
imputer_model = my_imputer.fit(df_spark)
imputer_model.transform(df_spark).show()

+-----+-----+----+------+-----------+--------------+
|Index| Name| Age|Salary|Age_Imputed|Salary_Imputed|
+-----+-----+----+------+-----------+--------------+
|  1.0|  Tom|20.0|2000.0|       20.0|        2000.0|
|  2.0| Nick|null|3000.0|       19.0|        3000.0|
|  3.0|Krish|null|  null|       19.0|        3100.0|
|  4.0| null|19.0|4000.0|       19.0|        4000.0|
|  5.0| Jack|18.0|3000.0|       18.0|        3000.0|
|  6.0| null|19.0|3500.0|       19.0|        3500.0|
| null| null|null|  null|       19.0|        3100.0|
+-----+-----+----+------+-----------+--------------+

