Skip to content

Commit

Permalink
Refactor Compressors from CompressorFactory to CompressorRegistry for…
Browse files Browse the repository at this point in the history
… extensibility

This commit refactors the CompressorFactory static singleton
class and CompressorType enum to a formal CompressorRegistry and enables
downstream implementations to register their own compression
implementations for use in compressing Blob stores and MediaType data.
This is different from Lucene's Codec compression extension points which
expose different compression implementations for Lucene's Stored Fields.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize committed Aug 11, 2023
1 parent 5c283e0 commit 57402b8
Show file tree
Hide file tree
Showing 40 changed files with 452 additions and 243 deletions.
38 changes: 38 additions & 0 deletions libs/compress/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.build'
apply plugin: 'opensearch.publish'

base {
archivesName = 'opensearch-common'
}

dependencies {
api project(':libs:opensearch-common')
api project(':libs:opensearch-core')

//zstd
api "com.github.luben:zstd-jni:${versions.zstd}"

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'opensearch-compress'
}
}

tasks.named('forbiddenApisMain').configure {
// :libs:opensearch-compress does not depend on server
// TODO: Need to decide how we want to handle for forbidden signatures with the changes to server
replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.compress;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
Expand All @@ -19,12 +19,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
* {@link Compressor} implementation based on the ZSTD compression algorithm.
*
* @opensearch.internal
* @opensearch.api - registered name requires BWC support
* @opensearch.experimental - class methods might change
*/
public class ZstdCompressor implements Compressor {
// An arbitrary header that we use to identify compressed streams
Expand All @@ -33,6 +35,13 @@ public class ZstdCompressor implements Compressor {
// a XContent
private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' };

/**
* The name to register the compressor by
*
* @opensearch.api - requires BWC support
*/
public static final String NAME = new String(HEADER, StandardCharsets.UTF_8);

private static final int LEVEL = 3;

private static final int BUFFER_SIZE = 4096;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.compress.spi;

import org.opensearch.compress.ZstdCompressor;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.core.compress.spi.CompressorProvider;

import java.util.List;

/**
* Additional "optional" compressor implementations provided by the opensearch compress library
*
* @opensearch.internal
*/
public class CompressionProvider implements CompressorProvider {

/** Returns the concrete {@link org.opensearch.core.common.compress.Compressor}s provided by the compress library */
@Override
public List<CompressorRegistry.Entry> getCompressors() {
return List.of(new CompressorRegistry.Entry(ZstdCompressor.NAME, new ZstdCompressor()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

org.opensearch.compress.spi.CompressionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.compress;

import org.opensearch.core.common.compress.Compressor;
import org.opensearch.test.core.compress.AbstractCompressorTestCase;

/**
* Test streaming compression
*/
public class ZstdCompressTests extends AbstractCompressorTests {
public class ZstdCompressTests extends AbstractCompressorTestCase {

private final Compressor compressor = new ZstdCompressor();

@Override
Compressor compressor() {
protected Compressor compressor() {
return compressor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,15 @@
import java.io.OutputStream;

/**
* Compressor interface
* Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and
* {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations.
*
* @opensearch.internal
* This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used
* for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec}
* for compressing {@link org.apache.lucene.document.StoredField}s
*
* @opensearch.api - intended to be extended
* @opensearch.experimental - however, bwc is not guaranteed at this time
*/
public interface Compressor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.common.compress;
package org.opensearch.core.common.compress;

/**
* Exception indicating that we were expecting something compressed, which
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.compress;

import org.opensearch.common.Nullable;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.compress.Compressor;
import org.opensearch.core.common.compress.NotCompressedException;
import org.opensearch.core.common.compress.NotXContentException;
import org.opensearch.core.compress.spi.CompressorProvider;
import org.opensearch.core.xcontent.MediaTypeRegistry;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

/**
* A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the
* compressor header as a string) to registerd {@link Compressor} implementations.
*
* This enables plugins, modules, extensions to register their own compression implementations through SPI
*
* @opensearch.internal
*/
public final class CompressorRegistry {
/** No compression singleton - we still register so users can specify NONE in the API*/
public static final Compressor NONE;

// the backing registry map
private static final Map<String, Compressor> registeredCompressors;

// no instance:
private CompressorRegistry() {}

static {
ArrayList<Entry> compressors = new ArrayList<>();
for (CompressorProvider provider : ServiceLoader.load(CompressorProvider.class, CompressorProvider.class.getClassLoader())) {
compressors.addAll(provider.getCompressors());
}
registeredCompressors = Map.copyOf(compressors.stream().collect(Collectors.toMap(Entry::getName, Entry::getCompressor)));
NONE = registeredCompressors.get(NoneCompressor.NAME);
}

public static class Entry {
/** a unique key name to identify the compressor; this is typically the Compressor's Header as a string */
private String name;
/** the compressor to register */
private Compressor compressor;

public Entry(final String name, final Compressor compressor) {
this.name = name;
this.compressor = compressor;
}

public String getName() {
return this.name;
}

public Compressor getCompressor() {
return compressor;
}
}

/**
* Returns the default compressor
*/
public static Compressor defaultCompressor() {
return registeredCompressors.get("DEFLATE");
}

public static Compressor none() {
return registeredCompressors.get("NONE");
}

public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
}

@Nullable
public static Compressor compressor(final BytesReference bytes) {
for (Compressor compressor : registeredCompressors.values()) {
if (compressor.isCompressed(bytes) == true) {
// bytes should be either detected as compressed or as xcontent,
// if we have bytes that can be either detected as compressed or
// as a xcontent, we have a problem
assert MediaTypeRegistry.xContentType(bytes) == null;
return compressor;
}
}

if (MediaTypeRegistry.xContentType(bytes) == null) {
throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes");
}

return null;
}

/** Decompress the provided {@link BytesReference}. */
public static BytesReference uncompress(BytesReference bytes) throws IOException {
Compressor compressor = compressor(bytes);
if (compressor == null) {
throw new NotCompressedException();
}
return compressor.uncompress(bytes);
}

/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
return compressor == null ? bytes : compressor.uncompress(bytes);
}

/** Returns a registered compressor by its registered name */
public static Compressor getCompressor(final String name) {
if (registeredCompressors.containsKey(name)) {
return registeredCompressors.get(name);
}
throw new IllegalArgumentException("No registered compressor found by name [" + name + "]");
}

/**
* Returns the registered compressors as an Immutable collection
*
* note: used for testing
*/
public static Map<String, Compressor> registeredCompressors() {
// no destructive danger as backing map is immutable
return registeredCompressors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.core.compress;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.compress.Compressor;
Expand All @@ -18,9 +18,17 @@
/**
* {@link Compressor} no compressor implementation.
*
* @opensearch.internal
* @opensearch.api - registered name requires BWC support
* @opensearch.experimental - class methods might change
*/
public class NoneCompressor implements Compressor {
/**
* The name to register the compressor by
*
* @opensearch.api - requires BWC support
*/
public static final String NAME = "NONE";

@Override
public boolean isCompressed(BytesReference bytes) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.compress.spi;

import org.opensearch.core.common.compress.Compressor;
import org.opensearch.core.compress.CompressorRegistry;

import java.util.List;

/**
* Service Provider Interface for plugins, modules, extensions providing custom
* compression algorithms
*
* see {@link Compressor} for implementing methods
* and {@link org.opensearch.core.compress.CompressorRegistry} for the registration of custom
* Compressors
*
* @opensearch.experimental
* @opensearch.api
*/
public interface CompressorProvider {
/** Extensions that implement their own concrete {@link Compressor}s provide them through this interface method*/
List<CompressorRegistry.Entry> getCompressors();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.compress.spi;

import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.core.compress.NoneCompressor;

import java.util.List;

/**
* Default {@link org.opensearch.core.common.compress.Compressor} implementations provided by the
* opensearch core library
*
* @opensearch.internal
*/
public class DefaultCompressorProvider implements CompressorProvider {
/** Returns the default {@link org.opensearch.core.common.compress.Compressor}s provided by the core library */
@Override
public List<CompressorRegistry.Entry> getCompressors() {
return List.of(new CompressorRegistry.Entry(NoneCompressor.NAME, new NoneCompressor()));
}
}
Loading

0 comments on commit 57402b8

Please sign in to comment.