Skip to content

Commit

Permalink
Refactor Netty4 method (opensearch-project#166)
Browse files Browse the repository at this point in the history
* issue opensearch-project#28

Signed-off-by: mloufra <mloufra@amazon.com>

* Update the lastest coomit

Signed-off-by: mloufra <mloufra@amazon.com>

* Rename the method and fix the conflict

Signed-off-by: mloufra <mloufra@amazon.com>

* fix merge conflict

Signed-off-by: mloufra <mloufra@amazon.com>

* Add code coverage report

Signed-off-by: mloufra <mloufra@amazon.com>

* Rebase the lastest commit

Signed-off-by: mloufra <mloufra@amazon.com>

* update the lastest commit

Signed-off-by: mloufra <mloufra@amazon.com>

* refactor netty4 and initialozaExtensionTransport method

Signed-off-by: mloufra <mloufra@amazon.com>

* resolve comment

Signed-off-by: mloufra <mloufra@amazon.com>

Signed-off-by: mloufra <mloufra@amazon.com>
  • Loading branch information
mloufra authored and kokibas committed Mar 17, 2023
1 parent f0a618e commit 0f74e30
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 111 deletions.
105 changes: 3 additions & 102 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,14 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.rest.ExtensionRestRequest;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest;
import org.opensearch.extensions.DiscoveryExtension;
Expand All @@ -35,12 +29,7 @@
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.transport.netty4.Netty4Transport;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
Expand All @@ -54,11 +43,7 @@
import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.sdk.handlers.OpensearchRequestHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

Expand All @@ -69,14 +54,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.function.Consumer;

import static java.util.Collections.emptySet;
import static org.opensearch.common.UUIDs.randomBase64UUID;

/**
* The primary class to run an extension.
* <p>
Expand Down Expand Up @@ -109,15 +88,14 @@ public class ExtensionsRunner {
* This field is initialized by a call from {@link ExtensionsInitRequestHandler}.
*/
public final Settings settings;
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};
private ExtensionNamedWriteableRegistry namedWriteableRegistryApi = new ExtensionNamedWriteableRegistry();
private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler();
private OpensearchRequestHandler opensearchRequestHandler = new OpensearchRequestHandler();
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler();
private NettyTransport nettyTransport = new NettyTransport();

/*
* TODO: expose an interface for extension to register actions
Expand Down Expand Up @@ -170,7 +148,7 @@ private ExtensionsRunner(Extension extension) throws IOException {
// save custom settings
this.customSettings = extension.getSettings();
// initialize the transport service
this.initializeExtensionTransportService(this.getSettings());
nettyTransport.initializeExtensionTransportService(this.getSettings(), this);
// start listening on configured port and wait for connection from OpenSearch
this.startActionListener(0);
}
Expand Down Expand Up @@ -213,83 +191,6 @@ DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

/**
* Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
*
* @param settings The transport settings to configure.
* @param threadPool A thread pool to use.
* @return The configured Netty4Transport object.
*/
public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) {
NetworkService networkService = new NetworkService(Collections.emptyList());
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
SearchModule searchModule = new SearchModule(settings, Collections.emptyList());

List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
indicesModule.getNamedWriteables().stream(),
searchModule.getNamedWriteables().stream(),
null,
ClusterModule.getNamedWriteables().stream()
).flatMap(Function.identity()).collect(Collectors.toList());

final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);

final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();

Netty4Transport transport = new Netty4Transport(
settings,
Version.CURRENT,
threadPool,
networkService,
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
new SharedGroupFactory(settings)
);

return transport;
}

/**
* Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
*
* @param settings The transport settings to configure.
* @return The initialized TransportService object.
*/
public TransportService initializeExtensionTransportService(Settings settings) {

ThreadPool threadPool = new ThreadPool(settings);

Netty4Transport transport = getNetty4Transport(settings, threadPool);

final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);

// Stop any existing transport service
if (extensionTransportService != null) {
extensionTransportService.stop();
}

// create transport service
extensionTransportService = new TransportService(
settings,
transport,
threadPool,
NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(
Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(),
boundAddress.publishAddress(),
randomBase64UUID()
),
null,
emptySet(),
connectionManager
);
startTransportService(extensionTransportService);
return extensionTransportService;
}

/**
* Starts a TransportService.
*
Expand Down Expand Up @@ -591,7 +492,7 @@ public static void main(String[] args) throws IOException {
ExtensionsRunner extensionsRunner = new ExtensionsRunner();

// initialize the transport service
extensionsRunner.initializeExtensionTransportService(extensionsRunner.getSettings());
extensionsRunner.nettyTransport.initializeExtensionTransportService(extensionsRunner.getSettings(), extensionsRunner);
// start listening on configured port and wait for connection from OpenSearch
extensionsRunner.startActionListener(0);
}
Expand Down
120 changes: 120 additions & 0 deletions src/main/java/org/opensearch/sdk/NettyTransport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.netty4.Netty4Transport;

import static java.util.Collections.emptySet;
import static org.opensearch.common.UUIDs.randomBase64UUID;

/**
* This class initializes a Netty4Transport object and control communication between the extension and OpenSearch.
*/

public class NettyTransport {
private static final String NODE_NAME_SETTING = "node.name";
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};

/**
* Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
*
* @param settings The transport settings to configure.
* @param threadPool A thread pool to use.
* @return The configured Netty4Transport object.
*/
public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) {
NetworkService networkService = new NetworkService(Collections.emptyList());
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
SearchModule searchModule = new SearchModule(settings, Collections.emptyList());

List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
indicesModule.getNamedWriteables().stream(),
searchModule.getNamedWriteables().stream(),
ClusterModule.getNamedWriteables().stream()
).flatMap(Function.identity()).collect(Collectors.toList());

final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);

final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();

Netty4Transport transport = new Netty4Transport(
settings,
Version.CURRENT,
threadPool,
networkService,
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
new SharedGroupFactory(settings)
);

return transport;
}

/**
* Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
*
* @param settings The transport settings to configure.
* @param extensionsRunner method to call
* @return The initialized TransportService object.
*/
public TransportService initializeExtensionTransportService(Settings settings, ExtensionsRunner extensionsRunner) {

ThreadPool threadPool = new ThreadPool(settings);

Netty4Transport transport = getNetty4Transport(settings, threadPool);

// Stop any existing transport service
if (extensionsRunner.extensionTransportService != null) {
extensionsRunner.extensionTransportService.stop();
}

// create transport service
extensionsRunner.extensionTransportService = new TransportService(
settings,
transport,
threadPool,
NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(
Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(),
boundAddress.publishAddress(),
randomBase64UUID()
),
null,
emptySet()
);
extensionsRunner.startTransportService(extensionsRunner.extensionTransportService);
return extensionsRunner.extensionTransportService;
}

}
11 changes: 5 additions & 6 deletions src/test/java/org/opensearch/sdk/TestNetty4Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@

public class TestNetty4Transport extends OpenSearchTestCase {

private ExtensionsRunner extensionsRunner;
private ThreadPool threadPool;
private NettyTransport nettyTransport = new NettyTransport();

@BeforeEach
public void setUp() throws IOException {
this.extensionsRunner = new ExtensionsRunner();
this.threadPool = new TestThreadPool("test");
}

Expand All @@ -44,7 +43,7 @@ public void testNettyCanBindToMultiplePorts() throws IOException {
.put("transport.profiles.client1.port", 0)
.build();

Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);

try {
startNetty4Transport(transport);
Expand All @@ -67,7 +66,7 @@ public void testDefaultProfileInheritsFomStandardSettings() throws IOException {
.put("transport.profiles.client1.port", 0)
.build();

Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);

try {
startNetty4Transport(transport);
Expand All @@ -94,7 +93,7 @@ public void testThatProfileWithoutPortFails() throws IOException {
// attempt creating netty object with invalid settings
IllegalStateException ex = expectThrows(
IllegalStateException.class,
() -> extensionsRunner.getNetty4Transport(settings, threadPool)
() -> nettyTransport.getNetty4Transport(settings, threadPool)
);
assertEquals("profile [no_port] has no port configured", ex.getMessage());
} finally {
Expand All @@ -112,7 +111,7 @@ public void testDefaultProfilePortOverridesGeneralConfiguration() throws IOExcep
.put("transport.profiles.default.port", 0) // default port configuration will overwrite attempt
.build();

Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);

try {
startNetty4Transport(transport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class TransportCommunicationIT extends OpenSearchIntegTestCase {
private final int port = 7777;
private final String host = "127.0.0.1";
private volatile String clientResult;
private NettyTransport nettyTransport = new NettyTransport();

@Override
@BeforeEach
Expand All @@ -51,9 +52,8 @@ public void setUp() {
@Test
public void testSocketSetup() throws IOException {

ExtensionsRunner extensionsRunner = new ExtensionsRunner();
ThreadPool threadPool = new TestThreadPool("test");
Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);

// start netty transport and ensure that address info is exposed
try {
Expand Down Expand Up @@ -147,7 +147,7 @@ private void startTransportandClient(Settings settings, Thread client) throws IO
// retrieve transport service
ExtensionsRunner extensionsRunner = new ExtensionsRunner();
// start transport service
TransportService transportService = extensionsRunner.initializeExtensionTransportService(settings);
TransportService transportService = nettyTransport.initializeExtensionTransportService(settings, extensionsRunner);

assertEquals(Lifecycle.State.STARTED, transportService.lifecycleState());

Expand Down

0 comments on commit 0f74e30

Please sign in to comment.