diff --git a/Makefile b/Makefile index aa8f164e..dc809270 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ install: lint: pylint meshtastic +# run the coverage report and open results in a browser cov: pytest --cov-report html --cov=meshtastic # on mac, this will open the coverage report in a browser @@ -19,4 +20,5 @@ cov: examples: FORCE pytest -mexamples +# Makefile hack to get the examples to always run FORCE: ; diff --git a/README.md b/README.md index 003c2be1..37b26edb 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ An example using Python 3 code to send a message to the mesh: ``` import meshtastic -interface = meshtastic.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 +interface = meshtastic.serial_interface.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0 interface.sendText("hello mesh") # or sendData to send binary data, see documentations for other options. interface.close() ``` @@ -103,7 +103,7 @@ You can even set the channel preshared key to a particular AES128 or AES256 sequ meshtastic --ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --info ``` -Use "--ch-set psk none" to turn off encryption. +Use "--ch-set psk none" to turn off encryption. Use "--ch-set psk random" will assign a new (high quality) random AES256 key to the primary channel (similar to what the Android app does when making new channels). diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index fee91664..92581c52 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -103,6 +103,7 @@ def getTimeAgo(ts): rows = [] if self.nodes: + logging.debug(f'self.nodes:{self.nodes}') for node in self.nodes.values(): if not includeSelf and node['num'] == self.localNode.nodeNum: continue @@ -227,8 +228,9 @@ def sendData(self, data, destinationId=BROADCAST_ADDR, data = data.SerializeToString() logging.debug(f"len(data): {len(data)}") + logging.debug(f"mesh_pb2.Constants.DATA_PAYLOAD_LEN: {mesh_pb2.Constants.DATA_PAYLOAD_LEN}") if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN: - Exception("Data payload too big") + raise Exception("Data payload too big") if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers our_exit("Warning: A non-zero port number must be specified") @@ -261,12 +263,15 @@ def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, p = mesh_pb2.Position() if latitude != 0.0: p.latitude_i = int(latitude / 1e-7) + logging.debug(f'p.latitude_i:{p.latitude_i}') if longitude != 0.0: p.longitude_i = int(longitude / 1e-7) + logging.debug(f'p.longitude_i:{p.longitude_i}') if altitude != 0: p.altitude = int(altitude) + logging.debug(f'p.altitude:{p.altitude}') if timeSec == 0: timeSec = time.time() # returns unix timestamp in seconds @@ -307,7 +312,10 @@ def _sendPacket(self, meshPacket, elif destinationId == BROADCAST_ADDR: nodeNum = BROADCAST_NUM elif destinationId == LOCAL_ADDR: - nodeNum = self.myInfo.my_node_num + if self.myInfo: + nodeNum = self.myInfo.my_node_num + else: + our_exit("Warning: No myInfo found.") # A simple hex style nodeid - we can parse this without needing the DB elif destinationId.startswith("!"): nodeNum = int(destinationId[1:], 16) @@ -330,7 +338,7 @@ def _sendPacket(self, meshPacket, meshPacket.id = self._generatePacketId() toRadio.packet.CopyFrom(meshPacket) - #logging.debug(f"Sending packet: {stripnl(meshPacket)}") + logging.debug(f"Sending packet: {stripnl(meshPacket)}") self._sendToRadio(toRadio) return meshPacket @@ -344,6 +352,7 @@ def getMyNodeInfo(self): """Get info about my node.""" if self.myInfo is None: return None + logging.debug(f'self.nodesByNum:{self.nodesByNum}') return self.nodesByNum.get(self.myInfo.my_node_num) def getMyUser(self): @@ -370,8 +379,9 @@ def getShortName(self): def _waitConnected(self): """Block until the initial node db download is complete, or timeout and raise an exception""" - if not self.isConnected.wait(15.0): # timeout after x seconds - raise Exception("Timed out waiting for connection completion") + if not self.noProto: + if not self.isConnected.wait(15.0): # timeout after x seconds + raise Exception("Timed out waiting for connection completion") # If we failed while connecting, raise the connection to the client if self.failure: diff --git a/meshtastic/node.py b/meshtastic/node.py index 4b682b37..3158d577 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -8,6 +8,8 @@ from .util import pskToString, stripnl, Timeout, our_exit, fromPSK + + class Node: """A model of a (local or remote) node in the mesh @@ -28,7 +30,9 @@ def showChannels(self): """Show human readable description of our channels.""" print("Channels:") if self.channels: + logging.debug(f'self.channels:{self.channels}') for c in self.channels: + #print('c.settings.psk:', c.settings.psk) cStr = stripnl(MessageToJson(c.settings)) # only show if there is no psk (meaning disabled channel) if c.settings.psk: @@ -108,7 +112,7 @@ def deleteChannel(self, channelIndex): # *moving* the admin channel index as we are writing if (self.iface.localNode == self) and index >= adminIndex: # We've now passed the old location for admin index - # (and writen it), so we can start finding it by name again + # (and written it), so we can start finding it by name again adminIndex = 0 def getChannelByName(self, name): @@ -220,25 +224,29 @@ def setURL(self, url): self.writeChannel(ch.index) i = i + 1 + + def onResponseRequestSettings(self, p): + """Handle the response packet for requesting settings _requestSettings()""" + logging.debug(f'onResponseRequestSetting() p:{p}') + errorFound = False + if 'routing' in p["decoded"]: + if p["decoded"]["routing"]["errorReason"] != "NONE": + errorFound = True + print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') + if errorFound is False: + self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response + logging.debug(f'self.radioConfig:{self.radioConfig}') + logging.debug("Received radio config, now fetching channels...") + self._timeout.reset() # We made foreward progress + self._requestChannel(0) # now start fetching channels + + def _requestSettings(self): """Done with initial config messages, now send regular MeshPackets to ask for settings.""" p = admin_pb2.AdminMessage() p.get_radio_request = True - def onResponse(p): - """A closure to handle the response packet""" - errorFound = False - if 'routing' in p["decoded"]: - if p["decoded"]["routing"]["errorReason"] != "NONE": - errorFound = True - print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') - if errorFound is False: - self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response - logging.debug("Received radio config, now fetching channels...") - self._timeout.reset() # We made foreward progress - self._requestChannel(0) # now start fetching channels - # Show progress message for super slow operations if self != self.iface.localNode: print("Requesting preferences from remote node.") @@ -249,7 +257,7 @@ def onResponse(p): print(" 4. All devices have been rebooted after all of the above. (optional, but recommended)") print("Note: This could take a while (it requests remote channel configs, then writes config)") - return self._sendAdmin(p, wantResponse=True, onResponse=onResponse) + return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings) def exitSimulator(self): """Tell a simulator node to exit (this message @@ -290,6 +298,34 @@ def _fillChannels(self): self.channels.append(ch) index += 1 + + def onResponseRequestChannel(self, p): + """Handle the response packet for requesting a channel _requestChannel()""" + logging.debug(f'onResponseRequestChannel() p:{p}') + c = p["decoded"]["admin"]["raw"].get_channel_response + self.partialChannels.append(c) + self._timeout.reset() # We made foreward progress + logging.debug(f"Received channel {stripnl(c)}") + index = c.index + + # for stress testing, we can always download all channels + fastChannelDownload = True + + # Once we see a response that has NO settings, assume + # we are at the end of channels and stop fetching + quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload + + if quitEarly or index >= self.iface.myInfo.max_channels - 1: + logging.debug("Finished downloading channels") + + self.channels = self.partialChannels + self._fixupChannels() + + # FIXME, the following should only be called after we have settings and channels + self.iface._connected() # Tell everyone else we are ready to go + else: + self._requestChannel(index + 1) + def _requestChannel(self, channelNum: int): """Done with initial config messages, now send regular MeshPackets to ask for settings""" @@ -303,33 +339,7 @@ def _requestChannel(self, channelNum: int): else: logging.debug(f"Requesting channel {channelNum}") - def onResponse(p): - """A closure to handle the response packet for requesting a channel""" - c = p["decoded"]["admin"]["raw"].get_channel_response - self.partialChannels.append(c) - self._timeout.reset() # We made foreward progress - logging.debug(f"Received channel {stripnl(c)}") - index = c.index - - # for stress testing, we can always download all channels - fastChannelDownload = True - - # Once we see a response that has NO settings, assume - # we are at the end of channels and stop fetching - quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload - - if quitEarly or index >= self.iface.myInfo.max_channels - 1: - logging.debug("Finished downloading channels") - - self.channels = self.partialChannels - self._fixupChannels() - - # FIXME, the following should only be called after we have settings and channels - self.iface._connected() # Tell everyone else we are ready to go - else: - self._requestChannel(index + 1) - - return self._sendAdmin(p, wantResponse=True, onResponse=onResponse) + return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestChannel) # pylint: disable=R1710 diff --git a/meshtastic/tests/conftest.py b/meshtastic/tests/conftest.py index 3bcaecb1..12c566c1 100644 --- a/meshtastic/tests/conftest.py +++ b/meshtastic/tests/conftest.py @@ -2,9 +2,11 @@ import argparse +from unittest.mock import MagicMock import pytest from meshtastic.__main__ import Globals +from ..mesh_interface import MeshInterface @pytest.fixture def reset_globals(): @@ -13,3 +15,46 @@ def reset_globals(): parser = argparse.ArgumentParser() Globals.getInstance().reset() Globals.getInstance().set_parser(parser) + + +@pytest.fixture +def iface_with_nodes(): + """Fixture to setup some nodes.""" + nodesById = { + '!9388f81c': { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': {}, + 'lastHeard': 1640204888 + } + } + + nodesByNum = { + 2475227164: { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': { + 'time': 1640206266 + }, + 'lastHeard': 1640206266 + } + } + iface = MeshInterface(noProto=True) + iface.nodes = nodesById + iface.nodesByNum = nodesByNum + myInfo = MagicMock() + iface.myInfo = myInfo + iface.myInfo.my_node_num = 2475227164 + return iface diff --git a/meshtastic/tests/test_mesh_interface.py b/meshtastic/tests/test_mesh_interface.py index 94299119..2f56ebb3 100644 --- a/meshtastic/tests/test_mesh_interface.py +++ b/meshtastic/tests/test_mesh_interface.py @@ -9,13 +9,36 @@ from ..mesh_interface import MeshInterface from ..node import Node from .. import mesh_pb2 -from ..__init__ import LOCAL_ADDR +from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR @pytest.mark.unit def test_MeshInterface(capsys, reset_globals): """Test that we can instantiate a MeshInterface""" iface = MeshInterface(noProto=True) + anode = Node('foo', 'bar') + + nodes = { + '!9388f81c': { + 'num': 2475227164, + 'user': { + 'id': '!9388f81c', + 'longName': 'Unknown f81c', + 'shortName': '?1C', + 'macaddr': 'RBeTiPgc', + 'hwModel': 'TBEAM' + }, + 'position': {}, + 'lastHeard': 1640204888 + } + } + + iface.nodesByNum = {1: anode } + iface.nodes = nodes + + myInfo = MagicMock() + iface.myInfo = myInfo + iface.showInfo() iface.localNode.showInfo() iface.showNodes() @@ -30,6 +53,36 @@ def test_MeshInterface(capsys, reset_globals): assert err == '' +@pytest.mark.unit +def test_getMyUser(reset_globals, iface_with_nodes): + """Test getMyUser()""" + iface = iface_with_nodes + + iface.myInfo.my_node_num = 2475227164 + myuser = iface.getMyUser() + print(f'myuser:{myuser}') + assert myuser is not None + assert myuser["id"] == '!9388f81c' + + +@pytest.mark.unit +def test_getLongName(reset_globals, iface_with_nodes): + """Test getLongName()""" + iface = iface_with_nodes + iface.myInfo.my_node_num = 2475227164 + mylongname = iface.getLongName() + assert mylongname == 'Unknown f81c' + + +@pytest.mark.unit +def test_getShortName(reset_globals, iface_with_nodes): + """Test getShortName().""" + iface = iface_with_nodes + iface.myInfo.my_node_num = 2475227164 + myshortname = iface.getShortName() + assert myshortname == '?1C' + + @pytest.mark.unit def test_handlePacketFromRadio_no_from(capsys, reset_globals): """Test _handlePacketFromRadio with no 'from' in the mesh packet.""" @@ -216,3 +269,194 @@ def test_handleFromRadio_with_node_info_tbeam_with_bad_data(reset_globals, caplo with caplog.at_level(logging.DEBUG): iface._startConfig() iface._handleFromRadio(from_radio_bytes) + + +@pytest.mark.unit +def test_MeshInterface_sendToRadioImpl(caplog, reset_globals): + """Test _sendToRadioImp()""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface._sendToRadioImpl('foo') + assert re.search(r'Subclass must provide toradio', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_MeshInterface_sendToRadio_no_proto(caplog, reset_globals): + """Test sendToRadio()""" + iface = MeshInterface() + with caplog.at_level(logging.DEBUG): + iface._sendToRadioImpl('foo') + assert re.search(r'Subclass must provide toradio', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_sendData_too_long(caplog, reset_globals): + """Test when data payload is too big""" + iface = MeshInterface(noProto=True) + some_large_text = b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + some_large_text += b'This is a long text that will be too long for send text.' + with caplog.at_level(logging.DEBUG): + with pytest.raises(Exception) as pytest_wrapped_e: + iface.sendData(some_large_text) + assert re.search('Data payload too big', caplog.text, re.MULTILINE) + assert pytest_wrapped_e.type == Exception + iface.close() + + +@pytest.mark.unit +def test_sendData_unknown_app(capsys, reset_globals): + """Test sendData when unknown app""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface.sendData(b'hello', portNum=0) + out, err = capsys.readouterr() + assert re.search(r'Warning: A non-zero port number', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPosition_with_a_position(caplog, reset_globals): + """Test sendPosition when lat/long/alt""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201) + assert re.search(r'p.latitude_i:408', caplog.text, re.MULTILINE) + assert re.search(r'p.longitude_i:-11186', caplog.text, re.MULTILINE) + assert re.search(r'p.altitude:201', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_no_destination(capsys, reset_globals): + """Test _sendPacket()""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface._sendPacket(b'', destinationId=None) + out, err = capsys.readouterr() + assert re.search(r'Warning: destinationId must not be None', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_int(caplog, reset_globals): + """Test _sendPacket() with int as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=123) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals): + """Test _sendPacket() with int as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId='!1234') + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals): + """Test _sendPacket() with BROADCAST_ADDR as a destination""" + iface = MeshInterface(noProto=True) + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globals): + """Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo""" + iface = MeshInterface(noProto=True) + with pytest.raises(SystemExit) as pytest_wrapped_e: + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) + out, err = capsys.readouterr() + assert re.search(r'Warning: No myInfo', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_globals): + """Test _sendPacket() with LOCAL_ADDR as a destination with myInfo""" + iface = MeshInterface(noProto=True) + myInfo = MagicMock() + iface.myInfo = myInfo + with caplog.at_level(logging.DEBUG): + meshPacket = mesh_pb2.MeshPacket() + iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) + assert re.search(r'Sending packet', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_sendPacket_with_destination_is_blank_with_nodes(capsys, reset_globals, iface_with_nodes): + """Test _sendPacket() with '' as a destination with myInfo""" + iface = iface_with_nodes + meshPacket = mesh_pb2.MeshPacket() + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface._sendPacket(meshPacket, destinationId='') + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.match(r'Warning: NodeId not found in DB', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_sendPacket_with_destination_is_blank_without_nodes(caplog, reset_globals, iface_with_nodes): + """Test _sendPacket() with '' as a destination with myInfo""" + iface = iface_with_nodes + iface.nodes = None + meshPacket = mesh_pb2.MeshPacket() + with caplog.at_level(logging.WARNING): + iface._sendPacket(meshPacket, destinationId='') + assert re.search(r'Warning: There were no self.nodes.', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_getMyNodeInfo(reset_globals): + """Test getMyNodeInfo()""" + iface = MeshInterface(noProto=True) + anode = iface.getNode(LOCAL_ADDR) + iface.nodesByNum = {1: anode } + assert iface.nodesByNum.get(1) == anode + myInfo = MagicMock() + iface.myInfo = myInfo + iface.myInfo.my_node_num = 1 + myinfo = iface.getMyNodeInfo() + assert myinfo == anode + + +@pytest.mark.unit +def test_generatePacketId(capsys, reset_globals): + """Test _generatePacketId() when no currentPacketId (not connected)""" + iface = MeshInterface(noProto=True) + # not sure when this condition would ever happen... but we can simulate it + iface.currentPacketId = None + assert iface.currentPacketId is None + with pytest.raises(Exception) as pytest_wrapped_e: + iface._generatePacketId() + out, err = capsys.readouterr() + assert re.search(r'Not connected yet, can not generate packet', out, re.MULTILINE) + assert err == '' + assert pytest_wrapped_e.type == Exception diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 528bead4..e9b2ff8d 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -9,12 +9,16 @@ from ..node import Node from ..serial_interface import SerialInterface from ..admin_pb2 import AdminMessage +from ..channel_pb2 import Channel +from ..radioconfig_pb2 import RadioConfig @pytest.mark.unit def test_node(capsys): """Test that we can instantiate a Node""" anode = Node('foo', 'bar') + radioConfig = RadioConfig() + anode.radioConfig = radioConfig anode.showChannels() anode.showInfo() out, err = capsys.readouterr() @@ -139,3 +143,689 @@ def test_setURL_valid_URL_but_no_settings(caplog): anode.setURL(url) assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 + + +@pytest.mark.unit +def test_showChannels(capsys): + """Test showChannels""" + anode = Node('foo', 'bar') + + # primary channel + # role: 0=Disabled, 1=Primary, 2=Secondary + # modem_config: 0-5 + # role: 0=Disabled, 1=Primary, 2=Secondary + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testing' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + anode.showChannels() + out, err = capsys.readouterr() + assert re.search(r'Channels:', out, re.MULTILINE) + # primary channel + assert re.search(r'Primary channel URL', out, re.MULTILINE) + assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE) + assert re.search(r'"modemConfig": "Bw125Cr48Sf4096"', out, re.MULTILINE) + assert re.search(r'"psk": "AQ=="', out, re.MULTILINE) + # secondary channel + assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE) + assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_deleteChannel_try_to_delete_primary_channel(capsys): + """Try to delete primary channel.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + # no secondary channels + channel2 = Channel(index=2, role=0) + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + with pytest.raises(SystemExit) as pytest_wrapped_e: + anode.deleteChannel(0) + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_deleteChannel_secondary(): + """Try to delete a secondary channel.""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testing' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + anode.channels = channels + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'testing' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + anode.deleteChannel(1) + + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == '' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + +@pytest.mark.unit +def test_deleteChannel_secondary_with_admin_channel_after_testing(): + """Try to delete a secondary channel where there is an admin channel.""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testing' + + channel3 = Channel(index=3, role=2) + channel3.settings.name = 'admin' + + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + assert mo.localNode == anode + + anode.channels = channels + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'testing' + assert channels[2].settings.name == 'admin' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + anode.deleteChannel(1) + + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'admin' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + +@pytest.mark.unit +def test_deleteChannel_secondary_with_admin_channel_before_testing(): + """Try to delete a secondary channel where there is an admin channel.""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=2) + channel3.settings.name = 'testing' + + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + anode.channels = channels + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'admin' + assert channels[2].settings.name == 'testing' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + anode.deleteChannel(2) + + assert len(anode.channels) == 8 + assert channels[0].settings.modem_config == 3 + assert channels[1].settings.name == 'admin' + assert channels[2].settings.name == '' + assert channels[3].settings.name == '' + assert channels[4].settings.name == '' + assert channels[5].settings.name == '' + assert channels[6].settings.name == '' + assert channels[7].settings.name == '' + + +@pytest.mark.unit +def test_getChannelByName(capsys): + """Get a channel by the name.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getChannelByName('admin') + assert ch.index == 2 + + +@pytest.mark.unit +def test_getChannelByName_invalid_name(capsys): + """Get a channel by the name but one that is not present.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getChannelByName('testing') + assert ch is None + + +@pytest.mark.unit +def test_getDisabledChannel(capsys): + """Get the first disabled channel.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'testingA' + + channel3 = Channel(index=3, role=2) + channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel3.settings.name = 'testingB' + + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getDisabledChannel() + assert ch.index == 4 + + +@pytest.mark.unit +def test_getDisabledChannel_where_all_channels_are_used(capsys): + """Get the first disabled channel.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel3 = Channel(index=3, role=2) + channel4 = Channel(index=4, role=2) + channel5 = Channel(index=5, role=2) + channel6 = Channel(index=6, role=2) + channel7 = Channel(index=7, role=2) + channel8 = Channel(index=8, role=2) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + ch = anode.getDisabledChannel() + assert ch is None + + +@pytest.mark.unit +def test_getAdminChannelIndex(capsys): + """Get the 'admin' channel index.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=2) + channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84' + channel2.settings.name = 'admin' + + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + i = anode._getAdminChannelIndex() + assert i == 2 + + +@pytest.mark.unit +def test_getAdminChannelIndex_when_no_admin_named_channel(capsys): + """Get the 'admin' channel when there is not one.""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + channel2 = Channel(index=2, role=0) + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + i = anode._getAdminChannelIndex() + assert i == 0 + + +# TODO: should we check if we need to turn it off? +@pytest.mark.unit +def test_turnOffEncryptionOnPrimaryChannel(capsys): + """Turn off encryption when there is a psk.""" + anode = Node('foo', 'bar', noProto=True) + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b " + channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++' + + channel2 = Channel(index=2, role=0) + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + anode.turnOffEncryptionOnPrimaryChannel() + out, err = capsys.readouterr() + assert re.search(r'Writing modified channels to device', out) + assert err == '' + + +@pytest.mark.unit +def test_writeConfig_with_no_radioConfig(capsys): + """Test writeConfig with no radioConfig.""" + anode = Node('foo', 'bar', noProto=True) + + with pytest.raises(SystemExit) as pytest_wrapped_e: + anode.writeConfig() + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.search(r'Error: No RadioConfig has been read', out) + assert err == '' + + +@pytest.mark.unit +def test_writeConfig(caplog): + """Test writeConfig""" + anode = Node('foo', 'bar', noProto=True) + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + with caplog.at_level(logging.DEBUG): + anode.writeConfig() + assert re.search(r'Wrote config', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_requestChannel_not_localNode(caplog): + """Test _requestChannel()""" + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + with caplog.at_level(logging.DEBUG): + anode._requestChannel(0) + assert re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_requestChannel_localNode(caplog): + """Test _requestChannel()""" + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode._requestChannel(0) + assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE) + assert not re.search(r'from remote node', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_onResponseRequestChannel(caplog): + """Test onResponseRequestChannel()""" + + channel1 = Channel(index=1, role=1) + channel1.settings.modem_config = 3 + channel1.settings.psk = b'\x01' + + msg1 = MagicMock(autospec=AdminMessage) + msg1.get_channel_response = channel1 + + msg2 = MagicMock(autospec=AdminMessage) + channel2 = Channel(index=2, role=0) # disabled + msg2.get_channel_response = channel2 + + # default primary channel + packet1 = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01', + 'requestId': 2615094405, + 'admin': { + 'getChannelResponse': { + 'settings': { + 'modemConfig': 'Bw125Cr48Sf4096', + 'psk': 'AQ==' + }, + 'role': 'PRIMARY' + }, + 'raw': msg1, + } + }, + 'id': 1692918436, + 'hopLimit': 3, + 'priority': + 'RELIABLE', + 'raw': 'fake', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + + # no other channels + packet2 = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b':\x04\x08\x02\x12\x00', + 'requestId': 743049663, + 'admin': { + 'getChannelResponse': { + 'index': 2, + 'settings': {} + }, + 'raw': msg2, + } + }, + 'id': 1692918456, + 'rxTime': 1640202239, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.requestConfig() + anode.onResponseRequestChannel(packet1) + assert re.search(r'Received channel', caplog.text, re.MULTILINE) + anode.onResponseRequestChannel(packet2) + assert re.search(r'Received channel', caplog.text, re.MULTILINE) + assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE) + assert len(anode.channels) == 8 + assert anode.channels[0].settings.modem_config == 3 + assert anode.channels[1].settings.name == '' + assert anode.channels[2].settings.name == '' + assert anode.channels[3].settings.name == '' + assert anode.channels[4].settings.name == '' + assert anode.channels[5].settings.name == '' + assert anode.channels[6].settings.name == '' + assert anode.channels[7].settings.name == '' + + +@pytest.mark.unit +def test_onResponseRequestSetting(caplog): + """Test onResponseRequestSetting()""" + # Note: Split out the get_radio_response to a MagicMock + # so it could be "returned" (not really sure how to do that + # in a python dict. + amsg = MagicMock(autospec=AdminMessage) + amsg.get_radio_response = """{ + preferences { + phone_timeout_secs: 900 + ls_secs: 300 + position_broadcast_smart: true + position_flags: 35 + } +}""" + packet = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#', + 'requestId': 3145147848, + 'admin': { + 'getRadioResponse': { + 'preferences': { + 'phoneTimeoutSecs': 900, + 'lsSecs': 300, + 'positionBroadcastSmart': True, + 'positionFlags': 35 + } + }, + 'raw': amsg + }, + 'id': 365963704, + 'rxTime': 1640195197, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + } + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestSettings(packet) + assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_onResponseRequestSetting_with_error(capsys): + """Test onResponseRequestSetting() with an error""" + packet = { + 'from': 2475227164, + 'to': 2475227164, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#', + 'requestId': 3145147848, + 'routing': { + 'errorReason': 'some made up error', + }, + 'admin': { + 'getRadioResponse': { + 'preferences': { + 'phoneTimeoutSecs': 900, + 'lsSecs': 300, + 'positionBroadcastSmart': True, + 'positionFlags': 35 + } + }, + }, + 'id': 365963704, + 'rxTime': 1640195197, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'fromId': '!9388f81c', + 'toId': '!9388f81c' + } + } + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + mo.myInfo.max_channels = 8 + anode = Node(mo, 'bar', noProto=True) + + radioConfig = RadioConfig() + anode.radioConfig = radioConfig + + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + anode.onResponseRequestSettings(packet) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == ''