
documentation updates:

* add upgrade notes to changelog
* add topic/channel info to design doc (and slides link)
* add document describing basic patterns from blog post
* update protocol doc with missing commands MPUB and IDENTIFY
mreiferson committed Dec 22, 2012
1 parent 1bed80b commit f6720646adf37d974b10ce8ae6a554d51d087631
@@ -3,7 +3,16 @@
## NSQ Binaries
* 0.2.16 - rc.1
- * #90/#108 performance optimizations / IDENTIFY protocol support in nsqd
+ **Upgrading from 0.2.15**: there are no backward incompatible changes in this release.
+ However, this release introduces the `IDENTIFY` command (which supersedes sending
+ metadata along with `SUB`) for clients of `nsqd`. The old functionality will be
+ removed in a future release.
+ * #90/#108 performance optimizations / IDENTIFY protocol support in nsqd. For
+ a single consumer of small messages (< 4k), this increases throughput ~400% and
+ reduces the number of allocations ~30%.
* #105 nsq_to_file --filename-format parameter
* #103 nsq_to_http handler logging
* #102 compatibility with Go tip
@@ -42,6 +51,22 @@
## NSQ Go Client Library
* 0.2.5 - rc.1
+ **Upgrading from 0.2.4**: There are no backward incompatible changes to applications
+ written against the public `nsq.Reader` API.
+ However, there *are* a few backward incompatible changes to the API for applications that
+ directly use other public methods or properties of a few NSQ data types:
+ `nsq.Message` IDs are now of type `nsq.MessageID` (a `[16]byte` array). The signatures of
+ `nsq.Finish()` and `nsq.Requeue()` reflect this change.
+ `nsq.SendCommand()` and `nsq.Frame()` were removed in favor of `nsq.SendFramedResponse()`.
+ `nsq.Subscribe()` no longer accepts `shortId` and `longId`. If you upgrade your consumers
+ before upgrading your `nsqd` binaries to `0.2.16-rc.1`, they will not be able to send the
+ optional custom identifiers.
* #90 performance optimizations
* #81 reader performance improvements / MPUB support
* 0.2.4 - 2012-10-15
@@ -1,5 +1,7 @@
## NSQ Design
+NOTE: for an accompanying visual illustration, see this [slide deck][slides].
### Simplifying Configuration and Administration
A single `nsqd` instance is designed to handle multiple streams of data at once. Streams are called
@@ -15,6 +17,10 @@ client. For example:
![nsqd clients](
+To summarize, messages are multicast from topic -> channel (every channel receives a copy of all
+messages for that topic) but evenly distributed from channel -> consumers (each consumer receives a
+portion of the messages for that channel).
**NSQ** also includes a helper application, `nsqlookupd`, which provides a directory service where
consumers can lookup the addresses of `nsqd` instances that provide the topics they are interested
in subscribing to. In terms of configuration, this decouples the consumers from the producers (they
@@ -189,3 +195,4 @@ interfaces, and iteratively build functionality.
Binary file not shown.
@@ -0,0 +1,197 @@
+This document describes some NSQ patterns that solve a variety of common problems.
+DISCLAIMER: there are *some* obvious technology suggestions but this document
+generally ignores the deeply personal details of choosing proper tools, getting
+software installed on production machines, managing what service is running
+where, service configuration, and managing running processes (daemontools,
+supervisord, init.d, etc.).
+### Metrics Collection
+Regardless of the type of web service you're building, in most cases you're
+going to want to collect some form of metrics in order to understand your
+infrastructure, your users, or your business.
+For a web service, most often these metrics are produced by events that happen
+via HTTP requests, like an API. The naive approach would be to structure
+this synchronously, writing to your metrics system directly in the API request
+![naive approach](
+ * What happens when your metrics system goes down?
+ * Do your API requests hang and/or fail?
+ * How will you handle the scaling challenge of increasing API request volume
+ or breadth of metrics collection?
+One way to resolve all of these issues is to somehow perform the work of
+writing into your metrics system asynchronously - that is, place the data in
+some sort of local queue and write into your downstream system via some other
+process (consuming that queue). This separation of concerns allows the system
+to be more robust and fault tolerant. At bitly, we use NSQ to achieve this.
+Brief tangent: NSQ has the concept of **topics** and **channels**. Basically,
+think of a **topic** as a unique stream of messages (like our stream of API
+events above). Think of a **channel** as a *copy* of that stream of messages
+for a given set of consumers. Topics and channels are both independent queues,
+too. These properties enable NSQ to support both multicast (a topic *copying*
+each message to N channels) and distributed (a channel *equally dividing* its
+messages among N consumers) message delivery.
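The multicast and distributed delivery just described can be sketched with plain in-memory queues. This is illustrative only, not NSQ's actual implementation:

```python
from collections import deque
from itertools import cycle

class Topic:
    """A topic copies every published message to each of its channels."""
    def __init__(self):
        self.channels = {}

    def get_channel(self, name):
        return self.channels.setdefault(name, deque())

    def publish(self, msg):
        for chan in self.channels.values():
            chan.append(msg)  # multicast: every channel receives a copy

def drain(channel, consumers):
    """A channel evenly divides its messages among its consumers."""
    rr = cycle(consumers)
    while channel:
        next(rr).append(channel.popleft())  # each message goes to one consumer

topic = Topic()
metrics = topic.get_channel("metrics")
archive = topic.get_channel("archive")
for i in range(4):
    topic.publish(i)

worker_a, worker_b = [], []
drain(metrics, [worker_a, worker_b])
# archive still holds all 4 messages; the two workers split the metrics copy
```

Note that draining the `metrics` channel has no effect on `archive` — each channel is an independent queue.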
+For a more thorough treatment of these concepts, read through the [design
+doc][design_doc] and [slides from our Golang NYC talk][golang_slides],
+specifically [slides 19 through 33][channel_slides] describe topics and
+channels in detail.
+![architecture with NSQ](
+Integrating NSQ is straightforward; let's take the simple case:
+ 1. Run an instance of `nsqd` on the same host that runs your API application.
+ 2. Update your API application to write to the local `nsqd` instance to
+ queue events, instead of directly into the metrics system. To be able
+ to easily introspect and manipulate the stream, we generally format this
+ type of data in line-oriented JSON. Writing into `nsqd` can be as simple
+ as performing an HTTP POST request to the `/put` endpoint.
+ 3. Create a consumer in your preferred language using one of our
+ [client libraries][client_libs]. This "worker" will subscribe to the
+ stream of data and process the events, writing into your metrics system.
+ It can also run locally on the host running both your API application
+ and `nsqd`.
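Step 2 can be as small as the following sketch. The host, port, and event fields here are illustrative; `/put` with a `topic` query parameter is the endpoint mentioned above:

```python
import json
from urllib.request import urlopen  # only needed for the (commented) real call

def encode_event(event):
    # line-oriented JSON: one event per line makes the stream easy to
    # introspect and manipulate with standard tools
    return (json.dumps(event, sort_keys=True) + "\n").encode("utf-8")

def put_url(nsqd_http_address, topic):
    # nsqd's HTTP interface listens on port 4151 by default
    return "http://%s/put?topic=%s" % (nsqd_http_address, topic)

# with a local nsqd running, queueing an event is a single POST (not run here):
# urlopen(put_url("127.0.0.1:4151", "api_requests"),
#         data=encode_event({"endpoint": "/signup", "status": 200}))
```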
+Here's an example worker written with our official Python client library:
+<a class="gist" href="">Python Example Code</a>
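A minimal sketch of such a worker's handler logic follows. The metrics write is a stub, and the pynsq wiring in the comments is an assumption — check the client library's own documentation:

```python
import json

METRICS = []  # stand-in for a write into your real metrics system

def record_metric(event):
    METRICS.append(event)

def process_message(body):
    """Handler for one NSQ message. Returning False signals failure so the
    client library re-queues (REQ) the message; True finishes (FIN) it."""
    try:
        event = json.loads(body.decode("utf-8"))
    except ValueError:
        return False  # malformed message: let it be re-queued
    record_metric(event)
    return True

# wiring this into the official Python client looks roughly like the
# following (constructor arguments are an assumption, check pynsq's docs):
# import nsq
# nsq.Reader(message_handler=..., topic="api_requests", channel="metrics",
#            nsqd_tcp_addresses=["127.0.0.1:4150"])
# nsq.run()
```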
+In addition to decoupling, by using one of our official client libraries,
+consumers will degrade gracefully when message processing fails. Our libraries
+have two key features that help with this:
+ 1. **Retries** - when your message handler indicates failure, that information is
+ sent to `nsqd` in the form of a `REQ` (re-queue) command. Also, `nsqd` will
+ *automatically* time out (and re-queue) a message if it hasn't been responded
+ to in a configurable time window. These two properties are critical to
+ providing a delivery guarantee.
+ 2. **Exponential Backoff** - when message processing fails the reader library will
+ delay the receipt of additional messages for a duration that scales
+ exponentially based on the number of consecutive failures. The opposite
+ happens when a reader in a backoff state begins to process messages
+ successfully, until the delay returns to 0.
+In concert, these two features allow the system to respond gracefully to
+downstream failure, automagically.
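The reader libraries' exact backoff formula isn't specified here; one sketch of the exponential scaling described above, with an assumed doubling curve and cap:

```python
def backoff_duration(consecutive_failures, unit=1.0, max_duration=120.0):
    # illustrative curve only: the delay doubles with each consecutive
    # failure and is capped; successful processing walks it back toward 0
    if consecutive_failures <= 0:
        return 0.0
    return min(max_duration, unit * 2 ** consecutive_failures)
```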
+### Persistence
+Ok, great, now you have the ability to withstand a situation where your
+metrics system is unavailable with no data loss and no degraded API service to
+other endpoints. You also have the ability to scale the processing of this
+stream horizontally by adding more worker instances to consume from the same
+channel.
+But, it's hard to anticipate ahead of time all the types of metrics you
+might want to collect for a given API event.
+Wouldn't it be nice to have an archived log of this data stream for any future
+operation to leverage? Logs tend to be relatively easy to redundantly backup,
+making it a "plan z" of sorts in the event of catastrophic downstream data
+loss. But, would you want this same consumer to also have the responsibility
+of archiving the message data? Probably not, because of that whole "separation
+of concerns" thing.
+Archiving an NSQ topic is such a common pattern that we built a
+utility, [nsq_to_file][nsq_to_file], packaged with NSQ, that does
+exactly what you need.
+Remember, in NSQ, each channel of a topic is independent and receives a
+*copy* of all the messages. You can use this to your advantage when archiving
+the stream by doing so over a new channel, `archive`. Practically, this means
+that if your metrics system is having issues and the `metrics` channel gets
+backed up, it won't affect the separate `archive` channel you'll be using to
+persist messages to disk.
+So, add an instance of `nsq_to_file` to the same host and use a command line
+like the following:
+/usr/local/bin/nsq_to_file --nsqd-tcp-address= --topic=api_requests --channel=archive
+![archiving the stream](
+### Distributed Systems
+You'll notice that the system has not yet evolved beyond a single production
+host, which is a glaring single point of failure.
+Unfortunately, building a distributed system [is hard][dist_link].
+Fortunately, NSQ can help. The following changes demonstrate how
+NSQ alleviates some of the pain points of building distributed systems
+as well as how its design helps achieve high availability and fault tolerance.
+Let's assume for a second that this event stream is *really* important. You
+want to be able to tolerate host failures and continue to ensure that messages
+are *at least* archived, so you add another host.
+![adding a second host](
+Assuming you have some sort of load balancer in front of these two hosts you
+can now tolerate any single host failure.
+Now, let's say the process of persisting, compressing, and transferring these
+logs is affecting performance. How about splitting that responsibility off to
+a tier of hosts that have higher IO capacity?
+![separate archive hosts](
+This topology and configuration can easily scale to double-digit hosts, but
+you're still managing configuration of these services manually, which does not
+scale. Specifically, in each consumer, this setup is hard-coding the address
+of where `nsqd` instances live, which is a pain. What you really want is for
+the configuration to evolve and be accessed at runtime based on the state of
+the NSQ cluster. This is *exactly* what we built `nsqlookupd` to do.
+`nsqlookupd` is a daemon that records and disseminates the state of an NSQ
+cluster at runtime. `nsqd` instances maintain persistent TCP connections to
+`nsqlookupd` and push state changes across the wire. Specifically, an `nsqd`
+registers itself as a producer for a given topic as well as all channels it
+knows about. This allows consumers to query an `nsqlookupd` to determine who
+the producers are for a topic of interest, rather than hard-coding that
+configuration. Over time, they will *learn* about the existence of new
+producers and be able to route around failures.
+The only changes you need to make are to point your existing `nsqd` and
+consumer instances at `nsqlookupd` (everyone explicitly knows where
+`nsqlookupd` instances are but consumers don't explicitly know where
+producers are, and vice versa). The topology now looks like this:
+![adding nsqlookupd](
+At first glance this may look *more* complicated. It's deceptive though, as
+the effect this has on a growing infrastructure is hard to communicate
+visually. You've effectively decoupled producers from consumers because
+`nsqlookupd` is now acting as a directory service in between. Adding
+additional downstream services that depend on a given stream is trivial, just
+specify the topic you're interested in (producers will be discovered by
+querying `nsqlookupd`).
+But what about availability and consistency of the lookup data? We generally
+recommend basing the number of `nsqlookupd` instances you run on your desired
+availability requirements. `nsqlookupd` is not resource intensive and can
+easily be co-located with other services. Also, `nsqlookupd` instances do not
+need to coordinate or otherwise be consistent with each other. Consumers will
+generally only require one `nsqlookupd` to be available with the information
+they need (and they will union the responses from all of the `nsqlookupd`
+instances they know about). Operationally, this makes it easy to migrate to a
+new set of `nsqlookupd`.
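The union a consumer performs over `nsqlookupd` responses can be sketched as follows. The field names in the sample response are assumptions loosely modeled on the lookup query, not the exact wire format:

```python
def union_producers(lookupd_responses):
    # consumers query every nsqlookupd they know about and union the
    # results; any single reachable instance is sufficient
    producers = set()
    for resp in lookupd_responses:
        for p in resp.get("producers", []):
            producers.add((p["address"], p["tcp_port"]))
    return sorted(producers)
```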
@@ -18,7 +18,9 @@ For `nsqd`, there is currently only one protocol:
### V2 Protocol
-After authenticating, in order to begin consuming messages, a client must `SUB` to a channel.
+After authenticating, the client can optionally send an `IDENTIFY` command to provide custom
+metadata (like, for example, more descriptive identifiers). In order to begin consuming messages, a
+client must `SUB` to a channel.
Upon subscribing the client is placed in a `RDY` state of 0. This means that no messages
will be sent to the client. When a client is ready to receive messages it should send a command that
@@ -32,14 +34,29 @@ seconds, `nsqd` will timeout and forcefully close a client connection that it ha
Commands are line oriented and structured as follows:
+ * `IDENTIFY` - update client metadata on the server
+ IDENTIFY\n
+ [ 4-byte size in bytes ][ N-byte JSON data ]
+ NOTE: this command takes a size prefixed JSON body, relevant fields:
+ <short_id> - an identifier used as a short-form descriptor (ie. short hostname)
+ <long_id> - an identifier used as a long-form descriptor (ie. fully-qualified hostname)
+ NOTE: there is no success response
+ Error Responses:
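A sketch of framing an `IDENTIFY` command on the wire, assuming a big-endian size prefix consistent with the rest of the protocol's 4-byte sizes:

```python
import json
import struct

def encode_identify(short_id, long_id):
    # the JSON body is preceded by a 4-byte big-endian size, per the
    # size-prefixed format described above
    body = json.dumps({"short_id": short_id, "long_id": long_id}).encode("utf-8")
    return b"IDENTIFY\n" + struct.pack(">l", len(body)) + body
```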
* `SUB` - subscribe to a specified topic/channel
- SUB <topic_name> <channel_name> <short_id> <long_id>\n
+ SUB <topic_name> <channel_name>\n
<topic_name> - a valid string
<channel_name> - a valid string (optionally having #ephemeral suffix)
- <short_id> - an identifier used as a short-form descriptor (ie. short hostname)
- <long_id> - an identifier used as a long-form descriptor (ie. fully-qualified hostname)
NOTE: there is no success response
@@ -67,6 +84,25 @@ Commands are line oriented and structured as follows:
+ * `MPUB` - publish multiple messages to a specified **topic**:
+ MPUB <topic_name> <num_messages>\n
+ [ 4-byte size in bytes ][ N-byte binary data ]... (repeated <num_messages> times)
+ <topic_name> - a valid string
+ <num_messages> - a string representation of integer N
+ Success Response:
+ OK
+ Error Responses:
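A sketch of framing `MPUB` per the spec above — a command line carrying the message count, followed by each message as a 4-byte big-endian size prefix plus its raw bytes:

```python
import struct

def encode_mpub(topic, messages):
    # command line with the message count, then each message as a
    # 4-byte big-endian size prefix followed by its body
    parts = [("MPUB %s %d\n" % (topic, len(messages))).encode("utf-8")]
    for msg in messages:
        parts.append(struct.pack(">l", len(msg)) + msg)
    return b"".join(parts)
```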
* `RDY` - update `RDY` state (indicate you are ready to receive messages)
RDY <count>\n
@@ -513,9 +513,6 @@ func (c *Channel) inFlightWorker() {
// generic loop (executed in a goroutine) that periodically wakes up to walk
// the priority queue and call the callback
-// if the first element on the queue is not ready (not enough time has elapsed)
-// the amount of time to wait before the next iteration is adjusted to optimize
func (c *Channel) pqWorker(pq *pqueue.PriorityQueue, mutex *sync.Mutex, callback func(item *pqueue.Item)) {
ticker := time.NewTicker(defaultWorkerWait)
for {
