Skip to content

Commit

Permalink
Modify the TsdbQuery scanner to stop at 1024 calls to .nextRows() so …
Browse files Browse the repository at this point in the history
…that we

can continue in a different thread. It has a thread pool for now but I'd like
to eventually share that with Netty. This avoids the stack overflow problem
in OpenTSDB#334 where the callback chain simply grew too long.
  • Loading branch information
manolama committed May 28, 2015
1 parent 86f4cb1 commit 24d80eb
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/core/TSDB.java
Expand Up @@ -808,6 +808,8 @@ public Object call(ArrayList<Object> compactions) throws Exception {
}
}

TsdbQuery.thread_pool.shutdown();

// wait for plugins to shutdown before we close the client
return deferreds.size() > 0
? Deferred.group(deferreds).addCallbacks(new HClientShutdown(),
Expand Down
128 changes: 99 additions & 29 deletions src/core/TsdbQuery.java
Expand Up @@ -21,6 +21,9 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +48,14 @@ final class TsdbQuery implements Query {

private static final Logger LOG = LoggerFactory.getLogger(TsdbQuery.class);

/** Used when a query would cause a stack overflow due to async recursion */
protected static final ExecutorService thread_pool =
Executors.newCachedThreadPool();

/** The number of times we call {@code scanner.nextRows()} before continuing
* the scan in a new thread to avoid overflowing this thread's stack */
private static final int SCAN_RECURSION_DEPTH = 1024;

/** Used whenever there are no results. */
private static final DataPoints[] NO_RESULT = new DataPoints[0];

Expand Down Expand Up @@ -352,31 +363,54 @@ private Deferred<TreeMap<byte[], Span>> findSpans() throws HBaseException {
final Scanner scanner = getScanner();
final Deferred<TreeMap<byte[], Span>> results =
new Deferred<TreeMap<byte[], Span>>();
final long scanner_start_time = System.currentTimeMillis();
final AtomicLong rows_with_data = new AtomicLong();
final byte[] seen_annotations = new byte[] { 0 };

/**
* Scanner callback executed recursively each time we get a set of data
* from storage. This is responsible for determining what columns are
* returned and issuing requests to load leaf objects.
* When the scanner returns a null set of rows, the method initiates the
* final callback.
* Scanner callback executed recursively each time we get rows of datapoints
* from storage. Each time we get a set of rows, we look to see if the
* scanner is complete, i.e. rows == null, or if we have reached the maximum
recursion depth. If we have hit the max depth, then we will call
{@code scan_completed} with {@code false} to let the Recursion callback
know that it should fire up another ScannerCB in a separate thread so this
one can unwind its stack.
*/
final class ScannerCB implements Callback<Object,
final class ScannerCB extends Thread implements Callback<Deferred<Boolean>,
ArrayList<ArrayList<KeyValue>>> {

int nrows = 0;
boolean seenAnnotation = false;
int hbase_time = 0; // milliseconds.
long starttime = System.nanoTime();
private final Deferred<Boolean> scan_completed = new Deferred<Boolean>();
private int scan_iterations = 0;
private int nrows = 0;
private byte had_annotation = 0;

/**
* Ctor for the callback class
* @param completed_callback The callback to trigger when we're done or
* hit the max recursion limit.
*/
public ScannerCB(final Callback<Object, Boolean> completed_callback) {
scan_completed.addCallback(completed_callback);
}

/**
* Used by the thread pool if the scanner has exceeded the max number of
* calls to HBase so we don't overflow the stack of this thread.
*/
@Override
public void run() {
scan();
}

/**
* Starts the scanner and is called recursively to fetch the next set of
* rows from the scanner.
* @return A deferred that resolves to {@code true} when the scan has fully
* completed, or {@code false} if the recursion limit was hit and the scan
* must be continued in a new thread
*/
public Object scan() {
starttime = System.nanoTime();
return scanner.nextRows().addCallback(this);
public Deferred<Boolean> scan() {
scanner.nextRows().addCallback(this);
return scan_completed;
}

/**
Expand All @@ -385,28 +419,21 @@ public Object scan() {
* @return always null; results are delivered through {@code scan_completed}
*/
@Override
public Object call(final ArrayList<ArrayList<KeyValue>> rows)
public Deferred<Boolean> call(final ArrayList<ArrayList<KeyValue>> rows)
throws Exception {
hbase_time += (System.nanoTime() - starttime) / 1000000;
try {
if (rows == null) {
hbase_time += (System.nanoTime() - starttime) / 1000000;
scanlatency.add(hbase_time);
LOG.info(TsdbQuery.this + " matched " + nrows + " rows in " +
spans.size() + " spans in " + hbase_time + "ms");
if (nrows < 1 && !seenAnnotation) {
results.callback(null);
} else {
results.callback(spans);
}
seen_annotations[0] |= had_annotation;
rows_with_data.addAndGet(nrows);
scan_completed.callback(true);
scanner.close();
return null;
}
scan_iterations++;

for (final ArrayList<KeyValue> row : rows) {
final byte[] key = row.get(0).key();
if (Bytes.memcmp(metric, key, 0, metric_width) != 0) {
scanner.close();
throw new IllegalDataException(
"HBase returned a row that doesn't match"
+ " our scanner (" + scanner + ")! " + row + " does not start"
Expand All @@ -419,23 +446,66 @@ public Object call(final ArrayList<ArrayList<KeyValue>> rows)
}
final KeyValue compacted =
tsdb.compact(row, datapoints.getAnnotations());
seenAnnotation |= !datapoints.getAnnotations().isEmpty();
if (!datapoints.getAnnotations().isEmpty()) {
had_annotation = 1;
}
if (compacted != null) { // Can be null if we ignored all KVs.
datapoints.addRow(compacted);
nrows++;
}
}

// if we have exceeded the maximum scan recursion depth then we
// need to close out this callback chain and start a new one in
// a totally separate thread.
if (scan_iterations > SCAN_RECURSION_DEPTH) {
LOG.debug("Exceeded max scanner recursion depth of " +
SCAN_RECURSION_DEPTH + " iterations. Continuing scan "
+ "with new thread.");
seen_annotations[0] |= had_annotation;
rows_with_data.addAndGet(nrows);
scan_completed.callback(false);
return null;
}

return scan();
scan();
return null;
} catch (Exception e) {
scanner.close();
results.callback(e);
seen_annotations[0] |= had_annotation;
rows_with_data.addAndGet(nrows);
scan_completed.callback(e);
return null;
}
}
}

new ScannerCB().scan();
/**
* This is called at the end of each scanning loop. The value returned
* determines if we return data or launch another scan iterator in a separate
* thread to avoid stack overflow issues due to async chaining.
*/
class RecursionCB implements Callback<Object, Boolean> {
@Override
public Object call(final Boolean completed) {
if (completed == true) {
LOG.info(TsdbQuery.this + " matched " + rows_with_data.get() + " rows in " +
spans.size() + " spans in " +
(System.currentTimeMillis() - scanner_start_time) + "ms");
if (rows_with_data.get() < 1 && seen_annotations[0] < 1) {
results.callback(null);
} else {
results.callback(spans);
}
} else {
final ScannerCB scanner_cb = new ScannerCB(this);
thread_pool.execute(scanner_cb);
}
return null;
}
}

new ScannerCB(new RecursionCB()).scan();
return results;
}

Expand Down
56 changes: 50 additions & 6 deletions test/core/TestTsdbQuery.java
Expand Up @@ -22,6 +22,7 @@
import static org.powermock.api.mockito.PowerMockito.mock;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import com.stumbleupon.async.Deferred;

Expand All @@ -72,7 +74,7 @@
Scanner.class, TsdbQuery.class, DeleteRequest.class, Annotation.class,
RowKey.class, Span.class, SpanGroup.class, IncomingDataPoints.class })
public final class TestTsdbQuery {
private Config config;
private Config config = mock(Config.class);
private TSDB tsdb = null;
private HBaseClient client = mock(HBaseClient.class);
private UniqueId metrics = mock(UniqueId.class);
Expand All @@ -84,11 +86,21 @@ public final class TestTsdbQuery {
@Before
public void before() throws Exception {
PowerMockito.whenNew(HBaseClient.class)
.withArguments(anyString(), anyString()).thenReturn(client);
config = new Config(false);
config.setFixDuplicates(true); // TODO(jat): test both ways
.withArguments(anyString(), anyString()).thenReturn(client);
when(config.getString("tsd.storage.hbase.data_table")).thenReturn("tsdb");
when(config.getString("tsd.storage.hbase.uid_table")).thenReturn("tsdb-uid");
when(config.getString("tsd.storage.hbase.meta_table")).thenReturn("tsdb-meta");
when(config.getString("tsd.storage.hbase.tree_table")).thenReturn("tsdb-tree");
when(config.fix_duplicates()).thenReturn(true);

// set compactions to FALSE first so we avoid firing off the compaction
// thread while we're performing the tests. Then set it back to true as some
// tests depend on it.
when(config.enable_compactions()).thenReturn(false);
tsdb = new TSDB(config);
when(config.enable_compactions()).thenReturn(true);
query = new TsdbQuery(tsdb);


// replace the "real" field objects with mocks
Field met = tsdb.getClass().getDeclaredField("metrics");
Expand All @@ -103,7 +115,7 @@ public void before() throws Exception {
tagv.setAccessible(true);
tagv.set(tsdb, tag_values);

// mock UniqueId
// mock UniqueId
when(metrics.getId("sys.cpu.user")).thenReturn(new byte[] { 0, 0, 1 });
when(metrics.getNameAsync(new byte[] { 0, 0, 1 }))
.thenReturn(Deferred.fromResult("sys.cpu.user"));
Expand Down Expand Up @@ -296,7 +308,7 @@ public void runLongSingleTS() throws Exception {
query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false);

final DataPoints[] dps = query.run();

System.out.println("Size: " + dps.length);
assertNotNull(dps);
assertEquals("sys.cpu.user", dps[0].metricName());
assertTrue(dps[0].getAggregatedTags().isEmpty());
Expand Down Expand Up @@ -1598,6 +1610,38 @@ public void runInterpolationMsDownsampled() throws Exception {
}
assertEquals(151, dps[0].size());
}

@Test
public void stackOverflowOKTest() throws Exception {
setQueryStorage();
storage.setMaxScannerRows(1);
// write enough rows of a single metric (with one row per scanner batch) to
// push the scanner past SCAN_RECURSION_DEPTH so the query must continue on
// the thread pool instead of overflowing the stack
HashMap<String, String> tags = new HashMap<String, String>(1);
tags.put("host", "web01");
long timestamp = 1356998400;
for (int i = 1; i <= 1024 * 3; i++) {
tsdb.addPoint("sys.cpu.user", timestamp += 3600, i, tags).joinUninterruptibly();
}

tags.clear();
query.setStartTime(1356998400);
query.setEndTime(1420070400);
query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false);
final DataPoints[] dps = query.run();
assertNotNull(dps);
assertEquals("sys.cpu.user", dps[0].metricName());
assertTrue(dps[0].getAggregatedTags().isEmpty());
assertNull(dps[0].getAnnotations());
assertEquals("web01", dps[0].getTags().get("host"));

int value = 1;
for (DataPoint dp : dps[0]) {
assertEquals(value, dp.longValue());
value++;
}
assertEquals(3072, dps[0].aggregatedSize());
}

//---------------------- //
// Aggregator unit tests //
Expand Down

0 comments on commit 24d80eb

Please sign in to comment.