Skip to content

Commit

Permalink
Further code coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
veeso committed Oct 26, 2019
1 parent 3f4883a commit 2ec6ddf
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 42 deletions.
47 changes: 5 additions & 42 deletions attila/virtual/atvirtualcommunicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def is_open(self):
:returns bool
"""
return self._device != None
return super().is_open()

def exec(self, command, timeout = None):
"""
Expand All @@ -146,44 +146,7 @@ def exec(self, command, timeout = None):
:returns tuple of (list of string, execution time ms); list: command response without line break; empty lines are ignored
:raises ATSerialPortError
"""
if not self._device:
raise ATSerialPortError("Serial port device is closed")
if not timeout:
timeout = self.default_timeout
t_start = int(time() * 1000)
if self._line_break:
self._device.write(b"%s%s" % (command.encode("utf-8"), self._line_break.encode("utf-8")))
else:
self._device.write(b"%s" % command.encode("utf-8"))
data = ""
#Set timeout to t_start + timeout seconds
t_timeout = t_start + (timeout * 1000)
t_now = t_start
data_still_available = True
sleep_time_based_on_baud = 1000 / self.baud_rate
#Try to read until there are data available and t_now < t_timeout
while t_now < t_timeout and data_still_available:
t_now = int(time() * 1000)
#Read one byte
read_byte = self._device.read()
if not read_byte:
continue
#Sleep for a while in order to give data the time to come
sleep(sleep_time_based_on_baud)
#Check if there are still data available
if self._device.in_waiting > 0:
data_still_available = True
else:
data_still_available = False
data += read_byte.decode("utf-8")
#lines = self._device.readlines()
lines = data.splitlines()
t_end = int(time() * 1000)
for i in range(len(lines)):
#lines[i] = lines[i].decode("utf-8")
#Remove newline
if re.search("(\\r|)\\n$", lines[i]):
lines[i] = re.sub("(\\r|)\\n$", "", lines[i])
#Flush input buffer
self._device.reset_input_buffer()
return (lines, t_end - t_start)
try:
return super().exec(command, timeout)
except ATSerialPortError as err:
raise err
5 changes: 5 additions & 0 deletions tests/test_virtual.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ def test_exceptions(self):
exc = VirtualSerialException(msg)
self.assertIsNotNone(str(exc))
self.assertEqual(repr(exc), msg)
#Open / Close exceptions
com = ATVirtualCommunicator("/dev/virtual", None, 1, read_callback, write_callback, in_waiting)
com.open()
with self.assertRaises(ATSerialPortError):
com.close()


if __name__ == "__main__":
Expand Down

0 comments on commit 2ec6ddf

Please sign in to comment.