forked from twitter/finagle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HttpServerDispatcher.scala
124 lines (110 loc) · 4.21 KB
/
HttpServerDispatcher.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.twitter.finagle.http.codec
import com.twitter.finagle.Service
import com.twitter.finagle.http._
import com.twitter.finagle.http.exp.{GenSerialServerDispatcher, StreamTransport}
import com.twitter.finagle.stats.{RollupStatsReceiver, StatsReceiver}
import com.twitter.logging.Logger
import com.twitter.util.{Future, Promise, Throwables}
private[http] object HttpServerDispatcher {
val handleHttp10: PartialFunction[Throwable, Response] = {
case _ => Response(Version.Http10, Status.InternalServerError)
}
val handleHttp11: PartialFunction[Throwable, Response] = {
case _ => Response(Version.Http11, Status.InternalServerError)
}
}
private[finagle] class HttpServerDispatcher(
trans: StreamTransport[Response, Request],
service: Service[Request, Response],
stats: StatsReceiver)
extends GenSerialServerDispatcher[Request, Response, Response, Request](trans) {
import HttpServerDispatcher._
private[this] val failureReceiver =
new RollupStatsReceiver(stats.scope("stream")).scope("failures")
trans.onClose.ensure {
service.close()
}
private def mayHaveContent(status: Status): Boolean = status match {
case Status.NoContent | Status.NotModified => false
case _ => true
}
protected def dispatch(m: Request): Future[Response] = m match {
case badReq: BadReq =>
val resp = badReq match {
case _: ContentTooLong =>
Response(badReq.version, Status.RequestEntityTooLarge)
case _: UriTooLong =>
Response(badReq.version, Status.RequestURITooLong)
case _: HeaderFieldsTooLarge =>
Response(badReq.version, Status.RequestHeaderFieldsTooLarge)
case _ =>
Response(badReq.version, Status.BadRequest)
}
// The connection is unusable so we close it here.
// Note that state != Idle while inside dispatch
// so state will be set to Closed but trans.close
// will not be called. Instead, isClosing will be
// set to true, keep-alive headers set correctly
// in handle, and trans.close will be called in
// the respond statement of loop().
close()
Future.value(resp)
case req: Request =>
val handleFn = req.version match {
case Version.Http10 => handleHttp10
case _ => handleHttp11
}
service(req).handle(handleFn)
case invalid =>
Future.exception(new IllegalArgumentException("Invalid message "+invalid))
}
protected def handle(rep: Response): Future[Unit] = {
setKeepAlive(rep, !isClosing)
if (rep.isChunked) {
// We remove content length here in case the content is later
// compressed. This is a pretty bad violation of modularity;
// this is likely an issue with the Netty content
// compressors, which (should?) adjust headers regardless of
// transfer encoding.
rep.headerMap.remove(Fields.ContentLength)
rep.headerMap.set(Fields.TransferEncoding, "chunked")
val p = new Promise[Unit]
val f = trans.write(rep)
f.proxyTo(p)
// This awkwardness is unfortunate but necessary for now as you may be
// interrupted in the middle of a write, or when there otherwise isn’t
// an outstanding read (e.g. read-write race).
f.onFailure { t =>
Logger.get(this.getClass.getName).debug(t, "Failed mid-stream. Terminating stream, closing connection")
failureReceiver.counter(Throwables.mkString(t): _*).incr()
rep.reader.discard()
}
p.setInterruptHandler { case intr =>
rep.reader.discard()
f.raise(intr)
}
p
} else {
// Ensure Content-Length is set if not chunked
if (mayHaveContent(rep.status) && rep.contentLength.isEmpty)
rep.contentLength = rep.content.length
trans.write(rep)
}
}
protected def setKeepAlive(rep: Response, keepAlive: Boolean): Unit = {
rep.version match {
case Version.Http10 =>
if (keepAlive) {
rep.headers.set(Fields.Connection, "keep-alive")
} else {
rep.headers.remove(Fields.Connection)
}
case Version.Http11 =>
if (keepAlive) {
rep.headers.remove(Fields.Connection)
} else {
rep.headers.set(Fields.Connection, "close")
}
}
}
}