Skip to content

Commit

Permalink
Merge pull request #37 from mnot/close
Browse files Browse the repository at this point in the history
Close
  • Loading branch information
mnot committed Sep 30, 2016
2 parents b402f7b + 7e75aa0 commit 930b806
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 83 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ exclude_lines =
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
raise
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ install:
- pip install nose
# command to run tests
script:
nosetests -v --with-coverage --cover-package thor
nosetests -v --with-coverage --cover-package thor test/test_*.py
after_success:
coveralls
26 changes: 13 additions & 13 deletions test/test_loop.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def __init__(self, testcase, loop):
thor.loop.EventSource.__init__(self, loop)
self.testcase = testcase
self.r_fd, self.w_fd = make_fifo('tmp_fifo')
self.on('writable', self.write)
self.register_fd(self.w_fd, 'writable')
self.on('fd_writable', self.write)
self.register_fd(self.w_fd, 'fd_writable')

def write(self):
self.testcase.assertTrue(self._loop.running)
Expand Down Expand Up @@ -136,30 +136,30 @@ def test_EventSource_unregister(self):
self.assertFalse(self.r_fd in list(self.loop._fd_targets))

def test_EventSource_event_del(self):
self.es.register_fd(self.r_fd, 'readable')
self.es.on('readable', self.readable_check)
self.es.event_del('readable')
self.es.register_fd(self.r_fd, 'fd_readable')
self.es.on('fd_readable', self.readable_check)
self.es.event_del('fd_readable')
os.write(self.w_fd, b'foo')
self.loop._run_fd_events()
self.assertFalse('readable' in self.events_seen)
self.assertFalse('fd_readable' in self.events_seen)

def test_EventSource_readable(self):
self.es.register_fd(self.r_fd, 'readable')
self.es.on('readable', self.readable_check)
self.es.register_fd(self.r_fd, 'fd_readable')
self.es.on('fd_readable', self.readable_check)
os.write(self.w_fd, b"foo")
self.loop._run_fd_events()
self.assertTrue('readable' in self.events_seen)
self.assertTrue('fd_readable' in self.events_seen)

def test_EventSource_not_readable(self):
self.es.register_fd(self.r_fd, 'readable')
self.es.on('readable', self.readable_check)
self.es.register_fd(self.r_fd, 'fd_readable')
self.es.on('fd_readable', self.readable_check)
self.loop._run_fd_events()
self.assertFalse('readable' in self.events_seen)
self.assertFalse('fd_readable' in self.events_seen)

def readable_check(self, check=b"foo"):
data = os.read(self.r_fd, 5)
self.assertEqual(data, check)
self.events_seen.append('readable')
self.events_seen.append('fd_readable')

# def test_EventSource_close(self):
# self.es.register_fd(self.fd, 'close')
Expand Down
1 change: 0 additions & 1 deletion thor/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def _attach_conn(self, origin, handle_connect, handle_connect_error, connect_tim
def _release_conn(self, tcp_conn, scheme):
"Add an idle connection back to the pool."
tcp_conn.removeListeners('close')
tcp_conn.on('close', tcp_conn.handle_close)
origin = (scheme, tcp_conn.host, tcp_conn.port)
if tcp_conn.tcp_connected:
def idle_close():
Expand Down
22 changes: 11 additions & 11 deletions thor/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ class PollLoop(LoopBase):
def __init__(self, *args):
# pylint: disable=E1101
self._event_types = {
select.POLLIN: 'readable',
select.POLLOUT: 'writable',
select.POLLERR: 'error',
select.POLLHUP: 'close'}
select.POLLIN: 'fd_readable',
select.POLLOUT: 'fd_writable',
select.POLLERR: 'fd_error',
select.POLLHUP: 'fd_close'}
# select.POLLNVAL - TODO

LoopBase.__init__(self, *args)
Expand Down Expand Up @@ -278,10 +278,10 @@ class EpollLoop(LoopBase):
def __init__(self, *args):
# pylint: disable=E1101
self._event_types = {
select.EPOLLIN: 'readable',
select.EPOLLOUT: 'writable',
select.EPOLLHUP: 'close',
select.EPOLLERR: 'error'
select.EPOLLIN: 'fd_readable',
select.EPOLLOUT: 'fd_writable',
select.EPOLLHUP: 'fd_close',
select.EPOLLERR: 'fd_error'
}
LoopBase.__init__(self, *args)
self._epoll = select.epoll()
Expand Down Expand Up @@ -323,8 +323,8 @@ class KqueueLoop(LoopBase):
"""
def __init__(self, *args):
self._event_types = {
select.KQ_FILTER_READ: 'readable',
select.KQ_FILTER_WRITE: 'writable'}
select.KQ_FILTER_READ: 'fd_readable',
select.KQ_FILTER_WRITE: 'fd_writable'}
LoopBase.__init__(self, *args)
self.max_ev = 50 # maximum number of events to pull from the queue
self._kq = select.kqueue()
Expand Down Expand Up @@ -364,7 +364,7 @@ def _run_fd_events(self):
for event_type in event_types:
self._fd_event(event_type, int(e.ident))
if e.flags & select.KQ_EV_EOF:
self._fd_event('close', int(e.ident))
self._fd_event('fd_close', int(e.ident))
if e.flags & select.KQ_EV_ERROR:
pass
# TODO: pull errors, etc. out of flags and fflags
Expand Down
85 changes: 37 additions & 48 deletions thor/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,11 @@ class TcpConnection(EventSource):
write_bufsize = 16
read_bufsize = 1024 * 16

_block_errs = set([
errno.EAGAIN, errno.EWOULDBLOCK, errno.ETIMEDOUT
])
_block_errs = set([errno.EAGAIN, errno.EWOULDBLOCK, errno.ETIMEDOUT])
_close_errs = set([
errno.EBADF, errno.ECONNRESET, errno.ESHUTDOWN,
errno.ECONNABORTED, errno.ECONNREFUSED,
errno.ENOTCONN, errno.EPIPE
])
errno.ENOTCONN, errno.EPIPE])

def __init__(self, sock, host, port, loop=None):
EventSource.__init__(self, loop)
Expand All @@ -106,9 +103,9 @@ def __init__(self, sock, host, port, loop=None):
self._write_buffer = []

self.register_fd(sock.fileno())
self.on('readable', self.handle_read)
self.on('writable', self.handle_write)
self.on('close', self.handle_close)
self.on('fd_readable', self.handle_read)
self.on('fd_writable', self.handle_write)
self.on('fd_close', self._handle_close)

def __repr__(self):
status = [self.__class__.__module__ + "." + self.__class__.__name__]
Expand All @@ -127,23 +124,20 @@ def __repr__(self):
def handle_read(self):
"The connection has data read for reading"
try:
# TODO: look into recv_into (but see python issue7827)
data = self.socket.recv(self.read_bufsize)
except (socket.error, OSError) as why:
if why.args[0] in self._block_errs:
return
elif why.args[0] in self._close_errs:
self.emit('close')
self._handle_close()
return
else:
raise
if data == b"":
self.emit('close')
self._handle_close()
else:
self.emit('data', data)

# TODO: try using buffer; see
# http://itamarst.org/writings/pycon05/fast.html
def handle_write(self):
"The connection is ready for writing; write any buffered data."
if len(self._write_buffer) > 0:
Expand All @@ -154,50 +148,39 @@ def handle_write(self):
if why.args[0] in self._block_errs:
return
elif why.args[0] in self._close_errs:
self.emit('close')
self._handle_close()
return
else:
raise
if sent < len(data):
self._write_buffer = [data[sent:]]
else:
self._write_buffer = []
if self._output_paused and \
len(self._write_buffer) < self.write_bufsize:
if self._output_paused and len(self._write_buffer) < self.write_bufsize:
self._output_paused = False
self.emit('pause', False)
if self._closing:
self.close()
self._close()
if len(self._write_buffer) == 0:
self.event_del('writable')

def handle_close(self):
"""
The connection has been closed by the other side.
"""
self.tcp_connected = False
# TODO: make sure removing close doesn't cause problems.
self.removeListeners('readable', 'writable', 'close')
self.unregister_fd()
self.socket.close()
self.event_del('fd_writable')

def write(self, data):
"Write data to the connection."
self._write_buffer.append(data)
if len(self._write_buffer) > self.write_bufsize:
self._output_paused = True
self.emit('pause', True)
self.event_add('writable')
self.event_add('fd_writable')

def pause(self, paused):
"""
Temporarily stop/start reading from the connection and pushing
it to the app.
"""
if paused:
self.event_del('readable')
self.event_del('fd_readable')
else:
self.event_add('readable')
self.event_add('fd_readable')
self._input_paused = paused

def close(self):
Expand All @@ -206,10 +189,22 @@ def close(self):
if len(self._write_buffer) > 0:
self._closing = True
else:
self.handle_close()

self._close()
# TODO: should loop stop automatically close all conns?

def _handle_close(self):
"The connection has been closed by the other side."
self._close()
self.emit('close')

def _close(self):
self.tcp_connected = False
self.removeListeners('fd_readable', 'fd_writable', 'fd_close')
self.unregister_fd()
if self.socket:
self.socket.close()


class TcpServer(EventSource):
"""
An asynchronous TCP server.
Expand All @@ -229,8 +224,8 @@ def __init__(self, host, port, sock=None, loop=None):
self.host = host
self.port = port
self.sock = sock or server_listen(host, port)
self.on('readable', self.handle_accept)
self.register_fd(self.sock.fileno(), 'readable')
self.on('fd_readable', self.handle_accept)
self.register_fd(self.sock.fileno(), 'fd_readable')
schedule(0, self.emit, 'start')

def handle_accept(self):
Expand All @@ -248,7 +243,7 @@ def handle_accept(self):

def shutdown(self):
"Stop accepting requests and close the listening socket."
self.removeListeners('readable')
self.removeListeners('fd_readable')
self.sock.close()
self.emit('stop')
# TODO: emit close?
Expand Down Expand Up @@ -292,12 +287,11 @@ def __init__(self, loop=None):
self.port = None
self._timeout_ev = None
self._error_sent = False
# TODO: IPV6
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setblocking(False)
self.on('error', self.handle_conn_error)
self.register_fd(self.sock.fileno(), 'writable')
self.event_add('error')
self.on('fd_error', self.handle_conn_error)
self.register_fd(self.sock.fileno(), 'fd_writable')
self.event_add('fd_error')

def connect(self, host, port, connect_timeout=None):
"""
Expand All @@ -307,7 +301,7 @@ def connect(self, host, port, connect_timeout=None):
"""
self.host = host
self.port = port
self.on('writable', self.handle_connect)
self.on('fd_writable', self.handle_connect)
# TODO: use socket.getaddrinfo(); needs to be non-blocking.
try:
err = self.sock.connect_ex((host, port))
Expand All @@ -326,8 +320,7 @@ def connect(self, host, port, connect_timeout=None):
self.handle_conn_error,
socket.error,
socket.error(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT)),
True
)
True)

def handle_connect(self):
self.unregister_fd()
Expand All @@ -339,9 +332,7 @@ def handle_connect(self):
if err:
self.handle_conn_error(socket.error, socket.error(err, os.strerror(err)))
else:
tcp_conn = TcpConnection(
self.sock, self.host, self.port, self._loop
)
tcp_conn = TcpConnection(self.sock, self.host, self.port, self._loop)
self.emit('connect', tcp_conn)

def handle_conn_error(self, err_type=None, why=None, close=True):
Expand Down Expand Up @@ -388,5 +379,3 @@ def echo(chunk):
conn.on('data', echo)
server.on('connect', handle_conn)
run()


10 changes: 5 additions & 5 deletions thor/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ def __init__(self, loop=None):
def handshake(self):
try:
self.sock.do_handshake()
self.once('writable', self.handle_connect)
self.once('fd_writable', self.handle_connect)
except sys_ssl.SSLError as why:
if isinstance(why, sys_ssl.SSLWantReadError):
# if why == sys_ssl.SSL_ERROR_WANT_READ:
# self.once('readable', self.handshake)
self.once('writable', self.handshake) # Oh, Linux...
# self.once('fd_readable', self.handshake)
self.once('fd_writable', self.handshake) # Oh, Linux...
# elif why == sys_ssl.SSL_ERROR_WANT_WRITE:
elif isinstance(why, sys_ssl.SSLWantWriteError):
self.once('writable', self.handshake)
self.once('fd_writable', self.handshake)
else:
self.handle_conn_error(sys_ssl.SSLError, why)
except socket.error as why:
Expand All @@ -79,7 +79,7 @@ def connect(self, host, port, connect_timeout=None):
"""
self.host = host
self.port = port
self.once('writable', self.handshake)
self.once('fd_writable', self.handshake)
# FIXME: CAs
if self.tls_context:
self.sock = self.tls_context.wrap_socket(
Expand Down
8 changes: 4 additions & 4 deletions thor/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, loop=None):
self.max_dgram = min((2**16 - 40), self.sock.getsockopt(
socket.SOL_SOCKET, socket.SO_SNDBUF
))
self.on('readable', self.handle_datagram)
self.on('fd_readable', self.handle_datagram)
self.register_fd(self.sock.fileno())

def __repr__(self):
Expand All @@ -59,16 +59,16 @@ def bind(self, host, port):

def shutdown(self):
"Close the listening socket."
self.removeListeners('readable')
self.removeListeners('fd_readable')
self.sock.close()
# TODO: emit close?

def pause(self, paused):
"Control incoming datagram events."
if paused:
self.event_del('readable')
self.event_del('fd_readable')
else:
self.event_add('readable')
self.event_add('fd_readable')

def send(self, datagram, host, port):
"send datagram to host:port."
Expand Down

0 comments on commit 930b806

Please sign in to comment.