Skip to content

Commit

Permalink
Add driver for Atx Led Dali Hat (AL-DALI-HAT). (#135)
Browse files Browse the repository at this point in the history
* Add driver for Atx Led Dali Hat (AL-DALI-HAT).

    This uses the Raspberry PI on-board serial port to communicate at 19200 baud to the Dali Hat,
    the hardware on the hat adapts the UART serial data stream into DALI encoding

* Remove path editing on import and __main__ from atxled.py and use read_until in read

* Remove __main__ since it serves no real purpose
  • Loading branch information
russedavid committed Mar 17, 2024
1 parent 0ebccd7 commit 15bcee3
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ Library structure

- ``serial`` - asyncio-based driver for Lunatone LUBA RS232 interfaces

- ``atxled`` - Driver for ATX LED SERIAL DALI HAT

- ``exceptions`` - DALI related exceptions

- ``frame`` - Forward and backward frames; stable
Expand Down
154 changes: 154 additions & 0 deletions dali/driver/atxled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from dali.command import Command
from dali.driver.base import SyncDALIDriver, DALIDriver
from dali.frame import BackwardFrame
import logging
import sys

import serial
import threading
import time

DALI_PACKET_SIZE = {"j": 8, "h": 16, "l": 24, "m": 25}
DALI_PACKET_PREFIX = {v: k for k, v in DALI_PACKET_SIZE.items()}


class DaliHatSerialDriver(DALIDriver):
"""Driver for communicating with DALI devices over a serial connection."""

def __init__(self, port="/dev/ttyS0", LOG=None):
"""Initialize the serial connection to the DALI interface."""
self.port = port
self.lock = threading.RLock()
self.buffer = []
if not LOG:
self.LOG = logging.getLogger("AtxLedDaliDriver")
handler = logging.StreamHandler(sys.stdout)
self.LOG.addHandler(handler)
else:
self.LOG = LOG
try:
self.conn = serial.Serial(
port=self.port,
baudrate=19200,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=0.2,
)
except Exception as e:
self.LOG.exception("Could not open serial connection: %s", e)
self.conn = None

def read_line(self):
"""Read the next line from the buffer, refilling the buffer if necessary."""
with self.lock:
while not self.buffer:
line = self.conn.read_until(b"\n").decode("ascii")
if not line:
return ""
self.buffer.append(line)
return self.buffer.pop(0)

def construct(self, command):
"""Construct a DALI command to be sent over the serial connection."""
assert isinstance(command, Command)
f = command.frame
packet_size = len(f)
prefix = DALI_PACKET_PREFIX[packet_size]
if command.sendtwice and packet_size == 16:
prefix = "t"
data = "".join(["{:02X}".format(byte) for byte in f.pack])
command_str = (f"{prefix}{data}\n").encode("ascii")
return command_str

def extract(self, data):
"""Parse the response from the serial device and return the corresponding frame."""
if data.startswith("J"):
try:
data = int(data[1:], 16)
return BackwardFrame(data)
except ValueError as e:
self.LOG.error(f"Failed to parse response '{data}': {e}")
return None

def close(self):
"""Close the serial connection."""
if self.conn:
self.conn.close()


class SyncDaliHatDriver(DaliHatSerialDriver, SyncDALIDriver):
"""Synchronous DALI driver."""

def send(self, command: Command):
"""Send a command to the DALI interface and wait for a response."""
with self.lock:
lines = []
last_resp = None
send_twice = command.sendtwice
cmd = self.construct(command)
self.LOG.debug("command string sent: %r", cmd)
self.conn.write(cmd)
REPS = 5
i = 0
already_resent = False
resent_times = 0
resp = None
while i < REPS:
i += 1
resp = self.read_line()
self.LOG.debug("raw response received: %r", resp)
resend = False
if cmd[:3] not in ["hB1", "hB3", "hB5"]:
if resp and resp[0] in {"N", "J"}:
if send_twice:
if last_resp:
if last_resp == resp:
resp = self.extract(resp)
break
resend = True
last_resp = None
else:
last_resp = resp
else:
resp = self.extract(resp)
break
elif resp and resp[0] in {"X", "Z", ""}:
time.sleep(0.1)
collision_bytes = None
while collision_bytes != "":
collision_bytes = self.read_line()
if resp[0] == "X":
break
self.LOG.info(
"got conflict (%s) sending %r, sending again", resp, cmd
)
last_resp = None
resend = True
elif resp:
lines.append(resp)

resp = None
if resend and not already_resent:
self.conn.write((cmd).encode("ascii"))
REPS += 1 + send_twice
already_resent = True
else:
if resp and resp[0] == "N":
resp = self.extract(resp)
break
elif resp and resp[0] in {"X", "Z", ""}:
time.sleep(0.1)
collision_bytes = None
while collision_bytes != "":
collision_bytes = self.read_line()
elif resp:
last_resp = None
resend = True
if resend and resent_times < 5:
self.conn.write(cmd.encode("ascii"))
REPS += 1 + send_twice
resent_times += 1
if command.is_query:
return command.response(resp)
return resp
114 changes: 114 additions & 0 deletions examples/sync-dalihat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#!/usr/bin/env python3

from dali.gear.general import (
DAPC,
QueryControlGearPresent,
QueryGroupsZeroToSeven,
QueryGroupsEightToFifteen,
QueryActualLevel,
Off,
QueryMinLevel,
QueryMaxLevel,
QueryPhysicalMinimum,
)
from dali.driver.base import SyncDALIDriver
from dali.driver.atxled import SyncDaliHatDriver
from dali.address import GearShort

import logging


LOG = logging.getLogger("DaliHatTest")


class DaliHatTest:
def __init__(self, driver: SyncDALIDriver):
self.driver = driver

def scan_devices(self):
present_devices = []
for address in range(0, 64):
try:
response = self.driver.send(QueryControlGearPresent(GearShort(address)))
if response.value is True:
present_devices.append(address)
LOG.info(f"Device found at address: {address}")
else:
LOG.info(f"Response from address {address}: {response.value}")

except Exception as e:
LOG.info(f"Error while querying address {address}: {e}")

return present_devices

def set_device_level(self, address, level, fade_time=0):
try:
self.driver.send(DAPC(GearShort(address), level))
LOG.info(
f"Set device at address {address} to level {level} with fade time {fade_time}"
)
except Exception as e:
LOG.info(f"Error while setting level for address {address}: {e}")

def query_device_info(self, address):
current_command = None
try:
current_command = "QueryGroupsZeroToSeven"
groups_0_7 = self.driver.send(
QueryGroupsZeroToSeven(GearShort(address))
).value
LOG.info(f"Device {address} groups 0-7: {groups_0_7}")

current_command = "QueryGroupsEightToFifteen"
groups_8_15 = self.driver.send(
QueryGroupsEightToFifteen(GearShort(address))
).value
LOG.info(f"Device {address} groups 8-15: {groups_8_15}")

current_command = "QueryMinLevel"
min_level = self.driver.send(QueryMinLevel(GearShort(address))).value
LOG.info(f"Device {address} minimum level: {min_level}")

current_command = "QueryMaxLevel"
max_level = self.driver.send(QueryMaxLevel(GearShort(address))).value
LOG.info(f"Device {address} maximum level: {max_level}")

current_command = "QueryPhysicalMinimum"
physical_minimum = self.driver.send(
QueryPhysicalMinimum(GearShort(address))
).value
LOG.info(f"Device {address} physical minimum: {physical_minimum}")

current_command = "QueryActualLevel"
actual_level = self.driver.send(QueryActualLevel(GearShort(address))).value
LOG.info(f"Device {address} actual level: {actual_level}")

except Exception as e:
LOG.info(
f"Error while querying device {address} with command '{current_command}': {e}"
)

def turn_off_device(self, address):
try:
self.driver.send(Off(GearShort(address)))
LOG.info(f"Turned off device at address {address}")
except Exception as e:
LOG.info(f"Error while turning off device {address}: {e}")


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
dali_driver = SyncDaliHatDriver()

dali_test = DaliHatTest(dali_driver)
found_devices = []

found_devices = dali_test.scan_devices()
LOG.info(f"Scanned and found {len(found_devices)} devices.")

for device in found_devices:
dali_test.query_device_info(device)
dali_test.set_device_level(device, 128)
dali_test.turn_off_device(device)

dali_driver.close()

0 comments on commit 15bcee3

Please sign in to comment.