# Speed, Memory, and Disk Comparisons

In this notebook, we'll offer some rough comparisons of the computational performance implications of ESGPT vs. other competing pipelines. We'll focus these comparisons on several metrics:
  1. The time, runtime memory, and final disk space required to construct, pre-process, and store an ESGPT dataset relative to other pipelines, where applicable.
  2. The initialization time, iteration speed, and GPU memory costs for producing batches of data within the ESGPT framework vs. other systems.
  
In particular, we'll compare (or justify why they are inappropriate comparators) against the following pipelines:
  1. TemporAI
  2. OMOP-Learn
  3. FIDDLE
  4. MIMIC-Extract
  
We'll make these comparisons leveraging the synthetic data distributed with ESGPT's sample tutorial, but this code can also be ported to any other dataset to run these profiles locally.

In [1]:
%load_ext memory_profiler

import sys
sys.path.append('..')

In [2]:
import os
import numpy as np

from collections import defaultdict
from datetime import datetime, timedelta
from humanize import naturalsize, naturaldelta
from pathlib import Path
from sparklines import sparklines
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from typing import Callable

from EventStream.data.dataset_polars import Dataset
from EventStream.data.config import PytorchDatasetConfig
from EventStream.data.types import PytorchBatch
from EventStream.data.pytorch_dataset import PytorchDataset

In [3]:
dataset_dir = Path(os.getcwd()) / "processed/sample"

First, let's check and see how much disk space the dataset uses, and in what components

In [4]:
total_dataset_size = sum(f.stat().st_size for f in dataset_dir.glob('**/*') if f.is_file())
DL_reps_size = sum(f.stat().st_size for f in (dataset_dir / "DL_reps").glob('**/*') if f.is_file())
just_dataset_size = total_dataset_size - DL_reps_size

if (dataset_dir / "flat_reps").is_dir():
    flat_reps_size = sum(f.stat().st_size for f in (dataset_dir / "flat_reps").glob('**/*') if f.is_file())
    just_dataset_size -= flat_reps_size
    flat_reps_lines = [f"  * {naturalsize(flat_reps_size)} for the flat representation dataframes."]
else:
    flat_reps_lines = []

lines = [
    f"The total dataset takes up {naturalsize(total_dataset_size)} on disk, which includes:",
    f"  * {naturalsize(just_dataset_size)} for the core dataset.",
    f"  * {naturalsize(DL_reps_size)} for the deep-learning representation dataframes.",
] + flat_reps_lines

print('\n'.join(lines))

The total dataset takes up 164.5 MB on disk, which includes:
  * 19.5 MB for the core dataset.
  * 11.7 MB for the deep-learning representation dataframes.
  * 133.2 MB for the flat representation dataframes.


First, we'll note that loading a dataset doesn't require much of either resource. This is because the data is loaded lazily, so complex dataframe elements aren't loaded until they are needed. 

In [5]:
%%time
%%memit

ESD = Dataset.load(dataset_dir)

peak memory: 347.95 MiB, increment: 1.88 MiB
CPU times: user 126 ms, sys: 22.8 ms, total: 149 ms
Wall time: 259 ms


In [6]:
%%time
%%memit

s_df = ESD.subjects_df
e_df = ESD.events_df
dm_df = ESD.dynamic_measurements_df

Loading subjects from /home/mmd/Projects/EventStreamGPT/sample_data/processed/sample/subjects_df.parquet...
Loading events from /home/mmd/Projects/EventStreamGPT/sample_data/processed/sample/events_df.parquet...
Loading dynamic_measurements from /home/mmd/Projects/EventStreamGPT/sample_data/processed/sample/dynamic_measurements_df.parquet...
peak memory: 507.10 MiB, increment: 158.85 MiB
CPU times: user 376 ms, sys: 133 ms, total: 509 ms
Wall time: 339 ms


## Pytorch Dataset Stats
Now let's load a pytorch dataset and examine iteration speed and GPU memory cost:

In [7]:
def summarize(arr: list[float], strify: Callable[float, str] = naturalsize) -> str:
    mean, std, mn, mx = np.mean(arr), np.std(arr), np.min(arr), np.max(arr)
    simple_summ = f"{strify(mean)} ± {strify(std)} ({strify(mn)}-{strify(mx)})"
    
    if len(arr) < 25: return simple_summ
    
    hist_vals, hist_bins = np.histogram(arr)
    lines = [simple_summ, "Histogram:"]
    sparkline = sparklines(hist_vals)
    
    lines.extend(sparkline)
    left_end = strify(hist_bins[0])
    right_end = strify(hist_bins[1])
    W = len(sparkline[0]) - len(left_end) - len(right_end)
    
    if W > 0:
        lines.append(f"{left_end}{'-'*W}{right_end}")
    else:
        lines.append(f"o {left_end} (left endpoint)")
        lines.append(f"{'-'*(len(sparkline[0])-1)}o {right_end} (right endpoint)")
    return '\n'.join(lines)

def summarize_times(arr: list[float, timedelta]):
    as_seconds = [x / timedelta(seconds=1) for x in arr]
    return summarize(as_seconds, strify=lambda x: str(timedelta(seconds=x)))

In [8]:
%%time
%%memit
pyd_config = PytorchDatasetConfig(
    save_dir=ESD.config.save_dir,
    max_seq_len=32,
)
pyd = PytorchDataset(config=pyd_config, split='train')

peak memory: 836.55 MiB, increment: 329.29 MiB
CPU times: user 2.12 s, sys: 191 ms, total: 2.31 s
Wall time: 2.1 s


In [9]:
%%time
%%memit

batch_size = 16
n_iter_samples = 30

dataloader = DataLoader(pyd, collate_fn=pyd.collate, batch_size=batch_size)

batch_sizes = defaultdict(list)
total_sizes = []
for batch in tqdm(dataloader, leave=False):
    total_size = 0
    for k, v in batch.items():
        if v is None: continue
        el_size = v.element_size() * v.nelement()
        batch_sizes[k].append(el_size)
        total_size += el_size
    total_sizes.append(total_size)

batch_iteration_times = []
for samp in tqdm(list(range(n_iter_samples)), leave=False, desc="Sampling Dataloader Iteration Speed"):
    dataloader = DataLoader(pyd, collate_fn=pyd.collate, batch_size=batch_size, shuffle=True)
    st = datetime.now()
    for batch in tqdm(dataloader, leave=False, desc="Sampling Batch"):
        pass
    batch_iteration_times.append((datetime.now() - st) / len(dataloader))
    
print(
    f"Iterating through an entire dataloader of {len(dataloader)} batches of size {batch_size} "
    f"took the following time per batch:\n{summarize_times(batch_iteration_times)}\n\n"
    f"Total batch size:\n{summarize(total_sizes)}"
)
for k, v in batch_sizes.items():
    print(f"  Size of {k}:\n    {summarize(v)}")

  0%|          | 0/5 [00:00<?, ?it/s]

Sampling Dataloader Iteration Speed:   0%|          | 0/30 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Sampling Batch:   0%|          | 0/5 [00:00<?, ?it/s]

Iterating through an entire dataloader of 5 batches of size 16 took the following time per batch:
0:00:00.029526 ± 0:00:00.002528 (0:00:00.026819-0:00:00.037488)
Histogram:
█▄▄▄▂▃▁▂▁▂
o 0:00:00.026819 (left endpoint)
---------o 0:00:00.027886 (right endpoint)

Total batch size:
200.7 kB ± 40.6 kB (142.6 kB-250.1 kB)
  Size of event_mask:
    512 Bytes ± 0 Bytes (512 Bytes-512 Bytes)
  Size of time_delta:
    2.0 kB ± 0 Bytes (2.0 kB-2.0 kB)
  Size of static_indices:
    128 Bytes ± 0 Bytes (128 Bytes-128 Bytes)
  Size of static_measurement_indices:
    128 Bytes ± 0 Bytes (128 Bytes-128 Bytes)
  Size of dynamic_indices:
    75.4 kB ± 15.5 kB (53.2 kB-94.2 kB)
  Size of dynamic_measurement_indices:
    75.4 kB ± 15.5 kB (53.2 kB-94.2 kB)
  Size of dynamic_values:
    37.7 kB ± 7.7 kB (26.6 kB-47.1 kB)
  Size of dynamic_values_mask:
    9.4 kB ± 1.9 kB (6.7 kB-11.8 kB)
peak memory: 810.73 MiB, increment: 12.79 MiB
CPU times: user 5.73 s, sys: 221 ms, total: 5.95 s
Wall time: 5.05 s


## Other Pipelines
### TemporAI Format

In [20]:
import pandas as pd
import polars as pl
import polars.selectors as cs

In [21]:
def ESD_to_temporai(ESD: Dataset) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Converts an ESD data format into a TemporAI dataset format."""

    static_df = (
        ESD.subjects_df
        .select(
            'subject_id',
            *[pl.col(c) for c, cfg in ESD.measurement_configs.items() if cfg.temporality == 'static']
        )
        .to_pandas()
        .set_index("subject_id")
    )
    
    # For the time-series dataframe, as they need only one row per subject ID, timestamp, we need to use the wide
    # format of the flat representation. 
    
    flat_reps_dir = ESD.config.save_dir / "flat_reps" / "raw"
    if not flat_reps_dir.is_dir():
        raise FileNotFoundError(f"Must have pre-cached flat representations at {flat_reps_dir}!")
        
    time_series_df = (
        pl.scan_parquet(flat_reps_dir / "*" / "*.parquet")
        .select("subject_id", "timestamp", cs.starts_with("dynamic"))
        .collect()
        .to_pandas()
        .set_index(["subject_id", "timestamp"])
    )
    
    return static_df, time_series_df

In [24]:
%%time
%%memit
# We need to convert to a flat format prior to getting temporai representations.
# The performance #s here are not reliable as these files may be already generated.
ESD.cache_flat_representation(
    subjects_per_output_file=None,
    feature_inclusion_frequency=None,
    do_overwrite=False,
    do_update=True,
)

Flattening Splits:   0%|          | 0/3 [00:00<?, ?it/s]

Subject chunks:   0%|          | 0/1 [00:00<?, ?it/s]

Subject chunks:   0%|          | 0/1 [00:00<?, ?it/s]

Subject chunks:   0%|          | 0/1 [00:00<?, ?it/s]

peak memory: 1972.99 MiB, increment: 0.10 MiB
CPU times: user 319 ms, sys: 75.8 ms, total: 395 ms
Wall time: 529 ms


In [22]:
%%time
%%memit

temporai_static, temporai_ts = ESD_to_temporai(ESD)

peak memory: 2439.01 MiB, increment: 963.00 MiB
CPU times: user 1.46 s, sys: 912 ms, total: 2.37 s
Wall time: 1.16 s


In [26]:
print(
    f"TemporAI uses two dataframes, a static dataframe of shape {temporai_static.shape} "
    f"and a time series dataframe of shape {temporai_ts.shape}."
)

TemporAI uses two dataframes, a static dataframe of shape (100, 1) and a time series dataframe of shape (530742, 160).


Let's save these dataframes to disk, so we can inspect their disk cost and the memory cost to re-load them from scratch.

In [28]:
save_dir = Path("./speed_comparisons/temporai/compressed")
save_dir.mkdir(parents=True, exist_ok=True)

temporai_static.to_parquet(save_dir / "static.parquet")
temporai_ts.to_parquet(save_dir / "ts.parquet")

uncompressed_save_dir = Path("./speed_comparisons/temporai/uncompressed")
uncompressed_save_dir.mkdir(parents=True, exist_ok=True)

temporai_static.to_parquet(uncompressed_save_dir / "static.parquet", compression=None)
temporai_ts.to_parquet(uncompressed_save_dir / "ts.parquet", compression=None)

compressed_temporai_size = sum(f.stat().st_size for f in save_dir.glob('**/*') if f.is_file())
uncompressed_temporai_size = sum(f.stat().st_size for f in uncompressed_save_dir.glob('**/*') if f.is_file())

print(
    f"The compressed data takes up {naturalsize(compressed_temporai_size)} on disk.\n"
    f"The uncompressed data takes up {naturalsize(uncompressed_temporai_size)} on disk "
    "(this is a good approximation of memory cost as it is uncompressed)."
)

The compressed data takes up 23.9 MB on disk.
The uncompressed data takes up 26.0 MB on disk (this is a good approximation of memory cost as it is uncompressed).


In [29]:
%%time
%%memit

temporai_static = pd.read_parquet(save_dir / "static.parquet")
temporai_ts = pd.read_parquet(save_dir / "ts.parquet")

peak memory: 2948.04 MiB, increment: 929.04 MiB
CPU times: user 1.11 s, sys: 1.06 s, total: 2.17 s
Wall time: 1.08 s
