Skip to content
Permalink
Browse files

Metrics introduction (#421)

* Move orbit-prometheus library to orbit-server-prometheus

* Convert to using static Metrics global registry instead of injected instance.

* Include message size and active connections metrics.

* Fix incorrectly registered message size metric.

* Introduce first passing instrumentation test.

* Better factoring of Metrics tests with rebuilding of OrbitServer between each test.

* Add placement timer metric and test

* Add a gauge for total addressables count.

* Introduce application clock as configurable, integrate with addressable and node lease timing, add test for expired addressables.

* Add a test for connected nodes gauge. Made local node and addressable directories backed by singleton collections across instances of Orbit Server and added clear function to support testing.

* Replace more static calls to current time with virtual clock implementations.

* Add unit test around metric decreasing count for expired mesh nodes.

* Add metrics tests for adding/removing nodes from the mesh.

* Add message size and count tests

* Refactor metrics tests to handle multiple servers a little better.

* Add container overrides into OrbitServer for testing, remove redundant server disconnects in Metric tests.

* Add Mockito dependency, test for Slow Tick metrics, updated container to remove duplicate registrations.

* Introduce tick timer and test, new Timer extension method to record suspend methods.

* Remove a runBlocking from a timed method and update test.

* Remove health check timer runBlocking call.

* Include namespace in key to the addressable directory.

Co-authored-by: Brett Morien <bmorien@ea.com>
  • Loading branch information
brettmorien and Brett Morien committed Mar 12, 2020
1 parent fa496fb commit ec42b266570c0631babc220223eb0f8f642f644a
Showing with 759 additions and 119 deletions.
  1. +6 −0 build.gradle.kts
  2. +1 −1 settings.gradle.kts
  3. +1 −1 src/orbit-application/build.gradle.kts
  4. +4 −2 src/orbit-client/src/main/kotlin/orbit/client/execution/ExecutionLeases.kt
  5. +3 −2 src/orbit-client/src/main/kotlin/orbit/client/mesh/NodeLeaser.kt
  6. +14 −1 src/orbit-proto/src/main/kotlin/orbit/shared/proto/AddressableConverters.kt
  7. +5 −0 src/orbit-proto/src/main/proto/orbit/shared/addressable.proto
  8. +10 −0 src/orbit-proto/src/test/kotlin/orbit/shared/proto/AddressableTest.kt
  9. +31 −22 src/orbit-server-etcd/src/main/kotlin/orbit/server/etcd/EtcdAddressableDirectory.kt
  10. +1 −1 src/orbit-server-etcd/src/main/kotlin/orbit/server/etcd/EtcdNodeDirectory.kt
  11. +0 −1 src/{orbit-prometheus → orbit-server-prometheus}/build.gradle.kts
  12. +1 −1 ...eus → orbit-server-prometheus}/src/main/kotlin/orbit/server/prometheus/PrometheusMeterEndpoint.kt
  13. +0 −4 ...rometheus → orbit-server-prometheus}/src/main/kotlin/orbit/server/prometheus/PrometheusMetrics.kt
  14. +27 −15 src/orbit-server/src/main/kotlin/orbit/server/OrbitServer.kt
  15. +14 −3 src/orbit-server/src/main/kotlin/orbit/server/OrbitServerConfig.kt
  16. +3 −2 src/orbit-server/src/main/kotlin/orbit/server/mesh/AddressableDirectory.kt
  17. +51 −37 src/orbit-server/src/main/kotlin/orbit/server/mesh/AddressableManager.kt
  18. +7 −6 src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt
  19. +3 −1 src/orbit-server/src/main/kotlin/orbit/server/mesh/LocalNodeInfo.kt
  20. +21 −4 src/orbit-server/src/main/kotlin/orbit/server/mesh/local/LocalAddressableDirectory.kt
  21. +19 −3 src/orbit-server/src/main/kotlin/orbit/server/mesh/local/LocalNodeDirectory.kt
  22. +5 −0 src/orbit-server/src/main/kotlin/orbit/server/net/ClientConnection.kt
  23. +6 −1 src/orbit-server/src/main/kotlin/orbit/server/net/ConnectionManager.kt
  24. +5 −0 src/orbit-server/src/main/kotlin/orbit/server/net/RemoteMeshNodeManager.kt
  25. +18 −5 src/orbit-server/src/main/kotlin/orbit/server/service/HealthService.kt
  26. +457 −0 src/orbit-server/src/test/kotlin/orbit/server/MetricsTests.kt
  27. +1 −0 src/orbit-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
  28. +7 −0 src/orbit-shared/src/main/kotlin/orbit/shared/addressable/Addressable.kt
  29. +1 −0 src/orbit-util/build.gradle.kts
  30. +2 −3 src/orbit-util/src/main/kotlin/orbit/util/concurrent/HashMapBackedAsyncMap.kt
  31. +2 −0 src/orbit-util/src/main/kotlin/orbit/util/di/ComponentContainer.kt
  32. +20 −0 src/orbit-util/src/main/kotlin/orbit/util/instrumentation/TimerUtils.kt
  33. +8 −0 src/orbit-util/src/main/kotlin/orbit/util/time/Clock.kt
  34. +3 −1 src/orbit-util/src/main/kotlin/orbit/util/time/ConstantTicker.kt
  35. +2 −2 src/orbit-util/src/main/kotlin/orbit/util/time/Timestamp.kt
@@ -15,6 +15,9 @@ val protobufVersion by extra("3.11.1")
val kotlinCoroutinesVersion by extra("1.3.3")
val slf4jVersion by extra("1.7.30")
val jacksonVersion by extra("2.10.2")
val kotestVersion by extra ("3.4.2")
val mokitoVersion by extra("3.3.0")
val mockitoKotlin2Version by extra("2.2.0")

// Publishing info
val orbitGroup by extra("cloud.orbit")
@@ -57,6 +60,9 @@ subprojects {

"testRuntimeOnly"("org.slf4j:slf4j-simple:$slf4jVersion")
"testImplementation"(kotlin("test-junit"))
"testImplementation"("io.kotlintest:kotlintest-runner-junit5:$kotestVersion")
"testImplementation"("org.mockito:mockito-core:$mokitoVersion")
"testImplementation"("com.nhaarman.mockitokotlin2:mockito-kotlin:$mockitoKotlin2Version")
}

tasks.withType<KotlinJvmCompile> {
@@ -13,7 +13,7 @@ include(":src:orbit-shared")
include(":src:orbit-server")
include(":src:orbit-server-etcd")
include(":src:orbit-application")
include(":src:orbit-prometheus")
include(":src:orbit-server-prometheus")

include(":src:orbit-client")

@@ -16,7 +16,7 @@ plugins {
dependencies {
implementation(project(":src:orbit-server"))
implementation(project(":src:orbit-server-etcd"))
implementation(project(":src:orbit-prometheus"))
implementation(project(":src:orbit-server-prometheus"))
implementation(project(":src:orbit-shared"))
implementation(project(":src:orbit-util"))

@@ -9,10 +9,12 @@ package orbit.client.execution
import orbit.client.mesh.AddressableLeaser
import orbit.shared.addressable.AddressableLease
import orbit.shared.addressable.AddressableReference
import orbit.util.time.Clock
import java.util.concurrent.ConcurrentHashMap

internal class ExecutionLeases(
private val addressableLeaser: AddressableLeaser
private val addressableLeaser: AddressableLeaser,
private val clock: Clock
) {
private val currentLeases = ConcurrentHashMap<AddressableReference, AddressableLease>()

@@ -21,7 +23,7 @@ internal class ExecutionLeases(
suspend fun getOrRenewLease(addressableReference: AddressableReference): AddressableLease {
var currentLease = currentLeases[addressableReference]

if (currentLease == null || currentLease.expiresAt.inPast()) {
if (currentLease == null || clock.inPast(currentLease.expiresAt)) {
currentLease = renewLease(addressableReference)
}

@@ -16,9 +16,10 @@ import orbit.shared.proto.NodeManagementGrpc
import orbit.shared.proto.NodeManagementOuterClass
import orbit.shared.proto.toCapabilitiesProto
import orbit.shared.proto.toNodeInfo
import orbit.util.time.Clock
import java.util.concurrent.TimeUnit

internal class NodeLeaser(private val localNode: LocalNode, grpcClient: GrpcClient, config: OrbitClientConfig) {
internal class NodeLeaser(private val localNode: LocalNode, grpcClient: GrpcClient, config: OrbitClientConfig, private val clock: Clock) {
private val logger = KotlinLogging.logger { }
private val joinTimeout = config.joinClusterTimeout

@@ -46,7 +47,7 @@ internal class NodeLeaser(private val localNode: LocalNode, grpcClient: GrpcClie
suspend fun renewLease(force: Boolean) {
localNode.status.nodeInfo?.let { existingInfo ->
val existingLease = existingInfo.lease
if (force || existingLease.renewAt.inPast()) {
if (force || clock.inPast(existingLease.renewAt)) {
logger.debug("Renewing lease...")
val renewalResult = nodeManagementStub.renewLease(
NodeManagementOuterClass.RenewNodeLeaseRequestProto.newBuilder()
@@ -9,6 +9,7 @@ package orbit.shared.proto
import orbit.shared.addressable.AddressableLease
import orbit.shared.addressable.AddressableReference
import orbit.shared.addressable.Key
import orbit.shared.addressable.NamespacedAddressableReference

fun AddressableReference.toAddressableReferenceProto() =
Addressable.AddressableReferenceProto.newBuilder()
@@ -54,4 +55,16 @@ fun Addressable.AddressableLeaseProto.toAddressableLease() =
reference = reference.toAddressableReference(),
expiresAt = expiresAt.toTimestamp(),
renewAt = renewAt.toTimestamp()
)
)

fun NamespacedAddressableReference.toNamespacedAddressableReferenceProto() =
Addressable.NamespacedAddressableReferenceProto.newBuilder()
.setNamespace(namespace)
.setAddressableReference(addressableReference.toAddressableReferenceProto())
.build()

fun Addressable.NamespacedAddressableReferenceProto.toNamespacedAddressableReference() =
NamespacedAddressableReference(
namespace = namespace,
addressableReference = this.addressableReference.toAddressableReference()
)
@@ -24,4 +24,9 @@ message AddressableLeaseProto {
AddressableReferenceProto reference = 2;
google.protobuf.Timestamp renew_at = 3;
google.protobuf.Timestamp expires_at = 4;
}

message NamespacedAddressableReferenceProto {
string namespace = 1;
AddressableReferenceProto addressableReference = 2;
}
@@ -8,6 +8,7 @@ package orbit.shared.proto

import orbit.shared.addressable.AddressableReference
import orbit.shared.addressable.Key
import orbit.shared.addressable.NamespacedAddressableReference
import org.junit.Test
import kotlin.test.assertEquals

@@ -19,4 +20,13 @@ class AddressableTest {
val endRef = convertedRef.toAddressableReference()
assertEquals(initialRef, endRef)
}

@Test
fun `test namespaced addressable reference conversion`() {
val initialRef = NamespacedAddressableReference("test", AddressableReference("test", Key.StringKey("testId")))
val convertedRef = initialRef.toNamespacedAddressableReferenceProto()
val endRef = convertedRef.toNamespacedAddressableReference()
assertEquals(initialRef, endRef)

}
}
@@ -8,6 +8,7 @@ package orbit.server.etcd

import io.etcd.jetcd.ByteSequence
import io.etcd.jetcd.Client
import io.etcd.jetcd.KeyValue
import io.etcd.jetcd.op.Op
import io.etcd.jetcd.options.DeleteOption
import io.etcd.jetcd.options.GetOption
@@ -19,10 +20,12 @@ import orbit.server.mesh.AddressableDirectory
import orbit.shared.addressable.AddressableLease
import orbit.shared.addressable.AddressableReference
import orbit.shared.addressable.Key
import orbit.shared.addressable.NamespacedAddressableReference
import orbit.shared.proto.Addressable
import orbit.shared.proto.toAddressableLease
import orbit.shared.proto.toAddressableLeaseProto
import orbit.shared.proto.toAddressableReferenceProto
import orbit.shared.proto.toNamespacedAddressableReferenceProto
import orbit.util.di.ExternallyConfigured
import orbit.util.time.Clock
import orbit.util.time.stopwatch
@@ -74,24 +77,28 @@ class EtcdAddressableDirectory(config: EtcdAddressableDirectoryConfig, private v
}
}

override suspend fun set(key: AddressableReference, value: AddressableLease) {
client.put(toKey(key), ByteSequence.from(key.toAddressableReferenceProto().toByteArray())).await()
override suspend fun count(): Int {
return getAllItems().count()
}

override suspend fun get(key: AddressableReference): AddressableLease? {
override suspend fun set(key: NamespacedAddressableReference, value: AddressableLease) {
client.put(toKey(key), ByteSequence.from(key.toNamespacedAddressableReferenceProto().toByteArray())).await()
}

override suspend fun get(key: NamespacedAddressableReference): AddressableLease? {
val response = client.get(toKey(key)).await()
return response.kvs.firstOrNull()?.value?.let {
Addressable.AddressableLeaseProto.parseFrom(it.bytes).toAddressableLease()
}
}

override suspend fun remove(key: AddressableReference): Boolean {
override suspend fun remove(key: NamespacedAddressableReference): Boolean {
client.delete(toKey(key))
return true
}

override suspend fun compareAndSet(
key: AddressableReference,
key: NamespacedAddressableReference,
initialValue: AddressableLease?,
newValue: AddressableLease?
): Boolean {
@@ -111,7 +118,16 @@ class EtcdAddressableDirectory(config: EtcdAddressableDirectoryConfig, private v
return false
}

override suspend fun entries(): Iterable<Pair<AddressableReference, AddressableLease>> {
override suspend fun entries(): Iterable<Pair<NamespacedAddressableReference, AddressableLease>> {
return getAllItems().map { kv ->
Pair(
fromKey(kv.key),
Addressable.AddressableLeaseProto.parseFrom(kv.value.bytes).toAddressableLease()
)
}
}

private suspend fun getAllItems(): MutableList<KeyValue> {
val key = ByteSequence.from("\u0000".toByteArray())

val option = GetOption.newBuilder()
@@ -121,14 +137,7 @@ class EtcdAddressableDirectory(config: EtcdAddressableDirectoryConfig, private v
.withRange(key)
.build()

val response = client.get(key, option).await()

return response.kvs.map { kv ->
Pair(
fromKey(kv.key),
Addressable.AddressableLeaseProto.parseFrom(kv.value.bytes).toAddressableLease()
)
}
return client.get(key, option).await().kvs
}

override suspend fun tick() {
@@ -138,13 +147,13 @@ class EtcdAddressableDirectory(config: EtcdAddressableDirectoryConfig, private v
lastCleanup.set(clock.currentTime)
val addressables = values()

val (expiredLeases, validLeases) = addressables.partition { addressable -> addressable.expiresAt.inPast() }
val (expiredLeases, validLeases) = addressables.partition { addressable -> clock.inPast(addressable.expiresAt) }

if (expiredLeases.any()) {
val txn = client.txn()
txn.Then(*expiredLeases.map { addressable ->
txn.Then(*expiredLeases.map { lease ->
Op.delete(
toKey(addressable.reference),
toKey(NamespacedAddressableReference(lease.nodeId.namespace, lease.reference)),
DeleteOption.DEFAULT
)
}.toTypedArray()).commit()
@@ -158,15 +167,15 @@ class EtcdAddressableDirectory(config: EtcdAddressableDirectoryConfig, private v
}
}

private fun toKey(address: AddressableReference): ByteSequence {
return ByteSequence.from("$keyPrefix/${address.type}/${address.key}".toByteArray())
private fun toKey(reference: NamespacedAddressableReference): ByteSequence {
return ByteSequence.from("$keyPrefix/${reference.namespace}/${reference.addressableReference.type}/${reference.addressableReference.key}".toByteArray())
}

private fun fromKey(keyBytes: ByteSequence): AddressableReference {
private fun fromKey(keyBytes: ByteSequence): NamespacedAddressableReference {
val keyString = keyBytes.toString(Charset.defaultCharset())

val (_, type, key) = keyString.split("/")
val (_, namespace, type, key) = keyString.split("/")

return AddressableReference(type, Key.of(key))
return NamespacedAddressableReference(namespace, AddressableReference(type, Key.of(key)))
}
}
@@ -104,7 +104,7 @@ class EtcdNodeDirectory(config: EtcdNodeDirectoryConfig, private val clock: Cloc
lastCleanup.set(clock.currentTime)
val nodes = values()

val (expiredLeases, validLeases) = nodes.partition { node -> node.lease.expiresAt.inPast() }
val (expiredLeases, validLeases) = nodes.partition { node -> clock.inPast(node.lease.expiresAt) }

if (expiredLeases.any()) {
val txn = client.txn()
@@ -3,7 +3,6 @@
This file is part of the Orbit Project <https://www.orbit.cloud>.
See license in LICENSE.
*/
val grpcVersion = project.rootProject.ext["grpcVersion"]

plugins {
kotlin("jvm")
@@ -4,7 +4,7 @@
See license in LICENSE.
*/

package src.main.kotlin.orbit.prometheus
package orbit.server.prometheus

import com.sun.net.httpserver.HttpServer
import io.micrometer.prometheus.PrometheusMeterRegistry
@@ -9,10 +9,7 @@ package orbit.server.prometheus
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import mu.KotlinLogging
import orbit.util.di.ExternallyConfigured
import src.main.kotlin.orbit.prometheus.PrometheusMeterEndpoint


class PrometheusMetrics(config: PrometheusMetricsConfig) : PrometheusMeterRegistry(PrometheusConfig.DEFAULT) {
data class PrometheusMetricsConfig(
@@ -22,6 +19,5 @@ class PrometheusMetrics(config: PrometheusMetricsConfig) : PrometheusMeterRegist
override val instanceType: Class<out MeterRegistry> = PrometheusMetrics::class.java
}

private val logger = KotlinLogging.logger {}
private val meterServer = PrometheusMeterEndpoint(this, config.url, config.port)
}

0 comments on commit ec42b26

Please sign in to comment.
You can’t perform that action at this time.