Proposal: Go streaming API #70

Open
spenczar opened this Issue Jan 29, 2018 · 20 comments

@spenczar
Member

spenczar commented Jan 29, 2018

#3 lays out some ideas for how Twirp could support streaming RPCs on the wire. Equally important is how we support them in generated code.

This issue is for designing the generated Go code. When we've landed on a proposal that seems pretty good, I'll edit this top text with what we've got, and then we can look at implementing it.

Let's use this service to discuss what things would look like:

syntax = "proto3";

service Streamer {
  rpc Transaction(Req) returns (Resp);
  rpc Upload(stream Req) returns (Resp);
  rpc Download(Req) returns (stream Resp);
  rpc Bidirectional(stream Req) returns (stream Resp);
}

message Req {}
message Resp {}

@spenczar spenczar added this to the v6 milestone Jan 29, 2018

@spenczar

Member

spenczar commented Jan 29, 2018

Here's one way to do this.

The generated interface would use <Msg>Stream types:

type Streamer interface {
  Transaction(context.Context, *Req) (*Resp, error)
  Upload(context.Context, ReqStream) (*Resp, error)
  Download(context.Context, *Req) (RespStream, error)
  Bidirectional(context.Context, ReqStream) (RespStream, error)
}

These Stream types are an interface:

type ReqStream interface {
  Next(context.Context) (*Req, error)
}

As the recipient:

You call Next to get another message from the stream. If the stream has been closed by the sender, Next returns a nil *Req, and an error that describes the shutdown. This could either be a var ErrStreamComplete, indicating a "healthy" termination of the stream, or an error that indicates a problem.

To shut down the stream early as the recipient, cancel the context. Otherwise, you must call Next repeatedly until it returns nil, <error>. Failure to do one of these two things will leave the HTTP connection hanging open.

As the sender:

It's your responsibility to provide an implementation of this interface. The generated Twirp code will be calling Next, then serializing the objects it gets and shipping them to the recipient. It will call Next repeatedly until Next returns nil, <error>, so an implementation must promise to eventually return nil, <error> or it will leak an HTTP connection.

We will provide a constructor for a simple implementation of the interface for convenience. It might look like this, but I'm not attached to this API and it probably needs work:

type reqStreamSender struct {
  ch <-chan *Req
}

func NewReqStream(ch <-chan *Req) *reqStreamSender {
  return &reqStreamSender{ch: ch}
}

func (s *reqStreamSender) Next(ctx context.Context) (*Req, error) {
  select {
    case <- ctx.Done():
      return nil, ctx.Err()
    case v, ok := <- s.ch:
      if !ok {
        return nil, twirp.ErrStreamComplete
      }
      return v, nil
  }
}

Fleshing this out for the 4 unidirectional cases, using service Streamer as an example:

As a unidirectional client-sender:

You construct a message stream that fulfills the ReqStream interface, and then you call Upload(ctx, stream) on the generated client.

The generated client code immediately opens an HTTP request to the server. It then repeatedly calls stream.Next(ctx), serializing the messages until stream.Next(ctx) returns nil, <error> (it uses the provided ctx).

Then, it sends the error from stream.Next as a trailer. It reads the server's response, parses it, and returns it.

If the server recipient cancels the stream early, then the generated client will immediately return a nil value for *Resp, and an error explaining that the recipient aborted the stream.

For example:

client := NewStreamerClient("http://127.0.0.1", http.DefaultClient)
ch := make(chan *Req, 5)
stream := NewReqStream(ch)
go func() {
  for i := 0; i < 100; i++ {
    ch <- &Req{}
  }
  close(ch)
}()
resp, err := client.Upload(context.Background(), stream)

As a unidirectional server-recipient:

An incoming HTTP request arrives for the Upload API. The generated handler constructs a ReqStream to describe the stream and calls your Upload(ctx, stream). You should block until stream.Next(ctx) returns nil, <error>. If you must abort, do so by canceling a context provided to stream.Next.

Once you return, the generated HTTP handler will call req.Body.Close() and send your response.

For example:

type streamerImpl struct{ /* ... */ }

func (s *streamerImpl) Upload(ctx context.Context, stream ReqStream) (*Resp, error) {
  var err error
  for {
    var req *Req
    req, err = stream.Next(ctx)
    if req == nil && err != nil {
      break
    }
    err = handle(req)
    if err != nil {
        // Abort the stream by calling Next with an already-canceled context.
        cctx, cancel := context.WithCancel(ctx)
        cancel()
        stream.Next(cctx)
        break
    }
  }
  if err == twirp.ErrStreamComplete {
    return &Resp{}, nil
  }
  return nil, err
}

As a unidirectional server-sender:

An incoming HTTP request triggers a call to Download(ctx, req). Your code should construct a message stream that fulfills the RespStream interface and then return it. If you don't like the request you received for some reason, you can return an error and a nil RespStream.

The generated code will call stream.Next() repeatedly until it gets nil, <error>, using the original request's context for each call.

If the client cancels the stream early, then we'll trigger the Error hook on *ServerHooks.

For example:

func (s *streamerImpl) Download(ctx context.Context, req *Req) (RespStream, error) {
  if !valid(req) {
    return nil, errInvalid
  }
  ch := make(chan *Resp, 10)
  go func () {
    for i := 0; i < 100; i++ {
      ch <- &Resp{}
    }
    close(ch)
  }()
  return NewRespStream(ch), nil
}

As a unidirectional client-recipient:

You call Download(ctx, req), receiving a RespStream. You should call Next on it repeatedly until you receive nil, <error>. You can abort by canceling the context. You must do one of these two things, or else we'll hold the HTTP connection open forever.

For example:

func doDownload(handle func(*Resp) error) error {
  client := NewStreamerClient("http://127.0.0.1", http.DefaultClient)
  respStream, err := client.Download(context.Background(), &Req{})
  if err != nil {
    return err
  }

  for {
    resp, err := respStream.Next(context.Background())
    if err != nil {
      if err == twirp.ErrStreamComplete {
        return nil
      }
      return err
    }
    err = handle(resp)
    if err != nil {
      ctx, cancel := context.WithCancel(context.Background())
      cancel()
      respStream.Next(ctx)
      return err
    }
  }
}

Bidirectional clients and servers

Bidirectional clients and servers behave just as natural extensions of the above, with one wrinkle for the Go API: the client might return an error immediately if it cannot successfully negotiate an http/2 connection with the server.

The generated code's implementation might be a lot more complex, here. We need to investigate whether ServeHTTP provides enough to do this right.

Advantages of this design

The generated code is simple. The stream interface is really lightweight.

Canceling the stream is possible from both sides.

Senders can configure their own buffering strategy for outbound messages by providing their own implementation of <msg>Stream.

Recipients can set timeouts while waiting for a message, which is necessary for security - otherwise, clients can easily lock up servers.

Disadvantages of this design

Recipients can't pick a buffering strategy - we have to construct <msg>Stream for them.

We have to impose rules like "You must block until you get a nil, error response" or else users will leak.

The API for clients canceling the stream is pretty weird.

@rhysh

Contributor

rhysh commented Jan 29, 2018

Good work finding an answer that keeps the client and server interfaces unified!


A nit: Can users provide closures rather than implementing a type with a Next method (like http.ServerHandler)? Or could the helper code convert from a func to the necessary type, rather than or in addition to the helper code to use channels?

type RespStreamFn func(ctx context.Context) (*Resp, error)

func (fn RespStreamFn) Next(ctx context.Context) (*Resp, error) { return fn(ctx) }

A nit: Can twirp.ErrStreamComplete instead be io.EOF?


Consider a server-sender that reads a file from disk, returning its contents N bytes at a time. It's a unidirectional server-sender, but it has some clean up work to do at the end of the request (closing the file to release the file descriptor). It could get an async signal by checking context cancelation, but the usual defer patterns for synchronous cleanup won't work. The code will get tricky if several cleanup steps need to be done in a particular order.

Could Twirp provide helper code that would allow server method authors to write synchronous implementations while still conforming to the united API?

// Part of the generated interface
func (s *streamerImpl) Download(ctx context.Context, req *Req) (RespStream, error) {
  return convertSyncDownload(s.DownloadSync)(ctx, req)
}

// A convenient way to write the code, since defer works
func (s *streamerImpl) DownloadSync(ctx context.Context, req *Req, send func(*Resp)) error {
  f, err := os.Open("big.log")
  if err != nil {
    return err
  }
  defer f.Close()
  sc := bufio.NewScanner(f)
  for sc.Scan() {
    send(&Resp{Line: sc.Text()})
  }
  return sc.Err()
}

// could this be generated?
func convertSyncDownload(fn func(context.Context, *Req, func(*Resp)) error) func(context.Context, *Req) (RespStream, error) {

	return func(ctx context.Context, req *Req) (RespStream, error) {
		ctx, cancel := context.WithCancel(ctx)
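		// No deferred cancel here: the returned stream must outlive this call.
		// The goroutine below cancels ctx once fn returns.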

		var (
			mu     sync.Mutex
			fnErr  error
			values = make(chan *Resp)
		)

		// called by method implementation
		fnStream := func(v *Resp) {
			mu.Lock()
			defer mu.Unlock()
			select {
			case <-ctx.Done():
			case values <- v:
			}
		}
		// called by Twirp framework
		twirpStream := RespStreamFn(func(ctx context.Context) (*Resp, error) {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case v, ok := <-values:
				if ok {
					return v, nil
				}
				mu.Lock()
				defer mu.Unlock()
				if fnErr != nil {
					return nil, fnErr
				}
				return nil, twirp.ErrStreamComplete
			}
		})

		go func() {
			defer cancel()

			err := fn(ctx, req, fnStream)
			mu.Lock()
			defer mu.Unlock()

			fnErr = err
			close(values)
		}()

		return twirpStream, nil
	}
}
@spenczar

Member

spenczar commented Jan 30, 2018

Can users provide closures rather than implementing a type with a Next method (like http.ServerHandler)?

I think you mean http.HandlerFunc? Yes, we could do something like that through a RespStreamFunc, as you describe. There's room to maneuver in the helper function we provide to make a <msg>Stream.

A nit: Can twirp.ErrStreamComplete instead be io.EOF?

Certainly.

Consider a server-sender that reads a file from disk

I'm having a lot of trouble understanding the code you posted as a suggestion. I hope we don't need to generate anything that complex.

I think the advantage of this interface-based approach is that users can write whatever they need to handle their complex stream source. In your example, I think a custom type is right:

type FileRespStream struct {
	f  *os.File
	sc *bufio.Scanner
}

func newFileRespStream(f *os.File) *FileRespStream {
	sc := bufio.NewScanner(f)
	return &FileRespStream{
		f:  f,
		sc: sc,
	}
}

func (s *FileRespStream) Next(ctx context.Context) (resp *Resp, err error) {
	// Block until the scanner has data available, or the context is done.
	// The channel is buffered so the scanning goroutine can always complete
	// its send, even if we stop listening once the context is done.
	scanCh := make(chan bool, 1)
	go func() {
		scanCh <- s.sc.Scan()
	}()

	select {
	case <-ctx.Done():
		err = io.EOF
	case more := <-scanCh:
		if more {
			resp = &Resp{Line: s.sc.Text()}
		} else if err = s.sc.Err(); err == nil {
			// The scanner finished cleanly: signal a healthy end of stream.
			err = io.EOF
		}
	}

	// If we didn't populate resp, then this was the last line. Clean up after
	// ourselves.
	if resp == nil {
		s.f.Close()
	}
	return resp, err
}

I think this is simple and clear for that use case, and is an advantage of the interface for streams.

@rhysh

Contributor

rhysh commented Jan 30, 2018

Oops, yes I meant http.HandlerFunc.

In that example the file descriptor is closed (conditionally) at the end of the Next method. If the connection to the client is lost, will the Twirp-generated server code call Next with a canceled Context?

When plugging a server implementation directly into code that needs a client, the caller should be able to clean up the call by canceling the Context they provided to the RPC call. For streaming requests, they'll have to additionally call Next with a canceled Context to be sure all resources have been cleaned up .. unless the server method has been written very carefully.

The defer keyword is really good, I'd really like to be able to use it in streaming servers.

@spenczar

Member

spenczar commented Jan 30, 2018

Right, the rule (in my proposal above) is that a Stream must be either drained or called with a canceled context to avoid leaks.

We might be able to improve things by expanding the interface:

type ReqStream interface {
  Next(context.Context) (*Req, error)
  End(error)
}

For a sender, End will be called by generated code when we're going to stop sending messages for any reason: either we have received nil, io.EOF from a call to Next, or we have to shut down for some other reason (like the receiver went away).

For a recipient, End is called by the user, and means "Don't send me any more data, because I ain't listenin'". Definitely nicer than using a canceled context for this!

This provides one place to put cleanup logic, like you might do through defers. It also provides a nicer way for recipients to abort.

The defer keyword is really good, I'd really like to be able to use it in streaming servers.

Oh, I agree. I'm just not sure how to work it in here. As you demonstrated, a big functor could work, but it's very complex. Functors aren't the first tool most people reach for! I don't think defer's simplicity outweighs the complexity of a function whose signature is func(func(context.Context, *Req, func(*Resp))) func(context.Context, *Req) (RespStream, error). That's a lot of func.

I think defer is hard to work in here for deep reasons. To really get the most out of it, you need to call defer in a function that starts before any calls to Next and exits only once there will be no more calls to Next.

But we want the generated code to be calling Next, so doing defer "right" requires some inversion of control. The user code would need to set up, call defer, hand off execution, and then reclaim control at the very end. I can't see how this could ever work elegantly, but I'd be happy to be proven wrong.

@mocax

Contributor

mocax commented Jan 30, 2018

I suggest having StreamReader and StreamWriter interfaces and using them on both the client side and the server side. The ReqStream is for writing on the client side and for reading on the server side, which would be confusing when it is used in the middle of a function. Reader/Writer, by contrast, is a much better-known design pattern, e.g. java.io.Reader.

@spenczar

Member

spenczar commented Jan 30, 2018

@mocax, I think you mean different types for senders and recipients, not clients and servers?

I wouldn't want different types because it would mean that clients and servers have different interfaces, and I really like the fact that there is one generated Go interface right now. It means that clients can be converted to servers:

c := NewStreamerProtobufClient("http://remote", http.DefaultClient)
s := NewStreamerServer(c)

And it means that servers can be used wherever you have clients:

var s Streamer
if delegateUpstream {
  s = NewStreamerProtobufClient("http://upstream", http.DefaultClient)
} else {
  s = NewStreamerServer(svc)
}

It makes it easier to write mocks, and (most importantly!) makes the generated code simple and intuitive.

Changing the client and server to have different interfaces would lose a lot of Twirp's simplicity and idiomaticity in its Go API, so I think we should only do it if it has huge, clear benefits. Could you flesh out your suggestion with a code example?

@mocax

Contributor

mocax commented Jan 30, 2018

An RPC stream is much like a file stream. We can model it as a reader/writer, which is a very popular design pattern. Or we can combine them into a single stream interface that has both read and write methods. The latter would look just like a stdio stream.

Of course, there are many other considerations. For Go, we could just map a stream to a Go channel, which is probably the most intuitive choice.

@spenczar

Member

spenczar commented Jan 30, 2018

Go channels tend to make for bad types when shared across API boundaries.

Could you flesh out your suggestion with code? It's difficult to judge in just text descriptions. The details matter a lot here.

@fritzherald

fritzherald commented May 31, 2018

A few weeks ago I took spenczar's experimental work on the v6_streams_alpha branch and have gotten a protobuf proof-of-concept up and running with a streaming download endpoint. Based on my experience, I have a few suggestions to discuss and points to bring out, but first the codes:

https://github.com/twitchtv/twirp/pull/110/files

Main takeaways from my foray:

  • Twirp streams are/will be awesome — with proper tuning, a twirp stream client is able to receive more than a million Hats per second on my 2013 MacBook Pro (caveat: see the flushing notes at the end of this comment)

  • How protobuf errors could work (and are implemented in my PR):

    • If the server's RPC implementation returns an error directly (instead of a RespStream) a standard JSON-encoded twirp error is returned and the HTTP status code gets set accordingly (internally, uses the same writeError function as unary RPCs). This makes things easy from a client implementation point-of-view: any non-200 status code is a JSON-encoded twirp error.
    • If the server's RPC implementation returns a RespStream…
      • The HTTP status code is always 200 and Content-Type is application/protobuf
      • Errors from the RespStream's Next function are encoded as the "trailer" field of the stream: trailer-field-tag + length-delimited-protobuf-string
        • If Next returns an io.EOF error, the trailer string is just "EOF"
        • For any other error, the trailer's protobuf string is a JSON-encoded twirp error
  • Channel-based stream interface helper

    • The initial proposal of using a chan *Resp doesn't provide any way to handle mid-stream errors. In rigging up my own RespStream, I wrote myself into an annoying anti-pattern using a chan *Resp and a separate chan error. Code comments like "// resps channel closes before error channel, so calling Next here to let errs signal the stream's completion" were indicative of some deep problems with a two-stream approach. Thus I've got another proposed implementation:
      • Use a chan *RespOrError where RespOrError looks like type RespOrError struct{resp *Resp, err error} to encapsulate Next's two return values in a single channel.
      • Without a good implementation in the generated code, devs implementing their own twirp stream endpoints are likely to end up in territory similar to my two-channel antipattern, particularly when trying to push data to the RespStream, as the Next interface is a pulling pattern. To me, chan *RespOrError heads in the direction of providing a clean push interface to RespStreams.
      • Relevant sections in the PR:
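The trailer framing described in the list above (a field tag followed by a length-delimited string) can be sketched with standard protobuf wire encoding. The field number used here is a made-up placeholder, since the actual tag is not stated in this comment; encodeTrailer and decodeTrailer are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// trailerField is a hypothetical field number chosen for illustration only.
const trailerField = 2

// wireBytes is protobuf wire type 2 (length-delimited).
const wireBytes = 2

// appendUvarint appends v to b in protobuf varint form.
func appendUvarint(b []byte, v uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], v)
	return append(b, tmp[:n]...)
}

// encodeTrailer frames s as: tag varint, length varint, payload bytes.
func encodeTrailer(s string) []byte {
	b := appendUvarint(nil, uint64(trailerField<<3|wireBytes))
	b = appendUvarint(b, uint64(len(s)))
	return append(b, s...)
}

// decodeTrailer reverses encodeTrailer and validates the tag and length.
func decodeTrailer(b []byte) (string, error) {
	tag, n := binary.Uvarint(b)
	if n <= 0 || tag != uint64(trailerField<<3|wireBytes) {
		return "", errors.New("not a trailer frame")
	}
	b = b[n:]
	size, n := binary.Uvarint(b)
	if n <= 0 || uint64(len(b)-n) < size {
		return "", errors.New("truncated trailer")
	}
	return string(b[n : n+int(size)]), nil
}

func main() {
	frame := encodeTrailer("EOF")
	s, err := decodeTrailer(frame)
	fmt.Println(len(frame), s, err)
}
```

For any error other than io.EOF, the payload string would carry the JSON-encoded twirp error instead of "EOF", with the same framing.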

And now the trickiest point for last: flushing the response writer. To reliably get messages over the wire as they come in, http.Flusher's Flush function has to be used to force the sending of a packet. My naive implementation is flushing after every message (provided the response writer implements the Flusher interface), but in high-load scenarios this dramatically reduces throughput — flushing every message gives <10% throughput in my initial tests. The sweet-spot really depends on the use-case, and coming up with a good solution for when to flush the response writer is going to be tricky!

There have been some grpc discussions (here and here) about flushing, it sounds like both grpc and go's http2 server will flush "only when a packet is full or there's nothing else to send", but I haven't yet tracked down where this flushing functionality is implemented — it would be great to see both to compare/contrast. You can see that some users want to opt out of the batching or take direct control of the flushing, but grpc doesn't provide the hooks and adding them would be difficult architecturally — food for thought!

And those are my thoughts 🖖

[Update: forgot to mention that I've been writing a javascript client as part of my experiments, you can see how it all plays out in JS here, here, and here]

@spenczar

Member

spenczar commented May 31, 2018

@fritzherald, thank you so much for this dive! Your hands-on experience here is hugely valuable.

First, I think chan *RespOrError seems like a solid advance. I have nitpicks - maybe it should be chan RespOrError? - but the core idea of a single channel which returns a union-type-ish value seems solid. I'm especially convinced by the trickiness of maintaining two channels. Let's do what you suggest.

Your proposals on errors and the trailer format seem equally good, to me.

Flushing is hard, for sure. I think that grpc-go#1242 presents the hard part most clearly, as there's a tradeoff between latency and throughput here: if you don't flush on every message, then you're necessarily letting messages sit in a buffer for a bit. But if you do flush on every message, then you're using your TCP connection very inefficiently, especially for small messages.

Adding a smart TCP scheduler to Twirp to handle this seems very difficult and complex. I am pretty sure we shouldn't do that. Instead, we should try to find a way to let picky users choose their flushing behavior, while taking a middle route as the default.

Hypothetically, users can control things by providing their own http.ResponseWriters to Twirp servers which flush after every call to Write. For example:

func MinimizeLatency(h http.Handler) http.Handler {
  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    if flusher, ok := w.(responseFlusher); ok {
      w = lowLatencyResponseWriter{responseFlusher: flusher}
    }
    h.ServeHTTP(w, r)
  })
}

type responseFlusher interface {
  http.ResponseWriter
  http.Flusher
}

// Embedding responseFlusher promotes Header, WriteHeader, and Flush, so
// lowLatencyResponseWriter satisfies http.ResponseWriter and http.Flusher.
type lowLatencyResponseWriter struct {
  responseFlusher
}

// Write forwards to the underlying writer and flushes after every call.
func (w lowLatencyResponseWriter) Write(p []byte) (int, error) {
  n, err := w.responseFlusher.Write(p)
  if err != nil {
    return n, err
  }
  w.Flush()
  return n, nil
}

This could be used as middleware like so:

server := MinimizeLatency(NewHaberdasherServer(app, hook))
http.ListenAndServe(":8080", server)

Obviously, it's easy to think of more complex variations. A ResponseWriter could count some number of bytes and then flush, or guarantee that it never holds a message for longer than a millisecond, etc. We can promise that we won't call the ResponseWriter's Write method from multiple goroutines to make this more straightforward.


This control-flushing-via-response-writer strategy would work, I think, but it's definitely a little complex. I think that's okay, as long as the common case is simple enough that most people don't have to bother with making their own ResponseWriter. Ideally, only people with very special needs will mess with this.

Would a default strategy of "never call Flush(), just expect net/http to handle this for us" be good enough for most users? Does net/http ever decide to flush small messages just because they have been sitting around for a while, or does it always only flush in big chunks? We should investigate that.

@fritzherald

fritzherald commented Jun 1, 2018

@spenczar the control-flushing-via-response-writer strategy seems like a great approach for enabling customized flushing strategies. As for "never call Flush()", I just ran a quick test and the results indicate there would probably need to be some flushing strategy provided by twirp:

Test with no flushing

Configuration: Using the Repeat rpc defined here, the client requests a stream of responses with a 100ms delay between each response:

// error handling omitted for clarity
startedAt := time.Now()
repeatRespStream, err := client.Repeat(req)
log.Printf("Received response stream after %v\n", time.Since(startedAt))
for {
	repeatResp, err := repeatRespStream.Next(context.Background())
	log.Printf("Received resp after %v\n", time.Since(startedAt))
}

Results:

  1. First ~12 seconds: client waits for Repeat to return
  2. @ 13 seconds: Repeat returns the RepeatRespStream
  3. @ 26 seconds: Next returns for the first time, followed immediately by another 126 messages
  4. @ 39 seconds: messages 128 through 252 arrive
  5. @ 51 seconds: messages 253 through 376 arrive
  6. ...

Increasing the delay 10x to one second results in a ~10x multiplier to all of the times above: i.e. ~130 seconds before the rpc call returns the RespStream.


I'm having trouble imagining how Twirp could provide a default flushing strategy that knows to get out of the way of a custom writer without some sort of option flag to disable the built-in flusher. An alternative could be for the twirp package to define one or multiple handlers like your MinimizeLatency above, but it would mean that users who aren't aware of the need for a flushing handler could easily be confused by clients that are hanging on the initial rpc call while the server's Next implementation is happily chugging away.

I guess another possibility could be to add a FlushHandler field to the ServerHooks struct and for twirp to rig up the handler internally, using a default implementation if no handler is provided by the user.

@spenczar

Member

spenczar commented Jun 3, 2018

Okay, that experiment makes it clear that we need to have a default flushing strategy, and we need to let users override the default. I think adding this to ServerHooks would be a mistake, as it really messes with the purpose of ServerHooks.

The natural thing here, if we had nothing in place already, would be to accept the flushing strategy as a parameter when you construct the server handler (in NewHaberdasherServer, in other words). I think it would probably be done through a config struct, like this:

// defined in the twirp library
type ServerConfig struct {
  Hooks *ServerHooks
  Flush *FlushStrategy
}

type FlushStrategy struct {
  // If at least MaxLatency time has elapsed since the last flush, and unflushed bytes 
  // are available, flush.
  MaxLatency       time.Duration
  // If at least ByteThreshold bytes have been written since the last flush, flush.
  ByteThreshold    int
  // If at least MessageThreshold messages have been written since the last flush, flush.
  MessageThreshold int
}
// generated code:
func NewHaberdasherServer(svc Haberdasher, conf twirp.ServerConfig) TwirpService

This, I think, feels extremely clear... but it's a compatibility-breaker, once code is regenerated by users. On the other hand, a major version bump is a reasonable time to break compatibility, and we'd be doing a major version bump for streaming anyway, so this might be the right time.

The other downside of switching to a config struct is that it makes it much more appealing to add tons of config junk in. I think one of Twirp's strengths is how little configuration it currently takes, and if we could keep that, I'd be happier.

Anyway, I'd like to find an alternative, but I think it'll be hard to find something much better than this config struct style.

@spenczar

Member

spenczar commented Jun 3, 2018

Oh! I have an important alternative that strikes me right after posting that last comment (of course) which is a Set method on the generated code.

Generated servers could have an additional method, func (h *haberdasherServer) SetFlushStrategy(maxLatency time.Duration, byteThreshold, msgThreshold int). This would maintain compatibility. It's a little odd, but it would work.

@fritzherald

fritzherald commented Jun 4, 2018

SetFlushStrategy seems like a good approach. What would be the process for disabling built-in flushing when using a custom flushing response writer? Related: what do zero values imply? I'm inclined to think they should indicate that the parameter has no effect on flushing, so SetFlushStrategy(time.Millisecond, 0, 0) would imply that byte and msg thresholds are ignored.

I could also see a FlushStrategy config struct as the arg to SetFlushStrategy working well, something like...

// Fields are exported so that callers outside the twirp package can set them.
type FlushStrategy struct {
	MaxLatency time.Duration
	ByteThreshold, MsgThreshold int
}

// Not strictly necessary, but could be handy to have pre-defined vars for common values:
var (
	FlushStrategyDisabled = FlushStrategy{} // zero val means no flushing
	FlushStrategyDefault = FlushStrategy{/* whatever defaults are settled on */}
	FlushStrategyEveryWrite = FlushStrategy{ByteThreshold: 1}
)

Then twirp.FlushStrategyDefault could be used in the generated .twirp.go file as the default if SetFlushStrategy is never called, and calling SetFlushStrategy would look like...

// zeros for ByteThreshold and MsgThreshold imply no flushing based on bytes/msgs
server.SetFlushStrategy(twirp.FlushStrategy{MaxLatency: time.Millisecond})

// or...
server.SetFlushStrategy(twirp.FlushStrategyDisabled) // for use with custom `responseFlusher`-style response writer

// revert to default flushing...
server.SetFlushStrategy(twirp.FlushStrategyDefault)
@sdboyer-stripe

sdboyer-stripe commented Jun 5, 2018

@spenczar

Member

spenczar commented Jun 5, 2018

@fritzherald Nice, I like this a lot!

What would be the process for disabling built-in flushing when using a custom flushing response writer?

I think this would be server.SetFlushStrategy(FlushStrategyDisabled), so that the custom ResponseWriter can have control.

what do zero values imply? I'm inclined to think they should indicate that the parameter has no effect on flushing, so SetFlushStrategy(time.Millisecond, 0, 0) would imply that byte and msg thresholds are ignored.

That's my thinking too.

something like... [use a struct]

Yes, I like this proposal too. An additional advantage is that we could reuse those shapes for describing flushing in clients when they are uploading streams.

Good! I think we're ready to roll on an implementation of this!


One last thing I've been thinking about. I wonder if your chan RespOrError can be used in place of my <Msg>Stream type everywhere.

The generated interface would look like:

type Streamer interface {
  Transaction(context.Context, *Req) (*Resp, error)
  Upload(context.Context, <-chan ReqOrError) (*Resp, error)
  Download(context.Context, *Req) (<-chan RespOrError, error)
  Bidirectional(context.Context, <-chan ReqOrError) (<-chan RespOrError, error)
}

This seems like it might be easier for users to wrap their heads around.

I will write up a thorough walk-through of this design, following the pattern in #70 (comment), in the next hour or so.


@sdboyer-stripe hey there 🤓

@spenczar

Member

spenczar commented Jun 5, 2018

Okay, I like the channel of pairs. I think it's better.


The generated interface would use <-chan ReqOrError and <-chan RespOrError types:

type Streamer interface {
  Transaction(context.Context, *Req) (*Resp, error)
  Upload(context.Context, <-chan ReqOrError) (*Resp, error)
  Download(context.Context, *Req) (<-chan RespOrError, error)
  Bidirectional(context.Context, <-chan ReqOrError) (<-chan RespOrError, error)
}

Since the parameters are specified as <-chans, they are all read-only. This saves us from worrying about users closing the channels or inserting values from the wrong side.

These channeled types are struct pairs, generated for each type:

type <Type>OrError struct {
  Msg *<Type>
  Err error
}

Exactly one of Msg or Err will be non-nil in a valid value of this struct.

As the recipient:

You iterate over the channel. You check whether the error is non-nil; if so, the stream has a problem and won't continue. Otherwise, you can use the Msg field.

When the stream is terminated, the channel will be closed.

To shut down the stream early as the recipient, cancel the context. Otherwise, you must read from the channel until it is closed. Failure to do one of these two things will leave the HTTP connection hanging open.

The channel is unbuffered. If you need to buffer, you can do it yourself by copying into a buffered channel.
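As a minimal sketch of that copy-into-a-buffered-channel approach: `bufferResp` is a hypothetical helper, not something Twirp generates, and the `Resp` and `RespOrError` declarations below only stand in for the generated types.

```go
package main

import "fmt"

// Resp and RespOrError stand in for the generated types described above.
type Resp struct{}

type RespOrError struct {
	Msg *Resp
	Err error
}

// bufferResp is a hypothetical helper (not part of Twirp). It copies values
// from in into a new channel with capacity n, and closes the new channel once
// in has been closed.
func bufferResp(in <-chan RespOrError, n int) <-chan RespOrError {
	out := make(chan RespOrError, n)
	go func() {
		defer close(out)
		for v := range in {
			out <- v
		}
	}()
	return out
}

func main() {
	src := make(chan RespOrError) // unbuffered, like the channel from generated code
	go func() {
		defer close(src)
		for i := 0; i < 3; i++ {
			src <- RespOrError{Msg: &Resp{}}
		}
	}()

	count := 0
	for v := range bufferResp(src, 16) {
		if v.Err != nil {
			fmt.Println("stream error:", v.Err)
			return
		}
		count++
	}
	fmt.Println("received", count, "messages")
}
```

The copying goroutine only exits once the source channel is closed, so the rule above still applies: read the buffered channel to the end, or cancel the context.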

As the sender:

It's your responsibility to provide an instance of the channel. The generated Twirp code will be reading from the channel until it is closed, or until the context is canceled, or until a non-nil error value is emitted. If you put in a non-nil error value, you should close the channel right afterwards.

We don't need to provide a constructor for this - it's obvious how you can set up buffering.


For the 4 unidirectional cases:

As a unidirectional client-sender:

You construct a message channel stream, and then you call Upload(ctx, stream) on the generated client.

The generated client code immediately opens an HTTP request to the server. It then iterates over the stream, checking in each loop iteration for context cancelation too. It writes messages to the wire, and for a non-nil error, it writes the error as a trailer and ends the stream.

for {
  var (
    m   *Req
    err error
  )
  select {
  case <-ctx.Done():
    err = twirp.NewError(twirp.Canceled, "context canceled")
  case v, ok := <-stream:
    if !ok {
      // The sender closed the channel: healthy end of stream.
      err = twirp.NewError(twirp.NoError, "stream complete")
    } else if v.Err != nil {
      err = v.Err
    } else {
      m = v.Msg
    }
  }

  if m != nil {
    writeMsgToWire(m)
  }
  if err != nil {
    writeErrToWireAsTrailer(err)
    terminateWireStream()
    break
  }
}

Then, it reads the server's response, parses it, and returns it.

If the server recipient cancels the stream early, then the generated client will immediately return a nil value for *Resp, and an error explaining that the recipient aborted the stream.

For example:

client := NewStreamerClient("http://127.0.0.1", http.DefaultClient)
ch := make(chan ReqOrError, 5)
go func() {
  for i := 0; i < 100; i++ {
    ch <- ReqOrError{
      Msg: &Req{},
    }
  }
  close(ch)
}()
resp, err := client.Upload(context.Background(), ch)

As a unidirectional server-recipient:

An HTTP request arrives for the Upload API, and the generated code constructs a chan ReqOrError to represent the stream. You should read from the channel until it is closed. If the context is canceled, the generated code will close the channel, so you don't need to check for context cancelation separately.

To abort, return a nil *Resp and a twirp.Error with a code of twirp.Canceled.

Once you return, the generated HTTP handler will call req.Body.Close() and send your response.

For example:

type streamerImpl struct {...}

func (s *streamerImpl) Upload(ctx context.Context, stream <-chan ReqOrError) (*Resp, error) {
  for v := range stream {
    if v.Err != nil {
      // Oh no!
      logCanceledStream(v.Err)
      return nil, twirp.NewError(twirp.Canceled, "i give up") // ???
    }

    if err := handle(ctx, v.Msg); err != nil {
      return nil, err
    }
  }
  return &Resp{}, nil
}

As a unidirectional server-sender:

An incoming HTTP request triggers a call to Download(ctx, req). Your code should construct a message channel and return it. If you don't like the request you received for some reason, you can return an error and a nil channel.

The generated code will read from the stream repeatedly until it is closed, or the context is canceled.

If the client cancels the stream early, then we'll trigger the Error hook on *ServerHooks.

For example:

func (s *streamerImpl) Download(ctx context.Context, req *Req) (<-chan RespOrError, error) {
  if !valid(req) {
    return nil, errInvalid
  }
  ch := make(chan RespOrError, 10)
  go func() {
    for i := 0; i < 100; i++ {
      ch <- RespOrError{Msg: &Resp{}}
    }
    close(ch)
  }()
  return ch, nil
}

As a unidirectional client-recipient:

You call Download(ctx, req), receiving a <-chan RespOrError. You should read from it repeatedly until it is closed or you receive a value with a non-nil Err. You can abort by canceling the context.

For example:

func doDownload(handle func(*Resp) error) error {
  client := NewStreamerClient("http://127.0.0.1", http.DefaultClient)
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel() // aborts the stream on any early return
  respCh, err := client.Download(ctx, &Req{})
  if err != nil {
    return err
  }

  for v := range respCh {
    if v.Err != nil {
      // Oh no!
      return v.Err
    }
    if err := handle(v.Msg); err != nil {
      return err
    }
  }
  return nil
}

Advantages of this design

No special constructors are necessary.

Channels don't require as much documentation as the interface types. Concepts map more cleanly: You close(ch) a channel to close the stream, for example.

User code is a bit simpler, to my eyes.

Generated code will be easier to write (especially in its interactions with context cancelations).

If messages are being transmitted too slowly, senders always have access to a context they can cancel to abort, which is very flexible.

It's obvious how recipients can add buffering on the read side: they copy from the unbuffered channel that comes from generated code into a buffered channel of their own.

Disadvantages of this design

Canceling as the recipient of a stream can be a little clumsy, and is not symmetric: servers return a special error, while clients cancel a context.

We have to demand users set one of Err and Msg in the struct union type-ish thing, but can't enforce this.

@fritzherald

fritzherald commented Jun 7, 2018

Rad!! Love it. I'll try to update my PR next week with an implementation of the <-chan RespOrError API for the Download rpc

@fritzherald fritzherald referenced this issue Jul 11, 2018

Closed

iOS client #118

@jessfraz

jessfraz commented Oct 31, 2018

I just wanted to say I am using the branch v6_streams_alpha_downloads and it works amazingly thanks! Hope to see this feature soon :)
