Unsubscribe does not allow consumption of already fetched messages #77
Comments
Is the preferred way to use an encoded conn and BindRecvQueueChan instead? |
Unsubscribe should be immediate, hence why the channel is closed. If you want more control, you could auto-unsubscribe if you know how many messages you want, or use Go channels externally as you suggest. |
I need to unsubscribe without losing any messages that have already been sent to the client. Because I am subscribed as part of a queue group, if I lose messages after they have already been sent, there isn't a way to recover them. To avoid losing messages, the unsubscribe request would need to be sent to and processed by nats, and every message sent before it would need to be processed by the client and read from the subscriber channel. To accomplish this, I am testing moving the unsubscribe request ahead of the call to removeSub and waiting for a flush before proceeding. I realize this is not appropriate for all use cases, but I would think it is more appropriate for queue group subscribers. |
What is the condition by which you will unsubscribe? Is this a shutdown On Mon, Sep 28, 2015 at 6:45 PM, Nick Carenza notifications@github.com
|
Thank you for your attention on this issue so far @derekcollison. I am trying to do a graceful shutdown of a queue-group subscriber using the pub/sub model. The publisher doesn't know about the number of potential queue-group subscribers so I don't think I could use the request/reply model multiple-reply-gist. |
Any further thought on this issue @derekcollison? |
Nick, just curious, did you see the comment Derek made on your gist? The best practice for the way you're using request-reply is to use |
Nick, if I understand correctly, your issue is that messages are sent to a subscriber that is going away, and all "pending" messages for that subscriber are lost, since they were given to this subscriber alone because the subscriber is a "queue subscriber".
Sub2 processes 2 and then unsubscribes, so messages 4 and 6 that were sent to it are "lost" (meaning that they are not going to be processed). If my understanding is correct, Derek and Larry's answers are still valid. By using request-reply, you let the publisher (requestor) know that someone has indeed processed this message. In the example above, since sub2 would stop after message 2, the requests for messages 4 and 6 would time out, so they could be resent. Another subscriber (sub1 here, or another if more are started in between) would be able to process those messages. Does this work for you? |
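The timeout-then-resend idea can be sketched with the standard library alone. This is only an illustration under assumptions: `requestFunc`, `requestWithRetry`, and `errTimeout` are hypothetical names, and `requestFunc` stands in for whatever request-reply call the publisher actually makes (the timeout is assumed to live inside that call).

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a hypothetical error returned when no reply arrives in time.
var errTimeout = errors.New("request timed out")

// requestFunc is a hypothetical stand-in for one request-reply round trip.
type requestFunc func(payload string) (string, error)

// requestWithRetry resends payload until a reply arrives or attempts run out.
// attempts is expected to be at least 1.
func requestWithRetry(req requestFunc, payload string, attempts int) (string, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		reply, err := req(payload)
		if err == nil {
			return reply, nil
		}
		lastErr = err // remember why this attempt failed, then resend
	}
	return "", fmt.Errorf("no reply after %d attempts: %w", attempts, lastErr)
}

func main() {
	calls := 0
	// Fake transport: the first request goes to a subscriber that has gone
	// away (so it times out); the resend is picked up by another member.
	fake := func(payload string) (string, error) {
		calls++
		if calls == 1 {
			return "", errTimeout
		}
		return "processed " + payload, nil
	}
	reply, err := requestWithRetry(fake, "4", 3)
	fmt.Println(reply, err, calls)
}
```

Note that a resend can produce duplicates if the original message was in fact processed but its reply was lost, so the subscriber side would need to tolerate that.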
@kozlovic @mcqueary The request-reply model works if the requestor knows or cares about the number of queue groups. My gist just demonstrated to me that request-reply would work with multiple queue group subscribers because I wasn't sure if it would. Since the publisher doesn't know how many queue groups might be subscribed to any particular message (maybe none are), it can't know if the number of replies it has gotten is enough. @kozlovic This is how the system looks using your diagram style
It might help too to understand what these different subscribers are doing. Each group is responsible for processing events independently and dispatching them to 3rd party services. I want to be able to integrate with any new 3rd party service by creating a new group that is responsible for translating our events into the format expected by the new 3rd party service. The app that's originating the events isn't aware of the services we are integrating with. The only service that knows how many queue groups are interested in a subject is nats. |
So are you looking for a barrier from the requestor side for app level acks? There may be more that exists that the requestor may not care about. If N becomes large then you may want to use a delegate pattern where the original requestor knows about the delegator only and the delegator understands the downstream processing groups. |
@nickcarenza Ok, I think I understand now. As you have seen, solving this problem would require coding at the application level. NATS is really fire-and-forget and there is no way to have the client fetch only one message at a time (I am not talking about auto-unsubscribe, but really getting one message at a time from the server until the application decides that it wants to unsubscribe). You could follow Derek's advice, or another approach would be to have each subscriber communicate with the publisher when starting (they would send their group name, for instance). The publisher would then maintain a list, know whether this is a new group or not, and adjust the number of expected replies (on a per-message basis, since groups could be added or removed dynamically, I suppose). There is much more to that, but that would be one way. But I think what you are asking is whether the application could call sub.Unsubscribe() (which would cause the server, when receiving the protocol, to remove this subscriber from the group and cause new messages to go only to the other members of the group), but still have a specific API call to get all the previously fetched messages; then you would, I guess, resend those messages from the subscriber, hoping that others will pick them up. Do I understand you correctly? Still, you cannot ignore that even with that call, you could still lose those messages if the sub crashes before having a chance to resend them. So from a logical point of view, I think the publisher should be the one deciding if a message is correctly processed by all groups, which makes us go back to the application solutions in paragraph 2... |
@nickcarenza I understand now. To summarize: Since NATS currently offers no built-in delivery guarantee, you need a way to ensure message delivery to all queue groups subscribed to subject If I have stated the problem correctly, then a few more questions:
If the number of services (queue groups) isn't rapidly changing (i.e. <10 second intervals), you might try creating a simple "Who's Alive" protocol, where the publisher polls the subject to find out how many service queue groups are up. The publisher simply counts the number of responses, which identifies the number of queue groups, and uses this number as the number of acknowledgements to wait for when it sends a 'data' message. It is simple-minded, and not foolproof, but it will work. Since doing that every time you publish a message would create unnecessary overhead, you might consider having the publisher perform this "Who's Alive" request in a separate thread/goroutine periodically and update a global variable. Then for every new data message, perform a Request and use this gvar value as the number of responses to expect. |
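A rough sketch of the counting step in such a "Who's Alive" poll, using only the standard library. Everything here is an assumption for illustration: `countGroups`, `expectedAcks`, and the idea that each reply carries a group name are hypothetical, and the `replies` channel stands in for whatever delivers the poll responses. In a real publisher the poll would run periodically in its own goroutine.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// expectedAcks is the shared value the publisher reads before each request.
// It is accessed atomically because the poller runs in another goroutine.
var expectedAcks int64

// countGroups collects replies until the channel closes or the timeout
// elapses, and returns the number of distinct group names seen.
func countGroups(replies <-chan string, timeout time.Duration) int {
	seen := make(map[string]struct{})
	deadline := time.After(timeout)
	for {
		select {
		case g, ok := <-replies:
			if !ok {
				return len(seen)
			}
			seen[g] = struct{}{} // several members of one group count once
		case <-deadline:
			return len(seen)
		}
	}
}

func main() {
	replies := make(chan string, 4)
	replies <- "billing"
	replies <- "email"
	replies <- "billing" // a second member of the same queue group
	close(replies)

	n := countGroups(replies, 50*time.Millisecond)
	atomic.StoreInt64(&expectedAcks, int64(n))
	fmt.Println("acks to wait for:", atomic.LoadInt64(&expectedAcks))
}
```

As noted above, this is not foolproof: a group that starts or stops between two polls makes the stored count stale until the next poll.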
I think I have settled on an approach that combines all of your helpful suggestions. As @derekcollison suggested, I have created a delegator that is a thin layer between the app and nats. And as @mcqueary and @kozlovic suggested, I need to make that delegator aware of how many subscribers to expect. This approach comes with a new challenge in the case of a failure: resending the message to failed queue groups, or rebroadcasting to all and deduping. There isn't currently a way in nats to publish a message to a particular queue group, so I need an additional way for workers to receive messages (perhaps subscribing to the subject name prefixed with the group name). I don't think deduping would be viable in my case. While such a system seems necessary for message durability in the case of non-graceful shutdowns, wouldn't it be helpful in more modest cases for the client to not destroy messages? Do you gentlemen have any thoughts on the solution I proposed earlier and have been using?
https://github.com/the-control-group/nats/blob/master/nats.go#L1415-L1424 EDIT If Unsubscribe did do something like this, perhaps it would be in a new method like |
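To make the "subject name prefixed with the group name" idea concrete, here is a small standard-library sketch of how a delegator might work out which groups need a resend. The names `retrySubject` and `missingAcks`, and the `<group>.<subject>` layout, are assumptions made for illustration; nothing here is part of the NATS client.

```go
package main

import (
	"fmt"
	"sort"
)

// retrySubject builds the per-group subject that a worker would subscribe to
// in addition to the shared subject. The "<group>.<subject>" layout is an
// assumption for this sketch.
func retrySubject(group, subject string) string {
	return group + "." + subject
}

// missingAcks returns, in sorted order, the expected groups that have not
// acknowledged a message.
func missingAcks(expected []string, acked map[string]bool) []string {
	var missing []string
	for _, g := range expected {
		if !acked[g] {
			missing = append(missing, g)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	expected := []string{"email", "billing", "audit"}
	acked := map[string]bool{"billing": true}

	// Resend only to the groups that did not acknowledge, so groups that
	// already processed the message do not need to dedupe.
	for _, g := range missingAcks(expected, acked) {
		fmt.Println("resend on", retrySubject(g, "events.created"))
	}
}
```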
@nickcarenza I am not sure how making the unsubscribe followed by a flush followed by the removal of the subscription helps you in any way. The call to removeSub is going to get rid of all the "pending" messages. I thought that this is what you did not want, that is, you wanted to preserve those messages. Am I missing something? |
@kozlovic Once NATS receives the unsub, it will stop sending messages for the subject to the client; then it will receive the PING and send a PONG. By waiting for the PONG, I can be sure that every message received before it has already been processed by the client and that no more messages will follow it. |
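The ordering argument can be illustrated with a toy model that does not touch the real client. The `frame` type and `drainUntilPong` function are hypothetical; the channel stands in for the ordered connection, on which the server's PONG arrives after every message it sent before handling the unsub.

```go
package main

import "fmt"

// frame is a simplified stand-in for one protocol line from the server.
type frame struct {
	kind    string // "MSG" or "PONG"
	payload string
}

// drainUntilPong consumes frames in arrival order and returns every message
// that arrived before the PONG. In this model the PONG acts as a barrier:
// the server handled the unsub before the PING, so no message for this
// subscription can follow the PONG.
func drainUntilPong(inbound <-chan frame) []string {
	var delivered []string
	for f := range inbound {
		if f.kind == "PONG" {
			break
		}
		delivered = append(delivered, f.payload)
	}
	return delivered
}

func main() {
	inbound := make(chan frame, 8)
	// Messages that were already in flight when the client sent the unsub
	// followed by a PING.
	inbound <- frame{"MSG", "4"}
	inbound <- frame{"MSG", "6"}
	inbound <- frame{"PONG", ""}

	fmt.Println(drainUntilPong(inbound))
}
```

Only once the barrier has been reached is it safe, in this model, to tear down the subscription state without discarding delivered messages.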
@nickcarenza: I thought I'd offer a slight variation on Larry's suggestion: When a subscriber comes up online, it immediately sends a heartbeat so the Regarding recovery after an error, a backchannel as you suggested would let On Fri, Oct 16, 2015 at 1:55 PM, Larry McQueary notifications@github.com
Colin Sullivan | Principal Engineer |
@nickcarenza you are right! |
@ColinSullivan1 I nearly suggested exactly what you've suggested here -- a 'beacon' message published at intervals. I think the only drawback is needing to prune the list if a queue group simply goes away permanently. |
Yep, that pruning could come from a shutdown message sent by the third On Fri, Oct 16, 2015 at 2:53 PM, Larry McQueary notifications@github.com
Colin Sullivan | Principal Engineer |
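A minimal sketch of the beacon bookkeeping discussed here, covering both kinds of pruning: an explicit shutdown message and a missed-heartbeat timeout. The `registry` type and its methods are hypothetical, timestamps are plain Unix seconds to keep the example small, and the type is not safe for concurrent use without a mutex.

```go
package main

import (
	"fmt"
	"time"
)

// registry tracks the last heartbeat seen from each queue group.
type registry struct {
	ttl      int64            // seconds a group may stay silent before pruning
	lastSeen map[string]int64 // group name -> Unix time of last heartbeat
}

func newRegistry(ttlSeconds int64) *registry {
	return &registry{ttl: ttlSeconds, lastSeen: make(map[string]int64)}
}

// beat records a heartbeat from group at time now (Unix seconds).
func (r *registry) beat(group string, now int64) {
	r.lastSeen[group] = now
}

// remove drops a group immediately, for example on a shutdown message.
func (r *registry) remove(group string) {
	delete(r.lastSeen, group)
}

// live prunes groups that have been silent for longer than ttl and returns
// how many groups remain.
func (r *registry) live(now int64) int {
	for g, t := range r.lastSeen {
		if now-t > r.ttl {
			delete(r.lastSeen, g) // deleting during range is allowed in Go
		}
	}
	return len(r.lastSeen)
}

func main() {
	now := time.Now().Unix()
	reg := newRegistry(10)
	reg.beat("billing", now-30) // stale: last heartbeat 30 seconds ago
	reg.beat("email", now)
	reg.beat("audit", now)
	reg.remove("audit") // explicit shutdown message received
	fmt.Println("groups expected to ack:", reg.live(now))
}
```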
Perhaps there is a good reason for this, but it appears that Unsubscribe sets its message channel to nil, which is potentially a destructive action unless there is another way to consume messages that have already been fetched by the client. I would expect the channel to be closed to prevent future writes but to remain available for reads.
I am working on a PR that addresses this. I am curious though if this was intended behavior and if I am misusing it.
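For reference, the expectation above matches how Go channels behave in general: closing a buffered channel forbids further sends but leaves already-buffered values readable. A small standard-library illustration follows; the `drain` helper and the string payloads are hypothetical stand-ins and not part of the NATS client.

```go
package main

import "fmt"

// drain reads every value still buffered in a closed channel.
func drain(ch <-chan string) []string {
	var out []string
	for m := range ch { // the loop ends once the closed channel is empty
		out = append(out, m)
	}
	return out
}

func main() {
	ch := make(chan string, 4)
	ch <- "msg-1"
	ch <- "msg-2"
	close(ch) // further sends would panic, but reads still succeed

	fmt.Println(drain(ch))
}
```

If the only reference to the channel is set to nil instead, a caller going through that reference can no longer reach the buffered values, which is the destructive behavior described above.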