Skip to content
This repository
Browse code

Merge pull request #945 from huntc/http-pipelining

Http pipelining support
  • Loading branch information...
commit a09dfd3f032ca7f8ed4bd1cd8f1f1d550ab6d81b 2 parents 289ca2c + d280228
James Roper authored April 12, 2013
25  framework/project/Build.scala
@@ -36,10 +36,10 @@ object BuildSettings {
36 36
     scalaVersion := buildScalaVersion,
37 37
     scalaBinaryVersion := CrossVersion.binaryScalaVersion(buildScalaVersion),
38 38
     ivyLoggingLevel := UpdateLogging.DownloadOnly,
39  
-    publishTo := Some(playRepository),
  39
+    publishTo := Some(publishingMavenRepository),
40 40
     javacOptions ++= Seq("-source", "1.6", "-target", "1.6", "-encoding", "UTF-8"),
41 41
     javacOptions in doc := Seq("-source", "1.6"),
42  
-    resolvers += typesafe)
  42
+    resolvers ++= typesafeResolvers)
43 43
 
44 44
   def PlaySharedJavaProject(name: String, dir: String, testBinaryCompatibility: Boolean = false): Project = {
45 45
     val bcSettings: Seq[Setting[_]] = if (testBinaryCompatibility) {
@@ -76,7 +76,7 @@ object BuildSettings {
76 76
       .settings(
77 77
         scalaVersion := buildScalaVersionForSbt,
78 78
         scalaBinaryVersion := CrossVersion.binaryScalaVersion(buildScalaVersionForSbt),
79  
-        publishTo := Some(playRepository),
  79
+        publishTo := Some(publishingMavenRepository),
80 80
         publishArtifact in packageDoc := false,
81 81
         publishArtifact in (Compile, packageSrc) := false,
82 82
         scalacOptions ++= Seq("-encoding", "UTF-8", "-Xlint", "-deprecation", "-unchecked"))
@@ -89,14 +89,17 @@ object Resolvers {
89 89
 
90 90
   import BuildSettings._
91 91
 
92  
-  val playLocalRepository = Resolver.file("Play Local Repository", file("../repository/local"))(Resolver.ivyStylePatterns)
93  
-  val typesafe = "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
94  
-  val typesafeReleases = "Typesafe Releases Repository" at "https://typesafe.artifactoryonline.com/typesafe/maven-releases/"
95  
-  val typesafeSnapshot = "Typesafe Snapshots Repository" at "https://typesafe.artifactoryonline.com/typesafe/maven-snapshots/"
96  
-  val playRepository = if (buildVersion.endsWith("SNAPSHOT")) typesafeSnapshot else typesafeReleases
  92
+  val typesafeReleases = "Typesafe Releases Repository" at "http://repo.typesafe.com/typesafe/releases/"
  93
+  val typesafeSnapshots = "Typesafe Snapshots Repository" at "http://repo.typesafe.com/typesafe/snapshots/"
  94
+  val typesafeMavenReleases = "Typesafe Maven Releases Repository" at "https://typesafe.artifactoryonline.com/typesafe/maven-releases/"
  95
+  val typesafeMavenSnapshots = "Typesafe Maven Snapshots Repository" at "https://typesafe.artifactoryonline.com/typesafe/maven-snapshots/"
97 96
   val typesafeIvyReleases = Resolver.url("Typesafe Ivy Releases Repository", url("https://typesafe.artifactoryonline.com/typesafe/ivy-releases/"))(Resolver.ivyStylePatterns)
98  
-  val typesafeIvySnapshot = Resolver.url("Typesafe Ivy Snapshots Repository", url("https://typesafe.artifactoryonline.com/typesafe/ivy-snapshots/"))(Resolver.ivyStylePatterns)
99  
-  val playIvyRepository = if (buildVersion.endsWith("SNAPSHOT")) typesafeIvySnapshot else typesafeIvyReleases
  97
+  val typesafeIvySnapshots = Resolver.url("Typesafe Ivy Snapshots Repository", url("https://typesafe.artifactoryonline.com/typesafe/ivy-snapshots/"))(Resolver.ivyStylePatterns)
  98
+
  99
+  val isSnapshotBuild = buildVersion.endsWith("SNAPSHOT")
  100
+  val typesafeResolvers = if (isSnapshotBuild) Seq(typesafeReleases, typesafeSnapshots) else Seq(typesafeReleases)
  101
+  val publishingMavenRepository = if (isSnapshotBuild) typesafeMavenSnapshots else typesafeMavenReleases
  102
+  val publishingIvyRepository = if (isSnapshotBuild) typesafeIvySnapshots else typesafeIvyReleases
100 103
 }
101 104
 
102 105
 
@@ -201,7 +204,7 @@ object PlayBuild extends Build {
201 204
       libraryDependencies += "com.typesafe.sbtidea" % "sbt-idea" % "1.1.1" extra("sbtVersion" -> buildSbtVersionBinaryCompatible, "scalaVersion" -> buildScalaVersionForSbt),
202 205
       libraryDependencies += "org.specs2" %% "specs2" % "1.12.3" % "test" exclude("javax.transaction", "jta"),
203 206
       libraryDependencies += "org.scala-sbt" % "sbt" % buildSbtVersion % "provided",
204  
-      publishTo := Some(playIvyRepository)
  207
+      publishTo := Some(publishingIvyRepository)
205 208
     ).dependsOn(SbtLinkProject, PlayExceptionsProject, RoutesCompilerProject, TemplatesCompilerProject, ConsoleProject)
206 209
 
207 210
   // todo this can be 2.10 and not cross-versioned or anything.  GO HOG WILD JAMES!
2  framework/project/Dependencies.scala
@@ -64,6 +64,8 @@ object Dependencies {
64 64
   val runtime = Seq(
65 65
     "io.netty" % "netty" % "3.6.3.Final",
66 66
 
  67
+    "com.typesafe.netty" % "netty-http-pipelining" % "1.0.0",
  68
+
67 69
     "org.slf4j" % "slf4j-api" % "1.6.6",
68 70
     "org.slf4j" % "jul-to-slf4j" % "1.6.6",
69 71
     "org.slf4j" % "jcl-over-slf4j" % "1.6.6",
2  framework/src/play/src/main/scala/play/core/server/NettyServer.scala
@@ -19,6 +19,7 @@ import play.core.server.netty._
19 19
 import java.security.cert.X509Certificate
20 20
 import java.io.{File, FileInputStream}
21 21
 import scala.util.control.NonFatal
  22
+import com.typesafe.netty.http.pipelining.HttpPipeliningHandler
22 23
 
23 24
 /**
24 25
  * provides a stopable Server
@@ -54,6 +55,7 @@ class NettyServer(appProvider: ApplicationProvider, port: Int, sslPort: Option[I
54 55
       newPipeline.addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192))
55 56
       newPipeline.addLast("encoder", new HttpResponseEncoder())
56 57
       newPipeline.addLast("decompressor", new HttpContentDecompressor())
  58
+      newPipeline.addLast("http-pipelining", new HttpPipeliningHandler())
57 59
       newPipeline.addLast("handler", defaultUpStreamHandler)
58 60
       newPipeline
59 61
     }
115  framework/src/play/src/main/scala/play/core/server/netty/PlayDefaultUpstreamHandler.scala
@@ -4,18 +4,13 @@ import scala.language.reflectiveCalls
4 4
 
5 5
 import org.jboss.netty.buffer._
6 6
 import org.jboss.netty.channel._
7  
-import org.jboss.netty.bootstrap._
8  
-import org.jboss.netty.channel.Channels._
9 7
 import org.jboss.netty.handler.codec.http._
10  
-import org.jboss.netty.channel.socket.nio._
11  
-import org.jboss.netty.handler.stream._
12 8
 import org.jboss.netty.handler.codec.http.HttpHeaders._
13 9
 import org.jboss.netty.handler.codec.http.HttpHeaders.Names._
14 10
 import org.jboss.netty.handler.codec.http.HttpHeaders.Values._
15 11
 import org.jboss.netty.handler.ssl._
16 12
 
17 13
 import org.jboss.netty.channel.group._
18  
-import java.util.concurrent._
19 14
 import play.core._
20 15
 import server.Server
21 16
 import play.api._
@@ -27,6 +22,7 @@ import play.api.libs.concurrent._
27 22
 import scala.collection.JavaConverters._
28 23
 import scala.util.control.NonFatal
29 24
 import scala.util.control.Exception
  25
+import com.typesafe.netty.http.pipelining.{OrderedDownstreamMessageEvent, OrderedUpstreamMessageEvent}
30 26
 
31 27
 
32 28
 private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: DefaultChannelGroup) extends SimpleChannelUpstreamHandler with Helpers with WebSocketHandler with RequestBodyHandler {
@@ -131,6 +127,11 @@ private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: De
131 127
         // attach the cleanup function to the channel context for after cleaning
132 128
         ctx.setAttachment(cleanup _)
133 129
 
  130
+        // It is a pre-requesite that we're using the http pipelining capabilities provided and that we have a
  131
+        // handler downstream from this one that produces these events.
  132
+        implicit val msgCtx = ctx
  133
+        implicit val oue = e.asInstanceOf[OrderedUpstreamMessageEvent]
  134
+
134 135
         // converting netty response to play's
135 136
         val response = new Response {
136 137
 
@@ -173,27 +174,27 @@ private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: De
173 174
                 // Stream the result
174 175
                 headers.get(CONTENT_LENGTH).map { contentLength =>
175 176
                   val bodyIteratee = {
176  
-                    def step(in:Input[r.BODY_CONTENT]):Iteratee[r.BODY_CONTENT,Unit] = (e.getChannel.isConnected(),in) match {
177  
-                      case (true,Input.El(x)) =>
178  
-                         Iteratee.flatten(
179  
-                           NettyPromise(e.getChannel.write(ChannelBuffers.wrappedBuffer(r.writeable.transform(x))))
180  
-                             .map(_ => if(e.getChannel.isConnected()) Cont(step) else Done((),Input.Empty)))
181  
-                      case (true,Input.Empty) => Cont(step)
182  
-                      case (_,in) => Done((),in)
  177
+                    def step(subsequence: Int)(in: Input[r.BODY_CONTENT]): Iteratee[r.BODY_CONTENT, Unit] = in match {
  178
+                      case Input.El(x) =>
  179
+                        val b = ChannelBuffers.wrappedBuffer(r.writeable.transform(x))
  180
+                        nextWhenComplete(sendDownstream(subsequence, false, b), step(subsequence + 1))
  181
+                      case Input.Empty =>
  182
+                        Cont(step(subsequence))
  183
+                      case Input.EOF =>
  184
+                        sendDownstream(subsequence, true, ChannelBuffers.EMPTY_BUFFER)
  185
+                        Done(())
183 186
                     }
184  
-                    Iteratee.flatten(
185  
-                      NettyPromise(e.getChannel.write(nettyResponse))
186  
-                        .map( _ => if(e.getChannel.isConnected()) Cont(step) else Done((),Input.Empty:Input[r.BODY_CONTENT])))
  187
+                    nextWhenComplete(sendDownstream(0, false, nettyResponse), step(1))
187 188
                   }
188 189
 
189 190
                   (body |>>> bodyIteratee).extend1 {
190 191
                     case Redeemed(_) =>
191 192
                       cleanup()
192 193
                       ctx.setAttachment(null)
193  
-                      if (e.getChannel.isConnected() && !keepAlive) e.getChannel.close()
  194
+                      if (!keepAlive) Channels.close(e.getChannel)
194 195
                     case Thrown(ex) =>
195  
-                      Logger("play").debug(ex.toString) 
196  
-                      if(e.getChannel.isConnected())  e.getChannel.close()
  196
+                      Logger("play").debug(ex.toString)
  197
+                      Channels.close(e.getChannel)
197 198
                   }
198 199
                 }.getOrElse {
199 200
 
@@ -205,21 +206,21 @@ private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: De
205 206
                     case Input.El(buffer) =>
206 207
                       nettyResponse.setHeader(CONTENT_LENGTH, channelBuffer.readableBytes)
207 208
                       nettyResponse.setContent(buffer)
208  
-                      val f = e.getChannel.write(nettyResponse)
209  
-                      val p = NettyPromise(f)
  209
+                      val f = sendDownstream(0, true, nettyResponse)
210 210
                       if (!keepAlive) f.addListener(ChannelFutureListener.CLOSE)
211  
-                      Iteratee.flatten(p.map(_ => Done(1,Input.Empty:Input[org.jboss.netty.buffer.ChannelBuffer])))
  211
+                      val p = NettyPromise(f)
  212
+                      Iteratee.flatten(p.map(_ => Done(1, Input.Empty:Input[ChannelBuffer])))
212 213
 
213  
-                    case other => Error("unexepected input",other)
  214
+                    case other => Error("unexpected input",other)
214 215
                   })
215 216
                   p.extend1 {
216 217
                     case Redeemed(_) =>
217 218
                       cleanup()
218 219
                       ctx.setAttachment(null)
219  
-                      if (e.getChannel.isConnected() && !keepAlive) e.getChannel.close()
  220
+                      if (!keepAlive) Channels.close(e.getChannel)
220 221
                     case Thrown(ex) =>
221  
-                      Logger("play").debug(ex.toString) 
222  
-                      if(e.getChannel.isConnected())  e.getChannel.close()
  222
+                      Logger("play").debug(ex.toString)
  223
+                      Channels.close(e.getChannel)
223 224
                   }
224 225
                 }
225 226
               }
@@ -251,46 +252,31 @@ private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: De
251 252
                 nettyResponse.setHeader(TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED)
252 253
                 nettyResponse.setChunked(true)
253 254
                 val bodyIteratee = {
254  
-                    def step(in:Input[r.BODY_CONTENT]):Iteratee[r.BODY_CONTENT,Unit] = (e.getChannel.isConnected(),in) match {
255  
-                      case (true,Input.El(x)) =>
256  
-                         Iteratee.flatten(
257  
-                           NettyPromise(e.getChannel.write(new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(r.writeable.transform(x)))))
258  
-                             .extend1{ 
259  
-                               case Redeemed(_) => if(e.getChannel.isConnected()) Cont(step) else Done((),Input.Empty)
260  
-                               case Thrown(ex) =>
261  
-                                 Logger("play").debug(ex.toString) 
262  
-                                 if(e.getChannel.isConnected())  e.getChannel.close()
263  
-                                 throw ex
264  
-                             })
265  
-                      case (true,Input.Empty) => Cont(step)
266  
-                      case (_,in) => Done((),in)
267  
-                    }
268  
-                    Iteratee.flatten(
269  
-                      NettyPromise(e.getChannel.write(nettyResponse))
270  
-                        .extend1{ 
271  
-                          case Redeemed(_) => if(e.getChannel.isConnected()) Cont(step) else Done((),Input.Empty:Input[r.BODY_CONTENT])
272  
-                          case Thrown(ex) =>
273  
-                            Logger("play").debug(ex.toString) 
274  
-                            if(e.getChannel.isConnected())  e.getChannel.close()
275  
-                            throw ex
276  
-                        })
  255
+                  def step(subsequence: Int)(in:Input[r.BODY_CONTENT]): Iteratee[r.BODY_CONTENT, Unit] = in match {
  256
+                    case Input.El(x) =>
  257
+                      val b = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(r.writeable.transform(x)))
  258
+                      nextWhenComplete(sendDownstream(subsequence, false, b), step(subsequence + 1))
  259
+                    case Input.Empty =>
  260
+                      Cont(step(subsequence))
  261
+                    case Input.EOF =>
  262
+                      val f = sendDownstream(subsequence, true, HttpChunk.LAST_CHUNK)
  263
+                      val p = NettyPromise(f)
  264
+                      Iteratee.flatten(p.map(_ => Done(())))
  265
+                  }
  266
+                  nextWhenComplete(sendDownstream(0, false, nettyResponse), step(1))
277 267
                 }
278 268
 
279 269
                 chunks apply bodyIteratee.map { _ =>
280 270
                   cleanup()
281  
-                  if (e.getChannel.isConnected()) {
282  
-                    val f = e.getChannel.write(HttpChunk.LAST_CHUNK);
283  
-                    if (!keepAlive)  f.addListener(ChannelFutureListener.CLOSE)
284  
-                  }
  271
+                  ctx.setAttachment(null)
  272
+                  if (!keepAlive) Channels.close(e.getChannel)
285 273
                 }
286 274
               }
287 275
 
288 276
               case _ =>
289  
-                val channelBuffer = ChannelBuffers.dynamicBuffer(512)
290 277
                 val nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(500))
291  
-                nettyResponse.setContent(channelBuffer)
292 278
                 nettyResponse.setHeader(CONTENT_LENGTH, 0)
293  
-                val f = e.getChannel.write(nettyResponse)
  279
+                val f = sendDownstream(0, true, nettyResponse)
294 280
                 if (!keepAlive) f.addListener(ChannelFutureListener.CLOSE)
295 281
             }
296 282
           }
@@ -361,9 +347,7 @@ private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: De
361 347
           requestHeader.headers.get("Expect").filter(_ == "100-continue").foreach { _ =>
362 348
             eventuallyBodyParser.flatMap(_.unflatten).map {
363 349
               case Step.Cont(k) =>
364  
-                val continue = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)
365  
-                //TODO wait for the promise of the write
366  
-                e.getChannel.write(continue)
  350
+                sendDownstream(0, true, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE))
367 351
               case _ =>
368 352
             }
369 353
           }
@@ -410,4 +394,19 @@ private[server] class PlayDefaultUpstreamHandler(server: Server, allChannels: De
410 394
     }
411 395
   }
412 396
 
  397
+  def sendDownstream(subSequence: Int, last: Boolean, message: Object)
  398
+                    (implicit ctx: ChannelHandlerContext, oue: OrderedUpstreamMessageEvent) = {
  399
+    val ode = new OrderedDownstreamMessageEvent(oue, subSequence, last, message)
  400
+    ctx.sendDownstream(ode)
  401
+    ode.getFuture
  402
+  }
  403
+
  404
+  def nextWhenComplete[E](future: ChannelFuture, step: (Input[E]) => Iteratee[E, Unit])
  405
+                      (implicit ctx: ChannelHandlerContext)
  406
+                      : Iteratee[E, Unit] = {
  407
+    Iteratee.flatten(
  408
+      NettyPromise(future)
  409
+        .map(_ => if (ctx.getChannel.isConnected()) Cont(step) else Done((), Input.Empty)))
  410
+  }
  411
+
413 412
 }

0 notes on commit a09dfd3

Please sign in to comment.
Something went wrong with that request. Please try again.