Permalink
Browse files

several changes, semi-proper EMFILE handling

needs some refactoring, but for now it needs to work like this...
  • Loading branch information...
rep committed Sep 12, 2011
1 parent 9f21c74 commit f10e065f5da0e5240a623789812f4053ce9c2870
Showing with 76 additions and 36 deletions.
  1. +76 −36 evnet/__init__.py
View
@@ -47,6 +47,7 @@ def loop(l=default_loop):
if pyev.version()[1] < '4.00': l.loop()
else: l.start()
except OSError, e:
+ traceback.print_exc()
print 'oserror', e, e.args
def unloop(l=default_loop):
@@ -57,7 +58,8 @@ def unloop(l=default_loop):
if pyev.version()[1] < '4.00': l.unloop()
else: l.stop()
-def connectssl(host, port, cert='cert2.pem'):
+def connectssl(host, port, cert=None):
+ if cert == None: raise EVException('connectssl requires a certificate.')
return ClientConnection((host,port), cert=cert)
def connectplain(host, port):
@@ -70,7 +72,8 @@ def listensock(host='', port=0, backlog_limit=5):
sock.listen(backlog_limit)
return sock
-def listenssl(host='', port=0, backlog_limit=5, cert='cert.pem'):
+def listenssl(host='', port=0, backlog_limit=5, cert=None):
+ if cert == None: raise EVException('listenssl requires a certificate.')
sock = listensock(host, port, backlog_limit)
l = ListenerSSL(sock, cert=cert)
return l
@@ -127,10 +130,16 @@ def _readable(self, watcher, events):
c = self.connclass(sock, addr)
self._event('connection', c, addr)
except IOError as e:
+ if e.errno == errno.EMFILE:
+ logging.warn('Too many open files, suspending watcher for a second.')
+ self.read_watcher.stop()
+ later(1.0, self.restartwatcher)
+ else:
+ traceback.print_exc()
+ self.close()
+ except Exception as e:
+ print 'EXC in new conn readable cb'
traceback.print_exc()
- self.read_watcher.stop()
- self.sock.close()
- self._event('close', self)
def connclass(self, sock, addr):
raise Exception('Override this!')
@@ -141,6 +150,10 @@ def close(self):
self.sock.close()
self._event('close', self)
+ def restartwatcher(self):
+ logging.warn('Restarting watcher.')
+ if not self.read_watcher.active: self.read_watcher.start()
+
class ListenerPlain(Listener):
def connclass(self, sock, addr):
@@ -195,12 +208,6 @@ def __len__(self):
class Connection(EventGen):
def __init__(self, addr, sock=None, cert=None):
EventGen.__init__(self)
- if not sock:
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- else:
- self.sock = sock
- self.sock.setblocking(False)
- #self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.addr = addr
self.cert = cert
@@ -213,13 +220,28 @@ def __init__(self, addr, sock=None, cert=None):
self.readbytes = 0
self.writebytes = 0
- self.write_watcher = pyev.Io(self.sock, pyev.EV_WRITE, default_loop, self._writable)
- self.read_watcher = pyev.Io(self.sock, pyev.EV_READ, default_loop, self._readable)
- self.write_readwatcher = pyev.Io(self.sock, pyev.EV_READ, default_loop, self._writable)
- self.read_writewatcher = pyev.Io(self.sock, pyev.EV_WRITE, default_loop, self._readable)
+ self.sock = sock
+ try:
+ if not self.sock: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ except IOError as e:
+ if e.errno == errno.EMFILE:
+ later(0.1, self._close, 'Too many open files - not opening socket.')
+ else:
+ logging.critical('IOError when creating socket: {0}'.format(errno.errorcode[e.errno]))
+ later(0.1, self._close, 'IOError, {0}.'.format(e))
+ except Exception as e:
+ logging.critical('Exception when creating socket: {0}'.format(e))
+ later(0.1, self._close, 'Exception, {0}.'.format(e))
+ else:
+ self.sock.setblocking(False)
+ #self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ self.write_watcher = pyev.Io(self.sock, pyev.EV_WRITE, default_loop, self._writable)
+ self.read_watcher = pyev.Io(self.sock, pyev.EV_READ, default_loop, self._readable)
+ self.write_readwatcher = pyev.Io(self.sock, pyev.EV_READ, default_loop, self._writable)
+ self.read_writewatcher = pyev.Io(self.sock, pyev.EV_WRITE, default_loop, self._readable)
- #self.ssl_shake()
- self.initiate()
+ #self.ssl_shake()
+ self.initiate()
def initiate(self):
raise EVException('Use subclass of Connection!')
@@ -233,8 +255,11 @@ def _connected(self, watcher=None, events=None):
if serr == 0:
hint(self.sock)
self.ctx = SSL.Context(SSL.SSLv23_METHOD)
- self.ctx.use_privatekey_file(self.cert)
- self.ctx.use_certificate_file(self.cert)
+ try:
+ self.ctx.use_privatekey_file(self.cert)
+ self.ctx.use_certificate_file(self.cert)
+ except SSL.Error, e:
+ return self._close('Error setting up key and certificate. Make sure the files are existing.')
self.ctx.set_verify(SSL.VERIFY_PEER, self.verify_cb)
self.sslsock = SSL.Connection(self.ctx, self.sock)
self.sslsock.setblocking(False)
@@ -274,10 +299,10 @@ def stop(self):
if self._closed:
raise EVException('Already closed.')
- if self.read_watcher.active: self.read_watcher.stop()
- if self.write_watcher.active: self.write_watcher.stop()
- if self.read_writewatcher.active: self.read_writewatcher.stop()
- if self.write_readwatcher.active: self.write_readwatcher.stop()
+ if hasattr(self, 'read_watcher') and self.read_watcher.active: self.read_watcher.stop()
+ if hasattr(self, 'write_watcher') and self.write_watcher.active: self.write_watcher.stop()
+ if hasattr(self, 'read_writewatcher') and self.read_writewatcher.active: self.read_writewatcher.stop()
+ if hasattr(self, 'write_readwatcher') and self.write_readwatcher.active: self.write_readwatcher.stop()
def write(self, data):
if self._closed:
@@ -324,6 +349,12 @@ def _writeloop(self):
self._close(EVException('Connection closed (ZeroReturn).'))
except SSL.Error as e:
self._close(EVException('SSLError {0}'.format(e)))
+ except socket.error as e:
+ if e.errno == errno.EAGAIN:
+ self.write_watcher.start()
+ return
+ else:
+ self._close(EVException('Exception {0}'.format(e)))
except Exception as e:
self._close(EVException('Exception {0}'.format(e)))
else:
@@ -376,7 +407,7 @@ def _close(self, e):
if self.sslsock: self.sslsock.shutdown()
except:
pass
- self.sock.close()
+ if self.sock: self.sock.close()
self._closed = True
self._event('close', e)
@@ -409,22 +440,31 @@ def initiate(self):
class PlainConnection(EventGen):
def __init__(self, addr, sock=None):
EventGen.__init__(self)
- if not sock:
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- else:
- self.sock = sock
- self.sock.setblocking(False)
- #self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.addr = addr
self.buf = bytearray()
self._closed = False
self._writing = False
- self.write_watcher = pyev.Io(self.sock, pyev.EV_WRITE, default_loop, self._writable)
- self.read_watcher = pyev.Io(self.sock, pyev.EV_READ, default_loop, self._readable)
+ self.sock = sock
+ try:
+ if not self.sock: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ except IOError as e:
+ if e.errno == errno.EMFILE:
+ later(0.1, self._close, 'Too many open files - not opening socket.')
+ else:
+ logging.critical('IOError when creating socket: {0}'.format(errno.errorcode[e.errno]))
+ later(0.1, self._close, 'IOError, {0}.'.format(e))
+ except Exception as e:
+ logging.critical('Exception when creating socket: {0}'.format(e))
+ later(0.1, self._close, 'Exception, {0}.'.format(e))
+ else:
+ self.sock.setblocking(False)
+ #self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ self.write_watcher = pyev.Io(self.sock, pyev.EV_WRITE, default_loop, self._writable)
+ self.read_watcher = pyev.Io(self.sock, pyev.EV_READ, default_loop, self._readable)
+ self.initiate()
- self.initiate()
def initiate(self):
raise EVException('Use subclass of Connection!')
@@ -444,8 +484,8 @@ def stop(self):
if self._closed:
raise EVException('Already closed.')
- if self.read_watcher.active: self.read_watcher.stop()
- if self.write_watcher.active: self.write_watcher.stop()
+ if hasattr(self, 'read_watcher') and self.read_watcher.active: self.read_watcher.stop()
+ if hasattr(self, 'write_watcher') and self.write_watcher.active: self.write_watcher.stop()
def write(self, data):
if self._closed:
@@ -515,7 +555,7 @@ def _readable(self, watcher, events):
def _close(self, e):
self.stop()
- self.sock.close()
+ if self.sock: self.sock.close()
self._closed = True
self._event('close', e)

0 comments on commit f10e065

Please sign in to comment.