Skip to content

Commit

Permalink
Replace RPi.GPIO with gpiozero
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Aug 1, 2022
1 parent 5e51597 commit c9a1940
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 134 deletions.
2 changes: 0 additions & 2 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ async def mock_conn(loop, protocol_factory, **kwargs):
DEVICE_CONFIG = zigpy_zigate.config.SCHEMA_DEVICE(
{zigpy_zigate.config.CONF_DEVICE_PATH: port}
)
sys.modules['RPi'] = MagicMock()
sys.modules['RPi.GPIO'] = MagicMock()
res = await zigate_api.ZiGate.probe(DEVICE_CONFIG)
assert res is True
assert mock_raw_mode.call_count == 1
Expand Down
150 changes: 79 additions & 71 deletions zigpy_zigate/common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import re
import time
import os.path
import serial.tools.list_ports
import serial
import usb
import logging
import asyncio

Expand Down Expand Up @@ -53,46 +53,56 @@ def is_zigate_wifi(port):
return port.startswith('socket://')


async def set_pizigate_running_mode():
try:
import RPi.GPIO as GPIO
LOGGER.info('Put PiZiGate in running mode')
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.OUT) # GPIO0
GPIO.setup(27, GPIO.OUT) # GPIO2
GPIO.output(27, GPIO.HIGH)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.LOW)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.HIGH)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set PiZiGate GPIO, please check configuration')
LOGGER.error(str(e))


async def set_pizigate_flashing_mode():
try:
import RPi.GPIO as GPIO
LOGGER.info('Put PiZiGate in flashing mode')
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.OUT) # GPIO0
GPIO.setup(27, GPIO.OUT) # GPIO2
GPIO.output(27, GPIO.LOW)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.LOW)
await asyncio.sleep(0.5)
GPIO.output(17, GPIO.HIGH)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set PiZiGate GPIO, please check configuration')
LOGGER.error(str(e))
def async_run_in_executor(function):
"""Decorator to make a sync function async."""

async def replacement(*args):
return asyncio.get_running_loop().run_in_executor(None, function, *args)

replacement._sync_func = function

return replacement


@async_run_in_executor
def set_pizigate_running_mode():
from gpiozero import OutputDevice

LOGGER.info('Put PiZiGate in running mode')
gpio17 = OutputDevice(pin=17)
gpio27 = OutputDevice(pin=27)

gpio27.on()
time.sleep(0.5)
gpio17.off()
time.sleep(0.5)
gpio17.on()
time.sleep(0.5)


@async_run_in_executor
def set_pizigate_flashing_mode():
from gpiozero import OutputDevice

LOGGER.info('Put PiZiGate in flashing mode')
gpio17 = OutputDevice(pin=17)
gpio27 = OutputDevice(pin=27)

gpio27.off()
time.sleep(0.5)
gpio17.off()
time.sleep(0.5)
gpio17.on()
time.sleep(0.5)


@async_run_in_executor
def ftdi_set_bitmode(dev, bitmask):
'''
Set mode for ZiGate DIN module
'''
import usb

BITMODE_CBUS = 0x20
SIO_SET_BITMODE_REQUEST = 0x0b
bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT,
Expand All @@ -102,39 +112,37 @@ def ftdi_set_bitmode(dev, bitmask):
dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue)


async def set_zigatedin_running_mode():
try:
dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
LOGGER.error('ZiGate DIN not found.')
return
LOGGER.info('Put ZiGate DIN in running mode')
ftdi_set_bitmode(dev, 0xC8)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set FTDI bitmode, please check configuration')
LOGGER.error(str(e))


async def set_zigatedin_flashing_mode():
try:
dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
LOGGER.error('ZiGate DIN not found.')
return
LOGGER.info('Put ZiGate DIN in flashing mode')
ftdi_set_bitmode(dev, 0x00)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xC0)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xC4)
await asyncio.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
await asyncio.sleep(0.5)
except Exception as e:
LOGGER.error('Unable to set FTDI bitmode, please check configuration')
LOGGER.error(str(e))
@async_run_in_executor
def set_zigatedin_running_mode():
import usb

dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
raise RuntimeError('ZiGate DIN not found.')

LOGGER.info('Put ZiGate DIN in running mode')
ftdi_set_bitmode(dev, 0xC8)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)


@async_run_in_executor
def set_zigatedin_flashing_mode():
import usb

dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
raise RuntimeError('ZiGate DIN not found.')

LOGGER.info('Put ZiGate DIN in flashing mode')
ftdi_set_bitmode(dev, 0x00)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xC0)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xC4)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)
65 changes: 4 additions & 61 deletions zigpy_zigate/tools/flasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@
import time
import serial
from serial.tools.list_ports import comports
try:
import RPi.GPIO as GPIO
except Exception:
# Fake GPIO
class GPIO:
def fake(self, *args, **kwargs):
pass

def __getattr__(self, *args, **kwargs):
return self.fake
GPIO = GPIO()
import usb


Expand Down Expand Up @@ -419,19 +408,6 @@ def upgrade_firmware(port):
print('ZiGate flashed with {}'.format(firmware_path))


def ftdi_set_bitmode(dev, bitmask):
'''
Set mode for ZiGate DIN module
'''
BITMODE_CBUS = 0x20
SIO_SET_BITMODE_REQUEST = 0x0b
bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT,
usb.util.CTRL_TYPE_VENDOR,
usb.util.CTRL_RECIPIENT_DEVICE)
wValue = bitmask | (BITMODE_CBUS << BITMODE_CBUS)
dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue)


def main():
ports_available = [port for (port, _, _) in sorted(comports())]
parser = argparse.ArgumentParser()
Expand All @@ -452,33 +428,9 @@ def main():
LOGGER.setLevel(logging.DEBUG)

if args.gpio:
LOGGER.info('Put PiZiGate in flash mode')
GPIO.setmode(GPIO.BCM)
GPIO.setup(27, GPIO.OUT) # GPIO2
GPIO.output(27, GPIO.LOW) # GPIO2
GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0
time.sleep(0.5)
GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0
time.sleep(0.5)
c.set_pizigate_flashing_mode._sync_func()
elif args.din:
LOGGER.info('Put ZiGate DIN in flash mode')
dev = usb.core.find(idVendor=0x0403, idProduct=0x6001)
if not dev:
LOGGER.error('ZiGate DIN not found.')
return
ftdi_set_bitmode(dev, 0x00)
time.sleep(0.5)
# Set CBUS2/3 high...
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)
# Set CBUS2/3 low...
ftdi_set_bitmode(dev, 0xC0)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xC4)
time.sleep(0.5)
# Set CBUS2/3 back to tristate
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)
c.set_zigatedin_flashing_mode._sync_func()

if args.upgrade:
upgrade_firmware(args.serialport)
Expand Down Expand Up @@ -508,18 +460,9 @@ def main():
# erase_EEPROM(ser, args.pdm_only)

if args.gpio:
LOGGER.info('Put PiZiGate in running mode')
GPIO.output(27, GPIO.HIGH) # GPIO2
GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0
time.sleep(0.5)
GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0
time.sleep(0.5)
c.set_pizigate_running_mode._sync_func()
elif args.din:
LOGGER.info('Put ZiGate DIN in running mode')
ftdi_set_bitmode(dev, 0xC8)
time.sleep(0.5)
ftdi_set_bitmode(dev, 0xCC)
time.sleep(0.5)
c.set_zigatedin_running_mode._sync_func()


if __name__ == "__main__":
Expand Down

0 comments on commit c9a1940

Please sign in to comment.