Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ __pycache__
.vscode
dist
*.egg-info
*.swp
108 changes: 108 additions & 0 deletions examples/injector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
This example sets up a connection and sends a packet specified on command line as a hex encoded string

To run this example as a script, the following command:
```
$ python3 -m examples.injector --amid 0x76 --src 0xFAFA --dest 0xFFFF --group 0x99 sf@injector01.local DEADBEEFDEADBEEF

```
"""
from __future__ import print_function

from argparse import ArgumentParser
from functools import partial
import logging
import time
from codecs import decode

from moteconnection.connection import Connection
from moteconnection.message import MessageDispatcher, Message


def send_packet(connection_string, amid, dest, src, group, hex_data):
"""Send the packet with specified AM ID."""

connection = construct_connection(connection_string, src, group)

bin_data = decode(hex_data, "hex")
msg = Message(amid, dest, bin_data)

time.sleep(0.2)
connection.send(msg)
time.sleep(0.2)

connection.disconnect()
connection.join()


def construct_connection(connection_string, src, group):
"""
Constructs the connection object and returns it.

The connection string takes the form of protocol@location:port_or_baud
Examples: sf@localhost:9002
serial@/dev/ttyUSB0

:param str connection string: A string in the form of protocol@location:port_or_baud
:rtype: moteconnection.connection.Connection
"""
connection = Connection()
connection.connect(
connection_string,
reconnect=10,
connected=partial(print, "Connected to {}".format(connection_string)),
disconnected=partial(print, "Disconnected from {}".format(connection_string))
)

dispatcher = MessageDispatcher(src, group)
connection.register_dispatcher(dispatcher)
return connection


def get_args():
"""
Parses the arguments and returns them.

:rtype argparse.Namespace
"""
parser = ArgumentParser(description='An example moteconnection sending program.')
parser.add_argument('connection',
help="The connection string used to connect to the device. "
"Can be a serial forwarder address or a serial device address.")
parser.add_argument('--verbose', '-v',
action='store_true',
default=False,
help='Verbose mode (displays moteconnection logs).')
parser.add_argument('--amid', '-a',
default='0x76',
help='AM ID')
parser.add_argument('--dest', '-d',
default='0xFFFF',
help='Destination address')
parser.add_argument('--src', '-s',
default='0xCCC4',
help='Source address')
parser.add_argument('--group', '-g',
default='0x22',
help='Group ID')
parser.add_argument("data",
help='Binary data (hex)')
return parser.parse_args()


def main():
"""Main entrypoint to the sniffer application."""
args = get_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
send_packet(
args.connection,
int(args.amid, base=0),
int(args.dest, base=0),
int(args.src, base=0),
int(args.group, base=0),
args.data)


if __name__ == '__main__':
main()