Add distribution metric to semantic-core and ffwd-reporter #79

Merged
13 commits merged on Sep 9, 2020
66 changes: 66 additions & 0 deletions core/src/main/java/com/spotify/metrics/core/Distribution.java
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2016 - 2020 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.metrics.core;

import com.codahale.metrics.Counting;
import com.codahale.metrics.Metric;

import java.nio.ByteBuffer;

/**
* {@link Distribution} is a simple interface that allows users to record measurements
* so that rank statistics can be computed over the whole data distribution, not just a
* single local source.
*
* <p>Every implementation should produce a serialized data sketch in a ByteBuffer
* as the metric point value. For more information on how this is handled upstream,
* please refer to the
* <a href="https://github.com/spotify/ffwd-client-java/blob/master/ffwd-client/src/main/java/com/spotify/ffwd/FastForward.java#L110">FastForward Java client</a>.
*
* <p>Unlike a traditional histogram, {@link Distribution} does not require
* predefined percentiles. The recorded data
* can be used upstream to compute any percentile.
*
* <p>This Distribution does not require any binning configuration either.
* Just get an instance through SemanticMetricBuilder and record data.
*
* <p>{@link Distribution} is a good choice if you care about percentile accuracy in
* a distributed environment and you want to rely on P99 to set SLOs.
*/

Review comment:
Excellent comment!

public interface Distribution extends Metric, Counting {

/**
* Record a measurement. Values across the full double range are accepted.
*
* @param val the value to record
*/
void record(double val);

/**
* Return the distribution point value and flush.
* When this method is called, all internal state
* is reset and a new recording period starts.
*
* @return the serialized data sketch for the recording period that just ended
*/
ByteBuffer getValueAndFlush();
Comment by @mattnworb (Member), Sep 4, 2020:

I am still curious about the idea of exposing the "value" of the Distribution as a ByteBuffer. It seems like it would couple this Distribution metric to the way in which the t-digest library serializes the underlying com.tdunning.math.stats.TDigest into bytes.

For instance, if there was an alternate implementation of Distribution in the future, it would have to return the values in the same byte format, since FastForwardReporter has the expectation that the results of getValueAndFlush() must look a certain way.

Is there any other data type that could be used? Or at least could the format of the bytes be documented here?

Comment by @ao2017 (Contributor, Author), Sep 4, 2020:

If we use Distribution as the return type, then the serializer will have to know something about the Distribution implementation. For instance, the FFW reporter would need an implementation of this method for every Distribution type.
https://github.com/spotify/semantic-metrics/pull/79/files/4ae53fe81b12e35376529ab79184798e982e7ade#diff-455c8027b591558e13ad0a4d245b2518R265. That would not align with the current paradigm of not exposing the details of the metric implementation to the serializer.

Documentation for the bytes format is handled by FFW. There is a version number; we will use that number upstream to determine how to handle the bytes.

Comment by a Member:

  1. Return something immutable like ByteString
  2. Instead of ByteString, return something more semantically meaningful like a DistributionSnapshot wrapper

Comment by the PR author (Contributor):

  1. I am not sure about DistributionSnapshot; that would expose the Distribution implementation to the FFWD reporter, as noted above.
  2. Let's merge this branch. The ByteBuffer is transformed into a ByteString before serialization in the ffwd-client. I will change the return type later.


}
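
For readers skimming the diff, here is a minimal usage sketch of the new interface. It only uses the DISTRIBUTION builder constant added later in this PR; the class name and sample values are illustrative.

```java
import com.spotify.metrics.core.Distribution;
import com.spotify.metrics.core.SemanticMetricBuilder;

import java.nio.ByteBuffer;

public class DistributionUsageSketch {
    public static void main(String[] args) {
        // Obtain an instance through the builder, as the interface Javadoc suggests.
        Distribution distribution = SemanticMetricBuilder.DISTRIBUTION.newMetric();

        // Record raw measurements; no percentiles or bins are configured up front.
        distribution.record(12.5);
        distribution.record(250.0);

        // A reporter periodically drains the sketch. The ByteBuffer holds the serialized
        // t-digest that is shipped upstream, where any percentile can be computed.
        ByteBuffer serialized = distribution.getValueAndFlush();

        // getValueAndFlush() resets the internal state, so counting starts over.
        System.out.println("serialized " + serialized.capacity() + " bytes; "
                + "count after flush = " + distribution.getCount());
    }
}
```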
@@ -27,7 +27,6 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;

/**
* A quick and easy way of capturing the notion of default metrics.
@@ -37,7 +36,7 @@ public interface SemanticMetricBuilder<T extends Metric> {
SemanticMetricBuilder<Distribution> DISTRIBUTION = new SemanticMetricBuilder<Distribution>() {
@Override
public Distribution newMetric() {
return new DistributionImpl();
return new SemanticMetricDistribution();
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016 Spotify AB.
* Copyright (C) 2016 - 2020 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -23,53 +23,66 @@


import com.google.common.annotations.VisibleForTesting;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;
import com.tdunning.math.stats.TDigest;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;


public class DistributionImpl implements Distribution {
/**
* Semantic Metrics implementation of {@link Distribution}.
* This implementation ensures thread safety for recording data
* and retrieving the distribution point value.
*
* <p>{@link SemanticMetricDistribution} is backed by Ted Dunning's t-digest implementation.
*
* <p>{@link TDigest} sketches are generated by clustering real-valued samples and
* retaining the mean and number of samples for each cluster.
* The generated data structure is mergeable and produces fairly
* accurate percentiles even for long-tail distributions.
*
* <p>We use a t-digest compression level of 100.
* At that level of compression, our own benchmark on a Pareto-distributed
* dataset shows a P99 error rate of less than 2%.
* From P99.9 to P99.999 the error rate is slightly higher than 2%.
*/
public final class SemanticMetricDistribution implements Distribution {

private static final int COMPRESSION_DEFAULT_LEVEL = 100;
Review comment:
Might we want to change this level? Should it be configuration-driven, perhaps?

Comment by the PR author (Contributor):
I think for the time being it is better to keep every aspect of TDigest private. Power users can extend this class and use a different compression level. For the typical use case, 100 is OK.

private final AtomicReference<TDigest> distRef;

protected DistributionImpl() {
this.distRef = new AtomicReference<>(TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL));
SemanticMetricDistribution() {
this.distRef = new AtomicReference<>(create());
}

@Override
public void record(double val) {
public synchronized void record(double val) {
distRef.get().add(val);
}

@Override
public java.nio.ByteBuffer getValueAndFlush() {
TDigest curVal = distRef.getAndSet(create()); // reset tdigest
TDigest curVal;
synchronized (this) {
curVal = distRef.getAndSet(create()); // reset tdigest
}
ByteBuffer byteBuffer = ByteBuffer.allocate(curVal.smallByteSize());
curVal.asSmallBytes(byteBuffer);
return byteBuffer;
}

/**
* Returns the current count.
*
* @return the current count
*/

@Override
public long getCount() {
return this.tDigest().size();
return distRef.get().size();
}

private TDigest create() {
return TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL);
}


@VisibleForTesting
TDigest tDigest() {
return distRef.get();
}

private TDigest create() {
return TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL);
}
}
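
To make the serialization coupling from the review thread above concrete, here is a sketch of how a downstream consumer could decode the payload produced by getValueAndFlush(). It assumes the t-digest library's MergingDigest.fromBytes can read the small-bytes encoding written above; the class name and sample data are illustrative, and the rewind() call compensates for the buffer being returned without being flipped.

```java
import com.spotify.metrics.core.Distribution;
import com.spotify.metrics.core.SemanticMetricBuilder;
import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;

import java.nio.ByteBuffer;

public class DistributionDecodeSketch {
    public static void main(String[] args) {
        Distribution distribution = SemanticMetricBuilder.DISTRIBUTION.newMetric();
        for (int i = 1; i <= 1000; i++) {
            distribution.record(i);
        }

        // These are the bytes a reporter would forward to FastForward.
        ByteBuffer payload = distribution.getValueAndFlush();
        payload.rewind(); // asSmallBytes(..) leaves the position at the end of the buffer

        // Rebuild the sketch and query an arbitrary percentile on the consumer side.
        TDigest decoded = MergingDigest.fromBytes(payload);
        System.out.println("count = " + decoded.size());
        System.out.println("p99   = " + decoded.quantile(0.99));
    }
}
```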
@@ -38,7 +38,6 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.Collections;
@@ -58,7 +57,7 @@
*/
public class SemanticMetricRegistry implements SemanticMetricSet {
private final ConcurrentMap<MetricId, Metric> metrics;
private final List<SemanticMetricRegistryListenerV2> listeners;
private final List<SemanticMetricRegistryListener> listeners;
private final Supplier<Reservoir> defaultReservoirSupplier;

/**
Expand All @@ -79,7 +78,7 @@ public SemanticMetricRegistry() {

public SemanticMetricRegistry(final Supplier<Reservoir> defaultReservoirSupplier) {
this.metrics = new ConcurrentHashMap<MetricId, Metric>();
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListenerV2>();
this.listeners = new CopyOnWriteArrayList<>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

Expand All @@ -88,7 +87,7 @@ public SemanticMetricRegistry(
final Supplier<Reservoir> defaultReservoirSupplier
) {
this.metrics = metrics;
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListenerV2>();
this.listeners = new CopyOnWriteArrayList<>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

@@ -254,15 +253,15 @@ public void removeMatching(final SemanticMetricFilter filter) {
}

/**
* Adds a {@link SemanticMetricRegistryListenerV2} to a collection of listeners that will be
* Adds a {@link SemanticMetricRegistryListener} to a collection of listeners that will be
* notified on
* metric creation. Listeners will be notified in the order in which they are added.
* <p/>
* <b>N.B.:</b> The listener will be notified of all existing metrics when it first registers.
*
* @param listener the listener that will be notified
*/
public void addListener(final SemanticMetricRegistryListenerV2 listener) {
public void addListener(final SemanticMetricRegistryListener listener) {
listeners.add(listener);

for (final Map.Entry<MetricId, Metric> entry : metrics.entrySet()) {
Expand All @@ -271,12 +270,12 @@ public void addListener(final SemanticMetricRegistryListenerV2 listener) {
}

/**
* Removes a {@link SemanticMetricRegistryListenerV2} from this registry's collection of
* Removes a {@link SemanticMetricRegistryListener} from this registry's collection of
* listeners.
*
* @param listener the listener that will be removed
*/
public void removeListener(final SemanticMetricRegistryListenerV2 listener) {
public void removeListener(final SemanticMetricRegistryListener listener) {
listeners.remove(listener);
}

@@ -406,7 +405,7 @@ public SortedMap<MetricId, DerivingMeter> getDerivingMeters(final SemanticMetric
* @param filter the metric filter to match
* @return a sorted Map of distribution metrics
*/
public SortedMap<MetricId, Distribution> getDistribution(final SemanticMetricFilter filter) {
public SortedMap<MetricId, Distribution> getDistributions(final SemanticMetricFilter filter) {
return getMetrics(Distribution.class, filter);
}
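
As a sketch of how a reporter might use the renamed accessor, assuming SemanticMetricFilter exposes an ALL constant analogous to Codahale's MetricFilter.ALL (the helper class and logging are illustrative):

```java
import com.spotify.metrics.core.Distribution;
import com.spotify.metrics.core.MetricId;
import com.spotify.metrics.core.SemanticMetricFilter;
import com.spotify.metrics.core.SemanticMetricRegistry;

import java.nio.ByteBuffer;
import java.util.Map;

public class DistributionReportSketch {
    static void reportOnce(SemanticMetricRegistry registry) {
        // Snapshot every registered distribution and drain each one in turn.
        for (Map.Entry<MetricId, Distribution> entry
                : registry.getDistributions(SemanticMetricFilter.ALL).entrySet()) {
            ByteBuffer sketch = entry.getValue().getValueAndFlush();
            // A real reporter would wrap these bytes in a FastForward metric here.
            System.out.println(entry.getKey() + ": " + sketch.capacity() + " bytes");
        }
    }
}
```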

@@ -494,13 +493,13 @@ protected <T extends Metric> SortedMap<MetricId, T> getMetrics(
}

protected void onMetricAdded(final MetricId name, final Metric metric) {
for (final SemanticMetricRegistryListenerV2 listener : listeners) {
for (final SemanticMetricRegistryListener listener : listeners) {
notifyListenerOfAddedMetric(listener, metric, name);
}
}

private void notifyListenerOfAddedMetric(
final SemanticMetricRegistryListenerV2 listener, final Metric metric, final MetricId name
final SemanticMetricRegistryListener listener, final Metric metric, final MetricId name
) {
if (metric instanceof Gauge) {
listener.onGaugeAdded(name, (Gauge<?>) metric);
Expand All @@ -521,14 +520,15 @@ private void notifyListenerOfAddedMetric(
}
}


protected void onMetricRemoved(final MetricId name, final Metric metric) {
for (final SemanticMetricRegistryListenerV2 listener : listeners) {
for (final SemanticMetricRegistryListener listener : listeners) {
notifyListenerOfRemovedMetric(name, metric, listener);
}
}

private void notifyListenerOfRemovedMetric(
final MetricId name, final Metric metric, final SemanticMetricRegistryListenerV2 listener
final MetricId name, final Metric metric, final SemanticMetricRegistryListener listener
) {
if (metric instanceof Gauge) {
listener.onGaugeRemoved(name);
@@ -27,9 +27,8 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricRegistryListener;
import com.codahale.metrics.Timer;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;
import com.spotify.metrics.core.codahale.metrics.ext.MetricRegistryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -109,28 +108,6 @@ public void onTimerRemoved(String name) {
}


@Override
public void onDerivingMeterAdded(String name, DerivingMeter derivingMeter) {
tryRegister(name, derivingMeter);
}

@Override
public void onDerivingMeterRemoved(String name) {
tryRemove(name);
}

@Override
public void onDistributionAdded(String name, Distribution distribution) {
tryRegister(name, distribution);
}

@Override
public void onDistributionRemoved(String name) {
tryRemove(name);
}



private <T extends Metric> void tryRegister(String name, T metric) {
try {
MetricId metricId = metricIdBuilder.buildMetricId(name);
@@ -93,6 +93,16 @@ public void onDerivingMeterAdded(MetricId name, DerivingMeter derivingMeter) {
@Override
public void onDerivingMeterRemoved(MetricId name) {
}

Review comment:
I'm still not a fan of this class name. Is it convention?

@Override
public void onDistributionAdded(MetricId name, Distribution distribution) {

}

@Override
public void onDistributionRemoved(MetricId name) {

}
}

@@ -184,4 +194,27 @@ public void onDerivingMeterRemoved(MetricId name) {
/**
* @param name the meter's name
*/
void onDerivingMeterRemoved(MetricId name);

/**
* Called when a {@link Distribution} is added to the registry.
* This is a no-op default implementation for backward compatibility;
* override it if you are using a Distribution metric.
*
* @param name the distribution's name
* @param distribution the distribution
*/
public default void onDistributionAdded(MetricId name, Distribution distribution) {

}

/**
* Called when a {@link Distribution} is removed from the registry.
* This is a no-op default implementation for backward compatibility;
* override it if you are using a Distribution metric.
*
* @param name the distribution's name
*/
public default void onDistributionRemoved(MetricId name) {

}
}
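
A sketch of a listener that opts into the new callbacks. Existing listener implementations keep compiling thanks to the no-op defaults above; the example assumes the empty adapter shown earlier in this file is the nested SemanticMetricRegistryListener.Base class (mirroring Codahale's MetricRegistryListener.Base), and the logging is illustrative.

```java
import com.spotify.metrics.core.Distribution;
import com.spotify.metrics.core.MetricId;
import com.spotify.metrics.core.SemanticMetricRegistry;
import com.spotify.metrics.core.SemanticMetricRegistryListener;

// Extends the no-op adapter so only the distribution callbacks need overriding
// (assumption: the adapter is exposed as SemanticMetricRegistryListener.Base).
public class LoggingDistributionListener extends SemanticMetricRegistryListener.Base {

    @Override
    public void onDistributionAdded(MetricId name, Distribution distribution) {
        System.out.println("distribution added: " + name);
    }

    @Override
    public void onDistributionRemoved(MetricId name) {
        System.out.println("distribution removed: " + name);
    }

    public static void main(String[] args) {
        SemanticMetricRegistry registry = new SemanticMetricRegistry();
        registry.addListener(new LoggingDistributionListener());
        // Registering a distribution would now trigger onDistributionAdded(..).
    }
}
```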