diff --git a/persistence/soft-index/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreStressTest.java b/persistence/soft-index/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreStressTest.java index 2fcd756c1d44..562b7c0398ec 100644 --- a/persistence/soft-index/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreStressTest.java +++ b/persistence/soft-index/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreStressTest.java @@ -21,7 +21,6 @@ import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.container.impl.InternalEntryFactory; import org.infinispan.container.impl.InternalEntryFactoryImpl; -import org.infinispan.filter.KeyFilter; import org.infinispan.marshall.TestObjectStreamMarshaller; import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil; import org.infinispan.persistence.sifs.configuration.SoftIndexFileStoreConfigurationBuilder; @@ -32,11 +31,12 @@ import org.infinispan.test.fwk.TestInternalCacheEntryFactory; import org.infinispan.util.EmbeddedTimeService; import org.infinispan.util.PersistenceMockUtil; -import org.infinispan.util.concurrent.WithinThreadExecutor; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import io.reactivex.Flowable; + /** * @author Radim Vansa <rvansa@redhat.com> */ @@ -107,24 +107,24 @@ public Void call() throws Exception { Thread.sleep(100); Map entries = new HashMap<>(); - store.process(KeyFilter.ACCEPT_ALL_FILTER, (marshalledEntry, taskContext) -> { + Flowable.fromPublisher(store.entryPublisher(null, true, false)).blockingForEach(marshalledEntry -> { Object prev = entries.put(marshalledEntry.getKey(), marshalledEntry.getValue()); if (prev != null) { fail("Returned entry twice: " + marshalledEntry.getKey() + " -> " + prev + ", " + marshalledEntry.getValue()); } - }, new WithinThreadExecutor(), true, false); + }); store.stop(); // remove index files Stream.of(new File(tmpDirectory).listFiles(file -> !file.isDirectory())).map(f -> f.delete()); store.start(); - store.process(KeyFilter.ACCEPT_ALL_FILTER, (marshalledEntry, taskContext) -> { + Flowable.fromPublisher(store.entryPublisher(null, true, false)).blockingForEach(marshalledEntry -> { Object stored = entries.get(marshalledEntry.getKey()); if (stored == null) { fail("Loaded " + marshalledEntry.getKey() + " -> " + marshalledEntry.getValue() + " but it's not in the map"); } else if (!Objects.equals(stored, marshalledEntry.getValue())) { fail("Loaded " + marshalledEntry.getKey() + " -> " + marshalledEntry.getValue() + " but it's was " + stored); } - }, new WithinThreadExecutor(), true, false); + }); for (Map.Entry entry : entries.entrySet()) { MarshallableEntry loaded = store.loadEntry(entry.getKey()); if (loaded == null) {