Skip to content
Permalink
Browse files

add some new commands for APC

  • Loading branch information...
jabdoa2 committed Jun 13, 2019
1 parent 90b014a commit 39642c7b3540005e8a4f775805302a8e4dadb484
Showing with 189 additions and 33 deletions.
  1. +1 −0 mpf/config_spec.yaml
  2. +8 −0 mpf/platforms/lisy/defines.py
  3. +103 −28 mpf/platforms/lisy/lisy.py
  4. +77 −5 mpf/tests/test_Lisy.py
@@ -679,6 +679,7 @@ lisy:
connection: single|enum(network,serial)|network
network_port: single|int|None
network_host: single|str|None
max_led_batch_size: single|int|3
logic_blocks_common:
enable_events: event_handler|str:ms|None
disable_events: event_handler|str:ms|None
@@ -14,16 +14,22 @@ class LisyDefines:
InfoGetNumberOfDisplays = 6
InfoGetDisplayDetails = 7
InfoGetGameInfo = 8
InfoGetSwitchCount = 9

LampsGetStatusOfLamps = 10
LampsSetLampOn = 11
LampsSetLampOff = 12
FadeModernLights = 13

GetModernLightsCount = 19

SolenoidsGetStatusOfSolenoid = 20
SolenoidsSetSolenoidToOn = 21
SolenoidsSetSolenoidToOff = 22
SolenoidsPulseSolenioid = 23
SolenoidsSetSolenoidPulseTime = 24
SetSolenoidsRecycleTime = 25
PulseAndEnableSolenoidWithPWM = 26

DisplaysSetDisplay0To = 30
DisplaysSetDisplay1To = 31
@@ -42,5 +48,7 @@ class LisyDefines:
SoundTextToSpeech = 53
SoundSetVolume = 54

ConfigureHardwareRuleForSolenoid = 60

GeneralReset = 100
GeneralWatchdog = 101
@@ -1,7 +1,10 @@
"""LISY platform for System 1 and System 80."""
import asyncio
from typing import Generator, Dict, Optional, List, Any
from distutils.version import StrictVersion

from typing import Generator, Dict, Optional, List

from mpf.core.platform_batch_light_system import PlatformBatchLight, PlatformBatchLightSystem
from mpf.platforms.interfaces.driver_platform_interface import DriverPlatformInterface, PulseSettings, HoldSettings
from mpf.platforms.interfaces.hardware_sound_platform_interface import HardwareSoundPlatformInterface
from mpf.platforms.interfaces.segment_display_platform_interface import SegmentDisplaySoftwareFlashPlatformInterface
@@ -12,7 +15,7 @@

from mpf.platforms.lisy.defines import LisyDefines

from mpf.platforms.interfaces.light_platform_interface import LightPlatformSoftwareFade
from mpf.platforms.interfaces.light_platform_interface import LightPlatformSoftwareFade, LightPlatformInterface

from mpf.core.platform import SwitchPlatform, LightsPlatform, DriverPlatform, SwitchSettings, DriverSettings, \
DriverConfig, SwitchConfig, SegmentDisplaySoftwareFlashPlatform, HardwareSoundPlatform
@@ -96,6 +99,26 @@ def get_board_name(self):
return "LISY"


class LisyModernLight(PlatformBatchLight):

"""A modern light in LISY."""

__slots__ = ["platform"]

def __init__(self, number, platform, light_system):
"""Initialise Lisy Light."""
super().__init__(number, light_system)
self.platform = platform

def get_max_fade_ms(self) -> int:
"""Return max fade time."""
return 65535

def get_board_name(self):
"""Return board name."""
return "LISY"


class LisyDisplay(SegmentDisplaySoftwareFlashPlatformInterface):

"""A segment display in the LISY platform."""
@@ -142,13 +165,17 @@ def text_to_speech(self, text: str, platform_options: dict):

def set_volume(self, volume: float):
"""Set volume."""
self.platform.send_byte(LisyDefines.SoundSetVolume, bytes([int(volume * 100)]))
if self.platform.api_version >= StrictVersion("0.9"):
self.platform.send_byte(LisyDefines.SoundSetVolume, bytes([0, int(volume * 100)]))
else:
self.platform.send_byte(LisyDefines.SoundSetVolume, bytes([int(volume * 100)]))

def stop_all_sounds(self):
"""Stop all sounds."""
self.platform.send_byte(LisyDefines.SoundStopAllSounds)


# pylint: disable-msg=too-many-instance-attributes
class LisyHardwarePlatform(SwitchPlatform, LightsPlatform, DriverPlatform,
SegmentDisplaySoftwareFlashPlatform,
HardwareSoundPlatform, LogMixin):
@@ -157,7 +184,8 @@ class LisyHardwarePlatform(SwitchPlatform, LightsPlatform, DriverPlatform,

__slots__ = ["config", "_writer", "_reader", "_poll_task", "_watchdog_task", "_number_of_lamps",
"_number_of_solenoids", "_number_of_displays", "_inputs", "_coils_start_at_one",
"_bus_lock"] # type: List[str]
"_bus_lock", "api_version", "_number_of_switches", "_number_of_modern_lights",
"_light_system"] # type: List[str]

def __init__(self, machine) -> None:
"""Initialise platform."""
@@ -170,12 +198,16 @@ def __init__(self, machine) -> None:
self._number_of_lamps = None # type: Optional[int]
self._number_of_solenoids = None # type: Optional[int]
self._number_of_displays = None # type: Optional[int]
self._number_of_switches = None # type: Optional[int]
self._number_of_modern_lights = None # type: Optional[int]
self._inputs = dict() # type: Dict[str, bool]
self._coils_start_at_one = None # type: Optional[str]
self.features['max_pulse'] = 255

self.config = self.machine.config_validator.validate_config("lisy", self.machine.config['lisy'])
self._configure_device_logging_and_debug("lisy", self.config)
self.api_version = None
self._light_system = None

# pylint: disable-msg=too-many-statements
@asyncio.coroutine
@@ -232,6 +264,8 @@ def initialize(self):
self.debug_log("Connected to %s hardware. LISY version: %s. API version: %s.",
type_str, lisy_version, api_version)

self.api_version = StrictVersion(api_version.decode())

self.machine.set_machine_var("lisy_hardware", type_str)
'''machine_var: lisy_hardware
@@ -260,28 +294,61 @@ def initialize(self):
self.send_byte(LisyDefines.InfoGetNumberOfDisplays)
self._number_of_displays = yield from self.read_byte()

self.debug_log("Number of lamps: %s. Number of coils: %s. Numbers of display: %s",
self._number_of_lamps, self._number_of_solenoids, self._number_of_displays)
# get number of switches
self.send_byte(LisyDefines.InfoGetSwitchCount)
self._number_of_switches = yield from self.read_byte()

if self.api_version >= StrictVersion("0.9"):
# get number of modern lights
self.send_byte(LisyDefines.GetModernLightsCount)
self._number_of_modern_lights = yield from self.read_byte()
else:
self._number_of_modern_lights = 0

if self._number_of_modern_lights > 0:
self._light_system = PlatformBatchLightSystem(self.machine.clock, lambda x: x.number,
lambda x, y: x.number + 1 == y.number,
self._send_multiple_light_update,
self.machine.machine_config['mpf'][
'default_light_hw_update_hz'],
self.config['max_led_batch_size'])
self._light_system.start()

self.debug_log("Number of lamps: %s. Number of coils: %s. Numbers of display: %s. Number of switches: %s",
self._number_of_lamps, self._number_of_solenoids, self._number_of_displays,
self._number_of_switches)

# initially read all switches
self.debug_log("Reading all switches.")
for row in range(8):
for col in range(8):
number = row * 10 + col
self.send_byte(LisyDefines.SwitchesGetStatusOfSwitch, bytes([number]))
state = yield from self.read_byte()
if state == 2:
self.warning_log("Switch %s does not exist in platform.", number)
elif state > 2:
raise AssertionError("Invalid switch {}. Got response: {}".format(number, state))

self._inputs[str(number)] = state == 1
for number in range(self._number_of_switches):
self.send_byte(LisyDefines.SwitchesGetStatusOfSwitch, bytes([number]))
state = yield from self.read_byte()
if state == 2:
self.warning_log("Switch %s does not exist in platform.", number)
elif state > 2:
raise AssertionError("Invalid switch {}. Got response: {}".format(number, state))

self._inputs[str(number)] = state == 1

self._watchdog_task = self.machine.clock.loop.create_task(self._watchdog())
self._watchdog_task.add_done_callback(self._done)

self.debug_log("Init of LISY done.")

@asyncio.coroutine
def _send_multiple_light_update(self, sequential_brightness_list):
common_fade_ms = sequential_brightness_list[0][2]
if common_fade_ms < 0:
common_fade_ms = 0
fade_time = int(common_fade_ms)

data = bytearray([sequential_brightness_list[0][0].number, int(fade_time / 255), int(fade_time & 0xFF),
len(sequential_brightness_list)])
for _, brightness, _ in sequential_brightness_list:
data.append(int(255 * brightness))

self.send_byte(LisyDefines.FadeModernLights, data)

@asyncio.coroutine
def start(self):
"""Start reading switch changes."""
@@ -367,21 +434,29 @@ def clear_hw_rule(self, switch: SwitchSettings, coil: DriverSettings):
"""No rules on LISY."""
raise AssertionError("Hardware rules are not support in LISY.")

def configure_light(self, number: str, subtype: str, platform_settings: dict) -> LightPlatformSoftwareFade:
def configure_light(self, number: str, subtype: str, platform_settings: dict) -> LightPlatformInterface:
"""Configure light on LISY."""
del platform_settings, subtype
del platform_settings
assert self._number_of_lamps is not None

if not self._coils_start_at_one:
if 0 < int(number) >= self._number_of_lamps:
raise AssertionError("LISY only has {} lamps. Cannot configure lamp {} (zero indexed).".
format(self._number_of_lamps, number))
if subtype is None or subtype == "matrix":
if not self._coils_start_at_one:
if 0 < int(number) >= self._number_of_lamps:
raise AssertionError("LISY only has {} lamps. Cannot configure lamp {} (zero indexed).".
format(self._number_of_lamps, number))
else:
if 1 < int(number) > self._number_of_lamps:
raise AssertionError("LISY only has {} lamps. Cannot configure lamp {} (one indexed).".
format(self._number_of_lamps, number))

return LisySimpleLamp(int(number), self)
elif subtype == "light":
if 0 < int(number) >= self._number_of_modern_lights:
raise AssertionError("LISY only has {} modern lights. Cannot configure light {}.".
format(self._number_of_modern_lights, number))
return LisyModernLight(int(number), self, self._light_system)
else:
if 1 < int(number) > self._number_of_lamps:
raise AssertionError("LISY only has {} lamps. Cannot configure lamp {} (one indexed).".
format(self._number_of_lamps, number))

return LisySimpleLamp(int(number), self)
raise self.raise_config_error("Invalid subtype {}".format(subtype), 1)

def parse_light_number_to_channels(self, number: str, subtype: str):
"""Return a single light."""
@@ -94,20 +94,22 @@ def setUp(self):
self.serialMock.expected_commands = {
b'\x00': b'LISY1\00', # hw LISY1
b'\x01': b'4.01\00', # lisy version
b'\x02': b'0.05\00', # api version
b'\x02': b'0.08\00', # api version
b'\x64': b'\x00', # reset -> ok
b'\x03': b'\x28', # get number of lamps -> 40
b'\x04': b'\x09', # get number of solenoids -> 9
b'\x06': b'\x05', # get number of displays -> 5
b'\x09': b'\x58', # get number of switches -> 88
b'\x1e\x00': None, # clear display
b'\x1f\x00': None, # clear display
b'\x20\x00': None, # clear display
}

for row in range(8):
for col in range(8):
self.serialMock.expected_commands[bytes([40, row * 10 + col])] = b'\x00'\
if row * 10 + col != 37 else b'\x01'
for number in range(88):
if number % 10 >= 8:
self.serialMock.expected_commands[bytes([40, number])] = b'\x02'
else:
self.serialMock.expected_commands[bytes([40, number])] = b'\x00' if number != 37 else b'\x01'

super().setUp()

@@ -416,3 +418,73 @@ def test_system11(self):
self.advance_time_and_run(.2)
self._wait_for_processing()
self.assertFalse(self.serialMock.expected_commands)


class TestAPC(MpfTestCase):

def getConfigFile(self):
return 'config.yaml'

def getMachinePath(self):
return 'tests/machine_files/apc/'

def _mock_loop(self):
self.clock.mock_serial("com1", self.serialMock)

def tearDown(self):
self.assertFalse(self.serialMock.crashed)
super().tearDown()

def get_platform(self):
return False

def _wait_for_processing(self):
start = time.time()
while self.serialMock.expected_commands and not self.serialMock.crashed and time.time() < start + 10:
self.advance_time_and_run(.01)

def setUp(self):
if sys.version_info[0] == 3 and sys.version_info[1] == 4:
# this fails on python 3.4 because of some asyncio bugs
self.skipTest("Test is unstable in Python 3.4")
return
self.expected_duration = 1.5
self.serialMock = MockLisySocket()

self.serialMock.permanent_commands = {
b'\x29': b'\x7F', # changed switches? -> no
b'\x65': b'\x00' # watchdog
}

self.serialMock.expected_commands = {
b'\x00': b'APC\00', # hw APC
b'\x01': b'0.02\00', # APC version
b'\x02': b'0.09\00', # api version
b'\x64': b'\x00', # reset -> ok
b'\x03': b'\x28', # get number of lamps -> 40
b'\x04': b'\x09', # get number of solenoids -> 9
b'\x06': b'\x05', # get number of displays -> 5
b'\x09': b'\x58', # get number of switches -> 88
b'\x13': b'\x00', # get number of modern lights -> 0
b'\x1e\x00': None, # clear display
b'\x1f\x00': None, # clear display
b'\x20\x00': None, # clear display
}

for number in range(88):
if number % 10 >= 8:
self.serialMock.expected_commands[bytes([40, number])] = b'\x02'
else:
self.serialMock.expected_commands[bytes([40, number])] = b'\x00' if number != 37 else b'\x01'

super().setUp()

self._wait_for_processing()
self.assertFalse(self.serialMock.expected_commands)

def test_platform(self):
# wait for watchdog
self.serialMock.expected_commands = {
b'\x65': b'\x00' # watchdog
}
self._wait_for_processing()

0 comments on commit 39642c7

Please sign in to comment.
You can’t perform that action at this time.