Skip to content

Commit

Permalink
ISPN-15032 Use site and channel names as tag for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and ryanemerson committed Aug 28, 2023
1 parent ee5dda9 commit 10c7387
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -93,9 +95,12 @@ public static final class AttributeMetadata {
private final boolean is;
private final Function<?, ?> getterFunction; // optional
private final BiConsumer<?, ?> setterFunction; // optional
private final Map<String, String> tags; // optional
private final boolean clusterWide;

public AttributeMetadata(String name, String description, boolean writable, boolean useSetter, String type,
boolean is, Function<?, ?> getterFunction, BiConsumer<?, ?> setterFunction) {
boolean is, Function<?, ?> getterFunction, BiConsumer<?, ?> setterFunction, boolean clusterWide,
Map<String, String> tags) {
this.name = name;
this.description = description;
this.writable = writable;
Expand All @@ -104,6 +109,18 @@ public AttributeMetadata(String name, String description, boolean writable, bool
this.is = is;
this.getterFunction = getterFunction;
this.setterFunction = setterFunction;
this.clusterWide = clusterWide;
this.tags = tags == null ? Collections.emptyMap() : tags;
}

public AttributeMetadata(String name, String description, boolean writable, boolean useSetter, String type,
boolean is, Function<?, ?> getterFunction, BiConsumer<?, ?> setterFunction, boolean clusterWide) {
this(name, description, writable, useSetter, type, is, getterFunction, setterFunction, clusterWide, null);
}

public AttributeMetadata(String name, String description, boolean writable, boolean useSetter, String type,
boolean is, Function<?, ?> getterFunction, BiConsumer<?, ?> setterFunction) {
this(name, description, writable, useSetter, type, is, getterFunction, setterFunction, false, null);
}

public String getName() {
Expand Down Expand Up @@ -144,6 +161,14 @@ public Consumer<?> setter(Object instance) {
return (v) -> ((BiConsumer<Object, Object>) setterFunction).accept(instance, v);
}

public boolean isClusterWide() {
return clusterWide;
}

public Map<String, String> tags() {
return tags;
}

@Override
public String toString() {
return "AttributeMetadata{" +
Expand All @@ -152,8 +177,10 @@ public String toString() {
", writable=" + writable +
", type='" + type + '\'' +
", is=" + is +
", clusterWide=" + clusterWide +
", getterFunction=" + getterFunction +
", setterFunction=" + setterFunction +
", tags=" + tags +
'}';
}
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/infinispan/metrics/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ public interface Constants {
String CACHE_MANAGER_TAG_NAME = "cache_manager";

String CACHE_TAG_NAME = "cache";

String JGROUPS_PREFIX = "jgroups_";

String JGROUPS_CLUSTER_TAG_NAME = "cluster";

String SITE_TAG_NAME = "site";
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void processComponents() {
Set<Object> ids = registerMetrics(instance, beanMetadata.getJmxObjectName(), beanMetadata.getAttributes(), null, component.getName(), null);
metricIds.addAll(ids);
if (instance instanceof CustomMetricsSupplier) {
metricIds.addAll(registerMetrics(instance, beanMetadata.getJmxObjectName(), ((CustomMetricsSupplier) instance).getCustomMetrics(), null, component.getName(), null));
metricIds.addAll(registerMetrics(instance, beanMetadata.getJmxObjectName(), ((CustomMetricsSupplier) instance).getCustomMetrics(globalConfig.metrics().namesAsTags()), null, component.getName(), null));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.infinispan.factories.impl.MBeanMetadata.AttributeMetadata;

import java.util.Collection;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -15,17 +16,24 @@
* The main goal is to allow some dynamic metrics (i.e. metrics that depends on some configuration). As an example, the
* Cross-Site response time for each configured site.
* <p>
* {@link MetricUtils#createGauge(String, String, Function)} or {@link MetricUtils#createTimer(String, String,
* BiConsumer)} can be used to create this custom metrics.
* {@link MetricUtils#createGauge(String, String, Function, Map)} or
* {@link MetricUtils#createTimer(String, String, BiConsumer, java.util.Map)} can be used to create this custom
* metrics.
*
* @author Pedro Ruivo
* @since 13.0
*/
public interface CustomMetricsSupplier {

/**
* Extra metrics to be registered.
* <p>
* These can be dynamic metrics that cannot use {@link ManagedAttribute} annotation. Extra tags can be set in
* {@link AttributeMetadata}.
*
* @param nameAsTag True if the cache name or any other identifier must be set as Tags instead of metric name.
* @return A list of {@link AttributeMetadata} to be registered.
*/
Collection<AttributeMetadata> getCustomMetrics();
Collection<AttributeMetadata> getCustomMetrics(boolean nameAsTag);

}
19 changes: 12 additions & 7 deletions core/src/main/java/org/infinispan/metrics/impl/MetricUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.infinispan.factories.impl.MBeanMetadata.AttributeMetadata;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -22,29 +23,33 @@ private MetricUtils() {
/**
* Creates a Gauge metric.
*
* @param <C> The instance type.
* @param name The metric name.
* @param description The metric description.
* @param getterFunction The {@link Function} invoked to return the metric value
* @param <C> The instance type.
* @param getterFunction The {@link Function} invoked to return the metric value.
* @param tags The metric tags if supported.
* @return The {@link AttributeMetadata} to be registered.
*/
public static <C> AttributeMetadata createGauge(String name, String description,
Function<C, Number> getterFunction) {
return new AttributeMetadata(name, description, false, false, null, false, getterFunction, null);
Function<C, Number> getterFunction,
Map<String, String> tags) {
return new AttributeMetadata(name, description, false, false, null, false, getterFunction, null, false, tags);
}

/**
* Creates a Timer metric.
*
* @param <C> The instance type.
* @param name The metric name.
* @param description The metrics description.
* @param setterFunction The {@link BiConsumer} invoked with the {@link TimerTracker} instance to update.
* @param <C> The instance type.
* @param tags The metric tags if supported.
* @return The {@link AttributeMetadata} to be registered.
*/
public static <C> AttributeMetadata createTimer(String name, String description,
BiConsumer<C, TimerTracker> setterFunction) {
return new AttributeMetadata(name, description, false, false, null, false, null, setterFunction);
BiConsumer<C, TimerTracker> setterFunction,
Map<String, String> tags) {
return new AttributeMetadata(name, description, false, false, null, false, null, setterFunction, false, tags);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.infinispan.commons.stat.TimerTracker;
import org.infinispan.configuration.global.GlobalConfiguration;
Expand Down Expand Up @@ -75,6 +77,10 @@ protected void start() {
new BaseAdditionalMetrics().bindTo(registry);
new VendorAdditionalMetrics().bindTo(registry);

if (globalConfig.metrics().namesAsTags()) {
cacheManagerTag = Tag.of(CACHE_MANAGER_TAG_NAME, globalConfig.cacheManagerName());
}

Transport transport = transportRef.running();
String nodeName = transport != null ? transport.getAddress().toString() : globalConfig.transport().nodeName();
if (nodeName == null) {
Expand All @@ -83,10 +89,6 @@ protected void start() {
//throw new CacheConfigurationException("Node name must always be specified in configuration if metrics are enabled.");
}
nodeTag = Tag.of(NODE_TAG_NAME, nodeName);

if (globalConfig.metrics().namesAsTags()) {
cacheManagerTag = Tag.of(CACHE_MANAGER_TAG_NAME, globalConfig.cacheManagerName());
}
}

@Stop
Expand Down Expand Up @@ -132,11 +134,17 @@ public Set<Object> registerMetrics(Object instance, Collection<MBeanMetadata.Att
return registerMetrics(instance, attributes, namePrefix, asTag(CACHE_TAG_NAME, cacheName), asTag(NODE_TAG_NAME, nodeName));
}

public Set<Object> registerJGroupsMetrics(Object instance, Collection<MBeanMetadata.AttributeMetadata> attributes, String protocol, String clusterName, String nodeName) {
String prefix = globalConfig.metrics().namesAsTags() ?
JGROUPS_PREFIX + protocol.toLowerCase() + '_' :
JGROUPS_PREFIX + clusterName + '_' + protocol.toLowerCase() + '_';
return registerMetrics(instance, attributes, prefix, asTag(NODE_TAG_NAME, nodeName), asTag(JGROUPS_CLUSTER_TAG_NAME, clusterName));
}

private Set<Object> registerMetrics(Object instance, Collection<MBeanMetadata.AttributeMetadata> attributes, String namePrefix, Tag ...initialTags) {
Set<Object> metricIds = new HashSet<>(attributes.size());

GlobalMetricsConfiguration metricsCfg = globalConfig.metrics();
List<Tag> tags = prepareTags(initialTags);

for (MBeanMetadata.AttributeMetadata attr : attributes) {
Supplier<Number> getter = (Supplier<Number>) attr.getter(instance);
Expand All @@ -148,7 +156,7 @@ private Set<Object> registerMetrics(Object instance, Collection<MBeanMetadata.At
if (getter != null) {
if (metricsCfg.gauges()) {
Gauge gauge = Gauge.builder(metricName, getter)
.tags(tags)
.tags(prepareTags(attr, initialTags))
.strongReference(true)
.description(attr.getDescription())
.register(registry);
Expand All @@ -163,7 +171,7 @@ private Set<Object> registerMetrics(Object instance, Collection<MBeanMetadata.At
} else {
if (metricsCfg.histograms()) {
Timer timer = Timer.builder(metricName)
.tags(tags)
.tags(prepareTags(attr, initialTags))
.description(attr.getDescription())
.register(registry);

Expand All @@ -187,14 +195,21 @@ private Set<Object> registerMetrics(Object instance, Collection<MBeanMetadata.At
return metricIds;
}

private List<Tag> prepareTags(Tag ...tags) {
List<Tag> allTags = Arrays.stream(tags).filter(Objects::nonNull).collect(Collectors.toList());
private static Tag mapEntryToTag(Map.Entry<String, String> entry) {
return asTag(entry.getKey(), entry.getValue());
}

private List<Tag> prepareTags(MBeanMetadata.AttributeMetadata attr, Tag ...tags) {
Stream<Tag> tagStream = attr.tags().entrySet().stream().map(MetricsCollector::mapEntryToTag);
List<Tag> allTags = Stream.concat(Arrays.stream(tags), tagStream)
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (cacheManagerTag != null) allTags.add(cacheManagerTag);

return allTags;
}

private Tag asTag(String key, String value) {
private static Tag asTag(String key, String value) {
return value != null
? Tag.of(key, value)
: null;
Expand Down
63 changes: 43 additions & 20 deletions core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.jmx.annotations.Parameter;
import org.infinispan.jmx.annotations.Units;
import org.infinispan.metrics.Constants;
import org.infinispan.metrics.impl.CustomMetricsSupplier;
import org.infinispan.metrics.impl.MetricUtils;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
Expand Down Expand Up @@ -101,28 +102,50 @@ public class RpcManagerImpl implements RpcManager, JmxStatisticsExposer, CustomM
private volatile RpcOptions syncRpcOptions;

@Override
public Collection<AttributeMetadata> getCustomMetrics() {
public Collection<AttributeMetadata> getCustomMetrics(boolean nameAsTag) {
List<AttributeMetadata> attributes = new LinkedList<>();
for (String site : xSiteMetricsCollector.sites()) {
String lSite = site.toLowerCase();
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("AverageXSiteReplicationTimeTo_" + lSite,
"Average Cross-Site replication time to " + site,
rpcManager -> rpcManager.getAverageXSiteReplicationTimeTo(site)));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("MinimumXSiteReplicationTimeTo_" + lSite,
"Minimum Cross-Site replication time to " + site,
rpcManager -> rpcManager.getMinimumXSiteReplicationTimeTo(site)));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("MaximumXSiteReplicationTimeTo_" + lSite,
"Maximum Cross-Site replication time to " + site,
rpcManager -> rpcManager.getMaximumXSiteReplicationTimeTo(site)));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("NumberXSiteRequestsSentTo_" + lSite,
"Number of Cross-Site request sent to " + site,
rpcManager -> rpcManager.getNumberXSiteRequestsSentTo(site)));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("NumberXSiteRequestsReceivedFrom_" + lSite,
"Number of Cross-Site request received from " + site,
rpcManager -> rpcManager.getNumberXSiteRequestsReceivedFrom(site)));
attributes.add(MetricUtils.<RpcManagerImpl>createTimer("ReplicationTimesTo_" + lSite,
"Replication times to " + site,
(rpcManager, timer) -> rpcManager.xSiteMetricsCollector.registerTimer(site, timer)));
Map<String, String> tags = Map.of(Constants.SITE_TAG_NAME, site);
if (nameAsTag) {
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("AverageXSiteReplicationTime",
"Average Cross-Site replication time to " + site,
rpcManager -> rpcManager.getAverageXSiteReplicationTimeTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("MinimumXSiteReplicationTime",
"Minimum Cross-Site replication time to " + site,
rpcManager -> rpcManager.getMinimumXSiteReplicationTimeTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("MaximumXSiteReplicationTime",
"Maximum Cross-Site replication time to " + site,
rpcManager -> rpcManager.getMaximumXSiteReplicationTimeTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("NumberXSiteRequestsSent",
"Number of Cross-Site request sent to " + site,
rpcManager -> rpcManager.getNumberXSiteRequestsSentTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("NumberXSiteRequestsReceived",
"Number of Cross-Site request received from " + site,
rpcManager -> rpcManager.getNumberXSiteRequestsReceivedFrom(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createTimer("ReplicationTimes",
"Replication times to " + site,
(rpcManager, timer) -> rpcManager.xSiteMetricsCollector.registerTimer(site, timer), tags));
} else {
String lSite = site.toLowerCase();
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("AverageXSiteReplicationTimeTo_" + lSite,
"Average Cross-Site replication time to " + site,
rpcManager -> rpcManager.getAverageXSiteReplicationTimeTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("MinimumXSiteReplicationTimeTo_" + lSite,
"Minimum Cross-Site replication time to " + site,
rpcManager -> rpcManager.getMinimumXSiteReplicationTimeTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("MaximumXSiteReplicationTimeTo_" + lSite,
"Maximum Cross-Site replication time to " + site,
rpcManager -> rpcManager.getMaximumXSiteReplicationTimeTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("NumberXSiteRequestsSentTo_" + lSite,
"Number of Cross-Site request sent to " + site,
rpcManager -> rpcManager.getNumberXSiteRequestsSentTo(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createGauge("NumberXSiteRequestsReceivedFrom_" + lSite,
"Number of Cross-Site request received from " + site,
rpcManager -> rpcManager.getNumberXSiteRequestsReceivedFrom(site), tags));
attributes.add(MetricUtils.<RpcManagerImpl>createTimer("ReplicationTimesTo_" + lSite,
"Replication times to " + site,
(rpcManager, timer) -> rpcManager.xSiteMetricsCollector.registerTimer(site, timer), tags));
}
}
return attributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public class JGroupsTransport implements Transport, ChannelListener {
public static final String CHANNEL_LOOKUP = "channelLookup";
public static final String CHANNEL_CONFIGURATOR = "channelConfigurator";
public static final String SOCKET_FACTORY = "socketFactory";
private static final String METRICS_PREFIX = "jgroups_";
public static final short REQUEST_FLAGS_UNORDERED =
(short) (Message.Flag.OOB.value() | Message.Flag.NO_TOTAL_ORDER.value());
public static final short REQUEST_FLAGS_PER_SENDER = Message.Flag.NO_TOTAL_ORDER.value();
Expand Down Expand Up @@ -1597,19 +1596,14 @@ public void channelConnected(JChannel channel) {
if (isMetricsEnabled()) {
MetricsCollector mc = metricsCollector.wired();
clusters.computeIfAbsent(channel, c -> {
String name = c.clusterName();
String nodeName;
org.jgroups.Address addr = c.getAddress();
if (addr != null) {
nodeName = addr.toString();
} else {
nodeName = c.getName();
}
String clusterName = c.clusterName();
String nodeName= addr != null ? addr.toString() : c.getName();
Set<Object> metrics = new HashSet<>();
for (Protocol protocol : c.getProtocolStack().getProtocols()) {
Collection<MBeanMetadata.AttributeMetadata> attributes = JGroupsMetricsMetadata.PROTOCOL_METADATA.get(protocol.getClass());
if (attributes != null && !attributes.isEmpty()) {
metrics.addAll(mc.registerMetrics(protocol, attributes, METRICS_PREFIX + name + '_' + protocol.getName().toLowerCase() + '_', null, nodeName));
metrics.addAll(mc.registerJGroupsMetrics(protocol, attributes, protocol.getName(), clusterName, nodeName));
}
}
return metrics;
Expand Down

0 comments on commit 10c7387

Please sign in to comment.