Skip to content

Commit

Permalink
feat(event): Adding eventing lib (#3922)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Aug 9, 2019
1 parent 4622610 commit 9041ef4
Show file tree
Hide file tree
Showing 17 changed files with 796 additions and 7 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ buildscript {
}
classpath "com.netflix.nebula:nebula-kotlin-plugin:$kotlinVersion"
classpath "org.junit.platform:junit-platform-gradle-plugin:${junitPlatformVersion}"

classpath "org.jetbrains.kotlin:kotlin-allopen:$kotlinVersion"
}
}

Expand All @@ -55,6 +55,7 @@ allprojects {
apply plugin: 'java-library'
apply plugin: 'groovy'
apply plugin: 'nebula.kotlin'
apply plugin: "kotlin-allopen"

sourceSets.main.java.srcDirs = []
sourceSets.main.groovy.srcDirs += ["src/main/java"]
Expand Down
42 changes: 42 additions & 0 deletions clouddriver-event/clouddriver-event.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
annotationProcessor "org.springframework.boot:spring-boot-autoconfigure-processor"

implementation "com.netflix.spinnaker.kork:kork-core"
implementation "com.netflix.spinnaker.kork:kork-exceptions"
implementation "com.google.guava:guava"
implementation "com.google.code.findbugs:jsr305"
implementation "org.springframework.boot:spring-boot-actuator"
implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
implementation "javax.validation:validation-api"
implementation "org.hibernate.validator:hibernate-validator"

testImplementation "cglib:cglib-nodep"
testImplementation "org.objenesis:objenesis"
testImplementation "org.junit.platform:junit-platform-runner"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.springframework:spring-test"
testImplementation "org.springframework.boot:spring-boot-test"
testImplementation "org.assertj:assertj-core"
testImplementation "io.strikt:strikt-core"
testImplementation "dev.minutest:minutest"
testImplementation "io.mockk:mockk"

testRuntimeOnly "org.junit.platform:junit-platform-launcher"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event

/**
* The identifiable collection of an event log.
*
* Aggregates are grouped by a [type] which should be unique for each domain entity, with unique
* [id] values therein. A [version] field is used to ensure business logic is operating on the
* latest event state; any modification to an [Aggregate] event log will increment this value.
* When an operation is attempted on an [version] which is not head, the event framework will
* reject the change.
*/
class Aggregate(
val type: String,
val id: String,
var version: Long
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as Aggregate

if (type != other.type) return false
if (id != other.id) return false

return true
}

override fun hashCode(): Int {
var result = type.hashCode()
result = 31 * result + id.hashCode()
return result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event

/**
* Interface for subscribing to Spinnaker events.
*
* An implementation must be responsible for filtering out the events that it doesn't care about.
*/
interface EventListener {
fun onEvent(event: SpinnakerEvent)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event

import java.time.Instant

/**
* Metadata for a [SpinnakerEvent].
*
* @param sequence Auto-incrementing number for event ordering
* @param originatingVersion The aggregate version that originated this event
* @param timestamp The time at which the event was created
* @param serviceVersion The version of the service (clouddriver) that created the event
* @param provenance Where the event was generated and by what
*/
data class EventMetadata(
val sequence: Long,
val originatingVersion: Long,
val timestamp: Instant = Instant.now(),
val serviceVersion: String = "unknown",
val provenance: String = "unknown"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event

/**
* The event sourcing library event publisher.
*
* This library assumes that events are persisted first into a durable store and then propagated out
* to subscribers afterwards. There is no contract on immediacy or locality of events being delivered
* to subscribers: This is left entirely to the implementation.
*/
interface EventPublisher {
fun publish(event: SpinnakerEvent)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event

import com.fasterxml.jackson.annotation.JsonTypeInfo
import java.util.UUID

/**
* The base event class for the event sourcing library.
*
* @param aggregateType The type of aggregate the event is for
* @param aggregateId The id of the aggregate the event is for
* @property id A unique ID for the event. This value is for tracing, rather than loading events
* @property metadata Associated metadata about the event; not actually part of the "event proper"
*/
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "spinEventType"
)
abstract class SpinnakerEvent(
val aggregateType: String,
val aggregateId: String
) {
val id = UUID.randomUUID().toString()

lateinit var metadata: EventMetadata
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event.config

import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import

/**
* Auto-configures the event sourcing library.
*/
@Configuration
@Import(MemoryEventRepositoryConfig::class)
open class EventSourceAutoConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event.config

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository
import com.netflix.spinnaker.clouddriver.event.persistence.InMemoryEventRepository
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.validation.annotation.Validated
import java.time.Duration
import javax.validation.Constraint
import javax.validation.ConstraintValidator
import javax.validation.ConstraintValidatorContext
import javax.validation.constraints.Min
import kotlin.reflect.KClass

@Configuration
@ConditionalOnProperty("spinnaker.clouddriver.eventing.memory-repository.enabled", matchIfMissing = true)
@EnableConfigurationProperties(MemoryEventRepositoryConfigProperties::class)
open class MemoryEventRepositoryConfig {

private val log by lazy { LoggerFactory.getLogger(javaClass) }

init {
log.info("Configuring EventRepository: InMemoryEventRepository")
}

@Bean
open fun eventRepository(
properties: MemoryEventRepositoryConfigProperties,
applicationEventPublisher: ApplicationEventPublisher,
registry: Registry
): EventRepository =
InMemoryEventRepository(properties, applicationEventPublisher, registry)
}

@MemoryEventRepositoryConfigProperties.SpinValidated
@ConfigurationProperties("spinnaker.clouddriver.eventing.memory-repository")
open class MemoryEventRepositoryConfigProperties {
/**
* The max age of an [Aggregate]. One of this and [maxAggregatesCount] must be set.
*/
@Min(
message = "Event repository aggregate age cannot be less than 24 hours.",
value = 60 * 60 * 24 * 1000
)
var maxAggregateAgeMs: Long? = Duration.ofHours(24).toMillis()

/**
* The max number of [Aggregate] objects. One of this and [maxAggregateAgeMs] must be set.
*/
var maxAggregatesCount: Int? = null

@Validated
@Constraint(validatedBy = [Validator::class])
@Target(AnnotationTarget.CLASS)
annotation class SpinValidated(
val message: String = "Invalid event repository configuration",
val groups: Array<KClass<out Any>> = [],
val payload: Array<KClass<out Any>> = []
)

class Validator : ConstraintValidator<SpinValidated, MemoryEventRepositoryConfigProperties> {
override fun isValid(
value: MemoryEventRepositoryConfigProperties,
context: ConstraintValidatorContext
): Boolean {
if (value.maxAggregateAgeMs != null && value.maxAggregatesCount != null) {
context.buildConstraintViolationWithTemplate("Only one of 'maxAggregateAgeMs' and 'maxAggregatesCount' can be defined")
.addConstraintViolation()
return false
}
if (value.maxAggregateAgeMs == null && value.maxAggregatesCount == null) {
context.buildConstraintViolationWithTemplate("One of 'maxAggregateAgeMs' and 'maxAggregatesCount' must be set")
.addConstraintViolation()
return false
}
return true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.event.exceptions

import com.netflix.spinnaker.kork.exceptions.SystemException

/**
* Thrown when one or more [SpinEvent] have been rejected from being committed to an [Aggregate].
*
* The process which originated the event must be retryable.
*/
class AggregateChangeRejectedException(message: String) : SystemException(message) {
override fun getRetryable() = true
}
Loading

0 comments on commit 9041ef4

Please sign in to comment.