Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg.ack_sync(timeout=15.0) but acknowledgment floor stays at 8 #525

Closed
mxchinegod opened this issue Jan 23, 2024 · 2 comments
Closed

msg.ack_sync(timeout=15.0) but acknowledgment floor stays at 8 #525

mxchinegod opened this issue Jan 23, 2024 · 2 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@mxchinegod
Copy link

mxchinegod commented Jan 23, 2024

Observed behavior

  1. Subscribe with push to a stream category using a durable & deliver group with an ack_wait of 20.0
self.config = ConsumerConfig(
                    name=self.session
                    , deliver_group=self.session
                    , durable_name=self.session
                    , ack_wait=20
                )
await self.js.add_consumer(
                        stream=self.stream
                        , config=self.config
                        , deliver_subject=self.session
                )
self.sub = await self.js.subscribe(
                        self.category
                        , queue=self.session
                        , config=self.config
                    )
  1. consume from stream & log msg output after .ack_sync()
await msg.in_progress()
# ...processing code
sync = await msg.ack_sync(timeout=15)
print(sync)
  1. observe output says False for _ackd in the ack response from server
Msg(_client=<nats client v2.6.0>, subject='_INBOX.hEKvcQkCkUi1xXv9cU8mjF.hEKvcQkCkUi1xXv9cU8ugn1fed', reply='', data=b'', headers=None, _metadata=None, _ackd=False, _sid=1)
State:

  Last Delivered Message: Consumer sequence: 65,190 Stream sequence: 1,445 Last delivery: 9ms ago
    Acknowledgment Floor: Consumer sequence: 8 Stream sequence: 8 Last Ack: 2.52s ago
        Outstanding Acks: 1,000 out of maximum 1,000
    Redelivered Messages: 991
    Unprocessed Messages: 287,100
         Active Interest: Active using Queue Group bge_large_en_v15_workers

Meanwhile my middle function is humming along and running the ack_sync code for each message I convert into a payload and process, I am storing them locally on a different machine successfully.

I can only get 8 in acknowledgement floor.

Expected behavior

Acknowledgement floor goes up

Server and client version

nats-server 2.10.7
nats-py 2.6.0

Host environment

macOS Sonoma, Kubernetes chart with a 3-node cluster on Ubuntu 22

Steps to reproduce

  1. Subscribe with push to a stream category using a durable & deliver group with an ack_wait of 20.0
  2. consume from stream & log msg output after msg.ack_sync()
  3. observe output says False for _ackd in the ack response from server, observe acknowledgement floor sticking
@mxchinegod mxchinegod added the defect Suspected defect such as a bug or regression label Jan 23, 2024
@mxchinegod mxchinegod changed the title msg.ack_sync(timeout=15.0) returns msg._ackd=False msg.ack_sync(timeout=15.0) but acknowledgment floor stays at 8 Jan 23, 2024
@charbonnierg
Copy link
Contributor

charbonnierg commented Jan 23, 2024

I'm not a contributor to this project, but you're printing the reply (sync) and not the original message (msg) thus it is normal that _ackd is False. If you take a look at msg._ackd you should expect True indeed.

Also, not sure why you are calling msg.in_progress() but you already have an ack_wait set to 20 so it should not be necessary to call in_progress() unless you're taking more than 20 seconds to process the message and you want to avoid a second delivery (maybe that's the case, I just wanted to warn you)

@mxchinegod
Copy link
Author

unless you're taking more than 20 seconds to process the message and you want to avoid a second delivery (maybe that's the case, I just wanted to warn you)

this is indeed the case

also, I addressed the unwanted behavior by setting max unacknowledged to n where n=number of consumers in the group.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

2 participants