NATS streaming reconnect handlers for gateway and queue-workers #17

Closed
vosmith wants to merge 5 commits into master from vosmith:nats-reconnect

Conversation

vosmith commented Mar 16, 2018

Signed-off-by: Vincent Smith vosmith08@gmail.com

This PR adds support for the NATS streaming clients to reconnect to the NATS Streaming servers when a NATS connection is reset.

Description

The gateway handler and the queue-worker both wrap their NATS connections to handle an onReconnect event from the underlying NATS connection. The gateway reconnects to the NATS Streaming server using the same client and cluster IDs. The queue-worker reconnects and resubscribes to the faas-request topic.
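For illustration, a minimal sketch of the idea on the gateway side, assuming the go-nats and go-nats-streaming client APIs; the URL, IDs and structure here are illustrative rather than the exact code in this PR:

```go
package main

import (
	"log"

	nats "github.com/nats-io/go-nats"
	stan "github.com/nats-io/go-nats-streaming"
)

func main() {
	natsURL := "nats://nats:4222"
	clusterID := "faas-cluster"
	clientID := "faas-publisher-example"

	var sc stan.Conn

	// Dial NATS with a reconnect handler that re-establishes the STAN
	// session with the same client and cluster IDs once the underlying
	// NATS connection comes back.
	nc, err := nats.Connect(natsURL, nats.ReconnectHandler(func(c *nats.Conn) {
		s, err := stan.Connect(clusterID, clientID, stan.NatsConn(c))
		if err != nil {
			log.Printf("failed to reconnect to NATS Streaming: %v", err)
			return
		}
		sc = s
		log.Println("reconnected to NATS Streaming")
	}))
	if err != nil {
		log.Fatal(err)
	}

	// Initial STAN connection on top of the NATS connection created above.
	sc, err = stan.Connect(clusterID, clientID, stan.NatsConn(nc))
	if err != nil {
		log.Fatal(err)
	}
	_ = sc // the queue-worker would additionally resubscribe to faas-request here
}
```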

Motivation and Context

Previously, when a NATS connection was reset, the clients would not be able to communicate with the NATS streaming server. This is due to the NATS streaming server running in in-memory mode as detailed in this issue.

  • I have raised an issue to propose this change (required)

openfaas/faas#579

How Has This Been Tested?

Using the OpenFaaS helm chart, I deployed an OpenFaaS stack on a Kubernetes cluster running Kubernetes 1.8.8. I deployed a simple echo function using faas-cli. While tailing the logs of the gateway and the queue-worker pods, I invoked the echo function with --async and watched the logs show the messages being passed successfully. I killed the NATS container and waited for it to start up again. I invoked the function with --async again and waited for the publish ack timeout to occur.

Using a modified helm chart and Docker images, I deployed an OpenFaaS stack with the changes from this PR on the same K8s cluster. I deployed a simple echo function using faas-cli. While tailing the logs of the gateway and the queue-worker pods, I invoked the echo function with --async and watched the messages being passed successfully. I killed the NATS container and waited for it to start up again; the queue-worker printed a log message on successful reconnection. I invoked the function again and saw the messages being passed successfully.

I also ran the function in a loop, invoking it every second. When the NATS container was killed, one publish ack timeout occurred and the subsequent messages were published successfully.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist:

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I've read the CONTRIBUTION guide
  • I have signed-off my commits with git commit -s
  • I have added tests to cover my changes.
  • All new and existing tests passed.
```diff
@@ -26,7 +27,13 @@ func CreateNatsQueue(address string, port int) (*NatsQueue, error) {
 	clientID := "faas-publisher-" + val
 	clusterID := "faas-cluster"
-	nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
+	conn, err := nats.Connect(natsURL, nats.ReconnectHandler(queue1.reconnectClient(clientID, clusterID)))
```

alexellis (Member) Mar 19, 2018

Please can you explain why this changed from stan to nats and what the implications are?

vosmith Mar 19, 2018

In order to define a ReconnectHandler, it must be set directly on the NATS connection. The NATS Streaming connection (created by the stan package) accepts an existing NATS connection as a parameter, so I created a connection with a ReconnectHandler and passed it into stan.

Alternatively, it should be possible to create a connection through stan, then set the ReconnectHandler on the underlying NATS connection with nc.NatsConn().SetReconnectHandler.

After looking deeper into it, the difference here is we are now responsible for closing the NATS connection.
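Both variants, sketched with the same imports as the example above (hedged; onReconnect stands in for whatever callback is attached, and the second variant leaves a small window before the handler is set, as discussed further down):

```go
// Option A: create the NATS connection yourself, then hand it to stan.
func optionA(natsURL, clusterID, clientID string, onReconnect nats.ConnHandler) (stan.Conn, error) {
	nc, err := nats.Connect(natsURL, nats.ReconnectHandler(onReconnect))
	if err != nil {
		return nil, err
	}
	return stan.Connect(clusterID, clientID, stan.NatsConn(nc))
}

// Option B: let stan dial NATS, then set the handler on the underlying connection.
func optionB(natsURL, clusterID, clientID string, onReconnect nats.ConnHandler) (stan.Conn, error) {
	sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
	if err != nil {
		return nil, err
	}
	sc.NatsConn().SetReconnectHandler(onReconnect)
	return sc, nil
}
```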

wallyqs Mar 22, 2018

The NATS client already has internal reconnection logic, so I think using the MaxReconnects option might fit better here. When connecting to NATS Streaming via stan.Connect, the STAN client internally uses the default NATS reconnection logic, which is to attempt to reconnect to the server a few times (reconnecting state) before giving up entirely and closing the connection (the closed state is terminal, so the client will not reconnect again).
By setting the ReconnectHandler and ClosedHandler it is possible to add custom logic or logging after a reconnection, or after the connection has closed (for example, the server could be made to exit when the closed state is reached, meaning the client will not reconnect to NATS again).
It is also possible to make the client reconnect forever, rather than giving up, by setting nats.MaxReconnects(-1), in case that behavior is better suited.
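A sketch of those options together (hedged; same go-nats client API assumptions and imports as above):

```go
nc, err := nats.Connect(natsURL,
	nats.MaxReconnects(-1), // keep reconnecting forever instead of giving up
	nats.ReconnectHandler(func(c *nats.Conn) {
		log.Printf("reconnected to NATS at %s", c.ConnectedUrl())
	}),
	nats.ClosedHandler(func(c *nats.Conn) {
		// terminal state: the client will not try to reconnect again
		log.Println("NATS connection closed")
	}),
)
if err != nil {
	log.Fatal(err)
}
```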

vosmith Mar 23, 2018

Hey @wallyqs. The underlying NATS connection does reconnect as you described. The issue is that the NATS Streaming server loses all of its data, including the client IDs, because it runs in in-memory mode. Because of this, a new streaming connection has to be made to rejoin the cluster and re-register the client.

Btw I am working on restructuring this commit to be more reviewer friendly.

wallyqs Mar 23, 2018

Ok, I might be missing something then... The reconnect handler is called when the NATS client has successfully established a connection to a new server, and the NATS client internally replays all the subscriptions once it has reconnected, so for the STAN client the reconnection to NATS would be transparent.

vosmith Mar 26, 2018

@wallyqs, unfortunately it hasn't worked that way in my experience, and it is documented in nats-io/nats-streaming-server#150. I agree with you that the STAN client should resubscribe, and this might be an issue that could be addressed by that library, but they don't seem to mind (nats-io/go-nats-streaming#153).

```go
return func(c *nats.Conn) {
	nc, err := stan.Connect(clusterID, clientID, stan.NatsConn(c))
	if err != nil {
		log.Println("Failed to reconnect to NATS stream")
```

alexellis (Member) Mar 19, 2018

Does it make sense to bundle up a single error / log statement here with log.Printf()?
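For example, something along these lines (a sketch; the exact wording is illustrative):

```go
log.Printf("Failed to reconnect to NATS Streaming: %v", err)
```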

vosmith Mar 19, 2018

Yes that makes more sense.

```go
clusterID := "faas-cluster"
val, _ := os.Hostname()
clientID := "faas-worker-" + val
var (
```

alexellis (Member) Mar 19, 2018

I would prefer refactoring for style to be done in subsequent PRs as it blurs the intent of the PR and makes it harder to review the change.

```go
req := queue.Request{}
```

alexellis (Member) Mar 19, 2018

I don't know why this changed

```go
}
queryString := ""
if len(req.QueryString) > 0 {
```

alexellis (Member) Mar 19, 2018

Why was this removed?

@alexellis

Thank you for helping us out with this change - and especially for testing it with a failing/passing scenario. That's exactly what we look for from new contributions.

Unfortunately I'm having a hard time reviewing the code because it seems like there has been a fair amount of refactoring and change of style.

I'd rather you applied the minimum changeset to make this work and then raised a separate PR later for your refactoring ideas if that still makes sense.

We also need more detail on the commit message for the history of the codebase. Right now the commit message is blank with just a subject given.

I find this useful as a reference point - https://chris.beams.io/posts/git-commit

vosmith commented Mar 19, 2018

First of all, thank you for the link to the git-commit message post; it was certainly enlightening and I'll definitely be more descriptive in the future!

In broad response to the comments on queue-worker/main.go: I had created the ReconnectHandler for this NATS connection as a separate function, which required access to many variables inside main. I pulled the local variables I needed out into global variables so that both main and reconnectClient would have access to them.

I hadn't considered the nightmare of a diff that this was going to create, so I apologize for that. I can try to put the reconnectClient function inside main; it just seems a bit messy.

alexellis (Member) commented Mar 24, 2018

Hey @vosmith perhaps a practical way to move it forward would be to take a backup of the branch where it is now - then start over from master/head and apply the minimum set of changes to make the fix and force push that back up.

After we've merged we can look at the best way to refactor the code.

How does that sound? Do you have an ETA on when this could be ready?

vosmith commented Mar 26, 2018

Hey @alexellis . Yes I hope to complete the updates this week. 👍

Handle reconnect for STAN clients
With the NATS Streaming server running in memory mode, it loses
all information about previously connected clients and clusters when
it is restarted.  This behaviour requires connected clients to
re-initialize the STAN connections when a restart is detected. This
commit adds reconnect handlers on the NATS connections that
re-initialize the STAN connections.

Signed-off-by: Vincent Smith <vosmith08@gmail.com>

@vosmith vosmith force-pushed the vosmith:nats-reconnect branch from 30ff176 to 724998b Mar 27, 2018

vosmith commented Mar 27, 2018

The command "curl -sSL test.docker.com | sudo -E sh" failed and exited with 100 during .

Hey @alexellis , looks like the build failed due to an issue connecting with the Docker server. Is there a way to trigger this build again without pushing another commit?

kozlovic commented Mar 28, 2018

General comment (will comment specifically on the PR): If you use a NATS Streaming server with memory store, it is true that if the server is restarted, since no state is being restored, the previously "connected" clients will stop receiving messages. Publishers would fail too since the server would reject published messages for unknown client IDs.
The streaming server and streaming clients communicate through some inboxes. When the Streaming server is restarted, since it lost that knowledge, it can't communicate with existing clients. Moreover, even the internal subjects used to communicate between the server and its clients contain a unique id that won't be the same after the restart.

Note: if the NATS Streaming server connects to a non-embedded NATS Server, then a restart of the NATS Server itself is fine: the client library's underlying NATS connection will reconnect and everything keeps working (some timeouts may occur for operations that were in flight when the NATS server was restarted). This is because the Streaming server is still running and its state is maintained, so the communication can continue.

```diff
@@ -27,6 +28,7 @@ func CreateNatsQueue(address string, port int) (*NatsQueue, error) {
 	clusterID := "faas-cluster"
 	nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
+	nc.NatsConn().SetReconnectHandler(queue1.reconnectClient(clientID, clusterID, natsURL))
```

kozlovic Mar 28, 2018

As it is, you should check for err because if there was an error, nc would be nil and you would get a panic here.

But this is a moot point. You should be creating the NATS connection first, configured with the reconnect handler, then pass the NATS connection to the stan.Connect() call. Otherwise, you potentially miss a window between the stan.Connect() call and the nc.NatsConn().SetReconnectHandler() call where the connection may have disconnected/reconnected. It is also easier on reconnect to simply recreate the stan connection (no need to close the connection for which you just got a notification that you were reconnected).

You would need to store the NATS connection (and close it after closing the stan connection) in your NatsQueue structure. By the way, since you would need to store both, I would recommend sc for stan connection and nc for nats connection. Right now, nc is used for the stan connection.
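Putting those suggestions together, a hedged sketch of the shape CreateNatsQueue could take (the signature, field names and logging are illustrative; assumes the go-nats and go-nats-streaming APIs and the imports shown earlier):

```go
type NatsQueue struct {
	nc *nats.Conn // underlying NATS connection, created first
	sc stan.Conn  // streaming (STAN) connection layered on top
}

func CreateNatsQueue(natsURL, clusterID, clientID string) (*NatsQueue, error) {
	q := &NatsQueue{}

	// 1. Create the NATS connection first, already configured with the
	//    reconnect handler, so no reconnect can be missed.
	nc, err := nats.Connect(natsURL, nats.ReconnectHandler(func(c *nats.Conn) {
		// 3. On reconnect, re-create the STAN connection on top of the
		//    same, still valid, NATS connection.
		if q.sc != nil {
			q.sc.Close() // cleanup of the stale streaming session
		}
		sc, err := stan.Connect(clusterID, clientID, stan.NatsConn(c))
		if err != nil {
			log.Printf("Failed to reconnect to NATS Streaming: %v", err)
			return
		}
		q.sc = sc
	}))
	if err != nil {
		return nil, err
	}
	q.nc = nc

	// 2. Pass the NATS connection to stan.Connect() and keep both around.
	sc, err := stan.Connect(clusterID, clientID, stan.NatsConn(nc))
	if err != nil {
		nc.Close()
		return nil, err
	}
	q.sc = sc
	return q, nil
}
```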

```go
func (q *NatsQueue) reconnectClient(clientID, clusterID, natsURL string) nats.ConnHandler {
	return func(c *nats.Conn) {
		c.Close()
		nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
```

kozlovic Mar 28, 2018

See above. If you create the NATS connection, you should here close the stan connection (not the nats one), just for cleanup, then call stan.Connect() again and pass the c connection with the stan.NatsConn() option. (you would not need the NatsURL() option btw).

```go
	log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable)
}
sc.NatsConn().SetReconnectHandler(reconnectHandler)
```

kozlovic Mar 28, 2018

Same story as described in handler.go.

kozlovic commented Mar 28, 2018

Unrelated, but in queue-worker, both durable and unsubscribe are defined but not used at all. Unless you have plans to add durable and unsubscribe support, I would remove them to keep the code cleaner.

vosmith commented Mar 28, 2018

@kozlovic, thanks for the feedback! I'll have those changes ready soon 👍

alexellis (Member) commented Mar 29, 2018

@vosmith try to keep the changes (refactoring/style) to an absolute minimum.

Create NATS connection to pass into STAN
After feedback from the NATS community, the NATS connection should be
created before the STAN connection.  This eliminates the possibility
that a disconnect occurs before the reconnect handler is attached.

Signed-off-by: Vincent Smith <vosmith08@gmail.com>

```diff
@@ -16,6 +16,8 @@ type NatsQueue struct {
 	nc stan.Conn
 }
+var natsConn *nats.Conn
```

kozlovic Mar 29, 2018

You don't need a global var...

```go
nc, err := stan.Connect(clusterID, clientID, stan.NatsConn(natsConn))
if err != nil {
	return nil, err
}
queue1.nc = nc
```

kozlovic Mar 29, 2018

You should also store the nats connection in queue1, in order to close it (if the code ever closes on exit, etc.).

kozlovic Mar 29, 2018

Note: I don't see anywhere a Close() or the like in the handler, but still, keep track of it in case this is changed later.
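If a graceful shutdown were ever added, it could look something like this (hypothetical sketch; no Close path exists in the handler today, and the field names match the sketch earlier):

```go
// Close shuts down the streaming session first, then the underlying NATS connection.
func (q *NatsQueue) Close() {
	if q.sc != nil {
		q.sc.Close()
	}
	if q.nc != nil {
		q.nc.Close()
	}
}
```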

alexellis (Member) Apr 24, 2018

@vosmith have you addressed this?

vosmith Apr 24, 2018

Hey @alexellis , there is no place to close this as the connection should stay open as long as the webserver is up and running. AFAIK there isn't a way to handle a graceful shutdown.

On second thought, it would be a good idea to put the natsConn in queue1, since it gets passed on to the gateway. I'll push a commit in a few.

```go
c.Close()
nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
q.nc.Close()
c.SetReconnectHandler(q.reconnectClient(clientID, clusterID, natsURL))
```

kozlovic Mar 29, 2018

Do not reset the reconnect handler. The nats connection is still valid.

```diff
@@ -28,6 +28,8 @@ type AsyncReport struct {
 	TimeTaken float64 `json:"timeTaken"`
 }
+var natsConn *nats.Conn
```

kozlovic Mar 29, 2018

Don't need this here

```go
client := makeClient()
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+natsAddress+":4222"))
natsConn, err = nats.Connect("nats://" + natsAddress + ":4222")
```

kozlovic Mar 29, 2018

Set the reconnect handler as an option of nats.Connect()
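i.e. something like this (sketch; assumes reconnectHandler is declared before the connect call):

```go
natsConn, err = nats.Connect("nats://"+natsAddress+":4222",
	nats.ReconnectHandler(reconnectHandler))
```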

```diff
@@ -230,7 +240,7 @@ func main() {
 		log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable)
 	}
-	sc.NatsConn().SetReconnectHandler(reconnectHandler)
+	natsConn.SetReconnectHandler(reconnectHandler)
```

```go
nc.Close()
sc, err = stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+natsAddress+":4222"))
sc.Close()
nc.SetReconnectHandler(reconnectHandler)
```

kozlovic Mar 29, 2018

Remove these 2 lines (set reconnect and natsConn=nc). The reconnect handler's nc *nats.Conn will be the pointer to the connection you created earlier. No need to do anything with that.

Remove unnecessary reconnects and global vars
The *nats.Conn in the ReconnectHandler points to the same nats.Conn
that is passed to stan when the initial connection is created.  This
connection already has the original ReconnectHandler attached to it,
so there is no need to call SetReconnectHandler again.

Signed-off-by: Vincent Smith <vosmith08@gmail.com>
vosmith commented Mar 30, 2018

Thanks @kozlovic. Your input here has been invaluable!

```go
reconnectHandler = func(nc *nats.Conn) {
	sc.Close()
	sc, err = stan.Connect(clusterID, clientID, stan.NatsConn(nc))
```

kozlovic Mar 30, 2018

From here on, it repeats what you do below for the initial setup. I would make the stan.Connect() and the creation of the queue sub a function (anonymous or not). You may or may not need the print statements in that function; not sure what @alexellis's preference would be. In the reconnect handler, you could however log that there was a reconnection.
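A hedged sketch of that factoring in the queue-worker (the helper name and the msgHandler callback are illustrative; sc, clusterID, clientID, qgroup and reconnectHandler are the variables already used in this file):

```go
connectAndSubscribe := func(nc *nats.Conn) {
	var err error
	sc, err = stan.Connect(clusterID, clientID, stan.NatsConn(nc))
	if err != nil {
		log.Printf("Failed to connect to NATS Streaming: %v", err)
		return
	}
	if _, err = sc.QueueSubscribe("faas-request", qgroup, msgHandler); err != nil {
		log.Printf("Failed to subscribe to faas-request: %v", err)
	}
}

reconnectHandler = func(nc *nats.Conn) {
	log.Println("Reconnected to NATS, re-creating the STAN connection")
	sc.Close()
	connectAndSubscribe(nc)
}
```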

alexellis (Member) Apr 24, 2018

I think we should definitely log this. cc @vosmith

alexellis (Member) Apr 24, 2018

@vosmith this is just debug information - not DRY - could we log any reconnects?

vosmith Apr 24, 2018

Hey @alexellis, I can definitely log the reconnect here. I apologize, I was focused more on the first part of the comment about making the connection a separate function and forgot about the comment on logging.

alexellis (Member) commented Apr 13, 2018

@vosmith what are your thoughts?

vosmith commented Apr 17, 2018

Hey @alexellis , after my first attempt I was trying to keep the changes to a minimum in this PR. I'd like to do a more comprehensive refactoring of this file in a second PR, if you don't mind.

alexellis (Member) commented Apr 19, 2018

Reviewed with contributors on 4/19.

Hi @vosmith have you been able to address all the feedback from @kozlovic? I would like to see that done and merged before doing a refactoring exercise.

Thanks,

Alex

vosmith commented Apr 19, 2018

@alexellis, yes sir. I've implemented all the changes except for the last suggestion about DRYing up the code. That is the part I would like to take care of in the refactor.

All the previous suggestions have been implemented.

alexellis (Member) commented Apr 24, 2018

I am not seeing the push on this branch since the comments by @kozlovic

vosmith commented Apr 24, 2018

The last comment is about making the code more DRY.

As this is more in line with refactoring, I feel that it can be addressed later.

If you prefer that I handle this case right now I can do so.

Would you like me to make this change before you merge this PR?

derek bot commented Apr 24, 2018

Thank you for your contribution. I've just checked and your commit doesn't appear to be signed-off.
That's something we need before your Pull Request can be merged. Please see our contributing guide.

@derek derek bot added the no-dco label Apr 24, 2018

Log the reconnect event
When the queue worker client reconnects to the NATS Streaming Server,
it should log the event.

Signed-off-by: Vincent Smith <vosmith08@gmail.com>

@vosmith vosmith force-pushed the vosmith:nats-reconnect branch from c8df5d9 to e9e356c Apr 24, 2018

@derek derek bot removed the no-dco label Apr 24, 2018

Store NATS Connection in NatsQueue
The NATS connection for the handler should be stored in the
NatsQueue struct to allow the gateway or other systems to perform
a graceful shutdown.

Signed-off-by: Vincent Smith <vosmith08@gmail.com>

alexellis referenced this pull request Jul 25, 2018: Update publisher to re-connect #33 (Open, 1 of 3 tasks complete)
ivanayov (Member) commented Nov 8, 2018

@vosmith seems like your PR has conflicts?
What's left in order to merge?

vosmith commented Nov 8, 2018

@ivanayov, work on #33 has superseded this. Sorry, I should have closed this earlier!

ivanayov (Member) commented Nov 9, 2018

Derek close

@derek derek bot closed this Nov 9, 2018
