Hello.
I've tried running the example code from the documentation to retrieve the logical address of an ECU. Both the await_vehicle_announcement() version
from doipclient import DoIPClient
address, announcement = DoIPClient.await_vehicle_announcement()
# Power cycle your ECU and wait for a few seconds for the broadcast to be
# received
logical_address = announcement.logical_address
ip, port = address
print(ip, port, logical_address)
and the get_entity() version
from doipclient import DoIPClient
address, announcement = DoIPClient.get_entity()
logical_address = announcement.logical_address
ip, port = address
print(ip, port, logical_address)
produce an error.
The get_entity() method always assumes IPv4 addressing: the IP address defaults to an IPv4 broadcast address, and the socket is always created for IPv4, because _create_udp_socket() is called without the ipv6 argument, which defaults to False.
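As a rough illustration of how the address family could follow the target address instead of being fixed to IPv4, here is a minimal sketch. The helper name udp_family_for is hypothetical and not part of doipclient; it only uses the standard ipaddress and socket modules.

```python
import ipaddress
import socket


def udp_family_for(ip: str) -> socket.AddressFamily:
    """Pick the socket address family that matches an IP address literal.

    Hypothetical helper for illustration; doipclient does not provide it.
    """
    # Drop an optional zone suffix such as "%eth0" before parsing, since
    # link-local IPv6 addresses are often written with one.
    host = ip.split("%", 1)[0]
    version = ipaddress.ip_address(host).version
    return socket.AF_INET6 if version == 6 else socket.AF_INET


# The IPv4 broadcast address maps to AF_INET, while the IPv6
# link-local all-nodes multicast address maps to AF_INET6.
ipv4_family = udp_family_for("255.255.255.255")
ipv6_family = udp_family_for("ff02::1")
```

With something like this, the family passed to socket.socket() would be derived from the ip_address argument, so an IPv6 target would no longer end up on an IPv4 socket.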
_create_udp_socket() is also problematic: if ipv6 is true, the socket is bound to a link-local multicast address, which will always fail, as a socket cannot be bound to a multicast address. The correct way would be to bind the socket to all addresses with
sock.bind(("::", udp_port))
so that the socket receives packets sent to any address. This shortcoming in the socket creation is also the reason why the IPv6 version of await_vehicle_announcement() always fails.
I noticed that this topic has been discussed to some extent in this issue, but without any conclusion.
I ended up creating the socket myself and sending the request manually. Receiving the answer then works correctly with await_vehicle_announcement() when the created socket is passed in.
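import ipaddress
import socket
import struct
from typing import NamedTuple

from doipclient import DoIPClient
from doipclient.constants import LINK_LOCAL_MULTICAST_ADDRESS
from doipclient.messages import VehicleIdentificationRequest, payload_message_to_type


# Address is not part of doipclient; a minimal stand-in is assumed here.
class Address(NamedTuple):
    ip: str
    port: int
    logical_address: int


def pack_doip(protocol_version, payload_type, payload_data):
    # Generic DoIP header: version, inverse version, payload type, payload length.
    data_bytes = struct.pack(
        "!BBHL",
        protocol_version,
        0xFF ^ protocol_version,
        payload_type,
        len(payload_data),
    )
    data_bytes += payload_data
    return data_bytes


def open_udp_socket(port, interface_name, timeout=None):
    iface_idx = socket.if_nametoindex(interface_name)
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Bind to the wildcard address rather than to the multicast address.
    sock.bind(("::", port))
    # Join the link-local multicast group on the given interface.
    mc_addr = ipaddress.IPv6Address(LINK_LOCAL_MULTICAST_ADDRESS)
    join_data = struct.pack("16sI", mc_addr.packed, iface_idx)
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, join_data)
    if timeout is not None:
        sock.settimeout(timeout)
    return sock


def get_address_of_ecu(ip_address: str = "::1", protocol_version=0x02):
    doip_port = 13400
    sock = open_udp_socket(port=doip_port, interface_name="lo", timeout=2)
    # Build and send the vehicle identification request manually.
    message = VehicleIdentificationRequest()
    payload_data = message.pack()
    payload_type = payload_message_to_type[type(message)]
    data_bytes = pack_doip(protocol_version, payload_type, payload_data)
    sock.sendto(data_bytes, (ip_address, doip_port))
    # Let doipclient parse the response on the socket we created.
    addr, announcement = DoIPClient.await_vehicle_announcement(sock=sock)
    ip, port = addr
    return Address(ip=ip, port=port, logical_address=announcement.logical_address)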
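For anyone who wants to check the bytes on the wire without an ECU attached, here is a small standalone sketch of the 8-byte generic DoIP header that the manual request relies on. The names build_frame and parse_frame are hypothetical and not part of doipclient, and the payload type 0x0001 for a vehicle identification request is taken from ISO 13400-2 as I understand it.

```python
import struct

# Generic DoIP header: protocol version, inverse version,
# payload type (16 bit), payload length (32 bit), network byte order.
DOIP_HEADER = struct.Struct("!BBHL")


def build_frame(protocol_version: int, payload_type: int, payload: bytes) -> bytes:
    """Prefix a payload with the generic DoIP header (hypothetical helper)."""
    header = DOIP_HEADER.pack(
        protocol_version,
        0xFF ^ protocol_version,
        payload_type,
        len(payload),
    )
    return header + payload


def parse_frame(data: bytes):
    """Split a frame into (protocol_version, payload_type, payload)."""
    version, inverse, payload_type, length = DOIP_HEADER.unpack_from(data)
    # The second byte must be the bitwise inverse of the first.
    if version ^ inverse != 0xFF:
        raise ValueError("protocol version and inverse version do not match")
    payload = data[DOIP_HEADER.size:DOIP_HEADER.size + length]
    if len(payload) != length:
        raise ValueError("frame is shorter than the declared payload length")
    return version, payload_type, payload


# A vehicle identification request has an empty payload, so the whole
# frame is just the 8-byte header.
request = build_frame(0x02, 0x0001, b"")
```

Round-tripping a frame through parse_frame is a quick way to confirm that a hand-built request matches the header layout before sending it.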
Is there any plan to fix this?
Thanks,
Martin