Skip to content

Commit

Permalink
Elasticsearch client implementation with pit and no context search (#…
Browse files Browse the repository at this point in the history
…2910)

Create Elasticsearch client, implement search and pit apis for ElasticsearchAccessor

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Jun 21, 2023
1 parent 1fa7946 commit d4ad1b0
Show file tree
Hide file tree
Showing 13 changed files with 1,138 additions and 377 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.opensearch.client:opensearch-java:2.5.0'
implementation 'org.opensearch.client:opensearch-rest-client:2.7.0'
implementation 'co.elastic.clients:elasticsearch-java:7.17.0'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation('org.apache.maven:maven-artifact:3.0.3') {
exclude group: 'org.codehaus.plexus'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchClientFactory;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

Expand Down Expand Up @@ -41,7 +42,9 @@ public void start(final Buffer<Record<Event>> buffer) {
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer<Record<Event>> buffer) {
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, awsCredentialsSupplier);

final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier);
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory);

final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
}
});
} catch (final Exception e) {
LOG.error("Received an exception while searching with PIT for index '{}'", indexName);
LOG.error("Received an exception while searching with no search context for index '{}'", indexName);
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.cat.IndicesResponse;
import org.opensearch.client.opensearch.cat.indices.IndicesRecord;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
Expand All @@ -32,7 +33,9 @@ public class OpenSearchIndexPartitionCreationSupplier implements Function<Map<St

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final IndexParametersConfiguration indexParametersConfiguration;
private final OpenSearchClient openSearchClient;
private OpenSearchClient openSearchClient;
private ElasticsearchClient elasticsearchClient;


public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final ClusterClientFactory clusterClientFactory) {
Expand All @@ -43,6 +46,8 @@ public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfigurat

if (client instanceof OpenSearchClient) {
this.openSearchClient = (OpenSearchClient) client;
} else if (client instanceof ElasticsearchClient) {
this.elasticsearchClient = (ElasticsearchClient) client;
} else {
throw new IllegalArgumentException(String.format("ClusterClientFactory provided an invalid client object to the index partition creation supplier. " +
"The client must be of type OpenSearchClient. The client passed is of class %s", client.getClass()));
Expand All @@ -55,6 +60,8 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)

if (Objects.nonNull(openSearchClient)) {
return applyForOpenSearchClient(globalStateMap);
} else if (Objects.nonNull(elasticsearchClient)) {
return applyForElasticSearchClient(globalStateMap);
}

return Collections.emptyList();
Expand All @@ -70,13 +77,29 @@ private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Obj
}

return indicesResponse.valueBody().stream()
.filter(this::shouldIndexBeProcessed)
.filter(osIndicesRecord -> shouldIndexBeProcessed(osIndicesRecord.index()))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
.collect(Collectors.toList());
}

private List<PartitionIdentifier> applyForElasticSearchClient(final Map<String, Object> globalStateMap) {
co.elastic.clients.elasticsearch.cat.IndicesResponse indicesResponse;
try {
indicesResponse = elasticsearchClient.cat().indices();
} catch (IOException | ElasticsearchException e) {
LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e);
return Collections.emptyList();
}

return indicesResponse.valueBody().stream()
.filter(esIndicesRecord -> shouldIndexBeProcessed(esIndicesRecord.index()))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
.collect(Collectors.toList());
}

private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) {
if (Objects.isNull(indicesRecord.index())) {
private boolean shouldIndexBeProcessed(final String indexName) {

if (Objects.isNull(indexName)) {
return false;
}

Expand All @@ -87,16 +110,16 @@ private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) {
final List<OpenSearchIndex> includedIndices = indexParametersConfiguration.getIncludedIndices();
final List<OpenSearchIndex> excludedIndices = indexParametersConfiguration.getExcludedIndices();

final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indicesRecord);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indicesRecord);
final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indexName);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indexName);


return matchesIncludedPattern && !matchesExcludePattern;
}

private boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final IndicesRecord indicesRecord) {
private boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final String indexName) {
for (final OpenSearchIndex index : indices) {
final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());
final Matcher matcher = index.getIndexNamePattern().matcher(indexName);

if (matcher.matches()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ private OpenSearchIndexProgressState initializeProgressState() {
}

private List<String> getSearchAfter(final OpenSearchIndexProgressState openSearchIndexProgressState, final SearchWithSearchAfterResults searchWithSearchAfterResults) {
if (Objects.isNull(searchWithSearchAfterResults) && Objects.isNull(openSearchIndexProgressState.getSearchAfter())) {
return null;
}

if (Objects.isNull(searchWithSearchAfterResults) && Objects.nonNull(openSearchIndexProgressState.getSearchAfter())) {
return openSearchIndexProgressState.getSearchAfter();
if (Objects.isNull(searchWithSearchAfterResults)) {
if (Objects.isNull(openSearchIndexProgressState.getSearchAfter())) {
return null;
} else {
return openSearchIndexProgressState.getSearchAfter();
}
}

return searchWithSearchAfterResults.getNextSearchAfter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.ScoreSort;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.ClosePointInTimeRequest;
import co.elastic.clients.elasticsearch.core.ClosePointInTimeResponse;
import co.elastic.clients.elasticsearch.core.OpenPointInTimeRequest;
import co.elastic.clients.elasticsearch.core.OpenPointInTimeResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.PointInTimeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest;
Expand All @@ -13,32 +33,100 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory {

private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";

private final ElasticsearchClient elasticsearchClient;
private final SearchContextType searchContextType;

public ElasticsearchAccessor(final ElasticsearchClient elasticsearchClient, final SearchContextType searchContextType) {
this.elasticsearchClient = elasticsearchClient;
this.searchContextType = searchContextType;
}

@Override
public SearchContextType getSearchContextType() {
// todo: implement
return null;
return searchContextType;
}

@Override
public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest createPointInTimeRequest) {
//todo: implement
return null;

OpenPointInTimeResponse openPointInTimeResponse;
try {
openPointInTimeResponse = elasticsearchClient.openPointInTime(OpenPointInTimeRequest.of(request -> request
.keepAlive(Time.of(time -> time.time(createPointInTimeRequest.getKeepAlive())))
.index(createPointInTimeRequest.getIndex())));
} catch (final ElasticsearchException e) {
if (isDueToPitLimitExceeded(e)) {
throw new SearchContextLimitException(String.format("There was an error creating a new point in time for index '%s': %s", createPointInTimeRequest.getIndex(),
e.error().causedBy().reason()));
}
LOG.error("There was an error creating a point in time for Elasticsearch: ", e);
throw e;
} catch (final IOException e) {
LOG.error("There was an error creating a point in time for Elasticsearch: ", e);
throw new RuntimeException(e);
}

return CreatePointInTimeResponse.builder()
.withPitId(openPointInTimeResponse.id())
.withCreationTime(Instant.now().toEpochMilli())
.build();
}

@Override
public SearchWithSearchAfterResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
//todo: implement
return null;
public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest searchPointInTimeRequest) {
final SearchRequest searchRequest = SearchRequest.of(builder -> { builder
.pit(PointInTimeReference.of(pit -> pit
.id(searchPointInTimeRequest.getPitId())
.keepAlive(Time.of(time -> time.time(searchPointInTimeRequest.getKeepAlive())))))
.size(searchPointInTimeRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
builder.searchAfter(searchPointInTimeRequest.getSearchAfter());
}
return builder;
});


return searchWithSearchAfter(searchRequest);
}

@Override
public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
//todo: implement
try {
final ClosePointInTimeResponse closePointInTimeResponse = elasticsearchClient.closePointInTime(ClosePointInTimeRequest.of(request -> request
.id(deletePointInTimeRequest.getPitId())));
if (closePointInTimeResponse.succeeded()) {
LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId());
} else {
LOG.warn("Point in time id {} was not deleted successfully. It will expire from keep-alive", deletePointInTimeRequest.getPitId());
}
} catch (final IOException | RuntimeException e) {
LOG.error("There was an error deleting the point in time with id {} for Elasticsearch. It will expire from keep-alive: ", deletePointInTimeRequest.getPitId(), e);
}
}

@Override
Expand All @@ -59,12 +147,56 @@ public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
}

@Override
public SearchWithSearchAfterResults searchWithoutSearchContext(NoSearchContextSearchRequest noSearchContextSearchRequest) {
return null;
public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchContextSearchRequest noSearchContextSearchRequest) {
final SearchRequest searchRequest = SearchRequest.of(builder -> {
builder
.index(noSearchContextSearchRequest.getIndex())
.size(noSearchContextSearchRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
builder.searchAfter(noSearchContextSearchRequest.getSearchAfter());
}

return builder;
});

return searchWithSearchAfter(searchRequest);
}

@Override
public Object getClient() {
return null;
return elasticsearchClient;
}

private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest searchRequest) {

try {
final SearchResponse<ObjectNode> searchResponse = elasticsearchClient.search(searchRequest, ObjectNode.class);

final List<Event> documents = searchResponse.hits().hits().stream()
.map(hit -> JacksonEvent.builder()
.withData(hit.source())
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());

final List<String> nextSearchAfter = Objects.nonNull(searchResponse.hits().hits()) && !searchResponse.hits().hits().isEmpty() ?
searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort() :
null;

return SearchWithSearchAfterResults.builder()
.withDocuments(documents)
.withNextSearchAfter(nextSearchAfter)
.build();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private boolean isDueToPitLimitExceeded(final ElasticsearchException e) {
return Objects.nonNull(e.error()) && Objects.nonNull(e.error().causedBy()) && Objects.nonNull(e.error().causedBy().type())
&& PIT_RESOURCE_LIMIT_ERROR_TYPE.equals(e.error().causedBy().type());
}
}
Loading

0 comments on commit d4ad1b0

Please sign in to comment.