Skip to content

Commit

Permalink
Ensure NPE is always through when VirtualProcessor.onError(null) is i…
Browse files Browse the repository at this point in the history
…nvoked

This fix is similar to akka#24749, fixing a spec violation bug that was
introduced in akka#24722.
  • Loading branch information
jroper committed Jul 5, 2018
1 parent 46b433b commit ca00286
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -240,16 +240,13 @@ import scala.util.control.NonFatal
case null
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${t.getMessage}) -> ErrorPublisher")
if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case s: Subscription
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> ErrorPublisher")
if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case Both(s)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onError(${t.getMessage}) -> ErrorPublisher")
set(Inert)
try tryOnError(s, ex)
finally if (t == null) throw ex // must throw NPE, rule 2.13
tryOnError(s, ex)
case s: Subscriber[_] // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> Inert")
getAndSet(Inert) match {
Expand All @@ -259,14 +256,17 @@ import scala.util.control.NonFatal
case est @ Establishing(_, false, OptionVal.None)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${t.getMessage}), loop")
if (!compareAndSet(est, est.copy(onErrorBuffered = OptionVal.Some(ex)))) rec(ex)
case other // spec violation or cancellation race, but nothing we can do
if (t == null) throw ex // must throw NPE, rule 2.13
case other
// spec violation or cancellation race, but nothing we can do
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onError(${t.getMessage}). spec violation or cancellation race")
}

val ex = if (t == null) exceptionMustNotBeNullException else t
rec(ex)
// must throw NPE, rule 2.13
if (t == null) {
throw ex
}
}

@tailrec override def onComplete(): Unit = {
Expand Down

0 comments on commit ca00286

Please sign in to comment.