diff --git a/persistence/jpa/src/main/java/org/infinispan/persistence/jpa/JpaStore.java b/persistence/jpa/src/main/java/org/infinispan/persistence/jpa/JpaStore.java
index 35c2ae264336..9aad2d600a8d 100644
--- a/persistence/jpa/src/main/java/org/infinispan/persistence/jpa/JpaStore.java
+++ b/persistence/jpa/src/main/java/org/infinispan/persistence/jpa/JpaStore.java
@@ -2,8 +2,9 @@
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -36,12 +37,11 @@
 import org.infinispan.commons.configuration.ConfiguredBy;
 import org.infinispan.commons.marshall.StreamingMarshaller;
 import org.infinispan.commons.persistence.Store;
+import org.infinispan.commons.util.AbstractIterator;
 import org.infinispan.executors.ExecutorAllCompletionService;
-import org.infinispan.filter.KeyFilter;
 import org.infinispan.marshall.core.MarshalledEntry;
 import org.infinispan.marshall.core.MarshalledEntryFactory;
 import org.infinispan.metadata.InternalMetadata;
-import org.infinispan.persistence.TaskContextImpl;
 import org.infinispan.persistence.jpa.configuration.JpaStoreConfiguration;
 import org.infinispan.persistence.jpa.impl.EntityManagerFactoryRegistry;
 import org.infinispan.persistence.jpa.impl.MetadataEntity;
@@ -49,12 +49,17 @@
 import org.infinispan.persistence.jpa.impl.Stats;
 import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
 import org.infinispan.persistence.spi.InitializationContext;
+import org.infinispan.util.KeyValuePair;
 import org.infinispan.util.TimeService;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
+import io.reactivex.Flowable;
+import io.reactivex.schedulers.Schedulers;
+
 /**
- *
+ * NOTE: This store can return expired entries on any given operation if {@link JpaStoreConfiguration#storeMetadata()}
+ * was not set to true.
  * @author Ray Tsang
  *
  */
@@ -70,6 +75,7 @@ public class JpaStore<K, V> implements AdvancedLoadWriteStore<K, V> {
    private StreamingMarshaller marshaller;
    private MarshalledEntryFactory marshallerEntryFactory;
    private TimeService timeService;
+   private ExecutorService executorService;
 
    private Stats stats = new Stats();
    private boolean setFetchSizeMinInteger = false;
@@ -80,6 +86,7 @@ public void init(InitializationContext ctx) {
       this.marshallerEntryFactory = ctx.getMarshalledEntryFactory();
       this.marshaller = ctx.getMarshaller();
       this.timeService = ctx.getTimeService();
+      this.executorService = ctx.getExecutor();
    }
 
    @Override
@@ -559,11 +566,51 @@ public MarshalledEntry load(Object key) {
    }
 
    @Override
-   public void process(KeyFilter filter, CacheLoaderTask task, Executor executor, boolean fetchValue, boolean fetchMetadata) {
+   public Flowable<K> publishKeys(Predicate<? super K> filter) {
+      return Flowable.using(() -> {
+         EntityManager emStream = emf.createEntityManager();
+         Session session = emStream.unwrap(Session.class);
+         Criteria criteria = session.createCriteria(configuration.entityClass()).setProjection(Projections.id()).setReadOnly(true);
+         if (setFetchSizeMinInteger) {
+            criteria.setFetchSize(Integer.MIN_VALUE);
+         }
+         ScrollableResults results = criteria.scroll(ScrollMode.FORWARD_ONLY);
+         return new KeyValuePair<>(emStream, results);
+      }, kvp -> {
+         ScrollableResults results = kvp.getValue();
+         return Flowable.fromIterable(() -> new AbstractIterator<K>() {
+            @Override
+            protected K getNext() {
+               K key = null;
+               while (key == null && results.next()) {
+                  K testKey = (K) results.get(0);
+                  if (filter == null || filter.test(testKey)) {
+                     key = testKey;
+                  }
+               }
+               return key;
+            }
+         }
+         );
+      }, kvp -> {
+         try {
+            kvp.getValue().close();
+         } finally {
+            kvp.getKey().close();
+         }
+      });
+   }
+
+   @Override
+   public Flowable<MarshalledEntry<K, V>> publishEntries(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
+      boolean innerFetchMetadata;
       if (fetchMetadata && !configuration.storeMetadata()) {
          log.debug("Metadata cannot be retrieved as JPA Store is not configured to persist metadata.");
-         fetchMetadata = false;
+         innerFetchMetadata = false;
+      } else {
+         innerFetchMetadata = fetchMetadata;
       }
+
       // We cannot stream entities as a full table as the entity can contain collections in another tables.
       // Then, Hibernate uses left outer joins which give us several rows of results for each entity.
       // We cannot use DISTINCT_ROOT_ENTITY transformer when streaming, that's only available for list()
@@ -577,87 +624,17 @@ public void process(KeyFilter filter, CacheLoaderTask task, Executor executor, b
       // table, because this table does not have records for keys without metadata. With such iteration,
       // we wouldn't iterate over all keys - therefore, we can iterate only over entity table IDs and we
       // have to request the metadata in separate connection.
-      if (fetchValue || fetchMetadata) {
-         final boolean fv = fetchValue;
-         final boolean fm = fetchMetadata;
-         process(filter, task, executor, new ProcessStrategy() {
-            @Override
-            public Criteria getCriteria(Session session) {
-               return session.createCriteria(configuration.entityClass()).setProjection(Projections.id());
-            }
-
-            @Override
-            public Object getKey(Object scrollResult) {
-               return scrollResult;
-            }
-
-            @Override
-            public Callable getTask(CacheLoaderTask task, TaskContext taskContext, Object scrollResult, Object key) {
-               return new LoadingProcessTask(task, taskContext, key, fv, fm);
-            }
-         });
+      Flowable<K> keyPublisher = publishKeys(filter);
+
+      if (fetchValue || innerFetchMetadata) {
+         return keyPublisher
+               // Run the loading in parallel using executor since it will be blocking
+               .parallel()
+               .runOn(Schedulers.from(executorService))
+               .map(k -> loadEntry(k, fetchValue, innerFetchMetadata))
+               .sequential();
       } else {
-         process(filter, task, executor, new ProcessStrategy() {
-            @Override
-            public Criteria getCriteria(Session session) {
-               return session.createCriteria(configuration.entityClass()).setProjection(Projections.id());
-            }
-
-            @Override
-            public Object getKey(Object scrollResult) {
-               return scrollResult;
-            }
-
-            @Override
-            public Callable getTask(CacheLoaderTask task, TaskContext taskContext, Object scrollResult, Object key) {
-               return new ProcessTask(task, taskContext, key, null, null);
-            }
-         });
-      }
-   }
-
-   private void process(KeyFilter filter, final CacheLoaderTask task, Executor executor, ProcessStrategy strategy) {
-      ExecutorAllCompletionService eacs = new ExecutorAllCompletionService(executor);
-      TaskContextImpl taskContext = new TaskContextImpl();
-      EntityManager emStream = emf.createEntityManager();
-      try {
-         EntityTransaction txStream = emStream.getTransaction();
-         ScrollableResults results = null;
-         txStream.begin();
-         try {
-            Session session = emStream.unwrap(Session.class);
-            Criteria criteria = strategy.getCriteria(session).setReadOnly(true);
-            if (setFetchSizeMinInteger) {
-               criteria.setFetchSize(Integer.MIN_VALUE);
-            }
-            results = criteria.scroll(ScrollMode.FORWARD_ONLY);
-            try {
-               while (results.next()) {
-                  if (taskContext.isStopped())
-                     break;
-                  Object result = results.get(0);
-                  Object key = strategy.getKey(result);
-                  if (filter != null && !filter.accept(key)) {
-                     if (trace) log.trace("Key " + key + " filtered");
-                     continue;
-                  }
-                  eacs.submit(strategy.getTask(task, taskContext, result, key));
-               }
-            } finally {
-               if (results != null) results.close();
-            }
-            txStream.commit();
-         } finally {
-            if (txStream != null && txStream.isActive()) {
-               txStream.rollback();
-            }
-         }
-      } finally {
-         emStream.close();
-      }
-      eacs.waitUntilAllCompleted();
-      if (eacs.isExceptionThrown()) {
-         throw new org.infinispan.persistence.spi.PersistenceException("Execution exception!", eacs.getFirstException());
+         return keyPublisher.map(k -> marshallerEntryFactory.newMarshalledEntry(k, (Object) null, null));
       }
    }
 
@@ -801,124 +778,44 @@ private String toString(MetadataEntity metadata) {
       }
    }
 
-   private interface ProcessStrategy {
-      Criteria getCriteria(Session session);
-      Object getKey(Object scrollResult);
-      Callable getTask(CacheLoaderTask task, TaskContext taskContext, Object scrollResult, Object key);
-   }
+   private MarshalledEntry loadEntry(Object key, boolean fetchValue, boolean fetchMetadata) {
+      Object entity;
+      InternalMetadata metadata;
 
-   private class ProcessTask implements Callable {
-      private final CacheLoaderTask task;
-      private final TaskContext taskContext;
-      private final Object key;
-      private final Object entity;
-      private final InternalMetadata metadata;
-
-      private ProcessTask(CacheLoaderTask task, TaskContext taskContext, Object key, Object entity, InternalMetadata metadata) {
-         this.task = task;
-         this.taskContext = taskContext;
-         this.key = key;
-         this.entity = entity;
-         this.metadata = metadata;
+      // The loading of entries and metadata is offloaded to another thread.
+      // We need second entity manager anyway because with MySQL we can't do streaming
+      // in parallel with other queries using single connection
+      EntityManager emExec = emf.createEntityManager();
+      try {
+         metadata = fetchMetadata ? getMetadata(emExec, key) : null;
         if (trace) {
-            log.tracef("Created process task with key=%s, value=%s, metadata=%s", key, entity, metadata);
+            log.tracef("Fetched metadata (fetching? %s) %s", fetchMetadata, metadata);
         }
-      }
-
-      @Override
-      public Void call() throws Exception {
-         try {
-            final MarshalledEntry marshalledEntry = marshallerEntryFactory.newMarshalledEntry(key, entity, metadata);
-            if (marshalledEntry != null) {
-               task.processEntry(marshalledEntry, taskContext);
-            }
+         if (metadata != null && metadata.isExpired(timeService.wallClockTime())) {
            return null;
-         } catch (Exception e) {
-            log.errorExecutingParallelStoreTask(e);
-            throw e;
         }
-      }
-   }
-
-   private class LoadingProcessTask implements Callable {
-      private final CacheLoaderTask task;
-      private final TaskContext taskContext;
-      private final Object key;
-      private final boolean fetchValue;
-      private final boolean fetchMetadata;
-
-      private LoadingProcessTask(CacheLoaderTask task, TaskContext taskContext, Object key, boolean fetchValue, boolean fetchMetadata) {
-         this.task = task;
-         this.taskContext = taskContext;
-         this.key = key;
-         this.fetchValue = fetchValue;
-         this.fetchMetadata = fetchMetadata;
-         if (trace) {
-            log.tracef("Created process task with key=%s, fetchMetadata=%s", key, fetchMetadata);
-         }
-      }
-
-      @Override
-      public Void call() throws Exception {
-         boolean loaded = false;
-         Object entity;
-         InternalMetadata metadata;
-
-         // The loading of entries and metadata is offloaded to another thread.
-         // We need second entity manager anyway because with MySQL we can't do streaming
-         // in parallel with other queries using single connection
-         EntityManager emExec = emf.createEntityManager();
-         try {
-            EntityTransaction txExec = emExec.getTransaction();
-            txExec.begin();
-            try {
-               do {
-                  try {
-                     metadata = fetchMetadata ? getMetadata(emExec, key) : null;
-                     if (trace) {
-                        log.tracef("Fetched metadata (fetching? %s) %s", fetchMetadata, metadata);
-                     }
-                     if (metadata != null && metadata.isExpired(timeService.wallClockTime())) {
-                        return null;
-                     }
-                     if (fetchValue) {
-                        entity = findEntity(emExec, key);
-                        if (trace) {
-                           log.tracef("Fetched value %s", entity);
-                        }
-                     } else {
-                        entity = null;
-                     }
-                  } finally {
-                     try {
-                        txExec.commit();
-                        loaded = true;
-                     } catch (Exception e) {
-                        log.trace("Failed to load once", e);
-                     }
-                  }
-               } while (!loaded);
-            } finally {
-               if (txExec != null && txExec.isActive()) {
-                  txExec.rollback();
-               }
-
-            }
-         } finally {
-            if (emExec != null) {
-               emExec.close();
+         if (fetchValue) {
+            entity = findEntity(emExec, key);
+            if (trace) {
+               log.tracef("Fetched value %s", entity);
            }
+         } else {
+            entity = null;
         }
-         try {
-            final MarshalledEntry marshalledEntry = marshallerEntryFactory.newMarshalledEntry(key, entity, metadata);
-            if (marshalledEntry != null) {
-               task.processEntry(marshalledEntry, taskContext);
-            }
-            return null;
-         } catch (Exception e) {
-            log.errorExecutingParallelStoreTask(e);
-            throw e;
+      } finally {
+         if (emExec != null) {
+            emExec.close();
         }
      }
+      try {
+         final MarshalledEntry marshalledEntry = marshallerEntryFactory.newMarshalledEntry(key, entity, metadata);
+         if (marshalledEntry != null) {
+            return marshalledEntry;
+         }
+         return null;
+      } catch (Exception e) {
+         log.errorExecutingParallelStoreTask(e);
+         throw e;
+      }
   }
 }
diff --git a/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/AbstractJpaStoreTest.java b/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/AbstractJpaStoreTest.java
index d6dbe55b2b1f..73464d933df8 100644
--- a/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/AbstractJpaStoreTest.java
+++ b/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/AbstractJpaStoreTest.java
@@ -12,7 +12,6 @@
 import org.infinispan.metadata.impl.InternalMetadataImpl;
 import org.infinispan.persistence.InitializationContextImpl;
 import org.infinispan.persistence.jpa.configuration.JpaStoreConfigurationBuilder;
-import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
 import org.infinispan.test.AbstractInfinispanTest;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
 import org.infinispan.test.fwk.TestInternalCacheEntryFactory;
@@ -35,7 +34,7 @@ public abstract class AbstractJpaStoreTest extends AbstractInfinispanTest {
 
    protected EmbeddedCacheManager cm;
 
-   protected AdvancedLoadWriteStore cs;
+   protected JpaStore cs;
 
    //protected TransactionFactory gtf = new TransactionFactory();
 
@@ -49,7 +48,7 @@ protected EmbeddedCacheManager createCacheManager() {
      return TestCacheManagerFactory.createCacheManager(true);
   }
 
-   protected AdvancedLoadWriteStore createCacheStore() {
+   protected JpaStore createCacheStore() {
      ConfigurationBuilder builder = new ConfigurationBuilder();
      builder.persistence().addStore(JpaStoreConfigurationBuilder.class)
            .persistenceUnitName(PERSISTENCE_UNIT_NAME)
diff --git a/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/BaseJpaStoreTest.java b/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/BaseJpaStoreTest.java
index 40b024e083f3..19daa0bd9a8f 100644
--- a/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/BaseJpaStoreTest.java
+++ b/persistence/jpa/src/test/java/org/infinispan/persistence/jpa/BaseJpaStoreTest.java
@@ -5,10 +5,9 @@
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
@@ -18,10 +17,11 @@
 import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
 import org.infinispan.marshall.core.MarshalledEntry;
 import org.infinispan.marshall.core.MarshalledEntryImpl;
-import org.infinispan.persistence.spi.AdvancedCacheLoader;
 import org.infinispan.persistence.spi.PersistenceException;
 import org.testng.annotations.Test;
 
+import io.reactivex.functions.Consumer;
+
 /**
  * @author Radim Vansa <rvansa@redhat.com>
  */
@@ -78,15 +78,12 @@ public void testPreload() throws Exception {
      assertEquals(cs.load(obj3.getKey()).getValue(), obj3.getValue());
 
      final ConcurrentHashMap map = new ConcurrentHashMap();
-      AdvancedCacheLoader.CacheLoaderTask taskWithValues = new AdvancedCacheLoader.CacheLoaderTask() {
-         @Override
-         public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
-            if (marshalledEntry.getKey() != null && marshalledEntry.getValue() != null) {
-               map.put(marshalledEntry.getKey(), marshalledEntry.getValue());
-            }
+      Consumer<MarshalledEntry<?, ?>> taskWithValues = me -> {
+         if (me.getKey() != null && me.getValue() != null) {
+            map.put(me.getKey(), me.getValue());
         }
      };
-      cs.process(null, taskWithValues, new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(10)), true, false);
+      cs.publishEntries(null, true, false).blockingSubscribe(taskWithValues);
 
      assertEquals(map.size(), 3);
      assertEquals(map.remove(obj1.getKey()), obj1.getValue());
@@ -95,20 +92,25 @@ public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.Ta
      assertEquals(map.remove(obj2.getKey()), obj2.getValue());
      assertTrue(map.isEmpty());
 
      final ConcurrentHashSet set = new ConcurrentHashSet();
-      AdvancedCacheLoader.CacheLoaderTask taskWithoutValues = new AdvancedCacheLoader.CacheLoaderTask() {
-         @Override
-         public void processEntry(MarshalledEntry marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
-            if (marshalledEntry.getKey() != null) {
-               set.add(marshalledEntry.getKey());
-            }
+      Consumer<MarshalledEntry<?, ?>> taskWithoutValues = me -> {
+         if (me.getKey() != null) {
+            set.add(me.getKey());
         }
      };
-      cs.process(null, taskWithoutValues, new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(10)), false, false);
+
+      cs.publishEntries(null, false, false).blockingSubscribe(taskWithoutValues);
 
      assertEquals(set.size(), 3);
      assertTrue(set.remove(obj1.getKey()));
      assertTrue(set.remove(obj2.getKey()));
      assertTrue(set.remove(obj3.getKey()));
-      assertTrue(map.isEmpty());
+      assertTrue(set.isEmpty());
+
+      Set collectSet = cs.publishKeys(null).collectInto(new HashSet<>(), Set::add).blockingGet();
+      assertEquals(collectSet.size(), 3);
+      assertTrue(collectSet.remove(obj1.getKey()));
+      assertTrue(collectSet.remove(obj2.getKey()));
+      assertTrue(collectSet.remove(obj3.getKey()));
+      assertTrue(collectSet.isEmpty());
   }
 
   public void testStoreAndRemoveAll() {
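
Reviewer note (not part of the patch): a minimal sketch of how the converted reactive API is consumed, in the spirit of the updated BaseJpaStoreTest above. It assumes the generic JpaStore<K, V> signature used in this patch and an already initialized and started store instance; the class and method names below (JpaStorePublisherExample, dumpEntries, collectKeys) are illustrative only, and error handling is omitted.

import java.util.List;

import org.infinispan.persistence.jpa.JpaStore;

public class JpaStorePublisherExample {

   // Stream every persisted entry, fetching values but skipping metadata,
   // and block the caller until the underlying Flowable completes.
   static void dumpEntries(JpaStore<String, String> store) {
      store.publishEntries(null, true, false)
            .blockingSubscribe(me -> System.out.println(me.getKey() + " -> " + me.getValue()));
   }

   // Collect all non-null keys; publishKeys already returns an RxJava Flowable,
   // so standard operators such as toList() can be applied directly.
   static List<String> collectKeys(JpaStore<String, String> store) {
      return store.publishKeys(k -> k != null)
            .toList()
            .blockingGet();
   }
}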