Skip to content

Commit

Permalink
Attempt a fix for OpenTSDB#823 wherein writes to the annotations byte…
Browse files Browse the repository at this point in the history
… map

were not synchronized, potentially leading to a race condition and
stuck threads. Now we'll send a fresh list to the compaction code
if notes were found, synchronize on the local byte map before making
any modifications. Thanks to @thatsafunnyname.
  • Loading branch information
manolama committed Dec 17, 2016
1 parent 301c16b commit ee1bf44
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions src/core/SaltScanner.java
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

Expand Down Expand Up @@ -305,8 +306,10 @@ final class ScannerCB implements Callback<Object,
private final List<KeyValue> kvs = new ArrayList<KeyValue>();
private final ByteMap<List<Annotation>> annotations =
new ByteMap<List<Annotation>>();
private final Set<String> skips = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> keepers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> skips = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
private final Set<String> keepers = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());

private long scanner_start = -1;
/** nanosecond timestamps */
Expand Down Expand Up @@ -524,12 +527,6 @@ void processRow(final byte[] key, final ArrayList<KeyValue> row) {
tsdb.getClient().delete(del);
}

List<Annotation> notes = annotations.get(key);
if (notes == null) {
notes = new ArrayList<Annotation>();
annotations.put(key, notes);
}

// calculate estimated data point count. We don't want to deserialize
// the byte arrays so we'll just get a rough estimate of compacted
// columns.
Expand Down Expand Up @@ -565,7 +562,18 @@ void processRow(final byte[] key, final ArrayList<KeyValue> row) {
// the scanner
final long compaction_start = DateTime.nanoTime();
try {
final List<Annotation> notes = Lists.newArrayList();
compacted = tsdb.compact(row, notes);
if (!notes.isEmpty()) {
synchronized (annotations) {
List<Annotation> map_notes = annotations.get(key);
if (map_notes == null) {
annotations.put(key, notes);
} else {
map_notes.addAll(notes);
}
}
}
} catch (IllegalDataException idex) {
compaction_time += (DateTime.nanoTime() - compaction_start);
close(false);
Expand Down

0 comments on commit ee1bf44

Please sign in to comment.