Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
#
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except,too-many-public-methods


[BASIC]
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ lint:
slow:
pytest -m unit --durations=5

proto: FORCE
git submodule update --init --recursive
git pull --rebase
git submodule update --remote --merge
./bin/regen-protos.sh

# run the coverage report and open results in a browser
cov:
pytest --cov-report html --cov=meshtastic
Expand Down
17 changes: 17 additions & 0 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ def onConnected(interface):
print(f"Setting device owner short to {args.set_owner_short}")
interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short)

# TODO: add to export-config and configure
if args.set_canned_message:
closeNow = True
print(f"Setting canned plugin message to {args.set_canned_message}")
interface.getNode(args.dest).set_canned_message(args.set_canned_message)

if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
Expand Down Expand Up @@ -510,6 +516,11 @@ def setSimpleChannel(modem_config):
print(f"Writing modified channels to device")
interface.getNode(args.dest).writeChannel(channelIndex)

if args.get_canned_message:
closeNow = True
print("")
interface.getNode(args.dest).get_canned_message()

if args.info:
print("")
# If we aren't trying to talk to our local node, don't show it
Expand Down Expand Up @@ -746,6 +757,9 @@ def initParser():
parser.add_argument("--info", help="Read and display the radio config information",
action="store_true")

parser.add_argument("--get-canned-message", help="Show the canned message plugin message",
action="store_true")

parser.add_argument("--nodes", help="Print Node List in a pretty formatted table",
action="store_true")

Expand Down Expand Up @@ -808,6 +822,9 @@ def initParser():
parser.add_argument(
"--set-owner", help="Set device owner name", action="store")

parser.add_argument(
"--set-canned-message", help="Set the canned messages plugin message (up to 1000 characters).", action="store")

parser.add_argument(
"--set-owner-short", help="Set device owner short name", action="store")

Expand Down
5 changes: 3 additions & 2 deletions meshtastic/mesh_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,12 @@ def sendData(self, data, destinationId=BROADCAST_ADDR,
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
meshPacket.id = self._generatePacketId()

if onResponse is not None:
self._addResponseHandler(meshPacket.id, onResponse)
p = self._sendPacket(meshPacket, destinationId,
wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p

def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0,
Expand Down
172 changes: 169 additions & 3 deletions meshtastic/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import base64
import time
from google.protobuf.json_format import MessageToJson
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
Expand All @@ -24,6 +25,13 @@ def __init__(self, iface, nodeNum, noProto=False):
self.partialChannels = None
self.noProto = noProto

self.cannedPluginMessage = None

self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None

def showChannels(self):
"""Show human readable description of our channels."""
print("Channels:")
Expand Down Expand Up @@ -56,6 +64,14 @@ def requestConfig(self):
self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished

# Note: We do not get the canned plugin message, unless get_canned_message() is called
self.cannedPluginMessage = None

self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None

self._requestSettings()

def turnOffEncryptionOnPrimaryChannel(self):
Expand All @@ -64,9 +80,9 @@ def turnOffEncryptionOnPrimaryChannel(self):
print("Writing modified channels to device")
self.writeChannel(0)

def waitForConfig(self):
def waitForConfig(self, attribute='channels'):
"""Block until radio config is received. Returns True if config has been received."""
return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels'))
return self._timeout.waitForSet(self, attrs=('radioConfig', attribute))

def writeConfig(self):
"""Write the current (edited) radioConfig to the device"""
Expand Down Expand Up @@ -237,7 +253,7 @@ def onResponseRequestSettings(self, p):
"""Handle the response packet for requesting settings _requestSettings()"""
logging.debug(f'onResponseRequestSetting() p:{p}')
errorFound = False
if 'routing' in p["decoded"]:
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
Expand Down Expand Up @@ -268,6 +284,156 @@ def _requestSettings(self):

return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)

def onResponseRequestCannedMessagePluginMessagePart1(self, p):
"""Handle the response packet for requesting canned message plugin message part 1"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart1() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart1 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part1_response
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
self.gotResponse = True

def onResponseRequestCannedMessagePluginMessagePart2(self, p):
"""Handle the response packet for requesting canned message plugin message part 2"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart2() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart2 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part2_response
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
self.gotResponse = True

def onResponseRequestCannedMessagePluginMessagePart3(self, p):
"""Handle the response packet for requesting canned message plugin message part 3"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart3() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart3 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part3_response
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
self.gotResponse = True

def onResponseRequestCannedMessagePluginMessagePart4(self, p):
"""Handle the response packet for requesting canned message plugin message part 4"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart4() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart4 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part4_response
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.gotResponse = True

def get_canned_message(self):
"""Get the canned message string. Concatenate all pieces together and return a single string."""
logging.debug(f'in get_canned_message()')
if not self.cannedPluginMessage:

p1 = admin_pb2.AdminMessage()
p1.get_canned_message_plugin_part1_request = True
self.gotResponse = False
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart1)
while self.gotResponse is False:
time.sleep(0.1)

p2 = admin_pb2.AdminMessage()
p2.get_canned_message_plugin_part2_request = True
self.gotResponse = False
self._sendAdmin(p2, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart2)
while self.gotResponse is False:
time.sleep(0.1)

p3 = admin_pb2.AdminMessage()
p3.get_canned_message_plugin_part3_request = True
self.gotResponse = False
self._sendAdmin(p3, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart3)
while self.gotResponse is False:
time.sleep(0.1)

p4 = admin_pb2.AdminMessage()
p4.get_canned_message_plugin_part4_request = True
self.gotResponse = False
self._sendAdmin(p4, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart4)
while self.gotResponse is False:
time.sleep(0.1)

# TODO: This feels wrong to have a sleep here. Is there a way to ensure that
# all requests are complete? Perhaps change to a while loop any parts are None... maybe?
time.sleep(3)

logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')

self.cannedPluginMessage = ""
if self.cannedPluginMessagePart1:
self.cannedPluginMessage += self.cannedPluginMessagePart1
if self.cannedPluginMessagePart2:
self.cannedPluginMessage += self.cannedPluginMessagePart2
if self.cannedPluginMessagePart3:
self.cannedPluginMessage += self.cannedPluginMessagePart3
if self.cannedPluginMessagePart4:
self.cannedPluginMessage += self.cannedPluginMessagePart4

print(f'canned_plugin_message:{self.cannedPluginMessage}')
logging.debug(f'canned_plugin_message:{self.cannedPluginMessage}')
return self.cannedPluginMessage

def set_canned_message(self, message):
"""Set the canned message. Split into parts of 200 chars each."""

if len(message) > 800:
our_exit("Warning: The canned message must be less than 800 characters.")

# split into chunks
chunks = []
chunks_size = 200
for i in range(0, len(message), chunks_size):
chunks.append(message[i: i + chunks_size])

# for each chunk, send a message to set the values
#for i in range(0, len(chunks)):
for i, chunk in enumerate(chunks):
p = admin_pb2.AdminMessage()

# TODO: should be a way to improve this
if i == 0:
p.set_canned_message_plugin_part1 = chunk
elif i == 1:
p.set_canned_message_plugin_part2 = chunk
elif i == 2:
p.set_canned_message_plugin_part3 = chunk
elif i == 3:
p.set_canned_message_plugin_part4 = chunk

logging.debug(f"Setting canned message '{chunk}' part {i+1}")
self._sendAdmin(p)

def exitSimulator(self):
"""Tell a simulator node to exit (this message
is ignored for other nodes)"""
Expand Down
37 changes: 37 additions & 0 deletions meshtastic/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,43 @@ def test_main_set_owner_short_to_bob(capsys):
mo.assert_called()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_canned_messages(capsys):
"""Test --set-canned-message """
sys.argv = ['', '--set-canned-message', 'foo']
Globals.getInstance().set_args(sys.argv)

iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Setting canned plugin message to foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_get_canned_messages(capsys, caplog, iface_with_nodes):
"""Test --get-canned-message """
sys.argv = ['', '--get-canned-message']
Globals.getInstance().set_args(sys.argv)

iface = iface_with_nodes
iface.localNode.cannedPluginMessage = 'foo'

with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'canned_plugin_message:foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_ham_to_KI123(capsys):
Expand Down
Loading