Skip to content

Commit

Permalink
ISPN-15523 Fix the blocking publisher in the restore procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina authored and tristantarrant committed Apr 17, 2024
1 parent 1c97be3 commit b15a444
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
import java.util.stream.Collector;

import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;

/**
* Utility methods for handling {@link CompletionStage} instances.
* @author wburns
Expand Down Expand Up @@ -300,6 +304,16 @@ private void complete() {
abstract R getValue();
}

public static <I> CompletionStage<Void> performConcurrently(Iterable<I> iterable, int parallelism, Scheduler scheduler,
Function<? super I, CompletionStage<Void>> function) {
return Flowable.fromIterable(iterable)
.parallel(parallelism)
.runOn(scheduler)
.concatMap(i -> RxJavaInterop.voidCompletionStageToFlowable(function.apply(i)))
.sequential()
.ignoreElements().toCompletionStage(null);
}

public static <I> CompletionStage<Void> performSequentially(Iterator<I> iterator, Function<? super I, CompletionStage<Void>> function) {
return performSequentially(iterator, function, null, (ignore1, ignore2) -> {});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.ConfigurationManager;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.parsing.CacheParser;
Expand All @@ -42,6 +41,7 @@
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.encoding.impl.StorageConfigurationManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.persistence.PersistenceMarshaller;
import org.infinispan.marshall.protostream.impl.SerializationContextRegistry;
Expand All @@ -51,6 +51,7 @@
import org.infinispan.protostream.ImmutableSerializationContext;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.PublisherTransformers;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
Expand All @@ -59,11 +60,10 @@
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.NonBlockingManager;
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
* {@link org.infinispan.server.core.backup.ContainerResource} implementation for {@link
Expand Down Expand Up @@ -116,60 +116,65 @@ public CompletionStage<Void> backup() {

@Override
public CompletionStage<Void> restore(ZipFile zip) {
AggregateCompletionStage<Void> stages = CompletionStages.aggregateCompletionStage();
ConfigurationManager configurationManager = SecurityActions.getGlobalComponentRegistry(cm).getComponent(ConfigurationManager.class);
GlobalComponentRegistry gcr = SecurityActions.getGlobalComponentRegistry(cm);
ConfigurationManager configurationManager = gcr.getComponent(ConfigurationManager.class);
NonBlockingManager nbm = gcr.getComponent(NonBlockingManager.class);
Properties properties = new Properties();
properties.put(CacheParser.IGNORE_DUPLICATES, true);
for (String cacheName : resources) {
CompletionStage<Void> cs = blockingManager
.supplyBlocking(() -> recoverCache(cacheName, properties, configurationManager, zip), "restore-cache-" + cacheName)
.thenCompose(Function.identity());
stages.dependsOn(cs);
}
return stages.freeze();
return CompletionStages.performConcurrently(resources, ProcessorInfo.availableProcessors(), nbm.asScheduler(), cacheName -> {
if (log.isDebugEnabled()) log.debugf("Start recover for '%s' at %s", cacheName, System.currentTimeMillis());
CompletionStage<Void> cs = recoverCache(cacheName, properties, configurationManager, zip);
if (log.isDebugEnabled()) {
return cs.whenComplete((ignore, t) -> log.debugf("Finished recover for '%s' at %d", cacheName, System.currentTimeMillis()));
}
return cs;
});
}

private CompletionStage<Void> recoverCache(String cacheName, Properties properties, ConfigurationManager configurationManager, ZipFile zip) {
Path cacheRoot = root.resolve(cacheName);

// Process .xml
String configFile = configFile(cacheName);
String zipPath = cacheRoot.resolve(configFile).toString();
try (InputStream is = zip.getInputStream(zip.getEntry(zipPath))) {
ConfigurationReader reader = ConfigurationReader.from(is).withProperties(properties).withNamingStrategy(NamingStrategy.KEBAB_CASE).withType(MediaType.fromExtension(configFile)).build();
ConfigurationBuilderHolder builderHolder = parserRegistry.parse(reader, configurationManager.toBuilderHolder());
Configuration config = builderHolder.getNamedConfigurationBuilders().get(cacheName).build();
log.debugf("Restoring Cache %s: %s", cacheName, config.toStringConfiguration(cacheName));
// Create the cache
SecurityActions.getOrCreateCache(cm, cacheName, config);
} catch (IOException e) {
throw new CacheException(e);
}
return createCache(cacheName, properties, configurationManager, zip)
.thenCompose(ignore -> restoreCacheContents(cacheName, zip));
}

// Process .dat
String dataFile = dataFile(cacheName);
String data = cacheRoot.resolve(dataFile).toString();
ZipEntry zipEntry = zip.getEntry(data);
if (zipEntry == null)
return CompletableFutures.completedNull();
private CompletionStage<Void> createCache(String cacheName, Properties properties, ConfigurationManager configurationManager, ZipFile zip) {
return blockingManager.runBlocking(() -> {
Path cacheRoot = root.resolve(cacheName);

AdvancedCache<Object, Object> cache = cm.getCache(cacheName).getAdvancedCache();
ComponentRegistry cr = SecurityActions.getComponentRegistry(cache);
CommandsFactory commandsFactory = cr.getCommandsFactory();
KeyPartitioner keyPartitioner = cr.getComponent(KeyPartitioner.class);
InvocationHelper invocationHelper = cr.getComponent(InvocationHelper.class);
StorageConfigurationManager scm = cr.getComponent(StorageConfigurationManager.class);
PersistenceMarshaller persistenceMarshaller = cr.getPersistenceMarshaller();
Marshaller userMarshaller = persistenceMarshaller.getUserMarshaller();
// Process .xml
String configFile = configFile(cacheName);
String zipPath = cacheRoot.resolve(configFile).toString();
try (InputStream is = zip.getInputStream(zip.getEntry(zipPath))) {
ConfigurationReader reader = ConfigurationReader.from(is).withProperties(properties).withNamingStrategy(NamingStrategy.KEBAB_CASE).withType(MediaType.fromExtension(configFile)).build();
ConfigurationBuilderHolder builderHolder = parserRegistry.parse(reader, configurationManager.toBuilderHolder());
Configuration config = builderHolder.getNamedConfigurationBuilders().get(cacheName).build();
log.debugf("Restoring Cache %s: %s", cacheName, config.toStringConfiguration(cacheName));
// Create the cache
SecurityActions.getOrCreateCache(cm, cacheName, config);
} catch (IOException e) {
throw new CacheException(e);
}
}, "create-cache-" + cacheName);
}

boolean keyMarshalling = !scm.getKeyStorageMediaType().isBinary();
boolean valueMarshalling = !scm.getValueStorageMediaType().isBinary();
private CompletionStage<Void> restoreCacheContents(String cacheName, ZipFile zip) {
Flowable<CacheBackupEntry> f = Flowable.using(() -> {
Path cacheRoot = root.resolve(cacheName);

SerializationContextRegistry ctxRegistry = SecurityActions.getGlobalComponentRegistry(cm).getComponent(SerializationContextRegistry.class);
ImmutableSerializationContext serCtx = ctxRegistry.getPersistenceCtx();
// Process .dat
String dataFile = dataFile(cacheName);
String data = cacheRoot.resolve(dataFile).toString();
ZipEntry zipEntry = zip.getEntry(data);
if (zipEntry == null)
return null;

return new DataInputStream(zip.getInputStream(zipEntry));
}, is -> {
if (is == null)
return Flowable.empty();

SerializationContextRegistry ctxRegistry = SecurityActions.getGlobalComponentRegistry(cm).getComponent(SerializationContextRegistry.class);
ImmutableSerializationContext serCtx = ctxRegistry.getPersistenceCtx();

try {
DataInputStream is = new DataInputStream(zip.getInputStream(zipEntry));
Iterator<CacheBackupEntry> backupEntries = new Iterator<>() {
@Override
public boolean hasNext() {
Expand All @@ -191,50 +196,51 @@ public CacheBackupEntry next() {
}
}
};
return Flowable.fromIterable(() -> backupEntries);
}, is -> {
if (is != null)
is.close();
});

int batchSize = (int) (Long.highestOneBit(ProcessorInfo.availableProcessors()) << 1);
log.debugf("Cache %s has file of size %d, batch size %d", cacheName, is.available(), batchSize);
Single<Long> restored = Flowable.fromIterable(() -> backupEntries)
.rebatchRequests(batchSize)
.map(entry -> {
Object key = keyMarshalling ? unmarshall(entry.key, userMarshaller) : scm.getKeyWrapper().wrap(entry.key);
Object value = valueMarshalling ? unmarshall(entry.value, userMarshaller) : scm.getValueWrapper().wrap(entry.value);
Metadata metadata = unmarshall(entry.metadata, persistenceMarshaller);
Metadata internalMetadataImpl = new InternalMetadataImpl(metadata, entry.created, entry.lastUsed);

PutKeyValueCommand cmd = commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
internalMetadataImpl, FlagBitSets.IGNORE_RETURN_VALUES);
commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
internalMetadataImpl, FlagBitSets.IGNORE_RETURN_VALUES);
cmd.setInternalMetadata(entry.internalMetadata);
return cmd;
})
.flatMap(cmd -> {
// Flowable does not accept null values.
CompletionStage<Boolean> cs = invocationHelper.invokeAsync(cmd, 1)
.thenApply(CompletableFutures.toTrueFunction());
return Flowable.fromCompletionStage(cs);
}, batchSize)
.count();

return restored
.observeOn(Schedulers.from(blockingManager.asExecutor("restore-cache-" + cacheName)))
.toCompletionStage()
.handle((entries, t) -> {
try {
is.close();
} catch (IOException ignore) { }

if (t != null) {
throw CompletableFutures.asCompletionException(t);
}

log.debugf("Cache %s restored %d entries", cacheName, entries);
return null;
});
} catch (IOException e) {
throw new CacheException(e);
}
// Avoid blocking calls to acquire the cache below.
if (!cm.isRunning(cacheName))
throw new IllegalStateException("Cache " + cacheName + " is not defined");

AdvancedCache<Object, Object> cache = cm.getCache(cacheName).getAdvancedCache();
ComponentRegistry cr = SecurityActions.getComponentRegistry(cache);
CommandsFactory commandsFactory = cr.getCommandsFactory();
KeyPartitioner keyPartitioner = cr.getComponent(KeyPartitioner.class);
InvocationHelper invocationHelper = cr.getComponent(InvocationHelper.class);
StorageConfigurationManager scm = cr.getComponent(StorageConfigurationManager.class);
PersistenceMarshaller persistenceMarshaller = cr.getPersistenceMarshaller();
Marshaller userMarshaller = persistenceMarshaller.getUserMarshaller();

boolean keyMarshalling = !scm.getKeyStorageMediaType().isBinary();
boolean valueMarshalling = !scm.getValueStorageMediaType().isBinary();

int batchSize = SecurityActions.getCacheConfiguration(cm, cacheName).clustering().stateTransfer().chunkSize();
Publisher<Object> p = f.rebatchRequests(batchSize)
.map(entry -> {
Object key = keyMarshalling ? unmarshall(entry.key, userMarshaller) : scm.getKeyWrapper().wrap(entry.key);
Object value = valueMarshalling ? unmarshall(entry.value, userMarshaller) : scm.getValueWrapper().wrap(entry.value);
Metadata metadata = unmarshall(entry.metadata, persistenceMarshaller);
Metadata internalMetadataImpl = new InternalMetadataImpl(metadata, entry.created, entry.lastUsed);

PutKeyValueCommand cmd = commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
internalMetadataImpl, FlagBitSets.IGNORE_RETURN_VALUES);
commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
internalMetadataImpl, FlagBitSets.IGNORE_RETURN_VALUES);
cmd.setInternalMetadata(entry.internalMetadata);
return cmd;
})
.flatMap(cmd -> RxJavaInterop.voidCompletionStageToFlowable(invocationHelper.invokeAsync(cmd, 1)));
// The count will subscribe.
// Execute the complete publisher as blocking, this will avoid (hide, more precisely) issues when operations
// are blocking by mistake.
return Flowable.fromPublisher(blockingManager.blockingPublisher(p))
.count()
.toCompletionStage()
.thenAccept(entries -> log.debugf("Cache %s restored %d entries", cacheName, entries));
}

private CompletionStage<Void> createCacheBackup(String cacheName) {
Expand Down

0 comments on commit b15a444

Please sign in to comment.