Skip to content

Commit

Permalink
ISPN-15523 Restore procedure performance regression
Browse files Browse the repository at this point in the history
Batch requests while restoring the cache.
  • Loading branch information
jabolina committed Mar 12, 2024
1 parent 8f999f2 commit 9f9a676
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletionStage;
Expand All @@ -29,7 +30,9 @@
import org.infinispan.commons.marshall.MarshallingException;
import org.infinispan.commons.marshall.ProtoStreamTypeIds;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.ConfigurationManager;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.parsing.CacheParser;
Expand Down Expand Up @@ -59,6 +62,8 @@
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
* {@link org.infinispan.server.core.backup.ContainerResource} implementation for {@link
Expand All @@ -68,6 +73,7 @@
* @since 12.0
*/
public class CacheResource extends AbstractContainerResource {

private final EmbeddedCacheManager cm;
private final ParserRegistry parserRegistry;

Expand Down Expand Up @@ -115,67 +121,120 @@ public CompletionStage<Void> restore(ZipFile zip) {
Properties properties = new Properties();
properties.put(CacheParser.IGNORE_DUPLICATES, true);
for (String cacheName : resources) {
stages.dependsOn(blockingManager.runBlocking(() -> {
Path cacheRoot = root.resolve(cacheName);

// Process .xml
String configFile = configFile(cacheName);
String zipPath = cacheRoot.resolve(configFile).toString();
try (InputStream is = zip.getInputStream(zip.getEntry(zipPath))) {
ConfigurationReader reader = ConfigurationReader.from(is).withProperties(properties).withNamingStrategy(NamingStrategy.KEBAB_CASE).withType(MediaType.fromExtension(configFile)).build();
ConfigurationBuilderHolder builderHolder = parserRegistry.parse(reader, configurationManager.toBuilderHolder());
Configuration config = builderHolder.getNamedConfigurationBuilders().get(cacheName).build();
log.debugf("Restoring Cache %s: %s", cacheName, config.toStringConfiguration(cacheName));
// Create the cache
SecurityActions.getOrCreateCache(cm, cacheName, config);
} catch (IOException e) {
throw new CacheException(e);
CompletionStage<Void> cs = blockingManager
.supplyBlocking(() -> recoverCache(cacheName, properties, configurationManager, zip), "restore-cache-" + cacheName)
.thenCompose(Function.identity());
stages.dependsOn(cs);
}
return stages.freeze();
}

private CompletionStage<Void> recoverCache(String cacheName, Properties properties, ConfigurationManager configurationManager, ZipFile zip) {
Path cacheRoot = root.resolve(cacheName);

// Process .xml
String configFile = configFile(cacheName);
String zipPath = cacheRoot.resolve(configFile).toString();
try (InputStream is = zip.getInputStream(zip.getEntry(zipPath))) {
ConfigurationReader reader = ConfigurationReader.from(is).withProperties(properties).withNamingStrategy(NamingStrategy.KEBAB_CASE).withType(MediaType.fromExtension(configFile)).build();
ConfigurationBuilderHolder builderHolder = parserRegistry.parse(reader, configurationManager.toBuilderHolder());
Configuration config = builderHolder.getNamedConfigurationBuilders().get(cacheName).build();
log.debugf("Restoring Cache %s: %s", cacheName, config.toStringConfiguration(cacheName));
// Create the cache
SecurityActions.getOrCreateCache(cm, cacheName, config);
} catch (IOException e) {
throw new CacheException(e);
}

// Process .dat
String dataFile = dataFile(cacheName);
String data = cacheRoot.resolve(dataFile).toString();
ZipEntry zipEntry = zip.getEntry(data);
if (zipEntry == null)
return CompletableFutures.completedNull();

AdvancedCache<Object, Object> cache = cm.getCache(cacheName).getAdvancedCache();
ComponentRegistry cr = SecurityActions.getComponentRegistry(cache);
CommandsFactory commandsFactory = cr.getCommandsFactory();
KeyPartitioner keyPartitioner = cr.getComponent(KeyPartitioner.class);
InvocationHelper invocationHelper = cr.getComponent(InvocationHelper.class);
StorageConfigurationManager scm = cr.getComponent(StorageConfigurationManager.class);
PersistenceMarshaller persistenceMarshaller = cr.getPersistenceMarshaller();
Marshaller userMarshaller = persistenceMarshaller.getUserMarshaller();

boolean keyMarshalling = !scm.getKeyStorageMediaType().isBinary();
boolean valueMarshalling = !scm.getValueStorageMediaType().isBinary();

SerializationContextRegistry ctxRegistry = SecurityActions.getGlobalComponentRegistry(cm).getComponent(SerializationContextRegistry.class);
ImmutableSerializationContext serCtx = ctxRegistry.getPersistenceCtx();

try {
DataInputStream is = new DataInputStream(zip.getInputStream(zipEntry));
Iterator<CacheBackupEntry> backupEntries = new Iterator<>() {
@Override
public boolean hasNext() {
try {
return is.available() > 0;
} catch (IOException e) {
log.errorf("Failed checking data available to recover %s", cacheName, e);
return false;
}
}

@Override
public CacheBackupEntry next() {
try {
return readMessageStream(serCtx, CacheBackupEntry.class, is);
} catch (IOException e) {
log.errorf("Failed reading entry to recover %s", cacheName, e);
throw new CacheException(e);
}
}
};

// Process .dat
String dataFile = dataFile(cacheName);
String data = cacheRoot.resolve(dataFile).toString();
ZipEntry zipEntry = zip.getEntry(data);
if (zipEntry == null)
return;

AdvancedCache<Object, Object> cache = cm.getCache(cacheName).getAdvancedCache();
ComponentRegistry cr = SecurityActions.getComponentRegistry(cache);
CommandsFactory commandsFactory = cr.getCommandsFactory();
KeyPartitioner keyPartitioner = cr.getComponent(KeyPartitioner.class);
InvocationHelper invocationHelper = cr.getComponent(InvocationHelper.class);
StorageConfigurationManager scm = cr.getComponent(StorageConfigurationManager.class);
PersistenceMarshaller persistenceMarshaller = cr.getPersistenceMarshaller();
Marshaller userMarshaller = persistenceMarshaller.getUserMarshaller();

boolean keyMarshalling = !scm.getKeyStorageMediaType().isBinary();
boolean valueMarshalling = !scm.getValueStorageMediaType().isBinary();

SerializationContextRegistry ctxRegistry = SecurityActions.getGlobalComponentRegistry(cm).getComponent(SerializationContextRegistry.class);
ImmutableSerializationContext serCtx = ctxRegistry.getPersistenceCtx();

int entries = 0;
try (DataInputStream is = new DataInputStream(zip.getInputStream(zipEntry))) {
while (is.available() > 0) {
CacheBackupEntry entry = readMessageStream(serCtx, CacheBackupEntry.class, is);
int batchSize = (int) (Long.highestOneBit(ProcessorInfo.availableProcessors()) << 1);
log.debugf("Cache %s has file of size %d, batch size %d", cacheName, is.available(), batchSize);
Single<Long> restored = Flowable.fromIterable(() -> backupEntries)
.rebatchRequests(batchSize)
.map(entry -> {
Object key = keyMarshalling ? unmarshall(entry.key, userMarshaller) : scm.getKeyWrapper().wrap(entry.key);
Object value = valueMarshalling ? unmarshall(entry.value, userMarshaller) : scm.getValueWrapper().wrap(entry.value);
Metadata metadata = unmarshall(entry.metadata, persistenceMarshaller);
Metadata internalMetadataImpl = new InternalMetadataImpl(metadata, entry.created, entry.lastUsed);

PutKeyValueCommand cmd = commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
internalMetadataImpl, FlagBitSets.IGNORE_RETURN_VALUES);
commandsFactory.buildPutKeyValueCommand(key, value, keyPartitioner.getSegment(key),
internalMetadataImpl, FlagBitSets.IGNORE_RETURN_VALUES);
cmd.setInternalMetadata(entry.internalMetadata);
invocationHelper.invoke(cmd, 1);
entries++;
}
} catch (IOException e) {
throw new CacheException(e);
}
log.debugf("Cache %s restored %d entries", cacheName, entries);
}, "restore-cache-" + cacheName));
return cmd;
})
.flatMap(cmd -> {
// Flowable does not accept null values.
CompletionStage<Boolean> cs = invocationHelper.invokeAsync(cmd, 1)
.thenApply(CompletableFutures.toTrueFunction());
return Flowable.fromCompletionStage(cs);
}, batchSize)
.count();

return restored
.observeOn(Schedulers.from(blockingManager.asExecutor("restore-cache-" + cacheName)))
.toCompletionStage()
.handle((entries, t) -> {
try {
is.close();
} catch (IOException ignore) { }

if (t != null) {
throw CompletableFutures.asCompletionException(t);
}

log.debugf("Cache %s restored %d entries", cacheName, entries);
return null;
});
} catch (IOException e) {
throw new CacheException(e);
}
return stages.freeze();
}

private CompletionStage<Void> createCacheBackup(String cacheName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -208,6 +209,35 @@ public void testBackupAndRestoreEntryExceeding256Bytes() {
});
}

@Test(groups = "stress")
public void testBackupAndRestoreLargeCache() {
String name = "testBackupAndRestoreLargeCache";
String cacheName = "cache";
int numEntries = 250_000;
createAndRestore(
(source, backupManager) -> {
Cache<String, String> cache = source.administration().getOrCreateCache(cacheName, config(APPLICATION_OBJECT_TYPE));
for (int i = 0; i < numEntries; i++) {
cache.put("my-key-" + i, UUID.randomUUID().toString());
}

return backupManager.create(name, null);
},
(target, backupManager, backup) -> {
assertTrue(target.getCacheNames().isEmpty());
Map<String, BackupManager.Resources> paramMap = Collections.singletonMap("default",
new BackupManagerResources.Builder()
.includeAll()
.build()
);

// Waits for 30s.
await(backupManager.restore(name, backup, paramMap));
assertTrue(target.cacheExists(cacheName));
assertEquals(numEntries, target.getCache(cacheName).size());
});
}

public void testBackupAndRestoreIgnoreResources() throws Exception {
String name = "testBackupAndRestoreIgnoreResources";
createAndRestore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,13 @@ private void performTest(Function<RestClient, RestResponse> backupAndDownload,
RestResponse getResponse = backupAndDownload.apply(client);
String fileName = getResponse.getHeader("Content-Disposition").split("=")[1];

// Copy the returned zip bytes to the local working dir
File backupZip = new File(WORKING_DIR, fileName);
try (InputStream is = getResponse.getBodyAsStream()) {
Files.copy(is, backupZip.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
getResponse.close();

// Delete the backup from the server
try (RestResponse deleteResponse = delete.apply(client)) {
assertEquals(204, deleteResponse.getStatus());
Expand All @@ -437,13 +444,6 @@ private void performTest(Function<RestClient, RestResponse> backupAndDownload,
startTargetCluster();
client = target.getClient();

// Copy the returned zip bytes to the local working dir
File backupZip = new File(WORKING_DIR, fileName);
try (InputStream is = getResponse.getBodyAsStream()) {
Files.copy(is, backupZip.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
getResponse.close();

if (syncToServer) {
backupZip = new File(target.driver.syncFilesToServer(0, backupZip.getAbsolutePath()));
}
Expand Down Expand Up @@ -535,7 +535,7 @@ private void assertCounter(RestClient client, String name, Element type, Storage
assertStatusAndBodyEquals(OK, Long.toString(expectedValue), client.counter(name).get());
}

private void assertNoServerBackupFilesExist(Cluster cluster) {
static void assertNoServerBackupFilesExist(Cluster cluster) {
for (int i = 0; i < 2; i++) {
cluster.driver.syncFilesFromServer(i, "data");
Path root = cluster.driver.getRootDir().toPath();
Expand Down

0 comments on commit 9f9a676

Please sign in to comment.