Skip to content

Commit

Permalink
Re-implemented the TcpHub
Browse files Browse the repository at this point in the history
- Set connections to be non-blocking
- Replaced the socket2hostintf dictionary with attributes on a subclass of socket.socket()
- Re-connect now closes the old socket and creates a new one
  • Loading branch information
klambrec committed Aug 27, 2020
1 parent 35188a4 commit faaf9b3
Showing 1 changed file with 51 additions and 58 deletions.
109 changes: 51 additions & 58 deletions vr-xcon/xcon.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import select
import signal
import socket
import _socket
import struct
import subprocess
import sys
Expand All @@ -24,6 +25,28 @@ def handle_SIGTERM(signal, frame):
signal.signal(signal.SIGCHLD, handle_SIGCHLD)


class EndpointSocket(socket.socket):
def __init__(self, endpoint, *args, **kwargs):
super(EndpointSocket, self).__init__(*args, **kwargs)
self.endpoint = endpoint

@classmethod
def copy(cls, sock):
fd = _socket.dup(sock.fileno())
copy = cls(sock.family, sock.type, sock.proto, fileno=fd)
copy.settimeout(sock.gettimeout())
return copy

@property
def address(self):
hostname, interface = self.endpoint.split("/")
try:
res = socket.getaddrinfo(hostname, "100%02d" % int(interface), socket.AF_INET)
except socket.gaierror:
raise NoVR("Unable to resolve %s" % hostname)
sockaddr = res[0][4]
return sockaddr


class Tcp2Raw:
def __init__(self, raw_intf = 'eth1', listen_port=10001):
Expand Down Expand Up @@ -316,90 +339,60 @@ class TcpHub:
def __init__(self):
self.logger = logging.getLogger()
self.sockets = []
self.socket2hostintf = {}

def ep2addr(self, hostintf):
""" Return address based on endpoint
"""
hostname, interface = hostintf.split("/")

try:
res = socket.getaddrinfo(hostname, "100%02d" % int(interface), socket.AF_INET)
except socket.gaierror:
raise NoVR("Unable to resolve %s" % hostname)
sockaddr = res[0][4]
return sockaddr


def add_ep(self, ep):
host, interface = ep.split("/")

remote = self.ep2addr(ep)

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# dict to map back to hostname & interface
self.socket2hostintf[s] = "%s/%s" % (host, interface)

def add_ep(self, endpoint):
s = EndpointSocket(endpoint)
try:
s.connect(remote)
s.connect(s.address)
except:
self.logger.info("Unable to connect to %s" % self.socket2hostintf[remote])

# add to list of sockets
self.logger.info("unable to connect to %s" % s.endpoint)
s.setblocking(False)
self.sockets.append(s)

def re_connect(self, s):
endpoint = s.endpoint
self.logger.info("re-connecting to %s ..." % endpoint)
s.close()

# cannot remove socket from the list normally as its __repr__() changes
# when the socket is closed
for i, t in enumerate(self.sockets):
if t.endpoint == endpoint:
del self.sockets[i]
break

self.add_ep(endpoint)

def work(self):
while True:
try:
ir,_,_ = select.select(self.sockets, [], [])
except select.error as exc:
self.logger.critical("select failed %s" % str(exc))
break

for i in ir:

try:
buf = i.recv(2048)
except ConnectionResetError as exc:
self.logger.warning("connection dropped, reconnecting to source %s" % self.socket2hostintf[i])
try:
i.connect(self.ep2addr(self.socket2hostintf[i]))
self.logger.debug("reconnect to %s successful" % self.socket2hostintf[i])
except Exception as exc:
self.logger.warning("reconnect failed %s" % str(exc))
continue
except OSError as exc:
self.logger.warning("endpoint not connected, connecting to source %s" % self.socket2hostintf[i])
try:
i.connect(self.ep2addr(self.socket2hostintf[i]))
self.logger.debug("connect to %s successful" % self.socket2hostintf[i])
except:
self.logger.warning("connect failed %s" % str(exc))
continue

except (ConnectionError, OSError):
self.logger.warning("connection error while reading from %s" % i.endpoint)
self.re_connect(i)
if len(buf) == 0:
return
self.logger.warning("empty buffer for %s" % i.endpoint)
self.re_connect(i)

# send to all other sockets
for remote in self.sockets:
self.logger.debug("%05d bytes %s -> %s " % (len(buf), self.socket2hostintf[i], self.socket2hostintf[remote]))
# don't need to send to ourselves though
if i is remote:
continue

self.logger.debug("%05d bytes %s -> %s " % (len(buf), i.endpoint, remote.endpoint))
try:
remote.send(buf)
except BrokenPipeError:
self.logger.warning("unable to send packet %05d bytes %s -> %s due to remote being down, trying reconnect" % (len(buf), self.socket2hostintf[i], self.socket2hostintf[remote]))
try:
remote.connect(self.ep2addr(self.socket2hostintf[remote]))
self.logger.debug("connect to %s successful" % self.socket2hostintf[remote])
except Exception as exc:
self.logger.warning("connect failed %s" % str(exc))
continue


except (ConnectionError, OSError):
self.logger.warning("unable to send packet %05d bytes %s -> %s due to remote being down" % (len(buf), i.endpoint, remote.endpoint))
self.re_connect(remote)


class NoVR(Exception):
Expand Down

0 comments on commit faaf9b3

Please sign in to comment.