-
Notifications
You must be signed in to change notification settings - Fork 336
/
mapping.py
153 lines (129 loc) · 4.48 KB
/
mapping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from collections.abc import MutableMapping
from .registry import get_filesystem_class
from .core import split_protocol
class FSMap(MutableMapping):
    """Wrap a FileSystem instance as a mutable mapping.

    The keys of the mapping become files under the given root, and the
    values (which must be bytes) the contents of those files.

    Parameters
    ----------
    root: string
        prefix for all the files
    fs: FileSystem instance
    check: bool (=False)
        if True, require that the root exists and probe for write access
        by touching and then removing a file named ``a`` under the root.
    create: bool (=False)
        if True, make the root directory when it does not exist yet.

    Examples
    --------
    >>> fs = FileSystem(**parameters) # doctest: +SKIP
    >>> d = FSMap('my-data/path/', fs) # doctest: +SKIP

    or, more likely

    >>> d = fs.get_mapper('my-data/path/') # doctest: +SKIP
    >>> d['loc1'] = b'Hello World' # doctest: +SKIP
    >>> list(d.keys()) # doctest: +SKIP
    ['loc1']
    >>> d['loc1'] # doctest: +SKIP
    b'Hello World'
    """

    # Sentinel that lets ``pop`` tell "no default supplied" apart from an
    # explicit ``default=None``.
    _NO_DEFAULT = object()

    def __init__(self, root, fs, check=False, create=False):
        self.fs = fs
        # Trailing slashes are dropped because _key_to_str joins on "/".
        self.root = fs._strip_protocol(root).rstrip("/")
        if create and not self.fs.exists(root):
            self.fs.mkdir(root)
        if check:
            if not self.fs.exists(root):
                raise ValueError(
                    "Path %s does not exist. Create "
                    " with the ``create=True`` keyword" % root
                )
            # Write probe: create and immediately remove a scratch file.
            self.fs.touch(root + "/a")
            self.fs.rm(root + "/a")

    def clear(self):
        """Remove all keys below root - empties out mapping

        This is best-effort: a failure of the backend to remove or to
        re-create the root is ignored.
        """
        try:
            self.fs.rm(self.root, True)
            self.fs.mkdir(self.root)
        except Exception:
            # Deliberately swallowed, but KeyboardInterrupt/SystemExit
            # are allowed to propagate (unlike a bare ``except:``).
            pass

    def _key_to_str(self, key):
        """Generate full path for the key

        Tuples and lists are rendered as the ``str`` of the equivalent
        tuple; every other key is passed through ``str``.
        """
        if isinstance(key, (tuple, list)):
            key = str(tuple(key))
        else:
            key = str(key)
        return "/".join([self.root, key]) if self.root else key

    def _str_to_key(self, s):
        """Strip the root prefix off a path to leave the key name"""
        return s[len(self.root):].lstrip("/")

    def __getitem__(self, key, default=None):
        """Retrieve data

        Returns the bytes stored under ``key``. If reading fails,
        ``default`` is returned when it is not None; otherwise KeyError
        is raised carrying the key as given by the caller.
        """
        path = self._key_to_str(key)
        try:
            return self.fs.cat(path)
        except Exception as exc:
            if default is not None:
                return default
            raise KeyError(key) from exc

    def pop(self, key, default=_NO_DEFAULT):
        """Remove ``key`` and return its value

        If the key cannot be read, ``default`` is returned when one was
        supplied (including an explicit ``None``); otherwise KeyError is
        raised. Nothing is deleted when the read fails.
        """
        try:
            result = self[key]
        except KeyError:
            if default is self._NO_DEFAULT:
                raise
            return default
        try:
            del self[key]
        except KeyError:
            # The value was read successfully; a failed removal does not
            # invalidate the result being returned.
            pass
        return result

    def __setitem__(self, key, value):
        """Store value in key"""
        path = self._key_to_str(key)
        # Make sure the containing directory exists before writing.
        self.fs.mkdirs(self.fs._parent(path), exist_ok=True)
        with self.fs.open(path, "wb") as f:
            f.write(value)

    def __iter__(self):
        """Iterate over the keys, i.e. paths found below root made relative"""
        return (self._str_to_key(x) for x in self.fs.find(self.root))

    def __len__(self):
        """Number of entries found below root"""
        return len(self.fs.find(self.root))

    def __delitem__(self, key):
        """Remove key

        Raises KeyError (carrying ``key``) if the backend fails to remove
        the corresponding path.
        """
        try:
            self.fs.rm(self._key_to_str(key))
        except Exception as exc:
            raise KeyError(key) from exc

    def __contains__(self, key):
        """Does key exist in mapping?"""
        path = self._key_to_str(key)
        return self.fs.exists(path) and self.fs.isfile(path)

    def __getstate__(self):
        """Mapping should be pickleable"""
        # TODO: replace with reduce to reinstantiate?
        return self.fs, self.root

    def __setstate__(self, state):
        """Restore the filesystem and root saved by ``__getstate__``"""
        fs, root = state
        self.fs = fs
        self.root = root
def get_mapper(url, check=False, create=False, **kwargs):
    """Build a dict-like key-value view onto the location named by a URL

    The URL has the form "protocol://location" and names the root of the
    mapping. Keys are file names below that root, and each value is the
    content of the corresponding file.

    Parameters
    ----------
    url: str
        Root URL of mapping
    check: bool
        Forwarded to ``FSMap``: whether to verify, before returning, that
        the root exists and accepts writes
    create: bool
        Forwarded to ``FSMap``: whether to make the root directory if it
        does not exist yet
    **kwargs:
        Passed to the constructor of the filesystem class selected by the
        URL's protocol

    Returns
    -------
    ``FSMap`` instance, the dict-like key-value store.
    """
    # Only the protocol part is needed here, to select the backend class.
    protocol, _ = split_protocol(url)
    filesystem = get_filesystem_class(protocol)(**kwargs)
    # The complete URL is handed over; FSMap strips the protocol itself.
    return FSMap(url, filesystem, check, create)