Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka Typed cleanups #64

Merged
merged 2 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions play-java-websocket-example/app/Module.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,15 @@ public ActorRef<UserParentActor.Create> get() {
@Singleton
public static class UserActorFactoryProvider implements Provider<UserActor.Factory> {
private final ActorRef<StocksActor.GetStocks> stocksActor;
private final Materializer mat;

@Inject
public UserActorFactoryProvider(ActorRef<StocksActor.GetStocks> stocksActor, Materializer mat) {
this.stocksActor = stocksActor;
this.mat = mat;
}

@Override
public UserActor.Factory get() {
return id -> UserActor.create(id, stocksActor, mat);
return id -> UserActor.create(id, stocksActor);
}
}
}
21 changes: 14 additions & 7 deletions play-java-websocket-example/app/actors/UserActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public String toString() {
}
}

private static final class InternalStop implements Message {
private static final InternalStop INSTANCE = new InternalStop();
public static InternalStop get() {
return INSTANCE;
}
private InternalStop() {}
}

private final Duration timeout = Duration.of(5, ChronoUnit.MILLIS);

private final Map<String, UniqueKillSwitch> stocksMap = new HashMap<>();
Expand All @@ -79,19 +87,17 @@ public String toString() {
private final Sink<JsonNode, NotUsed> hubSink;
private final Flow<JsonNode, JsonNode, NotUsed> websocketFlow;

public static Behavior<Message> create(String id,
ActorRef<GetStocks> stocksActor,
Materializer mat) {
return Behaviors.setup(context -> new UserActor(id, stocksActor, mat, context).behavior());
public static Behavior<Message> create(String id, ActorRef<GetStocks> stocksActor) {
return Behaviors.setup(context -> new UserActor(id, stocksActor, context).behavior());
}

@Inject
public UserActor(String id,
ActorRef<GetStocks> stocksActor,
Materializer mat, ActorContext<Message> context) {
ActorContext<Message> context) {
this.id = id;
this.stocksActor = stocksActor;
this.mat = mat;
this.mat = Materializer.matFromSystem(context.getSystem());
this.scheduler = context.getSystem().scheduler();
this.context = context;

Expand All @@ -116,7 +122,7 @@ public UserActor(String id,
//.log("actorWebsocketFlow", logger)
.watchTermination((n, stage) -> {
// When the flow shuts down, make sure this actor also stops.
stage.thenAccept(f -> context.stop(context.getSelf())); // XXX: is self a child?
context.pipeToSelf(stage, (Done _done, Throwable _throwable) -> InternalStop.get());
return NotUsed.getInstance();
});
}
Expand All @@ -135,6 +141,7 @@ public Behavior<Message> behavior() {
unwatchStocks(unwatchStocks.symbols);
return Behaviors.same();
})
.onMessageEquals(InternalStop.get(), Behaviors::stopped)
.onSignal(PostStop.class, _postStop -> {
// If this actor is killed directly, stop anything that we started running explicitly.
context.getLog().info("Stopping actor {}", context.getSelf());
Expand Down
18 changes: 11 additions & 7 deletions play-scala-websocket-example/app/actors/UserActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import javax.inject._

import actors.StocksActor.{ GetStocks, Stocks }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Scheduler }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, PostStop, Scheduler }
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
Expand All @@ -15,26 +15,25 @@ import stocks._

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

/**
* Creates a user actor that sets up the websocket stream. Although it's not required,
* having an actor manage the stream helps with lifecycle and monitoring, and also helps
* with dependency injection through the AkkaGuiceSupport trait.
*
* @param stocksActor the actor responsible for stocks and their streams
* @param ec implicit CPU bound execution context.
*/
class UserActor @Inject()(id: String, stocksActor: ActorRef[GetStocks])(implicit
mat: Materializer,
ec: ExecutionContext,
scheduler: Scheduler,
context: ActorContext[UserActor.Message],
) {
import UserActor._

val log: Logger = context.log

implicit val timeout = Timeout(50.millis)
implicit val timeout: Timeout = Timeout(50.millis)
implicit val system: ActorSystem[Nothing] = context.system
import context.executionContext

val (hubSink, hubSource) = MergeHub.source[JsValue](perProducerBufferSize = 16)
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
Expand All @@ -58,6 +57,9 @@ class UserActor @Inject()(id: String, stocksActor: ActorRef[GetStocks])(implicit
case UnwatchStocks(symbols) =>
unwatchStocks(symbols)
Behaviors.same

case InternalStop =>
Behaviors.stopped
}.receiveSignal {
case (_, PostStop) =>
// If this actor is killed directly, stop anything that we started running explicitly.
Expand All @@ -78,7 +80,7 @@ class UserActor @Inject()(id: String, stocksActor: ActorRef[GetStocks])(implicit
// from the browse), using a coupled sink and source.
Flow.fromSinkAndSourceCoupled(jsonSink, hubSource).watchTermination() { (_, termination) =>
// When the flow shuts down, make sure this actor also stops.
termination.foreach((_: Done) => context.stop(context.self)) // XXX: is self a child?
context.pipeToSelf(termination)((_: Try[Done]) => InternalStop)
NotUsed
}
}
Expand Down Expand Up @@ -160,6 +162,8 @@ object UserActor {
require(symbols.nonEmpty, "Must specify at least one symbol!")
}

private case object InternalStop extends Message

trait Factory {
def apply(id: String): Behavior[Message]
}
Expand Down