Skip to content

Commit

Permalink
Akka Typed cleanups (#64)
Browse files Browse the repository at this point in the history
Akka Typed cleanups
  • Loading branch information
octonato committed Nov 19, 2019
2 parents a76838f + a63be31 commit 4df7a23
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
4 changes: 1 addition & 3 deletions play-java-websocket-example/app/Module.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,15 @@ public ActorRef<UserParentActor.Create> get() {
@Singleton
public static class UserActorFactoryProvider implements Provider<UserActor.Factory> {
private final ActorRef<StocksActor.GetStocks> stocksActor;
private final Materializer mat;

@Inject
public UserActorFactoryProvider(ActorRef<StocksActor.GetStocks> stocksActor, Materializer mat) {
this.stocksActor = stocksActor;
this.mat = mat;
}

@Override
public UserActor.Factory get() {
return id -> UserActor.create(id, stocksActor, mat);
return id -> UserActor.create(id, stocksActor);
}
}
}
21 changes: 14 additions & 7 deletions play-java-websocket-example/app/actors/UserActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public String toString() {
}
}

private static final class InternalStop implements Message {
private static final InternalStop INSTANCE = new InternalStop();
public static InternalStop get() {
return INSTANCE;
}
private InternalStop() {}
}

private final Duration timeout = Duration.of(5, ChronoUnit.MILLIS);

private final Map<String, UniqueKillSwitch> stocksMap = new HashMap<>();
Expand All @@ -79,19 +87,17 @@ public String toString() {
private final Sink<JsonNode, NotUsed> hubSink;
private final Flow<JsonNode, JsonNode, NotUsed> websocketFlow;

public static Behavior<Message> create(String id,
ActorRef<GetStocks> stocksActor,
Materializer mat) {
return Behaviors.setup(context -> new UserActor(id, stocksActor, mat, context).behavior());
public static Behavior<Message> create(String id, ActorRef<GetStocks> stocksActor) {
return Behaviors.setup(context -> new UserActor(id, stocksActor, context).behavior());
}

@Inject
public UserActor(String id,
ActorRef<GetStocks> stocksActor,
Materializer mat, ActorContext<Message> context) {
ActorContext<Message> context) {
this.id = id;
this.stocksActor = stocksActor;
this.mat = mat;
this.mat = Materializer.matFromSystem(context.getSystem());
this.scheduler = context.getSystem().scheduler();
this.context = context;

Expand All @@ -116,7 +122,7 @@ public UserActor(String id,
//.log("actorWebsocketFlow", logger)
.watchTermination((n, stage) -> {
// When the flow shuts down, make sure this actor also stops.
stage.thenAccept(f -> context.stop(context.getSelf())); // XXX: is self a child?
context.pipeToSelf(stage, (Done _done, Throwable _throwable) -> InternalStop.get());
return NotUsed.getInstance();
});
}
Expand All @@ -135,6 +141,7 @@ public Behavior<Message> behavior() {
unwatchStocks(unwatchStocks.symbols);
return Behaviors.same();
})
.onMessageEquals(InternalStop.get(), Behaviors::stopped)
.onSignal(PostStop.class, _postStop -> {
// If this actor is killed directly, stop anything that we started running explicitly.
context.getLog().info("Stopping actor {}", context.getSelf());
Expand Down
18 changes: 11 additions & 7 deletions play-scala-websocket-example/app/actors/UserActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import javax.inject._

import actors.StocksActor.{ GetStocks, Stocks }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Scheduler }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, PostStop, Scheduler }
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
Expand All @@ -15,26 +15,25 @@ import stocks._

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

/**
* Creates a user actor that sets up the websocket stream. Although it's not required,
* having an actor manage the stream helps with lifecycle and monitoring, and also helps
* with dependency injection through the AkkaGuiceSupport trait.
*
* @param stocksActor the actor responsible for stocks and their streams
* @param ec implicit CPU bound execution context.
*/
class UserActor @Inject()(id: String, stocksActor: ActorRef[GetStocks])(implicit
mat: Materializer,
ec: ExecutionContext,
scheduler: Scheduler,
context: ActorContext[UserActor.Message],
) {
import UserActor._

val log: Logger = context.log

implicit val timeout = Timeout(50.millis)
implicit val timeout: Timeout = Timeout(50.millis)
implicit val system: ActorSystem[Nothing] = context.system
import context.executionContext

val (hubSink, hubSource) = MergeHub.source[JsValue](perProducerBufferSize = 16)
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
Expand All @@ -58,6 +57,9 @@ class UserActor @Inject()(id: String, stocksActor: ActorRef[GetStocks])(implicit
case UnwatchStocks(symbols) =>
unwatchStocks(symbols)
Behaviors.same

case InternalStop =>
Behaviors.stopped
}.receiveSignal {
case (_, PostStop) =>
// If this actor is killed directly, stop anything that we started running explicitly.
Expand All @@ -78,7 +80,7 @@ class UserActor @Inject()(id: String, stocksActor: ActorRef[GetStocks])(implicit
// from the browse), using a coupled sink and source.
Flow.fromSinkAndSourceCoupled(jsonSink, hubSource).watchTermination() { (_, termination) =>
// When the flow shuts down, make sure this actor also stops.
termination.foreach((_: Done) => context.stop(context.self)) // XXX: is self a child?
context.pipeToSelf(termination)((_: Try[Done]) => InternalStop)
NotUsed
}
}
Expand Down Expand Up @@ -160,6 +162,8 @@ object UserActor {
require(symbols.nonEmpty, "Must specify at least one symbol!")
}

private case object InternalStop extends Message

trait Factory {
def apply(id: String): Behavior[Message]
}
Expand Down

0 comments on commit 4df7a23

Please sign in to comment.