ADVANCED DATAFRAME OPERATIONS

## Windowing Functions in Spark

Windowing functions in Spark allow you to perform operations on a set of rows that are related to the current row. These functions are useful for tasks such as calculating running totals, moving averages, and ranking.

### Types of Window Functions:

1. **Ranking Functions**: These functions assign a position to each row within a partition, following the window's sort order.
    - `row_number()`: Assigns a unique, sequential number to each row in the partition, starting at 1.
    - `rank()`: Assigns the same rank to tied rows and skips the following ranks, so gaps appear after ties (1, 2, 2, 4).
    - `dense_rank()`: Similar to `rank()`, but tied rows do not cause gaps (1, 2, 2, 3).

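2. **Analytical Functions**: These functions compute a value for the current row from other rows in the same partition.
    - `cume_dist()`: Calculates the cumulative distribution of a value, i.e. the fraction of rows in the partition whose value is less than or equal to the current row's value.
    - `percent_rank()`: Calculates the relative rank of a value within a partition, as (rank - 1) / (rows in partition - 1).
    - `lag()`: Returns the value from a row a given number of rows before the current row.
    - `lead()`: Returns the value from a row a given number of rows after the current row.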

3. **Aggregate Functions**: These functions perform aggregate calculations over a window of rows.
    - `sum()`: Calculates the sum of values.
    - `avg()`: Calculates the average of values.
    - `min()`: Finds the minimum value.
    - `max()`: Finds the maximum value.
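
The sample data used in the example below has no tied salaries within a department, so `row_number()`, `rank()`, and `dense_rank()` produce identical columns there. As a minimal plain-Python sketch of how the three differ when ties exist (the helper `rank_rows` and the input values are hypothetical illustrations, not part of Spark's API):

```python
def rank_rows(values):
    """Return (value, row_number, rank, dense_rank) tuples, ordered ascending."""
    ordered = sorted(values)
    result = []
    rank = 0
    dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position  # rank jumps to the row position, leaving gaps after ties
            dense += 1       # dense_rank only ever advances by one
            previous = value
        # row_number is simply the position within the ordered partition
        result.append((value, position, rank, dense))
    return result


ranked = rank_rows([300, 100, 200, 200, 400])
for row in ranked:
    print(row)
# (100, 1, 1, 1)
# (200, 2, 2, 2)
# (200, 3, 2, 2)
# (300, 4, 4, 3)
# (400, 5, 5, 4)
```

In Spark itself, the order in which `row_number()` numbers tied rows is not guaranteed unless the window's `orderBy` includes a tie-breaking column.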

### Example:


In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, LongType, StringType
from pyspark.sql.window import Window

In [2]:
findspark.init()
spark = SparkSession.builder.appName('windowing_operation').getOrCreate()

In [3]:
spark

In [4]:
# creating data

dataemplyee = [("James","10045.0","Smith",26636.0,"M",111),
    ("Michael","Rose","",42288.0,"M",222),
    ("Robert","","Williams",47114.0,"M",333),
    ("Maria","Anne","Jones",12192.0,"F",111),
    ("Jen","Mary","Brown",80456.0,"F",222),
    ("Jas","","Smith",36634.0,"M",111),
    ("Mike","Rose","",40299.0,"M",222),
    ("Roby","","Williams",42156.0,"M",333),
    ("Ancy","","Williams",42246.0,"M",222),
    ("Mary","Anne","Jones",30002.0,"F",111),
    ("Ben","Mary","Brown",12345.0,"F",222),
    ("Black","Mary","Richard",22345.0,"F",333),
    ("Mike2","Rose","",40299.0,"M",2),
    ("Roby2","","Williams",42156.0,"M",3),
      ]
schemaemp = StructType([
    StructField("firstname",StringType(),True), 
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("salary", FloatType(), True), 
    StructField("gender", StringType(), True), 
    StructField("dep_id",IntegerType(),True), 
                        ])

In [5]:
emp_df = spark.createDataFrame(dataemplyee, schema=schemaemp)

In [6]:
emp_df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- salary: float (nullable = true)
 |-- gender: string (nullable = true)
 |-- dep_id: integer (nullable = true)



In [7]:
emp_df.show()

+---------+----------+--------+-------+------+------+
|firstname|middlename|lastname| salary|gender|dep_id|
+---------+----------+--------+-------+------+------+
|    James|   10045.0|   Smith|26636.0|     M|   111|
|  Michael|      Rose|        |42288.0|     M|   222|
|   Robert|          |Williams|47114.0|     M|   333|
|    Maria|      Anne|   Jones|12192.0|     F|   111|
|      Jen|      Mary|   Brown|80456.0|     F|   222|
|      Jas|          |   Smith|36634.0|     M|   111|
|     Mike|      Rose|        |40299.0|     M|   222|
|     Roby|          |Williams|42156.0|     M|   333|
|     Ancy|          |Williams|42246.0|     M|   222|
|     Mary|      Anne|   Jones|30002.0|     F|   111|
|      Ben|      Mary|   Brown|12345.0|     F|   222|
|    Black|      Mary| Richard|22345.0|     F|   333|
|    Mike2|      Rose|        |40299.0|     M|     2|
|    Roby2|          |Williams|42156.0|     M|     3|
+---------+----------+--------+-------+------+------+



1. Ranking Functions

In [None]:
# Note: min, max and sum imported here shadow Python's built-ins of the same name.
from pyspark.sql.functions import row_number, rank, dense_rank, avg, mean, min, max, col, sum

In [8]:
# Partition rows by department and order each partition by ascending salary
WindowPartition = Window.partitionBy('dep_id').orderBy('salary')

In [11]:
emp_df_with_new_row =  emp_df.withColumn("row_number",row_number().over(WindowPartition))
emp_df_with_new_row.show()

+---------+----------+--------+-------+------+------+----------+
|firstname|middlename|lastname| salary|gender|dep_id|row_number|
+---------+----------+--------+-------+------+------+----------+
|    Mike2|      Rose|        |40299.0|     M|     2|         1|
|    Roby2|          |Williams|42156.0|     M|     3|         1|
|    Maria|      Anne|   Jones|12192.0|     F|   111|         1|
|    James|   10045.0|   Smith|26636.0|     M|   111|         2|
|     Mary|      Anne|   Jones|30002.0|     F|   111|         3|
|      Jas|          |   Smith|36634.0|     M|   111|         4|
|      Ben|      Mary|   Brown|12345.0|     F|   222|         1|
|     Mike|      Rose|        |40299.0|     M|   222|         2|
|     Ancy|          |Williams|42246.0|     M|   222|         3|
|  Michael|      Rose|        |42288.0|     M|   222|         4|
|      Jen|      Mary|   Brown|80456.0|     F|   222|         5|
|    Black|      Mary| Richard|22345.0|     F|   333|         1|
|     Roby|          |Wil

In [15]:
emp_df_rank = emp_df.withColumn("rank", rank().over(WindowPartition))
emp_df_rank.show()

+---------+----------+--------+-------+------+------+----+
|firstname|middlename|lastname| salary|gender|dep_id|rank|
+---------+----------+--------+-------+------+------+----+
|    Mike2|      Rose|        |40299.0|     M|     2|   1|
|    Roby2|          |Williams|42156.0|     M|     3|   1|
|    Maria|      Anne|   Jones|12192.0|     F|   111|   1|
|    James|   10045.0|   Smith|26636.0|     M|   111|   2|
|     Mary|      Anne|   Jones|30002.0|     F|   111|   3|
|      Jas|          |   Smith|36634.0|     M|   111|   4|
|      Ben|      Mary|   Brown|12345.0|     F|   222|   1|
|     Mike|      Rose|        |40299.0|     M|   222|   2|
|     Ancy|          |Williams|42246.0|     M|   222|   3|
|  Michael|      Rose|        |42288.0|     M|   222|   4|
|      Jen|      Mary|   Brown|80456.0|     F|   222|   5|
|    Black|      Mary| Richard|22345.0|     F|   333|   1|
|     Roby|          |Williams|42156.0|     M|   333|   2|
|   Robert|          |Williams|47114.0|     M|   333|   

In [16]:
emp_df_dense_rank = emp_df.withColumn("dense_rank",dense_rank().over(WindowPartition))
emp_df_dense_rank.show()

+---------+----------+--------+-------+------+------+----------+
|firstname|middlename|lastname| salary|gender|dep_id|dense_rank|
+---------+----------+--------+-------+------+------+----------+
|    Mike2|      Rose|        |40299.0|     M|     2|         1|
|    Roby2|          |Williams|42156.0|     M|     3|         1|
|    Maria|      Anne|   Jones|12192.0|     F|   111|         1|
|    James|   10045.0|   Smith|26636.0|     M|   111|         2|
|     Mary|      Anne|   Jones|30002.0|     F|   111|         3|
|      Jas|          |   Smith|36634.0|     M|   111|         4|
|      Ben|      Mary|   Brown|12345.0|     F|   222|         1|
|     Mike|      Rose|        |40299.0|     M|   222|         2|
|     Ancy|          |Williams|42246.0|     M|   222|         3|
|  Michael|      Rose|        |42288.0|     M|   222|         4|
|      Jen|      Mary|   Brown|80456.0|     F|   222|         5|
|    Black|      Mary| Richard|22345.0|     F|   333|         1|
|     Roby|          |Wil

2. Aggregate Functions

In [35]:
AggregationWindow=Window.partitionBy('dep_id')
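
`AggregationWindow` has no `orderBy`, so Spark's default frame covers the entire partition and every row in a department receives the same aggregate value. When a window does define an ordering, the default frame runs from the start of the partition up to the current row, which gives running aggregates instead. The following is a plain-Python sketch of that difference for department 111 (not Spark code; the variable names are hypothetical, and the data has no tied salaries, which would otherwise share a frame under Spark's default range-based frame):

```python
from itertools import accumulate

# Salaries of department 111 from the sample data, in ascending order,
# as an orderBy('salary') window would see them.
dep_111_salaries = sorted([26636.0, 12192.0, 36634.0, 30002.0])

# Ordered window: the frame grows row by row, producing a running total.
running_total = list(accumulate(dep_111_salaries))

# Unordered window: the frame is the whole partition, so every row
# sees the same total (the last value of the running total).
partition_total = [running_total[-1]] * len(dep_111_salaries)

print(running_total)    # [12192.0, 38828.0, 68830.0, 105464.0]
print(partition_total)  # [105464.0, 105464.0, 105464.0, 105464.0]
```

The whole-partition total of 105464.0 matches the `sum` column shown for `dep_id` 111 in the output of the next cell.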

In [41]:
from pyspark.sql.functions import avg, min, max, sum, col

emp_df_aggregation = (
    emp_df.withColumn("average", avg(col("salary")).over(AggregationWindow))
    .withColumn("min", min(col("salary")).over(AggregationWindow))
    .withColumn("max", max(col("salary")).over(AggregationWindow))
    .withColumn("sum", sum(col("salary")).over(AggregationWindow))
)
emp_df_aggregation.show()

+---------+----------+--------+-------+------+------+-------+-------+-------+--------+
|firstname|middlename|lastname| salary|gender|dep_id|average|    min|    max|     sum|
+---------+----------+--------+-------+------+------+-------+-------+-------+--------+
|    Mike2|      Rose|        |40299.0|     M|     2|40299.0|40299.0|40299.0| 40299.0|
|    Roby2|          |Williams|42156.0|     M|     3|42156.0|42156.0|42156.0| 42156.0|
|    James|   10045.0|   Smith|26636.0|     M|   111|26366.0|12192.0|36634.0|105464.0|
|    Maria|      Anne|   Jones|12192.0|     F|   111|26366.0|12192.0|36634.0|105464.0|
|      Jas|          |   Smith|36634.0|     M|   111|26366.0|12192.0|36634.0|105464.0|
|     Mary|      Anne|   Jones|30002.0|     F|   111|26366.0|12192.0|36634.0|105464.0|
|  Michael|      Rose|        |42288.0|     M|   222|43526.8|12345.0|80456.0|217634.0|
|      Jen|      Mary|   Brown|80456.0|     F|   222|43526.8|12345.0|80456.0|217634.0|
|     Mike|      Rose|        |40299.0|    

3. Analytical Functions
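
`lag` and `lead`, used in the cells that follow, read a value from a neighbouring row in the ordered partition: `lag` looks backwards and `lead` looks forwards, and both yield null when the offset falls outside the partition. A minimal plain-Python sketch of that behaviour for department 111 (the helpers `lag_values` and `lead_values` are hypothetical stand-ins, not Spark functions, and assume a non-negative offset):

```python
def lag_values(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` if there is none."""
    return [
        values[i - offset] if i - offset >= 0 else default
        for i in range(len(values))
    ]


def lead_values(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` if there is none."""
    return [
        values[i + offset] if i + offset < len(values) else default
        for i in range(len(values))
    ]


# Department 111 salaries in ascending order, as in WindowPartition.
dep_111_salaries = [12192.0, 26636.0, 30002.0, 36634.0]

lagged = lag_values(dep_111_salaries)
led = lead_values(dep_111_salaries)
print(lagged)  # [None, 12192.0, 26636.0, 30002.0]
print(led)     # [26636.0, 30002.0, 36634.0, None]
```

These lists correspond to the `Lag` and `lead` columns for `dep_id` 111 in the output below, with `None` playing the role of `NULL`.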

In [42]:
from pyspark.sql.functions import lag,lead

In [44]:
emp_df_lag = (
    emp_df.withColumn("Lag", lag("salary", 1).over(WindowPartition))
    .withColumn("lead", lead("salary", 1).over(WindowPartition))
)
emp_df_lag.show()

+---------+----------+--------+-------+------+------+-------+-------+
|firstname|middlename|lastname| salary|gender|dep_id|    Lag|   lead|
+---------+----------+--------+-------+------+------+-------+-------+
|    Mike2|      Rose|        |40299.0|     M|     2|   NULL|   NULL|
|    Roby2|          |Williams|42156.0|     M|     3|   NULL|   NULL|
|    Maria|      Anne|   Jones|12192.0|     F|   111|   NULL|26636.0|
|    James|   10045.0|   Smith|26636.0|     M|   111|12192.0|30002.0|
|     Mary|      Anne|   Jones|30002.0|     F|   111|26636.0|36634.0|
|      Jas|          |   Smith|36634.0|     M|   111|30002.0|   NULL|
|      Ben|      Mary|   Brown|12345.0|     F|   222|   NULL|40299.0|
|     Mike|      Rose|        |40299.0|     M|   222|12345.0|42246.0|
|     Ancy|          |Williams|42246.0|     M|   222|40299.0|42288.0|
|  Michael|      Rose|        |42288.0|     M|   222|42246.0|80456.0|
|      Jen|      Mary|   Brown|80456.0|     F|   222|42288.0|   NULL|
|    Black|      Mar

### cume_dist() vs. percent_rank()
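
As a rough sketch of how the two differ, assume the usual definitions: `cume_dist` is the number of rows with a value less than or equal to the current one divided by the partition size, and `percent_rank` is (rank - 1) / (rows in partition - 1). The helpers below are hypothetical plain-Python illustrations of those formulas, not Spark code:

```python
def cume_dist_values(values):
    """Fraction of rows whose value is <= each value, in ascending order."""
    ordered = sorted(values)
    n = len(ordered)
    return [len([other for other in ordered if other <= v]) / n for v in ordered]


def percent_rank_values(values):
    """(rank - 1) / (n - 1) for each value, in ascending order."""
    ordered = sorted(values)
    n = len(ordered)
    if n == 1:
        return [0.0]  # a single-row partition has percent rank 0
    # rank - 1 equals the number of rows with a strictly smaller value
    return [len([other for other in ordered if other < v]) / (n - 1) for v in ordered]


sample = [300, 100, 200, 200, 400]
cume = cume_dist_values(sample)
pct = percent_rank_values(sample)
print(cume)  # [0.2, 0.6, 0.6, 0.8, 1.0]
print(pct)   # [0.0, 0.25, 0.25, 0.75, 1.0]
```

Under these definitions `cume_dist` is always greater than 0 and reaches 1.0 on the last row, while `percent_rank` starts at 0.0; tied rows receive the same value in both.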