Consumer not persisting Cursor #180

Closed
sschepens opened this issue Jan 30, 2017 · 44 comments

@sschepens
Contributor

We have 4 consumers for the same topic, and all of them are supposedly up to date (0 backlog).
But when we restart brokers, which reloads cursor state from ZooKeeper and BookKeeper, one consumer is consistently reset far back in time.
Looking at the ZooKeeper metadata, that consumer is the only one with an outdated markDeleteLedgerId and markDeleteEntryId, and this happens for all 10 partitions of the topic.
Not only does this consumer have an outdated cursor, it also doesn't seem to update it as it re-consumes the backlog generated when a broker is restarted and the consumer is reset.

@sschepens
Contributor Author

@merlimat @rdhabalia we really need help with this, it's causing issues for us in production.

@merlimat
Contributor

@sschepens A few questions to try to identify the issue:

  • Is that consumer receiving at the same rate as the other consumers?
  • Is it keeping some messages in the consumer receiver queue?
  • Does it change if you use an ack timeout on the consumer?
  • Can you try to remove the throttling of persisting the mark-delete position? There are two ways to do that (a Java admin sketch follows this list):
    • Change the broker-wide setting managedLedgerDefaultMarkDeleteRateLimit=0, which means every single update is persisted.
    • Change the setting on a particular namespace:
      bin/pulsar-admin namespaces set-persistence --bookkeeper-ensemble 2 --bookkeeper-write-quorum 2 --bookkeeper-ack-quorum 2 --ml-mark-delete-max-rate 0
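
For reference, a minimal sketch of the same namespace-level change through the Java admin client; this assumes the current PulsarAdmin builder API, and the admin URL and namespace are placeholders:

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistencePolicies;

public class DisableMarkDeleteThrottle {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                .build()) {
            // Same settings as the CLI above: ensemble=2, write-quorum=2, ack-quorum=2,
            // and a mark-delete max rate of 0, i.e. persist every single update.
            admin.namespaces().setPersistence("property/cluster/ns",   // placeholder namespace
                    new PersistencePolicies(2, 2, 2, 0));
        }
    }
}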

@merlimat
Contributor

managedLedgerDefaultMarkDeleteRateLimit controls how frequently the position is persisted, to avoid too many writes to BookKeeper when consuming messages.
The default is 0.1 (updates per second), which means 1 update every 10 seconds.

Given the current implementation (using a Guava rate limiter), subsequent acks are tracked in memory. The problem might be related to the fact that if there are no more writes on the topic, the persisted position for the cursor doesn't get a chance to be updated.
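
A purely illustrative sketch of that throttling behavior (not the actual ManagedCursorImpl code; the class and method names are made up):

import com.google.common.util.concurrent.RateLimiter;

// With a rate of 0.1 permits/second, at most one persist happens every ~10s;
// acknowledgments in between only advance the in-memory position.
class MarkDeleteThrottle {
    private final RateLimiter limiter;          // e.g. RateLimiter.create(0.1)
    private volatile long inMemoryMarkDelete = -1;

    MarkDeleteThrottle(double updatesPerSecond) {
        this.limiter = updatesPerSecond > 0 ? RateLimiter.create(updatesPerSecond) : null;
    }

    void acknowledge(long position) {
        inMemoryMarkDelete = position;          // always advance in memory
        if (limiter == null || limiter.tryAcquire()) {
            persist(position);                  // persist only when a permit is available
        }
        // Otherwise the update stays in memory until a later ack wins a permit;
        // if no further acks arrive, the persisted position can stay stale.
    }

    private void persist(long position) {
        // write the mark-delete position to the cursor ledger in BookKeeper
    }
}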

@sschepens
Contributor Author

@merlimat

Is that consumer receiving at the same rate as the other consumers?

Yes it is; we even tried consuming more aggressively.

Is it keeping some messages in the consumer receiver queue?

It wouldn't appear so, as the backlog reported by the broker before restarting is 0, which would confirm that it considers all messages acked.

Does it change if you use an ack timeout on the consumer?

We're already using an ack timeout for all consumers.

Can you try to remove the throttling of persisting the mark-delete position?

I'll try; we're currently using managedLedgerDefaultMarkDeleteRateLimit=10, to theoretically save state 10 times per second.

Given the current implementation (using a Guava rate limiter), subsequent acks are tracked in memory. The problem might be related to the fact that if there are no more writes on the topic, the persisted position for the cursor doesn't get a chance to be updated.

Hmm, shouldn't this at least have a way of detecting that the state has not been saved for some time and forcing a save?

@merlimat
Contributor

Right now all the checking is done when processing an acknowledgment. There was a plan to fix that by going through all the topics every minute and flushing all the pending cursor updates, though it still works like this today.
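
A rough, purely hypothetical sketch of that planned fix (not actual broker code; the Cursor interface here is made up for illustration):

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Every minute, persist the in-memory mark-delete position of every cursor
// with pending updates, so a broker restart cannot roll a subscription back
// by more than roughly one minute of acknowledgments.
class CursorFlusher {
    interface Cursor {                           // minimal made-up interface
        boolean hasPendingMarkDeleteUpdate();
        void persistMarkDeletePosition();
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(List<Cursor> cursors) {
        scheduler.scheduleAtFixedRate(() -> {
            for (Cursor cursor : cursors) {
                if (cursor.hasPendingMarkDeleteUpdate()) {
                    cursor.persistMarkDeletePosition();
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    void stop() {
        scheduler.shutdown();
    }
}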

@sschepens
Contributor Author

@merlimat does changing managedLedgerMaxMarkDeleteRate via pulsar-admin require restarting the brokers for the change to take effect?

@merlimat
Contributor

It requires re-loading the topics: pulsar-admin namespaces unload ...
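
A tiny sketch of the equivalent call through the Java admin client (placeholder URL and namespace):

import org.apache.pulsar.client.admin.PulsarAdmin;

public class UnloadNamespace {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                .build()) {
            // Unloading forces the topics to be re-opened, which re-reads the
            // namespace policies (e.g. the mark-delete rate set above).
            admin.namespaces().unload("property/cluster/ns");  // placeholder namespace
        }
    }
}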

@sschepens
Contributor Author

Right now all the checking is done when processing an acknowledgment. There was a plan to fix that by going through all the topics every minute and flushing all the pending cursor updates, though it still works like this today.

The strange thing is that all consumers consume at the same rate and should have acked messages and saved state at about the same time, yet this is the only consumer experiencing the issue.

Here are metrics showing what happened:
On Thursday at 17:20 (Buenos Aires time) a bunch of messages was produced:
screenshot from 2017-01-30 16-08-45

At about the same time, all 4 consumers consumed all the messages at roughly the same rate:
screenshot from 2017-01-30 16-08-56

On Friday at 19:40 (Buenos Aires time), following a bundle unload, one consumer got reset and started consuming messages again:
screenshot from 2017-01-30 16-09-16

@merlimat
Contributor

Are these consumers on the same subscription or in 4 different subscriptions?

@sschepens
Contributor Author

4 different subscriptions; all are shared subscriptions and are hosted on the same 2 instances.

@merlimat
Contributor

Ok, is there anything in the logs that could signal a failure in recovering the cursor after the unload?
In case of a failure to recover the cursor position, the behavior is to roll back to the oldest available message, to ensure no data is skipped.

@sschepens
Contributor Author

I couldn't find any log indicating a failure to recover the cursor; on the contrary, the cursor appeared to be recovered correctly, but at a really old position.

@merlimat
Contributor

If you're able to reproduce it, can you paste the stats-internal for the topic (after consuming and before restarting)?

@sschepens
Contributor Author

I wasn't able to reproduce this again, but we had already set markDeleteRatio=0 and also reset the consumer.
I will report back if we experience this behavior again.

@sschepens
Contributor Author

@merlimat this has happened again, for different consumers, when restarting a broker. We have the following persistence settings for the namespace:

{
  "bookkeeperEnsemble" : 3,
  "bookkeeperWriteQuorum" : 3,
  "bookkeeperAckQuorum" : 3,
  "managedLedgerMaxMarkDeleteRate" : 0.0
}

This is absolutely not tolerable; we must find a way to fix this.

@sschepens
Contributor Author

sschepens commented Feb 3, 2017

By the way, these are all partitioned topics with 10 partitions.
Another thing, maybe unrelated: we restarted the brokers because they were trying to connect to bookies that have been replaced and no longer exist in ZooKeeper.

@rdhabalia
Contributor

@sschepens as @merlimat mentioned: is it possible to get stats-internal for the affected topic before restarting the server?
bin/pulsar-admin persistent stats-internal persistent://property/cluster/ns/topic
It provides the current state of the cursor, most importantly markDeletePosition and individuallyDeletedMessages. If the consumer missed acking one of the messages, the markDeletePosition will not move. So, if possible, can we try to capture this the next time it happens?

Sample output:
"cursors" : {
    "subscription1" : {
      "markDeletePosition" : "7029:1506",
      "individuallyDeletedMessages" : "[]",

@sschepens
Contributor Author

@rdhabalia I'm checking this; we restarted another broker and the backlog bumped up yet again, and the consumer had no individuallyDeletedMessages pending...
We're going to try once more to capture stats-internal before and after the restart, but in any case this doesn't seem to be a case of a single message not being acked.

@sschepens
Contributor Author

This is the behavior we're experiencing with the backlog; the bumps are in the order of millions of messages.
And as I said, there are no, or maybe just a few, unacked messages at any given time.
screenshot from 2017-02-03 15-22-46

@merlimat
Contributor

merlimat commented Feb 3, 2017

@sschepens In the internal stats you can also see the real cursor position, apart from individuallyDeletedMessages. That could give a few hints on where to look. Also, can you attach the logs for that particular topic before/after the restart? That might be useful as well.

@merlimat
Contributor

merlimat commented Feb 3, 2017

@sschepens also, having both the internal and the normal stats might be useful for setting up a quick test to try to reproduce the issue.

@sschepens
Contributor Author

@merlimat here I upload the topic stats-internal for each partition, before and after the restart.
The consumers that got bumped are:
f6f49168781f402c99ddfa871bc0e90c-fury-orderfeed-test2.loyal-api (blue)
f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api (violet)

As the name indicates, the first one is a test consumer; I'm more interested in the second one. I can see that the first one did have several unacked messages before restarting; even so, with each restart it seems to fall further and further behind.

This is the screenshot of the last restart:
screenshot from 2017-02-03 16-02-22

topic-stats.zip

@estebangarcia

I'm attaching the logs of the instance before and after restart.
logs_instance_i-04cd666923f24ceaf.zip

@sschepens
Contributor Author

We're collecting logs mentioning the topic from all the other instances after the restart.

@merlimat
Contributor

merlimat commented Feb 3, 2017

@sschepens You have message retention on time enabled, correct? (I'm seeing that older ledgers are not being deleted, just wanted to make sure.)

@merlimat
Contributor

merlimat commented Feb 3, 2017

@sschepens about the violet consumer, in the logs you pasted, does it get backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.

@estebangarcia

These are the logs of all of our brokers before and after restart.
logs_all_instances.zip

@sschepens
Contributor Author

@sschepens about the violet consumer, in the logs you pasted, does it get backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.

I can't tell right now; we don't have our metrics split by partition, and we didn't get the normal stats before and after.
Maybe the issue is not related to what is shown in the stats-internal, but the issue is certainly present.

@sschepens
Contributor Author

@sschepens You have message retention on time enabled, correct? (I'm seeing that older ledgers are not being deleted, just wanted to make sure.)

Yes, this is our retention policy:

{
  "retentionTimeInMinutes" : 10080,
  "retentionSizeInMB" : 2147483647
}

@sschepens
Contributor Author

sschepens commented Feb 3, 2017

@sschepens about the violet consumer, in the logs you pasted, does it get backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.

@merlimat I would expect the increased backlog to come only from partitions that were unloaded from the restarted broker, which should be visible in the logs.

@merlimat
Contributor

merlimat commented Feb 3, 2017

@sschepens My point is that, looking at the "after" internal stats, the cursor for f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api doesn't show backlog:

{
  "entriesAddedCounter": 36299,
  "numberOfEntries": 6101124,
  "totalSize": 784103746,
  "currentLedgerEntries": 36299,
  "currentLedgerSize": 4663005,
  "lastLedgerCreatedTimestamp": "2017-02-03 17:47:24.912",
  "lastLedgerCreationFailureTimestamp": null,
  "waitingCursorsCount": 5,
  "pendingAddEntriesCount": 0,
  "lastConfirmedEntry": "69312:36298",
  "state": "LedgerOpened",
  "ledgers": [
....

    "f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
      "markDeletePosition": "69312:36297",
      "readPosition": "69312:36299",
      "waitingReadOp": true,
      "pendingReadOps": 0,
      "messagesConsumedCounter": 36298,
      "cursorLedger": 69336,
      "cursorLedgerLastEntry": 31461,
      "individuallyDeletedMessages": "[]",
      "lastLedgerSwitchTimestamp": "2017-02-03 17:47:24.972",
      "state": "Open"
    },
....

If you subtract entriesAddedCounter - messagesConsumedCounter, you get the backlog as reported by the brokers. In this case, a single in-flight message.
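
As a worked example with the numbers from the stats above:

// Backlog as reported by the broker = entriesAddedCounter - messagesConsumedCounter
long entriesAddedCounter = 36299;      // from the "after" internal stats above
long messagesConsumedCounter = 36298;  // for the purchases-api subscription
long backlog = entriesAddedCounter - messagesConsumedCounter;  // = 1 in-flight message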

@merlimat
Contributor

merlimat commented Feb 3, 2017

... Ok, found the same thing on partition-6:

    "f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
      "markDeletePosition": "59439:44524",
      "readPosition": "61542:30970",
      "waitingReadOp": false,
      "pendingReadOps": 0,
      "messagesConsumedCounter": -986146,
      "cursorLedger": -1,
      "cursorLedgerLastEntry": -1,
      "individuallyDeletedMessages": "[(59439:44525‥60577:45163], (60577:45164‥60886:818], (60886:819‥60886:21168], (60886:21169‥60886:27590], (60886:27591‥60886:41528], (60886:41529‥60886:49252], (60886:49253‥61145:10375], (61145:10376‥61145:36353], (61145:36354‥61542:7158], (61542:7159‥61542:14589], (61542:14590‥61542:26054], (61542:26055‥61542:29862], (61542:29863‥61542:30509], (61542:30510‥61542:30515], (61542:30516‥61542:30517], (61542:30518‥61542:30534], (61542:30535‥61542:30537], (61542:30538‥61542:30556], (61542:30557‥61542:30559], (61542:30560‥61542:30563], (61542:30565‥61542:30566], (61542:30567‥61542:30568], (61542:30569‥61542:30573], (61542:30577‥61542:30581], (61542:30582‥61542:30583], (61542:30585‥61542:30587], (61542:30589‥61542:30591], (61542:30610‥61542:30613], (61542:30619‥61542:30637], (61542:30638‥61542:30640], (61542:30641‥61542:30643], (61542:30647‥61542:30649], (61542:30672‥61542:30674], (61542:30675‥61542:30677], (61542:30678‥61542:30679]]",
      "lastLedgerSwitchTimestamp": "2017-02-03 18:33:43.431",
      "state": "NoLedger"
    },

@merlimat
Contributor

merlimat commented Feb 3, 2017

So, that cursor had a single unacked message (59439:44525) before the restart:

    "f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
      "markDeletePosition": "59439:44524",
      "readPosition": "70009:12925",
      "waitingReadOp": true,
      "pendingReadOps": 0,
      "messagesConsumedCounter": 12922,
      "cursorLedger": -1,
      "cursorLedgerLastEntry": -1,
      "individuallyDeletedMessages": "[(59439:44525‥70009:12922]]",
      "lastLedgerSwitchTimestamp": "2017-02-03 18:16:48.247",
      "state": "NoLedger"
    },

@merlimat
Contributor

merlimat commented Feb 3, 2017

The same message still appears as "unacked" even after the restart. Is there any possibility that the consumer is getting some unexpected data and fails to consume that particular message?

You can dump the content of the message with:

$ pulsar-admin persistent peek-messages $MY_TOPIC -s $MY_SUBSCRIPTION

@estebangarcia

The contents of the message seem to be fine. For some reason it is still unacked.

message_id_59439_44525.txt

@estebangarcia

The message is from yesterday; for some reason it was never consumed.

@merlimat
Contributor

merlimat commented Feb 3, 2017

So, if you have set the ack timeout, this message should keep getting re-sent to the consumer. You can enable debug logs (either on the broker or the consumer) to check that.
Another option is to get a tcpdump capture of the traffic to the consumer (on port 6650).
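
For context, a sketch of a consumer configured with an ack timeout, using the current client builder API (the 2017-era ConsumerConfiguration API differed); the service URL, topic, and subscription names are placeholders:

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

public class AckTimeoutExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")          // placeholder broker URL
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://property/cluster/ns/topic") // placeholder topic
                .subscriptionName("subscription1")               // placeholder subscription
                .subscriptionType(SubscriptionType.Shared)
                // Unacked messages are redelivered after this timeout, so a message
                // the application never acks keeps coming back.
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        try {
            // process(msg) ...
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Leaving the message unacked leads to redelivery, and to an "ack hole"
            // like the one described in this thread if it can never be processed.
        }

        consumer.close();
        client.close();
    }
}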

@sschepens
Contributor Author

@merlimat this message is indeed being redelivered constantly.
This brings up the question: should Pulsar support dead-letter queues?
I don't know what we can use to detect this in real time to prevent these issues.

@merlimat
Contributor

merlimat commented Feb 3, 2017

Just to clarify, is the consumer acknowledging the message?

About how to detect this: one way is to monitor the storage size, which would be constantly increasing (though in your case, with retention enabled, that is not feasible). Otherwise, monitor the redelivery rate for the consumers (in the topic stats).

About the dead-letter queue: it's an interesting topic and I would support it, just not enabled by default. Potentially it could be an option on the namespace itself to define a dead-letter topic where to dump messages after N delivery attempts.
When placing messages there, the broker could attach a few properties to the messages themselves.
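
A sketch of the monitoring suggestion above, assuming a recent Java admin client where TopicStats exposes getters (URL and topic are placeholders):

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TopicStats;

public class RedeliveryMonitor {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                .build()) {
            TopicStats stats =
                    admin.topics().getStats("persistent://property/cluster/ns/topic");

            stats.getSubscriptions().forEach((name, sub) -> {
                // A persistently non-zero redelivery rate together with a stuck
                // unacked count points at messages that are never acknowledged.
                System.out.printf("%s msgRateRedeliver=%.1f unackedMessages=%d%n",
                        name, sub.getMsgRateRedeliver(), sub.getUnackedMessages());
            });
        }
    }
}
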

@sschepens
Contributor Author

Just to clarify, is the consumer acknowledging the message?

No, because it's failing to process it.

About how to detect this: one way is to monitor the storage size, which would be constantly increasing (though in your case, with retention enabled, that is not feasible). Otherwise, monitor the redelivery rate for the consumers (in the topic stats).

I don't think the message redelivery rate is going to work; messages are potentially being redelivered constantly simply because they can spend more than the ack timeout sitting in the consumers' internal queues.
Plus, we would like to identify which precise messages are being constantly redelivered.

About the dead-letter queue: it's an interesting topic and I would support it, just not enabled by default. Potentially it could be an option on the namespace itself to define a dead-letter topic where to dump messages after N delivery attempts.
When placing messages there, the broker could attach a few properties to the messages themselves.

Of course, dead-letter queues should be optional, but there are also things to consider: should there be a dead-letter topic for each consumer? If not, how would we handle this?

@sschepens
Contributor Author

sschepens commented Feb 3, 2017

Another thought, @merlimat:
Would it be possible for brokers to store the value of individuallyDeletedMessages every so often, and have new brokers read that value to restore the actual state of that consumer?
The only issue I can think of is that the value of individuallyDeletedMessages could grow forever; maybe we could allow a given number of unacked messages and then print warnings or errors for that consumer?

The line of thought is: what can we possibly do to prevent this from happening? (a consumer re-processing all messages when a broker restarts, because it did not acknowledge a single message)

@sschepens
Contributor Author

@merlimat I'm wondering what would happen if a consumer like the one we had issues with has a single message that it cannot process, and that message gets expired via TTL or retention policies: would the broker immediately discard it from the individuallyDeletedMessages list?
My concern is whether, when this message gets expired, the broker will advance the markDeletePosition; if it does not, then when the bundle gets unloaded the consumer would be reset back to the beginning, wouldn't it?

@merlimat
Contributor

merlimat commented Feb 7, 2017

Would it be possible for brokers to store the value of individuallyDeletedMessages every so often, and have new brokers read that value to restore the actual state of that consumer?

Yes, we have thought about that many times. The thing is, storing that information avoids the big backlog on restart, but it doesn't address the root cause of why the message wasn't acked.

In the end, having a hole in the ack sequence is always the symptom of a separate problem. And for the system, the data cannot be removed from disk, because the storage is not a traditional key-value store where you can point-delete data.

About the size of the individuallyDeletedMessages data itself, the good thing is that we have introduced a maximum number of "unacked" messages that a consumer can have. After that, the broker stops pushing messages to the consumer. I think the default is 50K.

To summarize my point of view:

  • It would be nice to address the ack-hole problem, even if only to a limited degree, in a way that has no performance impact when it's not needed:
    • e.g. only do it if the mark-delete position falls more than X thousand messages behind the read position
  • Give an intuitive and easy-to-use way to reject messages
  • Report better in the stats when an "ack hole" is detected, for quicker identification or alerting

My concern is whether, when this message gets expired, the broker will advance the markDeletePosition; if it does not, then when the bundle gets unloaded the consumer would be reset back to the beginning, wouldn't it?

When the broker applies the TTL, it will acknowledge the messages and thus close the gap and move the markDeletePosition forward.
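
For completeness, a sketch of setting a namespace-level message TTL with the Java admin client, so messages that stay unacked are eventually expired and the markDeletePosition can move forward (placeholder URL and namespace; the CLI equivalent is pulsar-admin namespaces set-message-ttl):

import org.apache.pulsar.client.admin.PulsarAdmin;

public class SetMessageTtl {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                .build()) {
            // Expire messages that stay unacknowledged for more than 7 days.
            // When the TTL kicks in, the broker acks the expired messages itself,
            // closing any ack hole and advancing the markDeletePosition.
            admin.namespaces().setNamespaceMessageTTL("property/cluster/ns",  // placeholder
                    7 * 24 * 3600);   // TTL in seconds
        }
    }
}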

@merlimat
Contributor

Closing this one since the changes are already implemented and merged in #276

@merlimat added this to the 1.17 milestone on Mar 24, 2017