# **Operations on spark dataframes**

Once you have loaded your data as a dataframe using pyspark, many operations are available on the spark dataframe that help in analyzing the data. In this notebook let's discuss how to

> Execute SQL commands on a spark dataframe.

> Query the spark dataframe using filter with single and multiple conditional operators.

> Convert a query result (spark dataframe) into a python dictionary (a json-like structure).

> Use groupby, orderby and aggregate functions.

> Fill and handle null/empty values.

In [1]:
#As an initial step let's create a spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("df_ops").getOrCreate()

In [2]:
#Reading the data using spark as a dataframe.
df=spark.read.csv("Data1.csv", inferSchema=True, header=True)

In [3]:
df.show()

+-----------+----+
|       Name| Age|
+-----------+----+
|Swaminathan|  24|
|      Peter|null|
|        Sam|  54|
|      Henry|null|
+-----------+----+



In [4]:
#Adding a new column "NewAge" (twice the Age) to the dataframe.
df=df.withColumn("NewAge",df["Age"]*2)
df.show()

+-----------+----+------+
|       Name| Age|NewAge|
+-----------+----+------+
|Swaminathan|  24|    48|
|      Peter|null|  null|
|        Sam|  54|   108|
|      Henry|null|  null|
+-----------+----+------+



## **Executing SQL commands on spark dataframe**

SQL commands can be run against a spark dataframe by first registering it as a temporary view, as shown below.

In [5]:
#Registering the spark dataframe as a temporary view named "employee". The view only refers to the dataframe; no data is copied.
df.createOrReplaceTempView("employee")

Getting the data of the employees whose age is less than or equal to 25.

In [6]:
#Getting the employees whose age is less than or equal to 25
spark.sql("SELECT * FROM employee WHERE Age<=25").show()

+-----------+---+------+
|       Name|Age|NewAge|
+-----------+---+------+
|Swaminathan| 24|    48|
+-----------+---+------+



Getting the name of the employee whose age is 50 or above.

In [7]:
#Getting the name of the employee whose age is 50 or above.
spark.sql("SELECT Name FROM employee WHERE Age>=50").show()

+----+
|Name|
+----+
| Sam|
+----+



Getting the name of the employee whose age is 24 and NewAge is 48. Spark SQL accepts both = and == for equality.

In [8]:
#Getting the name of the employee whose age is 24 and NewAge is 48.
spark.sql("SELECT Name FROM employee WHERE Age==24 AND NewAge==48").show()

+-----------+
|       Name|
+-----------+
|Swaminathan|
+-----------+



## **Query the spark dataframe using filter**

Just as we executed SQL queries on a spark dataframe, we can also query it using the filter() method that is available for pyspark dataframes.

### **Filtering using single condition**

Let's see how to filter a spark dataframe using one condition.

In [9]:
#Getting details of an employee whose age is above 50.
df.filter("Age > 50").show()

+----+---+------+
|Name|Age|NewAge|
+----+---+------+
| Sam| 54|   108|
+----+---+------+



The same query can also be written with a column expression instead of a SQL string, as shown below.

In [10]:
df.filter(df["Age"]>50).show()

+----+---+------+
|Name|Age|NewAge|
+----+---+------+
| Sam| 54|   108|
+----+---+------+



In [11]:
#Let's select two columns from the filter result using select()
df.filter(df["Age"]>50).select(["Name","Age"]).show()

+----+---+
|Name|Age|
+----+---+
| Sam| 54|
+----+---+



### **Filtering using multiple conditions**

You can query a dataframe with more than one condition using filter(). The conditions are combined with the logical operators & (AND), | (OR) and ~ (NOT). Column expressions support only these symbols; the Python keywords and, or and not do not work on them. Each condition must also be enclosed in parentheses (), because & and | bind more tightly than comparison operators such as > and ==. Without the parentheses the expression is evaluated in the wrong order and usually throws an error.

In [12]:
#Filtering using multiple conditions
df.filter((df["Age"]>20) & (df["Name"]=="Swaminathan")).show()

+-----------+---+------+
|       Name|Age|NewAge|
+-----------+---+------+
|Swaminathan| 24|    48|
+-----------+---+------+



In [13]:
#Choosing specific columns of a filtered spark dataframe
df.filter((df["Age"]>20) & (df["Name"]=="Swaminathan")).select(["Name","Age"]).show()

+-----------+---+
|       Name|Age|
+-----------+---+
|Swaminathan| 24|
+-----------+---+



## **Converting a filter query result to json(python dictionary)**

A filtered query result, which is a spark dataframe, can be converted into a python dictionary (a json-like structure) by calling collect() on the dataframe and then asDict() on the Row objects it returns.

In [14]:
#Let's filter the record of an individual whose NewAge is above 60.
df.filter(df["NewAge"]>60).show()

+----+---+------+
|Name|Age|NewAge|
+----+---+------+
| Sam| 54|   108|
+----+---+------+



In [15]:
#Next, let's collect the filter result into a variable
filter_result = df.filter(df["NewAge"]>60).collect()

In [16]:
#The result would be a list of rows.
filter_result

[Row(Name='Sam', Age=54, NewAge=108)]

In [17]:
#Let's convert the result into a python dictionary
filter_result[0].asDict()

{'Name': 'Sam', 'Age': 54, 'NewAge': 108}

In [18]:
#Let's get the "Name" from the dictionary
filter_result[0].asDict()["Name"]

'Sam'

## **Group By, Orderby and aggregate functions**

These functions are useful for combining multiple similar rows/records based on some condition, which makes the data easier to explore.

In [19]:
#Loading the car_sales data.
df = spark.read.csv("car_sales.csv",inferSchema=True,header=True)

Note: This is custom data created for this demo; it does not depict the actual sales of any car company.

In [20]:
df.printSchema()

root
 |-- Brand: string (nullable = true)
 |-- Price: integer (nullable = true)



In [21]:
df.show()

+------------------+-------+
|             Brand|  Price|
+------------------+-------+
|              BENZ|4500000|
|               BMW|6000000|
|              AUDI|6400000|
|              FORD|5345532|
|               BMW|2345452|
|              BENZ|3564577|
|              AUDI|5453453|
|              FORD|7854085|
|Morris and Garages|8957405|
|             SKODA|6859358|
|              JEEP|4309584|
|        VOLKSWAGEN|5487504|
|              BENZ|8465047|
|Morris and Garages|6398535|
|        VOLKSWAGEN|3247504|
|              JEEP|6437453|
+------------------+-------+



### **Group by**

Group by helps you combine rows that share the same value using a standard operation such as sum or mean.

In [22]:
#Grouping by car brand and taking the average of their sales value.
df.groupBy("Brand").mean().show()

+------------------+-----------------+
|             Brand|       avg(Price)|
+------------------+-----------------+
|             SKODA|        6859358.0|
|              AUDI|        5926726.5|
|              FORD|        6599808.5|
|Morris and Garages|        7677970.0|
|              JEEP|        5373518.5|
|               BMW|        4172726.0|
|        VOLKSWAGEN|        4367504.0|
|              BENZ|5509874.666666667|
+------------------+-----------------+



In [23]:
#Grouping by car brand and counting the occurrences of each brand.
df.groupBy("Brand").count().show()

+------------------+-----+
|             Brand|count|
+------------------+-----+
|             SKODA|    1|
|              AUDI|    2|
|              FORD|    2|
|Morris and Garages|    2|
|              JEEP|    2|
|               BMW|    2|
|        VOLKSWAGEN|    2|
|              BENZ|    3|
+------------------+-----+



### **Aggregate functions**

An aggregate function called directly on the dataframe works like groupby without any grouping column: it summarises the whole dataframe and returns a single row containing only the aggregated value.

In [24]:
#Let's aggregate the sum of sales made by all the car companies
df.agg({"Price":"sum"}).show()

+----------+
|sum(Price)|
+----------+
|  91625489|
+----------+



In [25]:
#Let's aggregate the average of sales made by all the car companies
df.agg({"Price":"avg"}).show()

+------------+
|  avg(Price)|
+------------+
|5726593.0625|
+------------+



In [26]:
#Let's aggregate the maximum sales value across all the car companies
df.agg({"Price":"max"}).show()

+----------+
|max(Price)|
+----------+
|   8957405|
+----------+



In [27]:
#Let's count the non-null values in the Brand column
df.agg({"Brand":"count"}).show()

+------------+
|count(Brand)|
+------------+
|          16|
+------------+



### **Executing spark functions using aggregate**

pyspark also provides functions such as stddev, mean and count_distinct that can be applied to the columns of a spark dataframe. Note that count_distinct is available from Spark 3.2 onwards; older versions use countDistinct.

In [28]:
#Importing spark dataframe functions for standard deviation, mean and the count of distinct values.
from pyspark.sql.functions import stddev, mean, count_distinct, format_number

In [29]:
#Getting standard deviation of price using spark function.
#alias() lets you name the aggregated column instead of using the default name.
#format_number() rounds the value to the given number of decimal places and returns it as a formatted string.
df.select(format_number(stddev("Price"),2).alias("Standard deviation of price")).show()

+---------------------------+
|Standard deviation of price|
+---------------------------+
|               1,848,845.46|
+---------------------------+



In [30]:
#Getting mean(average) of price using spark function.
df.select(mean("Price").alias("Average of price")).show()

+----------------+
|Average of price|
+----------------+
|    5726593.0625|
+----------------+



In [31]:
#Getting counts of unique brand names using spark function.
df.select(count_distinct("Brand").alias("Unique brand")).show()

+------------+
|Unique brand|
+------------+
|           8|
+------------+



### **Order by**

The orderby functionality is used to arrange the rows in ascending or descending order.

In [32]:
#Ordering brand names using order by
#By default it is in ascending order
df.orderBy("Brand").show()

+------------------+-------+
|             Brand|  Price|
+------------------+-------+
|              AUDI|6400000|
|              AUDI|5453453|
|              BENZ|3564577|
|              BENZ|8465047|
|              BENZ|4500000|
|               BMW|6000000|
|               BMW|2345452|
|              FORD|5345532|
|              FORD|7854085|
|              JEEP|6437453|
|              JEEP|4309584|
|Morris and Garages|8957405|
|Morris and Garages|6398535|
|             SKODA|6859358|
|        VOLKSWAGEN|5487504|
|        VOLKSWAGEN|3247504|
+------------------+-------+



In [33]:
#Ordering brand names by descending order
df.orderBy("Brand",ascending=False).show()

+------------------+-------+
|             Brand|  Price|
+------------------+-------+
|        VOLKSWAGEN|5487504|
|        VOLKSWAGEN|3247504|
|             SKODA|6859358|
|Morris and Garages|6398535|
|Morris and Garages|8957405|
|              JEEP|4309584|
|              JEEP|6437453|
|              FORD|5345532|
|              FORD|7854085|
|               BMW|2345452|
|               BMW|6000000|
|              BENZ|4500000|
|              BENZ|3564577|
|              BENZ|8465047|
|              AUDI|6400000|
|              AUDI|5453453|
+------------------+-------+



In [34]:
#Using group by and order by together
df.groupBy("Brand").sum().orderBy("Brand").show()

+------------------+----------+
|             Brand|sum(Price)|
+------------------+----------+
|              AUDI|  11853453|
|              BENZ|  16529624|
|               BMW|   8345452|
|              FORD|  13199617|
|              JEEP|  10747037|
|Morris and Garages|  15355940|
|             SKODA|   6859358|
|        VOLKSWAGEN|   8735008|
+------------------+----------+



## **Filling/handling null and empty values**

Real-world data often has null or empty values in some parts of the dataframe. These represent missing information and can distort the analysis, so let's see how to handle null values in a spark dataframe.

In [35]:
#Loading the dataset.
df = spark.read.csv("Data1.csv", inferSchema=True, header=True)

In [36]:
df.show()

+-----------+----+
|       Name| Age|
+-----------+----+
|Swaminathan|  24|
|      Peter|null|
|        Sam|  54|
|      Henry|null|
+-----------+----+



In the above dataset some rows of the Age column have null values; let's remove or fill them using pyspark's built-in methods.

In [37]:
#Dropping null values
#By default this removes every row that has a null value in any column
df.na.drop().show()

+-----------+---+
|       Name|Age|
+-----------+---+
|Swaminathan| 24|
|        Sam| 54|
+-----------+---+



In [38]:
#Setting a threshold for dropping nulls
#thresh is the minimum number of non-null values a row must have in order to be kept
#We get the full dataframe here because every row has at least one non-null value (Name).
df.na.drop(thresh=1).show()

+-----------+----+
|       Name| Age|
+-----------+----+
|Swaminathan|  24|
|      Peter|null|
|        Sam|  54|
|      Henry|null|
+-----------+----+



#### **Dropping null values based on the how parameter**

In [39]:

#how="any" drops a row if it contains at least one null value
df.na.drop(how="any").show()

+-----------+---+
|       Name|Age|
+-----------+---+
|Swaminathan| 24|
|        Sam| 54|
+-----------+---+



In [40]:
#how="all" drops a row only if all of its values are null, so no row is dropped here.
df.na.drop(how="all").show()

+-----------+----+
|       Name| Age|
+-----------+----+
|Swaminathan|  24|
|      Peter|null|
|        Sam|  54|
|      Henry|null|
+-----------+----+



#### **Dropping nulls in a subset of columns**

In [41]:
#subset restricts the null check to the given column(s); here only the rows with a null Age are dropped.
df.na.drop(subset="Age").show()

+-----------+---+
|       Name|Age|
+-----------+---+
|Swaminathan| 24|
|        Sam| 54|
+-----------+---+



#### **Filling null values**

In [42]:
#Checking the column types before filling null values.
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



As you can see, the dataframe has two columns, one of type string and one of type integer. If you give a string as the replacement value, only the null values in string columns are filled. Likewise, if you give a number, only the null values in numeric columns are filled.

In [43]:
#Let's fill the null values in the Age column
df.na.fill(20).show()

+-----------+---+
|       Name|Age|
+-----------+---+
|Swaminathan| 24|
|      Peter| 20|
|        Sam| 54|
|      Henry| 20|
+-----------+---+



In [44]:
#Let's check the same function by giving a string value.
df.na.fill("test_value").show()

+-----------+----+
|       Name| Age|
+-----------+----+
|Swaminathan|  24|
|      Peter|null|
|        Sam|  54|
|      Henry|null|
+-----------+----+



Here the null values in the Age column are not replaced, since Age is an integer column and the replacement value is a string.

In [45]:
#Let's fill the null values of the Age column with its mean using the spark functions
from pyspark.sql.functions import mean
#asDict() does not take a column name; index the resulting dictionary by the alias instead.
mean_age=df.select(mean("Age").alias("Meanage")).collect()[0].asDict()["Meanage"]
mean_age

39.0

In [46]:
#This fills the null values of the Age column only, using its mean (average age) converted to an integer.
df.na.fill(value=int(mean_age),subset="Age").show()

+-----------+---+
|       Name|Age|
+-----------+---+
|Swaminathan| 24|
|      Peter| 39|
|        Sam| 54|
|      Henry| 39|
+-----------+---+

