
Add distribution metric to semantic-core and ffwd-reporter #79

Merged · 13 commits · Sep 9, 2020
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -22,6 +22,11 @@
<artifactId>semantic-metrics-api</artifactId>
</dependency>

<dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
66 changes: 66 additions & 0 deletions core/src/main/java/com/spotify/metrics/core/Distribution.java
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2016 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.metrics.core;

import com.codahale.metrics.Counting;
import com.codahale.metrics.Metric;

import java.nio.ByteBuffer;

/**
 * {@link Distribution} is a simple interface that allows users to record measurements
 * in order to compute rank statistics over the whole data distribution, not just the
 * local source.
 *
 * <p>Every implementation should produce a serialized data sketch in a ByteBuffer
 * as this metric's point value. For more information on how this is handled upstream,
 * please refer to the
 * <a href="https://github.com/spotify/ffwd-client-java/blob/master/ffwd-client/src/main/java/com/spotify/ffwd/FastForward.java#L110">FastForward Java client</a>.
 *
 * <p>Unlike a traditional histogram, {@link Distribution} doesn't require
 * predefined percentiles. The recorded data
 * can be used upstream to compute any percentile.
 *
 * <p>This Distribution doesn't require any binning configuration.
 * Just get an instance through a SemanticMetricBuilder and record data.
 *
 * <p>{@link Distribution} is a good choice if you care about percentile accuracy in
 * a distributed environment and you want to rely on P99 to set an SLO.
Review comment: Excellent comment!

*/
public interface Distribution extends Metric, Counting {

/**
 * Record a value; any double from -Double.MAX_VALUE to Double.MAX_VALUE is accepted.
 *
 * @param val the value to record
 */
void record(double val);

/**
 * Return the distribution point value and flush.
 * When this method is called, all internal state
 * is reset and a new recording starts.
 *
 * @return the serialized data sketch as a ByteBuffer
 */
ByteBuffer getValueAndFlush();
@mattnworb (Member), Sep 4, 2020:

I am still curious about the idea of exposing the "value" of the Distribution as a ByteBuffer. It seems like it would couple this Distribution metric to the way in which the t-digest library serializes the underlying com.tdunning.math.stats.TDigest into bytes.

For instance, if there was an alternate implementation of Distribution in the future, it would have to return the values in the same byte format, since FastForwardReporter has the expectation that the results of getValueAndFlush() must look a certain way.

Is there any other data type that could be used? Or at least could the format of the bytes be documented here?

@ao2017 (Contributor, Author), Sep 4, 2020:

If we use Distribution as the return type then the serializer will have to know something about the Distribution implementation. For instance, the FFWD reporter would need an implementation of this method for every Distribution type.
https://github.com/spotify/semantic-metrics/pull/79/files/4ae53fe81b12e35376529ab79184798e982e7ade#diff-455c8027b591558e13ad0a4d245b2518R265. That would not align with the current paradigm of not exposing the details of the metric implementation to the serializer.

Documentation for the bytes format is handled by FFWD. There is a version number; we will use that number upstream to determine how to handle the bytes.
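The "version number first" idea can be illustrated without depending on t-digest itself. The sketch below is hypothetical: the exact byte layout is owned by the t-digest library and ffwd, so the marker value written here is only a placeholder showing how a consumer could peek at a leading int before deciding how to decode the rest.

```java
import java.nio.ByteBuffer;

public class EncodingPeekDemo {

    // Peek at the leading int marker without consuming the buffer, so the
    // consumer can decide how to decode the remaining bytes.
    static int peekEncoding(ByteBuffer buf) {
        return buf.getInt(buf.position()); // absolute read: position unchanged
    }

    public static void main(String[] args) {
        ByteBuffer fake = ByteBuffer.allocate(8);
        fake.putInt(2);      // placeholder encoding/version marker
        fake.putFloat(100f); // placeholder payload (e.g. a compression value)
        fake.flip();
        System.out.println(peekEncoding(fake)); // 2
        System.out.println(fake.position());    // 0: nothing consumed
    }
}
```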

Reply (Member):

  1. Return something immutable like ByteString
  2. Instead of ByteString, return something more semantically meaningful like a DistributionSnapshot wrapper

Reply (Contributor, Author):

  1. I am not sure about DistributionSnapshot; that would expose the Distribution implementation to the FFWD reporter, as noted above.
  2. Let's merge this branch. The ByteBuffer is transformed into a ByteString before serialization in ffwd-client. I will change the return type later.


}
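The record/flush contract described in the Javadoc above can be sketched with a minimal, self-contained stand-in. `NaiveDistribution` below is hypothetical and serializes raw samples rather than a t-digest sketch; it only illustrates the "serialize the point value, then reset state" semantics, not this PR's implementation:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DistributionContractDemo {

    // Hypothetical stand-in for Distribution: record() accepts samples and
    // getValueAndFlush() returns the serialized point value and resets state.
    // The real implementation in this PR serializes a t-digest sketch instead.
    static final class NaiveDistribution {
        private List<Double> samples = new ArrayList<>();

        synchronized void record(double val) {
            samples.add(val);
        }

        synchronized long getCount() {
            return samples.size();
        }

        synchronized ByteBuffer getValueAndFlush() {
            ByteBuffer buf = ByteBuffer.allocate(Double.BYTES * samples.size());
            for (double v : samples) {
                buf.putDouble(v);
            }
            samples = new ArrayList<>(); // start a new recording
            buf.flip();                  // make the buffer readable
            return buf;
        }
    }

    public static void main(String[] args) {
        NaiveDistribution dist = new NaiveDistribution();
        dist.record(1.5);
        dist.record(2.5);
        ByteBuffer value = dist.getValueAndFlush();
        System.out.println(value.remaining()); // 16: two serialized doubles
        System.out.println(dist.getCount());   // 0: state was flushed
    }
}
```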
@@ -32,6 +32,19 @@
* A quick and easy way of capturing the notion of default metrics.
*/
public interface SemanticMetricBuilder<T extends Metric> {

SemanticMetricBuilder<Distribution> DISTRIBUTION = new SemanticMetricBuilder<Distribution>() {
@Override
public Distribution newMetric() {
return new SemanticMetricDistribution();
}

@Override
public boolean isInstance(final Metric metric) {
return Distribution.class.isInstance(metric);
}
};

SemanticMetricBuilder<Counter> COUNTERS = new SemanticMetricBuilder<Counter>() {
@Override
public Counter newMetric() {
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2016 Spotify AB.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update year

*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.metrics.core;


import com.google.common.annotations.VisibleForTesting;
import com.tdunning.math.stats.TDigest;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Semantic Metrics implementation of {@link Distribution}.
 * This implementation ensures thread safety when recording data
 * and retrieving the distribution point value.
 *
 * <p>{@link SemanticMetricDistribution} is backed by Ted Dunning's t-digest implementation.
 *
 * <p>{@link TDigest} sketches are generated by clustering real-valued samples and
 * retaining the mean and number of samples for each cluster.
 * The generated data structure is mergeable and produces fairly
 * accurate percentiles even for long-tailed distributions.
 *
 * <p>We are using a t-digest compression level of 100.
 * With that level of compression, our own benchmark using a Pareto-distributed
 * dataset shows a P99 error rate of less than 2%.
 * From P99.9 to P99.999 the error rate is slightly higher than 2%.
 */
public final class SemanticMetricDistribution implements Distribution {

private static final int COMPRESSION_DEFAULT_LEVEL = 100;
Review comment:
Might we want to change this level? Should it be configuration-driven perhaps?

Reply (Contributor, Author):

I think for the time being it is better to keep every aspect of TDigest private. Power users can extend this class and use a different compression level. For the typical use case, 100 is OK.

private final AtomicReference<TDigest> distRef;

SemanticMetricDistribution() {
this.distRef = new AtomicReference<>(create());
}

@Override
public synchronized void record(double val) {
distRef.get().add(val);
}

@Override
public ByteBuffer getValueAndFlush() {
TDigest curVal;
synchronized (this) {
curVal = distRef.getAndSet(create()); // reset tdigest
}
ByteBuffer byteBuffer = ByteBuffer.allocate(curVal.smallByteSize());
curVal.asSmallBytes(byteBuffer);
return byteBuffer;
}


@Override
public long getCount() {
return distRef.get().size();
}

@VisibleForTesting
TDigest tDigest() {
return distRef.get();
}

private TDigest create() {
return TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL);
}
}
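The flush step above swaps in a fresh digest via `AtomicReference.getAndSet`. The same pattern can be sketched with a plain `LongAdder` standing in for the `TDigest` (`SwapAndResetDemo` is hypothetical and illustrative only):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

// Sketch of the swap-and-reset pattern used by getValueAndFlush():
// getAndSet atomically replaces the accumulating state with a fresh
// instance, so subsequent record() calls land in the new one.
public class SwapAndResetDemo {
    private final AtomicReference<LongAdder> ref =
            new AtomicReference<>(new LongAdder());

    void record() {
        ref.get().increment();
    }

    long flush() {
        LongAdder old = ref.getAndSet(new LongAdder()); // reset, like the TDigest swap
        return old.sum();
    }

    public static void main(String[] args) {
        SwapAndResetDemo d = new SwapAndResetDemo();
        d.record();
        d.record();
        System.out.println(d.flush()); // 2
        System.out.println(d.flush()); // 0 after the reset
    }
}
```

Note that the PR additionally synchronizes `record()` against the swap: `getAndSet` alone would not stop an in-flight `record()` that already fetched the old digest from adding a sample to it while it is being serialized.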
@@ -38,6 +38,8 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -49,7 +51,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* A registry of metric instances.
@@ -77,7 +78,7 @@ public SemanticMetricRegistry() {

public SemanticMetricRegistry(final Supplier<Reservoir> defaultReservoirSupplier) {
this.metrics = new ConcurrentHashMap<MetricId, Metric>();
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListener>();
this.listeners = new CopyOnWriteArrayList<>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

@@ -86,7 +87,7 @@ public SemanticMetricRegistry(
final Supplier<Reservoir> defaultReservoirSupplier
) {
this.metrics = metrics;
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListener>();
this.listeners = new CopyOnWriteArrayList<>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

@@ -134,7 +135,7 @@ public <T extends Metric> T register(@NonNull final MetricId name, @NonNull fina
/**
* Given a metric set, registers them.
*
* @param metrics a set of metrics
* @param metrics a set of metrics
* @throws IllegalArgumentException if any of the names are already registered
*/
public void registerAll(final SemanticMetricSet metrics) throws IllegalArgumentException {
@@ -162,14 +163,21 @@ public Histogram histogram(final MetricId name) {
SemanticMetricBuilderFactory.histogramWithReservoir(defaultReservoirSupplier));
}



/**
* Creates a new {@link Distribution} or returns an existing one registered under the given name.
*
* @param name the name of the metric
* @return a new {@link Distribution}
*/
public Distribution distribution(final MetricId name) {
return getOrAdd(name, SemanticMetricBuilder.DISTRIBUTION);
}

/**
* Creates a new {@link Histogram} with a custom {@link Reservoir} and registers it under
* the given name.
*
* @param name the name of the metric
* @param name the name of the metric
* @param reservoirSupplier a {@link Supplier} that returns an instance of {@link Reservoir}
* @return a new {@link Histogram}
*/
@@ -204,7 +212,7 @@ public Timer timer(final MetricId name) {
* Creates a new {@link Timer} with a custom {@link Reservoir} and registers it under the given
* name.
*
* @param name the name of the metric
* @param name the name of the metric
* @param reservoirSupplier a {@link Supplier} that returns an instance of {@link Reservoir}
* @return a new {@link Timer}
*/
@@ -293,7 +301,7 @@ public SortedMap<MetricId, Gauge> getGauges() {
/**
* Returns a map of all the gauges in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the gauges in the registry
*/
@SuppressWarnings("rawtypes")
@@ -314,7 +322,7 @@ public SortedMap<MetricId, Counter> getCounters() {
* Returns a map of all the counters in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the counters in the registry
*/
public SortedMap<MetricId, Counter> getCounters(final SemanticMetricFilter filter) {
@@ -334,7 +342,7 @@ public SortedMap<MetricId, Histogram> getHistograms() {
* Returns a map of all the histograms in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the histograms in the registry
*/
public SortedMap<MetricId, Histogram> getHistograms(final SemanticMetricFilter filter) {
@@ -353,7 +361,7 @@ public SortedMap<MetricId, Meter> getMeters() {
/**
* Returns a map of all the meters in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the meters in the registry
*/
public SortedMap<MetricId, Meter> getMeters(final SemanticMetricFilter filter) {
@@ -372,7 +380,7 @@ public SortedMap<MetricId, Timer> getTimers() {
/**
* Returns a map of all the timers in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the timers in the registry
*/
public SortedMap<MetricId, Timer> getTimers(final SemanticMetricFilter filter) {
@@ -383,23 +391,35 @@ public SortedMap<MetricId, Timer> getTimers(final SemanticMetricFilter filter) {
* Returns a map of all the deriving meters in the registry and their names which match the
* given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the deriving meters in the registry
*/
public SortedMap<MetricId, DerivingMeter> getDerivingMeters(final SemanticMetricFilter filter) {
return getMetrics(DerivingMeter.class, filter);
}

/**
* Atomically adds the given metric to the set of metrics.
* Returns a map of all the distribution metrics in the registry and their
* names which match the given filter.
*
* @param filter the metric filter to match
* @return a sorted Map of distribution metrics
*/
public SortedMap<MetricId, Distribution> getDistributions(final SemanticMetricFilter filter) {
return getMetrics(Distribution.class, filter);
}


/**
* Atomically adds the given metric to the set of metrics.
* <p>
* A side effect of this method is the calling of {@link #onMetricAdded(MetricId, Metric)}
* if a new metric is added.
*
* <p>
* This method should only be used on non-{@code SemanticMetricSet} metrics.
*
* @param name Name of the metric to atomically add if absent.
* @param name Name of the metric to atomically add if absent.
* @param metric The metric to atomically add if absent.
* @return {@code null} if the metric was added, or the previously mapped metric.
*/
@@ -493,11 +513,14 @@ private void notifyListenerOfAddedMetric(
listener.onTimerAdded(name, (Timer) metric);
} else if (metric instanceof DerivingMeter) {
listener.onDerivingMeterAdded(name, (DerivingMeter) metric);
} else if (metric instanceof Distribution) {
listener.onDistributionAdded(name, (Distribution) metric);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}


protected void onMetricRemoved(final MetricId name, final Metric metric) {
for (final SemanticMetricRegistryListener listener : listeners) {
notifyListenerOfRemovedMetric(name, metric, listener);
@@ -519,6 +542,8 @@ private void notifyListenerOfRemovedMetric(
listener.onTimerRemoved(name);
} else if (metric instanceof DerivingMeter) {
listener.onDerivingMeterRemoved(name);
} else if (metric instanceof Distribution) {
listener.onDistributionRemoved(name);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
@@ -107,6 +107,7 @@ public void onTimerRemoved(String name) {
tryRemove(name);
}


private <T extends Metric> void tryRegister(String name, T metric) {
try {
MetricId metricId = metricIdBuilder.buildMetricId(name);