Skip to content

Commit

Permalink
Fix #36 (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
notxcain committed Mar 29, 2018
1 parent e1a82ea commit a0f1353
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
Expand Up @@ -7,6 +7,7 @@ import aecor.encoding.{ KeyDecoder, KeyEncoder }
import aecor.encoding.WireProtocol
import aecor.encoding.WireProtocol.Encoded
import aecor.runtime.akkapersistence.AkkaPersistenceRuntime._
import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.CommandResult
import aecor.runtime.akkapersistence.readside.{ AkkaPersistenceEventJournalQuery, JournalQuery }
import aecor.runtime.akkapersistence.serialization.{ Message, PersistentDecoder, PersistentEncoder }
import aecor.util.effect._
Expand Down Expand Up @@ -85,9 +86,9 @@ class AkkaPersistenceRuntime[O] private[akkapersistence] (system: ActorSystem,
Effect[F]
.fromFuture {
(regionRef ? EntityCommand(keyEncoder(key), bytes.asReadOnlyBuffer()))
.asInstanceOf[Future[ByteBuffer]]
.asInstanceOf[Future[CommandResult]]
}
.map(responseDecoder.decode)
.map(result => responseDecoder.decode(result.bytes))
.flatMap(F.fromEither(_))
}
})
Expand Down
Expand Up @@ -10,7 +10,7 @@ import io.aecor.liberator.Invocation
import aecor.data._
import aecor.util.effect._
import aecor.encoding.{ KeyDecoder, WireProtocol }
import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.HandleCommand
import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.{ HandleCommand, CommandResult }
import aecor.runtime.akkapersistence.SnapshotPolicy.{ EachNumberOfEvents, Never }
import aecor.runtime.akkapersistence.serialization.{
Message,
Expand Down Expand Up @@ -59,6 +59,7 @@ private[akkapersistence] object AkkaPersistenceRuntimeActor {
)

final case class HandleCommand(commandBytes: ByteBuffer) extends Message
final case class CommandResult(bytes: ByteBuffer) extends Message
case object Stop
}

Expand Down Expand Up @@ -88,7 +89,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]:

import context.dispatcher

private case class CommandResult(opId: UUID, events: Seq[Event], resultBytes: ByteBuffer)
private case class ActionResult(opId: UUID, events: Seq[Event], resultBytes: ByteBuffer)

private val idString: String =
URLDecoder.decode(self.path.name, StandardCharsets.UTF_8.name())
Expand Down Expand Up @@ -161,7 +162,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]:
passivate()
case AkkaPersistenceRuntimeActor.Stop =>
context.stop(self)
case CommandResult(opId, events, result) =>
case ActionResult(opId, events, result) =>
log.warning(
"[{}] Received result of unknown command invocation [{}], ignoring",
persistenceId,
Expand Down Expand Up @@ -193,19 +194,12 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]:
result,
events
)
CommandResult(opId, events, resultEncoder.encode(result))
ActionResult(opId, events, resultEncoder.encode(result))
}
.pipeTo(self)(sender)
context.become {
case CommandResult(`opId`, events, reply) =>
log.debug(
"[{}] Command [{}] produced reply [{}] and events [{}]",
persistenceId,
invocation,
reply,
events
)
handleCommandResult(events, reply)
case ActionResult(`opId`, events, reply) =>
handleCommandResult(events, CommandResult(reply))
unstashAll()
context.become(receiveCommand)
case Status.Failure(e) =>
Expand All @@ -217,9 +211,9 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]:
}
}

private def handleCommandResult[A](events: Seq[Event], replyBytes: ByteBuffer): Unit =
private def handleCommandResult[A](events: Seq[Event], response: CommandResult): Unit =
if (events.isEmpty) {
sender() ! replyBytes
sender() ! response
} else {
val envelopes =
events.map(e => Tagged(eventEncoder.encode(e), tagger.tag(id).map(_.value)))
Expand All @@ -229,7 +223,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]:
var unpersistedEventCount = events.size
if (unpersistedEventCount == 1) {
persist(envelopes.head) { _ =>
sender() ! replyBytes
sender() ! response
eventCount += 1
markSnapshotAsPendingIfNeeded()
snapshotIfPending()
Expand All @@ -240,7 +234,7 @@ private[akkapersistence] final class AkkaPersistenceRuntimeActor[M[_[_]], F[_]:
eventCount += 1
markSnapshotAsPendingIfNeeded()
if (unpersistedEventCount == 0) {
sender() ! replyBytes
sender() ! response
snapshotIfPending()
}
}
Expand Down
Expand Up @@ -3,7 +3,7 @@ package aecor.runtime.akkapersistence.serialization
import java.nio.ByteBuffer

import aecor.runtime.akkapersistence.AkkaPersistenceRuntime.EntityCommand
import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.HandleCommand
import aecor.runtime.akkapersistence.AkkaPersistenceRuntimeActor.{ HandleCommand, CommandResult }
import akka.actor.ExtendedActorSystem
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import com.google.protobuf.ByteString
Expand All @@ -16,22 +16,27 @@ class MessageSerializer(val system: ExtendedActorSystem)

val HandleCommandManifest = "A"
val EntityCommandManifest = "B"
val CommandResultManifest = "C"

private val fromBinaryMap =
HashMap[String, Array[Byte] AnyRef](
HandleCommandManifest -> handleCommandFromBinary,
EntityCommandManifest -> entityCommandFromBinary
EntityCommandManifest -> entityCommandFromBinary,
CommandResultManifest -> handleResponseFromBinary
)

override def manifest(o: AnyRef): String = o match {
case HandleCommand(_) => HandleCommandManifest
case EntityCommand(_, _) => EntityCommandManifest
case CommandResult(_) => CommandResultManifest
case x => throw new IllegalArgumentException(s"Serialization of [$x] is not supported")
}

override def toBinary(o: AnyRef): Array[Byte] = o match {
case x @ HandleCommand(_) =>
x.commandBytes.array()
case x @ CommandResult(_) =>
x.bytes.array()
case x @ EntityCommand(_, _) =>
entityCommandToBinary(x)
case x => throw new IllegalArgumentException(s"Serialization of [$x] is not supported")
Expand All @@ -54,4 +59,7 @@ class MessageSerializer(val system: ExtendedActorSystem)

private def handleCommandFromBinary(bytes: Array[Byte]): HandleCommand =
HandleCommand(ByteBuffer.wrap(bytes))

private def handleResponseFromBinary(bytes: Array[Byte]): CommandResult =
CommandResult(ByteBuffer.wrap(bytes))
}

0 comments on commit a0f1353

Please sign in to comment.