Skip to content

Commit

Permalink
Migrate OpenSearch plugin to OpenSearch java client
Browse files Browse the repository at this point in the history
Add tests for both OpenSearch 1.x and 2.x compatibility
  • Loading branch information
wendigo committed Jan 10, 2024
1 parent 36740e6 commit 1744cea
Show file tree
Hide file tree
Showing 46 changed files with 439 additions and 294 deletions.
96 changes: 36 additions & 60 deletions plugin/trino-opensearch/pom.xml
Expand Up @@ -15,20 +15,13 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.elasticsearch.version>6.8.23</dep.elasticsearch.version>
<dep.opensearch.version>2.11.1</dep.opensearch.version>
</properties>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<exclusions>
<!-- org.elasticsearch:elasticsearch:6.8.23 brings in version 4.5.13 (vs 4.5.2) -->
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -114,9 +107,8 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.2</version>
<version>4.1.5</version>
<exclusions>
<!-- Brings in duplicate classes already in org.slf4j:jcl-over-slf4j (from io.airlift:log-manager) -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand All @@ -129,7 +121,6 @@
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
<exclusions>
<!-- Brings in duplicate classes already in org.slf4j:jcl-over-slf4j (from io.airlift:log-manager) -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand All @@ -150,51 +141,46 @@
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${dep.elasticsearch.version}</version>
<groupId>org.opensearch</groupId>
<artifactId>opensearch</artifactId>
<version>${dep.opensearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<!-- trino-main brings in 8.4.1 (vs 7.7.3) -->
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
</exclusion>
<!-- Brings in duplicate classes already in net.java.dev.jna:jna -->
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>jna</artifactId>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-core</artifactId>
<version>${dep.elasticsearch.version}</version>
<groupId>org.opensearch</groupId>
<artifactId>opensearch-common</artifactId>
<version>${dep.opensearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-x-content</artifactId>
<version>${dep.elasticsearch.version}</version>
<groupId>org.opensearch</groupId>
<artifactId>opensearch-core</artifactId>
<version>${dep.opensearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${dep.elasticsearch.version}</version>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-client</artifactId>
<version>${dep.opensearch.version}</version>
<exclusions>
<!-- Brings in duplicate classes already in org.slf4j:jcl-over-slf4j (from io.airlift:log-manager) -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand All @@ -203,13 +189,13 @@
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${dep.elasticsearch.version}</version>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-high-level-client</artifactId>
<version>${dep.opensearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -267,6 +253,13 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.opensearch</groupId>
<artifactId>opensearch-x-content</artifactId>
<version>${dep.opensearch.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
Expand Down Expand Up @@ -389,12 +382,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>nginx</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand All @@ -419,17 +406,6 @@
</rules>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredNonTestScopedDependencies>
<!-- TODO: https://issues.apache.org/jira/browse/MDEP-791 -->
<!-- elasticsearch-x-content is required in compile scope by org.elasticsearch:elasticsearch -->
<ignoredNonTestScopedDependency>org.elasticsearch:elasticsearch-x-content</ignoredNonTestScopedDependency>
</ignoredNonTestScopedDependencies>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
Expand Down
Expand Up @@ -45,8 +45,8 @@ public class NodesSystemTable
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("trino_node_id", createUnboundedVarcharType()))
.add(new ColumnMetadata("trino_node_address", createUnboundedVarcharType()))
.add(new ColumnMetadata("elasticsearch_node_id", createUnboundedVarcharType()))
.add(new ColumnMetadata("elasticsearch_node_address", createUnboundedVarcharType()))
.add(new ColumnMetadata("opensearch_node_id", createUnboundedVarcharType()))
.add(new ColumnMetadata("opensearch_node_address", createUnboundedVarcharType()))
.build());

private final OpenSearchClient client;
Expand Down Expand Up @@ -80,26 +80,26 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transaction, Co

BlockBuilder nodeId = VARCHAR.createBlockBuilder(null, nodes.size());
BlockBuilder trinoAddress = VARCHAR.createBlockBuilder(null, nodes.size());
BlockBuilder elasticsearchNodeId = VARCHAR.createBlockBuilder(null, nodes.size());
BlockBuilder elasticsearchAddress = VARCHAR.createBlockBuilder(null, nodes.size());
BlockBuilder opensearchNodeId = VARCHAR.createBlockBuilder(null, nodes.size());
BlockBuilder opensearchAddress = VARCHAR.createBlockBuilder(null, nodes.size());

for (OpenSearchNode node : nodes) {
VARCHAR.writeString(nodeId, currentNode.getNodeIdentifier());
VARCHAR.writeString(trinoAddress, currentNode.getHostAndPort().toString());
VARCHAR.writeString(elasticsearchNodeId, node.getId());
VARCHAR.writeString(opensearchNodeId, node.getId());

if (node.getAddress().isPresent()) {
VARCHAR.writeString(elasticsearchAddress, node.getAddress().get());
VARCHAR.writeString(opensearchAddress, node.getAddress().get());
}
else {
elasticsearchAddress.appendNull();
opensearchAddress.appendNull();
}
}

return new FixedPageSource(ImmutableList.of(new Page(
nodeId.build(),
trinoAddress.build(),
elasticsearchNodeId.build(),
elasticsearchAddress.build())));
opensearchNodeId.build(),
opensearchAddress.build())));
}
}
Expand Up @@ -23,11 +23,11 @@
public enum OpenSearchErrorCode
implements ErrorCodeSupplier
{
ELASTICSEARCH_CONNECTION_ERROR(0, EXTERNAL),
ELASTICSEARCH_INVALID_RESPONSE(1, EXTERNAL),
ELASTICSEARCH_SSL_INITIALIZATION_FAILURE(2, EXTERNAL),
ELASTICSEARCH_QUERY_FAILURE(3, USER_ERROR),
ELASTICSEARCH_INVALID_METADATA(4, USER_ERROR);
OPENSEARCH_CONNECTION_ERROR(0, EXTERNAL),
OPENSEARCH_INVALID_RESPONSE(1, EXTERNAL),
OPENSEARCH_SSL_INITIALIZATION_FAILURE(2, EXTERNAL),
OPENSEARCH_QUERY_FAILURE(3, USER_ERROR),
OPENSEARCH_INVALID_METADATA(4, USER_ERROR);

private final ErrorCode errorCode;

Expand Down
Expand Up @@ -56,22 +56,22 @@ public ConnectorPageSource createPageSource(
requireNonNull(split, "split is null");
requireNonNull(table, "table is null");

OpenSearchTableHandle elasticsearchTable = (OpenSearchTableHandle) table;
OpenSearchSplit elasticsearchSplit = (OpenSearchSplit) split;
OpenSearchTableHandle opensearchTable = (OpenSearchTableHandle) table;
OpenSearchSplit opensearchSplit = (OpenSearchSplit) split;

if (elasticsearchTable.getType().equals(QUERY)) {
return new PassthroughQueryPageSource(client, elasticsearchTable);
if (opensearchTable.getType().equals(QUERY)) {
return new PassthroughQueryPageSource(client, opensearchTable);
}

if (columns.isEmpty()) {
return new CountQueryPageSource(client, elasticsearchTable, elasticsearchSplit);
return new CountQueryPageSource(client, opensearchTable, opensearchSplit);
}

return new ScanQueryPageSource(
client,
typeManager,
elasticsearchTable,
elasticsearchSplit,
opensearchTable,
opensearchSplit,
columns.stream()
.map(OpenSearchColumnHandle.class::cast)
.collect(toImmutableList()));
Expand Down
Expand Up @@ -19,14 +19,14 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.RegexpQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ExistsQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.RegexpQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;

import java.time.Instant;
import java.time.ZoneOffset;
Expand Down
Expand Up @@ -26,9 +26,9 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

import java.util.Arrays;
import java.util.HashMap;
Expand Down
Expand Up @@ -45,7 +45,7 @@
class AwsRequestSigner
implements HttpRequestInterceptor
{
private static final String SERVICE_NAME = "es";
private static final String SERVICE_NAME = "aoss";
private final AWSCredentialsProvider credentialsProvider;
private final AWS4Signer signer;

Expand Down
Expand Up @@ -26,13 +26,13 @@
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.rest.RestStatus;
import org.opensearch.client.Node;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.core.rest.RestStatus;

import java.io.IOException;
import java.util.Map;
Expand Down
Expand Up @@ -23,17 +23,17 @@
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.trino.plugin.elasticsearch.OpenSearchConfig;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.rest.RestStatus;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -104,8 +104,8 @@ public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest)

private static boolean isBackpressure(Throwable throwable)
{
return (throwable instanceof ElasticsearchStatusException) &&
(((ElasticsearchStatusException) throwable).status() == RestStatus.TOO_MANY_REQUESTS);
return (throwable instanceof OpenSearchStatusException) &&
(((OpenSearchStatusException) throwable).status() == RestStatus.TOO_MANY_REQUESTS);
}

private void onComplete(ExecutionCompletedEvent<ActionResponse> executionCompletedEvent)
Expand Down

0 comments on commit 1744cea

Please sign in to comment.