Zstd compression integration and small refactoring
- Added a ZSTD compressor to snapshotting
- Moved two JSON repository settings, readonly and compression, into the BlobStoreRepository class and removed them from the other repository classes where they were used (a usage sketch follows the changed-files summary below)

Signed-off-by: Andrey Pleskach <ples@aiven.io>
willyborankin committed Oct 23, 2022
1 parent e6d2f69 commit fe6afd7
Showing 10 changed files with 84 additions and 45 deletions.
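
To make the refactor concrete, here is a minimal sketch of how a repository now resolves these settings, using the COMPRESS_SETTING, COMPRESSION_TYPE_SETTING, and READONLY_SETTING declarations from the BlobStoreRepository diff below. The "zstd" value assumes CompressorType gained a ZSTD constant in this change (only DEFLATE is visible in the diff), and the import paths are the usual OpenSearch ones rather than anything shown in this commit.

    import org.opensearch.common.compress.Compressor;
    import org.opensearch.common.compress.CompressorFactory;
    import org.opensearch.common.settings.Settings;
    import org.opensearch.repositories.blobstore.BlobStoreRepository;

    public final class RepositorySettingsSketch {
        public static void main(String[] args) {
            // Repository settings as they would arrive in the repository metadata.
            Settings settings = Settings.builder()
                .put("compress", true)            // default: false
                .put("compression_type", "zstd")  // default: "deflate"; ZSTD constant assumed
                .put("readonly", false)           // default: false
                .build();

            // Mirrors the new BlobStoreRepository constructor logic: pick a real
            // compressor only when compression is enabled.
            Compressor compressor = BlobStoreRepository.COMPRESS_SETTING.get(settings)
                ? BlobStoreRepository.COMPRESSION_TYPE_SETTING.get(settings).compressor()
                : CompressorFactory.NONE_COMPRESSOR;

            boolean readOnly = BlobStoreRepository.READONLY_SETTING.get(settings);
            System.out.println("compressor=" + compressor + ", readonly=" + readOnly);
        }
    }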
AzureRepository.java
@@ -100,8 +100,6 @@ public static final class Repository {
             MAX_CHUNK_SIZE,
             Property.NodeScope
         );
-        public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
-        public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope);
     }
 
     private final BlobPath basePath;
@@ -118,7 +116,7 @@ public AzureRepository(
     ) {
         super(
             metadata,
-            Repository.COMPRESS_SETTING.get(metadata.settings()),
+            COMPRESS_SETTING.get(metadata.settings()),
             namedXContentRegistry,
             clusterService,
             recoverySettings,
@@ -142,8 +140,8 @@ public AzureRepository(
         // If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
         // For secondary_only setting, the repository should be read only
         final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
-        if (Repository.READONLY_SETTING.exists(metadata.settings())) {
-            this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
+        if (READONLY_SETTING.exists(metadata.settings())) {
+            this.readonly = READONLY_SETTING.get(metadata.settings());
         } else {
             this.readonly = locationMode == LocationMode.SECONDARY_ONLY;
         }
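
The readonly fallback above can be read as a small sketch. It assumes Repository.LOCATION_MODE_SETTING parses the string "secondary_only" to LocationMode.SECONDARY_ONLY, which is not shown in this diff:

    Settings s = Settings.builder().put("location_mode", "secondary_only").build();
    final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(s);
    // "readonly" was never set explicitly, so the location mode decides:
    boolean readonly = READONLY_SETTING.exists(s)
        ? READONLY_SETTING.get(s)
        : locationMode == LocationMode.SECONDARY_ONLY; // true for secondary_only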
GoogleCloudStorageRepository.java
@@ -50,7 +50,6 @@
 import java.util.function.Function;
 
 import static org.opensearch.common.settings.Setting.Property;
-import static org.opensearch.common.settings.Setting.boolSetting;
 import static org.opensearch.common.settings.Setting.byteSizeSetting;
 import static org.opensearch.common.settings.Setting.simpleString;
 
@@ -70,7 +69,6 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
 
     static final Setting<String> BUCKET = simpleString("bucket", Property.NodeScope, Property.Dynamic);
     static final Setting<String> BASE_PATH = simpleString("base_path", Property.NodeScope, Property.Dynamic);
-    static final Setting<Boolean> COMPRESS = boolSetting("compress", false, Property.NodeScope, Property.Dynamic);
     static final Setting<ByteSizeValue> CHUNK_SIZE = byteSizeSetting(
         "chunk_size",
         MAX_CHUNK_SIZE,
@@ -94,7 +92,14 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
         final ClusterService clusterService,
         final RecoverySettings recoverySettings
     ) {
-        super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
+        super(
+            metadata,
+            getSetting(COMPRESS_SETTING, metadata),
+            namedXContentRegistry,
+            clusterService,
+            recoverySettings,
+            buildLocation(metadata)
+        );
         this.storageService = storageService;
 
         String basePath = BASE_PATH.get(metadata.settings());
HdfsRepository.java
@@ -83,7 +83,7 @@ public HdfsRepository(
         final ClusterService clusterService,
         final RecoverySettings recoverySettings
     ) {
-        super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, recoverySettings);
+        super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);
 
         this.environment = environment;
         this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
S3Repository.java
@@ -156,12 +156,6 @@ class S3Repository extends MeteredBlobStoreRepository {
         new ByteSizeValue(5, ByteSizeUnit.TB)
     );
 
-    /**
-     * When set to true metadata files are stored in compressed format. This setting doesn't affect index
-     * files that are already compressed by default. Defaults to false.
-     */
-    static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false);
-
     /**
      * Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
      * standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.
BlobStoreRepository.java
@@ -78,7 +78,9 @@
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.component.AbstractLifecycleComponent;
+import org.opensearch.common.compress.Compressor;
 import org.opensearch.common.compress.CompressorFactory;
+import org.opensearch.common.compress.CompressorType;
 import org.opensearch.common.compress.NotXContentException;
 import org.opensearch.common.io.Streams;
 import org.opensearch.common.lease.Releasable;
@@ -138,6 +140,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -246,17 +249,31 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         Setting.Property.NodeScope
     );
 
+    public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Setting.Property.NodeScope);
+
+    public static final Setting<CompressorType> COMPRESSION_TYPE_SETTING = new Setting<>(
+        "compression_type",
+        CompressorType.DEFLATE.name().toLowerCase(Locale.ROOT),
+        s -> CompressorType.valueOf(s.toUpperCase(Locale.ROOT)),
+        Setting.Property.NodeScope
+    );
+
     /**
      * Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
      * url-repository.
     */
     public static final Setting<Boolean> SUPPORT_URL_REPO = Setting.boolSetting("support_url_repo", true, Setting.Property.NodeScope);
 
+    /***
+     * Setting to set repository as readonly
+     */
+    public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Setting.Property.NodeScope);
+
     protected final boolean supportURLRepo;
 
     private final int maxShardBlobDeleteBatch;
 
-    private final boolean compress;
+    private final Compressor compressor;
 
     private final boolean cacheRepositoryData;
 
@@ -359,7 +376,6 @@ protected BlobStoreRepository(
         final ClusterService clusterService,
         final RecoverySettings recoverySettings
     ) {
-        this.compress = compress;
         this.metadata = metadata;
         this.namedXContentRegistry = namedXContentRegistry;
         this.threadPool = clusterService.getClusterApplierService().threadPool();
@@ -368,10 +384,11 @@
         this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
         snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
         restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
-        readOnly = metadata.settings().getAsBoolean("readonly", false);
+        readOnly = READONLY_SETTING.get(metadata.settings());
         cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
         bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
         maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
+        this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()).compressor() : CompressorFactory.NONE_COMPRESSOR;
     }
 
     @Override
@@ -535,13 +552,13 @@ public void cloneShardSnapshot(
             sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime),
             shardContainer,
             target.getUUID(),
-            compress
+            compressor
         );
         INDEX_SHARD_SNAPSHOTS_FORMAT.write(
             existingSnapshots.withClone(source.getName(), target.getName()),
             shardContainer,
             newGen,
-            compress
+            compressor
         );
         return newGen;
     }));
@@ -684,7 +701,7 @@ public BlobStore blobStore() {
      * @return true if compression is needed
      */
     protected final boolean isCompress() {
-        return compress;
+        return compressor != CompressorFactory.NONE_COMPRESSOR;
     }
 
     /**
@@ -1390,7 +1407,7 @@ public void finalizeSnapshot(
         executor.execute(
             ActionRunnable.run(
                 allMetaListener,
-                () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
+                () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compressor)
             )
         );
 
@@ -1403,7 +1420,7 @@
         if (metaUUID == null) {
             // We don't yet have this version of the metadata so we write it
             metaUUID = UUIDs.base64UUID();
-            INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
+            INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compressor);
             indexMetaIdentifiers.put(identifiers, metaUUID);
         }
         indexMetas.put(index, identifiers);
@@ -1412,7 +1429,7 @@
         executor.execute(
             ActionRunnable.run(
                 allMetaListener,
-                () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
+                () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compressor)
             )
         );
     }, onUpdateFailure);
@@ -2423,7 +2440,7 @@ public void snapshotShard(
         // reference a generation that has not had all its files fully upload.
         indexGeneration = UUIDs.randomBase64UUID();
         try {
-            INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress);
+            INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compressor);
         } catch (IOException e) {
             throw new IndexShardSnapshotFailedException(
                 shardId,
@@ -2455,7 +2472,7 @@ public void snapshotShard(
             ),
             shardContainer,
             snapshotId.getUUID(),
-            compress
+            compressor
         );
     } catch (IOException e) {
         throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
@@ -2791,7 +2808,7 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
         final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
         if (indexGeneration < 0L) {
             writtenGeneration = UUIDs.randomBase64UUID();
-            INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
+            INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compressor);
         } else {
             writtenGeneration = String.valueOf(indexGeneration);
             writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
@@ -2831,7 +2848,7 @@ private void writeShardIndexBlobAtomic(
         () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path())
     );
     final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration));
-    writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress), true);
+    writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor), true);
 }
 
 // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
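
The new compression_type setting above parses its value with CompressorType.valueOf after upper-casing, so its behavior can be sketched as follows (assuming CompressorType gained a ZSTD constant per the commit title; only DEFLATE is visible in this diff):

    CompressorType byDefault = COMPRESSION_TYPE_SETTING.get(Settings.EMPTY);  // DEFLATE
    CompressorType zstd = COMPRESSION_TYPE_SETTING.get(
        Settings.builder().put("compression_type", "ZsTd").build()           // case-insensitive
    );
    // An unrecognized name fails fast: CompressorType.valueOf throws IllegalArgumentException.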
ChecksumBlobStoreFormat.java
@@ -44,7 +44,7 @@
 import org.opensearch.common.CheckedFunction;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.bytes.BytesReference;
-import org.opensearch.common.compress.CompressorFactory;
+import org.opensearch.common.compress.Compressor;
 import org.opensearch.common.io.Streams;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
@@ -159,15 +159,15 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr
      * @param obj           object to be serialized
      * @param blobContainer blob container
      * @param name          blob name
-     * @param compress      whether to use compression
+     * @param compressor    whether to use compression
      */
-    public void write(T obj, BlobContainer blobContainer, String name, boolean compress) throws IOException {
+    public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException {
         final String blobName = blobName(name);
-        final BytesReference bytes = serialize(obj, blobName, compress);
+        final BytesReference bytes = serialize(obj, blobName, compressor);
         blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
     }
 
-    public BytesReference serialize(final T obj, final String blobName, final boolean compress) throws IOException {
+    public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
         try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
             try (
                 OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
@@ -187,7 +187,7 @@ public void close() throws IOException {
                 };
                 XContentBuilder builder = XContentFactory.contentBuilder(
                     XContentType.SMILE,
-                    compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream
+                    compressor.threadLocalOutputStream(indexOutputOutputStream)
                 )
             ) {
                 builder.startObject();
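
Callers now pass a Compressor instead of a boolean, as the test changes below illustrate. A minimal sketch of the new write path (BlobObj and blobContainer stand in for any serializable object and destination, and a ZSTD constant on CompressorType is assumed from the commit title):

    ChecksumBlobStoreFormat<BlobObj> format = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
    format.write(new BlobObj("zstd payload"), blobContainer, "blob-zstd", CompressorType.ZSTD.compressor());
    format.write(new BlobObj("raw payload"), blobContainer, "blob-raw", CompressorFactory.NONE_COMPRESSOR);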
FsRepository.java
@@ -91,7 +91,6 @@ public class FsRepository extends BlobStoreRepository {
         new ByteSizeValue(Long.MAX_VALUE),
         Property.NodeScope
     );
-    public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
     public static final Setting<Boolean> REPOSITORIES_COMPRESS_SETTING = Setting.boolSetting(
         "repositories.fs.compress",
         false,
BlobStoreFormatTests.java
@@ -40,6 +40,9 @@
 import org.opensearch.common.blobstore.BlobStore;
 import org.opensearch.common.blobstore.fs.FsBlobStore;
 import org.opensearch.common.bytes.BytesArray;
+import org.opensearch.common.compress.Compressor;
+import org.opensearch.common.compress.CompressorFactory;
+import org.opensearch.common.compress.CompressorType;
 import org.opensearch.common.io.Streams;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.io.stream.StreamInput;
@@ -54,6 +57,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Map;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.greaterThan;
@@ -117,8 +121,13 @@ public void testBlobStoreOperations() throws IOException {
         ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
 
         // Write blobs in different formats
-        checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false);
-        checksumSMILE.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true);
+        checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorType.NONE.compressor());
+        checksumSMILE.write(
+            new BlobObj("checksum smile compressed"),
+            blobContainer,
+            "check-smile-comp",
+            CompressorFactory.DEFLATE_COMPRESSOR
+        );
 
         // Assert that all checksum blobs can be read
         assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText(), "checksum smile");
@@ -134,8 +143,8 @@ public void testCompressionIsApplied() throws IOException {
         }
         ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
         BlobObj blobObj = new BlobObj(veryRedundantText.toString());
-        checksumFormat.write(blobObj, blobContainer, "blob-comp", true);
-        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false);
+        checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorType.DEFLATE.compressor());
+        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorType.NONE.compressor());
         Map<String, BlobMetadata> blobs = blobContainer.listBlobsByPrefix("blob-");
         assertEquals(blobs.size(), 2);
         assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@@ -147,7 +156,12 @@ public void testBlobCorruption() throws IOException {
         String testString = randomAlphaOfLength(randomInt(10000));
         BlobObj blobObj = new BlobObj(testString);
         ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
-        checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean());
+        checksumFormat.write(
+            blobObj,
+            blobContainer,
+            "test-path",
+            randomFrom(Arrays.stream(CompressorType.values()).map(CompressorType::compressor).toArray(Compressor[]::new))
+        );
         assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString);
         randomCorruption(blobContainer, "test-path");
         try {
