Skip to content
This repository
Browse code

fix a bug when body parser finishs before the end of the stream

  • Loading branch information...
commit ef70e641d9114ff8225332bf18b4dd995bd39bcc 1 parent b68f80f
authored July 15, 2012
64  framework/src/play/src/main/scala/play/core/server/netty/RequestBodyHandler.scala
@@ -24,48 +24,56 @@ import scala.collection.JavaConverters._
24 24
 private[server] trait RequestBodyHandler {
25 25
 
26 26
   def newRequestBodyHandler[R](firstIteratee: Promise[Iteratee[Array[Byte], Either[Result, R]]], allChannels: DefaultChannelGroup, server: Server): (Promise[Iteratee[Array[Byte], Either[Result, R]]], SimpleChannelUpstreamHandler) = {
27  
-    var redeemed = false
  27
+    import scala.concurrent.stm._
  28
+    val redeemed = Ref(false)
28 29
     var p = Promise[Iteratee[Array[Byte], Either[Result, R]]]()
29 30
     val MAX_MESSAGE_WATERMARK = 10
30 31
     val MIN_MESSAGE_WATERMARK = 10
31  
-    import scala.concurrent.stm._
32 32
     val counter = Ref(0)
33 33
 
34 34
     var iteratee: Ref[Iteratee[Array[Byte], Either[Result, R]]] = Ref(Iteratee.flatten(firstIteratee))
35 35
 
36 36
     def pushChunk(ctx: ChannelHandlerContext, chunk: Input[Array[Byte]]) {
37  
-
38  
-      if (counter.single.transformAndGet { _ + 1 } > MAX_MESSAGE_WATERMARK && ctx.getChannel.isOpen())
  37
+      if (counter.single.transformAndGet { _ + 1 } > MAX_MESSAGE_WATERMARK && ctx.getChannel.isOpen() && !redeemed.single())
39 38
         ctx.getChannel.setReadable(false)
40 39
 
41  
-      if (!redeemed) {
42  
-        val itPromise = Promise[Iteratee[Array[Byte], Either[Result, R]]]()
43  
-        val current = iteratee.single.swap(Iteratee.flatten(itPromise))
44  
-        val next = current.pureFlatFold[Array[Byte], Either[Result, R]] {
45  
-          case Step.Done(_, _) => current
46  
-          case Step.Cont(k) => k(chunk)
47  
-          case Step.Error(e, _) => current
48  
-        }
49  
-
50  
-        itPromise.redeem(next)
  40
+      val itPromise = Promise[Iteratee[Array[Byte], Either[Result, R]]]()
  41
+      val current = atomic { implicit txn =>
  42
+        if(!redeemed())
  43
+          Some(iteratee.single.swap(Iteratee.flatten(itPromise)))
  44
+        else None
  45
+      }
51 46
 
52  
-        next.pureFold {
53  
-          case Step.Done(a, e) => if (!redeemed) {
54  
-            p.redeem(next);
55  
-            iteratee = null; p = null; redeemed = true
56  
-            if (ctx.getChannel.isOpen()) ctx.getChannel.setReadable(true)
57  
-          }
58  
-          case Step.Cont(k) =>
59  
-            if (counter.single.transformAndGet { _ - 1 } <= MIN_MESSAGE_WATERMARK && ctx.getChannel.isOpen())
60  
-              ctx.getChannel.setReadable(true)
61  
-
62  
-          case Step.Error(msg, e) =>
63  
-            if (!redeemed) {
64  
-              p.redeem(Done(Left(Results.InternalServerError), e))
65  
-              iteratee = null; p = null; redeemed = true
  47
+      current.foreach{ i =>
  48
+        i.unflatten.extend1 {
  49
+          case Redeemed(Step.Cont(k)) =>
  50
+            val next = k(chunk)
  51
+            continue(next)
  52
+          case Redeemed(finished) =>
  53
+            finish(finished.it)
  54
+          case Thrown(e) =>
  55
+            if (!redeemed.single.swap(true)) {
  56
+              p.throwing(e)
  57
+              iteratee = null; p = null;
66 58
               if (ctx.getChannel.isOpen()) ctx.getChannel.setReadable(true)
67 59
             }
  60
+            itPromise.throwing(e)
  61
+        }
  62
+      }
  63
+
  64
+      def continue(it:Iteratee[Array[Byte], Either[Result, R]]){
  65
+        if (counter.single.transformAndGet { _ - 1 } <= MIN_MESSAGE_WATERMARK && ctx.getChannel.isOpen())
  66
+          ctx.getChannel.setReadable(true)
  67
+        itPromise.redeem(it)
  68
+      }
  69
+
  70
+      def finish(it:Iteratee[Array[Byte], Either[Result, R]]){
  71
+        if (!redeemed.single.swap(true)) {
  72
+          p.redeem(it)
  73
+          iteratee = null; p = null;
  74
+          if (ctx.getChannel.isOpen()) ctx.getChannel.setReadable(true)
68 75
         }
  76
+        itPromise.redeem(it)
69 77
       }
70 78
     }
71 79
 

0 notes on commit ef70e64

Please sign in to comment.
Something went wrong with that request. Please try again.