
Better streaming response performance #122

Closed
wants to merge 1 commit into from

Conversation

erincandescent

Status: Draft

  • I have disabled the bypass at the top of copyChunked for non-flushable streams and run the entire Typhon test suite, experiencing no errors that weren't already present (primarily a Unix domain socket reuse issue, which seems unlikely to be related)
  • I haven't yet written a direct test case for this code to ensure it doesn't mangle any data flowing through. I'd like to do so before this is merged.
  • Before going ahead and writing a test case, I'd like some opinions on whether we should go with this approach. This ended up a lot longer and more complicated than I expected when I started sketching out how to implement it.

Monzonauts: You can find some motivation for this change in Notion

Changes and Motivation description

In various circumstances, we've discovered that Typhon's default way of
doing response streaming produces suboptimal performance. We've often
worked around this by using a Streamer in the middle, as the decoupling
allows the data source to better exploit parallelism.

Go recognises this issue and has an io.WriterTo type which readers can
implement to avoid it; however, Typhon does not use this, as it wants to
ensure that the underlying ResponseWriter is regularly flushed.

Therefore we instead implement our own intermediary which can make use
of WriterTo and ensures the ResponseWriter is flushed each time there is
a pause in the source data (determined by the fact that there is no more
data from the source for us to write to the destination).

A description of the implementation, taken from the code:

Best streaming performance (especially when passing through data from pull-based sources,
e.g. a service which is proxying read data from an SFTP server on the ocean) is achieved
by using src.(io.WriterTo).WriteTo(dst). By doing this, we allow the source to take
responsibility for extracting maximum parallelism from whatever underlying protocol is in
use. Especially with these sorts of sources, the fallback src.Read()/dst.Write() loop can
produce very pessimistic results.

However, we also wish to ensure that if the source is slow at producing output, we do not
allow said data to sit in the destination's internal buffer forever. To avoid this, we
implement our own writer here which can be used as the target of WriterTo, but will also
regularly flush the underlying writer. Our logic is:

  • We will accept writes from the source into our internal buffer
  • In parallel, we will attempt to empty that internal buffer into the backing writer
  • Each time our internal buffer empties (i.e. we are faster than our source), we will
    flush the underlying stream to avoid keeping the reader waiting

We use an internal ring buffer to handle this


erincandescent commented Apr 27, 2020

A nit of my own: flusherWriter isn't a great name. streamingWriter?

Also, it might potentially make sense to use this to replace typhon.Streamer. Instead of an embedded backing stream, it would itself implement io.WriterTo on its read side (WriterTo would contain the logic currently in Run). This would require a bit of complexity to implement Read to satisfy the interface contract, but that seems doable.

copyChunked would then just see if the response is already a streamer. If it is, it can just call streamer.WriteTo(dst)

Otherwise,

  • Construct a streamer
  • Spawn a goroutine to call streamer.WriteTo(dst)
  • Call src.WriteTo(streamer) or streamer.ReaderFrom(src) as now

// Otherwise, this is a write
nw, err := f.dst.Write(chunk.buf)
erincandescent (Author):

We're calling out to the streamer here in a background goroutine, so any panic() inside will crash us.

This probably isn't a concern (we trust net/http, I'd guess?) but a little thing to be aware of. Ideally we might wrap this in some sort of teardown/recover wrapper

@rdingwall (Member) left a comment:

I had a first scan of this, and realised this is a really hard PR to review 😄

It looks like you've thought really deeply about this. The code seems very meticulous at first glance but it's very low level and I'm wondering how I can review it properly.

Shall we go through it together on a call maybe with @obeattie or someone from platform as well?

Is any of this code ported from existing io libraries, which I can compare against?

I'm also thinking, what are some of the ways this could go wrong and how could we mitigate/test that?

  • Bytes don't arrive in the same order/same values at the destination
  • Memory leaks

How can we do a canary deploy of this? (reducing the blast radius if something goes wrong). I wonder if we could use this to stream existing files around and compare hashes to validate that everything was copied verbatim?

// Check interface completeness
var _ io.WriteCloser = &flusherWriter{}
var _ io.ReaderFrom = &flusherWriter{}
var _ http.Flusher = &flusherWriter{}
rdingwall (Member):

TIL, I hadn't seen this pattern before

@erincandescent

Sorry, yes, this is a bunch of surprisingly low-level code (perhaps more interestingly, after writing it I realised that I'd ended up writing a few hundred lines of Go to do something which would probably be much shorter in C. This is a rare case where non-blocking I/O is easier than Go's emulation of blocking I/O!)

> Is any of this code ported from existing io libraries, which I can compare against?

No, this was written from scratch. I suspect the "passing slices of the underlying ring buffer through a channel" approach might even be novel, as I've never seen it before, though it did greatly simplify this code.

I think a review on a call might be a good way to go through it. That said, I think the first question is if we think this approach is reasonable in principle to proceed with (I guess that means whether or not @obeattie finds the prospect of having this code in Typhon too scary for the reward)

In terms of gaining confidence, I think there are three steps I would go through:

Firstly, I'd write a unit test which streams a few gigabytes of data through this code with random read/write sizes and interspersed special conditions such as "wait for the buffer to fill before draining" and "wait for the buffer to empty before writing".

Secondly, I'd write and deploy a couple of services off a branch (to staging, perhaps): one takes a seed and a length and returns that much data from a seeded random number generator (especially through a couple of type wrappers designed to force use of Writer vs ReaderFrom vs WriterTo); the second triggers downloads and verifies the correct data is returned.

Finally, I'd deploy this to the Mastercard SFTP service. I realise deploying this to the most important file download service in the company is inherently scary, but I think this concern can be safely mitigated by having that service append the SHA256 hash of the file transferred after the end of the file is downloaded, and having the store service validate that hash before marking the file as complete

@obeattie

obeattie commented May 1, 2020

This looks exciting, thanks for doing it! I'll take a look in the next few days. 👀

@obeattie obeattie self-assigned this May 1, 2020
@obeattie

obeattie commented May 2, 2020

I've taken a cursory look at this and I'm very in favor of any change that improves streaming performance 🚀 I don't think I would have any objection in principle to low-level code in Typhon so long as we're sure there's a good reason for its inclusion.

So I can evaluate this, I think I need to better understand the problem being solved. It's clear from the description and Notion link above that you are encountering some poor performance with Mastercard SFTP streaming internally at Monzo, but I'm not entirely clear on how I could reproduce the issue you're seeing in a more contained environment.

What do you think the best way to get that context would be? I'm happy to:

  • jump on a call
  • talk about it on Slack
  • talk about it in these comments
  • receive a test case

I'm leaning toward a call though because I think it's probably the highest-bandwidth medium for synchronizing brain state. Wdyt? 🧠

@obeattie obeattie self-requested a review May 4, 2020 14:32
@obeattie obeattie removed their assignment May 4, 2020
@erincandescent

Sadly, I was too busy with other work to drive this to completion, and then got tied up in redundancy proceedings.

Since I'm leaving and am probably not going to be using Typhon, I'll close this PR. Someone else can pick it up if they want. There are aspects of this which are definitely useful, so I might look into picking up the general idea as a standalone library.
