Update publisher to re-connect #33
Conversation
The publisher will now re-connect when NATS Streaming becomes unavailable. Tested within the gateway code on Docker Swarm. Thanks to @vosmith for initiating this work. Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
```diff
@@ -50,7 +64,26 @@ func (q *NatsQueue) Queue(req *queue.Request) error {
 		log.Println(err)
 	}
 
-	err = q.nc.Publish(q.Topic, out)
+	err = q.stanConn.Publish(q.Topic, out)
```
@kozlovic I find that this code blocks indefinitely if I scale NATS Streaming to zero replicas. It would be better to check whether the stanConn is ready/healthy before submitting the work, or potentially to go into a back-off loop.
`Publish()` is a synchronous call that waits for the ack back from the server. The timeout is specified when creating the stan connection with the `PubAckWait(t time.Duration)` option.
If you don't want to block, you can use the async version, `PublishAsync()`, but then you should set up an ack handler if you want to ensure that messages are persisted OK.
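The non-blocking variant described above might look roughly like this. This is a sketch, not the project's code: the cluster ID, client ID, and subject are placeholders, and it assumes the `stan.go` client and a reachable NATS Streaming server.

```go
package main

import (
	"log"
	"time"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// Hypothetical IDs; a NATS Streaming server must be reachable.
	sc, err := stan.Connect("faas-cluster", "faas-publisher",
		stan.PubAckWait(5*time.Second)) // how long publishes wait for a server ack
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// The ack handler is how an async publisher learns whether the
	// message was actually persisted by the server.
	ackHandler := func(guid string, err error) {
		if err != nil {
			log.Printf("message %s was not persisted: %v", guid, err)
			return
		}
		log.Printf("message %s persisted", guid)
	}

	// PublishAsync returns immediately; the handler fires on the ack.
	if _, err := sc.PublishAsync("faas-request", []byte("payload"), ackHandler); err != nil {
		log.Println(err)
	}
}
```

The trade-off is that the caller no longer knows, at the point of the call, whether the message made it; persistence failures surface in the ack handler instead.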
I would recommend having a look at the new client connection lost handler, though this requires running with server 0.10.2.
Regardless, if you keep the reconnect logic here, I believe some changes are required.
```go
		return nil, err
	}

	stanConn, err := stan.Connect(clusterID, clientID, stan.NatsConn(natsConn))
```
In the current go-nats-streaming release, we have added a handler to be notified when the connection is lost. It is a bit too long to explain here, so have a look at this.
Note that your current reconnect handling may still be needed, since once the connection is gone you need to recreate the connection and subscriptions, but maybe you should rely on the stan connection-lost handler instead. I would recommend not passing the NATS connection (let stan create and own it) unless you need specific settings. Note that you can pass the URL to stan with the `stan.NatsURL()` option.
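Wiring up the connection-lost handler and passing the URL (rather than a pre-built NATS connection) might look like the sketch below. The cluster ID, client ID, and URL are placeholders, and this assumes a `stan.go` client recent enough to have `SetConnectionLostHandler`.

```go
package main

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// Let stan create and own the underlying NATS connection by passing
	// the URL directly instead of a pre-built *nats.Conn.
	sc, err := stan.Connect("faas-cluster", "faas-publisher",
		stan.NatsURL("nats://nats:4222"), // hypothetical address
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			// Fired when the client gives up on the streaming connection;
			// recreate the connection (and any subscriptions) from here.
			log.Printf("connection lost: %v", reason)
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()
}
```

The handler fires only once the client itself decides the connection is gone, so it complements rather than replaces the low-level NATS reconnect callbacks.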
```go
	return func(c *nats.Conn) {
		oldConn := q.stanConn

		defer oldConn.Close()
```
With the way you handle reconnect (at the NATS low level), it is possible that the Streaming server still has the connection registered, because it did not detect the missed heartbeats (HBs) from the client. Remember, the streaming server connects to NATS and clients connect to NATS, so there is no direct connection between client and server.
So this is pretty broken, actually: it is possible that your Connect() below will fail due to a duplicate (the same clientID will be detected as a duplicate by the Streaming server, which will contact the old connection on its INBOX, and that will be valid since it has reconnected to NATS), and yet you will still close the old connection.
So with this current model, I would first close the old connection and then create the new one.
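The ordering suggested above could be sketched as follows. This is an illustrative fragment in the shape of the reviewed handler, not the PR's actual code; `clusterID` and `clientID` are assumed to be in scope as in the surrounding file.

```go
// Close the old streaming connection first, then create the new one, so the
// Streaming server deregisters the old clientID before the fresh Connect()
// and does not reject it as a duplicate.
return func(c *nats.Conn) {
	if q.stanConn != nil {
		q.stanConn.Close() // deregister the old clientID first
	}

	stanConn, err := stan.Connect(clusterID, clientID, stan.NatsConn(c))
	if err != nil {
		log.Printf("re-connect failed: %v", err)
		return
	}
	q.stanConn = stanConn
}
```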
```diff
@@ -50,7 +64,26 @@ func (q *NatsQueue) Queue(req *queue.Request) error {
 		log.Println(err)
 	}
 
-	err = q.nc.Publish(q.Topic, out)
+	err = q.stanConn.Publish(q.Topic, out)
```
Note that the reconnect handler is async, so if you replace the connection there, and access it here, you need some locking to avoid races.
Thanks for the feedback. If we moved to the 0.12.0 version, how would you feel about spending half an hour or so writing the patch to fix up the re-connect logic?
@alexellis I did not have time to have a look, but I see that you are now using the 0.4.0 client lib, which has the connection lost handler. If you upgrade to the 0.11.2 server, then you could make use of that. I can try to submit a PR when I get the chance. However, I am not sure how I would test that, so I may ask you to have a go once you have the PR.
@alexellis Is there anything I could do to help fix this issue? Currently we're facing the same issue, and the only way to solve it is to restart the queue-worker and the faas gateway. Thanks
Derek close: implemented via other PRs
🎉🎉🎉
Thanks @vosmith, this has been in both the queue worker and gateway for several releases now. I wanted to close the old PR.
Signed-off-by: Alex Ellis (VMware) alexellis2@gmail.com
Description
The publisher will now re-connect when NATS Streaming becomes
unavailable. Tested within the gateway code on Docker Swarm.
Thanks to @vosmith for initiating this work.
Motivation and Context
#17
How Has This Been Tested?
In the gateway, by scaling NATS Streaming to zero replicas and back again.
Types of changes
@vosmith @kozlovic @stefanprodan how is this looking?
I want to move the work started by @vosmith forward gradually. I realize that an in-memory restart is not ideal, so we're also looking to provide a configuration with some form of "ft" or PV.