# PySpark Learning 

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 1.0 MB/s eta 0:00:0101     |██████████████████████████████▎ | 266.0 MB 1.1 MB/s eta 0:00:15
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 43.4 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=026dda9730e4a1d43a21cff954f7c16280ca77b1d27ee2282bf612e754cd70b4
  Stored in directory: /Users/pardeepsingh/Library/Caches/pip/wheels/51/c8/18/298a4ced8ebb3ab8a7d26a7198c0cc7035abb906bde94a4c4b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [1]:
import pyspark

In [2]:
import pandas as pd
# Read test1.csv into a pandas DataFrame.
data = pd.read_csv("test1.csv")

In [3]:
# Bare expression on the last line of the cell: the notebook displays the DataFrame.
data

Unnamed: 0,Name,Age,Experience
0,Pardeep,23,10
1,Jasmeet,24,9
2,Gurkirat,15,2
3,Shivjeet,23,4


# TO START SPARK SESSION

In [4]:
#IMPORT LIBRARY FIRST

from pyspark.sql import SparkSession

In [5]:
# Start a session by giving it an application name.
#
# Syntax: <variable> = SparkSession.builder.appName("<session name>").getOrCreate()

spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)


22/11/13 19:15:02 WARN Utils: Your hostname, Pardeeps-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.48 instead (on interface en0)
22/11/13 19:15:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/13 19:15:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Evaluate the session object so the notebook shows its representation.
spark

# reading data and setting headers using options

There are multiple ways to read data and some of them are:

In [7]:
# Method 1: set the 'header' option on the reader so the first line of the CSV
# supplies the column names.
df_pyspark = (
    spark.read
    .option('header', 'true')
    .csv('test1.csv')
)
df_pyspark.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
| Pardeep| 23|        10|
| Jasmeet| 24|         9|
|Gurkirat| 15|         2|
|Shivjeet| 23|         4|
+--------+---+----------+



In [8]:
# Method 2: pass the options as keyword arguments of csv().
# inferSchema makes Spark detect the data type of each column automatically.
df_pyspark = spark.read.csv(
    "test1.csv",
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
| Pardeep| 23|        10|
| Jasmeet| 24|         9|
|Gurkirat| 15|         2|
|Shivjeet| 23|         4|
+--------+---+----------+



# to view the schema of df

In [9]:
# Prints a tree listing each column's name, data type and nullability.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



## to get column names from dataframe

In [10]:
# The column names come back as a plain Python list of strings.
df_pyspark.columns

['Name', 'Age', 'Experience']

## selecting multiple columns instead of complete dataset

In [11]:
# Select one particular column; the selection is kept in name_column and then displayed.
name_column = df_pyspark.select("Name")
name_column.show()

+--------+
|    Name|
+--------+
| Pardeep|
| Jasmeet|
|Gurkirat|
|Shivjeet|
+--------+



In [12]:
# Select several columns at once by passing a list of column names.

multiple_columns = df_pyspark.select(['Name','Experience'])
multiple_columns.show()

+--------+----------+
|    Name|Experience|
+--------+----------+
| Pardeep|        10|
| Jasmeet|         9|
|Gurkirat|         2|
|Shivjeet|         4|
+--------+----------+



## to check data types

In [13]:
# dtypes is a list of (column name, type name) pairs.
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

## to check mean, std deviation, count, min or max operations

In [14]:
# describe() builds a summary table with count, mean, stddev, min and max for each column.
df_pyspark.describe().show()

+-------+--------+-----------------+------------------+
|summary|    Name|              Age|        Experience|
+-------+--------+-----------------+------------------+
|  count|       4|                4|                 4|
|   mean|    null|            21.25|              6.25|
| stddev|    null|4.193248541803041|3.8622100754188224|
|    min|Gurkirat|               15|                 2|
|    max|Shivjeet|               24|                10|
+-------+--------+-----------------+------------------+



## adding new column

In [15]:
## Add a new column derived from an existing one:
## 'Experience after 2 years' is the Experience column plus 2.

df_pyspark = df_pyspark.withColumn(
    'Experience after 2 years',
    df_pyspark['Experience'] + 2,
)

In [16]:
# Show the DataFrame, which now includes the 'Experience after 2 years' column.
df_pyspark.show()

+--------+---+----------+------------------------+
|    Name|Age|Experience|Experience after 2 years|
+--------+---+----------+------------------------+
| Pardeep| 23|        10|                      12|
| Jasmeet| 24|         9|                      11|
|Gurkirat| 15|         2|                       4|
|Shivjeet| 23|         4|                       6|
+--------+---+----------+------------------------+



## drop column

In [17]:
# Drop the derived column again; the result is reassigned so df_pyspark no longer has it.
df_pyspark = df_pyspark.drop('Experience after 2 years')
df_pyspark.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
| Pardeep| 23|        10|
| Jasmeet| 24|         9|
|Gurkirat| 15|         2|
|Shivjeet| 23|         4|
+--------+---+----------+



# rename the column

In [18]:
# Rename the column "Name" to "Names"; the renamed DataFrame replaces df_pyspark.
# (The result is not displayed in this cell.)
df_pyspark = df_pyspark.withColumnRenamed("Name","Names")


# Handling Missing Values

In PySpark, as in pandas or NumPy, missing values can be handled with the following approaches:
1. Dropping columns
2. Dropping rows
3. Using the various parameters of the drop functionality
4. Filling missing values with the mean, median or mode

In [20]:
# SparkSession is already imported in the "TO START SPARK SESSION" section; the
# import is repeated here.
from pyspark.sql import SparkSession

# Same app name ("Practice") as the session created earlier in the notebook.
# NOTE(review): getOrCreate() presumably hands back that existing session rather
# than building a second one - confirm against the SparkSession.builder docs.
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [21]:

# Reload test1.csv with a header row and inferred column types.
# NOTE: the output below has a Salary column and null values, so the file on
# disk is not the same version that was displayed in the earlier sections.
df_pyspark = spark.read.csv(
    "test1.csv",
    header=True,
    inferSchema=True,
)
df_pyspark.show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
| Pardeep|  23|        10| 10000|
| Jasmeet|  24|         9|  4000|
|Gurkirat|  15|         2|  5000|
|Shivjeet|  23|         4|  9000|
|  Ranjit|null|         3|  7000|
|    null|  33|      null| 45000|
| Navneet|null|      null|  8000|
+--------+----+----------+------+



In [22]:
# Dropping columns: show the data without the 'Name' column.
# df_pyspark is not reassigned here, so it still contains 'Name' afterwards.

df_pyspark.drop('Name').show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  23|        10| 10000|
|  24|         9|  4000|
|  15|         2|  5000|
|  23|         4|  9000|
|null|         3|  7000|
|  33|      null| 45000|
|null|      null|  8000|
+----+----------+------+



In [23]:
# Dropping rows: called without arguments, na.drop() removes every row that
# contains at least one null value.

df_pyspark.na.drop().show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
| Pardeep| 23|        10| 10000|
| Jasmeet| 24|         9|  4000|
|Gurkirat| 15|         2|  5000|
|Shivjeet| 23|         4|  9000|
+--------+---+----------+------+



`.na.drop()` has three parameters: <b>how, thresh and subset</b>. Each one controls the dropping in a different way; see the examples below.

In [25]:
## Using how='all'
## (by default how is 'any')

# The difference is that with how='all' a row is dropped only when every value
# in it is null; if the row has even a single non-null value it is not dropped.
# Same as a pandas DataFrame.


df_pyspark.na.drop(how='all').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
| Pardeep|  23|        10| 10000|
| Jasmeet|  24|         9|  4000|
|Gurkirat|  15|         2|  5000|
|Shivjeet|  23|         4|  9000|
|  Ranjit|null|         3|  7000|
|    null|  33|      null| 45000|
| Navneet|null|      null|  8000|
+--------+----+----------+------+



In [26]:
# As we can see, the output still contains null values, because there is no row in which all the values are null.

In [33]:
## Using thresh as the parameter: here rows with fewer than 3 non-null values are dropped.

df_pyspark.na.drop(thresh= 3).show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
| Pardeep|  23|        10| 10000|
| Jasmeet|  24|         9|  4000|
|Gurkirat|  15|         2|  5000|
|Shivjeet|  23|         4|  9000|
|  Ranjit|null|         3|  7000|
+--------+----+----------+------+



In [34]:
# When thresh is set, each row is checked for at least that many non-null values; if it has fewer,
# the row is dropped.

# In the example above (thresh=3) we kept the rows with at least 3 non-null values, and every row with fewer was dropped.

In [35]:

## subset

# subset drops rows based on particular columns only. For example, with
# subset=['Age'] the rows that have a null value in Age are removed.

df_pyspark.na.drop(subset=['Age']).show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
| Pardeep| 23|        10| 10000|
| Jasmeet| 24|         9|  4000|
|Gurkirat| 15|         2|  5000|
|Shivjeet| 23|         4|  9000|
|    null| 33|      null| 45000|
+--------+---+----------+------+



In [36]:
# As we can see, only the records where Age was null were removed.

In [41]:
## Filling missing values

df_pyspark.na.fill('Missing Value', 'Name').show()

# In the output below the null in Name has been replaced by 'Missing Value',
# while the nulls in Age and Experience are left as they are: the fill was
# requested for the 'Name' column only (second argument).
# NOTE(review): 'Missing Value' is a string, so it presumably would not be
# applied to the integer columns even without that restriction - confirm
# against the DataFrameNaFunctions.fill documentation.



+-------------+----+----------+------+
|         Name| Age|Experience|Salary|
+-------------+----+----------+------+
|      Pardeep|  23|        10| 10000|
|      Jasmeet|  24|         9|  4000|
|     Gurkirat|  15|         2|  5000|
|     Shivjeet|  23|         4|  9000|
|       Ranjit|null|         3|  7000|
|Missing Value|  33|      null| 45000|
|      Navneet|null|      null|  8000|
+-------------+----+----------+------+



In [42]:
### Replacing null values with the mean, median or mode

## The Imputer class from pyspark.ml.feature is used for this.

from pyspark.ml.feature import Imputer

# One '<column>_imputed' output column is produced per input column.
imputer = Imputer(
    inputCols=['Age', 'Experience', 'Salary'],
    outputCols=list(map('{}_imputed'.format, ['Age', 'Experience', 'Salary'])),
)
# "mean" can simply be replaced with "median" or "mode" here.
imputer = imputer.setStrategy("mean")



In [44]:

# Fit the imputer on the DataFrame and apply it to the same DataFrame. The
# result has new *_imputed columns in which the null values are replaced using
# the mean.
# NOTE(review): the filled values in the output are whole numbers (23 for Age,
# 5 for Experience), presumably because the columns are integer-typed - confirm.
(
    imputer
    .fit(df_pyspark)
    .transform(df_pyspark)
    .show()
)


+--------+----+----------+------+-----------+------------------+--------------+
|    Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+--------+----+----------+------+-----------+------------------+--------------+
| Pardeep|  23|        10| 10000|         23|                10|         10000|
| Jasmeet|  24|         9|  4000|         24|                 9|          4000|
|Gurkirat|  15|         2|  5000|         15|                 2|          5000|
|Shivjeet|  23|         4|  9000|         23|                 4|          9000|
|  Ranjit|null|         3|  7000|         23|                 3|          7000|
|    null|  33|      null| 45000|         33|                 5|         45000|
| Navneet|null|      null|  8000|         23|                 5|          8000|
+--------+----+----------+------+-----------+------------------+--------------+



# Filter operations

In [48]:
## Filter with a condition given as a string: keep the rows whose Salary is at most 20000.
df_pyspark.filter('Salary <=20000').show()

+--------+----+----------+------+
|    Name| Age|Experience|Salary|
+--------+----+----------+------+
| Pardeep|  23|        10| 10000|
| Jasmeet|  24|         9|  4000|
|Gurkirat|  15|         2|  5000|
|Shivjeet|  23|         4|  9000|
|  Ranjit|null|         3|  7000|
| Navneet|null|      null|  8000|
+--------+----+----------+------+



In [49]:
## Apply the condition first, then display only the Name and Age columns.

(
    df_pyspark
    .filter('Salary <=20000')
    .select(['Name','Age'])
    .show()
)

+--------+----+
|    Name| Age|
+--------+----+
| Pardeep|  23|
| Jasmeet|  24|
|Gurkirat|  15|
|Shivjeet|  23|
|  Ranjit|null|
| Navneet|null|
+--------+----+

