In [None]:
# Bootstrap the Kedro project for this notebook. Per the log output of this cell,
# `register_kedro()` defines the globals `context`, `session`, `catalog` and
# `pipelines` (used by later cells) and registers the `%run_viz` line magic.
from pasteur.kedro.ipython import * # type: ignore
register_kedro() # type: ignore

# Automatically reload edited project modules before each cell executes.
%load_ext autoreload
%autoreload 2

[0m[34mINFO    [0m Kedro project Pasteur                                                                                           [2m__init__.py[0m[2m:[0m[2m77[0m
[0m[34mINFO    [0m Defined global variable [32m'context'[0m, [32m'session'[0m, [32m'catalog'[0m and [32m'pipelines'[0m                                         [2m__init__.py[0m[2m:[0m[2m78[0m
[0m[34mINFO    [0m Registered line magic [32m'run_viz'[0m                                                                                 [2m__init__.py[0m[2m:[0m[2m84[0m


In [None]:
from pasteur.hierarchy import rebalance_attributes
from pasteur.marginal import MarginalOracle

# Attributes of the `idx` table taken from the loaded "trn" table object.
old_attrs = catalog.load("mimic_billion.trn.table")["idx"].get_attributes()
# Working `idx` table; the benchmark cells below slice/partition this dataset.
wrk = catalog.load("mimic_billion.wrk.idx_table")

# Count marginals over the full table with the original attributes.
# `close()` runs in `finally` so the oracle is released even if counting fails.
m = MarginalOracle(old_attrs, wrk)
try:
    counts = m.get_counts()
finally:
    m.close()

# Rebalance the attribute hierarchies from the observed counts.
# NOTE(review): `fixed` and `u` are rebalance_attributes hyperparameters whose
# exact meaning is defined in pasteur.hierarchy — TODO document them there.
attrs = rebalance_attributes(counts, old_attrs, fixed=[2, 4, 8, 16, 32, 48], u=4)

[0m[34mINFO    [0m Processed [1;36m16[0m marginals.                                                                                          [2moracle.py[0m[2m:[0m[2m603[0m


In [None]:
from pasteur.marginal import AttrSelector
from pasteur.marginal.oracle import parallel_load
from pasteur.marginal.memory import map_to_memory, load_from_memory
from pasteur.utils import LazyDataset, LazyPartition
from pasteur.utils.progress import init_pool
import time

In [None]:
# Marginal request used by every benchmark: one AttrSelector per attribute,
# keyed by the attribute name (which is also AttrSelector's first argument).
# NOTE(review): the positional `0` and the per-column integers are presumably
# hierarchy levels understood by AttrSelector — TODO confirm.
mar = {
    attr_name: AttrSelector(attr_name, 0, column_levels)
    for attr_name, column_levels in (
        ("gender", {"gender": 0}),
        ("warning", {"warning": 0}),
        ("intime", {"intime_day": 1}),
        ("outtime", {"outtime_day": 1}),
        ("charttime", {"charttime_day": 0, "charttime_time": 2}),
    )
}

In [None]:
from functools import partial

sample = wrk.sample()


def get_small_dataset(n):
    """Return a single-partition LazyDataset over the first ``n`` rows of ``sample``.

    NOTE(review): if ``sample`` holds fewer than ``n`` rows the slice is silently
    shorter — TODO confirm ``wrk.sample()`` is large enough for the 10M case.
    """

    def load_head():
        return sample[:n]

    def head_shape():
        return load_head().shape

    return LazyDataset(merged_load=LazyPartition(load_head, head_shape))


def get_big_dataset(n):
    """Return a LazyDataset made of the first ``n`` partitions of ``wrk``.

    Relies on the private ``wrk._partitions`` mapping; no merged loader is set.
    """
    first_partitions = {
        part_name: part for part_name, part in list(wrk._partitions.items())[:n]
    }
    return LazyDataset(merged_load=None, partitions=first_partitions)


In [None]:
import pasteur.utils.progress
# Set pasteur's Jupyter progress nesting limit to 0 for the benchmarks.
# NOTE(review): presumably this suppresses nested progress bars — TODO confirm.
setattr(pasteur.utils.progress, "JUPYTER_MAX_NEST", 0)

In [None]:
# NOTE(review): `M_base` is not referenced by any visible cell — TODO remove if unused.
M_base = 6e11

# Benchmark cases as (N rows, dataset, marginals for multi-worker runs,
# marginals for the single-core run). Small cases slice the in-memory sample,
# mid-size cases take the first k partitions of `wrk`, the last uses all of `wrk`.
tests = [
    (n_rows, get_small_dataset(n_rows), m_multi, m_single)
    for n_rows, m_multi, m_single in (
        (20_000, 1_000_000, 100_000),
        (100_000, 1_000_000, 100_000),
        (500_000, 1_000_000, 50_000),
        (1_000_000, 500_000, 25_000),
        (5_000_000, 100_000, 5_000),
        (10_000_000, 50_000, 2_500),
    )
]
tests += [
    (n_rows, get_big_dataset(n_parts), m_multi, m_single)
    for n_rows, n_parts, m_multi, m_single in (
        (50_000_000, 3, 25_000, 2_000),
        (100_000_000, 6, 15_000, 1_000),
        (500_000_000, 31, 4_000, 200),
    )
]
tests.append((1_000_000_000, wrk, 1_500, 35))

In [None]:
# Sanity check: 3 partitions should hold roughly the 50M rows labelled in `tests`.
get_big_dataset(n=3).shape

[1m([0m[1;36m50059779[0m, [1;36m16[0m[1m)[0m


In [None]:
# Single-core baseline: data is loaded with the default pool, then marginals are
# processed with a one-worker pool; throughput is reported as marginals/second.
# IS_SUBPROCESS is forced on for the whole run and reset in `finally`, so an
# exception or KeyboardInterrupt no longer leaves the global flag set to True.
pasteur.utils.progress.IS_SUBPROCESS = True
try:
    print("> Single Core")
    for N, ds, _, M in tests:
        with MarginalOracle(attrs, ds, mode="inmemory_shared", max_worker_mult=1, log=False) as m:
            with init_pool():
                m.load_data()
            # Same request repeated M times (list of references to one dict).
            reqs = [mar] * M

            # Only the processing step is timed, on a single worker.
            with init_pool(max_workers=1):
                start = time.perf_counter()
                m.process(reqs, desc=f"N={N:,}")
                end = time.perf_counter()

            print(f"N={N: 15,d}: {(M / (end - start)):>10.3f} m/s ({end - start:>6.1f}s)")
finally:
    pasteur.utils.progress.IS_SUBPROCESS = False


> Single Core
N=         20,000:  16813.807 m/s (   5.9s)
N=        100,000:   7136.520 m/s (  14.0s)
N=        500,000:   1607.646 m/s (  31.1s)
N=      1,000,000:    754.892 m/s (  33.1s)
N=      5,000,000:    160.452 m/s (  31.2s)
N=     10,000,000:     81.071 m/s (  30.8s)
N=     50,000,000:     16.073 m/s ( 124.4s)
N=    100,000,000:      8.034 m/s ( 124.5s)
N=    500,000,000:      1.601 m/s ( 124.9s)
N=  1,000,000,000:      0.832 m/s (  42.0s)


In [None]:
print("> Shared Memory")
# Multi-worker runs in "inmemory_shared" mode over every benchmark size, using
# the multi-worker marginal count (third tuple field) from `tests`.
for N, ds, M, _ in tests:
    with init_pool():
        with MarginalOracle(attrs, ds, mode="inmemory_shared", max_worker_mult=1, log=False) as m:
            m.load_data()
            reqs = [mar] * M

            # Time only the processing of the M requests.
            start = time.perf_counter()
            m.process(reqs, desc=f"N={N:,}")
            end = time.perf_counter()

            print(f"N={N: 15,d}: {(M / (end - start)):>10.3f} m/s ({end - start:>6.1f}s)")


> Shared Memory
N=         20,000:  26589.471 m/s (  37.6s)
N=        100,000:  22948.376 m/s (  43.6s)
N=        500,000:  14391.234 m/s (  69.5s)
N=      1,000,000:   8983.527 m/s (  55.7s)
N=      5,000,000:    935.436 m/s ( 106.9s)
N=     10,000,000:    413.813 m/s ( 120.8s)
N=     50,000,000:    122.456 m/s ( 204.2s)
N=    100,000,000:     98.648 m/s ( 152.1s)
N=    500,000,000:     18.605 m/s ( 215.0s)
N=  1,000,000,000:     10.128 m/s ( 197.5s)


In [None]:
print("> Unique Copy")
# "inmemory_copy" is only benchmarked up to the 50M case (3 partitions hold
# 50,059,779 rows, just under the 52M cutoff). NOTE(review): larger sizes are
# skipped, presumably because per-worker copies would not fit in RAM — TODO confirm.
for N, ds, M, _ in (case for case in tests if case[0] <= 52_000_000):
    with init_pool():
        with MarginalOracle(attrs, ds, mode="inmemory_copy", max_worker_mult=1, log=False) as m:
            m.load_data()
            reqs = [mar] * M

            # Time only the processing of the M requests.
            start = time.perf_counter()
            m.process(reqs, desc=f"N={N:,}")
            end = time.perf_counter()

            print(f"N={N: 15,d}: {(M / (end - start)):>10.3f} m/s ({end - start:>6.1f}s)")


> Unique Copy
N=         20,000:  26614.486 m/s (  37.6s)
N=        100,000:  23344.753 m/s (  42.8s)
N=        500,000:  13172.459 m/s (  75.9s)
N=      1,000,000:   8897.922 m/s (  56.2s)
N=      5,000,000:   2325.286 m/s (  43.0s)
N=     10,000,000:   1167.204 m/s (  42.8s)
N=     50,000,000:    222.590 m/s ( 112.3s)


In [None]:
print("> Unique Partition")
for N, ds, M, _ in tests:
    # "inmemory_batched" (re-split into 32 partitions) is only benchmarked
    # for the cases with at least 10M rows.
    if N < 10_000_000:
        continue
    # Cap the number of marginal requests at 10,000 for this mode.
    M = min(M, 10000)

    with init_pool():
        with MarginalOracle(attrs, ds, mode="inmemory_batched", max_worker_mult=1, log=False, repartitions=32) as m:
            m.load_data()
            reqs = [mar] * M

            # Time only the processing of the M requests.
            start = time.perf_counter()
            m.process(reqs, desc=f"N={N:,}")
            end = time.perf_counter()

            print(f"N={N: 15,d}: {(M / (end - start)):>10.3f} m/s ({end - start:>6.1f}s)")


> Unique Partition
N=     10,000,000:    469.820 m/s (  21.3s)
N=     50,000,000:    179.130 m/s (  55.8s)
N=    100,000,000:    103.713 m/s (  96.4s)
N=    500,000,000:     25.505 m/s ( 156.8s)
N=  1,000,000,000:     12.394 m/s ( 121.0s)
