Elasticsearch client implementation with pit and no context search #2910

Merged
merged 3 commits on Jun 21, 2023
Changes from 2 commits
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
@@ -14,6 +14,7 @@ dependencies {
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.opensearch.client:opensearch-java:2.5.0'
implementation 'org.opensearch.client:opensearch-rest-client:2.7.0'
implementation 'co.elastic.clients:elasticsearch-java:7.17.0'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation('org.apache.maven:maven-artifact:3.0.3') {
exclude group: 'org.codehaus.plexus'
@@ -117,7 +117,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
}
});
} catch (final Exception e) {
LOG.error("Received an exception while searching with PIT for index '{}'", indexName);
LOG.error("Received an exception while searching with no search context for index '{}'", indexName);
throw new RuntimeException(e);
}

@@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.cat.IndicesResponse;
@@ -32,7 +34,9 @@ public class OpenSearchIndexPartitionCreationSupplier implements Function<Map<St

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final IndexParametersConfiguration indexParametersConfiguration;
private final OpenSearchClient openSearchClient;
private OpenSearchClient openSearchClient;
private ElasticsearchClient elasticsearchClient;


public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final ClusterClientFactory clusterClientFactory) {
@@ -43,6 +47,8 @@ public OpenSearchIndexPartitionCreationSupplier(final OpenSearchSourceConfigurat

if (client instanceof OpenSearchClient) {
this.openSearchClient = (OpenSearchClient) client;
} else if (client instanceof ElasticsearchClient) {
this.elasticsearchClient = (ElasticsearchClient) client;
} else {
throw new IllegalArgumentException(String.format("ClusterClientFactory provided an invalid client object to the index partition creation supplier. " +
"The client must be of type OpenSearchClient. The client passed is of class %s", client.getClass()));
@@ -55,6 +61,8 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)

if (Objects.nonNull(openSearchClient)) {
return applyForOpenSearchClient(globalStateMap);
} else if (Objects.nonNull(elasticsearchClient)) {
return applyForElasticSearchClient(globalStateMap);
}

return Collections.emptyList();
@@ -70,13 +78,37 @@ private List<PartitionIdentifier> applyForOpenSearchClient(final Map<String, Obj
}

return indicesResponse.valueBody().stream()
.filter(this::shouldIndexBeProcessed)
.filter(osIndicesRecord -> shouldIndexBeProcessed(osIndicesRecord, null))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
.collect(Collectors.toList());
}

private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) {
if (Objects.isNull(indicesRecord.index())) {
private List<PartitionIdentifier> applyForElasticSearchClient(final Map<String, Object> globalStateMap) {
co.elastic.clients.elasticsearch.cat.IndicesResponse indicesResponse;
try {
indicesResponse = elasticsearchClient.cat().indices();
} catch (IOException | ElasticsearchException e) {
LOG.error("There was an exception when calling /_cat/indices to create new index partitions", e);
return Collections.emptyList();
}

return indicesResponse.valueBody().stream()
.filter(esIndicesRecord -> shouldIndexBeProcessed(null, esIndicesRecord))
.map(indexRecord -> PartitionIdentifier.builder().withPartitionKey(indexRecord.index()).build())
.collect(Collectors.toList());
}

private boolean shouldIndexBeProcessed(final IndicesRecord openSearchIndicesRecord, final co.elastic.clients.elasticsearch.cat.indices.IndicesRecord elasticSearchIndicesRecord) {
Collaborator comment (a sketch of this suggestion appears after this file's diff):
We can just take in the index name here to simplify the logic.

The calls above to this method would then be filter(record -> shouldIndexBeProcessed(record.index()))


String indexName = null;

if (Objects.nonNull(openSearchIndicesRecord)) {
indexName = openSearchIndicesRecord.index();
} else if (Objects.nonNull(elasticSearchIndicesRecord)) {
indexName = elasticSearchIndicesRecord.index();
}

if (Objects.isNull(indexName)) {
return false;
}

@@ -87,16 +119,16 @@ private boolean shouldIndexBeProcessed(final IndicesRecord indicesRecord) {
final List<OpenSearchIndex> includedIndices = indexParametersConfiguration.getIncludedIndices();
final List<OpenSearchIndex> excludedIndices = indexParametersConfiguration.getExcludedIndices();

final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indicesRecord);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indicesRecord);
final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indexName);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indexName);


return matchesIncludedPattern && !matchesExcludePattern;
}

private boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final IndicesRecord indicesRecord) {
private boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final String indexName) {
for (final OpenSearchIndex index : indices) {
final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());
final Matcher matcher = index.getIndexNamePattern().matcher(indexName);

if (matcher.matches()) {
return true;
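A minimal sketch of the reviewer's suggestion above, reusing only the fields and helpers visible in this diff; it is illustrative, not part of the merged change, and any logic hidden in the collapsed portion of the method is omitted.

    // Sketch: both call sites pass the index name directly, e.g.
    //     .filter(indicesRecord -> shouldIndexBeProcessed(indicesRecord.index()))
    // so the helper no longer needs two nullable, client-specific record parameters.
    private boolean shouldIndexBeProcessed(final String indexName) {
        if (Objects.isNull(indexName)) {
            return false;
        }

        final List<OpenSearchIndex> includedIndices = indexParametersConfiguration.getIncludedIndices();
        final List<OpenSearchIndex> excludedIndices = indexParametersConfiguration.getExcludedIndices();

        final boolean matchesIncludedPattern = includedIndices.isEmpty() || doesIndexMatchPattern(includedIndices, indexName);
        final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indexName);

        return matchesIncludedPattern && !matchesExcludePattern;
    }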
@@ -179,12 +179,12 @@ private OpenSearchIndexProgressState initializeProgressState() {
}

private List<String> getSearchAfter(final OpenSearchIndexProgressState openSearchIndexProgressState, final SearchWithSearchAfterResults searchWithSearchAfterResults) {
if (Objects.isNull(searchWithSearchAfterResults) && Objects.isNull(openSearchIndexProgressState.getSearchAfter())) {
return null;
}

if (Objects.isNull(searchWithSearchAfterResults) && Objects.nonNull(openSearchIndexProgressState.getSearchAfter())) {
return openSearchIndexProgressState.getSearchAfter();
if (Objects.isNull(searchWithSearchAfterResults)) {
if (Objects.isNull(openSearchIndexProgressState.getSearchAfter())) {
return null;
} else {
return openSearchIndexProgressState.getSearchAfter();
}
}

return searchWithSearchAfterResults.getNextSearchAfter();
@@ -4,6 +4,26 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.ScoreSort;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.ClosePointInTimeRequest;
import co.elastic.clients.elasticsearch.core.ClosePointInTimeResponse;
import co.elastic.clients.elasticsearch.core.OpenPointInTimeRequest;
import co.elastic.clients.elasticsearch.core.OpenPointInTimeResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.PointInTimeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest;
@@ -13,32 +33,100 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory {
Collaborator comment (a sketch of one possible consolidation appears after this file's diff):
I think there's room to consolidate common logic between this class and OpenSearchAccessor. We can handle that down the line though


private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";

private final ElasticsearchClient elasticsearchClient;
private final SearchContextType searchContextType;

public ElasticsearchAccessor(final ElasticsearchClient elasticsearchClient, final SearchContextType searchContextType) {
this.elasticsearchClient = elasticsearchClient;
this.searchContextType = searchContextType;
}

@Override
public SearchContextType getSearchContextType() {
// todo: implement
return null;
return searchContextType;
}

@Override
public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest createPointInTimeRequest) {
//todo: implement
return null;

OpenPointInTimeResponse openPointInTimeResponse;
try {
openPointInTimeResponse = elasticsearchClient.openPointInTime(OpenPointInTimeRequest.of(request -> request
.keepAlive(Time.of(time -> time.time(createPointInTimeRequest.getKeepAlive())))
.index(createPointInTimeRequest.getIndex())));
} catch (final ElasticsearchException e) {
if (isDueToPitLimitExceeded(e)) {
throw new SearchContextLimitException(String.format("There was an error creating a new point in time for index '%s': %s", createPointInTimeRequest.getIndex(),
e.error().causedBy().reason()));
}
LOG.error("There was an error creating a point in time for Elasticsearch: ", e);
throw e;
} catch (final IOException e) {
LOG.error("There was an error creating a point in time for Elasticsearch: ", e);
throw new RuntimeException(e);
}

return CreatePointInTimeResponse.builder()
.withPitId(openPointInTimeResponse.id())
.withCreationTime(Instant.now().toEpochMilli())
.build();
}

@Override
public SearchWithSearchAfterResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
//todo: implement
return null;
public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest searchPointInTimeRequest) {
final SearchRequest searchRequest = SearchRequest.of(builder -> { builder
.pit(PointInTimeReference.of(pit -> pit
.id(searchPointInTimeRequest.getPitId())
.keepAlive(Time.of(time -> time.time(searchPointInTimeRequest.getKeepAlive())))))
.size(searchPointInTimeRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
builder.searchAfter(searchPointInTimeRequest.getSearchAfter());
}
return builder;
});


return searchWithSearchAfter(searchRequest);
}

@Override
public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
//todo: implement
try {
final ClosePointInTimeResponse closePointInTimeResponse = elasticsearchClient.closePointInTime(ClosePointInTimeRequest.of(request -> request
.id(deletePointInTimeRequest.getPitId())));
if (closePointInTimeResponse.succeeded()) {
LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId());
} else {
LOG.warn("Point in time id {} was not deleted successfully. It will expire from keep-alive", deletePointInTimeRequest.getPitId());
}
} catch (final IOException | RuntimeException e) {
LOG.error("There was an error deleting the point in time with id {} for Elasticsearch. It will expire from keep-alive: ", deletePointInTimeRequest.getPitId(), e);
}
}

@Override
@@ -59,12 +147,56 @@ public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
}

@Override
public SearchWithSearchAfterResults searchWithoutSearchContext(NoSearchContextSearchRequest noSearchContextSearchRequest) {
return null;
public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchContextSearchRequest noSearchContextSearchRequest) {
final SearchRequest searchRequest = SearchRequest.of(builder -> {
builder
.index(noSearchContextSearchRequest.getIndex())
.size(noSearchContextSearchRequest.getPaginationSize())
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
builder.searchAfter(noSearchContextSearchRequest.getSearchAfter());
}

return builder;
});

return searchWithSearchAfter(searchRequest);
}

@Override
public Object getClient() {
return null;
return elasticsearchClient;
}

private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest searchRequest) {

try {
final SearchResponse<ObjectNode> searchResponse = elasticsearchClient.search(searchRequest, ObjectNode.class);

final List<Event> documents = searchResponse.hits().hits().stream()
.map(hit -> JacksonEvent.builder()
.withData(hit.source())
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());

final List<String> nextSearchAfter = Objects.nonNull(searchResponse.hits().hits()) && !searchResponse.hits().hits().isEmpty() ?
searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort() :
null;

return SearchWithSearchAfterResults.builder()
.withDocuments(documents)
.withNextSearchAfter(nextSearchAfter)
.build();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private boolean isDueToPitLimitExceeded(final ElasticsearchException e) {
return Objects.nonNull(e.error()) && Objects.nonNull(e.error().causedBy()) && Objects.nonNull(e.error().causedBy().type())
&& PIT_RESOURCE_LIMIT_ERROR_TYPE.equals(e.error().causedBy().type());
}
}
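One possible shape for the consolidation suggested in the review comment above, sketched as an assumption about a later refactor rather than anything in this PR; SearchHitAdapter and toSearchWithSearchAfterResults are hypothetical names. Both accessors already map hits to events and take the sort values of the last hit as the next search_after, so that shared piece could sit behind a small hit abstraction.

    // Hypothetical adapter over the client-specific hit types (co.elastic vs. org.opensearch).
    interface SearchHitAdapter {
        String id();
        String index();
        ObjectNode source();
        List<String> sortValues();
    }

    // Shared mapping logic mirroring searchWithSearchAfter in this class.
    static SearchWithSearchAfterResults toSearchWithSearchAfterResults(final List<? extends SearchHitAdapter> hits) {
        final List<Event> documents = hits.stream()
                .map(hit -> JacksonEvent.builder()
                        .withData(hit.source())
                        .withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
                        .withEventType(EventType.DOCUMENT.toString()).build())
                .collect(Collectors.toList());

        // Null when the page is empty, matching the per-client implementations.
        final List<String> nextSearchAfter = hits.isEmpty() ? null : hits.get(hits.size() - 1).sortValues();

        return SearchWithSearchAfterResults.builder()
                .withDocuments(documents)
                .withNextSearchAfter(nextSearchAfter)
                .build();
    }

Each accessor would wrap its own hit type in the adapter and delegate to this helper, keeping only the client-specific request construction and error handling.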