Cpp Programming Guide

Martin Thompson edited this page Mar 31, 2022 · 27 revisions

C++11 Programming Guide

The Aeron API is designed to be as simple as possible and no simpler. In this guide, we walk through a set of applications, demonstrating specific points along the way. The complete applications can be found in the locations below. Note that the C++ implementation is client only: the Java Media Driver process must be running to support the C++ and Java clients.

NOTE: The Doxygen doc is the definitive source of documentation. Please consider this guide as only a starting point.

Aeron

Aeron client applications need to coordinate operation with a running Media Driver. This interaction handles creating Publications and Subscriptions and housekeeping. The interaction point for the application is the Aeron class.

Context context;

Aeron aeron(context);

Alternatively, and preferably, hold the Aeron instance in a std::shared_ptr. A static method, Aeron::connect, is provided that handles this easily and mimics the semantics of the Java API. It uses std::make_shared internally.

Context context;

std::shared_ptr<Aeron> aeron = Aeron::connect(context);

Settings for the instance may be changed via a Context instance that is passed into either the Aeron::connect method or the Aeron constructor. Similar to the Java API, the C++11 API runs an internal thread for each Aeron instance. So, the material here applies equally.

Event Handling

Aeron instances have a set of handlers that might be called for some events. The application can specify these handlers via the Context instance used to create the Aeron instance.

  • Context::errorHandler lets the application specify a lambda to call when errors/exceptions occur.
  • Context::availableImageHandler specifies a lambda to call when new Images are encountered.
  • Context::unavailableImageHandler specifies a lambda to call when an Image becomes inactive. An image is the replication of the publication stream on the subscription side.
  • Context::newPublicationHandler specifies a lambda to call when the media driver successfully adds a Publication.
  • Context::newSubscriptionHandler specifies a lambda to call when the media driver successfully adds a Subscription.

These handlers are called from the ClientConductor thread.

From BasicSubscriber:

Context context;

context.newSubscriptionHandler(
    [](const std::string &channel, std::int32_t streamId, std::int64_t correlationId)
    {
        std::cout << "Subscription: " << channel << " " << correlationId << ":" << streamId << std::endl;
    });

context.availableImageHandler(
    [](Image &image)
    {
        std::cout << "Available image correlationId=" << image.correlationId() << " sessionId=" << image.sessionId();
        std::cout << " at position=" << image.position() << " from " << image.sourceIdentity() << std::endl;
    });

context.unavailableImageHandler(
    [](Image &image)
    {
        std::cout << "Unavailable image on correlationId=" << image.correlationId() << " sessionId=" << image.sessionId();
        std::cout << " at position=" << image.position() << " from " << image.sourceIdentity() << std::endl;
    });

std::shared_ptr<Aeron> aeron = Aeron::connect(context);

AtomicBuffer

Accessing and modifying buffers that Aeron uses for sending and receiving of messages is done via an instance of AtomicBuffer.

The methods should look familiar to anyone who uses Java ByteBuffer regularly. However, AtomicBuffer can easily be bypassed by those more comfortable handling pointers to memory.

An AtomicBuffer wraps a buffer. This can be done in the constructor or via AtomicBuffer::wrap. The underlying buffer can be accessed via AtomicBuffer::buffer().

From BasicPublisher, putting some bytes into a buffer:

AERON_DECL_ALIGNED(buffer_t buffer, 16);
concurrent::AtomicBuffer srcBuffer(&buffer[0], buffer.size());
char message[256];

...

const int messageLen = ::snprintf(message, sizeof(message), "Hello World! %ld", i);
srcBuffer.putBytes(0, reinterpret_cast<std::uint8_t *>(message), messageLen);

For a subscriber, grabbing some bytes from a buffer:

[](const AtomicBuffer &buffer, util::index_t offset, util::index_t length, const Header &header)
{
    ...
    std::cout << std::string(reinterpret_cast<const char *>(buffer.buffer()) + offset, static_cast<std::size_t>(length)) << ">>" << std::endl;
};

Using a struct to overlay a buffer at an offset:

struct DataLayout
{
    std::uint16_t type;
    std::uint16_t version;
    // ...
};
...
DataLayout &data = buffer.overlayStruct<DataLayout>(0);

data.type = 5;
data.version = 1;
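
Overlaying a struct relies on the struct having a predictable layout and on the offset being suitably aligned for it. The following is a minimal standalone sketch of a copy-based alternative for cases where alignment cannot be guaranteed. It does not use AtomicBuffer at all: writeStruct and readStruct are hypothetical helpers introduced here for illustration, operating on a plain byte array.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>

struct DataLayout
{
    std::uint16_t type;
    std::uint16_t version;
};

// Guard the assumptions that copying (or overlaying) the struct depends on.
static_assert(std::is_standard_layout<DataLayout>::value, "layout must be predictable");
static_assert(std::is_trivially_copyable<DataLayout>::value, "must be safe to copy bytewise");
static_assert(sizeof(DataLayout) == 4, "unexpected padding in DataLayout");

// Hypothetical helper: copies value into buffer at offset, returns false if it would not fit.
template <typename T>
bool writeStruct(std::uint8_t *buffer, std::size_t capacity, std::size_t offset, const T &value)
{
    if (offset > capacity || sizeof(T) > capacity - offset)
    {
        return false;
    }

    std::memcpy(buffer + offset, &value, sizeof(T));
    return true;
}

// Hypothetical helper: copies a T out of buffer at offset, returns false if out of bounds.
template <typename T>
bool readStruct(const std::uint8_t *buffer, std::size_t capacity, std::size_t offset, T &value)
{
    if (offset > capacity || sizeof(T) > capacity - offset)
    {
        return false;
    }

    std::memcpy(&value, buffer + offset, sizeof(T));
    return true;
}
```

The copy costs a few bytes of work per access, so the overlay remains the natural choice when the buffer is known to be aligned.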

Subscription

An application that subscribes to data streams needs to use a channel and stream to listen on.

From BasicSubscriber, listen on a channel and a stream:

std::int64_t id = aeron->addSubscription(settings.channel, settings.streamId);

The Aeron::addSubscription method is non-blocking in the C++11 API. The method returns an id, called a registrationId, that can be used to determine whether the Media Driver has acknowledged the add command successfully, a timeout has occurred, or an error has been returned.

It is the application's responsibility to check the status. This can be done via Aeron::findSubscription.

From BasicSubscriber, wait until Media Driver has responded or an error/timeout has occurred.

std::shared_ptr<Subscription> subscription = aeron->findSubscription(id);

while (!subscription)
{
    std::this_thread::yield();
    subscription = aeron->findSubscription(id);
}
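
The loop above spins until the resource appears. An application may also want its own deadline on that wait. The following is a minimal standalone sketch of such a wait, not part of the Aeron API: awaitResource is a hypothetical helper, and the find callable stands in for a call such as aeron->findSubscription(id).

```cpp
#include <chrono>
#include <functional>
#include <memory>
#include <thread>

// Hypothetical helper: polls find until it returns a non-null pointer
// or the timeout elapses. Returns null if the deadline passed first.
template <typename T>
std::shared_ptr<T> awaitResource(
    const std::function<std::shared_ptr<T>()> &find,
    std::chrono::milliseconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    std::shared_ptr<T> resource = find();

    while (!resource && std::chrono::steady_clock::now() < deadline)
    {
        std::this_thread::yield();
        resource = find();
    }

    return resource;
}
```

With the real client, usage might look like awaitResource<Subscription>([&]() { return aeron->findSubscription(id); }, std::chrono::seconds(5)), followed by a null check on the result.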

fragment_handler_t

Messages arrive into Subscription instances via fragment_handler_t method calls. The arguments are:

  • buffer holding the data
  • offset indicating the offset in the buffer that starts the message
  • length of the message
  • header holding the metadata of the message

Example of printing the contents of a message as a string along with some metadata:

[](const AtomicBuffer &buffer, util::index_t offset, util::index_t length, const Header &header)
{
    std::cout << "Message to stream " << header.streamId() << " from session " << header.sessionId();
    std::cout << "(" << length << "@" << offset << ") <<";
    std::cout << std::string(reinterpret_cast<const char *>(buffer.buffer()) + offset, static_cast<std::size_t>(length)) << ">>" << std::endl;
};

Polling

Subscribing applications fully control when data is delivered to the fragment_handler_t via the Subscription::poll method. When called, this method determines whether there are any fragments to deliver and delivers them via the passed-in fragment_handler_t, up to the given fragment limit, before returning.

Example of polling for new messages with a per poll limit of 10 fragments and an Idle Strategy:

fragment_handler_t handler = printStringMessage();
SleepingIdleStrategy idleStrategy(IDLE_SLEEP_MS);

while (running)
{
    const int fragmentsRead = subscription->poll(handler, 10);
    idleStrategy.idle(fragmentsRead);
}
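
To make the fragment limit concrete, here is a minimal standalone sketch of the polling contract described above. FakeSubscription is a hypothetical stand-in backed by an in-memory queue, not the Aeron Subscription class, and its handler takes a plain string rather than the fragment_handler_t arguments.

```cpp
#include <deque>
#include <functional>
#include <string>

// Hypothetical stand-in for a subscription: a queue of pending fragments.
class FakeSubscription
{
public:
    void enqueue(const std::string &fragment)
    {
        m_pending.push_back(fragment);
    }

    // Delivers at most fragmentLimit fragments to handler and
    // returns how many were delivered, which may be zero.
    int poll(const std::function<void(const std::string &)> &handler, int fragmentLimit)
    {
        int fragmentsRead = 0;

        while (fragmentsRead < fragmentLimit && !m_pending.empty())
        {
            handler(m_pending.front());
            m_pending.pop_front();
            ++fragmentsRead;
        }

        return fragmentsRead;
    }

private:
    std::deque<std::string> m_pending;
};
```

The return value is what the idle strategy consumes: a count of zero signals that no work was done and the caller may back off.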

Message Reassembly

Publication instances automatically fragment large messages into data frames that Aeron sends. Subscription instances that want these fragments reassembled prior to delivery to the fragment_handler_t can chain an instance of FragmentAssembler to do this.

FragmentAssembler fragmentAssembler(printStringMessage());
fragment_handler_t handler = fragmentAssembler.handler();
SleepingIdleStrategy idleStrategy(IDLE_SLEEP_MS);

while (running)
{
    const int fragmentsRead = subscription->poll(handler, 10);
    idleStrategy.idle(fragmentsRead);
}
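
The idea behind reassembly can be sketched in isolation. The following is a simplified standalone illustration, not the FragmentAssembler implementation: SimpleAssembler is a hypothetical class, payloads are plain strings, and the begin/end flag values are assumptions that should be checked against the Doxygen doc.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Assumed flag bits marking the first and last fragment of a message.
constexpr std::uint8_t BEGIN_FLAG = 0x80;
constexpr std::uint8_t END_FLAG = 0x40;

class SimpleAssembler
{
public:
    using message_handler_t = std::function<void(std::int32_t sessionId, const std::string &message)>;

    explicit SimpleAssembler(message_handler_t delegate) : m_delegate(std::move(delegate))
    {
    }

    void onFragment(std::int32_t sessionId, std::uint8_t flags, const std::string &payload)
    {
        if ((flags & BEGIN_FLAG) && (flags & END_FLAG))
        {
            m_delegate(sessionId, payload); // unfragmented: pass straight through
            return;
        }

        if (flags & BEGIN_FLAG)
        {
            m_partial[sessionId] = payload; // start a new message for this session
            return;
        }

        auto it = m_partial.find(sessionId);
        if (it == m_partial.end())
        {
            return; // joined mid-message: nothing to append to, so drop
        }

        it->second += payload;
        if (flags & END_FLAG)
        {
            m_delegate(sessionId, it->second); // complete: deliver and forget
            m_partial.erase(it);
        }
    }

private:
    message_handler_t m_delegate;
    std::unordered_map<std::int32_t, std::string> m_partial; // one partial message per session
};
```

Keeping one partial buffer per session is what lets fragments from different publishers interleave without corrupting each other.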

Note: Reassembly has been shown to have minimal impact on latency, but the impact is not totally negligible. If the lowest latency is desired, then limiting message sizes to the MTU size is good practice.

Note: There is a maximum length allowed for messages, which is the minimum of 1/8th of a term length or 16MB. Messages larger than this should be chunked using an application-level chunking protocol. Chunking recovers better from failure and streams with mechanical sympathy.
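
As a worked example of the limit in the note above, the following standalone sketch computes the maximum message length for a given term length and the number of chunks a larger payload would need. The function names are hypothetical and introduced only for illustration.

```cpp
#include <algorithm>
#include <cstdint>

// 16MB cap from the note above.
constexpr std::int64_t MAX_MESSAGE_LENGTH_CAP = 16 * 1024 * 1024;

// Maximum message length: the smaller of termLength / 8 and 16MB.
inline std::int64_t maxMessageLength(std::int64_t termLength)
{
    return std::min(termLength / 8, MAX_MESSAGE_LENGTH_CAP);
}

// Number of chunks needed to carry totalLength bytes in chunks of chunkLength (rounds up).
inline std::int64_t chunkCount(std::int64_t totalLength, std::int64_t chunkLength)
{
    return (totalLength + chunkLength - 1) / chunkLength;
}
```

For a 64KB term, maxMessageLength gives 8192 bytes, so a 20000-byte payload would need 3 chunks of at most 8192 bytes each.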

Publication

An application that desires to send data needs to specify a channel and stream to send to.

From BasicPublisher, send to a channel and a stream:

std::int64_t id = aeron->addPublication(settings.channel, settings.streamId);

The Aeron::addPublication method is non-blocking in the C++11 API. The method returns an id, called a registrationId, that can be used to determine whether the Media Driver has acknowledged the add command successfully, a timeout has occurred, or an error has been returned.

It is the application's responsibility to check the status. This can be done via Aeron::findPublication.

From BasicPublisher, wait until Media Driver has responded or an error/timeout has occurred.

std::shared_ptr<Publication> publication = aeron->findPublication(id);

while (!publication)
{
    std::this_thread::yield();
    publication = aeron->findPublication(id);
}

After successful acknowledgement, the application is free to send data via the Publication::offer method.

AERON_DECL_ALIGNED(buffer_t buffer, 16);
concurrent::AtomicBuffer srcBuffer(&buffer[0], buffer.size());
char message[256];
...
const int messageLen = ::snprintf(message, sizeof(message), "Hello World! %ld", i);

srcBuffer.putBytes(0, reinterpret_cast<std::uint8_t *>(message), messageLen);

const std::int64_t result = publication->offer(srcBuffer, 0, messageLen);

Handling Back Pressure

Aeron has built-in back pressure for a publisher. It will not allow a publisher to send data that exceeds the prescribed flow control limits.

When calling Publication::offer, a return value greater than 0 indicates the message was sent. Negative values indicate that the message has not been enqueued for sending. Constants for negative values are as follows:

  • NOT_CONNECTED means no subscriber is connected to the publication. This can be a transient state as subscribers come and go.
  • BACK_PRESSURED indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired.
  • ADMIN_ACTION indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.
  • PUBLICATION_CLOSED indicates the message was not sent due to the Publication being closed. This is a permanent error.
  • MAX_POSITION_EXCEEDED indicates that the Publication has reached the maximum possible position given the term-length. This is possible with a small term-length. Max position is 2^31 * term-length for a Publication.
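
To put the MAX_POSITION_EXCEEDED limit in numbers, the following standalone sketch evaluates the 2^31 * term-length formula given above. maxPossiblePosition is a hypothetical helper name, not an Aeron function.

```cpp
#include <cstdint>

// Maximum position for a publication: 2^31 terms of termLength bytes each.
inline std::int64_t maxPossiblePosition(std::int64_t termLength)
{
    return termLength * (static_cast<std::int64_t>(1) << 31);
}
```

With a 64KB term length this gives 2^47 bytes (about 140 TB), whereas a 1GB term length gives 2^61 bytes, which is why the limit is only a practical concern for small term lengths.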

The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.

  • Retry until success. Keep calling Publication::offer until it succeeds. This may spin or have some sort of idle strategy. Many examples do this.
  • Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
  • Retry until success or timeout. As normal retry with or without some sort of idle strategy but with a timeout attached.
  • Retry asynchronously. Retry periodically, but instead of idling, do some other work.
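
As an illustration of the "retry until success or timeout" option, here is a minimal standalone sketch. It is not Aeron code: offerWithTimeout and OfferOutcome are hypothetical, the offer callable stands in for a call to Publication::offer, and the SKETCH_ constants are local stand-ins whose numeric values are assumptions. Real code should use the constants from the Aeron headers. Treating MAX_POSITION_EXCEEDED as permanent is also an assumption of this sketch.

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Local stand-ins for the permanent failure codes (assumed values, for illustration only).
constexpr std::int64_t SKETCH_PUBLICATION_CLOSED = -4;
constexpr std::int64_t SKETCH_MAX_POSITION_EXCEEDED = -5;

enum class OfferOutcome
{
    Sent,
    TimedOut,
    Failed
};

// Calls offer repeatedly until it succeeds, fails permanently, or timeout elapses.
inline OfferOutcome offerWithTimeout(
    const std::function<std::int64_t()> &offer,
    std::chrono::milliseconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;

    do
    {
        const std::int64_t result = offer();

        if (result > 0)
        {
            return OfferOutcome::Sent;
        }

        if (result == SKETCH_PUBLICATION_CLOSED || result == SKETCH_MAX_POSITION_EXCEEDED)
        {
            return OfferOutcome::Failed; // retrying cannot help
        }

        std::this_thread::yield(); // transient condition: give other threads a turn, then retry
    }
    while (std::chrono::steady_clock::now() < deadline);

    return OfferOutcome::TimedOut;
}
```

With the real client, the callable might be [&]() { return publication->offer(srcBuffer, 0, messageLen); }. Swapping the yield for an idle strategy gives the gentler variant mentioned in the list.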

The needs of an application, or system, can be quite complex. The common use case, though, is a non-blocking offer, and more complex scenarios can be built on top of it.

Monitoring

The Aeron Media Driver and the status of various buffers may be monitored from outside the driver via the counter files that the driver uses. Below are the major parts of an example application, AeronStat, that reads this data and prints it periodically.

Settings settings = parseCmdLine(cp, argc, argv);

MemoryMappedFile::ptr_t cncFile = MemoryMappedFile::mapExistingReadOnly(
    (settings.basePath + "/" + CncFileDescriptor::CNC_FILE).c_str());

const std::int32_t cncVersion = CncFileDescriptor::cncVersionVolatile(cncFile);

if (semanticVersionMajor(cncVersion) != semanticVersionMajor(CncFileDescriptor::CNC_VERSION))
{
    std::cerr << "CNC version not supported: "
    << " file=" << semanticVersionToString(cncVersion)
    << " app=" << semanticVersionToString(CncFileDescriptor::CNC_VERSION) << std::endl;

    return EXIT_FAILURE;
}

const std::int64_t clientLivenessTimeoutNs = CncFileDescriptor::clientLivenessTimeout(cncFile);
const std::int64_t pid = CncFileDescriptor::pid(cncFile);

AtomicBuffer metadataBuffer = CncFileDescriptor::createCounterMetadataBuffer(cncFile);
AtomicBuffer valuesBuffer = CncFileDescriptor::createCounterValuesBuffer(cncFile);

CountersReader counters(metadataBuffer, valuesBuffer);

while (running)
{
    time_t rawtime;
    char currentTime[80];

    ::time(&rawtime);
    struct tm localTm;

    ::localtime_r(&rawtime, &localTm);

    ::strftime(currentTime, sizeof(currentTime) - 1, "%H:%M:%S", &localTm);

    std::printf("\033[H\033[2J");

    std::printf(
        "%s - Aeron Stat (CnC v%s), pid %" PRId64 ", client liveness %s ns\n",
        currentTime,
        semanticVersionToString(cncVersion).c_str(),
        pid,
        toStringWithCommas(clientLivenessTimeoutNs).c_str());
    std::printf("===========================\n");

    counters.forEach(
        [&](std::int32_t counterId, std::int32_t, const AtomicBuffer &, const std::string &label)
        {
            std::int64_t value = counters.getCounterValue(counterId);

            std::printf("%3d: %20s - %s\n", counterId, toStringWithCommas(value).c_str(), label.c_str());
        });

    std::this_thread::sleep_for(std::chrono::milliseconds(settings.updateIntervalMs));
}

std::cout << "Exiting..." << std::endl;

The AeronStat application above does the following:

  1. Find the CnC file in the file system
  2. Map the file and return AtomicBuffer instances that expose the counters metadata and values
  3. Use a CountersReader to access the metadata and values.
  4. While running, in a loop do the following:
    1. Grab the time and print it out along with the version and client liveness value.
    2. For each counter, print out a line with the counter id, value, and label.
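
The AeronStat excerpt compares only the major component of the CnC version. The following standalone sketch shows how such a packed semantic version can be composed and decomposed. It assumes one byte per component, packed as major, minor, patch from high to low. The function names are hypothetical and the packing should be confirmed against the Doxygen doc rather than taken from this sketch.

```cpp
#include <cstdint>
#include <string>

// Assumed packing: (major << 16) | (minor << 8) | patch, each component in 0..255.
inline std::int32_t composeVersion(int major, int minor, int patch)
{
    return (major << 16) | (minor << 8) | patch;
}

inline int versionMajor(std::int32_t version)
{
    return (version >> 16) & 0xFF;
}

inline int versionMinor(std::int32_t version)
{
    return (version >> 8) & 0xFF;
}

inline int versionPatch(std::int32_t version)
{
    return version & 0xFF;
}

// Renders the version as "major.minor.patch".
inline std::string versionToString(std::int32_t version)
{
    return std::to_string(versionMajor(version)) + "." +
        std::to_string(versionMinor(version)) + "." +
        std::to_string(versionPatch(version));
}
```

Comparing only the major component lets a monitoring tool keep working across minor and patch revisions of the file format.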