Skip to content

Channel Configuration

Dmytro Vyazelenko edited this page May 22, 2024 · 89 revisions

Channels are specified via an Aeron URI provided when adding a publication or subscription. The Aeron URI has the following format:

aeron-uri = ["aeron-spy:"] "aeron:" media [ "?" param *( "|" param ) ]
media     = *( "[^?:]" )
param     = key "=" value
key       = *( "[^=]" )
value     = *( "[^|]" )

To help with building a channel URI string then use ChannelUriStringBuilder for Java or ChannelUriStringBuilder for C++, URIs can also be parsed with ChannelUri for Java and ChannelUri for C++.

All Media Transports

The following parameters are available on all media transports when applied to a Publication:

  • alias - (optional) Reserved param which can contain a user provided string to identify a channel to for an application.

  • term-length - (optional) The term length to be applied to the log when adding a Publication. This parameter overrides the aeron.term.buffer.length or aeron.ipc.term.buffer.length system properties.

  • sparse - (optional) Boolean value to indicate if a sparse file should be used to back the log buffers. This parameter overrides the aeron.term.buffer.sparse.file system property.

  • session-id - (optional) 32-bit signed integer value that is assigned for the session id to override the default generation for publications, or to restrict the scope of a subscription to a single session. Session ids additionally support tags to select a configuration from another entity using the session-id=value:<tag> syntax. For example, a session specific subscription could tag reference a specific tagged publication.

The following 3 parameters can be applied as a group to set an initial position on an Exclusive Publication for replay purposes:

  • init-term-id - (optional) The initial term id which can be applied to an Exclusive Publication.
  • term-id - (optional) The current term id which can be applied to an Exclusive Publication.
  • term-offset - (optional) The current term offset from the base of the term buffer which can be applied to an Exclusive Publication.

Subscriptions can be tethered for how they participate in local flow control.

  • tether - (optional) Boolean value to indicate if a subscription should be a permanent tether for local flow control.

  • untethered-window-limit-timeout - (optional, since 1.45.0) a duration in nanoseconds that sets the timeout for how long an untethered subscription that is outside the window will participate in local flow control. This overrides the aeron.untethered.window.limit.timeout media driver property.

  • untethered-resting-timeout - (optional, since 1.45.0) a duration in nanoseconds that sets the timeout for how long an untethered subscription is resting after not being able to keep up before it is allowed to rejoin a stream. This overrides the aeron.untethered.resting.timeout media driver property.

IPC Media - Inter Process Communication

If shared memory is to be used for communications on the same machine between threads or processes then the IPC media value can be applied. This is the highest throughput and lowest latency means of communicating between two process or threads on the same machine.

aeron:ipc

UDP Media

The Aeron URI uses a media value of udp to determine that messages will be sent via the UDP protocol. To distinguish between Unicast and Multicast, the UdpChannel will look at the specific parameters supplied and infer the appropriate choice.

Many of the values options are network addresses, in this case they can take for the form of a host name, IPv4, or IPv6 address enclosed in square brackets.

Configuration Properties

Both Unicast and Multicast channel configuration uses the same set of properties. Distinguishing between unicast and multicast is done by examining the endpoint address of the channel. The important configuration options are:

  • endpoint - (required unless MDC or MDS) The socket address to which publications will send messages and from which subscriptions will receive messages. This takes the <host>:<port> format, where both the host and port are required. The port value can be a wildcard (0), see section on wildcard ports below. When the endpoint is multicast it is important to also specify the interface. In the case of unicast the address will be the host on which the subscription is located.

  • interface - (optional) The socket address that identifies that local interface that will be used for receiving messages (and sending messages in the multicast case). The value can take the form <host>[:<port>][/<subnet mask>]. When specified the system will scan the local interfaces for one that matches the specified pattern. The default value for this field is 0.0.0.0:0/0 (or [::]:0/0 for IPv6). With a unicast configuration this will simply map to any local address and will listen for messages from all local network interfaces. This setting will only impact the behaviour of subscriptions for multicast, unicast publications will effectively ignore this value. With a multicast configuration this will scan all of the local network interfaces and will use the first non-loopback network interface that supports multicast, falling back to the local loopback if none are found.

  • control - (optional) Multi-Destination-Cast (MDC) control address to be used for dynamically allocating new destination streams. The value will be an IP address and port, e.g. 192.168.0.1:40456. When set for the publication with MDC then control-mode should also be set.

  • control-mode - (optional) Valid value is manual if using the Publication.addDestination(String) / Publication.removeDestination(String) API, or dynamic, if initiated by connections to the control address.

  • ttl - (optional) The TTL used for multicast traffic. Only used when the endpoint specified is a multicast address. This value is passed directly as the IP_MULTICAST_TTL and should be within the range 0 - 255. A value outside this range will generate an exception set as the socket option.

  • mtu - (optional) The MTU is the Maximum Transmission Unit which sets the length limit for a UDP datagram payload on this stream. This overrides the aeron.mtu.length property for the driver when adding a new Publication.

  • reliable - (optional) The reliable property when applied to a Subscription disables the NAK'ing of lost packets on the stream. The lost packets are gap filled so the stream can progress. Valid values are true and false with the default being true.

  • linger - (optional) Timeout in nanoseconds a network publication should linger around to service NAKs. This parameter overrides the aeron.publication.linger.timeout system property.

  • tags - (optional) Identity and references for associating channels in simplifying configuration. See tags for more details.

  • eos - (optional) Indicates if a publication should send an End Of Stream (EOS) flag in its heartbeat messages when closed. Valid values are true and false with the default being true.

  • group - (optional) Indicates if a subscription should behave as part of a receiver group such as with multicast. This will impact NAK behaviour and related semantics. It will default to true for multicast and false for unicast. For a subscriber to MDC unicast stream it will default to false but should be changed to true if the group is larger than 2.

  • rejoin - (optional) Indicates if an image should rejoin a stream if it gets disconnected due to timeout or falling behind. Defaults to true.

  • cc - (optional) Indicates what congestion control algorithm should be applied. Default to static which is static window for just flow control. Can be set to cubic for the Cubic congestion control algorithm. Cubic is recommend over a WAN or congested networks.

  • fc - (optional) Indicates what flow control algorithm should be used on the Publication side and the configuration for it. See Flow Control Configuration

  • gtag - (optional) Used to include a receiver in a specified group for tagged flow control. See Flow Control Configuration

  • ssc - (optional) Used on a publication to indicate is a spy subscription being present simulates a connection even if there are no receivers. Note that this can override a min groups size with min or tagged flow control is configured.

  • media-rcv-ts-offset - (optional) An offset within a incoming message to store a timestamp captured at the closest point to the networking hardware. The special value of "reserved" can be used to store the value in the message's reserved field. Only supported on the C media driver, will error with the Java Driver. See Timestamps.

  • channel-rcv-ts-offset - (optional) An offset within a incoming message to store a timestamp when received by receive channel endpoint within Aeron. The special value of "reserved" can be used to store the value in the message's reserved field. See Timestamps.

  • channel-snd-ts-offset - (optional) An offset within a outgoing message to store a timestamp captured by the send channel endpoint. The special value of "reserved" can be used to store the value in the message's reserved field. See Timestamps.

  • nak-delay - (optional, since 1.44.0) a duration in nanoseconds that sets the amount of time between the initial detection of a missing message and the sending of the nak for that range of data. This value overrides aeron.nak.unicast.delay for unicast streams.

Examples

Simple Unicast, with any local address used for binding subscriptions:

aeron:udp?endpoint=192.168.0.1:40456

Unicast with a specific address to receive messages:

aeron:udp?endpoint=192.168.0.1:40456|interface=192.168.0.3

Unicast with a search pattern to identify an address to receive messages. This is especially useful if you have a large number of machines on the same network and want to reuse the same configuration string across all of them.

aeron:udp?endpoint=192.168.0.1:40456|interface=192.168.0.0/24

Use hostnames instead of addresses:

aeron:udp?endpoint=localhost:40456

Or use IPv6:

aeron:udp?endpoint=[::1]:1234|interface=[::1]

Simple multicast:

aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.0.3

It is also possible to specify an IPv6 address, but the IPv6 address portion needs to placed inside square brackets []:

aeron:udp?endpoint=[ff02::1]:40456

To specify which interface to use, add the interface parameter. It will find the interface with that matches whose bound address matches the one specified. For an interface config, such as the following:

en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
    ether 04:0c:ce:e3:c8:c0 
    inet6 fe80::60c:ceff:fee3:c8c0%en0 prefixlen 64 scopeid 0x4 
    inet 192.168.1.4 netmask 0xffffff00 broadcast 192.168.1.255
    nd6 options=1<PERFORMNUD>
    media: autoselect
    status: active

Then the following configuration will ensure that the 'en0' interface is used.

aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.1.4

Or similarly for IPv6.

aeron:udp?endpoint=[ff02::1]:40456|interface=[fe80::60c:ceff:fee3:c8c0]

However, having to specify a full IP for the interface can be difficult. Especially if you want to share the configuration throughout a number of machines in the same network. Therefore with the interface specification you specify a subnet mask to allow Aeron to search for an appropriate interface. The following will find an interface on the 192.168.1.x network. If there are multiple interfaces that match the criteria, the result will be undefined. The first interface returned from NetworkInterface.getNetworkInterfaces() that matches the IP address pattern and supports multicast will be used.

aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.1.0/24

Specifically the /24 means that the first 24 bits of the IP address will be used when comparing against the IP addresses on the interface. Similarly for IPv6.

aeron:udp?endpoint=[ff02::1]:40456|interface=[fe80::60c:ceff:fee3]/88

Hostnames can be used instead of IP addresses for both the group and interface parameters.

aeron:udp?endpoint=all-systems.mcast.net:40456|interface=localhost

Simple multicast with specified TTL of 16.

aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.0.3|ttl=16

Multi-Destination-Cast dynamic publication using control port of 40456.

aeron:udp?control=localhost:40456

Multi-Destination-Cast dynamic subscription using local endpoint port of 40457 and control port at 192.168.0.1 and port 40456.

aeron:udp?endpoint=localhost:40457|control=192.168.0.1:40456|control-mode=dynamic

Multi-Destination-Cast manual publication using control port of 40456.

aeron:udp?control=localhost:40456|control-mode=manual

Outgoing Channel Spy

If you wish to subscribe to a local Publication without receiving the packets back in the network interface, for which they are being sent out, then this is possible by spying on a network publication as an optimisation. A spy Subscription will receive the messages over IPC without a copy.

aeron-spy:aeron:udp?endpoint=224.20.30.39:54326

aeron-spy:aeron:udp?endpoint=224.0.1.1:40456|interface=192.168.1.0/24|session-id=tag:7

Note:

  • Spy subscriptions are not applicable to IPC media.
  • Spy subscriptions do not affect the Publication connected status by default. They can simulate a connected publication if the aeron.spies.simulate.connection property is set, in which case the publication can progress if it only has spies.
  • Spy subscriptions apply back-pressure like normal Subscriptions.
  • Spy subscriptions override the min group size for min and tagged flow control strategies.

Flow Control Configuration

Flow control parameters take the following form:

fc             = strategy ?( group ) ?( timeout )
strategy       = "max" | "min" | "tagged"
group          = ",g:" ( group_tag | ( ?( group_tag ) "/" ( group_min_size ) )
group_tag      = ?( "-" ) [0-9]+
group_min_size = [0-9]+
timeout        = ",t:" ( [0-9]+ ) ?( "s" | "ns" | "us" | "ms" )

The first part of the parameter specifies the flow control strategy to use and is limited to the built in strategies. If you want the URI to configure a custom strategy, you will need to handle that in your own flow control supplier.

The group can include a group_tag (signed 64 bit integer) and group_min_size (unsigned 31 bit integer). The group_tag identifies which receivers are included in the group for the tagged flow control strategy. The group_min_size is the minimum number of receivers required for the flow control strategy to indicate that the Publication should be connected.

The timeout specifies how long the flow control strategy should wait before removing receivers that have not sent status messages from the list of valid receivers. An optional time unit is allowed, if not time unit is nanoseconds.

Some of the parameters are not relevant for the different flow control strategies, however as long the parameters are properly formed, the URI won't be rejected it will just ignore the ones which don't apply. This is to allow the additional parameters in the future.

Examples:

aeron:udp?endpoint=224.20.30.39:24326|fc=max
aeron:udp?endpoint=224.20.30.39:24326|fc=min
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged

When specifying the flow control strategy you can use default values or those specified in the context for group parameters and timeouts.

aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:/3,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:/3,t:2000ms
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:/3,t:2000000000
aeron:udp?endpoint=224.20.30.39:24326|fc=min,g:1001/3,t:2000000000

All of the preceding are valid and set the min strategy with a minimum of 3 receivers for connectivity and timeout of 2 seconds. In the final case the group id is specified but ignored.

aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:1001/3,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:1001,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:/3,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,t:2s

The above use the tagged strategy. Where one of the parameters are not specified the value will be taken from the media driver context. When using the tagged strategy the receiver on the Subscription side will need to indicate that it is part of the group. This can be done either by a media driver parameter or via the gtag parameter on the URI. For example, a subscription to be included in the 1001 group then the following could be used:

aeron:udp?endpoint=224.20.30.39:24326|gtag=1001

The following are invalid and will be rejected:

aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:1001/,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:/,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,g:,t:2s
aeron:udp?endpoint=224.20.30.39:24326|fc=tagged,t:2s

Wildcard Ports

At times it is possible that we don't want to pre-assign ports for the endpoint or control address. The common usecase for this is where we want to create a subscription that has a transient lifetime. For example, a channel provided to another service in order to receive messages, then be closed down later.

The simplest way to do this is to make use of the wildcard port (0) and let the operating system handle the port assignment. Aeron supports using wildcard ports and making the assigned ports available via the client from version 1.28.0.

Firstly declare the URI and create a subscription to it.

final String wildcardUriA = "aeron:udp?tags=1001|endpoint=192.168.10.1:0";
final Subscription subscriptionA1 = client.addSubscription(wildcardUriA);

We recommend using tags with wildcard ports so that further subscriptions to the same channel are possible. Therefore if you want to create a second subscription to the same endpoint use the tag. If you need a separate channel with a different port use a different tag.

final String tagUriA = "aeron:udp?tags=1001";
final Subscription subscriptionA2 = client.addSubscription(tagUriA, STREAM_ID);

final String wildcardUriB = "aeron:udp?tags=1002|endpoint=192.168.10.1:0";
final Subscription subscriptionB = client.addSubscription(wildcardUriB, STREAM_ID);

Tags should also be used when using wildcard control ports for publications and spies.

final String publicationUri = "aeron:udp?control=localhost:0|control-mode=dynamic|ssc=true|tags=1003";
final Publication publication = client.addPublication(publicationUri, STREAM_ID);

final String spyUri = "aeron:udp?tags=1003";
final Subscription spy = client.addSubscription(spyUri, STREAM_ID);

Once the subscription is established and bound, the list of local bound socket addresses is available via the Subscription.localSocketAddresses() method. In the case of MDS it is possible to have multiple destinations on the same subscription returned as a list. If the socket has not been bound, as there is a possible delay between creating a subscription and it being bound, then the list will be empty. The result will be a list of strings formatted in the same manner as an IP address that is used in an Aeron URI, so they can be used directly in a URI for a publication.

List<String> localSocketAddresses
while ((localSocketAddresses = subscriptionA1.localSocketAddresses()).isEmpty())
{
    Thread.yield();
    // Check for timeout...
}

// Assuming a simple unicast subscription
final String uri = new ChannelUriStringBuilder()
    .media("udp")
    .endpoint(localSocketAddresses.get(0))
    .build();

final Publication publication = client.addPublication(uri, STREAM_ID);

The same approach can be used when setting up explicit control addresses for publications. Use Publication.localSocketAddresses() to get the local socket address for the control. This returns a list for symmetry with the subscription, but as of version 1.28.0 it will at most have 1 entry.

Durations

Some of the parameters on a channel reference a length of time. For convenience these can be express with a suffix to represent units. The following suffixes are supported:

Suffix Units Example
s seconds 10s
ms milliseconds 10ms
us microseconds 10us
ns nanoseconds 10ns

Timestamps

Aeron supports the capturing of timestamps within the system - 3 points within the C driver (media-rcv-ts-offset, channel-rcv-ts-offset, channel-snd-ts-offset), 2 within the Java Driver (channel-rcv-ts-offset, channel-snd-ts-offset). These can be used to measure queuing between different parts of the system. Both drivers support capturing timestamps as the message moves through the channel endpoints. On Linux the C driver supports capture packet level timestamps. Linux has broad support for packet timestamps. Aeron uses SO_TIMESTAMPNS and SOF_TIMESTAMPING_RX_HARDWARE, which generate nanosecond precision timestamps using network card hardware when available and will fall back to software timestamps otherwise.

If the machine's network card's supports packet timestamping, then this will need to be enabled on the system. This is a privileged operation, so cannot be set up automatically by Aeron. Look at the hwstamp_ctl program provided the linuxptp (precision time protocol) package as a means to do this.

The easiest way to make the timestamp visible to the user was to store them in the incoming packet, so all of the configuration options take an offset from the beginning of the message. If more than one timestamp is required, then space will need to be made within the application message for the value to be stored. Care is required as this may overwrite application data. The special value of reserved can also be used, this will store the timestamp within the reserved field. There is only one reserved field available on the message, so it can only be used for a single timestamp. Timestamps are a little-endian 64-bit integer which encodes the number of nanoseconds since the epoch 1 Jan 1970.

Note that the timestamp will only be applied to the first fragment if multiple fragments are received in a single batch. Client code that uses the timestamps will need to cater for this, i.e. if the timestamp is not set, then it should assume that the previous timestamp received is the one to use.

Timestamps are not supported on IPC channels as the driver is not in the message path. A sending timestamp can be added at time of publication to the reserved field by providing a ReservedValueSupplier as an argument to the offer methods or by setting BufferClaim.reservedValue on the claim before committing.

Using media driver timestamps:

// Create a URI that will enable timestamping.
// aeron:udp?endpoint=localhost:9090|media-rcv-ts-offset=reserved|channel-rcv-ts-offset=0|channel-snd-ts-offset=8
final String uri = new ChannelUriStringBuilder()
    .media("udp")
    .endpoint("localhost")
    .mediaReceiveTimestampOffset("reserved") // Store the media timestamp in the reserved field (generated by NIC if available)
    .channelReceiveTimestampOffset("0")      // Store the channel receive timestamp at offset 0
    .channelSendTimestampOffset("8")         // Store the channel send timestamp at offset 8
    .build();

// Create publication and subscription as normal.  Technically the publication only requires channel-snd-ts-offset and the
// subscription media-rcv-ts-offset and channel-rcv-ts-offset.  However, it is often easier to use the same URI across both for simplicity.
final Subscription sub = aeron.addSubscription(uri, 10000);
final Publication pub = aeron.addPublication(uri, 10000);

final ExpandableArrayBuffer message = new ExpandableArrayBuffer(64);
final int messageStartOffset = 16;  // Allow padding in message for the timestamp fields. 
message.putLong(0, 0);   // Optional, use 0 as a sentinel value to check if the timestamp has been set.
message.putLong(8, 0);
final int messageLength = message.putStringAscii(messageStartOffset, "Hello World");
final int totalLength = messageStartOffset + messageLength;
pub.offer(message, 0, totalLength);

// Timestamps on messages can be read on the subscription side.  However, when messages are batched into a single MTU only the 
// first message of the batch will contain the timestamp values, so using the previously received values is the best option.
final class Poller implements FragmentHandler
{
    private long lastMediaRcvTimestamp = 0;
    private long lastChannelRcvTimestamp = 0;
    private long lastChannelSndTimestamp = 0;

    public void onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
    {

        long mediaRcvTimestamp = lastMediaRcvTimestamp;
        if (0 != header.reservedValue())
        {
            mediaRcvTimestamp = header.reservedValue();
            lastMediaRcvTimestamp = mediaRcvTimestamp;
        }

        long channelRcvTimestamp = lastChannelRcvTimestamp;
        if (0 != buffer.getLong(offset))
        {
            channelRcvTimestamp = buffer.getLong(offset);
            lastChannelRcvTimestamp = channelRcvTimestamp;
        }

        long channelSndTimestamp = lastChannelSndTimestamp;
        if (0 != buffer.getLong(offset + 8))
        {
            channelSndTimestamp = buffer.getLong(offset + 8);
            lastChannelSndTimestamp = channelSndTimestamp;
        }

        final String message = buffer.getStringAscii(offset + 16);
        
        // Do something with timestamps
    }
}