Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
154 lines (128 sloc) 3.72 KB
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import atexit, time
from select import select
from concurrency import selectable_waiter
from threading import Thread, Lock
class Acceptor:
def __init__(self, sock, handler):
self.sock = sock
self.handler = handler
def fileno(self):
return self.sock.fileno()
def reading(self):
return True
def writing(self):
return False
def timing(self):
return None
def readable(self, selector):
sock, addr = self.sock.accept()
self.handler(sock, selector)
class Selector:
lock = Lock()
DEFAULT = None
@staticmethod
def default():
Selector.lock.acquire()
try:
if Selector.DEFAULT is None:
sel = Selector()
atexit.register(sel.stop)
sel.start()
Selector.DEFAULT = sel
return Selector.DEFAULT
finally:
Selector.lock.release()
def __init__(self):
self.selectables = set()
self.reading = set()
self.writing = set()
self.waiter = selectable_waiter()
self.reading.add(self.waiter)
self.stopped = False
self.thread = None
def wakeup(self):
self.waiter.wakeup()
def register(self, selectable):
self.selectables.add(selectable)
self.modify(selectable)
def _update(self, selectable):
if selectable.reading():
self.reading.add(selectable)
else:
self.reading.discard(selectable)
if selectable.writing():
self.writing.add(selectable)
else:
self.writing.discard(selectable)
return selectable.timing()
def modify(self, selectable):
self._update(selectable)
self.wakeup()
def unregister(self, selectable):
self.reading.discard(selectable)
self.writing.discard(selectable)
self.selectables.discard(selectable)
self.wakeup()
def start(self):
self.stopped = False
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start();
def run(self, idle=None, period=None):
while not self.stopped:
wakeup = None
for sel in self.selectables.copy():
t = self._update(sel)
if t is not None:
if wakeup is None:
wakeup = t
else:
wakeup = min(wakeup, t)
if wakeup is None:
timeout = period
else:
timeout = max(0, wakeup - time.time())
if period:
timeout = min(period, timeout)
rd, wr, ex = select(self.reading, self.writing, (), timeout)
is_idle = True
for sel in wr:
if sel.writing():
sel.writeable(self)
is_idle = False
for sel in rd:
if sel.reading():
sel.readable(self)
is_idle = False
now = time.time()
for sel in self.selectables.copy():
w = sel.timing()
if w is not None and now > w:
sel.timeout(self)
is_idle = False
if is_idle and idle:
idle()
def stop(self, timeout=None):
self.stopped = True
self.wakeup()
if self.thread:
self.thread.join(timeout)
self.thread = None