ZSTD snapshotting compression #2996

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.3.0
- Bump `com.diffplug.spotless` from 6.17.0 to 6.18.0
- Bump `io.opencensus:opencensus-api` from 0.18.0 to 0.31.1 ([#7291](https://github.com/opensearch-project/OpenSearch/pull/7291))
- Add `com.github.luben:zstd-jni` version 1.5.5-3 ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))

### Changed
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
@@ -50,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Reduce memory copy in zstd compression ([#7681](https://github.com/opensearch-project/OpenSearch/pull/7681))
- Add ZSTD compression for snapshotting ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))

### Deprecated

3 changes: 3 additions & 0 deletions buildSrc/version.properties
@@ -57,3 +57,6 @@ bytebuddy = 1.14.3

# benchmark dependencies
jmh = 1.35

# compression
zstd = 1.5.5-3
3 changes: 0 additions & 3 deletions distribution/tools/plugin-cli/build.gradle
@@ -82,9 +82,6 @@ thirdPartyAudit.ignoreViolations(
)

thirdPartyAudit.ignoreMissingClasses(
'com.github.luben.zstd.BufferPool',
'com.github.luben.zstd.ZstdInputStream',
'com.github.luben.zstd.ZstdOutputStream',
'org.brotli.dec.BrotliInputStream',
'org.objectweb.asm.AnnotationVisitor',
'org.objectweb.asm.Attribute',
1 change: 0 additions & 1 deletion modules/transport-netty4/build.gradle
@@ -185,7 +185,6 @@ thirdPartyAudit {
'org.slf4j.LoggerFactory',
'org.slf4j.spi.LocationAwareLogger',

'com.github.luben.zstd.Zstd',
'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.jcraft.jzlib.Deflater',
AzureRepository.java
@@ -100,8 +100,6 @@ public static final class Repository {
MAX_CHUNK_SIZE,
Property.NodeScope
);
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope);
}

private final BlobPath basePath;
@@ -118,7 +116,7 @@ public AzureRepository(
) {
super(
metadata,
Repository.COMPRESS_SETTING.get(metadata.settings()),
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
@@ -142,8 +140,8 @@ public AzureRepository(
// If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
// For secondary_only setting, the repository should be read only
final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
if (Repository.READONLY_SETTING.exists(metadata.settings())) {
this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
if (READONLY_SETTING.exists(metadata.settings())) {
this.readonly = READONLY_SETTING.get(metadata.settings());
} else {
this.readonly = locationMode == LocationMode.SECONDARY_ONLY;
}
plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java
@@ -50,7 +50,6 @@
import java.util.function.Function;

import static org.opensearch.common.settings.Setting.Property;
import static org.opensearch.common.settings.Setting.boolSetting;
import static org.opensearch.common.settings.Setting.byteSizeSetting;
import static org.opensearch.common.settings.Setting.simpleString;

@@ -70,7 +69,6 @@

static final Setting<String> BUCKET = simpleString("bucket", Property.NodeScope, Property.Dynamic);
static final Setting<String> BASE_PATH = simpleString("base_path", Property.NodeScope, Property.Dynamic);
static final Setting<Boolean> COMPRESS = boolSetting("compress", false, Property.NodeScope, Property.Dynamic);
static final Setting<ByteSizeValue> CHUNK_SIZE = byteSizeSetting(
"chunk_size",
MAX_CHUNK_SIZE,
@@ -94,7 +92,14 @@
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
super(
metadata,
getSetting(COMPRESS_SETTING, metadata),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
HdfsRepository.java
@@ -83,7 +83,7 @@ public HdfsRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, recoverySettings);
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
S3Repository.java
@@ -156,12 +156,6 @@ class S3Repository extends MeteredBlobStoreRepository {
new ByteSizeValue(5, ByteSizeUnit.TB)
);

/**
* When set to true metadata files are stored in compressed format. This setting doesn’t affect index
* files that are already compressed by default. Defaults to false.
*/
static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false);

/**
* Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
* standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.
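Each repository plugin previously declared its own boolean "compress" (and, for Azure, "readonly") setting; the hunks above drop those duplicates and reference an unqualified COMPRESS_SETTING / READONLY_SETTING instead, which suggests the constants were consolidated on the shared blob-store base class. A minimal sketch of that assumed consolidated declaration (class and location are assumptions, not shown in this diff):

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

// Hypothetical consolidated declaration, e.g. on BlobStoreRepository.
final class SharedCompressSetting {
    static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Setting.Property.NodeScope);

    // Every repository plugin then resolves the same key from its repository metadata settings.
    static boolean resolve(Settings repositorySettings) {
        return COMPRESS_SETTING.get(repositorySettings);
    }
}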
1 change: 0 additions & 1 deletion plugins/transport-nio/build.gradle
@@ -111,7 +111,6 @@ thirdPartyAudit {
'org.slf4j.LoggerFactory',
'org.slf4j.spi.LocationAwareLogger',

'com.github.luben.zstd.Zstd',
'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.jcraft.jzlib.Deflater',
2 changes: 1 addition & 1 deletion sandbox/plugins/custom-codecs/build.gradle
@@ -21,7 +21,7 @@ opensearchplugin {
}

dependencies {
api "com.github.luben:zstd-jni:1.5.5-1"
api "com.github.luben:zstd-jni:${versions.zstd}"
}

yamlRestTest.enabled = false;

This file was deleted.

3 changes: 3 additions & 0 deletions server/build.gradle
@@ -150,6 +150,9 @@ dependencies {
api "com.google.protobuf:protobuf-java:${versions.protobuf}"
api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}"

//zstd
api "com.github.luben:zstd-jni:${versions.zstd}"

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'server'
1 change: 1 addition & 0 deletions server/licenses/zstd-jni-1.5.5-3.jar.sha1
@@ -0,0 +1 @@
488dd9b15c9e8cf87d857f65f5cd6359c2853381
CompressorFactory.java
@@ -47,7 +47,14 @@
*/
public class CompressorFactory {

public static final Compressor COMPRESSOR = new DeflateCompressor();
public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor();

@Deprecated
public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR;

public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor();

public static final Compressor NONE_COMPRESSOR = new NoneCompressor();

public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
@@ -61,6 +68,9 @@ public static Compressor compressor(BytesReference bytes) {
// as a xcontent, we have a problem
assert XContentHelper.xContentType(bytes) == null;
return COMPRESSOR;
} else if (ZSTD_COMPRESSOR.isCompressed(bytes)) {
assert XContentHelper.xContentType(bytes) == null;
return ZSTD_COMPRESSOR;
}

XContentType contentType = XContentHelper.xContentType(bytes);
@@ -81,7 +91,6 @@ private static boolean isAncient(BytesReference bytes) {

/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
* @throws NullPointerException a NullPointerException will be thrown when bytes is null
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
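A hedged usage sketch of the reworked factory: compress bytes with one of the exposed compressors on the write path, and let compressor(BytesReference) sniff the header on the read path. The factory constants and round-trip methods come from the hunks above; the wrapper class, method name, and sample payload are illustrative scaffolding only.

import java.io.IOException;

import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.compress.Compressor;
import org.opensearch.common.compress.CompressorFactory;

class CompressorRoundTripExample {
    static BytesReference roundTrip() throws IOException {
        // Write path: pick a compressor explicitly (DEFLATE remains the default via COMPRESSOR).
        BytesReference original = new BytesArray("{\"snapshot\":\"metadata\"}");
        BytesReference compressed = CompressorFactory.ZSTD_COMPRESSOR.compress(original);

        // Read path: the factory inspects the header and returns the matching compressor,
        // or null when the bytes are plain (uncompressed) XContent.
        Compressor detected = CompressorFactory.compressor(compressed);
        assert detected == CompressorFactory.ZSTD_COMPRESSOR;
        return CompressorFactory.uncompressIfNeeded(compressed);
    }
}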
server/src/main/java/org/opensearch/common/compress/CompressorType.java
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.compress;

/**
* Supported compression types
*
* @opensearch.internal
*/
public enum CompressorType {

DEFLATE {
@Override
public Compressor compressor() {
return CompressorFactory.DEFLATE_COMPRESSOR;
}
},

ZSTD {
@Override
public Compressor compressor() {
return CompressorFactory.ZSTD_COMPRESSOR;
}
},

NONE {
@Override
public Compressor compressor() {
return CompressorFactory.NONE_COMPRESSOR;
}
};

public abstract Compressor compressor();
}
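The enum gives each compression choice a Compressor accessor; a natural consumer is a repository-level setting that parses a user-supplied string into a CompressorType. A minimal sketch of such a setting, assuming a hypothetical "compression_type" key — the actual setting name and its home are not shown in this view:

import java.util.Locale;

import org.opensearch.common.compress.Compressor;
import org.opensearch.common.compress.CompressorType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

// Hypothetical wiring: map a user-facing string onto CompressorType and hand back its Compressor.
final class CompressionTypeSettingSketch {
    static final Setting<CompressorType> COMPRESSION_TYPE_SETTING = new Setting<>(
        "compression_type",                                              // assumed key, not taken from this diff
        CompressorType.DEFLATE.name(),                                   // deflate stays the default
        value -> CompressorType.valueOf(value.toUpperCase(Locale.ROOT)),
        Setting.Property.NodeScope
    );

    static Compressor resolve(Settings settings) {
        return COMPRESSION_TYPE_SETTING.get(settings).compressor();
    }
}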
DeflateCompressor.java
@@ -157,16 +157,9 @@ public InputStream threadLocalInputStream(InputStream in) throws IOException {
* @return decompressing stream
*/
public static InputStream inputStream(InputStream in, boolean threadLocal) throws IOException {
final byte[] headerBytes = new byte[HEADER.length];
int len = 0;
while (len < headerBytes.length) {
final int read = in.read(headerBytes, len, headerBytes.length - len);
if (read == -1) {
break;
}
len += read;
}
if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) {
final byte[] header = in.readNBytes(HEADER.length);

if (Arrays.equals(header, HEADER) == false) {
throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!");
}

@@ -252,9 +245,11 @@ public BytesReference uncompress(BytesReference bytesReference) throws IOException {
} finally {
inflater.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
try {
return buffer.copyBytes();
} finally {
buffer.reset();
}
}

// Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we
@@ -271,8 +266,10 @@ public BytesReference compress(BytesReference bytesReference) throws IOException {
} finally {
deflater.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
try {
return buffer.copyBytes();
} finally {
buffer.reset();
}
}
}
server/src/main/java/org/opensearch/common/compress/NoneCompressor.java
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.compress;

import org.opensearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* {@link Compressor} no compressor implementation.
*
* @opensearch.internal
*/
public class NoneCompressor implements Compressor {
@Override
public boolean isCompressed(BytesReference bytes) {
return false;
}

@Override
public int headerLength() {
return 0;
}

@Override
public InputStream threadLocalInputStream(InputStream in) throws IOException {
return in;
}

@Override
public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
return out;
}

@Override
public BytesReference uncompress(BytesReference bytesReference) throws IOException {
return bytesReference;
}

@Override
public BytesReference compress(BytesReference bytesReference) throws IOException {
return bytesReference;
}

}
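The ZstdCompressor referenced by CompressorFactory is not part of this files-changed excerpt. A rough sketch of how such an implementation could be built on the newly added zstd-jni streams, assuming a magic-byte header analogous to the DEFLATE one — the class name, header value, and helper structure here are assumptions, not the PR's actual code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;

import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.compress.Compressor;

public class ZstdCompressorSketch implements Compressor {
    // Illustrative magic bytes only; the real header is defined elsewhere in the PR.
    private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', 0 };

    @Override
    public boolean isCompressed(BytesReference bytes) {
        if (bytes.length() < HEADER.length) {
            return false;
        }
        for (int i = 0; i < HEADER.length; i++) {
            if (bytes.get(i) != HEADER[i]) {
                return false;
            }
        }
        return true;
    }

    @Override
    public int headerLength() {
        return HEADER.length;
    }

    @Override
    public InputStream threadLocalInputStream(InputStream in) throws IOException {
        // Consume and verify the header, then hand the remaining bytes to zstd-jni.
        final byte[] header = in.readNBytes(HEADER.length);
        if (Arrays.equals(header, HEADER) == false) {
            throw new IllegalArgumentException("Input stream is not compressed with ZSTD!");
        }
        return new ZstdInputStream(in);
    }

    @Override
    public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
        out.write(HEADER);
        return new ZstdOutputStream(out);
    }

    @Override
    public BytesReference uncompress(BytesReference bytesReference) throws IOException {
        try (InputStream in = threadLocalInputStream(bytesReference.streamInput())) {
            return new BytesArray(in.readAllBytes());
        }
    }

    @Override
    public BytesReference compress(BytesReference bytesReference) throws IOException {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream out = threadLocalOutputStream(buffer)) {
            bytesReference.writeTo(out);
        }
        return new BytesArray(buffer.toByteArray());
    }
}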