
Stream replies are missing a backpressure mechanism #46

Open
cdevienne opened this issue Jul 18, 2018 · 9 comments

@cdevienne
Member

Currently the client has no means to slow down the emission of messages by the server.

There should be one, and it should not slow down emission merely by its presence.
That rules out having the client emit an ack message for each message received.

A possible way would be to send, in a heartbeat message, an optional pause/resume command. The client would send it when its local buffer is filled (or about to be), and again when it is emptied (or about to be).
The nice thing here is that it's fully compatible with the current protocol, in both directions.
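The backward-compatibility claim hinges on the new command being optional. A minimal sketch of the idea (all names here are illustrative, not the actual nrpc wire format): the zero value means "no command", so a heartbeat from an older peer decodes as a plain heartbeat.

```go
package main

import "fmt"

// FlowCommand is a hypothetical optional field added to the existing
// heartbeat message. The zero value means "no command", so peers that
// predate the field keep working unchanged.
type FlowCommand int32

const (
	FlowNone   FlowCommand = 0 // unset: plain heartbeat
	FlowPause  FlowCommand = 1 // server should stop emitting
	FlowResume FlowCommand = 2 // server may emit again
)

// Heartbeat sketches the extended message; field names are invented.
type Heartbeat struct {
	Lasting bool        // existing heartbeat semantics kept as-is
	Flow    FlowCommand // new optional pause/resume command
}

func main() {
	// A heartbeat from an old client decodes with the zero value,
	// i.e. no flow command: backward compatible by default.
	hb := Heartbeat{}
	fmt.Println(hb.Flow == FlowNone)
}
```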

@cdevienne
Member Author

@mdevan I would like your opinion on this before coding

Thanks

@mdevan
Member

mdevan commented Jul 19, 2018

Protocol-wise, using a heartbeat message to indicate pause/resume looks good.

I assume you're not going to leave it up to the user's client to decide when to call a StreamCallSubscription.Pause() (or equivalent), because this would complicate the user-facing API somewhat?

Maybe rather have StreamCallSubscription.nextCh as a buffered channel, keep track of how full it is, and have StreamCallSubscription.loop() automatically issue pause/resume heartbeats when high water mark/low water mark thresholds are hit? These thresholds (and maybe the capacity of nextCh also) can then be optionally configured via MethodOptions.
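The high/low watermark logic above could be sketched as a pure decision function that the subscription loop calls on each buffer-level change. All names and thresholds here are invented for illustration; they are not part of the nrpc API.

```go
package main

import "fmt"

// flowDecision implements watermark hysteresis: emit "pause" when the
// buffer crosses the high mark, "resume" once it drains to the low
// mark, and stay quiet in between to avoid flooding the server with
// commands.
func flowDecision(fill, high, low int, paused bool) (cmd string, nowPaused bool) {
	switch {
	case !paused && fill >= high:
		return "pause", true
	case paused && fill <= low:
		return "resume", false
	default:
		return "", paused
	}
}

func main() {
	// Simulate a nextCh of capacity 10 with marks at 8 and 2.
	paused := false
	for _, fill := range []int{3, 8, 9, 5, 2, 1} {
		cmd, p := flowDecision(fill, 8, 2, paused)
		paused = p
		if cmd != "" {
			fmt.Printf("fill=%d -> send %s heartbeat\n", fill, cmd)
		}
	}
}
```

Making the thresholds (and the capacity of nextCh) configurable via MethodOptions, as suggested above, keeps the hysteresis invisible to the user in the default case.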

What about the server-side though? Ideally the user's server should be able to propagate the backpressure further upstream, so it should be able to do something like:

// Sketch: the callback's return type is the open question here;
// error is assumed, with nrpc.StreamPaused as a sentinel value.
func (s *MyServer) sendRecords(ctx context.Context, send func(Record) error) {
    cursor := s.startQuery()
    for !cursor.Done() {
        r := cursor.getNextRecord()
        for send(r) == nrpc.StreamPaused {
            s.pauseQuery()
            time.Sleep(5 * time.Second)
            s.resumeQuery()
        }
    }
    cursor.Close()
}

@cdevienne
Member Author

Client side, I agree: the user should not have to handle the pause/resume.

Server side, I think propagating the back-pressure should be the result of the 'send' function not returning until sending has resumed.
If the user's server needs to propagate it further upstream explicitly, it can use its own buffered channel to control the pressure.
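A blocking send could be sketched like this; pausableSender and its methods are invented for illustration and stand in for the real nrpc send path.

```go
package main

import (
	"fmt"
	"sync"
)

// pausableSender blocks Send while a pause is in effect, so a user
// server propagates back-pressure simply by calling Send in its loop:
// the call does not return until the client has resumed.
type pausableSender struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
	sent   []string
}

func newPausableSender() *pausableSender {
	s := &pausableSender{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

func (s *pausableSender) Send(msg string) {
	s.mu.Lock()
	for s.paused { // block until a resume heartbeat arrives
		s.cond.Wait()
	}
	s.sent = append(s.sent, msg)
	s.mu.Unlock()
}

// SetPaused would be driven by incoming pause/resume heartbeats.
func (s *pausableSender) SetPaused(p bool) {
	s.mu.Lock()
	s.paused = p
	s.mu.Unlock()
	s.cond.Broadcast()
}

func main() {
	s := newPausableSender()
	s.SetPaused(true)

	done := make(chan struct{})
	go func() {
		s.Send("r1") // blocks while paused
		close(done)
	}()

	s.SetPaused(false) // client drained its buffer: resume
	<-done
	fmt.Println(len(s.sent))
}
```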

@mdevan
Member

mdevan commented Jul 19, 2018

The server might need control over how long send() may take? At least as a timeout?

@cdevienne
Member Author

The timeout is a good idea: very simple API, and a lot can be done without adding a complex mechanism.
Will do that.

I should be able to work on this next week (not sure though).

@cdevienne
Member Author

I was reading the go-nats sources and discovered that it already has a reception buffer, and we can access its level at any time. It should be a good base for the back-pressure mechanism.

@ujwal-setlur

This is something I could/would need. Any status on this?

@cdevienne
Member Author

I have not spent time on this subject yet. I do not plan to soon, but I will certainly need to someday.

@cdevienne
Member Author

Re-reading the thread, I think passing a context to 'send' instead of a timeout would give more control and be simpler in the basic case.
