Skip to content

Commit

Permalink
Merge branch 'master' into optimize-warning-head-de-duplication
Browse files Browse the repository at this point in the history
* master:
  Liberalize StreamOutput#writeStringList (elastic#37768)
  Add PersistentTasksClusterService::unassignPersistentTask method (elastic#37576)
  Tests: disable testRandomGeoCollectionQuery on tiny polygons (elastic#37579)
  Use ILM for Watcher history deletion (elastic#37443)
  Make sure PutMappingRequest accepts content types other than JSON. (elastic#37720)
  Retry ILM steps that fail due to SnapshotInProgressException (elastic#37624)
  Use disassociate in preference to deassociate (elastic#37704)
  Delete Redundant RoutingServiceTests (elastic#37750)
  Always return metadata version if metadata is requested (elastic#37674)
  • Loading branch information
jasontedor committed Jan 23, 2019
2 parents 90cdc65 + 169cb38 commit 94b4687
Show file tree
Hide file tree
Showing 112 changed files with 1,227 additions and 334 deletions.
2 changes: 1 addition & 1 deletion docs/reference/ilm/apis/get-lifecycle.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ PUT _ilm/policy/my_policy

[source,js]
--------------------------------------------------
GET _ilm/policy
GET _ilm/policy/my_policy
--------------------------------------------------
// CONSOLE
// TEST[continued]
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ilm/update-lifecycle-policy.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ PUT _ilm/policy/my_policy
//////////
[source,js]
--------------------------------------------------
GET _ilm/policy
GET _ilm/policy/my_policy
--------------------------------------------------
// CONSOLE
// TEST[continued]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private void buildResponse(final ClusterStateRequest request,

if (request.metaData()) {
if (request.indices().length > 0) {
mdBuilder.version(currentState.metaData().version());
String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request);
for (String filteredIndex : indices) {
IndexMetaData indexMetaData = currentState.metaData().index(filteredIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public static XContentBuilder buildFromSimplifiedDef(String type, Object... sour
* The mapping source definition.
*/
public PutMappingRequest source(XContentBuilder mappingBuilder) {
return source(Strings.toString(mappingBuilder), mappingBuilder.contentType());
return source(BytesReference.bytes(mappingBuilder), mappingBuilder.contentType());
}

/**
Expand All @@ -264,7 +264,7 @@ public PutMappingRequest source(Map<String, ?> mappingSource) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.map(mappingSource);
return source(Strings.toString(builder), XContentType.JSON);
return source(BytesReference.bytes(builder), builder.contentType());
} catch (IOException e) {
throw new ElasticsearchGenerationException("Failed to generate [" + mappingSource + "]", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public void readFrom(StreamInput in) throws IOException {
name = in.readString();

if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
indexPatterns = in.readList(StreamInput::readString);
indexPatterns = in.readStringList();
} else {
indexPatterns = Collections.singletonList(in.readString());
}
Expand Down Expand Up @@ -495,7 +495,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(cause);
out.writeString(name);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeStringList(indexPatterns);
out.writeStringCollection(indexPatterns);
} else {
out.writeString(indexPatterns.size() > 0 ? indexPatterns.get(0) : "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
.blocks(currentState.blocks())
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
tmpState = PersistentTasksCustomMetaData.deassociateDeadNodes(tmpState);
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
tmpState = PersistentTasksCustomMetaData.disassociateDeadNodes(tmpState);
return ClusterState.builder(allocationService.disassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public ClusterTasksResult<Task> execute(final ClusterState currentState, final L

protected ClusterTasksResult<Task> getTaskClusterTasksResult(ClusterState currentState, List<Task> tasks,
ClusterState remainingNodesClusterState) {
ClusterState ptasksDeassociatedState = PersistentTasksCustomMetaData.deassociateDeadNodes(remainingNodesClusterState);
ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
return resultBuilder.build(allocationService.deassociateDeadNodes(ptasksDeassociatedState, true, describeTasks(tasks)));
return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public static IndexTemplateMetaData readFrom(StreamInput in) throws IOException
Builder builder = new Builder(in.readString());
builder.order(in.readInt());
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
builder.patterns(in.readList(StreamInput::readString));
builder.patterns(in.readStringList());
} else {
builder.patterns(Collections.singletonList(in.readString()));
}
Expand Down Expand Up @@ -224,7 +224,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeInt(order);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeStringList(patterns);
out.writeStringCollection(patterns);
} else {
out.writeString(patterns.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* the delay marker). These are shards that have become unassigned due to a node leaving
* and which were assigned the delay marker based on the index delay setting
* {@link UnassignedInfo#INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING}
* (see {@link AllocationService#deassociateDeadNodes(RoutingAllocation)}).
* (see {@link AllocationService#disassociateDeadNodes(RoutingAllocation)}.
* This class is responsible for choosing the next (closest) delay expiration of a
* delayed shard to schedule a reroute to remove the delay marker.
* The actual removal of the delay marker happens in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public ShardRouting activePrimary(ShardId shardId) {
*
*/
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
// It's possible for replicaNodeVersion to be null, when deassociating dead nodes
// It's possible for replicaNodeVersion to be null, when disassociating dead nodes
// that have been removed, the shards are failed, and part of the shard failing
// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ protected void doClose() {
* Initiates a reroute.
*/
public final void reroute(String reason) {
performReroute(reason);
}

// visible for testing
protected void performReroute(String reason) {
try {
if (lifecycle.stopped()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,15 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
* if needed.
*/
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());

// first, clear from the shards any node id they used to belong to that is now dead
deassociateDeadNodes(allocation);
disassociateDeadNodes(allocation);

if (allocation.routingNodesChanged()) {
clusterState = buildResult(clusterState, allocation);
Expand Down Expand Up @@ -400,7 +400,7 @@ private boolean hasDeadNodes(RoutingAllocation allocation) {
}

private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
"auto-expand replicas out of sync with number of nodes in the cluster";

Expand All @@ -414,7 +414,7 @@ private void reroute(RoutingAllocation allocation) {
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

private void deassociateDeadNodes(RoutingAllocation allocation) {
private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
RoutingNode node = it.next();
if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,12 +946,26 @@ public <T extends Streamable> List<T> readStreamableList(Supplier<T> constructor
}

/**
* Reads a list of objects
* Reads a list of objects. The list is expected to have been written using {@link StreamOutput#writeList(List)} or
* {@link StreamOutput#writeStreamableList(List)}.
*
* @return the list of objects
* @throws IOException if an I/O exception occurs reading the list
*/
public <T> List<T> readList(Writeable.Reader<T> reader) throws IOException {
public <T> List<T> readList(final Writeable.Reader<T> reader) throws IOException {
return readCollection(reader, ArrayList::new);
}

/**
* Reads a list of strings. The list is expected to have been written using {@link StreamOutput#writeStringCollection(Collection)}.
*
* @return the list of strings
* @throws IOException if an I/O exception occurs reading the list
*/
public List<String> readStringList() throws IOException {
return readList(StreamInput::readString);
}

/**
* Reads a set of objects
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,27 +1048,27 @@ public void writeList(List<? extends Writeable> list) throws IOException {
}

/**
* Writes a collection of generic objects via a {@link Writer}
* Writes a collection of objects via a {@link Writer}.
*
* @param collection the collection of objects
* @throws IOException if an I/O exception occurs writing the collection
*/
public <T> void writeCollection(Collection<T> collection, Writer<T> writer) throws IOException {
public <T> void writeCollection(final Collection<T> collection, final Writer<T> writer) throws IOException {
writeVInt(collection.size());
for (T val: collection) {
for (final T val: collection) {
writer.write(this, val);
}
}

public void writeStringCollection(final Collection<String> list) throws IOException {
writeVInt(list.size());
for (String string: list) {
this.writeString(string);
}
}

/**
* Writes a list of strings
* Writes a collection of a strings. The corresponding collection can be read from a stream input using
* {@link StreamInput#readList(Writeable.Reader)}.
*
* @param collection the collection of strings
* @throws IOException if an I/O exception occurs writing the collection
*/
public void writeStringList(List<String> list) throws IOException {
writeStringCollection(list);
public void writeStringCollection(final Collection<String> collection) throws IOException {
writeCollection(collection, StreamOutput::writeString);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public final class RecoveryResponse extends TransportResponse {

RecoveryResponse(StreamInput in) throws IOException {
super(in);
phase1FileNames = in.readList(StreamInput::readString);
phase1FileNames = in.readStringList();
phase1FileSizes = in.readList(StreamInput::readVLong);
phase1ExistingFileNames = in.readList(StreamInput::readString);
phase1ExistingFileNames = in.readStringList();
phase1ExistingFileSizes = in.readList(StreamInput::readVLong);
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
Expand All @@ -76,9 +76,9 @@ public final class RecoveryResponse extends TransportResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringList(phase1FileNames);
out.writeStringCollection(phase1FileNames);
out.writeCollection(phase1FileSizes, StreamOutput::writeVLong);
out.writeStringList(phase1ExistingFileNames);
out.writeStringCollection(phase1ExistingFileNames);
out.writeCollection(phase1ExistingFileSizes, StreamOutput::writeVLong);
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,45 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

/**
* This unassigns a task from any node, i.e. it is assigned to a {@code null} node with the provided reason.
*
* Since the assignment executor node is null, the {@link PersistentTasksClusterService} will attempt to reassign it to a valid
* node quickly.
*
* @param taskId the id of a persistent task
* @param taskAllocationId the expected allocation id of the persistent task
* @param reason the reason for unassigning the task from any node
* @param listener the listener that will be called when task is unassigned
*/
public void unassignPersistentTask(final String taskId,
final long taskAllocationId,
final String reason,
final ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
} else {
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
}
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId));
}
});
}

/**
* Creates a new {@link Assignment} for the given persistent task.
*
Expand All @@ -263,7 +302,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final

AssignmentDecision decision = decider.canAssign();
if (decision.getType() == AssignmentDecision.Type.NO) {
return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
}

return persistentTasksExecutor.getAssignment(taskParams, currentState);
Expand Down Expand Up @@ -404,6 +443,10 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
}
}

private static Assignment unassignedAssignment(String reason) {
return new Assignment(null, reason);
}

/**
* Class to periodically try to reassign unassigned persistent tasks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public static <Params extends PersistentTaskParams> PersistentTask<Params> getTa
* @return If no changes the argument {@code clusterState} is returned else
* a copy with the modified tasks
*/
public static ClusterState deassociateDeadNodes(ClusterState clusterState) {
public static ClusterState disassociateDeadNodes(ClusterState clusterState) {
PersistentTasksCustomMetaData tasks = getPersistentTasksCustomMetaData(clusterState);
if (tasks == null) {
return clusterState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public PluginInfo(final StreamInput in) throws IOException {
}
this.classname = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
extendedPlugins = in.readList(StreamInput::readString);
extendedPlugins = in.readStringList();
} else {
extendedPlugins = Collections.emptyList();
}
Expand All @@ -128,7 +128,7 @@ public void writeTo(final StreamOutput out) throws IOException {
}
out.writeString(classname);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeStringList(extendedPlugins);
out.writeStringCollection(extendedPlugins);
}
out.writeBoolean(hasNativeController);
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta2) && out.getVersion().before(Version.V_6_3_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class InternalComposite
public InternalComposite(StreamInput in) throws IOException {
super(in);
this.size = in.readVInt();
this.sourceNames = in.readList(StreamInput::readString);
this.sourceNames = in.readStringList();
this.formats = new ArrayList<>(sourceNames.size());
for (int i = 0; i < sourceNames.size(); i++) {
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
Expand All @@ -90,7 +90,7 @@ public InternalComposite(StreamInput in) throws IOException {
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(size);
out.writeStringList(sourceNames);
out.writeStringCollection(sourceNames);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
for (DocValueFormat format : formats) {
out.writeNamedWriteable(format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
}
}
if (in.readBoolean()) {
stats = in.readList(StreamInput::readString);
stats = in.readStringList();
}
suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new);
terminateAfter = in.readVInt();
Expand Down Expand Up @@ -311,7 +311,7 @@ public void writeTo(StreamOutput out) throws IOException {
boolean hasStats = stats != null;
out.writeBoolean(hasStats);
if (hasStats) {
out.writeStringList(stats);
out.writeStringCollection(stats);
}
out.writeOptionalWriteable(suggestBuilder);
out.writeVInt(terminateAfter);
Expand Down
Loading

0 comments on commit 94b4687

Please sign in to comment.