DeleteMessage Sink shuts down after one command #10

Closed
Bathtor opened this issue Sep 14, 2018 · 8 comments

Comments

@Bathtor

Bathtor commented Sep 14, 2018

This is more a request for help than a bug (I think... you tell me).

I'm trying to also delete the original command message when it has been processed successfully and a response has been generated.
To that end I adapted the code from the core example in the following way:

class SayAsCmd(requests: RequestHelper) extends Actor with ActorLogging {
  import SayAsCmd._

  implicit val askTimeout = Timeout(5.seconds)
  implicit val system: ActorSystem = context.system;
  import requests.mat;
  import context.dispatcher;

  private val messageGraph = Sink.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._;

    val unzip = builder.add(Unzip[DeleteMessage[NotUsed], List[CreateMessage[NotUsed]]]);

    val msgSink = requests.sinkIgnore[RawMessage, NotUsed];
    val deleteSink = requests.sinkIgnore[NotUsed, NotUsed];

    unzip.out0.log("DELETE MSG", dm => dm.messageId) ~> deleteSink;
    unzip.out1.mapConcat(identity).log("REPLY MSG", cm => cm.bodyForLogging) ~> msgSink;
    SinkShape.of(unzip.in)
  });

  private val msgQueue = Source.queue[(ActorRef, SayAsMessage, ChannelId, MessageId)](32, OverflowStrategy.backpressure)
    .mapAsyncUnordered(parallelism = 1000)((t: (ActorRef, SayAsMessage, ChannelId, MessageId)) =>
      (t._1 ? t._2).mapTo[List[CreateMessageData]].map(l => (DeleteMessage(t._3, t._4) -> l.map(CreateMessage(t._3, _)))))
    .to(messageGraph).run();

  private val serverHandlers = mutable.Map.empty[GuildId, ActorRef];
  private val fallbackHandler = context.actorOf(Props(new SayAsActor(None)));

  override def receive: Receive = {
    case InitAck => {
      log.debug("Got Init");
      sendAck(sender);
    }
    case ParsedCmd(msg, cmd @ CharacterArgs.Register(name, avatar), _, _) => {
      log.debug(s"Received register command: $cmd");
      routeToHandler(msg, RegisterCharacter(name, avatar));
    }
    case ParsedCmd(msg, cmd @ CharacterArgs.Unregister(name), _, _) => {
      log.debug(s"Received unregister command: $cmd");
      routeToHandler(msg, UnregisterCharacter(name));
    }
    case ParsedCmd(msg, cmd @ CharacterArgs.ListAll, _, _) => {
      log.debug(s"Received list command: $cmd");
      routeToHandler(msg, ListCharacters);
    }
    case ParsedCmd(msg, cmd @ SayArgs.Say(author, content), _, _) => {
      log.debug(s"Received sayas command: $cmd");
      routeToHandler(msg, SayAs(author, content));
    }
    case x => {
      log.warning(s"Received unexpected message: $x");
      sendAck(sender);
    }
  }

  def routeToHandler(orig: Message, msg: SayAsMessage): Unit = {
    // TODO routing
    val msgSender = sender();
    msgQueue.offer((fallbackHandler, msg, orig.channelId, orig.id)).onComplete(_ => sendAck(msgSender))
  }
  def sendAck(sender: ActorRef): Unit = sender ! SayAsCmd.Ack;
}

This approach works for the first command after I start the bot, and afterwards the delete path seems to shut down and only the reply path works. (I added the log elements to the flow graph and the last message from the delete flow is [DELETE MSG] Downstream finished.)

a) Is this intended behaviour?
b) How do I work around it?
c) Is there a better way to sink both CreateMessage and DeleteMessage events into the requests queue, without splitting the stream like this?

@Katrix
Owner

Katrix commented Sep 14, 2018

Can you show me your command object? Most likely the issue lies in there somewhere. I've been dealing with this myself recently for a retry flow. From my testing, the cause is normally a Source.single together with a flatMapConcat somewhere. Why it happens I can't say, though; most likely it is something in Akka. I've been able to work around it in my own code by avoiding Source.single together with flatMapConcat.

As for if there is a better way to do this, have you looked at the alsoTo method on flow? You could also just be lazy and do one operation after the other (take advantage of the request context and the withContext method on Request to simplify this).
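For reference, a minimal sketch of what the alsoTo suggestion could look like. It uses plain Akka Streams with hypothetical stand-in types (`Delete`, `Reply`) and `Sink.seq` in place of AckCord's request sinks, and it assumes the Akka 2.5-era `ActorMaterializer` setup (on Akka 2.6+ an implicit `ActorSystem` is enough). It is not taken from the library itself.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AlsoToSketch {
  // Hypothetical stand-ins for the real request types
  final case class Delete(messageId: Long)
  final case class Reply(content: String)

  def run(): (Seq[Delete], Seq[Reply]) = {
    implicit val system: ActorSystem = ActorSystem("also-to-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    try {
      // Side branch: keep only the delete half of each pair
      val deleteSink =
        Flow[(Delete, List[Reply])].map(_._1).toMat(Sink.seq[Delete])(Keep.right)

      val (deletesF, repliesF) =
        Source(List(
          (Delete(1L), List(Reply("a"), Reply("b"))),
          (Delete(2L), List(Reply("c")))
        ))
          .alsoToMat(deleteSink)(Keep.right) // every pair is also sent to the side branch
          .mapConcat(_._2)                   // main branch: flatten the replies
          .toMat(Sink.seq[Reply])(Keep.both)
          .run()

      (Await.result(deletesF, 5.seconds), Await.result(repliesF, 5.seconds))
    } finally system.terminate()
  }
}
```

In the actor from the original post, the two `Sink.seq` stand-ins would presumably be replaced by the two `requests.sinkIgnore` sinks, and the `Source` by the queue.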

Going to leave this open until I find a solution to this problem and know the root cause: why it happens and what can be done to avoid it.

@Bathtor
Author

Bathtor commented Sep 14, 2018

I updated the original post with the full actor. The relevant command factories are:

def charCmdFactory(sayAsActor: ActorRef): ParsedCmdFactory[F, SayAsCmd.CharacterArgs, NotUsed] =
    ParsedCmdFactory[F, SayAsCmd.CharacterArgs, NotUsed](
      refiner = CmdInfo[F](
        prefix = Categories.adminCommands,
        aliases = Seq("char"),
        filters = Seq(ByUser(UserId(Main.adminId)))),
      sink = _ => Sink.actorRefWithAck(
        ref = sayAsActor,
        onInitMessage = SayAsCmd.InitAck,
        ackMessage = SayAsCmd.Ack,
        onCompleteMessage = PoisonPill),
      description = Some(CmdDescription(
        name = "Character Registry",
        description = "Add/Remove/List Characters",
        usage = "register <name> [<avatar url>]|unregister <name>|list")));

def sayasCmdFactory(sayAsActor: ActorRef): ParsedCmdFactory[F, SayAsCmd.SayArgs, NotUsed] =
    ParsedCmdFactory[F, SayAsCmd.SayArgs, NotUsed](
      refiner = CmdInfo[F](
        prefix = Categories.generalCommands,
        aliases = Seq("sayas")),
      sink = _ => Sink.actorRefWithAck(
        ref = sayAsActor,
        onInitMessage = SayAsCmd.InitAck,
        ackMessage = SayAsCmd.Ack,
        onCompleteMessage = PoisonPill),
      description = Some(CmdDescription(
        name = "SayAs",
        description = "Speak as a registered character",
        usage = "<name> <text>")));

I'm not using Source.single anywhere, I think, unless there is somehow one hidden in mapAsyncUnordered?

I was originally planning to use alsoTo with filter, but then I found Unzip which fit my use-case much better. Doing them lazily is not so easy, I think. I only want the delete to occur if the reply is happening as well (so that a semantically invalid command will not get deleted), and I also need to keep the required state around to create the correct delete object (i.e. messageId and channelId).
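The "only delete if a reply happens" requirement can be sketched without any streams at all. The types below are hypothetical stand-ins, not AckCord's own, and the sketch assumes that an empty reply list is how a rejected command shows up.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DeleteOnlyOnReply {
  // Hypothetical stand-ins for the real request types
  sealed trait Req
  final case class Delete(channelId: Long, messageId: Long) extends Req
  final case class Create(channelId: Long, content: String) extends Req

  // The ids travel alongside the future, so the delete can be built
  // once the handler has answered.
  def plan(channelId: Long, messageId: Long, replies: Future[List[String]])(
      implicit ec: ExecutionContext): Future[List[Req]] =
    replies.map {
      // Assumption: no replies means the command was rejected, so keep the original message
      case Nil      => Nil
      case contents => Delete(channelId, messageId) :: contents.map(Create(channelId, _))
    }
}
```

A list built this way can be flattened with mapConcat, so a command without replies contributes no requests.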

@Bathtor
Author

Bathtor commented Sep 14, 2018

I also find it curious that it's always the delete flow that fails, even if I switch the positions in the Unzip. I mean, the two branches are essentially identical except for the mapConcat. The sinks have different serialisation types, but apart from that they are the same implementation, aren't they? Why would one misbehave and the other work fine?

Edit: I tried unzip.out1.mapConcat(dm => List(dm)).log("DELETE MSG", dm => dm.messageId) ~> deleteSink; just to make sure that the mapConcat wasn't the difference, and indeed the behaviour persists.

@Katrix
Owner

Katrix commented Sep 14, 2018

My only guess would be that mapConcat shares some of the logic that causes the same error. As for what I said above, when I said being lazy, I meant not worrying about doing things properly. As for keeping the required state around, that's what withContext is for.

As for the types being different: yep, that's the only difference. On master it's just the same flow/sink cast to different types.

@Bathtor
Author

Bathtor commented Sep 14, 2018

Is there a common supertype I could use for a sink that would make it possible to use a single flow for both CreateMessage and DeleteMessage?

@Katrix
Owner

Katrix commented Sep 14, 2018

Request[_, _], BaseRESTRequest[_, _, _] and RESTRequest[_, _, _, _] would all fit.
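To illustrate why a shared supertype helps, here is a small sketch with hypothetical stand-in types (`Request`, `CreateMsg`, `DeleteMsg`), not the actual AckCord hierarchy. A consumer typed against the supertype accepts both subtypes. Akka's `Sink[-In, +Mat]` is contravariant in its input in the same way that `Function1` is, so a plain function stands in for the sink here.

```scala
object SharedSupertype {
  // Hypothetical stand-ins for the real request hierarchy
  sealed trait Request
  final case class CreateMsg(channelId: Long, content: String) extends Request
  final case class DeleteMsg(channelId: Long, messageId: Long) extends Request

  // One consumer typed against the supertype
  val describe: Request => String = {
    case CreateMsg(ch, content) => s"create in $ch: $content"
    case DeleteMsg(ch, id)      => s"delete $id in $ch"
  }

  // Contravariance: a consumer of Request is also a consumer of DeleteMsg
  val describeDelete: DeleteMsg => String = describe

  // A mixed list widens to the common supertype
  val mixed: List[Request] =
    DeleteMsg(1L, 10L) :: List("a", "b").map(CreateMsg(1L, _))
}
```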

@Bathtor
Author

Bathtor commented Sep 14, 2018

Ok, I made a single flow now for both message types like this:

private val msgQueue = Source.queue[(ActorRef, SayAsMessage, ChannelId, MessageId)](32, OverflowStrategy.backpressure)
    .mapAsyncUnordered(parallelism = 1000)((t: (ActorRef, SayAsMessage, ChannelId, MessageId)) =>
      (t._1 ? t._2).mapTo[List[CreateMessageData]].map(l => (DeleteMessage(t._3, t._4) :: l.map(CreateMessage(t._3, _)))))
    .mapConcat(identity)
    .to(requests.sinkIgnore[Any, NotUsed]).run();

But once again only the first exchange works, and then the flow simply stops executing any requests (i.e. neither DeleteMessage nor CreateMessage is executed after the first time, when both are executed successfully). I don't know why, but DeleteMessage somehow breaks the sink without throwing any errors that I can see (I'll attach my logging output; maybe you'll see something I've overlooked). I don't see this behaviour if the flow consists only of CreateMessage requests. In that case it keeps working correctly forever.
output.log
Edit: I'm using a local build of master, btw, not 0.10.0.
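When a stream stops without any visible error, one generic Akka Streams debugging aid is `watchTermination`, which exposes a `Future[Done]` that completes or fails together with the stream at that point. The sketch below uses plain Akka Streams with a deliberately failing `map` stage as a stand-in for whatever breaks in the real pipeline. It assumes the Akka 2.5-era `ActorMaterializer` setup and does not claim to reproduce this bug.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object TerminationSketch {
  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("termination-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val (queue, done) =
        Source.queue[Int](8, OverflowStrategy.backpressure)
          // Stand-in for a stage that fails on some element
          .map(i => if (i < 0) throw new IllegalArgumentException(s"bad element $i") else i)
          // Placed just before the sink, so it sees failures from all stages above it
          .watchTermination()(Keep.both)
          .to(Sink.ignore)
          .run()

      Await.ready(queue.offer(1), 5.seconds)
      Await.ready(queue.offer(-1), 5.seconds) // triggers the failure
      Await.ready(done, 5.seconds).value.get  // how the stream ended
    } finally system.terminate()
  }
}
```

In an actor, the same future could be observed with `done.onComplete(result => log.warning(...))` instead of blocking, which would at least show whether the stream completed normally or failed.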

@Katrix
Owner

Katrix commented Apr 21, 2020

The retry request fixes are on master. Not sure how much they were ever related to this bug, but that should be the last of the "Stream stopped for some reason" kind of bugs.

Katrix closed this as completed Apr 21, 2020