Skip to content
Permalink
Browse files

set DTS to low on start of LISY/APC

  • Loading branch information...
jabdoa2 committed Jun 16, 2019
1 parent 64a2aa0 commit 43f0585fcc75535435085189ec1f66128c308db5
Showing with 70 additions and 10 deletions.
  1. +1 −0 mpf/config_spec.yaml
  2. +42 −4 mpf/platforms/lisy/lisy.py
  3. +19 −5 mpf/tests/loop.py
  4. +8 −1 mpf/tests/test_Lisy.py
@@ -672,6 +672,7 @@ lisy:
port: single|str|None
baud: single|int|None
poll_hz: single|int|100
disable_dtr: single|bool|True
console_log: single|enum(none,basic,full)|none
debug: single|bool|False
display_flash_frequency: single|float|1.0
@@ -209,7 +209,35 @@ def __init__(self, machine) -> None:
self.api_version = None
self._light_system = None

def _disable_dts_on_start_of_serial(self):
"""Prevent DTS toggling when opening the serial.
This works only on unix. On other platforms import of termios will fail.
"""
try:
import termios
except ImportError:
self.warning_log("Could not import terminos (this is ok on windows).")
return
serial_port = open(self.config['port'])
attrs = termios.tcgetattr(serial_port)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(serial_port, termios.TCSAFLUSH, attrs)
serial_port.close()

@asyncio.coroutine
def _clear_read_buffer(self):
"""Clear read buffer."""
if hasattr(self._writer.transport, "_serial"):
# pylint: disable-msg=protected-access
self._writer.transport._serial.reset_input_buffer()
# pylint: disable-msg=protected-access
self._reader._buffer = bytearray()
# pylint: disable-msg=protected-access
self._reader._maybe_resume_transport()

# pylint: disable-msg=too-many-statements
# pylint: disable-msg=too-many-branches
@asyncio.coroutine
def initialize(self):
"""Initialise platform."""
@@ -219,20 +247,30 @@ def initialize(self):

if self.config['connection'] == "serial":
self.log.info("Connecting to %s at %sbps", self.config['port'], self.config['baud'])
if self.config['disable_dtr']:
self._disable_dts_on_start_of_serial()
connector = self.machine.clock.open_serial_connection(
url=self.config['port'], baudrate=self.config['baud'], limit=0)
url=self.config['port'], baudrate=self.config['baud'], limit=0, do_not_open=True)
else:
self.log.info("Connecting to %s:%s", self.config['network_host'], self.config['network_port'])
connector = self.machine.clock.open_connection(self.config['network_host'], self.config['network_port'])

self._reader, self._writer = yield from connector

if self.config['connection'] == "serial":
if self.config['disable_dtr']:
# pylint: disable-msg=protected-access
self._writer.transport._serial.dtr = False
# pylint: disable-msg=protected-access
self._writer.transport._serial.open()

# give the serial a few ms to read the first bytes
yield from asyncio.sleep(.1, loop=self.machine.clock.loop)

while True:
# reset platform
self.debug_log("Sending reset.")
# clear buffer
# pylint: disable-msg=protected-access
self._reader._buffer = bytearray()
yield from self._clear_read_buffer()
# send command
self.send_byte(LisyDefines.GeneralReset)
try:
@@ -82,19 +82,31 @@ class MockFd:
def __init__(self):
self.is_open = False

def open(self):
if self.is_open:
raise AssertionError("Serial already open")
self.is_open = True

def read_ready(self):
if not self.is_open:
raise AssertionError("Serial not open")
return False

def send(self, data):
if not self.is_open:
raise AssertionError("Serial not open")
return len(data)

def fileno(self):
return self

def write_ready(self):
if not self.is_open:
raise AssertionError("Serial not open")
return False

def close(self):
self.is_open = False
return


@@ -495,15 +507,16 @@ def mock_serial(self, url, serial):
"""Mock a socket and use it for connections."""
self._mock_serials[url] = serial

def _open_mock_serial(self, url):
def _open_mock_serial(self, url, do_not_open):
key = url
if key not in self._mock_serials:
raise AssertionError("serial not mocked for key {}".format(key))
serial = self._mock_serials[key]
if serial.is_open:
raise AssertionError("serial already open for key {}".format(key))
if not do_not_open:
if serial.is_open:
raise AssertionError("serial already open for key {}".format(key))

serial.is_open = True
serial.is_open = True
return serial

@coroutine
@@ -526,6 +539,7 @@ def open_serial_connection(self, limit=None, **kwargs):

reader = asyncio.StreamReader(limit=limit, loop=self.loop)
protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
transport = SerialTransport(self.loop, protocol, self._open_mock_serial(kwargs['url']))
transport = SerialTransport(self.loop, protocol, self._open_mock_serial(kwargs['url'],
kwargs.get("do_not_open", False)))
writer = asyncio.StreamWriter(transport, protocol, reader, self.loop)
return reader, writer
@@ -2,7 +2,9 @@

import sys

from mpf.tests.MpfTestCase import MpfTestCase, test_config
from mpf.platforms.lisy import lisy

from mpf.tests.MpfTestCase import MpfTestCase, test_config, MagicMock
from mpf.tests.loop import MockSerial, MockSocket


@@ -477,8 +479,13 @@ def setUp(self):
else:
self.serialMock.expected_commands[bytes([40, number])] = b'\x00' if number != 37 else b'\x01'

# prevent changes on real hardware
lisy.LisyHardwarePlatform._disable_dts_on_start_of_serial = MagicMock()

super().setUp()

lisy.LisyHardwarePlatform._disable_dts_on_start_of_serial.assert_called_with()

self._wait_for_processing()
self.assertFalse(self.serialMock.expected_commands)

0 comments on commit 43f0585

Please sign in to comment.
You can’t perform that action at this time.