# Prototype workflow to produce daily Sv files

In [1]:
from pathlib import Path
import datetime as dt

import xarray as xr

import echopype as ep
import gen_mvbs_utils

In [2]:
output_path = Path("./tmp_outputs/")
if not output_path.exists():
    output_path.mkdir()

In [3]:
ooi_CE04OSPS = (
    "https://rawdata.oceanobservatories.org/files/"
    "CE04OSPS/PC01B/ZPLSCB102_10.33.10.143/"
)

In [4]:
start = dt.datetime(year=2017, month=8, day=30)
end = dt.datetime(year=2017, month=8, day=30)

In [5]:
raw_file_list = gen_mvbs_utils.get_raw_file_url(
    file_url=ooi_CE04OSPS,
    start=start,
    end=end
)
len(raw_file_list)

15

## Convert individually and then combine

In [6]:
%%time
for raw_file in raw_file_list:
    ed = ep.open_raw(raw_file=raw_file, sonar_model="EK60")
    ed.to_zarr(save_path=output_path, overwrite=True)
    ds_Sv = ep.calibrate.compute_Sv(ed)
    Sv_fname = Path(raw_file).with_name(Path(raw_file).stem + "_Sv")
    ds_Sv = ds_Sv.chunk({dim: ds_Sv[dim].size for dim in ds_Sv.dims})
    ds_Sv.to_zarr(output_path / Sv_fname.with_suffix(".zarr").name, mode="w")

15:47:00  parsing file OOI-D20170830-T000000.raw, time of first ping: 2017-Aug-30 00:00:00
15:47:06  saving tmp_outputs/OOI-D20170830-T000000.zarr
15:47:09  parsing file OOI-D20170830-T013905.raw, time of first ping: 2017-Aug-30 01:39:05
15:47:14  saving tmp_outputs/OOI-D20170830-T013905.zarr
15:47:17  parsing file OOI-D20170830-T031813.raw, time of first ping: 2017-Aug-30 03:18:13
15:47:21  saving tmp_outputs/OOI-D20170830-T031813.zarr
15:47:25  parsing file OOI-D20170830-T045714.raw, time of first ping: 2017-Aug-30 04:57:14
15:47:29  saving tmp_outputs/OOI-D20170830-T045714.zarr
15:47:33  parsing file OOI-D20170830-T063625.raw, time of first ping: 2017-Aug-30 06:36:25
15:47:38  saving tmp_outputs/OOI-D20170830-T063625.zarr
15:47:41  parsing file OOI-D20170830-T081527.raw, time of first ping: 2017-Aug-30 08:15:27
15:47:48  saving tmp_outputs/OOI-D20170830-T081527.zarr
15:47:52  parsing file OOI-D20170830-T095428.raw, time of first ping: 2017-Aug-30 09:54:28
15:47:57  saving tmp_output

In [7]:
Sv_zarr_list = list((output_path).glob("*_Sv.zarr"))
Sv_zarr_list

[PosixPath('tmp_outputs/OOI-D20170830-T194851_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T230703_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T131240_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T212802_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T113335_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T063625_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T081527_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T180944_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T095428_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T013905_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T045714_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T000000_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T163043_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T031813_Sv.zarr'),
 PosixPath('tmp_outputs/OOI-D20170830-T145142_Sv.zarr')]

In [8]:
def replace_time3(ds):
    """
    Replace coorindate time3 with ping_time since for EK60 these are the same.
    """
    ds["water_level"] = (
        ds["water_level"]
        .assign_coords({"ping_time": ("time3", ds["ping_time"].values)})
        .swap_dims({"time3": "ping_time"}).drop("time3")
    )
    ds = ds.drop_dims("time3")
    return ds

In [9]:
ds_Sv_zarr_all = xr.open_mfdataset(
    Sv_zarr_list,
    engine="zarr",
    preprocess=replace_time3,
    data_vars="minimal",
    coords="minimal",
    compat="override"  # this can be removed if filenames is a coordinate
)

In [10]:
ds_Sv_zarr_all["Sv"]

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,145.35 MiB
Shape,"(3, 85795, 1072)","(3, 5924, 1072)"
Count,45 Tasks,15 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.06 GiB 145.35 MiB Shape (3, 85795, 1072) (3, 5924, 1072) Count 45 Tasks 15 Chunks Type float64 numpy.ndarray",1072  85795  3,

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,145.35 MiB
Shape,"(3, 85795, 1072)","(3, 5924, 1072)"
Count,45 Tasks,15 Chunks
Type,float64,numpy.ndarray


In [11]:
ds_Sv_zarr_all["Sv"].chunks

((3,),
 (5923,
  5924,
  5923,
  5924,
  5923,
  5923,
  5924,
  5923,
  5923,
  5923,
  5923,
  5924,
  5924,
  5923,
  2868),
 (1072,))

### Save to a single file using dask

In [12]:
from dask.distributed import Client

In [13]:
client = Client()

2022-06-28 15:49:02,739 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-f51f3vr8', purging
2022-06-28 15:49:02,739 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-ngyl7_vl', purging
2022-06-28 15:49:02,740 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-gb2b2au2', purging


In [14]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:62484,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:62502,Total threads: 2
Dashboard: http://127.0.0.1:62505/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:62488,
Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-zdn18q3y,Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-zdn18q3y

0,1
Comm: tcp://127.0.0.1:62501,Total threads: 2
Dashboard: http://127.0.0.1:62506/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:62490,
Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-ta5o5_62,Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-ta5o5_62

0,1
Comm: tcp://127.0.0.1:62499,Total threads: 2
Dashboard: http://127.0.0.1:62504/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:62487,
Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-kx81fqe1,Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-kx81fqe1

0,1
Comm: tcp://127.0.0.1:62500,Total threads: 2
Dashboard: http://127.0.0.1:62503/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:62489,
Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-97tbcn59,Local directory: /Users/wu-jung/code_git/ooi-nmf-workflow/dask-worker-space/worker-97tbcn59


In [15]:
ds_Sv_zarr_all = ds_Sv_zarr_all.chunk(
    {
        "channel": 3,
        "ping_time": 25000,
        "range_sample": 1072
    }
)

In [16]:
# This helps with saving to zarr
# otherwise encoding chunks and dask chunks are different and raise errors
for var in ds_Sv_zarr_all.data_vars:
    ds_Sv_zarr_all[var].encoding = {}

In [17]:
ds_Sv_zarr_all["Sv"]

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,613.40 MiB
Shape,"(3, 85795, 1072)","(3, 25000, 1072)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.06 GiB 613.40 MiB Shape (3, 85795, 1072) (3, 25000, 1072) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",1072  85795  3,

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,613.40 MiB
Shape,"(3, 85795, 1072)","(3, 25000, 1072)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [18]:
ds_Sv_zarr_all

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,613.40 MiB
Shape,"(3, 85795, 1072)","(3, 25000, 1072)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.06 GiB 613.40 MiB Shape (3, 85795, 1072) (3, 25000, 1072) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",1072  85795  3,

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,613.40 MiB
Shape,"(3, 85795, 1072)","(3, 25000, 1072)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,613.40 MiB
Shape,"(3, 85795, 1072)","(3, 25000, 1072)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.06 GiB 613.40 MiB Shape (3, 85795, 1072) (3, 25000, 1072) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",1072  85795  3,

Unnamed: 0,Array,Chunk
Bytes,2.06 GiB,613.40 MiB
Shape,"(3, 85795, 1072)","(3, 25000, 1072)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.96 MiB 585.94 kiB Shape (3, 85795) (3, 25000) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",85795  3,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,24 B,24 B
Shape,"(3,)","(3,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 24 B 24 B Shape (3,) (3,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",3  1,

Unnamed: 0,Array,Chunk
Bytes,24 B,24 B
Shape,"(3,)","(3,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(85795, 3)","(25000, 3)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.96 MiB 585.94 kiB Shape (85795, 3) (25000, 3) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",3  85795,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(85795, 3)","(25000, 3)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(85795, 3)","(25000, 3)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.96 MiB 585.94 kiB Shape (85795, 3) (25000, 3) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",3  85795,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(85795, 3)","(25000, 3)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.96 MiB 585.94 kiB Shape (3, 85795) (3, 25000) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",85795  3,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.96 MiB 585.94 kiB Shape (3, 85795) (3, 25000) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",85795  3,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,480 B,480 B
Shape,"(1,)","(1,)"
Count,2 Tasks,1 Chunks
Type,numpy.ndarray,
"Array Chunk Bytes 480 B 480 B Shape (1,) (1,) Count 2 Tasks 1 Chunks Type numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,480 B,480 B
Shape,"(1,)","(1,)"
Count,2 Tasks,1 Chunks
Type,numpy.ndarray,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.96 MiB 585.94 kiB Shape (3, 85795) (3, 25000) Count 55 Tasks 4 Chunks Type float64 numpy.ndarray",85795  3,

Unnamed: 0,Array,Chunk
Bytes,1.96 MiB,585.94 kiB
Shape,"(3, 85795)","(3, 25000)"
Count,55 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [19]:
%%time
ds_Sv_zarr_all.to_zarr(
    output_path / "combined_Sv_test.zarr",
    mode="w",
)

CPU times: user 2.86 s, sys: 1.03 s, total: 3.89 s
Wall time: 31 s


<xarray.backends.zarr.ZarrStore at 0x16351cf90>

## Try to append zarr file

In [20]:
%%time
for seq, raw_file in enumerate(raw_file_list):
    ed = ep.open_raw(raw_file=raw_file, sonar_model="EK60")
    ed.to_zarr(save_path=output_path, overwrite=True)
    ds_Sv = ep.calibrate.compute_Sv(ed)
    # Change time3 to ping_time (the same for EK60)
    ds_Sv["water_level"] = (
        ds_Sv["water_level"]
        .assign_coords({"ping_time": ("time3", ds_Sv["ping_time"].values)})
        .swap_dims({"time3": "ping_time"}).drop("time3")
    )
    ds_Sv = ds_Sv.drop_dims("time3")
    # Assign chunk size
    ds_Sv = ds_Sv.chunk({dim: ds_Sv[dim].size for dim in ds_Sv.dims})
    if seq == 0:
        ds_Sv.to_zarr(output_path / "append_test.zarr", mode="w")
    else:
        ds_Sv.to_zarr(output_path / "append_test.zarr", mode="a", append_dim="ping_time")

15:49:35  parsing file OOI-D20170830-T000000.raw, time of first ping: 2017-Aug-30 00:00:00
15:49:39  overwriting tmp_outputs/OOI-D20170830-T000000.zarr
15:49:44  parsing file OOI-D20170830-T013905.raw, time of first ping: 2017-Aug-30 01:39:05
15:49:49  overwriting tmp_outputs/OOI-D20170830-T013905.zarr
15:49:53  parsing file OOI-D20170830-T031813.raw, time of first ping: 2017-Aug-30 03:18:13
15:49:57  overwriting tmp_outputs/OOI-D20170830-T031813.zarr
15:50:02  parsing file OOI-D20170830-T045714.raw, time of first ping: 2017-Aug-30 04:57:14
15:50:06  overwriting tmp_outputs/OOI-D20170830-T045714.zarr
15:50:11  parsing file OOI-D20170830-T063625.raw, time of first ping: 2017-Aug-30 06:36:25
15:50:17  overwriting tmp_outputs/OOI-D20170830-T063625.zarr
15:50:25  parsing file OOI-D20170830-T081527.raw, time of first ping: 2017-Aug-30 08:15:27
15:50:31  overwriting tmp_outputs/OOI-D20170830-T081527.zarr
15:50:35  parsing file OOI-D20170830-T095428.raw, time of first ping: 2017-Aug-30 09:54:

In [21]:
ds_append = xr.open_dataset(output_path / "append_test.zarr", engine="zarr")

In [22]:
ds_append