JavaInteractingWithTheTransport
Java Guide Series
Architecture | Knowledge Base | Networking | Containers | Threads
The main entry point into the MADARA transport layer is the QoSTransportSettings class. The QoSTransportSettings class contains dozens of network-specific settings that govern basic configuration, quality-of-service, and filtering.
- MADARA Transport Layer
- Table of Contents
- Understanding the QoS Transport Settings class
- Understanding When Knowledge is Sent Over the Network
- More Information
Creating a networked transport in MADARA generally requires setting the transport type with QoSTransportSettings.setType
(which specifies the network transport type) and providing a list of hosts with setHosts (for UDP, Multicast, Broadcast) or domains (which map to topics in DDS transports).
UDP requires a hosts vector with the ip:port of the sender first (where we want to bind and how we want to be known to others in the originator field of messages) and all peers in the rest of the vector. Multicast requires a valid multicast ip and port in the first entry of the hosts vector. Similarly, broadcast requires a valid broadcast ip (for the subnet of your machine) and port in the first entry of the hosts vector.
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//create transport settings for a UDP transport
QoSTransportSettings settings = new QoSTransportSettings();
//the first host is our own bind address; the remaining hosts are peers
settings.setHosts(new String[]{"localhost:40000", "localhost:40001"});
settings.setType(TransportType.UDP_TRANSPORT);
//create a knowledge base with the UDP transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
//send all peers in the hosts list the knowledge
//that all agents are currently ready
knowledge.evaluate("all_agents_ready = 1");
UDP Registry is a special UDP transport that dynamically recognizes new and leaving hosts after deployment. To use the UDP Registry transport, you need to have a madara_registry running (located at $MADARA_ROOT/bin/madara_registry). This UDP registry should work over 3G, 4G, and from behind NATs as long as the NAT maintains the UDP host:port for the madara_registry binding.
If the madara_registry goes offline, the agents that have talked with the madara_registry before will continue to use the last hosts they knew of. Whenever a madara_registry comes back online (e.g., when you restart it), everything should work without any issues.
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
import ai.madara.filters.EndpointClear;
//create transport settings for a UDP registry client transport
QoSTransportSettings settings = new QoSTransportSettings();
//assume a registry server on the local host at port 40000
settings.setHosts(new String[]{"localhost:40000"});
settings.setType(TransportType.REGISTRY_CLIENT);
//create endpoint clear filter, which will tidy up registry updates
EndpointClear filter = new EndpointClear();
filter.addReceiveFilterTo(settings);
//create a knowledge base with the registry client transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
//send all agents known to the registry the knowledge
//that all agents are currently ready
knowledge.evaluate("all_agents_ready = 1");
Multicast is a one-to-many or many-to-many transport protocol that allows hosts to subscribe to IPs as topics. The multicast standard guarantees copies of all messages are delivered to each subscriber, if the connection/subscription is available. This allows it to be used in both intrahost and interhost situations.
You can find the acceptable multicast IP range that you can use here:
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//Create transport settings for a multicast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
//create a knowledge base with the multicast transport settings
KnowledgeBase knowledge = new KnowledgeBase("agent1", settings);
//send all agents listening on this multicast address the knowledge
//that all agents are currently ready
knowledge.evaluate("all_agents_ready = 1");
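Because the multicast transport expects a valid multicast IP and port in the first hosts entry, it can help to check the entry before handing it to setHosts. The following is a minimal sketch using only the standard java.net classes. The class MulticastHostCheck and its method isMulticastEntry are hypothetical helpers written for this guide and are not part of the MADARA API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not part of MADARA): checks an "ip:port" hosts entry
// before it is passed to QoSTransportSettings.setHosts for multicast.
public class MulticastHostCheck {

    // Returns true if the entry is an IP in the multicast range
    // followed by a port between 1 and 65535.
    public static boolean isMulticastEntry(String hostEntry) {
        int colon = hostEntry.lastIndexOf(':');
        if (colon <= 0 || colon == hostEntry.length() - 1) {
            return false; // missing ip or missing port
        }
        int port;
        try {
            port = Integer.parseInt(hostEntry.substring(colon + 1));
        } catch (NumberFormatException e) {
            return false;
        }
        if (port < 1 || port > 65535) {
            return false;
        }
        try {
            // For a literal IP address, getByName only validates the format
            InetAddress address = InetAddress.getByName(hostEntry.substring(0, colon));
            return address.isMulticastAddress();
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isMulticastEntry("239.255.0.1:4150"));    // true
        System.out.println(isMulticastEntry("192.168.1.255:15000")); // false
    }
}
```

IPv4 multicast addresses fall in 224.0.0.0 through 239.255.255.255, which is why the broadcast address in the second call is rejected.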
Broadcast is a one-to-many or many-to-many transport protocol that allows a host to send UDP datagrams to all hosts on an IP subnet. UDP broadcast does not guarantee that all agents on a host get a copy of UDP datagrams sent to the broadcast IP. Instead, the first subscriber takes the message from the OS, and the message is deleted. This means broadcast is not ideal for intrahost communication amongst co-located processes. It is still very useful for interhost communication between two hosts/systems and is widely supported on most routers.
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//create transport settings for a broadcast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"192.168.1.255:15000"});
settings.setType(TransportType.BROADCAST_TRANSPORT);
//create a knowledge base with the broadcast transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
//send all agents listening on this broadcast address the knowledge
//that all agents are currently ready
knowledge.evaluate("all_agents_ready = 1");
ZeroMQ is a transport maintained by the open source community for high-performance, low-latency, highly-connected applications. It is especially high performance on intrahost communication, and tends to be at least as fast as IPC mechanisms for communication between colocated processes on the same machine. ZeroMQ is not really meant for outdoor, edge environments and was really created for applications like stock exchanges where communication is highly available.
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//create transport settings for a ZeroMQ transport over TCP
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"tcp://127.0.0.1:30001", "tcp://127.0.0.1:30000"});
settings.setType(TransportType.ZMQ_TRANSPORT);
//create a knowledge base with the ZeroMQ transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
//send all agents connected through these ZeroMQ endpoints the knowledge
//that all agents are currently ready
knowledge.evaluate("all_agents_ready = 1");
ZeroMQ also includes an interprocess communication implementation for shared memory within a host. This can be used by developers when a transport should not be exposed to external hosts and is only expected to be used between processes on the same machine.
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//create transport settings for a ZeroMQ transport over IPC
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"ipc:///tmp/my_file_0", "ipc:///tmp/my_file_1"});
settings.setType(TransportType.ZMQ_TRANSPORT);
//create a knowledge base with the ZeroMQ transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
//send all agents connected through these ZeroMQ endpoints the knowledge
//that all agents are currently ready
knowledge.evaluate("all_agents_ready = 1");
When using real robots in a wide area, it may not be possible to reach all neighbors in a single message send. To aid developers trying to solve problems where multi-hops are required for message routing, MADARA supports rebroadcast TTLs set on the sender side. MADARA also supports opting out of rebroadcasts. In fact, all transports opt out by default.
Example routing between agent1 and agent3 that is not possible without rebroadcasts
Example of sending a packet with a rebroadcast ttl of 3
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//Create transport settings for a multicast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
// set rebroadcast ttl to 3
settings.setRebroadcastTtl(3);
//create a knowledge base with the multicast transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
Example of enabling participation in rebroadcasts
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//Create transport settings for a multicast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
settings.setRebroadcastTtl(2);
settings.enableParticipantTtl(1);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
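To make the effect of a rebroadcast TTL concrete, the sketch below simulates the agent1, agent2, agent3 situation described above, where agent1 can only reach agent3 through agent2. It is a simplified model and not MADARA's implementation: the class RebroadcastSketch, the line topology, and the assumption that each rebroadcast hop decrements the TTL by one are all illustrative, and a node's participation is modeled as a simple yes/no rather than a participant TTL.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation of hop-limited rebroadcasts (not MADARA code).
public class RebroadcastSketch {

    // Returns the agents that receive a message sent by `sender` with the
    // given rebroadcast ttl. Only agents in `participants` rebroadcast.
    public static Set<String> deliver(Map<String, List<String>> links, String sender,
                                      int ttl, Set<String> participants) {
        Set<String> received = new TreeSet<String>();
        Deque<String> nodes = new ArrayDeque<String>();
        Deque<Integer> ttls = new ArrayDeque<Integer>();
        // the original send reaches the sender's direct neighbors
        for (String neighbor : links.getOrDefault(sender, Collections.<String>emptyList())) {
            nodes.add(neighbor);
            ttls.add(ttl);
        }
        while (!nodes.isEmpty()) {
            String node = nodes.poll();
            int remaining = ttls.poll();
            // ignore our own message and duplicates
            if (node.equals(sender) || !received.add(node)) {
                continue;
            }
            // a participating agent rebroadcasts while ttl remains
            if (remaining > 0 && participants.contains(node)) {
                for (String neighbor : links.getOrDefault(node, Collections.<String>emptyList())) {
                    nodes.add(neighbor);
                    ttls.add(remaining - 1);
                }
            }
        }
        return received;
    }

    // agent1 <-> agent2 <-> agent3, with no direct agent1 <-> agent3 link
    public static Map<String, List<String>> lineTopology() {
        Map<String, List<String>> links = new HashMap<String, List<String>>();
        links.put("agent1", Arrays.asList("agent2"));
        links.put("agent2", Arrays.asList("agent1", "agent3"));
        links.put("agent3", Arrays.asList("agent2"));
        return links;
    }

    public static void main(String[] args) {
        Map<String, List<String>> links = lineTopology();
        Set<String> relay = new HashSet<String>(Arrays.asList("agent2"));
        System.out.println(deliver(links, "agent1", 0, relay)); // [agent2]
        System.out.println(deliver(links, "agent1", 1, relay)); // [agent2, agent3]
        // with nobody opted in, the ttl has no effect
        System.out.println(deliver(links, "agent1", 1, Collections.<String>emptySet())); // [agent2]
    }
}
```

The last call mirrors the default behavior described above: because transports opt out of rebroadcasts by default, a sender-side TTL alone does not extend reach unless intermediate agents enable participation.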
The MADARA architecture monitors two different types of bandwidth usage: sending and receiving bandwidth. Setting a limit for either of these will serve as a hard limit that does not differentiate between priorities of information. Once a packet is sent, the bandwidth counters are updated, and if the bytes per second rate is higher than the limit you set, all packets will be dropped until the rate dips below the limit. These limiters do not currently look at the size of the packet that is being sent out, so if you are at 99,500 B/s and your limit is 100,000 B/s, it will try to send the next packet (even if it is 1MB), update the bandwidth counter, and then not send another packet until the rate measured over the past 10s dips back below the limit.
For a more flexible bandwidth option that you can configure, see Bandwidth Filters.
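The hard-limit behavior described above can be sketched with a small sliding-window counter. This is an illustrative model only, not MADARA's bandwidth monitor: the class SendBandwidthSketch, its method names, and the exact expiry rule are assumptions. It keeps the two properties the text describes: the rate is averaged over the past 10s, and the size of the packet about to be sent is not considered when deciding whether to send.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a hard send-bandwidth limit (not MADARA code).
public class SendBandwidthSketch {
    private static final long WINDOW_MS = 10000L;

    private final long limitBytesPerSecond;
    // each entry is {timestampMs, bytes}
    private final Deque<long[]> sent = new ArrayDeque<long[]>();
    private long bytesInWindow = 0;

    public SendBandwidthSketch(long limitBytesPerSecond) {
        this.limitBytesPerSecond = limitBytesPerSecond;
    }

    // Average bytes per second over the past 10s
    public double bytesPerSecond(long nowMs) {
        while (!sent.isEmpty() && sent.peekFirst()[0] <= nowMs - WINDOW_MS) {
            bytesInWindow -= sent.pollFirst()[1];
        }
        return bytesInWindow / (WINDOW_MS / 1000.0);
    }

    // Sends unless the current rate is over the limit. The size of the
    // packet being sent is deliberately not part of the decision.
    public boolean trySend(long nowMs, long packetBytes) {
        if (bytesPerSecond(nowMs) > limitBytesPerSecond) {
            return false; // dropped
        }
        sent.addLast(new long[] {nowMs, packetBytes});
        bytesInWindow += packetBytes;
        return true;
    }

    public static void main(String[] args) {
        SendBandwidthSketch limiter = new SendBandwidthSketch(100000);
        System.out.println(limiter.trySend(0, 995000));      // true, rate is now 99,500 B/s
        System.out.println(limiter.trySend(1000, 1000000));  // true, the 1MB packet still goes out
        System.out.println(limiter.trySend(2000, 100));      // false, rate is 199,500 B/s
        System.out.println(limiter.trySend(11001, 100));     // true, old packets left the window
    }
}
```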
Deadline enforcement is concerned with enforcing latency deadlines between reasoning entities on the network. Deadline enforcement requires some type of time synchronization protocol between agents in the network, preferably accurate to within a second, for it to be useful to the MADARA entities.
Enforcing send bandwidth limits
This type of enforcement is useful if you want to make sure no agent is using more than a certain limit (e.g., 100KB/s) individually.
Example of enforcing a send bandwidth limit
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//Create transport settings for a multicast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
settings.setSendBandwidthLimit(100000);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
Enforcing send bandwidth limits based on received bandwidth
This type of enforcement is useful if you want to make sure the agent is not violating a collective bandwidth limit (e.g., 2MB/s) and is based on the amount of data received per second over the past 10s.
Example of enforcing a total bandwidth limit
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//Create transport settings for a multicast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
settings.setTotalBandwidthLimit(100000);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
Note that even though this is keying off received bandwidth, it affects messages being sent (i.e., enforcement is applied when the agent attempts to send or rebroadcast knowledge to the network).
Deadline enforcement aims to discard received packets that are too old to be useful to agent state reasoning. Handling old packets can significantly impede performance in an agent-saturated network, so clearing your queues quickly can aid the agent network.
Example of enforcing a transport latency deadline
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
//Create transport settings for a multicast transport
QoSTransportSettings settings = new QoSTransportSettings ();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
settings.setDeadline(10);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
User callbacks can also be inserted into the transport layer to modify payloads. The MADARA transport system allows users to insert callbacks into three key operations: receive, send, and rebroadcast.
Filter callbacks are given a number of arguments that are relevant to the filtering operation. As of version 1.1.9, these arguments include the following:
- args[0]: The knowledge record that the filter is acting upon
- args[1]: The name of the knowledge record, if applicable ("" if unnamed, but this should never happen)
- args[2]: The type of operation calling the filter (integer valued). Valid types are: IDLE_OPERATION (should never see), SENDING_OPERATION (transport is trying to send the record), RECEIVING_OPERATION (transport has received the record and is ready to apply the update), REBROADCASTING_OPERATION (transport is trying to rebroadcast the record; only happens if rebroadcast is enabled in Transport Settings)
- args[3]: Bandwidth used while sending through this transport, measured in bytes per second.
- args[4]: Bandwidth used while receiving from this transport, measured in bytes per second.
- args[5]: Message timestamp (when the message was originally sent, in seconds)
- args[6]: Current timestamp (the result of time (NULL))
- args[7]: Knowledge Domain (partition of the knowledge updates)
- args[8]: Originator (identifier of sender, usually host:port)
The filter can add data to the payload by pushing a variable name (string) followed by a value, which can be a double, string, integer, byte array, or array of integers or doubles, just as you would do with a set operation. This can be useful if other reasoners in the network are expecting additional meta data for the update (which they are free to strip out or ignore in a receive filter, if they don't need the information).
The filter can also access any variable in the KnowledgeBase through the Variables facade. With the arguments and variables interfaces, developers can respond to transport events in highly dynamic and extensible ways.
To create a bandwidth filter, we simply need to create a filter that looks at argument index 3 (how much bandwidth we have used sending) or argument index 4 (how much bandwidth is being used--i.e., what we have received in bytes per second), depending on which type of bandwidth enforcement we want to do.
In the following example, we drop any packet members if we are over 100,000 bytes per second sending within the past 10s.
Example of enforcing a send bandwidth limit via filtering
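import ai.madara.knowledge.KnowledgeBase;
import ai.madara.knowledge.KnowledgeRecord;
import ai.madara.knowledge.Variables;
import ai.madara.knowledge.KnowledgeList;
import ai.madara.knowledge.KnowledgeType;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
import ai.madara.transport.filters.RecordFilter;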
public class SendBandwidthLimit implements RecordFilter
{
// Do not allow more than 100k bytes per second
public KnowledgeRecord filter(KnowledgeList args, Variables variables)
{
KnowledgeRecord result = new KnowledgeRecord();
// only modify the return value with arg[0] if we are under 100KB/s
if (args.get(3).toIntegerValue() < 100000)
{
result = args.get(0);
}
return result;
}
public static void main (String...args) throws InterruptedException, Exception
{
QoSTransportSettings settings = new QoSTransportSettings();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
// add the above filter for all knowledge types, applied before sending
settings.addSendFilter (KnowledgeType.ALL_TYPES,
new SendBandwidthLimit());
// also apply it before rebroadcasting
settings.addRebroadcastFilter (KnowledgeType.ALL_TYPES,
new SendBandwidthLimit());
// create a knowledge base with the multicast transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
// do normal reasoning, read files, or whatever other logic is needed
...
}
}
Filters can inspect argument indices 5 and 6 for information on the sent and received time for each packet. The difference between these two arguments is called the packet latency, and this latency value can inform the filter of deadline violations.
Example of enforcing a network latency deadline via filtering
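import ai.madara.knowledge.KnowledgeBase;
import ai.madara.knowledge.KnowledgeRecord;
import ai.madara.knowledge.Variables;
import ai.madara.knowledge.KnowledgeList;
import ai.madara.knowledge.KnowledgeType;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
import ai.madara.transport.filters.RecordFilter;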
public class DeadlineEnforcement implements RecordFilter
{
// Check the deadline
public KnowledgeRecord filter(KnowledgeList args, Variables variables)
{
KnowledgeRecord result = new KnowledgeRecord();
// args.get(5) is sent time, args.get(6) is current time
// keep any packet with a latency of less than 5 seconds
if (args.get(6).toIntegerValue() - args.get(5).toIntegerValue() < 5)
{
result = args.get(0);
}
return result;
}
public static void main (String...args) throws InterruptedException, Exception
{
QoSTransportSettings settings = new QoSTransportSettings();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
// add the above filter for all knowledge types, applied before sending
settings.addSendFilter (KnowledgeType.ALL_TYPES, new DeadlineEnforcement());
// also apply it before rebroadcasting
settings.addRebroadcastFilter (KnowledgeType.ALL_TYPES, new DeadlineEnforcement());
// create a knowledge base with the multicast transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
// do normal reasoning, read files, or whatever other logic is needed
...
}
}
Filters that drop packets can be very useful, but developers also have unlimited options for inflating or reducing packets and updates within the packets. Packet shaping is any operation that mutates a packet element into a different form. This can be operations like converting packets into XML, resizing an image payload, or even encrypting part of a packet with a private key.
In the following example, we simply encapsulate any string payload with the XML elements <item> and </item>.
Example of shaping a payload before it gets sent out
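import ai.madara.knowledge.KnowledgeBase;
import ai.madara.knowledge.KnowledgeRecord;
import ai.madara.knowledge.Variables;
import ai.madara.knowledge.KnowledgeList;
import ai.madara.knowledge.KnowledgeType;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportType;
import ai.madara.transport.filters.RecordFilter;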
public class AddItemTag implements RecordFilter
{
// Add an item tag around the record
public KnowledgeRecord filter(KnowledgeList args, Variables variables)
{
KnowledgeRecord result = new KnowledgeRecord();
// encase it in an <item> tag
StringBuffer buffer = new StringBuffer();
buffer.append("<item>");
buffer.append(args.get(0).toString());
buffer.append("</item>");
result.set(buffer.toString());
return result;
}
public static void main (String...args) throws InterruptedException, Exception
{
QoSTransportSettings settings = new QoSTransportSettings();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
// add the above filter for all knowledge types, applied before sending
settings.addSendFilter (KnowledgeType.ALL_TYPES,
new AddItemTag());
// also apply it before rebroadcasting
settings.addRebroadcastFilter (KnowledgeType.ALL_TYPES,
new AddItemTag());
// create a knowledge base with the multicast transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
// do normal reasoning, read files, or whatever other logic is needed
...
}
}
Aggregate filters differ from individual filters in that aggregate filters take the entire map of records, are generally more efficient, and are more flexible.
We will highlight the utility and power of aggregate filters with a simple example which adds the id of the current process as metadata to every message sent. It then strips the id from the aggregate before applying the information to the knowledge base.
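import ai.madara.knowledge.KnowledgeBase;
import ai.madara.knowledge.KnowledgeRecord;
import ai.madara.knowledge.Variables;
import ai.madara.knowledge.EvalSettings;
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.TransportContext;
import ai.madara.transport.TransportType;
import ai.madara.transport.filters.AggregateFilter;
import ai.madara.transport.filters.Packet;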
public class AddOrEraseId implements AggregateFilter
{
// Add an id if it doesn't exist. Strip id if it does.
public KnowledgeRecord filter(Packet packet, TransportContext context, Variables variables)
{
KnowledgeRecord result = new KnowledgeRecord();
if(!packet.exists("id"))
{
packet.add("id",variables.get(".id"));
}
else
{
packet.erase("id");
}
return result;
}
public static void main (String...args) throws InterruptedException, Exception
{
QoSTransportSettings settings = new QoSTransportSettings();
settings.setHosts(new String[]{"239.255.0.1:4150"});
settings.setType(TransportType.MULTICAST_TRANSPORT);
// add the aggregate filter to both the send path and the receive path
settings.addSendFilter (new AddOrEraseId());
settings.addReceiveFilter (new AddOrEraseId());
// create a knowledge base with the multicast transport settings
KnowledgeBase knowledge = new KnowledgeBase("", settings);
EvalSettings queueUntilLater = new EvalSettings();
queueUntilLater.setDelaySendingModifieds(true);
// set id so we have access to it in the aggregate outgoing filter
knowledge.set (".id", 1);
// build a packet from this id with some information
knowledge.set ("occupation", "Banker", queueUntilLater);
knowledge.set ("age", 43, queueUntilLater);
// a final piece of data with default settings will activate the
// aggregate filter
knowledge.set ("money", 553200.50);
}
}
MADARA transports can be configured to trust or ban lists of peers. The banned list can be useful for mitigating known faulty sensors that are flooding the network as well as enforcing basic security. The trusted list is more useful for security policies and is significantly more stringent. Once a node is added to the trusted list, any peer not on the trusted list is automatically denied the ability to update the local context.
Example of adding peers to trusted list
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
// only trust the agents "agent2" and "agent3"
QoSTransportSettings settings = new QoSTransportSettings();
settings.addTrustedPeer("agent2");
settings.addTrustedPeer("agent3");
// identify ourselves as "agent1" so that others could add "agent1" to their trusted list
KnowledgeBase knowledge = new KnowledgeBase("agent1", settings);
The above code has agent1 set its unique identifier to "agent1". Let's show how we could make a peer that doesn't trust knowledge from the above entity.
Example of adding peers to banned list
import ai.madara.knowledge.KnowledgeBase;
import ai.madara.transport.QoSTransportSettings;
// Do not trust "agent1"
QoSTransportSettings settings = new QoSTransportSettings();
settings.addBannedPeer("agent1");
// updates originating from "agent1" will now be rejected
KnowledgeBase knowledge = new KnowledgeBase("", settings);
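The interaction between the trusted and banned lists can be summarized in a few lines of plain Java. The sketch below is an assumption-laden model of the rules stated above, not the MADARA implementation: the class PeerPolicySketch and its accepts method are hypothetical, and the choice to let a ban override trust is an assumption made for this illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of trusted/banned peer handling (not MADARA code).
public class PeerPolicySketch {
    private final Set<String> trusted = new HashSet<String>();
    private final Set<String> banned = new HashSet<String>();

    public void addTrustedPeer(String peer) {
        trusted.add(peer);
    }

    public void addBannedPeer(String peer) {
        banned.add(peer);
    }

    // Decides whether an update from `originator` may change local knowledge.
    public boolean accepts(String originator) {
        if (banned.contains(originator)) {
            return false; // assumption: a ban always wins
        }
        // an empty trusted list accepts everyone who is not banned;
        // once it has entries, only listed peers are accepted
        return trusted.isEmpty() || trusted.contains(originator);
    }

    public static void main(String[] args) {
        PeerPolicySketch agent1 = new PeerPolicySketch();
        agent1.addTrustedPeer("agent2");
        agent1.addTrustedPeer("agent3");
        System.out.println(agent1.accepts("agent2")); // true
        System.out.println(agent1.accepts("agent4")); // false

        PeerPolicySketch wary = new PeerPolicySketch();
        wary.addBannedPeer("agent1");
        System.out.println(wary.accepts("agent1")); // false
        System.out.println(wary.accepts("agent5")); // true
    }
}
```

This shows why the trusted list is the more stringent of the two: a single trusted entry implicitly rejects every unlisted peer, while a banned entry only rejects the peers it names.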
Before deploying MADARA applications into real-world, real-time situations (especially wireless situations), developers should probably test that their applications work despite packet loss. MADARA provides first class support for dropping packets at deterministic and random intervals.
The deterministic policy is enforced via a stride scheduler within the Packet Scheduler.
Example of setting a deterministic non-bursty packet drop policy
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.DropType;
QoSTransportSettings settings = new QoSTransportSettings();
/**
* Set a drop rate of 20% in a deterministic manner (1 drop, then 4 successful sends)
**/
settings.updateDropRate(.2, DropType.PACKET_DROP_DETERMINISTIC, 1);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
Burst usage
When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns. Burst rates can cause unusual patterns within the deterministic policy settings that may cause a higher rate than intended.
Example of setting a deterministic bursty packet drop policy
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.DropType;
QoSTransportSettings settings = new QoSTransportSettings();
/**
* Set a drop rate of 20% in a deterministic manner with 2 successive drops (burst rate = 2)
**/
settings.updateDropRate(.2, DropType.PACKET_DROP_DETERMINISTIC, 2);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
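To visualize what a deterministic drop pattern looks like, the following sketch generates a drop/send sequence for a given rate and burst size. It is not the stride scheduler used inside MADARA's Packet Scheduler. The class DropPatternSketch and its fixed-period approach (drop the first `burst` packets of every `burst / rate` packets) are simplifying assumptions chosen to reproduce the "1 drop, then 4 successful sends" pattern described above.

```java
// Hypothetical illustration of deterministic drop patterns (not MADARA code).
public class DropPatternSketch {
    private final int period;
    private final int burst;
    private long counter = 0;

    public DropPatternSketch(double rate, int burst) {
        if (rate <= 0.0 || rate > 1.0 || burst < 1) {
            throw new IllegalArgumentException("rate must be in (0, 1] and burst >= 1");
        }
        this.burst = burst;
        // number of packets in one full drop/send cycle
        this.period = (int) Math.round(burst / rate);
    }

    // Drops the first `burst` packets of each cycle, sends the rest
    public boolean shouldDrop() {
        boolean drop = (counter % period) < burst;
        counter++;
        return drop;
    }

    // Renders the first `packets` decisions, D for drop and S for send
    public static String pattern(double rate, int burst, int packets) {
        DropPatternSketch scheduler = new DropPatternSketch(rate, burst);
        StringBuilder result = new StringBuilder();
        for (int i = 0; i < packets; i++) {
            result.append(scheduler.shouldDrop() ? 'D' : 'S');
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(pattern(0.2, 1, 10)); // DSSSSDSSSS
        System.out.println(pattern(0.2, 2, 10)); // DDSSSSSSSS
    }
}
```

In this simplified model both settings drop 20% of packets over a full cycle, but a short observation window that happens to start on a burst will see a higher rate.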
The probabilistic policy enforces a uniform distribution of drops at a target rate.
Normal usage
The following example shows how to set a 20% drop rate in a probabilistic manner without scheduled bursts.
Example of setting a probabilistic non-bursty packet drop policy
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.DropType;
QoSTransportSettings settings = new QoSTransportSettings();
// Set a drop rate of 20% in a probabilistic manner
settings.updateDropRate(.2, DropType.PACKET_DROP_PROBABLISTIC, 1);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
Burst usage
When updating a drop rate, you can specify a drop burst type. Bursts are sequences of drops in a row that may prove useful when trying to mimic real world drop patterns.
Example of setting a probabilistic bursty packet drop policy
import ai.madara.transport.QoSTransportSettings;
import ai.madara.transport.DropType;
QoSTransportSettings settings = new QoSTransportSettings();
/**
* Set a drop rate of 20% in a probabilistic manner with 2 successive drops (burst rate = 2)
**/
settings.updateDropRate(.2, DropType.PACKET_DROP_PROBABLISTIC, 2);
KnowledgeBase knowledge = new KnowledgeBase("", settings);
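A probabilistic policy with bursts can be modeled with a seeded random number generator. The sketch below is one possible interpretation and not MADARA's implementation: the class ProbabilisticDropSketch is hypothetical, and the burst-start probability p = rate / (rate + burst * (1 - rate)) is an assumption chosen so that the long-run drop rate stays at the target even when each triggered drop extends into a burst.

```java
import java.util.Random;

// Hypothetical illustration of probabilistic drops with bursts (not MADARA code).
public class ProbabilisticDropSketch {
    private final double startProbability;
    private final int burst;
    private final Random random;
    private int remainingInBurst = 0;

    public ProbabilisticDropSketch(double rate, int burst, long seed) {
        if (rate <= 0.0 || rate >= 1.0 || burst < 1) {
            throw new IllegalArgumentException("rate must be in (0, 1) and burst >= 1");
        }
        this.burst = burst;
        // chance of starting a burst, scaled so the long-run rate stays at `rate`
        this.startProbability = rate / (rate + burst * (1.0 - rate));
        this.random = new Random(seed);
    }

    public boolean shouldDrop() {
        if (remainingInBurst > 0) {
            remainingInBurst--; // continue the current burst
            return true;
        }
        if (random.nextDouble() < startProbability) {
            remainingInBurst = burst - 1; // this drop is the first of the burst
            return true;
        }
        return false;
    }

    // Fraction of dropped packets observed over a run of `packets` decisions
    public static double observedRate(double rate, int burst, int packets, long seed) {
        ProbabilisticDropSketch dropper = new ProbabilisticDropSketch(rate, burst, seed);
        int drops = 0;
        for (int i = 0; i < packets; i++) {
            if (dropper.shouldDrop()) {
                drops++;
            }
        }
        return drops / (double) packets;
    }

    public static void main(String[] args) {
        // both runs should land close to the 0.2 target
        System.out.println(observedRate(0.2, 1, 200000, 42L));
        System.out.println(observedRate(0.2, 2, 200000, 42L));
    }
}
```

With a burst size of 1 the start probability reduces to the target rate itself, which matches the non-bursty case.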
Mutated knowledge is aggregated and sent over the network transports whenever you call KnowledgeBase.set, KnowledgeBase.evaluate, KnowledgeBase.wait, or KnowledgeBase.sendModifieds. This can be non-intuitive for developers of large applications who just want to set many variables and then send everything at once. You can delay sending modifieds by setting the delaySendingModifieds member to true (for example, with setDelaySendingModifieds(true)) when providing the ai.madara.knowledge.EvalSettings or ai.madara.knowledge.WaitSettings classes to the set, evaluate, and wait functions on the ai.madara.knowledge.KnowledgeBase.
The preferred way to aggregate knowledge, especially in a larger application, is to use Knowledge Containers. Containers are object-oriented abstractions that point to a specific variable inside of the KnowledgeBase. Most of these Containers provide extremely fast O(1) lookups, and they also suppress sends over the network during set-like operations. Basically, using Containers, you can quickly set/get values and build knowledge aggregations without sending over the network. When you're ready to send, you call KnowledgeBase.sendModifieds. See the Containers wiki for more information and examples.
The Java module is fully documented with standard java documentation. Please use the API at JavaDocs or tell your editor of choice to load source files from the Git repository.