# Example: Dask Pipeline with Caching

## Setup

In [1]:
import time

import apipe
import dask
import numpy as np
import pandas as pd
from dask import delayed
from loguru import logger

## Basic dataflow 

In [2]:
# --- Define a pipeline (no computation is done)
@apipe.delayed_cached()
def load_1():
    """Build the first demo dataset.

    Pauses for 2 seconds (presumably to mimic a slow data source), then
    returns a hard-coded frame with float columns ``a`` and ``b`` and three
    rows; column ``b`` contains one NaN. The record count is logged.

    Returns:
        pandas.DataFrame: the fixed 3-row frame.
    """
    time.sleep(2)
    records = pd.DataFrame(
        {
            'a': [1., 2., 2.],
            'b': [0.1, np.nan, 0.2],
        }
    )
    logger.info(f'Loaded {len(records)} records')
    return records

@apipe.delayed_cached()
def load_2(timestamp):
    """Build the second demo dataset for a given point in time.

    The timestamp is only validated (it must be later than 2000-01-01); the
    returned data is hard-coded and does not depend on it. Pauses for
    2 seconds before building the frame and logs the record count.

    Args:
        timestamp: value comparable with ``pandas.Timestamp``.

    Returns:
        pandas.DataFrame: fixed 3-row frame with float columns ``a`` and ``b``.

    Raises:
        AssertionError: if ``timestamp`` is not after 2000-01-01.
    """
    earliest_allowed = pd.Timestamp("2000-01-01")
    assert timestamp > earliest_allowed
    time.sleep(2)
    records = pd.DataFrame(
        {
            'a': [0.9, 3., 3.],
            'b': [0.001, 1., 0.43],
        }
    )
    logger.info(f'Loaded {len(records)} records')
    return records

@apipe.delayed_cached()
def compute_diff(x, y, eps):
    """Element-wise relative difference ``|x - y| / (|y| + eps)``.

    Pauses for 2 seconds first, then checks that both inputs have the same
    shape before computing the ratio.

    Args:
        x: first operand (a DataFrame in this notebook).
        y: second operand, used as the reference in the denominator.
        eps: constant added to ``|y|`` in the denominator.

    Returns:
        The relative difference, same shape as the inputs.

    Raises:
        AssertionError: if ``x.shape != y.shape``.
    """
    time.sleep(2)
    assert x.shape == y.shape
    abs_error = (x - y).abs()
    scale = y.abs() + eps
    relative = abs_error / scale
    logger.info('Difference is computed')
    return relative

@apipe.delayed_cached(nout=2)
def compute_stats(diff):
    """Reduce a difference frame to two scalar summaries.

    Args:
        diff: frame of differences (output of ``compute_diff``).

    Returns:
        tuple: ``(diff_mean, diff_std)`` where ``diff_mean`` is the mean of
        the per-column means and ``diff_std`` is the standard deviation of
        the per-column standard deviations.
    """
    # NOTE(review): the second value is the spread *of the column stds*, not
    # an overall std of all values -- confirm this is the intended statistic.
    per_column_mean = diff.mean()
    per_column_std = diff.std()
    diff_mean = per_column_mean.mean()
    diff_std = per_column_std.std()
    logger.info('Stats are computed')
    return diff_mean, diff_std
    
# Pipeline inputs: the timestamp handed to load_2 and the denominator
# offset handed to compute_diff.
eps = 0.01
ts = pd.Timestamp("2019-01-01")

# Wire the tasks together. Per the cell header, nothing is computed here:
# the results are evaluated later by apipe.delayed_compute.
s1, s2 = load_1(), load_2(ts)
diff = compute_diff(s1, s2, eps)
diff_mean, diff_std = compute_stats(diff)

In [3]:
%%time
# --- Compute #1: all computations are run and data is saved to cache
# The cache is empty at this point, so every task executes and the wall time
# includes the sleeps inside the tasks.
_mean, _std = apipe.delayed_compute((diff_mean, diff_std))
print(f'diff: mean={_mean:.2f} std={_std:.2f}')

2021-12-17 16:36:25.957 | DEBUG    | apipe._cached:new_foo:359 - Cache not found: Meta file is missing: /home/nameless/dev/proj/ds-examples/dataflows/cache/load_1.meta
2021-12-17 16:36:25.958 | DEBUG    | apipe._cached:new_foo:359 - Cache not found: Meta file is missing: /home/nameless/dev/proj/ds-examples/dataflows/cache/load_2_2019-01-01 00:00:00.meta
2021-12-17 16:36:27.962 | INFO     | __main__:load_2:14 - Loaded 3 records
2021-12-17 16:36:27.964 | INFO     | __main__:load_1:6 - Loaded 3 records
2021-12-17 16:36:27.966 | DEBUG    | apipe._cached:dump:244 - Task load_2_2019-01-01 00:00:00: data has been saved to cache
2021-12-17 16:36:27.967 | DEBUG    | apipe._cached:dump:244 - Task load_1: data has been saved to cache
2021-12-17 16:36:27.968 | INFO     | apipe._cached:new_foo:391 - Task load_2_2019-01-01 00:00:00: data has been computed and saved to cache
2021-12-17 16:36:27.969 | INFO     | apipe._cached:new_foo:391 - Task load_1: data has been computed and saved to cache
2021-12

diff: mean=2.51 std=4.15
CPU times: user 21.6 ms, sys: 16.9 ms, total: 38.5 ms
Wall time: 4.03 s


In [4]:
%%time
# --- Compute #2: Second run: all data is loaded from cache
# Every task is skipped because its cache entry already exists; as the log
# shows, only the final compute_stats result is actually read from the cache.
_mean, _std = apipe.delayed_compute((diff_mean, diff_std))
print(f'diff: mean={_mean:.2f} std={_std:.2f}')

2021-12-17 16:36:29.998 | INFO     | apipe._cached:new_foo:372 - Task load_1: skip (cache exists)
2021-12-17 16:36:29.999 | INFO     | apipe._cached:new_foo:372 - Task load_2_2019-01-01 00:00:00: skip (cache exists)
2021-12-17 16:36:30.002 | INFO     | apipe._cached:new_foo:372 - Task compute_diff_11866620178438019178_9252423425142403179_0.01: skip (cache exists)
2021-12-17 16:36:30.004 | INFO     | apipe._cached:new_foo:372 - Task compute_stats_15800421005260689483: skip (cache exists)
2021-12-17 16:36:30.006 | DEBUG    | apipe._cached:load:228 - Task compute_stats_15800421005260689483: data has been loaded from cache


diff: mean=2.51 std=4.15
CPU times: user 14.8 ms, sys: 0 ns, total: 14.8 ms
Wall time: 11.7 ms


## Dataflow with parameters

In [5]:
# --- Define a pipeline (no computation is done)
# Parameters are registered by name on a shared container; the returned
# objects are passed to the tasks below in place of plain values.
params = apipe.DelayedParameters()
eps = params.create("eps", 0.1)  # denominator offset used by compute_diff
ts = params.create("ts", pd.Timestamp("2020-10-23"))  # timestamp passed to load_2

@apipe.delayed_cached()
def load_1():
    """Build the first demo dataset.

    Pauses for 2 seconds (presumably to mimic a slow data source), then
    returns a hard-coded frame with float columns ``a`` and ``b`` and three
    rows; column ``b`` contains one NaN. The record count is logged.

    Returns:
        pandas.DataFrame: the fixed 3-row frame.
    """
    time.sleep(2)
    records = pd.DataFrame(
        {
            'a': [1., 2., 2.],
            'b': [0.1, np.nan, 0.2],
        }
    )
    logger.info(f'Loaded {len(records)} records')
    return records

@apipe.delayed_cached()
def load_2(timestamp):
    """Build the second demo dataset for a given point in time.

    The timestamp is only validated (it must be later than 2000-01-01); the
    returned data is hard-coded and does not depend on it. Pauses for
    2 seconds before building the frame and logs the record count.

    Args:
        timestamp: value comparable with ``pandas.Timestamp``.

    Returns:
        pandas.DataFrame: fixed 3-row frame with float columns ``a`` and ``b``.

    Raises:
        AssertionError: if ``timestamp`` is not after 2000-01-01.
    """
    earliest_allowed = pd.Timestamp("2000-01-01")
    assert timestamp > earliest_allowed
    time.sleep(2)
    records = pd.DataFrame(
        {
            'a': [0.9, 3., 3.],
            'b': [0.001, 1., 0.43],
        }
    )
    logger.info(f'Loaded {len(records)} records')
    return records

@apipe.delayed_cached()
def compute_diff(x, y, eps):
    """Element-wise relative difference ``|x - y| / (|y| + eps)``.

    Pauses for 2 seconds first, then checks that both inputs have the same
    shape before computing the ratio.

    Args:
        x: first operand (a DataFrame in this notebook).
        y: second operand, used as the reference in the denominator.
        eps: constant added to ``|y|`` in the denominator.

    Returns:
        The relative difference, same shape as the inputs.

    Raises:
        AssertionError: if ``x.shape != y.shape``.
    """
    time.sleep(2)
    assert x.shape == y.shape
    abs_error = (x - y).abs()
    scale = y.abs() + eps
    relative = abs_error / scale
    logger.info('Difference is computed')
    return relative

@apipe.delayed_cached(nout=2)
def compute_stats(diff):
    """Reduce a difference frame to two scalar summaries.

    Args:
        diff: frame of differences (output of ``compute_diff``).

    Returns:
        tuple: ``(diff_mean, diff_std)`` where ``diff_mean`` is the mean of
        the per-column means and ``diff_std`` is the standard deviation of
        the per-column standard deviations.
    """
    # NOTE(review): the second value is the spread *of the column stds*, not
    # an overall std of all values -- confirm this is the intended statistic.
    per_column_mean = diff.mean()
    per_column_std = diff.std()
    diff_mean = per_column_mean.mean()
    diff_std = per_column_std.std()
    logger.info('Stats are computed')
    return diff_mean, diff_std
    
# Wire the tasks together; `ts` and `eps` are the delayed parameters created
# at the top of this cell, passed where the basic example used plain values.
s1, s2 = load_1(), load_2(ts)
diff = compute_diff(s1, s2, eps)
diff_mean, diff_std = compute_stats(diff)


# (Alternative way to define parameters)
# Standalone parameter objects instead of a DelayedParameters container;
# note that they are *called* when handed to a task.
# eps = apipe.DelayedParameter("eps", 0.1)
# ts = apipe.DelayedParameter("ts", pd.Timestamp("2020-10-23"))
# ...
# s2 = load_2(ts())
# diff = compute_diff(s1, s2, eps())

In [6]:
%%time
# --- Compute #1: first run with the new parameter values
# Not everything is recomputed: load_1 takes no parameters and is reused from
# the cache if the "Basic dataflow" section has already been run (the log
# shows it skipped). load_2 and compute_diff miss the cache because `ts` and
# `eps` changed, so they are run and their results are saved to cache.
_mean, _std = apipe.delayed_compute((diff_mean, diff_std))
print(f'diff: mean={_mean:.2f} std={_std:.2f}')

2021-12-17 16:36:30.029 | DEBUG    | apipe._cached:new_foo:359 - Cache not found: Meta file is missing: /home/nameless/dev/proj/ds-examples/dataflows/cache/load_2_2020-10-23 00:00:00.meta
2021-12-17 16:36:30.030 | INFO     | apipe._cached:new_foo:372 - Task load_1: skip (cache exists)
2021-12-17 16:36:32.033 | INFO     | __main__:load_2:18 - Loaded 3 records
2021-12-17 16:36:32.038 | DEBUG    | apipe._cached:dump:244 - Task load_2_2020-10-23 00:00:00: data has been saved to cache
2021-12-17 16:36:32.042 | INFO     | apipe._cached:new_foo:391 - Task load_2_2020-10-23 00:00:00: data has been computed and saved to cache
2021-12-17 16:36:32.048 | DEBUG    | apipe._cached:__cached_hash__:263 - Task load_2_2020-10-23 00:00:00: hash has been computed from data
2021-12-17 16:36:32.050 | DEBUG    | apipe._cached:new_foo:359 - Cache not found: Meta file is missing: /home/nameless/dev/proj/ds-examples/dataflows/cache/compute_diff_11866620178438019178_9252423425142403179_0.1.meta
2021-12-17 16:36:

diff: mean=0.48 std=0.18
CPU times: user 59.1 ms, sys: 1.71 ms, total: 60.8 ms
Wall time: 4.05 s


In [7]:
%%time
# --- Compute #2: Second run: all data is loaded from cache
# Every task is skipped because its cache entry already exists; as the log
# shows, only the final compute_stats result is actually read from the cache.
_mean, _std = apipe.delayed_compute((diff_mean, diff_std))
print(f'diff: mean={_mean:.2f} std={_std:.2f}')

2021-12-17 16:36:34.084 | INFO     | apipe._cached:new_foo:372 - Task load_2_2020-10-23 00:00:00: skip (cache exists)
2021-12-17 16:36:34.085 | INFO     | apipe._cached:new_foo:372 - Task load_1: skip (cache exists)
2021-12-17 16:36:34.087 | INFO     | apipe._cached:new_foo:372 - Task compute_diff_11866620178438019178_9252423425142403179_0.1: skip (cache exists)
2021-12-17 16:36:34.088 | INFO     | apipe._cached:new_foo:372 - Task compute_stats_7655155766694278480: skip (cache exists)
2021-12-17 16:36:34.090 | DEBUG    | apipe._cached:load:228 - Task compute_stats_7655155766694278480: data has been loaded from cache


diff: mean=0.48 std=0.18
CPU times: user 11.9 ms, sys: 152 µs, total: 12 ms
Wall time: 8.6 ms
