Skip to content

Commit

Permalink
device_emulator: Add encoding argument to serial relay
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianJKoopman committed Jun 8, 2022
1 parent c5fc6b3 commit 72c24b9
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions socs/testing/device_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def create_device():
device = DeviceEmulator(responses)

if relay_type == 'serial':
device.create_serial_relay()
device.create_serial_relay(encoding)
elif relay_type == 'tcp':
device.create_tcp_relay(port, encoding)

Expand Down Expand Up @@ -96,7 +96,7 @@ def _setup_socat():

return proc

def create_serial_relay(self):
def create_serial_relay(self, encoding='utf-8'):
"""Create the serial relay, emulating a hardware device connected over
serial.
Expand All @@ -111,6 +111,13 @@ def create_serial_relay(self):
DeviceEmulator.define_responses() after instantiation of the DeviceEmulator
object within a given test.
Args:
encoding (str): Encoding for the messages and responses.
DeviceEmulator will try to encode and decode messages with the
given encoding. No encoding is used if set to None. That can be
useful if you need to use raw data from your hardware. Defaults
to 'utf-8'.
"""
self._type = 'serial'
self.proc = self._setup_socat()
Expand All @@ -119,7 +126,9 @@ def create_serial_relay(self):
baudrate=57600,
timeout=5,
)
bkg_read = threading.Thread(name='background', target=self._read_serial)
bkg_read = threading.Thread(name='background',
target=self._read_serial,
kwargs={'encoding': encoding})
bkg_read.start()

def _get_response(self, msg):
Expand Down Expand Up @@ -150,25 +159,38 @@ def _get_response(self, msg):

return response

def _read_serial(self):
def _read_serial(self, encoding):
"""Loop until shutdown, reading any commands sent over the relay.
Respond immediately to a command with the response in self.responses.
Args:
encoding (str): Encoding for the messages and responses. See
:func:`socs.testing.device_emulator.DeviceEmulator.create_serial_relay`
for more details.
"""
self._read = True

while self._read:
if self.ser.in_waiting > 0:
msg = self.ser.readline().strip().decode('utf-8')
msg = self.ser.readline()
if encoding:
msg = msg.strip().decode(encoding)
print(f"msg='{msg}'")

response = self._get_response(msg)

# Avoid user providing bytes-like response
if isinstance(response, bytes) and encoding is not None:
response = response.decode()

if response is None:
continue

print(f"response='{response}'")
self.ser.write((response + '\r\n').encode('utf-8'))
if encoding:
response = (response + '\r\n').encode(encoding)
self.ser.write(response)

time.sleep(0.01)

Expand Down

0 comments on commit 72c24b9

Please sign in to comment.