# PySpark Tutorial 3

YouTube link: https://www.youtube.com/watch?v=pOMXkbc06m4&list=PLZoTAELRMXVNjiiawhzZ0afHcPvC8jpcg&index=3

In [32]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pratices').getOrCreate()
spark

25/06/16 17:14:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [33]:
df = spark.read.csv('test2.csv', header=True, inferSchema=True)
df.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
|     NULL|NULL|      NULL|  NULL|
+---------+----+----------+------+



## Drop Column

In [34]:
df.drop('Name').show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|NULL|      NULL| 40000|
|  34|        10| 38000|
|  36|      NULL|  NULL|
|NULL|      NULL|  NULL|
+----+----------+------+



### Dropping rows with null values

#### For this we can use `na.drop()`

Parameters
----------
* how : str, optional, the values that can be 'any' or 'all', default 'any'.
    If 'any', drop a row if it contains any nulls.
    If 'all', drop a row only if all its values are null.
    
* thresh: int, optional, default None.
    If specified, drop rows that have less than `thresh` non-null values.
    This overwrites the `how` parameter.
* subset : str, tuple or list, optional
    optional list of column names to consider
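
The interaction between `how`, `thresh`, and `subset` can be modelled in a few lines of plain Python. This is a minimal sketch of the rule described in the parameter list, not Spark's implementation; `ROWS`, `keep_row`, and `na_drop` are hypothetical names, and the rows are copied from the `test2.csv` output shown earlier, with `None` standing in for NULL.

```python
# Plain-Python model of the na.drop() rule; this is not Spark code.
# Rows copied from the df.show() output, with None standing in for NULL.
COLUMNS = ("Name", "age", "Experience", "Salary")
ROWS = [dict(zip(COLUMNS, r)) for r in [
    ("Krish", 31, 10, 30000),
    ("Sudhanshu", 30, 8, 25000),
    ("Sunny", 29, 4, 20000),
    ("Paul", 24, 3, 20000),
    ("Harsha", 21, 1, 15000),
    ("Shubham", 23, 2, 18000),
    ("Mahesh", None, None, 40000),
    (None, 34, 10, 38000),
    (None, 36, None, None),
    (None, None, None, None),
]]


def keep_row(row, how="any", thresh=None, subset=None):
    """Return True if the row survives the drop."""
    cols = list(subset) if subset is not None else list(row)
    non_null = sum(row[c] is not None for c in cols)
    if thresh is None:
        # how='any' requires every considered column to be non-null;
        # how='all' requires at least one non-null value.
        thresh = len(cols) if how == "any" else 1
    # When thresh is given explicitly, how is ignored.
    return non_null >= thresh


def na_drop(rows, how="any", thresh=None, subset=None):
    return [r for r in rows if keep_row(r, how, thresh, subset)]


print(len(na_drop(ROWS)))                                             # 6
print(len(na_drop(ROWS, how="all")))                                  # 9
print(len(na_drop(ROWS, subset=["age"])))                             # 8
print(len(na_drop(ROWS, thresh=2, subset=["Experience", "Salary"])))  # 7
```

The four counts correspond to the four `na.drop()` calls in this section, so the model can be compared against the `show()` output of each cell.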

In [35]:
df.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [36]:
df.na.drop(how='all').show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [37]:
df.na.drop(how='any', subset=['age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
|     NULL| 36|      NULL|  NULL|
+---------+---+----------+------+



In [38]:
df.na.drop(how='any', thresh=2, subset=['Experience', 'Salary']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



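With `thresh=2`, a row is kept only if it has at least 2 non-null values among the `Experience` and `Salary` columns, so any row with a null in either column is dropped. Because `thresh` overrides `how`, the `how='any'` argument has no effect here.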

## Filling NULL values

### For this we can use `na.fill()`

Parameters
----------
* value : int, float, string, bool or dict, the value to replace null values with.
    If the value is a dict, then `subset` is ignored and `value` must be a mapping
    from column name (string) to replacement value. The replacement value must be
    an int, float, boolean, or string.
* subset : str, tuple or list, optional
    optional list of column names to consider.
    Columns specified in subset that do not have matching data types are ignored.
    For example, if `value` is a string, and subset contains a non-string column,
    then the non-string column is simply ignored.

In [39]:
df = spark.read.csv('test2.csv', header=True)
df.na.fill(value='Miss', subset=['Name','age','Experience','Salary']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|Miss|      Miss| 40000|
|     Miss|  34|        10| 38000|
|     Miss|  36|      Miss|  Miss|
|     Miss|Miss|      Miss|  Miss|
+---------+----+----------+------+



In [40]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [41]:
df = spark.read.csv('test2.csv', header=True, inferSchema=True)
df.na.fill(value='Miss', subset=['Name','age','Experience','Salary']).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     Miss|  34|        10| 38000|
|     Miss|  36|      NULL|  NULL|
|     Miss|NULL|      NULL|  NULL|
+---------+----+----------+------+



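Here `Name` is a string column, while `age`, `Experience`, and `Salary` are integers. <br>
Because the fill value is a string, only the missing values in the `Name` column are filled. In the earlier run without `inferSchema=True`, every column was read as a string, which is why `'Miss'` filled all four columns.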
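
The type-matching behaviour can be sketched in plain Python. This is only a model of the rule quoted in the `subset` parameter description, not Spark's implementation; `INFERRED`, `ALL_STRINGS`, `compatible`, and `na_fill` are hypothetical names. Casting a numeric fill value to the column type is an assumption in this sketch.

```python
# Plain-Python model of how na.fill() matches the value to column types.
# Not Spark code: each schema is a dict of column name -> Python type.
INFERRED = {"Name": str, "age": int, "Experience": int, "Salary": int}
ALL_STRINGS = {name: str for name in INFERRED}


def compatible(value, col_type):
    """A string fills string columns; a number fills numeric columns."""
    if isinstance(value, str):
        return col_type is str
    if isinstance(value, bool):
        return col_type is bool
    return col_type in (int, float)


def na_fill(row, schema, value, subset=None):
    cols = subset if subset is not None else list(schema)
    filled = dict(row)
    for c in cols:
        if filled[c] is None and compatible(value, schema[c]):
            # Assumption: the value is cast to the column type,
            # so 28.5 becomes 28 in an integer column.
            filled[c] = schema[c](value)
    return filled


empty = {"Name": None, "age": None, "Experience": None, "Salary": None}

print(na_fill(empty, INFERRED, "Miss"))
# {'Name': 'Miss', 'age': None, 'Experience': None, 'Salary': None}
print(na_fill(empty, ALL_STRINGS, "Miss"))
# {'Name': 'Miss', 'age': 'Miss', 'Experience': 'Miss', 'Salary': 'Miss'}
print(na_fill(empty, INFERRED, 28.5, subset=["age"]))
# {'Name': None, 'age': 28, 'Experience': None, 'Salary': None}
```

The first two calls mirror the two `'Miss'` runs: the same fill value touches one column or all four depending only on the schema.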

In [42]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



#### Filling missing values with the mean, median, or mode

Filling null values with the mean, median, or mode can be achieved using the `Imputer` class from `pyspark.ml.feature`.

In [43]:
!python3 -m pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade numpy



In [44]:
from pyspark.ml.feature import Imputer

In [45]:
impute = Imputer(
    inputCols = ['age', 'Experience', 'Salary'],
    outputCols = ["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
).setStrategy('mean') # median, mode

In [46]:
impute.fit(df).transform(df).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|NULL|      NULL| 40000|         28|                 5|         40000|
|     NULL|  34|        10| 38000|         34|                10|         38000|
|     NULL|  36|      NULL|  NULL|         36|                 5|         25750|
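|     NULL|NULL|      NULL|  NULL|         28|                 5|         25750|
+---------+----+----------+------+-----------+------------------+--------------+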
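
The imputed values can be checked by hand. The sketch below recomputes the column means from the non-null values in the table and truncates them to integers. The truncation is an assumption inferred from the integer `_imputed` columns in the output, not a statement about `Imputer` internals.

```python
# Recompute the per-column means from the non-null values in test2.csv.
from statistics import mean

age = [31, 30, 29, 24, 21, 23, 34, 36]
experience = [10, 8, 4, 3, 1, 2, 10]
salary = [30000, 25000, 20000, 20000, 15000, 18000, 40000, 38000]

means = {
    "age": mean(age),
    "Experience": mean(experience),
    "Salary": mean(salary),
}
# Assumption: an integer column stores the mean truncated to an integer.
imputed = {name: int(value) for name, value in means.items()}

print(means["age"])  # 28.5
print(imputed)       # {'age': 28, 'Experience': 5, 'Salary': 25750}
```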

Alternatively, calculate the aggregate value manually and fill the nulls with `na.fill`.

In [51]:
from pyspark.sql.functions import mean, col, mode, expr

In [61]:
# Calculate the mean of age
value = df.select(mean(col('age'))).collect()[0][0]  # returns 28.5

# Calculate the median of age (approxQuantile scales to large datasets)
# value = df.approxQuantile('age', [0.5], 0.001)[0]  # returns 29.0

# Calculate the mode of a categorical column (here, Name)
# This takes a bit more manual work to find the most frequent value
# value = df.groupBy('Name').count().orderBy(col('count').desc()).first()[0]  # returns None, because NULL is the most frequent Name

value

28.5
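
The commented-out mode calculation returns `None` because the null group is the largest one in `Name`. A plain-Python sketch with `collections.Counter` shows the same effect, along with one possible way around it: filtering out nulls before counting. This is a model of the grouping logic, not PySpark code, and `names` is copied from the table above.

```python
from collections import Counter

# Name column from test2.csv, with None standing in for NULL.
names = ["Krish", "Sudhanshu", "Sunny", "Paul", "Harsha",
         "Shubham", "Mahesh", None, None, None]

# Counting every group, nulls included: the null group wins.
top_value, top_count = Counter(names).most_common(1)[0]
print(top_value, top_count)  # None 3

# Counting only non-null values: every name appears exactly once,
# so this column has no single mode to fill with.
non_null_counts = Counter(n for n in names if n is not None)
print(max(non_null_counts.values()))  # 1
```

In PySpark, the corresponding step would presumably be to filter out null names before the `groupBy`, for example with `col('Name').isNotNull()`.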

In [62]:
df.na.fill(value, subset=['age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 28|      NULL| 40000|
|     NULL| 34|        10| 38000|
|     NULL| 36|      NULL|  NULL|
|     NULL| 28|      NULL|  NULL|
+---------+---+----------+------+



In [63]:
spark.stop()