From 499db7886f89a59e7735ef860f127ba659fea9d2 Mon Sep 17 00:00:00 2001
From: Craig Perkins
Date: Wed, 25 Oct 2023 12:25:24 -0400
Subject: [PATCH] Use new instance of Decompressor on channel initialization
 (#3583)

### Description

Resolves an issue with decompression that could cause concurrent gzipped requests to fail. This removes the `@Sharable` annotation from `Netty4ConditionalDecompressor` and creates a new instance of the decompressor on channel initialization (a minimal sketch of this pattern appears at the end of this description). `Netty4ConditionalDecompressor` is an `HttpContentDecompressor`, which is a subclass of `HttpContentDecoder`, a stateful handler.

Netty docs on the `@Sharable` annotation: https://netty.io/4.0/api/io/netty/channel/ChannelHandler.Sharable.html

* Category (Enhancement, New feature, Bug fix, Test fix, Refactoring, Maintenance, Documentation)

Bug fix

### Issues Resolved

- https://github.com/opensearch-project/OpenSearch/issues/10802

### Testing

Tested by running OpenSearch with Fluent Bit and `Merge_Log On`. The files below reproduce the issue from the linked error. I opened this PR as a draft pending an integration test to validate the behavior.

`docker-compose.yml`

```
version: '3'
services:
  opensearch: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node1/)
    image: opensearchproject/opensearch:latest # Specifying the latest available image - modify if you want a specific version
    container_name: opensearch
    environment:
      - cluster.name=opensearch-cluster # Name the cluster
      - node.name=opensearch # Name the node that will run in this container
      - discovery.type=single-node
      - bootstrap.memory_lock=true # Disable JVM heap memory swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM
    ulimits:
      memlock:
        soft: -1 # Set memlock to unlimited (no soft or hard limit)
        hard: -1
      nofile:
        soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data # Creates volume called opensearch-data1 and mounts it to the container
      # - /Users/craigperkins/Projects/OpenSearch/security/build/distributions/opensearch-security-2.11.0.0-SNAPSHOT.jar:/usr/share/opensearch/plugins/opensearch-security/opensearch-security-2.11.0.0.jar
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    networks:
      - opensearch-net # All of the containers will join the same Docker bridge network
  fluent-bit:
    image: fluent/fluent-bit
    volumes:
      - ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf
    depends_on:
      - opensearch
    networks:
      - opensearch-net

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:
```

`fluent-bit.conf`

```
[INPUT]
    Name dummy
    Dummy {"top": {".dotted": "value"}}

[OUTPUT]
    Name es
    Host opensearch
    Port 9200
    HTTP_User admin
    HTTP_Passwd admin
    Replace_Dots On
    Suppress_Type_Name On
    Compress gzip
    tls On
    tls.verify Off
    net.keepalive Off

[FILTER]
    Name kubernetes
    Match kube.*
    Buffer_Size 256KB
    Merge_Log On
    Keep_Log On
```

### Check List

- [ ] New functionality includes testing
- [ ] New functionality has been documented
- [x] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).
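For context, here is a minimal sketch of the pattern this change adopts (the `ExampleTransport` class is illustrative only and not part of this patch). Because `HttpContentDecoder` keeps per-connection decoding state, the transport must hand each new channel pipeline its own decompressor instance rather than sharing one stored in a field:

```
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContentDecompressor;

// Illustrative only: createDecompressor() is invoked once per channel while
// the pipeline is being built, so every channel receives a fresh, unshared
// handler. A single instance kept in a field would be mutated concurrently
// by all channels and could corrupt interleaved gzip streams.
class ExampleTransport {
    protected ChannelInboundHandlerAdapter createDecompressor() {
        return new HttpContentDecompressor(); // stateful: must not be @Sharable
    }
}
```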
---------

Signed-off-by: Craig Perkins
Signed-off-by: Craig Perkins
Signed-off-by: Peter Nied
Co-authored-by: Peter Nied
---
 .../security/ResourceFocusedTests.java        |  10 ++
 .../security/rest/CompressionTests.java       | 155 ++++++++++++++++++
 .../SecurityNonSslHttpServerTransport.java    |   4 +-
 .../netty/Netty4ConditionalDecompressor.java  |   2 -
 .../SecuritySSLNettyHttpServerTransport.java  |   4 +-
 5 files changed, 167 insertions(+), 8 deletions(-)
 create mode 100644 src/integrationTest/java/org/opensearch/security/rest/CompressionTests.java

diff --git a/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java b/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java
index 3899dfa111..a25423471f 100644
--- a/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java
+++ b/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java
@@ -1,3 +1,13 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ */
+
 package org.opensearch.security;
 
 import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
diff --git a/src/integrationTest/java/org/opensearch/security/rest/CompressionTests.java b/src/integrationTest/java/org/opensearch/security/rest/CompressionTests.java
new file mode 100644
index 0000000000..cf07f93ad8
--- /dev/null
+++ b/src/integrationTest/java/org/opensearch/security/rest/CompressionTests.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ */
+
+package org.opensearch.security.rest;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import org.apache.hc.client5.http.classic.methods.HttpPost;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import org.opensearch.test.framework.TestSecurityConfig;
+import org.opensearch.test.framework.cluster.ClusterManager;
+import org.opensearch.test.framework.cluster.LocalCluster;
+import org.opensearch.test.framework.cluster.TestRestClient;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.zip.GZIPOutputStream;
+import org.opensearch.test.framework.cluster.TestRestClient.HttpResponse;
+
+import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
+import static org.opensearch.test.framework.TestSecurityConfig.Role.ALL_ACCESS;
+import static org.opensearch.test.framework.cluster.TestRestClientConfiguration.getBasicAuthHeader;
+
+@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
+@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
+public class CompressionTests {
+    private static final TestSecurityConfig.User ADMIN_USER = new TestSecurityConfig.User("admin").roles(ALL_ACCESS);
+
+    @ClassRule
+    public static LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.THREE_CLUSTER_MANAGERS)
+        .authc(AUTHC_HTTPBASIC_INTERNAL)
+        .users(ADMIN_USER)
+        .anonymousAuth(false)
+        .build();
+
+    @Test
+    public void testAuthenticatedGzippedRequests() throws Exception {
+        final String requestPath = "/*/_search";
+        final int parallelism = 10;
+        final int totalNumberOfRequests = 100;
+
+        final String rawBody = "{ \"query\": { \"match\": { \"foo\": \"bar\" }}}";
+
+        final byte[] compressedRequestBody = createCompressedRequestBody(rawBody);
+        try (final TestRestClient client = cluster.getRestClient(ADMIN_USER, new BasicHeader("Content-Encoding", "gzip"))) {
+
+            final ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
+
+            final List<CompletableFuture<HttpResponse>> waitingOn = IntStream.rangeClosed(1, totalNumberOfRequests)
+                .boxed()
+                .map(i -> CompletableFuture.supplyAsync(() -> {
+                    final HttpPost post = new HttpPost(client.getHttpServerUri() + requestPath);
+                    post.setEntity(new ByteArrayEntity(compressedRequestBody, ContentType.APPLICATION_JSON));
+                    return client.executeRequest(post);
+                }, forkJoinPool))
+                .collect(Collectors.toList());
+
+            final CompletableFuture<Void> allOfThem = CompletableFuture.allOf(waitingOn.toArray(new CompletableFuture[0]));
+
+            allOfThem.get(30, TimeUnit.SECONDS);
+
+            waitingOn.stream().forEach(future -> {
+                try {
+                    final HttpResponse response = future.get();
+                    response.assertStatusCode(HttpStatus.SC_OK);
+                } catch (final Exception ex) {
+                    throw new RuntimeException(ex);
+                }
+            });
+            ;
+        }
+    }
+
+    @Test
+    public void testMixOfAuthenticatedAndUnauthenticatedGzippedRequests() throws Exception {
+        final String requestPath = "/*/_search";
+        final int parallelism = 10;
+        final int totalNumberOfRequests = 100;
+
+        final String rawBody = "{ \"query\": { \"match\": { \"foo\": \"bar\" }}}";
+
+        final byte[] compressedRequestBody = createCompressedRequestBody(rawBody);
+        try (final TestRestClient client = cluster.getRestClient(new BasicHeader("Content-Encoding", "gzip"))) {
+
+            final ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
+
+            final Header basicAuthHeader = getBasicAuthHeader(ADMIN_USER.getName(), ADMIN_USER.getPassword());
+
+            final List<CompletableFuture<HttpResponse>> waitingOn = IntStream.rangeClosed(1, totalNumberOfRequests)
+                .boxed()
+                .map(i -> CompletableFuture.supplyAsync(() -> {
+                    final HttpPost post = new HttpPost(client.getHttpServerUri() + requestPath);
+                    post.setEntity(new ByteArrayEntity(compressedRequestBody, ContentType.APPLICATION_JSON));
+                    return i % 2 == 0 ? client.executeRequest(post) : client.executeRequest(post, basicAuthHeader);
+                }, forkJoinPool))
+                .collect(Collectors.toList());
+
+            final CompletableFuture<Void> allOfThem = CompletableFuture.allOf(waitingOn.toArray(new CompletableFuture[0]));
+
+            allOfThem.get(30, TimeUnit.SECONDS);
+
+            waitingOn.stream().forEach(future -> {
+                try {
+                    final HttpResponse response = future.get();
+                    assertThat(response.getBody(), not(containsString("json_parse_exception")));
+                    assertThat(response.getStatusCode(), anyOf(equalTo(HttpStatus.SC_UNAUTHORIZED), equalTo(HttpStatus.SC_OK)));
+                } catch (final Exception ex) {
+                    throw new RuntimeException(ex);
+                }
+            });
+            ;
+        }
+    }
+
+    static byte[] createCompressedRequestBody(final String rawBody) {
+        try (
+            final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)
+        ) {
+            gzipOutputStream.write(rawBody.getBytes(StandardCharsets.UTF_8));
+            gzipOutputStream.finish();
+
+            final byte[] compressedRequestBody = byteArrayOutputStream.toByteArray();
+            return compressedRequestBody;
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+    }
+}
diff --git a/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java b/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java
index 71586a2dff..cca1df9b46 100644
--- a/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java
+++ b/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java
@@ -47,7 +47,6 @@
 public class SecurityNonSslHttpServerTransport extends Netty4HttpServerTransport {
 
     private final ChannelInboundHandlerAdapter headerVerifier;
-    private final ChannelInboundHandlerAdapter conditionalDecompressor;
 
     public SecurityNonSslHttpServerTransport(
         final Settings settings,
@@ -73,7 +72,6 @@ public SecurityNonSslHttpServerTransport(
             tracer
         );
         headerVerifier = new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings);
-        conditionalDecompressor = new Netty4ConditionalDecompressor();
     }
 
     @Override
@@ -100,6 +98,6 @@ protected ChannelInboundHandlerAdapter createHeaderVerifier() {
 
     @Override
     protected ChannelInboundHandlerAdapter createDecompressor() {
-        return conditionalDecompressor;
+        return new Netty4ConditionalDecompressor();
     }
 }
diff --git a/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java b/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java
index c8059fad5d..1eec49add0 100644
--- a/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java
+++ b/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java
@@ -8,7 +8,6 @@
 
 package org.opensearch.security.ssl.http.netty;
 
-import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.http.HttpContentDecompressor;
 
@@ -17,7 +16,6 @@
 
 import org.opensearch.security.filter.NettyAttribute;
 
-@Sharable
 public class Netty4ConditionalDecompressor extends HttpContentDecompressor {
 
     @Override
diff --git a/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java b/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java
index 41e44ce371..eb2acdce49 100644
--- a/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java
+++ b/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java
@@ -49,7 +49,6 @@ public class SecuritySSLNettyHttpServerTransport extends Netty4HttpServerTranspo
     private final SecurityKeyStore sks;
     private final SslExceptionHandler errorHandler;
     private final ChannelInboundHandlerAdapter headerVerifier;
-    private final ChannelInboundHandlerAdapter conditionalDecompressor;
 
     public SecuritySSLNettyHttpServerTransport(
         final Settings settings,
@@ -79,7 +78,6 @@ public SecuritySSLNettyHttpServerTransport(
         this.sks = sks;
         this.errorHandler = errorHandler;
         headerVerifier = new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings);
-        conditionalDecompressor = new Netty4ConditionalDecompressor();
     }
 
     @Override
@@ -164,6 +162,6 @@ protected ChannelInboundHandlerAdapter createHeaderVerifier() {
 
     @Override
     protected ChannelInboundHandlerAdapter createDecompressor() {
-        return conditionalDecompressor;
+        return new Netty4ConditionalDecompressor();
    }
 }
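As a reviewer-side sanity check, here is a small self-contained sketch (illustrative only, not part of this patch; the class name and `EmbeddedChannel` harness are assumptions) that exercises the per-channel behavior in isolation: two embedded channels, each holding its own decompressor, decode independent gzip streams without interfering, which is what `createDecompressor()` now guarantees for real connections.

```
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;

public class PerChannelDecompressorDemo {

    public static void main(String[] args) throws Exception {
        // Each channel gets its own decompressor, mirroring the fixed transports.
        // Sharing a single HttpContentDecompressor across both channels would mix
        // the decoder state of the two gzip streams.
        final EmbeddedChannel first = new EmbeddedChannel(new HttpContentDecompressor(), new HttpObjectAggregator(1048576));
        final EmbeddedChannel second = new EmbeddedChannel(new HttpContentDecompressor(), new HttpObjectAggregator(1048576));

        first.writeInbound(gzippedRequest("{\"a\":1}"));
        second.writeInbound(gzippedRequest("{\"b\":2}"));

        final FullHttpRequest a = first.readInbound();
        final FullHttpRequest b = second.readInbound();
        System.out.println(a.content().toString(StandardCharsets.UTF_8)); // {"a":1}
        System.out.println(b.content().toString(StandardCharsets.UTF_8)); // {"b":2}
    }

    private static FullHttpRequest gzippedRequest(final String body) throws Exception {
        // Compress the body the same way the integration test's helper does.
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (final GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(body.getBytes(StandardCharsets.UTF_8));
        }
        final FullHttpRequest request = new DefaultFullHttpRequest(
            HttpVersion.HTTP_1_1,
            HttpMethod.POST,
            "/_search",
            Unpooled.wrappedBuffer(bytes.toByteArray())
        );
        request.headers().set(HttpHeaderNames.CONTENT_ENCODING, "gzip");
        request.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.size());
        return request;
    }
}
```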