Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timer sum double #442

Merged
merged 7 commits into from Mar 4, 2015
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -51,7 +51,7 @@ public class PreaggregatedMetricsIntegrationTest extends IntegrationTestBase {
public void createFixtures() throws Exception {
simple = new TimerRollup()
.withSampleCount(1)
.withSum(100L)
.withSum(100d)
.withCountPS(101d)
.withAverage(102L)
.withVariance(103d)
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void setUp() throws Exception {
.withMaxValue(100 - i)
.withMinValue(100 - i - i)
.withAverage(i / 2)
.withCountPS((double)i).withSum(2 * i)
.withCountPS((double)i).withSum(Double.valueOf(2 * i))
.withVariance((double) i / 2d);
metric = new PreaggregatedMetric(time, timerLocator, ttl, timer);
preaggregatedMetrics.add(metric);
Expand Down
Expand Up @@ -31,6 +31,8 @@ public class Constants {
public static final byte VERSION_1_ROLLUP = 0;
public static final byte VERSION_1_HISTOGRAM = 0;
public static final byte VERSION_1_TIMER = 0;
public static final byte VERSION_2_TIMER = 1;

public static final byte VERSION_1_COUNTER_ROLLUP = 0;
public static final byte VERSION_1_SET_ROLLUP = VERSION_1_ROLLUP; // don't change this.

Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.rackspacecloud.blueflood.io.serializers;

import com.codahale.metrics.Histogram;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.netflix.astyanax.serializers.AbstractSerializer;
Expand All @@ -41,10 +42,6 @@
import java.nio.ByteBuffer;
import java.util.Map;

import static com.rackspacecloud.blueflood.io.Constants.VERSION_1_FULL_RES;
import static com.rackspacecloud.blueflood.io.Constants.VERSION_1_ROLLUP;
import static com.rackspacecloud.blueflood.io.Constants.VERSION_1_TIMER;

import static com.rackspacecloud.blueflood.io.Constants.*;

public class NumericSerializer {
Expand Down Expand Up @@ -167,7 +164,7 @@ public static Number getUnversionedDoubleOrLong(CodedInputStream in) throws IOEx
private static void serializeFullResMetric(Object o, byte[] buf) throws IOException {
CodedOutputStream protobufOut = CodedOutputStream.newInstance(buf);
byte type = typeOf(o);
fullResSize.update(sizeOf(o, type));
fullResSize.update(sizeOf(o, type, VERSION_2_TIMER));
protobufOut.writeRawByte(Constants.VERSION_1_FULL_RES);

switch (type) {
Expand All @@ -192,7 +189,8 @@ private static void serializeFullResMetric(Object o, byte[] buf) throws IOExcept
}
}

private static int sizeOf(Object o, byte type) throws IOException {
private static int sizeOf(Object o, byte type, byte timerVersion)
throws IOException {
int sz = 0;
switch (type) {
case Constants.B_I32:
Expand All @@ -216,10 +214,10 @@ private static int sizeOf(Object o, byte type) throws IOException {
BasicRollup basicRollup = (BasicRollup)o;
sz += CodedOutputStream.computeRawVarint64Size(basicRollup.getCount());
if (basicRollup.getCount() > 0) {
sz += sizeOf(basicRollup.getAverage(), Type.B_ROLLUP_STAT);
sz += sizeOf(basicRollup.getVariance(), Type.B_ROLLUP_STAT);
sz += sizeOf(basicRollup.getMinValue(), Type.B_ROLLUP_STAT);
sz += sizeOf(basicRollup.getMaxValue(), Type.B_ROLLUP_STAT);
sz += sizeOf(basicRollup.getAverage(), Type.B_ROLLUP_STAT, timerVersion);
sz += sizeOf(basicRollup.getVariance(), Type.B_ROLLUP_STAT, timerVersion);
sz += sizeOf(basicRollup.getMinValue(), Type.B_ROLLUP_STAT, timerVersion);
sz += sizeOf(basicRollup.getMaxValue(), Type.B_ROLLUP_STAT, timerVersion);
}
break;
case Type.B_SET:
Expand All @@ -240,14 +238,21 @@ private static int sizeOf(Object o, byte type) throws IOException {
case Type.B_TIMER:
sz += 1; // version
TimerRollup rollup = (TimerRollup)o;
sz += CodedOutputStream.computeRawVarint64Size(rollup.getSum());
if (timerVersion == VERSION_1_TIMER) {
sz += CodedOutputStream.computeRawVarint64Size((long) rollup.getSum());
} else if (timerVersion == VERSION_2_TIMER) {

sz += CodedOutputStream.computeDoubleSizeNoTag(rollup.getSum());
} else {
throw new SerializationException(String.format("Unexpected serialization version: %d", (int)timerVersion));
}
sz += CodedOutputStream.computeRawVarint64Size(rollup.getCount());
sz += CodedOutputStream.computeDoubleSizeNoTag(rollup.getRate());
sz += CodedOutputStream.computeRawVarint32Size(rollup.getSampleCount());
sz += sizeOf(rollup.getAverage(), Type.B_ROLLUP_STAT);
sz += sizeOf(rollup.getMaxValue(), Type.B_ROLLUP_STAT);
sz += sizeOf(rollup.getMinValue(), Type.B_ROLLUP_STAT);
sz += sizeOf(rollup.getVariance(), Type.B_ROLLUP_STAT);
sz += sizeOf(rollup.getAverage(), Type.B_ROLLUP_STAT, timerVersion);
sz += sizeOf(rollup.getMaxValue(), Type.B_ROLLUP_STAT, timerVersion);
sz += sizeOf(rollup.getMinValue(), Type.B_ROLLUP_STAT, timerVersion);
sz += sizeOf(rollup.getVariance(), Type.B_ROLLUP_STAT, timerVersion);

Map<String, TimerRollup.Percentile> percentiles = rollup.getPercentiles();
sz += CodedOutputStream.computeRawVarint32Size(rollup.getPercentiles().size());
Expand All @@ -269,7 +274,7 @@ private static int sizeOf(Object o, byte type) throws IOException {

case Type.B_GAUGE:
// just like rollup up until a point.
sz += sizeOf(o, Type.B_ROLLUP);
sz += sizeOf(o, Type.B_ROLLUP, timerVersion);

// here's where it gets different.
GaugeRollup gauge = (GaugeRollup)o;
Expand Down Expand Up @@ -332,14 +337,21 @@ private static SetRollup deserializeV1SetRollup(CodedInputStream in) throws IOEx
}
return rollup;
}
private static void serializeTimer(TimerRollup rollup, byte[] buf) throws IOException {

private static void serializeTimer(TimerRollup rollup, byte[] buf, byte timerVersion) throws IOException {
CodedOutputStream out = CodedOutputStream.newInstance(buf);
timerRollupSize.update(buf.length);
out.writeRawByte(Constants.VERSION_1_TIMER);
out.writeRawByte(timerVersion);

// sum, count, countps, avg, max, min, var
out.writeRawVarint64(rollup.getSum());
if (timerVersion == VERSION_1_TIMER) {
out.writeRawVarint64((long)rollup.getSum());
} else if (timerVersion == VERSION_2_TIMER) {
out.writeDoubleNoTag(rollup.getSum());
} else {
throw new SerializationException(String.format("Unexpected serialization version: %d", (int)timerVersion));
}

out.writeRawVarint64(rollup.getCount());
out.writeDoubleNoTag(rollup.getRate());
out.writeRawVarint32(rollup.getSampleCount());
Expand All @@ -357,9 +369,18 @@ private static void serializeTimer(TimerRollup rollup, byte[] buf) throws IOExce
}
}

private static TimerRollup deserializeV1Timer(CodedInputStream in) throws IOException {
private static TimerRollup deserializeTimer(CodedInputStream in, byte timerVersion) throws IOException {
// note: type and version have already been read.
final long sum = in.readRawVarint64();
final double sum;
if (timerVersion == VERSION_1_TIMER) {
sum = in.readRawVarint64();
} else if (timerVersion == VERSION_2_TIMER) {
sum = in.readDouble();
} else {
throw new SerializationException(String.format("Unexpected serialization version: %d", (int)timerVersion));
}


final long count = in.readRawVarint64();
final double countPs = in.readDouble();
final int sampleCount = in.readRawVarint32();
Expand Down Expand Up @@ -535,7 +556,7 @@ public static class RawSerializer extends AbstractSerializer<Object> {
public ByteBuffer toByteBuffer(Object o) {
try {
byte type = typeOf(o);
byte[] buf = new byte[sizeOf(o, type)];
byte[] buf = new byte[sizeOf(o, type, VERSION_2_TIMER)];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit strange to pass in the timer version while serializing non-timer rollups like gauges, counters etc. What do you think about having two overloaded methods named sizeOf with one accepting just two arguments i.e (Object, Type) which will be used by all non-timer rollup types and then the other one with timerVersion as the third argument, which gets called by the timer serializer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is much cleaner. I originally thought there was a problem doing it this way, but it seems much better. I'll push some changes shortly.


serializeFullResMetric(o, buf);

Expand Down Expand Up @@ -583,7 +604,7 @@ private static class BasicRollupSerializer extends AbstractSerializer<BasicRollu
public ByteBuffer toByteBuffer(BasicRollup o) {
try {
byte type = typeOf(o);
byte[] buf = new byte[sizeOf(o, type)];
byte[] buf = new byte[sizeOf(o, type, VERSION_2_TIMER)];
serializeRollup(o, buf);
return ByteBuffer.wrap(buf);
} catch(IOException e) {
Expand Down Expand Up @@ -612,22 +633,33 @@ public static class TimerRollupSerializer extends AbstractSerializer<TimerRollup
public ByteBuffer toByteBuffer(TimerRollup o) {
try {
byte type = typeOf(o);
byte[] buf = new byte[sizeOf(o, type)];
serializeTimer(o, buf);
byte[] buf = new byte[sizeOf(o, type, VERSION_2_TIMER)];
serializeTimer(o, buf, VERSION_2_TIMER);
return ByteBuffer.wrap(buf);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}

@VisibleForTesting
public ByteBuffer toByteBufferWithV1Serialization(TimerRollup o) {
try {
byte type = typeOf(o);
byte[] buf = new byte[sizeOf(o, type, VERSION_1_TIMER)];
serializeTimer(o, buf, VERSION_1_TIMER);
return ByteBuffer.wrap(buf);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}


@Override
public TimerRollup fromByteBuffer(ByteBuffer byteBuffer) {
CodedInputStream in = CodedInputStream.newInstance(byteBuffer.array());
try {
byte version = in.readRawByte();
if (version != VERSION_1_TIMER)
throw new SerializationException(String.format("Unexpected serialization version: %d", (int)version));
return deserializeV1Timer(in);
return deserializeTimer(in, version);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
Expand All @@ -640,7 +672,7 @@ public static class SetRollupSerializer extends AbstractSerializer<SetRollup> {
public ByteBuffer toByteBuffer(SetRollup obj) {
try {
byte type = typeOf(obj);
byte[] buf = new byte[sizeOf(obj, type)];
byte[] buf = new byte[sizeOf(obj, type, VERSION_2_TIMER)];
serializeSetRollup(obj, buf);
return ByteBuffer.wrap(buf);
} catch (IOException ex) {
Expand All @@ -667,7 +699,7 @@ public static class GaugeRollupSerializer extends AbstractSerializer<GaugeRollup
public ByteBuffer toByteBuffer(GaugeRollup o) {
try {
byte type = typeOf(o);
byte[] buf = new byte[sizeOf(o, type)];
byte[] buf = new byte[sizeOf(o, type, VERSION_2_TIMER)];
serializeGauge(o, buf);
return ByteBuffer.wrap(buf);
} catch (IOException e) {
Expand Down Expand Up @@ -696,7 +728,7 @@ public static class CounterRollupSerializer extends AbstractSerializer<CounterRo
public ByteBuffer toByteBuffer(CounterRollup obj) {
try {
byte type = typeOf(obj);
byte[] buf = new byte[sizeOf(obj, type)];
byte[] buf = new byte[sizeOf(obj, type, VERSION_2_TIMER)];
serializeCounterRollup(obj, buf);
return ByteBuffer.wrap(buf);
} catch (IOException ex) {
Expand All @@ -717,4 +749,4 @@ public CounterRollup fromByteBuffer(ByteBuffer byteBuffer) {
}
}
}
}
}
Expand Up @@ -11,7 +11,7 @@


public class TimerRollup implements Rollup, IBasicRollup {
private long sum = 0;
private double sum = 0;
private long count = 0;
private double rate = 0;

Expand All @@ -34,7 +34,7 @@ public TimerRollup() {
super();
}

public TimerRollup withSum(long sum) {
public TimerRollup withSum(double sum) {
this.sum = sum;
return this;
}
Expand Down Expand Up @@ -153,7 +153,7 @@ else if (number instanceof Integer)

// per second rate.
public double getRate() { return rate; }
public long getSum() { return sum; }
public double getSum() { return sum; }
public long getCount() { return count; };
public int getSampleCount() { return sampleCount; }

Expand Down
Expand Up @@ -86,7 +86,7 @@ public void testSetSerialization() throws IOException {

@Test
public void testTimerSerialization() throws IOException {
String timerValue = "{\"type\":\"timer\",\"average\":214,\"count\":1,\"max\":214,\"min\":214,\"percentiles\":{\"98\":214,\"99\":214,\"75\":214,\"999\":214,\"50\":214},\"rate\":0.06666666666666667,\"sampleCount\":1,\"sum\":214,\"variance\":0.0}";
String timerValue = "{\"type\":\"timer\",\"average\":214,\"count\":1,\"max\":214,\"min\":214,\"percentiles\":{\"98\":214,\"99\":214,\"75\":214,\"999\":214,\"50\":214},\"rate\":0.06666666666666667,\"sampleCount\":1,\"sum\":214.0,\"variance\":0.0}";

TimerRollup timerDeserialized = mapper.readValue(timerValue, TimerRollup.class);
String timerSerialized = mapper.writeValueAsString(timerDeserialized);
Expand Down
Expand Up @@ -31,12 +31,12 @@
import java.nio.ByteBuffer;

public class TimerSerializationTest {

@Test
public void testV1RoundTrip() throws IOException {
// build up a Timer
TimerRollup r0 = new TimerRollup()
.withSum(42)
.withSum(Double.valueOf(42))
.withCountPS(23.32d)
.withAverage(56)
.withVariance(853.3245d)
Expand All @@ -45,20 +45,52 @@ public void testV1RoundTrip() throws IOException {
.withCount(345);
r0.setPercentile("foo", 741.32d);
r0.setPercentile("bar", 0.0323d);

if (System.getProperty("GENERATE_TIMER_SERIALIZATION") != null) {
OutputStream os = new FileOutputStream("src/test/resources/serializations/timer_version_" + Constants.VERSION_1_TIMER + ".bin", false);
//The V1 serialization is artificially constructed for the purposes of this test and should no longer be used.
os.write(Base64.encodeBase64(new NumericSerializer.TimerRollupSerializer().toByteBufferWithV1Serialization(r0).array()));
os.write("\n".getBytes());
os.close();
}

Assert.assertTrue(new File("src/test/resources/serializations").exists());

int version = 0;

BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/serializations/timer_version_" + version + ".bin"));
ByteBuffer bb = ByteBuffer.wrap(Base64.decodeBase64(reader.readLine().getBytes()));
TimerRollup r1 = new NumericSerializer.TimerRollupSerializer().fromByteBuffer(bb);
Assert.assertEquals(r0, r1);
}

@Test
public void testV2RoundTrip() throws IOException {
// build up a Timer
TimerRollup r0 = new TimerRollup()
.withSum(Double.valueOf(42))
.withCountPS(23.32d)
.withAverage(56)
.withVariance(853.3245d)
.withMinValue(2)
.withMaxValue(987)
.withCount(345);
r0.setPercentile("foo", 741.32d);
r0.setPercentile("bar", 0.0323d);

if (System.getProperty("GENERATE_TIMER_SERIALIZATION") != null) {
OutputStream os = new FileOutputStream("src/test/resources/serializations/timer_version_" + Constants.VERSION_2_TIMER + ".bin", false);
os.write(Base64.encodeBase64(new NumericSerializer.TimerRollupSerializer().toByteBuffer(r0).array()));
os.write("\n".getBytes());
os.close();
}

Assert.assertTrue(new File("src/test/resources/serializations").exists());

// ensure historical reads work.
int version = 0;
int maxVersion = Constants.VERSION_1_TIMER;
int maxVersion = Constants.VERSION_2_TIMER;

int count = 0;
while (version <= maxVersion) {
BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/serializations/timer_version_" + version + ".bin"));
Expand All @@ -68,8 +100,7 @@ public void testV1RoundTrip() throws IOException {
count++;
version++;
}

Assert.assertTrue("Nothing was tested", count > 0);
}

}
}
Expand Up @@ -52,7 +52,7 @@ public void testTimerRollupSerialization() {
rollup.withMaxValue(20);
rollup.withMinValue(5);
rollup.withVariance(12);
rollup.withSum(10);
rollup.withSum(Double.valueOf(10));
rollup.withCountPS(30);
//Get the JSON object node from Rollup
ObjectNode resultNode = RollupSerializationHelper.rollupToJson(rollup);
Expand All @@ -61,7 +61,7 @@ public void testTimerRollupSerialization() {
Assert.assertEquals(resultNode.get("mean").asLong(), rollup.getAverage().toLong());
Assert.assertEquals(resultNode.get("var").asDouble(), rollup.getVariance().toDouble());
Assert.assertEquals(resultNode.get("count").asLong(), rollup.getCount());
Assert.assertEquals(resultNode.get("sum").asLong(), rollup.getSum());
Assert.assertEquals(resultNode.get("sum").asDouble(), rollup.getSum());
Assert.assertEquals(resultNode.get("rate").asDouble(), rollup.getRate());
}

Expand Down