Skip to content

Commit

Permalink
Improved bus scanning for devices that won't properly respond
Browse files Browse the repository at this point in the history
  • Loading branch information
takeshixx committed Jun 5, 2016
1 parent 5243e82 commit f7ecd6a
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 30 deletions.
72 changes: 65 additions & 7 deletions libknx/bus.py
Expand Up @@ -25,6 +25,8 @@ def __init__(self, future, loop=None):
self.tpci_sequence_count = 0
self.knx_source_address = None # TODO: probably not needed

self.response_queue = list()

def connection_made(self, transport):
self.transport = transport
self.peername = self.transport.get_extra_info('peername')
Expand All @@ -33,6 +35,32 @@ def connection_made(self, transport):
self.transport.sendto(connect_request.get_message())
# Schedule CONNECTIONSTATE_REQUEST to keep the connection alive
self.loop.call_later(50, self.knx_keep_alive)
self.loop.call_later(4, self.poll_response_queue)

def poll_response_queue(self):
if self.response_queue:
for response in self.response_queue:
knx_source = response.parse_knx_address(response.body.get('cemi').get('knx_source'))
knx_dest = response.parse_knx_address(response.body.get('cemi').get('knx_destination'))
if not knx_source and not knx_dest:
continue

if knx_dest in self.target_futures.keys():
if not self.target_futures[knx_dest].done():
self.target_futures[knx_dest].set_result(response)
del self.target_futures[knx_dest]
elif knx_source in self.target_futures.keys():
if not self.target_futures[knx_source].done():
self.target_futures[knx_source].set_result(response)
del self.target_futures[knx_source]

self.loop.call_later(2, self.poll_response_queue)

def response_timeout(self, target):
if target in self.target_futures.keys():
if not self.target_futures[target].done():
self.target_futures[target].set_result(False)
del self.target_futures[target]

def datagram_received(self, data, addr):
LOGGER.debug('data: {}'.format(data))
Expand Down Expand Up @@ -66,15 +94,28 @@ def datagram_received(self, data, addr):

if knx_message.body.get('cemi').get('controlfield_1').get('confirm'):
LOGGER.debug('KNX device not alive: {}'.format(knx_dest))
if not self.target_futures[knx_dest].done():
self.target_futures[knx_dest].set_result(False)
del self.target_futures[knx_dest]
if knx_dest in self.target_futures.keys():
if not self.target_futures[knx_dest].done():
self.target_futures[knx_dest].set_result(False)
del self.target_futures[knx_dest]
else:
self.response_queue.append(knx_message)

else:
LOGGER.debug('KNX device is alive: {}'.format(knx_dest))
if knx_dest in self.target_futures.keys():
if not self.target_futures[knx_dest].done():
self.target_futures[knx_dest].set_result(True)
del self.target_futures[knx_dest]
else:
self.response_queue.append(knx_message)

elif CEMI_PRIMITIVES[knx_message.body.get('cemi').get('message_code')] == 'L_Data.con' and \
knx_message.body.get('cemi').get('tpci').get('type') == TPCI_TYPES['NDP']:

# if we get a confirmation for our device descriptor request, check if L_Data.ind arrives
if knx_message.body.get('cemi').get('apci') == APCI_TYPES.get('A_DeviceDescriptor_Read'):
self.loop.call_later(3, self.response_timeout, knx_dest)

elif CEMI_PRIMITIVES[knx_message.body.get('cemi').get('message_code')] == 'L_Data.ind' and \
knx_message.body.get('cemi').get('tpci').get('type') == TPCI_TYPES['UCD']:
Expand All @@ -84,6 +125,8 @@ def datagram_received(self, data, addr):
if not self.target_futures[knx_dest].done():
self.target_futures[knx_dest].set_result(False)
del self.target_futures[knx_dest]
else:
self.response_queue.append(knx_message)

elif CEMI_PRIMITIVES[knx_message.body.get('cemi').get('message_code')] == 'L_Data.ind' and \
knx_message.body.get('cemi').get('tpci').get('type') == TPCI_TYPES['NDP']:
Expand All @@ -93,6 +136,8 @@ def datagram_received(self, data, addr):
if not self.target_futures[knx_source].done():
self.target_futures[knx_source].set_result(knx_message)
del self.target_futures[knx_source]
else:
self.response_queue.append(knx_message)

elif knx_message.body.get('cemi').get('apci') == APCI_TYPES['A_Authorize_Response']:

Expand All @@ -101,30 +146,38 @@ def datagram_received(self, data, addr):

if knx_source in self.target_futures.keys():
if not self.target_futures[knx_source].done():
self.target_futures[knx_source].set_result(True)
self.target_futures[knx_source].set_result(knx_message)
del self.target_futures[knx_source]
else:
self.response_queue.append(knx_message)

elif knx_message.body.get('cemi').get('apci') == APCI_TYPES['A_PropertyValue_Response']:

LOGGER.info('{}/{}/{}: PROPERTY_VALUE_RESPONSE DATA: {}'.format(
self.peername[0], knx_source, knx_dest, knx_message.body.get('cemi').get('data')[4:]))

data = knx_message.body.get('cemi').get('data')[4:]
#data = knx_message.body.get('cemi').get('data')[4:]

if knx_source in self.target_futures.keys():
if not self.target_futures[knx_source].done():
self.target_futures[knx_source].set_result(data)
self.target_futures[knx_source].set_result(knx_message)
del self.target_futures[knx_source]
else:
self.response_queue.append(knx_message)

elif knx_message.body.get('cemi').get('apci') == APCI_TYPES['A_Memory_Response']:

LOGGER.info('{}/{}: MEMORY_RESPONSE DATA: {}'.format(
self.peername[0], knx_source, knx_message.body.get('cemi').get('data')))

#data = knx_message.body.get('cemi').get('data')[2:]

if knx_source in self.target_futures.keys():
if not self.target_futures[knx_source].done():
self.target_futures[knx_source].set_result(True)
self.target_futures[knx_source].set_result(knx_message)
del self.target_futures[knx_source]
else:
self.response_queue.append(knx_message)

if CEMI_PRIMITIVES[knx_message.body.get('cemi').get('message_code')] == 'L_Data.con' or \
CEMI_PRIMITIVES[knx_message.body.get('cemi').get('message_code')] == 'L_Data.ind':
Expand Down Expand Up @@ -191,6 +244,11 @@ def knx_tunnel_disconnect(self):
communication_channel=self.communication_channel)
self.transport.sendto(disconnect_request.get_message())

def knx_tpci_disconnect(self, target):
tunnel_request = self.make_tunnel_request(target)
tunnel_request.unnumbered_control_data('DISCONNECT')
self.transport.sendto(tunnel_request.get_message())


class KnxRoutingConnection(asyncio.DatagramProtocol):
# TODO: implement routing
Expand Down
115 changes: 92 additions & 23 deletions libknx/scanner.py
Expand Up @@ -74,6 +74,7 @@ def __init__(self, targets=None, bus_targets=None, max_workers=100, loop=None, )
self.alive_targets = set()
self.knx_gateways = list()
self.bus_devices = set()
self.bus_info = False

# save some timing information
self.t0 = time.time()
Expand Down Expand Up @@ -116,55 +117,117 @@ def knx_bus_worker(self, transport, protocol, queue):
tunnel_request.unnumbered_control_data('CONNECT')
alive = yield from protocol.send_data(tunnel_request.get_message(), target)

sequence = 0

if alive:
if not self.bus_info:
t = KnxBusTargetReport(
address=target,
type=None,
device_serial=None,
manufacturer=None)
self.bus_devices.add(t)
queue.task_done()
continue

# TODO: the device is alive, but not probably we cannot read any properties

# DeviceDescriptorRead
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.a_device_descriptor_read()
tunnel_request.a_device_descriptor_read(sequence=sequence)
descriptor = yield from protocol.send_data(tunnel_request.get_message(), target)

if not isinstance(descriptor, KnxTunnellingRequest):
t = KnxBusTargetReport(
address=target,
type=None,
device_serial=None,
manufacturer=None)
self.bus_devices.add(t)
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.unnumbered_control_data('DISCONNECT')
protocol.send_data(tunnel_request.get_message(), target)
queue.task_done()
continue

# NCD
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.numbered_control_data('ACK')
tunnel_request.numbered_control_data('ACK', sequence=sequence)
ret = yield from protocol.send_data(tunnel_request.get_message(), target)

if not ret:
LOGGER.error('ERROR OCCURED')

sequence += 1

dev_desc = data = struct.unpack('!H', descriptor.body.get('cemi').get('data'))[0]
manufacturer = None
serial = None

if dev_desc > 0x13:
# System 1 devices do not have interface objects or a serial number
# PropertyValueRead
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.a_property_value_read(
sequence=1, object_index=0, property_id=DEVICE_OBJECTS.get('PID_MANUFACTURER_ID'))
sequence=sequence, object_index=0, property_id=DEVICE_OBJECTS.get('PID_MANUFACTURER_ID'))
manufacturer = yield from protocol.send_data(tunnel_request.get_message(), target)
manufacturer = manufacturer.body.get('cemi').get('data')[4:]
else:
# MemoryRead manufacturer ID
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.a_memory_read(
sequence=sequence, memory_address=0x0104, read_count=1)
manufacturer = yield from protocol.send_data(tunnel_request.get_message(), target)
manufacturer = manufacturer.body.get('cemi').get('data')[2:]

# NCD
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.numbered_control_data('ACK', sequence=sequence)
ret = yield from protocol.send_data(tunnel_request.get_message(), target)

if not ret:
manufacturer = 'COULD NOT READ MANUFACTURER'
else:
if isinstance(manufacturer, (str, bytes)):
manufacturer = int.from_bytes(manufacturer, 'big')
manufacturer = get_manufacturer_by_id(manufacturer)

sequence += 1

if dev_desc <= 0x13:
# MemoryRead application program
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.a_memory_read(
sequence=sequence, memory_address=0x0104, read_count=4)
application_program = yield from protocol.send_data(tunnel_request.get_message(), target)
application_program = application_program.body.get('cemi').get('data')[2:]

LOGGER.info('DATA: {}'.format(application_program))
LOGGER.info('APP_PROGRAM: {}'.format(codecs.encode(application_program, 'hex')))

# NCD
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.numbered_control_data('ACK', sequence=1)
tunnel_request.numbered_control_data('ACK', sequence=2)
ret = yield from protocol.send_data(tunnel_request.get_message(), target)

if not ret:
manufacturer = 'COULD NOT READ MANUFACTURER'
else:
if isinstance(manufacturer, (str, bytes)):
manufacturer = struct.unpack('!H', manufacturer)[0]
manufacturer = get_manufacturer_by_id(manufacturer)
sequence += 1


if dev_desc > 0x13:
# PropertyValueRead
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.a_property_value_read(
sequence=2, object_index=0, property_id=DEVICE_OBJECTS.get('PID_SERIAL_NUMBER'))
sequence=sequence, object_index=0, property_id=DEVICE_OBJECTS.get('PID_SERIAL_NUMBER'))
serial = yield from protocol.send_data(tunnel_request.get_message(), target)
serial = serial.body.get('cemi').get('data')[4:]

# NCD
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.numbered_control_data('ACK', sequence=2)
tunnel_request.numbered_control_data('ACK', sequence=sequence)
ret = yield from protocol.send_data(tunnel_request.get_message(), target)

sequence += 1

if not ret:
serial = 'COULD NOT READ SERIAL'
else:
Expand All @@ -177,14 +240,13 @@ def knx_bus_worker(self, transport, protocol, queue):
type=DEVICE_DESCRIPTORS.get(dev_desc) or 'Unknown',
device_serial=serial or 'Unavailable',
manufacturer=manufacturer or 'Unknown')

self.bus_devices.add(t)

tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.unnumbered_control_data('DISCONNECT')
yield from protocol.send_data(tunnel_request.get_message(), target)


# properly close the TPCI connection
#protocol.knx_tpci_disconnect(target)
tunnel_request = protocol.make_tunnel_request(target)
tunnel_request.unnumbered_control_data('DISCONNECT')
protocol.send_data(tunnel_request.get_message(), target)

queue.task_done()
except asyncio.CancelledError:
Expand Down Expand Up @@ -316,7 +378,7 @@ def knx_description_worker(self):

@asyncio.coroutine
def scan(self, targets=None, search_mode=False, search_timeout=5, iface=None,
bus_mode=False, bus_monitor_mode=False, group_monitor_mode=False):
bus_mode=False, bus_info=False, bus_monitor_mode=False, group_monitor_mode=False):
"""The function that will be called by run_until_complete(). This is the main coroutine."""
if targets:
self.set_targets(targets)
Expand Down Expand Up @@ -352,6 +414,7 @@ def scan(self, targets=None, search_mode=False, search_timeout=5, iface=None,
w.cancel()

if bus_mode and self.knx_gateways:
self.bus_info = bus_info
bus_scanners = [asyncio.Task(self.bus_scan(g), loop=self.loop) for g in self.knx_gateways]
yield from asyncio.wait(bus_scanners)
else:
Expand Down Expand Up @@ -382,9 +445,12 @@ def print_knx_target(knx_target):
for d in knx_target.bus_devices:
_d = dict()
_d[d.address] = collections.OrderedDict()
_d[d.address]['Type'] = d.type
_d[d.address]['Device Serial'] = d.device_serial
_d[d.address]['Manufacturer'] = d.manufacturer
if d.type:
_d[d.address]['Type'] = d.type
if d.device_serial:
_d[d.address]['Device Serial'] = d.device_serial
if d.manufacturer:
_d[d.address]['Manufacturer'] = d.manufacturer
o['Bus Devices'].append(_d)

print()
Expand All @@ -394,7 +460,10 @@ def print_fmt(d, indent=0):
if indent is 0:
print(' ' * indent + str(key))
elif isinstance(value, (dict, collections.OrderedDict)):
print(' ' * indent + str(key) + ': ')
if not len(value.keys()):
print(' ' * indent + str(key))
else:
print(' ' * indent + str(key) + ': ')
else:
print(' ' * indent + str(key) + ': ', end="", flush=True)

Expand Down

0 comments on commit f7ecd6a

Please sign in to comment.