Skip to content

Commit

Permalink
Merge 4918ba8 into a8ac5e2
Browse files Browse the repository at this point in the history
  • Loading branch information
tilogaat committed Jan 19, 2015
2 parents a8ac5e2 + 4918ba8 commit a46823f
Show file tree
Hide file tree
Showing 14 changed files with 441 additions and 95 deletions.
Expand Up @@ -33,6 +33,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.rackspacecloud.blueflood.utils.InMemoryMetadataIO;

import java.io.DataInputStream;
import java.io.DataOutputStream;
Expand Down Expand Up @@ -269,52 +270,7 @@ public void testPersistence() throws Exception {
Assert.assertEquals("zzzzz", cache1.get(l1, "zee"));
}

private static class InMemoryMetadataIO implements MetadataIO {
private final Table<Locator, String, String> backingTable = Tables.newCustomTable(
Maps.<Locator, Map<String, String>>newHashMap(),
new Supplier<Map<String, String>>() {
@Override
public Map<String, String> get() {
return Maps.newHashMap();
}
}
);

@Override
public void put(Locator locator, String key, String value) throws IOException {
backingTable.put(locator, key, value);
}

@Override
public Map<String, String> getAllValues(Locator locator) throws IOException {
return backingTable.row(locator);
}

@Override
public Table<Locator, String, String> getAllValues(Set<Locator> locators) throws IOException {
Table<Locator, String, String> results = HashBasedTable.create();

for (Locator locator : locators) {
Map<String, String> metaForLoc = backingTable.row(locator);
for (Map.Entry<String, String> meta : metaForLoc.entrySet()) {
results.put(locator, meta.getKey(), meta.getValue());
}
}

return results;
}

@Override
public void putAll(Table<Locator, String, String> meta) throws IOException {
backingTable.putAll(meta);
}

@Override
public int getNumberOfRowsTest() throws IOException {
return backingTable.rowKeySet().size();
}
}

@Parameterized.Parameters
public static Collection<Object[]> getIOs() {
List<Object[]> ios = new ArrayList<Object[]>();
Expand Down
Expand Up @@ -94,4 +94,45 @@ public void testBatchedReads() throws Exception {
Assert.assertEquals(1, metrics.getData().getPoints().size());
}
}

@Test
public void testCanRetrieveNumericMetricsEvenIfNoMetaDataStored() throws Exception {
// Write metrics and also persist their types.
List<Locator> locatorList = new ArrayList<Locator>();
Metric metric = writeMetric("string_metric", "version 1.0.43342346");
MetadataCache.getInstance().put(metric.getLocator(), MetricMetadata.TYPE.name().toLowerCase(), DataType.STRING.toString());
locatorList.add(metric.getLocator());

metric = writeMetric("int_metric", 45);
locatorList.add(metric.getLocator());

metric = writeMetric("long_metric", 67L);
locatorList.add(metric.getLocator());

// Test batch reads
AstyanaxReader reader = AstyanaxReader.getInstance();
Map<Locator, MetricData> results = reader.getDatapointsForRange(locatorList, new Range(metric.getCollectionTime() - 100000,
metric.getCollectionTime() + 100000), Granularity.FULL);

Assert.assertEquals(locatorList.size(), results.size());

for (Locator locator : locatorList) {
MetricData metrics = results.get(locator);
Assert.assertEquals(1, metrics.getData().getPoints().size());
}
}

@Test
public void test_StringMetrics_WithoutMetadata_NotRetrieved() throws Exception {
List<Locator> locatorList = new ArrayList<Locator>();
Metric metric = writeMetric("string_metric_1", "version 1.0.43342346");
locatorList.add(metric.getLocator());

// Test batch reads
AstyanaxReader reader = AstyanaxReader.getInstance();
Map<Locator, MetricData> results = reader.getDatapointsForRange(locatorList, new Range(metric.getCollectionTime() - 100000,
metric.getCollectionTime() + 100000), Granularity.FULL);

Assert.assertEquals(0, results.size());
}
}
Expand Up @@ -26,6 +26,8 @@
import junit.framework.Assert;
import org.junit.Test;

import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -107,11 +109,11 @@ public void setUp() throws Exception {
}

@Test
public void testNormalMetrics() throws IOException {
public void testNormalMetrics() throws Exception {
// full res has 5 samples.
Assert.assertEquals(5, reader.getDataToRoll(SimpleNumber.class,
normalLocator,
range,
range,
CassandraModel.CF_METRICS_FULL).getPoints().size());

// assert nothing in 5m for this locator.
Expand All @@ -124,6 +126,7 @@ public void testNormalMetrics() throws IOException {
SingleRollupReadContext rc = new SingleRollupReadContext(normalLocator, range, Granularity.MIN_5);
RollupBatchWriter batchWriter = new RollupBatchWriter(new ThreadPoolBuilder().build(), rec);
RollupRunnable rr = new RollupRunnable(rec, rc, batchWriter);
rr.discoveryHandler = mock(DiscoveryIO.class);
rr.run();

while (!rec.doneReading() && !rec.doneWriting()) {
Expand All @@ -134,6 +137,9 @@ public void testNormalMetrics() throws IOException {
}
}

//Zero interactions because no current listeners for the event
verifyZeroInteractions(rr.discoveryHandler);

// assert something in 5m for this locator.
Assert.assertEquals(1, reader.getDataToRoll(BasicRollup.class,
normalLocator,
Expand Down Expand Up @@ -179,7 +185,7 @@ private void testRolledupMetric(Locator locator, Class fullResClass, Class rollu
RollupBatchWriter batchWriter = new RollupBatchWriter(new ThreadPoolBuilder().build(), rec);
RollupRunnable rr = new RollupRunnable(rec, rc, batchWriter);
rr.run();

// assert something in 5m for this locator.
while (!rec.doneReading() && !rec.doneWriting()) {
batchWriter.drainBatch();
Expand Down
Expand Up @@ -108,12 +108,16 @@ protected static Keyspace getKeyspace() {
}

protected AbstractSerializer serializerFor(RollupType rollupType, DataType dataType, Granularity gran) {
if (dataType == null) {
return NumericSerializer.serializerFor(RollupType.classOf(rollupType, gran));
}

if (dataType.equals(DataType.STRING)) {
return StringSerializer.get();
} else if (dataType.equals(DataType.BOOLEAN)) {
return BooleanSerializer.get();
} else {
return NumericSerializer.serializerFor(RollupType.classOf(rollupType, gran));
return NumericSerializer.serializerFor(RollupType.classOf(rollupType, gran));
}
}
}
Expand Up @@ -298,19 +298,6 @@ public <T extends Rollup> Points<T> getDataToRoll(Class<T> type, Locator locator
return points;
}

public static String getUnitString(Locator locator) {
String unitString = null;
try {
unitString = metaCache.get(locator, MetricMetadata.UNIT.name().toLowerCase(), String.class);
} catch (CacheException ex) {
log.warn("Cache exception reading unitString from MetadataCache: ", ex);
}
if (unitString == null) {
unitString = UNKNOWN;
}
return unitString;
}

public static String getType(Locator locator) {
String type = null;
try {
Expand All @@ -326,9 +313,14 @@ public static String getType(Locator locator) {

public MetricData getDatapointsForRange(Locator locator, Range range, Granularity gran) {
try {
//TODO: If we stop processing string metrics, we can get rid of this and always return numeric
//Questions: Do we care about pre-agg types
Object type = metaCache.get(locator, dataTypeCacheKey);
RollupType rollupType = RollupType.fromString(metaCache.get(locator, rollupTypeCacheKey));

if (rollupType == null) {
rollupType = RollupType.BF_BASIC;
}
if (type == null) {
return getNumericOrStringRollupDataForRange(locator, range, gran, rollupType);
}
Expand Down Expand Up @@ -364,9 +356,17 @@ public Map<Locator, MetricData> getDatapointsForRange(List<Locator> locators, Ra
for (Locator locator : locators) {
try {
RollupType rollupType = RollupType.fromString((String)
metaCache.get(locator, MetricMetadata.ROLLUP_TYPE.name().toLowerCase()));
DataType dataType = new DataType((String)
metaCache.get(locator, MetricMetadata.TYPE.name().toLowerCase()));
metaCache.get(locator, MetricMetadata.ROLLUP_TYPE.name().toLowerCase()));
if (rollupType == null) {
rollupType = RollupType.BF_BASIC;
}
//TODO: If we stop processing string and boolean, we can always hardcode this to numeric
DataType dataType = getDataType(locator, MetricMetadata.TYPE.name().toLowerCase());

if (dataType == null) {
dataType = DataType.INT;
}

ColumnFamily cf = CassandraModel.getColumnFamily(rollupType, dataType, gran);
List<Locator> locs = locatorsByCF.get(cf);
locs.add(locator);
Expand All @@ -375,13 +375,13 @@ public Map<Locator, MetricData> getDatapointsForRange(List<Locator> locators, Ra
}
}

for (ColumnFamily CF : locatorsByCF.keySet()) {
for (ColumnFamily CF : locatorsByCF.keySet()) {
List<Locator> locs = locatorsByCF.get(CF);
Map<Locator, ColumnList<Long>> metrics = getColumnsFromDB(locs, CF, range);
// transform columns to MetricData
for (Locator loc : metrics.keySet()) {
MetricData data = transformColumnsToMetricData(loc, metrics.get(loc), gran);
if (data != null) {
if (data != null && !data.getData().isEmpty()) {
results.put(loc, data);
}
}
Expand All @@ -398,7 +398,7 @@ public MetricData getHistogramsForRange(Locator locator, Range range, Granularit

ColumnFamily cf = CassandraModel.getColumnFamily(HistogramRollup.class, granularity);
Points<HistogramRollup> histogramRollupPoints = getDataToRoll(HistogramRollup.class, locator, range, cf);
return new MetricData(histogramRollupPoints, getUnitString(locator), MetricData.Type.HISTOGRAM);
return new MetricData(histogramRollupPoints, null, MetricData.Type.HISTOGRAM);
}

// Used for string metrics
Expand All @@ -415,7 +415,7 @@ private MetricData getStringMetricDataForRange(Locator locator, Range range, Gra
}
}

return new MetricData(points, getUnitString(locator), MetricData.Type.STRING);
return new MetricData(points, null, MetricData.Type.STRING);
}

private MetricData getBooleanMetricDataForRange(Locator locator, Range range, Granularity gran) {
Expand All @@ -431,7 +431,7 @@ private MetricData getBooleanMetricDataForRange(Locator locator, Range range, Gr
}
}

return new MetricData(points, getUnitString(locator), MetricData.Type.BOOLEAN);
return new MetricData(points, null, MetricData.Type.BOOLEAN);
}

// todo: replace this with methods that pertain to type (which can be used to derive a serializer).
Expand All @@ -453,7 +453,7 @@ private MetricData getNumericMetricDataForRange(Locator locator, Range range, Gr
}
}

return new MetricData(points, getUnitString(locator), MetricData.Type.NUMBER);
return new MetricData(points, null, MetricData.Type.NUMBER);
}

// gets called when we DO NOT know what the data type is (numeric, string, etc.)
Expand All @@ -473,8 +473,10 @@ private MetricData transformColumnsToMetricData(Locator locator, ColumnList<Long
Granularity gran) {
try {
RollupType rollupType = RollupType.fromString(metaCache.get(locator, rollupTypeCacheKey));
DataType dataType = new DataType(metaCache.get(locator, dataTypeCacheKey));
String unit = getUnitString(locator);
//TODO: if we stop processing string metrics, can we get this info from somewhere else?
//Just return type numeric by default for now
DataType dataType = getDataType(locator, dataTypeCacheKey);
String unit = null;
MetricData.Type outputType = MetricData.Type.from(rollupType, dataType);
Points points = getPointsFromColumns(columns, rollupType, dataType, gran);
MetricData data = new MetricData(points, unit, outputType);
Expand All @@ -484,6 +486,16 @@ private MetricData transformColumnsToMetricData(Locator locator, ColumnList<Long
}
}

private DataType getDataType(Locator locator, String dataTypeCacheKey) throws CacheException{
String meta = metaCache.get(locator, dataTypeCacheKey);
DataType dataType = null;
if (meta != null) {
dataType = new DataType(meta);
}

return dataType;
}

private Points getPointsFromColumns(ColumnList<Long> columnList, RollupType rollupType,
DataType dataType, Granularity gran) {
Points points = new Points();
Expand Down
Expand Up @@ -163,8 +163,11 @@ public static ColumnFamily getColumnFamily(Class<? extends Rollup> type, Granula
}

public static ColumnFamily getColumnFamily(RollupType type, DataType dataType, Granularity gran) {
if (type == RollupType.BF_BASIC &&
(dataType.equals(DataType.BOOLEAN) || dataType.equals(DataType.STRING))) {
if (dataType == null) {
dataType = DataType.INT;
}

if (type == RollupType.BF_BASIC && (dataType.equals(DataType.BOOLEAN) || dataType.equals(DataType.STRING))) {
return CF_METRICS_STRING;
}

Expand Down
Expand Up @@ -22,7 +22,7 @@

public class MetricData {
private final Points data;
private final String unit;
private String unit;
private final Type type;

public MetricData(Points points, String unit, Type type) {
Expand All @@ -35,8 +35,12 @@ public Points getData() {
return data;
}

public void setUnit(String unit) {
this.unit = unit;
}

public String getUnit() {
return unit;
return this.unit;
}

public String getType() {
Expand All @@ -61,6 +65,11 @@ public String toString() {
}

public static Type from(RollupType rollupType, DataType dataType) {
// We no longer store datatype metadata for Numeric datatypes
if (dataType == null) {
return NUMBER;
}

if (dataType.equals(DataType.STRING)) {
return STRING;
} else if (dataType.equals(DataType.BOOLEAN)) {
Expand Down

0 comments on commit a46823f

Please sign in to comment.