Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for snappy http content encoding #13529

Merged
merged 10 commits into from Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,6 +35,8 @@
import io.netty.handler.codec.compression.Zstd;
import io.netty.handler.codec.compression.ZstdEncoder;
import io.netty.handler.codec.compression.ZstdOptions;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.compression.SnappyOptions;
import io.netty.util.internal.ObjectUtil;

/**
Expand All @@ -51,6 +53,7 @@ public class HttpContentCompressor extends HttpContentEncoder {
private final GzipOptions gzipOptions;
private final DeflateOptions deflateOptions;
private final ZstdOptions zstdOptions;
private final SnappyOptions snappyOptions;

private final int compressionLevel;
private final int windowBits;
Expand Down Expand Up @@ -138,6 +141,7 @@ public HttpContentCompressor(int compressionLevel, int windowBits, int memLevel,
this.gzipOptions = null;
this.deflateOptions = null;
this.zstdOptions = null;
this.snappyOptions = null;
this.factories = null;
this.supportsCompressionOptions = false;
}
Expand Down Expand Up @@ -170,11 +174,13 @@ public HttpContentCompressor(int contentSizeThreshold, CompressionOptions... com
GzipOptions gzipOptions = null;
DeflateOptions deflateOptions = null;
ZstdOptions zstdOptions = null;
SnappyOptions snappyOptions = null;
if (compressionOptions == null || compressionOptions.length == 0) {
brotliOptions = Brotli.isAvailable() ? StandardCompressionOptions.brotli() : null;
gzipOptions = StandardCompressionOptions.gzip();
deflateOptions = StandardCompressionOptions.deflate();
zstdOptions = Zstd.isAvailable() ? StandardCompressionOptions.zstd() : null;
snappyOptions = StandardCompressionOptions.snappy();
} else {
ObjectUtil.deepCheckNotNull("compressionOptions", compressionOptions);
for (CompressionOptions compressionOption : compressionOptions) {
Expand All @@ -192,6 +198,8 @@ public HttpContentCompressor(int contentSizeThreshold, CompressionOptions... com
deflateOptions = (DeflateOptions) compressionOption;
} else if (compressionOption instanceof ZstdOptions) {
zstdOptions = (ZstdOptions) compressionOption;
} else if (compressionOption instanceof SnappyOptions) {
snappyOptions = (SnappyOptions) compressionOption;
} else {
throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
": " + compressionOption);
Expand All @@ -203,6 +211,7 @@ public HttpContentCompressor(int contentSizeThreshold, CompressionOptions... com
this.deflateOptions = deflateOptions;
this.brotliOptions = brotliOptions;
this.zstdOptions = zstdOptions;
this.snappyOptions = snappyOptions;

this.factories = new HashMap<String, CompressionEncoderFactory>();

Expand All @@ -218,6 +227,9 @@ public HttpContentCompressor(int contentSizeThreshold, CompressionOptions... com
if (this.zstdOptions != null) {
this.factories.put("zstd", new ZstdEncoderFactory());
}
if (this.snappyOptions != null) {
this.factories.put("snappy", new SnappyEncoderFactory());
}

this.compressionLevel = -1;
this.windowBits = -1;
Expand Down Expand Up @@ -292,6 +304,7 @@ protected String determineEncoding(String acceptEncoding) {
float starQ = -1.0f;
float brQ = -1.0f;
float zstdQ = -1.0f;
float snappyQ = -1.0f;
float gzipQ = -1.0f;
float deflateQ = -1.0f;
for (String encoding : acceptEncoding.split(",")) {
Expand All @@ -311,17 +324,21 @@ protected String determineEncoding(String acceptEncoding) {
brQ = q;
} else if (encoding.contains("zstd") && q > zstdQ) {
zstdQ = q;
} else if (encoding.contains("snappy") && q > snappyQ) {
snappyQ = q;
} else if (encoding.contains("gzip") && q > gzipQ) {
gzipQ = q;
} else if (encoding.contains("deflate") && q > deflateQ) {
deflateQ = q;
}
}
if (brQ > 0.0f || zstdQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) {
if (brQ > 0.0f || zstdQ > 0.0f || snappyQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) {
if (brQ != -1.0f && brQ >= zstdQ && this.brotliOptions != null) {
return "br";
} else if (zstdQ != -1.0f && zstdQ >= gzipQ && this.zstdOptions != null) {
} else if (zstdQ != -1.0f && zstdQ >= snappyQ && this.zstdOptions != null) {
return "zstd";
} else if (snappyQ != -1.0f && snappyQ >= gzipQ && this.snappyOptions != null) {
return "snappy";
} else if (gzipQ != -1.0f && gzipQ >= deflateQ && this.gzipOptions != null) {
return "gzip";
} else if (deflateQ != -1.0f && this.deflateOptions != null) {
Expand All @@ -335,6 +352,9 @@ protected String determineEncoding(String acceptEncoding) {
if (zstdQ == -1.0f && this.zstdOptions != null) {
return "zstd";
}
if (snappyQ == -1.0f && this.snappyOptions != null) {
return "snappy";
}
if (gzipQ == -1.0f && this.gzipOptions != null) {
return "gzip";
}
Expand Down Expand Up @@ -440,4 +460,16 @@ public MessageToByteEncoder<ByteBuf> createEncoder() {
zstdOptions.blockSize(), zstdOptions.maxEncodeSize());
}
}

/**
* Compression Encoder Factory for create {@link SnappyFrameEncoder}
* used to compress http content for snappy content encoding
*/
private final class SnappyEncoderFactory implements CompressionEncoderFactory {
normanmaurer marked this conversation as resolved.
Show resolved Hide resolved

@Override
public MessageToByteEncoder<ByteBuf> createEncoder() {
return new SnappyFrameEncoder();
}
}
}
Expand Up @@ -43,7 +43,8 @@ void testGetBrTargetContentEncoding() {
StandardCompressionOptions.gzip(),
StandardCompressionOptions.deflate(),
StandardCompressionOptions.brotli(),
StandardCompressionOptions.zstd()
StandardCompressionOptions.zstd(),
StandardCompressionOptions.snappy()
);

String[] tests = {
Expand All @@ -70,7 +71,8 @@ void testGetZstdTargetContentEncoding() {
StandardCompressionOptions.gzip(),
StandardCompressionOptions.deflate(),
StandardCompressionOptions.brotli(),
StandardCompressionOptions.zstd()
StandardCompressionOptions.zstd(),
StandardCompressionOptions.snappy()
);

String[] tests = {
Expand All @@ -90,6 +92,33 @@ void testGetZstdTargetContentEncoding() {
}
}

@Test
void testGetSnappyTargetContentEncoding() {
HttpContentCompressor compressor = new HttpContentCompressor(
StandardCompressionOptions.gzip(),
StandardCompressionOptions.deflate(),
StandardCompressionOptions.brotli(),
StandardCompressionOptions.zstd(),
StandardCompressionOptions.snappy()
);

String[] tests = {
// Accept-Encoding -> Content-Encoding
"", null,
"*;q=0.0", null,
"snappy", "snappy",
"compress, snappy;q=0.5", "snappy",
"snappy; q=0.5, identity", "snappy",
"snappy; q=0, deflate", "snappy",
};
for (int i = 0; i < tests.length; i += 2) {
String acceptEncoding = tests[i];
String contentEncoding = tests[i + 1];
String targetEncoding = compressor.determineEncoding(acceptEncoding);
assertEquals(contentEncoding, targetEncoding);
}
}

@Test
void testAcceptEncodingHttpRequest() {
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor(null));
Expand Down Expand Up @@ -122,7 +151,7 @@ private static void assertEncodedResponse(HttpResponse res) {

private static FullHttpRequest newRequest() {
FullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
req.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "br, zstd, gzip, deflate");
req.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "br, zstd, snappy, gzip, deflate");
return req;
}
}
Expand Up @@ -32,6 +32,8 @@
import io.netty.handler.codec.compression.StandardCompressionOptions;
import io.netty.handler.codec.compression.ZstdEncoder;
import io.netty.handler.codec.compression.ZstdOptions;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.compression.SnappyOptions;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.UnstableApi;
Expand All @@ -45,6 +47,7 @@
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;

/**
* A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
Expand All @@ -68,6 +71,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
private GzipOptions gzipCompressionOptions;
private DeflateOptions deflateOptions;
private ZstdOptions zstdOptions;
private SnappyOptions snappyOptions;

/**
* Create a new {@link CompressorHttp2ConnectionEncoder} instance
Expand Down Expand Up @@ -137,6 +141,8 @@ public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
deflateOptions = (DeflateOptions) compressionOptions;
} else if (compressionOptions instanceof ZstdOptions) {
zstdOptions = (ZstdOptions) compressionOptions;
} else if (compressionOptions instanceof SnappyOptions) {
snappyOptions = (SnappyOptions) compressionOptions;
} else {
throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
": " + compressionOptions);
Expand Down Expand Up @@ -283,6 +289,10 @@ protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSe
ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
}
if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
ctx.channel().config(), new SnappyFrameEncoder());
}
// 'identity' or unsupported
return null;
}
Expand Down
Expand Up @@ -338,6 +338,54 @@ public void run() throws Http2Exception {
}
}

@Test
public void snappyEncodingSingleEmptyMessage() throws Exception {
final String text = "";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes(CharsetUtil.US_ASCII));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

bootstrapEnv(data.readableBytes());
try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.SNAPPY);

runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
clientHandler.flush(ctxClient());
}
});
awaitServer();
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
}
}

@Test
public void snappyEncodingSingleMessage() throws Exception {
final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes(CharsetUtil.UTF_8.name()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes(CharsetUtil.UTF_8.name()));
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes(CharsetUtil.US_ASCII)));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

bootstrapEnv(data.readableBytes());
try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.SNAPPY);

runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
clientHandler.flush(ctxClient());
}
});
awaitServer();
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally {
data.release();
}
}

@Test
public void deflateEncodingWriteLargeMessage() throws Exception {
final int BUFFER_SIZE = 1 << 12;
Expand Down
@@ -0,0 +1,24 @@
/*
* Copyright 2023 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;

/**
* {@link SnappyOptions} holds config for
* Snappy compression.
*/
public class SnappyOptions implements CompressionOptions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class SnappyOptions implements CompressionOptions {
public final class SnappyOptions implements CompressionOptions {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Will add config if Snappy supports this
}
Expand Up @@ -68,6 +68,14 @@ public static ZstdOptions zstd(int compressionLevel, int blockSize, int maxEncod
return new ZstdOptions(compressionLevel, blockSize, maxEncodeSize);
}

/**
* Create a new {@link SnappyOptions}
*
*/
public static SnappyOptions snappy() {
return new SnappyOptions();
}

/**
* Default implementation of {@link GzipOptions} with
* {@code compressionLevel()} set to 6, {@code windowBits()} set to 15 and {@code memLevel()} set to 8.
Expand Down