Skip to content

Commit

Permalink
+str akka#17579 akka#17617 Initial step for ModelAdapter
Browse files Browse the repository at this point in the history
+ per plugin scoped adapters
+ could be swapped during runtime
- todo not happy yet with initialization (message)?
+ adapters work both ways
+ tests show "tagging"
  • Loading branch information
ktoso committed Jun 21, 2015
1 parent 450a43e commit 40852ff
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 23 deletions.
Expand Up @@ -4,6 +4,8 @@

package akka.persistence

import akka.persistence.journal.ModelAdapter

import scala.collection.immutable

import akka.actor._
Expand Down Expand Up @@ -45,6 +47,13 @@ private[persistence] object JournalProtocol {
final case class WriteMessages(messages: immutable.Seq[PersistentEnvelope], persistentActor: ActorRef, actorInstanceId: Int)
extends Request

/**
* Register an model adapter for the Journal
* @param adapter to be applied on all events going to this journal
*/
final case class RegisterModelAdapter(adapter: ModelAdapter[Any])
extends Message

/**
* Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor
* before all subsequent [[WriteMessageSuccess]] replies.
Expand Down
94 changes: 80 additions & 14 deletions akka-persistence/src/main/scala/akka/persistence/Persistence.scala
Expand Up @@ -4,16 +4,20 @@

package akka.persistence

import scala.concurrent.duration._
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicReference

import akka.actor._
import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal
import akka.event.{ Logging, LoggingAdapter }
import akka.persistence.JournalProtocol.RegisterModelAdapter
import akka.persistence.journal.{ IdentityModelAdapter, AsyncWriteJournal, ModelAdapter }
import akka.util.Helpers.ConfigOps
import akka.event.LoggingAdapter
import akka.event.Logging
import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.Config

import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{ Success, Try }
import scala.util.control.NonFatal

/**
* Persistence configuration.
Expand Down Expand Up @@ -110,7 +114,7 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)
def lookup() = Persistence
/** INTERNAL API. */
private[persistence] case class PluginHolder(actor: ActorRef) extends Extension
private[persistence] case class PluginHolder(actor: ActorRef, adapter: Option[ModelAdapter[Any]]) extends Extension
}

/**
Expand Down Expand Up @@ -156,6 +160,32 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
/** Discovered persistence snapshot store plugins. */
private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty)

// TODO rethink this
private lazy val IdentityModelAdapter = new IdentityModelAdapter(system)

/**
* TODO
*/
@tailrec final def adapterFor(journalPluginId: String): ModelAdapter[Any] = {
val configPath = if (isEmpty(journalPluginId)) defaultJournalPluginId else journalPluginId
val extensionIdMap = journalPluginExtensionId.get
extensionIdMap.get(configPath) match {
case Some(extensionId)
extensionId(system).adapter.getOrElse(IdentityModelAdapter)
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapter = createAdapter(configPath) // TODO solve some other way
adapter foreach { plugin ! RegisterModelAdapter(_) }
PluginHolder(plugin, adapter)
}
}
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
adapterFor(journalPluginId) // Recursive invocation.
}
}

/**
* Returns a journal plugin actor identified by `journalPluginId`.
* When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path.
Expand All @@ -170,8 +200,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
extensionId(system).actor
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder =
PluginHolder(createPlugin(configPath)(journalDispatchSelector))
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapter = createAdapter(configPath) // TODO solve some other way
adapter foreach { plugin ! RegisterModelAdapter(_) }
PluginHolder(plugin, adapter)
}
}
journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
journalFor(journalPluginId) // Recursive invocation.
Expand All @@ -192,8 +226,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
extensionId(system).actor
case None
val extensionId = new ExtensionId[PluginHolder] {
override def createExtension(system: ExtendedActorSystem): PluginHolder =
PluginHolder(createPlugin(configPath)(snapshotDispatchSelector))
override def createExtension(system: ExtendedActorSystem): PluginHolder = {
val plugin = createPlugin(configPath)(journalDispatchSelector)
val adapter = createAdapter(configPath) // TODO solve some other way
adapter foreach { plugin ! RegisterModelAdapter(_) }
PluginHolder(plugin, adapter)
}
}
snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId))
snapshotStoreFor(snapshotPluginId) // Recursive invocation.
Expand All @@ -202,19 +240,47 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {

private def createPlugin(configPath: String)(dispatcherSelector: Class[_] String) = {
require(!isEmpty(configPath) && system.settings.config.hasPath(configPath),
s"'reference.conf' is missing persistence plugin config path: '${configPath}'")
s"'reference.conf' is missing persistence plugin config path: '$configPath'")
val pluginActorName = configPath
val pluginConfig = system.settings.config.getConfig(configPath)
val pluginClassName = pluginConfig.getString("class")
log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
log.debug(s"Create plugin: $pluginActorName $pluginClassName")
val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get
val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil
val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs)
system.systemActorOf(pluginActorProps, pluginActorName)
}

private def createAdapter(configPath: String): Option[ModelAdapter[Any]] = {
val pluginConfig = system.settings.config.getConfig(configPath)
if (!pluginConfig.hasPath("adapter")) None
else {
val adapterClassName = pluginConfig.getString("adapter")
if (isEmpty(adapterClassName)) None
else
try {
log.debug(s"Create model adapter: $adapterClassName")
val actorSystemClazz = classOf[ActorSystem]
val clazz = system.dynamicAccess.getClassFor[ModelAdapter[Any]](adapterClassName).get

Try(clazz.getConstructor(actorSystemClazz)) match {
case Success(_)
system.dynamicAccess.createInstanceFor[ModelAdapter[Any]](adapterClassName, (actorSystemClazz, system) :: Nil).toOption
case _
system.dynamicAccess.createInstanceFor[ModelAdapter[Any]](adapterClassName, Nil).toOption
}
} catch {
case NonFatal(ex)
log.error(ex, "Unable to create ModelAdapter for [{}] for config at: {}", adapterClassName, configPath)
System.err.println(system.settings.config.getConfig(configPath))
None
}
}

}

/** Creates a canonical persistent actor id from a persistent actor ref. */
def persistenceId(persistentActor: ActorRef): String = id(persistentActor)

Expand Down
Expand Up @@ -47,7 +47,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p
if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), p.sender)
if (!p.deleted || replayDeleted) {
persistentActor.tell(ReplayedMessage(adaptFromJournal(p)), p.sender)
}
} map {
case _ ReplayMessagesSuccess
} recover {
Expand All @@ -68,6 +70,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case Success(_) if (publish) context.system.eventStream.publish(d)
case Failure(e)
}
case RegisterModelAdapter(adapter)
modelAdapter = adapter
}

//#journal-plugin-api
Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence.journal

import akka.actor.ActorSystem
import akka.event.Logging

trait ModelAdapter[JournalModel] {
//#model-adapter-api
def toJournal(event: Any): JournalModel

def fromJournal(event: JournalModel): Any
//#model-adapter-api
}

/** No-op model adapter which passes through the incoming events as-is. */
final class IdentityModelAdapter(val system: ActorSystem) extends ModelAdapter[Any] {
val log = Logging(system, "hello")

override def toJournal(event: Any): Any = {
log.warning("Adapting event: {}", event)
event
}
override def fromJournal(event: Any): Any = event
}
Expand Up @@ -11,14 +11,24 @@ import scala.collection.immutable
private[akka] trait WriteJournalBase {
this: Actor

/** INTERNAL API */
protected[akka] var modelAdapter: ModelAdapter[Any] = _

protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[PersistentRepr] =
rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations

private def persistentPrepareWrite(r: PersistentEnvelope): Boolean = r match {
case p: PersistentRepr
p.prepareWrite(); true
case _
false
}
rb.collect {
case p: PersistentRepr
p.prepareWrite()
adaptToJournal(p)
} // filter instead of flatMap to avoid Some allocations

/** INTERNAL API */
private[akka] final def adaptFromJournal(repr: PersistentRepr): PersistentRepr =
if (modelAdapter ne null) repr.withPayload(modelAdapter.fromJournal(repr.payload))
else repr

/** INTERNAL API */
private[akka] final def adaptToJournal(repr: PersistentRepr): PersistentRepr =
if (modelAdapter ne null) repr.withPayload(modelAdapter.toJournal(repr.payload))
else repr

}

0 comments on commit 40852ff

Please sign in to comment.