Permalink
Browse files

replaced ExponentiallyDecayingReservoir with SlidingTimeWindowReservoir

  • Loading branch information...
1 parent bcdaa71 commit 486949c7eccd6e770bf036b639bae5dbed171ae6 @mikedanese mikedanese committed Dec 31, 2013
@@ -1,16 +1,16 @@
-package com.raskol.tradebot.math;
+package com.raskol.tradebot.metrics;
import com.codahale.metrics.*;
import com.google.common.base.Joiner;
import java.util.ArrayList;
-public class ExponentiallyDecayingHistogram implements Metric, Sampling, Counting {
- private final ExponentiallyDecayingReservoir reservoir;
+public class CompleteHistogram implements Metric, Sampling, Counting {
+ private final SlidingTimeWindowReservoir reservoir;
private final String label;
- private final com.raskol.tradebot.math.LongAdder count;
+ private final LongAdder count;
- public ExponentiallyDecayingHistogram(ExponentiallyDecayingReservoir reservoir, String label) {
+ public CompleteHistogram(SlidingTimeWindowReservoir reservoir, String label) {
this.reservoir = reservoir;
this.label = label;
this.count = new LongAdder();
@@ -30,11 +30,6 @@ public void update(long value) {
reservoir.update(value);
}
- public void update(long value, long timestamp) {
- count.increment();
- reservoir.update(value, timestamp);
- }
-
@Override
public long getCount() {
return count.sum();
@@ -48,9 +43,10 @@ public Snapshot getSnapshot() {
@Override
public String toString() {
final ArrayList<String> sb = new ArrayList<>();
- sb.add(String.format("%s:", getLabel()));
- sb.add(String.format(" count = %d", getCount()));
final Snapshot snapshot = reservoir.getSnapshot();
+
+ sb.add(String.format("%s:", getLabel()));
+ sb.add(String.format(" count = %d", snapshot.getValues().length));
sb.add(String.format(" min = %d", snapshot.getMin()));
sb.add(String.format(" max = %d", snapshot.getMax()));
sb.add(String.format(" mean = %2.2f", snapshot.getMean()));
@@ -61,6 +57,7 @@ public String toString() {
sb.add(String.format(" 98%% <= %2.2f", snapshot.get98thPercentile()));
sb.add(String.format(" 99%% <= %2.2f", snapshot.get99thPercentile()));
sb.add(String.format(" 99.9%% <= %2.2f", snapshot.get999thPercentile()));
+
return Joiner.on("\n").join(sb);
}
}
@@ -8,11 +8,12 @@
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?revision=1.14&view=markup
*/
-package com.raskol.tradebot.math;
+package com.raskol.tradebot.metrics;
import java.io.Serializable;
// CHECKSTYLE:OFF
+
/**
* One or more variables that together maintain an initially zero {@code long} sum. When updates
* (method {@link #add}) are contended across threads, the set of variables may grow dynamically to
@@ -0,0 +1,91 @@
+package com.raskol.tradebot.metrics;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Snapshot;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SlidingTimeWindowReservoir implements Reservoir {
+ // allow for this many duplicate ticks before overwriting measurements
+ private static final int COLLISION_BUFFER = 256;
+ // only trim on updating once every N
+ private static final int TRIM_THRESHOLD = 256;
+
+ private final Clock clock;
+ private final ConcurrentSkipListMap<Long, Long> measurements;
+ private final long window;
+ private final AtomicLong lastTick;
+ private final AtomicLong count;
+
+ /**
+ * Creates a new {@link SlidingTimeWindowReservoir} with the given window of time.
+ *
+ * @param window the window of time
+ * @param windowUnit the unit of {@code window}
+ */
+ public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit) {
+ this(window, windowUnit, Clock.defaultClock());
+ }
+
+ /**
+ * Creates a new {@link SlidingTimeWindowReservoir} with the given clock and window of time.
+ *
+ * @param window the window of time
+ * @param windowUnit the unit of {@code window}
+ * @param clock the {@link Clock} to use
+ */
+ public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit, Clock clock) {
+ this.clock = clock;
+ this.measurements = new ConcurrentSkipListMap<Long, Long>();
+ this.window = windowUnit.toNanos(window) * COLLISION_BUFFER;
+ this.lastTick = new AtomicLong();
+ this.count = new AtomicLong();
+ }
+
+ @Override
+ public int size() {
+ trim();
+ return measurements.size();
+ }
+
+ @Override
+ public void update(long value) {
+ if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
+ trim();
+ }
+ measurements.put(getTick(), value);
+ }
+
+ public void update(long value, long timestamp) {
+ if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
+ trim();
+ }
+ measurements.put(timestamp, value);
+ }
+
+ @Override
+ public Snapshot getSnapshot() {
+ trim();
+ return new Snapshot(measurements.values());
+ }
+
+ private long getTick() {
+ for (; ; ) {
+ final long oldTick = lastTick.get();
+ final long tick = clock.getTick() * COLLISION_BUFFER;
+ // ensure the tick is strictly incrementing even if there are duplicate ticks
+ final long newTick = tick > oldTick ? tick : oldTick + 1;
+ if (lastTick.compareAndSet(oldTick, newTick)) {
+ return newTick;
+ }
+ }
+ }
+
+ private void trim() {
+ measurements.headMap(getTick() - window).clear();
+ }
+}
+
@@ -6,11 +6,12 @@
*
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.8&view=markup
*/
-package com.raskol.tradebot.math;
+package com.raskol.tradebot.metrics;
import java.util.Random;
// CHECKSTYLE:OFF
+
/**
* A package-local class holding common representation and mechanics for classes supporting dynamic
* striping on 64bit values. The class extends Number so that concrete subclasses must publicly do
@@ -2,6 +2,7 @@
import com.google.common.base.Joiner;
import com.raskol.tradebot.service.TickerService;
+import com.yammer.metrics.annotation.Timed;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -16,7 +17,7 @@ public TickerResource(TickerService tickerService) {
}
@GET
- @Produces("text/plain")
+ @Produces("text/plain") @Timed
public String getInfos() {
return Joiner.on("\n\n\n").join(tickerService.getHistograms());
}
@@ -1,8 +1,9 @@
package com.raskol.tradebot.service;
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
import com.google.common.util.concurrent.AbstractScheduledService;
-import com.raskol.tradebot.math.ExponentiallyDecayingHistogram;
+import com.raskol.tradebot.metrics.CompleteHistogram;
+import com.raskol.tradebot.metrics.SlidingTimeWindowReservoir;
import com.xeiam.xchange.ExchangeFactory;
import com.xeiam.xchange.bitstamp.BitstampExchange;
import com.xeiam.xchange.bitstamp.service.polling.BitstampPollingMarketDataService;
@@ -16,46 +17,42 @@
import java.util.concurrent.TimeUnit;
public class TickerService extends AbstractScheduledService {
- private static final int INTERVAL = 5;
- public static final int SAMPLE_SIZE = 1024;
-
private final PollingMarketDataService marketDataService = ExchangeFactory.INSTANCE
.createExchange(BitstampExchange.class.getName()).getPollingMarketDataService();
- private final ExponentiallyDecayingHistogram oneMinute = getExponentiallyDecayingHistogram("one minute", SAMPLE_SIZE, getAlpha(1, TimeUnit.MINUTES));
- private final ExponentiallyDecayingHistogram oneHour = getExponentiallyDecayingHistogram("one hour", SAMPLE_SIZE, getAlpha(1, TimeUnit.HOURS));
- private final ExponentiallyDecayingHistogram oneDay = getExponentiallyDecayingHistogram("one day", SAMPLE_SIZE, getAlpha(1, TimeUnit.DAYS));
- private final ExponentiallyDecayingHistogram fiveDays = getExponentiallyDecayingHistogram("five days", SAMPLE_SIZE, getAlpha(5, TimeUnit.DAYS));
- private final ExponentiallyDecayingHistogram oneMonth = getExponentiallyDecayingHistogram("one month", SAMPLE_SIZE, getAlpha(30, TimeUnit.DAYS));
- private final List<ExponentiallyDecayingHistogram> histograms = Arrays.asList(oneMinute, oneHour, oneDay, fiveDays, oneMonth);
+ private final CompleteHistogram oneMinute = getExponentiallyDecayingHistogram("one minute", 1, TimeUnit.MINUTES);
+ private final CompleteHistogram oneHour = getExponentiallyDecayingHistogram("one hour", 1, TimeUnit.HOURS);
+ private final CompleteHistogram oneDay = getExponentiallyDecayingHistogram("one day", 1, TimeUnit.DAYS);
+ private final CompleteHistogram fiveDays = getExponentiallyDecayingHistogram("five days", 5, TimeUnit.DAYS);
+ private final CompleteHistogram oneMonth = getExponentiallyDecayingHistogram("one month", 30, TimeUnit.DAYS);
+ private final List<CompleteHistogram> histograms = Arrays.asList(oneMinute, oneHour, oneDay, fiveDays, oneMonth);
@Override
protected void runOneIteration() throws Exception {
Trades trades = marketDataService.getTrades(Currencies.BTC, Currencies.USD, BitstampPollingMarketDataService.BitstampTime.MINUTE);
for (Trade trade : trades.getTrades()) {
- final long time = trade.getTimestamp().getTime();
final long price = trade.getPrice().getAmountMinorLong();
- for (ExponentiallyDecayingHistogram histogram : histograms) {
- histogram.update(price, time);
+ for (CompleteHistogram histogram : histograms) {
+ histogram.update(price);
}
}
}
+ public static void main(String[] args) {
+ new Histogram(new SlidingTimeWindowReservoir(1, TimeUnit.DAYS));
+ }
+
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.MINUTES);
}
- private double getAlpha(Integer duration, TimeUnit timeUnit) {
- return 1 - Math.exp(-INTERVAL / timeUnit.toSeconds(duration));
- }
-
- private ExponentiallyDecayingHistogram getExponentiallyDecayingHistogram(String label, int sampleSize, double alpha) {
- return new ExponentiallyDecayingHistogram(new ExponentiallyDecayingReservoir(sampleSize, alpha), label);
+ private CompleteHistogram getExponentiallyDecayingHistogram(String label, int duration, TimeUnit timeUnit) {
+ return new CompleteHistogram(new SlidingTimeWindowReservoir(duration, timeUnit), label);
}
- public List<ExponentiallyDecayingHistogram> getHistograms() {
+ public List<CompleteHistogram> getHistograms() {
return histograms;
}
}

0 comments on commit 486949c

Please sign in to comment.