# Introduction to `group_by_dynamic`
By the end of this session you will be able to:

- do temporal aggregations using `group_by_dynamic`
- understand the role of sorting in `group_by_dynamic`
- use `group_by_dynamic` in lazy mode


In [None]:
from datetime import datetime

import polars as pl

We use a time series of NYC taxi pickups.

In [None]:
csv_file = "../data/nyc_trip_data_1k.csv"

In [None]:
df = pl.read_csv(csv_file, try_parse_dates=True)
df.head()

## Temporal aggregation with datetime components and `group_by`
The simplest way to do a temporal aggregation on a time series is to:
- create the datetime components of interest
- do a `group_by` on these components
- `sort` the output back into time series order

In this example we get the average trip distance by date, using `dt.date` to convert each `pl.Datetime` value to a `pl.Date`.

In [None]:
(
    df
    .group_by(
        pl.col("pickup").dt.date().alias("date")
    )
    .agg(
        pl.col("trip_distance").mean().round(1),
    )
    .sort("date")
)

This approach with `group_by` works well if we can easily transform our `pl.Datetime`/`pl.Date` column into the time window we want to use. In this example `dt.date` maps each `pl.Datetime` to a daily window. However, this approach becomes more challenging for more complicated time windows. For example, during my PhD at Oxford as an [oceanography scientist](https://scholar.google.co.uk/citations?user=43HnNKAAAAAJ&hl=en) I sometimes had to calculate averages over a tidal period of 12 hours 25.2 minutes!

> At present we cannot use the `group_by_dynamic` approach shown below in streaming queries, so it is good to remember this `group_by` approach as a fallback for those cases.

## Temporal groupby with `group_by_dynamic`
With `group_by_dynamic` we can group rows into time windows. With this approach:
- Polars takes the input parameters to create time window boundaries
- Polars then finds all the rows that correspond to each window

### Sorted data for `group_by_dynamic`
One area of `group_by_dynamic` that can be confusing is the requirements around sortedness.

**For `group_by_dynamic` the date/datetime column must be sorted in ascending order**. Sortedness is required because, when finding the rows that belong to each window (the second step above), Polars uses a fast-track algorithm that requires sorted data.

Note that the date/datetime column we are using for the windows is called the *index* column in `group_by_dynamic`.

To ensure that the index column is sorted we need to either:
- sort the index column with `sort` (slower but less risky)
- use `set_sorted` on the index column if we know the column is already sorted

To check sortedness we can use `is_sorted`:

In [None]:
df["pickup"].is_sorted()

The `is_sorted` method scans the full column to check sortedness.

As the output is `True` we can use `set_sorted`.

> In my own time series pipelines I normally do an explicit `sort` before doing `group_by_dynamic` to avoid hard-to-detect errors creeping in if the data is unexpectedly unsorted.

### Syntax
In its simplest form the arguments to `group_by_dynamic` are:
- the datetime `index` column to create windows on and 
- the temporal length of the windows with the `every` argument as an interval string

Here we create windows that are one day long.

In [None]:
(
    df
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic(
        "pickup", 
        every="1d"
    )
    .agg(
        pl.col("trip_distance").mean().round(1)
    )
    .head(5)
)

We look at how the windows are specified in more detail in the next lecture.

What happens if we don't do `set_sorted` (or `sort`)? Uncomment this code to see.

In [None]:
# (
#     df
#     .group_by_dynamic(
#         "pickup", 
#         every="1d"
#     )
#     .agg(
#         pl.col("trip_distance").mean().round(1)
#     )
#     .head(5)
# )

Polars returns an exception with some advice about how we can tell it that the `index` column is sorted.

### More sorting
There is a `check_sorted` argument to `group_by_dynamic` that is `True` by default. When this argument is `True` Polars checks if the `index` column is sorted. However, this `check_sorted` operation **only checks the `flags` attribute of the `index` column**, it does not scan the data in the column to check if it is sorted. 

So when `check_sorted=True` (the default) Polars:
- checks the flag attribute of the `index` column
    - if the `index` column has `'SORTED_ASC': True` then Polars does `group_by_dynamic`
    - if the `index` column has `'SORTED_ASC': False` then Polars raises an `Exception` (as above)
 
When `check_sorted=False` Polars:
- does `group_by_dynamic` and potentially produces incorrect results if the `index` column is not actually sorted

## `DynamicGroupBy` object

When we do `group_by_dynamic` we create a `DynamicGroupBy` object.

In [None]:
(
    df
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic(
        "pickup", 
        every="1d"
    )
)

To do aggregations on a `DynamicGroupBy` we call `agg`. We cannot call aggregation methods like `count` or `sum` on a `DynamicGroupBy` directly.

## Dynamic groupby on groups
We may want to divide the `DataFrame` into groups before doing `group_by_dynamic` on each group. For example, in the taxi data we may want to get the daily average tip by driver.

We can do this grouping with the `group_by` argument in `group_by_dynamic`.

To illustrate this we group by each `VendorID` and then take 3-hourly averages of the `tip_amount`.

In [None]:
(
    df
    .sort("VendorID","pickup")
    .group_by_dynamic("pickup",every="3h",group_by="VendorID")
    .agg(
        pl.col("tip_amount").mean().round(1)
    )
    .head()
)

Notice the order of the columns: Polars first groups by `VendorID` and then does `group_by_dynamic` within each of those groups.

We can also pass expressions to the `group_by` argument; see the exercises.

## Dynamic groupby in lazy mode
When we do `group_by_dynamic` in lazy mode, the Polars query optimiser sees that only a subset of columns is required and reads only these columns from the CSV (see `PROJECT 3/7 COLUMNS` in the output below).

In [None]:
print(
    pl.scan_csv(csv_file,try_parse_dates=True)
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic("pickup",every="3h",group_by="passenger_count")
    .agg(
        pl.col("trip_distance").mean().round(1)
    )
    .explain()
)

## Exercises
In the exercises you will develop your understanding of:
- doing `group_by_dynamic` on a single column
- doing `group_by_dynamic` on groups
- the relative performance of `group_by_dynamic` and `group_by`

### Exercise 1
Group by the `pickup` column on a 6-hourly basis.

Get the count, mean and max of the trip distance for each window.

Sort the output by the mean trip distance with the largest values first.

Filter out all windows with fewer than 5 records.

### Exercise 2

Get the same statistics but also group by the Vendor ID.

Get the same statistics (`count`, `max` and `mean`) but group by both:
- the Vendor ID and
- the `trip_distance` where the `trip_distance` is cast to a 16-bit integer before grouping

## Solutions

### Solution to exercise 1
Group by the `pickup` column on a 6-hourly basis.

Get the count, mean and max of the trip distance for each window.

Sort the output by the mean trip distance with the largest values first.

In [None]:
(
    pl.read_csv(csv_file,try_parse_dates=True)
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic("pickup",every="6h")
    .agg(
        pl.col("trip_distance").count().alias("count"),
        pl.col("trip_distance").mean().alias("mean"),
        pl.col("trip_distance").max().alias("max"),
    )
    .sort("mean",descending=True)
)

Filter out all windows with fewer than 5 records.

In [None]:
(
    pl.read_csv(csv_file,try_parse_dates=True)
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic("pickup",every="6h")
    .agg(
        pl.col("trip_distance").count().alias("count"),
        pl.col("trip_distance").mean().alias("mean"),
        pl.col("trip_distance").max().alias("max"),        
    )
    .filter(pl.col("count") >= 5)
    .sort("mean",descending=True)
    .head()
)

### Solution to exercise 2

Get the same statistics but also group by the Vendor ID.

In [None]:
(
    pl.read_csv(csv_file,try_parse_dates=True)
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic("pickup",every="6h",group_by="VendorID")
    .agg(
        pl.col("trip_distance").count().alias("count"),
        pl.col("trip_distance").mean().alias("mean"),
        pl.col("trip_distance").max().alias("max"),
    )
    .filter(pl.col("count") >= 5)
    .sort("mean",descending=True)
    .head(3)
)

Get the same statistics (`count`, `max` and `mean`) but group by both:
- the Vendor ID and
- the `trip_distance` where the `trip_distance` is cast to a 16-bit integer before grouping

In [None]:
(
    pl.read_csv(csv_file,try_parse_dates=True)
    .with_columns(
        pl.col("pickup").set_sorted()
    )
    .group_by_dynamic(
        "pickup",
        every="6h",
        group_by=[
            "VendorID",
            pl.col("trip_distance").cast(pl.Int16())
        ]
    )
    .agg(
        pl.col("passenger_count").count().alias("count"),
        pl.col("passenger_count").mean().alias("mean"),
        pl.col("passenger_count").max().alias("max"),
    )
    .sort("mean",descending=True)
    .head()
)