Skip to content

Cpp OTF User Guide

Martin Thompson edited this page Dec 5, 2015 · 10 revisions

Some applications, such as network sniffers, need to process messages dynamically and thus have to use the Intermediate Representation to decode the messages on-the-fly (OTF). An example of using the OTF API can be found here. Doxygen documentation for the C++ OTF API can be found in the header files here. You will need to have doxygen installed to build the doc.

The C++ OTF decoder follows the design principles of the generated codec stubs.

Note: Due to the dynamic nature of OTF decoding, the stubs generated by the SBE compiler will yield greater relative performance.

Introduction

The C++ OTF decoder for SBE is concerned with being able to take a piece of data and a schema and being able to decode it, well, on-the-fly. The decoder uses a reactive, Rx, style API to accomplish this. Applications subclass callback interfaces, such as OnNext, OnError, and OnComplete to be notified of decoding actions. Listener is a decoding engine object. It is configured with the data to decode as well as the schema. The schema is represented in a "compiled" form called Intermediate Representation, or IR.

Listener - Encapsulation of a Decoding Engine

Instances of the Listener class may be reused for different buffers as well as different Ir. The methods use a fluent style for composition.

The basic usage pattern is:

  • Instantiate a Listener. This can be on the stack or via new, etc.
  • Set the Ir to use for the decoding that describes the data format
  • Pass in a pointer to the start of the data along with the length of the data in bytes
  • Subscribe callbacks for Field, Group, Error, and OnCompleted events. When called, Listener::subscribe initiates the decoding of the message. Thus all callbacks come from the calling thread.

This is demonstrated in the example below.

 Listener listener();                   // instantiate a decoder
 Ir ir(irBuffer, irLen);                // create an Ir object for the format based on Ir in buffer
 listener.resetForDecode(buffer, len)   // get ready to decode data located at buffer for len bytes
         .ir(ir)                        // pass in the Ir to use
         .subscribe(...);               // subscribe callbacks and initiate decoding

A more advanced usage pattern is when a header is used to dispatch to a set of different formats for the data. This is accomplished using the Listener::dispatchMessageByHeader method. An example is below.

 Listener listener();
 Ir headerIr(headerIrBuffer, headerIrLen);                   // the Ir for the header
 listener.resetForDecode(buffer, len)                        // get ready to decode data at buffer for len bytes
         .dispatchByMessageHeader(headerIr,                  // the Ir of the header
                                  irCallback)                // the callback called for dispatch choices
         .subscribe(...);

Decoding multiple messages in a single buffer is straight forward. Simply bump the pointer to the data by the offset of the Listener after it is done and reuse the decoder. The Listener keeps track of its current offset within the buffer and this offset can be retrieved via Listener::bufferOffset.

 Listener listener();
 Ir headerIr(headerIrBuffer, headerIrLen);
 listener.resetForDecode(buffer, len)
         .dispatchByMessageHeader(headerIr, irCallback)
         .subscribe(...);                                   // go ahead and decode single message header plus message and return

 listener.resetForDecode(buffer + listener.bufferOffset(), len - listener.bufferOffset())
         .dispatchByMessageHeader(headerIr, irCallback)
         .subscribe(...);                                   // go ahead and decode single message header plus message and return     

As fields and repeating groups are encountered in a message, the callbacks passed into Listener::subscribe(OnNext *, OnError *, OnCompleted *) will be called. Fields will be seen via the OnNext::onNext(const Field &) method. Group event, such as group start and end, will be seen via the OnNext::onNext(const Group &) method. Errors that occur will be seen via the OnError::onError(const Error &) method. Errors stop decoding of the current message. If a message is successfully completed, then a completion event will be seen via the OnCompleted::onCompleted(void) method. For Listener::subscribe, the OnError and OnCompleted arguments are optional and default to NULL. Below is a simple example.

class ExampleCallback : public OnNext, OnError, OnCompleted
{
    virtual int onNext(const Field &f)
    {
        ...         // handle Fields
        return 0;   // 0 for success and -1 for failure
    }

    virtual int onNext(const Group &g)
    {
        ...         // handle Group event
        return 0;   // 0 for success and -1 for failure
    }

    virtual int onError(const Error &e)
    {
        ...         // handle error
        return 0;   // 0 for success and -1 for failure
    }

    virtual int onCompleted(void)
    {
        ...         // handle completion event
        return 0;   // 0 for success and -1 for failure
    }
};

...
ExampleCallback cbs;
// set up listener and kick off decoding with subscribe
listener.dispatchMessageByHeader(...)
        .resetForDecode(...)
        .subscribe(&cbs, &cbs, &cbs);

Fields

During decoding a Listener will call OnNext::onNext(const Field &) and pass encountered fields to the application. These fields may be of varying types, including composites (or structs), enumerations, bit sets, or variable length data. All of these types may be accessed via the Field class.

For details of the Field class, see the header file here or the doxygen doc. Below is an example of accessing various field methods, types, etc. from the example.

class CarCallbacks : public OnNext
{
public:
    // callback for when a field is encountered
    virtual int onNext(const Field &f)
    {
        std::cout << "Field name=\"" << f.fieldName() << "\" id=" << f.schemaId();

        if (f.isComposite())
        {
            std::cout << ", composite name=\"" << f.compositeName() << "\"";
        }
        std::cout << std::endl;

        if (f.isEnum())
        {
            std::cout << " Enum [" << f.validValue() << "]";
            printEncoding(f, 0); // print the encoding. Index is 0.
        }
        else if (f.isSet())
        {
            std::cout << " Set ";
            // print the various names for the bits that are set
            for (std::vector<std::string>::iterator it = ((std::vector<std::string>&)f.choices()).begin(); it != f.choices().end(); ++it)
            {
                std::cout << "[" << *it << "]";
            }

            printEncoding(f, 0); // print the encoding. Index is 0.
        }
        else if (f.isVariableData())
        {
            // index 0 is the length field type, value, etc.
            // index 1 is the actual variable length data

            std::cout << " Variable Data length=" << f.length(1);

            char tmp[256];
            f.getArray(1, tmp, 0, f.length(1));  // copy the data
            std::cout << " value=\"" << std::string(tmp, f.length(1)) << "\"";

            std::cout << " presence=" << presenceStr(f.presence(1));
            std::cout << std::endl;
        }
        else // if not enum, set, or var data, then just normal encodings, but could be composite
        {
            for (int i = 0, size = f.numEncodings(); i < size; i++)
            {
                printEncoding(f, i);
            }
        }

        return 0;
    };

protected:

    // print out details of an encoding
    void printEncoding(const Field &f, int index)
    {
        std::cout << " name=\"" << f.encodingName(index) << "\" length=" << f.length(index);
        switch (f.primitiveType(index))
        {
            case Ir::CHAR:
                if (f.length(index) == 1)
                {
                    std::cout << " type=CHAR value=\"" << (char)f.getUInt(index) << "\"";
                }
                else
                {
                    char tmp[1024];

                    // copy data to temp array and print it out.
                    f.getArray(index, tmp, 0, f.length(index));
                    std::cout << " type=CHAR value=\"" << std::string(tmp, f.length(index)) << "\"";
                }
                break;
            case Ir::INT8:
                std::cout << " type=INT8 value=\"" << f.getInt(index) << "\"";
                break;
            case Ir::INT16:
                std::cout << " type=INT16 value=\"" << f.getInt(index) << "\"";
                break;
            case Ir::INT32:
                if (f.length() == 1)
                {
                    std::cout << " type=INT32 value=\"" << f.getInt(index) << "\"";
                }
                else
                {
                    char tmp[1024];

                    // copy data to temp array and print it out.
                    f.getArray(index, tmp, 0, f.length(index));
                    std::cout << " type=INT32 value=";
                    for (int i = 0, size = f.length(index); i < size; i++)
                    {
                        std::cout << "{" << *((int32_t *)(tmp + (sizeof(int32_t) * i))) << "}";
                    }
                }
                break;
            case Ir::INT64:
                std::cout << " type=INT64 value=\"" << f.getInt(index) << "\"";
                break;
            case Ir::UINT8:
                std::cout << " type=UINT8 value=\"" << f.getUInt(index) << "\"";
                break;
            case Ir::UINT16:
                std::cout << " type=UINT16 value=\"" << f.getUInt(index) << "\"";
                break;
            case Ir::UINT32:
                std::cout << " type=UINT32 value=\"" << f.getUInt(index) << "\"";
                break;
            case Ir::UINT64:
                std::cout << " type=UINT64 value=\"" << f.getUInt(index) << "\"";
                break;
            case Ir::FLOAT:
                std::cout << " type=FLOAT value=\"" << f.getDouble(index) << "\"";
                break;
            case Ir::DOUBLE:
                std::cout << " type=DOUBLE value=\"" << f.getDouble(index) << "\"";
                break;
            default:
                break;
        }
        std::cout << " presence=" << presenceStr(f.presence(index));
        std::cout << std::endl;
    }

    // print presence
    const char *presenceStr(Ir::TokenPresence presence)
    {
        switch (presence)
        {
            case Ir::REQUIRED:
                return "REQUIRED";
                break;

            case Ir::OPTIONAL:
                return "OPTIONAL";
                break;

            case Ir::CONSTANT:
                return "CONSTANT";
                break;

            default:
                return "UNKNOWN";
                break;

        }
    }
};

Groups

Groups are markers in the event sequence of calls to OnNext::onNext. Groups contain fields. When a group starts, OnNext::onNext(const Group &) is called with a Group::Event type of Group::START, the name of the group, the iteration number (starting at 0), and the expected number of iterations. After that, a set of calls to OnNext(const Field &) should occur. A group is ended by a call to OnNext::onNext(const Group &) with a Group::Event type of Group::END. Nested repeating groups are handled as one would expect with Group::START and Group::END within an existing Group sequence.

For details of the Group class, see the header file here or the doxygen doc. Below is an example of usage from the example.

class CarCallbacks : public OnNext
{
public:
    // save reference to listener for printing offset
    CarCallbacks(Listener &listener) : listener_(listener) , indent_(0) {};

    // callback for when a group is encountered
    virtual int onNext(const Group &g)
    {
        // group started
        if (g.event() == Group::START)
        {
            std::cout << "Group name=\"" << g.name() << "\" id=\"" << g.schemaId() << "\" start (";
            std::cout << g.iteration() << "/" << g.numInGroup() - 1 << "):" << "\n";

            if (g.iteration() == 1)
            {
                indent_++;
            }
        }
        else if (g.event() == Group::END)  // group ended
        {
            std::cout << "Group name=\"" << g.name() << "\" id=\"" << g.schemaId() << "\" end (";
            std::cout << g.iteration() << "/" << g.numInGroup() - 1 << "):" << "\n";

            if (g.iteration() == g.numInGroup() - 1)
            {
                indent_--;
            }
        }
        return 0;
    }

private:
    Listener &listener_;
    int indent_;
};

Error Handling & Message Completion

Decoding of a message stops when an error is encountered, such as the length of the buffer being too short, or the message is completed successfully. The former is signalled via the OnError::onError method. And the latter is signalled via the OnCompleted::onCompleted method. An example of usage taken from the example is below.

class CarCallbacks : public OnError, public OnCompleted
{
public:
    // save reference to listener for printing offset
    CarCallbacks(Listener &listener) : listener_(listener) , indent_(0) {};

    // callback for when an error is encountered
    virtual int onError(const Error &e)
    {
        std::cout << "Error " << e.message() << " at offset " << listener_.bufferOffset() << "\n";
        return 0;
    };

    // callback for when decoding is completed
    virtual int onCompleted()
    {
        std::cout << "Completed" << "\n";
        return 0;
    };

private:
    Listener &listener_;
    int indent_;
};

IR Collections

A convenience collection object for IR is provided via the IrCollection class. This class can read in a serialized IR file, created via SbeTool and provides it ready to go for decoding automatically. An example of usage is below. For more details, please see the header or the doxygen documentation.

// class to encapsulate Ir repository as well as Ir callback for dispatch
class IrRepo : public IrCollection, public Ir::Callback
{
public:
    // save a reference to the Listener so we can print out the offset
    IrRepo(Listener &listener) : listener_(listener) {};

    virtual Ir *irForTemplateId(const int templateId, const int templateVersion)
    {
        std::cout << "Message lookup id=" << templateId << " version " << templateVersion << " offset " << listener_.bufferOffset() << std::endl;

        // lookup in IrCollection the IR for the template ID and version
        return (Ir *)IrCollection::message(templateId, templateVersion);
    };

private:
    Listener &listener_;
};

Listener listener;
IrRepo repo(listener);
CarCallbacks carCbs(listener);

// load IR from .sbeir file
if (repo.loadFromFile(irFilename) < 0)
{
    std::cout << "could not load IR" << std::endl;
    exit(-1);
}

// load data for header + message into a buffer

// set up listener and kick off decoding with subscribe
listener.dispatchMessageByHeader(repo.header(), &repo)
        .resetForDecode(buffer, length)
        .subscribe(&carCbs, &carCbs, &carCbs);