diff --git a/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntime.scala b/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntime.scala index f7c76c17..37c1414f 100644 --- a/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntime.scala +++ b/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntime.scala @@ -7,6 +7,7 @@ import aecor.encoding.{ KeyDecoder, KeyEncoder } import aecor.encoding.WireProtocol import aecor.encoding.WireProtocol.Encoded import aecor.runtime.akkapersistence.AkkaPersistenceRuntime._ +import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.CommandResult import aecor.runtime.akkapersistence.readside.{ AkkaPersistenceEventJournalQuery, JournalQuery } import aecor.runtime.akkapersistence.serialization.{ Message, PersistentDecoder, PersistentEncoder } import aecor.util.effect._ @@ -85,9 +86,9 @@ class AkkaPersistenceRuntime[O] private[akkapersistence] (system: ActorSystem, Effect[F] .fromFuture { (regionRef ? EntityCommand(keyEncoder(key), bytes.asReadOnlyBuffer())) - .asInstanceOf[Future[ByteBuffer]] + .asInstanceOf[Future[CommandResult]] } - .map(responseDecoder.decode) + .map(result => responseDecoder.decode(result.bytes)) .flatMap(F.fromEither(_)) } }) diff --git a/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntimeActor.scala b/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntimeActor.scala index 0da8b3f6..b4ffb555 100644 --- a/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntimeActor.scala +++ b/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntimeActor.scala @@ -10,7 +10,7 @@ import io.aecor.liberator.Invocation import aecor.data._ import aecor.util.effect._ import aecor.encoding.{ KeyDecoder, WireProtocol } -import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.HandleCommand +import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.{ HandleCommand, CommandResult } import aecor.runtime.akkapersistence.SnapshotPolicy.{ EachNumberOfEvents, Never } import aecor.runtime.akkapersistence.serialization.{ Message, @@ -59,6 +59,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor { ) final case class HandleCommand(commandBytes: ByteBuffer) extends Message + final case class CommandResult(bytes: ByteBuffer) extends Message case object Stop } @@ -88,7 +89,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]: import context.dispatcher - private case class CommandResult(opId: UUID, events: Seq[Event], resultBytes: ByteBuffer) + private case class ActionResult(opId: UUID, events: Seq[Event], resultBytes: ByteBuffer) private val idString: String = URLDecoder.decode(self.path.name, StandardCharsets.UTF_8.name()) @@ -161,7 +162,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]: passivate() case AkkaPersistenceRuntimeActor.Stop => context.stop(self) - case CommandResult(opId, events, result) => + case ActionResult(opId, events, result) => log.warning( "[{}] Received result of unknown command invocation [{}], ignoring", persistenceId, @@ -193,19 +194,12 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]: result, events ) - CommandResult(opId, events, resultEncoder.encode(result)) + ActionResult(opId, events, resultEncoder.encode(result)) } .pipeTo(self)(sender) context.become { - case CommandResult(`opId`, events, reply) => - log.debug( - "[{}] Command [{}] produced reply [{}] and events [{}]", - persistenceId, - invocation, - reply, - events - ) - handleCommandResult(events, reply) + case ActionResult(`opId`, events, reply) => + handleCommandResult(events, CommandResult(reply)) unstashAll() context.become(receiveCommand) case Status.Failure(e) => @@ -217,9 +211,9 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]: } } - private def handleCommandResult[A](events: Seq[Event], replyBytes: ByteBuffer): Unit = + private def handleCommandResult[A](events: Seq[Event], response: CommandResult): Unit = if (events.isEmpty) { - sender() ! replyBytes + sender() ! response } else { val envelopes = events.map(e => Tagged(eventEncoder.encode(e), tagger.tag(id).map(_.value))) @@ -229,7 +223,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]: var unpersistedEventCount = events.size if (unpersistedEventCount == 1) { persist(envelopes.head) { _ => - sender() ! replyBytes + sender() ! response eventCount += 1 markSnapshotAsPendingIfNeeded() snapshotIfPending() @@ -240,7 +234,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]: eventCount += 1 markSnapshotAsPendingIfNeeded() if (unpersistedEventCount == 0) { - sender() ! replyBytes + sender() ! response snapshotIfPending() } } diff --git a/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/serialization/MessageSerializer.scala b/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/serialization/MessageSerializer.scala index 0c2ef097..a354575f 100644 --- a/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/serialization/MessageSerializer.scala +++ b/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/serialization/MessageSerializer.scala @@ -3,7 +3,7 @@ package aecor.runtime.akkapersistence.serialization import java.nio.ByteBuffer import aecor.runtime.akkapersistence.AkkaPersistenceRuntime.EntityCommand -import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.HandleCommand +import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.{ HandleCommand, CommandResult } import akka.actor.ExtendedActorSystem import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } import com.google.protobuf.ByteString @@ -16,22 +16,27 @@ class MessageSerializer(val system: ExtendedActorSystem) val HandleCommandManifest = "A" val EntityCommandManifest = "B" + val CommandResultManifest = "C" private val fromBinaryMap = HashMap[String, Array[Byte] ⇒ AnyRef]( HandleCommandManifest -> handleCommandFromBinary, - EntityCommandManifest -> entityCommandFromBinary + EntityCommandManifest -> entityCommandFromBinary, + CommandResultManifest -> handleResponseFromBinary ) override def manifest(o: AnyRef): String = o match { case HandleCommand(_) => HandleCommandManifest case EntityCommand(_, _) => EntityCommandManifest + case CommandResult(_) => CommandResultManifest case x => throw new IllegalArgumentException(s"Serialization of [$x] is not supported") } override def toBinary(o: AnyRef): Array[Byte] = o match { case x @ HandleCommand(_) => x.commandBytes.array() + case x @ CommandResult(_) => + x.bytes.array() case x @ EntityCommand(_, _) => entityCommandToBinary(x) case x => throw new IllegalArgumentException(s"Serialization of [$x] is not supported") @@ -54,4 +59,7 @@ class MessageSerializer(val system: ExtendedActorSystem) private def handleCommandFromBinary(bytes: Array[Byte]): HandleCommand = HandleCommand(ByteBuffer.wrap(bytes)) + + private def handleResponseFromBinary(bytes: Array[Byte]): CommandResult = + CommandResult(ByteBuffer.wrap(bytes)) }