In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [3]:
df = spark.read.csv(r"C:\Users\pavan\OneDrive\Desktop\prep\datasets\test1.csv",header = True,inferSchema = True)

In [4]:
df.show()

+----+-------+----+
| Id |   Name| Age|
+----+-------+----+
|   1|   john|  22|
|   2|Stephen|  22|
|   3|  Stacy|  24|
|   4|   paul|  23|
|   5|   NULL|  32|
|   6|    Joe|NULL|
|NULL|   NULL|  45|
|   8|   jack|NULL|
+----+-------+----+



In [5]:
#drop columns

df.drop('Name').show()

+----+----+
| Id | Age|
+----+----+
|   1|  22|
|   2|  22|
|   3|  24|
|   4|  23|
|   5|  32|
|   6|NULL|
|NULL|  45|
|   8|NULL|
+----+----+



In [6]:
#drop specific rows with na values

df.na.drop().show()

+---+-------+---+
|Id |   Name|Age|
+---+-------+---+
|  1|   john| 22|
|  2|Stephen| 22|
|  3|  Stacy| 24|
|  4|   paul| 23|
+---+-------+---+



In [7]:
#options in na.drop
#1. how

# all ->  deletes rows only if all columns of a row has nulls
df.na.drop(how='all').show()

+----+-------+----+
| Id |   Name| Age|
+----+-------+----+
|   1|   john|  22|
|   2|Stephen|  22|
|   3|  Stacy|  24|
|   4|   paul|  23|
|   5|   NULL|  32|
|   6|    Joe|NULL|
|NULL|   NULL|  45|
|   8|   jack|NULL|
+----+-------+----+



In [8]:
#any ->deletes rows if atleast any of column in a row has null values

df.na.drop(how='any').show()

+---+-------+---+
|Id |   Name|Age|
+---+-------+---+
|  1|   john| 22|
|  2|Stephen| 22|
|  3|  Stacy| 24|
|  4|   paul| 23|
+---+-------+---+



In [9]:
#2. Threshold - atleast mentioned num of nulls has to there for a row to be deleted

df.na.drop(how='any', thresh=2).show()

+---+-------+----+
|Id |   Name| Age|
+---+-------+----+
|  1|   john|  22|
|  2|Stephen|  22|
|  3|  Stacy|  24|
|  4|   paul|  23|
|  5|   NULL|  32|
|  6|    Joe|NULL|
|  8|   jack|NULL|
+---+-------+----+



In [10]:
df.na.drop(how='any', thresh=1).show()

+----+-------+----+
| Id |   Name| Age|
+----+-------+----+
|   1|   john|  22|
|   2|Stephen|  22|
|   3|  Stacy|  24|
|   4|   paul|  23|
|   5|   NULL|  32|
|   6|    Joe|NULL|
|NULL|   NULL|  45|
|   8|   jack|NULL|
+----+-------+----+



In [11]:
#subset - whereever null value for specified column is available it would be deleted

df.na.drop(how='any', subset=['Age']).show()

+----+-------+---+
| Id |   Name|Age|
+----+-------+---+
|   1|   john| 22|
|   2|Stephen| 22|
|   3|  Stacy| 24|
|   4|   paul| 23|
|   5|   NULL| 32|
|NULL|   NULL| 45|
+----+-------+---+



In [12]:
#Filling missing values

df.na.fill('Missing Values').show()

# here since Age & Id is an integer it's not replaced by 'Missing value' term

+----+--------------+----+
| Id |          Name| Age|
+----+--------------+----+
|   1|          john|  22|
|   2|       Stephen|  22|
|   3|         Stacy|  24|
|   4|          paul|  23|
|   5|Missing Values|  32|
|   6|           Joe|NULL|
|NULL|Missing Values|  45|
|   8|          jack|NULL|
+----+--------------+----+



In [13]:
#Filling missing values in specific column

df.na.fill(0,'Age').show()

+----+-------+---+
| Id |   Name|Age|
+----+-------+---+
|   1|   john| 22|
|   2|Stephen| 22|
|   3|  Stacy| 24|
|   4|   paul| 23|
|   5|   NULL| 32|
|   6|    Joe|  0|
|NULL|   NULL| 45|
|   8|   jack|  0|
+----+-------+---+



In [14]:
df.na.fill('Missing Values',['Name','Age']).show()

# here since age is an integer it's not replaced by 'Missing value' term

+----+--------------+----+
| Id |          Name| Age|
+----+--------------+----+
|   1|          john|  22|
|   2|       Stephen|  22|
|   3|         Stacy|  24|
|   4|          paul|  23|
|   5|Missing Values|  32|
|   6|           Joe|NULL|
|NULL|Missing Values|  45|
|   8|          jack|NULL|
+----+--------------+----+



In [15]:
df.show()

+----+-------+----+
| Id |   Name| Age|
+----+-------+----+
|   1|   john|  22|
|   2|Stephen|  22|
|   3|  Stacy|  24|
|   4|   paul|  23|
|   5|   NULL|  32|
|   6|    Joe|NULL|
|NULL|   NULL|  45|
|   8|   jack|NULL|
+----+-------+----+



In [16]:
df.dtypes

[('Id ', 'int'), ('Name', 'string'), ('Age', 'int')]

In [23]:
# replace with mean using imputer
#worked for Age but not working for Id - saying column not available and Name - since its string type

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Name'],
    outputCols = ["{}_imputed".format(c) for c in [ 'Name']]
    ).setStrategy("mode")

In [24]:
#Add imputation cols to df

imputer.fit(df).transform(df).show()

IllegalArgumentException: requirement failed: Column Name must be of type numeric but was actually of type string.

In [None]:
df.show()