Skip to content

Commit

Permalink
ISPN-8865 Move AdvancedCacheLoader over to using Publisher instead of
Browse files Browse the repository at this point in the history
process

* Convert JpaStore
  • Loading branch information
wburns authored and danberindei committed May 11, 2018
1 parent 63eafa8 commit 1e4aa89
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 218 deletions.
Expand Up @@ -2,8 +2,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -36,25 +37,29 @@
import org.infinispan.commons.configuration.ConfiguredBy;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.commons.persistence.Store;
import org.infinispan.commons.util.AbstractIterator;
import org.infinispan.executors.ExecutorAllCompletionService;
import org.infinispan.filter.KeyFilter;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.marshall.core.MarshalledEntryFactory;
import org.infinispan.metadata.InternalMetadata;
import org.infinispan.persistence.TaskContextImpl;
import org.infinispan.persistence.jpa.configuration.JpaStoreConfiguration;
import org.infinispan.persistence.jpa.impl.EntityManagerFactoryRegistry;
import org.infinispan.persistence.jpa.impl.MetadataEntity;
import org.infinispan.persistence.jpa.impl.MetadataEntityKey;
import org.infinispan.persistence.jpa.impl.Stats;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.util.KeyValuePair;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

/**
*
* NOTE: This store can return expired entries on any given operation if {@link JpaStoreConfiguration#storeMetadata()}
* was not set to true.
* @author <a href="mailto:rtsang@redhat.com">Ray Tsang</a>
*
*/
Expand All @@ -70,6 +75,7 @@ public class JpaStore<K, V> implements AdvancedLoadWriteStore<K, V> {
private StreamingMarshaller marshaller;
private MarshalledEntryFactory marshallerEntryFactory;
private TimeService timeService;
private ExecutorService executorService;
private Stats stats = new Stats();
private boolean setFetchSizeMinInteger = false;

Expand All @@ -80,6 +86,7 @@ public void init(InitializationContext ctx) {
this.marshallerEntryFactory = ctx.getMarshalledEntryFactory();
this.marshaller = ctx.getMarshaller();
this.timeService = ctx.getTimeService();
this.executorService = ctx.getExecutor();
}

@Override
Expand Down Expand Up @@ -559,11 +566,51 @@ public MarshalledEntry load(Object key) {
}

@Override
public void process(KeyFilter filter, CacheLoaderTask task, Executor executor, boolean fetchValue, boolean fetchMetadata) {
public Flowable<K> publishKeys(Predicate<? super K> filter) {
return Flowable.using(() -> {
EntityManager emStream = emf.createEntityManager();
Session session = emStream.unwrap(Session.class);
Criteria criteria = session.createCriteria(configuration.entityClass()).setProjection(Projections.id()).setReadOnly(true);
if (setFetchSizeMinInteger) {
criteria.setFetchSize(Integer.MIN_VALUE);
}
ScrollableResults results = criteria.scroll(ScrollMode.FORWARD_ONLY);
return new KeyValuePair<>(emStream, results);
}, kvp -> {
ScrollableResults results = kvp.getValue();
return Flowable.fromIterable(() -> new AbstractIterator<K>() {
@Override
protected K getNext() {
K key = null;
while (key == null && results.next()) {
K testKey = (K) results.get(0);
if (filter == null || filter.test(testKey)) {
key = testKey;
}
}
return key;
}
}
);
}, kvp -> {
try {
kvp.getValue().close();
} finally {
kvp.getKey().close();
}
});
}

@Override
public Flowable<MarshalledEntry<K, V>> publishEntries(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
boolean innerFetchMetadata;
if (fetchMetadata && !configuration.storeMetadata()) {
log.debug("Metadata cannot be retrieved as JPA Store is not configured to persist metadata.");
fetchMetadata = false;
innerFetchMetadata = false;
} else {
innerFetchMetadata = fetchMetadata;
}

// We cannot stream entities as a full table as the entity can contain collections in another tables.
// Then, Hibernate uses left outer joins which give us several rows of results for each entity.
// We cannot use DISTINCT_ROOT_ENTITY transformer when streaming, that's only available for list()
Expand All @@ -577,87 +624,17 @@ public void process(KeyFilter filter, CacheLoaderTask task, Executor executor, b
// table, because this table does not have records for keys without metadata. With such iteration,
// we wouldn't iterate over all keys - therefore, we can iterate only over entity table IDs and we
// have to request the metadata in separate connection.
if (fetchValue || fetchMetadata) {
final boolean fv = fetchValue;
final boolean fm = fetchMetadata;
process(filter, task, executor, new ProcessStrategy() {
@Override
public Criteria getCriteria(Session session) {
return session.createCriteria(configuration.entityClass()).setProjection(Projections.id());
}

@Override
public Object getKey(Object scrollResult) {
return scrollResult;
}

@Override
public Callable<Void> getTask(CacheLoaderTask task, TaskContext taskContext, Object scrollResult, Object key) {
return new LoadingProcessTask(task, taskContext, key, fv, fm);
}
});
Flowable<K> keyPublisher = publishKeys(filter);

if (fetchValue || innerFetchMetadata) {
return keyPublisher
// Run the loading in parallel using executor since it will be blocking
.parallel()
.runOn(Schedulers.from(executorService))
.map(k -> loadEntry(k, fetchValue, innerFetchMetadata))
.sequential();
} else {
process(filter, task, executor, new ProcessStrategy() {
@Override
public Criteria getCriteria(Session session) {
return session.createCriteria(configuration.entityClass()).setProjection(Projections.id());
}

@Override
public Object getKey(Object scrollResult) {
return scrollResult;
}

@Override
public Callable<Void> getTask(CacheLoaderTask task, TaskContext taskContext, Object scrollResult, Object key) {
return new ProcessTask(task, taskContext, key, null, null);
}
});
}
}

private void process(KeyFilter filter, final CacheLoaderTask task, Executor executor, ProcessStrategy strategy) {
ExecutorAllCompletionService eacs = new ExecutorAllCompletionService(executor);
TaskContextImpl taskContext = new TaskContextImpl();
EntityManager emStream = emf.createEntityManager();
try {
EntityTransaction txStream = emStream.getTransaction();
ScrollableResults results = null;
txStream.begin();
try {
Session session = emStream.unwrap(Session.class);
Criteria criteria = strategy.getCriteria(session).setReadOnly(true);
if (setFetchSizeMinInteger) {
criteria.setFetchSize(Integer.MIN_VALUE);
}
results = criteria.scroll(ScrollMode.FORWARD_ONLY);
try {
while (results.next()) {
if (taskContext.isStopped())
break;
Object result = results.get(0);
Object key = strategy.getKey(result);
if (filter != null && !filter.accept(key)) {
if (trace) log.trace("Key " + key + " filtered");
continue;
}
eacs.submit(strategy.getTask(task, taskContext, result, key));
}
} finally {
if (results != null) results.close();
}
txStream.commit();
} finally {
if (txStream != null && txStream.isActive()) {
txStream.rollback();
}
}
} finally {
emStream.close();
}
eacs.waitUntilAllCompleted();
if (eacs.isExceptionThrown()) {
throw new org.infinispan.persistence.spi.PersistenceException("Execution exception!", eacs.getFirstException());
return keyPublisher.map(k -> marshallerEntryFactory.newMarshalledEntry(k, (Object) null, null));
}
}

Expand Down Expand Up @@ -801,124 +778,44 @@ private String toString(MetadataEntity metadata) {
}
}

private interface ProcessStrategy {
Criteria getCriteria(Session session);
Object getKey(Object scrollResult);
Callable<Void> getTask(CacheLoaderTask task, TaskContext taskContext, Object scrollResult, Object key);
}
private MarshalledEntry<K, V> loadEntry(Object key, boolean fetchValue, boolean fetchMetadata) {
Object entity;
InternalMetadata metadata;

private class ProcessTask implements Callable<Void> {
private final CacheLoaderTask task;
private final TaskContext taskContext;
private final Object key;
private final Object entity;
private final InternalMetadata metadata;

private ProcessTask(CacheLoaderTask task, TaskContext taskContext, Object key, Object entity, InternalMetadata metadata) {
this.task = task;
this.taskContext = taskContext;
this.key = key;
this.entity = entity;
this.metadata = metadata;
// The loading of entries and metadata is offloaded to another thread.
// We need second entity manager anyway because with MySQL we can't do streaming
// in parallel with other queries using single connection
EntityManager emExec = emf.createEntityManager();
try {
metadata = fetchMetadata ? getMetadata(emExec, key) : null;
if (trace) {
log.tracef("Created process task with key=%s, value=%s, metadata=%s", key, entity, metadata);
log.tracef("Fetched metadata (fetching? %s) %s", fetchMetadata, metadata);
}
}

@Override
public Void call() throws Exception {
try {
final MarshalledEntry marshalledEntry = marshallerEntryFactory.newMarshalledEntry(key, entity, metadata);
if (marshalledEntry != null) {
task.processEntry(marshalledEntry, taskContext);
}
if (metadata != null && metadata.isExpired(timeService.wallClockTime())) {
return null;
} catch (Exception e) {
log.errorExecutingParallelStoreTask(e);
throw e;
}
}
}

private class LoadingProcessTask implements Callable<Void> {
private final CacheLoaderTask task;
private final TaskContext taskContext;
private final Object key;
private final boolean fetchValue;
private final boolean fetchMetadata;

private LoadingProcessTask(CacheLoaderTask task, TaskContext taskContext, Object key, boolean fetchValue, boolean fetchMetadata) {
this.task = task;
this.taskContext = taskContext;
this.key = key;
this.fetchValue = fetchValue;
this.fetchMetadata = fetchMetadata;
if (trace) {
log.tracef("Created process task with key=%s, fetchMetadata=%s", key, fetchMetadata);
}
}

@Override
public Void call() throws Exception {
boolean loaded = false;
Object entity;
InternalMetadata metadata;

// The loading of entries and metadata is offloaded to another thread.
// We need second entity manager anyway because with MySQL we can't do streaming
// in parallel with other queries using single connection
EntityManager emExec = emf.createEntityManager();
try {
EntityTransaction txExec = emExec.getTransaction();
txExec.begin();
try {
do {
try {
metadata = fetchMetadata ? getMetadata(emExec, key) : null;
if (trace) {
log.tracef("Fetched metadata (fetching? %s) %s", fetchMetadata, metadata);
}
if (metadata != null && metadata.isExpired(timeService.wallClockTime())) {
return null;
}
if (fetchValue) {
entity = findEntity(emExec, key);
if (trace) {
log.tracef("Fetched value %s", entity);
}
} else {
entity = null;
}
} finally {
try {
txExec.commit();
loaded = true;
} catch (Exception e) {
log.trace("Failed to load once", e);
}
}
} while (!loaded);
} finally {
if (txExec != null && txExec.isActive()) {
txExec.rollback();
}

}
} finally {
if (emExec != null) {
emExec.close();
if (fetchValue) {
entity = findEntity(emExec, key);
if (trace) {
log.tracef("Fetched value %s", entity);
}
} else {
entity = null;
}
try {
final MarshalledEntry marshalledEntry = marshallerEntryFactory.newMarshalledEntry(key, entity, metadata);
if (marshalledEntry != null) {
task.processEntry(marshalledEntry, taskContext);
}
return null;
} catch (Exception e) {
log.errorExecutingParallelStoreTask(e);
throw e;
} finally {
if (emExec != null) {
emExec.close();
}
}
try {
final MarshalledEntry<K, V> marshalledEntry = marshallerEntryFactory.newMarshalledEntry(key, entity, metadata);
if (marshalledEntry != null) {
return marshalledEntry;
}
return null;
} catch (Exception e) {
log.errorExecutingParallelStoreTask(e);
throw e;
}
}
}
Expand Up @@ -12,7 +12,6 @@
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.persistence.InitializationContextImpl;
import org.infinispan.persistence.jpa.configuration.JpaStoreConfigurationBuilder;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TestInternalCacheEntryFactory;
Expand All @@ -35,7 +34,7 @@ public abstract class AbstractJpaStoreTest extends AbstractInfinispanTest {

protected EmbeddedCacheManager cm;

protected AdvancedLoadWriteStore cs;
protected JpaStore<Object, Object> cs;

//protected TransactionFactory gtf = new TransactionFactory();

Expand All @@ -49,7 +48,7 @@ protected EmbeddedCacheManager createCacheManager() {
return TestCacheManagerFactory.createCacheManager(true);
}

protected AdvancedLoadWriteStore createCacheStore() {
protected JpaStore createCacheStore() {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.persistence().addStore(JpaStoreConfigurationBuilder.class)
.persistenceUnitName(PERSISTENCE_UNIT_NAME)
Expand Down

0 comments on commit 1e4aa89

Please sign in to comment.