ISSUE apache#1606: Fixed race condition during expansion of concurrent open hash maps

### Motivation

As reported in apache#1606, there is a race condition in the concurrent open hash map implementations. The race happens when a map gets rehashed after an expansion and the new arrays are substituted for the old ones.

The race itself is that a thread doing a `get()` on the map first reads the current `capacity` of the map, uses it to compute the bucket index, and then attempts an optimistic read of the value in that bucket.

This assumes that the update to `capacity` becomes visible only after the `values` array has already been swapped; otherwise a reader can combine the new, larger capacity with the old, smaller array and index past its end. The current code does not always guarantee that ordering.
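
For illustration, here is a minimal sketch of that read path, with the bucket computation reduced to a power-of-two mask. The names and structure are paraphrased; this is not the actual BookKeeper code.

```java
import java.util.concurrent.locks.StampedLock;

// Simplified, illustrative Section: not the actual BookKeeper code.
@SuppressWarnings("serial")
final class Section<V> extends StampedLock {
    private long[] keys;   // before the fix: not volatile
    private V[] values;    // before the fix: not volatile
    private int capacity;  // before the fix: not volatile

    V get(long key, long keyHash) {
        long stamp = tryOptimisticRead();
        // 1. Read the current capacity and derive the bucket index from it.
        int bucket = (int) (keyHash & (capacity - 1));
        // 2. Optimistically read that bucket. If this thread already sees the
        //    enlarged capacity but still the old, smaller arrays, the index
        //    can point past the end of the arrays.
        long storedKey = keys[bucket];
        V storedValue = values[bucket];
        if (!validate(stamp)) {
            // The optimistic read raced with a writer: retry under the read lock.
            stamp = readLock();
            try {
                bucket = (int) (keyHash & (capacity - 1));
                storedKey = keys[bucket];
                storedValue = values[bucket];
            } finally {
                unlockRead(stamp);
            }
        }
        return storedKey == key ? storedValue : null;
    }
}
```

Because the optimistic read is not protected by any lock, a reader that observes the enlarged `capacity` together with the old, smaller arrays can index past the end of `values` before `validate()` has a chance to detect the conflict.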

### Changes
 * Use the `volatile` qualifier for `capacity` and for the `values` arrays, so that the compiler and runtime respect the intended ordering of the memory reads
 * In rehashing, update `capacity` only after `values` has been swapped (see the sketch after this list)
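
The following self-contained toy example (assumed names, not the actual map classes) shows why writing `capacity` last, with both fields `volatile`, is sufficient: the Java memory model orders the two volatile stores, so a reader that observes the new capacity is guaranteed to also observe the new, larger array, and the bucket index it derives can never exceed the array length.

```java
// Toy illustration of the publication pattern used by the fix (assumed
// names; not the actual map code). In the real implementation a single
// writer expands under the section's write lock; only the memory-ordering
// aspect is shown here.
final class VolatilePublication {
    private volatile long[] table = new long[4];
    private volatile int capacity = 4;

    void expand() {                        // writer side, i.e. rehash()
        long[] newTable = new long[capacity * 2];
        // ... re-hash the existing entries into newTable ...
        table = newTable;                  // volatile store #1: publish the array
        capacity = newTable.length;        // volatile store #2: publish the size last
    }

    long read(long hash) {                 // reader side, i.e. optimistic get()
        int c = capacity;                  // if this load sees store #2...
        long[] t = table;                  // ...then store #1 is also visible
        return t[(int) (hash & (c - 1))];  // so c <= t.length always holds here
    }
}
```

Without `volatile`, the compiler or CPU may reorder those stores or keep the fields cached, which is the window the new stress test below tries to hit by expanding the map while readers are concurrently probing it.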

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes apache#1607 from merlimat/fix-concurrent-maps, closes apache#1606

(cherry picked from commit a7e66e1)
Signed-off-by: Matteo Merli <mmerli@apache.org>
merlimat committed Aug 16, 2018
1 parent e5a5475 commit 36edaca
Showing 7 changed files with 99 additions and 19 deletions.
Changed file 1 of 7

@@ -212,10 +212,10 @@ public interface EntryProcessor<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private long[] keys;
-        private V[] values;
+        private volatile long[] keys;
+        private volatile V[] values;

-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -504,10 +504,12 @@ private void rehash() {
                 }
             }

-            capacity = newCapacity;
             keys = newKeys;
             values = newValues;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }

Changed file 2 of 7

@@ -166,9 +166,9 @@ public Set<Long> items() {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;

-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -375,9 +375,11 @@ private void rehash() {
                 }
             }

-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * SetFillFactor);
         }

Changed file 3 of 7

@@ -285,9 +285,9 @@ public Map<Long, Long> asMap() {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;

-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -682,9 +682,11 @@ private void rehash() {
                 }
             }

-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }

Changed file 4 of 7

@@ -223,9 +223,9 @@ public Map<LongPair, LongPair> asMap() {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;

-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -470,9 +470,11 @@ private void rehash() {
                 }
             }

-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }

Changed file 5 of 7

@@ -194,9 +194,9 @@ public List<V> values() {
     @SuppressWarnings("serial")
     private static final class Section<K, V> extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private Object[] table;
+        private volatile Object[] table;

-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -449,9 +449,11 @@ private void rehash() {
                 }
             }

-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }

Changed file 6 of 7

@@ -148,9 +148,9 @@ List<V> values() {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private V[] values;
+        private volatile V[] values;

-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -371,9 +371,11 @@ private void rehash() {
                 }
             }

-            capacity = newCapacity;
             values = newValues;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }

Changed file 7 of 7

@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -255,6 +256,73 @@ public void concurrentInsertionsAndReads() throws Throwable {
         executor.shutdown();
     }

+    @Test
+    public void stressConcurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int writeThreads = 16;
+        final int readThreads = 16;
+        final int n = 1_000_000;
+        String value = "value";
+
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+
+
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.get(key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), n * writeThreads);
+
+        executor.shutdown();
+    }
+
     @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
