
Intermittent blosc decompression errors #58

Closed
rabernat opened this issue Jan 24, 2021 · 9 comments
Labels
bug Something isn't working testing Everything related to testing

Comments

@rabernat
Contributor

Executor tests occasionally fail like this:

___________________________________ test_recipe_w_executor[D-DaskPipelineExecutor] ____________________________________

Executor = <class 'rechunker.executors.dask.DaskPipelineExecutor'>
netCDFtoZarr_sequential_recipe = (NetCDFtoZarrSequentialRecipe(sequence_dim='time', inputs_per_chunk=1, nitems_per_input=1, target=FSSpecTarget(fs=<fss...0x7fce414fe3a0>, root_path='/private/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/pytest-of-rpa/pytest-51/target1'))

    @pytest.mark.parametrize(
        "Executor", [PythonPipelineExecutor, DaskPipelineExecutor, PrefectPipelineExecutor]
    )
    def test_recipe_w_executor(Executor, netCDFtoZarr_sequential_recipe):
        rec, ds_expected, target = netCDFtoZarr_sequential_recipe
        pipeline = rec.to_pipelines()
        ex = Executor()
        plan = ex.pipelines_to_plan(pipeline)
>       ex.execute_plan(plan)

tests/test_executors.py:19: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../rechunker/rechunker/executors/dask.py:28: in execute_plan
    return plan.compute(**kwargs)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/base.py:167: in compute
    (result,) = compute(self, traverse=False, **kwargs)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/base.py:452: in compute
    results = schedule(dsk, keys, **kwargs)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/threaded.py:76: in get
    results = get_async(
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/local.py:486: in get_async
    raise_exception(exc, tb)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/local.py:316: in reraise
    raise exc
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/local.py:222: in execute_task
    result = _execute_task(task, data)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
pangeo_forge/recipe.py:211: in _store_chunk
    ds_chunk.to_zarr(target_mapper, region=write_region)
../../Code/xarray/xarray/core/dataset.py:1745: in to_zarr
    return to_zarr(
../../Code/xarray/xarray/backends/api.py:1481: in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
../../Code/xarray/xarray/backends/api.py:1158: in dump_to_store
    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
../../Code/xarray/xarray/backends/zarr.py:462: in store
    ds = open_zarr(self.ds.store, group=self.ds.path, chunks=None)
../../Code/xarray/xarray/backends/zarr.py:675: in open_zarr
    ds = open_dataset(
../../Code/xarray/xarray/backends/api.py:575: in open_dataset
    ds = maybe_decode_store(store, chunks)
../../Code/xarray/xarray/backends/api.py:471: in maybe_decode_store
    ds = conventions.decode_cf(
../../Code/xarray/xarray/conventions.py:600: in decode_cf
    ds = Dataset(vars, attrs=attrs)
../../Code/xarray/xarray/core/dataset.py:630: in __init__
    variables, coord_names, dims, indexes, _ = merge_data_and_coords(
../../Code/xarray/xarray/core/merge.py:467: in merge_data_and_coords
    return merge_core(
../../Code/xarray/xarray/core/merge.py:594: in merge_core
    collected = collect_variables_and_indexes(aligned)
../../Code/xarray/xarray/core/merge.py:278: in collect_variables_and_indexes
    variable = as_variable(variable, name=name)
../../Code/xarray/xarray/core/variable.py:160: in as_variable
    obj = obj.to_index_variable()
../../Code/xarray/xarray/core/variable.py:528: in to_index_variable
    return IndexVariable(
../../Code/xarray/xarray/core/variable.py:2412: in __init__
    self._data = PandasIndexAdapter(self._data)
../../Code/xarray/xarray/core/indexing.py:1397: in __init__
    self.array = utils.safe_cast_to_index(array)
../../Code/xarray/xarray/core/utils.py:104: in safe_cast_to_index
    index = pd.Index(np.asarray(array), **kwargs)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/numpy/core/_asarray.py:83: in asarray
    return array(a, dtype, copy=False, order=order)
../../Code/xarray/xarray/core/indexing.py:568: in __array__
    return np.asarray(array[self.key], dtype=None)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/numpy/core/_asarray.py:83: in asarray
    return array(a, dtype, copy=False, order=order)
../../Code/xarray/xarray/coding/variables.py:70: in __array__
    return self.func(self.array)
../../Code/xarray/xarray/coding/times.py:187: in decode_cf_datetime
    num_dates = np.asarray(num_dates)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/numpy/core/_asarray.py:83: in asarray
    return array(a, dtype, copy=False, order=order)
../../Code/xarray/xarray/coding/variables.py:70: in __array__
    return self.func(self.array)
../../Code/xarray/xarray/coding/variables.py:138: in _apply_mask
    data = np.asarray(data, dtype=dtype)
/opt/miniconda3/envs/pangeo2020/lib/python3.8/site-packages/numpy/core/_asarray.py:83: in asarray
    return array(a, dtype, copy=False, order=order)
../../Code/xarray/xarray/core/indexing.py:568: in __array__
    return np.asarray(array[self.key], dtype=None)
../../Code/xarray/xarray/backends/zarr.py:57: in __getitem__
    return array[key.tuple]
../../Code/zarr/zarr/core.py:571: in __getitem__
    return self.get_basic_selection(selection, fields=fields)
../../Code/zarr/zarr/core.py:696: in get_basic_selection
    return self._get_basic_selection_nd(selection=selection, out=out,
../../Code/zarr/zarr/core.py:739: in _get_basic_selection_nd
    return self._get_selection(indexer=indexer, out=out, fields=fields)
../../Code/zarr/zarr/core.py:1034: in _get_selection
    self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
../../Code/zarr/zarr/core.py:1694: in _chunk_getitems
    self._process_chunk(out, cdatas[ckey], chunk_select, drop_axes,
../../Code/zarr/zarr/core.py:1609: in _process_chunk
    self._compressor.decode(cdata, dest)
numcodecs/blosc.pyx:562: in numcodecs.blosc.Blosc.decode
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   RuntimeError: error during blosc decompression: 0

numcodecs/blosc.pyx:392: RuntimeError

For example: https://github.com/pangeo-forge/pangeo-forge/runs/1754269723?check_suite_focus=true

Same as pangeo-data/pangeo#196

@rabernat rabernat added bug Something isn't working testing Everything related to testing labels Jan 24, 2021
@rabernat rabernat added this to Discussion Needed in Software Development via automation Jan 24, 2021
@rabernat rabernat moved this from Discussion Needed to Shovel-ready issues in Software Development Jan 24, 2021
@rabernat rabernat mentioned this issue Jan 25, 2021
@TomAugspurger
Contributor

Have you seen the blosc error locally? I vaguely recall seeing something like that when I was converting Daymet to Zarr for Azure's Open Datasets.

@davidbrochart
Contributor

I'm not sure it's related, but we also see this warning:

<frozen importlib._bootstrap>:219: RuntimeWarning: numpy.ndarray size changed, may indicate binary incompatibility. Expected 80 from C header, got 88 from PyObject

@rabernat
Contributor Author

Have you seen the blosc error locally?

Yes, it happens constantly on my MacBook during testing.

@rabernat
Contributor Author

One hint is that I think it only happens inside the Dask executor, so it probably has something to do with threads.

@rabernat
Contributor Author

Perhaps we could fix the blosc issue with dask/distributed#1054?

@rabernat
Contributor Author

These seem to have gone away in testing.

Software Development automation moved this from Shovel-ready issues to Done Jun 17, 2021
@derekocallaghan
Contributor

I've been frequently seeing these blosc decompression errors locally, with the following package versions:

  • xarray: 2022.3.0
  • dask: 2022.6.0
  • numcodecs: 0.9.1
  • pangeo_forge_recipes: 0.9.0

These occur when executing an XarrayZarrRecipe with Dask. With 3 input files I never see the problem; with 31 input files (a month of data), it always happens.

I'd been successfully running the recipe in the Pangeo Forge Sandbox. However, it also happened there once this week:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Input In [1], in <cell line: 165>()
    163 from dask.diagnostics import ProgressBar
    165 with ProgressBar():
--> 166     delayed.compute()
    168 ds = xr.open_zarr(recipe.target_mapper)
    169 ds

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/base.py:312, in DaskMethodsMixin.compute(self, **kwargs)
    288 def compute(self, **kwargs):
    289     """Compute this dask collection
    290 
    291     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    310     dask.base.compute
    311     """
--> 312     (result,) = compute(self, traverse=False, **kwargs)
    313     return result

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
     78     elif isinstance(pool, multiprocessing.pool.Pool):
     79         pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
     82     pool.submit,
     83     pool._max_workers,
     84     dsk,
     85     result,
     86     cache=cache,
     87     get_id=_thread_get_id,
     88     pack_exception=pack_exception,
     89     **kwargs,
     90 )
     92 # Cleanup pools associated to dead threads
     93 with pools_lock:

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    506         _execute_task(task, data)  # Re-execute locally
    507     else:
--> 508         raise_exception(exc, tb)
    509 res, worker_id = loads(res_info)
    510 state["cache"][key] = res

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/local.py:316, in reraise(exc, tb)
    314 if exc.__traceback__ is not tb:
    315     raise exc.with_traceback(tb)
--> 316 raise exc

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219 try:
    220     task, data = loads(task_info)
--> 221     result = _execute_task(task, data)
    222     id = get_id()
    223     result = dumps((result, id))

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/executors/dask.py:16, in wrap_map_task.<locals>.wrapped(map_arg, config, *dependencies)
     15 def wrapped(map_arg, config, *dependencies):
---> 16     return function(map_arg, config=config)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py:635, in store_chunk(chunk_key, config)
    631 with lock_for_conflicts(lock_keys, timeout=config.lock_timeout):
    632     logger.info(
    633         f"Storing variable {vname} chunk {chunk_key!s} " f"to Zarr region {zarr_region}"
    634     )
--> 635     zarr_array[zarr_region] = data

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:1285, in Array.__setitem__(self, selection, value)
   1283     self.vindex[selection] = value
   1284 else:
-> 1285     self.set_basic_selection(pure_selection, value, fields=fields)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:1380, in Array.set_basic_selection(self, selection, value, fields)
   1378     return self._set_basic_selection_zd(selection, value, fields=fields)
   1379 else:
-> 1380     return self._set_basic_selection_nd(selection, value, fields=fields)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:1680, in Array._set_basic_selection_nd(self, selection, value, fields)
   1674 def _set_basic_selection_nd(self, selection, value, fields=None):
   1675     # implementation of __setitem__ for array with at least one dimension
   1676 
   1677     # setup indexer
   1678     indexer = BasicIndexer(selection, self)
-> 1680     self._set_selection(indexer, value, fields=fields)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:1732, in Array._set_selection(self, indexer, value, fields)
   1729                 chunk_value = chunk_value[item]
   1731         # put data
-> 1732         self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
   1733 else:
   1734     lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:1994, in Array._chunk_setitem(self, chunk_coords, chunk_selection, value, fields)
   1991     lock = self._synchronizer[ckey]
   1993 with lock:
-> 1994     self._chunk_setitem_nosync(chunk_coords, chunk_selection, value,
   1995                                fields=fields)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:1999, in Array._chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields)
   1997 def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
   1998     ckey = self._chunk_key(chunk_coords)
-> 1999     cdata = self._process_for_setitem(ckey, chunk_selection, value, fields=fields)
   2001     # attempt to delete chunk if it only contains the fill value
   2002     if (not self.write_empty_chunks) and all_equal(self.fill_value, cdata):

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:2049, in Array._process_for_setitem(self, ckey, chunk_selection, value, fields)
   2044         chunk = np.zeros(self._chunks, dtype=self._dtype, order=self._order)
   2046 else:
   2047 
   2048     # decode chunk
-> 2049     chunk = self._decode_chunk(cdata)
   2050     if not chunk.flags.writeable:
   2051         chunk = chunk.copy(order='K')

File /srv/conda/envs/notebook/lib/python3.9/site-packages/zarr/core.py:2076, in Array._decode_chunk(self, cdata, start, nitems, expected_shape)
   2074         chunk = self._compressor.decode_partial(cdata, start, nitems)
   2075     else:
-> 2076         chunk = self._compressor.decode(cdata)
   2077 else:
   2078     chunk = cdata

File numcodecs/blosc.pyx:564, in numcodecs.blosc.Blosc.decode()

File numcodecs/blosc.pyx:394, in numcodecs.blosc.decompress()

RuntimeError: error during blosc decompression: -1

Debugging this locally, I got as far as blosc.blosc_run_decompression_with_context(). However, this returns -1 in multiple scenarios, so I couldn't determine which one was occurring.

No problem if you'd prefer this to be recorded in a new issue rather than added to this currently closed one.

Cheers,
Derek

@rsignell-usgs

rsignell-usgs commented Aug 8, 2023

I just ran into this issue using the new Beam pipeline approach: the pipeline errors out while I'm baking the recipe with pangeo-forge-runner from the command line, using a local_config.py. I'm getting:

  File "numcodecs/blosc.pyx", line 564, in numcodecs.blosc.Blosc.decode
  File "numcodecs/blosc.pyx", line 394, in numcodecs.blosc.decompress
RuntimeError: error during blosc decompression: -1 [while running 'Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

I then tried adding the following lines to the recipe, since this has worked in other cases where we've had this problem:

import numcodecs
numcodecs.blosc.use_threads = False
from numcodecs import Zstd
import zarr
zarr.storage.default_compressor = Zstd(level=7)

before the recipe:

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
    | StoreToZarr(
        store_name="gene",
        combine_dims=pattern.combine_dim_keys,
        target_chunks=chunk_plan
    )
)

but it didn't seem to have any effect: the same error occurred.

Do we have to do something different to inject the numcodecs/blosc stuff into the pipeline?

Or am I heading down the wrong path?

@cisaacstern
Member

@rsignell-usgs, thanks for reporting. I've transferred this to a new issue #560. Let's discuss there!
