-
Notifications
You must be signed in to change notification settings - Fork 52
/
eventloop.py
65 lines (49 loc) · 1.7 KB
/
eventloop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import errno
import select
class EventLoop(object):
    """
    An epoll-based event loop.

    Pollables are registered together with a file object (or raw file
    descriptor). ``start`` then waits on the epoll object and dispatches each
    reported event to the matching pollable: ``read(file_object)`` when input
    is available, ``close()`` on error or hang-up.
    """

    def __init__(self):
        """
        Constructs the event loop.
        """
        # Maps file descriptor -> (pollable, object that was passed to register()).
        self.pollables = {}
        self.poller = select.epoll()

    def register(self, pollable, file_object, flags):
        """
        Registers a new pollable into the event loop.

        The original ``file_object`` (not the extracted descriptor) is what is
        later handed to ``pollable.read``.

        :param pollable: Pollable instance
        :param file_object: File-like object or file descriptor
        :param flags: Epoll flags
        """
        if hasattr(file_object, 'fileno'):
            fd = file_object.fileno()
        else:
            fd = file_object

        self.poller.register(fd, flags)
        self.pollables[fd] = (pollable, file_object)

    def unregister(self, fd):
        """
        Unregisters an existing file descriptor.

        For symmetry with ``register``, a file-like object exposing
        ``fileno()`` is accepted as well as a raw descriptor.

        :param fd: File descriptor or file-like object
        """
        if hasattr(fd, 'fileno'):
            fd = fd.fileno()

        self.poller.unregister(fd)
        del self.pollables[fd]

    def start(self):
        """
        Starts the event loop.

        Runs forever. A ``poll()`` call interrupted by a signal (``EINTR``)
        is retried; any other error, including errors raised by a pollable's
        ``read`` or ``close``, propagates to the caller.
        """
        error_mask = select.EPOLLERR | select.EPOLLHUP

        while True:
            try:
                events = self.poller.poll()
            except IOError as exc:
                # A signal arriving during poll() surfaces as EINTR on older
                # interpreters (Python >= 3.5 retries internally). Only that
                # case is safe to ignore; everything else is a real failure.
                if exc.errno != errno.EINTR:
                    raise
                continue

            for fd, event in events:
                mapping = self.pollables.get(fd)
                if mapping is None:
                    # No pollable is registered for this descriptor any more.
                    continue

                pollable, file_object = mapping
                # Input takes priority: when EPOLLIN is reported together with
                # an error/hang-up flag, only read() is called for this event.
                if event & select.EPOLLIN:
                    pollable.read(file_object)
                elif event & error_mask:
                    pollable.close()