diff --git a/broker/src/tunneldigger_broker/eventloop.py b/broker/src/tunneldigger_broker/eventloop.py index d31461a..0c1af21 100644 --- a/broker/src/tunneldigger_broker/eventloop.py +++ b/broker/src/tunneldigger_broker/eventloop.py @@ -54,10 +54,10 @@ def start(self): pollable, file_object = mapping - if event & select.EPOLLIN: + if event & select.EPOLLIN or event & select.EPOLLERR or event & select.EPOLLHUP: + # If there's anything new, we read. If there was an error, the read will + # return that error so we can handle it. pollable.read(file_object) - elif event & select.EPOLLERR or event & select.EPOLLHUP: - pollable.error() except IOError: # IOError get produced by signal even. in version 3.5 this is fixed an the poll retries # TODO: in py3 it's InterruptedError diff --git a/broker/src/tunneldigger_broker/hooks.py b/broker/src/tunneldigger_broker/hooks.py index 7170104..43d5b59 100644 --- a/broker/src/tunneldigger_broker/hooks.py +++ b/broker/src/tunneldigger_broker/hooks.py @@ -50,9 +50,6 @@ def register(self, event_loop): event_loop.register(self, self.process.stderr, select.EPOLLIN) self.event_loop = event_loop - def error(self): - self.close() - def close(self): """ Closes the hook process. @@ -163,9 +160,6 @@ def run_hook(self, name, *args): except OSError as e: logger.error("Error while executing script '%s': %s" % (script, e)) - def error(self): - self.close() - def close(self): os.close(self.sigchld_fd) diff --git a/broker/src/tunneldigger_broker/network.py b/broker/src/tunneldigger_broker/network.py index 2c98503..7225192 100644 --- a/broker/src/tunneldigger_broker/network.py +++ b/broker/src/tunneldigger_broker/network.py @@ -55,8 +55,11 @@ def register(self, event_loop): event_loop.register(self, self.socket, select.EPOLLIN) self.event_loop = event_loop - def error(self): - self.close() + def error(self, direction, e): + """ + `direction` can be "reading" or "writing" + """ + logger.warning("{}: error {} socket: {}".format(self.name, direction, e)) def close(self): """ @@ -106,9 +109,6 @@ def read(timer_self, file_object): if interval is None: timer_self.close() - def error(self): - self.close() - def close(timer_self): self.event_loop.unregister(timer) self.timers.remove(timer_self) @@ -130,6 +130,7 @@ def write(self, address, data): try: self.socket.sendto(data, address) except socket.error: + self.error("writing", e) return def write_message(self, address, msg_type, msg_data=b''): @@ -167,11 +168,7 @@ def read(self, file_object): try: data, address = self.socket.recvfrom(2048) except socket.error as e: - if e.errno == errno.EMSGSIZE: - # silence these, they occur during PMTU probing - pass - else: - logger.warning("{}: error reading from socket: {}".format(self.name, e)) + self.error("reading", e) return msg_type, msg_data = protocol.parse_message(data) diff --git a/broker/src/tunneldigger_broker/tunnel.py b/broker/src/tunneldigger_broker/tunnel.py index 8b32cfd..6c9792c 100644 --- a/broker/src/tunneldigger_broker/tunnel.py +++ b/broker/src/tunneldigger_broker/tunnel.py @@ -239,16 +239,13 @@ def keepalive(self): logger.warning("%s: timed out", self.name) self.close(reason=protocol.ERROR_REASON_TIMEOUT) - def error(self): - # Read from the socket, to "consume" the error (and show it in the log) - self.read(None) - # Here we have a problem. This can indicate permanent connection failure - # (https://github.com/wlanslovenija/tunneldigger/issues/143), or it can - # indicate that we sent a packet that was too big (e.g. the PMTU probe - # reply, see https://github.com/wlanslovenija/tunneldigger/issues/171). - # To distinguish the two, we count how many consecutive errors we see without a proper message in between. - # If that reaches a threshold, we consider this error permanent and close the connection. - # PMTU discovery sends 6 probes, so 10 should be safe as a threshold. + def error(self, direction, e): + if e.errno == errno.EMSGSIZE: + # ignore these, they occur during PMTU probing + return + super(Tunnel, self).error(direction, e) + # This connection is probably dead, but we've seen connections in the wild that occasionally raise + # an error. So only kill the connection if this happens again and again. # We could just rely on the timeout, but when there's a lot of errors it seems better to # kill the connection early rather than waiting for 2 whole minutes. self.error_count += 1 @@ -347,8 +344,9 @@ def message(self, address, msg_type, msg_data, raw_length): # Update keepalive indicator. self.last_alive = time.time() - # Remember that we got a message -- reset error count for transient error tolerance. - self.error_count = 0 + # We got a message, so *something* is working. Reduce error count for transient error tolerance. + if self.error_count > 0: + self.error_count -= 1 if msg_type == protocol.CONTROL_TYPE_ERROR: # Error notification from the remote side.