Perf: remove unnecessary locks, replace polling with channels, increase buffers #44
Merged
s3rius merged 1 commit into copilot/optimize-natspy-benchmarks from Mar 27, 2026
…crease buffers

- IteratorSubscription: Replace Mutex+200ms polling with channel-based approach (spawned forwarder task + mpsc channel, same pattern as CallbackSubscription)
- JetStreamMessage: Remove RwLock from Acker (ack methods take &self)
- KeyValue: Remove RwLock from Store (all methods take &self)
- PullConsumer/PushConsumer: Remove RwLock+clone (use Arc directly)
- Stream: Remove RwLock (all used methods take &self)
- ConsumersManager: Remove RwLock (propagated from Stream)
- Counters: Remove RwLock (all used methods take &self)
- Streamer: Increase channel buffer from 1 to 128
- Message: Optimize owned conversion to avoid re-borrowing (use into_string)
- CallbackSubscription: Use Arc<Py<PyAny>> for callback to avoid GIL per message

Agent-Logs-Url: https://github.com/taskiq-python/natsrpy/sessions/67772642-1ef4-4cf1-a068-9da30d75c528
Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com>
Copilot created this pull request from a session on behalf of s3rius, March 27, 2026 16:07
Systematic pass to eliminate locking overhead and inefficient patterns that make natsrpy slower than nats.py in some benchmarks.
Remove unnecessary `RwLock`/`Mutex` from 7 types

All used `async_nats` methods on these types take `&self`, making the locks pure overhead. Changed `Arc<RwLock<T>>` → `Arc<T>`:

- `JetStreamMessage.acker`: lock acquired on every ack/nack/term
- `KeyValue.store`: lock acquired on every KV get/put/delete/watch
- `PullConsumer`/`PushConsumer`: was cloning the entire consumer through a sync `RwLock` per operation
- `Stream`, `ConsumersManager`, `Counters`: propagated from above

`ObjectStore` retains `RwLock` because `seal()` requires `&mut self`.

Replace `IteratorSubscription` polling with channel-based design

- Old design: `Arc<Mutex<Subscriber>>` with a 200ms timeout loop, which caused constant lock churn and up to 200ms unsubscribe latency.
- New design: spawned forwarder task + `mpsc(128)` channel + separate unsubscribe command channel. Same proven pattern already used by `CallbackSubscription`.

Increase `Streamer` channel buffer 1 → 128

Used by KV watch, keys iterator, consumer list, object store list. A capacity of 1 meant the producer could never get more than one item ahead of the consumer.
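For reviewers who want the forwarder idea in isolation, here is a minimal sketch of the channel-based design described above. It is not the code from this PR: to stay self-contained it uses a std thread and `std::sync::mpsc` in place of an async task and an async channel, and `IterSubscription`, `subscribe`, `next_message`, and the `String` message type are all hypothetical names chosen for illustration.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread::{self, JoinHandle};

/// Hypothetical iterator-style subscription handle (not natsrpy's real type).
struct IterSubscription {
    messages: Receiver<String>, // bounded queue filled by the forwarder
    commands: Sender<()>,       // separate channel used only to request shutdown
    forwarder: JoinHandle<()>,
}

/// Spawn a forwarder that drains `source` into a bounded channel of capacity 128.
fn subscribe<I>(source: I) -> IterSubscription
where
    I: Iterator<Item = String> + Send + 'static,
{
    let (msg_tx, msg_rx) = mpsc::sync_channel(128);
    let (cmd_tx, cmd_rx) = mpsc::channel();
    let forwarder = thread::spawn(move || {
        for msg in source {
            // Stop on an explicit unsubscribe or when the handle was dropped.
            match cmd_rx.try_recv() {
                Ok(()) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
            // `send` blocks while the buffer is full and fails once the receiver is gone.
            if msg_tx.send(msg).is_err() {
                break;
            }
        }
    });
    IterSubscription { messages: msg_rx, commands: cmd_tx, forwarder }
}

impl IterSubscription {
    /// Blocking receive; returns `None` once the forwarder has finished.
    fn next_message(&self) -> Option<String> {
        self.messages.recv().ok()
    }

    /// Ask the forwarder to stop and wait for it to exit.
    fn unsubscribe(self) {
        let IterSubscription { messages, commands, forwarder } = self;
        let _ = commands.send(()); // ignored if the forwarder already exited
        drop(messages);            // unblocks a forwarder stuck on a full buffer
        forwarder.join().expect("forwarder thread panicked");
    }
}
```

The consumer never touches a lock: it only reads from its own receiver, and the buffer of 128 lets the producer run ahead instead of handing over one item at a time. In this thread-based analogue the forwarder notices an unsubscribe at its next message or when the receiver is dropped; an async version could presumably wait on the message source and the command channel at the same time.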
Optimize `Message` owned conversion

`TryFrom<async_nats::Message>` was re-borrowing the owned value, forcing `subject.to_string()` allocations. Now uses `into_string()` for zero-copy extraction.

Use `Arc<Py<PyAny>>` for callback sharing

`CallbackSubscription` was acquiring the GIL per message just to `clone_ref` the callback. Replaced with an `Arc` clone (atomic increment only).
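To illustrate the `Message` owned-conversion point, the sketch below contrasts a borrowing conversion with a consuming one. It does not use `async_nats`: `RawMessage` and `Message` are hypothetical stand-ins, `Box<str>` plays the role of the subject type, and plain `From` is used instead of `TryFrom` for brevity. The only real API relied on is the standard library's `Box<str>::into_string`, which converts without copying.

```rust
/// Hypothetical stand-in for an incoming message that owns its subject.
struct RawMessage {
    subject: Box<str>,
    payload: Vec<u8>,
}

/// Hypothetical target type exposed to callers.
struct Message {
    subject: String,
    payload: Vec<u8>,
}

/// Borrowing conversion: every field has to be copied into a new allocation.
impl From<&RawMessage> for Message {
    fn from(raw: &RawMessage) -> Self {
        Message {
            subject: raw.subject.to_string(),
            payload: raw.payload.clone(),
        }
    }
}

/// Consuming conversion: the existing buffers are moved, not copied.
impl From<RawMessage> for Message {
    fn from(raw: RawMessage) -> Self {
        Message {
            subject: raw.subject.into_string(), // reuses the subject's heap buffer
            payload: raw.payload,               // plain move of the Vec
        }
    }
}
```

Comparing `subject.as_ptr()` before and after each conversion shows the difference: the borrowing path yields a new buffer, while the consuming path keeps the original one.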