Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,27 @@ jobs:
working-directory: jetstreamext
- run: uv run --python=3.10 pytest .
working-directory: jetstreamext

jetstreampcg-checks:
if: ${{ ! github.event.pull_request.draft == true }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- uses: astral-sh/setup-uv@v6
- uses: actions/setup-python@v6
with:
python-version: |
3.10
3.13
- run: uv lock --check
working-directory: jetstreampcg
- run: uv run --python=3.13 ruff format --check .
working-directory: jetstreampcg
- run: uv run --python=3.13 ruff check .
working-directory: jetstreampcg
- run: uv run --python=3.13 ty check .
working-directory: jetstreampcg
- run: uv run --python=3.13 pytest .
working-directory: jetstreampcg
- run: uv run --python=3.10 pytest .
working-directory: jetstreampcg
27 changes: 27 additions & 0 deletions .github/workflows/publish-jetstreampcg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: publish-jetstreampcg

on:
push:
tags:
- jetstreampcg-v*

permissions:
contents: write
id-token: write

jobs:
jetstreampcg-publish:
if: ${{ ! github.event.pull_request.draft == true }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- uses: astral-sh/setup-uv@v6
- uses: actions/setup-python@v6
with:
python-version: |
3.13
- run: uv build
working-directory: jetstreampcg
- if: ${{ ! github.event.release.prerelease }}
run: uv publish
working-directory: jetstreampcg
1 change: 1 addition & 0 deletions jetstreampcg/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Changelog
8 changes: 8 additions & 0 deletions jetstreampcg/CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Checks that should pass

- `uv run ruff format .`
- `uv run ruff check --fix .`
- `uv run ty check .`
- `uv run basedpyright .`
- `uv run pytest .`
- `uv run --python=3.10 pytest .`
1 change: 1 addition & 0 deletions jetstreampcg/NOTICE
97 changes: 97 additions & 0 deletions jetstreampcg/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Partitioned Consumer Groups

Initial implementation of a client-side partitioned consumer group feature for NATS streams leveraging some of the new features introduced in `nats-server` version 2.11.

Note that post 2.11 versions of `nats-server` may include new features related to the consumer group use case that could render this client-side library unneeded (or make much smaller)

## Overview

This library enables the parallelization through partitioning of the consumption of messages from a stream while ensuring a strict order of not just delivery but also successful consumption of the messages using all or parts of the message's subject as a partitioning key.

In JetStream terms, strictly ordered consumption is achieved when you set the consumer's 'max acks pending' value to 1. However, setting this on a JetStream consumer has the very unfortunate side effect of being very low throughput (limited by the network latency and processing speed) and not being horizontally scalable: only one message is being delivered and processed synchronously at a time from that JetStream consumer, no matter how many instances of the consuming application are deployed.

The library allows the creation of 'consumer groups' on Stream, where each 'member' of the consumer group can consume from the group in parallel (with max acks pending 1 if needed), with the guarantee that in no way more than one message for a particular key can be consumed at the same time. Client applications wanting to consume messages from the group simply do so using a 'member name' and providing a callback. Even if more than one instance of a member is deployed, only one of those instances will be delivered messages at a time.

The library takes care of the partitioning and the mapping of the partitions between the members of the group, the idea being that it is mostly transparent to the consuming application's developers who only need to join a consumer group, providing a member name and a callback to process and acknowledge the message when successfully processed.

NATS Partitioned consumer groups come in two flavors: *elastic* and *static*.

***Static*** partitioned consumer groups assume that the stream already has a partition number present as the first token of the message's subjects (something that can be done automatically when messages are stored into to the stream by setting a subject transform for the stream). You can only create and delete static consumer groups. Any change to the consumer group's config in the KV bucket will cause all the member instances for all members of the group to stop consuming.

***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups).

***In both cases***
In both cases you must specify when creating the consumer group the maximum number of members for the group (which is actually the number of partitions used when partitioning the messages), plus a list of "members" (named instances of the consuming application). The library takes care of distributing the members over the list of partitions using either a 'balanced' distribution (the partitions are evenly distributed between the members) or 'mappings' (where you assign administratively the mappings of partitions to the members). The membership list or mappings must be specified once at consumer group creation time for static consumer groups, but can be changed at any time for elastic consumer groups.

Each consumer groups has a configuration which is stored in a KV bucket (named `static-consumer-groups` or `elastic-consumer-groups`).

### Static

Static consumer groups operate on a stream where the partition number has already been inserted in the subject as the first token of the messages. In this mode of operation, the library creates JetStream consumers (one per member of the group) directly on the stream. This is not elastic: you create the consumer with a list of members once, and you can not adjust that membership list or mapping for the life of the consumer group (if you want to change the mapping, up to you to delete and re-create the static partitioned consumer group, and to figure out which sequence number you may want this new static partitioned consumer group to start from).

### Elastic

Elastic consumer groups operate on any stream, the messages in the stream do not have the partition number present in their subjects. The membership list (or mapping) for the consumer can be adjusted administratively at any time and up to the max number of members defined initially. The consumer group library in this case creates a new work-queue stream that sources from the stream, inserting the partition number subject token on the way. The consumer group library takes care of creating this sourced stream and managing all the consumers on this stream according to the current membership, the developer only needs to provide a stream name, consumer group name and a member name and callback and make sure to ack the messages. You can specify (at creation time) a maximum size (in number of messages or bytes) for this working queue stream, but be aware that once this stream has reached its limit, it will pause the sourcing for at least 1 second (expecting messages to be consumed from the consumer group, thereby making room for more messages to be sourced) so you will want to set this value to more than 1 second's worth of message consumption by the clients of the consumer group or this could result in small delays in the consumption of messages from the consumer group.

### High availability

You can deploy and run multiple instances of the consuming application using the same member name, in that case only one of the running instances of the member will be 'pinned' and have messages delivered to it (thereby the other instances are effectively in hot standby). There are functions (`ElasticMemberStepDown()` and `StaticMemberStepDown`) to force a change of the currently pinned member instance.

### Using Partitioned Consumer Groups

For the client application programmer, there is one basic functionality exposed by both static and elastic partitioned consumer groups: join and consume messages (when selected) from a named consumer group on a stream by specifying a _member name_, a regular JetStream consumer config, and a _callback_. The library takes care of stripping the partition number token from the subject such that you can use any existing callback code you may already have as is.

There are also administrative functions to create and delete consumer groups, plus, in the case of elastic consumer groups only, the ability to add/drop members or to change the custom member to partition mappings on an existing elastic consumer group.

### CLI

Included is a small command line interface tool, located in the `cg` directory, that allows you to manage consumer groups, as well as test or demonstrate the functionality, and which can be registered as a plugin with the `nats` CLI tool (e.g. `nats plugins register cg /path/to/go/bin/cg`).

This `cg` CLI tool can be used by passing it commands and arguments directly, or with an interactive prompt using the `prompt` command (e.g. `cg static prompt`).

### Demo walkthrough

#### Static

Create a stream "foo" that automatically partitions over 10 partitions using `static_stream_setup.sh`, then generate some traffic (a new message every 10ms) for that stream using `generate_traffic.sh`.

Create a static consumer group named "cg" on the stream in question, with two members defined called "m1" and "m2": `cg static create balanced foo cg 10 '>' m1 m2`

Start consuming messages with a simulated processing time of 20ms from an instance of member "m1": `cg static consume foo cg m1 --sleep 25ms`. Run in another window cg again to consume as member m2 a second, run multiple instances of members m1 and m2, kill the active one (the one receiving messages) and watch as one of the other instances takes over.

#### Elastic

Create a stream 'foo' that captures messages on the subjects `foo.*`, then generate some traffic (a new message every 10ms) for that stream using `generate_traffic.sh`.

Create an elastic consumer group named "cg", partitioning over 10 partitions using the second token (first `*` wildcard in the filter "foo.*") in the subject as the partitioning key: `cg elastic create foo cg 10`.

At this point the elastic consumer group is created, but no members have been added to it yet. But you can start instances of your consuming members already (e.g. `cg elastic consume foo cg m1` for an instance of a member "m1"), for example start instances of members "m1", "m2" and "m3". At this point none of those members are receiving messages.

Add "m1" and "m2" to the membership: `cg elastic add foo cg m1 m2`, see how they start receiving messages. Then drop "m1" from the membership `cg elastic drop foo cg m1`, add it again, and each time watch as the consumer starts and stops receiving messages, run another consumer "m3" and add/drop it from the membership, etc...

As soon as the elastic consumer group is created, you can start instances of consuming clients (e.g. `cg elastic consume foo cg m1`), and they will start to consume messages as soon as (and as long as) they are in the group's membership.

#### Example

To start consuming from a static consumer group, you call `pcgroups.StaticConsume`. To start consuming from an elastic consumer group you call `pcgroups.ElasticConsume`. These calls will return an error and a `ConsumerGroupConsumeContext`. Assuming no error is returned,this will create a Go routine that handles consumption and monitoring for changes in the consumer group's config.

e.g. for static
```golang
consumerGroupContext, err = pcgroups.StaticConsume(myContext, nc, streamName, consumerGroupName, memberName, messageHandler, config)
```
The arguments are:
- `myContext` is a Golang `context.Context` which is going to be used only for the operations that are part of joining the consumer group. You must use `Stop()` on the `ConsumerGroupContext` being returned to stop the consumption.
- `nc` is a NATS connection object.
- `streamName` is the name of the Stream on which the consumer group has been created.
- `consumerGroupName` is the name of the consumer group that has been created on the stream.
- `memberName` is the name of the member you want to join the consumer group as.
- `messageHandler` is a callback function that gets invoked and passed the messages for consumption. Note that if you are using an elastic consumer group you _must_ explicitly acknowledge (positively or negatively) the message in your callback.
- `config` is a regular JetStream consumer config to use by the library as a template when actually creating the JetStream consumers. For elastic consumers the acknowledgement policy must be explicit. For static consumer groups, it doesn't have to, but if you want to do strictly one at a time processing, you will need to use explicit acks in order for max acks pending 1 to apply. Note that this configuration being used as template means that some of the values will be overwritten and can be left empty (e.g. names and durable names, filters, idle timeouts) or will be overwritten if they are too small (as there is a relationship that must be maintained between the ack wait time, the consumer fetch time out, and the pinned TTL values to avoid 'flapping' of the pinned client which means that lower values will cause in increase in load on the infrastructure because of the added overhead. At this point the ack wait time must be at least 6 seconds, but this may well change in the future and the community users are encouraged to give feed back on this implementation detail).

Consumption stops either when some error is encountered (for example, any change to the consumer group's config in the case of a static consumer group), the consumer group's config gets deleted, or you invoke `Stop()` on the `ConsumerGroupConsumeContext`. Invoking `Done()` on the `ConsumerGroupConsumeContext` will return a channel on which you can receive the error code indicating why the consumption was stopped. This error code will be `nil` if the consumption terminates normally (due to the consumer group getting deleted or `Stop()` being invoked on the `ConsumerGroupConsumeContext`).

You can look at the `cg` CLI tool's source code for examples of how to create and consume for both static and elastic consumer groups.

## Requirements

Note: partitioned consumer groups require NATS server version 2.11 or above.
7 changes: 7 additions & 0 deletions jetstreampcg/examples.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""jetstreampcg example usage.

1. Ensure a local nats server is running on port 4222.
2. run `uv run examples.py`
"""

# TODO: impl
95 changes: 95 additions & 0 deletions jetstreampcg/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
[project]
name = "jetstreampcg"
version = "0.1.0"
description = "JetStream partitioned consumer groups is an implementation of a client-side partitioned consumer group feature for NATS streams."
readme = "README.md"
license = "Apache-2.0"
license-files = ["NOTICE"]
authors = [
{ name = "Oliver Lambson", email = "oliverlambson@gmail.com" }
]
requires-python = ">=3.10"
dependencies = [
"nats-py>=2.12.0",
]
classifiers = [
'License :: OSI Approved :: Apache Software License',
'Intended Audience :: Developers',
'Development Status :: 2 - Pre-Alpha',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13'
]

[project.urls]
"Homepage" = "https://github.com/oliverlambson/orbit.py"
"Bug Tracker" = "https://github.com/oliverlambson/orbit.py/issues"

[build-system]
requires = ["uv_build>=0.8.15,<0.9.0"]
build-backend = "uv_build"

[dependency-groups]
dev = [
"basedpyright>=1.31.4",
"pytest>=8.4.2",
"pytest-asyncio>=1.1.0",
"ruff>=0.12.12",
"testcontainers>=4.12.0",
"ty>=0.0.1a20",
]

[tool.uv.build-backend]
source-exclude = ["examples.py"]

[tool.uv.sources]
nats-py = { git = "https://www.github.com/oliverlambson/nats.py", subdirectory = "nats", rev = "1181018c80476413ade3f8849980afa3c8835ebd" }

[[tool.uv.index]]
name = "testpypi"
url = "https://test.pypi.org/simple/"
publish-url = "https://test.pypi.org/legacy/"
explicit = true

[tool.ty.rules]
division-by-zero = "warn"
possibly-unresolved-reference = "warn"
unused-ignore-comment = "warn"

[tool.ty.terminal]
error-on-warning = true

[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"UP", # pyupgrade
"B", # bugbear
"TD", # todos
"PERF", # performance
"RUF", # ruff
"ASYNC", # flake8-async
"S", # bandit
"BLE", # blind except
"FBT", # boolean trap
"A", # shadowing builtins
"DTZ", # datetime timezone
"EM", # error message
"FA", # future annotations
"G", # logging format
]
ignore = [
"E501", # line too long (handled by ruff format)
"W505", # line too long (handled by ruff format)
"TD002", # todo author
"TD003", # todo link
"S101", # use of assert
]

[tool.ruff.format]
docstring-code-format = true
89 changes: 89 additions & 0 deletions jetstreampcg/src/jetstreampcg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2025 Oliver Lambson
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""JetStream partitioned consumer groups is an implementation of a client-side partitioned consumer group feature for NATS streams."""

from jetstreampcg._version import __version__
from jetstreampcg.common import (
ConsumerGroupConsumeContext,
ConsumerGroupMsg,
MemberMapping,
compose_key,
generate_partition_filters,
)
from jetstreampcg.elastic import (
ElasticConsumerGroupConfig,
ElasticConsumerGroupConsumerInstance,
add_members,
create_elastic,
delete_elastic,
delete_member_mappings,
delete_members,
elastic_consume,
elastic_get_partition_filters,
elastic_is_in_membership_and_active,
elastic_member_step_down,
get_elastic_consumer_group_config,
list_elastic_active_members,
list_elastic_consumer_groups,
set_member_mappings,
)
from jetstreampcg.static import (
StaticConsumerGroupConfig,
StaticConsumerGroupConsumerInstance,
create_static,
delete_static,
get_static_consumer_group_config,
list_static_active_members,
list_static_consumer_groups,
static_consume,
static_member_step_down,
validate_static_config,
)

__all__ = [
# Common
"ConsumerGroupConsumeContext",
"ConsumerGroupMsg",
# Elastic
"ElasticConsumerGroupConfig",
"ElasticConsumerGroupConsumerInstance",
"MemberMapping",
# Static
"StaticConsumerGroupConfig",
"StaticConsumerGroupConsumerInstance",
"__version__",
"add_members",
"compose_key",
"create_elastic",
"create_static",
"delete_elastic",
"delete_member_mappings",
"delete_members",
"delete_static",
"elastic_consume",
"elastic_get_partition_filters",
"elastic_is_in_membership_and_active",
"elastic_member_step_down",
"generate_partition_filters",
"get_elastic_consumer_group_config",
"get_static_consumer_group_config",
"list_elastic_active_members",
"list_elastic_consumer_groups",
"list_static_active_members",
"list_static_consumer_groups",
"set_member_mappings",
"static_consume",
"static_member_step_down",
"validate_static_config",
]
Loading
Loading