/
eventlet.py
103 lines (77 loc) · 2.4 KB
/
eventlet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import sys
import eventlet
import eventlet.event
import eventlet.queue
from pykka import Actor, Future, Timeout
# Names exported by ``from <this module> import *``: the three eventlet-backed
# classes defined below.
__all__ = ["EventletActor", "EventletEvent", "EventletFuture"]
class EventletEvent(eventlet.event.Event):
    """
    :class:`EventletEvent` adapts :class:`eventlet.event.Event` to
    :class:`threading.Event` interface.
    """

    def set(self):
        """Set the event by calling ``send()`` on it.

        If the event is already ready, it is reset before ``send()`` is
        called again.
        """
        if self.ready():
            self.reset()
        self.send()

    def is_set(self):
        """Return whether the event is currently set, i.e. ``ready()``."""
        return self.ready()

    def clear(self):
        """Reset the event if it is set; do nothing otherwise."""
        if self.ready():
            self.reset()

    def wait(self, timeout=None):
        """Block until the event is set or the timeout expires.

        :param timeout: seconds to wait, or :class:`None` to wait without
            a time limit
        :return: :class:`True` if the wait completed, :class:`False` if
            ``timeout`` expired first
        :raise: any :class:`eventlet.Timeout` that was not created by this
            call is re-raised unchanged
        """
        if timeout is None:
            # This class *is* the event and has no ``event`` attribute, so
            # the untimed wait must use the inherited implementation, just
            # like the timed branch below.
            super().wait()
            return True

        wait_timeout = eventlet.Timeout(timeout)
        try:
            with wait_timeout:
                super().wait()
        except eventlet.Timeout as t:
            # Only swallow our own timer; a foreign Timeout belongs to an
            # enclosing scope and must keep propagating.
            if t is not wait_timeout:
                raise
            return False
        return True
class EventletFuture(Future):
    """
    :class:`EventletFuture` implements :class:`pykka.Future` for use with
    :class:`EventletActor`.
    """

    # Class-level placeholder; every instance gets its own event in __init__.
    event = None

    def __init__(self):
        """Initialise the base future and create the backing eventlet event."""
        super().__init__()
        self.event = eventlet.event.Event()

    def get(self, timeout=None):
        """Return the future's value, waiting for it if necessary.

        The base class ``get()`` is tried first; only if it raises
        :exc:`NotImplementedError` is the eventlet event waited on.

        :param timeout: seconds to wait, or :class:`None` to wait without
            a time limit
        :return: whatever the base ``get()`` or the event's ``wait()`` returns
        :raise: :exc:`pykka.Timeout` if ``timeout`` expires first; any
            :class:`eventlet.Timeout` not created by this call is re-raised
        """
        try:
            return super().get(timeout=timeout)
        except NotImplementedError:
            pass

        if timeout is None:
            return self.event.wait()

        deadline = eventlet.Timeout(timeout)
        try:
            with deadline:
                return self.event.wait()
        except eventlet.Timeout as expired:
            # Translate only our own timer into pykka's Timeout; anything
            # else was armed by an outer scope and is passed through.
            if expired is deadline:
                raise Timeout(expired)
            raise

    def set(self, value=None):
        """Send ``value`` on the backing event."""
        self.event.send(value)

    def set_exception(self, exc_info=None):
        """Send an exception on the backing event.

        :param exc_info: a three-item ``exc_info`` sequence; when omitted
            (or falsy) the result of :func:`sys.exc_info` is used instead
        """
        assert exc_info is None or len(exc_info) == 3
        if not exc_info:
            exc_info = sys.exc_info()
        self.event.send_exception(*exc_info)
class EventletActor(Actor):
    """
    :class:`EventletActor` implements :class:`pykka.Actor` using the `eventlet
    <https://eventlet.net/>`_ library.

    This implementation uses eventlet green threads.
    """

    @staticmethod
    def _create_actor_inbox():
        """Return a new :class:`eventlet.queue.Queue` to use as the inbox."""
        inbox = eventlet.queue.Queue()
        return inbox

    @staticmethod
    def _create_future():
        """Return a new :class:`EventletFuture`."""
        future = EventletFuture()
        return future

    def _start_actor_loop(self):
        """Spawn ``self._actor_loop`` via ``eventlet.greenthread.spawn``."""
        loop = self._actor_loop
        eventlet.greenthread.spawn(loop)