Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/master' into os-pretty-name
Browse files Browse the repository at this point in the history
* elastic/master: (25 commits)
  Fixes fast vector highlighter docs per issue 24318. (elastic#34190)
  [ML] Prevent notifications on deletion of a non existent job (elastic#35337)
  [CCR] Auto follow Coordinator fetch cluster state in system context (elastic#35120)
  Mute test for elastic#35361
  Preserve `date_histogram` format when aggregating on unmapped fields (elastic#35254)
  Test: Mute failing SSL test
  Allow unmapped fields in composite aggregations (elastic#35331)
  [RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener<Releasable>) (elastic#34902)
  HLRC: reindex API with wait_for_completion false (elastic#35202)
  Add docs on JNA temp directory not being noexec (elastic#35355)
  [CCR] Adjust list of dynamic index settings that should be replicated (elastic#35195)
  Replicate index settings to followers (elastic#35089)
  Rename RealmConfig.globalSettings() to settings() (elastic#35330)
  [TEST] Cleanup FileUserPasswdStoreTests (elastic#35329)
  Scripting: Add back lookup vars in score script (elastic#34833)
  watcher: Fix integration tests to ensure correct start/stop of Watcher (elastic#35271)
  Remove ALL shard check in CheckShrinkReadyStep (elastic#35346)
  Use soft-deleted docs to resolve strategy for engine operation (elastic#35230)
  [ILM] Check shard and relocation status in AllocationRoutedStep (elastic#35316)
  Ignore date ranges containing 'now' when pre-processing a percolator query (elastic#35160)
  ...
  • Loading branch information
jasontedor committed Nov 8, 2018
2 parents 9001de0 + d00b23c commit 12c1ecb
Show file tree
Hide file tree
Showing 108 changed files with 3,823 additions and 632 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,18 @@ static Request rankEval(RankEvalRequest rankEvalRequest) throws IOException {
}

static Request reindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, true);
}

static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, false);
}

private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withWaitForCompletion(waitForCompletion)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
Expand Down Expand Up @@ -897,11 +906,8 @@ Params withDetailed(boolean detailed) {
return this;
}

Params withWaitForCompletion(boolean waitForCompletion) {
if (waitForCompletion) {
return putParam("wait_for_completion", Boolean.TRUE.toString());
}
return this;
Params withWaitForCompletion(Boolean waitForCompletion) {
return putParam("wait_for_completion", waitForCompletion.toString());
}

Params withNodes(String[] nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -476,6 +477,20 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request
);
}

/**
* Submits a reindex task.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
* @param reindexRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final TaskSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
reindexRequest, RequestConverters::submitReindex, options, TaskSubmissionResponse::fromXContent, emptySet()
);
}

/**
* Asynchronously executes a reindex request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.tasks;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

public class TaskSubmissionResponse extends ActionResponse {

private static final ParseField TASK = new ParseField("task");

public static final ConstructingObjectParser<TaskSubmissionResponse, Void> PARSER = new ConstructingObjectParser<>(
"task_submission_response",
true, a -> new TaskSubmissionResponse((String) a[0]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK);
}

private final String task;

TaskSubmissionResponse(String task) {
this.task = task;
}

/**
* Get the task id
*
* @return the id of the reindex task.
*/
public String getTask() {
return task;
}

@Override
public int hashCode() {
return Objects.hash(task);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
TaskSubmissionResponse that = (TaskSubmissionResponse) other;
return Objects.equals(task, that.task);
}

public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -706,111 +704,6 @@ public void testBulk() throws IOException {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}

public void testReindex() throws Exception {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// test1: create one doc in dest
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test2: create 1 and update 1
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(2, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test reindex rethrottling
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);

// this following settings are supposed to halt reindexing after first document
reindexRequest.setSourceBatchSize(1);
reindexRequest.setRequestsPerSecond(0.00001f);
final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {

@Override
public void onResponse(BulkByScrollResponse response) {
reindexTaskFinished.countDown();
}

@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});

TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
assertEquals(Float.toString(requestsPerSecond),
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
reindexTaskFinished.await(2, TimeUnit.SECONDS);

// any rethrottling after the reindex is done performed with the same taskId should result in a failure
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertTrue(response.getTasks().isEmpty());
assertFalse(response.getNodeFailures().isEmpty());
assertEquals(1, response.getNodeFailures().size());
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
response.getNodeFailures().get(0).getCause().getMessage());
}
}

private TaskId findTaskToRethrottle(String actionName) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;
import java.util.function.BooleanSupplier;

public class ReindexIT extends ESRestHighLevelClientTestCase {

public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// reindex one document with id 1 from source to destination
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);

BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);

assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
}

public void testReindexTask() throws IOException, InterruptedException {
final String sourceIndex = "source123";
final String destinationIndex = "dest2";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);

TaskSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask());
awaitBusy(hasUpgradeCompleted);
}
}

private BooleanSupplier checkCompletionStatus(String taskId) {
return () -> {
try {
Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId));
return (boolean) entityAsMap(response).get("completed");
} catch (IOException e) {
fail(e.getMessage());
return false;
}
};
}
}
Loading

0 comments on commit 12c1ecb

Please sign in to comment.