Skip to content

Commit

Permalink
Merge pull request akka#24383 from jrudolph/jr/23668-watchWith-warning
Browse files Browse the repository at this point in the history
!act akka#23668 fail watch/watchWith when previous watch to same subject registered different message
  • Loading branch information
jrudolph committed Jan 24, 2018
2 parents 44fdca0 + 1064b7f commit b90a116
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 33 deletions.
202 changes: 179 additions & 23 deletions akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala
Expand Up @@ -3,12 +3,20 @@
*/
package akka.actor.typed

import akka.Done
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.testkit.EventFilter
import akka.testkit.typed.scaladsl.TestProbe

import scala.concurrent._
import scala.concurrent.duration._
import akka.testkit.typed.TestKit
import com.typesafe.config.ConfigFactory

object WatchSpec {
val config = ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")

case object Stop

val terminatorBehavior =
Expand All @@ -17,54 +25,202 @@ object WatchSpec {
}

sealed trait Message
case object CustomTerminationMessage extends Message
case class StartWatchingWith(watchee: ActorRef[Stop.type], msg: CustomTerminationMessage.type) extends Message
sealed trait CustomTerminationMessage extends Message
case object CustomTerminationMessage extends CustomTerminationMessage
case object CustomTerminationMessage2 extends CustomTerminationMessage
case class StartWatching(watchee: ActorRef[Stop.type]) extends Message
case class StartWatchingWith(watchee: ActorRef[Stop.type], msg: CustomTerminationMessage) extends Message
}

class WatchSpec extends TestKit("WordSpec")
class WatchSpec extends TestKit("WordSpec", WatchSpec.config)
with TypedAkkaSpecWithShutdown {
implicit def untypedSystem = system.toUntyped

import WatchSpec._

"Actor monitoring" must {
"get notified of actor termination" in {
case class StartWatching(watchee: ActorRef[Stop.type])
class WatchSetup {
val terminator = systemActor(terminatorBehavior)
val receivedTerminationSignal: Promise[ActorRef[Nothing]] = Promise()
val watchProbe = TestProbe[Done]()

val watcher = systemActor(Behaviors.immutable[StartWatching] {
case (ctx, StartWatching(watchee))
ctx.watch(watchee)
Behaviors.same
}.onSignal {
case (_, Terminated(stopped))
receivedTerminationSignal.success(stopped)
Behaviors.stopped
})
val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[StartWatching] {
case (ctx, StartWatching(watchee))
ctx.watch(watchee)
watchProbe.ref ! Done
Behaviors.same
}.onSignal {
case (_, Terminated(stopped))
receivedTerminationSignal.success(stopped)
Behaviors.stopped
}
).onFailure[Throwable](SupervisorStrategy.stop))
}
"get notified of actor termination" in new WatchSetup {
watcher ! StartWatching(terminator)
watchProbe.expectMsg(Done)
terminator ! Stop

receivedTerminationSignal.future.futureValue shouldEqual terminator
}
"allow idempotent invocations of watch" in new WatchSetup {
watcher ! StartWatching(terminator)
watchProbe.expectMsg(Done)
// shouldn't fail when watched twice
watcher ! StartWatching(terminator)
watchProbe.expectMsg(Done)
terminator ! Stop

receivedTerminationSignal.future.futureValue shouldEqual terminator
}

"get notified of actor termination with a custom message" in {
class WatchWithSetup {
val terminator = systemActor(terminatorBehavior)
val receivedTerminationSignal: Promise[Message] = Promise()
val watchProbe = TestProbe[Done]()

val watcher = systemActor(Behaviors.immutable[Message] {
case (ctx, StartWatchingWith(watchee, msg))
ctx.watchWith(watchee, msg)
Behaviors.same
case (_, msg)
receivedTerminationSignal.success(msg)
Behaviors.stopped
})
val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[Message] {
case (ctx, StartWatchingWith(watchee, msg))
ctx.watchWith(watchee, msg)
watchProbe.ref ! Done
Behaviors.same
case (_, msg)
receivedTerminationSignal.success(msg)
Behaviors.stopped
}).onFailure[Throwable](SupervisorStrategy.stop)
)
}
"get notified of actor termination with a custom message" in new WatchWithSetup {
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
watchProbe.expectMsg(Done)
terminator ! Stop

receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage
}
"allow idempotent invocations of watchWith with matching msgs" in new WatchWithSetup {
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
watchProbe.expectMsg(Done)
// shouldn't fail when watchWith'd twice
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
watchProbe.expectMsg(Done)
terminator ! Stop

receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage
}

"allow watch message definition after watch using unwatch" in {
val terminator = systemActor(terminatorBehavior)
val receivedTerminationSignal: Promise[Message] = Promise()
val watchProbe = TestProbe[Done]()

val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[Message] {
case (ctx, StartWatching(watchee))
ctx.watch(watchee)
Behaviors.same
case (ctx, StartWatchingWith(watchee, msg))
ctx.unwatch(watchee)
ctx.watchWith(watchee, msg)
watchProbe.ref ! Done
Behaviors.same
case (_, msg)
receivedTerminationSignal.success(msg)
Behaviors.stopped
}).onFailure[Throwable](SupervisorStrategy.stop)
)

watcher ! StartWatching(terminator)
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
watchProbe.expectMsg(Done)
terminator ! Stop

receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage
}

"allow watch message redefinition using unwatch" in {
val terminator = systemActor(terminatorBehavior)
val receivedTerminationSignal: Promise[Message] = Promise()
val watchProbe = TestProbe[Done]()

val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[Message] {
case (ctx, StartWatchingWith(watchee, msg))
ctx.unwatch(watchee)
ctx.watchWith(watchee, msg)
watchProbe.ref ! Done
Behaviors.same
case (_, msg)
receivedTerminationSignal.success(msg)
Behaviors.stopped
}).onFailure[Throwable](SupervisorStrategy.stop)
)

watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
watcher ! StartWatchingWith(terminator, CustomTerminationMessage2)
watchProbe.expectMsg(Done)
terminator ! Stop

receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage2
}

class ErrorTestSetup {
val terminator = systemActor(terminatorBehavior)
private val stopProbe = TestProbe[Done]()

val watcher = systemActor(
Behaviors.supervise(
Behaviors.immutable[Message] {
case (ctx, StartWatchingWith(watchee, msg))
ctx.watchWith(watchee, msg)
Behaviors.same
case (ctx, StartWatching(watchee))
ctx.watch(watchee)
Behaviors.same
case (_, msg)
Behaviors.stopped
}.onSignal {
case (_, PostStop)
Behaviors.stopped
}
).onFailure[Throwable](SupervisorStrategy.stop)
)

def expectStopped(): Unit = stopProbe.expectTerminated(watcher, 1.second)
}

"fail when watch is used after watchWith on same subject" in new ErrorTestSetup {
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)

EventFilter[IllegalStateException](pattern = ".*termination message was not overwritten.*", occurrences = 1) intercept {
watcher ! StartWatching(terminator)
}
// supervisor should have stopped the actor
expectStopped()
}

"fail when watchWitch is used after watchWith with different termination message" in new ErrorTestSetup {
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)

EventFilter[IllegalStateException](pattern = ".*termination message was not overwritten.*", occurrences = 1) intercept {
watcher ! StartWatchingWith(terminator, CustomTerminationMessage2)
}
// supervisor should have stopped the actor
expectStopped()
}
"fail when watchWith is used after watch on same subject" in new ErrorTestSetup {
watcher ! StartWatching(terminator)

EventFilter[IllegalStateException](pattern = ".*termination message was not overwritten.*", occurrences = 1) intercept {
watcher ! StartWatchingWith(terminator, CustomTerminationMessage)
}
// supervisor should have stopped the actor
expectStopped()
}
}
}
Expand Up @@ -107,6 +107,11 @@ trait ActorContext[T] {
* given [[ActorRef]] terminates. This message is also sent when the watched actor
* is on a node that has been removed from the cluster when using akka-cluster
* or has been marked unreachable when using akka-remote directly.
*
* `watch` is idempotent if it is not mixed with `watchWith`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
* To clear the termination message, unwatch first.
*/
def watch[U](other: ActorRef[U]): Unit

Expand All @@ -115,6 +120,11 @@ trait ActorContext[T] {
* given [[ActorRef]] terminates. This message is also sent when the watched actor
* is on a node that has been removed from the cluster when using akka-cluster
* or has been marked unreachable when using akka-remote directly.
*
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
* another termination message. To change the termination message, unwatch first.
*/
def watchWith[U](other: ActorRef[U], msg: T): Unit

Expand Down
Expand Up @@ -94,6 +94,11 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
* given [[ActorRef]] terminates. This message is also sent when the watched actor
* is on a node that has been removed from the cluster when using akka-cluster
* or has been marked unreachable when using akka-remote directly
*
* `watch` is idempotent if it is not mixed with `watchWith`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
* To clear the termination message, unwatch first.
*/
def watch[U](other: ActorRef[U]): Unit

Expand All @@ -102,6 +107,11 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
* given [[ActorRef]] terminates. This message is also sent when the watched actor
* is on a node that has been removed from the cluster when using akka-cluster
* or has been marked unreachable when using akka-remote directly.
*
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
* another termination message. To change the termination message, unwatch first.
*/
def watchWith[U](other: ActorRef[U], msg: T): Unit

Expand Down
12 changes: 12 additions & 0 deletions akka-actor/src/main/scala/akka/actor/ActorCell.scala
Expand Up @@ -148,6 +148,12 @@ trait ActorContext extends ActorRefFactory {
* Registers this actor as a Monitor for the provided ActorRef.
* This actor will receive a Terminated(subject) message when watched
* actor is terminated.
*
* `watch` is idempotent if it is not mixed with `watchWith`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watchWith`.
* To clear the termination message, unwatch first.
*
* @return the provided ActorRef
*/
def watch(subject: ActorRef): ActorRef
Expand All @@ -156,6 +162,12 @@ trait ActorContext extends ActorRefFactory {
* Registers this actor as a Monitor for the provided ActorRef.
* This actor will receive the specified message when watched
* actor is terminated.
*
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
* another termination message. To change the termination message, unwatch first.
*
* @return the provided ActorRef
*/
def watchWith(subject: ActorRef, msg: Any): ActorRef
Expand Down
38 changes: 28 additions & 10 deletions akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala
Expand Up @@ -23,22 +23,28 @@ private[akka] trait DeathWatch { this: ActorCell ⇒

override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watchingContains(a)) {
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching = watching.updated(a, None)
}
if (a != self) {
if (!watchingContains(a))
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
updateWatching(a, None)
}
else
checkWatchingSame(a, None)
}
a
}

override final def watchWith(subject: ActorRef, msg: Any): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watchingContains(a)) {
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching = watching.updated(a, Some(msg))
}
if (a != self) {
if (!watchingContains(a))
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
updateWatching(a, Some(msg))
}
else
checkWatchingSame(a, Some(msg))
}
a
}
Expand Down Expand Up @@ -111,6 +117,18 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (subject.path.uid != ActorCell.undefinedUid) (map - subject) - new UndefinedUidActorRef(subject)
else map filterKeys (_.path != subject.path)

private def updateWatching(ref: InternalActorRef, newMessage: Option[Any]): Unit =
watching = watching.updated(ref, newMessage)

/** Call only if it was checked before that `watching contains ref` */
private def checkWatchingSame(ref: InternalActorRef, newMessage: Option[Any]): Unit = {
val previous = watching.get(ref).get
if (previous != newMessage)
throw new IllegalStateException(
s"Watch($self, $ref) termination message was not overwritten from [$previous] to [$newMessage]. " +
s"If this was intended, unwatch first before using `watch` / `watchWith` with another message.")
}

protected def tellWatchersWeDied(): Unit =
if (!watchedBy.isEmpty) {
try {
Expand Down

0 comments on commit b90a116

Please sign in to comment.