Unsubscribe does not allow consumption of already fetched messages #77

Closed
nickcarenza opened this issue Sep 22, 2015 · 19 comments

Comments

@nickcarenza

Perhaps there is a good reason for this, but it appears that Unsubscribe sets its message channel to nil, which is potentially a destructive action.

Unless there is another way to consume messages that have already been fetched by the client, I would expect the channel to be closed to prevent future writes but to remain available for reads.

s.Unsubscribe()
// ...consume remaining messages in channel

I am working on a PR that addresses this. I am curious though if this was intended behavior and if I am misusing it.
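
A minimal sketch of the channel behavior being asked for, in plain Go with no NATS dependency. It only illustrates the language semantics: a closed buffered channel rejects further writes but still hands out what it already holds. The channel and the `drainClosed` helper are stand-ins invented for this illustration, not part of the client library.

```go
package main

import "fmt"

// drainClosed reads every value still buffered in a closed channel.
// A receive on a closed channel yields the buffered values first and only
// then reports that the channel is closed, so queued values are not lost.
func drainClosed(ch <-chan string) []string {
	var out []string
	for m := range ch {
		out = append(out, m)
	}
	return out
}

func main() {
	// Stand-in for a subscription's message channel (hypothetical).
	ch := make(chan string, 8)
	ch <- "msg-1"
	ch <- "msg-2"
	ch <- "msg-3"
	close(ch) // further writes would panic; reads still succeed

	fmt.Println(drainClosed(ch)) // prints [msg-1 msg-2 msg-3]
}
```

Setting the channel to nil instead would make every later receive block forever, which is why the two choices differ for a caller that wants to consume already fetched messages.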

@nickcarenza
Author

Is the preferred way to use an encoded conn and BindRecvQueueChan instead?

@derekcollison
Member

Unsubscribe should be immediate, hence why the channel is closed. If you want more control, you could auto-unsubscribe if you know how many messages you want, or use Go channels externally as you suggest.

@nickcarenza
Author

I need to unsubscribe without losing any messages that have already been sent to the client. Because I am subscribed as part of a queue group, if I lose messages after they have already been sent, there isn't a way to recover them.

In order to not lose messages, the unsubscribe message would need to be sent to and processed by nats and all messages before it processed by the client and read on the subscriber channel.

To accomplish this, I am testing moving the unsubscribe request ahead of the call to removeSub and waiting for a flush before proceeding.

I realize this is not appropriate for all use cases but I would think more appropriate for queue group subscribers.

@derekcollison
Member

What is the condition by which you will unsubscribe? Is this a shutdown
operation? What happens if the queue worker fails? In general the requestor
should have a timeout and retry (or do other logic) if the request is not
answered. The responders, in my opinion, even as part of a queue group,
should be simple and assumed to easily fail or be stopped.


@nickcarenza
Author

Thank you for your attention on this issue so far @derekcollison.

I am trying to do a graceful shutdown of a queue-group subscriber using the pub/sub model. The publisher doesn't know about the number of potential queue-group subscribers, so I don't think I could use the request/reply model (see my multiple-reply gist).

@nickcarenza
Author

Any further thought on this issue @derekcollison?

@mcqueary

Nick, just curious, did you see the comment Derek made on your gist? The best practice for the way you're using request-reply is to use Request() and specify a timeout. This ensures that your requestor can retry the request if it times out, and not worry about losing anything, regardless of whether (or when) queue subscribers come and go.

@kozlovic
Member

Nick,

If I understand your issue, it is that messages are sent to a subscriber that is going away, and all "pending" messages for that subscriber are lost, since they were given to this subscriber alone because the subscriber is a "queue subscriber".

Pub sends 1, 2, 3, 4, 5, 6 => Server => sub1: 1, 3, 5
                                     => sub2: 2, 4, 6

Sub2 processes 2 and then unsubscribes, so messages 4 and 6 that were sent to it are "lost" (meaning that they are not going to be processed).

If my understanding is correct, Derek's and Larry's answers are still valid. By using request-reply, you let the publisher (requestor) know that someone has indeed processed this message. In the example above, since sub2 would stop after message 2, the requests for messages 4 and 6 would time out, so they could be resent. Another subscriber (sub1 here, or another one if more are started in between) would be able to process those messages.

Does this work for you?

@nickcarenza
Author

@kozlovic @mcqueary The request-reply model works if the requestor knows or cares about the number of queue groups. My gist just demonstrated to me that request-reply would work with multiple queue group subscribers because I wasn't sure if it would.

Since the publisher doesn't know how many queue groups might be subscribed to any particular message (maybe none are), it can't know if the number of replies it has gotten is enough.

@kozlovic This is how the system looks using your diagram style

Pub sends 1, 2, 3, 4, 5, 6 => Server => groupA-sub1: 1, 3, 5
                                     => groupA-sub2: 2, 4, 6
                                     ...
                                     => groupN-sub1: 1, 2, 3
                                     => groupN-sub2: 4, 5, 6

It might help too to understand what these different subscribers are doing. Each group is responsible for processing events independently and dispatching them to 3rd party services. I want to be able to integrate with any new 3rd party service by creating a new group that is responsible for translating our events into the format expected by the new 3rd party service. The app that's originating the events isn't aware of the services we are integrating with.

The only service that knows how many queue groups are interested in a subject is nats.

@derekcollison
Member

So are you looking for a barrier from the requestor side for app-level acks? There may be more that exists that the requestor may not care about. If N becomes large, then you may want to use a delegate pattern where the original requestor knows about the delegator only and the delegator understands the downstream processing groups.

@kozlovic
Member

@nickcarenza Ok, I think I understand now. As you have seen, solving this problem would require coding at the application level. NATS is really fire-and-forget and there is no way to have the client fetch only one message at a time (I am not talking about auto-unsubscribe, but really getting one message at a time from the server until the application decides that it wants to unsubscribe).

You could follow Derek's advice, or another approach would be to have each subscriber communicate with the publisher when starting (they would send their group name, for instance). The publisher would then maintain a list, know whether this is a new group or not, and adjust the number of expected replies (on a per-message basis, since groups could be added or removed dynamically, I suppose). There is much more to that, but that would be one way.

But I think what you are asking for is this: the application calls sub.Unsubscribe(), which causes the server, on receiving the protocol message, to remove this subscriber from the group so that new messages go only to the other members of the group. The application would still be able, perhaps through a specific API call, to get all the previously fetched messages, and would then, I guess, resend those messages from the subscriber in the hope that others pick them up. Do I understand you correctly?

Still, you cannot ignore that even with that call, you could still lose those messages if the sub, before having a chance to resend those messages, crashes. So from a logical point of view, I think the publisher should be the one deciding if a message is correctly processed by all groups, which makes us go back to the application solutions in paragraph 2...

@mcqueary

@nickcarenza
TL;DR (and edit) It seems @kozlovic and I came to more or less the same conclusion independently about using a separate registration or 'who's alive' protocol (or perhaps both). Like him, I'm unsure if you have remaining doubts about possible edge cases with Unsubscribe.

I understand now. To summarize: Since NATS currently offers no built-in delivery guarantee, you need a way to ensure message delivery to all queue groups subscribed to subject FOO. NATS request-reply is insufficient to this task because your requestor has no a priori knowledge of how many queue groups are subscribed to FOO, in order to know how many responses to expect.

If I have stated the problem correctly, then a few more questions:

  • At what rate is/are the publisher(s) publishing messages to FOO?
  • How large might the number of 3rd party services become?
  • How often might 3rd party services (and hence their queue groups) come and go?

If the number of services (queue groups) isn't rapidly changing (i.e. <10 second intervals), you might try creating a simple "Who's Alive" protocol, where the publisher polls the subject to find out how many service queue groups are up. The publisher simply counts the number of responses, which identifies the number of queue groups, and uses this number as the number of acknowledgements to wait for when it sends a 'data' message.

It is simple-minded, and not foolproof, but it will work.

Since doing that every time you publish a message would create unnecessary overhead, you might consider having the publisher perform this "Who's Alive" request in a separate thread/goroutine periodically and update a global variable. Then for every new data message, perform a Request and use this gvar value as the number of responses to expect.

@nickcarenza
Author

I think I have settled on an approach that is a combination of all of your helpful suggestions. As @derekcollison suggested, I have created a delegator that is a thin layer between the app and nats. And as @mcqueary and @kozlovic suggested, I need to make that delegator aware of how many subscribers to expect.

This approach comes with a new challenge in the case of a failure: resending the message to failed queue groups or rebroadcasting to all and deduping. There isn't currently a way in nats to publish a message to a particular queue group, so I need an additional way for workers to receive messages (perhaps subscribing to the subject name prefixed with the group name). I don't think deduping would be viable in my case.
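
One way the group-prefixed subject idea might look, as a sketch only. The `<group>.<subject>` naming scheme and the `retrySubjects` helper are assumptions made for illustration; nothing in NATS prescribes them.

```go
package main

import (
	"fmt"
	"sort"
)

// retrySubjects returns, for every expected group that has not acknowledged
// a message, a group-specific subject of the assumed form "<group>.<subject>".
// Workers in each group would also subscribe to their own prefixed subject.
func retrySubjects(subject string, expected []string, acked map[string]bool) []string {
	var out []string
	for _, group := range expected {
		if !acked[group] {
			out = append(out, group+"."+subject)
		}
	}
	sort.Strings(out) // stable order for logging and tests
	return out
}

func main() {
	expected := []string{"groupA", "groupB", "groupC"}
	acked := map[string]bool{"groupA": true, "groupC": true}

	// Only groupB missed the message, so only its subject gets a resend.
	fmt.Println(retrySubjects("events", expected, acked)) // prints [groupB.events]
}
```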

While such a system seems necessary for message durability in the case of non-graceful shutdowns, wouldn't it be helpful in more modest cases for the client to not destroy messages? Do you gentlemen have any thoughts on the solution I proposed earlier and have been using?

In order to not lose messages, the unsubscribe message would need to be sent to and processed by nats and all messages before it processed by the client and read on the subscriber channel.

To accomplish this, I am testing moving the unsubscribe request ahead of the call to removeSub and waiting for a flush before proceeding.

https://github.com/the-control-group/nats/blob/master/nats.go#L1415-L1424
instead of
https://github.com/nats-io/nats/blob/master/nats.go#L1409-L1419

EDIT If Unsubscribe did do something like this, perhaps it would be in a new method like UnsubscribeSync(timeout time.Duration) or something like that since it is significantly different.

@kozlovic
Member

@nickcarenza I am not sure how making the unsubscribe followed by a flush followed by the removal of the subscription helps you in any way. The call to removeSub is going to get rid of all the "pending" messages. I thought that this is what you did not want, that is, you wanted to preserve those messages. Am I missing something?

@nickcarenza
Author

@kozlovic Once NATS receives the unsub, it will stop sending messages for the subject to the client; then it will receive the PING and send a PONG. By waiting for the PONG, I can be sure that every message received before it has already been processed by the client and that no more messages will follow it.
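
The ordering argument above can be sketched as a toy simulation. This is not client code: `frame` and `drainUntilPong` are hypothetical names, and the sketch assumes what the argument assumes, namely that the server handles UNSUB and PING from one connection in order and that its replies arrive on a single ordered stream.

```go
package main

import "fmt"

// frame is a simplified inbound protocol frame: either a message for the
// subscription or the PONG that answers the client's PING.
type frame struct {
	pong    bool
	payload string
}

// drainUntilPong reads inbound frames in arrival order and returns every
// message that arrived before the PONG. Because the PING was sent after the
// UNSUB, nothing for this subscription should follow the PONG.
func drainUntilPong(inbound <-chan frame) []string {
	var msgs []string
	for f := range inbound {
		if f.pong {
			break
		}
		msgs = append(msgs, f.payload)
	}
	return msgs
}

func main() {
	// Messages 4 and 6 were already in flight when the client sent UNSUB
	// followed by PING; the server's PONG comes after them on the stream.
	inbound := make(chan frame, 3)
	inbound <- frame{payload: "4"}
	inbound <- frame{payload: "6"}
	inbound <- frame{pong: true}

	fmt.Println(drainUntilPong(inbound)) // prints [4 6]
}
```

Only after the PONG is seen would the client-side subscription be removed, which is the reordering of the unsubscribe request and removeSub proposed earlier in the thread.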

@ColinSullivan1
Member

@nickcarenza: I thought I'd offer a slight variation on Larry's suggestion:
The subscribers (third party services) could publish a heartbeat message
with the queue group name in it at a periodic interval, rather than waiting
for the publisher to solicit them for this information. From these
heartbeat messages, the publisher maintains a list of known queue groups,
and thus the number of responses to wait for. With this, the publisher
could detect a missing queue group before sending requests, allowing
corrective action, logging, etc. to occur earlier.

When a subscriber comes online, it immediately sends a heartbeat so the
publisher is aware of it. If the publisher has been started after the
subscribers, it would likely have to solicit heartbeats (Larry's
suggestion), or wait and collect heartbeats for the right period of time.

Regarding recovery after an error, a backchannel as you suggested would let
the publisher resend data to a particular queue group. If the
publisher hasn't received a response from a particular queue group, it
could periodically retry on a subject unique to that particular third party
service. This keeps the handling of a failure at the application level,
allowing you to handle a much wider set of failure cases (e.g. a crash while
processing a received message) than you could by relying solely on NATS.


@kozlovic
Member

@nickcarenza you are right!

@mcqueary

@ColinSullivan1 I nearly suggested exactly what you've suggested here: a 'beacon' message published at intervals. I think the only drawback is needing to prune the list if a queue group simply goes away permanently.

@ColinSullivan1
Member

Yep, that pruning could come from a shutdown message sent by the third
party service application.

