.output chapter4.wd
++ Chapter Four - Advanced Request-Reply: Reliability
In Chapter Three we looked at advanced use of 0MQ's request-reply pattern with worked examples. In this chapter we'll look at the general question of reliability and build a set of reliable messaging patterns on top of 0MQ's core request-reply pattern.
We'll cover:
* How to define 'reliability' in 0MQ applications.
* How to implement reliability on top of the 0MQ core patterns.
* How to implement heartbeating between 0MQ peers.
* How to write a reusable protocol specification.
* How to design a service-oriented framework API.
* How to implement service discovery.
* How to implement non-idempotent server applications.
* How to implement disk-based reliability.
In this chapter we focus heavily on user-space 'patterns', which are reusable models that help you design your 0MQ architecture:
* The //Lazy Pirate// pattern: reliable request reply from the client side.
* The //Simple Pirate// pattern: reliable request-reply using an LRU queue.
* The //Paranoid Pirate// pattern: reliable request-reply with heartbeating.
* The //Majordomo// pattern: service-oriented reliable queuing.
* The //Titanic// pattern: disk-based / disconnected reliable queuing.
* The //Freelance// pattern: brokerless reliable request-reply.
+++ What is "Reliability"?
To understand what 'reliability' means, we have to look at its opposite, namely *failure*. If we can handle a certain set of failures, we are reliable with respect to those failures. No more, no less. So let's look at the possible causes of failure in a distributed 0MQ application, in roughly descending order of probability:
* Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, etc.
* System code - like brokers we write using 0MQ - can die. System code should be more reliable than application code but can still crash and burn, and especially run out of memory if it tries to compensate for slow clients.
* Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages.
* Networks can fail temporarily, causing intermittent message loss. Such errors are hidden to 0MQ applications since it automatically reconnects peers after a network-forced disconnection.
* Hardware can fail and take with it all the processes running on that box.
* Networks can fail in exotic ways, e.g. some ports on a switch may die and those parts of the network become inaccessible.
* Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures.
To make a software system fully reliable against //all// of these possible failures is an enormously difficult and expensive job and goes beyond the scope of this modest guide.
Since the first five cases cover 99.9% of real world requirements outside large companies (according to a highly scientific study I just ran), that's what we'll look at. If you're a large company with money to spend on the last two cases, contact me immediately, there's a large hole behind my beach house waiting to be converted into a pool.
+++ Designing Reliability
So to make things brutally simple, reliability is "keeping things working properly when code freezes or crashes", a situation we'll shorten to "dies". However the things we want to keep working properly are more complex than just messages. We need to take each core 0MQ messaging pattern and see how to make it work (if we can) even when code dies.
Let's take them one by one:
* Request-reply: if the server dies (while processing a request), the client can figure that out since it won't get an answer back. Then it can give up in a huff, wait and try again later, find another server, etc. As for the client dying, we can brush that off as "someone else's problem" for now.
* Publish-subscribe: if the client dies (having gotten some data), the server doesn't know about it. Pubsub doesn't send any information back from client to server. But the client can contact the server out-of-band, e.g. via request-reply, and ask, "please resend everything I missed". As for the server dying, that's out of scope for here. Subscribers can also self-verify that they're not running too slowly, and take action (e.g. warn the operator, and die) if they are.
* Pipeline: if a worker dies (while working), the ventilator doesn't know about it. Pipelines, like pubsub, and the grinding gears of time, only work in one direction. But the downstream collector can detect that one task didn't get done, and send a message back to the ventilator saying, "hey, resend task 324!" If the ventilator or collector die, then whatever upstream client originally sent the work batch can get tired of waiting and resend the whole lot. It's not elegant but system code should really not die often enough to matter.
In this chapter we'll focus on request-reply, and we'll cover reliable pub-sub and pipeline in the following chapters.
The basic request-reply pattern (a REQ client socket doing a blocking send/recv to a REP server socket) scores low on handling the most common types of failure. If the server crashes while processing the request, the client just hangs forever. If the network loses the request or the reply, the client hangs forever.
It is a lot better than TCP, thanks to 0MQ's ability to reconnect peers silently, to load-balance messages, and so on. But it's still not good enough for real work. The only use case where you can trust the basic request-reply pattern is between two threads in the same process where there's no network or separate server process to die.
However, with a little extra work this humble pattern becomes a good basis for real work across a distributed network, and we get a reliable request-reply pattern I like to call the "Pirate" pattern. RRR!
There are, roughly, three ways to connect clients to servers, each needing a specific approach to reliability:
* Multiple clients talking directly to a single server. Use case: single well-known server that clients need to talk to. Types of failure we aim to handle: server crashes and restarts, network disconnects.
* Multiple clients talking to a single queue device that distributes work to multiple servers. Use case: workload distribution to workers. Types of failure we aim to handle: worker crashes and restarts, worker busy looping, worker overload, queue crashes and restarts, network disconnects.
* Multiple clients talking to multiple servers with no intermediary devices. Use case: distributed services such as name resolution. Types of failure we aim to handle: service crashes and restarts, service busy looping, service overload, network disconnects.
Each of these has its trade-offs, and often you'll mix them. We'll look at all three of these in detail.
+++ Client-side Reliability (Lazy Pirate Pattern)
We can get very simple reliable request-reply with only some changes in the client. We call this the Lazy Pirate pattern. Rather than doing a blocking receive, we:
* Poll the REQ socket and only receive from it when we're sure a reply has arrived.
* Resend the request, several times if need be, if no reply arrives within a timeout period.
* Abandon the transaction if, after several requests, there is still no reply.
[[code type="textdiagram"]]
+-----------+ +-----------+ +-----------+
| Client | | Client | | Client |
+-----------+ +-----------+ +-----------+
| Retry | | Retry | | Retry |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-------------\
| REP |
+-------------+
| |
| Server |
| |
+-------------+
Figure # - Lazy Pirate pattern
[[/code]]
If you try to use a REQ socket in anything other than a strict send/receive fashion, you'll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send/receive ping-pong, and so the error code is called "EFSM"). This is slightly annoying when we want to use REQ in a pirate pattern, because we may send several requests before getting a reply. The pretty good brute-force solution is to close and reopen the REQ socket after an error:
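Here is a minimal sketch of that close/reopen retry loop. It assumes the s_send() and s_recv() helpers from zhelpers.h, hypothetical REQUEST_TIMEOUT (in msecs) and REQUEST_RETRIES constants, and client, context, and request variables set up by the surrounding program:

[[code]]
//  Sketch only: resend on timeout by recreating the REQ socket
int retries_left = REQUEST_RETRIES;
s_send (client, request);
while (retries_left) {
    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    zmq_poll (items, 1, REQUEST_TIMEOUT * 1000);    //  Timeout is in usecs here
    if (items [0].revents & ZMQ_POLLIN) {
        char *reply = s_recv (client);              //  Reply arrived, we're done
        free (reply);
        break;
    }
    else
    if (--retries_left == 0) {
        printf ("E: server seems to be offline, abandoning\n");
        break;
    }
    else {
        printf ("W: no response from server, retrying...\n");
        //  The old socket is now in a confused state; discard it
        zmq_close (client);
        client = zmq_socket (context, ZMQ_REQ);
        zmq_connect (client, "tcp://localhost:5555");
        s_send (client, request);                   //  Resend on the fresh socket
    }
}
[[/code]]

The full Lazy Pirate client adds request sequencing and wraps this logic in a loop: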
[[code type="example" title="Lazy Pirate client" name="lpclient"]]
[[/code]]
Run this together with the matching server:
[[code type="example" title="Lazy Pirate server" name="lpserver"]]
[[/code]]
To run this testcase, start the client and the server in two console windows. The server will randomly misbehave after a few messages. You can check the client's response. Here is a typical output from the server:
[[code]]
I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash
[[/code]]
And here is the client's response:
[[code]]
I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning
[[/code]]
The client sequences each message, and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once, or out of order. Run the test a few times until you're convinced this mechanism actually works. You don't need sequence numbers in reality, they just help us trust our design.
The client uses a REQ socket, and does the brute-force close/reopen because REQ sockets impose a strict send/receive cycle. You might be tempted to use a DEALER (XREQ) instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you've forgotten what that is, it's a good sign you don't want to have to do it). Second, it would mean potentially getting back replies that you didn't expect.
Handling failures only at the client works when we have a set of clients talking to a single server. It can handle a server crash, but only if recovery means restarting that same server. If there's a permanent error - e.g. a dead power supply on the server hardware - this approach won't work. Since the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.
So, pros and cons:
* Pro: simple to understand and implement.
* Pro: works easily with existing client and server application code.
* Pro: 0MQ automatically retries the actual reconnection until it works.
* Con: doesn't do failover to backup / alternate servers.
+++ Basic Reliable Queuing (Simple Pirate Pattern)
Our second approach takes the Lazy Pirate pattern and extends it with a queue device that lets us talk, transparently, to multiple servers, which we can more accurately call 'workers'. We'll develop this in stages, starting with a minimal working model, the Simple Pirate pattern.
In all these Pirate patterns, workers are stateless, or have some shared state we don't know about, e.g. a shared database. Having a queue device means workers can come and go without clients knowing anything about it. If one worker dies, another takes over. This is a nice simple topology with only one real weakness, namely the central queue itself, which can become a problem to manage, and a single point of failure.
The basis for the queue device is the least-recently-used (LRU) routing queue from Chapter 3. What is the very //minimum// we need to do to handle dead or blocked workers? It turns out it's surprisingly little. We already have a retry mechanism in the client. So using the standard LRU queue will work pretty well. This fits with 0MQ's philosophy that we can extend a peer-to-peer pattern like request-reply by plugging naive devices in the middle:
[[code type="textdiagram"]]
+-----------+ +-----------+ +-----------+
| Client | | Client | | Client |
+-----------+ +-----------+ +-----------+
| Retry | | Retry | | Retry |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-----------\
| ROUTER |
+-----------+
| LRU |
| Queue |
+-----------+
| ROUTER |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| REQ | | REQ | | REQ |
+-----------+ +-----------+ +-----------+
| LRU | | LRU | | LRU |
| Worker | | Worker | | Worker |
+-----------+ +-----------+ +-----------+
Figure # - Simple Pirate Pattern
[[/code]]
We don't need a special client, we're still using the Lazy Pirate client. Here is the queue, which is exactly a LRU queue, no more or less:
[[code type="example" title="Simple Pirate queue" name="spqueue"]]
[[/code]]
Here is the worker, which takes the Lazy Pirate server and adapts it for the LRU pattern (using the REQ 'ready' signaling):
[[code type="example" title="Simple Pirate worker" name="spworker"]]
[[/code]]
To test this, start a handful of workers, a client, and the queue, in any order. You'll see that the workers eventually all crash and burn, and the client retries and then gives up. The queue never stops, and you can restart workers and clients ad nauseam. This model works with any number of clients and workers.
+++ Robust Reliable Queuing (Paranoid Pirate Pattern)
The Simple Pirate Queue pattern works pretty well, especially since it's just a combination of two existing patterns, but it has some weaknesses:
* It's not robust against a queue crash and restart. The client will recover, but the workers won't. While 0MQ will reconnect workers' sockets automatically, as far as the newly started queue is concerned, the workers haven't signalled "READY", so don't exist. To fix this we have to do heartbeating from queue to worker, so that the worker can detect when the queue has gone away.
* The queue does not detect worker failure, so if a worker dies while idle, the queue can only remove it from its worker queue by first sending it a request. The client waits and retries for nothing. It's not a critical problem but it's not nice. To make this work properly we do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.
We'll fix these in a properly pedantic Paranoid Pirate Pattern.
We previously used a REQ socket for the worker. For the Paranoid Pirate worker we'll switch to a DEALER (XREQ) socket. This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management. If you don't know what I mean, please re-read Chapter 3.
[[code type="textdiagram"]]
+-----------+ +-----------+ +-----------+
| Client | | Client | | Client |
+-----------+ +-----------+ +-----------+
| Retry | | Retry | | Retry |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-----------\
| ROUTER |
+-----------+
| Queue |
+-----------+
| Heartbeat |
+-----------+
| ROUTER |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| DEALER | | DEALER | | DEALER |
+-----------+ +-----------+ +-----------+
| Heartbeat | | Heartbeat | | Heartbeat |
+-----------+ +-----------+ +-----------+
| Worker | | Worker | | Worker |
+-----------+ +-----------+ +-----------+
Figure # - Paranoid Pirate Pattern
[[/code]]
We're still using the Lazy Pirate client. Here is the Paranoid Pirate queue device:
[[code type="example" title="Paranoid Pirate queue" name="ppqueue"]]
[[/code]]
Some comments about this example:
* In C, it's quite horrid to manage any kind of data structure. The queue really needs two data structures: a least-recently used list of servers, and a hash of the same set of servers. The C code is not optimized, and won't scale as such. A proper version would use hash and list containers such as the [http://zfl.zeromq.org ZFL project] provides.
* The queue extends the LRU pattern with heartbeating of workers. It's simple once it works, but quite difficult to invent. I'll explain more about heartbeating in a second.
Here is the Paranoid Pirate worker:
[[code type="example" title="Paranoid Pirate worker" name="ppworker"]]
[[/code]]
Some comments about this example:
* The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this, disable the failure simulation.
* As for the Paranoid Pirate queue, the heartbeating is quite tricky to get right. See below for a discussion about this.
* The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: (a) it does an exponential back-off, and (b) it never abandons (see the sketch after this list).
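Here is a rough sketch of that back-off logic. The HEARTBEAT_LIVENESS, INTERVAL_INIT, and INTERVAL_MAX constants are illustrative, the millisecond sleep helper is assumed, and s_worker_socket() stands in for whatever function recreates and reconnects the worker's DEALER socket:

[[code]]
//  Sketch only: detect a dead queue via missed heartbeats, then reconnect
//  with an exponential back-off, but never give up entirely
size_t liveness = HEARTBEAT_LIVENESS;   //  E.g. 3 missed heartbeats allowed
size_t interval = INTERVAL_INIT;        //  E.g. 1000 msecs to start with
while (1) {
    zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
    zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
    if (items [0].revents & ZMQ_POLLIN) {
        //  Any traffic from the queue (request or heartbeat) counts as life
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
        //  ... handle the request or heartbeat here ...
    }
    else
    if (--liveness == 0) {
        //  Queue looks dead: wait a while, then try a fresh connection
        s_sleep (interval);             //  Millisecond sleep helper (assumed)
        if (interval < INTERVAL_MAX)
            interval *= 2;              //  Exponential back-off
        zmq_close (worker);
        worker = s_worker_socket (context);     //  Hypothetical reconnect helper
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Heartbeats to the queue are sent here, on the timer shown later
}
[[/code]]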
Try the client, queue, and workers, e.g. using a script like this:
[[code]]
ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &
[[/code]]
You should see the workers die, one by one, as they simulate a crash, and the client eventually give up. You can stop and restart the queue and both client and workers will reconnect and carry on. No matter what you do to queues and workers, the client will never get an out-of-order reply: the whole chain either works, or the client abandons.
+++ Heartbeating
When writing the Paranoid Pirate examples, it took about five hours to get the queue-to-worker heartbeating working properly. The rest of the request-reply chain took perhaps ten minutes. Heartbeating is one of those reliability layers that often causes more trouble than it saves. It is especially easy to create 'false failures', i.e. peers decide that they are disconnected because the heartbeats aren't sent properly.
Some points to consider when understanding and implementing heartbeating:
* Note that heartbeats are not request-reply. They flow asynchronously in both directions. Either peer can decide the other is 'dead' and stop talking to it.
* If one of the peers uses durable sockets, this means it may get heartbeats queued up that it will receive if it reconnects. For this reason, workers should //not// reuse durable sockets. The example code uses durable sockets for debugging purposes but they are randomized to (in theory) never reuse an existing socket.
* First, get the heartbeating working, and only //then// add in the rest of the message flow. You should be able to prove the heartbeating works by starting peers in any order, stopping and restarting them, simulating freezes, and so on.
* When your main loop is based on zmq_poll[3], use a secondary timer to trigger heartbeats. Do //not// use the poll loop for this, because it will either send too many heartbeats (overloading the network), or too few (causing peers to disconnect). The zhelpers package provides an s_clock() method that returns the current system clock in milliseconds. It's easy to use this to calculate when to send the next heartbeats. Thus, in C:
[[code]]
//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;

while (1) {
    ...
    zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
    ...
    //  Do this unconditionally, whatever zmq_poll did
    if (s_clock () > heartbeat_at) {
        ... Send heartbeats to all peers that expect them
        //  Set timer for next heartbeat
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
    }
}
[[/code]]
* Your main poll loop should use the heartbeat interval as its timeout. Obviously, don't use infinity. Anything less will just waste cycles.
* Use simple tracing, i.e. print to console, to get this working. Some tricks to help you trace the flow of messages between peers: a dump method such as zmsg offers; number messages incrementally so you can see if there are gaps.
* In a real application, heartbeating must be configurable and usually negotiated with the peer. Some peers will want aggressive heartbeating, as low as 10 msecs. Other peers will be far away and want heartbeating as high as 30 seconds.
* If you have different heartbeat intervals for different peers, your poll timeout should be the lowest of these.
* You might be tempted to open a separate socket dialog for heartbeats. This is superficially nice because you can separate different dialogs, e.g. the synchronous request-reply from the asynchronous heartbeating. However it's a bad idea for several reasons. First, if you're sending data you don't need to send heartbeats. Second, sockets may, due to network vagaries, become jammed. You need to know when your main data socket is silent because it's dead, rather than just not busy, so you need heartbeats on that socket. Lastly, two sockets is more complex than one.
* We're not doing heartbeating from client to queue. We could, but it would add //significant// complexity for no real benefit.
+++ Contracts and Protocols
If you're paying attention you'll realize that Paranoid Pirate is not compatible with Simple Pirate, because of the heartbeats.
In fact what we have here is a protocol that needs writing down. It's fun to experiment without specifications, but that's not a sensible basis for real applications. What happens if we want to write a worker in another language? Do we have to read code to see how things work? What if we want to change the protocol for some reason? The protocol may be simple but it's not obvious, and if it's successful it'll become more complex.
Lack of contracts is a sure sign of a disposable application. So, let's write a contract for this protocol. How do we do that?
* There's a wiki, at [http://rfc.zeromq.org rfc.zeromq.org], that we made especially as a home for public 0MQ contracts.
* To create a new specification, register, and follow the instructions. It's straight-forward, though technical writing is not for everyone.
It took me about fifteen minutes to draft the new [http://rfc.zeromq.org/spec:6 Pirate Pattern Protocol]. It's not a big specification but it does capture enough to act as the basis for arguments ("your queue isn't PPP compatible, please fix it!").
Turning PPP into a real protocol would take more work:
* There should be a protocol version number in the READY command so that it's possible to create new versions of PPP safely.
* Right now, READY and HEARTBEAT are not entirely distinct from requests and replies. To make them distinct, we would want a message structure that includes a "message type" part.
+++ Service-Oriented Reliable Queuing (Majordomo Pattern)
The nice thing about progress is how fast it happens when lawyers and committees aren't involved. Just a few sentences ago we were dreaming of a better protocol that would fix the world. And here we have it:
* http://rfc.zeromq.org/spec:7
This one-page specification takes PPP and turns it into something more solid. This is how we should design complex architectures: start by writing down the contracts, and only //then// write software to implement them.
The Majordomo Protocol (MDP) extends and improves PPP in one interesting way apart from the two points above. It adds a "service name" to requests that the client sends, and asks workers to register for specific services. The nice thing about MDP is that it came from working code, a simpler protocol, and a precise set of improvements. This made it easy to draft.
Adding service names is a small but significant change that turns our Paranoid Pirate queue into a service-oriented broker:
[[code type="textdiagram"]]
+-----------+ +-----------+ +-----------+
| | | | | |
| Client | | Client | | Client |
| | | | | |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
"Give me coffee" | "Give me tea"
v
/-----------\
| |
| Broker |
| |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| "Water" | | "Tea" | | "Coffee" |
+-----------+ +-----------+ +-----------+
| | | | | |
| Worker | | Worker | | Worker |
| | | | | |
+-----------+ +-----------+ +-----------+
Figure # - Majordomo Pattern
[[/code]]
To implement Majordomo we need to write a framework for clients and workers. It's really not sane to ask every application developer to read the spec and make it work, when they could be using a simpler API built and tested just once.
So, while our first contract (MDP itself) defines how the pieces of our distributed architecture talk to each other, our second contract defines how user applications talk to the technical framework we're going to design.
Majordomo has two halves, a client side and a worker side. Since we'll write both client and worker applications, we will need two APIs. Here is a sketch for the client API, using a simple object-oriented approach. We write this in C, using the style of the [http://zfl.zeromq.org/page:read-the-manual ZFL library]:
[[code]]
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
[[/code]]
That's it. We open a session to the broker, we send a request message and get a reply message back, and we eventually close the connection. Here's a sketch for the worker API:
[[code]]
mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
[[/code]]
It's more or less symmetrical but the worker dialog is a little different. The first time a worker does a recv(), it passes a null reply, thereafter it passes the current reply, and gets a new request.
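To make those dialogs concrete, here is a rough sketch of how an application might use the two APIs. The "echo" service name and the endpoint are only examples, and the zmsg calls used to build the request are illustrative, since they depend on the zmsg version you have:

[[code]]
//  Client side (sketch): one synchronous request to the "echo" service
mdcli_t *session = mdcli_new ("tcp://localhost:5555");
zmsg_t *request = zmsg_new ();              //  Build a one-part request...
zmsg_body_set (request, "Hello world");     //  ...exact zmsg calls may differ
zmsg_t *reply = mdcli_send (session, "echo", &request);
if (reply)
    zmsg_destroy (&reply);
mdcli_destroy (&session);

//  Worker side (sketch): implement the "echo" service
mdwrk_t *worker = mdwrk_new ("tcp://localhost:5555", "echo");
zmsg_t *reply_to_send = NULL;
while (1) {
    zmsg_t *incoming = mdwrk_recv (worker, reply_to_send);
    if (incoming == NULL)
        break;                              //  Interrupted or shutting down
    reply_to_send = incoming;               //  Echo: the request becomes the reply
}
mdwrk_destroy (&worker);
[[/code]]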
The client and worker APIs were fairly simple to construct, since they're heavily based on the Paranoid Pirate code we already developed. Here is the client API:
[[code type="example" title="Majordomo client API" name="mdcliapi"]]
[[/code]]
With an example test program that does 100K request-reply cycles:
[[code type="example" title="Majordomo client application" name="mdclient"]]
[[/code]]
And here is the worker API:
[[code type="example" title="Majordomo worker API" name="mdwrkapi"]]
[[/code]]
With an example test program that implements an 'echo' service:
[[code type="example" title="Majordomo worker application" name="mdworker"]]
[[/code]]
Notes on this code:
* The APIs are single threaded. This means, for example, that the worker won't send heartbeats in the background. Happily, this is exactly what we want: if the worker application gets stuck, heartbeats will stop and the broker will stop sending requests to the worker.
* The worker API doesn't do an exponential backoff, it's not worth the extra complexity.
* The APIs don't do any error reporting. If something isn't as expected, they raise an assertion (or exception depending on the language). This is ideal for a reference implementation, so any protocol errors show immediately. For real applications the API should be robust against invalid messages.
Let's design the Majordomo broker. Its core structure is a set of queues, one per service. We will create these queues as workers appear (we could delete them as workers disappear but forget that for now, it gets complex). Additionally, we keep a queue of workers per service.
To make the C examples easier to write and read, I've taken the hash and list container classes from the [http://zfl.zeromq.org ZFL project], and renamed them as [https://github.com/imatix/zguide/blob/master/examples/C/zlist.h zlist] and [https://github.com/imatix/zguide/blob/master/examples/C/zhash.h zhash], as we did with zmsg. In any modern language you can of course use built-in containers.
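Before the full listing, here is a rough sketch of the broker's core data structures, using those zlist and zhash containers. The structure and field names are illustrative, not copied from the example code:

[[code]]
//  Sketch only: the broker's core state
typedef struct {
    void *socket;           //  One ROUTER socket for both clients and workers
    zhash_t *services;      //  Known services, hashed by service name
    zhash_t *workers;       //  Known workers, hashed by identity
    zlist_t *waiting;       //  All waiting workers, least recently used first
} broker_t;

typedef struct {
    char *name;             //  Service name
    zlist_t *requests;      //  Client requests queued for this service
    zlist_t *waiting;       //  Workers waiting to handle a request
} service_t;

typedef struct {
    char *identity;         //  Worker's 0MQ identity, as a printable string
    service_t *service;     //  The service this worker registered for
    int64_t expiry;         //  When the worker expires, if no heartbeat arrives
} worker_t;
[[/code]]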
And here is the broker:
[[code type="example" title="Majordomo Worker API" name="mdbroker"]]
[[/code]]
This is by far the most complex example we've seen. It's almost 500 lines of code. To write this, and make it fully robust took two days. However this is still a short piece of code for a full service-oriented broker.
Notes on this code:
* The Majordomo Protocol lets us handle both clients and workers on a single socket. This is nicer for those deploying and managing the broker: it just sits on one 0MQ endpoint rather than the two that most devices need.
* The broker implements all of MDP/0.1 properly (as far as I know), including disconnection if the broker sends invalid commands, heartbeating, and the rest.
* It can be extended to run multiple threads, each managing one socket and one set of clients and workers. This could be interesting for segmenting large architectures. The C code is already organized around a broker class to make this trivial.
* A primary-failover or live-live broker reliability model is easy, since the broker essentially has no state except service presence. It's up to clients and workers to choose another broker if their first choice isn't up and running.
* The examples use 5-second heartbeats, mainly to reduce the amount of output when you enable tracing. Realistic values would be lower for most LAN applications. However, any retry has to be slow enough to allow for a service to restart, say 10 seconds at least.
+++ Asynchronous Majordomo Pattern
The way we implemented Majordomo, above, is simple and stupid. The client is just the original Simple Pirate, wrapped up in a sexy API. When I fire up a client, broker, and worker on a test box, it can process 100,000 requests in about 14 seconds. That is partly due to the code, which cheerfully copies message frames around as if CPU cycles were free. But the real problem is that we're 'round-tripping'. 0MQ disables [http://en.wikipedia.org/wiki/Nagle's_algorithm Nagle's algorithm], but round-tripping is still slow.
Theory is great in theory, but in practice, practice is better. Let's measure the cost of round-tripping with a simple test program. This sends a bunch of messages, first waiting for a reply to each message, and second as a batch, reading all the replies back as a batch. Both approaches do the same work, but they give very different results. We mock up a client, broker, and worker:
[[code type="example" title="Round-trip demonstrator" name="tripping"]]
[[/code]]
On my development box, this program says:
[[code]]
Setting up test...
Synchronous round-trip test...
9057 calls/second
Asynchronous round-trip test...
173010 calls/second
[[/code]]
Note that the client thread does a small pause before starting. This is to get around one of the 'features' of the ROUTER (XREP) socket: if you send a message with the address of a peer that's not yet connected, the message gets discarded. In this example we don't use the LRU mechanism, so without the sleep, if the worker thread is too slow to connect, it'll lose messages, making a mess of our test.
As we see, round-tripping in the simplest case is 20 times slower than the "shove it down the pipe as fast as it'll go" asynchronous approach. Let's see if we can apply this to Majordomo.
First, let's modify the client API to have separate send and recv methods:
[[code]]
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);
[[/code]]
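The point of splitting send and recv is that callers can now pipeline, sending a whole batch of requests before reading any replies. As a sketch (the 100,000 count and "echo" service mirror the test programs; the zmsg construction calls are again illustrative):

[[code]]
//  Sketch only: send all requests first, then collect the replies
int count;
for (count = 0; count < 100000; count++) {
    zmsg_t *request = zmsg_new ();          //  Build request (zmsg calls illustrative)
    zmsg_body_set (request, "Hello world");
    mdcli_send (session, "echo", &request);
}
for (count = 0; count < 100000; count++) {
    zmsg_t *reply = mdcli_recv (session);
    if (reply == NULL)
        break;                              //  Interrupted, or timed out waiting
    zmsg_destroy (&reply);
}
[[/code]]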
It's literally a few minutes' work to refactor the synchronous client API to become asynchronous:
[[code type="example" title="Majordomo asynchronous client API" name="mdcliapi2"]]
[[/code]]
And here's the corresponding client test program:
[[code type="example" title="Majordomo client application" name="mdclient2"]]
[[/code]]
The broker and worker are unchanged, since we've not modified the protocol at all. We see an immediate improvement in performance. Here's the synchronous client chugging through 100K request-reply cycles:
[[code]]
$ time mdclient
100000 requests/replies processed
real 0m14.088s
user 0m1.310s
sys 0m2.670s
[[/code]]
And here's the asynchronous client, with a single worker:
[[code]]
$ time mdclient2
100000 replies received
real 0m8.730s
user 0m0.920s
sys 0m1.550s
[[/code]]
Twice as fast. Not bad, but let's fire up 10 workers, and see how it handles:
[[code]]
$ time mdclient2
100000 replies received
real 0m3.863s
user 0m0.730s
sys 0m0.470s
[[/code]]
It isn't fully asynchronous since workers get their messages on a strict LRU basis. But it will scale better with more workers. On my fast test box, after eight or so workers it doesn't get any faster. Four cores only stretches so far. But we got a 4x improvement in throughput with just a few minutes' work. The broker is still unoptimized. It spends most of its time copying message frames around, instead of doing zero copy, which it could. But we're getting 25K reliable request/reply calls a second, with pretty low effort.
However the asynchronous Majordomo pattern isn't all roses. It has a fundamental weakness, namely that it cannot survive a broker crash without more work. If you look at the mdcliapi2 code you'll see it does not attempt to reconnect after a failure. A proper reconnect would require:
* That every request is numbered, and every reply has a matching number, which would ideally require a change to the protocol to enforce.
* That the client API tracks and holds onto all outstanding requests, i.e. for which no reply had yet been received.
* That in case of failover, the client API //resends// all outstanding requests to the broker.
It's not a deal breaker but it does show that performance often means complexity. Is this worth doing for Majordomo? It depends on your use case. For a name lookup service you call once per session, no. For a web front-end serving thousands of clients, probably yes.
+++ Service Discovery
So, we have a nice service-oriented broker, but we have no way of knowing whether a particular service is available or not. We know if a request failed, but we don't know why. It is useful to be able to ask the broker, "is the echo service running?" The most obvious way would be to modify our MDP/Client protocol to add commands to ask the broker, "is service X running?" But MDP/Client has the great charm of being simple. Adding service discovery to it would make it as complex as the MDP/Worker protocol.
Another option is to do what email does, and ask that undeliverable requests be returned. This can work well in an asynchronous world, but it also adds complexity. We need ways to distinguish returned requests from replies, and to handle them properly.
Let's try to use what we've already built, building on top of MDP instead of modifying it. Service discovery is, itself, a service. It might indeed be one of several management services, such as "disable service X", "provide statistics", and so on. What we want is a general, extensible solution that doesn't affect the protocol nor existing applications.
So here's a small RFC - MMI, or the Majordomo Management Interface - that layers this on top of MDP: http://rfc.zeromq.org/spec:8. We already implemented it in the broker, though unless you read the whole thing you probably missed that. Here's how we use the service discovery in an application:
[[code type="example" title="Service discovery over Majordomo" name="mmiecho"]]
[[/code]]
The broker checks the service name, and handles any service starting with "mmi." itself, rather than passing the request on to a worker. Try this with and without a worker running, and you should see the little program report '200' or '404' accordingly. The implementation of MMI in our example broker is pretty weak. For example if a worker disappears, services remain "present". In practice a broker should remove services that have no workers after some configurable timeout.
+++ Idempotent Services
Idempotency is not something to take a pill for. What it means is that it's safe to repeat an operation. Checking the clock is idempotent. Lending one's credit card to one's wife is not. While many client-to-server use cases are idempotent, some are not. Examples of idempotent use cases include:
* Stateless task distribution, i.e. a pipeline where the servers are stateless workers that compute a reply based purely on the state provided by a request. In such a case it's safe (though inefficient) to execute the same request many times.
* A name service that translates logical addresses into endpoints to bind or connect to. In such a case it's safe to make the same lookup request many times.
And here are examples of non-idempotent use cases:
* A logging service. One does not want the same log information recorded more than once.
* Any service that has impact on downstream nodes, e.g. sends on information to other nodes. If that service gets the same request more than once, downstream nodes will get duplicate information.
* Any service that modifies shared data in some non-idempotent way. E.g. a service that debits a bank account is definitely not idempotent.
When our server applications are not idempotent, we have to think more carefully about when exactly they might crash. If an application dies while it's idle, or while it's processing a request, that's usually fine. We can use database transactions to make sure a debit and a credit are always done together, if at all. If the server dies while sending its reply, that's a problem, because as far as it's concerned, it's done its work.
If the network dies just as the reply is making its way back to the client, the same problem arises. The client will think the server died, will resend the request, and the server will do the same work twice. Which is not what we want.
We use the fairly standard solution of detecting and rejecting duplicate requests (there's a small sketch of this after the list). This means:
* The client must stamp every request with a unique client identifier and a unique message number.
* The server, before sending back a reply, stores it using the client id + message number as a key.
* The server, when getting a request from a given client, first checks if it has a reply for that client id + message number. If so, it does not process the request but just resends the reply.
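Here is a minimal server-side sketch of that check. It assumes the replies cache is a zhash keyed on "client-id:msg-number"; the key format and the send_reply() and process_request() helpers are hypothetical, and the client_id, msg_number, and request variables come from the surrounding request handling:

[[code]]
//  Sketch only: reject duplicate requests by caching replies
char key [256];
snprintf (key, sizeof (key), "%s:%s", client_id, msg_number);
char *cached = (char *) zhash_lookup (replies, key);
if (cached) {
    //  Already processed: resend the stored reply, don't redo the work
    send_reply (client_id, cached);             //  Hypothetical helper
}
else {
    char *reply = process_request (request);    //  Hypothetical application work
    zhash_insert (replies, key, reply);         //  Store the reply before sending it
    send_reply (client_id, reply);
}
[[/code]]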
+++ Disconnected Reliability (Titanic Pattern)
Once you realize that Majordomo is a 'reliable' message broker, you might be tempted to add some spinning rust[[footnote]]I.e. ferrous-based hard disk platters.[[/footnote]]. After all, this works for all the enterprise messaging systems. It's such a tempting idea that it's a little sad to have to be negative. But that's one of my specialties. So, some reasons you don't want rust-based brokers sitting in the center of your architecture are:
* As you've seen, the Lazy Pirate client performs surprisingly well. It works across a whole range of architectures, from direct client-to-server to distributed queue devices. It does tend to assume that workers are stateless and idempotent. But we can work around that limitation without resorting to rust.
* Rust brings a whole set of problems, from slow performance to additional pieces to have to manage, repair, and create 6am panics as they inevitably break at the start of daily operations. The beauty of the Pirate patterns in general is their simplicity. They won't crash. And if you're still worried about the hardware, you can move to a peer-to-peer pattern that has no broker at all. I'll explain later in this chapter.
Having said this, however, there is one sane use case for rust-based reliability, which is an asynchronous disconnected network. It solves a major problem with Pirate, namely that a client has to wait for an answer in real time. If clients and workers are only sporadically connected (think of email as an analogy), we can't use a stateless network between clients and workers. We have to put state in the middle.
So, here's the Titanic pattern, in which we write messages to disk to ensure they never get lost, no matter how sporadically clients and workers are connected. As we did for service discovery, we're going to layer Titanic on top of Majordomo rather than extend MDP. It's wonderfully lazy because it means we can implement our fire-and-forget reliability in a specialized worker, rather than in the broker. This is excellent for several reasons:
* It's much, //much// easier.
* It lets us mix brokers written in one language with workers written in another.
* It lets us evolve the fire-and-forget technology independently.
The only downside is that there's an extra network hop between broker and hard disk. This is easily worth it.
There are many ways to make a persistent request-reply architecture. We'll aim for simple and painless. The simplest design I could come up with, after playing with this for a few hours, is Titanic as a "proxy service". That is, it doesn't affect workers at all. If a client wants a reply immediately, it talks directly to a service and hopes the service is available. If a client is happy to wait a while, it talks to Titanic instead and asks, "hey, buddy, would you take care of this for me while I go buy my groceries?"
[[code type="textdiagram"]]
+-----------+ +-----------+ +-----------+
| | | | | |
| Client | | Client | | Client |
| | | | | |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
"Titanic, | "Titanic,
give me coffee" | give me tea"
v Disk
/-----------\ +---------+ +-------+
| | | | | |
| Broker |<--->| Titanic |<--->| {s} |
| | | | | |
\-----------/ +---------+ +-------+
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| "Water" | | "Tea" | | "Coffee" |
+-----------+ +-----------+ +-----------+
| | | | | |
| Worker | | Worker | | Worker |
| | | | | |
+-----------+ +-----------+ +-----------+
Figure # - Titanic Pattern
[[/code]]
Titanic is thus both a worker, and a client. The dialog between client and Titanic goes along these lines:
* Client: please accept this request for me. Titanic: OK, done.
* Client: do you have a reply for me? Titanic: Yes, here it is. Or, no, not yet.
* Client: ok, you can wipe that request now, it's all happy. Titanic: OK, done.
Whereas the dialog between Titanic and broker and worker goes like this:
* Titanic: hey, broker, is there an echo service? Broker: uhm, yeah, seems like.
* Titanic: hey, echo, please handle this for me. Echo: sure, here you are.
* Titanic: sweeeeet!
You can work through this, and the possible failure scenarios. If a worker crashes while processing a request, Titanic retries, indefinitely. If a reply gets lost somewhere, Titanic will retry. If the request gets processed but the client doesn't get the reply, it will ask again. If Titanic crashes while processing a request, or a reply, the client will try again. As long as requests are fully committed to safe storage, work can't get lost.
The handshaking is pedantic, but can be pipelined, i.e. clients can use the asynchronous Majordomo pattern to do a lot of work and then get the responses later.
We need some way for a client to request //its// replies. We'll have many clients asking for the same services, and clients disappear and reappear with different identities. So here is a simple, reasonably secure solution:
* Every request generates a universally unique ID (UUID), which Titanic returns to the client when it's queued the request.
* When a client asks for a reply, it must specify the UUID for the original request.
This puts some onus on the client to store its request UUIDs safely, but it removes any need for authentication. What alternatives are there? We could use durable sockets, i.e. explicit client identities. That creates a management issue when we have many clients, and opens the door for the inevitable errors caused by two clients using the same identity.
Before we jump off and write yet another formal specification (fun, fun!) let's consider how the client talks to Titanic. One way is to use a single service and send it three different request types. Another way, which seems simpler, is to use three services:
* **titanic.request** - store a request message, return a UUID for the request.
* **titanic.reply** - fetch a reply, if available, for a given request UUID.
* **titanic.close** - confirm that a reply has been stored and processed.
We'll just make a multithreaded worker, which as we've seen from our multithreading experience with 0MQ, is trivial. However, before jumping into code let's sketch out what Titanic would look like in terms of 0MQ messages and frames: http://rfc.zeromq.org/spec:9. This is the "Titanic Service Protocol", or TSP.
Using TSP is clearly more work for client applications than accessing a service directly via MDP. Here's the shortest robust 'echo' client example:
[[code type="example" title="Titanic client example" name="ticlient"]]
[[/code]]
Of course this can and in practice would be wrapped up in some kind of framework. Real application developers should never see messaging up close, it's a tool for more technically-minded experts to build frameworks and APIs. If we had infinite time to explore this, I'd make a TSP API example, and bring the client application back down to a few lines of code. But it's the same principle as we saw for MDP, no need to be repetitive.
Here's the Titanic implementation. This server handles the three services using three threads, as proposed. It does full persistence to disk using the most brute-force approach possible: one file per message. It's so simple it's scary, the only complex part is that it keeps a separate 'queue' of all requests to avoid reading the directory over and over:
[[code type="example" title="Titanic client example" name="titanic"]]
[[/code]]
Some notes about this code:
* We use MMI to only send requests to services that appear to be running. This works as well as the MMI implementation in the broker.
* We use an inproc connection to send new request data from the **titanic.request** service through to the main dispatcher. This saves the dispatcher from having to scan the disk directory, load all request files, and sort them by date/time.
The important thing about this example is not performance (which is surely terrible, I've not tested it), but how well it implements the reliability contract. To try it, start the mdbroker and titanic programs. Then start the ticlient, and then start the mdworker echo service. You can run all four of these using the '-v' option to do verbose tracing of activity. You can stop and restart any piece //except// the client and nothing will get lost.
If you want to use Titanic in real cases, you'll rapidly be asking "how do we make this faster?" Here's what I'd do, starting with the example implementation:
* Use a single disk file for all data, rather than multiple files. Operating systems are usually better at handling a few large files than many smaller ones.
* Organize that disk file as a circular buffer so that new requests can be written contiguously (with very occasional wraparound). One thread, writing full speed to a disk file can work rapidly.
* Keep the index in memory and rebuild the index at startup time, from the disk buffer. This saves the extra disk head flutter needed to keep the index fully safe on disk. You would want an fsync after every message, or every N milliseconds if you were prepared to lose the last M messages in case of a system failure.
* Use a solid-state drive rather than spinning iron oxide platters.
* Preallocate the entire file, or allocate in large chunks allowing the circular buffer to grow and shrink as needed. This avoids fragmentation and ensures most reads and writes are contiguous.
And so on. What I'd not recommend is storing messages in a database, not even a 'fast' key/value store, unless you really like a specific database and don't have performance worries. You will pay a steep price for the abstraction, 10 to 1000x over a raw disk file.
If you want to make Titanic //even more reliable//, you can do this by duplicating requests to a second server, which you'd place in a second location just far enough to survive nuclear attack on your primary location, yet not so far that you get too much latency.
If you want to make Titanic //much faster and less reliable//, you can store requests and replies purely in memory. This will give you the functionality of a disconnected network, but it won't survive a crash of the Titanic server itself.
+++ Brokerless Reliability (Freelance Pattern)
It might seem ironic to focus so much on broker-based reliability, when we often explain 0MQ as "brokerless messaging". However in messaging, as in real life, the middleman is both a burden and a benefit. In practice, most messaging architectures benefit from a mix of distributed and brokered messaging. You get the best results when you can decide freely what tradeoffs you want to make. This is why I can drive 10km to a wholesaler to buy five cases of wine for a party, but I can also walk 10 minutes to a corner store to buy one bottle for a dinner. Our highly context-sensitive relative valuations of time, energy, and cost are essential to the real world economy. And they are essential to an optimal message based architecture.
Which is why 0MQ does not //impose// a broker-centric architecture, though it gives you the tools to build brokers, aka "devices", and we've built a dozen or so different ones so far, just for practice.
So we'll end this chapter by deconstructing the broker-based reliability we've built so far, and turning it back into a distributed peer-to-peer architecture I call the Freelance pattern. Our use case will be a name resolution service. This is a common problem with 0MQ architectures: how do we know the endpoint to connect to? Hard-coding TCP/IP addresses in code is insanely fragile. Using configuration files creates an administration nightmare. Imagine if you had to hand-configure your web browser, on every PC or mobile phone you used, to realize that "google.com" was "74.125.230.82".
A 0MQ name service (and we'll make a simple implementation) has to:
* Resolve a logical name into at least a bind endpoint, and a connect endpoint. A realistic name service would provide multiple bind endpoints, and possibly multiple connect endpoints too.
* Allow us to manage multiple parallel environments, e.g. "test" vs. "production" without modifying code.
* Be reliable, because if it is unavailable, applications won't be able to connect to the network.
Putting a name service behind a service-oriented Majordomo broker is clever from some points of view. However it's simpler and much less surprising to just expose the name service as a server that clients can connect to directly. If we do this right, the name service becomes the //only// global network endpoint we need to hard-code in our code or config files.
The types of failure we aim to handle are service crashes and restarts, service busy looping, service overload, and network issues. To get reliability, we'll create a pool of name servers so if one crashes or goes away, clients can connect to another, and so on. In practice, two would be enough. But for the example, we'll assume the pool can be any size:
[[code type="textdiagram"]]
+-----------+ +-----------+ +-----------+
| | | | | |
| Client | | Client | | Client |
| | | | | |
\-----------/ \-----------/ \-----------/
connect connect connect
| | |
| | |
+---------------+---------------+
| | |
| | |
bind bind bind
/-----------\ /-----------\ /-----------\
| | | | | |
| Service | | Service | | Service |
| | | | | |
+-----------+ +-----------+ +-----------+
Figure # - The Freelance Pattern
[[/code]]
In this architecture a large set of clients connect to a small set of services directly. The services bind to their respective addresses. It's fundamentally different from a broker-based approach like Majordomo, where workers connect to the broker. For clients, there are a couple of options:
* Clients could use REQ sockets and the Lazy Pirate pattern. Easy, but would need some additional intelligence to not stupidly reconnect to dead services over and over.
* Clients could use DEALER (XREQ) sockets and blast out requests (which will be load balanced to all connected services) until they get a reply. Brutal, but not elegant.
* Clients could use ROUTER (XREP) sockets so they can address specific services. But how does the client know the identity of the service sockets? Either the service has to ping the client first (complex), or each service has to use a hard-coded, fixed identity known to the client (nasty).
++++ Model 1 - Simple Retry and Failover
So our menu appears to offer: simple, brutal, complex, or nasty. Let's start with 'simple' and then work out the kinks. We take Lazy Pirate and rewrite it to work with multiple service endpoints. Start the server first, specifying a bind endpoint as argument. Run one or several servers:
[[code type="example" title="Freelance server, Model 1" name="flserver1"]]
[[/code]]
Then start the client, specifying one or more connect endpoints as arguments:
[[code type="example" title="Freelance client, Model 1" name="flclient1"]]
[[/code]]
While the basic approach is Lazy Pirate, the client aims to just get one successful reply. It has two techniques, depending on whether you are running a single service, or multiple services:
* With a single service, the client will retry several times, exactly as for Lazy Pirate.
* With multiple services, the client will try each service at most once, until it's received a reply, or has tried all services.
This solves the main weakness of Lazy Pirate, namely that it could not do failover to backup / alternate servers.
However this design won't work well in a real application. If we're connecting many sockets, and our primary name server is down, we're going to do this painful timeout each time.
++++ Model 2 - Brutal Shotgun Massacre
Let's switch our client to using a DEALER socket. Our goal here is to make sure we get a reply back within the shortest possible time, no matter whether the primary server is down or not. Our client takes this approach:
* We set things up, connecting to all services.
* When we have a request, we blast it out as many times as we have services.
* We wait for the first reply, and take that.
* We ignore any other replies.
What will happen in practice is that when all services are running, 0MQ will distribute the requests so each service gets one request, and sends one reply. When any service is offline, and disconnected, 0MQ will distribute the requests to the remaining services. So a service may in some cases get the same request more than once.
What's more annoying for the client is that we'll get multiple replies back, but there's no guarantee we'll get a precise number of replies. Requests and replies can get lost (e.g. if the service crashes while processing a request).
So, we have to number requests, and ignore any replies that don't match the request number. Our Model 1 server will work, since it's an echo server, but coincidence is not a great basis for understanding. So we'll make a Model 2 server that chews up the message and returns a correctly numbered reply with the content "OK". We'll use messages consisting of two parts, a sequence number and a body.
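In sketch form, the client side of that numbering looks something like this. The zmsg helper calls are meant to match the zmsg class used earlier in the guide but treat them as illustrative; the sequence counter, the servers count, and the overall timeout handling come from the surrounding client, and the empty REP-envelope frame (see the notes below) is omitted for brevity:

[[code]]
//  Sketch only: blast one copy of the request to each connected server
//  (the DEALER socket load-balances, so N sends reach N servers), then
//  keep only the reply whose sequence number matches ours
char sequence_text [10];
sprintf (sequence_text, "%d", ++sequence);
zmsg_push (request, sequence_text);         //  Frame 1: sequence number
int server;
for (server = 0; server < servers; server++) {
    zmsg_t *msg = zmsg_dup (request);       //  One copy per connected server
    zmsg_send (&msg, socket);
}
//  Poll for a matching reply until an overall timeout expires (not shown)
zmsg_t *reply = NULL;
while (!reply /* && not timed out */) {
    zmsg_t *msg = zmsg_recv (socket);
    char *reply_sequence = zmsg_pop (msg);
    if (atoi (reply_sequence) == sequence)
        reply = msg;                        //  This is the reply we wanted
    else
        zmsg_destroy (&msg);                //  Stale or duplicate; ignore it
    free (reply_sequence);
}
[[/code]]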
Start the server once or more:
[[code type="example" title="Freelance server, Model 2" name="flserver2"]]
[[/code]]
Then start the client, specifying the connect endpoints as arguments:
[[code type="example" title="Freelance client, Model 2" name="flclient2"]]
[[/code]]
Some notes on this code:
* The client is structured as a nice little class-based API that hides the dirty work of creating 0MQ contexts and sockets, and talking to the server. If a shotgun blast to the midriff can be called "talking".
* The client will abandon the chase if it can't find //any// responsive service within a few seconds.
* The client has to create a valid REP envelope, i.e. add an empty message part to the front of the message.
The client does 10,000 name resolution requests (fake ones, since our server does essentially nothing), and measures the average cost. On my test box, talking to one server, it's about 60 usec. Talking to three servers, it's about 80 usec.
So pros and cons of our shotgun approach:
* Pro: it is simple, easy to make and easy to understand.
* Pro: it does the job of failover, and works rapidly, so long as there is at least one service running.
* Con: it creates redundant network traffic.
* Con: we can't prioritize our servers, i.e. Primary, then Secondary.
* Con: the server can do at most one request at a time, period.
++++ Model 3 - Complex and Nasty
The shotgun approach seems too good to be true. Let's be scientific and try the complex/nasty option, even if it's only to finally realize that we preferred brutal.
We can solve the main problems of the client by switching to a ROUTER (XREP) socket. That lets us send requests to specific services, avoid services we know are dead, and thus act as intelligently as we'd expect. We can also solve the main problem of the server (single threadedness) by switching to a ROUTER socket.
But doing ROUTER-to-ROUTER between two transient sockets is not possible. Both sides generate an identity (at their own end of the conversation) only when they receive a first message, and thus neither can talk to the other until it has first received a message. The only way out of this conundrum is to cheat, and use hard-coded identities in one direction. The proper way to cheat, in a client server case, is that the client 'knows' the identity of the server. Vice-versa would be insane, on top of complex and nasty. Great attributes for a genocidal dictator, terrible ones for software.
For our example we'll use the connection endpoint as identity. This is at least a unique string both sides can agree on without more prior knowledge than they already have for the shotgun model. It's sneaky.
Remember how 0MQ identities work. The server sets an identity before it binds its socket. When a client connects, they exchange identities, before either side sends a message. The client, having not set an identity, sends a null identity to the server. The server generates a random UUID for the client. The server sends its identity (which we've agreed is going to be an endpoint string) to the client.
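Here is a minimal sketch of that identity trick. The endpoint string is an example; the only requirement is that both sides use exactly the same string, and that the server sets it as its identity //before// binding. How the rest of the request is framed depends on what the server expects:

[[code]]
//  Sketch only: use the endpoint string as the server's ROUTER identity
char *endpoint = "tcp://127.0.0.1:5555";    //  Example; must match on both sides

//  Server side: set identity, then bind
void *server = zmq_socket (context, ZMQ_ROUTER);
zmq_setsockopt (server, ZMQ_IDENTITY, endpoint, strlen (endpoint));
zmq_bind (server, endpoint);

//  Client side: connect, then (once the handshake has completed) route to
//  the server by using the endpoint string as the address frame
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_connect (client, endpoint);
s_sendmore (client, endpoint);              //  Frame 1: the server's identity
s_send     (client, "request body");        //  Remaining frames: the request
[[/code]]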
This means our client can route a message to the server (i.e. send on its ROUTER socket, specifying the server endpoint as identity) as soon as the connection is established. That's not //immediately// after doing a zmq_connect, but some random time thereafter. Herein lies one problem: we don't know when the server will actually be available and complete its connection handshake. It could be after 1 millisecond. It could be after one hour, if the sysadmin was out for lunch and forgot to start the server process.
We want to send requests only to servers that we know are probably on-line. But in the Freelance pattern, unlike broker-based patterns, servers are silent until they get a request. Thus we can't talk to a server until it's told us it's on-line, which it can't do until we've asked it.
There are probably several solutions to this paradox. The one I'll demonstrate uses the shotgun approach to build a list of "known active servers" that may be used for real work. Basically, we fire blanks at random and when something reacts, we mark it as "alive".
Some design notes for this model:
* The control endpoints are not unique, so this won't allow multiple instances in a single application.
* The client can initiate the conversation with the server: the server binds and sets its identity, then the client connects and sends a message to that identity.
* The client maintains a list of known servers and a list of connected servers, and connects to all of them.
* The server replies to requests; there is no heartbeating.
* Each server, as the client sees it, is either active, or dead (in which case the client retries it again after a while).
Model 3 of the server is just slightly different:
[[code type="example" title="Freelance server, Model 3" name="flserver3"]]
[[/code]]
Here's the client:
[[code type="example" title="Freelance client, Model 3" name="flclient3"]]
[[/code]]
Some notes on this code:
.end