From 250d6a0b9cb1e46049e8dfc35485019af23a820f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Wed, 6 Mar 2024 01:05:04 -0800 Subject: [PATCH] Renumber to 42 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- adr/ADR-41.md | 292 ++++++++++++++++++++++++++++++++++++++++ adr/ADR-42.md | 361 +++++++++++++++++--------------------------------- 2 files changed, 417 insertions(+), 236 deletions(-) create mode 100644 adr/ADR-41.md diff --git a/adr/ADR-41.md b/adr/ADR-41.md new file mode 100644 index 0000000..fd82081 --- /dev/null +++ b/adr/ADR-41.md @@ -0,0 +1,292 @@ +# NATS Message Path Tracing + + +| Metadata | Value | +|----------|-----------------------| +| Date | 2024-02-22 | +| Author | @ripienaar, @kozlovic | +| Status | Implemented | +| Tags | observability, server | + + +| Revision | Date | Author | Info | +|----------|------------|------------|----------------| +| 1 | 2024-02-22 | @ripienaar | Initial design | + +## Context and Problem Statement + +As NATS networks become more complex with Super Clusters, Leafnodes, multiple Accounts and JetStream knowing the path that messages take through the system is hard to predict. + +Further, when things go wrong, it's hard to know where messages can be lost, denied or delayed. + +This describes a feature of the NATS Server 2.11 that allows messages to be traced throughout the NATS network. + +## Prior Work + +NATS supports tracking latency of Request-Reply service interactions, this is documented in [ADR-3](adr/ADR-3.md). + +## Design + +When tracing is activated every subsystem that touches a message will produce Trace Events. These Events are aggregated per server and published to a destination subject. + +A single message published activating tracing will therefor result in potentially a number of Trace messages - one from each server a message traverse, each holding potentially multiple Trace Events. + +At present the following _Trace Types_ are supported + + * Ingress (`in`) - The first event that indicates how the message enters the Server, client connection, route, gateway etc + * Subject Mapping (`sm`) - Indicates the message got transformed using mappings that changed it's target subject + * Stream Export (`se`) - Indicates the message traversed a Stream Export to a different account + * Service Import (`si`) - Indicates the message traversed a Service Import to a different account + * JetStream (`js`) - Indicates the message reached a JetStream Stream + * Egress (`eg`) - The final event that indicates how the message leaves a Server + +## Activation + +Not all messages are traced and there is no flag to enable it on all messages. Activation is by adding Headers to the message. + +### Ad-hoc activation + +This mode of Activation allows headers to be added to any message that declares where to deliver the traces and inhibit delivery to the final application. + +| Header | Description | +|-------------------|------------------------------------------------------------------------------------------------------| +| `Nats-Trace-Dest` | A subject that will receive the Trace messages | +| `Nats-Trace-Only` | Prevents delivery to the final client, reports that it would have been delivered (`1`, `true`, `on`) | +| `Accept-Encoding` | Enables compression of trace payloads (`gzip`, `snappy`) | + +The `Nats-Trace-Only` header can be used to prevent sending badly formed messages to subscribers, the servers will trace the message to its final destination and report the client it would be delivered to without actually delivering it. Additionally when this is set messages will also not traverse any Route, Gateway or Leafnode that does not support the Tracing feature. + +### Trace Context activation + +Messages holding the standard `traceparent` header as defined by the [Trace Context](https://www.w3.org/TR/trace-context/) specification can trigger tracing based on the `sampled` flag. + +In this case no `Nats-Trace-Dest` header can be set to indicate where the messages will flow, it requires enabling using an account setting: + +``` +accounts { + A { + users: [{user: a, password: pwd}] + msg_trace: { + dest: "a.trace.subj" + sampling: "100%" + } + } +} +``` + +Here we set the `msg_trace` configuration for the `A` account, this enables support for Trace Context and will deliver all messages with the `traceparent` header and the `sampled` flag set to true. + +Note the `sampling`, here set to 100% which is the default, will further trigger only a % of traces that have the `sampled` value in the `traceparent` header. This allow you to specifically sample only a subset of messages traversing NATS while your micro services will sample all. + +When this feature is enabled any message holding the `Nats-Trace-Dest` header as in the previous section will behave as if the `traceparent` header was not set at all. In essence the Ad-Hoc mode has precedence. + +### Cross Account Tracing + +By default a trace will end at an account boundary when crossing an Import or Export. This is a security measure to restrict visibility into foreign accounts and require opt-in to allow. + +``` +accounts { + B { + exports = [ + // on a service the direction of flow is into the exporting + // account, so the exporter need to allow tracing + { service: "nats.add", allow_trace: true } + ] + + imports = [ + // on a stream import the direction of flow is from exporter into + // the importer, so the importer need to allow tracing + {stream: {account: A, subject: ticker}, allow_trace: true} + ] + } +} +``` + +## nats CLI + +The current `main` and nightly builds of `nats` includes the `nats trace` command that is built upon these features. + +This uses a helper package to receive, parse, sort and present a series of related Traces, the source can be found in [github.com/nats-io/jsm.go/api/server/tracing](https://github.com/nats-io/jsm.go/tree/main/api/server/tracing). + +``` +$ nats trace demo +Tracing message route to subject demo + +Client "NATS CLI Version development" cid:4219 cluster:"sfo" server:"n2-sfo" version:"2.11.0-dev" +==> Gateway "n1-lon" gid:727 + Gateway "n2-sfo" gid:735 cluster:"lon" server:"n1-lon" version:"2.11.0-dev" + ~~> Leafnode "leaf1" lid:5391 + Leafnode "n1-lon" lid:8 server:"leaf1" version:"2.11.0-tracing8" + --C Client "NATS CLI Version development" cid:10 subject:"demo" + +Legend: Client: --C Router: --> Gateway: ==> Leafnode: ~~> JetStream: --J Error: --X + +Egress Count: + + Gateway: 1 + Leafnode: 1 + Client: 1 +``` + +Here we can see: + +1. Message entered the `sfo` Cluster via a Client in the server `n2-sfo`. +2. The server `n2-sfo` published it to its Gateway called `n1-lon` using connection `gid:727` +3. The server `n1-lon` received from its Gateway called `n2-sfo` with connection `gid:735` +4. The server `n1-lon` published it to its Leafnode connection `leaf1` using connection `lid:5391` +5. The server `leaf1` received from its Leafnode called `leaf1` with connection `lid:8` +6. The server `leaf1` published the message to a Client with the connection name "NATS CLI Version development" over connection `cid:10` + +This is a set of 3 Trace messages holding 6 Trace Events in total. + +## Trace Message Formats + +The full detail of the trace message types are best found in the NATS Server source code in [server/msgtrace.go](https://github.com/nats-io/nats-server/blob/main/server/msgtrace.go), here we'll call out a few specifics about these messages. + +Given this sample message - a `MsgTraceEvent`: + +```json +{ + "server": { + "name": "n3-lon", + "host": "n3-lon.js.devco.net", + "id": "NCCPZOHDJ4KZQ35FU7EDFNPWXILA7U2VJAUWPG7IFDWUADDANNVOWKRV", + "cluster": "lon", + "domain": "hub", + "ver": "2.11.0-dev", + "tags": [ + "lon" + ], + "jetstream": true, + "flags": 3, + "seq": 151861, + "time": "2024-02-22T13:18:48.400821739Z" + }, + "request": { + "header": { + "Nats-Trace-Dest": [ + "demo" + ] + }, + "msgsize": 36 + }, + "hops": 1, + "events": [ + { + "type": "in", + "ts": "2024-02-22T13:18:48.400595984Z", + "kind": 0, + "cid": 5390, + "name": "NATS CLI Version development", + "acc": "one", + "subj": "x" + }, + { + "type": "eg", + "ts": "2024-02-22T13:18:48.40062456Z", + "kind": 0, + "cid": 5376, + "name": "NATS CLI Version development", + "sub": "x" + }, + { + "type": "eg", + "ts": "2024-02-22T13:18:48.400649361Z", + "kind": 1, + "cid": 492, + "name": "n1-lon", + "hop": "1" + } + ] +} +``` + +Lets have a quick look at some key fields: + +|Key|Notes| +|`server`|This is a standard Server Info structure that you will see in many of our advisories, just indicates which server sent the event| +|`request`|Details about the message being traced, more details about `Nats-Trace-Hop` below| +|`hops`|How many remote destination (routes, gateways, Leafnodes) this server is sending the message to| +|`events`|A list of `MsgTraceEvents` that happened within the server related to this message, see below| + +### Sorting + +These events form a Directed Acyclic Graph that you can sort using the Hop information. The Server that sends the message to another Route, Gateway or Leafnode will indicate it will publish to a number of `hops` and each server that receives the message will have the `Nats-Trace-Hop` header set in its `request`. + +The origin server - the server that receives the initial message with the `Nats-Trace-Dest` header - will have a hops count indicating to how many other servers (routes, leafs, gateways) it has forwarded the message to. This is not necessarily the total number of servers that the message will traverse. + +Each time a server forwards a message to a remote server, it appends a number to its own `Nats-Trace-Hop` header value. Since the origin server does not have one, if it forwards a message to say a ROUTE, that remote server would receive the message with a new header `Nats-Trace-Hop` with the value of `1`, then the origin server forwards to a GATEWAY, and that server would receive the message with `Nats-Trace-Hop` value set to `2`. + +Each of these servers in turn, if forwarding a message to another server, would add an incremental number to their existing `Nats-Trace-Hop` value, which would result in `1.1`, and `2.1`, etc.. + +Take a simple example of the origin server that has a LEAF server, which in turn as another LEAF server (daisy chained). If a message with tracing is forwarded, the trace message emitted from the origin server would have hops to `1` (since the origin server forwarded directly only to the first LEAF remote server). The trace emitted from the first LEAF would have a `Nats-Trace-Hop` header with value `1`. It would also have hops set to `1` (assuming there was interest in the last LEAF server). Finally, the last LEAF server would have a `Nats-Trace-Hop` of `1.1` and would have no hops field (since value would be 0). + +You would associate message either by using a unique `Nats-Trace-Dest` subject or by parsing the `traceparent` to get the trace and span IDs. + +### Trace Events + +The `events` list contains all the events that happened in a given server. We see here that there are different types of event according to the table below: + +| Type | Server Data Type | Description | +|------|--------------------------|-----------------| +| `in` | `MsgTraceIngress` | Ingress | +| `sm` | `MsgTraceSubjectMapping` | Subject Mapping | +| `se` | `MsgTraceStreamExport` | Stream Export | +| `si` | `MsgTraceServiceImport` | Service Import | +| `js` | `MsgTraceJetStream` | JetStream | +| `eg` | `MsgTraceEgress` | Egress | + +We also see a `kind` field, this holds the NATS Servers client Kind, at present these are the values: + +| Kind | Server Constant | Description | +|------|--------------------|---------------------| +| `0` | `server.CLIENT` | Client Connection | +| `1` | `server.ROUTER` | Router Connection | +| `2` | `server.GATEWAY` | Gateway Connection | +| `3` | `server.SYSTEM` | The NATS System | +| `4` | `server.LEAF` | Leafnode Connection | +| `5` | `server.JETSTREAM` | JetStream | +| `6` | `server.ACCOUNT` | Account | + +You may not see all Kinds in traces but this is the complete current list. + +Using these Kinds and Types we can understand the JSON data above: + + * `Ingress` from `Client` connection `cid:5390` + * `Egress` to `Client` connection `cid:5376` + * `Egress` to `Router` connection `rid:492` called `n1-lon` + +This indicates a Client connected to this server (`n3-lon`) published the message it was received by a client connected to the same server and finally sent over a Route to a client on another server (`n1-lon`). + +### Specific Trace Types + +Most of the trace types are self explanatory but I'll call out a few specific here, the names match those in `msgtrace.go` in the Server source. + +#### MsgTraceEgress and MsgTraceIngress + +These messages indicates the message entering and leaving the server via a `kind` connection. It includes the `acc` it's being sent for, subscription (`sub`) and Queue Group (`queue`) on the Egress. + +**Note** that for non CLIENT egresses, the subscription and queue group will be omitted. This is because NATS Servers optimize and send a single message across servers, based on a known interest, and let the remote servers match local subscribers. So it would be misleading to report the first match that caused a server to forward a message across the route as the egress' subscription. + +Here in cases of ACLs denying a publish the `error` will be filled in: + +```json +{ + "events": [ + { + "type": "in", + "ts": "2024-02-22T13:44:18.326658996Z", + "kind": 0, + "cid": 5467, + "name": "NATS CLI Version development", + "acc": "one", + "subj": "deny.x", + "error": "Permissions Violation for Publish to \"deny.x\"" + } + ] +} +``` + +#### MsgTraceJetStream + +The values are quite obvious, the `nointerest` boolean indicates that an `Interest` type Stream did not persist the message because it had no interest. \ No newline at end of file diff --git a/adr/ADR-42.md b/adr/ADR-42.md index fd82081..aae8783 100644 --- a/adr/ADR-42.md +++ b/adr/ADR-42.md @@ -1,292 +1,181 @@ -# NATS Message Path Tracing +# (Partitioned) Consumer Groups for JetStream +|Metadata| Value | +|--------|---------------------| +|Date | 2024-01-31 | +|Author | @jnmoyne | +|Status | `Proposed` | +|Tags | jetstream, client | -| Metadata | Value | -|----------|-----------------------| -| Date | 2024-02-22 | -| Author | @ripienaar, @kozlovic | -| Status | Implemented | -| Tags | observability, server | - - -| Revision | Date | Author | Info | -|----------|------------|------------|----------------| -| 1 | 2024-02-22 | @ripienaar | Initial design | +| Revision | Date | Author | Info | + |----------|------------|----------|----------------| +| 1 | 2024-01-31 | @jnmoyne | Initial design | ## Context and Problem Statement -As NATS networks become more complex with Super Clusters, Leafnodes, multiple Accounts and JetStream knowing the path that messages take through the system is hard to predict. - -Further, when things go wrong, it's hard to know where messages can be lost, denied or delayed. - -This describes a feature of the NATS Server 2.11 that allows messages to be traced throughout the NATS network. - -## Prior Work - -NATS supports tracking latency of Request-Reply service interactions, this is documented in [ADR-3](adr/ADR-3.md). - -## Design - -When tracing is activated every subsystem that touches a message will produce Trace Events. These Events are aggregated per server and published to a destination subject. +Currently distributed message consumption from a stream is possible through the use of durable consumers, however the messages are distributed 'on-demand' between the client application processes currently subscribing to the consumer. This means that if you want strict in-order consumption of the messages you must set number of "max pending acknowledgements" to exactly 1, which seriously limits throughput as only one message can be consumed at a time, regardless of the number of client applications subscribing from the consumer. Moreover, even with max acks pending is set to 1, the current consumers do not provide any kind of 'stickiness' of the distribution of the messages between the subscribing client applications, nor the ability to have only one of those subscribing applications being selected to be the exclusive recipient of the messages (at a time). -A single message published activating tracing will therefor result in potentially a number of Trace messages - one from each server a message traverse, each holding potentially multiple Trace Events. +While this is fine for most message consumption use cases, there are some classes of use cases that require strictly ordered 'per key' message consumption which are currently forced to set max acks pending to 1 (and therefore the consumption of the messages can not be scaled horizontally). Furthermore, even when strictly ordered message delivery is not required, there are other classes of use cases where 'per key' stickiness of the delivery of messages can be very valuable and lead to performance improvements. E.g. if the consuming application can leverage local caching of expensive lookup data stickiness enables the best possible cache hit-rate. -At present the following _Trace Types_ are supported +## Context - * Ingress (`in`) - The first event that indicates how the message enters the Server, client connection, route, gateway etc - * Subject Mapping (`sm`) - Indicates the message got transformed using mappings that changed it's target subject - * Stream Export (`se`) - Indicates the message traversed a Stream Export to a different account - * Service Import (`si`) - Indicates the message traversed a Service Import to a different account - * JetStream (`js`) - Indicates the message reached a JetStream Stream - * Egress (`eg`) - The final event that indicates how the message leaves a Server +Kafka's 'consumer groups' provide this exact feature through partitioning, and it is an oft-requested by the NATS community (and by many wanting to migrate from Kafka to NATS JetStream). The ability to have an 'exclusive subscriber' to a consumer has also been one of the most often requested features from the community. -## Activation +## Design goals -Not all messages are traced and there is no flag to enable it on all messages. Activation is by adding Headers to the message. +The design aims for the following attributes +- The ability to distribute the message consumption of the messages in a stream, while providing ***strictly ordered 'per key' delivery*** of the messages, when the key used for the consistent hashing distribution of the messages can be any combination of the subject's tokens. +- Fault-tolerance through the deployment of 'standby' consuming client applications that automatically take over the consumption of messages even in the case of process or host failure. +- ***Administratively managed*** 'elasticity': the ability to easily scale up or down the consumption of the messages by increasing or decreasing the number of running consuming client applications. +- ***Always*** keep the strict order of delivery per subject even when scaling up or down. +- Minimization of the re-distribution of keys as the consumer group is scaled up or down. +- - Support for both automatic or administratively defined mapping of members to partitions. -### Ad-hoc activation - -This mode of Activation allows headers to be added to any message that declares where to deliver the traces and inhibit delivery to the final application. +## Design -| Header | Description | -|-------------------|------------------------------------------------------------------------------------------------------| -| `Nats-Trace-Dest` | A subject that will receive the Trace messages | -| `Nats-Trace-Only` | Prevents delivery to the final client, reports that it would have been delivered (`1`, `true`, `on`) | -| `Accept-Encoding` | Enables compression of trace payloads (`gzip`, `snappy`) | +A number of new features of nats-server 2.10 are now making consumer groups (and exclusive consumption from a JetStream consumer) possible. Namely: subject transformation in streams with the ability to insert a partition number in a subject token calculated from other subject tokens using a consistent hashing algorithm, multi-filter JS consumers, create (as opposed to create or update) consumer creation call, meta-data in consumer configuration. -The `Nats-Trace-Only` header can be used to prevent sending badly formed messages to subscribers, the servers will trace the message to its final destination and report the client it would be delivered to without actually delivering it. Additionally when this is set messages will also not traverse any Route, Gateway or Leafnode that does not support the Tracing feature. +## Terminology -### Trace Context activation +A ***consumer group*** is a named new kind of consumer on a stream that distributes messages between ***members*** which are client named application instances wanting to consume (copies of) messages in a stream in a partitioned way. -Messages holding the standard `traceparent` header as defined by the [Trace Context](https://www.w3.org/TR/trace-context/) specification can trigger tracing based on the `sampled` flag. +### Exposed functionalities -In this case no `Nats-Trace-Dest` header can be set to indicate where the messages will flow, it requires enabling using an account setting: +The following functionalities are exposed to the application programmer -``` -accounts { - A { - users: [{user: a, password: pwd}] - msg_trace: { - dest: "a.trace.subj" - sampling: "100%" - } - } -} -``` +- Get a named partitioned consumer group's configuration. +- Create a named partitioned consumer group on a stream by providing a configuration. +- Delete a named partitioned consumer group on a stream. +- List the partitioned consumer groups defined on a stream. +- Add members to a named partitioned consumer group. +- Drop members from a partitioned consumer group. +- List the partitioned consumer group's members that are currently consuming from the partitioned consumer group. +- Consume (if or when selected) from the partitioned consumer group. +- - Set (or delete) a custom member-to-partitions mapping. -Here we set the `msg_trace` configuration for the `A` account, this enables support for Trace Context and will deliver all messages with the `traceparent` header and the `sampled` flag set to true. +### Partitioned consumer group configuration -Note the `sampling`, here set to 100% which is the default, will further trigger only a % of traces that have the `sampled` value in the `traceparent` header. This allow you to specifically sample only a subset of messages traversing NATS while your micro services will sample all. +The design relies on using a KV bucket to store the configuration information for the consumer groups. The key is the combination of the stream name and the partitioned consumer group's name. You can create any number of named partitioned consumer groups on a stream, just like you can create any number of consumers on a stream. -When this feature is enabled any message holding the `Nats-Trace-Dest` header as in the previous section will behave as if the `traceparent` header was not set at all. In essence the Ad-Hoc mode has precedence. +The configuration consists of the following: +- The maximum number of members that this consumer group will distribute messages to. +- A subject filter, that must contain at least one `*` wildcard. +- A list of the subject filter partitioning wildcard indexes that will be used to make the 'key' used for the distribution of messages. For example if the subject filter is `"foo.*.*.>"` (two wildcard indexes: 1 and 2) the list would be `[1,2]` if the key is composed of both `*` wildcards, `[1]` if the key is composed of only the first `*` wildcard of the filter, or `[2]` if is it composed of only the second `*` wildcard. +- The current list of partitioned consumer group member names or the current list of consumer group members to partition mappings. -### Cross Account Tracing +Note: once the partitioned consumer group has been created, the list of members or the list member mappings are the ***only*** part of its configuration that can be changed. -By default a trace will end at an account boundary when crossing an Import or Export. This is a security measure to restrict visibility into foreign accounts and require opt-in to allow. +#### Partitioned consumer group config record format -``` -accounts { - B { - exports = [ - // on a service the direction of flow is into the exporting - // account, so the exporter need to allow tracing - { service: "nats.add", allow_trace: true } - ] - - imports = [ - // on a stream import the direction of flow is from exporter into - // the importer, so the importer need to allow tracing - {stream: {account: A, subject: ticker}, allow_trace: true} - ] - } +```go +type ConsumerGroupConfig struct { +MaxMembers uint `json:"max_members"` +Filter string `json:"filter"` +PartitioningWildcards []int `json:"partitioning-wildcards"` +MaxBufferedMsgs int64 `json:"msg-buffer-size,omitempty"` +Members []string `json:"members,omitempty"` +MemberMappings []MemberMapping `json:"member-mappings,omitempty"` } ``` -## nats CLI - -The current `main` and nightly builds of `nats` includes the `nats trace` command that is built upon these features. - -This uses a helper package to receive, parse, sort and present a series of related Traces, the source can be found in [github.com/nats-io/jsm.go/api/server/tracing](https://github.com/nats-io/jsm.go/tree/main/api/server/tracing). - -``` -$ nats trace demo -Tracing message route to subject demo - -Client "NATS CLI Version development" cid:4219 cluster:"sfo" server:"n2-sfo" version:"2.11.0-dev" -==> Gateway "n1-lon" gid:727 - Gateway "n2-sfo" gid:735 cluster:"lon" server:"n1-lon" version:"2.11.0-dev" - ~~> Leafnode "leaf1" lid:5391 - Leafnode "n1-lon" lid:8 server:"leaf1" version:"2.11.0-tracing8" - --C Client "NATS CLI Version development" cid:10 subject:"demo" - -Legend: Client: --C Router: --> Gateway: ==> Leafnode: ~~> JetStream: --J Error: --X +### Partitioned consumer group members (consuming client applications) -Egress Count: +Client applications that want to consume from the partitioned consumer group typically would either create the consumer group themselves (or rely on it being created administratively ahead of time) just like they would use a regular durable consumer. Then would then signal their ability to consume messages by consuming from (it could also be described as 'joining') the partitioned consumer group with a specific member name. Starting to 'consume' doesn't mean that messages will actually start being delivered to the client application, as that is controlled by two factors: the member's name being listed in the list or members in the partitioned consumer group's config, and if more than one instance of the same member are joined to the consumer group at the time, being selected as the 'exclusive' client application instance receiving the messages for that member's subset of the subjects in the stream. The member may start or stop receiving messages at any moment. - Gateway: 1 - Leafnode: 1 - Client: 1 -``` +When starting to consume, besides a `context.Context` the client application programmer can provide a "Max acks pending" value (which defaults to and will typically be `1` when strictly ordered processing is required), a callback to process the message (same kind you would use for subscribing to a regular consumer), as well as optional callbacks to get notified when that particular instance of the member gets activated or de-activated as the membership of the group changes. -Here we can see: - -1. Message entered the `sfo` Cluster via a Client in the server `n2-sfo`. -2. The server `n2-sfo` published it to its Gateway called `n1-lon` using connection `gid:727` -3. The server `n1-lon` received from its Gateway called `n2-sfo` with connection `gid:735` -4. The server `n1-lon` published it to its Leafnode connection `leaf1` using connection `lid:5391` -5. The server `leaf1` received from its Leafnode called `leaf1` with connection `lid:8` -6. The server `leaf1` published the message to a Client with the connection name "NATS CLI Version development" over connection `cid:10` - -This is a set of 3 Trace messages holding 6 Trace Events in total. - -## Trace Message Formats - -The full detail of the trace message types are best found in the NATS Server source code in [server/msgtrace.go](https://github.com/nats-io/nats-server/blob/main/server/msgtrace.go), here we'll call out a few specifics about these messages. - -Given this sample message - a `MsgTraceEvent`: - -```json -{ - "server": { - "name": "n3-lon", - "host": "n3-lon.js.devco.net", - "id": "NCCPZOHDJ4KZQ35FU7EDFNPWXILA7U2VJAUWPG7IFDWUADDANNVOWKRV", - "cluster": "lon", - "domain": "hub", - "ver": "2.11.0-dev", - "tags": [ - "lon" - ], - "jetstream": true, - "flags": 3, - "seq": 151861, - "time": "2024-02-22T13:18:48.400821739Z" - }, - "request": { - "header": { - "Nats-Trace-Dest": [ - "demo" - ] - }, - "msgsize": 36 - }, - "hops": 1, - "events": [ - { - "type": "in", - "ts": "2024-02-22T13:18:48.400595984Z", - "kind": 0, - "cid": 5390, - "name": "NATS CLI Version development", - "acc": "one", - "subj": "x" - }, - { - "type": "eg", - "ts": "2024-02-22T13:18:48.40062456Z", - "kind": 0, - "cid": 5376, - "name": "NATS CLI Version development", - "sub": "x" - }, - { - "type": "eg", - "ts": "2024-02-22T13:18:48.400649361Z", - "kind": 1, - "cid": 492, - "name": "n1-lon", - "hop": "1" - } - ] -} -``` +### Implementation design -Lets have a quick look at some key fields: +People too often will associate elasticity of a partitioning system with having to change the number of partitions as the membership in the group changes over time, and re-partitioning is always going to be a difficult and expensive thing to do, and therefore not something you want to do all the time. In practice however there is way around that while not providing 'infinite elasticity' is still actually quite acceptable. -|Key|Notes| -|`server`|This is a standard Server Info structure that you will see in many of our advisories, just indicates which server sent the event| -|`request`|Details about the message being traced, more details about `Nats-Trace-Hop` below| -|`hops`|How many remote destination (routes, gateways, Leafnodes) this server is sending the message to| -|`events`|A list of `MsgTraceEvents` that happened within the server related to this message, see below| +The trick is simply to see the number of partitions as the maximum number of members, and then you distribute those partitions as evenly as you can between the members, no need to try and be clever about it as the even distribution of the subjects is already done by the partitioning. Partitions in this case are cheap, they are literally nothing more than 'just another subject (filter)' so you don't have to try and be stingy with the number of them. Then you just need to have a coordinated way to move partitions between members, and you have elasticity. -### Sorting +These things are implemented on top of JetStream using new 2.10 features. The mapping of members to partitions is done by having each member create a named consumer named after the member name (with a relatively short idle timeout), and expressing the partitions that this particular member was assigned to as filter subjects in the consumer's configuration. The coordination trick is done by doing this on top of a working queue stream, which ensures that no two member consumers can have the same partition in their subject filters at the same time. -These events form a Directed Acyclic Graph that you can sort using the Hop information. The Server that sends the message to another Route, Gateway or Leafnode will indicate it will publish to a number of `hops` and each server that receives the message will have the `Nats-Trace-Hop` header set in its `request`. +So when you create a consumer group, you actually create a new working queue stream, that sources from the initial stream and on which the members create consumers and inserts the partition number token in the subject at the same time. The obvious down-side is that this can lead to requiring the space to store the data twice in two streams, but the advantages are that guarantee that a working-queue stream will never allow two members to operate on the same partition at the same time, and it also gives you a convenient way to track the progress of the members in real-time (the size of that working-queue), which is something you could use to try to automate the scaling up or down, and finally using a working queue stream allows the creation/deletion of the consumers on that stream without having to worry about having to store and specify a starting sequence number in order to not re-consume messages. -The origin server - the server that receives the initial message with the `Nats-Trace-Dest` header - will have a hops count indicating to how many other servers (routes, leafs, gateways) it has forwarded the message to. This is not necessarily the total number of servers that the message will traverse. +The final part of the puzzle is the 'exclusive subscriber to a consumer' functionality: it's not just elasticity people want but also built-in fault-tolerance, so you need to be able to run more than one instance of each member name to protect from the member process/host/whatever failing causing messages to be unprocessed and accumulating in the consumer group until a new instance of that member name gets restarted. This synchronization is done relying on 'named ephemeral' consumers the new 2.10 Consumer Create (vs CreateOrUpdate) call with the addition of metadata in the consumer config: when an instance of member "m1" starts and wants to consume from the consumer group, it first generates a unique ID that it sets in the metadata of the consumer config and uses `Consumer.Create()` to try to create a named (using the member's name) ephemeral 'synchronization consumer' on the consumer group's stream (the wq stream) with the subject filters for the partitions distributed to that member. If there's already another instance of member m1 that is running (and will have a different id), or if there's another member consumer on the stream that has overlapping filter subjects the Create call fails (because the metadata part of the consumer's config does not match), and the code just waits for a few seconds and re-tries. When that other instance of "m1" decides to stop consuming, it will delete that consumer, and another instance will be the first one to succeed in creating its version of that consumer. Because the consumer is created with an idle timeout of just a few seconds (and the currently active member instance constantly pulls on it), if the member instance crashes or gets disconnected then the server takes care of deleting the consumer. -Each time a server forwards a message to a remote server, it appends a number to its own `Nats-Trace-Hop` header value. Since the origin server does not have one, if it forwards a message to say a ROUTE, that remote server would receive the message with a new header `Nats-Trace-Hop` with the value of `1`, then the origin server forwards to a GATEWAY, and that server would receive the message with `Nats-Trace-Hop` value set to `2`. +#### Member consumer implementation detail +Using one single member consumer per member for both synchronization and getting messages does work well for failure scenarios where the member client application crashes, it breaks down if the client application gets suspended (same if isolated) because after resuming it will continue to pull from the shared synchronization consumer, not realizing that another member id now has it. To avoid this, two streams rather than one are used for each member, the 'synchronization' consumer and another instance-id specific 'data' consumer. They are configured the same, except the synchronization consumer has subject filters prepended by a token that is not a number (and therefore will not match any messages in the stream), while the data consumer uses subject filters that match the actual partitions. The implementation continuously pulls from both consumers (to keep them both non-idle) but no messages are received on the synchronization consumer and the stream messages are consumed from the data consumer. -Each of these servers in turn, if forwarding a message to another server, would add an incremental number to their existing `Nats-Trace-Hop` value, which would result in `1.1`, and `2.1`, etc.. +The implementation when the instance is joining or not currently the selected instance checks every 5 seconds: +- Can I succeed in creating the consumers for the member +- And if they think they are the current selected member instance check that the consumers still exists, and that the instance id in the meta-data still matches. If not they should 'step-down' (drain their subscribers to both member consumers) and go back to assuming not being the currently elected instance. +- In order to implement a 'step-down' administrative command the current synchronization consumer simply gets deleted (note it could also be deleted by the server if the selected instance gets suspended or on the other side of a network partition), which causes the current selected instance to after a few seconds realize it's been deleted and drain it's current data consumer and assume not being the selected instance anymore, plus wait a second before going back to trying to create the synchronization consumer (to ensure the step-down causes another instance (if there's one at the time) to successfully create its version of the member consumers and become the new selected instance). -Take a simple example of the origin server that has a LEAF server, which in turn as another LEAF server (daisy chained). If a message with tracing is forwarded, the trace message emitted from the origin server would have hops to `1` (since the origin server forwarded directly only to the first LEAF remote server). The trace emitted from the first LEAF would have a `Nats-Trace-Hop` header with value `1`. It would also have hops set to `1` (assuming there was interest in the last LEAF server). Finally, the last LEAF server would have a `Nats-Trace-Hop` of `1.1` and would have no hops field (since value would be 0). +#### Details -You would associate message either by using a unique `Nats-Trace-Dest` subject or by parsing the `traceparent` to get the trace and span IDs. +The partition to member distribution is designed to minimize the number of partitions that get re-assigned from one member to another as the member list grows/shrinks: rather than being a simple modulo (which would suffice for distribution purposes) the algorithm is the following: -### Trace Events +- De-duplicate the list of members in the consumer group's configuration +- Sort the list +- Cap the list to the max number of members +- (Unless a custom specific set of members to partition mappings is defined) Distribute automatically the partitions amongst the members in the list, assigning continuous blocks of partitions of size (number of partitions divided by the number of members in the list), and finally distribute the remainder (if the number of members in the list is not a multiple of the number of partitions) over the members using a modulo. -The `events` list contains all the events that happened in a given server. We see here that there are different types of event according to the table below: +Automatic partition to member mapping implementation: -| Type | Server Data Type | Description | -|------|--------------------------|-----------------| -| `in` | `MsgTraceIngress` | Ingress | -| `sm` | `MsgTraceSubjectMapping` | Subject Mapping | -| `se` | `MsgTraceStreamExport` | Stream Export | -| `si` | `MsgTraceServiceImport` | Service Import | -| `js` | `MsgTraceJetStream` | JetStream | -| `eg` | `MsgTraceEgress` | Egress | +```go +func GetPartitionFilters(config ConsumerGroupConfig, memberName string) []string { + members := deduplicateStringSlice(config.Members) + slices.Sort(members) -We also see a `kind` field, this holds the NATS Servers client Kind, at present these are the values: + if uint(len(members)) > config.MaxMembers { + members = members[:config.MaxMembers] + } -| Kind | Server Constant | Description | -|------|--------------------|---------------------| -| `0` | `server.CLIENT` | Client Connection | -| `1` | `server.ROUTER` | Router Connection | -| `2` | `server.GATEWAY` | Gateway Connection | -| `3` | `server.SYSTEM` | The NATS System | -| `4` | `server.LEAF` | Leafnode Connection | -| `5` | `server.JETSTREAM` | JetStream | -| `6` | `server.ACCOUNT` | Account | + // Distribute the partitions amongst the members trying to minimize the number of partitions getting re-distributed + // to another member as the number of members increases/decreases + numMembers := uint(len(members)) -You may not see all Kinds in traces but this is the complete current list. + if numMembers > 0 { + // rounded number of partitions per member + var numPer = config.MaxMembers / numMembers + var myFilters []string -Using these Kinds and Types we can understand the JSON data above: + for i := uint(0); i < config.MaxMembers; i++ { + var memberIndex = i / numPer + if i < (numMembers * numPer) { + if members[memberIndex%numMembers] == memberName { + myFilters = append(myFilters, fmt.Sprintf("%d.>", i)) + } + } else { + // remainder if the number of partitions is not a multiple of the number of members + if members[(i-(numMembers*numPer))%numMembers] == memberName { + myFilters = append(myFilters, fmt.Sprintf("%d.>", i)) + } + } + } - * `Ingress` from `Client` connection `cid:5390` - * `Egress` to `Client` connection `cid:5376` - * `Egress` to `Router` connection `rid:492` called `n1-lon` + return myFilters + } + return []string{} +} +``` -This indicates a Client connected to this server (`n3-lon`) published the message it was received by a client connected to the same server and finally sent over a Route to a client on another server (`n1-lon`). +The partition number token is stripped from the subject before being passed down to the user's message consumption callback. -### Specific Trace Types +It is up to the client application consumption callback code to acknowledge the message. In order to achieve exactly once processing the client application callback code should follow this pattern: +1. Process the message begin a transaction on the destination(s) systems of record that get modified by the processing of the message, call prepare on all of them and if they all are prepared call `DoubleAck()` on the message, if that returns without an error then commit the transaction(s) but if the double ack does return an error (because for example the process was suspended or isolated for a period of time longer than what it takes for another member instance to be selected to consume the messages, in which case the newly active member instance will get all currently un-acknowledged messages from the consumer group's stream) then it should rollback the transaction(s) to avoid double-processing. -Most of the trace types are self explanatory but I'll call out a few specific here, the names match those in `msgtrace.go` in the Server source. +### Transitioning -#### MsgTraceEgress and MsgTraceIngress +Since when processing membership changes where partitions (and therefore subject filters) are moved from one consumer to another on a working queue stream it's impossible to synchronize all the currently active member instances to change their consumer configuration at exactly the same time, it's done in two steps. As soon as they get the config update, they first delete the consumers, and then wait a small amount of time before trying to create the new consumer again with the new filters. In order to avoid flapping of the active member instance on membership changes the currently active instance waits a fixed amount of time 500ms before trying to create the consumer while the others wait 500ms plus a random amount of time up to 100ms before trying. -These messages indicates the message entering and leaving the server via a `kind` connection. It includes the `acc` it's being sent for, subscription (`sub`) and Queue Group (`queue`) on the Egress. +## Administration -**Note** that for non CLIENT egresses, the subscription and queue group will be omitted. This is because NATS Servers optimize and send a single message across servers, based on a known interest, and let the remote servers match local subscribers. So it would be misleading to report the first match that caused a server to forward a message across the route as the egress' subscription. +By simply using the consumer group config KV bucket, administrators can: +- Create consumer groups on streams -> creates a new config entry +- Delete consumer groups on streams -> deletes the config entry +- List consumer groups on a stream -> list the keys matching ".*" +- Add/Remove members to a consumer group -> add/remove a member name to the `members` array in the config entry, or Create/Delete a set of custom member names to partition number mappings -> create/delete the `member-mappings` array in the config entry +- Request the current active member instance for a member to step-down and have another member instance currently joined (if there's one take over as the current active member instance) -Here in cases of ACLs denying a publish the `error` will be filled in: +You can also see the list of members that have currently active instances by looking at what consumer currently exist on the consumer group's stream, which is important because you therefore know when you are missing any and which ones are missing meaning that some of the partitions are not being consumed. -```json -{ - "events": [ - { - "type": "in", - "ts": "2024-02-22T13:44:18.326658996Z", - "kind": 0, - "cid": 5467, - "name": "NATS CLI Version development", - "acc": "one", - "subj": "deny.x", - "error": "Permissions Violation for Publish to \"deny.x\"" - } - ] -} -``` +## Reference initial implementation +https://github.com/synadia-labs/partitioned-consumer-groups -#### MsgTraceJetStream +## Consequences -The values are quite obvious, the `nointerest` boolean indicates that an `Interest` type Stream did not persist the message because it had no interest. \ No newline at end of file +Hopefully a lot of happy NATS users and lots of new NATS users moving off from other streaming systems that already have a partitioned consumer group functionality. \ No newline at end of file