Skip to content

Commit

Permalink
allegro#1748 Add monitoring for Zookeeper cache executor service
Browse files Browse the repository at this point in the history
  • Loading branch information
michal494 committed Feb 28, 2024
1 parent 642a2a6 commit a5ce600
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
package pl.allegro.tech.hermes.common.di.factories;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import pl.allegro.tech.hermes.common.cache.queue.LinkedHashSetBlockingQueue;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.ModelAwareZookeeperNotifyingCache;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ModelAwareZookeeperNotifyingCacheFactory {

private final CuratorFramework curator;

private final MetricsFacade metricsFacade;

private final ZookeeperParameters zookeeperParameters;

public ModelAwareZookeeperNotifyingCacheFactory(CuratorFramework curator, ZookeeperParameters zookeeperParameters) {
public ModelAwareZookeeperNotifyingCacheFactory(CuratorFramework curator, MetricsFacade metricaFacade, ZookeeperParameters zookeeperParameters) {
this.curator = curator;
this.metricsFacade = metricaFacade;
this.zookeeperParameters = zookeeperParameters;
}

public ModelAwareZookeeperNotifyingCache provide() {
String rootPath = zookeeperParameters.getRoot();
ExecutorService executor = createExecutor(rootPath, zookeeperParameters.getProcessingThreadPoolSize());
ModelAwareZookeeperNotifyingCache cache = new ModelAwareZookeeperNotifyingCache(
curator, rootPath, zookeeperParameters.getProcessingThreadPoolSize()
curator, executor, rootPath
);
try {
cache.start();
Expand All @@ -26,4 +38,11 @@ public ModelAwareZookeeperNotifyingCache provide() {
}
return cache;
}

private ExecutorService createExecutor(String rootPath, int processingThreadPoolSize) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(rootPath + "-zk-cache-%d").build();
ExecutorService executor = new ThreadPoolExecutor(1, processingThreadPoolSize,
Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedHashSetBlockingQueue<>(), threadFactory);
return metricsFacade.executor().monitor(executor, rootPath + "zk-cache");
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.ToDoubleFunction;

public class ExecutorMetrics {
private final HermesMetrics hermesMetrics;
private final MeterRegistry meterRegistry;

public ExecutorMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
this.hermesMetrics = hermesMetrics;
public ExecutorMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

Expand All @@ -24,38 +20,4 @@ public ExecutorService monitor(ExecutorService executorService, String executorN
public ScheduledExecutorService monitor(ScheduledExecutorService scheduledExecutorService, String executorName) {
return ExecutorServiceMetrics.monitor(meterRegistry, scheduledExecutorService, executorName);
}

public <T> void registerThreadPoolCapacity(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolCapacity(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.capacity", executorName, stateObj, f);
}

public <T> void registerThreadPoolActiveThreads(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolActiveThreads(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.active-threads", executorName, stateObj, f);
}

public <T> void registerThreadPoolUtilization(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolUtilization(executorName, () -> f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.utilization", executorName, stateObj, f);
}

public <T> void registerThreadPoolTaskQueueCapacity(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolTaskQueueCapacity(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.task-queue-capacity", executorName, stateObj, f);
}

public <T> void registerThreadPoolTaskQueued(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolTaskQueued(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.task-queue-size", executorName, stateObj, f);
}

public <T> void registerThreadPoolTaskQueueUtilization(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolTaskQueueUtilization(executorName, () -> f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.task-queue-utilization", executorName, stateObj, f);
}

private <T> void registerMicrometerGauge(String name, String executorName, T stateObj, ToDoubleFunction<T> f) {
meterRegistry.gauge(name, Tags.of("executor_name", executorName), stateObj, f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) {
this.trackerElasticSearchMetrics = new TrackerElasticSearchMetrics(hermesMetrics, meterRegistry);
this.persistentBufferMetrics = new PersistentBufferMetrics(hermesMetrics, meterRegistry);
this.producerMetrics = new ProducerMetrics(hermesMetrics, meterRegistry);
this.executorMetrics = new ExecutorMetrics(hermesMetrics, meterRegistry);
this.executorMetrics = new ExecutorMetrics(meterRegistry);
this.schemaClientMetrics = new SchemaClientMetrics(hermesMetrics, meterRegistry);
this.undeliveredMessagesMetrics = new UndeliveredMessagesMetrics(hermesMetrics, meterRegistry);
this.deserializationMetrics = new DeserializationMetrics(hermesMetrics, meterRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper.cache;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.cache.queue.LinkedHashSetBlockingQueue;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class ModelAwareZookeeperNotifyingCache {
Expand All @@ -26,15 +22,13 @@ public class ModelAwareZookeeperNotifyingCache {
private static final int SUBSCRIPTION_LEVEL = 2;

private final HierarchicalCache cache;
private final ThreadPoolExecutor executor;
private final ExecutorService executor;

public ModelAwareZookeeperNotifyingCache(CuratorFramework curator, String rootPath, int processingThreadPoolSize) {
public ModelAwareZookeeperNotifyingCache(CuratorFramework curator, ExecutorService executor, String rootPath) {
List<String> levelPrefixes = Arrays.asList(
ZookeeperPaths.GROUPS_PATH, ZookeeperPaths.TOPICS_PATH, ZookeeperPaths.SUBSCRIPTIONS_PATH
);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(rootPath + "-zk-cache-%d").build();
executor = new ThreadPoolExecutor(1, processingThreadPoolSize,
Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedHashSetBlockingQueue<>(), threadFactory);
this.executor = executor;
this.cache = new HierarchicalCache(
curator,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ public InternalNotificationsBus zookeeperInternalNotificationBus(ObjectMapper ob

@Bean(destroyMethod = "stop")
public ModelAwareZookeeperNotifyingCache modelAwareZookeeperNotifyingCache(CuratorFramework curator,
MetricsFacade metricsFacade,
ZookeeperClustersProperties zookeeperClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
ZookeeperProperties zookeeperProperties = zookeeperClustersProperties.toZookeeperProperties(datacenterNameProvider);
return new ModelAwareZookeeperNotifyingCacheFactory(curator, zookeeperProperties).provide();
return new ModelAwareZookeeperNotifyingCacheFactory(curator, metricsFacade, zookeeperProperties).provide();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private WorkloadSupervisor createConsumer(String consumerId,
workloadProperties.setMonitorScanInterval(Duration.ofSeconds(1));

ModelAwareZookeeperNotifyingCache modelAwareCache = new ModelAwareZookeeperNotifyingCacheFactory(
curator, zookeeperProperties
curator, metricsSupplier.get(), zookeeperProperties
).provide();

InternalNotificationsBus notificationsBus =
Expand Down Expand Up @@ -239,7 +239,7 @@ ConsumersRuntimeMonitor monitor(String consumerId,
Duration monitorScanInterval) {
CuratorFramework curator = consumerZookeeperConnections.get(consumerId);
ModelAwareZookeeperNotifyingCache modelAwareCache =
new ModelAwareZookeeperNotifyingCacheFactory(curator, zookeeperProperties).provide();
new ModelAwareZookeeperNotifyingCacheFactory(curator, metricsSupplier.get(), zookeeperProperties).provide();
InternalNotificationsBus notificationsBus =
new ZookeeperInternalNotificationBus(objectMapper, modelAwareCache);
SubscriptionsCache subscriptionsCache = new NotificationsBasedSubscriptionCache(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ public InternalNotificationsBus zookeeperInternalNotificationBus(ObjectMapper ob

@Bean(initMethod = "start", destroyMethod = "stop")
public ModelAwareZookeeperNotifyingCache modelAwareZookeeperNotifyingCache(CuratorFramework curator,
MetricsFacade metricsFacade,
ZookeeperClustersProperties zookeeperClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
ZookeeperProperties zookeeperProperties = zookeeperClustersProperties.toZookeeperProperties(datacenterNameProvider);
return new ModelAwareZookeeperNotifyingCacheFactory(curator, zookeeperProperties).provide();
return new ModelAwareZookeeperNotifyingCacheFactory(curator, metricsFacade, zookeeperProperties).provide();
}

@Bean
Expand Down

0 comments on commit a5ce600

Please sign in to comment.