In [0]:
# http://www.sefidian.com/2022/02/18/pyspark-window-functions/

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window

In [0]:
# Sample employee records, one tuple per row: (id, employee_name, dept, salary).
# The enclosing parentheses already let the literal span several lines, so no
# backslash continuations are needed.
simpleData = (
    (1, "James", "Sales", 3000),
    (2, "Michael", "Sales", 4600),
    (3, "Robert", "Sales", 4100),
    (4, "Maria", "Finance", 3000),
    (5, "James", "Sales", 3000),
    (6, "Scott", "Finance", 3300),
    (7, "Jen", "Finance", 3900),
    (8, "Jeff", "Marketing", 3000),
    (9, "Kumar", "Marketing", 2000),
    (10, "Saif", "Sales", 4100),
)

# Only the column names are given here; the column types are left for Spark to infer.
data = spark.createDataFrame(
    simpleData,
    schema=["id", "employee_name", "dept", "salary"],
)
data.show()

+---+-------------+---------+------+
| id|employee_name|     dept|salary|
+---+-------------+---------+------+
|  1|        James|    Sales|  3000|
|  2|      Michael|    Sales|  4600|
|  3|       Robert|    Sales|  4100|
|  4|        Maria|  Finance|  3000|
|  5|        James|    Sales|  3000|
|  6|        Scott|  Finance|  3300|
|  7|          Jen|  Finance|  3900|
|  8|         Jeff|Marketing|  3000|
|  9|        Kumar|Marketing|  2000|
| 10|         Saif|    Sales|  4100|
+---+-------------+---------+------+



#### GroupBy Aggregations
We need to compute the aggregation not for the entire DataFrame, but separately for each group of rows, where a group consists of the rows that share the same value in a specific column.

In [0]:
# Collapse the data to one row per department, carrying the department's
# salaries as a list together with their average and their total.
by_dept = data.groupBy("dept")
df = by_dept.agg(
    F.collect_list(F.col("salary")).alias("list_salary"),
    F.avg(F.col("salary")).alias("average_salary"),
    F.sum(F.col("salary")).alias("total_salary"),
)

df.show()

+---------+--------------------+--------------+------------+
|     dept|         list_salary|average_salary|total_salary|
+---------+--------------------+--------------+------------+
|    Sales|[3000, 4600, 4100...|        3760.0|       18800|
|  Finance|  [3000, 3300, 3900]|        3400.0|       10200|
|Marketing|        [3000, 2000]|        2500.0|        5000|
+---------+--------------------+--------------+------------+



#### Window Functions

- Window functions are a group of functions that can likewise be applied over a group of rows, similar to the groupBy aggregation shown in the previous section.
- There are a couple of differences, however. First, calling a window function does not reduce the dataset — all rows and all columns remain in the output DataFrame, and the result of the calculation is added as a new column.
- The group of rows to which the function is applied is again defined by a specific column (or a list of columns) on which the rows share the same value; this group is referred to as a window.
- Second, window functions are more flexible: sometimes you don’t want to apply the function to the entire window, but only to a subset of the window's rows — a so-called frame.

In [0]:
# Partition-only window: every row is kept, and each row receives the
# aggregates of its own department as extra columns.
windowSpec = Window.partitionBy("dept")

df = (
    data
    .withColumn("list_salary", F.collect_list("salary").over(windowSpec))
    .withColumn("average_salary", F.avg("salary").over(windowSpec))
    .withColumn("total_salary", F.sum("salary").over(windowSpec))
)

df.show()

+---+-------------+---------+------+--------------------+--------------+------------+
| id|employee_name|     dept|salary|         list_salary|average_salary|total_salary|
+---+-------------+---------+------+--------------------+--------------+------------+
|  4|        Maria|  Finance|  3000|  [3000, 3300, 3900]|        3400.0|       10200|
|  6|        Scott|  Finance|  3300|  [3000, 3300, 3900]|        3400.0|       10200|
|  7|          Jen|  Finance|  3900|  [3000, 3300, 3900]|        3400.0|       10200|
|  8|         Jeff|Marketing|  3000|        [3000, 2000]|        2500.0|        5000|
|  9|        Kumar|Marketing|  2000|        [3000, 2000]|        2500.0|        5000|
|  1|        James|    Sales|  3000|[3000, 4600, 4100...|        3760.0|       18800|
|  2|      Michael|    Sales|  4600|[3000, 4600, 4100...|        3760.0|       18800|
|  3|       Robert|    Sales|  4100|[3000, 4600, 4100...|        3760.0|       18800|
|  5|        James|    Sales|  3000|[3000, 4600, 4100.

In [0]:
# Adding an ordering to the window turns the aggregates into running values
# inside each department. Note in the output that rows with equal salaries
# end up with identical results.
windowSpec = Window.partitionBy("dept").orderBy(F.col("salary").asc())

df = (
    data
    .withColumn("list_salary", F.collect_list("salary").over(windowSpec))
    .withColumn("average_salary", F.avg("salary").over(windowSpec))
    .withColumn("total_salary", F.sum("salary").over(windowSpec))
)

df.show()

+---+-------------+---------+------+--------------------+--------------+------------+
| id|employee_name|     dept|salary|         list_salary|average_salary|total_salary|
+---+-------------+---------+------+--------------------+--------------+------------+
|  4|        Maria|  Finance|  3000|              [3000]|        3000.0|        3000|
|  6|        Scott|  Finance|  3300|        [3000, 3300]|        3150.0|        6300|
|  7|          Jen|  Finance|  3900|  [3000, 3300, 3900]|        3400.0|       10200|
|  9|        Kumar|Marketing|  2000|              [2000]|        2000.0|        2000|
|  8|         Jeff|Marketing|  3000|        [2000, 3000]|        2500.0|        5000|
|  1|        James|    Sales|  3000|        [3000, 3000]|        3000.0|        6000|
|  5|        James|    Sales|  3000|        [3000, 3000]|        3000.0|        6000|
|  3|       Robert|    Sales|  4100|[3000, 3000, 4100...|        3550.0|       14200|
| 10|         Saif|    Sales|  4100|[3000, 3000, 4100.

#### Duplicates issue

In [0]:
# Full (untruncated) list for the third-from-last collected row. Assuming
# collect() returns rows in the order printed by show() above, this is the
# first of the two Sales rows with salary 4100 — yet the list already holds
# both 4100 values, i.e. the tied row is included as well.
df.collect()[-3]["list_salary"]

Out[29]: [3000, 3000, 4100, 4100]

In [0]:
# Same check for the second-to-last collected row (presumably the other Sales
# row with salary 4100): it yields exactly the same list as its tied neighbour.
df.collect()[-2]["list_salary"]

Out[30]: [3000, 3000, 4100, 4100]

In [0]:
# Same ordered window, but with an explicit row-based frame that runs from the
# first row of the partition up to the current row. In the output, rows with
# equal salaries no longer share the same running results.
windowSpec = (
    Window.partitionBy("dept")
    .orderBy(F.col("salary").asc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df = (
    data
    .withColumn("list_salary", F.collect_list("salary").over(windowSpec))
    .withColumn("average_salary", F.avg("salary").over(windowSpec))
    .withColumn("total_salary", F.sum("salary").over(windowSpec))
)

df.show()

+---+-------------+---------+------+--------------------+------------------+------------+
| id|employee_name|     dept|salary|         list_salary|    average_salary|total_salary|
+---+-------------+---------+------+--------------------+------------------+------------+
|  4|        Maria|  Finance|  3000|              [3000]|            3000.0|        3000|
|  6|        Scott|  Finance|  3300|        [3000, 3300]|            3150.0|        6300|
|  7|          Jen|  Finance|  3900|  [3000, 3300, 3900]|            3400.0|       10200|
|  9|        Kumar|Marketing|  2000|              [2000]|            2000.0|        2000|
|  8|         Jeff|Marketing|  3000|        [2000, 3000]|            2500.0|        5000|
|  1|        James|    Sales|  3000|              [3000]|            3000.0|        3000|
|  5|        James|    Sales|  3000|        [3000, 3000]|            3000.0|        6000|
|  3|       Robert|    Sales|  4100|  [3000, 3000, 4100]|3366.6666666666665|       10100|
| 10|     

In [0]:
# Third-from-last collected row under the row-based frame. Assuming collect()
# keeps the order printed by show() above, this is the first Sales row with
# salary 4100; its list now stops at the row itself and holds a single 4100.
df.collect()[-3]["list_salary"]

Out[32]: [3000, 3000, 4100]

In [0]:
# Second-to-last collected row (presumably the second Sales row with salary
# 4100): only here does the second 4100 appear in the list.
df.collect()[-2]["list_salary"]

Out[33]: [3000, 3000, 4100, 4100]

In [0]:
# Value-based frame: rangeBetween(0, 1000) is relative to the ordering column,
# so for each row the frame covers the rows of the same department whose
# salary lies between the current salary and the current salary + 1000.
windowSpec = (
    Window.partitionBy("dept")
    .orderBy(F.col("salary").asc())
    .rangeBetween(0, 1000)
)

df = (
    data
    .withColumn("list_salary", F.collect_list("salary").over(windowSpec))
    .withColumn("average_salary", F.avg("salary").over(windowSpec))
    .withColumn("total_salary", F.sum("salary").over(windowSpec))
)

df.show()

+---+-------------+---------+------+------------------+-----------------+------------+
| id|employee_name|     dept|salary|       list_salary|   average_salary|total_salary|
+---+-------------+---------+------+------------------+-----------------+------------+
|  4|        Maria|  Finance|  3000|[3000, 3300, 3900]|           3400.0|       10200|
|  6|        Scott|  Finance|  3300|      [3300, 3900]|           3600.0|        7200|
|  7|          Jen|  Finance|  3900|            [3900]|           3900.0|        3900|
|  9|        Kumar|Marketing|  2000|      [2000, 3000]|           2500.0|        5000|
|  8|         Jeff|Marketing|  3000|            [3000]|           3000.0|        3000|
|  1|        James|    Sales|  3000|      [3000, 3000]|           3000.0|        6000|
|  5|        James|    Sales|  3000|      [3000, 3000]|           3000.0|        6000|
|  3|       Robert|    Sales|  4100|[4100, 4100, 4600]|4266.666666666667|       12800|
| 10|         Saif|    Sales|  4100|[4100, 

#### Ranking Functions

In [0]:
# Ranking functions need an ordered window; here rows are ranked by salary
# within each department.
windowSpec = Window.partitionBy("dept").orderBy(F.col("salary").asc())

# Running aggregates first, so the column order matches the table below.
df = (
    data
    .withColumn("average_salary", F.avg("salary").over(windowSpec))
    .withColumn("total_salary", F.sum("salary").over(windowSpec))
)

# The four ranking variants side by side; compare how they treat the tied
# salaries in the output (row_num keeps counting, rank/dense_rank repeat).
ranking_columns = {
    "row_num": F.row_number(),
    "rank": F.rank(),
    "dense_rank": F.dense_rank(),
    "perc_rank": F.percent_rank(),
}
for column_name, ranking_fn in ranking_columns.items():
    df = df.withColumn(column_name, ranking_fn.over(windowSpec))

df.show()

+---+-------------+---------+------+--------------+------------+-------+----+----------+---------+
| id|employee_name|     dept|salary|average_salary|total_salary|row_num|rank|dense_rank|perc_rank|
+---+-------------+---------+------+--------------+------------+-------+----+----------+---------+
|  4|        Maria|  Finance|  3000|        3000.0|        3000|      1|   1|         1|      0.0|
|  6|        Scott|  Finance|  3300|        3150.0|        6300|      2|   2|         2|      0.5|
|  7|          Jen|  Finance|  3900|        3400.0|       10200|      3|   3|         3|      1.0|
|  9|        Kumar|Marketing|  2000|        2000.0|        2000|      1|   1|         1|      0.0|
|  8|         Jeff|Marketing|  3000|        2500.0|        5000|      2|   2|         2|      1.0|
|  1|        James|    Sales|  3000|        3000.0|        6000|      1|   1|         1|      0.0|
|  5|        James|    Sales|  3000|        3000.0|        6000|      2|   1|         1|      0.0|
|  3|     

#### Analytic functions

In [0]:
# lead/lag look one row ahead / one row back inside the ordered department
# window; where no such row exists the result is null (see output).
windowSpec = Window.partitionBy("dept").orderBy(F.col("salary").asc())

next_salary = F.lead("salary", 1).over(windowSpec).alias("next_salary")
prev_salary = F.lag("salary", 1).over(windowSpec).alias("prev_salary")

# Keep every original column and append the two look-around columns.
df = data.select("*", next_salary, prev_salary)

df.show()

+---+-------------+---------+------+-----------+-----------+
| id|employee_name|     dept|salary|next_salary|prev_salary|
+---+-------------+---------+------+-----------+-----------+
|  4|        Maria|  Finance|  3000|       3300|       null|
|  6|        Scott|  Finance|  3300|       3900|       3000|
|  7|          Jen|  Finance|  3900|       null|       3300|
|  9|        Kumar|Marketing|  2000|       3000|       null|
|  8|         Jeff|Marketing|  3000|       null|       2000|
|  1|        James|    Sales|  3000|       3000|       null|
|  5|        James|    Sales|  3000|       4100|       3000|
|  3|       Robert|    Sales|  4100|       4100|       3000|
| 10|         Saif|    Sales|  4100|       4600|       4100|
|  2|      Michael|    Sales|  4600|       null|       4100|
+---+-------------+---------+------+-----------+-----------+



#### Aggregate Functions

In [0]:
# Unordered, partition-only window: each aggregate is computed over the whole
# department and repeated on every row of that department.
windowSpec = Window.partitionBy("dept")

# The original cell referenced `windowSpecAgg`, which is never defined in this
# notebook (NameError on a fresh kernel); the window defined above is the one
# intended here.
df = (
    data
    .withColumn("avg", F.avg(F.col("salary")).over(windowSpec))
    .withColumn("sum", F.sum(F.col("salary")).over(windowSpec))
    .withColumn("min", F.min(F.col("salary")).over(windowSpec))
    .withColumn("max", F.max(F.col("salary")).over(windowSpec))
)
df.show()

+---+-------------+---------+------+------+-----+----+----+
| id|employee_name|     dept|salary|   avg|  sum| min| max|
+---+-------------+---------+------+------+-----+----+----+
|  4|        Maria|  Finance|  3000|3400.0|10200|3000|3900|
|  6|        Scott|  Finance|  3300|3400.0|10200|3000|3900|
|  7|          Jen|  Finance|  3900|3400.0|10200|3000|3900|
|  8|         Jeff|Marketing|  3000|2500.0| 5000|2000|3000|
|  9|        Kumar|Marketing|  2000|2500.0| 5000|2000|3000|
|  1|        James|    Sales|  3000|3760.0|18800|3000|4600|
|  2|      Michael|    Sales|  4600|3760.0|18800|3000|4600|
|  3|       Robert|    Sales|  4100|3760.0|18800|3000|4600|
|  5|        James|    Sales|  3000|3760.0|18800|3000|4600|
| 10|         Saif|    Sales|  4100|3760.0|18800|3000|4600|
+---+-------------+---------+------+------+-----+----+----+



#### Using multiple windows

In [0]:
# A window-based equivalent of a standard groupBy, built from two windows:
#   * aggregation_window - the whole department, used for the aggregates;
#   * grouping_window    - the same partition ordered by id, used only to
#                          number the rows so that one row per department
#                          can be kept afterwards.
aggregation_window = Window.partitionBy("dept")
grouping_window = aggregation_window.orderBy("id")

df_aggregations = (
    data.select("dept", "id", "salary")
    # the row number is calculated over the grouping_window ...
    .withColumn("group_rank", F.row_number().over(grouping_window))
    # ... while every aggregate is calculated over the aggregation_window
    .withColumn("salary_sum", F.sum("salary").over(aggregation_window))
    .withColumn("salary_avg", F.avg("salary").over(aggregation_window))
    .withColumn("salary_min", F.min("salary").over(aggregation_window))
    .withColumn("salary_max", F.max("salary").over(aggregation_window))
    # keep a single row per department and drop the helper columns
    .filter(F.col("group_rank") == 1)
    .select("dept", "salary_sum", "salary_avg", "salary_min", "salary_max")
)

df_aggregations.show()
 


+---------+----------+----------+----------+----------+
|     dept|salary_sum|salary_avg|salary_min|salary_max|
+---------+----------+----------+----------+----------+
|  Finance|     10200|    3400.0|      3000|      3900|
|Marketing|      5000|    2500.0|      2000|      3000|
|    Sales|     18800|    3760.0|      3000|      4600|
+---------+----------+----------+----------+----------+



In [0]:
# The window-based aggregation is equivalent to this much simpler groupBy.
# The original cell could not run: it used the undefined alias `fn` (this
# notebook imports pyspark.sql.functions as `F`) and selected the columns
# 'partition' / 'aggregation', which exist in none of the DataFrames here.
# It now works on `data`, grouping by 'dept' and aggregating 'salary', with
# the same output column names as the window-based version.
df_groupby = (
    data.select("dept", "salary")
    .groupBy("dept")
    .agg(
        F.sum("salary").alias("salary_sum"),
        F.avg("salary").alias("salary_avg"),
        F.min("salary").alias("salary_min"),
        F.max("salary").alias("salary_max"),
    )
)

df_groupby.show()

#### Rough work

In [0]:
import pandas as pd

# Sample values listed column by column; position i across the four lists
# forms one record.
Customer = ['a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b']
FlightDate = [20, 40, 51, 50, 60, 15, 17, 37, 36, 50]
IssuingDate = [10, 15, 44, 45, 55, 10, 2, 30, 32, 24]
Revenue = [100, 50, 40, 70, 60, 40, 30, 100, 200, 100]

# Stack the four lists as rows, then transpose so that each list becomes a column.
bookings_pdf = pd.DataFrame([Customer, FlightDate, IssuingDate, Revenue]).transpose()

df = spark.createDataFrame(
    bookings_pdf,
    schema=["Customer", 'FlightDate', 'IssuingDate', 'Revenue'],
)
df.show()

+--------+----------+-----------+-------+
|Customer|FlightDate|IssuingDate|Revenue|
+--------+----------+-----------+-------+
|       a|        20|         10|    100|
|       a|        40|         15|     50|
|       a|        51|         44|     40|
|       a|        50|         45|     70|
|       a|        60|         55|     60|
|       b|        15|         10|     40|
|       b|        17|          2|     30|
|       b|        37|         30|    100|
|       b|        36|         32|    200|
|       b|        50|         24|    100|
+--------+----------+-----------+-------+



In [0]:
# For every row, add up the Revenue of the same customer's rows whose
# IssuingDate lies between this row's FlightDate and FlightDate + 10.
# The left join keeps rows without any match; coalesce turns their missing
# revenue into 0 so the sum is 0 rather than null.
base_rows = df.alias("df")
candidate_rows = df.alias("df_2")

match_condition = F.expr(
    "df.Customer = df_2.Customer "
    "and df_2.issuingdate between df.flightdate and df.flightdate+10"
)

# Group by every column of the left side so each original row stays separate.
group_columns = ['df.{}'.format(column) for column in df.columns]

matched_revenue = F.sum(F.coalesce("df_2.revenue", F.lit(0))).alias("result")

(
    base_rows
    .join(candidate_rows, on=match_condition, how='left')
    .groupBy(*group_columns)
    .agg(matched_revenue)
    .show()
)

+--------+----------+-----------+-------+------+
|Customer|FlightDate|IssuingDate|Revenue|result|
+--------+----------+-----------+-------+------+
|       a|        20|         10|    100|     0|
|       a|        40|         15|     50|   110|
|       a|        51|         44|     40|    60|
|       a|        50|         45|     70|    60|
|       a|        60|         55|     60|     0|
|       b|        15|         10|     40|   100|
|       b|        17|          2|     30|   100|
|       b|        37|         30|    100|     0|
|       b|        36|         32|    200|     0|
|       b|        50|         24|    100|     0|
+--------+----------+-----------+-------+------+



In [0]:
# Same self-join as in the previous cell, but grouped only by an explicit set
# of key columns (Customer, FlightDate, IssuingDate), so Revenue itself is not
# part of the output.
base_rows = df.alias("df")
candidate_rows = df.alias("df_2")

match_condition = F.expr(
    "df.Customer = df_2.Customer "
    "and df_2.issuingdate between df.flightdate and df.flightdate+10"
)

# Unmatched rows of the left join contribute 0 instead of null.
matched_revenue = F.sum(F.coalesce("df_2.revenue", F.lit(0))).alias("result")

(
    base_rows
    .join(candidate_rows, on=match_condition, how='left')
    .groupBy("df.Customer", "df.FlightDate", "df.IssuingDate")
    .agg(matched_revenue)
    .show()
)

+--------+----------+-----------+------+
|Customer|FlightDate|IssuingDate|result|
+--------+----------+-----------+------+
|       a|        20|         10|     0|
|       a|        40|         15|   110|
|       a|        51|         44|    60|
|       a|        60|         55|     0|
|       a|        50|         45|    60|
|       b|        15|         10|   100|
|       b|        17|          2|   100|
|       b|        37|         30|     0|
|       b|        50|         24|     0|
|       b|        36|         32|     0|
+--------+----------+-----------+------+

