Skip to content

Commit

Permalink
some bugfixes, improve ptls support, logging of things that should no…
Browse files Browse the repository at this point in the history
…t happen
  • Loading branch information
rep committed Oct 8, 2012
1 parent 199fc20 commit 9a705c4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
39 changes: 25 additions & 14 deletions py/pwrcall/gev.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ def __init__(self, sock):
def read(self):
try:
return self.sock.recv(BUFSIZE)
except pwrtls.pwrtls_closed:
return ''
except socket.error:
self.close()
return ''
def write(self, data):
try: return self.sock.send(data)
except pwrtls.pwrtls_closed:
return ''
except socket.error:
self.close()
return 0
Expand All @@ -59,7 +63,7 @@ def _close(self, e):
self._closed = True
self._event('close', e)
def close(self):
self._close(EVException('Connection closed.'))
if not self._closed: self._close(EVException('Connection closed.'))


class Node(rpcnode.Node):
Expand All @@ -77,20 +81,17 @@ def connect(self, host, port):

c = gevent.socket.create_connection((host, port))

rc = RPCConnection(c, (host, port), self)
self._event('connection', rc, (host, port))
return rc
return self._new_conn(c, (host, port))

def connectPTLS(self, host, port):
def connectPTLS(self, host, port, statepath=None):
logging.info('Connecting to, {0}:{1}'.format(host, port))
if not statepath: raise NodeException('PTLS needs statepath!')

c = gevent.socket.create_connection((host, port))
c = pwrtls.wrap_socket(c, **state)
c = pwrtls.wrap_socket(c, **pwrtls.state_file(statepath))
c.do_handshake()

rc = RPCConnection(c, (host, port), self)
self._event('connection', rc, (host, port))
return rc
return self._new_conn(c, (host, port))

def listen(self, host='', port=0, backlog_limit=5):
def handle(sock, addr):
Expand All @@ -114,7 +115,8 @@ def handle(socket, addr):
def _new_conn(self, c, addr):
rc = RPCConnection(c, addr, self)
self._event('connection', rc, addr)
logging.info('New connection from {0}'.format(addr))
logging.info('New connection: {0}'.format(addr))
return rc

def establish(self, url):
try:
Expand Down Expand Up @@ -171,23 +173,29 @@ def ready(self):
self.negotiated = True
logging.info("Connected to remote {0} ({1}).".format(self.peerfp, self.remote_info))
self._event('ready')
gevent.spawn(self.keepalive)
self.alivegreenlet = gevent.spawn(self.keepalive)
self.handlegreenlet = gevent.spawn(self.handle)

def keepalive(self):
while True:
with gevent.Timeout(self.node.timeoutseconds, ConnTimeout) as timeout:
try: r = self.call('%ping', 'ping')
try:
r = self.call('%ping', 'ping')
gevent.sleep(self.node.timeoutseconds-2)
except ConnTimeout:
return self.logclose('Connection timeout.')
except CallException:
# this is normal, as ping does not exist
pass
except NodeException as e:
# maybe connection was already closed
logging.info('Keepalive: {0}'.format(e))
break
except Exception as e:
logging.debug('Exception in keepalive: {0}'.format(e))
traceback.print_exc()
break
gevent.sleep(self.node.timeoutseconds-2)
if self.conn._closed: break

def call(self, ref, method, *params):
self.last_msgid += 1
Expand Down Expand Up @@ -231,6 +239,7 @@ def handle(self):
return self.logclose('Invalid opcode. Dropping connection.')

data = self.conn.read()
return self.logclose('handle finished.')

def handler_exception(self, *args, **kwargs):
print 'handler exception:', args, kwargs
Expand Down Expand Up @@ -279,7 +288,9 @@ def close(self, reason=None):
def closed(self, reason):
self.node._remove_connection(self)
logging.info('Connection closed, {0}'.format(reason))
ne = NodeException('Connection closed, {0}'.format(reason))
for ar in self.out_requests.values():
ar.set_exception(NodeException('Connection closed, {0}'.format(reason)))
ar.set_exception(ne)
#gevent.kill(self.alivegreenlet, exception=ne)
self._event('close', reason)

3 changes: 2 additions & 1 deletion py/pwrcall/rpcnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ def _shutdown_request(self):

def _remove_connection(self, c):
if not self._closing:
self.connections.remove(c)
if c in self.connections: self.connections.remove(c)
else: logging.critical('connection to be removed not in self.connections!')
if c.peerfp:
self.peers.pop(c.peerfp, None)
logging.info('Disconnect by {0}'.format(c.addr))
Expand Down

0 comments on commit 9a705c4

Please sign in to comment.