In this notebook, we analyze the physical plans of some basic window calculations and demonstrate two optimizations that avoid unnecessary shuffles.

In [1]:
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.config("spark.sql.shuffle.partitions", 16).getOrCreate()



The following cell creates the demo dataset. It uses the same function as the first notebook, so you can skip it.

In [2]:
import pandas as pd
import numpy as np

def create_demo_data(n_products, n_stores, start_date="2021-01-01", end_date="2022-01-01"):
    """Creates demo data, writes it as parquet partitioned by date, reads it and returns the dataframe"""
    # One "YYYY-MM-DD" string per day in the (inclusive) date range
    dates = pd.date_range(start_date, end_date)
    dates = [str(date)[:10] for date in dates]

    day_index = np.arange(len(dates))
    frames = []
    # One daily series of Poisson-distributed sales per product/store pair
    for product in range(n_products):
        for store in range(n_stores):
            sales = np.random.poisson(10, size=len(dates))
            partial_df = (
                pd.DataFrame(dates, columns=["date"])
                .assign(product_id=product)
                .assign(store_id=store)
                .assign(day_index=day_index)
                .assign(sales_quantity=sales)
                # Every row gets the same aggregation level
                .assign(product_agg_level="wine")
            )
            frames.append(partial_df)
    pdf = pd.concat(frames)
    result = spark.createDataFrame(pdf)
    # Write partitioned by date and read back, so the plans below start from a FileScan
    result.repartition("date").write.partitionBy("date").parquet("demo-data", mode="overwrite")
    return spark.read.parquet("demo-data")

In [3]:
df = create_demo_data(n_products=10, n_stores=10, start_date="2021-01-01", end_date="2021-01-31")
df.rdd.getNumPartitions()

16

### A simple rolling sum

Calculate the moving total of sales for each product/store pair over the last 4 days, including the current row.
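To pin down what this frame means, here is a plain-Python reference for a single product/store pair (not how Spark executes it; the function names are hypothetical). ``rangeBetween(-3, 0)`` is value-based: it covers every row whose ``day_index`` lies in ``[current - 3, current]``. ``rowsBetween(-3, 0)`` is position-based: it covers the current row and the three rows before it. The two only differ when days are missing; the demo data has no gaps, so for it they agree.

```python
def range_frame_sum(day_index, sales, lower=-3, upper=0):
    """Mimics Window.orderBy("day_index").rangeBetween(lower, upper) for ONE
    partition: sums sales of rows whose day_index is in [d + lower, d + upper]."""
    return [
        sum(s for d2, s in zip(day_index, sales) if d + lower <= d2 <= d + upper)
        for d in day_index
    ]


def rows_frame_sum(sales, lower=-3, upper=0):
    """Mimics rowsBetween(lower, upper): sums rows at positions [i + lower, i + upper].
    Assumes `sales` is already sorted by day_index."""
    n = len(sales)
    return [sum(sales[max(0, i + lower):min(n, i + upper + 1)]) for i in range(n)]


# Day 2 is missing for this pair, so the two frame types disagree on the last two rows.
days = [0, 1, 3, 4, 5]
sales = [10, 20, 30, 40, 50]
print(range_frame_sum(days, sales))  # [10, 30, 60, 90, 120]
print(rows_frame_sum(sales))         # [10, 30, 60, 100, 140]
```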

In [4]:
w = Window.partitionBy("store_id", "product_id").orderBy("day_index").rangeBetween(-3, 0)

result = df.withColumn("moving_sum_sales", F.sum("sales_quantity").over(w))
result.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [sum(sales_quantity#22L) windowspecdefinition(store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS moving_sum_sales#32L], [store_id#20L, product_id#19L], [day_index#21L ASC NULLS FIRST]
   +- Sort [store_id#20L ASC NULLS FIRST, product_id#19L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(store_id#20L, product_id#19L, 16), ENSURE_REQUIREMENTS, [id=#41]
         +- FileScan parquet [product_id#19L,store_id#20L,day_index#21L,sales_quantity#22L,product_agg_level#23,date#24] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/workspaces/rocks/Untitled Folder/demo-data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:bigint,store_id:bigint,day_index:bigint,sales_quantity:bigint,product_agg_level...




What is interesting about this physical plan is the ``Exchange hashpartitioning`` step, which repartitions the data by ``store_id``/``product_id``. Spark repartitions your data according to the columns passed to ``partitionBy``, then sorts each partition by those columns followed by the ``orderBy`` column (the ``Sort`` step). The ``rangeBetween`` or ``rowsBetween`` frame does not change the shuffle.
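Rather than reading each plan by eye, shuffles can be listed programmatically. The sketch below assumes PySpark 3.x, where ``DataFrame.explain()`` prints the plan through Python's ``print``, so ``contextlib.redirect_stdout`` can capture it. ``plan_text`` and ``shuffle_keys`` are hypothetical helper names, and the regex relies on the textual plan format shown above, which is not a stable API.

```python
import contextlib
import io
import re


def plan_text(df, mode="simple"):
    """Return the physical plan that df.explain() prints, as a string."""
    buffer = io.StringIO()
    # explain() prints the plan; capture it instead of showing it
    with contextlib.redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def shuffle_keys(plan):
    """Column names of every 'Exchange hashpartitioning(...)' node, top of the plan first."""
    keys = []
    for args in re.findall(r"Exchange hashpartitioning\(([^)]*)\)", plan):
        # The last argument is the number of shuffle partitions, e.g. 16
        *columns, _n_partitions = [a.strip() for a in args.split(",")]
        # Drop Spark's expression ids such as '#20L' or '#23'
        keys.append(tuple(re.sub(r"#\d+L?$", "", c) for c in columns))
    return keys
```

For example, ``shuffle_keys(plan_text(result))`` returns one tuple of column names per ``Exchange hashpartitioning`` node in the printed plan, so its length is the number of hash shuffles.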

### Multiple uses of the same window

In [5]:
w1 = Window.partitionBy("store_id", "product_id").orderBy("day_index").rowsBetween(-1, 1)
w2 = Window.partitionBy("store_id", "product_id").orderBy("day_index").rowsBetween(-10, 0)

result = (
    df
    .withColumn("moving_sum_sales", F.sum("sales_quantity").over(w1))
    .withColumn("max_date", F.max("date").over(w1))
    .withColumn("mean_sales", F.mean("sales_quantity").over(w2))
)
result.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [sum(sales_quantity#22L) windowspecdefinition(store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS moving_sum_sales#41L, max(date#24) windowspecdefinition(store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, 1)) AS max_date#50, avg(sales_quantity#22L) windowspecdefinition(store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -10, currentrow$())) AS mean_sales#60], [store_id#20L, product_id#19L], [day_index#21L ASC NULLS FIRST]
   +- Sort [store_id#20L ASC NULLS FIRST, product_id#19L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(store_id#20L, product_id#19L, 16), ENSURE_REQUIREMENTS, [id=#52]
         +- FileScan parquet [product_id#19L,store_id#20L,day_index#21L,sales_quantity#22L,product_agg_level#23,date#24] Batched: true, DataFilters: [], Form

**Takeaway:**

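- Spark avoids additional shuffles when multiple windows use the same ``partitionBy``. Here all three expressions, even though they use different frames, are computed in a single ``Window`` operator because they share both ``partitionBy`` and ``orderBy``.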

### Windows using different ``partitionBy``

It turns out that in some cases we can also avoid unnecessary shuffles by reorganizing our window expressions.

**Two windows, two shuffles**

1st: partitioned by ``product_id``, ``store_id``

2nd: partitioned by ``store_id``

In [6]:
w1 = Window.partitionBy("product_id", "store_id").orderBy("day_index").rangeBetween(-3, 0)
w2 = Window.partitionBy("store_id").orderBy("day_index").rangeBetween(-3, 0)

res = (
    df
    .withColumn("pair_mean_-3_0", F.mean("sales_quantity").over(w1))
    .withColumn("store_mean_-3_0", F.mean("sales_quantity").over(w2))

)
res.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [avg(sales_quantity#22L) windowspecdefinition(store_id#20L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS store_mean_-3_0#80], [store_id#20L], [day_index#21L ASC NULLS FIRST]
   +- Sort [store_id#20L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(store_id#20L, 16), ENSURE_REQUIREMENTS, [id=#71]
         +- Window [avg(sales_quantity#22L) windowspecdefinition(product_id#19L, store_id#20L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS pair_mean_-3_0#71], [product_id#19L, store_id#20L], [day_index#21L ASC NULLS FIRST]
            +- Sort [product_id#19L ASC NULLS FIRST, store_id#20L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(product_id#19L, store_id#20L, 16), ENSURE_REQUIREMENTS, [id=#67]
                  +- FileScan parquet [product_id#19

Note that we now have two ``Exchange hashpartitioning`` steps (physical plans are read bottom-up, so the lower one runs first):

**1:** ``+- Exchange hashpartitioning(product_id#19L, store_id#20L, 16), ENSURE_REQUIREMENTS, [id=#67]``

**2:** ``+- Exchange hashpartitioning(store_id#20L, 16), ENSURE_REQUIREMENTS, [id=#71]``

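But why? The first shuffle hash-partitions the data by ``product_id`` and ``store_id``, which spreads the rows of a single ``store_id`` over several partitions, so the ``store_id`` window needs a second shuffle. If we shuffled the data by ``store_id`` first, we wouldn't need to shuffle again, since all rows for a ``store_id`` (and therefore for every ``store_id``/``product_id`` pair) would already be in the same partition. We can get there just by reordering the ``partitionBy`` columns in the first window: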
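This argument can be checked with a toy model of hash partitioning. The sketch assigns each row to one of 16 partitions using ``zlib.crc32`` of the key as a stand-in for Spark's Murmur3 hash (an assumption: real partition ids differ, but the co-location logic is the same). ``partition_of`` and ``partitions_per_group`` are hypothetical helpers.

```python
import zlib
from collections import defaultdict


def partition_of(key, n_partitions=16):
    """Toy hash partitioner; zlib.crc32 stands in for Spark's Murmur3 hash."""
    return zlib.crc32(repr(key).encode()) % n_partitions


def partitions_per_group(rows, partition_cols, group_cols, n_partitions=16):
    """Hash-partition `rows` by `partition_cols`, then report, for every group
    defined by `group_cols`, how many distinct partitions its rows ended up in."""
    seen = defaultdict(set)
    for row in rows:
        part = partition_of(tuple(row[c] for c in partition_cols), n_partitions)
        seen[tuple(row[c] for c in group_cols)].add(part)
    return {group: len(parts) for group, parts in seen.items()}


# 10 stores x 10 products, as in the demo data
rows = [{"store_id": s, "product_id": p} for s in range(10) for p in range(10)]

# Shuffled by store_id: each store lives in exactly one partition,
# so a window over store_id (or store_id, product_id) needs no further shuffle.
by_store = partitions_per_group(rows, ["store_id"], ["store_id"])
print(max(by_store.values()))  # 1

# Shuffled by (product_id, store_id): the rows of one store are scattered,
# so a window over store_id alone must shuffle again.
by_pair = partitions_per_group(rows, ["product_id", "store_id"], ["store_id"])
print(max(by_pair.values()))
```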


**Two windows re-organized, single shuffle**

1st: partitioned by ``store_id``, ``product_id``

2nd: partitioned by ``store_id``

In [7]:
w1 = Window.partitionBy("store_id", "product_id").orderBy("day_index").rangeBetween(-3, 0)
w2 = Window.partitionBy("store_id").orderBy("day_index").rangeBetween(-3, 0)

res = (
    df
    .withColumn("pair_mean_-3_0", F.mean("sales_quantity").over(w1))
    .withColumn("store_mean_-3_0", F.mean("sales_quantity").over(w2))

)
res.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [product_id#19L, store_id#20L, day_index#21L, sales_quantity#22L, product_agg_level#23, date#24, pair_mean_-3_0#90, store_mean_-3_0#99]
   +- Window [avg(sales_quantity#22L) windowspecdefinition(store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS pair_mean_-3_0#90], [store_id#20L, product_id#19L], [day_index#21L ASC NULLS FIRST]
      +- Sort [store_id#20L ASC NULLS FIRST, product_id#19L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
         +- Window [avg(sales_quantity#22L) windowspecdefinition(store_id#20L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS store_mean_-3_0#99], [store_id#20L], [day_index#21L ASC NULLS FIRST]
            +- Sort [store_id#20L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(store_id#20L, 16), ENSURE_REQUIREMENTS, [id=#90]

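Now there is a single ``Exchange hashpartitioning`` step, by ``store_id``, and the second shuffle is gone. Much better. Note that Spark also swapped the two ``Window`` operators: the ``store_id`` window is now computed first, right after the shuffle, and the ``store_id``/``product_id`` window reuses that partitioning. This reordering appears to be possible only when the ``partitionBy`` columns of the coarser window are the leading columns of the finer one, which is why the column order matters.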

### Employing our business logic in window expressions

In a typical situation, we might want to perform window calculations at different aggregation levels. We know that the granularity (from granular to coarse) is as follows:

``product_id`` -> ``product_agg_level_5`` -> ``product_agg_level_4`` etc.

But Spark has no idea. By writing this hierarchy explicitly into our window specifications, we can avoid more shuffles:

**The naive approach results in two shuffles:**

1st window partitioned by: ``store_id``, ``product_id``

2nd window partitioned by: ``product_agg_level``, ``store_id``

In [8]:
from pyspark.sql import Window

w1 = Window.partitionBy("store_id", "product_id").orderBy("day_index").rangeBetween(-3, 0)
w2 = Window.partitionBy("product_agg_level", "store_id").orderBy("day_index").rangeBetween(-5, 5)


res = (
    df
    .withColumn("store_product_mean", F.mean("sales_quantity").over(w1))
    .withColumn("agg_level_store_mean", F.mean("sales_quantity").over(w2))

)
res.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [avg(sales_quantity#22L) windowspecdefinition(product_agg_level#23, store_id#20L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -5, 5)) AS agg_level_store_mean#118], [product_agg_level#23, store_id#20L], [day_index#21L ASC NULLS FIRST]
   +- Sort [product_agg_level#23 ASC NULLS FIRST, store_id#20L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(product_agg_level#23, store_id#20L, 16), ENSURE_REQUIREMENTS, [id=#113]
         +- Window [avg(sales_quantity#22L) windowspecdefinition(store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS store_product_mean#109], [store_id#20L, product_id#19L], [day_index#21L ASC NULLS FIRST]
            +- Sort [store_id#20L ASC NULLS FIRST, product_id#19L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(store_id#20L,

**Avoiding the second shuffle by being explicit:**

1st window partitioned by: ``product_agg_level``, ``store_id``, ``product_id``

2nd window partitioned by: ``product_agg_level``, ``store_id``
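Adding ``product_agg_level`` to the first window leaves its results unchanged only if every ``product_id`` belongs to exactly one ``product_agg_level``, i.e. the hierarchy above really holds. A cheap sanity check is to count distinct parents per child. It is sketched here in pandas with a hypothetical helper name; in PySpark the same idea is ``df.groupBy("product_id").agg(F.countDistinct("product_agg_level").alias("n_levels")).filter("n_levels > 1")``.

```python
import pandas as pd


def violates_hierarchy(pdf, child, parent):
    """Return the `child` values that map to more than one `parent` value.

    An empty result means `child` determines `parent`, so adding `parent` to a
    window's partitionBy next to `child` does not change which rows share a window.
    """
    n_parents = pdf.groupby(child)[parent].nunique()
    return n_parents[n_parents > 1].index.tolist()


ok = pd.DataFrame({
    "product_id": [1, 1, 2, 3],
    "product_agg_level": ["wine", "wine", "wine", "beer"],
})
print(violates_hierarchy(ok, "product_id", "product_agg_level"))   # []

bad = pd.DataFrame({"product_id": [1, 1], "product_agg_level": ["wine", "beer"]})
print(violates_hierarchy(bad, "product_id", "product_agg_level"))  # [1]
```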

In [9]:
from pyspark.sql import Window

w1 = Window.partitionBy("product_agg_level", "store_id", "product_id").orderBy("day_index").rangeBetween(-3, 0)
w2 = Window.partitionBy("product_agg_level", "store_id").orderBy("day_index").rangeBetween(-5, 5)


res = (
    df
    .withColumn("store_product_mean", F.mean("sales_quantity").over(w1))
    .withColumn("agg_level_store_mean", F.mean("sales_quantity").over(w2))

)
res.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [product_id#19L, store_id#20L, day_index#21L, sales_quantity#22L, product_agg_level#23, date#24, store_product_mean#128, agg_level_store_mean#137]
   +- Window [avg(sales_quantity#22L) windowspecdefinition(product_agg_level#23, store_id#20L, product_id#19L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -3, currentrow$())) AS store_product_mean#128], [product_agg_level#23, store_id#20L, product_id#19L], [day_index#21L ASC NULLS FIRST]
      +- Sort [product_agg_level#23 ASC NULLS FIRST, store_id#20L ASC NULLS FIRST, product_id#19L ASC NULLS FIRST, day_index#21L ASC NULLS FIRST], false, 0
         +- Window [avg(sales_quantity#22L) windowspecdefinition(product_agg_level#23, store_id#20L, day_index#21L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -5, 5)) AS agg_level_store_mean#137], [product_agg_level#23, store_id#20L], [day_index#21L ASC NULLS FIRST]
            +- Sort [product_agg_level#23 ASC NU

### Skewed data in window calculations

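Skewed data can be a huge source of inefficiency when using window functions. There are two reasons, which should be obvious by now:

**1:** Window functions cause a shuffle: all rows that share a ``partitionBy`` key are sent to the same partition and processed by a single task.

**2:** Window functions do not reduce the number of rows in the dataframe. Unlike a ``groupBy`` aggregation, which can pre-aggregate rows before the shuffle, a window has to move and sort every row of a large group.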
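The second point is what separates windows from ordinary aggregations. The toy model below counts how many rows for one hot key reach the single task that owns it. It is a simplification that assumes partial aggregation always runs before the shuffle, and the function name is hypothetical.

```python
def rows_reaching_reducer(input_partitions, key, partial_aggregation):
    """Toy model: rows for `key` received by the one task that owns `key`.

    input_partitions is a list of lists of keys (one list per input partition).
    """
    if partial_aggregation:
        # groupBy().agg(): each input partition pre-aggregates locally and
        # sends at most one partial result per key.
        return sum(1 for part in input_partitions if key in part)
    # Window function: nothing can be combined before the shuffle, so every
    # row with this key is sent to (and later sorted by) the same task.
    return sum(part.count(key) for part in input_partitions)


# 16 input partitions with 1,000 rows each, all in the same product_agg_level
partitions = [["wine"] * 1000 for _ in range(16)]
print(rows_reaching_reducer(partitions, "wine", partial_aggregation=True))   # 16
print(rows_reaching_reducer(partitions, "wine", partial_aggregation=False))  # 16000
```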


In ``noob``, the lag feature calculation step was taking longer than we expected. This step performs window calculations over various ``partitionBy`` columns. We discovered that the cause was a single window, partitioned by ``product_agg_level_4``. The same step has very different run times with and without the ``product_agg_level_4`` window:

With ``product_agg_level_4``: 6 hours

Without ``product_agg_level_4``: 38 minutes


Repartitioning by ``product_agg_level`` can result in a very skewed dataframe: some groups contain a large number of products, and all of their rows end up in a single partition. It is good practice to avoid such window operations where possible.
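Before adding a window, it is cheap to check how large its biggest group is. The sketch below uses pandas on a demo-shaped frame (10 products x 10 stores x 31 days, one ``product_agg_level``); the helper name is hypothetical. On a Spark dataframe, ``df.groupBy(*cols).count().orderBy(F.desc("count"))`` lists the largest groups in the same spirit.

```python
import pandas as pd


def largest_window_share(pdf, partition_cols):
    """Fraction of all rows that fall into the biggest partitionBy group.

    All rows of one group are processed by one task, so a share close to 1.0
    means the window effectively runs on a single core.
    """
    sizes = pdf.groupby(partition_cols).size()
    return float(sizes.max() / len(pdf))


pdf = pd.DataFrame(
    [(p, s, d, "wine") for p in range(10) for s in range(10) for d in range(31)],
    columns=["product_id", "store_id", "day_index", "product_agg_level"],
)
print(largest_window_share(pdf, ["store_id", "product_id"]))  # 0.01
print(largest_window_share(pdf, ["product_agg_level"]))       # 1.0
```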

**Takeaways:**

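- We can avoid unnecessary shuffles in window calculations by putting the ``partitionBy`` columns shared with coarser windows first and by making known hierarchies explicit.

- Windowing over a skewed ``partitionBy`` key can be extremely inefficient.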