Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distribution metric to semantic-core and ffwd-reporter #79

Merged
merged 13 commits into from
Sep 9, 2020
Merged
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
<artifactId>semantic-metrics-api</artifactId>
</dependency>

<dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
Expand Down
75 changes: 75 additions & 0 deletions core/src/main/java/com/spotify/metrics/core/DistributionImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2016 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.metrics.core;


import com.google.common.annotations.VisibleForTesting;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;
import com.tdunning.math.stats.TDigest;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;


public class DistributionImpl implements Distribution {

private static final int COMPRESSION_DEFAULT_LEVEL = 100;
private final AtomicReference<TDigest> distRef;

protected DistributionImpl() {
this.distRef = new AtomicReference<>(TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL));
Copy link

@sming sming Aug 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rationale behind using an AtomicReference? With AtomicRefernce, the only guarantee is that this reference will be updated/set in a thread-safe manner and AFAICT we don't update it...?

Also, AFAICT, multiple threads could still overwrite this.distRef here, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ok, As long as each thread has its own copy we are good.

}

@Override
public void record(double val) {
distRef.get().add(val);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TDigest.add() does not appear to be threadsafe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I am looking into this. I checked couple of Dropwizard metrics there are threadsafe so I will make this thread safe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could either

  • add synchronized around it and hope it's enough.
  • Use ThreadLocal and then merge all the thread local entries somehow inside getValueAndFlush
  • Push to a queue and have another thread popping them off and adding to the TDigest object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the implementation I added yesterday. I thought about adding threadLocal then change mind. ThreadLocal may introduce memory leak if getValueAndFlush isn't called often.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should work now I think (but it's hard to know if there will be much contention, some multithreaded JMH benchmarks might be useful).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sure will do. Please take a look I need a +1

}

@Override
public java.nio.ByteBuffer getValueAndFlush() {
ao2017 marked this conversation as resolved.
Show resolved Hide resolved
TDigest curVal = distRef.getAndSet(create()); // reset tdigest
ByteBuffer byteBuffer = ByteBuffer.allocate(curVal.smallByteSize());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: smallBytesSize(), it reads:

Returns the number of bytes required to encode this TDigest using #asSmallBytes(). Note that this is just as expensive as actually compressing the digest. If you don't care about time, but want to never over-allocate, this is fine. If you care about compression and speed, you pretty much just have to overallocate by using allocating #byteSize() bytes.

  • are we sure we don't care about time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are ok with the performance. Data storage cost is more of a concern.

curVal.asSmallBytes(byteBuffer);
return byteBuffer;
}

/**
* Returns the current count.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not terribly useful :) Either remove or explain what's being counted, cheers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, something is better than guessing :) . I will update.

*
* @return the current count
*/
@Override
public long getCount() {
return this.tDigest().size();
}

private TDigest create() {
return TDigest.createDigest(COMPRESSION_DEFAULT_LEVEL);
}


@VisibleForTesting
TDigest tDigest() {
return distRef.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,25 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;

/**
* A quick and easy way of capturing the notion of default metrics.
*/
public interface SemanticMetricBuilder<T extends Metric> {

SemanticMetricBuilder<Distribution> DISTRIBUTION = new SemanticMetricBuilder<Distribution>() {
ao2017 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Distribution newMetric() {
return new DistributionImpl();
}

@Override
public boolean isInstance(final Metric metric) {
return Distribution.class.isInstance(metric);
}
};

SemanticMetricBuilder<Counter> COUNTERS = new SemanticMetricBuilder<Counter>() {
@Override
public Counter newMetric() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Timer;
import com.spotify.metrics.core.codahale.metrics.ext.Distribution;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -49,14 +52,13 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* A registry of metric instances.
*/
public class SemanticMetricRegistry implements SemanticMetricSet {
private final ConcurrentMap<MetricId, Metric> metrics;
private final List<SemanticMetricRegistryListener> listeners;
private final List<SemanticMetricRegistryListenerV2> listeners;
private final Supplier<Reservoir> defaultReservoirSupplier;

/**
Expand All @@ -77,7 +79,7 @@ public SemanticMetricRegistry() {

public SemanticMetricRegistry(final Supplier<Reservoir> defaultReservoirSupplier) {
this.metrics = new ConcurrentHashMap<MetricId, Metric>();
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListener>();
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListenerV2>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

Expand All @@ -86,7 +88,7 @@ public SemanticMetricRegistry(
final Supplier<Reservoir> defaultReservoirSupplier
) {
this.metrics = metrics;
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListener>();
this.listeners = new CopyOnWriteArrayList<SemanticMetricRegistryListenerV2>();
this.defaultReservoirSupplier = defaultReservoirSupplier;
}

Expand Down Expand Up @@ -134,7 +136,7 @@ public <T extends Metric> T register(@NonNull final MetricId name, @NonNull fina
/**
* Given a metric set, registers them.
*
* @param metrics a set of metrics
* @param metrics a set of metrics
* @throws IllegalArgumentException if any of the names are already registered
*/
public void registerAll(final SemanticMetricSet metrics) throws IllegalArgumentException {
Expand Down Expand Up @@ -162,14 +164,21 @@ public Histogram histogram(final MetricId name) {
SemanticMetricBuilderFactory.histogramWithReservoir(defaultReservoirSupplier));
}



/**
* Creates a new {@link Distribution} or return an existing one registers under the given name.
*
* @param name the name of the metric
* @return a new {@link Distribution}
*/
public Distribution distribution(final MetricId name) {
return getOrAdd(name, SemanticMetricBuilder.DISTRIBUTION);
}

/**
* Creates a new {@link Histogram} with a custom {@link Reservoir} and registers it under
* the given name.
*
* @param name the name of the metric
* @param name the name of the metric
* @param reservoirSupplier a {@link Supplier} that returns an instance of {@link Reservoir}
* @return a new {@link Histogram}
*/
Expand Down Expand Up @@ -204,7 +213,7 @@ public Timer timer(final MetricId name) {
* Creates a new {@link Timer} with a custom {@link Reservoir} and registers it under the given
* name.
*
* @param name the name of the metric
* @param name the name of the metric
* @param reservoirSupplier a {@link Supplier} that returns an instance of {@link Reservoir}
* @return a new {@link Timer}
*/
Expand Down Expand Up @@ -245,15 +254,15 @@ public void removeMatching(final SemanticMetricFilter filter) {
}

/**
* Adds a {@link SemanticMetricRegistryListener} to a collection of listeners that will be
* Adds a {@link SemanticMetricRegistryListenerV2} to a collection of listeners that will be
* notified on
* metric creation. Listeners will be notified in the order in which they are added.
* <p/>
* <b>N.B.:</b> The listener will be notified of all existing metrics when it first registers.
*
* @param listener the listener that will be notified
*/
public void addListener(final SemanticMetricRegistryListener listener) {
public void addListener(final SemanticMetricRegistryListenerV2 listener) {
ao2017 marked this conversation as resolved.
Show resolved Hide resolved
listeners.add(listener);

for (final Map.Entry<MetricId, Metric> entry : metrics.entrySet()) {
Expand All @@ -262,12 +271,12 @@ public void addListener(final SemanticMetricRegistryListener listener) {
}

/**
* Removes a {@link SemanticMetricRegistryListener} from this registry's collection of
* Removes a {@link SemanticMetricRegistryListenerV2} from this registry's collection of
* listeners.
*
* @param listener the listener that will be removed
*/
public void removeListener(final SemanticMetricRegistryListener listener) {
public void removeListener(final SemanticMetricRegistryListenerV2 listener) {
listeners.remove(listener);
}

Expand All @@ -293,7 +302,7 @@ public SortedMap<MetricId, Gauge> getGauges() {
/**
* Returns a map of all the gauges in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the gauges in the registry
*/
@SuppressWarnings("rawtypes")
Expand All @@ -314,7 +323,7 @@ public SortedMap<MetricId, Counter> getCounters() {
* Returns a map of all the counters in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the counters in the registry
*/
public SortedMap<MetricId, Counter> getCounters(final SemanticMetricFilter filter) {
Expand All @@ -334,7 +343,7 @@ public SortedMap<MetricId, Histogram> getHistograms() {
* Returns a map of all the histograms in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the histograms in the registry
*/
public SortedMap<MetricId, Histogram> getHistograms(final SemanticMetricFilter filter) {
Expand All @@ -353,7 +362,7 @@ public SortedMap<MetricId, Meter> getMeters() {
/**
* Returns a map of all the meters in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the meters in the registry
*/
public SortedMap<MetricId, Meter> getMeters(final SemanticMetricFilter filter) {
Expand All @@ -372,7 +381,7 @@ public SortedMap<MetricId, Timer> getTimers() {
/**
* Returns a map of all the timers in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the timers in the registry
*/
public SortedMap<MetricId, Timer> getTimers(final SemanticMetricFilter filter) {
Expand All @@ -383,23 +392,35 @@ public SortedMap<MetricId, Timer> getTimers(final SemanticMetricFilter filter) {
* Returns a map of all the deriving meters in the registry and their names which match the
* given filter.
*
* @param filter the metric filter to match
* @param filter the metric filter to match
* @return all the deriving meters in the registry
*/
public SortedMap<MetricId, DerivingMeter> getDerivingMeters(final SemanticMetricFilter filter) {
return getMetrics(DerivingMeter.class, filter);
}

/**
* Atomically adds the given metric to the set of metrics.
* Returns a map of all the distributions metrics in the registry and their
* names which match the given filter.
*
* @param filter the metric filter to match
* @return a sorted Map of distribution metrics
*/
public SortedMap<MetricId, Distribution> getDistribution(final SemanticMetricFilter filter) {
malish8632 marked this conversation as resolved.
Show resolved Hide resolved
return getMetrics(Distribution.class, filter);
}


/**
* Atomically adds the given metric to the set of metrics.
* <p>
* A side effect of this method is the calling of {@link #onMetricAdded(MetricId, Metric)} if
* a new
* metric is added.
*
* <p>
* This method should only be used on non-{@code SemanticMetricSet} metrics.
*
* @param name Name of the metric to atomically add if absent.
* @param name Name of the metric to atomically add if absent.
* @param metric The metric to atomically add if absent.
* @return {@code null} if the metric was added, or the previously mapped metric.
*/
Expand Down Expand Up @@ -473,13 +494,13 @@ protected <T extends Metric> SortedMap<MetricId, T> getMetrics(
}

protected void onMetricAdded(final MetricId name, final Metric metric) {
for (final SemanticMetricRegistryListener listener : listeners) {
for (final SemanticMetricRegistryListenerV2 listener : listeners) {
notifyListenerOfAddedMetric(listener, metric, name);
}
}

private void notifyListenerOfAddedMetric(
final SemanticMetricRegistryListener listener, final Metric metric, final MetricId name
final SemanticMetricRegistryListenerV2 listener, final Metric metric, final MetricId name
) {
if (metric instanceof Gauge) {
listener.onGaugeAdded(name, (Gauge<?>) metric);
Expand All @@ -493,19 +514,21 @@ private void notifyListenerOfAddedMetric(
listener.onTimerAdded(name, (Timer) metric);
} else if (metric instanceof DerivingMeter) {
listener.onDerivingMeterAdded(name, (DerivingMeter) metric);
} else if (metric instanceof Distribution) {
listener.onDistributionAdded(name, (Distribution) metric);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}

protected void onMetricRemoved(final MetricId name, final Metric metric) {
for (final SemanticMetricRegistryListener listener : listeners) {
for (final SemanticMetricRegistryListenerV2 listener : listeners) {
notifyListenerOfRemovedMetric(name, metric, listener);
}
}

private void notifyListenerOfRemovedMetric(
final MetricId name, final Metric metric, final SemanticMetricRegistryListener listener
final MetricId name, final Metric metric, final SemanticMetricRegistryListenerV2 listener
) {
if (metric instanceof Gauge) {
listener.onGaugeRemoved(name);
Expand All @@ -519,6 +542,8 @@ private void notifyListenerOfRemovedMetric(
listener.onTimerRemoved(name);
} else if (metric instanceof DerivingMeter) {
listener.onDerivingMeterRemoved(name);
} else if (metric instanceof Distribution) {
listener.onDistributionRemoved(name);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
Expand Down
Loading