Skip to content

Commit

Permalink
Event interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Mihic committed Sep 27, 2018
1 parent d7109cd commit e468984
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 0 deletions.
54 changes: 54 additions & 0 deletions misk-events/build.gradle
@@ -0,0 +1,54 @@
import org.junit.platform.console.options.Details

buildscript {
dependencies {
classpath dep.kotlinNoArgPlugin
}
}

apply plugin: 'kotlin'
apply plugin: 'kotlin-jpa'
apply plugin: 'org.junit.platform.gradle.plugin'
apply plugin: "com.vanniktech.maven.publish"

compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
allWarningsAsErrors = true
}
}
compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
allWarningsAsErrors = true
}
}

sourceSets {
main.java.srcDirs += 'src/main/kotlin/'
test.java.srcDirs += 'src/test/kotlin/'
}

junitPlatform {
details Details.VERBOSE
}

dependencies {
compile dep.hibernateCore
compile dep.hikariCp
compile dep.hsqldb
compile dep.mysql
compile dep.openTracing
compile dep.openTracingUtil
compile dep.openTracingJdbc
compile dep.vitess
compile project(':misk')
compile project(':misk-hibernate')

testCompile project(':misk-testing')
testCompile project(':misk-hibernate-testing')
}

if (rootProject.file("hooks.gradle").exists()) {
apply from: rootProject.file("hooks.gradle")
}
4 changes: 4 additions & 0 deletions misk-events/gradle.properties
@@ -0,0 +1,4 @@
POM_ARTIFACT_ID=misk-events
POM_NAME=misk-events
POM_DESCRIPTION=A Misk module for handling event streams
POM_PACKAGING=jar
21 changes: 21 additions & 0 deletions misk-events/src/main/kotlin/misk/events/Consumer.kt
@@ -0,0 +1,21 @@
package misk.events

/** A [Consumer] allows applications to receive events from a source */
interface Consumer {
/** The [Context] provides information about a set of events being consumed */
interface Context {
/** the topic from which events are being received */
val topic: Topic

/** Defers processing a set of events to a later time, typically placing them on a retry queue */
fun retryLater(vararg events: Event)
}

/** A [Handler] handles incoming events from a topic */
interface Handler {
fun handleEvents(ctx: Context, vararg events: Event)
}

/** listens for incoming events to a topic */
fun subscribe(topic: Topic, handler: Handler)
}
58 changes: 58 additions & 0 deletions misk-events/src/main/kotlin/misk/events/Event.kt
@@ -0,0 +1,58 @@
package misk.events

import com.squareup.wire.Message
import com.squareup.wire.ProtoAdapter
import okio.ByteString
import okio.ByteString.Companion.encodeUtf8
import java.time.Instant

data class Event(
/** The type of event */
val type: String,

/** the content so the event, encoded as a protobuf */
val body: ByteString,

/** the instant at which the event occurred */
val occurredAt: Instant,

/** a global unique id for the event */
val id: ByteString,

/**
* The id of the entity to which the event is referencing. Many but not all events
* are correlated with a specific entity; if this event is related to an entity,
* the entity_identifier should specify the id of that entity
*/
val entityIdentifier: String = "",

/**
* Partitioning key for the event. The partitioning key controls the ordering and sharding
* of events on a topic. Events on a topic with the same partitioning key are delivered on
* the same shard and in the order in which they were published. Typically, the entity
* identifier is also used as a partitioning key, such that all of events on a topic for
* a given entity get delivered in the order in which they were submitted. However,
* producing applications may include an alternate partition key as part of the event
* to support ordering/sharding at a level above the individual entity. For example,
* a card processing system may want to shard and order all credit card changes relative
* to the customer to whom the card belongs; in this case the entity identifier is the credit
* card modified by the event, but the partition key is the token of the customer owning the card.
*/
val partitionKey: ByteString = if (entityIdentifier.isBlank()) id else entityIdentifier.encodeUtf8(),

/**
* Additional context information about the event, added and examined by infrastructure elements
*/
val headers: Map<String, ByteString> = mapOf()
) {
fun <A : Message<*, *>> bodyAs(adapter: ProtoAdapter<A>): A = adapter.decode(body)

inline fun <reified A : Message<*, *>> bodyAs(): A = bodyAs(ProtoAdapter.get(A::class.java))

fun <A : Message<*, *>> header(name: String, adapter: ProtoAdapter<A>): A? =
headers[name]?.let { adapter.decode(it) }

inline fun <reified A : Message<*, *>> header(name: String) =
header(name, ProtoAdapter.get(A::class.java))
}

28 changes: 28 additions & 0 deletions misk-events/src/main/kotlin/misk/events/Producer.kt
@@ -0,0 +1,28 @@
package misk.events

/**
* A [Producer] is used to send events to an event stream.
*/
interface Producer {
/**
* A producer [Transaction] is a unit of publishing. Producer transactions adhere to the
* transactional concepts of isolation (events publishing within a transaction are only
* visible to consumers once the transaction is committed) and atomicity (once committed, all
* events within the transactions are made available to consumers - no events will be lost).
*
* Note that a Producer transaction is _not_ connected to a database transaction; it is solely
* a transaction within the event streaming system. To coordinate event publishing with
* local database transactions, use a [SpooledProducer] which stores the events in a local table
* as part of the local database transaction.
*
* Transactions remaining outstanding until the application calls commit, or rollback, or until
* a producer specific timeout occurs.
*/
interface Transaction {
fun publish(topic: Topic, vararg events: Event)
fun commit()
fun rollback()
}

fun beginTransaction(): Transaction
}
17 changes: 17 additions & 0 deletions misk-events/src/main/kotlin/misk/events/SpooledProducer.kt
@@ -0,0 +1,17 @@
package misk.events

import misk.hibernate.Gid
import misk.hibernate.Session

/**
* A [SpooledProducer] is a producer that writes events to a local spool stored within a
* service's database. [SpooledProducer]s can be used to coordinate event publishing with
* local database transactions. Events published to the pool are done within the application's
* local database transaction; a rollback of the database transaction will also rollback
* any events published to the spool. Events are asynchronously forwarded from the spool
* to the event stream, and are done so through a [Producer] transaction.
*/
interface SpooledProducer {
fun publish(session: Session, groupRootId: Gid<*, *>, topic: Topic, vararg event: Event)
fun publish(session: Session, topic: Topic, vararg event: Event)
}
3 changes: 3 additions & 0 deletions misk-events/src/main/kotlin/misk/events/Topic.kt
@@ -0,0 +1,3 @@
package misk.events

data class Topic(val name: String)
1 change: 1 addition & 0 deletions settings.gradle
Expand Up @@ -10,6 +10,7 @@ include ':misk-prometheus'
include ':misk-testing'
include ':misk-zipkin'
include ':misk-eventrouter'
include ':misk-events'
include ':samples:exemplar'
include ':samples:exemplarchat'
include ':samples:urlshortener'

0 comments on commit e468984

Please sign in to comment.