NAME

zmq_architecture - 0MQ Internal Architecture

Introduction

This page is an attempt to provide an overview of the libzmq internal architecture. It is not intended to go into too much detail. Details tend to change as time goes on and the document would quickly get out of sync. To get the details you should check the source code instead.

The first thing to do is to warn the reader that the codebase is complex. It’s not complex in terms of the number of lines (currently it has ~10k lines) or by being spaghetti code (it’s not). Rather, it’s complex because of the sheer number of different combinations it takes into account. Consider that it runs on over ten operating systems, each of them having several versions. It runs on many different microarchitectures, ranging from ARM to Itanium. It can be compiled by different compilers, starting with gcc and ending with MSVC and SunStudio. It has to interact nicely with 20+ different language bindings. It allows underlying transports as different as in-process message passing and reliable multicast to be used. It supports different messaging patterns: remote procedure call, data distribution, parallel pipelining and more. Each socket can either connect to the peer or allow the peer to connect to it. It can even do both. A dialogue between two nodes can either survive breakages of the underlying connection or it can be transient. Etc. Etc. All these options are mutually orthogonal, giving literally thousands of possible combinations to take into account.

The moral of the above is that the code is complex even though it may look simple, and it is easy to break. So far, approximately ten man-years have been invested into the project, amounting to 2 hours spent on each single line of code, including lines like "i++;". Thus: be careful and try to understand what the code does and why it does it in that particular way before changing it.

Global state

Using global variables in a library is a pretty sure way to shoot yourself in the foot. Everything works well until the library gets linked into an executable twice (see the picture). At that point you’ll start getting bizarre errors and crashes.

[[=image arch1.png]]

To prevent this problem //libzmq// has no global variables. Instead, the user of the library is responsible for creating the global state explicitly. The object containing the global state is called the 'context'. While from the user’s perspective the context looks more or less like a pool of I/O threads to be used with your sockets, from libzmq’s perspective it’s just an object to store any global state that we happen to need. For example, the list of available //inproc// endpoints is stored in the context. The list of sockets that have been closed but still linger in memory because of unsent messages is held by the context. Etc.

Context is implemented as class //ctx_t//.
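To make this concrete, here is a minimal sketch of what that looks like through the public API, using the classic //zmq_init// / //zmq_term// calls (newer releases name these //zmq_ctx_new// / //zmq_ctx_term//):

[[code]]
#include <zmq.h>

int main (void)
{
    //  All "global" state lives in the context created here; the
    //  argument is the number of I/O threads the context should start.
    void *ctx = zmq_init (1);

    //  ... create sockets against this context and use them ...

    //  Destroying the context releases the global state again.
    zmq_term (ctx);
    return 0;
}
[[/code]]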

Concurrency model

ØMQ’s concurrency model may be a bit confusing at first. The reason is that we eat our own dogfood and use message passing to achieve concurrency and internal scalability. Thus, even though ØMQ is a multithreaded application, you won’t find mutexes, condition variables or semaphores meant to orchestrate the parallel processing. Instead, each object lives in its own thread and no other thread ever touches it (that’s why mutexes are not needed). Other threads communicate with the object by sending it messages (called 'commands' to distinguish them from user-level ØMQ messages). In the same way, the object can speak to other objects — potentially running in different threads — by sending them 'commands'.

From the user’s point of view, passing commands between objects is easy. Just derive your object from the base class 'object_t' and that’s it. You can send commands and define handlers for incoming commands. Have a look at the command.hpp file. It defines all available commands. Say there is a 'term' command with a single argument called 'linger'. To send a 'term' command with a linger argument of 100 to object 'p', do this:

[[code]]
send_term (p, 100);
[[/code]]

On the other hand, if you want to define a handler for the 'term' command, do this:

[[code]]
void my_object_t::process_term (int linger)
{
    //  Implement your action here.
}
[[/code]]

However, be aware that the above only works if your class is derived from the 'object_t' class!

For most commands, there’s a guarantee that the destination object won’t disappear while the command is in flight. (To understand the guarantee, check the section explaining the object tree model used to tie asynchronous objects into well-defined hierarchies.) However, for a couple of commands (basically those that are sent across the object tree rather than along its branches) the guarantee doesn’t apply. For these commands the sender calls the //inc_seqnum// function on the destination object, which synchronously increments a counter stored in the destination object (//sent_seqnum//) before sending the command itself. When the destination object processes the command, it increments another counter (//processed_seqnum//). When the object is shutting down, it knows that it can’t finish while //processed_seqnum// is less than //sent_seqnum//, i.e. while there are still commands in flight to be delivered to this object. The whole process is done transparently in the //object_t// and //own_t// classes. The command sender and command receiver can just send and receive commands and don’t have to care about the command sequence numbers.
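In a simplified form, the mechanism looks like this (a sketch only; the real counter in //own_t// is atomic because the sender runs in a different thread):

[[code]]
#include <stdint.h>

//  Simplified sketch of the command sequence number mechanism.
struct seqnum_sketch_t
{
    uint64_t sent_seqnum;         //  commands sent to this object so far
    uint64_t processed_seqnum;    //  commands this object has processed

    seqnum_sketch_t () : sent_seqnum (0), processed_seqnum (0) {}

    //  Called by the sender just before posting the command.
    void inc_seqnum () { ++sent_seqnum; }

    //  Called when the command is taken from the mailbox and handled.
    void mark_processed () { ++processed_seqnum; }

    //  During shutdown the object may only go away once every command
    //  that was sent to it has also been processed.
    bool can_terminate () const { return processed_seqnum == sent_seqnum; }
};
[[/code]]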

Remark: Actually, some pieces of data //are// enclosed in critical sections. Two rules apply when choosing where to use a critical section:

  • There’s a need for the data to be accessible from any thread at any time (say, the list of existing //inproc// endpoints).

  • The data guarded by the critical section should never be touched on the critical path (message passing per se).

Threading Model

As for threads, there are only two kinds of them in ØMQ. Each thread is either an 'application thread', i.e. a thread created outside of ØMQ and used to access the API, or an I/O thread — created inside of ØMQ and used to send and receive messages in the background. 'thread_t' is a simple portability class for creating threads in an OS-agnostic manner.

It should be understood that while the above discussion is correct from the OS’s point of view, ØMQ has a somewhat different notion of threads. From ØMQ’s perspective, a thread is any object that has a 'mailbox'. A mailbox is basically a queue storing commands sent to any object living in that thread. The thread retrieves the commands from the mailbox in the order they were sent and processes them one by one. The mailbox is implemented in the 'mailbox_t' class.
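Conceptually, a mailbox is nothing more than a queue of commands plus a way to wake the owning thread up. Here is a rough model (note that the real //mailbox_t// is lock-free on the fast path and signals readiness through a file descriptor, so that I/O threads can include it in their poll set):

[[code]]
#include <deque>
#include <mutex>
#include <condition_variable>

//  Conceptual model of a mailbox. The command type is a stand-in for
//  the real command_t from command.hpp.
struct command_sketch_t { int type; /* destination, arguments ... */ };

class mailbox_sketch_t
{
public:
    //  Post a command to the thread owning this mailbox.
    void send (const command_sketch_t &cmd_)
    {
        std::lock_guard<std::mutex> guard (sync);
        commands.push_back (cmd_);
        wakeup.notify_one ();
    }

    //  Retrieve the next command, in the order the commands were sent.
    command_sketch_t recv ()
    {
        std::unique_lock<std::mutex> guard (sync);
        wakeup.wait (guard, [this] { return !commands.empty (); });
        command_sketch_t cmd = commands.front ();
        commands.pop_front ();
        return cmd;
    }

private:
    std::deque<command_sketch_t> commands;
    std::mutex sync;
    std::condition_variable wakeup;
};
[[/code]]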

There are two different kinds of threads as far as ØMQ is concerned: I/O threads and sockets.

I/O threads are easy. In this case OS threads and ØMQ threads correspond one to one. A particular I/O thread runs in its own OS thread and has a single mailbox for incoming commands.

Sockets are somewhat more complex. In short, each ØMQ socket has its own mailbox for incoming commands and thus it is treated by ØMQ as a separate thread. In reality, a single application thread can create multiple sockets, meaning that in this case multiple ØMQ threads map to a single OS thread. To make it even more complex, ØMQ sockets can be migrated between OS threads. For example, a Java binding may use a ØMQ socket from a single thread; however, after the work is done, it passes it to the garbage collection thread to be destroyed. In such a case the association between the ØMQ thread and the OS thread changes — the socket is said to be migrated to a different OS thread.

I/O threads

I/O threads are background threads used by ØMQ to handle network traffic in an asynchronous way. The implementation is pretty minimal. The //io_thread_t// class is derived from //thread_t//, which is a simple compatibility wrapper on top of OS-specific threading APIs. It is also derived from //object_t//, which makes it capable of sending and receiving commands (such as the //stop// command, which is sent to the I/O thread when the library is being terminated).

In addition to that, each I/O thread owns a poller object. The poller object (//poller_t//) is an abstraction over the different polling mechanisms provided by different OSes. It is a typedef for the preferred polling mechanism class, such as //select_t//, //poll_t//, //epoll_t// etc.

There’s a simple helper class called //io_object_t// which all the objects living in I/O threads are derived from. Thanks to that, they are able to perform the following functions: register a file descriptor (//add_fd//); from that point on, a callback is invoked when something happens with the file descriptor (//in_event//, //out_event//). Once the file descriptor is no longer needed, the object can unregister it using the //rm_fd// function. An object can also register a timer using the //add_timer// function, which causes //timer_event// to be invoked when the timer expires. A timer can be cancelled using the //cancel_timer// function.

[[=image io1.png]]
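To summarise that interface, here is a hypothetical stand-in class; it only mirrors the calls described above, while the real declarations live in io_object.hpp and differ in detail:

[[code]]
//  Hypothetical stand-in for the io_object_t interface. The real class
//  forwards these calls to the poller owned by the I/O thread.
typedef int fd_t;        //  platform file descriptor type (simplified)
typedef int handle_t;    //  poller handle for a registered descriptor

struct io_object_sketch_t
{
    //  Registration of event sources:
    handle_t add_fd (fd_t fd_) { return fd_; }            //  start watching fd
    void rm_fd (handle_t /*handle_*/) {}                  //  stop watching fd
    void add_timer (int /*timeout_ms_*/, int /*id_*/) {}  //  one-shot timer
    void cancel_timer (int /*id_*/) {}

    //  Callbacks invoked from the I/O thread's event loop:
    virtual void in_event () {}               //  the descriptor became readable
    virtual void out_event () {}              //  the descriptor became writable
    virtual void timer_event (int /*id_*/) {} //  a registered timer expired

    virtual ~io_object_sketch_t () {}
};
[[/code]]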

It is worth noting that //io_thread_t// itself registers a file descriptor with the poller it owns. It’s the file descriptor associated with its mailbox (recall that any thread, whether an I/O thread or an application thread, has an associated mailbox). That fires //in_event// on the //io_thread_t// when a new command arrives. //io_thread_t// then dispatches the command to the object it is destined for.
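Conceptually, the dispatching looks like this (a sketch with simplified stand-in types; see io_thread.cpp for the real code):

[[code]]
//  Conceptual sketch of the command dispatch performed when the poller
//  reports activity on the mailbox file descriptor. The types below are
//  simplified stand-ins for command_t and object_t.
struct cmd_t;

struct object_iface_t
{
    virtual void process_command (const cmd_t &cmd_) = 0;
    virtual ~object_iface_t () {}
};

struct cmd_t
{
    object_iface_t *destination;   //  the object this command is aimed at
    //  ... command type and arguments ...
};

//  Roughly what happens inside in_event (): drain the mailbox and hand
//  each command over to its destination object.
template <typename mailbox_t>
void dispatch_commands (mailbox_t &mailbox_)
{
    cmd_t cmd;
    while (mailbox_.try_recv (&cmd))
        cmd.destination->process_command (cmd);
}
[[/code]]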

Object trees

The internal objects created within the ØMQ library are, for the most part, organised into tree hierarchies. The root of the hierarchy is always a socket:

[[=image objtree1.png]]

Each object in the tree can live in a different thread. It is in no way bound to live in the same thread as its parent. The root of the tree (the socket) lives in an application thread while the remaining objects live in I/O threads:

[[=image objtree2.png]]

The main raison d’être of the object trees is to provide a deterministic shutdown mechanism. The rule of thumb is that an object asked to shut down sends a shutdown request to all its children and waits for the confirmations before shutting down itself.

Note that the exchange of shutdown request and confirmation — which are both commands — effectively flushes all the commands currently in flight between the two objects. That’s why most commands (those that are passed along the branches of the object tree) don’t need to use command sequence counters (see above) to guarantee that the object won’t be shut down while there are still commands in flight aimed at it.

The shutdown process gets more complex when an object decides to shut itself down without being asked to do so by its parent — such as when a session object shuts down after a TCP disconnection. We have to account for parent-initiated termination, self-initiated termination and even the case when the two accidentally happen at the same time.

It turns out that all the cases can be solved by the self-terminating object asking its parent to shut it down. The diagrams below are sequence diagrams for all the scenarios. Note that the parent asks the child to shut down by sending it the //term// command. The child confirms its termination by sending //term_ack// back to the parent. Additionally, when the child wants to self-destruct, it asks the parent to shut it down by sending it the //term_req// command.

[[=image objtree3.png]]

Note that in the last case, the //term_req// command is simply ignored and dropped by the parent. It knows it has already sent a termination request (//term//) to the child, so there’s no point in sending it anew. If it did send a second termination request, it would arrive at the child after it has been deallocated and would crash the process by causing a segmentation fault or by overwriting memory.

The object tree mechanism is implemented in the //own_t// class. Note that //own_t// is derived from //object_t// and thus every object in the object tree can send and receive commands (it needs to do so during the termination sequence). However, the opposite is not true. There are objects that can send and receive commands but are not part of an object tree (e.g. pipe endpoints).
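A simplified sketch of that termination logic follows. It is loosely modelled on //own_t//; linger handling, ownership registration and the sequence number checks described above are left out:

[[code]]
#include <set>
#include <cstddef>

//  Simplified sketch of the shutdown handshake. send_term () and
//  send_term_ack () stand for posting the corresponding commands to the
//  other object's mailbox.
struct own_sketch_t
{
    own_sketch_t *owner;               //  parent in the object tree
    std::set<own_sketch_t*> owned;     //  children in the object tree
    bool terminating;
    int term_acks;                     //  acks still outstanding

    own_sketch_t () : owner (NULL), terminating (false), term_acks (0) {}

    //  'term_req' from a child that wants to self-destruct.
    void process_term_req (own_sketch_t *object_)
    {
        //  If we are already terminating, 'term' was (or will be) sent
        //  to the child anyway, so the request is simply dropped.
        if (terminating)
            return;
        //  Unknown child (already removed): drop the request as well.
        if (owned.erase (object_) == 0)
            return;
        ++term_acks;
        send_term (object_);           //  ask the child to shut down
    }

    //  'term' from the parent: propagate it down the tree.
    void process_term ()
    {
        for (std::set<own_sketch_t*>::iterator it = owned.begin ();
              it != owned.end (); ++it) {
            ++term_acks;
            send_term (*it);
        }
        owned.clear ();
        terminating = true;
        check_term_acks ();
    }

    //  'term_ack' from a child that has finished shutting down.
    void process_term_ack ()
    {
        --term_acks;
        check_term_acks ();
    }

    void check_term_acks ()
    {
        if (terminating && term_acks == 0) {
            if (owner)
                send_term_ack (owner); //  confirm our own termination
            delete this;               //  the object ceases to exist
        }
    }

    //  Stand-ins for posting commands via the mailboxes.
    void send_term (own_sketch_t *) {}
    void send_term_ack (own_sketch_t *) {}
};
[[/code]]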

The reaper thread

There’s one specific problem with the shutdown mechanism as described above. Shutting down any particular object (including a socket) can take an arbitrary amount of time. However, we would like //zmq_close// to have POSIX-like behaviour: you can close a TCP socket and the call returns immediately, even if there’s pending outbound data to be sent to the peer later on.

So, a call to //zmq_close// from an application thread should initiate the socket shutdown; however, we cannot rely on the said thread to do all the handshaking with the child objects. The thread may already be involved in doing something completely different and it may never even invoke the ØMQ library again. Thus, the socket should be migrated to a worker thread that handles all the handshaking in the application thread’s stead.

The logical choice would be to migrate the socket to one of the I/O threads; however, ØMQ can be initialised with zero I/O threads (to be used exclusively for in-process communication). Thus, we need a dedicated thread to do the task. And the reaper thread is exactly that.

It’s implemented as the //reaper_t// class. The socket sends a //reap// command to the reaper thread which, upon receiving the command, takes care of the socket and shuts it down cleanly.
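From the application’s point of view the whole mechanism is invisible; closing a socket is simply asynchronous. A small usage sketch with the public API (the ZMQ_LINGER socket option bounds how long pending messages are kept around):

[[code]]
#include <zmq.h>
#include <assert.h>

//  zmq_close () only initiates the shutdown and returns immediately;
//  the remaining handshaking is finished by the reaper thread in the
//  background, bounded by the socket's linger period.
void close_socket_example (void *socket)
{
    int linger = 1000;   //  keep trying to flush pending messages for 1 s
    int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);

    rc = zmq_close (socket);   //  returns without waiting for the shutdown
    assert (rc == 0);
}
[[/code]]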

Messages

Before we can progress further, we have to know how ØMQ messages work.

The requirements for messages are rather complex. The main reason for complexity is that the implementation should be very efficient for both very small and very large messages. The requirements are as follows:

  • For very small messages it’s cheaper to copy the message than to keep the shared data on the heap. These messages thus have no associated buffer and the data are stored directly in the //zmq_msg_t// structure — presumably on the stack. This has a huge impact on performance as it almost entirely avoids the need to do memory allocations/deallocations.

  • When using the //inproc// transport, a message should never be copied. Thus, the buffer sent in one thread should be received in the other thread and get deallocated there.

  • Messages should support reference counting. Thus, if a message is published to many different TCP connections, all the sending I/O threads access the same buffer rather than copying the buffer for each I/O thread or TCP connection.

  • The same trick should be accessible to the user, so that they can send the same physical buffer to multiple ØMQ sockets without needing to copy the content.

  • The user should be able to send a buffer that was allocated by an application-specific allocation mechanism without needing to copy the data. This is especially important for legacy applications which allocate large amounts of data (see the example below).
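As an illustration of the last two points, this is roughly how a user hands an application-allocated buffer to ØMQ through the public //zmq_msg_init_data// call (the //my_free// helper and the buffer are made up for the example):

[[code]]
#include <zmq.h>
#include <stdlib.h>
#include <assert.h>

//  Hypothetical deallocation function with the zmq_free_fn signature.
//  ØMQ calls it once the last reference to the buffer is released.
static void my_free (void *data, void *hint)
{
    (void) hint;
    free (data);
}

void send_without_copying (void *socket)
{
    size_t size = 1024 * 1024;
    void *buffer = malloc (size);          //  application-allocated buffer
    assert (buffer);
    //  ... fill the buffer with data ...

    //  Hand the buffer over to ØMQ without copying it; my_free will be
    //  invoked when the last message referencing the buffer is released.
    zmq_msg_t msg;
    int rc = zmq_msg_init_data (&msg, buffer, size, my_free, NULL);
    assert (rc == 0);

    //  ... send 'msg' on 'socket' (zmq_send in 2.x, zmq_msg_send later) ...
    (void) socket;
}
[[/code]]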

To achieve these goals messages in ØMQ look like this:

For very small messages (VSMs) the buffer is part of the //zmq_msg_t// structure itself. This way the buffer is allocated on the stack and there’s no need to use an allocation function, such as //malloc//, which tends to be the performance bottleneck for small message transfers. The //content// field contains the ZMQ_VSM constant. The message data are stored in the //vsm_data// byte array. //vsm_size// specifies the length of the message:

[[=image msg1.png]]

Note that the maximal size of a message that fits into the //vsm_data// buffer is specified by the ZMQ_MAX_VSM_SIZE constant. By default it is set to 30, but you can change the value when building the library.
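Put together, the structure described above looks roughly like this (a sketch; the exact fields and their order differ between libzmq versions):

[[code]]
//  Rough sketch of zmq_msg_t as described in the text.

#define ZMQ_MAX_VSM_SIZE 30

typedef struct
{
    void *content;                  //  ZMQ_VSM for very small messages,
                                    //  otherwise a pointer to the
                                    //  heap-allocated message content
    unsigned char flags;            //  per-message flags (not discussed here)
    unsigned char vsm_size;                      //  length of a VSM
    unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];   //  VSM payload
} zmq_msg_t;
[[/code]]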

For messages that won’t fit into the //vsm_data// buffer, we assume that allocating the buffer on the heap is cheaper than copying the message data around all the time. We allocate the buffer on the heap and make the //zmq_msg_t// structure point to it.

The structure allocated on the heap is called //msg_content_t// and it contains all the metadata relevant to the allocated chunk: its address (//data//), its size (//size//), a function pointer used to deallocate it (//ffn//) and a hint value to pass to the deallocation function (//hint//).

The buffer, along with its metadata, can be shared between several //zmq_msg_t// instances. In such a case we have to keep the buffer’s reference count (//refcnt//) so that we can deallocate it when there are no more //zmq_msg_t// structures pointing to it:

[[=image msg2.png]]

As can be seen in the diagram, to minimise the number of allocations, the buffer for the message data and the metadata can be allocated in a single memory chunk.

Note that the reference counting mechanism is accessible to the user. Calling //zmq_msg_copy// doesn’t physically copy the buffer; instead, it creates a new //zmq_msg_t// structure that points to the same buffer as the original message.
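The metadata structure itself can be sketched as follows (the reference counter is shown as a plain integer; the real code uses an atomic counter because several I/O threads may release the buffer concurrently):

[[code]]
#include <stddef.h>

//  Sketch of the heap-allocated metadata (msg_content_t).
typedef void (zmq_free_fn) (void *data, void *hint);

typedef struct
{
    void *data;          //  address of the message buffer
    size_t size;         //  size of the buffer in bytes
    zmq_free_fn *ffn;    //  deallocation function, NULL for plain free ()
    void *hint;          //  opaque value handed back to ffn
    int refcnt;          //  number of zmq_msg_t structures pointing here
} msg_content_t;
[[/code]]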

Finally, if the message buffer is supplied by the user, we can’t store the buffer metadata in the same memory chunk and thus we have to allocate a separate chunk for the metadata:

[[=image msg3.png]]

Pipes

Message scheduling

There are different scheduling algorithms used in ØMQ; however, all of them work on a single data structure, namely a flat array of pipes. Some pipes are //active//, meaning that you can send/recv messages from them; some are //passive//, which means that a message cannot be sent to the pipe because the high watermark has been reached, or that a message cannot be read from the pipe because there is none.

If the individual pipes in the array were just marked with an active/passive flag, scheduling would be inefficient. For example, if there were 10,000 inbound pipes, all of them except a single one passive, the fair-queueing algorithm would have to check 9,999 pipes before receiving each single message.

To solve this problem, all the active pipes are located at the beginning of the list, while the passive pipes are at the end. There’s a single variable (//active//) that determines how many of the pipes at the beginning of the array are active. The rest are passive.

Thus, when performing a scheduling algorithm such as load-balancing or fair-queueing, we only have to bother with the N initial pipes, which are guaranteed to be active, and can completely ignore the rest. This kind of approach leads to O(1) scheduling.

A slightly more complex part is how to activate and deactivate individual pipes. It turns out that it’s not so complex after all and that it can be done in O(1) time. The diagram below shows the deactivation of pipe X. Note that all that’s needed is swapping two elements in the array and decrementing the //active// variable:

[[=image sched1.png]]
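In code, the swap-and-decrement trick looks roughly like this (a sketch; the real //array_t// container in libzmq additionally stores each pipe’s index inside the pipe itself, so that locating a pipe in the array is also O(1)):

[[code]]
#include <vector>
#include <algorithm>
#include <cstddef>

//  Sketch of the active/passive pipe array described above. 'pipe_t' is
//  just a placeholder type here.
struct pipe_t;

struct pipe_array_sketch_t
{
    std::vector<pipe_t*> pipes;
    size_t active;                   //  pipes [0 .. active) are active

    //  Deactivate the pipe at 'index': swap it with the last active
    //  pipe and shrink the active region. O(1), no scanning needed.
    void deactivate (size_t index)
    {
        std::swap (pipes [index], pipes [active - 1]);
        --active;
    }

    //  Activate the pipe at 'index': swap it with the first passive
    //  pipe and grow the active region.
    void activate (size_t index)
    {
        std::swap (pipes [index], pipes [active]);
        ++active;
    }
};
[[/code]]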