Skip to content

Commit

Permalink
Use new instance of Decompressor on channel initialization (#3583)
Browse files Browse the repository at this point in the history
### Description

Resolves an issue with decompression that can lead to concurrent gzipped
requests failing. This removes the `@Sharable` annotation from the
`Netty4ConditionalDecompressor` and creates a new instance of the
decompressor on channel initialization.

`Netty4ConditionalDecompressor` is an `HttpContentDecompressor` which is
a subclass of `HttpContentDecoder` - a stateful handler. Netty docs on
`@Sharable` annotation:
https://netty.io/4.0/api/io/netty/channel/ChannelHandler.Sharable.html

* Category (Enhancement, New feature, Bug fix, Test fix, Refactoring,
Maintenance, Documentation)

Bug fix

### Issues Resolved

- opensearch-project/OpenSearch#10802

### Testing

Tested by running OpenSearch w fluentbit and Merge_Log on. See files
below which can reproduce the issue from the linked error.

I opened this PR as draft pending an integration test to validate the
behavior.

`docker-compose.yml`

```
version: '3'
services:
  opensearch: # This is also the hostname of the container within the Docker network (i.e. https://opensearch-node1/)
    image: opensearchproject/opensearch:latest # Specifying the latest available image - modify if you want a specific version
    container_name: opensearch
    environment:
      - cluster.name=opensearch-cluster # Name the cluster
      - node.name=opensearch # Name the node that will run in this container
      - discovery.type=single-node
      - bootstrap.memory_lock=true # Disable JVM heap memory swapping
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM
    ulimits:
      memlock:
        soft: -1 # Set memlock to unlimited (no soft or hard limit)
        hard: -1
      nofile:
        soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data # Creates volume called opensearch-data1 and mounts it to the container
      # - /Users/craigperkins/Projects/OpenSearch/security/build/distributions/opensearch-security-2.11.0.0-SNAPSHOT.jar:/usr/share/opensearch/plugins/opensearch-security/opensearch-security-2.11.0.0.jar
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    networks:
      - opensearch-net # All of the containers will join the same Docker bridge network
  fluent-bit:
    image: fluent/fluent-bit
    volumes:
      - ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf
    depends_on:
      - opensearch
    networks:
      - opensearch-net

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:
```

`fluent-bit.conf`

```
[INPUT]
  Name dummy
  Dummy {"top": {".dotted": "value"}}

[OUTPUT]
  Name es
  Host opensearch
  Port 9200
  HTTP_User admin
  HTTP_Passwd admin
  Replace_Dots On
  Suppress_Type_Name On
  Compress gzip
  tls On
  tls.verify Off
  net.keepalive Off

[FILTER]
  Name kubernetes
  Match kube.*
  Buffer_Size 256KB
  Merge_Log On
  Keep_Log On
```

### Check List
- [ ] New functionality includes testing
- [ ] New functionality has been documented
- [x] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and
signing off your commits, please check
[here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

---------

Signed-off-by: Craig Perkins <craig5008@gmail.com>
Signed-off-by: Craig Perkins <cwperx@amazon.com>
Signed-off-by: Peter Nied <petern@amazon.com>
Co-authored-by: Peter Nied <petern@amazon.com>
  • Loading branch information
cwperks and peternied committed Oct 25, 2023
1 parent 4f89b4a commit 499db78
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.security;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.security.rest;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.MatcherAssert.assertThat;
import org.opensearch.test.framework.TestSecurityConfig;
import org.opensearch.test.framework.cluster.ClusterManager;
import org.opensearch.test.framework.cluster.LocalCluster;
import org.opensearch.test.framework.cluster.TestRestClient;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.zip.GZIPOutputStream;
import org.opensearch.test.framework.cluster.TestRestClient.HttpResponse;

import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
import static org.opensearch.test.framework.TestSecurityConfig.Role.ALL_ACCESS;
import static org.opensearch.test.framework.cluster.TestRestClientConfiguration.getBasicAuthHeader;

@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class CompressionTests {
private static final TestSecurityConfig.User ADMIN_USER = new TestSecurityConfig.User("admin").roles(ALL_ACCESS);

@ClassRule
public static LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.THREE_CLUSTER_MANAGERS)
.authc(AUTHC_HTTPBASIC_INTERNAL)
.users(ADMIN_USER)
.anonymousAuth(false)
.build();

@Test
public void testAuthenticatedGzippedRequests() throws Exception {
final String requestPath = "/*/_search";
final int parallelism = 10;
final int totalNumberOfRequests = 100;

final String rawBody = "{ \"query\": { \"match\": { \"foo\": \"bar\" }}}";

final byte[] compressedRequestBody = createCompressedRequestBody(rawBody);
try (final TestRestClient client = cluster.getRestClient(ADMIN_USER, new BasicHeader("Content-Encoding", "gzip"))) {

final ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);

final List<CompletableFuture<HttpResponse>> waitingOn = IntStream.rangeClosed(1, totalNumberOfRequests)
.boxed()
.map(i -> CompletableFuture.supplyAsync(() -> {
final HttpPost post = new HttpPost(client.getHttpServerUri() + requestPath);
post.setEntity(new ByteArrayEntity(compressedRequestBody, ContentType.APPLICATION_JSON));
return client.executeRequest(post);
}, forkJoinPool))
.collect(Collectors.toList());

final CompletableFuture<Void> allOfThem = CompletableFuture.allOf(waitingOn.toArray(new CompletableFuture[0]));

allOfThem.get(30, TimeUnit.SECONDS);

waitingOn.stream().forEach(future -> {
try {
final HttpResponse response = future.get();
response.assertStatusCode(HttpStatus.SC_OK);
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
});
;
}
}

@Test
public void testMixOfAuthenticatedAndUnauthenticatedGzippedRequests() throws Exception {
final String requestPath = "/*/_search";
final int parallelism = 10;
final int totalNumberOfRequests = 100;

final String rawBody = "{ \"query\": { \"match\": { \"foo\": \"bar\" }}}";

final byte[] compressedRequestBody = createCompressedRequestBody(rawBody);
try (final TestRestClient client = cluster.getRestClient(new BasicHeader("Content-Encoding", "gzip"))) {

final ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);

final Header basicAuthHeader = getBasicAuthHeader(ADMIN_USER.getName(), ADMIN_USER.getPassword());

final List<CompletableFuture<HttpResponse>> waitingOn = IntStream.rangeClosed(1, totalNumberOfRequests)
.boxed()
.map(i -> CompletableFuture.supplyAsync(() -> {
final HttpPost post = new HttpPost(client.getHttpServerUri() + requestPath);
post.setEntity(new ByteArrayEntity(compressedRequestBody, ContentType.APPLICATION_JSON));
return i % 2 == 0 ? client.executeRequest(post) : client.executeRequest(post, basicAuthHeader);
}, forkJoinPool))
.collect(Collectors.toList());

final CompletableFuture<Void> allOfThem = CompletableFuture.allOf(waitingOn.toArray(new CompletableFuture[0]));

allOfThem.get(30, TimeUnit.SECONDS);

waitingOn.stream().forEach(future -> {
try {
final HttpResponse response = future.get();
assertThat(response.getBody(), not(containsString("json_parse_exception")));
assertThat(response.getStatusCode(), anyOf(equalTo(HttpStatus.SC_UNAUTHORIZED), equalTo(HttpStatus.SC_OK)));
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
});
;
}
}

static byte[] createCompressedRequestBody(final String rawBody) {
try (
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)
) {
gzipOutputStream.write(rawBody.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.finish();

final byte[] compressedRequestBody = byteArrayOutputStream.toByteArray();
return compressedRequestBody;
} catch (final IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
public class SecurityNonSslHttpServerTransport extends Netty4HttpServerTransport {

private final ChannelInboundHandlerAdapter headerVerifier;
private final ChannelInboundHandlerAdapter conditionalDecompressor;

public SecurityNonSslHttpServerTransport(
final Settings settings,
Expand All @@ -73,7 +72,6 @@ public SecurityNonSslHttpServerTransport(
tracer
);
headerVerifier = new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings);
conditionalDecompressor = new Netty4ConditionalDecompressor();
}

@Override
Expand All @@ -100,6 +98,6 @@ protected ChannelInboundHandlerAdapter createHeaderVerifier() {

@Override
protected ChannelInboundHandlerAdapter createDecompressor() {
return conditionalDecompressor;
return new Netty4ConditionalDecompressor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.security.ssl.http.netty;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpContentDecompressor;

Expand All @@ -17,7 +16,6 @@

import org.opensearch.security.filter.NettyAttribute;

@Sharable
public class Netty4ConditionalDecompressor extends HttpContentDecompressor {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class SecuritySSLNettyHttpServerTransport extends Netty4HttpServerTranspo
private final SecurityKeyStore sks;
private final SslExceptionHandler errorHandler;
private final ChannelInboundHandlerAdapter headerVerifier;
private final ChannelInboundHandlerAdapter conditionalDecompressor;

public SecuritySSLNettyHttpServerTransport(
final Settings settings,
Expand Down Expand Up @@ -79,7 +78,6 @@ public SecuritySSLNettyHttpServerTransport(
this.sks = sks;
this.errorHandler = errorHandler;
headerVerifier = new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings);
conditionalDecompressor = new Netty4ConditionalDecompressor();
}

@Override
Expand Down Expand Up @@ -164,6 +162,6 @@ protected ChannelInboundHandlerAdapter createHeaderVerifier() {

@Override
protected ChannelInboundHandlerAdapter createDecompressor() {
return conditionalDecompressor;
return new Netty4ConditionalDecompressor();
}
}

0 comments on commit 499db78

Please sign in to comment.