In [None]:
import numpy as np
import zarr
import cupy as cp
from itertools import product

from decorrelation.shp import ks_test
from decorrelation.co import emperical_co_sp
from decorrelation.pl import emi

In [None]:
from dask import array as da
from dask import delayed
from dask.distributed import Client, LocalCluster
from dask_cuda import LocalCUDACluster

In [None]:
# Start a local CUDA cluster; the summary below lists one worker for each GPU.
cluster = LocalCUDACluster()
# Connect this notebook session to the cluster.
client = Client(cluster)
# Display the client summary (dashboard address, workers, memory).
client

2023-04-12 15:46:27,548 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-04-12 15:46:27,548 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2023-04-12 15:46:27,548 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-04-12 15:46:27,548 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


0,1
Connection method: Cluster object,Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 100.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35802,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 100.00 GiB

0,1
Comm: tcp://127.0.0.1:43875,Total threads: 1
Dashboard: http://127.0.0.1:43682/status,Memory: 50.00 GiB
Nanny: tcp://127.0.0.1:33124,
Local directory: /tmp/dask-worker-space/worker-y2n75el4,Local directory: /tmp/dask-worker-space/worker-y2n75el4
GPU: Tesla V100-SXM2-32GB,GPU memory: 31.75 GiB

0,1
Comm: tcp://127.0.0.1:45311,Total threads: 1
Dashboard: http://127.0.0.1:34893/status,Memory: 50.00 GiB
Nanny: tcp://127.0.0.1:32823,
Local directory: /tmp/dask-worker-space/worker-t0kxqwae,Local directory: /tmp/dask-worker-space/worker-t0kxqwae
GPU: Tesla V100-SXM2-32GB,GPU memory: 31.75 GiB


In this tutorial, we demonstrate how to use `Dask` for distributed computing.

Two significant issues for InSAR big data processing are: 1) the data volume does not fit into CPU/GPU memory;
2) the processing speed is limited. For the first issue, a common solution is to divide the data into chunks
and process the chunks one by one. In many cases, the chunks can be processed independently, so the processing can be sped
up by processing them in parallel.

`Dask` is a job scheduler that allows deploying process-level parallel processing.
Through the `Delayed` feature, `Dask` operations only construct the computing workflow rather than doing the computation immediately.
All of the computations are done at the end, which allows `Dask` to better distribute computing resources for the task.
`Dask` makes parallel processing easier and enables decoupling the code for computation from the code for scheduling.

In this demo, we use `Dask` for multi-GPU KS test. This includes:

- Read SLC data from disk to CPU and then transfer to GPU;
- Divide them into overlapped chunks;
- Apply the `ks_test` to each chunk;
- Trim the overlap from the results;
- Transfer the results to CPU;
- Save data to disk.

## Processing

Load the data into CPU memory as a dask array with an explicitly set chunk size:

In [None]:
# Location of the SLC stack stored as a zarr array.
rslc_path = '../../data/rslc.zarr'
# Read-only handle to the same store.
# NOTE(review): `rslc_zarr` is not used by any cell visible in this notebook — confirm whether it is still needed.
rslc_zarr = zarr.open(rslc_path,mode='r')
# Lazy, CPU-backed dask array; nothing is read from disk until it is computed.
cpu_rslc = da.from_zarr(rslc_path,chunks=(1000,1000,17))

Then transfer the data to the GPU:

In [None]:
# Lazily convert every block to a cupy array, so each chunk is moved to the GPU only when it is computed.
rslc = cpu_rslc.map_blocks(cp.asarray)

In [None]:
# Inspect the lazy array: the summary reports shape, chunking and the cupy chunk type.
rslc

Unnamed: 0,Array,Chunk
Bytes,594.67 MiB,129.70 MiB
Shape,"(2500, 1834, 17)","(1000, 1000, 17)"
Dask graph,6 chunks in 3 graph layers,6 chunks in 3 graph layers
Data type,complex64 cupy.ndarray,complex64 cupy.ndarray
"Array Chunk Bytes 594.67 MiB 129.70 MiB Shape (2500, 1834, 17) (1000, 1000, 17) Dask graph 6 chunks in 3 graph layers Data type complex64 cupy.ndarray",17  1834  2500,

Unnamed: 0,Array,Chunk
Bytes,594.67 MiB,129.70 MiB
Shape,"(2500, 1834, 17)","(1000, 1000, 17)"
Dask graph,6 chunks in 3 graph layers,6 chunks in 3 graph layers
Data type,complex64 cupy.ndarray,complex64 cupy.ndarray


Prepare the `rmli` and processing parameters:

In [None]:
# Squared magnitude of the complex SLC samples (real-valued, still lazy and GPU-backed).
rmli = da.abs(rslc)**2
# Show the array summary; the dtype is now float32 instead of complex64.
rmli

Unnamed: 0,Array,Chunk
Bytes,297.34 MiB,64.85 MiB
Shape,"(2500, 1834, 17)","(1000, 1000, 17)"
Dask graph,6 chunks in 5 graph layers,6 chunks in 5 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray
"Array Chunk Bytes 297.34 MiB 64.85 MiB Shape (2500, 1834, 17) (1000, 1000, 17) Dask graph 6 chunks in 5 graph layers Data type float32 cupy.ndarray",17  1834  2500,

Unnamed: 0,Array,Chunk
Bytes,297.34 MiB,64.85 MiB
Shape,"(2500, 1834, 17)","(1000, 1000, 17)"
Dask graph,6 chunks in 5 graph layers,6 chunks in 5 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray


In [None]:
# Half-widths of the KS-test window along the first (az) and second (r) array axes.
az_half_win = 5
r_half_win = 5

# Full window extent: the centre pixel plus one half-window on each side.
az_win = 1 + 2 * az_half_win
r_win = 1 + 2 * r_half_win

Enable overlap between chunks. This is necessary because the KS test for each pixel is conducted on a small window (az_win*r_win) around it, which may extend into the neighbouring chunks.

In [None]:
# Share a half-window of neighbouring pixels between adjacent chunks along the
# first two axes; the last axis is left untouched.
depth = {0: az_half_win, 1: r_half_win, 2: 0}
# 'none' on every axis: the outer edges of the array are not padded.
boundary = dict.fromkeys(range(3), 'none')
rmli_overlap = da.overlap.overlap(rmli, depth=depth, boundary=boundary)
# Show the summary of the overlapped (slightly larger) array.
rmli_overlap

Unnamed: 0,Array,Chunk
Bytes,301.35 MiB,65.83 MiB
Shape,"(2520, 1844, 17)","(1010, 1005, 17)"
Dask graph,6 chunks in 6 graph layers,6 chunks in 6 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray
"Array Chunk Bytes 301.35 MiB 65.83 MiB Shape (2520, 1844, 17) (1010, 1005, 17) Dask graph 6 chunks in 6 graph layers Data type float32 cupy.ndarray",17  1844  2520,

Unnamed: 0,Array,Chunk
Bytes,301.35 MiB,65.83 MiB
Shape,"(2520, 1844, 17)","(1010, 1005, 17)"
Dask graph,6 chunks in 6 graph layers,6 chunks in 6 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray


In [None]:
# An array of delayed objects loses the shape information, so preserve the chunk layout here.
chunks_size = rmli_overlap.chunks
# Number of chunks along each axis.
nchunks = tuple(map(len, chunks_size))
# Shape of every individual chunk, enumerated with the last axis varying fastest.
chunks_shape = [*product(*chunks_size)]

In [None]:
# Per-axis chunk lengths of the overlapped array (one tuple per axis).
chunks_size

((1005, 1010, 505), (1005, 839), (17,))

In [None]:
# Number of chunks along each axis.
nchunks

(3, 2, 1)

In [None]:
# Shape of every chunk, one entry per chunk.
chunks_shape

[(1005, 1005, 17),
 (1005, 839, 17),
 (1010, 1005, 17),
 (1010, 839, 17),
 (505, 1005, 17),
 (505, 839, 17)]

Convert each chunk to a delayed object:

In [None]:
# One Delayed object per chunk, arranged in an object array that mirrors the chunk grid.
rmli_chunks = rmli_overlap.to_delayed()
# Show the grid of Delayed placeholders.
rmli_chunks

array([[[Delayed(('overlap-deeb6bf02a78ef34f82e8a8fcbd6af40', 0, 0, 0))],
        [Delayed(('overlap-deeb6bf02a78ef34f82e8a8fcbd6af40', 0, 1, 0))]],

       [[Delayed(('overlap-deeb6bf02a78ef34f82e8a8fcbd6af40', 1, 0, 0))],
        [Delayed(('overlap-deeb6bf02a78ef34f82e8a8fcbd6af40', 1, 1, 0))]],

       [[Delayed(('overlap-deeb6bf02a78ef34f82e8a8fcbd6af40', 2, 0, 0))],
        [Delayed(('overlap-deeb6bf02a78ef34f82e8a8fcbd6af40', 2, 1, 0))]]],
      dtype=object)

Apply the KS test:

In [None]:
# Wrap `ks_test` so that calling it only records a task; nout=2 lets the two
# return values be taken apart without running anything.
delayed_ks_test = delayed(ks_test, pure=True, nout=2)

# Schedule one KS test per chunk, walking the chunk grid in flattened order.
results = [
    delayed_ks_test(rmli_chunk, az_half_win=az_half_win, r_half_win=r_half_win)
    for rmli_chunk in rmli_chunks.ravel()
]

# Separate the first and second outputs into two 1-D object arrays of Delayed items.
dist_chunks = np.array([result[0] for result in results])
p_chunks = np.array([result[1] for result in results])

In [None]:
# Each entry is still a Delayed placeholder at this point.
dist_chunks

array([Delayed('getitem-f50335529b047b79b2c90cdef57d40fd'),
       Delayed('getitem-4ce12862641d4b5c6c4cc84e652dabc2'),
       Delayed('getitem-c0334ded367e1297dcd1fed323c211e8'),
       Delayed('getitem-da7af3e1e8721cc553402893fa56e087'),
       Delayed('getitem-237696b05fba3e9aec0a3bc835be6a4b'),
       Delayed('getitem-cac2e878f9a43c9f1da9233851482b57')], dtype=object)

Convert the `Delayed` objects to dask arrays:

In [None]:
# Empty cupy array that tells dask the dtype and chunk type of every result.
gpu_meta = cp.array((), dtype=rmli.dtype)

for i, chunk_shape in enumerate(chunks_shape):
    # Each result is declared with the chunk's first two extents followed by the window extents.
    out_shape = (*chunk_shape[:-1], az_win, r_win)
    dist_chunks[i] = da.from_delayed(dist_chunks[i], shape=out_shape, meta=gpu_meta)
    p_chunks[i] = da.from_delayed(p_chunks[i], shape=out_shape, meta=gpu_meta)

In [None]:
# The entries are now lazy dask arrays with an explicit shape, dtype and cupy chunk type.
dist_chunks

array([dask.array<from-value, shape=(1005, 1005, 11, 11), dtype=float32, chunksize=(1005, 1005, 11, 11), chunktype=cupy.ndarray>,
       dask.array<from-value, shape=(1005, 839, 11, 11), dtype=float32, chunksize=(1005, 839, 11, 11), chunktype=cupy.ndarray>,
       dask.array<from-value, shape=(1010, 1005, 11, 11), dtype=float32, chunksize=(1010, 1005, 11, 11), chunktype=cupy.ndarray>,
       dask.array<from-value, shape=(1010, 839, 11, 11), dtype=float32, chunksize=(1010, 839, 11, 11), chunktype=cupy.ndarray>,
       dask.array<from-value, shape=(505, 1005, 11, 11), dtype=float32, chunksize=(505, 1005, 11, 11), chunktype=cupy.ndarray>,
       dask.array<from-value, shape=(505, 839, 11, 11), dtype=float32, chunksize=(505, 839, 11, 11), chunktype=cupy.ndarray>],
      dtype=object)

Reshape and concatenate the dask arrays:

In [None]:
# Arrange the flat chunk lists back into the chunk grid, with one extra trailing
# axis of length 1, and convert to nested lists for `da.block`.
block_grid = (*nchunks, 1)
dist_chunks = dist_chunks.reshape(block_grid).tolist()
p_chunks = p_chunks.reshape(block_grid).tolist()

In [None]:
# Nested lists of dask arrays laid out like the chunk grid.
dist_chunks

[[[[dask.array<from-value, shape=(1005, 1005, 11, 11), dtype=float32, chunksize=(1005, 1005, 11, 11), chunktype=cupy.ndarray>]],
  [[dask.array<from-value, shape=(1005, 839, 11, 11), dtype=float32, chunksize=(1005, 839, 11, 11), chunktype=cupy.ndarray>]]],
 [[[dask.array<from-value, shape=(1010, 1005, 11, 11), dtype=float32, chunksize=(1010, 1005, 11, 11), chunktype=cupy.ndarray>]],
  [[dask.array<from-value, shape=(1010, 839, 11, 11), dtype=float32, chunksize=(1010, 839, 11, 11), chunktype=cupy.ndarray>]]],
 [[[dask.array<from-value, shape=(505, 1005, 11, 11), dtype=float32, chunksize=(505, 1005, 11, 11), chunktype=cupy.ndarray>]],
  [[dask.array<from-value, shape=(505, 839, 11, 11), dtype=float32, chunksize=(505, 839, 11, 11), chunktype=cupy.ndarray>]]]]

In [None]:
# Assemble the nested lists of per-chunk arrays into single dask arrays.
dist = da.block(dist_chunks)
p = da.block(p_chunks)

In [None]:
# Summary of the assembled `dist` array; its first two extents still match the overlapped input.
dist

Unnamed: 0,Array,Chunk
Bytes,2.09 GiB,468.53 MiB
Shape,"(2520, 1844, 11, 11)","(1010, 1005, 11, 11)"
Dask graph,6 chunks in 23 graph layers,6 chunks in 23 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray
"Array Chunk Bytes 2.09 GiB 468.53 MiB Shape (2520, 1844, 11, 11) (1010, 1005, 11, 11) Dask graph 6 chunks in 23 graph layers Data type float32 cupy.ndarray",2520  1  11  11  1844,

Unnamed: 0,Array,Chunk
Bytes,2.09 GiB,468.53 MiB
Shape,"(2520, 1844, 11, 11)","(1010, 1005, 11, 11)"
Dask graph,6 chunks in 23 graph layers,6 chunks in 23 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray


In [None]:
# Summary of the assembled `p` array; same layout as `dist`.
p

Unnamed: 0,Array,Chunk
Bytes,2.09 GiB,468.53 MiB
Shape,"(2520, 1844, 11, 11)","(1010, 1005, 11, 11)"
Dask graph,6 chunks in 23 graph layers,6 chunks in 23 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray
"Array Chunk Bytes 2.09 GiB 468.53 MiB Shape (2520, 1844, 11, 11) (1010, 1005, 11, 11) Dask graph 6 chunks in 23 graph layers Data type float32 cupy.ndarray",2520  1  11  11  1844,

Unnamed: 0,Array,Chunk
Bytes,2.09 GiB,468.53 MiB
Shape,"(2520, 1844, 11, 11)","(1010, 1005, 11, 11)"
Dask graph,6 chunks in 23 graph layers,6 chunks in 23 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray


In [None]:
# Chunk sizes before trimming; they still include the overlap margins.
dist.chunks

((1005, 1010, 505), (1005, 839), (11,), (11,))

Trim the overlap region:

In [None]:
# Remove the half-window margins along the first two axes;
# the two trailing window axes have no overlap.
depth = {0: az_half_win, 1: r_half_win, 2: 0, 3: 0}
boundary = dict.fromkeys(range(4), 'none')

dist = da.overlap.trim_overlap(dist, depth=depth, boundary=boundary)
p = da.overlap.trim_overlap(p, depth=depth, boundary=boundary)

In [None]:
# Chunk sizes after trimming the overlap margins.
dist.chunks

((1000, 1000, 500), (1000, 834), (11,), (11,))

Copy data from GPU to CPU:

In [None]:
# Lazily copy every GPU block back to host memory as a numpy array.
cpu_dist = dist.map_blocks(cp.asnumpy)
cpu_p = p.map_blocks(cp.asnumpy)

In [None]:
# The chunk type is now numpy.ndarray instead of cupy.ndarray.
cpu_dist

Unnamed: 0,Array,Chunk
Bytes,2.07 GiB,461.58 MiB
Shape,"(2500, 1834, 11, 11)","(1000, 1000, 11, 11)"
Dask graph,6 chunks in 26 graph layers,6 chunks in 26 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.07 GiB 461.58 MiB Shape (2500, 1834, 11, 11) (1000, 1000, 11, 11) Dask graph 6 chunks in 26 graph layers Data type float32 numpy.ndarray",2500  1  11  11  1834,

Unnamed: 0,Array,Chunk
Bytes,2.07 GiB,461.58 MiB
Shape,"(2500, 1834, 11, 11)","(1000, 1000, 11, 11)"
Dask graph,6 chunks in 26 graph layers,6 chunks in 26 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


Save the data to disk:

In [None]:
%%time
# Writing to zarr triggers the computation of the lazy arrays built above.
# overwrite=True replaces any existing store at the target path.
cpu_dist.to_zarr('dist.zarr',overwrite=True)
cpu_p.to_zarr('p.zarr',overwrite=True)

CPU times: user 909 ms, sys: 885 ms, total: 1.79 s
Wall time: 39.1 s


# Test

In [None]:
dask_dist = zarr.load('dist.zarr')
dask_p = zarr.load('p.zarr')
dist = zarr.load('../../data/dist_ks.zarr/')
p = zarr.load('../../data/p_ks.zarr/')

In [None]:
np.testing.assert_array_equal(dask_dist,dist)
np.testing.assert_array_equal(dask_p,p)

# Conclusion

- Dask is beneficial when the data does not fit into GPU memory;
- Dask allows multi-gpu parallel processing.