## PySpark Introduction

In [1]:
import pyspark

In [2]:
import pandas as pd
pd.read_csv('test1.csv')

Unnamed: 0,Name,age,Unnamed: 2,Unnamed: 3,Unnamed: 4
0,Subject1,31,,,
1,Subject2,29,,,
2,Subject3,28,,,


In [3]:
from pyspark.sql import SparkSession

In [4]:
spark=SparkSession.builder.appName("test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/03 17:40:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [6]:
df_pyspark = spark.read.csv('test1.csv')

In [7]:
df_pyspark.show()

+--------+---+----+----+----+
|     _c0|_c1| _c2| _c3| _c4|
+--------+---+----+----+----+
|    Name|age|null|null|null|
|Subject1| 31|null|null|null|
|Subject2| 29|null|null|null|
|Subject3| 28|null|null|null|
+--------+---+----+----+----+



In [8]:
df_pyspark = spark.read.option('header','true').csv('test1.csv')

In [9]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [10]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



## PySpark DataFrames

In [11]:
 from pyspark.sql import SparkSession

In [12]:
spark = SparkSession.builder.appName('DataFrame').getOrCreate()

22/11/03 17:40:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [13]:
spark

In [14]:
## read the dataset
df_spark = spark.read.option('header','true').csv('test1.csv')
df_spark.show()

22/11/03 17:40:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Name, age, , , 
 Schema: Name, age, _c2, _c3, _c4
Expected: _c2 but found: 
CSV file: file:///Users/juan/Documents/DE%20projects/pyspark_tutorial/test1.csv
+--------+---+----+----+----+
|    Name|age| _c2| _c3| _c4|
+--------+---+----+----+----+
|Subject1| 31|null|null|null|
|Subject2| 29|null|null|null|
|Subject3| 28|null|null|null|
+--------+---+----+----+----+



In [15]:
## Check the schema
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [16]:
df_spark = spark.read.csv('test1.csv', header=True, inferSchema=True)
df_spark.show()

22/11/03 17:40:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Name, age, , , 
 Schema: Name, age, _c2, _c3, _c4
Expected: _c2 but found: 
CSV file: file:///Users/juan/Documents/DE%20projects/pyspark_tutorial/test1.csv
+--------+---+----+----+----+
|    Name|age| _c2| _c3| _c4|
+--------+---+----+----+----+
|Subject1| 31|null|null|null|
|Subject2| 29|null|null|null|
|Subject3| 28|null|null|null|
+--------+---+----+----+----+



In [17]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [18]:
## Show columns name
df_spark.columns

['Name', 'age', '_c2', '_c3', '_c4']

In [19]:
## Select one/multiple columns
df_spark.select(['Name', 'age']).show()

+--------+---+
|    Name|age|
+--------+---+
|Subject1| 31|
|Subject2| 29|
|Subject3| 28|
+--------+---+



In [20]:
## Check data types
df_spark.dtypes

[('Name', 'string'),
 ('age', 'int'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string')]

In [21]:
df_spark.describe().show()

22/11/03 17:40:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Name, age, , , 
 Schema: Name, age, _c2, _c3, _c4
Expected: _c2 but found: 
CSV file: file:///Users/juan/Documents/DE%20projects/pyspark_tutorial/test1.csv
+-------+--------+------------------+----+----+----+
|summary|    Name|               age| _c2| _c3| _c4|
+-------+--------+------------------+----+----+----+
|  count|       3|                 3|   0|   0|   0|
|   mean|    null|29.333333333333332|null|null|null|
| stddev|    null|1.5275252316519468|null|null|null|
|    min|Subject1|                28|null|null|null|
|    max|Subject3|                31|null|null|null|
+-------+--------+------------------+----+----+----+



In [22]:
## Drop columns
df_sparkr = df_spark.drop('_c2','_c3','_c4')
df_sparkr

DataFrame[Name: string, age: int]

In [23]:
## Adding columns in data frame
from pyspark.sql.functions import when, lit
df_spark = df_sparkr.withColumn('Experience',
                     when((df_sparkr.Name == 'Subject1'), lit('10')).
                     when((df_sparkr.Name == 'Subject2'), lit('8')).
                     when((df_sparkr.Name == 'Subject3'), lit('4')))
df_spark = df_spark.withColumn('Experience After 2 years', df_spark['Experience']+2)
df_spark.show()

+--------+---+----------+------------------------+
|    Name|age|Experience|Experience After 2 years|
+--------+---+----------+------------------------+
|Subject1| 31|        10|                    12.0|
|Subject2| 29|         8|                    10.0|
|Subject3| 28|         4|                     6.0|
+--------+---+----------+------------------------+



In [24]:
## Rename column
df_spark.withColumnRenamed('age','Age').show()

+--------+---+----------+------------------------+
|    Name|Age|Experience|Experience After 2 years|
+--------+---+----------+------------------------+
|Subject1| 31|        10|                    12.0|
|Subject2| 29|         8|                    10.0|
|Subject3| 28|         4|                     6.0|
+--------+---+----------+------------------------+



## Handling Missing Values

In [25]:
df_pyspark2 = spark.read.csv('test2.csv', header=True, inferSchema=True)
df_pyspark2.show()

+-------+----+----------+------+
|   Name| age|Experience|Salary|
+-------+----+----------+------+
|  Krish|  31|        10| 30000|
| Sudhan|  30|         8| 25000|
|  Sunny|  29|         4| 20000|
|   Paul|  24|         3| 20000|
| Harsha|  21|         1| 15000|
|Shubham|  23|         2| 18000|
| Mahesh|null|      null| 40000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [26]:
## Drop all rows containing NULL values
df_pyspark2.na.drop().show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Krish| 31|        10| 30000|
| Sudhan| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [27]:
## how == 'any' (default). If how == 'all' with will only remove rows with all NULL values
df_pyspark2.na.drop(how='all').show()

+-------+----+----------+------+
|   Name| age|Experience|Salary|
+-------+----+----------+------+
|  Krish|  31|        10| 30000|
| Sudhan|  30|         8| 25000|
|  Sunny|  29|         4| 20000|
|   Paul|  24|         3| 20000|
| Harsha|  21|         1| 15000|
|Shubham|  23|         2| 18000|
| Mahesh|null|      null| 40000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [28]:
## how many NULL values per row are the criterion to remove the row
df_pyspark2.na.drop(how='any', thresh=2).show()

+-------+----+----------+------+
|   Name| age|Experience|Salary|
+-------+----+----------+------+
|  Krish|  31|        10| 30000|
| Sudhan|  30|         8| 25000|
|  Sunny|  29|         4| 20000|
|   Paul|  24|         3| 20000|
| Harsha|  21|         1| 15000|
|Shubham|  23|         2| 18000|
| Mahesh|null|      null| 40000|
|   null|  34|        10| 38000|
+-------+----+----------+------+



In [29]:
## Subset: Remove rows with NULL values from column 'Experience'
df_pyspark2.na.drop(how='any', subset=['Experience']).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Krish| 31|        10| 30000|
| Sudhan| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
|   null| 34|        10| 38000|
+-------+---+----------+------+



In [30]:
## Filling the missing value
df_pyspark2.na.fill(0, ['Experience', 'age']).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Krish| 31|        10| 30000|
| Sudhan| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
| Mahesh|  0|         0| 40000|
|   null| 34|        10| 38000|
|   null| 36|         0|  null|
+-------+---+----------+------+



In [31]:
## Replace NULL values with mean of column
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=['{}_imputed'.format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy('mean')

In [32]:
imputer.fit(df_pyspark2).transform(df_pyspark2).show()

+-------+----+----------+------+-----------+------------------+--------------+
|   Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+-------+----+----------+------+-----------+------------------+--------------+
|  Krish|  31|        10| 30000|         31|                10|         30000|
| Sudhan|  30|         8| 25000|         30|                 8|         25000|
|  Sunny|  29|         4| 20000|         29|                 4|         20000|
|   Paul|  24|         3| 20000|         24|                 3|         20000|
| Harsha|  21|         1| 15000|         21|                 1|         15000|
|Shubham|  23|         2| 18000|         23|                 2|         18000|
| Mahesh|null|      null| 40000|         28|                 5|         40000|
|   null|  34|        10| 38000|         34|                10|         38000|
|   null|  36|      null|  null|         36|                 5|         25750|
+-------+----+----------+------+-----------+--------

## Filter operation

In [33]:
df_pyspark3 = spark.read.option('header','true').csv('test3.csv')
df_pyspark3.show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Krish| 31|        10| 30000|
| Sudhan| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [34]:
## Salary of the people less than or equal to 20000
df_pyspark3.filter('Salary<=20000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [35]:
## Select 'Name' and 'age' columns, filtered by Salary
df_pyspark3.filter('Salary<=20000').select(['Name', 'age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [36]:
df_pyspark3.filter(df_pyspark3['Salary']<=20000).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [37]:
## Filter Salary between 20000 and 15000   ------ & == and, | == or, ~ == not
df_pyspark3.filter((df_pyspark3['Salary']<=20000) & (df_pyspark3['Salary']>=15000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



## GroupBy and Aggregate Functions

In [38]:
spark2=SparkSession.builder.appName('Agg').getOrCreate()

22/11/03 17:40:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [39]:
spark2

In [40]:
 df_pyspark4 = spark.read.option('header','true').csv('test4.csv')
df_pyspark4.show()

+---------+------------+------+
|     Name| Departments|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [41]:
## Change Salary column str -> int
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
df_pyspark4 = df_pyspark4.withColumn('Salary', col('Salary').cast('int'))
df_pyspark4.dtypes

[('Name', 'string'), ('Departments', 'string'), ('Salary', 'int')]

In [42]:
## GroupBy
## Grouped by name to find the sum salary
df_pyspark4.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [43]:
## Grouped by Departments to find the sum salary
df_pyspark4.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [44]:
## Grouped by Departments to find the mean salary
df_pyspark4.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(Salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [45]:
## Aggregate
df_pyspark4.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+

