Skip to content

Commit

Permalink
Add support for push mode and FRP Style
Browse files Browse the repository at this point in the history
Fixes #50
Fixes #51
  • Loading branch information
rahulsom committed Nov 20, 2017
1 parent d26ba10 commit 28d9048
Show file tree
Hide file tree
Showing 23 changed files with 1,095 additions and 29 deletions.
5 changes: 5 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ link:examples/javaee[examples/javaee]::
The persistence here is completely made up using a few `List` objects.
The reason for that is to show how you don't have to be tied to any one persistence mechanism.

link:examples/pushstyle[examples/pushstyle]::
This uses Kotlin and Guava EventBus.
The snapshots are persisted using jOOQ.
The events are never persisted. They are processed as they come on the EventBus.

102 changes: 102 additions & 0 deletions examples/pushstyle/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import org.jooq.SQL

buildscript {
repositories {
mavenCentral()
maven { url 'https://plugins.gradle.org/m2/' }
maven { url "http://dl.bintray.com/kotlin/kotlin-eap-1.2" }
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}"
classpath 'nu.studer:gradle-jooq-plugin:2.0.9'
classpath 'com.h2database:h2:1.4.196'
classpath "gradle.plugin.com.boxfuse.client:flyway-release:4.2.0"
}
}

apply plugin: 'kotlin'
apply plugin: 'nu.studer.jooq'
apply plugin: "org.flywaydb.flyway"

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}

repositories {
mavenCentral()
maven { url "http://dl.bintray.com/kotlin/kotlin-eap-1.2" }
}

dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}"
compile "org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}"

compile 'com.google.inject:guice:4.1.0'
compile 'com.google.guava:guava:23.4-jre'

compile 'com.h2database:h2:1.4.196'
compile 'org.jooq:jooq'

compile project(':grooves-api')

jooqRuntime 'com.h2database:h2:1.4.196'

runtime 'org.slf4j:slf4j-simple:1.7.25'
testCompile 'junit:junit:4.12'
}

sonarqube {
properties {
property 'sonar.moduleKey', 'com.github.rahulsom:grooves:examples-pushstyle'
}
}

jooq {
version = '3.10.1'
edition = 'OSS'
main(sourceSets.main) {
jdbc {
driver = 'org.h2.Driver'
url = "jdbc:h2:file:${buildDir}/schema"
user = 'sa'
password = ''
}
generator {
name = 'org.jooq.util.DefaultGenerator'
strategy {
name = 'org.jooq.util.DefaultGeneratorStrategy'
}
database {
name = 'org.jooq.util.h2.H2Database'
inputSchema = 'public'
}
generate {
relations = true
deprecated = false
records = true
immutablePojos = true
fluentSetters = true
}
target {
packageName = 'grooves.example.pushstyle'
}
}
}
}

flyway {
url = "jdbc:h2:file:${buildDir}/schema"
user = 'sa'
password = ''
schemas = ['public']
}

generateMainJooqSchemaSource.dependsOn 'flywayMigrate'
compileKotlin.dependsOn 'generateMainJooqSchemaSource'

checkstyleMain.source = "src/main/java"
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package grooves.example.push

import com.google.common.eventbus.EventBus
import com.google.inject.Inject
import grooves.example.pushstyle.Public
import grooves.example.pushstyle.Tables.BALANCE
import org.jooq.DSLContext
import org.slf4j.LoggerFactory.getLogger
import java.util.*
import grooves.example.pushstyle.tables.Balance as BalanceTable

class Application @Inject constructor(
val eventBus: EventBus, eventService: EventService, val database: Database,
dslContext: DSLContext) {

private val log = getLogger(this.javaClass)

init {
log.error("Setting up schema")
dslContext.createSchemaIfNotExists(Public.PUBLIC).execute()
log.error("Setting up table")
dslContext.createTableIfNotExists(BALANCE)
.column(BALANCE.B_ID)
.column(BALANCE.B_VERSION)
.column(BALANCE.B_TIME)
.column(BALANCE.B_ACCOUNT)
.column(BALANCE.BALANCE_)
.execute()
log.error("Registering eventService")
eventBus.register(eventService)
log.error("doStart complete")
}

fun deposit(accountId: String, position: Long, atmId: String, amount: Long) =
eventBus.post(
Transaction.Deposit(
UUID.randomUUID().toString(), Account(accountId), Date(), position, atmId, amount))

fun withdraw(accountId: String, position: Long, atmId: String, amount: Long) =
eventBus.post(
Transaction.Withdraw(
UUID.randomUUID().toString(), Account(accountId), Date(), position, atmId, amount))

fun getBalance(accountId: String) =
database.getBalance(Account(accountId), null)?.balance

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package grooves.example.push

import com.google.common.eventbus.EventBus
import com.google.inject.AbstractModule
import com.google.inject.Provides
import org.jooq.SQLDialect.H2
import org.jooq.impl.DSL
import java.sql.DriverManager.getConnection

object BankingModule : AbstractModule() {

override fun configure() {
bind(Application::class.java)
bind(EventBus::class.java)
bind(EventService::class.java)
bind(Database::class.java)
}

@Provides
fun dslContext() =
DSL.using(getConnection("jdbc:h2:mem:app", "sa", ""), H2)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package grooves.example.push

import io.reactivex.Scheduler
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.Disposables
import io.reactivex.internal.schedulers.NewThreadWorker
import io.reactivex.internal.schedulers.RxThreadFactory
import java.util.concurrent.TimeUnit

object ContextAwareScheduler : Scheduler() {

private val worker = NewThreadWorker(RxThreadFactory("ContextAwareScheduler"))

override fun createWorker(): Scheduler.Worker = ContextAwareWorker(worker)

internal class ContextAwareWorker(val worker: NewThreadWorker) : Scheduler.Worker() {

val tracking = CompositeDisposable()

override fun schedule(runnable: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (isDisposed) {
return Disposables.disposed()
}

val context = ContextManager.get()
val contextAwareRunnable = {
ContextManager.set(context)
runnable.run()
}

return worker.scheduleActual(contextAwareRunnable, delay, unit, tracking)
}

override fun isDisposed() = tracking.isDisposed

override fun dispose() = tracking.dispose()
}

}

object ContextManager {

internal val ctx = ThreadLocal<Any>()

fun get(): Any {
return ctx.get()
}

fun set(context: Any) {
ctx.set(context)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package grooves.example.push

import com.google.inject.Inject
import grooves.example.pushstyle.Tables
import grooves.example.pushstyle.tables.records.BalanceRecord
import org.jooq.DSLContext

class Database {

@Inject lateinit var dslContext: DSLContext

fun getBalance(account: Account, version: Long?): BalanceRecord? =
dslContext
.select()
.from(Tables.BALANCE)
.where(Tables.BALANCE.B_ACCOUNT.eq(account.id))
.and(Tables.BALANCE.B_VERSION.le(version ?: Long.MAX_VALUE))
.orderBy(Tables.BALANCE.B_VERSION.desc())
.limit(1)
.fetchAny() as BalanceRecord?

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package grooves.example.push

import com.github.rahulsom.grooves.api.EventApplyOutcome.CONTINUE
import com.github.rahulsom.grooves.queries.Grooves
import com.google.common.eventbus.Subscribe
import com.google.inject.Inject
import grooves.example.pushstyle.tables.records.BalanceRecord
import io.reactivex.Flowable
import io.reactivex.Flowable.just
import io.reactivex.Maybe
import org.jooq.DSLContext
import org.omg.CORBA.Object
import org.slf4j.LoggerFactory
import java.sql.Timestamp

class EventService {

@Inject lateinit var database: Database
@Inject lateinit var dslContext: DSLContext

val log = LoggerFactory.getLogger(this::class.java)

val query =
Grooves.versioned<String, Account, String, Transaction, String, Balance>()
.withSnapshot { version, account ->
Maybe.fromCallable { database.getBalance(account, version) }
.map { dbBalance ->
Balance(
dbBalance.bId, Account(dbBalance.bAccount),
dbBalance.balance, dbBalance.bVersion, dbBalance.bTime
)
}
.toFlowable()
}
.withEmptySnapshot { Balance() }
.withEvents { transaction, balance, date ->
val t = (ContextManager.get() as Map<String, Transaction>).get("transaction")
Flowable.just(t)
}
// .withApplyEvents { balance -> true }
.withDeprecator { balance, deprecatingAccount -> /* No op */ }
.withExceptionHandler { exception, balance, transaction ->
log.warn("$exception occurred")
just(CONTINUE)
}
.withEventHandler(this::updateBalance)
.build()


private fun updateBalance(transaction: Transaction, balance: Balance) =
when (transaction) {
is Transaction.Deposit -> {
balance.balance += transaction.amount
just(CONTINUE)
}
is Transaction.Withdraw -> {
balance.balance -= transaction.amount
just(CONTINUE)
}
}

@Suppress("unused")
@Subscribe
fun onTransaction(newTransaction: Transaction) {

val context = mapOf("transaction" to newTransaction)

ContextManager.set(context)

val snapshotPublisher =
query.computeSnapshot(newTransaction.aggregate, newTransaction.position)

Flowable.fromPublisher(snapshotPublisher)
.subscribe { balance ->
dslContext.executeInsert(
BalanceRecord(
balance.id,
balance.lastEventPosition,
Timestamp.from(balance.lastEventTimestamp?.toInstant()),
balance.aggregate?.id,
balance.balance
)
)
}
}

}
Loading

0 comments on commit 28d9048

Please sign in to comment.