-
Notifications
You must be signed in to change notification settings - Fork 1
/
context.py
111 lines (84 loc) · 3.06 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from __future__ import absolute_import, division, print_function
import inspect
from copy import copy
from toolz import curry
from six import wraps
from functools import partial
from collections import defaultdict
from cloudpickle import CloudPickler
from .lazy import Proxy
_globals = defaultdict(lambda: None)
class set_options(object):
"""Set global state within controled context
This lets you specify various global settings in a tightly controlled with
block
Valid keyword arguments currently include:
get - the scheduler to use
pool - a thread or process pool
cache - Cache to use for intermediate results
func_loads/func_dumps - loads/dumps functions for serialization of data
likely to contain functions. Defaults to
cloudpickle.loads/cloudpickle.dumps
rerun_exceptions_locally - rerun failed tasks in master process
Examples
--------
>>> with set_options(get=dask.get): # doctest: +SKIP
... x = np.array(x) # uses dask.get internally
"""
def __init__(self, **kwargs):
self.old = _globals.copy()
_globals.update(kwargs)
def __enter__(self):
return
def __exit__(self, type, value, traceback):
_globals.clear()
_globals.update(self.old)
def getter(name, k, d):
return _globals.get(name, {}).get(k, d)
@curry
def lazyargs(fn):
try:
spec = inspect.getargspec(fn)
func = fn
except TypeError:
spec = inspect.getargspec(fn.func)
func = fn.func
defaults = spec.defaults or tuple()
args = spec.args[-len(defaults):]
params = [partial(getter, name=fn.__name__, k=args[k], d=d)
for k, d in enumerate(defaults)]
# fn = wraps(fn, assigned=('__name__', '__doc__'))
func.__defaults__ = tuple(map(Proxy, params))
return fn
@curry
def options(fn, key=None, data={}):
key = key or fn.__name__
try:
spec = inspect.getargspec(fn)
except TypeError: # curried
spec = inspect.getargspec(fn.func)
defaults = spec.defaults or []
kwds = dict(zip(spec.args[-len(defaults):], defaults))
@wraps(fn, assigned=('__name__', '__doc__'))
def closure(*args, **kwargs):
envs = data or _globals[key] or {} # get values just before calling
if not spec.keywords: # if fn doesn't have kwargs
envs = {k: v for k, v in envs.items() if k in spec.args}
params = copy(kwds)
params.update(envs)
params.update(zip(spec.args[:len(args)], args))
params.update(kwargs)
return fn(**params)
return closure
@curry
def envargs(fn, prefix='', envs={}):
envs = envs or _globals['envs'] or {}
if len(prefix):
prefix += '_' # MARATHON => MARATHON_
envs = {k[len(prefix):].lower(): v
for k, v in envs.items() if k.startswith(prefix)}
return options(fn, data=envs)
def inject_addons(self):
self.save_reduce(lambda opts: set_options(**opts), (_globals,))
# register reducer to auto pickle _globals configuration
CloudPickler.inject_addons = inject_addons