In [1]:
import pyspark

In [2]:
# importing session method to start a new session
from pyspark.sql import SparkSession

In [3]:
# Start a SparkSession named 'intro' (getOrCreate reuses one if it already exists)
spark = (
    SparkSession.builder
    .appName('intro')
    .getOrCreate()
)

**Topics Covered:**
 - PySpark DataFrame
 - Reading the Dataset
 - Checking the Datatypes of the Columns [Schema]
 - Selecting Columns and Indexing
 - Adding Columns
 - Dropping Columns

In [4]:
# Load the CSV. header=True takes the column names from the first row.
# inferSchema=True makes Spark infer each column's type from the data;
# without it every column (including Age and Experience) is read as a string.

dfps = spark.read.csv("pyspark.csv", header=True, inferSchema=True)

In [5]:
# The DataFrame repr lists the column names and their inferred types, not the rows
dfps

DataFrame[Name: string, Age: int, Experience: int, Salary: int]

In [6]:
# show() prints the rows as a text table; missing values are displayed as `null`
dfps.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|Gowtham|  20|         2| 90000|
|  James|  17|         2| 90000|
|  Wendy|  80|        50|100000|
|Charles|  47|        19|150000|
|   Mike|  45|        10|150000|
| Angela|  44|        10|150000|
| Ritvik|null|      null| 90000|
|   null|  20|        15| 50000|
|  Kevin|null|      null|  null|
+-------+----+----------+------+



In [7]:
# Print the schema: each column's name, data type and nullability.
# Roughly the PySpark counterpart of pandas' df.info() / df.dtypes
# (to look at rows, use dfps.show(), similar to pandas' df.head()).
dfps.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [8]:
# drop() returns a NEW DataFrame without the given column; dfps still has "Name"
dfps_without_name = dfps.drop("Name")
dfps_without_name.show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  20|         2| 90000|
|  17|         2| 90000|
|  80|        50|100000|
|  47|        19|150000|
|  45|        10|150000|
|  44|        10|150000|
|null|      null| 90000|
|  20|        15| 50000|
|null|      null|  null|
+----+----------+------+



In [9]:
# Drop rows containing nulls. With no arguments, na.drop() removes every row
# that has a null in any column (same result as how='any' in the next cell).
rows_without_nulls = dfps.na.drop()
rows_without_nulls.show()


+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|Gowtham| 20|         2| 90000|
|  James| 17|         2| 90000|
|  Wendy| 80|        50|100000|
|Charles| 47|        19|150000|
|   Mike| 45|        10|150000|
| Angela| 44|        10|150000|
+-------+---+----------+------+



In [10]:
any_null_dropped = dfps.na.drop(how='any')   # a row is dropped as soon as ONE of its values is null
any_null_dropped.show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|Gowtham| 20|         2| 90000|
|  James| 17|         2| 90000|
|  Wendy| 80|        50|100000|
|Charles| 47|        19|150000|
|   Mike| 45|        10|150000|
| Angela| 44|        10|150000|
+-------+---+----------+------+



In [11]:
# how='all': a row is dropped only when EVERY one of its values is null
# (no such row exists in this data, so all rows survive)
all_null_dropped = dfps.na.drop(how='all')
all_null_dropped.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|Gowtham|  20|         2| 90000|
|  James|  17|         2| 90000|
|  Wendy|  80|        50|100000|
|Charles|  47|        19|150000|
|   Mike|  45|        10|150000|
| Angela|  44|        10|150000|
| Ritvik|null|      null| 90000|
|   null|  20|        15| 50000|
|  Kevin|null|      null|  null|
+-------+----+----------+------+



In [12]:
# dfps is unchanged by the na.drop calls above: they return new DataFrames
# instead of modifying dfps in place, so all rows (and their nulls) are still here.
dfps.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|Gowtham|  20|         2| 90000|
|  James|  17|         2| 90000|
|  Wendy|  80|        50|100000|
|Charles|  47|        19|150000|
|   Mike|  45|        10|150000|
| Angela|  44|        10|150000|
| Ritvik|null|      null| 90000|
|   null|  20|        15| 50000|
|  Kevin|null|      null|  null|
+-------+----+----------+------+



In [13]:
# thresh=k keeps only rows that have AT LEAST k non-null values, i.e. rows with
# fewer than 2 non-null values are dropped here (only Kevin, who has just a Name).
# When thresh is given it overrides `how`, so the previous how='any' had no
# effect (rows with some nulls, like Ritvik's, are kept) and is omitted.
dfps.na.drop(thresh=2).show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|Gowtham|  20|         2| 90000|
|  James|  17|         2| 90000|
|  Wendy|  80|        50|100000|
|Charles|  47|        19|150000|
|   Mike|  45|        10|150000|
| Angela|  44|        10|150000|
| Ritvik|null|      null| 90000|
|   null|  20|        15| 50000|
+-------+----+----------+------+



In [14]:
# subset: only nulls in the listed columns decide whether a row is dropped.
# The row with a null Name survives because Name is not in the subset.
experience_present = dfps.na.drop(subset=['Experience'], how='any')
experience_present.show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|Gowtham| 20|         2| 90000|
|  James| 17|         2| 90000|
|  Wendy| 80|        50|100000|
|Charles| 47|        19|150000|
|   Mike| 45|        10|150000|
| Angela| 44|        10|150000|
|   null| 20|        15| 50000|
+-------+---+----------+------+



# Filling the Missing Values

In [15]:
# Original data again, before filling: nulls appear in Name (one row) and in
# the Age / Experience / Salary columns.
dfps.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|Gowtham|  20|         2| 90000|
|  James|  17|         2| 90000|
|  Wendy|  80|        50|100000|
|Charles|  47|        19|150000|
|   Mike|  45|        10|150000|
| Angela|  44|        10|150000|
| Ritvik|null|      null| 90000|
|   null|  20|        15| 50000|
|  Kevin|null|      null|  null|
+-------+----+----------+------+



In [16]:
# fillna() is the DataFrame-level alias of na.fill(). A string fill value only
# applies to string columns: the null Name is replaced, while the nulls in the
# integer columns (Age, Experience, Salary) are left untouched.
dfps.fillna('Missing values').show()

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|       Gowtham|  20|         2| 90000|
|         James|  17|         2| 90000|
|         Wendy|  80|        50|100000|
|       Charles|  47|        19|150000|
|          Mike|  45|        10|150000|
|        Angela|  44|        10|150000|
|        Ritvik|null|      null| 90000|
|Missing values|  20|        15| 50000|
|         Kevin|null|      null|  null|
+--------------+----+----------+------+



In [17]:
# Per-column fill values: null Ages become 50 and null Names become "Unknown".
# Columns not listed (Experience, Salary) keep their nulls.
fill_values = {'Name': "Unknown", 'Age': 50}
dfps.na.fill(fill_values).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|Gowtham| 20|         2| 90000|
|  James| 17|         2| 90000|
|  Wendy| 80|        50|100000|
|Charles| 47|        19|150000|
|   Mike| 45|        10|150000|
| Angela| 44|        10|150000|
| Ritvik| 50|      null| 90000|
|Unknown| 20|        15| 50000|
|  Kevin| 50|      null|  null|
+-------+---+----------+------+



In [18]:
# Impute nulls in the numeric columns with a per-column statistic, here the
# most frequent value ("mode"). Results go to new <col>_imputed columns and the
# original columns are kept. The output below shows ties resolving to the
# smaller value (Experience -> 2, Salary -> 90000).
# NOTE(review): the "mode" strategy needs a recent Spark (3.1+) — confirm cluster version.
from pyspark.ml.feature import Imputer

numeric_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=['{}_imputed'.format(col) for col in numeric_cols],
    strategy="mode",
)
    

In [19]:
# fit() computes the replacement value for each input column; transform()
# appends the *_imputed columns next to the originals.
imputer_model = imputer.fit(dfps)
imputer_model.transform(dfps).show()

+-------+----+----------+------+-----------+------------------+--------------+
|   Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+-------+----+----------+------+-----------+------------------+--------------+
|Gowtham|  20|         2| 90000|         20|                 2|         90000|
|  James|  17|         2| 90000|         17|                 2|         90000|
|  Wendy|  80|        50|100000|         80|                50|        100000|
|Charles|  47|        19|150000|         47|                19|        150000|
|   Mike|  45|        10|150000|         45|                10|        150000|
| Angela|  44|        10|150000|         44|                10|        150000|
| Ritvik|null|      null| 90000|         20|                 2|         90000|
|   null|  20|        15| 50000|         20|                15|         50000|
|  Kevin|null|      null|  null|         20|                 2|         90000|
+-------+----+----------+------+-----------+--------