Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve basic websocket subscription example #20

Merged
merged 1 commit into from Feb 3, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 74 additions & 27 deletions examples/basic_websocket_subscription.py
Expand Up @@ -10,10 +10,11 @@
"""Simple example demonstrating websocket subscription.

This example connects to katportal via a websocket. It subscribes to
a few sensor group names, and keeps printing out the published messages
the specified sensor group names, and keeps printing out the published messages
received every few seconds.
"""

import argparse
import logging
import uuid

Expand All @@ -26,17 +27,24 @@
logger.setLevel(logging.INFO)


def on_update_callback(msg):
def on_update_callback(msg_dict):
"""Handler for every JSON message published over the websocket."""
print 'GOT:', msg
print "GOT message:"
for key, value in msg_dict.items():
if key == 'msg_data':
print '\tmsg_data:'
for data_key, data_value in msg_dict['msg_data'].items():
print "\t\t{}: {}".format(data_key, data_value)
else:
print "\t{}: {}".format(key, value)


@tornado.gen.coroutine
def main(logger):
def main():
# Change URL to point to a valid portal node.
# If you are not interested in any subarray specific information
# (e.g. schedule blocks), then the number can be omitted, as below.
portal_client = KATPortalClient('http://portal.mkat/api/client',
portal_client = KATPortalClient('http://{}/api/client'.format(args.host),
on_update_callback, logger=logger)

# First connect to the websocket, before subscribing.
Expand All @@ -50,40 +58,79 @@ def main(logger):
# as this is a new namespace.
result = yield portal_client.subscribe(namespace)
print "Subscription result: {} identifier(s).".format(result)
# Example output:
# Subscription result: 1 identifier(s).

# Set the sampling strategies for the sensors of interest, on our custom
# namespace. In this example, we are interested in a number of patterns,
# e.g. any sensor with "mode" in the name. The response messages will
# be published to our namespace every 5 seconds.
result = yield portal_client.set_sampling_strategies(
namespace, ['mode', 'azim', 'elev', 'sched_observation_schedule'],
namespace, args.sensors,
'period 5.0')
print "Set sampling strategies result: {}.\n".format(result)
# Example output, printed by the callback function on_update_callback():
# GOT: []
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:agg_m011_mode_not_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111202.57672, u'value': True, u'name': u'agg_m011_mode_not_error', u'received_timestamp': 1476111202.669989}}
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:agg_m022_mode_not_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111200.082103, u'value': True, u'name': u'agg_m022_mode_not_error', u'received_timestamp': 1476111202.671959}}
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:agg_m033_mode_not_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111198.122401, u'value': True, u'name': u'agg_m033_mode_not_error', u'received_timestamp': 1476111202.673499}}
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:agg_m044_mode_not_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111194.420856, u'value': True, u'name': u'agg_m044_mode_not_error', u'received_timestamp': 1476111202.675499}}
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:agg_m055_mode_not_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111193.089784, u'value': True, u'name': u'agg_m055_mode_not_error', u'received_timestamp': 1476111202.677308}}
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:kataware_alarm_m011_ap_mode_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1475659566.07611, u'value': u"nominal,cleared,m011_ap_mode value = 'stop'. status = nominal.", u'name': u'kataware_alarm_m011_ap_mode_error', u'received_timestamp': 1476111202.720117}}
# GOT: {u'msg_pattern': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:*', u'msg_channel': u'namespace_714a6f67-c170-4742-815a-00a7f5dcf8cd:kataware_alarm_m022_ap_mode_error', u'msg_data': {u'status': u'nominal', u'timestamp': 1475659566.11599, u'value': u"nominal,cleared,m022_ap_mode value = 'stop'. status = nominal.", u'name': u'kataware_alarm_m022_ap_mode_error', u'received_timestamp': 1476111202.721652}}
# ...
# GOT: {u'msg_pattern': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:*', u'msg_channel': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:sched_observation_schedule', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111699.547617, u'value': u'20161010-0001', u'name': u'sched_observation_schedule', u'received_timestamp': 1476111703.4799}}
# GOT: {u'msg_pattern': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:*', u'msg_channel': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:sched_observation_schedule_1', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111699.526987, u'value': u'20161010-0001', u'name': u'sched_observation_schedule_1', u'received_timestamp': 1476111703.481737}}
# GOT: {u'msg_pattern': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:*', u'msg_channel': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:sched_observation_schedule_2', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111699.533452, u'value': u'', u'name': u'sched_observation_schedule_2', u'received_timestamp': 1476111703.484387}}
# GOT: {u'msg_pattern': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:*', u'msg_channel': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:sched_observation_schedule_3', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111699.540772, u'value': u'', u'name': u'sched_observation_schedule_3', u'received_timestamp': 1476111703.486992}}
# GOT: {u'msg_pattern': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:*', u'msg_channel': u'namespace_c4a18e9c-1870-410c-aa2a-bdb570dda4f2:sched_observation_schedule_4', u'msg_data': {u'status': u'nominal', u'timestamp': 1476111699.54758, u'value': u'', u'name': u'sched_observation_schedule_4', u'received_timestamp': 1476111703.489199}}
# ...

print "\nSet sampling strategies result: {}.\n".format(result)

# Example:
# ./basic_websocket_subscription.py --host 127.0.0.1 pos_actual_pointm anc_mean_wind
# Subscription result: 1 identifier(s).
# GOT message:
# msg_pattern: namespace_93f3e645-1818-4913-ae13-f4fbc6eacf31:*
# msg_channel: namespace_93f3e645-1818-4913-ae13-f4fbc6eacf31:m011_pos_actual_pointm_azim
# msg_data:
# status: nominal
# timestamp: 1486038182.71
# value: -185.0
# name: m011_pos_actual_pointm_azim
# received_timestamp: 1486050055.89
# GOT message:
# msg_pattern: namespace_93f3e645-1818-4913-ae13-f4fbc6eacf31:*
# msg_channel: namespace_93f3e645-1818-4913-ae13-f4fbc6eacf31:m011_pos_actual_pointm_elev
# msg_data:
# status: nominal
# timestamp: 1486038182.71
# value: 15.0
# name: m011_pos_actual_pointm_elev
# received_timestamp: 1486050055.89
# ...
# GOT message:
# msg_pattern: namespace_93f3e645-1818-4913-ae13-f4fbc6eacf31:*
# msg_channel: namespace_93f3e645-1818-4913-ae13-f4fbc6eacf31:anc_mean_wind_speed
# msg_data:
# status: nominal
# timestamp: 1486050057.07
# value: 4.9882065556
# name: anc_mean_wind_speed
# received_timestamp: 1486050057.13
# ...
#
# The IOLoop will continue to run until the program is aborted.
# Push Ctrl+C to stop.


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Subscribe to websocket and print messages to stdout for "
"matching sensor names.")
parser.add_argument(
'--host',
default='127.0.0.1',
help="hostname or IP of the portal server (default: %(default)s).")
parser.add_argument(
'sensors',
metavar='sensor',
nargs='+',
help="list of sensor names or filter strings to request data for "
"(examples: wind_speed azim elev)")
parser.add_argument(
'-v', '--verbose',
dest='verbose', action="store_true",
default=False,
help="provide extremely verbose output.")
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)

# Start up the tornado IO loop:
io_loop = tornado.ioloop.IOLoop.current()
io_loop.add_callback(main, logger)
io_loop.add_callback(main)
io_loop.start()