/
_io_kqueue.py
133 lines (114 loc) · 4.33 KB
/
_io_kqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import select
from contextlib import contextmanager
import attr
from .. import _core
from . import _public
@attr.s(frozen=True)
class _KqueueStatistics:
tasks_waiting = attr.ib()
monitors = attr.ib()
backend = attr.ib(default="kqueue")
@attr.s(slots=True, cmp=False, hash=False)
class KqueueIOManager:
_kqueue = attr.ib(default=attr.Factory(select.kqueue))
# {(ident, filter): Task or UnboundedQueue}
_registered = attr.ib(default=attr.Factory(dict))
def statistics(self):
tasks_waiting = 0
monitors = 0
for receiver in self._registered.values():
if type(receiver) is _core.Task:
tasks_waiting += 1
else:
monitors += 1
return _KqueueStatistics(
tasks_waiting=tasks_waiting,
monitors=monitors,
)
def close(self):
self._kqueue.close()
def handle_io(self, timeout):
# max_events must be > 0 or kqueue gets cranky
# and we generally want this to be strictly larger than the actual
# number of events we get, so that we can tell that we've gotten
# all the events in just 1 call.
max_events = len(self._registered) + 1
events = []
while True:
batch = self._kqueue.control([], max_events, timeout)
events += batch
if len(batch) < max_events:
break
else:
timeout = 0
# and loop back to the start
for event in events:
key = (event.ident, event.filter)
receiver = self._registered[key]
if event.flags & select.KQ_EV_ONESHOT:
del self._registered[key]
if type(receiver) is _core.Task:
_core.reschedule(receiver, _core.Value(event))
else:
receiver.put_nowait(event)
# kevent registration is complicated -- e.g. aio submission can
# implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
# automatically register filters for child processes. So our lowlevel
# API is *very* low-level: we expose the kqueue itself for adding
# events or sticking into AIO submission structs, and split waiting
# off into separate methods. It's your responsibility to make sure
# that handle_io never receives an event without a corresponding
# registration! This may be challenging if you want to be careful
# about e.g. KeyboardInterrupt. Possibly this API could be improved to
# be more ergonomic...
@_public
def current_kqueue(self):
return self._kqueue
@_public
@contextmanager
def monitor_kevent(self, ident, filter):
key = (ident, filter)
if key in self._registered:
raise _core.ResourceBusyError(
"attempt to register multiple listeners for same "
"ident/filter pair"
)
q = _core.UnboundedQueue()
self._registered[key] = q
try:
yield q
finally:
del self._registered[key]
@_public
async def wait_kevent(self, ident, filter, abort_func):
key = (ident, filter)
if key in self._registered:
await _core.checkpoint()
raise _core.ResourceBusyError(
"attempt to register multiple listeners for same "
"ident/filter pair"
)
self._registered[key] = _core.current_task()
def abort(raise_cancel):
r = abort_func(raise_cancel)
if r is _core.Abort.SUCCEEDED:
del self._registered[key]
return r
return await _core.wait_task_rescheduled(abort)
async def _wait_common(self, fd, filter):
if not isinstance(fd, int):
fd = fd.fileno()
flags = select.KQ_EV_ADD | select.KQ_EV_ONESHOT
event = select.kevent(fd, filter, flags)
self._kqueue.control([event], 0)
def abort(_):
event = select.kevent(fd, filter, select.KQ_EV_DELETE)
self._kqueue.control([event], 0)
return _core.Abort.SUCCEEDED
await self.wait_kevent(fd, filter, abort)
@_public
async def wait_readable(self, fd):
await self._wait_common(fd, select.KQ_FILTER_READ)
@_public
async def wait_writable(self, fd):
await self._wait_common(fd, select.KQ_FILTER_WRITE)