Skip to content

Commit

Permalink
feat: Export more metrics to prometheus. (#2129)
Browse files Browse the repository at this point in the history
* feat: Export JVM metrics to prometheus.
* feat: Add a startup_time metric for detection of restarts.
* feat: Add metrics for queue drops/exceptions.
  • Loading branch information
bgrozev committed May 2, 2024
1 parent b623d6a commit acf024b
Show file tree
Hide file tree
Showing 15 changed files with 369 additions and 58 deletions.
Expand Up @@ -152,7 +152,7 @@ class RtpReceiverImpl @JvmOverloads constructor(
}

companion object {
val queueErrorCounter = CountingErrorHandler()
var queueErrorCounter = CountingErrorHandler()

private const val PACKET_QUEUE_ENTRY_EVENT = "Entered RTP receiver incoming queue"
private const val PACKET_QUEUE_EXIT_EVENT = "Exited RTP receiver incoming queue"
Expand Down
Expand Up @@ -324,7 +324,7 @@ class RtpSenderImpl(
}

companion object {
val queueErrorCounter = CountingErrorHandler()
var queueErrorCounter = CountingErrorHandler()

private const val PACKET_QUEUE_ENTRY_EVENT = "Entered RTP sender incoming queue"
private const val PACKET_QUEUE_EXIT_EVENT = "Exited RTP sender incoming queue"
Expand Down
15 changes: 5 additions & 10 deletions jvb/src/main/java/org/jitsi/videobridge/Conference.java
Expand Up @@ -219,10 +219,7 @@ public Conference(Videobridge videobridge,
this.id = Objects.requireNonNull(id, "id");
this.conferenceName = conferenceName;
this.colibri2Handler = new Colibri2ConferenceHandler(this, logger);
colibriQueue = new PacketQueue<>(
Integer.MAX_VALUE,
true,
"colibri-queue",
colibriQueue = new ColibriQueue(
request ->
{
try
Expand Down Expand Up @@ -256,12 +253,10 @@ public Conference(Videobridge videobridge,
e.getMessage()));
}
return true;
},
TaskPools.IO_POOL,
Clock.systemUTC(), // TODO: using the Videobridge clock breaks tests somehow
/* Allow running tasks to complete (so we can close the queue from within the task. */
false
);
}
)
{
};

speechActivity = new ConferenceSpeechActivity(new SpeechActivityListener());
updateLastNEndpointsFuture = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(() -> {
Expand Down
Expand Up @@ -16,10 +16,13 @@
package org.jitsi.videobridge

import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.BridgeChannelMessage.Companion.parse
import org.jitsi.videobridge.message.MessageHandler
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.json.simple.JSONObject
import java.io.IOException
Expand All @@ -31,7 +34,7 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH

abstract val isConnected: Boolean

private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue(
private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue<MessageAndSource>(
50,
true,
INCOMING_MESSAGE_QUEUE_ID,
Expand All @@ -47,7 +50,7 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH
},
TaskPools.IO_POOL,
Clock.systemUTC()
)
).apply { setErrorHandler(queueErrorCounter) }

/**
* Fires the message transport ready event for the associated endpoint.
Expand Down Expand Up @@ -97,5 +100,23 @@ abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageH

companion object {
const val INCOMING_MESSAGE_QUEUE_ID = "bridge-channel-message-incoming-queue"
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"endpoint_receive_message_queue_dropped_packets",
"Number of packets dropped out of the Endpoint receive message queue."
)
private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"endpoint_receive_message_queue_exceptions",
"Number of exceptions from the Endpoint receive message queue."
)
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}
}
}
68 changes: 68 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/ColibriQueue.kt
@@ -0,0 +1,68 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge

import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.videobridge.xmpp.XmppConnection
import java.time.Clock
import kotlin.Int.Companion.MAX_VALUE

abstract class ColibriQueue(packetHandler: PacketHandler<XmppConnection.ColibriRequest>) :
PacketQueue<XmppConnection.ColibriRequest>(
MAX_VALUE,
true,
QUEUE_NAME,
packetHandler,
TaskPools.IO_POOL,
// TODO: using the Videobridge clock breaks tests somehow
Clock.systemUTC(),
// Allow running tasks to complete (so we can close the queue from within the task).
false,
) {
init {
setErrorHandler(queueErrorCounter)
}

companion object {
val QUEUE_NAME = "colibri-queue"

val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"colibri_queue_dropped_packets",
"Number of packets dropped out of the Colibri queue."
)

val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"colibri_queue_exceptions",
"Number of exceptions from the Colibri queue."
)

/** Count the number of dropped packets and exceptions. */
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}
}
}
27 changes: 23 additions & 4 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Expand Up @@ -66,7 +66,9 @@ import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.ForwardedSourcesMessage
import org.jitsi.videobridge.message.ReceiverVideoConstraintsMessage
import org.jitsi.videobridge.message.SenderSourceConstraintsMessage
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.relay.AudioSourceDesc
import org.jitsi.videobridge.relay.RelayedEndpoint
import org.jitsi.videobridge.rest.root.debug.EndpointDebugFeatures
Expand Down Expand Up @@ -1109,11 +1111,28 @@ class Endpoint @JvmOverloads constructor(
*/
private const val OPEN_DATA_CHANNEL_LOCALLY = false

/**
* Count the number of dropped packets and exceptions.
*/
private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"srtp_send_queue_dropped_packets",
"Number of packets dropped out of the Endpoint SRTP send queue."
)

private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
"srtp_send_queue_exceptions",
"Number of exceptions from the Endpoint SRTP send queue."
)

/** Count the number of dropped packets and exceptions. */
@JvmField
val queueErrorCounter = CountingErrorHandler()
val queueErrorCounter = object : CountingErrorHandler() {
override fun packetDropped() = super.packetDropped().also {
droppedPacketsMetric.inc()
QueueMetrics.droppedPackets.inc()
}
override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
exceptionsMetric.inc()
QueueMetrics.exceptions.inc()
}
}

/**
* The executor which runs bandwidth probing.
Expand Down
106 changes: 106 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/metrics/JvmMetrics.kt
@@ -0,0 +1,106 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.metrics

import com.sun.management.UnixOperatingSystemMXBean
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.config
import org.jitsi.utils.logging2.createLogger
import java.lang.management.ManagementFactory
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer.Companion.instance as metricsContainer

class JvmMetrics private constructor() {
val logger = createLogger()

private val gcType = ManagementFactory.getGarbageCollectorMXBeans().firstOrNull()?.name.let {
when {
it?.contains("shenandoah", ignoreCase = true) == true -> GcType.Shenandoah
it?.contains("zgc", ignoreCase = true) == true -> GcType.Zgc
it?.contains("g1", ignoreCase = true) == true -> GcType.G1
else -> GcType.Other
}
}.also {
logger.info("Detected GC type $it")
}

fun update() {
threadCount.set(ManagementFactory.getThreadMXBean().threadCount.toLong())
gcCount.set(
ManagementFactory.getGarbageCollectorMXBeans().sumOf { it.collectionCount }
)
gcTime.set(
ManagementFactory.getGarbageCollectorMXBeans().sumOf { it.collectionTime }
)
(ManagementFactory.getOperatingSystemMXBean() as? UnixOperatingSystemMXBean)?.let {
openFdCount.set(it.openFileDescriptorCount)
}
if (gcType != GcType.Other) {
ManagementFactory.getMemoryPoolMXBeans().find { it.name == gcType.memoryPoolName }?.let {
heapUsed.set(it.usage.used)
heapCommitted.set(it.usage.committed)
}
}
}

val threadCount = metricsContainer.registerLongGauge(
"thread_count",
"Current number of JVM threads."
)

private val gcCount = metricsContainer.registerLongGauge(
"jvm_gc_count",
"Garbage collection count."
)

private val gcTime = metricsContainer.registerLongGauge(
"jvm_gc_time",
"Garbage collection time."
)

private val heapCommitted = metricsContainer.registerLongGauge(
"jvm_heap_committed",
"Capacity of the main memory pool for the heap (GC type specific)."
)

private val heapUsed = metricsContainer.registerLongGauge(
"jvm_heap_used",
"Usage of the main memory pool for the heap (GC type specific)."
)

private val openFdCount = metricsContainer.registerLongGauge(
"jvm_open_fd_count",
"Number of open file descriptors."
)

private enum class GcType(
/** The name of the memory pool we're interested with this type of GC */
val memoryPoolName: String?
) {
G1("G1 Old Gen"),
Zgc("ZHeap"),
Shenandoah("Shenandoah"),
Other(null)
}

companion object {
val enable: Boolean by config {
"videobridge.stats.jvm.enabled".from(JitsiConfig.newConfig)
}

val INSTANCE = if (enable) JvmMetrics() else null
fun update() = INSTANCE?.update()
}
}
7 changes: 6 additions & 1 deletion jvb/src/main/kotlin/org/jitsi/videobridge/metrics/Metrics.kt
Expand Up @@ -40,7 +40,12 @@ object Metrics {
val lock: Any
get() = metricsUpdater

fun start() = metricsUpdater.addUpdateTask { ThreadsMetric.update() }
fun start() {
if (JvmMetrics.enable) {
metricsUpdater.addUpdateTask { JvmMetrics.update() }
}
QueueMetrics.init()
}
fun stop() {
metricsUpdater.stop()
executor.shutdown()
Expand Down

0 comments on commit acf024b

Please sign in to comment.