In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=b4a0e83a3afb11cb889a1fe119ba3761c3be2654990a32508dab3023506472fd
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [3]:
import pyspark

First let's write code in RDD Level

In [5]:
import pandas as pd
pd.read_csv('/content/test1.csv')

Unnamed: 0,name,age,experience
0,Viswaa,19,10
1,Vignesh,30,8
2,Ganesh,26,4


Creating a Spark Session

In [6]:
from pyspark.sql import SparkSession

In [7]:
spark=SparkSession.builder.appName('Spark_Practise').getOrCreate()

In [8]:
spark

In [9]:
df_pyspark=spark.read.csv("/content/test1.csv")

In [10]:
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [11]:
df_pyspark.show()

+-------+---+----------+
|    _c0|_c1|       _c2|
+-------+---+----------+
|   name|age|experience|
| Viswaa| 19|        10|
|Vignesh| 30|         8|
| Ganesh| 26|         4|
+-------+---+----------+



The above means we have two columns and taking A and B as main column thus we are getting c0 and c1 instead of Name and Age we can solve it using below methon named **option - header**

In [12]:
df_pyspark=spark.read.option('header','true').csv('test1.csv')

In [13]:
df_pyspark

DataFrame[name: string, age: string, experience: string]

In [14]:
df_pyspark.show()

+-------+---+----------+
|   name|age|experience|
+-------+---+----------+
| Viswaa| 19|        10|
|Vignesh| 30|         8|
| Ganesh| 26|         4|
+-------+---+----------+



In [15]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [16]:
df_pyspark.head(3)

[Row(name='Viswaa', age='19', experience='10'),
 Row(name='Vignesh', age='30', experience='8'),
 Row(name='Ganesh', age='26', experience='4')]

In [17]:
df_pyspark.printSchema() #similar to .desc

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



**DATA FRAMES**

In [18]:
spark=SparkSession.builder.appName('Data Frame').getOrCreate()

Read the dataset

In [19]:
df=spark.read.option('header','true').csv('test1.csv')

In [20]:
df.show()

+-------+---+----------+
|   name|age|experience|
+-------+---+----------+
| Viswaa| 19|        10|
|Vignesh| 30|         8|
| Ganesh| 26|         4|
+-------+---+----------+



In [21]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



By default it is taking everything as string <br/>
It can be changed using inferSchema

In [22]:
df=spark.read.option('header','true').csv('test1.csv',inferSchema=True)

In [23]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



Now we can see the data type is correct

**Another way to do the above case**

In [24]:
 df=spark.read.csv("test1.csv",header=True,inferSchema=True)

In [25]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



Selecting Columns and Indexing

In [26]:
df.columns

['name', 'age', 'experience']

In [27]:
df.select("name").show() #return type is dataframe (pyspark dataframe) so use .show

+-------+
|   name|
+-------+
| Viswaa|
|Vignesh|
| Ganesh|
+-------+



In [28]:
df.select(["name","Experience"]).show()

+-------+----------+
|   name|Experience|
+-------+----------+
| Viswaa|        10|
|Vignesh|         8|
| Ganesh|         4|
+-------+----------+



**Here slicing will not work**

Checking the datatype

In [29]:
df.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

Describe Function

In [30]:
df.describe()

DataFrame[summary: string, name: string, age: string, experience: string]

on .show gives us standard deviation and mean

In [31]:
df.describe().show()

+-------+------+------------------+-----------------+
|summary|  name|               age|       experience|
+-------+------+------------------+-----------------+
|  count|     3|                 3|                3|
|   mean|  null|              25.0|7.333333333333333|
| stddev|  null|5.5677643628300215|3.055050463303893|
|    min|Ganesh|                19|                4|
|    max|Viswaa|                30|               10|
+-------+------+------------------+-----------------+



Adding Columns

In [32]:
df=df.withColumn("Experience After 2 Years",df["experience"]+2)

In [33]:
df.show()

+-------+---+----------+------------------------+
|   name|age|experience|Experience After 2 Years|
+-------+---+----------+------------------------+
| Viswaa| 19|        10|                      12|
|Vignesh| 30|         8|                      10|
| Ganesh| 26|         4|                       6|
+-------+---+----------+------------------------+



Drop Columns

In [34]:
df=df.drop("Experience After 2 Years")

In [35]:
df.show()

+-------+---+----------+
|   name|age|experience|
+-------+---+----------+
| Viswaa| 19|        10|
|Vignesh| 30|         8|
| Ganesh| 26|         4|
+-------+---+----------+



Rename Column

In [36]:
df=df.withColumnRenamed("name","firstname")

In [37]:
df.show()

+---------+---+----------+
|firstname|age|experience|
+---------+---+----------+
|   Viswaa| 19|        10|
|  Vignesh| 30|         8|
|   Ganesh| 26|         4|
+---------+---+----------+



**Handling Missing Values**

In [38]:
from pyspark.sql import SparkSession

In [39]:
spark=SparkSession.builder.appName("Missing Values").getOrCreate()

In [40]:
d=spark.read.csv("/content/tes2.csv",header=True,inferSchema=True)

In [41]:
d.show()

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
|      null|  36|      null|  null|
+----------+----+----------+------+



Drop the columns

In [42]:
d.drop("Name").show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|null|      null| 40000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



Dropping rows with null values

In [43]:
d.na.drop().show()

+----------+---+----------+------+
|      Name|Age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
|Sudharshan| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



how parameter (any or all)

In [44]:
d.na.drop(how="all").show() #here it drops a row only when a row fully is null
#by default it is any

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
|      null|  36|      null|  null|
+----------+----+----------+------+



Threshold parameter

In [45]:
d.na.drop(how="any",thresh=2).show() #atleast 2 non null value must be present

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
+----------+----+----------+------+



In [46]:
d.na.drop(how="any",thresh=1).show() #atleast 1 non null value must be present

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
|      null|  36|      null|  null|
+----------+----+----------+------+



subset parameter - remove null value from specific columns

In [47]:
d.na.drop(how="any",subset=["Experience"]).show()

+----------+---+----------+------+
|      Name|Age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
|Sudharshan| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha| 21|         1| 15000|
|   Shubham| 23|         2| 18000|
|      null| 34|        10| 38000|
+----------+---+----------+------+



Filling missing values

In [48]:
d.na.fill("Missing Values").show()

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|    Sudharshan|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|null|      null| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      null|  null|
+--------------+----+----------+------+



In [49]:
d.na.fill("Missing Values","Experience").show()

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
|      null|  36|      null|  null|
+----------+----+----------+------+



In [50]:
d.na.fill("Missing Values",["Experience","Age"]).show()

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
|      null|  36|      null|  null|
+----------+----+----------+------+



Inputer Function

In [51]:
d.show()

+----------+----+----------+------+
|      Name| Age|Experience|Salary|
+----------+----+----------+------+
|     Krish|  31|        10| 30000|
|Sudharshan|  30|         8| 25000|
|     Sunny|  29|         4| 20000|
|      Paul|  24|         3| 20000|
|    Harsha|  21|         1| 15000|
|   Shubham|  23|         2| 18000|
|    Mahesh|null|      null| 40000|
|      null|  34|        10| 38000|
|      null|  36|      null|  null|
+----------+----+----------+------+



In [52]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=["Age","Experience","Salary"],outputCols=["{}_inputed".format(c) for c in ["Age","Experience","Salary"]]).setStrategy("mean")

Now we are going to fit and transform

Now Null values will be replaced by mean

In [55]:
imputer.fit(d).transform(d).show()

+----------+----+----------+------+-----------+------------------+--------------+
|      Name| Age|Experience|Salary|Age_inputed|Experience_inputed|Salary_inputed|
+----------+----+----------+------+-----------+------------------+--------------+
|     Krish|  31|        10| 30000|         31|                10|         30000|
|Sudharshan|  30|         8| 25000|         30|                 8|         25000|
|     Sunny|  29|         4| 20000|         29|                 4|         20000|
|      Paul|  24|         3| 20000|         24|                 3|         20000|
|    Harsha|  21|         1| 15000|         21|                 1|         15000|
|   Shubham|  23|         2| 18000|         23|                 2|         18000|
|    Mahesh|null|      null| 40000|         28|                 5|         40000|
|      null|  34|        10| 38000|         34|                10|         38000|
|      null|  36|      null|  null|         36|                 5|         25750|
+----------+----

We can do the above method using median and mode as well

Filter Operation

In [56]:
from pyspark.sql import SparkSession

In [57]:
spark=SparkSession.builder.appName("Filter").getOrCreate()

In [60]:
d=spark.read.csv("/content/test3.csv",header=True,inferSchema=True)
d.show()

+----------+---+----------+------+
|      Name|Age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
|Sudharshan| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha|  2|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



salary of people less than or equal to 20000

In [61]:
d.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha|  2|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



To select particular columns along with filter

In [62]:
d.filter("Salary<=20000").select(["Name","Age"]).show()

+-------+---+
|   Name|Age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha|  2|
|Shubham| 23|
+-------+---+



Another way

In [63]:
d.filter(d["Salary"]<=20000).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha|  2|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



More than one condition

In [66]:
d.filter((d["Salary"]<=20000) &
         (d["Salary"]>=15000)).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha|  2|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [67]:
d.filter((d["Salary"]<=20000) |
         (d["Salary"]>=15000)).show()

+----------+---+----------+------+
|      Name|Age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
|Sudharshan| 30|         8| 25000|
|     Sunny| 29|         4| 20000|
|      Paul| 24|         3| 20000|
|    Harsha|  2|         1| 15000|
|   Shubham| 23|         2| 18000|
+----------+---+----------+------+



In [68]:
d.filter(~(d["Salary"]<=20000)).show()

+----------+---+----------+------+
|      Name|Age|Experience|Salary|
+----------+---+----------+------+
|     Krish| 31|        10| 30000|
|Sudharshan| 30|         8| 25000|
+----------+---+----------+------+

