In [None]:
import coiled
import dask
import dask.dataframe as dd
from dask.distributed import Client, wait

In [None]:
%%time
#old default
with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
    cluster_old_def = coiled.Cluster(name="dfalign-task-queuing-old-default",
                            n_workers=10,
                            wait_for_workers=True,
                            scheduler_options={"idle_timeout": "1 hours"}                                 
                            )


In [None]:
%%time
#current default
with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
    cluster_ws = coiled.Cluster(name="dfalign-task-queuing-new-default",
                            n_workers=10,
                            wait_for_workers=True,
                            scheduler_options={"idle_timeout": "1 hours"}
                            )


In [None]:
# One client per cluster so the same graph can be submitted to each explicitly.
# NOTE(review): the client created last presumably becomes the session default;
# all later calls name their client explicitly, so this should not matter.
client_old_def = Client(cluster_old_def)
client_ws = Client(cluster_ws)

In [None]:
# Utility functions: total cluster memory and fixed-size synthetic timeseries

In [None]:
from __future__ import annotations
import dask.array as da
import distributed
import numpy as np
import pandas as pd
from dask.datasets import timeseries
from dask.sizeof import sizeof
from dask.utils import format_bytes, parse_bytes

def cluster_memory(client: distributed.Client) -> int:
    """Return the summed ``memory_limit`` of the workers the scheduler reports.

    Parameters
    ----------
    client
        Client connected to the cluster to inspect.

    Returns
    -------
    int
        Sum of every reported worker's ``memory_limit`` (bytes), truncated to
        an integer.
    """
    workers = client.scheduler_info()["workers"]
    total = 0
    for worker_info in workers.values():
        total += worker_info["memory_limit"]
    return int(total)


def timeseries_of_size(
    target_nbytes: int | str,
    *,
    start="2000-01-01",
    freq="1s",
    partition_freq="1d",
    dtypes={"name": str, "id": int, "x": float, "y": float},
    seed=None,
    **kwargs,
) -> dd.DataFrame:
    """
    Generate a `dask.datasets.timeseries` of a target total size.

    Same arguments as `dask.datasets.timeseries`, but instead of specifying an
    ``end`` date, you specify ``target_nbytes``. The number of partitions is set
    as necessary to reach approximately that total dataset size. Note that you
    control the partition size via ``freq``, ``partition_freq``, and ``dtypes``.

    Parameters
    ----------
    target_nbytes
        Desired in-memory size of the whole dataset, either as a number of
        bytes or as a string understood by ``dask.utils.parse_bytes``
        (e.g. ``"1mb"``).
    start
        Start of the time index.
    freq
        Spacing between consecutive rows.
    partition_freq
        Time span covered by each partition.
    dtypes
        Mapping of column name to dtype. The default mapping is never mutated
        here; it is only forwarded to ``timeseries``.
    seed
        Random seed forwarded to ``timeseries``.
    **kwargs
        Any further keyword arguments are forwarded to ``timeseries``.

    Returns
    -------
    dask.dataframe.DataFrame
        A timeseries whose ``npartitions`` equals
        ``round(target_nbytes / size_of_one_partition)``.

    Raises
    ------
    AssertionError
        If a single partition is so large that the rounded partition count is
        zero, or if the generated frame does not have the expected number of
        partitions.

    Examples
    --------
    >>> timeseries_of_size(
    ...     "1mb", freq="1s", partition_freq="100s", dtypes={"x": float}
    ... ).npartitions
    278
    >>> timeseries_of_size(
    ...     "1mb", freq="1s", partition_freq="100s", dtypes={i: float for i in range(10)}
    ... ).npartitions
    93

    Notes
    -----
    The ``target_nbytes`` refers to the amount of RAM the dask DataFrame would
    use up across all workers, as many pandas partitions.
    This is typically larger than ``df.compute()`` would be as a single pandas
    DataFrame. Especially with many partitions, there can be significant
    overhead to storing all the individual pandas objects.
    Additionally, ``target_nbytes`` certainly does not correspond to the size
    the dataset would take up on disk (as parquet, csv, etc.).
    """
    import warnings

    if isinstance(target_nbytes, str):
        target_nbytes = parse_bytes(target_nbytes)

    start_dt = pd.to_datetime(start)
    partition_freq_dt = pd.to_timedelta(partition_freq)

    # Build a single-partition sample with the requested layout so its real
    # in-memory size can be measured.
    example_part = timeseries(
        start=start,
        end=start_dt + partition_freq_dt,
        freq=freq,
        partition_freq=partition_freq,
        dtypes=dtypes,
        seed=seed,
        **kwargs,
    )

    # Computing on the threaded scheduler while distributed clients are active
    # emits a warning. Silence it for this one call only; the previous filter
    # state is restored on exit, so warnings elsewhere in the session still
    # show up.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        p = example_part.compute(scheduler="threads")

    partition_size = sizeof(p)
    npartitions = round(target_nbytes / partition_size)
    assert npartitions > 0, (
        f"Partition size of {format_bytes(partition_size)} > "
        f"target size {format_bytes(target_nbytes)}"
    )

    # Extend the end date so the frame spans exactly `npartitions` partitions.
    ts = timeseries(
        start=start,
        end=start_dt + partition_freq_dt * npartitions,
        freq=freq,
        partition_freq=partition_freq,
        dtypes=dtypes,
        seed=seed,
        **kwargs,
    )
    assert ts.npartitions == npartitions
    return ts

In [None]:
# Total worker memory of the old-default cluster; used below to size the
# synthetic datasets relative to what the cluster can hold.
memory = cluster_memory(client_old_def)
# Display the value in human-readable units as the cell output.
format_bytes(memory)

In [None]:
# First operand: sized to half of the cluster's total memory,
# 100 float columns, index starting in 2020.
df = timeseries_of_size(
    memory // 2,
    start="2020-01-01",
    freq="600ms",
    partition_freq="12h",
    dtypes={i: float for i in range(100)},
)

# Second operand: a quarter of cluster memory with the same layout, but its
# index starts in 2010, so the two frames cover different date ranges.
df2 = timeseries_of_size(
    memory // 4,
    start="2010-01-01",
    freq="600ms",
    partition_freq="12h",
    dtypes={i: float for i in range(100)},
)


# Lazy graph under test: subtract the two frames, then reduce to per-column
# means. Nothing is computed here.
# NOTE(review): presumably the subtraction aligns the frames on their
# (non-matching) indexes, which is the "dfalign" workload the cluster names
# refer to; confirm.
final = (df2 - df).mean()

In [None]:
# Print both dashboard URLs so the two runs can be watched side by side.
print(f"Old default: {client_old_def.dashboard_link}")
print(f"New default with ws: {client_ws.dashboard_link}")

In [None]:
# Submit the same graph to each cluster and keep the returned handles.
# No wall time is measured here; the timings quoted in the results section
# come from the `%%time` + `wait(...)` variants.
f_old = client_old_def.compute(final)
f_ws = client_ws.compute(final)

In [None]:
# %%time
# wait(client_old_def.compute(final))

In [None]:
# %%time
# wait(client_ws.compute(final))

### Old default (`worker-saturation: inf`)
```python
%%time
wait(client_old_def.compute(final))


CPU times: user 408 ms, sys: 86.4 ms, total: 495 ms
Wall time: 1min 21s # 2min 55s #1min 44s #4min 2s
```

### New default (`worker-saturation: 1.1`)
```python
%%time
wait(client_ws.compute(final))

CPU times: user 225 ms, sys: 52.2 ms, total: 277 ms
Wall time: 37.5 s

```

## ~2.2–6.5x faster (37.5 s vs. 1 min 21 s – 4 min 2 s across runs)


In [None]:
# Tear down both benchmark clusters once the comparison is done.
# NOTE(review): shutdown() is expected to stop the remote scheduler and workers,
# not just close the local connection; confirm for the installed version.
client_old_def.shutdown()
client_ws.shutdown()