# IOMax Evaluations



## Setup

In [60]:
import dask
import dask.dataframe as dd
import itertools as it
import numpy as np
import pandas as pd
import pyarrow
from time import time

## Query Definitions

Define indices & combinations

In [10]:
indices = ['file_name', 'proc_name', 'io_cat', 'acc_pat']
combinations = list(it.combinations(indices, r=2))[:5]

### Simple Queries

In [17]:
def base_simple_pandas_queries(log_dir):
    print(f",base_simple_pandas_queries")
    df = pd.read_parquet(log_dir, engine='pyarrow')
    query_index = 0
    for ix in ['file_name', 'proc_name']:
        for col in ['size', 'duration', 'index', 'bandwidth', 'io_cat']: 
            query_index += 1 
            t1 = time()
            res = df.groupby([ix]).agg({col: 'sum'})
            time_elapsed = time() - t1
            print(f"simple,Q{query_index},{time_elapsed},{res.memory_usage(deep=True).sum()}")

def base_simple_dask_queries(log_dir):
    print(f",base_simple_dask_queries")
    query_index = 0
    ddf = dd.read_parquet(f"{log_dir}/*.parquet", engine='pyarrow')
    for ix in ['file_name', 'proc_name']:
        for col in ['size', 'duration', 'index', 'bandwidth', 'io_cat']:
            query_index += 1 
            t1 = time()
            res = ddf.groupby([ix]).agg({col: 'sum'}).compute()
            time_elapsed = time() - t1
            print(f"{time_elapsed},{res.memory_usage(deep=True).sum()}")

def iomax_simple_pandas_queries(log_dir):
    print(f",iomax_simple_pandas_queries")
    query_index = 0
    df = pd.read_parquet(log_dir, engine='pyarrow')
    cols = ['size', 'duration', 'index', 'bandwidth', 'io_cat']
    agg_dict = {col: sum for col in cols}
    for ix in ['file_name', 'proc_name']:
        for col in cols:
            query_index += 1 
            t1 = time()
            m = 0
            if query_index == 1:
                x = df.groupby(['file_name', 'proc_name']).agg(agg_dict)
                m = x.memory_usage(deep=True).sum()
            x.groupby([ix]).agg({col: 'sum'})
            time_elapsed = time() - t1
            print(f"{time_elapsed},{m}")

def iomax_simple_dask_queries(log_dir):
    print(f",iomax_simple_dask_queries")
    query_index = 0
    ddf = dd.read_parquet(f"{log_dir}/*.parquet", engine='pyarrow')
    cols = ['size', 'duration', 'index', 'bandwidth', 'io_cat']
    agg_dict = {col: sum for col in cols}
    for ix in ['file_name', 'proc_name']:
        for col in cols:
            query_index += 1 
            t1 = time()
            m = 0
            if query_index == 1:
                x = ddf.groupby(['file_name', 'proc_name']).agg(agg_dict).compute()
                m = x.memory_usage(deep=True).sum()
            x.groupby([ix]).agg({col: 'sum'})
            time_elapsed = time() - t1
            print(f"{time_elapsed},{m}")

### Medium Queries

In [21]:
def base_medium_pandas_queries(log_dir):
    print(f",base_medium_pandas_queries")
    df = pd.read_parquet(log_dir, engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            res = df.groupby([ix, iy]).agg({col: 'sum'}).groupby([ix]).sum()
            time_elapsed = time() - t1
            print(f"medium,Q{query_index},{time_elapsed},{res.memory_usage(deep=True).sum()}")
            

def base_medium_dask_queries(log_dir):
    print(f",base_medium_dask_queries")
    ddf = dd.read_parquet(f"{log_dir}/*.parquet", engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            res = ddf.groupby([ix, iy]).agg({col: 'sum'}).groupby([ix]).sum().compute()
            time_elapsed = time() - t1
            print(f"{time_elapsed},{res.memory_usage(deep=True).sum()}")

def iomax_medium_pandas_queries(log_dir):
    print(f",iomax_medium_pandas_queries")
    df = pd.read_parquet(log_dir, engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            m = 0
            if query_index == 1:
                x = df.groupby(indices).agg({'size': sum, 'duration': sum})
                m = x.memory_usage(deep=True).sum()
            x.groupby([ix, iy]).agg({col: 'sum'}).groupby([ix]).sum()   
            time_elapsed = time() - t1
            print(f"{time_elapsed},{m}")

def iomax_medium_dask_queries(log_dir):
    print(f",iomax_medium_dask_queries")
    ddf = dd.read_parquet(f"{log_dir}/*.parquet", engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            m = 0
            if query_index == 1:
                x = ddf.groupby(indices).agg({'size': sum, 'duration': sum}).compute()
                m = x.memory_usage(deep=True).sum()
            x.groupby([ix, iy]).agg({col: 'sum'}).groupby([ix]).sum()
            time_elapsed = time() - t1
            print(f"{time_elapsed},{m}")

### Hard Queries

In [24]:
def base_hard_pandas_queries(log_dir):
    print(f",base_hard_pandas_queries")
    df = pd.read_parquet(log_dir, engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1
            t1 = time()
            res = df.groupby([ix]).agg({iy: list, col: sum}).explode(iy).reset_index().groupby([iy]).agg({ix: list, col: sum})
            time_elapsed = time() - t1
            print(f"hard,Q{query_index},{time_elapsed},{res.memory_usage(deep=True).sum()}")

def base_hard_dask_queries(log_dir):
    print(f",base_hard_dask_queries")
    ddf = dd.read_parquet(f"{log_dir}/*.parquet", engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            res = ddf.groupby([ix]).agg({iy: list, col: sum}).explode(iy).reset_index().groupby([iy]).agg({ix: list, col: sum}).compute()
            time_elapsed = time() - t1
            print(f"{time_elapsed},{res.memory_usage(deep=True).sum()}")

def iomax_hard_pandas_queries(log_dir):
    print(f",iomax_hard_pandas_queries")
    df = pd.read_parquet(log_dir, engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            m = 0
            if query_index == 1:
                x = df.groupby(indices).agg({'size': sum, 'duration': sum}).reset_index()
                m = x.memory_usage(deep=True).sum()
            x.groupby([ix]) \
                .agg({iy: list, col: sum}) \
                .reset_index() \
                .explode(iy) \
                .groupby([iy]) \
                .agg({ix: list, col: sum})
            time_elapsed = time() - t1
            print(f"{time_elapsed},{m}")

def iomax_hard_dask_queries(log_dir):
    print(f",iomax_hard_dask_queries")
    ddf = dd.read_parquet(f"{log_dir}/*.parquet", engine='pyarrow')
    query_index = 0
    for ix, iy in combinations:
        for col in ['size', 'duration']:
            query_index += 1 
            t1 = time()
            m = 0
            if query_index == 1:
                x = ddf.groupby(indices).agg({'size': sum, 'duration': sum}).reset_index().compute()
                m = x.memory_usage(deep=True).sum()
            x.groupby([ix]) \
                .agg({iy: list, col: sum}) \
                .reset_index() \
                .explode(iy) \
                .groupby([iy]) \
                .agg({ix: list, col: sum})
            time_elapsed = time() - t1
            print(f"{time_elapsed},{m}")

## Datasets

### Download Datasets

Define datasets

In [64]:
datasets = [
    'dataset-125m',
    'dataset-25m',
    'dataset-5m',
    'dataset-1m',
    'dataset-200k',
]

Create `datasets` folder

In [50]:
!mkdir -p datasets

Download datasets from Zenodo

In [71]:
for ds in datasets:
    !wget https://zenodo.org/record/8393987/files/{ds}.tar.gz?download=1 -O datasets/{ds}.tar.gz

--2023-09-29 19:24:53--  https://zenodo.org/record/8393987/files/dataset-125m.tar.gz?download=1
Resolving zenodo.org (zenodo.org)... 188.185.124.72
Connecting to zenodo.org (zenodo.org)|188.185.124.72|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1574979255 (1.5G) [application/octet-stream]
Saving to: ‘datasets/dataset-125m.tar.gz’

 1% [                                       ] 27,827,194  3.18MB/s  eta 8m 14s ^C
--2023-09-29 19:25:05--  https://zenodo.org/record/8393987/files/dataset-25m.tar.gz?download=1
Resolving zenodo.org (zenodo.org)... 188.185.124.72
Connecting to zenodo.org (zenodo.org)|188.185.124.72|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 314991520 (300M) [application/octet-stream]
Saving to: ‘datasets/dataset-25m.tar.gz’

 5% [=>                                     ] 17,030,140  3.20MB/s  eta 98s    ^C
--2023-09-29 19:25:12--  https://zenodo.org/record/8393987/files/dataset-5m.tar.gz?download=1
Resolving zenodo.or

Extract datasets

In [None]:
for ds in datasets:
    !mkdir -p datasets/{ds}
    !tar -xzvf datasets/{ds}.tar.gz -C datasets/{ds}

## Evaluations

### Simple Queries

#### 200k Records

In [77]:
base_simple_pandas_queries('datasets/dataset-200k')

,base_simple_pandas_queries
simple,Q1,0.05765700340270996,83196
simple,Q2,0.049272775650024414,80676
simple,Q3,0.04894280433654785,83196
simple,Q4,0.04857945442199707,80676
simple,Q5,0.048729658126831055,80676
simple,Q6,0.03307533264160156,4419
simple,Q7,0.032759904861450195,4247
simple,Q8,0.03292679786682129,4419
simple,Q9,0.032509565353393555,4247
simple,Q10,0.032901763916015625,4247


In [78]:
iomax_simple_pandas_queries('datasets/dataset-200k')

,iomax_simple_pandas_queries
0.10136675834655762,103210
0.0011987686157226562,0
0.0014908313751220703,0
0.0012526512145996094,0
0.0014369487762451172,0
0.0013620853424072266,0
0.0012354850769042969,0
0.0013713836669921875,0
0.0012328624725341797,0
0.0013530254364013672,0


In [79]:
base_simple_dask_queries('datasets/dataset-200k')

,base_simple_dask_queries
0.20629167556762695,83196
0.1940934658050537,80676
0.19583749771118164,83196
0.1996769905090332,80676
0.2173151969909668,80676
0.1799612045288086,4419
0.18725967407226562,4247
0.1804027557373047,4419
0.18625211715698242,4247
0.1864008903503418,4247


In [80]:
iomax_simple_dask_queries('datasets/dataset-200k')

,iomax_simple_dask_queries
0.24787020683288574,120834
0.0023605823516845703,0
0.0017986297607421875,0
0.0015556812286376953,0
0.0016965866088867188,0
0.001600027084350586,0
0.00145721435546875,0
0.0016016960144042969,0
0.0014636516571044922,0
0.0015964508056640625,0
