New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CHANGED] Use different subjects per PR when using Fetch and FetchBatch #1237
Conversation
@@ -2751,10 +2753,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { | |||
// wait this time. | |||
noWait = false | |||
err = sendReq() | |||
} else if err == ErrTimeout && len(msgs) == 0 { | |||
} else if err == ErrTimeout { | |||
// If we get a 408, we will bail if we already collected some |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wallyqs I'm not sure that's needed after this change, but I may be wrong. Was this added to suppress timeout from previous Fetch()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was to avoid a client getting stuck on 408 errors due to reaching the max waiting limits, I think behavior of the server has advanced enough that this might not be longer needed as you mention 7669068
There was a test that got added for that but we removed it at some point, so maybe could add a version of it back just in case: 7669068#diff-9e6bdee4051a6e36d6d507dcef63b5f9a39b281e421c08f02ec74a0fb6639e3aR4501
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should leave this as it is.
Test you mentioned is failing with the newer versions of the server, but the behavior of Fetch is different than FetchBatch, as it operates with both no_wait
true & false.
Let's leave it here as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes looks like would be better to leave it as is for now, it might still possible to run into multiple 408s condition according to that test when calling it concurrently:
=== RUN TestJetStream_PullSubscribeMaxWaiting/psub_n=3_r=1/blocking_fetch
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: requests pending, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[99] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUAf, err=<nil>, msgs=0, header=map[]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=nats: requests pending, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[99] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUKX, err=<nil>, msgs=0, header=map[]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[99] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUNp, err=<nil>, msgs=0, header=map[]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[99] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUAf, err=<nil>, msgs=1, header=map[]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[99] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUKX, err=<nil>, msgs=1, header=map[]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUNp, err=nats: requests pending, msgs=1, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUAf, err=nats: requests pending, msgs=2, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUKX, err=nats: requests pending, msgs=2, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=<nil>, msgs=0, header=map[]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=nats: timeout, msgs=1, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=nats: timeout, msgs=1, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUHF, err=nats: timeout, msgs=1, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
inbox=_INBOX.6c2FnkK9m5H9HgXWpzUT6r.6c2FnkK9m5H9HgXWpzUUDx, err=nats: timeout, msgs=0, header=map[Description:[Request Timeout] Nats-Pending-Bytes:[0] Nats-Pending-Messages:[1] Status:[408]]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some nits and future-proof concerns.
dfb9100
to
75c2d4a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This PR adds separate reply subjects for each pull request sent with
Fetch()
orFetchBatch()
. This allows us to ignore timeouts from different calls.