# Spark Window functions

In [11]:
import sys
from pyspark.sql import SparkSession
import pyspark.sql.types as t
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

In [2]:
# Create (or reuse) a local Spark session for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Windows")
    .getOrCreate()
)

In [3]:
# Explicit schema: department name, employee number and salary, all non-nullable.
schema = t.StructType(
    [
        t.StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("depName", t.StringType()),
            ("empNo", t.IntegerType()),
            ("salary", t.IntegerType()),
        )
    ]
)

In [4]:
# Sample rows as (depName, empNo, salary) tuples, listed in empNo order.
# The row order matters for the positional (rowsBetween) examples.
data = [
    ("sales", 1, 5000),
    ("personnel", 2, 3900),
    ("sales", 3, 4800),
    ("sales", 4, 4800),
    ("personnel", 5, 3500),
    ("develop", 7, 4200),
    ("develop", 8, 6000),
    ("develop", 9, 4500),
    ("develop", 10, 5200),
    ("develop", 11, 5200),
]

In [6]:
# Build the Spark DataFrame from the in-memory rows using the explicit schema.
sdf = spark.createDataFrame(data, schema=schema)

  return f(*args, **kwds)
  return f(*args, **kwds)


In [7]:
# Print the DataFrame rows as a text table.
sdf.show()

+---------+-----+------+
|  depName|empNo|salary|
+---------+-----+------+
|    sales|    1|  5000|
|personnel|    2|  3900|
|    sales|    3|  4800|
|    sales|    4|  4800|
|personnel|    5|  3500|
|  develop|    7|  4200|
|  develop|    8|  6000|
|  develop|    9|  4500|
|  develop|   10|  5200|
|  develop|   11|  5200|
+---------+-----+------+



## Let's calculate the average salary by department name:

In [21]:
# One row per department with its average salary, sorted by that average.
# `DataFrame.show()` returns None, so keep the DataFrame in the variable and
# display it in a separate statement; otherwise the variable would hold None.
sdf_avg_salary_by_dpt = (
    sdf.groupBy("depName")
    .agg(f.avg("salary").alias("average"))
    .sort("average")
)
sdf_avg_salary_by_dpt.show()

+---------+-----------------+
|  depName|          average|
+---------+-----------------+
|personnel|           3700.0|
|    sales|4866.666666666667|
|  develop|           5020.0|
+---------+-----------------+



## With windows: get the department average on every row:

In [22]:
# Window covering all rows that share the same department.
byDepName = Window.partitionBy("depName")

# Keep every employee row and attach the department-wide average salary to it.
sdf_with_avg_salary = sdf.withColumn(
    "average",
    f.avg("salary").over(byDepName),
)

In [25]:
# Display the rows ordered by their department average (ascending).
sdf_with_avg_salary.orderBy("average").show()

+---------+-----+------+-----------------+
|  depName|empNo|salary|          average|
+---------+-----+------+-----------------+
|personnel|    2|  3900|           3700.0|
|personnel|    5|  3500|           3700.0|
|    sales|    1|  5000|4866.666666666667|
|    sales|    3|  4800|4866.666666666667|
|    sales|    4|  4800|4866.666666666667|
|  develop|    7|  4200|           5020.0|
|  develop|    8|  6000|           5020.0|
|  develop|    9|  4500|           5020.0|
|  develop|   10|  5200|           5020.0|
|  develop|   11|  5200|           5020.0|
+---------+-----+------+-----------------+



## WindowSpec: rowsBetween

In [27]:
# Frame of up to 4 rows: the 2 preceding rows, the current row and the next one.
# Row frames are positional, so order the window explicitly (by empNo) to make
# the frame contents deterministic instead of depending on physical row order.
windowSpec = Window.orderBy("empNo").rowsBetween(-2, 1)

In [28]:
# For each row, show the first and last empNo inside its frame and the number
# of rows the frame actually contains.
(
    sdf
    .withColumn("first_empNo", f.first("empNo").over(windowSpec))
    .withColumn("last_empNo", f.last("empNo").over(windowSpec))
    .withColumn("frame_size", f.count("empNo").over(windowSpec))
    .show()
)

+---------+-----+------+-----------+----------+----------+
|  depName|empNo|salary|first_empNo|last_empNo|frame_size|
+---------+-----+------+-----------+----------+----------+
|    sales|    1|  5000|          1|         2|         2|
|personnel|    2|  3900|          1|         3|         3|
|    sales|    3|  4800|          1|         4|         4|
|    sales|    4|  4800|          2|         5|         4|
|personnel|    5|  3500|          3|         7|         4|
|  develop|    7|  4200|          4|         8|         4|
|  develop|    8|  6000|          5|         9|         4|
|  develop|    9|  4500|          7|        10|         4|
|  develop|   10|  5200|          8|        11|         4|
|  develop|   11|  5200|          9|        11|         3|
+---------+-----+------+-----------+----------+----------+



## WindowSpec: rangeBetween

In [29]:
# Value-based frame: rows whose salary lies in [salary - 1000, salary + 500]
# relative to the current row's salary.
windowSpec = (
    Window
    .orderBy("salary")
    .rangeBetween(-1000, 500)
)

In [30]:
# Show the salary interval covered by each row's frame, next to the first and
# last salary found in the frame and the number of rows it contains.
(
    sdf
    .withColumn(
        "range",
        f.concat(
            f.lit("["),
            f.col("salary") - 1000,
            f.lit(","),
            f.col("salary") + 500,
            f.lit("]"),
        ),
    )
    .withColumn("frame_first", f.first("salary").over(windowSpec))
    .withColumn("frame_last", f.last("salary").over(windowSpec))
    .withColumn("frame_count", f.count("empNo").over(windowSpec))
    .show()
)

+---------+-----+------+-----------+-----------+----------+-----------+
|  depName|empNo|salary|      range|frame_first|frame_last|frame_count|
+---------+-----+------+-----------+-----------+----------+-----------+
|personnel|    5|  3500|[2500,4000]|       3500|      3900|          2|
|personnel|    2|  3900|[2900,4400]|       3500|      4200|          3|
|  develop|    7|  4200|[3200,4700]|       3500|      4500|          4|
|  develop|    9|  4500|[3500,5000]|       3500|      5000|          7|
|    sales|    3|  4800|[3800,5300]|       3900|      5200|          8|
|    sales|    4|  4800|[3800,5300]|       3900|      5200|          8|
|    sales|    1|  5000|[4000,5500]|       4200|      5200|          7|
|  develop|   10|  5200|[4200,5700]|       4200|      5200|          7|
|  develop|   11|  5200|[4200,5700]|       4200|      5200|          7|
|  develop|    8|  6000|[5000,6500]|       5000|      6000|          4|
+---------+-----+------+-----------+-----------+----------+-----------+

## Top 2 salaries in each department

In [41]:
# Per-department window, highest salary first (used for ranking).
windowSpec = (
    Window
    .partitionBy("depName")
    .orderBy(f.desc("salary"))
)

In [42]:
# dense_rank gives tied salaries the same rank without gaps, so the filter
# keeps every employee holding one of the two highest salary values.
(
    sdf
    .withColumn("rank", f.dense_rank().over(windowSpec))
    .filter(f.col("rank") <= 2)
    .show()
)

+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   10|  5200|   2|
|  develop|   11|  5200|   2|
|    sales|    1|  5000|   1|
|    sales|    3|  4800|   2|
|    sales|    4|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+



## For each employee, what is the salary difference compared to the highest-paid employee in the same department?

In [48]:
# 1. Using a window:
# Build a dedicated window here rather than relying on whatever the shared
# `windowSpec` variable currently holds. The explicit unbounded frame makes
# max() span the whole department; with only an ordering, the default frame
# stops at the current row and gives the department max only because the
# sort is descending.
salary_desc_by_dep = (
    Window.partitionBy("depName")
    .orderBy(f.col("salary").desc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
sdf.withColumn(
    "Difference",
    f.max(f.col("salary")).over(salary_desc_by_dep) - f.col("salary"),
).show()

+---------+-----+------+----------+
|  depName|empNo|salary|Difference|
+---------+-----+------+----------+
|  develop|    8|  6000|         0|
|  develop|   10|  5200|       800|
|  develop|   11|  5200|       800|
|  develop|    9|  4500|      1500|
|  develop|    7|  4200|      1800|
|    sales|    1|  5000|         0|
|    sales|    3|  4800|       200|
|    sales|    4|  4800|       200|
|personnel|    2|  3900|         0|
|personnel|    5|  3500|       400|
+---------+-----+------+----------+



In [49]:
# 2. Using joins:
# 2. Using joins:
# First collapse to one row per department holding its highest salary.
max_salary = (
    sdf
    .groupBy("depName")
    .agg(f.max("salary").alias("max"))
)
max_salary.show()

+---------+----+
|  depName| max|
+---------+----+
|  develop|6000|
|    sales|5000|
|personnel|3900|
+---------+----+



In [55]:
# Join the per-department maximum back onto every employee row, then subtract.
(
    sdf
    .join(max_salary, on="depName")
    .withColumn("difference", f.col("max") - f.col("salary"))
    .show()
)

+---------+-----+------+----+----------+
|  depName|empNo|salary| max|difference|
+---------+-----+------+----+----------+
|  develop|    7|  4200|6000|      1800|
|  develop|    8|  6000|6000|         0|
|  develop|    9|  4500|6000|      1500|
|  develop|   10|  5200|6000|       800|
|  develop|   11|  5200|6000|       800|
|    sales|    1|  5000|5000|         0|
|    sales|    3|  4800|5000|       200|
|    sales|    4|  4800|5000|       200|
|personnel|    2|  3900|3900|         0|
|personnel|    5|  3500|3900|       400|
+---------+-----+------+----+----------+



## Cumulative salary sum:

In [57]:
# Per-department window in ascending salary order, with a running frame from
# the first row of the partition up to and including the current row.
windowSpec = (
    Window
    .partitionBy("depName")
    .orderBy(f.col("salary").asc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [58]:
# Running total of salaries within each department.
(
    sdf
    .withColumn("cumul_sum", f.sum("salary").over(windowSpec))
    .show()
)

+---------+-----+------+---------+
|  depName|empNo|salary|cumul_sum|
+---------+-----+------+---------+
|  develop|    7|  4200|     4200|
|  develop|    9|  4500|     8700|
|  develop|   10|  5200|    13900|
|  develop|   11|  5200|    19100|
|  develop|    8|  6000|    25100|
|    sales|    3|  4800|     4800|
|    sales|    4|  4800|     9600|
|    sales|    1|  5000|    14600|
|personnel|    5|  3500|     3500|
|personnel|    2|  3900|     7400|
+---------+-----+------+---------+



## Lead & Lag:

In [59]:
# Per-department window, highest salary first, used by lead/lag below.
overCategory = (
    Window
    .partitionBy("depName")
    .orderBy(f.desc("salary"))
)

In [60]:
# lead: salary of the next row in the window order; lag: salary of the
# previous row. Both are null where no such row exists in the partition.
(
    sdf
    .withColumn("lead", f.lead("salary", 1).over(overCategory))
    .withColumn("lag", f.lag("salary", 1).over(overCategory))
    .show()
)

+---------+-----+------+----+----+
|  depName|empNo|salary|lead| lag|
+---------+-----+------+----+----+
|  develop|    8|  6000|5200|null|
|  develop|   10|  5200|5200|6000|
|  develop|   11|  5200|4500|5200|
|  develop|    9|  4500|4200|5200|
|  develop|    7|  4200|null|4500|
|    sales|    1|  5000|4800|null|
|    sales|    3|  4800|4800|5000|
|    sales|    4|  4800|null|4800|
|personnel|    2|  3900|3500|null|
|personnel|    5|  3500|null|3900|
+---------+-----+------+----+----+

