# Dask Tips and Tricks: Rolling Averages in Dask

How can a user take advantage of Dask parallelism while calculating rolling averages?

> "I need to calculate a rolling average of a numerical column, in time series data. In pandas, I can do this with rolling(x).mean() with sorted values, but what do I do in Dask, with distributed data?"


Great question! Time series data often poses unique challenges with distributed data and parallelization, but Dask can do it. Here's what we need to do.

* Sort by index within AND across partitions.
* Know when to compute (convert to a pandas DataFrame) or persist (run the computations and keep the results on the cluster).
* Fill in gaps in the date index, if any. This example uses data that has some days missing. If you need every day to be represented, you'll need to fill in the index from a complete time series (shown below).
* Run calculations, taking care to cross partition boundaries correctly.


This example will walk you through these specific points, and demonstrate how it's done. We'll use New York City taxi trip data, and get the 30-day rolling average of base fare prices, for our example. Also, in order to really show how this can improve your life, we have chosen data too large to be held in memory at one time in pandas.

## Set Up A Cluster

This example uses a cluster of three CPU worker machines, so we can handle some large data.

In [1]:
from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)

[2020-11-19 19:08:52] INFO - dask-saturn | Cluster is ready


In [2]:
client.restart()

Client: Scheduler: tcp://d-steph-ml-workshop-443217fa1282437da4d84713212eacde.main-namespace:8786, Dashboard: https://d-steph-ml-workshop-443217fa1282437da4d84713212eacde.internal.saturnenterprise.io
Cluster: Workers: 3, Cores: 12, Memory: 94.50 GB


In [3]:
import s3fs
import pandas as pd
import numpy as np
import dask
import dask.dataframe as dd
from dask.distributed import wait
import datetime
import warnings
warnings.simplefilter("ignore")

***

## Load Large Dataset 

NYC taxi data is a good use case, because a full year of it is too large to hold in memory as a single pandas dataframe. That really shows us what Dask can do!

In [4]:
s3 = s3fs.S3FileSystem(anon=True)
files_2019 = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv'
s3.glob(files_2019)

['nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-02.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-03.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-04.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-05.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-06.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-07.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-08.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-09.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-10.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-11.csv',
 'nyc-tlc/trip data/yellow_tripdata_2019-12.csv']

In [5]:
%%time

taxi = dd.read_csv(
    files_2019,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)

CPU times: user 33.4 ms, sys: 21.4 ms, total: 54.8 ms
Wall time: 514 ms


Great, we have our Dask Dataframe ready to go.

### Dataset Size

That's a lot of rows!

In [6]:
taxi.shape[0].compute()

84399019

***

## Shape Data

Create a general date field, and ensure that all data across and within partitions is sorted by it. This index management and mapping is the most time-intensive part of this job.

Format the date field, so it can be an integer index for our use.

In [7]:
%%time
taxi['date'] = taxi.tpep_pickup_datetime.dt.date
taxi['datenum'] = taxi.date.astype('datetime64[ns]').astype(int) 

CPU times: user 13.8 ms, sys: 239 µs, total: 14 ms
Wall time: 12.9 ms
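
To make the conversion concrete, here is a minimal pandas-only sketch of the same idea on three made-up timestamps (not rows from the taxi data). As an assumption for brevity it truncates to midnight with `dt.normalize()` rather than going through `dt.date`; either way, each timestamp ends up as the integer number of nanoseconds since the Unix epoch for its calendar day.

```python
import pandas as pd

# Three made-up pickup times (illustrative only).
pickups = pd.Series(pd.to_datetime([
    "2019-01-01 16:02:52",
    "2019-01-01 23:59:59",
    "2019-01-02 00:00:01",
]))

# Truncate each timestamp to midnight, force nanosecond resolution,
# then cast to int64 nanoseconds since the epoch.
datenum = pickups.dt.normalize().astype("datetime64[ns]").astype("int64")

print(datenum.tolist())
```

The first two timestamps fall on the same day, so they share one integer; that is what lets us group and sort by day later on.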


Set the integer date column as our index. `set_index` shuffles the data so that the partitions are ordered by the index (sorted **across** partitions), and the `map_partitions` call then sorts the rows **within** each partition. This is important! Otherwise, your partitions might not be aligned date-wise and then your rolling averages will be inaccurate.

In [8]:
%%time
taxi = taxi.set_index('datenum').map_partitions(lambda taxi: taxi.sort_index()).persist()
_ = wait(taxi)

CPU times: user 1.1 s, sys: 40.1 ms, total: 1.14 s
Wall time: 3min 51s


***
### A Note On Persist and Compute

Lots of new users of Dask find the `.persist()` and `.compute()` processes confusing. This is understandable! But the answer is not as hard as you might think.

First, remember we have several machines working for us right now. We have our Jupyter instance right here running on one, and then our cluster of three worker machines also.

If we use `.compute()`, we are asking Dask to take all the computations and adjustments to the data that we have queued up, run them, and bring the result to the surface here, in Jupyter. That means if the object *was* distributed, we want to convert it into a local object here and now. If it's a Dask Dataframe, when we call `.compute()`, we're saying "Run the transformations we've queued, and convert this into a pandas dataframe immediately." If our data is too big to be held in local memory, this can be a disaster! But if it is small, then we might be fine.

If we use `.persist()`, we are asking Dask to take all the computations and adjustments to the data that we have queued up, and run them, but then the object is going to remain distributed and will live on the cluster, not on the Jupyter instance. So when we do this with a Dask Dataframe, we are telling our cluster "Run the transformations we've queued, and leave this as a distributed Dask Dataframe."

So, if you want to process all the delayed tasks you've applied to a Dask object, either of these methods will do it. The difference is where your object will live at the end.

***


### Begin Calculations

Back to work! Date (as integer) is now our index, which will speed up other processes. Our data is sorted by this index within and across partitions.

> Note: `fare_amount` is the column we're going to work on, so here we'll average the fare by date, returning a Series that is the average fare per date. This means our end result is going to be the rolling average of the average daily fare - this may not be what you want to do in a real business case, but for this situation working with one value per date makes the computations easier to explain. We could change our grain to hour or minute, and aggregate that way, or not aggregate at all and fill in all the intervening time periods.


Calculate the daily mean base fare:

In [9]:
%%time
fares = taxi.groupby(by='datenum').fare_amount.mean().persist()
_ = wait(fares)

CPU times: user 89.4 ms, sys: 0 ns, total: 89.4 ms
Wall time: 683 ms
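
The note above warns that an average of daily averages is not the same thing as an average over all trips. A small pandas sketch with made-up fares (the `datenum` values are stand-in integers, not real dates) shows why: days with few trips carry as much weight as days with many.

```python
import pandas as pd

# Made-up trips: three on day 1, one on day 2.
trips = pd.DataFrame({
    "datenum": [1, 1, 1, 2],
    "fare_amount": [10.0, 10.0, 10.0, 30.0],
})

# One value per day, as in the groupby above.
daily_mean = trips.groupby("datenum").fare_amount.mean()

# Averaging the daily values weights each day equally...
mean_of_daily_means = daily_mean.mean()

# ...while averaging the raw trips weights each trip equally.
overall_mean = trips.fare_amount.mean()

print(mean_of_daily_means, overall_mean)
```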


Let's take a quick look and see what we're working with.

In [40]:
%%time
fares.tail(10)

CPU times: user 3.92 ms, sys: 3.51 ms, total: 7.43 ms
Wall time: 30.4 ms


| datenum | date | daily_avg_fare_amount |
|---|---|---|
| 1595462400000000000 | 2020-07-23 | NaN |
| 1595548800000000000 | 2020-07-24 | NaN |
| 1595635200000000000 | 2020-07-25 | 9.0 |
| 1595721600000000000 | 2020-07-26 | NaN |
| 1595808000000000000 | 2020-07-27 | NaN |
| 1595894400000000000 | 2020-07-28 | NaN |
| 1595980800000000000 | 2020-07-29 | NaN |
| 1596067200000000000 | 2020-07-30 | NaN |
| 1596153600000000000 | 2020-07-31 | NaN |
| 1596240000000000000 | 2020-08-01 | NaN |


In [11]:
type(fares)

dask.dataframe.core.Series

Our taxi dataset has some unlikely extreme dates on the outer edges, so I'm going to filter by date just to make sure we have reliable data.

In [12]:
# Convert the date bounds to integer nanoseconds, matching the `datenum` index.
start = np.datetime64(datetime.datetime(2019, 1, 1), 'ns').astype('int64')
end = np.datetime64(datetime.datetime(2020, 8, 1), 'ns').astype('int64')

# Keep only rows whose index falls between the two bounds (inclusive).
taxi = taxi.loc[start:end]

## Optional: Fill Missing Dates in Index

If you don't have gaps in your date field, then you can ignore this entirely! But for many time series use cases, you'll have missing time points, and want to fill those in before calculating a rolling aggregation. The way we'll do that here is to create a brand new complete index, using pandas functions, and then we'll merge that with our starting dataset.


Create the index...

In [13]:
idx = pd.Series(pd.date_range('01-01-2019', '08-01-2020'), name='date')

Then make it a Dask Dataframe, so it can be compatible with our existing data.

In [14]:
idx_dd = dd.from_pandas(idx, npartitions=1).to_frame()

Format the date field that we've made just like we formatted our date field on the taxi data, so that we have integers.

In [15]:
idx_dd['datenum'] = idx_dd['date'].astype('datetime64[ns]').astype(int) 

And then set the new `datenum` field as index on our Dask dataframe.

In [16]:
idx_dd = idx_dd.set_index('datenum')

Excellent! So here's our index for merging.

In [17]:
idx_dd.head()

| datenum | date |
|---|---|
| 1546300800000000000 | 2019-01-01 |
| 1546387200000000000 | 2019-01-02 |
| 1546473600000000000 | 2019-01-03 |
| 1546560000000000000 | 2019-01-04 |
| 1546646400000000000 | 2019-01-05 |


### Apply New Index

So let's put it all together! We'll take our fares object, the daily average we calculated above, make it a Dask Dataframe, then merge it with the index Dask Dataframe. Remember, we are merging on index because that makes these tasks much, much faster with Dask.

In [18]:
fares = idx_dd.merge(fares.to_frame(), left_index= True, right_index = True, how="left")
fares = fares.rename(columns={"fare_amount": "daily_avg_fare_amount"})

Here we have it: every day in the desired range is present, with the human-readable date as well as the average fare for the day. Some days have no fares, and we can see that (NaN is shown for some of the tail rows).

In [19]:
fares.tail(20)
fares.head(10)

| datenum | date | daily_avg_fare_amount |
|---|---|---|
| 1546300800000000000 | 2019-01-01 | 13.651428 |
| 1546387200000000000 | 2019-01-02 | 13.35681 |
| 1546473600000000000 | 2019-01-03 | 12.564294 |
| 1546560000000000000 | 2019-01-04 | 12.351668 |
| 1546646400000000000 | 2019-01-05 | 11.422265 |
| 1546732800000000000 | 2019-01-06 | 12.367822 |
| 1546819200000000000 | 2019-01-07 | 12.437283 |
| 1546905600000000000 | 2019-01-08 | 12.257762 |
| 1546992000000000000 | 2019-01-09 | 12.23954 |
| 1547078400000000000 | 2019-01-10 | 12.319 |
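
The gap-filling pattern is easier to see at small scale. The following is a pandas-only sketch with made-up daily averages (one day deliberately missing), assuming the same approach as above: build a complete calendar, then left-merge the observed values onto it by index.

```python
import pandas as pd

# Complete calendar of four days.
days = pd.date_range("2019-01-01", "2019-01-04")
calendar = days.to_frame(name="date")

# Made-up daily averages, with 2019-01-03 missing.
daily = pd.DataFrame(
    {"daily_avg_fare_amount": [13.0, 12.5, 11.0]},
    index=days[[0, 1, 3]],
)

# A left merge keeps every calendar day; days without data become NaN.
filled = calendar.merge(daily, left_index=True, right_index=True, how="left")

print(filled)
```

The missing day is kept as a row with `NaN`, which is what lets the rolling window later count calendar days rather than observed days.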


### Calculate Desired Feature

Now, we are ready to do our rolling average of the new `daily_avg_fare_amount` field. This is when understanding our Dask Dataframe is important.

As a Dask Dataframe, our data is being stored under the surface in multiple pandas dataframes. This means we need to expect that our data is stored in different chunks, and because this next computation is order-specific/stateful, we need to get those chunks synchronized. We already made sure our dates were sorted correctly, now we need to make sure our rolling averages cross the partition breaks correctly.

This next line will take care of that for us! 
* `map_overlap`: "Map a function over blocks of arrays with some overlap" [say the docs](https://docs.dask.org/en/latest/array-overlap.html) for the array version; the Dask Dataframe method applies the same idea to partitions. Our chunks are time based (our index is date!), so this will let us make sure that rolling averages that cross the break between chunks will be lined up right. The arguments `30, 0` ask for 30 rows of overlap before each partition and none after, because a trailing window only looks backward.
* `rolling().mean()`: This is just what you think it is. Once the data is lined up tidily with `map_overlap`, we can calculate the rolling mean at 30 days. The `min_periods` argument is important! Setting it to 1 allows any 30 day segment with at least one non-NaN value to produce a result. This is the equivalent of ignoring NaNs.
* `persist()`: This runs the queued computations on the cluster and keeps the result there, so later steps can reuse it. (Remember the note above!)

In [20]:
%%time
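# Roll only the numeric column; the `date` column is re-attached by the concat in the next cell.
rolling_fares_30d = fares[['daily_avg_fare_amount']].map_overlap(lambda df: df.rolling(30, min_periods=1).mean(), 30, 0).persist()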
rolling_fares_30d = rolling_fares_30d.rename(columns={"daily_avg_fare_amount": "roll_avg_fare_amount"})

CPU times: user 10.8 ms, sys: 239 µs, total: 11 ms
Wall time: 10.5 ms
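
To see why the overlap matters, here is a pandas-only simulation of what happens at a partition boundary. It is a sketch of the idea, not Dask's implementation: the two "partitions" are just slices of a made-up series, and the window is 3 rows instead of 30 to keep it readable.

```python
import numpy as np
import pandas as pd

values = pd.Series(np.arange(10, dtype="float64"))
window = 3

# Reference answer: rolling mean over the whole series at once.
expected = values.rolling(window, min_periods=1).mean()

# Pretend the series is stored as two partitions.
left, right = values.iloc[:5], values.iloc[5:]

# Naive approach: roll each partition on its own.
naive = pd.concat([
    left.rolling(window, min_periods=1).mean(),
    right.rolling(window, min_periods=1).mean(),
])

# Overlap approach: prepend the tail of the left partition to the right one,
# roll, then drop the borrowed rows again.
before = window - 1
padded = pd.concat([left.iloc[-before:], right])
overlapped = pd.concat([
    left.rolling(window, min_periods=1).mean(),
    padded.rolling(window, min_periods=1).mean().iloc[before:],
])

print(naive.loc[5], expected.loc[5], overlapped.loc[5])
```

The naive version restarts its window at the boundary, so the first rows of the second partition are wrong. A trailing window of `n` rows needs at least `n - 1` borrowed rows; borrowing 30 for a 30-row window, as above, is a safe superset.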


In [41]:
%%time
rolling_fares2 = dd.concat([rolling_fares_30d,fares], axis = 1)
rolling_fares2.head(10)

CPU times: user 1.91 ms, sys: 8.48 ms, total: 10.4 ms
Wall time: 42 ms


| datenum | roll_avg_fare_amount | date | daily_avg_fare_amount |
|---|---|---|---|
| 1546300800000000000 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546387200000000000 | 13.504119 | 2019-01-02 | 13.35681 |
| 1546473600000000000 | 13.190844 | 2019-01-03 | 12.564294 |
| 1546560000000000000 | 12.98105 | 2019-01-04 | 12.351668 |
| 1546646400000000000 | 12.669293 | 2019-01-05 | 11.422265 |
| 1546732800000000000 | 12.619048 | 2019-01-06 | 12.367822 |
| 1546819200000000000 | 12.593082 | 2019-01-07 | 12.437283 |
| 1546905600000000000 | 12.551167 | 2019-01-08 | 12.257762 |
| 1546992000000000000 | 12.516541 | 2019-01-09 | 12.23954 |
| 1547078400000000000 | 12.496787 | 2019-01-10 | 12.319 |


Check our math, to make sure we're getting what we expected...

In [42]:
(13.651428 + 13.356810 + 12.564294)/3

13.190843999999998
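
The same check can be done with pandas directly. This sketch feeds the first three daily averages from the table above into the same `rolling(30, min_periods=1).mean()` call, and then uses a made-up series with a gap to show what `min_periods` changes.

```python
import numpy as np
import pandas as pd

# First three daily averages from the table above.
daily = pd.Series([13.651428, 13.356810, 12.564294])
rolled = daily.rolling(30, min_periods=1).mean()
print(rolled.round(6).tolist())

# Made-up series with a missing day.
gappy = pd.Series([10.0, np.nan, 20.0])

# min_periods=1: average whatever non-NaN values are in the window.
lenient = gappy.rolling(30, min_periods=1).mean()

# Default min_periods (the window size): no result until 30 values exist.
strict = gappy.rolling(30).mean()

print(lenient.tolist(), strict.isna().all())
```

With fewer than 30 rows available, the window simply averages what it has, which is why the first rows of the rolling column behave like an expanding mean.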

Spot check in the middle of the data...

In [23]:
rolling_fares2.loc[1571788800000000000:1572566400000000000].compute()

| datenum | roll_avg_fare_amount | date | daily_avg_fare_amount |
|---|---|---|---|
| 1571788800000000000 | 13.70877 | 2019-10-23 | 13.985914 |
| 1571875200000000000 | 13.705452 | 2019-10-24 | 14.315537 |
| 1571961600000000000 | 13.678487 | 2019-10-25 | 13.937396 |
| 1572048000000000000 | 13.629068 | 2019-10-26 | 13.05143 |
| 1572134400000000000 | 13.613382 | 2019-10-27 | 13.776347 |
| 1572220800000000000 | 13.643104 | 2019-10-28 | 13.746779 |
| 1572307200000000000 | 13.638153 | 2019-10-29 | 13.426783 |
| 1572393600000000000 | 13.642512 | 2019-10-30 | 13.396489 |
| 1572480000000000000 | 13.650316 | 2019-10-31 | 13.313771 |
| 1572566400000000000 | 13.639814 | 2019-11-01 | 13.432414 |


## Attach Feature to Original Dataset

Our rolling averages are already in a Dask Dataframe, so we can merge them with the original data on the shared index (`datenum`).

Are we sure the indices are comparable? We can check very easily.

In [33]:
rolling_fares2.index

Dask Index Structure:
npartitions=1
1546300800000000000    int64
1596240000000000000      ...
Name: datenum, dtype: int64
Dask Name: concat-indexed, 20 tasks

In [25]:
taxi.index

Dask Index Structure:
npartitions=126
1546300800000000000    int64
1546387200000000000      ...
                       ...  
1577577600000000000      ...
1596240000000000000      ...
Name: datenum, dtype: int64
Dask Name: loc, 379 tasks

### Finally, time to merge!

The merge itself returns almost instantly because it is lazily evaluated: Dask only records the task, and the work happens when we ask for results.
This creates our new dataset, including all dates and with averages calculated.

In [26]:
taxi_new = taxi.join(rolling_fares2, how='outer', lsuffix = "_day", rsuffix= "_rolled")

In [27]:
type(taxi_new)

dask.dataframe.core.DataFrame

In [28]:
len(taxi_new)

84398118
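
For intuition about what the outer join produces, here is a pandas-only sketch with made-up values and stand-in integer dates. Each trip row picks up the daily feature for its date, and a date that has a feature but no trips still appears as a row of missing trip fields.

```python
import pandas as pd

# Made-up trips: two on day 1, one on day 2, none on day 3.
trips = pd.DataFrame(
    {"fare_amount": [7.0, 13.5, 8.0]},
    index=pd.Index([1, 1, 2], name="datenum"),
)

# Made-up daily feature: one row per day, including day 3.
daily = pd.DataFrame(
    {"roll_avg_fare_amount": [12.0, 11.0, 10.0]},
    index=pd.Index([1, 2, 3], name="datenum"),
)

# Join on the shared index; "outer" keeps days that appear on either side.
joined = trips.join(daily, how="outer")

print(joined)
```

This is why the tail of the joined taxi data can contain rows with a rolling average but `NaN`/`NaT` in every trip column: those are calendar days from the filled index with no recorded trips.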

## Conclusion

And with that, our dataset is ready! 
* All our original fields (not all shown here, for ease of reading)
* Daily average fare
* 30 day rolling average of fare, if at least one fare found in the last 30 days

**Remember**, this has been appended back to the original object, which is too large to hold in memory, so we should not `.compute()` it.

In [37]:
taxi_new[['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime','passenger_count','trip_distance','tip_amount',
          'fare_amount', 'total_amount', 'date_day', 'roll_avg_fare_amount','date_rolled','daily_avg_fare_amount']].head(10)

| datenum | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | tip_amount | fare_amount | total_amount | date_day | roll_avg_fare_amount | date_rolled | daily_avg_fare_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1546300800000000000 | 1.0 | 2019-01-01 16:02:52 | 2019-01-01 16:10:35 | 1.0 | 1.2 | 0.0 | 7.0 | 7.8 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 2.0 | 2019-01-01 16:55:02 | 2019-01-01 17:12:19 | 1.0 | 2.69 | 0.0 | 13.5 | 14.3 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 1.0 | 2019-01-01 16:12:37 | 2019-01-01 16:21:12 | 1.0 | 1.5 | 2.0 | 8.0 | 10.8 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 1.0 | 2019-01-01 16:40:23 | 2019-01-01 17:04:34 | 1.0 | 8.4 | 5.25 | 25.5 | 31.55 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 2.0 | 2019-01-01 16:13:35 | 2019-01-01 16:45:16 | 1.0 | 16.97 | 0.0 | 52.0 | 58.56 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 2.0 | 2019-01-01 16:47:03 | 2019-01-01 17:10:39 | 1.0 | 6.07 | 4.36 | 21.0 | 26.16 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 1.0 | 2019-01-01 16:35:30 | 2019-01-01 16:59:23 | 2.0 | 14.9 | 9.3 | 40.0 | 55.86 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 2.0 | 2019-01-01 16:02:28 | 2019-01-01 16:17:32 | 2.0 | 1.04 | 2.46 | 10.5 | 14.76 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 1.0 | 2019-01-01 16:54:30 | 2019-01-01 17:05:45 | 3.0 | 2.1 | 2.7 | 10.5 | 14.0 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |
| 1546300800000000000 | 1.0 | 2019-01-01 16:28:04 | 2019-01-01 16:51:20 | 3.0 | 6.0 | 0.0 | 22.0 | 22.8 | 2019-01-01 | 13.651428 | 2019-01-01 | 13.651428 |


In [39]:
taxi_new[['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime','passenger_count','trip_distance','tip_amount',
          'fare_amount', 'total_amount', 'date_day', 'roll_avg_fare_amount','date_rolled','daily_avg_fare_amount']].tail(20)

| datenum | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | tip_amount | fare_amount | total_amount | date_day | roll_avg_fare_amount | date_rolled | daily_avg_fare_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1594598400000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 20.504167 | 2020-07-13 | NaN |
| 1594684800000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 21.705 | 2020-07-14 | NaN |
| 1594771200000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 16.38125 | 2020-07-15 | NaN |
| 1594857600000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-16 | NaN |
| 1594944000000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-17 | NaN |
| 1595030400000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-18 | NaN |
| 1595116800000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-19 | NaN |
| 1595203200000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-20 | NaN |
| 1595289600000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-21 | NaN |
| 1595376000000000000 | NaN | NaT | NaT | NaN | NaN | NaN | NaN | NaN | NaN | 18.8 | 2020-07-22 | NaN |
