Skip to content

Loading…

Fix http server chunking: Fail rep.writer.write on transport failure #242

Closed
wants to merge 1 commit into from

3 participants

@jdanbrown

Further, would it be appropriate to use rep.writer.fail(e) instead of rep.reader.discard here? That would provide a more informative error than ReaderDiscarded, but I'm not sure if it assumes too much about the implementation of Message.reader and Message.writer.

@luciferous

rep.reader.discard seems fine, but it also seems reasonable for an onFailure handler to invoke fail. I'm not sure which to go with (edit: rep.reader doesn't have fail so we're left with discard). We're currently working on a similar issue encompassing this patch. I think the desired behavior is to interrupt streaming immediately and not wait for the next trans.write before invoking discard, something like:

val p = new Promise[Unit]
val f = trans.write(rep) before streamChunks(rep.reader)
p setInterruptHandler { case intr => rep.reader.discard() }
p

What do you think?

@jdanbrown

Hmm, I think I don't understand:

  1. In my patch, discard will be invoked in two cases: (a) via an r.read failure if rep.writer.fail is called, (b) via a trans.write failure if trans fails, e.g. closed socket. What's the undesirable behavior in there, and how could we do better?

  2. In your code sketch, p and f aren't related—did you mean for them to be? e.g. p never actually fulfills.

@luciferous
  1. Try writing a test for transport closure before the dispatch and another after the dispatch. You'll see why setInterruptHandler is also needed.
  2. Sorry I think I left out a p.become.
@mosesn

@jdanbrown @luciferous Just wanted to follow up on this patch. What's the latest status?

@jdanbrown

@luciferous Did you guys finish the fix you mentioned in your initial reply?

@jdanbrown

@luciferous Great, thanks. 9205238 clearly subsumes and improves upon the small patch in this PR:

-        trans.write(rep) before streamChunks(rep.reader)
+        trans.write(rep) before streamChunks(rep.reader) onFailure { _ => rep.reader.discard }
-        trans.write(rep) before streamChunks(rep.reader)
+        val p = new Promise[Unit]
+        val f = trans.write(rep) before streamChunks(rep.reader)
+        // This awkwardness is unfortunate but necessary for now as you may be
+        // interrupted in the middle of a write, or when there otherwise isn’t
+        // an outstanding read (e.g. read-write race).
+        p.become(f onFailure { _ => rep.reader.discard() })
+        p setInterruptHandler { case _ => rep.reader.discard() }
+        p

@mosesn Closing as subsumed by 9205238.

@jdanbrown jdanbrown closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
2 finagle-http/src/main/scala/com/twitter/finagle/http/codec/HttpServerDispatcher.scala
@@ -59,7 +59,7 @@ class HttpServerDispatcher[REQUEST <: Request](
protected def handle(response: HttpResponse): Future[Unit] = response match {
case rep: Response =>
if (rep.isChunked) {
- trans.write(rep) before streamChunks(rep.reader)
+ trans.write(rep) before streamChunks(rep.reader) onFailure { _ => rep.reader.discard }
} else {
// Ensure Content-Length is set if not chunked
if (!rep.headers.contains(HttpHeaders.Names.CONTENT_LENGTH))
Something went wrong with that request. Please try again.