
Dask LocalCudaCluster compute error when threads_per_worker not equal to 1 #1262

Closed

kanglcn opened this issue Oct 23, 2023 · 5 comments

kanglcn commented Oct 23, 2023

I've found a weird error with LocalCUDACluster. My workflow is to use Dask to load data from Zarr, transfer it to GPU memory, do some computation on multiple GPUs, transfer the result back to CPU memory, and finally save it to Zarr.

A minimal code example to reproduce:

import numpy as np
import cupy as cp
import zarr
import dask
from dask import array as da
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# prepare the data
a_np = np.random.random((100, 100, 5, 5))
a_zarr = zarr.open('./a.zarr', 'w', shape=a_np.shape, chunks=(10, -1, -1, -1))
a_zarr[:] = a_np

cluster = LocalCUDACluster(n_workers=3, threads_per_worker=3)
client = Client(cluster)

# load from zarr and transfer each block to GPU memory
a_da = da.from_zarr('./a.zarr')
a_cu_da = a_da.map_blocks(cp.asarray)

# computation on the GPU
is_b = (a_cu_da < 0.5) & (a_cu_da >= 0)

b_num = da.count_nonzero(is_b, axis=(-2, -1)).astype(cp.int32)

# transfer the result back to CPU memory and write it to zarr lazily
is_b_cpu = is_b.map_blocks(cp.asnumpy)

_is_b_cpu = is_b_cpu.to_zarr('./is_b.zarr', compute=False, overwrite=True)

da.compute(_is_b_cpu)

It sometimes generates:

2023-10-22 18:55:21,760 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 2, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(20, 30, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-22 18:55:21,760 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 1, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(10, 20, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-22 18:55:22,041 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 4, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(40, 50, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

2023-10-22 18:55:22,042 - distributed.worker - WARNING - Compute Failed
Key:       ('asnumpy-ac4fe35f67de082b63465a864bc6b4eb', 3, 0, 0, 0)
Function:  subgraph_callable-e9fbb40b-c7fe-4438-8765-7c7e2bac
args:      (0, 0.5, <zarr.core.Array (100, 100, 5, 5) float64 read-only>, (slice(30, 40, None), slice(0, 100, None), slice(0, 5, None), slice(0, 5, None)))
kwargs:    {}
Exception: "CUDARuntimeError('cudaErrorInvalidValue: invalid argument')"

---------------------------------------------------------------------------
CUDARuntimeError                          Traceback (most recent call last)
Cell In[9], line 1
----> 1 da.compute(_is_b_cpu)

File ~/miniconda3/envs/work/lib/python3.10/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/work/lib/python3.10/site-packages/cupy/_creation/from_data.py:76, in asarray()
     49 def asarray(a, dtype=None, order=None):
     50     """Converts an object to array.
     51 
     52     This is equivalent to ``array(a, dtype, copy=False)``.
   (...)
     74 
     75     """
---> 76     return _core.array(a, dtype, False, order)

File cupy/_core/core.pyx:2360, in cupy._core.core.array()

File cupy/_core/core.pyx:2384, in cupy._core.core.array()

File cupy/_core/core.pyx:2532, in cupy._core.core._array_default()

File cupy/cuda/memory.pyx:491, in cupy.cuda.memory.MemoryPointer.copy_from_host_async()

File cupy_backends/cuda/api/runtime.pyx:588, in cupy_backends.cuda.api.runtime.memcpyAsync()

File cupy_backends/cuda/api/runtime.pyx:143, in cupy_backends.cuda.api.runtime.check_status()

CUDARuntimeError: cudaErrorInvalidValue: invalid argument

However, sometimes it just works perfectly.

I find that when I set threads_per_worker=1, this error always disappears.
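
For reference, the workaround amounts to a single change in the cluster setup:

# with one thread per worker, the reproducer above runs reliably
cluster = LocalCUDACluster(n_workers=3, threads_per_worker=1)
client = Client(cluster)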

I also asked a similar question on the Dask forum: https://dask.discourse.group/t/dask-localcudacluster-compute-error-when-threads-per-worker-not-equal-to-1/2284/1

pentschev (Member) commented:

Using more than one thread per worker (the default is 1) is not something we recommend or officially support for LocalCUDACluster at the moment. The reason is that we would have to ensure downstream libraries (such as CuPy) are properly prepared to handle multiple threads in one process, which is something we haven't done, as it generally doesn't improve performance out-of-the-box; see #517 and #96 for related work and discussions.

If you're doing that to ensure you have multiple threads for CPU compute resources, you could consider launching a hybrid cluster with proper resource and code annotations.
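
A rough sketch of what that could look like, purely as an illustration (the worker launch commands, resource labels, and scheduler address below are assumptions, not a tested recipe):

# Illustrative sketch only: assumes a scheduler that already has both CPU-only
# workers (e.g. started with `dask worker <scheduler> --resources "CPU=1"`) and
# dask-cuda GPU workers attached; whether the dask-cuda worker CLI accepts a
# --resources flag the same way is an assumption to verify.
import cupy as cp
import dask
import dask.array as da
from dask.distributed import Client

client = Client("<scheduler-address>")  # placeholder address

# graph fusion can drop annotations, so disabling it is a common precaution
dask.config.set({"optimization.fuse.active": False})

with dask.annotate(resources={"CPU": 1}):
    a_da = da.from_zarr('./a.zarr')        # I/O-heavy read tasks on CPU workers

with dask.annotate(resources={"GPU": 1}):
    a_cu_da = a_da.map_blocks(cp.asarray)  # host-to-device transfer and compute
    is_b = (a_cu_da < 0.5) & (a_cu_da >= 0)

is_b.map_blocks(cp.asnumpy).to_zarr('./is_b.zarr', overwrite=True)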

kanglcn (Author) commented Oct 23, 2023

Thanks for letting me know, @pentschev. The reason I use multiple threads is that I think they can accelerate disk I/O, since Zarr is compressed, chunked storage (I have tested this). I don't need multiple threads for the computation itself. Do you have any suggestions? I have looked at KvikIO, but Dask doesn't support it yet.

madsbk (Member) commented Oct 24, 2023

You should be able to use dask.array.from_array and KvikIO to read a Zarr array:

import cupy
import dask.array
from dask.distributed import Client
from dask_cuda import LocalCUDACluster


def main():
    import kvikio.zarr

    filepath = "./a.zarr"

    # prepare the data
    z = kvikio.zarr.open_cupy_array(store=filepath, mode="w", shape=(10,), chunks=(2,))
    z[:] = cupy.arange(10)

    # load the zarr array into a dask array
    a = dask.array.from_array(z, chunks=z.chunks)

    # at this point, it works as a regular dask array (backed by cupy.ndarray)
    assert a.sum().compute() == 45


if __name__ == "__main__":
    with LocalCUDACluster(n_workers=1) as cluster:
        with Client(cluster):
            main()

We should properly mention this in the KvikIO docs :)

pentschev (Member) commented Oct 24, 2023

Please note that in the sample above z will be entirely allocated on the client's GPU, which is often undesirable since the client's GPU is usually much smaller than the cluster's combined GPU memory. I was discussing this offline with @madsbk and, assuming your files already exist, you could instead do something like the following:

z = kvikio.zarr.open_cupy_array(store=filepath, mode="r")
a = dask.array.from_array(z, chunks=z.chunks)

In that case you'll map z directly onto a Dask array with z.chunks, and, assuming the number of chunks is equal to or larger than the number of workers, it will be split among all available workers.
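
Applied to the original workflow, a minimal sketch could look like the following (this assumes ./a.zarr was written with a GDS-compatible compressor, e.g. via kvikio.zarr.open_cupy_array, so it can be read back as CuPy blocks; it is not a tested recipe):

import cupy as cp
import dask.array as da
import kvikio.zarr
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(n_workers=3)  # one thread per worker (the default)
client = Client(cluster)

# map the on-disk array directly onto a Dask array; chunks are read lazily
# on the workers, so nothing is allocated on the client GPU up front
z = kvikio.zarr.open_cupy_array(store='./a.zarr', mode='r')
a_cu_da = da.from_array(z, chunks=z.chunks)

is_b = (a_cu_da < 0.5) & (a_cu_da >= 0)

# move the result back to host memory before writing to a regular zarr store
is_b.map_blocks(cp.asnumpy).to_zarr('./is_b.zarr', overwrite=True)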

kanglcn (Author) commented Oct 26, 2023

Thanks. I will try it later.
