-
Notifications
You must be signed in to change notification settings - Fork 7
/
table_proxy.py
390 lines (314 loc) · 11.6 KB
/
table_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
from threading import Lock
import weakref
from dask.base import normalize_token
import numpy as np
import pyrap.tables as pt
from daskms.table_executor import Executor, STANDARD_EXECUTOR
# Module-level logger, named after this module
log = logging.getLogger(__name__)
# Cache of TableProxy instances, keyed by the hash computed in
# TableProxyMetaClass.__call__. Values are held weakly, so an entry
# disappears once nothing else references the proxy.
_table_cache = weakref.WeakValueDictionary()
# Held by TableProxyMetaClass.__call__ while it looks up
# or inserts an entry in _table_cache
_table_lock = Lock()
# CASA Table Locking Modes, as accepted by
# TableProxy._acquire and TableProxy._release
NOLOCK, READLOCK, WRITELOCK = range(3)

# Printable name of each locking mode
_LOCKTYPE_STRINGS = {
    NOLOCK: 'NOLOCK',
    READLOCK: 'READLOCK',
    WRITELOCK: 'WRITELOCK',
}
# CASA Table methods to proxy, each paired with the locking mode
# held while it runs. The groups below are expanded, in order,
# into a flat list of (method, locktype) tuples.
_proxied_methods = [
    (method, locktype)
    for locktype, methods in (
        # Queries
        (READLOCK, ("nrows", "colnames", "getcoldesc", "getdminfo")),
        # Modification
        (WRITELOCK, ("addrows", "addcols")),
        # Reads
        (READLOCK, ("getcol", "getcolnp", "getvarcol", "getcell",
                    "getcellslice", "getkeywords", "getcolkeywords")),
        # Writes
        (WRITELOCK, ("putcol", "putcolnp", "putvarcol",
                     "putcellslice", "putkeywords", "putcolkeywords")),
    )
    for method in methods
]
# Docstring template for the public proxy methods created by
# proxied_method_factory. The single %s is filled with the
# name of the proxied pyrap.tables.table method.
_PROXY_DOCSTRING = ("""
Proxies calls to :func:`~pyrap.tables.table.%s`
via a :class:`~concurrent.futures.ThreadPoolExecutor`
Returns
-------
future : :class:`concurrent.futures.Future`
Future containing the result of the call
""")
def proxied_method_factory(method, locktype):
    """
    Build a proxy for ``pyrap.tables.table.<method>``.

    Two functions are created. A private worker looks ``method`` up
    on the proxy's table and calls it between ``_acquire(locktype)``
    and ``_release(locktype)``. The public function, which is the
    one returned, submits that worker to the proxy's executor.

    Parameters
    ----------
    method : str
        Name of the table method to proxy.
    locktype : {NOLOCK, READLOCK, WRITELOCK}
        Lock held for the duration of the call.

    Returns
    -------
    public_method : callable
        Function with signature ``(self, *args, **kwargs)``
        returning the result of ``self._ex.submit``.
    """
    def _impl(proxy, call_args, call_kwargs):
        proxy._acquire(locktype)

        try:
            table_method = getattr(proxy._table, method)
            return table_method(*call_args, **call_kwargs)
        finally:
            # The lock is released whether or not the call succeeded
            proxy._release(locktype)

    impl_doc = "Calls table.%s, wrapped in a %s." % (
        method, _LOCKTYPE_STRINGS[locktype])
    _impl.__name__ = method + "_impl"
    _impl.__doc__ = impl_doc

    def public_method(self, *args, **kwargs):
        """ Replaced below by the formatted proxy docstring """
        future = self._ex.submit(_impl, self, args, kwargs)
        return future

    public_method.__name__ = method
    public_method.__doc__ = _PROXY_DOCSTRING % method

    return public_method
def _hasher(args):
    """
    Recursively hash a (possibly nested) data structure.

    Containers that Python cannot hash directly are reduced
    to hashable tuples of the hashes of their contents.

    Parameters
    ----------
    args : object
        A tuple, list, set, dict or :class:`numpy.ndarray`,
        or any other hashable object.

    Returns
    -------
    int
        Hash of ``args``.

    Raises
    ------
    TypeError
        If a leaf object is unhashable or if the items
        of a dict cannot be sorted.
    """
    if isinstance(args, set):
        # Iteration order of a set depends on its insertion history,
        # so sort the element hashes to ensure that equal
        # sets always produce the same hash
        return hash(tuple(sorted(_hasher(v) for v in args)))
    elif isinstance(args, (tuple, list)):
        return hash(tuple(_hasher(v) for v in args))
    elif isinstance(args, dict):
        # Sort the items so that key order doesn't influence the hash
        return hash(tuple((k, _hasher(v)) for k, v in sorted(args.items())))
    elif isinstance(args, np.ndarray):
        # NOTE(sjperkins)
        # https://stackoverflow.com/a/16592241/1611416
        # Slowish, but we shouldn't be passing
        # huge numpy arrays in the TableProxy constructor
        # tobytes() supersedes tostring(), which NumPy 2.0 removed
        return hash(args.tobytes())
    else:
        return hash(args)
class TableProxyMetaClass(type):
    """
    Metaclass which adds the proxied table methods to a class
    and hands out one cached instance per set of constructor arguments.

    https://en.wikipedia.org/wiki/Multiton_pattern
    """
    def __new__(cls, name, bases, dct):
        # Install a proxy for every entry of _proxied_methods
        # into the namespace of the class being created
        dct.update((method, proxied_method_factory(method, locktype))
                   for method, locktype in _proxied_methods)

        return type.__new__(cls, name, bases, dct)

    def __call__(cls, *args, **kwargs):
        # The class and its constructor arguments identify the instance
        key = _hasher((cls,) + args + (kwargs,))

        with _table_lock:
            instance = _table_cache.get(key)

            if instance is None:
                # Nothing cached under this key, construct and remember it
                instance = type.__call__(cls, *args, **kwargs)
                _table_cache[key] = instance

            return instance
def proxy_delete_reference(table_proxy, table):
    """
    Create a weak reference to ``table_proxy`` whose callback
    closes ``table``.

    Parameters
    ----------
    table_proxy : object
        Object to reference weakly.
    table : object
        Object whose ``close()`` method is called by the callback.

    Returns
    -------
    ref : :class:`weakref.ref`
        Weak reference to ``table_proxy``.
    """
    # http://pydev.blogspot.com/2015/01/creating-safe-cyclic-reference.html
    # The closure below must only capture table, never table_proxy,
    # otherwise a cyclic reference would be created
    def _callback(ref):
        # The table is closed directly, **not** via the executor, due to
        # reentrancy issues with Python queues and garbage collection
        # https://codewithoutrules.com/2017/08/16/concurrency-python/
        # There could be internal casacore issues here, due to accessing
        # the table from a different thread, but test cases are passing
        table_id = hash(str(table))
        log.debug("Begin closing %s", table_id)

        try:
            table.close()
        except Exception:
            log.exception("Error closing %s", table_id)
            raise
        finally:
            log.debug("Finished closing %s", table_id)

    proxy_ref = weakref.ref(table_proxy, _callback)
    return proxy_ref
def _map_create_proxy(cls, factory, args, kwargs):
    """
    Call ``cls(factory, *args, **kwargs)``.

    Exists so that TableProxy.__reduce__ can pickle keyword
    arguments, which are passed here as a plain dict.
    """
    positional = (factory,) + tuple(args)
    return cls(*positional, **kwargs)
class MismatchedLocks(Exception):
    """ Raised when a table lock is released more often than acquired """
def taql_factory(query, style='Python', tables=()):
    """
    Calls pt.taql, converting TableProxy's in tables to pyrap tables

    A read lock is held on every proxy in ``tables`` while
    the query executes.

    Parameters
    ----------
    query : str
        TAQL query passed to :func:`pyrap.tables.taql`.
    style : str, optional
        Query style passed to :func:`pyrap.tables.taql`.
        Defaults to ``'Python'``.
    tables : iterable of TableProxy, optional
        Proxies whose underlying tables are passed to the query.

    Returns
    -------
    table
        The result of :func:`pyrap.tables.taql`.
    """
    # Materialise once so that one-shot iterables are supported
    proxies = list(tables)
    tabs = [t._table for t in proxies]

    # Only proxies whose lock was actually obtained are released,
    # so a failure part-way through acquisition doesn't leave
    # the earlier tables locked
    acquired = []

    try:
        for t in proxies:
            t._acquire(READLOCK)
            acquired.append(t)

        return pt.taql(query, style=style, tables=tabs)
    finally:
        for t in reversed(acquired):
            t._release(READLOCK)
class TableProxy(object, metaclass=TableProxyMetaClass):
    """
    Proxies calls to a :class:`pyrap.tables.table` object via
    a :class:`concurrent.futures.ThreadPoolExecutor`.
    """

    def __init__(self, factory, *args, **kwargs):
        """
        Parameters
        ----------
        factory : callable
            Function to call which creates the CASA table
        *args : tuple
            Positional arguments passed to factory.
        **kwargs : dict
            Keyword arguments passed to factory.
        __executor_key__ : str, optional
            Executor key. Identifies a unique threadpool
            in which table operations will be performed.

        Raises
        ------
        RuntimeError
            If ``factory`` does not return a :class:`pyrap.tables.table`.
        """

        # Save the arguments as keys for pickling and tokenising
        self._factory = factory
        self._args = args
        self._kwargs = kwargs

        # NOTE(sjperkins)
        # Copy the kwargs and remove (any) __executor_key__
        # This is smelly but we do this to maintain
        # key uniqueness derived from
        # the *args and **kwargs in the MetaClass
        # as well as uniqueness when pickling/unpickling
        # A named keyword is possible but
        # TableProxy(*args, **kwargs)
        # doesn't produce the same unique key as
        # TableProxy(*args, ex_key=..., **kwargs)
        kwargs = kwargs.copy()
        self._ex_key = kwargs.pop("__executor_key__", STANDARD_EXECUTOR)

        # The table is created inside the executor
        ex = Executor(key=self._ex_key)
        table = ex.impl.submit(factory, *args, **kwargs).result()

        if not isinstance(table, pt.table):
            raise RuntimeError("'%s' did not produce a "
                               "CASA table object" % factory)

        # Ensure tables are closed when the object is deleted
        self._del_ref = proxy_delete_reference(self, table)

        # Store a reference to the Executor wrapper class
        # so that the Executor is retained while this TableProxy
        # still lives
        self._ex_wrapper = ex
        # Reference to the internal ThreadPoolExecutor
        self._ex = ex.impl

        # Private, should be inaccessible
        self._table = table

        # Number of outstanding read and write lock requests
        self._readlocks = 0
        self._writelocks = 0
        # True while a write lock is held on the table
        self._write = False
        self._writeable = table.iswritable()

    @property
    def executor_key(self):
        """ Key of the executor on which table operations are performed """
        return self._ex_key

    def __reduce__(self):
        """ Defer to _map_create_proxy to support kwarg pickling """
        return (_map_create_proxy, (TableProxy, self._factory,
                                    self._args, self._kwargs))

    def __enter__(self):
        return self

    def __exit__(self, evalue, etype, etraceback):
        # NOTE: the interpreter passes (type, value, traceback)
        # positionally, so the first two parameter names are swapped.
        # They are retained for backwards compatibility.
        # Nothing is done here: closing the table is left to
        # the weakref callback registered in __init__.
        pass

    def __runner(self, fn, locktype, args, kwargs):
        """ Calls ``fn(table, *args, **kwargs)`` while holding ``locktype`` """
        self._acquire(locktype)

        try:
            return fn(self._table, *args, **kwargs)
        finally:
            self._release(locktype)

    def submit(self, fn, locktype, *args, **kwargs):
        """
        Submits :code:`fn(table, *args, **kwargs)` within
        the executor, returning a Future.

        Parameters
        ----------
        fn : callable
            Function with signature :code:`fn(table, *args, **kwargs)`
        locktype : {NOLOCK, READLOCK, WRITELOCK}
            Type of lock to acquire before and release
            after calling `fn`
        *args :
            Arguments passed to `fn`
        **kwargs :
            Keyword arguments passed to `fn`

        Returns
        -------
        future : :class:`concurrent.futures.Future`
            Future containing the result of :code:`fn(table, *args, **kwargs)`
        """
        return self._ex.submit(self.__runner, fn, locktype, args, kwargs)

    def _acquire(self, locktype):
        """
        Acquire a lock on the table

        Parameters
        ----------
        locktype : {NOLOCK, READLOCK, WRITELOCK}
            Type of lock to acquire.

        Raises
        ------
        ValueError
            If a write lock is requested on a table that is not
            writeable, or if ``locktype`` is not a valid lock type.

        Notes
        -----
        This should **only** be called from within the associated Executor
        """
        if locktype == READLOCK:
            # No locks at all, acquire readlock
            if self._readlocks + self._writelocks == 0:
                self._table.lock(write=False)

            self._readlocks += 1
        elif locktype == WRITELOCK:
            if not self._writeable:
                raise ValueError("Table is not writeable")

            # Acquire writelock if we had none previously
            if self._writelocks == 0:
                self._table.lock(write=True)
                self._write = True

            self._writelocks += 1
        elif locktype == NOLOCK:
            pass
        else:
            # %r rather than %d, which raises a TypeError
            # when given a non-numeric lock type
            raise ValueError("Invalid lock type %r" % (locktype,))

    def _release(self, locktype):
        """
        Release a lock on the table

        Parameters
        ----------
        locktype : {NOLOCK, READLOCK, WRITELOCK}
            Type of lock to release.

        Raises
        ------
        MismatchedLocks
            If no lock of the given type is currently held.
            The lock counters are left unchanged in this case.
        ValueError
            If ``locktype`` is not a valid lock type.

        Notes
        -----
        This should **only** be called from within the associated Executor
        """
        if locktype == READLOCK:
            # Check before decrementing so that a mismatched
            # release doesn't leave a negative counter behind
            if self._readlocks <= 0:
                raise MismatchedLocks("mismatched readlocks")

            self._readlocks -= 1

            if self._readlocks == 0:
                if self._writelocks > 0:
                    # Should be write-locked, check the invariant
                    assert self._write is True
                else:
                    # Release all locks
                    self._table.unlock()
                    self._write = False
        elif locktype == WRITELOCK:
            # Check before decrementing so that a mismatched
            # release doesn't leave a negative counter behind
            if self._writelocks <= 0:
                raise MismatchedLocks("mismatched writelocks")

            self._writelocks -= 1

            if self._writelocks == 0:
                if self._readlocks > 0:
                    # Downgrade from write to read lock if
                    # there are any remaining readlocks
                    self._write = False
                    self._table.lock(write=False)
                else:
                    # Release all locks
                    self._write = False
                    self._table.unlock()
        elif locktype == NOLOCK:
            pass
        else:
            # %r rather than %d, which raises a TypeError
            # when given a non-numeric lock type
            raise ValueError("Invalid lock type %r" % (locktype,))

    def __repr__(self):
        # Some callables, functools.partial objects for example,
        # have no __name__. Fall back to their repr.
        factory_name = getattr(self._factory, "__name__", None)

        if factory_name is None:
            factory_name = repr(self._factory)

        return "TableProxy[%s](%s, %s, %s)" % (
            self._ex_key,
            factory_name,
            ",".join(str(s) for s in self._args),
            ",".join("%s=%s" % (str(k), str(v))
                     for k, v in self._kwargs.items()))

    __str__ = __repr__
@normalize_token.register(TableProxy)
def _normalize_table_proxy_tokens(tp):
    """
    Tokenise a TableProxy by the factory, positional arguments
    and keyword arguments it was constructed with.
    """
    token = (tp._factory, tp._args, tp._kwargs)
    return token