### Running a single-machine cluster
When run on a single machine, this is equivalent to the dask 'processes' scheduler, with the added addition of a handy status dashboard. It's accessible by web browser via the link below. (The local cluster server can be set up and run outside the notebook for persistence.)

In [1]:
from dask import distributed

CLUSTER_SETTINGS = dict(
    # limiting this can help to avoid system hangs caused by google drivefs
    n_workers=8,
    threads_per_worker=1,
    # host=f"tcp://127.0.0.1:8786",
    dashboard_address=f"127.0.0.1:8787",
)

# cluster = distributed.LocalCluster(**CLUSTER_SETTINGS)
if "scheduler" not in dir():
    cluster = distributed.LocalCluster(**CLUSTER_SETTINGS)
    scheduler = distributed.Client(cluster.scheduler_address)

cluster

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 8,Total memory: 31.73 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:51162,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 31.73 GiB

0,1
Comm: tcp://127.0.0.1:51193,Total threads: 1
Dashboard: http://127.0.0.1:51205/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51165,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-ila62pog,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-ila62pog

0,1
Comm: tcp://127.0.0.1:51201,Total threads: 1
Dashboard: http://127.0.0.1:51213/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51166,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-khyzukul,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-khyzukul

0,1
Comm: tcp://127.0.0.1:51202,Total threads: 1
Dashboard: http://127.0.0.1:51215/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51167,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-pl_iqtks,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-pl_iqtks

0,1
Comm: tcp://127.0.0.1:51204,Total threads: 1
Dashboard: http://127.0.0.1:51217/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51168,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-rltbyd_r,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-rltbyd_r

0,1
Comm: tcp://127.0.0.1:51196,Total threads: 1
Dashboard: http://127.0.0.1:51207/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51169,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-g71dpe3q,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-g71dpe3q

0,1
Comm: tcp://127.0.0.1:51199,Total threads: 1
Dashboard: http://127.0.0.1:51210/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51170,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-h4jbiyzx,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-h4jbiyzx

0,1
Comm: tcp://127.0.0.1:51203,Total threads: 1
Dashboard: http://127.0.0.1:51219/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51171,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-3nveu3yc,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-3nveu3yc

0,1
Comm: tcp://127.0.0.1:51200,Total threads: 1
Dashboard: http://127.0.0.1:51209/status,Memory: 3.97 GiB
Nanny: tcp://127.0.0.1:51172,
Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-pt0s1tgc,Local directory: C:\Users\AROMAN~1\AppData\Local\Temp\1\dask-scratch-space\worker-pt0s1tgc


### Creating dask dataframe objects
The following assumes that you've downloaded the specified zip archive at the specified path.

In [8]:
from sea_ingest.dask_ops import log_to_json

# this is a hack for working inside the repo source tree;
# normally, for pip install, just `import sea_ingest`
import sea_ingest
from labbench import stopwatch
import pandas as pd
import typing
import dask

data_path = "data/2023-09-25_GMM.zip"
data_timezone = "America/Denver"


def capture_summary(partition_data: typing.Dict[str, pd.DataFrame]):
    """this could be expanded into a function that does more, but for now, just make a capture summary"""

    # partition_data is the result of running zipfile.read_seamf_zipfile on a subset of files in the zip archive.
    # it takes the form of a dictionary of pandas.DataFrame objects
    
    # The below assumes metadata version 4 or above when including the 
    # calibration temperature. Remove the temperature key if running
    # this code using older data.

    partition_data["capture_summary"] = pd.DataFrame.from_dict(
        {
            "median_rms_pfp": sea_ingest.trace(
                partition_data, "pfp", capture_statistic="mean", detector="rms"
            ).median(axis=1),
            "max_max_pfp": sea_ingest.trace(
                partition_data, "pfp", capture_statistic="max", detector="peak"
            ).max(axis=1),
            "median_mean_power": sea_ingest.trace(
                partition_data, "psd", capture_statistic="mean"
            ).median(axis=1),
            "max_max_power": sea_ingest.trace(
                partition_data, "psd", capture_statistic="mean"
            ).max(axis=1),
            "noise_figure": sea_ingest.trace(partition_data, "channel_metadata")[
                "cal_noise_figure_dB"
            ].astype("float16"),
            "gain": sea_ingest.trace(partition_data, "channel_metadata")[
                "cal_gain_dB"
            ].astype("float16"),
            "calibration_temperature": sea_ingest.trace(partition_data, "channel_metadata")[
                "cal_temperature_degC"
            ].astype("float16"),
        }
    )

    return partition_data

# overwrite
log_to_json("data.log")

with dask.config.set(scheduler=cluster.scheduler), stopwatch("setup"):
    # scan the zipfiles to map out the file contents. the resulting is a dictionary of dask dataframes.
    # this is fast, because it hasn't loaded much data yet; other operations trigger "compute" that can
    # take a while to scrape the data out of the zip archive
    ddfs = sea_ingest.read_seamf_zipfile_as_ddf(
        data_path,
        partition_func=capture_summary,
        partition_size=200,
        tz=data_timezone,
        localize=False,
    )

  if isinstance(o, (numpy.bool, numpy.bool_)):
[1;30m INFO  [0m [32m2023-10-12 14:16:07.812[0m • [34mlabbench:[0m setup 0.668 s elapsed


In [9]:
with stopwatch("compute"):
    # head forces a compute operation
    df = ddfs["capture_summary"].head(10)
df

[1;30m INFO  [0m [32m2023-10-12 14:16:15.599[0m • [34mlabbench:[0m compute 6.243 s elapsed


Unnamed: 0_level_0,frequency,median_rms_pfp,max_max_pfp,median_mean_power,max_max_power,noise_figure,gain,calibration_temperature
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-09-17 17:59:06.672000+00:00,3545000000.0,-99.25,-83.6875,-169.25,-169.0,4.933594,29.84375,29.09375
2023-09-17 17:59:11.302000+00:00,3555000000.0,-81.1875,-59.15625,-150.625,-147.75,4.601562,30.125,29.09375
2023-09-17 17:59:17.219000+00:00,3565000000.0,-80.875,-60.125,-150.0,-147.75,4.300781,30.546875,29.09375
2023-09-17 17:59:21.862000+00:00,3575000000.0,-78.0,-57.25,-147.75,-144.875,4.308594,30.4375,29.09375
2023-09-17 17:59:26.955000+00:00,3585000000.0,-76.375,-56.3125,-146.625,-144.75,4.160156,30.609375,29.09375
2023-09-17 17:59:32.280000+00:00,3595000000.0,-81.25,-59.65625,-150.625,-147.75,4.164062,30.71875,29.09375
2023-09-17 17:59:36.955000+00:00,3605000000.0,-77.125,-57.5,-147.625,-146.125,4.15625,30.734375,29.09375
2023-09-17 17:59:42.963000+00:00,3615000000.0,-88.625,-67.4375,-157.75,-155.75,4.234375,30.625,29.09375
2023-09-17 17:59:47.656000+00:00,3625000000.0,-86.375,-67.4375,-156.5,-154.0,4.265625,30.421875,29.09375
2023-09-17 17:59:52.339000+00:00,3635000000.0,-88.375,-64.25,-156.375,-152.0,4.105469,30.6875,29.09375


#### on-demand access to a subset of the data
The dask .loc accessor allows faster access to a specified subset of the data

In [10]:
with stopwatch("compute"):
    df = (
        ddfs["capture_summary"]
        .loc["2023-09-21 20:50:00":"2023-09-21 20:51:00"]
        .compute(scheduler=scheduler)
    )

df

[1;30m INFO  [0m [32m2023-10-12 14:16:45.044[0m • [34mlabbench:[0m compute 16.592 s elapsed


Unnamed: 0_level_0,frequency,median_rms_pfp,max_max_pfp,median_mean_power,max_max_power,noise_figure,gain,calibration_temperature
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-09-21 20:50:03.672000+00:00,3705000000.0,-58.15625,-40.875,-129.0,-127.8125,5.449219,28.84375,28.203125
2023-09-21 20:50:13.521000+00:00,3545000000.0,-99.25,-86.625,-169.25,-169.125,4.902344,29.890625,28.203125
2023-09-21 20:50:18.148000+00:00,3555000000.0,-78.875,-59.53125,-148.75,-146.625,4.574219,30.171875,28.203125
2023-09-21 20:50:24.139000+00:00,3565000000.0,-78.25,-60.90625,-148.0,-146.625,4.277344,30.59375,28.203125
2023-09-21 20:50:28.811000+00:00,3575000000.0,-74.5,-56.75,-144.875,-143.375,4.28125,30.484375,28.203125
2023-09-21 20:50:33.911000+00:00,3585000000.0,-76.375,-57.40625,-146.5,-145.125,4.125,30.65625,28.203125
2023-09-21 20:50:39.263000+00:00,3595000000.0,-81.5,-60.0625,-150.75,-148.25,4.132812,30.75,28.203125
2023-09-21 20:50:43.943000+00:00,3605000000.0,-79.375,-58.1875,-149.5,-146.5,4.128906,30.78125,28.203125
2023-09-21 20:50:49.887000+00:00,3615000000.0,-89.9375,-69.625,-159.875,-157.125,4.203125,30.671875,28.203125
2023-09-21 20:50:54.556000+00:00,3625000000.0,-89.875,-68.5,-159.0,-154.375,4.238281,30.46875,28.203125


#### Bulk data
Dask dataframes support data save operations split output files by partition. This means that the time span of each output file can be adjusted with 'repartition'. In this example, a file save function is applied to each partition.

In [14]:
import dask
from pathlib import Path


def write_feather(df: pd.DataFrame, dirpath):
    """example for feather, but could substitute code for e.g. csv, database, etc. instead"""
    path = Path(dirpath) / df.index[0].strftime("%Y-%m-%d.feather")

    # feather like most formats requires string column names
    df.columns = df.columns.astype("str")
    df.reset_index().to_feather(path, compression="zstd")
    return str(path)


def write_pickle(df: pd.DataFrame, dirpath):
    """example for pickles"""

    path = Path(dirpath) / df.index[0].strftime("%Y-%m-%d.p.zstd")
    df.to_pickle(path)
    return str(path)


# collecting the write operations into a single options
# allows dask to optimize the execution so that the zip archives
# only need to be read once.
#
# the choice of dictionary means that the returned list of
# files written will be returned as a dictionary with the same keys.
#
# Note that running the code below requires directories "capture_summary"
# and "pfp" to exist within your "data" directory.
ddf = ddfs["capture_summary"]

save_ops = dict(
    capture_summary=(
        ddfs["capture_summary"]
        .repartition(freq="1MS")
        .map_partitions(write_pickle, "data/capture_summary")
    ),
    pfp=(ddfs["pfp"].repartition(freq="1W").map_partitions(write_pickle, "data/pfp")),
)

with stopwatch("compute"):
    files_saved = dask.compute(save_ops, scheduler=scheduler)

    print("wrote the following files:")
    print(files_saved)

[1;30m INFO  [0m [32m2023-10-12 14:25:46.141[0m • [34mlabbench:[0m compute 47.410 s elapsed


wrote the following files:
({'capture_summary': 0    data\capture_summary\2023-09-17.p.zstd
dtype: object, 'pfp': 0    data\pfp\2023-09-17.p.zstd
dtype: object},)
