
Added slow consumer events #276

Merged: mtmk merged 4 commits into main from 260-add-support-for-slow-consumer-events on Dec 12, 2023
Conversation

@mtmk (Collaborator) commented Dec 8, 2023:

Slow consumers are subscriptions that are unable to process their messages fast enough. When the internal subscription channel is full, the oldest messages should be dropped and new messages should continue to be processed. This is aligned with NATS at-most-once delivery.

See also https://docs.nats.io/running-a-nats-service/nats_admin/slow_consumers
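
For anyone skimming, the mechanism described above boils down to a bounded channel in drop-oldest mode with a callback for dropped items. A minimal standalone sketch using System.Threading.Channels directly (illustration only, not the client's internals):

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(
    new BoundedChannelOptions(capacity: 3)
    {
        // When the channel is full, discard the oldest buffered item so the
        // newest message can still be written (at-most-once semantics).
        FullMode = BoundedChannelFullMode.DropOldest,
    },
    dropped => Console.WriteLine($"dropped: {dropped}")); // slow-consumer hook

for (var i = 1; i <= 5; i++)
{
    channel.Writer.TryWrite(i); // with capacity 3, items 1 and 2 get dropped
}

while (channel.Reader.TryRead(out var msg))
{
    Console.WriteLine($"read: {msg}"); // reads 3, 4, 5
}
```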

@mtmk mtmk linked an issue Dec 8, 2023 that may be closed by this pull request
@mtmk mtmk requested a review from caleblloyd December 8, 2023 14:05
```csharp
/// This value will be used for the subscription's internal bounded message channel capacity.
/// The default subscriber pending message limit is 65536.
/// </summary>
public int SubscriptionChannelCapacity { get; init; } = 65536;
```
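
For reference, a minimal sketch of how the option as written in this diff would be set from user code (NatsOpts is an immutable record; note the property gets renamed later in this PR):

```csharp
using NATS.Client.Core;

// Illustrative value only; the default proposed here is 65536.
var opts = NatsOpts.Default with
{
    SubscriptionChannelCapacity = 1024,
};

await using var nats = new NatsConnection(opts);
```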
Contributor commented:

A couple of comments:

  1. We've been pretty consistent about abbreviating Subscription -> Sub, should we do that here?
  2. I'm a little concerned about raising it from 1k to 65k for scenarios in which large messages are buffered to the channel.

Contributor commented:

FWIW SubChannelCapacity could be misinterpreted as a 'Subchannel capacity' (I may have done so before realizing the CamelCase rules 😅 )

Agree that 65536 is a lot. 2.5MB underlying array at least IIRC (NatsMsg<T> is 36 bytes plus sizeof(T), not counting potential padding) with some non-memmove array copies along the way.

The upper bound limit to avoid the LOH is somewhere around 1024 on x64 due to NatsMsg<T> size and Dequeue<T> being based on powers of two, less if T is a large struct (although, I suppose we already have that problem?)
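
For concreteness, a back-of-the-envelope check of the numbers above (assumes the ~85,000 byte LOH threshold, a 40 to 48 byte NatsMsg<T>, and the power-of-two array growth mentioned here):

```csharp
using System;

const int LohThresholdBytes = 85_000; // objects at or above this size land on the LOH
const int MsgSizeBytes = 48;          // rough per-entry size; depends on T and padding

// Default of 65,536 entries: roughly 2.5-3 MB for the backing array alone.
Console.WriteLine(65_536L * MsgSizeBytes); // 3,145,728 bytes at 48 B per entry

// Largest power-of-two capacity whose backing array stays under the LOH threshold.
var capacity = 1;
while (capacity * 2 * MsgSizeBytes < LohThresholdBytes)
{
    capacity *= 2;
}

Console.WriteLine(capacity); // 1024
```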

mtmk (Collaborator, Author) commented:

Good points about naming. How about SubPending*? (Also more in line with the docs.)

Size is a valid concern. Unfortunately we can't easily restrict by message size, so I guess we should pick a reasonable limit. My thinking was that, now that we are dropping messages when full, we should be a little more generous with capacity (and pick 65k from the docs). What do you reckon a good default capacity should be? Leave it at 1K?

@to11mtm yes, NatsMsg<T> is at 40 bytes with padding at the moment, with T being a class.

As a side note, do you all think we should reconsider having NatsMsg<T> and NatsJSMsg<T> as value types? Initially I thought it'd be good to avoid object creation and GC as much as possible, but since they are relatively large, are we doing more harm than good trying to relieve potential GC pressure?
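
A quick way to sanity-check those struct sizes (a minimal sketch; the exact figure depends on T and padding):

```csharp
using System;
using System.Runtime.CompilerServices;
using NATS.Client.Core;

// Unsafe.SizeOf<T>() reports the managed size of the struct, including padding,
// which is what each slot in the channel's backing array occupies.
Console.WriteLine(Unsafe.SizeOf<NatsMsg<object>>());
Console.WriteLine(Unsafe.SizeOf<NatsMsg<NatsMemoryOwner<byte>>>());
```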

Contributor commented:

> As a side note, do you all think we should reconsider having NatsMsg<T> and NatsJSMsg<T> as value types? Initially I thought it'd be good to avoid object creation and GC as much as possible, but since they are relatively large, are we doing more harm than good trying to relieve potential GC pressure?

The thought has crossed my mind at least once. Alas, per your comment, it would hinder max-perf cases.

In the interest of not getting this PR too off topic, I'll file an issue to discuss it. I do have some ideas on how to possibly improve things (with potential tradeoffs, ofc) as far as the size of the struct goes, at least for happy paths.

@to11mtm (Contributor) left a comment:

Looks pretty good overall, I found one capture optimization.

It would be nice to find a good solution for the default size of our buffer, not sure what our options are though.


```diff
@@ -20,7 +20,8 @@ public sealed class NatsSub<T> : NatsSubBase, INatsSub<T>
         : base(connection, manager, subject, queueGroup, opts, cancellationToken)
     {
         _msgs = Channel.CreateBounded<NatsMsg<T>>(
-            NatsSubUtils.GetChannelOpts(opts?.ChannelOpts));
+            connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts),
+            msg => connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg));
```
Contributor commented:

Would we be able to use Connection here instead of connection? That would lower the capture cost from two captures (connection+this) to one (this)

Now only captures the 'this' reference, not 'connection' as well.
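
To illustrate the capture point (hypothetical stand-in types, not the actual NatsSub<T> members): a lambda that reads a constructor parameter needs a closure holding that parameter, while one that only touches instance state captures just `this`.

```csharp
using System;

class Connection
{
    public void MessageDropped(object sub, int pending) { }
}

class Sub
{
    private readonly Connection _connection;
    private int _pending;

    public Sub(Connection connection)
    {
        _connection = connection;

        // Captures both the `connection` parameter and `this` (for _pending).
        Action viaParameter = () => connection.MessageDropped(this, _pending);

        // Reaches the connection through a member, so only `this` is captured.
        Action viaMember = () => _connection.MessageDropped(this, _pending);

        viaParameter();
        viaMember();
    }
}
```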
@mtmk (Collaborator, Author) commented Dec 11, 2023:

Ran a few profiling sessions. 1024 seems to be the max we can have before the channel's deque array jumps to the LOH.

(I chose NatsMsg<NatsMemoryOwner<byte>> since that seems to have a larger size of 48 bytes)

```csharp
using System.Threading.Channels;
using NATS.Client.Core;

var size = 1024;

var channel = Channel.CreateBounded<NatsMsg<NatsMemoryOwner<byte>>>(size);

// Fill the channel to capacity so its internal backing array grows to full size.
for (int i = 0; i < size; i++)
{
    channel.Writer.TryWrite(new NatsMsg<NatsMemoryOwner<byte>>("", "", 1, null, default, null));
}

Console.WriteLine($"Count: {channel.Reader.Count}");

// Keep the process alive so the memory profiler can inspect the heap.
Console.ReadLine();
```

size=1024: (memory profiler screenshot)

size=1025: (memory profiler screenshot)

Also opted for the SubPending* rename.
@caleblloyd (Contributor) left a comment:

LGTM

@mtmk mtmk added the breaking label Dec 12, 2023
@mtmk mtmk merged commit aececb9 into main Dec 12, 2023
9 checks passed
@mtmk mtmk deleted the 260-add-support-for-slow-consumer-events branch December 12, 2023 07:22
Successfully merging this pull request may close these issues: Add support for slow consumer events.
3 participants