From b56a054f5022df30fa8c62fc093abde4ab026e2a Mon Sep 17 00:00:00 2001 From: Balazs Kelemen <10376327+prampec@users.noreply.github.com> Date: Mon, 21 Feb 2022 06:36:26 +0100 Subject: [PATCH] Canned message - Extend messages length --- .pylintrc | 2 +- Makefile | 6 + meshtastic/__main__.py | 17 ++ meshtastic/mesh_interface.py | 5 +- meshtastic/node.py | 172 ++++++++++++- meshtastic/tests/test_main.py | 37 +++ meshtastic/tests/test_node.py | 465 ++++++++++++++++++++++++++++++++++ 7 files changed, 698 insertions(+), 6 deletions(-) diff --git a/.pylintrc b/.pylintrc index 981c40c9..2c3fca93 100644 --- a/.pylintrc +++ b/.pylintrc @@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" # -disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except +disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except,too-many-public-methods [BASIC] diff --git a/Makefile b/Makefile index 14ad4d64..1826382d 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,12 @@ lint: slow: pytest -m unit --durations=5 +proto: FORCE + git submodule update --init --recursive + git pull --rebase + git submodule update --remote --merge + ./bin/regen-protos.sh + # run the coverage report and open results in a browser cov: pytest --cov-report html --cov=meshtastic diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 2d41d888..a4d704d1 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -200,6 +200,12 @@ def onConnected(interface): print(f"Setting device owner short to {args.set_owner_short}") interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short) + # TODO: add to export-config and configure + if args.set_canned_message: + closeNow = True + print(f"Setting canned plugin message to {args.set_canned_message}") + interface.getNode(args.dest).set_canned_message(args.set_canned_message) + if args.pos_fields: # If --pos-fields invoked with args, set position fields closeNow = True @@ -510,6 +516,11 @@ def setSimpleChannel(modem_config): print(f"Writing modified channels to device") interface.getNode(args.dest).writeChannel(channelIndex) + if args.get_canned_message: + closeNow = True + print("") + interface.getNode(args.dest).get_canned_message() + if args.info: print("") # If we aren't trying to talk to our local node, don't show it @@ -746,6 +757,9 @@ def initParser(): parser.add_argument("--info", help="Read and display the radio config information", action="store_true") + parser.add_argument("--get-canned-message", help="Show the canned message plugin message", + action="store_true") + parser.add_argument("--nodes", help="Print Node List in a pretty formatted table", action="store_true") @@ -808,6 +822,9 @@ def initParser(): parser.add_argument( "--set-owner", help="Set device owner name", action="store") + parser.add_argument( + "--set-canned-message", help="Set the canned messages plugin message (up to 1000 characters).", action="store") + parser.add_argument( "--set-owner-short", help="Set device owner short name", action="store") diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 39e67e0d..2866f025 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -254,11 +254,12 @@ def sendData(self, data, destinationId=BROADCAST_ADDR, meshPacket.decoded.payload = data meshPacket.decoded.portnum = portNum meshPacket.decoded.want_response = wantResponse + meshPacket.id = self._generatePacketId() + if onResponse is not None: + self._addResponseHandler(meshPacket.id, onResponse) p = self._sendPacket(meshPacket, destinationId, wantAck=wantAck, hopLimit=hopLimit) - if onResponse is not None: - self._addResponseHandler(p.id, onResponse) return p def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, diff --git a/meshtastic/node.py b/meshtastic/node.py index 5eb65324..2623c7c6 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -3,6 +3,7 @@ import logging import base64 +import time from google.protobuf.json_format import MessageToJson from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2 from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK @@ -24,6 +25,13 @@ def __init__(self, iface, nodeNum, noProto=False): self.partialChannels = None self.noProto = noProto + self.cannedPluginMessage = None + + self.cannedPluginMessagePart1 = None + self.cannedPluginMessagePart2 = None + self.cannedPluginMessagePart3 = None + self.cannedPluginMessagePart4 = None + def showChannels(self): """Show human readable description of our channels.""" print("Channels:") @@ -56,6 +64,14 @@ def requestConfig(self): self.channels = None self.partialChannels = [] # We keep our channels in a temp array until finished + # Note: We do not get the canned plugin message, unless get_canned_message() is called + self.cannedPluginMessage = None + + self.cannedPluginMessagePart1 = None + self.cannedPluginMessagePart2 = None + self.cannedPluginMessagePart3 = None + self.cannedPluginMessagePart4 = None + self._requestSettings() def turnOffEncryptionOnPrimaryChannel(self): @@ -64,9 +80,9 @@ def turnOffEncryptionOnPrimaryChannel(self): print("Writing modified channels to device") self.writeChannel(0) - def waitForConfig(self): + def waitForConfig(self, attribute='channels'): """Block until radio config is received. Returns True if config has been received.""" - return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels')) + return self._timeout.waitForSet(self, attrs=('radioConfig', attribute)) def writeConfig(self): """Write the current (edited) radioConfig to the device""" @@ -237,7 +253,7 @@ def onResponseRequestSettings(self, p): """Handle the response packet for requesting settings _requestSettings()""" logging.debug(f'onResponseRequestSetting() p:{p}') errorFound = False - if 'routing' in p["decoded"]: + if "routing" in p["decoded"]: if p["decoded"]["routing"]["errorReason"] != "NONE": errorFound = True print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') @@ -268,6 +284,156 @@ def _requestSettings(self): return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings) + def onResponseRequestCannedMessagePluginMessagePart1(self, p): + """Handle the response packet for requesting canned message plugin message part 1""" + logging.debug(f'onResponseRequestCannedMessagePluginMessagePart1() p:{p}') + errorFound = False + if "routing" in p["decoded"]: + if p["decoded"]["routing"]["errorReason"] != "NONE": + errorFound = True + print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') + if errorFound is False: + if "decoded" in p: + if "admin" in p["decoded"]: + if "raw" in p["decoded"]["admin"]: + self.cannedPluginMessagePart1 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part1_response + logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}') + self.gotResponse = True + + def onResponseRequestCannedMessagePluginMessagePart2(self, p): + """Handle the response packet for requesting canned message plugin message part 2""" + logging.debug(f'onResponseRequestCannedMessagePluginMessagePart2() p:{p}') + errorFound = False + if "routing" in p["decoded"]: + if p["decoded"]["routing"]["errorReason"] != "NONE": + errorFound = True + print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') + if errorFound is False: + if "decoded" in p: + if "admin" in p["decoded"]: + if "raw" in p["decoded"]["admin"]: + self.cannedPluginMessagePart2 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part2_response + logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}') + self.gotResponse = True + + def onResponseRequestCannedMessagePluginMessagePart3(self, p): + """Handle the response packet for requesting canned message plugin message part 3""" + logging.debug(f'onResponseRequestCannedMessagePluginMessagePart3() p:{p}') + errorFound = False + if "routing" in p["decoded"]: + if p["decoded"]["routing"]["errorReason"] != "NONE": + errorFound = True + print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') + if errorFound is False: + if "decoded" in p: + if "admin" in p["decoded"]: + if "raw" in p["decoded"]["admin"]: + self.cannedPluginMessagePart3 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part3_response + logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}') + self.gotResponse = True + + def onResponseRequestCannedMessagePluginMessagePart4(self, p): + """Handle the response packet for requesting canned message plugin message part 4""" + logging.debug(f'onResponseRequestCannedMessagePluginMessagePart4() p:{p}') + errorFound = False + if "routing" in p["decoded"]: + if p["decoded"]["routing"]["errorReason"] != "NONE": + errorFound = True + print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') + if errorFound is False: + if "decoded" in p: + if "admin" in p["decoded"]: + if "raw" in p["decoded"]["admin"]: + self.cannedPluginMessagePart4 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part4_response + logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}') + self.gotResponse = True + + def get_canned_message(self): + """Get the canned message string. Concatenate all pieces together and return a single string.""" + logging.debug(f'in get_canned_message()') + if not self.cannedPluginMessage: + + p1 = admin_pb2.AdminMessage() + p1.get_canned_message_plugin_part1_request = True + self.gotResponse = False + self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart1) + while self.gotResponse is False: + time.sleep(0.1) + + p2 = admin_pb2.AdminMessage() + p2.get_canned_message_plugin_part2_request = True + self.gotResponse = False + self._sendAdmin(p2, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart2) + while self.gotResponse is False: + time.sleep(0.1) + + p3 = admin_pb2.AdminMessage() + p3.get_canned_message_plugin_part3_request = True + self.gotResponse = False + self._sendAdmin(p3, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart3) + while self.gotResponse is False: + time.sleep(0.1) + + p4 = admin_pb2.AdminMessage() + p4.get_canned_message_plugin_part4_request = True + self.gotResponse = False + self._sendAdmin(p4, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart4) + while self.gotResponse is False: + time.sleep(0.1) + + # TODO: This feels wrong to have a sleep here. Is there a way to ensure that + # all requests are complete? Perhaps change to a while loop any parts are None... maybe? + time.sleep(3) + + logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}') + logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}') + logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}') + logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}') + + self.cannedPluginMessage = "" + if self.cannedPluginMessagePart1: + self.cannedPluginMessage += self.cannedPluginMessagePart1 + if self.cannedPluginMessagePart2: + self.cannedPluginMessage += self.cannedPluginMessagePart2 + if self.cannedPluginMessagePart3: + self.cannedPluginMessage += self.cannedPluginMessagePart3 + if self.cannedPluginMessagePart4: + self.cannedPluginMessage += self.cannedPluginMessagePart4 + + print(f'canned_plugin_message:{self.cannedPluginMessage}') + logging.debug(f'canned_plugin_message:{self.cannedPluginMessage}') + return self.cannedPluginMessage + + def set_canned_message(self, message): + """Set the canned message. Split into parts of 200 chars each.""" + + if len(message) > 800: + our_exit("Warning: The canned message must be less than 800 characters.") + + # split into chunks + chunks = [] + chunks_size = 200 + for i in range(0, len(message), chunks_size): + chunks.append(message[i: i + chunks_size]) + + # for each chunk, send a message to set the values + #for i in range(0, len(chunks)): + for i, chunk in enumerate(chunks): + p = admin_pb2.AdminMessage() + + # TODO: should be a way to improve this + if i == 0: + p.set_canned_message_plugin_part1 = chunk + elif i == 1: + p.set_canned_message_plugin_part2 = chunk + elif i == 2: + p.set_canned_message_plugin_part3 = chunk + elif i == 3: + p.set_canned_message_plugin_part4 = chunk + + logging.debug(f"Setting canned message '{chunk}' part {i+1}") + self._sendAdmin(p) + def exitSimulator(self): """Tell a simulator node to exit (this message is ignored for other nodes)""" diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index c11e506a..58d17215 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -442,6 +442,43 @@ def test_main_set_owner_short_to_bob(capsys): mo.assert_called() +@pytest.mark.unit +@pytest.mark.usefixtures("reset_globals") +def test_main_set_canned_messages(capsys): + """Test --set-canned-message """ + sys.argv = ['', '--set-canned-message', 'foo'] + Globals.getInstance().set_args(sys.argv) + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + main() + out, err = capsys.readouterr() + assert re.search(r'Connected to radio', out, re.MULTILINE) + assert re.search(r'Setting canned plugin message to foo', out, re.MULTILINE) + assert err == '' + mo.assert_called() + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_globals") +def test_main_get_canned_messages(capsys, caplog, iface_with_nodes): + """Test --get-canned-message """ + sys.argv = ['', '--get-canned-message'] + Globals.getInstance().set_args(sys.argv) + + iface = iface_with_nodes + iface.localNode.cannedPluginMessage = 'foo' + + with caplog.at_level(logging.DEBUG): + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + main() + out, err = capsys.readouterr() + assert re.search(r'Connected to radio', out, re.MULTILINE) + assert re.search(r'canned_plugin_message:foo', out, re.MULTILINE) + assert err == '' + mo.assert_called() + + @pytest.mark.unit @pytest.mark.usefixtures("reset_globals") def test_main_set_ham_to_KI123(capsys): diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index 97612744..842d3c25 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -11,6 +11,9 @@ from ..admin_pb2 import AdminMessage from ..channel_pb2 import Channel from ..radioconfig_pb2 import RadioConfig +from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2, + CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4, + CannedMessagePluginMessagePart5) from ..util import Timeout @@ -43,6 +46,122 @@ def test_node_requestConfig(capsys): assert err == '' +@pytest.mark.unit +def test_node_get_canned_message_with_all_parts(capsys): + """Test run get_canned_message()""" + iface = MagicMock(autospec=SerialInterface) + amesg = MagicMock(autospec=AdminMessage) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): + # we have a sleep in this method, so override it so it goes fast + with patch('time.sleep'): + anode = Node(mo, 'bar') + anode.cannedPluginMessagePart1 = 'a' + anode.cannedPluginMessagePart2 = 'b' + anode.cannedPluginMessagePart3 = 'c' + anode.cannedPluginMessagePart4 = 'd' + anode.cannedPluginMessagePart5 = 'e' + anode.get_canned_message() + out, err = capsys.readouterr() + assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_node_get_canned_message_with_some_parts(capsys): + """Test run get_canned_message()""" + iface = MagicMock(autospec=SerialInterface) + amesg = MagicMock(autospec=AdminMessage) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): + # we have a sleep in this method, so override it so it goes fast + with patch('time.sleep'): + anode = Node(mo, 'bar') + anode.cannedPluginMessagePart1 = 'a' + anode.get_canned_message() + out, err = capsys.readouterr() + assert re.search(r'canned_plugin_message:a', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +def test_node_set_canned_message_one_part(caplog): + """Test run set_canned_message()""" + iface = MagicMock(autospec=SerialInterface) + amesg = MagicMock(autospec=AdminMessage) + with caplog.at_level(logging.DEBUG): + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): + anode = Node(mo, 'bar') + anode.set_canned_message('foo') + assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE) + assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_node_set_canned_message_200(caplog): + """Test run set_canned_message() 200 characters long""" + iface = MagicMock(autospec=SerialInterface) + amesg = MagicMock(autospec=AdminMessage) + with caplog.at_level(logging.DEBUG): + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): + anode = Node(mo, 'bar') + message_200_chars_long = 'a' * 200 + anode.set_canned_message(message_200_chars_long) + assert re.search(r" part 1", caplog.text, re.MULTILINE) + assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_node_set_canned_message_201(caplog): + """Test run set_canned_message() 201 characters long""" + iface = MagicMock(autospec=SerialInterface) + amesg = MagicMock(autospec=AdminMessage) + with caplog.at_level(logging.DEBUG): + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): + anode = Node(mo, 'bar') + message_201_chars_long = 'a' * 201 + anode.set_canned_message(message_201_chars_long) + assert re.search(r" part 1", caplog.text, re.MULTILINE) + assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_node_set_canned_message_1000(caplog): + """Test run set_canned_message() 1000 characters long""" + iface = MagicMock(autospec=SerialInterface) + amesg = MagicMock(autospec=AdminMessage) + with caplog.at_level(logging.DEBUG): + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg): + anode = Node(mo, 'bar') + message_1000_chars_long = 'a' * 1000 + anode.set_canned_message(message_1000_chars_long) + assert re.search(r" part 1", caplog.text, re.MULTILINE) + assert re.search(r" part 2", caplog.text, re.MULTILINE) + assert re.search(r" part 3", caplog.text, re.MULTILINE) + assert re.search(r" part 4", caplog.text, re.MULTILINE) + assert re.search(r" part 5", caplog.text, re.MULTILINE) + + +@pytest.mark.unit +def test_node_set_canned_message_1001(capsys): + """Test run set_canned_message() 1001 characters long""" + iface = MagicMock(autospec=SerialInterface) + with pytest.raises(SystemExit) as pytest_wrapped_e: + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar') + message_1001_chars_long = 'a' * 1001 + anode.set_canned_message(message_1001_chars_long) + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.search(r'Warning: The canned message', out, re.MULTILINE) + assert err == '' + + @pytest.mark.unit def test_setOwner_and_team(caplog): """Test setOwner""" @@ -694,6 +813,352 @@ def test_requestChannel_localNode(caplog): assert not re.search(r'from remote node', caplog.text, re.MULTILINE) +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart1(caplog): + """Test onResponseRequestCannedMessagePluginMessagePart1()""" + + part1 = CannedMessagePluginMessagePart1() + part1.text = 'foo1' + + msg1 = MagicMock(autospec=AdminMessage) + msg1.get_canned_message_plugin_part1_response = part1 + + packet = { + 'from': 682968612, + 'to': 682968612, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': 'faked', + 'requestId': 927039000, + 'admin': { + 'getCannedMessagePluginPart1Response': {'text': 'foo1'}, + 'raw': msg1 + } + }, + 'id': 589440320, + 'rxTime': 1642710843, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!28b54624', + 'toId': '!28b54624' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart1(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE) + assert anode.cannedPluginMessagePart1 == 'foo1' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart2(caplog): + """Test onResponseRequestCannedMessagePluginMessagePart2()""" + + part2 = CannedMessagePluginMessagePart2() + part2.text = 'foo2' + + msg2 = MagicMock(autospec=AdminMessage) + msg2.get_canned_message_plugin_part2_response = part2 + + packet = { + 'from': 682968612, + 'to': 682968612, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': 'faked', + 'requestId': 927039000, + 'admin': { + 'getCannedMessagePluginPart2Response': {'text': 'foo2'}, + 'raw': msg2 + } + }, + 'id': 589440320, + 'rxTime': 1642710843, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!28b54624', + 'toId': '!28b54624' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart2(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE) + assert anode.cannedPluginMessagePart2 == 'foo2' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart3(caplog): + """Test onResponseRequestCannedMessagePluginMessagePart3()""" + + part3 = CannedMessagePluginMessagePart3() + part3.text = 'foo3' + + msg3 = MagicMock(autospec=AdminMessage) + msg3.get_canned_message_plugin_part3_response = part3 + + packet = { + 'from': 682968612, + 'to': 682968612, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': 'faked', + 'requestId': 927039000, + 'admin': { + 'getCannedMessagePluginPart3Response': {'text': 'foo3'}, + 'raw': msg3 + } + }, + 'id': 589440320, + 'rxTime': 1642710843, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!28b54624', + 'toId': '!28b54624' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart3(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE) + assert anode.cannedPluginMessagePart3 == 'foo3' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart4(caplog): + """Test onResponseRequestCannedMessagePluginMessagePart4()""" + + part4 = CannedMessagePluginMessagePart4() + part4.text = 'foo4' + + msg4 = MagicMock(autospec=AdminMessage) + msg4.get_canned_message_plugin_part4_response = part4 + + packet = { + 'from': 682968612, + 'to': 682968612, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': 'faked', + 'requestId': 927039000, + 'admin': { + 'getCannedMessagePluginPart4Response': {'text': 'foo4'}, + 'raw': msg4 + } + }, + 'id': 589440320, + 'rxTime': 1642710843, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!28b54624', + 'toId': '!28b54624' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart4(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE) + assert anode.cannedPluginMessagePart4 == 'foo4' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart5(caplog): + """Test onResponseRequestCannedMessagePluginMessagePart5()""" + + part5 = CannedMessagePluginMessagePart5() + part5.text = 'foo5' + + msg5 = MagicMock(autospec=AdminMessage) + msg5.get_canned_message_plugin_part5_response = part5 + + + packet = { + 'from': 682968612, + 'to': 682968612, + 'decoded': { + 'portnum': 'ADMIN_APP', + 'payload': 'faked', + 'requestId': 927039000, + 'admin': { + 'getCannedMessagePluginPart5Response': {'text': 'foo5'}, + 'raw': msg5 + } + }, + 'id': 589440320, + 'rxTime': 1642710843, + 'hopLimit': 3, + 'priority': 'RELIABLE', + 'raw': 'faked', + 'fromId': '!28b54624', + 'toId': '!28b54624' + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart5(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE) + assert anode.cannedPluginMessagePart5 == 'foo5' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys): + """Test onResponseRequestCannedMessagePluginMessagePart1() with error""" + + packet = { + 'decoded': { + 'routing': { + 'errorReason': 'some made up error', + }, + }, + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart1(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == '' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys): + """Test onResponseRequestCannedMessagePluginMessagePart2() with error""" + + packet = { + 'decoded': { + 'routing': { + 'errorReason': 'some made up error', + }, + }, + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart2(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == '' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys): + """Test onResponseRequestCannedMessagePluginMessagePart3() with error""" + + packet = { + 'decoded': { + 'routing': { + 'errorReason': 'some made up error', + }, + }, + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart3(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == '' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys): + """Test onResponseRequestCannedMessagePluginMessagePart4() with error""" + + packet = { + 'decoded': { + 'routing': { + 'errorReason': 'some made up error', + }, + }, + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart4(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == '' + + +@pytest.mark.unit +def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys): + """Test onResponseRequestCannedMessagePluginMessagePart5() with error""" + + packet = { + 'decoded': { + 'routing': { + 'errorReason': 'some made up error', + }, + }, + } + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + anode = Node(mo, 'bar', noProto=True) + # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock + mo.localNode = anode + + with caplog.at_level(logging.DEBUG): + anode.onResponseRequestCannedMessagePluginMessagePart5(packet) + assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE) + out, err = capsys.readouterr() + assert re.search(r'Error on response', out) + assert err == '' + + @pytest.mark.unit def test_onResponseRequestChannel(caplog): """Test onResponseRequestChannel()"""