# Hierarchical aggregation of temporal data at scale using Dask

Hierarchical aggregation allows for efficient computation of mean, SD, min, and max aggregates at varying time resolutions, by resuing previous results and avoiding processing raw data as much as possible. The core idea is to store intermediate ("lossless") aggregates, which can be used to compute the final aggregates (mean, SD, min, and max) at current or coarser time resolution.

## Code framework of hierarchical aggregation

The framework contains utility tools for operating on dataframes (`rename`, `get_vars_from_flat_cols`, and `get_cols`), and three main functions for producing the aggregates `init_agg`, `coarsen_agg`, and `finalize_agg`.

### `init_agg` : raw → lossless
`init_agg` is the only function to apply to raw temporal data. It produces intermediate ("lossless") aggregates at the requested time resolution. In an ideal use case, `init_agg` is called only once and for the finest time resolution possibly required. Its outputs are to be stored and subsequently used to compute the final aggregates, including for any coarser time resolution.

### `coarsen_agg` : lossless → lossless
`coarsen_agg` takes "lossless" aggregates and aggregates them further to some coarser time resolution. For precision, the latter should ideally be a multiple of the time resolution in the input data (e.g. coarsening aggregates at 5-second resolution into 1-minute resolution). For best performance, always choose the coarsest time resolution for the input (e.g. it's faster to obtain 1-minute aggregates from 30-second aggregates than from 5-second aggregates).

### `finalize_agg` : lossless → final
`finalize_agg` takes "lossless" aggregates and produces the final aggregates (mean, SD, min, and max) for the same time resolution. The final aggregates cannot be used for further coarsening.

In [102]:
from functools import partial
from itertools import chain
from copy import copy
import pandas as pd
import dask.dataframe as dd


def rename(df):
    '''
    Rename columns to allow element-wise operations between dataframes of the same size.

    df: dataframe
    '''
    return df.rename(columns={col: i for i, col in enumerate(df.columns)})


def get_vars_from_flat_cols(df, not_vars=[]):
    '''
    Extract variable names as the first part of flat column names.
    
    df: dataframe
    not_vars: list of columns that should not be included as variables
    '''
    return pd.unique(['.'.join(col.split('.')[:-1]) for col in df.columns if col not in not_vars])
    
    
def get_cols(variables, agg_type):
    '''
    Produce column names for a particular aggregate type.
    
    variables: iterable with variable names
    agg_type: aggregate type identifier, e.g. 'count' or 'sum_sq_diff'
    '''
    return [f'{x}.{agg_type}' for x in variables]
    

def init_agg(df_raw, time_res, grouping_cols=[]):
    '''
    "Losslessly" aggregate raw data to given time resolution.
    
    df_raw: time-indexed dataframe to aggregate
    time_res: time resolution (e.g. '15s' or '2min')
    grouping_cols: list of columns to group by
    '''
    # Extract names of variables to aggregate.
    variables = [col for col in df_raw.columns if col not in grouping_cols]
    
    # Produce time column of the given time resolution.
    time_col = df_raw.index.name
    df_raw = df_raw.assign(**{time_col: lambda x: x.index.dt.floor(time_res).values})
    df_raw[time_col] = dd.to_datetime(df_raw[time_col])
    df_raw = df_raw.reset_index(drop=True)
    
    # Convert variable types to avoid overflow when computing variance.
    df_raw[variables] = df_raw[variables].astype('float')
    
    # Produce intermediate aggregates at the given time resolution.
    df_agg = df_raw.groupby(grouping_cols + [time_col]).agg(['count', 'sum', 'var', 'max', 'min'])

    # Flatten column names of the intermediate aggregates.
    cols = partial(get_cols, variables)
    df_agg.columns = chain.from_iterable(
        zip(cols('count'), cols('sum'), cols('sum_sq_diff'), cols('max'), cols('min')))

    # Change variance to "lossless" sum of squared differences from mean.
    sum_sq_diffs = rename(df_agg[cols('sum_sq_diff')])
    counts = rename(df_agg[cols('count')])
    df_agg[cols('sum_sq_diff')] = sum_sq_diffs.fillna(0) * (counts - 1)
    
    return df_agg


def coarsen_agg(df_agg, time_res, time_col, grouping_cols=[]):
    '''
    Coarsen time resolution of "losslessly" aggregated data.
    
    df_agg: "losslessly" aggregated dataframe
    time_res: new time resolution (e.g. '15s' or '2min'), at least as coarse as the current one
    grouping_cols: list of columns to group by
    '''
    # Ensure that time_col and grouping_cols are contained in the columns.
    if sum(1 for var in grouping_cols + [time_col] if var not in df_agg.columns) > 0:
        df_coarse = df_agg.reset_index()
    else:
        df_coarse = copy(df_agg)

    # Coarsen timestamp column to the given time resolution.
    df_coarse[time_col] = df_coarse[time_col].dt.floor(time_res).values
    
    # Produce coarsened mean broadcasted to the original shape.
    cols = partial(get_cols, get_vars_from_flat_cols(df_agg, not_vars=grouping_cols + [time_col]))
    counts = rename(df_coarse[cols('count')])
    sums = rename(df_coarse[cols('sum')])
    df_coarse_grouped = df_coarse.groupby(grouping_cols + [time_col])
    coarse_counts = rename(df_coarse_grouped[cols('count')].transform(
        'sum', meta=df_coarse[cols('count')]._meta))
    coarse_sums = rename(df_coarse_grouped[cols('sum')].transform(
        'sum', meta=df_coarse[cols('sum')]._meta))
    coarse_means = coarse_sums / coarse_counts
    
    # Produce sum of squared differences from coarsened mean.
    sum_sq_diffs = rename(df_coarse[cols('sum_sq_diff')])
    df_coarse[cols('sum_sq_diff')] = sum_sq_diffs + counts * (sums / counts - coarse_means)**2
    
    # Coarsen the "lossless" aggregates.
    cols_to_agg = chain.from_iterable(
        zip(cols('count'), cols('sum'), cols('sum_sq_diff'), cols('min'), cols('max')))
    agg_functions = ['sum', 'sum', 'sum', 'min', 'max'] * len(cols(''))
    df_coarse = df_coarse.groupby(grouping_cols + [time_col]).agg(dict(zip(cols_to_agg, agg_functions)))

    return df_coarse


def finalize_agg(df_agg, time_col, grouping_cols=[]):
    '''
    Compute final aggregates from losslessly aggregated data.
    
    df_agg: "losslessly" aggregated dataframe
    grouping_cols: list of columns to group by
    '''
    # Ensure that time_col and grouping_cols are contained in the columns.
    if sum(1 for var in grouping_cols + [time_col] if var not in df_agg.columns) > 0:
        df_agg = df_agg.reset_index()
    
    # Produce means and standard deviations.
    cols = partial(get_cols, get_vars_from_flat_cols(df_agg, not_vars=grouping_cols + [time_col]))
    counts = rename(df_agg[cols('count')])
    sums = rename(df_agg[cols('sum')])
    sum_sq_diffs = rename(df_agg[cols('sum_sq_diff')])
    means = (sums / counts).rename(columns={old: new for old, new in zip(counts.columns, cols('mean'))})
    stds = ((sum_sq_diffs / (counts - 1))**.5).rename(
        columns={old: new for old, new in zip(counts.columns, cols('std'))})
    
    # Compose final aggregates and reoder columns by variable.
    df_final = dd.concat([df_agg[grouping_cols + [time_col]], means, stds, df_agg[cols('min') + cols('max')]],
                         axis=1, ignore_unknown_divisions=True)
    df_final = df_final[grouping_cols + [time_col] + list(
        chain.from_iterable(zip(cols('mean'), cols('std'), cols('min'), cols('max'))))]
    
    return df_final

## Setting up the cluster

In [54]:
import random
from dask_jobqueue import SLURMCluster
from distributed import Client


# Set up Slurm cluster.
dashboard_port = random.randint(10000,60000)
cluster = SLURMCluster(scheduler_options={"dashboard_address": f":{dashboard_port}"})

# We print out the address you copy into the dask-labextension
print("Dashboard address for the dask-labextension")
print(f"/proxy/{dashboard_port}")

# Create the client object
client = Client(cluster)
client

Dashboard address for the dask-labextension
/proxy/27961


0,1
Client  Scheduler: tcp://10.43.202.87:42901  Dashboard: http://10.43.202.87:27961/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [103]:
cluster.scale(jobs=4)

## Reading raw data

In [113]:
%%time
import os
import fastparquet as fp

# Ensure temporary scratch space for this example.
TMP = f'{os.environ["MEMBERWORK"]}/gen150/.gears/gears/examples'

# Read raw data.
RAW_DATA = '/gpfs/alpine/stf218/proj-shared/stf008stc/openbmc.summit.raw/openbmc-20200429-*.parquet'
df_raw = dd.read_parquet(
    RAW_DATA, engine='fastparquet', index='timestamp',
    columns=['hostname', 'total_power', 'ambient'], gather_statistics=False, chunksize='100MB')

CPU times: user 13.2 s, sys: 1.12 s, total: 14.3 s
Wall time: 2min 17s


## Hierarchical aggregation

### Compute 15-second "lossless" aggregates from raw data

In [114]:
%%time
df_agg = init_agg(df_raw, '15s', grouping_cols=['hostname'])

CPU times: user 69.8 ms, sys: 3.05 ms, total: 72.9 ms
Wall time: 70.8 ms


### Store 15-second "lossless" aggregates to disk

In [115]:
%%time
AGG_15S_LOSSLESS = f'{TMP}/openbmc.summit.15s.lossless'
os.makedirs(AGG_15S_LOSSLESS, exist_ok=True)

# Reset index because Dask won't be able to recover the entire multi-index.
df_agg.reset_index().to_parquet(AGG_15S_LOSSLESS, engine='fastparquet')

CPU times: user 23.9 s, sys: 1.35 s, total: 25.2 s
Wall time: 4min 46s


### Compute 5-minute "lossless" aggregates from the multi-indexed output of `init_agg`

In [116]:
%%time
df_coarse = coarsen_agg(df_agg, '5min', 'timestamp', grouping_cols=['hostname'])

CPU times: user 241 ms, sys: 13 ms, total: 254 ms
Wall time: 250 ms


### ...or from its time-indexed disk version

In [117]:
%%time
df_agg = dd.read_parquet(
    AGG_15S_LOSSLESS, engine='fastparquet', index=False, gather_statistics=False, chunksize='100MB')
df_coarse = coarsen_agg(df_agg, '5min', 'timestamp', grouping_cols=['hostname'])

CPU times: user 129 ms, sys: 6.98 ms, total: 136 ms
Wall time: 257 ms


### Store 5-minute "lossless" aggregates to disk

In [118]:
%%time
AGG_5MIN_LOSSLESS = f'{TMP}/openbmc.summit.15s.lossless'
os.makedirs(AGG_5MIN_LOSSLESS, exist_ok=True)

df_coarse.reset_index().to_parquet(AGG_5MIN_LOSSLESS, engine='fastparquet')

CPU times: user 1.38 s, sys: 101 ms, total: 1.48 s
Wall time: 26.6 s


### Compute final aggregates at 5-minute resolution from the multi-indexed output of `coarse_agg`

In [119]:
%%time
df_final = finalize_agg(df_coarse, 'timestamp', grouping_cols=['hostname'])

CPU times: user 27.7 ms, sys: 1.99 ms, total: 29.7 ms
Wall time: 29 ms


### ...or from its time-indexed disk version

In [120]:
%%time
df_coarse = dd.read_parquet(
    AGG_5MIN_LOSSLESS, engine='fastparquet', index=False, gather_statistics=False, chunksize='100MB')
df_final = finalize_agg(df_coarse, 'timestamp', grouping_cols=['hostname'])

CPU times: user 29.5 ms, sys: 1.02 ms, total: 30.5 ms
Wall time: 143 ms


### Store 5-minute final aggregates to disk

In [121]:
%%time
AGG_5MIN = f'{TMP}/openbmc.summit.5min'
os.makedirs(AGG_5MIN, exist_ok=True)

df_final.reset_index().to_parquet(AGG_5MIN, engine='fastparquet')

CPU times: user 99.3 ms, sys: 5.83 ms, total: 105 ms
Wall time: 1.76 s


### Check the outcome

In [122]:
df_final.head()

Unnamed: 0,hostname,timestamp,total_power.mean,total_power.std,total_power.min,total_power.max,ambient.mean,ambient.std,ambient.min,ambient.max
0,a01n01,2020-04-29 00:00:00,1077.569811,540.773446,566.0,2231.0,22.781132,0.414261,22.0,23.0
1,a01n01,2020-04-29 00:05:00,1018.244275,520.101104,566.0,2229.0,23.0,0.0,23.0,23.0
2,a01n01,2020-04-29 00:10:00,853.309963,378.770956,570.0,2135.0,23.0,0.0,23.0,23.0
3,a01n01,2020-04-29 00:15:00,780.363636,333.05469,568.0,2136.0,22.325758,0.469547,22.0,23.0
4,a01n01,2020-04-29 00:20:00,671.046154,210.677923,557.0,1521.0,22.0,0.0,22.0,22.0


# Cleaning up

In [101]:
cluster.scale(jobs=0)

In [16]:
client.close()
cluster.close()