Update publisher to re-connect #33
```diff
@@ -5,13 +5,15 @@ import (
 	"fmt"
 	"log"

+	nats "github.com/nats-io/go-nats"
 	"github.com/nats-io/go-nats-streaming"
 	"github.com/openfaas/faas/gateway/queue"
 )

 // NatsQueue queue for work
 type NatsQueue struct {
-	nc        stan.Conn
+	stanConn  stan.Conn
+	natsConn  *nats.Conn
 	ClientID  string
 	ClusterID string
 	NATSURL   string
```
```diff
@@ -27,20 +29,32 @@ func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQu
 	clientID := clientConfig.GetClientID()
 	clusterID := "faas-cluster"

-	nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
 	queue1 := NatsQueue{
-		nc:        nc,
 		ClientID:  clientID,
 		ClusterID: clusterID,
 		NATSURL:   natsURL,
 		Topic:     "faas-request",
 	}

+	natsConn, err := nats.Connect(natsURL, nats.ReconnectHandler(queue1.reconnectClient(clientID, clusterID, natsURL)))
+	if err != nil {
+		return nil, err
+	}
+
+	stanConn, err := stan.Connect(clusterID, clientID, stan.NatsConn(natsConn))
+	if err != nil {
+		return nil, err
+	}
+
+	queue1.natsConn = natsConn
+	queue1.stanConn = stanConn
+
 	return &queue1, err
 }

 // Queue request for processing
 func (q *NatsQueue) Queue(req *queue.Request) error {

 	var err error

 	fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function)
```
```diff
@@ -50,7 +64,26 @@ func (q *NatsQueue) Queue(req *queue.Request) error {
 		log.Println(err)
 	}

-	err = q.nc.Publish(q.Topic, out)
+	err = q.stanConn.Publish(q.Topic, out)
```
Review comment on `err = q.stanConn.Publish(q.Topic, out)`:

> @kozlovic I find that this code blocks indefinitely if I scale NATS Streaming to zero replicas. It would be better to check whether the stanConn is ready/healthy before submitting the work, or potentially to go into a back-off loop.

Reply:

> Note that the reconnect handler is async, so if you replace the connection there and access it here, you need some locking to avoid races.
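The locking concern in the reply above can be addressed by guarding the streaming connection with a `sync.RWMutex`: publishers take a read lock, and the reconnect handler swaps the connection under the write lock. A minimal runnable sketch, using a hypothetical `conn` interface and `fakeConn` in place of `stan.Conn` so that no NATS server is required:

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for stan.Conn so the locking
// pattern can run without a NATS server.
type conn interface {
	Publish(subject string, data []byte) error
}

type fakeConn struct{ id int }

func (f *fakeConn) Publish(subject string, data []byte) error { return nil }

// NatsQueue guards the streaming connection with an RWMutex so the
// async reconnect handler can replace it without racing Publish.
type NatsQueue struct {
	mu       sync.RWMutex
	stanConn conn
	Topic    string
}

// Queue publishes under a read lock: many publishers may proceed
// concurrently, but never while the connection is being swapped.
func (q *NatsQueue) Queue(out []byte) error {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.stanConn.Publish(q.Topic, out)
}

// swapConn is what the reconnect handler would call: it replaces
// the connection under the write lock.
func (q *NatsQueue) swapConn(c conn) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.stanConn = c
}

func main() {
	q := &NatsQueue{stanConn: &fakeConn{id: 1}, Topic: "faas-request"}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = q.Queue([]byte("payload"))
		}()
	}
	q.swapConn(&fakeConn{id: 2}) // safe to swap while publishers run
	wg.Wait()
	fmt.Println("published without races")
}
```

In the real publisher, `conn` would be `stan.Conn` and `swapConn` would be invoked from inside `reconnectClient` before the old connection is closed.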
```diff

 	return err
 }
+
+func (q *NatsQueue) reconnectClient(clientID, clusterID, natsURL string) nats.ConnHandler {
+	return func(c *nats.Conn) {
+		oldConn := q.stanConn
+
+		defer oldConn.Close()
```
Review comment on `defer oldConn.Close()`:

> With the way you handle reconnect (at the NATS low level), it is possible that the Streaming server still has the connection registered (remember, the streaming server connects to NATS and clients connect to NATS, so there is no direct connection between client and server) because it did not detect the missed heartbeats from the client. So this is pretty broken, actually: it is possible that your Connect() below will fail as a duplicate (the same clientID will be detected as a duplicate by the Streaming server, which will contact the old connection at its INBOX, and that will succeed since the old connection has reconnected to NATS), and yet you will still close the old connection. So with this current model, I would first close the old connection and then create the new one.
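The ordering fix the comment above asks for (close the old streaming connection before dialing the new one, so the server frees the clientID) can be sketched as follows. `stanConn` and `connect` are hypothetical stand-ins for `stan.Conn` and `stan.Connect`, recording events so the ordering is testable without a server:

```go
package main

import "fmt"

// events records the order of close/connect calls so the ordering
// fix (close old before dialing new) is visible.
var events []string

// stanConn is a hypothetical stand-in for stan.Conn.
type stanConn struct{ clientID string }

func (c *stanConn) Close() error {
	events = append(events, "close:"+c.clientID)
	return nil
}

// connect stands in for stan.Connect; the real call could fail with
// a duplicate-clientID error if the old connection were still
// registered with the Streaming server.
func connect(clientID string) (*stanConn, error) {
	events = append(events, "connect:"+clientID)
	return &stanConn{clientID: clientID}, nil
}

type queue struct{ conn *stanConn }

// reconnect closes the old streaming connection BEFORE creating the
// new one, per the review comment above: otherwise the server may
// still hold the old clientID and reject the new Connect() as a
// duplicate, while the handler closes the old connection anyway.
func (q *queue) reconnect(clientID string) error {
	if q.conn != nil {
		q.conn.Close() // close first, so the clientID is released
	}
	c, err := connect(clientID)
	if err != nil {
		return err
	}
	q.conn = c
	return nil
}

func main() {
	q := &queue{conn: &stanConn{clientID: "faas-publisher"}}
	if err := q.reconnect("faas-publisher"); err != nil {
		panic(err)
	}
	fmt.Println(events) // close is recorded before connect
}
```

Contrast this with the diff above, which defers the `Close()` until after the new `stan.Connect` has run.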
```diff
+
+		stanConn, err := stan.Connect(clusterID, clientID, stan.NatsConn(c))
+		if err != nil {
+			log.Printf("Failed to reconnect to NATS\n%v", err)
+			return
+		} else {
+			log.Printf("Reconnected to NATS\n")
+		}
+
+		q.stanConn = stanConn
+		q.natsConn = c
+	}
+}
```
Review comment:

> In the current go-nats-streaming release, we have added a handler to be notified when the connection is lost. It is a bit too long to explain here, so have a look at this.
>
> Note that your current reconnect handling may still be needed, since once the connection is gone you need to recreate the connection and subscriptions, but maybe you should rely on the stan connection handler instead. I would recommend not passing the NATS connection (let stan create and own it) unless you need specific settings. Note that you can pass the URL to stan with the stan.NatsURL() option.
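The recommendation above might look like the following sketch. It assumes the `stan.SetConnectionLostHandler` and `stan.NatsURL` options from a current go-nats-streaming release and a reachable NATS Streaming server, so it is illustrative rather than a drop-in replacement for the diff:

```go
// Sketch: let stan create and own the NATS connection, and rely on
// the connection-lost handler instead of wiring a low-level
// nats.ReconnectHandler ourselves.
sc, err := stan.Connect(clusterID, clientID,
	stan.NatsURL(natsURL), // stan dials and owns the NATS connection
	stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
		log.Printf("Connection lost: %v; recreating connection and subscriptions", reason)
		// The stan connection is gone for good at this point, so
		// recreate it (and any subscriptions) here, ideally with a
		// back-off loop rather than a single retry.
	}))
if err != nil {
	return nil, err
}
```

With this shape the publisher no longer needs to hold a `*nats.Conn` at all, which also removes the duplicate-clientID ordering problem discussed above.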