Skip to content

Commit

Permalink
=str akka#23929 Fix double push message in sub source (akka#24083)
Browse files Browse the repository at this point in the history
* Fix double push message in sub source

* Add name of pushed port in failure message

* Update GraphStageLogicSpec.scala

* Change substreamEmit to SubstreamEmit
  • Loading branch information
mattsu authored and manonthegithub committed Jan 31, 2018
1 parent d42e495 commit e919781
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
Expand Up @@ -4,17 +4,20 @@

package akka.stream.impl

import akka.stream.stage.GraphStageLogic.{ EagerTerminateOutput, EagerTerminateInput }
import akka.NotUsed
import akka.stream.stage.GraphStageLogic.{ EagerTerminateInput, EagerTerminateOutput }
import akka.stream.testkit.StreamSpec
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream.testkit.Utils.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.impl.fusing._
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.duration.Duration

class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit with ScalaFutures {

implicit val materializer = ActorMaterializer()

Expand All @@ -32,6 +35,29 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
}
}

class SubstreamEmit extends GraphStage[SourceShape[Source[Int, NotUsed]]] {
val out = Outlet[Source[Int, NotUsed]]("out")
override val shape = SourceShape(out)

override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with OutHandler {

setHandler(out, this)

override def onPull(): Unit = {
val subOut = new SubSourceOutlet[Int]("subOut")
subOut.setHandler(new OutHandler {
override def onPull(): Unit = {
()
}
})
subOut.push(1)
subOut.push(2) // expecting this to fail!

???
}
}
}

object emit5678 extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("in")
val out = Outlet[Int]("out")
Expand Down Expand Up @@ -346,6 +372,12 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
ex.getMessage should startWith("No handler defined in stage [stage-name] for out port [out")
}

"give a good error message if sub source is pushed twice" in {
intercept[Exception] {
Source.fromGraph(new SubstreamEmit()).async.runWith(Sink.ignore).futureValue
}.getCause.getMessage should startWith("requirement failed: Cannot push port (SubSourceOutlet(subOut)) twice, or before it being pulled")
}

}

}
Expand Up @@ -1361,6 +1361,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Push to this output port.
*/
def push(elem: T): Unit = {
require(isAvailable, s"Cannot push port ($this) twice, or before it being pulled")
available = false
_source.pushSubstream(elem)
}
Expand Down

0 comments on commit e919781

Please sign in to comment.