Komandante is a CQRS / ES toolkit inspired by Eventhorizon and Eventhus
kotlin
ktor
exposed
komandante
https://github.com/hasanozgan/komandante-todomvc
// Application wiring: one message bus, with a command bus and an event bus built on top of it.
val messageBus = newMessageBusWithLocalAdapter()
val commandBus = newCommandBus(messageBus)
val eventBus = newEventBus(messageBus)
// Event store plus the aggregate handler that is given both the store and the event bus.
val eventStore = newEventStoreWithExposedAdapter()
val aggregateHandler = AggregateHandler(eventStore, eventBus)
// Register the bank-account aggregate factory together with its handler on the command bus.
commandBus.registerAggregate(aggregateHandler, BankAccountAggregateFactory())
// Subscriber for NotificationCommand: it only prints the received command to stdout.
commandBus.subscribe<NotificationCommand> {
println("SAGA COMMAND: ${it}")
}
// Projector: wrapped in a ProjectorEventHandler (which also receives the command bus)
// and attached to the event bus.
val bankAccountProjector = BankAccountProjector()
val projectorEventHandler = ProjectorEventHandler(bankAccountProjector, commandBus)
eventBus.addHandler(projectorEventHandler)
// Workflow (saga): wrapped in a SagaEventHandler and attached to the same event bus.
val bankAccountWorkflow = BankAccountWorkflow()
val sagaEventHandler = SagaEventHandler(bankAccountWorkflow, commandBus)
eventBus.addHandler(sagaEventHandler)
/** Closed set of commands addressed to a bank account; each one carries the target aggregate ID. */
sealed class BankAccountCommand(override val aggregateID: AggregateID) : Command()
/** Command carrying the account ID and the owner name; handled by BankAccountAggregate.handle(CreateAccount). */
data class CreateAccount(val accountID: AggregateID, val owner: String) : BankAccountCommand(accountID)
/** Command carrying the account ID and the new owner name. */
data class ChangeOwner(val accountID: AggregateID, val owner: String) : BankAccountCommand(accountID)
/** Command carrying the account ID and the amount to deposit. */
data class PerformDeposit(val accountID: AggregateID, val amount: Double) : BankAccountCommand(accountID)
/** Command carrying the account ID and the amount to withdraw. */
data class PerformWithdrawal(val accountID: AggregateID, val amount: Double) : BankAccountCommand(accountID)
/**
 * Base type for notification commands, which are not constructed with an aggregate ID.
 *
 * The ID is obtained from `newAggregateID()` once per command instance, on first access,
 * and then reused, so repeated reads of [aggregateID] on the same command return the same
 * value. Previously the getter called `newAggregateID()` again on every access.
 *
 * `by lazy` is used instead of a plain initializer so the property is also safe to read
 * before this class's own initializers have run (e.g. from a superclass constructor).
 */
sealed class NotificationCommand : Command() {
    override val aggregateID: AggregateID by lazy { newAggregateID() }
}

/** Notification command carrying the text of the message. */
data class SendMessage(val message: String) : NotificationCommand()
/** Closed set of events emitted for a bank account; each one carries the aggregate ID it belongs to. */
sealed class BankAccountEvent(override val aggregateID: AggregateID) : Event()
/** Event carrying the account ID and the owner name; returned by BankAccountAggregate.handle(CreateAccount). */
data class AccountCreated(val accountID: AggregateID, val owner: String) : BankAccountEvent(accountID)
/** Event carrying the account ID and the deposited amount. */
data class DepositPerformed(val accountID: AggregateID, val amount: Double) : BankAccountEvent(accountID)
/** Event carrying the account ID and the new owner name. */
data class OwnerChanged(val accountID: AggregateID, val owner: String) : BankAccountEvent(accountID)
/** Event carrying the account ID and the withdrawn amount. */
data class WithdrawalPerformed(val accountID: AggregateID, val amount: Double) : BankAccountEvent(accountID)
/** Closed set of notification events; each one carries an aggregate ID. */
sealed class NotificationEvent(override val aggregateID: AggregateID) : Event()
/** Event carrying the message text; its messageID is passed on as the aggregate ID. */
data class MessageSent(val messageID: AggregateID, val message: String) : NotificationEvent(messageID)
/**
 * Aggregate holding the state of one bank account: its owner and its balance.
 *
 * Each `handle` overload maps a command to the event it produces (or to a domain error);
 * each `apply` overload folds an event into the aggregate's state.
 */
class BankAccountAggregate(override var id: AggregateID) : Aggregate() {
    var owner: String = "not/assigned"
    var balance: Double = 0.0

    /** Always accepts the command and yields an [AccountCreated] event. */
    fun handle(command: CreateAccount): Validated<DomainError, Event> =
        Valid(AccountCreated(command.aggregateID, command.owner))

    /** Yields a [DepositPerformed] event for the requested amount. */
    fun handle(command: PerformDeposit): Event =
        DepositPerformed(command.aggregateID, command.amount)

    /** Yields an [OwnerChanged] event with the requested owner. */
    fun handle(command: ChangeOwner): Event =
        OwnerChanged(command.aggregateID, command.owner)

    /**
     * Rejects the withdrawal with [InsufficientBalanceError] when the requested amount
     * exceeds the current balance; otherwise yields a [WithdrawalPerformed] event.
     */
    fun handle(command: PerformWithdrawal): Validated<DomainError, Event> =
        if (command.amount > balance) {
            Invalid(InsufficientBalanceError)
        } else {
            Valid(WithdrawalPerformed(command.aggregateID, command.amount))
        }

    /** Records the initial owner. */
    fun apply(event: AccountCreated) {
        owner = event.owner
    }

    /** Replaces the owner. */
    fun apply(event: OwnerChanged) {
        owner = event.owner
    }

    /** Adds the deposited amount to the balance. */
    fun apply(event: DepositPerformed) {
        balance += event.amount
    }

    /** Subtracts the withdrawn amount from the balance. */
    fun apply(event: WithdrawalPerformed) {
        balance -= event.amount
    }
}
/** Factory that builds a fresh [BankAccountAggregate] for a given aggregate ID. */
class BankAccountAggregateFactory : AggregateFactory<BankAccountCommand, BankAccountEvent> {
    /** Returns a new [BankAccountAggregate] whose id is [aggregateID]. */
    override fun create(aggregateID: AggregateID): Aggregate = BankAccountAggregate(aggregateID)
}
/**
 * Projects bank-account events into the `BankAccounts` table.
 *
 * Every typed `project` overload runs inside its own `transaction { }` block. The update
 * overloads only touch a row whose stored `version` is lower than the event's version,
 * so an event that is not newer than the stored row is skipped.
 */
class BankAccountProjector : Projector<BankAccountEvent> {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Generic overload taking any [Event]: reports that the event was not projected and
     * emits no follow-up command.
     *
     * The report goes through the class logger, like the other diagnostics in this class,
     * instead of printing a throwaway `DomainError` to stdout.
     */
    override fun project(event: Event): Option<Command> {
        logger.warn("Event $event is not projected")
        return none()
    }

    /**
     * Inserts a row for the new account with a zero balance. If a row with the same
     * aggregate ID already exists, nothing is written and an error is logged.
     */
    fun project(event: AccountCreated) {
        transaction {
            val existing = BankAccounts.select { aggregateID eq event.aggregateID }
            if (existing.empty()) {
                BankAccounts.insert {
                    it[aggregateID] = event.aggregateID
                    it[owner] = event.owner
                    it[balance] = 0.0
                    it[updatedOn] = DateTime.now()
                    it[version] = event.version
                }
                commit()
            } else {
                logger.error("account is created before")
            }
        }
    }

    /** Adds the deposited amount to the stored balance when the event is newer than the row. */
    fun project(event: DepositPerformed) {
        transaction {
            BankAccounts.select { aggregateID eq event.aggregateID }
                .filterNot { stored -> stored[version] >= event.version }
                .forEach { stored ->
                    // The new balance is computed from the value read above, not in SQL.
                    BankAccounts.update({ aggregateID eq event.aggregateID }, 1) {
                        it[balance] = stored[balance] + event.amount
                        it[updatedOn] = DateTime.now()
                        it[version] = event.version
                    }
                    commit()
                }
        }
    }

    /** Replaces the stored owner when the event is newer than the row. */
    fun project(event: OwnerChanged) {
        transaction {
            BankAccounts.select { aggregateID eq event.aggregateID }
                .filterNot { stored -> stored[version] >= event.version }
                .forEach { _ ->
                    BankAccounts.update({ aggregateID eq event.aggregateID }, 1) {
                        it[owner] = event.owner
                        it[updatedOn] = DateTime.now()
                        it[version] = event.version
                    }
                    commit()
                }
        }
    }

    /**
     * Subtracts the withdrawn amount from the stored balance when the event is newer
     * than the row. Always returns an empty [Option]: no follow-up command is emitted.
     */
    fun project(event: WithdrawalPerformed): Option<Command> {
        transaction {
            BankAccounts.select { aggregateID eq event.aggregateID }
                .filterNot { stored -> stored[version] >= event.version }
                .forEach { stored ->
                    // The new balance is computed from the value read above, not in SQL.
                    BankAccounts.update({ aggregateID eq event.aggregateID }, 1) {
                        it[balance] = stored[balance] - event.amount
                        it[updatedOn] = DateTime.now()
                        it[version] = event.version
                    }
                    commit()
                }
        }
        return none()
    }
}
/**
 * Workflow that answers each bank-account event with a single [SendMessage] command
 * whose text describes the event.
 */
class BankAccountWorkflow : Workflow<BankAccountEvent> {
    /** Message for a newly created account. */
    fun run(event: AccountCreated): List<Command> =
        listOf(SendMessage("account created ${event.owner} for ${event.aggregateID}"))

    /** Message for a deposit. */
    fun run(event: DepositPerformed): List<Command> =
        listOf(SendMessage("${event.amount} deposit performed for ${event.aggregateID}"))

    /** Message for an owner change. */
    fun run(event: OwnerChanged): List<Command> =
        listOf(SendMessage("${event.owner} owner changed for ${event.aggregateID}"))

    /** Message for a withdrawal. */
    fun run(event: WithdrawalPerformed): List<Command> =
        listOf(SendMessage("${event.amount} withdrawal performed for ${event.aggregateID}"))
}