diff --git a/.gitignore b/.gitignore index f79f7cfa..21a8509d 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ cleanmvn.dot *.iml .idea /analytics/server/log/ +*.versionsBackup diff --git a/deploy_release.sh b/deploy_release.sh new file mode 100644 index 00000000..6c01c520 --- /dev/null +++ b/deploy_release.sh @@ -0,0 +1,2 @@ +#!/bin/bash +mvn clean deploy -P release diff --git a/deploy_snapshot.sh b/deploy_snapshot.sh new file mode 100644 index 00000000..4a673a93 --- /dev/null +++ b/deploy_snapshot.sh @@ -0,0 +1,2 @@ +#!/bin/bash +mvn clean deploy diff --git a/pom.xml b/pom.xml index e46d0570..15ff00dc 100644 --- a/pom.xml +++ b/pom.xml @@ -3,10 +3,16 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 + + org.sonatype.oss + oss-parent + 7 + + com.signalfuse.public clients-parent SignalFuse parent - 1-SNAPSHOT + 0.0.6-SNAPSHOT pom @@ -52,22 +58,22 @@ com.signalfuse.public signalfuse-protoc - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT com.signalfuse.public signalfuse-codahale - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT com.signalfuse.public signalfuse-commons-protoc-java - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT com.signalfuse.public signalfuse-java - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT @@ -148,14 +154,38 @@ + + + + + maven-deploy-plugin + 2.8.1 + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.2 + true + + ossrh + https://oss.sonatype.org/ + true + + org.apache.maven.plugins maven-compiler-plugin 3.1 - 1.5 - 1.5 + 1.6 + 1.6 @@ -184,6 +214,20 @@ + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + org.apache.maven.plugins maven-dependency-plugin diff --git a/signalfuse-codahale/pom.xml b/signalfuse-codahale/pom.xml index 38019008..40d49a8e 100644 --- a/signalfuse-codahale/pom.xml +++ b/signalfuse-codahale/pom.xml @@ -7,12 +7,12 @@ com.signalfuse.public clients-parent - 1-SNAPSHOT + 0.0.6-SNAPSHOT signalfuse-codahale Codahale to SignalFuse - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT jar Codahale yammer metrics plugin for signalfuse @@ -53,6 +53,10 @@ com.signalfuse.public signalfuse-protoc + + com.google.guava + guava + org.slf4j slf4j-simple diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/CallbackCumulativeCounter.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/CallbackCumulativeCounter.java new file mode 100644 index 00000000..4342ca09 --- /dev/null +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/CallbackCumulativeCounter.java @@ -0,0 +1,42 @@ +package com.signalfuse.codahale.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; + +/** + *

+ * Sometimes you want the rate of something like you would with a {@link Counter}, but you can't get + * individual events and instead must set a total "count" of events at some periodic rate. This + * class abstracts that out into a {@link Counter} that codahale can understand. + *

+ */ +public class CallbackCumulativeCounter extends Counter { + private final Callback callback; + + public CallbackCumulativeCounter(Callback callback) { + this.callback = callback; + } + + @Override public void inc() { + throw new UnsupportedOperationException("inc() on CallbackCumulativeCounter"); + } + + @Override public void inc(long n) { + throw new UnsupportedOperationException("inc() on CallbackCumulativeCounter"); + } + + @Override public void dec() { + throw new UnsupportedOperationException("dec() on CallbackCumulativeCounter"); + } + + @Override public void dec(long n) { + throw new UnsupportedOperationException("dec() on CallbackCumulativeCounter"); + } + + @Override public long getCount() { + return callback.getValue(); + } + + public interface Callback extends Gauge { + } +} diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SettableDoubleGauge.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SettableDoubleGauge.java new file mode 100644 index 00000000..f564a727 --- /dev/null +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SettableDoubleGauge.java @@ -0,0 +1,18 @@ +package com.signalfuse.codahale.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.google.common.util.concurrent.AtomicDouble; + +/** + * Same as {@link SettableLongGauge} but for a double + */ +public class SettableDoubleGauge implements Metric, Gauge { + private final AtomicDouble value = new AtomicDouble(); + public void setValue(double value) { + this.value.set(value); + } + public Double getValue() { + return value.get(); + } +} diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SettableLongGauge.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SettableLongGauge.java new file mode 100644 index 00000000..2d0e6321 --- /dev/null +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SettableLongGauge.java @@ -0,0 +1,19 @@ +package com.signalfuse.codahale.metrics; + +import java.util.concurrent.atomic.AtomicLong; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +/** + * Works like a Gauge, but rather than getting its value from a callback, the value + * is set when needed. This can be somewhat convienent, but direct use of a Gauge is likely better + */ +public class SettableLongGauge implements Metric, Gauge { + private final AtomicLong value = new AtomicLong(); + public void setValue(long value) { + this.value.set(value); + } + public Long getValue() { + return value.get(); + } +} diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/AggregateMetricSenderSessionWrapper.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/AggregateMetricSenderSessionWrapper.java new file mode 100644 index 00000000..968e5eaa --- /dev/null +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/AggregateMetricSenderSessionWrapper.java @@ -0,0 +1,177 @@ +package com.signalfuse.codahale.reporter; + +import java.io.Closeable; +import java.util.Map; +import java.util.Set; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Metered; +import com.codahale.metrics.Metric; +import com.codahale.metrics.Sampling; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; +import com.signalfuse.metrics.SignalfuseMetricsException; +import com.signalfuse.metrics.flush.AggregateMetricSender; +import com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers; + +class AggregateMetricSenderSessionWrapper implements Closeable { + private final AggregateMetricSender.Session metricSenderSession; + private final Set detailsToAdd; + private final MetricMetadata metricMetadata; + private final String defaultSourceName; + private final Map hardCounterValueCache; + + AggregateMetricSenderSessionWrapper( + AggregateMetricSender.Session metricSenderSession, + Set detailsToAdd, + MetricMetadata metricMetadata, + String defaultSourceName, Map hardCounterValueCache) { + this.metricSenderSession = metricSenderSession; + this.detailsToAdd = detailsToAdd; + this.metricMetadata = metricMetadata; + this.defaultSourceName = defaultSourceName; + this.hardCounterValueCache = hardCounterValueCache; + } + + public void close() { + try { + metricSenderSession.close(); + } catch (Exception e) { + throw new SignalfuseMetricsException("Unable to close session and send metrics", e); + } + } + + // These three called from report + void addTimer(String key, Timer value) { + addMetered(key, value); + addSampling(key, value); + } + + void addHistogram(String baseName, + Histogram histogram) { + addMetric(histogram, baseName, + Optional.of(SignalFuseReporter.MetricDetails.COUNT), + SignalFuseProtocolBuffers.MetricType.CUMULATIVE_COUNTER, histogram.getCount()); + addSampling(baseName, histogram); + } + + void addMetered(String baseName, Metered metered) { + addMetric(metered, baseName, + SignalFuseReporter.MetricDetails.COUNT, + SignalFuseProtocolBuffers.MetricType.CUMULATIVE_COUNTER, metered.getCount()); + addMetric(metered, baseName, + SignalFuseReporter.MetricDetails.RATE_15_MIN, + SignalFuseProtocolBuffers.MetricType.GAUGE, metered.getFifteenMinuteRate()); + addMetric(metered, baseName, + SignalFuseReporter.MetricDetails.RATE_1_MIN, + SignalFuseProtocolBuffers.MetricType.GAUGE, metered.getOneMinuteRate()); + addMetric(metered, baseName, + SignalFuseReporter.MetricDetails.RATE_5_MIN, + SignalFuseProtocolBuffers.MetricType.GAUGE, metered.getFiveMinuteRate()); + + addMetric(metered, baseName, + SignalFuseReporter.MetricDetails.RATE_MEAN, + SignalFuseProtocolBuffers.MetricType.GAUGE, metered.getMeanRate()); + } + + private void addSampling(String baseName, Sampling sampling) { + Metric metric = (Metric)sampling; + final Snapshot snapshot = sampling.getSnapshot(); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.MEDIAN, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.getMedian()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.PERCENT_75, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.get75thPercentile()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.PERCENT_95, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.get95thPercentile()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.PERCENT_98, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.get98thPercentile()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.PERCENT_99, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.get99thPercentile()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.PERCENT_999, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.get999thPercentile()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.MAX, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.getMax()); + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.MIN, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.getMin()); + + + // These are slower to calculate. Only calculate if we need. + if (detailsToAdd.contains(SignalFuseReporter.MetricDetails.STD_DEV)) { + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.STD_DEV, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.getStdDev()); + } + if (detailsToAdd.contains(SignalFuseReporter.MetricDetails.MEAN)) { + addMetric(metric, baseName, + SignalFuseReporter.MetricDetails.MEAN, + SignalFuseProtocolBuffers.MetricType.GAUGE, snapshot.getMean()); + } + } + + void addMetric(Metric metric, String codahaleName, + SignalFuseProtocolBuffers.MetricType defaultMetricType, + Object originalValue) { + addMetric(metric, codahaleName, Optional.absent(), + defaultMetricType, originalValue); + } + + private void addMetric(Metric metric, String codahaleName, SignalFuseReporter.MetricDetails metricDetails, + SignalFuseProtocolBuffers.MetricType defaultMetricType, + Object originalValue) { + addMetric(metric, codahaleName, Optional.of(metricDetails), + defaultMetricType, originalValue); + + } + + void addMetric(Metric metric, String codahaleName, + Optional metricDetails, + SignalFuseProtocolBuffers.MetricType defaultMetricType, Object originalValue) { + final Number value; + if (originalValue instanceof Number) { + value = (Number) originalValue; + } else if (originalValue instanceof Boolean) { + value = ((Boolean)originalValue).booleanValue() ? 1 : 0; + } else { + // Unsupported type + return; + } + final String metricDetailsMetricNamePrefix; + if (metricDetails.isPresent()) { + if (!detailsToAdd.contains(metricDetails.get())) { + return; + } + metricDetailsMetricNamePrefix = "." + metricDetails.get().getDescription(); + } else { + metricDetailsMetricNamePrefix = ""; + } + Optional userSetMetricType = metricMetadata.getMetricType(metric); + SignalFuseProtocolBuffers.MetricType metricType = userSetMetricType.or(defaultMetricType); + Map tags = metricMetadata.getTags(metric); + final String sourceName = Optional.fromNullable(tags.get(MetricMetadata.SOURCE)).or(defaultSourceName); + final String metricName = Optional.fromNullable(tags.get(MetricMetadata.METRIC)).or(codahaleName) + metricDetailsMetricNamePrefix; + if (value instanceof Long || value instanceof Integer || value instanceof Short) { + final long valueToActuallySendToSignalFuse; + if (metricType.equals(SignalFuseProtocolBuffers.MetricType.COUNTER)) { + final long lastSeenCounterValue = Optional.fromNullable(hardCounterValueCache.put(metric, value.longValue())).or(0L).longValue(); + valueToActuallySendToSignalFuse = value.longValue() - lastSeenCounterValue; + } else { + valueToActuallySendToSignalFuse = value.longValue(); + } + metricSenderSession.setDatapoint(sourceName, metricName, metricType, valueToActuallySendToSignalFuse); + } else { + final double doubleToSend = value.doubleValue(); + if (Double.isInfinite(doubleToSend) || Double.isNaN(doubleToSend)) { + // Invalid values. Don't report + } + metricSenderSession.setDatapoint(sourceName, metricName, metricType, value.doubleValue()); + } + } +} diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/MetricMetadata.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/MetricMetadata.java new file mode 100644 index 00000000..4703c34d --- /dev/null +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/MetricMetadata.java @@ -0,0 +1,24 @@ +package com.signalfuse.codahale.reporter; + +import java.util.Map; +import com.codahale.metrics.Metric; +import com.google.common.base.Optional; +import com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers; + +/** + * Note: This class must be thread safe. + */ +public interface MetricMetadata { + public static final String SOURCE = "source"; + public static final String METRIC = "metric"; + public Map getTags(Metric metric); + public Optional getMetricType(Metric metric); + public Tagger tagMetric(M metric); + + public interface Tagger { + Tagger withSourceName(String sourceName); + Tagger withMetricName(String metricName); + Tagger withMetricType(SignalFuseProtocolBuffers.MetricType metricType); + M metric(); + } +} diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/MetricMetadataImpl.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/MetricMetadataImpl.java new file mode 100644 index 00000000..e3ed6419 --- /dev/null +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/MetricMetadataImpl.java @@ -0,0 +1,89 @@ +package com.signalfuse.codahale.reporter; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import com.codahale.metrics.Metric; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers; + +public class MetricMetadataImpl implements MetricMetadata { + private final Map metaDataCollection; + + public MetricMetadataImpl() { + // This map must be thread safe + metaDataCollection = new ConcurrentHashMap(); + } + + public Map getTags(Metric metric) { + Metadata existingMetaData = metaDataCollection.get(metric); + if (existingMetaData == null) { + return Collections.emptyMap(); + } else { + return Collections.unmodifiableMap(existingMetaData.tags); + } + } + + @Override + public Optional getMetricType(Metric metric) { + Metadata existingMetaData = metaDataCollection.get(metric); + if (existingMetaData == null) { + return Optional.absent(); + } else { + return Optional.of(existingMetaData.metricType); + } + } + + @Override + public synchronized Tagger tagMetric(M metric) { + if (metaDataCollection.containsKey(metric)) { + return new TaggerImpl(metric, metaDataCollection.get(metric)); + } else { + Metadata thisMetricsMetadata = new Metadata(); + Metadata oldMetaData = metaDataCollection.put(metric, thisMetricsMetadata); + Preconditions.checkArgument(oldMetaData == null, "Concurrency issue adding metadat"); + return new TaggerImpl(metric, thisMetricsMetadata); + } + } + + private static final class TaggerImpl implements Tagger { + + private final M metric; + private final Metadata thisMetricsMetadata; + + TaggerImpl(M metric, Metadata thisMetricsMetadata) { + this.metric = metric; + this.thisMetricsMetadata = thisMetricsMetadata; + } + + @Override public Tagger withSourceName(String sourceName) { + thisMetricsMetadata.tags.put(SOURCE, sourceName); + return this; + } + + @Override public Tagger withMetricName(String metricName) { + thisMetricsMetadata.tags.put(METRIC, metricName); + return this; + } + + @Override public Tagger withMetricType( + SignalFuseProtocolBuffers.MetricType metricType) { + thisMetricsMetadata.metricType = metricType; + return this; + } + + @Override public M metric() { + return metric; + } + } + + private static final class Metadata { + private final Map tags; + private SignalFuseProtocolBuffers.MetricType metricType = null; + + private Metadata() { + tags = new ConcurrentHashMap(6); + } + } +} diff --git a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SignalFuseReporter.java b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/SignalFuseReporter.java similarity index 58% rename from signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SignalFuseReporter.java rename to signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/SignalFuseReporter.java index df6f8881..35b2a0f1 100644 --- a/signalfuse-codahale/src/main/java/com/signalfuse/codahale/metrics/SignalFuseReporter.java +++ b/signalfuse-codahale/src/main/java/com/signalfuse/codahale/reporter/SignalFuseReporter.java @@ -1,36 +1,32 @@ -package com.signalfuse.codahale.metrics; +package com.signalfuse.codahale.reporter; -import java.io.Closeable; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.TimeUnit; import com.codahale.metrics.Counter; -import com.codahale.metrics.Counting; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; -import com.codahale.metrics.Metered; +import com.codahale.metrics.Metric; import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Sampling; import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.signalfuse.metrics.SignalfuseMetricsException; import com.signalfuse.metrics.SourceNameHelper; import com.signalfuse.metrics.auth.AuthToken; import com.signalfuse.metrics.auth.StaticAuthToken; -import com.signalfuse.metrics.connection.DataPointReceiver; import com.signalfuse.metrics.connection.DataPointReceiverFactory; import com.signalfuse.metrics.connection.HttpDataPointProtobufReceiverFactory; import com.signalfuse.metrics.endpoint.DataPointEndpoint; import com.signalfuse.metrics.endpoint.DataPointReceiverEndpoint; import com.signalfuse.metrics.errorhandler.OnSendErrorHandler; import com.signalfuse.metrics.flush.AggregateMetricSender; +import com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers; /** * Reporter object for codahale metrics that reports values to com.signalfuse.signalfuse at some @@ -39,6 +35,8 @@ public class SignalFuseReporter extends ScheduledReporter { private final AggregateMetricSender aggregateMetricSender; private final Set detailsToAdd; + private final MetricMetadata metricMetadata; + private final Map hardCounterValueCache; /** * Creates a new {@link com.codahale.metrics.ScheduledReporter} instance. @@ -56,10 +54,13 @@ public class SignalFuseReporter extends ScheduledReporter { protected SignalFuseReporter(MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, AggregateMetricSender aggregateMetricSender, - Set detailsToAdd) { + Set detailsToAdd, + MetricMetadata metricMetadata) { super(registry, name, filter, rateUnit, durationUnit); this.aggregateMetricSender = aggregateMetricSender; this.detailsToAdd = detailsToAdd; + this.metricMetadata = metricMetadata; + this.hardCounterValueCache = new HashMap(); } @Override @@ -67,17 +68,16 @@ public void report(SortedMap gauges, SortedMap c SortedMap histograms, SortedMap meters, SortedMap timers) { AggregateMetricSenderSessionWrapper session = new AggregateMetricSenderSessionWrapper( - aggregateMetricSender.createSession()); + aggregateMetricSender.createSession(), Collections.unmodifiableSet(detailsToAdd), metricMetadata, + aggregateMetricSender.getDefaultSourceName(), hardCounterValueCache); try { for (Map.Entry entry : gauges.entrySet()) { - Object gaugeValue = entry.getValue().getValue(); - if (gaugeValue instanceof Number) { - session.reportGauge(entry.getKey(), (Number) gaugeValue); - } + session.addMetric(entry.getValue(), entry.getKey(), + SignalFuseProtocolBuffers.MetricType.GAUGE, entry.getValue().getValue()); } for (Map.Entry entry : counters.entrySet()) { - session.metricSenderSession - .setCumulativeCounter(entry.getKey(), entry.getValue().getCount()); + session.addMetric(entry.getValue(), entry.getKey(), + SignalFuseProtocolBuffers.MetricType.CUMULATIVE_COUNTER, entry.getValue().getCount()); } for (Map.Entry entry : histograms.entrySet()) { session.addHistogram(entry.getKey(), entry.getValue()); @@ -97,99 +97,8 @@ public void report(SortedMap gauges, SortedMap c } } - private final class AggregateMetricSenderSessionWrapper implements Closeable { - private final AggregateMetricSender.Session metricSenderSession; - - private AggregateMetricSenderSessionWrapper( - AggregateMetricSender.Session metricSenderSession) { - this.metricSenderSession = metricSenderSession; - } - - public void close() { - try { - metricSenderSession.close(); - } catch (Exception e) { - throw new SignalfuseMetricsException("Unable to close session and send metrics", e); - } - } - - // These three called from report - private void addTimer(String key, Timer value) { - addMetered(key, value); - addSampling(key, value); - } - - private void addHistogram(String baseName, - Histogram histogram) { - addCounting(baseName, histogram); - addSampling(baseName, histogram); - } - - private void addMetered(String baseName, Metered metered) { - addCounting(baseName, metered); - checkedAdd(MetricDetails.RATE_15_MIN, baseName, metered.getFifteenMinuteRate()); - checkedAdd(MetricDetails.RATE_5_MIN, baseName, metered.getFiveMinuteRate()); - checkedAdd(MetricDetails.RATE_1_MIN, baseName, metered.getOneMinuteRate()); - if (detailsToAdd.contains(MetricDetails.RATE_MEAN)) { - checkedAdd(MetricDetails.RATE_MEAN, baseName, metered.getMeanRate()); - } - } - - // Shared - private void addCounting(String baseName, Counting counting) { - checkedAddCumulativeCounter(MetricDetails.COUNT, baseName, counting.getCount()); - } - - private void addSampling(String baseName, Sampling sampling) { - final Snapshot snapshot = sampling.getSnapshot(); - checkedAdd(MetricDetails.MEDIAN, baseName, snapshot.getMedian()); - checkedAdd(MetricDetails.PERCENT_75, baseName, snapshot.get75thPercentile()); - checkedAdd(MetricDetails.PERCENT_95, baseName, snapshot.get95thPercentile()); - checkedAdd(MetricDetails.PERCENT_98, baseName, snapshot.get98thPercentile()); - checkedAdd(MetricDetails.PERCENT_99, baseName, snapshot.get99thPercentile()); - checkedAdd(MetricDetails.PERCENT_999, baseName, snapshot.get999thPercentile()); - checkedAdd(MetricDetails.MAX, baseName, snapshot.getMax()); - checkedAdd(MetricDetails.MIN, baseName, snapshot.getMin()); - - // These are slower to calculate. Only calculate if we need. - if (detailsToAdd.contains(MetricDetails.STD_DEV)) { - checkedAdd(MetricDetails.STD_DEV, baseName, snapshot.getStdDev()); - } - if (detailsToAdd.contains(MetricDetails.MEAN)) { - checkedAdd(MetricDetails.MEAN, baseName, snapshot.getMean()); - } - } - - private void reportGauge(String baseName, Number value) { - if (Double.isInfinite(value.doubleValue()) || Double.isNaN(value.doubleValue())) { - return; - } - if (value instanceof Long || value instanceof Integer) { - metricSenderSession.setGauge(baseName, value.longValue()); - } else { - metricSenderSession.setGauge(baseName, value.doubleValue()); - } - } - - // helpers - private void checkedAddCumulativeCounter(MetricDetails type, String baseName, long value) { - if (detailsToAdd.contains(type)) { - metricSenderSession - .setCumulativeCounter(baseName + '.' + type.getDescription(), value); - } - } - - private void checkedAdd(MetricDetails type, String baseName, double value) { - if (detailsToAdd.contains(type)) { - metricSenderSession.setGauge(baseName + '.' + type.getDescription(), value); - } - } - - private void checkedAdd(MetricDetails type, String baseName, long value) { - if (detailsToAdd.contains(type)) { - metricSenderSession.setGauge(baseName + '.' + type.getDescription(), value); - } - } + public MetricMetadata getMetricMetadata() { + return metricMetadata; } public enum MetricDetails { @@ -213,7 +122,7 @@ public enum MetricDetails { RATE_1_MIN("rate.1min"), RATE_5_MIN("rate.5min"), RATE_15_MIN("rate.15min"); - public static final Set ALL = EnumSet.allOf(MetricDetails.class); + public static final Set ALL = Collections.unmodifiableSet(EnumSet.allOf(MetricDetails.class)); private final String description; MetricDetails(String description) { @@ -233,13 +142,13 @@ public static final class Builder { private String name = "signalfuse-reporter"; private int timeoutMs = HttpDataPointProtobufReceiverFactory.DEFAULT_TIMEOUT_MS; private DataPointReceiverFactory dataPointReceiverFactory = new - HttpDataPointProtobufReceiverFactory(); + HttpDataPointProtobufReceiverFactory(dataPointEndpoint); private MetricFilter filter = MetricFilter.ALL; private TimeUnit rateUnit = TimeUnit.SECONDS; private TimeUnit durationUnit = TimeUnit.MILLISECONDS; // Maybe nano eventually? private Set detailsToAdd = MetricDetails.ALL; - private Collection onSendErrorHandlerCollection = Collections - .emptyList(); + private Collection onSendErrorHandlerCollection = Collections.emptyList(); + private MetricMetadata metricMetadata = new MetricMetadataImpl(); public Builder(MetricRegistry registry, String authToken) { this(registry, new StaticAuthToken(authToken)); @@ -267,6 +176,9 @@ public Builder setAuthToken(AuthToken authToken) { public Builder setDataPointEndpoint(DataPointReceiverEndpoint dataPointEndpoint) { this.dataPointEndpoint = dataPointEndpoint; + this.dataPointReceiverFactory = + new HttpDataPointProtobufReceiverFactory(dataPointEndpoint) + .setTimeoutMs(this.timeoutMs); return this; } @@ -277,6 +189,9 @@ public Builder setName(String name) { public Builder setTimeoutMs(int timeoutMs) { this.timeoutMs = timeoutMs; + this.dataPointReceiverFactory = + new HttpDataPointProtobufReceiverFactory(dataPointEndpoint) + .setTimeoutMs(this.timeoutMs); return this; } @@ -312,13 +227,16 @@ public Builder setOnSendErrorHandlerCollection( return this; } + public Builder setMetricMetadata(MetricMetadata metricMetadata) { + this.metricMetadata = metricMetadata; + return this; + } + public SignalFuseReporter build() { - DataPointReceiver dataPointReceiver = dataPointReceiverFactory.setTimeoutMs(timeoutMs) - .createDataPointReceiver(dataPointEndpoint); AggregateMetricSender aggregateMetricSender = new AggregateMetricSender( - defaultSourceName, dataPointReceiver, authToken, onSendErrorHandlerCollection); + defaultSourceName, dataPointReceiverFactory, authToken, onSendErrorHandlerCollection); return new SignalFuseReporter(registry, name, filter, rateUnit, durationUnit, - aggregateMetricSender, detailsToAdd); + aggregateMetricSender, detailsToAdd, metricMetadata); } } } diff --git a/signalfuse-codahale/src/test/java/com/signalfuse/codahale/metrics/SignalFuseReporterTest.java b/signalfuse-codahale/src/test/java/com/signalfuse/codahale/metrics/SignalFuseReporterTest.java index efd28cae..c438b8c8 100644 --- a/signalfuse-codahale/src/test/java/com/signalfuse/codahale/metrics/SignalFuseReporterTest.java +++ b/signalfuse-codahale/src/test/java/com/signalfuse/codahale/metrics/SignalFuseReporterTest.java @@ -1,21 +1,14 @@ package com.signalfuse.codahale.metrics; -import java.util.Collections; -import java.util.SortedMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import org.junit.Test; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; +import com.google.common.collect.ImmutableSet; +import com.signalfuse.codahale.reporter.SignalFuseReporter; import com.signalfuse.metrics.auth.StaticAuthToken; +import com.signalfuse.metrics.connection.StaticDataPointReceiverFactory; import com.signalfuse.metrics.connection.StoredDataPointReceiver; -import com.signalfuse.metrics.errorhandler.OnSendErrorHandler; -import com.signalfuse.metrics.flush.AggregateMetricSender; +import com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers; import static org.junit.Assert.assertEquals; @@ -25,56 +18,53 @@ public void testReporter() throws InterruptedException { StoredDataPointReceiver dbank = new StoredDataPointReceiver(); assertEquals(0, dbank.addDataPoints.size()); - AggregateMetricSender aggregateMetricSender = new AggregateMetricSender("", dbank, - new StaticAuthToken(""), - Collections.emptyList()); - MetricRegistry metricRegistery = new MetricRegistry(); - String name = "sf_reporter"; - MetricFilter filter = MetricFilter.ALL; - TimeUnit rateUnit = TimeUnit.SECONDS; - TimeUnit durationUnit = TimeUnit.SECONDS; - final Semaphore S = new Semaphore(0); - LockedSignalFuseReporter reporter = new LockedSignalFuseReporter(metricRegistery, name, - filter, rateUnit, durationUnit, aggregateMetricSender, S); + SignalFuseReporter reporter = new SignalFuseReporter.Builder(metricRegistery, new StaticAuthToken(""), "myserver") + .setDataPointReceiverFactory(new StaticDataPointReceiverFactory(dbank)) + .setDetailsToAdd(ImmutableSet.of(SignalFuseReporter.MetricDetails.COUNT, SignalFuseReporter.MetricDetails.MIN, SignalFuseReporter.MetricDetails.MAX)) + .build(); metricRegistery.register("gauge", new Gauge() { public Integer getValue() { return 1; } }); + + reporter.getMetricMetadata().tagMetric(metricRegistery.counter("counter")) + .withMetricName("newname") + .withSourceName("newsource") + .withMetricType(SignalFuseProtocolBuffers.MetricType.GAUGE); metricRegistery.counter("counter").inc(); metricRegistery.counter("counter").inc(); - reporter.start(1, TimeUnit.MICROSECONDS); - S.acquire(1); + metricRegistery.timer("atimer").time().close(); - assertEquals(2, dbank.addDataPoints.size()); - assertEquals(2, (int) dbank.addDataPoints.get(1).getValue().getIntValue()); - } + reporter.report(); + + assertEquals(5, dbank.addDataPoints.size()); + assertEquals("newname", dbank.addDataPoints.get(1).getMetric()); + assertEquals("newsource", dbank.addDataPoints.get(1).getSource()); + assertEquals(SignalFuseProtocolBuffers.MetricType.GAUGE, dbank.registeredMetrics.get( + "newname")); + assertEquals(SignalFuseProtocolBuffers.MetricType.CUMULATIVE_COUNTER, dbank.registeredMetrics.get("atimer.count")); + assertEquals(SignalFuseProtocolBuffers.MetricType.GAUGE, dbank.registeredMetrics.get("atimer.max")); + assertEquals(2, dbank.lastValueFor("newsource", "newname").getIntValue()); - /** - * A version of SignalFuseReporter that lets us know (via a Semaphore) when it's report() is - * called, then stops itself - */ - private static final class LockedSignalFuseReporter extends SignalFuseReporter { - private final Semaphore S; + assertEquals("atimer.count", dbank.addDataPoints.get(2).getMetric()); - private LockedSignalFuseReporter(MetricRegistry registry, String name, - MetricFilter filter, TimeUnit rateUnit, - TimeUnit durationUnit, AggregateMetricSender metricFactory, - Semaphore S) { - super(registry, name, filter, rateUnit, durationUnit, metricFactory, MetricDetails.ALL); - this.S = S; - } + dbank.addDataPoints.clear(); + reporter.getMetricMetadata().tagMetric(metricRegistery.counter("raw_counter")) + .withMetricType(SignalFuseProtocolBuffers.MetricType.COUNTER); + metricRegistery.counter("raw_counter").inc(10); + reporter.report(); + assertEquals(6, dbank.addDataPoints.size()); + assertEquals(10, dbank.lastValueFor("myserver", "raw_counter").getIntValue()); + assertEquals(SignalFuseProtocolBuffers.MetricType.COUNTER, dbank.registeredMetrics.get("raw_counter")); + metricRegistery.counter("raw_counter").inc(14); + dbank.addDataPoints.clear(); + reporter.report(); + assertEquals(6, dbank.addDataPoints.size()); + assertEquals(14, dbank.lastValueFor("myserver", "raw_counter").getIntValue()); - @Override - public void report(SortedMap gauges, SortedMap counters, - SortedMap histograms, - SortedMap meters, SortedMap timers) { - super.report(gauges, counters, histograms, meters, timers); - S.release(); - this.stop(); - } } } diff --git a/signalfuse-commons-protoc-java/pom.xml b/signalfuse-commons-protoc-java/pom.xml index 8b71c2af..687222c5 100644 --- a/signalfuse-commons-protoc-java/pom.xml +++ b/signalfuse-commons-protoc-java/pom.xml @@ -6,12 +6,12 @@ com.signalfuse.public clients-parent - 1-SNAPSHOT + 0.0.6-SNAPSHOT signalfuse-commons-protoc-java SignalFuse Protobuf Utilities - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT jar Common java protocol buffer library functions diff --git a/signalfuse-java/pom.xml b/signalfuse-java/pom.xml index 7462de77..6baa0fdd 100644 --- a/signalfuse-java/pom.xml +++ b/signalfuse-java/pom.xml @@ -6,12 +6,12 @@ com.signalfuse.public clients-parent - 1-SNAPSHOT + 0.0.6-SNAPSHOT signalfuse-java SignalFuse java libraries - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT jar Bare minimum core library needed to sending metrics to SignalFuse from Java clients @@ -92,5 +92,12 @@ + + + src/main/resources + true + + + diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/SendMetrics.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/SendMetrics.java index a23e9fac..32eacd5e 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/SendMetrics.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/SendMetrics.java @@ -25,8 +25,7 @@ public static void main(String[] args) throws Exception { DataPointReceiverEndpoint dataPointEndpoint = new DataPointEndpoint(host, DataPointEndpoint.DEFAULT_PORT); AggregateMetricSender mf = new AggregateMetricSender("test", - new HttpDataPointProtobufReceiverFactory() - .createDataPointReceiver(dataPointEndpoint), + new HttpDataPointProtobufReceiverFactory(dataPointEndpoint), new StaticAuthToken(auth_token), Collections.emptyList()); int count = 0; diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiver.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiver.java index fe655001..ad0a6362 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiver.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiver.java @@ -14,6 +14,6 @@ void backfillDataPoints(String auth, String source, String metric, List datumPoints) throws SignalfuseMetricsException; - void registerMetrics(String auth, Map metricTypes) + Map registerMetrics(String auth, Map metricTypes) throws SignalfuseMetricsException; } diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiverFactory.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiverFactory.java index 9f8937f9..b633c1c1 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiverFactory.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/DataPointReceiverFactory.java @@ -1,8 +1,5 @@ package com.signalfuse.metrics.connection; -import com.signalfuse.metrics.SignalfuseMetricsException; -import com.signalfuse.metrics.endpoint.DataPointReceiverEndpoint; - /** * A factory that creates connections to datapoint given an endpoint to connect to. * @@ -12,8 +9,5 @@ public interface DataPointReceiverFactory { /** * Create connection to datapoint. */ - DataPointReceiver createDataPointReceiver(DataPointReceiverEndpoint dataPointEndpoint) - throws SignalfuseMetricsException; - - public DataPointReceiverFactory setTimeoutMs(int timeoutMs); + DataPointReceiver createDataPointReceiver(); } diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverConnection.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverConnection.java index 39af275c..060fcaf3 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverConnection.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverConnection.java @@ -3,9 +3,11 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.io.IOUtils; import org.apache.http.HttpHost; @@ -21,7 +23,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.signalfuse.common.proto.ProtocolBufferStreamingInputStream; import com.signalfuse.metrics.SignalfuseMetricsException; import com.signalfuse.metrics.endpoint.DataPointReceiverEndpoint; @@ -30,7 +34,7 @@ public class HttpDataPointProtobufReceiverConnection implements DataPointReceiver { private static final ContentType PROTO_TYPE = ContentType.create("application/x-protobuf"); private static final ContentType JSON_TYPE = ContentType.APPLICATION_JSON; - private static final String USER_AGENT = "SignalFx-java-client/0.0.2"; + private static final String USER_AGENT = "SignalFx-java-client/" + version(); private static final Logger log = LoggerFactory .getLogger(HttpDataPointProtobufReceiverConnection.class); private final CloseableHttpClient client = HttpClientBuilder.create().build(); @@ -45,6 +49,17 @@ public HttpDataPointProtobufReceiverConnection(DataPointReceiverEndpoint dataPoi .setConnectionRequestTimeout(timeoutMs).setConnectTimeout(timeoutMs).build(); } + private static final String version() { + final Properties properties = new Properties(); + try { + properties.load(HttpDataPointProtobufReceiverConnection.class + .getResourceAsStream("signalfuse.java.client.version")); + return properties.getProperty("signalfuse.java.client.version", "0.0.0"); + } catch (IOException e) { + return "0.0.1"; + } + } + private CloseableHttpResponse postToEndpoint(String auth, InputStream postBodyInputStream, String endpoint, ContentType contentType) throws IOException { @@ -128,50 +143,67 @@ public void backfillDataPoints(String auth, String source, String metric, } @Override - public void registerMetrics(String auth, + public Map registerMetrics(String auth, Map metricTypes) throws SignalfuseMetricsException { + Map res = new HashMap(); + for (Map.Entry i: metricTypes.entrySet()) { + res.put(i.getKey(), false); + } if (metricTypes.isEmpty()) { - return; + return res; } + List> postBodyList = new ArrayList>(metricTypes.size()); for (Map.Entry entity : metricTypes .entrySet()) { - Map post_body = new HashMap(2); - post_body.put("sf_metric", entity.getKey()); - post_body.put("sf_metricType", entity.getValue().toString()); - final byte[] map_as_json; - try { - map_as_json = new ObjectMapper().writeValueAsBytes(post_body); - } catch (JsonProcessingException e) { - throw new SignalfuseMetricsException("Unable to write protocol buffer", e); - } + postBodyList.add(ImmutableMap.of("sf_metric", entity.getKey(), "sf_metricType", entity.getValue().toString())); + } + + final byte[] map_as_json; + try { + map_as_json = new ObjectMapper().writeValueAsBytes(postBodyList); + } catch (JsonProcessingException e) { + throw new SignalfuseMetricsException("Unable to write protocol buffer", e); + } + String body = ""; + try { + CloseableHttpResponse resp = null; try { - CloseableHttpResponse resp = null; + resp = postToEndpoint(auth, + new ByteArrayInputStream(map_as_json), "/metric?bulkupdate=true", + JSON_TYPE); try { - resp = postToEndpoint(auth, - new ByteArrayInputStream(map_as_json), "/metric", - JSON_TYPE); - if (resp.getStatusLine().getStatusCode() != HttpStatus.SC_CONFLICT - && resp.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { - final String body; - try { - body = IOUtils.toString(resp.getEntity().getContent()); - } catch (IOException e) { - throw new SignalfuseMetricsException("Unable to get reponse content", - e); - } - throw new SignalfuseMetricsException("Invalid status code " - + resp.getStatusLine().getStatusCode() + ": " + body); - } - } finally { - if (resp != null) { - resp.close(); + body = IOUtils.toString(resp.getEntity().getContent()); + } catch (IOException e) { + throw new SignalfuseMetricsException("Unable to get reponse content", + e); + } + if (resp.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new SignalfuseMetricsException("Invalid status code " + + resp.getStatusLine().getStatusCode() + ": " + body); + } + List> respObject = + new ObjectMapper().readValue(body.getBytes(), + new TypeReference>>(){}); + if (respObject.size() != metricTypes.size()) { + throw new SignalfuseMetricsException( + String.format("json map mismatch: post_body=%s, resp=%s", + new String(map_as_json), body)); + } + for (int i=0;i m = respObject.get(i); + if (!m.containsKey("code") || "409".equals(m.get("code").toString())) { + res.put(postBodyList.get(i).get("sf_metric"), true); } } - } catch (IOException e) { - throw new SignalfuseMetricsException(String.format("series=%s, auth=%s, post=%s", - auth, post_body, e)); + } finally { + if (resp != null) { + resp.close(); + } } + } catch (IOException e) { + throw new SignalfuseMetricsException(String.format("post_body=%s, resp=%s", new String(map_as_json), body), e); } + return res; } } diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverFactory.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverFactory.java index 9a02b80a..1aab02d0 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverFactory.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/HttpDataPointProtobufReceiverFactory.java @@ -5,15 +5,20 @@ public class HttpDataPointProtobufReceiverFactory implements DataPointReceiverFactory { public static final int DEFAULT_TIMEOUT_MS = 2000; + private final DataPointReceiverEndpoint dataPointEndpoint; private int timeoutMs = DEFAULT_TIMEOUT_MS; + public HttpDataPointProtobufReceiverFactory(DataPointReceiverEndpoint dataPointEndpoint) { + this.dataPointEndpoint = dataPointEndpoint; + } + public HttpDataPointProtobufReceiverFactory setTimeoutMs(int timeoutMs) { this.timeoutMs = timeoutMs; return this; } @Override - public DataPointReceiver createDataPointReceiver(DataPointReceiverEndpoint dataPointEndpoint) throws + public DataPointReceiver createDataPointReceiver() throws SignalfuseMetricsException { return new HttpDataPointProtobufReceiverConnection(dataPointEndpoint, this.timeoutMs); } diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StaticDataPointReceiverFactory.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StaticDataPointReceiverFactory.java new file mode 100644 index 00000000..5c7f8dfd --- /dev/null +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StaticDataPointReceiverFactory.java @@ -0,0 +1,13 @@ +package com.signalfuse.metrics.connection; + +public class StaticDataPointReceiverFactory implements DataPointReceiverFactory { + private final DataPointReceiver dataPointReciever; + + public StaticDataPointReceiverFactory(DataPointReceiver dataPointReciever) { + this.dataPointReciever = dataPointReciever; + } + + @Override public DataPointReceiver createDataPointReceiver() { + return this.dataPointReciever; + } +} diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StoredDataPointReceiver.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StoredDataPointReceiver.java index 3165d75d..89daae09 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StoredDataPointReceiver.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/connection/StoredDataPointReceiver.java @@ -6,6 +6,9 @@ import java.util.List; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.signalfuse.metrics.SignalfuseMetricsException; import com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers; @@ -16,6 +19,7 @@ */ public class StoredDataPointReceiver implements DataPointReceiver { public final List addDataPoints; + private final Map, List> pointsFor; public final Map registeredMetrics; public boolean throwOnAdd = false; @@ -23,6 +27,8 @@ public StoredDataPointReceiver() { addDataPoints = Collections .synchronizedList(new ArrayList()); registeredMetrics = Collections.synchronizedMap(new HashMap()); + + pointsFor = Maps.newHashMap(); } @Override @@ -32,6 +38,14 @@ public void addDataPoints(String auth, List throw new SignalfuseMetricsException("Flag set to true"); } addDataPoints.addAll(dataPoints); + for (SignalFuseProtocolBuffers.DataPoint dp: dataPoints) { + Pair key = Pair.of(dp.getSource(), dp.getMetric()); + if (pointsFor.containsKey(key)) { + pointsFor.get(key).add(dp.getValue()); + } else { + pointsFor.put(key, Lists.newArrayList(dp.getValue())); + } + } } @Override @@ -40,10 +54,33 @@ public void backfillDataPoints(String auth, String source, String metric, throws SignalfuseMetricsException {} @Override - public void registerMetrics(String auth, + public Map registerMetrics(String auth, Map metricTypes) throws SignalfuseMetricsException { registeredMetrics.putAll(metricTypes); + Map ret = new HashMap(); + for (Map.Entry i: metricTypes.entrySet()) { + ret.put(i.getKey(), true); + } + return ret; + } + + public List valuesFor(String source, String metric) { + Pair key = Pair.of(source, metric); + List ret = pointsFor.get(key); + if (ret == null) { + return Collections.emptyList(); + } else { + return Collections.unmodifiableList(ret); + } } + public SignalFuseProtocolBuffers.Datum lastValueFor(String source, String metric) { + List vals = valuesFor(source, metric); + if (vals.isEmpty()) { + throw new RuntimeException("No value for source/metric"); + } else { + return vals.get(vals.size() - 1); + } + } } diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/endpoint/DataPointEndpoint.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/endpoint/DataPointEndpoint.java index cfa7bb10..7b26a0fd 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/endpoint/DataPointEndpoint.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/endpoint/DataPointEndpoint.java @@ -83,6 +83,6 @@ private static int getDefaultPort() throws NumberFormatException { @Override public String toString() { - return getScheme() + "//" + getHostname() + ':' + getPort(); + return getScheme() + "://" + getHostname() + ':' + getPort(); } } diff --git a/signalfuse-java/src/main/java/com/signalfuse/metrics/flush/AggregateMetricSender.java b/signalfuse-java/src/main/java/com/signalfuse/metrics/flush/AggregateMetricSender.java index 1e45f20a..b2bd7ce6 100644 --- a/signalfuse-java/src/main/java/com/signalfuse/metrics/flush/AggregateMetricSender.java +++ b/signalfuse-java/src/main/java/com/signalfuse/metrics/flush/AggregateMetricSender.java @@ -4,12 +4,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import com.signalfuse.metrics.SignalfuseMetricsException; import com.signalfuse.metrics.auth.AuthToken; import com.signalfuse.metrics.auth.NoAuthTokenException; import com.signalfuse.metrics.connection.DataPointReceiver; +import com.signalfuse.metrics.connection.DataPointReceiverFactory; import com.signalfuse.metrics.errorhandler.MetricErrorImpl; import com.signalfuse.metrics.errorhandler.MetricErrorType; import com.signalfuse.metrics.errorhandler.OnSendErrorHandler; @@ -17,22 +21,25 @@ public class AggregateMetricSender { private final String defaultSourceName; - private final Map registeredMetricPairs; - private final DataPointReceiver dataPointReceiver; + private final Set registeredMetricPairs; + private final DataPointReceiverFactory dataPointReceiverFactory; private final AuthToken authToken; private final Collection onSendErrorHandlerCollection; - public AggregateMetricSender(String defaultSourceName, DataPointReceiver dataPointReceiver, + public AggregateMetricSender(String defaultSourceName, DataPointReceiverFactory dataPointReceiverFactory, AuthToken authToken, Collection onSendErrorHandlerCollection) { this.defaultSourceName = defaultSourceName; - registeredMetricPairs = new HashMap(); - this.dataPointReceiver = dataPointReceiver; + registeredMetricPairs = new HashSet(); + this.dataPointReceiverFactory = dataPointReceiverFactory; this.authToken = authToken; this.onSendErrorHandlerCollection = onSendErrorHandlerCollection; } + public String getDefaultSourceName() { + return defaultSourceName; + } + private void communicateError(String message, MetricErrorType code, SignalfuseMetricsException signalfuseMetricsException) { for (OnSendErrorHandler onSendErrorHandler : onSendErrorHandlerCollection) { @@ -62,8 +69,7 @@ public Session setCumulativeCounter(String metric, long value) { } public Session setCumulativeCounter(String source, String metric, long value) { - check(metric, SignalFuseProtocolBuffers.MetricType.CUMULATIVE_COUNTER); - addDatapoint(source, metric, value); + setDatapoint(source, metric, SignalFuseProtocolBuffers.MetricType.CUMULATIVE_COUNTER, value); return this; } @@ -72,7 +78,24 @@ public Session incrementCounter(String metric, long value) { } public Session incrementCounter(String source, String metric, long value) { - check(metric, SignalFuseProtocolBuffers.MetricType.COUNTER); + setDatapoint(source, metric, SignalFuseProtocolBuffers.MetricType.COUNTER, value); + return this; + } + + @Override + public Session setDatapoint(String source, String metric, + SignalFuseProtocolBuffers.MetricType metricType, + long value) { + check(metric, metricType); + addDatapoint(source, metric, value); + return this; + } + + @Override + public Session setDatapoint(String source, String metric, + SignalFuseProtocolBuffers.MetricType metricType, + double value) { + check(metric, metricType); addDatapoint(source, metric, value); return this; } @@ -82,8 +105,7 @@ public Session setGauge(String metric, long value) { } public Session setGauge(String source, String metric, long value) { - check(metric, SignalFuseProtocolBuffers.MetricType.GAUGE); - addDatapoint(source, metric, value); + setDatapoint(source, metric, SignalFuseProtocolBuffers.MetricType.GAUGE, value); return this; } @@ -92,8 +114,7 @@ public Session setGauge(String metric, double value) { } public Session setGauge(String source, String metric, double value) { - check(metric, SignalFuseProtocolBuffers.MetricType.GAUGE); - addDatapoint(source, metric, value); + setDatapoint(source, metric, SignalFuseProtocolBuffers.MetricType.GAUGE, value); return this; } @@ -115,7 +136,7 @@ private void addDatapoint(String source, String metric, long value) { private void check(String metricPair, com.signalfuse.metrics.protobuf.SignalFuseProtocolBuffers.MetricType metricType) { - if (!registeredMetricPairs.containsKey(metricPair)) { + if (!registeredMetricPairs.contains(metricPair)) { toBeRegisteredMetricPairs.put(metricPair, metricType); } } @@ -130,10 +151,17 @@ public void close() { return; } + DataPointReceiver dataPointReceiver = dataPointReceiverFactory.createDataPointReceiver(); + if (!toBeRegisteredMetricPairs.isEmpty()) { try { - dataPointReceiver.registerMetrics(authTokenStr, + Map registeredPairs = dataPointReceiver.registerMetrics(authTokenStr, toBeRegisteredMetricPairs); + for (Map.Entry i: registeredPairs.entrySet()) { + if (i.getValue()) { + registeredMetricPairs.add(i.getKey()); + } + } } catch (SignalfuseMetricsException e) { communicateError("Unable to register metrics", MetricErrorType.REGISTRATION_ERROR, @@ -142,7 +170,13 @@ public void close() { } } - registeredMetricPairs.putAll(toBeRegisteredMetricPairs); + Iterator i = pointsToFlush.iterator(); + while (i.hasNext()) { + SignalFuseProtocolBuffers.DataPoint currentEntry = i.next(); + if (!registeredMetricPairs.contains(currentEntry.getMetric())) { + i.remove(); + } + } if (!pointsToFlush.isEmpty()) { try { @@ -173,5 +207,9 @@ public interface Session extends Closeable { Session incrementCounter(String metric, long value); Session incrementCounter(String source, String metric, long value); + + Session setDatapoint(String source, String metric, SignalFuseProtocolBuffers.MetricType metricType, long value); + + Session setDatapoint(String source, String metric, SignalFuseProtocolBuffers.MetricType metricType, double value); } } diff --git a/signalfuse-java/src/main/resources/.properties b/signalfuse-java/src/main/resources/.properties new file mode 100644 index 00000000..df24b8ae --- /dev/null +++ b/signalfuse-java/src/main/resources/.properties @@ -0,0 +1 @@ +signalfuse.java.client.version=${project.version} diff --git a/signalfuse-protoc/pom.xml b/signalfuse-protoc/pom.xml index bc99d898..075240c5 100644 --- a/signalfuse-protoc/pom.xml +++ b/signalfuse-protoc/pom.xml @@ -6,12 +6,12 @@ com.signalfuse.public clients-parent - 1-SNAPSHOT + 0.0.6-SNAPSHOT signalfuse-protoc SignalFuse Protocol Buffer definitions - 1.0-SNAPSHOT + 0.0.6-SNAPSHOT jar diff --git a/update_version.py b/update_version.py new file mode 100644 index 00000000..66d49ba9 --- /dev/null +++ b/update_version.py @@ -0,0 +1,28 @@ +import os +import subprocess +from subprocess import PIPE +import sys +import logging + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) +logger = logging.getLogger(__name__) + +version = sys.argv[1] + +def execute(cmd, expected_code=None, stdin=None, background=False): + logger.debug("Executing %s", cmd) + proc = subprocess.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + if background: + return ('', '', 0) # In background + stdout, stderr = proc.communicate(stdin) + logger.debug("Result (%s, %s, %d)", stdout, stderr, proc.returncode) + ret = (stdout, stderr, proc.returncode) + if expected_code is not None and expected_code != ret[2]: + raise ExecuteError("Unable to execute command %s, result %s/%s/%d", ret[0], ret[1], ret[2]) + return ret + +(stdout, _, code) = execute(["find", os.getcwd(), '-name', 'pom.xml'], expected_code=0) +for dir in stdout.strip().split("\n"): + dirname = os.path.dirname(dir) + os.chdir(dirname) + (stdout, _, code) = execute(["mvn", "versions:set", 'versions:update-child-modules', '-DnewVersion=%s' % version], expected_code=0)