-
Notifications
You must be signed in to change notification settings - Fork 31
/
concurrent.py
117 lines (98 loc) · 4.28 KB
/
concurrent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import absolute_import
import warnings
from concurrent import futures
def map_future(src, func, *args, **kw):
    """Derive a new future by transforming the outcome of an existing one.

    Args:
        src (concurrent.futures.Future): Future that supplies the input
            value.
        func (callable): Transformation invoked as
            func(result, *args, **kw), where result is the value that src
            produced.
        args: Extra positional arguments forwarded to func.
        kw: Extra keyword arguments forwarded to func.

    Returns:
        A MappedFuture that resolves to func applied to the result of src.
        A failure of src, or an exception raised by func, is propagated to
        the returned future instead of a result.
    """
    mapped = MappedFuture(src, func, *args, **kw)
    return mapped
class MappedFuture(futures.Future):
    """A Future that receives the result or failure of another Future.

    When src completes, its outcome is forwarded to this future:

    * if src was cancelled, this future fails with a CancelledError;
    * if src failed, this future fails with the same exception object;
    * otherwise the result of src (passed through func, if given) becomes
      the result of this future; an exception raised by func becomes this
      future's exception.

    If this future has been cancelled before src completes, the outcome of
    src is discarded.

    Args:
        src (concurrent.futures.Future): Source future whose value this future
            will receive and possibly modify.
        func (callable | None): Function that will be applied to the result of
            src before setting it on this Future. This function will be called
            like func(result, *args, **kw). If None, the result will be passed
            along unchanged.
        args: Positional args to be passed to func.
        kw: Keyword args to be passed to func.
    """

    def __init__(self, src, func, *args, **kw):
        super(MappedFuture, self).__init__()
        # Keep a handle on the source so that cancel() can forward the
        # cancellation request to it.
        self.__src = src

        def handle_result(f):
            # Returns False when this future was cancelled in the meantime;
            # in that case nobody is interested in the outcome of src.
            if not self.set_running_or_notify_cancel():
                return
            # Must be checked before src.exception(), which raises (rather
            # than returns) CancelledError for a cancelled future.
            if src.cancelled():
                self.set_exception(futures.CancelledError())
                return
            error = src.exception()
            if error is not None:
                self.set_exception(error)
                return
            try:
                result = src.result()
                if func is not None:
                    result = func(result, *args, **kw)
                self.set_result(result)
            except Exception as e:
                # Errors raised by func are delivered through this future.
                self.set_exception(e)

        # If src is already done, the callback runs immediately.
        src.add_done_callback(handle_result)

    def cancel(self):
        """Cancel this future and try to cancel the source future as well.

        Returns:
            True if this future is now cancelled, False if it could not be
            cancelled because it is already running or finished. The return
            value always agrees with cancelled(): cancelling the source is
            best-effort, and a source that is already running does not turn
            a successful cancellation of this future into a False result.
        """
        cancelled = super(MappedFuture, self).cancel()
        if cancelled:
            # Best effort: the source may already be running or finished,
            # in which case its eventual outcome is simply discarded.
            self.__src.cancel()
        return cancelled
class MutableFuture(MappedFuture):
    """Backwards compatible subclass of concurrent.futures.Future.

    The biggest difference from standard Futures is that the old labrad Future
    tries to emulate the mutable addCallback interface of twisted Deferreds. To
    emulate this behavior, we keep a list of callback functions which are used
    to transform the result of the future when .result() is called.

    We also provide .wait() as an alias for .result(), since the old Future
    class used the former to get value.

    Queued callbacks are only run by .result() (and hence .wait());
    .exception() never runs them. It reports the failure of the source
    future, or the error raised by a callback during an earlier .result()
    call.

    Note: this class will be removed after users have updated to use the
    standard Future interface with mapping.

    Args:
        source (concurrent.futures.Future): Future whose outcome this future
            receives.
    """

    def __init__(self, source):
        super(MutableFuture, self).__init__(source, None)
        # Pending (func, args, kw) transformations, applied in FIFO order.
        self.__callbacks = []
        # True once the outcome of the underlying future has been cached in
        # __result / __error.
        self.__done = False
        self.__result = None
        self.__error = None

    def addCallback(self, func, *args, **kw):
        """Queue func to transform the result on the next .result() call.

        func will be called like func(result, *args, **kw) and its return
        value replaces the stored result. Deprecated; emits a warning.
        """
        warnings.warn("addCallback is deprecated; use map_future instead.")
        self.__callbacks.append((func, args, kw))

    def wait(self):
        """Alias for the result() method; for backwards compatibility."""
        warnings.warn(".wait() is deprecated; use .result() instead.")
        return self.result()

    def result(self, timeout=None):
        """Return the result after applying all queued callbacks.

        Args:
            timeout: Passed to the underlying Future.result() while the
                outcome has not been cached yet.

        Returns:
            The value of the underlying future, transformed by every callback
            queued so far. Each callback is applied only once; the
            transformed value is remembered for later calls.

        Raises:
            Whatever the underlying Future.result() raises, or the exception
            raised by a callback. A callback error is remembered and raised
            again by subsequent calls; callbacks still queued behind the
            failing one are not run.
        """
        if not self.__done:
            # If this raises, nothing is cached and the next call asks the
            # underlying future again.
            self.__result = super(MutableFuture, self).result(timeout)
            self.__done = True
        if self.__error is not None:
            raise self.__error
        while self.__callbacks:
            func, args, kw = self.__callbacks.pop(0)
            try:
                self.__result = func(self.__result, *args, **kw)
            except Exception as e:
                self.__error = e
                raise
        return self.__result

    def exception(self, timeout=None):
        """Return the exception of this future, or None if it succeeded.

        Args:
            timeout: Passed to the underlying Future.exception() while the
                outcome has not been cached yet.

        Returns:
            The cached error (a failure of the underlying future, or an error
            raised by a callback during an earlier .result() call), or None.
            Queued callbacks are not run by this method.
        """
        if not self.__done:
            error = super(MutableFuture, self).exception(timeout)
            if error is None:
                # The future completed successfully, so this returns at once.
                # The value must be cached together with the done flag;
                # otherwise a later result() call would skip fetching it and
                # hand None to the caller and to the queued callbacks.
                self.__result = super(MutableFuture, self).result(timeout)
            else:
                self.__error = error
            self.__done = True
        return self.__error