Skip to content

Commit

Permalink
ingest: support default pipelines + bulk upserts (elastic#36618)
Browse files Browse the repository at this point in the history
This commit adds support to enable bulk upserts to use an index's
default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert
are all supported.

However, bulk script_as_upsert has slightly surprising behavior since
the pipeline is executed _before_ the script is evaluated. This means
that the pipeline only has access the data found in the upsert field
of the script_as_upsert. The non-bulk script_as_upsert (existing behavior)
runs the pipeline _after_ the script is executed. This commit
does _not_ attempt to consolidate the bulk and non-bulk behavior for
script_as_upsert.

This commit also adds additional testing for the non-bulk behavior,
which remains unchanged with this commit.

fixes elastic#36219
  • Loading branch information
jakelandis committed Dec 17, 2018
1 parent 6f9588c commit 38d9f3f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ teardown:
]
}
- match: { acknowledged: true }

# default pipeline via index
- do:
indices.create:
index: test
Expand All @@ -48,7 +48,7 @@ teardown:
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }

# default pipeline via alias
- do:
index:
index: test_alias
Expand All @@ -63,28 +63,117 @@ teardown:
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via upsert
- do:
update:
index: test
type: test
id: 3
body:
script:
source: "ctx._source.ran_script = true"
lang: "painless"
upsert: { "bytes_source_field":"1kb" }
- do:
get:
index: test
type: test
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via scripted upsert
- do:
update:
index: test
type: test
id: 4
body:
script:
source: "ctx._source.bytes_source_field = '1kb'"
lang: "painless"
upsert : {}
scripted_upsert: true
- do:
get:
index: test
type: test
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via doc_as_upsert
- do:
update:
index: test
type: test
id: 5
body:
doc: { "bytes_source_field":"1kb" }
doc_as_upsert: true
- do:
get:
index: test
type: test
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# default pipeline via bulk upsert
# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert, not the script
- do:
bulk:
refresh: true
body: |
{"update":{"_id":"6","_index":"test","_type":"test"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7","_index":"test","_type":"test"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8","_index":"test","_type":"test"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
- do:
mget:
body:
docs:
- { _index: "test", _type: "_doc", _id: "6" }
- { _index: "test", _type: "_doc", _id: "7" }
- { _index: "test", _type: "_doc", _id: "8" }
- match: { docs.0._index: "test" }
- match: { docs.0._id: "6" }
- match: { docs.0._source.bytes_source_field: "1kb" }
- match: { docs.0._source.bytes_target_field: 1024 }
- is_false: docs.0._source.ran_script
- match: { docs.1._index: "test" }
- match: { docs.1._id: "7" }
- match: { docs.1._source.bytes_source_field: "2kb" }
- match: { docs.1._source.bytes_target_field: 2048 }
- match: { docs.2._index: "test" }
- match: { docs.2._id: "8" }
- match: { docs.2._source.bytes_source_field: "3kb" }
- match: { docs.2._source.bytes_target_field: 3072 }
- match: { docs.2._source.ran_script: true }

# explicit no default pipeline
- do:
index:
index: test
type: test
id: 3
id: 9
pipeline: "_none"
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 3
id: 9
- match: { _source.bytes_source_field: "1kb" }
- is_false: _source.bytes_target_field

# bad request
- do:
catch: bad_request
index:
index: test
type: test
id: 4
id: 10
pipeline: ""
body: {bytes_source_field: "1kb"}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,24 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe
clusterService.addStateApplier(this.ingestForwarder);
}

/**
* Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions. Upserts are
* modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.elasticsearch.action.delete.DeleteRequest}'s
*
* @param docWriteRequest The request to find the {@link IndexRequest}
* @return the found {@link IndexRequest} or {@code null} if one can not be found.
*/
public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) {
IndexRequest indexRequest = null;
if (docWriteRequest instanceof IndexRequest) {
indexRequest = (IndexRequest) docWriteRequest;
} else if (docWriteRequest instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) docWriteRequest;
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
}
return indexRequest;
}

@Override
protected final void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
throw new UnsupportedOperationException("task parameter is required for this operation");
Expand Down Expand Up @@ -212,12 +230,12 @@ private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, fina
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if(indexRequest != null){
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
if (indexMetaData == null && indexRequest.index() != null) {
//check the alias
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
Expand Down Expand Up @@ -633,7 +651,7 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
}

void markCurrentItemAsDropped() {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
failedSlots.set(currentSlot);
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
Expand All @@ -646,7 +664,7 @@ void markCurrentItemAsDropped() {
}

void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
// 2) Add a bulk item failure for this request
Expand Down
10 changes: 2 additions & 8 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -388,13 +388,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = null;
if (actionRequest instanceof IndexRequest) {
indexRequest = (IndexRequest) actionRequest;
} else if (actionRequest instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
}
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -408,6 +409,57 @@ public void testUseDefaultPipelineWithAlias() throws Exception {
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
}

public void testUseDefaultPipelineWithBulkUpsert() throws Exception {
Exception exception = new Exception("fake exception");
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap());
IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap());
IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap());
UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1"));
UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true);
// this test only covers the mechanics that scripted bulk upserts will execute a default pipeline. However, in practice scripted
// bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline.
UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").upsert(indexRequest3).script(mockScript("1"))
.scriptedUpsert(true);
bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert);

AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
assertNull(indexRequest1.getPipeline());
assertNull(indexRequest2.getPipeline());
assertNull(indexRequest3.getPipeline());
action.execute(null, bulkRequest, ActionListener.wrap(
response -> {
BulkItemResponse itemResponse = response.iterator().next();
assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception"));
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));

// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

// now check success of the transport bulk action
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}

public void testCreateIndexBeforeRunPipeline() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
Expand Down Expand Up @@ -445,6 +497,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
indexRequest.source(Collections.emptyMap());
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
assertNull(indexRequest.getPipeline());
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
Expand All @@ -459,6 +512,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -132,4 +134,23 @@ public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exce
throw new AssertionError(exception);
}));
}

public void testGetIndexWriteRequest() throws Exception {
IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(Collections.emptyMap());
UpdateRequest upsertRequest = new UpdateRequest("index", "type", "id1").upsert(indexRequest).script(mockScript("1"));
UpdateRequest docAsUpsertRequest = new UpdateRequest("index", "type", "id2").doc(indexRequest).docAsUpsert(true);
UpdateRequest scriptedUpsert = new UpdateRequest("index", "type", "id2").upsert(indexRequest).script(mockScript("1"))
.scriptedUpsert(true);

assertEquals(TransportBulkAction.getIndexWriteRequest(indexRequest), indexRequest);
assertEquals(TransportBulkAction.getIndexWriteRequest(upsertRequest), indexRequest);
assertEquals(TransportBulkAction.getIndexWriteRequest(docAsUpsertRequest), indexRequest);
assertEquals(TransportBulkAction.getIndexWriteRequest(scriptedUpsert), indexRequest);

DeleteRequest deleteRequest = new DeleteRequest("index", "id");
assertNull(TransportBulkAction.getIndexWriteRequest(deleteRequest));

UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1");
assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest));
}
}

0 comments on commit 38d9f3f

Please sign in to comment.