Skip to content

Commit

Permalink
Add distribution metric to semantic-core and ffwd-reporter (#79)
Browse files Browse the repository at this point in the history
* Add distribution metric to semantic-core and ffwd-reporter

* Update javadoc

* Fix build failure

* Switch to ffw-java-client version 0.22

* Address PR comments:
  1. Remove SemanticMetricRegistryListenerV2
  2. Added Default methods to SemanticMetricRegistryListener
  3. We understand that there is a chance for SemanticMetricRegistry to call a no op methods.
     This can lead to confusion hopefully users of this lib will look at the source code or documentation.

* Change the name of Distribution Implementation and add Threadsafety

* Add javadoc  to Distribution

* Revert SemanticMetricRegistryAdapter changes and remove MetricRegistryListener

* Addresse PR's comments: remove code re-ordering and rename SemanticMetricRegistry.getDistribution

* Revert re-ordering in FastForwardReporter

* Additional info regarding Distribution output

* Update copyright licence agremment header
  • Loading branch information
ao2017 committed Sep 9, 2020
1 parent 2533992 commit 6244f29
Show file tree
Hide file tree
Showing 15 changed files with 379 additions and 29 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
<artifactId>semantic-metrics-api</artifactId>
</dependency>

<dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
Expand Down
66 changes: 66 additions & 0 deletions core/src/main/java/com/spotify/metrics/core/Distribution.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2016 - 2020 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.metrics.core;

import com.codahale.metrics.Counting;
import com.codahale.metrics.Metric;

import java.nio.ByteBuffer;

/**
* {@link Distribution} is a simple interface that allows users to record measurements
* to compute rank statistics on data distribution not just local source.
*
* <p>Every implementation should produce a serialized data sketch in a byteBuffer
* as this metric point value. For more information on how this is handled upstream,
* Please refer to
* <a href="https://github.com/spotify/ffwd-client-java/blob/master/ffwd-
* client/src/main/java/com/spotify/ffwd/FastForward.java#L110"/> FastForward Java client</a>
*
* <p>Unlike traditional histogram, {@link Distribution} doesn't require
* predefined percentile value. Data recorded
* can be used upstream to compute any percentile.
*
* <p>This Distribution doesn't require any binning configuration.
* Just get an instance through SemanticMetricBuilder and record data.
*
* <p> {@link Distribution} is a good choice if you care about percentile accuracy in
* a distributed environment and you want to rely on P99 to set SLO.
*/
public interface Distribution extends Metric, Counting {

/**
* Record value from Min.Double to Max.Double.
* @param val
*/
void record(double val);

/**
* Return distribution point value and flush.
* When this method is called every internal state
* is reset and a new recording starts.
*
* @return
*/
ByteBuffer getValueAndFlush();

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@
* A quick and easy way of capturing the notion of default metrics.
*/
public interface SemanticMetricBuilder<T extends Metric> {

SemanticMetricBuilder<Distribution> DISTRIBUTION = new SemanticMetricBuilder<Distribution>() {
@Override
public Distribution newMetric() {
return new SemanticMetricDistribution();
}

@Override
public boolean isInstance(final Metric metric) {
return Distribution.class.isInstance(metric);
}
};

SemanticMetricBuilder<Counter> COUNTERS = new SemanticMetricBuilder<Counter>() {
@Override
public Counter newMetric() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (C) 2016 - 2020 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.metrics.core;


import com.google.common.annotations.VisibleForTesting;
import com.tdunning.math.stats.TDigest;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

/**
* Semantic Metric implementation of {@link Distribution}.
* This implementation ensures threadsafety for recording data
* and retrieving distribution point value.
*
* {@link SemanticMetricDistribution} is backed by Ted Dunning T-digest implementation.
*
* <p>{@link TDigest} "sketch" are generated by clustering real-valued samples and
* retaining the mean and number of samples for each cluster.
* The generated data structure is mergeable and produces fairly
* accurate percentile even for long-tail distribution.
*
* <p> We are using T-digest compression level of 100.
* With that level of compression, our own benchmark using Pareto distribution
* dataset, shows P99 error rate is less than 2% .
* From P99.9 to P99.999 the error rate is slightly higher than 2%.
*
*/
public final class SemanticMetricDistribution implements Distribution {

private static final int COMPRESSION_DEFAULT_LEVEL = 100;
private final AtomicReference<TDigest> distRef;

SemanticMetricDistribution() {
this.distRef = new AtomicReference<>(create());
}

@Override
public synchronized void record(double val) {
distRef.get().add(val);
}

@Override
public java.nio.ByteBuffer getValueAndFlush() {
TDigest curVal;
synchronized (this) {
curVal = distRef.getAndSet(create()); // reset tdigest
}
ByteBuffer byteBuffer = ByteBuffer.allocate(curVal.smallByteSize());
curVal.asSmallBytes(byteBuffer);
return byteBuffer;
}


@Override
public long getCount() {
return distRef.get().size();
}

@VisibleForTesting
TDigest tDigest() {
return distRef.get();
}

private TDigest create() {
return TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -48,7 +50,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* A registry of metric instances.
Expand Down Expand Up @@ -77,7 +78,7 @@ public SemanticMetricRegistry() {

public SemanticMetricRegistry(final Supplier<Reservoir> defaultReservoirSupplier) {
this.metrics = new ConcurrentHashMap<MetricId, Metric>();
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListener>();
this.listeners = new CopyOnWriteArrayList<>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

Expand All @@ -86,7 +87,7 @@ public SemanticMetricRegistry(
final Supplier<Reservoir> defaultReservoirSupplier
) {
this.metrics = metrics;
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListener>();
this.listeners = new CopyOnWriteArrayList<>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

Expand Down Expand Up @@ -134,7 +135,7 @@ public <T extends Metric> T register(@NonNull final MetricId name, @NonNull fina
/**
* Given a metric set, registers them.
*
* @param metrics a set of metrics
* @param metrics a set of metrics
* @throws IllegalArgumentException if any of the names are already registered
*/
public void registerAll(final SemanticMetricSet metrics) throws IllegalArgumentException {
Expand Down Expand Up @@ -162,14 +163,21 @@ public Histogram histogram(final MetricId name) {
SemanticMetricBuilderFactory.histogramWithReservoir(defaultReservoirSupplier));
}



/**
* Creates a new {@link Distribution} or return an existing one registers under the given name.
*
* @param name the name of the metric
* @return a new {@link Distribution}
*/
public Distribution distribution(final MetricId name) {
return getOrAdd(name, SemanticMetricBuilder.DISTRIBUTION);
}

/**
* Creates a new {@link Histogram} with a custom {@link Reservoir} and registers it under
* the given name.
*
* @param name the name of the metric
* @param name the name of the metric
* @param reservoirSupplier a {@link Supplier} that returns an instance of {@link Reservoir}
* @return a new {@link Histogram}
*/
Expand Down Expand Up @@ -204,7 +212,7 @@ public Timer timer(final MetricId name) {
* Creates a new {@link Timer} with a custom {@link Reservoir} and registers it under the given
* name.
*
* @param name the name of the metric
* @param name the name of the metric
* @param reservoirSupplier a {@link Supplier} that returns an instance of {@link Reservoir}
* @return a new {@link Timer}
*/
Expand Down Expand Up @@ -293,7 +301,7 @@ public SortedMap<MetricId, Gauge> getGauges() {
/**
* Returns a map of all the gauges in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the gauges in the registry
*/
@SuppressWarnings("rawtypes")
Expand All @@ -314,7 +322,7 @@ public SortedMap<MetricId, Counter> getCounters() {
* Returns a map of all the counters in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the counters in the registry
*/
public SortedMap<MetricId, Counter> getCounters(final SemanticMetricFilter filter) {
Expand All @@ -334,7 +342,7 @@ public SortedMap<MetricId, Histogram> getHistograms() {
* Returns a map of all the histograms in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the histograms in the registry
*/
public SortedMap<MetricId, Histogram> getHistograms(final SemanticMetricFilter filter) {
Expand All @@ -353,7 +361,7 @@ public SortedMap<MetricId, Meter> getMeters() {
/**
* Returns a map of all the meters in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the meters in the registry
*/
public SortedMap<MetricId, Meter> getMeters(final SemanticMetricFilter filter) {
Expand All @@ -372,7 +380,7 @@ public SortedMap<MetricId, Timer> getTimers() {
/**
* Returns a map of all the timers in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the timers in the registry
*/
public SortedMap<MetricId, Timer> getTimers(final SemanticMetricFilter filter) {
Expand All @@ -383,23 +391,35 @@ public SortedMap<MetricId, Timer> getTimers(final SemanticMetricFilter filter) {
* Returns a map of all the deriving meters in the registry and their names which match the
* given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the deriving meters in the registry
*/
public SortedMap<MetricId, DerivingMeter> getDerivingMeters(final SemanticMetricFilter filter) {
return getMetrics(DerivingMeter.class, filter);
}

/**
* Atomically adds the given metric to the set of metrics.
* Returns a map of all the distributions metrics in the registry and their
* names which match the given filter.
*
* @param filter the metric filter to match
* @return a sorted Map of distribution metrics
*/
public SortedMap<MetricId, Distribution> getDistributions(final SemanticMetricFilter filter) {
return getMetrics(Distribution.class, filter);
}


/**
* Atomically adds the given metric to the set of metrics.
* <p>
* A side effect of this method is the calling of {@link #onMetricAdded(MetricId, Metric)} if
* a new
* metric is added.
*
* <p>
* This method should only be used on non-{@code SemanticMetricSet} metrics.
*
* @param name Name of the metric to atomically add if absent.
* @param name Name of the metric to atomically add if absent.
* @param metric The metric to atomically add if absent.
* @return {@code null} if the metric was added, or the previously mapped metric.
*/
Expand Down Expand Up @@ -493,11 +513,14 @@ private void notifyListenerOfAddedMetric(
listener.onTimerAdded(name, (Timer) metric);
} else if (metric instanceof DerivingMeter) {
listener.onDerivingMeterAdded(name, (DerivingMeter) metric);
} else if (metric instanceof Distribution) {
listener.onDistributionAdded(name, (Distribution) metric);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}


protected void onMetricRemoved(final MetricId name, final Metric metric) {
for (final SemanticMetricRegistryListener listener : listeners) {
notifyListenerOfRemovedMetric(name, metric, listener);
Expand All @@ -519,6 +542,8 @@ private void notifyListenerOfRemovedMetric(
listener.onTimerRemoved(name);
} else if (metric instanceof DerivingMeter) {
listener.onDerivingMeterRemoved(name);
} else if (metric instanceof Distribution) {
listener.onDistributionRemoved(name);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void onTimerRemoved(String name) {
tryRemove(name);
}


private <T extends Metric> void tryRegister(String name, T metric) {
try {
MetricId metricId = metricIdBuilder.buildMetricId(name);
Expand Down
Loading

0 comments on commit 6244f29

Please sign in to comment.