Permalink
Browse files

Adding peristence support for FlowTransactions

  • Loading branch information...
atooni committed Nov 30, 2018
1 parent cbe7894 commit 8f6e89fe9dc6816bdb6626373c23c2f9b94c05d5
@@ -28,30 +28,30 @@ class PersistenceServiceJdbc(
t
}
override def deleteByExample(pClass: String, data: ju.Map[String, _ <: AnyRef]): Long = {
override def deleteByExample(pClass: String, data: ju.Map[String, _ <: Any]): Long = {
log.debug(s"About to delete by example pClass: ${pClass}, data: ${data}")
val fields = PersistedField.extractFieldsWithoutDataId(data)
txTemplate.execute { ts =>
dao.deleteByFields(pClass, fields)
}
}
override def findAll(pClass: String): Seq[ju.Map[String, _ <: AnyRef]] = {
override def findAll(pClass: String): Seq[ju.Map[String, _ <: Any]] = {
val classes = txTemplateRo.execute { ts =>
dao.findAll(pClass)
}
classes.map(pc => PersistedField.toJuMap(pc.fields))
}
override def findByExample(pClass: String, data: ju.Map[String, _ <: AnyRef]): Seq[ju.Map[String, _ <: AnyRef]] = {
override def findByExample(pClass: String, data: ju.Map[String, _ <: Any]): Seq[ju.Map[String, _ <: Any]] = {
val fields = PersistedField.extractFieldsWithoutDataId(data)
val classes = txTemplateRo.execute { ts =>
dao.findByFields(pClass, fields)
}
classes.map(pc => PersistedField.toJuMap(pc.fields))
}
override def persist(pClass: String, data: ju.Map[String, _ <: AnyRef]): ju.Map[String, _ <: AnyRef] = {
override def persist(pClass: String, data: ju.Map[String, _ <: Any]): ju.Map[String, _ <: Any] = {
// TODO check if data already contains id
log.debug(s"About to persist class [${pClass}] with data [${data}]")
@@ -19,12 +19,12 @@ import java.{ util => ju }
*/
trait PersistenceService {
def persist(pClass: String, data: ju.Map[String, _ <: AnyRef]): ju.Map[String, _ <: AnyRef]
def persist(pClass: String, data: ju.Map[String, _ <: Any]): ju.Map[String, _ <: Any]
def findAll(pClass: String): Seq[ju.Map[String, _ <: AnyRef]]
def findAll(pClass: String): Seq[ju.Map[String, _ <: Any]]
def findByExample(pClass: String, data: ju.Map[String, _ <: AnyRef]): Seq[ju.Map[String, _ <: AnyRef]]
def findByExample(pClass: String, data: ju.Map[String, _ <: Any]): Seq[ju.Map[String, _ <: Any]]
def deleteByExample(pClass: String, data: ju.Map[String, _ <: AnyRef]): Long
def deleteByExample(pClass: String, data: ju.Map[String, _ <: Any]): Long
}
@@ -6,7 +6,7 @@ import blended.streams.message.MsgProperty
object StreamAssertions {
def verifyHeader(expected: FlowMessageProps, actual: FlowMessageProps)
: List[(String, MsgProperty[_], Option[MsgProperty[_]])] = {
: List[(String, MsgProperty, Option[MsgProperty])] = {
val broken = expected.filter { p =>
actual.get(p._1) match {
@@ -220,7 +220,7 @@ final class JmsAckSourceStage(
scheduleOnce(Poll(session.sessionId), 100.millis)
case Failure(e) =>
fail.invoke(e)
failStage(e)
}
}
@@ -58,7 +58,7 @@ object JmsFlowSupport extends JmsEnvelopeHeader {
val prefix = headerConfig.prefix
val props: Map[String, MsgProperty[_]] = {
val props: Map[String, MsgProperty] = {
val dest = JmsDestination.asString(JmsDestination.create(msg.getJMSDestination()).get)
val delMode = new JmsDeliveryMode(msg.getJMSDeliveryMode()).asString
@@ -81,19 +81,19 @@ object JmsFlowSupport extends JmsEnvelopeHeader {
deliveryModeHeader(prefix) -> delMode
).get
val expireHeaderMap : Map[String, MsgProperty[_]] = msg.getJMSExpiration() match {
val expireHeaderMap : Map[String, MsgProperty] = msg.getJMSExpiration() match {
case 0L => Map.empty
case v => Map(expireHeader(prefix) -> MsgProperty.lift(v).get)
}
val corrIdMap : Map[String, MsgProperty[_]] =
val corrIdMap : Map[String, MsgProperty] =
Option(msg.getJMSCorrelationID()).map( s => corrIdHeader(prefix) -> MsgProperty.lift(s).get).toMap
val props : Map[String, MsgProperty[_]] = msg.getPropertyNames().asScala.map { name =>
val props : Map[String, MsgProperty] = msg.getPropertyNames().asScala.map { name =>
(name.toString -> lift(msg.getObjectProperty(name.toString())).get)
}.toMap
val replyToMap : Map[String, MsgProperty[_]] =
val replyToMap : Map[String, MsgProperty] =
Option(msg.getJMSReplyTo()).map( d => replyToHeader(prefix) -> lift(JmsDestination.create(d).get.asString).get).toMap
props ++ headers ++ expireHeaderMap ++ corrIdMap ++ replyToMap
@@ -5,37 +5,37 @@ import akka.util.ByteString
import scala.util.Try
sealed trait MsgProperty[T] {
def value : T
sealed trait MsgProperty {
def value : Any
override def toString: String = value.toString
}
case class StringMsgProperty(override val value: String) extends MsgProperty[String] {
case class StringMsgProperty(override val value: String) extends MsgProperty {
override def toString: String = "\"" + super.toString + "\""
}
case class IntMsgProperty(override val value: Int) extends MsgProperty[Int]
case class LongMsgProperty(override val value: Long) extends MsgProperty[Long]
case class BooleanMsgProperty(override val value : Boolean) extends MsgProperty[Boolean]
case class ByteMsgProperty(override val value : Byte) extends MsgProperty[Byte]
case class ShortMsgProperty(override val value : Short) extends MsgProperty[Short]
case class FloatMsgProperty(override val value: Float) extends MsgProperty[Float]
case class DoubleMsgProperty(override val value: Double) extends MsgProperty[Double]
case class IntMsgProperty(override val value: Int) extends MsgProperty
case class LongMsgProperty(override val value: Long) extends MsgProperty
case class BooleanMsgProperty(override val value : Boolean) extends MsgProperty
case class ByteMsgProperty(override val value : Byte) extends MsgProperty
case class ShortMsgProperty(override val value : Short) extends MsgProperty
case class FloatMsgProperty(override val value: Float) extends MsgProperty
case class DoubleMsgProperty(override val value: Double) extends MsgProperty
object MsgProperty {
import scala.language.implicitConversions
def apply(s : String) : MsgProperty[String] = StringMsgProperty(s)
def apply(i : Int) : MsgProperty[Int] = IntMsgProperty(i)
def apply(l : Long) : MsgProperty[Long] = LongMsgProperty(l)
def apply(b : Boolean) : MsgProperty[Boolean] = BooleanMsgProperty(b)
def apply(b : Byte) : MsgProperty[Byte] = ByteMsgProperty(b)
def apply(s : Short) : MsgProperty[Short] = ShortMsgProperty(s)
def apply(f : Float) : MsgProperty[Float] = FloatMsgProperty(f)
def apply(d : Double) : MsgProperty[Double] = DoubleMsgProperty(d)
def apply(s : String) : MsgProperty = StringMsgProperty(s)
def apply(i : Int) : MsgProperty = IntMsgProperty(i)
def apply(l : Long) : MsgProperty = LongMsgProperty(l)
def apply(b : Boolean) : MsgProperty = BooleanMsgProperty(b)
def apply(b : Byte) : MsgProperty = ByteMsgProperty(b)
def apply(s : Short) : MsgProperty = ShortMsgProperty(s)
def apply(f : Float) : MsgProperty = FloatMsgProperty(f)
def apply(d : Double) : MsgProperty = DoubleMsgProperty(d)
def lift(o : Any) : Try[MsgProperty[_]] = Try {
def lift(o : Any) : Try[MsgProperty] = Try {
o match {
case s: String => apply(s)
case i: java.lang.Integer => apply(i)
@@ -49,30 +49,21 @@ object MsgProperty {
}
}
def unapply(p: MsgProperty[_]): Any = p match {
case s : StringMsgProperty => s.value
case i : IntMsgProperty => i.value
case l : LongMsgProperty => l.value
case b : BooleanMsgProperty => b.value
case b : ByteMsgProperty => b.value
case s : ShortMsgProperty => s.value
case f : FloatMsgProperty => f.value
case d : DoubleMsgProperty => d.value
}
def unapply(p: MsgProperty): Any = p.value
}
object FlowMessage {
type FlowMessageProps = Map[String, MsgProperty[_]]
type FlowMessageProps = Map[String, MsgProperty]
def props(m :(String, Any)*) : Try[FlowMessageProps] = Try {
m.map { case (k, v) =>
val p : MsgProperty[_] = MsgProperty.lift(v).get
val p : MsgProperty = MsgProperty.lift(v).get
k -> p
}.toMap
}
val noProps : FlowMessageProps = Map.empty[String, MsgProperty[_]]
val noProps : FlowMessageProps = Map.empty[String, MsgProperty]
def apply(props: FlowMessageProps): FlowMessage = BaseFlowMessage(props)
def apply(content : String)(props : FlowMessageProps) : FlowMessage= TextFlowMessage(content, props)
@@ -47,7 +47,7 @@ object FlowTransaction {
val branches : Map[String, List[String]] =
s.split(branchSeparator)
.map(b => b.split("="))
.filter(_.size == 2)
.filter(_.length == 2)
.map{ b =>
b(0) -> b(1).split(stateSeparator).toList
}.toMap
@@ -58,27 +58,8 @@ object FlowTransaction {
FlowTransaction(id = env.id, creationProps = env.flowMessage.header, state = state, worklist = worklistState)
}
}
case class FlowTransaction private [transaction](
id : String,
creationProps : Map[String, MsgProperty[_]],
worklist : Map[String, List[WorklistState]] = Map.empty,
state : FlowTransactionState = FlowTransactionState.Started,
) {
override def toString: String = {
val wlString = worklist.mkString(",")
s"FlowTransaction[$state][$id][$wlString][$creationProps]"
}
val tid : String = id
private[this] val log = Logger[FlowTransaction]
def terminated: Boolean = state == FlowTransactionState.Completed || state == FlowTransactionState.Failed
private def worklistState(wl : Map[String, List[WorklistState]]) : List[FlowTransactionState] = {
private[transaction] def worklistState(currentState : FlowTransactionState, wl : Map[String, List[WorklistState]]) : List[FlowTransactionState] = {
wl.map { case (_,v) =>
if (v.contains(WorklistState.Failed) || v.contains(WorklistState.TimeOut)) {
FlowTransactionState.Failed
@@ -89,14 +70,14 @@ case class FlowTransaction private [transaction](
} else if (v.contains(WorklistState.Completed)) {
FlowTransactionState.Updated
} else {
state
currentState
}
}.toList.distinct
}
private def transactionState(wl : Map[String, List[WorklistState]]): FlowTransactionState = {
private[transaction] def transactionState(currentState : FlowTransactionState, wl : Map[String, List[WorklistState]]): FlowTransactionState = {
val itemStates : List[FlowTransactionState] = worklistState(wl)
val itemStates : List[FlowTransactionState] = FlowTransaction.worklistState(currentState, wl)
if (itemStates.contains(FlowTransactionState.Failed)) {
FlowTransactionState.Failed
@@ -107,9 +88,28 @@ case class FlowTransaction private [transaction](
} else if (itemStates.equals(List(FlowTransactionState.Completed))) {
FlowTransactionState.Completed
} else {
state
currentState
}
}
}
case class FlowTransaction private [transaction](
id : String,
creationProps : Map[String, MsgProperty],
worklist : Map[String, List[WorklistState]] = Map.empty,
state : FlowTransactionState = FlowTransactionState.Started
) {
override def toString: String = {
val wlString = worklist.mkString(",")
s"FlowTransaction[$state][$id][$wlString][$creationProps]"
}
val tid : String = id
private[this] val log = Logger[FlowTransaction]
def terminated: Boolean = state == FlowTransactionState.Completed || state == FlowTransactionState.Failed
def updateTransaction(
event : FlowTransactionEvent
@@ -148,7 +148,7 @@ case class FlowTransaction private [transaction](
val newWorklist : Map[String, List[WorklistState]] =
worklist.filterKeys { id => !updatedItemIds.contains(id) } ++ updatedItemIds
copy(worklist = newWorklist, state = transactionState(newWorklist))
copy(worklist = newWorklist, state = FlowTransaction.transactionState(state, newWorklist))
}
} else {
this
@@ -143,7 +143,7 @@ sealed trait FlowTransactionEvent {
case class FlowTransactionStarted(
override val transactionId : String,
override val properties : Map[String, MsgProperty[_]]
override val properties : Map[String, MsgProperty]
) extends FlowTransactionEvent {
override val state: FlowTransactionState = FlowTransactionState.Started
}
@@ -0,0 +1,100 @@
package blended.streams.transaction
import java.{util => ju}
import blended.persistence.PersistenceService
import blended.streams.message.MsgProperty
import blended.streams.transaction.FlowTransactionState.FlowTransactionState
import blended.streams.worklist.WorklistState
import blended.streams.worklist.WorklistState.WorklistState
import scala.collection.JavaConverters._
import scala.util.Try
class TransactionNotFoundException(id : String)
extends Exception(s"Transaction [$id] not found in persistence store")
class TransactionIdNotUnique(id : String)
extends Exception(s"Transaction [$id] is not unique in persistence store")
class FlowTransactionPersistor(pSvc : PersistenceService) {
private val creationPrefix = "create."
private val worklistPrefix = "worklist."
private val fieldPrefix = "transaction."
private val stateField : String = fieldPrefix + "transactionState"
private val idField : String = fieldPrefix + "transactionId"
private val pClass : String = classOf[FlowTransaction].getName()
private def storeProps(t : FlowTransaction) : ju.Map[String, _ <: Any] = {
val cProps : Map[String, _ <: Any] = t.creationProps.map { case (k,v) =>
creationPrefix + k -> v.value
}
val wlProps : Map[String, _ <: Any] = t.worklist.map { case (k,states) =>
worklistPrefix + k -> states.map(_.toString).mkString(",")
}
val stateProps : Map[String, _ <: Any] = Map(
stateField -> t.state.toString(),
idField -> t.id
)
(cProps ++ wlProps ++ stateProps).asJava
}
private def transaction(storeProps: Map[String, _ <: Any]) : Try[FlowTransaction] = Try {
def property[T](propName : String, props: Map[String, _ <: Any]) : Try[T] = Try {
props.get(propName).map(_.asInstanceOf[T]).get
}
val state : FlowTransactionState = {
val stateString : String = property[String](stateField, storeProps).get
FlowTransactionState.withName(stateString)
}
val creationProps : Map[String, MsgProperty] =
storeProps.filterKeys(_.startsWith(creationPrefix)).map{ case (k,v) =>
k.substring(creationPrefix.length) -> MsgProperty.lift(v).get
}.toMap
val worklist : Map[String, List[WorklistState]] = {
val wlProps = storeProps.filterKeys(_.startsWith(worklistPrefix))
wlProps.map { case (k,v) =>
val states : List[WorklistState] =
v.toString.split(",").map(s => WorklistState.withName(s)).toList
k.substring(worklistPrefix.length) -> states
}
}
val id : String = property[String](idField, storeProps).get
FlowTransaction(
id = id,
creationProps = creationProps,
worklist = worklist,
state = state
)
}
def persistTransaction(t : FlowTransaction) : Try[Unit] = Try {
pSvc.deleteByExample(pClass, Map(idField -> t.id).asJava)
pSvc.persist(pClass, storeProps(t))
}
def restoreTransaction(id : String) : Try[FlowTransaction] = Try {
pSvc.findByExample(pClass, Map(idField -> id).asJava) match {
case Seq() => throw new TransactionNotFoundException(id)
case h :: Seq() => transaction(h.asScala.toMap).get
case _ => throw new TransactionIdNotUnique(id)
}
}
}
Oops, something went wrong.

0 comments on commit 8f6e89f

Please sign in to comment.