-
-
Notifications
You must be signed in to change notification settings - Fork 263
/
absstore.py
293 lines (237 loc) · 9.11 KB
/
absstore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""This module contains storage classes related to Azure Blob Storage (ABS)"""
from typing import Optional
import warnings
from numcodecs.compat import ensure_bytes
from zarr.util import normalize_storage_path
from zarr._storage.store import (
_get_metadata_suffix,
data_root,
meta_root,
Store,
StoreV3,
V3_DEPRECATION_MESSAGE,
)
from zarr.types import DIMENSION_SEPARATOR
__doctest_requires__ = {
("ABSStore", "ABSStore.*"): ["azure.storage.blob"],
}
class ABSStore(Store):
    """Storage class using Azure Blob Storage (ABS).

    Parameters
    ----------
    container : string
        The name of the ABS container to use.

        .. deprecated:: 2.8.3
           Use ``client`` instead.

    prefix : string
        Location of the "directory" to use as the root of the storage hierarchy
        within the container.

    account_name : string
        The Azure blob storage account name.

        .. deprecated:: 2.8.3
           Use ``client`` instead.

    account_key : string
        The Azure blob storage account access key.

        .. deprecated:: 2.8.3
           Use ``client`` instead.

    blob_service_kwargs : dictionary
        Extra arguments to be passed into the azure blob client, for e.g. when
        using the emulator, pass in blob_service_kwargs={'is_emulated': True}.

        .. deprecated:: 2.8.3
           Use ``client`` instead.

    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.

    client : azure.storage.blob.ContainerClient, optional
        An ``azure.storage.blob.ContainerClient`` to connect with. See
        `here <https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.containerclient?view=azure-python>`_ # noqa
        for more.

        .. versionadded:: 2.8.3

    Notes
    -----
    In order to use this store, you must install the Microsoft Azure Storage SDK for Python,
    ``azure-storage-blob>=12.5.0``.
    """  # noqa: E501

    def __init__(
        self,
        container=None,
        prefix="",
        account_name=None,
        account_key=None,
        blob_service_kwargs=None,
        dimension_separator: Optional[DIMENSION_SEPARATOR] = None,
        client=None,
    ):
        """Create the store, building a ``ContainerClient`` if none is given.

        Always emits the ``V3_DEPRECATION_MESSAGE`` ``FutureWarning``; emits a
        second ``FutureWarning`` when ``client`` is omitted and the client has
        to be constructed from the deprecated connection arguments.
        """
        warnings.warn(
            V3_DEPRECATION_MESSAGE.format(store=self.__class__.__name__),
            FutureWarning,
            stacklevel=3,
        )
        self._dimension_separator = dimension_separator
        self.prefix = normalize_storage_path(prefix)
        if client is None:
            # deprecated option, try to construct the client for them
            msg = (
                "Providing 'container', 'account_name', 'account_key', and "
                "'blob_service_kwargs' is deprecated. Provide an instance of "
                "'azure.storage.blob.ContainerClient' 'client' instead."
            )
            warnings.warn(msg, FutureWarning, stacklevel=2)
            # Imported lazily so the Azure SDK is only needed on this path.
            from azure.storage.blob import ContainerClient

            blob_service_kwargs = blob_service_kwargs or {}
            client = ContainerClient(
                f"https://{account_name}.blob.core.windows.net/",
                container,
                credential=account_key,
                **blob_service_kwargs,
            )

        self.client = client
        # Kept only to back the deprecated read-only properties below; these
        # stay ``None`` when the caller supplied ``client`` directly.
        self._container = container
        self._account_name = account_name
        self._account_key = account_key

    @staticmethod
    def _warn_deprecated(property_):
        """Emit a ``FutureWarning`` naming the deprecated property ``property_``."""
        msg = (
            "The {} property is deprecated and will be removed in a future "
            "version. Get the property from 'ABSStore.client' instead."
        )
        warnings.warn(msg.format(property_), FutureWarning, stacklevel=3)

    @property
    def container(self):
        """Deprecated: the ``container`` argument given to the constructor."""
        self._warn_deprecated("container")
        return self._container

    @property
    def account_name(self):
        """Deprecated: the ``account_name`` argument given to the constructor."""
        self._warn_deprecated("account_name")
        return self._account_name

    @property
    def account_key(self):
        """Deprecated: the ``account_key`` argument given to the constructor."""
        self._warn_deprecated("account_key")
        return self._account_key

    def _append_path_to_prefix(self, path):
        """Return the normalized ``path`` joined onto ``self.prefix`` with ``/``."""
        normalized = normalize_storage_path(path)
        if self.prefix == "":
            return normalized
        return "/".join([self.prefix, normalized])

    @staticmethod
    def _strip_prefix_from_path(path, prefix):
        """Return the normalized ``path`` with the leading ``prefix`` removed.

        The path is assumed to start with the prefix; this is not verified.
        """
        # normalized things will not have any leading or trailing slashes
        path_norm = normalize_storage_path(path)
        prefix_norm = normalize_storage_path(prefix)
        # Decide on the *normalized* prefix: a raw prefix such as "/" is
        # truthy but normalizes to "", and must not strip any characters.
        if prefix_norm:
            # +1 also drops the "/" separating the prefix from the remainder.
            return path_norm[(len(prefix_norm) + 1) :]
        return path_norm

    def __getitem__(self, key):
        """Download and return the content of the blob stored under ``key``.

        Raises
        ------
        KeyError
            If the client reports ``ResourceNotFoundError`` for the blob.
        """
        from azure.core.exceptions import ResourceNotFoundError

        blob_name = self._append_path_to_prefix(key)
        try:
            return self.client.download_blob(blob_name).readall()
        except ResourceNotFoundError as e:
            raise KeyError(f"Blob {blob_name} not found") from e

    def __setitem__(self, key, value):
        """Upload ``value`` (coerced to bytes) under ``key``, overwriting any existing blob."""
        value = ensure_bytes(value)
        blob_name = self._append_path_to_prefix(key)
        self.client.upload_blob(blob_name, value, overwrite=True)

    def __delitem__(self, key):
        """Delete the blob stored under ``key``.

        Raises
        ------
        KeyError
            If the client reports ``ResourceNotFoundError`` for the blob.
        """
        from azure.core.exceptions import ResourceNotFoundError

        try:
            self.client.delete_blob(self._append_path_to_prefix(key))
        except ResourceNotFoundError as e:
            raise KeyError(f"Blob {key} not found") from e

    def __eq__(self, other):
        """Stores are equal when they share an equal client and the same prefix."""
        return (
            isinstance(other, ABSStore)
            and self.client == other.client
            and self.prefix == other.prefix
        )

    def keys(self):
        """Return a list of all keys in the store, relative to the prefix."""
        # Call ``__iter__`` directly rather than ``list(self)``: ``list()`` asks
        # the object for a length hint via ``__len__``, and ``__len__`` is
        # itself implemented on top of ``keys()``.
        return list(self.__iter__())

    def __iter__(self):
        """Yield every blob name under the prefix, with the prefix stripped."""
        if self.prefix:
            list_blobs_prefix = self.prefix + "/"
        else:
            list_blobs_prefix = None
        for blob in self.client.list_blobs(list_blobs_prefix):
            yield self._strip_prefix_from_path(blob.name, self.prefix)

    def __len__(self):
        """Return the number of keys; this lists every blob under the prefix."""
        return len(self.keys())

    def __contains__(self, key):
        """Return whether a blob exists for ``key``."""
        blob_name = self._append_path_to_prefix(key)
        return self.client.get_blob_client(blob_name).exists()

    def listdir(self, path=None):
        """Return the names found directly under ``path``, relative to it.

        Uses the client's ``walk_blobs`` with ``/`` as the delimiter.
        """
        dir_path = normalize_storage_path(self._append_path_to_prefix(path))
        if dir_path:
            dir_path += "/"
        items = [
            self._strip_prefix_from_path(blob.name, dir_path)
            for blob in self.client.walk_blobs(name_starts_with=dir_path, delimiter="/")
        ]
        return items

    def rmdir(self, path=None):
        """Delete every blob whose name starts with the directory ``path``.

        With an empty ``path`` and an empty prefix, ``dir_path`` is empty and
        every blob returned by ``list_blobs`` is deleted.
        """
        dir_path = normalize_storage_path(self._append_path_to_prefix(path))
        if dir_path:
            dir_path += "/"
        for blob in self.client.list_blobs(name_starts_with=dir_path):
            self.client.delete_blob(blob)

    def getsize(self, path=None):
        """Return the size in bytes of the blob at ``path``.

        If no blob exists at ``path``, it is treated as a directory and the
        sizes of the existing blobs returned by ``walk_blobs`` (``/``
        delimiter) under it are summed.
        """
        store_path = normalize_storage_path(path)
        fs_path = self._append_path_to_prefix(store_path)
        if fs_path:
            blob_client = self.client.get_blob_client(fs_path)
        else:
            # Empty path: there is no single blob to look up.
            blob_client = None

        if blob_client and blob_client.exists():
            return blob_client.get_blob_properties().size
        else:
            size = 0
            if fs_path == "":
                fs_path = None
            elif not fs_path.endswith("/"):
                fs_path += "/"
            for blob in self.client.walk_blobs(name_starts_with=fs_path, delimiter="/"):
                blob_client = self.client.get_blob_client(blob)
                # Entries without a backing blob contribute nothing.
                if blob_client.exists():
                    size += blob_client.get_blob_properties().size
            return size

    def clear(self):
        """Remove everything under the store's prefix by calling ``rmdir()``."""
        self.rmdir()
class ABSStoreV3(ABSStore, StoreV3):
    def list(self):
        """Return a new list holding every key in the store."""
        return [*self.keys()]

    def __eq__(self, other):
        """Equal only to another ``ABSStoreV3`` with an equal client and prefix."""
        if not isinstance(other, ABSStoreV3):
            return False
        return self.client == other.client and self.prefix == other.prefix

    def __setitem__(self, key, value):
        """Validate ``key`` with ``_validate_key``, then store ``value`` under it."""
        self._validate_key(key)
        super().__setitem__(key, value)

    def rmdir(self, path=None):
        """Remove the metadata and data directories for ``path``.

        Also removes the ``.array`` and ``.group`` metadata files for ``path``
        when present. An empty ``path`` removes everything under the prefix.
        """
        if not path:
            # Currently allowing clear to delete everything as in v2
            # If we disallow an empty path then we will need to modify
            # TestABSStoreV3 to have the create_store method use a prefix.
            ABSStore.rmdir(self, "")
            return

        # remove metadata folder
        meta_dir = (meta_root + path).rstrip("/")
        ABSStore.rmdir(self, meta_dir)

        # remove data folder
        data_dir = (data_root + path).rstrip("/")
        ABSStore.rmdir(self, data_dir)

        # remove metadata files (array first, then group)
        sfx = _get_metadata_suffix(self)
        for kind in (".array", ".group"):
            meta_file = meta_dir + kind + sfx
            if meta_file in self:
                del self[meta_file]

    # TODO: adapt the v2 getsize method to work for v3
    #       For now, calling the generic keys-based _getsize
    def getsize(self, path=None):
        """Return the size of ``path`` as computed by ``zarr.storage._getsize``."""
        from zarr.storage import _getsize  # avoid circular import

        return _getsize(self, path)


# Reuse the ABSStore class docstring for the v3 store.
ABSStoreV3.__doc__ = ABSStore.__doc__