Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-6891: Expose topic/subscription/consumer number of messages as counter. #953

Closed
sijie opened this issue May 6, 2020 · 0 comments
Closed

Comments

@sijie
Copy link
Member

sijie commented May 6, 2020

Original Issue: apache#6891


Is your feature request related to a problem? Please describe.
Topic metrics are expressed as computed rates, e.g. gauges: pulsar_rate_in pulsar_rate_out. This makes Prometheus scraping miss any change of rate in between the scrape interval.

Describe the solution you'd like
Exposing the metrics as counters instead/in addition to gauges, e.g. pulsar_count_in would allow the rate to be computed in prometheus, and would not miss change of rates within the scraping interval.

Describe alternatives you've considered
Increasing the scrape interval to <15s however it still misses spikes.
I have considered using a sidecar container that would create the counters from gauges, but it adds unwanted complexity.

@sijie sijie closed this as completed May 8, 2020
streamnativebot pushed a commit that referenced this issue Aug 27, 2020
### Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:
```
"prometheus-stats-36-1" #410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" #953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```

`prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and `PersistentDispatcherMultipleConsumers`) after locking `PersistentTopic#subscriptions`, an instance of `ConcurrentOpenHashMap`.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used in the master code, but it seems that this deadlock has not yet been resolved.
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809

### Modifications

Moved the `isConsumersExceededOnTopic()` method to check the number of connected consumers when adding a consumer to the topic from the `Dispatcher` classes to the `AbstractTopic` class. Avoid the deadlock mentioned above by calling `PersistentTopic#getNumberOfConsumers()` before locking the `PersistentSubscription` instance.
zymap pushed a commit that referenced this issue Nov 3, 2020
### Motivation

A deadlock occurred on our Pulsar 2.4.2 broker server. The cause is the following two threads:
```
"prometheus-stats-36-1" #410 prio=5 os_prio=0 tid=0x00007f4b70019800 nid=0x30ca waiting for monitor entry [0x00007f4bbe3b7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesDelayed(PersistentSubscription.java:1013)
        - waiting to lock <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$getTopicStats$8(NamespaceStatsAggregator.java:129)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$523/2109257042.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.getTopicStats(NamespaceStatsAggregator.java:122)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$0(NamespaceStatsAggregator.java:64)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$521/1017174654.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$null$1(NamespaceStatsAggregator.java:63)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$520/1098830264.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.lambda$generate$2(NamespaceStatsAggregator.java:62)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator$$Lambda$316/212211274.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.generate(NamespaceStatsAggregator.java:59)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator.generate(PrometheusMetricsGenerator.java:73)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet.lambda$doGet$0(PrometheusMetricsServlet.java:70)
        at org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet$$Lambda$315/1221766138.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

"ForkJoinPool.commonPool-worker-104" #953 daemon prio=5 os_prio=0 tid=0x00007f4dc8030800 nid=0x3b87 waiting on condition [0x00007f48f6ce1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f913d08b5c8> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215)
        at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:377)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.values(ConcurrentOpenHashMap.java:174)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNumberOfConsumers(PersistentTopic.java:1227)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.isConsumersExceededOnTopic(PersistentDispatcherMultipleConsumers.java:178)
        at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:144)
        - locked <0x00007f91120dc258> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:238)
        - locked <0x00007f913d098dd0> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$subscribe$11(PersistentTopic.java:590)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$451/1414070467.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$408/1168861154.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$406/1351396211.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:163)
```

`prometheus-stats-36-1` was trying to lock `PersistentSubscription` (and `PersistentDispatcherMultipleConsumers`) after locking `PersistentTopic#subscriptions`, an instance of `ConcurrentOpenHashMap`.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java#L122-L129

On the other hand, `ForkJoinPool.commonPool-worker-104` was trying to lock these instances in reverse order.
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L176-L237
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L127-L144
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L159-L175
https://github.com/apache/pulsar/blob/v2.4.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1212-L1214

`PersistentSubscription#getNumberOfEntriesDelayed()` is no longer used in the master code, but it seems that this deadlock has not yet been resolved.
https://github.com/apache/pulsar/blob/e06e8726847584700d9e4fc98fd56a495eb05a23/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1528-L1529
https://github.com/apache/pulsar/blob/17ae233a5d0fa364048b7c30ec90b8f7291d0d07/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L967-L1000
https://github.com/apache/pulsar/blob/6e7d1a83c3c2737610f01cb372f61e2b830a62f7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L809

### Modifications

Moved the `isConsumersExceededOnTopic()` method to check the number of connected consumers when adding a consumer to the topic from the `Dispatcher` classes to the `AbstractTopic` class. Avoid the deadlock mentioned above by calling `PersistentTopic#getNumberOfConsumers()` before locking the `PersistentSubscription` instance.

(cherry picked from commit 23dc5c7)
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

2 participants