Skip to content

Commit a1181a8

Browse files
committed
Add integration test for concurrent access in SegmentIndex
1 parent 098b03b commit a1181a8

File tree

1 file changed

+144
-0
lines changed

1 file changed

+144
-0
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package org.coroptis.index.it;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import java.util.Comparator;
6+
import java.util.Map;
7+
import java.util.Queue;
8+
import java.util.Random;
9+
import java.util.concurrent.ConcurrentLinkedQueue;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.ExecutorService;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicLong;
15+
import java.util.stream.Collectors;
16+
17+
import org.hestiastore.index.Entry;
18+
import org.hestiastore.index.datatype.TypeDescriptorInteger;
19+
import org.hestiastore.index.directory.Directory;
20+
import org.hestiastore.index.directory.MemDirectory;
21+
import org.hestiastore.index.segmentindex.IndexConfiguration;
22+
import org.hestiastore.index.segmentindex.SegmentIndex;
23+
import org.junit.jupiter.api.Test;
24+
25+
/**
26+
* Multi-threaded access safety check for {@link SegmentIndex} using the
27+
* thread-safe configuration. Threads perform random puts, deletes and gets; the
28+
* resulting on-disk state is compared with a deterministic replay of the
29+
* completed mutations.
30+
*/
31+
class SegmentIndexConcurrentIT {
32+
33+
private enum OpType {
34+
PUT, DELETE
35+
}
36+
37+
private static final class Op {
38+
final long seq;
39+
final OpType type;
40+
final int key;
41+
final Integer value; // null for delete
42+
43+
Op(final long seq, final OpType type, final int key,
44+
final Integer value) {
45+
this.seq = seq;
46+
this.type = type;
47+
this.key = key;
48+
this.value = value;
49+
}
50+
}
51+
52+
@Test
53+
void concurrent_put_delete_produces_consistent_state() throws Exception {
54+
final Directory directory = new MemDirectory();
55+
final IndexConfiguration<Integer, Integer> conf = IndexConfiguration
56+
.<Integer, Integer>builder()//
57+
.withKeyClass(Integer.class)//
58+
.withValueClass(Integer.class)//
59+
.withKeyTypeDescriptor(new TypeDescriptorInteger())//
60+
.withValueTypeDescriptor(new TypeDescriptorInteger())//
61+
.withName("concurrent-index")//
62+
.withThreadSafe(true)//
63+
.withMaxNumberOfKeysInSegment(20)// small to trigger splits
64+
.withMaxNumberOfKeysInSegmentCache(30)//
65+
.withMaxNumberOfKeysInSegmentCacheDuringFlushing(40)//
66+
.withMaxNumberOfKeysInSegmentChunk(5)//
67+
.withMaxNumberOfKeysInCache(15)// force periodic flushes
68+
.withBloomFilterIndexSizeInBytes(1024)// keep bloom filter
69+
// enabled with tiny
70+
// footprint
71+
.withBloomFilterNumberOfHashFunctions(1)// minimal hashing
72+
.build();
73+
74+
final SegmentIndex<Integer, Integer> index = SegmentIndex
75+
.create(directory, conf);
76+
77+
final int threads = 4;
78+
final int operationsPerThread = 200;
79+
final ExecutorService executor = Executors.newFixedThreadPool(threads);
80+
final CountDownLatch startGate = new CountDownLatch(1);
81+
final CountDownLatch doneGate = new CountDownLatch(threads);
82+
final Queue<Op> operations = new ConcurrentLinkedQueue<>();
83+
final AtomicLong seq = new AtomicLong(0);
84+
85+
for (int t = 0; t < threads; t++) {
86+
final int threadId = t;
87+
executor.submit(() -> {
88+
final Random rnd = new Random(12345L + threadId);
89+
startGate.await();
90+
for (int i = 0; i < operationsPerThread; i++) {
91+
final int key = rnd.nextInt(50);
92+
final double p = rnd.nextDouble();
93+
if (p < 0.6) {
94+
final int value = rnd.nextInt(10_000);
95+
index.put(key, value);
96+
operations.add(new Op(seq.incrementAndGet(), OpType.PUT,
97+
key, value));
98+
} else if (p < 0.9) {
99+
index.delete(key);
100+
operations.add(new Op(seq.incrementAndGet(),
101+
OpType.DELETE, key, null));
102+
} else {
103+
index.get(key); // exercise reads
104+
}
105+
}
106+
doneGate.countDown();
107+
return null;
108+
});
109+
}
110+
111+
startGate.countDown();
112+
doneGate.await(30, TimeUnit.SECONDS);
113+
executor.shutdownNow();
114+
115+
index.flush();
116+
117+
final Map<Integer, Integer> expected = new java.util.HashMap<>();
118+
operations.stream().sorted(Comparator.comparingLong(op -> op.seq))
119+
.forEach(op -> {
120+
if (op.type == OpType.PUT) {
121+
expected.put(op.key, op.value);
122+
} else {
123+
expected.remove(op.key);
124+
}
125+
});
126+
127+
final Map<Integer, Integer> actual = index.getStream()
128+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue,
129+
(first, second) -> second));
130+
131+
assertEquals(expected, actual);
132+
133+
index.close();
134+
135+
// Re-open to ensure persisted state matches expectations
136+
final SegmentIndex<Integer, Integer> reopened = SegmentIndex
137+
.open(directory, conf);
138+
final Map<Integer, Integer> reloaded = reopened.getStream()
139+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue,
140+
(first, second) -> second));
141+
assertEquals(expected, reloaded);
142+
reopened.close();
143+
}
144+
}

0 commit comments

Comments
 (0)