From faaf9b3d02bb247fffd56fa1bf6a4e17a5b6c254 Mon Sep 17 00:00:00 2001 From: Kris Lambrechts Date: Thu, 27 Aug 2020 07:06:06 +0200 Subject: [PATCH] Re-implemented the TcpHub - Set connections to be non-blocking - Replaced the socket2hostintf dictionary with attributes on a subclass of socket.socket() - Re-connect now closes the old socket and creates a new one --- vr-xcon/xcon.py | 109 ++++++++++++++++++++++-------------------------- 1 file changed, 51 insertions(+), 58 deletions(-) diff --git a/vr-xcon/xcon.py b/vr-xcon/xcon.py index 8cc94032..cc93df67 100755 --- a/vr-xcon/xcon.py +++ b/vr-xcon/xcon.py @@ -7,6 +7,7 @@ import select import signal import socket +import _socket import struct import subprocess import sys @@ -24,6 +25,28 @@ def handle_SIGTERM(signal, frame): signal.signal(signal.SIGCHLD, handle_SIGCHLD) +class EndpointSocket(socket.socket): + def __init__(self, endpoint, *args, **kwargs): + super(EndpointSocket, self).__init__(*args, **kwargs) + self.endpoint = endpoint + + @classmethod + def copy(cls, sock): + fd = _socket.dup(sock.fileno()) + copy = cls(sock.family, sock.type, sock.proto, fileno=fd) + copy.settimeout(sock.gettimeout()) + return copy + + @property + def address(self): + hostname, interface = self.endpoint.split("/") + try: + res = socket.getaddrinfo(hostname, "100%02d" % int(interface), socket.AF_INET) + except socket.gaierror: + raise NoVR("Unable to resolve %s" % hostname) + sockaddr = res[0][4] + return sockaddr + class Tcp2Raw: def __init__(self, raw_intf = 'eth1', listen_port=10001): @@ -316,90 +339,60 @@ class TcpHub: def __init__(self): self.logger = logging.getLogger() self.sockets = [] - self.socket2hostintf = {} - - def ep2addr(self, hostintf): - """ Return address based on endpoint - """ - hostname, interface = hostintf.split("/") - - try: - res = socket.getaddrinfo(hostname, "100%02d" % int(interface), socket.AF_INET) - except socket.gaierror: - raise NoVR("Unable to resolve %s" % hostname) - sockaddr = res[0][4] - return sockaddr - - - def add_ep(self, ep): - host, interface = ep.split("/") - - remote = self.ep2addr(ep) - - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - # dict to map back to hostname & interface - self.socket2hostintf[s] = "%s/%s" % (host, interface) + def add_ep(self, endpoint): + s = EndpointSocket(endpoint) try: - s.connect(remote) + s.connect(s.address) except: - self.logger.info("Unable to connect to %s" % self.socket2hostintf[remote]) - - # add to list of sockets + self.logger.info("unable to connect to %s" % s.endpoint) + s.setblocking(False) self.sockets.append(s) + def re_connect(self, s): + endpoint = s.endpoint + self.logger.info("re-connecting to %s ..." % endpoint) + s.close() + + # cannot remove socket from the list normally as its __repr__() changes + # when the socket is closed + for i, t in enumerate(self.sockets): + if t.endpoint == endpoint: + del self.sockets[i] + break + + self.add_ep(endpoint) def work(self): while True: try: ir,_,_ = select.select(self.sockets, [], []) except select.error as exc: + self.logger.critical("select failed %s" % str(exc)) break for i in ir: - try: buf = i.recv(2048) - except ConnectionResetError as exc: - self.logger.warning("connection dropped, reconnecting to source %s" % self.socket2hostintf[i]) - try: - i.connect(self.ep2addr(self.socket2hostintf[i])) - self.logger.debug("reconnect to %s successful" % self.socket2hostintf[i]) - except Exception as exc: - self.logger.warning("reconnect failed %s" % str(exc)) - continue - except OSError as exc: - self.logger.warning("endpoint not connected, connecting to source %s" % self.socket2hostintf[i]) - try: - i.connect(self.ep2addr(self.socket2hostintf[i])) - self.logger.debug("connect to %s successful" % self.socket2hostintf[i]) - except: - self.logger.warning("connect failed %s" % str(exc)) - continue - + except (ConnectionError, OSError): + self.logger.warning("connection error while reading from %s" % i.endpoint) + self.re_connect(i) if len(buf) == 0: - return + self.logger.warning("empty buffer for %s" % i.endpoint) + self.re_connect(i) # send to all other sockets for remote in self.sockets: - self.logger.debug("%05d bytes %s -> %s " % (len(buf), self.socket2hostintf[i], self.socket2hostintf[remote])) # don't need to send to ourselves though if i is remote: continue + self.logger.debug("%05d bytes %s -> %s " % (len(buf), i.endpoint, remote.endpoint)) try: remote.send(buf) - except BrokenPipeError: - self.logger.warning("unable to send packet %05d bytes %s -> %s due to remote being down, trying reconnect" % (len(buf), self.socket2hostintf[i], self.socket2hostintf[remote])) - try: - remote.connect(self.ep2addr(self.socket2hostintf[remote])) - self.logger.debug("connect to %s successful" % self.socket2hostintf[remote]) - except Exception as exc: - self.logger.warning("connect failed %s" % str(exc)) - continue - - + except (ConnectionError, OSError): + self.logger.warning("unable to send packet %05d bytes %s -> %s due to remote being down" % (len(buf), i.endpoint, remote.endpoint)) + self.re_connect(remote) class NoVR(Exception):