Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions python/sbp/client/drivers/network_drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,23 @@ def _reconnect(self, exc):
continue
break

def _perform_io(self, io_func, validate_data=lambda _data: True):
data = None
while True:
try:
data = io_func()
except socket.timeout as socket_error:
self._reconnect(socket_error)
except socket.error as socket_error:
# this is fine, just retry
if socket_error.errno == errno.EINTR:
continue
self._reconnect(socket_error)
if not validate_data(data):
continue
break
return data

def _read(self, size):
"""
Read wrapper.
Expand All @@ -93,21 +110,16 @@ def _read(self, size):
size : int
Number of bytes to read
"""
data = ''
while True:
try:
data = self.handle.recv(size)
except socket.timeout as socket_error:
self._reconnect(socket_error)
except socket.error as socket_error:
# this is fine, just retry
if socket_error.errno == errno.EINTR:
continue
self._reconnect(IOError)

def read():
return self.handle.recv(size)

def validate_data(data):
if not data:
self._reconnect(IOError)
break
return data
return bool(data)

return self._perform_io(read, validate_data)

def flush(self):
pass
Expand All @@ -121,12 +133,11 @@ def _write(self, s):
s : bytes
Bytes to write
"""
def write():
return self.handle.sendall(s)
try:
self._write_lock.acquire()
self.handle.sendall(s)
except socket.timeout:
self._connect()
except socket.error:
raise IOError
count = self._perform_io(write)
finally:
self._write_lock.release()
return count