## Pyspark Handling Missing Values
 - Dropping Columns
 - Dropping Rows
 - Various Parameters of the Drop Functions (`how`, `thresh`, `subset`)
 - Handling Missing Values by Imputation (the example uses the median strategy)

In [1]:
from pyspark.sql import SparkSession
# Create the Spark session for this section, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)

In [2]:
# Load the sample CSV: the first row is the header and column types are inferred.
df_pyspark = spark.read.csv(
    'Test1.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Display the loaded rows; `null` marks the missing values.
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudanshu|  30|         8| 25000|
|   Sunny|  29|         6| 20000|
|   Shubh|  28|         4| 15000|
|  Harsha|  24|         1| 40000|
|  Mahesh|  26|         3| 35000|
|   Parul|null|      null| 38000|
|    null|  34|        10| 33000|
|    null|  35|      null|  null|
+--------+----+----------+------+



In [4]:
# Drop a column: returns a new DataFrame without 'Name'.
(
    df_pyspark
    .drop('Name')
    .show()
)

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         6| 20000|
|  28|         4| 15000|
|  24|         1| 40000|
|  26|         3| 35000|
|null|      null| 38000|
|  34|        10| 33000|
|  35|      null|  null|
+----+----------+------+



In [5]:
# Drop every row that contains at least one null value (default how='any').
(
    df_pyspark
    .na.drop()
    .show()
)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudanshu| 30|         8| 25000|
|   Sunny| 29|         6| 20000|
|   Shubh| 28|         4| 15000|
|  Harsha| 24|         1| 40000|
|  Mahesh| 26|         3| 35000|
+--------+---+----------+------+



In [6]:
# how='any' drops a row if any of its columns is null;
# how='all' would drop a row only when every column is null.
(
    df_pyspark
    .na.drop(how='any')
    .show()
)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudanshu| 30|         8| 25000|
|   Sunny| 29|         6| 20000|
|   Shubh| 28|         4| 15000|
|  Harsha| 24|         1| 40000|
|  Mahesh| 26|         3| 35000|
+--------+---+----------+------+



In [7]:
# thresh=2 keeps only the rows that have at least 2 non-null values;
# rows with fewer than 2 non-null values are dropped.
# When thresh is given it overrides `how`, so `how` is not passed here.
df_pyspark.na.drop(thresh=2).show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudanshu|  30|         8| 25000|
|   Sunny|  29|         6| 20000|
|   Shubh|  28|         4| 15000|
|  Harsha|  24|         1| 40000|
|  Mahesh|  26|         3| 35000|
|   Parul|null|      null| 38000|
|    null|  34|        10| 33000|
+--------+----+----------+------+



In [8]:
# subset limits the null check to the listed columns: rows are dropped only
# when 'Experience' is null; nulls in other columns (e.g. Name) are kept.
(
    df_pyspark
    .na.drop(how='any', subset=['Experience'])
    .show()
)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudanshu| 30|         8| 25000|
|   Sunny| 29|         6| 20000|
|   Shubh| 28|         4| 15000|
|  Harsha| 24|         1| 40000|
|  Mahesh| 26|         3| 35000|
|    null| 34|        10| 33000|
+--------+---+----------+------+



In [11]:
# Fill missing values with a constant. A string fill value is applied only to
# string columns (Name); nulls in the numeric columns are left untouched.
(
    df_pyspark
    .na.fill('Missing Values')
    .show()
)

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|      Sudanshu|  30|         8| 25000|
|         Sunny|  29|         6| 20000|
|         Shubh|  28|         4| 15000|
|        Harsha|  24|         1| 40000|
|        Mahesh|  26|         3| 35000|
|         Parul|null|      null| 38000|
|Missing Values|  34|        10| 33000|
|Missing Values|  35|      null|  null|
+--------------+----+----------+------+



In [13]:
# Re-display df_pyspark: the nulls are still present, because the drop/fill
# calls return new DataFrames and do not modify the original.
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
|   Krish|  31|        10| 30000|
|Sudanshu|  30|         8| 25000|
|   Sunny|  29|         6| 20000|
|   Shubh|  28|         4| 15000|
|  Harsha|  24|         1| 40000|
|  Mahesh|  26|         3| 35000|
|   Parul|null|      null| 38000|
|    null|  34|        10| 33000|
|    null|  35|      null|  null|
+--------+----+----------+------+



In [16]:
from pyspark.ml.feature import Imputer

# Numeric columns whose nulls should be imputed. Declared once so that
# inputCols and outputCols cannot drift apart.
cols_to_impute = ['Age', 'Experience', 'Salary']

# Replace nulls with each column's median, writing the result to new
# "<column>_imputed" columns rather than overwriting the originals.
imputer = Imputer(
    inputCols=cols_to_impute,
    outputCols=[c + "_imputed" for c in cols_to_impute],
    strategy="median",
)

In [17]:
# fit() learns the fill value for each input column; transform() appends the
# *_imputed columns next to the original ones.
(
    imputer
    .fit(df_pyspark)
    .transform(df_pyspark)
    .show()
)

+--------+----+----------+------+-----------+------------------+--------------+
|    Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
|   Krish|  31|        10| 30000|         31|                10|         30000|
|Sudanshu|  30|         8| 25000|         30|                 8|         25000|
|   Sunny|  29|         6| 20000|         29|                 6|         20000|
|   Shubh|  28|         4| 15000|         28|                 4|         15000|
|  Harsha|  24|         1| 40000|         24|                 1|         40000|
|  Mahesh|  26|         3| 35000|         26|                 3|         35000|
|   Parul|null|      null| 38000|         29|                 6|         38000|
|    null|  34|        10| 33000|         34|                10|         33000|
|    null|  35|      null|  null|         35|                 6|         30000|
+--------+----+----------+------+-----------+------------------+--------------+

## Filter Operation

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session for the filter examples.
spark = (
    SparkSession.builder
    .appName('dataframe')
    .getOrCreate()
)

In [4]:
# Load the dataset used by the filter examples and display it.
df_pyspark = spark.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudanshu| 30|         8| 25000|
|   Sunny| 29|         6| 20000|
|   Shubh| 28|         4| 15000|
|  Harsha| 24|         1| 40000|
|  Mahesh| 26|         3| 35000|
|   Parul| 27|         5| 38000|
+--------+---+----------+------+



In [6]:
# People whose salary is less than or equal to 20000,
# written as a SQL-style string predicate.
(
    df_pyspark
    .filter('Salary<=20000')
    .show()
)

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
|Sunny| 29|         6| 20000|
|Shubh| 28|         4| 15000|
+-----+---+----------+------+



In [8]:
# Same filter, then keep only the Name and Age columns.
(
    df_pyspark
    .filter('Salary<=20000')
    .select(['Name', 'Age'])
    .show()
)

+-----+---+
| Name|Age|
+-----+---+
|Sunny| 29|
|Shubh| 28|
+-----+---+



In [10]:
# Same filter expressed as a Column expression instead of a SQL string.
(
    df_pyspark
    .filter(df_pyspark['Salary'] <= 20000)
    .show()
)

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
|Sunny| 29|         6| 20000|
|Shubh| 28|         4| 15000|
+-----+---+----------+------+



In [13]:
# Combine two conditions with & (logical AND). Each comparison needs its own
# parentheses because & binds more tightly than <= and >= in Python.
(
    df_pyspark
    .filter(
        (df_pyspark['Salary'] <= 30000)
        & (df_pyspark['Salary'] >= 15000)
    )
    .show()
)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudanshu| 30|         8| 25000|
|   Sunny| 29|         6| 20000|
|   Shubh| 28|         4| 15000|
+--------+---+----------+------+



In [14]:
# Inverse filter: ~ negates the condition, keeping the rows where
# Salary <= 20000 is false.
(
    df_pyspark
    .filter(~(df_pyspark['Salary'] <= 20000))
    .show()
)

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudanshu| 30|         8| 25000|
|  Harsha| 24|         1| 40000|
|  Mahesh| 26|         3| 35000|
|   Parul| 27|         5| 38000|
+--------+---+----------+------+



## PySpark GroupBy and Aggregate Functions

In [1]:
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the Spark session for the aggregation examples.
spark = (
    SparkSession.builder
    .appName('Agg')
    .getOrCreate()
)

In [4]:
# Bare expression as the last line of the cell: the notebook displays the
# SparkSession object's representation as the cell output.
spark

In [5]:
# Load the department/salary dataset used by the groupBy examples.
df_pyspark = spark.read.csv(
    'test3.csv',
    header=True,
    inferSchema=True,
)

In [6]:
# Display the rows; note that a Name (e.g. Krish) can appear in more than one department.
df_pyspark.show()

+--------+------------+------+
|    Name|  Department|Salary|
+--------+------------+------+
|   Krish|Data Science| 30000|
|Sudanshu|         IOT| 25000|
|   Sunny|    Big Data| 20000|
|   Shubh|    Big Data| 15000|
|  Harsha|Data Science| 40000|
|  Mahesh|         IOT| 35000|
|   Parul|    Big Data| 38000|
|   Krish|         IOT| 45000|
+--------+------------+------+



In [7]:
# Print the column names and inferred types; Salary is the only numeric column.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [9]:
# Group by Name and sum the numeric columns: total salary per person
# (Salary is the only numeric column here).
(
    df_pyspark
    .groupBy('Name')
    .sum()
    .show()
)

+--------+-----------+
|    Name|sum(Salary)|
+--------+-----------+
|   Sunny|      20000|
|   Krish|      75000|
|Sudanshu|      25000|
|   Shubh|      15000|
|  Harsha|      40000|
|  Mahesh|      35000|
|   Parul|      38000|
+--------+-----------+



In [10]:
# Group by Department and sum the salaries: this is the TOTAL salary per
# department (not the maximum individual salary), which shows which
# department pays the most overall.
(
    df_pyspark
    .groupBy('Department')
    .sum()
    .show()
)

+------------+-----------+
|  Department|sum(Salary)|
+------------+-----------+
|         IOT|     105000|
|    Big Data|      73000|
|Data Science|      70000|
+------------+-----------+



In [11]:
# Average salary per department.
(
    df_pyspark
    .groupBy('Department')
    .mean()
    .show()
)

+------------+------------------+
|  Department|       avg(Salary)|
+------------+------------------+
|         IOT|           35000.0|
|    Big Data|24333.333333333332|
|Data Science|           35000.0|
+------------+------------------+



In [13]:
# Number of rows (employees) in each department.
(
    df_pyspark
    .groupBy('Department')
    .count()
    .show()
)

+------------+-----+
|  Department|count|
+------------+-----+
|         IOT|    3|
|    Big Data|    3|
|Data Science|    2|
+------------+-----+



In [14]:
# agg() without groupBy aggregates over the whole DataFrame:
# the grand total of the Salary column.
(
    df_pyspark
    .agg({'Salary': 'sum'})
    .show()
)

+-----------+
|sum(Salary)|
+-----------+
|     248000|
+-----------+

