forked from booksbyus/zguide
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chapter5.txt
104 lines (60 loc) · 11.8 KB
/
chapter5.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
++ Chapter Five - Advanced Publish-Subscribe
- extreme high speed subscribers
- frontend TCP
- workers, callback on inproc
- segment your feeds
- one NIC per slice
- one socket per slice
- one thread per slice
- large box for subscribers
++ Reference Material
* The //Suicidal Snail// pattern: how to handle slow clients.
* The //Clone// pattern: reliable publish-subscribe using distributed key-value tables.
+++ Slow Subscriber Detection (Suicidal Snail Pattern)
A common problem you will hit when using the pubsub pattern in real life is the slow subscriber. In an ideal world, we stream data at full speed from publishers to subscribers. In reality, subscriber applications are often written in interpreted languages, or just do a lot of work, or are just badly written, to the extent that they can't keep up with publishers.
How do we handle a slow subscriber? The ideal fix is to make the subscriber faster, but that might take work and time. Some of the classic strategies for handling a slow subscriber are:
* **Queue messages on the publisher**. This is what Gmail does when I don't read my email for a couple of hours. But in high-volume messaging, pushing queues upstream has the thrilling but unprofitable result of making publishers run out of memory and crash. Especially if there are lots of subscribers and it's not possible to flush to disk for performance reasons.
* **Queue messages on the subscriber**. This is much better, and it's what 0MQ does by default if the network can keep up with things. If anyone's going to run out of memory and crash, it'll be the subscriber rather than the publisher, which is fair. This is perfect for "peaky" streams where a subscriber can't keep up for a while, but can catch up when the stream slows down. However it's no answer to a subscriber that's simply too slow in general.
* **Stop queuing new messages after a while**. This is what Gmail does when my mailbox overflows its 7.554GB, no 7.555GB of space. New messages just get rejected or dropped. This is a great strategy from the perspective of the publisher, and it's what 0MQ does when the publisher sets a high water mark or HWM. However it still doesn't help us fix the slow subscriber. Now we just get gaps in our message stream.
* **Punish slow subscribers with disconnect**. This is what Hotmail does when I don't login for two weeks, which is why I'm on my fifteenth Hotmail account. It's a nice brutal strategy that forces subscribers to sit up and pay attention, and would be ideal, but 0MQ doesn't do this, and there's no way to layer it on top since subscribers are invisible to publisher applications.
None of these classic strategies fit. So we need to get creative. Rather than disconnect the publisher, let's convince the subscriber to kill itself. This is the Suicidal Snail pattern. When a subscriber detects that it's running too slowly (where "too slowly" is presumably a configured option that really means "so slowly that if you ever get here, shout really loudly because I need to know, so I can fix this!"), it croaks and dies.
How can a subscriber detect this? One way would be to sequence messages (number them in order), and use a HWM at the publisher. Now, if the subscriber detects a gap (i.e. the numbering isn't consecutive), it knows something is wrong. We then tune the HWM to the "croak and die if you hit this" level.
There are two problems with this solution. One, if we have many publishers, how do we sequence messages? The solution is to give each publisher a unique ID and add that to the sequencing. Second, if subscribers use ZMQ_SUBSCRIBE filters, they will get gaps by definition. Our precious sequencing will be for nothing.
Some use cases won't use filters, and sequencing will work for them. But a more general solution is that the publisher timestamps each message. When a subscriber gets a message it checks the time, and if the difference is more than, say, one second, it does the "croak and die" thing. Possibly firing off a squawk to some operator console first.
The Suicide Snail pattern works especially when subscribers have their own clients and service-level agreements and need to guarantee certain maximum latencies. Aborting a subscriber may not seem like a constructive way to guarantee a maximum latency, but it's the assertion model. Abort today, and the problem will be fixed. Allow late data to flow downstream, and the problem may cause wider damage and take longer to appear on the radar.
So here is a minimal example of a Suicidal Snail:
[[code type="example" title="Suicidal Snail" name="suisnail"]]
[[/code]]
Notes about this example:
* The message here consists simply of the current system clock as a number of milliseconds. In a realistic application you'd have at least a message header with the timestamp, and a message body with data.
* The example has subscriber and publisher in a single process, as two threads. In reality they would be separate processes. Using threads is just convenient for the demonstration.
+++ Reliable Publish-Subscribe (Clone Pattern)
Pubsub is like a radio broadcast, you miss everything before you join, and then how much information you get depends on the quality of your reception. It's so easy to lose messages with this pattern that you might wonder why 0MQ bothers to implement it at all.[[footnote]]If you're German or Norwegian, that is what we call 'humor'. There are many cases where simplicity and speed are more important than pedantic delivery. In fact the radio broadcast covers perhaps the majority of information distribution in the real world. Think of Facebook and Twitter. No, I'm not still joking.[[/footnote]]
However, reliable pubsub is also a useful tool. Let's do as before and define what that 'reliability' means in terms of what can go wrong.
Happens all the time:
* Subscribers join late, so miss messages the server already sent.
* Subscriber connections take a non-zero time, and can lose messages during that time.
Happens exceptionally:
* Subscribers can crash, and restart, and lose whatever data they already received.
* Subscribers can fetch messages too slowly, so queues build up and then overflow.
* Networks can become overloaded and drop data (specifically, for PGM).
* Networks can become too slow, so publisher-side queues overflow.
A lot more can go wrong but these are the typical failures we see in a realistic system. The difficulty in defining 'reliability' now is that we have no idea, at the messaging level, what the application actually does with its data. So we need a generic model that we can implement once, and then use for a wide range of applications.
What we'll design is a simple *shared key-value cache* that stores a set of blobs indexed by unique keys. Don't confuse this with *distributed hash tables*, which solve the wider problem of connecting peers in a distributed network, or with *distributed key-value tables*, which act like non-SQL databases. All we will build is a system that reliably clones some in-memory state from a server to a set of clients. We want to:
* Let a client join the network at any time, and reliably get the current server state.
* Let any client update the key-value cache (inserting new key-value pairs, updating existing ones, or deleting them).
* Reliably propagates changes to all clients, and does this with minimum latency overhead.
* Handle very large numbers of clients, e.g. tens of thousands or more.
The key aspect of the Clone pattern is that clients talk back to servers, which is more than we do in a simple pub-sub dialog. This is why I use the terms 'server' and 'client' instead of 'publisher' and 'subscriber'. We'll use pubsub as part of the Clone pattern but it is more than that.
When a client joins the network, it subscribes a SUB socket, as we'd expect, to the data stream coming from the server (the publisher). This goes across some pub-sub topology (a multicast bus, perhaps, or a tree of forwarder devices, or direct client-to-server connections).
At some undetermined point, it will start getting messages from the server. Note that we can't predict what the client will receive as its first message. If a zmq_connect[3] call takes 10msec, and in that time the server has sent 100 messages, the client might get messages starting from the 100th message.
Let's define a message as a key-value pair. The semantics are simple: if the value is provided, it's an insert or update operation. If there is no value, it's a delete operation. The key provides the subscription filter, so clients can treat the cache as a tree, and select whatever branches of the tree they want to hold.
The client now connects to the server using a different socket (a REQ socket) and asks for a snapshot of the cache. It tells the server two things: which message it received (which means the server has to number messages), and which branch or branches of the cache it wants. To keep things simple we'll assume that any client has exactly one server that it talks to, and gets its cache from. The server *must* be running; we do not try to solve the question of what happens if the server crashes (that's left as an exercise for you to hurt your brain over).
The server builds a snapshot and sends that to the client's REQ socket. This can take some time, especially if the cache is large. The client continues to receive updates from the server on its SUB socket, which it queues but does not process. We'll assume these updates fit into memory. At some point it gets the snapshot on its REQ socket. It then applies the updates to that snapshot, which gives it a working cache.
You'll perhaps see one difficulty here. If the client asks for a snapshot based on message 100, how does the server provide this? After all, it may have sent out lots of updates in the meantime. We solve this by cheating gracefully. The server just sends its current snapshot, but tells the client what its latest message number is. Say that's 200. The client gets the snapshot, and in its queue, it has messages 100 to 300. It throws out 100 to 200, and starts applying 201 to 300 to the snapshot.
Once the client has happily gotten its cache, it disconnects from the server (destroys that REQ socket), which is not used for anything more.
How does Clone handle updates from clients? There are several options but the simplest seems to be that each client acts as a publisher back to the server, which subscribes. In a TCP network this will mean persistent connections between clients and servers. In a PGM network this will mean using a shared multicast bus that clients write to, and the server listens to.
So the client, at startup, opens a PUB socket and part of its initial request to the server includes the address of that socket, so the server can open a SUB socket and connect back to it.
Why don't we allow clients to publish updates directly to other clients? While this would reduce latency, it makes it impossible to sequence messages. Updates *must* pass through the server to make sense to other clients. There's a more subtle second reason. In many applications it's important that updates have a single order, across many clients. Forcing all updates through the server ensures that they have the same order when they finally get to clients.
With unique sequencing, clients can detect the nastier failures - network congestion and queue overflow. If a client discovers that its incoming message stream has a hole, it can take action. It seems sensible that the client contact the server and ask for the missing messages, but in practice that isn't useful. If there are holes, adding more stress to the network will make things worse. All the client can really do is warn its users "Unable to continue", and stop, and not restart until someone has manually checked the cause of the problem.
Clone is complex enough in practice that you don't want to implement it directly in your applications. Instead, it makes a good basis for an application server framework, which talks to applications via the key-value table.