From 6f4c943b68b83fca2f5b1103df892acf9efed13f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 5 Aug 2022 00:00:13 -0400 Subject: [PATCH 1/2] Switch from `RPi.GPIO` to `gpiozero` (#125) * Remove checks for Raspberry Pi * Replace `RPi.GPIO` with `gpiozero` * Use context manager to auto close GPIO pins * Create `async_` variants of sync GPIO and USB functions * Create `UnclosableOutputDevice` to persist the pin state * Be explicit about initial pin state * Disable closing the pin factory as well --- setup.py | 73 +++----------- tests/test_api.py | 2 - zigpy_zigate/common.py | 179 ++++++++++++++++++++-------------- zigpy_zigate/tools/flasher.py | 65 +----------- zigpy_zigate/uart.py | 4 +- 5 files changed, 125 insertions(+), 198 deletions(-) diff --git a/setup.py b/setup.py index db5ed21..81a31e8 100644 --- a/setup.py +++ b/setup.py @@ -1,80 +1,29 @@ """Setup module for zigpy-zigate""" -import os + +import pathlib from setuptools import find_packages, setup from zigpy_zigate import __version__ - -# extracted from https://raspberrypi.stackexchange.com/questions/5100/detect-that-a-python-program-is-running-on-the-pi -def is_raspberry_pi(raise_on_errors=False): - """Checks if Raspberry PI. - - :return: - """ - try: - with open('/proc/cpuinfo', 'r') as cpuinfo: - found = False - for line in cpuinfo: - if line.startswith('Hardware'): - found = True - label, value = line.strip().split(':', 1) - value = value.strip() - if value not in ( - 'BCM2708', - 'BCM2709', - 'BCM2835', - 'BCM2836' - ): - if raise_on_errors: - raise ValueError( - 'This system does not appear to be a ' - 'Raspberry Pi.' - ) - else: - return False - if not found: - if raise_on_errors: - raise ValueError( - 'Unable to determine if this system is a Raspberry Pi.' - ) - else: - return False - except IOError: - if raise_on_errors: - raise ValueError('Unable to open `/proc/cpuinfo`.') - else: - return False - - return True - - -requires = [ - 'pyserial>=3.5', - 'pyserial-asyncio>=0.5; platform_system!="Windows"', - 'pyserial-asyncio!=0.5; platform_system=="Windows"', # 0.5 broke writesv - 'pyusb>=1.1.0', - 'zigpy>=0.47.0', -] - -if is_raspberry_pi(): - requires.append('RPi.GPIO') - -this_directory = os.path.join(os.path.abspath(os.path.dirname(__file__))) -with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f: - long_description = f.read() - setup( name="zigpy-zigate", version=__version__, description="A library which communicates with ZiGate radios for zigpy", - long_description=long_description, + long_description=(pathlib.Path(__file__).parent / "README.md").read_text(), long_description_content_type="text/markdown", url="http://github.com/zigpy/zigpy-zigate", author="Sébastien RAMAGE", author_email="sebatien.ramage@gmail.com", license="GPL-3.0", packages=find_packages(exclude=['tests']), - install_requires=requires, + install_requires=[ + 'pyserial>=3.5', + 'pyserial-asyncio>=0.5; platform_system!="Windows"', + 'pyserial-asyncio!=0.5; platform_system=="Windows"', # 0.5 broke writes + 'pyusb>=1.1.0', + 'zigpy>=0.47.0', + 'gpiozero', + ], tests_require=[ 'pytest', 'pytest-asyncio', diff --git a/tests/test_api.py b/tests/test_api.py index 8da6513..578c46f 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -73,8 +73,6 @@ async def mock_conn(loop, protocol_factory, **kwargs): DEVICE_CONFIG = zigpy_zigate.config.SCHEMA_DEVICE( {zigpy_zigate.config.CONF_DEVICE_PATH: port} ) - sys.modules['RPi'] = MagicMock() - sys.modules['RPi.GPIO'] = MagicMock() res = await zigate_api.ZiGate.probe(DEVICE_CONFIG) assert res is True assert mock_raw_mode.call_count == 1 diff --git a/zigpy_zigate/common.py b/zigpy_zigate/common.py index c46ec70..254b569 100644 --- a/zigpy_zigate/common.py +++ b/zigpy_zigate/common.py @@ -1,13 +1,38 @@ import re +import time import os.path import serial.tools.list_ports import serial -import usb import logging import asyncio +from gpiozero import OutputDevice + + LOGGER = logging.getLogger(__name__) +GPIO_PIN0 = 17 +GPIO_PIN2 = 27 + + +class UnclosableOutputDevice(OutputDevice): + """ + `OutputDevice` that never closes its pins. Allows for the last-written pin state to + be retained even after the `OutputDevice` is garbage collected. + """ + + def __init__( + self, pin=None, *, active_high=True, initial_value=False, pin_factory=None + ): + super().__init__( + pin, + active_high=active_high, + initial_value=initial_value, + pin_factory=pin_factory, + ) + self._pin.close = lambda *args, **kwargs: None + self.pin_factory.close = lambda *args, **kwargs: None + def discover_port(): """ discover zigate port """ @@ -53,46 +78,44 @@ def is_zigate_wifi(port): return port.startswith('socket://') -async def set_pizigate_running_mode(): - try: - import RPi.GPIO as GPIO - LOGGER.info('Put PiZiGate in running mode') - GPIO.setmode(GPIO.BCM) - GPIO.setup(17, GPIO.OUT) # GPIO0 - GPIO.setup(27, GPIO.OUT) # GPIO2 - GPIO.output(27, GPIO.HIGH) - await asyncio.sleep(0.5) - GPIO.output(17, GPIO.LOW) - await asyncio.sleep(0.5) - GPIO.output(17, GPIO.HIGH) - await asyncio.sleep(0.5) - except Exception as e: - LOGGER.error('Unable to set PiZiGate GPIO, please check configuration') - LOGGER.error(str(e)) - - -async def set_pizigate_flashing_mode(): - try: - import RPi.GPIO as GPIO - LOGGER.info('Put PiZiGate in flashing mode') - GPIO.setmode(GPIO.BCM) - GPIO.setup(17, GPIO.OUT) # GPIO0 - GPIO.setup(27, GPIO.OUT) # GPIO2 - GPIO.output(27, GPIO.LOW) - await asyncio.sleep(0.5) - GPIO.output(17, GPIO.LOW) - await asyncio.sleep(0.5) - GPIO.output(17, GPIO.HIGH) - await asyncio.sleep(0.5) - except Exception as e: - LOGGER.error('Unable to set PiZiGate GPIO, please check configuration') - LOGGER.error(str(e)) +def set_pizigate_running_mode(): + LOGGER.info('Put PiZiGate in running mode') + + gpio0 = UnclosableOutputDevice(pin=GPIO_PIN0, initial_state=None) + gpio2 = UnclosableOutputDevice(pin=GPIO_PIN2, initial_state=None) + + gpio2.on() + time.sleep(0.5) + + gpio0.off() + time.sleep(0.5) + + gpio0.on() + time.sleep(0.5) + + +def set_pizigate_flashing_mode(): + LOGGER.info('Put PiZiGate in flashing mode') + + gpio0 = UnclosableOutputDevice(pin=GPIO_PIN0, initial_state=None) + gpio2 = UnclosableOutputDevice(pin=GPIO_PIN2, initial_state=None) + + gpio2.off() + time.sleep(0.5) + + gpio0.off() + time.sleep(0.5) + + gpio0.on() + time.sleep(0.5) def ftdi_set_bitmode(dev, bitmask): ''' Set mode for ZiGate DIN module ''' + import usb + BITMODE_CBUS = 0x20 SIO_SET_BITMODE_REQUEST = 0x0b bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT, @@ -102,39 +125,53 @@ def ftdi_set_bitmode(dev, bitmask): dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue) -async def set_zigatedin_running_mode(): - try: - dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) - if not dev: - LOGGER.error('ZiGate DIN not found.') - return - LOGGER.info('Put ZiGate DIN in running mode') - ftdi_set_bitmode(dev, 0xC8) - await asyncio.sleep(0.5) - ftdi_set_bitmode(dev, 0xCC) - await asyncio.sleep(0.5) - except Exception as e: - LOGGER.error('Unable to set FTDI bitmode, please check configuration') - LOGGER.error(str(e)) - - -async def set_zigatedin_flashing_mode(): - try: - dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) - if not dev: - LOGGER.error('ZiGate DIN not found.') - return - LOGGER.info('Put ZiGate DIN in flashing mode') - ftdi_set_bitmode(dev, 0x00) - await asyncio.sleep(0.5) - ftdi_set_bitmode(dev, 0xCC) - await asyncio.sleep(0.5) - ftdi_set_bitmode(dev, 0xC0) - await asyncio.sleep(0.5) - ftdi_set_bitmode(dev, 0xC4) - await asyncio.sleep(0.5) - ftdi_set_bitmode(dev, 0xCC) - await asyncio.sleep(0.5) - except Exception as e: - LOGGER.error('Unable to set FTDI bitmode, please check configuration') - LOGGER.error(str(e)) +def set_zigatedin_running_mode(): + import usb + + dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) + if not dev: + raise RuntimeError('ZiGate DIN not found.') + + LOGGER.info('Put ZiGate DIN in running mode') + ftdi_set_bitmode(dev, 0xC8) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + time.sleep(0.5) + + +def set_zigatedin_flashing_mode(): + import usb + + dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) + if not dev: + raise RuntimeError('ZiGate DIN not found.') + + LOGGER.info('Put ZiGate DIN in flashing mode') + ftdi_set_bitmode(dev, 0x00) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xC0) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xC4) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + time.sleep(0.5) + + +def async_run_in_executor(function): + """Decorator to make a sync function async.""" + + async def replacement(*args): + return asyncio.get_running_loop().run_in_executor(None, function, *args) + + replacement._sync_func = function + + return replacement + + +# Create async version of all of the above functions +async_set_pizigate_running_mode = async_run_in_executor(set_pizigate_running_mode) +async_set_pizigate_flashing_mode = async_run_in_executor(set_pizigate_flashing_mode) +async_set_zigatedin_running_mode = async_run_in_executor(set_zigatedin_running_mode) +async_set_zigatedin_flashing_mode = async_run_in_executor(set_zigatedin_flashing_mode) \ No newline at end of file diff --git a/zigpy_zigate/tools/flasher.py b/zigpy_zigate/tools/flasher.py index 1569fe1..d3bc81a 100644 --- a/zigpy_zigate/tools/flasher.py +++ b/zigpy_zigate/tools/flasher.py @@ -20,17 +20,6 @@ import time import serial from serial.tools.list_ports import comports -try: - import RPi.GPIO as GPIO -except Exception: - # Fake GPIO - class GPIO: - def fake(self, *args, **kwargs): - pass - - def __getattr__(self, *args, **kwargs): - return self.fake - GPIO = GPIO() import usb @@ -419,19 +408,6 @@ def upgrade_firmware(port): print('ZiGate flashed with {}'.format(firmware_path)) -def ftdi_set_bitmode(dev, bitmask): - ''' - Set mode for ZiGate DIN module - ''' - BITMODE_CBUS = 0x20 - SIO_SET_BITMODE_REQUEST = 0x0b - bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT, - usb.util.CTRL_TYPE_VENDOR, - usb.util.CTRL_RECIPIENT_DEVICE) - wValue = bitmask | (BITMODE_CBUS << BITMODE_CBUS) - dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue) - - def main(): ports_available = [port for (port, _, _) in sorted(comports())] parser = argparse.ArgumentParser() @@ -452,33 +428,9 @@ def main(): LOGGER.setLevel(logging.DEBUG) if args.gpio: - LOGGER.info('Put PiZiGate in flash mode') - GPIO.setmode(GPIO.BCM) - GPIO.setup(27, GPIO.OUT) # GPIO2 - GPIO.output(27, GPIO.LOW) # GPIO2 - GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0 - time.sleep(0.5) - GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0 - time.sleep(0.5) + c.set_pizigate_flashing_mode() elif args.din: - LOGGER.info('Put ZiGate DIN in flash mode') - dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) - if not dev: - LOGGER.error('ZiGate DIN not found.') - return - ftdi_set_bitmode(dev, 0x00) - time.sleep(0.5) - # Set CBUS2/3 high... - ftdi_set_bitmode(dev, 0xCC) - time.sleep(0.5) - # Set CBUS2/3 low... - ftdi_set_bitmode(dev, 0xC0) - time.sleep(0.5) - ftdi_set_bitmode(dev, 0xC4) - time.sleep(0.5) - # Set CBUS2/3 back to tristate - ftdi_set_bitmode(dev, 0xCC) - time.sleep(0.5) + c.set_zigatedin_flashing_mode() if args.upgrade: upgrade_firmware(args.serialport) @@ -508,18 +460,9 @@ def main(): # erase_EEPROM(ser, args.pdm_only) if args.gpio: - LOGGER.info('Put PiZiGate in running mode') - GPIO.output(27, GPIO.HIGH) # GPIO2 - GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0 - time.sleep(0.5) - GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0 - time.sleep(0.5) + c.set_pizigate_running_mode() elif args.din: - LOGGER.info('Put ZiGate DIN in running mode') - ftdi_set_bitmode(dev, 0xC8) - time.sleep(0.5) - ftdi_set_bitmode(dev, 0xCC) - time.sleep(0.5) + c.set_zigatedin_running_mode() if __name__ == "__main__": diff --git a/zigpy_zigate/uart.py b/zigpy_zigate/uart.py index b7eb41b..cc69936 100644 --- a/zigpy_zigate/uart.py +++ b/zigpy_zigate/uart.py @@ -156,13 +156,13 @@ async def connect(device_config: Dict[str, Any], api, loop=None): else: if c.is_pizigate(port): LOGGER.debug('PiZiGate detected') - await c.set_pizigate_running_mode() + await c.async_set_pizigate_running_mode() # in case of pizigate:/dev/ttyAMA0 syntax if port.startswith('pizigate:'): port = port[9:] elif c.is_zigate_din(port): LOGGER.debug('ZiGate USB DIN detected') - await c.set_zigatedin_running_mode() + await c.async_set_zigatedin_running_mode() _, protocol = await serial_asyncio.create_serial_connection( loop, From 9029bec0d0b42bb0252676b02f044cb8b2cec4a6 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 5 Aug 2022 00:01:53 -0400 Subject: [PATCH 2/2] 0.9.1 version bump --- zigpy_zigate/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zigpy_zigate/__init__.py b/zigpy_zigate/__init__.py index 12cae53..ba58a4d 100644 --- a/zigpy_zigate/__init__.py +++ b/zigpy_zigate/__init__.py @@ -1,5 +1,5 @@ MAJOR_VERSION = 0 MINOR_VERSION = 9 -PATCH_VERSION = '0' +PATCH_VERSION = '1' __short_version__ = '{}.{}'.format(MAJOR_VERSION, MINOR_VERSION) __version__ = '{}.{}'.format(__short_version__, PATCH_VERSION)