From c8b0d80a8286459857f2db2c0e9d3c1c076ada9d Mon Sep 17 00:00:00 2001 From: mulugetam Date: Thu, 30 May 2024 02:15:58 -0700 Subject: [PATCH] Add hardware-accelerated codecs for DEFLATE and LZ4 (#122) * Add QAT accelerated compression. Signed-off-by: Mulugeta Mammo * Use own classes for QAT codec. Apply SpotlessJavaCheck. Signed-off-by: Mulugeta Mammo * Declare fields final, unless required not to. Throw a valid type of exception. Signed-off-by: Mulugeta Mammo * Use assumeThat in the Qat test classes. Signed-off-by: Mulugeta Mammo * Add more QAT availability check in QatCodecTests. Signed-off-by: Mulugeta Mammo * Make LZ4 the default algorithm for QAT. Signed-off-by: Mulugeta Mammo * Make 'auto' the default execution mode for QAT. Also, minor clean up work. Signed-off-by: Mulugeta Mammo * Revert compression level for ZSTD to 3. Signed-off-by: Mulugeta Mammo * Replace QatLz4/DeflateCompressionMode classes with QatCompressionMode. Signed-off-by: Mulugeta Mammo * Fix a MultiCodecMergeIT test fail. Signed-off-by: Mulugeta Mammo * Remove hard-coded values for default compression level. Signed-off-by: Mulugeta Mammo --------- Signed-off-by: Mulugeta Mammo Signed-off-by: mulugetam Co-authored-by: Mulugeta Mammo --- build.gradle | 1 + licenses/qat-java-1.1.1.jar.sha1 | 1 + licenses/qat-java-LICENSE.txt | 36 +++ licenses/qat-java-NOTICE.txt | 1 + .../codec/rest/CreateIndexWithCodecIT.java | 28 +++ .../index/codec/CodecCompressionLevelIT.java | 131 +++++++++++ .../index/codec/MultiCodecMergeIT.java | 31 +-- .../codec/customcodecs/CustomCodecPlugin.java | 24 +- .../customcodecs/CustomCodecService.java | 34 ++- .../CustomCodecServiceFactory.java | 4 +- .../customcodecs/Lucene99CustomCodec.java | 5 +- .../Lucene99CustomStoredFieldsFormat.java | 2 +- .../codec/customcodecs/Lucene99QatCodec.java | 167 ++++++++++++++ .../Lucene99QatStoredFieldsFormat.java | 176 +++++++++++++++ .../customcodecs/QatCompressionMode.java | 205 +++++++++++++++++ .../codec/customcodecs/QatDeflate99Codec.java | 91 ++++++++ .../codec/customcodecs/QatLz499Codec.java | 91 ++++++++ .../codec/customcodecs/QatZipperFactory.java | 183 ++++++++++++++++ .../index/codec/customcodecs/Zstd99Codec.java | 7 +- .../customcodecs/ZstdCompressionMode.java | 5 +- .../codec/customcodecs/ZstdNoDict99Codec.java | 8 +- .../ZstdNoDictCompressionMode.java | 7 +- .../Lucene95CustomStoredFieldsFormat.java | 9 +- .../codec/customcodecs/package-info.java | 4 +- .../plugin-metadata/plugin-security.policy | 5 + .../services/org.apache.lucene.codecs.Codec | 4 +- .../customcodecs/AbstractCompressorTests.java | 3 +- ...Lucene95CustomStoredFieldsFormatTests.java | 1 - .../Lucene99QatStoredFieldsFormatTests.java | 63 ++++++ .../codec/customcodecs/QatCodecTests.java | 207 ++++++++++++++++++ .../QatDeflateCompressorTests.java | 86 ++++++++ .../customcodecs/QatLz4CompressorTests.java | 86 ++++++++ .../customcodecs/ZstdCompressorTests.java | 4 +- .../ZstdNoDictCompressorTests.java | 4 +- 34 files changed, 1643 insertions(+), 71 deletions(-) create mode 100644 licenses/qat-java-1.1.1.jar.sha1 create mode 100644 licenses/qat-java-LICENSE.txt create mode 100644 licenses/qat-java-NOTICE.txt create mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java create mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java create mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java create mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/QatDeflate99Codec.java create mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/QatLz499Codec.java create mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/QatZipperFactory.java create mode 100644 src/test/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormatTests.java create mode 100644 src/test/java/org/opensearch/index/codec/customcodecs/QatCodecTests.java create mode 100644 src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java create mode 100644 src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java diff --git a/build.gradle b/build.gradle index 56d4261..e0d842f 100644 --- a/build.gradle +++ b/build.gradle @@ -93,6 +93,7 @@ opensearchplugin { dependencies { api "com.github.luben:zstd-jni:1.5.5-5" + api "com.intel.qat:qat-java:1.1.1" } allprojects { diff --git a/licenses/qat-java-1.1.1.jar.sha1 b/licenses/qat-java-1.1.1.jar.sha1 new file mode 100644 index 0000000..ca4633f --- /dev/null +++ b/licenses/qat-java-1.1.1.jar.sha1 @@ -0,0 +1 @@ +3333601cdedf6a711d445118d5bc44ec6a9c65f9 diff --git a/licenses/qat-java-LICENSE.txt b/licenses/qat-java-LICENSE.txt new file mode 100644 index 0000000..d935639 --- /dev/null +++ b/licenses/qat-java-LICENSE.txt @@ -0,0 +1,36 @@ +----------------------------------------------------------------------------- +** Beginning of "BSD License" text. ** + +Qat-Java: Qat-Java is a compression library that uses IntelĀ® QAT to accelerate +compression and decompression. + +Copyright(c) 2007-2023 Intel Corporation. All rights reserved. +All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + * Neither the name of Intel Corporation nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/licenses/qat-java-NOTICE.txt b/licenses/qat-java-NOTICE.txt new file mode 100644 index 0000000..9e422fd --- /dev/null +++ b/licenses/qat-java-NOTICE.txt @@ -0,0 +1 @@ +Qat-Java is a compression library that uses IntelĀ® QAT to accelerate compression and decompression. diff --git a/src/integrationTest/java/org/opensearch/index/codec/rest/CreateIndexWithCodecIT.java b/src/integrationTest/java/org/opensearch/index/codec/rest/CreateIndexWithCodecIT.java index c332094..713b96e 100644 --- a/src/integrationTest/java/org/opensearch/index/codec/rest/CreateIndexWithCodecIT.java +++ b/src/integrationTest/java/org/opensearch/index/codec/rest/CreateIndexWithCodecIT.java @@ -25,6 +25,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; +import org.opensearch.index.codec.customcodecs.QatZipperFactory; import org.opensearch.test.rest.OpenSearchRestTestCase; import javax.net.ssl.SSLEngine; @@ -37,8 +38,12 @@ import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE; import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_DEFLATE_CODEC; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_LZ4_CODEC; import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC; import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC; +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; public class CreateIndexWithCodecIT extends OpenSearchRestTestCase { public void testCreateIndexWithZstdCodec() throws IOException { @@ -62,6 +67,29 @@ public void testCreateIndexWithZstdCodec() throws IOException { } } + public void testCreateIndexWithQatCodec() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + + final String index = "custom-codecs-test-index"; + + // creating index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC)) + .put("index.codec.compression_level", randomIntBetween(1, 6)) + .build() + ); + + try { + ensureGreen(index); + } finally { + deleteIndex(index); + } + } + @Override protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { RestClientBuilder builder = RestClient.builder(hosts); diff --git a/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java b/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java index 7810b58..1858f3d 100644 --- a/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java +++ b/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.codec.customcodecs.CustomCodecPlugin; +import org.opensearch.index.codec.customcodecs.QatZipperFactory; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -20,9 +21,13 @@ import java.util.Collections; import java.util.concurrent.ExecutionException; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_DEFLATE_CODEC; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_LZ4_CODEC; import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC; import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class CodecCompressionLevelIT extends OpenSearchIntegTestCase { @@ -80,6 +85,26 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() { ensureGreen(index); } + public void testQatCodecsCreateIndexWithCompressionLevel() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + + internalCluster().ensureAtLeastNumDataNodes(1); + final String index = "test-index"; + + // creating index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC)) + .put("index.codec.compression_level", randomIntBetween(1, 6)) + .build() + ); + + ensureGreen(index); + } + public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionException, InterruptedException { internalCluster().ensureAtLeastNumDataNodes(1); @@ -132,6 +157,59 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx ensureGreen(index); } + public void testQatToLuceneCodecsWithCompressionLevel() throws ExecutionException, InterruptedException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + + internalCluster().ensureAtLeastNumDataNodes(1); + final String index = "test-index"; + + // creating index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC)) + .put("index.codec.compression_level", randomIntBetween(1, 6)) + .build() + ); + ensureGreen(index); + + assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1)); + + Throwable executionException = expectThrows( + ExecutionException.class, + () -> client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(index).settings( + Settings.builder().put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC)) + ) + ) + .get() + ); + + Throwable rootCause = Throwables.getRootCause(executionException); + assertEquals(IllegalArgumentException.class, rootCause.getClass()); + assertTrue(rootCause.getMessage().startsWith("Compression level cannot be set")); + + assertAcked( + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(index).settings( + Settings.builder() + .put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC)) + .put("index.codec.compression_level", (String) null) + ) + ) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); + ensureGreen(index); + } + public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionException, InterruptedException { internalCluster().ensureAtLeastNumDataNodes(1); @@ -185,4 +263,57 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx ensureGreen(index); } + public void testLuceneToQatCodecsWithCompressionLevel() throws ExecutionException, InterruptedException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + + internalCluster().ensureAtLeastNumDataNodes(1); + final String index = "test-index"; + + // creating index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC)) + .build() + ); + ensureGreen(index); + + assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1)); + + Throwable executionException = expectThrows( + ExecutionException.class, + () -> client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(index).settings( + Settings.builder() + .put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC)) + .put("index.codec.compression_level", randomIntBetween(1, 6)) + ) + ) + .get() + ); + + Throwable rootCause = Throwables.getRootCause(executionException); + assertEquals(IllegalArgumentException.class, rootCause.getClass()); + assertTrue(rootCause.getMessage().startsWith("Compression level cannot be set")); + + assertAcked( + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(index).settings( + Settings.builder() + .put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC)) + .put("index.codec.compression_level", randomIntBetween(1, 6)) + ) + ) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); + ensureGreen(index); + } } diff --git a/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java b/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java index bb50828..ae16b5d 100644 --- a/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java +++ b/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java @@ -16,6 +16,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.codec.customcodecs.CustomCodecPlugin; +import org.opensearch.index.codec.customcodecs.QatZipperFactory; import org.opensearch.index.engine.Segment; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -24,6 +25,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -51,25 +53,24 @@ protected Collection> nodePlugins() { public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException { - Map codecMap = Map.of( - "best_compression", - "BEST_COMPRESSION", - "zlib", - "BEST_COMPRESSION", - "zstd_no_dict", - "ZSTD_NO_DICT", - "zstd", - "ZSTD", - "default", - "BEST_SPEED", - "lz4", - "BEST_SPEED" - ); + Map codecMap = new HashMap() { + { + put("best_compression", "BEST_COMPRESSION"); + put("zlib", "BEST_COMPRESSION"); + put("zstd_no_dict", "ZSTD_NO_DICT"); + put("zstd", "ZSTD"); + put("default", "BEST_SPEED"); + put("lz4", "BEST_SPEED"); + if (QatZipperFactory.isQatAvailable()) { + put("qat_lz4", "QAT_LZ4"); + put("qat_deflate", "QAT_DEFLATE"); + } + } + }; for (Map.Entry codec : codecMap.entrySet()) { forceMergeMultipleCodecs(codec.getKey(), codec.getValue(), codecMap); } - } private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, Map codecMap) throws ExecutionException, diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java b/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java index 91a13a1..dc3a917 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java @@ -8,28 +8,32 @@ package org.opensearch.index.codec.customcodecs; +import org.opensearch.common.settings.Setting; import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.engine.EngineConfig; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.Plugin; +import java.util.Arrays; +import java.util.List; import java.util.Optional; /** * A plugin that implements custom codecs. Supports these codecs: + * *
    - *
  • ZSTD - *
  • ZSTDNODICT + *
  • ZSTD_CODEC + *
  • ZSTD_NO_DICT_CODEC + *
  • QAT_LZ4 + *
  • QAT_DEFLATE *
* * @opensearch.internal */ public final class CustomCodecPlugin extends Plugin implements EnginePlugin { - /** - * Creates a new instance - */ + /** Creates a new instance */ public CustomCodecPlugin() {} /** @@ -39,9 +43,17 @@ public CustomCodecPlugin() {} @Override public Optional getCustomCodecServiceFactory(final IndexSettings indexSettings) { String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING); - if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) { + if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) + || codecName.equals(CustomCodecService.ZSTD_CODEC) + || codecName.equals(CustomCodecService.QAT_LZ4_CODEC) + || codecName.equals(CustomCodecService.QAT_DEFLATE_CODEC)) { return Optional.of(new CustomCodecServiceFactory()); } return Optional.empty(); } + + @Override + public List> getSettings() { + return Arrays.asList(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING); + } } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java b/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java index 90519d2..175613e 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java @@ -21,26 +21,28 @@ import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING; -/** - * CustomCodecService provides ZSTD and ZSTD_NO_DICT compression codecs. - */ +/** CustomCodecService provides ZSTD, ZSTD_NO_DICT, QAT_LZ4, and QAT_DEFLATE compression codecs. */ public class CustomCodecService extends CodecService { private final Map codecs; - /** - * ZStandard codec - */ + + /** ZStandard codec */ public static final String ZSTD_CODEC = "zstd"; - /** - * ZStandard without dictionary codec - */ + + /** ZStandard without dictionary codec */ public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict"; + /** Hardware accelerated (Intel QAT) compression codec for LZ4. */ + public static final String QAT_LZ4_CODEC = "qat_lz4"; + + /** Hardware accelerated (Intel QAT) compression codec for DEFLATE. */ + public static final String QAT_DEFLATE_CODEC = "qat_deflate"; + /** * Creates a new CustomCodecService. * * @param mapperService The mapper service. * @param indexSettings The index settings. - * @param logger The logger. + * @param logger The logger. */ public CustomCodecService(MapperService mapperService, IndexSettings indexSettings, Logger logger) { super(mapperService, indexSettings, logger); @@ -49,9 +51,21 @@ public CustomCodecService(MapperService mapperService, IndexSettings indexSettin if (mapperService == null) { codecs.put(ZSTD_CODEC, new Zstd99Codec(compressionLevel)); codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(compressionLevel)); + codecs.put(QAT_LZ4_CODEC, new QatLz499Codec(compressionLevel, () -> { + return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING); + })); + codecs.put(QAT_DEFLATE_CODEC, new QatDeflate99Codec(compressionLevel, () -> { + return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING); + })); } else { codecs.put(ZSTD_CODEC, new Zstd99Codec(mapperService, logger, compressionLevel)); codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(mapperService, logger, compressionLevel)); + codecs.put(QAT_LZ4_CODEC, new QatLz499Codec(mapperService, logger, compressionLevel, () -> { + return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING); + })); + codecs.put(QAT_DEFLATE_CODEC, new QatDeflate99Codec(mapperService, logger, compressionLevel, () -> { + return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING); + })); } this.codecs = codecs.immutableMap(); } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java b/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java index d634616..39c9d67 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java @@ -12,9 +12,7 @@ import org.opensearch.index.codec.CodecServiceConfig; import org.opensearch.index.codec.CodecServiceFactory; -/** - * A factory for creating new {@link CodecService} instance - */ +/** A factory for creating new {@link CodecService} instance */ public class CustomCodecServiceFactory implements CodecServiceFactory { /** Creates a new instance. */ diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomCodec.java b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomCodec.java index e0b3c2b..6b31167 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomCodec.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomCodec.java @@ -12,11 +12,14 @@ import org.apache.lucene.codecs.FilterCodec; import org.apache.lucene.codecs.StoredFieldsFormat; import org.apache.lucene.codecs.lucene99.Lucene99Codec; +import org.opensearch.common.settings.Settings; import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec; import org.opensearch.index.mapper.MapperService; import java.util.Set; +import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING; + /** * * Extends {@link FilterCodec} to reuse the functionality of Lucene Codec. @@ -28,7 +31,7 @@ public abstract class Lucene99CustomCodec extends FilterCodec { /** Default compression level used for compression */ - public static final int DEFAULT_COMPRESSION_LEVEL = 3; + public static final int DEFAULT_COMPRESSION_LEVEL = INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getDefault(Settings.EMPTY); /** Each mode represents a compression algorithm. */ public enum Mode { diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomStoredFieldsFormat.java b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomStoredFieldsFormat.java index 235c801..271827b 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomStoredFieldsFormat.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomStoredFieldsFormat.java @@ -106,7 +106,7 @@ StoredFieldsFormat impl(Lucene99CustomCodec.Mode mode) { case ZSTD_NO_DICT: return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstdNoDict", this.zstdNoDictCompressionMode); default: - throw new AssertionError(); + throw new IllegalStateException("Unsupported compression mode: " + mode); } } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java new file mode 100644 index 0000000..20f265d --- /dev/null +++ b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.FilterCodec; +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.lucene99.Lucene99Codec; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec; +import org.opensearch.index.mapper.MapperService; + +import java.util.Set; +import java.util.function.Supplier; + +import com.intel.qat.QatZipper; + +import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING; + +/** + * Extends {@link FilterCodec} to reuse the functionality of Lucene Codec. + * + * @opensearch.internal + */ +public abstract class Lucene99QatCodec extends FilterCodec { + + /** A setting to specifiy the QAT acceleration mode. */ + public static final Setting INDEX_CODEC_QAT_MODE_SETTING = new Setting<>("index.codec.qatmode", "auto", s -> { + switch (s) { + case "auto": + return QatZipper.Mode.AUTO; + case "hardware": + return QatZipper.Mode.HARDWARE; + default: + throw new IllegalArgumentException("Unknown value for [index.codec.qatmode] must be one of [auto, hardware] but was: " + s); + } + }, Property.IndexScope, Property.Dynamic); + + /** A terse way to reference the default QAT execution mode. */ + public static final QatZipper.Mode DEFAULT_QAT_MODE = INDEX_CODEC_QAT_MODE_SETTING.getDefault(Settings.EMPTY); + + /** Default compression level used for compression */ + public static final int DEFAULT_COMPRESSION_LEVEL = INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getDefault(Settings.EMPTY); + + /** Each mode represents a compression algorithm. */ + public enum Mode { + /** QAT lz4 mode. */ + QAT_LZ4("QATLZ499", Set.of("qat_lz4")), + + /** QAT deflate mode. */ + QAT_DEFLATE("QATDEFLATE99", Set.of("qat_deflate")); + + private final String codec; + private final Set aliases; + + Mode(String codec, Set aliases) { + this.codec = codec; + this.aliases = aliases; + } + + /** Returns the Codec that is registered with Lucene */ + public String getCodec() { + return codec; + } + + /** Returns the aliases of the Codec */ + public Set getAliases() { + return aliases; + } + } + + /** The default compression mode. */ + public static final Mode DEFAULT_COMPRESSION_MODE = Mode.QAT_LZ4; + + private final StoredFieldsFormat storedFieldsFormat; + + /** + * Creates a new compression codec with the default compression level. + * + * @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE). + */ + public Lucene99QatCodec(Mode mode) { + this(mode, DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new compression codec with the given compression level. We use lowercase letters when + * registering the codec so that we remain consistent with the other compression codecs: default, + * lucene_default, and best_compression. + * + * @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE). + * @param compressionLevel The compression level. + */ + public Lucene99QatCodec(Mode mode, int compressionLevel) { + super(mode.getCodec(), new Lucene99Codec()); + this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel); + } + + /** + * Creates a new compression codec with the given compression level. We use lowercase letters when + * registering the codec so that we remain consistent with the other compression codecs: default, + * lucene_default, and best_compression. + * + * @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE). + * @param compressionLevel The compression level. + * @param supplier supplier for QAT mode. + */ + public Lucene99QatCodec(Mode mode, int compressionLevel, Supplier supplier) { + super(mode.getCodec(), new Lucene99Codec()); + this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel, supplier); + } + + /** + * Creates a new compression codec with the given compression level. We use lowercase letters when + * registering the codec so that we remain consistent with the other compression codecs: default, + * lucene_default, and best_compression. + * + * @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE). + * @param compressionLevel The compression level. + * @param mapperService The mapper service. + * @param logger The logger. + */ + public Lucene99QatCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) { + super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene99Codec.Mode.BEST_SPEED, mapperService, logger)); + this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel); + } + + /** + * Creates a new compression codec with the given compression level. We use lowercase letters when + * registering the codec so that we remain consistent with the other compression codecs: default, + * lucene_default, and best_compression. + * + * @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE). + * @param compressionLevel The compression level. + * @param mapperService The mapper service. + * @param logger The logger. + * @param supplier supplier for QAT mode. + */ + public Lucene99QatCodec( + Mode mode, + int compressionLevel, + MapperService mapperService, + Logger logger, + Supplier supplier + ) { + super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene99Codec.Mode.BEST_SPEED, mapperService, logger)); + this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel, supplier); + } + + @Override + public StoredFieldsFormat storedFieldsFormat() { + return storedFieldsFormat; + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java new file mode 100644 index 0000000..9ea9dd9 --- /dev/null +++ b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.codecs.StoredFieldsWriter; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +import com.intel.qat.QatZipper; + +/** Stored field format used by pluggable codec */ +public class Lucene99QatStoredFieldsFormat extends StoredFieldsFormat { + + /** A key that we use to map to a mode */ + public static final String MODE_KEY = Lucene99QatStoredFieldsFormat.class.getSimpleName() + ".mode"; + + private static final int QAT_DEFLATE_BLOCK_LENGTH = 10 * 48 * 1024; + private static final int QAT_DEFLATE_MAX_DOCS_PER_BLOCK = 4096; + private static final int QAT_DEFLATE_BLOCK_SHIFT = 10; + + private static final int QAT_LZ4_BLOCK_LENGTH = 10 * 8 * 1024; + private static final int QAT_LZ4_MAX_DOCS_PER_BLOCK = 4096; + private static final int QAT_LZ4_BLOCK_SHIFT = 10; + + private final QatCompressionMode qatCompressionMode; + private final Lucene99QatCodec.Mode mode; + + /** default constructor */ + public Lucene99QatStoredFieldsFormat() { + this(Lucene99QatCodec.DEFAULT_COMPRESSION_MODE, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new instance. + * + * @param mode The mode represents QAT_LZ4 or QAT_DEFLATE + */ + public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode) { + this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new instance with the specified mode and compression level. + * + * @param mode The mode represents QAT_LZ4 or QAT_DEFLATE + * @param compressionLevel The compression level for the mode. + */ + public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, int compressionLevel) { + this(mode, compressionLevel, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; }); + } + + /** + * Creates a new instance. + * + * @param mode The mode represents QAT_LZ4 or QAT_DEFLATE + * @param supplier a supplier for QAT acceleration mode. + */ + public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, Supplier supplier) { + this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, supplier); + } + + /** + * Creates a new instance with the specified mode and compression level. + * + * @param mode The mode represents QAT_LZ4 or QAT_DEFLATE + * @param compressionLevel The compression level for the mode. + * @param supplier a supplier for QAT acceleration mode. + */ + public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, int compressionLevel, Supplier supplier) { + this.mode = Objects.requireNonNull(mode); + qatCompressionMode = new QatCompressionMode(mode, compressionLevel, supplier); + } + + /** + * Returns a {@link StoredFieldsReader} to load stored fields. + * + * @param directory The index directory. + * @param si The SegmentInfo that stores segment information. + * @param fn The fieldInfos. + * @param context The IOContext that holds additional details on the merge/search context. + */ + @Override + public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException { + if (si.getAttribute(MODE_KEY) != null) { + String value = si.getAttribute(MODE_KEY); + Lucene99QatCodec.Mode mode = Lucene99QatCodec.Mode.valueOf(value); + return impl(mode).fieldsReader(directory, si, fn, context); + } else { + throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name); + } + } + + /** + * Returns a {@link StoredFieldsReader} to write stored fields. + * + * @param directory The index directory. + * @param si The SegmentInfo that stores segment information. + * @param context The IOContext that holds additional details on the merge/search context. + */ + @Override + public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException { + String previous = si.putAttribute(MODE_KEY, mode.name()); + if (previous != null && previous.equals(mode.name()) == false) { + throw new IllegalStateException( + "found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name() + ); + } + return impl(mode).fieldsWriter(directory, si, context); + } + + private StoredFieldsFormat impl(Lucene99QatCodec.Mode mode) { + switch (mode) { + case QAT_LZ4: + return getQatCompressingStoredFieldsFormat( + "QatStoredFieldsLz4", + qatCompressionMode, + QAT_LZ4_BLOCK_LENGTH, + QAT_LZ4_MAX_DOCS_PER_BLOCK, + QAT_LZ4_BLOCK_SHIFT + ); + case QAT_DEFLATE: + return getQatCompressingStoredFieldsFormat( + "QatStoredFieldsDeflate", + qatCompressionMode, + QAT_DEFLATE_BLOCK_LENGTH, + QAT_DEFLATE_MAX_DOCS_PER_BLOCK, + QAT_DEFLATE_BLOCK_SHIFT + ); + default: + throw new IllegalStateException("Unsupported compression mode: " + mode); + } + } + + private StoredFieldsFormat getQatCompressingStoredFieldsFormat( + String formatName, + CompressionMode compressionMode, + int blockSize, + int maxDocs, + int blockShift + ) { + return new Lucene90CompressingStoredFieldsFormat(formatName, compressionMode, blockSize, maxDocs, blockShift); + } + + /** + * Gets the mode of compression. + * + * @return either QAT_LZ4 or QAT_DEFLATE + */ + public Lucene99QatCodec.Mode getMode() { + return mode; + } + + /** + * + * @return the CompressionMode instance. + */ + public QatCompressionMode getCompressionMode() { + return qatCompressionMode; + } +} diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java b/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java new file mode 100644 index 0000000..193c825 --- /dev/null +++ b/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java @@ -0,0 +1,205 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; +import java.util.function.Supplier; + +import com.intel.qat.QatZipper; + +/** QatCompressionMode offers QAT_LZ4 and QAT_DEFLATE compressors. */ +public class QatCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + + private final QatZipper.Algorithm algorithm; + private final int compressionLevel; + private final Supplier supplier; + + /** default constructor */ + protected QatCompressionMode() { + this(Lucene99QatCodec.DEFAULT_COMPRESSION_MODE, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, () -> { + return Lucene99QatCodec.DEFAULT_QAT_MODE; + }); + } + + /** + * Creates a new instance. + * + * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) + */ + protected QatCompressionMode(Lucene99QatCodec.Mode mode) { + this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; }); + } + + /** + * Creates a new instance. + * + * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) + * @param compressionLevel The compression level to use. + */ + protected QatCompressionMode(Lucene99QatCodec.Mode mode, int compressionLevel) { + this(mode, compressionLevel, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; }); + } + + /** + * Creates a new instance. + * + * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) + * @param compressionLevel The compression level to use. + * @param supplier a supplier for QAT acceleration mode. + */ + protected QatCompressionMode(Lucene99QatCodec.Mode mode, int compressionLevel, Supplier supplier) { + this.algorithm = mode == Lucene99QatCodec.Mode.QAT_LZ4 ? QatZipper.Algorithm.LZ4 : QatZipper.Algorithm.DEFLATE; + this.compressionLevel = compressionLevel; + this.supplier = supplier; + } + + @Override + public Compressor newCompressor() { + return new QatCompressor(algorithm, compressionLevel, supplier.get()); + } + + @Override + public Decompressor newDecompressor() { + return new QatDecompressor(algorithm, supplier.get()); + } + + public int getCompressionLevel() { + return compressionLevel; + } + + /** The QatCompressor. */ + private static final class QatCompressor extends Compressor { + + private byte[] compressedBuffer; + private final QatZipper qatZipper; + + /** compressor with a given compresion level */ + public QatCompressor(QatZipper.Algorithm algorithm, int compressionLevel, QatZipper.Mode qatMode) { + compressedBuffer = BytesRef.EMPTY_BYTES; + qatZipper = QatZipperFactory.createInstance(algorithm, compressionLevel, qatMode, QatZipper.PollingMode.PERIODICAL); + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "Offset value must be greater than 0."; + + int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; + out.writeVInt(blockLength); + + final int end = offset + length; + assert end >= 0 : "Buffer read size must be greater than 0."; + + for (int start = offset; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + + if (l == 0) { + out.writeVInt(0); + return; + } + + final int maxCompressedLength = qatZipper.maxCompressedLength(l); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = qatZipper.compress(bytes, start, l, compressedBuffer, 0, compressedBuffer.length); + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} + } + + /** QAT_DEFLATE decompressor */ + private static final class QatDecompressor extends Decompressor { + + private byte[] compressed; + private final QatZipper qatZipper; + private final QatZipper.Mode qatMode; + private final QatZipper.Algorithm algorithm; + + /** default decompressor */ + public QatDecompressor(QatZipper.Algorithm algorithm, QatZipper.Mode qatMode) { + this.algorithm = algorithm; + this.qatMode = qatMode; + compressed = BytesRef.EMPTY_BYTES; + qatZipper = QatZipperFactory.createInstance(algorithm, qatMode, QatZipper.PollingMode.PERIODICAL); + } + + /*resuable decompress function*/ + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "Buffer read size must be within limit."; + + if (length == 0) { + bytes.length = 0; + return; + } + + final int blockLength = in.readVInt(); + bytes.offset = bytes.length = 0; + int offsetInBlock = 0; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + compressed = ArrayUtil.grow(compressed, compressedLength); + in.readBytes(compressed, 0, compressedLength); + + int l = Math.min(blockLength, originalLength - offsetInBlock); + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); + + final int uncompressed = qatZipper.decompress(compressed, 0, compressedLength, bytes.bytes, bytes.length, l); + + bytes.length += uncompressed; + offsetInBlock += blockLength; + } + + bytes.offset = offsetInBytesRef; + bytes.length = length; + + assert bytes.isValid() : "Decompression output is corrupted."; + } + + @Override + public Decompressor clone() { + return new QatDecompressor(algorithm, qatMode); + } + } +} diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflate99Codec.java b/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflate99Codec.java new file mode 100644 index 0000000..b24ead0 --- /dev/null +++ b/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflate99Codec.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Setting; +import org.opensearch.index.codec.CodecAliases; +import org.opensearch.index.codec.CodecSettings; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.mapper.MapperService; + +import java.util.Set; +import java.util.function.Supplier; + +import com.intel.qat.QatZipper; + +/** + * QatDeflate99Codec provides a DEFLATE compressor using the qat-java library. + */ +public class QatDeflate99Codec extends Lucene99QatCodec implements CodecSettings, CodecAliases { + + /** Creates a new QatDeflate99Codec instance with the default compression level. */ + public QatDeflate99Codec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new QatDeflate99Codec instance. + * + * @param compressionLevel The compression level. + */ + public QatDeflate99Codec(int compressionLevel) { + super(Mode.QAT_DEFLATE, compressionLevel); + } + + /** + * Creates a new QatDeflate99Codec instance with the default compression level. + * + * @param compressionLevel The compression level. + * @param supplier supplier for QAT acceleration mode. + */ + public QatDeflate99Codec(int compressionLevel, Supplier supplier) { + super(Mode.QAT_DEFLATE, compressionLevel, supplier); + } + + /** + * Creates a new QatDeflate99Codec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. + */ + public QatDeflate99Codec(MapperService mapperService, Logger logger, int compressionLevel) { + super(Mode.QAT_DEFLATE, compressionLevel, mapperService, logger); + } + + /** + * Creates a new QatDeflate99Codec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. + * @param supplier supplier for QAT acceleration mode. + */ + public QatDeflate99Codec(MapperService mapperService, Logger logger, int compressionLevel, Supplier supplier) { + super(Mode.QAT_DEFLATE, compressionLevel, mapperService, logger, supplier); + } + + /** The name for this codec. */ + @Override + public String toString() { + return getClass().getSimpleName(); + } + + @Override + public boolean supports(Setting setting) { + return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING); + } + + @Override + public Set aliases() { + return Mode.QAT_DEFLATE.getAliases(); + } +} diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/QatLz499Codec.java b/src/main/java/org/opensearch/index/codec/customcodecs/QatLz499Codec.java new file mode 100644 index 0000000..9e242b0 --- /dev/null +++ b/src/main/java/org/opensearch/index/codec/customcodecs/QatLz499Codec.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Setting; +import org.opensearch.index.codec.CodecAliases; +import org.opensearch.index.codec.CodecSettings; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.mapper.MapperService; + +import java.util.Set; +import java.util.function.Supplier; + +import com.intel.qat.QatZipper; + +/** + * QatLz499Codec provides an LZ4 compressor using the qat-java library. + */ +public class QatLz499Codec extends Lucene99QatCodec implements CodecSettings, CodecAliases { + + /** Creates a new QatLz499Codec instance with the default compression level. */ + public QatLz499Codec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new QatLz499Codec instance. + * + * @param compressionLevel The compression level. + */ + public QatLz499Codec(int compressionLevel) { + super(Mode.QAT_LZ4, compressionLevel); + } + + /** + * Creates a new QatLz499Codec instance with the default compression level. + * + * @param compressionLevel The compression level. + * @param supplier supplier for QAT acceleration mode. + */ + public QatLz499Codec(int compressionLevel, Supplier supplier) { + super(Mode.QAT_LZ4, compressionLevel, supplier); + } + + /** + * Creates a new QatLz499Codec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. + */ + public QatLz499Codec(MapperService mapperService, Logger logger, int compressionLevel) { + super(Mode.QAT_LZ4, compressionLevel, mapperService, logger); + } + + /** + * Creates a new QatLz499Codec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. + * @param supplier supplier for QAT acceleration mode. + */ + public QatLz499Codec(MapperService mapperService, Logger logger, int compressionLevel, Supplier supplier) { + super(Mode.QAT_LZ4, compressionLevel, mapperService, logger, supplier); + } + + /** The name for this codec. */ + @Override + public String toString() { + return getClass().getSimpleName(); + } + + @Override + public boolean supports(Setting setting) { + return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING); + } + + @Override + public Set aliases() { + return Mode.QAT_LZ4.getAliases(); + } +} diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/QatZipperFactory.java b/src/main/java/org/opensearch/index/codec/customcodecs/QatZipperFactory.java new file mode 100644 index 0000000..1b55b5e --- /dev/null +++ b/src/main/java/org/opensearch/index/codec/customcodecs/QatZipperFactory.java @@ -0,0 +1,183 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import com.intel.qat.QatZipper; + +import static com.intel.qat.QatZipper.Algorithm; +import static com.intel.qat.QatZipper.DEFAULT_COMPRESS_LEVEL; +import static com.intel.qat.QatZipper.DEFAULT_MODE; +import static com.intel.qat.QatZipper.DEFAULT_POLLING_MODE; +import static com.intel.qat.QatZipper.DEFAULT_RETRY_COUNT; +import static com.intel.qat.QatZipper.Mode; +import static com.intel.qat.QatZipper.PollingMode; + +/** A factory class to create instances of QatZipper */ +public class QatZipperFactory { + + /** + * Creates a new QatZipper with the specified parameters. + * + * @param algorithm the compression algorithm + * @param level the compression level. + * @param mode the mode of QAT execution + * @param retryCount the number of attempts to acquire hardware resources + * @param pmode polling mode. + */ + public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode, int retryCount, PollingMode pmode) { + return new QatZipper(algorithm, level, mode, retryCount, pmode); + } + + /** + * Creates a new QatZipper that uses the DEFLATE algorithm and the default compression level, + * mode, retry count, and polling mode. + */ + public static QatZipper createInstance() { + return createInstance(Algorithm.DEFLATE, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified compression algorithm. Uses the default compression + * level, mode, retry count, and polling mode. + * + * @param algorithm the compression algorithm + */ + public static QatZipper createInstance(Algorithm algorithm) { + return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified execution mode. Uses the DEFLATE algorithm with the + * default compression level, retry count, and polling mode. + * + * @param mode the mode of QAT execution + */ + public static QatZipper createInstance(Mode mode) { + return createInstance(Algorithm.DEFLATE, DEFAULT_COMPRESS_LEVEL, mode, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified polling polling mode. Uses the DEFLATE algorithm + * with the default compression level, mode, and retry count. + * + * @param pmode the polling mode. + */ + public static QatZipper createInstance(PollingMode pmode) { + return createInstance(Algorithm.DEFLATE, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, pmode); + } + + /** + * Creates a new QatZipper with the specified algorithm and compression level. Uses the default + * mode, retry count, and polling mode. + * + * @param algorithm the compression algorithm (deflate or LZ4). + * @param level the compression level. + */ + public static QatZipper createInstance(Algorithm algorithm, int level) { + return createInstance(algorithm, level, DEFAULT_MODE, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified algorithm and mode of execution. Uses the default + * compression level, retry count, and polling mode. + * + * @param algorithm the compression algorithm + * @param mode the mode of QAT execution + */ + public static QatZipper createInstance(Algorithm algorithm, Mode mode) { + return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, mode, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified algorithm and polling mode of execution. Uses the + * default compression level, mode, and retry count. + * + * @param algorithm the compression algorithm + * @param pmode the polling mode. + */ + public static QatZipper createInstance(Algorithm algorithm, PollingMode pmode) { + return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, pmode); + } + + /** + * Creates a new QatZipper with the specified algorithm and mode of execution. Uses compression + * level and retry count. + * + * @param algorithm the compression algorithm + * @param mode the mode of QAT execution + * @param pmode the polling mode. + */ + public static QatZipper createInstance(Algorithm algorithm, Mode mode, PollingMode pmode) { + return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, mode, DEFAULT_RETRY_COUNT, pmode); + } + + /** + * Creates a new QatZipper with the specified algorithm, compression level, and mode . Uses the + * default retry count and polling mode. + * + * @param algorithm the compression algorithm (deflate or LZ4). + * @param level the compression level. + * @param mode the mode of operation (HARDWARE - only hardware, AUTO - hardware with a software + * failover.) + */ + public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode) { + return createInstance(algorithm, level, mode, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified algorithm, compression level, and polling mode . + * Uses the default mode and retry count. + * + * @param algorithm the compression algorithm (deflate or LZ4). + * @param level the compression level. + * @param pmode the polling mode. + */ + public static QatZipper createInstance(Algorithm algorithm, int level, PollingMode pmode) { + return createInstance(algorithm, level, DEFAULT_MODE, DEFAULT_RETRY_COUNT, pmode); + } + + /** + * Creates a new QatZipper with the specified parameters and polling mode. + * + * @param algorithm the compression algorithm + * @param level the compression level. + * @param mode the mode of QAT execution + * @param retryCount the number of attempts to acquire hardware resources + */ + public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode, int retryCount) { + return createInstance(algorithm, level, mode, retryCount, DEFAULT_POLLING_MODE); + } + + /** + * Creates a new QatZipper with the specified parameters and retry count. + * + * @param algorithm the compression algorithm + * @param level the compression level. + * @param mode the mode of QAT execution + * @param pmode the polling mode. + */ + public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode, PollingMode pmode) { + return createInstance(algorithm, level, mode, DEFAULT_RETRY_COUNT, pmode); + } + + /** + * Checks if QAT hardware is available. + * + * @return true if QAT hardware is available, false otherwise. + */ + public static boolean isQatAvailable() { + try { + QatZipper qzip = QatZipperFactory.createInstance(); + qzip.end(); + return true; + } catch (UnsatisfiedLinkError | ExceptionInInitializerError | NoClassDefFoundError e) { + return false; + } + } +} diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/Zstd99Codec.java b/src/main/java/org/opensearch/index/codec/customcodecs/Zstd99Codec.java index df86806..29b5f4b 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/Zstd99Codec.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/Zstd99Codec.java @@ -18,13 +18,12 @@ import java.util.Set; /** - * ZstdCodec provides ZSTD compressor using the zstd-jni library. + * ZstdCodec provides ZSTD compressor using the zstd-jni library. */ public class Zstd99Codec extends Lucene99CustomCodec implements CodecSettings, CodecAliases { - /** - * Creates a new ZstdCodec instance with the default compression level. - */ + /** Creates a new ZstdCodec instance with the default compression level. */ public Zstd99Codec() { this(DEFAULT_COMPRESSION_LEVEL); } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java index 1314221..67476ed 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java @@ -30,13 +30,12 @@ public class ZstdCompressionMode extends CompressionMode { private static final int NUM_SUB_BLOCKS = 10; private static final int DICT_SIZE_FACTOR = 6; - private static final int DEFAULT_COMPRESSION_LEVEL = 6; private final int compressionLevel; /** default constructor */ protected ZstdCompressionMode() { - this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + this.compressionLevel = Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL; } /** @@ -48,7 +47,7 @@ protected ZstdCompressionMode(int compressionLevel) { this.compressionLevel = compressionLevel; } - /** Creates a new compressor instance.*/ + /** Creates a new compressor instance. */ @Override public Compressor newCompressor() { return new ZstdCompressor(compressionLevel); diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDict99Codec.java b/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDict99Codec.java index 1d65aee..76c3ce6 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDict99Codec.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDict99Codec.java @@ -17,14 +17,10 @@ import java.util.Set; -/** - * ZstdNoDictCodec provides ZSTD compressor without a dictionary support. - */ +/** ZstdNoDictCodec provides ZSTD compressor without a dictionary support. */ public class ZstdNoDict99Codec extends Lucene99CustomCodec implements CodecSettings, CodecAliases { - /** - * Creates a new ZstdNoDictCodec instance with the default compression level. - */ + /** Creates a new ZstdNoDictCodec instance with the default compression level. */ public ZstdNoDict99Codec() { this(DEFAULT_COMPRESSION_LEVEL); } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java b/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java index eabf4c7..2a44581 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java @@ -25,13 +25,12 @@ public class ZstdNoDictCompressionMode extends CompressionMode { private static final int NUM_SUB_BLOCKS = 10; - private static final int DEFAULT_COMPRESSION_LEVEL = 6; private final int compressionLevel; /** default constructor */ protected ZstdNoDictCompressionMode() { - this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + this.compressionLevel = Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL; } /** @@ -43,7 +42,7 @@ protected ZstdNoDictCompressionMode(int compressionLevel) { this.compressionLevel = compressionLevel; } - /** Creates a new compressor instance.*/ + /** Creates a new compressor instance. */ @Override public Compressor newCompressor() { return new ZstdCompressor(compressionLevel); @@ -155,7 +154,7 @@ public void decompress(DataInput in, int originalLength, int offset, int length, compressed = ArrayUtil.growNoCopy(compressed, compressedLength); in.readBytes(compressed, 0, compressedLength); - int l = Math.min(blockLength, originalLength - offsetInBlock); + final int l = Math.min(blockLength, originalLength - offsetInBlock); bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); final int uncompressed = (int) Zstd.decompressByteArray(bytes.bytes, bytes.length, l, compressed, 0, compressedLength); diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/Lucene95CustomStoredFieldsFormat.java b/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/Lucene95CustomStoredFieldsFormat.java index 6f51d52..aee7aff 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/Lucene95CustomStoredFieldsFormat.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/Lucene95CustomStoredFieldsFormat.java @@ -70,6 +70,7 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compr /** * Returns a {@link StoredFieldsReader} to load stored fields. + * * @param directory The index directory. * @param si The SegmentInfo that stores segment information. * @param fn The fieldInfos. @@ -87,6 +88,7 @@ public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, Fiel /** * Returns a {@link StoredFieldsReader} to write stored fields. + * * @param directory The index directory. * @param si The SegmentInfo that stores segment information. * @param context The IOContext that holds additional details on the merge/search context. @@ -122,7 +124,7 @@ StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { ZSTD_BLOCK_SHIFT ); default: - throw new AssertionError(); + throw new IllegalStateException("Unsupported compression mode: " + mode); } } @@ -130,9 +132,7 @@ public Lucene95CustomCodec.Mode getMode() { return mode; } - /** - * Returns the compression level. - */ + /** Returns the compression level. */ public int getCompressionLevel() { return compressionLevel; } @@ -140,5 +140,4 @@ public int getCompressionLevel() { public CompressionMode getCompressionMode() { return mode == Lucene95CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode; } - } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java index e996873..2b197c2 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java @@ -6,7 +6,5 @@ * compatible open source license. */ -/** - * A plugin that implements compression codecs with native implementation. - */ +/** A plugin that implements compression codecs with native implementation. */ package org.opensearch.index.codec.customcodecs; diff --git a/src/main/plugin-metadata/plugin-security.policy b/src/main/plugin-metadata/plugin-security.policy index 8161010..615ea9d 100644 --- a/src/main/plugin-metadata/plugin-security.policy +++ b/src/main/plugin-metadata/plugin-security.policy @@ -9,3 +9,8 @@ grant codeBase "${codebase.zstd-jni}" { permission java.lang.RuntimePermission "loadLibrary.*"; }; + +grant codeBase "${codebase.qat-java}" { + permission java.lang.RuntimePermission "loadLibrary.*"; + permission org.opensearch.secure_sm.ThreadPermission "modifyArbitraryThread"; +}; diff --git a/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec index 5912ede..4e25917 100644 --- a/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec +++ b/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec @@ -2,4 +2,6 @@ org.opensearch.index.codec.customcodecs.backward_codecs.Zstd95Codec org.opensearch.index.codec.customcodecs.backward_codecs.ZstdNoDict95Codec org.opensearch.index.codec.customcodecs.backward_codecs.Zstd95DeprecatedCodec org.opensearch.index.codec.customcodecs.Zstd99Codec -org.opensearch.index.codec.customcodecs.ZstdNoDict99Codec \ No newline at end of file +org.opensearch.index.codec.customcodecs.ZstdNoDict99Codec +org.opensearch.index.codec.customcodecs.QatDeflate99Codec +org.opensearch.index.codec.customcodecs.QatLz499Codec diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java index cc794eb..573898c 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java @@ -193,7 +193,7 @@ private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { bos.write(bytes); } - private void doTest(byte[] bytes) throws IOException { + protected void doTest(byte[] bytes) throws IOException { final int length = bytes.length; ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes))); @@ -215,5 +215,4 @@ private void doTest(byte[] bytes) throws IOException { assertArrayEquals(bytes, restored); } - } diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java index 8e32b17..4643246 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java @@ -57,5 +57,4 @@ public void testZstdNoDictCompressionModes() { ); assertTrue(lucene95CustomStoredFieldsFormat.getCompressionMode() instanceof ZstdNoDictCompressionMode); } - } diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormatTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormatTests.java new file mode 100644 index 0000000..de3722b --- /dev/null +++ b/src/test/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormatTests.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.test.OpenSearchTestCase; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +public class Lucene99QatStoredFieldsFormatTests extends OpenSearchTestCase { + + public void testLz4Lucene99QatCodecMode() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_LZ4); + assertEquals(Lucene99QatCodec.Mode.QAT_LZ4, lucene99QatStoredFieldsFormat.getMode()); + } + + public void testDeflateLucene99QatCodecMode() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_DEFLATE); + assertEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, lucene99QatStoredFieldsFormat.getMode()); + } + + public void testLz4Lucene99QatCodecModeWithCompressionLevel() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + int randomCompressionLevel = randomIntBetween(1, 6); + Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat( + Lucene99QatCodec.Mode.QAT_LZ4, + randomCompressionLevel + ); + assertEquals(Lucene99QatCodec.Mode.QAT_LZ4, lucene99QatStoredFieldsFormat.getMode()); + assertEquals(randomCompressionLevel, lucene99QatStoredFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testDeflateLucene99QatCodecModeWithCompressionLevel() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + int randomCompressionLevel = randomIntBetween(1, 6); + Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat( + Lucene99QatCodec.Mode.QAT_DEFLATE, + randomCompressionLevel + ); + assertEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, lucene99QatStoredFieldsFormat.getMode()); + assertEquals(randomCompressionLevel, lucene99QatStoredFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testLz4CompressionModes() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_LZ4); + assertTrue(lucene99QatStoredFieldsFormat.getCompressionMode() instanceof QatCompressionMode); + } + + public void testDeflateCompressionModes() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_DEFLATE); + assertTrue(lucene99QatStoredFieldsFormat.getCompressionMode() instanceof QatCompressionMode); + } +} diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/QatCodecTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/QatCodecTests.java new file mode 100644 index 0000000..e4220d3 --- /dev/null +++ b/src/test/java/org/opensearch/index/codec/customcodecs/QatCodecTests.java @@ -0,0 +1,207 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.LogManager; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.analysis.IndexAnalyzers; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.index.codec.CodecSettings; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.plugins.MapperPlugin; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING; +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +@SuppressCodecs("*") // we test against default codec so never get a random one here! +public class QatCodecTests extends OpenSearchTestCase { + + private CustomCodecPlugin plugin; + + @Before + public void setup() { + plugin = new CustomCodecPlugin(); + } + + public void testQatLz4() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Codec codec = createCodecService(false).codec("qat_lz4"); + assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_LZ4, codec); + Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testQatDeflate() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Codec codec = createCodecService(false).codec("qat_deflate"); + assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, codec); + Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testQatLz4WithCompressionLevel() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + int randomCompressionLevel = randomIntBetween(1, 6); + Codec codec = createCodecService(randomCompressionLevel, "qat_lz4").codec("qat_lz4"); + assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_LZ4, codec); + Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testQatDeflateWithCompressionLevel() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + int randomCompressionLevel = randomIntBetween(1, 6); + Codec codec = createCodecService(randomCompressionLevel, "qat_deflate").codec("qat_deflate"); + assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, codec); + Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testQatCompressionLevelSupport() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + CodecService codecService = createCodecService(false); + CodecSettings qatDeflateCodec = (CodecSettings) codecService.codec("qat_deflate"); + CodecSettings qatLz4Codec = (CodecSettings) codecService.codec("qat_lz4"); + assertTrue(qatDeflateCodec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)); + assertTrue(qatLz4Codec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)); + } + + public void testQatLz4MapperServiceNull() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Codec codec = createCodecService(true).codec("qat_lz4"); + assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_LZ4, codec); + Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + public void testQatDeflateMapperServiceNull() throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + Codec codec = createCodecService(true).codec("qat_deflate"); + assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, codec); + Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel()); + } + + private void assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode expected, Codec actual) throws Exception { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + SegmentReader sr = getSegmentReader(actual); + String v = sr.getSegmentInfo().info.getAttribute(Lucene99QatStoredFieldsFormat.MODE_KEY); + assertNotNull(v); + assertEquals(expected, Lucene99QatCodec.Mode.valueOf(v)); + } + + private CodecService createCodecService(boolean isMapperServiceNull) throws IOException { + Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); + if (isMapperServiceNull) { + return new CustomCodecService( + null, + IndexSettingsModule.newIndexSettings("_na", nodeSettings, Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING), + LogManager.getLogger("test") + ); + } + return buildCodecService(nodeSettings); + } + + private CodecService createCodecService(int randomCompressionLevel, String codec) throws IOException { + Settings nodeSettings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .put("index.codec", codec) + .put("index.codec.compression_level", randomCompressionLevel) + .build(); + return buildCodecService(nodeSettings); + } + + private CodecService buildCodecService(Settings nodeSettings) throws IOException { + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "_na", + nodeSettings, + Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING + ); + SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); + IndexAnalyzers indexAnalyzers = createTestAnalysis(indexSettings, nodeSettings).indexAnalyzers; + MapperRegistry mapperRegistry = new MapperRegistry(Collections.emptyMap(), Collections.emptyMap(), MapperPlugin.NOOP_FIELD_FILTER); + MapperService service = new MapperService( + indexSettings, + indexAnalyzers, + xContentRegistry(), + similarityService, + mapperRegistry, + () -> null, + () -> false, + null + ); + + Optional customCodecServiceFactory = plugin.getCustomCodecServiceFactory(indexSettings); + if (customCodecServiceFactory.isPresent()) { + return customCodecServiceFactory.get().createCodecService(new CodecServiceConfig(indexSettings, service, logger)); + } + return new CustomCodecService(service, indexSettings, LogManager.getLogger("test")); + } + + private SegmentReader getSegmentReader(Codec codec) throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(null); + iwc.setCodec(codec); + IndexWriter iw = new IndexWriter(dir, iwc); + iw.addDocument(new Document()); + iw.commit(); + iw.close(); + DirectoryReader ir = DirectoryReader.open(dir); + SegmentReader sr = (SegmentReader) ir.leaves().get(0).reader(); + ir.close(); + dir.close(); + return sr; + } +} diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java new file mode 100644 index 0000000..1994f69 --- /dev/null +++ b/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +import java.io.IOException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +/** Test QAT DEFLATE compression */ +public class QatDeflateCompressorTests extends AbstractCompressorTests { + + @Override + Compressor compressor() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_DEFLATE).newCompressor(); + } + + @Override + Decompressor decompressor() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_DEFLATE).newDecompressor(); + } + + @Override + public void testEmpty() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testEmpty(); + } + + @Override + public void testShortLiterals() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testShortLiterals(); + } + + @Override + public void testRandom() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRandom(); + } + + @Override + public void testLineDocs() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testLineDocs(); + } + + @Override + public void testRepetitionsL() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRepetitionsL(); + } + + @Override + public void testRepetitionsI() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRepetitionsI(); + } + + @Override + public void testRepetitionsS() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRepetitionsS(); + } + + @Override + public void testMixed() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testMixed(); + } + + @Override + protected void doTest(byte[] bytes) throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.doTest(bytes); + } +} diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java new file mode 100644 index 0000000..905773f --- /dev/null +++ b/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +import java.io.IOException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +/** Test QAT LZ4 */ +public class QatLz4CompressorTests extends AbstractCompressorTests { + + @Override + Compressor compressor() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_LZ4).newCompressor(); + } + + @Override + Decompressor decompressor() { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_LZ4).newDecompressor(); + } + + @Override + public void testEmpty() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testEmpty(); + } + + @Override + public void testShortLiterals() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testShortLiterals(); + } + + @Override + public void testRandom() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRandom(); + } + + @Override + public void testLineDocs() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testLineDocs(); + } + + @Override + public void testRepetitionsL() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRepetitionsL(); + } + + @Override + public void testRepetitionsI() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRepetitionsI(); + } + + @Override + public void testRepetitionsS() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testRepetitionsS(); + } + + @Override + public void testMixed() throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.testMixed(); + } + + @Override + protected void doTest(byte[] bytes) throws IOException { + assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); + super.doTest(bytes); + } +} diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java index 78cf62c..3094884 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java @@ -10,9 +10,7 @@ import org.apache.lucene.codecs.compressing.Compressor; import org.apache.lucene.codecs.compressing.Decompressor; -/** - * Test ZSTD compression (with dictionary enabled) - */ +/** Test ZSTD compression (with dictionary enabled) */ public class ZstdCompressorTests extends AbstractCompressorTests { private final Compressor compressor = new ZstdCompressionMode().newCompressor(); diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java index 2eda81a..caa272e 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java @@ -10,9 +10,7 @@ import org.apache.lucene.codecs.compressing.Compressor; import org.apache.lucene.codecs.compressing.Decompressor; -/** - * Test ZSTD compression (with no dictionary). - */ +/** Test ZSTD compression (with no dictionary). */ public class ZstdNoDictCompressorTests extends AbstractCompressorTests { private final Compressor compressor = new ZstdNoDictCompressionMode().newCompressor();