Skip to content

Using Raw Amplet2 Data

Brendon Jones edited this page May 7, 2021 · 5 revisions

The amplet2 data is intended to be stored using the nntsc tool and visualised using amp-web, but if you have special requirements and need to use your own software you can read the data directly from a RabbitMQ queue on the collector.

Install amplet2-server Package

The amplet2-server package will create a minimal collector to receive AMP data, and install a python library that understands the protocol buffer message format used by each of the amplet2 tests. Install the package on Debian/Ubuntu using commands like the following:

sudo apt-get install curl apt-transport-https gnupg
curl -1sLf 'https://dl.cloudsmith.io/public/wand/amp/setup.deb.sh' | sudo -E bash
sudo apt-get install amplet2-server python3-pika

Sample Program

The following python code can be used as an example on how to read data from the RabbitMQ queue on the collector. It will discard any messages that it doesn't understand, and print the result dictionary for any that it does. Instead of printing, you could insert the result into a database of your choice, or perform any other action with the data.

import pika
from ampsave.importer import import_data_functions
from google.protobuf.message import DecodeError

amp_modules = import_data_functions()

def process_data(channel, method, properties, body):
    """ Process a single message from the queue.
        Depending on the test this message may include multiple results.
    """
 
    # ignore any messages that don't have user_id set
    if not hasattr(properties, "user_id"):
        return

    # ignore any messages that don't have timestamp set
    if not hasattr(properties, "timestamp"):
        return

    # ignore any messages that don't have a test type
    if "x-amp-test-type" not in properties.headers:
        return

    # get the test type being reported
    test = properties.headers["x-amp-test-type"]

    # ignore any messages for tests we don't have a module for
    if test not in amp_modules:
        print("unknown test: '%s'" % properties.headers["x-amp-test-type"])
        channel.basic_ack(method.delivery_tag)
        return

    try:
        # Extract the test specific results from the message. This may include
        # multiple results if the test has multiple targets.
        data = amp_modules[test].get_data(body)
    except DecodeError as e:
        # Protocol buffer decoding failed for some reason
        print("Failed to decode result from %s for %s test: %s" % (
                    properties.user_id, test, e))
        data = None

    # TODO do something useful with the data
    if data:
        print(properties.timestamp, properties.user_id, test)
        print(data)

    # acknowledge message
    channel.basic_ack(method.delivery_tag)

# As a simple example, establish an insecure connection to a rabbitmq broker
# that is running on localhost with the data in a queue called "ampqueue"
amqp_credentials = pika.PlainCredentials("guest", "guest")
amqp_connection = pika.BlockingConnection(pika.ConnectionParameters(
            host="localhost", credentials=amqp_credentials))
amqp_channel = amqp_connection.channel()
amqp_channel.basic_consume(process_data, queue="ampqueue")
amqp_channel.start_consuming()
Clone this wiki locally