In [26]:
import dask
import json
import zarr
import numpy as np
from ome_zarr.writer import write_image
from ome_zarr.scale import Scaler

In [27]:
downsampling='4'
in_zarr=f'results/sub-mouse1/micr/sub-mouse1_acq-1x_desc-raw_sample-brain_downsampling-{downsampling}x_stain-autof_fusedspim.zarr'
metadata_json='results/sub-mouse1/micr/sub-mouse1_acq-1x_sample-brain_spim.metadata.json'
max_layer=4 #number of downsamplings by 2 to include in zarr
rechunk_size=(128,256,256)
out_zarr='results/sub-mouse1/micr/sub-mouse1_acq-1x_desc-raw_sample-brain_downsampling-4x_stain-autof_fusedspim.ome.zarr'


In [28]:
#open zarr to get group name
zi = zarr.open(in_zarr)
group_name = [g for g in zi.group_keys()][0]

da = dask.array.from_zarr(in_zarr,component=f'{group_name}/s0',chunks=rechunk_size)
da

Unnamed: 0,Array,Chunk
Bytes,3.10 GiB,16.00 MiB
Shape,"(602, 1808, 1529)","(128, 256, 256)"
Dask graph,240 chunks in 2 graph layers,240 chunks in 2 graph layers
Data type,>u2 numpy.ndarray,>u2 numpy.ndarray
"Array Chunk Bytes 3.10 GiB 16.00 MiB Shape (602, 1808, 1529) (128, 256, 256) Dask graph 240 chunks in 2 graph layers Data type >u2 numpy.ndarray",1529  1808  602,

Unnamed: 0,Array,Chunk
Bytes,3.10 GiB,16.00 MiB
Shape,"(602, 1808, 1529)","(128, 256, 256)"
Dask graph,240 chunks in 2 graph layers,240 chunks in 2 graph layers
Data type,>u2 numpy.ndarray,>u2 numpy.ndarray


In [29]:
# prepare metadata for ome-zarr

with open(metadata_json) as fp:
    metadata = json.load(fp)

voxdim = float(metadata['physical_size_x']) * float(downsampling) / 1000.0

coordinate_transformations = []
#for each resolution (dataset), we have a list of dicts, transformations to apply.. 
#in this case just a single one (scaling by voxel size)

for l in range(max_layer+1):
    
    coordinate_transformations.append( [{'scale': [(2**l)*voxdim for i in range(3)],
                                            'type': 'scale'}]) #isotropic 


axes =  [  {'name': ax, 'type': 'space', 'unit': 'micrometer'} for ax in ['z','y','x'] ] 


In [30]:

store = zarr.DirectoryStore(out_zarr)
root = zarr.group(store,path='/fused',overwrite=True)
scaler = Scaler(max_layer=max_layer)

In [31]:
dasks = write_image(image=da,
                            group=root,
                            scaler=scaler,
                            coordinate_transformations=coordinate_transformations,
                            axes=axes,
                           compute=False)

In [32]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=2, n_workers=6)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36279 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:36279/status,

0,1
Dashboard: http://127.0.0.1:36279/status,Workers: 6
Total threads: 12,Total memory: 47.05 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45533,Workers: 6
Dashboard: http://127.0.0.1:36279/status,Total threads: 12
Started: Just now,Total memory: 47.05 GiB

0,1
Comm: tcp://127.0.0.1:43447,Total threads: 2
Dashboard: http://127.0.0.1:46773/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:44189,
Local directory: /tmp/dask-scratch-space/worker-3a02np_y,Local directory: /tmp/dask-scratch-space/worker-3a02np_y

0,1
Comm: tcp://127.0.0.1:35473,Total threads: 2
Dashboard: http://127.0.0.1:34623/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:46357,
Local directory: /tmp/dask-scratch-space/worker-5p5l9ftf,Local directory: /tmp/dask-scratch-space/worker-5p5l9ftf

0,1
Comm: tcp://127.0.0.1:42293,Total threads: 2
Dashboard: http://127.0.0.1:39079/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:40267,
Local directory: /tmp/dask-scratch-space/worker-j5c_9o3h,Local directory: /tmp/dask-scratch-space/worker-j5c_9o3h

0,1
Comm: tcp://127.0.0.1:41421,Total threads: 2
Dashboard: http://127.0.0.1:33749/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:40961,
Local directory: /tmp/dask-scratch-space/worker-lk5y5i4l,Local directory: /tmp/dask-scratch-space/worker-lk5y5i4l

0,1
Comm: tcp://127.0.0.1:39081,Total threads: 2
Dashboard: http://127.0.0.1:38827/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:36421,
Local directory: /tmp/dask-scratch-space/worker-2goabtej,Local directory: /tmp/dask-scratch-space/worker-2goabtej

0,1
Comm: tcp://127.0.0.1:41305,Total threads: 2
Dashboard: http://127.0.0.1:44837/status,Memory: 7.84 GiB
Nanny: tcp://127.0.0.1:36951,
Local directory: /tmp/dask-scratch-space/worker-uqcvmmyn,Local directory: /tmp/dask-scratch-space/worker-uqcvmmyn


In [33]:
compute_status = dask.compute(*dasks)

In [34]:
zo = zarr.open(out_zarr)

In [35]:
zo['/'].info

0,1
Name,/
Type,zarr.hierarchy.Group
Read-only,False
Store type,zarr.storage.DirectoryStore
No. members,1
No. arrays,0
No. groups,1
Groups,fused


In [36]:
zo['fused'].attrs.asdict()

{'multiscales': [{'axes': [{'name': 'z',
     'type': 'space',
     'unit': 'micrometer'},
    {'name': 'y', 'type': 'space', 'unit': 'micrometer'},
    {'name': 'x', 'type': 'space', 'unit': 'micrometer'}],
   'datasets': [{'coordinateTransformations': [{'scale': [0.010833332000000001,
        0.010833332000000001,
        0.010833332000000001],
       'type': 'scale'}],
     'path': '0'},
    {'coordinateTransformations': [{'scale': [0.021666664000000002,
        0.021666664000000002,
        0.021666664000000002],
       'type': 'scale'}],
     'path': '1'},
    {'coordinateTransformations': [{'scale': [0.043333328000000004,
        0.043333328000000004,
        0.043333328000000004],
       'type': 'scale'}],
     'path': '2'},
    {'coordinateTransformations': [{'scale': [0.08666665600000001,
        0.08666665600000001,
        0.08666665600000001],
       'type': 'scale'}],
     'path': '3'},
    {'coordinateTransformations': [{'scale': [0.17333331200000002,
        0.1733333120

In [144]:
zo.groups()

<generator object Group.groups at 0x7f0b09211900>