Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] getitems / setitems for BaseStore #1040

Closed
wants to merge 11 commits into from
14 changes: 13 additions & 1 deletion zarr/_storage/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from collections.abc import MutableMapping
from string import ascii_letters, digits
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Hashable, List, Mapping, Optional, Union, Dict, Iterable

from zarr.meta import Metadata2, Metadata3
from zarr.util import normalize_storage_path
Expand Down Expand Up @@ -89,6 +89,18 @@ def rename(self, src_path: str, dst_path: str) -> None:
) # pragma: no cover
_rename_from_keys(self, src_path, dst_path)

def getitems(self, keys: Iterable[Hashable]) -> Dict[Hashable, Any]:
result = {}
for key in keys:
try:
result[key] = self.__getitem__(key)
except KeyError:
pass
return result

def setitems(self, items):
return tuple(self.__setitem__(key, value) for key, value in items.items())

@staticmethod
def _ensure_store(store: Any):
"""
Expand Down
84 changes: 38 additions & 46 deletions zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import re
from collections.abc import MutableMapping
from functools import reduce
from typing import Any
from typing import Any, Optional, Sequence, Union

import numpy as np
from numcodecs.compat import ensure_bytes, ensure_ndarray
Expand Down Expand Up @@ -788,7 +788,7 @@ def __getitem__(self, selection):
result = self.get_basic_selection(pure_selection, fields=fields)
return result

def get_basic_selection(self, selection=Ellipsis, out=None, fields=None):
def get_basic_selection(self, selection=Ellipsis, out=None, fields: Optional[Union[str, Sequence[str]]]=None):
"""Retrieve data for an item or region of the array.

Parameters
Expand Down Expand Up @@ -1230,28 +1230,19 @@ def _get_selection(self, indexer, out=None, fields=None):

# determine output shape
out_shape = indexer.shape


indexer_parts = tuple(i for i in indexer)

# setup output array
if out is None:
out = np.empty(out_shape, dtype=out_dtype, order=self._order)
else:
check_array_shape('out', out, out_shape)

# iterate over chunks
if not hasattr(self.chunk_store, "getitems") or \
any(map(lambda x: x == 0, self.shape)):
# sequentially get one key at a time from storage
for chunk_coords, chunk_selection, out_selection in indexer:

# load chunk selection into output array
self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
drop_axes=indexer.drop_axes, fields=fields)
else:
# allow storage to get multiple items at once
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)

if len(indexer_parts) > 0:
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer_parts)
self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
drop_axes=indexer.drop_axes, fields=fields)

drop_axes=indexer.drop_axes, fields=fields)
if out.shape:
return out
else:
Expand Down Expand Up @@ -1776,7 +1767,7 @@ def _set_selection(self, indexer, value, fields=None):
check_array_shape('value', value, sel_shape)

# iterate over chunks in range
if not hasattr(self.store, "setitems") or self._synchronizer is not None \
if self._synchronizer is not None \
or any(map(lambda x: x == 0, self.shape)):
# iterative approach
for chunk_coords, chunk_selection, out_selection in indexer:
Expand All @@ -1799,26 +1790,27 @@ def _set_selection(self, indexer, value, fields=None):
# put data
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
else:
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
chunk_values = []
for out_selection in lout_selection:
if sel_shape == ():
chunk_values.append(value)
elif is_scalar(value, self._dtype):
chunk_values.append(value)
else:
cv = value[out_selection]
# handle missing singleton dimensions
if indexer.drop_axes: # pragma: no cover
item = [slice(None)] * self.ndim
for a in indexer.drop_axes:
item[a] = np.newaxis
item = tuple(item)
cv = chunk_value[item]
chunk_values.append(cv)

self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
fields=fields)
if all(map(lambda x: x > 0, sel_shape)):
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
chunk_values = []
for out_selection in lout_selection:
if sel_shape == ():
chunk_values.append(value)
elif is_scalar(value, self._dtype):
chunk_values.append(value)
else:
cv = value[out_selection]
# handle missing singleton dimensions
if indexer.drop_axes:
item = [slice(None)] * self.ndim
for a in indexer.drop_axes:
item[a] = np.newaxis
item = tuple(item)
cv = cv[item]
chunk_values.append(cv)

self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
fields=fields)

def _process_chunk(
self,
Expand Down Expand Up @@ -1972,7 +1964,6 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
and hasattr(self._compressor, "decode_partial")
and not fields
and self.dtype != object
and hasattr(self.chunk_store, "getitems")
):
partial_read_decode = True
cdatas = {
Expand All @@ -1982,17 +1973,18 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
}
else:
partial_read_decode = False
cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
cdatas = self.chunk_store.getitems(ckeys)
for ckey, chunk_selection, out_selection in zip(ckeys, lchunk_selection, lout_selection):
if ckey in cdatas:
cdata = cdatas[ckey]
self._process_chunk(
out,
cdatas[ckey],
chunk_select,
cdata,
chunk_selection,
drop_axes,
out_is_ndarray,
fields,
out_select,
out_selection,
partial_read_decode=partial_read_decode,
)
else:
Expand All @@ -2002,7 +1994,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
fill_value = self._fill_value[fields]
else:
fill_value = self._fill_value
out[out_select] = fill_value
out[out_selection] = fill_value

def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None):
ckeys = map(self._chunk_key, lchunk_coords)
Expand Down
Loading